8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.76 2001/01/24 19:42:48 momjian Exp $
11
+ * $Header: /cvsroot/pgsql/src/backend/access/nbtree/nbtinsert.c,v 1.77 2001/01/26 01:24:31 vadim Exp $
12
12
*
13
13
*-------------------------------------------------------------------------
14
14
*/
@@ -34,7 +34,9 @@ typedef struct
34
34
int best_delta ; /* best size delta so far */
35
35
} FindSplitData ;
36
36
37
- void _bt_newroot (Relation rel , Buffer lbuf , Buffer rbuf );
37
+ Buffer _bt_fixroot (Relation rel , Buffer oldrootbuf , bool release );
38
+
39
+ static Buffer _bt_newroot (Relation rel , Buffer lbuf , Buffer rbuf );
38
40
39
41
static TransactionId _bt_check_unique (Relation rel , BTItem btitem ,
40
42
Relation heapRel , Buffer buf ,
@@ -44,6 +46,8 @@ static InsertIndexResult _bt_insertonpg(Relation rel, Buffer buf,
44
46
int keysz , ScanKey scankey ,
45
47
BTItem btitem ,
46
48
OffsetNumber afteritem );
49
+ static void _bt_insertuple (Relation rel , Buffer buf ,
50
+ Size itemsz , BTItem btitem , OffsetNumber newitemoff );
47
51
static Buffer _bt_split (Relation rel , Buffer buf , OffsetNumber firstright ,
48
52
OffsetNumber newitemoff , Size newitemsz ,
49
53
BTItem newitem , bool newitemonleft ,
@@ -456,9 +460,14 @@ _bt_insertonpg(Relation rel,
456
460
457
461
if (is_root )
458
462
{
463
+ Buffer rootbuf ;
464
+
459
465
Assert (stack == (BTStack ) NULL );
460
466
/* create a new root node and release the split buffers */
461
- _bt_newroot (rel , buf , rbuf );
467
+ rootbuf = _bt_newroot (rel , buf , rbuf );
468
+ _bt_wrtbuf (rel , rootbuf );
469
+ _bt_wrtbuf (rel , rbuf );
470
+ _bt_wrtbuf (rel , buf );
462
471
}
463
472
else
464
473
{
@@ -519,52 +528,11 @@ _bt_insertonpg(Relation rel,
519
528
}
520
529
else
521
530
{
522
- START_CRIT_SECTION ();
523
- _bt_pgaddtup (rel , page , itemsz , btitem , newitemoff , "page" );
524
531
itup_off = newitemoff ;
525
532
itup_blkno = BufferGetBlockNumber (buf );
526
- /* XLOG stuff */
527
- {
528
- xl_btree_insert xlrec ;
529
- uint8 flag = XLOG_BTREE_INSERT ;
530
- XLogRecPtr recptr ;
531
- XLogRecData rdata [2 ];
532
- BTItemData truncitem ;
533
533
534
- xlrec .target .node = rel -> rd_node ;
535
- ItemPointerSet (& (xlrec .target .tid ), BufferGetBlockNumber (buf ), newitemoff );
536
- rdata [0 ].buffer = InvalidBuffer ;
537
- rdata [0 ].data = (char * )& xlrec ;
538
- rdata [0 ].len = SizeOfBtreeInsert ;
539
- rdata [0 ].next = & (rdata [1 ]);
540
-
541
- /* Read comments in _bt_pgaddtup */
542
- if (!(P_ISLEAF (lpageop )) & & newitemoff == P_FIRSTDATAKEY (lpageop ))
543
- {
544
- truncitem = * btitem ;
545
- truncitem .bti_itup .t_info = sizeof (BTItemData );
546
- rdata [1 ].data = (char * )& truncitem ;
547
- rdata [1 ].len = sizeof (BTItemData );
548
- }
549
- else
550
- {
551
- rdata [1 ].data = (char * )btitem ;
552
- rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup ) +
553
- (sizeof (BTItemData ) - sizeof (IndexTupleData ));
554
- }
555
- rdata [1 ].buffer = buf ;
556
- rdata [1 ].next = NULL ;
534
+ _bt_insertuple (rel , buf , itemsz , btitem , newitemoff );
557
535
558
- if (P_ISLEAF (lpageop ))
559
- flag |= XLOG_BTREE_LEAF ;
560
-
561
- recptr = XLogInsert (RM_BTREE_ID , flag , rdata );
562
-
563
- PageSetLSN (page , recptr );
564
- PageSetSUI (page , ThisStartUpID );
565
- }
566
-
567
- END_CRIT_SECTION ();
568
536
/* Write out the updated page and release pin/lock */
569
537
_bt_wrtbuf (rel , buf );
570
538
}
@@ -576,6 +544,57 @@ _bt_insertonpg(Relation rel,
576
544
return res ;
577
545
}
578
546
547
+ static void
548
+ _bt_insertuple (Relation rel , Buffer buf ,
549
+ Size itemsz , BTItem btitem , OffsetNumber newitemoff )
550
+ {
551
+ Page page = BufferGetPage (buf );
552
+ BTPageOpaque pageop = (BTPageOpaque ) PageGetSpecialPointer (page );
553
+
554
+ START_CRIT_SECTION ();
555
+ _bt_pgaddtup (rel , page , itemsz , btitem , newitemoff , "page" );
556
+ /* XLOG stuff */
557
+ {
558
+ xl_btree_insert xlrec ;
559
+ uint8 flag = XLOG_BTREE_INSERT ;
560
+ XLogRecPtr recptr ;
561
+ XLogRecData rdata [2 ];
562
+ BTItemData truncitem ;
563
+ xlrec .target .node = rel -> rd_node ;
564
+ ItemPointerSet (& (xlrec .target .tid ), BufferGetBlockNumber (buf ), newitemoff );
565
+ rdata [0 ].buffer = InvalidBuffer ;
566
+ rdata [0 ].data = (char * )& xlrec ;
567
+ rdata [0 ].len = SizeOfBtreeInsert ;
568
+ rdata [0 ].next = & (rdata [1 ]);
569
+
570
+ /* Read comments in _bt_pgaddtup */
571
+ if (!(P_ISLEAF (pageop )) & & newitemoff == P_FIRSTDATAKEY (pageop ))
572
+ {
573
+ truncitem = * btitem ;
574
+ truncitem .bti_itup .t_info = sizeof (BTItemData );
575
+ rdata [1 ].data = (char * )& truncitem ;
576
+ rdata [1 ].len = sizeof (BTItemData );
577
+ }
578
+ else
579
+ {
580
+ rdata [1 ].data = (char * )btitem ;
581
+ rdata [1 ].len = IndexTupleDSize (btitem -> bti_itup ) +
582
+ (sizeof (BTItemData ) - sizeof (IndexTupleData ));
583
+ }
584
+ rdata [1 ].buffer = buf ;
585
+ rdata [1 ].next = NULL ;
586
+ if (P_ISLEAF (pageop ))
587
+ flag |= XLOG_BTREE_LEAF ;
588
+
589
+ recptr = XLogInsert (RM_BTREE_ID , flag , rdata );
590
+
591
+ PageSetLSN (page , recptr );
592
+ PageSetSUI (page , ThisStartUpID );
593
+ }
594
+
595
+ END_CRIT_SECTION ();
596
+ }
597
+
579
598
/*
580
599
* _bt_split() -- split a page in the btree.
581
600
*
@@ -1130,11 +1149,12 @@ _bt_getstackbuf(Relation rel, BTStack stack)
1130
1149
* graph.
1131
1150
*
1132
1151
* On entry, lbuf (the old root) and rbuf (its new peer) are write-
1133
- * locked. On exit, a new root page exists with entries for the
1134
- * two new children. The new root page is neither pinned nor locked, and
1135
- * we have also written out lbuf and rbuf and dropped their pins/locks.
1152
+ * locked. On exit, a new root page exists with entries for the
1153
+ * two new children, metapage is updated and unlocked/unpinned.
1154
+ * The new root buffer is returned to caller which has to unlock/unpin
1155
+ * lbuf, rbuf & rootbuf.
1136
1156
*/
1137
- void
1157
+ static Buffer
1138
1158
_bt_newroot (Relation rel , Buffer lbuf , Buffer rbuf )
1139
1159
{
1140
1160
Buffer rootbuf ;
@@ -1257,13 +1277,156 @@ _bt_newroot(Relation rel, Buffer lbuf, Buffer rbuf)
1257
1277
}
1258
1278
END_CRIT_SECTION ();
1259
1279
1260
- /* write and let go of the new root buffer */
1261
- _bt_wrtbuf (rel , rootbuf );
1280
+ /* write and let go of metapage buffer */
1262
1281
_bt_wrtbuf (rel , metabuf );
1263
1282
1264
- /* update and release new sibling, and finally the old root */
1265
- _bt_wrtbuf (rel , rbuf );
1266
- _bt_wrtbuf (rel , lbuf );
1283
+ return (rootbuf );
1284
+ }
1285
+
1286
+ /*
1287
+ * In the event old root page was splitted but no new one was created we
1288
+ * build required parent levels keeping write lock on old root page.
1289
+ * Note: it's assumed that old root page' btpo_parent points to meta page,
1290
+ * ie not to parent page. On exit, new root page buffer is write locked.
1291
+ * If "release" is TRUE then oldrootbuf will be released immediately
1292
+ * after upper level is builded.
1293
+ */
1294
+ Buffer
1295
+ _bt_fixroot (Relation rel , Buffer oldrootbuf , bool release )
1296
+ {
1297
+ Buffer rootbuf ;
1298
+ BlockNumber rootblk ;
1299
+ Page rootpage ;
1300
+ XLogRecPtr rootLSN ;
1301
+ Page oldrootpage = BufferGetPage (oldrootbuf );
1302
+ BTPageOpaque oldrootopaque = (BTPageOpaque )
1303
+ PageGetSpecialPointer (oldrootpage );
1304
+ Buffer buf , leftbuf , rightbuf ;
1305
+ Page page , leftpage , rightpage ;
1306
+ BTPageOpaque opaque , leftopaque , rightopaque ;
1307
+ OffsetNumber newitemoff ;
1308
+ BTItem btitem , ritem ;
1309
+ Size itemsz ;
1310
+
1311
+ if (! P_LEFTMOST (oldrootopaque ) || P_RIGHTMOST (oldrootopaque ))
1312
+ elog (ERROR , "bt_fixroot: not valid old root page" );
1313
+
1314
+ /* Read right neighbor and create new root page*/
1315
+ leftbuf = _bt_getbuf (rel , oldrootopaque -> btpo_next , BT_WRITE );
1316
+ leftpage = BufferGetPage (leftbuf );
1317
+ leftopaque = (BTPageOpaque ) PageGetSpecialPointer (leftpage );
1318
+ rootbuf = _bt_newroot (rel , oldrootbuf , leftbuf );
1319
+ rootpage = BufferGetPage (rootbuf );
1320
+ rootLSN = PageGetLSN (rootpage );
1321
+ rootblk = BufferGetBlockNumber (rootbuf );
1322
+
1323
+ /*
1324
+ * Update LSN & StartUpID of old root buffer and its neighbor to
1325
+ * ensure that they will be written on disk after logging new
1326
+ * root creation. Unfortunately, for the moment (?) we do not
1327
+ * log this operation and so possibly break our rule to log entire
1328
+ * page content of first after checkpoint modification.
1329
+ */
1330
+ HOLD_INTERRUPTS ();
1331
+ oldrootopaque -> btpo_parent = rootblk ;
1332
+ leftopaque -> btpo_parent = rootblk ;
1333
+ PageSetLSN (oldrootpage , rootLSN );
1334
+ PageSetSUI (oldrootpage , ThisStartUpID );
1335
+ PageSetLSN (leftpage , rootLSN );
1336
+ PageSetSUI (leftpage , ThisStartUpID );
1337
+ RESUME_INTERRUPTS ();
1338
+
1339
+ /* parent page where to insert pointers */
1340
+ buf = rootbuf ;
1341
+ page = BufferGetPage (buf );
1342
+ opaque = (BTPageOpaque ) PageGetSpecialPointer (page );
1343
+
1344
+ /*
1345
+ * Now read other pages (if any) on level and add them to new root.
1346
+ * If concurrent process will split one of pages on this level then it
1347
+ * will notice either btpo_parent == metablock or btpo_parent == rootblk.
1348
+ * In first case it will give up its locks and try to lock leftmost page
1349
+ * buffer (oldrootbuf) to fix root - ie it will wait for us and let us
1350
+ * continue. In second case it will try to lock rootbuf keeping its locks
1351
+ * on buffers we already passed, also waiting for us. If we'll have to
1352
+ * unlock rootbuf (split it) and that process will have to split page
1353
+ * of new level we created (level of rootbuf) then it will wait while
1354
+ * we create upper level. Etc.
1355
+ */
1356
+ while (! P_RIGHTMOST (leftopaque ))
1357
+ {
1358
+ rightbuf = _bt_getbuf (rel , leftopaque -> btpo_next , BT_WRITE );
1359
+ rightpage = BufferGetPage (rightbuf );
1360
+ rightopaque = (BTPageOpaque ) PageGetSpecialPointer (rightpage );
1361
+
1362
+ /* Update LSN & StartUpID (see comments above) */
1363
+ HOLD_INTERRUPTS ();
1364
+ rightopaque -> btpo_parent = rootblk ;
1365
+ if (XLByteLT (PageGetLSN (rightpage ), rootLSN ))
1366
+ PageSetLSN (rightpage , rootLSN );
1367
+ PageSetSUI (rightpage , ThisStartUpID );
1368
+ RESUME_INTERRUPTS ();
1369
+
1370
+ ritem = (BTItem ) PageGetItem (leftpage , PageGetItemId (leftpage , P_HIKEY ));
1371
+ btitem = _bt_formitem (& (ritem -> bti_itup ));
1372
+ ItemPointerSet (& (btitem -> bti_itup .t_tid ), leftopaque -> btpo_next , P_HIKEY );
1373
+ itemsz = IndexTupleDSize (btitem -> bti_itup )
1374
+ + (sizeof (BTItemData ) - sizeof (IndexTupleData ));
1375
+ itemsz = MAXALIGN (itemsz );
1376
+
1377
+ newitemoff = OffsetNumberNext (PageGetMaxOffsetNumber (page ));
1378
+
1379
+ if (PageGetFreeSpace (page ) < itemsz )
1380
+ {
1381
+ Buffer newbuf ;
1382
+ OffsetNumber firstright ;
1383
+ OffsetNumber itup_off ;
1384
+ BlockNumber itup_blkno ;
1385
+ bool newitemonleft ;
1386
+
1387
+ firstright = _bt_findsplitloc (rel , page ,
1388
+ newitemoff , itemsz , & newitemonleft );
1389
+ newbuf = _bt_split (rel , buf , firstright ,
1390
+ newitemoff , itemsz , btitem , newitemonleft ,
1391
+ & itup_off , & itup_blkno );
1392
+ /* Keep lock on new "root" buffer ! */
1393
+ if (buf != rootbuf )
1394
+ _bt_relbuf (rel , buf , BT_WRITE );
1395
+ buf = newbuf ;
1396
+ page = BufferGetPage (buf );
1397
+ opaque = (BTPageOpaque ) PageGetSpecialPointer (page );
1398
+ }
1399
+ else
1400
+ _bt_insertuple (rel , buf , itemsz , btitem , newitemoff );
1401
+
1402
+ /* give up left buffer */
1403
+ _bt_relbuf (rel , leftbuf , BT_WRITE );
1404
+ leftbuf = rightbuf ;
1405
+ leftpage = rightpage ;
1406
+ leftopaque = rightopaque ;
1407
+ }
1408
+
1409
+ /* give up rightmost page buffer */
1410
+ _bt_relbuf (rel , leftbuf , BT_WRITE );
1411
+
1412
+ /*
1413
+ * Here we hold locks on old root buffer, new root buffer we've
1414
+ * created with _bt_newroot() - rootbuf, - and buf we've used
1415
+ * for last insert ops - buf. If rootbuf != buf then we have to
1416
+ * create at least one more level. And if "release" is TRUE
1417
+ * (ie we've already created some levels) then we give up
1418
+ * oldrootbuf.
1419
+ */
1420
+ if (release )
1421
+ _bt_relbuf (rel , oldrootbuf , BT_WRITE );
1422
+
1423
+ if (rootbuf != buf )
1424
+ {
1425
+ _bt_relbuf (rel , buf , BT_WRITE );
1426
+ return (_bt_fixroot (rel , rootbuf , true));
1427
+ }
1428
+
1429
+ return (rootbuf );
1267
1430
}
1268
1431
1269
1432
/*
0 commit comments