Skip to content

Commit b48cbf6

Browse files
committed
ordering key, not primary key
1 parent 0a35ba2 commit b48cbf6

File tree

1 file changed

+25
-10
lines changed
  • flow/connectors/clickhouse

1 file changed

+25
-10
lines changed

flow/connectors/clickhouse/cdc.go

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log/slog"
7-
"slices"
7+
"maps"
88
"strconv"
99
"strings"
1010
"sync/atomic"
@@ -347,6 +347,17 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
347347
}
348348
numRecords += 1
349349
tableSchema := req.TableNameSchemaMapping[record.GetDestinationTableName()]
350+
var orderingKey map[string]struct{}
351+
for _, tm := range req.TableMappings {
352+
if tm.DestinationTableIdentifier == record.GetDestinationTableName() && tm.SourceTableIdentifier == record.GetSourceTableName() {
353+
for _, col := range tm.Columns {
354+
if col.Ordering > 0 {
355+
orderingKey[col.DestinationName] = struct{}{}
356+
}
357+
}
358+
break
359+
}
360+
}
350361
switch r := record.(type) {
351362
case *model.InsertRecord[model.RecordItems]:
352363
colNames := make([]string, 0, len(tableSchema.Columns))
@@ -365,16 +376,20 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
365376
case *model.UpdateRecord[model.RecordItems]:
366377
assignments := make([]string, 0, len(tableSchema.Columns))
367378
for _, col := range tableSchema.Columns {
368-
// TODO needs to match custom ordering key
369-
if !slices.Contains(tableSchema.PrimaryKeyColumns, col.Name) {
370-
assignments = append(assignments, fmt.Sprintf("%s=%s",
371-
peerdb_clickhouse.QuoteIdentifier(col.Name),
372-
formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable),
373-
))
379+
if _, unchanged := r.UnchangedToastColumns[col.Name]; unchanged {
380+
continue
374381
}
382+
if _, isOrdering := orderingKey[col.Name]; isOrdering {
383+
// CH does not support UPDATE on these columns
384+
continue
385+
}
386+
assignments = append(assignments, fmt.Sprintf("%s=%s",
387+
peerdb_clickhouse.QuoteIdentifier(col.Name),
388+
formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable),
389+
))
375390
}
376-
where := make([]string, 0, len(tableSchema.PrimaryKeyColumns))
377-
for _, colName := range tableSchema.PrimaryKeyColumns {
391+
where := make([]string, 0, len(orderingKey))
392+
for colName := range maps.Keys(orderingKey) {
378393
item := r.OldItems.GetColumnValue(colName)
379394
if item == nil {
380395
item = r.NewItems.GetColumnValue(colName)
@@ -402,7 +417,7 @@ func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRe
402417
}
403418
case *model.DeleteRecord[model.RecordItems]:
404419
where := make([]string, 0, len(tableSchema.PrimaryKeyColumns))
405-
for _, colName := range tableSchema.PrimaryKeyColumns {
420+
for colName := range maps.Keys(orderingKey) {
406421
var nullable bool
407422
if tableSchema.NullableEnabled {
408423
for _, col := range tableSchema.Columns {

0 commit comments

Comments
 (0)