Skip to content
Snippets Groups Projects
Commit b77c327f authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-516: fixed PipelineControl tests for python3 and messagebus changes.

parent bee92011
No related branches found
No related tags found
No related merge requests found
......@@ -73,6 +73,7 @@ from lofar.sas.otdb.config import DEFAULT_OTDB_NOTIFICATION_BUSNAME, DEFAULT_OTD
from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.common.util import waitForInterrupt
from lofar.common import isProductionEnvironment
from lofar.common.subprocess_utils import communicate_returning_strings
from lofar.messaging.RPC import RPCTimeoutException, RPCException
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as DEFAULT_RAS_SERVICE_BUSNAME
......@@ -101,7 +102,7 @@ def runCommand(cmdline, input=None):
# Start command
proc = subprocess.Popen(
cmdline,
stdin=subprocess.PIPE if input else open("/dev/null"),
stdin=subprocess.PIPE if input else None,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
......@@ -110,12 +111,12 @@ def runCommand(cmdline, input=None):
# Feed input and wait for termination
logger.debug("runCommand input: %s", input)
stdout, _ = proc.communicate(input)
stdout, _ = communicate_returning_strings(proc, input)
logger.debug("runCommand output: %s", stdout)
# Check exit status, bail on error
if proc.returncode != 0:
logger.warn("runCommand(%s) had exit status %s with output: %s", cmdline, proc.returncode, stdout)
logger.warning("runCommand(%s) had exit status %s with output: %s", cmdline, proc.returncode, stdout)
raise subprocess.CalledProcessError(proc.returncode, cmdline)
# Return output
......
#!/usr/bin/env python3
import sys
import time
from lofar.mac.PipelineControl import *
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
......@@ -12,12 +13,13 @@ from lofar.common.methodtrigger import MethodTrigger
import subprocess
import unittest
from unittest.mock import patch
import uuid
import datetime
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(stream=sys.stdout, level=logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
def setUpModule():
pass
......@@ -179,6 +181,10 @@ class TestPipelineDependencies(unittest.TestCase):
self.tmp_queue.open()
self.addCleanup(self.tmp_queue.close)
self.notification_bus = self.tmp_queue.create_tobus()
self.notification_bus.open()
self.addCleanup(self.notification_bus.close)
# ================================
# Global state to manipulate
# ================================
......@@ -204,7 +210,7 @@ class TestPipelineDependencies(unittest.TestCase):
MockOTDBService,
busname=self.tmp_queue.address,
use_service_methods=True,
handler_args={ "notification_bus": self.tmp_queue })
handler_args={ "notification_bus": self.notification_bus })
service.start_listening()
self.addCleanup(service.stop_listening)
......@@ -254,6 +260,14 @@ class TestPipelineControl(unittest.TestCase):
self.tmp_queue.open()
self.addCleanup(self.tmp_queue.close)
self.tmp_notification_queue = TemporaryQueue(__class__.__name__ + "_notification_bus")
self.tmp_notification_queue.open()
self.addCleanup(self.tmp_notification_queue.close)
self.notification_bus = self.tmp_notification_queue.create_tobus()
self.notification_bus.open()
self.addCleanup(self.notification_bus.close)
# Patch SLURM
class MockSlurm(object):
def __init__(self, *args, **kwargs):
......@@ -303,7 +317,7 @@ class TestPipelineControl(unittest.TestCase):
MockOTDBService,
busname=self.tmp_queue.address,
use_service_methods=True,
handler_args={ "notification_bus": self.tmp_queue })
handler_args={ "notification_bus": self.notification_bus })
service.start_listening()
self.addCleanup(service.stop_listening)
......@@ -319,24 +333,15 @@ class TestPipelineControl(unittest.TestCase):
service.start_listening()
self.addCleanup(service.stop_listening)
# ================================
# Setup listener to catch result
# of our service
# ================================
listener = OTDBBusListener(busname=self.tmp_queue.address)
listener.start_listening()
self.addCleanup(listener.stop_listening)
def _wait_for_status(self, otdb_id, expected_status, timeout=5):
start = datetime.datetime.utcnow()
while True:
if otdb_status[otdb_id] == expected_status:
break
self.queued_trigger = MethodTrigger(listener, "onObservationQueued")
def test_setStatus(self):
with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl:
pipelineControl._setStatus(12345, "queued")
# Wait for the status to propagate
self.assertTrue(self.queued_trigger.wait())
self.assertEqual(self.queued_trigger.args[0], 12345)
time.sleep(0.25)
self.assertGreater(datetime.timedelta(seconds=timeout), datetime.datetime.utcnow() - start,
"Timeout while waiting for expected status")
def testNoPredecessors(self):
"""
......@@ -344,15 +349,15 @@ class TestPipelineControl(unittest.TestCase):
3 requires nothing
"""
with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl:
with PipelineControl(otdb_notification_busname=self.tmp_notification_queue.address,
otdb_service_busname=self.tmp_queue.address,
ra_service_busname=self.tmp_queue.address) as pipelineControl:
# Send fake status update
pipelineControl._setStatus(3, "scheduled")
# Wait for pipeline to be queued
self.assertTrue(self.queued_trigger.wait())
# Verify message
self.assertEqual(self.queued_trigger.args[0], 3) # otdbId
self._wait_for_status(3, "queued")
# Check if job was scheduled
self.assertIn("3", pipelineControl.slurm.scheduled_jobs)
......@@ -366,12 +371,15 @@ class TestPipelineControl(unittest.TestCase):
2 requires 3
4 is an observation
"""
with PipelineControl(otdb_notification_busname=self.tmp_queue.address, otdb_service_busname=self.tmp_queue.address, ra_service_busname=self.tmp_queue.address) as pipelineControl:
with PipelineControl(otdb_notification_busname=self.tmp_notification_queue.address,
otdb_service_busname=self.tmp_queue.address,
ra_service_busname=self.tmp_queue.address) as pipelineControl:
# Send fake status update
pipelineControl._setStatus(1, "scheduled")
# Message should not arrive, as predecessors havent finished
self.assertFalse(self.queued_trigger.wait())
with self.assertRaises(AssertionError):
self._wait_for_status(1, "queued")
# Finish predecessors
pipelineControl._setStatus(2, "finished")
......@@ -379,10 +387,7 @@ class TestPipelineControl(unittest.TestCase):
pipelineControl._setStatus(4, "finished")
# Wait for pipeline to be queued
self.assertTrue(self.queued_trigger.wait())
# Verify message
self.assertEqual(self.queued_trigger.args[0], 1) # otdbId
self._wait_for_status(1, "queued")
# Check if job was scheduled
self.assertIn("1", pipelineControl.slurm.scheduled_jobs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment