Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eliminate timeout in waiting on event queue #861

Merged
merged 4 commits into from Dec 11, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 3 additions & 3 deletions changelog.rst
Expand Up @@ -8,8 +8,8 @@ Changelog

2021-xx-xx • `full history <https://github.com/gorakhargosh/watchdog/compare/v2.1.6...master>`__

-
- Thanks to our beloved contributors: @
- Eliminate timeout in waiting on event queue. (`#861 <https://github.com/gorakhargosh/watchdog/pull/861>`_)
- Thanks to our beloved contributors: @sattlerc

2.1.6
~~~~~
Expand Down Expand Up @@ -60,7 +60,7 @@ Changelog

- [mac] Fix relative path handling for non-recursive watch. (`#797 <https://github.com/gorakhargosh/watchdog/pull/797>`_)
- [windows] On PyPy, events happening right after ``start()`` were missed. Add a workaround for that. (`#796 <https://github.com/gorakhargosh/watchdog/pull/796>`_)
- Thanks to our beloved contributors: @oprypin, @CCP-Aporia, @BoboTiG
w- Thanks to our beloved contributors: @oprypin, @CCP-Aporia, @BoboTiG
BoboTiG marked this conversation as resolved.
Show resolved Hide resolved

2.1.1
~~~~~
Expand Down
31 changes: 20 additions & 11 deletions src/watchdog/observers/api.py
Expand Up @@ -154,49 +154,55 @@ class EventDispatcher(BaseThread):
that dispatch events from an event queue to appropriate event handlers.

:param timeout:
Event queue blocking timeout (in seconds).
Timeout value (in seconds) passed to emitters
constructions in the child class BaseObserver.
:type timeout:
``float``
"""

_stop_event = object()
"""Event inserted into the queue to signal a requested stop."""

def __init__(self, timeout=DEFAULT_OBSERVER_TIMEOUT):
BaseThread.__init__(self)
self._event_queue = EventQueue()
self._timeout = timeout

@property
def timeout(self):
"""Event queue block timeout."""
"""Timeout value to construct emitters with."""
return self._timeout

def stop(self):
BaseThread.stop(self)
try:
self.event_queue.put_nowait(EventDispatcher._stop_event)
except queue.Full:
pass

@property
def event_queue(self):
"""The event queue which is populated with file system events
by emitters and from which events are dispatched by a dispatcher
thread."""
return self._event_queue

def dispatch_events(self, event_queue, timeout):
def dispatch_events(self, event_queue):
"""Override this method to consume events from an event queue, blocking
on the queue for the specified timeout before raising :class:`queue.Empty`.

:param event_queue:
Event queue to populate with one set of events.
:type event_queue:
:class:`EventQueue`
:param timeout:
Interval period (in seconds) to wait before timing out on the
event queue.
:type timeout:
``float``
:raises:
:class:`queue.Empty`
"""

def run(self):
while self.should_keep_running():
try:
self.dispatch_events(self.event_queue, self.timeout)
self.dispatch_events(self.event_queue)
except queue.Empty:
continue

Expand Down Expand Up @@ -360,8 +366,11 @@ def unschedule_all(self):
def on_thread_stop(self):
self.unschedule_all()

def dispatch_events(self, event_queue, timeout):
event, watch = event_queue.get(block=True, timeout=timeout)
def dispatch_events(self, event_queue):
entry = event_queue.get(block=True)
if entry is EventDispatcher._stop_event:
return
event, watch = entry

with self._lock:
# To allow unschedule/stop and safe removal of event handlers
Expand Down