diff --git a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py index 351b2ad41c4a4a83adf5177cd4b58a0e31bc3188..8ca079c071f31e98db55a88b9235c9c5f4aacb2d 100644 --- a/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py +++ b/SAS/TMSS/backend/services/feedback_handling/lib/feedback_handling.py @@ -1,7 +1,5 @@ #!/usr/bin/env python3 -# subtask_scheduling.py -# # Copyright (C) 2015 # ASTRON (Netherlands Institute for Radio Astronomy) # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands @@ -19,20 +17,12 @@ # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. -# -# $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $ """ -The subtask_scheduling service schedules TMSS subtasks. -It listens on the lofar notification message bus for state changes of TMSS subtasks; when a task finished, -it schedules (rest action) all successors that are in state 'defined'. """ import logging -from lofar.common.util import waitForInterrupt -from lofar.common.datetimeutils import round_to_second_precision from datetime import datetime, timedelta -from dateutil import parser logger = logging.getLogger(__name__) @@ -40,150 +30,60 @@ logger = logging.getLogger(__name__) # TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here. from lofar.messagebus import messagebus as old_qpid_messagebus -# TMSS already uses the new AMQP/RabbitMQ/Kombu messagebus. Use it here to listen for subtask status changes. -from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME - - -class HybridFeedbackMessageHandler(TMSSEventMessageHandler): - '''This new-style message handler listens to the TMSS event messages using the TMSSBusListener - and reads/processes old-style qpid feedback messages. - - We need such a hybrid solution because TMSS gives us the subtask status changes, - and qpid/cobalt/pipelines give us the feedback. This old-style qpid handling needs to be replace once cobalt/pipelines are adapted.''' +# name of the qpid queue where the qpid feedback for dataproducts/processing is received. +QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss" +QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss" - # name of the qpid queue where the qpid feedback for dataproducts/processing is received. - QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss" - QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss" - - # wait this long before cancelling the subtask if not all feedback is received - DEFAULT_FEEDBACK_WAIT_TIMEOUT = 3600 - - def __init__(self, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None: - super().__init__(log_event_messages=False) - # a dict of subtask_id -> wait_timeout_timestamp for timeout computation - self._finishing_subtask_timestamps = {} - self._qpid_broker = qpid_broker - self._old_qpid_frombuses = None - self._feedback_wait_timeout = feedback_wait_timeout - - def start_handling(self): - try: - self._old_qpid_frombuses = [old_qpid_messagebus.FromBus(self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=self._qpid_broker), - old_qpid_messagebus.FromBus(self.QPID_PROCESSING_FEEDBACK_QUEUE, broker=self._qpid_broker)] - except Exception as e: - logger.warning("Could not connect to old-style qpid messagebus: %s", e) - self._init_timeouts_for_finishing_subtask_timestamps() - super().start_handling() - - def stop_handling(self): - super().stop_handling() - if self._old_qpid_frombuses: - for frombus in self._old_qpid_frombuses: - frombus.close() - - def _init_wait_timeout_for_finishing_observation_or_pipeline_subtask(self, subtask): - from lofar.sas.tmss.tmss.tmssapp.models import SubtaskState - - if subtask.state.value == SubtaskState.Choices.FINISHING.value: - specifications_template = subtask.specifications_template - if specifications_template.type.value.lower() in ('observation', 'pipeline'): - try: - finishing_timestamp = parser.parse(subtask.scheduled_stop_time, ignoretz=True) - except: - finishing_timestamp = datetime.utcnow() - wait_timeout_timestamp = round_to_second_precision(finishing_timestamp + timedelta(seconds=self._feedback_wait_timeout)) - logger.info('waiting at most %d seconds until %s before yielding a warning for %s subtask id=%s if not all feedback is received by then', - (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp, - specifications_template.type.value, subtask.id) - self._finishing_subtask_timestamps[subtask.id] = wait_timeout_timestamp - - def _init_timeouts_for_finishing_subtask_timestamps(self): - '''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.''' - from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState - subtasks = Subtask.objects.filter(state__value=SubtaskState.Choices.FINISHING.value) - for subtask in subtasks: - self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask) - - def onSubTaskStatusChanged(self, id: int, status: str): - '''Handle TMSS subtask status changes''' - from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState - logger.info("subtask id=%s status changed to %s", id, status) - # keep track of finishing_status_change_timestamp for timeout computation - if status == SubtaskState.Choices.FINISHING.value: - subtask = Subtask.objects.get(id=id) - self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask) - elif status in ('finished', 'cancelling', 'cancelled', 'error'): - if id in self._finishing_subtask_timestamps: - wait_timeout_timestamp = self._finishing_subtask_timestamps[id] - logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ", - (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status) - del self._finishing_subtask_timestamps[id] - - def before_receive_message(self): - # use TMSSEventMessageHandler template pattern to act perform extra business logic in the loop - self._read_feedback_message_and_process(5) - self._detect_and_log_feedback_timeout() - super().before_receive_message() - - def _detect_and_log_feedback_timeout(self): - for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()): - if datetime.utcnow() > wait_timeout_timestamp: - del self._finishing_subtask_timestamps[subtask_id] - logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, self._feedback_wait_timeout) - - def _read_feedback_message_and_process(self, timeout: float=1): - try: - if self._old_qpid_frombuses is None: - return - - start_timestamp = datetime.utcnow() - - while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): - # read both the dataproducts- and processing-buses - for old_qpid_frombus in self._old_qpid_frombuses: - try: - while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): - # get message from messagebus - msg = old_qpid_frombus.get(0.1) - - if msg is None: - break - else: - content = msg.content() - logger.info("received message from qpid queue='%s' %s", self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, old_qpid_frombus.receiver.source.address) - old_qpid_frombus.ack(msg) - - # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. - # hence, it stores this id in the 'sasid' property of the message. - # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field, - # so we can fetch the TMSS subtask_id from the msg's sasid. - tmss_subtask_id = int(content.sasid) - feedback = content.payload - - logger.info("received feedback for sasid=%s", tmss_subtask_id) - - self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback) - except TimeoutError: - pass - except Exception as e: - logger.error(str(e)) - - def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str): - if subtask_id < 1000000: - logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id) +def read_feedback_message_and_process(old_qpid_frombuses, timeout: float=1): + try: + if old_qpid_frombuses is None: return - from lofar.sas.tmss.tmss.tmssapp.models import Subtask - from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete + start_timestamp = datetime.utcnow() - tmss_subtask = Subtask.objects.get(id=subtask_id) - logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id) - updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback) - logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value) + while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): + # read both the dataproducts- and processing-buses + for old_qpid_frombus in old_qpid_frombuses: + try: + while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): + # get message from messagebus + msg = old_qpid_frombus.get(0.1) + + if msg is None: + break + else: + content = msg.content() + logger.info("received message from qpid queue='%s'", old_qpid_frombus.receiver.source.address) + old_qpid_frombus.ack(msg) + + # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. + # hence, it stores this id in the 'sasid' property of the message. + # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field, + # so we can fetch the TMSS subtask_id from the msg's sasid. + tmss_subtask_id = int(content.sasid) + feedback = content.payload + + logger.info("received feedback for sasid=%s", tmss_subtask_id) + + process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback) + except TimeoutError: + pass + except Exception as e: + logger.error(str(e)) + +def process_feedback_and_set_to_finished_if_complete(subtask_id: int, feedback: str): + if subtask_id < 1000000: + logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id) + return + + from lofar.sas.tmss.tmss.tmssapp.models import Subtask + from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete + + tmss_subtask = Subtask.objects.get(id=subtask_id) + logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id) + updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback) + logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value) -def create_service(handler_type=HybridFeedbackMessageHandler,exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT): - return TMSSBusListener(handler_type=handler_type, handler_kwargs={ "qpid_broker": qpid_broker, "feedback_wait_timeout": feedback_wait_timeout }, - exchange=exchange, broker=broker) def main(): # make sure we run in UTC timezone @@ -198,19 +98,11 @@ def main(): parser = OptionParser('%prog [options]', description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.') - parser.add_option('-t', '--timeout', dest='timeout', type='int', default=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT, - help='Wait for <timeout> seconds before a warning is issued if not all feedback is received, default: %default') - group = OptionGroup(parser, 'QPID Messaging options', description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.") group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default') parser.add_option_group(group) - group = OptionGroup(parser, 'RabbitMQ Messaging options') - group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default') - group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]") - parser.add_option_group(group) - group = OptionGroup(parser, 'TMSS Django options') parser.add_option_group(group) group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default') @@ -220,8 +112,18 @@ def main(): from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials) - with create_service(exchange=options.exchange, broker=options.broker, qpid_broker=options.qpid_broker, feedback_wait_timeout=options.timeout): - waitForInterrupt() + old_qpid_frombuses = [old_qpid_messagebus.FromBus(QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=options.qpid_broker), + old_qpid_messagebus.FromBus(QPID_PROCESSING_FEEDBACK_QUEUE, broker=options.qpid_broker)] + + while True: + try: + read_feedback_message_and_process(old_qpid_frombuses, timeout=5) + except KeyboardInterrupt: + break + + for bus in old_qpid_frombuses: + bus.close() + if __name__ == '__main__': main()