-
Jorrit Schaap authoredJorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
rpc.py 18.25 KiB
#!/usr/bin/env python3
# Copyright (C) 2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import logging
import datetime
from lofar.messaging.RPC import RPC, RPCException, RPCWrapper
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME, DEFAULT_SERVICENAME
from lofar.common.util import convertStringDigitKeysToInt
''' Simple RPC client for Service lofarbus.*Z
'''
logger = logging.getLogger(__name__)
class RARPCException(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return "RARPCException: " + str(self.message)
class RARPC(RPCWrapper):
def __init__(self, busname=DEFAULT_BUSNAME,
servicename=DEFAULT_SERVICENAME,
broker=None,
timeout=120):
super(RARPC, self).__init__(busname, servicename, broker, timeout=timeout)
def getResourceClaimStatuses(self):
return self.rpc('GetResourceClaimStatuses')
def getResourceClaimPropertyTypes(self):
return self.rpc('GetResourceClaimPropertyTypes')
def getResourceClaimProperties(self, claim_ids=None, task_id=None):
return self.rpc('GetResourceClaimProperties', claim_ids=claim_ids, task_id=task_id)
def insertResourceClaimProperty(self, claim_id, property_type, value, io_type):
return self.rpc('InsertResourceClaimProperty', claim_id=claim_id,
property_type=property_type,
value=value,
io_type=io_type)
def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None,
status=None, resource_type=None, extended=False, include_properties=False):
return self.rpc('GetResourceClaims', claim_ids=claim_ids,
lower_bound=lower_bound,
upper_bound=upper_bound,
resource_ids=resource_ids,
task_ids=task_ids,
status=status,
resource_type=resource_type,
extended=extended,
include_properties=include_properties)
def getResourceClaim(self, id):
return self.rpc('GetResourceClaim', id=id)
def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, claim_size, username,
user_id, used_rcus=None, properties=None):
return self.rpc('InsertResourceClaim', resource_id=resource_id,
task_id=task_id,
starttime=starttime,
endtime=endtime,
status=status,
claim_size=claim_size,
username=username,
used_rcus=used_rcus,
user_id=user_id,
properties=properties)
def insertResourceClaims(self, task_id, claims, username, user_id):
return self.rpc('InsertResourceClaims', task_id=task_id,
claims=claims,
username=username,
user_id=user_id)
def deleteResourceClaim(self, id):
return self.rpc('DeleteResourceClaim', id=id)
def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None):
return self.rpc('UpdateResourceClaim', id=id,
resource_id=resource_id,
task_id=task_id,
starttime=starttime,
endtime=endtime,
status=status,
claim_size=claim_size,
username=username,
used_rcus=used_rcus,
user_id=user_id)
def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None,
resource_id=None, task_id=None, starttime=None, endtime=None,
status=None, claim_size=None, username=None, used_rcus=None, user_id=None,
commit=True):
return self.rpc('UpdateResourceClaims', where_resource_claim_ids=where_resource_claim_ids,
where_task_ids=where_task_ids,
where_resource_types=where_resource_types,
resource_id=resource_id,
task_id=task_id,
starttime=starttime,
endtime=endtime,
status=status,
claim_size=claim_size,
username=username,
used_rcus=used_rcus,
user_id=user_id)
def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None):
return self.rpc('UpdateTaskAndResourceClaims', task_id=task_id,
starttime=starttime,
endtime=endtime,
task_status=task_status,
claim_status=claim_status,
username=username,
used_rcus=used_rcus,
user_id=user_id,
where_resource_types=where_resource_types)
def getResourceUsages(self, lower_bound=None, upper_bound=None, resource_ids=None, status=None):
all_usages = self.rpc('GetResourceUsages',
lower_bound=lower_bound,
upper_bound=upper_bound,
resource_ids=resource_ids,
status=status)
all_usages = convertStringDigitKeysToInt(all_usages)
return all_usages
def getResourceGroupTypes(self):
return self.rpc('GetResourceGroupTypes')
def getResourceGroups(self):
return self.rpc('GetResourceGroups')
def getResourceGroupNames(self, resourceGroupTypeName):
return self.rpc('GetResourceGroupNames',
resourceGroupTypeName=resourceGroupTypeName)
def getResourceGroupMemberships(self):
rg_memberships = self.rpc('GetResourceGroupMemberships')
rg_memberships = convertStringDigitKeysToInt(rg_memberships)
return rg_memberships
def getResourceTypes(self):
return self.rpc('GetResourceTypes')
def getResources(self, resource_ids=None, resource_types=None, include_availability=False):
return self.rpc('GetResources', resource_ids=resource_ids, resource_types=resource_types, include_availability=include_availability)
# instantiate this object and call this function to update DRAGNET active (config file), avail_cap (df(1) and config file override) and total_cap (config file) values
def updateResourceAvailability(self, resource_id, active=None, available_capacity=None, total_capacity=None):
return self.rpc('UpdateResourceAvailability',
resource_id=resource_id,
active=active,
available_capacity=available_capacity,
total_capacity=total_capacity)
def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None):
'''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id'''
return self.rpc('GetTask', id=id, mom_id=mom_id, otdb_id=otdb_id, specification_id=specification_id)
def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id):
return self.rpc('InsertTask', mom_id=mom_id,
otdb_id=otdb_id,
task_status=task_status,
task_type=task_type,
specification_id=specification_id)
def deleteTask(self, id):
return self.rpc('DeleteTask', id=id)
def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None):
return self.rpc('UpdateTask',
id=task_id,
mom_id=mom_id,
otdb_id=otdb_id,
task_status=task_status,
task_type=task_type,
specification_id=specification_id)
def updateTaskStatusForOtdbId(self, otdb_id, task_status):
return self.rpc('UpdateTaskStatusForOtdbId',
otdb_id=otdb_id,
task_status=task_status)
def getTasksTimeWindow(self, task_ids=None, mom_ids=None, otdb_ids=None):
return self.rpc('GetTasksTimeWindow', task_ids=task_ids, mom_ids=mom_ids, otdb_ids=otdb_ids)
def getTasks(self, lower_bound=None, upper_bound=None, task_ids=None, task_status=None, task_type=None, mom_ids=None, otdb_ids=None, cluster=None):
'''getTasks let's you query tasks from the radb with many optional filters.
:param lower_bound: datetime specifies the lower_bound of a time window above which to select tasks
:param upper_bound: datetime specifies the upper_bound of a time window below which to select tasks
:param task_ids: int/list/tuple specifies one or more task_ids to select
:param task_status: int/string/list specifies one or more task_statuses to select in either task_status_id or task_status_name form
:param task_type: int/string/list specifies one or more task_types to select in either task_type_id or task_type_name form
:param mom_ids: int/list/tuple specifies one or more mom_ids to select
:param otdb_ids: int/list/tuple specifies one or more otdb_ids to select
:param cluster: string specifies the cluster to select
'''
return self.rpc('GetTasks', lower_bound=lower_bound, upper_bound=upper_bound, task_ids=task_ids, task_status=task_status, task_type=task_type, mom_ids=mom_ids, otdb_ids=otdb_ids, cluster=cluster)
def getTaskPredecessorIds(self, id=None):
return self.rpc('GetTaskPredecessorIds', id=id)
def getTaskSuccessorIds(self, **kwargs):
return self.rpc('GetTaskSuccessorIds', id=id)
def insertTaskPredecessor(self, task_id, predecessor_id):
return self.rpc('InsertTaskPredecessor', task_id=task_id, predecessor_id=predecessor_id)
def insertTaskPredecessors(self, task_id, predecessor_ids):
return self.rpc('InsertTaskPredecessors', task_id=task_id, predecessor_ids=predecessor_ids)
def getTaskTypes(self):
return self.rpc('GetTaskTypes')
def getTaskStatuses(self):
return self.rpc('GetTaskStatuses')
def getSpecification(self, id):
return self.rpc('GetSpecification', id=id)
def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type, starttime, endtime, content, cluster):
return self.rpc('InsertSpecificationAndTask',
mom_id=mom_id,
otdb_id=otdb_id,
task_status=task_status,
task_type=task_type,
starttime=starttime,
endtime=endtime,
content=content,
cluster=cluster)
def insertSpecification(self, starttime, endtime, content, cluster):
return self.rpc('InsertSpecification',
starttime=starttime,
endtime=endtime,
content=content,
cluster=cluster)
def deleteSpecification(self, id):
return self.rpc('DeleteSpecification', id=id)
def updateSpecification(self, id, starttime=None, endtime=None, content=None, cluster=None):
return self.rpc('UpdateSpecification',
id=id,
starttime=starttime,
endtime=endtime,
content=content,
cluster=cluster)
def getSpecifications(self):
return self.rpc('GetSpecifications')
def getUnits(self):
return self.rpc('GetUnits')
def getResourceAllocationConfig(self, sql_like_name_pattern=None):
return self.rpc('GetResourceAllocationConfig',
sql_like_name_pattern=sql_like_name_pattern)
def get_conflicting_overlapping_claims(self, claim_id):
'''returns a list of claimed claims which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)'''
return self.rpc('get_overlapping_claims',
claim_id=claim_id)
def get_conflicting_overlapping_tasks(self, claim_id):
'''returns a list of tasks which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)'''
return self.rpc('get_overlapping_tasks',
claim_id=claim_id)
def get_max_resource_usage_between(self, resource_id, lower_bound, upper_bound, claim_status='claimed'):
return self.rpc('get_max_resource_usage_between',
resource_id=resource_id,
lower_bound=lower_bound,
upper_bound=upper_bound,
claim_status=claim_status)
def get_resource_claimable_capacity(self, resource_id, lower_bound, upper_bound):
'''get the claimable capacity for the given resource within the timewindow given by lower_bound and upper_bound.
this is the resource's available capacity (total-used) minus the maximum allocated usage in that timewindow.'''
return self.rpc('get_resource_claimable_capacity', resource_id=resource_id,
lower_bound=lower_bound,
upper_bound=upper_bound).get('resource_claimable_capacity')
def do_tests(busname=DEFAULT_BUSNAME, servicename=DEFAULT_SERVICENAME):
with RARPC(busname=busname, servicename=servicename) as rpc:
#for i in range(0, 10):
#taskId = rpc.insertTask(1234, 5678, 'active', 'OBSERVATION', 1)['id']
#rcId = rpc.insertResourceClaim(1, taskId, datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), 'TENTATIVE', 1, 10, 'einstein', -1)['id']
#print rpc.getResourceClaim(rcId)
#rpc.updateResourceClaim(rcId, starttime=datetime.datetime.utcnow(), endtime=datetime.datetime.utcnow() + datetime.timedelta(hours=2), status='CLAIMED')
#print rpc.getResourceClaim(rcId)
#print
#tasks = rpc.getTasks()
#for t in tasks:
#print rpc.getTask(t['id'])
#for i in range(4,9):
#rcId = rpc.insertResourceClaim(i, t['id'], datetime.datetime.utcnow(), datetime.datetime.utcnow() + datetime.timedelta(hours=1), 'TENTATIVE', 1, 10, 'einstein', -1)['id']
##print rpc.deleteTask(t['id'])
##print rpc.getTasks()
##print rpc.getResourceClaims()
#print
#taskId = tasks[0]['id']
#print 'taskId=', taskId
#print rpc.getResourceClaimsForTask(taskId)
#print rpc.updateResourceClaimsForTask(taskId, starttime=datetime.datetime.utcnow(), endtime=datetime.datetime.utcnow() + datetime.timedelta(hours=3))
#print rpc.getResourceClaimsForTask(taskId)
#print rpc.getTasks()
#print rpc.getResourceClaims()
#print rpc.getResources()
#print rpc.getResourceGroups()
#print rpc.getResourceGroupNames('cluster')
#print rpc.getResourceGroupMemberships()
for rc in rpc.getResourceClaims():
print(rc)
rpc.insertResourceClaimProperty(rc['id'], 'nr_of_CS_files', 42)
print(rpc.getResourceClaimProperties(rc['id']))
print()
print(rpc.getResourceClaimProperties(task_id=493))
#rpc.deleteTask(taskId)
#print rpc.getTasks()
#print rpc.getResourceClaims()
#print rpc.getResourceAllocationConfig()
if __name__ == '__main__':
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
do_tests()