Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 776e1c8

Browse files
author
Amit Kapila
committed
Add a common function to generate the origin name.
Make a common replication origin name formatting function to replace multiple snprintf() expressions. This also includes logic previously done by ReplicationOriginNameForTablesync(). This makes the code to generate the origin name consistent among apply worker and tablesync worker. Author: Peter Smith Reviewed-By: Aleksander Alekseev Discussion: https://postgr.es/m/CAHut%2BPsa8hhfSE6ozUK-ih7GkQziAVAf4f3bqiXEj2nQiu-43g%40mail.gmail.com
1 parent 8432a81 commit 776e1c8

File tree

4 files changed

+52
-38
lines changed

4 files changed

+52
-38
lines changed

src/backend/commands/subscriptioncmds.c

+8-7
Original file line numberDiff line numberDiff line change
@@ -657,7 +657,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
657657

658658
recordDependencyOnOwner(SubscriptionRelationId, subid, owner);
659659

660-
snprintf(originname, sizeof(originname), "pg_%u", subid);
660+
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
661661
replorigin_create(originname);
662662

663663
/*
@@ -946,8 +946,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
946946
* origin and by this time the origin might be already
947947
* removed. For these reasons, passing missing_ok = true.
948948
*/
949-
ReplicationOriginNameForTablesync(sub->oid, relid, originname,
950-
sizeof(originname));
949+
ReplicationOriginNameForLogicalRep(sub->oid, relid, originname,
950+
sizeof(originname));
951951
replorigin_drop_by_name(originname, true, false);
952952
}
953953

@@ -1315,7 +1315,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
13151315
char originname[NAMEDATALEN];
13161316
XLogRecPtr remote_lsn;
13171317

1318-
snprintf(originname, sizeof(originname), "pg_%u", subid);
1318+
ReplicationOriginNameForLogicalRep(subid, InvalidOid,
1319+
originname, sizeof(originname));
13191320
originid = replorigin_by_name(originname, false);
13201321
remote_lsn = replorigin_get_progress(originid, false);
13211322

@@ -1521,8 +1522,8 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15211522
* worker so passing missing_ok = true. This can happen for the states
15221523
* before SUBREL_STATE_FINISHEDCOPY.
15231524
*/
1524-
ReplicationOriginNameForTablesync(subid, relid, originname,
1525-
sizeof(originname));
1525+
ReplicationOriginNameForLogicalRep(subid, relid, originname,
1526+
sizeof(originname));
15261527
replorigin_drop_by_name(originname, true, false);
15271528
}
15281529

@@ -1533,7 +1534,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
15331534
RemoveSubscriptionRel(subid, InvalidOid);
15341535

15351536
/* Remove the origin tracking if exists. */
1536-
snprintf(originname, sizeof(originname), "pg_%u", subid);
1537+
ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
15371538
replorigin_drop_by_name(originname, true, false);
15381539

15391540
/*

src/backend/replication/logical/tablesync.c

+12-24
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,10 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
353353
*/
354354
StartTransactionCommand();
355355

356-
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
357-
MyLogicalRepWorker->relid,
358-
originname,
359-
sizeof(originname));
356+
ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
357+
MyLogicalRepWorker->relid,
358+
originname,
359+
sizeof(originname));
360360

361361
/*
362362
* Resetting the origin session removes the ownership of the slot.
@@ -505,10 +505,10 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
505505
* error while dropping we won't restart it to drop the
506506
* origin. So passing missing_ok = true.
507507
*/
508-
ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
509-
rstate->relid,
510-
originname,
511-
sizeof(originname));
508+
ReplicationOriginNameForLogicalRep(MyLogicalRepWorker->subid,
509+
rstate->relid,
510+
originname,
511+
sizeof(originname));
512512
replorigin_drop_by_name(originname, true, false);
513513

514514
/*
@@ -1193,18 +1193,6 @@ ReplicationSlotNameForTablesync(Oid suboid, Oid relid,
11931193
relid, GetSystemIdentifier());
11941194
}
11951195

1196-
/*
1197-
* Form the origin name for tablesync.
1198-
*
1199-
* Return the name in the supplied buffer.
1200-
*/
1201-
void
1202-
ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
1203-
char *originname, Size szorgname)
1204-
{
1205-
snprintf(originname, szorgname, "pg_%u_%u", suboid, relid);
1206-
}
1207-
12081196
/*
12091197
* Start syncing the table in the sync worker.
12101198
*
@@ -1274,10 +1262,10 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
12741262
MyLogicalRepWorker->relstate == SUBREL_STATE_FINISHEDCOPY);
12751263

12761264
/* Assign the origin tracking record name. */
1277-
ReplicationOriginNameForTablesync(MySubscription->oid,
1278-
MyLogicalRepWorker->relid,
1279-
originname,
1280-
sizeof(originname));
1265+
ReplicationOriginNameForLogicalRep(MySubscription->oid,
1266+
MyLogicalRepWorker->relid,
1267+
originname,
1268+
sizeof(originname));
12811269

12821270
if (MyLogicalRepWorker->relstate == SUBREL_STATE_DATASYNC)
12831271
{

src/backend/replication/logical/worker.c

+30-5
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,30 @@ static void apply_error_callback(void *arg);
364364
static inline void set_apply_error_context_xact(TransactionId xid, XLogRecPtr lsn);
365365
static inline void reset_apply_error_context_info(void);
366366

367+
/*
368+
* Form the origin name for the subscription.
369+
*
370+
* This is a common function for tablesync and other workers. Tablesync workers
371+
* must pass a valid relid. Other callers must pass relid = InvalidOid.
372+
*
373+
* Return the name in the supplied buffer.
374+
*/
375+
void
376+
ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
377+
char *originname, Size szoriginname)
378+
{
379+
if (OidIsValid(relid))
380+
{
381+
/* Replication origin name for tablesync workers. */
382+
snprintf(originname, szoriginname, "pg_%u_%u", suboid, relid);
383+
}
384+
else
385+
{
386+
/* Replication origin name for non-tablesync workers. */
387+
snprintf(originname, szoriginname, "pg_%u", suboid);
388+
}
389+
}
390+
367391
/*
368392
* Should this worker apply changes for given relation.
369393
*
@@ -3679,10 +3703,10 @@ ApplyWorkerMain(Datum main_arg)
36793703
* Allocate the origin name in long-lived context for error context
36803704
* message.
36813705
*/
3682-
ReplicationOriginNameForTablesync(MySubscription->oid,
3683-
MyLogicalRepWorker->relid,
3684-
originname,
3685-
sizeof(originname));
3706+
ReplicationOriginNameForLogicalRep(MySubscription->oid,
3707+
MyLogicalRepWorker->relid,
3708+
originname,
3709+
sizeof(originname));
36863710
apply_error_callback_arg.origin_name = MemoryContextStrdup(ApplyContext,
36873711
originname);
36883712
}
@@ -3707,7 +3731,8 @@ ApplyWorkerMain(Datum main_arg)
37073731

37083732
/* Setup replication origin tracking. */
37093733
StartTransactionCommand();
3710-
snprintf(originname, sizeof(originname), "pg_%u", MySubscription->oid);
3734+
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
3735+
originname, sizeof(originname));
37113736
originid = replorigin_by_name(originname, true);
37123737
if (!OidIsValid(originid))
37133738
originid = replorigin_create(originname);

src/include/replication/worker_internal.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
9292

9393
extern int logicalrep_sync_worker_count(Oid subid);
9494

95-
extern void ReplicationOriginNameForTablesync(Oid suboid, Oid relid,
96-
char *originname, Size szorgname);
95+
extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
96+
char *originname, Size szoriginname);
9797
extern char *LogicalRepSyncTableStart(XLogRecPtr *origin_startpos);
9898

9999
extern bool AllTablesyncsReady(void);

0 commit comments

Comments
 (0)