@@ -1366,10 +1366,11 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
1366
1366
dlist_push_tail (& state -> old_change , & change -> node );
1367
1367
1368
1368
/*
1369
- * Update the total bytes processed before releasing the current set
1370
- * of changes and restoring the new set of changes.
1369
+ * Update the total bytes processed by the txn for which we are
1370
+ * releasing the current set of changes and restoring the new set of
1371
+ * changes.
1371
1372
*/
1372
- rb -> totalBytes += rb -> size ;
1373
+ rb -> totalBytes += entry -> txn -> size ;
1373
1374
if (ReorderBufferRestoreChanges (rb , entry -> txn , & entry -> file ,
1374
1375
& state -> entries [off ].segno ))
1375
1376
{
@@ -2371,9 +2372,9 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2371
2372
iterstate = NULL ;
2372
2373
2373
2374
/*
2374
- * Update total transaction count and total transaction bytes
2375
- * processed . Ensure to not count the streamed transaction multiple
2376
- * times.
2375
+ * Update total transaction count and total bytes processed by the
2376
+ * transaction and its subtransactions . Ensure to not count the
2377
+ * streamed transaction multiple times.
2377
2378
*
2378
2379
* Note that the statistics computation has to be done after
2379
2380
* ReorderBufferIterTXNFinish as it releases the serialized change
@@ -2382,7 +2383,7 @@ ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2382
2383
if (!rbtxn_is_streamed (txn ))
2383
2384
rb -> totalTxns ++ ;
2384
2385
2385
- rb -> totalBytes += rb -> size ;
2386
+ rb -> totalBytes += txn -> total_size ;
2386
2387
2387
2388
/*
2388
2389
* Done with current changes, send the last message for this set of
@@ -3073,7 +3074,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3073
3074
{
3074
3075
Size sz ;
3075
3076
ReorderBufferTXN * txn ;
3076
- ReorderBufferTXN * toptxn = NULL ;
3077
+ ReorderBufferTXN * toptxn ;
3077
3078
3078
3079
Assert (change -> txn );
3079
3080
@@ -3087,14 +3088,14 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3087
3088
3088
3089
txn = change -> txn ;
3089
3090
3090
- /* If streaming supported, update the total size in top level as well. */
3091
- if ( ReorderBufferCanStream ( rb ))
3092
- {
3093
- if ( txn -> toptxn != NULL )
3094
- toptxn = txn -> toptxn ;
3095
- else
3096
- toptxn = txn ;
3097
- }
3091
+ /*
3092
+ * Update the total size in top level as well. This is later used to
3093
+ * compute the decoding stats.
3094
+ */
3095
+ if ( txn -> toptxn != NULL )
3096
+ toptxn = txn -> toptxn ;
3097
+ else
3098
+ toptxn = txn ;
3098
3099
3099
3100
sz = ReorderBufferChangeSize (change );
3100
3101
@@ -3104,8 +3105,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3104
3105
rb -> size += sz ;
3105
3106
3106
3107
/* Update the total size in the top transaction. */
3107
- if (toptxn )
3108
- toptxn -> total_size += sz ;
3108
+ toptxn -> total_size += sz ;
3109
3109
}
3110
3110
else
3111
3111
{
@@ -3114,8 +3114,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3114
3114
rb -> size -= sz ;
3115
3115
3116
3116
/* Update the total size in the top transaction. */
3117
- if (toptxn )
3118
- toptxn -> total_size -= sz ;
3117
+ toptxn -> total_size -= sz ;
3119
3118
}
3120
3119
3121
3120
Assert (txn -> size <= rb -> size );
0 commit comments