diff --git a/__init__.py b/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 00000000..c94eabc2 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,9 @@ +[pytest] +testpaths = ["./tests", "./testgres/plugins/pg_probackup2/pg_probackup2/tests"] +addopts = --strict-markers +markers = +#log_file = logs/pytest.log +log_file_level = NOTSET +log_file_format = %(levelname)8s [%(asctime)s] %(message)s +log_file_date_format=%Y-%m-%d %H:%M:%S + diff --git a/run_tests.sh b/run_tests.sh index 73c459be..e9d58b54 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -22,11 +22,11 @@ export VIRTUAL_ENV_DISABLE_PROMPT=1 source $VENV_PATH/bin/activate # install utilities -$PIP install coverage flake8 psutil Sphinx +$PIP install coverage flake8 psutil Sphinx pytest pytest-xdist psycopg2 six psutil # install testgres' dependencies export PYTHONPATH=$(pwd) -$PIP install . +# $PIP install . # test code quality flake8 . @@ -38,21 +38,19 @@ rm -f $COVERAGE_FILE # run tests (PATH) -time coverage run -a tests/test_simple.py +time coverage run -a -m pytest -l -v -n 4 -k "TestgresTests" # run tests (PG_BIN) time \ PG_BIN=$(dirname $(which pg_config)) \ - ALT_CONFIG=1 \ - coverage run -a tests/test_simple.py + coverage run -a -m pytest -l -v -n 4 -k "TestgresTests" # run tests (PG_CONFIG) time \ PG_CONFIG=$(which pg_config) \ - ALT_CONFIG=1 \ - coverage run -a tests/test_simple.py + coverage run -a -m pytest -l -v -n 4 -k "TestgresTests" # show coverage diff --git a/testgres/plugins/__init__.py b/testgres/plugins/__init__.py index 8c19a23b..824eadc6 100644 --- a/testgres/plugins/__init__.py +++ b/testgres/plugins/__init__.py @@ -1,7 +1,7 @@ -from pg_probackup2.gdb import GDBobj -from pg_probackup2.app import ProbackupApp, ProbackupException -from pg_probackup2.init_helpers import init_params -from pg_probackup2.storage.fs_backup import FSTestBackupDir +from .pg_probackup2.pg_probackup2.gdb import GDBobj +from .pg_probackup2.pg_probackup2.app import ProbackupApp, ProbackupException +from .pg_probackup2.pg_probackup2.init_helpers import init_params +from .pg_probackup2.pg_probackup2.storage.fs_backup import FSTestBackupDir __all__ = [ "ProbackupApp", "ProbackupException", "init_params", "FSTestBackupDir", "GDBobj" diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py b/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py index 078fdbab..c4570a39 100644 --- a/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py +++ b/testgres/plugins/pg_probackup2/pg_probackup2/init_helpers.py @@ -121,8 +121,7 @@ def __init__(self): self.probackup_path = probackup_path_tmp if not self.probackup_path: - logging.error('pg_probackup binary is not found') - exit(1) + raise Exception('pg_probackup binary is not found') if os.name == 'posix': self.EXTERNAL_DIRECTORY_DELIMITER = ':' @@ -213,11 +212,15 @@ def __init__(self): if self.probackup_version.split('.')[0].isdigit(): self.major_version = int(self.probackup_version.split('.')[0]) else: - logging.error('Can\'t process pg_probackup version \"{}\": the major version is expected to be a number'.format(self.probackup_version)) - sys.exit(1) + raise Exception('Can\'t process pg_probackup version \"{}\": the major version is expected to be a number'.format(self.probackup_version)) def test_env(self): return self._test_env.copy() -init_params = Init() +try: + init_params = Init() +except Exception as e: + logging.error(str(e)) + logging.warning("testgres.plugins.probackup2.init_params is set to None.") + 
init_params = None diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/tests/__init__.py b/testgres/plugins/pg_probackup2/pg_probackup2/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py b/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py deleted file mode 100644 index b63531ec..00000000 --- a/testgres/plugins/pg_probackup2/pg_probackup2/tests/basic_test.py +++ /dev/null @@ -1,80 +0,0 @@ -import logging -import os -import shutil -import unittest -import testgres -from pg_probackup2.app import ProbackupApp -from pg_probackup2.init_helpers import Init, init_params -from pg_probackup2.app import build_backup_dir - - -class TestUtils: - @staticmethod - def get_module_and_function_name(test_id): - try: - module_name = test_id.split('.')[-2] - fname = test_id.split('.')[-1] - except IndexError: - logging.warning(f"Couldn't get module name and function name from test_id: `{test_id}`") - module_name, fname = test_id.split('(')[1].split('.')[1], test_id.split('(')[0] - return module_name, fname - - -class ProbackupTest(unittest.TestCase): - def setUp(self): - self.setup_test_environment() - self.setup_test_paths() - self.setup_backup_dir() - self.setup_probackup() - - def setup_test_environment(self): - self.output = None - self.cmd = None - self.nodes_to_cleanup = [] - self.module_name, self.fname = TestUtils.get_module_and_function_name(self.id()) - self.test_env = Init().test_env() - - def setup_test_paths(self): - self.rel_path = os.path.join(self.module_name, self.fname) - self.test_path = os.path.join(init_params.tmp_path, self.rel_path) - os.makedirs(self.test_path) - self.pb_log_path = os.path.join(self.test_path, "pb_log") - - def setup_backup_dir(self): - self.backup_dir = build_backup_dir(self, 'backup') - self.backup_dir.cleanup() - - def setup_probackup(self): - self.pg_node = testgres.NodeApp(self.test_path, self.nodes_to_cleanup) - self.pb = ProbackupApp(self, self.pg_node, self.pb_log_path, self.test_env, - auto_compress_alg='zlib', backup_dir=self.backup_dir) - - def tearDown(self): - if os.path.exists(self.test_path): - shutil.rmtree(self.test_path) - - -class BasicTest(ProbackupTest): - def test_full_backup(self): - # Setting up a simple test node - node = self.pg_node.make_simple('node', pg_options={"fsync": "off", "synchronous_commit": "off"}) - - # Initialize and configure Probackup - self.pb.init() - self.pb.add_instance('node', node) - self.pb.set_archiving('node', node) - - # Start the node and initialize pgbench - node.slow_start() - node.pgbench_init(scale=100, no_vacuum=True) - - # Perform backup and validation - backup_id = self.pb.backup_node('node', node) - out = self.pb.validate('node', backup_id) - - # Check if the backup is valid - self.assertIn(f"INFO: Backup {backup_id} is valid", out) - - -if __name__ == "__main__": - unittest.main() diff --git a/testgres/plugins/pg_probackup2/pg_probackup2/tests/test_basic.py b/testgres/plugins/pg_probackup2/pg_probackup2/tests/test_basic.py new file mode 100644 index 00000000..ba788623 --- /dev/null +++ b/testgres/plugins/pg_probackup2/pg_probackup2/tests/test_basic.py @@ -0,0 +1,95 @@ +from __future__ import annotations + +import os +import shutil +import pytest + +from ...... 
import testgres +from ...pg_probackup2.app import ProbackupApp +from ...pg_probackup2.init_helpers import Init, init_params +from ..storage.fs_backup import FSTestBackupDir + + +class ProbackupTest: + pg_node: testgres.PostgresNode + + @staticmethod + def probackup_is_available() -> bool: + p = os.environ.get("PGPROBACKUPBIN") + + if p is None: + return False + + if not os.path.exists(p): + return False + + return True + + @pytest.fixture(autouse=True, scope="function") + def implicit_fixture(self, request: pytest.FixtureRequest): + assert isinstance(request, pytest.FixtureRequest) + self.helper__setUp(request) + yield + self.helper__tearDown() + + def helper__setUp(self, request: pytest.FixtureRequest): + assert isinstance(request, pytest.FixtureRequest) + + self.helper__setup_test_environment(request) + self.helper__setup_test_paths() + self.helper__setup_backup_dir() + self.helper__setup_probackup() + + def helper__setup_test_environment(self, request: pytest.FixtureRequest): + assert isinstance(request, pytest.FixtureRequest) + + self.output = None + self.cmd = None + self.nodes_to_cleanup = [] + self.module_name, self.fname = request.node.cls.__name__, request.node.name + self.test_env = Init().test_env() + + def helper__setup_test_paths(self): + self.rel_path = os.path.join(self.module_name, self.fname) + self.test_path = os.path.join(init_params.tmp_path, self.rel_path) + os.makedirs(self.test_path, exist_ok=True) + self.pb_log_path = os.path.join(self.test_path, "pb_log") + + def helper__setup_backup_dir(self): + self.backup_dir = self.helper__build_backup_dir('backup') + self.backup_dir.cleanup() + + def helper__setup_probackup(self): + self.pg_node = testgres.NodeApp(self.test_path, self.nodes_to_cleanup) + self.pb = ProbackupApp(self, self.pg_node, self.pb_log_path, self.test_env, + auto_compress_alg='zlib', backup_dir=self.backup_dir) + + def helper__tearDown(self): + if os.path.exists(self.test_path): + shutil.rmtree(self.test_path) + + def helper__build_backup_dir(self, backup='backup'): + return FSTestBackupDir(rel_path=self.rel_path, backup=backup) + + +@pytest.mark.skipif(not ProbackupTest.probackup_is_available(), reason="Check that PGPROBACKUPBIN is defined and is valid.") +class TestBasic(ProbackupTest): + def test_full_backup(self): + # Setting up a simple test node + node = self.pg_node.make_simple('node', pg_options={"fsync": "off", "synchronous_commit": "off"}) + + # Initialize and configure Probackup + self.pb.init() + self.pb.add_instance('node', node) + self.pb.set_archiving('node', node) + + # Start the node and initialize pgbench + node.slow_start() + node.pgbench_init(scale=100, no_vacuum=True) + + # Perform backup and validation + backup_id = self.pb.backup_node('node', node) + out = self.pb.validate('node', backup_id) + + # Check if the backup is valid + assert f"INFO: Backup {backup_id} is valid" in out diff --git a/tests/helpers/run_conditions.py b/tests/helpers/run_conditions.py index 8d57f753..11357c30 100644 --- a/tests/helpers/run_conditions.py +++ b/tests/helpers/run_conditions.py @@ -1,3 +1,4 @@ +# coding: utf-8 import pytest import platform diff --git a/tests/test_local.py b/tests/test_local.py index 4051bfb5..60a96c18 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -1,12 +1,13 @@ +# coding: utf-8 import os import pytest import re import tempfile -from testgres import ExecUtilException -from testgres import InvalidOperationException -from testgres import LocalOperations +from ..testgres import ExecUtilException +from ..testgres import 
InvalidOperationException +from ..testgres import LocalOperations from .helpers.run_conditions import RunConditions diff --git a/tests/test_remote.py b/tests/test_remote.py index 4330b92f..8b167e9f 100755 --- a/tests/test_remote.py +++ b/tests/test_remote.py @@ -1,13 +1,14 @@ +# coding: utf-8 import os import pytest import re import tempfile -from testgres import ExecUtilException -from testgres import InvalidOperationException -from testgres import RemoteOperations -from testgres import ConnectionParams +from ..testgres import ExecUtilException +from ..testgres import InvalidOperationException +from ..testgres import RemoteOperations +from ..testgres import ConnectionParams class TestRemoteOperations: diff --git a/tests/test_simple.py b/tests/test_simple.py index a751f0a3..6c433cd4 100644 --- a/tests/test_simple.py +++ b/tests/test_simple.py @@ -1,22 +1,22 @@ -#!/usr/bin/env python # coding: utf-8 - import os import re import subprocess import tempfile -import testgres import time import six -import unittest +import pytest import psutil +import platform import logging.config from contextlib import contextmanager from shutil import rmtree -from testgres import \ +from .. import testgres + +from ..testgres import \ InitNodeException, \ StartNodeException, \ ExecUtilException, \ @@ -27,31 +27,32 @@ InvalidOperationException, \ NodeApp -from testgres import \ +from ..testgres import \ TestgresConfig, \ configure_testgres, \ scoped_config, \ pop_config -from testgres import \ +from ..testgres import \ NodeStatus, \ ProcessType, \ IsolationLevel, \ get_new_node -from testgres import \ +from ..testgres import \ get_bin_path, \ get_pg_config, \ get_pg_version -from testgres import \ +from ..testgres import \ First, \ Any # NOTE: those are ugly imports -from testgres import bound_ports -from testgres.utils import PgVer, parse_pg_version -from testgres.node import ProcessProxy +from ..testgres import bound_ports +from ..testgres.utils import PgVer, parse_pg_version +from ..testgres.utils import file_tail +from ..testgres.node import ProcessProxy def pg_version_ge(version): @@ -105,11 +106,11 @@ def removing(f): rmtree(f, ignore_errors=True) -class TestgresTests(unittest.TestCase): +class TestgresTests: def test_node_repr(self): with get_new_node() as node: pattern = r"PostgresNode\(name='.+', port=.+, base_dir='.+'\)" - self.assertIsNotNone(re.match(pattern, str(node))) + assert re.match(pattern, str(node)) is not None def test_custom_init(self): with get_new_node() as node: @@ -126,15 +127,15 @@ def test_custom_init(self): lines = conf.readlines() # check number of lines - self.assertGreaterEqual(len(lines), 6) + assert (len(lines) >= 6) # there should be no trust entries at all - self.assertFalse(any('trust' in s for s in lines)) + assert not (any('trust' in s for s in lines)) def test_double_init(self): with get_new_node().init() as node: # can't initialize node more than once - with self.assertRaises(InitNodeException): + with pytest.raises(expected_exception=InitNodeException): node.init() def test_init_after_cleanup(self): @@ -143,10 +144,11 @@ def test_init_after_cleanup(self): node.cleanup() node.init().start().execute('select 1') - @unittest.skipUnless(util_exists('pg_resetwal.exe' if os.name == 'nt' else 'pg_resetwal'), 'pgbench might be missing') - @unittest.skipUnless(pg_version_ge('9.6'), 'requires 9.6+') def test_init_unique_system_id(self): # this function exists in PostgreSQL 9.6+ + __class__.helper__skip_test_if_util_not_exist("pg_resetwal") + 
__class__.helper__skip_test_if_pg_version_is_not_ge("9.6") + query = 'select system_identifier from pg_control_system()' with scoped_config(cache_initdb=False): @@ -155,8 +157,8 @@ def test_init_unique_system_id(self): with scoped_config(cache_initdb=True, cached_initdb_unique=True) as config: - self.assertTrue(config.cache_initdb) - self.assertTrue(config.cached_initdb_unique) + assert (config.cache_initdb) + assert (config.cached_initdb_unique) # spawn two nodes; ids must be different with get_new_node().init().start() as node1, \ @@ -166,37 +168,37 @@ def test_init_unique_system_id(self): id2 = node2.execute(query)[0] # ids must increase - self.assertGreater(id1, id0) - self.assertGreater(id2, id1) + assert (id1 > id0) + assert (id2 > id1) def test_node_exit(self): base_dir = None - with self.assertRaises(QueryException): + with pytest.raises(expected_exception=QueryException): with get_new_node().init() as node: base_dir = node.base_dir node.safe_psql('select 1') # we should save the DB for "debugging" - self.assertTrue(os.path.exists(base_dir)) + assert (os.path.exists(base_dir)) rmtree(base_dir, ignore_errors=True) with get_new_node().init() as node: base_dir = node.base_dir # should have been removed by default - self.assertFalse(os.path.exists(base_dir)) + assert not (os.path.exists(base_dir)) def test_double_start(self): with get_new_node().init().start() as node: # can't start node more than once node.start() - self.assertTrue(node.is_started) + assert (node.is_started) def test_uninitialized_start(self): with get_new_node() as node: # node is not initialized yet - with self.assertRaises(StartNodeException): + with pytest.raises(expected_exception=StartNodeException): node.start() def test_restart(self): @@ -205,13 +207,13 @@ def test_restart(self): # restart, ok res = node.execute('select 1') - self.assertEqual(res, [(1, )]) + assert (res == [(1, )]) node.restart() res = node.execute('select 2') - self.assertEqual(res, [(2, )]) + assert (res == [(2, )]) # restart, fail - with self.assertRaises(StartNodeException): + with pytest.raises(expected_exception=StartNodeException): node.append_conf('pg_hba.conf', 'DUMMY') node.restart() @@ -228,106 +230,107 @@ def test_reload(self): # check new value cmm_new = node.execute('show client_min_messages') - self.assertEqual('debug1', cmm_new[0][0].lower()) - self.assertNotEqual(cmm_old, cmm_new) + assert ('debug1' == cmm_new[0][0].lower()) + assert (cmm_old != cmm_new) def test_pg_ctl(self): with get_new_node() as node: node.init().start() status = node.pg_ctl(['status']) - self.assertTrue('PID' in status) + assert ('PID' in status) def test_status(self): - self.assertTrue(NodeStatus.Running) - self.assertFalse(NodeStatus.Stopped) - self.assertFalse(NodeStatus.Uninitialized) + assert (NodeStatus.Running) + assert not (NodeStatus.Stopped) + assert not (NodeStatus.Uninitialized) # check statuses after each operation with get_new_node() as node: - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Uninitialized) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) node.init() - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Stopped) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Stopped) node.start() - self.assertNotEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Running) + assert (node.pid != 0) + assert (node.status() == NodeStatus.Running) node.stop() - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), 
NodeStatus.Stopped) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Stopped) node.cleanup() - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Uninitialized) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) def test_psql(self): with get_new_node().init().start() as node: # check returned values (1 arg) res = node.psql('select 1') - self.assertEqual(rm_carriage_returns(res), (0, b'1\n', b'')) + assert (rm_carriage_returns(res) == (0, b'1\n', b'')) # check returned values (2 args) res = node.psql('postgres', 'select 2') - self.assertEqual(rm_carriage_returns(res), (0, b'2\n', b'')) + assert (rm_carriage_returns(res) == (0, b'2\n', b'')) # check returned values (named) res = node.psql(query='select 3', dbname='postgres') - self.assertEqual(rm_carriage_returns(res), (0, b'3\n', b'')) + assert (rm_carriage_returns(res) == (0, b'3\n', b'')) # check returned values (1 arg) res = node.safe_psql('select 4') - self.assertEqual(rm_carriage_returns(res), b'4\n') + assert (rm_carriage_returns(res) == b'4\n') # check returned values (2 args) res = node.safe_psql('postgres', 'select 5') - self.assertEqual(rm_carriage_returns(res), b'5\n') + assert (rm_carriage_returns(res) == b'5\n') # check returned values (named) res = node.safe_psql(query='select 6', dbname='postgres') - self.assertEqual(rm_carriage_returns(res), b'6\n') + assert (rm_carriage_returns(res) == b'6\n') # check feeding input node.safe_psql('create table horns (w int)') node.safe_psql('copy horns from stdin (format csv)', input=b"1\n2\n3\n\\.\n") _sum = node.safe_psql('select sum(w) from horns') - self.assertEqual(rm_carriage_returns(_sum), b'6\n') + assert (rm_carriage_returns(_sum) == b'6\n') # check psql's default args, fails - with self.assertRaises(QueryException): + with pytest.raises(expected_exception=QueryException): node.psql() node.stop() # check psql on stopped node, fails - with self.assertRaises(QueryException): + with pytest.raises(expected_exception=QueryException): node.safe_psql('select 1') def test_safe_psql__expect_error(self): with get_new_node().init().start() as node: err = node.safe_psql('select_or_not_select 1', expect_error=True) - self.assertTrue(type(err) == str) # noqa: E721 - self.assertIn('select_or_not_select', err) - self.assertIn('ERROR: syntax error at or near "select_or_not_select"', err) + assert (type(err) == str) # noqa: E721 + assert ('select_or_not_select' in err) + assert ('ERROR: syntax error at or near "select_or_not_select"' in err) # --------- - with self.assertRaises(InvalidOperationException) as ctx: + with pytest.raises( + expected_exception=InvalidOperationException, + match="^" + re.escape("Exception was expected, but query finished successfully: `select 1;`.") + "$" + ): node.safe_psql("select 1;", expect_error=True) - self.assertEqual(str(ctx.exception), "Exception was expected, but query finished successfully: `select 1;`.") - # --------- res = node.safe_psql("select 1;", expect_error=False) - self.assertEqual(rm_carriage_returns(res), b'1\n') + assert (rm_carriage_returns(res) == b'1\n') def test_transactions(self): with get_new_node().init().start() as node: @@ -341,12 +344,12 @@ def test_transactions(self): con.begin() con.execute('insert into test values (2)') res = con.execute('select * from test order by val asc') - self.assertListEqual(res, [(1, ), (2, )]) + assert (res == [(1, ), (2, )]) con.rollback() con.begin() res = con.execute('select * from test') - self.assertListEqual(res, [(1, )]) + assert (res == 
[(1, )]) con.rollback() con.begin() @@ -357,15 +360,15 @@ def test_control_data(self): with get_new_node() as node: # node is not initialized yet - with self.assertRaises(ExecUtilException): + with pytest.raises(expected_exception=ExecUtilException): node.get_control_data() node.init() data = node.get_control_data() # check returned dict - self.assertIsNotNone(data) - self.assertTrue(any('pg_control' in s for s in data.keys())) + assert data is not None + assert (any('pg_control' in s for s in data.keys())) def test_backup_simple(self): with get_new_node() as master: @@ -374,7 +377,7 @@ def test_backup_simple(self): master.init(allow_streaming=True) # node must be running - with self.assertRaises(BackupException): + with pytest.raises(expected_exception=BackupException): master.backup() # it's time to start node @@ -386,7 +389,7 @@ def test_backup_simple(self): with master.backup(xlog_method='stream') as backup: with backup.spawn_primary().start() as slave: res = slave.execute('select * from test order by i asc') - self.assertListEqual(res, [(1, ), (2, ), (3, ), (4, )]) + assert (res == [(1, ), (2, ), (3, ), (4, )]) def test_backup_multiple(self): with get_new_node() as node: @@ -394,12 +397,12 @@ def test_backup_multiple(self): with node.backup(xlog_method='fetch') as backup1, \ node.backup(xlog_method='fetch') as backup2: - self.assertNotEqual(backup1.base_dir, backup2.base_dir) + assert (backup1.base_dir != backup2.base_dir) with node.backup(xlog_method='fetch') as backup: with backup.spawn_primary('node1', destroy=False) as node1, \ backup.spawn_primary('node2', destroy=False) as node2: - self.assertNotEqual(node1.base_dir, node2.base_dir) + assert (node1.base_dir != node2.base_dir) def test_backup_exhaust(self): with get_new_node() as node: @@ -411,15 +414,17 @@ def test_backup_exhaust(self): pass # now let's try to create one more node - with self.assertRaises(BackupException): + with pytest.raises(expected_exception=BackupException): backup.spawn_primary() def test_backup_wrong_xlog_method(self): with get_new_node() as node: node.init(allow_streaming=True).start() - with self.assertRaises(BackupException, - msg='Invalid xlog_method "wrong"'): + with pytest.raises( + expected_exception=BackupException, + match="^" + re.escape('Invalid xlog_method "wrong"') + "$" + ): node.backup(xlog_method='wrong') def test_pg_ctl_wait_option(self): @@ -440,17 +445,18 @@ def test_replicate(self): with node.replicate().start() as replica: res = replica.execute('select 1') - self.assertListEqual(res, [(1, )]) + assert (res == [(1, )]) node.execute('create table test (val int)', commit=True) replica.catchup() res = node.execute('select * from test') - self.assertListEqual(res, []) + assert (res == []) - @unittest.skipUnless(pg_version_ge('9.6'), 'requires 9.6+') def test_synchronous_replication(self): + __class__.helper__skip_test_if_pg_version_is_not_ge("9.6") + with get_new_node() as master: old_version = not pg_version_ge('9.6') @@ -465,12 +471,12 @@ def test_synchronous_replication(self): standby2.start() # check formatting - self.assertEqual( - '1 ("{}", "{}")'.format(standby1.name, standby2.name), - str(First(1, (standby1, standby2)))) # yapf: disable - self.assertEqual( - 'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name), - str(Any(1, (standby1, standby2)))) # yapf: disable + assert ( + '1 ("{}", "{}")'.format(standby1.name, standby2.name) == str(First(1, (standby1, standby2))) + ) # yapf: disable + assert ( + 'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name) == str(Any(1, 
(standby1, standby2))) + ) # yapf: disable # set synchronous_standby_names master.set_synchronous_standbys(First(2, [standby1, standby2])) @@ -488,10 +494,11 @@ def test_synchronous_replication(self): master.safe_psql( 'insert into abc select generate_series(1, 1000000)') res = standby1.safe_psql('select count(*) from abc') - self.assertEqual(rm_carriage_returns(res), b'1000000\n') + assert (rm_carriage_returns(res) == b'1000000\n') - @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') def test_logical_replication(self): + __class__.helper__skip_test_if_pg_version_is_not_ge("10") + with get_new_node() as node1, get_new_node() as node2: node1.init(allow_logical=True) node1.start() @@ -510,7 +517,7 @@ def test_logical_replication(self): # wait until changes apply on subscriber and check them sub.catchup() res = node2.execute('select * from test') - self.assertListEqual(res, [(1, 1), (2, 2)]) + assert (res == [(1, 1), (2, 2)]) # disable and put some new data sub.disable() @@ -520,7 +527,7 @@ def test_logical_replication(self): sub.enable() sub.catchup() res = node2.execute('select * from test') - self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)]) + assert (res == [(1, 1), (2, 2), (3, 3)]) # Add new tables. Since we added "all tables" to publication # (default behaviour of publish() method) we don't need @@ -534,7 +541,7 @@ def test_logical_replication(self): node1.safe_psql('insert into test2 values (\'a\'), (\'b\')') sub.catchup() res = node2.execute('select * from test2') - self.assertListEqual(res, [('a', ), ('b', )]) + assert (res == [('a', ), ('b', )]) # drop subscription sub.drop() @@ -548,20 +555,21 @@ def test_logical_replication(self): node1.safe_psql('insert into test values (4, 4)') sub.catchup() res = node2.execute('select * from test') - self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)]) + assert (res == [(1, 1), (2, 2), (3, 3), (4, 4)]) # explicitly add table - with self.assertRaises(ValueError): + with pytest.raises(expected_exception=ValueError): pub.add_tables([]) # fail pub.add_tables(['test2']) node1.safe_psql('insert into test2 values (\'c\')') sub.catchup() res = node2.execute('select * from test2') - self.assertListEqual(res, [('a', ), ('b', )]) + assert (res == [('a', ), ('b', )]) - @unittest.skipUnless(pg_version_ge('10'), 'requires 10+') def test_logical_catchup(self): """ Runs catchup for 100 times to be sure that it is consistent """ + __class__.helper__skip_test_if_pg_version_is_not_ge("10") + with get_new_node() as node1, get_new_node() as node2: node1.init(allow_logical=True) node1.start() @@ -579,16 +587,14 @@ def test_logical_catchup(self): node1.execute('insert into test values ({0}, {0})'.format(i)) sub.catchup() res = node2.execute('select * from test') - self.assertListEqual(res, [( - i, - i, - )]) + assert (res == [(i, i, )]) node1.execute('delete from test') - @unittest.skipIf(pg_version_ge('10'), 'requires <10') def test_logical_replication_fail(self): + __class__.helper__skip_test_if_pg_version_is_ge("10") + with get_new_node() as node: - with self.assertRaises(InitNodeException): + with pytest.raises(expected_exception=InitNodeException): node.init(allow_logical=True) def test_replication_slots(self): @@ -599,7 +605,7 @@ def test_replication_slots(self): replica.execute('select 1') # cannot create new slot with the same name - with self.assertRaises(TestgresException): + with pytest.raises(expected_exception=TestgresException): node.replicate(slot='slot1') def test_incorrect_catchup(self): @@ -607,7 +613,7 @@ def 
test_incorrect_catchup(self): node.init(allow_streaming=True).start() # node has no master, can't catch up - with self.assertRaises(TestgresException): + with pytest.raises(expected_exception=TestgresException): node.catchup() def test_promotion(self): @@ -622,7 +628,7 @@ def test_promotion(self): # make standby becomes writable master replica.safe_psql('insert into abc values (1)') res = replica.safe_psql('select * from abc') - self.assertEqual(rm_carriage_returns(res), b'1\n') + assert (rm_carriage_returns(res) == b'1\n') def test_dump(self): query_create = 'create table test as select generate_series(1, 2) as val' @@ -635,20 +641,20 @@ def test_dump(self): with removing(node1.dump(format=format)) as dump: with get_new_node().init().start() as node3: if format == 'directory': - self.assertTrue(os.path.isdir(dump)) + assert (os.path.isdir(dump)) else: - self.assertTrue(os.path.isfile(dump)) + assert (os.path.isfile(dump)) # restore dump node3.restore(filename=dump) res = node3.execute(query_select) - self.assertListEqual(res, [(1, ), (2, )]) + assert (res == [(1, ), (2, )]) def test_users(self): with get_new_node().init().start() as node: node.psql('create role test_user login') value = node.safe_psql('select 1', username='test_user') value = rm_carriage_returns(value) - self.assertEqual(value, b'1\n') + assert (value == b'1\n') def test_poll_query_until(self): with get_new_node() as node: @@ -661,15 +667,15 @@ def test_poll_query_until(self): node.poll_query_until(query=check_time.format(start_time)) end_time = node.execute(get_time)[0][0] - self.assertTrue(end_time - start_time >= 5) + assert (end_time - start_time >= 5) # check 0 columns - with self.assertRaises(QueryException): + with pytest.raises(expected_exception=QueryException): node.poll_query_until( query='select from pg_catalog.pg_class limit 1') # check None, fail - with self.assertRaises(QueryException): + with pytest.raises(expected_exception=QueryException): node.poll_query_until(query='create table abc (val int)') # check None, ok @@ -682,7 +688,7 @@ def test_poll_query_until(self): expected=None) # check arbitrary expected value, fail - with self.assertRaises(TimeoutException): + with pytest.raises(expected_exception=TimeoutException): node.poll_query_until(query='select 3', expected=1, max_attempts=3, @@ -692,17 +698,17 @@ def test_poll_query_until(self): node.poll_query_until(query='select 2', expected=2) # check timeout - with self.assertRaises(TimeoutException): + with pytest.raises(expected_exception=TimeoutException): node.poll_query_until(query='select 1 > 2', max_attempts=3, sleep_time=0.01) # check ProgrammingError, fail - with self.assertRaises(testgres.ProgrammingError): + with pytest.raises(expected_exception=testgres.ProgrammingError): node.poll_query_until(query='dummy1') # check ProgrammingError, ok - with self.assertRaises(TimeoutException): + with pytest.raises(expected_exception=(TimeoutException)): node.poll_query_until(query='dummy2', max_attempts=3, sleep_time=0.01, @@ -754,16 +760,17 @@ def test_logging(self): # check that master's port is found with open(logfile.name, 'r') as log: lines = log.readlines() - self.assertTrue(any(node_name in s for s in lines)) + assert (any(node_name in s for s in lines)) # test logger after stop/start/restart master.stop() master.start() master.restart() - self.assertTrue(master._logger.is_alive()) + assert (master._logger.is_alive()) - @unittest.skipUnless(util_exists('pgbench.exe' if os.name == 'nt' else 'pgbench'), 'pgbench might be missing') def 
test_pgbench(self): + __class__.helper__skip_test_if_util_not_exist("pgbench") + with get_new_node().init().start() as node: # initialize pgbench DB and run benchmarks @@ -780,13 +787,13 @@ def test_pgbench(self): proc.stdout.close() - self.assertTrue('tps' in out) + assert ('tps' in out) def test_pg_config(self): # check same instances a = get_pg_config() b = get_pg_config() - self.assertEqual(id(a), id(b)) + assert (id(a) == id(b)) # save right before config change c1 = get_pg_config() @@ -794,26 +801,26 @@ def test_pg_config(self): # modify setting for this scope with scoped_config(cache_pg_config=False) as config: # sanity check for value - self.assertFalse(config.cache_pg_config) + assert not (config.cache_pg_config) # save right after config change c2 = get_pg_config() # check different instances after config change - self.assertNotEqual(id(c1), id(c2)) + assert (id(c1) != id(c2)) # check different instances a = get_pg_config() b = get_pg_config() - self.assertNotEqual(id(a), id(b)) + assert (id(a) != id(b)) def test_config_stack(self): # no such option - with self.assertRaises(TypeError): + with pytest.raises(expected_exception=TypeError): configure_testgres(dummy=True) # we have only 1 config in stack - with self.assertRaises(IndexError): + with pytest.raises(expected_exception=IndexError): pop_config() d0 = TestgresConfig.cached_initdb_dir @@ -821,22 +828,22 @@ def test_config_stack(self): d2 = 'dummy_def' with scoped_config(cached_initdb_dir=d1) as c1: - self.assertEqual(c1.cached_initdb_dir, d1) + assert (c1.cached_initdb_dir == d1) with scoped_config(cached_initdb_dir=d2) as c2: stack_size = len(testgres.config.config_stack) # try to break a stack - with self.assertRaises(TypeError): + with pytest.raises(expected_exception=TypeError): with scoped_config(dummy=True): pass - self.assertEqual(c2.cached_initdb_dir, d2) - self.assertEqual(len(testgres.config.config_stack), stack_size) + assert (c2.cached_initdb_dir == d2) + assert (len(testgres.config.config_stack) == stack_size) - self.assertEqual(c1.cached_initdb_dir, d1) + assert (c1.cached_initdb_dir == d1) - self.assertEqual(TestgresConfig.cached_initdb_dir, d0) + assert (TestgresConfig.cached_initdb_dir == d0) def test_unix_sockets(self): with get_new_node() as node: @@ -854,17 +861,15 @@ def test_auto_name(self): with get_new_node().init(allow_streaming=True).start() as m: with m.replicate().start() as r: # check that nodes are running - self.assertTrue(m.status()) - self.assertTrue(r.status()) + assert (m.status()) + assert (r.status()) # check their names - self.assertNotEqual(m.name, r.name) - self.assertTrue('testgres' in m.name) - self.assertTrue('testgres' in r.name) + assert (m.name != r.name) + assert ('testgres' in m.name) + assert ('testgres' in r.name) def test_file_tail(self): - from testgres.utils import file_tail - s1 = "the quick brown fox jumped over that lazy dog\n" s2 = "abc\n" s3 = "def\n" @@ -879,13 +884,13 @@ def test_file_tail(self): f.seek(0) lines = file_tail(f, 3) - self.assertEqual(lines[0], s1) - self.assertEqual(lines[1], s2) - self.assertEqual(lines[2], s3) + assert (lines[0] == s1) + assert (lines[1] == s2) + assert (lines[2] == s3) f.seek(0) lines = file_tail(f, 1) - self.assertEqual(lines[0], s3) + assert (lines[0] == s3) def test_isolation_levels(self): with get_new_node().init().start() as node: @@ -903,24 +908,42 @@ def test_isolation_levels(self): con.begin(IsolationLevel.Serializable).commit() # check wrong level - with self.assertRaises(QueryException): + with 
pytest.raises(expected_exception=QueryException): con.begin('Garbage').commit() def test_ports_management(self): - # check that no ports have been bound yet - self.assertEqual(len(bound_ports), 0) + assert bound_ports is not None + assert type(bound_ports) == set # noqa: E721 + + if len(bound_ports) != 0: + logging.warning("bound_ports is not empty: {0}".format(bound_ports)) + + stage0__bound_ports = bound_ports.copy() with get_new_node() as node: - # check that we've just bound a port - self.assertEqual(len(bound_ports), 1) + assert bound_ports is not None + assert type(bound_ports) == set # noqa: E721 + + assert node.port is not None + assert type(node.port) == int # noqa: E721 + + logging.info("node port is {0}".format(node.port)) - # check that bound_ports contains our port - port_1 = list(bound_ports)[0] - port_2 = node.port - self.assertEqual(port_1, port_2) + assert node.port in bound_ports + assert node.port not in stage0__bound_ports + + assert stage0__bound_ports <= bound_ports + assert len(stage0__bound_ports) + 1 == len(bound_ports) + + stage1__bound_ports = stage0__bound_ports.copy() + stage1__bound_ports.add(node.port) + + assert stage1__bound_ports == bound_ports # check that port has been freed successfully - self.assertEqual(len(bound_ports), 0) + assert bound_ports is not None + assert type(bound_ports) == set # noqa: E721 + assert bound_ports == stage0__bound_ports def test_exceptions(self): str(StartNodeException('msg', [('file', 'lines')])) @@ -939,22 +962,22 @@ def test_version_management(self): g = PgVer('15.3.1bihabeta1') k = PgVer('15.3.1') - self.assertTrue(a == b) - self.assertTrue(b > c) - self.assertTrue(a > c) - self.assertTrue(d > e) - self.assertTrue(e > f) - self.assertTrue(d > f) - self.assertTrue(h > f) - self.assertTrue(h == i) - self.assertTrue(g == k) - self.assertTrue(g > h) + assert (a == b) + assert (b > c) + assert (a > c) + assert (d > e) + assert (e > f) + assert (d > f) + assert (h > f) + assert (h == i) + assert (g == k) + assert (g > h) version = get_pg_version() with get_new_node() as node: - self.assertTrue(isinstance(version, six.string_types)) - self.assertTrue(isinstance(node.version, PgVer)) - self.assertEqual(node.version, PgVer(version)) + assert (isinstance(version, six.string_types)) + assert (isinstance(node.version, PgVer)) + assert (node.version == PgVer(version)) def test_child_pids(self): master_processes = [ @@ -977,53 +1000,128 @@ def test_child_pids(self): ProcessType.WalReceiver, ] + def LOCAL__test_auxiliary_pids( + node: testgres.PostgresNode, + expectedTypes: list[ProcessType] + ) -> list[ProcessType]: + # returns list of the absence processes + assert node is not None + assert type(node) == testgres.PostgresNode # noqa: E721 + assert expectedTypes is not None + assert type(expectedTypes) == list # noqa: E721 + + pids = node.auxiliary_pids + assert pids is not None # noqa: E721 + assert type(pids) == dict # noqa: E721 + + result = list[ProcessType]() + for ptype in expectedTypes: + if not (ptype in pids): + result.append(ptype) + return result + + def LOCAL__check_auxiliary_pids__multiple_attempts( + node: testgres.PostgresNode, + expectedTypes: list[ProcessType]): + assert node is not None + assert type(node) == testgres.PostgresNode # noqa: E721 + assert expectedTypes is not None + assert type(expectedTypes) == list # noqa: E721 + + nAttempt = 0 + + while nAttempt < 5: + nAttempt += 1 + + logging.info("Test pids of [{0}] node. 
Attempt #{1}.".format( + node.name, + nAttempt + )) + + if nAttempt > 1: + time.sleep(1) + + absenceList = LOCAL__test_auxiliary_pids(node, expectedTypes) + assert absenceList is not None + assert type(absenceList) == list # noqa: E721 + if len(absenceList) == 0: + logging.info("Bingo!") + return + + logging.info("These processes are not found: {0}.".format(absenceList)) + continue + + raise Exception("Node {0} does not have the following processes: {1}.".format( + node.name, + absenceList + )) + with get_new_node().init().start() as master: # master node doesn't have a source walsender! - with self.assertRaises(TestgresException): + with pytest.raises(expected_exception=TestgresException): master.source_walsender with master.connect() as con: - self.assertGreater(con.pid, 0) + assert (con.pid > 0) with master.replicate().start() as replica: # test __str__ method str(master.child_processes[0]) - master_pids = master.auxiliary_pids - for ptype in master_processes: - self.assertIn(ptype, master_pids) + LOCAL__check_auxiliary_pids__multiple_attempts( + master, + master_processes) + + LOCAL__check_auxiliary_pids__multiple_attempts( + replica, + repl_processes) - replica_pids = replica.auxiliary_pids - for ptype in repl_processes: - self.assertIn(ptype, replica_pids) + master_pids = master.auxiliary_pids # there should be exactly 1 source walsender for replica - self.assertEqual(len(master_pids[ProcessType.WalSender]), 1) + assert (len(master_pids[ProcessType.WalSender]) == 1) pid1 = master_pids[ProcessType.WalSender][0] pid2 = replica.source_walsender.pid - self.assertEqual(pid1, pid2) + assert (pid1 == pid2) replica.stop() # there should be no walsender after we've stopped replica - with self.assertRaises(TestgresException): + with pytest.raises(expected_exception=TestgresException): replica.source_walsender def test_child_process_dies(self): # test for FileNotFound exception during child_processes() function cmd = ["timeout", "60"] if os.name == 'nt' else ["sleep", "60"] - with subprocess.Popen(cmd, shell=True) as process: # shell=True might be needed on Windows - self.assertEqual(process.poll(), None) - # collect list of processes currently running - children = psutil.Process(os.getpid()).children() - # kill a process, so received children dictionary becomes invalid - process.kill() - process.wait() - # try to handle children list -- missing processes will have ptype "ProcessType.Unknown" - [ProcessProxy(p) for p in children] + nAttempt = 0 + + while True: + if nAttempt == 5: + raise Exception("Max attempt number is exceed.") + + nAttempt += 1 + + logging.info("Attempt #{0}".format(nAttempt)) + + with subprocess.Popen(cmd, shell=True) as process: # shell=True might be needed on Windows + r = process.poll() + + if r is not None: + logging.warning("process.pool() returns an unexpected result: {0}.".format(r)) + continue + + assert r is None + # collect list of processes currently running + children = psutil.Process(os.getpid()).children() + # kill a process, so received children dictionary becomes invalid + process.kill() + process.wait() + # try to handle children list -- missing processes will have ptype "ProcessType.Unknown" + [ProcessProxy(p) for p in children] + break def test_upgrade_node(self): old_bin_dir = os.path.dirname(get_bin_path("pg_config")) @@ -1036,7 +1134,7 @@ def test_upgrade_node(self): node_new.init(cached=False) res = node_new.upgrade_from(old_node=node_old) node_new.start() - self.assertTrue(b'Upgrade Complete' in res) + assert (b'Upgrade Complete' in res) def 
test_parse_pg_version(self): # Linux Mint @@ -1051,25 +1149,26 @@ def test_parse_pg_version(self): def test_the_same_port(self): with get_new_node() as node: node.init().start() - self.assertTrue(node._should_free_port) - self.assertEqual(type(node.port), int) + assert (node._should_free_port) + assert (type(node.port) == int) # noqa: E721 node_port_copy = node.port - self.assertEqual(rm_carriage_returns(node.safe_psql("SELECT 1;")), b'1\n') + assert (rm_carriage_returns(node.safe_psql("SELECT 1;")) == b'1\n') with get_new_node(port=node.port) as node2: - self.assertEqual(type(node2.port), int) - self.assertEqual(node2.port, node.port) - self.assertFalse(node2._should_free_port) - - with self.assertRaises(StartNodeException) as ctx: + assert (type(node2.port) == int) # noqa: E721 + assert (node2.port == node.port) + assert not (node2._should_free_port) + + with pytest.raises( + expected_exception=StartNodeException, + match=re.escape("Cannot start node") + ): node2.init().start() - self.assertIn("Cannot start node", str(ctx.exception)) - # node is still working - self.assertEqual(node.port, node_port_copy) - self.assertTrue(node._should_free_port) - self.assertEqual(rm_carriage_returns(node.safe_psql("SELECT 3;")), b'3\n') + assert (node.port == node_port_copy) + assert (node._should_free_port) + assert (rm_carriage_returns(node.safe_psql("SELECT 3;")) == b'3\n') class tagPortManagerProxy: sm_prev_testgres_reserve_port = None @@ -1174,31 +1273,31 @@ def test_port_rereserve_during_node_start(self): with get_new_node() as node1: node1.init().start() - self.assertTrue(node1._should_free_port) - self.assertEqual(type(node1.port), int) # noqa: E721 + assert (node1._should_free_port) + assert (type(node1.port) == int) # noqa: E721 node1_port_copy = node1.port - self.assertEqual(rm_carriage_returns(node1.safe_psql("SELECT 1;")), b'1\n') + assert (rm_carriage_returns(node1.safe_psql("SELECT 1;")) == b'1\n') with __class__.tagPortManagerProxy(node1.port, C_COUNT_OF_BAD_PORT_USAGE): assert __class__.tagPortManagerProxy.sm_DummyPortNumber == node1.port with get_new_node() as node2: - self.assertTrue(node2._should_free_port) - self.assertEqual(node2.port, node1.port) + assert (node2._should_free_port) + assert (node2.port == node1.port) node2.init().start() - self.assertNotEqual(node2.port, node1.port) - self.assertTrue(node2._should_free_port) - self.assertEqual(__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage, 0) - self.assertEqual(__class__.tagPortManagerProxy.sm_DummyPortTotalUsage, C_COUNT_OF_BAD_PORT_USAGE) - self.assertTrue(node2.is_started) + assert (node2.port != node1.port) + assert (node2._should_free_port) + assert (__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage == 0) + assert (__class__.tagPortManagerProxy.sm_DummyPortTotalUsage == C_COUNT_OF_BAD_PORT_USAGE) + assert (node2.is_started) - self.assertEqual(rm_carriage_returns(node2.safe_psql("SELECT 2;")), b'2\n') + assert (rm_carriage_returns(node2.safe_psql("SELECT 2;")) == b'2\n') # node1 is still working - self.assertEqual(node1.port, node1_port_copy) - self.assertTrue(node1._should_free_port) - self.assertEqual(rm_carriage_returns(node1.safe_psql("SELECT 3;")), b'3\n') + assert (node1.port == node1_port_copy) + assert (node1._should_free_port) + assert (rm_carriage_returns(node1.safe_psql("SELECT 3;")) == b'3\n') def test_port_conflict(self): assert testgres.PostgresNode._C_MAX_START_ATEMPTS > 1 @@ -1207,35 +1306,36 @@ def test_port_conflict(self): with get_new_node() as node1: node1.init().start() - 
self.assertTrue(node1._should_free_port) - self.assertEqual(type(node1.port), int) # noqa: E721 + assert (node1._should_free_port) + assert (type(node1.port) == int) # noqa: E721 node1_port_copy = node1.port - self.assertEqual(rm_carriage_returns(node1.safe_psql("SELECT 1;")), b'1\n') + assert (rm_carriage_returns(node1.safe_psql("SELECT 1;")) == b'1\n') with __class__.tagPortManagerProxy(node1.port, C_COUNT_OF_BAD_PORT_USAGE): assert __class__.tagPortManagerProxy.sm_DummyPortNumber == node1.port with get_new_node() as node2: - self.assertTrue(node2._should_free_port) - self.assertEqual(node2.port, node1.port) + assert (node2._should_free_port) + assert (node2.port == node1.port) - with self.assertRaises(StartNodeException) as ctx: + with pytest.raises( + expected_exception=StartNodeException, + match=re.escape("Cannot start node after multiple attempts") + ): node2.init().start() - self.assertIn("Cannot start node after multiple attempts", str(ctx.exception)) - - self.assertEqual(node2.port, node1.port) - self.assertTrue(node2._should_free_port) - self.assertEqual(__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage, 1) - self.assertEqual(__class__.tagPortManagerProxy.sm_DummyPortTotalUsage, C_COUNT_OF_BAD_PORT_USAGE) - self.assertFalse(node2.is_started) + assert (node2.port == node1.port) + assert (node2._should_free_port) + assert (__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage == 1) + assert (__class__.tagPortManagerProxy.sm_DummyPortTotalUsage == C_COUNT_OF_BAD_PORT_USAGE) + assert not (node2.is_started) # node2 must release our dummyPort (node1.port) - self.assertEqual(__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage, 0) + assert (__class__.tagPortManagerProxy.sm_DummyPortCurrentUsage == 0) # node1 is still working - self.assertEqual(node1.port, node1_port_copy) - self.assertTrue(node1._should_free_port) - self.assertEqual(rm_carriage_returns(node1.safe_psql("SELECT 3;")), b'3\n') + assert (node1.port == node1_port_copy) + assert (node1._should_free_port) + assert (rm_carriage_returns(node1.safe_psql("SELECT 3;")) == b'3\n') def test_simple_with_bin_dir(self): with get_new_node() as node: @@ -1300,29 +1400,28 @@ def test_set_auto_conf(self): content = f.read() for x in testData: - self.assertIn( - x[0] + " = " + x[2], - content, - x[0] + " stored wrong" - ) - - -if __name__ == '__main__': - if os.environ.get('ALT_CONFIG'): - suite = unittest.TestSuite() - - # Small subset of tests for alternative configs (PG_BIN or PG_CONFIG) - suite.addTest(TestgresTests('test_pg_config')) - suite.addTest(TestgresTests('test_pg_ctl')) - suite.addTest(TestgresTests('test_psql')) - suite.addTest(TestgresTests('test_replicate')) - - print('Running tests for alternative config:') - for t in suite: - print(t) - print() - - runner = unittest.TextTestRunner() - runner.run(suite) - else: - unittest.main() + assert x[0] + " = " + x[2] in content + + @staticmethod + def helper__skip_test_if_util_not_exist(name: str): + assert type(name) == str # noqa: E721 + + if platform.system().lower() == "windows": + name2 = name + ".exe" + else: + name2 = name + + if not util_exists(name2): + pytest.skip('might be missing') + + @staticmethod + def helper__skip_test_if_pg_version_is_not_ge(version: str): + assert type(version) == str # noqa: E721 + if not pg_version_ge(version): + pytest.skip('requires {0}+'.format(version)) + + @staticmethod + def helper__skip_test_if_pg_version_is_ge(version: str): + assert type(version) == str # noqa: E721 + if pg_version_ge(version): + pytest.skip('requires 
<{0}'.format(version)) diff --git a/tests/test_simple_remote.py b/tests/test_simple_remote.py index 8b44623a..e7cc5e5c 100755 --- a/tests/test_simple_remote.py +++ b/tests/test_simple_remote.py @@ -1,22 +1,21 @@ -#!/usr/bin/env python # coding: utf-8 - import os import re import subprocess import tempfile -import testgres import time import six -import unittest +import pytest import psutil import logging.config from contextlib import contextmanager -from testgres.exceptions import \ +from .. import testgres + +from ..testgres.exceptions import \ InitNodeException, \ StartNodeException, \ ExecUtilException, \ @@ -26,38 +25,33 @@ TestgresException, \ InvalidOperationException -from testgres.config import \ +from ..testgres.config import \ TestgresConfig, \ configure_testgres, \ scoped_config, \ pop_config, testgres_config -from testgres import \ +from ..testgres import \ NodeStatus, \ ProcessType, \ IsolationLevel, \ get_remote_node, \ RemoteOperations -from testgres import \ +from ..testgres import \ get_bin_path, \ get_pg_config, \ get_pg_version -from testgres import \ +from ..testgres import \ First, \ Any # NOTE: those are ugly imports -from testgres import bound_ports -from testgres.utils import PgVer -from testgres.node import ProcessProxy, ConnectionParams - -conn_params = ConnectionParams(host=os.getenv('RDBMS_TESTPOOL1_HOST') or '127.0.0.1', - username=os.getenv('USER'), - ssh_key=os.getenv('RDBMS_TESTPOOL_SSHKEY')) -os_ops = RemoteOperations(conn_params) -testgres_config.set_os_ops(os_ops=os_ops) +from ..testgres import bound_ports +from ..testgres.utils import PgVer +from ..testgres.utils import file_tail +from ..testgres.node import ProcessProxy, ConnectionParams def pg_version_ge(version): @@ -68,16 +62,16 @@ def pg_version_ge(version): def util_exists(util): def good_properties(f): - return (os_ops.path_exists(f) and # noqa: W504 - os_ops.isfile(f) and # noqa: W504 - os_ops.is_executable(f)) # yapf: disable + return (testgres_config.os_ops.path_exists(f) and # noqa: W504 + testgres_config.os_ops.isfile(f) and # noqa: W504 + testgres_config.os_ops.is_executable(f)) # yapf: disable # try to resolve it if good_properties(get_bin_path(util)): return True # check if util is in PATH - for path in os_ops.environ("PATH").split(os_ops.pathsep): + for path in testgres_config.os_ops.environ("PATH").split(testgres_config.os_ops.pathsep): if good_properties(os.path.join(path, util)): return True @@ -87,37 +81,56 @@ def removing(f): try: yield f finally: - if os_ops.isfile(f): - os_ops.remove_file(f) + if testgres_config.os_ops.isfile(f): + testgres_config.os_ops.remove_file(f) + + elif testgres_config.os_ops.isdir(f): + testgres_config.os_ops.rmdirs(f, ignore_errors=True) - elif os_ops.isdir(f): - os_ops.rmdirs(f, ignore_errors=True) +class TestgresRemoteTests: + sm_conn_params = ConnectionParams( + host=os.getenv('RDBMS_TESTPOOL1_HOST') or '127.0.0.1', + username=os.getenv('USER'), + ssh_key=os.getenv('RDBMS_TESTPOOL_SSHKEY')) + + sm_os_ops = RemoteOperations(sm_conn_params) + + @pytest.fixture(autouse=True, scope="class") + def implicit_fixture(self): + prev_ops = testgres_config.os_ops + assert prev_ops is not None + assert __class__.sm_os_ops is not None + testgres_config.set_os_ops(os_ops=__class__.sm_os_ops) + assert testgres_config.os_ops is __class__.sm_os_ops + yield + assert testgres_config.os_ops is __class__.sm_os_ops + testgres_config.set_os_ops(os_ops=prev_ops) + assert testgres_config.os_ops is prev_ops -class TestgresRemoteTests(unittest.TestCase): def test_node_repr(self): 
- with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: pattern = r"PostgresNode\(name='.+', port=.+, base_dir='.+'\)" - self.assertIsNotNone(re.match(pattern, str(node))) + assert re.match(pattern, str(node)) is not None def test_custom_init(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: # enable page checksums node.init(initdb_params=['-k']).start() - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init( allow_streaming=True, initdb_params=['--auth-local=reject', '--auth-host=reject']) hba_file = os.path.join(node.data_dir, 'pg_hba.conf') - lines = os_ops.readlines(hba_file) + lines = node.os_ops.readlines(hba_file) # check number of lines - self.assertGreaterEqual(len(lines), 6) + assert (len(lines) >= 6) # there should be no trust entries at all - self.assertFalse(any('trust' in s for s in lines)) + assert not (any('trust' in s for s in lines)) def test_init__LANG_С(self): # PBCKP-1744 @@ -126,7 +139,7 @@ def test_init__LANG_С(self): try: os.environ["LANG"] = "C" - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init().start() finally: __class__.helper__restore_envvar("LANG", prev_LANG) @@ -167,9 +180,9 @@ def test_init__unk_LANG_and_LC_CTYPE(self): while True: try: - with get_remote_node(conn_params=conn_params): + with __class__.helper__get_node(): pass - except testgres.exceptions.ExecUtilException as e: + except ExecUtilException as e: # # Example of an error message: # @@ -193,88 +206,89 @@ def test_init__unk_LANG_and_LC_CTYPE(self): __class__.helper__restore_envvar("LC_COLLATE", prev_LC_COLLATE) def test_double_init(self): - with get_remote_node(conn_params=conn_params).init() as node: + with __class__.helper__get_node().init() as node: # can't initialize node more than once - with self.assertRaises(InitNodeException): + with pytest.raises(expected_exception=InitNodeException): node.init() def test_init_after_cleanup(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init().start().execute('select 1') node.cleanup() node.init().start().execute('select 1') - @unittest.skipUnless(util_exists('pg_resetwal'), 'might be missing') - @unittest.skipUnless(pg_version_ge('9.6'), 'requires 9.6+') def test_init_unique_system_id(self): # this function exists in PostgreSQL 9.6+ + __class__.helper__skip_test_if_util_not_exist("pg_resetwal") + __class__.helper__skip_test_if_pg_version_is_not_ge('9.6') + query = 'select system_identifier from pg_control_system()' with scoped_config(cache_initdb=False): - with get_remote_node(conn_params=conn_params).init().start() as node0: + with __class__.helper__get_node().init().start() as node0: id0 = node0.execute(query)[0] with scoped_config(cache_initdb=True, cached_initdb_unique=True) as config: - self.assertTrue(config.cache_initdb) - self.assertTrue(config.cached_initdb_unique) + assert (config.cache_initdb) + assert (config.cached_initdb_unique) # spawn two nodes; ids must be different - with get_remote_node(conn_params=conn_params).init().start() as node1, \ - get_remote_node(conn_params=conn_params).init().start() as node2: + with __class__.helper__get_node().init().start() as node1, \ + __class__.helper__get_node().init().start() as node2: id1 = node1.execute(query)[0] id2 = node2.execute(query)[0] # ids must increase - self.assertGreater(id1, id0) - 
self.assertGreater(id2, id1) + assert (id1 > id0) + assert (id2 > id1) def test_node_exit(self): - with self.assertRaises(QueryException): - with get_remote_node(conn_params=conn_params).init() as node: + with pytest.raises(expected_exception=QueryException): + with __class__.helper__get_node().init() as node: base_dir = node.base_dir node.safe_psql('select 1') # we should save the DB for "debugging" - self.assertTrue(os_ops.path_exists(base_dir)) - os_ops.rmdirs(base_dir, ignore_errors=True) + assert (__class__.sm_os_ops.path_exists(base_dir)) + __class__.sm_os_ops.rmdirs(base_dir, ignore_errors=True) - with get_remote_node(conn_params=conn_params).init() as node: + with __class__.helper__get_node().init() as node: base_dir = node.base_dir # should have been removed by default - self.assertFalse(os_ops.path_exists(base_dir)) + assert not (__class__.sm_os_ops.path_exists(base_dir)) def test_double_start(self): - with get_remote_node(conn_params=conn_params).init().start() as node: + with __class__.helper__get_node().init().start() as node: # can't start node more than once node.start() - self.assertTrue(node.is_started) + assert (node.is_started) def test_uninitialized_start(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: # node is not initialized yet - with self.assertRaises(StartNodeException): + with pytest.raises(expected_exception=StartNodeException): node.start() def test_restart(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init().start() # restart, ok res = node.execute('select 1') - self.assertEqual(res, [(1,)]) + assert (res == [(1,)]) node.restart() res = node.execute('select 2') - self.assertEqual(res, [(2,)]) + assert (res == [(2,)]) # restart, fail - with self.assertRaises(StartNodeException): + with pytest.raises(expected_exception=StartNodeException): node.append_conf('pg_hba.conf', 'DUMMY') node.restart() def test_reload(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init().start() # change client_min_messages and save old value @@ -286,108 +300,109 @@ def test_reload(self): # check new value cmm_new = node.execute('show client_min_messages') - self.assertEqual('debug1', cmm_new[0][0].lower()) - self.assertNotEqual(cmm_old, cmm_new) + assert ('debug1' == cmm_new[0][0].lower()) + assert (cmm_old != cmm_new) def test_pg_ctl(self): - with get_remote_node(conn_params=conn_params) as node: + with __class__.helper__get_node() as node: node.init().start() status = node.pg_ctl(['status']) - self.assertTrue('PID' in status) + assert ('PID' in status) def test_status(self): - self.assertTrue(NodeStatus.Running) - self.assertFalse(NodeStatus.Stopped) - self.assertFalse(NodeStatus.Uninitialized) + assert (NodeStatus.Running) + assert not (NodeStatus.Stopped) + assert not (NodeStatus.Uninitialized) # check statuses after each operation - with get_remote_node(conn_params=conn_params) as node: - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Uninitialized) + with __class__.helper__get_node() as node: + assert (node.pid == 0) + assert (node.status() == NodeStatus.Uninitialized) node.init() - self.assertEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Stopped) + assert (node.pid == 0) + assert (node.status() == NodeStatus.Stopped) node.start() - self.assertNotEqual(node.pid, 0) - self.assertEqual(node.status(), NodeStatus.Running) + assert (node.pid != 0) 
+            assert (node.status() == NodeStatus.Running)

             node.stop()
-            self.assertEqual(node.pid, 0)
-            self.assertEqual(node.status(), NodeStatus.Stopped)
+            assert (node.pid == 0)
+            assert (node.status() == NodeStatus.Stopped)

             node.cleanup()
-            self.assertEqual(node.pid, 0)
-            self.assertEqual(node.status(), NodeStatus.Uninitialized)
+            assert (node.pid == 0)
+            assert (node.status() == NodeStatus.Uninitialized)

     def test_psql(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        with __class__.helper__get_node().init().start() as node:

             # check returned values (1 arg)
             res = node.psql('select 1')
-            self.assertEqual(res, (0, b'1\n', b''))
+            assert (res == (0, b'1\n', b''))

             # check returned values (2 args)
             res = node.psql('postgres', 'select 2')
-            self.assertEqual(res, (0, b'2\n', b''))
+            assert (res == (0, b'2\n', b''))

             # check returned values (named)
             res = node.psql(query='select 3', dbname='postgres')
-            self.assertEqual(res, (0, b'3\n', b''))
+            assert (res == (0, b'3\n', b''))

             # check returned values (1 arg)
             res = node.safe_psql('select 4')
-            self.assertEqual(res, b'4\n')
+            assert (res == b'4\n')

             # check returned values (2 args)
             res = node.safe_psql('postgres', 'select 5')
-            self.assertEqual(res, b'5\n')
+            assert (res == b'5\n')

             # check returned values (named)
             res = node.safe_psql(query='select 6', dbname='postgres')
-            self.assertEqual(res, b'6\n')
+            assert (res == b'6\n')

             # check feeding input
             node.safe_psql('create table horns (w int)')
             node.safe_psql('copy horns from stdin (format csv)', input=b"1\n2\n3\n\\.\n")
             _sum = node.safe_psql('select sum(w) from horns')
-            self.assertEqual(_sum, b'6\n')
+            assert (_sum == b'6\n')

             # check psql's default args, fails
-            with self.assertRaises(QueryException):
+            with pytest.raises(expected_exception=QueryException):
                 node.psql()

             node.stop()

             # check psql on stopped node, fails
-            with self.assertRaises(QueryException):
+            with pytest.raises(expected_exception=QueryException):
                 node.safe_psql('select 1')

     def test_safe_psql__expect_error(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        with __class__.helper__get_node().init().start() as node:
             err = node.safe_psql('select_or_not_select 1', expect_error=True)
-            self.assertTrue(type(err) == str)  # noqa: E721
-            self.assertIn('select_or_not_select', err)
-            self.assertIn('ERROR: syntax error at or near "select_or_not_select"', err)
+            assert (type(err) == str)  # noqa: E721
+            assert ('select_or_not_select' in err)
+            assert ('ERROR: syntax error at or near "select_or_not_select"' in err)

             # ---------
-            with self.assertRaises(InvalidOperationException) as ctx:
+            with pytest.raises(
+                expected_exception=InvalidOperationException,
+                match="^" + re.escape("Exception was expected, but query finished successfully: `select 1;`.") + "$"
+            ):
                 node.safe_psql("select 1;", expect_error=True)

-            self.assertEqual(str(ctx.exception), "Exception was expected, but query finished successfully: `select 1;`.")
-
             # ---------
             res = node.safe_psql("select 1;", expect_error=False)
-            self.assertEqual(res, b'1\n')
+            assert (res == b'1\n')

     def test_transactions(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        with __class__.helper__get_node().init().start() as node:
             with node.connect() as con:
                 con.begin()
                 con.execute('create table test(val int)')
@@ -397,12 +412,12 @@ def test_transactions(self):
                 con.begin()
                 con.execute('insert into test values (2)')
                 res = con.execute('select * from test order by val asc')
-                self.assertListEqual(res, [(1,), (2,)])
+                assert (res == [(1,), (2,)])
                 con.rollback()

                 con.begin()
                 res = con.execute('select * from test')
-                self.assertListEqual(res, [(1,)])
+                assert (res == [(1,)])
                 con.rollback()

                 con.begin()
@@ -410,25 +425,25 @@ def test_transactions(self):
                 con.commit()

     def test_control_data(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:

             # node is not initialized yet
-            with self.assertRaises(ExecUtilException):
+            with pytest.raises(expected_exception=ExecUtilException):
                 node.get_control_data()

             node.init()
             data = node.get_control_data()

             # check returned dict
-            self.assertIsNotNone(data)
-            self.assertTrue(any('pg_control' in s for s in data.keys()))
+            assert data is not None
+            assert (any('pg_control' in s for s in data.keys()))

     def test_backup_simple(self):
-        with get_remote_node(conn_params=conn_params) as master:
+        with __class__.helper__get_node() as master:

             # enable streaming for backups
             master.init(allow_streaming=True)

             # node must be running
-            with self.assertRaises(BackupException):
+            with pytest.raises(expected_exception=BackupException):
                 master.backup()

             # it's time to start node
@@ -440,23 +455,23 @@ def test_backup_simple(self):
             with master.backup(xlog_method='stream') as backup:
                 with backup.spawn_primary().start() as slave:
                     res = slave.execute('select * from test order by i asc')
-                    self.assertListEqual(res, [(1,), (2,), (3,), (4,)])
+                    assert (res == [(1,), (2,), (3,), (4,)])

     def test_backup_multiple(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

             with node.backup(xlog_method='fetch') as backup1, \
                     node.backup(xlog_method='fetch') as backup2:
-                self.assertNotEqual(backup1.base_dir, backup2.base_dir)
+                assert (backup1.base_dir != backup2.base_dir)

             with node.backup(xlog_method='fetch') as backup:
                 with backup.spawn_primary('node1', destroy=False) as node1, \
                         backup.spawn_primary('node2', destroy=False) as node2:
-                    self.assertNotEqual(node1.base_dir, node2.base_dir)
+                    assert (node1.base_dir != node2.base_dir)

     def test_backup_exhaust(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

             with node.backup(xlog_method='fetch') as backup:
@@ -465,19 +480,21 @@ def test_backup_exhaust(self):
                    pass

                 # now let's try to create one more node
-                with self.assertRaises(BackupException):
+                with pytest.raises(expected_exception=BackupException):
                     backup.spawn_primary()

     def test_backup_wrong_xlog_method(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

-            with self.assertRaises(BackupException,
-                                   msg='Invalid xlog_method "wrong"'):
+            with pytest.raises(
+                expected_exception=BackupException,
+                match="^" + re.escape('Invalid xlog_method "wrong"') + "$"
+            ):
                 node.backup(xlog_method='wrong')

     def test_pg_ctl_wait_option(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init().start(wait=False)
             while True:
                 try:
@@ -489,23 +506,24 @@ def test_pg_ctl_wait_option(self):
                     pass

     def test_replicate(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

             with node.replicate().start() as replica:
                 res = replica.execute('select 1')
-                self.assertListEqual(res, [(1,)])
+                assert (res == [(1,)])

                 node.execute('create table test (val int)', commit=True)
                 replica.catchup()

                 res = node.execute('select * from test')
-                self.assertListEqual(res, [])
+                assert (res == [])

-    @unittest.skipUnless(pg_version_ge('9.6'), 'requires 9.6+')
     def test_synchronous_replication(self):
-        with get_remote_node(conn_params=conn_params) as master:
+        __class__.helper__skip_test_if_pg_version_is_not_ge("9.6")
+
+        with __class__.helper__get_node() as master:
             old_version = not pg_version_ge('9.6')

             master.init(allow_streaming=True).start()
@@ -519,12 +537,12 @@ def test_synchronous_replication(self):
             standby2.start()

             # check formatting
-            self.assertEqual(
-                '1 ("{}", "{}")'.format(standby1.name, standby2.name),
-                str(First(1, (standby1, standby2))))  # yapf: disable
-            self.assertEqual(
-                'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name),
-                str(Any(1, (standby1, standby2))))  # yapf: disable
+            assert (
+                '1 ("{}", "{}")'.format(standby1.name, standby2.name) == str(First(1, (standby1, standby2)))
+            )  # yapf: disable
+            assert (
+                'ANY 1 ("{}", "{}")'.format(standby1.name, standby2.name) == str(Any(1, (standby1, standby2)))
+            )  # yapf: disable

             # set synchronous_standby_names
             master.set_synchronous_standbys(First(2, [standby1, standby2]))
@@ -542,11 +560,12 @@ def test_synchronous_replication(self):
             master.safe_psql(
                 'insert into abc select generate_series(1, 1000000)')
             res = standby1.safe_psql('select count(*) from abc')
-            self.assertEqual(res, b'1000000\n')
+            assert (res == b'1000000\n')

-    @unittest.skipUnless(pg_version_ge('10'), 'requires 10+')
     def test_logical_replication(self):
-        with get_remote_node(conn_params=conn_params) as node1, get_remote_node(conn_params=conn_params) as node2:
+        __class__.helper__skip_test_if_pg_version_is_not_ge("10")
+
+        with __class__.helper__get_node() as node1, __class__.helper__get_node() as node2:
             node1.init(allow_logical=True)
             node1.start()
             node2.init().start()
@@ -564,7 +583,7 @@ def test_logical_replication(self):
             # wait until changes apply on subscriber and check them
             sub.catchup()
             res = node2.execute('select * from test')
-            self.assertListEqual(res, [(1, 1), (2, 2)])
+            assert (res == [(1, 1), (2, 2)])

             # disable and put some new data
             sub.disable()
@@ -574,7 +593,7 @@ def test_logical_replication(self):
             sub.enable()
             sub.catchup()
             res = node2.execute('select * from test')
-            self.assertListEqual(res, [(1, 1), (2, 2), (3, 3)])
+            assert (res == [(1, 1), (2, 2), (3, 3)])

             # Add new tables. Since we added "all tables" to publication
             # (default behaviour of publish() method) we don't need
@@ -588,7 +607,7 @@ def test_logical_replication(self):
             node1.safe_psql('insert into test2 values (\'a\'), (\'b\')')
             sub.catchup()
             res = node2.execute('select * from test2')
-            self.assertListEqual(res, [('a',), ('b',)])
+            assert (res == [('a',), ('b',)])

             # drop subscription
             sub.drop()
@@ -602,21 +621,22 @@ def test_logical_replication(self):
             node1.safe_psql('insert into test values (4, 4)')
             sub.catchup()
             res = node2.execute('select * from test')
-            self.assertListEqual(res, [(1, 1), (2, 2), (3, 3), (4, 4)])
+            assert (res == [(1, 1), (2, 2), (3, 3), (4, 4)])

             # explicitly add table
-            with self.assertRaises(ValueError):
+            with pytest.raises(expected_exception=ValueError):
                 pub.add_tables([])  # fail
             pub.add_tables(['test2'])
             node1.safe_psql('insert into test2 values (\'c\')')
             sub.catchup()
             res = node2.execute('select * from test2')
-            self.assertListEqual(res, [('a',), ('b',)])
+            assert (res == [('a',), ('b',)])

-    @unittest.skipUnless(pg_version_ge('10'), 'requires 10+')
     def test_logical_catchup(self):
         """ Runs catchup for 100 times to be sure that it is consistent """
-        with get_remote_node(conn_params=conn_params) as node1, get_remote_node(conn_params=conn_params) as node2:
+        __class__.helper__skip_test_if_pg_version_is_not_ge("10")
+
+        with __class__.helper__get_node() as node1, __class__.helper__get_node() as node2:
             node1.init(allow_logical=True)
             node1.start()
             node2.init().start()
@@ -633,39 +653,37 @@ def test_logical_catchup(self):
                 node1.execute('insert into test values ({0}, {0})'.format(i))
                 sub.catchup()
                 res = node2.execute('select * from test')
-                self.assertListEqual(res, [(
-                    i,
-                    i,
-                )])
+                assert (res == [(i, i, )])
                 node1.execute('delete from test')

-    @unittest.skipIf(pg_version_ge('10'), 'requires <10')
     def test_logical_replication_fail(self):
-        with get_remote_node(conn_params=conn_params) as node:
-            with self.assertRaises(InitNodeException):
+        __class__.helper__skip_test_if_pg_version_is_ge("10")
+
+        with __class__.helper__get_node() as node:
+            with pytest.raises(expected_exception=InitNodeException):
                 node.init(allow_logical=True)

     def test_replication_slots(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

             with node.replicate(slot='slot1').start() as replica:
                 replica.execute('select 1')

                 # cannot create new slot with the same name
-                with self.assertRaises(TestgresException):
+                with pytest.raises(expected_exception=TestgresException):
                     node.replicate(slot='slot1')

     def test_incorrect_catchup(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(allow_streaming=True).start()

             # node has no master, can't catch up
-            with self.assertRaises(TestgresException):
+            with pytest.raises(expected_exception=TestgresException):
                 node.catchup()

     def test_promotion(self):
-        with get_remote_node(conn_params=conn_params) as master:
+        with __class__.helper__get_node() as master:
             master.init().start()
             master.safe_psql('create table abc(id serial)')
@@ -676,35 +694,35 @@ def test_promotion(self):
                 # make standby becomes writable master
                 replica.safe_psql('insert into abc values (1)')
                 res = replica.safe_psql('select * from abc')
-                self.assertEqual(res, b'1\n')
+                assert (res == b'1\n')

     def test_dump(self):
         query_create = 'create table test as select generate_series(1, 2) as val'
         query_select = 'select * from test order by val asc'

-        with get_remote_node(conn_params=conn_params).init().start() as node1:
+        with __class__.helper__get_node().init().start() as node1:
             node1.execute(query_create)
             for format in ['plain', 'custom', 'directory', 'tar']:
                 with removing(node1.dump(format=format)) as dump:
-                    with get_remote_node(conn_params=conn_params).init().start() as node3:
+                    with __class__.helper__get_node().init().start() as node3:
                         if format == 'directory':
-                            self.assertTrue(os_ops.isdir(dump))
+                            assert (node1.os_ops.isdir(dump))
                         else:
-                            self.assertTrue(os_ops.isfile(dump))
+                            assert (node1.os_ops.isfile(dump))
                         # restore dump
                         node3.restore(filename=dump)
                         res = node3.execute(query_select)
-                        self.assertListEqual(res, [(1,), (2,)])
+                        assert (res == [(1,), (2,)])

     def test_users(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        with __class__.helper__get_node().init().start() as node:
             node.psql('create role test_user login')
             value = node.safe_psql('select 1', username='test_user')
-            self.assertEqual(b'1\n', value)
+            assert (b'1\n' == value)

     def test_poll_query_until(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init().start()

             get_time = 'select extract(epoch from now())'
@@ -714,15 +732,15 @@ def test_poll_query_until(self):
             node.poll_query_until(query=check_time.format(start_time))
             end_time = node.execute(get_time)[0][0]

-            self.assertTrue(end_time - start_time >= 5)
+            assert (end_time - start_time >= 5)

             # check 0 columns
-            with self.assertRaises(QueryException):
+            with pytest.raises(expected_exception=QueryException):
                 node.poll_query_until(
                     query='select from pg_catalog.pg_class limit 1')

             # check None, fail
-            with self.assertRaises(QueryException):
+            with pytest.raises(expected_exception=QueryException):
                 node.poll_query_until(query='create table abc (val int)')

             # check None, ok
@@ -735,7 +753,7 @@ def test_poll_query_until(self):
                                   expected=None)

             # check arbitrary expected value, fail
-            with self.assertRaises(TimeoutException):
+            with pytest.raises(expected_exception=TimeoutException):
                 node.poll_query_until(query='select 3',
                                       expected=1,
                                       max_attempts=3,
@@ -745,17 +763,17 @@ def test_poll_query_until(self):
             node.poll_query_until(query='select 2', expected=2)

             # check timeout
-            with self.assertRaises(TimeoutException):
+            with pytest.raises(expected_exception=TimeoutException):
                 node.poll_query_until(query='select 1 > 2',
                                       max_attempts=3,
                                       sleep_time=0.01)

             # check ProgrammingError, fail
-            with self.assertRaises(testgres.ProgrammingError):
+            with pytest.raises(expected_exception=testgres.ProgrammingError):
                 node.poll_query_until(query='dummy1')

             # check ProgrammingError, ok
-            with self.assertRaises(TimeoutException):
+            with pytest.raises(expected_exception=TimeoutException):
                 node.poll_query_until(query='dummy2',
                                       max_attempts=3,
                                       sleep_time=0.01,
@@ -808,17 +826,18 @@ def test_logging(self):
             # check that master's port is found
             with open(logfile.name, 'r') as log:
                 lines = log.readlines()
-                self.assertTrue(any(node_name in s for s in lines))
+                assert (any(node_name in s for s in lines))

             # test logger after stop/start/restart
             master.stop()
             master.start()
             master.restart()
-            self.assertTrue(master._logger.is_alive())
+            assert (master._logger.is_alive())

-    @unittest.skipUnless(util_exists('pgbench'), 'might be missing')
     def test_pgbench(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        __class__.helper__skip_test_if_util_not_exist("pgbench")
+
+        with __class__.helper__get_node().init().start() as node:

             # initialize pgbench DB and run benchmarks
             node.pgbench_init(scale=2, foreign_keys=True,
                               options=['-q']).pgbench_run(time=2)
@@ -828,13 +847,13 @@ def test_pgbench(self):
                                  stderr=subprocess.STDOUT,
                                  options=['-T3'])
             out = proc.communicate()[0]
-            self.assertTrue(b'tps = ' in out)
+            assert (b'tps = ' in out)

     def test_pg_config(self):
         # check same instances
         a = get_pg_config()
         b = get_pg_config()
-        self.assertEqual(id(a), id(b))
+        assert (id(a) == id(b))

         # save right before config change
         c1 = get_pg_config()
@@ -842,26 +861,26 @@ def test_pg_config(self):
         # modify setting for this scope
         with scoped_config(cache_pg_config=False) as config:
             # sanity check for value
-            self.assertFalse(config.cache_pg_config)
+            assert not (config.cache_pg_config)

             # save right after config change
             c2 = get_pg_config()

             # check different instances after config change
-            self.assertNotEqual(id(c1), id(c2))
+            assert (id(c1) != id(c2))

             # check different instances
             a = get_pg_config()
             b = get_pg_config()
-            self.assertNotEqual(id(a), id(b))
+            assert (id(a) != id(b))

     def test_config_stack(self):
         # no such option
-        with self.assertRaises(TypeError):
+        with pytest.raises(expected_exception=TypeError):
             configure_testgres(dummy=True)

         # we have only 1 config in stack
-        with self.assertRaises(IndexError):
+        with pytest.raises(expected_exception=IndexError):
             pop_config()

         d0 = TestgresConfig.cached_initdb_dir
@@ -869,54 +888,52 @@ def test_config_stack(self):
         d2 = 'dummy_def'

         with scoped_config(cached_initdb_dir=d1) as c1:
-            self.assertEqual(c1.cached_initdb_dir, d1)
+            assert (c1.cached_initdb_dir == d1)

             with scoped_config(cached_initdb_dir=d2) as c2:
                 stack_size = len(testgres.config.config_stack)

                 # try to break a stack
-                with self.assertRaises(TypeError):
+                with pytest.raises(expected_exception=TypeError):
                     with scoped_config(dummy=True):
                         pass

-                self.assertEqual(c2.cached_initdb_dir, d2)
-                self.assertEqual(len(testgres.config.config_stack), stack_size)
+                assert (c2.cached_initdb_dir == d2)
+                assert (len(testgres.config.config_stack) == stack_size)

-            self.assertEqual(c1.cached_initdb_dir, d1)
+            assert (c1.cached_initdb_dir == d1)

-        self.assertEqual(TestgresConfig.cached_initdb_dir, d0)
+        assert (TestgresConfig.cached_initdb_dir == d0)

     def test_unix_sockets(self):
-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             node.init(unix_sockets=False, allow_streaming=True)
             node.start()

             res_exec = node.execute('select 1')
             res_psql = node.safe_psql('select 1')
-            self.assertEqual(res_exec, [(1,)])
-            self.assertEqual(res_psql, b'1\n')
+            assert (res_exec == [(1,)])
+            assert (res_psql == b'1\n')

             with node.replicate().start() as r:
                 res_exec = r.execute('select 1')
                 res_psql = r.safe_psql('select 1')
-                self.assertEqual(res_exec, [(1,)])
-                self.assertEqual(res_psql, b'1\n')
+                assert (res_exec == [(1,)])
+                assert (res_psql == b'1\n')

     def test_auto_name(self):
-        with get_remote_node(conn_params=conn_params).init(allow_streaming=True).start() as m:
+        with __class__.helper__get_node().init(allow_streaming=True).start() as m:
             with m.replicate().start() as r:
                 # check that nodes are running
-                self.assertTrue(m.status())
-                self.assertTrue(r.status())
+                assert (m.status())
+                assert (r.status())

                 # check their names
-                self.assertNotEqual(m.name, r.name)
-                self.assertTrue('testgres' in m.name)
-                self.assertTrue('testgres' in r.name)
+                assert (m.name != r.name)
+                assert ('testgres' in m.name)
+                assert ('testgres' in r.name)

     def test_file_tail(self):
-        from testgres.utils import file_tail
-
         s1 = "the quick brown fox jumped over that lazy dog\n"
         s2 = "abc\n"
         s3 = "def\n"
@@ -931,16 +948,16 @@ def test_file_tail(self):
             f.seek(0)
             lines = file_tail(f, 3)
-            self.assertEqual(lines[0], s1)
-            self.assertEqual(lines[1], s2)
-            self.assertEqual(lines[2], s3)
+            assert (lines[0] == s1)
+            assert (lines[1] == s2)
+            assert (lines[2] == s3)

             f.seek(0)
             lines = file_tail(f, 1)
-            self.assertEqual(lines[0], s3)
+            assert (lines[0] == s3)

     def test_isolation_levels(self):
-        with get_remote_node(conn_params=conn_params).init().start() as node:
+        with __class__.helper__get_node().init().start() as node:
             with node.connect() as con:
                 # string levels
                 con.begin('Read Uncommitted').commit()
@@ -955,24 +972,24 @@ def test_isolation_levels(self):
                 con.begin(IsolationLevel.Serializable).commit()

                 # check wrong level
-                with self.assertRaises(QueryException):
+                with pytest.raises(expected_exception=QueryException):
                     con.begin('Garbage').commit()

     def test_ports_management(self):
         # check that no ports have been bound yet
-        self.assertEqual(len(bound_ports), 0)
+        assert (len(bound_ports) == 0)

-        with get_remote_node(conn_params=conn_params) as node:
+        with __class__.helper__get_node() as node:
             # check that we've just bound a port
-            self.assertEqual(len(bound_ports), 1)
+            assert (len(bound_ports) == 1)

             # check that bound_ports contains our port
             port_1 = list(bound_ports)[0]
             port_2 = node.port
-            self.assertEqual(port_1, port_2)
+            assert (port_1 == port_2)

         # check that port has been freed successfully
-        self.assertEqual(len(bound_ports), 0)
+        assert (len(bound_ports) == 0)

     def test_exceptions(self):
         str(StartNodeException('msg', [('file', 'lines')]))
@@ -987,18 +1004,18 @@ def test_version_management(self):
         e = PgVer('15rc1')
         f = PgVer('15beta4')

-        self.assertTrue(a == b)
-        self.assertTrue(b > c)
-        self.assertTrue(a > c)
-        self.assertTrue(d > e)
-        self.assertTrue(e > f)
-        self.assertTrue(d > f)
+        assert (a == b)
+        assert (b > c)
+        assert (a > c)
+        assert (d > e)
+        assert (e > f)
+        assert (d > f)

         version = get_pg_version()
-        with get_remote_node(conn_params=conn_params) as node:
-            self.assertTrue(isinstance(version, six.string_types))
-            self.assertTrue(isinstance(node.version, PgVer))
-            self.assertEqual(node.version, PgVer(version))
+        with __class__.helper__get_node() as node:
+            assert (isinstance(version, six.string_types))
+            assert (isinstance(node.version, PgVer))
+            assert (node.version == PgVer(version))

     def test_child_pids(self):
         master_processes = [
@@ -1021,51 +1038,132 @@ def test_child_pids(self):
             ProcessType.WalReceiver,
         ]

-        with get_remote_node(conn_params=conn_params).init().start() as master:
+        def LOCAL__test_auxiliary_pids(
+            node: testgres.PostgresNode,
+            expectedTypes: list[ProcessType]
+        ) -> list[ProcessType]:
+            # returns a list of the absent processes
+            assert node is not None
+            assert type(node) == testgres.PostgresNode  # noqa: E721
+            assert expectedTypes is not None
+            assert type(expectedTypes) == list  # noqa: E721
+
+            pids = node.auxiliary_pids
+            assert pids is not None  # noqa: E721
+            assert type(pids) == dict  # noqa: E721
+
+            result = list[ProcessType]()
+            for ptype in expectedTypes:
+                if not (ptype in pids):
+                    result.append(ptype)
+            return result
+
+        def LOCAL__check_auxiliary_pids__multiple_attempts(
+            node: testgres.PostgresNode,
+            expectedTypes: list[ProcessType]):
+            assert node is not None
+            assert type(node) == testgres.PostgresNode  # noqa: E721
+            assert expectedTypes is not None
+            assert type(expectedTypes) == list  # noqa: E721
+
+            nAttempt = 0
+
+            while nAttempt < 5:
+                nAttempt += 1
+
+                logging.info("Test pids of [{0}] node. Attempt #{1}.".format(
+                    node.name,
+                    nAttempt
+                ))
+
+                if nAttempt > 1:
+                    time.sleep(1)
+
+                absenceList = LOCAL__test_auxiliary_pids(node, expectedTypes)
+                assert absenceList is not None
+                assert type(absenceList) == list  # noqa: E721
+                if len(absenceList) == 0:
+                    logging.info("Bingo!")
+                    return
+
+                logging.info("These processes are not found: {0}.".format(absenceList))
+                continue

+            raise Exception("Node {0} does not have the following processes: {1}.".format(
+                node.name,
+                absenceList
+            ))
+
+        with __class__.helper__get_node().init().start() as master:

             # master node doesn't have a source walsender!
-            with self.assertRaises(TestgresException):
+            with pytest.raises(expected_exception=TestgresException):
                 master.source_walsender

             with master.connect() as con:
-                self.assertGreater(con.pid, 0)
+                assert (con.pid > 0)

             with master.replicate().start() as replica:

                 # test __str__ method
                 str(master.child_processes[0])

-                master_pids = master.auxiliary_pids
-                for ptype in master_processes:
-                    self.assertIn(ptype, master_pids)
+                LOCAL__check_auxiliary_pids__multiple_attempts(
+                    master,
+                    master_processes)

-                replica_pids = replica.auxiliary_pids
-                for ptype in repl_processes:
-                    self.assertIn(ptype, replica_pids)
+                LOCAL__check_auxiliary_pids__multiple_attempts(
+                    replica,
+                    repl_processes)
+
+                master_pids = master.auxiliary_pids

                 # there should be exactly 1 source walsender for replica
-                self.assertEqual(len(master_pids[ProcessType.WalSender]), 1)
+                assert (len(master_pids[ProcessType.WalSender]) == 1)
                 pid1 = master_pids[ProcessType.WalSender][0]
                 pid2 = replica.source_walsender.pid
-                self.assertEqual(pid1, pid2)
+                assert (pid1 == pid2)

                 replica.stop()

                 # there should be no walsender after we've stopped replica
-                with self.assertRaises(TestgresException):
+                with pytest.raises(expected_exception=TestgresException):
                     replica.source_walsender

+    # TODO: Why doesn't this test work with a remote host?
     def test_child_process_dies(self):
-        # test for FileNotFound exception during child_processes() function
-        with subprocess.Popen(["sleep", "60"]) as process:
-            self.assertEqual(process.poll(), None)
-            # collect list of processes currently running
-            children = psutil.Process(os.getpid()).children()
-            # kill a process, so received children dictionary becomes invalid
-            process.kill()
-            process.wait()
-            # try to handle children list -- missing processes will have ptype "ProcessType.Unknown"
-            [ProcessProxy(p) for p in children]
+        nAttempt = 0
+
+        while True:
+            if nAttempt == 5:
+                raise Exception("Max attempt number is exceeded.")
+
+            nAttempt += 1
+
+            logging.info("Attempt #{0}".format(nAttempt))
+
+            # test for FileNotFound exception during child_processes() function
+            with subprocess.Popen(["sleep", "60"]) as process:
+                r = process.poll()
+
+                if r is not None:
+                    logging.warning("process.poll() returns an unexpected result: {0}.".format(r))
+                    continue
+
+                assert r is None
+                # collect list of processes currently running
+                children = psutil.Process(os.getpid()).children()
+                # kill a process, so received children dictionary becomes invalid
+                process.kill()
+                process.wait()
+                # try to handle children list -- missing processes will have ptype "ProcessType.Unknown"
+                [ProcessProxy(p) for p in children]
+                break
+
+    @staticmethod
+    def helper__get_node():
+        assert __class__.sm_conn_params is not None
+        return get_remote_node(conn_params=__class__.sm_conn_params)

     @staticmethod
     def helper__restore_envvar(name, prev_value):
@@ -1074,23 +1172,20 @@ def helper__restore_envvar(name, prev_value):
         else:
             os.environ[name] = prev_value

+    @staticmethod
+    def helper__skip_test_if_util_not_exist(name: str):
+        assert type(name) == str  # noqa: E721
+        if not util_exists(name):
+            pytest.skip('might be missing')

-if __name__ == '__main__':
-    if os_ops.environ('ALT_CONFIG'):
-        suite = unittest.TestSuite()
-
-        # Small subset of tests for alternative configs (PG_BIN or PG_CONFIG)
-        suite.addTest(TestgresRemoteTests('test_pg_config'))
-        suite.addTest(TestgresRemoteTests('test_pg_ctl'))
-        suite.addTest(TestgresRemoteTests('test_psql'))
-        suite.addTest(TestgresRemoteTests('test_replicate'))
-
-        print('Running tests for alternative config:')
-        for t in suite:
-            print(t)
-        print()
+    @staticmethod
+    def helper__skip_test_if_pg_version_is_not_ge(version: str):
+        assert type(version) == str  # noqa: E721
+        if not pg_version_ge(version):
+            pytest.skip('requires {0}+'.format(version))

-        runner = unittest.TextTestRunner()
-        runner.run(suite)
-    else:
-        unittest.main()
+    @staticmethod
+    def helper__skip_test_if_pg_version_is_ge(version: str):
+        assert type(version) == str  # noqa: E721
+        if pg_version_ge(version):
+            pytest.skip('requires <{0}'.format(version))