static void SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
- List *sync_standbys);
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys);
static void SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr,
- List *sync_standbys, uint8 nth);
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys,
+ uint8 nth);
static int SyncRepGetStandbyPriority(void);
static List *SyncRepGetSyncStandbysPriority(bool *am_sync);
static List *SyncRepGetSyncStandbysQuorum(bool *am_sync);
+static int standby_priority_comparator(const void *a, const void *b);
static int cmp_lsn(const void *a, const void *b);
#ifdef USE_ASSERT_CHECKING
priority = SyncRepGetStandbyPriority();
if (MyWalSnd->sync_standby_priority != priority)
{
- LWLockAcquire(SyncRepLock, LW_EXCLUSIVE);
+ SpinLockAcquire(&MyWalSnd->mutex);
MyWalSnd->sync_standby_priority = priority;
- LWLockRelease(SyncRepLock);
+ SpinLockRelease(&MyWalSnd->mutex);
+
ereport(DEBUG1,
(errmsg("standby \"%s\" now has synchronous standby priority %u",
application_name, priority)));
/*
* Check whether we are a sync standby or not, and calculate the synced
- * positions among all sync standbys.
+ * positions among all sync standbys. (Note: although this step does not
+ * of itself require holding SyncRepLock, it seems like a good idea to do
+ * it after acquiring the lock. This ensures that the WAL pointers we use
+ * to release waiters are newer than any previous execution of this
+ * routine used.)
*/
got_recptr = SyncRepGetSyncRecPtr(&writePtr, &flushPtr, &applyPtr, &am_sync);
SyncRepGetSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
XLogRecPtr *applyPtr, bool *am_sync)
{
- List *sync_standbys;
+ SyncRepStandbyData *sync_standbys;
+ int num_standbys;
+ int i;
+ /* Initialize default results */
*writePtr = InvalidXLogRecPtr;
*flushPtr = InvalidXLogRecPtr;
*applyPtr = InvalidXLogRecPtr;
*am_sync = false;
+ /* Quick out if not even configured to be synchronous */
+ if (SyncRepConfig == NULL)
+ return false;
+
/* Get standbys that are considered as synchronous at this moment */
- sync_standbys = SyncRepGetSyncStandbys(am_sync);
+ num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
+
+ /* Am I among the candidate sync standbys? */
+ for (i = 0; i < num_standbys; i++)
+ {
+ if (sync_standbys[i].is_me)
+ {
+ *am_sync = true;
+ break;
+ }
+ }
/*
- * Quick exit if we are not managing a sync standby or there are not
- * enough synchronous standbys.
+ * Nothing more to do if we are not managing a sync standby or there are
+ * not enough synchronous standbys.
*/
if (!(*am_sync) ||
- SyncRepConfig == NULL ||
- list_length(sync_standbys) < SyncRepConfig->num_sync)
+ num_standbys < SyncRepConfig->num_sync)
{
- list_free(sync_standbys);
+ pfree(sync_standbys);
return false;
}
if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY)
{
SyncRepGetOldestSyncRecPtr(writePtr, flushPtr, applyPtr,
- sync_standbys);
+ sync_standbys, num_standbys);
}
else
{
SyncRepGetNthLatestSyncRecPtr(writePtr, flushPtr, applyPtr,
- sync_standbys, SyncRepConfig->num_sync);
+ sync_standbys, num_standbys,
+ SyncRepConfig->num_sync);
}
- list_free(sync_standbys);
+ pfree(sync_standbys);
return true;
}
* Calculate the oldest Write, Flush and Apply positions among sync standbys.
*/
static void
-SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List *sync_standbys)
+SyncRepGetOldestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys)
{
- ListCell *cell;
+ int i;
/*
* Scan through all sync standbys and calculate the oldest Write, Flush
- * and Apply positions.
+ * and Apply positions. We assume *writePtr et al were initialized to
+ * InvalidXLogRecPtr.
*/
- foreach(cell, sync_standbys)
+ for (i = 0; i < num_standbys; i++)
{
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
- XLogRecPtr write;
- XLogRecPtr flush;
- XLogRecPtr apply;
-
- SpinLockAcquire(&walsnd->mutex);
- write = walsnd->write;
- flush = walsnd->flush;
- apply = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
+ XLogRecPtr write = sync_standbys[i].write;
+ XLogRecPtr flush = sync_standbys[i].flush;
+ XLogRecPtr apply = sync_standbys[i].apply;
if (XLogRecPtrIsInvalid(*writePtr) || *writePtr > write)
*writePtr = write;
* standbys.
*/
static void
-SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr, XLogRecPtr *flushPtr,
- XLogRecPtr *applyPtr, List *sync_standbys, uint8 nth)
+SyncRepGetNthLatestSyncRecPtr(XLogRecPtr *writePtr,
+ XLogRecPtr *flushPtr,
+ XLogRecPtr *applyPtr,
+ SyncRepStandbyData *sync_standbys,
+ int num_standbys,
+ uint8 nth)
{
- ListCell *cell;
XLogRecPtr *write_array;
XLogRecPtr *flush_array;
XLogRecPtr *apply_array;
- int len;
- int i = 0;
-
- len = list_length(sync_standbys);
- write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
- flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
- apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * len);
+ int i;
- foreach(cell, sync_standbys)
- {
- WalSnd *walsnd = &WalSndCtl->walsnds[lfirst_int(cell)];
+ /* Should have enough candidates, or somebody messed up */
+ Assert(nth > 0 && nth <= num_standbys);
- SpinLockAcquire(&walsnd->mutex);
- write_array[i] = walsnd->write;
- flush_array[i] = walsnd->flush;
- apply_array[i] = walsnd->apply;
- SpinLockRelease(&walsnd->mutex);
+ write_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+ flush_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
+ apply_array = (XLogRecPtr *) palloc(sizeof(XLogRecPtr) * num_standbys);
- i++;
+ for (i = 0; i < num_standbys; i++)
+ {
+ write_array[i] = sync_standbys[i].write;
+ flush_array[i] = sync_standbys[i].flush;
+ apply_array[i] = sync_standbys[i].apply;
}
/* Sort each array in descending order */
- qsort(write_array, len, sizeof(XLogRecPtr), cmp_lsn);
- qsort(flush_array, len, sizeof(XLogRecPtr), cmp_lsn);
- qsort(apply_array, len, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(write_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(flush_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
+ qsort(apply_array, num_standbys, sizeof(XLogRecPtr), cmp_lsn);
/* Get Nth latest Write, Flush, Apply positions */
*writePtr = write_array[nth - 1];
return 1;
}
+/*
+ * SyncRepGetCandidateStandbys
+ *		Snapshot the walsenders that currently qualify as sync standbys.
+ *
+ * On return, *standbys points to a palloc'd array with room for
+ * max_wal_senders entries, of which only the first (return value) entries
+ * are meaningful.  The array is allocated even when zero is returned, so the
+ * caller can always pfree it.
+ *
+ * A walsender qualifies if it has a nonzero PID, is in STREAMING or STOPPING
+ * state, has a nonzero sync_standby_priority, and reports a valid flush
+ * position.  In quorum mode every qualifying walsender is returned.  In
+ * priority mode, if more than num_sync qualify, they are sorted by priority
+ * and only the num_sync best are kept.  The count may thus be smaller than
+ * num_sync (either mode) or larger than it (quorum mode only); the caller
+ * must check.
+ */
+int
+SyncRepGetCandidateStandbys(SyncRepStandbyData **standbys)
+{
+	SyncRepStandbyData *result;
+	int			ncandidates = 0;
+	int			idx;
+
+	/* Allocate the result first so the caller may pfree() unconditionally */
+	result = (SyncRepStandbyData *)
+		palloc(max_wal_senders * sizeof(SyncRepStandbyData));
+	*standbys = result;
+
+	/* Nothing to report when synchronous replication isn't configured */
+	if (SyncRepConfig == NULL)
+		return 0;
+
+	for (idx = 0; idx < max_wal_senders; idx++)
+	{
+		/* volatile pointer prevents code rearrangement around the spinlock */
+		volatile WalSnd *sender = &WalSndCtl->walsnds[idx];
+		SyncRepStandbyData *entry = &result[ncandidates];
+		WalSndState sndstate;	/* used for filtering only, not reported */
+
+		/*
+		 * Copy the shared fields into the next free slot under the spinlock.
+		 * If this walsender turns out not to qualify, the slot is simply
+		 * overwritten on the next iteration.
+		 */
+		SpinLockAcquire(&sender->mutex);
+		entry->pid = sender->pid;
+		sndstate = sender->state;
+		entry->write = sender->write;
+		entry->flush = sender->flush;
+		entry->apply = sender->apply;
+		entry->sync_standby_priority = sender->sync_standby_priority;
+		SpinLockRelease(&sender->mutex);
+
+		/* Keep it only if active, streaming/stopping, sync, and flushed */
+		if (entry->pid != 0 &&
+			(sndstate == WALSNDSTATE_STREAMING ||
+			 sndstate == WALSNDSTATE_STOPPING) &&
+			entry->sync_standby_priority != 0 &&
+			!XLogRecPtrIsInvalid(entry->flush))
+		{
+			entry->walsnd_index = idx;
+			entry->is_me = (sender == MyWalSnd);
+			ncandidates++;
+		}
+	}
+
+	/*
+	 * Priority mode keeps only the num_sync highest-priority candidates
+	 * (smallest sync_standby_priority values first); quorum mode keeps all.
+	 */
+	if (SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY &&
+		ncandidates > SyncRepConfig->num_sync)
+	{
+		qsort(result, ncandidates, sizeof(SyncRepStandbyData),
+			  standby_priority_comparator);
+		ncandidates = SyncRepConfig->num_sync;
+	}
+
+	return ncandidates;
+}
+
+/*
+ * qsort comparator to sort SyncRepStandbyData entries by priority
+ *
+ * Entries with a smaller sync_standby_priority value (i.e. higher priority)
+ * sort first; ties are broken by walsnd_index.  Explicit comparisons are
+ * used instead of subtraction so the result is correct, and cannot
+ * overflow, whatever range or signedness the fields have.
+ */
+static int
+standby_priority_comparator(const void *a, const void *b)
+{
+	const SyncRepStandbyData *sa = (const SyncRepStandbyData *) a;
+	const SyncRepStandbyData *sb = (const SyncRepStandbyData *) b;
+
+	/* First, sort by increasing priority value */
+	if (sa->sync_standby_priority < sb->sync_standby_priority)
+		return -1;
+	if (sa->sync_standby_priority > sb->sync_standby_priority)
+		return 1;
+
+	/*
+	 * We might have equal priority values; arbitrarily break ties by position
+	 * in the WalSnd array.  (This is utterly bogus, since that is arrival
+	 * order dependent, but there are regression tests that rely on it.)
+	 */
+	if (sa->walsnd_index < sb->walsnd_index)
+		return -1;
+	if (sa->walsnd_index > sb->walsnd_index)
+		return 1;
+	return 0;
+}
+
+
/*
* Return the list of sync standbys, or NIL if no sync standby is connected.
*
*
* On return, *am_sync is set to true if this walsender is connecting to
* sync standby. Otherwise it's set to false.
+ *
+ * XXX This function is BROKEN and should not be used in new code. It has
+ * an inherent race condition, since the returned list of integer indexes
+ * might no longer correspond to reality.
*/
List *
SyncRepGetSyncStandbys(bool *am_sync)
priority = next_highest_priority;
}
- /* never reached, but keep compiler quiet */
- Assert(false);
+ /*
+ * We might get here if the set of sync_standby_priority values in shared
+ * memory is inconsistent, as can happen transiently after a change in the
+ * synchronous_standby_names setting. In that case, just return the
+ * incomplete list we have so far. That will cause the caller to decide
+ * there aren't enough synchronous candidates, which should be a safe
+ * choice until the priority values become consistent again.
+ */
+ list_free(pending);
return result;
}
* Found a free slot. Reserve it for us.
*/
walsnd->pid = MyProcPid;
+ walsnd->state = WALSNDSTATE_STARTUP;
walsnd->sentPtr = InvalidXLogRecPtr;
+ walsnd->needreload = false;
walsnd->write = InvalidXLogRecPtr;
walsnd->flush = InvalidXLogRecPtr;
walsnd->apply = InvalidXLogRecPtr;
walsnd->writeLag = -1;
walsnd->flushLag = -1;
walsnd->applyLag = -1;
- walsnd->state = WALSNDSTATE_STARTUP;
+ walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
SpinLockRelease(&walsnd->mutex);
Tuplestorestate *tupstore;
MemoryContext per_query_ctx;
MemoryContext oldcontext;
- List *sync_standbys;
+ SyncRepStandbyData *sync_standbys;
+ int num_standbys;
int i;
/* check to see if caller supports us returning a tuplestore */
MemoryContextSwitchTo(oldcontext);
/*
- * Get the currently active synchronous standbys.
+ * Get the currently active synchronous standbys. This could be out of
+ * date before we're done, but we'll use the data anyway.
*/
- LWLockAcquire(SyncRepLock, LW_SHARED);
- sync_standbys = SyncRepGetSyncStandbys(NULL);
- LWLockRelease(SyncRepLock);
+ num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
for (i = 0; i < max_wal_senders; i++)
{
int pid;
WalSndState state;
TimestampTz replyTime;
+ bool is_sync_standby;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ int j;
+ /* Collect data from shared memory */
SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
{
replyTime = walsnd->replyTime;
SpinLockRelease(&walsnd->mutex);
+ /*
+ * Detect whether walsender is/was considered synchronous. We can
+ * provide some protection against stale data by checking the PID
+ * along with walsnd_index.
+ */
+ is_sync_standby = false;
+ for (j = 0; j < num_standbys; j++)
+ {
+ if (sync_standbys[j].walsnd_index == i &&
+ sync_standbys[j].pid == pid)
+ {
+ is_sync_standby = true;
+ break;
+ }
+ }
+
memset(nulls, 0, sizeof(nulls));
values[0] = Int32GetDatum(pid);
*/
if (priority == 0)
values[10] = CStringGetTextDatum("async");
- else if (list_member_int(sync_standbys, i))
+ else if (is_sync_standby)
values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else