#!/usr/bin/env python3

# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

# $Id:  $

import os
import unittest
import requests

import logging
logger = logging.getLogger(__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)

# NOTE(review): judging by its name, this call ends the run with a 'skipped' exit code when integration
# tests are disabled -- confirm in lofar.common.test_utils. It is deliberately executed before the
# heavier TMSS imports below, so keep the import/statement order of this section intact.
from lofar.common.test_utils import exit_with_skipped_code_if_skip_integration_tests
exit_with_skipped_code_if_skip_integration_tests()

from lofar.common.json_utils import get_default_json_object_for_schema, add_defaults_to_json_object_for_schema


# Do Mandatory setup step:
# use setup/teardown magic for tmss test database, ldap server and django server
# (ignore pycharm unused import statement, python unittests does use at RunTime the tmss_test_environment_unittest_setup module)
# NOTE(review): 'tmss_test_env', 'BASE_URL' and 'AUTH' used below are not defined in this file;
# presumably they are provided by this star import -- verify against the setup module.
from lofar.sas.tmss.test.tmss_test_environment_unittest_setup import *
tmss_test_env.populate_schemas()

# NOTE(review): the *_test_data factory functions used by the tests come from this star import.
# 'uuid' is used further down without an explicit import, so it presumably also arrives via one of
# the star imports -- an explicit 'import uuid' would be more robust.
from lofar.sas.tmss.test.tmss_test_data_django_models import *

# import and setup rest test data creator
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
rest_data_creator = TMSSRESTTestDataCreator(BASE_URL, AUTH)

# The Django models are imported only after the test environment setup above has run.
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.exceptions import SchemaValidationException

# NOTE(review): duplicate of the 'import requests' near the top of this section; harmless but redundant.
import requests

from lofar.sas.tmss.tmss.tmssapp.tasks import create_task_blueprints_and_subtasks_from_scheduling_unit_draft


class SchedulingUnitBlueprintStateTest(unittest.TestCase):
    """
    Test the SchedulingUnitBlueprint status, which is derived from the TaskBlueprint statuses.
    The resulting status is checked for the combinations of task statuses listed in the test tables below.
    See https://support.astron.nl/confluence/display/TMSS/Specification+Flow#SpecificationFlow-SchedulingBlueprints
    """

    def _create_defined_subtask(self, task_blueprint, subtask_template_name, skip):
        """
        Create one subtask in state 'defined' and attach it to the given task blueprint.
        :param task_blueprint: TaskBlueprint object the subtask is linked to
        :param subtask_template_name: name of the SubtaskTemplate to use
        :param skip: when True no subtask is created
        :return: the created Subtask object, or None when skip is True
        """
        # The test data is built even when the subtask itself is skipped, exactly like the
        # original inline code did, so that any objects the factory creates still exist.
        subtask_data = Subtask_test_data(state=models.SubtaskState.objects.get(value="defined"),
                                         subtask_template=models.SubtaskTemplate.objects.get(name=subtask_template_name))
        if skip:
            return None
        subtask = models.Subtask.objects.create(**subtask_data)
        subtask.task_blueprints.set([task_blueprint])
        return subtask

    def create_tasks_and_subtasks(self, schedulingunit_blueprint, skip_create_subtask=None):
        """
        Create three taskblueprints related to the schedulingunit_blueprint:
        an observation, a pipeline and an ingest task.
        Per task one subtask is instantiated as well (three in total), because the task status is a
        read-only property derived from the subtask states and can only be influenced through a subtask.
        :param schedulingunit_blueprint: SchedulingUnitBlueprint object the tasks belong to
        :param skip_create_subtask: optional collection with any of "observation", "pipeline", "ingest";
                                    for the listed tasks no subtask is created. Default: create all subtasks
        :return: dictionary keyed by "observation", "pipeline" and "ingest", each holding a dictionary
                 with the "task" object and the "subtask" object (None when its creation was skipped)
        """
        # None replaces the former mutable default ([]), which would be shared between calls.
        skipped = () if skip_create_subtask is None else skip_create_subtask

        # Create observation task (unique name; keeps the template that the test data factory provides)
        task_data = TaskBlueprint_test_data(name="Task Observation "+str(uuid.uuid4()), scheduling_unit_blueprint=schedulingunit_blueprint)
        task_obs = models.TaskBlueprint.objects.create(**task_data)
        subtask_obs = self._create_defined_subtask(task_obs, 'observation control', "observation" in skipped)

        # Create pipeline task
        task_data = TaskBlueprint_test_data(name="Task Pipeline", scheduling_unit_blueprint=schedulingunit_blueprint)
        task_pipe = models.TaskBlueprint.objects.create(**task_data)
        # Need to change the default template type (observation) to pipeline
        task_pipe.specifications_template = models.TaskTemplate.objects.get(type=models.TaskType.Choices.PIPELINE.value)
        task_pipe.save()
        subtask_pipe = self._create_defined_subtask(task_pipe, 'pipeline control', "pipeline" in skipped)

        # Create ingest task
        # There is no TaskTemplate object for ingest by default, so one is created here
        test_data = TaskTemplate_test_data(name="task_template_for_ingest", task_type_value="ingest")
        my_test_template = models.TaskTemplate.objects.create(**test_data)
        task_data = TaskBlueprint_test_data(name="Task Ingest", scheduling_unit_blueprint=schedulingunit_blueprint)
        task_ingest = models.TaskBlueprint.objects.create(**task_data)
        task_ingest.specifications_template = my_test_template
        task_ingest.save()
        # There is no subtask template defined for ingest yet, so 'pipeline control' is reused;
        # for this test only the task template type matters.
        subtask_ingest = self._create_defined_subtask(task_ingest, 'pipeline control', "ingest" in skipped)

        return {"observation": {"task": task_obs, "subtask": subtask_obs},
                "pipeline": {"task": task_pipe, "subtask": subtask_pipe},
                "ingest": {"task": task_ingest, "subtask": subtask_ingest}}

    def set_task_state(self, task_state, task_type, task, subtask):
        """
        Set the taskblueprint state for given task_type
        State of task can only be set by setting the subtask state
        Do not set subtask state if subtask is None
        :param task_state: Task state to be set
        :param task_type: observation, pipeline or ingest (only used in the failure message)
        :param task: TaskBlueprint object
        :param subtask: SubTask object, or None when the task has no subtask
        :raises AssertionError: when the task status does not equal task_state afterwards
        """
        # Translate task state to subtask state: one-to-one, except for these two
        subtask_state = {"observed": "finishing", "schedulable": "scheduling"}.get(task_state, task_state)

        if subtask is not None:
            subtask.state = models.SubtaskState.objects.get(value=subtask_state)
            subtask.save()
        # Check task.status as precondition
        self.assertEqual(task_state, task.status,
                         "INCORRECT PRECONDITION. Expected %s task to have status=%s, but actual status=%s)" % (
                         task_type, task_state, task.status))

    def _check_scheduling_unit_status_table(self, schedulingunit_blueprint, tasks_and_subtasks_dict, test_table):
        """
        Apply every row of test_table to the tasks and check the resulting scheduling unit status.
        :param schedulingunit_blueprint: SchedulingUnitBlueprint object under test
        :param tasks_and_subtasks_dict: result of create_tasks_and_subtasks for that blueprint
        :param test_table: rows of (observation task state, pipeline task state, ingest task state,
                           expected schedulingunit_blueprint status)
        """
        for obs_state, pipeline_state, ingest_state, expected_schedulingunit_status in test_table:
            task_state_dict = {"observation": obs_state, "pipeline": pipeline_state, "ingest": ingest_state}
            info_msg = "Test with with states observation='%s',pipeline='%s',ingest='%s' should result in schedulingunit_blueprint.status '%s'" \
                        % (obs_state, pipeline_state, ingest_state, expected_schedulingunit_status)
            logger.info(info_msg)
            # One subTest per row, so that every failing row is reported instead of only the first one.
            with self.subTest(observation=obs_state, pipeline=pipeline_state, ingest=ingest_state):
                for key, objects in tasks_and_subtasks_dict.items():
                    self.set_task_state(task_state_dict[key], key, objects["task"], objects["subtask"])
                # Check result
                self.assertEqual(expected_schedulingunit_status, schedulingunit_blueprint.status, info_msg)

    def test_state_with_no_tasks(self):
        """
        Test the schedulingunitblueprint state when tasks are not instantiated.
        the expected state should be 'defined'
        """
        schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Scheduling Blueprint No Tasks")
        schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
        self.assertEqual("defined", schedulingunit_blueprint.status)

    def test_states_with_observation_pipeline_ingest_tasks_subtasks(self):
        """
        Test the schedulingunitblueprint state when an observation, a pipeline and an ingest task are instantiated.
        A subtask is instantiated for each task as well, so the minimal task state is schedulable !
        See next table where every row represents:
            Taskstate(obs),  Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status
        """
        test_table = [
            # normal behaviour
            ("error",       "schedulable", "schedulable",  "error"),
            ("cancelled",   "schedulable", "schedulable",  "cancelled"),
            ("schedulable", "schedulable", "schedulable",  "schedulable"),
            ("scheduled",   "schedulable", "schedulable",  "scheduled"),
            ("started",     "schedulable", "schedulable",  "observing"),
            ("observed",    "schedulable", "schedulable",  "observed"),
            ("observed",    "scheduled",   "schedulable",  "observed"),
            ("observed",    "started",     "schedulable",  "processing"),
            ("observed",    "finished",    "schedulable",  "processing"),
            ("observed",    "finished",    "scheduled",    "processing"),
            ("observed",    "finished",    "started",      "processing"),
            ("observed",    "finished",    "finished",     "processing"),
            ("finished",    "schedulable", "schedulable",  "observed"),
            ("finished",    "scheduled",   "schedulable",  "observed"),
            ("finished",    "started",     "schedulable",  "processing"),
            ("finished",    "finished",    "schedulable",  "processed"),
            ("finished",    "finished",    "scheduled",    "processed"),
            ("finished",    "finished",    "started",      "ingesting"),
            ("finished",    "finished",    "finished",     "finished"),
            # any cancelled
            ("observed",    "cancelled",   "schedulable",  "cancelled"),
            ("observed",    "schedulable", "cancelled",    "cancelled"),
            ("observed",    "scheduled",   "cancelled",    "cancelled"),
            ("observed",    "started",     "cancelled",    "cancelled"),
            # NOTE(review): the next row duplicates the first row of this group; kept so that the
            # sequence of state changes stays exactly as it was.
            ("observed",    "cancelled",   "schedulable",  "cancelled"),
            ("observed",    "cancelled",   "scheduled",    "cancelled"),
            ("observed",    "cancelled",   "started",      "cancelled"),
            ("observed",    "cancelled",   "finished",     "cancelled"),
            ("finished",    "cancelled",   "schedulable",  "cancelled"),
            # any error
            ("observed",    "error",       "schedulable",  "error"),
            ("observed",    "schedulable", "error",        "error"),
            ("observed",    "scheduled",   "error",        "error"),
            ("observed",    "started",     "error",        "error"),
            # NOTE(review): the next row duplicates the first row of this group; kept so that the
            # sequence of state changes stays exactly as it was.
            ("observed",    "error",       "schedulable",  "error"),
            ("observed",    "error",       "scheduled",    "error"),
            ("observed",    "error",       "started",      "error"),
            ("observed",    "error",       "finished",     "error"),
            # cancelled over error
            ("error",       "error",       "cancelled",    "cancelled")
        ]
        # Create schedulingblueprint
        schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks")
        schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
        # Create related task and subtasks
        tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint)
        # Do the actual test
        self._check_scheduling_unit_status_table(schedulingunit_blueprint, tasks_and_subtasks_dict, test_table)

    def test_states_with_observation_pipeline_ingest_tasks_no_ingest_subtask(self):
        """
        Test the schedulingunitblueprint state when the tasks, observation, pipeline and ingest are instantiated
        Subtask of ingest is missing, which implicitly makes the ingest task state 'defined'!
        See next table where every row represents:
            Taskstate(obs),  Taskstate(pipeline), Taskstate(ingest), Expected SchedulingUnitBlueprint Status
        """
        test_table = [
            # normal behaviour
            ("error",       "schedulable", "defined",  "error"),
            ("cancelled",   "schedulable", "defined",  "cancelled"),
            ("schedulable", "schedulable", "defined",  "schedulable"),
            ("scheduled",   "schedulable", "defined",  "scheduled"),
            ("started",     "schedulable", "defined",  "observing"),
            ("observed",    "schedulable", "defined",  "observed"),
            ("observed",    "scheduled",   "defined",  "observed"),
            ("observed",    "started",     "defined",  "processing"),
            ("observed",    "finished",    "defined",  "processing"),
            ("finished",    "schedulable", "defined",  "observed"),
        ]
        # Create schedulingblueprint
        schedulingunit_data = SchedulingUnitBlueprint_test_data(name="Task Blueprint With Three Tasks No Ingest Subtask")
        schedulingunit_blueprint = models.SchedulingUnitBlueprint.objects.create(**schedulingunit_data)
        # Create related task and subtasks (skip creation of ingest subtask)
        tasks_and_subtasks_dict = self.create_tasks_and_subtasks(schedulingunit_blueprint, ["ingest"])
        # Do the actual test
        self._check_scheduling_unit_status_table(schedulingunit_blueprint, tasks_and_subtasks_dict, test_table)


class TestFlatStations(unittest.TestCase):
    """
    Test the 'flat_station_list' property of a scheduling unit blueprint: the stations of all
    station groups of the target observation, retrieved as one flat list.
    """
    def create_UC1_observation_scheduling_unit(self, name, scheduling_set):
        """
        Create a SchedulingUnitDraft based on the "UC1 CTC+pipelines" observing strategy template.
        :param name: name of the new scheduling unit draft
        :param scheduling_set: SchedulingSet object the draft is added to
        :return: the created SchedulingUnitDraft object
        """
        constraints_template = models.SchedulingConstraintsTemplate.objects.get(name="constraints")
        default_constraints = add_defaults_to_json_object_for_schema({}, constraints_template.schema)

        strategy = models.SchedulingUnitObservingStrategyTemplate.objects.get(name="UC1 CTC+pipelines")
        unit_template = strategy.scheduling_unit_template
        unit_spec = add_defaults_to_json_object_for_schema(strategy.template, unit_template.schema)

        # override the duration of the three observations in the specification
        durations = (('Calibrator Observation 1', 2 * 60),
                     ('Target Observation', 2 * 3600),
                     ('Calibrator Observation 2', 2 * 60))
        for task_name, duration in durations:
            unit_spec['tasks'][task_name]['specifications_doc']['duration'] = duration

        # put the specification document into a new SchedulingUnitDraft instance
        draft_fields = {"name": name,
                        "scheduling_set": scheduling_set,
                        "requirements_template": unit_template,
                        "requirements_doc": unit_spec,
                        "observation_strategy_template": strategy,
                        "scheduling_constraints_doc": default_constraints,
                        "scheduling_constraints_template": constraints_template}
        return models.SchedulingUnitDraft.objects.create(**draft_fields)

    def modify_stations_in_station_group(self, station_group_idx, lst_stations):
        """
        Replace the list of stations of one station group of the 'Target Observation' task in the
        requirements_doc of the scheduling_unit_blueprint that was created in setUp.
        :param station_group_idx: index of the station group to modify
        :param lst_stations: the new list of stations for that group
        """
        target_spec = self.scheduling_unit_blueprint.requirements_doc['tasks']['Target Observation']['specifications_doc']
        target_spec['station_groups'][station_group_idx]["stations"] = lst_stations

    def setUp(self) -> None:
        # Build a UC1 scheduling unit draft in a fresh scheduling set and turn it into a blueprint
        scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
        draft = self.create_UC1_observation_scheduling_unit("UC1 scheduling unit for testing",  scheduling_set)
        self.scheduling_unit_blueprint = create_task_blueprints_and_subtasks_from_scheduling_unit_draft(draft)

    def test_with_different_stations(self):
        """
        Check flat_station_list for several station lists in the station groups
        """
        blueprint = self.scheduling_unit_blueprint

        # Untouched blueprint: the stations that result from the strategy template
        initial_stations = [
            "CS001", "CS002", "CS003", "CS004", "CS005", "CS006", "CS007", "CS011", "CS013", "CS017", "CS021",
            "CS024", "CS026", "CS028", "CS030", "CS031", "CS032", "CS301", "CS302", "CS401", "CS501", "RS106",
            "RS205", "RS208", "RS210", "RS305", "RS306", "RS307", "RS310", "RS406", "RS407", "RS409", "RS503",
            "RS508", "RS509",
            "DE601", "DE602", "DE603", "DE604", "DE605", "DE609", "FR606", "SE607", "UK608", "PL610", "PL611",
            "PL612", "IE613", "LV614"]
        self.assertCountEqual(initial_stations, blueprint.flat_station_list)

        group_count = len(blueprint.requirements_doc['tasks']['Target Observation']['specifications_doc']['station_groups'])
        group_indices = range(group_count)

        # No stations in any group -> empty flat_station_list
        for group_idx in group_indices:
            self.modify_stations_in_station_group(group_idx, [])
        self.assertEqual([], blueprint.flat_station_list)

        # The same two stations in every group -> flat_station_list holds just those two
        for group_idx in group_indices:
            self.modify_stations_in_station_group(group_idx, ['CS001', 'CS002'])
        self.assertCountEqual(['CS001', 'CS002'], blueprint.flat_station_list)

        # A different pair of stations per group -> flat_station_list holds all pairs
        stations_of_all_groups = []
        for group_idx in group_indices:
            station_pair = ['CS00%d' % group_idx,  'CS02%d' % group_idx]
            stations_of_all_groups.extend(station_pair)
            self.modify_stations_in_station_group(group_idx, station_pair)
        self.assertCountEqual(stations_of_all_groups, blueprint.flat_station_list)

        # The complete station list in every group -> flat_station_list holds every station once
        all_stations = ["CS001","CS002","CS003","CS004","CS005","CS006","CS007","CS011","CS013","CS017","CS021","CS024",
                        "CS026","CS028","CS030","CS031","CS032","CS101","CS103","CS201","CS301","CS302","CS401","CS501",
                        "RS104","RS106","RS205","RS208","RS210","RS305","RS306","RS307","RS310","RS406","RS407","RS409",
                        "RS410","RS503","RS508","RS509",
                        "DE601","DE602","DE603","DE604","DE605","FR606","SE607","UK608","DE609","PL610","PL611","PL612",
                        "IE613","LV614"]
        for group_idx in group_indices:
            self.modify_stations_in_station_group(group_idx, all_stations)
        self.assertCountEqual(all_stations, blueprint.flat_station_list)

        # First group only repeats stations that the other groups already have -> nothing changes
        self.modify_stations_in_station_group(0, ['CS001', 'CS001', 'DE601', 'PL612'])
        self.assertCountEqual(all_stations, blueprint.flat_station_list)

        # First group gets stations that no other group has -> flat_station_list is extended with them
        extra_stations = ['XX901', 'XX902', 'XX903', 'XX904']
        self.modify_stations_in_station_group(0, extra_stations)
        self.assertCountEqual(all_stations+extra_stations, blueprint.flat_station_list)