Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit a7c4878

Browse files
committed
trying to send utility_stmts on PrePrepare
1 parent 41c36e3 commit a7c4878

File tree

3 files changed

+60
-22
lines changed

3 files changed

+60
-22
lines changed

bgwpool.c

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ static void BgwPoolMainLoop(Datum arg)
2525
void* work;
2626

2727
BackgroundWorkerUnblockSignals();
28-
BackgroundWorkerInitializeConnection(pool->dbname, NULL);
28+
BackgroundWorkerInitializeConnection(pool->dbname, "stas");
2929

3030
while(true) {
3131
PGSemaphoreLock(&pool->available);
@@ -98,7 +98,7 @@ void BgwPoolStart(int nWorkers, BgwPoolConstructor constructor)
9898
worker.bgw_start_time = BgWorkerStart_ConsistentState;
9999
worker.bgw_main = BgwPoolMainLoop;
100100
worker.bgw_restart_time = MULTIMASTER_BGW_RESTART_TIMEOUT;
101-
101+
102102
for (i = 0; i < nWorkers; i++) {
103103
BgwPoolExecutorCtx* ctx = (BgwPoolExecutorCtx*)malloc(sizeof(BgwPoolExecutorCtx));
104104
snprintf(worker.bgw_name, BGW_MAXLEN, "bgw_pool_worker_%d", i+1);

multimaster.c

+40-12
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ char const* const MtmNodeStatusMnem[] =
197197

198198
bool MtmDoReplication;
199199
char* MtmDatabaseName;
200+
char* MtmUtilityStmt = NULL;
200201

201202
int MtmNodes;
202203
int MtmNodeId;
@@ -213,8 +214,6 @@ int MtmHeartbeatRecvTimeout;
213214
bool MtmUseRaftable;
214215
bool MtmUseDtm;
215216

216-
// static int reset_wrokers = 0;
217-
218217
static char* MtmConnStrs;
219218
static int MtmQueueSize;
220219
static int MtmWorkers;
@@ -693,6 +692,10 @@ static const char* const isoLevelStr[] =
693692
static void
694693
MtmBeginTransaction(MtmCurrentTrans* x)
695694
{
695+
if (MtmUtilityStmt)
696+
pfree(MtmUtilityStmt);
697+
MtmUtilityStmt = NULL;
698+
696699
if (x->snapshot == INVALID_CSN) {
697700
TransactionId xmin = (Mtm->gcCount >= MtmGcPeriod) ? PgGetOldestXmin(NULL, false) : InvalidTransactionId; /* Get oldest xmin outside critical section */
698701

@@ -3087,7 +3090,14 @@ MtmGenerateGid(char* gid)
30873090

30883091
static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
30893092
{
3090-
if (x->isDistributed && x->containsDML) {
3093+
if (MtmUtilityStmt && !MyXactAccessedTempRel)
3094+
{
3095+
MtmProcessDDLCommand(MtmUtilityStmt);
3096+
pfree(MtmUtilityStmt);
3097+
MtmUtilityStmt = NULL;
3098+
}
3099+
3100+
if (!x->isReplicated && (x->isDistributed && x->containsDML)) {
30913101
MtmGenerateGid(x->gid);
30923102
if (!x->isTransactionBlock) {
30933103
BeginTransactionBlock();
@@ -3118,6 +3128,9 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31183128
DestReceiver *dest, char *completionTag)
31193129
{
31203130
bool skipCommand = false;
3131+
3132+
// skipCommand = MyXactAccessedTempRel;
3133+
31213134
MTM_LOG3("%d: Process utility statement %s", MyProcPid, queryString);
31223135
switch (nodeTag(parsetree))
31233136
{
@@ -3198,12 +3211,12 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31983211
}
31993212
break;
32003213
case T_CreateTableAsStmt:
3201-
{
3202-
/* Do not replicate temp tables */
3203-
CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3204-
skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3205-
(stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3206-
}
3214+
// {
3215+
// /* Do not replicate temp tables */
3216+
// CreateTableAsStmt *stmt = (CreateTableAsStmt *) parsetree;
3217+
// skipCommand = stmt->into->rel->relpersistence == RELPERSISTENCE_TEMP ||
3218+
// (stmt->into->rel->schemaname && strcmp(stmt->into->rel->schemaname, "pg_temp") == 0);
3219+
// }
32073220
break;
32083221
case T_CreateStmt:
32093222
{
@@ -3306,11 +3319,26 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
33063319
skipCommand = false;
33073320
break;
33083321
}
3309-
if (!skipCommand && !MtmTx.isReplicated && context == PROCESS_UTILITY_TOPLEVEL) {
3310-
if (MtmProcessDDLCommand(queryString)) {
3311-
return;
3322+
if (context == PROCESS_UTILITY_TOPLEVEL)
3323+
{
3324+
if (!skipCommand && !MtmTx.isReplicated) {
3325+
// if (MtmProcessDDLCommand(queryString)) {
3326+
// return;
3327+
// }
3328+
3329+
MemoryContext oldcontext;
3330+
3331+
if (MtmUtilityStmt)
3332+
pfree(MtmUtilityStmt);
3333+
3334+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3335+
MtmUtilityStmt = palloc(strlen(queryString) + 1);
3336+
MemoryContextSwitchTo(oldcontext);
3337+
3338+
strncpy(MtmUtilityStmt, queryString, strlen(queryString) + 1);
33123339
}
33133340
}
3341+
33143342
if (PreviousProcessUtilityHook != NULL)
33153343
{
33163344
PreviousProcessUtilityHook(parsetree, queryString, context,

pglogical_receiver.c

+18-8
Original file line numberDiff line numberDiff line change
@@ -468,14 +468,24 @@ pglogical_receiver_main(Datum main_arg)
468468
{
469469
stmt = copybuf + hdr_len;
470470

471-
if (buf.used >= MtmTransSpillThreshold*MB) {
472-
if (spill_file < 0) {
473-
int file_id;
474-
spill_file = MtmCreateSpillFile(nodeId, &file_id);
475-
pq_sendbyte(&spill_info, 'F');
476-
pq_sendint(&spill_info, nodeId, 4);
477-
pq_sendint(&spill_info, file_id, 4);
478-
}
471+
if (buf.used >= MtmTransSpillThreshold*MB) {
472+
if (spill_file < 0) {
473+
int file_id;
474+
spill_file = MtmCreateSpillFile(nodeId, &file_id);
475+
pq_sendbyte(&spill_info, 'F');
476+
pq_sendint(&spill_info, nodeId, 4);
477+
pq_sendint(&spill_info, file_id, 4);
478+
}
479+
ByteBufferAppend(&buf, ")", 1);
480+
pq_sendbyte(&spill_info, '(');
481+
pq_sendint(&spill_info, buf.used, 4);
482+
MtmSpillToFile(spill_file, buf.data, buf.used);
483+
ByteBufferReset(&buf);
484+
}
485+
ByteBufferAppend(&buf, stmt, rc - hdr_len);
486+
if (stmt[0] == 'C') /* commit|prepare */
487+
{
488+
if (spill_file >= 0) {
479489
ByteBufferAppend(&buf, ")", 1);
480490
pq_sendbyte(&spill_info, '(');
481491
pq_sendint(&spill_info, buf.used, 4);

0 commit comments

Comments
 (0)