Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 4be273a

Browse files
author
Nikita Malakhov
committed
Logical replication data reconstruction for TOAST API
The reference logical replication code could only process data TOASTed by the reference TOAST mechanism (i.e. using external TOAST pointers), and the data was reconstructed inside the replication code rather than by the Toaster that had TOASTed it. This patch moves data reconstruction into the Toaster: it adds a reconstruction function to the TOAST API and moves the reference reconstruction code into the Generic Toaster. Author: Teodor Sigaev <teodor@sigaev.ru> Author: Oleg Bartunov <obartunov@postgrespro.ru> Author: Nikita Glukhov <n.gluhov@postgrespro.ru> Author: Nikita Malakhov <n.malakhov@postgrespro.ru>
1 parent 8207a8f commit 4be273a

File tree

14 files changed

+265
-82
lines changed

14 files changed

+265
-82
lines changed

src/backend/access/toast/generic_toaster.c

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
#include "utils/fmgroids.h"
5353
#include "access/generic_toaster.h"
5454
#include "access/toast_compression.h"
55+
#include "replication/reorderbuffer.h"
5556

5657
/*
5758
* Callback function signatures --- see toaster.sgml for more info.
@@ -63,10 +64,10 @@
6364
*/
6465
static void
6566
generic_toast_init(Relation rel, Oid toastoid, Oid toastindexoid, Datum reloptions, LOCKMODE lockmode,
66-
bool check, Oid OIDOldToast)
67+
bool check, Oid OIDOldToast)
6768
{
6869
(void) create_toast_table(rel, toastoid, toastindexoid, reloptions, lockmode,
69-
check, OIDOldToast);
70+
check, OIDOldToast);
7071
}
7172

7273

@@ -82,8 +83,8 @@ generic_toast(Relation toast_rel, Oid toasterid, Datum value, Datum oldvalue,
8283
Assert(toast_rel != NULL);
8384

8485
result = toast_save_datum(toast_rel, value,
85-
(struct varlena *) DatumGetPointer(oldvalue),
86-
options);
86+
(struct varlena *) DatumGetPointer(oldvalue),
87+
options);
8788
return result;
8889
}
8990

@@ -107,7 +108,7 @@ generic_detoast(Datum toast_ptr, int offset, int length)
107108
else
108109
{
109110
result = toast_fetch_datum_slice(tvalue,
110-
offset, length);
111+
offset, length);
111112
}
112113

113114
return PointerGetDatum(result);
@@ -616,6 +617,83 @@ generic_toaster_vtable(Datum toast_ptr)
616617
return routine;
617618
}
618619

/*
 * generic_toaster_reconstruct
 *
 * Rebuild an on-disk external TOAST value from the chunk changes collected
 * in toast_hash.
 *
 * toastrel   - toast relation; only its tuple descriptor is used, to read
 *              attribute 3 of each chunk tuple (presumably chunk_data --
 *              confirm against the toast table definition).
 * varlena    - the datum to reconstruct.
 * toast_hash - hash of ReorderBufferToastEnt looked up by va_valueid.
 *
 * Returns NULL when varlena is not an on-disk external pointer, when
 * toast_hash is NULL, or when toast_hash has no entry for the pointer's
 * va_valueid.  Otherwise returns a palloc0'd buffer of va_rawsize bytes
 * holding the concatenated chunk payloads, with its size header marked as
 * compressed or plain to match the TOAST pointer.  The same buffer is also
 * stored in the hash entry's "reconstructed" field.
 */
620+
struct varlena *
621+
generic_toaster_reconstruct(Relation toastrel, struct varlena *varlena,
622+
HTAB *toast_hash)
623+
{
624+
struct varatt_external toast_pointer;
625+
struct varlena *reconstructed;
626+
ReorderBufferToastEnt *ent;
627+
dlist_iter it;
628+
Size data_done = 0;
629+
TupleDesc toast_desc;
630+
631+
if (!VARATT_IS_EXTERNAL_ONDISK(varlena))
632+
return NULL;
633+
634+
if (!toast_hash)
635+
return NULL;
636+
637+
VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
638+
639+
/*
640+
* Check whether the toast tuple changed, replace if so.
641+
*/
642+
ent = (ReorderBufferToastEnt *)
643+
hash_search(toast_hash,
644+
(void *) &toast_pointer.va_valueid,
645+
HASH_FIND,
646+
NULL);
647+
648+
if (ent == NULL)
649+
return NULL;
650+
651+
reconstructed = palloc0(toast_pointer.va_rawsize);
652+
653+
ent->reconstructed = reconstructed;
654+
655+
toast_desc = RelationGetDescr(toastrel);
656+
657+
/* stitch toast tuple back together from its parts */
658+
dlist_foreach(it, &ent->chunks)
659+
{
660+
bool isnull;
661+
ReorderBufferChange *cchange;
662+
ReorderBufferTupleBuf *ctup;
663+
Pointer chunk;
664+
665+
cchange = dlist_container(ReorderBufferChange, node, it.cur);
666+
ctup = cchange->data.tp.newtuple;
667+
chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
668+
669+
Assert(!isnull);
670+
Assert(!VARATT_IS_EXTERNAL(chunk));
671+
Assert(!VARATT_IS_SHORT(chunk));
672+
/* append this chunk's payload (header excluded) at the current offset */
673+
memcpy(VARDATA(reconstructed) + data_done,
674+
VARDATA(chunk),
675+
VARSIZE(chunk) - VARHDRSZ);
676+
data_done += VARSIZE(chunk) - VARHDRSZ;
677+
}
/*
 * NOTE(review): the accumulated size is only verified by this Assert;
 * nothing else in this function bounds data_done against the va_rawsize
 * allocation if assertions are compiled out -- confirm that is acceptable.
 */
678+
Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
679+
680+
/* make sure it's marked as compressed or not */
681+
if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
682+
SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
683+
else
684+
SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
685+
686+
return reconstructed;
687+
}
688+
/*
 * generic_reconstruct
 *
 * Thin wrapper around generic_toaster_reconstruct() that returns the
 * reconstructed value (or NULL) as a Datum.
 *
 * Always sets *need_free to false.  NOTE(review): presumably because the
 * buffer is also recorded in the toast hash entry's "reconstructed" field
 * and released from there -- confirm against the caller's cleanup path.
 */
689+
static Datum
690+
generic_reconstruct(Relation toastrel, struct varlena *varlena,
691+
HTAB *toast_hash, bool *need_free)
692+
{
693+
*need_free = false;
694+
return PointerGetDatum(generic_toaster_reconstruct(toastrel, varlena, toast_hash));
695+
}
696+
619697
Datum
620698
default_toaster_handler(PG_FUNCTION_ARGS)
621699
{
@@ -628,6 +706,7 @@ default_toaster_handler(PG_FUNCTION_ARGS)
628706
tsrroutine->update_toast = NULL;
629707
tsrroutine->copy_toast = NULL;
630708
tsrroutine->get_vtable = generic_toaster_vtable;
709+
tsrroutine->reconstruct = generic_reconstruct;
631710
tsrroutine->toastervalidate = generic_validate;
632711

633712
PG_RETURN_POINTER(tsrroutine);

src/backend/replication/logical/proto.c

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include "replication/logicalproto.h"
2020
#include "utils/lsyscache.h"
2121
#include "utils/syscache.h"
22+
#include "access/toasterapi.h"
2223

2324
/*
2425
* Protocol message flags.
@@ -814,7 +815,9 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
814815
continue;
815816
}
816817

817-
if (att->attlen == -1 && VARATT_IS_EXTERNAL_ONDISK(values[i]))
818+
if (att->attlen == -1 &&
819+
(VARATT_IS_EXTERNAL_ONDISK(values[i]) ||
820+
VARATT_IS_CUSTOM(values[i])))
818821
{
819822
/*
820823
* Unchanged toasted datum. (Note that we don't promise to detect
@@ -830,6 +833,31 @@ logicalrep_write_tuple(StringInfo out, Relation rel, TupleTableSlot *slot,
830833
elog(ERROR, "cache lookup failed for type %u", att->atttypid);
831834
typclass = (Form_pg_type) GETSTRUCT(typtup);
832835

836+
if (att->attlen == -1 && VARATT_IS_EXTERNAL_INDIRECT(values[i]))
837+
{
838+
struct varatt_indirect redirect;
839+
struct varlena *attr;
840+
841+
VARATT_EXTERNAL_GET_POINTER(redirect, values[i]);
842+
attr = (struct varlena *) redirect.pointer;
843+
844+
/* Send type diff, if it is a custom pointer. */
845+
if (VARATT_IS_CUSTOM(attr))
846+
{
847+
int len = VARATT_CUSTOM_GET_DATA_SIZE(attr);
848+
849+
if (!OidIsValid(typclass->typapplydiff))
850+
elog(ERROR, "toaster returned diff, but datatype does not support diffs");
851+
852+
pq_sendbyte(out, LOGICALREP_COLUMN_DIFF);
853+
pq_sendint(out, len, 4); /* length */
854+
pq_sendbytes(out, VARATT_CUSTOM_GET_DATA(attr), len); /* data */
855+
856+
ReleaseSysCache(typtup);
857+
continue;
858+
}
859+
}
860+
833861
/*
834862
* Send in binary if requested and type has suitable send function.
835863
*/
@@ -907,6 +935,7 @@ logicalrep_read_tuple(StringInfo in, LogicalRepTupleData *tuple)
907935
value->maxlen = len;
908936
break;
909937
case LOGICALREP_COLUMN_BINARY:
938+
case LOGICALREP_COLUMN_DIFF:
910939
len = pq_getmsgint(in, 4); /* read length */
911940

912941
/* and data */

src/backend/replication/logical/reorderbuffer.c

Lines changed: 31 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@
8787
#include "access/toasterapi.h"
8888
#include "access/heapam.h"
8989
#include "access/rewriteheap.h"
90+
#include "access/toast_helper.h"
9091
#include "access/transam.h"
9192
#include "access/xact.h"
9293
#include "access/xlog_internal.h"
@@ -157,19 +158,6 @@ typedef struct ReorderBufferIterTXNState
157158
ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
158159
} ReorderBufferIterTXNState;
159160

160-
/* toast datastructures */
161-
typedef struct ReorderBufferToastEnt
162-
{
163-
Oid chunk_id; /* toast_table.chunk_id */
164-
int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
165-
* have seen */
166-
Size num_chunks; /* number of chunks we've already seen */
167-
Size size; /* combined size of chunks seen */
168-
dlist_head chunks; /* linked list of chunks */
169-
struct varlena *reconstructed; /* reconstructed varlena now pointed to in
170-
* main tup */
171-
} ReorderBufferToastEnt;
172-
173161
/* Disk serialization support datastructures */
174162
typedef struct ReorderBufferDiskChange
175163
{
@@ -4724,15 +4712,16 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
47244712
bool *free;
47254713
HeapTuple tmphtup;
47264714
Relation toast_rel;
4727-
TupleDesc toast_desc;
47284715
MemoryContext oldcontext;
47294716
ReorderBufferTupleBuf *newtup;
47304717
Size old_size;
47314718

47324719
/* no toast tuples changed */
4733-
if (txn->toast_hash == NULL)
4734-
return;
4735-
4720+
if (!change->data.tp.newtuple ||
4721+
!HeapTupleHasExternal(&change->data.tp.newtuple->tuple))
4722+
{
4723+
return;
4724+
}
47364725
/*
47374726
* We're going to modify the size of the change. So, to make sure the
47384727
* accounting is correct we record the current change size and then after
@@ -4757,8 +4746,6 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
47574746
elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
47584747
relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));
47594748

4760-
toast_desc = RelationGetDescr(toast_rel);
4761-
47624749
/* should we allocate from stack instead? */
47634750
attrs = palloc0(sizeof(Datum) * desc->natts);
47644751
isnull = palloc0(sizeof(bool) * desc->natts);
@@ -4771,16 +4758,14 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
47714758
for (natt = 0; natt < desc->natts; natt++)
47724759
{
47734760
Form_pg_attribute attr = TupleDescAttr(desc, natt);
4774-
ReorderBufferToastEnt *ent;
4761+
TsrRoutine *toaster;
47754762
struct varlena *varlena;
47764763

47774764
/* va_rawsize is the size of the original datum -- including header */
4778-
struct varatt_external toast_pointer;
47794765
struct varatt_indirect redirect_pointer;
47804766
struct varlena *new_datum = NULL;
47814767
struct varlena *reconstructed;
4782-
dlist_iter it;
4783-
Size data_done = 0;
4768+
bool need_free;
47844769

47854770
/* system columns aren't toasted */
47864771
if (attr->attnum < 0)
@@ -4801,68 +4786,40 @@ ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
48014786
varlena = (struct varlena *) DatumGetPointer(attrs[natt]);
48024787

48034788
/* no need to do anything if the tuple isn't external */
4804-
if (!VARATT_IS_EXTERNAL(varlena))
4805-
continue;
4806-
4807-
VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);
4808-
4809-
/*
4810-
* Check whether the toast tuple changed, replace if so.
4811-
*/
4812-
ent = (ReorderBufferToastEnt *)
4813-
hash_search(txn->toast_hash,
4814-
(void *) &toast_pointer.va_valueid,
4815-
HASH_FIND,
4816-
NULL);
4817-
if (ent == NULL)
4789+
if (!VARATT_IS_EXTERNAL_ONDISK(varlena) &&
4790+
!VARATT_IS_CUSTOM(varlena))
48184791
continue;
48194792

4820-
new_datum =
4821-
(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4822-
4823-
free[natt] = true;
4824-
4825-
reconstructed = palloc0(toast_pointer.va_rawsize);
4826-
4827-
ent->reconstructed = reconstructed;
4828-
4829-
/* stitch toast tuple back together from its parts */
4830-
dlist_foreach(it, &ent->chunks)
4793+
toaster = SearchTsrCache(attr->atttoaster);
4794+
if (toaster->reconstruct)
48314795
{
4832-
bool isnull;
4833-
ReorderBufferChange *cchange;
4834-
ReorderBufferTupleBuf *ctup;
4835-
Pointer chunk;
4836-
4837-
cchange = dlist_container(ReorderBufferChange, node, it.cur);
4838-
ctup = cchange->data.tp.newtuple;
4839-
chunk = DatumGetPointer(fastgetattr(&ctup->tuple, 3, toast_desc, &isnull));
4840-
4841-
Assert(!isnull);
4842-
Assert(!VARATT_IS_EXTERNAL(chunk));
4843-
Assert(!VARATT_IS_SHORT(chunk));
4844-
4845-
memcpy(VARDATA(reconstructed) + data_done,
4846-
VARDATA(chunk),
4847-
VARSIZE(chunk) - VARHDRSZ);
4848-
data_done += VARSIZE(chunk) - VARHDRSZ;
4796+
reconstructed = (struct varlena *) DatumGetPointer(
4797+
toaster->reconstruct(toast_rel, varlena, txn->toast_hash, &need_free));
48494798
}
4850-
Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));
4851-
4852-
/* make sure its marked as compressed or not */
4853-
if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
4854-
SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
48554799
else
4856-
SET_VARSIZE(reconstructed, data_done + VARHDRSZ);
4800+
{
4801+
ereport(ERROR,
4802+
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
4803+
errmsg("TOASTER does not support reconstruction of values")));
4804+
}
4805+
4806+
if (!reconstructed)
4807+
continue;
4808+
4809+
if (need_free)
4810+
txn->toast_reconstructed = lappend(txn->toast_reconstructed, reconstructed);
48574811

48584812
memset(&redirect_pointer, 0, sizeof(redirect_pointer));
48594813
redirect_pointer.pointer = reconstructed;
48604814

4815+
new_datum = (struct varlena *) palloc0(INDIRECT_POINTER_SIZE);
4816+
48614817
SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
48624818
memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
48634819
sizeof(redirect_pointer));
48644820

48654821
attrs[natt] = PointerGetDatum(new_datum);
4822+
free[natt] = true;
48664823
}
48674824

48684825
/*
@@ -4910,6 +4867,9 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
49104867
HASH_SEQ_STATUS hstat;
49114868
ReorderBufferToastEnt *ent;
49124869

4870+
list_free_deep(txn->toast_reconstructed);
4871+
txn->toast_reconstructed = NIL;
4872+
49134873
if (txn->toast_hash == NULL)
49144874
return;
49154875

0 commit comments

Comments
 (0)