10
10
use PostgreSQL::Test::Utils;
11
11
use Test::More;
12
12
13
- my ($stdin , $stdout , $stderr , $cascading_stdout , $cascading_stderr , $ret , $handle , $slot );
13
+ my ($stdin , $stdout , $stderr ,
14
+ $cascading_stdout , $cascading_stderr , $subscriber_stdin ,
15
+ $subscriber_stdout , $subscriber_stderr , $ret ,
16
+ $handle , $slot );
14
17
15
18
my $node_primary = PostgreSQL::Test::Cluster-> new(' primary' );
16
19
my $node_standby = PostgreSQL::Test::Cluster-> new(' standby' );
17
20
my $node_cascading_standby = PostgreSQL::Test::Cluster-> new(' cascading_standby' );
21
+ my $node_subscriber = PostgreSQL::Test::Cluster-> new(' subscriber' );
18
22
my $default_timeout = $PostgreSQL::Test::Utils::timeout_default ;
23
+ my $psql_timeout = IPC::Run::timer($default_timeout );
19
24
my $res ;
20
25
21
26
# Name for the physical slot on primary
@@ -267,7 +272,8 @@ sub check_for_invalidation
267
272
has_streaming => 1,
268
273
has_restoring => 1);
269
274
$node_standby -> append_conf(' postgresql.conf' ,
270
- qq[ primary_slot_name = '$primary_slotname '] );
275
+ qq[ primary_slot_name = '$primary_slotname '
276
+ max_replication_slots = 5] );
271
277
$node_standby -> start;
272
278
$node_primary -> wait_for_replay_catchup($node_standby );
273
279
$node_standby -> safe_psql(' testdb' , qq[ SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname ');] );
@@ -285,6 +291,26 @@ sub check_for_invalidation
285
291
$node_cascading_standby -> start;
286
292
$node_standby -> wait_for_replay_catchup($node_cascading_standby , $node_primary );
287
293
294
+ # ######################
295
+ # Initialize subscriber node
296
+ # ######################
297
+ $node_subscriber -> init(allows_streaming => ' logical' );
298
+ $node_subscriber -> start;
299
+
300
+ my %psql_subscriber = (
301
+ ' subscriber_stdin' => ' ' ,
302
+ ' subscriber_stdout' => ' ' ,
303
+ ' subscriber_stderr' => ' ' );
304
+ $psql_subscriber {run } = IPC::Run::start(
305
+ [ ' psql' , ' -XA' , ' -f' , ' -' , ' -d' , $node_subscriber -> connstr(' postgres' ) ],
306
+ ' <' ,
307
+ \$psql_subscriber {subscriber_stdin },
308
+ ' >' ,
309
+ \$psql_subscriber {subscriber_stdout },
310
+ ' 2>' ,
311
+ \$psql_subscriber {subscriber_stderr },
312
+ $psql_timeout );
313
+
288
314
# #################################################
289
315
# Test that logical decoding on the standby
290
316
# behaves correctly.
@@ -365,6 +391,67 @@ sub check_for_invalidation
365
391
3,
366
392
' replaying logical slot from another database fails' );
367
393
394
+ # #################################################
395
+ # Test that we can subscribe on the standby with the publication
396
+ # created on the primary.
397
+ # #################################################
398
+
399
+ # Create a table on the primary
400
+ $node_primary -> safe_psql(' postgres' ,
401
+ " CREATE TABLE tab_rep (a int primary key)" );
402
+
403
+ # Create a table (same structure) on the subscriber node
404
+ $node_subscriber -> safe_psql(' postgres' ,
405
+ " CREATE TABLE tab_rep (a int primary key)" );
406
+
407
+ # Create a publication on the primary
408
+ $node_primary -> safe_psql(' postgres' ,
409
+ " CREATE PUBLICATION tap_pub for table tab_rep" );
410
+
411
+ $node_primary -> wait_for_replay_catchup($node_standby );
412
+
413
+ # Subscribe on the standby
414
+ my $standby_connstr = $node_standby -> connstr . ' dbname=postgres' ;
415
+
416
+ # Not using safe_psql() here as it would wait for activity on the primary
417
+ # and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
418
+ # while waiting.
419
+ # psql_subscriber() allows to not wait synchronously.
420
+ $psql_subscriber {subscriber_stdin } .=
421
+ qq[ CREATE SUBSCRIPTION tap_sub
422
+ CONNECTION '$standby_connstr '
423
+ PUBLICATION tap_pub
424
+ WITH (copy_data = off);] ;
425
+ $psql_subscriber {subscriber_stdin } .= " \n " ;
426
+
427
+ $psql_subscriber {run }-> pump_nb();
428
+
429
+ # Speed up the subscription creation
430
+ $node_primary -> safe_psql(' postgres' , " SELECT pg_log_standby_snapshot()" );
431
+
432
+ # Explicitly shut down psql instance gracefully - to avoid hangs
433
+ # or worse on windows
434
+ $psql_subscriber {subscriber_stdin } .= " \\ q\n " ;
435
+ $psql_subscriber {run }-> finish;
436
+
437
+ $node_subscriber -> wait_for_subscription_sync($node_standby , ' tap_sub' );
438
+
439
+ # Insert some rows on the primary
440
+ $node_primary -> safe_psql(' postgres' ,
441
+ qq[ INSERT INTO tab_rep select generate_series(1,10);] );
442
+
443
+ $node_primary -> wait_for_replay_catchup($node_standby );
444
+ $node_standby -> wait_for_catchup(' tap_sub' );
445
+
446
+ # Check that the subscriber can see the rows inserted in the primary
447
+ $result =
448
+ $node_subscriber -> safe_psql(' postgres' , " SELECT count(*) FROM tab_rep" );
449
+ is($result , qq( 10) , ' check replicated inserts after subscription on standby' );
450
+
451
+ # We do not need the subscription and the subscriber anymore
452
+ $node_subscriber -> safe_psql(' postgres' , " DROP SUBSCRIPTION tap_sub" );
453
+ $node_subscriber -> stop;
454
+
368
455
# #################################################
369
456
# Recovery conflict: Invalidate conflicting slots, including in-use slots
370
457
# Scenario 1: hot_standby_feedback off and vacuum FULL
0 commit comments