Skip to content

Commit a60ec9f

Browse files
committed
adjust for new cancelRow() behaviour
1 parent 08f432a commit a60ec9f

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

connector/src/main/java/io/questdb/kafka/QuestDBSinkTask.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -325,19 +325,21 @@ private void handleSingleRecord(SinkRecord record) {
325325
throw new InvalidDataException("Table name cannot be empty");
326326
}
327327

328+
boolean partialRecord = false;
328329
try {
329330
sender.table(tableName);
331+
partialRecord = true;
330332
if (config.isIncludeKey()) {
331333
handleObject(config.getKeyPrefix(), record.keySchema(), record.key(), PRIMITIVE_KEY_FALLBACK_NAME);
332334
}
333335
handleObject(config.getValuePrefix(), record.valueSchema(), recordValue, PRIMITIVE_VALUE_FALLBACK_NAME);
334336
} catch (InvalidDataException ex) {
335-
if (httpTransport) {
337+
if (httpTransport && partialRecord) {
336338
sender.cancelRow();
337339
}
338340
throw ex;
339341
} catch (LineSenderException ex) {
340-
if (httpTransport) {
342+
if (httpTransport && partialRecord) {
341343
sender.cancelRow();
342344
}
343345
throw new InvalidDataException("object contains invalid data", ex);

0 commit comments

Comments
 (0)