Source code for openff.evaluator.datasets.curation.workflow
import logging
from typing import List, Union, overload
import numpy
import pandas
from pydantic import BaseModel, Field
from openff.evaluator.datasets import PhysicalPropertyDataSet
from openff.evaluator.datasets.curation.components import CurationComponent
from openff.evaluator.datasets.curation.components.conversion import (
ConversionComponentSchema,
)
from openff.evaluator.datasets.curation.components.filtering import (
FilterComponentSchema,
)
from openff.evaluator.datasets.curation.components.freesolv import (
FreeSolvComponentSchema,
)
from openff.evaluator.datasets.curation.components.selection import (
SelectionComponentSchema,
)
from openff.evaluator.datasets.curation.components.thermoml import (
ThermoMLComponentSchema,
)
logger = logging.getLogger(__name__)
[docs]class CurationWorkflowSchema(BaseModel):
"""A schemas which encodes how a set of curation components should be applied
sequentially to a data set."""
component_schemas: List[
Union[
ConversionComponentSchema,
FilterComponentSchema,
FreeSolvComponentSchema,
SelectionComponentSchema,
ThermoMLComponentSchema,
]
] = Field(
default_factory=list,
description="The schemas of the components to apply as part of this workflow. "
"The components will be applied in the order they appear in this list.",
)
[docs]class CurationWorkflow:
"""A convenience class for applying a set of curation components
sequentially to a data set."""
@classmethod
@overload
def apply(
cls,
data_set: PhysicalPropertyDataSet,
schema: CurationWorkflowSchema,
n_processes: int = 1,
) -> PhysicalPropertyDataSet:
...
@classmethod
@overload
def apply(
cls,
data_set: pandas.DataFrame,
schema: CurationWorkflowSchema,
n_processes: int = 1,
) -> pandas.DataFrame:
...
[docs] @classmethod
def apply(cls, data_set, schema, n_processes=1):
"""Apply each component of this curation workflow to an initial data set in
sequence.
Parameters
----------
data_set
The data set to apply the workflow to. This may either be a
data set object or it's pandas representation.
schema
The schema which defines the components to apply.
n_processes
The number of processes that each component is allowed to
parallelize across.
Returns
-------
The data set which has had the curation workflow applied to it.
"""
component_classes = CurationComponent.components
data_frame = data_set
if isinstance(data_frame, PhysicalPropertyDataSet):
data_frame = data_frame.to_pandas()
data_frame = data_frame.copy()
data_frame = data_frame.fillna(value=numpy.nan)
for component_schema in schema.component_schemas:
component_class_name = component_schema.__class__.__name__.replace(
"Schema", ""
)
component_class = component_classes[component_class_name]
logger.info(f"Applying {component_class_name}")
data_frame = component_class.apply(
data_frame, component_schema, n_processes
)
logger.info(f"{component_class_name} applied")
data_frame = data_frame.fillna(value=numpy.nan)
if isinstance(data_set, PhysicalPropertyDataSet):
data_frame = PhysicalPropertyDataSet.from_pandas(data_frame)
return data_frame