-
Notifications
You must be signed in to change notification settings - Fork 931
Fix error propagation rule for Python's C API #2019
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 4 commits
c5bdee6
cb71833
42714b5
5f29f12
e693fac
f8dc43a
7d0822d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1575,6 +1575,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err, | |
if (result) | ||
Py_DECREF(result); | ||
else { | ||
|
||
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
|
||
CallState_crash(cs); | ||
rd_kafka_yield(rk); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -1755,6 +1755,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) | |
if (result) | ||
Py_DECREF(result); | ||
else { | ||
|
||
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
crash: | ||
CallState_crash(cs); | ||
rd_kafka_yield(h->rk); | ||
|
@@ -1808,6 +1810,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker | |
/* throttle_cb executed successfully */ | ||
Py_DECREF(result); | ||
goto done; | ||
} else { | ||
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
} | ||
|
||
/** | ||
|
@@ -1839,6 +1843,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) { | |
if (result) | ||
Py_DECREF(result); | ||
else { | ||
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
CallState_crash(cs); | ||
rd_kafka_yield(h->rk); | ||
} | ||
|
@@ -1874,6 +1879,7 @@ static void log_cb (const rd_kafka_t *rk, int level, | |
if (result) | ||
Py_DECREF(result); | ||
else { | ||
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback); | ||
CallState_crash(cs); | ||
rd_kafka_yield(h->rk); | ||
} | ||
|
@@ -2572,6 +2578,9 @@ void CallState_begin (Handle *h, CallState *cs) { | |
cs->thread_state = PyEval_SaveThread(); | ||
assert(cs->thread_state != NULL); | ||
cs->crashed = 0; | ||
cs->exception_type = NULL; | ||
cs->exception_value = NULL; | ||
cs->exception_traceback = NULL; | ||
#ifdef WITH_PY_TSS | ||
PyThread_tss_set(&h->tlskey, cs); | ||
#else | ||
|
@@ -2592,8 +2601,19 @@ int CallState_end (Handle *h, CallState *cs) { | |
|
||
PyEval_RestoreThread(cs->thread_state); | ||
|
||
if (PyErr_CheckSignals() == -1 || cs->crashed) | ||
if (PyErr_CheckSignals() == -1) | ||
return 0; | ||
|
||
if (cs->crashed) { | ||
/* Restore the saved exception if we have one */ | ||
if (cs->exception_type) { | ||
PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback); | ||
|
||
cs->exception_type = NULL; | ||
cs->exception_value = NULL; | ||
cs->exception_traceback = NULL; | ||
} | ||
return 0; | ||
} | ||
|
||
return 1; | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,8 @@ | |
def dummy_commit_cb(err, partitions): | ||
pass | ||
|
||
kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100', | ||
Check failure on line 23 in tests/test_Consumer.py
|
||
'session.timeout.ms': 1000, # Avoid close() blocking too long | ||
'on_commit': dummy_commit_cb}) | ||
|
||
kc.subscribe(["test"]) | ||
|
@@ -324,3 +324,38 @@ | |
with pytest.raises(ValueError) as ex: | ||
TestConsumer({'bootstrap.servers': "mybroker:9092"}) | ||
assert ex.match('group.id must be set') | ||
|
||
|
||
def test_callback_exception_no_system_error(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would suggest adding more test cases related to librdkafka layer, C binding layer and python layer. |
||
|
||
exception_raised = [] | ||
|
||
def error_cb_that_raises(error): | ||
"""Error callback that raises an exception""" | ||
exception_raised.append(error) | ||
raise RuntimeError("Test exception from error_cb") | ||
|
||
# Create consumer with error callback that raises exception | ||
consumer = TestConsumer({ | ||
'group.id': 'test-callback-systemerror-fix', | ||
'bootstrap.servers': 'nonexistent-broker:9092', # Will trigger error | ||
'socket.timeout.ms': 100, | ||
'session.timeout.ms': 1000, | ||
'error_cb': error_cb_that_raises | ||
}) | ||
|
||
consumer.subscribe(['test-topic']) | ||
|
||
# This should trigger the error callback due to connection failure | ||
# Before fix: Would get RuntimeError + SystemError (Issue #865) | ||
# After fix: Should only get RuntimeError (no SystemError) | ||
with pytest.raises(RuntimeError) as exc_info: | ||
consumer.consume(timeout=0.1) | ||
|
||
# Verify we got the expected exception message | ||
assert "Test exception from error_cb" in str(exc_info.value) | ||
|
||
# Verify the error callback was actually called | ||
assert len(exception_raised) > 0 | ||
|
||
consumer.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra line. From other places as well.