11
11
12
12
#include < pqxx/connection>
13
13
#include < pqxx/transaction>
14
+ #include < pqxx/subtransaction.hxx>
14
15
#include < pqxx/nontransaction>
15
16
#include < pqxx/pipeline>
16
17
@@ -70,7 +71,8 @@ struct config
70
71
vector<string> connections;
71
72
bool scatter;
72
73
bool avoidDeadlocks;
73
-
74
+ bool subtransactions;
75
+
74
76
config () {
75
77
nReaders = 1 ;
76
78
nWriters = 10 ;
@@ -79,6 +81,7 @@ struct config
79
81
updatePercent = 100 ;
80
82
scatter = false ;
81
83
avoidDeadlocks = false ;
84
+ subtransactions = false ;
82
85
}
83
86
};
84
87
@@ -159,6 +162,33 @@ void* writer(void* arg)
159
162
if (cfg.scatter ) {
160
163
srcAcc = srcAcc/cfg.nWriters *cfg.nWriters + t.id ;
161
164
dstAcc = dstAcc/cfg.nWriters *cfg.nWriters + t.id ;
165
+ } else if (cfg.subtransactions ) {
166
+ if (dstAcc < srcAcc) {
167
+ int tmp = srcAcc;
168
+ srcAcc = dstAcc;
169
+ dstAcc = tmp;
170
+ }
171
+ while (true ) {
172
+ try {
173
+ subtransaction subtxn (txn, " withdraw" );
174
+ exec (subtxn, " update t set v = v - 1 where u=%d" , srcAcc);
175
+ break ;
176
+ } catch (pqxx_exception const & x) {
177
+ t.aborts += 1 ;
178
+ }
179
+ }
180
+ while (true ) {
181
+ try {
182
+ subtransaction subtxn (txn, " deposit" );
183
+ exec (subtxn, " update t set v = v + 1 where u=%d" , dstAcc);
184
+ break ;
185
+ } catch (pqxx_exception const & x) {
186
+ t.aborts += 1 ;
187
+ }
188
+ }
189
+ txn.commit ();
190
+ t.transactions += 1 ;
191
+ continue ;
162
192
} else if (cfg.avoidDeadlocks ) {
163
193
if (dstAcc < srcAcc) {
164
194
int tmp = srcAcc;
@@ -198,8 +228,8 @@ void initializeDatabase()
198
228
printf (" Creating database schema...\n " );
199
229
{
200
230
nontransaction txn (conn);
201
- exec (txn, " drop extension if exists multimaster" );
202
- exec (txn, " create extension multimaster" );
231
+ // exec(txn, "drop extension if exists multimaster");
232
+ // exec(txn, "create extension multimaster");
203
233
exec (txn, " drop table if exists t" );
204
234
exec (txn, " create table t(u int primary key, v int)" );
205
235
}
@@ -251,6 +281,9 @@ int main (int argc, char* argv[])
251
281
case ' d' :
252
282
cfg.avoidDeadlocks = true ;
253
283
continue ;
284
+ case ' x' :
285
+ cfg.subtransactions = true ;
286
+ continue ;
254
287
}
255
288
}
256
289
printf (" Options:\n "
@@ -260,7 +293,8 @@ int main (int argc, char* argv[])
260
293
" \t -n N\t number of iterations (1000)\n "
261
294
" \t -p N\t update percent (100)\n "
262
295
" \t -c STR\t database connection string\n "
263
- " \t -s\t scattern avoid deadlocks\n "
296
+ " \t -s\t scatter ids to avoid conflicts\n "
297
+ " \t -x\t use subtransactions\n "
264
298
" \t -d\t avoid deadlocks\n "
265
299
" \t -i\t initialize database\n " );
266
300
return 1 ;
0 commit comments