Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit f1725c7

Browse files
committed
pg-11 compat
1 parent 98f044d commit f1725c7

9 files changed

+64
-48
lines changed

arbiter.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
#include "postgres.h"
2828
#include "fmgr.h"
2929
#include "miscadmin.h"
30-
#include "common/pg_socket.h"
30+
// #include "common/pg_socket.h"
3131
#include "postmaster/postmaster.h"
3232
#include "postmaster/bgworker.h"
3333
#include "storage/s_lock.h"
@@ -66,6 +66,7 @@
6666
#include "common/ip.h"
6767
#include "pgstat.h"
6868

69+
#include "compat.h"
6970

7071
#ifndef USE_EPOLL
7172
#ifdef __linux__
@@ -119,6 +120,7 @@ char const* const MtmMessageKindMnem[] =
119120

120121
static BackgroundWorker MtmSenderWorker = {
121122
"mtm-sender",
123+
"mtm",
122124
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
123125
BgWorkerStart_ConsistentState,
124126
MULTIMASTER_BGW_RESTART_TIMEOUT,
@@ -132,6 +134,7 @@ static BackgroundWorker MtmSenderWorker = {
132134

133135
static BackgroundWorker MtmRecevierWorker = {
134136
"mtm-receiver",
137+
"mtm",
135138
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
136139
BgWorkerStart_ConsistentState,
137140
MULTIMASTER_BGW_RESTART_TIMEOUT,
@@ -145,6 +148,7 @@ static BackgroundWorker MtmRecevierWorker = {
145148

146149
static BackgroundWorker MtmMonitorWorker = {
147150
"mtm-monitor",
151+
"mtm",
148152
BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION,
149153
BgWorkerStart_ConsistentState,
150154
MULTIMASTER_BGW_RESTART_TIMEOUT,
@@ -763,7 +767,7 @@ void MtmSender(Datum arg)
763767
BackgroundWorkerUnblockSignals();
764768

765769
/* Connect to a database */
766-
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
770+
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL, 0);
767771

768772
/* Start heartbeat times */
769773
heartbeat_timer = RegisterTimeout(USER_TIMEOUT, MtmScheduleHeartbeat);
@@ -851,7 +855,7 @@ void MtmMonitor(Datum arg)
851855
BackgroundWorkerUnblockSignals();
852856

853857
/* Connect to a database */
854-
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
858+
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL, 0);
855859

856860
while (!stop) {
857861
int rc = WaitLatch(&MyProc->procLatch, WL_TIMEOUT | WL_POSTMASTER_DEATH, MtmHeartbeatRecvTimeout, PG_WAIT_EXTENSION);
@@ -898,7 +902,7 @@ void MtmReceiver(Datum arg)
898902
BackgroundWorkerUnblockSignals();
899903

900904
/* Connect to a database */
901-
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL);
905+
BackgroundWorkerInitializeConnection(MtmDatabaseName, NULL, 0);
902906

903907
MtmAcceptIncomingConnections();
904908

bgwpool.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ static void BgwPoolMainLoop(BgwPool* pool)
5050
pqsignal(SIGHUP, PostgresSigHupHandler);
5151

5252
BackgroundWorkerUnblockSignals();
53-
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser);
53+
BackgroundWorkerInitializeConnection(pool->dbname, pool->dbuser, 0);
5454
ActivePortal = &fakePortal;
5555
ActivePortal->status = PORTAL_ACTIVE;
5656
ActivePortal->sourceText = "";

compat.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
2+
#ifndef MTMCOMPAT_H
3+
#define MTMCOMPAT_H
4+
5+
/* Allow use of rdma functions on non-rdma enabled postgres */
6+
#define pg_socket(a, b, c, d) socket(a, b, c)
7+
#define pg_setsockopt(a, b, c, d, e, f) setsockopt(a, b, c, d, e)
8+
#define pg_getsockopt(a, b, c, d, e, f) getsockopt(a, b, c, d, e)
9+
#define pg_set_noblock(a, b) pg_set_noblock(a)
10+
#define pg_bind(a, b, c, d) bind(a, b, c)
11+
#define pg_listen(a, b, c) listen(a, b)
12+
#define pg_connect(a, b, c, d) connect(a, b, c)
13+
#define pg_accept(a, b, c, d) accept(a, b, c)
14+
#define pg_select(a, b, c, d, e, f) select(a, b, c, d, e)
15+
#define pg_send(a, b, c, d, e) send(a, b, c, d)
16+
#define pg_recv(a, b, c, d, e) recv(a, b, c, d)
17+
#define pg_closesocket(a, b) closesocket(a)
18+
19+
#define PQselect(a, b, c, d, e, f) select(a, b, c, d, e)
20+
#define PQisRsocket(a) false
21+
22+
#endif /* MTMCOMPAT_H */

multimaster.c

Lines changed: 9 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
#include "funcapi.h"
1414
#include "fmgr.h"
1515
#include "miscadmin.h"
16-
#include "common/pg_socket.h"
16+
// #include "common/pg_socket.h"
1717
#include "pgstat.h"
1818
#include "utils/regproc.h"
1919

@@ -67,7 +67,7 @@
6767
#include "access/htup_details.h"
6868
#include "catalog/indexing.h"
6969
#include "catalog/namespace.h"
70-
#include "catalog/pg_constraint_fn.h"
70+
#include "catalog/pg_constraint.h"
7171
#include "catalog/pg_proc.h"
7272
#include "pglogical_output/hooks.h"
7373
#include "parser/analyze.h"
@@ -83,6 +83,8 @@
8383
#include "ddd.h"
8484
#include "state.h"
8585

86+
#include "compat.h"
87+
8688
typedef struct {
8789
TransactionId xid; /* local transaction ID */
8890
GlobalTransactionId gtid; /* global transaction ID assigned by coordinator of transaction */
@@ -206,16 +208,7 @@ static dlist_head MtmLsnMapping = DLIST_STATIC_INIT(MtmLsnMapping);
206208

207209
static TransactionManager MtmTM =
208210
{
209-
PgTransactionIdGetStatus,
210-
PgTransactionIdSetTreeStatus,
211-
MtmGetSnapshot,
212-
PgGetNewTransactionId,
213-
MtmGetOldestXmin,
214-
PgTransactionIdIsInProgress,
215-
PgGetGlobalTransactionId,
216-
PgXidInMVCCSnapshot,
217211
MtmDetectGlobalDeadLock,
218-
MtmGetName,
219212
MtmGetTransactionStateSize,
220213
MtmSerializeTransactionState,
221214
MtmDeserializeTransactionState,
@@ -630,7 +623,7 @@ void MtmSetSnapshot(csn_t globalSnapshot)
630623

631624
Snapshot MtmGetSnapshot(Snapshot snapshot)
632625
{
633-
snapshot = PgGetSnapshotData(snapshot);
626+
snapshot = GetSnapshotData(snapshot);
634627
if (XactIsoLevel == XACT_READ_COMMITTED && MtmTx.snapshot != INVALID_CSN) {
635628
MtmTx.snapshot = MtmGetCurrentTime();
636629
if (TransactionIdIsValid(GetCurrentTransactionIdIfAny())) {
@@ -644,7 +637,7 @@ Snapshot MtmGetSnapshot(Snapshot snapshot)
644637

645638
TransactionId MtmGetOldestXmin(Relation rel, int flags)
646639
{
647-
TransactionId xmin = PgGetOldestXmin(rel, flags); /* consider all backends */
640+
TransactionId xmin = GetOldestXmin(rel, flags); /* consider all backends */
648641
// if (TransactionIdIsValid(xmin)) {
649642
// MtmLock(LW_EXCLUSIVE);
650643
// xmin = MtmAdjustOldestXid(xmin);
@@ -4802,7 +4795,7 @@ static bool MtmTwoPhaseCommit(MtmCurrentTrans* x)
48024795
MtmGenerateGid(x->gid);
48034796

48044797
if (!x->isTransactionBlock) {
4805-
BeginTransactionBlock(false);
4798+
BeginTransactionBlock();
48064799
x->isTransactionBlock = true;
48074800
CommitTransactionCommand();
48084801
StartTransactionCommand();
@@ -5807,10 +5800,10 @@ MtmDetectGlobalDeadLockForXid(TransactionId xid)
58075800

58085801
if (!hasDeadlock)
58095802
{
5810-
TimestampTz start_time = get_timeout_start_time(DEADLOCK_TIMEOUT);
5803+
// TimestampTz start_time = get_timeout_start_time(DEADLOCK_TIMEOUT);
58115804
MTM_LOG1("Enable deadlock timeout in backend %d for transaction %llu", MyProcPid, (long64)xid);
58125805
enable_timeout_after(DEADLOCK_TIMEOUT, DeadlockTimeout);
5813-
set_timeout_start_time(DEADLOCK_TIMEOUT, start_time);
5806+
// set_timeout_start_time(DEADLOCK_TIMEOUT, start_time);
58145807
}
58155808
}
58165809
return hasDeadlock;

pglogical_apply.c

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -514,7 +514,7 @@ process_remote_message(StringInfo s)
514514
ExecVacuum(MtmVacuumStmt, 1);
515515
} else if (MtmIndexStmt != NULL) {
516516
Oid relid = RangeVarGetRelidExtended(MtmIndexStmt->relation, ShareUpdateExclusiveLock,
517-
false, false,
517+
0,
518518
NULL,
519519
NULL);
520520
/* Run parse analysis ... */
@@ -523,12 +523,14 @@ process_remote_message(StringInfo s)
523523
DefineIndex(relid, /* OID of heap relation */
524524
MtmIndexStmt,
525525
InvalidOid, /* no predefined OID */
526+
InvalidOid, /* no parent index */
527+
InvalidOid, /* no parent constraint */
526528
false, /* is_alter_table */
527529
true, /* check_rights */
528530
true, /* check_not_in_use */
529531
false, /* skip_build */
530532
false); /* quiet */
531-
533+
532534
}
533535
else if (MtmDropStmt != NULL)
534536
{
@@ -631,7 +633,7 @@ read_tuple_parts(StringInfo s, Relation rel, TupleData *tup)
631633

632634
for (i = 0; i < desc->natts; i++)
633635
{
634-
Form_pg_attribute att = desc->attrs[i];
636+
Form_pg_attribute att = TupleDescAttr(desc, i);
635637
char kind;
636638
const char *data;
637639
int len;
@@ -745,7 +747,7 @@ read_rel(StringInfo s, LOCKMODE mode)
745747
relnamelen = pq_getmsgbyte(s);
746748
rv->relname = (char *) pq_getmsgbytes(s, relnamelen);
747749

748-
local_relid = RangeVarGetRelidExtended(rv, mode, false, false, NULL, NULL);
750+
local_relid = RangeVarGetRelidExtended(rv, mode, 0, NULL, NULL);
749751
old_context = MemoryContextSwitchTo(TopMemoryContext);
750752
pglogical_relid_map_put(remote_relid, local_relid);
751753
MemoryContextSwitchTo(old_context);
@@ -820,7 +822,7 @@ process_remote_commit(StringInfo in)
820822
AbortCurrentTransaction();
821823
} else {
822824
/* prepare TBLOCK_INPROGRESS state for PrepareTransactionBlock() */
823-
BeginTransactionBlock(false);
825+
BeginTransactionBlock();
824826
CommitTransactionCommand();
825827
StartTransactionCommand();
826828

@@ -915,8 +917,8 @@ process_remote_insert(StringInfo s, Relation rel)
915917
PushActiveSnapshot(GetTransactionSnapshot());
916918

917919
estate = create_rel_estate(rel);
918-
newslot = ExecInitExtraTupleSlot(estate);
919-
oldslot = ExecInitExtraTupleSlot(estate);
920+
newslot = ExecInitExtraTupleSlot(estate, NULL);
921+
oldslot = ExecInitExtraTupleSlot(estate, NULL);
920922
ExecSetSlotDescriptor(newslot, tupDesc);
921923
ExecSetSlotDescriptor(oldslot, tupDesc);
922924

@@ -1086,9 +1088,9 @@ process_remote_update(StringInfo s, Relation rel)
10861088
action);
10871089

10881090
estate = create_rel_estate(rel);
1089-
oldslot = ExecInitExtraTupleSlot(estate);
1091+
oldslot = ExecInitExtraTupleSlot(estate, NULL);
10901092
ExecSetSlotDescriptor(oldslot, tupDesc);
1091-
newslot = ExecInitExtraTupleSlot(estate);
1093+
newslot = ExecInitExtraTupleSlot(estate, NULL);
10921094
ExecSetSlotDescriptor(newslot, tupDesc);
10931095

10941096
if (action == 'K')
@@ -1196,7 +1198,7 @@ process_remote_delete(StringInfo s, Relation rel)
11961198
bool found_old;
11971199

11981200
estate = create_rel_estate(rel);
1199-
oldslot = ExecInitExtraTupleSlot(estate);
1201+
oldslot = ExecInitExtraTupleSlot(estate, NULL);
12001202
ExecSetSlotDescriptor(oldslot, tupDesc);
12011203

12021204
read_tuple_parts(s, rel, &oldtup);
@@ -1277,9 +1279,7 @@ void MtmExecutor(void* work, size_t size)
12771279
if (MtmApplyContext == NULL) {
12781280
MtmApplyContext = AllocSetContextCreate(TopMemoryContext,
12791281
"ApplyContext",
1280-
ALLOCSET_DEFAULT_MINSIZE,
1281-
ALLOCSET_DEFAULT_INITSIZE,
1282-
ALLOCSET_DEFAULT_MAXSIZE);
1282+
ALLOCSET_DEFAULT_SIZES);
12831283
}
12841284
top_context = MemoryContextSwitchTo(MtmApplyContext);
12851285
replorigin_session_origin = InvalidRepOriginId;

pglogical_output.c

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,9 +209,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
209209

210210
data->context = AllocSetContextCreate(TopMemoryContext,
211211
"pglogical conversion context",
212-
ALLOCSET_DEFAULT_MINSIZE,
213-
ALLOCSET_DEFAULT_INITSIZE,
214-
ALLOCSET_DEFAULT_MAXSIZE);
212+
ALLOCSET_DEFAULT_SIZES);
215213
data->allow_internal_basetypes = true;
216214
data->allow_binary_basetypes = true;
217215

@@ -407,9 +405,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
407405

408406
data->hooks_mctxt = AllocSetContextCreate(ctx->context,
409407
"pglogical_output hooks context",
410-
ALLOCSET_SMALL_MINSIZE,
411-
ALLOCSET_SMALL_INITSIZE,
412-
ALLOCSET_SMALL_MAXSIZE);
408+
ALLOCSET_DEFAULT_SIZES);
413409

414410
load_hooks(data);
415411
call_startup_hook(data, ctx->output_plugin_options);

pglogical_proto.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
631631

632632
for (i = 0; i < desc->natts; i++)
633633
{
634-
if (desc->attrs[i]->attisdropped)
634+
if (TupleDescAttr(desc, i)->attisdropped)
635635
continue;
636636
nliveatts++;
637637
}
@@ -652,7 +652,7 @@ pglogical_write_tuple(StringInfo out, PGLogicalOutputData *data,
652652
{
653653
HeapTuple typtup;
654654
Form_pg_type typclass;
655-
Form_pg_attribute att = desc->attrs[i];
655+
Form_pg_attribute att = TupleDescAttr(desc, i);
656656
char transfer_type;
657657

658658
/* skip dropped columns */

pglogical_receiver.c

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
/* Some general headers for custom bgworker facility */
1717

1818
#include <unistd.h>
19+
#include <sys/time.h>
20+
1921
#include "postgres.h"
2022
#include "fmgr.h"
2123
#include "miscadmin.h"
22-
#include "common/pg_socket.h"
24+
// #include "common/pg_socket.h"
2325
#include "pqexpbuffer.h"
2426
#include "access/xact.h"
2527
#include "access/clog.h"
@@ -47,6 +49,7 @@
4749
#include "spill.h"
4850
#include "state.h"
4951
#include "bgwpool.h"
52+
#include "compat.h"
5053

5154
#define ERRCODE_DUPLICATE_OBJECT_STR "42710"
5255
#define RECEIVER_SUSPEND_TIMEOUT (1*USECS_PER_SEC)
@@ -269,7 +272,7 @@ pglogical_receiver_main(Datum main_arg)
269272
BackgroundWorkerUnblockSignals();
270273

271274
/* Connect to a database */
272-
BackgroundWorkerInitializeConnection(MtmDatabaseName, MtmDatabaseUser);
275+
BackgroundWorkerInitializeConnection(MtmDatabaseName, MtmDatabaseUser, 0);
273276
ActivePortal = &fakePortal;
274277
ActivePortal->status = PORTAL_ACTIVE;
275278
ActivePortal->sourceText = "";

spill.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,7 @@ int MtmCreateSpillFile(int node_id, int* file_id)
6767
sprintf(path, "pg_mtm/%d/txn-%d.snap",
6868
node_id, ++spill_file_id);
6969
fd = OpenTransientFile(path,
70-
O_CREAT | O_TRUNC | O_WRONLY | O_APPEND | PG_BINARY,
71-
S_IRUSR | S_IWUSR);
70+
O_CREAT | O_TRUNC | O_WRONLY | O_APPEND | PG_BINARY);
7271
if (fd < 0) {
7372
ereport(PANIC,
7473
(errcode_for_file_access(),
@@ -87,8 +86,7 @@ int MtmOpenSpillFile(int node_id, int file_id)
8786
sprintf(path, "pg_mtm/%d/txn-%d.snap",
8887
node_id, file_id);
8988
fd = OpenTransientFile(path,
90-
O_RDONLY | PG_BINARY,
91-
S_IRUSR | S_IWUSR);
89+
O_RDONLY | PG_BINARY);
9290
if (fd < 0) {
9391
ereport(PANIC,
9492
(errcode_for_file_access(),

0 commit comments

Comments
 (0)