Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0f36ea2

Browse files
knizhnikkelvich
authored andcommitted
Support remote functions
1 parent 238d9ee commit 0f36ea2

File tree

4 files changed

+100
-57
lines changed

4 files changed

+100
-57
lines changed

multimaster.c

Lines changed: 77 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@
6868
#include "parser/analyze.h"
6969
#include "parser/parse_relation.h"
7070
#include "parser/parse_type.h"
71+
#include "parser/parse_func.h"
7172
#include "catalog/pg_class.h"
7273
#include "catalog/pg_type.h"
7374
#include "tcop/pquery.h"
@@ -158,6 +159,7 @@ static void MtmInitializeSequence(int64* start, int64* step);
158159
static void* MtmCreateSavepointContext(void);
159160
static void MtmRestoreSavepointContext(void* ctx);
160161
static void MtmReleaseSavepointContext(void* ctx);
162+
static void MtmSetRemoteFunction(char const* list, void* extra);
161163

162164
static void MtmCheckClusterLock(void);
163165
static void MtmCheckSlots(void);
@@ -184,6 +186,7 @@ MtmConnectionInfo* MtmConnections;
184186

185187
HTAB* MtmXid2State;
186188
HTAB* MtmGid2State;
189+
static HTAB* MtmRemoteFunctions;
187190
static HTAB* MtmLocalTables;
188191

189192
static bool MtmIsRecoverySession;
@@ -258,6 +261,7 @@ bool MtmMajorNode;
258261
TransactionId MtmUtilityProcessedInXid;
259262

260263
static char* MtmConnStrs;
264+
static char* MtmRemoteFunctionsList;
261265
static char* MtmClusterName;
262266
static int MtmQueueSize;
263267
static int MtmWorkers;
@@ -2229,7 +2233,7 @@ MtmCreateLocalTableMap(void)
22292233
"MtmLocalTables",
22302234
MULTIMASTER_MAX_LOCAL_TABLES, MULTIMASTER_MAX_LOCAL_TABLES,
22312235
&info,
2232-
HASH_ELEM
2236+
HASH_ELEM | HASH_BLOBS
22332237
);
22342238
return htab;
22352239
}
@@ -2423,6 +2427,48 @@ MtmShmemStartup(void)
24232427
MtmInitialize();
24242428
}
24252429

2430+
static void MtmSetRemoteFunction(char const* list, void* extra)
2431+
{
2432+
if (MtmRemoteFunctions) {
2433+
hash_destroy(MtmRemoteFunctions);
2434+
MtmRemoteFunctions = NULL;
2435+
}
2436+
}
2437+
2438+
static void MtmInitializeRemoteFunctionsMap()
2439+
{
2440+
HASHCTL info;
2441+
char* p, *q;
2442+
int n_funcs = 1;
2443+
FuncCandidateList clist;
2444+
2445+
for (p = MtmRemoteFunctionsList; (q = strchr(p, ',')) != NULL; p = q + 1, n_funcs++);
2446+
2447+
Assert(MtmRemoteFunctions == NULL);
2448+
2449+
memset(&info, 0, sizeof(info));
2450+
info.entrysize = info.keysize = sizeof(Oid);
2451+
info.hcxt = TopMemoryContext;
2452+
MtmRemoteFunctions = hash_create("MtmRemoteFunctions", n_funcs, &info,
2453+
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
2454+
2455+
p = pstrdup(MtmRemoteFunctionsList);
2456+
do {
2457+
q = strchr(p, ',');
2458+
if (q != NULL) {
2459+
*q++ = '\0';
2460+
}
2461+
clist = FuncnameGetCandidates(stringToQualifiedNameList(p), -1, NIL, false, false, true);
2462+
if (clist == NULL) {
2463+
MTM_ELOG(ERROR, "Failed to lookup function %s", p);
2464+
} else if (clist->next != NULL) {
2465+
MTM_ELOG(ERROR, "Ambigious function %s", p);
2466+
}
2467+
hash_search(MtmRemoteFunctions, &clist->oid, HASH_ENTER, NULL);
2468+
p = q;
2469+
} while (p != NULL);
2470+
}
2471+
24262472
/*
24272473
* Parse node connection string.
24282474
* This function is called at cluster startup and while adding new cluster node
@@ -3052,6 +3098,19 @@ _PG_init(void)
30523098
NULL /* GucShowHook show_hook */
30533099
);
30543100

3101+
DefineCustomStringVariable(
3102+
"multimaster.remote_functions",
3103+
"List of fnuction names which should be executed remotely at all multimaster nodes instead of executing them at master and replicating result of their work",
3104+
NULL,
3105+
&MtmRemoteFunctionsList,
3106+
"lo_create,lo_unlink",
3107+
PGC_USERSET, /* context */
3108+
0, /* flags */
3109+
NULL, /* GucStringCheckHook check_hook */
3110+
MtmSetRemoteFunction, /* GucStringAssignHook assign_hook */
3111+
NULL /* GucShowHook show_hook */
3112+
);
3113+
30553114
DefineCustomStringVariable(
30563115
"multimaster.cluster_name",
30573116
"Name of the cluster",
@@ -3541,7 +3600,7 @@ lsn_t MtmGetFlushPosition(int nodeId)
35413600
* Keep track of progress of WAL writer.
35423601
* We need to notify WAL senders at other nodes which logical records
35433602
* are flushed to the disk and so can survive failure. In asynchronous commit mode
3544-
* WAL is flushed by WAL writer. Current flish position can be obtained by GetFlushRecPtr().
3603+
* WAL is flushed by WAL writer. Current flush position can be obtained by GetFlushRecPtr().
35453604
* So on applying new logical record we insert it in the MtmLsnMapping and compare
35463605
* their poistions in local WAL log with current flush position.
35473606
* The records which are flushed to the disk by WAL writer are removed from the list
@@ -4656,7 +4715,7 @@ char* MtmGucSerialize(void)
46564715
appendStringInfoString(serialized_gucs, " TO ");
46574716

46584717
/* quite a crutch */
4659-
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0')
4718+
if (strstr(cur_entry->key, "_mem") != NULL || *(cur_entry->value) == '\0' || strchr(cur_entry->value, ',') != NULL)
46604719
{
46614720
appendStringInfoString(serialized_gucs, "'");
46624721
appendStringInfoString(serialized_gucs, cur_entry->value);
@@ -4686,10 +4745,7 @@ static void MtmProcessDDLCommand(char const* queryString, bool transactional)
46864745
if (transactional)
46874746
{
46884747
char *gucCtx = MtmGucSerialize();
4689-
if (*gucCtx)
4690-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s", gucCtx, queryString);
4691-
else
4692-
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s", queryString);
4748+
queryString = psprintf("RESET SESSION AUTHORIZATION; reset all; %s %s", gucCtx, queryString);
46934749

46944750
/* Transactional DDL */
46954751
MTM_LOG3("Sending DDL: %s", queryString);
@@ -5058,29 +5114,28 @@ static void MtmProcessUtility(Node *parsetree, const char *queryString,
50585114
static void
50595115
MtmExecutorStart(QueryDesc *queryDesc, int eflags)
50605116
{
5061-
bool ddl_generating_call = false;
5062-
ListCell *tlist;
5063-
5064-
foreach(tlist, queryDesc->plannedstmt->planTree->targetlist)
5117+
if (!MtmTx.isReplicated && ActivePortal)
50655118
{
5066-
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
5119+
ListCell *tlist;
50675120

5068-
if (tle->resname && strcmp(tle->resname, "lo_create") == 0)
5121+
if (!MtmRemoteFunctions)
50695122
{
5070-
ddl_generating_call = true;
5071-
break;
5123+
MtmInitializeRemoteFunctionsMap();
50725124
}
50735125

5074-
if (tle->resname && strcmp(tle->resname, "lo_unlink") == 0)
5126+
foreach(tlist, queryDesc->plannedstmt->planTree->targetlist)
50755127
{
5076-
ddl_generating_call = true;
5077-
break;
5128+
TargetEntry *tle = (TargetEntry *) lfirst(tlist);
5129+
if (tle->expr && IsA(tle->expr, FuncExpr))
5130+
{
5131+
if (hash_search(MtmRemoteFunctions, &((FuncExpr*)tle->expr)->funcid, HASH_FIND, NULL))
5132+
{
5133+
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5134+
break;
5135+
}
5136+
}
50785137
}
50795138
}
5080-
5081-
if (ddl_generating_call && !MtmTx.isReplicated)
5082-
MtmProcessDDLCommand(ActivePortal->sourceText, true);
5083-
50845139
if (PreviousExecutorStartHook != NULL)
50855140
PreviousExecutorStartHook(queryDesc, eflags);
50865141
else

multimaster.h

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -51,22 +51,22 @@
5151
fprintf(stderr, MTM_TAG "%s, %lld, %s, %d\n", tx->gid, (long long)MtmGetSystemTime(), event, MyProcPid)
5252
// #endif
5353

54-
#define MULTIMASTER_NAME "multimaster"
55-
#define MULTIMASTER_SCHEMA_NAME "mtm"
56-
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57-
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58-
#define MULTIMASTER_MIN_PROTO_VERSION 1
59-
#define MULTIMASTER_MAX_PROTO_VERSION 1
60-
#define MULTIMASTER_MAX_GID_SIZE 32
61-
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
62-
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
63-
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
64-
#define MULTIMASTER_MAX_LOCAL_TABLES 256
65-
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
66-
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67-
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68-
#define MULTIMASTER_ADMIN "mtm_admin"
69-
#define MULTIMASTER_PRECOMMITTED "precommitted"
54+
#define MULTIMASTER_NAME "multimaster"
55+
#define MULTIMASTER_SCHEMA_NAME "mtm"
56+
#define MULTIMASTER_LOCAL_TABLES_TABLE "local_tables"
57+
#define MULTIMASTER_SLOT_PATTERN "mtm_slot_%d"
58+
#define MULTIMASTER_MIN_PROTO_VERSION 1
59+
#define MULTIMASTER_MAX_PROTO_VERSION 1
60+
#define MULTIMASTER_MAX_GID_SIZE 32
61+
#define MULTIMASTER_MAX_SLOT_NAME_SIZE 16
62+
#define MULTIMASTER_MAX_CONN_STR_SIZE 128
63+
#define MULTIMASTER_MAX_HOST_NAME_SIZE 64
64+
#define MULTIMASTER_MAX_LOCAL_TABLES 256
65+
#define MULTIMASTER_MAX_CTL_STR_SIZE 256
66+
#define MULTIMASTER_LOCK_BUF_INIT_SIZE 4096
67+
#define MULTIMASTER_BROADCAST_SERVICE "mtm_broadcast"
68+
#define MULTIMASTER_ADMIN "mtm_admin"
69+
#define MULTIMASTER_PRECOMMITTED "precommitted"
7070

7171
#define MULTIMASTER_DEFAULT_ARBITER_PORT 5433
7272

pglogical_receiver.c

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -485,8 +485,8 @@ pglogical_receiver_main(Datum main_arg)
485485
{
486486
int64 now = feGetCurrentTimestamp();
487487

488-
/* Leave is feedback is not sent properly */
489488
MtmUpdateLsnMapping(nodeId, walEnd);
489+
/* Leave if feedback is not sent properly */
490490
if (!sendFeedback(conn, now, nodeId)) {
491491
goto OnError;
492492
}
@@ -635,7 +635,6 @@ pglogical_receiver_main(Datum main_arg)
635635
{
636636
int64 now = feGetCurrentTimestamp();
637637

638-
/* Leave is feedback is not sent properly */
639638
MtmUpdateLsnMapping(nodeId, INVALID_LSN);
640639
sendFeedback(conn, now, nodeId);
641640
}
@@ -731,4 +730,3 @@ void MtmStartReceivers(void)
731730
}
732731
}
733732
}
734-

pglogical_relid_map.c

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,19 @@ static void
2020
pglogical_relid_map_init(void)
2121
{
2222
HASHCTL ctl;
23-
int hash_flags = HASH_ELEM;
24-
2523
Assert(relid_map == NULL);
2624

2725
MemSet(&ctl, 0, sizeof(ctl));
2826
ctl.keysize = sizeof(Oid);
2927
ctl.entrysize = sizeof(PGLRelidMapEntry);
30-
31-
#if PG_VERSION_NUM >= 90500
32-
hash_flags |= HASH_BLOBS;
33-
#else
34-
ctl.hash = tag_hash;
35-
hash_flags |= HASH_FUNCTION;
36-
#endif
37-
38-
relid_map = hash_create("pglogical_relid_map", PGL_INIT_RELID_MAP_SIZE, &ctl, hash_flags);
28+
relid_map = hash_create("pglogical_relid_map", PGL_INIT_RELID_MAP_SIZE, &ctl, HASH_ELEM | HASH_BLOBS);
3929

4030
Assert(relid_map != NULL);
4131
}
4232

4333
Oid pglogical_relid_map_get(Oid relid)
4434
{
45-
if (relid_map != NULL) {
35+
if (relid_map != NULL) {
4636
PGLRelidMapEntry* entry = (PGLRelidMapEntry*)hash_search(relid_map, &relid, HASH_FIND, NULL);
4737
return entry ? entry->local_relid : InvalidOid;
4838
}
@@ -51,23 +41,23 @@ Oid pglogical_relid_map_get(Oid relid)
5141

5242
bool pglogical_relid_map_put(Oid remote_relid, Oid local_relid)
5343
{
54-
bool found;
44+
bool found;
5545
PGLRelidMapEntry* entry;
56-
if (relid_map == NULL) {
46+
if (relid_map == NULL) {
5747
pglogical_relid_map_init();
5848
}
5949
entry = hash_search(relid_map, &remote_relid, HASH_ENTER, &found);
6050
if (found) {
6151
entry->local_relid = local_relid;
62-
return false;
52+
return false;
6353
}
6454
entry->local_relid = local_relid;
6555
return true;
6656
}
6757

6858
void pglogical_relid_map_reset(void)
6959
{
70-
if (relid_map != NULL) {
60+
if (relid_map != NULL) {
7161
hash_destroy(relid_map);
7262
relid_map = NULL;
7363
}

0 commit comments

Comments
 (0)