Select Git revision
generate_input.sh
-
Matthijs van der Wild authoredMatthijs van der Wild authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
service.py 20.35 KiB
#!/usr/bin/python
# $Id$
'''
Simple Service listening on momqueryservice.GetProjectDetails
which gives the project details for each requested mom object id
Example usage:
service side: just run this service somewhere where it can access the momdb and
a qpid broker.
Make sure the bus exists: qpid-config add exchange topic <busname>
client side: do a RPC call to the <busname>.GetProjectDetails with a
comma seperated string of mom2object id's as argument.
You get a dict of mom2id to project-details-dict back.
with RPC(busname, 'GetProjectDetails') as getProjectDetails:
res, status = getProjectDetails(ids_string)
'''
import logging
from optparse import OptionParser
from lofar.messaging import Service
from lofar.messaging import setQpidLogLevel
from lofar.messaging.Service import MessageHandlerInterface
from lofar.common.util import waitForInterrupt
from lofar.sas.resourceassignment.database import radb
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.common import dbcredentials
from lofar.common.util import convertIntKeysToString
logger = logging.getLogger(__name__)
class RADBHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(RADBHandler, self).__init__(**kwargs)
self.dbcreds = kwargs.pop("dbcreds", None)
self.log_queries = kwargs.pop("log_queries", False)
self.service2MethodMap = {
'GetResourceClaimStatuses': self._getResourceClaimStatuses,
'GetResourceClaimProperties': self._getResourceClaimProperties,
'InsertResourceClaimProperty': self._insertResourceClaimProperty,
'GetResourceClaimPropertyTypes': self._getResourceClaimPropertyTypes,
'GetResourceClaims': self._getResourceClaims,
'GetResourceClaim': self._getResourceClaim,
'InsertResourceClaims': self._insertResourceClaims,
'InsertResourceClaim': self._insertResourceClaim,
'DeleteResourceClaim': self._deleteResourceClaim,
'UpdateResourceClaim': self._updateResourceClaim,
'UpdateTaskAndResourceClaims': self._updateTaskAndResourceClaims,
'GetResourceUsages': self._getResourceUsages,
'GetResourceGroupTypes': self._getResourceGroupTypes,
'GetResourceGroups': self._getResourceGroups,
'GetResourceGroupMemberships': self._getResourceGroupMemberships,
'GetResourceTypes': self._getResourceTypes,
'GetResources': self._getResources,
'UpdateResourceAvailability': self._updateResourceAvailability,
'GetTasksTimeWindow': self._getTasksTimeWindow,
'GetTasks': self._getTasks,
'GetTask': self._getTask,
'InsertTask': self._insertTask,
'DeleteTask': self._deleteTask,
'UpdateTask': self._updateTask,
'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId,
'GetTaskPredecessorIds': self._getTaskPredecessorIds,
'GetTaskSuccessorIds': self._getTaskSuccessorIds,
'InsertTaskPredecessor': self._insertTaskPredecessor,
'InsertTaskPredecessors': self._insertTaskPredecessors,
'GetTaskStatuses': self._getTaskStatuses,
'GetTaskTypes': self._getTaskTypes,
'GetSpecifications': self._getSpecifications,
'GetSpecification': self._getSpecification,
'InsertSpecificationAndTask': self._insertSpecificationAndTask,
'InsertSpecification': self._insertSpecification,
'DeleteSpecification': self._deleteSpecification,
'UpdateSpecification': self._updateSpecification,
'GetUnits': self._getUnits}
def prepare_loop(self):
self.radb = radb.RADatabase(dbcreds=self.dbcreds, log_queries=self.log_queries)
def _getTaskStatuses(self):
return self.radb.getTaskStatuses()
def _getTaskTypes(self):
return self.radb.getTaskTypes()
def _getResourceClaimStatuses(self):
return self.radb.getResourceClaimStatuses()
def _getResourceClaimPropertyTypes(self):
return self.radb.getResourceClaimPropertyTypes()
def _getResourceClaimProperties(self, **kwargs):
return self.radb.getResourceClaimProperties(claim_ids=kwargs.get('claim_ids'), task_id=kwargs.get('task_id'))
def _insertResourceClaimProperty(self, **kwargs):
id = self.radb.insertResourceClaimProperty(kwargs.get('claim_id'), kwargs.get('property_type'), kwargs.get('value'))
return {'id':id}
def _getResourceClaims(self, **kwargs):
return self.radb.getResourceClaims(claim_ids=kwargs.get('claim_ids'),
lower_bound=kwargs.get('lower_bound').datetime() if kwargs.get('lower_bound') else None,
upper_bound=kwargs.get('upper_bound').datetime() if kwargs.get('upper_bound') else None,
resource_ids=kwargs.get('resource_ids'),
task_ids=kwargs.get('task_ids'),
status=kwargs.get('status'),
resource_type=kwargs.get('resource_type'),
extended=kwargs.get('extended', False),
include_properties=kwargs.get('include_properties'))
def _getResourceClaim(self, **kwargs):
claim = self.radb.getResourceClaim(kwargs['id'])
return claim
def _insertResourceClaims(self, **kwargs):
logger.info('InsertResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
claims = kwargs['claims']
for claim in claims:
claim['starttime'] = claim['starttime'].datetime()
claim['endtime'] = claim['endtime'].datetime()
ids = self.radb.insertResourceClaims(kwargs['task_id'],
claims,
kwargs['session_id'],
kwargs['username'],
kwargs['user_id'])
return {'ids':ids}
def _insertResourceClaim(self, **kwargs):
logger.info('InsertResourceClaim: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = self.radb.insertResourceClaim(kwargs['resource_id'],
kwargs['task_id'],
kwargs['starttime'].datetime(),
kwargs['endtime'].datetime(),
kwargs.get('status_id', kwargs.get('status')),
kwargs['session_id'],
kwargs['claim_size'],
kwargs['username'],
kwargs['user_id'],
kwargs.get('properties'))
return {'id':id}
def _deleteResourceClaim(self, **kwargs):
logger.info('DeleteResourceClaim: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
deleted = self.radb.deleteResourceClaim(id)
return {'id': id, 'deleted': deleted}
def _updateResourceClaim(self, **kwargs):
logger.info('UpdateResourceClaim: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
updated = self.radb.updateResourceClaim(id,
resource_id=kwargs.get('resource_id'),
task_id=kwargs.get('task_id'),
starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None,
endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None,
status=kwargs.get('status_id', kwargs.get('status')),
session_id=kwargs.get('session_id'),
claim_size=kwargs.get('claim_size'),
username=kwargs.get('username'),
user_id=kwargs.get('user_id'))
return {'id': id, 'updated': updated}
def _updateTaskAndResourceClaims(self, **kwargs):
logger.info('UpdateTaskAndResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
task_id = kwargs['task_id']
updated = self.radb.updateTaskAndResourceClaims(task_id,
starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None,
endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None,
task_status=kwargs.get('task_status_id', kwargs.get('task_status')),
claim_status=kwargs.get('claim_status_id', kwargs.get('claim_status')),
session_id=kwargs.get('session_id'),
username=kwargs.get('username'),
user_id=kwargs.get('user_id'))
return {'task_id': task_id, 'updated': updated}
def _getResourceUsages(self, **kwargs):
usages = self.radb.getResourceUsages(claim_ids=kwargs.get('claim_ids'),
lower_bound=kwargs.get('lower_bound').datetime() if kwargs.get('lower_bound') else None,
upper_bound=kwargs.get('upper_bound').datetime() if kwargs.get('upper_bound') else None,
resource_ids=kwargs.get('resource_ids'),
task_ids=kwargs.get('task_ids'),
status=kwargs.get('status'),
resource_type=kwargs.get('resource_type'))
return usages
def _getResourceGroupTypes(self):
return self.radb.getResourceGroupTypes()
def _getResourceGroups(self):
return self.radb.getResourceGroups()
def _getResourceGroupMemberships(self):
rg_memberships = self.radb.getResourceGroupMemberships()
rg_memberships = convertIntKeysToString(rg_memberships)
return rg_memberships
def _getResourceTypes(self):
return self.radb.getResourceTypes()
def _getResources(self, **kwargs):
return self.radb.getResources(resource_ids=kwargs.get('resource_ids'),
resource_types=kwargs.get('resource_types'),
include_availability=kwargs.get('include_availability', False))
def _updateResourceAvailability(self, **kwargs):
return self.radb.updateResourceAvailability(resource_id=kwargs['resource_id'],
active=kwargs.get('active'),
available_capacity=kwargs.get('available_capacity'),
total_capacity=kwargs.get('total_capacity'))
def _getTasksTimeWindow(self, **kwargs):
logger.info('GetTasksTimeWindow: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
return self.radb.getTasksTimeWindow(task_ids=kwargs.get('task_ids'),
mom_ids=kwargs.get('mom_ids'),
otdb_ids=kwargs.get('otdb_ids'))
def _getTasks(self, **kwargs):
logger.info('GetTasks: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
return self.radb.getTasks(lower_bound=kwargs.get('lower_bound').datetime() if kwargs.get('lower_bound') else None,
upper_bound=kwargs.get('upper_bound').datetime() if kwargs.get('upper_bound') else None,
task_ids=kwargs.get('task_ids'),
task_status=kwargs.get('task_status'),
task_type=kwargs.get('task_type'),
mom_ids=kwargs.get('mom_ids'),
otdb_ids=kwargs.get('otdb_ids'),
cluster=kwargs.get('cluster'))
def _getTask(self, **kwargs):
logger.info('GetTask: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
task = self.radb.getTask(id=kwargs.get('id'), mom_id=kwargs.get('mom_id'), otdb_id=kwargs.get('otdb_id'), specification_id=kwargs.get('specification_id'))
return task
def _insertTask(self, **kwargs):
logger.info('InsertTask: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
task_id = self.radb.insertTask(kwargs['mom_id'],
kwargs['otdb_id'],
kwargs.get('status_id', kwargs.get('status', 'prepared')),
kwargs.get('type_id', kwargs.get('type')),
kwargs['specification_id'])
return {'id':task_id }
def _deleteTask(self, **kwargs):
logger.info('DeleteTask: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
deleted = self.radb.deleteTask(id)
return {'id': id, 'deleted': deleted}
def _updateTaskStatusForOtdbId(self, **kwargs):
logger.info('UpdateTaskStatusForOtdbId: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
otdb_id=kwargs.get('otdb_id')
updated = self.radb.updateTaskStatusForOtdbId(otdb_id=otdb_id,
task_status=kwargs.get('status_id', kwargs.get('status')))
return {'otdb_id': otdb_id, 'updated': updated}
def _updateTask(self, **kwargs):
logger.info('UpdateTask: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
updated = self.radb.updateTask(id,
mom_id=kwargs.get('mom_id'),
otdb_id=kwargs.get('otdb_id'),
task_status=kwargs.get('status_id', kwargs.get('status', 'prepared')),
task_type=kwargs.get('type_id', kwargs.get('type')),
specification_id=kwargs.get('specification_id'))
return {'id': id, 'updated': updated}
def _getTaskPredecessorIds(self, **kwargs):
logger.info('GetTaskPredecessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
return convertIntKeysToString(self.radb.getTaskPredecessorIds(kwargs.get('id')))
def _getTaskSuccessorIds(self, **kwargs):
logger.info('GetTaskSuccessorIds: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
return convertIntKeysToString(self.radb.getTaskSuccessorIds(kwargs.get('id')))
def _insertTaskPredecessor(self, **kwargs):
id = self.radb.insertTaskPredecessor(kwargs['task_id'],
kwargs['predecessor_id'])
return {'id':id}
def _insertTaskPredecessors(self, **kwargs):
ids = self.radb.insertTaskPredecessors(kwargs['task_id'],
kwargs['predecessor_ids'])
return {'ids':ids}
def _getSpecifications(self):
return self.radb.getSpecifications()
def _getSpecification(self, **kwargs):
logger.info('GetSpecification: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
specification = self.radb.getSpecification(kwargs['id'])
return specification
def _insertSpecificationAndTask(self, **kwargs):
logger.info('InsertSpecificationAndTask: %s' % dict({k:v for k,v in kwargs.items() if v != None and k != 'content'}))
return self.radb.insertSpecificationAndTask(kwargs['mom_id'],
kwargs['otdb_id'],
kwargs['task_status'],
kwargs['task_type'],
kwargs.get('starttime').datetime() if kwargs.get('starttime') else None,
kwargs.get('endtime').datetime() if kwargs.get('endtime') else None,
kwargs['content'],
kwargs['cluster'])
def _insertSpecification(self, **kwargs):
logger.info('InsertSpecification: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
specification_id = self.radb.insertSpecification(kwargs.get('starttime').datetime() if kwargs.get('starttime') else None,
kwargs.get('endtime').datetime() if kwargs.get('endtime') else None,
kwargs['content'],
kwargs['cluster'])
return {'id':specification_id}
def _deleteSpecification(self, **kwargs):
logger.info('DeleteSpecification: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
deleted = self.radb.deleteSpecification(id)
return {'id': id, 'deleted': deleted}
def _updateSpecification(self, **kwargs):
logger.info('UpdateSpecification: %s' % dict({k:v for k,v in kwargs.items() if v != None}))
id = kwargs['id']
updated = self.radb.updateSpecification(id,
starttime=kwargs['starttime'].datetime() if 'starttime' in kwargs else None,
endtime=kwargs['endtime'].datetime() if 'endtime' in kwargs else None,
content=kwargs.get('content'),
cluster=kwargs.get('cluster'))
return {'id': id, 'updated': updated}
def _getUnits(self):
return self.radb.getUnits()
def createService(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME, broker=None, dbcreds=None, log_queries=False, verbose=False):
return Service(servicename,
RADBHandler,
busname=busname,
broker=broker,
use_service_methods=True,
numthreads=4,
handler_args={'dbcreds': dbcreds, 'log_queries': log_queries},
verbose=verbose)
def main():
# make sure we run in UTC timezone
import os
os.environ['TZ'] = 'UTC'
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description='runs the resourceassignment database service')
parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost')
parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME)
parser.add_option("-s", "--servicename", dest="servicename", type="string", default=DEFAULT_SERVICENAME, help="Name for this service, default: %s" % DEFAULT_SERVICENAME)
parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries')
parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
parser.add_option_group(dbcredentials.options_group(parser))
parser.set_defaults(dbcredentials="RADB")
(options, args) = parser.parse_args()
dbcreds = dbcredentials.parse_options(options)
setQpidLogLevel(logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
with createService(busname=options.busname,
servicename=options.servicename,
broker=options.broker,
verbose=options.verbose,
log_queries=options.log_queries,
dbcreds=dbcreds):
waitForInterrupt()
if __name__ == '__main__':
main()