Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
Skip to content

Commit 0d62e0e

Browse files
author
v.shepard
committed
PBCKP-137 update node.py
1 parent 1512afd commit 0d62e0e

File tree

1 file changed

+67
-96
lines changed

1 file changed

+67
-96
lines changed

testgres/node.py

Lines changed: 67 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import subprocess
1313
import time
1414

15+
1516
try:
1617
from collections.abc import Iterable
1718
except ImportError:
@@ -104,6 +105,7 @@
104105
InternalError = pglib.InternalError
105106
ProgrammingError = pglib.ProgrammingError
106107
OperationalError = pglib.OperationalError
108+
DatabaseError = pglib.DatabaseError
107109

108110

109111
class ProcessProxy(object):
@@ -651,13 +653,15 @@ def get_control_data(self):
651653

652654
return out_dict
653655

654-
def slow_start(self, replica=False):
656+
def slow_start(self, replica=False, dbname='template1', username='dev'):
655657
"""
656658
Starts the PostgreSQL instance and then polls the instance
657659
until it reaches the expected state (primary or replica). The state is checked
658660
using the pg_is_in_recovery() function.
659661
660662
Args:
663+
dbname:
664+
username:
661665
replica: If True, waits for the instance to be in recovery (i.e., replica mode).
662666
If False, waits for the instance to be in primary mode. Default is False.
663667
"""
@@ -668,14 +672,15 @@ def slow_start(self, replica=False):
668672
else:
669673
query = 'SELECT not pg_is_in_recovery()'
670674
# Call poll_query_until until the expected value is returned
671-
self.poll_query_until(
672-
dbname="template1",
673-
query=query,
674-
suppress={pglib.InternalError,
675-
QueryException,
676-
pglib.ProgrammingError,
677-
pglib.OperationalError})
678-
675+
self.poll_query_until(query=query,
676+
expected=False,
677+
dbname=dbname,
678+
username=username,
679+
suppress={InternalError,
680+
QueryException,
681+
ProgrammingError,
682+
OperationalError,
683+
DatabaseError})
679684

680685
def start(self, params=[], wait=True):
681686
"""
@@ -1432,96 +1437,66 @@ def connect(self,
14321437
autocommit=autocommit) # yapf: disable
14331438

14341439
def table_checksum(self, table, dbname="postgres"):
1435-
"""
1436-
Calculate the checksum of a table by hashing its rows.
1437-
1438-
The function fetches rows from the table in chunks and calculates the checksum
1439-
by summing the hash values of each row. The function uses a separate thread
1440-
to fetch rows when there are more than 2000 rows in the table.
1441-
1442-
Args:
1443-
table (str): The name of the table for which the checksum should be calculated.
1444-
dbname (str, optional): The name of the database where the table is located. Defaults to "postgres".
1445-
1446-
Returns:
1447-
int: The calculated checksum of the table.
1448-
"""
1449-
1450-
def fetch_rows(con, cursor_name):
1451-
while True:
1452-
rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}")
1453-
if not rows:
1454-
break
1455-
yield rows
1456-
1457-
def process_rows(queue, con, cursor_name):
1458-
try:
1459-
for rows in fetch_rows(con, cursor_name):
1460-
queue.put(rows)
1461-
except Exception as e:
1462-
queue.put(e)
1463-
else:
1464-
queue.put(None)
1465-
1466-
cursor_name = f"cur_{random.randint(0, 2 ** 48)}"
1467-
checksum = 0
1468-
query_thread = None
1469-
1470-
with self.connect(dbname=dbname) as con:
1471-
con.execute(f"""
1472-
DECLARE {cursor_name} NO SCROLL CURSOR FOR
1473-
SELECT t::text FROM {table} as t
1474-
""")
1475-
1476-
queue = Queue(maxsize=50)
1477-
initial_rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}")
1478-
1479-
if not initial_rows:
1480-
return 0
1481-
1482-
queue.put(initial_rows)
1483-
1484-
if len(initial_rows) == 2000:
1485-
query_thread = threading.Thread(target=process_rows, args=(queue, con, cursor_name))
1486-
query_thread.start()
1487-
else:
1488-
queue.put(None)
1440+
con = self.connect(dbname=dbname)
1441+
1442+
curname = "cur_" + str(random.randint(0, 2 ** 48))
1443+
1444+
con.execute("""
1445+
DECLARE %s NO SCROLL CURSOR FOR
1446+
SELECT t::text FROM %s as t
1447+
""" % (curname, table))
1448+
1449+
que = Queue(maxsize=50)
1450+
sum = 0
1451+
1452+
rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
1453+
if not rows:
1454+
return 0
1455+
que.put(rows)
1456+
1457+
th = None
1458+
if len(rows) == 2000:
1459+
def querier():
1460+
try:
1461+
while True:
1462+
rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname)
1463+
if not rows:
1464+
break
1465+
que.put(rows)
1466+
except Exception as e:
1467+
que.put(e)
1468+
else:
1469+
que.put(None)
14891470

1490-
while True:
1491-
rows = queue.get()
1492-
if rows is None:
1493-
break
1494-
if isinstance(rows, Exception):
1495-
raise rows
1471+
th = threading.Thread(target=querier)
1472+
th.start()
1473+
else:
1474+
que.put(None)
14961475

1497-
for row in rows:
1498-
checksum += hash(row[0])
1476+
while True:
1477+
rows = que.get()
1478+
if rows is None:
1479+
break
1480+
if isinstance(rows, Exception):
1481+
raise rows
1482+
# hash uses SipHash since Python3.4, therefore it is good enough
1483+
for row in rows:
1484+
sum += hash(row[0])
14991485

1500-
if query_thread is not None:
1501-
query_thread.join()
1486+
if th is not None:
1487+
th.join()
15021488

1503-
con.execute(f"CLOSE {cursor_name}; ROLLBACK;")
1489+
con.execute("CLOSE %s; ROLLBACK;" % curname)
15041490

1505-
return checksum
1491+
con.close()
1492+
return sum
15061493

15071494
def pgbench_table_checksums(self, dbname="postgres",
1508-
pgbench_tables=('pgbench_branches',
1509-
'pgbench_tellers',
1510-
'pgbench_accounts',
1511-
'pgbench_history')
1495+
pgbench_tables = ('pgbench_branches',
1496+
'pgbench_tellers',
1497+
'pgbench_accounts',
1498+
'pgbench_history')
15121499
):
1513-
"""
1514-
Calculate the checksums of the specified pgbench tables using table_checksum method.
1515-
1516-
Args:
1517-
dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres".
1518-
pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the
1519-
checksums should be calculated. Defaults to a tuple containing the
1520-
names of the default pgbench tables.
1521-
1522-
Returns:
1523-
set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum.
1524-
"""
15251500
return {(table, self.table_checksum(table, dbname))
15261501
for table in pgbench_tables}
15271502

@@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}):
15891564

15901565

15911566
class NodeApp:
1592-
"""
1593-
Functions that can be moved to testgres.PostgresNode
1594-
We use these functions in ProbackupController and need tp move them in some visible place
1595-
"""
15961567

15971568
def __init__(self, test_path, nodes_to_cleanup):
15981569
self.test_path = test_path
@@ -1605,7 +1576,7 @@ def make_empty(
16051576
shutil.rmtree(real_base_dir, ignore_errors=True)
16061577
os.makedirs(real_base_dir)
16071578

1608-
node = PostgresNodeExtended(base_dir=real_base_dir)
1579+
node = PostgresNode(base_dir=real_base_dir)
16091580
node.should_rm_dirs = True
16101581
self.nodes_to_cleanup.append(node)
16111582

0 commit comments

Comments
 (0)