@@ -161,7 +161,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
161
161
char * error = NULL ;
162
162
int i ;
163
163
job_t * job ;
164
- int ret ;
164
+ spi_response_t * r ;
165
165
166
166
EE .n = 0 ;
167
167
EE .errors = NULL ;
@@ -230,27 +230,27 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
230
230
}
231
231
if (job -> type == AtJob && i == 0 && job -> sql_params_n > 0 )
232
232
{
233
- ret = execute_spi_params_prepared (job -> dosql [i ], job -> sql_params_n , job -> sql_params , & error );
233
+ r = execute_spi_params_prepared (job -> dosql [i ], job -> sql_params_n , job -> sql_params );
234
234
}
235
235
else
236
236
{
237
- ret = execute_spi (job -> dosql [i ], & error );
237
+ r = execute_spi (job -> dosql [i ]);
238
238
}
239
- if (ret < 0 )
239
+ if (r -> retval < 0 )
240
240
{
241
241
/* success = false; */
242
242
* status = SchdExecutorError ;
243
- if (error )
243
+ if (r -> error )
244
244
{
245
245
push_executor_error (& EE , "error in command #%d: %s" ,
246
- i + 1 , error );
247
- pfree (error );
246
+ i + 1 , r -> error );
248
247
}
249
248
else
250
249
{
251
250
push_executor_error (& EE , "error in command #%d: code: %d" ,
252
- i + 1 , ret );
251
+ i + 1 , r -> retval );
253
252
}
253
+ destroy_spi_data (r );
254
254
ABORT_SPI_SNAP ();
255
255
SetConfigOption ("schedule.transaction_state" , "failure" , PGC_INTERNAL , PGC_S_SESSION );
256
256
executor_onrollback (job , & EE );
@@ -264,6 +264,7 @@ int do_one_job(schd_executor_share_t *shared, schd_executor_status_t *status)
264
264
STOP_SPI_SNAP ();
265
265
}
266
266
}
267
+ destroy_spi_data (r );
267
268
}
268
269
if (* status != SchdExecutorError )
269
270
{
@@ -331,27 +332,36 @@ int set_session_authorization(char *username, char **error)
331
332
Oid useroid ;
332
333
Datum values [1 ];
333
334
bool is_superuser ;
334
- int ret ;
335
+ spi_response_t * r ;
336
+ int rv ;
335
337
char * sql = "select oid, rolsuper from pg_catalog.pg_roles where rolname = $1" ;
336
338
char buff [1024 ];
337
339
338
340
values [0 ] = CStringGetTextDatum (username );
339
341
START_SPI_SNAP ();
340
- ret = execute_spi_sql_with_args (sql , 1 , types , values , NULL , error );
342
+ r = execute_spi_sql_with_args (sql , 1 , types , values , NULL );
341
343
342
- if (ret < 0 ) return ret ;
343
- if (SPI_processed == 0 )
344
+ if (r -> retval < 0 )
345
+ {
346
+ rv = r -> retval ;
347
+ * error = _copy_string (r -> error );
348
+ destroy_spi_data (r );
349
+ return rv ;
350
+ }
351
+ if (r -> n_rows == 0 )
344
352
{
345
353
STOP_SPI_SNAP ();
346
354
sprintf (buff , "Cannot find user with name: %s" , username );
347
355
* error = _copy_string (buff );
356
+ destroy_spi_data (r );
348
357
349
358
return -200 ;
350
359
}
351
- useroid = get_oid_from_spi (0 , 1 , 0 );
352
- is_superuser = get_boolean_from_spi (0 , 2 , false);
360
+ useroid = get_oid_from_spi (r , 0 , 1 , 0 );
361
+ is_superuser = get_boolean_from_spi (r , 0 , 2 , false);
353
362
354
363
STOP_SPI_SNAP ();
364
+ destroy_spi_data (r );
355
365
356
366
SetSessionAuthorization (useroid , is_superuser );
357
367
@@ -399,42 +409,43 @@ void set_shared_message(schd_executor_share_t *shared, executor_error_t *ee)
399
409
400
410
TimestampTz get_next_excution_time (char * sql , executor_error_t * ee )
401
411
{
402
- char * error ;
403
- int ret ;
404
412
TimestampTz ts = 0 ;
405
413
Datum d ;
406
- bool isnull ;
414
+ spi_response_t * r ;
407
415
408
416
START_SPI_SNAP ();
409
417
pgstat_report_activity (STATE_RUNNING , "culc next time execution time" );
410
- ret = execute_spi (sql , & error );
411
- if (ret < 0 )
418
+ r = execute_spi (sql );
419
+ if (r -> retval < 0 )
412
420
{
413
- if (error )
421
+ if (r -> error )
414
422
{
415
- push_executor_error (ee , "next time error: %s" , error );
416
- pfree (error );
423
+ push_executor_error (ee , "next time error: %s" , r -> error );
417
424
}
418
425
else
419
426
{
420
- push_executor_error (ee , "next time error: code = %d" , ret );
427
+ push_executor_error (ee , "next time error: code = %d" , r -> retval );
421
428
}
429
+ destroy_spi_data (r );
422
430
ABORT_SPI_SNAP ();
423
431
return 0 ;
424
432
}
425
- if (SPI_processed == 0 )
433
+ if (r -> n_rows == 0 )
426
434
{
427
435
push_executor_error (ee , "next time statement returns 0 rows" );
428
436
}
429
- else if (SPI_gettypeid ( SPI_tuptable -> tupdesc , 1 ) != TIMESTAMPTZOID )
437
+ else if (r -> types [ 0 ] != TIMESTAMPTZOID )
430
438
{
431
439
push_executor_error (ee , "next time statement column 1 type is not timestamp with timezone" );
432
440
}
441
+ else if (r -> rows [0 ][0 ].null )
442
+ {
443
+ push_executor_error (ee , "next time statement column 1 is null" );
444
+ }
433
445
else
434
446
{
435
- d = SPI_getbinval (SPI_tuptable -> vals [0 ], SPI_tuptable -> tupdesc ,
436
- 1 , & isnull );
437
- if (isnull )
447
+ d = r -> rows [0 ][0 ].dat ;
448
+ if (!d )
438
449
{
439
450
push_executor_error (ee , "next time statement row 0 column 1 ihas NULL value" );
440
451
}
@@ -443,65 +454,67 @@ TimestampTz get_next_excution_time(char *sql, executor_error_t *ee)
443
454
ts = DatumGetTimestampTz (d );
444
455
}
445
456
}
457
+ destroy_spi_data (r );
446
458
447
459
STOP_SPI_SNAP ();
448
460
return ts ;
449
461
}
450
462
451
463
int executor_onrollback (job_t * job , executor_error_t * ee )
452
464
{
453
- char * error = NULL ;
454
- int ret ;
465
+ int rv ;
466
+ spi_response_t * r ;
455
467
456
468
if (!job -> onrollback ) return 0 ;
457
469
pgstat_report_activity (STATE_RUNNING , "execure onrollback" );
458
470
459
471
START_SPI_SNAP ();
460
- ret = execute_spi (job -> onrollback , & error );
461
- if (ret < 0 )
472
+ r = execute_spi (job -> onrollback );
473
+ if (r -> retval < 0 )
462
474
{
463
- if (error )
475
+ if (r -> error )
464
476
{
465
- push_executor_error (ee , "onrollback error: %s" , error );
466
- pfree (error );
477
+ push_executor_error (ee , "onrollback error: %s" , r -> error );
467
478
}
468
479
else
469
480
{
470
- push_executor_error (ee , "onrollback error: unknown: %d" , ret );
481
+ push_executor_error (ee , "onrollback error: unknown: %d" , r -> retval );
471
482
}
472
483
ABORT_SPI_SNAP ();
473
484
}
474
485
else
475
486
{
476
487
STOP_SPI_SNAP ();
477
488
}
478
- return ret ;
489
+ rv = r -> retval ;
490
+ destroy_spi_data (r );
491
+ return rv ;
479
492
}
480
493
481
494
void set_pg_var (bool result , executor_error_t * ee )
482
495
{
483
496
char * sql = "select pgv_set_text('pgpro_scheduler', 'transaction', $1)" ;
484
497
Oid argtypes [1 ] = { TEXTOID };
485
498
Datum vals [1 ];
486
- char * error = NULL ;
487
- int ret ;
499
+ spi_response_t * r ;
488
500
489
501
pgstat_report_activity (STATE_RUNNING , "set pg_valiable" );
490
502
491
503
vals [0 ] = PointerGetDatum (cstring_to_text (result ? "success" : "failure" ));
492
504
493
- ret = execute_spi_sql_with_args (sql , 1 , argtypes , vals , NULL , & error );
494
- if (ret < 0 )
505
+ r = execute_spi_sql_with_args (sql , 1 , argtypes , vals , NULL );
506
+ if (r -> retval < 0 )
495
507
{
496
- if (error )
508
+ if (r -> error )
497
509
{
498
- push_executor_error (ee , "set variable: %s" , error );
510
+ push_executor_error (ee , "set variable: %s" , r -> error );
499
511
}
500
512
else
501
513
{
502
- push_executor_error (ee , "set variable error code: %d" , ret );
514
+ push_executor_error (ee , "set variable error code: %d" , r -> retval );
503
515
}
504
516
}
517
+ destroy_spi_data (r );
505
518
}
506
519
507
520
job_t * initializeExecutorJob (schd_executor_share_t * data )
@@ -696,8 +709,9 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
696
709
char * error = NULL ;
697
710
char * set_error = NULL ;
698
711
job_t * job ;
699
- int ret , set_ret ;
712
+ int set_ret ;
700
713
char buff [512 ];
714
+ spi_response_t * r ;
701
715
702
716
* status = shared -> status = SchdExecutorWork ;
703
717
@@ -728,7 +742,6 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
728
742
return -1;
729
743
} */
730
744
STOP_SPI_SNAP (); /* Commit changes */
731
- elog (LOG , "JOB MOVED TO PROCESSED" );
732
745
pgstat_report_activity (STATE_RUNNING , "job initialized" );
733
746
START_SPI_SNAP ();
734
747
@@ -767,11 +780,11 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
767
780
768
781
if (job -> sql_params_n > 0 )
769
782
{
770
- ret = execute_spi_params_prepared (job -> dosql [0 ], job -> sql_params_n , job -> sql_params , & error );
783
+ r = execute_spi_params_prepared (job -> dosql [0 ], job -> sql_params_n , job -> sql_params );
771
784
}
772
785
else
773
786
{
774
- ret = execute_spi (job -> dosql [0 ], & error );
787
+ r = execute_spi (job -> dosql [0 ]);
775
788
}
776
789
if (job -> timelimit )
777
790
{
@@ -780,23 +793,25 @@ int process_one_job(schd_executor_share_state_t *shared, schd_executor_status_t
780
793
ResetAllOptions ();
781
794
SetConfigOption ("enable_seqscan" , "off" , PGC_USERSET , PGC_S_SESSION );
782
795
SetSessionAuthorization (BOOTSTRAP_SUPERUSERID , true);
783
- if (ret < 0 )
796
+ if (r -> retval < 0 )
784
797
{
785
- if (error )
798
+ if (r -> error )
786
799
{
787
- set_ret = set_at_job_done (job , error , resubmit_current_job , & set_error );
788
- pfree ( error );
800
+ set_ret = set_at_job_done (job , r -> error , resubmit_current_job ,
801
+ & set_error );
789
802
}
790
803
else
791
804
{
792
- sprintf (buff , "error in command: code: %d" , ret );
793
- set_ret = set_at_job_done (job , buff , resubmit_current_job , & set_error );
805
+ sprintf (buff , "error in command: code: %d" , r -> retval );
806
+ set_ret = set_at_job_done (job , buff , resubmit_current_job ,
807
+ & set_error );
794
808
}
795
809
}
796
810
else
797
811
{
798
812
set_ret = set_at_job_done (job , NULL , resubmit_current_job , & set_error );
799
813
}
814
+ destroy_spi_data (r );
800
815
801
816
resubmit_current_job = 0 ;
802
817
current_job_id = -1 ;
0 commit comments