diff --git a/SAS/TMSS/services/feedback_handling/bin/tmss_feedback_handling_service b/SAS/TMSS/services/feedback_handling/bin/tmss_feedback_handling_service index 2ecd686a25fd88e45094bf4cda143e41de1fb61d..81ebeafa772a4285176676e9b4024f8b72d7a531 100755 --- a/SAS/TMSS/services/feedback_handling/bin/tmss_feedback_handling_service +++ b/SAS/TMSS/services/feedback_handling/bin/tmss_feedback_handling_service @@ -18,7 +18,7 @@ # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -from lofar.sas.tmss.services.subtask_scheduling import main +from lofar.sas.tmss.services.feedback_handling import main if __name__ == "__main__": main() diff --git a/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py index b5d3ccb180607fccf24738f8d5515665c782819d..296c3705322b807d3063c002ad791771221845a1 100644 --- a/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py @@ -54,6 +54,13 @@ class TMSSFeedbackListener: with TMSSsession.create_from_dbcreds_for_ldap() as session: session.process_subtask_feedback_and_set_finished(subtask_id) + def __enter__(self): + self.start_handling() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop_handling() + def start_handling(self): def listen(ex): @@ -82,5 +89,10 @@ class TMSSFeedbackListener: thread = self.threads.pop() thread.join(5) +def main(): + from lofar.common.util import waitForInterrupt + with TMSSFeedbackListener(): + waitForInterrupt() + if __name__ == '__main__': - TMSSFeedbackListener().start_handling() + main() diff --git a/SAS/TMSS/test/test_utils.py b/SAS/TMSS/test/test_utils.py index 4520339e7e5db56b1848dcf8cbbdc20ad606c4c3..71e8bb8423c1d5cb9f313be1472db2ddfd7f8de1 100644 --- a/SAS/TMSS/test/test_utils.py +++ b/SAS/TMSS/test/test_utils.py @@ -275,6 +275,7 @@ class TMSSTestEnvironment: start_ra_test_environment: bool=False, start_postgres_listener: bool=False, start_subtask_scheduler: bool=False, start_dynamic_scheduler: bool=False, start_pipeline_control: bool=False, start_websocket: bool=False, + start_feedback_service: bool=False, start_workflow_service: bool=False, enable_viewflow: bool=False): self._exchange = exchange self._broker = broker @@ -311,6 +312,9 @@ class TMSSTestEnvironment: self._start_websocket = start_websocket self.websocket_service = None + self._start_feedback_service = start_feedback_service + self.feedback_service = None + self.enable_viewflow = enable_viewflow or start_workflow_service self._start_workflow_service = start_workflow_service self.workflow_service = None @@ -385,6 +389,14 @@ class TMSSTestEnvironment: self.workflow_service = create_workflow_service(exchange=self._exchange, broker=self._broker) self.workflow_service.start_listening() + if self._start_feedback_service: + try: + from lofar.sas.tmss.services.feedback_handling import TMSSFeedbackListener + self.feedback_service = TMSSFeedbackListener() + self.feedback_service.start_handling() + except Exception as e: + logger.exception(e) + if self._populate_schemas or self._populate_test_data: self.populate_schemas() @@ -401,6 +413,10 @@ class TMSSTestEnvironment: self.postgres_listener.stop() self.postgres_listener = None + if self.feedback_service is not None: + self.feedback_service.stop_handling() + self.feedback_service = None + if self.websocket_service is not None: self.websocket_service.stop_listening() self.websocket_service = None @@ -503,6 +519,7 @@ def main_test_environment(): group.add_option('-S', '--scheduling', dest='scheduling', action='store_true', help='start the TMSS background scheduling services for dynamic scheduling of schedulingunits and subtask scheduling of chains of dependend subtasks.') group.add_option('-v', '--viewflow', dest='viewflow', action='store_true', help='Enable the viewflow app for workflows on top of TMSS') group.add_option('-w', '--websockets', dest='websockets', action='store_true', help='Enable json updates pushed via websockets') + group.add_option('-f', '--feedbackservice', dest='feedbackservice', action='store_true', help='Enable feedbackservice to handle feedback from observations/pipelines which comes in via the (old qpid) otdb messagebus.') group.add_option('--all', dest='all', action='store_true', help='Enable/Start all the services, upload schemas and testdata') group = OptionGroup(parser, 'Messaging options') @@ -523,6 +540,7 @@ def main_test_environment(): start_subtask_scheduler=options.scheduling or options.all, start_dynamic_scheduler=options.scheduling or options.all, start_websocket=options.websockets or options.all, + start_feedback_service=options.feedbackservice or options.all, start_workflow_service=options.viewflow or options.all, enable_viewflow=options.viewflow or options.all) as tmss_test_env: