Skip to content
Snippets Groups Projects

Resolve TMSS-780

Merged Jörn Künsemöller requested to merge TMSS-780 into master
Files
15
@@ -65,25 +65,29 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler):
try:
suc_subtask_id = successor['id']
suc_subtask_state = successor['state_value']
if suc_subtask_state == "defined":
successor_predecessors = self.tmss_client.get_subtask_predecessors(suc_subtask_id)
if any([suc_pred['state_value']!='finished' for suc_pred in successor_predecessors]):
logger.info("skipping scheduling of successor subtask %s for finished subtask %s because not all its other predecessor subtasks are finished", suc_subtask_id, id)
suc_subtask_obsolete_since = successor['obsolete_since']
if suc_subtask_obsolete_since is None:
if suc_subtask_state == "defined":
successor_predecessors = self.tmss_client.get_subtask_predecessors(suc_subtask_id)
if any([suc_pred['state_value'] != 'finished' and suc_pred['obsolete_since'] is None for suc_pred in successor_predecessors]):
logger.info("skipping scheduling of successor subtask %s for finished subtask %s because not all its other (non-obsolete) predecessor subtasks are finished", suc_subtask_id, id)
else:
logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, id)
# try scheduling the subtask.
# if it succeeds, then the state will be 'scheduled' afterwards
# if there is a specification error, then the state will be 'error' afterwards
# if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry.
# for the ingest-permission we will retry automatically when that permission is granted
scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id)
suc_subtask_state = scheduled_successor['state_value']
logger.log(logging.INFO if suc_subtask_state=='scheduled' else logging.WARNING,
"successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url'])
else:
logger.info("trying to schedule successor subtask %s for finished subtask %s", suc_subtask_id, id)
# try scheduling the subtask.
# if it succeeds, then the state will be 'scheduled' afterwards
# if there is a specification error, then the state will be 'error' afterwards
# if there is another kind of error (like needing ingest-permission), then the state will be 'defined' afterwards, so you can retry.
# for the ingest-permission we will retry automatically when that permission is granted
scheduled_successor = self.tmss_client.schedule_subtask(suc_subtask_id)
suc_subtask_state = scheduled_successor['state_value']
logger.log(logging.INFO if suc_subtask_state=='scheduled' else logging.WARNING,
"successor subtask %s for finished subtask %s now has state '%s', see %s", suc_subtask_id, id, suc_subtask_state, scheduled_successor['url'])
logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, id, suc_subtask_state)
else:
logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because its state is '%s'", suc_subtask_id, id, suc_subtask_state)
logger.warning("skipping scheduling of successor subtask %s for finished subtask %s because it is markes obsolete since %s", suc_subtask_id, id, suc_subtask_obsolete_since)
except Exception as e:
logger.error(e)
@@ -96,7 +100,7 @@ class TMSSSubTaskSchedulingEventMessageHandler(TMSSEventMessageHandler):
if subtask['state_value'] == 'defined':
subtask_template = self.tmss_client.get_url_as_json_object(subtask['specifications_template'])
if subtask_template['type_value'] == 'ingest':
if all(pred['state_value'] == 'finished' for pred in self.tmss_client.get_subtask_predecessors(subtask['id'])):
if all(pred['state_value'] == 'finished' or pred['obsolete_since'] is not None for pred in self.tmss_client.get_subtask_predecessors(subtask['id'])):
logger.info("trying to schedule ingest subtask id=%s for scheduling_unit_blueprint id=%s...", subtask['id'], id)
self.tmss_client.schedule_subtask(subtask['id'])
Loading