Source code for openff.bespokefit.executor.services.fragmenter.app
import json
from fastapi import APIRouter
from fastapi.responses import Response
from openff.fragmenter.fragment import FragmentationResult
from openff.utilities import MissingOptionalDependencyError
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.fragmenter import worker
from openff.bespokefit.executor.services.fragmenter.cache import (
cached_fragmentation_task,
)
from openff.bespokefit.executor.services.fragmenter.models import (
FragmenterGETResponse,
FragmenterPOSTBody,
FragmenterPOSTResponse,
)
from openff.bespokefit.executor.utilities.celery import get_task_information
from openff.bespokefit.executor.utilities.depiction import IMAGE_UNAVAILABLE_SVG
from openff.bespokefit.executor.utilities.redis import connect_to_default_redis
router = APIRouter()
__settings = current_settings()
__GET_ENDPOINT = "/" + __settings.BEFLOW_FRAGMENTER_PREFIX + "/{fragmentation_id}"
__GET_FRAGMENT_IMAGE_ENDPOINT = (
"/"
+ __settings.BEFLOW_FRAGMENTER_PREFIX
+ "/{fragmentation_id}/fragment/{fragment_id}/image"
)
[docs]@router.get(__GET_ENDPOINT)
def get_fragment(fragmentation_id: str) -> FragmenterGETResponse:
task_info = get_task_information(worker.celery_app, fragmentation_id)
task_result = task_info["result"]
return FragmenterGETResponse(
id=fragmentation_id,
self=__settings.BEFLOW_API_V1_STR
+ __GET_ENDPOINT.format(fragmentation_id=fragmentation_id),
status=task_info["status"],
result=task_result,
error=json.dumps(task_info["error"]),
_links={
f"fragment-{i}-image": (
__settings.BEFLOW_API_V1_STR
+ __GET_FRAGMENT_IMAGE_ENDPOINT.format(
fragmentation_id=fragmentation_id, fragment_id=i
)
)
for i, fragment in enumerate(
[] if task_result is None else task_result["fragments"]
)
},
)
[docs]@router.post("/" + __settings.BEFLOW_FRAGMENTER_PREFIX)
def post_fragment(body: FragmenterPOSTBody) -> FragmenterPOSTResponse:
# We use celery delay method in order to enqueue the task with the given
# parameters
task_id = cached_fragmentation_task(
task=body, redis_connection=connect_to_default_redis()
)
return FragmenterPOSTResponse(
id=task_id,
self=__settings.BEFLOW_API_V1_STR
+ __GET_ENDPOINT.format(fragmentation_id=task_id),
)
[docs]@router.get(__GET_FRAGMENT_IMAGE_ENDPOINT)
def get_fragment_image(fragmentation_id: str, fragment_id: int) -> Response:
task_info = get_task_information(worker.celery_app, fragmentation_id)
if task_info["status"] != "success":
return Response(IMAGE_UNAVAILABLE_SVG, media_type="image/svg+xml")
result = FragmentationResult.parse_obj(task_info["result"])
if fragment_id < 0 or fragment_id >= len(result.fragments):
return Response(IMAGE_UNAVAILABLE_SVG, media_type="image/svg+xml")
fragment = result.fragments[fragment_id]
try:
from openff.fragmenter.depiction import _oe_render_fragment
svg_content = _oe_render_fragment(
result.parent_molecule,
fragment.molecule,
fragment.bond_indices,
image_width=200,
image_height=200,
)
except (ModuleNotFoundError, MissingOptionalDependencyError):
from openff.fragmenter.depiction import _rd_render_fragment
svg_content = _rd_render_fragment(
result.parent_molecule,
fragment.molecule,
fragment.bond_indices,
image_width=200,
image_height=200,
)
return Response(svg_content, media_type="image/svg+xml")