@@ -198,6 +198,44 @@ public void testTableTemplateWithKey_withSchema(boolean useHttp) {
198198 httpPort );
199199 }
200200
201+ @ ParameterizedTest
202+ @ ValueSource (booleans = {true , false })
203+ public void testTableTemplateWithKeyAndPartition_withSchema (boolean useHttp ) {
204+ connect .kafka ().createTopic (topicName , 3 );
205+ Map <String , String > props = ConnectTestUtils .baseConnectorProps (questDBContainer , topicName , useHttp );
206+ props .put (QuestDBSinkConnectorConfig .TABLE_CONFIG , "${topic}.${key}_${partition}" );
207+ props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
208+ connect .configureConnector (ConnectTestUtils .CONNECTOR_NAME , props );
209+ ConnectTestUtils .assertConnectorTaskRunningEventually (connect );
210+ Schema schema = SchemaBuilder .struct ().name ("com.example.Person" )
211+ .field ("firstname" , Schema .STRING_SCHEMA )
212+ .field ("lastname" , Schema .STRING_SCHEMA )
213+ .field ("age" , Schema .INT8_SCHEMA )
214+ .build ();
215+
216+ Struct john = new Struct (schema )
217+ .put ("firstname" , "John" )
218+ .put ("lastname" , "Doe" )
219+ .put ("age" , (byte ) 42 );
220+
221+ Struct jane = new Struct (schema )
222+ .put ("firstname" , "Jane" )
223+ .put ("lastname" , "Doe" )
224+ .put ("age" , (byte ) 41 );
225+
226+ connect .kafka ().produce (topicName , 1 , "john" , new String (converter .fromConnectData (topicName , schema , john )));
227+ connect .kafka ().produce (topicName , 2 , "jane" , new String (converter .fromConnectData (topicName , schema , jane )));
228+
229+ QuestDBUtils .assertSqlEventually ( "\" firstname\" ,\" lastname\" ,\" age\" \r \n "
230+ + "\" John\" ,\" Doe\" ,42\r \n " ,
231+ "select firstname,lastname,age from " + topicName + "." + "john_1" ,
232+ httpPort );
233+ QuestDBUtils .assertSqlEventually ( "\" firstname\" ,\" lastname\" ,\" age\" \r \n "
234+ + "\" Jane\" ,\" Doe\" ,41\r \n " ,
235+ "select firstname,lastname,age from " + topicName + "." + "jane_2" ,
236+ httpPort );
237+ }
238+
201239 @ Test
202240 public void testTombstoneRecordFilter () {
203241 connect .kafka ().createTopic (topicName , 1 );
@@ -331,6 +369,30 @@ public void testTableTemplateWithKey_schemaless(boolean useHttp) {
331369 httpPort );
332370 }
333371
372+ @ ParameterizedTest
373+ @ ValueSource (booleans = {true , false })
374+ public void testTableTemplateWithKeyAndPartition_schemaless (boolean useHttp ) {
375+ connect .kafka ().createTopic (topicName , 3 );
376+ Map <String , String > props = ConnectTestUtils .baseConnectorProps (questDBContainer , topicName , useHttp );
377+ props .put (QuestDBSinkConnectorConfig .TABLE_CONFIG , "literal_${topic}_literal_${key}_literal_${partition}" );
378+ props .put (QuestDBSinkConnectorConfig .INCLUDE_KEY_CONFIG , "false" );
379+ props .put ("value.converter.schemas.enable" , "false" );
380+ connect .configureConnector (ConnectTestUtils .CONNECTOR_NAME , props );
381+ ConnectTestUtils .assertConnectorTaskRunningEventually (connect );
382+
383+ connect .kafka ().produce (topicName , 1 , "john" , "{\" firstname\" :\" John\" ,\" lastname\" :\" Doe\" ,\" age\" :42}" );
384+ connect .kafka ().produce (topicName , 2 , "jane" , "{\" firstname\" :\" Jane\" ,\" lastname\" :\" Doe\" ,\" age\" :41}" );
385+
386+ QuestDBUtils .assertSqlEventually ( "\" firstname\" ,\" lastname\" ,\" age\" \r \n "
387+ + "\" John\" ,\" Doe\" ,42\r \n " ,
388+ "select firstname,lastname,age from literal_" + topicName + "_literal_" + "john_literal_1" ,
389+ httpPort );
390+ QuestDBUtils .assertSqlEventually ( "\" firstname\" ,\" lastname\" ,\" age\" \r \n "
391+ + "\" Jane\" ,\" Doe\" ,41\r \n " ,
392+ "select firstname,lastname,age from literal_" + topicName + "_literal_" + "jane_literal_2" ,
393+ httpPort );
394+ }
395+
334396 @ ParameterizedTest
335397 @ ValueSource (booleans = {true , false })
336398 public void testDeadLetterQueue_wrongJson (boolean useHttp ) {
0 commit comments