2 changes: 1 addition & 1 deletion java/avro/pom.xml
@@ -27,7 +27,7 @@
     </parent>

     <artifactId>azure-schemaregistry-kafka-avro</artifactId>
-    <version>1.1.0-beta.1</version>
+    <version>1.1.0-beta.2</version>
     <name>azure-schemaregistry-kafka-avro</name>
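For downstream projects, the bump is a one-line dependency change. A sketch of the consuming pom.xml entry, assuming the artifact keeps its existing com.microsoft.azure group ID:

<dependency>
    <groupId>com.microsoft.azure</groupId>
    <artifactId>azure-schemaregistry-kafka-avro</artifactId>
    <version>1.1.0-beta.2</version>
</dependency>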


KafkaAvroDeserializer.java
@@ -11,7 +11,6 @@
 import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
 import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
 import java.util.Map;
@@ -50,14 +49,14 @@ public void configure(Map<String, ?> props, boolean isKey) {
     this.config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);

     this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
-            .schemaRegistryClient(
-                    new SchemaRegistryClientBuilder()
-                            .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
-                            .credential(this.config.getCredential())
-                            .clientOptions(new ClientOptions().setApplicationId("java-avro-kafka-des-1.0"))
-                            .buildAsyncClient())
-            .avroSpecificReader(this.config.getAvroSpecificReader())
-            .buildSerializer();
+        .schemaRegistryClient(
+            new SchemaRegistryClientBuilder()
+                .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
+                .credential(this.config.getCredential())
+                .clientOptions(new ClientOptions().setApplicationId("java-avro-kafka-des-1.0"))
+                .buildAsyncClient())
+        .avroSpecificReader(this.config.getAvroSpecificReader())
+        .buildSerializer();
 }

 /**
@@ -68,7 +67,7 @@ public void configure(Map<String, ?> props, boolean isKey) {
  */
 @Override
 public T deserialize(String topic, byte[] bytes) {
-    return null;
+    return deserialize(topic, null, bytes);
 }

 /**
@@ -81,15 +80,13 @@ public T deserialize(String topic, byte[] bytes) {
 @Override
 public T deserialize(String topic, Headers headers, byte[] bytes) {
     MessageContent message = new MessageContent();
-    message.setBodyAsBinaryData(BinaryData.fromBytes(bytes));
-
-    Header contentTypeHeader = headers.lastHeader("content-type");
-    if (contentTypeHeader != null) {
-        message.setContentType(new String(contentTypeHeader.value()));
-    } else {
-        message.setContentType("");
-    }
-
+    byte length = bytes[0];
+    byte[] contentTypeHeaderBytes = new byte[length];
+    byte[] body = new byte[bytes.length - 1 - length];
+    System.arraycopy(bytes, 1, contentTypeHeaderBytes, 0, contentTypeHeaderBytes.length);
+    System.arraycopy(bytes, 1 + length, body, 0, body.length);
+    message.setBodyAsBinaryData(BinaryData.fromBytes(body));
+    message.setContentType(new String(contentTypeHeaderBytes));
     return (T) this.serializer.deserialize(
         message,
         TypeReference.createInstance(this.config.getAvroSpecificType()));
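This hunk replaces the content-type Kafka record header with a self-contained value layout: one length byte, then the content-type string (which carries the schema reference), then the Avro body. Note that the single signed length byte caps the content type at 127 bytes. A minimal round-trip sketch of that frame layout; the class and variable names are illustrative, not part of this PR:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public final class FrameLayoutSketch {
    public static void main(String[] args) {
        String contentType = "avro/binary+<schema-guid>";  // placeholder content type
        byte[] body = new byte[] {1, 2, 3};                // stand-in for the Avro payload

        // Frame: [length byte][content-type bytes][body bytes], as in serialize().
        byte[] ct = contentType.getBytes(StandardCharsets.UTF_8);
        byte[] framed = new byte[1 + ct.length + body.length];
        framed[0] = (byte) ct.length;
        System.arraycopy(ct, 0, framed, 1, ct.length);
        System.arraycopy(body, 0, framed, 1 + ct.length, body.length);

        // Unframe: read the length byte back and split, as in deserialize().
        int length = framed[0];
        String recoveredType = new String(framed, 1, length, StandardCharsets.UTF_8);
        byte[] recoveredBody = Arrays.copyOfRange(framed, 1 + length, framed.length);

        System.out.println(recoveredType.equals(contentType));   // true
        System.out.println(Arrays.equals(recoveredBody, body));  // true
    }
}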
KafkaAvroKStreamDeserializer.java
@@ -0,0 +1,107 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.azure.schemaregistry.kafka.avro;

import com.azure.core.models.MessageContent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

/**
 * Deserializer implementation for Kafka consumer, implementing Kafka Deserializer interface.
 *
 * Byte arrays are converted into Java objects by using the schema referenced by GUID prefix to deserialize the payload.
 *
 * Receiving Avro GenericRecords and SpecificRecords is supported. Avro reflection capabilities have been disabled on
 * com.azure.schemaregistry.kafka.KafkaAvroSerializer.
 *
 * @see KafkaAvroSerializer See serializer class for upstream serializer implementation
 */
public class KafkaAvroKStreamDeserializer<T extends IndexedRecord> implements Deserializer<T> {
    private SchemaRegistryApacheAvroSerializer serializer;
    private KafkaAvroDeserializerConfig config;

    /**
     * Empty constructor used by Kafka consumer
     */
    public KafkaAvroKStreamDeserializer() {
        super();
    }

    /**
     * Configures deserializer instance.
     *
     * @param props Map of properties used to configure instance
     * @param isKey Indicates if deserializing record key or value. Required by Kafka deserializer interface,
     *              no specific functionality has been implemented for key use.
     *
     * @see KafkaAvroDeserializerConfig Deserializer will use configs found in here and inherited classes.
     */
    public void configure(Map<String, ?> props, boolean isKey) {
        this.config = new KafkaAvroDeserializerConfig((Map<String, Object>) props);

        this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
            .schemaRegistryClient(
                new SchemaRegistryClientBuilder()
                    .fullyQualifiedNamespace(this.config.getSchemaRegistryUrl())
                    .credential(this.config.getCredential())
                    .clientOptions(new ClientOptions().setApplicationId("java-avro-kafka-des-1.0"))
                    .buildAsyncClient())
            .avroSpecificReader(this.config.getAvroSpecificReader())
            .buildSerializer();
    }

    /**
     * Deserializes byte array into Java object
     * @param topic topic associated with the record bytes
     * @param bytes serialized bytes, may be null
     * @return deserialized object, may be null
     */
    @Override
    public T deserialize(String topic, byte[] bytes) {
        // Delegate to the Headers overload, mirroring KafkaAvroDeserializer.
        return deserialize(topic, null, bytes);
    }

    /**
     * Deserializes byte array into Java object
     * @param topic topic associated with the record bytes
     * @param headers record headers, may be null
     * @param bytes serialized bytes, may be null
     * @return deserialized object, may be null
     */
    @Override
    public T deserialize(String topic, Headers headers, byte[] bytes) {
        if (bytes == null) {
            // Kafka hands tombstone records to the deserializer as null payloads.
            return null;
        }
        MessageContent message = new MessageContent();
        byte length = bytes[0];
        byte[] contentTypeHeaderBytes = new byte[length];
        byte[] body = new byte[bytes.length - 1 - length];
        System.arraycopy(bytes, 1, contentTypeHeaderBytes, 0, contentTypeHeaderBytes.length);
        System.arraycopy(bytes, 1 + length, body, 0, body.length);
        message.setBodyAsBinaryData(BinaryData.fromBytes(body));
        message.setContentType(new String(contentTypeHeaderBytes));
        return (T) this.serializer.deserialize(
            message,
            TypeReference.createInstance(this.config.getAvroSpecificType()));
    }

    @Override
    public void close() { }
}
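For context, a hedged sketch of wiring this deserializer into a plain consumer. The schema-registry property key below is an assumption about what KafkaAvroDeserializerConfig reads, since that class is not shown in this diff:

import java.util.Collections;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroKStreamDeserializer;

public final class ConsumerWiringSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "<namespace>.servicebus.windows.net:9093");
        props.put("group.id", "demo-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", KafkaAvroKStreamDeserializer.class.getName());
        props.put("schema.registry.url", "<namespace>.servicebus.windows.net"); // assumed config key

        try (KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            // poll loop elided; credential configuration also required in practice
        }
    }
}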
KafkaAvroKStreamSerializer.java
@@ -0,0 +1,123 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.azure.schemaregistry.kafka.avro;

import com.azure.core.models.MessageContent;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryClientBuilder;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializer;
import com.azure.data.schemaregistry.apacheavro.SchemaRegistryApacheAvroSerializerBuilder;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Serializer implementation for Kafka producer, implementing the Kafka Serializer interface.
 *
 * Objects are converted to byte arrays containing an Avro-encoded payload prefixed with a GUID pointing
 * to the matching Avro schema in Azure Schema Registry.
 *
 * Currently, sending Avro GenericRecords and SpecificRecords is supported. Avro reflection has been disabled.
 *
 * @see KafkaAvroDeserializer See deserializer class for downstream deserializer implementation
 */
public class KafkaAvroKStreamSerializer<T> implements Serializer<T> {
    private SchemaRegistryApacheAvroSerializer serializer;

    /**
     * Empty constructor for Kafka producer
     */
    public KafkaAvroKStreamSerializer() {
        super();
    }

    /**
     * Configures serializer instance.
     *
     * @param props Map of properties used to configure instance.
     * @param isKey Indicates if serializing record key or value. Required by Kafka serializer interface,
     *              no specific functionality implemented for key use.
     *
     * @see KafkaAvroSerializerConfig Serializer will use configs found in KafkaAvroSerializerConfig.
     */
    @Override
    public void configure(Map<String, ?> props, boolean isKey) {
        KafkaAvroSerializerConfig config = new KafkaAvroSerializerConfig((Map<String, Object>) props);

        this.serializer = new SchemaRegistryApacheAvroSerializerBuilder()
            .schemaRegistryClient(new SchemaRegistryClientBuilder()
                .fullyQualifiedNamespace(config.getSchemaRegistryUrl())
                .credential(config.getCredential())
                .clientOptions(new ClientOptions().setApplicationId("java-avro-kafka-ser-1.0"))
                .buildAsyncClient())
            .schemaGroup(config.getSchemaGroup())
            .autoRegisterSchemas(config.getAutoRegisterSchemas())
            .buildSerializer();
    }


    /**
     * Serializes GenericRecord or SpecificRecord into a byte array, containing a GUID reference to schema
     * and the encoded payload.
     *
     * Null behavior matches Kafka treatment of null values.
     *
     * @param topic Topic destination for record. Required by Kafka serializer interface, currently not used.
     * @param record Object to be serialized, may be null
     * @return byte[] payload for sending to EH Kafka service, may be null
     * @throws SerializationException Exception catchable by core Kafka producer code
     */
    @Override
    public byte[] serialize(String topic, T record) {
        // Delegate to the Headers overload, mirroring KafkaAvroSerializer.
        return serialize(topic, null, record);
    }

    /**
     * Serializes GenericRecord or SpecificRecord into a byte array, containing a GUID reference to schema
     * and the encoded payload.
     *
     * Null behavior matches Kafka treatment of null values.
     *
     * @param topic Topic destination for record. Required by Kafka serializer interface, currently not used.
     * @param headers Record headers, may be null
     * @param record Object to be serialized, may be null
     * @return byte[] payload for sending to EH Kafka service, may be null
     * @throws SerializationException Exception catchable by core Kafka producer code
     */
    @Override
    public byte[] serialize(String topic, Headers headers, T record) {
        // null needs to be treated specially since the client most likely just wants to send
        // an individual null value instead of making the subject a null type. Also, null in
        // Kafka has a special meaning for deletion in a topic with the compact retention policy.
        // Therefore, we will bypass schema registration and return a null value in Kafka, instead
        // of an Avro-encoded null.
        if (record == null) {
            return null;
        }
        MessageContent message = this.serializer.serialize(record, TypeReference.createInstance(MessageContent.class));
        byte[] contentTypeHeaderBytes = message.getContentType().getBytes();
        byte[] body = message.getBodyAsBinaryData().toBytes();
        byte[] bytes = new byte[1 + contentTypeHeaderBytes.length + body.length];
        bytes[0] = (byte) contentTypeHeaderBytes.length;
        System.arraycopy(contentTypeHeaderBytes, 0, bytes, 1, contentTypeHeaderBytes.length);
        System.arraycopy(body, 0, bytes, 1 + contentTypeHeaderBytes.length, body.length);
        return bytes;
    }

    @Override
    public void close() { }
}
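The null pass-through described in the comment above can be exercised without any Schema Registry connectivity, because the null check fires before the inner serializer is touched; a small sketch:

import org.apache.avro.generic.GenericRecord;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroKStreamSerializer;

public final class TombstoneSketch {
    public static void main(String[] args) {
        KafkaAvroKStreamSerializer<GenericRecord> serializer = new KafkaAvroKStreamSerializer<>();
        // configure(...) deliberately skipped: a null record short-circuits
        // before the registry-backed serializer would be used.
        byte[] value = serializer.serialize("some-topic", null);
        System.out.println(value == null);  // true: Kafka sees a tombstone, not an encoded null
    }
}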
KafkaAvroSerializer.java
@@ -73,7 +73,7 @@ public void configure(Map<String, ?> props, boolean isKey) {
  */
 @Override
 public byte[] serialize(String topic, T record) {
-    return null;
+    return serialize(topic, null, record);
 }

 /**
@@ -98,11 +98,14 @@ public byte[] serialize(String topic, Headers headers, T record) {
     if (record == null) {
         return null;
     }
-
     MessageContent message = this.serializer.serialize(record, TypeReference.createInstance(MessageContent.class));
     byte[] contentTypeHeaderBytes = message.getContentType().getBytes();
-    headers.add("content-type", contentTypeHeaderBytes);
-    return message.getBodyAsBinaryData().toBytes();
+    byte[] body = message.getBodyAsBinaryData().toBytes();
+    byte[] bytes = new byte[1 + contentTypeHeaderBytes.length + body.length];
+    bytes[0] = (byte) contentTypeHeaderBytes.length;
+    System.arraycopy(contentTypeHeaderBytes, 0, bytes, 1, contentTypeHeaderBytes.length);
+    System.arraycopy(body, 0, bytes, 1 + contentTypeHeaderBytes.length, body.length);
+    return bytes;
 }

 @Override
package-info.java
@@ -1,4 +1,4 @@
 /**
  * Package containing Avro-specific implementations of Kafka serializer and deserializer.
  */
-package com.microsoft.azure.schemaregistry.kafka.avro;
\ No newline at end of file
+package com.microsoft.azure.schemaregistry.kafka.avro;
GenericAvroKStreamSerde.java
@@ -0,0 +1,51 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.microsoft.azure.schemaregistry.kafka.avro.serde;

import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroKStreamDeserializer;
import com.microsoft.azure.schemaregistry.kafka.avro.KafkaAvroKStreamSerializer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

/**
 * Serde (Serializer / Deserializer) class for Kafka Streams library compatible with Azure Schema Registry.
 */
public class GenericAvroKStreamSerde implements Serde<GenericRecord> {

    private final Serde<GenericRecord> inner;

    /**
     * Empty constructor used with Kafka Streams
     */
    public GenericAvroKStreamSerde() {
        inner = Serdes.serdeFrom(
            new KafkaAvroKStreamSerializer<GenericRecord>(),
            new KafkaAvroKStreamDeserializer<GenericRecord>());
    }

    @Override
    public Serializer<GenericRecord> serializer() {
        return inner.serializer();
    }

    @Override
    public Deserializer<GenericRecord> deserializer() {
        return inner.deserializer();
    }

    @Override
    public void configure(final Map<String, ?> serdeConfig, final boolean isSerdeForRecordKeys) {
        inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    @Override
    public void close() {
        inner.serializer().close();
        inner.deserializer().close();
    }
}
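Finally, a sketch of dropping the serde into a Kafka Streams topology. Topic names and the schema-registry key are illustrative assumptions; the serde simply forwards the streams config map to both the serializer's and deserializer's configure methods, as shown above:

import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.microsoft.azure.schemaregistry.kafka.avro.serde.GenericAvroKStreamSerde;

public final class StreamsWiringSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "avro-serde-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<namespace>.servicebus.windows.net:9093");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class);
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroKStreamSerde.class);
        props.put("schema.registry.url", "<namespace>.servicebus.windows.net"); // assumed config key

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> stream = builder.stream("input-topic");
        stream.to("output-topic"); // pass-through topology, just to exercise the serde
        new KafkaStreams(builder.build(), props).start();
    }
}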