diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index 206e98a41dc525927616793332b0708fc7f127f6..ce3671fb9a6aa6008605526b8355cc446d87a843 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -53,6 +53,10 @@ from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTI from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME +from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC +from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME +from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME + logger = logging.getLogger(__name__) class ResourceAssigner(): @@ -63,6 +67,8 @@ class ResourceAssigner(): re_servicename=RE_SERVICENAME, otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME, otdb_servicename=DEFAULT_OTDB_SERVICENAME, + cleanup_busname=DEFAULT_CLEANUP_BUSNAME, + cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME, ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME, ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX, mom_busname=DEFAULT_MOMQUERY_BUSNAME, @@ -80,6 +86,7 @@ class ResourceAssigner(): self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180) self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180) + self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker) self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker) self.ra_notification_prefix = ra_notification_prefix @@ -98,6 +105,7 @@ class ResourceAssigner(): self.rerpc.open() self.otdbrpc.open() self.momrpc.open() + self.curpc.open() self.ra_notification_bus.open() def close(self): @@ -106,6 +114,7 @@ class ResourceAssigner(): self.rerpc.close() self.otdbrpc.close() self.momrpc.close() + self.curpc.close() self.ra_notification_bus.close() def doAssignment(self, specification_tree): @@ -230,6 +239,17 @@ class ResourceAssigner(): else: logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,)) self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated') + + # remove any output and/or intermediate data for restarting CEP4 pipelines + if task['type'] == 'pipeline': + path_result = self.curpc.getPathForOTDBId(task['otdb_id']) + if path_result['found']: + logger.info("removing data on disk from previous run for otdb_id %s", otdb_id) + result = self.curpc.removeTaskData(task['otdb_id']) + + if not result['deleted']: + logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message']) + # send notification that the task was scheduled, # another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb self._sendNotification(task, 'scheduled') diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py index 0fd5d28fe2ed996f5d6e0b1706b8db2585625bc0..10fa2a01a1a083c5b1f0e7493c9653bb80d85db0 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/raservice.py @@ -90,6 +90,8 @@ def main(): from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME + from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME + from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME # Check the invocation arguments parser = OptionParser("%prog [options]", @@ -117,6 +119,8 @@ def main(): help="Name of the resource estimator service. [default: %default]") parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default") parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default") + parser.add_option('--cleanup_busname', dest='cleanup_busname', type='string', default=DEFAULT_CLEANUP_BUSNAME, help='Name of the bus exchange on the qpid broker on which the cleanupservice listens, default: %default') + parser.add_option('--cleanup_servicename', dest='cleanup_servicename', type='string', default=DEFAULT_CLEANUP_SERVICENAME, help='Name of the cleanupservice, default: %default') parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string", default=DEFAULT_RA_NOTIFICATION_BUSNAME, help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]") @@ -142,6 +146,8 @@ def main(): re_servicename=options.re_servicename, otdb_busname=options.otdb_busname, otdb_servicename=options.otdb_servicename, + cleanup_busname=options.cleanup_busname, + cleanup_servicename=options.cleanup_servicename, ra_notification_busname=options.ra_notification_busname, ra_notification_prefix=options.ra_notification_prefix, mom_busname=options.mom_query_busname, diff --git a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py index 88ada92f35ae7c37f3579638347c7ce9747084bb..01ea4db44f7c10d642b90c0ef687863080458468 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEditor/lib/webservice.py @@ -316,15 +316,6 @@ def putTask(task_id): if not task: abort(404, "unknown task %s" % str(updatedTask)) - # remove output and intermediate data for restarting CEP4 pipelines - if task.get('cluster') == 'CEP4' and task['type'] == 'pipeline' and updatedTask['status'] == 'prescheduled' and task['status'] in ['finished', 'aborted', 'error']: - path_result = curpc.getPathForOTDBId(task['otdb_id']) - if path_result['found']: - result = curpc.removeTaskData(task['otdb_id']) - - if not result['deleted']: - abort(500, result['message']) - otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status']) #rarpc.updateTaskAndResourceClaims(task_id,