#!/usr/bin/env python3

# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

import logging
import datetime
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.common.util import convertStringDigitKeysToInt

''' Simple RPC client for Service lofarbus.*Z
'''

logger = logging.getLogger(__name__)


class RARPCException(Exception):
    '''Exception carrying a free-form message.

    str() of an instance is the message prefixed with "RARPCException: ".
    '''

    def __init__(self, message):
        # Pass the message to Exception as well, so that .args is populated
        # (needed for a meaningful repr() and for pickling).
        super().__init__(message)
        self.message = message

    def __str__(self):
        return "RARPCException: " + str(self.message)


class RARPC(RPCWrapper):
    '''RPC client for the resource assignment service.

    Unless documented otherwise, each method forwards its arguments unchanged,
    as keyword arguments, to the service method named in its self.rpc(...)
    call and returns whatever that call returns.
    '''

    def __init__(self, busname=DEFAULT_BUSNAME,
                 servicename=DEFAULT_SERVICENAME,
                 broker=None,
                 timeout=120):
        '''Set up the wrapper for the given bus, service, broker and timeout.'''
        super().__init__(busname, servicename, broker, timeout=timeout)

    def getResourceClaimStatuses(self):
        return self.rpc('GetResourceClaimStatuses')

    def getResourceClaimPropertyTypes(self):
        return self.rpc('GetResourceClaimPropertyTypes')

    def getResourceClaimProperties(self, claim_ids=None, task_id=None):
        return self.rpc('GetResourceClaimProperties',
                        claim_ids=claim_ids,
                        task_id=task_id)

    def insertResourceClaimProperty(self, claim_id, property_type, value, io_type):
        return self.rpc('InsertResourceClaimProperty',
                        claim_id=claim_id,
                        property_type=property_type,
                        value=value,
                        io_type=io_type)

    def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None,
                          resource_ids=None, task_ids=None, status=None,
                          resource_type=None, extended=False, include_properties=False):
        return self.rpc('GetResourceClaims',
                        claim_ids=claim_ids,
                        lower_bound=lower_bound,
                        upper_bound=upper_bound,
                        resource_ids=resource_ids,
                        task_ids=task_ids,
                        status=status,
                        resource_type=resource_type,
                        extended=extended,
                        include_properties=include_properties)

    def getResourceClaim(self, id):
        return self.rpc('GetResourceClaim', id=id)

    def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status,
                            claim_size, username, user_id, used_rcus=None, properties=None):
        return self.rpc('InsertResourceClaim',
                        resource_id=resource_id,
                        task_id=task_id,
                        starttime=starttime,
                        endtime=endtime,
                        status=status,
                        claim_size=claim_size,
                        username=username,
                        used_rcus=used_rcus,
                        user_id=user_id,
                        properties=properties)

    def insertResourceClaims(self, task_id, claims, username, user_id):
        return self.rpc('InsertResourceClaims',
                        task_id=task_id,
                        claims=claims,
                        username=username,
                        user_id=user_id)

    def deleteResourceClaim(self, id):
        return self.rpc('DeleteResourceClaim', id=id)

    def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None,
                            endtime=None, status=None, claim_size=None, username=None,
                            used_rcus=None, user_id=None):
        return self.rpc('UpdateResourceClaim',
                        id=id,
                        resource_id=resource_id,
                        task_id=task_id,
                        starttime=starttime,
                        endtime=endtime,
                        status=status,
                        claim_size=claim_size,
                        username=username,
                        used_rcus=used_rcus,
                        user_id=user_id)

    def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None,
                             where_resource_types=None, resource_id=None, task_id=None,
                             starttime=None, endtime=None, status=None, claim_size=None,
                             username=None, used_rcus=None, user_id=None, commit=True):
        '''Forward to 'UpdateResourceClaims'; the where_* arguments and the
        remaining arguments are passed through unchanged.'''
        # NOTE(review): `commit` is accepted here but is not forwarded to the
        # service call. Confirm whether the service supports it before wiring
        # it through.
        return self.rpc('UpdateResourceClaims',
                        where_resource_claim_ids=where_resource_claim_ids,
                        where_task_ids=where_task_ids,
                        where_resource_types=where_resource_types,
                        resource_id=resource_id,
                        task_id=task_id,
                        starttime=starttime,
                        endtime=endtime,
                        status=status,
                        claim_size=claim_size,
                        username=username,
                        used_rcus=used_rcus,
                        user_id=user_id)

    def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None,
                                    task_status=None, claim_status=None, username=None,
                                    used_rcus=None, user_id=None, where_resource_types=None):
        return self.rpc('UpdateTaskAndResourceClaims',
                        task_id=task_id,
                        starttime=starttime,
                        endtime=endtime,
                        task_status=task_status,
                        claim_status=claim_status,
                        username=username,
                        used_rcus=used_rcus,
                        user_id=user_id,
                        where_resource_types=where_resource_types)

    def getResourceUsages(self, lower_bound=None, upper_bound=None, resource_ids=None, status=None):
        '''Forward to 'GetResourceUsages' and return the result after passing
        it through convertStringDigitKeysToInt.'''
        # presumably restores int dict keys that arrive as digit strings;
        # see lofar.common.util.convertStringDigitKeysToInt to confirm
        all_usages = self.rpc('GetResourceUsages',
                              lower_bound=lower_bound,
                              upper_bound=upper_bound,
                              resource_ids=resource_ids,
                              status=status)
        return convertStringDigitKeysToInt(all_usages)

    def getResourceGroupTypes(self):
        return self.rpc('GetResourceGroupTypes')

    def getResourceGroups(self):
        return self.rpc('GetResourceGroups')

    def getResourceGroupNames(self, resourceGroupTypeName):
        return self.rpc('GetResourceGroupNames',
                        resourceGroupTypeName=resourceGroupTypeName)

    def getResourceGroupMemberships(self):
        '''Forward to 'GetResourceGroupMemberships' and return the result
        after passing it through convertStringDigitKeysToInt.'''
        rg_memberships = self.rpc('GetResourceGroupMemberships')
        return convertStringDigitKeysToInt(rg_memberships)

    def getResourceTypes(self):
        return self.rpc('GetResourceTypes')

    def getResources(self, resource_ids=None, resource_types=None, include_availability=False):
        return self.rpc('GetResources',
                        resource_ids=resource_ids,
                        resource_types=resource_types,
                        include_availability=include_availability)

    # instantiate this object and call this function to update DRAGNET active (config file),
    # avail_cap (df(1) and config file override) and total_cap (config file) values
    def updateResourceAvailability(self, resource_id, active=None,
                                   available_capacity=None, total_capacity=None):
        return self.rpc('UpdateResourceAvailability',
                        resource_id=resource_id,
                        active=active,
                        available_capacity=available_capacity,
                        total_capacity=total_capacity)

    def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None):
        '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id'''
        return self.rpc('GetTask',
                        id=id,
                        mom_id=mom_id,
                        otdb_id=otdb_id,
                        specification_id=specification_id)

    def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id):
        return self.rpc('InsertTask',
                        mom_id=mom_id,
                        otdb_id=otdb_id,
                        task_status=task_status,
                        task_type=task_type,
                        specification_id=specification_id)

    def deleteTask(self, id):
        return self.rpc('DeleteTask', id=id)

    def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None,
                   task_type=None, specification_id=None):
        '''Forward to 'UpdateTask'; note that task_id is sent as the `id` keyword.'''
        return self.rpc('UpdateTask',
                        id=task_id,
                        mom_id=mom_id,
                        otdb_id=otdb_id,
                        task_status=task_status,
                        task_type=task_type,
                        specification_id=specification_id)

    def updateTaskStatusForOtdbId(self, otdb_id, task_status):
        return self.rpc('UpdateTaskStatusForOtdbId',
                        otdb_id=otdb_id,
                        task_status=task_status)

    def getTasksTimeWindow(self, task_ids=None, mom_ids=None, otdb_ids=None):
        return self.rpc('GetTasksTimeWindow',
                        task_ids=task_ids,
                        mom_ids=mom_ids,
                        otdb_ids=otdb_ids)

    def getTasks(self, lower_bound=None, upper_bound=None, task_ids=None, task_status=None,
                 task_type=None, mom_ids=None, otdb_ids=None, cluster=None):
        '''getTasks lets you query tasks from the radb with many optional filters.
        :param lower_bound: datetime specifies the lower_bound of a time window above which to select tasks
        :param upper_bound: datetime specifies the upper_bound of a time window below which to select tasks
        :param task_ids: int/list/tuple specifies one or more task_ids to select
        :param task_status: int/string/list specifies one or more task_statuses to select in either task_status_id or task_status_name form
        :param task_type: int/string/list specifies one or more task_types to select in either task_type_id or task_type_name form
        :param mom_ids: int/list/tuple specifies one or more mom_ids to select
        :param otdb_ids: int/list/tuple specifies one or more otdb_ids to select
        :param cluster: string specifies the cluster to select
        '''
        return self.rpc('GetTasks',
                        lower_bound=lower_bound,
                        upper_bound=upper_bound,
                        task_ids=task_ids,
                        task_status=task_status,
                        task_type=task_type,
                        mom_ids=mom_ids,
                        otdb_ids=otdb_ids,
                        cluster=cluster)

    def getTaskPredecessorIds(self, id=None):
        return self.rpc('GetTaskPredecessorIds', id=id)

    def getTaskSuccessorIds(self, id=None, **kwargs):
        '''Forward to 'GetTaskSuccessorIds' with the given task id.

        :param id: the task id to forward; None when not given
        :param kwargs: accepted only for backward compatibility with the
                       previous (self, **kwargs) signature; not forwarded
        '''
        # `id` must be a real parameter: without it, `id=id` would send the
        # builtin id() function instead of the caller's value.
        return self.rpc('GetTaskSuccessorIds', id=id)

    def insertTaskPredecessor(self, task_id, predecessor_id):
        return self.rpc('InsertTaskPredecessor',
                        task_id=task_id,
                        predecessor_id=predecessor_id)

    def insertTaskPredecessors(self, task_id, predecessor_ids):
        return self.rpc('InsertTaskPredecessors',
                        task_id=task_id,
                        predecessor_ids=predecessor_ids)

    def getTaskTypes(self):
        return self.rpc('GetTaskTypes')

    def getTaskStatuses(self):
        return self.rpc('GetTaskStatuses')

    def getSpecification(self, id):
        return self.rpc('GetSpecification', id=id)

    def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type,
                                   starttime, endtime, content, cluster):
        return self.rpc('InsertSpecificationAndTask',
                        mom_id=mom_id,
                        otdb_id=otdb_id,
                        task_status=task_status,
                        task_type=task_type,
                        starttime=starttime,
                        endtime=endtime,
                        content=content,
                        cluster=cluster)

    def insertSpecification(self, starttime, endtime, content, cluster):
        return self.rpc('InsertSpecification',
                        starttime=starttime,
                        endtime=endtime,
                        content=content,
                        cluster=cluster)

    def deleteSpecification(self, id):
        return self.rpc('DeleteSpecification', id=id)

    def updateSpecification(self, id, starttime=None, endtime=None, content=None, cluster=None):
        return self.rpc('UpdateSpecification',
                        id=id,
                        starttime=starttime,
                        endtime=endtime,
                        content=content,
                        cluster=cluster)

    def getSpecifications(self):
        return self.rpc('GetSpecifications')

    def getUnits(self):
        return self.rpc('GetUnits')

    def getResourceAllocationConfig(self, sql_like_name_pattern=None):
        return self.rpc('GetResourceAllocationConfig',
                        sql_like_name_pattern=sql_like_name_pattern)

    def get_conflicting_overlapping_claims(self, claim_id):
        '''returns a list of claimed claims which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)'''
        # the service method is named 'get_overlapping_claims' (no 'conflicting')
        return self.rpc('get_overlapping_claims',
                        claim_id=claim_id)

    def get_conflicting_overlapping_tasks(self, claim_id):
        '''returns a list of tasks which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)'''
        # the service method is named 'get_overlapping_tasks' (no 'conflicting')
        return self.rpc('get_overlapping_tasks',
                        claim_id=claim_id)

    def get_max_resource_usage_between(self, resource_id, lower_bound, upper_bound, claim_status='claimed'):
        return self.rpc('get_max_resource_usage_between',
                        resource_id=resource_id,
                        lower_bound=lower_bound,
                        upper_bound=upper_bound,
                        claim_status=claim_status)

    def get_resource_claimable_capacity(self, resource_id, lower_bound, upper_bound):
        '''get the claimable capacity for the given resource within the timewindow given by lower_bound and upper_bound.
        this is the resource's available capacity (total-used) minus the maximum allocated usage in that timewindow.'''
        # the service replies with a dict; only its 'resource_claimable_capacity'
        # entry is returned (None when that key is absent)
        reply = self.rpc('get_resource_claimable_capacity',
                         resource_id=resource_id,
                         lower_bound=lower_bound,
                         upper_bound=upper_bound)
        return reply.get('resource_claimable_capacity')


def do_tests(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME):
    '''Manual smoke test: connect to the service and print claims and claim properties.'''
    with RARPC(busname=busname, servicename=servicename) as rpc:
        #for i in range(0, 10):
            #taskId = rpc.insertTask(1234, 5678, 'active', 'OBSERVATION', 1)['id']
            #rcId = rpc.insertResourceClaim(1, taskId, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), 'TENTATIVE', 1, 10, 'einstein', -1)['id']
            #print rpc.getResourceClaim(rcId)
            #rpc.updateResourceClaim(rcId, starttime=datetime.datetime.utcnow(), endtime=datetime.datetime.utcnow() + datetime.timedelta(hours=2), status='CLAIMED')
            #print rpc.getResourceClaim(rcId)
            #print

        #tasks = rpc.getTasks()
        #for t in tasks:
            #print rpc.getTask(t['id'])
            #for i in range(4,9):
                #rcId = rpc.insertResourceClaim(i, t['id'], datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), 'TENTATIVE', 1, 10, 'einstein', -1)['id']
            ##print rpc.deleteTask(t['id'])
            ##print rpc.getTasks()
            ##print rpc.getResourceClaims()

        #print
        #taskId = tasks[0]['id']
        #print 'taskId=', taskId
        #print rpc.getResourceClaimsForTask(taskId)
        #print rpc.updateResourceClaimsForTask(taskId, starttime=datetime.datetime.utcnow(), endtime=datetime.datetime.utcnow() + datetime.timedelta(hours=3))
        #print rpc.getResourceClaimsForTask(taskId)

        #print rpc.getTasks()
        #print rpc.getResourceClaims()
        #print rpc.getResources()
        #print rpc.getResourceGroups()
        #print rpc.getResourceGroupNames('cluster')
        #print rpc.getResourceGroupMemberships()

        for rc in rpc.getResourceClaims():
            print(rc)
            # NOTE(review): insertResourceClaimProperty requires a fourth
            # argument (io_type); this call omits it and raises TypeError as
            # written. Left unchanged because the correct io_type value cannot
            # be determined from this file, and guessing one would make this
            # script start writing properties to the database.
            rpc.insertResourceClaimProperty(rc['id'], 'nr_of_CS_files', 42)
            print(rpc.getResourceClaimProperties(rc['id']))
            print()

        print(rpc.getResourceClaimProperties(task_id=493))

        #rpc.deleteTask(taskId)

        #print rpc.getTasks()
        #print rpc.getResourceClaims()
        #print rpc.getResourceAllocationConfig()


if __name__ == '__main__':
    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
    do_tests()