Commit 5184001

Merge pull request #4 from confluentinc/value_bytes
Py3: use bytes for Message payload and key
2 parents: d62630d + 99b6413

4 files changed: +80, -27 lines

README.md

Lines changed: 68 additions & 16 deletions
@@ -1,40 +1,96 @@
 Confluent's Apache Kafka client for Python
 ==========================================
 
+Confluent's Kafka client for Python wraps the librdkafka C library, providing
+full Kafka protocol support with great performance and reliability.
 
-Prerequisites
-===============
+The Python bindings provide a high-level Producer and Consumer with support
+for the balanced consumer groups of Apache Kafka 0.9.
 
-librdkafka >=0.9.1 (or master>=2016-04-13)
-py.test (pip install pytest)
+See the [API documentation](http://docs.confluent.io/current/clients/confluent-kafka-python/index.html) for more info.
 
+**License**: [Apache License v2.0](http://www.apache.org/licenses/LICENSE-2.0)
 
-Build
+
+Usage
 =====
 
-python setup.by build
+**Producer:**
+
+    from confluent_kafka import Producer
+
+    p = Producer({'bootstrap.servers': 'mybroker,mybroker2'})
+    for data in some_data_source:
+        p.produce('mytopic', data.encode('utf-8'))
+    p.flush()
+
+
+**High-level Consumer:**
+
+    from confluent_kafka import Consumer
+
+    c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup',
+                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
+    c.subscribe(['mytopic'])
+    while running:
+        msg = c.poll()
+        if not msg.error():
+            print('Received message: %s' % msg.value().decode('utf-8'))
+    c.close()
+
+
+
+See [examples](examples) for more examples.
+
+
+
+Prerequisites
+=============
+
+ * Python >= 2.7 or Python 3.x
+ * [librdkafka](https://github.com/edenhill/librdkafka) >= 0.9.1
 
 
 
 Install
 =======
-Preferably in a virtualenv:
+
+**Install from PyPI:**
+
+    pip install confluent-kafka
+
+
+**Install from source / tarball:**
 
     pip install .
 
 
-Run unit-tests
-==============
+Build
+=====
+
+    python setup.py build
+
+
+
+
+Tests
+=====
+
+
+**Run unit-tests:**
 
     py.test
 
+**NOTE**: Requires py.test, install by `pip install pytest`
 
-Run integration tests
-=====================
-**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.
+
+**Run integration tests:**
 
     examples/integration_test.py <kafka-broker>
 
+**WARNING**: These tests require an active Kafka cluster and will make use of a topic named 'test'.
+
+
 
 
 Generate documentation
@@ -51,7 +107,3 @@ or:
 Documentation will be generated in `docs/_build/`
 
 
-Examples
-========
-
-See [examples](examples)

confluent_kafka/src/Producer.c

Lines changed: 2 additions & 2 deletions
@@ -370,8 +370,8 @@ static PyMethodDef Producer_methods[] = {
   "message has been succesfully delivered or permanently fails delivery.\n"
   "\n"
   " :param str topic: Topic to produce message to\n"
-  " :param str value: Message payload\n"
-  " :param str key: Message key\n"
+  " :param str|bytes value: Message payload\n"
+  " :param str|bytes key: Message key\n"
   " :param int partition: Partition to produce to, elses uses the "
   "configured partitioner.\n"
   " :param func on_delivery(err,msg): Delivery report callback to call "

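The docstring change above documents that produce() now accepts either str or bytes for the message payload and key. A minimal sketch of the call from the Python side, assuming the docstring's parameter names (value, key) are usable as keyword arguments and 'mybroker' is a placeholder broker address:

    from confluent_kafka import Producer

    p = Producer({'bootstrap.servers': 'mybroker'})

    # The payload and key may be passed as bytes ...
    p.produce('mytopic', value=b'\x00\x01raw-payload', key=b'binary-key')

    # ... while str still works, e.g. a text key alongside an explicitly
    # encoded payload, as in the README example above.
    p.produce('mytopic', value='plain text'.encode('utf-8'), key='text-key')

    # Wait for any outstanding delivery reports before exiting.
    p.flush()
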
confluent_kafka/src/confluent_kafka.c

Lines changed: 5 additions & 5 deletions
@@ -339,12 +339,12 @@ static PyMethodDef Message_methods[] = {
 
   { "value", (PyCFunction)Message_value, METH_NOARGS,
     " :returns: message value (payload) or None if not available.\n"
-    " :rtype: str or None\n"
+    " :rtype: str|bytes or None\n"
     "\n"
   },
   { "key", (PyCFunction)Message_key, METH_NOARGS,
     " :returns: message key or None if not available.\n"
-    " :rtype: str or None\n"
+    " :rtype: str|bytes or None\n"
     "\n"
   },
   { "topic", (PyCFunction)Message_topic, METH_NOARGS,
@@ -486,10 +486,10 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
         self->topic = cfl_PyUnistr(
                 _FromString(rd_kafka_topic_name(rkm->rkt)));
     if (rkm->payload)
-        self->value = cfl_PyUnistr(_FromStringAndSize(rkm->payload,
-                                                      rkm->len));
+        self->value = cfl_PyBin(_FromStringAndSize(rkm->payload,
+                                                   rkm->len));
     if (rkm->key)
-        self->key = cfl_PyUnistr(
+        self->key = cfl_PyBin(
                 _FromStringAndSize(rkm->key, rkm->key_len));
 
     self->partition = rkm->partition;
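
The Message_new0() change above is what makes msg.value() and msg.key() come back as bytes on Python 3 (and plain str byte strings on Python 2), so text payloads have to be decoded by the caller, exactly as the README consumer example already does. A short consumer-side sketch under that assumption, with placeholder broker, group and topic names:

    from confluent_kafka import Consumer

    c = Consumer({'bootstrap.servers': 'mybroker', 'group.id': 'mygroup'})
    c.subscribe(['mytopic'])

    msg = c.poll()
    if not msg.error():
        raw = msg.value()   # bytes on Python 3 after this change
        key = msg.key()     # likewise bytes, or None if the message has no key
        print('key=%r value=%s' % (key, raw.decode('utf-8')))
    c.close()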

confluent_kafka/src/confluent_kafka.h

Lines changed: 5 additions & 4 deletions
@@ -41,12 +41,12 @@
  *
  ****************************************************************************/
 
-#ifdef PY3
+#ifdef PY3 /* Python 3 */
 /**
  * @brief Binary type, use as cfl_PyBin(_X(A,B)) where _X() is the type-less
- *        suffix of a PyBinary/Str_X() function
+ *        suffix of a PyBytes/Str_X() function
  */
-#define cfl_PyBin(X) PyBinary ## X
+#define cfl_PyBin(X) PyBytes ## X
 
 /**
  * @brief Unicode type, same usage as PyBin()
@@ -62,7 +62,8 @@
  * @returns Unicode Python string object
  */
 #define cfl_PyObject_Unistr(X) PyObject_Str(X)
-#else
+
+#else /* Python 2 */
 
 /* See comments above */
 #define cfl_PyBin(X) PyString ## X
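
With cfl_PyBin() mapping to PyBytes_* on Python 3 and PyString_* on Python 2, the visible difference is only the Python-level type of the returned payload: bytes on 3, str (already a byte string) on 2. A hypothetical helper, purely illustrative and not part of the client, that normalizes a message payload to text on either interpreter:

    def message_text(msg, encoding='utf-8'):
        """Return the message payload as text on Python 2 and Python 3.

        msg.value() is built via cfl_PyBin: PyBytes_* on Python 3 (bytes)
        and PyString_* on Python 2 (str, a byte string), so a plain
        decode() covers both cases.
        """
        value = msg.value()
        if value is None:
            return None
        return value.decode(encoding)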
