Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(huey): add Huey Integration #1555

Merged
merged 22 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,5 @@ disallow_untyped_defs = False
ignore_missing_imports = True
[mypy-flask.signals]
ignore_missing_imports = True
[mypy-huey.*]
ignore_missing_imports = True
148 changes: 148 additions & 0 deletions sentry_sdk/integrations/huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
from __future__ import absolute_import

import sys
from datetime import datetime

from sentry_sdk._compat import reraise
from sentry_sdk._types import MYPY
from sentry_sdk import Hub
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception

if MYPY:
from typing import Any, Callable, Optional, Union, TypeVar

from sentry_sdk._types import EventProcessor, Event, Hint
from sentry_sdk.utils import ExcInfo

F = TypeVar("F", bound=Callable[..., Any])

try:
from huey.api import Huey, Result, ResultGroup, Task
from huey.exceptions import CancelExecution, RetryTask
except ImportError:
raise DidNotEnable("Huey is not installed")


HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask)


class HueyIntegration(Integration):
identifier = "huey"

@staticmethod
def setup_once():
# type: () -> None
patch_enqueue()
patch_execute()


def patch_enqueue():
# type: () -> None
old_enqueue = Huey.enqueue

def _sentry_enqueue(self, task):
# type: (Huey, Task) -> Optional[Union[Result, ResultGroup]]
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_enqueue(self, task)

with hub.start_span(op="huey.enqueue", description=task.name):
Zhenay marked this conversation as resolved.
Show resolved Hide resolved
return old_enqueue(self, task)

Huey.enqueue = _sentry_enqueue


def _make_event_processor(task):
# type: (Any) -> EventProcessor
def event_processor(event, hint):
# type: (Event, Hint) -> Optional[Event]

with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["huey_task_id"] = task.id
tags["huey_task_retry"] = task.default_retries > task.retries
extra = event.setdefault("extra", {})
extra["huey-job"] = {
"task": task.name,
"args": task.args,
"kwargs": task.kwargs,
"retry": (task.default_retries or 0) - task.retries,
}

return event

return event_processor


def _capture_exception(exc_info):
# type: (ExcInfo) -> None
hub = Hub.current

if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
hub.scope.transaction.set_status("aborted")
return

hub.scope.transaction.set_status("internal_error")
event, hint = event_from_exception(
exc_info,
client_options=hub.client.options if hub.client else None,
mechanism={"type": "huey", "handled": False},
Zhenay marked this conversation as resolved.
Show resolved Hide resolved
)
hub.capture_event(event, hint=hint)


def _wrap_task_execute(func):
# type: (F) -> F
def _sentry_execute(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(HueyIntegration) is None:
return func(*args, **kwargs)

try:
result = func(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

return _sentry_execute # type: ignore


def patch_execute():
# type: () -> None
old_execute = Huey._execute

def _sentry_execute(self, task, timestamp=None):
# type: (Huey, Task, Optional[datetime]) -> Any
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_execute(self, task, timestamp)

with hub.push_scope() as scope:
with capture_internal_exceptions():
scope._name = "huey"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task))

transaction = Transaction(
name=task.name,
status="ok",
op="huey.task",
Zhenay marked this conversation as resolved.
Show resolved Hide resolved
source=TRANSACTION_SOURCE_TASK,
)

if not getattr(task, "_sentry_is_patched", False):
task.execute = _wrap_task_execute(task.execute)
task._sentry_is_patched = True

with hub.start_transaction(transaction):
return old_execute(self, task, timestamp)

Huey._execute = _sentry_execute
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_file_text(file_name):
"django": ["django>=1.8"],
"sanic": ["sanic>=0.8"],
"celery": ["celery>=3"],
"huey": ["huey>=2"],
"beam": ["apache-beam>=2.12"],
"rq": ["rq>=0.6"],
"aiohttp": ["aiohttp>=3.5"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/huey/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("huey")
140 changes: 140 additions & 0 deletions tests/integrations/huey/test_huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import pytest
from decimal import DivisionByZero

from sentry_sdk import start_transaction
from sentry_sdk.integrations.huey import HueyIntegration

from huey.api import RedisExpireHuey, Result
from huey.exceptions import RetryTask


@pytest.fixture
def init_huey(sentry_init):
def inner():
sentry_init(
integrations=[HueyIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
debug=True,
)

return RedisExpireHuey(name="sentry_sdk", url="redis://127.0.0.1:6379")

return inner


@pytest.fixture(autouse=True)
def flush_huey_tasks(init_huey):
huey = init_huey()
huey.flush()


def execute_huey_task(huey, func, *args, **kwargs):
exceptions = kwargs.pop("exceptions", None)
result = func(*args, **kwargs)
task = huey.dequeue()
if exceptions is not None:
try:
huey.execute(task)
except exceptions:
pass
else:
huey.execute(task)
return result


def test_task_result(init_huey):
huey = init_huey()

@huey.task()
def increase(num):
return num + 1

result = increase(3)

assert isinstance(result, Result)
assert len(huey) == 1
task = huey.dequeue()
assert huey.execute(task) == 4
assert result.get() == 4


@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
def test_task_transaction(capture_events, init_huey, task_fails):
huey = init_huey()

@huey.task()
def division(a, b):
return a / b

events = capture_events()
execute_huey_task(
huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,)
)

if task_fails:
error_event = events.pop(0)
assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"
assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey"

(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "division"
assert event["transaction_info"] == {"source": "task"}

if task_fails:
assert event["contexts"]["trace"]["status"] == "internal_error"
else:
assert event["contexts"]["trace"]["status"] == "ok"

assert "huey_task_id" in event["tags"]
assert "huey_task_retry" in event["tags"]


def test_task_retry(capture_events, init_huey):
huey = init_huey()
context = {"retry": True}

@huey.task()
def retry_task(context):
if context["retry"]:
context["retry"] = False
raise RetryTask()

events = capture_events()
result = execute_huey_task(huey, retry_task, context)
(event,) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 1

task = huey.dequeue()
huey.execute(task)
(event, _) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 0


def test_huey_enqueue(init_huey, capture_events):
huey = init_huey()

@huey.task(name="different_task_name")
def dummy_task():
pass

events = capture_events()

with start_transaction() as transaction:
dummy_task()

(event,) = events

assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert event["contexts"]["trace"]["span_id"] == transaction.span_id

assert len(event["spans"])
assert event["spans"][0]["op"] == "huey.enqueue"
Zhenay marked this conversation as resolved.
Show resolved Hide resolved
assert event["spans"][0]["description"] == "different_task_name"
6 changes: 6 additions & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ envlist =
{py3.6,py3.7,py3.8}-celery-{5.0}
{py3.7,py3.8,py3.9,py3.10}-celery-{5.1,5.2}

{py2.7,py3.5,py3.6,py3.7,py3.8,py3.9,py3.10}-huey-2

py3.7-beam-{2.12,2.13,2.32,2.33}

# The aws_lambda tests deploy to the real AWS and have their own matrix of Python versions.
Expand Down Expand Up @@ -208,6 +210,9 @@ deps =
{py3.7}-celery: importlib-metadata<5.0
{py2.7,py3.6,py3.7,py3.8,py3.9,py3.10}-celery: newrelic

huey: redis
huey-2: huey>=2.0

requests: requests>=2.0

aws_lambda: boto3
Expand Down Expand Up @@ -302,6 +307,7 @@ setenv =
bottle: TESTPATH=tests/integrations/bottle
falcon: TESTPATH=tests/integrations/falcon
celery: TESTPATH=tests/integrations/celery
huey: TESTPATH=tests/integrations/huey
requests: TESTPATH=tests/integrations/requests
aws_lambda: TESTPATH=tests/integrations/aws_lambda
gcp: TESTPATH=tests/integrations/gcp
Expand Down