-
Notifications
You must be signed in to change notification settings - Fork 369
/
subscribers.rb
112 lines (94 loc) · 4.23 KB
/
subscribers.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
require 'ddtrace/contrib/analytics'
require 'ddtrace/contrib/mongodb/ext'
require 'ddtrace/contrib/mongodb/parsers'
require 'ddtrace/ext/integration'
module Datadog
module Contrib
module MongoDB
# `MongoCommandSubscriber` listens to all events from the `Monitoring`
# system available in the Mongo driver.
class MongoCommandSubscriber
def started(event)
pin = Datadog::Pin.get_from(event.address)
return unless pin && pin.enabled?
# start a trace and store it in the current thread; using the `operation_id`
# is safe since it's a unique id used to link events together. Also only one
# thread is involved in this execution so thread-local storage should be safe. Reference:
# https://github.com/mongodb/mongo-ruby-driver/blob/master/lib/mongo/monitoring.rb#L70
# https://github.com/mongodb/mongo-ruby-driver/blob/master/lib/mongo/monitoring/publishable.rb#L38-L56
span = pin.tracer.trace(Ext::SPAN_COMMAND, service: pin.service, span_type: Ext::SPAN_TYPE_COMMAND)
set_span(event, span)
# build a quantized Query using the Parser module
query = MongoDB.query_builder(event.command_name, event.database_name, event.command)
serialized_query = query.to_s
# Tag as an external peer service
span.set_tag(Datadog::Ext::Integration::TAG_PEER_SERVICE, span.service)
# Set analytics sample rate
if analytics_enabled?
Contrib::Analytics.set_sample_rate(span, analytics_sample_rate)
end
# add operation tags; the full query is stored and used as a resource,
# since it has been quantized and reduced
span.set_tag(Ext::TAG_DB, query['database'])
span.set_tag(Ext::TAG_COLLECTION, query['collection'])
span.set_tag(Ext::TAG_OPERATION, query['operation'])
span.set_tag(Ext::TAG_QUERY, serialized_query)
span.set_tag(Datadog::Ext::NET::TARGET_HOST, event.address.host)
span.set_tag(Datadog::Ext::NET::TARGET_PORT, event.address.port)
# set the resource with the quantized query
span.resource = serialized_query
end
def failed(event)
span = get_span(event)
return unless span
# the failure is not a real exception because it's handled by
# the framework itself, so we set only the error and the message
span.set_error(event)
rescue StandardError => e
Datadog.logger.debug("error when handling MongoDB 'failed' event: #{e}")
ensure
# whatever happens, the Span must be removed from the local storage and
# it must be finished to prevent any leak
span.finish unless span.nil?
clear_span(event)
end
def succeeded(event)
span = get_span(event)
return unless span
# add fields that are available only after executing the query
rows = event.reply.fetch('n', nil)
span.set_tag(Ext::TAG_ROWS, rows) unless rows.nil?
rescue StandardError => e
Datadog.logger.debug("error when handling MongoDB 'succeeded' event: #{e}")
ensure
# whatever happens, the Span must be removed from the local storage and
# it must be finished to prevent any leak
span.finish unless span.nil?
clear_span(event)
end
private
def get_span(event)
Thread.current[:datadog_mongo_span] \
&& Thread.current[:datadog_mongo_span][event.request_id]
end
def set_span(event, span)
Thread.current[:datadog_mongo_span] ||= {}
Thread.current[:datadog_mongo_span][event.request_id] = span
end
def clear_span(event)
return if Thread.current[:datadog_mongo_span].nil?
Thread.current[:datadog_mongo_span].delete(event.request_id)
end
def analytics_enabled?
Contrib::Analytics.enabled?(datadog_configuration[:analytics_enabled])
end
def analytics_sample_rate
datadog_configuration[:analytics_sample_rate]
end
def datadog_configuration
Datadog.configuration[:mongo]
end
end
end
end
end