@@ -50,10 +50,11 @@ extern void _PG_fini(void);
50
50
extern void lsm3_merger_main (Datum arg );
51
51
52
52
/* Lsm3 dictionary (hashtable with control data for all indexes) */
53
- static Lsm3DictEntry * Lsm3Entry ;
54
53
static HTAB * Lsm3Dict ;
55
54
static LWLock * Lsm3DictLock ;
56
55
static List * Lsm3ReleasedLocks ;
56
+ static List * Lsm3Entries ;
57
+ static bool Lsm3InsideCopy ;
57
58
58
59
/* Kind of relation optioms for Lsm3 index */
59
60
static relopt_kind Lsm3ReloptKind ;
@@ -423,11 +424,20 @@ lsm3_build(Relation heap, Relation index, IndexInfo *indexInfo)
423
424
Oid save_am = index -> rd_rel -> relam ;
424
425
IndexBuildResult * result ;
425
426
bool found ;
426
- LWLockAcquire (Lsm3DictLock , LW_EXCLUSIVE ); /* Obtain exclusive lock on dictionary: it will be released in utility hook */
427
- Lsm3Entry = hash_search (Lsm3Dict , & RelationGetRelid (index ), HASH_ENTER , & found ); /* Setting Lsm3Entry indicates to utility hook that Lsm3 index was created */
427
+ Lsm3DictEntry * entry ;
428
+ if (Lsm3Entries == NULL ) { // multiple indexes can be rebuilt by truncate
429
+ LWLockAcquire (Lsm3DictLock , LW_EXCLUSIVE ); /* Obtain exclusive lock on dictionary: it will be released in utility hook */
430
+ }
431
+ elog (LOG , "lsm3_build %s" , index -> rd_rel -> relname .data );
432
+ entry = hash_search (Lsm3Dict , & RelationGetRelid (index ), HASH_ENTER , & found ); /* Setting Lsm3Entry indicates to utility hook that Lsm3 index was created */
428
433
if (!found )
429
434
{
430
- lsm3_init_entry (Lsm3Entry , index );
435
+ lsm3_init_entry (entry , index );
436
+ }
437
+ {
438
+ MemoryContext old_context = MemoryContextSwitchTo (TopMemoryContext );
439
+ Lsm3Entries = lappend (Lsm3Entries , entry );
440
+ MemoryContextSwitchTo (old_context );
431
441
}
432
442
index -> rd_rel -> relam = BTREE_AM_OID ;
433
443
result = btbuild (heap , index , indexInfo );
@@ -436,6 +446,25 @@ lsm3_build(Relation heap, Relation index, IndexInfo *indexInfo)
436
446
return result ;
437
447
}
438
448
449
+ /*
450
+ * Grab previously release self locks (to let merger to proceed).
451
+ */
452
+ static void
453
+ lsm3_reacquire_locks (void )
454
+ {
455
+ if (Lsm3ReleasedLocks )
456
+ {
457
+ ListCell * cell ;
458
+ foreach (cell , Lsm3ReleasedLocks )
459
+ {
460
+ Oid indexOid = lfirst_oid (cell );
461
+ LockRelationOid (indexOid , RowExclusiveLock );
462
+ }
463
+ list_free (Lsm3ReleasedLocks );
464
+ Lsm3ReleasedLocks = NULL ;
465
+ }
466
+ }
467
+
439
468
/* Insert in active top index, on overflow swap active indexes and initiate merge to base index */
440
469
static bool
441
470
lsm3_insert (Relation rel , Datum * values , bool * isnull ,
@@ -475,7 +504,7 @@ lsm3_insert(Relation rel, Datum *values, bool *isnull,
475
504
index -> rd_rel -> relam = save_am ;
476
505
477
506
overflow = !entry -> merge_in_progress /* do not check for overflow if merge was already initiated */
478
- && (entry -> n_inserts % LSM3_CHECK_TOP_INDEX_SIZE_PERIOD ) == 0 /* perform check only each N-th insert */
507
+ && (entry -> n_inserts % LSM3_CHECK_TOP_INDEX_SIZE_PERIOD ) == 0 /* perform check only each N-th insert */
479
508
&& RelationGetNumberOfBlocks (index )* (BLCKSZ /1024 ) > top_index_size ;
480
509
481
510
SpinLockAcquire (& entry -> spinlock );
@@ -499,8 +528,18 @@ lsm3_insert(Relation rel, Datum *values, bool *isnull,
499
528
/* Holding lock on non-ative index prevent merger bgworker from truncation this index */
500
529
if (LockHeldByMe (& tag , RowExclusiveLock ))
501
530
{
502
- LockRelease (& tag , RowExclusiveLock , false);
503
- Lsm3ReleasedLocks = lappend_oid (Lsm3ReleasedLocks , entry -> top [1 - active_index ]);
531
+ /* Copy locks all indexes and hold this locks until end of copy.
532
+ * We can not just release lock, because otherwise CopyFrom produces
533
+ * "you don't own a lock of type" warning.
534
+ * So just try to periodically release this lock and let merger grab it.
535
+ */
536
+ if (!Lsm3InsideCopy ||
537
+ (entry -> n_inserts % LSM3_CHECK_TOP_INDEX_SIZE_PERIOD ) == 0 ) /* release lock only each N-th insert */
538
+
539
+ {
540
+ LockRelease (& tag , RowExclusiveLock , false);
541
+ Lsm3ReleasedLocks = lappend_oid (Lsm3ReleasedLocks , entry -> top [1 - active_index ]);
542
+ }
504
543
}
505
544
506
545
/* If all inserts in previous active index are completed then we can start merge */
@@ -516,6 +555,12 @@ lsm3_insert(Relation rel, Datum *values, bool *isnull,
516
555
}
517
556
SpinLockRelease (& entry -> spinlock );
518
557
558
+ /* We have to require released locks because othervise CopyFrom will produce warning */
559
+ if (Lsm3InsideCopy && Lsm3ReleasedLocks )
560
+ {
561
+ pg_usleep (1 ); /* give merge thread a chance to grab the lock before we require it */
562
+ lsm3_reacquire_locks ();
563
+ }
519
564
return false;
520
565
}
521
566
@@ -849,7 +894,8 @@ lsm3_process_utility(PlannedStmt *plannedStmt,
849
894
List * drop_oids = NULL ;
850
895
ListCell * cell ;
851
896
852
- Lsm3Entry = NULL ; /* Reset entry to check it after utility statement execution */
897
+ Lsm3Entries = NULL ; /* Reset entry to check it after utility statement execution */
898
+ Lsm3InsideCopy = false;
853
899
if (IsA (parseTree , DropStmt ))
854
900
{
855
901
drop = (DropStmt * )parseTree ;
@@ -880,6 +926,10 @@ lsm3_process_utility(PlannedStmt *plannedStmt,
880
926
}
881
927
}
882
928
}
929
+ else if (IsA (parseTree , CopyStmt ))
930
+ {
931
+ Lsm3InsideCopy = true;
932
+ }
883
933
884
934
(PreviousProcessUtilityHook ? PreviousProcessUtilityHook : standard_ProcessUtility )
885
935
(plannedStmt ,
@@ -890,23 +940,26 @@ lsm3_process_utility(PlannedStmt *plannedStmt,
890
940
destReceiver ,
891
941
completionTag );
892
942
893
- if (Lsm3Entry )
943
+ if (Lsm3Entries )
894
944
{
895
- if ( IsA ( parseTree , IndexStmt )) /* This is Lsm3 creation statement */
945
+ foreach ( cell , Lsm3Entries )
896
946
{
897
- IndexStmt * stmt = (IndexStmt * )parseTree ;
898
- char * originIndexName = stmt -> idxname ;
899
- char * originAccessMethod = stmt -> accessMethod ;
900
-
901
- for (int i = 0 ; i < 2 ; i ++ )
947
+ Lsm3DictEntry * entry = (Lsm3DictEntry * )lfirst (cell );
948
+ if (IsA (parseTree , IndexStmt )) /* This is Lsm3 creation statement */
902
949
{
903
- if (stmt -> concurrent )
950
+ IndexStmt * stmt = (IndexStmt * )parseTree ;
951
+ char * originIndexName = stmt -> idxname ;
952
+ char * originAccessMethod = stmt -> accessMethod ;
953
+
954
+ for (int i = 0 ; i < 2 ; i ++ )
904
955
{
905
- PushActiveSnapshot (GetTransactionSnapshot ());
906
- }
907
- stmt -> accessMethod = "lsm3_btree_wrapper" ;
908
- stmt -> idxname = psprintf ("%s_top%d" , get_rel_name (Lsm3Entry -> base ), i );
909
- Lsm3Entry -> top [i ] = DefineIndex (Lsm3Entry -> heap ,
956
+ if (stmt -> concurrent )
957
+ {
958
+ PushActiveSnapshot (GetTransactionSnapshot ());
959
+ }
960
+ stmt -> accessMethod = "lsm3_btree_wrapper" ;
961
+ stmt -> idxname = psprintf ("%s_top%d" , get_rel_name (entry -> base ), i );
962
+ entry -> top [i ] = DefineIndex (entry -> heap ,
910
963
stmt ,
911
964
InvalidOid ,
912
965
InvalidOid ,
@@ -916,36 +969,39 @@ lsm3_process_utility(PlannedStmt *plannedStmt,
916
969
false,
917
970
false,
918
971
true).objectId ;
972
+ }
973
+ stmt -> accessMethod = originAccessMethod ;
974
+ stmt -> idxname = originIndexName ;
919
975
}
920
- stmt -> accessMethod = originAccessMethod ;
921
- stmt -> idxname = originIndexName ;
922
- }
923
- else
924
- {
925
- for (int i = 0 ; i < 2 ; i ++ )
976
+ else
926
977
{
927
- if ( Lsm3Entry -> top [ i ] == InvalidOid )
978
+ for ( int i = 0 ; i < 2 ; i ++ )
928
979
{
929
- char * topidxname = psprintf ("%s_top%d" , get_rel_name (Lsm3Entry -> base ), i );
930
- Lsm3Entry -> top [i ] = get_relname_relid (topidxname , get_rel_namespace (Lsm3Entry -> base ));
931
- if (Lsm3Entry -> top [i ] == InvalidOid )
980
+ if (entry -> top [i ] == InvalidOid )
932
981
{
933
- elog (ERROR , "Lsm3: failed to lookup %s index" , topidxname );
982
+ char * topidxname = psprintf ("%s_top%d" , get_rel_name (entry -> base ), i );
983
+ entry -> top [i ] = get_relname_relid (topidxname , get_rel_namespace (entry -> base ));
984
+ if (entry -> top [i ] == InvalidOid )
985
+ {
986
+ elog (ERROR , "Lsm3: failed to lookup %s index" , topidxname );
987
+ }
934
988
}
935
989
}
936
990
}
991
+ if (ActiveSnapshotSet ())
992
+ {
993
+ PopActiveSnapshot ();
994
+ }
995
+ CommitTransactionCommand ();
996
+ StartTransactionCommand ();
997
+ /* Mark top index as invalid to prevent planner from using it in queries */
998
+ for (int i = 0 ; i < 2 ; i ++ )
999
+ {
1000
+ index_set_state_flags (entry -> top [i ], INDEX_DROP_CLEAR_VALID );
1001
+ }
937
1002
}
938
- if (ActiveSnapshotSet ())
939
- {
940
- PopActiveSnapshot ();
941
- }
942
- CommitTransactionCommand ();
943
- StartTransactionCommand ();
944
- /* Mark top index as invalid to prevent planner from using it in queries */
945
- for (int i = 0 ; i < 2 ; i ++ )
946
- {
947
- index_set_state_flags (Lsm3Entry -> top [i ], INDEX_DROP_CLEAR_VALID );
948
- }
1003
+ list_free (Lsm3Entries );
1004
+ Lsm3Entries = NULL ;
949
1005
LWLockRelease (Lsm3DictLock ); /* Release lock set by lsm3_build */
950
1006
}
951
1007
else if (drop_objects )
@@ -961,23 +1017,14 @@ lsm3_process_utility(PlannedStmt *plannedStmt,
961
1017
}
962
1018
963
1019
/*
964
- * Executor sinish hookto reclaim released locks on non-active top indexes
1020
+ * Executor finish hook to reclaim released locks on non-active top indexes
965
1021
* to avoid "you don't own a lock of type RowExclusiveLock" warning
966
1022
*/
967
1023
static void
968
1024
lsm3_executor_finish (QueryDesc * queryDesc )
969
1025
{
970
- if (Lsm3ReleasedLocks )
971
- {
972
- ListCell * cell ;
973
- foreach (cell , Lsm3ReleasedLocks )
974
- {
975
- Oid indexOid = lfirst_oid (cell );
976
- LockRelationOid (indexOid , RowExclusiveLock );
977
- }
978
- list_free (Lsm3ReleasedLocks );
979
- Lsm3ReleasedLocks = NULL ;
980
- }
1026
+ lsm3_reacquire_locks ();
1027
+ Lsm3InsideCopy = false;
981
1028
if (PreviousExecutorFinish )
982
1029
PreviousExecutorFinish (queryDesc );
983
1030
else
0 commit comments