diff --git a/.gitattributes b/.gitattributes index 95872b467d96c217c309afc34c61b9fa0c661469..8d63d8c6ad6433daaeef47db8ae2b55edc15190c 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5415,10 +5415,16 @@ SAS/TriggerServices/django_rest/urls.py -text SAS/TriggerServices/lib/CMakeLists.txt -text SAS/TriggerServices/lib/__init__.py -text SAS/TriggerServices/lib/config.py -text +SAS/TriggerServices/lib/trigger_cancellation_service -text +SAS/TriggerServices/lib/trigger_cancellation_service.ini -text +SAS/TriggerServices/lib/trigger_cancellation_service.py -text SAS/TriggerServices/lib/trigger_service.py -text SAS/TriggerServices/lib/trigger_service_rpc.py -text SAS/TriggerServices/test/CMakeLists.txt -text SAS/TriggerServices/test/setup_queues_and_services.sh -text +SAS/TriggerServices/test/t_trigger_cancellation_service.py -text +SAS/TriggerServices/test/t_trigger_cancellation_service.run -text +SAS/TriggerServices/test/t_trigger_cancellation_service.sh -text SAS/TriggerServices/test/t_trigger_service.in/trigger_misc_testing_nov2016.xml -text SAS/TriggerServices/test/t_trigger_service.in/trigger_testing_20_03_17.xml -text SAS/TriggerServices/test/t_trigger_service.py -text diff --git a/SAS/TriggerServices/lib/CMakeLists.txt b/SAS/TriggerServices/lib/CMakeLists.txt index 87745ebc0cccf5acfeccac350a610cd4c378d8a9..f9f8f7e7ccc808d079b65f09e7a414a11cd8a016 100644 --- a/SAS/TriggerServices/lib/CMakeLists.txt +++ b/SAS/TriggerServices/lib/CMakeLists.txt @@ -4,6 +4,7 @@ include(PythonInstall) set(_py_files trigger_service.py trigger_service_rpc.py + trigger_cancellation_service.py config.py ) diff --git a/SAS/TriggerServices/lib/config.py b/SAS/TriggerServices/lib/config.py index c4e6e26d23c534024a89e533cbddc6e2e89c224f..f2b54064e8b67e7a982c66457dc0fc2f2b1eb0c6 100644 --- a/SAS/TriggerServices/lib/config.py +++ b/SAS/TriggerServices/lib/config.py @@ -24,3 +24,6 @@ TRIGGER_SERVICENAME = "triggerservice" TRIGGER_ADDITION_NOTIFICATION_BUSNAME = 
adaptNameToEnvironment("lofar.trigger.notification") TRIGGER_ADDITION_NOTIFICATION_SUBJECT = 'TriggerAdded' + +OTDB_NOTIFICATION_BUSNAME = adaptNameToEnvironment('lofar.otdb.notification') +OTDB_NOTIFICATION_SUBJECT = 'TaskStatus' \ No newline at end of file diff --git a/SAS/TriggerServices/lib/trigger_cancellation_service b/SAS/TriggerServices/lib/trigger_cancellation_service new file mode 100644 index 0000000000000000000000000000000000000000..87e03e782afe775fd0eb3d7210f086fa6f641548 --- /dev/null +++ b/SAS/TriggerServices/lib/trigger_cancellation_service @@ -0,0 +1,10 @@ +#!/usr/bin/python +# $Id: radbservice 33373 2016-01-22 11:01:15Z schaap $ + +''' +runs the trigger cancellation service +''' +from lofar.triggerservices.trigger_cancellation_service import main + +if __name__ == '__main__': + main() diff --git a/SAS/TriggerServices/lib/trigger_cancellation_service.ini b/SAS/TriggerServices/lib/trigger_cancellation_service.ini new file mode 100644 index 0000000000000000000000000000000000000000..45ae0716c93007f0d1d76df3f6a4b92c8cf3f5cb --- /dev/null +++ b/SAS/TriggerServices/lib/trigger_cancellation_service.ini @@ -0,0 +1,8 @@ +[program:triggercancellationservice] +command=/bin/bash -c 'source $LOFARROOT/lofarinit.sh;exec trigger_cancellation_service' +user=lofarsys +stopsignal=INT ; KeyboardInterrupt +stopasgroup=true ; bash does not propagate signals +stdout_logfile=%(program_name)s.log +redirect_stderr=true +stderr_logfile=NONE diff --git a/SAS/TriggerServices/lib/trigger_cancellation_service.py b/SAS/TriggerServices/lib/trigger_cancellation_service.py new file mode 100644 index 0000000000000000000000000000000000000000..ca4a51137c5d52f00f9d24091bc348ed769e14e3 --- /dev/null +++ b/SAS/TriggerServices/lib/trigger_cancellation_service.py @@ -0,0 +1,88 @@ +#!/usr/bin/python +# $Id$ + +''' +This listens on OTDB updates and cancels triggers that are related to things falling apart. 
import logging
import time
from datetime import datetime, timedelta
from optparse import OptionParser
from lofar.common.util import waitForInterrupt
from lofar.sas.otdb.OTDBBusListener import OTDBBusListener
from config import MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME, OTDB_NOTIFICATION_BUSNAME, OTDB_NOTIFICATION_SUBJECT
from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC

logger = logging.getLogger(__name__)


class TriggerCancellationService(OTDBBusListener):
    """Cancel the trigger behind an observation that errored, conflicted or was aborted.

    For each such OTDB notification the OTDB id is translated to a MoM id via
    the MoM query RPC, the MoM id is translated to a trigger id, and, if there
    is a trigger id, the trigger is cancelled with a reason that names the
    notification type and the OTDB id.
    """

    # How often, and how far apart (seconds), the MoM id lookup is attempted
    # before giving up. MoM can lag behind OTDB, so the first lookups may be empty.
    # NOTE(review): starting values; tune them to the latency MoM shows in practice.
    MOM_ID_LOOKUP_ATTEMPTS = 10
    MOM_ID_LOOKUP_INTERVAL = 1.0

    def __init__(self,
                 momqueryrpc=None,
                 otdb_notification_busname=OTDB_NOTIFICATION_BUSNAME,
                 otdb_notification_subject=OTDB_NOTIFICATION_SUBJECT,
                 broker=None, **kwargs):
        """
        :param momqueryrpc: RPC object used to query MoM and cancel triggers;
                            when None a MoMQueryRPC on MOMQUERY_BUSNAME /
                            MOMQUERY_SERVICENAME is created for this instance
        :param otdb_notification_busname: bus on which the OTDB notifications are published
        :param otdb_notification_subject: subject of the OTDB notifications
        :param broker: broker address, handed to OTDBBusListener unchanged
        :param kwargs: any further keyword arguments for OTDBBusListener
        """
        super(TriggerCancellationService, self).__init__(busname=otdb_notification_busname,
                                                         subject=otdb_notification_subject,
                                                         broker=broker,
                                                         **kwargs)

        # Create the default RPC here rather than in the signature: a default
        # argument is evaluated once, when the module is imported, and would then
        # be shared by every instance (and created even when a caller supplies
        # its own RPC).
        if momqueryrpc is None:
            momqueryrpc = MoMQueryRPC(MOMQUERY_BUSNAME, MOMQUERY_SERVICENAME)
        self.momqueryrpc = momqueryrpc

    def start_listening(self, **kwargs):
        """Open the MoM query RPC, then start listening for OTDB notifications."""
        self.momqueryrpc.open()
        super(TriggerCancellationService, self).start_listening(**kwargs)

    def stop_listening(self, **kwargs):
        """Stop listening for OTDB notifications, then close the MoM query RPC.

        This is the reverse of start_listening, so the RPC stays open for as
        long as the listener can still hand out notifications. The RPC is
        closed even when stopping the listener raises.
        """
        try:
            super(TriggerCancellationService, self).stop_listening(**kwargs)
        finally:
            self.momqueryrpc.close()

    def _cancel_trigger(self, trigger_id, cancellation_reason):
        """Ask MoM to cancel trigger_id, recording cancellation_reason."""
        self.momqueryrpc.cancel_trigger(trigger_id, cancellation_reason)

    def _lookup_mom_id(self, otdb_id):
        """Return the MoM id for otdb_id, or None when MoM does not report one in time.

        The lookup is attempted at most MOM_ID_LOOKUP_ATTEMPTS times, with
        MOM_ID_LOOKUP_INTERVAL seconds between attempts.
        """
        for attempt in range(1, self.MOM_ID_LOOKUP_ATTEMPTS + 1):
            # The reply is indexed by the OTDB id that was asked for.
            reply = self.momqueryrpc.getMoMIdsForOTDBIds(otdb_id)
            mom_id = reply.get(otdb_id) if reply else None
            if mom_id:
                return mom_id

            if attempt < self.MOM_ID_LOOKUP_ATTEMPTS:
                # we are sometimes too fast for MOM: wait instead of spinning
                time.sleep(self.MOM_ID_LOOKUP_INTERVAL)

        return None

    def _if_trigger_cancel_trigger(self, otdb_id, cancellation_reason):
        """Cancel the trigger that belongs to otdb_id, if there is one.

        Nothing is cancelled when no MoM id can be found for otdb_id or when
        MoM reports no trigger id for that MoM id.
        """
        mom_id = self._lookup_mom_id(otdb_id)
        if not mom_id:
            logger.warning("No mom_id found for otdb_id: %s after %s attempts, not cancelling anything",
                           otdb_id, self.MOM_ID_LOOKUP_ATTEMPTS)
            return

        trigger_id = self.momqueryrpc.get_trigger_id(mom_id)['trigger_id']

        if trigger_id:
            logger.info("Cancelling trigger w/ otdb_id: %s, mom_id: %s, trigger_id: %s, reason: %s",
                        otdb_id, mom_id, trigger_id, cancellation_reason)
            self._cancel_trigger(trigger_id, cancellation_reason)

    # The three handlers below are presumably invoked by OTDBBusListener for the
    # matching task status; modificationTime is accepted but not used.

    def onObservationError(self, otdb_id, modificationTime):
        """Cancel the related trigger, if any, after an 'error' notification."""
        self._if_trigger_cancel_trigger(otdb_id, "Observation error notification received for OTDB-Id %s" % otdb_id)

    def onObservationConflict(self, otdb_id, modificationTime):
        """Cancel the related trigger, if any, after a 'conflict' notification."""
        self._if_trigger_cancel_trigger(otdb_id, "Observation conflict notification received for OTDB-Id %s" % otdb_id)

    def onObservationAborted(self, otdb_id, modificationTime):
        """Cancel the related trigger, if any, after an 'aborted' notification."""
        self._if_trigger_cancel_trigger(otdb_id, "Observation aborted notification received for OTDB-Id %s" % otdb_id)

    # todo: Is this smart enough? How about cancelling triggers if triggered pipeline gets aborted?


def main():
    """Parse the command line and run the trigger cancellation service until interrupted."""
    # Check the invocation arguments
    parser = OptionParser("%prog [options]", description='runs the trigger cancellation service')
    parser.add_option('-q', '--broker', dest='broker', type='string', default=None,
                      help='Address of the qpid broker, default: localhost')
    parser.add_option("--otdb_notification_busname", dest="otdb_notification_busname", type="string",
                      default=OTDB_NOTIFICATION_BUSNAME,
                      help="Bus or queue where the OTDB notifications are published. [default: %default]")
    parser.add_option("--otdb_notification_subject", dest="otdb_notification_subject", type="string",
                      default=OTDB_NOTIFICATION_SUBJECT,
                      help="Subject of OTDB notifications on otdb_notification_busname. [default: %default]")
    options, _ = parser.parse_args()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Only options declared above exist on `options`; reading an undeclared one
    # (such as a radb bus or service name) raises AttributeError.
    with TriggerCancellationService(otdb_notification_busname=options.otdb_notification_busname,
                                    otdb_notification_subject=options.otdb_notification_subject,
                                    broker=options.broker):
        waitForInterrupt()


if __name__ == '__main__':
    main()
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

import unittest
import mock
import os

from lofar.triggerservices.trigger_cancellation_service import TriggerCancellationService


class TestTriggerCancellationService(unittest.TestCase):
    """Unit tests for TriggerCancellationService, run against a mocked MoM query RPC."""

    project_name = "test_lofar"
    trigger_id = 1
    obs_sas_id = 22
    obs_mom_id = 44
    mom_link = "https://lofar.astron.nl/mom3/user/main/list/setUpProjectList.do"

    def setUp(self):
        # By default the mocked MoM knows the observation and reports a trigger for it.
        rpc = mock.MagicMock()
        rpc.get_trigger_id.return_value = {'trigger_id': self.trigger_id, 'status': "OK" }
        rpc.getMoMIdsForOTDBIds.return_value = {self.obs_sas_id: self.obs_mom_id}
        self.momqueryrpc_mock = rpc

    # Shared steps

    def _new_listener(self):
        """Build the service under test on top of the mocked RPC."""
        return TriggerCancellationService(self.momqueryrpc_mock)

    def _make_observation_untriggered(self):
        """Let the mocked MoM report that the observation has no trigger."""
        self.momqueryrpc_mock.get_trigger_id.return_value = {'trigger_id': None, 'status': "Error"}

    def _check_lookup_and_cancellation(self, expected_word):
        """Check the ids handed to the RPC and that the reason contains expected_word."""
        # correct otdb id is used to obtain trigger
        lookup_args = self.momqueryrpc_mock.getMoMIdsForOTDBIds.call_args[0]
        self.assertEqual(self.obs_sas_id, lookup_args[0])

        # correct trigger id and reason are used to abort
        cancelled_id, given_reason = self.momqueryrpc_mock.cancel_trigger.call_args[0]
        self.assertEqual(self.trigger_id, cancelled_id)
        self.assertIn(expected_word, given_reason)

    # Start / stop

    @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.start_listening')
    def test_start_listening_opens_momquery_rpc(self, super_mock):
        self._new_listener().start_listening()

        self.momqueryrpc_mock.open.assert_called()
        super_mock.assert_called()

    @mock.patch('lofar.sas.otdb.OTDBBusListener.OTDBBusListener.stop_listening')
    def test_stop_listening_closes_momquery_rpc(self, super_mock):
        self._new_listener().stop_listening()

        self.momqueryrpc_mock.close.assert_called()
        super_mock.assert_called()

    # Aborted

    def test_onObservationAborted_does_not_call_cancel_trigger_when_its_not_a_trigger(self):
        self._make_observation_untriggered()
        service = self._new_listener()

        service.onObservationAborted(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_not_called()

    def test_onObservationAborted_calls_cancel_trigger_with_correct_trigger(self):
        service = self._new_listener()

        service.onObservationAborted(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_called()

    def test_onObservationAborted_uses_correct_triggerid_and_sets_correct_cancellation_reason(self):
        service = self._new_listener()

        service.onObservationAborted(self.obs_sas_id, None)

        self._check_lookup_and_cancellation('aborted')

    # Error

    def test_onObservationError_does_not_call_cancel_trigger_when_its_not_a_trigger(self):
        self._make_observation_untriggered()
        service = self._new_listener()

        service.onObservationError(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_not_called()

    def test_onObservationError_calls_cancel_trigger_when_its_a_trigger(self):
        service = self._new_listener()

        service.onObservationError(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_called()

    def test_onObservationError_uses_correct_triggerid_and_sets_correct_cancellation_reason(self):
        service = self._new_listener()

        service.onObservationError(self.obs_sas_id, None)

        self._check_lookup_and_cancellation('error')

    # Conflict

    def test_onObservationConflict_does_not_call_cancel_trigger_when_its_not_a_trigger(self):
        self._make_observation_untriggered()
        service = self._new_listener()

        service.onObservationConflict(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_not_called()

    def test_onObservationConflict_calls_cancel_trigger_when_its_a_trigger(self):
        service = self._new_listener()

        service.onObservationConflict(self.obs_sas_id, None)

        self.momqueryrpc_mock.cancel_trigger.assert_called()

    def test_onObservationConflict_uses_correct_triggerid_and_sets_correct_cancellation_reason(self):
        service = self._new_listener()

        service.onObservationConflict(self.obs_sas_id, None)

        self._check_lookup_and_cancellation('conflict')


if __name__ == "__main__":
    unittest.main()
0000000000000000000000000000000000000000..ab436d2efbcc486cf7345f752fbf9ba68bcfc76e --- /dev/null +++ b/SAS/TriggerServices/test/t_trigger_cancellation_service.sh @@ -0,0 +1,4 @@ +#!/bin/sh + +# run the test with the same name as this script +./runctest.sh t_trigger_cancellation_service \ No newline at end of file