Skip to content

Commit 7bef980

Browse files
authored
Merge pull request #113 from confluentinc/callstate_destroy
Callstate destroy
2 parents b74eba0 + 6cf1fa4 commit 7bef980

File tree

3 files changed

+24
-19
lines changed

3 files changed

+24
-19
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -472,19 +472,16 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
472472

473473
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
474474
CallState cs;
475-
int raise = 0;
476475

477476
CallState_begin(self, &cs);
478477

479478
rd_kafka_consumer_close(self->rk);
480479

481-
raise = !CallState_end(self, &cs);
482-
483480
rd_kafka_destroy(self->rk);
484481
self->rk = NULL;
485482

486-
if (raise)
487-
return NULL;
483+
if (!CallState_end(self, &cs))
484+
return NULL;
488485

489486
Py_RETURN_NONE;
490487
}

confluent_kafka/src/Producer.c

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,14 @@ static void Producer_dealloc (Handle *self) {
108108

109109
Producer_clear(self);
110110

111-
if (self->rk)
112-
rd_kafka_destroy(self->rk);
111+
if (self->rk) {
112+
CallState cs;
113+
CallState_begin(self, &cs);
114+
115+
rd_kafka_destroy(self->rk);
116+
117+
CallState_end(self, &cs);
118+
}
113119

114120
Py_TYPE(self)->tp_free((PyObject *)self);
115121
}

examples/integration_test.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,17 +239,19 @@ def verify_producer_performance(with_dr_cb=True):
239239
bar = None
240240

241241
for i in range(0, msgcnt):
242-
try:
243-
if with_dr_cb:
244-
p.produce(topic, value=msg_payload, callback=dr.delivery)
245-
else:
246-
p.produce(topic, value=msg_payload)
247-
except BufferError as e:
248-
# Local queue is full (slow broker connection?)
249-
msgs_backpressure += 1
250-
if bar is not None and (msgs_backpressure % 1000) == 0:
251-
bar.next(n=0)
252-
p.poll(0)
242+
while True:
243+
try:
244+
if with_dr_cb:
245+
p.produce(topic, value=msg_payload, callback=dr.delivery)
246+
else:
247+
p.produce(topic, value=msg_payload)
248+
break
249+
except BufferError as e:
250+
# Local queue is full (slow broker connection?)
251+
msgs_backpressure += 1
252+
if bar is not None and (msgs_backpressure % 1000) == 0:
253+
bar.next(n=0)
254+
p.poll(100)
253255
continue
254256

255257
if bar is not None and (msgs_produced % 5000) == 0:
@@ -268,7 +270,7 @@ def verify_producer_performance(with_dr_cb=True):
268270
(msgs_produced, bytecnt / (1024*1024), t_produce_spent,
269271
msgs_produced / t_produce_spent,
270272
(bytecnt/t_produce_spent) / (1024*1024)))
271-
print('# %d messages not produce()d due to backpressure (local queue full)' % msgs_backpressure)
273+
print('# %d temporary produce() failures due to backpressure (local queue full)' % msgs_backpressure)
272274

273275
print('waiting for %d/%d deliveries' % (len(p), msgs_produced))
274276
# Wait for deliveries

0 commit comments

Comments
 (0)