Source code for openff.recharge.esp.storage._storage

"""This module contains classes which are able to store and retrieve
calculated electrostatic potentials in unified data collections.
"""

import warnings
import functools
from collections import defaultdict
from contextlib import contextmanager
from typing import TYPE_CHECKING, ContextManager, Dict, List, Optional

import numpy
from openff.units import unit, Quantity
from openff.recharge._pydantic import BaseModel, Field
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from openff.toolkit.utils.exceptions import AtomMappingWarning
from openff.recharge.esp import ESPSettings
from openff.recharge.esp.storage.db import (
    DB_VERSION,
    DBBase,
    DBConformerRecord,
    DBESPSettings,
    DBGeneralProvenance,
    DBGridSettings,
    DBInformation,
    DBMoleculeRecord,
    DBPCMSettings,
    DBSoftwareProvenance,
)
from openff.recharge.esp.storage.exceptions import IncompatibleDBVersion

if TYPE_CHECKING:
    from openff.toolkit import Molecule

    Array = numpy.ndarray
else:
    from openff.recharge.utilities.pydantic import Array, wrapped_array_validator


[docs]class MoleculeESPRecord(BaseModel): """A record which contains information about the molecule that the ESP was measured for (including the exact conformer coordinates), provenance about how the ESP was calculated, and the values of the ESP at each of the grid points.""" tagged_smiles: str = Field( ..., description="The tagged SMILES patterns (SMARTS) which encodes both the " "molecule stored in this record, a map between the atoms and the molecule and " "their coordinates.", ) conformer: Array[float] = Field( ..., description="The coordinates [Angstrom] of this conformer with " "shape=(n_atoms, 3).", ) grid_coordinates: Array[float] = Field( ..., description="The grid coordinates [Angstrom] which the ESP was calculated on " "with shape=(n_grid_points, 3).", ) esp: Array[float] = Field( ..., description="The value of the ESP [Hartree / e] at each of the grid " "coordinates with shape=(n_grid_points, 1).", ) electric_field: Optional[Array[float]] = Field( ..., description="The value of the electric field [Hartree / (e . a0)] at each of " "the grid coordinates with shape=(n_grid_points, 3).", ) esp_settings: ESPSettings = Field( ..., description="The settings used to generate the ESP stored in this record." ) _validate_conformer = wrapped_array_validator("conformer", unit.angstrom) _validate_grid = wrapped_array_validator("grid_coordinates", unit.angstrom) _validate_esp = wrapped_array_validator("esp", unit.hartree / unit.e) _validate_field = wrapped_array_validator( "electric_field", unit.hartree / (unit.bohr * unit.e) ) @property def conformer_quantity(self) -> Quantity: return self.conformer * unit.angstrom @property def grid_coordinates_quantity(self) -> Quantity: return self.grid_coordinates * unit.angstrom @property def esp_quantity(self) -> Quantity: return self.esp * unit.hartree / unit.e @property def electric_field_quantity(self) -> Quantity: return self.electric_field * unit.hartree / (unit.bohr * unit.e)
[docs] @classmethod def from_molecule( cls, molecule: "Molecule", conformer: Quantity, grid_coordinates: Quantity, esp: Quantity, electric_field: Optional[Quantity], esp_settings: ESPSettings, ) -> "MoleculeESPRecord": """Creates a new ``MoleculeESPRecord`` from an existing molecule object, taking care of creating the InChI and SMARTS representations. Parameters ---------- molecule The molecule to store in the record. conformer The coordinates [Angstrom] of this conformer with shape=(n_atoms, 3). grid_coordinates The grid coordinates [Angstrom] which the ESP was calculated on with shape=(n_grid_points, 3). esp The value of the ESP [Hartree / e] at each of the grid coordinates with shape=(n_grid_points, 1). electric_field The value of the electric field [Hartree / (e . a0)] at each of the grid coordinates with shape=(n_grid_points, 3). esp_settings The settings used to generate the ESP stored in this record. Returns ------- The created record. """ tagged_smiles = molecule.to_smiles( isomeric=True, explicit_hydrogens=True, mapped=True ) return MoleculeESPRecord( tagged_smiles=tagged_smiles, conformer=conformer, grid_coordinates=grid_coordinates, esp=esp, electric_field=electric_field, esp_settings=esp_settings, )
class Config: orm_mode = True
[docs]class MoleculeESPStore: """A class used to store the electrostatic potentials and the grid on which they were evaluated for multiple molecules in multiple conformers, as well as to retrieve and query this stored data. This class currently can only store the data in a SQLite data base. """ @property def db_version(self) -> int: with self._get_session() as db: db_info = db.query(DBInformation).first() return db_info.version @property def general_provenance(self) -> Dict[str, str]: with self._get_session() as db: db_info = db.query(DBInformation).first() return { provenance.key: provenance.value for provenance in db_info.general_provenance } @property def software_provenance(self) -> Dict[str, str]: with self._get_session() as db: db_info = db.query(DBInformation).first() return { provenance.key: provenance.value for provenance in db_info.software_provenance }
[docs] def __init__(self, database_path: str = "esp-store.sqlite"): """ Parameters ---------- database_path The path to the SQLite database to store to and retrieve data from. """ self._database_url = f"sqlite:///{database_path}" self._engine = create_engine(self._database_url, echo=False) DBBase.metadata.create_all(self._engine) self._session_maker = sessionmaker( autocommit=False, autoflush=False, bind=self._engine ) # Validate the DB version if present, or add one if not. with self._get_session() as db: db_info = db.query(DBInformation).first() if not db_info: db_info = DBInformation(version=DB_VERSION) db.add(db_info) if db_info.version != DB_VERSION: raise IncompatibleDBVersion(db_info.version, DB_VERSION)
[docs] def set_provenance( self, general_provenance: Dict[str, str], software_provenance: Dict[str, str], ): """Set the stores provenance information. Parameters ---------- general_provenance A dictionary storing provenance about the store such as the author, which QCArchive data set it was generated from, when it was generated etc. software_provenance A dictionary storing the provenance of the software and packages used to generate the data in the store. """ with self._get_session() as db: db_info: DBInformation = db.query(DBInformation).first() db_info.general_provenance = [ DBGeneralProvenance(key=key, value=value) for key, value in general_provenance.items() ] db_info.software_provenance = [ DBSoftwareProvenance(key=key, value=value) for key, value in software_provenance.items() ]
@contextmanager def _get_session(self) -> ContextManager[Session]: session = self._session_maker() try: yield session session.commit() except BaseException as e: session.rollback() raise e finally: session.close() @classmethod def _db_records_to_model( cls, db_records: List[DBMoleculeRecord] ) -> List[MoleculeESPRecord]: """Maps a set of database records into their corresponding data models. Parameters ---------- db_records The records to map. Returns ------- The mapped data models. """ # noinspection PyTypeChecker return [ MoleculeESPRecord( tagged_smiles=db_conformer.tagged_smiles, conformer=db_conformer.coordinates, grid_coordinates=db_conformer.grid, esp=db_conformer.esp, electric_field=db_conformer.field, esp_settings=ESPSettings( basis=db_conformer.esp_settings.basis, method=db_conformer.esp_settings.method, grid_settings=DBGridSettings.db_to_instance( db_conformer.grid_settings ), pcm_settings=( None if not db_conformer.pcm_settings else DBPCMSettings.db_to_instance(db_conformer.pcm_settings) ), ), ) for db_record in db_records for db_conformer in db_record.conformers ] @classmethod def _store_smiles_records( cls, db: Session, smiles: str, records: List[MoleculeESPRecord] ) -> DBMoleculeRecord: """Stores a set of records which all store information for the same molecule. Parameters ---------- db The current database session. smiles The smiles representation of the molecule. records The records to store. """ existing_db_molecule = ( db.query(DBMoleculeRecord).filter(DBMoleculeRecord.smiles == smiles).first() ) if existing_db_molecule is not None: db_record = existing_db_molecule else: db_record = DBMoleculeRecord(smiles=smiles) # noinspection PyTypeChecker # noinspection PyUnresolvedReferences db_record.conformers.extend( DBConformerRecord( tagged_smiles=record.tagged_smiles, coordinates=record.conformer, grid=record.grid_coordinates, esp=record.esp, field=record.electric_field, grid_settings=DBGridSettings.unique( db, record.esp_settings.grid_settings ), pcm_settings=( None if not record.esp_settings.pcm_settings else DBPCMSettings.unique(db, record.esp_settings.pcm_settings) ), esp_settings=DBESPSettings.unique(db, record.esp_settings), ) for record in records ) if existing_db_molecule is None: db.add(db_record) return db_record @classmethod @functools.lru_cache(10000) def _tagged_to_canonical_smiles(cls, tagged_smiles: str) -> str: """Converts a smiles pattern which contains atom indices into a canonical smiles pattern without indices. Parameters ---------- tagged_smiles The tagged smiles pattern to convert. Returns ------- The canonical smiles pattern. """ from openff.toolkit import Molecule with warnings.catch_warnings(): warnings.simplefilter("ignore", category=AtomMappingWarning) smiles = Molecule.from_smiles( tagged_smiles, allow_undefined_stereo=True ).to_smiles(isomeric=False, explicit_hydrogens=False, mapped=False) return smiles
[docs] def store(self, *records: MoleculeESPRecord): """Store the electrostatic potentials calculated for a given molecule in the data store. Parameters ---------- records The records to store. Returns ------- The records as they appear in the store. """ # Validate an re-partition the records by their smiles patterns. records_by_smiles: Dict[str, List[MoleculeESPRecord]] = defaultdict(list) for record in records: record = MoleculeESPRecord(**record.dict()) smiles = self._tagged_to_canonical_smiles(record.tagged_smiles) records_by_smiles[smiles].append(record) # Store the records. with self._get_session() as db: for smiles in records_by_smiles: self._store_smiles_records(db, smiles, records_by_smiles[smiles])
[docs] def retrieve( self, smiles: Optional[str] = None, basis: Optional[str] = None, method: Optional[str] = None, implicit_solvent: Optional[bool] = None, ) -> List[MoleculeESPRecord]: """Retrieve records stored in this data store, optionally according to a set of filters.""" with self._get_session() as db: db_records = db.query(DBMoleculeRecord) if smiles is not None: smiles = self._tagged_to_canonical_smiles(smiles) db_records = db_records.filter(DBMoleculeRecord.smiles == smiles) if basis is not None or method is not None or implicit_solvent is not None: db_records = db_records.join(DBConformerRecord) if basis is not None or method is not None: db_records = db_records.join( DBESPSettings, DBConformerRecord.esp_settings ) if basis is not None: db_records = db_records.filter(DBESPSettings.basis == basis) if method is not None: db_records = db_records.filter(DBESPSettings.method == method) if implicit_solvent is not None: if implicit_solvent: db_records = db_records.filter( DBConformerRecord.pcm_settings_id.isnot(None) ) else: db_records = db_records.filter( DBConformerRecord.pcm_settings_id.is_(None) ) db_records = db_records.all() records = self._db_records_to_model(db_records) if basis: records = [ record for record in records if record.esp_settings.basis == basis ] if method: records = [ record for record in records if record.esp_settings.method == method ] return records
[docs] def list(self) -> List[str]: """Lists the molecules which exist in and may be retrieved from the store.""" with self._get_session() as db: return [smiles for (smiles,) in db.query(DBMoleculeRecord.smiles).all()]