Source code for openff.evaluator.datasets.curation.components.thermoml

import glob
import io
import logging
import os
import tarfile
from multiprocessing import Pool
from typing import List, Optional, Union

import pandas
import requests
from pydantic import Field, HttpUrl
from typing_extensions import Literal

from openff.evaluator.datasets.curation.components import (
from openff.evaluator.datasets.thermoml import ThermoMLDataSet
from openff.evaluator.utils.utils import temporarily_change_directory

logger = logging.getLogger(__name__)

def _default_journals():
    return ["JCED", "JCT", "FPE", "TCA", "IJT"]

[docs]class ImportThermoMLDataSchema(CurationComponentSchema): type: Literal["ImportThermoMLData"] = "ImportThermoMLData" retain_uncertainties: bool = Field( True, description="If False, all uncertainties in measured property values will be " "stripped from the final data set.", ) cache_file_name: Optional[str] = Field( None, description="The path to the file to store the output of this component " "into, and to restore the output of this component from.", ) journal_names: List[Literal["JCED", "JCT", "FPE", "TCA", "IJT"]] = Field( default_factory=_default_journals, description="The abbreviated names of the journals to import data from.", ) root_archive_url: HttpUrl = Field( default="", description="The root url where the ThermoML archives can be downloaded from.", )
[docs]class ImportThermoMLData(CurationComponent): """A component which will import all supported data from the NIST ThermoML archive for (optionally) specified journals. """ @classmethod def _download_data(cls, schema: ImportThermoMLDataSchema): for journal in schema.journal_names: # Download the archive of all properties from the journal. request = requests.get( f"{schema.root_archive_url}/{journal}.tgz", stream=True ) # Make sure the request went ok. try: request.raise_for_status() except requests.exceptions.HTTPError as error: print(error.response.text) raise # Unzip the files into the temporary directory. tar_file = tar_file.extractall() @classmethod def _process_archive(cls, file_path: str) -> pandas.DataFrame: logger.debug(f"Processing {file_path}") # noinspection PyBroadException try: data_set = ThermoMLDataSet.from_file(file_path) except Exception: logger.exception( f"An exception was raised when processing {file_path}. This file will " f"be skipped." ) return pandas.DataFrame() # A data set will be none if no 'valid' properties were found # in the archive file. if data_set is None: return pandas.DataFrame() data_frame = data_set.to_pandas() return data_frame @classmethod def _apply( cls, data_frame: pandas.DataFrame, schema: ImportThermoMLDataSchema, n_processes, ) -> pandas.DataFrame: if schema.cache_file_name is not None and os.path.isfile( schema.cache_file_name ): cached_data = pandas.read_csv(schema.cache_file_name) return cached_data with temporarily_change_directory(): logger.debug("Downloading archive data") cls._download_data(schema) # Get the names of the extracted files file_names = glob.glob("*.xml") logger.debug("Processing archives") with Pool(processes=n_processes) as pool: data_frames = [*pool.imap(cls._process_archive, file_names)] pool.join() logger.debug("Joining archives") thermoml_data_frame = pandas.concat(data_frames, ignore_index=True, sort=False) for header in thermoml_data_frame: if header.find(" Uncertainty ") >= 0 and not schema.retain_uncertainties: thermoml_data_frame = thermoml_data_frame.drop(header, axis=1) data_frame = pandas.concat( [data_frame, thermoml_data_frame], ignore_index=True, sort=False ) if schema.cache_file_name is not None: data_frame.to_csv(schema.cache_file_name, index=False) return data_frame
ThermoMLComponentSchema = Union[ImportThermoMLDataSchema]