Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit e987d00

Browse files
committed
merge
2 parents 29ef2db + 63381e2 commit e987d00

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+6920
-3032
lines changed

contrib/mmts/Cluster.pm

Lines changed: 197 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,35 @@ package Cluster;
33
use strict;
44
use warnings;
55

6-
use Proc::ProcessTable;
76
use PostgresNode;
87
use TestLib;
98
use Test::More;
109
use Cwd;
1110

11+
use Socket;
12+
13+
use IPC::Run;
14+
15+
sub check_port
16+
{
17+
my ($host, $port) = @_;
18+
my $iaddr = inet_aton($host);
19+
my $paddr = sockaddr_in($port, $iaddr);
20+
my $proto = getprotobyname("tcp");
21+
my $available = 0;
22+
23+
socket(SOCK, PF_INET, SOCK_STREAM, $proto)
24+
or die "socket failed: $!";
25+
26+
if (bind(SOCK, $paddr) && listen(SOCK, SOMAXCONN))
27+
{
28+
$available = 1;
29+
}
30+
31+
close(SOCK);
32+
return $available;
33+
}
34+
1235
my %allocated_ports = ();
1336
sub allocate_ports
1437
{
@@ -19,8 +42,7 @@ sub allocate_ports
1942
{
2043
my $port = int(rand() * 16384) + 49152;
2144
next if $allocated_ports{$port};
22-
diag("checking for port $port\n");
23-
if (!TestLib::run_log(['pg_isready', '-h', $host, '-p', $port]))
45+
if (check_port($host, $port))
2446
{
2547
$allocated_ports{$port} = 1;
2648
push(@allocated_now, $port);
@@ -44,6 +66,7 @@ sub new
4466
my $node = new PostgresNode("node$i", $host, $pgport);
4567
$node->{id} = $i;
4668
$node->{arbiter_port} = $arbiter_port;
69+
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
4770
push(@$nodes, $node);
4871
}
4972

@@ -67,48 +90,54 @@ sub init
6790
}
6891
}
6992

93+
sub all_connstrs
94+
{
95+
my ($self) = @_;
96+
my $nodes = $self->{nodes};
97+
return join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
98+
}
99+
100+
70101
sub configure
71102
{
72103
my ($self) = @_;
73104
my $nodes = $self->{nodes};
74-
my $nnodes = scalar @{ $nodes };
75105

76-
my $connstr = join(', ', map { "${ \$_->connstr('postgres') } arbiter_port=${ \$_->{arbiter_port} }" } @$nodes);
106+
my $connstr = $self->all_connstrs();
77107

78108
foreach my $node (@$nodes)
79109
{
80110
my $id = $node->{id};
81111
my $host = $node->host;
82112
my $pgport = $node->port;
83113
my $arbiter_port = $node->{arbiter_port};
114+
my $unix_sock_dir = $ENV{PGHOST};
84115

85116
$node->append_conf("postgresql.conf", qq(
86117
log_statement = none
87118
listen_addresses = '$host'
88-
unix_socket_directories = ''
119+
unix_socket_directories = '$unix_sock_dir'
89120
port = $pgport
90-
max_prepared_transactions = 200
91-
max_connections = 200
121+
max_prepared_transactions = 10
122+
max_connections = 10
92123
max_worker_processes = 100
93124
wal_level = logical
94-
fsync = off
95-
max_wal_senders = 10
125+
max_wal_senders = 6
96126
wal_sender_timeout = 0
97127
default_transaction_isolation = 'repeatable read'
98-
max_replication_slots = 10
128+
max_replication_slots = 6
99129
shared_preload_libraries = 'multimaster'
130+
shared_buffers = 16MB
100131
101132
multimaster.arbiter_port = $arbiter_port
102-
multimaster.workers = 10
103-
multimaster.queue_size = 10485760 # 10mb
133+
multimaster.workers = 1
104134
multimaster.node_id = $id
105135
multimaster.conn_strings = '$connstr'
106-
multimaster.heartbeat_recv_timeout = 1000
136+
multimaster.heartbeat_recv_timeout = 1050
107137
multimaster.heartbeat_send_timeout = 250
108-
multimaster.max_nodes = $nnodes
109-
multimaster.ignore_tables_without_pk = true
110-
multimaster.twopc_min_timeout = 50000
111-
multimaster.min_2pc_timeout = 50000
138+
multimaster.max_nodes = 6
139+
multimaster.ignore_tables_without_pk = false
140+
multimaster.queue_size = 4194304
112141
log_line_prefix = '%t: '
113142
));
114143

@@ -128,6 +157,7 @@ sub start
128157
foreach my $node (@$nodes)
129158
{
130159
$node->start();
160+
note( "Starting node with connstr 'dbname=postgres port=@{[ $node->port() ]} host=@{[ $node->host() ]}'");
131161
}
132162
}
133163

@@ -137,7 +167,7 @@ sub stopnode
137167
return 1 unless defined $node->{_pid};
138168
$mode = 'fast' unless defined $mode;
139169
my $name = $node->name;
140-
diag("stopping $name ${mode}ly");
170+
note("stopping $name ${mode}ly");
141171

142172
if ($mode eq 'kill') {
143173
killtree($node->{_pid});
@@ -147,13 +177,13 @@ sub stopnode
147177
my $pgdata = $node->data_dir;
148178
my $ret = TestLib::system_log('pg_ctl', '-D', $pgdata, '-m', 'fast', 'stop');
149179
my $pidfile = $node->data_dir . "/postmaster.pid";
150-
diag("unlink $pidfile");
180+
note("unlink $pidfile");
151181
unlink $pidfile;
152182
$node->{_pid} = undef;
153183
$node->_update_pid;
154184

155185
if ($ret != 0) {
156-
diag("$name failed to stop ${mode}ly");
186+
note("$name failed to stop ${mode}ly");
157187
return 0;
158188
}
159189

@@ -166,43 +196,22 @@ sub stopid
166196
return stopnode($self->{nodes}->[$idx]);
167197
}
168198

169-
sub killtree
199+
sub dumplogs
170200
{
171-
my $root = shift;
172-
diag("killtree $root\n");
173-
174-
my $t = new Proc::ProcessTable;
175-
176-
my %parent = ();
177-
#my %cmd = ();
178-
foreach my $p (@{$t->table}) {
179-
$parent{$p->pid} = $p->ppid;
180-
# $cmd{$p->pid} = $p->cmndline;
181-
}
201+
my ($self) = @_;
202+
my $nodes = $self->{nodes};
182203

183-
if (!defined $root) {
184-
return;
185-
}
186-
my @queue = ($root);
187-
my @killist = ();
188-
189-
while (scalar @queue) {
190-
my $victim = shift @queue;
191-
while (my ($pid, $ppid) = each %parent) {
192-
if ($ppid == $victim) {
193-
push @queue, $pid;
194-
}
195-
}
196-
diag("SIGSTOP to $victim");
197-
kill 'STOP', $victim;
198-
unshift @killist, $victim;
204+
note("Dumping logs:");
205+
foreach my $node (@$nodes) {
206+
note("##################################################################");
207+
note($node->{_logfile});
208+
note("##################################################################");
209+
my $filename = $node->{_logfile};
210+
open my $fh, '<', $filename or die "error opening $filename: $!";
211+
my $data = do { local $/; <$fh> };
212+
note($data);
213+
note("##################################################################\n\n");
199214
}
200-
201-
diag("SIGKILL to " . join(' ', @killist));
202-
kill 'KILL', @killist;
203-
#foreach my $victim (@killist) {
204-
# print("kill $victim " . $cmd{$victim} . "\n");
205-
#}
206215
}
207216

208217
sub stop
@@ -211,34 +220,32 @@ sub stop
211220
my $nodes = $self->{nodes};
212221
$mode = 'fast' unless defined $mode;
213222

214-
diag("Dumping logs:");
215-
foreach my $node (@$nodes) {
216-
diag("##################################################################");
217-
diag($node->{_logfile});
218-
diag("##################################################################");
219-
my $filename = $node->{_logfile};
220-
open my $fh, '<', $filename or die "error opening $filename: $!";
221-
my $data = do { local $/; <$fh> };
222-
diag($data);
223-
diag("##################################################################\n\n");
224-
}
225-
226223
my $ok = 1;
227-
diag("stopping cluster ${mode}ly");
224+
note("stopping cluster ${mode}ly");
228225

229226
foreach my $node (@$nodes) {
230227
if (!stopnode($node, $mode)) {
231228
$ok = 0;
232-
if (!stopnode($node, 'kill')) {
233-
my $name = $node->name;
234-
BAIL_OUT("failed to kill $name");
235-
}
229+
# if (!stopnode($node, 'kill')) {
230+
# my $name = $node->name;
231+
# BAIL_OUT("failed to kill $name");
232+
# }
236233
}
237234
}
238235
sleep(2);
236+
237+
$self->dumplogs();
238+
239239
return $ok;
240240
}
241241

242+
sub bail_out_with_logs
243+
{
244+
my ($self, $msg) = @_;
245+
$self->dumplogs();
246+
BAIL_OUT($msg);
247+
}
248+
242249
sub teardown
243250
{
244251
my ($self) = @_;
@@ -269,10 +276,127 @@ sub poll
269276
return 1;
270277
}
271278
my $tries_left = $tries - $i - 1;
272-
diag("$poller poll for $pollee failed [$tries_left tries left]");
279+
note("$poller poll for $pollee failed [$tries_left tries left]");
273280
sleep($delay);
274281
}
275282
return 0;
276283
}
277284

285+
sub pgbench()
286+
{
287+
my ($self, $node, @args) = @_;
288+
my $pgbench_handle = $self->pgbench_async($node, @args);
289+
$self->pgbench_await($pgbench_handle);
290+
}
291+
292+
sub pgbench_async()
293+
{
294+
my ($self, $node, @args) = @_;
295+
296+
my ($in, $out, $err, $rc);
297+
$in = '';
298+
$out = '';
299+
300+
my @pgbench_command = (
301+
'pgbench',
302+
@args,
303+
-h => $self->{nodes}->[$node]->host(),
304+
-p => $self->{nodes}->[$node]->port(),
305+
'postgres',
306+
);
307+
note("running pgbench: " . join(" ", @pgbench_command));
308+
my $handle = IPC::Run::start(\@pgbench_command, $in, $out);
309+
return $handle;
310+
}
311+
312+
sub pgbench_await()
313+
{
314+
my ($self, $pgbench_handle) = @_;
315+
IPC::Run::finish($pgbench_handle) || BAIL_OUT("pgbench exited with $?");
316+
}
317+
318+
sub is_data_identic()
319+
{
320+
my ($self, @nodenums) = @_;
321+
my $checksum = '';
322+
323+
my $sql = "select md5('(' || string_agg(aid::text || ', ' || abalance::text , '),(') || ')')
324+
from (select * from pgbench_accounts order by aid) t;";
325+
326+
foreach my $i (@nodenums)
327+
{
328+
my $current_hash = '';
329+
$self->{nodes}->[$i]->psql('postgres', $sql, stdout => \$current_hash);
330+
if ($current_hash eq '')
331+
{
332+
note("got empty hash from node $i");
333+
return 0;
334+
}
335+
if ($checksum eq '')
336+
{
337+
$checksum = $current_hash;
338+
}
339+
elsif ($checksum ne $current_hash)
340+
{
341+
note("got different hashes: $checksum ang $current_hash");
342+
return 0;
343+
}
344+
}
345+
346+
note($checksum);
347+
return 1;
348+
}
349+
350+
sub add_node()
351+
{
352+
my ($self, %params) = @_;
353+
354+
my $pgport;
355+
my $arbiter_port;
356+
my $connstrs;
357+
my $node_id;
358+
359+
if (defined $params{node_id})
360+
{
361+
$node_id = $params{node_id};
362+
$pgport = $params{port};
363+
$arbiter_port = $params{arbiter_port};
364+
$connstrs = $self->all_connstrs();
365+
}
366+
else
367+
{
368+
$node_id = scalar(@{$self->{nodes}}) + 1;
369+
$pgport = (allocate_ports('127.0.0.1', 1))[0];
370+
$arbiter_port = (allocate_ports('127.0.0.1', 1))[0];
371+
$connstrs = $self->all_connstrs() . ", dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port";
372+
}
373+
374+
my $node = PostgresNode->get_new_node("node${node_id}x");
375+
376+
$self->{nodes}->[0]->backup("backup_for_$node_id");
377+
# do init from backup before setting host, since init_from_backup() checks
378+
# it default value
379+
$node->init_from_backup($self->{nodes}->[0], "backup_for_$node_id");
380+
381+
$node->{_host} = '127.0.0.1';
382+
$node->{_port} = $pgport;
383+
$node->{port} = $pgport;
384+
$node->{host} = '127.0.0.1';
385+
$node->{arbiter_port} = $arbiter_port;
386+
$node->{mmconnstr} = "${ \$node->connstr('postgres') } arbiter_port=${ \$node->{arbiter_port} }";
387+
$node->append_conf("postgresql.conf", qq(
388+
multimaster.arbiter_port = $arbiter_port
389+
multimaster.conn_strings = '$connstrs'
390+
multimaster.node_id = $node_id
391+
port = $pgport
392+
));
393+
$node->append_conf("pg_hba.conf", qq(
394+
local replication all trust
395+
host replication all 127.0.0.1/32 trust
396+
host replication all ::1/128 trust
397+
));
398+
399+
push(@{$self->{nodes}}, $node);
400+
}
401+
278402
1;

0 commit comments

Comments
 (0)