Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 5903e48

Browse files
committed
Add error reporting to the libdtm snapshot acquisition method. Catch exceptions in the reader thread.
1 parent 6034200 commit 5903e48

File tree

3 files changed

+57
-41
lines changed

3 files changed

+57
-41
lines changed

contrib/pg_dtm/libdtm.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <assert.h>
99
#include <time.h>
1010

11+
#include "postgres.h"
1112
#include "libdtm.h"
1213
#include "dtmd/include/proto.h"
1314
#include "dtmd/include/dtmdlimits.h"
@@ -428,8 +429,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
428429
return;
429430
failure:
430431
DiscardConnection();
431-
fprintf(
432-
stderr,
432+
elog(ERROR,
433433
"DtmGlobalGetSnapshot: failed to"
434434
" get the snapshot for xid = %d\n",
435435
xid

contrib/pg_dtm/tests/dtmbench.cpp

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -118,35 +118,40 @@ void* reader(void* arg)
118118
int64_t prevSum = 0;
119119

120120
while (running) {
121-
xid_t xid = 0;
122-
for (size_t i = 0; i < conns.size(); i++) {
123-
work txn(*conns[i]);
124-
if (i == 0) {
125-
xid = execQuery(txn, "select dtm_begin_transaction()");
126-
} else {
127-
exec(txn, "select dtm_join_transaction(%u)", xid);
121+
try {
122+
xid_t xid = 0;
123+
for (size_t i = 0; i < conns.size(); i++) {
124+
work txn(*conns[i]);
125+
if (i == 0) {
126+
xid = execQuery(txn, "select dtm_begin_transaction()");
127+
} else {
128+
exec(txn, "select dtm_join_transaction(%u)", xid);
129+
}
130+
txn.commit();
128131
}
129-
txn.commit();
130-
}
131-
vector< unique_ptr<nontransaction> > txns(conns.size());
132-
vector< unique_ptr<pipeline> > pipes(conns.size());
133-
vector<pipeline::query_id> results(conns.size());
134-
for (size_t i = 0; i < conns.size(); i++) {
135-
txns[i] = new nontransaction(*conns[i]);
136-
pipes[i] = new pipeline(*txns[i]);
137-
results[i] = pipes[i]->insert("select sum(v) from t");
138-
}
139-
int64_t sum = 0;
140-
for (size_t i = 0; i < conns.size(); i++) {
141-
pipes[i]->complete();
142-
result r = pipes[i]->retrieve(results[i]);
143-
sum += r[0][0].as(int64_t());
144-
}
145-
if (sum != prevSum) {
146-
printf("Total=%ld xid=%u\n", sum, xid);
147-
prevSum = sum;
132+
vector< unique_ptr<nontransaction> > txns(conns.size());
133+
vector< unique_ptr<pipeline> > pipes(conns.size());
134+
vector<pipeline::query_id> results(conns.size());
135+
for (size_t i = 0; i < conns.size(); i++) {
136+
txns[i] = new nontransaction(*conns[i]);
137+
pipes[i] = new pipeline(*txns[i]);
138+
results[i] = pipes[i]->insert("select sum(v) from t");
139+
}
140+
int64_t sum = 0;
141+
for (size_t i = 0; i < conns.size(); i++) {
142+
pipes[i]->complete();
143+
result r = pipes[i]->retrieve(results[i]);
144+
sum += r[0][0].as(int64_t());
145+
}
146+
if (sum != prevSum) {
147+
printf("Total=%ld xid=%u\n", sum, xid);
148+
prevSum = sum;
149+
}
150+
t.proceeded += 1;
151+
} catch (pqxx_exception const& x) {
152+
printf("reader exception\n");
153+
continue;
148154
}
149-
t.proceeded += 1;
150155
}
151156
return NULL;
152157
}
@@ -174,8 +179,13 @@ void* writer(void* arg)
174179
nontransaction srcTx(*conns[srcCon]);
175180
nontransaction dstTx(*conns[dstCon]);
176181

177-
xid_t xid = execQuery(srcTx, "select dtm_begin_transaction()");
178-
exec(dstTx, "select dtm_join_transaction(%u)", xid);
182+
try {
183+
xid_t xid = execQuery(srcTx, "select dtm_begin_transaction()");
184+
exec(dstTx, "select dtm_join_transaction(%u)", xid);
185+
} catch (pqxx_exception const& x) {
186+
i -= 1;
187+
continue;
188+
}
179189

180190
exec(srcTx, "begin transaction isolation level %s", cfg.isolationLevel);
181191
exec(dstTx, "begin transaction isolation level %s", cfg.isolationLevel);
@@ -213,14 +223,19 @@ void* writer(void* arg)
213223
void initializeDatabase()
214224
{
215225
for (size_t i = 0; i < cfg.connections.size(); i++) {
216-
connection conn(cfg.connections[i]);
217-
work txn(conn);
218-
exec(txn, "drop extension if exists pg_dtm");
219-
exec(txn, "create extension pg_dtm");
220-
exec(txn, "drop table if exists t");
221-
exec(txn, "create table t(u int primary key, v int)");
222-
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
223-
txn.commit();
226+
try {
227+
connection conn(cfg.connections[i]);
228+
work txn(conn);
229+
exec(txn, "drop extension if exists pg_dtm");
230+
exec(txn, "create extension pg_dtm");
231+
exec(txn, "drop table if exists t");
232+
exec(txn, "create table t(u int primary key, v int)");
233+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
234+
txn.commit();
235+
} catch (pqxx_exception const& x) {
236+
i -= 1;
237+
continue;
238+
}
224239
}
225240
}
226241

@@ -308,3 +323,4 @@ int main (int argc, char* argv[])
308323
}
309324
return 0;
310325
}
326+
// vim: sts=4 ts=4 sw=4 expandtab

contrib/pg_dtm/tests/makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ CXXFLAGS=-g -Wall -O2 -pthread
44
all: dtmbench
55

66
dtmbench: dtmbench.cpp
7-
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx
7+
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx -lpq
88

99
clean:
10-
rm -f dtmbench
10+
rm -f dtmbench

0 commit comments

Comments
 (0)