Skip to content
Snippets Groups Projects
Commit 8045105d authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

simplified feedback handling. No more need for subtask status event handling...

simplified feedback handling. No more need for subtask status event handling and cancelling upon 1h timeout. Just use the plain old qpid bus until cobalt/pipelines start using rabbitmq/kombu
parent 17156b56
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python3 #!/usr/bin/env python3
# subtask_scheduling.py
#
# Copyright (C) 2015 # Copyright (C) 2015
# ASTRON (Netherlands Institute for Radio Astronomy) # ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
...@@ -19,20 +17,12 @@ ...@@ -19,20 +17,12 @@
# #
# You should have received a copy of the GNU General Public License along # You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: subtask_scheduling.py 1580 2015-09-30 14:18:57Z loose $
""" """
The subtask_scheduling service schedules TMSS subtasks.
It listens on the lofar notification message bus for state changes of TMSS subtasks; when a task finished,
it schedules (rest action) all successors that are in state 'defined'.
""" """
import logging import logging
from lofar.common.util import waitForInterrupt
from lofar.common.datetimeutils import round_to_second_precision
from datetime import datetime, timedelta from datetime import datetime, timedelta
from dateutil import parser
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
...@@ -40,107 +30,20 @@ logger = logging.getLogger(__name__) ...@@ -40,107 +30,20 @@ logger = logging.getLogger(__name__)
# TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here. # TODO: introduce AMQP/RabbitMQ/Kombu messagebus in cobalt, get rid of otdb, then use the new messagebus here.
from lofar.messagebus import messagebus as old_qpid_messagebus from lofar.messagebus import messagebus as old_qpid_messagebus
# TMSS already uses the new AMQP/RabbitMQ/Kombu messagebus. Use it here to listen for subtask status changes.
from lofar.sas.tmss.client.tmssbuslistener import TMSSBusListener, TMSSEventMessageHandler, DEFAULT_BROKER, DEFAULT_BUSNAME
class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
'''This new-style message handler listens to the TMSS event messages using the TMSSBusListener
and reads/processes old-style qpid feedback messages.
We need such a hybrid solution because TMSS gives us the subtask status changes,
and qpid/cobalt/pipelines give us the feedback. This old-style qpid handling needs to be replace once cobalt/pipelines are adapted.'''
# name of the qpid queue where the qpid feedback for dataproducts/processing is received. # name of the qpid queue where the qpid feedback for dataproducts/processing is received.
QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss" QPID_DATAPRODUCT_FEEDBACK_QUEUE = "otdb.task.feedback.dataproducts.for_tmss"
QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss" QPID_PROCESSING_FEEDBACK_QUEUE = "otdb.task.feedback.processing.for_tmss"
# wait this long before cancelling the subtask if not all feedback is received def read_feedback_message_and_process(old_qpid_frombuses, timeout: float=1):
DEFAULT_FEEDBACK_WAIT_TIMEOUT = 3600
def __init__(self, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None:
super().__init__(log_event_messages=False)
# a dict of subtask_id -> wait_timeout_timestamp for timeout computation
self._finishing_subtask_timestamps = {}
self._qpid_broker = qpid_broker
self._old_qpid_frombuses = None
self._feedback_wait_timeout = feedback_wait_timeout
def start_handling(self):
try:
self._old_qpid_frombuses = [old_qpid_messagebus.FromBus(self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=self._qpid_broker),
old_qpid_messagebus.FromBus(self.QPID_PROCESSING_FEEDBACK_QUEUE, broker=self._qpid_broker)]
except Exception as e:
logger.warning("Could not connect to old-style qpid messagebus: %s", e)
self._init_timeouts_for_finishing_subtask_timestamps()
super().start_handling()
def stop_handling(self):
super().stop_handling()
if self._old_qpid_frombuses:
for frombus in self._old_qpid_frombuses:
frombus.close()
def _init_wait_timeout_for_finishing_observation_or_pipeline_subtask(self, subtask):
from lofar.sas.tmss.tmss.tmssapp.models import SubtaskState
if subtask.state.value == SubtaskState.Choices.FINISHING.value:
specifications_template = subtask.specifications_template
if specifications_template.type.value.lower() in ('observation', 'pipeline'):
try: try:
finishing_timestamp = parser.parse(subtask.scheduled_stop_time, ignoretz=True) if old_qpid_frombuses is None:
except:
finishing_timestamp = datetime.utcnow()
wait_timeout_timestamp = round_to_second_precision(finishing_timestamp + timedelta(seconds=self._feedback_wait_timeout))
logger.info('waiting at most %d seconds until %s before yielding a warning for %s subtask id=%s if not all feedback is received by then',
(wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp,
specifications_template.type.value, subtask.id)
self._finishing_subtask_timestamps[subtask.id] = wait_timeout_timestamp
def _init_timeouts_for_finishing_subtask_timestamps(self):
'''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.'''
from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState
subtasks = Subtask.objects.filter(state__value=SubtaskState.Choices.FINISHING.value)
for subtask in subtasks:
self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
def onSubTaskStatusChanged(self, id: int, status: str):
'''Handle TMSS subtask status changes'''
from lofar.sas.tmss.tmss.tmssapp.models import Subtask, SubtaskState
logger.info("subtask id=%s status changed to %s", id, status)
# keep track of finishing_status_change_timestamp for timeout computation
if status == SubtaskState.Choices.FINISHING.value:
subtask = Subtask.objects.get(id=id)
self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
elif status in ('finished', 'cancelling', 'cancelled', 'error'):
if id in self._finishing_subtask_timestamps:
wait_timeout_timestamp = self._finishing_subtask_timestamps[id]
logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ",
(wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status)
del self._finishing_subtask_timestamps[id]
def before_receive_message(self):
# use TMSSEventMessageHandler template pattern to act perform extra business logic in the loop
self._read_feedback_message_and_process(5)
self._detect_and_log_feedback_timeout()
super().before_receive_message()
def _detect_and_log_feedback_timeout(self):
for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()):
if datetime.utcnow() > wait_timeout_timestamp:
del self._finishing_subtask_timestamps[subtask_id]
logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, self._feedback_wait_timeout)
def _read_feedback_message_and_process(self, timeout: float=1):
try:
if self._old_qpid_frombuses is None:
return return
start_timestamp = datetime.utcnow() start_timestamp = datetime.utcnow()
while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout):
# read both the dataproducts- and processing-buses # read both the dataproducts- and processing-buses
for old_qpid_frombus in self._old_qpid_frombuses: for old_qpid_frombus in old_qpid_frombuses:
try: try:
while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout): while datetime.utcnow() - start_timestamp < timedelta(seconds=timeout):
# get message from messagebus # get message from messagebus
...@@ -150,7 +53,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -150,7 +53,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
break break
else: else:
content = msg.content() content = msg.content()
logger.info("received message from qpid queue='%s' %s", self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, old_qpid_frombus.receiver.source.address) logger.info("received message from qpid queue='%s'", old_qpid_frombus.receiver.source.address)
old_qpid_frombus.ack(msg) old_qpid_frombus.ack(msg)
# note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB. # note: cobalt/rtcp creates feedback and assumes that the observationID has its origin in OTDB.
...@@ -162,13 +65,13 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -162,13 +65,13 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
logger.info("received feedback for sasid=%s", tmss_subtask_id) logger.info("received feedback for sasid=%s", tmss_subtask_id)
self.process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback) process_feedback_and_set_to_finished_if_complete(tmss_subtask_id, feedback)
except TimeoutError: except TimeoutError:
pass pass
except Exception as e: except Exception as e:
logger.error(str(e)) logger.error(str(e))
def process_feedback_and_set_to_finished_if_complete(self, subtask_id: int, feedback: str): def process_feedback_and_set_to_finished_if_complete(subtask_id: int, feedback: str):
if subtask_id < 1000000: if subtask_id < 1000000:
logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id) logger.info('not processing feedback for non-TMSS observation id=%s', subtask_id)
return return
...@@ -181,9 +84,6 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -181,9 +84,6 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback) updated_subtask = process_feedback_for_subtask_and_set_to_finished_if_complete(subtask=tmss_subtask, feedback_doc=feedback)
logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value) logger.info('subtask id=%s with the processed feedback has state %s', subtask_id, updated_subtask.state.value)
def create_service(handler_type=HybridFeedbackMessageHandler,exchange: str = DEFAULT_BUSNAME, broker: str = DEFAULT_BROKER, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT):
return TMSSBusListener(handler_type=handler_type, handler_kwargs={ "qpid_broker": qpid_broker, "feedback_wait_timeout": feedback_wait_timeout },
exchange=exchange, broker=broker)
def main(): def main():
# make sure we run in UTC timezone # make sure we run in UTC timezone
...@@ -198,19 +98,11 @@ def main(): ...@@ -198,19 +98,11 @@ def main():
parser = OptionParser('%prog [options]', parser = OptionParser('%prog [options]',
description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.') description='run the tmss_feedback_handling_service which relays observation feedback from the old QPID messaging system into TMSS.')
parser.add_option('-t', '--timeout', dest='timeout', type='int', default=HybridFeedbackMessageHandler.DEFAULT_FEEDBACK_WAIT_TIMEOUT,
help='Wait for <timeout> seconds before a warning is issued if not all feedback is received, default: %default')
group = OptionGroup(parser, 'QPID Messaging options', group = OptionGroup(parser, 'QPID Messaging options',
description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.") description="This feedback service connects to the (old and almost obsolete) QPID broker. This can and should be replaced by connecting to RabbitMQ when Cobalt and MAC have been adapted.")
group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default') group.add_option('-q', '--qpid_broker', dest='qpid_broker', type='string', default=old_qpid_messagebus.broker_feedback, help='Address of the QPID broker, default: %default')
parser.add_option_group(group) parser.add_option_group(group)
group = OptionGroup(parser, 'RabbitMQ Messaging options')
group.add_option('-b', '--broker', dest='broker', type='string', default=DEFAULT_BROKER, help='Address of the message broker, default: %default')
group.add_option('-e', "--exchange", dest="exchange", type="string", default=DEFAULT_BUSNAME, help="exchange where the TMSS event messages are published. [default: %default]")
parser.add_option_group(group)
group = OptionGroup(parser, 'TMSS Django options') group = OptionGroup(parser, 'TMSS Django options')
parser.add_option_group(group) parser.add_option_group(group)
group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default') group.add_option('-C', '--credentials', dest='dbcredentials', type='string', default=os.environ.get('TMSS_DBCREDENTIALS', 'TMSS'), help='django dbcredentials name, default: %default')
...@@ -220,8 +112,18 @@ def main(): ...@@ -220,8 +112,18 @@ def main():
from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error from lofar.sas.tmss.tmss import setup_and_check_tmss_django_database_connection_and_exit_on_error
setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials) setup_and_check_tmss_django_database_connection_and_exit_on_error(options.dbcredentials)
with create_service(exchange=options.exchange, broker=options.broker, qpid_broker=options.qpid_broker, feedback_wait_timeout=options.timeout): old_qpid_frombuses = [old_qpid_messagebus.FromBus(QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=options.qpid_broker),
waitForInterrupt() old_qpid_messagebus.FromBus(QPID_PROCESSING_FEEDBACK_QUEUE, broker=options.qpid_broker)]
while True:
try:
read_feedback_message_and_process(old_qpid_frombuses, timeout=5)
except KeyboardInterrupt:
break
for bus in old_qpid_frombuses:
bus.close()
if __name__ == '__main__': if __name__ == '__main__':
main() main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment