Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6c3ed50

Browse files
committed
dmq integration: resurrect docker tests, fix gcc warnings, fix few errors
1 parent 1ce9da8 commit 6c3ed50

12 files changed

+65
-39
lines changed

Dockerfile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
FROM pgproee10
1+
FROM pgproee11
22

33
RUN mkdir /pg/mmts
44
COPY ./ /pg/mmts/
55

66
RUN export USE_PGXS=1 && \
77
cd /pg/mmts && make clean && make install
88

9-
RUN export USE_PGXS=1 && \
10-
cd /pg/src/contrib/referee && make clean && make install
9+
# RUN export USE_PGXS=1 && \
10+
# cd /pg/src/contrib/referee && make clean && make install
1111

1212
# pg_regress client assumes such dir exists on server
1313
RUN cp /pg/src/src/test/regress/*.so /pg/install/lib/postgresql/

commit.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ MtmBeginTransaction(MtmCurrentTrans* x)
6464
x->containsDML = false; // will be set by executor hook
6565
x->isTransactionBlock = IsTransactionBlock();
6666

67+
68+
/* XXX: ugly hack with debug_query_string */
69+
6770
/* Application name can be changed using PGAPPNAME environment variable */
6871
if (x->isDistributed && Mtm->status != MTM_ONLINE
6972
&& strcmp(application_name, MULTIMASTER_ADMIN) != 0

dmq.c

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ dmq_sender_main(Datum main_arg)
418418
{
419419
// Assert(PQstatus(conns[conn_id].pgconn) != CONNECTION_OK);
420420
conns[conn_id].state = Idle;
421-
DeleteWaitEvent(set, conns[conn_id].pos);
421+
// DeleteWaitEvent(set, conns[conn_id].pos);
422422

423423
mtm_log(DmqStateFinal,
424424
"[DMQ] failed to send message to %s: %s",
@@ -429,14 +429,14 @@ dmq_sender_main(Datum main_arg)
429429
{
430430
mtm_log(DmqTraceOutgoing,
431431
"[DMQ] sent message (l=%zu, m=%s) to %s",
432-
len, data, conns[conn_id].receiver_name);
432+
len, (char *) data, conns[conn_id].receiver_name);
433433
}
434434
}
435435
else
436436
{
437437
mtm_log(WARNING,
438438
"[DMQ] dropping message (l=%zu, m=%s) to disconnected %s",
439-
len, data, conns[conn_id].receiver_name);
439+
len, (char *) data, conns[conn_id].receiver_name);
440440
}
441441

442442
wait = false;
@@ -463,7 +463,7 @@ dmq_sender_main(Datum main_arg)
463463
* Generate timeout or socket events.
464464
*
465465
*
466-
* XXX: here we expect that whole cyle takes less then 250-100 ms.
466+
* XXX: here we expect that whole cycle takes less then 250-100 ms.
467467
* Otherwise we can stuck with timer_event forever.
468468
*/
469469
now_millisec = dmq_now();
@@ -526,7 +526,7 @@ dmq_sender_main(Datum main_arg)
526526
if (ret < 0)
527527
{
528528
conns[conn_id].state = Idle;
529-
DeleteWaitEvent(set, conns[conn_id].pos);
529+
// DeleteWaitEvent(set, conns[conn_id].pos);
530530
// Assert(PQstatus(conns[i].pgconn) != CONNECTION_OK);
531531

532532
mtm_log(DmqStateFinal,
@@ -543,7 +543,7 @@ dmq_sender_main(Datum main_arg)
543543
*/
544544
else if (nevents > 0 && event.events & WL_SOCKET_MASK)
545545
{
546-
uint conn_id = (uint) event.user_data;
546+
uintptr_t conn_id = (uintptr_t) event.user_data;
547547

548548
switch (conns[conn_id].state)
549549
{
@@ -625,6 +625,7 @@ dmq_sender_main(Datum main_arg)
625625
if (!PQisBusy(conns[conn_id].pgconn))
626626
{
627627
conns[conn_id].state = Active;
628+
DeleteWaitEvent(set, event.pos);
628629

629630
mtm_log(DmqStateFinal,
630631
"[DMQ] Connected to %s",
@@ -638,7 +639,6 @@ dmq_sender_main(Datum main_arg)
638639
if (!PQconsumeInput(conns[conn_id].pgconn))
639640
{
640641
conns[conn_id].state = Idle;
641-
DeleteWaitEvent(set, event.pos);
642642

643643
mtm_log(DmqStateFinal,
644644
"[DMQ] connection error with %s: %s",
@@ -975,8 +975,8 @@ dmq_receiver_loop(PG_FUNCTION_ARGS)
975975

976976
/* okay, switch client to copyout state */
977977
pq_beginmessage(&s, 'W');
978-
pq_sendbyte(&s, 0);
979-
pq_sendint16(&s, 0);
978+
pq_sendbyte(&s, 0); /* copy_is_binary */
979+
pq_sendint16(&s, 0); /* numAttributes */
980980
pq_endmessage(&s);
981981
pq_flush();
982982

@@ -1146,7 +1146,7 @@ dmq_push_buffer(DmqDestinationId dest_id, char *stream_name, const void *payload
11461146
// XXX: use sendv instead
11471147
res = shm_mq_send(dmq_local.mq_outh, buf.len, buf.data, false);
11481148
if (res != SHM_MQ_SUCCESS)
1149-
mtm_log(ERROR, "[DMQ] dmq_push: can't send to queue");
1149+
mtm_log(WARNING, "[DMQ] dmq_push: can't send to queue");
11501150
}
11511151

11521152
static bool

logger.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,4 @@ typedef enum MtmLogTag
4444

4545
#define mtm_log(tag, fmt, ...) ereport(tag, \
4646
(errmsg("[MTM] " fmt, ## __VA_ARGS__), \
47-
errhidestmt(true), errhidecontext(true)))
47+
errhidestmt(true), errhidecontext(true)))

multimaster.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -442,19 +442,20 @@ void MtmSleep(timestamp_t usec)
442442
for (;;)
443443
{
444444
int rc;
445-
timestamp_t sleepfor = waketm - MtmGetCurrentTime();
445+
timestamp_t sleepfor;
446446

447447
CHECK_FOR_INTERRUPTS();
448448

449+
sleepfor = waketm - MtmGetCurrentTime();
450+
if (sleepfor < 0)
451+
break;
452+
449453
rc = WaitLatch(MyLatch,
450454
WL_TIMEOUT | WL_POSTMASTER_DEATH,
451455
sleepfor/1000.0, WAIT_EVENT_BGWORKER_STARTUP);
452456

453457
if (rc & WL_POSTMASTER_DEATH)
454458
proc_exit(1);
455-
456-
if (MtmGetCurrentTime() > waketm)
457-
break;
458459
}
459460
}
460461

pglogical_apply.c

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -441,8 +441,6 @@ create_rel_estate(Relation rel)
441441
static bool
442442
process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
443443
{
444-
csn_t snapshot;
445-
nodemask_t participantsMask = 0;
446444
int rc;
447445
TransactionId xid;
448446

@@ -452,11 +450,11 @@ process_remote_begin(StringInfo s, GlobalTransactionId *gtid)
452450
// XXX: get rid of MtmReplicationNodeId
453451
MtmReplicationNodeId = gtid->node;
454452

455-
snapshot = pq_getmsgint64(s);
456-
participantsMask = pq_getmsgint64(s);
453+
pq_getmsgint64(s); // XXX: snapshot
454+
pq_getmsgint64(s); // XXX: participantsMask
457455
Assert(gtid->node > 0);
458456

459-
MTM_LOG2("REMOTE begin node=%d xid=%llu snapshot=%lld participantsMask=%llx", gtid.node, (long64)gtid.xid, snapshot, participantsMask);
457+
// MTM_LOG2("REMOTE begin node=%d xid=%llu snapshot=%lld participantsMask=%llx", gtid.node, (long64)gtid.xid, snapshot, participantsMask);
460458
MtmResetTransaction();
461459

462460
SetCurrentStatementStartTimestamp();

resolver.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,10 @@ MtmTxStateMnem(MtmTxState state)
194194
case MtmTxAborted:
195195
return "MtmTxAborted";
196196
}
197+
198+
/* silence compiler */
199+
Assert(false);
200+
return NULL;
197201
}
198202

199203
/*****************************************************************************

tests/reinit-mm.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ SQL
9393
CONF
9494

9595
pg_ctl -w -D tmp_check/node$i -l node$i.log start
96-
psql -p $port -c 'create extension multimaster'
96+
psql -p $port -c 'create extension multimaster;'
9797

9898
done
9999

tests2/docker-entrypoint.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ if [ "$1" = 'postgres' ]; then
5757
max_replication_slots = 10
5858
max_wal_senders = 10
5959
shared_preload_libraries = 'multimaster'
60-
default_transaction_isolation = 'repeatable read'
6160
log_line_prefix = '%m [%p]: '
6261
wal_writer_delay = 500ms
6362
# log_statement = all
@@ -71,7 +70,7 @@ if [ "$1" = 'postgres' ]; then
7170
7271
multimaster.volkswagen_mode = on
7372
multimaster.ignore_tables_without_pk = on
74-
partition_backend = 'internal'
73+
# partition_backend = 'internal'
7574
EOF
7675

7776
if [ -n "$NODE_ID" ]; then

tests2/lib/bank_client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ def __init__(self, dsns, n_accounts=100000):
7878
# logging.basicConfig(level=logging.DEBUG)
7979
self.n_accounts = n_accounts
8080
self.dsns = dsns
81+
82+
self.create_extension()
83+
8184
self.total = 0
8285
self.aggregates = [{} for e in dsns]
8386
keep_trying(40, 1, self.initdb, 'self.initdb')
@@ -113,7 +116,7 @@ def __init__(self, dsns, n_accounts=100000):
113116
def initdb(self):
114117
conn = psycopg2.connect(self.dsns[0])
115118
cur = conn.cursor()
116-
cur.execute('create extension if not exists multimaster')
119+
# cur.execute('create extension if not exists multimaster')
117120
conn.commit()
118121
cur.execute('drop table if exists bank_test')
119122
cur.execute('create table bank_test(uid int primary key, amount int)')
@@ -135,6 +138,16 @@ def execute(self, node_id, statements):
135138
cur.close()
136139
con.close()
137140

141+
def create_extension(self):
142+
143+
for dsn in self.dsns:
144+
con = psycopg2.connect(dsn)
145+
con.autocommit = True
146+
cur = con.cursor()
147+
cur.execute('create extension multimaster;')
148+
cur.close()
149+
con.close()
150+
138151
def is_data_identic(self):
139152
hashes = set()
140153

tests2/test_recovery.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -81,18 +81,18 @@ def test_node_partition(self):
8181
self.assertCommits(aggs)
8282
self.assertIsolation(aggs)
8383

84-
def test_edge_partition(self):
85-
print('### test_edge_partition ###')
84+
# def test_edge_partition(self):
85+
# print('### test_edge_partition ###')
8686

87-
aggs_failure, aggs = self.performFailure(EdgePartition('node1', 'node3'),
88-
node_wait_for_online="dbname=regression user=postgres host=127.0.0.1 port=15434", stop_load=True)
87+
# aggs_failure, aggs = self.performFailure(EdgePartition('node1', 'node3'),
88+
# node_wait_for_online="dbname=regression user=postgres host=127.0.0.1 port=15434", stop_load=True)
8989

90-
self.assertTrue( ('commit' in aggs_failure[0]['transfer']['finish']) or ('commit' in aggs_failure[2]['transfer']['finish']) )
91-
self.assertCommits(aggs_failure[1:2]) # second node
92-
self.assertIsolation(aggs_failure)
90+
# self.assertTrue( ('commit' in aggs_failure[0]['transfer']['finish']) or ('commit' in aggs_failure[2]['transfer']['finish']) )
91+
# self.assertCommits(aggs_failure[1:2]) # second node
92+
# self.assertIsolation(aggs_failure)
9393

94-
self.assertCommits(aggs)
95-
self.assertIsolation(aggs)
94+
# self.assertCommits(aggs)
95+
# self.assertIsolation(aggs)
9696

9797
def test_node_restart(self):
9898
print('### test_node_restart ###')

tests2/test_regression.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,32 @@
22
import subprocess
33
import time
44

5+
from lib.bank_client import MtmClient
6+
57
class RecoveryTest(unittest.TestCase):
68

79
@classmethod
8-
def setUpClass(self):
10+
def setUpClass(cls):
911
print('setUp')
1012
subprocess.check_call(['docker-compose','up',
1113
'--force-recreate',
1214
'--build',
1315
'-d'])
16+
time.sleep(10)
17+
cls.client = MtmClient([
18+
"dbname=regression user=postgres host=127.0.0.1 port=15432",
19+
"dbname=regression user=postgres host=127.0.0.1 port=15433",
20+
"dbname=regression user=postgres host=127.0.0.1 port=15434"
21+
], n_accounts=1000)
1422

1523
@classmethod
16-
def tearDownClass(self):
24+
def tearDownClass(cls):
1725
print('tearDown')
1826
# subprocess.check_call(['docker-compose','down'])
1927

2028
def test_regression(self):
2129
# XXX: make smth clever here
22-
time.sleep(31)
30+
time.sleep(10)
2331
subprocess.check_call(['docker', 'exec',
2432
'node1',
2533
'/pg/mmts/tests2/support/docker-regress.sh',

0 commit comments

Comments
 (0)