Merge pull request #172 from tgen/develop
Jetstream v1.7.4
PedalheadPHX committed Nov 28, 2023
2 parents 0823bae + b0a34e2 commit f9594f7
Showing 6 changed files with 149 additions and 94 deletions.
15 changes: 15 additions & 0 deletions docs/releases/1.7.4.md
@@ -0,0 +1,15 @@
# Jetstream v1.7.4 Release Notes

## Major changes
- Improved pipeline version parsing to use PEP440-style versioning; `development` and `latest` have been added as aliases for the latest development release and the latest stable release, respectively.
- Pipelines and their versions now have a defined comparison order, implemented via `__lt__` and `__eq__` functions, which allows the pipeline list to be sorted (see the sketch after this list).
- Improved handling of `JS_PIPELINE_PATH`, both within templates via the `expand_vars` function and within the slurm_singularity backend.
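
As a rough sketch of the new comparison behavior — the class and attribute names here are hypothetical, not Jetstream's actual implementation, and it assumes the `packaging` library is available:

```python
# Minimal sketch (hypothetical names): PEP440-aware ordering so a pipeline list can be sorted.
from packaging.version import Version

class Pipeline:
    def __init__(self, name, version):
        self.name = name
        self.version = Version(version)  # PEP440 parsing, e.g. '1.7.4' or '1.8.0.dev1'

    def __eq__(self, other):
        return (self.name, self.version) == (other.name, other.version)

    def __lt__(self, other):
        return (self.name, self.version) < (other.name, other.version)

pipelines = [Pipeline('phoenix', '1.9.1'), Pipeline('phoenix', '1.10.0')]
newest = sorted(pipelines)[-1]  # an alias like `latest` can resolve to the newest stable entry
```

Note that PEP440 ordering places development releases before their final release (`1.8.0.dev1 < 1.8.0`), which is what makes the `development` and `latest` aliases resolvable from a sorted list.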

## Bug fixes
- The slurm_singularity backend has improved search functionality for finding cached images; previously, cached images were only found if the digest was explicitly defined for the task.
- Avoid erroneously attempting to bind `$JS_PIPELINE_PATH` if it has not been set, e.g. if the user is simply running `jetstream run` without any pipeline context (see the sketch after this list).
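
The relevant guard, condensed from the slurm_singularity.py diff further down this page, only requests the bind mount when the variable is actually present:

```python
import os

# Bind JS_PIPELINE_PATH only when it is set; a bare `jetstream run` has no
# pipeline context, so no bind is attempted.
singularity_exec_args = "--bind $PWD --pwd $PWD --workdir /tmp --cleanenv --contain"
if os.getenv('JS_PIPELINE_PATH') is not None:
    singularity_exec_args += " --bind {}".format(os.getenv('JS_PIPELINE_PATH'))
```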

## Minor changes
- Linting-related adjustments to the slurm_singularity.py backend
- Limited the networkx version range to exclude the 3.0 release for now (an illustrative pin is shown below)
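
An illustrative pin — the exact specifier in Jetstream's setup files may differ:

```python
# Illustrative only: keep networkx on the 2.x series until 3.0 compatibility is verified.
install_requires = ['networkx<3.0']
```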

2 changes: 1 addition & 1 deletion jetstream/__init__.py
@@ -9,7 +9,7 @@
__author__ = 'Ryan Richholt'
__maintainer__ = 'Bryce Turner'
__email__ = 'bturner@tgen.org'
-__version__ = '1.7.3'
+__version__ = '1.7.4'


# Configure parallel library dependencies (Used by numpy)
165 changes: 85 additions & 80 deletions jetstream/backends/slurm_singularity.py
@@ -28,6 +28,7 @@
SLURM_PRESETS = settings['slurm_presets'].get(dict)
RUNNER_PRESETS = settings['singularity_presets'].get(dict)


class SlurmSingularityBackend(BaseBackend):
"""SlurmSingularityBackend will spawn tasks using a Slurm batch scheduler.
@@ -38,16 +39,16 @@ class SlurmSingularityBackend(BaseBackend):
respects = ('cmd', 'stdin', 'stdout', 'stderr', 'cpus', 'mem', 'walltime',
'slurm_args')

-def __init__(self,
-sacct_frequency=60,
+def __init__(self,
+sacct_frequency=60,
sbatch_delay=0.5,
-sbatch_executable=None,
+sbatch_executable=None,
sbatch_account=None,
sacct_fields=('JobID', 'Elapsed', 'State', 'ExitCode'),
-job_monitor_max_fails=5,
+job_monitor_max_fails=5,
max_jobs=1024,
singularity_executable=None,
-input_file_validation=False ):
+input_file_validation=False):
"""SlurmSingularityBackend submits tasks as jobs to a Slurm batch cluster
:param sacct_frequency: Frequency in seconds that job updates will
@@ -79,24 +80,25 @@ def __init__(self,
# stdout=devnull,
# stderr=devnull
# )

self.singularity_executable = singularity_executable
if self.singularity_executable is None:
self.singularity_executable = shutil.which('singularity') or 'singularity'

-self._singularity_run_sem = BoundedSemaphore( self.max_jobs ) # To ensure pulls have exclusive use of singularity

+# To ensure pulls have exclusive use of singularity
+self._singularity_run_sem = BoundedSemaphore(self.max_jobs)
self._singularity_pull_lock = Lock()
self._singularity_pull_cache = {}

signal.signal(signal.SIGABRT, self.cancel)
signal.signal(signal.SIGHUP, self.cancel)
signal.signal(signal.SIGIOT, self.cancel)
signal.signal(signal.SIGQUIT, self.cancel)
signal.signal(signal.SIGTERM, self.cancel)
signal.signal(signal.SIGINT, self.cancel)

log.info('SlurmSingularityBackend initialized')

def _bump_next_update(self):
self._next_update = datetime.now() + timedelta(seconds=self.sacct_frequency)
log.debug(f'Next sacct update bumped to {self._next_update.isoformat()}')
@@ -197,14 +199,14 @@ async def spawn(self, task):

stdin, stdout, stderr = get_fd_paths(task, self.runner.project)

-input_filenames = task.directives.get( 'input', [] )
-output_filenames = task.directives.get( 'output', [] )
-container = task.directives.get( 'container', None )
-digest = task.directives.get( 'digest', None )
-if container == None:
+input_filenames = task.directives.get('input', [])
+output_filenames = task.directives.get('output', [])
+
+container = task.directives.get('container', None)
+digest = task.directives.get('digest', None)
+if container is None:
# raise RuntimeError(f'container argument missing for task: {task.name}')
-log.debug( f'Task: {task.name} is missing a container definition! Using basic slurm submission' )
+log.debug(f'Task: {task.name} is missing a container definition! Using basic slurm submission')
singularity_image = None
singularity_hostname = None
docker_authentication_token = None
@@ -217,7 +219,7 @@ async def spawn(self, task):
try:
image, tag = container.split(':')
except ValueError:
-log.debug( f'Tag not defined for {container}, assuming latest')
+log.debug(f'Tag not defined for {container}, assuming latest')
image = container
tag = 'latest'

@@ -227,39 +229,39 @@ async def spawn(self, task):
# Stripping sha256 in case it was already included in digest
digest = re.sub('^sha256:', '', digest)
singularity_image = f"docker://{image}@sha256:{digest}"
-singularity_hostname = task.directives.get( 'singularity_hostname', None )
-docker_authentication_token = task.directives.get( 'docker_authentication_token', None )
-log.debug( f'Task: {task.name}, going to pull: {singularity_image}' )
+
+singularity_hostname = task.directives.get('singularity_hostname', None)
+
+docker_authentication_token = task.directives.get('docker_authentication_token', None)
+
+log.debug(f'Task: {task.name}, going to pull: {singularity_image}')
try:
if singularity_image in self._singularity_pull_cache:
pass
else:
async with self._singularity_pull_lock:
-for i in range( self.max_jobs ):
+for i in range(self.max_jobs):
await self._singularity_run_sem.acquire()
pull_command_run_string = ""
if docker_authentication_token is not None:
pull_command_run_string += f"""SINGULARITY_DOCKER_USERNAME='$oauthtoken' SINGULARITY_DOCKER_PASSWORD={docker_authentication_token} """
pull_command_run_string += f'singularity exec --cleanenv --nohttps {singularity_image} true'
-log.debug( f'Task: {task.name}, pulling: {pull_command_run_string}' )
-_p = await create_subprocess_shell( pull_command_run_string,
-stdout=asyncio.subprocess.PIPE,
-stderr=asyncio.subprocess.PIPE )
+log.debug(f'Task: {task.name}, pulling: {pull_command_run_string}')
+_p = await create_subprocess_shell(pull_command_run_string,
+stdout=asyncio.subprocess.PIPE,
+stderr=asyncio.subprocess.PIPE)
_stdout, _stderr = await _p.communicate()
-log.debug( f'Task: {task.name}, pulled, stdout: {_stdout}' )
-log.debug( f'Task: {task.name}, pulled, stderr: {_stderr}' )
-self._singularity_pull_cache[ singularity_image ] = singularity_image
-for i in range( self.max_jobs ):
+log.debug(f'Task: {task.name}, pulled, stdout: {_stdout}')
+log.debug(f'Task: {task.name}, pulled, stderr: {_stderr}')
+self._singularity_pull_cache[singularity_image] = singularity_image
+for i in range(self.max_jobs):
self._singularity_run_sem.release()
except Exception as e:
log.warning(f'Exception during singularity prepull: {e}')
p = await create_subprocess_shell( "exit 1;" )
p = await create_subprocess_shell("exit 1;")

log.debug(f'Task: {task.name}, pull complete: {singularity_image}')

log.debug( f'Task: {task.name}, pull complete: {singularity_image}' )

async with self.sbatch_lock:
time.sleep(self.sbatch_delay)
job = await sbatch(
@@ -524,10 +526,10 @@ def parse_sacct(data, delimiter=SLURM_SACCT_DELIMITER, id_pattern=SLURM_JOB_ID_P
return jobs


-async def sbatch(cmd, identity, singularity_image, singularity_executable="singularity",
+async def sbatch(cmd, identity, singularity_image, singularity_executable="singularity",
singularity_run_sem=None, singularity_hostname=None, singularity_image_digest=None,
-runner_args=None, runner_preset=None, docker_authentication_token=None, name=None,
-input_filenames=[], output_filenames=[], stdin=None, stdout=None, stderr=None,
+runner_args=None, runner_preset=None, docker_authentication_token=None, name=None,
+input_filenames=[], output_filenames=[], stdin=None, stdout=None, stderr=None,
tasks=None, cpus_per_task=1, mem="2G", walltime="1h", comment=None,
queue_args=None, queue_preset=None, sbatch_executable=None,
sbatch_account=None, input_file_validation=False):
@@ -536,41 +538,41 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
singularity_mounts = set()
if input_file_validation:
for input_filename_glob_pattern in input_filenames:
-input_filenames_glob = glob.glob( input_filename_glob_pattern )
-if len( input_filenames_glob ) == 0:
+input_filenames_glob = glob.glob(input_filename_glob_pattern)
+if len(input_filenames_glob) == 0:
raise RuntimeError(f'Task {name}: input file(s) do not exist: {input_filename_glob_pattern}')
for input_filename in input_filenames_glob:
-input_filename = os.path.abspath( input_filename )
-input_filename_head, input_filename_tail = os.path.split( input_filename )
-if not input_filename_head.startswith(os.getcwd()):
-singularity_mounts.add( input_filename_head )
+input_filename = os.path.abspath(input_filename)
+input_filename_head, input_filename_tail = os.path.split(input_filename)
+if not input_filename_head.startswith(f'{os.getcwd()}/'):
+singularity_mounts.add(input_filename_head)
else:
for input_filename in input_filenames:
-input_filename = os.path.abspath( input_filename )
-input_filename_head, input_filename_tail = os.path.split( input_filename )
-if not input_filename_head.startswith(os.getcwd()):
-singularity_mounts.add( input_filename_head )
+input_filename = os.path.abspath(input_filename)
+input_filename_head, input_filename_tail = os.path.split(input_filename)
+if not input_filename_head.startswith(f'{os.getcwd()}/'):
+singularity_mounts.add(input_filename_head)
for output_filename in output_filenames:
-output_filename = os.path.abspath( output_filename )
-output_filename_head, output_filename_tail = os.path.split( output_filename )
-if not output_filename_head.startswith(os.getcwd()):
-singularity_mounts.add( output_filename_head )
-os.makedirs( output_filename_head, exist_ok=True )
+output_filename = os.path.abspath(output_filename)
+output_filename_head, output_filename_tail = os.path.split(output_filename)
+if not output_filename_head.startswith(f'{os.getcwd()}/'):
+singularity_mounts.add(output_filename_head)
+os.makedirs(output_filename_head, exist_ok=True)

mount_strings = []
for singularity_mount in singularity_mounts:
mount_strings.append( "--bind %s" % ( singularity_mount ) )
singularity_mounts_string = " ".join( mount_strings ).strip()
mount_strings.append("--bind %s" % (singularity_mount))
singularity_mounts_string = " ".join(mount_strings).strip()

# create cmd script
os.makedirs( "jetstream/cmd", mode = 0o777, exist_ok = True )
os.makedirs("jetstream/cmd", mode=0o777, exist_ok=True)
millis = int(round(time.time() * 1000))
if name is None:
name = "script"
cmd_script_filename = f"jetstream/cmd/{millis}_{identity}.cmd"
-cmd_script_filename = os.path.abspath( cmd_script_filename )
-with open( cmd_script_filename, "w" ) as cmd_script:
-cmd_script.write( cmd )
+cmd_script_filename = os.path.abspath(cmd_script_filename)
+with open(cmd_script_filename, "w") as cmd_script:
+cmd_script.write(cmd)

sbatch_args = []

@@ -601,12 +603,12 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu

if walltime:
sbatch_args.extend(['-t', walltime])

if sbatch_account:
sbatch_args.extend(['--account', sbatch_account])

if comment:
fixed_comment = '"' + comment.replace('"',"'") + '"'
fixed_comment = '"' + comment.replace('"', "'") + '"'
sbatch_args.extend(['--comment', fixed_comment])

if queue_args:
@@ -625,7 +627,7 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
Slurm env 2:
defq = CPU nodes with 7 day walltime
scavenge = CPU nodes with 1 day walltime
Instead of pushing out different implementations of the specific pipeline, we can define the changes
pseudo-globally. In other words they are slurm environment specific but visually we can maintain the
same codebase for the pipelines.
@@ -639,7 +641,7 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu

sbatch_script = "#!/bin/bash\n"
skip_next = False
-for i in range( 0, len(sbatch_args)):
+for i in range(0, len(sbatch_args)):
if skip_next:
skip_next = False
continue
@@ -666,14 +668,17 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
else:
singularity_args.extend(runner_args)

singularity_exec_args = "--bind $JS_PIPELINE_PATH --bind $PWD --pwd $PWD --workdir /tmp --cleanenv --contain"
singularity_exec_args = "--bind $PWD --pwd $PWD --workdir /tmp --cleanenv --contain"
if os.getenv('JS_PIPELINE_PATH') is not None:
singularity_exec_args += " --bind {}".format(os.getenv('JS_PIPELINE_PATH'))
sbatch_script += "export SINGULARITYENV_JS_PIPELINE_PATH={}\n".format(os.getenv('JS_PIPELINE_PATH'))

if any('gpu' in s for s in [singularity_args, sbatch_args]):
if all('--nv' not in s for s in singularity_args):
if 'gpu' in ' '.join(map(str, sbatch_args)):
if '--nv' not in singularity_exec_args:
singularity_exec_args += ' --nv'

for arg in singularity_args:
singularity_exec_args += f" {arg}"
singularity_exec_args += f" {arg}"

singularity_hostname_arg = ""
if singularity_hostname is not None:
@@ -687,10 +692,10 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
# CUDA_VISIBLE_DEVICES is a standard method for declaring which GPUs a user is authorized to use - recognized by tensorflow for example
sbatch_script += f"[[ -v CUDA_VISIBLE_DEVICES ]] && export SINGULARITYENV_CUDA_VISIBLE_DEVICES=\"$CUDA_VISIBLE_DEVICES\"\n"
# We set the SINGULARITY_CACHEDIR to the default if it isn't defined by the user
sbatch_script += f"[[ -v SINGULARITY_CACHEDIR ]] || SINGULARITY_CACHEDIR=$HOME/.singularity/cache\n"
sbatch_script += f"[[ -v SINGULARITY_CACHEDIR ]] || SINGULARITY_CACHEDIR=$HOME/.singularity\n"
# Searching for the cached image and using it if it exists
sbatch_script += f"for file in $(find $SINGULARITY_CACHEDIR -type f -name \"{singularity_image_digest}\"); do\n"
sbatch_script += f" {singularity_executable} inspect $file > /dev/null 2>&1 && IMAGE_PATH=$file\n"
sbatch_script += f"for file in $(find $SINGULARITY_CACHEDIR/cache/oci-tmp -type f); do\n"
sbatch_script += f" {singularity_executable} inspect $file 2> /dev/null | grep -q '{singularity_image.split('docker://')[-1]}' && IMAGE_PATH=$file && break\n"
sbatch_script += f"done\n"
sbatch_script += f"if [[ -v IMAGE_PATH ]] ; then\n"
sbatch_script += f" {singularity_run_env_vars}{singularity_executable} exec {singularity_exec_args} {singularity_hostname_arg}{singularity_mounts_string} $IMAGE_PATH bash {cmd_script_filename}\n"
Expand All @@ -703,11 +708,11 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
if name is None:
name = "script"
sbatch_script_filename = f"jetstream/cmd/{millis}_{identity}.sbatch"
-sbatch_script_filename = os.path.abspath( sbatch_script_filename )
-with open( sbatch_script_filename, "w" ) as sbatch_script_file:
-sbatch_script_file.write( sbatch_script )
+sbatch_script_filename = os.path.abspath(sbatch_script_filename)
+with open(sbatch_script_filename, "w") as sbatch_script_file:
+sbatch_script_file.write(sbatch_script)

submit_sbatch_args = [ "sbatch", sbatch_script_filename ]
submit_sbatch_args = ["sbatch", sbatch_script_filename]
remaining_tries = SLURM_SBATCH_RETRY

while 1:
Expand All @@ -719,7 +724,7 @@ async def sbatch(cmd, identity, singularity_image, singularity_executable="singu
except subprocess.CalledProcessError:
if remaining_tries == 0:
raise
-else:
+else:
remaining_tries -= 1
log.exception(f'Error during sbatch, retrying in 60s ...')
time.sleep(60)