Skip to content

Commit

Permalink
Merge pull request #241 from wwade/notifier
Browse files Browse the repository at this point in the history
cmd: add --notifier PROGRAM arg to invoke system notifier
  • Loading branch information
wwade committed Jan 6, 2023
2 parents 3a70177 + 016cafd commit eb3a871
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 47 deletions.
41 changes: 41 additions & 0 deletions README.rst
Expand Up @@ -129,6 +129,47 @@ Sample rcfile:
# It should show up as "user/some_long_integer" somewhere in the span's metadata.
user1 = <long integer>
System Notifications (Systemd user service example)
---------------------------------------------------

If you want to enable notifications when jobs finish, one way to do this is to use the --notifier
argument.

``~/.config/systemd/user/job-notify.service``:

.. code:: aconf
[Unit]
Description=Jobrunner Notifier
[Service]
Type=simple
ExecStart=env job --notifier jsonNotify.py
RestartSec=30
Restart=always
[Install]
WantedBy=default.target
``~/.local/bin/jsonNotify.py``:

.. code:: python
#!/usr/bin/env python3
from json import load
import subprocess
from sys import stdin
cmd = ["notify-send"]
data = load(stdin)
rc = data.get("rc", 0)
if rc != 0:
cmd += ["--urgency=critical"]
cmd += [data["subject"], data["body"]]
subprocess.run(cmd)
Hacking
-------

Expand Down
51 changes: 33 additions & 18 deletions jobrunner/db/__init__.py
@@ -1,22 +1,18 @@
from __future__ import absolute_import, division, print_function

from datetime import datetime
from functools import cmp_to_key
import logging
import os
import os.path
import subprocess
import sys
import tempfile
import time
from typing import Optional
from typing import List, Optional
from uuid import uuid4

from dateutil import parser
from dateutil.tz import tzlocal, tzutc
import simplejson as json
import six
from six import text_type
from six.moves import filter

from jobrunner import utils

Expand Down Expand Up @@ -83,9 +79,6 @@ def db(self):
def filterJobs(self, k):
return k not in self.special

def iteritems(self):
return six.iteritems(self.db)

def getCount(self):
return int(self.db[self.ITEMCOUNT])

Expand All @@ -99,7 +92,7 @@ def setCount(self, inc):
def getCheckpoint(self):
try:
return dateTimeFromJson(json.loads(self.db[self.CHECKPOINT]))
except (KeyError, EOFError, json.scanner.JSONDecodeError):
except (KeyError, EOFError, json.JSONDecodeError):
epoch = datetime.utcfromtimestamp(0)
return epoch.replace(tzinfo=tzutc())

Expand Down Expand Up @@ -291,9 +284,14 @@ def prune(self, exceptNum=None):
del self.inactive[job.key]

@staticmethod
def getDbSorted(db, _limit, useCp=False, filterWs=False):
def getDbSorted(db: DatabaseBase,
_limit: Optional[int] = None,
useCp=False,
filterWs=False) -> List[JobInfo]:
cpUtc = None
if useCp:
cpUtc = db.checkpoint
curWs = None
if filterWs:
curWs = utils.workspaceIdentity()
jobList = []
Expand All @@ -303,16 +301,15 @@ def getDbSorted(db, _limit, useCp=False, filterWs=False):
job = db[k]
except KeyError:
continue
if useCp:
if cpUtc:
refTime = job.createTime
if not refTime or refTime < cpUtc:
continue
if filterWs:
if job.workspace != curWs:
continue
if filterWs and job.workspace != curWs:
continue
jobList.append(job)
# if _limit and len( jobList ) > _limit:
# break
if _limit and len(jobList) > _limit:
break
jobList.sort(reverse=False)
return jobList

Expand Down Expand Up @@ -610,6 +607,24 @@ def getResources(self):
val += self.plugins.getResources(self)
return val

def notifyActivity(self, callback: str) -> None:
curJobs = {j.key for j in self.getDbSorted(self.active, None, False)}
while True:
safeSleep(1, self)
activeJobs = {j.key for j in self.getDbSorted(self.active, None, False)}
for k in curJobs - activeJobs:
if k not in self.inactive:
continue
job = self.inactive[k]
payload = {
"subject": "Job finished " + str(job),
"body": job.detail(),
"rc": job.rc,
}
inp = json.dumps(payload)
subprocess.run([callback], input=inp, text=True, check=False)
curJobs = activeJobs

def watchActivity(self):
# pylint: disable=too-many-locals,too-many-branches,too-many-statements
def _nonReminder(job):
Expand Down Expand Up @@ -758,7 +773,7 @@ def _byAge(refA, refB):
sprint(
" last %s, \033[97m%s\033[0m ago" %
(res, diffTime))
sprint(" " + text_type(j))
sprint(" " + str(j))
if wkspace in remind:
sprint(" reminders:")
for j in remind[wkspace]:
Expand Down
8 changes: 4 additions & 4 deletions jobrunner/info.py
@@ -1,11 +1,10 @@
from builtins import map, range
import errno
from functools import total_ordering
from logging import getLogger
import os
from shlex import quote
import string
from typing import Any, List, Optional
from typing import Any, Iterable, List, Optional, Sized

import dateutil.tz

Expand Down Expand Up @@ -497,7 +496,8 @@ def getValue(self, what):
except AttributeError:
return "N/A"

def showInOrder(self, order, level):
def showInOrder(self, order: Iterable[str],
level: Optional[Sized] = None) -> str:
longLine = 0
for k in order:
if len(k) > longLine:
Expand Down Expand Up @@ -533,7 +533,7 @@ def showReminder(self):
]
return self.showInOrder(order, None)

def detail(self, level):
def detail(self, level: Optional[Sized] = None) -> str:
if self.reminder:
return self.showReminder()

Expand Down
15 changes: 12 additions & 3 deletions jobrunner/main.py
Expand Up @@ -345,7 +345,9 @@ def addNonExecOptions(op):
op.add_argument("-L", "--list-inactive", action="store_true",
help="List inactive jobs")
op.add_argument("-W", "--watch", action="store_true",
help="Watch for any job acitvity")
help="Watch for any job activity")
op.add_argument("--notifier", metavar="PROGRAM",
help="Notify using PROGRAM on any job activity")
op.add_argument("-s", "--show", metavar="KEY", action="append",
help="Get details for job specified by KEY")
op.add_argument("-K", "--last-key", action="store_true",
Expand Down Expand Up @@ -493,9 +495,10 @@ def handleNonExecOptions(options: argparse.Namespace, jobs: JobsBase):
return False


def handleNonExecWriteOptions(options, jobs):
def handleNonExecWriteOptions(options, jobs: JobsBase):
# pylint: disable=too-many-branches
# pylint: disable=too-many-return-statements
# pylint: disable=too-many-statements
if options.stop:
errors = []
for k in options.stop:
Expand Down Expand Up @@ -555,6 +558,12 @@ def handleNonExecWriteOptions(options, jobs):
sprint("Exit on user interrupt")
raise ExitCode(1) from err
return True
elif options.notifier:
try:
jobs.notifyActivity(options.notifier)
except KeyboardInterrupt:
pass
return True
else:
return False

Expand Down Expand Up @@ -796,7 +805,7 @@ def extendMailOrNotifyCmdLockRequired(
lastArg = cmd.pop(-1)
for j in mailDeps:
depJob = jobs.inactive[j.permKey]
safeWrite(tmp, depJob.detail(False))
safeWrite(tmp, depJob.detail())
safeWrite(tmp, "\n" + SPACER_EACH + "\n")
assert depJob.logfile
out = check_output(["tail", "-n20", depJob.logfile])
Expand Down
16 changes: 16 additions & 0 deletions jobrunner/test/integration/dump_json_input.py
@@ -0,0 +1,16 @@
#!/usr/bin/env python3

from json import dump, load
from os import environ
from sys import stdin


def main():
data = load(stdin)
with open(environ["DUMP_FILE"], "w", encoding="utf-8") as fp:
dump(data, fp, indent=2)
print("dumped")


if __name__ == "__main__":
main()
4 changes: 2 additions & 2 deletions jobrunner/test/integration/integration_lib.py
Expand Up @@ -147,7 +147,7 @@ def inactiveCount():
return int(run(["job", "--count"], capture=True))


def spawn(cmd):
def spawn(cmd, env=None):
print(" ".join(map(quote, cmd)))
child = pexpect.spawn(cmd[0], cmd[1:], echo=True)
child = pexpect.spawn(cmd[0], cmd[1:], echo=True, env=env)
return child
64 changes: 44 additions & 20 deletions jobrunner/test/integration/smoke_test.py
@@ -1,8 +1,8 @@
from __future__ import absolute_import, division, print_function

from json import load
from logging import getLogger
import os
import re
from shlex import quote
from subprocess import CalledProcessError, check_call
from tempfile import NamedTemporaryFile
import time
Expand Down Expand Up @@ -210,26 +210,50 @@ def test(self):
def testWatchWait(self):
with getTestEnv():
# --watch
# --notifier
# --wait
print("+ job --watch")
child = spawn(["job", "--watch"])
child.expect("No jobs running")
child.expect(r"\r")
sleeper = spawn(["job", "--foreground", "sleep", "60"])
sleeper.expect(r"execute: sleep 60")

# Confirm --watch output
child.expect(r"1 job running")
child.sendintr()

# Wait for the sleep 60
waiter = spawn(["job", "--wait", "sleep"])
waiter.expect(r"adding dependency.*sleep 60")

# Kill the sleep 60
sleeper.sendintr()
waiter.expect(r"Dependent job failed:.*sleep 60")
waiter.expect(EOF)
watch = spawn(["job", "--watch"])
watch.expect("No jobs running")
watch.expect(r"\r")

with NamedTemporaryFile() as notifierOut:
notifierOut.close()
env = dict(os.environ)
env["DUMP_FILE"] = notifierOut.name
cmd = ["job", "--notifier", "./dump_json_input.py"]
print("+", map(quote, cmd))
notifier = spawn(cmd, env=env)

sleeper = spawn(["job", "--foreground", "sleep", "60"])
sleeper.expect(r"execute: sleep 60")

# Confirm --watch output
watch.expect(r"1 job running")
watch.sendintr()

# Wait for the sleep 60
waiter = spawn(["job", "--wait", "sleep"])
waiter.expect(r"adding dependency.*sleep 60")

# Kill the sleep 60
sleeper.sendintr()
waiter.expect(r"Dependent job failed:.*sleep 60")
waiter.expect(EOF)

notifier.expect("dumped")
notifier.sendintr()
notifier.expect(EOF)
notifierOut.close()

with open(notifierOut.name, encoding="utf-8") as fp:
dumped = load(fp)
print(dumped)
self.assertIn("subject", dumped)
self.assertIn("body", dumped)
self.assertIn("rc", dumped)
self.assertEqual(-1, dumped["rc"])
self.assertRegex(dumped["subject"], r"^Job finished.*sleep 60$")

def testRobot(self):
with getTestEnv():
Expand Down

0 comments on commit eb3a871

Please sign in to comment.