Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_test_data_django_models.py 17.40 KiB
#!/usr/bin/env python3

# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

'''
By importing this helper module in your unittest module you get a TMSSTestDatabaseInstance
which is automatically destroyed at the end of the unittest session.
'''

#######################################################
# the methods below can be used to create test data
# naming convention is: <django_model_name>_test_data()
#######################################################

from lofar.sas.tmss.tmss.tmssapp import models
from lofar.common.json_utils import get_default_json_object_for_schema

from datetime import datetime
import uuid
import json

def GeneratorTemplate_test_data(name="my_GeneratorTemplate", version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return {"name": name,
            "description": 'My one observation',
            "version": version,
            "schema": {"mykey": "my value"},
            "create_function": 'Funky',
            "tags": ["TMSS", "TESTING"]}

def DefaultGeneratorTemplate_test_data(name=None, template=None) -> dict:
    return {'name': name if name is not None else "DefaultGeneratorTemplate_"+str(uuid.uuid4()),
            'template': template,
            'tags':[]}

def SchedulingUnitTemplate_test_data(name="my_SchedulingUnitTemplate", version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return {"name": name,
            "description": 'My SchedulingUnitTemplate description',
            "version": version,
            "schema": {"mykey": "my value"},
            "tags": ["TMSS", "TESTING"]}

def TaskTemplate_test_data(name="my TaskTemplate", version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return  {"validation_code_js":"",
              "name": name,
              "description": 'My TaskTemplate description',
              "version": version,
              "schema": {"mykey": "my value"},
              "tags": ["TMSS", "TESTING"]}

def TaskRelationSelectionTemplate_test_data(name="my_TaskRelationSelectionTemplate", version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return  {"name": name,
               "description": 'My TaskRelationSelectionTemplate description',
               "version": version,
               "schema": {"mykey": "my value"},
               "tags": ["TMSS", "TESTING"]}

def TaskConnectorType_test_data() -> dict:
    return {"role": models.Role.objects.get(value='calibrator'),
            "datatype": models.Datatype.objects.get(value='instrument model'),
            "output_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
            "input_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
            "tags": []}

def Cycle_test_data() -> dict:
    return {"name": 'my_cycle' + str(uuid.uuid4()),
            "description": "",
            "tags": [],
            "start": datetime.utcnow().isoformat(),
            "stop": datetime.utcnow().isoformat()}

def Project_test_data() -> dict:
    return  { #"cycles": [models.Cycle.objects.create(**Cycle_test_data())], # ManyToMany, use set()
              "name": 'my_project_' + str(uuid.uuid4()),
               "description": 'my description ' + str(uuid.uuid4()),
               "tags": [],
               "priority_rank": 1.0,
               "trigger_priority": 1000,
               "can_trigger": False,
               "private_data": True,
               "expert": True,
               "filler": False}

def ResourceType_test_data() -> dict:
    return  {
        "tags": [],
        "description": 'my description ' + str(uuid.uuid4()),
        "name": 'my_resource_type_' + str(uuid.uuid4()),
        "quantity": models.Quantity.objects.get(value=models.Quantity.Choices.NUMBER.value)
     }

def ProjectQuota_test_data() -> dict:
   return  { 
        "value": '1000',
        "project": models.Project.objects.create(**Project_test_data()),
        "resource_type": models.ResourceType.objects.create(**ResourceType_test_data())
    }
  
def SchedulingSet_test_data(name="my_scheduling_set", project: models.Project=None) -> dict:
    if project is None:
        project = models.Project.objects.create(**Project_test_data())

    return {"name": name,
            "description": "",
            "tags": [],
            "generator_doc": {},
            "project": project,
            "generator_template": models.GeneratorTemplate.objects.create(**GeneratorTemplate_test_data()),
            "generator_source": None}

def SchedulingUnitDraft_test_data(name="my_scheduling_unit_draft", scheduling_set: models.SchedulingSet=None, template: models.SchedulingUnitTemplate=None, requirements_doc: dict=None) -> dict:
    if scheduling_set is None:
        scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())

    if template is None:
        template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())

    if requirements_doc is None:
        requirements_doc = get_default_json_object_for_schema(template.schema)

    return {"name": name,
            "description": "",
            "tags": [],
            "requirements_doc": requirements_doc,
            "copy_reason": models.CopyReason.objects.get(value='template'),
            "generator_instance_doc": "para",
            "copies": None,
            "scheduling_set": scheduling_set,
            "requirements_template": template }

def TaskDraft_test_data(name: str="my_task_draft", specifications_template: models.TaskTemplate=None, specifications_doc: dict=None, scheduling_unit_draft: models.SchedulingUnitDraft=None) -> dict:
    if specifications_template is None:
        specifications_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data())

    if specifications_doc is None:
        specifications_doc = get_default_json_object_for_schema(specifications_template.schema)

    if scheduling_unit_draft is None:
        scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data())

    return {"name": name,
            "description": "",
            "tags": [],
            "specifications_doc": specifications_doc,
            "copy_reason": models.CopyReason.objects.get(value='template'),
            "copies": None,
            "scheduling_unit_draft": scheduling_unit_draft,
            "specifications_template": specifications_template }

def TaskRelationDraft_test_data(producer: models.TaskDraft = None, consumer: models.TaskDraft = None) -> dict:
    if producer is None:
        producer = models.TaskDraft.objects.create(**TaskDraft_test_data())

    if consumer is None:
        consumer = models.TaskDraft.objects.create(**TaskDraft_test_data())

    return {"tags": [],
            "selection_doc": {},
            "dataformat": models.Dataformat.objects.get(value='Beamformed'),
            "producer": producer,
            "consumer": consumer,
            "input_role":  models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
            "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
            "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data())}

def SchedulingUnitBlueprint_test_data(name='my_scheduling_unit_blueprint') -> dict:
    return {"name": name,
            "description": "",
            "tags": [],
            "requirements_doc": {},
            "do_cancel": False,
            "draft": models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()),
            "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())}

def TaskBlueprint_test_data(name='my_task_blueprint', task_draft: models.TaskDraft = None) -> dict:
    if task_draft is None:
        task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data())

    return {"name": name,
            "description": "",
            "tags": [],
            "specifications_doc": task_draft.specifications_doc,
            "do_cancel": False,
            "draft": task_draft,
            "specifications_template": task_draft.specifications_template,
            "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data())}

def TaskRelationBlueprint_test_data(producer: models.TaskBlueprint = None, consumer: models.TaskBlueprint = None) -> dict:
    if producer is None:
        producer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())

    if consumer is None:
        consumer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())

    return {"tags": [],
            "selection_doc": {},
            "dataformat": models.Dataformat.objects.get(value='Beamformed'),
            "input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
            "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
            "draft": models.TaskRelationDraft.objects.create(**TaskRelationDraft_test_data()),
            "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
            "producer": producer,
            "consumer": consumer}


def SubtaskTemplate_test_data(schema: object=None, version:str=None) -> dict:
    if schema is None:
        schema = {}

    if version is None:
        version = str(uuid.uuid4())

    return {"type": models.SubtaskType.objects.get(value='copy'),
            "name": "observation",
            "description": 'My one observation',
            "version": version,
            "schema": schema,
            "realtime": True,
            "queue": False,
            "tags": ["TMSS", "TESTING"]}

def TaskSchedulingRelationDraft_test_data(first: models.TaskDraft = None, second: models.TaskDraft = None) -> dict:
    if first is None:
        first = models.TaskDraft.objects.create(**TaskDraft_test_data())

    if second is None:
        second = models.TaskDraft.objects.create(**TaskDraft_test_data())
    return {"tags": [],
            "first": first,
            "second": second,
            "placement": models.SchedulingRelationPlacement.objects.get(value='after'),
            "time_offset":60}

def TaskSchedulingRelationBlueprint_test_data(first: models.TaskBlueprint = None, second: models.TaskBlueprint = None) -> dict:
    if first is None:
        first = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())

    if second is None:
        second = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())

    return {"tags": [],
            "first": first,
            "second": second,
            "placement": models.SchedulingRelationPlacement.objects.get(value='after'),
            "time_offset":60}

def DataproductSpecificationsTemplate_test_data(version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return {"name": "data",
            "description": 'My one date',
            "version": version,
            "schema": {"mykey": "my value"},
            "tags": ["TMSS", "TESTING"]}

def DataproductFeedbackTemplate_test_data(version:str=None) -> dict:
    if version is None:
        version = str(uuid.uuid4())

    return {"name": "data",
            "description": 'My one date',
            "version": version,
            "schema": {"mykey": "my value"},
            "tags": ["TMSS", "TESTING"]}

def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict:
    if subtask is None:
        subtask = models.Subtask.objects.create(**Subtask_test_data())

    return {"subtask": subtask,
            "tags":[]}

def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None) -> dict:
    if subtask is None:
        subtask = models.Subtask.objects.create(**Subtask_test_data())

    if producer is None:
        producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())

    if selection_doc is None:
        selection_doc = {}

    return {"subtask": subtask,
            "task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()),
            "producer": producer,
            "selection_doc": selection_doc,
            "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
            "tags":[]}

def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_template: models.SubtaskTemplate=None,
                      specifications_doc: dict=None, start_time=None, stop_time=None, cluster=None, state=None) -> dict:

    if task_blueprint is None:
        task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())

    if subtask_template is None:
        subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data())

    if specifications_doc is None:
        specifications_doc = get_default_json_object_for_schema(subtask_template.schema)

     # Type need to be a datetime object not a str so do not add .isoformat()
    if start_time is None:
        start_time = datetime.utcnow()

    if stop_time is None:
        stop_time = datetime.utcnow()

    if cluster is None:
        cluster = models.Cluster.objects.create(name="dummy cluster", location="downstairs", tags=[])

    if state is None:
        state = models.SubtaskState.objects.get(value='defining')

    return { "start_time": start_time,
             "stop_time": stop_time,
             "state": state,
             "specifications_doc": specifications_doc,
             "task_blueprint": task_blueprint,
             "specifications_template": subtask_template,
             "tags": ["TMSS", "TESTING"],
             "do_cancel": datetime.utcnow(),
             "priority": 1,
             "schedule_method": models.ScheduleMethod.objects.get(value='manual'),
             "cluster": cluster}

def Dataproduct_test_data(producer: models.SubtaskOutput=None,
                          filename: str="my_file.ext",
                          directory: str="/data/test-projects",
                          dataformat: models.Dataformat=None,
                          specifications_doc: object=None) -> dict:

    if producer is None:
        producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())

    if dataformat is None:
        dataformat = models.Dataformat.objects.get(value="MeasurementSet")

    if specifications_doc is None:
        specifications_doc={}

    return {"filename": filename,
            "directory": directory,
            "dataformat": dataformat,
            "deleted_since": None,
            "pinned_since": None,
            "specifications_doc": specifications_doc,
            "specifications_template": models.DataproductSpecificationsTemplate.objects.create(**DataproductSpecificationsTemplate_test_data()),
            "tags": ["TMSS", "TESTING"],
            "producer": producer,
            "do_cancel": None,
            "expected_size": 1234,
            "size": 123,
            "feedback_doc": {},
            "feedback_template": models.DataproductFeedbackTemplate.objects.create(**DataproductFeedbackTemplate_test_data())}

def AntennaSet_test_data() -> dict:
    return {"name": "observation",
            "description": 'My one observation',
            "station_type": models.StationType.objects.get(value='core'),
            "rcus": [1,2,3,4,5],
            "inputs": ['input1', 'input2'],
            "tags": ['tmss', 'testing']}


def DataproductTransform_test_data() -> dict:
    return {"input": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                        "output": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                        "identity": True,
                        "tags": ['tmss', 'testing']}

def Filesystem_test_data() -> dict:
    return {"capacity": 1111111111,
                        "cluster": models.Cluster.objects.create(**Cluster_test_data()),
                        "tags": ['tmss', 'testing']}

def Cluster_test_data(name="default cluster") -> dict:
    return {"name": name,
            "location": "upstairs",
            "tags": ['tmss', 'testing']}

def DataproductArchiveInfo_test_data() -> dict:
    return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
            "storage_ticket": "myticket_1",
            "public_since": datetime.utcnow().isoformat(),
            "corrupted_since": datetime.utcnow().isoformat(),
            "tags": ['tmss', 'testing']}

def DataproductHash_test_data() -> dict:
    return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
            "algorithm": models.Algorithm.objects.get(value='md5'),
            "hash": "myhash_1",
            "tags": ['tmss', 'testing']}