diff --git a/MAC/Services/CMakeLists.txt b/MAC/Services/CMakeLists.txt
index d721a1703185ea91988753b63dc5c1052e1c3c74..303a12a5b2ea42a0a32445d08aa804054ca1ed11 100644
--- a/MAC/Services/CMakeLists.txt
+++ b/MAC/Services/CMakeLists.txt
@@ -1,6 +1,6 @@
 # $Id$
 
-lofar_package(MAC_Services 1.0 DEPENDS PyMessaging OTDB_Services pyparameterset Docker)
+lofar_package(MAC_Services 1.0 DEPENDS PyMessaging OTDB_Services pyparameterset Docker ResourceAssignmentService)
 
 add_subdirectory(src)
 add_subdirectory(test)
diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py
index 89708ce7519297c18637561f61f82d58b8bf0b44..d13a516e0b31850295aaaf7edf214c61262305d7 100755
--- a/MAC/Services/src/PipelineControl.py
+++ b/MAC/Services/src/PipelineControl.py
@@ -67,6 +67,8 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTD
 from lofar.sas.otdb.otdbrpc import OTDBRPC
 from lofar.common.util import waitForInterrupt
 from lofar.messaging.RPC import RPCTimeoutException
+from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
+from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME
 
 import subprocess
 import datetime
@@ -139,9 +141,6 @@ class Parset(dict):
     return (self[PARSET_PREFIX + "Observation.ObservationControl.PythonControl.softwareVersion"] or
             self.defaultDockerImage())
 
-  def slurmJobName(self):
-    return str(self.otdbId())
-
   def otdbId(self):
     return int(self[PARSET_PREFIX + "Observation.otdbID"])
 
@@ -176,65 +175,93 @@ class Slurm(object):
   def cancel(self, jobName):
     self._runCommand("scancel --jobname %s" % (jobName,))
 
-  def jobs(self, maxage=datetime.timedelta(365)):
-    starttime = (datetime.datetime.utcnow() - maxage).strftime("%FT%T")
-
-    stdout = self._runCommand("sacct --starttime=%s --noheader --parsable2 --format=jobid,jobname" % (starttime,))
-
-    jobs = {}
-    for l in stdout.split("\n"):
-      # One line is one job
-      jobid, jobname = l.split("|")
-      jobs_properties = { "JobId": jobid, "JobName": jobname }
-
-      # warn of duplicate names
-      if jobname in jobs:
-        logger.warning("Duplicate job name: %s" % (jobname,))
-      jobs[jobname] = job_properties
-
-    return jobs
-
-  def jobid(self, jobname):
-    stdout = self._runCommand("sacct --starttime=2016-01-01 --noheader --parsable2 --format=jobid --name=%s" % (jobname,))
-
-    if stdout == "":
-      return None
-
-    lines = stdout.split("\n")
-
-    # We cannot rely on subjobs, so filter them out
-    # by only accepting numbers, not dots ("136.0") or strings ("136.batch").
-    lines = [l for l in lines if l.isdigit()]
-
-    if len(lines) > 1:
-      logger.warning("Duplicate job name: %s matches jobs [%s]" % (jobname,", ".join(lines)))
-
-    # Use last occurance if there are multiple
-    return lines[-1]
+  def isQueuedOrRunning(self, jobName):
+    # Any sacct hit in a non-terminal state means the job is still queued or running.
+    stdout = self._runCommand("sacct --starttime=2016-01-01 --noheader --parsable2 --format=jobid --name=%s --state=PENDING,CONFIGURING,RUNNING,RESIZING,COMPLETING,SUSPENDED" % (jobName,))
+
+    return stdout != ""
+
+class PipelineDependencies(object):
+  def __init__(self, ra_service_busname=DEFAULT_RAS_SERVICE_BUSNAME):
+    self.rarpc = RARPC(busname=ra_service_busname)
+
+  def open(self):
+    self.rarpc.open()
+
+  def close(self):
+    self.rarpc.close()
+
+  def __enter__(self):
+    self.open()
+    return self
+
+  def __exit__(self, type, value, tb):
+    self.close()
+
+  def getState(self, otdb_id):
+    """
+      Return the status of a single `otdb_id'.
+    """
+    radb_task = self.rarpc.getTask(otdb_id=otdb_id)
+    return radb_task["status"]
+
+  def getPredecessorStates(self, otdb_id):
+    """
+      Return a dict of {otdb_id: status} pairs for all predecessors of `otdb_id'.
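+      For example (illustrative IDs only): {1001: "finished", 1002: "scheduled"}.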
+ """ + radb_task = self.rarpc.getTask(otdb_id=otdb_id) - if len(lines) > 1: - logger.warning("Duplicate job name: %s matches jobs [%s]" % (jobname,", ".join(lines))) + predecessor_ids = radb_task['predecessor_ids'] + predecessor_tasks = self.rarpc.getTasks(task_ids=predecessor_ids) - # Use last occurance if there are multiple - return lines[-1] + return {t["otdb_id"]: t["status"] for t in predecessor_tasks} + + def getSuccessorIds(self, otdb_id): + """ + Return a list of all the successors of `otdb_id'. + """ + radb_task = self.rarpc.getTask(otdb_id=otdb_id) + + successor_ids = radb_task['successor_ids'] + successor_tasks = self.rarpc.getTasks(task_ids=successor_ids) if successor_ids else [] + + return [t["otdb_id"] for t in successor_tasks] + + def canStart(self, otdbId): + """ + Return whether `otdbId' can start, according to the status of the predecessors + and its own status. + """ + return ( + self.getState(otdbId) == "scheduled" and + all([x == "finished" for x in self.getPredecessorStates(otdbId).values()]) + ) class PipelineControl(OTDBBusListener): - def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME, **kwargs): + def __init__(self, otdb_notification_busname=DEFAULT_OTDB_NOTIFICATION_BUSNAME, otdb_service_busname=DEFAULT_OTDB_SERVICE_BUSNAME, ra_service_busname=DEFAULT_RAS_SERVICE_BUSNAME, **kwargs): super(PipelineControl, self).__init__(busname=otdb_notification_busname, **kwargs) self.otdb_service_busname = otdb_service_busname self.otdbrpc = OTDBRPC(busname=otdb_service_busname) + self.dependencies = PipelineDependencies(ra_service_busname=ra_service_busname) self.slurm = Slurm() - def _setStatus(self, obsid, status): - try: - self.otdbrpc.taskSetStatus(otdb_id=obsid, new_status=status) - except RPCTimeoutException, e: - # We use a queue, so delivery is guaranteed. We don't care about the answer. - pass + def _setStatus(self, otdb_id, status): + self.otdbrpc.taskSetStatus(otdb_id=otdb_id, new_status=status) + + def _getParset(self, otdbId): + return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) def start_listening(self, **kwargs): self.otdbrpc.open() + self.dependencies.open() super(PipelineControl, self).start_listening(**kwargs) def stop_listening(self, **kwargs): super(PipelineControl, self).stop_listening(**kwargs) + self.dependencies.close() self.otdbrpc.close() @staticmethod @@ -249,86 +276,22 @@ class PipelineControl(OTDBBusListener): return True - def _slurmJobIds(self, parsets): - return [self.slurm.jobid(p.slurmJobName()) for p in parsets] - - def _getParset(self, otdbId): - return Parset(self.otdbrpc.taskGetSpecification(otdb_id=otdbId)["specification"]) - - def _getPredecessorParsets(self, parset): - otdbIds = parset.predecessors() - - logger.info("Obtaining predecessor parsets %s", otdbIds) - - return [self._getParset(otdbId) for otdbId in otdbIds] - - def onObservationAborted(self, otdbId, modificationTime): - logger.info("***** STOP Otdb ID %s *****", otdbId) - - # Request the parset - parset = self._getParset(otdbId) - - if not self._shouldHandle(parset): - return - - # Cancel corresponding SLURM job, causing any successors - # to be cancelled as well. - jobName = parset.slurmJobName() - - def cancel(jobName): - logger.info("Cancelling job %s", jobName) - self.slurm.cancel(jobName) - - cancel("%s-abort-trigger" % jobName) - cancel(jobName) - - """ - More statusses we want to abort on. 
- """ - onObservationDescribed = onObservationAborted - onObservationPrepared = onObservationAborted - onObservationApproved = onObservationAborted - onObservationPrescheduled = onObservationAborted - onObservationConflict = onObservationAborted - onObservationHold = onObservationAborted - - @classmethod - def _minStartTime(self, preparsets, margin=datetime.timedelta(0, 60, 0)): - result = None - - for p in preparsets: - processType = p[PARSET_PREFIX + "Observation.processType"] - - # If we depend on an observation, start 1 minute after it - obs_endtime = datetime.datetime.strptime(p[PARSET_PREFIX + "Observation.stopTime"], "%Y-%m-%d %H:%M:%S") - min_starttime = obs_endtime + margin - - result = max(result, min_starttime) if result else min_starttime - - return result - - def onObservationScheduled(self, otdbId, modificationTime): - logger.info("***** QUEUE Otdb ID %s *****", otdbId) - - # Request the parset - parset = self._getParset(otdbId) - - if not self._shouldHandle(parset): - return - - """ - Collect predecessor information. - """ - - # Collect the parsets of predecessors - logger.info("Obtaining predecessor parsets") - preparsets = self._getPredecessorParsets(parset) + @staticmethod + def _jobName(otdbId): + return str(otdbId) + def _startPipeline(self, otdbId, parset): """ Schedule "docker-runPipeline.sh", which will fetch the parset and run the pipeline within a SLURM job. """ + # Avoid race conditions by checking whether we haven't already sent the job + # to SLURM. Our QUEUED status update may still be being processed. + if self.slurm.isQueuedOrRunning(otdbId): + logger.info("Pipeline %s is already queued or running in SLURM.", otdbId) + return + # Determine SLURM parameters sbatch_params = [ # Only run job if all nodes are ready @@ -353,17 +316,9 @@ class PipelineControl(OTDBBusListener): os.path.expandvars("--output=/data/log/runPipeline-%s.log" % (otdbId,)), ] - min_starttime = self._minStartTime([x for x in preparsets if x.isObservation()]) - if min_starttime: - sbatch_params.append("--begin=%s" % (min_starttime.strftime("%FT%T"),)) - - predecessor_jobs = self._slurmJobIds([x for x in preparsets if x.isPipeline()]) - if predecessor_jobs: - sbatch_params.append("--dependency=%s" % (",".join(("afterok:%s" % x for x in predecessor_jobs)),)) - # Schedule runPipeline.sh logger.info("Scheduling SLURM job for runPipeline.sh") - slurm_job_id = self.slurm.submit(parset.slurmJobName(), + slurm_job_id = self.slurm.submit(self._jobName(otdbId), # pull docker image from repository on all nodes "srun --nodelist=$SLURM_NODELIST --cpus-per-task=1 --job-name=docker-pull" " --no-kill" @@ -395,12 +350,11 @@ class PipelineControl(OTDBBusListener): sbatch_params=sbatch_params ) - logger.info("Scheduled SLURM job %s" % (slurm_job_id,)) + logger.info("Scheduled SLURM job %s", slurm_job_id) # Schedule pipelineAborted.sh logger.info("Scheduling SLURM job for pipelineAborted.sh") - slurm_cancel_job_id = self.slurm.submit("%s-abort-trigger" % parset.slurmJobName(), - + slurm_cancel_job_id = self.slurm.submit("%s-abort-trigger" % self._jobName(otdbId), "ssh {myhostname} '" "source {lofarroot}/lofarinit.sh && " "setOTDBTreeStatus -o {obsid} -s aborted -B {status_bus}" @@ -421,18 +375,74 @@ class PipelineControl(OTDBBusListener): "--output=/data/log/abort-trigger-%s.log" % (otdbId,), ] ) - logger.info("Scheduled SLURM job %s" % (slurm_cancel_job_id,)) + logger.info("Scheduled SLURM job %s", slurm_cancel_job_id) - """ - Update OTDB status. 
     logger.info("Setting status to QUEUED")
     self._setStatus(otdbId, "queued")
 
-    logger.info("Pipeline processed.")
+  def _stopPipeline(self, otdbId):
+    # Cancel the corresponding SLURM job, but cancel the abort-trigger first,
+    # so that tearing down the pipeline job does not set the status to ABORTED
+    # as a side effect.
+    if not self.slurm.isQueuedOrRunning(self._jobName(otdbId)):
+      logger.info("_stopPipeline: Job %s not running", otdbId)
+      return
+
+    def cancel(jobName):
+      logger.info("Cancelling job %s", jobName)
+      self.slurm.cancel(jobName)
+
+    jobName = self._jobName(otdbId)
+    cancel("%s-abort-trigger" % jobName)
+    cancel(jobName)
+
+  def _startSuccessors(self, otdbId):
+    for s in self.dependencies.getSuccessorIds(otdbId):
+      parset = self._getParset(s)
+      if not self._shouldHandle(parset):
+        continue
+
+      if self.dependencies.canStart(s):
+        logger.info("***** START Otdb ID %s *****", s)
+        self._startPipeline(s, parset)
+      else:
+        logger.info("Job %s cannot start yet.", s)
+
+  def onObservationScheduled(self, otdbId, modificationTime):
+    parset = self._getParset(otdbId)
+    if not self._shouldHandle(parset):
+      return
+
+    # Maybe the pipeline can start already
+    if self.dependencies.canStart(otdbId):
+      logger.info("***** START Otdb ID %s *****", otdbId)
+      self._startPipeline(otdbId, parset)
+    else:
+      logger.info("Job %s was set to scheduled, but cannot start yet.", otdbId)
+
+  def onObservationFinished(self, otdbId, modificationTime):
+    """ Check if any successors can now start. """
+
+    logger.info("Considering starting successors of %s", otdbId)
+
+    self._startSuccessors(otdbId)
+
+  def onObservationAborted(self, otdbId, modificationTime):
+    parset = self._getParset(otdbId)
+    if not self._shouldHandle(parset):
+      return
+
+    logger.info("***** STOP Otdb ID %s *****", otdbId)
+    self._stopPipeline(otdbId)
+
+  """
+    More statuses we want to abort on.
+ """ + onObservationDescribed = onObservationAborted + onObservationPrepared = onObservationAborted + onObservationApproved = onObservationAborted + onObservationPrescheduled = onObservationAborted + onObservationConflict = onObservationAborted + onObservationHold = onObservationAborted diff --git a/MAC/Services/test/tPipelineControl.py b/MAC/Services/test/tPipelineControl.py index 927a2507c9319add801298ce7fff75f930288125..50b03020dde797794e4c1b813c888bacc85e4eb4 100644 --- a/MAC/Services/test/tPipelineControl.py +++ b/MAC/Services/test/tPipelineControl.py @@ -5,6 +5,7 @@ import sys from lofar.mac.PipelineControl import * from lofar.sas.otdb.OTDBBusListener import OTDBBusListener from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_SUBJECT, DEFAULT_OTDB_SERVICENAME +from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as DEFAULT_RAS_SERVICENAME from lofar.messaging import ToBus, Service, EventMessage, MessageHandlerInterface from lofar.common.methodtrigger import MethodTrigger @@ -15,7 +16,7 @@ import uuid import datetime import logging -logging.basicConfig(stream=sys.stdout, level=logging.DEBUG) +logging.basicConfig(stream=sys.stdout, level=logging.INFO) try: from mock import patch @@ -54,28 +55,6 @@ class TestRunCommand(unittest.TestCase): output = runCommand("cat -", "yes") self.assertEqual(output, "yes") -class TestSlurmJobs(unittest.TestCase): - def test_no_jobs(self): - """ Test 'scontrol show job' output if there are no jobs. """ - with patch('lofar.mac.PipelineControl.Slurm._runCommand') as MockRunSlurmCommand: - MockRunSlurmCommand.return_value = "" - - self.assertEqual(Slurm().jobid("foo"), None) - - def test_one_job(self): - """ Test 'scontrol show job' output for a single job. """ - with patch('lofar.mac.PipelineControl.Slurm._runCommand') as MockRunSlurmCommand: - MockRunSlurmCommand.return_value = """119""" - - self.assertEqual(Slurm().jobid("foo"), "119") - - def test_one_job_with_subjobs(self): - """ Test 'scontrol show job' output for a single job that has subjobs. """ - with patch('lofar.mac.PipelineControl.Slurm._runCommand') as MockRunSlurmCommand: - MockRunSlurmCommand.return_value = """119\n119.0\n119.batch""" - - self.assertEqual(Slurm().jobid("foo"), "119") - class TestPipelineControlClassMethods(unittest.TestCase): def test_shouldHandle(self): """ Test whether we filter the right OTDB trees. """ @@ -95,6 +74,113 @@ class TestPipelineControlClassMethods(unittest.TestCase): "ObsSW.Observation.Cluster.ProcessingCluster.clusterName": t["cluster"] } self.assertEqual(PipelineControl._shouldHandle(Parset(parset)), t["shouldHandle"]) +class MockRAService(MessageHandlerInterface): + """ + Fakes RAService calls. + + For each job, radb_id = otdb_id + 1000 to detect misplaced ids. 
+ """ + def __init__(self, predecessors, status): + super(MockRAService, self).__init__() + + self.service2MethodMap = { + "GetTask": self.GetTask, + "GetTasks": self.GetTasks, + } + + self.predecessors = predecessors + self.successors = {x: [s for s in predecessors if x in predecessors[s]] for x in predecessors} + self.status = status + + def GetTask(self, id, mom_id, otdb_id): + print "***** GetTask(%s) *****" % (otdb_id,) + + return { + 'status': self.status[otdb_id], + + 'predecessor_ids': [1000 + x for x in self.predecessors[otdb_id]], + 'successor_ids': [1000 + x for x in self.successors[otdb_id]], + + 'starttime': datetime.datetime.utcnow(), + 'endtime': datetime.datetime.utcnow(), + } + + def GetTasks(self, lower_bound, upper_bound, task_ids): + print "***** GetTasks(%s) *****" % (task_ids,) + + return [{ + 'otdb_id': t - 1000, + 'status': self.status[t - 1000], + + 'starttime': datetime.datetime.utcnow(), + 'endtime': datetime.datetime.utcnow(), + } for t in task_ids] + +class TestPipelineDependencies(unittest.TestCase): + def setUp(self): + # Create a random bus + self.busname = "%s-%s" % (sys.argv[0], str(uuid.uuid4())[:8]) + self.bus = ToBus(self.busname, { "create": "always", "delete": "always", "node": { "type": "topic" } }) + self.bus.open() + self.addCleanup(self.bus.close) + + # ================================ + # Global state to manipulate + # ================================ + + predecessors = { + 1: [2,3,4], + 2: [3], + 3: [], + 4: [], + } + + status = { + 1: "scheduled", # cannot start, since predecessor 2 hasn't finished + 2: "scheduled", # can start, since predecessor 3 has finished + 3: "finished", + 4: "scheduled", # can start, because no predecessors + } + + # ================================ + # Setup mock ra service + # + # Note that RA IDs are the same as + # OTDB IDs + 1000 in this test. 
+    # ================================
+
+    service = Service(DEFAULT_RAS_SERVICENAME,
+                      MockRAService,
+                      busname=self.busname,
+                      use_service_methods=True,
+                      handler_args={"predecessors": predecessors, "status": status})
+    service.start_listening()
+    self.addCleanup(service.stop_listening)
+
+  def testGetState(self):
+    with PipelineDependencies(ra_service_busname=self.busname) as pipelineDependencies:
+      self.assertEqual(pipelineDependencies.getState(1), "scheduled")
+      self.assertEqual(pipelineDependencies.getState(2), "scheduled")
+      self.assertEqual(pipelineDependencies.getState(3), "finished")
+      self.assertEqual(pipelineDependencies.getState(4), "scheduled")
+
+  def testPredecessorStates(self):
+    with PipelineDependencies(ra_service_busname=self.busname) as pipelineDependencies:
+      self.assertEqual(pipelineDependencies.getPredecessorStates(1), {2: "scheduled", 3: "finished", 4: "scheduled"})
+      self.assertEqual(pipelineDependencies.getPredecessorStates(3), {})
+
+  def testSuccessorIds(self):
+    with PipelineDependencies(ra_service_busname=self.busname) as pipelineDependencies:
+      self.assertEqual(pipelineDependencies.getSuccessorIds(1), [])
+      self.assertEqual(pipelineDependencies.getSuccessorIds(3), [1,2])
+
+  def testCanStart(self):
+    with PipelineDependencies(ra_service_busname=self.busname) as pipelineDependencies:
+      self.assertEqual(pipelineDependencies.canStart(1), False)
+      self.assertEqual(pipelineDependencies.canStart(2), True)
+      self.assertEqual(pipelineDependencies.canStart(3), False)
+      self.assertEqual(pipelineDependencies.canStart(4), True)
+
 class TestPipelineControl(unittest.TestCase):
   def setUp(self):
     # Create a random bus
@@ -113,15 +199,11 @@ class TestPipelineControl(unittest.TestCase):
 
         self.scheduled_jobs[jobName] = (args, kwargs)
 
-        # Return job ID
+        # Return fake job ID
        return "42"
 
-      def jobid(self, jobname):
-        if jobname in ["1", "2", "3"]:
-          return jobname
-
-        # "4" is an observation, so no SLURM job
-        return None
+      def isQueuedOrRunning(self, jobName):
+        return str(jobName) in self.scheduled_jobs
 
     patcher = patch('lofar.mac.PipelineControl.Slurm')
     patcher.start().side_effect = MockSlurm
@@ -132,6 +214,24 @@ class TestPipelineControl(unittest.TestCase):
     patcher.start().return_value = "lofar-pipeline:trunk"
     self.addCleanup(patcher.stop)
 
+    # ================================
+    # Global state to manipulate
+    # ================================
+
+    predecessors = {
+      1: [2,3,4],
+      2: [3],
+      3: [],
+      4: [],
+    }
+
+    status = {
+      1: "prescheduled",
+      2: "prescheduled",
+      3: "prescheduled",
+      4: "prescheduled",
+    }
+
     # ================================
     # Setup mock otdb service
     # ================================
@@ -153,35 +253,18 @@ class TestPipelineControl(unittest.TestCase):
       def TaskGetSpecification(self, OtdbID):
         print "***** TaskGetSpecification(%s) *****" % (OtdbID,)
 
-        if OtdbID == 1:
-          predecessors = "[2,3,4]"
-        elif OtdbID == 2:
-          predecessors = "[3]"
-        elif OtdbID == 3:
-          predecessors = "[]"
-        elif OtdbID == 4:
-          return { "TaskSpecification": {
-            "Version.number": "1",
-            PARSET_PREFIX + "Observation.otdbID": str(OtdbID),
-            PARSET_PREFIX + "Observation.Scheduler.predecessors": "[]",
-            PARSET_PREFIX + "Observation.processType": "Observation",
-            PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName": "CEP4",
-            PARSET_PREFIX + "Observation.stopTime": "2016-01-01 01:00:00",
-          } }
-        else:
-          raise Exception("Invalid OtdbID: %s" % OtdbID)
-
         return { "TaskSpecification": {
-         "Version.number": "1",
+          "Version.number": "1",
           PARSET_PREFIX + "Observation.otdbID": str(OtdbID),
PARSET_PREFIX + "Observation.Scheduler.predecessors": predecessors, - PARSET_PREFIX + "Observation.processType": "Pipeline", + PARSET_PREFIX + "Observation.processType": ("Observation" if OtdbID == 4 else "Pipeline"), PARSET_PREFIX + "Observation.Cluster.ProcessingCluster.clusterName": "CEP4", } } def TaskSetStatus(self, OtdbID, NewStatus, UpdateTimestamps): print "***** TaskSetStatus(%s,%s) *****" % (OtdbID, NewStatus) + status[OtdbID] = NewStatus + # Broadcast the state change content = { "treeID" : OtdbID, "state" : NewStatus, "time_of_change" : datetime.datetime.utcnow() } msg = EventMessage(context=DEFAULT_OTDB_NOTIFICATION_SUBJECT, content=content) @@ -197,6 +280,18 @@ class TestPipelineControl(unittest.TestCase): service.start_listening() self.addCleanup(service.stop_listening) + # ================================ + # Setup mock ra service + # ================================ + + service = Service(DEFAULT_RAS_SERVICENAME, + MockRAService, + busname=self.busname, + use_service_methods=True, + handler_args={"predecessors": predecessors, "status": status}) + service.start_listening() + self.addCleanup(service.stop_listening) + # ================================ # Setup listener to catch result # of our service @@ -206,15 +301,15 @@ class TestPipelineControl(unittest.TestCase): listener.start_listening() self.addCleanup(listener.stop_listening) - self.trigger = MethodTrigger(listener, "onObservationQueued") + self.queued_trigger = MethodTrigger(listener, "onObservationQueued") def test_setStatus(self): - with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname) as pipelineControl: + with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname, ra_service_busname=self.busname) as pipelineControl: pipelineControl._setStatus(12345, "queued") # Wait for the status to propagate - self.assertTrue(self.trigger.wait()) - self.assertEqual(self.trigger.args[0], 12345) + self.assertTrue(self.queued_trigger.wait()) + self.assertEqual(self.queued_trigger.args[0], 12345) def testNoPredecessors(self): """ @@ -222,15 +317,15 @@ class TestPipelineControl(unittest.TestCase): 3 requires nothing """ - with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname) as pipelineControl: + with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname, ra_service_busname=self.busname) as pipelineControl: # Send fake status update pipelineControl._setStatus(3, "scheduled") - # Wait for message to arrive - self.assertTrue(self.trigger.wait()) + # Wait for pipeline to be queued + self.assertTrue(self.queued_trigger.wait()) # Verify message - self.assertEqual(self.trigger.args[0], 3) # otdbId + self.assertEqual(self.queued_trigger.args[0], 3) # otdbId # Check if job was scheduled self.assertIn("3", pipelineControl.slurm.scheduled_jobs) @@ -244,29 +339,28 @@ class TestPipelineControl(unittest.TestCase): 2 requires 3 4 is an observation """ - with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname) as pipelineControl: + with PipelineControl(otdb_notification_busname=self.busname, otdb_service_busname=self.busname, ra_service_busname=self.busname) as pipelineControl: # Send fake status update pipelineControl._setStatus(1, "scheduled") - # Wait for message to arrive - self.assertTrue(self.trigger.wait()) + # Message should not arrive, as predecessors havent finished + self.assertFalse(self.queued_trigger.wait()) + + # Finish predecessors + 
pipelineControl._setStatus(2, "finished") + pipelineControl._setStatus(3, "finished") + pipelineControl._setStatus(4, "finished") + + # Wait for pipeline to be queued + self.assertTrue(self.queued_trigger.wait()) # Verify message - self.assertEqual(self.trigger.args[0], 1) # otdbId + self.assertEqual(self.queued_trigger.args[0], 1) # otdbId # Check if job was scheduled self.assertIn("1", pipelineControl.slurm.scheduled_jobs) self.assertIn("1-abort-trigger", pipelineControl.slurm.scheduled_jobs) - # Earliest start of this job > stop time of observation - for p in pipelineControl.slurm.scheduled_jobs["1"][1]["sbatch_params"]: - if p.startswith("--begin="): - begin = datetime.datetime.strptime(p, "--begin=%Y-%m-%dT%H:%M:%S") - self.assertGreater(begin, datetime.datetime(2016, 1, 1, 1, 0, 0)) - break - else: - self.assertTrue(False, "--begin parameter not given to SLURM job") - def main(argv): unittest.main()