Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a92b783

Browse files
arssherkelvich
authored andcommitted
Port Kostya's 2pc for postgres_fdw and triggers on fdw tables
on branch with isolation-related stuff.
1 parent d6b44da commit a92b783

File tree

13 files changed

+396
-227
lines changed

13 files changed

+396
-227
lines changed

contrib/postgres_fdw/connection.c

Lines changed: 142 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "access/transam.h"
2222
#include "access/xlog.h"
2323
#include "libpq-int.h"
24+
#include "access/xlog.h"
2425
#include "mb/pg_wchar.h"
2526
#include "miscadmin.h"
2627
#include "pgstat.h"
@@ -30,7 +31,6 @@
3031
#include "utils/memutils.h"
3132
#include "utils/syscache.h"
3233

33-
3434
/*
3535
* Connection cache hash table entry
3636
*
@@ -103,6 +103,55 @@ static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
103103
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
104104
PGresult **result);
105105

106+
static int DistributedTransactionCount;
107+
static int DistributedTransactionParticipantsCount;
108+
static char* DistributedTransactionGid;
109+
110+
/* Parallel send of sql statement to all paritcipants nodes
111+
* and wait status
112+
*/
113+
static bool
114+
BroadcastStatement(char const * sql, unsigned expectedStatus)
115+
{
116+
HASH_SEQ_STATUS scan;
117+
ConnCacheEntry *entry;
118+
bool allOk = true;
119+
120+
hash_seq_init(&scan, ConnectionHash);
121+
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
122+
{
123+
if (entry->xact_depth > 0)
124+
{
125+
do_sql_send_command(entry->conn, sql);
126+
}
127+
}
128+
129+
hash_seq_init(&scan, ConnectionHash);
130+
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
131+
{
132+
if (entry->xact_depth > 0)
133+
{
134+
PGresult *result = PQgetResult(entry->conn);
135+
136+
if (PQresultStatus(result) != expectedStatus)
137+
{
138+
elog(WARNING, "Failed command %s: status=%d, expected status=%d", sql, PQresultStatus(result), expectedStatus);
139+
pgfdw_report_error(ERROR, result, entry->conn, true, sql);
140+
allOk = false;
141+
}
142+
PQclear(result);
143+
PQgetResult(entry->conn); /* consume NULL result */
144+
}
145+
}
146+
return allOk;
147+
}
148+
149+
static bool
150+
BroadcastCommand(char const * sql)
151+
{
152+
return BroadcastStatement(sql, PGRES_COMMAND_OK);
153+
}
154+
106155

107156
/*
108157
* Get a PGconn which can be used to execute queries on the remote PostgreSQL
@@ -457,31 +506,23 @@ begin_remote_xact(ConnCacheEntry *entry)
457506
/* Start main transaction if we haven't yet */
458507
if (entry->xact_depth <= 0)
459508
{
460-
TransactionId gxid = GetTransactionManager()->GetGlobalTransactionId();
461509
const char *sql;
462510

463511
elog(DEBUG3, "starting remote transaction on connection %p",
464512
entry->conn);
465513

466-
// XXXX?
467-
//
468-
// if (UseTsDtmTransactions && TransactionIdIsValid(gxid))
469-
// {
470-
// char stmt[64];
471-
// snprintf(stmt, sizeof(stmt), "select public.dtm_join_transaction(%d)", gxid);
472-
// res = PQexec(entry->conn, stmt);
473-
// PQclear(res);
474-
// }
475-
476514
if (IsolationIsSerializable())
477515
sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
478-
else
516+
else if (UseRepeatableRead)
479517
sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
518+
else
519+
sql = "START TRANSACTION";
480520
entry->changing_xact_state = true;
481521
do_sql_command(entry->conn, sql);
482522
entry->xact_depth = 1;
483523
entry->changing_xact_state = false;
484524

525+
485526
if (UseTsDtmTransactions)
486527
{
487528
if (currentConnection == NULL)
@@ -516,6 +557,8 @@ begin_remote_xact(ConnCacheEntry *entry)
516557
PQclear(res);
517558
}
518559
}
560+
561+
DistributedTransactionParticipantsCount += 1;
519562
}
520563

521564
/*
@@ -805,55 +848,19 @@ pgfdw_xact_callback(XactEvent event, void *arg)
805848
{
806849
HASH_SEQ_STATUS scan;
807850
ConnCacheEntry *entry;
808-
809-
// /* Do nothing for this events */
810-
// switch (event)
811-
// {
812-
// case XACT_EVENT_START:
813-
// case XACT_EVENT_COMMIT_PREPARED:
814-
// case XACT_EVENT_ABORT_PREPARED:
815-
// return;
816-
// default:
817-
// break;
818-
// }
851+
bool two_phase_commit;
819852

820853
/* Quick exit if no connections were touched in this transaction. */
821854
if (!xact_got_connection)
822855
return;
823856

824-
if (currentGlobalTransactionId != 0)
825-
{
826-
switch (event)
827-
{
828-
case XACT_EVENT_PARALLEL_PRE_COMMIT:
829-
case XACT_EVENT_PRE_COMMIT:
830-
{
831-
csn_t maxCSN = 0;
832-
833-
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
834-
MyProcPid, currentLocalTransactionId)) ||
835-
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')",
836-
MyProcPid, currentLocalTransactionId)) ||
837-
!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)",
838-
MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
839-
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)",
840-
MyProcPid, currentLocalTransactionId, maxCSN)) ||
841-
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
842-
MyProcPid, currentLocalTransactionId)))
843-
{
844-
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
845-
MyProcPid, currentLocalTransactionId));
846-
ereport(ERROR,
847-
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
848-
errmsg("transaction was aborted at one of the shards")));
849-
break;
850-
}
851-
return;
852-
}
853-
default:
854-
break;
855-
}
856-
}
857+
/***********************************************************************************************/
858+
859+
/* Check if we need to perform 2PC commit: number of paritcipants should be greater than 1 */
860+
two_phase_commit = Use2PC
861+
&& (TransactionIdIsValid(GetCurrentTransactionIdIfAny()) + DistributedTransactionParticipantsCount) > 1;
862+
863+
/***********************************************************************************************/
857864

858865
/*
859866
* Scan all connection cache entries to find open remote transactions, and
@@ -891,10 +898,9 @@ pgfdw_xact_callback(XactEvent event, void *arg)
891898
* we can't issue any more commands against it.
892899
*/
893900
pgfdw_reject_incomplete_xact_state_change(entry);
894-
895-
/* Commit all remote transactions during pre-commit */
896-
if (!currentGlobalTransactionId)
901+
if (!two_phase_commit && !currentGlobalTransactionId)
897902
{
903+
/* Commit all remote transactions during pre-commit */
898904
entry->changing_xact_state = true;
899905
do_sql_command(entry->conn, "COMMIT TRANSACTION");
900906
entry->changing_xact_state = false;
@@ -922,6 +928,13 @@ pgfdw_xact_callback(XactEvent event, void *arg)
922928
}
923929
entry->have_prep_stmt = false;
924930
entry->have_error = false;
931+
932+
if (two_phase_commit)
933+
{
934+
/* Do not reset xact_depth and break connection: we still need them for second phase
935+
*/
936+
continue;
937+
}
925938
break;
926939
case XACT_EVENT_PRE_PREPARE:
927940

@@ -1025,21 +1038,77 @@ pgfdw_xact_callback(XactEvent event, void *arg)
10251038
disconnect_pg_server(entry);
10261039
}
10271040
}
1028-
// if (event != XACT_EVENT_PARALLEL_PRE_COMMIT && event != XACT_EVENT_PRE_COMMIT)
1029-
// {
1030-
/*
1031-
* Regardless of the event type, we can now mark ourselves as out of the
1032-
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1033-
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1034-
*/
1035-
xact_got_connection = false;
10361041

1037-
/* Also reset cursor numbering for next transaction */
1038-
cursor_number = 0;
1042+
/***********************************************************************************************/
1043+
1044+
/*
1045+
* In case of 2PC broadcast PREPARE TRANSACTION statement.
1046+
* We are using BroadcasrCommand instead of sending them in the connection
1047+
* iterator above because we want to process them in parallel
1048+
*/
1049+
1050+
if (currentGlobalTransactionId != 0 &&
1051+
(event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT))
1052+
{
1053+
csn_t maxCSN = 0;
1054+
1055+
if (!RunDtmCommand(psprintf("PREPARE TRANSACTION '%d.%d'",
1056+
MyProcPid, currentLocalTransactionId)) ||
1057+
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_begin_prepare('%d.%d')",
1058+
MyProcPid, currentLocalTransactionId)) ||
1059+
!RunDtmStatement(psprintf("SELECT pg_global_snaphot_prepare('%d.%d',0)",
1060+
MyProcPid, currentLocalTransactionId), PGRES_TUPLES_OK, DtmMaxCSN, &maxCSN) ||
1061+
!RunDtmFunction(psprintf("SELECT pg_global_snaphot_end_prepare('%d.%d',%lld)",
1062+
MyProcPid, currentLocalTransactionId, maxCSN)) ||
1063+
!RunDtmCommand(psprintf("COMMIT PREPARED '%d.%d'",
1064+
MyProcPid, currentLocalTransactionId)))
1065+
{
1066+
RunDtmCommand(psprintf("ROLLBACK PREPARED '%d.%d'",
1067+
MyProcPid, currentLocalTransactionId));
1068+
ereport(ERROR,
1069+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
1070+
errmsg("transaction was aborted at one of the shards")));
1071+
}
1072+
return;
1073+
}
1074+
1075+
else if (two_phase_commit &&
1076+
(event == XACT_EVENT_PARALLEL_PRE_COMMIT || event == XACT_EVENT_PRE_COMMIT))
1077+
{
1078+
DistributedTransactionGid = psprintf("%d:%d:%lld:%lld:%d",
1079+
MyProcPid,
1080+
++DistributedTransactionCount,
1081+
(long long)GetSystemIdentifier(),
1082+
(long long)GetCurrentTransactionId(),
1083+
DistributedTransactionParticipantsCount);
1084+
if (!BroadcastCommand(psprintf("PREPARE TRANSACTION '%s'", DistributedTransactionGid)) ||
1085+
!BroadcastCommand(psprintf("COMMIT PREPARED '%s'", DistributedTransactionGid)))
1086+
{
1087+
BroadcastCommand(psprintf("ROLLBACK PREPARED '%s'", DistributedTransactionGid));
1088+
ereport(ERROR,
1089+
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
1090+
errmsg("Transaction %s was aborted at one of participants", DistributedTransactionGid)));
1091+
}
1092+
return;
1093+
}
1094+
1095+
1096+
/***********************************************************************************************/
1097+
1098+
/*
1099+
* Regardless of the event type, we can now mark ourselves as out of the
1100+
* transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1101+
* this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1102+
*/
1103+
xact_got_connection = false;
1104+
1105+
/* Also reset cursor numbering for next transaction */
1106+
cursor_number = 0;
10391107

1040-
currentGlobalTransactionId = 0;
1041-
currentConnection = NULL;
1042-
// }
1108+
DistributedTransactionParticipantsCount = 0;
1109+
DistributedTransactionGid = NULL;
1110+
currentGlobalTransactionId = 0;
1111+
currentConnection = NULL;
10431112
}
10441113

10451114
/*

contrib/postgres_fdw/expected/postgres_fdw.out

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7022,6 +7022,65 @@ update bar set f2 = f2 + 100 returning *;
70227022
7 | 277
70237023
(6 rows)
70247024

7025+
-- Test that UPDATE/DELETE with inherited target works with row-level triggers
7026+
CREATE TRIGGER trig_row_before
7027+
BEFORE UPDATE OR DELETE ON bar2
7028+
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
7029+
CREATE TRIGGER trig_row_after
7030+
AFTER UPDATE OR DELETE ON bar2
7031+
FOR EACH ROW EXECUTE PROCEDURE trigger_data(23,'skidoo');
7032+
explain (verbose, costs off)
7033+
update bar set f2 = f2 + 100;
7034+
QUERY PLAN
7035+
--------------------------------------------------------------------------------------
7036+
Update on public.bar
7037+
Update on public.bar
7038+
Foreign Update on public.bar2
7039+
Remote SQL: UPDATE public.loct2 SET f2 = $2 WHERE ctid = $1 RETURNING f1, f2, f3
7040+
-> Seq Scan on public.bar
7041+
Output: bar.f1, (bar.f2 + 100), bar.ctid
7042+
-> Foreign Scan on public.bar2
7043+
Output: bar2.f1, (bar2.f2 + 100), bar2.f3, bar2.ctid, bar2.*
7044+
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
7045+
(9 rows)
7046+
7047+
update bar set f2 = f2 + 100;
7048+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7049+
NOTICE: OLD: (3,333,33),NEW: (3,433,33)
7050+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7051+
NOTICE: OLD: (4,344,44),NEW: (4,444,44)
7052+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW UPDATE ON bar2
7053+
NOTICE: OLD: (7,277,77),NEW: (7,377,77)
7054+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7055+
NOTICE: OLD: (3,333,33),NEW: (3,433,33)
7056+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7057+
NOTICE: OLD: (4,344,44),NEW: (4,444,44)
7058+
NOTICE: trig_row_after(23, skidoo) AFTER ROW UPDATE ON bar2
7059+
NOTICE: OLD: (7,277,77),NEW: (7,377,77)
7060+
explain (verbose, costs off)
7061+
delete from bar where f2 < 400;
7062+
QUERY PLAN
7063+
---------------------------------------------------------------------------------------------
7064+
Delete on public.bar
7065+
Delete on public.bar
7066+
Foreign Delete on public.bar2
7067+
Remote SQL: DELETE FROM public.loct2 WHERE ctid = $1 RETURNING f1, f2, f3
7068+
-> Seq Scan on public.bar
7069+
Output: bar.ctid
7070+
Filter: (bar.f2 < 400)
7071+
-> Foreign Scan on public.bar2
7072+
Output: bar2.ctid, bar2.*
7073+
Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 WHERE ((f2 < 400)) FOR UPDATE
7074+
(10 rows)
7075+
7076+
delete from bar where f2 < 400;
7077+
NOTICE: trig_row_before(23, skidoo) BEFORE ROW DELETE ON bar2
7078+
NOTICE: OLD: (7,377,77)
7079+
NOTICE: trig_row_after(23, skidoo) AFTER ROW DELETE ON bar2
7080+
NOTICE: OLD: (7,377,77)
7081+
-- cleanup
7082+
DROP TRIGGER trig_row_before ON bar2;
7083+
DROP TRIGGER trig_row_after ON bar2;
70257084
drop table foo cascade;
70267085
NOTICE: drop cascades to foreign table foo2
70277086
drop table bar cascade;

0 commit comments

Comments
 (0)