diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/applyparallelworker.c | 3 | ||||
-rw-r--r-- | src/backend/replication/logical/launcher.c | 31 | ||||
-rw-r--r-- | src/backend/replication/logical/tablesync.c | 3 |
3 files changed, 25 insertions, 12 deletions
diff --git a/src/backend/replication/logical/applyparallelworker.c b/src/backend/replication/logical/applyparallelworker.c index 1d4e83c4c1f..4e8ee2973e0 100644 --- a/src/backend/replication/logical/applyparallelworker.c +++ b/src/backend/replication/logical/applyparallelworker.c @@ -435,7 +435,8 @@ pa_launch_parallel_worker(void) return NULL; } - launched = logicalrep_worker_launch(MyLogicalRepWorker->dbid, + launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY, + MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index e231fa7f951..7cc0a16d3bc 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -303,7 +303,8 @@ logicalrep_workers_find(Oid subid, bool only_running) * Returns true on success, false on failure. */ bool -logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, +logicalrep_worker_launch(LogicalRepWorkerType wtype, + Oid dbid, Oid subid, const char *subname, Oid userid, Oid relid, dsm_handle subworker_dsm) { BackgroundWorker bgw; @@ -315,10 +316,18 @@ logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname, Oid userid, int nsyncworkers; int nparallelapplyworkers; TimestampTz now; - bool is_parallel_apply_worker = (subworker_dsm != DSM_HANDLE_INVALID); - - /* Sanity check - tablesync worker cannot be a subworker */ - Assert(!(is_parallel_apply_worker && OidIsValid(relid))); + bool is_tablesync_worker = (wtype == WORKERTYPE_TABLESYNC); + bool is_parallel_apply_worker = (wtype == WORKERTYPE_PARALLEL_APPLY); + + /*---------- + * Sanity checks: + * - must be valid worker type + * - tablesync workers are only ones to have relid + * - parallel apply worker is the only kind of subworker + */ + Assert(wtype != WORKERTYPE_UNKNOWN); + Assert(is_tablesync_worker == OidIsValid(relid)); + Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID)); ereport(DEBUG1, (errmsg_internal("starting logical replication worker for subscription \"%s\"", @@ -393,7 +402,7 @@ retry: * sync worker limit per subscription. So, just return silently as we * might get here because of an otherwise harmless race condition. */ - if (OidIsValid(relid) && nsyncworkers >= max_sync_workers_per_subscription) + if (is_tablesync_worker && nsyncworkers >= max_sync_workers_per_subscription) { LWLockRelease(LogicalRepWorkerLock); return false; @@ -427,6 +436,7 @@ retry: } /* Prepare the worker slot. */ + worker->type = wtype; worker->launch_time = now; worker->in_use = true; worker->generation++; @@ -466,7 +476,7 @@ retry: subid); snprintf(bgw.bgw_type, BGW_MAXLEN, "logical replication parallel worker"); } - else if (OidIsValid(relid)) + else if (is_tablesync_worker) { snprintf(bgw.bgw_function_name, BGW_MAXLEN, "TablesyncWorkerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, @@ -847,7 +857,7 @@ logicalrep_sync_worker_count(Oid subid) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (w->subid == subid && OidIsValid(w->relid)) + if (w->subid == subid && isTablesyncWorker(w)) res++; } @@ -1180,7 +1190,8 @@ ApplyLauncherMain(Datum main_arg) (elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval) { ApplyLauncherSetWorkerStartTime(sub->oid, now); - logicalrep_worker_launch(sub->dbid, sub->oid, sub->name, + logicalrep_worker_launch(WORKERTYPE_APPLY, + sub->dbid, sub->oid, sub->name, sub->owner, InvalidOid, DSM_HANDLE_INVALID); } @@ -1290,7 +1301,7 @@ pg_stat_get_subscription(PG_FUNCTION_ARGS) worker_pid = worker.proc->pid; values[0] = ObjectIdGetDatum(worker.subid); - if (OidIsValid(worker.relid)) + if (isTablesyncWorker(&worker)) values[1] = ObjectIdGetDatum(worker.relid); else nulls[1] = true; diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 651a7750653..67bdd14095e 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -587,7 +587,8 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn) TimestampDifferenceExceeds(hentry->last_start_time, now, wal_retrieve_retry_interval)) { - logicalrep_worker_launch(MyLogicalRepWorker->dbid, + logicalrep_worker_launch(WORKERTYPE_TABLESYNC, + MyLogicalRepWorker->dbid, MySubscription->oid, MySubscription->name, MyLogicalRepWorker->userid, |