@@ -3,12 +3,35 @@ package Cluster;
3
3
use strict;
4
4
use warnings;
5
5
6
- use Proc::ProcessTable;
7
6
use PostgresNode;
8
7
use TestLib;
9
8
use Test::More;
10
9
use Cwd;
11
10
11
+ use Socket;
12
+
13
+ use IPC::Run;
14
+
15
+ sub check_port
16
+ {
17
+ my ($host , $port ) = @_ ;
18
+ my $iaddr = inet_aton($host );
19
+ my $paddr = sockaddr_in($port , $iaddr );
20
+ my $proto = getprotobyname (" tcp" );
21
+ my $available = 0;
22
+
23
+ socket (SOCK, PF_INET, SOCK_STREAM, $proto )
24
+ or die " socket failed: $! " ;
25
+
26
+ if (bind (SOCK, $paddr ) && listen (SOCK, SOMAXCONN))
27
+ {
28
+ $available = 1;
29
+ }
30
+
31
+ close (SOCK);
32
+ return $available ;
33
+ }
34
+
12
35
my %allocated_ports = ();
13
36
sub allocate_ports
14
37
{
@@ -19,8 +42,7 @@ sub allocate_ports
19
42
{
20
43
my $port = int (rand () * 16384) + 49152;
21
44
next if $allocated_ports {$port };
22
- diag(" checking for port $port \n " );
23
- if (!TestLib::run_log([' pg_isready' , ' -h' , $host , ' -p' , $port ]))
45
+ if (check_port($host , $port ))
24
46
{
25
47
$allocated_ports {$port } = 1;
26
48
push (@allocated_now , $port );
@@ -44,6 +66,7 @@ sub new
44
66
my $node = new PostgresNode(" node$i " , $host , $pgport );
45
67
$node -> {id } = $i ;
46
68
$node -> {arbiter_port } = $arbiter_port ;
69
+ $node -> {mmconnstr } = " ${ \$ node->connstr('postgres') } arbiter_port=${ \$ node->{arbiter_port} }" ;
47
70
push (@$nodes , $node );
48
71
}
49
72
@@ -67,48 +90,54 @@ sub init
67
90
}
68
91
}
69
92
93
+ sub all_connstrs
94
+ {
95
+ my ($self ) = @_ ;
96
+ my $nodes = $self -> {nodes };
97
+ return join (' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} }" } @$nodes );
98
+ }
99
+
100
+
70
101
sub configure
71
102
{
72
103
my ($self ) = @_ ;
73
104
my $nodes = $self -> {nodes };
74
- my $nnodes = scalar @{ $nodes };
75
105
76
- my $connstr = join ( ' , ' , map { " ${ \$ _->connstr('postgres') } arbiter_port=${ \$ _->{arbiter_port} } " } @$nodes );
106
+ my $connstr = $self -> all_connstrs( );
77
107
78
108
foreach my $node (@$nodes )
79
109
{
80
110
my $id = $node -> {id };
81
111
my $host = $node -> host;
82
112
my $pgport = $node -> port;
83
113
my $arbiter_port = $node -> {arbiter_port };
114
+ my $unix_sock_dir = $ENV {PGHOST };
84
115
85
116
$node -> append_conf(" postgresql.conf" , qq(
86
117
log_statement = none
87
118
listen_addresses = '$host '
88
- unix_socket_directories = ''
119
+ unix_socket_directories = '$unix_sock_dir '
89
120
port = $pgport
90
- max_prepared_transactions = 200
91
- max_connections = 200
121
+ max_prepared_transactions = 10
122
+ max_connections = 10
92
123
max_worker_processes = 100
93
124
wal_level = logical
94
- fsync = off
95
- max_wal_senders = 10
125
+ max_wal_senders = 6
96
126
wal_sender_timeout = 0
97
127
default_transaction_isolation = 'repeatable read'
98
- max_replication_slots = 10
128
+ max_replication_slots = 6
99
129
shared_preload_libraries = 'multimaster'
130
+ shared_buffers = 16MB
100
131
101
132
multimaster.arbiter_port = $arbiter_port
102
- multimaster.workers = 10
103
- multimaster.queue_size = 10485760 # 10mb
133
+ multimaster.workers = 1
104
134
multimaster.node_id = $id
105
135
multimaster.conn_strings = '$connstr '
106
- multimaster.heartbeat_recv_timeout = 1000
136
+ multimaster.heartbeat_recv_timeout = 1050
107
137
multimaster.heartbeat_send_timeout = 250
108
- multimaster.max_nodes = $nnodes
109
- multimaster.ignore_tables_without_pk = true
110
- multimaster.twopc_min_timeout = 50000
111
- multimaster.min_2pc_timeout = 50000
138
+ multimaster.max_nodes = 6
139
+ multimaster.ignore_tables_without_pk = false
140
+ multimaster.queue_size = 4194304
112
141
log_line_prefix = '%t : '
113
142
) );
114
143
@@ -128,6 +157,7 @@ sub start
128
157
foreach my $node (@$nodes )
129
158
{
130
159
$node -> start();
160
+ note( " Starting node with connstr 'dbname=postgres port=@{[ $node ->port() ]} host=@{[ $node ->host() ]}'" );
131
161
}
132
162
}
133
163
@@ -137,7 +167,7 @@ sub stopnode
137
167
return 1 unless defined $node -> {_pid };
138
168
$mode = ' fast' unless defined $mode ;
139
169
my $name = $node -> name;
140
- diag (" stopping $name ${mode} ly" );
170
+ note (" stopping $name ${mode} ly" );
141
171
142
172
if ($mode eq ' kill' ) {
143
173
killtree($node -> {_pid });
@@ -147,13 +177,13 @@ sub stopnode
147
177
my $pgdata = $node -> data_dir;
148
178
my $ret = TestLib::system_log(' pg_ctl' , ' -D' , $pgdata , ' -m' , ' fast' , ' stop' );
149
179
my $pidfile = $node -> data_dir . " /postmaster.pid" ;
150
- diag (" unlink $pidfile " );
180
+ note (" unlink $pidfile " );
151
181
unlink $pidfile ;
152
182
$node -> {_pid } = undef ;
153
183
$node -> _update_pid;
154
184
155
185
if ($ret != 0) {
156
- diag (" $name failed to stop ${mode} ly" );
186
+ note (" $name failed to stop ${mode} ly" );
157
187
return 0;
158
188
}
159
189
@@ -166,43 +196,22 @@ sub stopid
166
196
return stopnode($self -> {nodes }-> [$idx ]);
167
197
}
168
198
169
- sub killtree
199
+ sub dumplogs
170
200
{
171
- my $root = shift ;
172
- diag(" killtree $root \n " );
173
-
174
- my $t = new Proc::ProcessTable;
175
-
176
- my %parent = ();
177
- # my %cmd = ();
178
- foreach my $p (@{$t -> table}) {
179
- $parent {$p -> pid} = $p -> ppid;
180
- # $cmd{$p->pid} = $p->cmndline;
181
- }
201
+ my ($self ) = @_ ;
202
+ my $nodes = $self -> {nodes };
182
203
183
- if (!defined $root ) {
184
- return ;
185
- }
186
- my @queue = ($root );
187
- my @killist = ();
188
-
189
- while (scalar @queue ) {
190
- my $victim = shift @queue ;
191
- while (my ($pid , $ppid ) = each %parent ) {
192
- if ($ppid == $victim ) {
193
- push @queue , $pid ;
194
- }
195
- }
196
- diag(" SIGSTOP to $victim " );
197
- kill ' STOP' , $victim ;
198
- unshift @killist , $victim ;
204
+ note(" Dumping logs:" );
205
+ foreach my $node (@$nodes ) {
206
+ note(" ##################################################################" );
207
+ note($node -> {_logfile });
208
+ note(" ##################################################################" );
209
+ my $filename = $node -> {_logfile };
210
+ open my $fh , ' <' , $filename or die " error opening $filename : $! " ;
211
+ my $data = do { local $/ ; <$fh > };
212
+ note($data );
213
+ note(" ##################################################################\n\n " );
199
214
}
200
-
201
- diag(" SIGKILL to " . join (' ' , @killist ));
202
- kill ' KILL' , @killist ;
203
- # foreach my $victim (@killist) {
204
- # print("kill $victim " . $cmd{$victim} . "\n");
205
- # }
206
215
}
207
216
208
217
sub stop
@@ -211,34 +220,32 @@ sub stop
211
220
my $nodes = $self -> {nodes };
212
221
$mode = ' fast' unless defined $mode ;
213
222
214
- diag(" Dumping logs:" );
215
- foreach my $node (@$nodes ) {
216
- diag(" ##################################################################" );
217
- diag($node -> {_logfile });
218
- diag(" ##################################################################" );
219
- my $filename = $node -> {_logfile };
220
- open my $fh , ' <' , $filename or die " error opening $filename : $! " ;
221
- my $data = do { local $/ ; <$fh > };
222
- diag($data );
223
- diag(" ##################################################################\n\n " );
224
- }
225
-
226
223
my $ok = 1;
227
- diag (" stopping cluster ${mode} ly" );
224
+ note (" stopping cluster ${mode} ly" );
228
225
229
226
foreach my $node (@$nodes ) {
230
227
if (!stopnode($node , $mode )) {
231
228
$ok = 0;
232
- if (!stopnode($node , ' kill' )) {
233
- my $name = $node -> name;
234
- BAIL_OUT(" failed to kill $name " );
235
- }
229
+ # if (!stopnode($node, 'kill')) {
230
+ # my $name = $node->name;
231
+ # BAIL_OUT("failed to kill $name");
232
+ # }
236
233
}
237
234
}
238
235
sleep (2);
236
+
237
+ $self -> dumplogs();
238
+
239
239
return $ok ;
240
240
}
241
241
242
+ sub bail_out_with_logs
243
+ {
244
+ my ($self , $msg ) = @_ ;
245
+ $self -> dumplogs();
246
+ BAIL_OUT($msg );
247
+ }
248
+
242
249
sub teardown
243
250
{
244
251
my ($self ) = @_ ;
@@ -269,10 +276,127 @@ sub poll
269
276
return 1;
270
277
}
271
278
my $tries_left = $tries - $i - 1;
272
- diag (" $poller poll for $pollee failed [$tries_left tries left]" );
279
+ note (" $poller poll for $pollee failed [$tries_left tries left]" );
273
280
sleep ($delay );
274
281
}
275
282
return 0;
276
283
}
277
284
285
+ sub pgbench ()
286
+ {
287
+ my ($self , $node , @args ) = @_ ;
288
+ my $pgbench_handle = $self -> pgbench_async($node , @args );
289
+ $self -> pgbench_await($pgbench_handle );
290
+ }
291
+
292
+ sub pgbench_async ()
293
+ {
294
+ my ($self , $node , @args ) = @_ ;
295
+
296
+ my ($in , $out , $err , $rc );
297
+ $in = ' ' ;
298
+ $out = ' ' ;
299
+
300
+ my @pgbench_command = (
301
+ ' pgbench' ,
302
+ @args ,
303
+ -h => $self -> {nodes }-> [$node ]-> host(),
304
+ -p => $self -> {nodes }-> [$node ]-> port(),
305
+ ' postgres' ,
306
+ );
307
+ note(" running pgbench: " . join (" " , @pgbench_command ));
308
+ my $handle = IPC::Run::start(\@pgbench_command , $in , $out );
309
+ return $handle ;
310
+ }
311
+
312
+ sub pgbench_await ()
313
+ {
314
+ my ($self , $pgbench_handle ) = @_ ;
315
+ IPC::Run::finish($pgbench_handle ) || BAIL_OUT(" pgbench exited with $? " );
316
+ }
317
+
318
+ sub is_data_identic ()
319
+ {
320
+ my ($self , @nodenums ) = @_ ;
321
+ my $checksum = ' ' ;
322
+
323
+ my $sql = " select md5('(' || string_agg(aid::text || ', ' || abalance::text , '),(') || ')')
324
+ from (select * from pgbench_accounts order by aid) t;" ;
325
+
326
+ foreach my $i (@nodenums )
327
+ {
328
+ my $current_hash = ' ' ;
329
+ $self -> {nodes }-> [$i ]-> psql(' postgres' , $sql , stdout => \$current_hash );
330
+ if ($current_hash eq ' ' )
331
+ {
332
+ note(" got empty hash from node $i " );
333
+ return 0;
334
+ }
335
+ if ($checksum eq ' ' )
336
+ {
337
+ $checksum = $current_hash ;
338
+ }
339
+ elsif ($checksum ne $current_hash )
340
+ {
341
+ note(" got different hashes: $checksum ang $current_hash " );
342
+ return 0;
343
+ }
344
+ }
345
+
346
+ note($checksum );
347
+ return 1;
348
+ }
349
+
350
+ sub add_node ()
351
+ {
352
+ my ($self , %params ) = @_ ;
353
+
354
+ my $pgport ;
355
+ my $arbiter_port ;
356
+ my $connstrs ;
357
+ my $node_id ;
358
+
359
+ if (defined $params {node_id })
360
+ {
361
+ $node_id = $params {node_id };
362
+ $pgport = $params {port };
363
+ $arbiter_port = $params {arbiter_port };
364
+ $connstrs = $self -> all_connstrs();
365
+ }
366
+ else
367
+ {
368
+ $node_id = scalar (@{$self -> {nodes }}) + 1;
369
+ $pgport = (allocate_ports(' 127.0.0.1' , 1))[0];
370
+ $arbiter_port = (allocate_ports(' 127.0.0.1' , 1))[0];
371
+ $connstrs = $self -> all_connstrs() . " , dbname=postgres host=127.0.0.1 port=$pgport arbiter_port=$arbiter_port " ;
372
+ }
373
+
374
+ my $node = PostgresNode-> get_new_node(" node${node_id} x" );
375
+
376
+ $self -> {nodes }-> [0]-> backup(" backup_for_$node_id " );
377
+ # do init from backup before setting host, since init_from_backup() checks
378
+ # it default value
379
+ $node -> init_from_backup($self -> {nodes }-> [0], " backup_for_$node_id " );
380
+
381
+ $node -> {_host } = ' 127.0.0.1' ;
382
+ $node -> {_port } = $pgport ;
383
+ $node -> {port } = $pgport ;
384
+ $node -> {host } = ' 127.0.0.1' ;
385
+ $node -> {arbiter_port } = $arbiter_port ;
386
+ $node -> {mmconnstr } = " ${ \$ node->connstr('postgres') } arbiter_port=${ \$ node->{arbiter_port} }" ;
387
+ $node -> append_conf(" postgresql.conf" , qq(
388
+ multimaster.arbiter_port = $arbiter_port
389
+ multimaster.conn_strings = '$connstrs '
390
+ multimaster.node_id = $node_id
391
+ port = $pgport
392
+ ) );
393
+ $node -> append_conf(" pg_hba.conf" , qq(
394
+ local replication all trust
395
+ host replication all 127.0.0.1/32 trust
396
+ host replication all ::1/128 trust
397
+ ) );
398
+
399
+ push (@{$self -> {nodes }}, $node );
400
+ }
401
+
278
402
1;
0 commit comments