Source code for openff.bespokefit.executor.services.coordinator.app

import asyncio
import logging
import os
import signal
import urllib.parse
from typing import Optional

from fastapi import APIRouter, HTTPException
from fastapi.responses import Response
from openff.toolkit.topology import Molecule

from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator import worker
from openff.bespokefit.executor.services.coordinator.models import (
    CoordinatorGETPageResponse,
    CoordinatorGETResponse,
    CoordinatorPOSTBody,
    CoordinatorPOSTResponse,
)
from openff.bespokefit.executor.services.coordinator.storage import (
    TaskStatus,
    create_task,
    get_n_tasks,
    get_task,
    get_task_ids,
)
from openff.bespokefit.executor.services.models import Link
from openff.bespokefit.executor.utilities.depiction import smiles_to_image

router = APIRouter()

_logger = logging.getLogger(__name__)
_worker_task: Optional[asyncio.Future] = None

__settings = current_settings()

__GET_TASK_IMAGE_ENDPOINT = (
    "/" + __settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}/image"
)


[docs]@router.get("/" + __settings.BEFLOW_COORDINATOR_PREFIX) def get_optimizations( skip: int = 0, limit: int = 1000, status: Optional[TaskStatus] = None ) -> CoordinatorGETPageResponse: """Retrieves all bespoke optimizations that have been submitted to this server.""" task_ids = get_task_ids(skip, limit, status=(None if status is None else {status})) n_total_tasks = get_n_tasks(status) contents = [ Link( self=( f"{__settings.BEFLOW_API_V1_STR}/" f"{__settings.BEFLOW_COORDINATOR_PREFIX}/" f"{task_id}" ), id=str(task_id), ) for task_id in task_ids ] prev_index = max(0, skip - limit) next_index = min(n_total_tasks, skip + limit) status_url = "" if status is None else f"&status={status.value}" return CoordinatorGETPageResponse( self=( f"{__settings.BEFLOW_API_V1_STR}/" f"{__settings.BEFLOW_COORDINATOR_PREFIX}" f"?skip={skip}" f"&limit={limit}" f"{status_url}" ), prev=None if prev_index >= skip else ( f"{__settings.BEFLOW_API_V1_STR}/" f"{__settings.BEFLOW_COORDINATOR_PREFIX}" f"?skip={prev_index}" f"&limit={limit}" f"{status_url}" ), next=None if (next_index <= skip or next_index == n_total_tasks) else ( f"{__settings.BEFLOW_API_V1_STR}/" f"{__settings.BEFLOW_COORDINATOR_PREFIX}" f"?skip={next_index}" f"&limit={limit}" f"{status_url}" ), contents=contents, )
[docs]@router.get("/" + __settings.BEFLOW_COORDINATOR_PREFIX + "/{optimization_id}") def get_optimization(optimization_id: int) -> CoordinatorGETResponse: """Retrieves a bespoke optimization that has been submitted to this server using its unique id.""" try: response = CoordinatorGETResponse.from_task(get_task(optimization_id)) except IndexError: raise HTTPException(status_code=404, detail=f"{optimization_id} not found") response.links = { "image": ( __settings.BEFLOW_API_V1_STR + __GET_TASK_IMAGE_ENDPOINT.format(optimization_id=optimization_id) ) } return response
[docs]@router.post("/" + __settings.BEFLOW_COORDINATOR_PREFIX) def post_optimization(body: CoordinatorPOSTBody) -> CoordinatorPOSTResponse: """Submit a bespoke optimization to be performed by the server.""" try: # Make sure the input SMILES does not have any atoms mapped as these may # cause issues for certain stages such as fragmentation. molecule = Molecule.from_smiles(body.input_schema.smiles) molecule.properties.pop("atom_map", None) body.input_schema.smiles = molecule.to_smiles(mapped=True) except BaseException as e: raise HTTPException( status_code=400, detail="molecule could not be understood" ) from e task_id = create_task(body.input_schema) return CoordinatorPOSTResponse( id=str(task_id), self=( f"{__settings.BEFLOW_API_V1_STR}/" f"{__settings.BEFLOW_COORDINATOR_PREFIX}/{task_id}" ), )
[docs]@router.get(__GET_TASK_IMAGE_ENDPOINT) async def get_molecule_image(optimization_id: int): """Render the molecule associated with a particular bespoke optimization to an SVG file.""" try: task = get_task(optimization_id) except IndexError: raise HTTPException(status_code=404, detail=f"{optimization_id} not found") svg_content = smiles_to_image(urllib.parse.unquote(task.input_schema.smiles)) svg_response = Response(svg_content, media_type="image/svg+xml") return svg_response
[docs]@router.on_event("startup") def startup(): main_loop = asyncio.get_event_loop() global _worker_task _worker_task = asyncio.ensure_future(worker.cycle(), loop=main_loop) def _handle_task_result(task: asyncio.Task) -> None: # noinspection PyBroadException try: task.result() except asyncio.CancelledError: pass except BaseException: _logger.exception( "Exception raised by the main loop. This should never happen." ) os.kill(os.getpid(), signal.SIGINT) _worker_task.add_done_callback(_handle_task_result)
[docs]@router.on_event("shutdown") def shutdown(): if _worker_task is not None: _worker_task.cancel()