Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 75895d3

Browse files
knizhnik authored and kelvich committed
New DDL replication mechanism
1 parent 3911ec5 commit 75895d3

File tree

9 files changed

+134
-51
lines changed

9 files changed

+134
-51
lines changed

multimaster--1.0.sql

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
-- complain if script is sourced in psql, rather than via CREATE EXTENSION
22
\echo Use "CREATE EXTENSION multimaster" to load this file. \quit
33

4-
CREATE FUNCTION mtm_start_replication() RETURNS void
4+
CREATE FUNCTION mtm.start_replication() RETURNS void
55
AS 'MODULE_PATHNAME','mtm_start_replication'
66
LANGUAGE C;
77

8-
CREATE FUNCTION mtm_stop_replication() RETURNS void
8+
CREATE FUNCTION mtm.stop_replication() RETURNS void
99
AS 'MODULE_PATHNAME','mtm_stop_replication'
1010
LANGUAGE C;
1111

12-
CREATE FUNCTION mtm_drop_node(node integer, drop_slot bool default false) RETURNS void
12+
CREATE FUNCTION mtm.drop_node(node integer, drop_slot bool default false) RETURNS void
1313
AS 'MODULE_PATHNAME','mtm_drop_node'
1414
LANGUAGE C;
1515

16-
CREATE FUNCTION mtm_get_snapshot() RETURNS bigint
16+
CREATE FUNCTION mtm.get_snapshot() RETURNS bigint
1717
AS 'MODULE_PATHNAME','mtm_get_snapshot'
1818
LANGUAGE C;
1919

20+
CREATE TABLE IF NOT EXISTS mtm.ddl_log (issued timestamp with time zone not null, query text);

multimaster.c

Lines changed: 62 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,9 @@
4848
#include "replication/slot.h"
4949
#include "port/atomics.h"
5050
#include "tcop/utility.h"
51+
#include "nodes/makefuncs.h"
52+
#include "access/htup_details.h"
53+
#include "catalog/indexing.h"
5154

5255
#include "multimaster.h"
5356

@@ -865,7 +868,7 @@ mtm_drop_node(PG_FUNCTION_ARGS)
865868
dtm->nNodes -= 1;
866869
if (!IsTransactionBlock())
867870
{
868-
MtmBroadcastUtilityStmt(psprintf("select mtm_drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
871+
/* The extension installs its functions into schema "mtm" (see multimaster.control / multimaster--1.0.sql), so the broadcast call must use that schema. */
MtmBroadcastUtilityStmt(psprintf("select mtm.drop_node(%d,%s)", nodeId, dropSlot ? "true" : "false"), true);
869872
}
870873
if (dropSlot)
871874
{
@@ -880,7 +883,7 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
880883
{
881884
PG_RETURN_INT64(dtmTx.snapshot);
882885
}
883-
886+
884887
/*
885888
* Execute statement with specified parameters and check its result
886889
*/
@@ -926,7 +929,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
926929
failedNode = i;
927930
do {
928931
PQfinish(conns[i]);
929-
} while (--i >= 0);
932+
} while (--i >= 0);
930933
elog(ERROR, "Failed to establish connection '%s' to node %d", conn_str, failedNode);
931934
}
932935
}
@@ -935,7 +938,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
935938
i += 1;
936939
}
937940
Assert(i == MtmNodes);
938-
941+
939942
for (i = 0; i < MtmNodes; i++)
940943
{
941944
if (conns[i])
@@ -972,7 +975,7 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
972975
failedNode = i;
973976
}
974977
}
975-
}
978+
}
976979
for (i = 0; i < MtmNodes; i++)
977980
{
978981
if (conns[i])
@@ -986,6 +989,48 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
986989
}
987990
}
988991

992+
/*
 * Record a DDL statement in the table
 * MULTIMASTER_SCHEMA_NAME.MULTIMASTER_DDL_TABLE.
 *
 * A two-column row (current timestamp, statement text) is inserted into the
 * table. If the table cannot be opened (heap_openrv_extended() returns NULL)
 * the function returns without recording anything and without touching dtmTx.
 *
 * queryString: text of the utility statement; stored in the "query" column
 * and echoed in a WARNING-level log message.
 *
 * Side effect: sets dtmTx.containsDML to true after the insert.
 * NOTE(review): presumably this makes the transaction be treated as carrying
 * replicable changes -- confirm against the commit path.
 */
static void MtmProcessDDLCommand(char const* queryString)
993+
{
994+
RangeVar *rv;
995+
Relation rel;
996+
TupleDesc tupDesc;
997+
HeapTuple tup;
998+
Datum values[Natts_mtm_ddl_log];
999+
bool nulls[Natts_mtm_ddl_log];
1000+
TimestampTz ts = GetCurrentTimestamp();
1001+
1002+
rv = makeRangeVar(MULTIMASTER_SCHEMA_NAME, MULTIMASTER_DDL_TABLE, -1);
1003+
rel = heap_openrv_extended(rv, RowExclusiveLock, true); /* may yield NULL; checked right below */
1004+
1005+
if (rel == NULL) {
1006+
return; /* log table not available: nothing is recorded */
1007+
}
1008+
1009+
tupDesc = RelationGetDescr(rel);
1010+
1011+
/* Form a tuple: no column is NULL; values are indexed by (Anum_* - 1). */
1012+
memset(nulls, false, sizeof(nulls));
1013+
1014+
values[Anum_mtm_ddl_log_issued - 1] = TimestampTzGetDatum(ts);
1015+
values[Anum_mtm_ddl_log_query - 1] = CStringGetTextDatum(queryString);
1016+
1017+
tup = heap_form_tuple(tupDesc, values, nulls);
1018+
1019+
/* Insert the tuple into the DDL log table. */
1020+
simple_heap_insert(rel, tup);
1021+
1022+
/* Update the indexes of the table. */
1023+
CatalogUpdateIndexes(rel, tup);
1024+
1025+
/* Cleanup: free the tuple and close the relation, passing the lock mode used to open it. */
1026+
heap_freetuple(tup);
1027+
heap_close(rel, RowExclusiveLock);
1028+
1029+
elog(WARNING, "Replicate command: '%s'", queryString); /* NOTE(review): WARNING level looks like debugging output -- confirm intended level */
1030+
1031+
dtmTx.containsDML = true;
1032+
}
1033+
9891034
static void MtmProcessUtility(Node *parsetree, const char *queryString,
9901035
ProcessUtilityContext context, ParamListInfo params,
9911036
DestReceiver *dest, char *completionTag)
@@ -1013,22 +1058,18 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
10131058
skipCommand = false;
10141059
break;
10151060
}
1016-
if (skipCommand || IsTransactionBlock()) {
1017-
if (PreviousProcessUtilityHook != NULL)
1018-
{
1019-
PreviousProcessUtilityHook(parsetree, queryString, context,
1020-
params, dest, completionTag);
1021-
}
1022-
else
1023-
{
1024-
standard_ProcessUtility(parsetree, queryString, context,
1025-
params, dest, completionTag);
1026-
}
1027-
if (!skipCommand) {
1028-
dtmTx.isDistributed = false;
1029-
}
1030-
} else {
1031-
MtmBroadcastUtilityStmt(queryString, false);
1061+
if (!skipCommand && !dtmTx.isReplicated) {
1062+
MtmProcessDDLCommand(queryString);
1063+
}
1064+
if (PreviousProcessUtilityHook != NULL)
1065+
{
1066+
PreviousProcessUtilityHook(parsetree, queryString, context,
1067+
params, dest, completionTag);
1068+
}
1069+
else
1070+
{
1071+
standard_ProcessUtility(parsetree, queryString, context,
1072+
params, dest, completionTag);
10321073
}
10331074
}
10341075

multimaster.control

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
comment = 'Multimaster'
22
default_version = '1.0'
33
module_pathname = '$libdir/multimaster'
4-
relocatable = true
4+
schema = mtm
5+
relocatable = false

multimaster.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
#include "bgwpool.h"
66

77
#define MTM_INFO(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
8-
#define MTM_TRACE(fmt, ...)
8+
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
99
#define MTM_TUPLE_TRACE(fmt, ...)
1010
/*
1111
#define MTM_TRACE(fmt, ...) fprintf(stderr, fmt, ## __VA_ARGS__)
@@ -14,7 +14,14 @@
1414

1515
#define BIT_SET(mask, bit) ((mask) & ((int64)1 << (bit)))
1616

17-
#define MULTIMASTER_NAME "mmts"
17+
#define MULTIMASTER_NAME "mtm"
18+
#define MULTIMASTER_SCHEMA_NAME "mtm"
19+
#define MULTIMASTER_DDL_TABLE "ddl_log"
20+
21+
#define Natts_mtm_ddl_log 2
22+
#define Anum_mtm_ddl_log_issued 1
23+
#define Anum_mtm_ddl_log_query 2
24+
1825

1926
typedef uint64 csn_t; /* commit serial number */
2027
#define INVALID_CSN ((csn_t)-1)

pglogical_apply.c

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -472,13 +472,14 @@ process_remote_commit(StringInfo s)
472472
static void
473473
process_remote_insert(StringInfo s, Relation rel)
474474
{
475-
EState *estate;
476-
TupleData new_tuple;
475+
EState *estate;
476+
TupleData new_tuple;
477477
TupleTableSlot *newslot;
478478
TupleTableSlot *oldslot;
479479
ResultRelInfo *relinfo;
480-
ScanKey *index_keys;
481-
int i;
480+
ScanKey *index_keys;
481+
char* relname = RelationGetRelationName(rel);
482+
int i;
482483

483484
estate = create_rel_estate(rel);
484485
newslot = ExecInitExtraTupleSlot(estate);
@@ -560,6 +561,18 @@ process_remote_insert(StringInfo s, Relation rel)
560561
FreeExecutorState(estate);
561562

562563
CommandCounterIncrement();
564+
565+
if (strcmp(relname, MULTIMASTER_DDL_TABLE) == 0) {
566+
char* ddl = TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
567+
int rc;
568+
SPI_connect();
569+
rc = SPI_execute(ddl, false, 0);
570+
SPI_finish();
571+
if (rc != SPI_OK_UTILITY) {
572+
elog(ERROR, "Failed to execute utility statement %s", ddl);
573+
}
574+
}
575+
563576
}
564577

565578
static void

tests/dtmbench.cpp

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -180,28 +180,20 @@ void* writer(void* arg)
180180
void initializeDatabase()
181181
{
182182
connection conn(cfg.connections[0]);
183-
printf("creating extension\n");
184-
{
185-
nontransaction txn(conn);
186-
exec(txn, "drop extension if exists multimaster");
187-
exec(txn, "create extension multimaster");
188-
}
189-
printf("extension created\n");
190-
191-
printf("creating table t\n");
183+
time_t start = getCurrentTime();
184+
printf("Creating database schema...\n");
192185
{
193186
nontransaction txn(conn);
194187
exec(txn, "drop table if exists t");
195188
exec(txn, "create table t(u int primary key, v int)");
196189
}
197-
printf("table t created\n");
198-
printf("inserting stuff into t\n");
190+
printf("Populating data...\n");
199191
{
200192
work txn(conn);
201193
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
202194
txn.commit();
203195
}
204-
printf("stuff inserted\n");
196+
printf("Initialization completed in %f seconds\n", (getCurrentTime() - start)/100000.0); // elapsed = now - start (operands were swapped, giving a negative duration). NOTE(review): divisor assumes the unit of getCurrentTime(); if it returns microseconds this should be 1000000.0 -- confirm.
205197
}
206198

207199
int main (int argc, char* argv[])

tests/perf.results

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,3 +117,24 @@ Bench started at Пн. февр. 15 17:26:11 MSK 2016
117117
astro5:{tps:96460.088384, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
118118
Bench finished at Пн. февр. 15 17:26:22 MSK 2016
119119
Bench started at Пн. февр. 15 17:26:41 MSK 2016
120+
Bench started at Пн. февр. 15 17:58:14 MSK 2016
121+
astro5:{tps:93430.358394, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
122+
Bench finished at Пн. февр. 15 17:58:28 MSK 2016
123+
Bench started at Пн. февр. 15 17:59:11 MSK 2016
124+
astro5:{tps:81893.902409, transactions:1500000, selects:3000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:300, update_percent:0, accounts:500000, iterations:5000, hosts:3}
125+
Bench finished at Пн. февр. 15 17:59:29 MSK 2016
126+
Bench started at Пн. февр. 15 17:59:59 MSK 2016
127+
astro5:{tps:105707.597142, transactions:1000000, selects:2000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:5000, hosts:3}
128+
Bench finished at Пн. февр. 15 18:00:09 MSK 2016
129+
Bench started at Пн. февр. 15 18:00:54 MSK 2016
130+
astro5:{tps:92668.464039, transactions:1250000, selects:2500000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:5000, hosts:3}
131+
Bench finished at Пн. февр. 15 18:01:08 MSK 2016
132+
Bench started at Пн. февр. 15 18:06:22 MSK 2016
133+
astro5:{tps:121069.442298, transactions:125000000, selects:250000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:250, update_percent:0, accounts:500000, iterations:500000, hosts:3}
134+
Bench finished at Пн. февр. 15 18:23:35 MSK 2016
135+
Bench started at Пн. февр. 15 18:24:11 MSK 2016
136+
astro5:{tps:122202.228254, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
137+
Bench finished at Пн. февр. 15 18:37:50 MSK 2016
138+
Bench started at Пн. февр. 15 18:44:02 MSK 2016
139+
astro5:{tps:121774.204222, transactions:100000000, selects:200000000, updates:0, aborts:0, abort_percent: 0, readers:0, writers:200, update_percent:0, accounts:500000, iterations:500000, hosts:3}
140+
Bench finished at Пн. февр. 15 18:57:44 MSK 2016

tests/perf.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
- name: run transfers
4747
shell: >
4848
~/pg_cluster/install/bin/dtmbench {{connections}}
49-
-w {{ nconns }} -r 0 -n 5000 -a 500000 -p {{ up }} |
49+
-w {{ nconns }} -r 0 -n 500000 -a 500000 -p {{ up }} |
5050
tee -a perf.results |
5151
sed "s/^/`hostname`:/"
5252
register: transfers_result

tests/reinit-mm.sh

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,15 @@ sep=""
1010
for ((i=1;i<=n_nodes;i++))
1111
do
1212
port=$((5431+i))
13-
conn_str="$conn_str${sep}dbname=postgres host=127.0.0.1 port=$port sslmode=disable"
13+
conn_str="$conn_str${sep}dbname=postgres host=localhost port=$port sslmode=disable"
1414
sep=","
1515
initdb node$i
1616
done
1717

18-
echo Start DTM
19-
~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm 2> dtm.log &
20-
sleep 2
18+
#echo Start DTM
19+
#~/postgres_cluster/contrib/arbiter/bin/arbiter -r 0.0.0.0:5431 -i 0 -d dtm 2> dtm.log &
20+
#sleep 2
21+
echo "Starting nodes..."
2122

2223
echo Start nodes
2324
for ((i=1;i<=n_nodes;i++))
@@ -31,7 +32,13 @@ do
3132
done
3233

3334
sleep 5
34-
echo Initialize database schema
35-
psql postgres -f init.sql
35+
echo "Create multimaster extension..."
36+
37+
for ((i=1;i<=n_nodes;i++))
38+
do
39+
port=$((5431+i))
40+
psql postgres -p $port -c "create extension multimaster"
41+
done
42+
3643

3744
echo Done

0 commit comments

Comments
 (0)