Select Git revision
momqueryservice.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
momqueryservice.py 13.44 KiB
#!/usr/bin/python
# $Id$
'''
Simple Service listening on momqueryservice.GetProjectDetails
which gives the project details for each requested mom object id
Example usage:
service side: just run this service somewhere where it can access the momdb and
a qpid broker.
Make sure the bus exists: qpid-config add exchange topic <busname>
client side: do a RPC call to the <busname>.GetProjectDetails with a
comma-separated string of mom2object ids as argument.
You get a dict of mom2id to project-details-dict back.
with RPC(busname, 'GetProjectDetails') as getProjectDetails:
res, status = getProjectDetails(ids_string)
'''
from os import stat
import sys
import logging
import time
from optparse import OptionParser
from mysql import connector
from mysql.connector.errors import OperationalError
from lofar.messaging import Service
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.common import dbcredentials
# Module-wide logger. Note: it is named after this file's path (__file__),
# not after the module name.
logger = logging.getLogger(__file__)
def _idsFromString(id_string):
    '''Parse a comma separated string into a list of non-negative ints.

    Elements that are not purely digits (after stripping whitespace) are
    silently dropped. Because the resulting ids are formatted into sql
    queries, this filtering also prevents sql injection.

    Works on both Python 2 (str/unicode) and Python 3 (str).

    :param string id_string: comma separated ids, e.g. '1, 2,3'
    :rtype list of ints
    :raises ValueError: when id_string is not a string
    '''
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3: basestring no longer exists
    if not isinstance(id_string, string_types):
        raise ValueError('Expected a string, got a ' + str(type(id_string)))
    parts = (part.strip() for part in id_string.split(','))
    return [int(part) for part in parts if part.isdigit()]
def _isListOfInts(items):
    '''Return True only for a non-empty list in which every element is an int.

    Emptiness is tested before the type check; any other input gives False.
    '''
    if not items or not isinstance(items, list):
        return False
    return all(isinstance(item, int) for item in items)
def _toIdsString(mom_ids):
    '''Normalize mom_ids into a comma separated string of ints, e.g. '1,2,3'.

    :param mixed mom_ids: a single int, a non-empty list of ints, or a comma
                          separated string (parsed by _idsFromString)
    :rtype string
    :raises ValueError: when no usable ids can be extracted
    '''
    # conditional expressions are evaluated lazily, so the list check and
    # the string parsing only run when the earlier alternatives don't match
    id_list = ([mom_ids] if isinstance(mom_ids, int)
               else mom_ids if _isListOfInts(mom_ids)
               else _idsFromString(mom_ids))
    if not id_list:
        raise ValueError("Could not find proper ids in: " + mom_ids)
    return ','.join(map(str, id_list))
class MoMDatabaseWrapper:
    '''Wrapper around the MoM mysql database, offering the queries served by
    the momqueryservice.

    The query methods accept mom ids as an int, a list of ints, or a comma
    separated string (normalized by _toIdsString).
    '''

    # maximum number of attempts _executeQuery makes for a single query
    _MAX_QUERY_TRIES = 3

    def __init__(self, dbcreds):
        '''
        :param dbcreds: MoM database credentials; must provide
                        mysql_connect_options() and stringWithHiddenPassword()
        '''
        self.dbcreds = dbcreds
        self.conn = None

    @staticmethod
    def _parsePredecessorIds(pred_string):
        '''Parse a predecessor string like 'M123,M456' into the list [123, 456].

        Only entries that start with 'M' followed by digits are kept. Entries
        with another prefix, or empty entries (e.g. caused by a trailing comma),
        are ignored.

        :param string pred_string: comma separated predecessor string, may be None or empty
        :rtype list of ints
        '''
        if not pred_string:
            return []
        entries = [entry.strip() for entry in pred_string.split(',')]
        return [int(entry[1:]) for entry in entries
                if entry.startswith('M') and entry[1:].isdigit()]

    def _disconnect(self):
        '''Close the current connection (if any) and forget it.

        Errors while closing are logged and otherwise ignored: the connection
        may already be broken, and there is nothing more to clean up.
        '''
        if self.conn is not None:
            try:
                self.conn.close()
            except Exception as e:
                logger.warning("Error while closing connection: %s", e)
        self.conn = None

    def _connect(self):
        '''(Re)connect to the database.

        On failure the error is logged and self.conn is left as None.
        '''
        self._disconnect()
        connect_options = self.dbcreds.mysql_connect_options()
        connect_options['connection_timeout'] = 5
        try:
            logger.info("Connecting to %s", self.dbcreds.stringWithHiddenPassword())
            self.conn = connector.connect(**connect_options)
            logger.debug("Connected to %s", self.dbcreds.stringWithHiddenPassword())
        except Exception as e:
            logger.error(str(e))
            self.conn = None

    def _executeQuery(self, query):
        '''Execute query on the flaky lofar mysql connection and return all rows.

        A new connection is used for every attempt, because on the flaky lofar
        network a connection may appear functional but return improper results.
        The query is tried at most _MAX_QUERY_TRIES times, and the connection is
        closed again after each attempt.

        :param string query: the sql query to execute
        :rtype list of dict's (column name -> value), one per result row
        :raises RuntimeError: when all attempts failed
        '''
        last_error = None
        for attempt in range(1, self._MAX_QUERY_TRIES + 1):
            try:
                self._connect()
                # self.conn is None when _connect failed; the resulting
                # AttributeError is treated like any other failed attempt.
                cursor = self.conn.cursor(dictionary=True)
                cursor.execute(query)
                return cursor.fetchall()
            except (OperationalError, AttributeError) as e:
                logger.error("query attempt %d/%d failed: %s",
                             attempt, self._MAX_QUERY_TRIES, e)
                last_error = e
            finally:
                # the rows (if any) are already fetched; don't keep an idle connection
                self._disconnect()
        # previously this fell through and returned None, making every caller
        # crash later with an unrelated TypeError
        raise RuntimeError("Could not execute query after %d attempts: %s" %
                           (self._MAX_QUERY_TRIES, last_error))

    def getProjectDetails(self, mom_ids):
        ''' get the project details (project_mom2id, project_name,
        project_description, object_mom2id, object_name, object_description,
        object_type, object_group_id, object_group_name, object_status) for given mom object mom_ids
        :param mixed mom_ids comma separated string of mom2object id's, or list of ints
        :rtype dict mapping str(object_mom2id) to a dict with the project details
               of that object; ids not found are absent. Empty dict for empty mom_ids.
        '''
        if not mom_ids:
            return {}

        ids_str = _toIdsString(mom_ids)
        logger.info("getProjectDetails for mom ids: %s", ids_str)

        # TODO: make a view for this query in momdb!
        query = '''SELECT project.mom2id as project_mom2id, project.id as project_mom2objectid, project.name as project_name, project.description as project_description,
        object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name,
        status.code as object_status
        FROM mom2object as object
        left join mom2object as project on project.id = object.ownerprojectid
        left join mom2object as grp on grp.mom2id = object.group_id
        left join mom2objectstatus as mostatus on object.currentstatusid = mostatus.id
        inner join status on mostatus.statusid = status.id
        where object.mom2id in (%s)
        order by project_mom2id
        ''' % (ids_str,)
        rows = self._executeQuery(query)

        logger.info("Found %d results for mom id(s): %s", len(rows), ids_str)

        result = dict((str(row['object_mom2id']), dict(row)) for row in rows)

        logger.info(result)
        return result

    def getProjects(self):
        ''' get the list of all projects with columns (project_mom2id, project_name,
        project_description, status_name, status_id, last_user_id,
        last_user_name, statustime)
        :rtype list of dict's key value pairs with all projects
        '''
        # TODO: make a view for this query in momdb!
        query = '''SELECT project.mom2id as mom2id, project.name as name, project.description as description,
        statustype.code as status_name,  statustype.id as status_id,
        status.userid as last_user_id, status.name as last_user_name, status.statustime as statustime
        FROM mom2object as project
        left join mom2objectstatus as status on project.currentstatusid = status.id
        left join status as statustype on status.statusid=statustype.id
        where project.mom2objecttype='PROJECT'
        order by mom2id;
        '''
        result = self._executeQuery(query)
        logger.info("Found %d projects", len(result))
        return result

    def getPredecessorIds(self, mom_ids):
        '''get the predecessor mom2ids for the given mom ids.

        :param mixed mom_ids comma separated string of mom2object id's, or list of ints
        :rtype dict mapping each requested id (as string) to a list of predecessor
               mom2ids (ints); empty list for ids without predecessors or not found.
               Empty dict for empty mom_ids.
        '''
        if not mom_ids:
            return {}

        ids_str = _toIdsString(mom_ids)
        logger.info("getPredecessorIds for mom ids: %s", ids_str)

        query = '''SELECT mom2id, predecessor
        FROM mom2object
        where mom2id in (%s)
        order by mom2id;
        ''' % (ids_str,)
        rows = self._executeQuery(query)

        result = {}
        for row in rows:
            result[str(row['mom2id'])] = self._parsePredecessorIds(row['predecessor'])

        # requested ids that were not found still get an (empty) entry
        for mom2id in ids_str.split(','):
            result.setdefault(mom2id, [])

        logger.info('predecessors: %s', result)
        return result

    def getSuccessorIds(self, mom_ids):
        '''get the successor mom2ids for the given mom ids.

        :param mixed mom_ids comma separated string of mom2object id's, or list of ints
        :rtype dict mapping each requested id (as string) to a list of mom2ids of
               objects listing it as predecessor. Empty dict for empty mom_ids.
        '''
        if not mom_ids:
            return {}

        ids_str = _toIdsString(mom_ids)
        logger.info("getSuccessorIds for mom ids: %s", ids_str)

        requested_ids = ids_str.split(',')
        condition = ' OR '.join(['predecessor LIKE \'%%M%s%%\'' % x for x in requested_ids])

        # TODO: make a view for this query in momdb!
        query = '''SELECT mom2id, predecessor
        FROM mom2object
        where %s
        order by mom2id;
        ''' % (condition,)
        rows = self._executeQuery(query)

        result = dict((mom2id, []) for mom2id in requested_ids)
        for row in rows:
            suc_mom2id = row['mom2id']
            # LIKE '%M123%' also matches e.g. M1234, so check for an exact
            # match against the parsed predecessor ids.
            pred_ids = set(str(pred_id) for pred_id in
                           self._parsePredecessorIds(row['predecessor']))
            for mom2id in requested_ids:
                if mom2id in pred_ids:
                    result[mom2id].append(suc_mom2id)

        logger.info('successors: %s', result)
        return result

    def getTaskIdsInGroup(self, mom_group_ids):
        '''get the mom2ids of the observations and pipelines in the given groups.

        :param mixed mom_group_ids comma separated string of group id's, or list of ints
        :rtype dict mapping each requested group id (as string) to a list of mom2ids.
               Empty dict for empty mom_group_ids.
        '''
        if not mom_group_ids:
            return {}

        ids_str = _toIdsString(mom_group_ids)
        logger.info("getTaskIdsInGroup for mom group ids: %s", ids_str)

        query = '''SELECT mom2id, group_id FROM mom2object
        where group_id in (%s)
        and (mom2objecttype = 'LOFAR_OBSERVATION' or mom2objecttype like \'%%PIPELINE%%\')''' % ids_str

        rows = self._executeQuery(query)

        result = dict((group_id, []) for group_id in ids_str.split(','))
        for row in rows:
            result[str(row['group_id'])].append(row['mom2id'])

        logger.info('task ids per group: %s', result)
        return result

    def getDataProducts(self, mom_ids):
        '''get the dataproducts (with a non-null fileformat) of the given mom ids.

        :param mixed mom_ids comma separated string of mom2object id's, or list of ints
        :rtype dict mapping each requested id (as string) to a list of dataproduct
               dicts. Empty dict for empty mom_ids.
        '''
        if not mom_ids:
            return {}

        ids_str = _toIdsString(mom_ids)
        logger.info("getDataProducts for mom ids: %s", ids_str)

        query = '''SELECT mo.id as momobject_id, mo.mom2id as mom2id, dp.id, dp.name, dp.exported, dp.status, dp.fileformat
        FROM mom2object mo
        inner join dataproduct dp on mo.id = dp.mom2objectid
        where mo.mom2id in (%s)
        and not isnull(dp.fileformat)''' % ids_str

        rows = self._executeQuery(query)

        result = dict((mom2id, []) for mom2id in ids_str.split(','))
        for row in rows:
            result[str(row['mom2id'])].append(dict(row))

        for mom2id, dps in result.items():
            logger.info('Found %s dataproducts for mom2id %s', len(dps), mom2id)

        return result
class ProjectDetailsQueryHandler(MessageHandlerInterface):
    '''Message handler answering the momqueryservice requests by delegating
    each of them to a MoMDatabaseWrapper.

    :param Credentials dbcreds: (keyword argument) credentials for the MoM
                                database; used to create the MoMDatabaseWrapper
                                in prepare_loop
    '''
    def __init__(self, **kwargs):
        super(ProjectDetailsQueryHandler, self).__init__(**kwargs)
        # kwargs is local to this call, so reading instead of popping is equivalent
        self.dbcreds = kwargs.get("dbcreds", None)
        # request name -> method handling that request
        self.service2MethodMap = dict(
            GetProjects=self.getProjects,
            GetProjectDetails=self.getProjectDetails,
            GetPredecessorIds=self.getPredecessorIds,
            GetSuccessorIds=self.getSuccessorIds,
            GetTaskIdsInGroup=self.getTaskIdsInGroup,
            GetDataProducts=self.getDataProducts,
        )

    def prepare_loop(self):
        '''Create the database wrapper (presumably invoked by the Service before
        its message loop starts -- confirm in lofar.messaging).'''
        self.momdb = MoMDatabaseWrapper(self.dbcreds)

    def getProjectDetails(self, mom_ids):
        '''Delegate to MoMDatabaseWrapper.getProjectDetails.'''
        return self.momdb.getProjectDetails(mom_ids)

    def getProjects(self):
        '''Delegate to MoMDatabaseWrapper.getProjects.'''
        return self.momdb.getProjects()

    def getPredecessorIds(self, mom_ids):
        '''Delegate to MoMDatabaseWrapper.getPredecessorIds.'''
        return self.momdb.getPredecessorIds(mom_ids)

    def getSuccessorIds(self, mom_ids):
        '''Delegate to MoMDatabaseWrapper.getSuccessorIds.'''
        return self.momdb.getSuccessorIds(mom_ids)

    def getTaskIdsInGroup(self, mom_group_ids):
        '''Delegate to MoMDatabaseWrapper.getTaskIdsInGroup.'''
        return self.momdb.getTaskIdsInGroup(mom_group_ids)

    def getDataProducts(self, mom_ids):
        '''Delegate to MoMDatabaseWrapper.getDataProducts.'''
        return self.momdb.getDataProducts(mom_ids)
def createService(busname=DEFAULT_MOMQUERY_BUSNAME,
                  servicename=DEFAULT_MOMQUERY_SERVICENAME,
                  dbcreds=None,
                  handler=None,
                  broker=None):
    '''Create the momquery Service (GetProjectDetails and friends) on the given bus.

    :param string busname: name of the bus on which this service listens
    :param string servicename: name of the service
    :param Credentials dbcreds: Credentials for the MoM database.
    :param ProjectDetailsQueryHandler handler: handler class type (or a mock-like
                                               type); defaults to ProjectDetailsQueryHandler
    :param broker: address of the qpid broker, passed on to the Service
    :rtype: lofar.messaging.Service'''
    handler_class = handler or ProjectDetailsQueryHandler
    service_options = dict(busname=busname,
                           numthreads=1,
                           use_service_methods=True,
                           verbose=False,
                           broker=broker,
                           handler_args={'dbcreds': dbcreds})
    return Service(servicename, handler_class, **service_options)
def main():
    '''Starts the momqueryservice.GetProjectDetails service'''
    # parse the command line: bus/broker settings plus the database credentials
    parser = OptionParser("%prog [options]", description='runs the momqueryservice')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None,
                      help='Address of the qpid broker, default: localhost')
    parser.add_option("-b", "--busname", dest="busname", type="string",
                      default=DEFAULT_MOMQUERY_BUSNAME,
                      help="Name of the bus exchange on the qpid broker, [default: %default]")
    parser.add_option("-s", "--servicename", dest="servicename", type="string",
                      default=DEFAULT_MOMQUERY_SERVICENAME,
                      help="Name for this service, [default: %default]")
    parser.add_option_group(dbcredentials.options_group(parser))
    parser.set_defaults(dbcredentials="MoM")
    options, _ = parser.parse_args()

    dbcreds = dbcredentials.parse_options(options)
    logger.info("Using dbcreds: %s", dbcreds.stringWithHiddenPassword())

    # run the service until interrupted
    service = createService(busname=options.busname,
                            servicename=options.servicename,
                            broker=options.broker,
                            dbcreds=dbcreds)
    with service:
        waitForInterrupt()
if __name__ == '__main__':
    # script entry point: configure root logging with timestamps, then run
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    main()