Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit ccac390

Browse files
knizhnikkelvich
authored andcommitted
Add dtmacid test
1 parent 629f58b commit ccac390

File tree

4 files changed

+324
-3
lines changed

4 files changed

+324
-3
lines changed

tests/dtmacid.cpp

Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
#include <time.h>
2+
#include <stdio.h>
3+
#include <stdarg.h>
4+
#include <stdlib.h>
5+
#include <inttypes.h>
6+
#include <sys/time.h>
7+
#include <pthread.h>
8+
9+
#include <string>
10+
#include <vector>
11+
12+
#include <pqxx/connection>
13+
#include <pqxx/transaction>
14+
#include <pqxx/nontransaction>
15+
#include <pqxx/pipeline>
16+
17+
using namespace std;
18+
using namespace pqxx;
19+
20+
template<class T>
21+
class my_unique_ptr
22+
{
23+
T* ptr;
24+
25+
public:
26+
my_unique_ptr(T* p = NULL) : ptr(p) {}
27+
~my_unique_ptr() { delete ptr; }
28+
T& operator*() { return *ptr; }
29+
T* operator->() { return ptr; }
30+
void operator=(T* p) { ptr = p; }
31+
void operator=(my_unique_ptr& other) {
32+
ptr = other.ptr;
33+
other.ptr = NULL;
34+
}
35+
};
36+
37+
typedef void* (*thread_proc_t)(void*);
38+
typedef uint32_t xid_t;
39+
40+
struct thread
41+
{
42+
pthread_t t;
43+
size_t transactions;
44+
size_t updates;
45+
size_t selects;
46+
size_t aborts;
47+
int id;
48+
49+
void start(int tid, thread_proc_t proc) {
50+
id = tid;
51+
updates = 0;
52+
selects = 0;
53+
aborts = 0;
54+
transactions = 0;
55+
pthread_create(&t, NULL, proc, this);
56+
}
57+
58+
void wait() {
59+
pthread_join(t, NULL);
60+
}
61+
};
62+
63+
struct config
64+
{
65+
int nReaders;
66+
int nWriters;
67+
int nIterations;
68+
int nAccounts;
69+
int updatePercent;
70+
vector<string> connections;
71+
bool scatter;
72+
73+
config() {
74+
nReaders = 1;
75+
nWriters = 10;
76+
nIterations = 1000;
77+
nAccounts = 100000;
78+
updatePercent = 100;
79+
scatter = false;
80+
}
81+
};
82+
83+
config cfg;
84+
bool running;
85+
86+
#define USEC 1000000
87+
88+
static time_t getCurrentTime()
89+
{
90+
struct timeval tv;
91+
gettimeofday(&tv, NULL);
92+
return (time_t)tv.tv_sec*USEC + tv.tv_usec;
93+
}
94+
95+
96+
void exec(transaction_base& txn, char const* sql, ...)
97+
{
98+
va_list args;
99+
va_start(args, sql);
100+
char buf[1024];
101+
vsprintf(buf, sql, args);
102+
va_end(args);
103+
txn.exec(buf);
104+
}
105+
106+
template<class T>
107+
T execQuery( transaction_base& txn, char const* sql, ...)
108+
{
109+
va_list args;
110+
va_start(args, sql);
111+
char buf[1024];
112+
vsprintf(buf, sql, args);
113+
va_end(args);
114+
result r = txn.exec(buf);
115+
return r[0][0].as(T());
116+
}
117+
118+
void* reader(void* arg)
119+
{
120+
thread& t = *(thread*)arg;
121+
vector< my_unique_ptr<connection> > conns(cfg.connections.size());
122+
for (size_t i = 0; i < conns.size(); i++) {
123+
conns[i] = new connection(cfg.connections[i]);
124+
}
125+
int lt = 0;
126+
int gt = 0;
127+
while (running) {
128+
int c1 = random() % conns.size();
129+
int c2;
130+
while ((c2 = random() % conns.size()) == c1);
131+
work txn1(*conns[c1]);
132+
work txn2(*conns[c2]);
133+
result r1 = txn1.exec("select v from t order by u");
134+
result r2 = txn2.exec("select v from t order by u");
135+
int delta = 0;
136+
for (int i=0; i < cfg.nAccounts; i++) {
137+
int diff = r1[i][0].as(int()) - r2[i][0].as(int());
138+
if (diff != 0) {
139+
if (delta == 0) {
140+
delta = diff;
141+
if (delta < 0) lt++; else gt++;
142+
} else if (delta != diff) {
143+
printf("Inconsistency found for record %d\n", i);
144+
}
145+
}
146+
}
147+
t.transactions += 2;
148+
t.selects += 2;
149+
txn1.commit();
150+
txn2.commit();
151+
}
152+
printf("lt=%d, gt=%d\n", lt, gt);
153+
return NULL;
154+
}
155+
156+
void* writer(void* arg)
157+
{
158+
thread& t = *(thread*)arg;
159+
vector< my_unique_ptr<connection> > conns(cfg.connections.size());
160+
for (size_t i = 0; i < conns.size(); i++) {
161+
conns[i] = new connection(cfg.connections[i]);
162+
}
163+
for (int i = 0; i < cfg.nIterations; i++)
164+
{
165+
//work
166+
//transaction<repeatable_read> txn(*conns[random() % conns.size()]);
167+
transaction<read_committed> txn(*conns[random() % conns.size()]);
168+
int acc = random() % cfg.nAccounts;
169+
try {
170+
exec(txn, "update t set v = v + 1 where u=%d", acc);
171+
txn.commit();
172+
t.updates += 1;
173+
t.transactions += 1;
174+
} catch (pqxx_exception const& x) {
175+
txn.abort();
176+
t.aborts += 1;
177+
i -= 1;
178+
}
179+
}
180+
return NULL;
181+
}
182+
183+
void initializeDatabase()
184+
{
185+
connection conn(cfg.connections[0]);
186+
time_t start = getCurrentTime();
187+
printf("Creating database schema...\n");
188+
{
189+
nontransaction txn(conn);
190+
exec(txn, "drop extension if exists multimaster");
191+
exec(txn, "create extension multimaster");
192+
exec(txn, "drop table if exists t");
193+
exec(txn, "create table t(u int primary key, v int)");
194+
}
195+
printf("Populating data...\n");
196+
{
197+
work txn(conn);
198+
exec(txn, "insert into t (select generate_series(0,%d), %d)", cfg.nAccounts-1, 0);
199+
txn.commit();
200+
}
201+
printf("Initialization completed in %f seconds\n", (getCurrentTime() - start)/100000.0);
202+
}
203+
204+
int main (int argc, char* argv[])
205+
{
206+
bool initialize = false;
207+
208+
if (argc == 1){
209+
printf("Use -h to show usage options\n");
210+
return 1;
211+
}
212+
213+
for (int i = 1; i < argc; i++) {
214+
if (argv[i][0] == '-') {
215+
switch (argv[i][1]) {
216+
case 'r':
217+
cfg.nReaders = atoi(argv[++i]);
218+
continue;
219+
case 'w':
220+
cfg.nWriters = atoi(argv[++i]);
221+
continue;
222+
case 'a':
223+
cfg.nAccounts = atoi(argv[++i]);
224+
continue;
225+
case 'n':
226+
cfg.nIterations = atoi(argv[++i]);
227+
continue;
228+
case 'p':
229+
cfg.updatePercent = atoi(argv[++i]);
230+
continue;
231+
case 's':
232+
cfg.scatter = true;
233+
continue;
234+
case 'c':
235+
cfg.connections.push_back(string(argv[++i]));
236+
continue;
237+
case 'i':
238+
initialize = true;
239+
continue;
240+
}
241+
}
242+
printf("Options:\n"
243+
"\t-r N\tnumber of readers (1)\n"
244+
"\t-w N\tnumber of writers (10)\n"
245+
"\t-a N\tnumber of accounts (100000)\n"
246+
"\t-n N\tnumber of iterations (1000)\n"
247+
"\t-p N\tupdate percent (100)\n"
248+
"\t-c STR\tdatabase connection string\n"
249+
"\t-i\tinitialize database\n");
250+
return 1;
251+
}
252+
253+
if (initialize) {
254+
initializeDatabase();
255+
printf("%d accounts inserted\n", cfg.nAccounts);
256+
return 0;
257+
}
258+
259+
time_t start = getCurrentTime();
260+
running = true;
261+
262+
vector<thread> readers(cfg.nReaders);
263+
vector<thread> writers(cfg.nWriters);
264+
size_t nAborts = 0;
265+
size_t nUpdates = 0;
266+
size_t nSelects = 0;
267+
size_t nTransactions = 0;
268+
269+
for (int i = 0; i < cfg.nReaders; i++) {
270+
readers[i].start(i, reader);
271+
}
272+
for (int i = 0; i < cfg.nWriters; i++) {
273+
writers[i].start(i, writer);
274+
}
275+
276+
for (int i = 0; i < cfg.nWriters; i++) {
277+
writers[i].wait();
278+
nUpdates += writers[i].updates;
279+
nSelects += writers[i].selects;
280+
nAborts += writers[i].aborts;
281+
nTransactions += writers[i].transactions;
282+
}
283+
284+
running = false;
285+
286+
for (int i = 0; i < cfg.nReaders; i++) {
287+
readers[i].wait();
288+
nSelects += readers[i].selects;
289+
nTransactions += writers[i].transactions;
290+
}
291+
292+
time_t elapsed = getCurrentTime() - start;
293+
294+
printf(
295+
"{\"tps\":%f, \"transactions\":%ld,"
296+
" \"selects\":%ld, \"updates\":%ld, \"aborts\":%ld, \"abort_percent\": %d,"
297+
" \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
298+
(double)(nTransactions*USEC)/elapsed,
299+
nTransactions,
300+
nSelects,
301+
nUpdates,
302+
nAborts,
303+
(int)(nAborts*100/nTransactions),
304+
cfg.nReaders,
305+
cfg.nWriters,
306+
cfg.updatePercent,
307+
cfg.nAccounts,
308+
cfg.nIterations,
309+
cfg.connections.size()
310+
);
311+
312+
return 0;
313+
}

tests/makefile

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
CXX=g++
2-
CXXFLAGS=-g -Wall -O2 -pthread
2+
CXXFLAGS=-g -Wall -O0 -pthread
33

4-
all: dtmbench
4+
all: dtmbench dtmacid
55

66
dtmbench: dtmbench.cpp
77
$(CXX) $(CXXFLAGS) -o dtmbench dtmbench.cpp -lpqxx -lpq
88

9+
dtmacid: dtmacid.cpp
10+
$(CXX) $(CXXFLAGS) -o dtmacid dtmacid.cpp -lpqxx -lpq
11+
912
clean:
10-
rm -f dtmbench
13+
rm -f dtmbench dtmacid

tests/run_perf.sh

100644100755
File mode changed.

tests/runacid.sh

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
./dtmacid \
2+
-c "dbname=postgres host=localhost port=5432 sslmode=disable" \
3+
-c "dbname=postgres host=localhost port=5433 sslmode=disable" \
4+
-c "dbname=postgres host=localhost port=5434 sslmode=disable" \
5+
-n 1000 -a 1000 -w 10 -r 1 $*

0 commit comments

Comments
 (0)