Skip to content
Snippets Groups Projects
Commit fbd4daa5 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-566: fixes for the feedbackservice

parent 1439ee0b
No related branches found
No related tags found
1 merge request!314Resolve TMSS-566
......@@ -70,12 +70,25 @@ class TMSSFeedbackListener:
try:
# get message from messagebus
msg = fbus.get(1)
# add contained feedback to TMSS
self.append_feedback_to_tmss_subtask_raw_feedback(msg.momid, msg.payload)
# try processing it, which will will fail until feedback of the subtask is complete.
self.process_subtask_feedback_and_set_finished(msg.momid)
if msg is not None:
# note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
# hence, it stores this id in the 'sasid' property of the message.
# We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field,
# so we can fetch the TMSS subtask_id from the msg's sasid.
tmss_subtask_id = msg.sasid
logger.info("received feedback from bus='%s' for TMSS subtask id=%s payload=%s", ex, tmss_subtask_id, msg.payload)
# add contained feedback to TMSS
self.append_feedback_to_tmss_subtask_raw_feedback(tmss_subtask_id, msg.payload)
# try processing it, which will will fail until feedback of the subtask is complete.
self.process_subtask_feedback_and_set_finished(tmss_subtask_id)
except TimeoutError:
pass
except Exception as e:
logger.error(str(e))
logger.info('Stopped listening on exchange=%s broker=%s' % (ex, broker_feedback))
for exchange in self.exchanges:
......@@ -90,6 +103,8 @@ class TMSSFeedbackListener:
thread.join(5)
def main():
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.common.util import waitForInterrupt
with TMSSFeedbackListener():
waitForInterrupt()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment