Skip to content

Commit

Permalink
feat(tracer): add remote configuration support
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle-Verhoog committed Nov 5, 2023
1 parent c1b8800 commit ec59296
Show file tree
Hide file tree
Showing 8 changed files with 250 additions and 7 deletions.
23 changes: 23 additions & 0 deletions ddtrace/__init__.py
Expand Up @@ -45,6 +45,29 @@

# a global tracer instance with integration settings
tracer = Tracer()
config._subscribe(["logs_injection", "_trace_sample_rate"], tracer._on_global_config_update)

from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.remoteconfig._pubsub import PubSub
from ddtrace.internal.remoteconfig._pubsub import RemoteConfigSubscriber
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector


remoteconfig_poller.enable()


class _GlobalConfigPubSub(PubSub):
__publisher_class__ = RemoteConfigPublisher
__subscriber_class__ = RemoteConfigSubscriber
__shared_data__ = PublisherSubscriberConnector()

def __init__(self, callback):
self._publisher = self.__publisher_class__(self.__shared_data__, None)
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "GlobalConfig")


remoteconfig_poller.register("APM_TRACING", _GlobalConfigPubSub(callback=config._handle_remoteconfig))

__all__ = [
"patch",
Expand Down
7 changes: 7 additions & 0 deletions ddtrace/appsec/_capabilities.py
Expand Up @@ -29,6 +29,10 @@ class Flags(enum.IntFlag):
ASM_CUSTOM_BLOCKING_RESPONSE = 1 << 9
ASM_TRUSTED_IPS = 1 << 10
ASM_API_SECURITY_SAMPLE_RATE = 1 << 11
APM_TRACING_SAMPLE_RATE = 1 << 12
APM_TRACING_LOGS_INJECTION = 1 << 13
APM_TRACING_HTTP_HEADER_TAGS = 1 << 14
APM_TRACING_CUSTOM_TAGS = 1 << 15


_ALL_ASM_BLOCKING = (
Expand All @@ -54,6 +58,9 @@ def _appsec_rc_flags(test_tracer: Optional[ddtrace.Tracer] = None) -> Flags:
value |= _ALL_ASM_BLOCKING
if asm_config._api_security_enabled:
value |= Flags.ASM_API_SECURITY_SAMPLE_RATE
value |= Flags.APM_TRACING_LOGS_INJECTION
value |= Flags.APM_TRACING_SAMPLE_RATE
value |= Flags.APM_TRACING_HTTP_HEADER_TAGS
return value


Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/remoteconfig/client.py
Expand Up @@ -438,7 +438,7 @@ def _load_new_configurations(self, list_callbacks, applied_configs, client_confi
self._apply_callback(list_callbacks, callback, config_content, target, config)
except Exception:
error_message = "Failed to apply configuration %s for product %r" % (config, config.product_name)
log.debug(error_message, exc_info=True)
log.warning(error_message, exc_info=True)
config.apply_state = 3 # Error state
config.apply_error = error_message
applied_configs[target] = config
Expand Down
1 change: 1 addition & 0 deletions ddtrace/internal/remoteconfig/worker.py
Expand Up @@ -58,6 +58,7 @@ def _agent_check(self):

def _online(self):
# type: () -> None

with StopWatch() as sw:
if not self._client.request():
# An error occurred, so we transition back to the agent check
Expand Down
81 changes: 76 additions & 5 deletions ddtrace/settings/config.py
Expand Up @@ -28,6 +28,7 @@
from ..internal.constants import PROPAGATION_STYLE_B3_SINGLE
from ..internal.logger import get_logger
from ..internal.schema import DEFAULT_SPAN_SERVICE_NAME
from ..internal.utils.http import normalize_header_name
from ..internal.utils.formats import asbool
from ..internal.utils.formats import parse_tags_str
from ..pin import Pin
Expand Down Expand Up @@ -191,48 +192,71 @@ def get_error_ranges(error_range_str):
return error_ranges # type: ignore[return-value]


ConfigSource = Literal["default", "env", "code"]
_ConfigSource = Literal["default", "env", "code", "remote_config"]


class _ConfigItem:
"""Configuration item that tracks the value of a setting, and where it came from."""

def __init__(self, name, default, envs):
# type: (str, Any, List[Tuple[str, Callable[[], Any]]]) -> None
self._name = name
self._default_value = default
self._env_value = None
self._code_value = None
self._rc_value = None
self._envs = envs
for (env_var, parser) in envs:
if env_var in os.environ:
self._env_value = parser(os.environ[env_var])
break

def set_value_source(self, value, source):
if source == "code":
self._code_value = value
elif source == "remote_config":
self._rc_value = value

def set_code(self, value):
# type: (Any) -> None
self._code_value = value

def unset_rc(self):
# type: () -> None
self._rc_value = None

def set_rc(self, value):
# type: (Any) -> None
self._rc_value = value

def value(self):
# type: () -> Any
if self._rc_value is not None:
return self._rc_value
if self._code_value is not None:
return self._code_value
if self._env_value is not None:
return self._env_value
return self._default_value

def source(self):
# type: () -> ConfigSource
# type: () -> _ConfigSource
if self._rc_value is not None:
return "remote_config"
if self._code_value is not None:
return "code"
if self._env_value is not None:
return "env"
return "default"

def __repr__(self):
return "<{} name={} default={} env_value={} user_value={}>".format(
return "<{} name={} default={} env_value={} user_value={} remote_config_value={}>".format(
self.__class__.__name__,
self._name,
self._default_value,
self._env_value,
self._code_value,
self._rc_value,
)


Expand Down Expand Up @@ -607,16 +631,63 @@ def __setattr__(self, key, value):
if key == "_config":
return super(self.__class__, self).__setattr__(key, value)
elif key in self._config:
self._config[key].set_code(value)
self._notify_subscribers([key])
self._set_config_items([(key, value, "code")])
return None
else:
return super(self.__class__, self).__setattr__(key, value)

def _set_config_items(self, items):
# type: (List[Tuple[str, Any, _ConfigSource]]) -> None
for (key, value, origin) in items:
self._config[key].set_value_source(value, origin)

from ..internal.telemetry import telemetry_writer

telemetry_writer.add_event(
{
"configuration": [
{
"name": k,
"value": self._config[k].value(),
"origin": self._get_source(k),
}
for k, _, _ in items
],
},
"app-client-configuration-change",
)
self._notify_subscribers([i[0] for i in items])

def _reset(self):
# type: () -> None
self._config = _default_config()

def _get_source(self, item):
# type: (str) -> str
return self._config[item].source()

def _handle_remoteconfig(self, data, test_tracer=None):
# type: (Any, Any) -> None
if not data:
for item in self._config.values():
item.unset_rc()
return

config = data["config"][0]
if "lib_config" not in config:
log.warning("unexpected RC payload")
return

lib_config = config["lib_config"]
updated_items = [] # type: List[Tuple[str, Any]]

if "tracing_sampling_rate" in lib_config:
updated_items.append(("_trace_sample_rate", lib_config["tracing_sampling_rate"]))
if "tracing_header_tags" in lib_config:
rc_tags = lib_config["tracing_header_tags"]
tags = None if rc_tags is None else {normalize_header_name(e["header"]): e["tag_name"] for e in rc_tags}
updated_items.append(("trace_http_header_tags", tags))
if "log_injection_enabled" in lib_config:
updated_items.append(("logs_injection", lib_config["log_injection_enabled"]))

self._set_config_items([(k, v, "remote_config") for k, v in updated_items])
26 changes: 26 additions & 0 deletions ddtrace/tracer.py
Expand Up @@ -228,6 +228,7 @@ def __init__(

self.enabled = config._tracing_enabled
self.context_provider = context_provider or DefaultContextProvider()
self._user_sampler = None # type: Optional[BaseSampler]
self._sampler = DatadogSampler() # type: BaseSampler
self._dogstatsd_url = agent.get_stats_url() if dogstatsd_url is None else dogstatsd_url
self._compute_stats = config._trace_compute_stats
Expand Down Expand Up @@ -416,6 +417,7 @@ def configure(
if iast_enabled is not None:
self._iast_enabled = asm_config._iast_enabled = iast_enabled

self._user_sampler = sampler
if sampler is not None:
self._sampler = sampler

Expand Down Expand Up @@ -1053,3 +1055,27 @@ def _use_sync_mode():
@staticmethod
def _is_span_internal(span):
return not span.span_type or span.span_type in _INTERNAL_APPLICATION_SPAN_TYPES

def _on_global_config_update(self, config, items):
if "logs_injection" in items:
if config.logs_injection:
from ddtrace.contrib.logging import patch

patch()
else:
from ddtrace.contrib.logging import unpatch

unpatch()

if "_trace_sample_rate" in items:
if config._get_source("_trace_sample_rate") != "remote_config" and self._user_sampler:
self.configure(sampler=self._user_sampler)
return

sample_rate = None
if config._get_source("_trace_sample_rate") != "default":
sample_rate = config._trace_sample_rate
# Since the API allows users to set create and set sampler objects
# rather than just a rate, save the previously set sampler object
# self.configure(sampler=self._sampler)
self.configure(sampler=DatadogSampler(default_sample_rate=sample_rate))
2 changes: 1 addition & 1 deletion riotfile.py
Expand Up @@ -251,7 +251,7 @@ def select_pys(min_version=MIN_PYTHON_VERSION, max_version=MAX_PYTHON_VERSION):
# Enabling coverage for integration tests breaks certain tests in CI
# Also, running two separate pytest sessions, the ``civisibility`` one with --no-ddtrace
command="pytest --no-ddtrace --no-cov --ignore-glob='*civisibility*' {cmdargs} tests/integration/ && pytest --no-cov --no-ddtrace {cmdargs} tests/integration/test_integration_civisibility.py", # noqa: E501
pkgs={"msgpack": [latest], "coverage": latest},
pkgs={"msgpack": [latest], "coverage": latest, "python-json-logger": "==2.0.7"},
venvs=[
Venv(
name="integration-latest",
Expand Down

0 comments on commit ec59296

Please sign in to comment.