Skip to content
Snippets Groups Projects
Commit 3b809c8a authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-877: renamed _finishing_subtasks to _finishing_subtask_timestamps

parent a0a62c75
No related branches found
No related tags found
2 merge requests!634WIP: COBALT commissioning delta,!565TMSS-877: Leave subtask in FINISHING state in case feedback is overdue
...@@ -62,7 +62,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -62,7 +62,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
def __init__(self, rest_client_creds_id: str=None, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None: def __init__(self, rest_client_creds_id: str=None, qpid_broker: str=old_qpid_messagebus.broker_feedback, feedback_wait_timeout: int=DEFAULT_FEEDBACK_WAIT_TIMEOUT) -> None:
super().__init__(log_event_messages=False) super().__init__(log_event_messages=False)
# a dict of subtask_id -> wait_timeout_timestamp for timeout computation # a dict of subtask_id -> wait_timeout_timestamp for timeout computation
self._finishing_subtasks = {} self._finishing_subtask_timestamps = {}
self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id) self._tmss_client = TMSSsession.create_from_dbcreds_for_ldap(rest_client_creds_id)
self._qpid_broker = qpid_broker self._qpid_broker = qpid_broker
self._old_qpid_frombus = None self._old_qpid_frombus = None
...@@ -74,7 +74,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -74,7 +74,7 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
self._old_qpid_frombus = old_qpid_messagebus.FromBus(self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=self._qpid_broker) self._old_qpid_frombus = old_qpid_messagebus.FromBus(self.QPID_DATAPRODUCT_FEEDBACK_QUEUE, broker=self._qpid_broker)
except Exception as e: except Exception as e:
logger.warning("Could not connect to old-style qpid messagebus: %s", e) logger.warning("Could not connect to old-style qpid messagebus: %s", e)
self._init_timeouts_for_finishing_subtasks() self._init_timeouts_for_finishing_subtask_timestamps()
super().start_handling() super().start_handling()
def stop_handling(self): def stop_handling(self):
...@@ -95,9 +95,9 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -95,9 +95,9 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
logger.info('waiting at most %d seconds until %s before cancelling %s subtask id=%s if not all feedback is received by then', logger.info('waiting at most %d seconds until %s before cancelling %s subtask id=%s if not all feedback is received by then',
(wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp, (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), wait_timeout_timestamp,
specifications_template['type_value'], subtask['id']) specifications_template['type_value'], subtask['id'])
self._finishing_subtasks[subtask['id']] = wait_timeout_timestamp self._finishing_subtask_timestamps[subtask['id']] = wait_timeout_timestamp
def _init_timeouts_for_finishing_subtasks(self): def _init_timeouts_for_finishing_subtask_timestamps(self):
'''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.''' '''upon startup, initialize the timeout for all currently finishing subtasks. Allows for service restarts.'''
subtasks = self._tmss_client.get_subtasks(state='finishing') subtasks = self._tmss_client.get_subtasks(state='finishing')
for subtask in subtasks: for subtask in subtasks:
...@@ -111,11 +111,11 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -111,11 +111,11 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
subtask = self._tmss_client.get_subtask(id) subtask = self._tmss_client.get_subtask(id)
self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask) self._init_wait_timeout_for_finishing_observation_or_pipeline_subtask(subtask)
elif status in ('finished', 'cancelling', 'cancelled', 'error'): elif status in ('finished', 'cancelling', 'cancelled', 'error'):
if id in self._finishing_subtasks: if id in self._finishing_subtask_timestamps:
wait_timeout_timestamp = self._finishing_subtasks[id] wait_timeout_timestamp = self._finishing_subtask_timestamps[id]
logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ", logger.info("removing remaining feedback wait time of %s seconds for subtask id=%s because the status is %s, ",
(wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status) (wait_timeout_timestamp - datetime.utcnow()).total_seconds(), id, status)
del self._finishing_subtasks[id] del self._finishing_subtask_timestamps[id]
def before_receive_message(self): def before_receive_message(self):
# use TMSSEventMessageHandler template pattern to act perform extra business logic in the loop # use TMSSEventMessageHandler template pattern to act perform extra business logic in the loop
...@@ -124,9 +124,9 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler): ...@@ -124,9 +124,9 @@ class HybridFeedbackMessageHandler(TMSSEventMessageHandler):
super().before_receive_message() super().before_receive_message()
def _detect_and_log_feedback_timeout(self): def _detect_and_log_feedback_timeout(self):
for subtask_id, wait_timeout_timestamp in list(self._finishing_subtasks.items()): for subtask_id, wait_timeout_timestamp in list(self._finishing_subtask_timestamps.items()):
if datetime.utcnow() > wait_timeout_timestamp: if datetime.utcnow() > wait_timeout_timestamp:
del self._finishing_subtasks[subtask_id] del self._finishing_subtask_timestamps[subtask_id]
logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, self._feedback_wait_timeout) logger.warning('Feedback for subtask id=%s is overdue and was expected no later than %s! Cancel it or fix the feedback so that this subtask can proceed.', subtask_id, self._feedback_wait_timeout)
def _read_feedback_message_and_process(self, timeout: float=1): def _read_feedback_message_and_process(self, timeout: float=1):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment