Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,10 @@ ignore_missing_imports = true
module = "agents.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "dramatiq.*"
ignore_missing_imports = true

#
# Tool: Flake8
#
Expand Down
3 changes: 3 additions & 0 deletions scripts/populate_tox/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@
},
"dramatiq": {
"package": "dramatiq",
"deps": {
"py3.6": ["aiocontextvars"],
},
},
"falcon": {
"package": "falcon",
Expand Down
1 change: 1 addition & 0 deletions sentry_sdk/consts.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ class OP:
QUEUE_TASK_HUEY = "queue.task.huey"
QUEUE_SUBMIT_RAY = "queue.submit.ray"
QUEUE_TASK_RAY = "queue.task.ray"
QUEUE_TASK_DRAMATIQ = "queue.task.dramatiq"
SUBPROCESS = "subprocess"
SUBPROCESS_WAIT = "subprocess.wait"
SUBPROCESS_COMMUNICATE = "subprocess.communicate"
Expand Down
118 changes: 84 additions & 34 deletions sentry_sdk/integrations/dramatiq.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,34 @@
import json

import sentry_sdk
from sentry_sdk.integrations import Integration
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.api import continue_trace, get_baggage, get_traceparent
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds
from sentry_sdk.tracing import (
BAGGAGE_HEADER_NAME,
SENTRY_TRACE_HEADER_NAME,
TransactionSource,
)
from sentry_sdk.utils import (
AnnotatedValue,
capture_internal_exceptions,
event_from_exception,
ContextVar,
HAS_REAL_CONTEXTVARS,
CONTEXTVARS_ERROR_MESSAGE,
)
from typing import TypeVar

R = TypeVar("R")

from dramatiq.broker import Broker # type: ignore
from dramatiq.message import Message # type: ignore
from dramatiq.middleware import Middleware, default_middleware # type: ignore
from dramatiq.errors import Retry # type: ignore
try:
from dramatiq.broker import Broker
from dramatiq.middleware import Middleware, default_middleware
from dramatiq.errors import Retry
from dramatiq.message import Message
except ImportError:
raise DidNotEnable("Dramatiq is not installed")

from typing import TYPE_CHECKING

Expand All @@ -38,6 +54,12 @@ class DramatiqIntegration(Integration):
@staticmethod
def setup_once():
# type: () -> None
if not HAS_REAL_CONTEXTVARS:
raise DidNotEnable(
"The dramatiq integration for Sentry requires Python 3.7+ "
" or aiocontextvars package." + CONTEXTVARS_ERROR_MESSAGE
)

_patch_dramatiq_broker()


Expand Down Expand Up @@ -85,50 +107,78 @@ class SentryMiddleware(Middleware): # type: ignore[misc]
DramatiqIntegration.
"""

_transaction = ContextVar("_transaction")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I already may have asked this in an earlier iteration, but do we need the ContextVar here? The scopes themselves are stored in ContextVars and ideally they should govern the reference to the current transaction (this should already be working as long as the transaction/span is correctly set on the scope, which it is as long as you use start_span/transaction context manager or just __enter__ directly)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes sense.
Looks like I really needn't store transaction separately and I can get it directly from scope:

transaction = sentry_sdk.get_current_scope().transaction

Right? =)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Exactly, that'd be the idea.


def before_enqueue(self, broker, message, delay):
# type: (Broker, Message[R], int) -> None
message.options["sentry_headers"] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I don't think we're really in risk of collisions here, but let's maybe call this _sentry_headers just to denote it's something internal

Suggested change
message.options["sentry_headers"] = {
message.options["_sentry_headers"] = {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

BAGGAGE_HEADER_NAME: get_baggage(),
SENTRY_TRACE_HEADER_NAME: get_traceparent(),
}

def before_process_message(self, broker, message):
# type: (Broker, Message) -> None
# type: (Broker, Message[R]) -> None
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
if integration is None:
return

message._scope_manager = sentry_sdk.new_scope()
message._scope_manager.__enter__()
Comment on lines -94 to -95
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need some sort of scope management in order to make sure the data we collect about tasks is isolated.

The general rule of thumb is: if you start a transaction, you should start it in a new isolation scope. See for example huey.

So we should start an isolation scope right after the initial if integration is None: return check with

scope = sentry_sdk.isolation_scope()
message._scope_manager = scope
scope.__enter__()

Everything that we do on the scope later in the function can stay, but it should be done on the isolation scope, not current scope as before.

And finally, we need to __exit__ the saved scope in after_process_message with message._scope_manager.__exit__(None, None, None).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.
but please recheck it)


scope = sentry_sdk.get_current_scope()
scope.set_transaction_name(message.actor_name)
scope.set_extra("dramatiq_message_id", message.message_id)
scope.set_tag("dramatiq_message_id", message.message_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The SDK shouldn't set tags on its own (we still do this in a couple places, but are avoiding it for new code)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little bit confused here. What do you mean exactly by "SDK shouldn't set tags"?
Am I do it wrong now or you wanna say setting tags is only allowed for client's code and no tags allowed in SDK?
I definitely want to set dramatiq_message_id for each task like it happens in celery integration:
image

message_id is extremely helpful in investigating task failures

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setting tags is only allowed for client's code and no tags allowed in SDK?

This. Going forward, we're not setting tags in the SDK. You can still set dramatiq_message_id with e.g. span.set_data() and it should be searchable.

scope.clear_breadcrumbs()
scope.add_event_processor(_make_message_event_processor(message, integration))

transaction = continue_trace(
message.options.get("sentry_headers") or {},
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
# origin=DramatiqIntegration.origin,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The origin looks good, any reason it's commented out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reasons. Will fix it. Thanks

)
transaction.set_status(SPANSTATUS.OK)
sentry_sdk.start_transaction(
transaction,
name=message.actor_name,
op=OP.QUEUE_TASK_DRAMATIQ,
source=TransactionSource.TASK,
)
transaction.__enter__()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Transaction Initialization Redundancy

The transaction initialization in before_process_message uses an incorrect pattern. It creates a transaction with continue_trace(), then passes this existing transaction object to sentry_sdk.start_transaction(), which is designed to create new transactions. This leads to redundant initialization, a manual transaction.__enter__() call, and causes the origin parameter set by continue_trace() to be lost.

Fix in Cursor Fix in Web

self._transaction.set(transaction)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Dramatiq Integration Fails Transaction Initialization

The Dramatiq integration incorrectly initializes transactions. It calls continue_trace() to create a transaction, then redundantly calls sentry_sdk.start_transaction() with that same transaction object, and finally manually calls transaction.__enter__(). This mixed lifecycle management can lead to improper transaction initialization, inconsistent state, or broken distributed tracing.

Fix in Cursor Fix in Web


def after_process_message(self, broker, message, *, result=None, exception=None):
# type: (Broker, Message, Any, Optional[Any], Optional[Exception]) -> None
integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
if integration is None:
return
Comment on lines 104 to 106
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please keep this -- it's a fallback in case the integration/the SDK/the client has been deactivated in the meantime. In that case we shouldn't do anything.

We should have this also in the new before_enqueue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, but I believe it's unnecessary due to this: https://github.com/getsentry/sentry-python/pull/4571/files#diff-2722e3fe31f13ffc24072313765f1fc89f0f0721154b7ca072bb46b1f9573f5bR84
Middleware won't be in use if integration is not enabled

# type: (Broker, Message[R], Optional[Any], Optional[Exception]) -> None

actor = broker.get_actor(message.actor_name)
throws = message.options.get("throws") or actor.options.get("throws")

try:
if (
exception is not None
and not (throws and isinstance(exception, throws))
and not isinstance(exception, Retry)
):
event, hint = event_from_exception(
exception,
client_options=sentry_sdk.get_client().options,
mechanism={
"type": DramatiqIntegration.identifier,
"handled": False,
},
)
sentry_sdk.capture_event(event, hint=hint)
finally:
message._scope_manager.__exit__(None, None, None)
transaction = self._transaction.get(None)
if not transaction:
return None

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: SentryMiddleware Scope Leak

The SentryMiddleware creates a scope leak. In after_process_message, if no transaction is found, the isolation scope entered in before_process_message is not properly exited, leading to a resource leak.

Fix in Cursor Fix in Web

is_event_capture_required = (
exception is not None
and not (throws and isinstance(exception, throws))
and not isinstance(exception, Retry)
)
if not is_event_capture_required:
# normal transaction finish
transaction.__exit__(None, None, None)
return

event, hint = event_from_exception(
exception, # type: ignore[arg-type]
client_options=sentry_sdk.get_client().options,
mechanism={
"type": DramatiqIntegration.identifier,
"handled": False,
},
)
sentry_sdk.capture_event(event, hint=hint)
# transaction error
transaction.__exit__(type(exception), exception, None)


def _make_message_event_processor(message, integration):
# type: (Message, DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]
# type: (Message[R], DramatiqIntegration) -> Callable[[Event, Hint], Optional[Event]]

def inner(event, hint):
# type: (Event, Hint) -> Optional[Event]
Expand All @@ -142,7 +192,7 @@ def inner(event, hint):

class DramatiqMessageExtractor:
def __init__(self, message):
# type: (Message) -> None
# type: (Message[R]) -> None
self.message_data = dict(message.asdict())

def content_length(self):
Expand Down
71 changes: 60 additions & 11 deletions tests/integrations/dramatiq/test_dramatiq.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,21 @@
from dramatiq.brokers.stub import StubBroker

import sentry_sdk
from sentry_sdk.tracing import TransactionSource
from sentry_sdk import start_transaction
from sentry_sdk.consts import SPANSTATUS
from sentry_sdk.integrations.dramatiq import DramatiqIntegration

# from sentry_sdk.integrations.logging import LoggingIntegration

@pytest.fixture
def broker(sentry_init):
sentry_init(integrations=[DramatiqIntegration()])

@pytest.fixture(scope="function")
def broker(request, sentry_init):
sentry_init(
integrations=[DramatiqIntegration()],
traces_sample_rate=getattr(request, "param", None),
# disabled_integrations=[LoggingIntegration()],
)
broker = StubBroker()
broker.emit_after("process_boot")
dramatiq.set_broker(broker)
Expand Down Expand Up @@ -44,22 +53,61 @@ def dummy_actor(x, y):
assert exception["type"] == "ZeroDivisionError"


def test_that_actor_name_is_set_as_transaction(broker, worker, capture_events):
@pytest.mark.parametrize(
"broker,expected_span_status",
[
(1.0, SPANSTATUS.INTERNAL_ERROR),
(1.0, SPANSTATUS.OK),
],
ids=["error", "success"],
indirect=["broker"],
)
def test_task_transaction(broker, worker, capture_events, expected_span_status):
events = capture_events()
task_fails = expected_span_status == SPANSTATUS.INTERNAL_ERROR

@dramatiq.actor(max_retries=0)
def dummy_actor(x, y):
return x / y

dummy_actor.send(1, 0)
dummy_actor.send(1, int(not task_fails))
broker.join(dummy_actor.queue_name)
worker.join()

if task_fails:
error_event = events.pop(0)
exception = error_event["exception"]["values"][0]
assert exception["type"] == "ZeroDivisionError"
# todo: failed assert. Logging instead of dramatiq
# assert exception["mechanism"]["type"] == DramatiqIntegration.identifier

(event,) = events
assert event["type"] == "transaction"
assert event["transaction"] == "dummy_actor"
assert event["transaction_info"] == {"source": TransactionSource.TASK}
assert event["contexts"]["trace"]["status"] == expected_span_status


def test_that_dramatiq_message_id_is_set_as_extra(broker, worker, capture_events):
@pytest.mark.parametrize("broker", [1.0], indirect=True)
def test_dramatiq_propagate_trace(broker, worker, capture_events):
events = capture_events()

@dramatiq.actor(max_retries=0)
def propagated_trace_task():
pass

with start_transaction() as outer_transaction:
propagated_trace_task.send()
broker.join(propagated_trace_task.queue_name)
worker.join()

assert (
events[0]["transaction"] == "propagated_trace_task"
) # the "inner" transaction
assert events[0]["contexts"]["trace"]["trace_id"] == outer_transaction.trace_id


def test_that_dramatiq_message_id_is_set_as_tag(broker, worker, capture_events):
events = capture_events()

@dramatiq.actor(max_retries=0)
Expand All @@ -72,13 +120,14 @@ def dummy_actor(x, y):
worker.join()

event_message, event_error = events
assert "dramatiq_message_id" in event_message["extra"]
assert "dramatiq_message_id" in event_error["extra"]

assert "dramatiq_message_id" in event_message["tags"]
assert "dramatiq_message_id" in event_error["tags"]
assert (
event_message["extra"]["dramatiq_message_id"]
== event_error["extra"]["dramatiq_message_id"]
event_message["tags"]["dramatiq_message_id"]
== event_error["tags"]["dramatiq_message_id"]
)
msg_ids = [e["extra"]["dramatiq_message_id"] for e in events]
msg_ids = [e["tags"]["dramatiq_message_id"] for e in events]
assert all(uuid.UUID(msg_id) and isinstance(msg_id, str) for msg_id in msg_ids)


Expand Down
1 change: 1 addition & 0 deletions tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,7 @@ deps =
dramatiq-v1.12.3: dramatiq==1.12.3
dramatiq-v1.15.0: dramatiq==1.15.0
dramatiq-v1.18.0: dramatiq==1.18.0
{py3.6}-dramatiq: aiocontextvars

huey-v2.1.3: huey==2.1.3
huey-v2.2.0: huey==2.2.0
Expand Down