-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathpgsql.py
191 lines (157 loc) · 6.25 KB
/
pgsql.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
PostgreSQL accounts and databases for members and societies.
"""
from functools import wraps
from typing import Optional, List, Set, Tuple, Union
from psycopg2.extensions import connection as Connection, cursor as Cursor
from srcf.database import Member, Society
from srcf.database.queries import get_member, get_society
from ..email import send
from ..plumbing import pgsql
from ..plumbing.common import Collect, Owner, State, owner_name, Password, Result, Unset
def connect(db: Optional[str] = None) -> Connection:
"""
Connect to the PostgreSQL server using ident authentication.
"""
return pgsql.connect("postgres.internal", db or "sysadmins")
@wraps(pgsql.context)
def context(db: Optional[str] = None):
"""
Run multiple PostgreSQL commands in a single connection:
with context() as cursor:
create_account(cursor, owner)
create_database(cursor, owner)
"""
return pgsql.context(connect(db))
def get_owned_databases(cursor: Cursor, owner: Owner) -> List[str]:
"""
Find all PostgreSQL databases belonging to a given owner.
"""
try:
role = pgsql.get_role(cursor, owner_name(owner))
except KeyError:
return []
else:
return pgsql.get_role_databases(cursor, role)
@Result.collect_value
def new_account(cursor: Cursor, owner: Owner) -> Collect[Optional[Password]]:
"""
Create a PostgreSQL user account for a given member or society.
For members, grants are added to all society roles for which they are a member.
"""
username = owner_name(owner)
res_passwd = yield from pgsql.ensure_user(cursor, username)
if isinstance(owner, Member):
yield sync_member_roles(cursor, owner)
elif isinstance(owner, Society):
yield sync_society_roles(cursor, owner)
return res_passwd.value
def _sync_roles(cursor: Cursor, current: Set[Tuple[str, pgsql.Role]],
needed: Set[Tuple[str, pgsql.Role]]):
for username, role in needed - current:
yield pgsql.grant_role(cursor, username, role)
for username, role in current - needed:
yield pgsql.revoke_role(cursor, username, role)
@Result.collect
def sync_member_roles(cursor: Cursor, member: Member) -> Collect[None]:
"""
Adjust grants for society roles to match the given member's memberships.
"""
if not member.societies:
return
username = owner_name(member)
current: Set[Tuple[str, pgsql.Role]] = set()
for role in pgsql.get_user_roles(cursor, username):
# Filter active roles to those owned by society accounts.
if role[0] == member.crsid:
continue
try:
get_society(role[0])
except KeyError:
continue
else:
current.add((username, role))
roles = pgsql.get_roles(cursor, *(soc.society for soc in member.societies))
needed = set((username, role) for role in roles)
yield from _sync_roles(cursor, current, needed)
@Result.collect
def sync_society_roles(cursor: Cursor, society: Society) -> Collect[None]:
"""
Adjust grants for member roles to match the given society's admins.
"""
try:
role = pgsql.get_role(cursor, owner_name(society))
except KeyError:
return
current: Set[Tuple[str, pgsql.Role]] = set()
for username in pgsql.get_role_users(cursor, role):
# Filter active roles to those owned by member accounts.
try:
get_member(username)
except KeyError:
continue
else:
current.add((username, role))
needed = set((user[0], role) for user in pgsql.get_roles(cursor, *society.admin_crsids))
yield from _sync_roles(cursor, current, needed)
@Result.collect_value
def reset_password(cursor: Cursor, owner: Owner) -> Collect[Password]:
"""
Reset the password of a member's or society's PostgreSQL user account.
"""
res_passwd = yield from pgsql.reset_password(cursor, owner_name(owner))
yield send(owner, "tasks/pgsql_password.j2", {"username": owner_name(owner),
"password": res_passwd.value})
return res_passwd.value
def disable_account(cursor: Cursor, owner: Owner) -> Result[Unset]:
"""
Disable a PostgreSQL user account for a given member or society if one exists.
"""
try:
role = pgsql.get_role(cursor, owner_name(owner))
except KeyError:
return Result(State.unchanged)
if not role[1]:
return Result(State.unchanged)
return pgsql.disable_role(cursor, role)
def drop_account(cursor: Cursor, owner: Owner) -> Result[Unset]:
"""
Drop a PostgreSQL user account for a given member or society.
"""
return pgsql.drop_user(cursor, owner_name(owner))
@Result.collect_value
def create_database(cursor: Cursor, owner: Owner, name: Optional[str] = None) -> Collect[str]:
"""
Create a new PostgreSQL database for the owner, defaulting to one matching their username.
"""
role = pgsql.get_role(cursor, owner_name(owner))
name = name or role[0]
yield pgsql.create_database(cursor, name, role)
return name
@Result.collect_value
def drop_database(cursor: Cursor, target: Union[Owner, str]) -> Collect[str]:
"""
Drop the named, or owner-named, PostgreSQL database.
"""
name = target if isinstance(target, str) else owner_name(target)
yield pgsql.drop_database(cursor, name)
return name
@Result.collect
def drop_all_databases(cursor: Cursor, owner: Owner) -> Collect[None]:
"""
Drop all databases belonging to the owner.
"""
for database in get_owned_databases(cursor, owner):
yield pgsql.drop_database(cursor, database)
@Result.collect_value
def create_account(cursor: Cursor, owner: Owner) -> Collect[Tuple[Optional[Password], str]]:
"""
Create a PostgreSQL user account and initial database for a member or society.
"""
res_account = yield from new_account(cursor, owner)
res_db = yield from create_database(cursor, owner)
if res_account.state == State.created:
yield send(owner, "tasks/pgsql_create.j2", {"username": owner_name(owner),
"password": res_account.value,
"database": res_db.value})
return (res_account.value, res_db.value)