From 29e3dc03bf6a010690d6dfd4852ec63853b5d10c Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Thu, 8 Jun 2017 15:02:48 +0000 Subject: [PATCH] Task #10898: treat updates of start/end times of storage claims differently, add 1 year extension --- .../propagator.py | 66 +++++++++++++++++-- .../ResourceAssignmentDatabase/radb.py | 26 +++++++- .../ResourceAssignmentService/rpc.py | 44 ++++++++----- .../ResourceAssignmentService/service.py | 29 ++++++-- 4 files changed, 137 insertions(+), 28 deletions(-) diff --git a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py index cebb4a841d6..8e1a4112c3d 100644 --- a/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py +++ b/SAS/ResourceAssignment/OTDBtoRATaskStatusPropagator/propagator.py @@ -18,6 +18,8 @@ from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC logger = logging.getLogger(__name__) +STORAGE_CLAIM_EXTENSION=timedelta(days=365) + class OTDBtoRATaskStatusPropagator(OTDBBusListener): def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, @@ -147,8 +149,27 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): new_startime = max([max_pred_endtime, datetime.utcnow()]) new_endtime = new_startime + timedelta(seconds=task['duration']) - logger.info("Updating task %s (otdb_id=%s, status=queued) startime to \'%s\' and endtime to \'%s\'", task['id'], treeId, new_startime, new_endtime) - self.radb.updateTaskAndResourceClaims(task['id'], starttime=new_startime, endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' except for storage claims", + task['id'], treeId, new_startime, new_endtime) + + #update task and all claim start/endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(task['id'], + where_resource_types=non_storage_resource_type_ids, + starttime=new_startime, + endtime=new_endtime) + + #get remaining storage claims... + #and update storage start/end times (including 1 year extra) + logger.info("Updating storage claims for task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' (with extra storage claim time)", + task['id'], treeId, new_startime, new_endtime+STORAGE_CLAIM_EXTENSION) + + storage_claims = self.radb.getResourceClaims(task_ids=task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + starttime=new_startime, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) + except Exception as e: logger.error(e) @@ -164,8 +185,26 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): new_startime = otdb_task['starttime'] new_endtime = new_startime + timedelta(seconds=radb_task['duration']) - logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\'", radb_task['id'], treeId, new_startime, new_endtime) - self.radb.updateTaskAndResourceClaims(radb_task['id'], starttime=new_startime, endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' except for storage claims", + radb_task['id'], treeId, new_startime, new_endtime) + + #update task and all claim start/endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(radb_task['id'], + where_resource_types=non_storage_resource_type_ids, + starttime=new_startime, + endtime=new_endtime) + + #get remaining storage claims... + #and update storage start/end times (including 1 year extra) + logger.info("Updating storage claims for task %s (otdb_id=%s, status=active) startime to \'%s\' and endtime to \'%s\' with extra claim time", + radb_task['id'], treeId, new_startime, new_endtime) + + storage_claims = self.radb.getResourceClaims(task_ids=radb_task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + starttime=new_startime, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) def onObservationCompleting(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'completing') @@ -183,8 +222,23 @@ class OTDBtoRATaskStatusPropagator(OTDBBusListener): if otdb_task and (otdb_task['starttime'] != radb_task['starttime'] or otdb_task['stoptime'] != radb_task['endtime']): new_endtime = otdb_task['stoptime'] - logger.info("Updating task %s (otdb_id=%s, status=%s) endtime to \'%s\'", radb_task['id'], treeId, radb_task['status'], new_endtime) - self.radb.updateTaskAndResourceClaims(radb_task['id'], endtime=new_endtime) + logger.info("Updating task %s (otdb_id=%s, status=%s) endtime to \'%s\' except for storage resource claims", radb_task['id'], treeId, radb_task['status'], new_endtime) + + #update task and all claim endtimes, except for storage claims. + non_storage_resource_type_ids = [rt['id'] for rt in self.radb.getResourceTypes() if rt['name'] != 'storage'] + self.radb.updateTaskAndResourceClaims(radb_task['id'], + where_resource_types=non_storage_resource_type_ids, + endtime=new_endtime) + + #get remaining storage claims... + #and extend storage end time + logger.info("Updating storage claims for task %s (otdb_id=%s, status=%s) endtime to \'%s\' (with extra storage claim time)", + radb_task['id'], treeId, radb_task['status'], new_endtime+STORAGE_CLAIM_EXTENSION) + + storage_claims = self.radb.getResourceClaims(task_ids=radb_task['id'], resource_type='storage') + storage_claim_ids = [c['id'] for c in storage_claims] + self.radb.updateResourceClaims(where_resource_claim_ids=storage_claim_ids, + endtime=new_endtime+STORAGE_CLAIM_EXTENSION) def onObservationFinished(self, treeId, modificationTime): self._update_radb_task_status(treeId, 'finished') diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index ed3d7ba68e1..3df39af7074 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -1269,7 +1269,8 @@ class RADatabase: return self.updateResourceClaims([resource_claim_id], None, resource_id, task_id, starttime, endtime, status, claim_size, username, used_rcus, user_id, commit) - def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, resource_id=None, task_id=None, starttime=None, endtime=None, + def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None, + resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): '''Update the given paramenters on all resource claims given/delimited by where_resource_claim_ids and/or where_task_ids. @@ -1353,6 +1354,24 @@ class RADatabase: conditions.append('task_id in %s') values.append(tuple(where_task_ids)) + if where_resource_types is not None: + if isinstance(where_resource_types, basestring) or isinstance(where_resource_types, int): + where_resource_types = [where_resource_types] + elif not isinstance(where_resource_types, collections.Iterable): + where_resource_types = [where_resource_types] + + # convert any resource_type name to id + resource_type_names = set([x for x in where_resource_types if isinstance(x, basestring)]) + if resource_type_names: + resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} + where_resource_type_ids = [resource_type_name_to_id[x] if isinstance(x, basestring) else x + for x in where_resource_types] + else: + where_resource_type_ids = [x for x in where_resource_types] + + conditions.append('resource_type_id in %s') + values.append(tuple(where_resource_type_ids)) + query += ' WHERE ' + ' AND '.join(conditions) self._executeQuery(query, values) @@ -1363,7 +1382,7 @@ class RADatabase: return self.cursor.rowcount > 0 - def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, commit=True): + def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None, commit=True): '''combination of updateResourceClaims and updateTask in one transaction''' updated = True @@ -1371,6 +1390,7 @@ class RADatabase: username is not None or used_rcus is not None or user_id is not None): # update the claims as well updated &= self.updateResourceClaims(where_task_ids=task_id, + where_resource_types=where_resource_types, starttime=starttime, endtime=endtime, status=claim_status, @@ -1613,6 +1633,8 @@ class RADatabase: resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} resource_type_ids = [resource_type_name_to_id[x] if isinstance(x, basestring) else x for x in resource_types] + else: + resource_type_ids = [x for x in resource_types] conditions.append('type_id in %s') qargs.append(tuple(resource_type_ids)) diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py index 6ee8e7480c3..41573604cec 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/rpc.py @@ -68,53 +68,67 @@ class RARPC(RPCWrapper): return resource_claim - def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, session_id, claim_size, username, + def insertResourceClaim(self, resource_id, task_id, starttime, endtime, status, claim_size, username, user_id, used_rcus=None, properties=None): return self.rpc('InsertResourceClaim', resource_id=resource_id, task_id=task_id, starttime=starttime, endtime=endtime, status=status, - session_id=session_id, claim_size=claim_size, username=username, used_rcus=used_rcus, user_id=user_id, properties=properties) - def insertResourceClaims(self, task_id, claims, session_id, username, user_id): + def insertResourceClaims(self, task_id, claims, username, user_id): return self.rpc('InsertResourceClaims', task_id=task_id, claims=claims, - session_id=session_id, username=username, user_id=user_id) def deleteResourceClaim(self, id): return self.rpc('DeleteResourceClaim', id=id) - def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, session_id=None, claim_size=None, username=None, used_rcus=None, user_id=None): + def updateResourceClaim(self, id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None): return self.rpc('UpdateResourceClaim', id=id, resource_id=resource_id, task_id=task_id, starttime=starttime, endtime=endtime, status=status, - session_id=session_id, claim_size=claim_size, username=username, used_rcus=used_rcus, user_id=user_id) - def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, session_id=None, username=None, used_rcus=None, user_id=None): + def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None, + resource_id=None, task_id=None, starttime=None, endtime=None, + status=None, claim_size=None, username=None, used_rcus=None, user_id=None, + commit=True): + return self.rpc('UpdateResourceClaims', where_resource_claim_ids=where_resource_claim_ids, + where_task_ids=where_task_ids, + where_resource_types=where_resource_types, + resource_id=resource_id, + task_id=task_id, + starttime=starttime, + endtime=endtime, + status=status, + claim_size=claim_size, + username=username, + used_rcus=used_rcus, + user_id=user_id) + + def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None): return self.rpc('UpdateTaskAndResourceClaims', task_id=task_id, - starttime=starttime, - endtime=endtime, - task_status=task_status, - claim_status=claim_status, - session_id=session_id, - username=username, - used_rcus=used_rcus, - user_id=user_id) + starttime=starttime, + endtime=endtime, + task_status=task_status, + claim_status=claim_status, + username=username, + used_rcus=used_rcus, + user_id=user_id, + where_resource_types=where_resource_types) def getResourceUsages(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None): all_usages = self.rpc('GetResourceUsages', diff --git a/SAS/ResourceAssignment/ResourceAssignmentService/service.py b/SAS/ResourceAssignment/ResourceAssignmentService/service.py index f2cfd92c144..5a26990f69a 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentService/service.py +++ b/SAS/ResourceAssignment/ResourceAssignmentService/service.py @@ -52,6 +52,7 @@ class RADBHandler(MessageHandlerInterface): 'InsertResourceClaim': self._insertResourceClaim, 'DeleteResourceClaim': self._deleteResourceClaim, 'UpdateResourceClaim': self._updateResourceClaim, + 'UpdateResourceClaims': self._updateResourceClaims, 'UpdateTaskAndResourceClaims': self._updateTaskAndResourceClaims, 'GetResourceUsages': self._getResourceUsages, 'GetResourceGroupTypes': self._getResourceGroupTypes, @@ -147,7 +148,6 @@ class RADBHandler(MessageHandlerInterface): ids = self.radb.insertResourceClaims(kwargs['task_id'], claims, - kwargs['session_id'], kwargs['username'], kwargs['user_id']) return {'ids':ids} @@ -159,7 +159,6 @@ class RADBHandler(MessageHandlerInterface): kwargs['starttime'].datetime(), kwargs['endtime'].datetime(), kwargs.get('status_id', kwargs.get('status')), - kwargs['session_id'], kwargs['claim_size'], kwargs['username'], kwargs['user_id'], @@ -182,12 +181,30 @@ class RADBHandler(MessageHandlerInterface): starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None, endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, status=kwargs.get('status_id', kwargs.get('status')), - session_id=kwargs.get('session_id'), claim_size=kwargs.get('claim_size'), username=kwargs.get('username'), user_id=kwargs.get('user_id')) return {'id': id, 'updated': updated} + def _updateResourceClaims(self, **kwargs): + logger.info('UpdateResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None})) + task_id = kwargs['task_id'] + + updated = self.radb.updateResourceClaims(where_resource_claim_ids=kwargs.get('where_resource_claim_ids'), + where_task_ids=kwargs.get('where_task_ids'), + where_resource_types=kwargs.get('where_resource_types'), + resource_id=kwargs.get('resource_id'), + task_id=kwargs.get('task_id'), + starttime=kwargs.get('starttime').datetime() if kwargs.get('starttime') else None, + endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, + status=kwargs.get('status_id', kwargs.get('status')), + claim_size=kwargs.get('status'), + username=kwargs.get('username'), + user_id=kwargs.get('user_id'), + used_rcus=None) + + return {'task_id': task_id, 'updated': updated} + def _updateTaskAndResourceClaims(self, **kwargs): logger.info('UpdateTaskAndResourceClaims: %s' % dict({k:v for k,v in kwargs.items() if v != None})) task_id = kwargs['task_id'] @@ -197,9 +214,11 @@ class RADBHandler(MessageHandlerInterface): endtime=kwargs.get('endtime').datetime() if kwargs.get('endtime') else None, task_status=kwargs.get('task_status_id', kwargs.get('task_status')), claim_status=kwargs.get('claim_status_id', kwargs.get('claim_status')), - session_id=kwargs.get('session_id'), username=kwargs.get('username'), - user_id=kwargs.get('user_id')) + user_id=kwargs.get('user_id'), + where_resource_types=kwargs.get('where_resource_types'), + commit=kwargs.get('commit', True)) + return {'task_id': task_id, 'updated': updated} def _getResourceUsages(self, **kwargs): -- GitLab