Source code for openff.evaluator.properties.dielectric

"""
A collection of dielectric physical property definitions.
"""
import copy

import numpy as np
import pint
from simtk import openmm
from simtk.openmm import XmlSerializer

from openff.evaluator import unit
from openff.evaluator.attributes import UNDEFINED
from openff.evaluator.datasets import PhysicalProperty, PropertyPhase
from openff.evaluator.datasets.thermoml import thermoml_property
from openff.evaluator.layers import register_calculation_schema
from openff.evaluator.layers.reweighting import ReweightingLayer, ReweightingSchema
from openff.evaluator.layers.simulation import SimulationLayer, SimulationSchema
from openff.evaluator.protocols import analysis, reweighting
from openff.evaluator.protocols.utils import (
    generate_base_reweighting_protocols,
    generate_base_simulation_protocols,
    generate_gradient_protocol_group,
)
from openff.evaluator.thermodynamics import ThermodynamicState
from openff.evaluator.utils import timeseries
from openff.evaluator.utils.statistics import bootstrap
from openff.evaluator.workflow import WorkflowSchema, workflow_protocol
from openff.evaluator.workflow.attributes import InputAttribute, OutputAttribute
from openff.evaluator.workflow.utils import ProtocolPath


[docs]@workflow_protocol() class ExtractAverageDielectric(analysis.AverageTrajectoryProperty): """Extracts the average dielectric constant from a simulation trajectory.""" system_path = InputAttribute( docstring="The path to the XML system object which defines the forces present in the system.", type_hint=str, default_value=UNDEFINED, ) thermodynamic_state = InputAttribute( docstring="The thermodynamic state at which the trajectory was generated.", type_hint=ThermodynamicState, default_value=UNDEFINED, ) dipole_moments = OutputAttribute( docstring="The raw (possibly correlated) dipole moments which were used in " "the dielectric calculation.", type_hint=pint.Quantity, ) volumes = OutputAttribute( docstring="The raw (possibly correlated) which were used in the dielectric calculation.", type_hint=pint.Quantity, ) uncorrelated_volumes = OutputAttribute( docstring="The uncorrelated volumes which were used in the dielectric " "calculation.", type_hint=pint.Quantity, ) def _bootstrap_function(self, **sample_kwargs): """Calculates the static dielectric constant from an array of dipoles and volumes. Notes ----- The static dielectric constant is taken from for Equation 7 of [1] References ---------- [1] A. Glattli, X. Daura and W. F. van Gunsteren. Derivation of an improved simple point charge model for liquid water: SPC/A and SPC/L. J. Chem. Phys. 116(22):9811-9828, 2002 Parameters ---------- sample_kwargs: dict of str and np.ndarray A key words dictionary of the bootstrap sample data, where the sample data is a numpy array of shape=(num_frames, num_dimensions) with dtype=float. The kwargs should include the dipole moment and the system volume Returns ------- float The unitless static dielectric constant """ dipole_moments = sample_kwargs["dipoles"] volumes = sample_kwargs["volumes"] temperature = self.thermodynamic_state.temperature dipole_mu = dipole_moments.mean(0) shifted_dipoles = dipole_moments - dipole_mu dipole_variance = (shifted_dipoles * shifted_dipoles).sum(-1).mean(0) * ( unit.elementary_charge * unit.nanometers ) ** 2 volume = volumes.mean() * unit.nanometer ** 3 e0 = 8.854187817e-12 * unit.farad / unit.meter # Taken from QCElemental dielectric_constant = 1.0 + dipole_variance / ( 3 * unit.boltzmann_constant * temperature * volume * e0 ) return dielectric_constant def _extract_charges(self): """Extracts all of the charges from a system object. Returns ------- list of float """ from simtk import unit as simtk_unit charge_list = [] with open(self._system_path, "r") as file: system = XmlSerializer.deserialize(file.read()) for force_index in range(system.getNumForces()): force = system.getForce(force_index) if not isinstance(force, openmm.NonbondedForce): continue for atom_index in range(force.getNumParticles()): charge = force.getParticleParameters(atom_index)[0] charge = charge.value_in_unit(simtk_unit.elementary_charge) charge_list.append(charge) return charge_list def _extract_dipoles_and_volumes(self): """Extract the systems dipole moments and volumes. Returns ------- numpy.ndarray The dipole moments of the trajectory (shape=(n_frames, 3), dtype=float) numpy.ndarray The volumes of the trajectory (shape=(n_frames, 1), dtype=float) """ import mdtraj dipole_moments = [] volumes = [] charge_list = self._extract_charges() for chunk in mdtraj.iterload( self.trajectory_path, top=self.input_coordinate_file, chunk=50 ): dipole_moments.extend(mdtraj.geometry.dipole_moments(chunk, charge_list)) volumes.extend(chunk.unitcell_volumes) dipole_moments = np.array(dipole_moments) volumes = np.array(volumes) return dipole_moments, volumes def _execute(self, directory, available_resources): super(ExtractAverageDielectric, self)._execute(directory, available_resources) # Extract the dipoles dipole_moments, volumes = self._extract_dipoles_and_volumes() self.dipole_moments = dipole_moments * unit.dimensionless ( dipole_moments, self.equilibration_index, self.statistical_inefficiency, ) = timeseries.decorrelate_time_series(dipole_moments) uncorrelated_length = len(volumes) - self.equilibration_index sample_indices = timeseries.get_uncorrelated_indices( uncorrelated_length, self.statistical_inefficiency ) sample_indices = [index + self.equilibration_index for index in sample_indices] self.volumes = volumes * unit.nanometer ** 3 uncorrelated_volumes = volumes[sample_indices] self.uncorrelated_values = dipole_moments * unit.dimensionless self.uncorrelated_volumes = uncorrelated_volumes * unit.nanometer ** 3 value, uncertainty = bootstrap( self._bootstrap_function, self.bootstrap_iterations, self.bootstrap_sample_size, dipoles=dipole_moments, volumes=uncorrelated_volumes, ) self.value = (value * unit.dimensionless).plus_minus( uncertainty * unit.dimensionless )
[docs]@workflow_protocol() class ReweightDielectricConstant(reweighting.BaseMBARProtocol): """Reweights a set of dipole moments (`reference_observables`) and volumes (`reference_volumes`) using MBAR, and then combines these to yeild the reweighted dielectric constant. Uncertainties in the dielectric constant are determined by bootstrapping. """ reference_dipole_moments = InputAttribute( docstring="A Quantity wrapped np.ndarray of the dipole moments of each " "of the reference states.", type_hint=list, default_value=UNDEFINED, ) reference_volumes = InputAttribute( docstring="A Quantity wrapped np.ndarray of the volumes of each of the " "reference states.", type_hint=list, default_value=UNDEFINED, ) thermodynamic_state = InputAttribute( docstring="The thermodynamic state at which the trajectory was generated.", type_hint=ThermodynamicState, default_value=UNDEFINED, )
[docs] def __init__(self, protocol_id): super().__init__(protocol_id) self.bootstrap_uncertainties = True
def _bootstrap_function( self, reference_reduced_potentials, target_reduced_potentials, **reference_observables, ): assert len(reference_observables) == 3 transposed_observables = {} for key in reference_observables: transposed_observables[key] = np.transpose(reference_observables[key]) values, _, _ = self._reweight_observables( np.transpose(reference_reduced_potentials), np.transpose(target_reduced_potentials), **transposed_observables, ) average_squared_dipole = values["dipoles_sqr"] average_dipole_squared = np.linalg.norm(values["dipoles"]) dipole_variance = (average_squared_dipole - average_dipole_squared) * ( unit.elementary_charge * unit.nanometers ) ** 2 volume = values["volumes"] * unit.nanometer ** 3 e0 = 8.854187817e-12 * unit.farad / unit.meter # Taken from QCElemental dielectric_constant = 1.0 + dipole_variance / ( 3 * unit.boltzmann_constant * self.thermodynamic_state.temperature * volume * e0 ) return dielectric_constant def _execute(self, directory, available_resources): if len(self.reference_dipole_moments) == 0: raise ValueError("There were no dipole moments to reweight.") if len(self.reference_volumes) == 0: raise ValueError("There were no volumes to reweight.") if not isinstance( self.reference_dipole_moments[0], pint.Quantity ) or not isinstance(self.reference_volumes[0], pint.Quantity): raise ValueError( "The reference observables should be a list of " "pint.Quantity wrapped ndarray's.", ) if len(self.reference_dipole_moments) != len(self.reference_volumes): raise ValueError( "The number of reference dipoles does not match the " "number of reference volumes.", ) for reference_dipoles, reference_volumes in zip( self.reference_dipole_moments, self.reference_volumes ): if len(reference_dipoles) == len(reference_volumes): continue raise ValueError( "The number of reference dipoles does not match the " "number of reference volumes.", ) self._reference_observables = self.reference_dipole_moments dipole_moments = self._prepare_observables_array(self.reference_dipole_moments) dipole_moments_sqr = np.array( [[np.dot(dipole, dipole) for dipole in np.transpose(dipole_moments)]] ) volumes = self._prepare_observables_array(self.reference_volumes) if self.bootstrap_uncertainties: self._execute_with_bootstrapping( unit.dimensionless, dipoles=dipole_moments, dipoles_sqr=dipole_moments_sqr, volumes=volumes, ) else: raise ValueError( "Dielectric constant can only be reweighted in conjunction " "with bootstrapped uncertainties.", )
[docs]@thermoml_property( "Relative permittivity at zero frequency", supported_phases=PropertyPhase.Liquid, ) class DielectricConstant(PhysicalProperty): """A class representation of a dielectric property"""
[docs] @classmethod def default_unit(cls): return unit.dimensionless
[docs] @staticmethod def default_simulation_schema( absolute_tolerance=UNDEFINED, relative_tolerance=UNDEFINED, n_molecules=1000 ): """Returns the default calculation schema to use when estimating this class of property from direct simulations. Parameters ---------- absolute_tolerance: pint.Quantity, optional The absolute tolerance to estimate the property to within. relative_tolerance: float The tolerance (as a fraction of the properties reported uncertainty) to estimate the property to within. n_molecules: int The number of molecules to use in the simulation. Returns ------- SimulationSchema The schema to follow when estimating this property. """ assert absolute_tolerance == UNDEFINED or relative_tolerance == UNDEFINED calculation_schema = SimulationSchema() calculation_schema.absolute_tolerance = absolute_tolerance calculation_schema.relative_tolerance = relative_tolerance # Define the protocol which will extract the average dielectric constant # from the results of a simulation. extract_dielectric = ExtractAverageDielectric("extract_dielectric") extract_dielectric.thermodynamic_state = ProtocolPath( "thermodynamic_state", "global" ) # Define the protocols which will run the simulation itself. use_target_uncertainty = ( absolute_tolerance != UNDEFINED or relative_tolerance != UNDEFINED ) protocols, value_source, output_to_store = generate_base_simulation_protocols( extract_dielectric, use_target_uncertainty, n_molecules=n_molecules, ) # Make sure the input of the analysis protcol is properly hooked up. extract_dielectric.system_path = ProtocolPath( "system_path", protocols.assign_parameters.id ) # Dielectric constants typically take longer to converge, so we need to # reflect this in the maximum number of convergence iterations. protocols.converge_uncertainty.max_iterations = 400 # Set up the gradient calculations. For dielectric constants, we need to use # a slightly specialised reweighting protocol which we set up here. coordinate_source = ProtocolPath( "output_coordinate_file", protocols.equilibration_simulation.id ) trajectory_source = ProtocolPath( "trajectory_file_path", protocols.converge_uncertainty.id, protocols.production_simulation.id, ) statistics_source = ProtocolPath( "statistics_file_path", protocols.converge_uncertainty.id, protocols.production_simulation.id, ) gradient_mbar_protocol = ReweightDielectricConstant("gradient_mbar") gradient_mbar_protocol.reference_dipole_moments = [ ProtocolPath( "dipole_moments", protocols.converge_uncertainty.id, extract_dielectric.id, ) ] gradient_mbar_protocol.reference_volumes = [ ProtocolPath( "volumes", protocols.converge_uncertainty.id, extract_dielectric.id ) ] gradient_mbar_protocol.thermodynamic_state = ProtocolPath( "thermodynamic_state", "global" ) gradient_mbar_protocol.reference_reduced_potentials = statistics_source ( gradient_group, gradient_replicator, gradient_source, ) = generate_gradient_protocol_group( gradient_mbar_protocol, ProtocolPath("force_field_path", "global"), coordinate_source, trajectory_source, statistics_source, ) # Build the workflow schema. schema = WorkflowSchema() schema.protocol_schemas = [ protocols.build_coordinates.schema, protocols.assign_parameters.schema, protocols.energy_minimisation.schema, protocols.equilibration_simulation.schema, protocols.converge_uncertainty.schema, protocols.extract_uncorrelated_trajectory.schema, protocols.extract_uncorrelated_statistics.schema, gradient_group.schema, ] schema.protocol_replicators = [gradient_replicator] schema.outputs_to_store = {"full_system": output_to_store} schema.gradients_sources = [gradient_source] schema.final_value_source = value_source calculation_schema.workflow_schema = schema return calculation_schema
[docs] @staticmethod def default_reweighting_schema( absolute_tolerance=UNDEFINED, relative_tolerance=UNDEFINED, n_effective_samples=50, ): """Returns the default calculation schema to use when estimating this property by reweighting existing data. Parameters ---------- absolute_tolerance: pint.Quantity, optional The absolute tolerance to estimate the property to within. relative_tolerance: float The tolerance (as a fraction of the properties reported uncertainty) to estimate the property to within. n_effective_samples: int The minimum number of effective samples to require when reweighting the cached simulation data. Returns ------- ReweightingSchema The schema to follow when estimating this property. """ assert absolute_tolerance == UNDEFINED or relative_tolerance == UNDEFINED calculation_schema = ReweightingSchema() calculation_schema.absolute_tolerance = absolute_tolerance calculation_schema.relative_tolerance = relative_tolerance data_replicator_id = "data_replicator" # Set up a protocol to extract the dielectric constant from the stored data. extract_dielectric = ExtractAverageDielectric( f"calc_dielectric_$({data_replicator_id})" ) # For the dielectric constant, we employ a slightly more advanced reweighting # protocol set up for calculating fluctuation properties. reweight_dielectric = ReweightDielectricConstant("reweight_dielectric") reweight_dielectric.reference_dipole_moments = ProtocolPath( "uncorrelated_values", extract_dielectric.id ) reweight_dielectric.reference_volumes = ProtocolPath( "uncorrelated_volumes", extract_dielectric.id ) reweight_dielectric.thermodynamic_state = ProtocolPath( "thermodynamic_state", "global" ) reweight_dielectric.bootstrap_uncertainties = True reweight_dielectric.bootstrap_iterations = 200 reweight_dielectric.required_effective_samples = n_effective_samples protocols, data_replicator = generate_base_reweighting_protocols( extract_dielectric, reweight_dielectric, data_replicator_id ) # Make sure input is taken from the correct protocol outputs. extract_dielectric.system_path = ProtocolPath( "system_path", protocols.build_reference_system.id ) extract_dielectric.thermodynamic_state = ProtocolPath( "thermodynamic_state", protocols.unpack_stored_data.id ) # Set up the gradient calculations coordinate_path = ProtocolPath( "output_coordinate_path", protocols.concatenate_trajectories.id ) trajectory_path = ProtocolPath( "output_trajectory_path", protocols.concatenate_trajectories.id ) statistics_path = ProtocolPath( "statistics_file_path", protocols.reduced_target_potential.id ) reweight_dielectric_template = copy.deepcopy(reweight_dielectric) ( gradient_group, gradient_replicator, gradient_source, ) = generate_gradient_protocol_group( reweight_dielectric_template, ProtocolPath("force_field_path", "global"), coordinate_path, trajectory_path, statistics_path, replicator_id="grad", effective_sample_indices=ProtocolPath( "effective_sample_indices", reweight_dielectric.id ), ) schema = WorkflowSchema() schema.protocol_schemas = [ *(x.schema for x in protocols), gradient_group.schema, ] schema.protocol_replicators = [data_replicator, gradient_replicator] schema.gradients_sources = [gradient_source] schema.final_value_source = ProtocolPath("value", protocols.mbar_protocol.id) calculation_schema.workflow_schema = schema return calculation_schema
# Register the properties via the plugin system. register_calculation_schema( DielectricConstant, SimulationLayer, DielectricConstant.default_simulation_schema ) register_calculation_schema( DielectricConstant, ReweightingLayer, DielectricConstant.default_reweighting_schema )