30
30
#include "utils/hsearch.h"
31
31
#include <utils/guc.h>
32
32
#include "utils/tqual.h"
33
+ #include "utils/builtins.h"
33
34
34
35
#define DTM_HASH_INIT_SIZE 1000000
35
36
#define INVALID_CID 0
@@ -83,7 +84,7 @@ typedef struct
83
84
#define DTM_TRACE (x )
84
85
/* #define DTM_TRACE(x) fprintf x */
85
86
86
- static shmem_startup_hook_type prev_shmem_startup_hook ;
87
+ // static shmem_startup_hook_type prev_shmem_startup_hook;
87
88
static HTAB * xid2status ;
88
89
static HTAB * gtid2xid ;
89
90
static DtmNodeState * local ;
@@ -126,7 +127,7 @@ void _PG_init(void);
126
127
void _PG_fini (void );
127
128
128
129
129
- static void dtm_shmem_startup (void );
130
+ // static void dtm_shmem_startup(void);
130
131
static void dtm_xact_callback (XactEvent event , void * arg );
131
132
static timestamp_t dtm_get_current_time ();
132
133
static void dtm_sleep (timestamp_t interval );
@@ -199,70 +200,6 @@ dtm_sync(cid_t global_cid)
199
200
return local_cid ;
200
201
}
201
202
202
- // void
203
- // _PG_init(void)
204
- // {
205
- // DTM_TRACE((stderr, "DTM_PG_init \n"));
206
-
207
- // /*
208
- // * In order to create our shared memory area, we have to be loaded via
209
- // * shared_preload_libraries. If not, fall out without hooking into any of
210
- // * the main system. (We don't throw error here because it seems useful to
211
- // * allow the pg_stat_statements functions to be created even when the
212
- // * module isn't active. The functions must protect themselves against
213
- // * being called then, however.)
214
- // */
215
- // if (!process_shared_preload_libraries_in_progress)
216
- // return;
217
-
218
- // RequestAddinShmemSpace(dtm_memsize());
219
-
220
- // DefineCustomIntVariable(
221
- // "dtm.vacuum_delay",
222
- // "Minimal age of records which can be vacuumed (seconds)",
223
- // NULL,
224
- // &DtmVacuumDelay,
225
- // 10,
226
- // 1,
227
- // INT_MAX,
228
- // PGC_BACKEND,
229
- // 0,
230
- // NULL,
231
- // NULL,
232
- // NULL
233
- // );
234
-
235
- // DefineCustomBoolVariable(
236
- // "dtm.record_commits",
237
- // "Store information about committed global transactions in pg_committed_xacts table",
238
- // NULL,
239
- // &DtmRecordCommits,
240
- // false,
241
- // PGC_BACKEND,
242
- // 0,
243
- // NULL,
244
- // NULL,
245
- // NULL
246
- // );
247
-
248
-
249
- // /*
250
- // * Install hooks.
251
- // */
252
- // prev_shmem_startup_hook = shmem_startup_hook;
253
- // shmem_startup_hook = dtm_shmem_startup;
254
- // }
255
-
256
- // /*
257
- // * Module unload callback
258
- // */
259
- // void
260
- // _PG_fini(void)
261
- // {
262
- // /* Uninstall hooks. */
263
- // shmem_startup_hook = prev_shmem_startup_hook;
264
- // }
265
-
266
203
/*
267
204
* Estimate shared memory space needed.
268
205
*/
@@ -277,23 +214,6 @@ GlobalSnapshotShmemSize(void)
277
214
return size ;
278
215
}
279
216
280
-
281
- /*
282
- * shmem_startup hook: allocate or attach to shared memory,
283
- * then load any pre-existing statistics from file.
284
- * Also create and load the query-texts file, which is expected to exist
285
- * (even if empty) while the module is enabled.
286
- */
287
- // static void
288
- // dtm_shmem_startup(void)
289
- // {
290
- // if (prev_shmem_startup_hook)
291
- // {
292
- // prev_shmem_startup_hook();
293
- // }
294
- // DtmInitialize();
295
- // }
296
-
297
217
static void
298
218
dtm_xact_callback (XactEvent event , void * arg )
299
219
{
@@ -332,20 +252,6 @@ dtm_xact_callback(XactEvent event, void *arg)
332
252
}
333
253
}
334
254
335
- /*
336
- * ***************************************************************************
337
- */
338
-
339
- // PG_MODULE_MAGIC;
340
-
341
- // PG_FUNCTION_INFO_V1(dtm_extend);
342
- // PG_FUNCTION_INFO_V1(dtm_access);
343
- // PG_FUNCTION_INFO_V1(dtm_begin_prepare);
344
- // PG_FUNCTION_INFO_V1(dtm_prepare);
345
- // PG_FUNCTION_INFO_V1(dtm_end_prepare);
346
- // PG_FUNCTION_INFO_V1(dtm_get_csn);
347
-
348
-
349
255
/*
350
256
* ***************************************************************************
351
257
*/
@@ -459,7 +365,7 @@ Snapshot
459
365
DtmGetSnapshot (Snapshot snapshot )
460
366
{
461
367
snapshot = PgGetSnapshotData (snapshot );
462
- // RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid(RecentGlobalDataXmin);
368
+ RecentGlobalDataXmin = RecentGlobalXmin = DtmAdjustOldestXid (RecentGlobalDataXmin );
463
369
return snapshot ;
464
370
}
465
371
@@ -468,7 +374,7 @@ DtmGetOldestXmin(Relation rel, int flags)
468
374
{
469
375
TransactionId xmin = PgGetOldestXmin (rel , flags );
470
376
471
- // xmin = DtmAdjustOldestXid(xmin);
377
+ xmin = DtmAdjustOldestXid (xmin );
472
378
return xmin ;
473
379
}
474
380
@@ -581,7 +487,8 @@ DtmLocalBegin(DtmCurrentTrans * x)
581
487
if (!TransactionIdIsValid (x -> xid ))
582
488
{
583
489
SpinLockAcquire (& local -> lock );
584
- x -> xid = GetCurrentTransactionIdIfAny ();
490
+ // x->xid = GetCurrentTransactionIdIfAny();
491
+ x -> xid = GetCurrentTransactionId ();
585
492
// Assert(TransactionIdIsValid(x->xid));
586
493
x -> cid = INVALID_CID ;
587
494
x -> is_global = false;
@@ -662,7 +569,6 @@ DtmLocalBeginPrepare(GlobalTransactionId gtid)
662
569
663
570
id = (DtmTransId * ) hash_search (gtid2xid , gtid , HASH_FIND , NULL );
664
571
Assert (id != NULL );
665
- id -> xid = GetCurrentTransactionId ();
666
572
Assert (TransactionIdIsValid (id -> xid ));
667
573
ts = (DtmTransStatus * ) hash_search (xid2status , & id -> xid , HASH_ENTER , NULL );
668
574
ts -> status = TRANSACTION_STATUS_IN_PROGRESS ;
@@ -706,7 +612,6 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
706
612
int i ;
707
613
708
614
id = (DtmTransId * ) hash_search (gtid2xid , gtid , HASH_FIND , NULL );
709
- Assert (id != NULL );
710
615
711
616
ts = (DtmTransStatus * ) hash_search (xid2status , & id -> xid , HASH_FIND , NULL );
712
617
Assert (ts != NULL );
@@ -748,6 +653,9 @@ DtmLocalEndPrepare(GlobalTransactionId gtid, cid_t cid)
748
653
void
749
654
DtmLocalCommitPrepared (DtmCurrentTrans * x )
750
655
{
656
+ // if (!x->is_global)
657
+ // return;
658
+
751
659
Assert (x -> gtid != NULL );
752
660
753
661
SpinLockAcquire (& local -> lock );
@@ -772,6 +680,9 @@ DtmLocalCommitPrepared(DtmCurrentTrans * x)
772
680
void
773
681
DtmLocalCommit (DtmCurrentTrans * x )
774
682
{
683
+ // if (!x->is_global)
684
+ // return;
685
+
775
686
SpinLockAcquire (& local -> lock );
776
687
if (TransactionIdIsValid (x -> xid ))
777
688
{
@@ -816,8 +727,8 @@ DtmLocalCommit(DtmCurrentTrans * x)
816
727
void
817
728
DtmLocalAbortPrepared (DtmCurrentTrans * x )
818
729
{
819
- if (x -> is_global )
820
- return ;
730
+ // if (! x->is_global)
731
+ // return;
821
732
822
733
Assert (x -> gtid != NULL );
823
734
@@ -826,15 +737,10 @@ DtmLocalAbortPrepared(DtmCurrentTrans * x)
826
737
DtmTransId * id = (DtmTransId * ) hash_search (gtid2xid , x -> gtid , HASH_REMOVE , NULL );
827
738
828
739
Assert (id != NULL );
829
-
830
- if (id != NULL )
831
- {
832
- x -> is_global = true;
833
- x -> is_prepared = true;
834
- x -> xid = id -> xid ;
835
- free (id -> subxids );
836
- }
837
-
740
+ x -> is_global = true;
741
+ x -> is_prepared = true;
742
+ x -> xid = id -> xid ;
743
+ free (id -> subxids );
838
744
DTM_TRACE ((stderr , "Global transaction %u(%s) is preaborted\n" , x -> xid , gtid ));
839
745
}
840
746
SpinLockRelease (& local -> lock );
@@ -846,8 +752,8 @@ DtmLocalAbortPrepared(DtmCurrentTrans * x)
846
752
void
847
753
DtmLocalAbort (DtmCurrentTrans * x )
848
754
{
849
- if (x -> is_global )
850
- return ;
755
+ // if (! x->is_global)
756
+ // return;
851
757
852
758
SpinLockAcquire (& local -> lock );
853
759
{
@@ -942,6 +848,8 @@ DtmGetCsn(TransactionId xid)
942
848
void
943
849
DtmLocalSavePreparedState (DtmCurrentTrans * x )
944
850
{
851
+ // x->is_prepared = true;
852
+
945
853
if (x -> gtid [0 ])
946
854
{
947
855
SpinLockAcquire (& local -> lock );
@@ -999,7 +907,7 @@ DtmAddSubtransactions(DtmTransStatus * ts, TransactionId *subxids, int nSubxids)
999
907
Datum
1000
908
pg_global_snaphot_create (PG_FUNCTION_ARGS )
1001
909
{
1002
- GlobalTransactionId gtid = PG_GETARG_CSTRING ( 0 );
910
+ GlobalTransactionId gtid = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) );
1003
911
cid_t cid = DtmLocalExtend (& dtm_tx , gtid );
1004
912
1005
913
DTM_TRACE ((stderr , "Backend %d extends transaction %u(%s) to global with cid=%lu\n" , getpid (), dtm_tx .xid , gtid , cid ));
@@ -1010,7 +918,7 @@ Datum
1010
918
pg_global_snaphot_join (PG_FUNCTION_ARGS )
1011
919
{
1012
920
cid_t cid = PG_GETARG_INT64 (0 );
1013
- GlobalTransactionId gtid = PG_GETARG_CSTRING ( 1 );
921
+ GlobalTransactionId gtid = text_to_cstring ( PG_GETARG_TEXT_PP ( 1 ) );
1014
922
1015
923
DTM_TRACE ((stderr , "Backend %d joins transaction %u(%s) with cid=%lu\n" , getpid (), dtm_tx .xid , gtid , cid ));
1016
924
cid = DtmLocalAccess (& dtm_tx , gtid , cid );
@@ -1020,7 +928,7 @@ pg_global_snaphot_join(PG_FUNCTION_ARGS)
1020
928
Datum
1021
929
pg_global_snaphot_begin_prepare (PG_FUNCTION_ARGS )
1022
930
{
1023
- GlobalTransactionId gtid = PG_GETARG_CSTRING ( 0 );
931
+ GlobalTransactionId gtid = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) );
1024
932
1025
933
DtmLocalBeginPrepare (gtid );
1026
934
DTM_TRACE ((stderr , "Backend %d begins prepare of transaction %s\n" , getpid (), gtid ));
@@ -1030,7 +938,7 @@ pg_global_snaphot_begin_prepare(PG_FUNCTION_ARGS)
1030
938
Datum
1031
939
pg_global_snaphot_prepare (PG_FUNCTION_ARGS )
1032
940
{
1033
- GlobalTransactionId gtid = PG_GETARG_CSTRING ( 0 );
941
+ GlobalTransactionId gtid = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) );
1034
942
cid_t cid = PG_GETARG_INT64 (1 );
1035
943
1036
944
cid = DtmLocalPrepare (gtid , cid );
@@ -1041,7 +949,7 @@ pg_global_snaphot_prepare(PG_FUNCTION_ARGS)
1041
949
Datum
1042
950
pg_global_snaphot_end_prepare (PG_FUNCTION_ARGS )
1043
951
{
1044
- GlobalTransactionId gtid = PG_GETARG_CSTRING ( 0 );
952
+ GlobalTransactionId gtid = text_to_cstring ( PG_GETARG_TEXT_PP ( 0 ) );
1045
953
cid_t cid = PG_GETARG_INT64 (1 );
1046
954
1047
955
DTM_TRACE ((stderr , "Backend %d ends prepare of transactions %s with cid=%lu\n" , getpid (), gtid , cid ));
0 commit comments