Source code for openff.evaluator.storage.storage

"""
Defines the base API for the openff-evaluator storage backend.
"""
import abc
import uuid
from collections import defaultdict
from threading import RLock
from typing import Dict

from openff.evaluator.attributes import Attribute
from openff.evaluator.storage.data import (
    BaseStoredData,
    ForceFieldData,
    HashableStoredData,
    ReplaceableData,
)


class StorageBackend(abc.ABC):
    """Abstract base class defining how openff-evaluator interacts
    with, and stores data in, a storage backend.

    Notes
    -----
    Subclasses should override only the private methods: the public
    methods mostly just wrap their private counterparts in a thread
    lock, while the private versions perform the actual work.
    """

    class _ObjectKeyData(BaseStoredData):
        """Bookkeeping object recording which items a `StorageBackend`
        currently holds."""

        # Maps each stored data-type name onto the list of unique keys
        # registered for that type.
        object_keys = Attribute(
            docstring="The unique keys of the objects stored in a `StorageBackend`.",
            type_hint=dict,
            default_value=dict(),
        )

        @classmethod
        def has_ancillary_data(cls):
            # The key registry never carries directory-like data.
            return False

        def to_storage_query(self):
            # The registry is internal bookkeeping and is never the
            # target of a query, so no implementation is needed.
            raise NotImplementedError()
[docs] def __init__(self): """Constructs a new StorageBackend object.""" self._stored_object_keys = None self._stored_object_keys_id = "object_keys" # Store a map between the unique id of a stored object, # and its hash value for easy comparision. self._object_hashes: Dict[int, str] = dict() # Create a thread lock to prevent concurrent # thread access. self._lock = RLock() self._load_stored_object_keys()
def _load_stored_object_keys(self): """Load the unique key to each object stored in the storage system. """ keys_object, _ = self._retrieve_object(self._stored_object_keys_id) if keys_object is None: keys_object = StorageBackend._ObjectKeyData() assert isinstance(keys_object, StorageBackend._ObjectKeyData) stored_object_keys = keys_object.object_keys self._stored_object_keys = defaultdict(list) all_object_keys = set() for data_type in stored_object_keys: for unique_key in stored_object_keys[data_type]: if not self._object_exists(unique_key): # The stored entry key does not exist in the system, # so skip the entry. This may happen when the local # file do not exist on disk any more for example. continue if unique_key in all_object_keys: raise KeyError( "Two objects with the same unique key have been found." ) stored_object, _ = self.retrieve_object(unique_key) # Make sure the data matches the expected type and is valid. assert stored_object.__class__.__name__ == data_type stored_object.validate() if isinstance(stored_object, HashableStoredData): self._object_hashes[hash(stored_object)] = unique_key self._stored_object_keys[data_type].append(unique_key) all_object_keys.add(unique_key) # Store a fresh copy of the key dictionary so that only entries # that exist in the system actually referenced. self._save_stored_object_keys() def _save_stored_object_keys(self): """Save the unique key of each of the objects stored in the storage system.""" keys_object = StorageBackend._ObjectKeyData() keys_object.object_keys = self._stored_object_keys self._store_object(keys_object, self._stored_object_keys_id) @abc.abstractmethod def _object_exists(self, storage_key): """Check whether an object with the specified key exists in the storage system. Parameters ---------- storage_key: str A unique key that describes where the stored object can be found within the storage system. Returns ------- True if the object is within the storage system. 
""" raise NotImplementedError() def _is_key_unique(self, storage_key): """Checks whether a given key is already in the storage system. Parameters ---------- storage_key: str The key to check for. Returns ------- bool `True` if the key exists in the system, `False` otherwise. """ # Make sure the key in unique. return not any( storage_key in self._stored_object_keys[data_type] for data_type in self._stored_object_keys ) @abc.abstractmethod def _store_object( self, object_to_store, storage_key=None, ancillary_data_path=None ): """The internal implementation of the `store_object` method. It is safe to assume here that all object and key validation has already been performed, and that this method is called under a thread lock. Notes ----- This method should overwrite any existing data with the same key. Parameters ---------- object_to_store: BaseStoredData The object to store. storage_key: str, optional A unique key to associate with the stored object. If `None`, one will be randomly generated ancillary_data_path: str, optional The data path to the ancillary directory-like data to store alongside the object if the data type requires one. """ raise NotImplementedError()
[docs] def store_object(self, object_to_store, ancillary_data_path=None): """Store an object in the storage system, returning the key of the stored object. This may be different to `storage_key` depending on whether the same or a similar object was already present in the system. Parameters ---------- object_to_store: BaseStoredData The object to store. ancillary_data_path: str, optional The data path to the ancillary directory-like data to store alongside the object if the data type requires one. Returns ------- str The unique key assigned to the stored object. """ # Make sure the object is valid. if object_to_store is None: raise ValueError("The object to store cannot be None.") object_to_store.validate() # Make sure the object is a supported type. if not isinstance(object_to_store, BaseStoredData): raise ValueError( "Only objects inheriting from `BaseStoredData` can " "be stored in the storage system." ) # Make sure we have ancillary data if required. object_class = object_to_store.__class__ if object_class.has_ancillary_data() and ancillary_data_path is None: raise ValueError("This object requires ancillary data.") # Check whether the exact same object already exists within # the storage system based on its hash. storage_key = self.has_object(object_to_store) if storage_key is not None: if not isinstance(object_to_store, ReplaceableData): # Handle the case where the existing data # should be returned, rather than storing # the passed object. return storage_key existing_object, _ = self.retrieve_object(storage_key, ReplaceableData) # noinspection PyTypeChecker object_to_store = object_to_store.most_information( existing_object, object_to_store ) if object_to_store is None: raise ValueError( "Something went wrong when trying to " "determine whether the object trying to " "be stored is redundant." ) elif object_to_store == existing_object: # Don't try to re-store the existing object. return storage_key else: # Generate a unique id for this object. 
while storage_key is None or not self._is_key_unique(storage_key): storage_key = str(uuid.uuid4()).replace("-", "") # Hash this object if appropriate if isinstance(object_to_store, HashableStoredData): self._object_hashes[hash(object_to_store)] = storage_key # Save the object into the storage system with the given key. with self._lock: self._store_object(object_to_store, storage_key, ancillary_data_path) # Register the key in the storage system. if ( not isinstance(object_to_store, StorageBackend._ObjectKeyData) and storage_key not in self._stored_object_keys[object_class.__name__] ): self._stored_object_keys[object_class.__name__].append(storage_key) self._save_stored_object_keys() return storage_key
[docs] def store_force_field(self, force_field): """A convenience method for storing `ForceFieldSource` objects. Parameters ---------- force_field: ForceFieldSource The force field to store. Returns ------- str The unique id of the stored force field. """ force_field_data = ForceFieldData() force_field_data.force_field_source = force_field return self.store_object(force_field_data)
@abc.abstractmethod def _retrieve_object(self, storage_key, expected_type=None): """The internal implementation of the `retrieve_object` method. It is safe to assume that this method is called under a thread lock. Parameters ---------- storage_key: str A unique key that describes where the stored object can be found within the storage system. expected_type: type of BaseStoredData, optional The expected data type. An exception is raised if the retrieved data doesn't match the type. Returns ------- BaseStoredData, optional The stored object if the object key is found, otherwise None. str, optional The path to the ancillary data if present. """ raise NotImplementedError()
[docs] def retrieve_object(self, storage_key, expected_type=None): """Retrieves a stored object for the estimators storage system. Parameters ---------- storage_key: str A unique key that describes where the stored object can be found within the storage system. expected_type: type of BaseStoredData, optional The expected data type. An exception is raised if the retrieved data doesn't match the type. Returns ------- BaseStoredData, optional The stored object if the object key is found, otherwise None. str, optional The path to the ancillary data if present. """ with self._lock: return self._retrieve_object(storage_key, expected_type)
[docs] def retrieve_force_field(self, storage_key): """A convenience method for retrieving `ForceFieldSource` objects. Parameters ---------- storage_key: str The key of the force field to retrieve. Returns ------- ForceFieldSource The retrieved force field source. """ force_field_data, _ = self.retrieve_object(storage_key, ForceFieldData) if force_field_data is None: raise KeyError( f"The force field with id {storage_key} does not exist " f"in the storage system." ) return force_field_data.force_field_source
def _has_object(self, storage_object): """The internal implementation of the `has_object` method. It is safe to assume that this method is called under a thread lock. Parameters ---------- storage_object: BaseStoredData The object to check for. Returns ------- str, optional The unique key of the object if it is in the system, `None` otherwise. """ if isinstance(storage_object, HashableStoredData): hash_key = hash(storage_object) return self._object_hashes.get(hash_key, None) data_query = storage_object.to_storage_query() query_results = self.query(data_query) if len(query_results) == 0: return None if len(query_results) > 1 or len(query_results[0]) > 1: raise ValueError( "The backend contains multiple copies of the " "same piece of data. This should not be possible." ) storage_key, _, _ = next(iter(query_results.values()))[0] return storage_key
[docs] def has_object(self, storage_object): """Checks whether a given hashable object exists in the storage system. Parameters ---------- storage_object: BaseStoredData The object to check for. Returns ------- str, optional The unique key of the object if it is in the system, `None` otherwise. """ with self._lock: return self._has_object(storage_object)
[docs] def has_force_field(self, force_field): """A convenience method for checking whether the specified `ForceFieldSource` object is stored in the backend. Parameters ---------- force_field: ForceFieldSource The force field to look for. Returns ------- str, optional The unique key of the object if it is in the system, `None` otherwise. """ force_field_data = ForceFieldData() force_field_data.force_field_source = force_field return self.has_object(force_field_data)
def _query(self, data_query): """The internal implementation of the `query` method. It is safe to assume that this method is called under a thread lock. Parameters ---------- data_query: BaseDataQuery The query to perform. Returns ------- dict of tuple and list of tuple of str, BaseStoredData and str The data that matches the query partitioned by the matched values. The list values take the form (storage_key, data_object, data_directory_path). """ data_class = data_query.data_class() results = defaultdict(list) if len(self._stored_object_keys.get(data_class.__name__, [])) == 0: # Exit early of there are no objects of the correct type. return results for unique_key in self._stored_object_keys[data_class.__name__]: if not self._object_exists(unique_key): # Make sure the object is still in the system. continue stored_object, stored_directory = self.retrieve_object( unique_key, data_class ) matches = data_query.apply(stored_object) if matches is None: continue results[matches].append((unique_key, stored_object, stored_directory)) return results
[docs] def query(self, data_query): """Query the storage backend for data matching the query criteria. Parameters ---------- data_query: BaseDataQuery The query to perform. Returns ------- dict of tuple and list of tuple of str, BaseStoredData and str The data that matches the query partitioned by the matched values. The list values take the form (storage_key, data_object, data_directory_path). """ with self._lock: return self._query(data_query)