diff --git a/flow/connectors/clickhouse/cdc.go b/flow/connectors/clickhouse/cdc.go
index 29f8371e50..c653b36b0f 100644
--- a/flow/connectors/clickhouse/cdc.go
+++ b/flow/connectors/clickhouse/cdc.go
@@ -2,12 +2,21 @@ package connclickhouse
 
 import (
 	"context"
+	"errors"
 	"fmt"
+	"hash/fnv"
+	"io"
 	"log/slog"
+	"strconv"
+	"strings"
+	"sync/atomic"
+	"time"
 
 	chproto "github.com/ClickHouse/ch-go/proto"
 	"github.com/ClickHouse/clickhouse-go/v2"
 	_ "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"github.com/shopspring/decimal"
+	"golang.org/x/sync/errgroup"
 
 	"github.com/PeerDB-io/peerdb/flow/connectors/utils"
 	"github.com/PeerDB-io/peerdb/flow/generated/protos"
@@ -145,18 +154,526 @@ func (c *ClickHouseConnector) syncRecordsViaAvro(
 	}, nil
 }
 
+// timestampLayout renders a ClickHouse-parsable DateTime64 literal; note
+// that time.StampNano ("Jan _2 15:04:05.000000000") has no date component.
+const timestampLayout = "2006-01-02 15:04:05.999999999"
+
+func formatSlice[T any](values []T, f func(T) string) string {
+	s := make([]string, 0, len(values))
+	for _, value := range values {
+		s = append(s, f(value))
+	}
+	return fmt.Sprintf("[%s]", strings.Join(s, ","))
+}
+
+func formatQValue(value types.QValue, nullable bool) string {
+	switch v := value.(type) {
+	case nil:
+		return "NULL"
+	case types.QValueNull:
+		if !nullable {
+			switch types.QValueKind(v) {
+			case types.QValueKindArrayFloat32,
+				types.QValueKindArrayFloat64,
+				types.QValueKindArrayInt16,
+				types.QValueKindArrayInt32,
+				types.QValueKindArrayInt64,
+				types.QValueKindArrayString,
+				types.QValueKindArrayEnum,
+				types.QValueKindArrayDate,
+				types.QValueKindArrayInterval,
+				types.QValueKindArrayTimestamp,
+				types.QValueKindArrayTimestampTZ,
+				types.QValueKindArrayBoolean,
+				types.QValueKindArrayJSON,
+				types.QValueKindArrayJSONB,
+				types.QValueKindArrayUUID,
+				types.QValueKindArrayNumeric:
+				return "[]"
+			case types.QValueKindFloat32,
+				types.QValueKindFloat64,
+				types.QValueKindInt8,
+				types.QValueKindInt16,
+				types.QValueKindInt32,
+				types.QValueKindInt64,
+				types.QValueKindInt256,
+				types.QValueKindUInt8,
+				types.QValueKindUInt16,
+				types.QValueKindUInt32,
+				types.QValueKindUInt64,
+				types.QValueKindUInt256,
+				types.QValueKindBoolean,
+				types.QValueKindNumeric:
+				return "0"
+			case types.QValueKindQChar,
+				types.QValueKindString,
+				types.QValueKindEnum,
+				types.QValueKindBytes:
+				return "''"
+			case types.QValueKindJSON,
+				types.QValueKindJSONB:
+				return "'{}'"
+			case types.QValueKindUUID:
+				return "'00000000-0000-0000-0000-000000000000'"
+			case types.QValueKindDate:
+				return "'1970-01-01'"
+			case types.QValueKindTimestamp, types.QValueKindTimestampTZ:
+				return "'1970-01-01 00:00:00'"
+			}
+		}
+		return "NULL"
+	case types.QValueArrayBoolean:
+		return formatSlice(v.Val, func(val bool) string {
+			if val {
+				return "true"
+			} else {
+				return "false"
+			}
+		})
+	case types.QValueArrayInt16:
+		return formatSlice(v.Val, func(val int16) string {
+			return strconv.FormatInt(int64(val), 10)
+		})
+	case types.QValueArrayInt32:
+		return formatSlice(v.Val, func(val int32) string {
+			return strconv.FormatInt(int64(val), 10)
+		})
+	case types.QValueArrayInt64:
+		return formatSlice(v.Val, func(val int64) string {
+			return strconv.FormatInt(val, 10)
+		})
+	case types.QValueArrayString:
+		return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral)
+	case types.QValueArrayFloat32:
+		return formatSlice(v.Val, func(val float32) string {
+			return strconv.FormatFloat(float64(val), 'g', -1, 32)
+		})
+	case types.QValueArrayFloat64:
+		return formatSlice(v.Val, func(val float64) string {
+			return strconv.FormatFloat(val, 'g', -1, 64)
+		})
+	case types.QValueArrayNumeric:
+		return formatSlice(v.Val, func(val decimal.Decimal) string {
+			return val.String()
+		})
+	case types.QValueArrayDate:
+		return formatSlice(v.Val, func(val time.Time) string {
+			return peerdb_clickhouse.QuoteLiteral(val.Format(time.DateOnly))
+		})
+	case types.QValueArrayTimestamp:
+		return formatSlice(v.Val, func(val time.Time) string {
+			return peerdb_clickhouse.QuoteLiteral(val.Format(timestampLayout))
+		})
+	case types.QValueArrayTimestampTZ:
+		return formatSlice(v.Val, func(val time.Time) string {
+			return peerdb_clickhouse.QuoteLiteral(val.Format(timestampLayout))
+		})
+	case types.QValueArrayEnum:
+		return formatSlice(v.Val, peerdb_clickhouse.QuoteLiteral)
+	case types.QValueBoolean:
+		if v.Val {
+			return "true"
+		} else {
+			return "false"
+		}
+	case types.QValueInt16:
+		return strconv.FormatInt(int64(v.Val), 10)
+	case types.QValueInt32:
+		return strconv.FormatInt(int64(v.Val), 10)
+	case types.QValueInt64:
+		return strconv.FormatInt(v.Val, 10)
+	case types.QValueString:
+		return peerdb_clickhouse.QuoteLiteral(v.Val)
+	case types.QValueFloat32:
+		return strconv.FormatFloat(float64(v.Val), 'g', -1, 32)
+	case types.QValueFloat64:
+		return strconv.FormatFloat(v.Val, 'g', -1, 64)
+	case types.QValueNumeric:
+		return v.Val.String()
+	case types.QValueDate:
+		return peerdb_clickhouse.QuoteLiteral(v.Val.Format(time.DateOnly))
+	case types.QValueTimestamp:
+		return peerdb_clickhouse.QuoteLiteral(v.Val.Format(timestampLayout))
+	case types.QValueTimestampTZ:
+		return peerdb_clickhouse.QuoteLiteral(v.Val.Format(timestampLayout))
+	case types.QValueEnum:
+		return peerdb_clickhouse.QuoteLiteral(v.Val)
+	case types.QValueBytes:
+		return peerdb_clickhouse.QuoteLiteral(shared.UnsafeFastReadOnlyBytesToString(v.Val))
+	default:
+		return peerdb_clickhouse.QuoteLiteral(fmt.Sprint(v.Value()))
+	}
+}
+
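+// formatQValue examples (illustrative; quoting assumes QuoteLiteral's
+// backslash escaping, shown in escape.go at the end of this patch):
+//
+//	formatQValue(types.QValueString{Val: "O'Brien"}, false) == `'O\'Brien'`
+//	formatQValue(types.QValueNull(types.QValueKindInt64), false) == "0"
+//	formatQValue(types.QValueNull(types.QValueKindInt64), true) == "NULL"
+//	formatQValue(types.QValueArrayInt64{Val: []int64{1, 2}}, false) == "[1,2]"
+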
 func (c *ClickHouseConnector) SyncRecords(ctx context.Context, req *model.SyncRecordsRequest[model.RecordItems]) (*model.SyncResponse, error) {
-	res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+	enableStream, err := internal.PeerDBEnableClickHouseStream(ctx, req.Env)
 	if err != nil {
 		return nil, err
-	}
+	} else if enableStream {
+		var numRecords int64
+		const (
+			queryTypeNone int8 = iota
+			queryTypeInsert
+			queryTypeUpdate
+			queryTypeDelete
+		)
+		type query struct {
+			table        string
+			values       []string
+			columns      []string
+			whereValues  []string
+			whereColumns []string
+			lsn          int64
+			ty           int8
+		}
 
-	if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil {
-		c.logger.Error("failed to increment id", slog.Any("error", err))
-		return nil, err
-	}
+		parallelNormalize, err := internal.PeerDBClickHouseParallelNormalize(ctx, req.Env)
+		if err != nil {
+			return nil, err
+		}
+		if parallelNormalize < 1 {
+			parallelNormalize = 1
+		}
+		lsns := make([]atomic.Int64, parallelNormalize)
+		queries := make([]chan query, parallelNormalize)
+		for workerId := range len(queries) {
+			queries[workerId] = make(chan query)
+		}
+
+		tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
+		flushLoopDone := make(chan struct{})
+		go func() {
+			flushTimeout, err := internal.PeerDBQueueFlushTimeoutSeconds(ctx, req.Env)
+			if err != nil {
+				c.logger.Warn("failed to get flush timeout, no periodic flushing", slog.Any("error", err))
+				return
+			}
+			ticker := time.NewTicker(flushTimeout)
+			defer ticker.Stop()
+
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case <-flushLoopDone:
+					return
+				// flush loop doesn't block processing new messages
+				case <-ticker.C:
+					oldLastSeen := req.ConsumedOffset.Load()
+					// only advance to the lowest LSN every worker has applied
+					lastSeen := lsns[0].Load()
+					for workerId := 1; workerId < len(lsns); workerId++ {
+						lastSeen = min(lastSeen, lsns[workerId].Load())
+					}
+					if lastSeen > oldLastSeen {
+						if err := c.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil {
+							c.logger.Warn("SetLastOffset error", slog.Any("error", err))
+						} else {
+							shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
+							c.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
+						}
+					}
+				}
+			}
+		}()
+		defer close(flushLoopDone)
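+		// The flush loop above may only persist the smallest LSN that every
+		// worker has fully applied: lsns[i] trails worker i, so with workers
+		// at {40, 25, 31} the checkpoint can advance to 25 but no further.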
+
+		// TODO sourceSchemaAsDestinationColumn
+		// TODO schema changes
+		fnvHash := fnv.New64a()
+		group, groupCtx := errgroup.WithContext(ctx)
+		for workerId := range len(lsns) {
+			group.Go(func() error {
+				currentType := queryTypeNone
+				currentTable := ""
+				batchTimeout := make(<-chan time.Time)
+				var batch []query
+				finishBatch := func() error {
+					c.logger.Info("finishBatch", slog.Int("len", len(batch)))
+					if len(batch) > 0 {
+						switch currentType {
+						case queryTypeInsert:
+							columns := make([]string, 0, len(batch[0].columns))
+							for _, colName := range batch[0].columns {
+								columns = append(columns, peerdb_clickhouse.QuoteIdentifier(colName))
+							}
+							baseQuery := fmt.Sprintf("INSERT INTO %s(%s) VALUES ",
+								peerdb_clickhouse.QuoteIdentifier(currentTable),
+								strings.Join(columns, ","),
+							)
+							// split into INSERT statements of roughly 100KB;
+							// batchIdx is the next unsent row, -1 once done
+							batchIdx := 0
+							for batchIdx != -1 {
+								querySize := len(baseQuery)
+								finalValues := make([]string, 0, len(batch)-batchIdx)
+								nextIdx := -1
+								for idx, q := range batch[batchIdx:] {
+									finalValue := fmt.Sprintf("(%s)", strings.Join(q.values, ","))
+									finalValues = append(finalValues, finalValue)
+									querySize += len(finalValue) + 1
+									if querySize > 100000 {
+										nextIdx = batchIdx + idx + 1
+										break
+									}
+								}
+								if nextIdx >= len(batch) {
+									nextIdx = -1
+								}
+								batchIdx = nextIdx
+								if err := c.execWithLogging(ctx,
+									baseQuery+strings.Join(finalValues, ","),
+								); err != nil {
+									return err
+								}
+							}
+						case queryTypeUpdate:
+							return errors.New("UPDATE does not support batching")
+						case queryTypeDelete:
+							return errors.New("DELETE does not support batching")
+						}
+						if currentType != queryTypeNone {
+							shared.AtomicInt64Max(&lsns[workerId], batch[len(batch)-1].lsn)
+						}
+						batchTimeout = make(<-chan time.Time)
+						batch = batch[:0]
+					}
+					currentType = queryTypeNone
+					currentTable = ""
+					return nil
+				}
+				for {
+					select {
+					case q, ok := <-queries[workerId]:
+						if !ok {
+							return finishBatch()
+						}
+						if currentType != q.ty || currentTable != q.table {
+							if err := finishBatch(); err != nil {
+								return err
+							}
+						}
+						switch q.ty {
+						case queryTypeUpdate:
+							assign := make([]string, 0, len(q.columns))
+							where := make([]string, 0, len(q.whereColumns))
+							for idx, colName := range q.columns {
+								assign = append(assign, fmt.Sprintf("%s=%s",
+									peerdb_clickhouse.QuoteIdentifier(colName),
+									q.values[idx],
+								))
+							}
+							for idx, colName := range q.whereColumns {
+								where = append(where, fmt.Sprintf("%s=%s",
+									peerdb_clickhouse.QuoteIdentifier(colName),
+									q.whereValues[idx],
+								))
+							}
+							if err := c.execWithLogging(ctx, fmt.Sprintf("UPDATE %s SET %s WHERE %s",
+								peerdb_clickhouse.QuoteIdentifier(q.table),
+								strings.Join(assign, ","),
+								strings.Join(where, " AND "),
+							)); err != nil {
+								return err
+							}
+							shared.AtomicInt64Max(&lsns[workerId], q.lsn)
+						case queryTypeDelete:
+							where := make([]string, 0, len(q.whereColumns))
+							for idx, colName := range q.whereColumns {
+								where = append(where, fmt.Sprintf("%s=%s",
+									peerdb_clickhouse.QuoteIdentifier(colName),
+									q.whereValues[idx],
+								))
+							}
+							if err := c.execWithLogging(ctx, fmt.Sprintf("DELETE FROM %s WHERE %s",
+								peerdb_clickhouse.QuoteIdentifier(q.table),
+								strings.Join(where, " AND "),
+							)); err != nil {
+								return err
+							}
+							shared.AtomicInt64Max(&lsns[workerId], q.lsn)
+						case queryTypeInsert:
+							if len(batch) == 0 {
+								batchTimeout = time.After(time.Second)
+							}
+							batch = append(batch, q)
+							currentType = q.ty
+							currentTable = q.table
+						}
+					case <-batchTimeout:
+						if err := finishBatch(); err != nil {
+							return err
+						}
+					case <-groupCtx.Done():
+						return nil
+					}
+				}
+			})
+		}
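+		// Each worker above coalesces consecutive inserts for one table and
+		// flushes them after a second of quiet or once a statement nears the
+		// ~100KB cap, e.g. three queued rows become a single
+		//   INSERT INTO "t"("id","v") VALUES (1,'a'),(2,'b'),(3,'c')
+		// while updates and deletes each run as their own statement.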
+
+	Loop:
+		for {
+			select {
+			case record, ok := <-req.Records.GetRecords():
+				if !ok {
+					c.logger.Info("flushing batches because no more records")
+					break Loop
+				}
+				numRecords++
+				tableSchema, ok := req.TableNameSchemaMapping[record.GetDestinationTableName()]
+				if !ok {
+					c.logger.Warn("missing schema for table, ignoring", slog.String("table", record.GetDestinationTableName()))
+					continue
+				}
+				// keep per-table insert/update/delete counts for the response
+				record.PopulateCountMap(tableNameRowsMapping)
+				fnvHash.Reset()
+				orderingKeySlice := make([]string, 0)
+				for _, tm := range req.TableMappings {
+					if tm.DestinationTableIdentifier == record.GetDestinationTableName() && tm.SourceTableIdentifier == record.GetSourceTableName() {
+						for _, col := range tm.Columns {
+							if col.Ordering > 0 {
+								orderingKeySlice = append(orderingKeySlice, col.DestinationName)
+							}
+						}
+						break
+					}
+				}
+				if len(orderingKeySlice) == 0 {
+					orderingKeySlice = tableSchema.PrimaryKeyColumns
+				}
+				orderingKey := make(map[string]struct{}, len(orderingKeySlice))
+				for _, colName := range orderingKeySlice {
+					orderingKey[colName] = struct{}{}
+				}
+				lsn := record.GetBaseRecord().CheckpointID
+				switch r := record.(type) {
+				case *model.InsertRecord[model.RecordItems]:
+					colNames := make([]string, 0, len(tableSchema.Columns))
+					values := make([]string, 0, len(tableSchema.Columns))
+					formattedValMap := make(map[string]string, len(orderingKeySlice))
+					for _, col := range tableSchema.Columns {
+						colNames = append(colNames, col.Name)
+						val := r.Items.GetColumnValue(col.Name)
+						formattedVal := formatQValue(val, tableSchema.NullableEnabled && col.Nullable)
+						values = append(values, formattedVal)
+						if _, isOrdering := orderingKey[col.Name]; isOrdering {
+							formattedValMap[col.Name] = formattedVal
+						}
+					}
+					for _, colName := range orderingKeySlice {
+						io.WriteString(fnvHash, formattedValMap[colName])
+					}
+					queries[fnvHash.Sum64()%uint64(len(queries))] <- query{
+						ty:      queryTypeInsert,
+						table:   r.DestinationTableName,
+						columns: colNames,
+						values:  values,
+						lsn:     lsn,
+					}
+				case *model.UpdateRecord[model.RecordItems]:
+					columns := make([]string, 0, len(tableSchema.Columns))
+					values := make([]string, 0, len(tableSchema.Columns))
+					for _, col := range tableSchema.Columns {
+						if _, unchanged := r.UnchangedToastColumns[col.Name]; unchanged {
+							continue
+						}
+						if _, isOrdering := orderingKey[col.Name]; isOrdering {
+							// CH does not support UPDATE on ordering key columns
+							continue
+						}
+						columns = append(columns, col.Name)
+						values = append(values, formatQValue(r.NewItems.GetColumnValue(col.Name), tableSchema.NullableEnabled && col.Nullable))
+					}
+					whereValues := make([]string, 0, len(orderingKeySlice))
+					for _, colName := range orderingKeySlice {
+						item := r.OldItems.GetColumnValue(colName)
+						if item == nil {
+							item = r.NewItems.GetColumnValue(colName)
+						}
+						var nullable bool // TODO make this a map lookup
+						if tableSchema.NullableEnabled {
+							for _, col := range tableSchema.Columns {
+								if col.Name == colName {
+									nullable = col.Nullable
+									break
+								}
+							}
+						}
+						formattedVal := formatQValue(item, nullable)
+						whereValues = append(whereValues, formattedVal)
+						io.WriteString(fnvHash, formattedVal)
+					}
+					queries[fnvHash.Sum64()%uint64(len(queries))] <- query{
+						ty:           queryTypeUpdate,
+						table:        r.DestinationTableName,
+						values:       values,
+						columns:      columns,
+						whereValues:  whereValues,
+						whereColumns: orderingKeySlice,
+						lsn:          lsn,
+					}
+				case *model.DeleteRecord[model.RecordItems]:
+					values := make([]string, 0, len(orderingKeySlice))
+					for _, colName := range orderingKeySlice {
+						var nullable bool // TODO make this a map lookup
+						if tableSchema.NullableEnabled {
+							for _, col := range tableSchema.Columns {
+								if col.Name == colName {
+									nullable = col.Nullable
+									break
+								}
+							}
+						}
+						formattedVal := formatQValue(r.Items.GetColumnValue(colName), nullable)
+						values = append(values, formattedVal)
+						io.WriteString(fnvHash, formattedVal)
+					}
+					queries[fnvHash.Sum64()%uint64(len(queries))] <- query{
+						ty:           queryTypeDelete,
+						table:        r.DestinationTableName,
+						whereValues:  values,
+						whereColumns: orderingKeySlice,
+						lsn:          lsn,
+					}
+				}
+			case <-groupCtx.Done():
+				if err := ctx.Err(); err != nil {
+					return nil, err
+				}
+				c.logger.Error("error syncing, ending batch", slog.Any("error", groupCtx.Err()))
+				break Loop
+			}
+		}
+
+		for _, ch := range queries {
+			close(ch)
+		}
+		if err := group.Wait(); err != nil {
+			return nil, err
+		}
 
-	return res, nil
+		lastCheckpoint := req.Records.GetLastCheckpoint()
+		if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
+			return nil, err
+		}
+		return &model.SyncResponse{
+			CurrentSyncBatchID:   req.SyncBatchID,
+			LastSyncedCheckpoint: lastCheckpoint,
+			NumRecordsSynced:     numRecords,
+			TableNameRowsMapping: tableNameRowsMapping,
+			TableSchemaDeltas:    req.Records.SchemaDeltas,
+		}, nil
+	} else {
+		res, err := c.syncRecordsViaAvro(ctx, req, req.SyncBatchID)
+		if err != nil {
+			return nil, err
+		}
+
+		if err := c.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, res.LastSyncedCheckpoint); err != nil {
+			c.logger.Error("failed to increment id", slog.Any("error", err))
+			return nil, err
+		}
+
+		return res, nil
+	}
 }
 
 func (c *ClickHouseConnector) ReplayTableSchemaDeltas(
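
Worker routing in the sync loop above hashes a row's formatted ordering-key values with FNV-1a, so every change to the same row is applied by the same worker in arrival order. A self-contained sketch of that routing (shardFor is an illustrative name, not part of the patch):

    package main

    import (
        "fmt"
        "hash/fnv"
        "io"
    )

    // shardFor picks a worker for a row by hashing its formatted
    // ordering-key values, mirroring the sync loop's channel choice.
    func shardFor(keyValues []string, numWorkers int) uint64 {
        h := fnv.New64a()
        for _, v := range keyValues {
            io.WriteString(h, v)
        }
        return h.Sum64() % uint64(numWorkers)
    }

    func main() {
        // The same ordering key always maps to the same worker.
        fmt.Println(shardFor([]string{"42", "'acme'"}, 4))
        fmt.Println(shardFor([]string{"42", "'acme'"}, 4))
    }
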
diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index c7996fad32..cf6539c192 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -258,6 +258,13 @@ func Connect(ctx context.Context, env map[string]string, config *protos.Clickhou
 	if config.Cluster != "" {
 		settings["insert_distributed_sync"] = uint64(1)
 	}
+	if allowStream, err := internal.PeerDBEnableClickHouseStream(ctx, env); err != nil {
+		return nil, fmt.Errorf("failed to load stream config: %w", err)
+	} else if allowStream {
+		settings["allow_experimental_lightweight_update"] = true
+		settings["async_insert"] = true
+		settings["lightweight_delete_mode"] = "lightweight_update"
+	}
 
 	conn, err := clickhouse.Open(&clickhouse.Options{
 		Addr: []string{shared.JoinHostPort(config.Host, config.Port)},
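
These session settings are what let the streaming path issue plain UPDATE/DELETE statements cheaply: async_insert batches small inserts server-side, and the lightweight-update settings apply updates and deletes as patch parts rather than full mutations (this assumes a ClickHouse build that ships experimental lightweight updates). A minimal sketch of opening such a connection with clickhouse-go, independent of Connect's other options:

    package main

    import (
        "github.com/ClickHouse/clickhouse-go/v2"
        "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
    )

    // openStreaming dials ClickHouse with only the streaming-related
    // settings; the real Connect also wires auth, TLS, and cluster options.
    func openStreaming(addr string) (driver.Conn, error) {
        return clickhouse.Open(&clickhouse.Options{
            Addr: []string{addr},
            Settings: clickhouse.Settings{
                "allow_experimental_lightweight_update": true,
                "async_insert":                          true,
                "lightweight_delete_mode":               "lightweight_update",
            },
        })
    }

    func main() {
        _, _ = openStreaming("localhost:9000")
    }
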
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index 58b9c216f1..5766223749 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -254,8 +254,15 @@ func (c *ClickHouseConnector) generateCreateTableSQLForNormalizedTable(
 		}
 	}
 
-	if allowNullableKey {
-		stmtBuilder.WriteString(" SETTINGS allow_nullable_key = 1")
+	if allowStream, err := internal.PeerDBEnableClickHouseStream(ctx, config.Env); err != nil {
+		return nil, fmt.Errorf("failed to load stream config: %w", err)
+	} else if allowStream {
+		stmtBuilder.WriteString(" SETTINGS enable_block_number_column=1,enable_block_offset_column=1")
+		if allowNullableKey {
+			stmtBuilder.WriteString(",allow_nullable_key=1")
+		}
+	} else if allowNullableKey {
+		stmtBuilder.WriteString(" SETTINGS allow_nullable_key=1")
 	}
 
 	if c.Config.Cluster != "" {
@@ -414,6 +421,16 @@ func (c *ClickHouseConnector) NormalizeRecords(
 		}, nil
 	}
 
+	enableStream, err := internal.PeerDBEnableClickHouseStream(ctx, req.Env)
+	if err != nil {
+		return model.NormalizeResponse{}, err
+	} else if enableStream {
+		return model.NormalizeResponse{
+			StartBatchID: normBatchID,
+			EndBatchID:   req.SyncBatchID,
+		}, nil
+	}
+
 	endBatchID := req.SyncBatchID
 	groupBatches, err := internal.PeerDBGroupNormalize(ctx, req.Env)
 	if err != nil {
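
Under streaming, rows land in final form during sync, so NormalizeRecords above just acknowledges the batch range instead of running merge queries. The DDL branch earlier in this file changes only the trailing settings clause of the generated CREATE TABLE; illustratively (assuming the table also needs a nullable ordering key):

    // Tail of the generated CREATE TABLE under each branch:
    const ddlTailStream = ` SETTINGS enable_block_number_column=1,enable_block_offset_column=1,allow_nullable_key=1`
    const ddlTailDefault = ` SETTINGS allow_nullable_key=1`
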
diff --git a/flow/connectors/clickhouse/validate.go b/flow/connectors/clickhouse/validate.go
index 81736a529b..f6d2d8b83a 100644
--- a/flow/connectors/clickhouse/validate.go
+++ b/flow/connectors/clickhouse/validate.go
@@ -18,8 +18,7 @@ func (c *ClickHouseConnector) ValidateMirrorDestination(
 	tableNameSchemaMapping map[string]*protos.TableSchema,
 ) error {
 	if internal.PeerDBOnlyClickHouseAllowed() {
-		err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database)
-		if err != nil {
+		if err := chvalidate.CheckIfClickHouseCloudHasSharedMergeTreeEnabled(ctx, c.logger, c.database); err != nil {
 			return err
 		}
 	}
diff --git a/flow/internal/dynamicconf.go b/flow/internal/dynamicconf.go
index fe3660b49d..94c627dcee 100644
--- a/flow/internal/dynamicconf.go
+++ b/flow/internal/dynamicconf.go
@@ -135,6 +135,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
 		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
 		TargetForSetting: protos.DynconfTarget_SNOWFLAKE,
 	},
+	{
+		Name:             "PEERDB_ENABLE_CLICKHOUSE_STREAM",
+		Description:      "Stream records as a series of inserts, lightweight updates, and lightweight deletes",
+		DefaultValue:     "false",
+		ValueType:        protos.DynconfValueType_BOOL,
+		ApplyMode:        protos.DynconfApplyMode_APPLY_MODE_AFTER_RESUME,
+		TargetForSetting: protos.DynconfTarget_CLICKHOUSE,
+	},
 	{
 		Name:        "PEERDB_CLICKHOUSE_BINARY_FORMAT",
 		Description: "Binary field encoding on clickhouse destination; either raw, hex, or base64",
@@ -572,6 +580,10 @@ func PeerDBNullable(ctx context.Context, env map[string]string) (bool, error) {
 	return dynamicConfBool(ctx, env, "PEERDB_NULLABLE")
 }
 
+func PeerDBEnableClickHouseStream(ctx context.Context, env map[string]string) (bool, error) {
+	return dynamicConfBool(ctx, env, "PEERDB_ENABLE_CLICKHOUSE_STREAM")
+}
+
 func PeerDBBinaryFormat(ctx context.Context, env map[string]string) (BinaryFormat, error) {
 	format, err := dynLookup(ctx, env, "PEERDB_CLICKHOUSE_BINARY_FORMAT")
 	if err != nil {
diff --git a/flow/model/record.go b/flow/model/record.go
index e0bed285e2..cce21e3f5c 100644
--- a/flow/model/record.go
+++ b/flow/model/record.go
@@ -13,8 +13,8 @@ type Record[T Items] interface {
 	GetTransactionID() uint64
 	GetDestinationTableName() string
 	GetSourceTableName() string
-	// get columns and values for the record
-	GetItems() T
+	GetBaseRecord() BaseRecord
+	GetItems() T // get columns and values for the record
 	PopulateCountMap(mapOfCounts map[string]*RecordTypeCounts)
 }
 
@@ -63,6 +63,10 @@ func (r *InsertRecord[T]) GetSourceTableName() string {
 	return r.SourceTableName
 }
 
+func (r *InsertRecord[T]) GetBaseRecord() BaseRecord {
+	return r.BaseRecord
+}
+
 func (r *InsertRecord[T]) GetItems() T {
 	return r.Items
 }
@@ -100,6 +104,10 @@ func (r *UpdateRecord[T]) GetSourceTableName() string {
 	return r.SourceTableName
 }
 
+func (r *UpdateRecord[T]) GetBaseRecord() BaseRecord {
+	return r.BaseRecord
+}
+
 func (r *UpdateRecord[T]) GetItems() T {
 	return r.NewItems
 }
@@ -135,6 +143,10 @@ func (r *DeleteRecord[T]) GetSourceTableName() string {
 	return r.SourceTableName
 }
 
+func (r *DeleteRecord[T]) GetBaseRecord() BaseRecord {
+	return r.BaseRecord
+}
+
 func (r *DeleteRecord[T]) GetItems() T {
 	return r.Items
 }
@@ -164,6 +176,10 @@ func (r *RelationRecord[T]) GetSourceTableName() string {
 	return r.TableSchemaDelta.SrcTableName
 }
 
+func (r *RelationRecord[T]) GetBaseRecord() BaseRecord {
+	return r.BaseRecord
+}
+
 func (r *RelationRecord[T]) GetItems() T {
 	var none T
 	return none
@@ -190,6 +206,10 @@ func (r *MessageRecord[T]) GetSourceTableName() string {
 	return ""
 }
 
+func (r *MessageRecord[T]) GetBaseRecord() BaseRecord {
+	return r.BaseRecord
+}
+
 func (r *MessageRecord[T]) GetItems() T {
 	var none T
 	return none
diff --git a/flow/shared/clickhouse/escape.go b/flow/shared/clickhouse/escape.go
index 7a3bedec61..ce2784590a 100644
--- a/flow/shared/clickhouse/escape.go
+++ b/flow/shared/clickhouse/escape.go
@@ -21,6 +21,9 @@ func escape(result *strings.Builder, value string) {
 	for idx := range len(value) {
 		if mustEscape(value[idx]) {
 			result.WriteByte('\\')
+		} else if value[idx] == 0 {
+			result.WriteString("\\x00")
+			continue
 		}
 		result.WriteByte(value[idx])
 	}
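
The escape change rewrites NUL bytes as the escape sequence \x00 so quoted literals never carry a raw zero byte. A standalone sketch of the resulting behavior (escapeSketch compresses the patched loop, assuming mustEscape covers backslash and single quote):

    package main

    import (
        "fmt"
        "strings"
    )

    // escapeSketch: backslash-escape quotes and backslashes, and turn a
    // raw NUL into the four characters \x00, as the patched loop does.
    func escapeSketch(value string) string {
        var result strings.Builder
        for idx := range len(value) {
            switch value[idx] {
            case '\'', '\\':
                result.WriteByte('\\')
            case 0:
                result.WriteString("\\x00")
                continue
            }
            result.WriteByte(value[idx])
        }
        return result.String()
    }

    func main() {
        fmt.Printf("%q\n", escapeSketch("a\x00b'c")) // prints "a\\x00b\\'c"
    }
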