Skip to content
Snippets Groups Projects
Commit 820fd43f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #9607: delete data from previous cep4 pipeline run upon (re)scheduling

parent c78fb5f7
No related branches found
No related tags found
No related merge requests found
......@@ -53,6 +53,10 @@ from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTI
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
logger = logging.getLogger(__name__)
class ResourceAssigner():
......@@ -63,6 +67,8 @@ class ResourceAssigner():
re_servicename=RE_SERVICENAME,
otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
otdb_servicename=DEFAULT_OTDB_SERVICENAME,
cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
mom_busname=DEFAULT_MOMQUERY_BUSNAME,
......@@ -80,6 +86,7 @@ class ResourceAssigner():
self.rerpc = RPC(re_servicename, busname=re_busname, broker=broker, ForwardExceptions=True, timeout=180)
self.otdbrpc = OTDBRPC(busname=otdb_busname, servicename=otdb_servicename, broker=broker, timeout=180) ## , ForwardExceptions=True hardcoded in RPCWrapper right now
self.momrpc = MoMQueryRPC(servicename=mom_servicename, busname=mom_busname, broker=broker, timeout=180)
self.curpc = CleanupRPC(busname=cleanup_busname, servicename=cleanup_servicename, broker=broker)
self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
self.ra_notification_prefix = ra_notification_prefix
......@@ -98,6 +105,7 @@ class ResourceAssigner():
self.rerpc.open()
self.otdbrpc.open()
self.momrpc.open()
self.curpc.open()
self.ra_notification_bus.open()
def close(self):
......@@ -106,6 +114,7 @@ class ResourceAssigner():
self.rerpc.close()
self.otdbrpc.close()
self.momrpc.close()
self.curpc.close()
self.ra_notification_bus.close()
def doAssignment(self, specification_tree):
......@@ -230,6 +239,17 @@ class ResourceAssigner():
else:
logger.info('doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,))
self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')
# remove any output and/or intermediate data for restarting CEP4 pipelines
if task['type'] == 'pipeline':
path_result = self.curpc.getPathForOTDBId(task['otdb_id'])
if path_result['found']:
logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
result = self.curpc.removeTaskData(task['otdb_id'])
if not result['deleted']:
logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message'])
# send notification that the task was scheduled,
# another service sets the parset spec in otdb, and updated otdb task status to scheduled, which is then synced to radb
self._sendNotification(task, 'scheduled')
......
......@@ -90,6 +90,8 @@ def main():
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME
# Check the invocation arguments
parser = OptionParser("%prog [options]",
......@@ -117,6 +119,8 @@ def main():
help="Name of the resource estimator service. [default: %default]")
parser.add_option("--otdb_busname", dest="otdb_busname", type="string", default=DEFAULT_OTDB_SERVICE_BUSNAME, help="Name of the bus on which the OTDB service listens, default: %default")
parser.add_option("--otdb_servicename", dest="otdb_servicename", type="string", default=DEFAULT_OTDB_SERVICENAME, help="Name of the OTDB service, default: %default")
parser.add_option('--cleanup_busname', dest='cleanup_busname', type='string', default=DEFAULT_CLEANUP_BUSNAME, help='Name of the bus exchange on the qpid broker on which the cleanupservice listens, default: %default')
parser.add_option('--cleanup_servicename', dest='cleanup_servicename', type='string', default=DEFAULT_CLEANUP_SERVICENAME, help='Name of the cleanupservice, default: %default')
parser.add_option("--ra_notification_busname", dest="ra_notification_busname", type="string",
default=DEFAULT_RA_NOTIFICATION_BUSNAME,
help="Name of the notification bus on which the resourceassigner publishes its notifications. [default: %default]")
......@@ -142,6 +146,8 @@ def main():
re_servicename=options.re_servicename,
otdb_busname=options.otdb_busname,
otdb_servicename=options.otdb_servicename,
cleanup_busname=options.cleanup_busname,
cleanup_servicename=options.cleanup_servicename,
ra_notification_busname=options.ra_notification_busname,
ra_notification_prefix=options.ra_notification_prefix,
mom_busname=options.mom_query_busname,
......
......@@ -316,15 +316,6 @@ def putTask(task_id):
if not task:
abort(404, "unknown task %s" % str(updatedTask))
# remove output and intermediate data for restarting CEP4 pipelines
if task.get('cluster') == 'CEP4' and task['type'] == 'pipeline' and updatedTask['status'] == 'prescheduled' and task['status'] in ['finished', 'aborted', 'error']:
path_result = curpc.getPathForOTDBId(task['otdb_id'])
if path_result['found']:
result = curpc.removeTaskData(task['otdb_id'])
if not result['deleted']:
abort(500, result['message'])
otdbrpc.taskSetStatus(task['otdb_id'], updatedTask['status'])
#rarpc.updateTaskAndResourceClaims(task_id,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment