#!/usr/bin/python
# $Id$

'''
Simple service which exposes the resource assignment database (radb) on the
message bus.

Every name registered in RADBHandler.service2MethodMap (for example 'GetTasks'
or 'InsertResourceClaim') is forwarded to the matching lofar radb.RADatabase
call.

Example usage:
service side: just run this service somewhere where it can access the radb
and a qpid broker.
Make sure the bus exists: qpid-config add exchange topic <busname>

client side: do an RPC call on <busname> to this service, using one of the
method names from RADBHandler.service2MethodMap. The keyword arguments of the
call are handed to the handler method of that name.
NOTE(review): the exact client-side RPC invocation is not visible in this
file; see the rpc client module of this package.
'''

import logging
from optparse import OptionParser
from lofar.messaging import Service
from lofar.messaging import setQpidLogLevel
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.database import radb
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.common import dbcredentials
from lofar.common.util import convertIntKeysToString

logger = logging.getLogger(__name__)


def _to_datetime(value):
    '''Return value.datetime() for a truthy value, else None.

    The timestamps received by the handlers expose a datetime() method; absent
    (None/falsy) timestamps are passed to the database layer as None.
    '''
    return value.datetime() if value else None


def _log_request(method_name, kwargs, hidden_keys=()):
    '''Log a service call at info level as "<method_name>: {<arguments>}".

    Arguments whose value is None, or whose key is in hidden_keys, are left
    out of the logged dict.
    '''
    shown = {k: v for k, v in kwargs.items()
             if v is not None and k not in hidden_keys}
    logger.info('%s: %s', method_name, shown)


class RADBHandler(MessageHandlerInterface):
    '''Message handler which maps service method names onto RADatabase calls.

    Accepted keyword arguments (next to those of MessageHandlerInterface):
    dbcreds: database credentials which are handed to radb.RADatabase.
    log_queries: flag which is handed to radb.RADatabase (default False).
    '''

    def __init__(self, **kwargs):
        super(RADBHandler, self).__init__(**kwargs)
        self.dbcreds = kwargs.pop("dbcreds", None)
        self.log_queries = kwargs.pop("log_queries", False)

        # public service method name -> handler method
        self.service2MethodMap = {
            'GetResourceClaimStatuses': self._getResourceClaimStatuses,
            'GetResourceClaimProperties': self._getResourceClaimProperties,
            'InsertResourceClaimProperty': self._insertResourceClaimProperty,
            'GetResourceClaimPropertyTypes': self._getResourceClaimPropertyTypes,
            'GetRcuSpecifications': self._getRcuSpecifications,
            'GetRcuSpecification': self._getRcuSpecification,
            'InsertRcuSpecifications': self._insertRcuSpecifications,
            'InsertRcuSpecification': self._insertRcuSpecification,
            'GetResourceClaims': self._getResourceClaims,
            'GetResourceClaim': self._getResourceClaim,
            'InsertResourceClaims': self._insertResourceClaims,
            'InsertResourceClaim': self._insertResourceClaim,
            'DeleteResourceClaim': self._deleteResourceClaim,
            'UpdateResourceClaim': self._updateResourceClaim,
            'UpdateResourceClaims': self._updateResourceClaims,
            'UpdateTaskAndResourceClaims': self._updateTaskAndResourceClaims,
            'GetResourceUsages': self._getResourceUsages,
            'GetResourceGroupTypes': self._getResourceGroupTypes,
            'GetResourceGroups': self._getResourceGroups,
            'GetResourceGroupNames': self._getResourceGroupNames,
            'GetResourceGroupMemberships': self._getResourceGroupMemberships,
            'GetResourceTypes': self._getResourceTypes,
            'GetResources': self._getResources,
            'UpdateResourceAvailability': self._updateResourceAvailability,
            'GetTasksTimeWindow': self._getTasksTimeWindow,
            'GetTasks': self._getTasks,
            'GetTask': self._getTask,
            'InsertTask': self._insertTask,
            'DeleteTask': self._deleteTask,
            'UpdateTask': self._updateTask,
            'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId,
            'GetTaskPredecessorIds': self._getTaskPredecessorIds,
            'GetTaskSuccessorIds': self._getTaskSuccessorIds,
            'InsertTaskPredecessor': self._insertTaskPredecessor,
            'InsertTaskPredecessors': self._insertTaskPredecessors,
            'GetTaskStatuses': self._getTaskStatuses,
            'GetTaskTypes': self._getTaskTypes,
            'GetSpecifications': self._getSpecifications,
            'GetSpecification': self._getSpecification,
            'InsertSpecificationAndTask': self._insertSpecificationAndTask,
            'InsertSpecification': self._insertSpecification,
            'DeleteSpecification': self._deleteSpecification,
            'UpdateSpecification': self._updateSpecification,
            'GetUnits': self._getUnits,
            'GetResourceAllocationConfig': self._getResourceAllocationConfig,
            'get_conflicting_overlapping_claims': self._get_conflicting_overlapping_claims,
            'get_conflicting_overlapping_tasks': self._get_conflicting_overlapping_tasks,
            'get_max_resource_usage_between': self._get_max_resource_usage_between,
            'get_resource_claimable_capacity': self._get_resource_claimable_capacity,
        }

    def prepare_loop(self):
        '''Create the RADatabase instance which is used by all handler methods.'''
        self.radb = radb.RADatabase(dbcreds=self.dbcreds, log_queries=self.log_queries)

    # ---- task statuses / types ----

    def _getTaskStatuses(self):
        '''Return the result of RADatabase.getTaskStatuses.'''
        return self.radb.getTaskStatuses()

    def _getTaskTypes(self):
        '''Return the result of RADatabase.getTaskTypes.'''
        return self.radb.getTaskTypes()

    # ---- rcu specifications ----

    def _getRcuSpecifications(self, **kwargs):
        '''Return the rcu specifications, optionally limited to 'rcu_ids'.'''
        return self.radb.getRcuSpecifications(rcu_ids=kwargs.get('rcu_ids'))

    def _getRcuSpecification(self, **kwargs):
        '''Return the rcu specification for the given 'rcu_id'.'''
        return self.radb.getRcuSpecification(rcu_id=kwargs.get('rcu_id'))

    def _insertRcuSpecifications(self, **kwargs):
        '''Insert the given 'rcu_patterns_list'; 'commit' is passed through.'''
        return self.radb.insertRcuSpecifications(rcu_patterns_list=kwargs.get('rcu_patterns_list'),
                                                 commit=kwargs.get('commit'))

    def _insertRcuSpecification(self, **kwargs):
        '''Insert the given 'rcu_pattern'; 'commit' is passed through.'''
        return self.radb.insertRcuSpecification(rcu_pattern=kwargs.get('rcu_pattern'),
                                                commit=kwargs.get('commit'))

    # ---- resource claims ----

    def _getResourceClaimStatuses(self):
        '''Return the result of RADatabase.getResourceClaimStatuses.'''
        return self.radb.getResourceClaimStatuses()

    def _getResourceClaimPropertyTypes(self):
        '''Return the result of RADatabase.getResourceClaimPropertyTypes.'''
        return self.radb.getResourceClaimPropertyTypes()

    def _getResourceClaimProperties(self, **kwargs):
        '''Return the claim properties, filtered on 'claim_ids' and/or 'task_id'.'''
        return self.radb.getResourceClaimProperties(claim_ids=kwargs.get('claim_ids'),
                                                    task_id=kwargs.get('task_id'))

    def _insertResourceClaimProperty(self, **kwargs):
        '''Insert one claim property and return {'id': <value returned by the radb>}.'''
        property_id = self.radb.insertResourceClaimProperty(kwargs.get('claim_id'),
                                                            kwargs.get('property_type'),
                                                            kwargs.get('value'),
                                                            kwargs.get('io_type'))
        return {'id': property_id}

    def _getResourceClaims(self, **kwargs):
        '''Return the resource claims which match the given (optional) filters.'''
        return self.radb.getResourceClaims(claim_ids=kwargs.get('claim_ids'),
                                           lower_bound=_to_datetime(kwargs.get('lower_bound')),
                                           upper_bound=_to_datetime(kwargs.get('upper_bound')),
                                           resource_ids=kwargs.get('resource_ids'),
                                           task_ids=kwargs.get('task_ids'),
                                           status=kwargs.get('status'),
                                           resource_type=kwargs.get('resource_type'),
                                           extended=kwargs.get('extended', False),
                                           include_properties=kwargs.get('include_properties'))

    def _getResourceClaim(self, **kwargs):
        '''Return the resource claim with the given 'id' (required).'''
        return self.radb.getResourceClaim(kwargs['id'])

    def _insertResourceClaims(self, **kwargs):
        '''Insert the given 'claims' for 'task_id' and return {'ids': <ids>}.

        Required arguments: claims, task_id, username, user_id.
        The starttime/endtime of each claim are converted in place via their
        datetime() method before insertion.
        '''
        _log_request('InsertResourceClaims', kwargs)

        claims = kwargs['claims']
        for claim in claims:
            claim['starttime'] = claim['starttime'].datetime()
            claim['endtime'] = claim['endtime'].datetime()

        ids = self.radb.insertResourceClaims(kwargs['task_id'],
                                             claims,
                                             kwargs['username'],
                                             kwargs['user_id'])
        return {'ids': ids}

    def _insertResourceClaim(self, **kwargs):
        '''Insert a single resource claim and return {'id': <new claim id>}.

        Required arguments: resource_id, task_id, starttime, endtime,
        claim_size, username, user_id, rcu_id.
        The status is taken from 'status_id', falling back to 'status'.
        '''
        _log_request('InsertResourceClaim', kwargs)
        claim_id = self.radb.insertResourceClaim(kwargs['resource_id'],
                                                 kwargs['task_id'],
                                                 kwargs['starttime'].datetime(),
                                                 kwargs['endtime'].datetime(),
                                                 kwargs.get('status_id', kwargs.get('status')),
                                                 kwargs['claim_size'],
                                                 kwargs['username'],
                                                 kwargs['user_id'],
                                                 kwargs['rcu_id'],
                                                 kwargs.get('properties'))
        return {'id': claim_id}

    def _deleteResourceClaim(self, **kwargs):
        '''Delete the claim with the given 'id'; return {'id': .., 'deleted': ..}.'''
        _log_request('DeleteResourceClaim', kwargs)
        claim_id = kwargs['id']
        deleted = self.radb.deleteResourceClaim(claim_id)
        return {'id': claim_id, 'deleted': deleted}

    def _updateResourceClaim(self, **kwargs):
        '''Update the claim with the given 'id'; return {'id': .., 'updated': ..}.'''
        _log_request('UpdateResourceClaim', kwargs)
        claim_id = kwargs['id']
        updated = self.radb.updateResourceClaim(claim_id,
                                                resource_id=kwargs.get('resource_id'),
                                                task_id=kwargs.get('task_id'),
                                                starttime=_to_datetime(kwargs.get('starttime')),
                                                endtime=_to_datetime(kwargs.get('endtime')),
                                                status=kwargs.get('status_id', kwargs.get('status')),
                                                claim_size=kwargs.get('claim_size'),
                                                username=kwargs.get('username'),
                                                user_id=kwargs.get('user_id'))
        return {'id': claim_id, 'updated': updated}

    def _updateResourceClaims(self, **kwargs):
        '''Update multiple claims; return {'task_id': .., 'updated': ..}.

        'task_id' is required. The where_* arguments are passed through to
        RADatabase.updateResourceClaims.
        '''
        _log_request('UpdateResourceClaims', kwargs)
        task_id = kwargs['task_id']

        updated = self.radb.updateResourceClaims(where_resource_claim_ids=kwargs.get('where_resource_claim_ids'),
                                                 where_task_ids=kwargs.get('where_task_ids'),
                                                 where_resource_types=kwargs.get('where_resource_types'),
                                                 resource_id=kwargs.get('resource_id'),
                                                 task_id=kwargs.get('task_id'),
                                                 starttime=_to_datetime(kwargs.get('starttime')),
                                                 endtime=_to_datetime(kwargs.get('endtime')),
                                                 status=kwargs.get('status_id', kwargs.get('status')),
                                                 # was kwargs.get('status'): the claim size
                                                 # must come from its own argument
                                                 claim_size=kwargs.get('claim_size'),
                                                 username=kwargs.get('username'),
                                                 user_id=kwargs.get('user_id'),
                                                 used_rcus=None)
        return {'task_id': task_id, 'updated': updated}

    def _updateTaskAndResourceClaims(self, **kwargs):
        '''Update a task and its claims; return {'task_id': .., 'updated': ..}.

        'task_id' is required. 'commit' defaults to True.
        '''
        _log_request('UpdateTaskAndResourceClaims', kwargs)
        task_id = kwargs['task_id']

        updated = self.radb.updateTaskAndResourceClaims(task_id,
                                                        starttime=_to_datetime(kwargs.get('starttime')),
                                                        endtime=_to_datetime(kwargs.get('endtime')),
                                                        task_status=kwargs.get('task_status_id', kwargs.get('task_status')),
                                                        claim_status=kwargs.get('claim_status_id', kwargs.get('claim_status')),
                                                        username=kwargs.get('username'),
                                                        user_id=kwargs.get('user_id'),
                                                        where_resource_types=kwargs.get('where_resource_types'),
                                                        commit=kwargs.get('commit', True))
        return {'task_id': task_id, 'updated': updated}

    # ---- resources ----

    def _getResourceUsages(self, **kwargs):
        '''Return the resource usages which match the given (optional) filters.'''
        return self.radb.getResourceUsages(claim_ids=kwargs.get('claim_ids'),
                                           lower_bound=_to_datetime(kwargs.get('lower_bound')),
                                           upper_bound=_to_datetime(kwargs.get('upper_bound')),
                                           resource_ids=kwargs.get('resource_ids'),
                                           task_ids=kwargs.get('task_ids'),
                                           status=kwargs.get('status'),
                                           resource_type=kwargs.get('resource_type'))

    def _getResourceGroupTypes(self):
        '''Return the result of RADatabase.getResourceGroupTypes.'''
        return self.radb.getResourceGroupTypes()

    def _getResourceGroups(self):
        '''Return the result of RADatabase.getResourceGroups.'''
        return self.radb.getResourceGroups()

    def _getResourceGroupNames(self, **kwargs):
        '''Return the group names for the given 'resourceGroupTypeName'.'''
        return self.radb.getResourceGroupNames(resourceGroupTypeName=kwargs.get('resourceGroupTypeName'))

    def _getResourceGroupMemberships(self):
        '''Return the group memberships, passed through convertIntKeysToString.'''
        return convertIntKeysToString(self.radb.getResourceGroupMemberships())

    def _getResourceTypes(self):
        '''Return the result of RADatabase.getResourceTypes.'''
        return self.radb.getResourceTypes()

    def _getResources(self, **kwargs):
        '''Return the resources, optionally filtered on ids and/or types.'''
        return self.radb.getResources(resource_ids=kwargs.get('resource_ids'),
                                      resource_types=kwargs.get('resource_types'),
                                      include_availability=kwargs.get('include_availability', False))

    def _updateResourceAvailability(self, **kwargs):
        '''Update the availability of the resource with the required 'resource_id'.'''
        return self.radb.updateResourceAvailability(resource_id=kwargs['resource_id'],
                                                    active=kwargs.get('active'),
                                                    available_capacity=kwargs.get('available_capacity'),
                                                    total_capacity=kwargs.get('total_capacity'))

    # ---- tasks ----

    def _getTasksTimeWindow(self, **kwargs):
        '''Return the time window for the tasks selected by task/mom/otdb ids.'''
        _log_request('GetTasksTimeWindow', kwargs)
        return self.radb.getTasksTimeWindow(task_ids=kwargs.get('task_ids'),
                                            mom_ids=kwargs.get('mom_ids'),
                                            otdb_ids=kwargs.get('otdb_ids'))

    def _getTasks(self, **kwargs):
        '''Return the tasks which match the given (optional) filters.'''
        _log_request('GetTasks', kwargs)
        return self.radb.getTasks(lower_bound=_to_datetime(kwargs.get('lower_bound')),
                                  upper_bound=_to_datetime(kwargs.get('upper_bound')),
                                  task_ids=kwargs.get('task_ids'),
                                  task_status=kwargs.get('task_status'),
                                  task_type=kwargs.get('task_type'),
                                  mom_ids=kwargs.get('mom_ids'),
                                  otdb_ids=kwargs.get('otdb_ids'),
                                  cluster=kwargs.get('cluster'))

    def _getTask(self, **kwargs):
        '''Return one task, selected by id, mom_id, otdb_id or specification_id.'''
        _log_request('GetTask', kwargs)
        return self.radb.getTask(id=kwargs.get('id'),
                                 mom_id=kwargs.get('mom_id'),
                                 otdb_id=kwargs.get('otdb_id'),
                                 specification_id=kwargs.get('specification_id'))

    def _insertTask(self, **kwargs):
        '''Insert a task and return {'id': <new task id>}.

        Required arguments: mom_id, otdb_id, specification_id.
        The status comes from 'status_id', else 'task_status', else 'prepared';
        the type comes from 'type_id', else 'task_type'.
        '''
        _log_request('InsertTask', kwargs)
        task_id = self.radb.insertTask(kwargs['mom_id'],
                                       kwargs['otdb_id'],
                                       kwargs.get('status_id', kwargs.get('task_status', 'prepared')),
                                       kwargs.get('type_id', kwargs.get('task_type')),
                                       kwargs['specification_id'])
        return {'id': task_id}

    def _deleteTask(self, **kwargs):
        '''Delete the task with the given 'id'; return {'id': .., 'deleted': ..}.'''
        _log_request('DeleteTask', kwargs)
        task_id = kwargs['id']
        deleted = self.radb.deleteTask(task_id)
        return {'id': task_id, 'deleted': deleted}

    def _updateTaskStatusForOtdbId(self, **kwargs):
        '''Update the status of the task with the given 'otdb_id'.

        Returns {'otdb_id': .., 'updated': ..}.
        '''
        _log_request('UpdateTaskStatusForOtdbId', kwargs)
        otdb_id = kwargs.get('otdb_id')
        updated = self.radb.updateTaskStatusForOtdbId(otdb_id=otdb_id,
                                                      task_status=kwargs.get('status_id', kwargs.get('task_status')))
        return {'otdb_id': otdb_id, 'updated': updated}

    def _updateTask(self, **kwargs):
        '''Update the task with the given 'id'; return {'id': .., 'updated': ..}.'''
        _log_request('UpdateTask', kwargs)
        task_id = kwargs['id']
        updated = self.radb.updateTask(task_id,
                                       mom_id=kwargs.get('mom_id'),
                                       otdb_id=kwargs.get('otdb_id'),
                                       task_status=kwargs.get('status_id', kwargs.get('task_status')),
                                       task_type=kwargs.get('type_id', kwargs.get('task_type')),
                                       specification_id=kwargs.get('specification_id'))
        return {'id': task_id, 'updated': updated}

    def _getTaskPredecessorIds(self, **kwargs):
        '''Return the predecessor ids for 'id', passed through convertIntKeysToString.'''
        _log_request('GetTaskPredecessorIds', kwargs)
        return convertIntKeysToString(self.radb.getTaskPredecessorIds(kwargs.get('id')))

    def _getTaskSuccessorIds(self, **kwargs):
        '''Return the successor ids for 'id', passed through convertIntKeysToString.'''
        _log_request('GetTaskSuccessorIds', kwargs)
        return convertIntKeysToString(self.radb.getTaskSuccessorIds(kwargs.get('id')))

    def _insertTaskPredecessor(self, **kwargs):
        '''Link 'predecessor_id' to 'task_id'; return {'id': <value returned by the radb>}.'''
        predecessor_link_id = self.radb.insertTaskPredecessor(kwargs['task_id'],
                                                              kwargs['predecessor_id'])
        return {'id': predecessor_link_id}

    def _insertTaskPredecessors(self, **kwargs):
        '''Link all 'predecessor_ids' to 'task_id'; return {'ids': <value returned by the radb>}.'''
        ids = self.radb.insertTaskPredecessors(kwargs['task_id'],
                                               kwargs['predecessor_ids'])
        return {'ids': ids}

    # ---- specifications ----

    def _getSpecifications(self):
        '''Return the result of RADatabase.getSpecifications.'''
        return self.radb.getSpecifications()

    def _getSpecification(self, **kwargs):
        '''Return the specification with the given 'id' (required).'''
        _log_request('GetSpecification', kwargs)
        return self.radb.getSpecification(kwargs['id'])

    def _insertSpecificationAndTask(self, **kwargs):
        '''Insert a specification together with its task.

        Required arguments: mom_id, otdb_id, task_status, task_type, content,
        cluster. The 'content' argument is kept out of the log line.
        '''
        _log_request('InsertSpecificationAndTask', kwargs, hidden_keys=('content',))
        return self.radb.insertSpecificationAndTask(kwargs['mom_id'],
                                                    kwargs['otdb_id'],
                                                    kwargs['task_status'],
                                                    kwargs['task_type'],
                                                    _to_datetime(kwargs.get('starttime')),
                                                    _to_datetime(kwargs.get('endtime')),
                                                    kwargs['content'],
                                                    kwargs['cluster'])

    def _insertSpecification(self, **kwargs):
        '''Insert a specification and return {'id': <new specification id>}.

        Required arguments: content, cluster.
        '''
        _log_request('InsertSpecification', kwargs)
        specification_id = self.radb.insertSpecification(_to_datetime(kwargs.get('starttime')),
                                                         _to_datetime(kwargs.get('endtime')),
                                                         kwargs['content'],
                                                         kwargs['cluster'])
        return {'id': specification_id}

    def _deleteSpecification(self, **kwargs):
        '''Delete the specification with the given 'id'; return {'id': .., 'deleted': ..}.'''
        _log_request('DeleteSpecification', kwargs)
        specification_id = kwargs['id']
        deleted = self.radb.deleteSpecification(specification_id)
        return {'id': specification_id, 'deleted': deleted}

    def _updateSpecification(self, **kwargs):
        '''Update the specification with the given 'id'; return {'id': .., 'updated': ..}.'''
        _log_request('UpdateSpecification', kwargs)
        specification_id = kwargs['id']
        # A starttime/endtime key which is present but None must not be
        # dereferenced, so convert via the None-safe helper like the other
        # handlers do.
        updated = self.radb.updateSpecification(specification_id,
                                                starttime=_to_datetime(kwargs.get('starttime')),
                                                endtime=_to_datetime(kwargs.get('endtime')),
                                                content=kwargs.get('content'),
                                                cluster=kwargs.get('cluster'))
        return {'id': specification_id, 'updated': updated}

    # ---- misc ----

    def _getUnits(self):
        '''Return the result of RADatabase.getUnits.'''
        return self.radb.getUnits()

    def _getResourceAllocationConfig(self, **kwargs):
        '''Return the config entries matching 'sql_like_name_pattern'.'''
        return self.radb.getResourceAllocationConfig(sql_like_name_pattern=kwargs.get('sql_like_name_pattern'))

    def _get_conflicting_overlapping_claims(self, **kwargs):
        '''Pass all keyword arguments to RADatabase.get_conflicting_overlapping_claims.'''
        return self.radb.get_conflicting_overlapping_claims(**kwargs)

    def _get_conflicting_overlapping_tasks(self, **kwargs):
        '''Pass all keyword arguments to RADatabase.get_conflicting_overlapping_tasks.'''
        return self.radb.get_conflicting_overlapping_tasks(**kwargs)

    def _get_max_resource_usage_between(self, **kwargs):
        '''Pass all keyword arguments to RADatabase.get_max_resource_usage_between.'''
        return self.radb.get_max_resource_usage_between(**kwargs)

    def _get_resource_claimable_capacity(self, **kwargs):
        '''Pass all keyword arguments to RADatabase.get_resource_claimable_capacity.'''
        return self.radb.get_resource_claimable_capacity(**kwargs)


def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None, dbcreds=None, log_queries=False, verbose=False):
    '''Create (but do not start) the radb Service using an RADBHandler.

    busname/servicename/broker: where the service listens.
    dbcreds/log_queries: handed to the RADBHandler via handler_args.
    verbose: handed to the Service.
    '''
    return Service(servicename,
                   RADBHandler,
                   busname=busname,
                   broker=broker,
                   use_service_methods=True,
                   numthreads=8,
                   handler_args={'dbcreds': dbcreds, 'log_queries': log_queries},
                   verbose=verbose)


def main():
    '''Parse the command line options and run the service until interrupted.'''
    # make sure we run in UTC timezone
    import os
    import time
    os.environ['TZ'] = 'UTC'
    # Changing TZ alone does not reliably reset the conversion rules of the
    # already initialised C library; tzset applies it (only available on Unix).
    if hasattr(time, 'tzset'):
        time.tzset()

    # Check the invocation arguments
    parser = OptionParser("%prog [options]",
                          description='runs the resourceassignment database service')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
    parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
    parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
    parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries')
    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
    parser.add_option_group(dbcredentials.options_group(parser))
    parser.set_defaults(dbcredentials="RADB")
    (options, args) = parser.parse_args()

    dbcreds = dbcredentials.parse_options(options)

    setQpidLogLevel(logging.INFO)
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                        level=logging.DEBUG if options.verbose else logging.INFO)

    logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())

    with createService(busname=options.busname,
                       servicename=options.servicename,
                       broker=options.broker,
                       verbose=options.verbose,
                       log_queries=options.log_queries,
                       dbcreds=dbcreds):
        waitForInterrupt()


if __name__ == '__main__':
    main()