-
Notifications
You must be signed in to change notification settings - Fork 369
/
event.rb
75 lines (63 loc) · 2.64 KB
/
event.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
require 'ddtrace/contrib/analytics'
require 'ddtrace/contrib/active_support/notifications/event'
require 'ddtrace/contrib/racecar/ext'
require 'ddtrace/ext/integration'
module Datadog
  module Contrib
    module Racecar
      # Shared behavior for Racecar event classes. Including this module mixes
      # in the ActiveSupport::Notifications event support and extends the
      # includer with the class-level helpers from ClassMethods.
      module Event
        # Hook invoked when Event is included somewhere.
        #
        # @param base [Module] the class or module that included Event
        def self.included(base)
          base.send(:include, ActiveSupport::Notifications::Event)
          base.extend(ClassMethods)
        end

        # Class-level helpers common to all Racecar events; every event uses
        # the same process method and the same before_trace callback.
        module ClassMethods
          # Builds the subscription via the parent implementation and registers
          # a before_trace callback on it that resets any leftover context.
          #
          # @param args arguments forwarded untouched to the parent implementation
          # @return the subscription object produced by super
          def subscription(*args)
            created = super
            created.before_trace { ensure_clean_context! }
            created
          end

          # @return [Hash] span options carrying the configured service name
          def span_options
            service_name = configuration[:service_name]
            { service: service_name }
          end

          # @return [Proc] a lambda that reads the tracer from the configuration
          #   at the moment it is called, not when this method is invoked
          def tracer
            lambda { configuration[:tracer] }
          end

          # @return the Racecar entry of the global Datadog configuration
          def configuration
            Datadog.configuration[:racecar]
          end

          # Decorates the span for a Racecar notification.
          #
          # @param span the span to decorate
          # @param event unused here
          # @param _id unused here
          # @param payload [Hash] notification payload; reads :consumer_class,
          #   :topic and :partition, plus :offset, :first_offset, :message_count
          #   and :exception_object when they are present
          def process(span, event, _id, payload)
            settings = configuration
            span.service = settings[:service_name]

            consumer = payload[:consumer_class]
            span.resource = consumer

            # Tag as an external peer service
            span.set_tag(Datadog::Ext::Integration::TAG_PEER_SERVICE, span.service)

            # Set the analytics sample rate only when analytics is enabled
            analytics = Contrib::Analytics
            if analytics.enabled?(settings[:analytics_enabled])
              analytics.set_sample_rate(span, settings[:analytics_sample_rate])
            end

            # Measure service stats
            analytics.set_measured(span)

            # Tags that are always written, even when the payload value is nil
            span.set_tag(Ext::TAG_TOPIC, payload[:topic])
            span.set_tag(Ext::TAG_CONSUMER, consumer)
            span.set_tag(Ext::TAG_PARTITION, payload[:partition])

            # Tags that are written only when the payload carries the key
            optional_tags = [
              [:offset, Ext::TAG_OFFSET],
              [:first_offset, Ext::TAG_FIRST_OFFSET],
              [:message_count, Ext::TAG_MESSAGE_COUNT]
            ]
            optional_tags.each do |key, tag|
              span.set_tag(tag, payload[key]) if payload.key?(key)
            end

            error = payload[:exception_object]
            span.set_error(error) if error
          end

          private

          # Context objects are thread-bound, so if Racecar re-uses a thread,
          # context left behind by a previous trace could leak into the new
          # one. When a current span is still present, swap in a fresh Context
          # to prevent that leak.
          def ensure_clean_context!
            active_tracer = configuration[:tracer]
            if active_tracer.call_context.current_span
              active_tracer.provider.context = Context.new
            end
          end
        end
      end
    end
  end
end