Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 40c5d18

Browse files
committed
multidb refactor: assorted fixes for ddl replication
1 parent 97b31fb commit 40c5d18

File tree

14 files changed

+170
-80
lines changed

14 files changed

+170
-80
lines changed

.gitignore

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1 @@
1-
tests/node*
21
tmp_check

Cluster.pm

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ sub new
7171

7272
my $node = new PostgresNode("node$i", $host, $pgport);
7373
$node->{id} = $i;
74+
$node->{dbname} = 'postgres';
7475
$node->{arbiter_port} = $arbiter_port;
75-
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
76+
$node->{mmconnstr} = "${ \$node->connstr($node->{dbname}) } arbiter_port=${ \$node->{arbiter_port} }";
7677
push(@$nodes, $node);
7778
}
7879

@@ -101,15 +102,30 @@ sub all_connstrs
101102
{
102103
my ($self) = @_;
103104
my $nodes = $self->{nodes};
104-
return join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
105+
return join(', ', map { "${ \$_->connstr($_->{dbname}) } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
105106
}
106107

107108

108109
sub configure
109110
{
110-
my ($self) = @_;
111+
my ($self, $dbname) = @_;
111112
my $nodes = $self->{nodes};
112113

114+
if (defined $dbname)
115+
{
116+
foreach my $node (@$nodes)
117+
{
118+
$node->{dbname} = $dbname;
119+
$node->append_conf("postgresql.conf", qq(
120+
listen_addresses = '127.0.0.1'
121+
unix_socket_directories = ''
122+
));
123+
$node->start();
124+
$node->safe_psql('postgres', "CREATE DATABASE $dbname");
125+
$node->stop();
126+
}
127+
}
128+
113129
my $connstr = $self->all_connstrs();
114130
$connstr =~ s/'//gms;
115131

@@ -135,17 +151,15 @@ sub configure
135151
max_replication_slots = 6
136152
shared_preload_libraries = 'multimaster'
137153
shared_buffers = 16MB
138-
wal_writer_delay = 2ms
139154
140-
multimaster.arbiter_port = $arbiter_port
141-
multimaster.workers = 1
142155
multimaster.node_id = $id
143156
multimaster.conn_strings = '$connstr'
144157
multimaster.heartbeat_recv_timeout = 8050
145158
multimaster.heartbeat_send_timeout = 250
146159
multimaster.max_nodes = 6
147-
multimaster.ignore_tables_without_pk = false
148-
multimaster.queue_size = 4194304
160+
# XXX try without ignore_tables_without_pk
161+
multimaster.ignore_tables_without_pk = true
162+
multimaster.volkswagen_mode = 1
149163
log_line_prefix = '%t [%p]: '
150164
));
151165

@@ -165,8 +179,8 @@ sub start
165179
foreach my $node (@$nodes)
166180
{
167181
$node->start();
168-
$node->safe_psql('postgres', "create extension multimaster;");
169-
note( "Starting node with connstr 'dbname=postgres port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
182+
$node->safe_psql($node->{dbname}, "create extension multimaster;");
183+
note( "Starting node with connstr 'port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
170184
}
171185
}
172186

@@ -304,7 +318,7 @@ sub pgbench_async()
304318
@args,
305319
-h => $self->{nodes}->[$node]->host(),
306320
-p => $self->{nodes}->[$node]->port(),
307-
'postgres',
321+
$node->{dbname},
308322
);
309323
note("running pgbench: " . join(" ", @pgbench_command));
310324
my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
@@ -329,7 +343,7 @@ sub is_data_identic()
329343
foreach my $i (@nodenums)
330344
{
331345
my $current_hash = '';
332-
$self->{nodes}->[$i]->psql('postgres', $sql, stdout => \$current_hash);
346+
$self->{nodes}->[$i]->psql($self->{nodes}->[$i]->{dbname}, $sql, stdout => \$current_hash);
333347
note("hash$i: $current_hash");
334348
if ($current_hash eq '')
335349
{
@@ -372,7 +386,7 @@ sub add_node()
372386
$node_id = scalar(@{$self->{nodes}}) + 1;
373387
$pgport = (allocate_ports('127.0.0.1', 1))[0];
374388
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
375-
$connstrs = $self->all_connstrs() . ", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
389+
$connstrs = $self->all_connstrs() . ", dbname=${ \$self->{nodes}->[0]->{dbname} } host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
376390
}
377391

378392
$connstrs =~ s/'//gms;
@@ -389,7 +403,7 @@ sub add_node()
389403
$node->{port} = $pgport;
390404
$node->{host} = '127.0.0.1';
391405
$node->{arbiter_port} = $arbiter_port;
392-
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
406+
$node->{mmconnstr} = "${ \$node->connstr($node->{dbname}) } arbiter_port=${ \$node->{arbiter_port} }";
393407
$node->append_conf("postgresql.conf", qq(
394408
multimaster.arbiter_port = $arbiter_port
395409
multimaster.conn_strings = '$connstrs'
@@ -411,7 +425,7 @@ sub await_nodes()
411425

412426
foreach my $i (@nodenums)
413427
{
414-
if (!$self->{nodes}->[$i]->poll_query_until('postgres', "select 't'"))
428+
if (!$self->{nodes}->[$i]->poll_query_until($self->{nodes}->[$i]->{dbname}, "select 't'"))
415429
{
416430
# sleep(3600);
417431
die "Timed out while waiting for mm node to became online";

Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ check: temp-install
3939
submake-regress:
4040
$(MAKE) -C $(top_builddir)/src/test/regress all
4141

42-
run: temp-install
42+
start: temp-install
4343
rm -rf '$(CURDIR)'/tmp_check
4444
$(MKDIR_P) '$(CURDIR)'/tmp_check
4545
cd $(srcdir) && TESTDIR='$(CURDIR)' \
@@ -65,4 +65,4 @@ regress: submake-regress
6565
--bindir='' \
6666
--use-existing \
6767
--schedule=serial_schedule \
68-
--dbname=postgres
68+
--dlpath=$(CURDIR)/$(top_builddir)/src/test/regress

run.pl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
{
1212
my $cluster = new Cluster($n_nodes);
1313
$cluster->init();
14-
$cluster->configure();
14+
$cluster->configure('regression');
1515
$cluster->start();
1616
$cluster->await_nodes( (0..$n_nodes-1) );
1717
}
1818
elsif ($action eq "--stop")
1919
{
2020
for my $i (1..$n_nodes)
2121
{
22-
TestLib::system_or_bail('pg_ctl',
22+
TestLib::system_log('pg_ctl',
2323
'-D', "$TestLib::tmp_check/t_run_node${i}_data/pgdata",
2424
'-m', 'fast',
2525
'stop');

sql/multimaster.sql

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,36 @@
1+
2+
-- check that implicit empty transactions works fine
3+
create table t (a int, b text);
4+
create or replace function f1() returns trigger as $$begin raise notice 'b: %', new.b; return NULL; end$$ language plpgsql;
5+
create trigger tr1 before insert on t for each row execute procedure f1();
6+
insert into t values (1, 'asdf');
7+
copy t from stdout;
8+
1 baz
9+
\.
10+
11+
12+
-- test mixed temp table and persistent write
13+
14+
create table t_tempddl_mix(id int primary key);
15+
insert into t_tempddl_mix values(1);
16+
begin;
17+
insert into t_tempddl_mix values(42);
18+
create temp table tempddl(id int);
19+
commit;
20+
table t;
21+
22+
\c "dbname=regression port=64717 host=127.0.0.1"
23+
table t;
24+
25+
26+
-- test CTA replication inside explain
27+
28+
EXPLAIN ANALYZE create table explain_cta as select 42;
29+
30+
31+
32+
--- xx?
33+
134
create user user1;
235
create schema user1;
336
alter schema user1 owner to user1;

src/commit.c

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ static void MtmPrePrepareTransaction(MtmCurrentTrans* x);
3434

3535
static bool GatherPrepares(MtmCurrentTrans* x, nodemask_t participantsMask,
3636
int *failed_at);
37-
static void GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask);
37+
static void GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask, MtmMessageCode code);
3838

3939

4040
void
@@ -51,8 +51,12 @@ MtmXactCallback2(XactEvent event, void *arg)
5151
MtmPrePrepareTransaction(&MtmTx);
5252
break;
5353
case XACT_EVENT_COMMIT_COMMAND:
54-
if (!MtmTx.isTransactionBlock && !IsSubTransaction())
54+
if (IsTransactionOrTransactionBlock()
55+
&& !IsTransactionBlock()
56+
&& !IsSubTransaction())
57+
{
5558
MtmTwoPhaseCommit(&MtmTx);
59+
}
5660
break;
5761
default:
5862
break;
@@ -66,7 +70,6 @@ MtmBeginTransaction(MtmCurrentTrans* x)
6670
// XXX: move it down the callbacks?
6771
x->isDistributed = MtmIsUserTransaction();
6872
x->containsDML = false; // will be set by executor hook
69-
x->isTransactionBlock = IsTransactionBlock();
7073

7174
MtmDDLResetStatement();
7275

@@ -132,32 +135,33 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
132135
DmqSubscribed = true;
133136
}
134137

135-
if (!x->isTransactionBlock)
138+
if (!IsTransactionBlock())
136139
{
137140
BeginTransactionBlock();
138-
x->isTransactionBlock = true;
139141
CommitTransactionCommand();
140142
StartTransactionCommand();
141143
}
142144

143-
xid = GetCurrentTransactionId();
145+
xid = GetTopTransactionId();
146+
MtmGenerateGid(gid, xid);
144147
sprintf(stream, "xid" XID_FMT, xid);
145148
dmq_stream_subscribe(stream);
146149
mtm_log(MtmTxTrace, "%s subscribed for %s", gid, stream);
150+
147151
x->xid = xid;
148152

149-
MtmGenerateGid(gid, xid);
150153
participantsMask = (((nodemask_t)1 << Mtm->nAllNodes) - 1) &
151154
~Mtm->disabledNodeMask &
152155
~((nodemask_t)1 << (MtmNodeId-1));
153156

154157
ret = PrepareTransactionBlock(gid);
155158
if (!ret)
156159
{
157-
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
158-
mtm_log(WARNING, "Failed to prepare transaction %s", gid);
159-
return false;
160+
if (!MtmVolksWagenMode)
161+
mtm_log(WARNING, "Failed to prepare transaction %s", gid);
162+
return true;
160163
}
164+
mtm_log(MtmTxFinish, "TXFINISH: %s prepared", gid);
161165
CommitTransactionCommand();
162166

163167
ret = GatherPrepares(x, participantsMask, &failed_at);
@@ -172,11 +176,12 @@ MtmTwoPhaseCommit(MtmCurrentTrans* x)
172176

173177
SetPreparedTransactionState(gid, MULTIMASTER_PRECOMMITTED);
174178
mtm_log(MtmTxFinish, "TXFINISH: %s precommitted", gid);
175-
GatherPrecommits(x, participantsMask);
179+
GatherPrecommits(x, participantsMask, MSG_PRECOMMITTED);
176180

177181
StartTransactionCommand();
178182
FinishPreparedTransaction(gid, true, false);
179183
mtm_log(MtmTxFinish, "TXFINISH: %s committed", gid);
184+
GatherPrecommits(x, participantsMask, MSG_COMMITTED);
180185

181186
dmq_stream_unsubscribe(stream);
182187
mtm_log(MtmTxTrace, "%s unsubscribed for %s", gid, stream);
@@ -256,7 +261,7 @@ GatherPrepares(MtmCurrentTrans* x, nodemask_t participantsMask, int *failed_at)
256261
}
257262

258263
static void
259-
GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask)
264+
GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask, MtmMessageCode code)
260265
{
261266
Assert(participantsMask != 0);
262267

@@ -274,7 +279,7 @@ GatherPrecommits(MtmCurrentTrans* x, nodemask_t participantsMask)
274279
msg = (MtmArbiterMessage *) buffer.data;
275280

276281
Assert(msg->node == sender_to_node[sender_id]);
277-
Assert(msg->code == MSG_PRECOMMITTED);
282+
Assert(msg->code == code);
278283
Assert(msg->dxid == x->xid);
279284
Assert(BIT_CHECK(participantsMask, sender_to_node[sender_id] - 1));
280285

0 commit comments

Comments
 (0)