Skip to content

Commit

Permalink
feat(tracer): add remote configuration support
Browse files Browse the repository at this point in the history
  • Loading branch information
Kyle-Verhoog committed Nov 2, 2023
1 parent c1b8800 commit dbbbac5
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 6 deletions.
24 changes: 24 additions & 0 deletions ddtrace/__init__.py
Expand Up @@ -45,6 +45,30 @@

# a global tracer instance with integration settings
tracer = Tracer()
config._subscribe(["logs_injection"], tracer._on_global_config_update)

from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

from ddtrace.internal.remoteconfig._pubsub import PubSub
from ddtrace.internal.remoteconfig._pubsub import RemoteConfigSubscriber
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisher
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector


remoteconfig_poller.enable()


class PS(PubSub):
__publisher_class__ = RemoteConfigPublisher
__subscriber_class__ = RemoteConfigSubscriber
__shared_data__ = PublisherSubscriberConnector()

def __init__(self, callback):
self._publisher = self.__publisher_class__(self.__shared_data__, None)
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "Config")


remoteconfig_poller.register("APM_TRACING", PS(callback=config._handle_remoteconfig))

__all__ = [
"patch",
Expand Down
5 changes: 5 additions & 0 deletions ddtrace/appsec/_capabilities.py
Expand Up @@ -29,6 +29,10 @@ class Flags(enum.IntFlag):
ASM_CUSTOM_BLOCKING_RESPONSE = 1 << 9
ASM_TRUSTED_IPS = 1 << 10
ASM_API_SECURITY_SAMPLE_RATE = 1 << 11
APM_TRACING_SAMPLE_RATE = 1 << 12
APM_TRACING_LOGS_INJECTION = 1 << 13
APM_TRACING_HTTP_HEADER_TAGS = 1 << 14
APM_TRACING_CUSTOM_TAGS = 1 << 15


_ALL_ASM_BLOCKING = (
Expand All @@ -54,6 +58,7 @@ def _appsec_rc_flags(test_tracer: Optional[ddtrace.Tracer] = None) -> Flags:
value |= _ALL_ASM_BLOCKING
if asm_config._api_security_enabled:
value |= Flags.ASM_API_SECURITY_SAMPLE_RATE
value |= Flags.APM_TRACING_SAMPLE_RATE
return value


Expand Down
2 changes: 1 addition & 1 deletion ddtrace/internal/remoteconfig/client.py
Expand Up @@ -438,7 +438,7 @@ def _load_new_configurations(self, list_callbacks, applied_configs, client_confi
self._apply_callback(list_callbacks, callback, config_content, target, config)
except Exception:
error_message = "Failed to apply configuration %s for product %r" % (config, config.product_name)
log.debug(error_message, exc_info=True)
log.warning(error_message, exc_info=True)
config.apply_state = 3 # Error state
config.apply_error = error_message
applied_configs[target] = config
Expand Down
1 change: 1 addition & 0 deletions ddtrace/internal/remoteconfig/worker.py
Expand Up @@ -58,6 +58,7 @@ def _agent_check(self):

def _online(self):
# type: () -> None

with StopWatch() as sw:
if not self._client.request():
# An error occurred, so we transition back to the agent check
Expand Down
81 changes: 76 additions & 5 deletions ddtrace/settings/config.py
Expand Up @@ -28,6 +28,7 @@
from ..internal.constants import PROPAGATION_STYLE_B3_SINGLE
from ..internal.logger import get_logger
from ..internal.schema import DEFAULT_SPAN_SERVICE_NAME
from ..internal.utils.http import normalize_header_name
from ..internal.utils.formats import asbool
from ..internal.utils.formats import parse_tags_str
from ..pin import Pin
Expand Down Expand Up @@ -191,48 +192,71 @@ def get_error_ranges(error_range_str):
return error_ranges # type: ignore[return-value]


ConfigSource = Literal["default", "env", "code"]
_ConfigSource = Literal["default", "env", "code", "remote_config"]


class _ConfigItem:
"""Configuration item that tracks the value of a setting, and where it came from."""

def __init__(self, name, default, envs):
# type: (str, Any, List[Tuple[str, Callable[[], Any]]]) -> None
self._name = name
self._default_value = default
self._env_value = None
self._code_value = None
self._rc_value = None
self._envs = envs
for (env_var, parser) in envs:
if env_var in os.environ:
self._env_value = parser(os.environ[env_var])
break

def set_value_source(self, value, source):
if source == "code":
self._code_value = value
elif source == "remote_config":
self._rc_value = value

def set_code(self, value):
# type: (Any) -> None
self._code_value = value

def unset_rc(self):
# type: () -> None
self._rc_value = None

def set_rc(self, value):
# type: (Any) -> None
self._rc_value = value

def value(self):
# type: () -> Any
if self._rc_value is not None:
return self._rc_value
if self._code_value is not None:
return self._code_value
if self._env_value is not None:
return self._env_value
return self._default_value

def source(self):
# type: () -> ConfigSource
# type: () -> _ConfigSource
if self._rc_value is not None:
return "remote_config"
if self._code_value is not None:
return "code"
if self._env_value is not None:
return "env"
return "default"

def __repr__(self):
return "<{} name={} default={} env_value={} user_value={}>".format(
return "<{} name={} default={} env_value={} user_value={} remote_config_value={}>".format(
self.__class__.__name__,
self._name,
self._default_value,
self._env_value,
self._code_value,
self._rc_value,
)


Expand Down Expand Up @@ -607,16 +631,63 @@ def __setattr__(self, key, value):
if key == "_config":
return super(self.__class__, self).__setattr__(key, value)
elif key in self._config:
self._config[key].set_code(value)
self._notify_subscribers([key])
self._set_config_items([(key, value, "code")])
return None
else:
return super(self.__class__, self).__setattr__(key, value)

def _set_config_items(self, items):
# type: (List[Tuple[str, Any, _ConfigSource]]) -> None
for (key, value, origin) in items:
self._config[key].set_value_source(value, origin)

from ..internal.telemetry import telemetry_writer

telemetry_writer.add_event(
{
"configuration": [
{
"name": k,
"value": self._config[k].value(),
"origin": self._get_source(k),
}
for k, _, _ in items
],
},
"app-client-configuration-change",
)
self._notify_subscribers([i[0] for i in items])

def _reset(self):
# type: () -> None
self._config = _default_config()

def _get_source(self, item):
# type: (str) -> str
return self._config[item].source()

def _handle_remoteconfig(self, data, test_tracer=None):
# type: (Any, Any) -> None
if not data:
for item in self._config.values():
item.unset_rc()
return

config = data["config"][0]
if "lib_config" not in config:
log.warning("unexpected RC payload")
return

lib_config = config["lib_config"]
updated_items = [] # type: List[Tuple[str, Any]]

if "tracing_sampling_rate" in lib_config:
updated_items.append(("_trace_sample_rate", lib_config["tracing_sampling_rate"]))
if "tracing_header_tags" in lib_config:
rc_tags = lib_config["tracing_header_tags"]
tags = None if rc_tags is None else {normalize_header_name(e["header"]): e["tag_name"] for e in rc_tags}
updated_items.append(("trace_http_header_tags", tags))
if "log_injection_enabled" in lib_config:
updated_items.append(("logs_injection", lib_config["log_injection_enabled"]))

self._set_config_items([(k, v, "remote_config") for k, v in updated_items])
12 changes: 12 additions & 0 deletions ddtrace/tracer.py
Expand Up @@ -1053,3 +1053,15 @@ def _use_sync_mode():
@staticmethod
def _is_span_internal(span):
return not span.span_type or span.span_type in _INTERNAL_APPLICATION_SPAN_TYPES

@classmethod
def _on_global_config_update(cls, config, items):
if "logs_injection" in items:
if config.logs_injection:
from ddtrace.contrib.logging import patch

patch()
else:
from ddtrace.contrib.logging import unpatch

unpatch()
50 changes: 50 additions & 0 deletions tests/internal/test_settings.py
Expand Up @@ -9,6 +9,31 @@ def config():
yield Config()


def _base_rc_config(cfg):
lib_config = {
"runtime_metrics_enabled": None,
"tracing_debug": None,
"tracing_header_tags": None,
"tracing_service_mapping": None,
"tracing_sampling_rate": None,
"tracing_sampling_rules": None,
"span_sampling_rules": None,
"data_streams_enabled": None,
}
lib_config.update(cfg)

return {
"metadata": [],
"config": [
{
"action": "enable",
"service_target": {"service": None, "env": None},
"lib_config": lib_config,
}
],
}


@pytest.mark.parametrize(
"testcase",
[
Expand All @@ -35,6 +60,13 @@ def config():
"expected": {"_trace_sample_rate": 0.8},
"expected_source": {"_trace_sample_rate": "code"},
},
{
"env": {"DD_TRACE_SAMPLE_RATE": "0.9"},
"code": {"_trace_sample_rate": 0.8},
"rc": {"tracing_sampling_rate": 0.7},
"expected": {"_trace_sample_rate": 0.7},
"expected_source": {"_trace_sample_rate": "remote_config"},
},
{
"env": {"DD_LOGS_INJECTION": "true"},
"expected": {"logs_injection": True},
Expand All @@ -46,6 +78,13 @@ def config():
"expected": {"logs_injection": False},
"expected_source": {"logs_injection": "code"},
},
{
"env": {"DD_LOGS_INJECTION": "true"},
"code": {"logs_injection": False},
"rc": {"log_injection_enabled": True},
"expected": {"logs_injection": True},
"expected_source": {"logs_injection": "remote_config"},
},
{
"env": {"DD_TRACE_HEADER_TAGS": "X-Header-Tag-1:header_tag_1,X-Header-Tag-2:header_tag_2"},
"expected": {
Expand All @@ -59,6 +98,13 @@ def config():
"expected": {"trace_http_header_tags": {"header": "value"}},
"expected_source": {"trace_http_header_tags": "code"},
},
{
"env": {"DD_TRACE_HEADER_TAGS": "X-Header-Tag-1:header_tag_1,X-Header-Tag-2:header_tag_2"},
"code": {"trace_http_header_tags": {"header": "value"}},
"rc": {"tracing_header_tags": [{"header": "Content-Length", "tag_name": ""}]},
"expected": {"trace_http_header_tags": {"content-length": ""}},
"expected_source": {"trace_http_header_tags": "remote_config"},
},
],
)
def test_settings(testcase, config, monkeypatch):
Expand All @@ -69,6 +115,10 @@ def test_settings(testcase, config, monkeypatch):
for code_name, code_value in testcase.get("code", {}).items():
setattr(config, code_name, code_value)

rc_items = testcase.get("rc", {}).items()
if rc_items:
config._handle_remoteconfig(_base_rc_config(rc_items), None)

for expected_name, expected_value in testcase["expected"].items():
assert getattr(config, expected_name) == expected_value

Expand Down

0 comments on commit dbbbac5

Please sign in to comment.