Source code for openff.evaluator.protocols.analysis

A collection of protocols for running analysing the results of molecular simulations.
import abc
import itertools
import typing
from os import path

import numpy as np

from openff.evaluator import unit
from openff.evaluator.attributes import UNDEFINED
from openff.evaluator.forcefield import ParameterGradient, SmirnoffForceFieldSource
from openff.evaluator.forcefield.system import ParameterizedSystem
from openff.evaluator.thermodynamics import ThermodynamicState
from openff.evaluator.utils import timeseries
from openff.evaluator.utils.observables import (
from openff.evaluator.utils.openmm import openmm_quantity_to_pint, system_subset
from openff.evaluator.utils.timeseries import (
from openff.evaluator.workflow import Protocol, workflow_protocol
from openff.evaluator.workflow.attributes import (

if typing.TYPE_CHECKING:
    from simtk import openmm

E0 = 8.854187817e-12 * unit.farad / unit.meter  # Taken from QCElemental

def compute_dielectric_constant(
    dipole_moments: ObservableArray,
    volumes: ObservableArray,
    temperature: unit.Quantity,
) -> Observable:
    """A function to compute the average dielectric constant from an array of
    dipole moments and an array of volumes, whereby the average values of the
    observables are computed using a custom function.

        The dipole moments array.
        The volume array.
        The temperature at which the dipole_moments and volumes were sampled.
        The function to use when evaluating the average of an observable.

        The average value of the dielectric constant.

    dipole_moments_sqr = dipole_moments * dipole_moments
    dipole_moments_sqr = ObservableArray(
            ParameterGradient(gradient.key, gradient.value.sum(axis=1))
            for gradient in dipole_moments_sqr.gradients

    avg_sqr_dipole_moments = average_function(observable=dipole_moments_sqr)
    avg_sqr_dipole_moments = ObservableArray(
        avg_sqr_dipole_moments.value, avg_sqr_dipole_moments.gradients

    avg_dipole_moment = average_function(observable=dipole_moments)

    avg_dipole_moment_sqr = avg_dipole_moment * avg_dipole_moment
    avg_dipole_moment_sqr = ObservableArray(
            ParameterGradient(gradient.key, gradient.value.sum(axis=1))
            for gradient in avg_dipole_moment_sqr.gradients

    avg_volume = average_function(observable=volumes)
    avg_volume = ObservableArray(avg_volume.value, avg_volume.gradients)

    dipole_variance = avg_sqr_dipole_moments - avg_dipole_moment_sqr

    prefactor = 1.0 / (3.0 * E0 * unit.boltzmann_constant * temperature)

    dielectric_constant = 1.0 * unit.dimensionless + prefactor * (
        dipole_variance / avg_volume

    return Observable(
                    * gradient.value.units
                    / dielectric_constant.value.units
            for gradient in dielectric_constant.gradients

[docs]class BaseAverageObservable(Protocol, abc.ABC): """An abstract base class for protocols which will calculate the average value of an observable and its uncertainty via bootstrapping. """ bootstrap_iterations = InputAttribute( docstring="The number of bootstrap iterations to perform.", type_hint=int, default_value=250, merge_behavior=InequalityMergeBehaviour.LargestValue, ) bootstrap_sample_size = InputAttribute( docstring="The relative sample size to use for bootstrapping.", type_hint=float, default_value=1.0, merge_behavior=InequalityMergeBehaviour.LargestValue, ) thermodynamic_state = InputAttribute( docstring="The state at which the observables were computed. This is required " "to compute ensemble averages of the gradients of the observable with respect " "to force field parameters.", type_hint=ThermodynamicState, default_value=UNDEFINED, optional=True, ) potential_energies = InputAttribute( docstring="The potential energies which were evaluated at the same " "configurations and using the same force field parameters as the observable to " "average. This is required to compute ensemble averages of the gradients of " "the observable with respect to force field parameters.", type_hint=ObservableArray, default_value=UNDEFINED, optional=True, ) value = OutputAttribute( docstring="The average value of the observable.", type_hint=Observable ) time_series_statistics = OutputAttribute( docstring="Statistics about the observables from which the average was computed." "These include the statistical inefficiency and the index after which the " "observables have become stationary (i.e. equilibrated).", type_hint=TimeSeriesStatistics, ) @abc.abstractmethod def _observables(self) -> typing.Dict[str, ObservableArray]: """A function which should return the observables to pass to the ``_bootstrap_function`` function.""" raise NotImplementedError() def _bootstrap_function(self, **kwargs: ObservableArray) -> Observable: """The function to perform on the data set being sampled by bootstrapping. Parameters ---------- observables The bootstrap sample values. Returns ------- The result of evaluating the data. """ # The simple base function only supports a single observable. assert len(kwargs) == 1 # Compute the mean observable. sample_observable = next(iter(kwargs.values())) mean_observable = np.mean(sample_observable.value, axis=0) if sample_observable.value.shape[1] > 1: mean_observable = mean_observable.reshape(1, -1) else: mean_observable = mean_observable.item() # Retrieve the potential gradients for easy access potential_gradients = { gradient.key: gradient.value for gradient in ( [] if self.potential_energies == UNDEFINED else self.potential_energies.gradients ) } observable_gradients = { gradient.key: gradient for gradient in ( [] if self.potential_energies == UNDEFINED else sample_observable.gradients ) } # Compute the mean gradients. gradients = [] for gradient_key in observable_gradients: gradient = observable_gradients[gradient_key] value = np.mean(gradient.value, axis=0) - self.thermodynamic_state.beta * ( np.mean( sample_observable.value * potential_gradients[gradient.key], axis=0, ) - ( np.mean(sample_observable.value, axis=0) * np.mean(potential_gradients[gradient.key], axis=0) ) ) if sample_observable.value.shape[1] > 1: value = value.reshape(1, -1) else: value = value.item() gradients.append(ParameterGradient(key=gradient.key, value=value)) return_type = ( Observable if sample_observable.value.shape[1] == 1 else ObservableArray ) return return_type(value=mean_observable, gradients=gradients) def _execute(self, directory, available_resources): # Retrieve the list of observables to compute the average using. observables = self._observables() if len(observables) == 0: raise ValueError("There are no observables to average.") expected_length = len(next(iter(observables.values()))) if not any(len(observable) != expected_length for observable in observables): raise ValueError("The observables to average must have the same length.") if ( self.potential_energies != UNDEFINED and self.thermodynamic_state == UNDEFINED ): raise ValueError( "The `thermodynamic_state` must be provided when the " "`potential_energies` input is specified (i.e. when gradients should " "be computed." ) if self.potential_energies != UNDEFINED: potential_gradients = { gradient.key for gradient in self.potential_energies.gradients } if any( {gradient.key for gradient in observable.gradients} != potential_gradients for observable in observables.values() ): raise ValueError( "The potential energies must have been differentiated with respect " "to the same force field parameters as the observables of interest." ) # Find the largest equilibration time and statistical inefficiency for each of # the observables. equilibration_index = -1 statistical_inefficiency = 0.0 for observable in observables.values(): observable_statistics = analyze_time_series(observable.value.magnitude) equilibration_index = max( equilibration_index, observable_statistics.equilibration_index ) statistical_inefficiency = max( statistical_inefficiency, observable_statistics.statistical_inefficiency ) equilibration_index = int(equilibration_index) statistical_inefficiency = float(statistical_inefficiency) uncorrelated_indices = get_uncorrelated_indices( expected_length - equilibration_index, statistical_inefficiency ) self.time_series_statistics = TimeSeriesStatistics( n_total_points=expected_length, n_uncorrelated_points=len(uncorrelated_indices), statistical_inefficiency=statistical_inefficiency, equilibration_index=equilibration_index, ) # Decorrelate the observables. uncorrelated_observables = { key: observable.subset( [index + equilibration_index for index in uncorrelated_indices] ) for key, observable in observables.items() } if self.potential_energies != UNDEFINED: self.potential_energies = self.potential_energies.subset( [index + equilibration_index for index in uncorrelated_indices] ) self.value = bootstrap( self._bootstrap_function, self.bootstrap_iterations, self.bootstrap_sample_size, **uncorrelated_observables, )
[docs]@workflow_protocol() class AverageObservable(BaseAverageObservable): """Computes the average value of an observable as well as bootstrapped uncertainties for the average. """ observable = InputAttribute( docstring="The file path to the observable which should be averaged.", type_hint=ObservableArray, default_value=UNDEFINED, ) divisor = InputAttribute( docstring="A value to divide the statistic by. This is useful if a statistic " "(such as enthalpy) needs to be normalised by the number of molecules.", type_hint=typing.Union[int, float, unit.Quantity], default_value=1.0, ) def _observables(self) -> typing.Dict[str, ObservableArray]: return {"observable": self.observable / self.divisor}
[docs]@workflow_protocol() class AverageDielectricConstant(BaseAverageObservable): """Computes the average value of the dielectric constant from a set of dipole moments (M) and volumes (V) sampled over the course of a molecular simulation such that ``eps = 1 + (<M^2> - <M>^2) / (3.0 * eps_0 * <V> * kb * T)`` [1]_. References ---------- [1] A. Glattli, X. Daura and W. F. van Gunsteren. Derivation of an improved simple point charge model for liquid water: SPC/A and SPC/L. J. Chem. Phys. 116(22): 9811-9828, 2002 """ dipole_moments = InputAttribute( docstring="The dipole moments of each sampled configuration.", type_hint=ObservableArray, default_value=UNDEFINED, ) volumes = InputAttribute( docstring="The volume of each sampled configuration.", type_hint=ObservableArray, default_value=UNDEFINED, ) def _observables(self): return {"dipole_moments": self.dipole_moments, "volumes": self.volumes} def _bootstrap_function(self, **kwargs: ObservableArray): return compute_dielectric_constant( kwargs.pop("dipole_moments"), kwargs.pop("volumes"), self.thermodynamic_state.temperature, super(AverageDielectricConstant, self)._bootstrap_function, )
[docs]@workflow_protocol() class AverageFreeEnergies(Protocol): """A protocol which computes the Boltzmann weighted average (ΔG° = -RT × Log[ Σ_{n} exp(-βΔG°_{n}) ]) of a set of free energies which were measured at the same thermodynamic state. Confidence intervals are computed by bootstrapping with replacement. """ values: typing.List[Observable] = InputAttribute( docstring="The values to add together.", type_hint=list, default_value=UNDEFINED ) thermodynamic_state = InputAttribute( docstring="The thermodynamic state at which the free energies were measured.", type_hint=ThermodynamicState, default_value=UNDEFINED, ) bootstrap_cycles = InputAttribute( docstring="The number of bootstrap cycles to perform when estimating " "the uncertainty in the combined free energies.", type_hint=int, default_value=2000, ) result = OutputAttribute(docstring="The sum of the values.", type_hint=Observable) confidence_intervals = OutputAttribute( docstring="The 95% confidence intervals on the average free energy.", type_hint=unit.Quantity, ) def _execute(self, directory, available_resources): from scipy.special import logsumexp default_unit = unit.kilocalorie / unit.mole boltzmann_factor = ( self.thermodynamic_state.temperature * unit.molar_gas_constant ) boltzmann_factor.ito(default_unit) beta = 1.0 / boltzmann_factor values = [ (-beta * for value in self.values ] # Compute the mean. mean = logsumexp(values) # Compute the gradients of the mean. value_gradients = [ {gradient.key: -beta * gradient.value for gradient in value.gradients} for value in self.values ] value_gradients_by_key = { gradient_key: [ gradients_by_key[gradient_key] for gradients_by_key in value_gradients ] for gradient_key in value_gradients[0] } mean_gradients = [] for gradient_key, gradient_values in value_gradients_by_key.items(): expected_unit = value_gradients[0][gradient_key].units d_log_mean_numerator, d_mean_numerator_sign = logsumexp( values, b=[ for x in gradient_values], return_sign=True, ) d_mean_numerator = d_mean_numerator_sign * np.exp(d_log_mean_numerator) d_mean_d_theta = d_mean_numerator / np.exp(mean) mean_gradients.append( ParameterGradient( key=gradient_key, value=-boltzmann_factor * d_mean_d_theta * expected_unit, ) ) # Compute the standard error and 95% CI cycle_result = np.empty(self.bootstrap_cycles) for cycle_index, cycle in enumerate(range(self.bootstrap_cycles)): cycle_values = np.empty(len(self.values)) for value_index, value in enumerate(self.values): cycle_mean = cycle_sem = sampled_value = np.random.normal(cycle_mean, cycle_sem) * default_unit cycle_values[value_index] = ( (-beta * sampled_value).to(unit.dimensionless).magnitude ) # ΔG° = -RT × Log[ Σ_{n} exp(-βΔG°_{n}) ] cycle_result[cycle_index] = logsumexp(cycle_values) mean = -boltzmann_factor * mean sem = np.std(-boltzmann_factor * cycle_result) confidence_intervals = np.empty(2) sorted_statistics = np.sort(cycle_result) confidence_intervals[0] = sorted_statistics[int(0.025 * self.bootstrap_cycles)] confidence_intervals[1] = sorted_statistics[int(0.975 * self.bootstrap_cycles)] confidence_intervals = -boltzmann_factor * confidence_intervals self.result = Observable(value=mean.plus_minus(sem), gradients=mean_gradients) self.confidence_intervals = confidence_intervals
[docs] def validate(self, attribute_type=None): super(AverageFreeEnergies, self).validate(attribute_type) assert all(isinstance(x, Observable) for x in self.values) if len(self.values) == 0: return expected_gradients = {gradient.key for gradient in self.values[0].gradients} if not all( expected_gradients == {gradient.key for gradient in value.gradients} for value in self.values ): raise ValueError( "The values must contain gradient information for the same set of " "force field parameters." )
[docs]@workflow_protocol() class ComputeDipoleMoments(Protocol): """A protocol which will compute the dipole moment for each configuration in a trajectory and for a given parameterized system.""" parameterized_system = InputAttribute( docstring="The parameterized system which encodes the charge on each atom " "in the system.", type_hint=ParameterizedSystem, default_value=UNDEFINED, ) trajectory_path = InputAttribute( docstring="The file path to the trajectory of configurations.", type_hint=str, default_value=UNDEFINED, ) gradient_parameters = InputAttribute( docstring="An optional list of parameters to differentiate the dipole moments " "with respect to.", type_hint=list, default_value=lambda: list(), ) dipole_moments = OutputAttribute( docstring="The computed dipole moments.", type_hint=ObservableArray ) @classmethod def _extract_charges( cls, system: "openmm.System" ) -> typing.Optional[unit.Quantity]: """Retrieve the charge on each atom from an OpenMM system object. Parameters ---------- The system object containing the charges. Returns ------- The charge on each atom in the system if any are present, otherwise none. """ from simtk import openmm from simtk import unit as simtk_unit forces = [ system.getForce(force_index) for force_index in range(system.getNumForces()) if isinstance(system.getForce(force_index), openmm.NonbondedForce) ] if len(forces) > 1: raise ValueError( f"The system must contain no more than one non-bonded force, however " f"{len(forces)} were found." ) if len(forces) == 0: return None charges = np.array( [ forces[0] .getParticleParameters(atom_index)[0] .value_in_unit(simtk_unit.elementary_charge) for atom_index in range(forces[0].getNumParticles()) ] ) return charges * unit.elementary_charge def _compute_charge_derivatives(self, n_atoms: int): d_charge_d_theta = {key: np.zeros(n_atoms) for key in self.gradient_parameters} if len(self.gradient_parameters) > 0 and not isinstance( self.parameterized_system.force_field, SmirnoffForceFieldSource ): raise ValueError( "Derivates can only be computed for systems parameterized with " "SMIRNOFF force fields." ) force_field = self.parameterized_system.force_field.to_force_field() topology = self.parameterized_system.topology for key in self.gradient_parameters: reverse_system, reverse_value = system_subset(key, force_field, topology) forward_system, forward_value = system_subset( key, force_field, topology, 0.1 ) reverse_value = openmm_quantity_to_pint(reverse_value) forward_value = openmm_quantity_to_pint(forward_value) reverse_charges = self._extract_charges(reverse_system) forward_charges = self._extract_charges(forward_system) if reverse_charges is None and forward_charges is None: d_charge_d_theta[key] /= forward_value.units else: d_charge_d_theta[key] = (forward_charges - reverse_charges) / ( forward_value - reverse_value ) return d_charge_d_theta def _execute(self, directory, available_resources): import mdtraj charges = self._extract_charges(self.parameterized_system.system) charge_derivatives = self._compute_charge_derivatives(len(charges)) dipole_moments = [] dipole_gradients = {key: [] for key in self.gradient_parameters} for chunk in mdtraj.iterload( self.trajectory_path, top=self.parameterized_system.topology_path, chunk=50 ): xyz =, 2, 1) * unit.nanometers dipole_moments.extend( for key in self.gradient_parameters: dipole_gradients[key].extend([key])) self.dipole_moments = ObservableArray( value=np.vstack(dipole_moments), gradients=[ ParameterGradient(key=key, value=np.vstack(dipole_gradients[key])) for key in self.gradient_parameters ], )
[docs]class BaseDecorrelateProtocol(Protocol, abc.ABC): """An abstract base class for protocols which will subsample a set of data, yielding only equilibrated, uncorrelated data. """ time_series_statistics: typing.Union[ TimeSeriesStatistics, typing.List[TimeSeriesStatistics] ] = InputAttribute( docstring="Statistics about the data to decorrelate. This should include the " "statistical inefficiency and the index after which the observables have " "become stationary (i.e. equilibrated). If a list of such statistics are " "provided it will be assumed that multiple time series which have been " "joined together are being decorrelated and hence will each be decorrelated " "separately.", type_hint=typing.Union[list, TimeSeriesStatistics], default_value=UNDEFINED, ) def _n_expected(self) -> int: """Returns the expected number of samples to decorrelate.""" time_series_statistics = self.time_series_statistics if isinstance(time_series_statistics, TimeSeriesStatistics): time_series_statistics = [time_series_statistics] return sum(statistics.n_total_points for statistics in time_series_statistics) def _uncorrelated_indices(self) -> typing.List[int]: """Returns the indices of the time series being decorrelated to retain.""" time_series_statistics = self.time_series_statistics if isinstance(time_series_statistics, TimeSeriesStatistics): time_series_statistics = [time_series_statistics] n_cumulative = [ 0, *itertools.accumulate( [statistics.n_total_points for statistics in time_series_statistics] ), ] uncorrelated_indices = [ n_cumulative[statistics_index] + index + statistics.equilibration_index for statistics_index, statistics in enumerate(time_series_statistics) for index in timeseries.get_uncorrelated_indices( statistics.n_total_points - statistics.equilibration_index, statistics.statistical_inefficiency, ) ] assert len(uncorrelated_indices) == sum( statistics.n_uncorrelated_points for statistics in time_series_statistics ) return uncorrelated_indices
[docs]@workflow_protocol() class DecorrelateTrajectory(BaseDecorrelateProtocol): """A protocol which will subsample frames from a trajectory, yielding only uncorrelated frames as determined from a provided statistical inefficiency and equilibration time. """ input_coordinate_file = InputAttribute( docstring="The file path to the starting coordinates of a trajectory.", type_hint=str, default_value=UNDEFINED, ) input_trajectory_path = InputAttribute( docstring="The file path to the trajectory to subsample.", type_hint=str, default_value=UNDEFINED, ) output_trajectory_path = OutputAttribute( docstring="The file path to the subsampled trajectory.", type_hint=str ) @staticmethod def _yield_frame(file, topology, stride): """A generator which yields frames of a DCD trajectory. Parameters ---------- file: mdtraj.DCDTrajectoryFile The file object being used to read the trajectory. topology: mdtraj.Topology The object which describes the topology of the trajectory. stride Only read every stride-th frame. Returns ------- mdtraj.Trajectory A trajectory containing only a single frame. """ while True: frame = file.read_as_traj(topology, n_frames=1, stride=stride) if len(frame) == 0: return yield frame def _execute(self, directory, available_resources): import mdtraj from mdtraj.formats.dcd import DCDTrajectoryFile from mdtraj.utils import in_units_of # Set the output path. self.output_trajectory_path = path.join( directory, "uncorrelated_trajectory.dcd" ) # Load in the trajectories topology. topology = mdtraj.load_frame(self.input_coordinate_file, 0).topology # Parse the internal mdtraj distance unit. While private access is undesirable, # this is never publicly defined and I believe this route to be preferable # over hard coding this unit. # noinspection PyProtectedMember base_distance_unit = mdtraj.Trajectory._distance_unit # Determine the frames to retrain uncorrelated_indices = {*self._uncorrelated_indices()} frame_count = 0 with DCDTrajectoryFile(self.input_trajectory_path, "r") as input_file: with DCDTrajectoryFile(self.output_trajectory_path, "w") as output_file: for frame in self._yield_frame(input_file, topology, 1): if frame_count in uncorrelated_indices: output_file.write( xyz=in_units_of(, base_distance_unit, output_file.distance_unit ), cell_lengths=in_units_of( frame.unitcell_lengths, base_distance_unit, output_file.distance_unit, ), cell_angles=frame.unitcell_angles[0], ) frame_count += 1 assert frame_count == self._n_expected()
[docs]@workflow_protocol() class DecorrelateObservables(BaseDecorrelateProtocol): """A protocol which will subsample a trajectory of observables, yielding only uncorrelated entries as determined from a provided statistical inefficiency and equilibration time. """ input_observables = InputAttribute( docstring="The observables to decorrelate.", type_hint=typing.Union[ObservableArray, ObservableFrame], default_value=UNDEFINED, ) output_observables = OutputAttribute( docstring="The decorrelated observables.", type_hint=typing.Union[ObservableArray, ObservableFrame], ) def _execute(self, directory, available_resources): assert len(self.input_observables) == self._n_expected() uncorrelated_indices = self._uncorrelated_indices() uncorrelated_observable = self.input_observables.subset(uncorrelated_indices) self.output_observables = uncorrelated_observable