Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_subscription.c16
-rw-r--r--src/backend/commands/subscriptioncmds.c4
-rw-r--r--src/backend/utils/adt/pg_upgrade_support.c106
3 files changed, 122 insertions, 4 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d6a978f1362..7167377d82e 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -228,10 +228,14 @@ textarray_to_stringlist(ArrayType *textarray)
/*
* Add new state record for a subscription table.
+ *
+ * If retain_lock is true, then don't release the locks taken in this function.
+ * We normally release the locks at the end of transaction but in binary-upgrade
+ * mode, we expect to release those immediately.
*/
void
AddSubscriptionRelState(Oid subid, Oid relid, char state,
- XLogRecPtr sublsn)
+ XLogRecPtr sublsn, bool retain_lock)
{
Relation rel;
HeapTuple tup;
@@ -269,7 +273,15 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
heap_freetuple(tup);
/* Cleanup. */
- table_close(rel, NoLock);
+ if (retain_lock)
+ {
+ table_close(rel, NoLock);
+ }
+ else
+ {
+ table_close(rel, RowExclusiveLock);
+ UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+ }
}
/*
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index edc82c11beb..dd067d39ade 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -773,7 +773,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
rv->schemaname, rv->relname);
AddSubscriptionRelState(subid, relid, table_state,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, true);
}
/*
@@ -943,7 +943,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
- InvalidXLogRecPtr);
+ InvalidXLogRecPtr, true);
ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name)));
diff --git a/src/backend/utils/adt/pg_upgrade_support.c b/src/backend/utils/adt/pg_upgrade_support.c
index 92921b0239d..14368aafbe0 100644
--- a/src/backend/utils/adt/pg_upgrade_support.c
+++ b/src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,24 @@
#include "postgres.h"
+#include "access/relation.h"
+#include "access/table.h"
#include "catalog/binary_upgrade.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
+#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
+#include "replication/origin.h"
+#include "replication/worker_internal.h"
+#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/pg_lsn.h"
+#include "utils/syscache.h"
#define CHECK_IS_BINARY_UPGRADE \
@@ -305,3 +314,100 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
PG_RETURN_BOOL(!found_pending_wal);
}
+
+/*
+ * binary_upgrade_add_sub_rel_state
+ *
+ * Add the relation with the specified relation state to pg_subscription_rel
+ * catalog.
+ */
+Datum
+binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS)
+{
+ Relation subrel;
+ Relation rel;
+ Oid subid;
+ char *subname;
+ Oid relid;
+ char relstate;
+ XLogRecPtr sublsn;
+
+ CHECK_IS_BINARY_UPGRADE;
+
+ /* We must check these things before dereferencing the arguments */
+ if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
+ elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed");
+
+ subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ relid = PG_GETARG_OID(1);
+ relstate = PG_GETARG_CHAR(2);
+ sublsn = PG_ARGISNULL(3) ? InvalidXLogRecPtr : PG_GETARG_LSN(3);
+
+ subrel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ subid = get_subscription_oid(subname, false);
+ rel = relation_open(relid, AccessShareLock);
+
+ /*
+ * Since there are no concurrent ALTER/DROP SUBSCRIPTION commands during
+ * the upgrade process, and the apply worker (which builds cache based on
+ * the subscription catalog) is not running, the locks can be released
+ * immediately.
+ */
+ AddSubscriptionRelState(subid, relid, relstate, sublsn, false);
+ relation_close(rel, AccessShareLock);
+ table_close(subrel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}
+
+/*
+ * binary_upgrade_replorigin_advance
+ *
+ * Update the remote_lsn for the subscriber's replication origin.
+ */
+Datum
+binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
+{
+ Relation rel;
+ Oid subid;
+ char *subname;
+ char originname[NAMEDATALEN];
+ RepOriginId node;
+ XLogRecPtr remote_commit;
+
+ CHECK_IS_BINARY_UPGRADE;
+
+ /*
+ * We must ensure a non-NULL subscription name before dereferencing the
+ * arguments.
+ */
+ if (PG_ARGISNULL(0))
+ elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");
+
+ subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
+ remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1);
+
+ rel = table_open(SubscriptionRelationId, RowExclusiveLock);
+ subid = get_subscription_oid(subname, false);
+
+ ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));
+
+ /* Lock to prevent the replication origin from vanishing */
+ LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ node = replorigin_by_name(originname, false);
+
+ /*
+ * The server will be stopped after setting up the objects in the new
+ * cluster and the origins will be flushed during the shutdown checkpoint.
+ * This will ensure that the latest LSN values for origin will be
+ * available after the upgrade.
+ */
+ replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
+ false /* backward */ ,
+ false /* WAL log */ );
+
+ UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
+ table_close(rel, RowExclusiveLock);
+
+ PG_RETURN_VOID();
+}