Commit 52d22fd

Timezone cleanup (#3202)
1. if-err cleanup
2. don't wait on sync to cancel when pausing; this was causing pausing to sometimes take 10+ seconds
3. set timezone to UTC on Postgres connections
4. parse datetimes as UTC in ClickHouse

Item 4 addresses the following, on a ClickHouse server with a non-UTC timezone:

```
SELECT parseDateTimeBestEffort('2025-07-11 19:30:02.313462+0000')

Query id: 0d4bbad2-fdab-414f-85c8-2c80c61ad0ab

┌─parseDateTimeBestEffort('2025-07-11 19:30:02.313462+0000')─┐
│                                         2025-07-11 15:30:02 │
└─────────────────────────────────────────────────────────────┘
```

The underlying problem is that our timezone hygiene between reading data from Postgres and ingesting into ClickHouse, across both initial loads and CDC, is not tight enough to handle this case by case; instead we maintain a policy of "peerdb transmits times as UTC".
1 parent 9acfbab commit 52d22fd
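As a minimal illustration of that policy (a hypothetical sketch, not code from this commit): a timestamptz string read from Postgres is re-anchored to UTC before being handed to the destination, so the ClickHouse server's timezone never gets a chance to shift it.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Postgres renders timestamptz with an offset suffix under DateStyle ISO.
	const layout = "2006-01-02 15:04:05.999999-0700"
	ts, err := time.Parse(layout, "2025-07-11 19:30:02.313462+0000")
	if err != nil {
		panic(err)
	}
	// Re-anchor to UTC before formatting for the destination; the offset is
	// folded into the value instead of being left for the destination server
	// to (mis)interpret.
	fmt.Println(ts.UTC().Format("2006-01-02 15:04:05.999999"))
	// Output: 2025-07-11 19:30:02.313462
}
```

With the value already expressed in UTC, the explicit 'UTC' argument on the ClickHouse side becomes a no-op rather than a correction.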

13 files changed, +44 −65 lines changed

flow/connectors/bigquery/bigquery.go

Lines changed: 16 additions & 25 deletions
```diff
@@ -83,10 +83,9 @@ func NewBigQueryConnector(ctx context.Context, config *protos.BigqueryConfig) (*
 		return nil, fmt.Errorf("failed to create BigQuery client: %v", err)
 	}
 
-	_, datasetErr := client.DatasetInProject(projectID, datasetID).Metadata(ctx)
-	if datasetErr != nil {
-		logger.Error("failed to get dataset metadata", "error", datasetErr)
-		return nil, fmt.Errorf("failed to get dataset metadata: %v", datasetErr)
+	if _, err := client.DatasetInProject(projectID, datasetID).Metadata(ctx); err != nil {
+		logger.Error("failed to get dataset metadata", "error", err)
+		return nil, fmt.Errorf("failed to get dataset metadata: %v", err)
 	}
 
 	storageClient, err := bqsa.CreateStorageClient(ctx)
@@ -166,8 +165,7 @@ func (c *BigQueryConnector) Close() error {
 
 // ConnectionActive returns nil if the connection is active.
 func (c *BigQueryConnector) ConnectionActive(ctx context.Context) error {
-	_, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(ctx)
-	if err != nil {
+	if _, err := c.client.DatasetInProject(c.projectID, c.datasetID).Metadata(ctx); err != nil {
 		return fmt.Errorf("failed to get dataset metadata: %v", err)
 	}
 
@@ -182,12 +180,13 @@ func (c *BigQueryConnector) waitForTableReady(ctx context.Context, datasetTable
 	attempt := 0
 
 	for {
+		if err := ctx.Err(); err != nil {
+			return err
+		}
 		if time.Now().After(deadline) {
 			return fmt.Errorf("timeout reached while waiting for table %s to be ready", datasetTable)
 		}
-
-		_, err := table.Metadata(ctx)
-		if err == nil {
+		if _, err := table.Metadata(ctx); err == nil {
 			return nil
 		}
 
@@ -768,8 +767,7 @@ func (c *BigQueryConnector) RenameTables(
 
 		// if _resync table does not exist, log and continue.
 		dataset := c.client.DatasetInProject(c.projectID, srcDatasetTable.dataset)
-		_, err := dataset.Table(srcDatasetTable.table).Metadata(ctx)
-		if err != nil {
+		if _, err := dataset.Table(srcDatasetTable.table).Metadata(ctx); err != nil {
 			if !strings.Contains(err.Error(), "notFound") {
 				return nil, fmt.Errorf("[renameTable] failed to get metadata for _resync table %s: %w", srcDatasetTable.string(), err)
 			}
@@ -779,8 +777,7 @@ func (c *BigQueryConnector) RenameTables(
 
 		// if the original table does not exist, log and skip soft delete step
 		originalTableExists := true
-		_, err = dataset.Table(dstDatasetTable.table).Metadata(ctx)
-		if err != nil {
+		if _, err := dataset.Table(dstDatasetTable.table).Metadata(ctx); err != nil {
 			if !strings.Contains(err.Error(), "notFound") {
 				return nil, fmt.Errorf("[renameTable] failed to get metadata for original table %s: %w", dstDatasetTable.string(), err)
 			}
@@ -860,8 +857,7 @@ func (c *BigQueryConnector) RenameTables(
 
 			query.DefaultProjectID = c.projectID
 			query.DefaultDatasetID = c.datasetID
-			_, err := query.Read(ctx)
-			if err != nil {
+			if _, err := query.Read(ctx); err != nil {
 				return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err)
 			}
 		}
@@ -876,8 +872,7 @@ func (c *BigQueryConnector) RenameTables(
 
 			query.DefaultProjectID = c.projectID
 			query.DefaultDatasetID = c.datasetID
-			_, err := query.Read(ctx)
-			if err != nil {
+			if _, err := query.Read(ctx); err != nil {
 				return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err)
 			}
 		}
@@ -886,8 +881,7 @@ func (c *BigQueryConnector) RenameTables(
 		dropQuery := c.queryWithLogging("DROP TABLE IF EXISTS " + dstDatasetTable.string())
 		dropQuery.DefaultProjectID = c.projectID
 		dropQuery.DefaultDatasetID = c.datasetID
-		_, err = dropQuery.Read(ctx)
-		if err != nil {
+		if _, err := dropQuery.Read(ctx); err != nil {
 			return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
 		}
 
@@ -896,8 +890,7 @@ func (c *BigQueryConnector) RenameTables(
 			srcDatasetTable.string(), dstDatasetTable.table))
 		query.DefaultProjectID = c.projectID
 		query.DefaultDatasetID = c.datasetID
-		_, err = query.Read(ctx)
-		if err != nil {
+		if _, err := query.Read(ctx); err != nil {
 			return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(),
 				dstDatasetTable.string(), err)
 		}
@@ -925,8 +918,7 @@ func (c *BigQueryConnector) CreateTablesFromExisting(
 			newDatasetTable.string(), existingDatasetTable.string()))
 		query.DefaultProjectID = c.projectID
 		query.DefaultDatasetID = c.datasetID
-		_, err := query.Read(ctx)
-		if err != nil {
+		if _, err := query.Read(ctx); err != nil {
 			return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
 		}
 
@@ -950,8 +942,7 @@ func (c *BigQueryConnector) RemoveTableEntriesFromRawTable(
 			rawTableIdentifier, tableName, req.NormalizeBatchId, req.SyncBatchId))
 		deleteCmd.DefaultProjectID = c.projectID
 		deleteCmd.DefaultDatasetID = c.datasetID
-		_, err := deleteCmd.Read(ctx)
-		if err != nil {
+		if _, err := deleteCmd.Read(ctx); err != nil {
 			c.logger.Error("failed to remove entries from raw table", "error", err)
 		}
 
```
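The ctx.Err() check added to waitForTableReady above implements point 2 of the commit message: a polling loop that only watches its wall-clock deadline cannot be interrupted by a pause. A minimal sketch of the pattern, with hypothetical names (pollUntilReady, probe are illustrative, not PeerDB APIs):

```go
package connutil

import (
	"context"
	"fmt"
	"time"
)

// pollUntilReady shows the shape of the loop: the ctx.Err() fast path lets a
// pause or shutdown cancel the wait on the next iteration instead of letting
// the loop run until its wall-clock deadline.
func pollUntilReady(ctx context.Context, deadline time.Time, probe func(context.Context) error) error {
	for {
		if err := ctx.Err(); err != nil {
			return err // context canceled: stop immediately
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("timeout reached while waiting")
		}
		if err := probe(ctx); err == nil {
			return nil // resource is ready
		}
		time.Sleep(5 * time.Second) // retry interval, arbitrary for the sketch
	}
}
```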

flow/connectors/bigquery/qrep.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -103,8 +103,7 @@ func (c *BigQueryConnector) SetupQRepMetadataTables(ctx context.Context, config
 		query := c.queryWithLogging("TRUNCATE TABLE " + config.DestinationTableIdentifier)
 		query.DefaultDatasetID = c.datasetID
 		query.DefaultProjectID = c.projectID
-		_, err := query.Read(ctx)
-		if err != nil {
+		if _, err := query.Read(ctx); err != nil {
 			return fmt.Errorf("failed to TRUNCATE table before query replication: %w", err)
 		}
 	}
```
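Most of the remaining hunks apply the same mechanical if-err rewrite. For reference, a small self-contained sketch of the idiom (example and doThing are placeholder names): moving the call into the if initializer scopes err to the error branch, so it cannot be accidentally reused or shadowed later in the function.

```go
package cleanup

import "fmt"

// doThing stands in for any call returning (T, error).
func doThing() (int, error) { return 0, nil }

// Before: err escapes into the enclosing scope.
//
//	_, err := doThing()
//	if err != nil { ... }
//
// After: the if-initializer limits err to the error branch.
func example() error {
	if _, err := doThing(); err != nil {
		return fmt.Errorf("doThing failed: %w", err)
	}
	return nil
}
```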

flow/connectors/clickhouse/normalize_query.go

Lines changed: 9 additions & 8 deletions
```diff
@@ -128,13 +128,13 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
 	switch clickHouseType {
 	case "Date32", "Nullable(Date32)":
 		fmt.Fprintf(&projection,
-			"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6)) AS %s,",
+			"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC')) AS %s,",
 			peerdb_clickhouse.QuoteLiteral(colName),
 			peerdb_clickhouse.QuoteIdentifier(dstColName),
 		)
 		if t.enablePrimaryUpdate {
 			fmt.Fprintf(&projectionUpdate,
-				"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6)) AS %s,",
+				"toDate32(parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6,'UTC')) AS %s,",
 				peerdb_clickhouse.QuoteLiteral(colName),
 				peerdb_clickhouse.QuoteIdentifier(dstColName),
 			)
@@ -144,40 +144,41 @@ func (t *NormalizeQueryGenerator) BuildQuery(ctx context.Context) (string, error
 			// parseDateTime64BestEffortOrNull for hh:mm:ss puts the year as current year
 			// (or previous year if result would be in future) so explicitly anchor to unix epoch
 			fmt.Fprintf(&projection,
-				"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_data, %s),6) AS %s,",
+				"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
 				peerdb_clickhouse.QuoteLiteral(colName),
 				peerdb_clickhouse.QuoteIdentifier(dstColName),
 			)
 			if t.enablePrimaryUpdate {
 				fmt.Fprintf(&projectionUpdate,
-					"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_match_data, %s),6) AS %s,",
+					"parseDateTime64BestEffortOrNull('1970-01-01 ' || JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
 					peerdb_clickhouse.QuoteLiteral(colName),
 					peerdb_clickhouse.QuoteIdentifier(dstColName),
 				)
 			}
 		} else {
 			fmt.Fprintf(&projection,
-				"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6) AS %s,",
+				"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
 				peerdb_clickhouse.QuoteLiteral(colName),
 				peerdb_clickhouse.QuoteIdentifier(dstColName),
 			)
 			if t.enablePrimaryUpdate {
 				fmt.Fprintf(&projectionUpdate,
-					"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6) AS %s,",
+					"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_match_data, %s),6,'UTC') AS %s,",
 					peerdb_clickhouse.QuoteLiteral(colName),
 					peerdb_clickhouse.QuoteIdentifier(dstColName),
 				)
 			}
 		}
 	case "Array(DateTime64(6))", "Nullable(Array(DateTime64(6)))":
 		fmt.Fprintf(&projection,
-			`arrayMap(x -> parseDateTime64BestEffortOrNull(trimBoth(x, '"'), 6), JSONExtractArrayRaw(_peerdb_data, %s)) AS %s,`,
+			`arrayMap(x -> parseDateTime64BestEffortOrNull(trimBoth(x, '"'),6,'UTC'), JSONExtractArrayRaw(_peerdb_data, %s)) AS %s,`,
 			peerdb_clickhouse.QuoteLiteral(colName),
 			peerdb_clickhouse.QuoteIdentifier(dstColName),
 		)
 		if t.enablePrimaryUpdate {
 			fmt.Fprintf(&projectionUpdate,
-				`arrayMap(x -> parseDateTime64BestEffortOrNull(trimBoth(x, '"'), 6), JSONExtractArrayRaw(_peerdb_match_data, %s)) AS %s,`,
+				`arrayMap(x -> parseDateTime64BestEffortOrNull(trimBoth(x, '"'),6,'UTC'),`+
+					`JSONExtractArrayRaw(_peerdb_match_data, %s)) AS %s,`,
 				peerdb_clickhouse.QuoteLiteral(colName),
 				peerdb_clickhouse.QuoteIdentifier(dstColName),
 			)
```
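The only functional change in this file is the new third argument: ClickHouse's best-effort datetime parsers fall back to the server (or session) timezone when the input carries none, so pinning 'UTC' makes the normalize query independent of the server setting. A standalone sketch of the projection the generator emits, with literal stand-ins for the QuoteLiteral/QuoteIdentifier calls:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	var projection strings.Builder
	// Mirrors the generator's Fprintf: the third argument pins the parse
	// timezone, so the expression evaluates identically whether the
	// ClickHouse server runs in UTC or not.
	fmt.Fprintf(&projection,
		"parseDateTime64BestEffortOrNull(JSONExtractString(_peerdb_data, %s),6,'UTC') AS %s,",
		"'updated_at'", // stand-in for peerdb_clickhouse.QuoteLiteral(colName)
		"`updated_at`", // stand-in for peerdb_clickhouse.QuoteIdentifier(dstColName)
	)
	fmt.Println(projection.String())
}
```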

flow/connectors/eventhub/hubmanager.go

Lines changed: 2 additions & 4 deletions
```diff
@@ -62,8 +62,7 @@ func (m *EventHubManager) GetOrCreateHubClient(ctx context.Context, name ScopedE
 	hub, hubConnectOK = m.hubs.Load(name)
 	if hubConnectOK {
 		hubTmp := hub.(*azeventhubs.ProducerClient)
-		_, err := hubTmp.GetEventHubProperties(ctx, nil)
-		if err != nil {
+		if _, err := hubTmp.GetEventHubProperties(ctx, nil); err != nil {
 			logger := internal.LoggerFromCtx(ctx)
 			logger.Info(
 				fmt.Sprintf("eventhub %s not reachable. Will re-establish connection and re-create it.", name),
@@ -169,8 +168,7 @@ func (m *EventHubManager) EnsureEventHubExists(ctx context.Context, name ScopedE
 		},
 	}
 
-	_, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name.Eventhub, opts, nil)
-	if err != nil {
+	if _, err := hubClient.CreateOrUpdate(ctx, resourceGroup, namespace, name.Eventhub, opts, nil); err != nil {
 		slog.Error("failed to create event hub", slog.Any("error", err))
 		return err
 	}
```

flow/connectors/mongo/validate.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -29,8 +29,7 @@ func (c *MongoConnector) ValidateMirrorSource(ctx context.Context, cfg *protos.F
 		return nil
 	}
 
-	_, err := shared_mongo.GetReplSetGetStatus(ctx, c.client)
-	if err != nil {
+	if _, err := shared_mongo.GetReplSetGetStatus(ctx, c.client); err != nil {
 		return err
 	}
 
```

flow/connectors/postgres/postgres.go

Lines changed: 7 additions & 8 deletions
```diff
@@ -73,10 +73,10 @@ func NewPostgresConnector(ctx context.Context, env map[string]string, pgConfig *
 		return nil, err
 	}
 
-	runtimeParams := connConfig.Config.RuntimeParams
-	runtimeParams["idle_in_transaction_session_timeout"] = "0"
-	runtimeParams["statement_timeout"] = "0"
-	runtimeParams["DateStyle"] = "ISO, DMY"
+	connConfig.Config.RuntimeParams["timezone"] = "UTC"
+	connConfig.Config.RuntimeParams["idle_in_transaction_session_timeout"] = "0"
+	connConfig.Config.RuntimeParams["statement_timeout"] = "0"
+	connConfig.Config.RuntimeParams["DateStyle"] = "ISO, DMY"
 
 	tunnel, err := utils.NewSSHTunnel(ctx, pgConfig.SshConfig)
 	if err != nil {
@@ -159,10 +159,9 @@ func (c *PostgresConnector) CreateReplConn(ctx context.Context) (*pgx.Conn, erro
 		return nil, fmt.Errorf("failed to parse connection string: %w", err)
 	}
 
-	runtimeParams := replConfig.Config.RuntimeParams
-	runtimeParams["idle_in_transaction_session_timeout"] = "0"
-	runtimeParams["statement_timeout"] = "0"
-	// ensure that replication is set to database
+	replConfig.Config.RuntimeParams["timezone"] = "UTC"
+	replConfig.Config.RuntimeParams["idle_in_transaction_session_timeout"] = "0"
+	replConfig.Config.RuntimeParams["statement_timeout"] = "0"
 	replConfig.Config.RuntimeParams["replication"] = "database"
 	replConfig.Config.RuntimeParams["bytea_output"] = "hex"
 	replConfig.Config.RuntimeParams["intervalstyle"] = "postgres"
```
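A self-contained sketch of the same mechanism with pgx (the DSN is a placeholder): runtime params set on the parsed config are sent as session settings when each connection starts up, so every timestamptz the connector reads is rendered in UTC regardless of the server's default timezone.

```go
package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Hypothetical DSN; ParseConfig + RuntimeParams is the same mechanism the
	// connector uses: the params become session settings at connection startup.
	cfg, err := pgx.ParseConfig("postgres://user:pass@localhost:5432/db")
	if err != nil {
		panic(err)
	}
	cfg.RuntimeParams["timezone"] = "UTC"

	conn, err := pgx.ConnectConfig(ctx, cfg)
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	var tz string
	if err := conn.QueryRow(ctx, "SHOW timezone").Scan(&tz); err != nil {
		panic(err)
	}
	fmt.Println(tz) // UTC
}
```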

flow/connectors/postgres/qrep_query_executor.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -34,8 +34,7 @@ func (c *PostgresConnector) NewQRepQueryExecutor(ctx context.Context, version ui
 func (c *PostgresConnector) NewQRepQueryExecutorSnapshot(ctx context.Context, version uint32,
 	snapshot string, flowJobName string, partitionID string,
 ) (*QRepQueryExecutor, error) {
-	_, err := c.fetchCustomTypeMapping(ctx)
-	if err != nil {
+	if _, err := c.fetchCustomTypeMapping(ctx); err != nil {
 		c.logger.Error("[pg_query_executor] failed to fetch custom type mapping", slog.Any("error", err))
 		return nil, fmt.Errorf("failed to fetch custom type mapping: %w", err)
 	}
```

flow/connectors/postgres/ssh_wrapped_conn.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -60,8 +60,7 @@ func NewPostgresConnFromConfig(
 	}
 
 	if err := retryWithBackoff(logger, func() error {
-		_, err := conn.Exec(ctx, "SELECT 1")
-		if err != nil {
+		if _, err := conn.Exec(ctx, "SELECT 1"); err != nil {
 			logger.Error("Failed to ping pool", slog.Any("error", err), slog.String("host", connConfig.Host))
 			return err
 		}
```

flow/connectors/pubsub/pubsub.go

Lines changed: 1 addition & 2 deletions
```diff
@@ -59,8 +59,7 @@ func (c *PubSubConnector) Close() error {
 
 func (c *PubSubConnector) ConnectionActive(ctx context.Context) error {
 	topic := c.client.Topic("test")
-	_, err := topic.Exists(ctx)
-	if err != nil {
+	if _, err := topic.Exists(ctx); err != nil {
 		return fmt.Errorf("pubsub connection active check failure: %w", err)
 	}
 	return nil
```

flow/connectors/snowflake/qrep.go

Lines changed: 2 additions & 4 deletions
```diff
@@ -75,8 +75,7 @@ func (c *SnowflakeConnector) SetupQRepMetadataTables(ctx context.Context, config
 	if schemaExists, err := c.checkIfRawSchemaExists(ctx); err != nil {
 		return fmt.Errorf("error while checking if schema %s for raw table exists: %w", c.rawSchema, err)
 	} else if !schemaExists {
-		_, err := c.execWithLogging(ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema))
-		if err != nil {
+		if _, err := c.execWithLogging(ctx, fmt.Sprintf(createSchemaSQL, c.rawSchema)); err != nil {
 			return err
 		}
 	}
@@ -225,8 +224,7 @@ func (c *SnowflakeConnector) dropStage(ctx context.Context, stagingPath string,
 	stageName := c.getStageNameForJob(job)
 	stmt := "DROP STAGE IF EXISTS " + stageName
 
-	_, err := c.ExecContext(ctx, stmt)
-	if err != nil {
+	if _, err := c.ExecContext(ctx, stmt); err != nil {
 		return fmt.Errorf("failed to drop stage %s: %w", stageName, err)
 	}
 
```
