/
processor.rb
128 lines (104 loc) · 3.21 KB
/
processor.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# frozen_string_literal: true
module Listen
  module Event
    # Pops events off the configured event queue, waits until at least
    # `latency` has elapsed since the first unprocessed event, and then passes
    # the optimized changes to the configured callback.
    class Processor
      # Raised internally to leave the processing loop once the listener
      # reports that it is stopped. Rescued in #loop_for.
      class Stopped < RuntimeError
      end

      # config  - collaborator that supplies listener, event_queue, sleep,
      #           timestamp, callable?, optimize_changes and call
      # reasons - collection of wakeup reasons; must respond to empty? and pop
      def initialize(config, reasons)
        @config = config
        @listener = config.listener
        @reasons = reasons
        _reset_no_unprocessed_events
      end

      # Runs the processing loop until the listener is stopped.
      #
      # latency - minimum time between the first unprocessed event and the
      #           callback, in the same unit as config.timestamp
      #
      # TODO: implement this properly instead of checking the state at arbitrary
      # points in time
      def loop_for(latency)
        @latency = latency

        loop do
          first_event = _wait_until_events
          _check_stopped
          _wait_until_events_calm_down
          _wait_until_no_longer_paused
          _process_changes(first_event)
        end
      rescue Stopped
        Listen.logger.debug('Processing stopped')
      end

      private

      attr_reader :config

      # Sleeps in rounds until the deadline (time of the first unprocessed
      # event plus latency) has been reached, so that further changes can
      # accumulate and be compressed/optimized into a single callback.
      def _wait_until_events_calm_down
        loop do
          current = _timestamp
          remaining = _deadline - current
          return if remaining <= 0

          _sleep(remaining)
        end
      end

      # Asks the listener to wait for any of its states except :paused.
      def _wait_until_no_longer_paused
        resumable_states = Listener.states.keys - [:paused]
        @listener.wait_for_state(*resumable_states)
      end

      # Raises Stopped when the listener is stopped, discarding any pending
      # wakeup reasons first.
      def _check_stopped
        return unless @listener.stopped?

        _flush_wakeup_reasons
        raise Stopped
      end

      # Sleeps through config, checking for a stop both before and after.
      # Afterwards the wakeup reasons are drained; an :event reason seen while
      # the listener is not paused records the time of the first unprocessed
      # event (if none is recorded yet).
      def _sleep(seconds)
        _check_stopped
        config.sleep(seconds)
        _check_stopped

        _flush_wakeup_reasons do |wakeup|
          next unless wakeup == :event
          next if @listener.paused?

          _remember_time_of_first_unprocessed_event
        end
      end

      # Records the current timestamp unless one is already remembered.
      def _remember_time_of_first_unprocessed_event
        @_remember_time_of_first_unprocessed_event ||= _timestamp
      end

      # Forgets the remembered timestamp, i.e. nothing is waiting to be
      # processed.
      def _reset_no_unprocessed_events
        @_remember_time_of_first_unprocessed_event = nil
      end

      # Earliest moment at which the callback may run.
      def _deadline
        @_remember_time_of_first_unprocessed_event + @latency
      end

      # blocks until event is popped
      # returns the event or `nil` when the event_queue is closed
      def _wait_until_events
        popped = config.event_queue.pop
        _remember_time_of_first_unprocessed_event
        popped
      end

      # Empties the wakeup reasons, yielding each one when a block is given.
      def _flush_wakeup_reasons
        until @reasons.empty?
          wakeup = @reasons.pop
          next unless block_given?

          yield wakeup
        end
      end

      # Current time as reported by config.
      def _timestamp
        config.timestamp
      end

      # for easier testing without sleep loop
      #
      # Gathers `event` plus everything still queued, optimizes the batch and
      # invokes the callback with (modified, added, removed) unless all three
      # are empty. Logs how long the callback took.
      def _process_changes(event)
        _reset_no_unprocessed_events

        batch = [event]
        until config.event_queue.empty?
          batch << config.event_queue.pop
        end

        return unless config.callable?

        optimized = config.optimize_changes(batch)
        payload = %i[modified added removed].map { |kind| optimized[kind] }
        return if payload.all?(&:empty?)

        started_at = _timestamp
        # Stays true unless the callback returns normally inside the block.
        failed = true
        ::Listen::Thread.rescue_and_log('_process_changes') do
          config.call(*payload)
          failed = false
        end
        Listen.logger.debug "Callback#{failed ? ' (exception)' : nil} took #{_timestamp - started_at} sec"
      end
    end
  end
end