@@ -719,7 +719,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
719
719
int encoding = -1 ;
720
720
bool dbistemplate = false;
721
721
bool dballowconnections = true;
722
- int dbconnlimit = -1 ;
722
+ int dbconnlimit = DATCONNLIMIT_UNLIMITED ;
723
723
char * dbcollversion = NULL ;
724
724
int notherbackends ;
725
725
int npreparedxacts ;
@@ -926,7 +926,7 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
926
926
if (dconnlimit && dconnlimit -> arg )
927
927
{
928
928
dbconnlimit = defGetInt32 (dconnlimit );
929
- if (dbconnlimit < -1 )
929
+ if (dbconnlimit < DATCONNLIMIT_UNLIMITED )
930
930
ereport (ERROR ,
931
931
(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
932
932
errmsg ("invalid connection limit: %d" , dbconnlimit )));
@@ -977,6 +977,16 @@ createdb(ParseState *pstate, const CreatedbStmt *stmt)
977
977
errmsg ("template database \"%s\" does not exist" ,
978
978
dbtemplate )));
979
979
980
+ /*
981
+ * If the source database was in the process of being dropped, we can't
982
+ * use it as a template.
983
+ */
984
+ if (database_is_invalid_oid (src_dboid ))
985
+ ereport (ERROR ,
986
+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
987
+ errmsg ("cannot use invalid database \"%s\" as template" , dbtemplate ),
988
+ errhint ("Use DROP DATABASE to drop invalid databases." ));
989
+
980
990
/*
981
991
* Permission check: to copy a DB that's not marked datistemplate, you
982
992
* must be superuser or the owner thereof.
@@ -1576,6 +1586,7 @@ dropdb(const char *dbname, bool missing_ok, bool force)
1576
1586
bool db_istemplate ;
1577
1587
Relation pgdbrel ;
1578
1588
HeapTuple tup ;
1589
+ Form_pg_database datform ;
1579
1590
int notherbackends ;
1580
1591
int npreparedxacts ;
1581
1592
int nslots ,
@@ -1691,17 +1702,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
1691
1702
dbname ),
1692
1703
errdetail_busy_db (notherbackends , npreparedxacts )));
1693
1704
1694
- /*
1695
- * Remove the database's tuple from pg_database.
1696
- */
1697
- tup = SearchSysCache1 (DATABASEOID , ObjectIdGetDatum (db_id ));
1698
- if (!HeapTupleIsValid (tup ))
1699
- elog (ERROR , "cache lookup failed for database %u" , db_id );
1700
-
1701
- CatalogTupleDelete (pgdbrel , & tup -> t_self );
1702
-
1703
- ReleaseSysCache (tup );
1704
-
1705
1705
/*
1706
1706
* Delete any comments or security labels associated with the database.
1707
1707
*/
@@ -1718,6 +1718,37 @@ dropdb(const char *dbname, bool missing_ok, bool force)
1718
1718
*/
1719
1719
dropDatabaseDependencies (db_id );
1720
1720
1721
+ /*
1722
+ * Tell the cumulative stats system to forget it immediately, too.
1723
+ */
1724
+ pgstat_drop_database (db_id );
1725
+
1726
+ tup = SearchSysCacheCopy1 (DATABASEOID , ObjectIdGetDatum (db_id ));
1727
+ if (!HeapTupleIsValid (tup ))
1728
+ elog (ERROR , "cache lookup failed for database %u" , db_id );
1729
+ datform = (Form_pg_database ) GETSTRUCT (tup );
1730
+
1731
+ /*
1732
+ * Except for the deletion of the catalog row, subsequent actions are not
1733
+ * transactional (consider DropDatabaseBuffers() discarding modified
1734
+ * buffers). But we might crash or get interrupted below. To prevent
1735
+ * accesses to a database with invalid contents, mark the database as
1736
+ * invalid using an in-place update.
1737
+ *
1738
+ * We need to flush the WAL before continuing, to guarantee the
1739
+ * modification is durable before performing irreversible filesystem
1740
+ * operations.
1741
+ */
1742
+ datform -> datconnlimit = DATCONNLIMIT_INVALID_DB ;
1743
+ heap_inplace_update (pgdbrel , tup );
1744
+ XLogFlush (XactLastRecEnd );
1745
+
1746
+ /*
1747
+ * Also delete the tuple - transactionally. If this transaction commits,
1748
+ * the row will be gone, but if we fail, dropdb() can be invoked again.
1749
+ */
1750
+ CatalogTupleDelete (pgdbrel , & tup -> t_self );
1751
+
1721
1752
/*
1722
1753
* Drop db-specific replication slots.
1723
1754
*/
@@ -1730,11 +1761,6 @@ dropdb(const char *dbname, bool missing_ok, bool force)
1730
1761
*/
1731
1762
DropDatabaseBuffers (db_id );
1732
1763
1733
- /*
1734
- * Tell the cumulative stats system to forget it immediately, too.
1735
- */
1736
- pgstat_drop_database (db_id );
1737
-
1738
1764
/*
1739
1765
* Tell checkpointer to forget any pending fsync and unlink requests for
1740
1766
* files in the database; else the fsyncs will fail at next checkpoint, or
@@ -2248,7 +2274,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2248
2274
ListCell * option ;
2249
2275
bool dbistemplate = false;
2250
2276
bool dballowconnections = true;
2251
- int dbconnlimit = -1 ;
2277
+ int dbconnlimit = DATCONNLIMIT_UNLIMITED ;
2252
2278
DefElem * distemplate = NULL ;
2253
2279
DefElem * dallowconnections = NULL ;
2254
2280
DefElem * dconnlimit = NULL ;
@@ -2319,7 +2345,7 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2319
2345
if (dconnlimit && dconnlimit -> arg )
2320
2346
{
2321
2347
dbconnlimit = defGetInt32 (dconnlimit );
2322
- if (dbconnlimit < -1 )
2348
+ if (dbconnlimit < DATCONNLIMIT_UNLIMITED )
2323
2349
ereport (ERROR ,
2324
2350
(errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
2325
2351
errmsg ("invalid connection limit: %d" , dbconnlimit )));
@@ -2346,6 +2372,14 @@ AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel)
2346
2372
datform = (Form_pg_database ) GETSTRUCT (tuple );
2347
2373
dboid = datform -> oid ;
2348
2374
2375
+ if (database_is_invalid_form (datform ))
2376
+ {
2377
+ ereport (FATAL ,
2378
+ errcode (ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE ),
2379
+ errmsg ("cannot alter invalid database \"%s\"" , stmt -> dbname ),
2380
+ errhint ("Use DROP DATABASE to drop invalid databases." ));
2381
+ }
2382
+
2349
2383
if (!object_ownercheck (DatabaseRelationId , dboid , GetUserId ()))
2350
2384
aclcheck_error (ACLCHECK_NOT_OWNER , OBJECT_DATABASE ,
2351
2385
stmt -> dbname );
@@ -3064,6 +3098,42 @@ get_database_name(Oid dbid)
3064
3098
return result ;
3065
3099
}
3066
3100
3101
+
3102
+ /*
3103
+ * While dropping a database the pg_database row is marked invalid, but the
3104
+ * catalog contents still exist. Connections to such a database are not
3105
+ * allowed.
3106
+ */
3107
+ bool
3108
+ database_is_invalid_form (Form_pg_database datform )
3109
+ {
3110
+ return datform -> datconnlimit == DATCONNLIMIT_INVALID_DB ;
3111
+ }
3112
+
3113
+
3114
+ /*
3115
+ * Convenience wrapper around database_is_invalid_form()
3116
+ */
3117
+ bool
3118
+ database_is_invalid_oid (Oid dboid )
3119
+ {
3120
+ HeapTuple dbtup ;
3121
+ Form_pg_database dbform ;
3122
+ bool invalid ;
3123
+
3124
+ dbtup = SearchSysCache1 (DATABASEOID , ObjectIdGetDatum (dboid ));
3125
+ if (!HeapTupleIsValid (dbtup ))
3126
+ elog (ERROR , "cache lookup failed for database %u" , dboid );
3127
+ dbform = (Form_pg_database ) GETSTRUCT (dbtup );
3128
+
3129
+ invalid = database_is_invalid_form (dbform );
3130
+
3131
+ ReleaseSysCache (dbtup );
3132
+
3133
+ return invalid ;
3134
+ }
3135
+
3136
+
3067
3137
/*
3068
3138
* recovery_create_dbdir()
3069
3139
*
0 commit comments