diff --git a/Justfile b/Justfile index 511fbd9a..e4d537d4 100755 --- a/Justfile +++ b/Justfile @@ -3,6 +3,8 @@ source "${VSI_COMMON_DIR}/linux/just_env" "$(dirname "${BASH_SOURCE[0]}")"/'terra.env' # Plugins +source "${VSI_COMMON_DIR}/linux/ask_question" +source "${VSI_COMMON_DIR}/linux/command_tools.bsh" source "${VSI_COMMON_DIR}/linux/docker_functions.bsh" source "${VSI_COMMON_DIR}/linux/just_docker_functions.bsh" source "${VSI_COMMON_DIR}/linux/just_singularity_functions.bsh" @@ -28,7 +30,13 @@ JUST_DEFAULTIFY_FUNCTIONS+=(terra_caseify) function Terra_Pipenv() { if [[ ${TERRA_LOCAL-} == 1 ]]; then - PIPENV_PIPFILE="${TERRA_CWD}/Pipfile" pipenv ${@+"${@}"} || return $? + if [ -n "${VIRTUAL_ENV+set}" ]; then + echo "Warning: You appear to be in a virtual env" >&2 + echo "Deactivate external virtual envs before running just" >&2 + ask_question "Continue?" answer_continue n + [ "$answer_continue" == "0" ] && return 1 + fi + ${DRYRUN} env PIPENV_PIPFILE="${TERRA_CWD}/Pipfile" pipenv ${@+"${@}"} || return $? else Just-docker-compose -f "${TERRA_CWD}/docker-compose-main.yml" run ${TERRA_PIPENV_IMAGE-terra} pipenv ${@+"${@}"} || return $? fi @@ -115,10 +123,6 @@ function terra_caseify() extra_args=$# ;; - run_redis) # Run redis - Just-docker-compose -f "${TERRA_CWD}/docker-compose.yml" run redis ${@+"${@}"} - extra_args=$# - ;; run_celery) # Starts a celery worker local node_name if [[ ${TERRA_LOCAL-} == 1 ]]; then @@ -127,7 +131,23 @@ function terra_caseify() node_name="docker@%h" fi - Terra_Pipenv run celery -A terra.executor.celery.app worker --loglevel="${TERRA_CELLER_LOG_LEVEL-INFO}" -n "${node_name}" + # Untested + if [ "${OS-}" = "Windows_NT" ]; then + # https://www.distributedpython.com/2018/08/21/celery-4-windows/ + local FORKED_BY_MULTIPROCESSING + export FORKED_BY_MULTIPROCESSING=1 + fi + + # We might be able to use CELERY_LOADER to avoid the -A argument + Terra_Pipenv run python -m terra.executor.celery -A terra.executor.celery.app worker --loglevel="${TERRA_CELERY_LOG_LEVEL-INFO}" -n "${node_name}" + ;; + + run_flower) # Start the flower server + # Flower doesn't actually need the tasks loaded in the app, so clear it + TERRA_CELERY_INCLUDE='[]' Terra_Pipenv run python -m terra.executor.celery -A terra.executor.celery.app flower + ;; + shutdown_celery) # Shuts down all celery workers on all nodes + Terra_Pipenv run python -c "from terra.executor.celery import app; app.control.broadcast('shutdown')" ;; ### Run Debugging containers ### @@ -203,7 +223,8 @@ function terra_caseify() terra_test-pep8) # Run pep8 test justify terra pep8 echo "Running flake8..." 
-  Terra_Pipenv run bash -c 'flake8 \
+  Terra_Pipenv run bash -c 'cd ${TERRA_TERRA_DIR};
+                            flake8 \
                             "${TERRA_TERRA_DIR}/terra"'
       ;;

@@ -240,6 +261,98 @@ function terra_caseify()
       extra_args=$#
       ;;

+  terra_setup) # Set up pipenv using the system python and/or conda
+    local output_dir
+    local CONDA
+    local PYTHON
+    local download_conda=0
+
+    parse_args extra_args --dir output_dir: --python PYTHON: --conda CONDA: --download download_conda -- ${@+"${@}"}
+
+    if [ -z "${output_dir:+set}" ]; then
+      echo "--dir must be specified" >&2
+      exit 2
+    fi
+
+    mkdir -p "${output_dir}"
+    # convert relative path to absolute
+    output_dir="$(cd "${output_dir}"; pwd)"
+
+    local use_conda
+    local platform_bin
+    if [ "${OS-}" = "Windows_NT" ]; then
+      platform_bin=Scripts
+    else
+      platform_bin=bin
+    fi
+
+    if [ -n "${PYTHON:+set}" ]; then
+      use_conda=0
+    elif [ -n "${CONDA:+set}" ]; then
+      use_conda=1
+    else
+      if [ "${download_conda}" == "0" ] && command -v python3 &> /dev/null; then
+        PYTHON=python3
+        use_conda=0
+      elif [ "${download_conda}" == "0" ] && command -v python &> /dev/null; then
+        PYTHON=python
+        use_conda=0
+      elif [ "${download_conda}" == "0" ] && command -v conda3 &> /dev/null; then
+        CONDA=conda3
+        use_conda=1
+      elif [ "${download_conda}" == "0" ] && command -v conda &> /dev/null; then
+        CONDA=conda
+        use_conda=1
+      elif [ "${download_conda}" == "0" ] && command -v conda2 &> /dev/null; then
+        CONDA=conda2
+        use_conda=1
+      else
+        source "${VSI_COMMON_DIR}/linux/web_tools.bsh"
+        source "${VSI_COMMON_DIR}/linux/dir_tools.bsh"
+        make_temp_path temp_dir -d
+        if [ "${OS-}" = "Windows_NT" ]; then
+          download_to_stdout "https://repo.anaconda.com/miniconda/Miniconda3-latest-Windows-x86_64.exe" > "${temp_dir}/install_conda.exe"
+          MSYS2_ARG_CONV_EXCL="*" "${temp_dir}/install_conda.exe" /NoRegistry=1 /InstallationType=JustMe /S "/D=$(cygpath -aw "${temp_dir}/conda")"
+          CONDA="${temp_dir}/conda/Scripts/conda"
+        else
+          if [[ ${OSTYPE-} = darwin* ]]; then
+            URL="https://repo.anaconda.com/miniconda/Miniconda3-latest-MacOSX-x86_64.sh"
+          else
+            URL="https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh"
+          fi
+          download_to_stdout "${URL}" > "${temp_dir}/install_conda.sh"
+          bash "${temp_dir}/install_conda.sh" -b -p "${temp_dir}/conda" -s
+          CONDA="${temp_dir}/conda/bin/conda"
+        fi
+        use_conda=1
+      fi
+    fi
+
+    if [ "${use_conda}" = "1" ]; then
+      "${CONDA}" create -y -p "${output_dir}/.python" 'python<=3.8'
+      PYTHON="${output_dir}/.python/${platform_bin}/python"
+    fi
+
+    # Make sure python is 3.6 or newer (and older than 3.9)
+    local python_version="$("${PYTHON}" --version | awk '{print $2}')"
+    source "${VSI_COMMON_DIR}/linux/requirements.bsh"
+    if ! meet_requirements "${python_version}" '>=3.6' '<3.9'; then
+      echo "Python version ${python_version} does not meet the expected requirements" >&2
+      read -srn1 -d '' -p "Press any key to continue"
+      echo
+    fi
+
+    source "${VSI_COMMON_DIR}/docker/recipes/get-pipenv"
+    PIPENV_VIRTUALENV="${output_dir}" install_pipenv
+
+    local add_to_local
+    echo "" >&2
+    ask_question "Do you want to add \"${output_dir}/${platform_bin}\" to your local.env automatically?" add_to_local y
+    if [ "${add_to_local}" == "1" ]; then
+      echo $'\n'"PATH=\"${output_dir}/${platform_bin}:\${PATH}\"" >> "${TERRA_CWD}/local.env"
+    fi
+    ;;
+  terra_pipenv) # Run pipenv commands in Terra's pipenv container. Useful for \
+               # installing/updating pipenv packages into terra
+    TERRA_PIPENV_IMAGE=terra_pipenv Terra_Pipenv ${@+"${@}"}
+    ;;
diff --git a/README.md b/README.md
index 255a3c3a..3bc65ef8 100644
--- a/README.md
+++ b/README.md
@@ -17,3 +17,27 @@
 source 'setup.env'
 just terra sync
 just terra run
 ```
+
+## Setting up pipenv
+
+For a number of reasons, a `pipenv` running Python 3.6 or newer may not be available, especially on older operating systems. To automatically set up `pipenv` in a directory for you, run `just terra setup --dir {directory to install pipenv in}`. This does not require elevated permissions.
+
+`just terra setup` will attempt to set up pipenv using a series of strategies:
+
+1. It will look for a Python 3 executable (`python3` or `python`). If one is found, it will be used to set up `pipenv`
+   - A specific python executable can be specified using the `--python` flag
+2. If `python` cannot be found, it will look for a `conda3`/`conda`/`conda2` executable and use that to first set up a supported Python 3, and then set up `pipenv`
+   - A specific conda executable can be specified using the `--conda` flag
+3. If all else fails, Miniconda will be downloaded from the internet, installed, and used to first set up a supported Python 3, and then set up `pipenv`
+
+If an invalid version of python or conda is detected, the download approach can be forced using the `--download` flag.
+
+Once `pipenv` is set up, it should be added to your `PATH` using the `local.env` file. This will be done for you if you answer yes to the final question at the end.
+
+## Running an app in celery
+
+1. `just terra up` - To start the redis queue (only once)
+2. `just run celery` - To start a celery worker (run on each worker node)
+3. `just run dsm ...` - To start the processing job
+
+When done:
+
+4. `just shutdown celery` - To shut down _all_ celery workers on _all_ nodes
+5. `just terra down` - To shut down redis.
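+
+For example, a first full run might look like this (the `--dir` value is just
+an example location):
+
+```
+just terra setup --dir ~/terra-pipenv   # one time, if pipenv is unavailable
+just terra up
+just run celery                         # on each worker node
+just run dsm ...
+just shutdown celery
+just terra down
+```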
\ No newline at end of file diff --git a/docker-compose-main.yml b/docker-compose-main.yml index ea86c742..3176dfec 100644 --- a/docker-compose-main.yml +++ b/docker-compose-main.yml @@ -6,17 +6,17 @@ services: dockerfile: docker/terra.Dockerfile # prevent different users from clobbering each others images image: ${TERRA_DOCKER_REPO}:terra_${TERRA_USERNAME} - environment: + environment: &terra_environment # Variables for docker_entrypoint.bsh - - DOCKER_UID=${TERRA_UID} - - DOCKER_GIDS=${TERRA_GIDS} - - DOCKER_GROUP_NAMES=${TERRA_GROUP_NAMES} - - DOCKER_USERNAME=user - - DISPLAY - - JUSTFILE=${TERRA_TERRA_DIR_DOCKER}/docker/terra.Justfile - - JUST_SETTINGS=${TERRA_TERRA_DIR_DOCKER}/terra.env - - PYTHONPATH=${TERRA_PYTHONPATH-} - - TZ + DOCKER_UID: ${TERRA_UID} + DOCKER_GIDS: ${TERRA_GIDS} + DOCKER_GROUP_NAMES: ${TERRA_GROUP_NAMES} + DOCKER_USERNAME: user + JUSTFILE: ${TERRA_TERRA_DIR_DOCKER}/docker/terra.Justfile + JUST_SETTINGS: ${TERRA_TERRA_DIR_DOCKER}/terra.env + PYTHONPATH: ${TERRA_PYTHONPATH-} + DISPLAY: + TZ: cap_add: - SYS_PTRACE # Useful for gdb volumes: @@ -38,6 +38,12 @@ services: source: terra-venv target: /venv + terra-demo: + <<: *terra + environment: + <<: *terra_environment + TERRA_SETTINGS_FILE: + redis-commander: image: rediscommander/redis-commander ports: @@ -46,14 +52,14 @@ services: - source: redis_secret target: ${TERRA_REDIS_SECRET_DOCKER} - source: redis_commander_secret - target: ${TERRA_REDIS_COMMANDER_SECRET_FILE} + target: ${TERRA_REDIS_COMMANDER_SECRET} command: | sh -c ' echo -n '"'"'{ "connections":[ { "password": "'"'"' > /redis-commander/config/local-production.json - cat /run/secrets/redis_password | sed '"'"'s|\\|\\\\|g;s|"|\\"|g'"'"' >> /redis-commander/config/local-production.json + cat /run/secrets/${TERRA_REDIS_SECRET_DOCKER} | sed '"'"'s|\\|\\\\|g;s|"|\\"|g'"'"' >> /redis-commander/config/local-production.json echo -n '"'"'", "host": "${TERRA_REDIS_HOSTNAME_DOCKER}", "label": "terra", @@ -68,7 +74,7 @@ services: "httpAuth": { "username": "admin", "passwordHash": "'"'"'>> /redis-commander/config/local-production.json - cat "/run/secrets/${TERRA_REDIS_COMMANDER_SECRET_FILE}" | sed '"'"'s|\\|\\\\|g;s|"|\\"|g'"'"' >> /redis-commander/config/local-production.json + cat "/run/secrets/${TERRA_REDIS_COMMANDER_SECRET}" | sed '"'"'s|\\|\\\\|g;s|"|\\"|g'"'"' >> /redis-commander/config/local-production.json echo '"'"'" } } diff --git a/docs/conf.py b/docs/conf.py index 1e3a542e..c0615110 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -51,6 +51,7 @@ intersphinx_mapping = { 'python': ('https://docs.python.org/3.6', None), 'vsi_common': ('https://visionsystemsinc.github.io/vsi_common/', None), + 'celery': ('https://docs.celeryproject.org/en/stable/', None) } # Add any paths that contain templates here, relative to this directory. @@ -103,6 +104,8 @@ ('py:class', 'json.encoder.JSONEncoder'), ('py:class', 'concurrent.futures._base.Executor'), ('py:class', 'concurrent.futures._base.Future'), + ('py:class', 'concurrent.futures.process.ProcessPoolExecutor'), + ('py:class', 'concurrent.futures.thread.ThreadPoolExecutor'), ('py:class', 'argparse._AppendAction'), ('py:data', 'logging.DEBUG'), ('py:data', 'logging.WARNING'), diff --git a/docs/terra/settings.rst b/docs/terra/settings.rst index 2ad9e400..21bc38dc 100644 --- a/docs/terra/settings.rst +++ b/docs/terra/settings.rst @@ -1,6 +1,19 @@ .. _settings: +Terra Settings +-------------- + +.. 
option:: terra.zone

+  Terra can run in one of three areas of execution, or "zones": the master
+  controller (``controller``), a service runner (``runner``), or a task
+  (``task``). The different zones could all be running on the main host, or
+  on other containers or computers, depending on the compute and executor.
+
+  The master controller includes the CLI, workflow, stage, and service
+  definition layers.
+
+  This variable is updated automatically, and should only be read.
+
+  Default: ``controller``
+
 Workflow Settings
 -----------------
diff --git a/external/vsi_common b/external/vsi_common
index c98eb284..f1d5e62a 160000
--- a/external/vsi_common
+++ b/external/vsi_common
@@ -1 +1 @@
-Subproject commit c98eb284c89311e5b1d89a949660a0fa24df3818
+Subproject commit f1d5e62aa8be566fe1ba88095ba9386819b2379d
diff --git a/setup.py b/setup.py
index d03058cc..3b2f8cff 100644
--- a/setup.py
+++ b/setup.py
@@ -11,6 +11,8 @@
       extra_requires=extra_requires,
       install_requires=[
           "jstyleson",
-          "envcontext"
+          "envcontext",
+          # signal and task from celery are used unconditionally
+          "celery"
       ]
   )
diff --git a/terra.env b/terra.env
index 784ab0df..2449de1a 100644
--- a/terra.env
+++ b/terra.env
@@ -42,7 +42,6 @@ if [ -d "/tmp/.X11-unix" ]; then
                     ${TERRA_VOLUMES+"${TERRA_VOLUMES[@]}"})
 fi

-: ${TERRA_DOCKER_RUNTIME="$([[ "$(nvidia-docker version 2>/dev/null)" = "NVIDIA Docker: 2"* ]] && echo nvidia)"}
 # Redis values
 : ${TERRA_REDIS_PORT=6379}
 : ${TERRA_REDIS_PORT_DOCKER=6379}
@@ -72,16 +71,28 @@ fi
 if [[ ! -f /.dockerenv && ! -s "${TERRA_REDIS_SECRET_FILE}" ]]; then
   source "${VSI_COMMON_DIR}/linux/random.bsh"
-  # No quotes allowed
-  urandom_password 20 '\x21\x23-\x26\x28-\x7E' > "${TERRA_REDIS_SECRET_FILE}"
+  # Allow printable ASCII characters except quotes, ';' (for an unknown redis/celery parsing reason), and ':' or '@' (for redis URL reasons)
+  urandom_password 20 '\x21\x23-\x26\x28-\x39\x3c-\x3f\x41-\x7E' > "${TERRA_REDIS_SECRET_FILE}"
 fi

#**
# .. envvar:: TERRA_CELERY_MAIN_NAME
#
-# Name of the main module if running as __main__. This is used as the prefix for auto-generated task names.
+# (Optional) Name of the main module if running as ``__main__``. This is used as the prefix for auto-generated task names that are defined in the same module as ``__main__`` (usually a result of ``python -m``). Python will first try ``sys.modules['__main__'].__spec__.name``, and only fall back to this value when that fails.
+#
+# .. envvar:: TERRA_KEEP_TEMP_DIR
+#
+# Optional environment variable that, when set to ``1``, will keep the temporary config files generated for containers. For debug use.
+#
+# .. envvar:: TERRA_DISABLE_SETTINGS_DUMP
+#
+# Optional environment variable that, when set to ``1``, will disable the saving of ``settings.json`` files in the processing dir. This is particularly useful for test scripts or jupyter notebooks where you do not want to litter ``settings.json`` files everywhere. For debug use.
+#
+# .. envvar:: TERRA_DISABLE_TERRA_LOG
+#
+# Optional environment variable that, when set to ``1``, will disable the saving of the ``terra_log`` file in the processing dir. This is particularly useful for test scripts or jupyter notebooks where you do not want to litter ``terra_log`` files everywhere. For debug use.
#**

-: ${TERRA_CELERY_MAIN_NAME=terra}
+

#**
# .. 
envvar:: TERRA_CELERY_CONF # diff --git a/terra/compute/base.py b/terra/compute/base.py index 9197d192..989e89d2 100644 --- a/terra/compute/base.py +++ b/terra/compute/base.py @@ -1,8 +1,17 @@ import os - +import time +import atexit +from logging import StreamHandler +from logging.handlers import SocketHandler +import threading +import warnings + +from terra import settings import terra.compute.utils from terra.executor import Executor -from terra.logger import getLogger +from terra.logger import ( + getLogger, LogRecordSocketReceiver, SkipStdErrAddFilter +) logger = getLogger(__name__) @@ -63,9 +72,6 @@ def add_volume(self, local, remote, flags=None, prefix=None, self._validate_volume(local, remote, local_must_exist=local_must_exist) self.volumes.append((local, remote)) - def get_volume_map(self, config, service_info): - return [] - def pre_run(self): ''' A function that runs before the run service @@ -75,7 +81,13 @@ def pre_run(self): :class:`terra.compute.base.BaseService` is mainly responsible for handling Executors that need a separate volume translation ''' - self.executor_configuration_map = Executor.configuration_map(self) + + # The executor volume map is calculated on the host side, where all the + # information is available. For example if using docker and celery, then + # docker config need to be run to get the container volumes, and that has + # to be run on the host machine. So this is calculated here. + settings.executor.volume_map = Executor.configuration_map(self) + logger.debug4("Executor Volume map: %s", settings.executor.volume_map) def post_run(self): pass @@ -156,6 +168,9 @@ def defaultCommand(self, service_class, *args, **kwargs): # bind function and return it return defaultCommand.__get__(self, type(self)) + def get_volume_map(self, config, service_info): + return [] + def run_service(self, *args, **kwargs): ''' Place holder for code to run an instance in the compute. Runs @@ -178,5 +193,95 @@ def configuration_map_service(self, service_info): return service_info.volumes + @staticmethod + def configure_logger(sender, **kwargs): + if settings.terra.zone == 'controller': + # Setup log file for use in configure + if os.environ.get('TERRA_DISABLE_TERRA_LOG') != '1': + sender._log_file = os.path.join( + settings.processing_dir, + terra.logger._SetupTerraLogger.default_log_prefix) + else: + sender._log_file = os.devnull + os.makedirs(settings.processing_dir, exist_ok=True) + sender._log_file = open(sender._log_file, 'a') + sender.main_log_handler = StreamHandler(stream=sender._log_file) + sender.root_logger.addHandler(sender.main_log_handler) + + # setup the TCP socket listener + sender.tcp_logging_server = LogRecordSocketReceiver( + settings.logging.server.hostname, settings.logging.server.port) + listener_thread = threading.Thread( + target=sender.tcp_logging_server.serve_until_stopped) + listener_thread.setDaemon(True) + listener_thread.start() + + # Wait up to a second, to make sure the thread started + for _ in range(1000): + if sender.tcp_logging_server.ready: + break + time.sleep(0.001) + else: # pragma: no cover + warnings.warn("TCP Logging server thread did not startup. " + "This is probably not a problem, unless logging isn't " + "working.", RuntimeWarning) + + # Auto cleanup + @atexit.register + def cleanup_thread(): + sender.tcp_logging_server.abort = 1 + listener_thread.join(timeout=5) + if listener_thread.is_alive(): # pragma: no cover + warnings.warn("TCP Logger Server Thread did not shut down " + "gracefully. 
Attempting to exit anyway.",
+                        RuntimeWarning)
+    elif settings.terra.zone == 'runner':
+      sender.main_log_handler = SocketHandler(
+          settings.logging.server.hostname, settings.logging.server.port)
+      # By default, all runners have access to the master controller's stderr,
+      # so there is no need for the master controller to echo out the log
+      # messages a second time.
+      sender.main_log_handler.addFilter(SkipStdErrAddFilter())
+      sender.root_logger.addHandler(sender.main_log_handler)
+
+  @staticmethod
+  def reconfigure_logger(sender, **kwargs):
+    # sender is the logger in this case
+    #
+    # The default logging handler is a StreamHandler. This will reconfigure
+    # its output stream
+
+    if settings.terra.zone == 'controller':
+      if os.environ.get('TERRA_DISABLE_TERRA_LOG') != '1':
+        log_file = os.path.join(
+            settings.processing_dir,
+            terra.logger._SetupTerraLogger.default_log_prefix)
+      else:
+        log_file = os.devnull
+
+      # Check to see if _log_file is unset. If it is, reconfigure was called
+      # without configure being called first. While it is not important that
+      # this works, it mostly matters for unit testing
+      # if not os.path.samefile(log_file, sender._log_file.name):
+      if getattr(sender, '_log_file', None) is not None and \
+          log_file != sender._log_file.name:
+        os.makedirs(settings.processing_dir, exist_ok=True)
+        sender._log_file.close()
+        sender._log_file = open(log_file, 'a')
+    elif settings.terra.zone == 'runner':
+      # Only reconnect if the server address changed
+      if settings.logging.server.hostname != sender.main_log_handler.host or \
+         settings.logging.server.port != sender.main_log_handler.port:
+        # Reconnect the SocketHandler
+        sender.main_log_handler.close()
+        try:
+          sender.root_logger.removeHandler(sender.main_log_handler)
+        except ValueError:  # pragma: no cover
+          pass
+
+        sender.main_log_handler = SocketHandler(
+            settings.logging.server.hostname, settings.logging.server.port)
+        sender.root_logger.addHandler(sender.main_log_handler)
+

 services = {}
diff --git a/terra/compute/container.py b/terra/compute/container.py
index 1477f795..0ad4f3d3 100644
--- a/terra/compute/container.py
+++ b/terra/compute/container.py
@@ -1,7 +1,6 @@
 import os
 import posixpath
 import ntpath
-from os import environ as env
 import re
 import pathlib
 from tempfile import TemporaryDirectory
@@ -29,18 +28,25 @@ def __init__(self):
     self.extra_compose_files = []

   def pre_run(self):
-    self.temp_dir = TemporaryDirectory()
+    # Need to run Base's pre_run first, so it has a chance to update settings
+    # for special executors, etc...
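+    # (specifically, BaseService.pre_run computes settings.executor.volume_map,
+    # which must be in place before the settings are serialized into the
+    # container's config.json below)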
+    super().pre_run()
+
+    self.temp_dir = TemporaryDirectory(suffix=f"_{type(self).__name__}")
+    if self.env.get('TERRA_KEEP_TEMP_DIR', None) == "1":
+      self.temp_dir._finalizer.detach()
     temp_dir = pathlib.Path(self.temp_dir.name)

     # Check to see if any are already defined, this will play nicely with
     # external influences
     env_volume_index = 1
-    while f'{env["JUST_PROJECT_PREFIX"]}_VOLUME_{env_volume_index}' in \
+    while f'{self.env["JUST_PROJECT_PREFIX"]}_VOLUME_{env_volume_index}' in \
         self.env:
       env_volume_index += 1

     # Setup volumes for container
-    self.env[f'{env["JUST_PROJECT_PREFIX"]}_VOLUME_{env_volume_index}'] = \
+    self.env[f'{self.env["JUST_PROJECT_PREFIX"]}_'
+             f'VOLUME_{env_volume_index}'] = \
         f'{str(temp_dir)}:/tmp_settings:rw'
     env_volume_index += 1
@@ -50,13 +56,13 @@ def pre_run(self):
       volume_str = f'{volume_host}:{volume_container}'
       if volume_flags:
         volume_str += f':{volume_flags}'
-      self.env[f'{env["JUST_PROJECT_PREFIX"]}_VOLUME_{env_volume_index}'] = \
+      self.env[f'{self.env["JUST_PROJECT_PREFIX"]}_'
+               f'VOLUME_{env_volume_index}'] = \
           volume_str
       env_volume_index += 1

-    volume_map = compute.configuration_map(self)
-
-    logger.debug3("Compute Volume map: %s", volume_map)
+    settings.compute.volume_map = compute.configuration_map(self)
+    logger.debug4("Compute Volume map: %s", settings.compute.volume_map)

     # Setup config file for container
@@ -64,7 +70,7 @@ def pre_run(self):
     container_config = translate_settings_paths(
         TerraJSONEncoder.serializableSettings(settings),
-        volume_map,
+        settings.compute.volume_map,
         self.container_platform)

     if os.name == "nt":  # pragma: no linux cover
@@ -75,15 +81,15 @@ def pre_run(self):
         + '|TERRA_SETTINGS_FILE'

     # Dump the settings
+    container_config['terra']['zone'] = 'runner'
     with open(temp_dir / 'config.json', 'w') as fid:
       json.dump(container_config, fid)

-    super().pre_run()
-
   def post_run(self):
     super().post_run()
     # Delete temp_dir
-    self.temp_dir.cleanup()
+    if self.env.get('TERRA_KEEP_TEMP_DIR', None) != "1":
+      self.temp_dir.cleanup()
     # self.temp_dir = None  # Causes a warning, hopefully there wasn't a reason
     # I did it this way.
diff --git a/terra/compute/docker.py b/terra/compute/docker.py
index bce7d367..9b782558 100644
--- a/terra/compute/docker.py
+++ b/terra/compute/docker.py
@@ -42,7 +42,7 @@ def run_service(self, service_info):
       just --wrap Just-docker-compose \\
         -f {service_info.compose_files} ... 
\\ - run {service_info.compose_service_name} \\ + run -T {service_info.compose_service_name} \\ {service_info.command} ''' optional_args = {} @@ -50,7 +50,7 @@ def run_service(self, service_info): pid = just("--wrap", "Just-docker-compose", *sum([['-f', cf] for cf in service_info.compose_files], []), - 'run', service_info.compose_service_name, + 'run', '-T', service_info.compose_service_name, *service_info.command + extra_arguments, **optional_args, env=service_info.env) @@ -96,7 +96,9 @@ def get_volume_map(self, config, service_info): ans = re.match(docker_volume_re, volume).groups() volume_map.append((ans[0], ans[2])) - volume_map = volume_map + service_info.volumes + # This is not needed, because service_info.volumes are already in + # service_info.env, added by terra.compute.base.BaseService.pre_run + # volume_map = volume_map + service_info.volumes slashes = '/' if os.name == 'nt': diff --git a/terra/compute/singularity.py b/terra/compute/singularity.py index 561fdf93..7deb1228 100644 --- a/terra/compute/singularity.py +++ b/terra/compute/singularity.py @@ -72,7 +72,8 @@ def get_volume_map(self, config, service_info): volume = volume.split(':') volume_map.append((volume[0], volume[1])) - volume_map = volume_map + service_info.volumes + # I think this causes duplicates, just like in the docker + # volume_map = volume_map + service_info.volumes slashes = '/' if os.name == 'nt': diff --git a/terra/compute/utils.py b/terra/compute/utils.py index 7a9e4dd4..912d8121 100644 --- a/terra/compute/utils.py +++ b/terra/compute/utils.py @@ -40,6 +40,7 @@ from vsi.tools.python import nested_patch from terra.core.utils import Handler +import terra.core.signals from terra import settings import terra.compute.base from terra.core.settings import filename_suffixes @@ -92,6 +93,12 @@ def _connect_backend(self): For the most part, workflows will be interacting with :data:`compute` to ``run`` services. Easier access via ``terra.compute.compute`` ''' +terra.core.signals.logger_configure.connect( + lambda *args, **kwargs: compute.configure_logger(*args, **kwargs), + weak=False) +terra.core.signals.logger_reconfigure.connect( + lambda *args, **kwargs: compute.reconfigure_logger(*args, **kwargs), + weak=False) def get_default_service_class(cls): @@ -189,7 +196,7 @@ def just(*args, **kwargs): if logger.getEffectiveLevel() <= DEBUG1: dd = dict_diff(env, just_env)[3] if dd: - logger.debug1('Environment Modification:\n' + '\n'.join(dd)) + logger.debug4('Environment Modification:\n' + '\n'.join(dd)) # Get bash path for windows compatibility. 
I can't explain this error, but # while the PATH is set right, I can't call "bash" because the WSL bash is diff --git a/terra/compute/virtualenv.py b/terra/compute/virtualenv.py index c65c6253..a7b679c3 100644 --- a/terra/compute/virtualenv.py +++ b/terra/compute/virtualenv.py @@ -56,7 +56,7 @@ def run_service(self, service_info): if logger.getEffectiveLevel() <= DEBUG1: dd = dict_diff(os.environ, env)[3] if dd: - logger.debug1('Environment Modification:\n' + '\n'.join(dd)) + logger.debug4('Environment Modification:\n' + '\n'.join(dd)) # Similar (but different) to a bug in docker compute, the right python # executable is not found on the path, possibly because Popen doesn't @@ -99,23 +99,23 @@ class Service(BaseService): ''' def pre_run(self): - """ - - """ super().pre_run() # Create a temp directory, store it in this instance - self.temp_dir = TemporaryDirectory() + self.temp_dir = TemporaryDirectory(suffix=f"_{type(self).__name__}") + if self.env.get('TERRA_KEEP_TEMP_DIR', None) == "1": + self.temp_dir._finalizer.detach() # Use a config.json file to store settings within that temp directory temp_config_file = os.path.join(self.temp_dir.name, 'config.json') # Serialize config file - docker_config = TerraJSONEncoder.serializableSettings(settings) + venv_config = TerraJSONEncoder.serializableSettings(settings) # Dump the serialized config to the temp config file + venv_config['terra']['zone'] = 'runner' with open(temp_config_file, 'w') as fid: - json.dump(docker_config, fid) + json.dump(venv_config, fid) # Set the Terra settings file for this service runner to the temp config # file @@ -124,4 +124,8 @@ def pre_run(self): def post_run(self): super().post_run() # Delete temp_dir - self.temp_dir.cleanup() + if self.env.get('TERRA_KEEP_TEMP_DIR', None) != "1": + # Calling this just prevents the annoying warning from saying "Hey, you + # know that automatic cleanup? It happened! Maybe you should manually + # call the automatic cleanup, cause yeah, that makes sense!" + self.temp_dir.cleanup() diff --git a/terra/core/exceptions.py b/terra/core/exceptions.py index 3a905530..c3438415 100644 --- a/terra/core/exceptions.py +++ b/terra/core/exceptions.py @@ -2,3 +2,9 @@ class ImproperlyConfigured(Exception): """ Exception for Terra is somehow improperly configured """ + + +class ConfigurationWarning(Warning): + """ + Warning that Terra may be improperly configured + """ diff --git a/terra/core/settings.py b/terra/core/settings.py index f7fec15f..a9f42a56 100644 --- a/terra/core/settings.py +++ b/terra/core/settings.py @@ -147,14 +147,24 @@ # POSSIBILITY OF SUCH DAMAGE. 
import os +from uuid import uuid4 +# from datetime import datetime +from logging.handlers import DEFAULT_TCP_LOGGING_PORT from inspect import isfunction from functools import wraps - -from terra.core.exceptions import ImproperlyConfigured +from json import JSONEncoder +import platform +import warnings +import threading +import concurrent.futures +import copy + +from terra.core.exceptions import ImproperlyConfigured, ConfigurationWarning +# Do not import terra.logger or terra.signals here, or any module that +# imports them from vsi.tools.python import ( nested_patch_inplace, nested_patch, nested_update, nested_in_dict ) -from json import JSONEncoder try: import jstyleson as json @@ -248,7 +258,19 @@ def unittest(self): return os.environ.get('TERRA_UNITTEST', None) == "1" -# TODO: come up with a way for apps to extend this themselves +@settings_property +def need_to_set_virtualenv_dir(self): + warnings.warn("You are using the virtualenv compute, and did not set " + "settings.compute.virtualenv_dir in your config file. " + "Using system python.", ConfigurationWarning) + return None + + +@settings_property +def terra_uuid(self): + return str(uuid4()) + + global_templates = [ ( # Global Defaults @@ -256,15 +278,33 @@ def unittest(self): { "logging": { "level": "ERROR", - "format": f"%(asctime)s (%(hostname)s): %(levelname)s - %(message)s", + "format": "%(asctime)s (%(hostname)s:%(zone)s): " + "%(levelname)s/%(processName)s - %(filename)s - %(message)s", "date_format": None, - "style": "%" + "style": "%", + "server": { + # This is tricky use of a setting, because the master controller will + # be the first to set it, but the runner and task will inherit the + # master controller's values, not their node names, should they be + # different (such as celery and spark) + "hostname": platform.node(), + "port": DEFAULT_TCP_LOGGING_PORT + } }, "executor": { - "type": "ThreadPoolExecutor" + "type": "ProcessPoolExecutor", + 'volume_map': [] }, "compute": { - "arch": "terra.compute.dummy" + "arch": "terra.compute.dummy", + 'volume_map': [] + }, + 'terra': { + # unlike other settings, this should NOT be overwritten by a + # config.json file, there is currently nothing to prevent that + 'zone': 'controller', + # 'start_time': datetime.now(), # Not json serializable yet + 'uuid': terra_uuid }, 'status_file': status_file, 'processing_dir': processing_dir, @@ -274,11 +314,11 @@ def unittest(self): ), ( {"compute": {"arch": "terra.compute.virtualenv"}}, # Pattern - {"compute": {"virtualenv_dir": None}} # Defaults + {"compute": {"virtualenv_dir": need_to_set_virtualenv_dir}} # Defaults ), ( # So much for DRY :( {"compute": {"arch": "virtualenv"}}, - {"compute": {"virtualenv_dir": None}} + {"compute": {"virtualenv_dir": need_to_set_virtualenv_dir}} ) ] ''':class:`list` of (:class:`dict`, :class:`dict`): Templates are how we @@ -300,7 +340,6 @@ class LazyObject: Based off of Django's LazyObject ''' - _wrapped = None ''' The internal object being wrapped ''' @@ -329,9 +368,9 @@ def __getattr__(self, name, *args, **kwargs): def __setattr__(self, name, value): '''Supported''' - if name == "_wrapped": - # Assign to __dict__ to avoid infinite __setattr__ loops. - self.__dict__["_wrapped"] = value + if name in ("_wrapped", "__class__"): + # Call super to avoid infinite __setattr__ loops. 
+ super().__setattr__(name, value) else: if self._wrapped is None: self._setup() @@ -428,6 +467,20 @@ def _setup(self, name=None): self.configure(json.load(fid)) self._wrapped.config_file = os.environ.get(ENVIRONMENT_VARIABLE) + def __getstate__(self): + if self._wrapped is None: + self._setup() + return {'_wrapped': self._wrapped} + + def __setstate__(self, state): + self._wrapped = state['_wrapped'] + + # This should NOT be done on a per instance basis, this is only for + # the global terra.settings. So maybe this should be done in a context + # manager?? + # from terra.core.signals import post_settings_configured + # post_settings_configured.send(sender=self) + def __repr__(self): # Hardcode the class name as otherwise it yields 'Settings'. if self._wrapped is None: @@ -452,8 +505,6 @@ def configure(self, *args, **kwargs): ImproperlyConfigured If settings is already configured, will throw this exception """ - from terra.core.signals import post_settings_configured - if self._wrapped is not None: raise ImproperlyConfigured('Settings already configured.') logger.debug2('Pre settings configure') @@ -479,10 +530,18 @@ def read_json(json_file): nested_patch_inplace( self._wrapped, lambda key, value: (isinstance(key, str) + and (isinstance(value, str) + or getattr(value, 'settings_property', False)) and any(key.endswith(pattern) for pattern in json_include_suffixes)), lambda key, value: read_json(value)) + # Importing these here is intentional, it guarantees the signals are + # connected so that executor and computes can setup logging if need be + import terra.executor # noqa + import terra.compute # noqa + + from terra.core.signals import post_settings_configured post_settings_configured.send(sender=self) logger.debug2('Post settings configure') @@ -519,7 +578,49 @@ def __enter__(self): return self._wrapped.__enter__() def __exit__(self, exc_type=None, exc_value=None, traceback=None): - return self._wrapped.__exit__(exc_type, exc_value, traceback) + return_value = self._wrapped.__exit__(exc_type, exc_value, traceback) + + # Incase the logger was messed with in the context, reset it. + from terra.core.signals import post_settings_context + post_settings_context.send(sender=self, post_settings_context=True) + + return return_value + + +class LazySettingsThreaded(LazySettings): + @classmethod + def downcast(cls, obj): + # This downcast function was intended for LazySettings instances only + assert type(obj) == LazySettings + # Put settings in __wrapped where property below expects it. + settings = obj._wrapped + # Downcast + obj.__class__ = cls + obj.__wrapped = settings + obj.__tls = threading.local() + + @property + def _wrapped(self): + ''' + Thread safe version of _wrapped getter + ''' + thread = threading.current_thread() + if thread._target == concurrent.futures.thread._worker: + if not hasattr(self.__tls, 'settings'): + self.__tls.settings = copy.deepcopy(self.__wrapped) + return self.__tls.settings + else: + return self.__wrapped + + def __setattr__(self, name, value): + '''Supported''' + if name in ("_LazySettingsThreaded__wrapped", + "_LazySettingsThreaded__tls"): + # Call original __setattr__ to avoid infinite __setattr__ loops. 
+ object.__setattr__(self, name, value) + else: + # Normal LazyObject setter + super().__setattr__(name, value) class ObjectDict(dict): @@ -635,6 +736,8 @@ def default(self, obj): if obj._wrapped is None: raise ImproperlyConfigured('Settings not initialized') return TerraJSONEncoder.serializableSettings(obj._wrapped) + # elif isinstance(obj, datetime): + # return str(obj) return JSONEncoder.default(self, obj) # pragma: no cover @staticmethod @@ -655,11 +758,24 @@ def serializableSettings(obj): if isinstance(obj, LazySettings): obj = obj._wrapped - return nested_patch( + # I do not os.path.expandvars(val) here, because the Just-docker-compose + # takes care of that for me, so I can still use the envvar names in the + # containers + + obj = nested_patch( obj, lambda k, v: isfunction(v) and hasattr(v, 'settings_property'), lambda k, v: v(obj)) + obj = nested_patch( + obj, + lambda k, v: any(v is not None and isinstance(k, str) + and k.endswith(pattern) + for pattern in filename_suffixes), + lambda k, v: os.path.expanduser(v)) + + return obj + @staticmethod def dumps(obj, **kwargs): ''' diff --git a/terra/core/signals.py b/terra/core/signals.py index 20f2a12f..61b83869 100644 --- a/terra/core/signals.py +++ b/terra/core/signals.py @@ -37,6 +37,7 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. +import os import threading import weakref @@ -197,9 +198,16 @@ def send(self, sender, **named): ------- list Return a list of tuple pairs [(receiver, response), ... ]. + + Environment Variables + --------------------- + TERRA_UNITTEST + Setting this to ``1`` will disable send. This is used during + unittesting to prevent unexpected behavior """ if not self.receivers or \ - self.sender_receivers_cache.get(sender) is NO_RECEIVERS: + self.sender_receivers_cache.get(sender) is NO_RECEIVERS or \ + os.environ.get('TERRA_UNITTEST') == "1": return [] return [ @@ -228,9 +236,16 @@ def send_robust(self, sender, **named): Return a list of tuple pairs [(receiver, response), ... ]. If any receiver raises an error (specifically any subclass of Exception), return the error instance as the result for that receiver. + + Environment Variables + --------------------- + TERRA_UNITTEST + Setting this to ``1`` will disable send. This is used during + unittesting to prevent unexpected behavior """ if not self.receivers or \ - self.sender_receivers_cache.get(sender) is NO_RECEIVERS: + self.sender_receivers_cache.get(sender) is NO_RECEIVERS or \ + os.environ.get('TERRA_UNITTEST') == "1": return [] # Call each receiver with whatever arguments it can accept. @@ -342,7 +357,8 @@ def _decorator(func): return _decorator -__all__ = ['Signal', 'receiver', 'post_settings_configured'] +__all__ = ['Signal', 'receiver', 'post_settings_configured', + 'post_settings_context', 'logger_configure', 'logger_reconfigure'] # a signal for settings done being loaded post_settings_configured = Signal() @@ -353,6 +369,23 @@ def _decorator(func): manual call to :func:`terra.core.settings.LazySettings.configure`. ''' +post_settings_context = Signal() +'''Signal: +Sent after scope __exit__ from a settings context (i.e., with statement). +''' + +logger_configure = Signal() +'''Signal: +Sent to the executor after the logger has been configured. This will happen +after the post_settings_configured signal. +''' + +logger_reconfigure = Signal() +'''Signal: +Sent to the executor after the logger has been reconfigured. This will happen +after the logger_configure signal. 
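+
+An illustrative sketch of attaching a receiver (``my_handler`` here is a
+hypothetical function; the executor and compute modules connect their real
+handlers the same way, via :meth:`Signal.connect`)::
+
+    from terra.core.signals import logger_reconfigure
+
+    def my_handler(sender, **kwargs):
+      # sender is the logger setup object; adjust its handlers here
+      ...
+
+    logger_reconfigure.connect(my_handler, weak=False)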
+''' + from terra.logger import getLogger # noqa logger = getLogger(__name__) # Must be after post_settings_configured to prevent circular import errors. diff --git a/terra/executor/base.py b/terra/executor/base.py new file mode 100644 index 00000000..288be72e --- /dev/null +++ b/terra/executor/base.py @@ -0,0 +1,18 @@ +from concurrent.futures import Future, Executor + +from terra.logger import getLogger +logger = getLogger(__name__) + + +class BaseExecutor(Executor): + @staticmethod + def configure_logger(sender, **kwargs): + pass + + @staticmethod + def reconfigure_logger(sender, **kwargs): + pass + + +class BaseFuture(Future): + pass diff --git a/terra/executor/celery/__init__.py b/terra/executor/celery/__init__.py index 96420497..dc6b447c 100644 --- a/terra/executor/celery/__init__.py +++ b/terra/executor/celery/__init__.py @@ -1,7 +1,7 @@ -#!/usr/bin/env python - +import sys from os import environ as env +from celery.signals import worker_process_init from celery import Celery from .executor import CeleryExecutor @@ -10,14 +10,23 @@ __all__ = ['CeleryExecutor'] -app = Celery(env['TERRA_CELERY_MAIN_NAME']) + +main_name = env.get('TERRA_CELERY_MAIN_NAME', None) +if main_name is None: + try: + main_name = sys.modules['__main__'].__spec__.name + except AttributeError: + # if __spec__ is None, then __main__ is a builtin + main_name = "main_name_unset__set_TERRA_CELERY_MAIN_NAME" +app = Celery(main_name) + app.config_from_object(env['TERRA_CELERY_CONF']) -# import traceback -# traceback.print_stack() + +@worker_process_init.connect +def start_worker_child(*args, **kwargs): + from terra import settings + settings.terra.zone = 'task' # Running on windows. # https://stackoverflow.com/questions/37255548/how-to-run-celery-on-windows - -if __name__ == '__main__': # pragma: no cover - app.start() diff --git a/terra/executor/celery/__main__.py b/terra/executor/celery/__main__.py new file mode 100644 index 00000000..c1d0dadb --- /dev/null +++ b/terra/executor/celery/__main__.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python + +from os import environ as env +from . import app + +# Terra +from terra import settings + + +def main(): + if env.get('TERRA_SETTINGS_FILE', '') == '': + settings.configure( + { + 'executor': {'type': 'CeleryExecutor'}, + 'terra': {'zone': 'task_controller'}, + 'logging': {'level': 'NOTSET'} + } + ) + + app.start() + + +if __name__ == '__main__': # pragma: no cover + main() diff --git a/terra/executor/celery/celeryconfig.py b/terra/executor/celery/celeryconfig.py index c53e01d6..e378d0cc 100644 --- a/terra/executor/celery/celeryconfig.py +++ b/terra/executor/celery/celeryconfig.py @@ -21,7 +21,9 @@ result_backend = broker_url task_serializer = 'pickle' +result_serializer = 'pickle' accept_content = ['json', 'pickle'] +result_accept_content = ['json', 'pickle'] result_expires = 3600 # App needs to define include @@ -29,3 +31,4 @@ if celery_include: import ast include = ast.literal_eval(celery_include) + include += type(include)(['terra.tests.demo.tasks']) diff --git a/terra/executor/celery/executor.py b/terra/executor/celery/executor.py index 0f796194..6b607aba 100644 --- a/terra/executor/celery/executor.py +++ b/terra/executor/celery/executor.py @@ -16,18 +16,32 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import os from os import environ as env -from concurrent.futures import Future, Executor, as_completed +from concurrent.futures import as_completed from concurrent.futures._base import (RUNNING, FINISHED, CANCELLED, CANCELLED_AND_NOTIFIED) from threading import Lock, Thread import time +from logging import NullHandler, StreamHandler +from logging.handlers import SocketHandler +from celery.signals import setup_logging + +from terra.executor.base import BaseFuture, BaseExecutor +import terra +from terra import settings from terra.logger import getLogger logger = getLogger(__name__) -class CeleryExecutorFuture(Future): +# stop celery from hijacking the logger +@setup_logging.connect +def setup_loggers(*args, **kwargs): + pass + + +class CeleryExecutorFuture(BaseFuture): def __init__(self, asyncresult): self._ar = asyncresult super().__init__() @@ -41,11 +55,12 @@ def cancel(self): Returns True if the future was cancelled, False otherwise. A future cannot be cancelled if it is running or has already completed. """ + logger.debug4(f'Canceling task {self._ar.id}') with self._condition: if self._state in [RUNNING, FINISHED, CANCELLED, CANCELLED_AND_NOTIFIED]: return super().cancel() - # Not running and not canceled. May be possible to cancel! + # Not running and not cancelled. May be possible to cancel! self._ar.ready() # Triggers an update check if self._ar.state != 'REVOKED': self._ar.revoke() @@ -75,7 +90,7 @@ def cancel(self): return result -class CeleryExecutor(Executor): +class CeleryExecutor(BaseExecutor): """ Executor implementation using celery tasks. @@ -134,7 +149,7 @@ def _update_futures(self): ar.ready() # Just trigger the AsyncResult state update check if ar.state == 'REVOKED': - logger.debug1('Celery task "%s" canceled.', ar.id) + logger.warning('Celery task "%s" cancelled.', ar.id) if not fut.cancelled(): if not fut.cancel(): # pragma: no cover logger.error('Future was not running but failed to be cancelled') @@ -142,18 +157,18 @@ def _update_futures(self): # Future is CANCELLED -> CANCELLED_AND_NOTIFIED elif ar.state in ('RUNNING', 'RETRY'): - logger.debug1('Celery task "%s" running.', ar.id) + logger.debug4('Celery task "%s" running.', ar.id) if not fut.running(): fut.set_running_or_notify_cancel() # Future is RUNNING elif ar.state == 'SUCCESS': - logger.debug1('Celery task "%s" resolved.', ar.id) + logger.debug4('Celery task "%s" resolved.', ar.id) fut.set_result(ar.get(disable_sync_subtasks=False)) # Future is FINISHED elif ar.state == 'FAILURE': - logger.debug1('Celery task "%s" resolved with error.', ar.id) + logger.error('Celery task "%s" resolved with error.', ar.id) fut.set_exception(ar.result) # Future is FINISHED @@ -189,9 +204,10 @@ def submit(self, fn, *args, **kwargs): return future def shutdown(self, wait=True): + logger.debug1('Shutting down celery tasks...') with self._shutdown_lock: self._shutdown = True - for fut in self._futures: + for fut in tuple(self._futures): fut.cancel() if wait: @@ -231,21 +247,60 @@ def __init__(self, service_info): volume_map = compute.get_volume_map(config, service_clone) - # # In the case of docker, the config has /tmp_settings in there, this - # # should be removed, as it is not in the celery worker. I don't think it - # # would cause any problems, but it's inaccurate. 
- # volume_map = [v for v in volume_map if v[1] != '/tmp_settings'] - return volume_map - # optional_args = {} - # optional_args['justfile'] = justfile + @staticmethod + def configure_logger(sender, **kwargs): + if settings.terra.zone == 'task': # pragma: no cover + # This will never really be reached, because the task_controller will + # configure the logger, and then fork. + sender.main_log_handler = NullHandler() + elif settings.terra.zone == 'task_controller': + # Setup log file for use in configure + if os.environ.get('TERRA_DISABLE_TERRA_LOG') != '1': + sender._log_file = os.path.join(settings.processing_dir, + terra.logger._logs.default_log_prefix) + else: + sender._log_file = os.devnull + os.makedirs(settings.processing_dir, exist_ok=True) + sender._log_file = open(sender._log_file, 'a') + sender.main_log_handler = StreamHandler(stream=sender._log_file) + sender.root_logger.addHandler(sender.main_log_handler) - # args = ["--wrap", "Just-docker-compose"] + \ - # sum([['-f', cf] for cf in compose_files], []) + \ - # ['config'] + @staticmethod + def reconfigure_logger(sender, pre_run_task=False, + post_settings_context=False, **kwargs): + if settings.terra.zone == 'task': + if pre_run_task: + if sender.main_log_handler: + sender.main_log_handler.close() + try: + sender.root_logger.removeHandler(sender.main_log_handler) + except ValueError: + pass + sender.main_log_handler = SocketHandler( + settings.logging.server.hostname, + settings.logging.server.port) + sender.root_logger.addHandler(sender.main_log_handler) + if post_settings_context: + # when the celery task is done, its logger is automatically + # reconfigured; use that opportunity to close the stream + if sender.main_log_handler: + sender.main_log_handler.close() + try: + sender.root_logger.removeHandler(sender.main_log_handler) + except ValueError: + pass + sender.main_log_handler = NullHandler() + sender.root_logger.addHandler(sender.main_log_handler) + elif settings.terra.zone == 'task_controller': + if os.environ.get('TERRA_DISABLE_TERRA_LOG') != '1': + log_file = os.path.join(settings.processing_dir, + terra.logger._logs.default_log_prefix) + else: + log_file = os.devnull - # pid = just(*args, stdout=PIPE, - # **optional_args, - # env=service_info.env) - # return pid.communicate()[0] + if log_file != sender._log_file.name: + os.makedirs(settings.processing_dir, exist_ok=True) + sender._log_file.close() + sender._log_file = open(log_file, 'a') diff --git a/terra/executor/dummy.py b/terra/executor/dummy.py index 7450ba11..03ac0626 100644 --- a/terra/executor/dummy.py +++ b/terra/executor/dummy.py @@ -1,13 +1,16 @@ -from concurrent.futures import Future, Executor from threading import Lock +from terra.executor.base import BaseFuture, BaseExecutor +from terra import settings from terra.logger import getLogger logger = getLogger(__name__) -class DummyExecutor(Executor): +class DummyExecutor(BaseExecutor): """ - Executor that does the nothin, just logs what would happen. + Executor that does nothing, just logs what would happen. 
+
+  Note: Don't base new executors off of this example
   """

   def __init__(self, *arg, **kwargs):
@@ -21,11 +24,15 @@ def submit(self, fn, *args, **kwargs):
     if self._shutdown:
       raise RuntimeError('cannot schedule new futures after shutdown')

-    f = Future()
+    original_zone = settings.terra.zone
+    # Fake the zone for the log messages
+    settings.terra.zone = 'task'
+    f = BaseFuture()
     logger.info(f'Run function: {fn}')
     logger.info(f'With args: {args}')
     logger.info(f'With kwargs: {kwargs}')
     f.set_result(None)
+    settings.terra.zone = original_zone
     return f

   def shutdown(self, wait=True):
diff --git a/terra/executor/process.py b/terra/executor/process.py
new file mode 100644
index 00000000..bc42e7a0
--- /dev/null
+++ b/terra/executor/process.py
@@ -0,0 +1,9 @@
+import concurrent.futures
+import terra.executor.base
+
+__all__ = ['ProcessPoolExecutor']
+
+
+class ProcessPoolExecutor(concurrent.futures.ProcessPoolExecutor,
+                          terra.executor.base.BaseExecutor):
+  pass
diff --git a/terra/executor/sync.py b/terra/executor/sync.py
index 2eba10cb..423ccdb7 100644
--- a/terra/executor/sync.py
+++ b/terra/executor/sync.py
@@ -1,11 +1,10 @@
-from concurrent.futures import Future, Executor
 from threading import Lock
-
-# No need for a global shutdown lock here, not multi-threaded/process
+from terra.executor.base import BaseExecutor, BaseFuture


-class SyncExecutor(Executor):
+# No need for a global shutdown lock here, not multi-threaded/process
+class SyncExecutor(BaseExecutor):
   """
   Executor that does the job synchronously.
@@ -27,7 +26,7 @@ def submit(self, fn, *args, **kwargs):
     if self._shutdown:
       raise RuntimeError('cannot schedule new futures after shutdown')

-    f = Future()
+    f = BaseFuture()
     try:
       result = fn(*args, **kwargs)
     except BaseException as e:
diff --git a/terra/executor/thread.py b/terra/executor/thread.py
new file mode 100644
index 00000000..96940055
--- /dev/null
+++ b/terra/executor/thread.py
@@ -0,0 +1,39 @@
+import concurrent.futures
+
+import terra.executor.base
+import terra.core.settings
+
+__all__ = ['ThreadPoolExecutor']
+
+
+class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor,
+                         terra.executor.base.BaseExecutor):
+  '''
+  Terra version of :class:`concurrent.futures.ThreadPoolExecutor`
+
+  Unlike other executors, :class:`ThreadPoolExecutor` has no process
+  isolation. This creates a scenario where multiple threads could use terra
+  settings to influence each other, which is not typical behavior, given that
+  all other executors have process isolation and do not allow this.
+
+  :class:`ThreadPoolExecutor` will downcast :obj:`terra.core.settings` to a
+  thread-safe :class:`terra.core.settings.LazySettingsThreaded` where each
+  Executor thread has its own thread-local copy of the settings structure.
+
+  This behavior is limited to threads started by :class:`ThreadPoolExecutor`
+  only. All other threads behave normally, sharing a single version of the
+  settings with the runner threads. The only side effect is that if a task
+  starts its own thread, that thread will be treated as one of the runner
+  threads, not a task thread. The currently known downside is that its log
+  messages will be reported as coming from the runner rather than the task
+  zone. However, any attempts to edit settings from such a rogue thread could
+  potentially have other unintended consequences.
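+
+  A minimal sketch of the intended behavior (illustrative only; it assumes
+  settings have already been configured with the default ``controller``
+  zone)::
+
+      from terra import settings
+      from terra.executor.thread import ThreadPoolExecutor
+
+      def task():
+        # runs in a pool thread, which sees its own copy of the settings
+        settings.terra.zone = 'task'
+        return settings.terra.zone
+
+      with ThreadPoolExecutor(max_workers=1) as pool:
+        assert pool.submit(task).result() == 'task'
+
+      # the runner's copy of the settings is unaffected
+      assert settings.terra.zone == 'controller'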
+ ''' + + def __init__(self, *args, **kwargs): + # Make terra.setting "thread safe" + if not isinstance(terra.settings, + terra.core.settings.LazySettingsThreaded): + terra.core.settings.LazySettingsThreaded.downcast(terra.settings) + super().__init__(*args, **kwargs) diff --git a/terra/executor/utils.py b/terra/executor/utils.py index d7432a22..ad199ac3 100644 --- a/terra/executor/utils.py +++ b/terra/executor/utils.py @@ -1,7 +1,9 @@ -import concurrent.futures +from importlib import import_module + from terra import settings +import terra.core.signals from terra.core.utils import ClassHandler -from importlib import import_module +import terra.logger class ExecutorHandler(ClassHandler): @@ -33,13 +35,17 @@ def _connect_backend(self): elif backend_name == "SyncExecutor": from terra.executor.sync import SyncExecutor return SyncExecutor - elif backend_name == "ThreadPoolExecutor": - return concurrent.futures.ThreadPoolExecutor - elif backend_name == "ProcessPoolExecutor": - return concurrent.futures.ProcessPoolExecutor + elif backend_name == "ThreadPoolExecutor" or \ + backend_name == "concurrent.futures.ThreadPoolExecutor": + from terra.executor.thread import ThreadPoolExecutor + return ThreadPoolExecutor + elif backend_name == "ProcessPoolExecutor" or \ + backend_name == "concurrent.futures.ProcessPoolExecutor": + from terra.executor.process import ProcessPoolExecutor + return ProcessPoolExecutor elif backend_name == "CeleryExecutor": - import terra.executor.celery - return terra.executor.celery.CeleryExecutor + from terra.executor.celery import CeleryExecutor + return CeleryExecutor else: module_name = backend_name.rsplit('.', 1) module = import_module(f'{module_name[0]}') @@ -47,7 +53,9 @@ def _connect_backend(self): def configuration_map(self, service_info): if not hasattr(self._connection, 'configuration_map'): - return {} + # Default behavior + return [] + # else call the class specific implementation return self._connection.configuration_map(service_info) @@ -55,3 +63,12 @@ def configuration_map(self, service_info): '''ExecutorHandler: The executor handler that all services will be interfacing with when running parallel computation tasks. ''' +# This Executor type is setup automatically, via +# Handler.__getattr__ => Handler._connection => Executor._connect_backend, +# when the signal is sent. So use a lambda to delay getattr +terra.core.signals.logger_configure.connect( + lambda *args, **kwargs: Executor.configure_logger(*args, **kwargs), + weak=False) +terra.core.signals.logger_reconfigure.connect( + lambda *args, **kwargs: Executor.reconfigure_logger(*args, **kwargs), + weak=False) diff --git a/terra/logger.py b/terra/logger.py index 7357d0b3..c63b18a0 100644 --- a/terra/logger.py +++ b/terra/logger.py @@ -41,8 +41,8 @@ To use the logger, in any module always: ``` -from terra.logging import get_logger -logger = get_logger(__name__) +from terra.logging import getLogger +logger = getLogger(__name__) ``` And then use the ``logger`` object anywhere in the module. 
This logger is a @@ -67,18 +67,26 @@ import io import warnings from datetime import datetime, timezone +import socketserver +import struct +import select +import pickle +import terra from terra.core.exceptions import ImproperlyConfigured +# Do not import terra.settings or terra.signals here, or any module that +# imports them from logging import ( - CRITICAL, ERROR, INFO, FATAL, WARN, WARNING, NOTSET, - getLogger, _acquireLock, _releaseLock, currentframe, + CRITICAL, ERROR, INFO, FATAL, WARN, WARNING, NOTSET, Filter, + getLogger, _acquireLock, _releaseLock, currentframe, Formatter, _srcfile as logging_srcfile, Logger as Logger_original ) __all__ = ['getLogger', 'CRITICAL', 'ERROR', 'INFO', 'FATAL', 'WARN', - 'WARNING', 'NOTSET', 'DEBUG1', 'DEBUG2', 'DEBUG3', 'Logger'] + 'WARNING', 'NOTSET', 'DEBUG1', 'DEBUG2', 'DEBUG3', 'DEBUG4', + 'Logger'] class HandlerLoggingContext(object): @@ -118,32 +126,115 @@ def __exit__(self, et, ev, tb): # implicit return of None => don't swallow exceptions +# from https://docs.python.org/3/howto/logging-cookbook.html +class LogRecordStreamHandler(socketserver.StreamRequestHandler): + """Handler for a streaming logging request. + + This basically logs the record using whatever logging policy is + configured locally. + """ + + def handle(self): + """ + Handle multiple requests - each expected to be a 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally. + """ + while True: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack('>L', chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unPickle(chunk) + record = logging.makeLogRecord(obj) + self.handleLogRecord(record) + + def unPickle(self, data): + return pickle.loads(data) + + def handleLogRecord(self, record): + # if a name is specified, we use the named logger rather than the one + # implied by the record. + if self.server.logname is not None: + name = self.server.logname + else: + name = record.name + logger = getLogger(name) + # N.B. EVERY record gets logged. This is because Logger.handle + # is normally called AFTER logger-level filtering. If you want + # to do filtering, do it at the client end to save wasting + # cycles and network bandwidth! + logger.handle(record) + + +class LogRecordSocketReceiver(socketserver.ThreadingTCPServer): + """ + Simple TCP socket-based logging receiver suitable for testing. 
+ """ + + allow_reuse_address = True + + def __init__(self, host='localhost', + port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler=LogRecordStreamHandler): + socketserver.ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = False + self.ready = False + self.timeout = 0.1 + self.logname = None + + def serve_until_stopped(self): + abort = False + self.ready = True + while not abort: + rd, wr, ex = select.select([self.socket.fileno()], + [], [], + self.timeout) + if rd: + self.handle_request() + abort = self.abort + self.ready = False + + class _SetupTerraLogger(): ''' A simple logger class used internally to configure the logger before and after :data:`terra.settings` is configured ''' - default_formatter = logging.Formatter('%(asctime)s (preconfig) : ' - '%(levelname)s - %(message)s') + default_formatter = logging.Formatter('%(asctime)s (%(hostname)s:%(zone)s) :' + ' %(levelname)s - %(filename)s -' + ' %(message)s') default_stderr_handler_level = logging.WARNING default_tmp_prefix = "terra_initial_tmp_" default_log_prefix = "terra_log" def __init__(self): self._configured = False + + # This must always use logging's getLogger. If a custom Terra getLogger is + # ever defined, don't use it to get the root logger self.root_logger = logging.getLogger(None) self.root_logger.setLevel(0) + # Add the Terra filter to the rootlogger, so that it gets the same extra + # args any other terra.logger.Logger would get + self.root_logger.addFilter(TerraAddFilter()) # stream -> stderr self.stderr_handler = logging.StreamHandler(sys.stderr) self.stderr_handler.setLevel(self.default_stderr_handler_level) self.stderr_handler.setFormatter(self.default_formatter) + self.stderr_handler.addFilter(StdErrFilter()) self.root_logger.addHandler(self.stderr_handler) # Set up temporary file logger - self.tmp_file = tempfile.NamedTemporaryFile(mode="w+", - prefix=self.default_tmp_prefix, - delete=False) + if os.environ.get('TERRA_DISABLE_TERRA_LOG') != '1': + self.tmp_file = tempfile.NamedTemporaryFile( + mode="w+", prefix=self.default_tmp_prefix, delete=False) + else: + self.tmp_file = open(os.devnull, mode='w+') self.tmp_handler = logging.StreamHandler(stream=self.tmp_file) self.tmp_handler.setLevel(0) self.tmp_handler.setFormatter(self.default_formatter) @@ -154,13 +245,14 @@ def __init__(self): logging.handlers.MemoryHandler(capacity=1000) self.preconfig_stderr_handler.setLevel(0) self.preconfig_stderr_handler.setFormatter(self.default_formatter) + self.preconfig_stderr_handler.addFilter(StdErrFilter()) self.root_logger.addHandler(self.preconfig_stderr_handler) - self.preconfig_file_handler = \ + self.preconfig_main_log_handler = \ logging.handlers.MemoryHandler(capacity=1000) - self.preconfig_file_handler.setLevel(0) - self.preconfig_file_handler.setFormatter(self.default_formatter) - self.root_logger.addHandler(self.preconfig_file_handler) + self.preconfig_main_log_handler.setLevel(0) + self.preconfig_main_log_handler.setFormatter(self.default_formatter) + self.root_logger.addHandler(self.preconfig_main_log_handler) # Replace the exception hook with our exception handler self.setup_logging_exception_hook() @@ -175,6 +267,34 @@ def __init__(self): # Enable warnings to default warnings.simplefilter('default') + # Disable known warnings that there's nothing to be done about. 
+ for module in ('yaml', 'celery.app.amqp'): + warnings.filterwarnings("ignore", + category=DeprecationWarning, module=module, + message="Using or importing the ABCs") + warnings.filterwarnings("ignore", + category=DeprecationWarning, module='osgeo', + message="the imp module is deprecated") + + # This disables a message that spams the screen: + # "pidbox received method enable_events() [reply_to:None ticket:None]" + # This is the only debug message in all of kombu.pidbox, so this is pretty + # safe to do + pidbox_logger = getLogger('kombu.pidbox') + pidbox_logger.setLevel(INFO) + + @property + def main_log_handler(self): + try: + return self.__main_log_handler + except AttributeError: + raise AttributeError("'_logs' has no 'main_log_handler'. An executor " + "class' 'configure_logger' method should set up a " + "'main_log_handler'.") + + @main_log_handler.setter + def main_log_handler(self, value): + self.__main_log_handler = value def setup_logging_exception_hook(self): ''' @@ -191,12 +311,27 @@ def setup_logging_exception_hook(self): def handle_exception(exc_type, exc_value, exc_traceback): # Try catch here because I want to make sure the original hook is called try: - logger.error("Uncaught exception", - exc_info=(exc_type, exc_value, exc_traceback)) + # Use getLogger instead of logger (defined below) in case there is an + # exception on import; this will make it easier to get a normal error + # message + getLogger(__name__).critical("Uncaught exception", + extra={'skip_stderr': True}, + exc_info=(exc_type, + exc_value, + exc_traceback)) except Exception: # pragma: no cover - print('There was an exception logging in the execpetion handler!') + print('There was an exception logging in the exception handler!', + file=sys.stderr) traceback.print_exc() + try: + from terra import settings + zone = settings.terra.zone + except Exception: + zone = 'preconfig' + print(f'Exception in {zone} on {platform.node()}', + file=sys.stderr) + return original_hook(exc_type, exc_value, exc_traceback) # Replace the hook @@ -221,8 +356,23 @@ def setup_logging_ipython_exception_hook(self): original_exception = InteractiveShell.showtraceback def handle_traceback(*args, **kwargs): # pragma: no cover - getLogger(__name__).error("Uncaught exception", - exc_info=sys.exc_info()) + try: + getLogger(__name__).critical("Uncaught exception", + extra={'skip_stderr': True}, + exc_info=sys.exc_info()) + except Exception: + print('There was an exception logging in the exception handler!', + file=sys.stderr) + traceback.print_exc() + + try: + from terra import settings + zone = settings.terra.zone + except Exception: + zone = 'preconfig' + print(f'Exception in {zone} on {platform.node()}', + file=sys.stderr) + return original_exception(*args, **kwargs) InteractiveShell.showtraceback = handle_traceback @@ -230,7 +380,31 @@ def handle_traceback(*args, **kwargs): # pragma: no cover except ImportError: # pragma: no cover pass - def configure_logger(self, sender, **kwargs): + def set_level_and_formatter(self): + from terra import settings + formatter = logging.Formatter(fmt=settings.logging.format, + datefmt=settings.logging.date_format, + style=settings.logging.style) + + stderr_formatter = ColorFormatter(fmt=settings.logging.format, + datefmt=settings.logging.date_format, + style=settings.logging.style) + + # Configure log level + level = settings.logging.level + if isinstance(level, str): + # make level case insensitive + level = level.upper() + + if getattr(self, 'stderr_handler', None) is not None: +
self.stderr_handler.setLevel(level) + self.stderr_handler.setFormatter(stderr_formatter) + + if getattr(self, 'main_log_handler', None) is not None: + self.main_log_handler.setLevel(level) + self.main_log_handler.setFormatter(formatter) + + def configure_logger(self, sender=None, signal=None, **kwargs): ''' Call back function to configure the logger after settings have been configured @@ -244,41 +418,25 @@ def configure_logger(self, sender, **kwargs): "unexpected") raise ImproperlyConfigured() - formatter = logging.Formatter(fmt=settings.logging.format, - datefmt=settings.logging.date_format, - style=settings.logging.style) - - # Setup log file for use in configure - self.log_file = os.path.join(settings.processing_dir, - self.default_log_prefix) - os.makedirs(settings.processing_dir, exist_ok=True) - self.log_file = open(self.log_file, 'a') - - self.file_handler = logging.StreamHandler(stream=self.log_file) - - # Configure log level - level = settings.logging.level - if isinstance(level, str): - # make level case insensitive - level = level.upper() - self.stderr_handler.setLevel(level) - self.file_handler.setLevel(level) - - # Configure format - self.file_handler.setFormatter(formatter) - self.stderr_handler.setFormatter(formatter) + # This sends a signal to the current Executor type, which has already been + # imported at the end of LazySettings.configure. We don't import Executor + # here to reduce the concerns of this module + import terra.core.signals + terra.core.signals.logger_configure.send(sender=self, **kwargs) + self.set_level_and_formatter() - # Swap some handlers - self.root_logger.addHandler(self.file_handler) + # Now that the real logger has been set up, swap some handlers self.root_logger.removeHandler(self.preconfig_stderr_handler) - self.root_logger.removeHandler(self.preconfig_file_handler) + self.root_logger.removeHandler(self.preconfig_main_log_handler) self.root_logger.removeHandler(self.tmp_handler) - settings_dump = os.path.join(settings.processing_dir, - datetime.now(timezone.utc).strftime( - 'settings_%Y_%m_%d_%H_%M_%S_%f.json')) - with open(settings_dump, 'w') as fid: - fid.write(TerraJSONEncoder.dumps(settings, indent=2)) + if os.environ.get('TERRA_DISABLE_SETTINGS_DUMP') != '1': + settings_dump = os.path.join( + settings.processing_dir, + datetime.now(timezone.utc).strftime( + f'settings_{settings.terra.uuid}_%Y_%m_%d_%H_%M_%S_%f.json')) + with open(settings_dump, 'w') as fid: + fid.write(TerraJSONEncoder.dumps(settings, indent=2)) # filter the stderr buffer self.preconfig_stderr_handler.buffer = \ @@ -288,38 +446,106 @@ def configure_logger(self, sender, **kwargs): # level messages. This is probably not necessary because error/critical # messages before configure should be rare, and are probably worth # repeating. Repeating is the only way to get them formatted right the - # second time anyways. This applys to stderr only, not the log file + # second time anyways. This applies to stderr only, not the log file # if (x.levelno >= level)] and # (x.levelno < default_stderr_handler_level)] # Filter file buffer. 
Never remove default_stderr_handler_level message, # they won't be in the new output file - self.preconfig_file_handler.buffer = \ - [x for x in self.preconfig_file_handler.buffer - if (x.levelno >= self.file_handler.level)] + self.preconfig_main_log_handler.buffer = \ + [x for x in self.preconfig_main_log_handler.buffer + if (x.levelno >= self.main_log_handler.level)] # Flush the buffers self.preconfig_stderr_handler.setTarget(self.stderr_handler) self.preconfig_stderr_handler.flush() self.preconfig_stderr_handler = None - self.preconfig_file_handler.setTarget(self.file_handler) - self.preconfig_file_handler.flush() - self.preconfig_file_handler = None + self.preconfig_main_log_handler.setTarget(self.main_log_handler) + self.preconfig_main_log_handler.flush() + self.preconfig_main_log_handler = None self.tmp_handler = None # Remove the temporary file now that you are done with it self.tmp_file.close() - os.unlink(self.tmp_file.name) + if os.path.exists(self.tmp_file.name) and self.tmp_file.name != os.devnull: + os.unlink(self.tmp_file.name) self.tmp_file = None self._configured = True + def reconfigure_logger(self, sender=None, signal=None, **kwargs): + if not self._configured: + self.root_logger.error("It is unexpected for reconfigure_logger to be " + "called without first calling configure_logger. " + "This is not critical, but should not happen.") + + # This sends a signal to the current Executor type, which has already been + # imported at the end of LazySettings.configure. We don't import Executor + # here to reduce the concerns of this module + import terra.core.signals + terra.core.signals.logger_reconfigure.send(sender=self, **kwargs) -extra_logger_variables = {'hostname': platform.node()} -'''dict: Extra logger variables that can be reference in log messages''' + self.set_level_and_formatter() + + +class TerraAddFilter(Filter): + def filter(self, record): + if not hasattr(record, 'hostname'): + record.hostname = platform.node() + if not hasattr(record, 'zone'): + try: + if terra.settings.configured: + record.zone = terra.settings.terra.zone + else: + record.zone = 'preconfig' + except BaseException: + record.zone = 'preconfig' + return True + + +class StdErrFilter(Filter): + def filter(self, record): + return not getattr(record, 'skip_stderr', False) + + +class SkipStdErrAddFilter(Filter): + def filter(self, record): + record.skip_stderr = getattr(record, 'skip_stderr', True) + return True + + +class ColorFormatter(Formatter): + use_color = True + + def format(self, record): + if self.use_color: + zone = record.__dict__.get('zone', 'preconfig') + if zone == "preconfig": + record.__dict__['zone'] = '\033[33mpreconfig\033[0m' + elif zone == "controller": + record.__dict__['zone'] = '\033[32mcontroller\033[0m' + elif zone == "runner": + record.__dict__['zone'] = '\033[35mrunner\033[0m' + elif zone == "task": + record.__dict__['zone'] = '\033[34mtask\033[0m' + else: + record.__dict__['zone'] = f'\033[31m{record.__dict__["zone"]}\033[0m' + + msg = super().format(record) + record.__dict__['zone'] = zone + return msg + else: + return super().format(record) class Logger(Logger_original): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # I like https://stackoverflow.com/a/17558764/4166604 better than + # https://stackoverflow.com/a/28050837/4166604; it has the ability to add + # logic/function calls, if I so desire + self.addFilter(TerraAddFilter()) + def findCaller(self, stack_info=False, stacklevel=1): """ Find the stack frame of the caller so that we can
note the source @@ -357,11 +583,8 @@ def findCaller(self, stack_info=False, stacklevel=1): break return rv - # Define _log instead of logger adapter, this works better (setLoggerClass) - # https://stackoverflow.com/a/28050837/4166604 - def _log(self, *args, **kwargs): - kwargs['extra'] = extra_logger_variables - return super()._log(*args, **kwargs) + # Define _log instead of logger adapter if needed, this works better + # (setLoggerClass) https://stackoverflow.com/a/28050837/4166604 def debug1(self, msg, *args, **kwargs): ''' @@ -384,6 +607,13 @@ def debug3(self, msg, *args, **kwargs): ''' self.log(DEBUG3, msg, *args, **kwargs) + def debug4(self, msg, *args, **kwargs): + ''' + Logs a message with level :data:`DEBUG4` on this logger. The arguments are + interpreted as for :func:`logging.debug` + ''' + self.log(DEBUG4, msg, *args, **kwargs) + fatal = logging.LoggerAdapter.critical @@ -398,6 +628,7 @@ def handle_warning(message, category, filename, lineno, file=None, line=None): it will call warnings.formatwarning and will log the resulting string to a warnings logger named "py.warnings" with level logging.WARNING. """ + if file is not None: # I don't actually know how this can be not None if _warnings_showwarning is not None: # pragma: no cover _warnings_showwarning(message, category, filename, lineno, file, line) @@ -407,8 +638,13 @@ def handle_warning(message, category, filename, lineno, file=None, line=None): logger.warning("%s", s) +# Ordinarily we would use __file__ for this, but frozen modules don't always +# have __file__ set, for some reason (see Issue CPython#21736). Thus, we get +# the filename from a handy code object from a function defined in this +# module. (There's no particular reason for picking debug1.) _srcfiles = (logging_srcfile, - os.path.normcase(Logger.debug1.__code__.co_filename)) + os.path.normcase(Logger.debug1.__code__.co_filename), + warnings.showwarning.__code__.co_filename) DEBUG1 = 10 @@ -434,24 +670,43 @@ def handle_warning(message, category, filename, lineno, file=None, line=None): output used to debug algorithms ''' +DEBUG4 = 7 +''' +Debug level four, even more verbose. + +Should be used for spamming the screen +''' + + logging.addLevelName(DEBUG1, "DEBUG1") logging.addLevelName(DEBUG2, "DEBUG2") logging.addLevelName(DEBUG3, "DEBUG3") +logging.addLevelName(DEBUG4, "DEBUG4") logging.setLoggerClass(Logger) # Get the logger here, AFTER all the changes to the logger class logger = getLogger(__name__) -# Disable log setup for unittests. Can't use settings here ;) -if os.environ.get('TERRA_UNITTEST', None) != "1": # pragma: no cover + +def _setup_terra_logger(): # Must be import signal after getLogger is defined... Currently this is # imported from logger. But if a custom getLogger is defined eventually, it # will need to be defined before importing terra.core.signals. import terra.core.signals # Configure logging (pre configure) - _logs = _SetupTerraLogger() + logs = _SetupTerraLogger() + + # Register post_configure with settings + terra.core.signals.post_settings_configured.connect(logs.configure_logger) + + # Handle a "with" settings context manager + terra.core.signals.post_settings_context.connect(logs.reconfigure_logger) - # register post_configure with settings - terra.core.signals.post_settings_configured.connect(_logs.configure_logger) + return logs + + +# Disable log setup for unittests. 
Can't use settings here ;) +if os.environ.get('TERRA_UNITTEST', None) != "1": # pragma: no cover + _logs = _setup_terra_logger() diff --git a/terra/task.py b/terra/task.py new file mode 100644 index 00000000..e9553371 --- /dev/null +++ b/terra/task.py @@ -0,0 +1,145 @@ +import os +from tempfile import gettempdir + +from celery import shared_task as original_shared_task +from celery.app.task import Task + +from vsi.tools.python import args_to_kwargs, ARGS, KWARGS + +from terra import settings +from terra.core.settings import TerraJSONEncoder +import terra.logger +import terra.compute.utils +from terra.logger import getLogger +logger = getLogger(__name__) + +__all__ = ['TerraTask', 'shared_task'] + + +# Take the shared task decorator, and add some Terra defaults, so you don't +# need to specify them for EVERY task +def shared_task(*args, **kwargs): + kwargs['bind'] = kwargs.pop('bind', True) + kwargs['base'] = kwargs.pop('base', TerraTask) + return original_shared_task(*args, **kwargs) + + +class TerraTask(Task): + def _get_volume_mappings(self): + executor_volume_map = self.request.settings['executor']['volume_map'] + + if executor_volume_map: + compute_volume_map = \ + self.request.settings['compute']['volume_map'] + # Flip each mount point, so it goes from runner to controller + reverse_compute_volume_map = [[x[1], x[0]] + for x in compute_volume_map] + # Reverse order. This will be important in case one mount point mounts + # inside another + reverse_compute_volume_map.reverse() + + reverse_executor_volume_map = [[x[1], x[0]] + for x in executor_volume_map] + reverse_executor_volume_map.reverse() + + else: + reverse_compute_volume_map = [] + compute_volume_map = [] + reverse_executor_volume_map = [] + + return (compute_volume_map, reverse_compute_volume_map, + executor_volume_map, reverse_executor_volume_map) + + def translate_paths(self, payload, reverse_compute_volume_map, + executor_volume_map): + if reverse_compute_volume_map or executor_volume_map: + # If either translation is needed, start by applying the ~ home dir + # expansion and settings_property (which wouldn't have made it through + # pure json conversion, but the ~ will) + payload = TerraJSONEncoder.serializableSettings(payload) + # Go from compute runner to master controller + if reverse_compute_volume_map: + payload = terra.compute.utils.translate_settings_paths( + payload, reverse_compute_volume_map) + # Go from master controller to executor + if executor_volume_map: + payload = terra.compute.utils.translate_settings_paths( + payload, executor_volume_map) + return payload + + # Don't need to apply translations for apply, it runs locally + # def apply(self, *args, **kwargs): + + # apply_async needs to smuggle a copy of the settings to the task + def apply_async(self, args=None, kwargs=None, task_id=None, + *args2, **kwargs2): + current_settings = TerraJSONEncoder.serializableSettings(settings) + return super().apply_async(args=args, kwargs=kwargs, + headers={'settings': current_settings}, + task_id=task_id, *args2, **kwargs2) + + def __call__(self, *args, **kwargs): + # This is only set when apply_async was called.
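+ # (Note: with Celery's task message protocol v2, the extra headers passed + # to apply_async above are exposed on self.request, which is how the + # controller's serialized settings arrive here.)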
+ if getattr(self.request, 'settings', None): + if not settings.configured: + # Cover a potential (unlikely) corner case where settings might not be + # configured yet + settings.configure({'processing_dir': gettempdir()}) + + # Create a settings context, so I can replace it with the task's settings + with settings: + # Calculate the executor's mapped version of the runner's settings + compute_volume_map, reverse_compute_volume_map, \ + executor_volume_map, reverse_executor_volume_map = \ + self._get_volume_mappings() + + # Load the executor version of the runner's settings + settings._wrapped.clear() + settings._wrapped.update(self.translate_paths( + self.request.settings, + reverse_compute_volume_map, + executor_volume_map)) + # This is needed here because I just loaded settings from a runner! + settings.terra.zone = 'task' + + # Just in case processing dir doesn't exist + if not os.path.exists(settings.processing_dir): + logger.critical(f'Dir "{settings.processing_dir}" is not accessible ' + 'by the executor; please make sure the worker has ' + 'access to this directory') + settings.processing_dir = gettempdir() + logger.warning('Using temporary directory: ' + f'"{settings.processing_dir}" for the processing dir') + + # Calculate the executor's mapped version of the arguments + kwargs = args_to_kwargs(self.run, args, kwargs) + args_only = kwargs.pop(ARGS, ()) + kwargs.update(kwargs.pop(KWARGS, ())) + kwargs = self.translate_paths(kwargs, + reverse_compute_volume_map, + executor_volume_map) + # Set up logger to talk to master controller + terra.logger._logs.reconfigure_logger(pre_run_task=True) + return_value = self.run(*args_only, **kwargs) + + # Calculate the runner mapped version of the executor's return value + return_value = self.translate_paths(return_value, + reverse_executor_volume_map, + compute_volume_map) + else: + # Must call (synchronous) apply or python __call__ with no volume + # mappings + if settings.configured: + original_zone = settings.terra.zone + settings.terra.zone = 'task' + try: + return_value = self.run(*args, **kwargs) + finally: + if settings.configured: + settings.terra.zone = original_zone + return return_value + + # # from https://stackoverflow.com/a/45333231/1771778 + # def on_failure(self, exc, task_id, args, kwargs, einfo): + # logger.exception('Celery task failure!!!', exc_info=exc) + # return super().on_failure(exc, task_id, args, kwargs, einfo) diff --git a/terra/tests/__init__.py b/terra/tests/__init__.py index e0ed1ce1..ae00379d 100644 --- a/terra/tests/__init__.py +++ b/terra/tests/__init__.py @@ -1,12 +1,17 @@ import os +import warnings + + +original_environ = os.environ.copy() # Use this as a package level setup def load_tests(loader, standard_tests, pattern): if os.environ.get('TERRA_UNITTEST', None) != "1": - print('WARNING: Running terra tests without setting TERRA_UNITTEST will ' - 'result in side effects such as extraneouse log files being ' - 'generated') + warnings.warn( + 'WARNING: Running terra tests without setting TERRA_UNITTEST will ' + 'result in side effects such as extraneous log files being ' + 'generated') this_dir = os.path.dirname(__file__) package_tests = loader.discover(start_dir=this_dir, pattern=pattern) diff --git a/terra/tests/demo/__init__.py b/terra/tests/demo/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/terra/tests/demo/__main__.py b/terra/tests/demo/__main__.py new file mode 100644 index 00000000..ec090bdf --- /dev/null +++ b/terra/tests/demo/__main__.py @@ -0,0 +1,103 @@ +''' +Demo app that
tests if a terra config is working + +*** WARNING *** This will spin up real computers and workers, if you are +configured to do so. May result in a small amount of billing. +''' + +from os import environ as env +import tempfile +import os +import json +import pydoc + +from terra import settings +from terra.core.settings import ENVIRONMENT_VARIABLE, settings_property +from terra.core.exceptions import ImproperlyConfigured +from terra.utils.cli import FullPaths, ArgumentParser + + +@settings_property +def singularity_unset(self): + raise ImproperlyConfigured('You must set --compose and --service for ' + 'singularity') + + +def demo_templates(): + docker = { + "demo": {"compose": os.path.join(env['TERRA_TERRA_DIR'], + 'docker-compose-main.yml'), + "service": "terra-demo"} + } + + singularity = { + "demo": {"compose": singularity_unset, + "service": singularity_unset} + } + + templates = [ + ({"compute": {"arch": "docker"}}, docker), + ({"compute": {"arch": "terra.compute.docker"}}, docker), + ({"compute": {"arch": "singularity"}}, singularity), + ({"compute": {"arch": "terra.compute.singularity"}}, singularity) + ] + return templates + + +def get_parser(): + parser = ArgumentParser(description="View Angle Runner") + aa = parser.add_argument + aa('--loglevel', type=str, help="Log level", default=None) + aa('--compose', type=str, default=None, + help="Compose filename (for docker/singularity)") + aa('--service', type=str, default=None, + help="Service name (for docker/singularity)") + aa('settings', type=str, help="JSON settings file", + default=os.environ.get(ENVIRONMENT_VARIABLE), action=FullPaths) + return parser + + +def main(processing_dir, args=None): + args = get_parser().parse_args(args) + + # Load settings + with open(args.settings, 'r') as fid: + settings_json = json.load(fid) + + # Patch settings for demo + settings_json['processing_dir'] = processing_dir + settings_json['demo'] = {} + + if args.compose: + settings_json['demo']['compose'] = args.compose + if args.service: + settings_json['demo']['service'] = args.service + if args.loglevel: + try: + settings_json['logging']['level'] = args.loglevel + except KeyError: + settings_json['logging'] = {'level': args.loglevel} + + # Configure settings + settings.add_templates(demo_templates()) + settings.configure(settings_json) + + # import pprint + # pprint.pprint(settings) + + # Run workflow + from .workflows import DemoWorkflow + DemoWorkflow().demonate() + + +if __name__ == '__main__': + processing_dir = tempfile.TemporaryDirectory() + try: + main(processing_dir.name) + with open(os.path.join(processing_dir.name, 'terra_log'), 'r') as fid: + print('-------------------') + print('Paging log messages') + print('-------------------') + pydoc.pager(fid.read()) + finally: + processing_dir.cleanup() diff --git a/terra/tests/demo/runners/__init__.py b/terra/tests/demo/runners/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/terra/tests/demo/runners/demo1.py b/terra/tests/demo/runners/demo1.py new file mode 100644 index 00000000..0861a97f --- /dev/null +++ b/terra/tests/demo/runners/demo1.py @@ -0,0 +1,21 @@ +''' +Demo app that tests if a terra config is working + +*** WARNING *** This will spin up real computers and workers, if you are +configured to do so. May result in a small amount of billing. 
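+ +This runner is not normally invoked directly: the demo workflow launches it +via ``compute.run('terra.tests.demo.services.Demo1')``, whose service +definition runs ``python -m terra.tests.demo.runners.demo1`` inside the +configured compute.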
+''' + +from terra.utils.cli import ArgumentParser +from terra import settings +from terra.logger import getLogger +logger = getLogger(__name__) + + +def main(args=None): + settings.terra.zone + logger.critical('Demo 1') + + +if __name__ == '__main__': + ArgumentParser().parse_args() + main() diff --git a/terra/tests/demo/runners/demo2.py b/terra/tests/demo/runners/demo2.py new file mode 100644 index 00000000..0621fae8 --- /dev/null +++ b/terra/tests/demo/runners/demo2.py @@ -0,0 +1,37 @@ +''' +Demo app that tests if a terra config is working + +*** WARNING *** This will spin up real computers and workers, if you are +configured to do so. May result in a small amount of billing. +''' + +import concurrent.futures + +from terra.tests.demo.tasks import demo2 +from terra.utils.cli import ArgumentParser +from terra.executor import Executor +from terra import settings +from terra.logger import getLogger +logger = getLogger(__name__) + + +def main(args=None): + settings.terra.zone + logger.critical('Demo 2 Starting') + + futures = {} + + with Executor(max_workers=4) as executor: + for x in range(1, 3): + for y in range(4, 6): + futures[executor.submit(demo2, x, y)] = (x, y) + + for future in concurrent.futures.as_completed(futures): + logger.info(f'Completed: {settings.terra.zone} {futures[future]}') + + logger.critical('Demo 2 Done') + + +if __name__ == '__main__': + ArgumentParser().parse_args() + main() diff --git a/terra/tests/demo/services.py b/terra/tests/demo/services.py new file mode 100644 index 00000000..f33629e8 --- /dev/null +++ b/terra/tests/demo/services.py @@ -0,0 +1,73 @@ +from terra import settings +from terra.compute.docker import ( + Service as DockerService, + Compute as DockerCompute +) +from terra.compute.singularity import ( + Service as SingularityService, + Compute as SingularityCompute +) +from terra.compute.virtualenv import ( + Service as VirtualEnvService, + Compute as VirtualEnvCompute +) +from terra.compute.base import BaseService + + +class Demo1(BaseService): + ''' Simple Demo Service ''' + command = ['python', '-m', 'terra.tests.demo.runners.demo1'] + CONTAINER_PROCESSING_DIR = "/opt/test" + + def pre_run(self): + self.add_volume(settings.processing_dir, + Demo1.CONTAINER_PROCESSING_DIR, + 'rw') + super().pre_run() + + +@DockerCompute.register(Demo1) +class Demo1_docker(DockerService, Demo1): + def __init__(self): + super().__init__() + self.compose_files = [settings.demo.compose] + self.compose_service_name = settings.demo.service + + +@SingularityCompute.register(Demo1) +class Demo1_singularity(SingularityService, Demo1): + def __init__(self): + super().__init__() + self.compose_files = [settings.demo.compose] + self.compose_service_name = settings.demo.service + + +@VirtualEnvCompute.register(Demo1) +class Demo1_virtualenv(VirtualEnvService, Demo1): + pass + + +class Demo2(Demo1): + ''' Simple Demo Service ''' + command = ['python', '-m', 'terra.tests.demo.runners.demo2'] + + +@DockerCompute.register(Demo2) +class Demo2_docker(DockerService, Demo2): + def __init__(self): + super().__init__() + self.compose_files = [settings.demo.compose] + self.compose_service_name = settings.demo.service + + +@SingularityCompute.register(Demo2) +class Demo2_singularity(SingularityService, Demo2): + def __init__(self): + super().__init__() + self.compose_files = [settings.demo.compose] + self.compose_service_name = settings.demo.service + + +@VirtualEnvCompute.register(Demo2) +class Demo2_virtualenv(VirtualEnvService, Demo2): + pass diff --git a/terra/tests/demo/tasks.py 
b/terra/tests/demo/tasks.py new file mode 100644 index 00000000..e38b650d --- /dev/null +++ b/terra/tests/demo/tasks.py @@ -0,0 +1,11 @@ +from terra.task import shared_task + +# Terra core +from terra.logger import getLogger +logger = getLogger(__name__) + + +@shared_task +def demo2(self, x, y): + logger.critical(f"Task: {x} {y}") + return x * y diff --git a/terra/tests/demo/workflows.py b/terra/tests/demo/workflows.py new file mode 100644 index 00000000..61fe1081 --- /dev/null +++ b/terra/tests/demo/workflows.py @@ -0,0 +1,12 @@ +from terra.compute import compute + +from terra.logger import getLogger +logger = getLogger(__name__) + + +class DemoWorkflow: + def demonate(self): + logger.critical('Starting demo workflow') + compute.run('terra.tests.demo.services.Demo1') + compute.run('terra.tests.demo.services.Demo2') + logger.critical('Ran demo workflow') diff --git a/terra/tests/test_compute_base.py b/terra/tests/test_compute_base.py index 2904e3ab..67bfa0f9 100644 --- a/terra/tests/test_compute_base.py +++ b/terra/tests/test_compute_base.py @@ -2,32 +2,18 @@ from unittest import mock from terra import settings -from terra.compute import base -from .utils import TestCase +import terra.compute.base +from .utils import ( + TestCase, TestSettingsConfiguredCase, TestSettingsUnconfiguredCase +) -# Registration test -class Foo: - class TestService(base.BaseService): - pass - - -class TestService_base(Foo.TestService, base.BaseService): - pass - - -class TestServiceBase(TestCase): - def setUp(self): - # I want to be able to use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) - super().setUp() - settings.configure({}) - +class TestServiceBase(TestSettingsConfiguredCase): # Simulate external env var @mock.patch.dict(os.environ, {'FOO': "BAR"}) def test_env(self): # Test that a service inherits the environment correctly - service = base.BaseService() + service = terra.compute.base.BaseService() # App specific env var service.env['BAR'] = 'foo' # Make sure both show up @@ -37,40 +23,37 @@ def test_env(self): self.assertNotIn("BAR", os.environ) def test_add_volumes(self): - service = base.BaseService() + service = terra.compute.base.BaseService() # Add a volumes service.add_volume("/local", "/remote") # Make sure it's in the list self.assertIn(("/local", "/remote"), service.volumes) - # Unconfigure settings - @mock.patch.object(settings, '_wrapped', None) - def test_volumes_and_configuration_map(self): - # Add a volumes - service = base.BaseService() - service.add_volume("/local", "/remote") + def test_registry(self): + with mock.patch.dict(terra.compute.base.services, clear=True): + # Registration test + class Foo: + class TestService(terra.compute.base.BaseService): + pass - # Test configuration_map - settings.configure({}) - # Make sure the volume is in the map - self.assertEqual([("/local", "/remote")], - base.BaseCompute().configuration_map(service)) + class TestService_base(Foo.TestService, terra.compute.base.BaseService): + pass - @mock.patch.dict(base.services, clear=True) - def test_registry(self): - # Register a class class, just for fun - base.BaseCompute.register(Foo.TestService)(TestService_base) + # Register a class class, just for fun + terra.compute.base.BaseCompute.register(Foo.TestService)( + TestService_base) - self.assertIn(Foo.TestService.__module__ + '.Foo.TestService', - base.services) + self.assertIn(Foo.TestService.__module__ + '.' 
+ + Foo.TestService.__qualname__, + terra.compute.base.services) - with self.assertRaises(base.AlreadyRegisteredException, - msg='Compute command "car" does not have a service ' - 'implementation "car_service"'): - base.BaseCompute.register(Foo.TestService)(lambda x: 1) + with self.assertRaises(terra.compute.base.AlreadyRegisteredException, + msg='Compute command "car" does not have a ' + 'service implementation "car_service"'): + terra.compute.base.BaseCompute.register(Foo.TestService)(lambda x: 1) def test_getattr(self): - class Foo(base.BaseCompute): + class Foo(terra.compute.base.BaseCompute): def bar_service(self): pass @@ -80,11 +63,25 @@ def bar_service(self): foo.car +class TestServiceBaseUnconfigured(TestSettingsUnconfiguredCase): + def test_volumes_and_configuration_map(self): + # Add a volumes + service = terra.compute.base.BaseService() + service.add_volume("/local", "/remote") + + # Test configuration_map + settings.configure({}) + # Make sure the volume is in the map + self.assertEqual( + [("/local", "/remote")], + terra.compute.base.BaseCompute().configuration_map(service)) + + class TestUnitTests(TestCase): def last_test_registered_services(self): self.assertFalse( - base.services, - msg="If you are seting this, one of the other unit tests has " + terra.compute.base.services, + msg="If you are seeing this, one of the other unit tests has " "registered a terra service. This side effect should be " "prevented by mocking out the terra.compute.base.services dict. " "Otherwise unit tests can interfere with each other.") diff --git a/terra/tests/test_compute_container.py b/terra/tests/test_compute_container.py index 003c8ffc..fd51ca36 100644 --- a/terra/tests/test_compute_container.py +++ b/terra/tests/test_compute_container.py @@ -1,14 +1,13 @@ import os import ntpath import json -import tempfile from unittest import mock, skipIf from terra import settings from terra.executor.utils import Executor from terra.compute import base import terra.compute.container -from vsi.test.utils import TestCase, NamedTemporaryFileFactory +from .utils import TestNamedTemporaryFileCase, TestSettingsUnconfiguredCase class SomeService(terra.compute.container.ContainerService): @@ -30,11 +29,9 @@ def mock_map_lcow(self, *args, **kwargs): return [('/c/foo', '/bar')] -class TestComputeContainerCase(TestCase): +class TestComputeContainerCase(TestSettingsUnconfiguredCase): def setUp(self): self.temp_dir - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) # This will resets the _connection to an uninitialized state self.patches.append( mock.patch.object(terra.compute.utils.ComputeHandler, @@ -51,12 +48,11 @@ def setUp(self): 'test_dir': '/opt/projects/terra/terra_dsm/external/terra/foo'}) -class TestContainerService(TestComputeContainerCase): +class TestContainerService(TestComputeContainerCase, + TestNamedTemporaryFileCase): # Test the flushing configuration to json for a container mechanism def setUp(self): - self.patches.append(mock.patch.object(tempfile, 'NamedTemporaryFile', - NamedTemporaryFileFactory(self))) self.patches.append(mock.patch.object(json, 'dump', self.json_dump)) # self.common calls service.pre_run which trigger Executor self.patches.append(mock.patch.dict(Executor.__dict__)) diff --git a/terra/tests/test_compute_docker.py b/terra/tests/test_compute_docker.py index ed216c20..8cf23351 100644 --- a/terra/tests/test_compute_docker.py +++ b/terra/tests/test_compute_docker.py @@ -10,13 +10,11 @@ from terra.compute import docker import terra.compute.utils 
-from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase -class TestComputeDockerCase(TestCase): +class TestComputeDockerCase(TestSettingsUnconfiguredCase): def setUp(self): - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) # This will resets the _connection to an uninitialized state self.patches.append( mock.patch.object(terra.compute.utils.ComputeHandler, @@ -102,7 +100,7 @@ def test_run(self): compute.run(MockJustService()) # Run a docker service self.assertEqual(('--wrap', 'Just-docker-compose', - '-f', 'file1', 'run', 'launch', 'ls'), + '-f', 'file1', 'run', '-T', 'launch', 'ls'), self.just_args) self.assertEqual({'justfile': None, 'env': {'BAR': 'FOO'}}, self.just_kwargs) diff --git a/terra/tests/test_compute_dummy.py b/terra/tests/test_compute_dummy.py index c9fd616c..19f5e411 100644 --- a/terra/tests/test_compute_dummy.py +++ b/terra/tests/test_compute_dummy.py @@ -6,7 +6,7 @@ from terra.compute import dummy import terra.compute.utils -from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase # Test Dummy Definition @@ -31,10 +31,8 @@ def __init__(self): self.d = 44 -class TestComputeDummyCase(TestCase): +class TestComputeDummyCase(TestSettingsUnconfiguredCase): def setUp(self): - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) # Use registry self.patches.append(mock.patch.dict(base.services, clear=True)) # Use compute diff --git a/terra/tests/test_compute_singularity.py b/terra/tests/test_compute_singularity.py index 44f57920..e287fe30 100644 --- a/terra/tests/test_compute_singularity.py +++ b/terra/tests/test_compute_singularity.py @@ -6,14 +6,12 @@ from terra.compute import singularity import terra.compute.utils -from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase -class TestComputeSingularityCase(TestCase): +class TestComputeSingularityCase(TestSettingsUnconfiguredCase): def setUp(self): - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) - # This will resets the _connection to an uninitialized state + # This will reset the _connection to an uninitialized state self.patches.append( mock.patch.object(terra.compute.utils.ComputeHandler, '_connection', diff --git a/terra/tests/test_compute_utils.py b/terra/tests/test_compute_utils.py index 09d86f28..9cf8e49f 100644 --- a/terra/tests/test_compute_utils.py +++ b/terra/tests/test_compute_utils.py @@ -3,7 +3,7 @@ import warnings from terra import settings -from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase import terra.compute.utils as utils import terra.compute.dummy import terra.compute.docker @@ -34,7 +34,7 @@ class Service2_test: # I am purposefully showing multiple ways to mock _wrapped for demonstration # purposes -class TestComputeUtilsCase(TestCase): +class TestComputeUtilsCase(TestSettingsUnconfiguredCase): def setUp(self): # Use setting self.patches.append(mock.patch.object(settings, '_wrapped', None)) @@ -129,12 +129,12 @@ def mock_popen(*args, **kwargs): class TestBaseJust(TestComputeUtilsCase): def setUp(self): - self.patches.append(mock.patch.object(utils, 'Popen', mock_popen)) - super().setUp() - # Make a copy self.original_env = os.environ.copy() + self.patches.append(mock.patch.object(utils, 'Popen', mock_popen)) + super().setUp() + def tearDown(self): super().tearDown() # Make sure nothing inadvertently changed environ @@ -176,7 +176,7 @@ def test_just_kwargs(self): def test_logging_code(self): # Test the debug1 diffdict 
log output - with self.assertLogs(utils.__name__, level="DEBUG1") as cm: + with self.assertLogs(utils.__name__, level="DEBUG4") as cm: env = os.environ.copy() env.pop('PATH') env['FOO'] = 'BAR' diff --git a/terra/tests/test_compute_virtualenv.py b/terra/tests/test_compute_virtualenv.py index 36cc17e2..fff0d54e 100644 --- a/terra/tests/test_compute_virtualenv.py +++ b/terra/tests/test_compute_virtualenv.py @@ -7,7 +7,7 @@ from terra.compute import virtualenv import terra.compute.utils -from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase class MockVirtualEnvService(virtualenv.Service): @@ -17,10 +17,8 @@ def __init__(self): self.env["BAR"] = "FOO" -class TestVirtualEnv(TestCase): +class TestVirtualEnv(TestSettingsUnconfiguredCase): def setUp(self): - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) # self.run trigger Executor self.patches.append(mock.patch.dict(Executor.__dict__)) # This will resets the _connection to an uninitialized state @@ -37,7 +35,8 @@ def setUp(self): # patches.append(mock.patch.dict(base.services, clear=True)) super().setUp() settings.configure({ - 'compute': {'arch': 'virtualenv'}, + 'compute': {'arch': 'virtualenv', + 'virtualenv_dir': None}, 'processing_dir': self.temp_dir.name, 'test_dir': '/opt/projects/terra/terra_dsm/external/terra/foo'}) @@ -98,7 +97,7 @@ def test_logging_code(self): service = MockVirtualEnvService() # Test logging code - with self.assertLogs(virtualenv.__name__, level="DEBUG1") as cm: + with self.assertLogs(virtualenv.__name__, level="DEBUG4") as cm: os.environ['BAR'] = 'FOO' env = os.environ.copy() env.pop('BAR') @@ -110,10 +109,13 @@ def test_logging_code(self): env_lines = [x for x in cm.output if "Environment Modification:" in x][0] env_lines = env_lines.split('\n') - self.assertEqual(len(env_lines), 4) + self.assertEqual(len(env_lines), 5) self.assertTrue(any(o.startswith('- BAR:') for o in env_lines)) self.assertTrue(any(o.startswith('+ FOO:') for o in env_lines)) # Added by Terra self.assertTrue(any(o.startswith('+ TERRA_SETTINGS_FILE:') for o in env_lines)) + # Added by TestSettingsUnconfiguredCase + self.assertTrue(any(o.startswith('- TERRA_SETTINGS_FILE:') + for o in env_lines)) diff --git a/terra/tests/test_core_settings.py b/terra/tests/test_core_settings.py index cd09d04b..b8fcaf65 100644 --- a/terra/tests/test_core_settings.py +++ b/terra/tests/test_core_settings.py @@ -1,20 +1,19 @@ import os import sys import json +import time from unittest import mock from tempfile import TemporaryDirectory, NamedTemporaryFile import tempfile from envcontext import EnvironmentContext -from .utils import TestCase -from .test_logger import TestLoggerCase - +from .utils import TestCase, TestLoggerCase, TestLoggerConfigureCase from terra import settings from terra.core.exceptions import ImproperlyConfigured from terra.core.settings import ( ObjectDict, settings_property, Settings, LazyObject, TerraJSONEncoder, - ExpandedString + ExpandedString, LazySettings ) @@ -221,15 +220,11 @@ def test_dir(self): self.assertIn('c', dir(d.b[0][0])) -class TestSettings(TestCase): - def setUp(self): - # Useful for tests that set this - self.patches.append(mock.patch.dict(os.environ, - {'TERRA_SETTINGS_FILE': ""})) - # Use settings - self.patches.append(mock.patch.object(settings, '_wrapped', None)) - super().setUp() - +class TestSettings(TestLoggerCase): + # TestLoggerCase sets TERRA_SETTINGS_FILE to a valid file. In order to get + # an ImproperlyConfigured exception here, TERRA_SETTINGS_FILE must be set to + # something that is not a file, such as the empty string. + @mock.patch.dict(os.environ, TERRA_SETTINGS_FILE='') def test_unconfigured(self): with self.assertRaises(ImproperlyConfigured): settings.foo @@ -650,13 +645,16 @@ def last_test_settings(self): "Otherwise unit tests can interfere with each other") -class TestCircularDependency(TestLoggerCase): +class TestSettingsClass(TestCase): + def last_test_settings_class(self): + self.assertEqual(type(settings), LazySettings) + + +class TestCircularDependency(TestLoggerConfigureCase): # I don't want this unloading terra to interfere with other last_tests, as # this would reset modules to their initial state, giving false positives to # corruption checks. So mock it @mock.patch.dict(sys.modules) - # Needed to make circular imports - @mock.patch.dict(os.environ, TERRA_UNITTEST='0') def last_test_import_settings(self): # Unload terra for module in list(sys.modules.keys()): @@ -666,6 +664,17 @@ def last_test_import_settings(self): import terra.core.settings terra.core.settings.settings._setup() + # Shut down TCP server + terra.logger._logs.tcp_logging_server.abort = True + + for x in range(1000): + if not terra.logger._logs.tcp_logging_server.ready: + break + time.sleep(0.001) + else: + self.assertFalse(terra.logger._logs.tcp_logging_server.ready, + 'TCP Server did not shut down within a second') + # Picky windows import terra.logger - terra.logger._logs.log_file.close() + terra.logger._logs._log_file.close() diff --git a/terra/tests/test_executor_celery.py b/terra/tests/test_executor_celery.py index 1f9832c3..dda113ee 100644 --- a/terra/tests/test_executor_celery.py +++ b/terra/tests/test_executor_celery.py @@ -40,7 +40,7 @@ def test_redis_passwordfile(self): @mock.patch.dict(os.environ, TERRA_CELERY_INCLUDE='["foo", "bar"]') def test_include(self): import terra.executor.celery.celeryconfig as cc - self.assertEqual(cc.include, ['foo', 'bar']) + self.assertEqual(cc.include, ['foo', 'bar', 'terra.tests.demo.tasks']) class MockAsyncResult: @@ -92,9 +92,9 @@ def wait_for_state(self, future, state): time.sleep(0.001) if future._state == state: break - if x == 99: - raise TimeoutError(f'Took longer than 100us for a 1us update for ' - f'{future._state} to become {state}') + else: + raise TimeoutError(f'Took longer than 100ms for a 1ms update for ' + f'{future._state} to become {state}') def test_simple(self): test = test_factory() @@ -158,16 +158,18 @@ def test_update_futures_finish(self): time.sleep(0.001) if not len(self.executor._futures): break - if x == 99: - raise TimeoutError('Took longer than 100us for a 1us update') + else: + raise TimeoutError('Took longer than 100ms for a 1ms update') def test_update_futures_revoked(self): test = test_factory() future = self.executor.submit(test) self.assertFalse(future.cancelled()) - future._ar.state = 'REVOKED' - self.wait_for_state(future, 'CANCELLED_AND_NOTIFIED') + with self.assertLogs() as cm: + future._ar.state = 'REVOKED' + self.wait_for_state(future, 'CANCELLED_AND_NOTIFIED') + self.assertRegex(str(cm.output), 'WARNING.*Celery task.*cancelled') self.assertTrue(future.cancelled()) def test_update_futures_success(self): @@ -184,9 +186,11 @@ def test_update_futures_failure(self): future = self.executor.submit(test) self.assertIsNone(future._result) - future._ar.state = 'FAILURE' - future._ar.result = TypeError('On no') - self.wait_for_state(future, 'FINISHED') + with self.assertLogs() as cm: + future._ar.state = 'FAILURE' + future._ar.result = TypeError('Oh no') + self.wait_for_state(future, 'FINISHED') +
self.assertRegex(str(cm.output), 'ERROR.*Celery task.*resolved with error') def test_shutdown(self): test = test_factory() diff --git a/terra/tests/test_executor_dummy.py b/terra/tests/test_executor_dummy.py index b97a0a4d..77c62c87 100644 --- a/terra/tests/test_executor_dummy.py +++ b/terra/tests/test_executor_dummy.py @@ -1,5 +1,5 @@ from terra.executor import dummy -from .utils import TestCase +from .utils import TestSettingsConfiguredCase def test1(x): @@ -10,7 +10,7 @@ def test2(x): return x + 13 -class TestExecutorDummy(TestCase): +class TestExecutorDummy(TestSettingsConfiguredCase): def setUp(self): super().setUp() self.executor = dummy.DummyExecutor() diff --git a/terra/tests/test_executor_utils.py b/terra/tests/test_executor_utils.py index a818c969..baa7427c 100644 --- a/terra/tests/test_executor_utils.py +++ b/terra/tests/test_executor_utils.py @@ -1,19 +1,17 @@ -from unittest import mock, SkipTest +from unittest import SkipTest import concurrent.futures from terra import settings -from .utils import TestCase +from .utils import ( + TestCase, TestExecutorCase, TestThreadPoolExecutorCase, + TestSettingsUnconfiguredCase +) from terra.executor.utils import ExecutorHandler, Executor from terra.executor.dummy import DummyExecutor from terra.executor.sync import SyncExecutor -class TestExecutorHandler(TestCase): - def setUp(self): - self.patches.append(mock.patch.object(settings, '_wrapped', None)) - self.patches.append(mock.patch.dict(Executor.__dict__)) - super().setUp() - +class TestExecutorHandler(TestExecutorCase, TestSettingsUnconfiguredCase): def test_executor_handler(self): settings.configure({'executor': {'type': 'DummyExecutor'}}) @@ -38,10 +36,11 @@ def test_executor_name_sync(self): settings.configure({'executor': {'type': 'SyncExecutor'}}) self.assertIsInstance(Executor._connection(), SyncExecutor) - def test_executor_name_thread(self): - settings.configure({'executor': {'type': 'ThreadPoolExecutor'}}) - self.assertIsInstance(Executor._connection(), - concurrent.futures.ThreadPoolExecutor) + # TODO: It takes more mocking to make this test pass now + # def test_executor_name_thread(self): + # settings.configure({'executor': {'type': 'ThreadPoolExecutor'}}) + # self.assertIsInstance(Executor._connection(), + # concurrent.futures.ThreadPoolExecutor) def test_executor_name_process(self): settings.configure({'executor': {'type': 'ProcessPoolExecutor'}}) @@ -60,7 +59,15 @@ def test_executor_name_celery(self): def test_executor_name_by_name(self): settings.configure( - {'executor': {'type': 'concurrent.futures.ThreadPoolExecutor'}}) + {'executor': {'type': 'concurrent.futures.ProcessPoolExecutor'}}) + self.assertIsInstance(Executor._connection(), + concurrent.futures.ProcessPoolExecutor) + + +class TestThreadExecutorHandler(TestThreadPoolExecutorCase, + TestSettingsUnconfiguredCase): + def test_executor_name_thread(self): + settings.configure({'executor': {'type': 'ThreadPoolExecutor'}}) self.assertIsInstance(Executor._connection(), concurrent.futures.ThreadPoolExecutor) diff --git a/terra/tests/test_logger.py b/terra/tests/test_logger.py index ac607b17..7c49b50b 100644 --- a/terra/tests/test_logger.py +++ b/terra/tests/test_logger.py @@ -2,18 +2,15 @@ import io import os import sys -import json import logging import uuid -import tempfile import platform import warnings from terra.core.exceptions import ImproperlyConfigured from terra import settings -from vsi.test.utils import TestCase, make_traceback, NamedTemporaryFileFactory +from .utils import TestCase, make_traceback, 
TestLoggerConfigureCase from terra import logger -from terra.core import signals class TestHandlerLoggingContext(TestCase): @@ -38,49 +35,7 @@ def test_handler_logging_context(self): self.assertIn(message2, str(handler_swap.buffer)) -class TestLoggerCase(TestCase): - def setUp(self): - self.original_system_hook = sys.excepthook - self.patches.append(mock.patch.object(settings, '_wrapped', None)) - self.patches.append(mock.patch.object(tempfile, 'NamedTemporaryFile', - NamedTemporaryFileFactory(self))) - settings_filename = os.path.join(self.temp_dir.name, 'config.json') - self.patches.append(mock.patch.dict(os.environ, - TERRA_SETTINGS_FILE=settings_filename)) - super().setUp() - - # Don't use settings.configure here, because I need to test out logging - # signals - config = {"processing_dir": self.temp_dir.name} - with open(settings_filename, 'w') as fid: - json.dump(config, fid) - - self._logs = logger._SetupTerraLogger() - - # # register post_configure with settings - signals.post_settings_configured.connect(self._logs.configure_logger) - - def tearDown(self): - # Remove all the logger handlers - sys.excepthook = self.original_system_hook - try: - self._logs.log_file.close() - except AttributeError: - pass - # Windows is pickier about deleting files - try: - if self._logs.tmp_file: - self._logs.tmp_file.close() - except AttributeError: - pass - self._logs.root_logger.handlers = [] - signals.post_settings_configured.disconnect(self._logs.configure_logger) - # Apparently this is unnecessary because signals use weak refs, that are - # auto removed on free, but I think it's still better to put this here. - super().tearDown() - - -class TestLogger(TestLoggerCase): +class TestLogger(TestLoggerConfigureCase): def test_setup_working(self): self.assertFalse(settings.configured) self.assertEqual(settings.processing_dir, self.temp_dir.name) @@ -93,10 +48,11 @@ def test_double_configure(self): self._logs.configure_logger(None) def test_temp_file_cleanup(self): - self.assertExist(self.temp_log_file) + tmp_file = self._logs.tmp_file.name + self.assertExist(tmp_file) self.assertFalse(self._logs._configured) settings.processing_dir - self.assertNotExist(self.temp_log_file) + self.assertNotExist(tmp_file) self.assertTrue(self._logs._configured) def test_exception_hook_installed(self): @@ -113,11 +69,12 @@ def save_exec_info(exc_type, exc, tb): self.tb = tb sys.excepthook = save_exec_info self._logs.setup_logging_exception_hook() - with self.assertLogs() as cm: - # with self.assertRaises(ZeroDivisionError): - tb = make_traceback() - sys.excepthook(ZeroDivisionError, - ZeroDivisionError('division by almost zero'), tb) + with mock.patch('sys.stderr', new_callable=io.StringIO): + with self.assertLogs() as cm: + # with self.assertRaises(ZeroDivisionError): + tb = make_traceback() + sys.excepthook(ZeroDivisionError, + ZeroDivisionError('division by almost zero'), tb) self.assertIn('division by almost zero', str(cm.output)) # Test stack trace stuff in there @@ -134,13 +91,14 @@ def test_root_logger_setup(self): def test_logs_stderr(self): stderr_handler = [h for h in self._logs.root_logger.handlers if hasattr(h, 'stream') and h.stream == sys.stderr][0] - self.assertIs(self._logs.stderr_handler, stderr_handler) self.assertEqual(stderr_handler.level, logging.WARNING) + self.assertIs(self._logs.stderr_handler, stderr_handler) def test_logs_temp_file(self): temp_handler = [ h for h in self._logs.root_logger.handlers - if hasattr(h, 'stream') and h.stream.name == self.temp_log_file][0] + if hasattr(h, 'stream') + 
and h.stream.name == self._logs.tmp_file.name][0] # Test that log everything is set self.assertEqual(temp_handler.level, logger.NOTSET) self.assertEqual(self._logs.root_logger.level, logger.NOTSET) @@ -154,24 +112,32 @@ def test_formatter(self): # This doesn't get formatted # with self.assertLogs(__name__, logger.ERROR) as cm: # logger.getLogger(__name__).error('Hi') + + test_logger = logger.getLogger(f'{__name__}.test_formatter') record = logging.LogRecord(__name__, logger.ERROR, __file__, 0, "Hiya", (), None) + self.assertTrue(test_logger.filter(record)) + self.assertTrue(self._logs.stderr_handler.filter(record)) self.assertEqual(self._logs.stderr_handler.format(record), "foo bar Hiya") + @mock.patch('terra.logger.ColorFormatter.use_color', False) def test_hostname(self): test_logger = logger.getLogger(f'{__name__}.test_hostname') record = test_logger.makeRecord(__name__, logger.ERROR, __file__, 0, - "Hiya", (), None, - extra=logger.extra_logger_variables) - self.assertIn('(preconfig)', self._logs.stderr_handler.format(record)) + "Hiya", (), None) + self.assertTrue(test_logger.filter(record)) + self.assertTrue(self._logs.stderr_handler.filter(record)) + self.assertIn(f'({platform.node()}:preconfig)', + self._logs.stderr_handler.format(record)) settings._setup() record = test_logger.makeRecord(__name__, logger.ERROR, __file__, 0, - "Hiya", (), None, - extra=logger.extra_logger_variables) - self.assertIn(f'({platform.node()})', + "Hiya", (), None) + self.assertTrue(test_logger.filter(record)) + self.assertTrue(self._logs.stderr_handler.filter(record)) + self.assertIn(f'({platform.node()}:controller)', self._logs.stderr_handler.format(record)) # Test https://stackoverflow.com/q/19615876/4166604 @@ -299,7 +265,7 @@ def last_test_logger(self): self.assertFalse( root_logger.handlers, - msg="If you are seting this, one of the other unit tests has " + msg="If you are seeing this, one of the other unit tests has " "initialized the logger. This side effect should be " "prevented for you automatically. If you are seeing this, you " "have configured logging manually, and should make sure you " diff --git a/terra/tests/test_other.py b/terra/tests/test_other.py new file mode 100644 index 00000000..7ea5c15d --- /dev/null +++ b/terra/tests/test_other.py @@ -0,0 +1,9 @@ +import os + +from .utils import TestCase +from terra.tests import original_environ + + +class TestOtherThings(TestCase): + def last_test_environ_change(self): + self.assertEqual(os.environ, original_environ) diff --git a/terra/tests/test_signals.py b/terra/tests/test_signals.py index a7239e4d..b2bb7fb9 100644 --- a/terra/tests/test_signals.py +++ b/terra/tests/test_signals.py @@ -1,8 +1,8 @@ -from terra.core.signals import Signal, receiver, post_settings_configured -from .utils import TestCase +from terra.core.signals import Signal, receiver +from .utils import TestSignalCase -class TestSignals(TestCase): +class TestSignals(TestSignalCase): def signal_handle1(self, sender, **kwargs): self.assertEqual(sender, self.sender) self.kwargs = kwargs @@ -149,12 +149,16 @@ def decorated2(sender, **kwargs): self.assertEqual(self.count, 1.1) -class TestUnitTests(TestCase): - def last_test_signals(self): - for signal in [post_settings_configured]: - self.assertFalse( - signal.receivers, - msg="If you are seting this, one of the other unit tests has " - "left a signal connected. 
This side effect should be " - "prevented by disconnecting any functions you connected to a " - "signal.") +# This no longer matters, as signals are disabled in unit tests now? +# class TestUnitTests(TestCase): +# def last_test_signals(self): +# for signal in [signals.post_settings_configured, +# signals.post_settings_context, +# signals.logger_configure, +# signals.logger_reconfigure]: +# self.assertFalse( +# signal.receivers, +# msg="If you are seeing this, one of the other unit tests has " +# "left a signal connected. This side effect should " +# "be prevented by disconnecting any functions you connected to " +# "a signal.") diff --git a/terra/tests/test_utils_workflow.py b/terra/tests/test_utils_workflow.py index 93b7b815..735163e3 100644 --- a/terra/tests/test_utils_workflow.py +++ b/terra/tests/test_utils_workflow.py @@ -1,27 +1,18 @@ -from unittest import mock import re -import os import json from terra.utils.workflow import resumable, AlreadyRunException from terra import settings from terra.logger import DEBUG1 -from .utils import TestCase +from .utils import TestSettingsUnconfiguredCase class Klass: pass -class TestResumable(TestCase): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.patches = [] - +class TestResumable(TestSettingsUnconfiguredCase): def setUp(self): - self.patches.append(mock.patch.dict(os.environ, - {'TERRA_SETTINGS_FILE': ""})) - self.patches.append(mock.patch.object(settings, '_wrapped', None)) super().setUp() settings.configure({'processing_dir': self.temp_dir.name}) diff --git a/terra/tests/utils.py b/terra/tests/utils.py index c4f5c5d3..fa86591f 100644 --- a/terra/tests/utils.py +++ b/terra/tests/utils.py @@ -1,5 +1,137 @@ +import os +import sys +import json +from unittest import mock + from vsi.test.utils import ( - TestCase, make_traceback + TestCase, make_traceback, TestNamedTemporaryFileCase ) -__all__ = ["TestCase", "make_traceback"] +from terra import settings + + +__all__ = ["TestCase", "make_traceback", "TestNamedTemporaryFileCase", + "TestSettingsUnconfiguredCase", "TestSettingsConfiguredCase", + "TestComputeCase", "TestExecutorCase", "TestSignalCase", + "TestLoggerConfigureCase"] + + +class TestSettingsUnconfiguredCase(TestCase): + def __init__(self, *args, **kwargs): + self.settings_filename = '' + super().__init__(*args, **kwargs) + + def setUp(self): + # Useful for tests that set this + self.patches.append(mock.patch.dict( + os.environ, + {'TERRA_SETTINGS_FILE': self.settings_filename})) + # Use settings + self.patches.append(mock.patch.object(settings, '_wrapped', None)) + super().setUp() + + +class TestSettingsConfiguredCase(TestSettingsUnconfiguredCase): + def setUp(self): + super().setUp() + settings.configure({}) + + +class TestLoggerCase(TestSettingsUnconfiguredCase, TestNamedTemporaryFileCase): + def setUp(self): + self.original_system_hook = sys.excepthook + attrs = {'serve_until_stopped.return_value': True, 'ready': True} + MockLogRecordSocketReceiver = mock.Mock(**attrs) + self.patches.append(mock.patch('terra.logger.LogRecordSocketReceiver', + MockLogRecordSocketReceiver)) + self.patches.append(mock.patch( + 'terra.compute.base.LogRecordSocketReceiver', + MockLogRecordSocketReceiver)) + # Special customization of TestSettingsUnconfiguredCase + self.settings_filename = os.path.join(self.temp_dir.name, 'config.json') + config = {"processing_dir": self.temp_dir.name} + with open(self.settings_filename, 'w') as fid: + json.dump(config, fid) + + super().setUp() + + # Run _setup_terra_logger AFTER the
patches have been applied, or else the + # temp files will be in /tmp, not self.temp_dir, and the terra_initial_tmp_ + # files won't get auto cleaned up + import terra.logger + self._logs = terra.logger._setup_terra_logger() + + def tearDown(self): + sys.excepthook = self.original_system_hook + + try: + self._logs.log_file.close() + except AttributeError: + pass + # Windows is pickier about deleting files + try: + if self._logs.tmp_file: + self._logs.tmp_file.close() + except AttributeError: + pass + self._logs.root_logger.handlers = [] + import terra.core.signals + terra.core.signals.post_settings_configured.disconnect( + self._logs.configure_logger) + terra.core.signals.post_settings_context.disconnect( + self._logs.reconfigure_logger) + super().tearDown() + + +class TestComputeCase(TestCase): + def setUp(self): + import terra.compute.utils + self.patches.append(mock.patch.dict(terra.compute.utils.compute.__dict__)) + super().setUp() + + +class TestExecutorCase(TestCase): + def setUp(self): + import terra.executor.utils + self.patches.append(mock.patch.dict( + terra.executor.utils.Executor.__dict__)) + super().setUp() + + +class TestThreadPoolExecutorCase(TestExecutorCase): + ''' + Special care is needed for ThreadPoolExecutor because it downcasts settings + ''' + + def setUp(self): + self.settings_class_patch = mock.patch.object( + settings, '__class__', type(settings), create=False) + super().setUp() + self.settings_class_patch.start() + # This mock's behavior needs to be modified: unlike normal attributes, + # setting __class__ doesn't get overwritten in __dict__, so setting + # is_local prevents delattr from being called on __class__, which would + # be the wrong thing to do. + self.settings_class_patch.is_local = True + + # This class does not mock or clean up __wrapped or __tls, but they do not + # introduce side effects. + + def tearDown(self): + # This has to be stopped before the rest, or else a setattr error occurs. + self.settings_class_patch.stop() + super().tearDown() + + +class TestSignalCase(TestCase): + def setUp(self): + self.patches.append(mock.patch.dict(os.environ, TERRA_UNITTEST='0')) + super().setUp() + + +# Enable signals. Most logging tests require configure_logger to actually +# be called. LogRecordSocketReceiver is mocked out, so no lasting side +# effects should occur. +class TestLoggerConfigureCase(TestLoggerCase, TestSignalCase, + TestComputeCase, TestExecutorCase): + pass diff --git a/terra/workflow.py b/terra/workflow.py index ae4bb9cd..778e3fea 100644 --- a/terra/workflow.py +++ b/terra/workflow.py @@ -21,6 +21,7 @@ class PipelineWorkflow: def __init__(self): self.pipeline = list() + super().__init__() # locate index of service name in workflow pipeline def service_index(self, service_name=None, default_index=0):