Source code for openff.bespokefit.executor.utilities.redis

import atexit
import functools
import os
import shlex
import shutil
import subprocess
import time
from typing import IO, Dict, Optional, Tuple, Union

import redis

from openff.bespokefit.executor.services import current_settings

__REDIS_VERSION: int = 1
__CONNECTION_POOL: Dict[
    Tuple[str, int, Optional[int], Optional[str], bool], redis.Redis
] = {}


[docs]class RedisNotConfiguredError(BaseException): """An exception raised when connecting to a redis server that doesn't appear to have been configured by `openff-bespokefit`. """
[docs]class RedisBadConfigurationError(BaseException): """An exception raised when connecting to a redis server that doesn't appear to have been correctly configured for use with `openff-bespokefit`. """
[docs]def expected_redis_config_version() -> int: return __REDIS_VERSION
[docs]def connect_to_default_redis(validate: bool = True) -> redis.Redis: """Connects to a redis server using the settings defined by the `BEFLOW_REDIS_ADDRESS`, `BEFLOW_REDIS_PORT`, `BEFLOW_REDIS_DB` and `BEFLOW_REDIS_PASSWORD` settings. """ settings = current_settings() return connect_to_redis( host=settings.BEFLOW_REDIS_ADDRESS, port=settings.BEFLOW_REDIS_PORT, db=settings.BEFLOW_REDIS_DB, password=settings.BEFLOW_REDIS_PASSWORD, validate=validate, )
[docs]def connect_to_redis( host: str, port: int, db: int, validate: bool = True, password: Optional[str] = None ) -> redis.Redis: """Connects to a redis server using the specified settings.""" connection_key = (host, port, db, password, validate) if connection_key in __CONNECTION_POOL: return __CONNECTION_POOL[connection_key] connection = redis.Redis(host=host, port=port, db=db, password=password) if validate: version = connection.get("openff-bespokefit:redis-version") if version is None: raise RedisNotConfiguredError( f"The redis server at host={host} and port={port} does not contain a " f"`openff-bespokefit:redis-version` key. This likely means it was not " f"configured for use with OpenFF BespokeFit. Alternatively if you have " f"just updated to a new version of OpenFF BespokeFit, try deleting any " f"old `redis.db` files." ) elif int(version) != __REDIS_VERSION: raise RedisBadConfigurationError( f"The redis server at host={host} and port={port} expects a version of " f"OpenFF BespokeFit that supports a redis configurations with version " f"{version}, while the current version only supports version " f"{__REDIS_VERSION}." ) __CONNECTION_POOL[connection_key] = connection return connection
[docs]def is_redis_available( host: str, port: int = 6363, password: Optional[str] = None ) -> bool: """Returns whether a server running on the local host on a particular port is available. """ redis_client = redis.Redis(host=host, port=port, password=password) try: redis_client.get("null") except (redis.exceptions.ConnectionError, redis.exceptions.BusyLoadingError): return False return True
def _cleanup_redis(redis_process: subprocess.Popen): redis_process.terminate()
[docs]def launch_redis( port: int = 6363, stderr_file: Optional[Union[IO, int]] = None, stdout_file: Optional[Union[IO, int]] = None, directory: Optional[str] = None, persistent: bool = True, terminate_at_exit: bool = True, ) -> subprocess.Popen: settings = current_settings() redis_server_path = shutil.which("redis-server") if redis_server_path is None: raise RuntimeError( "The `redis-server` command could not be found. Please make sure `redis` is " "correctly installed." ) redis_cli_path = shutil.which("redis-cli") if redis_cli_path is None: raise RuntimeError( "The `redis-cli` command could not be found. Please make sure `redis` is " "correctly installed." ) if is_redis_available( host="localhost", port=port, password=settings.BEFLOW_REDIS_PASSWORD ): raise RuntimeError(f"There is already a server running at localhost:{port}") redis_save_exists = os.path.isfile( "redis.db" if not directory else os.path.join(directory, "redis.db") ) # to allow connections from other machines we need a default user password redis_command = f"redis-server --port {str(port)} --dbfilename redis.db --requirepass {settings.BEFLOW_REDIS_PASSWORD}" if directory: redis_command = f"{redis_command} --dir {directory}" if persistent: redis_command = ( f"{redis_command} --save 900 1 --save 300 100 --save 60 200 --save 15 1000" ) redis_process = subprocess.Popen( shlex.split(redis_command), stderr=stderr_file, stdout=stdout_file, preexec_fn=os.setpgrp, ) if terminate_at_exit: atexit.register(functools.partial(_cleanup_redis, redis_process)) timeout = True for i in range(0, 60): if is_redis_available( host="localhost", port=port, password=settings.BEFLOW_REDIS_PASSWORD ): timeout = False break time.sleep(1.0) if timeout: raise RuntimeError("The redis server failed to start.") try: connect_to_redis( host="localhost", port=port, db=settings.BEFLOW_REDIS_DB, password=settings.BEFLOW_REDIS_PASSWORD, validate=True, ) except RedisNotConfiguredError: if redis_save_exists: raise connection = connect_to_redis( host="localhost", port=port, db=settings.BEFLOW_REDIS_DB, password=settings.BEFLOW_REDIS_PASSWORD, validate=False, ) connection.set( "openff-bespokefit:redis-version", expected_redis_config_version() ) return redis_process