Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(huey): add Huey Integration #1555

Merged
merged 22 commits into from
Jan 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
73 changes: 73 additions & 0 deletions .github/workflows/test-integration-huey.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
name: Test huey

on:
push:
branches:
- master
- release/**

pull_request:

# Cancel in progress workflows on pull_requests.
# https://docs.github.com/en/actions/using-jobs/using-concurrency#example-using-a-fallback-value
concurrency:
group: ${{ github.workflow }}-${{ github.head_ref || github.run_id }}
cancel-in-progress: true

permissions:
contents: read

env:
BUILD_CACHE_KEY: ${{ github.sha }}
CACHED_BUILD_PATHS: |
${{ github.workspace }}/dist-serverless

jobs:
test:
name: huey, python ${{ matrix.python-version }}, ${{ matrix.os }}
runs-on: ${{ matrix.os }}
timeout-minutes: 45

strategy:
fail-fast: false
matrix:
python-version: ["2.7","3.5","3.6","3.7","3.8","3.9","3.10","3.11"]
# python3.6 reached EOL and is no longer being supported on
# new versions of hosted runners on Github Actions
# ubuntu-20.04 is the last version that supported python3.6
# see https://github.com/actions/setup-python/issues/544#issuecomment-1332535877
os: [ubuntu-20.04]

steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
with:
python-version: ${{ matrix.python-version }}

- name: Setup Test Env
run: |
pip install codecov "tox>=3,<4"

- name: Test huey
timeout-minutes: 45
shell: bash
run: |
set -x # print commands that are executed
coverage erase

./scripts/runtox.sh "${{ matrix.python-version }}-huey" --cov=tests --cov=sentry_sdk --cov-report= --cov-branch
coverage combine .coverage*
coverage xml -i
codecov --file coverage.xml

check_required_tests:
name: All huey tests passed or skipped
needs: test
# Always run this, even if a dependent job failed
if: always()
runs-on: ubuntu-20.04
steps:
- name: Check for failures
if: contains(needs.test.result, 'failure')
run: |
echo "One of the dependent jobs have failed. You may need to re-run it." && exit 1
2 changes: 2 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,5 @@ disallow_untyped_defs = False
ignore_missing_imports = True
[mypy-flask.signals]
ignore_missing_imports = True
[mypy-huey.*]
ignore_missing_imports = True
2 changes: 2 additions & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class OP:
QUEUE_SUBMIT_CELERY = "queue.submit.celery"
QUEUE_TASK_CELERY = "queue.task.celery"
QUEUE_TASK_RQ = "queue.task.rq"
QUEUE_SUBMIT_HUEY = "queue.submit.huey"
QUEUE_TASK_HUEY = "queue.task.huey"
SUBPROCESS = "subprocess"
SUBPROCESS_WAIT = "subprocess.wait"
SUBPROCESS_COMMUNICATE = "subprocess.communicate"
Expand Down
154 changes: 154 additions & 0 deletions sentry_sdk/integrations/huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
from __future__ import absolute_import

import sys
from datetime import datetime

from sentry_sdk._compat import reraise
from sentry_sdk._types import MYPY
from sentry_sdk import Hub
from sentry_sdk.consts import OP, SENSITIVE_DATA_SUBSTITUTE
from sentry_sdk.hub import _should_send_default_pii
from sentry_sdk.integrations import DidNotEnable, Integration
from sentry_sdk.tracing import Transaction, TRANSACTION_SOURCE_TASK
from sentry_sdk.utils import capture_internal_exceptions, event_from_exception

if MYPY:
from typing import Any, Callable, Optional, Union, TypeVar

from sentry_sdk._types import EventProcessor, Event, Hint
from sentry_sdk.utils import ExcInfo

F = TypeVar("F", bound=Callable[..., Any])

try:
from huey.api import Huey, Result, ResultGroup, Task
from huey.exceptions import CancelExecution, RetryTask
except ImportError:
raise DidNotEnable("Huey is not installed")


HUEY_CONTROL_FLOW_EXCEPTIONS = (CancelExecution, RetryTask)


class HueyIntegration(Integration):
identifier = "huey"

@staticmethod
def setup_once():
# type: () -> None
patch_enqueue()
patch_execute()


def patch_enqueue():
# type: () -> None
old_enqueue = Huey.enqueue

def _sentry_enqueue(self, task):
# type: (Huey, Task) -> Optional[Union[Result, ResultGroup]]
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_enqueue(self, task)

with hub.start_span(op=OP.QUEUE_SUBMIT_HUEY, description=task.name):
return old_enqueue(self, task)

Huey.enqueue = _sentry_enqueue


def _make_event_processor(task):
# type: (Any) -> EventProcessor
def event_processor(event, hint):
# type: (Event, Hint) -> Optional[Event]

with capture_internal_exceptions():
tags = event.setdefault("tags", {})
tags["huey_task_id"] = task.id
tags["huey_task_retry"] = task.default_retries > task.retries
extra = event.setdefault("extra", {})
extra["huey-job"] = {
"task": task.name,
"args": task.args
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"kwargs": task.kwargs
if _should_send_default_pii()
else SENSITIVE_DATA_SUBSTITUTE,
"retry": (task.default_retries or 0) - task.retries,
}

return event

return event_processor


def _capture_exception(exc_info):
# type: (ExcInfo) -> None
hub = Hub.current

if exc_info[0] in HUEY_CONTROL_FLOW_EXCEPTIONS:
hub.scope.transaction.set_status("aborted")
return

hub.scope.transaction.set_status("internal_error")
event, hint = event_from_exception(
exc_info,
client_options=hub.client.options if hub.client else None,
mechanism={"type": HueyIntegration.identifier, "handled": False},
)
hub.capture_event(event, hint=hint)


def _wrap_task_execute(func):
# type: (F) -> F
def _sentry_execute(*args, **kwargs):
# type: (*Any, **Any) -> Any
hub = Hub.current
if hub.get_integration(HueyIntegration) is None:
return func(*args, **kwargs)

try:
result = func(*args, **kwargs)
except Exception:
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)

return result

return _sentry_execute # type: ignore


def patch_execute():
# type: () -> None
old_execute = Huey._execute

def _sentry_execute(self, task, timestamp=None):
# type: (Huey, Task, Optional[datetime]) -> Any
hub = Hub.current

if hub.get_integration(HueyIntegration) is None:
return old_execute(self, task, timestamp)

with hub.push_scope() as scope:
with capture_internal_exceptions():
scope._name = "huey"
scope.clear_breadcrumbs()
scope.add_event_processor(_make_event_processor(task))

transaction = Transaction(
name=task.name,
status="ok",
op=OP.QUEUE_TASK_HUEY,
source=TRANSACTION_SOURCE_TASK,
)

if not getattr(task, "_sentry_is_patched", False):
task.execute = _wrap_task_execute(task.execute)
task._sentry_is_patched = True

with hub.start_transaction(transaction):
return old_execute(self, task, timestamp)

Huey._execute = _sentry_execute
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def get_file_text(file_name):
"django": ["django>=1.8"],
"sanic": ["sanic>=0.8"],
"celery": ["celery>=3"],
"huey": ["huey>=2"],
"beam": ["apache-beam>=2.12"],
"rq": ["rq>=0.6"],
"aiohttp": ["aiohttp>=3.5"],
Expand Down
3 changes: 3 additions & 0 deletions tests/integrations/huey/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import pytest

pytest.importorskip("huey")
140 changes: 140 additions & 0 deletions tests/integrations/huey/test_huey.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import pytest
from decimal import DivisionByZero

from sentry_sdk import start_transaction
from sentry_sdk.integrations.huey import HueyIntegration

from huey.api import MemoryHuey, Result
from huey.exceptions import RetryTask


@pytest.fixture
def init_huey(sentry_init):
def inner():
sentry_init(
integrations=[HueyIntegration()],
traces_sample_rate=1.0,
send_default_pii=True,
debug=True,
)

return MemoryHuey(name="sentry_sdk")

return inner


@pytest.fixture(autouse=True)
def flush_huey_tasks(init_huey):
huey = init_huey()
huey.flush()


def execute_huey_task(huey, func, *args, **kwargs):
exceptions = kwargs.pop("exceptions", None)
result = func(*args, **kwargs)
task = huey.dequeue()
if exceptions is not None:
try:
huey.execute(task)
except exceptions:
pass
else:
huey.execute(task)
return result


def test_task_result(init_huey):
huey = init_huey()

@huey.task()
def increase(num):
return num + 1

result = increase(3)

assert isinstance(result, Result)
assert len(huey) == 1
task = huey.dequeue()
assert huey.execute(task) == 4
assert result.get() == 4


@pytest.mark.parametrize("task_fails", [True, False], ids=["error", "success"])
def test_task_transaction(capture_events, init_huey, task_fails):
huey = init_huey()

@huey.task()
def division(a, b):
return a / b

events = capture_events()
execute_huey_task(
huey, division, 1, int(not task_fails), exceptions=(DivisionByZero,)
)

if task_fails:
error_event = events.pop(0)
assert error_event["exception"]["values"][0]["type"] == "ZeroDivisionError"
assert error_event["exception"]["values"][0]["mechanism"]["type"] == "huey"

(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "division"
assert event["transaction_info"] == {"source": "task"}

if task_fails:
assert event["contexts"]["trace"]["status"] == "internal_error"
else:
assert event["contexts"]["trace"]["status"] == "ok"

assert "huey_task_id" in event["tags"]
assert "huey_task_retry" in event["tags"]


def test_task_retry(capture_events, init_huey):
huey = init_huey()
context = {"retry": True}

@huey.task()
def retry_task(context):
if context["retry"]:
context["retry"] = False
raise RetryTask()

events = capture_events()
result = execute_huey_task(huey, retry_task, context)
(event,) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 1

task = huey.dequeue()
huey.execute(task)
(event, _) = events

assert event["transaction"] == "retry_task"
assert event["tags"]["huey_task_id"] == result.task.id
assert len(huey) == 0


def test_huey_enqueue(init_huey, capture_events):
huey = init_huey()

@huey.task(name="different_task_name")
def dummy_task():
pass

events = capture_events()

with start_transaction() as transaction:
dummy_task()

(event,) = events

assert event["contexts"]["trace"]["trace_id"] == transaction.trace_id
assert event["contexts"]["trace"]["span_id"] == transaction.span_id

assert len(event["spans"])
assert event["spans"][0]["op"] == "queue.submit.huey"
assert event["spans"][0]["description"] == "different_task_name"