Skip to content
Snippets Groups Projects
Select Git revision
  • 39040e2cc47a864c06fde902794eb6b948b55e0e
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

feedback_handling.py

Blame
  • Mario Raciti's avatar
    TMSS-1781: Update feedback_handling by replacing the REST API use with Django ORM for optimisation
    Mario Raciti authored
    39040e2c
    History
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    feedback_handling.py 12.46 KiB
    #!/usr/bin/env python3
    
    # feedback_handling.py
    #
    # Copyright (C) 2015
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $
    
    """
    The feedback_handling service relays observation/pipeline feedback from the old-style QPID messagebus into TMSS.
    It listens on the TMSS notification message bus for subtask status changes to keep track of 'finishing' subtasks,
    and logs a warning when their feedback has not been received within the configured wait timeout.
    """
    
    import os
    import logging
    import threading
    from lofar.common.util import waitForInterrupt
    from lofar.common.datetimeutils import round_to_second_precision
    from datetime import datetime, timedelta
    from dateutil import parser
    
    logger = logging.getLogger(__name__)
    
    # warning: we import the old qpid messagebus here. We have to, because cobalt/otdb still use it.
    # TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here.
    from lofar.messagebus import messagebus as old_qpid_messagebus
    
    # TMSS already uses the new AMQP/RabbitMQ/Kombu messagebus. Use it here to listen for subtask status changes.
    from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME
    
    from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete
    from lofar.sas.tmss.tmss.tmssapp.models import Subtask
    
    
    class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
        '''This new-style message handler listens to the TMSS event messages using the TMSSBusListener
        and reads/processes old-style qpid feedback messages.

        We need such a hybrid solution because TMSS gives us the subtask status changes,
        and qpid/cobalt/pipelines give us the feedback. This old-style qpid handling needs to be replaced once cobalt/pipelines are adapted.

        For every 'finishing' observation/pipeline subtask it also tracks a feedback wait-timeout
        (scheduled stop time + feedback_wait_timeout) and logs a warning once that timeout has passed.'''

        # name of the qpid queue where the qpid feedback for dataproducts/processing is received.
        QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss"
        QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss"

        # wait this many seconds (after the scheduled stop time) before logging a warning if not all feedback is received
        DEFAULT_FEEDBACK_WAIT_TIMEOUT = 3600

        def __init__(self, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None:
            '''
            :param qpid_broker: address of the old-style qpid broker on which the feedback is received
            :param feedback_wait_timeout: number of seconds after a subtask's scheduled stop time before an overdue-feedback warning is logged
            '''
            super().__init__(log_event_messages=False)
            # a dict of subtask_id -> wait_timeout_timestamp (naive UTC datetime) for timeout computation
            self._finishing_subtask_timestamps = {}
            self._qpid_broker = qpid_broker
            # stays None if the qpid buses could not be opened; then no feedback is read at all
            self._old_qpid_frombuses = None
            self._feedback_wait_timeout = feedback_wait_timeout

        def start_handling(self):
            '''Open the old-style qpid feedback buses (best effort), initialize the wait-timeouts
            for the subtasks which are already finishing, and start handling TMSS events.'''
            try:
                self._old_qpid_frombuses = [old_qpid_messagebus.FromBus(self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=self._qpid_broker),
                                            old_qpid_messagebus.FromBus(self.QPID_PROCESSING_FEEDBACK_QUEUE, broker=self._qpid_broker)]
            except Exception as e:
                logger.warning("Could not connect to old-style qpid messagebus: %s", e)
            self._init_timeouts_for_finishing_subtask_timestamps()
            super().start_handling()

        def stop_handling(self):
            '''Stop handling TMSS events and close the old-style qpid feedback buses (if they were opened).'''
            super().stop_handling()
            if self._old_qpid_frombuses:
                for frombus in self._old_qpid_frombuses:
                    try:
                        frombus.close()
                    except Exception as e:
                        # keep closing the remaining bus(es) even if one of them fails to close
                        logger.warning("Could not close old-style qpid messagebus: %s", e)

        @staticmethod
        def _get_scheduled_stop_time_as_naive_utc(subtask: Subtask) -> datetime:
            '''Return the subtask's scheduled_stop_time as a naive UTC datetime.
            Falls back to the current UTC time if the stop time is unset or cannot be interpreted.'''
            stop_time = subtask.scheduled_stop_time
            if isinstance(stop_time, datetime):
                # The Django ORM yields a datetime object (possibly timezone-aware), whereas all timeout
                # bookkeeping in this class compares against the naive datetime.utcnow().
                offset = stop_time.utcoffset()
                naive_stop_time = stop_time.replace(tzinfo=None)
                return naive_stop_time - offset if offset is not None else naive_stop_time
            if stop_time is not None:
                try:
                    # legacy: the REST API used to deliver the stop time as an ISO formatted string
                    return parser.parse(stop_time, ignoretz=True)
                except (TypeError, ValueError, OverflowError):
                    pass
            return datetime.utcnow()

        def _init_wait_timeout_for_finishing_observation_or_pipeline_subtask(self, subtask: Subtask):
            '''Start tracking the feedback wait-timeout for the given subtask,
            but only if it is 'finishing' and is an observation or pipeline.'''
            if subtask.state.value != 'finishing':
                return

            specifications_template = subtask.specifications_template
            if specifications_template.type.value.lower() not in ('observation', 'pipeline'):
                return

            finishing_timestamp = self._get_scheduled_stop_time_as_naive_utc(subtask)
            wait_timeout_timestamp = round_to_second_precision(finishing_timestamp + timedelta(seconds=self._feedback_wait_timeout))
            logger.info('waiting at most %d seconds until %s before yielding a warning for %s subtask id=%s if not all feedback is received by then',
                        (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp,
                        specifications_template.type.value, subtask.id)
            self._finishing_subtask_timestamps[subtask.id] = wait_timeout_timestamp

        def _init_timeouts_for_finishing_subtask_timestamps(self):
            '''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.'''
            subtasks = Subtask.objects.filter(state__value='finishing')
            for subtask in subtasks:
                self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)

        def onSubTaskStatusChanged(self, id: int, status: str):
            '''Handle TMSS subtask status changes.

            Starts tracking the feedback wait-timeout when a subtask becomes 'finishing', and stops tracking it
            when the subtask becomes 'finished', 'cancelling', 'cancelled' or 'error'.'''
            logger.info("subtask id=%s status changed to %s", id, status)
            # keep track of finishing_status_change_timestamp for timeout computation
            if status == 'finishing':
                subtask = Subtask.objects.get(id=id)
                self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
            elif status in ('finished', 'cancelling', 'cancelled', 'error'):
                wait_timeout_timestamp = self._finishing_subtask_timestamps.pop(id, None)
                if wait_timeout_timestamp is not None:
                    logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ",
                                (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status)

        def before_receive_message(self):
            '''Hook of the TMSSEventMessageHandler loop: read/process qpid feedback and check for overdue feedback
            before each attempt to receive a TMSS event message.'''
            # use the TMSSEventMessageHandler template pattern to perform extra business logic in the loop
            self._read_feedback_message_and_process(5)
            self._detect_and_log_feedback_timeout()
            super().before_receive_message()

        def _detect_and_log_feedback_timeout(self):
            '''Log a warning (once) for each tracked subtask whose feedback wait-timeout has passed, and stop tracking it.'''
            now = datetime.utcnow()
            # iterate over a copy, because entries are deleted while looping
            for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()):
                if now > wait_timeout_timestamp:
                    del self._finishing_subtask_timestamps[subtask_id]
                    logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, wait_timeout_timestamp)

        def _read_feedback_message_and_process(self, timeout: float=1):
            '''Read and process feedback messages from both old-style qpid buses for at most `timeout` seconds.

            Does nothing if the qpid buses could not be opened in start_handling.
            Errors are logged and not raised, so the TMSS event loop keeps running.'''
            if self._old_qpid_frombuses is None:
                return

            start_timestamp = datetime.utcnow()
            max_duration = timedelta(seconds=timeout)

            try:
                while datetime.utcnow() - start_timestamp < max_duration:
                    # read both the dataproducts- and processing-buses
                    for old_qpid_frombus in self._old_qpid_frombuses:
                        try:
                            while datetime.utcnow() - start_timestamp < max_duration:
                                # get message from messagebus (waiting at most 0.1 seconds)
                                msg = old_qpid_frombus.get(0.1)
                                if msg is None:
                                    # this bus is drained for now, continue with the next bus
                                    break
                                self._handle_qpid_feedback_message(old_qpid_frombus, msg)
                        except TimeoutError:
                            pass
            except Exception as e:
                logger.exception("error while reading feedback from the old-style qpid messagebus: %s", e)

        def _handle_qpid_feedback_message(self, old_qpid_frombus, msg):
            '''Acknowledge one received qpid feedback message and relay its payload to TMSS.
            Processing errors for this message are logged, so that subsequent messages are still handled.'''
            content = msg.content()
            logger.info("received message from qpid queue='%s'", old_qpid_frombus.receiver.source.address)
            # the message is acknowledged before processing (as before); a message that fails processing is logged, not redelivered.
            old_qpid_frombus.ack(msg)

            try:
                # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
                # hence, it stores this id in the 'sasid' property of the message.
                # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field,
                # so we can fetch the TMSS subtask_id from the msg's sasid.
                tmss_subtask_id = int(content.sasid)
                feedback = content.payload

                logger.info("received feedback for sasid=%s", tmss_subtask_id)

                self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback)
            except Exception as e:
                logger.exception("could not process feedback message with sasid=%s: %s", getattr(content, 'sasid', None), e)

        def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str):
            '''Submit the feedback document to TMSS for the given subtask, which sets the subtask to finished if its feedback is complete.

            :param subtask_id: the TMSS subtask id; ids below 1000000 are treated as non-TMSS ids and ignored
            :param feedback: the raw feedback document from the qpid message payload
            :raises: Subtask.DoesNotExist if there is no subtask with the given id'''
            if subtask_id < 1000000:
                logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id)
                return

            tmss_subtask = Subtask.objects.get(id=subtask_id)
            logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id)
            updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback)
            logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value)
    
    def create_service(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT):
        '''Build a TMSSBusListener whose handler is a HybridFeedbackMessageHandler.

        :param exchange: exchange where the TMSS event messages are published
        :param broker: address of the (RabbitMQ) message broker for the TMSS events
        :param qpid_broker: address of the old-style qpid broker delivering the feedback
        :param feedback_wait_timeout: seconds to wait for feedback before a warning is logged
        :return: the TMSSBusListener instance
        '''
        # these keyword arguments are forwarded to the HybridFeedbackMessageHandler constructor
        handler_kwargs = dict(qpid_broker=qpid_broker,
                              feedback_wait_timeout=feedback_wait_timeout)
        listener = TMSSBusListener(handler_type=HybridFeedbackMessageHandler,
                                   handler_kwargs=handler_kwargs,
                                   exchange=exchange,
                                   broker=broker)
        return listener
    
    def main():
        '''Parse the command line options, set up the TMSS Django database connection,
        and run the feedback handling service until interrupted.'''
        import time

        # make sure we run in UTC timezone
        os.environ['TZ'] = 'UTC'
        if hasattr(time, 'tzset'):
            # a changed TZ environment variable is only reliably applied to the running process after tzset() (Unix only)
            time.tzset()

        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

        from optparse import OptionParser, OptionGroup

        # Check the invocation arguments
        # (named opt_parser to avoid confusion with the module-level dateutil 'parser')
        opt_parser = OptionParser('%prog [options]',
                                  description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.')

        opt_parser.add_option('-t', '--timeout', dest='timeout', type='int', default=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT,
                              help='Wait for <timeout> seconds before a warning is issued if not all feedback is received, default: %default')

        group = OptionGroup(opt_parser, 'QPID Messaging options',
                            description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.")
        group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default')
        opt_parser.add_option_group(group)

        group = OptionGroup(opt_parser, 'RabbitMQ Messaging options')
        group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
        group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
        opt_parser.add_option_group(group)

        group = OptionGroup(opt_parser, 'TMSS Django options')
        opt_parser.add_option_group(group)
        # the Django database credentials are required by the setup routine below (this option was missing, causing an AttributeError)
        group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='TMSS Django database credentials name, default: %default')
        # kept for backward compatibility of the command line; unused since the REST API was replaced by the Django ORM
        group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='TMSS Django REST API credentials name, default: %default')

        (options, args) = opt_parser.parse_args()

        # Django setup routine
        from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
        setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)

        with create_service(exchange=options.exchange, broker=options.broker, qpid_broker=options.qpid_broker, feedback_wait_timeout=options.timeout):
            waitForInterrupt()

    if __name__ == '__main__':
        main()