Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(celery): Send queue name to Sentry #2984

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions sentry_sdk/consts.py
Expand Up @@ -264,6 +264,12 @@ class SPANDATA:
Example: 418
"""

MESSAGING_DESTINATION_NAME = "messaging.destination.name"
"""
The destination name where the message is being consumed from,
e.g. the queue name or topic.
"""

SERVER_ADDRESS = "server.address"
"""
Name of the database host.
Expand Down Expand Up @@ -366,6 +372,7 @@ class OP:
LANGCHAIN_TOOL = "ai.tool.langchain"
LANGCHAIN_AGENT = "ai.agent.langchain"
LANGCHAIN_CHAT_COMPLETIONS_CREATE = "ai.chat_completions.create.langchain"
QUEUE_PROCESS = "queue.process"
QUEUE_SUBMIT_ARQ = "queue.submit.arq"
QUEUE_TASK_ARQ = "queue.task.arq"
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
Expand Down
26 changes: 22 additions & 4 deletions sentry_sdk/integrations/celery/__init__.py
Expand Up @@ -4,7 +4,7 @@
import sentry_sdk
from sentry_sdk import isolation_scope
from sentry_sdk.api import continue_trace
from sentry_sdk.consts import OP
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations.celery.beat import (
_patch_beat_apply_entry,
Expand Down Expand Up @@ -325,20 +325,38 @@ def _inner(*args, **kwargs):
return _inner # type: ignore


def _set_messaging_destination_name(task, span):
# type: (Any, Span) -> None
"""Set "messaging.destination.name" tag for span"""
with capture_internal_exceptions():
delivery_info = task.request.delivery_info
routing_key = delivery_info.get("routing_key")
if delivery_info.get("exchange") == "" and routing_key is not None:
# Empty exchange indicates the default exchange, meaning the tasks
# are sent to the queue with the same name as the routing key.
span.set_data(SPANDATA.MESSAGING_DESTINATION_NAME, routing_key)


def _wrap_task_call(task, f):
# type: (Any, F) -> F

# Need to wrap task call because the exception is caught before we get to
# see it. Also celery's reported stacktrace is untrustworthy.

# functools.wraps is important here because celery-once looks at this
# method's name.
# method's name. @ensure_integration_enabled internally calls functools.wraps,
# but if we ever remove the @ensure_integration_enabled decorator, we need
# to add @functools.wraps(f) here.
antonpirker marked this conversation as resolved.
Show resolved Hide resolved
# https://github.com/getsentry/sentry-python/issues/421
@wraps(f)
@ensure_integration_enabled(CeleryIntegration, f)
def _inner(*args, **kwargs):
# type: (*Any, **Any) -> Any
try:
return f(*args, **kwargs)
with sentry_sdk.start_span(
op=OP.QUEUE_PROCESS, description=task.name
) as span:
_set_messaging_destination_name(task, span)
return f(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
with capture_internal_exceptions():
Expand Down
55 changes: 54 additions & 1 deletion tests/integrations/celery/test_celery.py
Expand Up @@ -209,7 +209,17 @@ def dummy_task(x, y):
else:
assert execution_event["contexts"]["trace"]["status"] == "ok"

assert execution_event["spans"] == []
assert len(execution_event["spans"]) == 1
assert (
antonpirker marked this conversation as resolved.
Show resolved Hide resolved
execution_event["spans"][0].items()
>= {
"trace_id": str(transaction.trace_id),
"same_process_as_parent": True,
"op": "queue.process",
"description": "dummy_task",
"data": ApproxDict(),
}.items()
)
assert submission_event["spans"] == [
{
"data": ApproxDict(),
Expand Down Expand Up @@ -606,3 +616,46 @@ def example_task():
pytest.fail("Calling `apply_async` without arguments raised a TypeError")

assert result.get() == "success"


@pytest.mark.parametrize("routing_key", ("celery", "custom"))
@mock.patch("celery.app.task.Task.request")
def test_messaging_destination_name_default_exchange(
mock_request, routing_key, init_celery, capture_events
):
celery_app = init_celery(enable_tracing=True)
events = capture_events()
mock_request.delivery_info = {"routing_key": routing_key, "exchange": ""}

@celery_app.task()
def task(): ...

task.apply_async()

(event,) = events
(span,) = event["spans"]
assert span["data"]["messaging.destination.name"] == routing_key


@mock.patch("celery.app.task.Task.request")
def test_messaging_destination_name_nondefault_exchange(
mock_request, init_celery, capture_events
):
"""
Currently, we only capture the routing key as the messaging.destination.name when
we are using the default exchange (""). This is because the default exchange ensures
that the routing key is the queue name. Other exchanges may not guarantee this
behavior.
"""
celery_app = init_celery(enable_tracing=True)
events = capture_events()
mock_request.delivery_info = {"routing_key": "celery", "exchange": "custom"}

@celery_app.task()
def task(): ...

task.apply_async()

(event,) = events
(span,) = event["spans"]
assert "messaging.destination.name" not in span["data"]