Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6e9723c

Browse files
committed
Rename the logical replication global "wrconn"
The worker.c global wrconn is only meant to be used by logical apply/ tablesync workers, but there are other variables with the same name. To reduce future confusion rename the global from "wrconn" to "LogRepWorkerWalRcvConn". While this is just cosmetic, it seems better to backpatch it all the way back to 10 where this code appeared, to avoid future backpatching issues. Author: Peter Smith <smithpb2250@gmail.com> Discussion: https://postgr.es/m/CAHut+Pu7Jv9L2BOEx_Z0UtJxfDevQSAUW2mJqWU+CtmDrEZVAg@mail.gmail.com
1 parent 4bf0bce commit 6e9723c

File tree

4 files changed

+29
-25
lines changed

4 files changed

+29
-25
lines changed

src/backend/replication/logical/launcher.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -716,8 +716,8 @@ static void
716716
logicalrep_worker_onexit(int code, Datum arg)
717717
{
718718
/* Disconnect gracefully from the remote side. */
719-
if (wrconn)
720-
walrcv_disconnect(wrconn);
719+
if (LogRepWorkerWalRcvConn)
720+
walrcv_disconnect(LogRepWorkerWalRcvConn);
721721

722722
logicalrep_worker_detach();
723723

src/backend/replication/logical/tablesync.c

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
295295
MyLogicalRepWorker->relstate,
296296
MyLogicalRepWorker->relstate_lsn);
297297

298-
walrcv_endstreaming(wrconn, &tli);
298+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
299299
finish_sync_worker();
300300
}
301301
else
@@ -591,7 +591,7 @@ copy_read_data(void *outbuf, int minread, int maxread)
591591
for (;;)
592592
{
593593
/* Try read the data. */
594-
len = walrcv_receive(wrconn, &buf, &fd);
594+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
595595

596596
CHECK_FOR_INTERRUPTS();
597597

@@ -665,7 +665,8 @@ fetch_remote_table_info(char *nspname, char *relname,
665665
" AND c.relkind = 'r'",
666666
quote_literal_cstr(nspname),
667667
quote_literal_cstr(relname));
668-
res = walrcv_exec(wrconn, cmd.data, 2, tableRow);
668+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
669+
lengthof(tableRow), tableRow);
669670

670671
if (res->status != WALRCV_OK_TUPLES)
671672
ereport(ERROR,
@@ -701,9 +702,11 @@ fetch_remote_table_info(char *nspname, char *relname,
701702
" AND a.attrelid = %u"
702703
" ORDER BY a.attnum",
703704
lrel->remoteid,
704-
(walrcv_server_version(wrconn) >= 120000 ? "AND a.attgenerated = ''" : ""),
705+
(walrcv_server_version(LogRepWorkerWalRcvConn) >= 120000 ?
706+
"AND a.attgenerated = ''" : ""),
705707
lrel->remoteid);
706-
res = walrcv_exec(wrconn, cmd.data, 4, attrRow);
708+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data,
709+
lengthof(attrRow), attrRow);
707710

708711
if (res->status != WALRCV_OK_TUPLES)
709712
ereport(ERROR,
@@ -773,7 +776,7 @@ copy_table(Relation rel)
773776
initStringInfo(&cmd);
774777
appendStringInfo(&cmd, "COPY %s TO STDOUT",
775778
quote_qualified_identifier(lrel.nspname, lrel.relname));
776-
res = walrcv_exec(wrconn, cmd.data, 0, NULL);
779+
res = walrcv_exec(LogRepWorkerWalRcvConn, cmd.data, 0, NULL);
777780
pfree(cmd.data);
778781
if (res->status != WALRCV_OK_COPY_OUT)
779782
ereport(ERROR,
@@ -840,8 +843,9 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
840843
* application_name, so that it is different from the main apply worker,
841844
* so that synchronous replication can distinguish them.
842845
*/
843-
wrconn = walrcv_connect(MySubscription->conninfo, true, slotname, &err);
844-
if (wrconn == NULL)
846+
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
847+
slotname, &err);
848+
if (LogRepWorkerWalRcvConn == NULL)
845849
ereport(ERROR,
846850
(errmsg("could not connect to the publisher: %s", err)));
847851

@@ -886,7 +890,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
886890
* inside the transaction so that we can use the snapshot made
887891
* by the slot to get existing data.
888892
*/
889-
res = walrcv_exec(wrconn,
893+
res = walrcv_exec(LogRepWorkerWalRcvConn,
890894
"BEGIN READ ONLY ISOLATION LEVEL "
891895
"REPEATABLE READ", 0, NULL);
892896
if (res->status != WALRCV_OK_COMMAND)
@@ -903,14 +907,14 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
903907
* that is consistent with the lsn used by the slot to start
904908
* decoding.
905909
*/
906-
walrcv_create_slot(wrconn, slotname, true,
910+
walrcv_create_slot(LogRepWorkerWalRcvConn, slotname, true,
907911
CRS_USE_SNAPSHOT, origin_startpos);
908912

909913
PushActiveSnapshot(GetTransactionSnapshot());
910914
copy_table(rel);
911915
PopActiveSnapshot();
912916

913-
res = walrcv_exec(wrconn, "COMMIT", 0, NULL);
917+
res = walrcv_exec(LogRepWorkerWalRcvConn, "COMMIT", 0, NULL);
914918
if (res->status != WALRCV_OK_COMMAND)
915919
ereport(ERROR,
916920
(errmsg("table copy could not finish transaction on publisher"),

src/backend/replication/logical/worker.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ typedef struct SlotErrCallbackArg
9696
static MemoryContext ApplyMessageContext = NULL;
9797
MemoryContext ApplyContext = NULL;
9898

99-
WalReceiverConn *wrconn = NULL;
99+
WalReceiverConn *LogRepWorkerWalRcvConn = NULL;
100100

101101
Subscription *MySubscription = NULL;
102102
bool MySubscriptionValid = false;
@@ -1158,7 +1158,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
11581158

11591159
MemoryContextSwitchTo(ApplyMessageContext);
11601160

1161-
len = walrcv_receive(wrconn, &buf, &fd);
1161+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
11621162

11631163
if (len != 0)
11641164
{
@@ -1238,7 +1238,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
12381238
MemoryContextReset(ApplyMessageContext);
12391239
}
12401240

1241-
len = walrcv_receive(wrconn, &buf, &fd);
1241+
len = walrcv_receive(LogRepWorkerWalRcvConn, &buf, &fd);
12421242
}
12431243
}
12441244

@@ -1268,7 +1268,7 @@ LogicalRepApplyLoop(XLogRecPtr last_received)
12681268
{
12691269
TimeLineID tli;
12701270

1271-
walrcv_endstreaming(wrconn, &tli);
1271+
walrcv_endstreaming(LogRepWorkerWalRcvConn, &tli);
12721272
break;
12731273
}
12741274

@@ -1431,7 +1431,8 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
14311431
(uint32) (flushpos >> 32), (uint32) flushpos
14321432
);
14331433

1434-
walrcv_send(wrconn, reply_message->data, reply_message->len);
1434+
walrcv_send(LogRepWorkerWalRcvConn,
1435+
reply_message->data, reply_message->len);
14351436

14361437
if (recvpos > last_recvpos)
14371438
last_recvpos = recvpos;
@@ -1743,18 +1744,17 @@ ApplyWorkerMain(Datum main_arg)
17431744
origin_startpos = replorigin_session_get_progress(false);
17441745
CommitTransactionCommand();
17451746

1746-
wrconn = walrcv_connect(MySubscription->conninfo, true, MySubscription->name,
1747-
&err);
1748-
if (wrconn == NULL)
1747+
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
1748+
MySubscription->name, &err);
1749+
if (LogRepWorkerWalRcvConn == NULL)
17491750
ereport(ERROR,
17501751
(errmsg("could not connect to the publisher: %s", err)));
17511752

17521753
/*
17531754
* We don't really use the output identify_system for anything but it
17541755
* does some initializations on the upstream so let's still call it.
17551756
*/
1756-
(void) walrcv_identify_system(wrconn, &startpointTLI);
1757-
1757+
(void) walrcv_identify_system(LogRepWorkerWalRcvConn, &startpointTLI);
17581758
}
17591759

17601760
/*
@@ -1773,7 +1773,7 @@ ApplyWorkerMain(Datum main_arg)
17731773
options.proto.logical.publication_names = MySubscription->publications;
17741774

17751775
/* Start normal logical streaming replication. */
1776-
walrcv_startstreaming(wrconn, &options);
1776+
walrcv_startstreaming(LogRepWorkerWalRcvConn, &options);
17771777

17781778
/* Run the main loop. */
17791779
LogicalRepApplyLoop(origin_startpos);

src/include/replication/worker_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ typedef struct LogicalRepWorker
6060
extern MemoryContext ApplyContext;
6161

6262
/* libpqreceiver connection */
63-
extern struct WalReceiverConn *wrconn;
63+
extern struct WalReceiverConn *LogRepWorkerWalRcvConn;
6464

6565
/* Worker and subscription objects. */
6666
extern Subscription *MySubscription;

0 commit comments

Comments
 (0)