19
19
#include "access/htup_details.h"
20
20
#include "access/sysattr.h"
21
21
#include "access/tupconvert.h"
22
+ #include "access/xact.h"
22
23
#include "catalog/dependency.h"
23
24
#include "catalog/indexing.h"
24
25
#include "catalog/objectaccess.h"
37
38
#include "utils/tqual.h"
38
39
39
40
41
+ static void clone_fk_constraints (Relation pg_constraint , Relation parentRel ,
42
+ Relation partRel , List * clone , List * * cloned );
43
+
44
+
40
45
/*
41
46
* CreateConstraintEntry
42
47
* Create a constraint table entry.
@@ -400,57 +405,106 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
400
405
Relation rel ;
401
406
ScanKeyData key ;
402
407
SysScanDesc scan ;
403
- TupleDesc tupdesc ;
404
408
HeapTuple tuple ;
405
- AttrNumber * attmap ;
409
+ List * clone = NIL ;
406
410
407
411
parentRel = heap_open (parentId , NoLock ); /* already got lock */
408
412
/* see ATAddForeignKeyConstraint about lock level */
409
413
rel = heap_open (relationId , AccessExclusiveLock );
410
-
411
414
pg_constraint = heap_open (ConstraintRelationId , RowShareLock );
415
+
416
+ /* Obtain the list of constraints to clone or attach */
417
+ ScanKeyInit (& key ,
418
+ Anum_pg_constraint_conrelid , BTEqualStrategyNumber ,
419
+ F_OIDEQ , ObjectIdGetDatum (parentId ));
420
+ scan = systable_beginscan (pg_constraint , ConstraintRelidTypidNameIndexId , true,
421
+ NULL , 1 , & key );
422
+ while ((tuple = systable_getnext (scan )) != NULL )
423
+ clone = lappend_oid (clone , HeapTupleGetOid (tuple ));
424
+ systable_endscan (scan );
425
+
426
+ /* Do the actual work, recursing to partitions as needed */
427
+ clone_fk_constraints (pg_constraint , parentRel , rel , clone , cloned );
428
+
429
+ /* We're done. Clean up */
430
+ heap_close (parentRel , NoLock );
431
+ heap_close (rel , NoLock ); /* keep lock till commit */
432
+ heap_close (pg_constraint , RowShareLock );
433
+ }
434
+
435
+ /*
436
+ * clone_fk_constraints
437
+ * Recursive subroutine for CloneForeignKeyConstraints
438
+ *
439
+ * Clone the given list of FK constraints when a partition is attached.
440
+ *
441
+ * When cloning foreign keys to a partition, it may happen that equivalent
442
+ * constraints already exist in the partition for some of them. We can skip
443
+ * creating a clone in that case, and instead just attach the existing
444
+ * constraint to the one in the parent.
445
+ *
446
+ * This function recurses to partitions, if the new partition is partitioned;
447
+ * of course, only do this for FKs that were actually cloned.
448
+ */
449
+ static void
450
+ clone_fk_constraints (Relation pg_constraint , Relation parentRel ,
451
+ Relation partRel , List * clone , List * * cloned )
452
+ {
453
+ TupleDesc tupdesc ;
454
+ AttrNumber * attmap ;
455
+ List * partFKs ;
456
+ List * subclone = NIL ;
457
+ ListCell * cell ;
458
+
412
459
tupdesc = RelationGetDescr (pg_constraint );
413
460
414
461
/*
415
462
* The constraint key may differ, if the columns in the partition are
416
463
* different. This map is used to convert them.
417
464
*/
418
- attmap = convert_tuples_by_name_map (RelationGetDescr (rel ),
465
+ attmap = convert_tuples_by_name_map (RelationGetDescr (partRel ),
419
466
RelationGetDescr (parentRel ),
420
467
gettext_noop ("could not convert row type" ));
421
468
422
- ScanKeyInit (& key ,
423
- Anum_pg_constraint_conrelid , BTEqualStrategyNumber ,
424
- F_OIDEQ , ObjectIdGetDatum (parentId ));
425
- scan = systable_beginscan (pg_constraint , ConstraintRelidTypidNameIndexId , true,
426
- NULL , 1 , & key );
469
+ partFKs = copyObject (RelationGetFKeyList (partRel ));
427
470
428
- while (( tuple = systable_getnext ( scan )) != NULL )
471
+ foreach ( cell , clone )
429
472
{
430
- Form_pg_constraint constrForm = (Form_pg_constraint ) GETSTRUCT (tuple );
473
+ Oid parentConstrOid = lfirst_oid (cell );
474
+ Form_pg_constraint constrForm ;
475
+ HeapTuple tuple ;
431
476
AttrNumber conkey [INDEX_MAX_KEYS ];
432
477
AttrNumber mapped_conkey [INDEX_MAX_KEYS ];
433
478
AttrNumber confkey [INDEX_MAX_KEYS ];
434
479
Oid conpfeqop [INDEX_MAX_KEYS ];
435
480
Oid conppeqop [INDEX_MAX_KEYS ];
436
481
Oid conffeqop [INDEX_MAX_KEYS ];
437
482
Constraint * fkconstraint ;
438
- ClonedConstraint * newc ;
483
+ bool attach_it ;
439
484
Oid constrOid ;
440
485
ObjectAddress parentAddr ,
441
486
childAddr ;
442
487
int nelem ;
488
+ ListCell * cell ;
443
489
int i ;
444
490
ArrayType * arr ;
445
491
Datum datum ;
446
492
bool isnull ;
447
493
494
+ tuple = SearchSysCache1 (CONSTROID , parentConstrOid );
495
+ if (!tuple )
496
+ elog (ERROR , "cache lookup failed for constraint %u" ,
497
+ parentConstrOid );
498
+ constrForm = (Form_pg_constraint ) GETSTRUCT (tuple );
499
+
448
500
/* only foreign keys */
449
501
if (constrForm -> contype != CONSTRAINT_FOREIGN )
502
+ {
503
+ ReleaseSysCache (tuple );
450
504
continue ;
505
+ }
451
506
452
- ObjectAddressSet (parentAddr , ConstraintRelationId ,
453
- HeapTupleGetOid (tuple ));
507
+ ObjectAddressSet (parentAddr , ConstraintRelationId , parentConstrOid );
454
508
455
509
datum = fastgetattr (tuple , Anum_pg_constraint_conkey ,
456
510
tupdesc , & isnull );
@@ -539,6 +593,90 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
539
593
elog (ERROR , "conffeqop is not a 1-D OID array" );
540
594
memcpy (conffeqop , ARR_DATA_PTR (arr ), nelem * sizeof (Oid ));
541
595
596
+ /*
597
+ * Before creating a new constraint, see whether any existing FKs are
598
+ * fit for the purpose. If one is, attach the parent constraint to it,
599
+ * and don't clone anything. This way we avoid the expensive
600
+ * verification step and don't end up with a duplicate FK. This also
601
+ * means we don't consider this constraint when recursing to
602
+ * partitions.
603
+ */
604
+ attach_it = false;
605
+ foreach (cell , partFKs )
606
+ {
607
+ ForeignKeyCacheInfo * fk = lfirst_node (ForeignKeyCacheInfo , cell );
608
+ Form_pg_constraint partConstr ;
609
+ HeapTuple partcontup ;
610
+
611
+ attach_it = true;
612
+
613
+ /*
614
+ * Do some quick & easy initial checks. If any of these fail, we
615
+ * cannot use this constraint, but keep looking.
616
+ */
617
+ if (fk -> confrelid != constrForm -> confrelid || fk -> nkeys != nelem )
618
+ {
619
+ attach_it = false;
620
+ continue ;
621
+ }
622
+ for (i = 0 ; i < nelem ; i ++ )
623
+ {
624
+ if (fk -> conkey [i ] != mapped_conkey [i ] ||
625
+ fk -> confkey [i ] != confkey [i ] ||
626
+ fk -> conpfeqop [i ] != conpfeqop [i ])
627
+ {
628
+ attach_it = false;
629
+ break ;
630
+ }
631
+ }
632
+ if (!attach_it )
633
+ continue ;
634
+
635
+ /*
636
+ * Looks good so far; do some more extensive checks. Presumably
637
+ * the check for 'convalidated' could be dropped, since we don't
638
+ * really care about that, but let's be careful for now.
639
+ */
640
+ partcontup = SearchSysCache1 (CONSTROID ,
641
+ ObjectIdGetDatum (fk -> conoid ));
642
+ if (!partcontup )
643
+ elog (ERROR , "cache lookup failed for constraint %u" ,
644
+ fk -> conoid );
645
+ partConstr = (Form_pg_constraint ) GETSTRUCT (partcontup );
646
+ if (OidIsValid (partConstr -> conparentid ) ||
647
+ !partConstr -> convalidated ||
648
+ partConstr -> condeferrable != constrForm -> condeferrable ||
649
+ partConstr -> condeferred != constrForm -> condeferred ||
650
+ partConstr -> confupdtype != constrForm -> confupdtype ||
651
+ partConstr -> confdeltype != constrForm -> confdeltype ||
652
+ partConstr -> confmatchtype != constrForm -> confmatchtype )
653
+ {
654
+ ReleaseSysCache (partcontup );
655
+ attach_it = false;
656
+ continue ;
657
+ }
658
+
659
+ ReleaseSysCache (partcontup );
660
+
661
+ /* looks good! Attach this constraint */
662
+ ConstraintSetParentConstraint (fk -> conoid ,
663
+ HeapTupleGetOid (tuple ));
664
+ CommandCounterIncrement ();
665
+ attach_it = true;
666
+ break ;
667
+ }
668
+
669
+ /*
670
+ * If we attached to an existing constraint, there is no need to
671
+ * create a new one. In fact, there's no need to recurse for this
672
+ * constraint to partitions, either.
673
+ */
674
+ if (attach_it )
675
+ {
676
+ ReleaseSysCache (tuple );
677
+ continue ;
678
+ }
679
+
542
680
constrOid =
543
681
CreateConstraintEntry (NameStr (constrForm -> conname ),
544
682
constrForm -> connamespace ,
@@ -547,7 +685,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
547
685
constrForm -> condeferred ,
548
686
constrForm -> convalidated ,
549
687
HeapTupleGetOid (tuple ),
550
- relationId ,
688
+ RelationGetRelid ( partRel ) ,
551
689
mapped_conkey ,
552
690
nelem ,
553
691
nelem ,
@@ -568,6 +706,7 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
568
706
NULL ,
569
707
false,
570
708
1 , false, true);
709
+ subclone = lappend_oid (subclone , constrOid );
571
710
572
711
ObjectAddressSet (childAddr , ConstraintRelationId , constrOid );
573
712
recordDependencyOn (& childAddr , & parentAddr , DEPENDENCY_INTERNAL_AUTO );
@@ -580,43 +719,56 @@ CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
580
719
fkconstraint -> deferrable = constrForm -> condeferrable ;
581
720
fkconstraint -> initdeferred = constrForm -> condeferred ;
582
721
583
- createForeignKeyTriggers (rel , constrForm -> confrelid , fkconstraint ,
722
+ createForeignKeyTriggers (partRel , constrForm -> confrelid , fkconstraint ,
584
723
constrOid , constrForm -> conindid , false);
585
724
586
725
if (cloned )
587
726
{
727
+ ClonedConstraint * newc ;
728
+
588
729
/*
589
730
* Feed back caller about the constraints we created, so that they
590
731
* can set up constraint verification.
591
732
*/
592
733
newc = palloc (sizeof (ClonedConstraint ));
593
- newc -> relid = relationId ;
734
+ newc -> relid = RelationGetRelid ( partRel ) ;
594
735
newc -> refrelid = constrForm -> confrelid ;
595
736
newc -> conindid = constrForm -> conindid ;
596
737
newc -> conid = constrOid ;
597
738
newc -> constraint = fkconstraint ;
598
739
599
740
* cloned = lappend (* cloned , newc );
600
741
}
742
+
743
+ ReleaseSysCache (tuple );
601
744
}
602
- systable_endscan (scan );
603
745
604
746
pfree (attmap );
747
+ list_free_deep (partFKs );
605
748
606
- if (rel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE )
749
+ /*
750
+ * If the partition is partitioned, recurse to handle any constraints that
751
+ * were cloned.
752
+ */
753
+ if (partRel -> rd_rel -> relkind == RELKIND_PARTITIONED_TABLE &&
754
+ subclone != NIL )
607
755
{
608
- PartitionDesc partdesc = RelationGetPartitionDesc (rel );
756
+ PartitionDesc partdesc = RelationGetPartitionDesc (partRel );
609
757
int i ;
610
758
611
759
for (i = 0 ; i < partdesc -> nparts ; i ++ )
612
- CloneForeignKeyConstraints (RelationGetRelid (rel ),
613
- partdesc -> oids [i ],
614
- cloned );
760
+ {
761
+ Relation childRel ;
762
+
763
+ childRel = heap_open (partdesc -> oids [i ], AccessExclusiveLock );
764
+ clone_fk_constraints (pg_constraint ,
765
+ partRel ,
766
+ childRel ,
767
+ subclone ,
768
+ cloned );
769
+ heap_close (childRel , NoLock ); /* keep lock till commit */
770
+ }
615
771
}
616
-
617
- heap_close (rel , NoLock ); /* keep lock till commit */
618
- heap_close (parentRel , NoLock );
619
- heap_close (pg_constraint , RowShareLock );
620
772
}
621
773
622
774
/*
@@ -1028,17 +1180,33 @@ ConstraintSetParentConstraint(Oid childConstrId, Oid parentConstrId)
1028
1180
elog (ERROR , "cache lookup failed for constraint %u" , childConstrId );
1029
1181
newtup = heap_copytuple (tuple );
1030
1182
constrForm = (Form_pg_constraint ) GETSTRUCT (newtup );
1031
- constrForm -> conislocal = false;
1032
- constrForm -> coninhcount ++ ;
1033
- constrForm -> conparentid = parentConstrId ;
1034
- CatalogTupleUpdate (constrRel , & tuple -> t_self , newtup );
1035
- ReleaseSysCache (tuple );
1183
+ if (OidIsValid (parentConstrId ))
1184
+ {
1185
+ constrForm -> conislocal = false;
1186
+ constrForm -> coninhcount ++ ;
1187
+ constrForm -> conparentid = parentConstrId ;
1188
+
1189
+ CatalogTupleUpdate (constrRel , & tuple -> t_self , newtup );
1036
1190
1037
- ObjectAddressSet (referenced , ConstraintRelationId , parentConstrId );
1038
- ObjectAddressSet (depender , ConstraintRelationId , childConstrId );
1191
+ ObjectAddressSet (referenced , ConstraintRelationId , parentConstrId );
1192
+ ObjectAddressSet (depender , ConstraintRelationId , childConstrId );
1039
1193
1040
- recordDependencyOn (& depender , & referenced , DEPENDENCY_INTERNAL_AUTO );
1194
+ recordDependencyOn (& depender , & referenced , DEPENDENCY_INTERNAL_AUTO );
1195
+ }
1196
+ else
1197
+ {
1198
+ constrForm -> coninhcount -- ;
1199
+ if (constrForm -> coninhcount <= 0 )
1200
+ constrForm -> conislocal = true;
1201
+ constrForm -> conparentid = InvalidOid ;
1202
+
1203
+ deleteDependencyRecordsForClass (ConstraintRelationId , childConstrId ,
1204
+ ConstraintRelationId ,
1205
+ DEPENDENCY_INTERNAL_AUTO );
1206
+ CatalogTupleUpdate (constrRel , & tuple -> t_self , newtup );
1207
+ }
1041
1208
1209
+ ReleaseSysCache (tuple );
1042
1210
heap_close (constrRel , RowExclusiveLock );
1043
1211
}
1044
1212
0 commit comments