From 115bf4d986275655b9cd7c781b6e2583b6c46be7 Mon Sep 17 00:00:00 2001 From: Jorrit Schaap <schaap@astron.nl> Date: Mon, 1 Feb 2021 16:29:55 +0100 Subject: [PATCH] TMSS-307: while working on TMSS commissioning with TMSS-573 it was discovered that cobalt sends its feedback to a queue with a single consumer. Now with two consumers (the extra tmss_feedback_service) they were fighting over the messages. Hence, the queue has been replaced with a topic exchange which duplicates the message in two seperate queues for otdb and for tmss. --- SAS/Feedback_Service/src/FeedbackService.conf | 2 +- SAS/QPIDInfrastructure/amqp-infra-setup.sh | 14 ++++++++++++-- .../feedback_handling/lib/feedback_handling.py | 3 ++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/SAS/Feedback_Service/src/FeedbackService.conf b/SAS/Feedback_Service/src/FeedbackService.conf index d733ca4bbe2..d2843a89b46 100644 --- a/SAS/Feedback_Service/src/FeedbackService.conf +++ b/SAS/Feedback_Service/src/FeedbackService.conf @@ -3,7 +3,7 @@ # # Parameters for FeedbackService to connect to Qpid. # -FeedbackQueuenames = [ "otdb.task.feedback.dataproducts" , "otdb.task.feedback.processing" ] +FeedbackQueuenames = [ "otdb.task.feedback.dataproducts.for_otdb" , "otdb.task.feedback.processing.for_otdb" ] FeedbackService.OTDBdatabase = LOFAR_4 FeedbackService.OTDBhostname = sasdb diff --git a/SAS/QPIDInfrastructure/amqp-infra-setup.sh b/SAS/QPIDInfrastructure/amqp-infra-setup.sh index 6bfd5f0477d..bca1dd197a4 100755 --- a/SAS/QPIDInfrastructure/amqp-infra-setup.sh +++ b/SAS/QPIDInfrastructure/amqp-infra-setup.sh @@ -43,8 +43,18 @@ fi # MessageBus qpid-config -b $CCU add queue ${PREFIX}mac.task.feedback.state --durable -qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts --durable -qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing --durable +qpid-config -b $MCU del queue topic ${PREFIX}otdb.task.feedback.dataproducts --durable +qpid-config -b $MCU add exchange topic ${PREFIX}otdb.task.feedback.dataproducts --durable +qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts.for_otdb --durable +qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.dataproducts ${PREFIX}otdb.task.feedback.dataproducts.for_otdb "#" --durable +qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.dataproducts.for_tmss --durable +qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.dataproducts ${PREFIX}otdb.task.feedback.dataproducts.for_tmss "#" --durable +qpid-config -b $MCU del queue topic ${PREFIX}otdb.task.feedback.processing --durable +qpid-config -b $MCU add exchange topic ${PREFIX}otdb.task.feedback.processing --durable +qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing.for_otdb --durable +qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.processing ${PREFIX}otdb.task.feedback.processing.for_otdb "#" --durable +qpid-config -b $MCU add queue ${PREFIX}otdb.task.feedback.processing.for_tmss --durable +qpid-config -b $MCU bind ${PREFIX}otdb.task.feedback.processing ${PREFIX}otdb.task.feedback.processing.for_tmss "#" --durable qpid-config -b $MCU add queue ${PREFIX}lofar.task.specification.system --durable qpid-config -b $CCU add queue ${PREFIX}lofar.task.specification.system --durable qpid-config -b $CCU add queue ${PREFIX}mom.task.specification.system --durable diff --git a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py index e5c6b048192..f95ca4be6e0 100644 --- a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py @@ -42,7 +42,7 @@ class TMSSFeedbackListener: # Note we can't use fancy bus listeners since we have to use the really old messagebus package for feedback # todo: get rid of old-style messaging or improve this service stub threads = [] - exchanges = ["otdb.task.feedback.processing", "otdb.task.feedback.dataproducts"] + exchanges = ["otdb.task.feedback.processing.for_tmss", "otdb.task.feedback.dataproducts.for_tmss"] def __init__(self, rest_client_creds_id: str="TMSSClient", qpid_broker: str=broker_feedback) -> None: super().__init__() @@ -75,6 +75,7 @@ class TMSSFeedbackListener: if msg is not None: content = msg.content() logger.info("received feedback from bus='%s' %s", exchange_name, content) + fbus.ack(msg) # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. # hence, it stores this id in the 'sasid' property of the message. -- GitLab