Skip to content
Snippets Groups Projects
Select Git revision
  • 5098f38577999518cc11f292c48527f83331873d
  • master default protected
  • L2SS-2407-swap-iers-caltable-monitoring-port
  • L2SS-2357-fix-ruff
  • sync-up-with-meta-pypcc
  • control-single-hba-and-lba
  • stabilise-landing-page
  • all-stations-lofar2
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
  • v0.51.9-4 protected
  • v0.51.9-3 protected
  • v0.51.9-2 protected
  • v0.51.9-1 protected
  • v0.51.9 protected
  • v0.51.8 protected
  • v0.39.15-wsrttwo protected
  • v0.39.15-wsrt protected
41 results

ini_client.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    feedback_handling.py 12.46 KiB
    #!/usr/bin/env python3
    
    # feedback_handling.py
    #
    # Copyright (C) 2015
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $
    
    """
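    The feedback_handling service relays observation/pipeline feedback from the old QPID messaging system into TMSS.
    It listens on the lofar notification message bus for state changes of TMSS subtasks to keep track of 'finishing' subtasks,
    reads the old-style qpid feedback messages, and submits the received feedback to TMSS for the subtask it belongs to.
    A warning is logged when the feedback for a 'finishing' subtask is overdue.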
    """
    
    import os
    import logging
    import threading
    from lofar.common.util import waitForInterrupt
    from lofar.common.datetimeutils import round_to_second_precision
    from datetime import datetime, timedelta
    from dateutil import parser
    
    logger = logging.getLogger(__name__)
    
    # warning: we import the old qpid messagebus here. We have to, because cobalt/otdb still use it.
    # TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here.
    from lofar.messagebus import messagebus as old_qpid_messagebus
    
    # TMSS already uses the new AMQP/RabbitMQ/Kombu messagebus. Use it here to listen for subtask status changes.
    from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME
    
    from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete
    from lofar.sas.tmss.tmss.tmssapp.models import Subtask
    
    
    class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
        '''Handle TMSS subtask status events and relay old-style qpid feedback into TMSS.

        This new-style message handler listens to the TMSS event messages using the TMSSBusListener
        and reads/processes old-style qpid feedback messages.

        We need such a hybrid solution because TMSS gives us the subtask status changes,
        and qpid/cobalt/pipelines give us the feedback. This old-style qpid handling needs to be
        replaced once cobalt/pipelines are adapted.'''

        # name of the qpid queue where the qpid feedback for dataproducts/processing is received.
        QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss"
        QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss"

        # wait this long (in seconds) before logging a warning if not all feedback is received
        DEFAULT_FEEDBACK_WAIT_TIMEOUT = 3600

        def __init__(self, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None:
            '''Create the handler.

            :param qpid_broker: address of the old-style qpid broker to read the feedback from.
            :param feedback_wait_timeout: number of seconds to wait for feedback of a 'finishing' subtask
                                          before a warning is logged.
            '''
            super().__init__(log_event_messages=False)
            # a dict of subtask_id -> wait_timeout_timestamp for timeout computation
            self._finishing_subtask_timestamps = {}
            self._qpid_broker = qpid_broker
            # stays None as long as there is no (successful) connection to the qpid feedback queues
            self._old_qpid_frombuses = None
            self._feedback_wait_timeout = feedback_wait_timeout

        def _feedback_queue_names(self):
            '''Return the qpid feedback queue names, in the order in which start_handling connects to them.'''
            return (self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, self.QPID_PROCESSING_FEEDBACK_QUEUE)

        @staticmethod
        def _to_naive_timestamp(value):
            '''Convert a datetime, or a parsable timestamp string, into a naive datetime.

            Timezone-aware datetimes are normalized to UTC first, because all comparisons in this
            handler are made against the naive datetime.utcnow(). For strings any timezone info
            is ignored (ignoretz=True).
            Raises whatever dateutil's parser raises when the value cannot be parsed.
            '''
            if isinstance(value, datetime):
                utc_offset = value.utcoffset()
                if utc_offset is not None:
                    value = (value - utc_offset).replace(tzinfo=None)
                return value
            return parser.parse(value, ignoretz=True)

        def start_handling(self):
            '''Connect to the qpid feedback queues, initialize the timeouts for the currently finishing subtasks, and start handling.

            A failing qpid connection is only logged as a warning; the handler then keeps handling
            the TMSS events, but does not read any feedback.'''
            frombuses = []
            try:
                for queue_name in self._feedback_queue_names():
                    frombuses.append(old_qpid_messagebus.FromBus(queue_name, broker=self._qpid_broker))
            except Exception as e:
                logger.warning("Could not connect to old-style qpid messagebus: %s", e)
                # do not leak the frombus(es) which were already created before the failure
                for frombus in frombuses:
                    try:
                        frombus.close()
                    except Exception as close_error:
                        logger.warning("Could not close old-style qpid frombus: %s", close_error)
            else:
                self._old_qpid_frombuses = frombuses
            self._init_timeouts_for_finishing_subtask_timestamps()
            super().start_handling()

        def stop_handling(self):
            '''Stop handling and close the qpid feedback frombuses (if any).'''
            super().stop_handling()
            if self._old_qpid_frombuses:
                for frombus in self._old_qpid_frombuses:
                    frombus.close()

        def _init_wait_timeout_for_finishing_observation_or_pipeline_subtask(self, subtask: Subtask):
            '''Register the feedback wait timeout for the given subtask if it is a 'finishing' observation or pipeline.

            The timeout timestamp is the subtask's scheduled_stop_time plus the configured wait timeout.
            If the scheduled_stop_time is unavailable or cannot be interpreted, then 'now' is used instead.'''
            if subtask.state.value != 'finishing':
                return

            specifications_template = subtask.specifications_template
            if specifications_template.type.value.lower() not in ('observation', 'pipeline'):
                return

            try:
                # NOTE(review): scheduled_stop_time is presumably a datetime on the Subtask model,
                # but a timestamp string is accepted as well - confirm against the model.
                finishing_timestamp = self._to_naive_timestamp(subtask.scheduled_stop_time)
            except Exception:
                finishing_timestamp = datetime.utcnow()

            wait_timeout_timestamp = round_to_second_precision(finishing_timestamp + timedelta(seconds=self._feedback_wait_timeout))
            logger.info('waiting at most %d seconds until %s before yielding a warning for %s subtask id=%s if not all feedback is received by then',
                        (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp,
                        specifications_template.type.value, subtask.id)
            self._finishing_subtask_timestamps[subtask.id] = wait_timeout_timestamp

        def _init_timeouts_for_finishing_subtask_timestamps(self):
            '''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.'''
            for subtask in Subtask.objects.filter(state__value='finishing'):
                self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)

        def onSubTaskStatusChanged(self, id: int, status: str):
            '''Handle TMSS subtask status changes: start tracking the feedback timeout when a subtask
            becomes 'finishing', and stop tracking it when the subtask reaches a subsequent state.'''
            logger.info("subtask id=%s status changed to %s", id, status)
            # keep track of finishing_status_change_timestamp for timeout computation
            if status == 'finishing':
                subtask = Subtask.objects.get(id=id)
                self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
            elif status in ('finished', 'cancelling', 'cancelled', 'error'):
                wait_timeout_timestamp = self._finishing_subtask_timestamps.pop(id, None)
                if wait_timeout_timestamp is not None:
                    logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ",
                                (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status)

        def before_receive_message(self):
            '''Read/process qpid feedback and check for overdue feedback before each TMSS event message is received.'''
            # use the TMSSEventMessageHandler template pattern to perform extra business logic in the loop
            self._read_feedback_message_and_process(5)
            self._detect_and_log_feedback_timeout()
            super().before_receive_message()

        def _detect_and_log_feedback_timeout(self):
            '''Log a warning (once) for each tracked subtask for which the feedback wait timeout has passed.'''
            now = datetime.utcnow()
            # iterate over a copy, because overdue entries are removed while looping
            for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()):
                if now > wait_timeout_timestamp:
                    del self._finishing_subtask_timestamps[subtask_id]
                    logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, wait_timeout_timestamp)

        def _read_feedback_message_and_process(self, timeout: float=1):
            '''Read and process feedback messages from the qpid feedback queues for at most (roughly) <timeout> seconds.

            Does nothing when there is no connection to the qpid feedback queues.
            Any error while reading/processing is logged (including traceback) and not raised.
            Note that a message is acknowledged before its feedback is processed.'''
            if self._old_qpid_frombuses is None:
                return

            try:
                deadline = datetime.utcnow() + timedelta(seconds=timeout)

                while datetime.utcnow() < deadline:
                    # read both the dataproducts- and processing-buses
                    for queue_name, old_qpid_frombus in zip(self._feedback_queue_names(), self._old_qpid_frombuses):
                        try:
                            while datetime.utcnow() < deadline:
                                # get message from messagebus
                                msg = old_qpid_frombus.get(0.1)

                                if msg is None:
                                    # nothing (more) to read on this bus for now, try the next one
                                    break

                                content = msg.content()
                                logger.info("received message from qpid queue='%s' %s", queue_name, old_qpid_frombus.receiver.source.address)
                                old_qpid_frombus.ack(msg)

                                # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
                                # hence, it stores this id in the 'sasid' property of the message.
                                # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field,
                                # so we can fetch the TMSS subtask_id from the msg's sasid.
                                tmss_subtask_id = int(content.sasid)
                                feedback = content.payload

                                logger.info("received feedback for sasid=%s", tmss_subtask_id)

                                self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback)
                        except TimeoutError:
                            pass
            except Exception as e:
                # log including the traceback, so that processing errors can be diagnosed
                logger.exception(str(e))

        def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str):
            '''Submit the given feedback to TMSS for the subtask with the given id.

            Feedback for ids below 1000000 is skipped, because these are treated as non-TMSS observation ids.
            Raises if the subtask cannot be found, or if the feedback cannot be processed.'''
            if subtask_id < 1000000:
                logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id)
                return

            tmss_subtask = Subtask.objects.get(id=subtask_id)
            logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id)
            updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback)
            logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value)
    
    def create_service(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT):
        '''Create a TMSSBusListener which handles the TMSS event messages with a HybridFeedbackMessageHandler.

        :param exchange: exchange where the TMSS event messages are published.
        :param broker: address of the message broker for the TMSS event messages.
        :param qpid_broker: address of the old-style qpid broker, passed on to the handler.
        :param feedback_wait_timeout: feedback wait timeout in seconds, passed on to the handler.
        :return: the (not yet started) TMSSBusListener.
        '''
        kwargs_for_handler = dict(qpid_broker=qpid_broker,
                                  feedback_wait_timeout=feedback_wait_timeout)
        listener = TMSSBusListener(handler_type=HybridFeedbackMessageHandler,
                                   handler_kwargs=kwargs_for_handler,
                                   exchange=exchange,
                                   broker=broker)
        return listener
    
    def main():
        '''Parse the command line options, set up the TMSS Django database connection,
        and run the feedback handling service until interrupted.'''
        # make sure we run in UTC timezone
        import os
        os.environ['TZ'] = 'UTC'

        logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

        from optparse import OptionParser, OptionGroup

        # Check the invocation arguments
        # (named option_parser, so that it does not shadow the module-level dateutil 'parser' import)
        option_parser = OptionParser('%prog [options]',
                                     description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.')

        option_parser.add_option('-t', '--timeout', dest='timeout', type='int', default=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT,
                                 help='Wait for <timeout> seconds before a warning is issued if not all feedback is received, default: %default')

        group = OptionGroup(option_parser, 'QPID Messaging options',
                            description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.")
        group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default')
        option_parser.add_option_group(group)

        group = OptionGroup(option_parser, 'RabbitMQ Messaging options')
        group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
        group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
        option_parser.add_option_group(group)

        group = OptionGroup(option_parser, 'TMSS Django options')
        option_parser.add_option_group(group)
        group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='TMSS Django REST API credentials name, default: %default')

        (options, args) = option_parser.parse_args()

        # Django setup routine
        from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
        # The only credentials option defined above is 'rest_credentials'; there is no 'dbcredentials'
        # option, so reading options.dbcredentials raised an AttributeError at startup.
        # NOTE(review): the option is documented as REST API credentials, whereas this routine sets up
        # the database connection - confirm that the same credentials name (and default) applies here.
        setup_and_check_tmss_django_database_connection_and_exit_on_error(options.rest_credentials)

        with create_service(exchange=options.exchange, broker=options.broker, qpid_broker=options.qpid_broker, feedback_wait_timeout=options.timeout):
            waitForInterrupt()
    
    # run the feedback handling service when this module is executed as a script
    if __name__ == '__main__':
        main()