Skip to content

Commit

Permalink
Merge pull request #178 from wwade/job-notify-arg
Browse files Browse the repository at this point in the history
main: add a "-N/--notify" arg to run and notify together
  • Loading branch information
wwade committed May 13, 2022
2 parents c7ec87f + 7ba883e commit 4450931
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 61 deletions.
2 changes: 1 addition & 1 deletion jobrunner/db/__init__.py
Expand Up @@ -470,7 +470,7 @@ def filterJobsWith(job, startTime=True, skipReminders=False):
pass
return True

def getJobMatch(self, key, thisWs, skipReminders=False):
def getJobMatch(self, key, thisWs, skipReminders=False) -> JobInfo:
# pylint: disable=too-many-return-statements,too-many-branches,
# pylint: disable=too-many-statements
if key == '.':
Expand Down
167 changes: 118 additions & 49 deletions jobrunner/main.py
Expand Up @@ -4,10 +4,18 @@
import hashlib
import os
from os.path import expanduser
from subprocess import PIPE, CalledProcessError, Popen, check_call, check_output
from subprocess import (
DEVNULL,
PIPE,
CalledProcessError,
Popen,
check_call,
check_output,
)
import sys
import tempfile
import time
from typing import IO, List, Optional, Tuple

import dateutil.parser
import dateutil.tz
Expand Down Expand Up @@ -76,29 +84,13 @@ def impl_main(args=None):
cmd = []
if options.mail:
oneJob = jobs.getJobMatch(options.mail[0], options.tw)
subj = '[job-status] '
if len(options.mail) > 1:
subj += "Multiple jobs: %s" % repr(options.mail)
else:
subj += str(oneJob)
cmd.extend([config.mailProgram, '-s', subj])
if options.cc:
for ccAddr in options.cc:
if '@' not in ccAddr:
assert config.mailDomain
ccAddr += '@' + config.mailDomain
cmd += ['-c', ccAddr]
if config.mailProgram == 'chatmail':
# Special case for built-in chatmail, which should inherit any of the
# base args given to job, such as which rc file to use, etc.
cmd.extend(baseParsedArgsToArgList(args or sys.argv, options))
cmd.append(options.to)
cmd = sendMailOrNotifyCmd(args, options.mail, options, config, oneJob)
elif options.command:
bashCmd = postCommand(options.command)
cmd = ['bash', '-c', bashCmd]
elif options.retry:
oldJob = jobs.getJobMatch(options.retry, options.tw)
if os.getcwd() != oldJob.pwd:
if os.getcwd() != oldJob.pwd and oldJob.pwd:
sprint(
"NOTE: Changing directory to '%s' to retry job." %
oldJob.pwd)
Expand All @@ -110,7 +102,7 @@ def impl_main(args=None):
elif options.reminder:
cmd = None
elif options.program:
cmd = [options.program]
cmd = [str(options.program)]
if options.args:
cmd = cmd + postCommand(options.args)
elif options.wait:
Expand Down Expand Up @@ -207,36 +199,13 @@ def impl_main(args=None):
sprint("reminder: '%s'" % options.reminder)
sys.exit(0)

assert cmd is not None
if options.mail:
# Collect output files as attachments from dep jobs
# pylint: disable=consider-using-with
tmp = tempfile.NamedTemporaryFile(prefix='jobInfo-')
options.input = tmp.name
mailSize = 0

# Remove 'to' address temporarily
assert cmd is not None
lastArg = cmd.pop(-1)
for j in mailDeps:
depJob = jobs.inactive[j.permKey]
safeWrite(tmp, depJob.detail(options.verbose))
safeWrite(tmp, "\n" + SPACER_EACH + "\n")
lines = autoDecode(check_output(['tail', '-n20', depJob.logfile]))
safeWrite(tmp, lines)
safeWrite(tmp, SPACER_EACH + "\n")
safeWrite(tmp, "\n")
try:
stat = os.stat(depJob.logfile)
except OSError:
continue
if (mailSize + stat.st_size * 4 / 3) < 8 * 1024 * 1024:
mailSize += stat.st_size
cmd += ['-a', depJob.logfile]
tmp.flush()
cmd.append(lastArg)
cmd, inp = extendMailOrNotifyCmdLockRequired(cmd, jobs, mailDeps)
options.input = inp.name

# unlocked
runJob(cmd, options, jobs, job, fd, doIsolate)
runJob(args, cmd, options, config, jobs, job, fd, doIsolate)


def main(args=None):
Expand Down Expand Up @@ -306,7 +275,7 @@ def safeWrite(fd, value):
fd.write(value.encode('utf-8'))


def postCommand(cmd):
def postCommand(cmd: List[str]) -> List[str]:
return cmd


Expand Down Expand Up @@ -700,6 +669,11 @@ def parseArgs(args=None):
metavar="KEY",
action="append",
help="Send mail on job completion for job specified by KEY")
op.add_argument(
"-N",
"--notify",
action="store_true",
help="Send mail (notify) on job completion")
op.add_argument("-t", "--to", metavar="ADDRESS",
help="Specify 'to' address for mail notification "
"(default=%(default)s)",
Expand Down Expand Up @@ -778,7 +752,79 @@ def handleIsolate(cmd):
return netnsd


def runJob(cmd, options, jobs, job, fd, doIsolate):
def sendMailOrNotifyCmd(
args: Optional[List[str]],
notifyArg: List[str],
options: argparse.Namespace,
config: Config,
job: JobInfo,
) -> List[str]:
cmd = []
subj = '[job-status] '
if len(notifyArg) > 1:
subj += "Multiple jobs: %s" % repr(notifyArg)
else:
subj += str(job)
cmd.extend([config.mailProgram, '-s', subj])
if options.cc:
for ccAddr in options.cc:
if '@' not in ccAddr:
assert config.mailDomain
ccAddr += '@' + config.mailDomain
cmd += ['-c', ccAddr]
if config.mailProgram == 'chatmail':
# Special case for built-in chatmail, which should inherit any of the
# base args given to job, such as which rc file to use, etc.
cmd.extend(baseParsedArgsToArgList(args or sys.argv, options))
cmd.append(options.to)
return cmd


def extendMailOrNotifyCmdLockRequired(
cmd: List[str],
jobs: JobsBase,
mailDeps: List[JobInfo],
) -> Tuple[List[str], IO]:
# Collect output files as attachments from dep jobs
# pylint: disable=consider-using-with
tmp = tempfile.NamedTemporaryFile(prefix='jobInfo-')
inp = tmp
mailSize = 0

# Remove 'to' address temporarily
assert cmd is not None
lastArg = cmd.pop(-1)
for j in mailDeps:
depJob = jobs.inactive[j.permKey]
safeWrite(tmp, depJob.detail(False))
safeWrite(tmp, "\n" + SPACER_EACH + "\n")
assert depJob.logfile
lines = autoDecode(check_output(['tail', '-n20', depJob.logfile]))
safeWrite(tmp, lines)
safeWrite(tmp, SPACER_EACH + "\n")
safeWrite(tmp, "\n")
try:
stat = os.stat(depJob.logfile)
except OSError:
continue
if (mailSize + stat.st_size * 4 / 3) < 8 * 1024 * 1024:
mailSize += stat.st_size
cmd += ['-a', depJob.logfile]
tmp.flush()
cmd.append(lastArg)
return cmd, inp


def runJob(
args: List[str],
cmd: List[str],
options: argparse.Namespace,
config: Config,
jobs: JobsBase,
job: JobInfo,
fd: int,
doIsolate: bool,
) -> None:
# pylint: disable=too-many-arguments
# pylint: disable=too-many-statements
LOG.info("execute: %s", job.cmdStr)
Expand Down Expand Up @@ -817,6 +863,29 @@ def runJob(cmd, options, jobs, job, fd, doIsolate):
LOG.debug("locked DB, writing 'finish' status rc=%d", rc)
finish(job, rc)
LOG.debug("unlocked, should now exit rc=%d", rc)

if options.notify:
notifyCmd = sendMailOrNotifyCmd(args, [], options, config, job)
with lockedSection(jobs):
notifyCmd, inp = extendMailOrNotifyCmdLockRequired(
notifyCmd,
jobs,
[job],
)
fpIn = encoding_open(inp.name, "r")
try:
LOG.debug("running notifyCmd %r", notifyCmd)
notifyRc = check_call(
notifyCmd,
stdin=fpIn,
stdout=DEVNULL,
stderr=DEVNULL,
)
LOG.debug("check_call() => rc=%d", notifyRc)
except Exception as err: # pylint: disable=broad-except
sprint("Notification error:", err, file=sys.stderr)
LOG.debug("General exception (ignored)", exc_info=True)

LOG.debug("exit rc=%d", rc)
sys.exit(rc)

Expand Down
10 changes: 6 additions & 4 deletions jobrunner/test/integration/send_email.py
@@ -1,8 +1,10 @@
#!/usr/bin/env python
from __future__ import absolute_import, division, print_function

import json
import os
import sys

import simplejson as json

print(json.dumps(sys.argv[1:]))
dumpFile = os.getenv("SEND_EMAIL_DUMP_FILE")
if dumpFile:
with open(dumpFile, "w", encoding="utf-8") as fp:
json.dump(sys.argv[1:], fp)
30 changes: 23 additions & 7 deletions jobrunner/test/integration/smoke_test.py
Expand Up @@ -264,6 +264,7 @@ class RunMailTest(TestCase):
Test mail-related options
--mail
--notify
--to
--cc
--rc-file
Expand Down Expand Up @@ -291,13 +292,28 @@ def test(self):
# --cc
jobf('--rc-file', rcFile.name, '--mail', 'true', '--to', 'someone',
'--cc', 'another')
args2 = self.getMailArgs(lastKey())
print(repr(args2))
self.assertIn('-s', args2)
self.assertIn('-a', args2)
self.assertNotIn('me', args2)
self.assertIn('someone', args2)
self.assertIn('another@example.com', args2)
args2 = self.getMailArgs(lastKey())
print(repr(args2))
self.assertIn('-s', args2)
self.assertIn('-a', args2)
self.assertNotIn('me', args2)
self.assertIn('someone', args2)
self.assertIn('another@example.com', args2)

def testNotify(self):
with testEnv():
with NamedTemporaryFile() as rcFile:
rcFile.write(MAIL_CONFIG.encode('utf-8'))
rcFile.flush()
with NamedTemporaryFile() as dumpFile:
os.environ["SEND_EMAIL_DUMP_FILE"] = dumpFile.name
# --notify
jobf('--rc-file', rcFile.name, '--notify', 'true')
with open(dumpFile.name, "r", encoding="utf-8") as fp:
args = json.load(fp)
self.assertIn('-s', args)
self.assertIn('-a', args)
self.assertIn('me', args)


class OtherCommandSmokeTest(TestCase):
Expand Down

0 comments on commit 4450931

Please sign in to comment.