Source code for openff.bespokefit.executor.services.coordinator.models

from typing import Dict, List, Optional

from openff.bespokefit._pydantic import BaseModel, Field
from openff.bespokefit.executor.services import current_settings
from openff.bespokefit.executor.services.coordinator.stages import StageType
from openff.bespokefit.executor.services.models import Link, PaginatedCollection
from openff.bespokefit.executor.utilities.typing import Status
from openff.bespokefit.schema.fitting import BespokeOptimizationSchema
from openff.bespokefit.schema.results import BespokeOptimizationResults


[docs]class CoordinatorGETPageResponse(PaginatedCollection[Link]): """"""
[docs]class CoordinatorGETStageStatus(BaseModel): type: str = Field(..., description="The type of stage.") status: Status = Field(..., description="The status of the stage.") error: Optional[str] = Field( ..., description="The error, if any, raised by the stage." ) results: Optional[List[Link]] = Field( ..., description="Links to the results generated by this stage." )
[docs] @classmethod def from_stage(cls, stage: StageType): stage_ids = stage.id if hasattr(stage, "id") else stage.ids if isinstance(stage_ids, str): stage_ids = [stage_ids] elif isinstance(stage_ids, dict): stage_ids = sorted( { stage_id for dict_values in stage_ids.values() for stage_id in dict_values } ) elif stage_ids is None: pass else: raise NotImplementedError() settings = current_settings() base_endpoint = f"{settings.BEFLOW_API_V1_STR}/" endpoints = { "fragmentation": f"{base_endpoint}{settings.BEFLOW_FRAGMENTER_PREFIX}/", "qc-generation": f"{base_endpoint}{settings.BEFLOW_QC_COMPUTE_PREFIX}/", "optimization": f"{base_endpoint}{settings.BEFLOW_OPTIMIZER_PREFIX}/", } return CoordinatorGETStageStatus( type=stage.type, status=stage.status, error=stage.error, results=None if stage_ids is None else [ Link(id=stage_id, self=f"{endpoints[stage.type]}{stage_id}") for stage_id in stage_ids ], )
[docs]class CoordinatorGETResponse(Link): smiles: str = Field( ..., description="The SMILES representation of the molecule that the bespoke " "parameters are being generated for.", ) stages: List[CoordinatorGETStageStatus] = Field( ..., description="The stages of the bespoke optimization." ) results: Optional[BespokeOptimizationResults] = Field( None, description="The output of the bespoke optimization." ) links: Dict[str, str] = Field( {}, description="Links to resources associated with the model.", alias="_links" )
[docs] @classmethod def from_task(cls, task: "CoordinatorTask"): settings = current_settings() stages = [ *task.pending_stages, *([] if task.running_stage is None else [task.running_stage]), *task.completed_stages, ] stages_by_type = {stage.type: stage for stage in stages} stage_responses = [ CoordinatorGETStageStatus.from_stage(stage) for stage in stages ] return CoordinatorGETResponse( id=task.id, self=( f"{settings.BEFLOW_API_V1_STR}/" f"{settings.BEFLOW_COORDINATOR_PREFIX}/{task.id}" ), smiles=task.input_schema.smiles, stages=stage_responses, results=( None if "optimization" not in stages_by_type else stages_by_type["optimization"].result ), )
[docs]class CoordinatorPOSTBody(BaseModel): input_schema: BespokeOptimizationSchema = Field(..., description="")
[docs]class CoordinatorPOSTResponse(Link): """"""
[docs]class CoordinatorTask(BaseModel): """An internal model that tracks a task (i.e. a bespoke optimization) that is being executed by the executor. """ id: str = Field(..., description="The unique ID associated with this task.") input_schema: BespokeOptimizationSchema = Field(..., description="") pending_stages: List[StageType] = Field(..., description="") running_stage: Optional[StageType] = Field(None, description="") completed_stages: List[StageType] = Field([], description="") @property def status(self) -> Status: if ( self.running_stage is None and len(self.completed_stages) == 0 and len(self.pending_stages) > 0 ): return "waiting" if any(stage.status == "errored" for stage in self.completed_stages): return "errored" if self.running_stage is not None or len(self.pending_stages) > 0: return "running" if all(stage.status == "success" for stage in self.completed_stages): return "success" raise NotImplementedError()