
Commit 224cba5

[PGPRO-2648] Rework syncpoints to include all origins in receiver reports.
Previously, receiver reports reflected only the apply progress of the sender's own changes (the last syncpoint). Such a report immediately advances the slot on the sender, so changes of other nodes were kept by special separate 'recovery' slots. However, this is incorrect: once a slot has advanced, it can never stream from an earlier position, even if the WAL is still there.

To this end, the machinery for informing node C how node B applies node A's changes was overhauled. Previously, A wrote the min across the confirmed_flush values reported to it into each syncpoint ('trim_lsn'), which e.g. defined how many of A's changes C still needs to keep for B. B still had no idea how A's LSNs map into C's, so at recovery start it queried C directly to perform the mapping. The new implementation makes the syncpoints table global and BDR-like: its changes are broadcast to all nodes without 3PC, which allows each node to do arbitrary LSN mappings; receivers learn which value they should report via a (quite involved) SELECT over this table.

Each node still keeps n-1 additional physical slots. They ensure the applier has enough WAL to filter out already applied xacts. In fact, these slots are redundant with 3+ nodes. Take, for example, C's changes on A: A always keeps changes since some syncpoint of C in order to recover B, and so inevitably it keeps them since the *last* syncpoint of C on A.

A major piece of infrastructure for this BDR-like table mode, which might fire in other places, is that plain COMMITs are now streamed. So e.g. if an xact modified only local tables, an empty begin-commit pair will be streamed (as in vanilla LR where the table is not published), while previously nothing was sent. In particular, this forced moving the 'init_done' flag into a separate local table and making the mtm.config table local. Also, a special 'B' logical/protocol message was invented to tell the receiver 'this is a plain COMMIT, you have no right to fail to apply the xact and go on as with usual PREPAREs'.

An important thing left undone is a nonblocking syncpoint implementation. Here a syncpoint is still a hard barrier: all origin_lsn <= sp lie to the left of the syncpoint and all origin_lsn > sp lie to the right of it, so on a syncpoint the receiver stops until all parallel workers finish their job. I've put some stubs into multimaster--1.0.sql for a nonblocking implementation, but due to time constraints it is not finished.

Another unpleasant thing is that any non-tx logical message, including syncpoints, is not forwarded further if it was received from a non-origin. With more kludges this could be done, but it is better to fix this by revealing origin info for logical messages in the decoding API. This doesn't matter for 3 nodes, but is not good for 4+.
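The hard-barrier property described above can be illustrated with a small standalone check. This is only a sketch under assumptions: the apply log format, the integer LSNs, and the name `barrier_violations` are hypothetical and are not taken from the multimaster sources. It only restates the invariant "all origin_lsn <= sp lie to the left of the syncpoint, all origin_lsn > sp lie to the right".

```python
from typing import List, Tuple

# Hypothetical apply-log entry: (kind, origin_lsn), kind is "xact" or "syncpoint".
# The list order is the order in which the receiver made the entries durable.
Entry = Tuple[str, int]


def barrier_violations(log: List[Entry]) -> List[Tuple[int, int]]:
    """Return (syncpoint_index, xact_index) pairs that break the hard barrier.

    For a syncpoint with LSN sp, every xact with origin_lsn <= sp must appear
    before it in the log and every xact with origin_lsn > sp must appear after.
    """
    violations = []
    for i, (kind, sp) in enumerate(log):
        if kind != "syncpoint":
            continue
        for j, (other_kind, lsn) in enumerate(log):
            if other_kind != "xact":
                continue
            on_left_ok = j < i and lsn <= sp
            on_right_ok = j > i and lsn > sp
            if not (on_left_ok or on_right_ok):
                violations.append((i, j))
    return violations


# The receiver waited for all parallel workers before applying syncpoint 100;
# workers may still finish xacts 90 and 80 out of order among themselves.
blocking = [("xact", 90), ("xact", 80), ("syncpoint", 100), ("xact", 110)]

# A parallel worker finished xact 90 only after the syncpoint was applied.
racy = [("xact", 80), ("syncpoint", 100), ("xact", 90), ("xact", 110)]

print(barrier_violations(blocking))
print(barrier_violations(racy))
```

In the `racy` log the xact with origin_lsn 90 lands to the right of syncpoint 100, which is exactly what stopping the receiver until all parallel workers finish is meant to prevent; a nonblocking design would have to tolerate or account for such entries in some other way.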
1 parent 7b5f973 commit 224cba5

14 files changed, +828 -584 lines changed

Cluster.pm

+1 -1
@@ -232,7 +232,7 @@ sub backup_and_init()
 
 print "# Taking pg_basebackup $backup_name from node \"$name\"\n";
 my $dumpres = command_output(['pg_basebackup', '-D', $backup_path, '-p', $port,
-'-h', '127.0.0.1', '--no-sync', '-v', '-S', "mtm_recovery_slot_$to_mmid"]);
+'-h', '127.0.0.1', '--no-sync', '-v', '-S', "mtm_filter_slot_$to_mmid"]);
 
 print "# Backup finished\n";

expected/regression.diff

+61 -30
@@ -1,23 +1,27 @@
 *** contrib/mmts/../../src/test/regress/expected/opr_sanity.out CENSORED
 --- contrib/mmts/../../src/test/regress/results/opr_sanity.out CENSORED
 ***************
-*** 871,877 ****
-xml | text | 0 | a
-xml | character varying | 0 | a
-xml | character | 0 | a
-! (9 rows)
+*** 771,779 ****
+SELECT p1.oid, p1.proname
+FROM pg_proc AS p1
+WHERE provolatile = 'i' AND proparallel = 'u';
+! oid | proname
+! -----+---------
+! (0 rows)
 
--- **************** pg_conversion ****************
--- Look for illegal values in pg_conversion fields.
---- 871,878 ----
-xml | text | 0 | a
-xml | character varying | 0 | a
-xml | character | 0 | a
-! pg_lsn | bigint | 0 | e
-! (10 rows)
+-- **************** pg_cast ****************
+-- Catch bogus values in pg_cast columns (other than cases detected by
+--- 771,780 ----
+SELECT p1.oid, p1.proname
+FROM pg_proc AS p1
+WHERE provolatile = 'i' AND proparallel = 'u';
+! oid | proname
+! -------+---------
+! 16472 | min
+! (1 row)
 
--- **************** pg_conversion ****************
--- Look for illegal values in pg_conversion fields.
+-- **************** pg_cast ****************
+-- Catch bogus values in pg_cast columns (other than cases detected by
 
 ======================================================================

@@ -346,6 +350,16 @@
 lseg_tbl|f
 main_table|f
 ***************
+*** 85,90 ****
+--- 90,96 ----
+mlparted_defd|f
+money_data|f
+mytable|t
++ nodes_init_done|t
+num_data|f
+num_exp_add|t
+num_exp_div|t
+***************
 *** 141,147 ****
 pg_pltemplate|t
 pg_policy|t
@@ -354,7 +368,7 @@
 pg_publication_rel|t
 pg_range|t
 pg_replication_origin|t
---- 146,151 ----
+--- 147,152 ----
 ***************
 *** 153,159 ****
 pg_shseclabel|t
@@ -364,10 +378,10 @@
 pg_subscription_rel|t
 pg_tablespace|t
 pg_transform|t
---- 157,162 ----
+--- 158,163 ----
 ***************
 *** 187,192 ****
---- 190,196 ----
+--- 191,197 ----
 sql_sizing_profiles|f
 stud_emp|f
 student|f
@@ -770,15 +784,32 @@
 interpt_pp(ih.thepath, r.thepath) AS exit
 ***************
 *** 1297,1302 ****
---- 1306,1316 ----
+--- 1306,1333 ----
 FROM view_base_table
 GROUP BY view_base_table.key
 HAVING (length((view_base_table.data)::text) > 0);
-+ latest_syncpoints| SELECT DISTINCT ON (syncpoints.node_id) syncpoints.node_id,
-+ syncpoints.origin_lsn,
-+ syncpoints.local_lsn
-+ FROM mtm.syncpoints
-+ ORDER BY syncpoints.node_id, syncpoints.origin_lsn DESC;
++ latest_syncpoints| SELECT node_id_pairs.origin_node_id,
++ COALESCE(ls.origin_lsn, '0/0'::pg_lsn) AS origin_lsn,
++ node_id_pairs.receiver_node_id,
++ COALESCE(mtm.translate_syncpoint(node_id_pairs.origin_node_id, ls.origin_lsn, node_id_pairs.receiver_node_id),
++ CASE
++ WHEN (node_id_pairs.receiver_node_id = mtm.my_node_id()) THEN ( SELECT pg_replication_slots.restart_lsn
++ FROM pg_replication_slots
++ WHERE ((pg_replication_slots.slot_name)::text = format('mtm_filter_slot_%s'::text, node_id_pairs.origin_node_id)))
++ ELSE NULL::pg_lsn
++ END) AS fill_filter_since
++ FROM (( SELECT n1.id AS origin_node_id,
++ n2.id AS receiver_node_id
++ FROM mtm.cluster_nodes n1,
++ mtm.cluster_nodes n2
++ WHERE (n1.id <> n2.id)) node_id_pairs
++ LEFT JOIN ( SELECT DISTINCT ON (syncpoints.origin_node_id, syncpoints.receiver_node_id) syncpoints.origin_node_id,
++ syncpoints.origin_lsn,
++ syncpoints.receiver_node_id,
++ syncpoints.receiver_lsn
++ FROM mtm.syncpoints
++ ORDER BY syncpoints.origin_node_id, syncpoints.receiver_node_id, syncpoints.origin_lsn DESC) ls ON (((node_id_pairs.origin_node_id = ls.origin_node_id) AND (node_id_pairs.receiver_node_id = ls.receiver_node_id))))
++ ORDER BY node_id_pairs.origin_node_id, node_id_pairs.receiver_node_id;
 mvtest_tv| SELECT mvtest_t.type,
 sum(mvtest_t.amt) AS totamt
 FROM mvtest_t
@@ -803,7 +834,7 @@
 (pg_class c
 JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
 WHERE (c.oid IN ( SELECT pg_get_publication_tables.relid
---- 1454,1482 ----
+--- 1471,1499 ----
 p.parameter_types,
 p.from_sql
 FROM pg_prepared_statement() p(name, statement, prepare_time, parameter_types, from_sql);
@@ -842,7 +873,7 @@
 WHERE (l.objsubid = 0)
 UNION ALL
 SELECT l.objoid,
---- 1666,1672 ----
+--- 1683,1689 ----
 l.provider,
 l.label
 FROM (pg_seclabel l
@@ -859,7 +890,7 @@
 UNION ALL
 SELECT l.objoid,
 l.classoid,
---- 1678,1684 ----
+--- 1695,1701 ----
 l.provider,
 l.label
 FROM (pg_shseclabel l
@@ -876,7 +907,7 @@
 LEFT JOIN pg_stat_get_subscription(NULL::oid) st(subid, relid, pid, received_lsn, last_msg_send_time, last_msg_receipt_time, latest_end_lsn, latest_end_time) ON ((st.subid = su.oid)));
 pg_stat_sys_indexes| SELECT pg_stat_all_indexes.relid,
 pg_stat_all_indexes.indexrelid,
---- 1931,1937 ----
+--- 1948,1954 ----
 st.last_msg_receipt_time,
 st.latest_end_lsn,
 st.latest_end_time
@@ -886,7 +917,7 @@
 pg_stat_all_indexes.indexrelid,
 ***************
 *** 2217,2222 ****
---- 2241,2257 ----
+--- 2258,2274 ----
 JOIN pg_attribute a ON (((c.oid = a.attrelid) AND (a.attnum = s.staattnum))))
 LEFT JOIN pg_namespace n ON ((n.oid = c.relnamespace)))
 WHERE ((NOT a.attisdropped) AND has_column_privilege(c.oid, a.attnum, 'select'::text) AND ((c.relrowsecurity = false) OR (NOT row_security_active(c.oid))));
@@ -906,7 +937,7 @@
 pg_get_userbyid(c.relowner) AS tableowner,
 ***************
 *** 2360,2365 ****
---- 2395,2408 ----
+--- 2412,2425 ----
 FROM pg_control_snapshot() pg_control_snapshot(oldest_snapshot, recent_snapshot, active_snapshot)), ( SELECT pg_control_snapshot.recent_snapshot
 FROM pg_control_snapshot() pg_control_snapshot(oldest_snapshot, recent_snapshot, active_snapshot))) snap_id(snap_id)
 WHERE pg_snapshot_exists(snap_id.snap_id);
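
The reworked latest_syncpoints view shown in the regression.diff hunk above picks, for every ordered pair of distinct nodes, the syncpoint with the greatest origin_lsn and falls back to '0/0' when the pair has none. The sketch below mimics only that part (the DISTINCT ON, the LEFT JOIN and the first COALESCE) in plain Python. The `Syncpoint` tuple, the helper names and the sample rows are hypothetical, and the fill_filter_since column is left out because the behaviour of mtm.translate_syncpoint is not visible in this diff.

```python
from typing import Dict, Iterable, List, NamedTuple, Tuple


class Syncpoint(NamedTuple):
    # Mirrors the mtm.syncpoints columns used by the view.
    origin_node_id: int
    origin_lsn: str       # pg_lsn text form, e.g. "0/16B3748"
    receiver_node_id: int
    receiver_lsn: str


def lsn_to_int(lsn: str) -> int:
    """Convert pg_lsn text 'hi/lo' (two hexadecimal halves) to an integer."""
    hi, lo = lsn.split("/")
    return (int(hi, 16) << 32) | int(lo, 16)


def latest_syncpoints(node_ids: Iterable[int],
                      rows: List[Syncpoint]) -> Dict[Tuple[int, int], str]:
    """Map (origin, receiver) to the greatest known origin_lsn, or '0/0'."""
    # DISTINCT ON (origin, receiver) ... ORDER BY origin_lsn DESC
    latest: Dict[Tuple[int, int], Syncpoint] = {}
    for row in rows:
        key = (row.origin_node_id, row.receiver_node_id)
        best = latest.get(key)
        if best is None or lsn_to_int(row.origin_lsn) > lsn_to_int(best.origin_lsn):
            latest[key] = row

    # Pairs of distinct nodes LEFT JOINed with the latest rows,
    # with COALESCE(origin_lsn, '0/0') for pairs that have no syncpoint.
    ids = sorted(node_ids)
    result: Dict[Tuple[int, int], str] = {}
    for origin in ids:
        for receiver in ids:
            if origin == receiver:
                continue
            row = latest.get((origin, receiver))
            result[(origin, receiver)] = row.origin_lsn if row else "0/0"
    return result


# Made-up sample data for a 3-node cluster.
rows = [
    Syncpoint(1, "0/A0", 2, "0/500"),
    Syncpoint(1, "0/100", 2, "0/900"),
    Syncpoint(3, "1/0", 1, "0/20"),
]
view = latest_syncpoints([1, 2, 3], rows)
for pair, lsn in view.items():
    print(pair, lsn)
```

LSNs are compared numerically rather than as strings: "0/A0" sorts after "0/100" as text, but 0x100 is the larger position, which is why the sketch converts them first.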
