Skip to content
Snippets Groups Projects
Select Git revision
  • 12aa31bbdd147ceb16424d142d62e1701e9ad20e
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

momqueryrpc.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    momqueryrpc.py 17.99 KiB
    #!/usr/bin/python
    
    # Copyright (C) 2017    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    import sys
    import logging
    import pprint
    from optparse import OptionParser
    from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
    from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
    from lofar.common.util import convertStringDigitKeysToInt
    
    ''' Simple RPC client for Service momqueryservice
    '''
    
    logger = logging.getLogger(__name__)
    
    
    class MoMQueryRPC(RPCWrapper):
        def __init__(self, busname=DEFAULT_MOMQUERY_BUSNAME,
                     servicename=DEFAULT_MOMQUERY_SERVICENAME,
                     broker=None,
                     timeout=120):
            super(MoMQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout)
    
        def add_trigger(self, user_name, host_name, project_name, meta_data):
            logger.info("Requestion AddTrigger for user_name: %s, host_name: %s, project_name: %s and meta_data: %s",
                        user_name, host_name, project_name, meta_data)
    
            row_id = self.rpc('AddTrigger',
                              user_name=user_name, host_name=host_name, project_name=project_name, meta_data=meta_data)
    
            logger.info("Received AddTrigger for user_name (%s), host_name(%s), project_name(%s) and meta_data(%s): %s",
                        user_name, host_name, project_name, meta_data, row_id)
    
            return row_id
    
        def get_project_priority(self, project_name):
            logger.info("Requestion GetProjectPriority for project_name: %s", project_name)
    
            priority = self.rpc('GetProjectPriority', project_name=project_name)
    
            logger.info("Received GetProjectPriority for project_name (%s): %s", project_name, priority)
    
            return priority
    
        def allows_triggers(self, project_name):
            """returns whether a project is allowed to submit triggers
            :param project_name:
            :return: Boolean
            """
            logger.info("Requesting AllowsTriggers for project_name: %s", project_name)
    
            result = self.rpc('AllowsTriggers', project_name=project_name)
    
            logger.info("Received AllowsTriggers for project_name (%s): %s", project_name, result)
    
            return result
    
        def authorized_add_with_status(self, user_name, project_name, job_type, status):
            """returns whether user is allowed in project to move a certain jobtype to a certain state
            :param user_name:
            :param project_name:
            :param job_type:
            :param status:
            :return: Boolean
            """
            logger.info("Requesting AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s",
                        user_name, project_name, job_type, status)
            result = self.rpc('AutorizedAddWithStatus', user_name=user_name, project_name=project_name, job_type=job_type,
                              status=status)
            logger.info(
                "Received AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s result: %s",
                user_name, project_name, job_type, status, result)
            return result
    
        def folderExists(self, folder):
            """returns true if folder exists
            :param folder:
            :return: Boolean
            """
            logger.info("Requesting folder: %s exists", folder)
            result = self.rpc('FolderExists', folder=folder)
            logger.info("Received folder exists: %s", result)
            return result
    
        def isProjectActive(self, project_name):
            """returns true if project is available and active
            :param project_name:
            :return: Boolean
            """
            logger.info("Requesting if project: %s is active", project_name)
            result = self.rpc('IsProjectActive', project_name=project_name)
            logger.info("Received Project is active: %s", result)
            return result
    
        def get_trigger_id(self, mom_id):
            """returns trigger id if mom_id has a trigger else None
            :param mom_id:
            :return: Integer or None
            """
            logger.info("Requesting GetTriggerId for mom_id: %s", mom_id)
            result = self.rpc('GetTriggerId', mom_id=mom_id)
            logger.info("Received trigger_id: %s", result)
            return result
    
        def get_trigger_quota(self, project_name):
            """returns trigger quota as (current,max) tuple for project with given name
            :param project_name
            :return: (Integer, Integer)
            """
            logger.info("Requesting GetTriggerQuota for project: %s", project_name)
            result = self.rpc('GetTriggerId', project_name=project_name)
            logger.info("Received trigger quota: %s", result)
            return result
    
        def update_trigger_quota(self, project_name):
            """
            count all the accepted triggers that are not cancelled, and update the trigger quota field in mom accordingly
            returns updated quota as (current, max) tuple (same as get_trigger_quota)
            :param project_name
            :return: (Integer, Integer)
            """
            logger.info("Requesting UpdateTriggerQuota for project: %s", project_name)
            result = self.rpc('UpdateTriggerQuota', project_name=project_name)
            logger.info("Received updated trigger quota: %s", result)
            return result
    
        def cancel_trigger(self, trigger_id, reason):
            """ flags trigger as canceled and returns updated trigger quota as (current, max) tuple
            :param trigger_id, reason
            :return (Integer, Integer)
            """
            logger.info("Requesting CancelTrigger for trigger id: %s | reason: %s", trigger_id, reason)
            result = self.rpc('CancelTrigger', trigger_id=trigger_id, reason=reason)
            logger.info("Requesting CancelTrigger for trigger id %s returned updated project trigger quota: %s", trigger_id, result)
            return result
    
        def get_project_details(self, mom_id):
            """returns email addresses of pi and contact author for a project mom id
            :param mom_id
            :rtype dict with pi and contact author email addresses"""
            logger.info("Requesting GetProjectDetails for mom_id: %s", mom_id)
            result = self.rpc('GetProjectDetails', mom_id=mom_id)
            logger.info("Received project_details: %s", result)
            return result
    
        def get_project_priorities_for_objects(self, mom_ids):
            '''get the project priorities for one or more mom ids
            :param ids single or list of mom ids
            :rtype dict with project priorities'''
            if isinstance(mom_ids, int) or isinstance(mom_ids, str):
                mom_ids = [mom_ids]
            mom_ids = [str(x) for x in mom_ids]
            ids_string = ', '.join(mom_ids)
    
            logger.info("Requesting project priorities for mom objects: %s", (str(ids_string)))
            result = self.rpc('GetProjectPrioritiesForObjects', mom_ids=ids_string)
            result = convertStringDigitKeysToInt(result)
            logger.info("Received project priorities for %s mom objects" % (len(result)))
            return result
    
        def getObjectDetails(self, ids):
            '''get the object details for one or more mom ids
            :param ids single or list of mom ids
            :rtype dict with project details'''
            if isinstance(ids, int) or isinstance(ids, str):
                ids = [ids]
            ids = [str(x) for x in ids]
            ids_string = ', '.join(ids)
    
            logger.info("Requesting details for mom objects: %s", (str(ids_string)))
            result = self.rpc('GetObjectDetails', mom_ids=ids_string)
            result = convertStringDigitKeysToInt(result)
            logger.info("Received details for %s mom objects" % (len(result)))
            return result
    
        def getProjects(self):
            '''get all projects
            :rtype dict with all projects'''
            logger.info("Requesting all projects")
            projects = self.rpc('GetProjects')
            for project in projects:
                project['statustime'] = project['statustime'].datetime()
    
            logger.info("Received %s projects", (len(projects)))
            return projects
    
        def getProject(self, project_mom2id):
            '''get projects by mo2_id'''
            logger.info("getProject(%s)", project_mom2id)
            project = self.rpc('GetProject', project_mom2id=project_mom2id)
            return project
    
        def getProjectTaskIds(self, project_mom2id):
            '''get all task mom2id's for the given project
            :rtype dict with all projects'''
            logger.info("getProjectTaskIds")
            task_ids = self.rpc('GetProjectTaskIds', project_mom2id=project_mom2id)
            return task_ids
    
        def getPredecessorIds(self, ids):
            logger.debug("getSuccessorIds(%s)", ids)
            result = self.rpc('GetPredecessorIds', mom_ids=ids)
            logger.info("GetPredecessorIds(%s): %s", ids, result)
            return result
    
        def getSuccessorIds(self, ids):
            logger.debug("getSuccessorIds(%s)", ids)
            result = self.rpc('GetSuccessorIds', mom_ids=ids)
            logger.info("getSuccessorIds(%s): %s", ids, result)
            return result
    
        def getTaskIdsInGroup(self, mom_group_ids):
            logger.debug("getTaskIdsInGroup(%s)", mom_group_ids)
            result = self.rpc('GetTaskIdsInGroup', mom_group_ids=mom_group_ids)
            logger.info("getTaskIdsInGroup(%s): %s", mom_group_ids, result)
            return result
    
        def getTaskIdsInParentGroup(self, mom_parent_group_ids):
            logger.debug("getTaskIdsInParentGroup(%s)", mom_parent_group_ids)
            result = self.rpc('GetTaskIdsInParentGroup', mom_parent_group_ids=mom_parent_group_ids)
            logger.info("getTaskIdsInParentGroup(%s): %s", mom_parent_group_ids, result)
            return result
    
        def getDataProducts(self, ids):
            logger.debug("getDataProducts(%s)", ids)
            result = self.rpc('GetDataProducts', mom_ids=ids)
            result = convertStringDigitKeysToInt(result)
            logger.info('Found # dataproducts per mom2id: %s', ', '.join('%s:%s' % (id, len(dps)) for id, dps in result.items()))
            return result
    
        def getMoMIdsForOTDBIds(self, otdb_ids):
            '''reverse lookup from otdb_id(s) to mom2id(s)
            returns: dict with otdb_id(s) in keys, mom2id(s) as values'''
            if isinstance(otdb_ids, int) or isinstance(otdb_ids, str):
                otdb_ids = [otdb_ids]
            logger.debug("getMoMIdsForOTDBIds(%s)", otdb_ids)
            result = self.rpc('GetMoMIdsForOTDBIds', otdb_ids=otdb_ids)
            result = convertStringDigitKeysToInt(result)
            return result
    
        def getOTDBIdsForMoMIds(self, mom_ids):
            '''lookup from mom2id(s) to otdb_id(s)
            returns: dict with mom2id(s) in keys, otdb_id(s) as values'''
            if isinstance(mom_ids, int) or isinstance(mom_ids, str):
                mom_ids = [mom_ids]
            logger.debug("getOTDBIdsForMoMIds(%s)", mom_ids)
            result = self.rpc('GetOTDBIdsForMoMIds', mom_ids=mom_ids)
            result = convertStringDigitKeysToInt(result)
            return result
    
        def getTaskIdsGraph(self, mom2id):
            '''Get the fully connected graph of interconnected tasks given any mom2id in that graph
            returns: dict with mom2id:node as key value pairs, where each node is a dict with items node_mom2id, predecessor_ids, successor_ids'''
            logger.debug("getTaskIdsGraph(%s)", mom2id)
            result = self.rpc('GetTaskIdsGraph', mom2id=mom2id)
            result = convertStringDigitKeysToInt(result)
            return result
    
        def get_station_selection(self, mom_id):
            """
            Get the station selection represented as resource groups with min/max values for given mom id.
            :param mom_id: int
            :return: list of dict
            """
            logger.info("Calling GetStationSelection for mom id "+str(mom_id))
            station_selection = self.rpc('GetStationSelection', mom_id=mom_id)
            return station_selection
    
        def get_time_restrictions(self, mom_id):
            """
            Returns min start and max end times and duration for given mom id.
            :param mom_id: int
            :return: dict
            """
            logger.info("Calling GetTimeRestrictions for mom id "+str(mom_id))
            time_restrictions = self.rpc('GetTimeRestrictions', mom_id=mom_id)
            return time_restrictions
    
    
    def main():
        # Check the invocation arguments
        parser = OptionParser('%prog [options]',
                              description='do requests to the momqueryservice from the commandline')
        parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
        parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker, default: [%default]')
        parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name for this service, default: [%default]')
        parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
        parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get list of all projects')
        parser.add_option('-p', '--project_details', dest='project_details', type='int', help='get project details for mom object with given id')
        parser.add_option('-O', '--objects_details', dest='objects_details', type='int', help='get object details for mom object with given id')
        parser.add_option('--predecessors', dest='id_for_predecessors', type='int', help='get the predecessor id\'s for the given mom2id')
        parser.add_option('--successors', dest='id_for_successors', type='int', help='get the successors id\'s for the given mom2id')
        parser.add_option('-g', '--group', dest='group_id', type='int', help='get the tasks ids in the given group mom2id')
        parser.add_option('--parent_group', dest='parent_group_id', type='int', help='get the tasks ids in the given parent group mom2id')
        parser.add_option('-d', '--dataproducts', dest='id_for_dataproducts', type='int', help='get the dataproducts for the given mom2id')
        parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', help='get the mom2id for the given otdb_id')
        parser.add_option('-m', '--mom_id', dest='mom_id', type='int', help='get the otdb_id for the given mom2id')
        parser.add_option('-t', '--task_graph', dest='task_graph_mom2id', type='int', help='get the fully connected task graph given any mom2id in that graph')
        (options, args) = parser.parse_args()
    
        if len(sys.argv) == 1:
            parser.print_help()
    
        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                            level=logging.INFO if options.verbose else logging.WARN)
    
        with MoMQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc:
            if options.projects:
                projects = rpc.getProjects()
                for project in projects:
                    print project
    
            if options.project_details:
                project_details = rpc.get_project_details(options.project_details)
                if project_details:
                    for k, v in project_details.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.objects_details:
                objects_details = rpc.getObjectDetails(options.objects_details)
                if objects_details:
                    for k, v in objects_details.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.id_for_predecessors:
                predecessor_ids = rpc.getPredecessorIds(options.id_for_predecessors)
                if predecessor_ids:
                    for k, v in predecessor_ids.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.id_for_successors:
                successor_ids = rpc.getSuccessorIds(options.id_for_successors)
                if successor_ids:
                    for k, v in successor_ids.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.group_id:
                task_ids = rpc.getTaskIdsInGroup(options.group_id)
                if task_ids:
                    for k, v in task_ids.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.parent_group_id:
                task_ids = rpc.getTaskIdsInParentGroup(options.parent_group_id)
                if task_ids:
                    for k, v in task_ids.items():
                        print '  %s: %s' % (k, v)
                else:
                    print 'No results'
    
            if options.id_for_dataproducts:
                results = rpc.getDataProducts(options.id_for_dataproducts)
                if results:
                    for mom2id, dps in results.items():
                        print '  dataproducts for %s' % mom2id
                        pprint.pprint(dps)
                else:
                    print 'No results'
    
            if options.otdb_id:
                results = rpc.getMoMIdsForOTDBIds(options.otdb_id)
                if results and options.otdb_id in results:
                    print 'mom2id=%s for otdb_id=%s' % (results[options.otdb_id], options.otdb_id)
                else:
                    print 'No results'
    
            if options.mom_id:
                results = rpc.getOTDBIdsForMoMIds(options.mom_id)
                if results and options.mom_id in results:
                    print 'otdb_id=%s for mom2id=%s' % (results[options.mom_id], options.mom_id)
                else:
                    print 'No results'
    
            if options.task_graph_mom2id:
                result = rpc.getTaskIdsGraph(options.task_graph_mom2id)
                pprint.pprint(result)
    
    if __name__ == '__main__':
        main()