Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Flushing transport on exit in 3.12 #2991

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 32 additions & 9 deletions sentry_sdk/transport.py
Expand Up @@ -192,7 +192,6 @@ def __init__(
Transport.__init__(self, options)
assert self.parsed_dsn is not None
self.options = options # type: Dict[str, Any]
self._worker = BackgroundWorker(queue_size=options["transport_queue_size"])
self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION)
self._disabled_until = {} # type: Dict[DataCategory, datetime]
self._retry = urllib3.util.Retry()
Expand All @@ -216,6 +215,9 @@ def __init__(
ca_certs=options["ca_certs"],
proxy_headers=options["proxy_headers"],
)
self._worker = BackgroundWorker(
sender=EnvelopeSender(self), queue_size=options["transport_queue_size"]
)

from sentry_sdk import Hub

Expand Down Expand Up @@ -396,6 +398,7 @@ def _send_envelope(
# type: (...) -> None

# remove all items from the envelope which are over quota
print("sending")
new_items = []
for item in envelope.items:
if self._check_disabled(item.data_category):
Expand Down Expand Up @@ -543,14 +546,14 @@ def capture_envelope(
# type: (...) -> None
hub = self.hub_cls.current

def send_envelope_wrapper():
# type: () -> None
with hub:
with capture_internal_exceptions():
self._send_envelope(envelope)
self._flush_client_reports()
# def send_envelope_wrapper():
# # type: () -> None
# with hub:
# with capture_internal_exceptions():
# self._send_envelope(envelope)
# self._flush_client_reports()

if not self._worker.submit(send_envelope_wrapper):
if not self._worker.submit(envelope):
self.on_dropped_event("full_queue")
for item in envelope.items:
self.record_lost_event("queue_overflow", item=item)
Expand All @@ -564,7 +567,7 @@ def flush(
logger.debug("Flushing HTTP transport")

if timeout > 0:
self._worker.submit(lambda: self._flush_client_reports(force=True))
# self._worker.submit(FlushClientReportsJob(self))
self._worker.flush(timeout, callback)

def kill(self):
Expand All @@ -573,6 +576,26 @@ def kill(self):
self._worker.kill()


class EnvelopeSender:
    """Picklable callable that delivers one envelope through a transport.

    Instances are handed to the BackgroundWorker as its ``sender`` so the
    worker can deliver each consumed envelope via
    ``transport._send_envelope`` and then flush accumulated client reports.
    """

    def __init__(self, transport):
        # type: (Transport) -> None
        self.transport = transport

    def __call__(self, envelope) -> None:
        # Send the envelope, then piggyback a client-report flush on the
        # same worker job (replaces the old send_envelope_wrapper closure).
        self.transport._send_envelope(envelope)
        self.transport._flush_client_reports()


class FlushClientReportsJob:
    """Picklable callable that forces a client-report flush on a transport."""

    def __init__(self, transport):
        # type: (Transport) -> None
        self.transport = transport

    def __call__(self) -> None:
        # force=True bypasses the periodic flush interval check.
        self.transport._flush_client_reports(force=True)


class _FunctionTransport(Transport):
"""
DEPRECATED: Users wishing to provide a custom transport should subclass
Expand Down
205 changes: 131 additions & 74 deletions sentry_sdk/worker.py
Expand Up @@ -2,8 +2,9 @@
import threading

from time import sleep, time
from multiprocessing import get_context
from sentry_sdk._queue import Queue, FullError
from sentry_sdk.utils import logger
from sentry_sdk.utils import capture_internal_exceptions, logger
from sentry_sdk.consts import DEFAULT_QUEUE_SIZE

from sentry_sdk._types import TYPE_CHECKING
Expand All @@ -13,129 +14,185 @@
from typing import Optional
from typing import Callable

from multiprocessing.connection import Connection
from multiprocessing.context import BaseContext


_TERMINATOR = object()


class _JobProducer:
def __init__(self, connection, context):
# type: (Connection, BaseContext) -> None
self._conn = connection
self._pending = 0
self._lock = context.Lock()

def _update_pending(self):
# type: () -> None
"""Must be called with self._lock held."""
while self._conn.poll():
self._conn.recv()
self._pending -= 1

def submit(self, obj):
# type: (object) -> None
with self._lock:
# breakpoint()
self._update_pending()
self._pending += 1
self._conn.send(obj)

def pending(self):
# type: () -> int
with self._lock:
self._update_pending()
return self._pending

def close(self):
# type: () -> None
self._conn.close()


class _JobConsumer:
def __init__(self, connection):
# type: (Connection) -> None
self._conn = connection

def consume_until_close(self):
while True:
try:
obj = self._conn.recv()
except EOFError:
break

yield obj

# Acknowledge that we received the object
self._conn.send(None)

sleep(0)


def _make_job_queue(ctx):
    # type: (BaseContext) -> tuple[_JobProducer, _JobConsumer]
    """Create a connected (producer, consumer) pair over a pipe from *ctx*."""
    producer_end, consumer_end = ctx.Pipe()
    return _JobProducer(producer_end, ctx), _JobConsumer(consumer_end)


def dummy():
    """No-op placeholder; intentionally does nothing and returns None."""
    return None


class BackgroundWorker:
def __init__(self, queue_size=DEFAULT_QUEUE_SIZE):
# type: (int) -> None
self._queue = Queue(queue_size) # type: Queue
self._lock = threading.Lock()
self._thread = None # type: Optional[threading.Thread]
self._thread_for_pid = None # type: Optional[int]
def __init__(self, sender=lambda _: None, queue_size=DEFAULT_QUEUE_SIZE):
# type: (Any, int) -> None
self._mp_ctx = get_context("fork")
self._queue_producer, self._queue_consumer = _make_job_queue(self._mp_ctx)
self._process_for_pid = None # type: Optional[int]
self.start(sender)

@property
def is_alive(self):
# type: () -> bool
if self._thread_for_pid != os.getpid():
return False
if not self._thread:
if self._process_for_pid != os.getpid():
return False
return self._thread.is_alive()

def _ensure_thread(self):
# type: () -> None
if not self.is_alive:
self.start()
return self._process.is_alive()

def _timed_queue_join(self, timeout):
# type: (float) -> bool
deadline = time() + timeout
queue = self._queue

queue.all_tasks_done.acquire()
sleep(timeout)
if self._queue_producer.pending() == 0:
return True

try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
return False
queue.all_tasks_done.wait(timeout=delay)
return False
# deadline = time() + timeout
# queue = self._queue

return True
finally:
queue.all_tasks_done.release()
# queue.all_tasks_done.acquire()

def start(self):
# type: () -> None
with self._lock:
if not self.is_alive:
self._thread = threading.Thread(
target=self._target, name="sentry-sdk.BackgroundWorker"
)
self._thread.daemon = True
try:
self._thread.start()
self._thread_for_pid = os.getpid()
except RuntimeError:
# At this point we can no longer start because the interpreter
# is already shutting down. Sadly at this point we can no longer
# send out events.
self._thread = None
# try:
# while queue.unfinished_tasks:
# delay = deadline - time()
# if delay <= 0:
# return False
# queue.all_tasks_done.wait(timeout=delay)

# return True
# finally:
# queue.all_tasks_done.release()

def start(self, sender=lambda _: None):
# type: (Any) -> None
if not self.is_alive:
print("starting process")
self._process = self._mp_ctx.Process(
target=self._target, name="sentry-sdk.BackgroundWorker", args=(sender,)
)
self._process.daemon = True
self._process.start()
self._process_for_pid = os.getpid()

def kill(self):
# type: () -> None
"""
Kill worker thread. Returns immediately. Not useful for
waiting on shutdown for events, use `flush` for that.

No other methods may be called after this one; the
behavior of doing so is undefined. We recommend destroying
all references to the worker after calling this method.
"""
logger.debug("background worker got kill request")
with self._lock:
if self._thread:
try:
self._queue.put_nowait(_TERMINATOR)
except FullError:
logger.debug("background worker queue full, kill failed")

self._thread = None
self._thread_for_pid = None
if self._process:
try:
self._queue_producer.close()
except FullError:
logger.debug("background worker queue full, kill failed")

def flush(self, timeout, callback=None):
# type: (float, Optional[Any]) -> None
logger.debug("background worker got flush request")
with self._lock:
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
if self.is_alive and timeout > 0.0:
self._wait_flush(timeout, callback)
logger.debug("background worker flushed")

def full(self):
# type: () -> bool
return self._queue.full()
return False
# TODO: Update
# return self._queue.full()

def _wait_flush(self, timeout, callback):
# type: (float, Optional[Any]) -> None
# type: (float, Optional[Any]) -> Nonex
initial_timeout = min(0.1, timeout)
if not self._timed_queue_join(initial_timeout):
pending = self._queue.qsize() + 1
pending = self._queue_producer.pending()
logger.debug("%d event(s) pending on flush", pending)
if callback is not None:
callback(pending, timeout)

if not self._timed_queue_join(timeout - initial_timeout):
pending = self._queue.qsize() + 1
pending = self._queue_producer.pending()
logger.error("flush timed out, dropped %s events", pending)

def submit(self, callback):
# type: (Callable[[], None]) -> bool
self._ensure_thread()
try:
self._queue.put_nowait(callback)
self._queue_producer.submit(callback)
return True
except FullError:
return False

def _target(self):
# type: () -> None
while True:
callback = self._queue.get()
def _target(self, sender):
# type: (Any) -> None
for envelope in self._queue_consumer.consume_until_close():
print("got envelope")
print(envelope)
try:
if callback is _TERMINATOR:
break
try:
callback()
except Exception:
logger.error("Failed processing job", exc_info=True)
finally:
self._queue.task_done()
sleep(0)
print(sender)
sender(envelope)
except Exception:
logger.error("Failed processing job", exc_info=True)