Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit bfb12cd

Browse files
committed
Fix bogus completion tag usage in walsender
Since commit fd5942c (2012, 9.3-era), walsender has been sending completion tags for certain replication commands twice -- and they're not even consistent. Apparently neither libpq nor JDBC have a problem with it, but it's not kosher. Fix by remove the EndCommand() call in the common code path for them all, and inserting specific calls to EndReplicationCommand() specifically in those places where it's needed. EndReplicationCommand() is a new simple function to send the completion tag for replication commands. Do this instead of sending a generic SELECT completion tag for them all, which was also pretty bogus (if innocuous). While at it, change StartReplication() to use EndReplicationCommand() instead of pg_puttextmessage(). In commit 2f96613, I failed to realize that replication commands are not close-enough kin of regular SQL commands, so the DROP_REPLICATION_SLOT tag I added is undeserved and a type pun. Take it out. Backpatch to 13, where the latter commit appeared. The duplicate tag has been sent since 9.3, but since nothing is broken, it doesn't seem worth fixing. Per complaints from Tom Lane. Discussion: https://postgr.es/m/1347966.1600195735@sss.pgh.pa.us
1 parent c287f58 commit bfb12cd

File tree

4 files changed

+34
-14
lines changed

4 files changed

+34
-14
lines changed

src/backend/replication/walsender.c

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -799,7 +799,7 @@ StartReplication(StartReplicationCmd *cmd)
799799
}
800800

801801
/* Send CommandComplete message */
802-
pq_puttextmessage('C', "START_STREAMING");
802+
EndReplicationCommand("START_STREAMING");
803803
}
804804

805805
/*
@@ -1122,11 +1122,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
11221122
static void
11231123
DropReplicationSlot(DropReplicationSlotCmd *cmd)
11241124
{
1125-
QueryCompletion qc;
1126-
11271125
ReplicationSlotDrop(cmd->slotname, !cmd->wait);
1128-
SetQueryCompletion(&qc, CMDTAG_DROP_REPLICATION_SLOT, 0);
1129-
EndCommand(&qc, DestRemote, false);
11301126
}
11311127

11321128
/*
@@ -1517,9 +1513,9 @@ exec_replication_command(const char *cmd_string)
15171513
{
15181514
int parse_rc;
15191515
Node *cmd_node;
1516+
const char *cmdtag;
15201517
MemoryContext cmd_context;
15211518
MemoryContext old_context;
1522-
QueryCompletion qc;
15231519

15241520
/*
15251521
* If WAL sender has been told that shutdown is getting close, switch its
@@ -1603,51 +1599,67 @@ exec_replication_command(const char *cmd_string)
16031599
switch (cmd_node->type)
16041600
{
16051601
case T_IdentifySystemCmd:
1602+
cmdtag = "IDENTIFY_SYSTEM";
16061603
IdentifySystem();
1604+
EndReplicationCommand(cmdtag);
16071605
break;
16081606

16091607
case T_BaseBackupCmd:
1610-
PreventInTransactionBlock(true, "BASE_BACKUP");
1608+
cmdtag = "BASE_BACKUP";
1609+
PreventInTransactionBlock(true, cmdtag);
16111610
SendBaseBackup((BaseBackupCmd *) cmd_node);
1611+
EndReplicationCommand(cmdtag);
16121612
break;
16131613

16141614
case T_CreateReplicationSlotCmd:
1615+
cmdtag = "CREATE_REPLICATION_SLOT";
16151616
CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
1617+
EndReplicationCommand(cmdtag);
16161618
break;
16171619

16181620
case T_DropReplicationSlotCmd:
1621+
cmdtag = "DROP_REPLICATION_SLOT";
16191622
DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
1623+
EndReplicationCommand(cmdtag);
16201624
break;
16211625

16221626
case T_StartReplicationCmd:
16231627
{
16241628
StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
16251629

1626-
PreventInTransactionBlock(true, "START_REPLICATION");
1630+
cmdtag = "START_REPLICATION";
1631+
PreventInTransactionBlock(true, cmdtag);
16271632

16281633
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
16291634
StartReplication(cmd);
16301635
else
16311636
StartLogicalReplication(cmd);
16321637

1638+
/* callees already sent their own completion message */
1639+
16331640
Assert(xlogreader != NULL);
16341641
break;
16351642
}
16361643

16371644
case T_TimeLineHistoryCmd:
1638-
PreventInTransactionBlock(true, "TIMELINE_HISTORY");
1645+
cmdtag = "TIMELINE_HISTORY";
1646+
PreventInTransactionBlock(true, cmdtag);
16391647
SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
1648+
EndReplicationCommand(cmdtag);
16401649
break;
16411650

16421651
case T_VariableShowStmt:
16431652
{
16441653
DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
16451654
VariableShowStmt *n = (VariableShowStmt *) cmd_node;
16461655

1656+
cmdtag = "SHOW";
1657+
16471658
/* syscache access needs a transaction environment */
16481659
StartTransactionCommand();
16491660
GetPGVariable(n->name, dest);
16501661
CommitTransactionCommand();
1662+
EndReplicationCommand(cmdtag);
16511663
}
16521664
break;
16531665

@@ -1671,10 +1683,6 @@ exec_replication_command(const char *cmd_string)
16711683
MemoryContextSwitchTo(old_context);
16721684
MemoryContextDelete(cmd_context);
16731685

1674-
/* Send CommandComplete message */
1675-
SetQueryCompletion(&qc, CMDTAG_SELECT, 0);
1676-
EndCommand(&qc, DestRemote, true);
1677-
16781686
/* Report to pgstat that this process is now idle */
16791687
pgstat_report_activity(STATE_IDLE, NULL);
16801688

src/backend/tcop/dest.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,18 @@ EndCommand(const QueryCompletion *qc, CommandDest dest, bool force_undecorated_o
211211
}
212212
}
213213

214+
/* ----------------
215+
* EndReplicationCommand - stripped down version of EndCommand
216+
*
217+
* For use by replication commands.
218+
* ----------------
219+
*/
220+
void
221+
EndReplicationCommand(const char *commandTag)
222+
{
223+
pq_putmessage('C', commandTag, strlen(commandTag) + 1);
224+
}
225+
214226
/* ----------------
215227
* NullCommand - tell dest that an empty query string was recognized
216228
*

src/include/tcop/cmdtaglist.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,6 @@ PG_CMDTAG(CMDTAG_DROP_OWNED, "DROP OWNED", true, false, false)
157157
PG_CMDTAG(CMDTAG_DROP_POLICY, "DROP POLICY", true, false, false)
158158
PG_CMDTAG(CMDTAG_DROP_PROCEDURE, "DROP PROCEDURE", true, false, false)
159159
PG_CMDTAG(CMDTAG_DROP_PUBLICATION, "DROP PUBLICATION", true, false, false)
160-
PG_CMDTAG(CMDTAG_DROP_REPLICATION_SLOT, "DROP REPLICATION SLOT", false, false, false)
161160
PG_CMDTAG(CMDTAG_DROP_ROLE, "DROP ROLE", false, false, false)
162161
PG_CMDTAG(CMDTAG_DROP_ROUTINE, "DROP ROUTINE", true, false, false)
163162
PG_CMDTAG(CMDTAG_DROP_RULE, "DROP RULE", true, false, false)

src/include/tcop/dest.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ extern void BeginCommand(CommandTag commandTag, CommandDest dest);
139139
extern DestReceiver *CreateDestReceiver(CommandDest dest);
140140
extern void EndCommand(const QueryCompletion *qc, CommandDest dest,
141141
bool force_undecorated_output);
142+
extern void EndReplicationCommand(const char *commandTag);
142143

143144
/* Additional functions that go with destination management, more or less. */
144145

0 commit comments

Comments
 (0)