Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 41c36e3

Browse files
committed
logical messages for UtilityStmts; GUC context
1 parent 9c31d02 commit 41c36e3

File tree

5 files changed

+115
-81
lines changed

5 files changed

+115
-81
lines changed

multimaster.c

+52-63
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
#include "replication/walsender.h"
5050
#include "replication/walsender_private.h"
5151
#include "replication/slot.h"
52+
#include "replication/message.h"
5253
#include "port/atomics.h"
5354
#include "tcop/utility.h"
5455
#include "nodes/makefuncs.h"
@@ -235,8 +236,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
235236
ProcessUtilityContext context, ParamListInfo params,
236237
DestReceiver *dest, char *completionTag);
237238

238-
// static StringInfo MtmGUCBuffer;
239-
// static bool MtmGUCBufferAllocated = false;
239+
static StringInfo MtmGUCBuffer;
240+
static bool MtmGUCBufferAllocated = false;
240241

241242
/*
242243
* -------------------------------------------
@@ -3024,53 +3025,55 @@ static void MtmBroadcastUtilityStmt(char const* sql, bool ignoreError)
30243025
}
30253026
}
30263027

3027-
static bool MtmProcessDDLCommand(char const* queryString)
3028-
{
3029-
RangeVar *rv;
3030-
Relation rel;
3031-
TupleDesc tupDesc;
3032-
HeapTuple tup;
3033-
Datum values[Natts_mtm_ddl_log];
3034-
bool nulls[Natts_mtm_ddl_log];
3035-
TimestampTz ts = GetCurrentTimestamp();
3036-
3037-
rv = makeRangeVar("public", MULTIMASTER_DDL_TABLE, -1);
3038-
rel = heap_openrv_extended(rv, RowExclusiveLock, true);
3028+
static void MtmGUCBufferAppend(const char *gucQueryString){
30393029

3040-
if (rel == NULL) {
3041-
if (!MtmIsBroadcast()) {
3042-
MtmBroadcastUtilityStmt(queryString, false);
3043-
return true;
3044-
}
3045-
return false;
3030+
if (!MtmGUCBufferAllocated)
3031+
{
3032+
MemoryContext oldcontext;
3033+
oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3034+
MtmGUCBuffer = makeStringInfo();
3035+
MemoryContextSwitchTo(oldcontext);
3036+
MtmGUCBufferAllocated = true;
3037+
appendStringInfoString(MtmGUCBuffer, "RESET SESSION AUTHORIZATION; reset all;");
30463038
}
3047-
3048-
tupDesc = RelationGetDescr(rel);
30493039

3050-
/* Form a tuple. */
3051-
memset(nulls, false, sizeof(nulls));
3052-
3053-
values[Anum_mtm_ddl_log_issued - 1] = TimestampTzGetDatum(ts);
3054-
values[Anum_mtm_ddl_log_query - 1] = CStringGetTextDatum(queryString);
3040+
appendStringInfoString(MtmGUCBuffer, gucQueryString);
3041+
/* sometimes there is no ';' char at the end. */
3042+
// appendStringInfoString(MtmGUCBuffer, ";");
3043+
}
30553044

3056-
tup = heap_form_tuple(tupDesc, values, nulls);
3045+
static char * MtmGUCBufferGet(void){
3046+
if (!MtmGUCBufferAllocated)
3047+
MtmGUCBufferAppend("");
3048+
return MtmGUCBuffer->data;
3049+
}
30573050

3058-
/* Insert the tuple to the catalog. */
3059-
simple_heap_insert(rel, tup);
3051+
static bool MtmProcessDDLCommand(char const* queryString)
3052+
{
3053+
char *queryWithContext;
3054+
char *gucContext;
30603055

3061-
/* Update the indexes. */
3062-
CatalogUpdateIndexes(rel, tup);
3056+
/* Append global GUC to utility stmt. */
3057+
gucContext = MtmGUCBufferGet();
3058+
if (gucContext)
3059+
{
3060+
queryWithContext = palloc(strlen(gucContext) + strlen(queryString) + 1);
3061+
strcpy(queryWithContext, gucContext);
3062+
strcat(queryWithContext, queryString);
3063+
}
3064+
else
3065+
{
3066+
queryWithContext = (char *) queryString;
3067+
}
30633068

3064-
/* Cleanup. */
3065-
heap_freetuple(tup);
3066-
heap_close(rel, RowExclusiveLock);
3069+
MTM_LOG1("Sending utility: %s", queryWithContext);
3070+
LogLogicalMessage("MTM:GUC", queryWithContext, strlen(queryWithContext), true);
30673071

30683072
MtmTx.containsDML = true;
30693073
return false;
30703074
}
30713075

30723076

3073-
30743077
/*
30753078
* Genenerate global transaction identifier for two-pahse commit.
30763079
* It should be unique for all nodes
@@ -3170,43 +3173,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
31703173
DiscardStmt *stmt = (DiscardStmt *) parsetree;
31713174
skipCommand = stmt->target == DISCARD_TEMP;
31723175

3173-
// skipCommand = true;
3174-
3175-
// if (MtmGUCBufferAllocated)
3176-
// {
3177-
// // XXX: move allocation somewhere to backend startup and check
3178-
// // where buffer is empty in send routines.
3179-
// MtmGUCBufferAllocated = false;
3180-
// pfree(MtmGUCBuffer);
3181-
// }
3182-
3176+
if (!IsTransactionBlock())
3177+
{
3178+
skipCommand = true;
3179+
MtmGUCBufferAppend(queryString);
3180+
}
31833181
}
31843182
break;
31853183
case T_VariableSetStmt:
31863184
{
31873185
VariableSetStmt *stmt = (VariableSetStmt *) parsetree;
31883186

3189-
skipCommand = true;
3187+
// skipCommand = true;
31903188

31913189
/* Prevent SET TRANSACTION from replication */
31923190
if (stmt->kind == VAR_SET_MULTI)
3193-
// break;
31943191
skipCommand = true;
31953192

3196-
// if (!MtmGUCBufferAllocated)
3197-
// {
3198-
// MemoryContext oldcontext;
3199-
3200-
// oldcontext = MemoryContextSwitchTo(TopMemoryContext);
3201-
// MtmGUCBuffer = makeStringInfo();
3202-
// MemoryContextSwitchTo(oldcontext);
3203-
// MtmGUCBufferAllocated = true;
3204-
// }
3205-
3206-
// appendStringInfoString(MtmGUCBuffer, queryString);
3207-
3208-
// sometimes there is no ';' char at the end.
3209-
// appendStringInfoString(MtmGUCBuffer, ";");
3193+
if (!IsTransactionBlock())
3194+
{
3195+
skipCommand = true;
3196+
MtmGUCBufferAppend(queryString);
3197+
}
32103198
}
32113199
break;
32123200
case T_CreateTableAsStmt:
@@ -3232,7 +3220,8 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
32323220

32333221
viewParse = parse_analyze((Node *) copyObject(stmt->query),
32343222
queryString, NULL, 0);
3235-
skipCommand = isQueryUsingTempRelation(viewParse);
3223+
skipCommand = isQueryUsingTempRelation(viewParse) ||
3224+
stmt->view->relpersistence == RELPERSISTENCE_TEMP;
32363225
// ||
32373226
// (stmt->relation->schemaname && strcmp(stmt->relation->schemaname, "pg_temp") == 0);
32383227
}

pglogical_apply.c

+31-18
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ static void UserTableUpdateOpenIndexes(EState *estate, TupleTableSlot *slot);
7070
static void UserTableUpdateIndexes(EState *estate, TupleTableSlot *slot);
7171

7272
static void process_remote_begin(StringInfo s);
73+
static void process_remote_message(StringInfo s);
7374
static void process_remote_commit(StringInfo s);
7475
static void process_remote_insert(StringInfo s, Relation rel);
7576
static void process_remote_update(StringInfo s, Relation rel);
@@ -338,7 +339,31 @@ process_remote_begin(StringInfo s)
338339
StartTransactionCommand();
339340
MtmJoinTransaction(&gtid, snapshot);
340341

341-
MTM_LOG3("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
342+
MTM_LOG1("REMOTE begin node=%d xid=%d snapshot=%ld", gtid.node, gtid.xid, snapshot);
343+
}
344+
345+
static void
346+
process_remote_message(StringInfo s)
347+
{
348+
const char *stmt;
349+
int rc;
350+
351+
stmt = pq_getmsgstring(s);
352+
MTM_LOG1("utility: %s", stmt);
353+
MTM_LOG3("%d: Execute utility statement %s", MyProcPid, stmt);
354+
355+
SPI_connect();
356+
rc = SPI_execute(stmt, false, 0);
357+
SPI_finish();
358+
if (rc < 0)
359+
elog(ERROR, "Failed to execute utility statement %s", stmt);
360+
361+
//XXX: create messages for tables localization too.
362+
// if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
363+
// char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
364+
// char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
365+
// MtmMakeTableLocal(schema, name);
366+
// }
342367
}
343368

344369
static void
@@ -617,7 +642,6 @@ process_remote_insert(StringInfo s, Relation rel)
617642
TupleTableSlot *oldslot;
618643
ResultRelInfo *relinfo;
619644
ScanKey *index_keys;
620-
char* relname = RelationGetRelationName(rel);
621645
int i;
622646

623647
estate = create_rel_estate(rel);
@@ -700,22 +724,6 @@ process_remote_insert(StringInfo s, Relation rel)
700724
FreeExecutorState(estate);
701725

702726
CommandCounterIncrement();
703-
704-
if (strcmp(relname, MULTIMASTER_DDL_TABLE) == 0) {
705-
char* ddl = TextDatumGetCString(new_tuple.values[Anum_mtm_ddl_log_query-1]);
706-
int rc;
707-
SPI_connect();
708-
MTM_LOG3("%d: Execute utility statement %s", MyProcPid, ddl);
709-
rc = SPI_execute(ddl, false, 0);
710-
SPI_finish();
711-
if (rc < 0)
712-
elog(ERROR, "Failed to execute utility statement %s", ddl);
713-
} else if (strcmp(relname, MULTIMASTER_LOCAL_TABLES_TABLE) == 0) {
714-
char* schema = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_schema-1]);
715-
char* name = TextDatumGetCString(new_tuple.values[Anum_mtm_local_tables_rel_name-1]);
716-
MtmMakeTableLocal(schema, name);
717-
}
718-
719727
}
720728

721729
static void
@@ -999,6 +1007,11 @@ void MtmExecutor(int id, void* work, size_t size)
9991007
s.len = save_len;
10001008
continue;
10011009
}
1010+
case 'G':
1011+
{
1012+
process_remote_message(&s);
1013+
continue;
1014+
}
10021015
default:
10031016
elog(ERROR, "unknown action of type %c", action);
10041017
}

pglogical_output.c

+19
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333

3434
#include "replication/output_plugin.h"
3535
#include "replication/logical.h"
36+
#include "replication/message.h"
3637
#include "replication/origin.h"
3738

3839
#include "utils/builtins.h"
@@ -64,6 +65,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6465
static bool pg_decode_origin_filter(LogicalDecodingContext *ctx,
6566
RepOriginId origin_id);
6667

68+
static void pg_decode_message(LogicalDecodingContext *ctx,
69+
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
70+
bool transactional, const char *prefix,
71+
Size sz, const char *message);
72+
6773
static void send_startup_message(LogicalDecodingContext *ctx,
6874
PGLogicalOutputData *data, bool last_message);
6975

@@ -81,6 +87,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
8187
cb->commit_cb = pg_decode_commit_txn;
8288
cb->filter_by_origin_cb = pg_decode_origin_filter;
8389
cb->shutdown_cb = pg_decode_shutdown;
90+
cb->message_cb = pg_decode_message;
8491
}
8592

8693
static bool
@@ -499,6 +506,18 @@ pg_decode_origin_filter(LogicalDecodingContext *ctx,
499506
return false;
500507
}
501508

509+
static void
510+
pg_decode_message(LogicalDecodingContext *ctx,
511+
ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
512+
const char *prefix, Size sz, const char *message)
513+
{
514+
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
515+
516+
OutputPluginPrepareWrite(ctx, true);
517+
data->api->write_message(ctx->out, prefix, sz, message);
518+
OutputPluginWrite(ctx, true);
519+
}
520+
502521
static void
503522
send_startup_message(LogicalDecodingContext *ctx,
504523
PGLogicalOutputData *data, bool last_message)

pglogical_proto.c

+10
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ pglogical_write_begin(StringInfo out, PGLogicalOutputData *data,
119119
}
120120
}
121121

122+
static void
123+
pglogical_write_message(StringInfo out,
124+
const char *prefix, Size sz, const char *message)
125+
{
126+
pq_sendbyte(out, 'G');
127+
pq_sendbytes(out, message, sz);
128+
pq_sendbyte(out, '\0');
129+
}
130+
122131
/*
123132
* Write COMMIT to the output stream.
124133
*/
@@ -430,6 +439,7 @@ pglogical_init_api(PGLogicalProtoType typ)
430439
MTM_LOG1("%d: PRGLOGICAL init API for slot %s node %d", MyProcPid, MyReplicationSlot->data.name.data, MtmReplicationNodeId);
431440
res->write_rel = pglogical_write_rel;
432441
res->write_begin = pglogical_write_begin;
442+
res->write_message = pglogical_write_message;
433443
res->write_commit = pglogical_write_commit;
434444
res->write_insert = pglogical_write_insert;
435445
res->write_update = pglogical_write_update;

pglogical_proto.h

+3
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ typedef void (*pglogical_write_rel_fn)(StringInfo out, struct PGLogicalOutputDat
2121

2222
typedef void (*pglogical_write_begin_fn)(StringInfo out, struct PGLogicalOutputData *data,
2323
ReorderBufferTXN *txn);
24+
typedef void (*pglogical_write_message_fn)(StringInfo out,
25+
const char *prefix, Size sz, const char *message);
2426
typedef void (*pglogical_write_commit_fn)(StringInfo out, struct PGLogicalOutputData *data,
2527
ReorderBufferTXN *txn, XLogRecPtr commit_lsn);
2628

@@ -43,6 +45,7 @@ typedef struct PGLogicalProtoAPI
4345
{
4446
pglogical_write_rel_fn write_rel;
4547
pglogical_write_begin_fn write_begin;
48+
pglogical_write_message_fn write_message;
4649
pglogical_write_commit_fn write_commit;
4750
pglogical_write_origin_fn write_origin;
4851
pglogical_write_insert_fn write_insert;

0 commit comments

Comments
 (0)