Select Git revision
momqueryrpc.py
-
Jörn Künsemöller authoredJörn Künsemöller authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
momqueryrpc.py 17.99 KiB
#!/usr/bin/python
# Copyright (C) 2017 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import sys
import logging
import pprint
from optparse import OptionParser
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.common.util import convertStringDigitKeysToInt
''' Simple RPC client for Service momqueryservice
'''
logger = logging.getLogger(__name__)
class MoMQueryRPC(RPCWrapper):
def __init__(self, busname=DEFAULT_MOMQUERY_BUSNAME,
servicename=DEFAULT_MOMQUERY_SERVICENAME,
broker=None,
timeout=120):
super(MoMQueryRPC, self).__init__(busname, servicename, broker, timeout=timeout)
def add_trigger(self, user_name, host_name, project_name, meta_data):
logger.info("Requestion AddTrigger for user_name: %s, host_name: %s, project_name: %s and meta_data: %s",
user_name, host_name, project_name, meta_data)
row_id = self.rpc('AddTrigger',
user_name=user_name, host_name=host_name, project_name=project_name, meta_data=meta_data)
logger.info("Received AddTrigger for user_name (%s), host_name(%s), project_name(%s) and meta_data(%s): %s",
user_name, host_name, project_name, meta_data, row_id)
return row_id
def get_project_priority(self, project_name):
logger.info("Requestion GetProjectPriority for project_name: %s", project_name)
priority = self.rpc('GetProjectPriority', project_name=project_name)
logger.info("Received GetProjectPriority for project_name (%s): %s", project_name, priority)
return priority
def allows_triggers(self, project_name):
"""returns whether a project is allowed to submit triggers
:param project_name:
:return: Boolean
"""
logger.info("Requesting AllowsTriggers for project_name: %s", project_name)
result = self.rpc('AllowsTriggers', project_name=project_name)
logger.info("Received AllowsTriggers for project_name (%s): %s", project_name, result)
return result
def authorized_add_with_status(self, user_name, project_name, job_type, status):
"""returns whether user is allowed in project to move a certain jobtype to a certain state
:param user_name:
:param project_name:
:param job_type:
:param status:
:return: Boolean
"""
logger.info("Requesting AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s",
user_name, project_name, job_type, status)
result = self.rpc('AutorizedAddWithStatus', user_name=user_name, project_name=project_name, job_type=job_type,
status=status)
logger.info(
"Received AutorizedAddWithStatus for user_name: %s project_name: %s job_type: %s status: %s result: %s",
user_name, project_name, job_type, status, result)
return result
def folderExists(self, folder):
"""returns true if folder exists
:param folder:
:return: Boolean
"""
logger.info("Requesting folder: %s exists", folder)
result = self.rpc('FolderExists', folder=folder)
logger.info("Received folder exists: %s", result)
return result
def isProjectActive(self, project_name):
"""returns true if project is available and active
:param project_name:
:return: Boolean
"""
logger.info("Requesting if project: %s is active", project_name)
result = self.rpc('IsProjectActive', project_name=project_name)
logger.info("Received Project is active: %s", result)
return result
def get_trigger_id(self, mom_id):
"""returns trigger id if mom_id has a trigger else None
:param mom_id:
:return: Integer or None
"""
logger.info("Requesting GetTriggerId for mom_id: %s", mom_id)
result = self.rpc('GetTriggerId', mom_id=mom_id)
logger.info("Received trigger_id: %s", result)
return result
def get_trigger_quota(self, project_name):
"""returns trigger quota as (current,max) tuple for project with given name
:param project_name
:return: (Integer, Integer)
"""
logger.info("Requesting GetTriggerQuota for project: %s", project_name)
result = self.rpc('GetTriggerId', project_name=project_name)
logger.info("Received trigger quota: %s", result)
return result
def update_trigger_quota(self, project_name):
"""
count all the accepted triggers that are not cancelled, and update the trigger quota field in mom accordingly
returns updated quota as (current, max) tuple (same as get_trigger_quota)
:param project_name
:return: (Integer, Integer)
"""
logger.info("Requesting UpdateTriggerQuota for project: %s", project_name)
result = self.rpc('UpdateTriggerQuota', project_name=project_name)
logger.info("Received updated trigger quota: %s", result)
return result
def cancel_trigger(self, trigger_id, reason):
""" flags trigger as canceled and returns updated trigger quota as (current, max) tuple
:param trigger_id, reason
:return (Integer, Integer)
"""
logger.info("Requesting CancelTrigger for trigger id: %s | reason: %s", trigger_id, reason)
result = self.rpc('CancelTrigger', trigger_id=trigger_id, reason=reason)
logger.info("Requesting CancelTrigger for trigger id %s returned updated project trigger quota: %s", trigger_id, result)
return result
def get_project_details(self, mom_id):
"""returns email addresses of pi and contact author for a project mom id
:param mom_id
:rtype dict with pi and contact author email addresses"""
logger.info("Requesting GetProjectDetails for mom_id: %s", mom_id)
result = self.rpc('GetProjectDetails', mom_id=mom_id)
logger.info("Received project_details: %s", result)
return result
def get_project_priorities_for_objects(self, mom_ids):
'''get the project priorities for one or more mom ids
:param ids single or list of mom ids
:rtype dict with project priorities'''
if isinstance(mom_ids, int) or isinstance(mom_ids, str):
mom_ids = [mom_ids]
mom_ids = [str(x) for x in mom_ids]
ids_string = ', '.join(mom_ids)
logger.info("Requesting project priorities for mom objects: %s", (str(ids_string)))
result = self.rpc('GetProjectPrioritiesForObjects', mom_ids=ids_string)
result = convertStringDigitKeysToInt(result)
logger.info("Received project priorities for %s mom objects" % (len(result)))
return result
def getObjectDetails(self, ids):
'''get the object details for one or more mom ids
:param ids single or list of mom ids
:rtype dict with project details'''
if isinstance(ids, int) or isinstance(ids, str):
ids = [ids]
ids = [str(x) for x in ids]
ids_string = ', '.join(ids)
logger.info("Requesting details for mom objects: %s", (str(ids_string)))
result = self.rpc('GetObjectDetails', mom_ids=ids_string)
result = convertStringDigitKeysToInt(result)
logger.info("Received details for %s mom objects" % (len(result)))
return result
def getProjects(self):
'''get all projects
:rtype dict with all projects'''
logger.info("Requesting all projects")
projects = self.rpc('GetProjects')
for project in projects:
project['statustime'] = project['statustime'].datetime()
logger.info("Received %s projects", (len(projects)))
return projects
def getProject(self, project_mom2id):
'''get projects by mo2_id'''
logger.info("getProject(%s)", project_mom2id)
project = self.rpc('GetProject', project_mom2id=project_mom2id)
return project
def getProjectTaskIds(self, project_mom2id):
'''get all task mom2id's for the given project
:rtype dict with all projects'''
logger.info("getProjectTaskIds")
task_ids = self.rpc('GetProjectTaskIds', project_mom2id=project_mom2id)
return task_ids
def getPredecessorIds(self, ids):
logger.debug("getSuccessorIds(%s)", ids)
result = self.rpc('GetPredecessorIds', mom_ids=ids)
logger.info("GetPredecessorIds(%s): %s", ids, result)
return result
def getSuccessorIds(self, ids):
logger.debug("getSuccessorIds(%s)", ids)
result = self.rpc('GetSuccessorIds', mom_ids=ids)
logger.info("getSuccessorIds(%s): %s", ids, result)
return result
def getTaskIdsInGroup(self, mom_group_ids):
logger.debug("getTaskIdsInGroup(%s)", mom_group_ids)
result = self.rpc('GetTaskIdsInGroup', mom_group_ids=mom_group_ids)
logger.info("getTaskIdsInGroup(%s): %s", mom_group_ids, result)
return result
def getTaskIdsInParentGroup(self, mom_parent_group_ids):
logger.debug("getTaskIdsInParentGroup(%s)", mom_parent_group_ids)
result = self.rpc('GetTaskIdsInParentGroup', mom_parent_group_ids=mom_parent_group_ids)
logger.info("getTaskIdsInParentGroup(%s): %s", mom_parent_group_ids, result)
return result
def getDataProducts(self, ids):
logger.debug("getDataProducts(%s)", ids)
result = self.rpc('GetDataProducts', mom_ids=ids)
result = convertStringDigitKeysToInt(result)
logger.info('Found # dataproducts per mom2id: %s', ', '.join('%s:%s' % (id, len(dps)) for id, dps in result.items()))
return result
def getMoMIdsForOTDBIds(self, otdb_ids):
'''reverse lookup from otdb_id(s) to mom2id(s)
returns: dict with otdb_id(s) in keys, mom2id(s) as values'''
if isinstance(otdb_ids, int) or isinstance(otdb_ids, str):
otdb_ids = [otdb_ids]
logger.debug("getMoMIdsForOTDBIds(%s)", otdb_ids)
result = self.rpc('GetMoMIdsForOTDBIds', otdb_ids=otdb_ids)
result = convertStringDigitKeysToInt(result)
return result
def getOTDBIdsForMoMIds(self, mom_ids):
'''lookup from mom2id(s) to otdb_id(s)
returns: dict with mom2id(s) in keys, otdb_id(s) as values'''
if isinstance(mom_ids, int) or isinstance(mom_ids, str):
mom_ids = [mom_ids]
logger.debug("getOTDBIdsForMoMIds(%s)", mom_ids)
result = self.rpc('GetOTDBIdsForMoMIds', mom_ids=mom_ids)
result = convertStringDigitKeysToInt(result)
return result
def getTaskIdsGraph(self, mom2id):
'''Get the fully connected graph of interconnected tasks given any mom2id in that graph
returns: dict with mom2id:node as key value pairs, where each node is a dict with items node_mom2id, predecessor_ids, successor_ids'''
logger.debug("getTaskIdsGraph(%s)", mom2id)
result = self.rpc('GetTaskIdsGraph', mom2id=mom2id)
result = convertStringDigitKeysToInt(result)
return result
def get_station_selection(self, mom_id):
"""
Get the station selection represented as resource groups with min/max values for given mom id.
:param mom_id: int
:return: list of dict
"""
logger.info("Calling GetStationSelection for mom id "+str(mom_id))
station_selection = self.rpc('GetStationSelection', mom_id=mom_id)
return station_selection
def get_time_restrictions(self, mom_id):
"""
Returns min start and max end times and duration for given mom id.
:param mom_id: int
:return: dict
"""
logger.info("Calling GetTimeRestrictions for mom id "+str(mom_id))
time_restrictions = self.rpc('GetTimeRestrictions', mom_id=mom_id)
return time_restrictions
def main():
# Check the invocation arguments
parser = OptionParser('%prog [options]',
description='do requests to the momqueryservice from the commandline')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option('-b', '--busname', dest='busname', type='string', default=DEFAULT_MOMQUERY_BUSNAME, help='Name of the bus exchange on the qpid broker, default: [%default]')
parser.add_option('-s', '--servicename', dest='servicename', type='string', default=DEFAULT_MOMQUERY_SERVICENAME, help='Name for this service, default: [%default]')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option('-P', '--projects', dest='projects', action='store_true', help='get list of all projects')
parser.add_option('-p', '--project_details', dest='project_details', type='int', help='get project details for mom object with given id')
parser.add_option('-O', '--objects_details', dest='objects_details', type='int', help='get object details for mom object with given id')
parser.add_option('--predecessors', dest='id_for_predecessors', type='int', help='get the predecessor id\'s for the given mom2id')
parser.add_option('--successors', dest='id_for_successors', type='int', help='get the successors id\'s for the given mom2id')
parser.add_option('-g', '--group', dest='group_id', type='int', help='get the tasks ids in the given group mom2id')
parser.add_option('--parent_group', dest='parent_group_id', type='int', help='get the tasks ids in the given parent group mom2id')
parser.add_option('-d', '--dataproducts', dest='id_for_dataproducts', type='int', help='get the dataproducts for the given mom2id')
parser.add_option('-o', '--otdb_id', dest='otdb_id', type='int', help='get the mom2id for the given otdb_id')
parser.add_option('-m', '--mom_id', dest='mom_id', type='int', help='get the otdb_id for the given mom2id')
parser.add_option('-t', '--task_graph', dest='task_graph_mom2id', type='int', help='get the fully connected task graph given any mom2id in that graph')
(options, args) = parser.parse_args()
if len(sys.argv) == 1:
parser.print_help()
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.INFO if options.verbose else logging.WARN)
with MoMQueryRPC(busname=options.busname, servicename=options.servicename, broker=options.broker) as rpc:
if options.projects:
projects = rpc.getProjects()
for project in projects:
print project
if options.project_details:
project_details = rpc.get_project_details(options.project_details)
if project_details:
for k, v in project_details.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.objects_details:
objects_details = rpc.getObjectDetails(options.objects_details)
if objects_details:
for k, v in objects_details.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.id_for_predecessors:
predecessor_ids = rpc.getPredecessorIds(options.id_for_predecessors)
if predecessor_ids:
for k, v in predecessor_ids.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.id_for_successors:
successor_ids = rpc.getSuccessorIds(options.id_for_successors)
if successor_ids:
for k, v in successor_ids.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.group_id:
task_ids = rpc.getTaskIdsInGroup(options.group_id)
if task_ids:
for k, v in task_ids.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.parent_group_id:
task_ids = rpc.getTaskIdsInParentGroup(options.parent_group_id)
if task_ids:
for k, v in task_ids.items():
print ' %s: %s' % (k, v)
else:
print 'No results'
if options.id_for_dataproducts:
results = rpc.getDataProducts(options.id_for_dataproducts)
if results:
for mom2id, dps in results.items():
print ' dataproducts for %s' % mom2id
pprint.pprint(dps)
else:
print 'No results'
if options.otdb_id:
results = rpc.getMoMIdsForOTDBIds(options.otdb_id)
if results and options.otdb_id in results:
print 'mom2id=%s for otdb_id=%s' % (results[options.otdb_id], options.otdb_id)
else:
print 'No results'
if options.mom_id:
results = rpc.getOTDBIdsForMoMIds(options.mom_id)
if results and options.mom_id in results:
print 'otdb_id=%s for mom2id=%s' % (results[options.mom_id], options.mom_id)
else:
print 'No results'
if options.task_graph_mom2id:
result = rpc.getTaskIdsGraph(options.task_graph_mom2id)
pprint.pprint(result)
if __name__ == '__main__':
main()