diff options
author | Bruce Momjian | 2013-01-09 13:57:47 +0000 |
---|---|---|
committer | Bruce Momjian | 2013-01-09 13:57:47 +0000 |
commit | a89c46f9bc314ed549245d888da09b8c5cace104 (patch) | |
tree | 7b6ed8dc5ea0bbd6426c8e5452a24ce5f5bd5a77 /contrib/pg_upgrade/parallel.c | |
parent | c00dc337b8752ec959e27bfdc58e13f3d305154a (diff) |
Allow parallel copy/link in pg_upgrade
This patch implements parallel copying/linking of files by tablespace
using the --jobs option in pg_upgrade.
Diffstat (limited to 'contrib/pg_upgrade/parallel.c')
-rw-r--r-- | contrib/pg_upgrade/parallel.c | 147 |
1 files changed, 134 insertions, 13 deletions
diff --git a/contrib/pg_upgrade/parallel.c b/contrib/pg_upgrade/parallel.c index 8ea36bc6b96..d157511781e 100644 --- a/contrib/pg_upgrade/parallel.c +++ b/contrib/pg_upgrade/parallel.c @@ -34,11 +34,24 @@ typedef struct { char log_file[MAXPGPATH]; char opt_log_file[MAXPGPATH]; char cmd[MAX_STRING]; -} thread_arg; +} exec_thread_arg; -thread_arg **thread_args; +typedef struct { + DbInfoArr *old_db_arr; + DbInfoArr *new_db_arr; + char old_pgdata[MAXPGPATH]; + char new_pgdata[MAXPGPATH]; + char old_tablespace[MAXPGPATH]; +} transfer_thread_arg; + +exec_thread_arg **exec_thread_args; +transfer_thread_arg **transfer_thread_args; + +/* track current thread_args struct so reap_child() can be used for all cases */ +void **cur_thread_args; -DWORD win32_exec_prog(thread_arg *args); +DWORD win32_exec_prog(exec_thread_arg *args); +DWORD win32_transfer_all_new_dbs(transfer_thread_arg *args); #endif @@ -58,7 +71,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, pid_t child; #else HANDLE child; - thread_arg *new_arg; + exec_thread_arg *new_arg; #endif va_start(args, fmt); @@ -71,7 +84,9 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, else { /* parallel */ - +#ifdef WIN32 + cur_thread_args = (void **)exec_thread_args; +#endif /* harvest any dead children */ while (reap_child(false) == true) ; @@ -100,7 +115,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, int i; thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); - thread_args = pg_malloc(user_opts.jobs * sizeof(thread_arg *)); + exec_thread_args = pg_malloc(user_opts.jobs * sizeof(exec_thread_arg *)); /* * For safety and performance, we keep the args allocated during @@ -108,11 +123,11 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, * in a thread different from the one that allocated it. */ for (i = 0; i < user_opts.jobs; i++) - thread_args[i] = pg_malloc(sizeof(thread_arg)); + exec_thread_args[i] = pg_malloc(sizeof(exec_thread_arg)); } /* use first empty array element */ - new_arg = thread_args[parallel_jobs-1]; + new_arg = exec_thread_args[parallel_jobs-1]; /* Can only pass one pointer into the function, so use a struct */ strcpy(new_arg->log_file, log_file); @@ -134,7 +149,7 @@ parallel_exec_prog(const char *log_file, const char *opt_log_file, #ifdef WIN32 DWORD -win32_exec_prog(thread_arg *args) +win32_exec_prog(exec_thread_arg *args) { int ret; @@ -147,6 +162,112 @@ win32_exec_prog(thread_arg *args) /* + * parallel_transfer_all_new_dbs + * + * This has the same API as transfer_all_new_dbs, except it does parallel execution + * by transfering multiple tablespaces in parallel + */ +void parallel_transfer_all_new_dbs(DbInfoArr *old_db_arr, DbInfoArr *new_db_arr, + char *old_pgdata, char *new_pgdata, + char *old_tablespace) +{ +#ifndef WIN32 + pid_t child; +#else + HANDLE child; + transfer_thread_arg *new_arg; +#endif + + if (user_opts.jobs <= 1) + /* throw_error must be true to allow jobs */ + transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, NULL); + else + { + /* parallel */ +#ifdef WIN32 + cur_thread_args = (void **)transfer_thread_args; +#endif + /* harvest any dead children */ + while (reap_child(false) == true) + ; + + /* must we wait for a dead child? */ + if (parallel_jobs >= user_opts.jobs) + reap_child(true); + + /* set this before we start the job */ + parallel_jobs++; + + /* Ensure stdio state is quiesced before forking */ + fflush(NULL); + +#ifndef WIN32 + child = fork(); + if (child == 0) + { + transfer_all_new_dbs(old_db_arr, new_db_arr, old_pgdata, new_pgdata, + old_tablespace); + /* if we take another exit path, it will be non-zero */ + /* use _exit to skip atexit() functions */ + _exit(0); + } + else if (child < 0) + /* fork failed */ + pg_log(PG_FATAL, "could not create worker process: %s\n", strerror(errno)); +#else + if (thread_handles == NULL) + { + int i; + + thread_handles = pg_malloc(user_opts.jobs * sizeof(HANDLE)); + transfer_thread_args = pg_malloc(user_opts.jobs * sizeof(transfer_thread_arg *)); + + /* + * For safety and performance, we keep the args allocated during + * the entire life of the process, and we don't free the args + * in a thread different from the one that allocated it. + */ + for (i = 0; i < user_opts.jobs; i++) + transfer_thread_args[i] = pg_malloc(sizeof(transfer_thread_arg)); + } + + /* use first empty array element */ + new_arg = transfer_thread_args[parallel_jobs-1]; + + /* Can only pass one pointer into the function, so use a struct */ + new_arg->old_db_arr = old_db_arr; + new_arg->new_db_arr = new_db_arr; + strcpy(new_arg->old_pgdata, old_pgdata); + strcpy(new_arg->new_pgdata, new_pgdata); + strcpy(new_arg->old_tablespace, old_tablespace); + + child = (HANDLE) _beginthreadex(NULL, 0, (void *) win32_exec_prog, + new_arg, 0, NULL); + if (child == 0) + pg_log(PG_FATAL, "could not create worker thread: %s\n", strerror(errno)); + + thread_handles[parallel_jobs-1] = child; +#endif + } + + return; +} + + +#ifdef WIN32 +DWORD +win32_transfer_all_new_dbs(transfer_thread_arg *args) +{ + transfer_all_new_dbs(args->old_db_arr, args->new_db_arr, args->old_pgdata, + args->new_pgdata, args->old_tablespace); + + /* terminates thread */ + return 0; +} +#endif + + +/* * collect status from a completed worker child */ bool @@ -195,7 +316,7 @@ reap_child(bool wait_for_child) /* Move last slot into dead child's position */ if (thread_num != parallel_jobs - 1) { - thread_arg *tmp_args; + void *tmp_args; thread_handles[thread_num] = thread_handles[parallel_jobs - 1]; @@ -205,9 +326,9 @@ reap_child(bool wait_for_child) * reused by the next created thread. Instead, the new thread * will use the arg struct of the thread that just died. */ - tmp_args = thread_args[thread_num]; - thread_args[thread_num] = thread_args[parallel_jobs - 1]; - thread_args[parallel_jobs - 1] = tmp_args; + tmp_args = cur_thread_args[thread_num]; + cur_thread_args[thread_num] = cur_thread_args[parallel_jobs - 1]; + cur_thread_args[parallel_jobs - 1] = tmp_args; } #endif |