forked from fluent/fluentd
/
test_io_handler.rb
148 lines (119 loc) · 4.42 KB
/
test_io_handler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
require_relative '../../helper'
require 'fluent/plugin/in_tail'
require 'fluent/plugin/metrics_local'
require 'tempfile'
class IntailIOHandlerTest < Test::Unit::TestCase
setup do
@file = Tempfile.new('intail_io_handler').binmode
opened_file_metrics = Fluent::Plugin::LocalMetrics.new
opened_file_metrics.configure(config_element('metrics', '', {}))
closed_file_metrics = Fluent::Plugin::LocalMetrics.new
closed_file_metrics.configure(config_element('metrics', '', {}))
rotated_file_metrics = Fluent::Plugin::LocalMetrics.new
rotated_file_metrics.configure(config_element('metrics', '', {}))
@metrics = Fluent::Plugin::TailInput::MetricsInfo.new(opened_file_metrics, closed_file_metrics, rotated_file_metrics)
end
teardown do
@file.close rescue nil
@file.unlink rescue nil
end
def create_target_info
Fluent::Plugin::TailInput::TargetInfo.new(@file.path, Fluent::FileWrapper.stat(@file.path).ino)
end
def create_watcher
Fluent::Plugin::TailInput::TailWatcher.new(create_target_info, nil, nil, nil, nil, nil, nil, nil, nil)
end
test '#on_notify load file content and passed it to receive_lines method' do
text = "this line is test\ntest line is test\n"
@file.write(text)
@file.close
watcher = create_watcher
update_pos = 0
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
stub(pe).update_pos { |val| update_pos = val }
pe
end
returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
r.on_notify
assert_equal text.bytesize, update_pos
assert_equal text, returned_lines
r.on_notify
assert_equal text.bytesize, update_pos
assert_equal text, returned_lines
end
sub_test_case 'when open_on_every_update is true and read_pos returns always 0' do
test 'open new IO and change pos to 0 and read it' do
text = "this line is test\ntest line is test\n"
@file.write(text)
@file.close
update_pos = 0
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
stub(pe).update_pos { |val| update_pos = val }
pe
end
returned_lines = ''
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 100, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: true, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.join
true
end
r.on_notify
assert_equal text.bytesize, update_pos
assert_equal text, returned_lines
r.on_notify
assert_equal text * 2, returned_lines
end
end
sub_test_case 'when limit is 5' do
test 'call receive_lines once when short line(less than 8192)' do
text = "line\n" * 8
@file.write(text)
@file.close
update_pos = 0
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
stub(pe).update_pos { |val| update_pos = val }
pe
end
returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
r.on_notify
assert_equal 8, returned_lines[0].size
end
test 'call receive_lines some times when long line(more than 8192)' do
t = 'line' * (8192 / 8)
text = "#{t}\n" * 8
@file.write(text)
@file.close
update_pos = 0
watcher = create_watcher
stub(watcher).pe do
pe = 'position_file'
stub(pe).read_pos { 0 }
stub(pe).update_pos { |val| update_pos = val }
pe
end
returned_lines = []
r = Fluent::Plugin::TailInput::TailWatcher::IOHandler.new(watcher, path: @file.path, read_lines_limit: 5, read_bytes_limit_per_second: -1, log: $log, open_on_every_update: false, metrics: @metrics) do |lines, _watcher|
returned_lines << lines.dup
true
end
r.on_notify
assert_equal 5, returned_lines[0].size
assert_equal 3, returned_lines[1].size
end
end
end