40
40
#include "access/valid.h"
41
41
#include "access/visibilitymap.h"
42
42
#include "access/xloginsert.h"
43
+ #include "catalog/pg_database.h"
44
+ #include "catalog/pg_database_d.h"
43
45
#include "commands/vacuum.h"
44
46
#include "pgstat.h"
45
47
#include "port/pg_bitutils.h"
@@ -57,6 +59,12 @@ static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
57
59
Buffer newbuf , HeapTuple oldtup ,
58
60
HeapTuple newtup , HeapTuple old_key_tuple ,
59
61
bool all_visible_cleared , bool new_all_visible_cleared );
62
+ #ifdef USE_ASSERT_CHECKING
63
+ static void check_lock_if_inplace_updateable_rel (Relation relation ,
64
+ ItemPointer otid ,
65
+ HeapTuple newtup );
66
+ static void check_inplace_rel_lock (HeapTuple oldtup );
67
+ #endif
60
68
static Bitmapset * HeapDetermineColumnsInfo (Relation relation ,
61
69
Bitmapset * interesting_cols ,
62
70
Bitmapset * external_cols ,
@@ -103,6 +111,8 @@ static HeapTuple ExtractReplicaIdentity(Relation relation, HeapTuple tp, bool ke
103
111
* heavyweight lock mode and MultiXactStatus values to use for any particular
104
112
* tuple lock strength.
105
113
*
114
+ * These interact with InplaceUpdateTupleLock, an alias for ExclusiveLock.
115
+ *
106
116
* Don't look at lockstatus/updstatus directly! Use get_mxact_status_for_lock
107
117
* instead.
108
118
*/
@@ -3189,6 +3199,10 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
3189
3199
(errcode (ERRCODE_INVALID_TRANSACTION_STATE ),
3190
3200
errmsg ("cannot update tuples during a parallel operation" )));
3191
3201
3202
+ #ifdef USE_ASSERT_CHECKING
3203
+ check_lock_if_inplace_updateable_rel (relation , otid , newtup );
3204
+ #endif
3205
+
3192
3206
/*
3193
3207
* Fetch the list of attributes to be checked for various operations.
3194
3208
*
@@ -4053,6 +4067,128 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup,
4053
4067
return TM_Ok ;
4054
4068
}
4055
4069
4070
+ #ifdef USE_ASSERT_CHECKING
4071
+ /*
4072
+ * Confirm adequate lock held during heap_update(), per rules from
4073
+ * README.tuplock section "Locking to write inplace-updated tables".
4074
+ */
4075
+ static void
4076
+ check_lock_if_inplace_updateable_rel (Relation relation ,
4077
+ ItemPointer otid ,
4078
+ HeapTuple newtup )
4079
+ {
4080
+ /* LOCKTAG_TUPLE acceptable for any catalog */
4081
+ switch (RelationGetRelid (relation ))
4082
+ {
4083
+ case RelationRelationId :
4084
+ case DatabaseRelationId :
4085
+ {
4086
+ LOCKTAG tuptag ;
4087
+
4088
+ SET_LOCKTAG_TUPLE (tuptag ,
4089
+ relation -> rd_lockInfo .lockRelId .dbId ,
4090
+ relation -> rd_lockInfo .lockRelId .relId ,
4091
+ ItemPointerGetBlockNumber (otid ),
4092
+ ItemPointerGetOffsetNumber (otid ));
4093
+ if (LockHeldByMe (& tuptag , InplaceUpdateTupleLock , false))
4094
+ return ;
4095
+ }
4096
+ break ;
4097
+ default :
4098
+ Assert (!IsInplaceUpdateRelation (relation ));
4099
+ return ;
4100
+ }
4101
+
4102
+ switch (RelationGetRelid (relation ))
4103
+ {
4104
+ case RelationRelationId :
4105
+ {
4106
+ /* LOCKTAG_TUPLE or LOCKTAG_RELATION ok */
4107
+ Form_pg_class classForm = (Form_pg_class ) GETSTRUCT (newtup );
4108
+ Oid relid = classForm -> oid ;
4109
+ Oid dbid ;
4110
+ LOCKTAG tag ;
4111
+
4112
+ if (IsSharedRelation (relid ))
4113
+ dbid = InvalidOid ;
4114
+ else
4115
+ dbid = MyDatabaseId ;
4116
+
4117
+ if (classForm -> relkind == RELKIND_INDEX )
4118
+ {
4119
+ Relation irel = index_open (relid , AccessShareLock );
4120
+
4121
+ SET_LOCKTAG_RELATION (tag , dbid , irel -> rd_index -> indrelid );
4122
+ index_close (irel , AccessShareLock );
4123
+ }
4124
+ else
4125
+ SET_LOCKTAG_RELATION (tag , dbid , relid );
4126
+
4127
+ if (!LockHeldByMe (& tag , ShareUpdateExclusiveLock , false) &&
4128
+ !LockHeldByMe (& tag , ShareRowExclusiveLock , true))
4129
+ elog (WARNING ,
4130
+ "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)" ,
4131
+ NameStr (classForm -> relname ),
4132
+ relid ,
4133
+ classForm -> relkind ,
4134
+ ItemPointerGetBlockNumber (otid ),
4135
+ ItemPointerGetOffsetNumber (otid ));
4136
+ }
4137
+ break ;
4138
+ case DatabaseRelationId :
4139
+ {
4140
+ /* LOCKTAG_TUPLE required */
4141
+ Form_pg_database dbForm = (Form_pg_database ) GETSTRUCT (newtup );
4142
+
4143
+ elog (WARNING ,
4144
+ "missing lock on database \"%s\" (OID %u) @ TID (%u,%u)" ,
4145
+ NameStr (dbForm -> datname ),
4146
+ dbForm -> oid ,
4147
+ ItemPointerGetBlockNumber (otid ),
4148
+ ItemPointerGetOffsetNumber (otid ));
4149
+ }
4150
+ break ;
4151
+ }
4152
+ }
4153
+
4154
+ /*
4155
+ * Confirm adequate relation lock held, per rules from README.tuplock section
4156
+ * "Locking to write inplace-updated tables".
4157
+ */
4158
+ static void
4159
+ check_inplace_rel_lock (HeapTuple oldtup )
4160
+ {
4161
+ Form_pg_class classForm = (Form_pg_class ) GETSTRUCT (oldtup );
4162
+ Oid relid = classForm -> oid ;
4163
+ Oid dbid ;
4164
+ LOCKTAG tag ;
4165
+
4166
+ if (IsSharedRelation (relid ))
4167
+ dbid = InvalidOid ;
4168
+ else
4169
+ dbid = MyDatabaseId ;
4170
+
4171
+ if (classForm -> relkind == RELKIND_INDEX )
4172
+ {
4173
+ Relation irel = index_open (relid , AccessShareLock );
4174
+
4175
+ SET_LOCKTAG_RELATION (tag , dbid , irel -> rd_index -> indrelid );
4176
+ index_close (irel , AccessShareLock );
4177
+ }
4178
+ else
4179
+ SET_LOCKTAG_RELATION (tag , dbid , relid );
4180
+
4181
+ if (!LockHeldByMe (& tag , ShareUpdateExclusiveLock , true))
4182
+ elog (WARNING ,
4183
+ "missing lock for relation \"%s\" (OID %u, relkind %c) @ TID (%u,%u)" ,
4184
+ NameStr (classForm -> relname ),
4185
+ relid ,
4186
+ classForm -> relkind ,
4187
+ ItemPointerGetBlockNumber (& oldtup -> t_self ),
4188
+ ItemPointerGetOffsetNumber (& oldtup -> t_self ));
4189
+ }
4190
+ #endif
4191
+
4056
4192
/*
4057
4193
* Check if the specified attribute's values are the same. Subroutine for
4058
4194
* HeapDetermineColumnsInfo.
@@ -6070,15 +6206,21 @@ heap_inplace_lock(Relation relation,
6070
6206
TM_Result result ;
6071
6207
bool ret ;
6072
6208
6209
+ #ifdef USE_ASSERT_CHECKING
6210
+ if (RelationGetRelid (relation ) == RelationRelationId )
6211
+ check_inplace_rel_lock (oldtup_ptr );
6212
+ #endif
6213
+
6073
6214
Assert (BufferIsValid (buffer ));
6074
6215
6216
+ LockTuple (relation , & oldtup .t_self , InplaceUpdateTupleLock );
6075
6217
LockBuffer (buffer , BUFFER_LOCK_EXCLUSIVE );
6076
6218
6077
6219
/*----------
6078
6220
* Interpret HeapTupleSatisfiesUpdate() like heap_update() does, except:
6079
6221
*
6080
6222
* - wait unconditionally
6081
- * - no tuple locks
6223
+ * - already locked tuple above, since inplace needs that unconditionally
6082
6224
* - don't recheck header after wait: simpler to defer to next iteration
6083
6225
* - don't try to continue even if the updater aborts: likewise
6084
6226
* - no crosscheck
@@ -6162,7 +6304,10 @@ heap_inplace_lock(Relation relation,
6162
6304
* don't bother optimizing that.
6163
6305
*/
6164
6306
if (!ret )
6307
+ {
6308
+ UnlockTuple (relation , & oldtup .t_self , InplaceUpdateTupleLock );
6165
6309
InvalidateCatalogSnapshot ();
6310
+ }
6166
6311
return ret ;
6167
6312
}
6168
6313
@@ -6171,6 +6316,8 @@ heap_inplace_lock(Relation relation,
6171
6316
*
6172
6317
* The tuple cannot change size, and therefore its header fields and null
6173
6318
* bitmap (if any) don't change either.
6319
+ *
6320
+ * Since we hold LOCKTAG_TUPLE, no updater has a local copy of this tuple.
6174
6321
*/
6175
6322
void
6176
6323
heap_inplace_update_and_unlock (Relation relation ,
@@ -6254,6 +6401,7 @@ heap_inplace_unlock(Relation relation,
6254
6401
HeapTuple oldtup , Buffer buffer )
6255
6402
{
6256
6403
LockBuffer (buffer , BUFFER_LOCK_UNLOCK );
6404
+ UnlockTuple (relation , & oldtup -> t_self , InplaceUpdateTupleLock );
6257
6405
}
6258
6406
6259
6407
#define FRM_NOOP 0x0001
0 commit comments