Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,18 @@ public void setTableSchema(TableSchema ts) {
public void setRuntimeContext(RuntimeContext ctx) {}

@Override
public Object[] transform(T record, boolean supportUpsertDelete) {
public Object[] transform(T record, boolean supportUpsertDelete, boolean ignoreDelete) {
Object[] rowData = new Object[fieldNames.length + (supportUpsertDelete ? 1 : 0)];
consumer.accept(rowData, record);
if (supportUpsertDelete && (record instanceof RowData)) {
// set `__op` column
rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData)record).getRowKind()).ordinal();
if (ignoreDelete && StarRocksSinkOP.parse(((RowData) record).getRowKind()) == StarRocksSinkOP.DELETE) {
// Convert DELETE to UPSERT when ignoreDelete is true
rowData[rowData.length - 1] = StarRocksSinkOP.UPSERT.ordinal();
} else {
// Use the original operation type
rowData[rowData.length - 1] = StarRocksSinkOP.parse(((RowData) record).getRowKind()).ordinal();
}
}
return rowData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public interface StarRocksIRowTransformer<T> extends Serializable {

default void setFastJsonWrapper(JsonWrapper jsonWrapper) {}

Object[] transform(T record, boolean supportUpsertDelete);
Object[] transform(T record, boolean supportUpsertDelete, boolean ignoreDelete);

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void setFastJsonWrapper(JsonWrapper jsonWrapper) {
}

@Override
public Object[] transform(RowData record, boolean supportUpsertDelete) {
public Object[] transform(RowData record, boolean supportUpsertDelete, boolean ignoreDelete) {
RowData transformRecord = valueTransform.apply(record);
Object[] values = new Object[columnDataTypes.length + (supportUpsertDelete ? 1 : 0)];
int idx = 0;
Expand All @@ -98,7 +98,13 @@ public Object[] transform(RowData record, boolean supportUpsertDelete) {
}
if (supportUpsertDelete) {
// set `__op` column
values[idx] = StarRocksSinkOP.parse(record.getRowKind()).ordinal();
if (ignoreDelete && StarRocksSinkOP.parse(record.getRowKind()) == StarRocksSinkOP.DELETE) {
// Convert DELETE to UPSERT when ignoreDelete is true
values[idx] = StarRocksSinkOP.UPSERT.ordinal();
} else {
// Use the original operation type
values[idx] = StarRocksSinkOP.parse(record.getRowKind()).ordinal();
}
}
return values;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static StarRocksSink<RowData> createSink(
sinkOptions.getTableName(),
sinkOptions.supportUpsertDelete(),
sinkOptions.getIgnoreUpdateBefore(),
sinkOptions.getIgnoreDelete(),
serializer,
rowTransformer);
StreamLoadProperties streamLoadProperties = sinkOptions.getProperties(sinkTable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public synchronized void invoke(T value, Context context) throws Exception {
return;
}
}
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete(), sinkOptions.getIgnoreDelete()));
sinkManager.writeRecords(
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ public void invoke(T value, Context context) throws Exception {
}
}
flushLegacyData();
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete()));
String serializedValue = serializer.serialize(rowTransformer.transform(value, sinkOptions.supportUpsertDelete(), sinkOptions.getIgnoreDelete()));
sinkManager.write(
null,
sinkOptions.getDatabaseName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public enum StreamLoadFormat {
"insert the update_after row in StarRocks, and this options should be set false for this case. Note that how " +
"to set this options depends on the user case.");

public static final ConfigOption<Boolean> SINK_IGNORE_DELETE = ConfigOptions.key("sink.ignore.delete")
.booleanType().defaultValue(false).withDescription("Whether to ignore delete records. When set to true, delete operations " +
"will be ignored even for tables with primary keys. This is useful when you want to keep all historical data " +
"in the target table and don't want any records to be deleted. Default is false.");

public static final ConfigOption<Boolean> SINK_ENABLE_EXACTLY_ONCE_LABEL_GEN = ConfigOptions.key("sink.exactly-once.enable-label-gen")
.booleanType().defaultValue(true).withDescription("Only available when using exactly-once and sink.label-prefix is set. " +
"When it's true, the connector will generate label in the format '{labelPrefix}-{tableName}-{subtaskIndex}-{id}'. " +
Expand Down Expand Up @@ -321,6 +326,10 @@ public boolean getIgnoreUpdateBefore() {
return tableOptions.get(SINK_IGNORE_UPDATE_BEFORE);
}

public boolean getIgnoreDelete() {
return tableOptions.get(SINK_IGNORE_DELETE);
}

public static Builder builder() {
return new Builder();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class RowDataSerializationSchema implements RecordSerializationSchema<Row
private final String tableName;
boolean supportUpsertDelete;
boolean ignoreUpdateBefore;
boolean ignoreDelete;
private final StarRocksISerializer serializer;
private final StarRocksIRowTransformer<RowData> rowTransformer;
private transient DefaultStarRocksRowData reusableRowData;
Expand All @@ -48,12 +49,14 @@ public RowDataSerializationSchema(
String tableName,
boolean supportUpsertDelete,
boolean ignoreUpdateBefore,
boolean ignoreDelete,
StarRocksISerializer serializer,
StarRocksIRowTransformer<RowData> rowTransformer) {
this.databaseName = databaseName;
this.tableName = tableName;
this.supportUpsertDelete = supportUpsertDelete;
this.ignoreUpdateBefore = ignoreUpdateBefore;
this.ignoreDelete = ignoreDelete;
this.serializer = serializer;
this.rowTransformer = rowTransformer;
}
Expand All @@ -79,7 +82,7 @@ public StarRocksRowData serialize(RowData record) {
// let go the UPDATE_AFTER and INSERT rows for tables who have a group of `unique` or `duplicate` keys.
return null;
}
String serializedRow = serializer.serialize(rowTransformer.transform(record, supportUpsertDelete));
String serializedRow = serializer.serialize(rowTransformer.transform(record, supportUpsertDelete, ignoreDelete));
reusableRowData.setRow(serializedRow);
return reusableRowData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void testTransformer(@Injectable RuntimeContext runtimeCtx) {
rowTransformer.setRuntimeContext(runtimeCtx);
rowTransformer.setTableSchema(TABLE_SCHEMA);
UserInfoForTest rowData = createRowData();
String result = StarRocksSerializerFactory.createSerializer(OPTIONS, TABLE_SCHEMA.getFieldNames()).serialize(rowTransformer.transform(rowData, OPTIONS.supportUpsertDelete()));
String result = StarRocksSerializerFactory.createSerializer(OPTIONS, TABLE_SCHEMA.getFieldNames()).serialize(rowTransformer.transform(rowData, OPTIONS.supportUpsertDelete(), OPTIONS.getIgnoreDelete()));

Map<String, String> loadProsp = OPTIONS.getSinkStreamLoadProperties();
String format = loadProsp.get("format");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testTransformer(@Injectable TypeInformation<RowData> rowDataTypeInfo
rowTransformer.setRuntimeContext(runtimeCtx);
rowTransformer.setTableSchema(TABLE_SCHEMA);
GenericRowData rowData = createRowData();
String result = StarRocksSerializerFactory.createSerializer(OPTIONS, TABLE_SCHEMA.getFieldNames()).serialize(rowTransformer.transform(rowData, OPTIONS.supportUpsertDelete()));
String result = StarRocksSerializerFactory.createSerializer(OPTIONS, TABLE_SCHEMA.getFieldNames()).serialize(rowTransformer.transform(rowData, OPTIONS.supportUpsertDelete(), OPTIONS.getIgnoreDelete()));

Map<String, String> loadProsp = OPTIONS.getSinkStreamLoadProperties();
String format = loadProsp.get("format");
Expand Down