# Source code for openff.bespokefit.executor.utilities.celery
import json
import multiprocessing
from typing import Any, Dict, List, Optional
from celery import Celery
from celery.result import AsyncResult
from redis import Redis
from typing_extensions import TypedDict
from openff.bespokefit.executor.services.models import Error
from openff.bespokefit.executor.utilities.typing import Status
from openff.bespokefit.utilities import current_settings
class TaskInformation(TypedDict):
    """The full set of information describing a bespokefit celery task."""

    # The unique id assigned to the task by celery.
    id: str
    # The current status of the task (e.g. "waiting", "running").
    status: Status
    # The JSON-deserialized output of the task, or ``None`` if the task
    # errored or has not produced a string result yet.
    result: Optional[Dict[str, Any]]
    # A dictionary representation of any error raised by the task, or
    # ``None`` if no error occurred.
    error: Optional[Dict[str, Any]]
def get_status(task_result: AsyncResult) -> Status:
    """Map a raw celery task state onto a bespokefit ``Status``.

    Parameters
    ----------
    task_result
        The celery result whose ``status`` field should be mapped.

    Returns
    -------
        One of ``"waiting"``, ``"running"``, ``"errored"`` or ``"success"``.

    Raises
    ------
    KeyError
        If the task is in a state with no mapping (e.g. ``REVOKED``).
    """
    return {
        "PENDING": "waiting",
        "STARTED": "running",
        "RETRY": "running",
        "FAILURE": "errored",
        "SUCCESS": "success",
    }[task_result.status]
def configure_celery_app(
    app_name: str, redis_connection: Redis, include: Optional[List[str]] = None
):
    """Create and configure a celery app backed by a redis broker / backend.

    Parameters
    ----------
    app_name
        The name of the celery app; also used as the default task queue.
    redis_connection
        An existing redis connection whose host / port / db settings will be
        reused for the celery broker and result backend.
    include
        An optional list of module names for celery to import when the
        worker starts.

    Returns
    -------
        The configured ``Celery`` application.
    """
    settings = current_settings()

    # Reuse the connection details of the provided redis connection so the
    # celery app talks to the same server.
    connection_kwargs = redis_connection.connection_pool.connection_kwargs

    redis_host_name = connection_kwargs["host"]
    redis_port = connection_kwargs["port"]
    redis_db = connection_kwargs["db"]

    password = settings.BEFLOW_REDIS_PASSWORD

    # The broker and result backend point at the same redis instance.
    redis_url = f"redis://:{password}@{redis_host_name}:{redis_port}/{redis_db}"

    celery_app = Celery(
        app_name,
        backend=redis_url,
        broker=redis_url,
        include=include,
    )

    celery_app.conf.task_track_started = True
    celery_app.conf.task_default_queue = app_name
    # Tasks may legitimately run for a very long time; use a large visibility
    # timeout so the broker does not re-deliver in-progress tasks.
    celery_app.conf.broker_transport_options = {"visibility_timeout": 1000000}
    # Keep results forever so they can be retrieved at any later point.
    celery_app.conf.result_expires = None

    return celery_app
def _spawn_worker(celery_app, concurrency: int = 1, **kwargs):
worker = celery_app.Worker(
concurrency=concurrency,
loglevel="INFO",
logfile=f"celery-{celery_app.main}.log",
quiet=True,
hostname=celery_app.main,
**kwargs,
)
worker.start()
def spawn_worker(
    celery_app, concurrency: int = 1, asynchronous: bool = True, **kwargs
) -> Optional[multiprocessing.Process]:
    """Spawn a worker for a celery app.

    Parameters
    ----------
    celery_app
        The celery app to spawn a worker for.
    concurrency
        The number of tasks the worker may run concurrently. No worker is
        spawned when this is less than one.
    asynchronous
        If ``True`` the worker runs in a daemon subprocess; otherwise it
        runs (and blocks) in the current process.
    kwargs
        Extra keyword arguments forwarded to the underlying celery worker.

    Returns
    -------
        The worker process when ``asynchronous`` is ``True``, otherwise
        ``None``.
    """
    if concurrency < 1:
        return None

    if asynchronous:  # pragma: no cover
        worker_process = multiprocessing.Process(
            target=_spawn_worker,
            args=(celery_app, concurrency),
            # BUGFIX: previously ``kwargs`` were silently dropped in the
            # asynchronous branch and only honoured when running in-process.
            kwargs=kwargs,
            daemon=True,
        )
        worker_process.start()
        return worker_process
    else:
        _spawn_worker(celery_app, concurrency, **kwargs)
        return None
def get_task_information(app: Celery, task_id: str) -> TaskInformation:
    """Collect the status, output and error (if any) of a celery task.

    Parameters
    ----------
    app
        The celery app the task was submitted to.
    task_id
        The unique id of the task to query.

    Returns
    -------
        The information describing the task.
    """
    task_result = AsyncResult(task_id, app=app)
    raw_result = task_result.result

    # A string result is the task's JSON-serialized output; anything else
    # (including ``None``) yields no output.
    task_output = json.loads(raw_result) if isinstance(raw_result, str) else None

    # A raised exception is surfaced by celery as the task's result.
    task_error = None

    if isinstance(raw_result, BaseException):
        task_error = Error(
            type=raw_result.__class__.__name__,
            message=str(raw_result),
            traceback=task_result.traceback,
        ).dict()

    task_status = get_status(task_result)

    return TaskInformation(
        id=task_id,
        status=task_status,
        # Suppress any partial output when the task errored.
        result=None if task_status == "errored" else task_output,
        error=task_error,
    )