diff --git a/connector/src/main/java/io/questdb/kafka/Templating.java b/connector/src/main/java/io/questdb/kafka/Templating.java index 353c3e2..1de4989 100644 --- a/connector/src/main/java/io/questdb/kafka/Templating.java +++ b/connector/src/main/java/io/questdb/kafka/Templating.java @@ -51,6 +51,11 @@ private Templating() { partials.add(record -> record.key() == null ? "null" : record.key().toString()); break; } + case "partition": { + // assumption: sink records always have a non-null kafkaPartition() + partials.add(record -> String.valueOf(record.kafkaPartition())); + break; + } default: { throw new ConnectException("Unknown template in table name, table template: '" + template + "'"); } diff --git a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java index e3cb604..8d36948 100644 --- a/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java +++ b/connector/src/test/java/io/questdb/kafka/QuestDBSinkConnectorEmbeddedTest.java @@ -198,6 +198,44 @@ public void testTableTemplateWithKey_withSchema(boolean useHttp) { httpPort); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTableTemplateWithKeyAndPartition_withSchema(boolean useHttp) { + connect.kafka().createTopic(topicName, 3); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}_${partition}"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + Schema schema = SchemaBuilder.struct().name("com.example.Person") + .field("firstname", Schema.STRING_SCHEMA) + .field("lastname", Schema.STRING_SCHEMA) + .field("age", Schema.INT8_SCHEMA) + .build(); + + Struct john = new Struct(schema) + .put("firstname", "John") + .put("lastname", "Doe") + .put("age", (byte) 42); + + Struct jane = new Struct(schema) + .put("firstname", "Jane") + .put("lastname", "Doe") + .put("age", (byte) 41); + + connect.kafka().produce(topicName, 1, "john", new String(converter.fromConnectData(topicName, schema, john))); + connect.kafka().produce(topicName, 2, "jane", new String(converter.fromConnectData(topicName, schema, jane))); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from " + topicName + "." + "john_1", + httpPort); + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from " + topicName + "." + "jane_2", + httpPort); + } + @Test public void testTombstoneRecordFilter() { connect.kafka().createTopic(topicName, 1); @@ -331,6 +369,30 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) { httpPort); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testTableTemplateWithKeyAndPartition_schemaless(boolean useHttp) { + connect.kafka().createTopic(topicName, 3); + Map props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp); + props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal_${partition}"); + props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false"); + props.put("value.converter.schemas.enable", "false"); + connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props); + ConnectTestUtils.assertConnectorTaskRunningEventually(connect); + + connect.kafka().produce(topicName, 1, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}"); + connect.kafka().produce(topicName, 2, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}"); + + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"John\",\"Doe\",42\r\n", + "select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal_1", + httpPort); + QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n" + + "\"Jane\",\"Doe\",41\r\n", + "select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal_2", + httpPort); + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testDeadLetterQueue_wrongJson(boolean useHttp) { diff --git a/connector/src/test/java/io/questdb/kafka/TemplatingTest.java b/connector/src/test/java/io/questdb/kafka/TemplatingTest.java index 7b426b7..571d72e 100644 --- a/connector/src/test/java/io/questdb/kafka/TemplatingTest.java +++ b/connector/src/test/java/io/questdb/kafka/TemplatingTest.java @@ -16,6 +16,13 @@ public void testPlainTableName() { assertTableName(fn, record, "table"); } + @Test + public void testPartition() { + Function fn = Templating.newTableTableFn("table_${partition}"); + SinkRecord record = newSinkRecord("mytopic", "key", 42); + assertTableName(fn, record, "table_42"); + } + @Test public void testEmptyTableName() { Function fn = Templating.newTableTableFn(""); @@ -51,6 +58,13 @@ public void testTopicWithNullKey() { assertTableName(fn, record, "mytopic_null"); } + @Test + public void testTopicWithNullKeyAndPartition() { + Function fn = Templating.newTableTableFn("${topic}_${key}_${partition}"); + SinkRecord record = newSinkRecord("mytopic", null, 3); + assertTableName(fn, record, "mytopic_null_3"); + } + @Test public void testMissingClosingBrackets() { assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'"); @@ -99,7 +113,11 @@ private static void assertTableName(Function } private static SinkRecord newSinkRecord(String topic, String key) { - return new SinkRecord(topic, 0, null, key, null, null, 0); + return newSinkRecord(topic, key, 0); + } + + private static SinkRecord newSinkRecord(String topic, String key, int partition) { + return new SinkRecord(topic, partition, null, key, null, null, 0); } }