Skip to content
Snippets Groups Projects
Commit a885c014 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-261: auto_grant_ingest_permission: simulate granting ingest permission...

TMSS-261: auto_grant_ingest_permission: simulate granting ingest permission when scheduling an ingest subtask fails
parent 32eac6b2
No related branches found
No related tags found
1 merge request!409Resolve TMSS-261
...@@ -658,6 +658,7 @@ def main_test_environment(): ...@@ -658,6 +658,7 @@ def main_test_environment():
def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event, def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int, stop_event: threading.Event,
handle_observations: bool = True, handle_pipelines: bool = True, handle_observations: bool = True, handle_pipelines: bool = True,
handle_QA: bool = True, handle_ingest: bool = True, handle_QA: bool = True, handle_ingest: bool = True,
auto_grant_ingest_permission: bool = True,
delay: float=1, duration: float=5, delay: float=1, duration: float=5,
create_output_dataproducts: bool=False, create_output_dataproducts: bool=False,
exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
...@@ -688,6 +689,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -688,6 +689,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
self.handle_pipelines = handle_pipelines self.handle_pipelines = handle_pipelines
self.handle_QA = handle_QA self.handle_QA = handle_QA
self.handle_ingest = handle_ingest self.handle_ingest = handle_ingest
self.auto_grant_ingest_permission = auto_grant_ingest_permission
self.delay = delay self.delay = delay
self.duration = duration self.duration = duration
self.create_output_dataproducts = create_output_dataproducts self.create_output_dataproducts = create_output_dataproducts
...@@ -844,6 +846,22 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -844,6 +846,22 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
for algo in models.Algorithm.objects.all(): for algo in models.Algorithm.objects.all():
models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4()) models.DataproductHash.objects.create(dataproduct=output_dp, algorithm=algo, hash=uuid4())
elif status == models.SubtaskState.Choices.DEFINED.value:
state_transition = models.SubtaskStateLog.objects.filter(subtask__id=subtask.id,
old_state__value=models.SubtaskState.Choices.SCHEDULING.value,
new_state__value=models.SubtaskState.Choices.DEFINED.value).order_by('-updated_at').first()
if state_transition and datetime.utcnow() - state_transition.updated_at < timedelta(hours=1):
logger.info("subtask id=%d type='%s' returned to state 'defined' while scheduling... (which means that scheduling did not succeed)",
subtask.id, subtask.specifications_template.type.value)
if subtask.specifications_template.type.value == 'ingest':
logger.info("subtask id=%d is an ingest task which requires permission in order to be scheduled", subtask.id)
if self.auto_grant_ingest_permission and subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_required:
# just granting the permission triggers the scheduling_service to check and schedulable ingest subtasks,
# resulting in a scheduled ingest subtask.
logger.info("granting ingest subtask id=%d ingest_permission", subtask.id)
subtask.task_blueprint.scheduling_unit_blueprint.ingest_permission_granted_since = datetime.utcnow()
subtask.task_blueprint.scheduling_unit_blueprint.save()
if next_state: if next_state:
sleep(self.delay) # mimic a little 'processing' delay sleep(self.delay) # mimic a little 'processing' delay
...@@ -895,6 +913,7 @@ def main_scheduling_unit_blueprint_simulator(): ...@@ -895,6 +913,7 @@ def main_scheduling_unit_blueprint_simulator():
parser.add_option_group(group) parser.add_option_group(group)
group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default') group.add_option('-e', '--event_delay', dest='event_delay', type='float', default=1.0, help='wait <event_delay> seconds between simulating events to mimic real-world behaviour, default: %default')
group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default') group.add_option('-d', '--duration', dest='duration', type='float', default=60.0, help='wait <duration> seconds while "observing"/"processing" between started and finishing state to mimic real-world behaviour, default: %default')
group.add_option('-g', '--grant_ingest_permission', dest='grant_ingest_permission', action='store_true', help='automatically grant ingest permission for ingest subtasks if needed')
group = OptionGroup(parser, 'Messaging options') group = OptionGroup(parser, 'Messaging options')
parser.add_option_group(group) parser.add_option_group(group)
...@@ -926,6 +945,7 @@ def main_scheduling_unit_blueprint_simulator(): ...@@ -926,6 +945,7 @@ def main_scheduling_unit_blueprint_simulator():
delay=options.event_delay, duration=options.duration, delay=options.event_delay, duration=options.duration,
handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline), handle_observations=bool(options.observation), handle_pipelines=bool(options.pipeline),
handle_QA=bool(options.QA), handle_ingest=bool(options.ingest), handle_QA=bool(options.QA), handle_ingest=bool(options.ingest),
auto_grant_ingest_permission=bool(options.grant_ingest_permission),
exchange=options.exchange, broker=options.broker): exchange=options.exchange, broker=options.broker):
print("Press Ctrl-C to exit") print("Press Ctrl-C to exit")
try: try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment