Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 325f2ec

Browse files
committed
Handle heap rewrites even better in logical decoding
Logical decoding should not publish anything about tables created as part of a heap rewrite during DDL. Those tables don't exist externally, so consumers of logical decoding cannot do anything sensible with that information. In ab28fea, we worked around this for built-in logical replication, but that was hack. This is a more proper fix: We mark such transient heaps using the new field pg_class.relwrite, linking to the original relation OID. By default, we ignore them in logical decoding before they get to the output plugin. Optionally, a plugin can register their interest in getting such changes, if they handle DDL specially, in which case the new field will help them get information about the actual table. Reviewed-by: Craig Ringer <craig@2ndquadrant.com>
1 parent be8a7a6 commit 325f2ec

File tree

20 files changed

+113
-103
lines changed

20 files changed

+113
-103
lines changed

contrib/test_decoding/expected/concurrent_ddl_dml.out

Lines changed: 28 additions & 54 deletions
Large diffs are not rendered by default.

contrib/test_decoding/expected/ddl.out

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,11 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
117117
(22 rows)
118118

119119
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
120-
-- throw away changes, they contain oids
120+
-- check that this doesn't produce any changes from the heap rewrite
121121
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
122122
count
123123
-------
124-
12
124+
0
125125
(1 row)
126126

127127
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -192,16 +192,20 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
192192
COMMIT
193193
(33 rows)
194194

195-
-- hide changes bc of oid visible in full table rewrites
196195
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
197196
INSERT INTO tr_unique(data) VALUES(10);
198197
ALTER TABLE tr_unique RENAME TO tr_pkey;
199198
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
200-
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
201-
count
202-
-------
203-
6
204-
(1 row)
199+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
200+
data
201+
-----------------------------------------------------------------------------
202+
BEGIN
203+
table public.tr_unique: INSERT: id2[integer]:1 data[integer]:10
204+
COMMIT
205+
BEGIN
206+
table public.tr_pkey: INSERT: id2[integer]:1 data[integer]:10 id[integer]:1
207+
COMMIT
208+
(6 rows)
205209

206210
INSERT INTO tr_pkey(data) VALUES(1);
207211
--show deletion with primary key

contrib/test_decoding/specs/concurrent_ddl_dml.spec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ step "s2_alter_tbl2_3rd_char" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE characte
5353
step "s2_alter_tbl2_3rd_text" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE text; }
5454
step "s2_alter_tbl2_3rd_int" { ALTER TABLE tbl2 ALTER COLUMN val3 TYPE int USING val3::integer; }
5555

56-
step "s2_get_changes" { SELECT regexp_replace(data, 'temp_\d+', 'temp') AS data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
56+
step "s2_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); }
5757

5858

5959

contrib/test_decoding/sql/ddl.sql

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ INSERT INTO replication_example(somedata, somenum) VALUES (4, 1);
6767
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
6868

6969
ALTER TABLE replication_example ALTER COLUMN somenum TYPE int4 USING (somenum::int4);
70-
-- throw away changes, they contain oids
70+
-- check that this doesn't produce any changes from the heap rewrite
7171
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
7272

7373
INSERT INTO replication_example(somedata, somenum) VALUES (5, 1);
@@ -93,12 +93,11 @@ COMMIT;
9393
/* display results */
9494
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
9595

96-
-- hide changes bc of oid visible in full table rewrites
9796
CREATE TABLE tr_unique(id2 serial unique NOT NULL, data int);
9897
INSERT INTO tr_unique(data) VALUES(10);
9998
ALTER TABLE tr_unique RENAME TO tr_pkey;
10099
ALTER TABLE tr_pkey ADD COLUMN id serial primary key;
101-
SELECT count(data) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
100+
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'include-rewrites', '1');
102101

103102
INSERT INTO tr_pkey(data) VALUES(1);
104103
--show deletion with primary key

contrib/test_decoding/test_decoding.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
101101
ctx->output_plugin_private = data;
102102

103103
opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT;
104+
opt->receive_rewrites = false;
104105

105106
foreach(option, ctx->output_plugin_options)
106107
{
@@ -166,6 +167,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
166167
errmsg("could not parse value \"%s\" for parameter \"%s\"",
167168
strVal(elem->arg), elem->defname)));
168169
}
170+
else if (strcmp(elem->defname, "include-rewrites") == 0)
171+
{
172+
173+
if (elem->arg == NULL)
174+
continue;
175+
else if (!parse_bool(strVal(elem->arg), &opt->receive_rewrites))
176+
ereport(ERROR,
177+
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
178+
errmsg("could not parse value \"%s\" for parameter \"%s\"",
179+
strVal(elem->arg), elem->defname)));
180+
}
169181
else
170182
{
171183
ereport(ERROR,
@@ -412,6 +424,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
412424
quote_qualified_identifier(
413425
get_namespace_name(
414426
get_rel_namespace(RelationGetRelid(relation))),
427+
class_form->relrewrite ?
428+
get_rel_name(class_form->relrewrite) :
415429
NameStr(class_form->relname)));
416430
appendStringInfoChar(ctx->out, ':');
417431

doc/src/sgml/catalogs.sgml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1923,6 +1923,18 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
19231923
<entry>True if table is a partition</entry>
19241924
</row>
19251925

1926+
<row>
1927+
<entry><structfield>relrewrite</structfield></entry>
1928+
<entry><type>oid</type></entry>
1929+
<entry><literal><link linkend="catalog-pg-class"><structname>pg_class</structname></link>.oid</literal></entry>
1930+
<entry>
1931+
For new relations being written during a DDL operation that requires a
1932+
table rewrite, this contains the OID of the original relation;
1933+
otherwise 0. That state is only visible internally; this field should
1934+
never contain anything other than 0 for a user-visible relation.
1935+
</entry>
1936+
</row>
1937+
19261938
<row>
19271939
<entry><structfield>relfrozenxid</structfield></entry>
19281940
<entry><type>xid</type></entry>

doc/src/sgml/logicaldecoding.sgml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,12 +486,17 @@ typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
486486
typedef struct OutputPluginOptions
487487
{
488488
OutputPluginOutputType output_type;
489+
bool receive_rewrites;
489490
} OutputPluginOptions;
490491
</programlisting>
491492
<literal>output_type</literal> has to either be set to
492493
<literal>OUTPUT_PLUGIN_TEXTUAL_OUTPUT</literal>
493494
or <literal>OUTPUT_PLUGIN_BINARY_OUTPUT</literal>. See also
494495
<xref linkend="logicaldecoding-output-mode"/>.
496+
If <literal>receive_rewrites</literal> is true, the output plugin will
497+
also be called for changes made by heap rewrites during certain DDL
498+
operations. These are of interest to plugins that handle DDL
499+
replication, but they require special handling.
495500
</para>
496501

497502
<para>

src/backend/bootstrap/bootparse.y

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,7 @@ Boot_CreateStmt:
257257
false,
258258
true,
259259
false,
260+
InvalidOid,
260261
NULL);
261262
elog(DEBUG4, "relation created with OID %u", id);
262263
}

src/backend/catalog/heap.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -806,6 +806,7 @@ InsertPgClassTuple(Relation pg_class_desc,
806806
values[Anum_pg_class_relispopulated - 1] = BoolGetDatum(rd_rel->relispopulated);
807807
values[Anum_pg_class_relreplident - 1] = CharGetDatum(rd_rel->relreplident);
808808
values[Anum_pg_class_relispartition - 1] = BoolGetDatum(rd_rel->relispartition);
809+
values[Anum_pg_class_relrewrite - 1] = ObjectIdGetDatum(rd_rel->relrewrite);
809810
values[Anum_pg_class_relfrozenxid - 1] = TransactionIdGetDatum(rd_rel->relfrozenxid);
810811
values[Anum_pg_class_relminmxid - 1] = MultiXactIdGetDatum(rd_rel->relminmxid);
811812
if (relacl != (Datum) 0)
@@ -1038,6 +1039,7 @@ heap_create_with_catalog(const char *relname,
10381039
bool use_user_acl,
10391040
bool allow_system_table_mods,
10401041
bool is_internal,
1042+
Oid relrewrite,
10411043
ObjectAddress *typaddress)
10421044
{
10431045
Relation pg_class_desc;
@@ -1176,6 +1178,8 @@ heap_create_with_catalog(const char *relname,
11761178

11771179
Assert(relid == RelationGetRelid(new_rel_desc));
11781180

1181+
new_rel_desc->rd_rel->relrewrite = relrewrite;
1182+
11791183
/*
11801184
* Decide whether to create an array type over the relation's rowtype. We
11811185
* do not create any array types for system catalogs (ie, those made

src/backend/catalog/toasting.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ create_toast_table(Relation rel, Oid toastOid, Oid toastIndexOid,
279279
false,
280280
true,
281281
true,
282+
InvalidOid,
282283
NULL);
283284
Assert(toast_relid != InvalidOid);
284285

src/backend/commands/cluster.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -692,6 +692,7 @@ make_new_heap(Oid OIDOldHeap, Oid NewTableSpace, char relpersistence,
692692
false,
693693
true,
694694
true,
695+
OIDOldHeap,
695696
NULL);
696697
Assert(OIDNewHeap != InvalidOid);
697698

src/backend/commands/tablecmds.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -764,6 +764,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
764764
true,
765765
allowSystemTableMods,
766766
false,
767+
InvalidOid,
767768
typaddress);
768769

769770
/* Store inheritance information for new rel. */

src/backend/replication/logical/logical.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,8 @@ CreateInitDecodingContext(char *plugin,
317317
startup_cb_wrapper(ctx, &ctx->options, true);
318318
MemoryContextSwitchTo(old_context);
319319

320+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
321+
320322
return ctx;
321323
}
322324

@@ -410,6 +412,8 @@ CreateDecodingContext(XLogRecPtr start_lsn,
410412
startup_cb_wrapper(ctx, &ctx->options, false);
411413
MemoryContextSwitchTo(old_context);
412414

415+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
416+
413417
ereport(LOG,
414418
(errmsg("starting logical decoding for slot \"%s\"",
415419
NameStr(slot->data.name)),

src/backend/replication/logical/reorderbuffer.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,13 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
14021402
if (!RelationIsLogicallyLogged(relation))
14031403
goto change_done;
14041404

1405+
/*
1406+
* Ignore temporary heaps created during DDL unless the
1407+
* plugin has asked for them.
1408+
*/
1409+
if (relation->rd_rel->relrewrite && !rb->output_rewrites)
1410+
goto change_done;
1411+
14051412
/*
14061413
* For now ignore sequence changes entirely. Most of the
14071414
* time they don't log changes using records we

src/backend/replication/pgoutput/pgoutput.c

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
#include "utils/inval.h"
2323
#include "utils/int8.h"
24-
#include "utils/lsyscache.h"
2524
#include "utils/memutils.h"
2625
#include "utils/syscache.h"
2726
#include "utils/varlena.h"
@@ -511,31 +510,6 @@ get_rel_sync_entry(PGOutputData *data, Oid relid)
511510
{
512511
Publication *pub = lfirst(lc);
513512

514-
/*
515-
* Skip tables that look like they are from a heap rewrite (see
516-
* make_new_heap()). We need to skip them because the subscriber
517-
* won't have a table by that name to receive the data. That
518-
* means we won't ship the new data in, say, an added column with
519-
* a DEFAULT, but if the user applies the same DDL manually on the
520-
* subscriber, then this will work out for them.
521-
*
522-
* We only need to consider the alltables case, because such a
523-
* transient heap won't be an explicit member of a publication.
524-
*/
525-
if (pub->alltables)
526-
{
527-
char *relname = get_rel_name(relid);
528-
unsigned int u;
529-
int n;
530-
531-
if (sscanf(relname, "pg_temp_%u%n", &u, &n) == 1 &&
532-
relname[n] == '\0')
533-
{
534-
if (get_rel_relkind(u) == RELKIND_RELATION)
535-
break;
536-
}
537-
}
538-
539513
if (pub->alltables || list_member_oid(pubids, pub->oid))
540514
{
541515
entry->pubactions.pubinsert |= pub->pubactions.pubinsert;

src/include/catalog/catversion.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,6 @@
5353
*/
5454

5555
/* yyyymmddN */
56-
#define CATALOG_VERSION_NO 201803141
56+
#define CATALOG_VERSION_NO 201803211
5757

5858
#endif

src/include/catalog/heap.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ extern Oid heap_create_with_catalog(const char *relname,
7171
bool use_user_acl,
7272
bool allow_system_table_mods,
7373
bool is_internal,
74+
Oid relrewrite,
7475
ObjectAddress *typaddress);
7576

7677
extern void heap_create_init_fork(Relation rel);

src/include/catalog/pg_class.h

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ CATALOG(pg_class,1259) BKI_BOOTSTRAP BKI_ROWTYPE_OID(83) BKI_SCHEMA_MACRO
7070
bool relispopulated; /* matview currently holds query results */
7171
char relreplident; /* see REPLICA_IDENTITY_xxx constants */
7272
bool relispartition; /* is relation a partition? */
73+
Oid relrewrite; /* heap for rewrite during DDL, link to original rel */
7374
TransactionId relfrozenxid; /* all Xids < this are frozen in this rel */
7475
TransactionId relminmxid; /* all multixacts in this rel are >= this.
7576
* this is really a MultiXactId */
@@ -98,7 +99,7 @@ typedef FormData_pg_class *Form_pg_class;
9899
* ----------------
99100
*/
100101

101-
#define Natts_pg_class 32
102+
#define Natts_pg_class 33
102103
#define Anum_pg_class_relname 1
103104
#define Anum_pg_class_relnamespace 2
104105
#define Anum_pg_class_reltype 3
@@ -126,11 +127,12 @@ typedef FormData_pg_class *Form_pg_class;
126127
#define Anum_pg_class_relispopulated 25
127128
#define Anum_pg_class_relreplident 26
128129
#define Anum_pg_class_relispartition 27
129-
#define Anum_pg_class_relfrozenxid 28
130-
#define Anum_pg_class_relminmxid 29
131-
#define Anum_pg_class_relacl 30
132-
#define Anum_pg_class_reloptions 31
133-
#define Anum_pg_class_relpartbound 32
130+
#define Anum_pg_class_relrewrite 28
131+
#define Anum_pg_class_relfrozenxid 29
132+
#define Anum_pg_class_relminmxid 30
133+
#define Anum_pg_class_relacl 31
134+
#define Anum_pg_class_reloptions 32
135+
#define Anum_pg_class_relpartbound 33
134136

135137
/* ----------------
136138
* initial contents of pg_class
@@ -145,13 +147,13 @@ typedef FormData_pg_class *Form_pg_class;
145147
* Note: "3" in the relfrozenxid column stands for FirstNormalTransactionId;
146148
* similarly, "1" in relminmxid stands for FirstMultiXactId
147149
*/
148-
DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
150+
DATA(insert OID = 1247 ( pg_type PGNSP 71 0 PGUID 0 0 0 0 0 0 0 f f p r 30 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_));
149151
DESCR("");
150-
DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 3 1 _null_ _null_ _null_));
152+
DATA(insert OID = 1249 ( pg_attribute PGNSP 75 0 PGUID 0 0 0 0 0 0 0 f f p r 22 0 f f f f f f t n f 0 3 1 _null_ _null_ _null_));
151153
DESCR("");
152-
DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
154+
DATA(insert OID = 1255 ( pg_proc PGNSP 81 0 PGUID 0 0 0 0 0 0 0 f f p r 28 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_));
153155
DESCR("");
154-
DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 32 0 t f f f f f t n f 3 1 _null_ _null_ _null_));
156+
DATA(insert OID = 1259 ( pg_class PGNSP 83 0 PGUID 0 0 0 0 0 0 0 f f p r 33 0 t f f f f f t n f 0 3 1 _null_ _null_ _null_));
155157
DESCR("");
156158

157159

src/include/replication/output_plugin.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ typedef enum OutputPluginOutputType
2626
typedef struct OutputPluginOptions
2727
{
2828
OutputPluginOutputType output_type;
29+
bool receive_rewrites;
2930
} OutputPluginOptions;
3031

3132
/*

src/include/replication/reorderbuffer.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,11 @@ struct ReorderBuffer
336336
*/
337337
void *private_data;
338338

339+
/*
340+
* Saved output plugin option
341+
*/
342+
bool output_rewrites;
343+
339344
/*
340345
* Private memory context.
341346
*/

0 commit comments

Comments
 (0)