@@ -336,21 +336,36 @@ pub_rf_contains_invalid_column(Oid pubid, Relation relation, List *ancestors,
336
336
}
337
337
338
338
/*
339
- * Check if all columns referenced in the REPLICA IDENTITY are covered by
340
- * the column list.
339
+ * Check for invalid columns in the publication table definition.
341
340
*
342
- * Returns true if any replica identity column is not covered by column list.
341
+ * This function evaluates two conditions:
342
+ *
343
+ * 1. Ensures that all columns referenced in the REPLICA IDENTITY are covered
344
+ * by the column list. If any column is missing, *invalid_column_list is set
345
+ * to true.
346
+ * 2. Ensures that all the generated columns referenced in the REPLICA IDENTITY
347
+ * are published either by listing them in the column list or by enabling
348
+ * publish_generated_columns option. If any unpublished generated column is
349
+ * found, *invalid_gen_col is set to true.
350
+ *
351
+ * Returns true if any of the above conditions are not met.
343
352
*/
344
353
bool
345
- pub_collist_contains_invalid_column (Oid pubid , Relation relation , List * ancestors ,
346
- bool pubviaroot )
354
+ pub_contains_invalid_column (Oid pubid , Relation relation , List * ancestors ,
355
+ bool pubviaroot , bool pubgencols ,
356
+ bool * invalid_column_list ,
357
+ bool * invalid_gen_col )
347
358
{
348
- HeapTuple tuple ;
349
359
Oid relid = RelationGetRelid (relation );
350
360
Oid publish_as_relid = RelationGetRelid (relation );
351
- bool result = false;
352
- Datum datum ;
353
- bool isnull ;
361
+ Bitmapset * idattrs ;
362
+ Bitmapset * columns = NULL ;
363
+ TupleDesc desc = RelationGetDescr (relation );
364
+ Publication * pub ;
365
+ int x ;
366
+
367
+ * invalid_column_list = false;
368
+ * invalid_gen_col = false;
354
369
355
370
/*
356
371
* For a partition, if pubviaroot is true, find the topmost ancestor that
@@ -368,80 +383,91 @@ pub_collist_contains_invalid_column(Oid pubid, Relation relation, List *ancestor
368
383
publish_as_relid = relid ;
369
384
}
370
385
371
- tuple = SearchSysCache2 ( PUBLICATIONRELMAP ,
372
- ObjectIdGetDatum ( publish_as_relid ),
373
- ObjectIdGetDatum ( pubid ) );
386
+ /* Fetch the column list */
387
+ pub = GetPublication ( pubid );
388
+ check_and_fetch_column_list ( pub , publish_as_relid , NULL , & columns );
374
389
375
- if (!HeapTupleIsValid (tuple ))
376
- return false;
390
+ if (relation -> rd_rel -> relreplident == REPLICA_IDENTITY_FULL )
391
+ {
392
+ /* With REPLICA IDENTITY FULL, no column list is allowed. */
393
+ * invalid_column_list = (columns != NULL );
377
394
378
- datum = SysCacheGetAttr (PUBLICATIONRELMAP , tuple ,
379
- Anum_pg_publication_rel_prattrs ,
380
- & isnull );
395
+ /*
396
+ * As we don't allow a column list with REPLICA IDENTITY FULL, the
397
+ * publish_generated_columns option must be set to true if the table
398
+ * has any stored generated columns.
399
+ */
400
+ if (!pubgencols &&
401
+ relation -> rd_att -> constr &&
402
+ relation -> rd_att -> constr -> has_generated_stored )
403
+ * invalid_gen_col = true;
381
404
382
- if (!isnull )
383
- {
384
- int x ;
385
- Bitmapset * idattrs ;
386
- Bitmapset * columns = NULL ;
405
+ if (* invalid_gen_col && * invalid_column_list )
406
+ return true;
407
+ }
387
408
388
- /* With REPLICA IDENTITY FULL, no column list is allowed. */
389
- if ( relation -> rd_rel -> relreplident == REPLICA_IDENTITY_FULL )
390
- result = true ;
409
+ /* Remember columns that are part of the REPLICA IDENTITY */
410
+ idattrs = RelationGetIndexAttrBitmap ( relation ,
411
+ INDEX_ATTR_BITMAP_IDENTITY_KEY ) ;
391
412
392
- /* Transform the column list datum to a bitmapset. */
393
- columns = pub_collist_to_bitmapset (NULL , datum , NULL );
413
+ /*
414
+ * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are offset
415
+ * (to handle system columns the usual way), while column list does not
416
+ * use offset, so we can't do bms_is_subset(). Instead, we have to loop
417
+ * over the idattrs and check all of them are in the list.
418
+ */
419
+ x = -1 ;
420
+ while ((x = bms_next_member (idattrs , x )) >= 0 )
421
+ {
422
+ AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber );
423
+ Form_pg_attribute att = TupleDescAttr (desc , attnum - 1 );
394
424
395
- /* Remember columns that are part of the REPLICA IDENTITY */
396
- idattrs = RelationGetIndexAttrBitmap (relation ,
397
- INDEX_ATTR_BITMAP_IDENTITY_KEY );
425
+ if (columns == NULL )
426
+ {
427
+ /*
428
+ * The publish_generated_columns option must be set to true if the
429
+ * REPLICA IDENTITY contains any stored generated column.
430
+ */
431
+ if (!pubgencols && att -> attgenerated )
432
+ {
433
+ * invalid_gen_col = true;
434
+ break ;
435
+ }
436
+
437
+ /* Skip validating the column list since it is not defined */
438
+ continue ;
439
+ }
398
440
399
441
/*
400
- * Attnums in the bitmap returned by RelationGetIndexAttrBitmap are
401
- * offset (to handle system columns the usual way), while column list
402
- * does not use offset, so we can't do bms_is_subset(). Instead, we
403
- * have to loop over the idattrs and check all of them are in the
404
- * list .
442
+ * If pubviaroot is true, we are validating the column list of the
443
+ * parent table, but the bitmap contains the replica identity
444
+ * information of the child table. The parent/child attnums may not
445
+ * match, so translate them to the parent - get the attname from the
446
+ * child, and look it up in the parent .
405
447
*/
406
- x = -1 ;
407
- while ((x = bms_next_member (idattrs , x )) >= 0 )
448
+ if (pubviaroot )
408
449
{
409
- AttrNumber attnum = (x + FirstLowInvalidHeapAttributeNumber );
450
+ /* attribute name in the child table */
451
+ char * colname = get_attname (relid , attnum , false);
410
452
411
453
/*
412
- * If pubviaroot is true, we are validating the column list of the
413
- * parent table, but the bitmap contains the replica identity
414
- * information of the child table. The parent/child attnums may
415
- * not match, so translate them to the parent - get the attname
416
- * from the child, and look it up in the parent.
454
+ * Determine the attnum for the attribute name in parent (we are
455
+ * using the column list defined on the parent).
417
456
*/
418
- if (pubviaroot )
419
- {
420
- /* attribute name in the child table */
421
- char * colname = get_attname (relid , attnum , false);
422
-
423
- /*
424
- * Determine the attnum for the attribute name in parent (we
425
- * are using the column list defined on the parent).
426
- */
427
- attnum = get_attnum (publish_as_relid , colname );
428
- }
429
-
430
- /* replica identity column, not covered by the column list */
431
- if (!bms_is_member (attnum , columns ))
432
- {
433
- result = true;
434
- break ;
435
- }
457
+ attnum = get_attnum (publish_as_relid , colname );
436
458
}
437
459
438
- bms_free (idattrs );
439
- bms_free (columns );
460
+ /* replica identity column, not covered by the column list */
461
+ * invalid_column_list |= !bms_is_member (attnum , columns );
462
+
463
+ if (* invalid_column_list && * invalid_gen_col )
464
+ break ;
440
465
}
441
466
442
- ReleaseSysCache (tuple );
467
+ bms_free (columns );
468
+ bms_free (idattrs );
443
469
444
- return result ;
470
+ return * invalid_column_list || * invalid_gen_col ;
445
471
}
446
472
447
473
/* check_functions_in_node callback */
0 commit comments