Skip to content

Commit

Permalink
Merge pull request #160 from tgen/develop
Browse files Browse the repository at this point in the history
v1.7.3 release
  • Loading branch information
PedalheadPHX committed Sep 19, 2023
2 parents e8238e3 + abd7397 commit 0823bae
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 119 deletions.
16 changes: 16 additions & 0 deletions docs/releases/1.7.3.md
@@ -0,0 +1,16 @@
# Jetstream v1.7.3 Release Notes

## Major changes
- For slurm backends, the `sacct` pinginess has been reduced, and we request less information instead of `--all`, this reduces load on the slurmdbd
- The slurm_singularity backend can now submit jobs without a container definition
- Added an `md5` and `assignbin` filter for using in templates
- Resolves https://github.com/tgen/jetstream/issues/101

## Bug fixes
- Not all asyncio.Event(loop)'s were fixed in previous commits, this should fix other cases impeding us from using python 3.10 https://github.com/tgen/jetstream/issues/144

## Minor changes
- Adjusted handling of gpu jobs for the `slurm_singularity` backend, we now set `SINGULARITYENV_CUDA_VISIBLE_DEVICES`

## Ease of use updates
- A bash completion script is available under `extras/completions/jetstream.bash`, this is still in development, but can be used as a template for other users. This can be installed under `~/.bash_completion` or to your preferred user completion script dir, e.g. `~/.local/share/bash-completion/completions/jetstream.bash`
18 changes: 13 additions & 5 deletions docs/templates.md
Expand Up @@ -238,21 +238,21 @@ several other tools have been added with Jetstream and can be used inside templa
### Globals

- `raise`: Raise an error while rendering the template
Example: `{{ if foo < 42 }}{{ raise('foo should be at least 42') }}{{ endif }}`
- Example: `{{ if foo < 42 }}{{ raise('foo should be at least 42') }}{{ endif }}`

- `log`: Log messages to the Jetstream logger while template renders
Example: `{{ log('Foo is {}'.format(foo), level='CRITICAL') }}`
- Example: `{{ log('Foo is {}'.format(foo), level='CRITICAL') }}`

- `env`: Returns environment variable value
Example: `echo foo is {{ getenv('FOO') }}`
- Example: `echo foo is {{ getenv('FOO') }}`

- `getenv`: Returns environment variable value, this will return None if value
is not set whereas `env` will raise an error. A different fallback value can
be given as the second argument.
Example: `echo foo is {{ getenv('FOO', None) }}`
- Example: `echo foo is {{ getenv('FOO', None) }}`

- `setenv`: Sets an environment variable when the template is rendered
Example: `{{ setenv('FOO', '42') }}`
- Example: `{{ setenv('FOO', '42') }}`


### Filters
Expand All @@ -267,6 +267,14 @@ several other tools have been added with Jetstream and can be used inside templa

- `sha256`: Returns sha256 hexdigest for a string

- `md5`: Returns md5sum of a file defined with a path
- Example: `{{ required_scripts.some_script.path | md5 }}`

- `assignbin`: Returns the 0-based bin the value falls in.
- The default bin edges are 0 to infinity, meaning this will return 0 if the bin edges are not defined.
- Returns -1 if the input value is out of bounds.
- Any value landing on an edge will floor to lower bin.
- This also accepts a list of labels such that: `{{ assignbin(5,[0,2,4,6],['low','med','high']) }}` returns 'high'. Moreover, `{{ assignbin(4,[0,2,4,6],['low','med','high']) }}` would return 'med'.

# Template rendering data

Expand Down
49 changes: 49 additions & 0 deletions extras/completions/jetstream.bash
@@ -0,0 +1,49 @@
#!/bin/bash

_jetstream_compgen_filenames() {
local cur="$1"

# Files, excluding directories:
grep -v -F -f <(compgen -d -P ^ -S '$' -- "$cur") \
<(compgen -f -P ^ -S '$' -- "$cur") |
sed -e 's/^\^//' -e 's/\$$/ /'

# Directories:
compgen -d -S / -- "$cur"
}

_jetstream() {
local cur prev opts

cur=${COMP_WORDS[COMP_CWORD]}
prev=${COMP_WORDS[COMP_CWORD-1]}
opts="--mash -m --backend --build -b --render -r --status -s --output -o --verbose -v --logging -l --no-logs"

if [[ ${prev} == --backend ]]; then
COMPREPLY=( $(compgen -W "slurm slurm_singularity" -- ${cur}) )
return 0
fi

if [[ ${cur} == -* ]]; then
COMPREPLY=( $(compgen -W "${opts}" -- ${cur}) )
return 0
fi

case ${COMP_CWORD} in
1)
COMPREPLY=($(compgen -W "init run mash build render tasks project pipelines settings" -- ${cur}))
;;
2)
case ${prev} in
pipelines)
COMPREPLY=($(compgen -W "$(head ${HOME}/jetstream_pipelines/*/pipeline.yaml | grep '^ name:\|^ version:' | awk '{ print $2 }' | xargs -n2 | sed 's/ /@/g')" -- ${cur}))
;;
esac
;;
*)
COMPREPLY=($(_jetstream_compgen_filenames "$cur"))
;;
esac
}

complete -o nospace -F _jetstream jetstream
2 changes: 1 addition & 1 deletion jetstream/__init__.py
Expand Up @@ -9,7 +9,7 @@
__author__ = 'Ryan Richholt'
__maintainer__ = 'Bryce Turner'
__email__ = 'bturner@tgen.org'
__version__ = '1.7.2'
__version__ = '1.7.3'


# Configure parallel library dependencies (Used by numpy)
Expand Down
22 changes: 13 additions & 9 deletions jetstream/backends/slurm.py
Expand Up @@ -3,6 +3,7 @@
import json
import logging
import os
import sys
import re
import shlex
import shutil
Expand Down Expand Up @@ -40,7 +41,7 @@ def __init__(
sbatch_args=None,
sbatch_delay=0.1,
sbatch_executable=None,
sacct_fields=('JobID', 'Elapsed'),
sacct_fields=('JobID', 'Elapsed', 'State', 'ExitCode'),
job_monitor_max_fails=5):
"""SlurmBackend submits tasks as jobs to a Slurm batch cluster
Expand Down Expand Up @@ -128,7 +129,7 @@ async def job_monitor(self):
self._bump_next_update()
continue
try:
sacct_data = sacct(*self.jobs, return_data=True)
sacct_data = sacct(*self.jobs, sacct_fields=self.sacct_fields, return_data=True)
except Exception:
if failures <= 0:
raise
Expand Down Expand Up @@ -228,7 +229,10 @@ async def spawn(self, task):
self._bump_next_update()
log.info(f'SlurmBackend submitted({job.jid}): {task.name}')

job.event = asyncio.Event(loop=self.runner.loop)
if sys.version_info > (3, 8):
job.event = asyncio.Event()
else:
job.event = asyncio.Event(loop=self.runner.loop)
self.jobs[job.jid] = job

await job.event.wait()
Expand Down Expand Up @@ -333,18 +337,18 @@ def is_ok(self):
return False


def wait(*job_ids, update_frequency=10):
def wait(*job_ids, sacct_fields=None, update_frequency=10):
"""Wait for one or more slurm batch jobs to complete"""
while 1:
jobs = sacct(*job_ids)
jobs = sacct(*job_ids, sacct_fields=sacct_fields)

if all([j.is_done() for j in jobs]):
return
else:
time.sleep(update_frequency)


def sacct(*job_ids, chunk_size=1000, strict=False, return_data=False):
def sacct(*job_ids, sacct_fields=None, chunk_size=1000, strict=False, return_data=False):
"""Query sacct for job records.
Jobs are returned for each job id, but steps will be combined under a
Expand All @@ -361,7 +365,7 @@ def sacct(*job_ids, chunk_size=1000, strict=False, return_data=False):
data = {}
for i in range(0, len(job_ids), chunk_size):
chunk = job_ids[i: i + chunk_size]
sacct_output = launch_sacct(*chunk)
sacct_output = launch_sacct(*chunk, sacct_fields=sacct_fields)
data.update(sacct_output)

log.debug('Status updates for {} jobs'.format(len(data)))
Expand All @@ -381,7 +385,7 @@ def sacct(*job_ids, chunk_size=1000, strict=False, return_data=False):
return jobs


def launch_sacct(*job_ids, delimiter=SLURM_SACCT_DELIMITER, raw=False):
def launch_sacct(*job_ids, sacct_fields=None, delimiter=SLURM_SACCT_DELIMITER, raw=False):
"""Launch sacct command and return stdout data
This function returns raw query results, sacct() will be more
Expand All @@ -393,7 +397,7 @@ def launch_sacct(*job_ids, delimiter=SLURM_SACCT_DELIMITER, raw=False):
:return: Dict or Bytes
"""
log.debug('Sacct request for {} jobs...'.format(len(job_ids)))
args = ['sacct', '-P', '--format', 'all', '--delimiter={}'.format(delimiter)]
args = ['sacct', '-P', '--format', '{}'.format(','.join(sacct_fields)), '--delimiter={}'.format(delimiter)]

for jid in job_ids:
args.extend(['-j', str(jid)])
Expand Down

0 comments on commit 0823bae

Please sign in to comment.