Skip to content

Commit b74eba0

Browse files
authored
Add optional timeout argument to Producer.flush() (#105)
Requires librdkafka >= v0.9.2
1 parent 0af5127 commit b74eba0

File tree

2 files changed

+33
-8
lines changed

2 files changed

+33
-8
lines changed

confluent_kafka/src/Producer.c

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -408,12 +408,31 @@ static PyObject *Producer_poll (Handle *self, PyObject *args,
408408
}
409409

410410

411-
static PyObject *Producer_flush (Handle *self, PyObject *ignore) {
412-
while (rd_kafka_outq_len(self->rk) > 0) {
413-
if (Producer_poll0(self, 500) == -1)
414-
return NULL;
415-
}
416-
Py_RETURN_NONE;
411+
static PyObject *Producer_flush (Handle *self, PyObject *args,
412+
PyObject *kwargs) {
413+
double tmout = -1;
414+
int qlen;
415+
static char *kws[] = { "timeout", NULL };
416+
#if RD_KAFKA_VERSION >= 0x00090300
417+
CallState cs;
418+
#endif
419+
420+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
421+
return NULL;
422+
423+
#if RD_KAFKA_VERSION >= 0x00090300
424+
CallState_begin(self, &cs);
425+
rd_kafka_flush(self->rk, tmout < 0 ? -1 : (int)(tmout * 1000));
426+
if (!CallState_end(self, &cs))
427+
return NULL;
428+
qlen = rd_kafka_outq_len(self->rk);
429+
#else
430+
while ((qlen = rd_kafka_outq_len(self->rk)) > 0) {
431+
if (Producer_poll0(self, 500) == -1)
432+
return NULL;
433+
}
434+
#endif
435+
return PyLong_FromLong(qlen);
417436
}
418437

419438

@@ -463,11 +482,16 @@ static PyMethodDef Producer_methods[] = {
463482
"\n"
464483
},
465484

466-
{ "flush", (PyCFunction)Producer_flush, METH_NOARGS,
485+
{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
486+
".. py:function:: flush([timeout])\n"
487+
"\n"
467488
" Wait for all messages in the Producer queue to be delivered.\n"
468489
" This is a convenience method that calls :py:func:`poll()` until "
469-
":py:func:`len()` is zero.\n"
490+
":py:func:`len()` is zero or the optional timeout elapses.\n"
470491
"\n"
492+
" :param: float timeout: Maximum time to block (requires librdkafka >= v0.9.3).\n"
493+
" :returns: Number of messages still in queue.\n"
494+
"\n"
471495
".. note:: See :py:func:`poll()` for a description on what "
472496
"callbacks may be triggered.\n"
473497
"\n"

tests/test_Producer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def on_delivery(err,msg):
3232

3333
p.poll(0.001)
3434

35+
p.flush(0.002)
3536
p.flush()
3637

3738

0 commit comments

Comments
 (0)