"""openff.bespokefit.executor.services.coordinator.worker"""

import asyncio
import logging
import time

import redis

from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.storage import (
    TaskStatus,
    get_n_tasks,
    get_task,
    peek_task_status,
    pop_task_status,
    push_task_status,
    save_task,
)

_logger = logging.getLogger(__name__)


async def _process_task(task_id: int) -> bool:
    """Advance a single task through its current stage, returning ``True``
    once the task has finished, whether successfully or with an error."""
    task = get_task(task_id)
    task_status = task.status

    # Tasks already in a terminal state need no further processing.
    if task.status in {"success", "errored"}:
        return True

    # If no stage is currently in flight, start the next pending one.
    if task.running_stage is None:
        task.running_stage = task.pending_stages.pop(0)
        await task.running_stage.enter(task)

    # Snapshot the stage's status before updating so transitions can be logged.
    stage_status = task.running_stage.status
    await task.running_stage.update()

    task_state_message = f"[task id={task_id}] transitioned from {{0}} -> {{1}}"

    # Report a task leaving the 'waiting' state straight away; any other
    # transition is reported after the finished stage is archived below.
    if task.status != task_status and task_status == "waiting":
        print(task_state_message.format(task_status, task.status), flush=True)

    if stage_status != task.running_stage.status:
        print(
            f"[task id={task_id}] {task.running_stage.type} transitioned from "
            f"{stage_status} -> {task.running_stage.status}",
            flush=True,
        )

    # Archive a finished stage so the next cycle can start the following one.
    if task.running_stage.status in {"success", "errored"}:
        task.completed_stages.append(task.running_stage)
        task.running_stage = None

    if task.status != task_status and task_status != "waiting":
        print(task_state_message.format(task_status, task.status), flush=True)

    # Persist any changes; the task is still in progress.
    save_task(task)
    return False
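
# A minimal, illustrative sketch (not part of the module) of how the boolean
# returned by ``_process_task`` can be consumed: keep processing a task until
# it reports completion. The helper name and the polling interval here are
# assumptions for illustration only.
async def _example_drain_task(task_id: int) -> None:
    """Drive a single task to completion outside the main ``cycle`` loop."""
    while not await _process_task(task_id):
        # ``_process_task`` returns ``False`` while the task is in progress.
        await asyncio.sleep(1.0)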


async def cycle():  # pragma: no cover
    """The coordinator's main loop: update running tasks, promote waiting
    tasks up to the configured limit, and retry transient Redis failures."""
    settings = current_settings()

    n_connection_errors = 0

    while True:
        sleep_time = settings.BEFLOW_COORDINATOR_MAX_UPDATE_INTERVAL

        try:
            start_time = time.perf_counter()

            # First update any running tasks, pushing them to the 'complete'
            # queue if they have finished, so as to figure out how many new
            # tasks can be moved from waiting to running.
            task_id = peek_task_status(TaskStatus.running)
            processed_task_ids = set()

            while task_id is not None:
                # Stop once the running queue has been rotated through exactly
                # once.
                if task_id in processed_task_ids:
                    break

                has_finished = await _process_task(task_id)
                # Needed to let other async threads run even if there are
                # hundreds of tasks running.
                await asyncio.sleep(0.0)

                # Rotate the task to the back of the running queue, or move it
                # to the complete queue if it has finished.
                push_task_status(
                    pop_task_status(TaskStatus.running),
                    TaskStatus.running if not has_finished else TaskStatus.complete,
                )
                processed_task_ids.add(task_id)

                task_id = peek_task_status(TaskStatus.running)

            # Promote waiting tasks until the running queue is at capacity.
            n_running_tasks = get_n_tasks(TaskStatus.running)

            n_tasks_to_queue = min(
                settings.BEFLOW_COORDINATOR_MAX_RUNNING_TASKS - n_running_tasks,
                get_n_tasks(TaskStatus.waiting),
            )

            for _ in range(n_tasks_to_queue):
                push_task_status(
                    pop_task_status(TaskStatus.waiting), TaskStatus.running
                )

            n_connection_errors = 0

            # Make sure we don't cycle too often.
            sleep_time = max(sleep_time - (time.perf_counter() - start_time), 0.0)

        except (KeyboardInterrupt, asyncio.CancelledError):
            break
        except (
            ConnectionError,
            redis.exceptions.ConnectionError,
            redis.exceptions.BusyLoadingError,
        ) as e:
            n_connection_errors += 1

            if n_connection_errors >= 3:
                raise e

            if isinstance(e, redis.exceptions.RedisError):
                _logger.warning(
                    f"Failed to connect to Redis - {3 - n_connection_errors} "
                    f"attempts remaining."
                )

        await asyncio.sleep(sleep_time)
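
# A minimal sketch of launching the coordinator loop on its own, e.g. for
# local debugging. The executor normally starts this coroutine itself, so the
# ``__main__`` guard below is an assumption for illustration, not the real
# entry point.
if __name__ == "__main__":  # pragma: no cover
    asyncio.run(cycle())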