Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions sentry_sdk/integrations/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import sentry_sdk
from sentry_sdk.integrations import Integration
from sentry_sdk.scope import use_isolation_scope, use_scope
from sentry_sdk.utils import event_from_exception

from typing import TYPE_CHECKING

Expand All @@ -28,7 +29,7 @@ def setup_once():


def _wrap_submit_call(func):
# type: (Any) -> Any
# type: (Callable[..., Future[Any]]) -> Callable[..., Future[Any]]
"""
Wrap submit call to propagate scopes on task submission.
"""
Expand All @@ -49,6 +50,30 @@ def wrapped_fn(*args, **kwargs):
with use_scope(current_scope):
return fn(*args, **kwargs)

return func(self, wrapped_fn, *args, **kwargs)
future = func(self, wrapped_fn, *args, **kwargs)

def report_exceptions(future):
# type: (Future[Any]) -> None
exception = future.exception()
integration = sentry_sdk.get_client().get_integration(ConcurrentIntegration)

if (
exception is None
or integration is None
or not integration.record_exceptions_on_futures
):
return

event, hint = event_from_exception(
exception,
client_options=sentry_sdk.get_client().options,
mechanism={"type": "concurrent", "handled": False},
)
sentry_sdk.capture_event(event, hint=hint)

if integration.record_exceptions_on_futures:
future.add_done_callback(report_exceptions)

return future

return sentry_submit
152 changes: 152 additions & 0 deletions tests/integrations/concurrent/test_concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,149 @@
from concurrent import futures
from concurrent.futures import Future, ThreadPoolExecutor

import pytest

import sentry_sdk

from sentry_sdk.integrations.concurrent import ConcurrentIntegration
from sentry_sdk.integrations.dedupe import DedupeIntegration
from sentry_sdk.integrations.excepthook import ExcepthookIntegration
from sentry_sdk.integrations.threading import ThreadingIntegration

original_submit = ThreadPoolExecutor.submit
original_set_exception = Future.set_exception


@pytest.mark.parametrize("record_exceptions_on_futures", (True, False))
def test_handles_exceptions(sentry_init, capture_events, record_exceptions_on_futures):
sentry_init(
default_integrations=False,
integrations=[
ConcurrentIntegration(
record_exceptions_on_futures=record_exceptions_on_futures
)
],
)
events = capture_events()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
with pytest.raises(ZeroDivisionError):
future.result()

if record_exceptions_on_futures:
(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]
else:
assert not events


# ThreadPoolExecutor uses threading, but catches exceptions before the Sentry threading integration
@pytest.mark.parametrize(
"potentially_conflicting_integrations",
[
[ThreadingIntegration(propagate_scope=True)],
[ThreadingIntegration(propagate_scope=False)],
[],
],
)
def test_threading_enabled_no_duplicate(
sentry_init, capture_events, potentially_conflicting_integrations
):
sentry_init(
default_integrations=False,
integrations=[
ConcurrentIntegration(),
]
+ potentially_conflicting_integrations,
)
events = capture_events()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
with pytest.raises(ZeroDivisionError):
future.result()

(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]


def test_concurrent_deduplicates(
sentry_init, capture_events, capture_record_lost_event_calls
):
sentry_init(
default_integrations=False,
integrations=[
ExcepthookIntegration(),
DedupeIntegration(),
ConcurrentIntegration(),
],
)
events = capture_events()
record_lost_event_calls = capture_record_lost_event_calls()

def crash():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(crash)
try:
future.result()
except Exception:
sentry_sdk.capture_exception()

(event,) = events
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"

(lost_event_call,) = record_lost_event_calls
assert lost_event_call == ("event_processor", "error", None, 1)


def test_propagates_tag(sentry_init, capture_events):
sentry_init(
default_integrations=False,
integrations=[ConcurrentIntegration()],
)
events = capture_events()

def stage1():
sentry_sdk.get_isolation_scope().set_tag("stage1", "true")
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(stage2)
with pytest.raises(ZeroDivisionError):
future.result()

def stage2():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(stage1)
future.result()

(event,) = events

(exception,) = event["exception"]["values"]

assert exception["type"] == "ZeroDivisionError"
assert exception["mechanism"]["type"] == "concurrent"
assert not exception["mechanism"]["handled"]

assert event["tags"]["stage1"] == "true"


def test_propagates_threadpool_scope(sentry_init, capture_events):
sentry_init(
default_integrations=False,
Expand Down Expand Up @@ -65,6 +200,23 @@ def double(number):
assert event["spans"][3]["trace_id"] == event["spans"][0]["trace_id"]


def test_double_patching(sentry_init, capture_events):
sentry_init(integrations=[ConcurrentIntegration()])
events = capture_events()

def run():
1 / 0

with futures.ThreadPoolExecutor(max_workers=1) as executor:
for _ in range(10):
executor.submit(run)

assert len(events) == 10
for event in events:
(exception,) = event["exception"]["values"]
assert exception["type"] == "ZeroDivisionError"


def test_scope_data_not_leaked_in_executor(sentry_init):
sentry_init(
integrations=[ConcurrentIntegration()],
Expand Down