315 changes: 315 additions & 0 deletions src/confluent_kafka/src/Producer.c
@@ -393,6 +393,277 @@ static PyObject *Producer_flush (Handle *self, PyObject *args,
        return cfl_PyInt_FromInt(qlen);
}

/**
 * @brief Validate arguments and parse all messages in the batch
 * @param self Producer handle
 * @param messages_list Python list of message dictionaries
 * @param default_partition Default partition for messages
 * @param default_dr_cb Default delivery callback
 * @param rkmessages Output array of librdkafka messages (must be pre-allocated)
 * @param msgstates Output array of message states (must be pre-allocated)
 * @param message_cnt Output parameter for message count
 * @returns 0 on success, -1 on error (with Python exception set)
 */
static int Producer_validate_and_parse_batch (Handle *self,
                                              PyObject *messages_list,
                                              int default_partition,
                                              PyObject *default_dr_cb,
                                              rd_kafka_message_t *rkmessages,
                                              struct Producer_msgstate **msgstates,
                                              int *message_cnt) {
        int i;

        /* Validate messages_list is a list */
        if (!PyList_Check(messages_list)) {
                PyErr_SetString(PyExc_TypeError, "messages must be a list");
                return -1;
        }

        *message_cnt = (int)PyList_Size(messages_list);
        if (*message_cnt == 0) {
                return 0; /* Empty batch is valid */
        }

        /* Parse each message in the batch */
        for (i = 0; i < *message_cnt; i++) {
                PyObject *msg_dict = PyList_GetItem(messages_list, i);
                PyObject *value_obj = NULL, *key_obj = NULL, *headers_obj = NULL;
                PyObject *partition_obj = NULL, *timestamp_obj = NULL;
                PyObject *callback_obj = NULL;
                const char *value = NULL, *key = NULL;
                Py_ssize_t value_len = 0, key_len = 0;
                int msg_partition = default_partition;
                PyObject *msg_dr_cb = default_dr_cb;
#ifdef RD_KAFKA_V_HEADERS
                rd_kafka_headers_t *rd_headers = NULL;
#endif

                if (!PyDict_Check(msg_dict)) {
                        PyErr_Format(PyExc_TypeError,
                                     "Message at index %d must be a dict", i);
                        return -1;
                }

                /* Extract message fields */
                value_obj = PyDict_GetItemString(msg_dict, "value");
                key_obj = PyDict_GetItemString(msg_dict, "key");
                headers_obj = PyDict_GetItemString(msg_dict, "headers");
                partition_obj = PyDict_GetItemString(msg_dict, "partition");
                timestamp_obj = PyDict_GetItemString(msg_dict, "timestamp");
                callback_obj = PyDict_GetItemString(msg_dict, "callback");
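                /* Note: PyDict_GetItemString() returns borrowed references,
                 * so no Py_DECREFs are needed here. The value/key buffers
                 * extracted below remain valid while the caller holds
                 * messages_list, and the batch is produced with
                 * RD_KAFKA_MSG_F_COPY, so librdkafka copies the payloads at
                 * enqueue time. */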

                /* Parse value */
                if (value_obj && value_obj != Py_None) {
                        if (PyBytes_Check(value_obj)) {
                                value = PyBytes_AsString(value_obj);
                                value_len = PyBytes_Size(value_obj);
                        } else if (PyUnicode_Check(value_obj)) {
                                value = PyUnicode_AsUTF8AndSize(value_obj,
                                                                &value_len);
                                if (!value)
                                        return -1; /* exception already set */
                        } else {
                                PyErr_Format(PyExc_TypeError,
                                             "Message value at index %d "
                                             "must be bytes or str", i);
                                return -1;
                        }
                }

                /* Parse key */
                if (key_obj && key_obj != Py_None) {
                        if (PyBytes_Check(key_obj)) {
                                key = PyBytes_AsString(key_obj);
                                key_len = PyBytes_Size(key_obj);
                        } else if (PyUnicode_Check(key_obj)) {
                                key = PyUnicode_AsUTF8AndSize(key_obj,
                                                              &key_len);
                                if (!key)
                                        return -1; /* exception already set */
                        } else {
                                PyErr_Format(PyExc_TypeError,
                                             "Message key at index %d "
                                             "must be bytes or str", i);
                                return -1;
                        }
                }

                /* Parse partition */
                if (partition_obj && partition_obj != Py_None) {
                        if (!PyLong_Check(partition_obj)) {
                                PyErr_Format(PyExc_TypeError,
                                             "Message partition at index %d "
                                             "must be int", i);
                                return -1;
                        }
                        msg_partition = (int)PyLong_AsLong(partition_obj);
                }

                /* Parse timestamp: rd_kafka_produce_batch() has no
                 * per-message timestamp argument, so reject it up front. */
                if (timestamp_obj && timestamp_obj != Py_None) {
                        PyErr_SetString(PyExc_NotImplementedError,
                                        "Message timestamps are not currently "
                                        "supported in batch mode");
                        return -1;
                }

                /* Parse callback (overrides the batch-level default) */
                if (callback_obj && callback_obj != Py_None)
                        msg_dr_cb = callback_obj;

#ifdef RD_KAFKA_V_HEADERS
                /* Parse headers */
                if (headers_obj && headers_obj != Py_None) {
                        if (!(rd_headers = py_headers_to_c(headers_obj))) {
                                PyErr_Format(PyExc_ValueError,
                                             "Invalid headers at index %d", i);
                                return -1;
                        }
                }
#endif

                /* Create msgstate for this message */
                msgstates[i] = Producer_msgstate_new(self, msg_dr_cb);

                /* Fill librdkafka message structure */
                rkmessages[i].payload = (void *)value;
                rkmessages[i].len = value_len;
                rkmessages[i].key = (void *)key;
                rkmessages[i].key_len = key_len;
                rkmessages[i].partition = msg_partition;
                rkmessages[i]._private = msgstates[i];
                rkmessages[i].err = RD_KAFKA_RESP_ERR_NO_ERROR;

#ifdef RD_KAFKA_V_HEADERS
                /* Note: headers are not directly supported by
                 * rd_kafka_produce_batch(); this is a limitation of the
                 * current librdkafka batch API, so destroy them rather than
                 * leak them. A warning that headers are ignored in batch
                 * mode could be emitted to the user here. */
                if (rd_headers) {
                        rd_kafka_headers_destroy(rd_headers);
                        rd_headers = NULL;
                }
#endif
        }

        return 0;
}

/**
 * @brief Execute batch produce and handle errors
 * @param messages_list Original Python message list (for error annotation)
 * @param rkt Topic handle
 * @param partition Default partition
 * @param rkmessages Array of librdkafka messages
 * @param msgstates Array of message states
 * @param message_cnt Number of messages
 * @returns Number of messages successfully queued
 */
static int Producer_execute_and_handle_errors (PyObject *messages_list,
                                               rd_kafka_topic_t *rkt,
                                               int partition,
                                               rd_kafka_message_t *rkmessages,
                                               struct Producer_msgstate **msgstates,
                                               int message_cnt) {
        int good = 0;
        int i;

        /* Call librdkafka batch produce */
        good = rd_kafka_produce_batch(rkt, partition, RD_KAFKA_MSG_F_COPY,
                                      rkmessages, message_cnt);
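        /* Ownership note (a sketch of the assumed lifecycle): each queued
         * message carries its msgstate via _private, which is expected to
         * come back through the delivery report callback and be destroyed
         * there, as with produce(). Messages that were NOT queued never
         * reach that path, so their msgstates must be destroyed here. */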

        /* Handle results and clean up failed messages */
        for (i = 0; i < message_cnt; i++) {
                PyObject *msg_dict, *error_obj;

                if (rkmessages[i].err == RD_KAFKA_RESP_ERR_NO_ERROR)
                        continue;

                /* Message failed: clean up its msgstate */
                if (msgstates[i]) {
                        Producer_msgstate_destroy(msgstates[i]);
                        msgstates[i] = NULL;
                }

                /* Set error on the Python message dict for user feedback */
                msg_dict = PyList_GetItem(messages_list, i);
                error_obj = KafkaError_new0(rkmessages[i].err,
                                            "Message failed: %s",
                                            rd_kafka_err2str(rkmessages[i].err));
                if (error_obj) {
                        PyDict_SetItemString(msg_dict, "_error", error_obj);
                        Py_DECREF(error_obj);
                }
        }

        return good;
}


/**
 * @brief Produce a batch of messages.
 * @returns Number of messages successfully queued for producing.
 */
static PyObject *Producer_produce_batch (Handle *self, PyObject *args,
                                         PyObject *kwargs) {
        const char *topic;
        PyObject *messages_list = NULL;
        int partition = RD_KAFKA_PARTITION_UA;
        PyObject *dr_cb = NULL, *dr_cb2 = NULL;
        rd_kafka_message_t *rkmessages = NULL;
        struct Producer_msgstate **msgstates = NULL;
        int message_cnt = 0;
        int good = 0;
        int i;
        rd_kafka_topic_t *rkt = NULL;

        static char *kws[] = { "topic", "messages", "partition",
                               "callback", "on_delivery", NULL };

        /* Parse arguments */
        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sO|iOO", kws,
                                         &topic, &messages_list, &partition,
                                         &dr_cb, &dr_cb2))
                return NULL;

        /* Handle callback aliases: 'on_delivery' is an alias for 'callback',
         * falling back to the configured default delivery callback. */
        if (dr_cb2 && !dr_cb)
                dr_cb = dr_cb2;
        if (!dr_cb || dr_cb == Py_None)
                dr_cb = self->u.Producer.default_dr_cb;

        /* Get preliminary message count for allocation */
        if (!PyList_Check(messages_list)) {
                PyErr_SetString(PyExc_TypeError, "messages must be a list");
                return NULL;
        }

        message_cnt = (int)PyList_Size(messages_list);
        if (message_cnt == 0) {
                return cfl_PyInt_FromInt(0);
        }

        /* Allocate arrays for librdkafka messages and msgstates */
        rkmessages = calloc(message_cnt, sizeof(*rkmessages));
        msgstates = calloc(message_cnt, sizeof(*msgstates));
        if (!rkmessages || !msgstates) {
                PyErr_NoMemory();
                goto cleanup;
        }

        /* Get topic handle */
        if (!(rkt = rd_kafka_topic_new(self->rk, topic, NULL))) {
                cfl_PyErr_Format(RD_KAFKA_RESP_ERR__INVALID_ARG,
                                 "Invalid topic: %s", topic);
                goto cleanup;
        }

        /* Step 1: Validate arguments and parse all messages */
        if (Producer_validate_and_parse_batch(self, messages_list, partition,
                                              dr_cb, rkmessages, msgstates,
                                              &message_cnt) != 0) {
                /* Parsing failed part-way: destroy any msgstates created
                 * for earlier messages so they do not leak. */
                for (i = 0; i < message_cnt; i++) {
                        if (msgstates[i]) {
                                Producer_msgstate_destroy(msgstates[i]);
                                msgstates[i] = NULL;
                        }
                }
                goto cleanup;
        }

        /* Step 2: Execute batch and handle per-message errors */
        good = Producer_execute_and_handle_errors(messages_list, rkt,
                                                  partition, rkmessages,
                                                  msgstates, message_cnt);

cleanup:
        /* Clean up local resources; msgstates for successfully queued
         * messages are owned by librdkafka via _private and must not be
         * freed here. */
        if (rkt)
                rd_kafka_topic_destroy(rkt);
        if (rkmessages)
                free(rkmessages);
        if (msgstates)
                free(msgstates);

        if (PyErr_Occurred())
                return NULL;

        return cfl_PyInt_FromInt(good);
}

static PyObject *Producer_init_transactions (Handle *self, PyObject *args) {
        CallState cs;
        rd_kafka_error_t *error;
@@ -624,6 +895,50 @@ static PyMethodDef Producer_methods[] = {
"callbacks may be triggered.\n"
"\n"
},

{ "produce_batch", (PyCFunction)Producer_produce_batch, METH_VARARGS|METH_KEYWORDS,
".. py:function:: produce_batch(topic, messages, [partition], [on_delivery])\n"
"\n"
" Produce a batch of messages to topic.\n"
" This is an asynchronous operation that efficiently sends multiple messages\n"
" in a single batch, reducing overhead compared to individual produce() calls.\n"
"\n"
" Each message in the batch can have individual delivery callbacks, or a\n"
" single callback can be applied to all messages in the batch.\n"
"\n"
" :param str topic: Topic to produce messages to\n"
" :param list messages: List of message dictionaries. Each message dict can contain:\n"
" - 'value' (str|bytes): Message payload (optional)\n"
" - 'key' (str|bytes): Message key (optional)\n"
" - 'partition' (int): Specific partition (optional, overrides batch partition)\n"
" - 'timestamp' (int): Message timestamp in milliseconds (optional)\n"
" - 'callback' (func): Per-message delivery callback (optional)\n"
" :param int partition: Default partition for all messages (optional)\n"
" :param func on_delivery(err,msg): Default delivery callback for all messages\n"
" :returns: Number of messages successfully queued for producing\n"
" :rtype: int\n"
"\n"
" :raises TypeError: if messages is not a list or message format is invalid\n"
" :raises BufferError: if the internal producer message queue is full\n"
" :raises KafkaException: for other errors\n"
"\n"
" .. note:: Message headers are not currently supported in batch mode due to\n"
" librdkafka API limitations. Use individual produce() calls if headers are needed.\n"
"\n"
" .. note:: Failed messages will have an '_error' field added to their dict\n"
" containing the error information.\n"
"\n"
" Example::\n"
"\n"
" messages = [\n"
" {'value': 'message 1', 'key': 'key1'},\n"
" {'value': 'message 2', 'key': 'key2', 'partition': 1},\n"
" {'value': 'message 3', 'callback': my_callback}\n"
" ]\n"
" count = producer.produce_batch('my-topic', messages)\n"
" print(f'Successfully queued {count} messages')\n"
"\n"
},
{ "purge", (PyCFunction)Producer_purge, METH_VARARGS|METH_KEYWORDS,
".. py:function:: purge([in_queue=True], [in_flight=True], [blocking=True])\n"
"\n"