Select Git revision
ini_client.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
feedback_handling.py 12.46 KiB
#!/usr/bin/env python3
# feedback_handling.py
#
# Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $
"""
The subtask_scheduling service schedules TMSS subtasks.
It listens on the lofar notification message bus for state changes of TMSS subtasks; when a task finished,
it schedules (rest action) all successors that are in state 'defined'.
"""
import os
import logging
import threading
from lofar.common.util import waitForInterrupt
from lofar.common.datetimeutils import round_to_second_precision
from datetime import datetime, timedelta
from dateutil import parser
logger = logging.getLogger(__name__)
# warning: we import the old qpid messagebus here. We have to, because cobalt/otdb still use it.
# TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here.
from lofar.messagebus import messagebus as old_qpid_messagebus
# TMSS already uses the new AMQP/RabbitMQ/Kombu messagebus. Use it here to listen for subtask status changes.
from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME
from lofar.sas.tmss.tmss.tmssapp.adapters.feedback import process_feedback_for_subtask_and_set_to_finished_if_complete
from lofar.sas.tmss.tmss.tmssapp.models import Subtask
class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
    '''This new-style message handler listens to the TMSS event messages using the TMSSBusListener
    and reads/processes old-style qpid feedback messages.
    We need such a hybrid solution because TMSS gives us the subtask status changes,
    and qpid/cobalt/pipelines give us the feedback.
    This old-style qpid handling needs to be replaced once cobalt/pipelines are adapted.'''

    # names of the qpid queues where the qpid feedback for dataproducts/processing is received.
    QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss"
    QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss"

    # wait this long (in seconds) before logging a warning that not all feedback is received
    DEFAULT_FEEDBACK_WAIT_TIMEOUT = 3600

    # feedback with an id below this value is logged as 'non-TMSS' and not processed
    MIN_TMSS_SUBTASK_ID = 1000000

    def __init__(self, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None:
        '''
        :param qpid_broker: address of the old-style qpid broker to read the feedback queues from.
        :param feedback_wait_timeout: number of seconds to wait for feedback of a finishing subtask before a warning is logged.
        '''
        super().__init__(log_event_messages=False)
        # a dict of subtask_id -> wait_timeout_timestamp for timeout computation
        self._finishing_subtask_timestamps = {}
        self._qpid_broker = qpid_broker
        # stays None when the qpid connection could not be made, see start_handling
        self._old_qpid_frombuses = None
        self._feedback_wait_timeout = feedback_wait_timeout

    def start_handling(self):
        '''Connect to the qpid feedback queues, initialize the timeouts for the subtasks
        which are currently finishing, and start handling the TMSS event messages.'''
        try:
            self._old_qpid_frombuses = [old_qpid_messagebus.FromBus(queue, broker=self._qpid_broker)
                                        for queue in (self.QPID_DATAPRODUCT_FEEDBACK_QUEUE,
                                                      self.QPID_PROCESSING_FEEDBACK_QUEUE)]
        except Exception as e:
            # deliberate best-effort: without qpid connection we still track the TMSS subtask status changes
            logger.warning("Could not connect to old-style qpid messagebus: %s", e)

        self._init_timeouts_for_finishing_subtask_timestamps()
        super().start_handling()

    def stop_handling(self):
        '''Stop handling the TMSS event messages and close the qpid feedback queues.'''
        super().stop_handling()

        for frombus in self._old_qpid_frombuses or []:
            try:
                frombus.close()
            except Exception:
                # keep going, so a failure on one bus does not leave the other bus open
                logger.exception("Could not close old-style qpid frombus")

    @staticmethod
    def _to_naive_timestamp(value):
        '''Return the given value as timezone-naive datetime, or None if it cannot be interpreted as a timestamp.
        Accepts datetime instances as well as timestamp strings. Any timezone info is dropped (not converted).'''
        if isinstance(value, datetime):
            return value.replace(tzinfo=None)
        try:
            return parser.parse(value, ignoretz=True)
        except (TypeError, ValueError, OverflowError):
            return None

    def _init_wait_timeout_for_finishing_observation_or_pipeline_subtask(self, subtask: Subtask):
        '''Register the timestamp until which we wait for feedback for the given subtask,
        if it is a 'finishing' observation or pipeline subtask. Does nothing for other subtasks.'''
        if subtask.state.value != 'finishing':
            return

        subtask_type = subtask.specifications_template.type.value
        if subtask_type.lower() not in ('observation', 'pipeline'):
            return

        # count from the scheduled stop time if known, else from now.
        finishing_timestamp = self._to_naive_timestamp(subtask.scheduled_stop_time)
        if finishing_timestamp is None:
            finishing_timestamp = datetime.utcnow()

        wait_timeout_timestamp = round_to_second_precision(finishing_timestamp + timedelta(seconds=self._feedback_wait_timeout))

        logger.info('waiting at most %d seconds until %s before yielding a warning for %s subtask id=%s if not all feedback is received by then',
                    (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp,
                    subtask_type, subtask.id)

        self._finishing_subtask_timestamps[subtask.id] = wait_timeout_timestamp

    def _init_timeouts_for_finishing_subtask_timestamps(self):
        '''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.'''
        for subtask in Subtask.objects.filter(state__value='finishing'):
            self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)

    def onSubTaskStatusChanged(self, id: int, status: str):
        '''Handle TMSS subtask status changes'''
        logger.info("subtask id=%s status changed to %s", id, status)

        # keep track of finishing_status_change_timestamp for timeout computation
        if status == 'finishing':
            subtask = Subtask.objects.get(id=id)
            self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
        elif status in ('finished', 'cancelling', 'cancelled', 'error'):
            # the subtask left the finishing state, so there is no need to wait for its feedback anymore
            wait_timeout_timestamp = self._finishing_subtask_timestamps.pop(id, None)
            if wait_timeout_timestamp is not None:
                logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ",
                            (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status)

    def before_receive_message(self):
        '''Read and process qpid feedback and check for overdue feedback before each TMSS event message is received.'''
        # use the TMSSEventMessageHandler template pattern to perform extra business logic in the loop
        self._read_feedback_message_and_process(5)
        self._detect_and_log_feedback_timeout()
        super().before_receive_message()

    def _detect_and_log_feedback_timeout(self):
        '''Log a warning (once) for each finishing subtask for which the feedback wait timeout has passed.'''
        now = datetime.utcnow()
        # iterate over a copy, because we remove items while looping
        for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()):
            if now > wait_timeout_timestamp:
                # remove it, so we warn only once for this subtask
                del self._finishing_subtask_timestamps[subtask_id]
                logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.',
                               subtask_id, wait_timeout_timestamp)

    def _read_feedback_message_and_process(self, timeout: float=1):
        '''Read and process the feedback messages from the qpid feedback queues for at most timeout seconds.
        Does nothing if there is no qpid connection. Errors are logged, not raised.'''
        if self._old_qpid_frombuses is None:
            return

        deadline = datetime.utcnow() + timedelta(seconds=timeout)
        try:
            while datetime.utcnow() < deadline:
                # read both the dataproducts- and processing-buses
                for old_qpid_frombus in self._old_qpid_frombuses:
                    try:
                        while datetime.utcnow() < deadline:
                            # get message from messagebus
                            msg = old_qpid_frombus.get(0.1)
                            if msg is None:
                                # nothing (more) to read on this bus, try the next one
                                break
                            self._handle_feedback_message(old_qpid_frombus, msg)
                    except TimeoutError:
                        pass
        except Exception:
            logger.exception("Error while reading/processing old-style qpid feedback")

    def _handle_feedback_message(self, old_qpid_frombus, msg):
        '''Acknowledge the given qpid feedback message and submit its feedback to TMSS.'''
        content = msg.content()
        # NOTE(review): receiver.source.address is presumably the address of the queue this bus reads from - confirm
        logger.info("received message from qpid queue='%s'", old_qpid_frombus.receiver.source.address)
        old_qpid_frombus.ack(msg)

        # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
        # hence, it stores this id in the 'sasid' property of the message.
        # We know that TMSS sets its subtask_id in the parset in the Observation.ObsID field,
        # so we can fetch the TMSS subtask_id from the msg's sasid.
        tmss_subtask_id = int(content.sasid)
        feedback = content.payload

        logger.info("received feedback for sasid=%s", tmss_subtask_id)
        self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback)

    def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str):
        '''Submit the given feedback for the given subtask to TMSS and log the resulting subtask state.
        Feedback for ids below MIN_TMSS_SUBTASK_ID is not processed.'''
        if subtask_id < self.MIN_TMSS_SUBTASK_ID:
            logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id)
            return

        tmss_subtask = Subtask.objects.get(id=subtask_id)

        logger.info('submitting feedback for subtask id=%s to TMSS', subtask_id)
        updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback)
        logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value)
def create_service(exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT):
    '''Create a TMSSBusListener on the given exchange/broker which uses a HybridFeedbackMessageHandler
    that is constructed with the given qpid_broker and feedback_wait_timeout.'''
    handler_kwargs = {"qpid_broker": qpid_broker,
                      "feedback_wait_timeout": feedback_wait_timeout}

    return TMSSBusListener(handler_type=HybridFeedbackMessageHandler,
                           handler_kwargs=handler_kwargs,
                           exchange=exchange,
                           broker=broker)
def main():
    '''Parse the command line options, set up the TMSS Django database connection,
    and run the feedback handling service until interrupted.'''
    import os
    import time
    from optparse import OptionParser, OptionGroup

    # make sure we run in UTC timezone
    os.environ['TZ'] = 'UTC'
    if hasattr(time, 'tzset'):
        # a changed TZ environment variable only takes effect after calling tzset (which is unix-only)
        time.tzset()

    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

    # Check the invocation arguments
    # (named option_parser, so we do not shadow the module-level dateutil parser)
    option_parser = OptionParser('%prog [options]',
                                 description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.')

    option_parser.add_option('-t', '--timeout', dest='timeout', type='int', default=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT,
                             help='Wait for <timeout> seconds before a warning is issued if not all feedback is received, default: %default')

    group = OptionGroup(option_parser, 'QPID Messaging options',
                        description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.")
    group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default')
    option_parser.add_option_group(group)

    group = OptionGroup(option_parser, 'RabbitMQ Messaging options')
    group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
    group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
    option_parser.add_option_group(group)

    group = OptionGroup(option_parser, 'TMSS Django options')
    # the database credentials option was missing, although options.dbcredentials is needed for the Django setup below.
    # NOTE(review): the default (TMSS_DBCREDENTIALS environment variable, else 'TMSS') is an assumption - confirm against the other TMSS services.
    group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='TMSS Django database credentials name, default: %default')
    group.add_option('-R', '--rest_credentials', dest='rest_credentials', type='string', default='TMSSClient', help='TMSS Django REST API credentials name, default: %default')
    option_parser.add_option_group(group)

    options, _ = option_parser.parse_args()

    # Django setup routine
    from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
    setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)

    with create_service(exchange=options.exchange, broker=options.broker, qpid_broker=options.qpid_broker, feedback_wait_timeout=options.timeout):
        waitForInterrupt()


if __name__ == '__main__':
    main()