Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e6b4f3d

Browse files
committed
Merge branch 'master' of github.com:postgrespro/postgres_cluster
2 parents 4848fad + b2b082b commit e6b4f3d

File tree

9 files changed

+96
-9
lines changed

9 files changed

+96
-9
lines changed

contrib/mmts/multimaster.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -530,7 +530,7 @@ MtmAdjustOldestXid(TransactionId xid)
530530
if (prev != NULL) {
531531
Mtm->transListHead = prev;
532532
Mtm->oldestXid = xid = prev->xid;
533-
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, olderstSnapshot=%ld", MyProcPid, xid, oldestSnapshot);
533+
MTM_LOG2("%d: MtmAdjustOldestXid: oldestXid=%d, oldestSnapshot=%ld", MyProcPid, xid, oldestSnapshot);
534534
} else if (TransactionIdPrecedes(Mtm->oldestXid, xid)) {
535535
xid = Mtm->oldestXid;
536536
}

contrib/mmts/tests2/lib/bank_client.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ def check_total(self):
133133
def tx(conn, cur):
134134
cur.execute('select sum(amount) from bank_test')
135135
res = cur.fetchone()
136+
conn.commit()
136137
if res[0] != 0:
137138
print("Isolation error, total = %d" % (res[0],))
138139
raise BaseException

contrib/raftable/raft/include/raft.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ raft_t raft_init(raft_config_t *config);
6060
*/
6161
raft_bool_t raft_peer_up(raft_t r, int id, char *host, int port, raft_bool_t self);
6262

63+
/*
64+
* Returns the number of entried applied by the leader.
65+
*/
66+
int raft_progress(raft_t r);
67+
6368
/*
6469
* Remove a previously added peer named 'id'.
6570
*/

contrib/raftable/raft/src/raft.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1182,3 +1182,7 @@ bool raft_is_leader(raft_t r) {
11821182
int raft_get_leader(raft_t r) {
11831183
return r->leader;
11841184
}
1185+
1186+
int raft_progress(raft_t r) {
1187+
return r->log.applied;
1188+
}

contrib/raftable/raftable--1.0.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION raftable" to load this file. \quit
33

4+
-- sync
5+
CREATE FUNCTION raftable_sync(timeout_ms int)
6+
RETURNS void
7+
AS 'MODULE_PATHNAME','raftable_sql_sync'
8+
LANGUAGE C;
9+
410
-- get
511
CREATE FUNCTION raftable(key varchar(64))
612
RETURNS text

contrib/raftable/raftable.c

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ PG_MODULE_MAGIC;
3838

3939
PG_FUNCTION_INFO_V1(raftable_sql_get);
4040
PG_FUNCTION_INFO_V1(raftable_sql_set);
41+
PG_FUNCTION_INFO_V1(raftable_sql_sync);
4142
PG_FUNCTION_INFO_V1(raftable_sql_list);
4243

4344
static struct {
@@ -327,6 +328,47 @@ static bool try_sending_update(RaftableUpdate *ru, size_t size, timeout_t *timeo
327328
return true;
328329
}
329330

331+
bool raftable_sync(int timeout_ms)
332+
{
333+
RaftableUpdate ru;
334+
size_t size = sizeof(ru);
335+
timeout_t timeout;
336+
337+
Assert(wcfg.id >= 0);
338+
339+
ru.expector = wcfg.id;
340+
ru.fieldnum = 0;
341+
342+
if (timeout_ms < 0)
343+
{
344+
while (true)
345+
{
346+
timeout_start(&timeout, 100);
347+
348+
if (try_sending_update(&ru, size, &timeout))
349+
return true;
350+
else
351+
disconnect_leader();
352+
}
353+
}
354+
else
355+
{
356+
timeout_start(&timeout, timeout_ms);
357+
358+
TIMEOUT_LOOP_START(&timeout);
359+
{
360+
if (try_sending_update(&ru, size, &timeout))
361+
return true;
362+
else
363+
disconnect_leader();
364+
}
365+
TIMEOUT_LOOP_END(&timeout);
366+
}
367+
368+
elog(WARNING, "failed to sync after %d ms", timeout_elapsed_ms(&timeout));
369+
return false;
370+
}
371+
330372
bool raftable_set(const char *key, const char *value, size_t vallen, int timeout_ms)
331373
{
332374
RaftableField *f;
@@ -408,6 +450,16 @@ raftable_sql_set(PG_FUNCTION_ARGS)
408450
PG_RETURN_VOID();
409451
}
410452

453+
Datum
454+
raftable_sql_sync(PG_FUNCTION_ARGS)
455+
{
456+
int timeout_ms = PG_GETARG_INT32(0);
457+
458+
raftable_sync(timeout_ms);
459+
460+
PG_RETURN_VOID();
461+
}
462+
411463
void raftable_every(void (*func)(const char *, const char *, size_t, void *), void *arg)
412464
{
413465
void *scan;

contrib/raftable/raftable.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
#ifndef __RAFTABLE_H__
22
#define __RAFTABLE_H__
33

4+
/* Syncs with the raftable leader. Gives up after 'timeout_ms' milliseconds. */
5+
bool raftable_sync(int timeout_ms);
6+
47
/* Gets value by key. Returns the value or NULL if not found. */
58
char *raftable_get(const char *key, size_t *vallen);
69

contrib/raftable/t/000_basic.pl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ sub start_nodes
7777
$able->psql('postgres', "select raftable('world', '$tests{world}', $timeout_ms);");
7878

7979
$baker->start;
80-
sleep(5);
80+
$baker->psql('postgres', "select raftable_sync($timeout_ms);");
8181
while (my ($key, $value) = each(%tests))
8282
{
8383
my ($rc, $stdout, $stderr) = $baker->psql('postgres', "select raftable('$key');");

contrib/raftable/worker.c

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -322,14 +322,30 @@ static void attend(Client *c)
322322

323323
Assert(c->expect.id == NOBODY); /* client shouldn't send multiple updates at once */
324324

325-
u.len = c->msglen;
326-
u.data = c->msg;
327325
c->expect.id = ru->expector;
328-
index = raft_emit(raft, u);
329-
if (index >= 0)
330-
c->expect.index = index;
331-
else
332-
c->good = false;
326+
if (ru->fieldnum > 0) {
327+
// an actual update
328+
u.len = c->msglen;
329+
u.data = c->msg;
330+
index = raft_emit(raft, u);
331+
if (index >= 0)
332+
c->expect.index = index;
333+
else
334+
c->good = false;
335+
} else {
336+
// a sync command
337+
c->expect.index = raft_progress(raft);
338+
if (raft_applied(raft, c->expect.id, c->expect.index))
339+
{
340+
int ok = 1;
341+
if (send(c->sock, &ok, sizeof(ok), 0) != sizeof(ok))
342+
{
343+
fprintf(stderr, "failed to notify client\n");
344+
c->good = false;
345+
}
346+
c->expect.id = NOBODY;
347+
}
348+
}
333349
pfree(c->msg);
334350
c->msg = NULL;
335351
}

0 commit comments

Comments
 (0)