import abc
import ast
import copy
import json
import logging
import os
from pathlib import Path
from typing import TYPE_CHECKING, Dict, Generic, List, Sequence, Tuple, TypeVar, Union
import numpy as np
from openff.qcsubmit.results import (
BasicResultCollection,
OptimizationResultCollection,
TorsionDriveResultCollection,
)
from openff.toolkit.topology import Molecule as OFFMolecule
from openff.toolkit.typing.engines.smirnoff import ForceField
from openff.units import unit
from qcelemental.models import AtomicResult
from qcelemental.models.procedures import OptimizationResult, TorsionDriveResult
from qcportal.optimization import OptimizationRecord
from qcportal.record_models import BaseRecord
from qcportal.torsiondrive import TorsiondriveRecord
from openff.bespokefit.exceptions import OptimizerError, QCRecordMissMatchError
from openff.bespokefit.optimizers.forcebalance.templates import (
AbInitioTargetTemplate,
InputOptionsTemplate,
OptGeoOptionsTemplate,
OptGeoTargetTemplate,
TorsionProfileTargetTemplate,
VibrationTargetTemplate,
)
from openff.bespokefit.schema.data import BespokeQCData, LocalQCData
from openff.bespokefit.schema.fitting import OptimizationStageSchema
from openff.bespokefit.schema.optimizers import ForceBalanceSchema
from openff.bespokefit.schema.targets import (
AbInitioTargetSchema,
OptGeoTargetSchema,
TargetSchema,
TorsionProfileTargetSchema,
VibrationTargetSchema,
)
from openff.bespokefit.utilities.tempcd import temporary_cd
if TYPE_CHECKING:
from qcelemental.models import Molecule as QCMolecule
_logger = logging.getLogger(__name__)
R = TypeVar("R", bound=Union[AtomicResult, OptimizationResult, TorsionDriveResult])
S = TypeVar("S", bound=BaseRecord)
T = TypeVar("T", bound=TargetSchema)
_TARGET_SECTION_TEMPLATES = {
AbInitioTargetSchema: AbInitioTargetTemplate,
TorsionProfileTargetSchema: TorsionProfileTargetTemplate,
OptGeoTargetSchema: OptGeoTargetTemplate,
VibrationTargetSchema: VibrationTargetTemplate,
}
def _standardize_grid_id_str(grid_id: str) -> tuple[Union[int, float]]:
"""Ensures a grid id is of the form '(grid_id_1,)' rather than 'grid_id_1' as is
sometimes the case when using QCEngine.
"""
grid_id = ast.literal_eval(grid_id)
grid_id = [grid_id] if isinstance(grid_id, int) else grid_id
return tuple(grid_id)
class _TargetFactory(Generic[T], abc.ABC):
@classmethod
@abc.abstractmethod
def _target_name_prefix(cls) -> str:
raise NotImplementedError()
@classmethod
def _section_extras_to_exclude(cls) -> List[str]:
"""The list of extras which may be present in a target schemas ``.extras``
dictionary to exclude when generating the section to add to the ``optimize.in``
file."""
return []
@classmethod
def _generate_targets_section(cls, target_template: T, target_names: List[str]):
"""Creates the target sections which will need to be added to the main
ForceBalance 'options.in' file."""
target_template = target_template.copy(deep=True)
target_template.extras = {
key: value
for key, value in target_template.extras.items()
if key not in cls._section_extras_to_exclude()
}
template_factory = _TARGET_SECTION_TEMPLATES[target_template.__class__]
return template_factory.generate(target_template, target_names)
@classmethod
def _batch_qc_records(
cls, target: TargetSchema, qc_records: List[Tuple[BaseRecord, OFFMolecule]]
) -> Dict[str, List[Tuple[BaseRecord, OFFMolecule]]]:
"""A function which places the input QC records into per target batches.
For most targets there will be a single record per target, however certain
targets (e.g. OptGeo) can perform on batches to reduce the time taken to
evaluate the target.
Args:
target: The target the inputs are being generated for.
qc_records: The QC records to batch
Returns:
A dictionary of the target name and a list of the records to include for that
target.
"""
def qc_record_id(qc_record):
return qc_record.extras["id"] if "id" in qc_record.extras else qc_record.id
return {
f"{cls._target_name_prefix()}-{qc_record_id(qc_record)}": [
(qc_record, molecule)
]
for qc_record, molecule in qc_records
}
@classmethod
@abc.abstractmethod
def _generate_target(
cls, target: T, qc_records: List[Tuple[BaseRecord, OFFMolecule]]
):
"""Create the required input files for a particular target.
Notes:
* This function assumes that the files should be created in the current
working directory.
"""
raise NotImplementedError()
@classmethod
def _local_to_qc_records(
cls, qc_data: LocalQCData[R]
) -> List[Tuple[R, OFFMolecule]]:
"""Converts a 'local' dataset of QCEngine outputs to a list of QC records."""
qc_records = []
for i, qc_result in enumerate(qc_data.qc_records):
qc_result = qc_result.copy(deep=True)
# Assign an id to the **QCElemental** result. This is a dirty way to do
# this but I can't think of a cleaner way to handle it...
qc_result.extras["id"] = str(i)
grid_ids = None
if isinstance(qc_result, AtomicResult):
cmiles = qc_result.molecule.extras[
"canonical_isomeric_explicit_hydrogen_mapped_smiles"
]
geometries = [qc_result.molecule.geometry]
elif isinstance(qc_result, OptimizationResult):
cmiles = qc_result.initial_molecule.extras[
"canonical_isomeric_explicit_hydrogen_mapped_smiles"
]
geometries = [qc_result.final_molecule.geometry]
elif isinstance(qc_result, TorsionDriveResult):
cmiles = qc_result.initial_molecule[0].extras[
"canonical_isomeric_explicit_hydrogen_mapped_smiles"
]
geometries = []
grid_ids = []
def _grid_id_key(x):
if isinstance(x, str):
# unwrap a str like "(-165,)"
x = ast.literal_eval(x)
if isinstance(x, Sequence):
x = int(x[0])
return x
for grid_id in sorted(qc_result.final_molecules, key=_grid_id_key):
grid_ids.append(_standardize_grid_id_str(grid_id))
geometries.append(qc_result.final_molecules[grid_id].geometry)
else:
raise NotImplementedError()
molecule: OFFMolecule = OFFMolecule.from_mapped_smiles(cmiles)
molecule._conformers = [
np.array(geometry, float).reshape(-1, 3) * unit.bohr
for geometry in geometries
]
if grid_ids is not None:
molecule.properties["grid_ids"] = grid_ids
qc_records.append((qc_result, molecule))
return qc_records
@classmethod
def generate(cls, root_directory: Union[str, Path], target: T):
"""Creates the input files for a target schema in a specified directory.
Args:
root_directory: The root directory to create the inputs in.
target: The target to create the inputs for. A single target schema may map
to several new ForceBalance targets, typically one per QC record
referenced by the schema.
"""
# The optimizer should have swapped out a bespoke QC data set
# with an existing set?
if isinstance(
target.reference_data,
(
BasicResultCollection,
OptimizationResultCollection,
TorsionDriveResultCollection,
),
):
qc_records = target.reference_data.to_records()
elif isinstance(target.reference_data, BespokeQCData):
raise RuntimeError(
"`BespokeQCData` must be converted into `LocalQCData` before generating "
"targets."
)
elif isinstance(target.reference_data, LocalQCData):
qc_records = cls._local_to_qc_records(target.reference_data)
else:
raise NotImplementedError()
target_batches = cls._batch_qc_records(target, qc_records)
for target_name, target_records in target_batches.items():
print(f"generating target directory for {target_name}")
target_directory = os.path.join(root_directory, target_name)
os.makedirs(target_directory, exist_ok=True)
with temporary_cd(target_directory):
cls._generate_target(target, target_records)
return cls._generate_targets_section(target, [*target_batches])
[docs]class AbInitioTargetFactory(_TargetFactory[AbInitioTargetSchema]):
@classmethod
def _target_name_prefix(cls) -> str:
return "ab-initio"
@classmethod
def _generate_target(
cls,
target: T,
qc_records: List[
Tuple[Union[TorsiondriveRecord, TorsionDriveResult], OFFMolecule]
],
):
from forcebalance.molecule import Molecule as FBMolecule
if isinstance(target, AbInitioTargetSchema) and target.fit_force is True:
raise NotImplementedError()
assert len(qc_records) == 1
qc_record, off_molecule = qc_records[0]
qc_record_id = (
qc_record.extras["id"] if "id" in qc_record.extras else qc_record.id
)
# form a Molecule object from the first torsion grid data
if isinstance(qc_record, TorsiondriveRecord):
grid_energies = qc_record.final_energies
elif isinstance(qc_record, TorsionDriveResult):
grid_energies = {
_standardize_grid_id_str(key): value
for key, value in qc_record.final_energies.items()
}
else:
raise NotImplementedError()
grid_conformers = {
grid_id: conformer.m_as(unit.angstrom)
for grid_id, conformer in zip(
off_molecule.properties["grid_ids"], off_molecule.conformers
)
}
grid_ids = sorted(grid_energies, key=lambda x: x[0])
# Create a FB molecule object from the QCData molecule.
fb_molecule = FBMolecule()
fb_molecule.Data = {
"resname": ["UNK"] * off_molecule.n_atoms,
"resid": [0] * off_molecule.n_atoms,
"elem": [atom.symbol for atom in off_molecule.atoms],
"bonds": [
(bond.atom1_index, bond.atom2_index) for bond in off_molecule.bonds
],
"name": f"{qc_record_id}",
"xyzs": [grid_conformers[grid_id] for grid_id in grid_ids],
"qm_energies": [grid_energies[grid_id] for grid_id in grid_ids],
"comms": [f"torsion grid {grid_id}" for grid_id in grid_ids],
}
# Write the data
fb_molecule.write("qdata.txt")
fb_molecule.write("scan.xyz")
off_molecule = copy.deepcopy(off_molecule)
off_molecule._conformers = [off_molecule.conformers[0]]
off_molecule.to_file("input.sdf", "SDF")
# OpenFF does not map molecules into PDB well (it leads to files where Br is
# confused with B for example), so we use ForceBalance instead.
fb_molecule.Data["xyzs"] = [fb_molecule.Data["xyzs"][0]]
del fb_molecule.Data["qm_energies"]
del fb_molecule.Data["comms"]
fb_molecule.write("conf.pdb")
try:
# This is a QCPortal object ...
# qcportal.torsiondrive.record_models.TorsiondriveRecord
metadata: dict = qc_record.specification.optimization_specification.keywords
except AttributeError:
# unless it's actually a qcelemental object, specifically a
# qcelemental.models.procedures.TorsionDriveResult
metadata: dict = qc_record.optimization_spec.keywords
metadata["torsion_grid_ids"] = [
grid_id if not isinstance(grid_id, str) else tuple(json.loads(grid_id))
for grid_id in grid_ids
]
metadata["energy_decrease_thresh"] = None
metadata["energy_upper_limit"] = target.energy_cutoff
with open("metadata.json", "w") as file:
file.write(json.dumps(metadata))
[docs]class TorsionProfileTargetFactory(
AbInitioTargetFactory, _TargetFactory[TorsionProfileTargetSchema]
):
@classmethod
def _target_name_prefix(cls) -> str:
return "torsion"
@classmethod
def _generate_target(
cls,
target: TorsionProfileTargetSchema,
qc_records: List[
Tuple[Union[TorsiondriveRecord, TorsionDriveResult], OFFMolecule]
],
):
# noinspection PyTypeChecker
super(TorsionProfileTargetFactory, cls)._generate_target(target, qc_records)
qc_record, off_molecule = qc_records[0]
if isinstance(qc_record, TorsiondriveRecord):
grid_energies = qc_record.final_energies
metadata = qc_record.specification.optimization_specification.keywords
metadata["dihedrals"] = qc_record.specification.keywords.dihedrals
elif isinstance(qc_record, TorsionDriveResult):
grid_energies = {
_standardize_grid_id_str(key): value
for key, value in qc_record.final_energies.items()
}
metadata = qc_record.optimization_spec.keywords
metadata["dihedrals"] = qc_record.keywords.dihedrals
else:
raise NotImplementedError()
grid_ids = sorted(grid_energies, key=lambda x: x[0])
metadata["torsion_grid_ids"] = [
grid_id if not isinstance(grid_id, str) else tuple(json.loads(grid_id))
for grid_id in grid_ids
]
metadata["energy_decrease_thresh"] = None
metadata["energy_upper_limit"] = target.energy_cutoff
with open("metadata.json", "w") as file:
file.write(json.dumps(metadata))
[docs]class VibrationTargetFactory(_TargetFactory[VibrationTargetSchema]):
@classmethod
def _target_name_prefix(cls) -> str:
return "vib-freq"
@classmethod
def _compute_normal_modes(
cls, mass_weighted_hessian: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
"""Computes the normal modes from a mass weighted hessian.
Notes:
This function was taken from the ``make_vib_freq_target.py`` script in the
``openforcefield-forcebalance`` repository at commit hash f689096.
Args:
mass_weighted_hessian: The mass weighted hessian to compute the normal
modes from.
Returns:
A tuple of the frequencies and the corresponding normal modes.
"""
# 1. diagonalize the hessian matrix
eigen_values, eigen_vectors = np.linalg.eigh(mass_weighted_hessian)
# 2. convert eigenvalues to frequencies
# TODO: Magic numbers need to be replaced with simtk based conversion factors.
coefficient = 0.5 / np.pi * 33.3564095 # 10^12 Hz => cm-1
negatives = (eigen_values >= 0).astype(int) * 2 - 1 # record the negative ones
frequencies = np.sqrt(np.abs(eigen_values)) * coefficient * negatives
# 3. convert eigenvectors to normal modes
noa = int(len(mass_weighted_hessian) / 3)
normal_modes = eigen_vectors
normal_modes = normal_modes.T.reshape(noa * 3, noa, 3)
# step 5: remove the 6 freqs with smallest abs value and corresponding normal
# modes
n_remove = 5 if noa == 2 else 6
larger_freq_idxs = np.sort(
np.argpartition(np.abs(frequencies), n_remove)[n_remove:]
)
frequencies = frequencies[larger_freq_idxs]
normal_modes = normal_modes[larger_freq_idxs]
return frequencies, normal_modes
@classmethod
def _create_vdata_file(
cls,
qc_record: Union[BaseRecord, "AtomicResult"],
qc_molecule: "QCMolecule",
off_molecule: OFFMolecule,
):
"""Creates a file containing the vibrational frequency data.
Notes:
This function was taken from the ``make_vib_freq_target.py`` script in the
``openforcefield-forcebalance`` repository at commit hash f689096.
Args:
qc_record: The QC record containing the target hessian data.
qc_molecule: The QC molecule associated with the QC record.
off_molecule: An OpenFF representation of the QC molecule.
"""
qc_record_id = (
qc_record.extras["id"] if "id" in qc_record.extras else qc_record.id
)
try:
# this is a qcportal.record_models.BaseRecord
driver = qc_record.specification.driver.value
except AttributeError:
# or a qcelemental.models.results.AtomicResult
driver = qc_record.driver.value
if driver != "hessian" or qc_record.return_result is None:
raise QCRecordMissMatchError(
f"The QC record with id={qc_record_id} does not contain the gradient "
f"information required by a vibration fitting target."
)
# Check the magnitude of the gradient
try:
# qcportal.record_models.BaseRecord
gradient = qc_record.properties["current gradient"]
except (AttributeError, TypeError):
# qcelemental.models.results.AtomicResult
gradient = qc_record.properties.return_gradient
if np.abs(gradient).max() > 1e-3:
_logger.warning(
f"the max gradient of record={qc_record_id} is greater than 1e-3"
)
# Get the list of masses for the molecule to be consistent with ForceBalance
masses = np.array([atom.mass.m_as(unit.dalton) for atom in off_molecule.atoms])
# Compute the mass-weighted hessian
invert_sqrt_mass_array_repeat = 1.0 / np.sqrt(masses.repeat(3))
hessian = np.asarray(qc_record.return_result).reshape(
(len(masses) * 3, len(masses) * 3)
)
mass_weighted_hessian = (
hessian
* invert_sqrt_mass_array_repeat[:, np.newaxis]
* invert_sqrt_mass_array_repeat[np.newaxis, :]
)
# Convert units from Eh / bohr^2 * dalton to 10^24 s^-2
# TODO: Magic numbers need to be replaced with simtk based conversion factors.
mass_weighted_hessian *= 937583.07
# Perform the normal mode analysis
frequencies, normal_modes = cls._compute_normal_modes(mass_weighted_hessian)
# write vdata.txt
with open("vdata.txt", "w") as file:
file.write(qc_molecule.to_string("xyz") + "\n")
for frequency, normal_mode in zip(frequencies, normal_modes):
file.write(f"{frequency}\n")
for nx, ny, nz in normal_mode:
file.write(f"{nx:13.4f} {ny:13.4f} {nz:13.4f}\n")
file.write("\n")
@classmethod
def _generate_target(
cls,
target: VibrationTargetSchema,
qc_records: List[Tuple[Union[BaseRecord, "AtomicResult"], OFFMolecule]],
):
from forcebalance.molecule import Molecule as FBMolecule
assert len(qc_records) == 1
qc_record, off_molecule = qc_records[0]
qc_record_id = (
qc_record.extras["id"] if "id" in qc_record.extras else qc_record.id
)
fb_molecule = FBMolecule()
fb_molecule.Data = {
"resname": ["UNK"] * off_molecule.n_atoms,
"resid": [0] * off_molecule.n_atoms,
"elem": [atom.symbol for atom in off_molecule.atoms],
"bonds": [
(bond.atom1_index, bond.atom2_index) for bond in off_molecule.bonds
],
"name": f"{qc_record_id}",
"xyzs": [off_molecule.conformers[0].m_as(unit.angstrom)],
}
# form a Molecule object from the first torsion grid data
fb_molecule.write("conf.pdb")
off_molecule.to_file("input.sdf", "SDF")
cls._create_vdata_file(qc_record, off_molecule.to_qcschema(), off_molecule)
[docs]class OptGeoTargetFactory(_TargetFactory[OptGeoTargetSchema]):
@classmethod
def _target_name_prefix(cls) -> str:
return "opt-geo"
@classmethod
def _section_extras_to_exclude(cls) -> List[str]:
return ["batch_size"]
@classmethod
def _batch_qc_records(
cls, target: OptGeoTargetSchema, qc_records: List[BaseRecord]
):
batch_size = int(target.extras.get("batch_size", 50))
n_records = len(qc_records)
n_targets = (n_records + batch_size - 1) // batch_size
return {
f"{cls._target_name_prefix()}-batch-{target_index}": qc_records[
target_index * batch_size : (target_index + 1) * batch_size
]
for target_index in range(n_targets)
}
@classmethod
def _generate_target(
cls,
target: OptGeoTargetSchema,
qc_records: List[
Tuple[Union[OptimizationRecord, OptimizationResult], OFFMolecule]
],
):
from forcebalance.molecule import Molecule as FBMolecule
record_names = []
for i, (qc_record, off_molecule) in enumerate(qc_records):
qc_record_id = (
qc_record.extras["id"] if "id" in qc_record.extras else qc_record.id
)
record_name = f"{qc_record_id}-{i}"
record_names.append(record_name)
# form a Molecule object from the first torsion grid data
qc_molecule = off_molecule.to_qcschema()
with open(f"{record_name}.xyz", "w") as file:
file.write(qc_molecule.to_string("xyz"))
fb_molecule = FBMolecule()
fb_molecule.Data = {
"resname": ["UNK"] * off_molecule.n_atoms,
"resid": [0] * off_molecule.n_atoms,
"elem": [atom.symbol for atom in off_molecule.atoms],
"bonds": [
(bond.atom1_index, bond.atom2_index) for bond in off_molecule.bonds
],
"name": f"{qc_record_id}",
"xyzs": [off_molecule.conformers[0].m_as(unit.angstrom)],
}
fb_molecule.write(f"{record_name}.pdb")
off_molecule.to_file(f"{record_name}.sdf", "SDF")
# Create the options file
with open("optgeo_options.txt", "w") as file:
file.write(OptGeoOptionsTemplate.generate(target, record_names))