Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions DEVELOPER.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ If librdkafka is installed in a non-standard location provide the include and li
C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
```

On macOS, if you installed librdkafka with Homebrew, you can use the following:
```bash
export C_INCLUDE_PATH=$(brew --prefix librdkafka)/include
export LIBRARY_PATH=$(brew --prefix librdkafka)/lib
```

4. **Install confluent-kafka-python with optional dependencies**
```bash
pip3 install -e .[dev,tests,docs]
Expand Down Expand Up @@ -87,6 +93,26 @@ python3 tools/unasync.py --check
If you make any changes to the async code (in `src/confluent_kafka/schema_registry/_async` and `tests/integration/schema_registry/_async`), you **must** run this script to generate the sync counterparts (in `src/confluent_kafka/schema_registry/_sync` and `tests/integration/schema_registry/_sync`). Otherwise, this script will be run in CI with the --check flag and fail the build.


## Local Setup with UV

Tested with Python 3.11

```bash
# Modify pyproject.toml to require python version >=3.11
# This fixes the cel-python dependency conflict
uv venv --python 3.11
source .venv/bin/activate

uv sync --extra dev --extra tests
uv pip install trivup setuptools
pytest tests/

# When making changes, change project.version in pyproject.toml before re-running:
uv sync --extra dev --extra tests

```


## Tests


Expand Down
46 changes: 45 additions & 1 deletion src/confluent_kafka/src/Producer.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,42 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
return cfl_PyInt_FromInt(qlen);
}


/**
 * @brief Destroy the underlying librdkafka producer handle on demand.
 *
 * Logs a warning if messages are still queued (they will be lost),
 * then destroys the handle. Idempotent: returns True immediately if
 * the handle was already closed.
 *
 * @returns Py_True on success, or NULL if a Python exception was raised
 *          in a callback during CallState_end().
 */
static PyObject *Producer_close(Handle *self, PyObject *args, PyObject *kwargs) {

        CallState cs;

        /* Already closed: nothing to do. */
        if (!self->rk)
                Py_RETURN_TRUE;

        CallState_begin(self, &cs);

        /* Warn if there are pending messages: destroying now will drop them. */
        int outq_len = rd_kafka_outq_len(self->rk);
        if (outq_len > 0) {
                /* Buffer must be writable (not const); use snprintf to
                 * guarantee bounds and NUL-termination. */
                char msg[150];
                snprintf(msg, sizeof(msg),
                         "There are %d message(s) still in producer queue! "
                         "Use flush() or wait for delivery.", outq_len);
                rd_kafka_log_print(
                        self->rk,
                        CK_LOG_WARNING,
                        "CLOSWARN",
                        msg
                );
        }

        /* Log BEFORE destroying: self->rk is invalid after
         * rd_kafka_destroy() and must not be passed to any API. */
        rd_kafka_log_print(self->rk, CK_LOG_INFO, "CLOSEINF",
                           "Producer destroy requested");

        rd_kafka_destroy(self->rk);
        self->rk = NULL;

        if (!CallState_end(self, &cs))
                return NULL;

        Py_RETURN_TRUE;
}


static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
CallState cs;
rd_kafka_error_t *error;
Expand Down Expand Up @@ -609,7 +645,15 @@ static PyMethodDef Producer_methods[] = {
" :rtype: int\n"
"\n"
},

{ "close", (PyCFunction)Producer_close, METH_VARARGS|METH_KEYWORDS,
".. py:function:: close()\n"
"\n"
" Request to close the producer on demand.\n"
"\n"
" :rtype: bool\n"
" :returns: True if producer close requested successfully, False otherwise\n"
"\n"
},
{ "flush", (PyCFunction)Producer_flush, METH_VARARGS|METH_KEYWORDS,
".. py:function:: flush([timeout])\n"
"\n"
Expand Down
10 changes: 10 additions & 0 deletions src/confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@
#endif


/* Log severity levels passed to rd_kafka_log_print() (see Producer.c).
 * Numbering follows the conventional syslog(3) ordering:
 * 0 = most severe (emergency), 7 = least severe (debug). */
#define CK_LOG_EMERG 0
#define CK_LOG_ALERT 1
#define CK_LOG_CRIT 2
#define CK_LOG_ERR 3
#define CK_LOG_WARNING 4
#define CK_LOG_NOTICE 5
#define CK_LOG_INFO 6
#define CK_LOG_DEBUG 7



/****************************************************************************
*
Expand Down
28 changes: 26 additions & 2 deletions tests/test_Producer.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pytest
import json
from struct import pack

import pytest

from confluent_kafka import Producer, KafkaError, KafkaException, \
TopicPartition, libversion

from tests.common import TestConsumer


Expand Down Expand Up @@ -47,6 +48,8 @@ def on_delivery(err, msg):
except KafkaException as e:
assert e.args[0].code() in (KafkaError._TIMED_OUT, KafkaError._TRANSPORT)

assert p.close(), "Failed to validate that producer was closed."


def test_produce_timestamp():
""" Test produce() with timestamp arg """
Expand Down Expand Up @@ -239,6 +242,8 @@ def test_transaction_api():
assert ex.value.args[0].fatal() is False
assert ex.value.args[0].txn_requires_abort() is False

assert p.close(), "The producer was not closed"


def test_purge():
"""
Expand Down Expand Up @@ -274,6 +279,8 @@ def on_delivery(err, msg):
p.flush(0.002)
assert cb_detector["on_delivery_called"]

assert p.close(), "The producer was not closed"


def test_producer_bool_value():
"""
Expand All @@ -283,3 +290,20 @@ def test_producer_bool_value():

p = Producer({})
assert bool(p)
assert p.close(), "The producer was not fully closed"


def test_producer_close():
    """
    Ensure the producer can be closed on demand via close().

    Uses short timeouts so the test does not block waiting on a broker;
    close() is expected to succeed even with a message still queued
    (the C implementation logs a warning and destroys the handle).
    """
    conf = {
        'debug': 'all',
        'socket.timeout.ms': 10,
        'error_cb': error_cb,
        'message.timeout.ms': 10
    }
    producer = Producer(conf)
    msg = {"test": "test"}
    producer.produce(json.dumps(msg))
    # Fixed typo in assertion message ("nto" -> "not").
    assert producer.close(), "The producer could not be closed on demand"