This project includes Apache Kafka Connect source and sink connectors for IBM Cloudant.
These connectors can stream events:
- from Cloudant (source connector) to Kafka topic(s)
- to Cloudant (sink connector) from Kafka topic(s)
Note: the connectors are also compatible with Apache CouchDB.
Release status: Experimental.
Note: the instructions below assume an installation of Kafka at `$KAFKA_HOME`.
- Kafka Connect 4.x requires Java 17.
- Connection to a Kafka broker with minimum version 2.1.
- Download the zip from the releases page. The zip file contains the plugin jar and the non-Kafka dependencies needed to run. The zip file is signed and the signature can be verified by running the `jarsigner -verify cloudant-kafka-connector-x.y.z.zip` command.
- Configure the Kafka Connect plugin path for your Kafka distribution, for example: `plugin.path=/kafka/connect`.
  - This will be configured in either `connect-standalone.properties` or `connect-distributed.properties` in the `config` directory of your Kafka installation.
  - If you're not sure which to use, edit `connect-standalone.properties` and follow the standalone execution instructions below.
- Unzip the release and move it to the plugin path configured earlier, for example: `unzip cloudant-kafka-connector-x.y.z.zip; mv cloudant-kafka-connector-x.y.z /kafka/connect` (see also the example session after this list).
- Edit the source or sink example properties files and save them to the `config` directory of your Kafka installation.
- Start Kafka.
- Start the connector (see below).
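Putting those steps together, an install might look like the following shell session. This is a sketch only: the release file name `cloudant-kafka-connector-x.y.z.zip` and the plugin path `/kafka/connect` are the placeholders used in the steps above, and the last command assumes `plugin.path` is not already set in your worker properties.

```sh
# Verify the signature on the downloaded release zip
jarsigner -verify cloudant-kafka-connector-x.y.z.zip

# Unzip the plugin and move it into the plugin path
unzip cloudant-kafka-connector-x.y.z.zip
mv cloudant-kafka-connector-x.y.z /kafka/connect

# Point the Kafka Connect worker at the plugin path
# (edit connect-distributed.properties instead if you run in distributed mode)
echo "plugin.path=/kafka/connect" >> $KAFKA_HOME/config/connect-standalone.properties
```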
Connector execution in Kafka is available through scripts in the Kafka install path:
`$KAFKA_HOME/bin/connect-standalone.sh` or `$KAFKA_HOME/bin/connect-distributed.sh`
Use the appropriate configuration files for standalone or distributed execution with Cloudant as source, as sink, or both.
For example:
- standalone execution with Cloudant changes feed as source:
  `$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-cloudant-source.properties`
- standalone execution with Cloudant as sink:
  `$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-cloudant-sink.properties`
- standalone execution with multiple configurations, one using Cloudant as source and one using Cloudant as sink:
  `$KAFKA_HOME/bin/connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/connect-cloudant-source.properties $KAFKA_HOME/config/connect-cloudant-sink.properties`
Any number of connector configurations can be passed to the executing script.
As outlined above, the Cloudant Kafka connector can be configured in standalone or distributed mode according to the Kafka Connect documentation.
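In distributed mode the worker is started once and connectors are then created through the Kafka Connect REST API rather than by passing properties files on the command line. A minimal sketch, assuming the worker's REST interface is listening on the default `localhost:8083`; the connector name, class, and properties in the JSON body are illustrative only, so take the real values from the example properties files and the configuration reference:

```sh
# Start a distributed worker
$KAFKA_HOME/bin/connect-distributed.sh $KAFKA_HOME/config/connect-distributed.properties

# Create a connector instance via the Connect REST API (illustrative configuration)
curl -X POST -H "Content-Type: application/json" \
  --data '{"name": "cloudant-source-example",
           "config": {"connector.class": "com.ibm.cloud.cloudant.kafka.SourceChangesConnector",
                      "topics": "example-topic"}}' \
  http://localhost:8083/connectors
```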
The `connect-standalone` or `connect-distributed` configuration files contain default values which are necessary for all connectors, such as:
- `bootstrap.servers`
- `offset.storage.file.filename` (only if using a standalone worker)
- `offset.flush.interval.ms`
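For reference, the stock standalone worker properties shipped with Kafka set these along the lines of the snippet below; the broker address, offsets file, and flush interval shown are just the shipped example values, not requirements of this connector.

```properties
bootstrap.servers=localhost:9092
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
```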
The `cloudant-changes-source-example` and `cloudant-sink-example` properties files contain the minimum required to get started.
For a full reference explaining all the connector options, see here (source) and
here (sink).
In order to read from or write to Cloudant, some authentication properties need to be configured. These properties are common to both the source and sink connector, and are detailed in the configuration reference, linked above.
A number of different authentication methods are supported. IBM Cloud IAM-based authentication methods are recommended and the default is to use an IAM API key. See locating your service credentials for details on how to find your IAM API key.
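As a sketch only, an IAM API key configuration in a connector properties file might look like the lines below; the property names and URL shown are assumptions for illustration, so confirm the exact names against the configuration reference linked above.

```properties
# Illustrative only: check the configuration reference for the exact property names
cloudant.url=https://<your-instance>.cloudantnosqldb.appdomain.cloud
cloudant.apikey=<your IAM API key>
```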
Also present in the connect-standalone or connect-distributed configuration files are defaults for key and value conversion, which are as follows:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
Depending on your needs, you may need to change these converter settings. For instance, in the sample configuration files, value schemas are disabled on the assumption that users will read and write events which are "raw" JSON and do not have inline schemas.
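For example, to match the sample configuration files and treat event values as raw JSON without inline schemas, you could override the worker defaults like this (key conversion left at its default here):

```properties
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
```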
For the source connector:
- Keys are produced as an `org.apache.kafka.connect.data.Struct` containing the following fields (see the example after this list):
  - `_id`: the original Cloudant document ID
  - `cloudant.db`: the name of the Cloudant database the event originated from
  - `cloudant.url`: the URL of the Cloudant instance the event originated from
- Values are produced as a (schemaless) `java.util.Map<String, Object>`.
- These types are compatible with the default `org.apache.kafka.connect.json.JsonConverter` and should be compatible with any other converter that can accept a `Struct` or `Map`.
- `schemas.enable` may be safely used with a `key.converter` if desired.
- The source connector does not generate schemas for the event values by default. To use `schemas.enable` with the `value.converter`, consider using a schema registry or the `MapToStruct` SMT.
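As a purely hypothetical illustration, a change to a document `doc1` in a database named `example-db` would surface as an event whose key fields and value look roughly like this once rendered to JSON (document contents and URL invented for the example):

```
key   -> {"_id": "doc1", "cloudant.db": "example-db", "cloudant.url": "https://<your-instance>.cloudantnosqldb.appdomain.cloud"}
value -> {"_id": "doc1", "_rev": "1-...", "greeting": "hello"}
```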
For the sink connector:
- Kafka keys are currently ignored; therefore the key converter settings are not relevant.
- We assume that the values in Kafka are serialized JSON objects, and therefore `JsonConverter` is supported. If your values contain a schema (`{"schema": {...}, "payload": {...}}`), then set `value.converter.schemas.enable=true`, otherwise set `value.converter.schemas.enable=false`. Any other converter that converts the message values into `org.apache.kafka.connect.data.Struct` or `java.util.Map` types should also work. However, note that the subsequent serialization of `Map` or `Struct` values to JSON documents in the sink may not match expectations if a schema has not been provided.
- Inserting only a single revision of any `_id` is currently supported. This means the sink connector cannot update or delete documents.
- The `_rev` field in event values is preserved. To remove `_rev` during data flow, use the `ReplaceField` SMT (see the example after this list).
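For instance, a sink connector configuration could drop `_rev` with Kafka's built-in `ReplaceField` transform, along the lines of this sketch (the transform alias `removeRev` is arbitrary; older Kafka releases call the `exclude` option `blacklist`):

```properties
transforms=removeRev
transforms.removeRev.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.removeRev.exclude=_rev
```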
Note: The ID of each document written to Cloudant by the sink connector is chosen from the following sources, in order of precedence:
- The value of the `cloudant_doc_id` header on the event, which will overwrite the `_id` field if it already exists. The Mapping Document IDs section of the SMT reference shows an example of how to use this header to set the ID based on the event key.
- The value of the `_id` field in the JSON.
- If no other non-null or non-empty value is available, the document will be created with a new UUID.
A number of SMTs (Single Message Transforms) have been provided as part of the library to customize fields or values of events during data flow.
See the SMT reference for an overview of how to use these and Kafka built-in SMTs for common use cases.
INFO level logging is configured by default to the console. To change log levels or settings, edit `$KAFKA_HOME/config/connect-log4j.properties` and add log settings like `log4j.logger.com.ibm.cloud.cloudant.kafka=DEBUG, stdout`.