/
bunny_test.rb
368 lines (294 loc) · 11 KB
/
bunny_test.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
# encoding: utf-8
# This file is distributed under New Relic's license terms.
# See https://github.com/newrelic/newrelic-ruby-agent/blob/main/LICENSE for complete details.
require File.expand_path(File.join(File.dirname(__FILE__), "..", "..", "..", "helpers", "misc"))
# Tests for the New Relic agent's instrumentation of the Bunny RabbitMQ client.
#
# Every test runs against a freshly started Bunny connection and channel
# (created in the setup block below), publishes and/or consumes messages, and
# then asserts on the metrics, transaction traces, transaction events, or
# errors the agent recorded.
class BunnyTest < Minitest::Test
  include MultiverseHelpers

  # Open a connection and a channel for each test. Bunny.new is called with no
  # arguments, so Bunny's default connection settings are used.
  setup_and_teardown_agent do
    @conn = Bunny.new
    @conn.start
    @chan = @conn.create_channel
  end

  # Closes the per-test connection.
  #
  # NOTE(review): this method is defined after setup_and_teardown_agent; if
  # that helper also installs a teardown on this class, this definition
  # replaces it -- confirm the agent teardown is not being skipped.
  def teardown
    # Setup may have failed before @conn was assigned; do not mask that
    # failure with a NoMethodError on nil.
    @conn.close if @conn
  end

  # Publishing straight to a queue is reported as a produce on the "Default"
  # exchange, both scoped to the transaction and unscoped.
  def test_metrics_recorded_for_produce_to_the_default_exchange
    with_queue do |queue|
      in_transaction "test_txn" do
        queue.publish("test_msg")
      end

      assert_metrics_recorded [
        ["MessageBroker/RabbitMQ/Exchange/Produce/Named/Default", "test_txn"],
        "MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"
      ]
    end
  end

  # Popping a message that was published straight to a queue is reported as a
  # consume on the "Default" exchange.
  def test_metrics_recorded_for_consume_from_the_default_exchange
    with_queue do |queue|
      queue.publish "test_msg"

      in_transaction "test_txn" do
        queue.pop
      end

      assert_metrics_recorded [
        ["MessageBroker/RabbitMQ/Exchange/Consume/Named/Default", "test_txn"],
        "MessageBroker/RabbitMQ/Exchange/Consume/Named/Default"
      ]
    end
  end

  # With cross application tracing enabled, a pop inside a transaction must
  # not add any of the CAT intrinsics to the transaction event, nor record a
  # ClientApplication metric.
  def test_cat_headers_not_read_for_pop_by_default
    cross_process_id = "321#123"

    with_queue do |queue|
      with_config :"cross_application_tracer.enabled" => true, :cross_process_id => cross_process_id, :encoding_key => "abc" do
        in_transaction "first_txn" do
          queue.publish "test_msg"
        end

        in_transaction "test_txn" do
          queue.pop
        end

        event = last_transaction_event

        %w[nr.guid nr.referringTransactionGuid nr.tripId nr.pathHash nr.referringPathHash].each do |key|
          refute event[0].has_key?(key), "Event should not have key '#{key}'"
        end

        assert_metrics_not_recorded ["ClientApplication/#{cross_process_id}/all"]
      end
    end
  end

  # Publishing to an explicitly constructed fanout exchange is reported under
  # that exchange's name.
  def test_metrics_recorded_for_produce_to_a_named_exchange
    x = Bunny::Exchange.new @chan, :fanout, "activity.events"

    in_transaction "test_txn" do
      x.publish "hi"
    end

    assert_metrics_recorded [
      ["MessageBroker/RabbitMQ/Exchange/Produce/Named/activity.events", "test_txn"],
      "MessageBroker/RabbitMQ/Exchange/Produce/Named/activity.events"
    ]
  end

  # Popping from a queue bound to a named exchange is reported as a consume on
  # that exchange.
  def test_metrics_recorded_for_consume_from_a_named_exchange
    x = @chan.fanout "activity.events"

    with_queue do |queue|
      queue.bind x
      x.publish "howdy", {
        routing_key: "red"
      }

      in_transaction "test_txn" do
        queue.pop
      end

      assert_metrics_recorded [
        ["MessageBroker/RabbitMQ/Exchange/Consume/Named/activity.events", "test_txn"],
        "MessageBroker/RabbitMQ/Exchange/Consume/Named/activity.events"
      ]
    end
  end

  # The publish options (exchange type, routing key, headers, reply_to and
  # correlation_id) are captured as parameters on the MessageBroker trace node.
  # Runs with distributed tracing disabled.
  def test_segment_parameters_recorded_for_produce
    with_config(:'distributed_tracing.enabled' => false) do
      x = @chan.fanout "activity.events"
      headers = {foo: "bar"}

      in_transaction "test_txn" do
        x.publish "howdy", {
          routing_key: "red",
          headers: headers,
          reply_to: "blue",
          correlation_id: "abc"
        }
      end

      node = find_node_with_name_matching last_transaction_trace, /^MessageBroker\//

      assert_equal :fanout, node.params[:exchange_type]
      assert_equal "red", node.params[:routing_key]
      assert_equal headers, node.params[:headers]
      assert_equal "blue", node.params[:reply_to]
      assert_equal "abc", node.params[:correlation_id]
    end
  end

  # The properties of a popped message are captured as parameters on the
  # MessageBroker trace node. On the consume side the headers are expected
  # with string keys, and the routing key is expected to be the queue name.
  def test_segment_parameters_recorded_for_consume
    headers = {foo: "bar"}

    with_queue do |queue|
      queue.publish "howdy", {
        headers: headers,
        reply_to: "blue",
        correlation_id: "abc"
      }

      in_transaction "test_txn" do
        queue.pop
      end

      node = find_node_with_name_matching last_transaction_trace, /^MessageBroker\//

      assert_equal :direct, node.params[:exchange_type]
      assert_equal queue.name, node.params[:routing_key]
      assert_equal({"foo" => "bar"}, node.params[:headers])
      assert_equal "blue", node.params[:reply_to]
      assert_equal "abc", node.params[:correlation_id]
    end
  end

  # The instrumentation must hand back Bunny's own return value from pop: an
  # array of [GetResponse, MessageProperties, payload].
  def test_pop_returns_original_message
    with_queue do |queue|
      queue.publish "howdy"
      msg = queue.pop

      assert_kind_of Array, msg, "message was not an array"
      assert_kind_of Bunny::GetResponse, msg[0]
      assert_kind_of Bunny::MessageProperties, msg[1]
      assert_equal "howdy", msg[2]
    end
  end

  # If starting the AMQP publish segment raises, the publish and the rest of
  # the transaction must still work; only the produce metric is missing.
  def test_error_starting_amqp_segment_does_not_interfere_with_transaction
    NewRelic::Agent::Messaging.stubs(:start_amqp_publish_segment).raises(StandardError.new("Boo"))

    with_queue do |queue|
      in_transaction "test_txn" do
        # Our instrumentation should error here, but not interfere with bunny.
        queue.publish("test_msg")

        # This segment should be fine.
        segment = NewRelic::Agent::Tracer.start_segment name: "Custom/blah/method"
        segment.finish if segment
      end

      assert_metrics_recorded ["Custom/blah/method"]
      refute_metrics_recorded [
        "MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"
      ]
    end
  end

  # A message delivered to a subscriber outside of any transaction should get
  # its own transaction, named after the exchange and carrying the message
  # attributes.
  def test_transaction_implicitly_created_for_consume
    lock = Mutex.new
    cond = ConditionVariable.new
    msg = nil
    delivery_timeout = 5 # seconds to wait for the subscriber callback

    exchange = @chan.direct('myDirectExchange')

    with_config :'attributes.include' => ['message.exchangeType'] do
      with_queue do |queue|
        queue.bind(exchange, routing_key: 'some.key')

        queue.subscribe(:block => false) do |_delivery_info, _properties, payload|
          lock.synchronize do
            msg = payload
            cond.signal
          end
        end

        lock.synchronize do
          exchange.publish "hi", routing_key: 'some.key'

          # Wait for the subscriber, but never forever: an unbounded wait would
          # hang the whole suite if the message is not delivered. The predicate
          # is re-checked after every wake-up, so an early wake-up does not end
          # the wait before the payload has arrived.
          deadline = Time.now + delivery_timeout
          while msg.nil? && (remaining = deadline - Time.now) > 0
            cond.wait(lock, remaining)
          end
        end

        # Even with the condition variable above there is a race between the
        # subscribe block finishing and the transaction being committed, so
        # poll for the trace. The cycle cap is purely defensive: it keeps this
        # test from blocking indefinitely if something unexpected occurs.
        cycles = 0
        until (tt = last_transaction_trace) || cycles > 10
          sleep 0.1
          cycles += 1
        end

        assert_equal "hi", msg
        refute_nil tt, "Did not expect tt to be nil. Something terrible has occurred."

        expected_destinations = NewRelic::Agent::AttributeFilter::DST_TRANSACTION_TRACER |
          NewRelic::Agent::AttributeFilter::DST_TRANSACTION_EVENTS |
          NewRelic::Agent::AttributeFilter::DST_ERROR_COLLECTOR

        assert_equal({ :"message.routingKey" => "some.key",
                       :"message.queueName" => queue.name,
                       :"message.exchangeType" => :direct,
                     },
                     tt.attributes.agent_attributes_for(expected_destinations))

        # metrics
        assert_metrics_recorded ["OtherTransaction/Message/RabbitMQ/Exchange/Named/myDirectExchange"]
      end
    end
  end

  # Purging a server-named (temporary) queue is reported under ".../Temp".
  def test_metrics_recorded_for_purge_with_server_named_queue
    with_queue do |queue|
      in_transaction "test_txn" do
        queue.publish "test"
        queue.purge
      end

      assert_metrics_recorded "MessageBroker/RabbitMQ/Queue/Purge/Temp"
    end
  end

  # Purging an explicitly named queue is reported under the queue's name.
  def test_metrics_recorded_for_purge_with_named_queue
    with_queue false do |queue|
      in_transaction "test_txn" do
        queue.publish "test"
        queue.purge
      end

      assert_metrics_recorded "MessageBroker/RabbitMQ/Queue/Purge/Named/#{queue.name}"
    end
  end

  # When basic_get raises and the error escapes the transaction, both the
  # MessageBroker segment and the transaction should notice it.
  def test_noticed_error_at_segment_and_txn_on_error
    txn = nil

    begin
      with_queue false do |queue|
        Bunny::Channel.any_instance.stubs("basic_get").raises(Timeout::Error)

        in_transaction do |msg_txn|
          txn = msg_txn
          queue.publish "test"
          queue.pop
        end
      end
    rescue StandardError
      # NOP -- allowing span and transaction to notice error
    end

    wait_until_not_nil(5) { txn }

    assert_segment_noticed_error txn, /^MessageBroker\/RabbitMQ/, "Timeout::Error", /timeout/i
    assert_transaction_noticed_error txn, "Timeout::Error"
  end

  # When basic_get raises but the error is rescued inside the transaction,
  # only the MessageBroker segment should notice it.
  def test_noticed_error_only_at_segment_on_error
    txn = nil

    with_queue false do |queue|
      Bunny::Channel.any_instance.stubs("basic_get").raises(Timeout::Error)

      in_transaction do |msg_txn|
        begin
          txn = msg_txn
          queue.publish "test"
          queue.pop
        rescue StandardError
          # NOP -- allowing ONLY span to notice error
        end
      end
    end

    assert_segment_noticed_error txn, /^MessageBroker\/RabbitMQ/, "Timeout::Error", /timeout/i
    refute_transaction_noticed_error txn, "Timeout::Error"
  end

  # If Tracer.start_message_broker_segment raises, the message must still be
  # published (and be poppable afterwards) and unrelated segments must still
  # be recorded.
  def test_error_starting_message_broker_segment_does_not_interfere_with_transaction
    with_queue do |queue|
      NewRelic::Agent::Tracer.stubs(:start_message_broker_segment).raises(StandardError.new("Boo"))

      in_transaction "test_txn" do
        # This should error
        queue.publish "test_msg"

        # This segment should be fine. The finish call is guarded the same way
        # as in test_error_starting_amqp_segment_does_not_interfere_with_transaction.
        segment = NewRelic::Agent::Tracer.start_segment name: "Custom/blah/method"
        segment.finish if segment
      end

      msg = queue.pop
      assert_equal "test_msg", msg[2]

      assert_metrics_recorded ["Custom/blah/method"]
      refute_metrics_recorded ["MessageBroker/RabbitMQ/Exchange/Produce/Named/Default"]
    end
  end

  # Popping from an empty queue must not make the agent log anything.
  def test_pop_returning_no_message_doesnt_error
    NewRelic::Agent.stubs(:logger).returns(NewRelic::Agent::MemoryLogger.new)

    with_queue do |queue|
      in_transaction "test_txn" do
        queue.pop
      end

      assert_empty NewRelic::Agent.logger.messages
    end
  end

  # Popping a message that went through an exchange the channel has no cached
  # Exchange object for must still work, log nothing, and be reported against
  # the "Default" exchange.
  def test_pop_returning_a_good_message_send_to_an_exchange_we_havent_accessed_doesnt_error
    NewRelic::Agent.stubs(:logger).returns(NewRelic::Agent::MemoryLogger.new)

    with_queue do |queue|
      # publish in such a way that the exchange object does not end up in channel.exchanges
      channel = queue.channel
      channel.basic_publish("test_msg", "", queue.name)
      assert_empty channel.exchanges

      in_transaction "test_txn" do
        msg = queue.pop
        assert_equal "test_msg", msg[2]
      end

      assert_empty NewRelic::Agent.logger.messages
      assert_metrics_recorded ["MessageBroker/RabbitMQ/Exchange/Consume/Named/Default"]
    end
  end

  # Declares a queue on the test channel, yields it to the block (if one is
  # given), and purges the queue afterwards.
  #
  # temp      - when true (the default) the queue is declared with an empty
  #             name; otherwise it is named with random_string.
  # exclusive - passed through as the :exclusive option of the declaration.
  def with_queue(temp = true, exclusive = true, &block)
    queue_name = temp ? "" : random_string
    queue = @chan.queue(queue_name, exclusive: exclusive)

    yield queue if block_given?
  ensure
    # Purge even when the block raises (for example on a failed assertion) so
    # messages published by one test are not left behind in the queue. The
    # guard covers the case where the declaration itself failed.
    @chan.queue(queue.name).purge if queue
  end

  # Returns a name for explicitly named test queues that differs on every
  # call. The clock alone is not enough: Time#to_s has one-second resolution,
  # so two queues created within the same second would share a name. A random
  # base-36 suffix keeps them apart.
  def random_string
    "#{Time.now.to_i}-#{rand(36**12).to_s(36)}"
  end
end