Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 332d916

Browse files
knizhnikkelvich
authored andcommitted
Fix of automatic resolve of node_id
1 parent a939b57 commit 332d916

File tree

2 files changed

+84
-45
lines changed

2 files changed

+84
-45
lines changed

multimaster.c

+23-6
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,13 @@ void MtmReleaseLocks(void)
318318
* locks[N+1..2*N] are used to synchronize access to distributed lock graph at each node
319319
* -------------------------------------------
320320
*/
321+
static timestamp_t MtmLockLastReportTime;
322+
static timestamp_t MtmLockElapsedWaitTime;
323+
static timestamp_t MtmLockMaxWaitTime;
324+
static size_t MtmLockHitCount;
325+
326+
#define DEBUG_MTM_LOCK 1
327+
321328
void MtmLock(LWLockMode mode)
322329
{
323330
if (!MtmAtExitHookRegistered) {
@@ -330,18 +337,28 @@ void MtmLock(LWLockMode mode)
330337
}
331338
else
332339
{
333-
#if DEBUG_LEVEL > 1
340+
#if DEBUG_MTM_LOCK
334341
timestamp_t start, stop;
335342
start = MtmGetSystemTime();
336343
#endif
337344
if (MyProc == NULL) { /* Can not wait if have no PGPROC. It can happen at process exit. TODO: without lock we can get race condition and corrupt Mtm state */
338345
return;
339346
}
340347
LWLockAcquire((LWLockId)&Mtm->locks[MTM_STATE_LOCK_ID], mode);
341-
#if DEBUG_LEVEL > 1
348+
#if DEBUG_MTM_LOCK
342349
stop = MtmGetSystemTime();
343-
if (stop > start + MSEC_TO_USEC(MtmHeartbeatSendTimeout)) {
344-
MTM_LOG1("%d: obtaining %s lock takes %lld microseconds", MyProcPid, (mode == LW_EXCLUSIVE ? "exclusive" : "shared"), stop - start);
350+
MtmLockElapsedWaitTime += stop - start;
351+
if (stop - start > MtmLockMaxWaitTime) {
352+
MtmLockMaxWaitTime = stop - start;
353+
}
354+
MtmLockHitCount += 1;
355+
if (stop - MtmLockLastReportTime > USECS_PER_SEC) {
356+
MTM_LOG1("%d: average lock wait time %lld usec, maximal lock wait time: %lld usec",
357+
MyProcPid, MtmLockElapsedWaitTime/MtmLockHitCount, MtmLockMaxWaitTime);
358+
MtmLockLastReportTime = stop;
359+
MtmLockMaxWaitTime = 0;
360+
MtmLockElapsedWaitTime = 0;
361+
MtmLockHitCount = 0;
345362
}
346363
#endif
347364
if (mode == LW_EXCLUSIVE) {
@@ -2869,7 +2886,7 @@ static void MtmSplitConnStrs(void)
28692886
}
28702887
for (i = 0; i < MtmNodes; i++) {
28712888
MTM_LOG3("Node %d, host %s, port=%d, my port %d", i, MtmConnections[i].hostName, MtmConnections[i].postmasterPort, PostPortNumber);
2872-
if ((strcmp(MtmConnections[i].hostName, buf) == 0 || strcmp(MtmConnections[i].hostName, "localhost") == 0)
2889+
if ((strcmp(MtmConnections[i].hostName, buf) == 0 || strcmp(MtmConnections[i].hostName, "localhost") == 0 || strcmp(MtmConnections[i].hostName, "127.0.0.1") == 0)
28732890
&& MtmConnections[i].postmasterPort == PostPortNumber)
28742891
{
28752892
if (MtmNodeId == INT_MAX) {
@@ -2880,7 +2897,7 @@ static void MtmSplitConnStrs(void)
28802897
}
28812898
}
28822899
if (MtmNodeId == INT_MAX) {
2883-
MTM_ELOG(ERROR, "multimaster.node_id and host name %s can not be located in connection strings list", buf);
2900+
MTM_ELOG(ERROR, "multimaster.node_id is not specified and host name %s can not be located in connection strings list", buf);
28842901
}
28852902
} else if (MtmNodeId > i) {
28862903
MTM_ELOG(ERROR, "Multimaster node id %d is out of range [%d..%d]", MtmNodeId, 1, MtmNodes);

tests/dtmbench.cpp

+61-39
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <stdlib.h>
55
#include <inttypes.h>
66
#include <sys/time.h>
7+
#include <unistd.h>
78
#include <pthread.h>
89

910
#include <string>
@@ -22,7 +23,7 @@ template<class T>
2223
class my_unique_ptr
2324
{
2425
T* ptr;
25-
26+
2627
public:
2728
my_unique_ptr(T* p = NULL) : ptr(p) {}
2829
~my_unique_ptr() { delete ptr; }
@@ -32,7 +33,7 @@ class my_unique_ptr
3233
void operator=(my_unique_ptr& other) {
3334
ptr = other.ptr;
3435
other.ptr = NULL;
35-
}
36+
}
3637
};
3738

3839
typedef void* (*thread_proc_t)(void*);
@@ -47,7 +48,7 @@ struct thread
4748
size_t aborts;
4849
int id;
4950

50-
void start(int tid, thread_proc_t proc) {
51+
void start(int tid, thread_proc_t proc) {
5152
id = tid;
5253
updates = 0;
5354
selects = 0;
@@ -56,7 +57,7 @@ struct thread
5657
pthread_create(&t, NULL, proc, this);
5758
}
5859

59-
void wait() {
60+
void wait() {
6061
pthread_join(t, NULL);
6162
}
6263
};
@@ -72,7 +73,7 @@ struct config
7273
bool scatter;
7374
bool avoidDeadlocks;
7475
bool subtransactions;
75-
76+
7677
config() {
7778
nReaders = 1;
7879
nWriters = 10;
@@ -118,7 +119,7 @@ T execQuery( transaction_base& txn, char const* sql, ...)
118119
va_end(args);
119120
result r = txn.exec(buf);
120121
return r[0][0].as(T());
121-
}
122+
}
122123

123124
void* reader(void* arg)
124125
{
@@ -134,7 +135,7 @@ void* reader(void* arg)
134135
result r = txn.exec("select sum(v) from t");
135136
int64_t sum = r[0][0].as(int64_t());
136137
if (sum != prevSum) {
137-
r = txn.exec("select mtm.get_snapshot()");
138+
r = txn.exec("select mtm.get_snapshot()");
138139
printf("Total=%ld, snapshot=%ld\n", sum, r[0][0].as(int64_t()));
139140
prevSum = sum;
140141
}
@@ -144,7 +145,7 @@ void* reader(void* arg)
144145
}
145146
return NULL;
146147
}
147-
148+
148149
void* writer(void* arg)
149150
{
150151
thread& t = *(thread*)arg;
@@ -153,17 +154,17 @@ void* writer(void* arg)
153154
conns[i] = new connection(cfg.connections[i]);
154155
}
155156
for (int i = 0; i < cfg.nIterations; i++)
156-
{
157-
//work
157+
{
158+
//work
158159
//transaction<repeatable_read> txn(*conns[random() % conns.size()]);
159160
transaction<read_committed> txn(*conns[random() % conns.size()]);
160161
int srcAcc = random() % cfg.nAccounts;
161162
int dstAcc = random() % cfg.nAccounts;
162-
if (cfg.scatter) {
163+
if (cfg.scatter) {
163164
srcAcc = srcAcc/cfg.nWriters*cfg.nWriters + t.id;
164165
dstAcc = dstAcc/cfg.nWriters*cfg.nWriters + t.id;
165-
} else if (cfg.subtransactions) {
166-
if (dstAcc < srcAcc) {
166+
} else if (cfg.subtransactions) {
167+
if (dstAcc < srcAcc) {
167168
int tmp = srcAcc;
168169
srcAcc = dstAcc;
169170
dstAcc = tmp;
@@ -173,7 +174,7 @@ void* writer(void* arg)
173174
subtransaction subtxn(txn, "withdraw");
174175
exec(subtxn, "update t set v = v - 1 where u=%d", srcAcc);
175176
break;
176-
} catch (pqxx_exception const& x) {
177+
} catch (pqxx_exception const& x) {
177178
t.aborts += 1;
178179
}
179180
}
@@ -182,36 +183,36 @@ void* writer(void* arg)
182183
subtransaction subtxn(txn, "deposit");
183184
exec(subtxn, "update t set v = v + 1 where u=%d", dstAcc);
184185
break;
185-
} catch (pqxx_exception const& x) {
186+
} catch (pqxx_exception const& x) {
186187
t.aborts += 1;
187188
}
188189
}
189-
txn.commit();
190+
txn.commit();
190191
t.transactions += 1;
191192
continue;
192-
} else if (cfg.avoidDeadlocks) {
193-
if (dstAcc < srcAcc) {
193+
} else if (cfg.avoidDeadlocks) {
194+
if (dstAcc < srcAcc) {
194195
int tmp = srcAcc;
195196
srcAcc = dstAcc;
196197
dstAcc = tmp;
197198
}
198199
}
199-
try {
200-
if (random() % 100 < cfg.updatePercent) {
200+
try {
201+
if (random() % 100 < cfg.updatePercent) {
201202
exec(txn, "update t set v = v - 1 where u=%d", srcAcc);
202203
exec(txn, "update t set v = v + 1 where u=%d", dstAcc);
203204
t.updates += 2;
204-
} else {
205+
} else {
205206
int64_t sum = execQuery<int64_t>(txn, "select v from t where u=%d", srcAcc)
206207
+ execQuery<int64_t>(txn, "select v from t where u=%d", dstAcc);
207-
if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
208+
if (sum > cfg.nIterations*cfg.nWriters || sum < -cfg.nIterations*cfg.nWriters) {
208209
printf("Wrong sum=%ld\n", sum);
209210
}
210211
t.selects += 2;
211212
}
212-
txn.commit();
213+
txn.commit();
213214
t.transactions += 1;
214-
} catch (pqxx_exception const& x) {
215+
} catch (pqxx_exception const& x) {
215216
txn.abort();
216217
t.aborts += 1;
217218
i -= 1;
@@ -220,7 +221,24 @@ void* writer(void* arg)
220221
}
221222
return NULL;
222223
}
223-
224+
225+
void* monitor(void* arg)
226+
{
227+
vector<thread>& writers = *(vector<thread>*)arg;
228+
time_t start = time(NULL);
229+
size_t elapsed = 0;
230+
while (running) {
231+
sleep(1);
232+
long total = 0;
233+
for (int i = 0; i < cfg.nWriters; i++) {
234+
total += writers[i].transactions;
235+
}
236+
printf("%d: %5ld TPS\n", int(time(NULL) - start), long(total - elapsed));
237+
elapsed = total;
238+
}
239+
return NULL;
240+
}
241+
224242
void initializeDatabase()
225243
{
226244
connection conn(cfg.connections[0]);
@@ -251,15 +269,15 @@ int main (int argc, char* argv[])
251269
return 1;
252270
}
253271

254-
for (int i = 1; i < argc; i++) {
255-
if (argv[i][0] == '-') {
256-
switch (argv[i][1]) {
272+
for (int i = 1; i < argc; i++) {
273+
if (argv[i][0] == '-') {
274+
switch (argv[i][1]) {
257275
case 'r':
258276
cfg.nReaders = atoi(argv[++i]);
259277
continue;
260278
case 'w':
261279
cfg.nWriters = atoi(argv[++i]);
262-
continue;
280+
continue;
263281
case 'a':
264282
cfg.nAccounts = atoi(argv[++i]);
265283
continue;
@@ -300,7 +318,7 @@ int main (int argc, char* argv[])
300318
return 1;
301319
}
302320

303-
if (initialize) {
321+
if (initialize) {
304322
initializeDatabase();
305323
printf("%d accounts inserted\n", cfg.nAccounts);
306324
return 0;
@@ -311,34 +329,38 @@ int main (int argc, char* argv[])
311329

312330
vector<thread> readers(cfg.nReaders);
313331
vector<thread> writers(cfg.nWriters);
332+
pthread_t logger;
333+
314334
size_t nAborts = 0;
315335
size_t nUpdates = 0;
316336
size_t nSelects = 0;
317337
size_t nTransactions = 0;
318338

319-
for (int i = 0; i < cfg.nReaders; i++) {
339+
for (int i = 0; i < cfg.nReaders; i++) {
320340
readers[i].start(i, reader);
321341
}
322-
for (int i = 0; i < cfg.nWriters; i++) {
342+
for (int i = 0; i < cfg.nWriters; i++) {
323343
writers[i].start(i, writer);
324344
}
325-
326-
for (int i = 0; i < cfg.nWriters; i++) {
345+
pthread_create(&logger, NULL, monitor, &writers);
346+
347+
for (int i = 0; i < cfg.nWriters; i++) {
327348
writers[i].wait();
328349
nUpdates += writers[i].updates;
329350
nSelects += writers[i].selects;
330351
nAborts += writers[i].aborts;
331352
nTransactions += writers[i].transactions;
332353
}
333-
354+
334355
running = false;
335356

336-
for (int i = 0; i < cfg.nReaders; i++) {
357+
for (int i = 0; i < cfg.nReaders; i++) {
337358
readers[i].wait();
338359
nSelects += readers[i].selects;
339360
nTransactions += writers[i].transactions;
340361
}
341-
362+
pthread_join(logger, NULL);
363+
342364
time_t elapsed = getCurrentTime() - start;
343365

344366
printf(
@@ -347,10 +369,10 @@ int main (int argc, char* argv[])
347369
" \"readers\":%d, \"writers\":%d, \"update_percent\":%d, \"accounts\":%d, \"iterations\":%d, \"hosts\":%ld}\n",
348370
(double)(nTransactions*USEC)/elapsed,
349371
nTransactions,
350-
nSelects,
372+
nSelects,
351373
nUpdates,
352374
nAborts,
353-
(int)(nAborts*100/nTransactions),
375+
(int)(nAborts*100/nTransactions),
354376
cfg.nReaders,
355377
cfg.nWriters,
356378
cfg.updatePercent,

0 commit comments

Comments
 (0)