@@ -98,19 +98,24 @@ int checkSchedulerNamespace(void)
98
98
int get_scheduler_maxworkers (void )
99
99
{
100
100
const char * opt ;
101
+ int var ;
101
102
102
- opt = GetConfigOption ("scheduler.max_workers" , true, false);
103
+ opt = GetConfigOption ("schedule.max_workers" , true, false);
104
+ /* opt = GetConfigOptionByName("schedule.max_workers", NULL); */
103
105
if (opt == NULL )
104
106
{
105
107
return 2 ;
106
108
}
107
- return atoi (opt );
109
+
110
+ var = atoi (opt );
111
+ /* pfree(opt); */
112
+ return var ;
108
113
}
109
114
110
115
char * get_scheduler_nodename (void )
111
116
{
112
117
const char * opt ;
113
- opt = GetConfigOption ("scheduler .nodename" , true, false);
118
+ opt = GetConfigOption ("schedule .nodename" , true, false);
114
119
115
120
return _copy_string ((char * )(opt == NULL || strlen (opt ) == 0 ? "master" : opt ));
116
121
}
@@ -140,13 +145,74 @@ scheduler_manager_ctx_t *initialize_scheduler_manager_context(char *dbname, dsm_
140
145
return ctx ;
141
146
}
142
147
143
- void refresh_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
148
+ int refresh_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
144
149
{
145
- /* TODO set new nodename , if changed kill all kids workers, change
146
- max-workers resize slots
147
- if less then was and all slots are buisy [ ??? ]
148
- kill youngest (?)
149
- */
150
+ int rc = 0 ;
151
+ int N , i , busy ;
152
+ scheduler_manager_slot_t * * old ;
153
+
154
+ N = get_scheduler_maxworkers ();
155
+ if (N != ctx -> slots_len )
156
+ {
157
+ elog (LOG , "Change available workers number %d => %d" , ctx -> slots_len , N );
158
+ }
159
+
160
+ if (N > ctx -> slots_len )
161
+ {
162
+ pgstat_report_activity (STATE_RUNNING , "extend the number of workers" );
163
+
164
+ old = ctx -> slots ;
165
+ ctx -> slots = worker_alloc (sizeof (scheduler_manager_slot_t * ) * N );
166
+ for (i = 0 ; i < N ; i ++ )
167
+ {
168
+ ctx -> slots [i ] = NULL ;
169
+ }
170
+ for (i = 0 ; i < ctx -> slots_len ; i ++ )
171
+ {
172
+ ctx -> slots [i ] = old [i ];
173
+ }
174
+ pfree (old );
175
+ ctx -> free_slots += (N - ctx -> slots_len );
176
+ ctx -> slots_len = N ;
177
+ }
178
+ else if (N < ctx -> slots_len )
179
+ {
180
+ pgstat_report_activity (STATE_RUNNING , "shrink the number of workers" );
181
+ busy = ctx -> slots_len - ctx -> free_slots ;
182
+ if (N >= busy )
183
+ {
184
+ ctx -> slots = repalloc (ctx -> slots , sizeof (scheduler_manager_slot_t * ) * N );
185
+ ctx -> slots_len = N ;
186
+ ctx -> free_slots = N - busy ;
187
+ }
188
+ else
189
+ {
190
+ pgstat_report_activity (STATE_RUNNING , "wait for some workers free slots" );
191
+ while (!got_sigterm )
192
+ {
193
+ CHECK_FOR_INTERRUPTS ();
194
+ scheduler_check_slots (ctx );
195
+ busy = ctx -> slots_len - ctx -> free_slots ;
196
+ if (N >= busy )
197
+ {
198
+ ctx -> slots = repalloc (ctx -> slots , sizeof (scheduler_manager_slot_t * ) * N );
199
+ ctx -> slots_len = N ;
200
+ ctx -> free_slots = N - busy ;
201
+ break ;
202
+ }
203
+ if (rc )
204
+ {
205
+ if (rc & WL_POSTMASTER_DEATH ) proc_exit (1 );
206
+ if (got_sigterm || got_sighup ) return 0 ;
207
+ }
208
+ rc = WaitLatch (MyLatch ,
209
+ WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH , 500L );
210
+ ResetLatch (MyLatch );
211
+ }
212
+ }
213
+ }
214
+
215
+ return 1 ;
150
216
}
151
217
152
218
void destroy_scheduler_manager_context (scheduler_manager_ctx_t * ctx )
@@ -218,7 +284,6 @@ scheduler_task_t *scheduler_get_active_tasks(scheduler_manager_ctx_t *ctx, int *
218
284
tupdesc = SPI_tuptable -> tupdesc ;
219
285
220
286
tasks = worker_alloc (sizeof (scheduler_task_t ) * processed );
221
- elog (LOG , "Found %d tasks" , processed );
222
287
223
288
for (i = 0 ; i < processed ; i ++ )
224
289
{
@@ -496,7 +561,6 @@ TimestampTz *scheduler_calc_next_task_time(scheduler_task_t *task, TimestampTz s
496
561
curr += SECS_PER_MINUTE ;
497
562
#endif
498
563
}
499
- elog (LOG , "made: %d" , * ntimes );
500
564
for (i = 0 ; i < 5 ; i ++ ) destroy_bit_array (& cron [i ], 0 );
501
565
if (* ntimes == 0 )
502
566
{
@@ -1144,6 +1208,14 @@ void clean_at_table(scheduler_manager_ctx_t *ctx)
1144
1208
STOP_SPI_SNAP ();
1145
1209
}
1146
1210
1211
+ void set_slots_stat_report (scheduler_manager_ctx_t * ctx )
1212
+ {
1213
+ char state [128 ];
1214
+ snprintf (state , 128 , "slots busy: %d, free: %d" ,
1215
+ ctx -> slots_len - ctx -> free_slots , ctx -> free_slots );
1216
+ pgstat_report_activity (STATE_RUNNING , state );
1217
+ }
1218
+
1147
1219
void manager_worker_main (Datum arg )
1148
1220
{
1149
1221
char * database ;
@@ -1196,7 +1268,6 @@ void manager_worker_main(Datum arg)
1196
1268
dsm_detach (seg );
1197
1269
proc_exit (0 );
1198
1270
}
1199
- elog (LOG , "ON" );
1200
1271
SetCurrentStatementStartTimestamp ();
1201
1272
pgstat_report_activity (STATE_RUNNING , "initialize." );
1202
1273
@@ -1209,7 +1280,8 @@ void manager_worker_main(Datum arg)
1209
1280
init_worker_mem_ctx ("WorkerMemoryContext" );
1210
1281
ctx = initialize_scheduler_manager_context (database , seg );
1211
1282
clean_at_table (ctx );
1212
- elog (LOG , "Start main loop" );
1283
+ set_slots_stat_report (ctx );
1284
+
1213
1285
while (!got_sigterm )
1214
1286
{
1215
1287
if (rc )
@@ -1220,20 +1292,22 @@ void manager_worker_main(Datum arg)
1220
1292
got_sighup = false;
1221
1293
ProcessConfigFile (PGC_SIGHUP );
1222
1294
refresh_scheduler_manager_context (ctx );
1295
+ set_slots_stat_report (ctx );
1223
1296
}
1224
1297
if (!got_sighup && !got_sigterm )
1225
1298
{
1226
1299
if (rc & WL_LATCH_SET )
1227
1300
{
1228
1301
scheduler_check_slots (ctx );
1302
+ set_slots_stat_report (ctx );
1229
1303
}
1230
1304
else if (rc & WL_TIMEOUT )
1231
1305
{
1232
1306
scheduler_make_at_record (ctx );
1233
1307
scheduler_vanish_expired_jobs (ctx );
1234
1308
scheduler_start_jobs (ctx );
1235
1309
scheduler_check_slots (ctx );
1236
- pgstat_report_activity ( STATE_IDLE , "" );
1310
+ set_slots_stat_report ( ctx );
1237
1311
}
1238
1312
}
1239
1313
}
0 commit comments