Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 64a5f47

Browse files
knizhnikkelvich
authored andcommitted
Rewrite handshake procedure
1 parent c22de58 commit 64a5f47

File tree

4 files changed

+12
-11
lines changed

4 files changed

+12
-11
lines changed

arbiter.c

+5-2
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ static int MtmConnectSocket(int node, int port, int timeout)
396396
unsigned i, n_addrs = sizeof(addrs) / sizeof(addrs[0]);
397397
MtmHandshakeMessage req;
398398
MtmArbiterMessage resp;
399-
int sd;
399+
int rc, sd;
400400
timestamp_t start = MtmGetSystemTime();
401401
char const* host = Mtm->nodes[node].con.hostName;
402402

@@ -476,7 +476,10 @@ static int MtmConnectSocket(int node, int port, int timeout)
476476
close(sd);
477477
goto Retry;
478478
}
479-
if (MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout) != 1 || MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
479+
while ((rc = MtmWaitSocket(sd, false, MtmHeartbeatSendTimeout)) == 0) {
480+
elog(WARNING, "Arbiter waiting response for handshake message from %s:%d: rc=%d, error=%d", host, port, rc, errno);
481+
}
482+
if (rc != 1 || MtmReadSocket(sd, &resp, sizeof resp) != sizeof(resp)) {
480483
elog(WARNING, "Arbiter failed to receive response for handshake message from %s:%d: errno=%d", host, port, errno);
481484
close(sd);
482485
goto Retry;

pglogical_output.c

+6-6
Original file line numberDiff line numberDiff line change
@@ -383,8 +383,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
383383
/* If the record didn't originate locally, send origin info */
384384
send_replication_origin &= txn->origin_id != InvalidRepOriginId;
385385

386-
OutputPluginPrepareWrite(ctx, !send_replication_origin);
387386
if (data->api) {
387+
OutputPluginPrepareWrite(ctx, !send_replication_origin);
388388
data->api->write_begin(ctx->out, data, txn);
389389

390390
if (send_replication_origin)
@@ -408,8 +408,8 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
408408
replorigin_by_oid(txn->origin_id, true, &origin))
409409
data->api->write_origin(ctx->out, origin, txn->origin_lsn);
410410
}
411+
OutputPluginWrite(ctx, true);
411412
}
412-
OutputPluginWrite(ctx, true);
413413
}
414414

415415
/*
@@ -421,11 +421,11 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421421
{
422422
PGLogicalOutputData* data = (PGLogicalOutputData*)ctx->output_plugin_private;
423423

424-
OutputPluginPrepareWrite(ctx, true);
425424
if (data->api) {
425+
OutputPluginPrepareWrite(ctx, true);
426426
data->api->write_commit(ctx->out, data, txn, commit_lsn);
427+
OutputPluginWrite(ctx, true);
427428
}
428-
OutputPluginWrite(ctx, true);
429429
}
430430

431431
void
@@ -541,11 +541,11 @@ send_startup_message(LogicalDecodingContext *ctx,
541541
* not.
542542
*/
543543

544-
OutputPluginPrepareWrite(ctx, last_message);
545544
if (data->api) {
545+
OutputPluginPrepareWrite(ctx, last_message);
546546
data->api->write_startup_message(ctx->out, msg);
547+
OutputPluginWrite(ctx, last_message);
547548
}
548-
OutputPluginWrite(ctx, last_message);
549549

550550
pfree(msg);
551551

tests2/docker-entrypoint.sh

-3
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,6 @@ if [ "$1" = 'postgres' ]; then
8080
checkpoint_timeout = 30
8181
log_autovacuum_min_duration = 0
8282
83-
raftable.id = $NODE_ID
84-
raftable.peers = '$RAFT_PEERS'
85-
8683
multimaster.workers = 4
8784
multimaster.use_raftable = true
8885
multimaster.queue_size=52857600

tests2/lib/bank_client.py

+1
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ def exec_tx(self, name, tx_block):
151151
def check_total(self):
152152

153153
def tx(conn, cur):
154+
conn.commit()
154155
cur.execute('select sum(amount) from bank_test')
155156
res = cur.fetchone()
156157
total = res[0]

0 commit comments

Comments
 (0)