Skip to content
Snippets Groups Projects
Commit 8cd67d0e authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-320: added option to create test output dataproducts

parent 6d09c0fc
No related branches found
No related tags found
2 merge requests!308Resolve TMSS-495,!306Resolve TMSS-320
...@@ -531,6 +531,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -531,6 +531,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
handle_observations: bool = True, handle_pipelines: bool = True, handle_observations: bool = True, handle_pipelines: bool = True,
handle_QA: bool = True, handle_ingest: bool = True, handle_QA: bool = True, handle_ingest: bool = True,
delay: float=1, duration: float=5, delay: float=1, duration: float=5,
create_output_dataproducts: bool=False,
exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER): exchange=DEFAULT_BUSNAME, broker=DEFAULT_BROKER):
''' '''
create a "simulator" which sets the correct events in the correct order upon receiving status change events, create a "simulator" which sets the correct events in the correct order upon receiving status change events,
...@@ -550,7 +551,8 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -550,7 +551,8 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event, def __init__(self, scheduling_unit_blueprint_id: int, stop_event: threading.Event,
handle_observations: bool = True, handle_pipelines: bool = True, handle_observations: bool = True, handle_pipelines: bool = True,
handle_QA: bool = True, handle_ingest: bool = True, handle_QA: bool = True, handle_ingest: bool = True,
delay: float = 1, duration: float = 10) -> None: delay: float = 1, duration: float = 10,
create_output_dataproducts: bool=False) -> None:
super().__init__(log_event_messages=False) super().__init__(log_event_messages=False)
self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id self.scheduling_unit_blueprint_id = scheduling_unit_blueprint_id
self.stop_event = stop_event self.stop_event = stop_event
...@@ -560,6 +562,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -560,6 +562,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
self.handle_ingest = handle_ingest self.handle_ingest = handle_ingest
self.delay = delay self.delay = delay
self.duration = duration self.duration = duration
self.create_output_dataproducts = create_output_dataproducts
def need_to_handle(self, subtask: models.Subtask) -> bool: def need_to_handle(self, subtask: models.Subtask) -> bool:
if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id: if subtask.task_blueprint.scheduling_unit_blueprint.id != self.scheduling_unit_blueprint_id:
...@@ -671,6 +674,13 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -671,6 +674,13 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
if subtask.specifications_template.type.value in [models.SubtaskType.Choices.OBSERVATION.value, if subtask.specifications_template.type.value in [models.SubtaskType.Choices.OBSERVATION.value,
models.SubtaskType.Choices.PIPELINE.value]: models.SubtaskType.Choices.PIPELINE.value]:
if self.create_output_dataproducts:
for output_dp in subtask.output_dataproducts.all():
os.makedirs(output_dp.directory, exist_ok=True)
logger.info('writing 1KB test dataproduct for subtask id=%s %s', subtask.id, output_dp.filepath)
with open(output_dp.filepath, 'w') as file:
file.write(1024 * 'a')
# create some nice default (and thus correct although not scientifically meaningful) feedback # create some nice default (and thus correct although not scientifically meaningful) feedback
template = models.DataproductFeedbackTemplate.objects.get(name="feedback") template = models.DataproductFeedbackTemplate.objects.get(name="feedback")
feedback_doc = get_default_json_object_for_schema(template.schema) feedback_doc = get_default_json_object_for_schema(template.schema)
...@@ -718,6 +728,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int ...@@ -718,6 +728,7 @@ def create_scheduling_unit_blueprint_simulator(scheduling_unit_blueprint_id: int
'stop_event': stop_event, 'stop_event': stop_event,
'handle_observations': handle_observations, 'handle_pipelines': handle_pipelines, 'handle_observations': handle_observations, 'handle_pipelines': handle_pipelines,
'handle_QA': handle_QA, 'handle_ingest': handle_ingest, 'handle_QA': handle_QA, 'handle_ingest': handle_ingest,
'create_output_dataproducts': create_output_dataproducts,
'delay': delay, 'duration': duration}, 'delay': delay, 'duration': duration},
exchange=exchange, broker=broker) exchange=exchange, broker=broker)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment