/
test_observer.py
161 lines (121 loc) · 4.38 KB
/
test_observer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
# coding: utf-8
#
# Copyright 2014 Thomas Amland <thomas.amland@gmail.com>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import threading
from unittest.mock import patch
import pytest
from watchdog.events import FileSystemEventHandler, FileModifiedEvent
from watchdog.observers.api import EventEmitter, BaseObserver
@pytest.fixture
def observer():
    """Yield a new ``BaseObserver`` built on ``EventEmitter``.

    After the test the observer is stopped and joined.
    """
    watcher = BaseObserver(EventEmitter)
    yield watcher

    # Teardown: stop first, then join.  A RuntimeError from join() is
    # deliberately ignored (presumably raised when the observer was never
    # started -- confirm against BaseObserver).
    watcher.stop()
    ignore_runtime_error = contextlib.suppress(RuntimeError)
    with ignore_runtime_error:
        watcher.join()
@pytest.fixture
def observer2():
    """Yield a second, independent ``BaseObserver`` built on ``EventEmitter``.

    After the test the observer is stopped and joined.
    """
    second_watcher = BaseObserver(EventEmitter)
    yield second_watcher

    # Teardown: stop first, then join.  A RuntimeError from join() is
    # deliberately ignored (presumably raised when the observer was never
    # started -- confirm against BaseObserver).
    second_watcher.stop()
    ignore_runtime_error = contextlib.suppress(RuntimeError)
    with ignore_runtime_error:
        second_watcher.join()
def test_schedule_should_start_emitter_if_running(observer):
    """Scheduling a watch on a started observer yields a live emitter."""
    observer.start()
    observer.schedule(None, '')

    # Unpacking doubles as a check that exactly one emitter exists.
    [running_emitter] = observer.emitters
    assert running_emitter.is_alive()
def test_schedule_should_not_start_emitter_if_not_running(observer):
    """Scheduling a watch on an observer that was not started leaves the emitter idle."""
    observer.schedule(None, '')

    # Unpacking doubles as a check that exactly one emitter exists.
    [idle_emitter] = observer.emitters
    assert not idle_emitter.is_alive()
def test_start_should_start_emitter(observer):
    """Starting the observer starts an emitter that was scheduled beforehand."""
    observer.schedule(None, '')
    observer.start()

    # Unpacking doubles as a check that exactly one emitter exists.
    [started_emitter] = observer.emitters
    assert started_emitter.is_alive()
def test_stop_should_stop_emitter(observer):
    """Stopping and joining the observer leaves both it and its emitter not alive."""
    observer.schedule(None, '')
    observer.start()

    # Precondition: the single scheduled emitter is running.
    [tracked_emitter] = observer.emitters
    assert tracked_emitter.is_alive()

    observer.stop()
    observer.join()

    # Neither the observer nor the emitter may be alive after the join.
    assert not observer.is_alive()
    assert not tracked_emitter.is_alive()
def test_unschedule_self(observer):
    """
    Tests that unscheduling a watch from within an event handler
    correctly unregisters emitter and handler without deadlocking.
    """
    # Set by the handler once observer.unschedule() has returned.
    unschedule_finished = threading.Event()

    class EventHandler(FileSystemEventHandler):
        def on_modified(self, event):
            # `watch` is bound below, before any event is queued.
            observer.unschedule(watch)
            unschedule_finished.set()

    watch = observer.schedule(EventHandler(), '')
    observer.start()

    (emitter,) = observer.emitters
    emitter.queue_event(FileModifiedEvent(''))

    # Bounded wait: Event.wait() returns False on timeout, so a handler that
    # never completes (e.g. a deadlock in unschedule) fails this assertion
    # instead of blocking on this line forever.  The timeout is generous;
    # the handler is expected to finish almost immediately.
    assert unschedule_finished.wait(timeout=5)
    assert len(observer.emitters) == 0
def test_schedule_after_unschedule_all(observer):
    """A watch can be scheduled again after ``unschedule_all()`` removed every emitter."""

    def emitter_count():
        # Re-read the observer's emitters on every call.
        return len(observer.emitters)

    observer.start()

    observer.schedule(None, '')
    assert emitter_count() == 1

    observer.unschedule_all()
    assert emitter_count() == 0

    observer.schedule(None, '')
    assert emitter_count() == 1
def test_2_observers_on_the_same_path(observer, observer2):
    """Two distinct observers can each schedule a watch on the same path."""
    assert observer is not observer2

    # Same order as before: schedule and check the first observer, then the second.
    for current in (observer, observer2):
        current.schedule(None, '')
        assert len(current.emitters) == 1
def test_start_failure_should_not_prevent_further_try(observer):
    """A failed ``observer.start()`` drops the failing emitter and allows a retry."""
    observer.schedule(None, '')
    scheduled = observer.emitters
    assert len(scheduled) == 1

    # Replacement for the emitter's start() that always fails.
    def failing_start():
        raise OSError()

    target = next(iter(scheduled))
    # The patch applies only inside this block; start() must propagate the OSError.
    with patch.object(target, "start", new=failing_start), pytest.raises(OSError):
        observer.start()

    # The failing emitter should have been removed from the observer.
    assert len(observer.emitters) == 0

    # With the original start() restored, starting works as if no emitter
    # had ever been scheduled.
    observer.start()
    assert len(observer.emitters) == 0

    # Re-scheduling the watch should work.
    observer.schedule(None, '')
    assert len(observer.emitters) == 1
def test_schedule_failure_should_not_prevent_future_schedules(monkeypatch, observer):
    """
    Tests that a schedule() call which fails because the emitter could not
    be started does not prevent a later schedule() call from succeeding.
    """
    observer.start()

    # Make the emitter fail on start(), and subsequently the observer fail on schedule().
    def mocked_start(emitter):
        raise OSError()

    # Scope the patch to the failing call only.  A plain monkeypatch.setattr()
    # stays active until test teardown, so the re-schedule below would run
    # against the same always-raising start() and could not verify recovery.
    with monkeypatch.context() as scoped_patch:
        scoped_patch.setattr(EventEmitter, "start", mocked_start)
        with pytest.raises(OSError):
            observer.schedule(None, '')

    # The emitter should not be in the list.
    emitters = observer.emitters
    assert len(emitters) == 0

    # Re-scheduling the watch with the original start() restored should work.
    observer.schedule(None, '')
    assert len(observer.emitters) == 1