Skip to content

Commit

Permalink
Eliminate timeout in waiting on event queue (#861)
Browse files Browse the repository at this point in the history
* Eliminate timeout in waiting on event queue.

* Fix broken documentation of timeout in EventDispatcher..

* Update the changelog.

* Update changelog.rst

Co-authored-by: Mickaël Schoentgen <contact@tiger-222.fr>
  • Loading branch information
sattlerc and BoboTiG committed Dec 11, 2021
1 parent 57e80a5 commit 61d8f86
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
4 changes: 2 additions & 2 deletions changelog.rst
Expand Up @@ -8,8 +8,8 @@ Changelog

2021-xx-xx • `full history <https://github.com/gorakhargosh/watchdog/compare/v2.1.6...master>`__

-
- Thanks to our beloved contributors: @
- Eliminate timeout in waiting on event queue. (`#861 <https://github.com/gorakhargosh/watchdog/pull/861>`_)
- Thanks to our beloved contributors: @sattlerc

2.1.6
~~~~~
Expand Down
31 changes: 20 additions & 11 deletions src/watchdog/observers/api.py
Expand Up @@ -154,49 +154,55 @@ class EventDispatcher(BaseThread):
that dispatch events from an event queue to appropriate event handlers.
:param timeout:
Event queue blocking timeout (in seconds).
Timeout value (in seconds) passed to emitters
constructions in the child class BaseObserver.
:type timeout:
``float``
"""

_stop_event = object()
"""Event inserted into the queue to signal a requested stop."""

def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseThread.__init__(self)
self._event_queue = EventQueue()
self._timeout = timeout

@property
def timeout(self):
"""Event queue block timeout."""
"""Timeout value to construct emitters with."""
return self._timeout

def stop(self):
BaseThread.stop(self)
try:
self.event_queue.put_nowait(EventDispatcher._stop_event)
except queue.Full:
pass

@property
def event_queue(self):
"""The event queue which is populated with file system events
by emitters and from which events are dispatched by a dispatcher
thread."""
return self._event_queue

def dispatch_events(self, event_queue, timeout):
def dispatch_events(self, event_queue):
"""Override this method to consume events from an event queue, blocking
on the queue for the specified timeout before raising :class:`queue.Empty`.
:param event_queue:
Event queue to populate with one set of events.
:type event_queue:
:class:`EventQueue`
:param timeout:
Interval period (in seconds) to wait before timing out on the
event queue.
:type timeout:
``float``
:raises:
:class:`queue.Empty`
"""

def run(self):
while self.should_keep_running():
try:
self.dispatch_events(self.event_queue, self.timeout)
self.dispatch_events(self.event_queue)
except queue.Empty:
continue

Expand Down Expand Up @@ -360,8 +366,11 @@ def unschedule_all(self):
def on_thread_stop(self):
self.unschedule_all()

def dispatch_events(self, event_queue, timeout):
event, watch = event_queue.get(block=True, timeout=timeout)
def dispatch_events(self, event_queue):
entry = event_queue.get(block=True)
if entry is EventDispatcher._stop_event:
return
event, watch = entry

with self._lock:
# To allow unschedule/stop and safe removal of event handlers
Expand Down

0 comments on commit 61d8f86

Please sign in to comment.