Skip to content
Snippets Groups Projects
Select Git revision
  • e3e9f86edfc855ae564c70d1159fe1612cd3c487
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

momqueryservice.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    momqueryservice.py 13.44 KiB
    #!/usr/bin/python
    # $Id$
    
    '''
    Simple Service listening on momqueryservice.GetProjectDetails
    which gives the project details for each requested mom object id
    
    Example usage:
    service side: just run this service somewhere where it can access the momdb and
    a qpid broker.
    Make sure the bus exists: qpid-config add exchange topic <busname>
    
    client side: do an RPC call to the <busname>.GetProjectDetails with a
    comma separated string of mom2object id's as argument.
    You get a dict of mom2id to project-details-dict back.
    
    with RPC(busname, 'GetProjectDetails') as getProjectDetails:
        res, status = getProjectDetails(ids_string)
    
    '''
    from os import stat
    import sys
    import logging
    import time
    from optparse import OptionParser
    from mysql import connector
    from mysql.connector.errors import OperationalError
    from lofar.messaging import Service
    from lofar.messaging.Service import MessageHandlerInterface
    from lofar.common.util import waitForInterrupt
    from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
    from lofar.common import dbcredentials
    
    # Module-level logger shared by everything in this file.
    # Note: it is named after this file's path (__file__), not the module name.
    logger=logging.getLogger(__file__)
    
    def _idsFromString(id_string):
        '''Parse a comma separated string into a list of integer ids.

        Tokens which are not purely numeric are dropped, so the result only
        contains ints and can safely be put into an sql "in (...)" clause.

        :param string id_string: comma separated ids, for example "1, 2,3"
        :rtype: list of ints (empty when no numeric token was found)
        :raises ValueError: when id_string is not a string
        '''
        try:
            string_types = basestring  # Python 2: matches both str and unicode
        except NameError:
            string_types = str  # Python 3: basestring no longer exists

        if not isinstance(id_string, string_types):
            raise ValueError('Expected a string, got a ' + str(type(id_string)))

        # parse text: it should contain a list of ints
        # filter out everything else to prevent sql injection
        ids = []
        for token in id_string.split(','):
            token = token.strip()
            if not token.isdigit():
                continue
            try:
                ids.append(int(token))
            except ValueError:
                # some unicode digits (e.g. superscripts) pass isdigit() but
                # are rejected by int(); skip them like any other junk token
                continue
        return ids
    
    def _isListOfInts(items):
        '''Tell whether items is a non-empty list which holds only ints.

        :param items: any object
        :rtype: bool, False for empty or None input, for containers which are
                not a list, and for lists with at least one non-int member
        '''
        return (bool(items)
                and isinstance(items, list)
                and all(isinstance(item, int) for item in items))
    
    def _toIdsString(mom_ids):
        '''Normalise the given id(s) into one comma separated string of ids.

        :param mixed mom_ids: a single int, a list of ints,
                              or a comma separated string of ids
        :rtype: string like "1,2,3"
        :raises ValueError: when no ids could be found in mom_ids
        '''
        if isinstance(mom_ids, int):
            id_list = [mom_ids]
        else:
            # anything which is not a proper list of ints is parsed as a string
            id_list = mom_ids if _isListOfInts(mom_ids) else _idsFromString(mom_ids)

        if id_list:
            return ','.join(str(mom_id) for mom_id in id_list)

        raise ValueError("Could not find proper ids in: " + mom_ids)
    
    class MoMDatabaseWrapper:
        '''Wrapper around the MoM MySQL database which runs the queries of this service.

        All ids given to the query methods are normalised via _toIdsString, so
        only integer ids end up in the sql statements.
        '''

        # number of times a query is attempted before the error is passed on
        MAX_QUERY_TRIES = 3

        def __init__(self, dbcreds):
            '''
            :param dbcreds: credentials object for the MoM database; it has to provide
                            mysql_connect_options() and stringWithHiddenPassword()
            '''
            self.dbcreds = dbcreds
            self.conn = None

        def _connect(self):
            '''(Re)connect to the database.
            Any previous connection is closed first.
            On failure the error is logged and self.conn is left at None.
            '''
            if self.conn:
                try:
                    self.conn.close()
                except Exception as e:
                    # a broken connection may fail to close properly
                    logger.warning("Could not close previous connection: %s", e)
                finally:
                    # always drop the old connection, so a retry starts from scratch
                    self.conn = None

            connect_options = self.dbcreds.mysql_connect_options()
            connect_options['connection_timeout'] = 5
            try:
                logger.info("Connecting to %s", self.dbcreds.stringWithHiddenPassword())
                self.conn = connector.connect(**connect_options)
                logger.debug("Connected to %s", self.dbcreds.stringWithHiddenPassword())
            except Exception as e:
                logger.error(str(e))
                self.conn = None

        def _executeQuery(self, query):
            '''Execute the given query and return all resulting rows.

            :param string query: the sql query to execute
            :rtype: list of dicts, one dict (column name -> value) per row
            :raises: the OperationalError or AttributeError of the last attempt
                     when all MAX_QUERY_TRIES attempts failed
            '''
            # try to execute query on flaky lofar mysql connection
            # max of MAX_QUERY_TRIES tries, on success return result
            # use new connection for every query,
            # because on the flaky lofar network a connection may appear functional but returns improper results.
            for attempt in range(1, self.MAX_QUERY_TRIES + 1):
                try:
                    self._connect()
                    # self.conn is None when connecting failed;
                    # the resulting AttributeError then triggers the next attempt
                    cursor = self.conn.cursor(dictionary=True)
                    cursor.execute(query)
                    return cursor.fetchall()
                except (OperationalError, AttributeError) as e:
                    logger.error(str(e))
                    if attempt == self.MAX_QUERY_TRIES:
                        # pass the real error on to the caller, instead of returning None
                        # which made the callers fail with an unrelated TypeError
                        raise

        @staticmethod
        def _predecessorMomIds(pred_string):
            '''Extract the ids from the value of a predecessor column.

            The value is split on commas; only the tokens which start with an 'M'
            are used, and of those the part after the 'M' is returned.

            :param string pred_string: value of the predecessor column, may be None or empty
            :rtype: list of strings
            '''
            if not pred_string:
                return []

            tokens = [x.strip() for x in pred_string.split(',')]
            # startswith also copes with empty tokens (for example from a trailing comma),
            # for which indexing the first character raises an IndexError
            return [token[1:] for token in tokens if token.startswith('M')]

        def getProjectDetails(self, mom_ids):
            ''' get the project details (project_mom2id, project_name,
            project_description, object_mom2id, object_name, object_description,
            object_type, object_group_id, object_group_name, object_status) for given mom object mom_ids
            :param mixed mom_ids: comma separated string of mom2object id's, or list of ints
            :rtype: dict which maps each found mom2id (as string) to a dict with its details,
                    empty dict when no mom_ids are given
            '''
            if not mom_ids:
                return {}

            ids_str = _toIdsString(mom_ids)

            logger.info("getProjectDetails for mom ids: %s", ids_str)

            # TODO: make a view for this query in momdb!
            query = '''SELECT project.mom2id as project_mom2id, project.id as project_mom2objectid, project.name as project_name, project.description as project_description,
            object.mom2id as object_mom2id, object.id as object_mom2objectid, object.name as object_name, object.description as object_description, object.mom2objecttype as object_type, object.group_id as object_group_id, grp.id as object_group_mom2objectid, grp.name as object_group_name,
            status.code as object_status
            FROM mom2object as object
            left join mom2object as project on project.id = object.ownerprojectid
            left join mom2object as grp on grp.mom2id = object.group_id
            left join mom2objectstatus as mostatus on object.currentstatusid = mostatus.id
            inner join status on mostatus.statusid = status.id
            where object.mom2id in (%s)
            order by project_mom2id
            ''' % (ids_str,)
            rows = self._executeQuery(query)

            logger.info("Found %d results for mom id(s): %s", len(rows), ids_str)

            result = {}
            for row in rows:
                result[str(row['object_mom2id'])] = dict(row)

            logger.info(result)

            return result

        def getProjects(self):
            ''' get the list of all projects with columns (mom2id, name,
            description, status_name, status_id, last_user_id,
            last_user_name, statustime)
            :rtype: list of dicts, one dict (column name -> value) per project
            '''
            # TODO: make a view for this query in momdb!
            query = '''SELECT project.mom2id as mom2id, project.name as name, project.description as description,
                    statustype.code as status_name,  statustype.id as status_id,
            status.userid as last_user_id, status.name as last_user_name, status.statustime as statustime
            FROM mom2object as project
            left join mom2objectstatus as status on project.currentstatusid = status.id
            left join status as statustype on status.statusid=statustype.id
            where project.mom2objecttype='PROJECT'
            order by mom2id;
            '''
            result = self._executeQuery(query)

            logger.info("Found %d projects", len(result))

            return result

        def getPredecessorIds(self, mom_ids):
            ''' get the predecessor ids for the given mom_ids
            :param mixed mom_ids: comma separated string of mom2object id's, or list of ints
            :rtype: dict which maps each requested mom2id (as string) to a list of
                    predecessor ids (ints); the list is empty when there are none
            '''
            if not mom_ids:
                return {}

            ids_str = _toIdsString(mom_ids)

            logger.info("getPredecessorIds for mom ids: %s", ids_str)

            query = '''SELECT mom2id, predecessor
            FROM mom2object
            where mom2id in (%s)
            order by mom2id;
            ''' % (ids_str,)
            rows = self._executeQuery(query)

            result = {}
            for row in rows:
                pred_ids = self._predecessorMomIds(row['predecessor'])
                result[str(row['mom2id'])] = [int(x) for x in pred_ids if x.isdigit()]

            # requested ids which are not in the database get an empty list as well
            for mom2id in ids_str.split(','):
                result.setdefault(mom2id, [])

            logger.info('predecessors: %s', result)

            return result

        def getSuccessorIds(self, mom_ids):
            ''' get the successor ids for the given mom_ids,
            i.e. the ids of the objects which have the given id as predecessor
            :param mixed mom_ids: comma separated string of mom2object id's, or list of ints
            :rtype: dict which maps each requested mom2id (as string) to a list of
                    successor mom2ids; the list is empty when there are none
            '''
            if not mom_ids:
                return {}

            ids_str = _toIdsString(mom_ids)
            requested_ids = ids_str.split(',')

            logger.info("getSuccessorIds for mom ids: %s", ids_str)

            # LIKE matches on substrings, so a search for M12 also finds M123.
            # The exact matching is done below on the parsed predecessor ids.
            condition = ' OR '.join(['predecessor LIKE \'%%M%s%%\'' % x for x in requested_ids])

            # TODO: make a view for this query in momdb!
            query = '''SELECT mom2id, predecessor
            FROM mom2object
            where %s
            order by mom2id;
            ''' % (condition,)
            rows = self._executeQuery(query)

            result = {}
            for mom2id in requested_ids:
                result[mom2id] = []

            for row in rows:
                suc_mom2id = row['mom2id']
                pred_ids = self._predecessorMomIds(row['predecessor'])
                for mom2id in requested_ids:
                    if mom2id in pred_ids:
                        result[mom2id].append(suc_mom2id)

            logger.info('successors: %s', result)

            return result

        def getTaskIdsInGroup(self, mom_group_ids):
            ''' get the mom2ids of the tasks in the given groups, where a task is an object
            of type LOFAR_OBSERVATION or of a type which contains PIPELINE
            :param mixed mom_group_ids: comma separated string of mom group id's, or list of ints
            :rtype: dict which maps each requested group id (as string) to a list of mom2ids
            '''
            if not mom_group_ids:
                return {}

            ids_str = _toIdsString(mom_group_ids)

            logger.info("getTaskIdsInGroup for mom group ids: %s", ids_str)

            query = '''SELECT mom2id, group_id FROM mom2object
            where group_id in (%s)
            and (mom2objecttype = 'LOFAR_OBSERVATION' or mom2objecttype like \'%%PIPELINE%%\')''' % ids_str

            rows = self._executeQuery(query)

            result = {}
            for group_id in ids_str.split(','):
                result[group_id] = []

            for row in rows:
                result[str(row['group_id'])].append(row['mom2id'])

            logger.info('task ids per group: %s', result)

            return result

        def getDataProducts(self, mom_ids):
            ''' get the dataproducts which have a fileformat for the given mom_ids
            :param mixed mom_ids: comma separated string of mom2object id's, or list of ints
            :rtype: dict which maps each requested mom2id (as string) to a list of dicts with
                    the keys momobject_id, mom2id, id, name, exported, status and fileformat
            '''
            if not mom_ids:
                return {}

            ids_str = _toIdsString(mom_ids)

            logger.info("getDataProducts for mom ids: %s", ids_str)

            query = '''SELECT mo.id as momobject_id, mo.mom2id as mom2id, dp.id, dp.name, dp.exported, dp.status, dp.fileformat
                        FROM mom2object mo
                        inner join dataproduct dp on mo.id = dp.mom2objectid
                        where mo.mom2id in (%s)
                        and not isnull(dp.fileformat)''' % ids_str

            rows = self._executeQuery(query)

            result = {}
            for mom2id in ids_str.split(','):
                result[mom2id] = []

            for row in rows:
                result[str(row['mom2id'])].append(dict(row))

            for mom2id, dps in result.items():
                logger.info('Found %s dataproducts for mom2id %s', len(dps), mom2id)

            return result
    
    class ProjectDetailsQueryHandler(MessageHandlerInterface):
        '''Message handler which answers the calls to the mom query services.

        Each service call is forwarded to a MoMDatabaseWrapper, which is created
        in prepare_loop from the given database credentials.
        :param dbcreds: (keyword argument) credentials for the MoM database, default None
        '''
        def __init__(self, **kwargs):
            super(ProjectDetailsQueryHandler, self).__init__(**kwargs)
            self.dbcreds = kwargs.pop("dbcreds", None)

            # service name -> method of this handler which handles that service
            self.service2MethodMap = dict(
                GetProjects=self.getProjects,
                GetProjectDetails=self.getProjectDetails,
                GetPredecessorIds=self.getPredecessorIds,
                GetSuccessorIds=self.getSuccessorIds,
                GetTaskIdsInGroup=self.getTaskIdsInGroup,
                GetDataProducts=self.getDataProducts)

        def prepare_loop(self):
            '''create the database wrapper which is used by all service methods'''
            self.momdb = MoMDatabaseWrapper(self.dbcreds)

        def getProjectDetails(self, mom_ids):
            '''forward to MoMDatabaseWrapper.getProjectDetails'''
            return self.momdb.getProjectDetails(mom_ids)

        def getProjects(self):
            '''forward to MoMDatabaseWrapper.getProjects'''
            return self.momdb.getProjects()

        def getPredecessorIds(self, mom_ids):
            '''forward to MoMDatabaseWrapper.getPredecessorIds'''
            return self.momdb.getPredecessorIds(mom_ids)

        def getSuccessorIds(self, mom_ids):
            '''forward to MoMDatabaseWrapper.getSuccessorIds'''
            return self.momdb.getSuccessorIds(mom_ids)

        def getTaskIdsInGroup(self, mom_group_ids):
            '''forward to MoMDatabaseWrapper.getTaskIdsInGroup'''
            return self.momdb.getTaskIdsInGroup(mom_group_ids)

        def getDataProducts(self, mom_ids):
            '''forward to MoMDatabaseWrapper.getDataProducts'''
            return self.momdb.getDataProducts(mom_ids)
    
    
    def createService(busname=DEFAULT_MOMQUERY_BUSNAME,
                      servicename=DEFAULT_MOMQUERY_SERVICENAME,
                      dbcreds=None,
                      handler=None,
                      broker=None):
        '''create the mom query service on the given bus
        :param string busname: name of the bus on which this service listens
        :param string servicename: name of the service
        :param Credentials dbcreds: Credentials for the MoM database.
        :param ProjectDetailsQueryHandler handler: ProjectDetailsQueryHandler class Type, or mock like type;
                                                   ProjectDetailsQueryHandler is used when not given
        :param string broker: address of the broker, passed on to the Service as is
        :rtype: lofar.messaging.Service'''

        handler_class = handler or ProjectDetailsQueryHandler

        service_options = {'busname': busname,
                           'numthreads': 1,
                           'use_service_methods': True,
                           'verbose': False,
                           'broker': broker,
                           # the credentials are handed to the handler as keyword argument
                           'handler_args': {'dbcreds' : dbcreds}}

        return Service(servicename, handler_class, **service_options)
    
    
    def main():
        '''Parse the command line options and run the mom query service until interrupted'''

        # the command line options: (flags, keyword arguments for add_option)
        option_specs = [
            (('-q', '--broker'),
             dict(dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')),
            (("-b", "--busname"),
             dict(dest="busname", type="string", default=DEFAULT_MOMQUERY_BUSNAME, help="Name of the bus exchange on the qpid broker, [default: %default]")),
            (("-s", "--servicename"),
             dict(dest="servicename", type="string", default=DEFAULT_MOMQUERY_SERVICENAME, help="Name for this service, [default: %default]")),
        ]

        parser = OptionParser("%prog [options]",
                              description='runs the momqueryservice')
        for flags, spec in option_specs:
            parser.add_option(*flags, **spec)
        parser.add_option_group(dbcredentials.options_group(parser))
        parser.set_defaults(dbcredentials="MoM")
        options, _ = parser.parse_args()

        dbcreds = dbcredentials.parse_options(options)

        logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())

        # start the service and keep listening until interrupted
        service = createService(busname=options.busname,
                                servicename=options.servicename,
                                broker=options.broker,
                                dbcreds=dbcreds)
        with service:
            waitForInterrupt()
    
    if __name__ == '__main__':
        # run as script: log at INFO level with timestamp and level, then start the service
        logging.basicConfig(level=logging.INFO,
                            format='%(asctime)s %(levelname)s %(message)s')
        main()