diff --git a/MAC/Services/src/PipelineControl.py b/MAC/Services/src/PipelineControl.py index a430d2b651e2ca9251034a070fe61f1de3108cf9..04af255196e726fc77121e18e1cda7b43e661437 100755 --- a/MAC/Services/src/PipelineControl.py +++ b/MAC/Services/src/PipelineControl.py @@ -73,6 +73,7 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTD from lofar.sas.otdb.otdbrpc import OTDBRPC from lofar.common.util import waitForInterrupt from lofar.common import isProductionEnvironment +from lofar.common.subprocess_utils import communicate_returning_strings from lofar.messaging.RPC import RPCTimeoutException, RPCException from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME @@ -101,7 +102,7 @@ def runCommand(cmdline, input=None): # Start command proc = subprocess.Popen( cmdline, - stdin=subprocess.PIPE if input else open("/dev/null"), + stdin=subprocess.PIPE if input else None, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, @@ -110,12 +111,12 @@ def runCommand(cmdline, input=None): # Feed input and wait for termination logger.debug("runCommand input: %s", input) - stdout, _ = proc.communicate(input) + stdout, _ = communicate_returning_strings(proc, input) logger.debug("runCommand output: %s", stdout) # Check exit status, bail on error if proc.returncode != 0: - logger.warn("runCommand(%s) had exit status %s with output: %s", cmdline, proc.returncode, stdout) + logger.warning("runCommand(%s) had exit status %s with output: %s", cmdline, proc.returncode, stdout) raise subprocess.CalledProcessError(proc.returncode, cmdline) # Return output diff --git a/MAC/Services/test/tPipelineControl.py b/MAC/Services/test/tPipelineControl.py index cf8d585047dae968a33f243f2baf42d9a71cb5cf..ab5050d5f58422e8d3601ea346ea32d0eefd632b 100644 --- a/MAC/Services/test/tPipelineControl.py +++ b/MAC/Services/test/tPipelineControl.py @@ -1,6 +1,7 @@ #!/usr/bin/env python3 import sys +import time from lofar.mac.PipelineControl import * from lofar.sas.otdb.OTDBBusListener import OTDBBusListener @@ -12,12 +13,13 @@ from lofar.common.methodtrigger import MethodTrigger import subprocess import unittest +from unittest.mock import patch import uuid import datetime import logging logger = logging.getLogger(__name__) -logging.basicConfig(stream=sys.stdout, level=logging.INFO) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) def setUpModule(): pass @@ -179,6 +181,10 @@ class TestPipelineDependencies(unittest.TestCase): self.tmp_queue.open() self.addCleanup(self.tmp_queue.close) + self.notification_bus = self.tmp_queue.create_tobus() + self.notification_bus.open() + self.addCleanup(self.notification_bus.close) + # ================================ # Global state to manipulate # ================================ @@ -204,7 +210,7 @@ class TestPipelineDependencies(unittest.TestCase): MockOTDBService, busname=self.tmp_queue.address, use_service_methods=True, - handler_args={ "notification_bus": self.tmp_queue }) + handler_args={ "notification_bus": self.notification_bus }) service.start_listening() self.addCleanup(service.stop_listening) @@ -254,6 +260,14 @@ class TestPipelineControl(unittest.TestCase): self.tmp_queue.open() self.addCleanup(self.tmp_queue.close) + self.tmp_notification_queue = TemporaryQueue(__class__.__name__ + "_notification_bus") + self.tmp_notification_queue.open() + self.addCleanup(self.tmp_notification_queue.close) + + self.notification_bus = self.tmp_notification_queue.create_tobus() + self.notification_bus.open() + self.addCleanup(self.notification_bus.close) + # Patch SLURM class MockSlurm(object): def __init__(self, *args, **kwargs): @@ -303,7 +317,7 @@ class TestPipelineControl(unittest.TestCase): MockOTDBService, busname=self.tmp_queue.address, use_service_methods=True, - handler_args={ "notification_bus": self.tmp_queue }) + handler_args={ "notification_bus": self.notification_bus }) service.start_listening() self.addCleanup(service.stop_listening) @@ -319,24 +333,15 @@ class TestPipelineControl(unittest.TestCase): service.start_listening() self.addCleanup(service.stop_listening) - # ================================ - # Setup listener to catch result - # of our service - # ================================ - - listener = OTDBBusListener(busname=self.tmp_queue.address) - listener.start_listening() - self.addCleanup(listener.stop_listening) + def _wait_for_status(self, otdb_id, expected_status, timeout=5): + start = datetime.datetime.utcnow() + while True: + if otdb_status[otdb_id] == expected_status: + break - self.queued_trigger = MethodTrigger(listener, "onObservationQueued") - - def test_setStatus(self): - with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl: - pipelineControl._setStatus(12345, "queued") - - # Wait for the status to propagate - self.assertTrue(self.queued_trigger.wait()) - self.assertEqual(self.queued_trigger.args[0], 12345) + time.sleep(0.25) + self.assertGreater(datetime.timedelta(seconds=timeout), datetime.datetime.utcnow() - start, + "Timeout while waiting for expected status") def testNoPredecessors(self): """ @@ -344,15 +349,15 @@ class TestPipelineControl(unittest.TestCase): 3 requires nothing """ - with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl: + with PipelineControl(otdb_notification_busname=self.tmp_notification_queue.address, + otdb_service_busname=self.tmp_queue.address, + ra_service_busname=self.tmp_queue.address) as pipelineControl: + # Send fake status update pipelineControl._setStatus(3, "scheduled") # Wait for pipeline to be queued - self.assertTrue(self.queued_trigger.wait()) - - # Verify message - self.assertEqual(self.queued_trigger.args[0], 3) # otdbId + self._wait_for_status(3, "queued") # Check if job was scheduled self.assertIn("3", pipelineControl.slurm.scheduled_jobs) @@ -366,12 +371,15 @@ class TestPipelineControl(unittest.TestCase): 2 requires 3 4 is an observation """ - with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl: + with PipelineControl(otdb_notification_busname=self.tmp_notification_queue.address, + otdb_service_busname=self.tmp_queue.address, + ra_service_busname=self.tmp_queue.address) as pipelineControl: # Send fake status update pipelineControl._setStatus(1, "scheduled") # Message should not arrive, as predecessors havent finished - self.assertFalse(self.queued_trigger.wait()) + with self.assertRaises(AssertionError): + self._wait_for_status(1, "queued") # Finish predecessors pipelineControl._setStatus(2, "finished") @@ -379,10 +387,7 @@ class TestPipelineControl(unittest.TestCase): pipelineControl._setStatus(4, "finished") # Wait for pipeline to be queued - self.assertTrue(self.queued_trigger.wait()) - - # Verify message - self.assertEqual(self.queued_trigger.args[0], 1) # otdbId + self._wait_for_status(1, "queued") # Check if job was scheduled self.assertIn("1", pipelineControl.slurm.scheduled_jobs)