Source code for openff.bespokefit.workflows.bespoke

"""
This is the main bespokefit workflow factory which is executed and builds the bespoke
workflows.
"""
import hashlib
import logging
import os
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

from openff.fragmenter.fragment import WBOFragmenter
from openff.qcsubmit.common_structures import QCSpec
from openff.qcsubmit.results import (
    BasicResultCollection,
    OptimizationResultCollection,
    TorsionDriveResultCollection,
)
from openff.qcsubmit.serializers import deserialize, serialize
from openff.qcsubmit.workflow_components import ComponentResult
from openff.toolkit.topology import Molecule
from openff.toolkit.typing.engines.smirnoff import ForceField
from qcelemental.models.common_models import Model
from qcportal.optimization import OptimizationRecord
from qcportal.torsiondrive import TorsiondriveRecord

from openff.bespokefit._pydantic import ClassBase, Field, validator
from openff.bespokefit.exceptions import (
    MissingTorsionTargetSMARTS,
    OptimizerError,
    TargetNotSetError,
)
from openff.bespokefit.fragmentation import FragmentationEngine
from openff.bespokefit.optimizers import get_optimizer, list_optimizers
from openff.bespokefit.schema.data import LocalQCData
from openff.bespokefit.schema.fitting import (
    BespokeOptimizationSchema,
    OptimizationStageSchema,
)
from openff.bespokefit.schema.optimizers import ForceBalanceSchema, OptimizerSchema
from openff.bespokefit.schema.smirnoff import (
    ProperTorsionHyperparameters,
    SMIRNOFFHyperparameters,
    validate_smirks,
)
from openff.bespokefit.schema.targets import (
    BespokeQCData,
    TargetSchema,
    TorsionProfileTargetSchema,
)
from openff.bespokefit.schema.tasks import (
    HessianTaskSpec,
    OptimizationTaskSpec,
    Torsion1DTaskSpec,
)
from openff.bespokefit.utilities import parallel
from openff.bespokefit.utilities.smirks import SMIRKSettings, SMIRKSType
from openff.bespokefit.utilities.smirnoff import ForceFieldEditor

QCResultRecord = Union[OptimizationRecord, TorsiondriveRecord]
QCResultCollection = Union[
    TorsionDriveResultCollection,
    OptimizationResultCollection,
    BasicResultCollection,
]

_logger = logging.getLogger(__name__)

_DEFAULT_ROTATABLE_SMIRKS = "[!#1]~[!$(*#*)&!D1:1]-,=;!@[!$(*#*)&!D1:2]~[!#1]"


[docs]class BespokeWorkflowFactory(ClassBase): """The bespokefit workflow factory which is a template of the settings that will be used to generate the specific fitting schema for each molecule. """ initial_force_field: str = Field( "openff_unconstrained-2.0.0.offxml", description="The name of the unconstrained force field to use as a starting " "point for optimization. The force field must be installed with conda/mamba.", ) optimizer: Union[str, OptimizerSchema] = Field( ForceBalanceSchema(penalty_type="L1"), description="The optimizer that should be used with the targets already set.", ) target_templates: List[TargetSchema] = Field( [TorsionProfileTargetSchema()], description="Templates for the fitting targets to use as part of the " "optimization. The ``reference_data`` attribute of each schema will be " "automatically populated by this factory.", ) parameter_hyperparameters: List[SMIRNOFFHyperparameters] = Field( [ ProperTorsionHyperparameters(), ], description="The settings which describe how types of parameters, e.g. the " "force constant of a bond parameter, should be restrained during the " "optimisation such as through the inclusion of harmonic priors.", ) target_torsion_smirks: Optional[List[str]] = Field( [_DEFAULT_ROTATABLE_SMIRKS], description="A list of SMARTS patterns that should be used to identify the " "**bonds** within the target molecule to generate bespoke torsions around. Each " "SMARTS pattern should include **two** indexed atoms that correspond to the " "two atoms involved in the central bond." "\n" "By default bespoke torsion parameters (if requested) will be constructed for " "all non-terminal 'rotatable bonds'", ) smirk_settings: SMIRKSettings = Field( SMIRKSettings(), description="The settings that should be used when generating SMIRKS patterns for this optimization stage.", ) fragmentation_engine: Optional[FragmentationEngine] = Field( WBOFragmenter(), description="The Fragment engine that should be used to fragment the molecule, " "note that if None is provided the molecules will not be fragmented. By default " "we use the WBO fragmenter by the Open Force Field Consortium.", ) default_qc_specs: List[QCSpec] = Field( default_factory=lambda: [QCSpec()], description="The default specification (e.g. method, basis) to use when " "performing any new QC calculations. If multiple specs are provided, each spec " "will be considered in order until one is found that i) is available based on " "the installed dependencies, and ii) is compatible with the molecule of " "interest.", ) @validator("initial_force_field") def _check_force_field(cls, force_field: str) -> str: """Check that the force field is available via the toolkit.""" assert ForceField(force_field) is not None return force_field @validator("optimizer") def _check_optimizer(cls, optimizer: Union[str, OptimizerSchema]): """ Set the optimizer settings to be used. Parameters ---------- optimizer: Union[str, BaseOptimizer] The optimizer that should be added to the workflow, targets should also be added before creating the fitting schema. """ if isinstance(optimizer, str): # we can check for the optimizer and attach it return get_optimizer(optimizer.lower())() if optimizer.type.lower() not in list_optimizers(): raise OptimizerError( f"The requested optimizer {optimizer.type} was not registered " f"with bespokefit." ) return optimizer @validator("target_torsion_smirks") def _check_target_torsion_smirks( cls, values: Optional[List[str]] ) -> Optional[List[str]]: if values: return [validate_smirks(value, 2) for value in values] return values def _pre_run_check(self) -> None: """ Check that all required settings are declared before running. """ # now check we have targets in each optimizer if len(self.target_templates) == 0: raise OptimizerError( "There are no optimization targets in the optimization workflow." ) elif self.target_torsion_smirks is None: if ( self.fragmentation_engine or SMIRKSType.ProperTorsions in self.target_smirks ): # We need the target torsion smirks in 2 cases # 1 We wish to fragment to molecule # 2 We do not want to fragment but still want to fit torsions raise MissingTorsionTargetSMARTS( "The `target_torsion_smirks` have not been set and are required for this workflow." ) elif len(self.parameter_hyperparameters) == 0: raise TargetNotSetError( "There are no parameter settings specified which will mean that the " "optimiser has no parameters to optimize." ) else: return @property def target_smirks(self) -> List[SMIRKSType]: """Returns a list of the target smirks types based on the selected hyper parameters.""" return list( {SMIRKSType(parameter.type) for parameter in self.parameter_hyperparameters} )
[docs] def to_file(self, file_name: str) -> None: """ Export the factory to yaml or json file. Parameters ---------- file_name: str The name of the file the workflow should be exported to, the type is determined from the name. """ serialize(serializable=self.dict(), file_name=file_name)
[docs] @classmethod def from_file(cls, file_name: str): """ Build the factory from a model serialised to file. """ return cls.parse_obj(deserialize(file_name=file_name))
@classmethod def _deduplicated_list( cls, molecules: Union[Molecule, List[Molecule], str] ) -> ComponentResult: """ Create a deduplicated list of molecules based on the input type. """ input_file, molecule, input_directory = None, None, None if isinstance(molecules, str): # this is an input file or folder if os.path.isfile(molecules): input_file = molecules else: input_directory = molecules elif isinstance(molecules, Molecule): molecule = [molecules] else: molecule = molecules return ComponentResult( component_name="default", component_provenance={}, component_description={}, molecules=molecule, input_file=input_file, input_directory=input_directory, )
[docs] def optimization_schemas_from_molecules( self, molecules: Union[Molecule, List[Molecule]], processors: Optional[int] = 1, ) -> List[BespokeOptimizationSchema]: """This is the main function of the workflow which takes the general fitting meta-template and generates a specific one for the set of molecules that are passed. Parameters ---------- molecules: The molecule or list of molecules which should be processed by the schema to generate the fitting schema. processors: The number of processors that should be used when building the workflow, this helps with fragmentation which can be quite slow for large numbers of molecules. """ # TODO: Expand to accept the QCSubmit results datasets directly to create the # fitting schema and fill the tasks. # TODO: How do we support dihedral tagging? # create a deduplicated list of molecules first. deduplicated_molecules = self._deduplicated_list(molecules=molecules).molecules optimization_schemas = [ schema for schema in parallel.apply_async( self.optimization_schema_from_molecule, [(molecule, i) for i, molecule in enumerate(deduplicated_molecules)], n_processes=processors, verbose=True, description="Building Fitting Schema", ) if schema is not None ] return optimization_schemas
[docs] def optimization_schema_from_molecule( self, molecule: Molecule, index: int = 0 ) -> Optional[BespokeOptimizationSchema]: """Build an optimization schema from an input molecule this involves fragmentation. """ # make sure all required variables have been declared self._pre_run_check() return self._build_optimization_schema(molecule=molecule, index=index)
[docs] def optimization_schemas_from_results( self, results: QCResultCollection, combine: bool = False, processors: Optional[int] = 1, ) -> List[BespokeOptimizationSchema]: """ Create a set of optimization schemas (one per molecule) from some results. Here input molecules are turned into tasks and results are updated during the process. If multiple targets are in the workflow the results will be applied to the correct target other targets can be updated after by calling update with parameters. """ # group the tasks if requested sorted_records = self._group_records(results.to_records(), combine) optimization_schemas = [ schema for schema in parallel.apply_async( self._optimization_schema_from_records, [(records, i) for i, records in enumerate(sorted_records)], n_processes=processors, verbose=True, description="Building Fitting Schema", ) if schema is not None ] return optimization_schemas
@classmethod def _group_records( cls, records: List[Tuple[QCResultRecord, Molecule]], combine ) -> List[List[Tuple[QCResultRecord, Molecule]]]: """Group the result records into a list that can be processed into a fitting schema, combining results collected for the same molecule when requested. """ if not combine: return [[record] for record in records] combined_records = [] # loop over the results and combine multiple results for the same molecule # this only effects multiple torsion drives per_molecule_records = defaultdict(list) for record, molecule in records: inchi_key = molecule.to_inchikey(fixed_hydrogens=True) per_molecule_records[inchi_key].append((record, molecule)) for unique_records in per_molecule_records.values(): combined_records.append(unique_records) return combined_records def _optimization_schema_from_records( self, records: List[Tuple[QCResultRecord, Molecule]], index: int, ) -> BespokeOptimizationSchema: """ Create an optimization task for a given list of results. Notes: * This method assumes a result records were generated for the same molecule. * The list allows multiple results to be combined from the same molecule which is mostly useful for torsion drives. """ assert ( len({molecule.to_inchikey(fixed_hydrogens=True) for _, molecule in records}) == 1 ), "all records must be for the same molecule" records_by_type = defaultdict(list) for record_tuple in records: records_by_type[record_tuple[0].__class__].append(record_tuple) local_qc_data = {} for record_type, records_of_type in records_by_type.items(): record_type_label = {TorsiondriveRecord: "torsion1d"}[record_type] local_qc_data[record_type_label] = LocalQCData.from_remote_records( records_of_type ) opt_schema = self._build_optimization_schema( molecule=records[0][1], index=index, # bespoke_parameters=bespoke_parameters, local_qc_data=local_qc_data, ) return opt_schema def _select_qc_spec(self, molecule: Molecule) -> QCSpec: """Attempts to select a QC spec for a given molecule from the defaults list.""" if len(self.default_qc_specs) != 1: raise NotImplementedError( "Currently only a single default QC spec must be specified." ) return self.default_qc_specs[0] def _build_optimization_schema( self, molecule: Molecule, index: int, local_qc_data: Optional[Dict[str, LocalQCData]] = None, ) -> BespokeOptimizationSchema: """For a given molecule schema build an optimization schema.""" force_field_editor = ForceFieldEditor(self.initial_force_field) ff_hash = hashlib.sha512( force_field_editor.force_field.to_string( discard_cosmetic_attributes=True ).encode() ).hexdigest() # Populate the targets targets = [] task_type_to_spec = { "torsion1d": Torsion1DTaskSpec, "optimization": OptimizationTaskSpec, "hessian": HessianTaskSpec, } default_qc_spec = self._select_qc_spec(molecule) local_qc_data = {} if local_qc_data is None else local_qc_data for target_template in self.target_templates: target_schema = target_template.copy(deep=True) targets.append(target_schema) # set the calculation specification for provenance and caching task_type = target_schema.bespoke_task_type() target_specification = task_type_to_spec[task_type]( program=default_qc_spec.program.lower(), # lower to hit the cache more often model=Model( method=default_qc_spec.method.lower(), basis=default_qc_spec.basis.lower() if default_qc_spec.basis is not None else default_qc_spec.basis, ), ) # only overwrite with general settings if not configured if target_schema.calculation_specification is None: target_schema.calculation_specification = target_specification if target_schema.reference_data is not None: continue if task_type in local_qc_data: target_schema.reference_data = local_qc_data[task_type] continue target_schema.reference_data = BespokeQCData(spec=target_specification) # work out if we need the target torsion smirks # they are required if we are fragmenting or fitting torsions else not needed if self.fragmentation_engine or SMIRKSType.ProperTorsions in self.target_smirks: target_torsion_smirks = self.target_torsion_smirks else: target_torsion_smirks = None schema = BespokeOptimizationSchema( id=f"bespoke_task_{index}", smiles=molecule.to_smiles(mapped=True), initial_force_field=force_field_editor.force_field.to_string(), initial_force_field_hash=ff_hash, stages=[ OptimizationStageSchema( optimizer=self.optimizer, parameters=[], parameter_hyperparameters=self.parameter_hyperparameters, targets=targets, ) ], fragmentation_engine=self.fragmentation_engine, target_torsion_smirks=target_torsion_smirks, smirk_settings=self.smirk_settings, ) return schema