Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 6cd110c

Browse files
committed
Fix assorted issues in parallel vacuumdb.
Avoid storing the result of PQsocket() in a pgsocket variable; it's declared as int, and the no-socket test is properly written as "x < 0" not "x == PGINVALID_SOCKET". This accidentally had no bad effect because we never got to init_slot() with a bad connection, but it's still wrong. Actually, it seems like we should avoid storing the result for a long period at all. The function's not so expensive that it's worth avoiding, and the existing coding technique here would fail if anyone tried to PQreset the connection during the life of the program. Hence, just re-call PQsocket every time we construct a select(2) mask. Speaking of select(), GetIdleSlot imagined that it could compute the select mask once and continue to use it over multiple calls to select_loop(), which is pretty bogus since that would stomp on the mask on return. This could only matter if the function's outer loop iterated more than once, which is unlikely (it'd take some connection receiving data, but not enough to complete its command). But if it did happen, we'd acquire "tunnel vision" and stop watching the other connections for query termination, with the effect of losing parallelism. Another way in which GetIdleSlot could lose parallelism is that once PQisBusy returns false, it would lock in on that connection and do PQgetResult until that returns NULL; in some cases that could result in blocking. (Perhaps this can never happen in vacuumdb due to the limited set of commands that it can issue, but I'm not quite sure of that, and even if true today it's not a future-proof assumption.) Refactor the code to do that properly, so that it risks blocking in PQgetResult only in cases where we need to wait anyway. Another loss-of-parallelism problem, which *is* easily demonstrable, is that any setup queries issued during prepare_vacuum_command() were always issued on the last-to-be-created connection, whether or not that was idle. Long-running operations on that connection thus prevented issuance of additional operations on the other ones, except in the limited cases where no preparatory query was needed. Instead, wait till we've identified a free connection and use that one. Also, avoid core dump due to undersized malloc request in the case that no tables are identified to be vacuumed. The bogus no-socket test was noted by CharSyam, the other problems identified in my own code review. Back-patch to 9.5 where parallel vacuumdb was introduced. Discussion: https://postgr.es/m/CAMrLSE6etb33-192DTEUGkV-TsvEcxtBDxGWG1tgNOMnQHwgDA@mail.gmail.com
1 parent 91d8231 commit 6cd110c

File tree

1 file changed

+114
-78
lines changed

1 file changed

+114
-78
lines changed

src/bin/scripts/vacuumdb.c

Lines changed: 114 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,8 @@
2626
/* Parallel vacuuming stuff */
2727
typedef struct ParallelSlot
2828
{
29-
PGconn *connection;
30-
pgsocket sock;
31-
bool isFree;
29+
PGconn *connection; /* One connection */
30+
bool isFree; /* Is it known to be idle? */
3231
} ParallelSlot;
3332

3433
/* vacuum options controlled by user flags */
@@ -69,13 +68,16 @@ static void run_vacuum_command(PGconn *conn, const char *sql, bool echo,
6968
static ParallelSlot *GetIdleSlot(ParallelSlot slots[], int numslots,
7069
const char *progname);
7170

71+
static bool ProcessQueryResult(PGconn *conn, PGresult *result,
72+
const char *progname);
73+
7274
static bool GetQueryResult(PGconn *conn, const char *progname);
7375

7476
static void DisconnectDatabase(ParallelSlot *slot);
7577

7678
static int select_loop(int maxFd, fd_set *workerset, bool *aborting);
7779

78-
static void init_slot(ParallelSlot *slot, PGconn *conn, const char *progname);
80+
static void init_slot(ParallelSlot *slot, PGconn *conn);
7981

8082
static void help(const char *progname);
8183

@@ -341,7 +343,7 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
341343
PQExpBufferData sql;
342344
PGconn *conn;
343345
SimpleStringListCell *cell;
344-
ParallelSlot *slots = NULL;
346+
ParallelSlot *slots;
345347
SimpleStringList dbtables = {NULL, NULL};
346348
int i;
347349
bool failed = false;
@@ -385,7 +387,6 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
385387
PQExpBufferData buf;
386388
PGresult *res;
387389
int ntups;
388-
int i;
389390

390391
initPQExpBuffer(&buf);
391392

@@ -426,15 +427,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
426427
* for the first slot. If not in parallel mode, the first slot in the
427428
* array contains the connection.
428429
*/
430+
if (concurrentCons <= 0)
431+
concurrentCons = 1;
429432
slots = (ParallelSlot *) pg_malloc(sizeof(ParallelSlot) * concurrentCons);
430-
init_slot(slots, conn, progname);
433+
init_slot(slots, conn);
431434
if (parallel)
432435
{
433436
for (i = 1; i < concurrentCons; i++)
434437
{
435438
conn = connectDatabase(dbname, host, port, username, prompt_password,
436439
progname, echo, false, true);
437-
init_slot(slots + i, conn, progname);
440+
init_slot(slots + i, conn);
438441
}
439442
}
440443

@@ -456,11 +459,8 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
456459
cell = tables ? tables->head : NULL;
457460
do
458461
{
459-
ParallelSlot *free_slot;
460462
const char *tabname = cell ? cell->val : NULL;
461-
462-
prepare_vacuum_command(&sql, conn, vacopts, tabname,
463-
tables == &dbtables, progname, echo);
463+
ParallelSlot *free_slot;
464464

465465
if (CancelRequested)
466466
{
@@ -492,10 +492,17 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
492492
else
493493
free_slot = slots;
494494

495+
/*
496+
* Prepare the vacuum command. Note that in some cases this requires
497+
* query execution, so be sure to use the free connection.
498+
*/
499+
prepare_vacuum_command(&sql, free_slot->connection, vacopts, tabname,
500+
tables == &dbtables, progname, echo);
501+
495502
/*
496503
* Execute the vacuum. If not in parallel mode, this terminates the
497504
* program in case of an error. (The parallel case handles query
498-
* errors in GetQueryResult through GetIdleSlot.)
505+
* errors in ProcessQueryResult through GetIdleSlot.)
499506
*/
500507
run_vacuum_command(free_slot->connection, sql.data,
501508
echo, tabname, progname, parallel);
@@ -508,13 +515,11 @@ vacuum_one_database(const char *dbname, vacuumingOptions *vacopts,
508515
{
509516
int j;
510517

518+
/* wait for all connections to finish */
511519
for (j = 0; j < concurrentCons; j++)
512520
{
513-
/* wait for all connection to return the results */
514521
if (!GetQueryResult((slots + j)->connection, progname))
515522
goto finish;
516-
517-
(slots + j)->isFree = true;
518523
}
519524
}
520525

@@ -685,7 +690,8 @@ prepare_vacuum_command(PQExpBuffer sql, PGconn *conn,
685690
}
686691

687692
/*
688-
* Execute a vacuum/analyze command to the server.
693+
* Send a vacuum/analyze command to the server. In async mode, return after
694+
* sending the command; else, wait for it to finish.
689695
*
690696
* Any errors during command execution are reported to stderr. If async is
691697
* false, this function exits the program after reporting the error.
@@ -733,10 +739,6 @@ run_vacuum_command(PGconn *conn, const char *sql, bool echo,
733739
* this happens, we read the whole set and mark as free all sockets that become
734740
* available.
735741
*
736-
* Process the slot list, if any free slot is available then return the slotid
737-
* else perform the select on all the socket's and wait until at least one slot
738-
* becomes available.
739-
*
740742
* If an error occurs, NULL is returned.
741743
*/
742744
static ParallelSlot *
@@ -745,31 +747,43 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
745747
{
746748
int i;
747749
int firstFree = -1;
748-
fd_set slotset;
749-
pgsocket maxFd;
750-
751-
for (i = 0; i < numslots; i++)
752-
if ((slots + i)->isFree)
753-
return slots + i;
754-
755-
FD_ZERO(&slotset);
756750

757-
maxFd = slots->sock;
751+
/* Any connection already known free? */
758752
for (i = 0; i < numslots; i++)
759753
{
760-
FD_SET((slots + i)->sock, &slotset);
761-
if ((slots + i)->sock > maxFd)
762-
maxFd = (slots + i)->sock;
754+
if (slots[i].isFree)
755+
return slots + i;
763756
}
764757

765758
/*
766759
* No free slot found, so wait until one of the connections has finished
767760
* its task and return the available slot.
768761
*/
769-
for (firstFree = -1; firstFree < 0;)
762+
while (firstFree < 0)
770763
{
764+
fd_set slotset;
765+
int maxFd = 0;
771766
bool aborting;
772767

768+
/* We must reconstruct the fd_set for each call to select_loop */
769+
FD_ZERO(&slotset);
770+
771+
for (i = 0; i < numslots; i++)
772+
{
773+
int sock = PQsocket(slots[i].connection);
774+
775+
/*
776+
* We don't really expect any connections to lose their sockets
777+
* after startup, but just in case, cope by ignoring them.
778+
*/
779+
if (sock < 0)
780+
continue;
781+
782+
FD_SET(sock, &slotset);
783+
if (sock > maxFd)
784+
maxFd = sock;
785+
}
786+
773787
SetCancelConn(slots->connection);
774788
i = select_loop(maxFd, &slotset, &aborting);
775789
ResetCancelConn();
@@ -787,64 +801,93 @@ GetIdleSlot(ParallelSlot slots[], int numslots,
787801

788802
for (i = 0; i < numslots; i++)
789803
{
790-
if (!FD_ISSET((slots + i)->sock, &slotset))
791-
continue;
792-
793-
PQconsumeInput((slots + i)->connection);
794-
if (PQisBusy((slots + i)->connection))
795-
continue;
804+
int sock = PQsocket(slots[i].connection);
796805

797-
(slots + i)->isFree = true;
806+
if (sock >= 0 && FD_ISSET(sock, &slotset))
807+
{
808+
/* select() says input is available, so consume it */
809+
PQconsumeInput(slots[i].connection);
810+
}
798811

799-
if (!GetQueryResult((slots + i)->connection, progname))
800-
return NULL;
812+
/* Collect result(s) as long as any are available */
813+
while (!PQisBusy(slots[i].connection))
814+
{
815+
PGresult *result = PQgetResult(slots[i].connection);
801816

802-
if (firstFree < 0)
803-
firstFree = i;
817+
if (result != NULL)
818+
{
819+
/* Check and discard the command result */
820+
if (!ProcessQueryResult(slots[i].connection, result,
821+
progname))
822+
return NULL;
823+
}
824+
else
825+
{
826+
/* This connection has become idle */
827+
slots[i].isFree = true;
828+
if (firstFree < 0)
829+
firstFree = i;
830+
break;
831+
}
832+
}
804833
}
805834
}
806835

807836
return slots + firstFree;
808837
}
809838

839+
/*
840+
* ProcessQueryResult
841+
*
842+
* Process (and delete) a query result. Returns true if there's no error,
843+
* false otherwise -- but errors about trying to vacuum a missing relation
844+
* are reported and subsequently ignored.
845+
*/
846+
static bool
847+
ProcessQueryResult(PGconn *conn, PGresult *result, const char *progname)
848+
{
849+
/*
850+
* If it's an error, report it. Errors about a missing table are harmless
851+
* so we continue processing; but die for other errors.
852+
*/
853+
if (PQresultStatus(result) != PGRES_COMMAND_OK)
854+
{
855+
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
856+
857+
fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
858+
progname, PQdb(conn), PQerrorMessage(conn));
859+
860+
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
861+
{
862+
PQclear(result);
863+
return false;
864+
}
865+
}
866+
867+
PQclear(result);
868+
return true;
869+
}
870+
810871
/*
811872
* GetQueryResult
812873
*
813-
* Process the query result. Returns true if there's no error, false
814-
* otherwise -- but errors about trying to vacuum a missing relation are
815-
* reported and subsequently ignored.
874+
* Pump the conn till it's dry of results; return false if any are errors.
875+
* Note that this will block if the conn is busy.
816876
*/
817877
static bool
818878
GetQueryResult(PGconn *conn, const char *progname)
819879
{
880+
bool ok = true;
820881
PGresult *result;
821882

822883
SetCancelConn(conn);
823884
while ((result = PQgetResult(conn)) != NULL)
824885
{
825-
/*
826-
* If errors are found, report them. Errors about a missing table are
827-
* harmless so we continue processing; but die for other errors.
828-
*/
829-
if (PQresultStatus(result) != PGRES_COMMAND_OK)
830-
{
831-
char *sqlState = PQresultErrorField(result, PG_DIAG_SQLSTATE);
832-
833-
fprintf(stderr, _("%s: vacuuming of database \"%s\" failed: %s"),
834-
progname, PQdb(conn), PQerrorMessage(conn));
835-
836-
if (sqlState && strcmp(sqlState, ERRCODE_UNDEFINED_TABLE) != 0)
837-
{
838-
PQclear(result);
839-
return false;
840-
}
841-
}
842-
843-
PQclear(result);
886+
if (!ProcessQueryResult(conn, result, progname))
887+
ok = false;
844888
}
845889
ResetCancelConn();
846-
847-
return true;
890+
return ok;
848891
}
849892

850893
/*
@@ -936,18 +979,11 @@ select_loop(int maxFd, fd_set *workerset, bool *aborting)
936979
}
937980

938981
static void
939-
init_slot(ParallelSlot *slot, PGconn *conn, const char *progname)
982+
init_slot(ParallelSlot *slot, PGconn *conn)
940983
{
941984
slot->connection = conn;
985+
/* Initially assume connection is idle */
942986
slot->isFree = true;
943-
slot->sock = PQsocket(conn);
944-
945-
if (slot->sock < 0)
946-
{
947-
fprintf(stderr, _("%s: invalid socket: %s"), progname,
948-
PQerrorMessage(conn));
949-
exit(1);
950-
}
951987
}
952988

953989
static void

0 commit comments

Comments
 (0)