Skip to content

Commit

Permalink
Add Huey Integration (#1555)
Browse files Browse the repository at this point in the history
* Minimal Huey integration
  • Loading branch information
Zhenay committed Jan 25, 2023
1 parent 88880be commit 762557a
Show file tree
Hide file tree
Showing 8 changed files with 383 additions and 1 deletion.
73 changes: 73 additions & 0 deletions .github/workflows/test-integration-huey.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Test huey

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless
jobs:
test:
name: huey, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 45

strategy:
fail-fast: false
matrix:
python-version: ["2.7","3.5","3.6","3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install codecov "tox>=3,<4"
- name: Test huey
timeout-minutes: 45
shell: bash
run: |
set -x # print commands that are executed
coverage erase
./scripts/runtox.sh "${{ matrix.python-version }}-huey" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
coverage combine .coverage*
coverage xml -i
codecov --file coverage.xml
check_required_tests:
name: All huey tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,5 @@ disallow_untyped_defs = False
ignore_missing_imports = True
[mypy-flask.signals]
ignore_missing_imports = True
[mypy-huey.*]
ignore_missing_imports = True
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class OP:
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
QUEUE_TASK_CELERY = "queue.task.celery"
QUEUE_TASK_RQ = "queue.task.rq"
QUEUE_SUBMIT_HUEY = "queue.submit.huey"
QUEUE_TASK_HUEY = "queue.task.huey"
SUBPROCESS = "subprocess"
SUBPROCESS_WAIT = "subprocess.wait"
SUBPROCESS_COMMUNICATE = "subprocess.communicate"
Expand Down
154 changes: 154 additions & 0 deletions sentry_sdk/integrations/huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import absolute_import

import sys
from datetime import datetime

from sentry_sdk._compat import reraise
from sentry_sdk._types import MYPY
from sentry_sdk import Hub
from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception

if MYPY:
from typing import Any, Callable, Optional, Union, TypeVar

from sentry_sdk._types import EventProcessor, Event, Hint
from sentry_sdk.utils import ExcInfo

F = TypeVar("F", bound=Callable[..., Any])

try:
from huey.api import Huey, Result, ResultGroup, Task
from huey.exceptions import CancelExecution, RetryTask
except ImportError:
raise DidNotEnable("Huey is not installed")


HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask)


class HueyIntegration(Integration):
identifier = "huey"

@staticmethod
def setup_once():
# type: () -> None
patch_enqueue()
patch_execute()


def patch_enqueue():
# type: () -> None
old_enqueue = Huey.enqueue

def _sentry_enqueue(self, task):
# type: (Huey, Task) -> Optional[Union[Result, ResultGroup]]
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_enqueue(self, task)

with hub.start_span(op=OP.QUEUE_SUBMIT_HUEY, description=task.name):
return old_enqueue(self, task)

Huey.enqueue = _sentry_enqueue


def _make_event_processor(task):
# type: (Any) -> EventProcessor
def event_processor(event, hint):
# type: (Event, Hint) -> Optional[Event]

with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["huey_task_id"] = task.id
tags["huey_task_retry"] = task.default_retries > task.retries
extra = event.setdefault("extra", {})
extra["huey-job"] = {
"task": task.name,
"args": task.args
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"kwargs": task.kwargs
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"retry": (task.default_retries or 0) - task.retries,
}

return event

return event_processor


def _capture_exception(exc_info):
# type: (ExcInfo) -> None
hub = Hub.current

if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
hub.scope.transaction.set_status("aborted")
return

hub.scope.transaction.set_status("internal_error")
event, hint = event_from_exception(
exc_info,
client_options=hub.client.options if hub.client else None,
mechanism={"type": HueyIntegration.identifier, "handled": False},
)
hub.capture_event(event, hint=hint)


def _wrap_task_execute(func):
# type: (F) -> F
def _sentry_execute(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(HueyIntegration) is None:
return func(*args, **kwargs)

try:
result = func(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

return _sentry_execute # type: ignore


def patch_execute():
# type: () -> None
old_execute = Huey._execute

def _sentry_execute(self, task, timestamp=None):
# type: (Huey, Task, Optional[datetime]) -> Any
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_execute(self, task, timestamp)

with hub.push_scope() as scope:
with capture_internal_exceptions():
scope._name = "huey"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task))

transaction = Transaction(
name=task.name,
status="ok",
op=OP.QUEUE_TASK_HUEY,
source=TRANSACTION_SOURCE_TASK,
)

if not getattr(task, "_sentry_is_patched", False):
task.execute = _wrap_task_execute(task.execute)
task._sentry_is_patched = True

with hub.start_transaction(transaction):
return old_execute(self, task, timestamp)

Huey._execute = _sentry_execute
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_file_text(file_name):
"django": ["django>=1.8"],
"sanic": ["sanic>=0.8"],
"celery": ["celery>=3"],
"huey": ["huey>=2"],
"beam": ["apache-beam>=2.12"],
"rq": ["rq>=0.6"],
"aiohttp": ["aiohttp>=3.5"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/huey/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("huey")
140 changes: 140 additions & 0 deletions tests/integrations/huey/test_huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import pytest
from decimal import DivisionByZero

from sentry_sdk import start_transaction
from sentry_sdk.integrations.huey import HueyIntegration

from huey.api import MemoryHuey, Result
from huey.exceptions import RetryTask


@pytest.fixture
def init_huey(sentry_init):
def inner():
sentry_init(
integrations=[HueyIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
debug=True,
)

return MemoryHuey(name="sentry_sdk")

return inner


@pytest.fixture(autouse=True)
def flush_huey_tasks(init_huey):
huey = init_huey()
huey.flush()


def execute_huey_task(huey, func, *args, **kwargs):
exceptions = kwargs.pop("exceptions", None)
result = func(*args, **kwargs)
task = huey.dequeue()
if exceptions is not None:
try:
huey.execute(task)
except exceptions:
pass
else:
huey.execute(task)
return result


def test_task_result(init_huey):
huey = init_huey()

@huey.task()
def increase(num):
return num + 1

result = increase(3)

assert isinstance(result, Result)
assert len(huey) == 1
task = huey.dequeue()
assert huey.execute(task) == 4
assert result.get() == 4


@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
def test_task_transaction(capture_events, init_huey, task_fails):
huey = init_huey()

@huey.task()
def division(a, b):
return a / b

events = capture_events()
execute_huey_task(
huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,)
)

if task_fails:
error_event = events.pop(0)
assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"
assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey"

(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "division"
assert event["transaction_info"] == {"source": "task"}

if task_fails:
assert event["contexts"]["trace"]["status"] == "internal_error"
else:
assert event["contexts"]["trace"]["status"] == "ok"

assert "huey_task_id" in event["tags"]
assert "huey_task_retry" in event["tags"]


def test_task_retry(capture_events, init_huey):
huey = init_huey()
context = {"retry": True}

@huey.task()
def retry_task(context):
if context["retry"]:
context["retry"] = False
raise RetryTask()

events = capture_events()
result = execute_huey_task(huey, retry_task, context)
(event,) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 1

task = huey.dequeue()
huey.execute(task)
(event, _) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 0


def test_huey_enqueue(init_huey, capture_events):
huey = init_huey()

@huey.task(name="different_task_name")
def dummy_task():
pass

events = capture_events()

with start_transaction() as transaction:
dummy_task()

(event,) = events

assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert event["contexts"]["trace"]["span_id"] == transaction.span_id

assert len(event["spans"])
assert event["spans"][0]["op"] == "queue.submit.huey"
assert event["spans"][0]["description"] == "different_task_name"

0 comments on commit 762557a

Please sign in to comment.