-
Jörn Künsemöller authoredJörn Künsemöller authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
t_scheduling_units.py 20.52 KiB
#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import os
import unittest
import requests
import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests
exit_with_skipped_code_if_skip_integration_tests()
from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema
# Do Mandatory setup step:
# use setup/teardown magic for tmss test database, ldap server and django server
# (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module)
from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import *
tmss_test_env.populate_schemas()
from lofar.sas.tmss.test.tmss_test_data_django_models import *
# import and setup rest test data creator
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.exceptions import SchemaValidationException
import requests
from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft
class SchedulingUnitBlueprintStateTest(unittest.TestCase):
"""
Test the Scheduling Blueprint State which is derived from the TaskBlueprint states.
The result of each possible combination of these states will be checked
See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-SchedulingBlueprints
"""
def create_tasks_and_subtasks(self, schedulingunit_blueprint, skip_create_subtask=[]):
"""
Create three taskblueprint related to the schedulingunit_blueprint.
These task are an observation, a pipeline and a ingest task.
Also per task one subtask is instantiated (so makes three total) which is required to be able to set
the task status which is a read-only property and is derived from the subtask states
:param schedulingunit_blueprint:
:return: dictionary with task and subtask objects
"""
# Create observation task
task_data = TaskBlueprint_test_data(name="Task Observation "+str(uuid.uuid4()), scheduling_unit_blueprint=schedulingunit_blueprint)
task_obs = models.TaskBlueprint.objects.create(**task_data)
subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"),
subtask_template=models.SubtaskTemplate.objects.get(name='observation control'))
if "observation" in skip_create_subtask:
subtask_obs = None
else:
subtask_obs = models.Subtask.objects.create(**subtask_data)
subtask_obs.task_blueprints.set([task_obs])
# Create pipeline task
task_data = TaskBlueprint_test_data(name="Task Pipeline", scheduling_unit_blueprint=schedulingunit_blueprint)
task_pipe = models.TaskBlueprint.objects.create(**task_data)
# Need to change the default template type (observation) to pipeline
task_pipe.specifications_template = models.TaskTemplate.objects.get(type=models.TaskType.Choices.PIPELINE.value)
task_pipe.save()
subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"),
subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
if "pipeline" in skip_create_subtask:
subtask_pipe = None
else:
subtask_pipe = models.Subtask.objects.create(**subtask_data)
subtask_pipe.task_blueprints.set([task_pipe])
# Create ingest task
# Because there is no taskTemplate object for ingest by default I have to create one
test_data = TaskTemplate_test_data(name="task_template_for_ingest", task_type_value="ingest")
my_test_template = models.TaskTemplate.objects.create(**test_data)
task_data = TaskBlueprint_test_data(name="Task Ingest", scheduling_unit_blueprint=schedulingunit_blueprint)
task_ingest = models.TaskBlueprint.objects.create(**task_data)
task_ingest.specifications_template = my_test_template
task_ingest.save()
# There is no template defined for ingest yet ...but I can use pipeline control, only the template type matters
# ....should become other thing in future but for this test does not matter
subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"),
subtask_template=models.SubtaskTemplate.objects.get(name='pipeline control'))
if "ingest" in skip_create_subtask:
subtask_ingest = None
else:
subtask_ingest = models.Subtask.objects.create(**subtask_data)
subtask_ingest.task_blueprints.set([task_ingest])
return {"observation": {"task": task_obs, "subtask": subtask_obs},
"pipeline": {"task": task_pipe, "subtask": subtask_pipe},
"ingest": {"task": task_ingest, "subtask": subtask_ingest}}
def set_task_state(self, task_state, task_type, task, subtask):
"""
Set the taskblueprint state for given task_type
State of task can only be set by setting the subtask state
Do not set subtask state if subtask is None
:param task_state: Task state to be set
:param task_type: observation, pipeline or ingest
:param task: TaskBlueprint object
:param subtask: SubTask object
"""
# Translate task state to subtask state, mostly one-o-one but two exceptions
if task_state == "observed":
subtask_state = "finishing"
elif task_state == "schedulable":
subtask_state = "scheduling"
else:
subtask_state = task_state
if subtask is not None:
subtask.state = models.SubtaskState.objects.get(value=subtask_state)
subtask.save()
# Check task.status as precondition
self.assertEqual(task_state, task.status,
"INCORRECT PRECONDITION. Expected %s task to have status=%s, but actual status=%s)" % (
task_type, task_state, task.status))
def test_state_with_no_tasks(self):
"""
Test the schedulingunitblueprint state when tasks are not instantiated.
the expected state should be 'defined'
"""
schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Scheduling Blueprint No Tasks")
schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
self.assertEqual("defined", schedulingunit_blueprint.status)
def test_states_with_observation_pipeline_ingest_tasks_subtasks(self):
"""
Test the schedulingunitblueprint state when only one task is instantiated, an pipeline
Subtask are also instantiated so minimal task state is schedulable !
See next table where every row represents:
Taskstate(obs), Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status
"""
test_table = [
# normal behaviour
("error", "schedulable", "schedulable", "error"),
("cancelled", "schedulable", "schedulable", "cancelled"),
("schedulable", "schedulable", "schedulable", "schedulable"),
("scheduled", "schedulable", "schedulable", "scheduled"),
("started", "schedulable", "schedulable", "observing"),
("observed", "schedulable", "schedulable", "observed"),
("observed", "scheduled", "schedulable", "observed"),
("observed", "started", "schedulable", "processing"),
("observed", "finished", "schedulable", "processing"),
("observed", "finished", "scheduled", "processing"),
("observed", "finished", "started", "processing"),
("observed", "finished", "finished", "processing"),
("finished", "schedulable", "schedulable", "observed"),
("finished", "scheduled", "schedulable", "observed"),
("finished", "started", "schedulable", "processing"),
("finished", "finished", "schedulable", "processed"),
("finished", "finished", "scheduled", "processed"),
("finished", "finished", "started", "ingesting"),
("finished", "finished", "finished", "finished"),
# any cancelled
("observed", "cancelled", "schedulable", "cancelled"),
("observed", "schedulable", "cancelled", "cancelled"),
("observed", "scheduled", "cancelled", "cancelled"),
("observed", "started", "cancelled", "cancelled"),
("observed", "cancelled", "schedulable", "cancelled"),
("observed", "cancelled", "scheduled", "cancelled"),
("observed", "cancelled", "started", "cancelled"),
("observed", "cancelled", "finished", "cancelled"),
("finished", "cancelled", "schedulable", "cancelled"),
# any error
("observed", "error", "schedulable", "error"),
("observed", "schedulable", "error", "error"),
("observed", "scheduled", "error", "error"),
("observed", "started", "error", "error"),
("observed", "error", "schedulable", "error"),
("observed", "error", "scheduled", "error"),
("observed", "error", "started", "error"),
("observed", "error", "finished", "error"),
# cancelled over error
("error", "error", "cancelled", "cancelled")
]
# Create schedulingblueprint
schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks")
schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
# Create related task and subtasks
tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint)
# Do the actual test
task_state_dict = {}
for test_item in test_table:
task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status = test_item
info_msg = "Test with with states observation='%s',pipeline='%s',ingest='%s' should result in schedulingunit_blueprint.status '%s'" \
% (task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status)
logger.info(info_msg)
for key in tasks_and_subtasks_dict:
self.set_task_state(task_state_dict[key], key, tasks_and_subtasks_dict[key]["task"], tasks_and_subtasks_dict[key]["subtask"])
# Check result
self.assertEqual(expected_schedulingunit_status, schedulingunit_blueprint.status, info_msg)
def test_states_with_observation_pipeline_ingest_tasks_no_ingest_subtask(self):
"""
Test the schedulingunitblueprint state when the tasks, observation, pipeline and ingest are instantiated
Subtask of ingest is missing, which makes implicit the task state defined!
See next table where every row represents:
Taskstate(obs), Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status
"""
test_table = [
# normal behaviour
("error", "schedulable", "defined", "error"),
("cancelled", "schedulable", "defined", "cancelled"),
("schedulable", "schedulable", "defined", "schedulable"),
("scheduled", "schedulable", "defined", "scheduled"),
("started", "schedulable", "defined", "observing"),
("observed", "schedulable", "defined", "observed"),
("observed", "scheduled", "defined", "observed"),
("observed", "started", "defined", "processing"),
("observed", "finished", "defined", "processing"),
("finished", "schedulable", "defined", "observed"),
]
# Create schedulingblueprint
schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks No Ingest Subtask")
schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
# Create related task and subtasks (skip creation of ingest subtask)
tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint, ["ingest"])
# Do the actual test
task_state_dict = {}
for test_item in test_table:
task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status = test_item
info_msg = "Test with with states observation='%s',pipeline='%s',ingest='%s' should result in schedulingunit_blueprint.status '%s'" \
% (task_state_dict["observation"], task_state_dict["pipeline"], task_state_dict["ingest"], expected_schedulingunit_status)
logger.info(info_msg)
for key in tasks_and_subtasks_dict:
self.set_task_state(task_state_dict[key], key, tasks_and_subtasks_dict[key]["task"], tasks_and_subtasks_dict[key]["subtask"])
# Check result
self.assertEqual(expected_schedulingunit_status, schedulingunit_blueprint.status, info_msg)
class TestFlatStations(unittest.TestCase):
"""
Test the property of 'flat_stations', retrieve a list of all station as a flat list
"""
def create_UC1_observation_scheduling_unit(self, name, scheduling_set):
constraints_template = models.SchedulingConstraintsTemplate.objects.get(name="constraints")
constraints = add_defaults_to_json_object_for_schema({}, constraints_template.schema)
uc1_strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines")
scheduling_unit_spec = add_defaults_to_json_object_for_schema(uc1_strategy_template.template,
uc1_strategy_template.scheduling_unit_template.schema)
# limit target obs duration for demo data
scheduling_unit_spec['tasks']['Calibrator Observation 1']['specifications_doc']['duration'] = 2 * 60
scheduling_unit_spec['tasks']['Target Observation']['specifications_doc']['duration'] = 2 * 3600
scheduling_unit_spec['tasks']['Calibrator Observation 2']['specifications_doc']['duration'] = 2 * 60
# add the scheduling_unit_doc to a new SchedulingUnitDraft instance, and were ready to use it!
return models.SchedulingUnitDraft.objects.create(name=name,
scheduling_set=scheduling_set,
requirements_template=uc1_strategy_template.scheduling_unit_template,
requirements_doc=scheduling_unit_spec,
observation_strategy_template=uc1_strategy_template,
scheduling_constraints_doc=constraints,
scheduling_constraints_template=constraints_template)
def modify_stations_in_station_group(self, station_group_idx, lst_stations):
"""
Modify for the scheduling_unit_blueprint created add setup, the list of stations for given group idx
"""
station_groups = self.scheduling_unit_blueprint.requirements_doc['tasks']['Target Observation']['specifications_doc']['station_groups']
station_groups[station_group_idx]["stations"] = lst_stations
def setUp(self) -> None:
# scheduling unit
my_scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
scheduling_unit_draft = self.create_UC1_observation_scheduling_unit("UC1 scheduling unit for testing", my_scheduling_set)
self.scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
def test_with_different_stations(self):
"""
Test with different station list and station groups
"""
list_expected_stations = [
"CS001", "CS002", "CS003", "CS004", "CS005", "CS006", "CS007", "CS011", "CS013", "CS017", "CS021",
"CS024", "CS026", "CS028", "CS030", "CS031", "CS032", "CS301", "CS302", "CS401", "CS501", "RS106",
"RS205", "RS208", "RS210", "RS305", "RS306", "RS307", "RS310", "RS406", "RS407", "RS409", "RS503",
"RS508", "RS509",
"DE601", "DE602", "DE603", "DE604", "DE605", "DE609", "FR606", "SE607", "UK608", "PL610", "PL611",
"PL612", "IE613", "LV614"]
self.assertCountEqual(list_expected_stations, self.scheduling_unit_blueprint.flat_station_list)
# Clear all stations and check that flat_station_list is empty
nbr_station_groups = len(self.scheduling_unit_blueprint.requirements_doc['tasks']['Target Observation']['specifications_doc']['station_groups'])
for idx in range(nbr_station_groups):
self.modify_stations_in_station_group(idx, [])
self.assertEqual([], self.scheduling_unit_blueprint.flat_station_list)
# Set two stations for all station_groups, check flat_station_list contains two stations
for idx in range(nbr_station_groups):
self.modify_stations_in_station_group(idx, ['CS001', 'CS002'])
self.assertCountEqual(['CS001', 'CS002'], self.scheduling_unit_blueprint.flat_station_list)
# Set different stations for the station_groups
total_station_list = []
for idx in range(nbr_station_groups):
station_list = ['CS00%d' % idx, 'CS02%d' % idx]
total_station_list += station_list
self.modify_stations_in_station_group(idx, station_list)
self.assertCountEqual(total_station_list, self.scheduling_unit_blueprint.flat_station_list)
# Set two stations for all station_groups, check flat_station_list contains all stations
all_stations = ["CS001","CS002","CS003","CS004","CS005","CS006","CS007","CS011","CS013","CS017","CS021","CS024",
"CS026","CS028","CS030","CS031","CS032","CS101","CS103","CS201","CS301","CS302","CS401","CS501",
"RS104","RS106","RS205","RS208","RS210","RS305","RS306","RS307","RS310","RS406","RS407","RS409",
"RS410","RS503","RS508","RS509",
"DE601","DE602","DE603","DE604","DE605","FR606","SE607","UK608","DE609","PL610","PL611","PL612",
"IE613","LV614"]
for idx in range(nbr_station_groups):
self.modify_stations_in_station_group(idx, all_stations)
self.assertCountEqual(all_stations, self.scheduling_unit_blueprint.flat_station_list)
# Lets set group with stations which are already in other station groups, so flat_station_list still the same
self.modify_stations_in_station_group(0, ['CS001', 'CS001', 'DE601', 'PL612'])
self.assertCountEqual(all_stations, self.scheduling_unit_blueprint.flat_station_list)
# Lets add a group with stations which are NOT in other station groups, so flat_station_list so be extend now
station_list = ['XX901', 'XX902', 'XX903', 'XX904']
self.modify_stations_in_station_group(0, station_list)
self.assertCountEqual(all_stations+station_list, self.scheduling_unit_blueprint.flat_station_list)