diff options
author | Masahiko Sawada | 2025-02-28 18:29:36 +0000 |
---|---|---|
committer | Masahiko Sawada | 2025-02-28 18:29:36 +0000 |
commit | 7717f63006935de00fafd000bff450280508adf1 (patch) | |
tree | 02ebf3eb211ef7a54b9d00e55f260fc8b1f835c0 /src/backend | |
parent | 77cb08be510623421fc727f35980de5107eea735 (diff) |
Refactor COPY FROM to use format callback functions.
This commit introduces a new CopyFromRoutine struct, which is a set of
callback routines to read tuples in a specific format. It also makes
COPY FROM with the existing formats (text, CSV, and binary) utilize
these format callbacks.
This change is a preliminary step towards making the COPY FROM command
extensible in terms of input formats.
Similar to 2e4127b6d2d, this refactoring contributes to a performance
improvement by reducing the number of "if" branches that need to be
checked on a per-row basis when sending field representations in text
or CSV mode. The performance benchmark results showed ~5% performance
gain in text or CSV mode.
Author: Sutou Kouhei <kou@clear-code.com>
Reviewed-by: Masahiko Sawada <sawada.mshk@gmail.com>
Reviewed-by: Michael Paquier <michael@paquier.xyz>
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: Tomas Vondra <tomas.vondra@enterprisedb.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Discussion: https://postgr.es/m/20231204.153548.2126325458835528809.kou@clear-code.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/commands/copyfrom.c | 192 | ||||
-rw-r--r-- | src/backend/commands/copyfromparse.c | 446 |
2 files changed, 398 insertions, 240 deletions
diff --git a/src/backend/commands/copyfrom.c b/src/backend/commands/copyfrom.c index 8875d79d59a..198cee2bc48 100644 --- a/src/backend/commands/copyfrom.c +++ b/src/backend/commands/copyfrom.c @@ -28,7 +28,7 @@ #include "access/tableam.h" #include "access/xact.h" #include "catalog/namespace.h" -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "commands/trigger.h" @@ -107,6 +107,145 @@ typedef struct CopyMultiInsertInfo static void ClosePipeFromProgram(CopyFromState cstate); /* + * Built-in format-specific routines. One-row callbacks are defined in + * copyfromparse.c. + */ +static void CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam); +static void CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc); +static void CopyFromTextLikeEnd(CopyFromState cstate); +static void CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam); +static void CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc); +static void CopyFromBinaryEnd(CopyFromState cstate); + + +/* + * COPY FROM routines for built-in formats. + * + * CSV and text formats share the same TextLike routines except for the + * one-row callback. + */ + +/* text format */ +static const CopyFromRoutine CopyFromRoutineText = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromTextOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +/* CSV format */ +static const CopyFromRoutine CopyFromRoutineCSV = { + .CopyFromInFunc = CopyFromTextLikeInFunc, + .CopyFromStart = CopyFromTextLikeStart, + .CopyFromOneRow = CopyFromCSVOneRow, + .CopyFromEnd = CopyFromTextLikeEnd, +}; + +/* binary format */ +static const CopyFromRoutine CopyFromRoutineBinary = { + .CopyFromInFunc = CopyFromBinaryInFunc, + .CopyFromStart = CopyFromBinaryStart, + .CopyFromOneRow = CopyFromBinaryOneRow, + .CopyFromEnd = CopyFromBinaryEnd, +}; + +/* Return a COPY FROM routine for the given options */ +static const CopyFromRoutine * +CopyFromGetRoutine(CopyFormatOptions opts) +{ + if (opts.csv_mode) + return &CopyFromRoutineCSV; + else if (opts.binary) + return &CopyFromRoutineBinary; + + /* default is text */ + return &CopyFromRoutineText; +} + +/* Implementation of the start callback for text and CSV formats */ +static void +CopyFromTextLikeStart(CopyFromState cstate, TupleDesc tupDesc) +{ + AttrNumber attr_count; + + /* + * If encoding conversion is needed, we need another buffer to hold the + * converted input data. Otherwise, we can just point input_buf to the + * same buffer as raw_buf. + */ + if (cstate->need_transcoding) + { + cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); + cstate->input_buf_index = cstate->input_buf_len = 0; + } + else + cstate->input_buf = cstate->raw_buf; + cstate->input_reached_eof = false; + + initStringInfo(&cstate->line_buf); + + /* + * Create workspace for CopyReadAttributes results; used by CSV and text + * format. + */ + attr_count = list_length(cstate->attnumlist); + cstate->max_fields = attr_count; + cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); +} + +/* + * Implementation of the infunc callback for text and CSV formats. Assign + * the input function data to the given *finfo. + */ +static void +CopyFromTextLikeInFunc(CopyFromState cstate, Oid atttypid, FmgrInfo *finfo, + Oid *typioparam) +{ + Oid func_oid; + + getTypeInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* Implementation of the end callback for text and CSV formats */ +static void +CopyFromTextLikeEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* Implementation of the start callback for binary format */ +static void +CopyFromBinaryStart(CopyFromState cstate, TupleDesc tupDesc) +{ + /* Read and verify binary header */ + ReceiveCopyBinaryHeader(cstate); +} + +/* + * Implementation of the infunc callback for binary format. Assign + * the binary input function to the given *finfo. + */ +static void +CopyFromBinaryInFunc(CopyFromState cstate, Oid atttypid, + FmgrInfo *finfo, Oid *typioparam) +{ + Oid func_oid; + + getTypeBinaryInputInfo(atttypid, &func_oid, typioparam); + fmgr_info(func_oid, finfo); +} + +/* Implementation of the end callback for binary format */ +static void +CopyFromBinaryEnd(CopyFromState cstate) +{ + /* nothing to do */ +} + +/* * error context callback for COPY FROM * * The argument for the error context must be CopyFromState. @@ -1403,7 +1542,6 @@ BeginCopyFrom(ParseState *pstate, num_defaults; FmgrInfo *in_functions; Oid *typioparams; - Oid in_func_oid; int *defmap; ExprState **defexprs; MemoryContext oldcontext; @@ -1435,6 +1573,9 @@ BeginCopyFrom(ParseState *pstate, /* Extract options from the statement node tree */ ProcessCopyOptions(pstate, &cstate->opts, true /* is_from */ , options); + /* Set the format routine */ + cstate->routine = CopyFromGetRoutine(cstate->opts); + /* Process the target relation */ cstate->rel = rel; @@ -1590,25 +1731,6 @@ BeginCopyFrom(ParseState *pstate, cstate->raw_buf_index = cstate->raw_buf_len = 0; cstate->raw_reached_eof = false; - if (!cstate->opts.binary) - { - /* - * If encoding conversion is needed, we need another buffer to hold - * the converted input data. Otherwise, we can just point input_buf - * to the same buffer as raw_buf. - */ - if (cstate->need_transcoding) - { - cstate->input_buf = (char *) palloc(INPUT_BUF_SIZE + 1); - cstate->input_buf_index = cstate->input_buf_len = 0; - } - else - cstate->input_buf = cstate->raw_buf; - cstate->input_reached_eof = false; - - initStringInfo(&cstate->line_buf); - } - initStringInfo(&cstate->attribute_buf); /* Assign range table and rteperminfos, we'll need them in CopyFrom. */ @@ -1641,13 +1763,9 @@ BeginCopyFrom(ParseState *pstate, continue; /* Fetch the input function and typioparam info */ - if (cstate->opts.binary) - getTypeBinaryInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - else - getTypeInputInfo(att->atttypid, - &in_func_oid, &typioparams[attnum - 1]); - fmgr_info(in_func_oid, &in_functions[attnum - 1]); + cstate->routine->CopyFromInFunc(cstate, att->atttypid, + &in_functions[attnum - 1], + &typioparams[attnum - 1]); /* Get default info if available */ defexprs[attnum - 1] = NULL; @@ -1782,20 +1900,7 @@ BeginCopyFrom(ParseState *pstate, pgstat_progress_update_multi_param(3, progress_cols, progress_vals); - if (cstate->opts.binary) - { - /* Read and verify binary header */ - ReceiveCopyBinaryHeader(cstate); - } - - /* create workspace for CopyReadAttributes results */ - if (!cstate->opts.binary) - { - AttrNumber attr_count = list_length(cstate->attnumlist); - - cstate->max_fields = attr_count; - cstate->raw_fields = (char **) palloc(attr_count * sizeof(char *)); - } + cstate->routine->CopyFromStart(cstate, tupDesc); MemoryContextSwitchTo(oldcontext); @@ -1808,6 +1913,9 @@ BeginCopyFrom(ParseState *pstate, void EndCopyFrom(CopyFromState cstate) { + /* Invoke the end callback */ + cstate->routine->CopyFromEnd(cstate); + /* No COPY FROM related resources except memory. */ if (cstate->is_program) { diff --git a/src/backend/commands/copyfromparse.c b/src/backend/commands/copyfromparse.c index caccdc8563c..bad577aa67b 100644 --- a/src/backend/commands/copyfromparse.c +++ b/src/backend/commands/copyfromparse.c @@ -62,7 +62,7 @@ #include <unistd.h> #include <sys/stat.h> -#include "commands/copy.h" +#include "commands/copyapi.h" #include "commands/copyfrom_internal.h" #include "commands/progress.h" #include "executor/executor.h" @@ -140,13 +140,18 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* non-export function prototypes */ -static bool CopyReadLine(CopyFromState cstate); -static bool CopyReadLineText(CopyFromState cstate); +static bool CopyReadLine(CopyFromState cstate, bool is_csv); +static bool CopyReadLineText(CopyFromState cstate, bool is_csv); static int CopyReadAttributesText(CopyFromState cstate); static int CopyReadAttributesCSV(CopyFromState cstate); static Datum CopyReadBinaryAttribute(CopyFromState cstate, FmgrInfo *flinfo, Oid typioparam, int32 typmod, bool *isnull); +static pg_attribute_always_inline bool CopyFromTextLikeOneRow(CopyFromState cstate, + ExprContext *econtext, + Datum *values, + bool *nulls, + bool is_csv); /* Low-level communications functions */ @@ -740,9 +745,12 @@ CopyReadBinaryData(CopyFromState cstate, char *dest, int nbytes) * in the relation. * * NOTE: force_not_null option are not applied to the returned fields. + * + * We use pg_attribute_always_inline to reduce function call overhead + * and to help compilers to optimize away the 'is_csv' condition. */ -bool -NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) +static pg_attribute_always_inline bool +NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields, bool is_csv) { int fldct; bool done; @@ -759,13 +767,13 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) tupDesc = RelationGetDescr(cstate->rel); cstate->cur_lineno++; - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); if (cstate->opts.header_line == COPY_HEADER_MATCH) { int fldnum; - if (cstate->opts.csv_mode) + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -809,7 +817,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) cstate->cur_lineno++; /* Actually read the line into memory here */ - done = CopyReadLine(cstate); + done = CopyReadLine(cstate, is_csv); /* * EOF at start of line means we're done. If we see EOF after some @@ -820,7 +828,7 @@ NextCopyFromRawFields(CopyFromState cstate, char ***fields, int *nfields) return false; /* Parse the line into de-escaped field values */ - if (cstate->opts.csv_mode) + if (is_csv) fldct = CopyReadAttributesCSV(cstate); else fldct = CopyReadAttributesText(cstate); @@ -847,233 +855,275 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, { TupleDesc tupDesc; AttrNumber num_phys_attrs, - attr_count, num_defaults = cstate->num_defaults; - FmgrInfo *in_functions = cstate->in_functions; - Oid *typioparams = cstate->typioparams; int i; int *defmap = cstate->defmap; ExprState **defexprs = cstate->defexprs; tupDesc = RelationGetDescr(cstate->rel); num_phys_attrs = tupDesc->natts; - attr_count = list_length(cstate->attnumlist); /* Initialize all values for row to NULL */ MemSet(values, 0, num_phys_attrs * sizeof(Datum)); MemSet(nulls, true, num_phys_attrs * sizeof(bool)); MemSet(cstate->defaults, false, num_phys_attrs * sizeof(bool)); - if (!cstate->opts.binary) + /* Get one row from source */ + if (!cstate->routine->CopyFromOneRow(cstate, econtext, values, nulls)) + return false; + + /* + * Now compute and insert any defaults available for the columns not + * provided by the input data. Anything not processed here or above will + * remain NULL. + */ + for (i = 0; i < num_defaults; i++) { - char **field_strings; - ListCell *cur; - int fldct; - int fieldno; - char *string; + /* + * The caller must supply econtext and have switched into the + * per-tuple memory context in it. + */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - /* read raw fields in the next line */ - if (!NextCopyFromRawFields(cstate, &field_strings, &fldct)) - return false; + values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext, + &nulls[defmap[i]]); + } + + return true; +} + +/* Implementation of the per-row callback for text format */ +bool +CopyFromTextOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, false); +} + +/* Implementation of the per-row callback for CSV format */ +bool +CopyFromCSVOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + return CopyFromTextLikeOneRow(cstate, econtext, values, nulls, true); +} - /* check for overflowing fields */ - if (attr_count > 0 && fldct > attr_count) +/* + * Workhorse for CopyFromTextOneRow() and CopyFromCSVOneRow(). + * + * We use pg_attribute_always_inline to reduce function call overhead + * and to help compilers to optimize away the 'is_csv' condition. + */ +static pg_attribute_always_inline bool +CopyFromTextLikeOneRow(CopyFromState cstate, ExprContext *econtext, + Datum *values, bool *nulls, bool is_csv) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + ExprState **defexprs = cstate->defexprs; + char **field_strings; + ListCell *cur; + int fldct; + int fieldno; + char *string; + + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); + + /* read raw fields in the next line */ + if (!NextCopyFromRawFields(cstate, &field_strings, &fldct, is_csv)) + return false; + + /* check for overflowing fields */ + if (attr_count > 0 && fldct > attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("extra data after last expected column"))); + + fieldno = 0; + + /* Loop to read the user attributes on the line. */ + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + if (fieldno >= fldct) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("extra data after last expected column"))); - - fieldno = 0; + errmsg("missing data for column \"%s\"", + NameStr(att->attname)))); + string = field_strings[fieldno++]; - /* Loop to read the user attributes on the line. */ - foreach(cur, cstate->attnumlist) + if (cstate->convert_select_flags && + !cstate->convert_select_flags[m]) { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - if (fieldno >= fldct) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("missing data for column \"%s\"", - NameStr(att->attname)))); - string = field_strings[fieldno++]; + /* ignore input field, leaving column as NULL */ + continue; + } - if (cstate->convert_select_flags && - !cstate->convert_select_flags[m]) + if (is_csv) + { + if (string == NULL && + cstate->opts.force_notnull_flags[m]) { - /* ignore input field, leaving column as NULL */ - continue; + /* + * FORCE_NOT_NULL option is set and column is NULL - convert + * it to the NULL string. + */ + string = cstate->opts.null_print; } - - if (cstate->opts.csv_mode) + else if (string != NULL && cstate->opts.force_null_flags[m] + && strcmp(string, cstate->opts.null_print) == 0) { - if (string == NULL && - cstate->opts.force_notnull_flags[m]) - { - /* - * FORCE_NOT_NULL option is set and column is NULL - - * convert it to the NULL string. - */ - string = cstate->opts.null_print; - } - else if (string != NULL && cstate->opts.force_null_flags[m] - && strcmp(string, cstate->opts.null_print) == 0) - { - /* - * FORCE_NULL option is set and column matches the NULL - * string. It must have been quoted, or otherwise the - * string would already have been set to NULL. Convert it - * to NULL as specified. - */ - string = NULL; - } + /* + * FORCE_NULL option is set and column matches the NULL + * string. It must have been quoted, or otherwise the string + * would already have been set to NULL. Convert it to NULL as + * specified. + */ + string = NULL; } + } - cstate->cur_attname = NameStr(att->attname); - cstate->cur_attval = string; + cstate->cur_attname = NameStr(att->attname); + cstate->cur_attval = string; - if (string != NULL) - nulls[m] = false; + if (string != NULL) + nulls[m] = false; - if (cstate->defaults[m]) - { - /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. - */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + if (cstate->defaults[m]) + { + /* We must have switched into the per-tuple memory context */ + Assert(econtext != NULL); + Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); - values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); - } + values[m] = ExecEvalExpr(defexprs[m], econtext, &nulls[m]); + } - /* - * If ON_ERROR is specified with IGNORE, skip rows with soft - * errors - */ - else if (!InputFunctionCallSafe(&in_functions[m], - string, - typioparams[m], - att->atttypmod, - (Node *) cstate->escontext, - &values[m])) - { - Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); + /* + * If ON_ERROR is specified with IGNORE, skip rows with soft errors + */ + else if (!InputFunctionCallSafe(&in_functions[m], + string, + typioparams[m], + att->atttypmod, + (Node *) cstate->escontext, + &values[m])) + { + Assert(cstate->opts.on_error != COPY_ON_ERROR_STOP); - cstate->num_errors++; + cstate->num_errors++; - if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) - { - /* - * Since we emit line number and column info in the below - * notice message, we suppress error context information - * other than the relation name. - */ - Assert(!cstate->relname_only); - cstate->relname_only = true; + if (cstate->opts.log_verbosity == COPY_LOG_VERBOSITY_VERBOSE) + { + /* + * Since we emit line number and column info in the below + * notice message, we suppress error context information other + * than the relation name. + */ + Assert(!cstate->relname_only); + cstate->relname_only = true; - if (cstate->cur_attval) - { - char *attval; - - attval = CopyLimitPrintoutLength(cstate->cur_attval); - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname, - attval)); - pfree(attval); - } - else - ereport(NOTICE, - errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", - (unsigned long long) cstate->cur_lineno, - cstate->cur_attname)); - - /* reset relname_only */ - cstate->relname_only = false; + if (cstate->cur_attval) + { + char *attval; + + attval = CopyLimitPrintoutLength(cstate->cur_attval); + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": \"%s\"", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname, + attval)); + pfree(attval); } + else + ereport(NOTICE, + errmsg("skipping row due to data type incompatibility at line %llu for column \"%s\": null input", + (unsigned long long) cstate->cur_lineno, + cstate->cur_attname)); - return true; + /* reset relname_only */ + cstate->relname_only = false; } - cstate->cur_attname = NULL; - cstate->cur_attval = NULL; + return true; } - Assert(fieldno == attr_count); + cstate->cur_attname = NULL; + cstate->cur_attval = NULL; } - else - { - /* binary */ - int16 fld_count; - ListCell *cur; - cstate->cur_lineno++; + Assert(fieldno == attr_count); - if (!CopyGetInt16(cstate, &fld_count)) - { - /* EOF detected (end of file, or protocol-level EOF) */ - return false; - } + return true; +} - if (fld_count == -1) - { - /* - * Received EOF marker. Wait for the protocol-level EOF, and - * complain if it doesn't come immediately. In COPY FROM STDIN, - * this ensures that we correctly handle CopyFail, if client - * chooses to send that now. When copying from file, we could - * ignore the rest of the file like in text mode, but we choose to - * be consistent with the COPY FROM STDIN case. - */ - char dummy; +/* Implementation of the per-row callback for binary format */ +bool +CopyFromBinaryOneRow(CopyFromState cstate, ExprContext *econtext, Datum *values, + bool *nulls) +{ + TupleDesc tupDesc; + AttrNumber attr_count; + FmgrInfo *in_functions = cstate->in_functions; + Oid *typioparams = cstate->typioparams; + int16 fld_count; + ListCell *cur; - if (CopyReadBinaryData(cstate, &dummy, 1) > 0) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("received copy data after EOF marker"))); - return false; - } + tupDesc = RelationGetDescr(cstate->rel); + attr_count = list_length(cstate->attnumlist); - if (fld_count != attr_count) - ereport(ERROR, - (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - errmsg("row field count is %d, expected %d", - (int) fld_count, attr_count))); + cstate->cur_lineno++; - foreach(cur, cstate->attnumlist) - { - int attnum = lfirst_int(cur); - int m = attnum - 1; - Form_pg_attribute att = TupleDescAttr(tupDesc, m); - - cstate->cur_attname = NameStr(att->attname); - values[m] = CopyReadBinaryAttribute(cstate, - &in_functions[m], - typioparams[m], - att->atttypmod, - &nulls[m]); - cstate->cur_attname = NULL; - } + if (!CopyGetInt16(cstate, &fld_count)) + { + /* EOF detected (end of file, or protocol-level EOF) */ + return false; } - /* - * Now compute and insert any defaults available for the columns not - * provided by the input data. Anything not processed here or above will - * remain NULL. - */ - for (i = 0; i < num_defaults; i++) + if (fld_count == -1) { /* - * The caller must supply econtext and have switched into the - * per-tuple memory context in it. + * Received EOF marker. Wait for the protocol-level EOF, and complain + * if it doesn't come immediately. In COPY FROM STDIN, this ensures + * that we correctly handle CopyFail, if client chooses to send that + * now. When copying from file, we could ignore the rest of the file + * like in text mode, but we choose to be consistent with the COPY + * FROM STDIN case. */ - Assert(econtext != NULL); - Assert(CurrentMemoryContext == econtext->ecxt_per_tuple_memory); + char dummy; - values[defmap[i]] = ExecEvalExpr(defexprs[defmap[i]], econtext, - &nulls[defmap[i]]); + if (CopyReadBinaryData(cstate, &dummy, 1) > 0) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("received copy data after EOF marker"))); + return false; + } + + if (fld_count != attr_count) + ereport(ERROR, + (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), + errmsg("row field count is %d, expected %d", + (int) fld_count, attr_count))); + + foreach(cur, cstate->attnumlist) + { + int attnum = lfirst_int(cur); + int m = attnum - 1; + Form_pg_attribute att = TupleDescAttr(tupDesc, m); + + cstate->cur_attname = NameStr(att->attname); + values[m] = CopyReadBinaryAttribute(cstate, + &in_functions[m], + typioparams[m], + att->atttypmod, + &nulls[m]); + cstate->cur_attname = NULL; } return true; @@ -1087,7 +1137,7 @@ NextCopyFrom(CopyFromState cstate, ExprContext *econtext, * in the final value of line_buf. */ static bool -CopyReadLine(CopyFromState cstate) +CopyReadLine(CopyFromState cstate, bool is_csv) { bool result; @@ -1095,7 +1145,7 @@ CopyReadLine(CopyFromState cstate) cstate->line_buf_valid = false; /* Parse data and transfer into line_buf */ - result = CopyReadLineText(cstate); + result = CopyReadLineText(cstate, is_csv); if (result) { @@ -1163,7 +1213,7 @@ CopyReadLine(CopyFromState cstate) * CopyReadLineText - inner loop of CopyReadLine for text mode */ static bool -CopyReadLineText(CopyFromState cstate) +CopyReadLineText(CopyFromState cstate, bool is_csv) { char *copy_input_buf; int input_buf_ptr; @@ -1178,7 +1228,7 @@ CopyReadLineText(CopyFromState cstate) char quotec = '\0'; char escapec = '\0'; - if (cstate->opts.csv_mode) + if (is_csv) { quotec = cstate->opts.quote[0]; escapec = cstate->opts.escape[0]; @@ -1255,7 +1305,7 @@ CopyReadLineText(CopyFromState cstate) prev_raw_ptr = input_buf_ptr; c = copy_input_buf[input_buf_ptr++]; - if (cstate->opts.csv_mode) + if (is_csv) { /* * If character is '\r', we may need to look ahead below. Force @@ -1294,7 +1344,7 @@ CopyReadLineText(CopyFromState cstate) } /* Process \r */ - if (c == '\r' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\r' && (!is_csv || !in_quote)) { /* Check for \r\n on first line, _and_ handle \r\n. */ if (cstate->eol_type == EOL_UNKNOWN || @@ -1322,10 +1372,10 @@ CopyReadLineText(CopyFromState cstate) if (cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); @@ -1339,10 +1389,10 @@ CopyReadLineText(CopyFromState cstate) else if (cstate->eol_type == EOL_NL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal carriage return found in data") : errmsg("unquoted carriage return found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\r\" to represent carriage return.") : errhint("Use quoted CSV field to represent carriage return."))); /* If reach here, we have found the line terminator */ @@ -1350,15 +1400,15 @@ CopyReadLineText(CopyFromState cstate) } /* Process \n */ - if (c == '\n' && (!cstate->opts.csv_mode || !in_quote)) + if (c == '\n' && (!is_csv || !in_quote)) { if (cstate->eol_type == EOL_CR || cstate->eol_type == EOL_CRNL) ereport(ERROR, (errcode(ERRCODE_BAD_COPY_FILE_FORMAT), - !cstate->opts.csv_mode ? + !is_csv ? errmsg("literal newline found in data") : errmsg("unquoted newline found in data"), - !cstate->opts.csv_mode ? + !is_csv ? errhint("Use \"\\n\" to represent newline.") : errhint("Use quoted CSV field to represent newline."))); cstate->eol_type = EOL_NL; /* in case not set yet */ @@ -1370,7 +1420,7 @@ CopyReadLineText(CopyFromState cstate) * Process backslash, except in CSV mode where backslash is a normal * character. */ - if (c == '\\' && !cstate->opts.csv_mode) + if (c == '\\' && !is_csv) { char c2; |