diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py index 5dca8f18ed03bef335cdf0ab6707026cf25a997d..d6031bc9bd8e0ee438e89c2b4967a6900d2c3662 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py @@ -19,9 +19,7 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. import logging -import datetime -from lofar.messaging.RPC import RPC, RPCException, RPCWrapper -from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME +from lofar.messaging import RPC, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME from lofar.common.util import convertStringDigitKeysToInt @@ -37,30 +35,30 @@ class RARPCException(Exception): def __str__(self): return "RARPCException: " + str(self.message) -class RARPC(RPCWrapper): - def __init__(self, busname=DEFAULT_BUSNAME, - broker=DEFAULT_BROKER, - timeout=120): - super(RARPC, self).__init__(busname, DEFAULT_SERVICENAME, broker, timeout=timeout) +class RARPC(RPC): + def __init__(self, exchange: str = DEFAULT_BUSNAME, + broker: str = DEFAULT_BROKER, + timeout: int = 120): + super().__init__(DEFAULT_SERVICENAME, exchange, broker, timeout) def getResourceClaimStatuses(self): - return self.rpc('GetResourceClaimStatuses') + return self.execute('GetResourceClaimStatuses') def getResourceClaimPropertyTypes(self): - return self.rpc('GetResourceClaimPropertyTypes') + return self.execute('GetResourceClaimPropertyTypes') def getResourceClaimProperties(self, claim_ids=None, task_id=None): - return self.rpc('GetResourceClaimProperties', claim_ids=claim_ids, task_id=task_id) + return self.execute('GetResourceClaimProperties', claim_ids=claim_ids, task_id=task_id) def insertResourceClaimProperty(self, claim_id, property_type, value, io_type): - return self.rpc('InsertResourceClaimProperty', claim_id=claim_id, + return self.execute('InsertResourceClaimProperty', claim_id=claim_id, property_type=property_type, value=value, io_type=io_type) def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None, extended=False, include_properties=False): - return self.rpc('GetResourceClaims', claim_ids=claim_ids, + return self.execute('GetResourceClaims', claim_ids=claim_ids, lower_bound=lower_bound, upper_bound=upper_bound, resource_ids=resource_ids, @@ -71,11 +69,11 @@ class RARPC(RPCWrapper): include_properties=include_properties) def getResourceClaim(self, id): - return self.rpc('GetResourceClaim', id=id) + return self.execute('GetResourceClaim', id=id) def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, claim_size, username, user_id, used_rcus=None, properties=None): - return self.rpc('InsertResourceClaim', resource_id=resource_id, + return self.execute('InsertResourceClaim', resource_id=resource_id, task_id=task_id, starttime=starttime, endtime=endtime, @@ -87,16 +85,16 @@ class RARPC(RPCWrapper): properties=properties) def insertResourceClaims(self, task_id, claims, username, user_id): - return self.rpc('InsertResourceClaims', task_id=task_id, + return self.execute('InsertResourceClaims', task_id=task_id, claims=claims, username=username, user_id=user_id) def deleteResourceClaim(self, id): - return self.rpc('DeleteResourceClaim', id=id) + return self.execute('DeleteResourceClaim', id=id) def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None): - return self.rpc('UpdateResourceClaim', id=id, + return self.execute('UpdateResourceClaim', id=id, resource_id=resource_id, task_id=task_id, starttime=starttime, @@ -111,7 +109,7 @@ class RARPC(RPCWrapper): resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): - return self.rpc('UpdateResourceClaims', where_resource_claim_ids=where_resource_claim_ids, + return self.execute('UpdateResourceClaims', where_resource_claim_ids=where_resource_claim_ids, where_task_ids=where_task_ids, where_resource_types=where_resource_types, resource_id=resource_id, @@ -125,7 +123,7 @@ class RARPC(RPCWrapper): user_id=user_id) def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None): - return self.rpc('UpdateTaskAndResourceClaims', task_id=task_id, + return self.execute('UpdateTaskAndResourceClaims', task_id=task_id, starttime=starttime, endtime=endtime, task_status=task_status, @@ -137,7 +135,7 @@ class RARPC(RPCWrapper): def getResourceUsages(self, lower_bound=None, upper_bound=None, resource_ids=None, status=None): - all_usages = self.rpc('GetResourceUsages', + all_usages = self.execute('GetResourceUsages', lower_bound=lower_bound, upper_bound=upper_bound, resource_ids=resource_ids, @@ -148,29 +146,29 @@ class RARPC(RPCWrapper): return all_usages def getResourceGroupTypes(self): - return self.rpc('GetResourceGroupTypes') + return self.execute('GetResourceGroupTypes') def getResourceGroups(self): - return self.rpc('GetResourceGroups') + return self.execute('GetResourceGroups') def getResourceGroupNames(self, resourceGroupTypeName): - return self.rpc('GetResourceGroupNames', + return self.execute('GetResourceGroupNames', resourceGroupTypeName=resourceGroupTypeName) def getResourceGroupMemberships(self): - rg_memberships = self.rpc('GetResourceGroupMemberships') + rg_memberships = self.execute('GetResourceGroupMemberships') rg_memberships = convertStringDigitKeysToInt(rg_memberships) return rg_memberships def getResourceTypes(self): - return self.rpc('GetResourceTypes') + return self.execute('GetResourceTypes') def getResources(self, resource_ids=None, resource_types=None, include_availability=False): - return self.rpc('GetResources', resource_ids=resource_ids, resource_types=resource_types, include_availability=include_availability) + return self.execute('GetResources', resource_ids=resource_ids, resource_types=resource_types, include_availability=include_availability) # instantiate this object and call this function to update DRAGNET active (config file), avail_cap (df(1) and config file override) and total_cap (config file) values def updateResourceAvailability(self, resource_id, active=None, available_capacity=None, total_capacity=None): - return self.rpc('UpdateResourceAvailability', + return self.execute('UpdateResourceAvailability', resource_id=resource_id, active=active, available_capacity=available_capacity, @@ -178,20 +176,20 @@ class RARPC(RPCWrapper): def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None): '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id''' - return self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id, specification_id=specification_id) + return self.execute('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id, specification_id=specification_id) def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id): - return self.rpc('InsertTask', mom_id=mom_id, + return self.execute('InsertTask', mom_id=mom_id, otdb_id=otdb_id, task_status=task_status, task_type=task_type, specification_id=specification_id) def deleteTask(self, id): - return self.rpc('DeleteTask', id=id) + return self.execute('DeleteTask', id=id) def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None): - return self.rpc('UpdateTask', + return self.execute('UpdateTask', id=task_id, mom_id=mom_id, otdb_id=otdb_id, @@ -200,12 +198,12 @@ class RARPC(RPCWrapper): specification_id=specification_id) def updateTaskStatusForOtdbId(self, otdb_id, task_status): - return self.rpc('UpdateTaskStatusForOtdbId', + return self.execute('UpdateTaskStatusForOtdbId', otdb_id=otdb_id, task_status=task_status) def getTasksTimeWindow(self, task_ids=None, mom_ids=None, otdb_ids=None): - return self.rpc('GetTasksTimeWindow', task_ids=task_ids, mom_ids=mom_ids, otdb_ids=otdb_ids) + return self.execute('GetTasksTimeWindow', task_ids=task_ids, mom_ids=mom_ids, otdb_ids=otdb_ids) def getTasks(self, lower_bound=None, upper_bound=None, task_ids=None, task_status=None, task_type=None, mom_ids=None, otdb_ids=None, cluster=None): '''getTasks let's you query tasks from the radb with many optional filters. @@ -218,31 +216,31 @@ class RARPC(RPCWrapper): :param otdb_ids: int/list/tuple specifies one or more otdb_ids to select :param cluster: string specifies the cluster to select ''' - return self.rpc('GetTasks', lower_bound=lower_bound, upper_bound=upper_bound, task_ids=task_ids, task_status=task_status, task_type=task_type, mom_ids=mom_ids, otdb_ids=otdb_ids, cluster=cluster) + return self.execute('GetTasks', lower_bound=lower_bound, upper_bound=upper_bound, task_ids=task_ids, task_status=task_status, task_type=task_type, mom_ids=mom_ids, otdb_ids=otdb_ids, cluster=cluster) def getTaskPredecessorIds(self, id=None): - return self.rpc('GetTaskPredecessorIds', id=id) + return self.execute('GetTaskPredecessorIds', id=id) def getTaskSuccessorIds(self, **kwargs): - return self.rpc('GetTaskSuccessorIds', id=id) + return self.execute('GetTaskSuccessorIds', id=id) def insertTaskPredecessor(self, task_id, predecessor_id): - return self.rpc('InsertTaskPredecessor', task_id=task_id, predecessor_id=predecessor_id) + return self.execute('InsertTaskPredecessor', task_id=task_id, predecessor_id=predecessor_id) def insertTaskPredecessors(self, task_id, predecessor_ids): - return self.rpc('InsertTaskPredecessors', task_id=task_id, predecessor_ids=predecessor_ids) + return self.execute('InsertTaskPredecessors', task_id=task_id, predecessor_ids=predecessor_ids) def getTaskTypes(self): - return self.rpc('GetTaskTypes') + return self.execute('GetTaskTypes') def getTaskStatuses(self): - return self.rpc('GetTaskStatuses') + return self.execute('GetTaskStatuses') def getSpecification(self, id): - return self.rpc('GetSpecification', id=id) + return self.execute('GetSpecification', id=id) def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type, starttime, endtime, content, cluster): - return self.rpc('InsertSpecificationAndTask', + return self.execute('InsertSpecificationAndTask', mom_id=mom_id, otdb_id=otdb_id, task_status=task_status, @@ -253,17 +251,17 @@ class RARPC(RPCWrapper): cluster=cluster) def insertSpecification(self, starttime, endtime, content, cluster): - return self.rpc('InsertSpecification', + return self.execute('InsertSpecification', starttime=starttime, endtime=endtime, content=content, cluster=cluster) def deleteSpecification(self, id): - return self.rpc('DeleteSpecification', id=id) + return self.execute('DeleteSpecification', id=id) def updateSpecification(self, id, starttime=None, endtime=None, content=None, cluster=None): - return self.rpc('UpdateSpecification', + return self.execute('UpdateSpecification', id=id, starttime=starttime, endtime=endtime, @@ -271,27 +269,27 @@ class RARPC(RPCWrapper): cluster=cluster) def getSpecifications(self): - return self.rpc('GetSpecifications') + return self.execute('GetSpecifications') def getUnits(self): - return self.rpc('GetUnits') + return self.execute('GetUnits') def getResourceAllocationConfig(self, sql_like_name_pattern=None): - return self.rpc('GetResourceAllocationConfig', + return self.execute('GetResourceAllocationConfig', sql_like_name_pattern=sql_like_name_pattern) def get_conflicting_overlapping_claims(self, claim_id): '''returns a list of claimed claims which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)''' - return self.rpc('get_overlapping_claims', + return self.execute('get_overlapping_claims', claim_id=claim_id) def get_conflicting_overlapping_tasks(self, claim_id): '''returns a list of tasks which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)''' - return self.rpc('get_overlapping_tasks', + return self.execute('get_overlapping_tasks', claim_id=claim_id) def get_max_resource_usage_between(self, resource_id, lower_bound, upper_bound, claim_status='claimed'): - return self.rpc('get_max_resource_usage_between', + return self.execute('get_max_resource_usage_between', resource_id=resource_id, lower_bound=lower_bound, upper_bound=upper_bound, @@ -300,21 +298,17 @@ class RARPC(RPCWrapper): def get_resource_claimable_capacity(self, resource_id, lower_bound, upper_bound): '''get the claimable capacity for the given resource within the timewindow given by lower_bound and upper_bound. this is the resource's available capacity (total-used) minus the maximum allocated usage in that timewindow.''' - return self.rpc('get_resource_claimable_capacity', resource_id=resource_id, + return self.execute('get_resource_claimable_capacity', resource_id=resource_id, lower_bound=lower_bound, upper_bound=upper_bound).get('resource_claimable_capacity') -def do_tests(busname=DEFAULT_BUSNAME): - with RARPC(busname=busname) as rpc: - #for i in range(0, 10): - #taskId = rpc.insertTask(1234, 5678, 'active', 'OBSERVATION', 1)['id'] - #rcId = rpc.insertResourceClaim(1, taskId, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), 'TENTATIVE', 1, 10, 'einstein', -1)['id'] - #print rpc.getResourceClaim(rcId) - #rpc.updateResourceClaim(rcId, starttime=datetime.datetime.utcnow(), endtime=datetime.datetime.utcnow() + datetime.timedelta(hours=2), status='CLAIMED') - #print rpc.getResourceClaim(rcId) - #print - - #tasks = rpc.getTasks() +def do_tests(exchange=DEFAULT_BUSNAME): + from datetime import datetime, timedelta + with RARPC(exchange=exchange) as rpc: + tasks = rpc.getTasks(lower_bound=datetime.utcnow()-timedelta(days=1)) + print(tasks) + return + #for t in tasks: #print rpc.getTask(t['id']) #for i in range(4,9): @@ -337,14 +331,6 @@ def do_tests(busname=DEFAULT_BUSNAME): #print rpc.getResourceGroupNames('cluster') #print rpc.getResourceGroupMemberships() - for rc in rpc.getResourceClaims(): - print(rc) - rpc.insertResourceClaimProperty(rc['id'], 'nr_of_CS_files', 42) - print(rpc.getResourceClaimProperties(rc['id'])) - - print() - print(rpc.getResourceClaimProperties(task_id=493)) - #rpc.deleteTask(taskId) #print rpc.getTasks() diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index 3db2d82f1351468c77a851cad182890b0c38f511..130e6f7867908e84fa1759af26367c10102b1130 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -20,9 +20,7 @@ with RPC(busname, 'GetObjectDetails') as getObjectDetails: ''' import logging from optparse import OptionParser -from lofar.messaging import Service, DEFAULT_BROKER, DEFAULT_BUSNAME -from lofar.messaging import setQpidLogLevel -from lofar.messaging.Service import MessageHandlerInterface +from lofar.messaging import Service, ServiceMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME from lofar.common.util import waitForInterrupt from lofar.sas.resourceassignment.database import radb from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME @@ -31,65 +29,63 @@ from lofar.common.util import convertIntKeysToString logger = logging.getLogger(__name__) -class RADBHandler(MessageHandlerInterface): - def __init__(self, **kwargs): - super(RADBHandler, self).__init__(**kwargs) - self.dbcreds = kwargs.pop("dbcreds", None) - self.log_queries = kwargs.pop("log_queries", False) - - self.service2MethodMap = { - 'GetResourceClaimStatuses': self._getResourceClaimStatuses, - 'GetResourceClaimProperties': self._getResourceClaimProperties, - 'InsertResourceClaimProperty': self._insertResourceClaimProperty, - 'GetResourceClaimPropertyTypes': self._getResourceClaimPropertyTypes, - 'GetRcuSpecifications': self._getRcuSpecifications, - 'GetRcuSpecification': self._getRcuSpecification, - 'InsertRcuSpecifications': self._insertRcuSpecifications, - 'InsertRcuSpecification': self._insertRcuSpecification, - 'GetResourceClaims': self._getResourceClaims, - 'GetResourceClaim': self._getResourceClaim, - 'InsertResourceClaims': self._insertResourceClaims, - 'InsertResourceClaim': self._insertResourceClaim, - 'DeleteResourceClaim': self._deleteResourceClaim, - 'UpdateResourceClaim': self._updateResourceClaim, - 'UpdateResourceClaims': self._updateResourceClaims, - 'UpdateTaskAndResourceClaims': self._updateTaskAndResourceClaims, - 'GetResourceUsages': self._getResourceUsages, - 'GetResourceGroupTypes': self._getResourceGroupTypes, - 'GetResourceGroups': self._getResourceGroups, - 'GetResourceGroupNames': self._getResourceGroupNames, - 'GetResourceGroupMemberships': self._getResourceGroupMemberships, - 'GetResourceTypes': self._getResourceTypes, - 'GetResources': self._getResources, - 'UpdateResourceAvailability': self._updateResourceAvailability, - 'GetTasksTimeWindow': self._getTasksTimeWindow, - 'GetTasks': self._getTasks, - 'GetTask': self._getTask, - 'InsertTask': self._insertTask, - 'DeleteTask': self._deleteTask, - 'UpdateTask': self._updateTask, - 'UpdateTaskStatusForOtdbId': self._updateTaskStatusForOtdbId, - 'GetTaskPredecessorIds': self._getTaskPredecessorIds, - 'GetTaskSuccessorIds': self._getTaskSuccessorIds, - 'InsertTaskPredecessor': self._insertTaskPredecessor, - 'InsertTaskPredecessors': self._insertTaskPredecessors, - 'GetTaskStatuses': self._getTaskStatuses, - 'GetTaskTypes': self._getTaskTypes, - 'GetSpecifications': self._getSpecifications, - 'GetSpecification': self._getSpecification, - 'InsertSpecificationAndTask': self._insertSpecificationAndTask, - 'InsertSpecification': self._insertSpecification, - 'DeleteSpecification': self._deleteSpecification, - 'UpdateSpecification': self._updateSpecification, - 'GetUnits': self._getUnits, - 'GetResourceAllocationConfig': self._getResourceAllocationConfig, - 'get_overlapping_claims': self._get_overlapping_claims, - 'get_overlapping_tasks': self._get_overlapping_tasks, - 'get_max_resource_usage_between': self._get_max_resource_usage_between, - 'get_resource_claimable_capacity': self._get_resource_claimable_capacity } - - def prepare_loop(self): - self.radb = radb.RADatabase(dbcreds=self.dbcreds, log_queries=self.log_queries) +class RADBServiceMessageHandler(ServiceMessageHandler): + def __init__(self, dbcreds: dbcredentials.Credentials, log_queries: bool=False): + super().__init__() + self.radb = radb.RADatabase(dbcreds=dbcreds, log_queries=log_queries) + + def init(self, service_name: str, exchange: str, broker: str): + super().init(service_name, exchange, broker) + + self.register_service_method('GetResourceClaimStatuses', self._getResourceClaimStatuses) + self.register_service_method('GetResourceClaimProperties', self._getResourceClaimProperties) + self.register_service_method('InsertResourceClaimProperty', self._insertResourceClaimProperty) + self.register_service_method('GetResourceClaimPropertyTypes', self._getResourceClaimPropertyTypes) + self.register_service_method('GetRcuSpecifications', self._getRcuSpecifications) + self.register_service_method('GetRcuSpecification', self._getRcuSpecification) + self.register_service_method('InsertRcuSpecifications', self._insertRcuSpecifications) + self.register_service_method('InsertRcuSpecification', self._insertRcuSpecification) + self.register_service_method('GetResourceClaims', self._getResourceClaims) + self.register_service_method('GetResourceClaim', self._getResourceClaim) + self.register_service_method('InsertResourceClaims', self._insertResourceClaims) + self.register_service_method('InsertResourceClaim', self._insertResourceClaim) + self.register_service_method('DeleteResourceClaim', self._deleteResourceClaim) + self.register_service_method('UpdateResourceClaim', self._updateResourceClaim) + self.register_service_method('UpdateResourceClaims', self._updateResourceClaims) + self.register_service_method('UpdateTaskAndResourceClaims', self._updateTaskAndResourceClaims) + self.register_service_method('GetResourceUsages', self._getResourceUsages) + self.register_service_method('GetResourceGroupTypes', self._getResourceGroupTypes) + self.register_service_method('GetResourceGroups', self._getResourceGroups) + self.register_service_method('GetResourceGroupNames', self._getResourceGroupNames) + self.register_service_method('GetResourceGroupMemberships', self._getResourceGroupMemberships) + self.register_service_method('GetResourceTypes', self._getResourceTypes) + self.register_service_method('GetResources', self._getResources) + self.register_service_method('UpdateResourceAvailability', self._updateResourceAvailability) + self.register_service_method('GetTasksTimeWindow', self._getTasksTimeWindow) + self.register_service_method('GetTasks', self._getTasks) + self.register_service_method('GetTask', self._getTask) + self.register_service_method('InsertTask', self._insertTask) + self.register_service_method('DeleteTask', self._deleteTask) + self.register_service_method('UpdateTask', self._updateTask) + self.register_service_method('UpdateTaskStatusForOtdbId', self._updateTaskStatusForOtdbId) + self.register_service_method('GetTaskPredecessorIds', self._getTaskPredecessorIds) + self.register_service_method('GetTaskSuccessorIds', self._getTaskSuccessorIds) + self.register_service_method('InsertTaskPredecessor', self._insertTaskPredecessor) + self.register_service_method('InsertTaskPredecessors', self._insertTaskPredecessors) + self.register_service_method('GetTaskStatuses', self._getTaskStatuses) + self.register_service_method('GetTaskTypes', self._getTaskTypes) + self.register_service_method('GetSpecifications', self._getSpecifications) + self.register_service_method('GetSpecification', self._getSpecification) + self.register_service_method('InsertSpecificationAndTask', self._insertSpecificationAndTask) + self.register_service_method('InsertSpecification', self._insertSpecification) + self.register_service_method('DeleteSpecification', self._deleteSpecification) + self.register_service_method('UpdateSpecification', self._updateSpecification) + self.register_service_method('GetUnits', self._getUnits) + self.register_service_method('GetResourceAllocationConfig', self._getResourceAllocationConfig) + self.register_service_method('get_overlapping_claims', self._get_overlapping_claims) + self.register_service_method('get_overlapping_tasks', self._get_overlapping_tasks) + self.register_service_method('get_max_resource_usage_between', self._get_max_resource_usage_between) + self.register_service_method('get_resource_claimable_capacity', self._get_resource_claimable_capacity) def _getTaskStatuses(self): return self.radb.getTaskStatuses() @@ -404,15 +400,13 @@ class RADBHandler(MessageHandlerInterface): return { 'resource_claimable_capacity': resource_claimable_capacity} -def createService(busname=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, dbcreds=None, log_queries=False, verbose=False): +def createService(exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER, dbcreds=None, log_queries=False): return Service(DEFAULT_SERVICENAME, - RADBHandler, - busname=busname, + RADBServiceMessageHandler, + handler_kwargs={'dbcreds': dbcreds, 'log_queries': log_queries}, + exchange=exchange, broker=broker, - use_service_methods=True, - numthreads=1, - handler_args={'dbcreds': dbcreds, 'log_queries': log_queries}, - verbose=verbose) + num_threads=4) def main(): # make sure we run in UTC timezone @@ -422,9 +416,9 @@ def main(): # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs the resourceassignment database service') - parser.add_option('-q', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') - parser.add_option("-b", "--busname", dest="busname", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the qpid broker, default: %s" % DEFAULT_BUSNAME) - parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries') + parser.add_option('-b', '--broker', dest='broker', type='string', default=None, help='Address of the qpid broker, default: localhost') + parser.add_option("-e", "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="Name of the bus exchange on the broker, default: %s" % DEFAULT_BUSNAME) + parser.add_option('-q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries') parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="RADB") @@ -432,15 +426,13 @@ def main(): dbcreds = dbcredentials.parse_options(options) - setQpidLogLevel(logging.INFO) logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.DEBUG if options.verbose else logging.INFO) logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) - with createService(busname=options.busname, + with createService(exchange=options.exchange, broker=options.broker, - verbose=options.verbose, log_queries=options.log_queries, dbcreds=dbcreds): waitForInterrupt() diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py index 7ee5006e14bb988238e5f73796211822046716ef..cf11c62796597b2104018648e51cfa850bb7bd7a 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/test/test_ra_service_and_rpc.py @@ -1,22 +1,18 @@ #!/usr/bin/env python3 import unittest -import uuid import datetime import logging -from lofar.messaging import Service, TemporaryQueue +from lofar.messaging import TemporaryExchange from lofar.sas.resourceassignment.resourceassignmentservice.service import createService -from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC, RARPCException -import threading +from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from unittest.mock import patch -with TemporaryQueue(__name__) as tmp_queue: +with TemporaryExchange(__name__) as tmp_exchange: logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) logger = logging.getLogger(__name__) - busname = tmp_queue.address - # the system under test is the service and the rpc, not the RADatabase # so, patch (mock) the RADatabase class during these tests. # when the service instantiates an RADatabase it will get the mocked class. @@ -41,34 +37,33 @@ with TemporaryQueue(__name__) as tmp_queue: def test(self): '''basic test ''' - rpc = RARPC(busname=busname) - self.assertEqual(mock.getTaskStatuses.return_value, rpc.getTaskStatuses()) - self.assertEqual(mock.getTaskTypes.return_value, rpc.getTaskTypes()) - self.assertEqual(mock.getResourceClaimStatuses.return_value, rpc.getResourceClaimStatuses()) - self.assertEqual(mock.getUnits.return_value, rpc.getUnits()) - self.assertEqual(mock.getResourceTypes.return_value, rpc.getResourceTypes()) - self.assertEqual(mock.getResourceGroupTypes.return_value, rpc.getResourceGroupTypes()) - self.assertEqual(mock.getResources.return_value, rpc.getResources()) - self.assertEqual(mock.getResourceGroups.return_value, rpc.getResourceGroups()) - self.assertEqual(mock.getTasks.return_value, rpc.getTasks()) - self.assertEqual(mock.getResourceClaims.return_value, rpc.getResourceClaims()) + with RARPC(exchange=tmp_exchange.address, timeout=1) as rpc: + self.assertEqual(mock.getTaskStatuses.return_value, rpc.getTaskStatuses()) + self.assertEqual(mock.getTaskTypes.return_value, rpc.getTaskTypes()) + self.assertEqual(mock.getResourceClaimStatuses.return_value, rpc.getResourceClaimStatuses()) + self.assertEqual(mock.getUnits.return_value, rpc.getUnits()) + self.assertEqual(mock.getResourceTypes.return_value, rpc.getResourceTypes()) + self.assertEqual(mock.getResourceGroupTypes.return_value, rpc.getResourceGroupTypes()) + self.assertEqual(mock.getResources.return_value, rpc.getResources()) + self.assertEqual(mock.getResourceGroups.return_value, rpc.getResourceGroups()) + self.assertEqual(mock.getTasks.return_value, rpc.getTasks()) + self.assertEqual(mock.getResourceClaims.return_value, rpc.getResourceClaims()) - #TODO: fix this test - #self.assertEqual(None, rpc.getTask(1)) - #self.assertEqual(mock.getTask.return_value, rpc.getTask(5)) + #TODO: fix this test + #self.assertEqual(None, rpc.getTask(1)) + #self.assertEqual(mock.getTask.return_value, rpc.getTask(5)) - # test non existing service method, should timeout - with self.assertRaises(ValueError) as cm: - rpc.rpc('foo', timeout=1) - self.assertEqual(str(cm.exception), "{'backtrace': '', 'state': 'TIMEOUT', 'errmsg': 'RPC Timed out'}") + # test non existing service method, should raise + with self.assertRaises(Exception) as e: + rpc.execute('foo') - ## test method with wrong args - #with self.assertRaises(TypeError) as cm: - #rpc.rpc('GetTasks', timeout=1, fooarg='bar') - #self.assertTrue('got an unexpected keyword argument \'fooarg\'' in str(cm.exception)) + ## test method with wrong args + #with self.assertRaises(TypeError) as cm: + #rpc.rpc('GetTasks', timeout=1, fooarg='bar') + #self.assertTrue('got an unexpected keyword argument \'fooarg\'' in str(cm.exception)) # create and run the service - with createService(busname=busname): + with createService(exchange=tmp_exchange.address, log_queries=True): # and run all tests unittest.main()