diff --git a/src/main/java/kafdrop/config/MessageFormatConfiguration.java b/src/main/java/kafdrop/config/MessageFormatConfiguration.java index e43e90cf..290a7653 100644 --- a/src/main/java/kafdrop/config/MessageFormatConfiguration.java +++ b/src/main/java/kafdrop/config/MessageFormatConfiguration.java @@ -31,4 +31,26 @@ public void setFormat(MessageFormat format) { this.format = format; } } + + @Component + @ConfigurationProperties(prefix = "key") + public static final class KeyFormatProperties { + private MessageFormat format; + + @PostConstruct + public void init() { + // Set a default message format if not configured. + if (format == null) { + format = MessageFormat.DEFAULT; + } + } + + public MessageFormat getFormat() { + return format; + } + + public void setFormat(MessageFormat format) { + this.format = format; + } + } } diff --git a/src/main/java/kafdrop/controller/MessageController.java b/src/main/java/kafdrop/controller/MessageController.java index 949f47e0..deccb4ff 100644 --- a/src/main/java/kafdrop/controller/MessageController.java +++ b/src/main/java/kafdrop/controller/MessageController.java @@ -28,6 +28,7 @@ import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import kafdrop.config.MessageFormatConfiguration; import kafdrop.util.*; import org.springframework.http.MediaType; import org.springframework.stereotype.Controller; @@ -62,19 +63,19 @@ public final class MessageController { private final MessageInspector messageInspector; private final MessageFormatProperties messageFormatProperties; - private final MessageFormatProperties keyFormatProperties; + private final MessageFormatConfiguration.KeyFormatProperties keyFormatProperties; private final SchemaRegistryProperties schemaRegistryProperties; private final ProtobufDescriptorProperties protobufProperties; - public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { + public MessageController(KafkaMonitor kafkaMonitor, MessageInspector messageInspector, MessageFormatProperties messageFormatProperties, MessageFormatConfiguration.KeyFormatProperties keyFormatProperties, SchemaRegistryProperties schemaRegistryProperties, ProtobufDescriptorProperties protobufProperties) { this.kafkaMonitor = kafkaMonitor; this.messageInspector = messageInspector; this.messageFormatProperties = messageFormatProperties; this.keyFormatProperties = keyFormatProperties; this.schemaRegistryProperties = schemaRegistryProperties; - this.protobufProperties = protobufProperties; + this.protobufProperties = protobufProperties; } /** @@ -312,9 +313,9 @@ public static class PartitionOffsetInfo { private MessageFormat format; private MessageFormat keyFormat; - + private String descFile; - + private String msgTypeName; public PartitionOffsetInfo(int partition, long offset, long count, MessageFormat format) { diff --git a/src/main/java/kafdrop/util/AvroMessageDeserializer.java b/src/main/java/kafdrop/util/AvroMessageDeserializer.java index b01ea072..b66b8c18 100644 --- a/src/main/java/kafdrop/util/AvroMessageDeserializer.java +++ b/src/main/java/kafdrop/util/AvroMessageDeserializer.java @@ -4,6 +4,7 @@ import java.nio.*; import java.util.*; +import java.util.stream.Collectors; public final class AvroMessageDeserializer implements MessageDeserializer { @@ -12,7 +13,7 @@ public final class AvroMessageDeserializer implements MessageDeserializer { public AvroMessageDeserializer(String topicName, String schemaRegistryUrl, String schemaRegistryAuth) { this.topicName = topicName; - this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth); + this.deserializer = getDeserializer(schemaRegistryUrl, schemaRegistryAuth, topicName); } @Override @@ -22,15 +23,33 @@ public String deserializeMessage(ByteBuffer buffer) { return deserializer.deserialize(topicName, bytes).toString(); } - private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth) { + private static KafkaAvroDeserializer getDeserializer(String schemaRegistryUrl, String schemaRegistryAuth, String topicName) { final var config = new HashMap(); config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); if (schemaRegistryAuth != null) { config.put(AbstractKafkaSchemaSerDeConfig.BASIC_AUTH_CREDENTIALS_SOURCE, "USER_INFO"); config.put(AbstractKafkaSchemaSerDeConfig.USER_INFO_CONFIG, schemaRegistryAuth); } + setConfigFromEnvIfAvailable(topicName, AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, config); final var kafkaAvroDeserializer = new KafkaAvroDeserializer(); kafkaAvroDeserializer.configure(config, false); return kafkaAvroDeserializer; } + + private static void setConfigFromEnvIfAvailable(String topicName, String configPath, Map config){ + String configPrefix = "SCHEMA_REGISTRY"; + String topicScopedEnvPath = Arrays.stream(new String[]{configPrefix, configPath.replaceAll("\\.", "_"), topicName.replaceAll("-", "_") } ) + .map(String::toUpperCase).collect(Collectors.joining("_")); + + String noTopicScopedEnvPath = Arrays.stream(new String[]{ configPrefix, configPath.replaceAll("\\.", "_") }) + .map(String::toUpperCase).collect(Collectors.joining("_")); + + for(String envPath : new String[]{topicScopedEnvPath, noTopicScopedEnvPath}) { + + String namingStrategyValue = System.getenv(envPath); + if (namingStrategyValue != null) { + config.put(configPath, namingStrategyValue); + } + } + } }