Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions connector/src/main/java/io/questdb/kafka/Templating.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ private Templating() {
partials.add(record -> record.key() == null ? "null" : record.key().toString());
break;
}
case "partition": {
// assumption: sink records always have a non-null kafkaPartition()
partials.add(record -> String.valueOf(record.kafkaPartition()));
break;
}
default: {
throw new ConnectException("Unknown template in table name, table template: '" + template + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,44 @@ public void testTableTemplateWithKey_withSchema(boolean useHttp) {
httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKeyAndPartition_withSchema(boolean useHttp) {
connect.kafka().createTopic(topicName, 3);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "${topic}.${key}_${partition}");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);
Schema schema = SchemaBuilder.struct().name("com.example.Person")
.field("firstname", Schema.STRING_SCHEMA)
.field("lastname", Schema.STRING_SCHEMA)
.field("age", Schema.INT8_SCHEMA)
.build();

Struct john = new Struct(schema)
.put("firstname", "John")
.put("lastname", "Doe")
.put("age", (byte) 42);

Struct jane = new Struct(schema)
.put("firstname", "Jane")
.put("lastname", "Doe")
.put("age", (byte) 41);

connect.kafka().produce(topicName, 1, "john", new String(converter.fromConnectData(topicName, schema, john)));
connect.kafka().produce(topicName, 2, "jane", new String(converter.fromConnectData(topicName, schema, jane)));

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n",
"select firstname,lastname,age from " + topicName + "." + "john_1",
httpPort);
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from " + topicName + "." + "jane_2",
httpPort);
}

@Test
public void testTombstoneRecordFilter() {
connect.kafka().createTopic(topicName, 1);
Expand Down Expand Up @@ -331,6 +369,30 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTableTemplateWithKeyAndPartition_schemaless(boolean useHttp) {
connect.kafka().createTopic(topicName, 3);
Map<String, String> props = ConnectTestUtils.baseConnectorProps(questDBContainer, topicName, useHttp);
props.put(QuestDBSinkConnectorConfig.TABLE_CONFIG, "literal_${topic}_literal_${key}_literal_${partition}");
props.put(QuestDBSinkConnectorConfig.INCLUDE_KEY_CONFIG, "false");
props.put("value.converter.schemas.enable", "false");
connect.configureConnector(ConnectTestUtils.CONNECTOR_NAME, props);
ConnectTestUtils.assertConnectorTaskRunningEventually(connect);

connect.kafka().produce(topicName, 1, "john", "{\"firstname\":\"John\",\"lastname\":\"Doe\",\"age\":42}");
connect.kafka().produce(topicName, 2, "jane", "{\"firstname\":\"Jane\",\"lastname\":\"Doe\",\"age\":41}");

QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"John\",\"Doe\",42\r\n",
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal_1",
httpPort);
QuestDBUtils.assertSqlEventually( "\"firstname\",\"lastname\",\"age\"\r\n"
+ "\"Jane\",\"Doe\",41\r\n",
"select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal_2",
httpPort);
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testDeadLetterQueue_wrongJson(boolean useHttp) {
Expand Down
20 changes: 19 additions & 1 deletion connector/src/test/java/io/questdb/kafka/TemplatingTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ public void testPlainTableName() {
assertTableName(fn, record, "table");
}

@Test
public void testPartition() {
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("table_${partition}");
SinkRecord record = newSinkRecord("mytopic", "key", 42);
assertTableName(fn, record, "table_42");
}

@Test
public void testEmptyTableName() {
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("");
Expand Down Expand Up @@ -51,6 +58,13 @@ public void testTopicWithNullKey() {
assertTableName(fn, record, "mytopic_null");
}

@Test
public void testTopicWithNullKeyAndPartition() {
Function<SinkRecord, ? extends CharSequence> fn = Templating.newTableTableFn("${topic}_${key}_${partition}");
SinkRecord record = newSinkRecord("mytopic", null, 3);
assertTableName(fn, record, "mytopic_null_3");
}

@Test
public void testMissingClosingBrackets() {
assertIllegalTemplate("${topic", "Unbalanced brackets in a table template, missing closing '}', table template: '${topic'");
Expand Down Expand Up @@ -99,7 +113,11 @@ private static void assertTableName(Function<SinkRecord, ? extends CharSequence>
}

private static SinkRecord newSinkRecord(String topic, String key) {
return new SinkRecord(topic, 0, null, key, null, null, 0);
return newSinkRecord(topic, key, 0);
}

private static SinkRecord newSinkRecord(String topic, String key, int partition) {
return new SinkRecord(topic, partition, null, key, null, null, 0);
}

}