forked from elastic/apm-agent-python
/
traces.py
1277 lines (1135 loc) · 49.8 KB
/
traces.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# BSD 3-Clause License
#
# Copyright (c) 2019, Elasticsearch BV
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import functools
import random
import re
import threading
import time
import timeit
import urllib.parse
import warnings
from collections import defaultdict
from datetime import timedelta
from types import TracebackType
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, TypeVar, Union
import elasticapm
from elasticapm.conf import constants
from elasticapm.conf.constants import LABEL_RE, SPAN, TRANSACTION
from elasticapm.context import init_execution_context
from elasticapm.metrics.base_metrics import Timer
from elasticapm.utils import encoding, get_name_from_func, nested_key, url_to_destination_resource
from elasticapm.utils.disttracing import TraceParent
from elasticapm.utils.logging import get_logger
from elasticapm.utils.time import time_to_perf_counter
# Public helpers exported by ``from elasticapm.traces import *``.
__all__ = (
    "capture_span",
    "label",
    "set_transaction_name",
    "set_custom_context",
    "set_user_context",
)

# Module loggers ("elasticapm.traces" for tracing diagnostics, "elasticapm.errors" for errors).
logger = get_logger("elasticapm.traces")
error_logger = get_logger("elasticapm.errors")

# Clock used for span/transaction start_time and ended_time values.
_time_func = timeit.default_timer

# Shared execution context; used below to get/set/unset the currently active span.
execution_context = init_execution_context()

# Anything that can be started as a child of a transaction or span.
SpanType = Union["Span", "DroppedSpan"]
# TypeVar for annotated callables -- presumably used by decorators later in this file (not visible here).
_AnnotatedFunctionT = TypeVar("_AnnotatedFunctionT")
class ChildDuration(object):
    """
    Accumulate the time during which at least one child of ``obj`` is running.

    ``start``/``stop`` calls nest: a start timestamp is only recorded when the
    nesting level goes from 0 to 1, and time is only added when it drops back
    to 0, so overlapping children are not counted twice. Both methods are
    serialized with a lock.
    """

    __slots__ = ("obj", "_nesting_level", "_start", "_duration", "_lock")

    def __init__(self, obj: "BaseSpan"):
        self._lock = threading.Lock()
        self.obj = obj
        self._duration: timedelta = timedelta(seconds=0)
        self._start: float = 0
        self._nesting_level: int = 0

    def start(self, timestamp: float):
        """Register a child starting at ``timestamp`` (seconds)."""
        with self._lock:
            first_child = self._nesting_level == 0
            self._nesting_level += 1
            if first_child:
                self._start = timestamp

    def stop(self, timestamp: float):
        """Register a child ending at ``timestamp`` (seconds)."""
        with self._lock:
            self._nesting_level -= 1
            if self._nesting_level != 0:
                # other children are still running; keep the interval open
                return
            self._duration += timedelta(seconds=timestamp - self._start)

    @property
    def duration(self) -> timedelta:
        """Total accumulated child time."""
        return self._duration
class BaseSpan(object):
    """
    Behaviour shared by transactions and spans: start/end timing, labels,
    causal links, outcome, and buffering of finished children for span
    compression.
    """

    def __init__(self, labels=None, start=None, links: Optional[Sequence[TraceParent]] = None):
        """
        :param labels: optional dict of labels, applied via ``label()``
        :param start: optional start timestamp; converted with ``time_to_perf_counter``.
            When omitted, ``_time_func()`` is used.
        :param links: optional traceparents to causally link to
        """
        self._child_durations = ChildDuration(self)
        self.labels = {}
        self.outcome: Optional[str] = None
        # Most recently ended compression-eligible child, held back so that the
        # next sibling can be merged into it.
        self.compression_buffer: Optional[Union[Span, DroppedSpan]] = None
        self.compression_buffer_lock = threading.Lock()
        if start is None:
            self.start_time: float = _time_func()
        else:
            self.start_time = time_to_perf_counter(start)
        self.ended_time: Optional[float] = None
        self.duration: Optional[timedelta] = None
        self.links: Optional[List[Dict[str, str]]] = None
        for linked_parent in links or ():
            self.add_link(linked_parent)
        if labels:
            self.label(**labels)

    def child_started(self, timestamp):
        """Record that a child started at ``timestamp`` (feeds child-duration tracking)."""
        self._child_durations.start(timestamp)

    def child_ended(self, child: SpanType):
        """
        Handle a finished child: report it directly, buffer it, or merge it
        into the currently buffered sibling.
        """
        with self.compression_buffer_lock:
            buffered = self.compression_buffer
            if not child.is_compression_eligible():
                # Flush the buffer first so reporting order follows end order.
                if buffered:
                    buffered.report()
                    self.compression_buffer = None
                child.report()
                return
            if buffered is None:
                self.compression_buffer = child
                return
            if not buffered.try_to_compress(child):
                # Could not merge: the old buffer is final, the child becomes the new buffer.
                buffered.report()
                self.compression_buffer = child

    def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
        """
        Stamp the end time, compute the duration, and flush any buffered child.

        :param skip_frames: unused here; accepted for subclass signature compatibility
        :param duration: explicit duration overriding the measured one
        """
        self.ended_time = _time_func()
        if duration is None:
            duration = timedelta(seconds=self.ended_time - self.start_time)
        self.duration = duration
        pending = self.compression_buffer
        if pending:
            pending.report()
            self.compression_buffer = None

    def to_dict(self) -> dict:
        raise NotImplementedError()

    def label(self, **labels):
        """
        Label this span with one or multiple key/value labels. Keys should be strings, values can be strings, booleans,
        or numerical values (int, float, Decimal)

            span_obj.label(key1="value1", key2=True, key3=42)

        Note that keys will be dedotted, replacing dot (.), star (*) and double quote (") with an underscore (_)

        :param labels: key/value pairs of labels
        :return: None
        """
        self.labels.update(encoding.enforce_label_format(labels))

    def add_link(self, trace_parent: TraceParent) -> None:
        """
        Causally link this span/transaction to another span/transaction
        """
        if self.links is None:
            self.links = []
        link = {"trace_id": trace_parent.trace_id, "span_id": trace_parent.span_id}
        self.links.append(link)

    def set_success(self):
        self.outcome = constants.OUTCOME.SUCCESS

    def set_failure(self):
        self.outcome = constants.OUTCOME.FAILURE

    @staticmethod
    def get_dist_tracing_id() -> str:
        """Return 64 random bits as 16 lowercase hex characters."""
        return "{:016x}".format(random.getrandbits(64))

    @property
    def tracer(self) -> "Tracer":
        raise NotImplementedError()
class Transaction(BaseSpan):
    """
    A transaction: the object spans are started from (``begin_span``) and
    ended through (``end_span``). It owns the span bookkeeping: started and
    dropped span counters, per-(type, subtype) self-time timers for breakdown
    metrics, and aggregated statistics about dropped spans.
    """

    def __init__(
        self,
        tracer: "Tracer",
        transaction_type: str = "custom",
        trace_parent: Optional[TraceParent] = None,
        is_sampled: bool = True,
        start: Optional[float] = None,
        sample_rate: Optional[float] = None,
        links: Optional[Sequence[TraceParent]] = None,
    ):
        """
        tracer
            Tracer object
        transaction_type
            Transaction type
        trace_parent
            TraceParent object representing the parent trace and trace state.
            If not given, a new one is created from this transaction's id.
        is_sampled
            Whether or not this transaction is sampled
        start
            Optional start timestamp. This is expected to be an epoch timestamp
            in seconds (such as from `time.time()`). If it is not, it's recommended
            that a `duration` is passed into the `end()` method.
        sample_rate
            Sample rate which was used to decide whether to sample this transaction.
            This is reported to the APM server so that unsampled transactions can
            be extrapolated.
        links
            A list of traceparents to link this transaction causally
        """
        self.id = self.get_dist_tracing_id()
        if not trace_parent:
            trace_parent = TraceParent.new(self.id, is_sampled)
        self.trace_parent: TraceParent = trace_parent
        self.timestamp = start if start is not None else time.time()
        self.name: Optional[str] = None
        self.result: Optional[str] = None
        self.transaction_type = transaction_type
        self._tracer = tracer
        # The otel bridge uses Transactions/Spans interchangeably -- storing
        # a reference to the Transaction in the Transaction simplifies things.
        self.transaction = self
        # Snapshot the config values spans consult on every start/end.
        self.config_span_compression_enabled = tracer.config.span_compression_enabled
        self.config_span_compression_exact_match_max_duration = tracer.config.span_compression_exact_match_max_duration
        self.config_span_compression_same_kind_max_duration = tracer.config.span_compression_same_kind_max_duration
        self.config_exit_span_min_duration = tracer.config.exit_span_min_duration
        self.config_transaction_max_spans = tracer.config.transaction_max_spans
        self.dropped_spans: int = 0
        self.context: Dict[str, Any] = {}
        self._is_sampled = is_sampled
        self.sample_rate = sample_rate
        self._span_counter: int = 0
        self._span_timers: Dict[Tuple[str, str], Timer] = defaultdict(Timer)
        self._span_timers_lock = threading.Lock()
        self._dropped_span_statistics = defaultdict(lambda: {"count": 0, "duration.sum.us": 0})
        try:
            self._breakdown = self.tracer._agent._metrics.get_metricset(
                "elasticapm.metrics.sets.breakdown.BreakdownMetricSet"
            )
        except (LookupError, AttributeError):
            # no agent/metrics or no breakdown metricset: breakdown tracking is disabled
            self._breakdown = None
        super().__init__(start=start, links=links)

    def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
        """
        End the transaction. When breakdown metrics are enabled, record the
        accumulated per-(type, subtype) span self-times and, for sampled
        transactions, the transaction's own ("app") self-time.

        :param skip_frames: passed through to ``BaseSpan.end``
        :param duration: override duration, mostly useful for testing
        """
        super().end(skip_frames, duration)
        if self._breakdown:
            for (span_type, span_subtype), timer in self._span_timers.items():
                labels = {
                    "span.type": span_type,
                    "transaction.name": self.name,
                    "transaction.type": self.transaction_type,
                }
                if span_subtype:
                    labels["span.subtype"] = span_subtype
                val = timer.val
                self._breakdown.timer("span.self_time", reset_on_collect=True, unit="us", **labels).update(
                    val[0], val[1]
                )
            if self.is_sampled:
                # own self-time: total duration minus time covered by child spans
                self._breakdown.timer(
                    "span.self_time",
                    reset_on_collect=True,
                    unit="us",
                    **{"span.type": "app", "transaction.name": self.name, "transaction.type": self.transaction_type},
                ).update((self.duration - self._child_durations.duration).total_seconds() * 1_000_000)

    def _begin_span(
        self,
        name,
        span_type,
        context=None,
        leaf=False,
        labels=None,
        parent_span_id=None,
        span_subtype=None,
        span_action=None,
        sync=None,
        start=None,
        auto_activate=True,
        links: Optional[Sequence[TraceParent]] = None,
    ):
        """
        Create a span under the currently active span (or this transaction).

        Returns a ``DroppedSpan`` when the active parent span is a leaf, or when
        ``transaction_max_spans`` spans have already been started (the latter
        also increments ``dropped_spans``). Otherwise returns a new ``Span``
        whose stack frames are collected immediately. With ``auto_activate``
        the returned object becomes the active span.
        """
        parent_span = execution_context.get_span()
        tracer = self.tracer
        if parent_span and parent_span.leaf:
            span = DroppedSpan(parent_span, leaf=True)
        elif self.config_transaction_max_spans and self._span_counter > self.config_transaction_max_spans - 1:
            self.dropped_spans += 1
            span = DroppedSpan(parent_span, context=context)
        else:
            span = Span(
                transaction=self,
                name=name,
                span_type=span_type or "code.custom",
                context=context,
                leaf=leaf,
                labels=labels,
                parent=parent_span,
                parent_span_id=parent_span_id,
                span_subtype=span_subtype,
                span_action=span_action,
                sync=sync,
                start=start,
                links=links,
            )
            span.frames = tracer.frames_collector_func()
            self._span_counter += 1
        if auto_activate:
            execution_context.set_span(span)
        return span

    def begin_span(
        self,
        name,
        span_type,
        context=None,
        leaf=False,
        labels=None,
        span_subtype=None,
        span_action=None,
        sync=None,
        start=None,
        auto_activate=True,
        links: Optional[Sequence[TraceParent]] = None,
    ):
        """
        Begin a new span

        :param name: name of the span
        :param span_type: type of the span
        :param context: a context dict
        :param leaf: True if this is a leaf span
        :param labels: a flat string/string dict of labels
        :param span_subtype: sub type of the span, e.g. "postgresql"
        :param span_action: action of the span , e.g. "query"
        :param sync: indicate if the span is synchronous or not. In most cases, `None` should be used
        :param start: timestamp, mostly useful for testing
        :param auto_activate: whether to set this span in execution_context
        :param links: an optional list of traceparents to link this span with
        :return: the Span object
        """
        return self._begin_span(
            name,
            span_type,
            context=context,
            leaf=leaf,
            labels=labels,
            parent_span_id=None,
            span_subtype=span_subtype,
            span_action=span_action,
            sync=sync,
            start=start,
            auto_activate=auto_activate,
            links=links,
        )

    def end_span(self, skip_frames: int = 0, duration: Optional[timedelta] = None, outcome: str = "unknown"):
        """
        End the currently active span

        :param skip_frames: numbers of frames to skip in the stack trace
        :param duration: override duration (a timedelta), mostly useful for testing
        :param outcome: outcome of the span, either success, failure or unknown
        :return: the ended span
        :raises LookupError: if there is no active span
        """
        span = execution_context.get_span()
        if span is None:
            raise LookupError()

        # only overwrite span outcome if it is still unknown
        if not span.outcome or span.outcome == "unknown":
            span.outcome = outcome

        span.end(skip_frames=skip_frames, duration=duration)
        return span

    def ensure_parent_id(self) -> str:
        """If current trace_parent has no span_id, generate one, then return it

        This is used to generate a span ID which the RUM agent will use to correlate
        the RUM transaction with the backend transaction.
        """
        if self.trace_parent.span_id == self.id:
            self.trace_parent.span_id = self.get_dist_tracing_id()
            logger.debug("Set parent id to generated %s", self.trace_parent.span_id)
        return self.trace_parent.span_id

    def to_dict(self) -> dict:
        """
        Serialize the transaction into the dict queued for the APM Server.

        Side effects: ``faas``, ``otel_spankind`` and ``otel_attributes`` are
        popped out of ``self.context``. For servers older than 7.16 (or when
        there is no client), otel attributes are merged into
        ``self.context["tags"]``, which is the same dict as ``self.labels``.
        """
        self.context["tags"] = self.labels
        result = {
            "id": self.id,
            "trace_id": self.trace_parent.trace_id,
            "name": encoding.keyword_field(self.name or ""),
            "type": encoding.keyword_field(self.transaction_type),
            "duration": self.duration.total_seconds() * 1000,
            "result": encoding.keyword_field(str(self.result)),
            "timestamp": int(self.timestamp * 1_000_000),  # microseconds
            "outcome": self.outcome,
            "sampled": self.is_sampled,
            "span_count": {"started": self._span_counter, "dropped": self.dropped_spans},
        }
        if self._dropped_span_statistics:
            result["dropped_spans_stats"] = [
                {
                    "destination_service_resource": resource,
                    "service_target_type": target_type,
                    "service_target_name": target_name,
                    "outcome": outcome,
                    "duration": {"count": v["count"], "sum": {"us": int(v["duration.sum.us"])}},
                }
                for (resource, outcome, target_type, target_name), v in self._dropped_span_statistics.items()
            ]
        if self.sample_rate is not None:
            result["sample_rate"] = float(self.sample_rate)
        if self.trace_parent:
            result["trace_id"] = self.trace_parent.trace_id
            # only set parent_id if this transaction isn't the root
            if self.trace_parent.span_id and self.trace_parent.span_id != self.id:
                result["parent_id"] = self.trace_parent.span_id
        if self.links:
            result["links"] = self.links
        # faas context belongs top-level on the transaction
        if "faas" in self.context:
            result["faas"] = self.context.pop("faas")
        # otel attributes and spankind need to be top-level
        if "otel_spankind" in self.context:
            result["otel"] = {"span_kind": self.context.pop("otel_spankind")}
        # Some transaction_store_tests use the Tracer without a Client -- the
        # extra check against `get_client()` is here to make those tests pass
        client = elasticapm.get_client()
        if client and client.check_server_version(gte=(7, 16)):
            if "otel_attributes" in self.context:
                result.setdefault("otel", {})["attributes"] = self.context.pop("otel_attributes")
        else:
            # Attributes map to labels for older versions. They must go into the
            # context's tags dict (set above): result["context"] is only assigned
            # further down, so writing into it here raised KeyError.
            attributes = self.context.pop("otel_attributes", {})
            for key, value in attributes.items():
                self.context["tags"][key] = value
        if self.is_sampled:
            result["context"] = self.context
        return result

    def track_span_duration(self, span_type, span_subtype, self_duration):
        """Add a span's self-time (a timedelta) to the (type, subtype) breakdown timer, in microseconds."""
        # TODO: once asynchronous spans are supported, we should check if the transaction is already finished
        # TODO: and, if it has, exit without tracking.
        with self._span_timers_lock:
            self._span_timers[(span_type, span_subtype)].update(self_duration.total_seconds() * 1_000_000)

    @property
    def is_sampled(self) -> bool:
        return self._is_sampled

    @is_sampled.setter
    def is_sampled(self, is_sampled):
        """
        This should never be called in normal operation, but often is used
        for testing. We just want to make sure our sample_rate comes out correctly
        in tracestate if we set is_sampled to False.
        """
        self._is_sampled = is_sampled
        if not is_sampled:
            if self.sample_rate:
                self.sample_rate = "0"
                self.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, self.sample_rate)

    @property
    def tracer(self) -> "Tracer":
        return self._tracer

    def track_dropped_span(self, span: SpanType):
        """
        Aggregate count and total duration (us) of a dropped span, keyed by
        (destination resource, outcome, target type, target name). Spans
        without ``destination.service.resource`` are deliberately not tracked.
        """
        with self._span_timers_lock:
            try:
                resource = span.context["destination"]["service"]["resource"]
                target_type = nested_key(span.context, "service", "target", "type")
                target_name = nested_key(span.context, "service", "target", "name")
                stats = self._dropped_span_statistics[(resource, span.outcome, target_type, target_name)]
                stats["count"] += 1
                stats["duration.sum.us"] += int(span.duration.total_seconds() * 1_000_000)
            except KeyError:
                pass
class Span(BaseSpan):
    """
    A timed operation inside a transaction.

    When ended, a span is handed to its parent (span or transaction) via
    ``child_ended``. The parent either reports the span or buffers it for
    compression with subsequent siblings.
    """

    __slots__ = (
        "id",
        "transaction",
        "name",
        "type",
        "subtype",
        "action",
        "context",
        "leaf",
        "dist_tracing_propagated",
        "timestamp",
        "start_time",
        "ended_time",
        "duration",
        "parent",
        "parent_span_id",
        "frames",
        "labels",
        "sync",
        "outcome",
        "_child_durations",
        "_cancelled",
    )

    def __init__(
        self,
        transaction: Transaction,
        name: str,
        span_type: str,
        context: Optional[dict] = None,
        leaf: bool = False,
        labels: Optional[dict] = None,
        parent: Optional["Span"] = None,
        parent_span_id: Optional[str] = None,
        span_subtype: Optional[str] = None,
        span_action: Optional[str] = None,
        sync: Optional[bool] = None,
        start: Optional[float] = None,
        links: Optional[Sequence[TraceParent]] = None,
    ):
        """
        Create a new Span

        :param transaction: transaction object that this span relates to
        :param name: Generic name of the span
        :param span_type: type of the span, e.g. db
        :param context: context dictionary
        :param leaf: is this span a leaf span?
        :param labels: a dict of labels
        :param parent: parent span; None means the transaction is the parent
        :param parent_span_id: override of the parent span ID reported for this span
        :param span_subtype: sub type of the span, e.g. mysql
        :param span_action: action of the span, e.g. query
        :param sync: indicate if the span was executed synchronously or asynchronously
        :param start: timestamp, mostly useful for testing
        :param links: an optional list of traceparents to link this span with
        """
        self.id = self.get_dist_tracing_id()
        self.transaction = transaction
        self.name = name
        self.context = context if context is not None else {}
        self.leaf = leaf
        self.parent = parent
        self.parent_span_id = parent_span_id
        self.frames = None
        self.sync = sync
        self.type = span_type
        self.subtype = span_subtype
        self.action = span_action
        self.dist_tracing_propagated = False
        self.composite: Dict[str, Any] = {}
        self._cancelled: bool = False
        super().__init__(labels=labels, start=start, links=links)
        # timestamp is bit of a mix of monotonic and non-monotonic time sources.
        # we take the (non-monotonic) transaction timestamp, and add the (monotonic) difference of span
        # start time and transaction start time. In this respect, the span timestamp is guaranteed to grow
        # monotonically with respect to the transaction timestamp
        self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
        if self.transaction._breakdown:
            p = self.parent if self.parent else self.transaction
            p.child_started(self.start_time)

    def to_dict(self) -> dict:
        """
        Serialize the span into the dict queued for the APM Server.

        Side effects: may autofill destination fields in ``self.context`` and
        pops ``otel_spankind``/``otel_attributes`` out of it.
        """
        if (
            self.composite
            and self.composite["compression_strategy"] == "same_kind"
            and nested_key(self.context, "destination", "service", "resource")
        ):
            # same_kind composites merge differently-named spans; name them after the shared resource
            name = "Calls to " + self.context["destination"]["service"]["resource"]
        else:
            name = self.name
        result = {
            "id": self.id,
            "transaction_id": self.transaction.id,
            "trace_id": self.transaction.trace_parent.trace_id,
            # use either the explicitly set parent_span_id, or the id of the parent, or finally the transaction id
            "parent_id": self.parent_span_id or (self.parent.id if self.parent else self.transaction.id),
            "name": encoding.keyword_field(name),
            "type": encoding.keyword_field(self.type),
            "subtype": encoding.keyword_field(self.subtype),
            "action": encoding.keyword_field(self.action),
            "timestamp": int(self.timestamp * 1_000_000),  # microseconds
            "duration": self.duration.total_seconds() * 1000,
            "outcome": self.outcome,
        }
        if self.transaction.sample_rate is not None:
            result["sample_rate"] = float(self.transaction.sample_rate)
        if self.sync is not None:
            result["sync"] = self.sync
        if self.labels:
            if self.context is None:
                self.context = {}
            self.context["tags"] = self.labels
        if self.links:
            result["links"] = self.links
        if self.context:
            self.autofill_resource_context()
            # otel attributes and spankind need to be top-level
            if "otel_spankind" in self.context:
                result["otel"] = {"span_kind": self.context.pop("otel_spankind")}
            # Guard against a missing client (as Transaction.to_dict does); without
            # it this raised AttributeError when no client is configured.
            client = elasticapm.get_client()
            if client and client.check_server_version(gte=(7, 16)):
                if "otel_attributes" in self.context:
                    result.setdefault("otel", {})["attributes"] = self.context.pop("otel_attributes")
            else:
                # Attributes map to labels for older versions
                attributes = self.context.pop("otel_attributes", {})
                if attributes and ("tags" not in self.context):
                    self.context["tags"] = {}
                for key, value in attributes.items():
                    self.context["tags"][key] = value
            result["context"] = self.context
        if self.frames:
            result["stacktrace"] = self.frames
        if self.composite:
            result["composite"] = {
                "compression_strategy": self.composite["compression_strategy"],
                "sum": self.composite["sum"].total_seconds() * 1000,
                "count": self.composite["count"],
            }
        return result

    def is_same_kind(self, other_span: SpanType) -> bool:
        """
        For compression purposes, two spans are of the same kind if they have the same
        type and subtype, and the same service target type and name (at least one of
        which must be set on this span).

        :param other_span: another span object
        :return: bool
        """
        target_type = nested_key(self.context, "service", "target", "type")
        target_name = nested_key(self.context, "service", "target", "name")
        return bool(
            self.type == other_span.type
            and self.subtype == other_span.subtype
            and (target_type or target_name)
            and target_type == nested_key(other_span.context, "service", "target", "type")
            and target_name == nested_key(other_span.context, "service", "target", "name")
        )

    def is_exact_match(self, other_span: SpanType) -> bool:
        """
        For compression purposes, two spans are considered to be an exact match if they have the same
        name and are of the same kind.

        :param other_span: another span object
        :return: bool
        """
        return bool(self.name == other_span.name and self.is_same_kind(other_span))

    def is_compression_eligible(self) -> bool:
        """
        Determine if this span is eligible for compression: compression must be
        enabled, and the span must be a leaf that has not propagated distributed
        tracing context and whose outcome is unset or success.
        """
        if self.transaction.config_span_compression_enabled:
            return self.leaf and not self.dist_tracing_propagated and self.outcome in (None, constants.OUTCOME.SUCCESS)
        return False

    @property
    def discardable(self) -> bool:
        """A successful leaf span that has not propagated context may be dropped if it is short."""
        return self.leaf and not self.dist_tracing_propagated and self.outcome == constants.OUTCOME.SUCCESS

    def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
        """
        End this span and hand it to its parent, which reports it or buffers it for compression.

        :param skip_frames: amount of frames to skip from the beginning of the stack trace
        :param duration: override duration (a timedelta), mostly useful for testing
        :return: None
        """
        self.autofill_resource_context()
        self.autofill_service_target()
        super().end(skip_frames, duration)
        tracer = self.transaction.tracer
        # read the property once; it may emit a deprecation warning on each access
        stack_trace_min_duration = tracer.span_stack_trace_min_duration
        if (
            stack_trace_min_duration >= timedelta(seconds=0)
            and self.duration >= stack_trace_min_duration
            and self.frames
        ):
            self.frames = tracer.frames_processing_func(self.frames)[skip_frames:]
        else:
            self.frames = None
        current_span = execution_context.get_span()
        # Because otel can detach context without ending the span, we need to
        # make sure we only unset the span if it's currently set.
        if current_span is self:
            execution_context.unset_span()
        p = self.parent if self.parent else self.transaction
        if self.transaction._breakdown:
            p._child_durations.stop(self.start_time + self.duration.total_seconds())
            self.transaction.track_span_duration(
                self.type, self.subtype, self.duration - self._child_durations.duration
            )
        p.child_ended(self)

    def report(self) -> None:
        """
        Queue this span for sending, unless it is discardable and shorter than
        ``exit_span_min_duration`` (then it is counted as dropped), or it was
        cancelled (then it is removed from the started-span count).
        """
        if self.discardable and self.duration < self.transaction.config_exit_span_min_duration:
            self.transaction.track_dropped_span(self)
            self.transaction.dropped_spans += 1
        elif self._cancelled:
            self.transaction._span_counter -= 1
        else:
            self.tracer.queue_func(SPAN, self.to_dict())

    def try_to_compress(self, sibling: SpanType) -> bool:
        """
        Try to merge a finished ``sibling`` into this buffered span.

        On success the composite count/sum are updated, this span's duration is
        stretched to the sibling's end, and the transaction's started-span
        counter is decremented (the sibling is not reported on its own).
        """
        compression_strategy = (
            self._try_to_compress_composite(sibling) if self.composite else self._try_to_compress_regular(sibling)
        )
        if not compression_strategy:
            return False

        if not self.composite:
            self.composite = {"compression_strategy": compression_strategy, "count": 1, "sum": self.duration}
        self.composite["count"] += 1
        self.composite["sum"] += sibling.duration
        self.duration = timedelta(seconds=sibling.ended_time - self.start_time)
        self.transaction._span_counter -= 1
        return True

    def _try_to_compress_composite(self, sibling: SpanType) -> Optional[str]:
        """Return the existing composite strategy if ``sibling`` still qualifies for it, else None."""
        if self.composite["compression_strategy"] == "exact_match":
            return (
                "exact_match"
                if (
                    self.is_exact_match(sibling)
                    and sibling.duration <= self.transaction.config_span_compression_exact_match_max_duration
                )
                else None
            )
        elif self.composite["compression_strategy"] == "same_kind":
            return (
                "same_kind"
                if (
                    self.is_same_kind(sibling)
                    and sibling.duration <= self.transaction.config_span_compression_same_kind_max_duration
                )
                else None
            )
        return None

    def _try_to_compress_regular(self, sibling: SpanType) -> Optional[str]:
        """Pick a compression strategy for two plain (non-composite) spans, or None."""
        if not self.is_same_kind(sibling):
            return None
        if self.name == sibling.name:
            max_duration = self.transaction.config_span_compression_exact_match_max_duration
            if self.duration <= max_duration and sibling.duration <= max_duration:
                return "exact_match"
            return None
        max_duration = self.transaction.config_span_compression_same_kind_max_duration
        if self.duration <= max_duration and sibling.duration <= max_duration:
            return "same_kind"
        return None

    def update_context(self, key, data):
        """
        Update the context data for given key

        :param key: the key, e.g. "db"
        :param data: a dictionary
        :return: None
        """
        current = self.context.get(key, {})
        current.update(data)
        self.context[key] = current

    def autofill_resource_context(self):
        """Automatically fills "resource" fields based on other fields"""
        if self.context:
            resource = nested_key(self.context, "destination", "service", "resource")
            if not resource and (self.leaf or any(k in self.context for k in ("destination", "db", "message", "http"))):
                type_info = self.subtype or self.type
                instance = nested_key(self.context, "db", "instance")
                queue_name = nested_key(self.context, "message", "queue", "name")
                http_url = nested_key(self.context, "http", "url")
                if instance:
                    resource = f"{type_info}/{instance}"
                elif queue_name:
                    resource = f"{type_info}/{queue_name}"
                elif http_url:
                    resource = url_to_destination_resource(http_url)
                else:
                    resource = type_info
                if "destination" not in self.context:
                    self.context["destination"] = {}
                if "service" not in self.context["destination"]:
                    self.context["destination"]["service"] = {}
                self.context["destination"]["service"]["resource"] = resource
                # set fields that are deprecated, but still required by APM Server API
                if "name" not in self.context["destination"]["service"]:
                    self.context["destination"]["service"]["name"] = ""
                if "type" not in self.context["destination"]["service"]:
                    self.context["destination"]["service"]["type"] = ""

    def autofill_service_target(self):
        """
        For leaf (exit) spans, fill ``context.service.target.type``/``name`` when
        missing, inferred from subtype/type and the db/message/http context.
        Non-leaf spans have any ``service.target`` removed.
        """
        if self.leaf:
            service_target = nested_key(self.context, "service", "target") or {}

            if "type" not in service_target:  # infer type from span type & subtype
                # use sub-type if provided, fallback on type otherwise
                service_target["type"] = self.subtype or self.type

            if "name" not in service_target:  # infer name from span attributes
                if nested_key(self.context, "db", "instance"):  # database spans
                    service_target["name"] = self.context["db"]["instance"]
                elif "message" in self.context:  # messaging spans
                    # message context may lack queue.name; direct indexing raised KeyError here
                    queue_name = nested_key(self.context, "message", "queue", "name")
                    if queue_name is not None:
                        service_target["name"] = queue_name
                elif nested_key(self.context, "http", "url"):  # http spans
                    parsed_url = urllib.parse.urlparse(self.context["http"]["url"])
                    target_name = parsed_url.hostname
                    # hostname can be None (e.g. "http://:8080/"); don't append a port to None
                    if target_name and parsed_url.port:
                        target_name += f":{parsed_url.port}"
                    service_target["name"] = target_name

            if "service" not in self.context:
                self.context["service"] = {}
            self.context["service"]["target"] = service_target
        elif nested_key(self.context, "service", "target"):
            # non-exit spans should not have service.target.* fields
            del self.context["service"]["target"]

    def cancel(self) -> None:
        """
        Mark span as cancelled. Cancelled spans don't count towards started spans nor dropped spans.

        No checks are made to ensure that spans which already propagated distributed context are not
        cancelled.
        """
        self._cancelled = True

    def __str__(self):
        return "{}/{}/{}".format(self.name, self.type, self.subtype)

    @property
    def tracer(self) -> "Tracer":
        return self.transaction.tracer
class DroppedSpan(BaseSpan):
    """
    Stand-in returned instead of a real Span when a span is not recorded.
    Transaction._begin_span returns one in two cases: when the parent span is
    a leaf, or when the transaction's max span count has been reached. It
    keeps ``parent``/``leaf`` so nesting still works, but it is never
    reported and never takes part in compression.
    """

    __slots__ = ("leaf", "parent", "id", "context", "outcome", "dist_tracing_propagated")

    def __init__(self, parent, leaf=False, start=None, context=None):
        self.parent = parent
        self.leaf = leaf
        self.id = None
        self.dist_tracing_propagated = False
        self.context = context
        super(DroppedSpan, self).__init__(start=start)
        # BaseSpan.__init__ sets ``outcome`` to None, so the initial outcome must be
        # assigned afterwards; assigning it before super() was a dead store.
        self.outcome = constants.OUTCOME.UNKNOWN

    def end(self, skip_frames: int = 0, duration: Optional[timedelta] = None):
        """
        End the dropped span and deactivate it if it is the active span.

        :param skip_frames: passed through to ``BaseSpan.end``
        :param duration: override duration (a timedelta)
        """
        super().end(skip_frames, duration)
        # Only unset when this span is the active one, like Span.end does: it may have
        # been started with auto_activate=False, or its context may already be detached.
        if execution_context.get_span() is self:
            execution_context.unset_span()

    def child_started(self, timestamp):
        pass

    def child_ended(self, child: SpanType):
        pass

    def update_context(self, key, data):
        pass

    def report(self):
        pass

    def try_to_compress(self, sibling: SpanType) -> bool:
        return False

    def is_compression_eligible(self) -> bool:
        return False

    @property
    def name(self):
        return "DroppedSpan"

    @property
    def type(self):
        return None

    @property
    def subtype(self):
        return None

    @property
    def action(self):
        return None
class Tracer(object):
def __init__(self, frames_collector_func, frames_processing_func, queue_func, config, agent: "elasticapm.Client"):
    """
    Store the tracer's collaborators and precompile the transaction ignore patterns.

    :param frames_collector_func: callable kept as ``self.frames_collector_func``
    :param frames_processing_func: callable kept as ``self.frames_processing_func``
    :param queue_func: callable used to queue finished events, kept as ``self.queue_func``
    :param config: configuration object; ``transactions_ignore_patterns`` is read here
    :param agent: the owning client, kept as ``self._agent``
    """
    self.config = config
    self.queue_func = queue_func
    self.frames_processing_func = frames_processing_func
    self.frames_collector_func = frames_collector_func
    self._agent = agent
    # compile once up front; an unset/empty setting yields no patterns
    raw_patterns = config.transactions_ignore_patterns or []
    self._ignore_patterns = list(map(re.compile, raw_patterns))
@property
def span_stack_trace_min_duration(self) -> timedelta:
    """
    Effective minimum span duration for collecting span stack traces.

    ``config.span_stack_trace_min_duration`` is used unless it is still at its 5ms default
    while the deprecated ``config.span_frames_min_duration`` differs from that same default.
    In that case a DeprecationWarning is issued and the deprecated value is translated to the
    new semantics: a negative value becomes 0, 0 becomes -1 second, anything else is returned
    unchanged.
    """
    config = self.config
    default_duration = timedelta(seconds=0.005)
    use_legacy_setting = (
        config.span_stack_trace_min_duration == default_duration
        and config.span_frames_min_duration != default_duration
    )
    if not use_legacy_setting:
        return config.span_stack_trace_min_duration

    warnings.warn(
        "`span_frames_min_duration` is deprecated. Please use `span_stack_trace_min_duration`.",
        DeprecationWarning,
    )
    legacy = config.span_frames_min_duration
    zero = timedelta(seconds=0)
    if legacy < zero:
        return zero
    if legacy == zero:
        return timedelta(seconds=-1)
    return legacy
def begin_transaction(
    self,
    transaction_type: str,
    trace_parent: Optional[TraceParent] = None,
    start: Optional[float] = None,
    auto_activate: bool = True,
    links: Optional[Sequence[TraceParent]] = None,
) -> Transaction:
    """
    Start a new transaction and, optionally, bind it in the execution context.

    :param transaction_type: type of the transaction, e.g. "request"
    :param trace_parent: an optional TraceParent object. Depending on
        ``config.trace_continuation_strategy`` the trace is either continued, or restarted with
        the TraceParent added to the transaction's links instead.
    :param start: override the start timestamp, mostly useful for testing
    :param auto_activate: whether to set this transaction in execution_context
    :param links: sequence of traceparents to causally link this transaction to. It is copied,
        so the caller's sequence is never modified (and immutable sequences such as tuples
        are accepted).
    :returns: the Transaction object
    """
    # Copy into a fresh list: the restarted trace parent may be appended below, which must not
    # mutate the caller's sequence (and would raise AttributeError for a tuple).
    links = list(links) if links else []
    continuation_strategy = self.config.trace_continuation_strategy
    # we restart the trace if continuation strategy is "restart", or if it is "restart_external" and our
    # "es" key is not in the tracestate header. In both cases, the original TraceParent is added to trace links.
    if trace_parent and continuation_strategy != constants.TRACE_CONTINUATION_STRATEGY.CONTINUE:
        if continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART or (
            continuation_strategy == constants.TRACE_CONTINUATION_STRATEGY.RESTART_EXTERNAL
            and not trace_parent.tracestate_dict
        ):
            links.append(trace_parent)
            trace_parent = None
    if trace_parent:
        # continued trace: inherit the upstream sampling decision and sample rate
        is_sampled = bool(trace_parent.trace_options.recorded)
        sample_rate = trace_parent.tracestate_dict.get(constants.TRACESTATE.SAMPLE_RATE)
    else:
        is_sampled = (
            self.config.transaction_sample_rate == 1.0 or self.config.transaction_sample_rate > random.random()
        )
        if not is_sampled:
            sample_rate = "0"
        else:
            sample_rate = str(self.config.transaction_sample_rate)
    transaction = Transaction(
        self,
        transaction_type,
        trace_parent=trace_parent,
        is_sampled=is_sampled,
        start=start,
        sample_rate=sample_rate,
        links=links,
    )
    if trace_parent is None:
        # new trace root: record the locally decided sample rate in our tracestate
        transaction.trace_parent.add_tracestate(constants.TRACESTATE.SAMPLE_RATE, sample_rate)
    if auto_activate:
        execution_context.set_transaction(transaction)
    return transaction
def end_transaction(self, result=None, transaction_name=None, duration=None):
    """
    End the current transaction and queue it for sending.

    The transaction is fetched from (and cleared in) the execution context. If it has no name
    yet, ``transaction_name`` (stringified), or ``""`` if that is None, becomes its name.
    After ending, the transaction is not queued when ``self._should_ignore`` accepts its
    name, or when it is unsampled and ``self._agent.check_server_version(gte=(8, 0))`` is true.

    :param result: result of the transaction, e.g. "OK" or 200; only applied when the
        transaction does not already have a result
    :param transaction_name: name to use if the transaction has none yet
    :param duration: override duration, mostly useful for testing
    :return: the ended transaction; ``None`` if it was ignored or dropped as unsampled; the
        falsy value from the execution context if there was no current transaction
    """
    transaction = execution_context.get_transaction(clear=True)
    if not transaction:
        return transaction
    if transaction.name is None:
        transaction.name = "" if transaction_name is None else str(transaction_name)
    transaction.end(duration=duration)
    if self._should_ignore(transaction.name):
        return None
    # unsampled transactions are dropped entirely when talking to APM Server 8.0+
    if not transaction.is_sampled and self._agent.check_server_version(gte=(8, 0)):
        return None
    if transaction.result is None:
        transaction.result = result
    self.queue_func(TRANSACTION, transaction.to_dict())
    return transaction
def _should_ignore(self, transaction_name):