diff --git a/LCS/PyCommon/postgres.py b/LCS/PyCommon/postgres.py index da15f193da1d4760f5fb627e826c00d86dea0869..3f47c8c568b1b2f180c91a440025ae0e179ca26a 100644 --- a/LCS/PyCommon/postgres.py +++ b/LCS/PyCommon/postgres.py @@ -97,6 +97,15 @@ FETCH_NONE=0 FETCH_ONE=1 FETCH_ALL=2 +class PostgresDBError(Exception): + pass + +class PostgresDBConnectionError(PostgresDBError): + pass + +class PostgresDBQueryExecutionError(PostgresDBError): + pass + class PostgresDatabaseConnection: def __init__(self, dbcreds: DBCredentials, @@ -107,7 +116,6 @@ class PostgresDatabaseConnection: self.dbcreds = dbcreds self._connection = None self._log_queries = log_queries - self.__connection_retries = 0 self.__auto_commit_selects = auto_commit_selects self.__num_connect_retries = num_connect_retries self.__connect_retry_interval = connect_retry_interval @@ -117,7 +125,7 @@ class PostgresDatabaseConnection: logger.debug("already connected to database: %s", self.dbcreds.stringWithHiddenPassword()) return - for retry_cntr in range(self.__num_connect_retries): + for retry_cntr in range(self.__num_connect_retries+1): try: logger.debug("trying to connect to database using: %s", self.dbcreds.stringWithHiddenPassword()) @@ -144,13 +152,24 @@ class PostgresDatabaseConnection: # we have a proper connection, so return return - except Exception as e: - logger.error(e) - if retry_cntr == self.__num_connect_retries-1: - raise - - logger.info('retrying to connect to %s in %s seconds', self.database, self.__connect_retry_interval) - time.sleep(self.__connect_retry_interval) + except psycopg2.DatabaseError as dbe: + error_string = str(dbe).replace('\n', ' ') + logger.error(error_string) + + # see https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE + if (isinstance(dbe, psycopg2.OperationalError) and re.search('connection', str(dbe), re.IGNORECASE)) or \ + (dbe.pgcode is not None and (dbe.pgcode.startswith('08') or dbe.pgcode.startswith('57P') or dbe.pgcode.startswith('53'))): + # try to reconnect on connection-like-errors + if retry_cntr == self.__num_connect_retries: + raise PostgresDBConnectionError("%s Error while connecting to %s. error=%s" % (self.__class__.__name__, + self.dbcreds.stringWithHiddenPassword(), + error_string)) + + logger.info('retrying to connect to %s in %s seconds', self.database, self.__connect_retry_interval) + time.sleep(self.__connect_retry_interval) + else: + # non-connection-error, raise generic PostgresDBError + raise PostgresDBError(error_string) def disconnect(self): if self.is_connected: @@ -168,7 +187,7 @@ class PostgresDatabaseConnection: @property def is_connected(self) -> bool: - return self._connection is not None + return self._connection is not None and self._connection.closed==0 def reconnect(self): self.disconnect() @@ -183,7 +202,8 @@ class PostgresDatabaseConnection: '''disconnects from the database''' self.disconnect() - def _queryAsSingleLine(self, query, qargs=None): + @staticmethod + def _queryAsSingleLine(query, qargs=None): line = ' '.join(query.replace('\n', ' ').split()) if qargs: line = line % tuple(['\'%s\'' % a if isinstance(a, str) else a for a in qargs]) @@ -193,7 +213,8 @@ class PostgresDatabaseConnection: '''execute the query and reconnect upon OperationalError''' try: # make sure we're connected - self.connect() + if not self.is_connected: + self.connect() query_log_line = self._queryAsSingleLine(query, qargs) @@ -228,17 +249,29 @@ class PostgresDatabaseConnection: except Exception as e: logger.error("error while fetching result(s) for %s: %s", query_log_line, e) - except psycopg2.OperationalError as e: - # TODO: should only retry query on connection errors - logger.error(str(e)) - while self.__connection_retries < 5: - logger.info("(re)trying to connect to database") - self.__connection_retries += 1 - self.connect() - if self._connection: - self.__connection_retries = 0 - return self.executeQuery(query, qargs, fetch) - time.sleep(i*i) + except psycopg2.OperationalError as oe: + error_string = str(oe).replace('\n', ' ') + + # see https://www.postgresql.org/docs/current/errcodes-appendix.html#ERRCODES-TABLE + if oe.pgcode.startswith('08') or oe.pgcode.startswith('57P') or oe.pgcode.startswith('53'): + # some connection error occured. + # try to reconnect a few times and execute the query againg when the connection returns + logger.warning(error_string) + + for retry_cntr in range(self.__num_connect_retries + 1): + try: + self.reconnect() + except PostgresDBConnectionError as e: + logger.error(e) + time.sleep(self.__connect_retry_interval) + else: + # hey, reconnect worked, and re-execute the query + # WARNING: possible stack-overflow, but that's the least of our problems compared to a lost db connection... + return self.executeQuery(query, qargs, fetch) + else: + # raise psycopg2 wrapped in our own PostgresDBQueryExecutionError + raise PostgresDBQueryExecutionError("Could not execute query '%s' error=%s" % (query_log_line, error_string)) + except (psycopg2.IntegrityError, psycopg2.ProgrammingError, psycopg2.InternalError, psycopg2.DataError) as e: self._log_database_notifications() logger.error("Rolling back query=\'%s\' due to error: \'%s\'" % (query_log_line, e)) diff --git a/LCS/PyCommon/test/CMakeLists.txt b/LCS/PyCommon/test/CMakeLists.txt index 6309f036769b6cb3b76377b36c9c9df46fdf24e1..b7c00cd2d37fcfd4ab607d4b5ffb8dd136fec65c 100644 --- a/LCS/PyCommon/test/CMakeLists.txt +++ b/LCS/PyCommon/test/CMakeLists.txt @@ -13,3 +13,4 @@ lofar_add_test(t_methodtrigger) lofar_add_test(t_util) lofar_add_test(t_test_utils) lofar_add_test(t_cep4_utils) +lofar_add_test(t_postgres) diff --git a/LCS/PyCommon/test/t_postgres.py b/LCS/PyCommon/test/t_postgres.py new file mode 100755 index 0000000000000000000000000000000000000000..0d065ff954fff68ed0a1cc4d300171ec382d28cb --- /dev/null +++ b/LCS/PyCommon/test/t_postgres.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +import unittest +from unittest import mock +from lofar.common.postgres import * +import testing.postgresql +import psycopg2 +import signal +from lofar.common.dbcredentials import Credentials + +import logging +logger = logging.getLogger(__name__) + +def setUpModule(): + pass + +def tearDownModule(): + pass + +class TestPostgres(unittest.TestCase): + def setUp(self): + logger.debug("starting test-postgres-database-instance...") + self.pgfactory = testing.postgresql.PostgresqlFactory() + self.pginstance = self.pgfactory() + + self.dbcreds = Credentials() + self.dbcreds.user = 'test_pg_user' + self.dbcreds.password = 'test_pg_password' # cannot be empty... + + dsn = self.pginstance.dsn() + + # update credentials from current testing pginstance (e.g. port changes for each test) + self.dbcreds.host = dsn['host'] + self.dbcreds.database = dsn['database'] + self.dbcreds.port = dsn['port'] + + # use 'normal' psycopg2 API to connect and setup the database, + # not the PostgresDatabaseConnection class, because that's the object-under-test. + logger.debug("connecting to %s test-postgres-database-instance...", self.dbcreds.stringWithHiddenPassword()) + with psycopg2.connect(**dsn) as connection: + with connection.cursor() as cursor: + logger.debug("creating database user and tables in %s test-postgres-database-instance...", self.dbcreds.stringWithHiddenPassword()) + cursor.execute("CREATE USER %s WITH SUPERUSER PASSWORD '%s';" % (self.dbcreds.user,self.dbcreds.password)) + cursor.execute("CREATE TABLE IF NOT EXISTS foo (id serial NOT NULL, bar text NOT NULL, PRIMARY KEY (id));") + connection.commit() + + logger.info("created and started %s test-postgres-database-instance", self.dbcreds.stringWithHiddenPassword()) + + def tearDown(self): + logger.debug("stopping %s test-postgres-database-instance...", self.dbcreds.stringWithHiddenPassword()) + self.pginstance.stop() + logger.info("stopped %s test-postgres-database-instance", self.dbcreds.stringWithHiddenPassword()) + + def test_connection_error_on_stopped_pginstance(self): + # force to pginstance to stop so we cannot connect to it. + self.pginstance.stop() + logger.info("stopped %s test-postgres-database-instance", self.dbcreds.stringWithHiddenPassword()) + + # test if connecting fails + with mock.patch('lofar.common.postgres.logger') as mocked_logger: + with self.assertRaises(PostgresDBConnectionError): + NUM_CONNECT_RETRIES = 2 + db = PostgresDatabaseConnection(dbcreds=self.dbcreds, connect_retry_interval=0.1, num_connect_retries=NUM_CONNECT_RETRIES) + db.connect() + + # check logging + self.assertEqual(NUM_CONNECT_RETRIES, len([ca for ca in mocked_logger.info.call_args_list if 'retrying to connect' in ca[0][0]])) + self.assertEqual(NUM_CONNECT_RETRIES+1, len([ca for ca in mocked_logger.debug.call_args_list if 'trying to connect to database' in ca[0][0]])) + self.assertEqual(NUM_CONNECT_RETRIES+1, len([ca for ca in mocked_logger.error.call_args_list if 'could not connect' in ca[0][0]])) + + def test_reconnect_on_connection_loss(self): + + # define a helper class on top of the normal PostgresDatabaseConnection + # which is able to restart the testing-pginstance after the first reconnect attempt (for testing purposes) + # the object under test is still the PostgresDatabaseConnection! + class HelperPostgresDatabaseConnection(PostgresDatabaseConnection): + def __init__(self, dbcreds: DBCredentials, pginstance): + super().__init__(dbcreds, log_queries=True, num_connect_retries=1, connect_retry_interval=.1) + self.pginstance = pginstance + self.reconnect_was_called = False + + def reconnect(self): + # do normal reconnect (won't work because the pginstance is stopped...) + try: + super().reconnect() + #ignore any exceptions here, should be handled by the normal PostgresDatabaseConnection + finally: + self.reconnect_was_called = True + if not self.pginstance.is_alive(): + # restart the pginstance, so any next reconnect will work + logger.info("restarting test-postgres-database-instance...") + self.pginstance.start() + logger.info("restarted test-postgres-database-instance") + + with HelperPostgresDatabaseConnection(dbcreds=self.dbcreds, pginstance=self.pginstance) as db: + # insert some test data + db.executeQuery("INSERT INTO foo (bar) VALUES ('my_value');") + db.commit() + + # do normal select query, should work. + result = db.executeQuery("SELECT * from foo;", fetch=FETCH_ALL) + self.assertEqual([{'id':1, 'bar': 'my_value'}], result) + + # terminate the pginstance (simulating a production database malfunction) + self.pginstance.terminate(signal.SIGTERM) + logger.info("terminated %s test-postgres-database-instance", self.dbcreds.stringWithHiddenPassword()) + + # prove that the database is down by trying to connect which results in a PostgresDBConnectionError + with self.assertRaises(PostgresDBConnectionError): + with PostgresDatabaseConnection(dbcreds=self.dbcreds, num_connect_retries=0): + pass + + # do normal select query on our original connection again (even though the database itself is down) + # should work anyway, because the executeQuery tries to reconnect, + # and our helper class restarts the database server after the first reconnect attempt. + result2 = db.executeQuery("SELECT * from foo;", fetch=FETCH_ALL) + self.assertEqual([{'id':1, 'bar': 'my_value'}], result2) + self.assertTrue(db.reconnect_was_called) + + +logging.basicConfig(format='%(asctime)s %(levelname)s %(process)s %(threadName)s %(message)s', level=logging.DEBUG) + +if __name__ == "__main__": + unittest.main() diff --git a/LCS/PyCommon/test/t_postgres.run b/LCS/PyCommon/test/t_postgres.run new file mode 100755 index 0000000000000000000000000000000000000000..05c53f094a11a78c708480291e70cdda3c25a44f --- /dev/null +++ b/LCS/PyCommon/test/t_postgres.run @@ -0,0 +1,4 @@ +#!/bin/bash +source python-coverage.sh + +python_coverage_test postgres t_postgres.py diff --git a/LCS/PyCommon/test/t_postgres.sh b/LCS/PyCommon/test/t_postgres.sh new file mode 100755 index 0000000000000000000000000000000000000000..34699f2f49dfbe079eaa1fb5d7068c6dcfc699ab --- /dev/null +++ b/LCS/PyCommon/test/t_postgres.sh @@ -0,0 +1,2 @@ +#!/bin/sh +./runctest.sh t_postgres