Skip to content
Snippets Groups Projects
Commit 115bf4d9 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-307: while working on TMSS commissioning with TMSS-573 it was discovered...

TMSS-307: while working on TMSS commissioning with TMSS-573 it was discovered that cobalt sends its feedback to a queue with a single consumer. Now with two consumers (the extra tmss_feedback_service) they were fighting over the messages. Hence, the queue has been replaced with a topic exchange which duplicates the message in two seperate queues for otdb and for tmss.
parent 49c61eec
Branches
Tags
1 merge request!343Resolve TMSS-557 and TMSS-307
...@@ -3,7 +3,7 @@ ...@@ -3,7 +3,7 @@
# #
# Parameters for FeedbackService to connect to Qpid. # Parameters for FeedbackService to connect to Qpid.
# #
FeedbackQueuenames = [ "otdb.task.feedback.dataproducts" , "otdb.task.feedback.processing" ] FeedbackQueuenames = [ "otdb.task.feedback.dataproducts.for_otdb" , "otdb.task.feedback.processing.for_otdb" ]
FeedbackService.OTDBdatabase = LOFAR_4 FeedbackService.OTDBdatabase = LOFAR_4
FeedbackService.OTDBhostname = sasdb FeedbackService.OTDBhostname = sasdb
......
...@@ -43,8 +43,18 @@ fi ...@@ -43,8 +43,18 @@ fi
# MessageBus # MessageBus
qpid-config -b $CCU add queue ${PREFIX}mac.task.feedback.state --durable qpid-config -b $CCU add queue ${PREFIX}mac.task.feedback.state --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts --durable qpid-config -b $MCU del queue topic ${PREFIX}otdb.task.feedback.dataproducts --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing --durable qpid-config -b $MCU add exchange topic ${PREFIX}otdb.task.feedback.dataproducts --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts.for_otdb --durable
qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.dataproducts ${PREFIX}otdb.task.feedback.dataproducts.for_otdb "#" --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts.for_tmss --durable
qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.dataproducts ${PREFIX}otdb.task.feedback.dataproducts.for_tmss "#" --durable
qpid-config -b $MCU del queue topic ${PREFIX}otdb.task.feedback.processing --durable
qpid-config -b $MCU add exchange topic ${PREFIX}otdb.task.feedback.processing --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing.for_otdb --durable
qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.processing ${PREFIX}otdb.task.feedback.processing.for_otdb "#" --durable
qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing.for_tmss --durable
qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.processing ${PREFIX}otdb.task.feedback.processing.for_tmss "#" --durable
qpid-config -b $MCU add queue ${PREFIX}lofar.task.specification.system --durable qpid-config -b $MCU add queue ${PREFIX}lofar.task.specification.system --durable
qpid-config -b $CCU add queue ${PREFIX}lofar.task.specification.system --durable qpid-config -b $CCU add queue ${PREFIX}lofar.task.specification.system --durable
qpid-config -b $CCU add queue ${PREFIX}mom.task.specification.system --durable qpid-config -b $CCU add queue ${PREFIX}mom.task.specification.system --durable
......
...@@ -42,7 +42,7 @@ class TMSSFeedbackListener: ...@@ -42,7 +42,7 @@ class TMSSFeedbackListener:
# Note we can't use fancy bus listeners since we have to use the really old messagebus package for feedback # Note we can't use fancy bus listeners since we have to use the really old messagebus package for feedback
# todo: get rid of old-style messaging or improve this service stub # todo: get rid of old-style messaging or improve this service stub
threads = [] threads = []
exchanges = ["otdb.task.feedback.processing", "otdb.task.feedback.dataproducts"] exchanges = ["otdb.task.feedback.processing.for_tmss", "otdb.task.feedback.dataproducts.for_tmss"]
def __init__(self, rest_client_creds_id: str="TMSSClient", qpid_broker: str=broker_feedback) -> None: def __init__(self, rest_client_creds_id: str="TMSSClient", qpid_broker: str=broker_feedback) -> None:
super().__init__() super().__init__()
...@@ -75,6 +75,7 @@ class TMSSFeedbackListener: ...@@ -75,6 +75,7 @@ class TMSSFeedbackListener:
if msg is not None: if msg is not None:
content = msg.content() content = msg.content()
logger.info("received feedback from bus='%s' %s", exchange_name, content) logger.info("received feedback from bus='%s' %s", exchange_name, content)
fbus.ack(msg)
# note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
# hence, it stores this id in the 'sasid' property of the message. # hence, it stores this id in the 'sasid' property of the message.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment