Skip to content
Snippets Groups Projects
Commit baf15672 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: reconnect to momdb if connection lost

parent 4406a3fa
Branches
Tags
No related merge requests found
......@@ -21,8 +21,10 @@ with RPC(busname, 'GetProjectDetails') as getProjectDetails:
from os import stat
import sys
import logging
import time
from optparse import OptionParser
from mysql import connector
from mysql.connector.errors import OperationalError
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
......@@ -56,7 +58,52 @@ def _isListOfInts(items):
class MoMDatabaseWrapper:
'''handler class for details query in mom db'''
def __init__(self, dbcreds):
self.conn = connector.connect(**dbcreds.mysql_connect_options())
self.dbcreds = dbcreds
self.conn = None
def _connect(self):
connect_options = self.dbcreds.mysql_connect_options()
connect_options['connection_timeout'] = 5
try:
self.conn = connector.connect(**connect_options)
except Exception as e:
logger.error(str(e))
self.conn = None
def ensureConnection(self):
if not self.conn:
self._connect()
try:
# try a simple select
# if it fails, reconnect
cursor = self.conn.cursor()
cursor.execute('''SELECT * FROM lofar_mom3.project;''')
cursor.fetchall()
except (OperationalError, AttributeError) as e:
if isinstance(e, OperationalError):
logger.error(str(e))
for i in range(5):
logger.info("retrying to connect to mom database")
self._connect()
if self.conn:
logger.info("connected to mom database")
break
time.sleep(i*i)
def _executeQuery(self, query):
def doQuery(connection):
cursor = connection.cursor(dictionary=True)
cursor.execute(query)
return cursor.fetchall()
try:
return doQuery(self.conn)
except (OperationalError, AttributeError) as e:
if isinstance(e, OperationalError):
logger.error(str(e))
self.ensureConnection()
return doQuery(self.conn)
def getProjectDetails(self, mom_ids):
''' get the project details (project_mom2id, project_name,
......@@ -65,7 +112,6 @@ class MoMDatabaseWrapper:
:param mixed mom_ids comma seperated string of mom2object id's, or list of ints
:rtype list of dict's key value pairs with the project details
'''
if _isListOfInts(mom_ids):
ids = mom_ids
else:
......@@ -79,7 +125,6 @@ class MoMDatabaseWrapper:
logger.info("Query for mom id%s: %s" %
('\'s' if len(ids) > 1 else '', ids_str))
cursor = self.conn.cursor(dictionary=True)
# TODO: make a view for this query in momdb!
query = '''SELECT project.mom2id as project_mom2id, project.name as project_name, project.description as project_description,
object.mom2id as object_mom2id, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id
......@@ -88,9 +133,7 @@ class MoMDatabaseWrapper:
where object.mom2id in (%s)
order by project_mom2id
''' % (ids_str,)
cursor.execute(query)
rows = cursor.fetchall()
rows = self._executeQuery(query)
logger.info("Found %d results for mom id%s: %s" %
(len(rows), '\'s' if len(ids) > 1 else '', ids_str))
......@@ -110,7 +153,6 @@ class MoMDatabaseWrapper:
last_user_name, statustime)
:rtype list of dict's key value pairs with all projects
'''
cursor = self.conn.cursor(dictionary=True)
# TODO: make a view for this query in momdb!
query = '''SELECT project.mom2id as mom2id, project.name as name, project.description as description,
lofar_mom3.statustype.code as status_name, lofar_mom3.statustype.id as status_id,
......@@ -121,9 +163,7 @@ class MoMDatabaseWrapper:
where project.mom2objecttype='PROJECT'
order by mom2id;
'''
cursor.execute(query)
return cursor.fetchall()
return self._executeQuery(query)
class ProjectDetailsQueryHandler(MessageHandlerInterface):
......@@ -131,7 +171,7 @@ class ProjectDetailsQueryHandler(MessageHandlerInterface):
:param MoMDatabaseWrapper momdb inject database access via wrapper
'''
def __init__(self, **kwargs):
MessageHandlerInterface.__init__(self, **kwargs)
super(ProjectDetailsQueryHandler, self).__init__(**kwargs)
self.dbcreds = kwargs.pop("dbcreds", None)
self.service2MethodMap = {
......@@ -169,7 +209,7 @@ def createService(busname=DEFAULT_BUSNAME,
return Service(servicename,
handler,
busname=busname,
numthreads=1,
numthreads=2,
use_service_methods=True,
verbose=False,
broker=broker,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment