Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

v2.11.0 is a feature release with the following enhancements:

### Fixes
- Fix error propagation rule for Python's C API to prevent SystemError when callbacks raise exceptions (#865)

confluent-kafka-python v2.11.0 is based on librdkafka v2.11.0, see the
[librdkafka release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0)
for a complete list of changes, enhancements, fixes and upgrade considerations.
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/Consumer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1575,6 +1575,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
if (result)
Py_DECREF(result);
else {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra line. From other places as well.

PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PyErr_Fetch is deprecated since Python 3.12. It's suggested to use PyErr_GetRaisedException() instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll also need to check whether PyErr_SetRaisedException(exc); is necessary to set the context temporarily, rather than passing it into GetRaisedException.

CallState_crash(cs);
rd_kafka_yield(rk);
}
Expand Down
2 changes: 2 additions & 0 deletions src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
if (result)
Py_DECREF(result);
else {

PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
CallState_crash(cs);
rd_kafka_yield(rk);
}
Expand Down
22 changes: 21 additions & 1 deletion src/confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -1755,6 +1755,8 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
if (result)
Py_DECREF(result);
else {

PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
crash:
CallState_crash(cs);
rd_kafka_yield(h->rk);
Expand Down Expand Up @@ -1808,6 +1810,8 @@ static void throttle_cb (rd_kafka_t *rk, const char *broker_name, int32_t broker
/* throttle_cb executed successfully */
Py_DECREF(result);
goto done;
} else {
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
}

/**
Expand Down Expand Up @@ -1839,6 +1843,7 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
if (result)
Py_DECREF(result);
else {
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
Expand Down Expand Up @@ -1874,6 +1879,7 @@ static void log_cb (const rd_kafka_t *rk, int level,
if (result)
Py_DECREF(result);
else {
PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
Expand Down Expand Up @@ -2572,6 +2578,9 @@ void CallState_begin (Handle *h, CallState *cs) {
cs->thread_state = PyEval_SaveThread();
assert(cs->thread_state != NULL);
cs->crashed = 0;
cs->exception_type = NULL;
cs->exception_value = NULL;
cs->exception_traceback = NULL;
#ifdef WITH_PY_TSS
PyThread_tss_set(&h->tlskey, cs);
#else
Expand All @@ -2592,8 +2601,19 @@ int CallState_end (Handle *h, CallState *cs) {

PyEval_RestoreThread(cs->thread_state);

if (PyErr_CheckSignals() == -1 || cs->crashed)
if (PyErr_CheckSignals() == -1)
return 0;

if (cs->crashed) {
/* Restore the saved exception if we have one */
if (cs->exception_type) {
PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, you'll need to use https://docs.python.org/3/c-api/exceptions.html#c.PyErr_SetRaisedException here instead, as PyErr_Restore is also deprecated.

cs->exception_type = NULL;
cs->exception_value = NULL;
cs->exception_traceback = NULL;
}
return 0;
}

return 1;
}
Expand Down
52 changes: 52 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,60 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg);
/**
 * @brief Per-call state shared between CallState_begin(), the callbacks
 *        invoked during the call, and CallState_end().
 */
typedef struct {
/* Thread state returned by PyEval_SaveThread() in CallState_begin();
 * handed back to PyEval_RestoreThread() in CallState_end(). */
PyThreadState *thread_state;
int crashed; /* Callback crashed: reset to 0 by CallState_begin(), makes CallState_end() return 0 when set */
/* The three fields below are reset to NULL by CallState_begin() and hold
 * the exception captured from a failed callback until it is restored. */
PyObject *exception_type; /* Stored exception type */
PyObject *exception_value; /* Stored exception value */
PyObject *exception_traceback; /* Stored exception traceback */
} CallState;

/**
* @brief Compatibility layer for Python exception handling API changes.
* PyErr_Fetch was deprecated in Python 3.12 in favor of PyErr_GetRaisedException.
*/
#if PY_VERSION_HEX >= 0x030c0000
/* Python 3.12+ - use new API */
/**
 * @brief Move the currently raised Python exception (if any) into \p cs
 *        (Python 3.12+ variant, built on PyErr_GetRaisedException()).
 *
 * On return \p cs owns one reference each to the exception's type, the
 * exception instance and its traceback (the traceback may be NULL).
 * If no exception is raised all three fields are set to NULL.
 *
 * Any references already stored in \p cs are released instead of being
 * silently overwritten, so calling this more than once between
 * CallState_begin() and CallState_end() does not leak.
 *
 * @param cs Call state whose exception fields have been initialised
 *           (CallState_begin() sets them to NULL).
 */
static inline void
CallState_fetch_exception(CallState *cs) {
        /* Remember what was stored before so it can be released once the
         * new state is in place. */
        PyObject *prev_type = cs->exception_type;
        PyObject *prev_value = cs->exception_value;
        PyObject *prev_traceback = cs->exception_traceback;
        PyObject *exc = PyErr_GetRaisedException();

        if (exc) {
                cs->exception_type = (PyObject *)Py_TYPE(exc);
                /* Py_TYPE() is a borrowed reference; take our own. */
                Py_INCREF(cs->exception_type);
                cs->exception_value = exc;
                cs->exception_traceback = PyException_GetTraceback(exc);
        } else {
                cs->exception_type = NULL;
                cs->exception_value = NULL;
                cs->exception_traceback = NULL;
        }

        Py_XDECREF(prev_type);
        Py_XDECREF(prev_value);
        Py_XDECREF(prev_traceback);
}

/**
 * @brief Re-raise the exception stored in \p cs, if there is one
 *        (Python 3.12+ variant, built on PyErr_SetRaisedException()).
 *
 * Does nothing when no exception value is stored. Otherwise the stored
 * exception becomes the raised exception and all three exception fields
 * of \p cs are left as NULL.
 */
static inline void
CallState_restore_exception(CallState *cs) {
        PyObject *saved_exc = cs->exception_value;
        PyObject *saved_type;
        PyObject *saved_tb;

        if (!saved_exc) {
                return;
        }

        /* Detach everything from cs first, then dispose of it. */
        saved_type = cs->exception_type;
        saved_tb = cs->exception_traceback;
        cs->exception_value = NULL;
        cs->exception_type = NULL;
        cs->exception_traceback = NULL;

        /* PyErr_SetRaisedException steals the reference to the exception. */
        PyErr_SetRaisedException(saved_exc);

        /* The type and traceback references were only held for bookkeeping. */
        Py_XDECREF(saved_type);
        Py_XDECREF(saved_tb);
}
#else
/* Python < 3.12 - use legacy API */
/**
 * @brief Move the currently raised Python exception (if any) into \p cs
 *        (Python < 3.12 variant, built on PyErr_Fetch()).
 *
 * PyErr_Fetch() writes straight into the three exception fields, which
 * would drop any references already held there. The previous contents
 * are therefore saved first and released afterwards, so calling this more
 * than once between CallState_begin() and CallState_end() does not leak.
 *
 * @param cs Call state whose exception fields have been initialised
 *           (CallState_begin() sets them to NULL).
 */
static inline void
CallState_fetch_exception(CallState *cs) {
        PyObject *prev_type = cs->exception_type;
        PyObject *prev_value = cs->exception_value;
        PyObject *prev_traceback = cs->exception_traceback;

        PyErr_Fetch(&cs->exception_type, &cs->exception_value, &cs->exception_traceback);

        Py_XDECREF(prev_type);
        Py_XDECREF(prev_value);
        Py_XDECREF(prev_traceback);
}

/**
 * @brief Re-raise the exception stored in \p cs, if there is one
 *        (Python < 3.12 variant, built on PyErr_Restore()).
 *
 * Does nothing when no exception is stored. This matters because
 * PyErr_Restore(NULL, NULL, NULL) clears the error indicator: calling it
 * unconditionally would discard an exception that is still pending when
 * nothing was saved in \p cs. The guard also matches the Python 3.12+
 * variant and the check performed in CallState_end().
 *
 * After a restore all three exception fields of \p cs are NULL.
 */
static inline void
CallState_restore_exception(CallState *cs) {
        /* With PyErr_Fetch() the value and traceback may be NULL even when
         * an exception was captured, so the type is the reliable marker. */
        if (!cs->exception_type) {
                return;
        }

        /* PyErr_Restore steals all three references. */
        PyErr_Restore(cs->exception_type, cs->exception_value, cs->exception_traceback);
        cs->exception_type = NULL;
        cs->exception_value = NULL;
        cs->exception_traceback = NULL;
}
#endif

/**
* @brief Initialise a CallState and unlock the GIL prior to a
* possibly blocking external call.
Expand Down
35 changes: 35 additions & 0 deletions tests/test_Consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
def dummy_commit_cb(err, partitions):
pass

kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100',

Check failure on line 23 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L23

Define a constant instead of duplicating this literal 'socket.timeout.ms' 8 times.

Check failure on line 23 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L23

Define a constant instead of duplicating this literal 'group.id' 8 times.
'session.timeout.ms': 1000, # Avoid close() blocking too long

Check failure on line 24 in tests/test_Consumer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Consumer.py#L24

Define a constant instead of duplicating this literal 'session.timeout.ms' 8 times.
'on_commit': dummy_commit_cb})

kc.subscribe(["test"])
Expand Down Expand Up @@ -324,3 +324,38 @@
with pytest.raises(ValueError) as ex:
TestConsumer({'bootstrap.servers': "mybroker:9092"})
assert ex.match('group.id must be set')


def test_callback_exception_no_system_error():
    """Regression test for issue #865 (Consumer side).

    An exception raised inside ``error_cb`` must reach the caller of
    ``consume()`` as that same exception, not as (or together with) a
    ``SystemError``.
    """
    # Reviewer suggestion: add more test cases covering the librdkafka layer,
    # the C binding layer and the Python layer.
    exception_raised = []

    def error_cb_that_raises(error):
        """Error callback that raises an exception"""
        exception_raised.append(error)
        raise RuntimeError("Test exception from error_cb")

    # Create consumer with error callback that raises exception
    consumer = TestConsumer({
        'group.id': 'test-callback-systemerror-fix',
        'bootstrap.servers': 'nonexistent-broker:9092',  # Will trigger error
        'socket.timeout.ms': 100,
        'session.timeout.ms': 1000,
        'error_cb': error_cb_that_raises
    })

    # Close the consumer even when an assertion below fails, so a failing
    # run does not leave an open consumer behind for the rest of the suite.
    try:
        consumer.subscribe(['test-topic'])

        # This should trigger the error callback due to connection failure
        # Before fix: Would get RuntimeError + SystemError (Issue #865)
        # After fix: Should only get RuntimeError (no SystemError)
        with pytest.raises(RuntimeError) as exc_info:
            consumer.consume(timeout=0.1)

        # Verify we got the expected exception message
        assert "Test exception from error_cb" in str(exc_info.value)

        # Verify the error callback was actually called
        assert len(exception_raised) > 0
    finally:
        consumer.close()
31 changes: 31 additions & 0 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
p = Producer()
assert ex.match('expected configuration dict')

p = Producer({'socket.timeout.ms': 10,

Check failure on line 24 in tests/test_Producer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Producer.py#L24

Define a constant instead of duplicating this literal 'socket.timeout.ms' 5 times.
'error_cb': error_cb,
'message.timeout.ms': 10})

Check failure on line 26 in tests/test_Producer.py

View check run for this annotation

SonarQube-Confluent / confluent-kafka-python Sonarqube Results

tests/test_Producer.py#L26

Define a constant instead of duplicating this literal 'message.timeout.ms' 5 times.

p.produce('mytopic')
p.produce('mytopic', value='somedata', key='a key')
Expand Down Expand Up @@ -283,3 +283,34 @@

p = Producer({})
assert bool(p)


def test_callback_exception_no_system_error():
    """Regression test for issue #865 (Producer side).

    An exception raised inside the delivery callback must surface from
    ``flush()`` as that same exception, with no ``SystemError`` involved.
    """
    seen_reports = []

    def raising_delivery_cb(err, msg):
        """Record the delivery report, then fail on purpose."""
        seen_reports.append((err, msg))
        raise RuntimeError("Test exception from delivery_cb")

    conf = {
        # Unreachable broker, so the message cannot be delivered.
        'bootstrap.servers': 'nonexistent-broker:9092',
        'socket.timeout.ms': 100,
        # Tiny message timeout so the failed delivery is reported quickly.
        'message.timeout.ms': 10,
        'on_delivery': raising_delivery_cb,
    }
    p = Producer(conf)

    # The failed delivery of this message is what invokes the callback.
    p.produce('test-topic', value='test-message')

    # flush() serves the delivery reports; the timeout is generous so the
    # callback has time to fire.
    # Before the fix: RuntimeError + SystemError (Issue #865).
    # After the fix: RuntimeError only.
    with pytest.raises(RuntimeError) as raised:
        p.flush(timeout=2.0)

    # The exception must be the one raised by our callback ...
    assert "Test exception from delivery_cb" in str(raised.value)

    # ... and the callback must really have been invoked.
    assert len(seen_reports) > 0