Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_test_data_django_models.py 17.40 KiB
#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
'''
By importing this helper module in your unittest module you get a TMSSTestDatabaseInstance
which is automatically destroyed at the end of the unittest session.
'''
#######################################################
# the methods below can be used to create test data
# naming convention is: <django_model_name>_test_data()
#######################################################
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.common.json_utils import get_default_json_object_for_schema
from datetime import datetime
import uuid
import json
def GeneratorTemplate_test_data(name="my_GeneratorTemplate", version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"name": name,
"description": 'My one observation',
"version": version,
"schema": {"mykey": "my value"},
"create_function": 'Funky',
"tags": ["TMSS", "TESTING"]}
def DefaultGeneratorTemplate_test_data(name=None, template=None) -> dict:
return {'name': name if name is not None else "DefaultGeneratorTemplate_"+str(uuid.uuid4()),
'template': template,
'tags':[]}
def SchedulingUnitTemplate_test_data(name="my_SchedulingUnitTemplate", version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"name": name,
"description": 'My SchedulingUnitTemplate description',
"version": version,
"schema": {"mykey": "my value"},
"tags": ["TMSS", "TESTING"]}
def TaskTemplate_test_data(name="my TaskTemplate", version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"validation_code_js":"",
"name": name,
"description": 'My TaskTemplate description',
"version": version,
"schema": {"mykey": "my value"},
"tags": ["TMSS", "TESTING"]}
def TaskRelationSelectionTemplate_test_data(name="my_TaskRelationSelectionTemplate", version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"name": name,
"description": 'My TaskRelationSelectionTemplate description',
"version": version,
"schema": {"mykey": "my value"},
"tags": ["TMSS", "TESTING"]}
def TaskConnectorType_test_data() -> dict:
return {"role": models.Role.objects.get(value='calibrator'),
"datatype": models.Datatype.objects.get(value='instrument model'),
"output_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
"input_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
"tags": []}
def Cycle_test_data() -> dict:
return {"name": 'my_cycle' + str(uuid.uuid4()),
"description": "",
"tags": [],
"start": datetime.utcnow().isoformat(),
"stop": datetime.utcnow().isoformat()}
def Project_test_data() -> dict:
return { #"cycles": [models.Cycle.objects.create(**Cycle_test_data())], # ManyToMany, use set()
"name": 'my_project_' + str(uuid.uuid4()),
"description": 'my description ' + str(uuid.uuid4()),
"tags": [],
"priority_rank": 1.0,
"trigger_priority": 1000,
"can_trigger": False,
"private_data": True,
"expert": True,
"filler": False}
def ResourceType_test_data() -> dict:
return {
"tags": [],
"description": 'my description ' + str(uuid.uuid4()),
"name": 'my_resource_type_' + str(uuid.uuid4()),
"quantity": models.Quantity.objects.get(value=models.Quantity.Choices.NUMBER.value)
}
def ProjectQuota_test_data() -> dict:
return {
"value": '1000',
"project": models.Project.objects.create(**Project_test_data()),
"resource_type": models.ResourceType.objects.create(**ResourceType_test_data())
}
def SchedulingSet_test_data(name="my_scheduling_set", project: models.Project=None) -> dict:
if project is None:
project = models.Project.objects.create(**Project_test_data())
return {"name": name,
"description": "",
"tags": [],
"generator_doc": {},
"project": project,
"generator_template": models.GeneratorTemplate.objects.create(**GeneratorTemplate_test_data()),
"generator_source": None}
def SchedulingUnitDraft_test_data(name="my_scheduling_unit_draft", scheduling_set: models.SchedulingSet=None, template: models.SchedulingUnitTemplate=None, requirements_doc: dict=None) -> dict:
if scheduling_set is None:
scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
if template is None:
template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())
if requirements_doc is None:
requirements_doc = get_default_json_object_for_schema(template.schema)
return {"name": name,
"description": "",
"tags": [],
"requirements_doc": requirements_doc,
"copy_reason": models.CopyReason.objects.get(value='template'),
"generator_instance_doc": "para",
"copies": None,
"scheduling_set": scheduling_set,
"requirements_template": template }
def TaskDraft_test_data(name: str="my_task_draft", specifications_template: models.TaskTemplate=None, specifications_doc: dict=None, scheduling_unit_draft: models.SchedulingUnitDraft=None) -> dict:
if specifications_template is None:
specifications_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data())
if specifications_doc is None:
specifications_doc = get_default_json_object_for_schema(specifications_template.schema)
if scheduling_unit_draft is None:
scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data())
return {"name": name,
"description": "",
"tags": [],
"specifications_doc": specifications_doc,
"copy_reason": models.CopyReason.objects.get(value='template'),
"copies": None,
"scheduling_unit_draft": scheduling_unit_draft,
"specifications_template": specifications_template }
def TaskRelationDraft_test_data(producer: models.TaskDraft = None, consumer: models.TaskDraft = None) -> dict:
if producer is None:
producer = models.TaskDraft.objects.create(**TaskDraft_test_data())
if consumer is None:
consumer = models.TaskDraft.objects.create(**TaskDraft_test_data())
return {"tags": [],
"selection_doc": {},
"dataformat": models.Dataformat.objects.get(value='Beamformed'),
"producer": producer,
"consumer": consumer,
"input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
"output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
"selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data())}
def SchedulingUnitBlueprint_test_data(name='my_scheduling_unit_blueprint') -> dict:
return {"name": name,
"description": "",
"tags": [],
"requirements_doc": {},
"do_cancel": False,
"draft": models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()),
"requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())}
def TaskBlueprint_test_data(name='my_task_blueprint', task_draft: models.TaskDraft = None) -> dict:
if task_draft is None:
task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data())
return {"name": name,
"description": "",
"tags": [],
"specifications_doc": task_draft.specifications_doc,
"do_cancel": False,
"draft": task_draft,
"specifications_template": task_draft.specifications_template,
"scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data())}
def TaskRelationBlueprint_test_data(producer: models.TaskBlueprint = None, consumer: models.TaskBlueprint = None) -> dict:
if producer is None:
producer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
if consumer is None:
consumer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
return {"tags": [],
"selection_doc": {},
"dataformat": models.Dataformat.objects.get(value='Beamformed'),
"input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
"output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
"draft": models.TaskRelationDraft.objects.create(**TaskRelationDraft_test_data()),
"selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
"producer": producer,
"consumer": consumer}
def SubtaskTemplate_test_data(schema: object=None, version:str=None) -> dict:
if schema is None:
schema = {}
if version is None:
version = str(uuid.uuid4())
return {"type": models.SubtaskType.objects.get(value='copy'),
"name": "observation",
"description": 'My one observation',
"version": version,
"schema": schema,
"realtime": True,
"queue": False,
"tags": ["TMSS", "TESTING"]}
def TaskSchedulingRelationDraft_test_data(first: models.TaskDraft = None, second: models.TaskDraft = None) -> dict:
if first is None:
first = models.TaskDraft.objects.create(**TaskDraft_test_data())
if second is None:
second = models.TaskDraft.objects.create(**TaskDraft_test_data())
return {"tags": [],
"first": first,
"second": second,
"placement": models.SchedulingRelationPlacement.objects.get(value='after'),
"time_offset":60}
def TaskSchedulingRelationBlueprint_test_data(first: models.TaskBlueprint = None, second: models.TaskBlueprint = None) -> dict:
if first is None:
first = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
if second is None:
second = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
return {"tags": [],
"first": first,
"second": second,
"placement": models.SchedulingRelationPlacement.objects.get(value='after'),
"time_offset":60}
def DataproductSpecificationsTemplate_test_data(version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"name": "data",
"description": 'My one date',
"version": version,
"schema": {"mykey": "my value"},
"tags": ["TMSS", "TESTING"]}
def DataproductFeedbackTemplate_test_data(version:str=None) -> dict:
if version is None:
version = str(uuid.uuid4())
return {"name": "data",
"description": 'My one date',
"version": version,
"schema": {"mykey": "my value"},
"tags": ["TMSS", "TESTING"]}
def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict:
if subtask is None:
subtask = models.Subtask.objects.create(**Subtask_test_data())
return {"subtask": subtask,
"tags":[]}
def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None) -> dict:
if subtask is None:
subtask = models.Subtask.objects.create(**Subtask_test_data())
if producer is None:
producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())
if selection_doc is None:
selection_doc = {}
return {"subtask": subtask,
"task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()),
"producer": producer,
"selection_doc": selection_doc,
"selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
"tags":[]}
def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_template: models.SubtaskTemplate=None,
specifications_doc: dict=None, start_time=None, stop_time=None, cluster=None, state=None) -> dict:
if task_blueprint is None:
task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
if subtask_template is None:
subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data())
if specifications_doc is None:
specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
# Type need to be a datetime object not a str so do not add .isoformat()
if start_time is None:
start_time = datetime.utcnow()
if stop_time is None:
stop_time = datetime.utcnow()
if cluster is None:
cluster = models.Cluster.objects.create(name="dummy cluster", location="downstairs", tags=[])
if state is None:
state = models.SubtaskState.objects.get(value='defining')
return { "start_time": start_time,
"stop_time": stop_time,
"state": state,
"specifications_doc": specifications_doc,
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"tags": ["TMSS", "TESTING"],
"do_cancel": datetime.utcnow(),
"priority": 1,
"schedule_method": models.ScheduleMethod.objects.get(value='manual'),
"cluster": cluster}
def Dataproduct_test_data(producer: models.SubtaskOutput=None,
filename: str="my_file.ext",
directory: str="/data/test-projects",
dataformat: models.Dataformat=None,
specifications_doc: object=None) -> dict:
if producer is None:
producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())
if dataformat is None:
dataformat = models.Dataformat.objects.get(value="MeasurementSet")
if specifications_doc is None:
specifications_doc={}
return {"filename": filename,
"directory": directory,
"dataformat": dataformat,
"deleted_since": None,
"pinned_since": None,
"specifications_doc": specifications_doc,
"specifications_template": models.DataproductSpecificationsTemplate.objects.create(**DataproductSpecificationsTemplate_test_data()),
"tags": ["TMSS", "TESTING"],
"producer": producer,
"do_cancel": None,
"expected_size": 1234,
"size": 123,
"feedback_doc": {},
"feedback_template": models.DataproductFeedbackTemplate.objects.create(**DataproductFeedbackTemplate_test_data())}
def AntennaSet_test_data() -> dict:
return {"name": "observation",
"description": 'My one observation',
"station_type": models.StationType.objects.get(value='core'),
"rcus": [1,2,3,4,5],
"inputs": ['input1', 'input2'],
"tags": ['tmss', 'testing']}
def DataproductTransform_test_data() -> dict:
return {"input": models.Dataproduct.objects.create(**Dataproduct_test_data()),
"output": models.Dataproduct.objects.create(**Dataproduct_test_data()),
"identity": True,
"tags": ['tmss', 'testing']}
def Filesystem_test_data() -> dict:
return {"capacity": 1111111111,
"cluster": models.Cluster.objects.create(**Cluster_test_data()),
"tags": ['tmss', 'testing']}
def Cluster_test_data(name="default cluster") -> dict:
return {"name": name,
"location": "upstairs",
"tags": ['tmss', 'testing']}
def DataproductArchiveInfo_test_data() -> dict:
return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
"storage_ticket": "myticket_1",
"public_since": datetime.utcnow().isoformat(),
"corrupted_since": datetime.utcnow().isoformat(),
"tags": ['tmss', 'testing']}
def DataproductHash_test_data() -> dict:
return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
"algorithm": models.Algorithm.objects.get(value='md5'),
"hash": "myhash_1",
"tags": ['tmss', 'testing']}