diff --git a/src/xdist/dsession.py b/src/xdist/dsession.py index 9dcfc2a6..f1045558 100644 --- a/src/xdist/dsession.py +++ b/src/xdist/dsession.py @@ -15,7 +15,8 @@ LoadGroupScheduling, WorkStealingScheduling, ) - +import logging +import pickle from queue import Empty, Queue @@ -286,6 +287,13 @@ def worker_testreport(self, node, rep): self.config.hook.pytest_runtest_logreport(report=rep) self._handlefailures(rep) + def worker_runtest_logmessage(self, node, record): + record = pickle.loads(record) + for handler in logging.getLogger().handlers: + process = record.levelno >= handler.level + if process: + handler.handle(record) + def worker_runtest_protocol_complete(self, node, item_index, duration): """ Emitted when a node fires the 'runtest_protocol_complete' event, diff --git a/src/xdist/remote.py b/src/xdist/remote.py index e035f772..496d0ff3 100644 --- a/src/xdist/remote.py +++ b/src/xdist/remote.py @@ -11,6 +11,9 @@ import os import time from typing import Any +import pickle +import logging +import copy import pytest from execnet.gateway_base import dumps, DumpError @@ -56,6 +59,48 @@ def worker_title(title): pass +class RemoteMessageHandler(logging.Handler): + """ + This handler sends events to a queue. Typically, it would be used together + with a multiprocessing Queue to centralise logging to file in one process + (in a multi-process application), so as to avoid file write contention + between processes. + + This code is new in Python 3.2, but this class can be copy pasted into + user code for use with earlier Python versions. + + Largely based on QueueHandler in handlers.py in cpython : + Source: https://github.com/python/cpython/blob/8f324b7ecd2df3036fab098c4c8ac185ac07b277/Lib/logging/handlers.py#L1412 + """ + + def __init__(self, queue): + """ + Initialise an instance, using the passed queue. + """ + logging.Handler.__init__(self) + self.queue = queue + + def emit(self, record): + """ + Emit a record. + + Writes the LogRecord to the queue, preparing it for pickling first. + """ + try: + msg = self.format(record) + # bpo-35726: make copy of record to avoid affecting other handlers in the chain. + record = copy.copy(record) + record.message = msg + record.msg = msg + record.args = None + record.exc_info = None + record.exc_text = None + x = pickle.dumps(record) + self.queue.send_log(x) + except Exception: + self.handleError(record) + + class WorkerInteractor: SHUTDOWN_MARK = object() QUEUE_REPLACED_MARK = object() @@ -70,6 +115,14 @@ def __init__(self, config, channel): self.nextitem_index = None config.pluginmanager.register(self) + # pump cli messages back to master if a level is set + if config.option.log_cli_level: + rootlog = logging.getLogger() + myhandler = RemoteMessageHandler(self) + rootlog.addHandler(myhandler) + level = logging.getLevelName(config.option.log_cli_level) + myhandler.setLevel(level) + def _make_queue(self): return self.channel.gateway.execmodel.queue.Queue() @@ -86,6 +139,9 @@ def sendevent(self, name, **kwargs): self.log("sending", name, kwargs) self.channel.send((name, kwargs)) + def send_log(self, pickled_record): + self.sendevent("runtest_logmessage", pickled_record=pickled_record) + @pytest.hookimpl def pytest_internalerror(self, excrepr): formatted_error = str(excrepr) diff --git a/src/xdist/workermanage.py b/src/xdist/workermanage.py index fdd4109a..7e5b4b6e 100644 --- a/src/xdist/workermanage.py +++ b/src/xdist/workermanage.py @@ -362,6 +362,8 @@ def process_from_remote(self, eventcall): # noqa too complex self.notify_inproc(eventname, node=self, ids=kwargs["ids"]) elif eventname == "runtest_protocol_complete": self.notify_inproc(eventname, node=self, **kwargs) + elif eventname == "runtest_logmessage": + self.notify_inproc(eventname, node=self, **kwargs) elif eventname == "unscheduled": self.notify_inproc(eventname, node=self, **kwargs) elif eventname == "logwarning":