@@ -79,6 +79,26 @@ static void send_startup_message(LogicalDecodingContext *ctx,
79
79
80
80
static bool startup_message_sent = false;
81
81
82
+ #define OUTPUT_BUFFER_SIZE (16*1024*1024)
83
+
84
+ static void MtmOutputPluginWrite (LogicalDecodingContext * ctx , bool last_write , bool flush )
85
+ {
86
+ if (flush ) {
87
+ OutputPluginWrite (ctx , last_write );
88
+ }
89
+ }
90
+
91
+ static void MtmOutputPluginPrepareWrite (LogicalDecodingContext * ctx , bool last_write , bool flush )
92
+ {
93
+ if (!ctx -> prepared_write ) {
94
+ OutputPluginPrepareWrite (ctx , last_write );
95
+ } else if (flush || ctx -> out -> len > OUTPUT_BUFFER_SIZE ) {
96
+ OutputPluginWrite (ctx , false);
97
+ OutputPluginPrepareWrite (ctx , last_write );
98
+ }
99
+ }
100
+
101
+
82
102
/* specify output plugin callbacks */
83
103
void
84
104
_PG_output_plugin_init (OutputPluginCallbacks * cb )
@@ -388,16 +408,16 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
388
408
send_replication_origin &= txn -> origin_id != InvalidRepOriginId ;
389
409
390
410
if (data -> api ) {
391
- OutputPluginPrepareWrite (ctx , !send_replication_origin );
411
+ MtmOutputPluginPrepareWrite (ctx , !send_replication_origin , true );
392
412
data -> api -> write_begin (ctx -> out , data , txn );
393
413
394
414
if (send_replication_origin )
395
415
{
396
416
char * origin ;
397
417
398
418
/* Message boundary */
399
- OutputPluginWrite (ctx , false);
400
- OutputPluginPrepareWrite (ctx , true);
419
+ MtmOutputPluginWrite (ctx , false , false);
420
+ MtmOutputPluginPrepareWrite (ctx , true, false );
401
421
402
422
/*
403
423
* XXX: which behaviour we want here?
@@ -412,7 +432,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
412
432
replorigin_by_oid (txn -> origin_id , true, & origin ))
413
433
data -> api -> write_origin (ctx -> out , origin , txn -> origin_lsn );
414
434
}
415
- OutputPluginWrite (ctx , true);
435
+ MtmOutputPluginWrite (ctx , true, false );
416
436
}
417
437
}
418
438
@@ -422,9 +442,9 @@ pg_decode_caughtup(LogicalDecodingContext *ctx)
422
442
PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
423
443
424
444
if (data -> api ) {
425
- OutputPluginPrepareWrite (ctx , true);
445
+ MtmOutputPluginPrepareWrite (ctx , true , true);
426
446
data -> api -> write_caughtup (ctx -> out , data , ctx -> reader -> EndRecPtr );
427
- OutputPluginWrite (ctx , true);
447
+ MtmOutputPluginWrite (ctx , true , true);
428
448
}
429
449
}
430
450
@@ -439,9 +459,9 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
439
459
PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
440
460
441
461
if (data -> api ) {
442
- OutputPluginPrepareWrite (ctx , true);
462
+ MtmOutputPluginPrepareWrite (ctx , true , true);
443
463
data -> api -> write_commit (ctx -> out , data , txn , commit_lsn );
444
- OutputPluginWrite (ctx , true);
464
+ MtmOutputPluginWrite (ctx , true , true);
445
465
}
446
466
}
447
467
@@ -462,38 +482,38 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
462
482
/* TODO: add caching (send only if changed) */
463
483
if (data -> api -> write_rel )
464
484
{
465
- OutputPluginPrepareWrite (ctx , false);
485
+ MtmOutputPluginPrepareWrite (ctx , false , false);
466
486
data -> api -> write_rel (ctx -> out , data , relation );
467
- OutputPluginWrite (ctx , false);
487
+ MtmOutputPluginWrite (ctx , false , false);
468
488
}
469
489
470
490
/* Send the data */
471
491
switch (change -> action )
472
492
{
473
493
case REORDER_BUFFER_CHANGE_INSERT :
474
- OutputPluginPrepareWrite (ctx , true);
494
+ MtmOutputPluginPrepareWrite (ctx , true, false );
475
495
data -> api -> write_insert (ctx -> out , data , relation ,
476
496
& change -> data .tp .newtuple -> tuple );
477
- OutputPluginWrite (ctx , true);
497
+ MtmOutputPluginWrite (ctx , true, false );
478
498
break ;
479
499
case REORDER_BUFFER_CHANGE_UPDATE :
480
500
{
481
501
HeapTuple oldtuple = change -> data .tp .oldtuple ?
482
502
& change -> data .tp .oldtuple -> tuple : NULL ;
483
503
484
- OutputPluginPrepareWrite (ctx , true);
504
+ MtmOutputPluginPrepareWrite (ctx , true, false );
485
505
data -> api -> write_update (ctx -> out , data , relation , oldtuple ,
486
506
& change -> data .tp .newtuple -> tuple );
487
- OutputPluginWrite (ctx , true);
507
+ MtmOutputPluginWrite (ctx , true, false );
488
508
break ;
489
509
}
490
510
case REORDER_BUFFER_CHANGE_DELETE :
491
511
if (change -> data .tp .oldtuple )
492
512
{
493
- OutputPluginPrepareWrite (ctx , true);
513
+ MtmOutputPluginPrepareWrite (ctx , true, false );
494
514
data -> api -> write_delete (ctx -> out , data , relation ,
495
515
& change -> data .tp .oldtuple -> tuple );
496
- OutputPluginWrite (ctx , true);
516
+ MtmOutputPluginWrite (ctx , true, false );
497
517
}
498
518
else
499
519
elog (DEBUG1 , "didn't send DELETE change because of missing oldtuple" );
@@ -536,9 +556,9 @@ pg_decode_message(LogicalDecodingContext *ctx,
536
556
{
537
557
PGLogicalOutputData * data = (PGLogicalOutputData * )ctx -> output_plugin_private ;
538
558
539
- OutputPluginPrepareWrite (ctx , true);
559
+ MtmOutputPluginPrepareWrite (ctx , true, ! transactional );
540
560
data -> api -> write_message (ctx -> out , prefix , sz , message );
541
- OutputPluginWrite (ctx , true);
561
+ MtmOutputPluginWrite (ctx , true, ! transactional );
542
562
}
543
563
544
564
static void
@@ -559,9 +579,9 @@ send_startup_message(LogicalDecodingContext *ctx,
559
579
*/
560
580
561
581
if (data -> api ) {
562
- OutputPluginPrepareWrite (ctx , last_message );
582
+ MtmOutputPluginPrepareWrite (ctx , last_message , true );
563
583
data -> api -> write_startup_message (ctx -> out , msg );
564
- OutputPluginWrite (ctx , last_message );
584
+ MtmOutputPluginWrite (ctx , last_message , true );
565
585
}
566
586
567
587
pfree (msg );
0 commit comments