From fbd4daa5cca63350e504c58a36df4f03cca21856 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Thu, 7 Jan 2021 13:47:45 +0100 Subject: [PATCH] TMSS-566: fixes for the feedbackservice --- .../lib/feedback_handling.py | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py index 296c3705322..ebf4821abd5 100644 --- a/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/services/feedback_handling/lib/feedback_handling.py @@ -70,12 +70,25 @@ class TMSSFeedbackListener: try: # get message from messagebus msg = fbus.get(1) - # add contained feedback to TMSS - self.append_feedback_to_tmss_subtask_raw_feedback(msg.momid, msg.payload) - # try processing it, which will will fail until feedback of the subtask is complete. - self.process_subtask_feedback_and_set_finished(msg.momid) + + if msg is not None: + # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. + # hence, it stores this id in the 'sasid' property of the message. + # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field, + # so we can fetch the TMSS subtask_id from the msg's sasid. + tmss_subtask_id = msg.sasid + + logger.info("received feedback from bus='%s' for TMSS subtask id=%s payload=%s", ex, tmss_subtask_id, msg.payload) + + # add contained feedback to TMSS + self.append_feedback_to_tmss_subtask_raw_feedback(tmss_subtask_id, msg.payload) + + # try processing it, which will will fail until feedback of the subtask is complete. + self.process_subtask_feedback_and_set_finished(tmss_subtask_id) except TimeoutError: pass + except Exception as e: + logger.error(str(e)) logger.info('Stopped listening on exchange=%s broker=%s' % (ex, broker_feedback)) for exchange in self.exchanges: @@ -90,6 +103,8 @@ class TMSSFeedbackListener: thread.join(5) def main(): + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + from lofar.common.util import waitForInterrupt with TMSSFeedbackListener(): waitForInterrupt() -- GitLab