Skip to content
Snippets Groups Projects
Commit 6b3e8f1f authored by Auke Klazema's avatar Auke Klazema
Browse files

SW-705: Convert ObservationConrol2 to the new messaging system

parent c93f5281
No related branches found
No related tags found
No related merge requests found
......@@ -22,24 +22,20 @@ import logging
from optparse import OptionParser
# WARNING: This code only works with Fabric Version 2
from fabric.connection import Connection
from fabric import Connection
from lofar.messaging import Service
from lofar.messaging import setQpidLogLevel
from lofar.common.util import waitForInterrupt
from lofar.messaging.Service import MessageHandlerInterface
from lofar.messaging import ServiceMessageHandler
from lofar.messaging import DEFAULT_BROKER, DEFAULT_BUSNAME
import lofar.mac.config as config
logger = logging.getLogger(__name__)
class ObservationControlHandler(MessageHandlerInterface):
def __init__(self, **kwargs):
super(ObservationControlHandler, self).__init__(**kwargs)
self.service2MethodMap = {
'AbortObservation': self.abort_observation
}
class ObservationControlHandler(ServiceMessageHandler):
def __init__(self):
super(ObservationControlHandler, self).__init__()
host = "localhost"
......@@ -53,22 +49,22 @@ class ObservationControlHandler(MessageHandlerInterface):
self.connection = Connection(host)
def _abort_observation_task(self, sas_id):
logger.info("trying to abort ObservationControl for SAS ID: %s", sas_id)
killed = False
with settings(warn_only = True):
pid_line = self.connection.run('pidof ObservationControl').stdout
pids = pid_line.split(' ')
pid_line = self.connection.run('pidof ObservationControl').stdout
pids = pid_line.split(' ')
for pid in pids:
pid_sas_id = self.connection.run("ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % pid).stdout
if str(pid_sas_id) == str(sas_id):
logger.info("Killing ObservationControl with PID: %s for SAS ID: %s", pid, sas_id)
self.connection.run('kill -SIGINT %s' % pid)
killed = True
for pid in pids:
pid_sas_id = self.connection.run(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % pid).stdout
if str(pid_sas_id) == str(sas_id):
logger.info("Killing ObservationControl with PID: %s for SAS ID: %s",
pid, sas_id)
self.connection.run('kill -SIGINT %s' % pid)
killed = True
return killed
......@@ -78,19 +74,6 @@ class ObservationControlHandler(MessageHandlerInterface):
return {'aborted': aborted}
def handle_message(self, msg):
pass
def create_service(bus_name = DEFAULT_BUSNAME,
broker = DEFAULT_BROKER, verbose = False):
return Service(config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
ObservationControlHandler,
busname = bus_name,
broker = broker,
use_service_methods = True,
numthreads = 1,
handler_args = {},
verbose = verbose)
def main():
# make sure we run in UTC timezone
......@@ -99,20 +82,25 @@ def main():
# Check the invocation arguments
parser = OptionParser("%prog [options]",
description = 'runs the observationcontrol service')
parser.add_option('-q', '--broker', dest = 'broker', type = 'string', default = DEFAULT_BROKER, help = 'Address of the qpid broker, default: %default')
parser.add_option("-b", "--busname", dest = "busname", type = "string", default = DEFAULT_BUSNAME, help = "Name of the bus exchange on the qpid broker, default: %default")
parser.add_option('-V', '--verbose', dest = 'verbose', action = 'store_true', help = 'verbose logging')
description='runs the observationcontrol service')
parser.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
help='Address of the qpid broker, default: %default')
parser.add_option("-e", "--exchange", dest="echange", type="string",
default=DEFAULT_BUSNAME,
help="Name of the exchange on the qpid broker, default: %default")
parser.add_option('-V', '--verbose', dest='verbose', action='store_true',
help='verbose logging')
(options, args) = parser.parse_args()
setQpidLogLevel(logging.INFO)
logging.basicConfig(format = '%(asctime)s %(levelname)s %(message)s',
level = logging.DEBUG if options.verbose else logging.INFO)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
level=logging.DEBUG if options.verbose else logging.INFO)
with create_service(bus_name = options.busname,
broker = options.broker,
verbose = options.verbose):
with Service(service_name=config.DEFAULT_OBSERVATION_CONTROL_SERVICE_NAME,
handler_type=ObservationControlHandler,
broker=options.broker,
exchange=options.exchange):
waitForInterrupt()
if __name__ == '__main__':
main()
......@@ -4,9 +4,7 @@ import uuid
from unittest import mock
import os
import time
from lofar.mac.ObservationControl2 import ObservationControlHandler, create_service
from qpid.messaging.message import Message as QpidMessage
from lofar.mac.ObservationControl2 import ObservationControlHandler
import lofar.mac.config as config
class TestObservationControlHandler(unittest.TestCase):
......@@ -17,32 +15,27 @@ class TestObservationControlHandler(unittest.TestCase):
def _run_side_effect(self, cmd):
if cmd.startswith("ps -p %s" % self.pid1):
return self.sas_id
return_value = mock.MagicMock
return_value.stdout = self.sas_id
return return_value
elif cmd.startswith("ps -p %s" % self.pid2):
return self.sas_id + "10"
return_value = mock.MagicMock
return_value.stdout = self.sas_id + "10"
return return_value
elif cmd.startswith("pidof"):
return "%s %s" % (self.pid1, self.pid2)
return_value = mock.MagicMock
return_value.stdout = "%s %s" % (self.pid1, self.pid2)
return return_value
elif cmd.startswith("kill"):
return ""
return_value = mock.MagicMock
return_value.stdout = ""
return return_value
def setUp(self):
fabric_run_pathcher = mock.patch('lofar.mac.ObservationControl2.run')
self.addCleanup(fabric_run_pathcher.stop)
self.fabric_run_mock = fabric_run_pathcher.start()
self.fabric_run_mock.side_effect = self._run_side_effect
fabric_tasks_pathcher = mock.patch('lofar.mac.ObservationControl2.tasks')
self.addCleanup(fabric_tasks_pathcher.stop)
self.fabric_tasks_mock = fabric_tasks_pathcher.start()
fabric_env_pathcher = mock.patch('lofar.mac.ObservationControl2.env')
self.addCleanup(fabric_env_pathcher.stop)
self.fabric_env_mock = fabric_env_pathcher.start()
fabric_settings_pathcher = mock.patch('lofar.mac.ObservationControl2.settings')
self.addCleanup(fabric_settings_pathcher.stop)
self.fabric_settings_mock = fabric_settings_pathcher.start()
fabric_connection_pathcher = mock.patch('lofar.mac.ObservationControl2.Connection')
self.addCleanup(fabric_connection_pathcher.stop)
self.fabric_connection_mock = fabric_connection_pathcher.start()
self.fabric_connection_mock().run.side_effect = self._run_side_effect
logger_patcher = mock.patch('lofar.mac.ObservationControl2.logger')
self.addCleanup(logger_patcher.stop)
......@@ -53,48 +46,37 @@ class TestObservationControlHandler(unittest.TestCase):
def test_abort_observation_task_should_run_pidof_ObservationControl(self):
self.observation_control_handler._abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call('pidof ObservationControl')
self.fabric_connection_mock().run.assert_any_call('pidof ObservationControl')
def test_abort_observation_tasks_should_run_ps_to_find_sas_id_on_command(self):
self.observation_control_handler._abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call(
self.fabric_connection_mock().run.assert_any_call(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % self.pid1)
self.fabric_run_mock.assert_any_call(
self.fabric_connection_mock().run.assert_any_call(
"ps -p %s --no-heading -o command | awk -F[{}] '{ printf $2; }'" % self.pid2)
def test_abort_observation_task_should_run_kill_when_sas_id_matches(self):
self.observation_control_handler._abort_observation_task(self.sas_id)
self.fabric_run_mock.assert_any_call('kill -SIGINT %s' % self.pid1)
def test_abort_observation_should_set_run_settings_with_warn_only_as_true(self):
self.observation_control_handler._abort_observation_task(self.sas_id)
self.fabric_settings_mock.assert_called_with(warn_only=True)
self.fabric_connection_mock().run.assert_any_call('kill -SIGINT %s' % self.pid1)
@mock.patch.dict(os.environ, {'LOFARENV': 'TEST'})
def test_observation_control_should_select_test_host_if_lofar_environment_is_test(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, [config.TEST_OBSERVATION_CONTROL_HOST])
self.fabric_connection_mock.assert_called_with(config.TEST_OBSERVATION_CONTROL_HOST)
@mock.patch.dict(os.environ, {'LOFARENV': 'PRODUCTION'})
def test_observation_control_should_select_production_host_if_lofar_environment_is_production(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, [config.PRODUCTION_OBSERVATION_CONTROL_HOST])
self.fabric_connection_mock.assert_called_with(config.PRODUCTION_OBSERVATION_CONTROL_HOST)
def test_observation_control_should_select_local_host_if_no_lofar_environment_is_set(self):
ObservationControlHandler()
self.assertEqual(self.fabric_env_mock.hosts, ["localhost"])
def test_abort_observation_should_execute_abort_observation_task_on_localhost(self):
self.observation_control_handler.abort_observation(self.sas_id)
self.fabric_tasks_mock.execute.assert_any_call(self.observation_control_handler._abort_observation_task,
self.sas_id)
self.fabric_connection_mock.assert_called_with("localhost")
def test_abort_observation_task_should_return_false_on_unknown_sas_id(self):
self.assertFalse(self.observation_control_handler._abort_observation_task("unknown"))
......@@ -114,53 +96,16 @@ class TestObservationControlHandler(unittest.TestCase):
"Killing ObservationControl with PID: %s for SAS ID: %s", self.pid1, self.sas_id)
def test_abort_observation_should_return_aborted_true_if_execute_returns_true(self):
self.fabric_tasks_mock.execute.return_value = {'localhost': True}
result = self.observation_control_handler.abort_observation(self.sas_id)
self.assertTrue(result['aborted'])
def test_abort_observation_should_return_aborted_false_if_execute_returns_false(self):
self.fabric_tasks_mock.execute.return_value = {'localhost': False}
self.fabric_connection_mock().run = mock.MagicMock()
self.fabric_connection_mock().run.stdout = ""
result = self.observation_control_handler.abort_observation(self.sas_id)
self.assertFalse(result['aborted'])
@mock.patch('lofar.mac.ObservationControl2.ObservationControlHandler.abort_observation')
@mock.patch('lofar.messaging.messagebus.qpid.messaging')
def test_service_runs_abort_observation_task_when_called_with_AbortObservation_command(self, qpid_mock, abort_observation_mock):
message_id = str(uuid.uuid1())
test_id = 200
qpid_message = QpidMessage([{'sas_id': 100}],
reply_to="localhost",
properties={
"has_args": True,
"has_kwargs": True,
"SystemName": "LOFAR",
"MessageType": "RequestMessage",
"MessageId": message_id,
"status": "OK"
},
subject=".AbortObservation")
sender_mock = mock.MagicMock()
receiver_mock = mock.MagicMock()
receiver_mock.fetch.return_value = qpid_message
abort_observation_mock.return_value = {"aborted": True}
qpid_mock.Message = QpidMessage
qpid_mock.Connection().session().senders = [sender_mock]
qpid_mock.Connection().session().next_receiver.return_value = receiver_mock
with create_service() as service:
while receiver_mock.fetch.calles == 0:
pass
self.assertTrue(abort_observation_mock.called)
if __name__ == "__main__":
unittest.main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment