108 changes: 62 additions & 46 deletions flow/connectors/postgres/cdc.go
@@ -423,50 +423,63 @@ func PullCdcRecords[Items model.Items](
}
}

var standByLastLogged time.Time
cdcRecordsStorage, err := utils.NewCDCStore[Items](ctx, req.Env, p.flowJobName)
var cdcRecordsStorage *utils.CDCStore[Items]
var totalRecords int64
cdcStoreEnabled, err := internal.PeerDBCDCStoreEnabled(ctx, req.Env)
if err != nil {
return err
}
if cdcStoreEnabled {
cdcRecordsStorage, err = utils.NewCDCStore[Items](ctx, req.Env, p.flowJobName)
if err != nil {
return err
}
}
defer func() {
if cdcRecordsStorage.IsEmpty() {
if totalRecords == 0 {
records.SignalAsEmpty()
}
logger.Info("[finished] PullRecords", slog.Int("records", cdcRecordsStorage.Len()))
if err := cdcRecordsStorage.Close(); err != nil {
logger.Warn("failed to clean up records storage", slog.Any("error", err))
logger.Info("[finished] PullRecords", slog.Int64("records", totalRecords))
if cdcRecordsStorage != nil {
if err := cdcRecordsStorage.Close(); err != nil {
logger.Warn("failed to clean up records storage", slog.Any("error", err))
}
}
}()

logger.Info("pulling records start")

shutdown := shared.Interval(ctx, time.Minute, func() {
logger.Info("pulling records", slog.Int("records", cdcRecordsStorage.Len()))
logger.Info("pulling records", slog.Int64("records", totalRecords))
})
defer shutdown()

nextStandbyMessageDeadline := time.Now().Add(req.IdleTimeout)

var standByLastLogged time.Time
pkmRequiresResponse := false
waitingForCommit := false
fetchedBytes := 0

addRecordWithKey := func(key model.TableWithPkey, rec model.Record[Items]) error {
if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
return err
if cdcRecordsStorage != nil {
if err := cdcRecordsStorage.Set(logger, key, rec); err != nil {
return err
}
}
if err := records.AddRecord(ctx, rec); err != nil {
return err
}
totalRecords++

if cdcRecordsStorage.Len() == 1 {
if totalRecords == 1 {
records.SignalAsNotEmpty()
nextStandbyMessageDeadline = time.Now().Add(req.IdleTimeout)
logger.Info(fmt.Sprintf("pushing the standby deadline to %s", nextStandbyMessageDeadline))
}
if cdcRecordsStorage.Len()%50000 == 0 {
if totalRecords%50000 == 0 {
logger.Info("pulling records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()),
slog.Bool("waitingForCommit", waitingForCommit))
@@ -481,7 +494,7 @@ func PullCdcRecords[Items model.Items](
lastEmptyBatchPkmSentTime := time.Now()
for {
if pkmRequiresResponse {
if cdcRecordsStorage.IsEmpty() && int64(clientXLogPos) > req.ConsumedOffset.Load() {
if totalRecords == 0 && int64(clientXLogPos) > req.ConsumedOffset.Load() {
err := p.updateConsumedOffset(ctx, logger, req.FlowJobName, req.ConsumedOffset, clientXLogPos)
if err != nil {
return err
@@ -496,7 +509,7 @@ func PullCdcRecords[Items model.Items](

if time.Since(standByLastLogged) > 10*time.Second {
logger.Info("Sent Standby status message",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()),
slog.Bool("waitingForCommit", waitingForCommit))
@@ -505,17 +518,17 @@ func PullCdcRecords[Items model.Items](
}

if p.commitLock == nil {
if cdcRecordsStorage.Len() >= int(req.MaxBatchSize) {
if totalRecords >= int64(req.MaxBatchSize) {
logger.Info("batch filled, returning currently accumulated records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()))
return nil
}

if waitingForCommit {
logger.Info("commit received, returning currently accumulated records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()))
return nil
@@ -524,18 +537,18 @@ func PullCdcRecords[Items model.Items](

// if we are past the next standby deadline (?)
if time.Now().After(nextStandbyMessageDeadline) {
if !cdcRecordsStorage.IsEmpty() {
logger.Info("standby deadline reached", slog.Int("records", cdcRecordsStorage.Len()))
if totalRecords > 0 {
logger.Info("standby deadline reached", slog.Int64("records", totalRecords))

if p.commitLock == nil {
logger.Info("no commit lock, returning currently accumulated records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()))
return nil
} else {
logger.Info("commit lock, waiting for commit to return records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()))
waitingForCommit = true
@@ -548,7 +561,7 @@ func PullCdcRecords[Items model.Items](

var receiveCtx context.Context
var cancel context.CancelFunc
if cdcRecordsStorage.IsEmpty() {
if totalRecords == 0 {
receiveCtx, cancel = context.WithCancel(ctx)
} else {
receiveCtx, cancel = context.WithDeadline(ctx, nextStandbyMessageDeadline)
@@ -567,7 +580,7 @@ func PullCdcRecords[Items model.Items](
if err != nil && p.commitLock == nil {
if pgconn.Timeout(err) {
logger.Info("Stand-by deadline reached, returning currently accumulated records",
slog.Int("records", cdcRecordsStorage.Len()),
slog.Int64("records", totalRecords),
slog.Int("bytes", fetchedBytes),
slog.Int("channelLen", records.ChannelLen()))
return nil
@@ -635,19 +648,19 @@ func PullCdcRecords[Items model.Items](
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
if err != nil {
return err
}
if ok {
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
if cdcRecordsStorage != nil {
if latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal); err != nil {
return err
} else if ok {
// iterate through unchanged toast cols and set them in new record
updatedCols := r.NewItems.UpdateIfNotExists(latestRecord.GetItems())
for _, col := range updatedCols {
delete(r.UnchangedToastColumns, col)
}
p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(updatedCols)),
metric.WithAttributeSet(attribute.NewSet(
attribute.Bool("backfilled", true))))
}
p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(updatedCols)),
metric.WithAttributeSet(attribute.NewSet(
attribute.Bool("backfilled", true))))
}
p.otelManager.Metrics.UnchangedToastValuesCounter.Add(ctx, int64(len(r.UnchangedToastColumns)),
metric.WithAttributeSet(attribute.NewSet(
@@ -686,16 +699,19 @@ func PullCdcRecords[Items model.Items](
return err
}

latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal)
if err != nil {
return err
}
if ok {
r.Items = latestRecord.GetItems()
if updateRecord, ok := latestRecord.(*model.UpdateRecord[Items]); ok {
r.UnchangedToastColumns = updateRecord.UnchangedToastColumns
backfilled := false
if cdcRecordsStorage != nil {
if latestRecord, ok, err := cdcRecordsStorage.Get(tablePkeyVal); err != nil {
return err
} else if ok {
r.Items = latestRecord.GetItems()
if updateRecord, ok := latestRecord.(*model.UpdateRecord[Items]); ok {
r.UnchangedToastColumns = updateRecord.UnchangedToastColumns
backfilled = true
}
}
} else {
}
if !backfilled {
// there is nothing to backfill the items in the delete record with,
// so don't update the row with this record
// add sentinel value to prevent update statements from selecting
@@ -720,9 +736,9 @@
}

case *model.MessageRecord[Items]:
// if cdc store empty, we can move lsn,
// if there were no records, we can move lsn,
// otherwise push to records so destination can ack once all previous messages processed
if cdcRecordsStorage.IsEmpty() {
if totalRecords == 0 {
if int64(clientXLogPos) > req.ConsumedOffset.Load() {
if err := p.updateConsumedOffset(ctx, logger, req.FlowJobName, req.ConsumedOffset, clientXLogPos); err != nil {
return err
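Note on the cdc.go hunks above: batching and emptiness decisions previously asked the store itself (`cdcRecordsStorage.Len()` / `IsEmpty()`), which no longer works once the store may be nil behind the new `PEERDB_CDC_STORE_ENABLED` flag, so the diff tracks a local `totalRecords int64` and nil-guards every store access. A minimal standalone sketch of that pattern, with stand-in types rather than the PR's code:

```go
package main

import "fmt"

// store is a stand-in for utils.CDCStore; the real one can spill to Pebble.
type store struct{ recs map[string]string }

func (s *store) Set(k, v string) { s.recs[k] = v }

func pull(storeEnabled bool, keys []string, maxBatch int64) {
	var st *store // stays nil when the flag is off
	if storeEnabled {
		st = &store{recs: make(map[string]string)}
	}
	var totalRecords int64 // stands in for cdcRecordsStorage.Len()/IsEmpty()
	for _, k := range keys {
		if st != nil { // every store access is nil-guarded, as in the diff
			st.Set(k, "payload")
		}
		totalRecords++
		if totalRecords >= maxBatch { // batch check no longer touches the store
			fmt.Println("batch filled, returning:", totalRecords)
			return
		}
	}
	fmt.Println("pulled:", totalRecords)
}

func main() {
	pull(false, []string{"a", "b", "c"}, 2) // store disabled: batching still works
}
```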
24 changes: 8 additions & 16 deletions flow/connectors/utils/cdc_store.go
@@ -32,7 +32,7 @@ func encVal(val any) ([]byte, error) {
return buf.Bytes(), nil
}

type cdcStore[Items model.Items] struct {
type CDCStore[Items model.Items] struct {
inMemoryRecords map[model.TableWithPkey]model.Record[Items]
pebbleDB *pebble.DB
flowJobName string
@@ -44,7 +44,7 @@ type cdcStore[Items model.Items] struct {
numRecordsSwitchThreshold int
}

func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*cdcStore[Items], error) {
func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string, flowJobName string) (*CDCStore[Items], error) {
numRecordsSwitchThreshold, err := internal.PeerDBCDCDiskSpillRecordsThreshold(ctx, env)
if err != nil {
return nil, fmt.Errorf("failed to get CDC disk spill records threshold: %w", err)
@@ -54,7 +54,7 @@ func NewCDCStore[Items model.Items](ctx context.Context, env map[string]string,
return nil, fmt.Errorf("failed to get CDC disk spill memory percent threshold: %w", err)
}

return &cdcStore[Items]{
return &CDCStore[Items]{
inMemoryRecords: make(map[model.TableWithPkey]model.Record[Items]),
pebbleDB: nil,
numRecords: atomic.Int32{},
@@ -128,7 +128,7 @@ func init() {
gob.Register(types.QValueArrayNumeric{})
}

func (c *cdcStore[T]) initPebbleDB() error {
func (c *CDCStore[T]) initPebbleDB() error {
if c.pebbleDB != nil {
return nil
}
@@ -152,7 +152,7 @@ func (c *cdcStore[T]) initPebbleDB() error {
return nil
}

func (c *cdcStore[T]) diskSpillThresholdsExceeded() bool {
func (c *CDCStore[T]) diskSpillThresholdsExceeded() bool {
if c.numRecordsSwitchThreshold >= 0 && len(c.inMemoryRecords) >= c.numRecordsSwitchThreshold {
c.thresholdReason = fmt.Sprintf("more than %d primary keys read, spilling to disk",
c.numRecordsSwitchThreshold)
@@ -170,7 +170,7 @@ func (c *cdcStore[T]) diskSpillThresholdsExceeded() bool {
return false
}

func (c *cdcStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.Record[T]) error {
func (c *CDCStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.Record[T]) error {
if key.TableName != "" {
_, ok := c.inMemoryRecords[key]
if ok || !c.diskSpillThresholdsExceeded() {
@@ -208,7 +208,7 @@ func (c *cdcStore[T]) Set(logger log.Logger, key model.TableWithPkey, rec model.
}

// bool is to indicate if a record is found or not [similar to ok]
func (c *cdcStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error) {
func (c *CDCStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error) {
rec, ok := c.inMemoryRecords[key]
if ok {
return rec, true, nil
@@ -244,15 +244,7 @@ func (c *cdcStore[T]) Get(key model.TableWithPkey) (model.Record[T], bool, error
return nil, false, nil
}

func (c *cdcStore[T]) Len() int {
return int(c.numRecords.Load())
}

func (c *cdcStore[T]) IsEmpty() bool {
return c.Len() == 0
}

func (c *cdcStore[T]) Close() error {
func (c *CDCStore[T]) Close() error {
c.inMemoryRecords = nil
if c.pebbleDB != nil {
if err := c.pebbleDB.Close(); err != nil {
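The store's behavior is unchanged in cdc_store.go: the diff exports `cdcStore` as `CDCStore` (so cdc.go can declare a nil-able `*utils.CDCStore[Items]` outside the package) and drops the now-unused `Len()`/`IsEmpty()`. For reference, a simplified stand-in for the memory-then-disk layout those methods used to count over, with a plain map playing the role of Pebble:

```go
package main

import "fmt"

// spillStore is a simplified stand-in for CDCStore's two-tier layout.
type spillStore struct {
	inMemory  map[string]string
	spilled   map[string]string // stand-in for the Pebble DB
	threshold int
}

func (s *spillStore) Set(k, v string) {
	// keys already resident stay in memory; new keys spill once the map
	// reaches the threshold (mirrors diskSpillThresholdsExceeded)
	if _, ok := s.inMemory[k]; ok || len(s.inMemory) < s.threshold {
		s.inMemory[k] = v
		return
	}
	s.spilled[k] = v
}

func (s *spillStore) Get(k string) (string, bool) {
	if v, ok := s.inMemory[k]; ok { // in-memory fast path first
		return v, true
	}
	v, ok := s.spilled[k]
	return v, ok
}

func main() {
	s := &spillStore{inMemory: map[string]string{}, spilled: map[string]string{}, threshold: 2}
	for _, k := range []string{"a", "b", "c"} {
		s.Set(k, k+"-val")
	}
	fmt.Println(len(s.inMemory), len(s.spilled)) // 2 1
	v, ok := s.Get("c")
	fmt.Println(v, ok) // c-val true
}
```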
2 changes: 1 addition & 1 deletion flow/connectors/utils/cdc_store_test.go
@@ -137,7 +137,7 @@ func TestNullKeyDoesntStore(t *testing.T) {
require.NoError(t, err)
require.False(t, ok)

require.Equal(t, 1, cdcRecordsStore.Len())
require.Equal(t, 1, int(cdcRecordsStore.numRecords.Load()))

require.NoError(t, cdcRecordsStore.Close())
}
12 changes: 12 additions & 0 deletions flow/internal/dynamicconf.go
@@ -64,6 +64,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_QUEUES,
},
{
Name: "PEERDB_CDC_STORE_ENABLED",
Description: "Controls whether to enable the store for recovering unchanged Postgres TOAST values within a CDC batch",
DefaultValue: "true",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD",
Description: "CDC: number of records beyond which records are written to disk instead",
@@ -544,6 +552,10 @@ func PeerDBQueueParallelism(ctx context.Context, env map[string]string) (int64,
return dynamicConfSigned[int64](ctx, env, "PEERDB_QUEUE_PARALLELISM")
}

func PeerDBCDCStoreEnabled(ctx context.Context, env map[string]string) (bool, error) {
return dynamicConfBool(ctx, env, "PEERDB_CDC_STORE_ENABLED")
}

func PeerDBCDCDiskSpillRecordsThreshold(ctx context.Context, env map[string]string) (int64, error) {
return dynamicConfSigned[int64](ctx, env, "PEERDB_CDC_DISK_SPILL_RECORDS_THRESHOLD")
}
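`PeerDBCDCStoreEnabled` follows the file's existing `dynamicConfBool` pattern. A simplified stand-in for how such a boolean setting might resolve — assuming the env map overrides the setting's default and the value is parsed with `strconv.ParseBool`; the real helper also consults the catalog, which is omitted here:

```go
package main

import (
	"fmt"
	"strconv"
)

// confBool is a hypothetical, simplified stand-in for dynamicConfBool:
// env overrides the default; catalog lookup from the real helper is omitted.
func confBool(env map[string]string, name, def string) (bool, error) {
	val := def
	if v, ok := env[name]; ok {
		val = v
	}
	return strconv.ParseBool(val)
}

func main() {
	on, err := confBool(nil, "PEERDB_CDC_STORE_ENABLED", "true")
	fmt.Println(on, err) // true <nil> — the store defaults to enabled

	off, err := confBool(map[string]string{"PEERDB_CDC_STORE_ENABLED": "false"},
		"PEERDB_CDC_STORE_ENABLED", "true")
	fmt.Println(off, err) // false <nil>
}
```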