From 51535a64e72506795ec964dd1ba549420f35e110 Mon Sep 17 00:00:00 2001 From: woonhak Date: Tue, 21 Nov 2023 16:14:29 -0800 Subject: [PATCH] add additional sync finish condition check to fix infinite catchup status If sync process synced very closely to the their remote, there is a possibility that no commit since the starting LSN point. If that is the case, sync/apply workers are infinitely waiting for new commit to finish even if sync worker synced all of the log record from remote. This randomly happened while running `add_table` test when `alter_subscription_resynchronize_table()` called and wait for complete sync called. To address that issue, we can evaluate sync finish condition while pulling log records. --- pglogical_apply.c | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/pglogical_apply.c b/pglogical_apply.c index c8fc47e..0d6bd2e 100644 --- a/pglogical_apply.c +++ b/pglogical_apply.c @@ -1485,7 +1485,38 @@ apply_work(PGconn *streamConn) send_feedback(applyconn, last_received, GetCurrentTimestamp(), false); if (!in_remote_transaction) + { process_syncing_tables(last_received); + + /* checking whether sync can finish. + * If sync process synced very closely to the remote, + * there is a possibility that no commit after the start point. + * If that is the case, sync/apply workers are infinitely waiting a commit to finish. + * This can happened when `alter_subscription_resynchronize_table()` called + * randomly. + * To address that issue, we can evaluate sync finish condition here */ + if (MyPGLogicalWorker->worker_type == PGLOGICAL_WORKER_SYNC && + MyApplyWorker->replay_stop_lsn != InvalidXLogRecPtr && + last_received >= MyApplyWorker->replay_stop_lsn) { + StartTransactionCommand(); + /* + * In case there is nothing to catchup, finish immediately. + * Note pglogical_sync_worker_finish() will commit. + */ + /* Mark local table as done. */ + set_table_sync_status(MyApplyWorker->subid, + NameStr(MyPGLogicalWorker->worker.sync.nspname), + NameStr(MyPGLogicalWorker->worker.sync.relname), + SYNC_STATUS_SYNCDONE, last_received); + CommitTransactionCommand(); + PQfinish(applyconn); + pglogical_sync_worker_finish(); + + /* stop process */ + proc_exit(0); + } + } + /* We must not have switched out of MessageContext by mistake */ Assert(CurrentMemoryContext == MessageContext);