17
17
#include "access/htup_details.h"
18
18
#include "catalog/pg_user_mapping.h"
19
19
#include "access/xact.h"
20
+ #include "access/xtm.h"
21
+ #include "access/transam.h"
20
22
#include "mb/pg_wchar.h"
21
23
#include "miscadmin.h"
22
24
#include "pgstat.h"
@@ -72,12 +74,18 @@ static unsigned int prep_stmt_number = 0;
72
74
/* tracks whether any work is needed in callback functions */
73
75
static bool xact_got_connection = false;
74
76
77
+ typedef long long csn_t ;
78
+ static csn_t currentGlobalTransactionId = 0 ;
79
+ static int currentLocalTransactionId = 0 ;
80
+ static PGconn * currentConnection = NULL ;
81
+
75
82
/* prototypes of private functions */
76
83
static PGconn * connect_pg_server (ForeignServer * server , UserMapping * user );
77
84
static void disconnect_pg_server (ConnCacheEntry * entry );
78
85
static void check_conn_params (const char * * keywords , const char * * values );
79
86
static void configure_remote_session (PGconn * conn );
80
87
static void do_sql_command (PGconn * conn , const char * sql );
88
+ static void do_sql_send_command (PGconn * conn , const char * sql );
81
89
static void begin_remote_xact (ConnCacheEntry * entry );
82
90
static void pgfdw_xact_callback (XactEvent event , void * arg );
83
91
static void pgfdw_subxact_callback (SubXactEvent event ,
@@ -403,6 +411,19 @@ do_sql_command(PGconn *conn, const char *sql)
403
411
PQclear (res );
404
412
}
405
413
414
+ static void
415
+ do_sql_send_command (PGconn * conn , const char * sql )
416
+ {
417
+ if (PQsendQuery (conn , sql ) != PGRES_COMMAND_OK )
418
+ {
419
+ PGresult * res = PQgetResult (conn );
420
+
421
+ elog (WARNING , "Failed to send command %s" , sql );
422
+ pgfdw_report_error (ERROR , res , conn , true, sql );
423
+ PQclear (res );
424
+ }
425
+ }
426
+
406
427
/*
407
428
* Start remote transaction or subtransaction, if needed.
408
429
*
@@ -417,15 +438,26 @@ static void
417
438
begin_remote_xact (ConnCacheEntry * entry )
418
439
{
419
440
int curlevel = GetCurrentTransactionNestLevel ();
441
+ PGresult * res ;
442
+
420
443
421
444
/* Start main transaction if we haven't yet */
422
445
if (entry -> xact_depth <= 0 )
423
446
{
447
+ TransactionId gxid = GetTransactionManager ()-> GetGlobalTransactionId ();
424
448
const char * sql ;
425
449
426
450
elog (DEBUG3 , "starting remote transaction on connection %p" ,
427
451
entry -> conn );
428
452
453
+ if (TransactionIdIsValid (gxid ))
454
+ {
455
+ char stmt [64 ];
456
+ snprintf (stmt , sizeof (stmt ), "select public.dtm_join_transaction(%d)" , gxid );
457
+ res = PQexec (entry -> conn , stmt );
458
+ PQclear (res );
459
+ }
460
+
429
461
if (IsolationIsSerializable ())
430
462
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE" ;
431
463
else
@@ -434,6 +466,41 @@ begin_remote_xact(ConnCacheEntry *entry)
434
466
do_sql_command (entry -> conn , sql );
435
467
entry -> xact_depth = 1 ;
436
468
entry -> changing_xact_state = false;
469
+
470
+ if (UseTsDtmTransactions )
471
+ {
472
+ if (currentConnection == NULL )
473
+ {
474
+ currentConnection = entry -> conn ;
475
+ }
476
+ else if (entry -> conn != currentConnection )
477
+ {
478
+ if (!currentGlobalTransactionId )
479
+ {
480
+ char * resp ;
481
+ res = PQexec (currentConnection , psprintf ("SELECT public.dtm_extend('%d.%d')" ,
482
+ MyProcPid , ++ currentLocalTransactionId ));
483
+
484
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
485
+ {
486
+ pgfdw_report_error (ERROR , res , currentConnection , true, sql );
487
+ }
488
+ resp = PQgetvalue (res , 0 , 0 );
489
+ if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & currentGlobalTransactionId ) != 1 )
490
+ {
491
+ pgfdw_report_error (ERROR , res , currentConnection , true, sql );
492
+ }
493
+ PQclear (res );
494
+ }
495
+ res = PQexec (entry -> conn , psprintf ("SELECT public.dtm_access(%llu, '%d.%d')" , currentGlobalTransactionId , MyProcPid , currentLocalTransactionId ));
496
+
497
+ if (PQresultStatus (res ) != PGRES_TUPLES_OK )
498
+ {
499
+ pgfdw_report_error (ERROR , res , entry -> conn , true, sql );
500
+ }
501
+ PQclear (res );
502
+ }
503
+ }
437
504
}
438
505
439
506
/*
@@ -643,6 +710,78 @@ pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
643
710
PQclear (res );
644
711
}
645
712
713
+ typedef bool (* DtmCommandResultHandler ) (PGresult * result , void * arg );
714
+
715
+ static bool
716
+ RunDtmStatement (char const * sql , unsigned expectedStatus , DtmCommandResultHandler handler , void * arg )
717
+ {
718
+ HASH_SEQ_STATUS scan ;
719
+ ConnCacheEntry * entry ;
720
+ bool allOk = true;
721
+
722
+ hash_seq_init (& scan , ConnectionHash );
723
+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
724
+ {
725
+ if (entry -> xact_depth > 0 )
726
+ {
727
+ do_sql_send_command (entry -> conn , sql );
728
+ }
729
+ }
730
+
731
+ hash_seq_init (& scan , ConnectionHash );
732
+ while ((entry = (ConnCacheEntry * ) hash_seq_search (& scan )))
733
+ {
734
+ if (entry -> xact_depth > 0 )
735
+ {
736
+ PGresult * result = PQgetResult (entry -> conn );
737
+
738
+ if (PQresultStatus (result ) != expectedStatus || (handler && !handler (result , arg )))
739
+ {
740
+ elog (WARNING , "Failed command %s: status=%d, expected status=%d" , sql , PQresultStatus (result ), expectedStatus );
741
+ pgfdw_report_error (ERROR , result , entry -> conn , true, sql );
742
+ allOk = false;
743
+ }
744
+ PQclear (result );
745
+ PQgetResult (entry -> conn ); /* consume NULL result */
746
+ }
747
+ }
748
+ return allOk ;
749
+ }
750
+
751
+ static bool
752
+ RunDtmCommand (char const * sql )
753
+ {
754
+ return RunDtmStatement (sql , PGRES_COMMAND_OK , NULL , NULL );
755
+ }
756
+
757
+ static bool
758
+ RunDtmFunction (char const * sql )
759
+ {
760
+ return RunDtmStatement (sql , PGRES_TUPLES_OK , NULL , NULL );
761
+ }
762
+
763
+
764
+ static bool
765
+ DtmMaxCSN (PGresult * result , void * arg )
766
+ {
767
+ char * resp = PQgetvalue (result , 0 , 0 );
768
+ csn_t * maxCSN = (csn_t * ) arg ;
769
+ csn_t csn = 0 ;
770
+
771
+ if (resp == NULL || (* resp ) == '\0' || sscanf (resp , "%lld" , & csn ) != 1 )
772
+ {
773
+ return false;
774
+ }
775
+ else
776
+ {
777
+ if (* maxCSN < csn )
778
+ {
779
+ * maxCSN = csn ;
780
+ }
781
+ return true;
782
+ }
783
+ }
784
+
646
785
/*
647
786
* pgfdw_xact_callback --- cleanup at main-transaction end.
648
787
*/
@@ -652,10 +791,55 @@ pgfdw_xact_callback(XactEvent event, void *arg)
652
791
HASH_SEQ_STATUS scan ;
653
792
ConnCacheEntry * entry ;
654
793
794
+ /* Do nothing for this events */
795
+ switch (event )
796
+ {
797
+ case XACT_EVENT_START :
798
+ case XACT_EVENT_COMMIT_PREPARED :
799
+ case XACT_EVENT_ABORT_PREPARED :
800
+ return ;
801
+ default :
802
+ break ;
803
+ }
804
+
655
805
/* Quick exit if no connections were touched in this transaction. */
656
806
if (!xact_got_connection )
657
807
return ;
658
808
809
+ if (currentGlobalTransactionId != 0 )
810
+ {
811
+ switch (event )
812
+ {
813
+ case XACT_EVENT_PARALLEL_PRE_COMMIT :
814
+ case XACT_EVENT_PRE_COMMIT :
815
+ {
816
+ csn_t maxCSN = 0 ;
817
+
818
+ if (!RunDtmCommand (psprintf ("PREPARE TRANSACTION '%d.%d'" ,
819
+ MyProcPid , currentLocalTransactionId )) ||
820
+ !RunDtmFunction (psprintf ("SELECT public.dtm_begin_prepare('%d.%d')" ,
821
+ MyProcPid , currentLocalTransactionId )) ||
822
+ !RunDtmStatement (psprintf ("SELECT public.dtm_prepare('%d.%d',0)" ,
823
+ MyProcPid , currentLocalTransactionId ), PGRES_TUPLES_OK , DtmMaxCSN , & maxCSN ) ||
824
+ !RunDtmFunction (psprintf ("SELECT public.dtm_end_prepare('%d.%d',%lld)" ,
825
+ MyProcPid , currentLocalTransactionId , maxCSN )) ||
826
+ !RunDtmCommand (psprintf ("COMMIT PREPARED '%d.%d'" ,
827
+ MyProcPid , currentLocalTransactionId )))
828
+ {
829
+ RunDtmCommand (psprintf ("ROLLBACK PREPARED '%d.%d'" ,
830
+ MyProcPid , currentLocalTransactionId ));
831
+ ereport (ERROR ,
832
+ (errcode (ERRCODE_TRANSACTION_ROLLBACK ),
833
+ errmsg ("transaction was aborted at one of the shards" )));
834
+ break ;
835
+ }
836
+ return ;
837
+ }
838
+ default :
839
+ break ;
840
+ }
841
+ }
842
+
659
843
/*
660
844
* Scan all connection cache entries to find open remote transactions, and
661
845
* close them.
@@ -689,10 +873,34 @@ pgfdw_xact_callback(XactEvent event, void *arg)
689
873
pgfdw_reject_incomplete_xact_state_change (entry );
690
874
691
875
/* Commit all remote transactions during pre-commit */
692
- entry -> changing_xact_state = true;
693
- do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
694
- entry -> changing_xact_state = false;
876
+ do_sql_send_command (entry -> conn , "COMMIT TRANSACTION" );
877
+ continue ;
878
+
879
+ case XACT_EVENT_PRE_PREPARE :
880
+
881
+ /*
882
+ * We disallow remote transactions that modified anything,
883
+ * since it's not very reasonable to hold them open until
884
+ * the prepared transaction is committed. For the moment,
885
+ * throw error unconditionally; later we might allow
886
+ * read-only cases. Note that the error will cause us to
887
+ * come right back here with event == XACT_EVENT_ABORT, so
888
+ * we'll clean up the connection state at that point.
889
+ */
890
+ ereport (ERROR ,
891
+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
892
+ errmsg ("cannot prepare a transaction that modified remote tables" )));
893
+ break ;
695
894
895
+ case XACT_EVENT_PARALLEL_COMMIT :
896
+ case XACT_EVENT_COMMIT :
897
+ case XACT_EVENT_PREPARE :
898
+ if (!currentGlobalTransactionId )
899
+ {
900
+ entry -> changing_xact_state = true;
901
+ do_sql_command (entry -> conn , "COMMIT TRANSACTION" );
902
+ entry -> changing_xact_state = false;
903
+ }
696
904
/*
697
905
* If there were any errors in subtransactions, and we
698
906
* made prepared statements, do a DEALLOCATE ALL to make
@@ -716,27 +924,7 @@ pgfdw_xact_callback(XactEvent event, void *arg)
716
924
entry -> have_prep_stmt = false;
717
925
entry -> have_error = false;
718
926
break ;
719
- case XACT_EVENT_PRE_PREPARE :
720
927
721
- /*
722
- * We disallow remote transactions that modified anything,
723
- * since it's not very reasonable to hold them open until
724
- * the prepared transaction is committed. For the moment,
725
- * throw error unconditionally; later we might allow
726
- * read-only cases. Note that the error will cause us to
727
- * come right back here with event == XACT_EVENT_ABORT, so
728
- * we'll clean up the connection state at that point.
729
- */
730
- ereport (ERROR ,
731
- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
732
- errmsg ("cannot prepare a transaction that modified remote tables" )));
733
- break ;
734
- case XACT_EVENT_PARALLEL_COMMIT :
735
- case XACT_EVENT_COMMIT :
736
- case XACT_EVENT_PREPARE :
737
- /* Pre-commit should have closed the open transaction */
738
- elog (ERROR , "missed cleaning up connection during pre-commit" );
739
- break ;
740
928
case XACT_EVENT_PARALLEL_ABORT :
741
929
case XACT_EVENT_ABORT :
742
930
@@ -800,6 +988,11 @@ pgfdw_xact_callback(XactEvent event, void *arg)
800
988
/* Disarm changing_xact_state if it all worked. */
801
989
entry -> changing_xact_state = abort_cleanup_failure ;
802
990
break ;
991
+
992
+ case XACT_EVENT_START :
993
+ case XACT_EVENT_COMMIT_PREPARED :
994
+ case XACT_EVENT_ABORT_PREPARED :
995
+ break ;
803
996
}
804
997
}
805
998
@@ -818,16 +1011,22 @@ pgfdw_xact_callback(XactEvent event, void *arg)
818
1011
disconnect_pg_server (entry );
819
1012
}
820
1013
}
1014
+ if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT )
1015
+ {
1016
+ /*
1017
+ * Regardless of the event type, we can now mark ourselves as out of
1018
+ * the transaction. (Note: if we are here during PRE_COMMIT or
1019
+ * PRE_PREPARE, this saves a useless scan of the hashtable during
1020
+ * COMMIT or PREPARE.)
1021
+ */
1022
+ xact_got_connection = false;
821
1023
822
- /*
823
- * Regardless of the event type, we can now mark ourselves as out of the
824
- * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
825
- * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
826
- */
827
- xact_got_connection = false;
1024
+ /* Also reset cursor numbering for next transaction */
1025
+ cursor_number = 0 ;
828
1026
829
- /* Also reset cursor numbering for next transaction */
830
- cursor_number = 0 ;
1027
+ currentGlobalTransactionId = 0 ;
1028
+ currentConnection = NULL ;
1029
+ }
831
1030
}
832
1031
833
1032
/*
0 commit comments