Source code for openff.evaluator.datasets.curation.components.selection

import functools
import itertools
from enum import Enum
from typing import TYPE_CHECKING, List, Set, Tuple, Union

import numpy
import pandas
from pydantic import BaseModel, Field, conlist, validator
from typing_extensions import Literal

from openff.evaluator.datasets.curation.components import (
    CurationComponent,
    CurationComponentSchema,
)
from openff.evaluator.datasets.curation.components.filtering import (
    FilterByEnvironments,
    FilterByEnvironmentsSchema,
    FilterBySubstances,
    FilterBySubstancesSchema,
)
from openff.evaluator.datasets.utilities import (
    data_frame_to_substances,
    reorder_data_frame,
)
from openff.evaluator.utils.checkmol import ChemicalEnvironment
from openff.evaluator.utils.exceptions import MissingOptionalDependency

PropertyType = Tuple[str, int]

if TYPE_CHECKING:

    PositiveInt = int

    try:
        from openeye.oegraphsim import OEFingerPrint
    except ImportError:
        OEFingerPrint = None

else:
    from pydantic import PositiveInt


[docs]class State(BaseModel): temperature: float = Field(..., description="The temperature (K) of interest.") pressure: float = Field(..., description="The pressure (kPa) of interest.") mole_fractions: Tuple[float, ...] = Field( ..., description="The composition of interest." )
[docs]class TargetState(BaseModel): property_types: List[PropertyType] = Field( ..., description="The properties to select at the specified states." ) states: List[State] = Field( ..., description="The states at which data points should be selected." ) @classmethod @validator("property_types") def property_types_validator(cls, value): assert len(value) > 0 n_components = value[0][1] assert all(x[1] == n_components for x in value) return value
[docs]class FingerPrintType(Enum): Tree = "Tree" MACCS166 = "MACCS166"
[docs]class SelectSubstancesSchema(CurationComponentSchema): type: Literal["SelectSubstances"] = "SelectSubstances" target_environments: conlist(ChemicalEnvironment, min_items=1) = Field( ..., description="The chemical environments which selected substances should " "contain.", ) n_per_environment: PositiveInt = Field( ..., description="The number of substances to ideally select for each chemical " "environment of interest. In the case of pure substances, this will be the " "number of substances to choose per environment specified in " "`target_environments`. For binary mixtures, this will be the number of " "substances to select for each pair of environments constructed from the " "`target_environments` list and so on.", ) substances_to_exclude: List[Tuple[str, ...]] = Field( default_factory=list, description="The substances to 1) filter from the available data before " "selecting substances and 2) to penalize similarity to. This field is " "mainly expected to be used when creating a test set which is distinct from " "a training set.", ) finger_print_type: FingerPrintType = Field( FingerPrintType.Tree, description="The type of finger print to use in the distance metrics.", ) per_property: bool = Field( ..., description="Whether the selection algorithm should be run once per " "property (e.g. select substances for pure densities, and then select " "substances for pure enthalpies of vaporization), or whether to run it " "once for the whole data set without consideration for if the selected " "substances have data points available for each property type in the set. " "This option should usually be set to false when the data set to select " "from strictly contains data points for each type of property for the same" "set of systems, and true otherwise.", )
[docs]class SelectSubstances(CurationComponent): """A component for selecting a specified number data points which were measured for systems containing a specified set of chemical functionalities. """ @classmethod def _check_oe_available(cls): """Check if the `oechem` and `oegraphsim` modules are available for import. Raises ------- MissingOptionalDependency """ try: from openeye import oechem, oegraphsim except ImportError as e: raise MissingOptionalDependency(e.path, False) unlicensed_library = ( "openeye.oechem" if not oechem.OEChemIsLicensed() else "openeye.oegraphsim" if not oegraphsim.OEGraphSimIsLicensed() else None ) if unlicensed_library is not None: raise MissingOptionalDependency(unlicensed_library, True) @classmethod @functools.lru_cache(3000) def _compute_molecule_finger_print( cls, smiles: str, finger_print_type: FingerPrintType ) -> "OEFingerPrint": """Computes the finger print for a given molecule using the OpenEye toolkit. Parameters ---------- smiles The smiles pattern to generate a finger print for. finger_print_type The type of finger print to generate. Returns ------- The generated finger print. """ from openeye.oegraphsim import ( OEFingerPrint, OEFPType_MACCS166, OEFPType_Tree, OEMakeFP, ) from openff.toolkit.topology import Molecule oe_molecule = Molecule.from_smiles(smiles).to_openeye() if finger_print_type == FingerPrintType.Tree: oe_finger_print_type = OEFPType_Tree elif finger_print_type == FingerPrintType.MACCS166: oe_finger_print_type = OEFPType_MACCS166 else: raise NotImplementedError() finger_print = OEFingerPrint() OEMakeFP(finger_print, oe_molecule, oe_finger_print_type) return finger_print @classmethod def _compute_mixture_finger_print( cls, mixture: Tuple[str, ...], finger_print_type: FingerPrintType ) -> Tuple["OEFingerPrint", ...]: """Computes the finger print of a mixture of molecules as defined by their smiles patterns. Parameters ---------- mixture The smiles patterns of the molecules in the mixture. finger_print_type The type of finger print to generate. Returns ------- tuple of OEFingerPrint The finger print of each molecule in the mixture. """ mixture_finger_print = tuple( cls._compute_molecule_finger_print(x, finger_print_type) for x in mixture ) return mixture_finger_print @classmethod @functools.lru_cache(3000) def _compute_distance( cls, mixture_a: Tuple[str, ...], mixture_b: Tuple[str, ...], finger_print_type: FingerPrintType, ): """Computes the 'distance' between two mixtures based on their finger prints. The distance is defined as the minimum of - the OETanimoto distance between component a of mixture a and component a of mixture b + the OETanimoto distance between component b of mixture a and component b of mixture b and - the OETanimoto distance between component b of mixture a and component a of mixture b + the OETanimoto distance between component a of mixture a and component b of mixture b Parameters ---------- mixture_a The smiles patterns of the molecules in mixture a. mixture_b The smiles patterns of the molecules in mixture b. finger_print_type The type of finger print to base the distance metric on. Returns ------- float The distance between the mixtures """ from openeye.oegraphsim import OETanimoto if sorted(mixture_a) == sorted(mixture_b): return 0.0 finger_print_a = cls._compute_mixture_finger_print(mixture_a, finger_print_type) finger_print_b = cls._compute_mixture_finger_print(mixture_b, finger_print_type) if len(mixture_a) == 1 and len(mixture_b) == 1: distance = 1.0 - OETanimoto(finger_print_a[0], finger_print_b[0]) elif len(mixture_a) == 2 and len(mixture_b) == 2: distance = min( (1.0 - OETanimoto(finger_print_a[0], finger_print_b[0])) + (1.0 - OETanimoto(finger_print_a[1], finger_print_b[1])), (1.0 - OETanimoto(finger_print_a[1], finger_print_b[0])) + (1.0 - OETanimoto(finger_print_a[0], finger_print_b[1])), ) else: raise NotImplementedError() return distance @classmethod def _compute_distance_with_set( cls, mixture, mixtures: List[Tuple[str, ...]], finger_print_type: FingerPrintType, ) -> float: """Computes the distances between a given mixture and a set of other mixtures. This is computed as the sum of `_compute_distance(mixture, mixtures[i])` for all i in `mixtures`. Parameters ---------- mixture The mixture to compute the distances from. mixtures The set of mixtures to compare with `mixture`. finger_print_type: OEFPTypeBase The type of finger print to base the distance metric on. Returns ------- The calculated distance. """ distance = sum( cls._compute_distance(mixture, x, finger_print_type) for x in mixtures ) return distance @classmethod def _select_substances( cls, data_frame: pandas.DataFrame, n_substances: int, previously_chosen: List[Tuple[str, ...]], finger_print_type: FingerPrintType, ) -> List[Tuple[str, ...]]: # Store the substances which can be selected, and those which # have already been selected. open_list = [*data_frame_to_substances(data_frame)] closed_list = [] # Determine the maximum number of substances which can be selected. max_n_possible = min(len(open_list), n_substances) while len(open_list) > 0 and len(closed_list) < max_n_possible: def distance_metric(mixture): return cls._compute_distance_with_set( mixture, [*previously_chosen, *closed_list], finger_print_type ) least_similar = sorted(open_list, key=distance_metric, reverse=True)[0] open_list.remove(least_similar) closed_list.append(least_similar) return closed_list @classmethod def _apply_to_data_frame(cls, data_frame, schema, n_processes): """Applies the selection algorithm to a specified data frame. Parameters ---------- data_frame The data frame to apply the algorithm to. schema This component schema. n_processes The number of processes available to the component. """ selected_substances = [] min_n_components = data_frame["N Components"].min() max_n_components = data_frame["N Components"].max() # Perform the selection one for each size of substance (e.g. once # for pure, once for binary etc.) for n_components in range(min_n_components, max_n_components + 1): component_data = data_frame[data_frame["N Components"] == n_components] if len(component_data) == 0: continue # Define all permutations of the target environments. if n_components == 1: chemical_environments = [(x,) for x in schema.target_environments] elif n_components == 2: chemical_environments = [ *[(x, x) for x in schema.target_environments], *itertools.combinations(schema.target_environments, r=2), ] else: raise NotImplementedError() # Keep a track of the selected substances selected_n_substances: Set[Tuple[str, ...]] = set() for chemical_environment in chemical_environments: # Filter out any environments not currently being considered. environment_filter = FilterByEnvironmentsSchema( per_component_environments={ n_components: [[x] for x in chemical_environment] } ) environment_data = FilterByEnvironments.apply( component_data, environment_filter, n_processes ) if len(environment_data) == 0: continue # Define the substances which the newly selected substance # should be unique to. substances_to_penalize = { *selected_n_substances, *[ x for x in schema.substances_to_exclude if len(x) == n_components ], } environment_selected_substances = cls._select_substances( environment_data, schema.n_per_environment, [*substances_to_penalize], schema.finger_print_type, ) selected_n_substances.update(environment_selected_substances) # Remove the newly selected substances from the pool to select from. component_data = FilterBySubstances.apply( component_data, FilterBySubstancesSchema( substances_to_exclude=environment_selected_substances ), n_processes=n_processes, ) selected_substances.extend(selected_n_substances) # Filter the data frame to retain only the selected substances. data_frame = FilterBySubstances.apply( data_frame, FilterBySubstancesSchema(substances_to_include=selected_substances), n_processes=n_processes, ) return data_frame @classmethod def _apply( cls, data_frame: pandas.DataFrame, schema: SelectSubstancesSchema, n_processes ) -> pandas.DataFrame: # Make sure OpenEye is available for computing the finger prints. cls._check_oe_available() # Filter out any substances which should be excluded data_frame = FilterBySubstances.apply( data_frame, FilterBySubstancesSchema( substances_to_exclude=schema.substances_to_exclude ), n_processes=n_processes, ) max_n_components = data_frame["N Components"].max() if max_n_components > 2: raise NotImplementedError() if schema.per_property: # Partition the data frame into ones which only contain a # single property type. property_headers = [ header for header in data_frame if header.find(" Value ") >= 0 ] data_frames_to_filter = [ data_frame[data_frame[header].notna()] for header in property_headers ] else: data_frames_to_filter = [data_frame] filtered_data_frames = [ cls._apply_to_data_frame(filtered_data_frame, schema, n_processes) for filtered_data_frame in data_frames_to_filter ] if len(filtered_data_frames) == 1: filtered_data_frame = filtered_data_frames[0] else: filtered_data_frame = pandas.concat( filtered_data_frames, ignore_index=True, sort=False ) return filtered_data_frame
[docs]class SelectDataPointsSchema(CurationComponentSchema): type: Literal["SelectDataPoints"] = "SelectDataPoints" target_states: List[TargetState] = Field( ..., description="A list of the target states for which we would ideally include " "data points for (e.g. density data points measured at ambient conditions, or " "for density AND enthalpy of mixing measurements made for systems with a " "roughly 50:50 composition).", )
[docs]class SelectDataPoints(CurationComponent): """A component for selecting a set of data points which are measured as close as possible to a particular set of states. The points will be chosen so as to try and maximise the number of properties measured at the same condition (e.g. ideally we would have a data point for each property at T=298.15 and p=1atm) as this will maximise the chances that we can extract all properties from a single simulation. """ @classmethod def _property_header(cls, data_frame, property_type): for column in data_frame: if column.find(f"{property_type} Value") < 0: continue return column return None @classmethod def _distances_to_state(cls, data_frame: pandas.DataFrame, state_point: State): distance_sqr = ( data_frame["Temperature (K)"] - state_point.temperature ) ** 2 + ( data_frame["Pressure (kPa)"] / 10.0 - state_point.pressure / 10.0 ) ** 2 for component_index in range(len(state_point.mole_fractions)): distance_sqr += ( data_frame[f"Mole Fraction {component_index + 1}"] - state_point.mole_fractions[component_index] ) ** 2 return distance_sqr @classmethod def _distances_to_cluster( cls, data_frame: pandas.DataFrame, target_state: TargetState ): distances_sqr = pandas.DataFrame() for index, state_point in enumerate(target_state.states): distances_sqr[index] = cls._distances_to_state(data_frame, state_point) return distances_sqr @classmethod def _select_substance_data_points(cls, original_data_frame, target_state): n_components = target_state.property_types[0][1] data_frame = original_data_frame[ original_data_frame["N Components"] == n_components ].copy() data_frame["Property Type"] = "" property_types = [x[0] for x in target_state.property_types] for property_type in property_types: property_header = cls._property_header(data_frame, property_type) if not property_header: continue data_frame.loc[ data_frame[property_header].notna(), "Property Type" ] = property_type data_frame["Temperature (K)"] = data_frame["Temperature (K)"].round(2) data_frame["Pressure (kPa)"] = data_frame["Pressure (kPa)"].round(1) for index in range(n_components): data_frame[f"Mole Fraction {index + 1}"] = data_frame[ f"Mole Fraction {index + 1}" ].round(3) # Compute the distance to each cluster distances = cls._distances_to_cluster(data_frame, target_state) data_frame["Cluster"] = distances.idxmin(axis=1) cluster_headers = [ "Temperature (K)", "Pressure (kPa)", *[f"Mole Fraction {index + 1}" for index in range(n_components)], ] # Compute how may data points are present for each state in the different # clusters. grouped_data = data_frame.groupby( by=[*cluster_headers, "Cluster"], as_index=False, ).agg({"Property Type": pandas.Series.nunique}) selected_data = [False] * len(data_frame) for cluster_index in range(len(target_state.states)): # Calculate the distance between each clustered state and # the center of the cluster (i.e the clustered state). cluster_data = grouped_data[grouped_data["Cluster"] == cluster_index] cluster_data["Distance"] = cls._distances_to_state( cluster_data, target_state.states[cluster_index] ) if len(cluster_data) == 0: continue open_list = [x[0] for x in target_state.property_types] while len(open_list) > 0 and len(cluster_data) > 0: # Find the clustered state which is closest to the center of # the cluster. Points measured at this state will becomes the # candidates to be selected. sorted_cluster_data = cluster_data.sort_values( by=["Property Type", "Distance"], ascending=[False, True] ) closest_index = sorted_cluster_data.index[0] # Find the data points which were measured at the clustered state. select_data = data_frame["Property Type"].isin(open_list) for cluster_header in cluster_headers: select_data = select_data & numpy.isclose( data_frame[cluster_header], sorted_cluster_data.loc[closest_index, cluster_header], ) selected_property_types = data_frame[select_data][ "Property Type" ].unique() # Make sure to select a single data point for each type of property. for selected_property_type in selected_property_types: selected_property_data = original_data_frame[ select_data & (data_frame["Property Type"] == selected_property_type) ] if len(selected_property_data) <= 1: continue # Multiple data points were measured for this property type # at the clustered state. We sort these multiple data points # by their distance to the target state and select the closest. # This is not guaranteed to be optimal but should be an ok # approximation in most cases. selected_data_distances = cls._distances_to_state( selected_property_data, target_state.states[cluster_index] ) sorted_data_distances = selected_data_distances.sort_values( ascending=True ) select_data[sorted_data_distances.index] = False select_data[sorted_data_distances.index[0]] = True selected_data = selected_data | select_data for property_type in data_frame[select_data]["Property Type"].unique(): open_list.remove(property_type) cluster_data = cluster_data.drop(closest_index) if len(selected_data) == 0: return pandas.DataFrame() return original_data_frame[selected_data] @classmethod def _apply( cls, data_frame: pandas.DataFrame, schema: SelectDataPointsSchema, n_processes ) -> pandas.DataFrame: max_n_substances = data_frame["N Components"].max() component_headers = [f"Component {i + 1}" for i in range(max_n_substances)] # Re-order the data frame so that the components are alphabetically sorted. # This will make it easier to find unique substances. ordered_data_frame = reorder_data_frame(data_frame) # Find all of the unique substances in the data frame. unique_substances = ordered_data_frame[component_headers].drop_duplicates() selected_data = [] # Start to choose the state points for each unique substance. for _, unique_substance in unique_substances.iterrows(): substance_data_frame = ordered_data_frame for index, component in enumerate(unique_substance[component_headers]): if pandas.isnull(component): substance_data_frame = substance_data_frame[ substance_data_frame[component_headers[index]].isna() ] else: substance_data_frame = substance_data_frame[ substance_data_frame[component_headers[index]] == component ] for target_state in schema.target_states: substance_selected_data = cls._select_substance_data_points( substance_data_frame, target_state ) if len(substance_selected_data) == 0: continue selected_data.append(substance_selected_data) selected_data = pandas.concat(selected_data, ignore_index=True, sort=False) return selected_data
SelectionComponentSchema = Union[SelectSubstancesSchema, SelectDataPointsSchema]