# Source code for openff.evaluator.properties.dielectric

"""
A collection of dielectric physical property definitions.
"""
import copy

import numpy as np
import pint
from simtk import openmm
from simtk.openmm import XmlSerializer

from openff.evaluator import unit
from openff.evaluator.attributes import UNDEFINED
from openff.evaluator.datasets import PhysicalProperty, PropertyPhase
from openff.evaluator.datasets.thermoml import thermoml_property
from openff.evaluator.layers import register_calculation_schema
from openff.evaluator.layers.reweighting import ReweightingLayer, ReweightingSchema
from openff.evaluator.layers.simulation import SimulationLayer, SimulationSchema
from openff.evaluator.protocols import analysis, reweighting
from openff.evaluator.protocols.utils import (
    generate_base_reweighting_protocols,
    generate_base_simulation_protocols,
    generate_gradient_protocol_group,
)
from openff.evaluator.thermodynamics import ThermodynamicState
from openff.evaluator.utils import timeseries
from openff.evaluator.utils.statistics import bootstrap
from openff.evaluator.workflow import WorkflowSchema, workflow_protocol
from openff.evaluator.workflow.attributes import InputAttribute, OutputAttribute
from openff.evaluator.workflow.utils import ProtocolPath


@workflow_protocol()
class ExtractAverageDielectric(analysis.AverageTrajectoryProperty):
    """Extracts the average dielectric constant from a simulation trajectory.

    The protocol reads the partial charges from a serialized OpenMM system,
    computes the total dipole moment and the box volume of every frame of the
    trajectory, decorrelates the dipole time series, and then bootstraps the
    fluctuation formula implemented in `_bootstrap_function` to obtain the
    dielectric constant and its uncertainty.
    """

    system_path = InputAttribute(
        docstring="The path to the XML system object which defines the forces present in the system.",
        type_hint=str,
        default_value=UNDEFINED,
    )
    thermodynamic_state = InputAttribute(
        docstring="The thermodynamic state at which the trajectory was generated.",
        type_hint=ThermodynamicState,
        default_value=UNDEFINED,
    )

    dipole_moments = OutputAttribute(
        docstring="The raw (possibly correlated) dipole moments which were used in "
        "the dielectric calculation.",
        type_hint=pint.Quantity,
    )
    volumes = OutputAttribute(
        docstring="The raw (possibly correlated) volumes which were used in the "
        "dielectric calculation.",
        type_hint=pint.Quantity,
    )

    uncorrelated_volumes = OutputAttribute(
        docstring="The uncorrelated volumes which were used in the dielectric "
        "calculation.",
        type_hint=pint.Quantity,
    )

    def _bootstrap_function(self, **sample_kwargs):
        """Calculates the static dielectric constant from an
        array of dipoles and volumes.

        Notes
        -----
        The static dielectric constant is taken from Equation 7 of [1]

        References
        ----------
        [1] A. Glattli, X. Daura and W. F. van Gunsteren. Derivation of an improved simple point charge
            model for liquid water: SPC/A and SPC/L. J. Chem. Phys. 116(22):9811-9828, 2002

        Parameters
        ----------
        sample_kwargs: dict of str and np.ndarray
            A key words dictionary of the bootstrap sample data. It must contain
            a ``"dipoles"`` entry (one dipole vector per frame, treated here as
            being in units of elementary charge * nm) and a ``"volumes"`` entry
            (one volume per frame, treated here as being in nm^3).

        Returns
        -------
        The static dielectric constant. The value is dimensionless, but as it
        is built from unit arithmetic it is presumably returned as a unit
        wrapped quantity rather than a bare float.
        """

        dipole_moments = sample_kwargs["dipoles"]
        volumes = sample_kwargs["volumes"]

        temperature = self.thermodynamic_state.temperature

        # Fluctuation of the total dipole about its mean, <|M - <M>|^2>.
        dipole_mu = dipole_moments.mean(0)
        shifted_dipoles = dipole_moments - dipole_mu

        dipole_variance = (shifted_dipoles * shifted_dipoles).sum(-1).mean(0) * (
            unit.elementary_charge * unit.nanometers
        ) ** 2

        volume = volumes.mean() * unit.nanometer ** 3

        e0 = 8.854187817e-12 * unit.farad / unit.meter  # Taken from QCElemental

        dielectric_constant = 1.0 + dipole_variance / (
            3 * unit.boltzmann_constant * temperature * volume * e0
        )

        return dielectric_constant

    def _extract_charges(self):
        """Extracts all of the charges from a system object.

        The system is deserialized from the XML file at `system_path`, and the
        charge of every particle of every `NonbondedForce` found in it is
        collected in the order in which the forces and particles are stored.

        Returns
        -------
        list of float
            The particle charges in units of elementary charge.
        """
        from simtk import unit as simtk_unit

        # Read the input through its declared public attribute, as is done for
        # every other input of this protocol, rather than through the private
        # backing field.
        with open(self.system_path, "r") as file:
            system = XmlSerializer.deserialize(file.read())

        charge_list = []

        for force_index in range(system.getNumForces()):

            force = system.getForce(force_index)

            if not isinstance(force, openmm.NonbondedForce):
                continue

            # The first particle parameter of a nonbonded force is the charge.
            charge_list.extend(
                force.getParticleParameters(atom_index)[0].value_in_unit(
                    simtk_unit.elementary_charge
                )
                for atom_index in range(force.getNumParticles())
            )

        return charge_list

    def _extract_dipoles_and_volumes(self):
        """Extract the systems dipole moments and volumes.

        The trajectory is streamed in chunks of 50 frames so that it never
        needs to be held in memory in full.

        Returns
        -------
        numpy.ndarray
            The dipole moments of the trajectory, with one entry per frame.
        numpy.ndarray
            The unit cell volumes of the trajectory, with one entry per frame.
        """
        import mdtraj

        dipole_moments = []
        volumes = []
        charge_list = self._extract_charges()

        for chunk in mdtraj.iterload(
            self.trajectory_path, top=self.input_coordinate_file, chunk=50
        ):

            dipole_moments.extend(mdtraj.geometry.dipole_moments(chunk, charge_list))
            volumes.extend(chunk.unitcell_volumes)

        dipole_moments = np.array(dipole_moments)
        volumes = np.array(volumes)

        return dipole_moments, volumes

    def _execute(self, directory, available_resources):
        """Computes the dielectric constant and stores it, together with the
        raw and decorrelated dipoles and volumes, on the protocol outputs.

        Parameters
        ----------
        directory
            Forwarded unchanged to the parent class implementation.
        available_resources
            Forwarded unchanged to the parent class implementation.
        """

        super()._execute(directory, available_resources)

        # Extract the dipoles
        dipole_moments, volumes = self._extract_dipoles_and_volumes()
        self.dipole_moments = dipole_moments * unit.dimensionless

        (
            dipole_moments,
            self.equilibration_index,
            self.statistical_inefficiency,
        ) = timeseries.decorrelate_time_series(dipole_moments)

        # Sub-sample the volumes using the equilibration index and statistical
        # inefficiency which were determined from the dipole time series, so
        # that the two sets of samples are drawn from the same frames.
        uncorrelated_length = len(volumes) - self.equilibration_index

        sample_indices = timeseries.get_uncorrelated_indices(
            uncorrelated_length, self.statistical_inefficiency
        )
        # The indices are relative to the equilibrated region, so shift them
        # back to absolute frame indices.
        sample_indices = [index + self.equilibration_index for index in sample_indices]

        self.volumes = volumes * unit.nanometer ** 3
        uncorrelated_volumes = volumes[sample_indices]

        self.uncorrelated_values = dipole_moments * unit.dimensionless
        self.uncorrelated_volumes = uncorrelated_volumes * unit.nanometer ** 3

        value, uncertainty = bootstrap(
            self._bootstrap_function,
            self.bootstrap_iterations,
            self.bootstrap_sample_size,
            dipoles=dipole_moments,
            volumes=uncorrelated_volumes,
        )

        self.value = (value * unit.dimensionless).plus_minus(
            uncertainty * unit.dimensionless
        )


@workflow_protocol()
class ReweightDielectricConstant(reweighting.BaseMBARProtocol):
    """Reweights a set of dipole moments (`reference_dipole_moments`) and volumes
    (`reference_volumes`) using MBAR, and then combines these to yield the reweighted
    dielectric constant. Uncertainties in the dielectric constant are determined
    by bootstrapping.
    """

    reference_dipole_moments = InputAttribute(
        docstring="A Quantity wrapped np.ndarray of the dipole moments of each "
        "of the reference states.",
        type_hint=list,
        default_value=UNDEFINED,
    )
    reference_volumes = InputAttribute(
        docstring="A Quantity wrapped np.ndarray of the volumes of each of the "
        "reference states.",
        type_hint=list,
        default_value=UNDEFINED,
    )

    thermodynamic_state = InputAttribute(
        docstring="The thermodynamic state at which the trajectory was generated.",
        type_hint=ThermodynamicState,
        default_value=UNDEFINED,
    )

    def __init__(self, protocol_id):
        """Creates the protocol with bootstrapped uncertainties switched on,
        as `_execute` refuses to run without them.

        Parameters
        ----------
        protocol_id
            Forwarded unchanged to the parent class constructor.
        """
        super().__init__(protocol_id)
        self.bootstrap_uncertainties = True

    def _bootstrap_function(
        self,
        reference_reduced_potentials,
        target_reduced_potentials,
        **reference_observables,
    ):
        """Reweights one bootstrap sample and evaluates the dielectric constant
        from the reweighted averages.

        Parameters
        ----------
        reference_reduced_potentials
            The reduced potentials of the sample evaluated at the reference
            states. Transposed before being reweighted.
        target_reduced_potentials
            The reduced potentials of the sample evaluated at the target
            state. Transposed before being reweighted.
        reference_observables
            The observables to reweight. Exactly three are expected:
            ``dipoles`` (the dipole vectors), ``dipoles_sqr`` (the squared
            magnitude of each dipole vector) and ``volumes``. Each is
            transposed before being reweighted.

        Returns
        -------
        The reweighted static dielectric constant (dimensionless).
        """

        # Internal invariant: `_execute` always passes dipoles, dipoles_sqr
        # and volumes.
        assert len(reference_observables) == 3

        transposed_observables = {
            key: np.transpose(observable)
            for key, observable in reference_observables.items()
        }

        values, _, _ = self._reweight_observables(
            np.transpose(reference_reduced_potentials),
            np.transpose(target_reduced_potentials),
            **transposed_observables,
        )

        # <M.M>, the reweighted average of the squared dipole magnitude.
        average_squared_dipole = values["dipoles_sqr"]
        # <M>.<M>, the squared magnitude of the reweighted average dipole. The
        # norm must be squared so that both terms of the fluctuation carry
        # units of dipole^2.
        average_dipole_squared = np.linalg.norm(values["dipoles"]) ** 2

        dipole_variance = (average_squared_dipole - average_dipole_squared) * (
            unit.elementary_charge * unit.nanometers
        ) ** 2

        volume = values["volumes"] * unit.nanometer ** 3

        e0 = 8.854187817e-12 * unit.farad / unit.meter  # Taken from QCElemental

        dielectric_constant = 1.0 + dipole_variance / (
            3
            * unit.boltzmann_constant
            * self.thermodynamic_state.temperature
            * volume
            * e0
        )

        return dielectric_constant

    def _execute(self, directory, available_resources):
        """Validates the reference data and reweights the dielectric constant
        with bootstrapped uncertainties.

        Parameters
        ----------
        directory
            Not used directly by this method.
        available_resources
            Not used directly by this method.

        Raises
        ------
        ValueError
            If no dipole moments or volumes were provided, if the first entry
            of either list is not a `pint.Quantity`, if the number (or the
            per-state length) of the dipoles and volumes do not match, or if
            `bootstrap_uncertainties` has been switched off.
        """

        if len(self.reference_dipole_moments) == 0:
            raise ValueError("There were no dipole moments to reweight.")

        if len(self.reference_volumes) == 0:
            raise ValueError("There were no volumes to reweight.")

        if not isinstance(
            self.reference_dipole_moments[0], pint.Quantity
        ) or not isinstance(self.reference_volumes[0], pint.Quantity):

            raise ValueError(
                "The reference observables should be a list of "
                "pint.Quantity wrapped ndarray's.",
            )

        if len(self.reference_dipole_moments) != len(self.reference_volumes):

            raise ValueError(
                "The number of reference dipoles does not match the "
                "number of reference volumes.",
            )

        # Each reference state must provide one volume per dipole.
        for reference_dipoles, reference_volumes in zip(
            self.reference_dipole_moments, self.reference_volumes
        ):

            if len(reference_dipoles) != len(reference_volumes):

                raise ValueError(
                    "The number of reference dipoles does not match the "
                    "number of reference volumes.",
                )

        self._reference_observables = self.reference_dipole_moments

        dipole_moments = self._prepare_observables_array(self.reference_dipole_moments)
        # The squared magnitude of each dipole, stored as a single row.
        dipole_moments_sqr = np.array(
            [[np.dot(dipole, dipole) for dipole in np.transpose(dipole_moments)]]
        )

        volumes = self._prepare_observables_array(self.reference_volumes)

        if not self.bootstrap_uncertainties:

            raise ValueError(
                "Dielectric constant can only be reweighted in conjunction "
                "with bootstrapped uncertainties.",
            )

        self._execute_with_bootstrapping(
            unit.dimensionless,
            dipoles=dipole_moments,
            dipoles_sqr=dipole_moments_sqr,
            volumes=volumes,
        )


@thermoml_property(
    "Relative permittivity at zero frequency",
    supported_phases=PropertyPhase.Liquid,
)
class DielectricConstant(PhysicalProperty):
    """A class representation of a dielectric property"""

    @classmethod
    def default_unit(cls):
        """Returns the unit (dimensionless) in which this property is expressed."""
        return unit.dimensionless

    @staticmethod
    def default_simulation_schema(
        absolute_tolerance=UNDEFINED, relative_tolerance=UNDEFINED, n_molecules=1000
    ):
        """Builds the default schema for estimating a dielectric constant
        directly from molecular simulation.

        Parameters
        ----------
        absolute_tolerance: pint.Quantity, optional
            The absolute tolerance to estimate the property to within.
        relative_tolerance: float
            The tolerance (as a fraction of the properties reported
            uncertainty) to estimate the property to within.
        n_molecules: int
            The number of molecules to use in the simulation.

        Returns
        -------
        SimulationSchema
            The schema to follow when estimating this property.
        """
        # At most one of the two tolerances may be specified.
        assert absolute_tolerance == UNDEFINED or relative_tolerance == UNDEFINED

        simulation_schema = SimulationSchema()
        simulation_schema.absolute_tolerance = absolute_tolerance
        simulation_schema.relative_tolerance = relative_tolerance

        # The analysis protocol which turns a finished simulation into an
        # average dielectric constant.
        global_state_path = ProtocolPath("thermodynamic_state", "global")

        extract_dielectric = ExtractAverageDielectric("extract_dielectric")
        extract_dielectric.thermodynamic_state = global_state_path

        # The protocols which run the simulation itself. A target uncertainty
        # is only used when one of the tolerances has been provided.
        use_target_uncertainty = (
            absolute_tolerance != UNDEFINED or relative_tolerance != UNDEFINED
        )

        base_protocols = generate_base_simulation_protocols(
            extract_dielectric,
            use_target_uncertainty,
            n_molecules=n_molecules,
        )
        protocols, value_source, output_to_store = base_protocols

        # Hook the analysis protocol up to the parameterized system.
        extract_dielectric.system_path = ProtocolPath(
            "system_path", protocols.assign_parameters.id
        )

        # Dielectric constants typically take longer to converge, so allow
        # more convergence iterations.
        protocols.converge_uncertainty.max_iterations = 400

        # Gradients of the dielectric constant require the specialised
        # reweighting protocol, which is configured here.
        converge_id = protocols.converge_uncertainty.id
        production_id = protocols.production_simulation.id

        coordinate_source = ProtocolPath(
            "output_coordinate_file", protocols.equilibration_simulation.id
        )
        trajectory_source = ProtocolPath(
            "trajectory_file_path", converge_id, production_id
        )
        statistics_source = ProtocolPath(
            "statistics_file_path", converge_id, production_id
        )

        gradient_mbar_protocol = ReweightDielectricConstant("gradient_mbar")

        dipole_source = ProtocolPath(
            "dipole_moments", converge_id, extract_dielectric.id
        )
        gradient_mbar_protocol.reference_dipole_moments = [dipole_source]

        volume_source = ProtocolPath("volumes", converge_id, extract_dielectric.id)
        gradient_mbar_protocol.reference_volumes = [volume_source]

        gradient_mbar_protocol.thermodynamic_state = ProtocolPath(
            "thermodynamic_state", "global"
        )
        gradient_mbar_protocol.reference_reduced_potentials = statistics_source

        gradient_protocols = generate_gradient_protocol_group(
            gradient_mbar_protocol,
            ProtocolPath("force_field_path", "global"),
            coordinate_source,
            trajectory_source,
            statistics_source,
        )
        gradient_group, gradient_replicator, gradient_source = gradient_protocols

        # Assemble the workflow from the individual protocol schemas.
        workflow = WorkflowSchema()

        workflow.protocol_schemas = [
            protocols.build_coordinates.schema,
            protocols.assign_parameters.schema,
            protocols.energy_minimisation.schema,
            protocols.equilibration_simulation.schema,
            protocols.converge_uncertainty.schema,
            protocols.extract_uncorrelated_trajectory.schema,
            protocols.extract_uncorrelated_statistics.schema,
            gradient_group.schema,
        ]

        workflow.protocol_replicators = [gradient_replicator]
        workflow.outputs_to_store = {"full_system": output_to_store}
        workflow.gradients_sources = [gradient_source]
        workflow.final_value_source = value_source

        simulation_schema.workflow_schema = workflow
        return simulation_schema

    @staticmethod
    def default_reweighting_schema(
        absolute_tolerance=UNDEFINED,
        relative_tolerance=UNDEFINED,
        n_effective_samples=50,
    ):
        """Builds the default schema for estimating a dielectric constant by
        reweighting existing simulation data.

        Parameters
        ----------
        absolute_tolerance: pint.Quantity, optional
            The absolute tolerance to estimate the property to within.
        relative_tolerance: float
            The tolerance (as a fraction of the properties reported
            uncertainty) to estimate the property to within.
        n_effective_samples: int
            The minimum number of effective samples to require when
            reweighting the cached simulation data.

        Returns
        -------
        ReweightingSchema
            The schema to follow when estimating this property.
        """
        # At most one of the two tolerances may be specified.
        assert absolute_tolerance == UNDEFINED or relative_tolerance == UNDEFINED

        reweighting_schema = ReweightingSchema()
        reweighting_schema.absolute_tolerance = absolute_tolerance
        reweighting_schema.relative_tolerance = relative_tolerance

        data_replicator_id = "data_replicator"

        # One extraction protocol per piece of stored data; the replicator
        # placeholder in the id is what distinguishes the copies.
        extract_dielectric = ExtractAverageDielectric(
            f"calc_dielectric_$({data_replicator_id})"
        )

        # The dielectric constant is a fluctuation property, so the
        # specialised reweighting protocol is used in place of the default.
        reweight_dielectric = ReweightDielectricConstant("reweight_dielectric")
        reweight_dielectric.reference_dipole_moments = ProtocolPath(
            "uncorrelated_values", extract_dielectric.id
        )
        reweight_dielectric.reference_volumes = ProtocolPath(
            "uncorrelated_volumes", extract_dielectric.id
        )
        reweight_dielectric.thermodynamic_state = ProtocolPath(
            "thermodynamic_state", "global"
        )
        reweight_dielectric.bootstrap_uncertainties = True
        reweight_dielectric.bootstrap_iterations = 200
        reweight_dielectric.required_effective_samples = n_effective_samples

        base_protocols = generate_base_reweighting_protocols(
            extract_dielectric, reweight_dielectric, data_replicator_id
        )
        protocols, data_replicator = base_protocols

        # Take the extraction inputs from the matching protocol outputs.
        extract_dielectric.system_path = ProtocolPath(
            "system_path", protocols.build_reference_system.id
        )
        extract_dielectric.thermodynamic_state = ProtocolPath(
            "thermodynamic_state", protocols.unpack_stored_data.id
        )

        # Inputs of the gradient calculations.
        concatenate_id = protocols.concatenate_trajectories.id

        coordinate_path = ProtocolPath("output_coordinate_path", concatenate_id)
        trajectory_path = ProtocolPath("output_trajectory_path", concatenate_id)
        statistics_path = ProtocolPath(
            "statistics_file_path", protocols.reduced_target_potential.id
        )

        # The gradient group is built from a copy of the reweighting protocol
        # so that the original protocol object is not the one handed over.
        reweight_dielectric_template = copy.deepcopy(reweight_dielectric)

        gradient_protocols = generate_gradient_protocol_group(
            reweight_dielectric_template,
            ProtocolPath("force_field_path", "global"),
            coordinate_path,
            trajectory_path,
            statistics_path,
            replicator_id="grad",
            effective_sample_indices=ProtocolPath(
                "effective_sample_indices", reweight_dielectric.id
            ),
        )
        gradient_group, gradient_replicator, gradient_source = gradient_protocols

        workflow = WorkflowSchema()
        workflow.protocol_schemas = [protocol.schema for protocol in protocols] + [
            gradient_group.schema
        ]
        workflow.protocol_replicators = [data_replicator, gradient_replicator]
        workflow.gradients_sources = [gradient_source]
        workflow.final_value_source = ProtocolPath("value", protocols.mbar_protocol.id)

        reweighting_schema.workflow_schema = workflow
        return reweighting_schema
# Register the properties via the plugin system.
register_calculation_schema(
    DielectricConstant,
    SimulationLayer,
    DielectricConstant.default_simulation_schema,
)
register_calculation_schema(
    DielectricConstant,
    ReweightingLayer,
    DielectricConstant.default_reweighting_schema,
)