@@ -443,11 +443,12 @@ free_cps_slot(int code, Datum arg)
443
443
void
444
444
bgw_main_concurrent_part (Datum main_arg )
445
445
{
446
- int rows ;
446
+ ConcurrentPartSlot * part_slot ;
447
+ char * sql = NULL ;
448
+ int64 rows ;
447
449
bool failed ;
448
450
int failures_count = 0 ;
449
- char * sql = NULL ;
450
- ConcurrentPartSlot * part_slot ;
451
+ LOCKMODE lockmode = RowExclusiveLock ;
451
452
452
453
/* Update concurrent part slot */
453
454
part_slot = & concurrent_part_slots [DatumGetInt32 (main_arg )];
@@ -479,12 +480,14 @@ bgw_main_concurrent_part(Datum main_arg)
479
480
/* Do the job */
480
481
do
481
482
{
482
- MemoryContext old_mcxt ;
483
+ MemoryContext old_mcxt ;
483
484
484
485
Oid types [2 ] = { OIDOID , INT4OID };
485
486
Datum vals [2 ] = { part_slot -> relid , part_slot -> batch_size };
486
487
bool nulls [2 ] = { false, false };
487
488
489
+ bool rel_locked = false;
490
+
488
491
/* Reset loop variables */
489
492
failed = false;
490
493
rows = 0 ;
@@ -520,66 +523,92 @@ bgw_main_concurrent_part(Datum main_arg)
520
523
/* Exec ret = _partition_data_concurrent() */
521
524
PG_TRY ();
522
525
{
523
- /* Make sure that relation exists and has partitions */
524
- if (SearchSysCacheExists1 (RELOID , ObjectIdGetDatum (part_slot -> relid )) &&
525
- get_pathman_relation_info (part_slot -> relid ) != NULL )
526
- {
527
- int ret ;
528
- bool isnull ;
526
+ int ret ;
527
+ bool isnull ;
529
528
530
- ret = SPI_execute_with_args ( sql , 2 , types , vals , nulls , false, 0 );
531
- if (ret == SPI_OK_SELECT )
532
- {
533
- TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
534
- HeapTuple tuple = SPI_tuptable -> vals [ 0 ];
529
+ /* Lock relation for DELETE and INSERT */
530
+ if (! ConditionalLockRelationOid ( part_slot -> relid , lockmode ) )
531
+ {
532
+ elog ( ERROR , "could not take lock on relation %u" , part_slot -> relid ) ;
533
+ }
535
534
536
- Assert (SPI_processed == 1 ); /* there should be 1 result at most */
535
+ /* Great, now relation is locked */
536
+ rel_locked = true;
537
+ (void ) rel_locked ; /* mute clang analyzer */
537
538
538
- rows = DatumGetInt32 (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
539
+ /* Make sure that relation exists */
540
+ if (!SearchSysCacheExists1 (RELOID , ObjectIdGetDatum (part_slot -> relid )))
541
+ {
542
+ /* Exit after we raise ERROR */
543
+ failures_count = PART_WORKER_MAX_ATTEMPTS ;
544
+ (void ) failures_count ; /* mute clang analyzer */
539
545
540
- Assert (!isnull ); /* ... and ofc it must not be NULL */
541
- }
546
+ elog (ERROR , "relation %u does not exist" , part_slot -> relid );
542
547
}
543
- /* Otherwise it's time to exit */
544
- else
548
+
549
+ /* Make sure that relation has partitions */
550
+ if (get_pathman_relation_info (part_slot -> relid ) == NULL )
545
551
{
552
+ /* Exit after we raise ERROR */
546
553
failures_count = PART_WORKER_MAX_ATTEMPTS ;
554
+ (void ) failures_count ; /* mute clang analyzer */
555
+
556
+ elog (ERROR , "relation \"%s\" is not partitioned" ,
557
+ get_rel_name (part_slot -> relid ));
558
+ }
559
+
560
+ /* Call concurrent partitioning function */
561
+ ret = SPI_execute_with_args (sql , 2 , types , vals , nulls , false, 0 );
562
+ if (ret == SPI_OK_SELECT )
563
+ {
564
+ TupleDesc tupdesc = SPI_tuptable -> tupdesc ;
565
+ HeapTuple tuple = SPI_tuptable -> vals [0 ];
547
566
548
- elog (LOG , "relation \"%u\" is not partitioned (or does not exist)" ,
549
- part_slot -> relid );
567
+ /* There should be 1 result at most */
568
+ Assert (SPI_processed == 1 );
569
+
570
+ /* Extract number of processed rows */
571
+ rows = DatumGetInt64 (SPI_getbinval (tuple , tupdesc , 1 , & isnull ));
572
+ Assert (!isnull ); /* ... and ofc it must not be NULL */
550
573
}
574
+ /* Else raise generic error */
575
+ else elog (ERROR , "partitioning function returned %u" , ret );
576
+
577
+ /* Finally, unlock our partitioned table */
578
+ UnlockRelationOid (part_slot -> relid , lockmode );
551
579
}
552
580
PG_CATCH ();
553
581
{
554
582
/*
555
583
* The most common exception we can catch here is a deadlock with
556
584
* concurrent user queries. Check that attempts count doesn't exceed
557
- * some reasonable value
585
+ * some reasonable value.
558
586
*/
559
- ErrorData * error ;
560
- char * sleep_time_str ;
587
+ ErrorData * error ;
588
+
589
+ /* Unlock relation if we caught ERROR too early */
590
+ if (rel_locked )
591
+ UnlockRelationOid (part_slot -> relid , lockmode );
592
+
593
+ /* Increase number of failures and set 'failed' status */
594
+ failures_count ++ ;
595
+ failed = true;
561
596
562
597
/* Switch to the original context & copy edata */
563
598
MemoryContextSwitchTo (old_mcxt );
564
599
error = CopyErrorData ();
565
600
FlushErrorState ();
566
601
567
602
/* Print messsage for this BGWorker to server log */
568
- sleep_time_str = datum_to_cstring (Float8GetDatum (part_slot -> sleep_time ),
569
- FLOAT8OID );
570
- failures_count ++ ;
571
603
ereport (LOG ,
572
604
(errmsg ("%s: %s" , concurrent_part_bgw , error -> message ),
573
- errdetail ("attempt: %d/%d, sleep time: %s " ,
605
+ errdetail ("attempt: %d/%d, sleep time: %.2f " ,
574
606
failures_count ,
575
607
PART_WORKER_MAX_ATTEMPTS ,
576
- sleep_time_str )));
577
- pfree (sleep_time_str ); /* free the time string */
608
+ (float ) part_slot -> sleep_time )));
578
609
610
+ /* Finally, free error data */
579
611
FreeErrorData (error );
580
-
581
- /* Set 'failed' flag */
582
- failed = true;
583
612
}
584
613
PG_END_TRY ();
585
614
@@ -606,9 +635,10 @@ bgw_main_concurrent_part(Datum main_arg)
606
635
/* Failed this time, wait */
607
636
else if (failed )
608
637
{
609
- /* Abort transaction and sleep for a second */
638
+ /* Abort transaction */
610
639
AbortCurrentTransaction ();
611
640
641
+ /* Sleep for a specified amount of time (default 1s) */
612
642
DirectFunctionCall1 (pg_sleep , Float8GetDatum (part_slot -> sleep_time ));
613
643
}
614
644
@@ -626,8 +656,10 @@ bgw_main_concurrent_part(Datum main_arg)
626
656
627
657
#ifdef USE_ASSERT_CHECKING
628
658
/* Report debug message */
629
- elog (DEBUG1 , "%s: relocated %d rows, total: " UINT64_FORMAT " [%u]" ,
630
- concurrent_part_bgw , rows , part_slot -> total_rows , MyProcPid );
659
+ elog (DEBUG1 , "%s: "
660
+ "relocated" INT64_FORMAT "rows, "
661
+ "total: " INT64_FORMAT ,
662
+ concurrent_part_bgw , rows , part_slot -> total_rows );
631
663
#endif
632
664
}
633
665
@@ -636,9 +668,6 @@ bgw_main_concurrent_part(Datum main_arg)
636
668
break ;
637
669
}
638
670
while (rows > 0 || failed ); /* do while there's still rows to be relocated */
639
-
640
- /* Reclaim the resources */
641
- pfree (sql );
642
671
}
643
672
644
673
@@ -824,26 +853,33 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
824
853
/* Iterate through worker slots */
825
854
for (i = userctx -> cur_idx ; i < PART_WORKER_SLOTS ; i ++ )
826
855
{
827
- ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ];
856
+ ConcurrentPartSlot * cur_slot = & concurrent_part_slots [i ],
857
+ slot_copy ;
828
858
HeapTuple htup = NULL ;
829
859
830
- HOLD_INTERRUPTS ();
860
+ /* Copy slot to process local memory */
831
861
SpinLockAcquire (& cur_slot -> mutex );
862
+ memcpy (& slot_copy , cur_slot , sizeof (ConcurrentPartSlot ));
863
+ SpinLockRelease (& cur_slot -> mutex );
832
864
833
- if (cur_slot -> worker_status != CPS_FREE )
865
+ if (slot_copy . worker_status != CPS_FREE )
834
866
{
835
867
Datum values [Natts_pathman_cp_tasks ];
836
868
bool isnull [Natts_pathman_cp_tasks ] = { 0 };
837
869
838
- values [Anum_pathman_cp_tasks_userid - 1 ] = cur_slot -> userid ;
839
- values [Anum_pathman_cp_tasks_pid - 1 ] = cur_slot -> pid ;
840
- values [Anum_pathman_cp_tasks_dbid - 1 ] = cur_slot -> dbid ;
841
- values [Anum_pathman_cp_tasks_relid - 1 ] = cur_slot -> relid ;
842
- values [Anum_pathman_cp_tasks_processed - 1 ] = cur_slot -> total_rows ;
870
+ values [Anum_pathman_cp_tasks_userid - 1 ] = slot_copy .userid ;
871
+ values [Anum_pathman_cp_tasks_pid - 1 ] = slot_copy .pid ;
872
+ values [Anum_pathman_cp_tasks_dbid - 1 ] = slot_copy .dbid ;
873
+ values [Anum_pathman_cp_tasks_relid - 1 ] = slot_copy .relid ;
874
+
875
+ /* Record processed rows */
876
+ values [Anum_pathman_cp_tasks_processed - 1 ] =
877
+ /* FIXME: use Int64GetDatum() in release 1.5 */
878
+ Int32GetDatum ((int32 ) slot_copy .total_rows );
843
879
844
880
/* Now build a status string */
845
881
values [Anum_pathman_cp_tasks_status - 1 ] =
846
- CStringGetTextDatum (cps_print_status (cur_slot -> worker_status ));
882
+ CStringGetTextDatum (cps_print_status (slot_copy . worker_status ));
847
883
848
884
/* Form output tuple */
849
885
htup = heap_form_tuple (funcctx -> tuple_desc , values , isnull );
@@ -852,9 +888,6 @@ show_concurrent_part_tasks_internal(PG_FUNCTION_ARGS)
852
888
userctx -> cur_idx = i + 1 ;
853
889
}
854
890
855
- SpinLockRelease (& cur_slot -> mutex );
856
- RESUME_INTERRUPTS ();
857
-
858
891
/* Return tuple if needed */
859
892
if (htup )
860
893
SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (htup ));
0 commit comments