From 02c337551133888dfbde2ed0b2fba7cc0c65429e Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Mon, 10 Apr 2023 23:03:36 +0200 Subject: [PATCH 1/6] PBCKP-137 update node.py --- testgres/__init__.py | 4 +- testgres/node.py | 337 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 327 insertions(+), 14 deletions(-) diff --git a/testgres/__init__.py b/testgres/__init__.py index 9d5e37cf..1b33ba3b 100644 --- a/testgres/__init__.py +++ b/testgres/__init__.py @@ -32,7 +32,7 @@ ProcessType, \ DumpFormat -from .node import PostgresNode +from .node import PostgresNode, NodeApp from .utils import \ reserve_port, \ @@ -53,7 +53,7 @@ "NodeConnection", "DatabaseError", "InternalError", "ProgrammingError", "OperationalError", "TestgresException", "ExecUtilException", "QueryException", "TimeoutException", "CatchUpException", "StartNodeException", "InitNodeException", "BackupException", "XLogMethod", "IsolationLevel", "NodeStatus", "ProcessType", "DumpFormat", - "PostgresNode", + "PostgresNode", "NodeApp", "reserve_port", "release_port", "bound_ports", "get_bin_path", "get_pg_config", "get_pg_version", "First", "Any", ] diff --git a/testgres/node.py b/testgres/node.py index 378e6803..0d1232a2 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -2,6 +2,12 @@ import io import os +import random +import shutil +import signal +import threading +from queue import Queue + import psutil import subprocess import time @@ -11,6 +17,15 @@ except ImportError: from collections import Iterable +# we support both pg8000 and psycopg2 +try: + import psycopg2 as pglib +except ImportError: + try: + import pg8000 as pglib + except ImportError: + raise ImportError("You must have psycopg2 or pg8000 modules installed") + from shutil import rmtree from six import raise_from, iteritems, text_type from tempfile import mkstemp, mkdtemp @@ -86,6 +101,10 @@ from .backup import NodeBackup +InternalError = pglib.InternalError +ProgrammingError = pglib.ProgrammingError +OperationalError = pglib.OperationalError + class ProcessProxy(object): """ @@ -140,6 +159,9 @@ def __init__(self, name=None, port=None, base_dir=None): self.utils_log_name = self.utils_log_file self.pg_log_name = self.pg_log_file + # Node state + self.is_started = False + def __enter__(self): return self @@ -629,9 +651,38 @@ def get_control_data(self): return out_dict + def slow_start(self, replica=False): + """ + Starts the PostgreSQL instance and then polls the instance + until it reaches the expected state (primary or replica). The state is checked + using the pg_is_in_recovery() function. + + Args: + replica: If True, waits for the instance to be in recovery (i.e., replica mode). + If False, waits for the instance to be in primary mode. Default is False. + """ + self.start() + + if replica: + query = 'SELECT pg_is_in_recovery()' + else: + query = 'SELECT not pg_is_in_recovery()' + # Call poll_query_until until the expected value is returned + self.poll_query_until( + dbname="template1", + query=query, + suppress={pglib.InternalError, + QueryException, + pglib.ProgrammingError, + pglib.OperationalError}) + + def start(self, params=[], wait=True): """ - Start this node using pg_ctl. + Starts the PostgreSQL node using pg_ctl if node has not been started. + By default, it waits for the operation to complete before returning. + Optionally, it can return immediately without waiting for the start operation + to complete by setting the `wait` parameter to False. Args: params: additional arguments for pg_ctl. @@ -640,14 +691,16 @@ def start(self, params=[], wait=True): Returns: This instance of :class:`.PostgresNode`. """ + if self.is_started: + return self _params = [ - get_bin_path("pg_ctl"), - "-D", self.data_dir, - "-l", self.pg_log_file, - "-w" if wait else '-W', # --wait or --no-wait - "start" - ] + params # yapf: disable + get_bin_path("pg_ctl"), + "-D", self.data_dir, + "-l", self.pg_log_file, + "-w" if wait else '-W', # --wait or --no-wait + "start" + ] + params # yapf: disable try: execute_utility(_params, self.utils_log_file) @@ -657,20 +710,22 @@ def start(self, params=[], wait=True): raise_from(StartNodeException(msg, files), e) self._maybe_start_logger() - + self.is_started = True return self def stop(self, params=[], wait=True): """ - Stop this node using pg_ctl. + Stops the PostgreSQL node using pg_ctl if the node has been started. Args: - params: additional arguments for pg_ctl. - wait: wait until operation completes. + params: A list of additional arguments for pg_ctl. Defaults to None. + wait: If True, waits until the operation is complete. Defaults to True. Returns: This instance of :class:`.PostgresNode`. """ + if not self.is_started: + return self _params = [ get_bin_path("pg_ctl"), @@ -682,9 +737,25 @@ def stop(self, params=[], wait=True): execute_utility(_params, self.utils_log_file) self._maybe_stop_logger() - + self.is_started = False return self + def kill(self, someone=None): + """ + Kills the PostgreSQL node or a specified auxiliary process if the node is running. + + Args: + someone: A key to the auxiliary process in the auxiliary_pids dictionary. + If None, the main PostgreSQL node process will be killed. Defaults to None. + """ + if self.is_started: + sig = signal.SIGKILL if os.name != 'nt' else signal.SIGBREAK + if someone == None: + os.kill(self.pid, sig) + else: + os.kill(self.auxiliary_pids[someone][0], sig) + self.is_started = False + def restart(self, params=[]): """ Restart this node using pg_ctl. @@ -1359,3 +1430,245 @@ def connect(self, username=username, password=password, autocommit=autocommit) # yapf: disable + + def table_checksum(self, table, dbname="postgres"): + """ + Calculate the checksum of a table by hashing its rows. + + The function fetches rows from the table in chunks and calculates the checksum + by summing the hash values of each row. The function uses a separate thread + to fetch rows when there are more than 2000 rows in the table. + + Args: + table (str): The name of the table for which the checksum should be calculated. + dbname (str, optional): The name of the database where the table is located. Defaults to "postgres". + + Returns: + int: The calculated checksum of the table. + """ + + def fetch_rows(con, cursor_name): + while True: + rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") + if not rows: + break + yield rows + + def process_rows(queue, con, cursor_name): + try: + for rows in fetch_rows(con, cursor_name): + queue.put(rows) + except Exception as e: + queue.put(e) + else: + queue.put(None) + + cursor_name = f"cur_{random.randint(0, 2 ** 48)}" + checksum = 0 + query_thread = None + + with self.connect(dbname=dbname) as con: + con.execute(f""" + DECLARE {cursor_name} NO SCROLL CURSOR FOR + SELECT t::text FROM {table} as t + """) + + queue = Queue(maxsize=50) + initial_rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") + + if not initial_rows: + return 0 + + queue.put(initial_rows) + + if len(initial_rows) == 2000: + query_thread = threading.Thread(target=process_rows, args=(queue, con, cursor_name)) + query_thread.start() + else: + queue.put(None) + + while True: + rows = queue.get() + if rows is None: + break + if isinstance(rows, Exception): + raise rows + + for row in rows: + checksum += hash(row[0]) + + if query_thread is not None: + query_thread.join() + + con.execute(f"CLOSE {cursor_name}; ROLLBACK;") + + return checksum + + def pgbench_table_checksums(self, dbname="postgres", + pgbench_tables=('pgbench_branches', + 'pgbench_tellers', + 'pgbench_accounts', + 'pgbench_history') + ): + """ + Calculate the checksums of the specified pgbench tables using table_checksum method. + + Args: + dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres". + pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the + checksums should be calculated. Defaults to a tuple containing the + names of the default pgbench tables. + + Returns: + set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum. + """ + return {(table, self.table_checksum(table, dbname)) + for table in pgbench_tables} + + def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}): + """ + Update or remove configuration options in the specified configuration file, + updates the options specified in the options dictionary, removes any options + specified in the rm_options set, and writes the updated configuration back to + the file. + + Args: + options (dict): A dictionary containing the options to update or add, + with the option names as keys and their values as values. + config (str, optional): The name of the configuration file to update. + Defaults to 'postgresql.auto.conf'. + rm_options (set, optional): A set containing the names of the options to remove. + Defaults to an empty set. + """ + # parse postgresql.auto.conf + path = os.path.join(self.data_dir, config) + + with open(path, 'r') as f: + raw_content = f.read() + + current_options = {} + current_directives = [] + for line in raw_content.splitlines(): + + # ignore comments + if line.startswith('#'): + continue + + if line == '': + continue + + if line.startswith('include'): + current_directives.append(line) + continue + + name, var = line.partition('=')[::2] + name = name.strip() + var = var.strip() + var = var.strip('"') + var = var.strip("'") + + # remove options specified in rm_options list + if name in rm_options: + continue + + current_options[name] = var + + for option in options: + current_options[option] = options[option] + + auto_conf = '' + for option in current_options: + auto_conf += "{0} = '{1}'\n".format( + option, current_options[option]) + + for directive in current_directives: + auto_conf += directive + "\n" + + with open(path, 'wt') as f: + f.write(auto_conf) + + +class NodeApp: + """ + Functions that can be moved to testgres.PostgresNode + We use these functions in ProbackupController and need tp move them in some visible place + """ + + def __init__(self, test_path, nodes_to_cleanup): + self.test_path = test_path + self.nodes_to_cleanup = nodes_to_cleanup + + def make_empty( + self, + base_dir=None): + real_base_dir = os.path.join(self.test_path, base_dir) + shutil.rmtree(real_base_dir, ignore_errors=True) + os.makedirs(real_base_dir) + + node = PostgresNodeExtended(base_dir=real_base_dir) + node.should_rm_dirs = True + self.nodes_to_cleanup.append(node) + + return node + + def make_simple( + self, + base_dir=None, + set_replication=False, + ptrack_enable=False, + initdb_params=[], + pg_options={}): + + node = self.make_empty(base_dir) + node.init( + initdb_params=initdb_params, allow_streaming=set_replication) + + # set major version + with open(os.path.join(node.data_dir, 'PG_VERSION')) as f: + node.major_version_str = str(f.read().rstrip()) + node.major_version = float(node.major_version_str) + + # Sane default parameters + options = {} + options['max_connections'] = 100 + options['shared_buffers'] = '10MB' + options['fsync'] = 'off' + + options['wal_level'] = 'logical' + options['hot_standby'] = 'off' + + options['log_line_prefix'] = '%t [%p]: [%l-1] ' + options['log_statement'] = 'none' + options['log_duration'] = 'on' + options['log_min_duration_statement'] = 0 + options['log_connections'] = 'on' + options['log_disconnections'] = 'on' + options['restart_after_crash'] = 'off' + options['autovacuum'] = 'off' + + # Allow replication in pg_hba.conf + if set_replication: + options['max_wal_senders'] = 10 + + if ptrack_enable: + options['ptrack.map_size'] = '1' + options['shared_preload_libraries'] = 'ptrack' + + if node.major_version >= 13: + options['wal_keep_size'] = '200MB' + else: + options['wal_keep_segments'] = '12' + + # set default values + node.set_auto_conf(options) + + # Apply given parameters + node.set_auto_conf(pg_options) + + # kludge for testgres + # https://github.com/postgrespro/testgres/issues/54 + # for PG >= 13 remove 'wal_keep_segments' parameter + if node.major_version >= 13: + node.set_auto_conf({}, 'postgresql.conf', ['wal_keep_segments']) + + return node From 1512afde8a40bee606046e6a305aa4017ec8419a Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Tue, 11 Apr 2023 15:02:00 +0200 Subject: [PATCH 2/6] PBCKP-137 up version 1.8.6 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index a5dc600e..5c6f4a07 100755 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ readme = f.read() setup( - version='1.8.5', + version='1.8.6', name='testgres', packages=['testgres'], description='Testing utility for PostgreSQL and its extensions', From 0d62e0e6881a8cd18e9acd58507fcae74ce71ad9 Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Tue, 11 Apr 2023 22:50:33 +0200 Subject: [PATCH 3/6] PBCKP-137 update node.py --- testgres/node.py | 163 +++++++++++++++++++---------------------------- 1 file changed, 67 insertions(+), 96 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 0d1232a2..6d1d4544 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -12,6 +12,7 @@ import subprocess import time + try: from collections.abc import Iterable except ImportError: @@ -104,6 +105,7 @@ InternalError = pglib.InternalError ProgrammingError = pglib.ProgrammingError OperationalError = pglib.OperationalError +DatabaseError = pglib.DatabaseError class ProcessProxy(object): @@ -651,13 +653,15 @@ def get_control_data(self): return out_dict - def slow_start(self, replica=False): + def slow_start(self, replica=False, dbname='template1', username='dev'): """ Starts the PostgreSQL instance and then polls the instance until it reaches the expected state (primary or replica). The state is checked using the pg_is_in_recovery() function. Args: + dbname: + username: replica: If True, waits for the instance to be in recovery (i.e., replica mode). If False, waits for the instance to be in primary mode. Default is False. """ @@ -668,14 +672,15 @@ def slow_start(self, replica=False): else: query = 'SELECT not pg_is_in_recovery()' # Call poll_query_until until the expected value is returned - self.poll_query_until( - dbname="template1", - query=query, - suppress={pglib.InternalError, - QueryException, - pglib.ProgrammingError, - pglib.OperationalError}) - + self.poll_query_until(query=query, + expected=False, + dbname=dbname, + username=username, + suppress={InternalError, + QueryException, + ProgrammingError, + OperationalError, + DatabaseError}) def start(self, params=[], wait=True): """ @@ -1432,96 +1437,66 @@ def connect(self, autocommit=autocommit) # yapf: disable def table_checksum(self, table, dbname="postgres"): - """ - Calculate the checksum of a table by hashing its rows. - - The function fetches rows from the table in chunks and calculates the checksum - by summing the hash values of each row. The function uses a separate thread - to fetch rows when there are more than 2000 rows in the table. - - Args: - table (str): The name of the table for which the checksum should be calculated. - dbname (str, optional): The name of the database where the table is located. Defaults to "postgres". - - Returns: - int: The calculated checksum of the table. - """ - - def fetch_rows(con, cursor_name): - while True: - rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") - if not rows: - break - yield rows - - def process_rows(queue, con, cursor_name): - try: - for rows in fetch_rows(con, cursor_name): - queue.put(rows) - except Exception as e: - queue.put(e) - else: - queue.put(None) - - cursor_name = f"cur_{random.randint(0, 2 ** 48)}" - checksum = 0 - query_thread = None - - with self.connect(dbname=dbname) as con: - con.execute(f""" - DECLARE {cursor_name} NO SCROLL CURSOR FOR - SELECT t::text FROM {table} as t - """) - - queue = Queue(maxsize=50) - initial_rows = con.execute(f"FETCH FORWARD 2000 FROM {cursor_name}") - - if not initial_rows: - return 0 - - queue.put(initial_rows) - - if len(initial_rows) == 2000: - query_thread = threading.Thread(target=process_rows, args=(queue, con, cursor_name)) - query_thread.start() - else: - queue.put(None) + con = self.connect(dbname=dbname) + + curname = "cur_" + str(random.randint(0, 2 ** 48)) + + con.execute(""" + DECLARE %s NO SCROLL CURSOR FOR + SELECT t::text FROM %s as t + """ % (curname, table)) + + que = Queue(maxsize=50) + sum = 0 + + rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname) + if not rows: + return 0 + que.put(rows) + + th = None + if len(rows) == 2000: + def querier(): + try: + while True: + rows = con.execute("FETCH FORWARD 2000 FROM %s" % curname) + if not rows: + break + que.put(rows) + except Exception as e: + que.put(e) + else: + que.put(None) - while True: - rows = queue.get() - if rows is None: - break - if isinstance(rows, Exception): - raise rows + th = threading.Thread(target=querier) + th.start() + else: + que.put(None) - for row in rows: - checksum += hash(row[0]) + while True: + rows = que.get() + if rows is None: + break + if isinstance(rows, Exception): + raise rows + # hash uses SipHash since Python3.4, therefore it is good enough + for row in rows: + sum += hash(row[0]) - if query_thread is not None: - query_thread.join() + if th is not None: + th.join() - con.execute(f"CLOSE {cursor_name}; ROLLBACK;") + con.execute("CLOSE %s; ROLLBACK;" % curname) - return checksum + con.close() + return sum def pgbench_table_checksums(self, dbname="postgres", - pgbench_tables=('pgbench_branches', - 'pgbench_tellers', - 'pgbench_accounts', - 'pgbench_history') + pgbench_tables = ('pgbench_branches', + 'pgbench_tellers', + 'pgbench_accounts', + 'pgbench_history') ): - """ - Calculate the checksums of the specified pgbench tables using table_checksum method. - - Args: - dbname (str, optional): The name of the database where the pgbench tables are located. Defaults to "postgres". - pgbench_tables (tuple of str, optional): A tuple containing the names of the pgbench tables for which the - checksums should be calculated. Defaults to a tuple containing the - names of the default pgbench tables. - - Returns: - set of tuple: A set of tuples, where each tuple contains the table name and its corresponding checksum. - """ return {(table, self.table_checksum(table, dbname)) for table in pgbench_tables} @@ -1589,10 +1564,6 @@ def set_auto_conf(self, options, config='postgresql.auto.conf', rm_options={}): class NodeApp: - """ - Functions that can be moved to testgres.PostgresNode - We use these functions in ProbackupController and need tp move them in some visible place - """ def __init__(self, test_path, nodes_to_cleanup): self.test_path = test_path @@ -1605,7 +1576,7 @@ def make_empty( shutil.rmtree(real_base_dir, ignore_errors=True) os.makedirs(real_base_dir) - node = PostgresNodeExtended(base_dir=real_base_dir) + node = PostgresNode(base_dir=real_base_dir) node.should_rm_dirs = True self.nodes_to_cleanup.append(node) From 8be1b3a72cecd7dd15862c3258b97fb5834e6737 Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Mon, 17 Apr 2023 10:43:16 +0200 Subject: [PATCH 4/6] PBCKP-137 update node --- testgres/node.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 6d1d4544..17b9a260 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -105,7 +105,6 @@ InternalError = pglib.InternalError ProgrammingError = pglib.ProgrammingError OperationalError = pglib.OperationalError -DatabaseError = pglib.DatabaseError class ProcessProxy(object): @@ -653,7 +652,7 @@ def get_control_data(self): return out_dict - def slow_start(self, replica=False, dbname='template1', username='dev'): + def slow_start(self, replica=False, dbname='template1', username=default_username()): """ Starts the PostgreSQL instance and then polls the instance until it reaches the expected state (primary or replica). The state is checked @@ -673,14 +672,12 @@ def slow_start(self, replica=False, dbname='template1', username='dev'): query = 'SELECT not pg_is_in_recovery()' # Call poll_query_until until the expected value is returned self.poll_query_until(query=query, - expected=False, dbname=dbname, username=username, suppress={InternalError, QueryException, ProgrammingError, - OperationalError, - DatabaseError}) + OperationalError}) def start(self, params=[], wait=True): """ @@ -970,7 +967,7 @@ def psql(self, return process.returncode, out, err @method_decorator(positional_args_hack(['dbname', 'query'])) - def safe_psql(self, query=None, **kwargs): + def safe_psql(self, query=None, expect_error=False, **kwargs): """ Execute a query using psql. @@ -980,6 +977,8 @@ def safe_psql(self, query=None, **kwargs): dbname: database name to connect to. username: database user name. input: raw input to be passed. + expect_error: if True - fail if we didn't get ret + if False - fail if we got ret **kwargs are passed to psql(). @@ -992,7 +991,12 @@ def safe_psql(self, query=None, **kwargs): ret, out, err = self.psql(query=query, **kwargs) if ret: - raise QueryException((err or b'').decode('utf-8'), query) + if expect_error: + out = (err or b'').decode('utf-8') + else: + raise QueryException((err or b'').decode('utf-8'), query) + elif expect_error: + assert False, f"Exception was expected, but query finished successfully: `{query}` " return out From a3beb766c63d0aacff35da115ae9477256d0335b Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Fri, 28 Apr 2023 13:57:49 +0200 Subject: [PATCH 5/6] PBCKP-577 flake8 fixes --- testgres/node.py | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/testgres/node.py b/testgres/node.py index 17b9a260..e6ac44b0 100644 --- a/testgres/node.py +++ b/testgres/node.py @@ -115,6 +115,7 @@ class ProcessProxy(object): process: wrapped psutill.Process object ptype: instance of ProcessType """ + def __init__(self, process, ptype=None): self.process = process self.ptype = ptype or ProcessType.from_process(process) @@ -697,12 +698,12 @@ def start(self, params=[], wait=True): return self _params = [ - get_bin_path("pg_ctl"), - "-D", self.data_dir, - "-l", self.pg_log_file, - "-w" if wait else '-W', # --wait or --no-wait - "start" - ] + params # yapf: disable + get_bin_path("pg_ctl"), + "-D", self.data_dir, + "-l", self.pg_log_file, + "-w" if wait else '-W', # --wait or --no-wait + "start" + ] + params # yapf: disable try: execute_utility(_params, self.utils_log_file) @@ -752,7 +753,7 @@ def kill(self, someone=None): """ if self.is_started: sig = signal.SIGKILL if os.name != 'nt' else signal.SIGBREAK - if someone == None: + if someone is None: os.kill(self.pid, sig) else: os.kill(self.auxiliary_pids[someone][0], sig) @@ -1496,10 +1497,10 @@ def querier(): return sum def pgbench_table_checksums(self, dbname="postgres", - pgbench_tables = ('pgbench_branches', - 'pgbench_tellers', - 'pgbench_accounts', - 'pgbench_history') + pgbench_tables=('pgbench_branches', + 'pgbench_tellers', + 'pgbench_accounts', + 'pgbench_history') ): return {(table, self.table_checksum(table, dbname)) for table in pgbench_tables} From a1788501925569df57a1f2b7afca654e06b17db0 Mon Sep 17 00:00:00 2001 From: "v.shepard" Date: Fri, 28 Apr 2023 14:41:44 +0200 Subject: [PATCH 6/6] PBCKP-137 test change test_double_start --- tests/test_simple.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_simple.py b/tests/test_simple.py index d79fa79a..94420b04 100755 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -171,8 +171,8 @@ def test_node_exit(self): def test_double_start(self): with get_new_node().init().start() as node: # can't start node more than once - with self.assertRaises(StartNodeException): - node.start() + node.start() + self.assertTrue(node.is_started) def test_uninitialized_start(self): with get_new_node() as node: