8
8
*
9
9
*
10
10
* IDENTIFICATION
11
- * $PostgreSQL: pgsql/src/interfaces/libpq/fe-protocol3.c,v 1.31 2008/01/01 19 :46:00 momjian Exp $
11
+ * $PostgreSQL: pgsql/src/interfaces/libpq/fe-protocol3.c,v 1.32 2008/01/14 18 :46:17 tgl Exp $
12
12
*
13
13
*-------------------------------------------------------------------------
14
14
*/
@@ -1274,16 +1274,13 @@ getReadyForQuery(PGconn *conn)
1274
1274
}
1275
1275
1276
1276
/*
1277
- * PQgetCopyData - read a row of data from the backend during COPY OUT
1277
+ * getCopyDataMessage - fetch next CopyData message, process async messages
1278
1278
*
1279
- * If successful, sets *buffer to point to a malloc'd row of data, and
1280
- * returns row length (always > 0) as result.
1281
- * Returns 0 if no row available yet (only possible if async is true),
1282
- * -1 if end of copy (consult PQgetResult), or -2 if error (consult
1283
- * PQerrorMessage).
1279
+ * Returns length word of CopyData message (> 0), or 0 if no complete
1280
+ * message available, -1 if end of copy, -2 if error.
1284
1281
*/
1285
- int
1286
- pqGetCopyData3 (PGconn * conn , char * * buffer , int async )
1282
+ static int
1283
+ getCopyDataMessage (PGconn * conn )
1287
1284
{
1288
1285
char id ;
1289
1286
int msgLength ;
@@ -1298,22 +1295,94 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
1298
1295
*/
1299
1296
conn -> inCursor = conn -> inStart ;
1300
1297
if (pqGetc (& id , conn ))
1301
- goto nodata ;
1298
+ return 0 ;
1302
1299
if (pqGetInt (& msgLength , 4 , conn ))
1303
- goto nodata ;
1300
+ return 0 ;
1301
+ if (msgLength < 4 )
1302
+ {
1303
+ handleSyncLoss (conn , id , msgLength );
1304
+ return -2 ;
1305
+ }
1304
1306
avail = conn -> inEnd - conn -> inCursor ;
1305
1307
if (avail < msgLength - 4 )
1306
- goto nodata ;
1308
+ return 0 ;
1307
1309
1308
1310
/*
1309
- * If it's anything except Copy Data, exit COPY_OUT mode and let
1310
- * caller read status with PQgetResult(). The normal case is that
1311
- * it's Copy Done, but we let parseInput read that.
1311
+ * If it's a legitimate async message type, process it. (NOTIFY
1312
+ * messages are not currently possible here, but we handle them for
1313
+ * completeness. NOTICE is definitely possible, and ParameterStatus
1314
+ * could probably be made to happen.) Otherwise, if it's anything
1315
+ * except Copy Data, report end-of-copy.
1312
1316
*/
1313
- if (id != 'd' )
1317
+ switch (id )
1314
1318
{
1315
- conn -> asyncStatus = PGASYNC_BUSY ;
1316
- return -1 ;
1319
+ case 'A' : /* NOTIFY */
1320
+ if (getNotify (conn ))
1321
+ return 0 ;
1322
+ break ;
1323
+ case 'N' : /* NOTICE */
1324
+ if (pqGetErrorNotice3 (conn , false))
1325
+ return 0 ;
1326
+ break ;
1327
+ case 'S' : /* ParameterStatus */
1328
+ if (getParameterStatus (conn ))
1329
+ return 0 ;
1330
+ break ;
1331
+ case 'd' : /* Copy Data, pass it back to caller */
1332
+ return msgLength ;
1333
+ default : /* treat as end of copy */
1334
+ return -1 ;
1335
+ }
1336
+
1337
+ /* Drop the processed message and loop around for another */
1338
+ conn -> inStart = conn -> inCursor ;
1339
+ }
1340
+ }
1341
+
1342
+ /*
1343
+ * PQgetCopyData - read a row of data from the backend during COPY OUT
1344
+ *
1345
+ * If successful, sets *buffer to point to a malloc'd row of data, and
1346
+ * returns row length (always > 0) as result.
1347
+ * Returns 0 if no row available yet (only possible if async is true),
1348
+ * -1 if end of copy (consult PQgetResult), or -2 if error (consult
1349
+ * PQerrorMessage).
1350
+ */
1351
+ int
1352
+ pqGetCopyData3 (PGconn * conn , char * * buffer , int async )
1353
+ {
1354
+ int msgLength ;
1355
+
1356
+ for (;;)
1357
+ {
1358
+ /*
1359
+ * Collect the next input message. To make life simpler for async
1360
+ * callers, we keep returning 0 until the next message is fully
1361
+ * available, even if it is not Copy Data.
1362
+ */
1363
+ msgLength = getCopyDataMessage (conn );
1364
+ if (msgLength < 0 )
1365
+ {
1366
+ /*
1367
+ * On end-of-copy, exit COPY_OUT mode and let caller read status
1368
+ * with PQgetResult(). The normal case is that it's Copy Done,
1369
+ * but we let parseInput read that. If error, we expect the
1370
+ * state was already changed.
1371
+ */
1372
+ if (msgLength == -1 )
1373
+ conn -> asyncStatus = PGASYNC_BUSY ;
1374
+ return msgLength ; /* end-of-copy or error */
1375
+ }
1376
+ if (msgLength == 0 )
1377
+ {
1378
+ /* Don't block if async read requested */
1379
+ if (async )
1380
+ return 0 ;
1381
+ /* Need to load more data */
1382
+ if (pqWait (TRUE, FALSE, conn ) ||
1383
+ pqReadData (conn ) < 0 )
1384
+ return -2 ;
1385
+ continue ;
1317
1386
}
1318
1387
1319
1388
/*
@@ -1341,16 +1410,6 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
1341
1410
1342
1411
/* Empty, so drop it and loop around for another */
1343
1412
conn -> inStart = conn -> inCursor ;
1344
- continue ;
1345
-
1346
- nodata :
1347
- /* Don't block if async read requested */
1348
- if (async )
1349
- return 0 ;
1350
- /* Need to load more data */
1351
- if (pqWait (TRUE, FALSE, conn ) ||
1352
- pqReadData (conn ) < 0 )
1353
- return -2 ;
1354
1413
}
1355
1414
}
1356
1415
@@ -1413,7 +1472,6 @@ pqGetline3(PGconn *conn, char *s, int maxlen)
1413
1472
int
1414
1473
pqGetlineAsync3 (PGconn * conn , char * buffer , int bufsize )
1415
1474
{
1416
- char id ;
1417
1475
int msgLength ;
1418
1476
int avail ;
1419
1477
@@ -1424,22 +1482,13 @@ pqGetlineAsync3(PGconn *conn, char *buffer, int bufsize)
1424
1482
* Recognize the next input message. To make life simpler for async
1425
1483
* callers, we keep returning 0 until the next message is fully available
1426
1484
* even if it is not Copy Data. This should keep PQendcopy from blocking.
1485
+ * (Note: unlike pqGetCopyData3, we do not change asyncStatus here.)
1427
1486
*/
1428
- conn -> inCursor = conn -> inStart ;
1429
- if (pqGetc (& id , conn ))
1430
- return 0 ;
1431
- if (pqGetInt (& msgLength , 4 , conn ))
1432
- return 0 ;
1433
- avail = conn -> inEnd - conn -> inCursor ;
1434
- if (avail < msgLength - 4 )
1435
- return 0 ;
1436
-
1437
- /*
1438
- * Cannot proceed unless it's a Copy Data message. Anything else means
1439
- * end of copy mode.
1440
- */
1441
- if (id != 'd' )
1442
- return -1 ;
1487
+ msgLength = getCopyDataMessage (conn );
1488
+ if (msgLength < 0 )
1489
+ return -1 ; /* end-of-copy or error */
1490
+ if (msgLength == 0 )
1491
+ return 0 ; /* no data yet */
1443
1492
1444
1493
/*
1445
1494
* Move data from libpq's buffer to the caller's. In the case where a
0 commit comments