Add produce batch api to producer #2047
Open
+1,210
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This pull request adds a new batch message production API to the Kafka Python client, enabling efficient sending of multiple messages in a single call.
Changes
Producer_produce_batch
C function to support producing a batch of messages, including per-message and global delivery callbacks, input validation, and error reporting on failed messages. Message headers are parsed but ignored due to underlying library limitations, and timestamps are currently not supported in batch mode. (src/confluent_kafka/src/Producer.c
,produce_batch
method in the Python API, with detailed documentation describing usage, parameters, limitations, and examples.Testing
test_produce_batch_core_functionality()
• Zero-length vs None distinction
• Mixed encoding strings (UTF-8, emoji, Latin, Cyrillic, CJK)
• Binary data edge cases (null bytes, all byte values)
• Partition handling, empty batch, single message
• Large batch (100 messages)
• Advanced Unicode and encoding
• Large messages and scalability
• Batch size scalability (1, 10, 100, 500)
• Timestamp not supported (NotImplementedError)
• Headers parsed but ignored
test_produce_batch_validation_and_errors()
• Message field validation (value/key/partition types)
• Partition edge values (negative, max int32, large values)
• Unexpected message fields (ignored gracefully)
• Special topic names validation
• Missing/wrong argument types
• Invalid topic names (empty, very long, None)
• Non-callable callbacks
• Queue buffer limits and partial failures
• Error annotations on failed messages
• Restrictive producer limits
• All messages failing scenario
test_produce_batch_callbacks_and_exceptions()
• Callback distribution verification
• No callbacks scenario
• Callback parameter aliases (callback vs on_delivery)
• Circular reference in callback
• Slow/blocking callbacks (performance impact)
• Exception propagation during flush
• Multiple callback exceptions
• Critical callback errors
• RuntimeError and ValueError handling
test_produce_batch_concurrency_and_threading()
• Thread safety with shared producer
• Multiple threads with shared data races
• Rapid successive batch calls
• Resource contention scenarios
• Racing threads with shared data structures
• Message list modification during callback
• Recursive produce_batch calls from callback
• Corruption attempt handling
test_produce_batch_memory_and_resources()
• Deep nested message structures (500 messages)
• Memory cleanup verification with GC
• Producer destruction during active callbacks
• Memory pressure scenarios
• Many producers with batch operations (10 producers)
• Rapid batch creation and destruction (20 iterations)
• Handle exhaustion scenarios (50 handles)
• Resource cleanup verification
test_produce_batch_limits_and_performance()
• Very large batch size (1000 messages)
• Single very large message (1MB)
• Mixed success/failure with queue limits
• Error annotation verification
• Performance with different batch sizes
• Per-message time performance verification
• Concurrent performance testing (5 workers)
• Performance scaling validation
test_produce_batch_integrations()
• Transaction initialization and commit
• Transaction error handling
• SerializingProducer compatibility
• StringSerializer integration
• AvroProducer compatibility
• Schema-serialized data handling
• Type error handling for serialization
Librdkafka Produce batch
https://github.com/confluentinc/librdkafka/blob/13a2bbae88d8be073986f974db1b853cfc91ae8f/src/rdkafka_msg.c#L710-L837