#!/usr/bin/env python3 # Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy) # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. ''' By importing this helper module in your unittest module you get a TMSSTestDatabaseInstance which is automatically destroyed at the end of the unittest session. ''' ####################################################### # the methods below can be used to create test data # naming convention is: <django_model_name>_test_data() ####################################################### from lofar.sas.tmss.tmss.tmssapp import models from lofar.common.json_utils import get_default_json_object_for_schema from lofar.sas.tmss.test.test_utils import minimal_json_schema from datetime import datetime, timedelta import uuid import json def GeneratorTemplate_test_data(name="my_GeneratorTemplate") -> dict: return {"name": name, "description": 'My one observation', "schema": minimal_json_schema(), "create_function": 'Funky', "tags": ["TMSS", "TESTING"]} def DefaultGeneratorTemplate_test_data(name=None, template=None) -> dict: return {'name': name if name is not None else "DefaultGeneratorTemplate_"+str(uuid.uuid4()), 'template': template, 'tags':[]} def SchedulingUnitTemplate_test_data(name="my_SchedulingUnitTemplate", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={ "foo" : { "type": "string", "default": "bar" } }, required=["foo"]) return {"name": name, "description": 'My SchedulingUnitTemplate description', "schema": schema, "tags": ["TMSS", "TESTING"]} def SchedulingConstraintsTemplate_test_data(name="my_SchedulingConstraintsTemplate", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={ "foo" : { "type": "string", "default": "bar" } }, required=["foo"]) return {"name": name, "description": 'My SchedulingConstraintsTemplate description', "schema": schema, "tags": ["TMSS", "TESTING"]} def SchedulingUnitObservingStrategyTemplate_test_data(name="my_SchedulingUnitObservingStrategyTemplate", scheduling_unit_template:models.SchedulingUnitTemplate=None, template:dict=None) -> dict: if scheduling_unit_template is None: scheduling_unit_template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data()) if template is None: template = get_default_json_object_for_schema(scheduling_unit_template.schema) return {"name": name, "description": 'My SchedulingUnitTemplate description', "template": template, "scheduling_unit_template": scheduling_unit_template, "tags": ["TMSS", "TESTING"]} def TaskTemplate_test_data(name="my TaskTemplate", description:str=None, schema:dict=None, task_type_value:str=None) -> dict: if schema is None: schema = minimal_json_schema(properties={"mykey": {}}) if task_type_value is None: task_type_value = 'observation' return {"type": models.TaskType.objects.get(value=task_type_value), "validation_code_js":"", "name": name, "description": description or "<no description>", "schema": schema, "tags": ["TMSS", "TESTING"]} def TaskRelationSelectionTemplate_test_data(name="my_TaskRelationSelectionTemplate") -> dict: return {"name": name, "description": 'My TaskRelationSelectionTemplate description', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def TaskConnectorType_test_data() -> dict: return {"role": models.Role.objects.get(value='calibrator'), "datatype": models.Datatype.objects.get(value='instrument model'), "dataformat": models.Dataformat.objects.get(value='Beamformed'), "task_template": models.TaskTemplate.objects.create(**TaskTemplate_test_data()), "iotype": models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value), "tags": []} def Cycle_test_data() -> dict: return {"name": 'my_cycle' + str(uuid.uuid4()), "description": "", "tags": [], "start": datetime.utcnow().isoformat(), "stop": datetime.utcnow().isoformat()} def Project_test_data(name: str=None, priority_rank: int = 1, auto_pin=False, can_trigger=False) -> dict: if name is None: name = 'my_project_' + str(uuid.uuid4()) return { #"cycles": [models.Cycle.objects.create(**Cycle_test_data())], # ManyToMany, use set() "name": name, "description": 'my description ' + str(uuid.uuid4()), "tags": [], "auto_ingest": False, "priority_rank": priority_rank, "trigger_priority": 1000, "can_trigger": can_trigger, "private_data": True, "expert": True, "filler": False, "auto_pin": auto_pin} def ResourceType_test_data(quantity: models.Quantity=None) -> dict: if quantity is None: quantity = models.Quantity.objects.get(value=models.Quantity.Choices.BYTES.value) return { "tags": [], "description": 'my description ' + str(uuid.uuid4()), "name": 'my_resource_type_' + str(uuid.uuid4()), "quantity": quantity } def ProjectQuota_test_data(value: int=1000, project: models.Project=None, resource_type: models.ResourceType=None) -> dict: if project is None: project = models.Project.objects.create(**Project_test_data()) if resource_type is None: resource_type = models.ResourceType.objects.create(**ResourceType_test_data()) return { "value": value, "project": project, "resource_type": resource_type } def ProjectQuotaArchiveLocation_test_data(project_quota: models.ProjectQuota=None, archive_location: models.Filesystem=None) -> dict: if project_quota is None: project_quota = models.ProjectQuota.objects.create(**ProjectQuota_test_data()) if archive_location is None: archive_location = models.Filesystem.objects.create(**Filesystem_test_data()) return { "project_quota": project_quota, "archive_location": archive_location } def SchedulingSet_test_data(name="my_scheduling_set", project: models.Project=None) -> dict: if project is None: project = models.Project.objects.create(**Project_test_data()) generator_template = models.GeneratorTemplate.objects.create(**GeneratorTemplate_test_data()) generator_doc = get_default_json_object_for_schema(generator_template.schema) return {"name": name, "description": "", "tags": [], "generator_doc": generator_doc, "project": project, "generator_template": generator_template, "generator_source": None} def SchedulingUnitDraft_test_data(name="my_scheduling_unit_draft", scheduling_set: models.SchedulingSet=None, template: models.SchedulingUnitTemplate=None, requirements_doc: dict=None, observation_strategy_template: models.SchedulingUnitObservingStrategyTemplate=None, scheduling_constraints_doc: dict=None, scheduling_constraints_template: models.SchedulingConstraintsTemplate=None, is_triggered=False) -> dict: if scheduling_set is None: scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data()) if template is None: template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data()) if requirements_doc is None: requirements_doc = get_default_json_object_for_schema(template.schema) if scheduling_constraints_template is None: scheduling_constraints_template = models.SchedulingConstraintsTemplate.objects.create(**SchedulingConstraintsTemplate_test_data()) if scheduling_constraints_doc is None: scheduling_constraints_doc = get_default_json_object_for_schema(scheduling_constraints_template.schema) return {"name": name, "description": "", "tags": [], "requirements_doc": requirements_doc, "copy_reason": models.CopyReason.objects.get(value='template'), "generator_instance_doc": "para", "copies": None, "scheduling_set": scheduling_set, "requirements_template": template, "observation_strategy_template": observation_strategy_template, "scheduling_constraints_template": scheduling_constraints_template, "scheduling_constraints_doc": scheduling_constraints_doc, "is_triggered": is_triggered} def TaskDraft_test_data(name: str=None, specifications_template: models.TaskTemplate=None, specifications_doc: dict=None, scheduling_unit_draft: models.SchedulingUnitDraft=None, output_pinned=False) -> dict: if name is None: name = "my_task_draft_" + str(uuid.uuid4()) if specifications_template is None: specifications_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data()) if specifications_doc is None: specifications_doc = get_default_json_object_for_schema(specifications_template.schema) if scheduling_unit_draft is None: scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()) return {"name": name, "description": "", "tags": [], "specifications_doc": specifications_doc, "copy_reason": models.CopyReason.objects.get(value='template'), "copies": None, "scheduling_unit_draft": scheduling_unit_draft, "specifications_template": specifications_template, "output_pinned": output_pinned} def TaskRelationDraft_test_data(producer: models.TaskDraft = None, consumer: models.TaskDraft = None) -> dict: if producer is None: producer = models.TaskDraft.objects.create(**TaskDraft_test_data()) if consumer is None: consumer = models.TaskDraft.objects.create(**TaskDraft_test_data()) return {"tags": [], "selection_doc": {}, "producer": producer, "consumer": consumer, "input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()), "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()), "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data())} def SchedulingUnitBlueprint_test_data(name=None, requirements_template: models.SchedulingUnitTemplate=None, draft=None, output_pinned=None) -> dict: if name is None: name = 'my_scheduling_unit_blueprint_' + str(uuid.uuid4()) if requirements_template is None: requirements_template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data()) if draft is None: draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()) if output_pinned is None: output_pinned = False return {"name": name, "description": "", "tags": [], "requirements_doc": get_default_json_object_for_schema(requirements_template.schema), "requirements_template": requirements_template, "do_cancel": False, "draft": draft, "output_pinned": output_pinned} def TaskBlueprint_test_data(name: str=None, task_draft: models.TaskDraft = None, scheduling_unit_blueprint: models.SchedulingUnitBlueprint = None, specifications_template: models.TaskTemplate=None, specifications_doc: dict=None, output_pinned=False) -> dict: if name is None: name = 'my_task_blueprint_'+str(uuid.uuid4()) if scheduling_unit_blueprint is None: scheduling_unit_blueprint = models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data()) if task_draft is None: task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data(scheduling_unit_draft=scheduling_unit_blueprint.draft)) if specifications_template is None: specifications_template = task_draft.specifications_template if specifications_doc is None: specifications_doc = task_draft.specifications_doc return {"name": name, "description": "", "tags": [], "specifications_doc": specifications_doc, "do_cancel": False, "draft": task_draft, "specifications_template": specifications_template, "scheduling_unit_blueprint": scheduling_unit_blueprint, "output_pinned": output_pinned} def TaskRelationBlueprint_test_data(producer: models.TaskBlueprint = None, consumer: models.TaskBlueprint = None) -> dict: if producer is None: producer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) if consumer is None: consumer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) return {"tags": [], "selection_doc": {}, "input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()), "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()), "draft": models.TaskRelationDraft.objects.create(**TaskRelationDraft_test_data()), "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()), "producer": producer, "consumer": consumer} def SubtaskTemplate_test_data(schema: object=None, subtask_type_value:str='observation') -> dict: if schema is None: schema = minimal_json_schema() return {"type": models.SubtaskType.objects.get(value=subtask_type_value), "name": subtask_type_value + " template", "description": '<description>', "schema": schema, "realtime": True, "queue": False, "tags": ["TMSS", "TESTING"]} def TaskSchedulingRelationDraft_test_data(first: models.TaskDraft = None, second: models.TaskDraft = None) -> dict: if first is None: first = models.TaskDraft.objects.create(**TaskDraft_test_data()) if second is None: second = models.TaskDraft.objects.create(**TaskDraft_test_data()) return {"tags": [], "first": first, "second": second, "placement": models.SchedulingRelationPlacement.objects.get(value='after'), "time_offset":60} def TaskSchedulingRelationBlueprint_test_data(first: models.TaskBlueprint = None, second: models.TaskBlueprint = None) -> dict: if first is None: first = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) if second is None: second = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data()) return {"tags": [], "first": first, "second": second, "placement": models.SchedulingRelationPlacement.objects.get(value='after'), "time_offset":60} def DataproductSpecificationsTemplate_test_data() -> dict: return {"name": "data", "description": 'My one date', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def DataproductFeedbackTemplate_test_data() -> dict: return {"name": "data", "description": 'My one date', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def SubtaskOutput_test_data(subtask: models.Subtask=None, task_blueprint: models.TaskBlueprint=None) -> dict: if subtask is None: subtask = models.Subtask.objects.create(**Subtask_test_data()) if task_blueprint is None: task_blueprint = models. TaskBlueprint.objects.create(**TaskBlueprint_test_data(())) return {"subtask": subtask, "task_blueprint": task_blueprint, "tags":[]} def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None, selection_template: models.TaskRelationSelectionTemplate=None) -> dict: if subtask is None: subtask = models.Subtask.objects.create(**Subtask_test_data()) if producer is None: producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data()) if selection_template is None: selection_template = models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()) if selection_doc is None: selection_doc = get_default_json_object_for_schema(selection_template.schema) return {"subtask": subtask, "task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()), "producer": producer, "selection_doc": selection_doc, "selection_template": selection_template, "tags":[]} def Subtask_test_data(subtask_template: models.SubtaskTemplate=None, specifications_doc: dict=None, start_time=None, stop_time=None, cluster=None, state=None, raw_feedback=None) -> dict: if subtask_template is None: subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data()) if specifications_doc is None: specifications_doc = get_default_json_object_for_schema(subtask_template.schema) # Type need to be a datetime object not a str so do not add .isoformat() if start_time is None: start_time = datetime.utcnow() if stop_time is None: stop_time = datetime.utcnow() + timedelta(minutes=10) if cluster is None: cluster = models.Cluster.objects.create(name="dummy cluster", location="downstairs", archive_site=True, tags=[]) if state is None: state = models.SubtaskState.objects.get(value='defining') return { "start_time": start_time, "stop_time": stop_time, "state": state, "specifications_doc": specifications_doc, #"task_blueprint": task_blueprint, # ManyToMany, use set() "specifications_template": subtask_template, "tags": ["TMSS", "TESTING"], "do_cancel": datetime.utcnow(), "cluster": cluster, "raw_feedback": raw_feedback, "global_identifier": models.SIPidentifier.objects.create(source="TMSS")} def Dataproduct_test_data(producer: models.SubtaskOutput=None, filename: str=None, directory: str=None, dataformat: models.Dataformat=None, datatype: models.Datatype=None, specifications_doc: object=None, specifications_template: models.DataproductSpecificationsTemplate=None, feedback_doc: object = None, feedback_template: models.DataproductFeedbackTemplate = None) -> dict: if filename is None: filename = "my_file_%s.ext" % uuid.uuid4() if directory is None: directory = "/tmp/test_data/%s/" % uuid.uuid4() if producer is None: producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data()) if dataformat is None: dataformat = models.Dataformat.objects.get(value="MeasurementSet") if datatype is None: datatype = models.Datatype.objects.get(value="visibilities") if specifications_template is None: specifications_template = models.DataproductSpecificationsTemplate.objects.create(**DataproductSpecificationsTemplate_test_data()) if specifications_doc is None: specifications_doc = get_default_json_object_for_schema(specifications_template.schema) if feedback_template is None: feedback_template = models.DataproductFeedbackTemplate.objects.create(**DataproductFeedbackTemplate_test_data()) if feedback_doc is None: feedback_doc = get_default_json_object_for_schema(feedback_template.schema) return {"filename": filename, "directory": directory, "dataformat": dataformat, "datatype": datatype, "deleted_since": None, "specifications_doc": specifications_doc, "specifications_template": specifications_template, "tags": ["TMSS", "TESTING"], "producer": producer, "do_cancel": None, "expected_size": 1234, "size": 123, "feedback_doc": feedback_doc, "feedback_template": feedback_template, "sap": models.SAP.objects.create(**SAP_test_data()), "global_identifier": models.SIPidentifier.objects.create(source="TMSS")} def AntennaSet_test_data() -> dict: return {"name": "observation", "description": 'My one observation', "station_type": models.StationType.objects.get(value='core'), "rcus": [1,2,3,4,5], "inputs": ['input1', 'input2'], "tags": ['tmss', 'testing']} def DataproductTransform_test_data() -> dict: return {"input": models.Dataproduct.objects.create(**Dataproduct_test_data()), "output": models.Dataproduct.objects.create(**Dataproduct_test_data()), "identity": True, "tags": ['tmss', 'testing']} def Filesystem_test_data(directory: str="/", cluster: models.Cluster=None) -> dict: if cluster is None: cluster = models.Cluster.objects.create(**Cluster_test_data()) return {"capacity": 1111111111, "cluster": cluster, "directory": directory, "tags": ['tmss', 'testing']} def Cluster_test_data(name: str="default cluster", archive_site: bool=True) -> dict: return {"name": name, "location": "upstairs", "archive_site": archive_site, "tags": ['tmss', 'testing']} def DataproductArchiveInfo_test_data() -> dict: return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()), "storage_ticket": "myticket_1", "public_since": datetime.utcnow().isoformat(), "corrupted_since": datetime.utcnow().isoformat(), "tags": ['tmss', 'testing']} def DataproductHash_test_data() -> dict: return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()), "hash_algorithm": models.HashAlgorithm.objects.get(value='md5'), "hash": "myhash_1", "tags": ['tmss', 'testing']} def SAP_test_data(specifications_template=None, specifications_doc=None) -> dict: if specifications_template is None: specifications_template = models.SAPTemplate.objects.create(**SAPTemplate_test_data()) if specifications_doc is None: specifications_doc = get_default_json_object_for_schema(specifications_template.schema) return {"specifications_doc": specifications_doc, "specifications_template": specifications_template, "global_identifier": models.SIPidentifier.objects.create(source="TMSS"), "tags": ['tmss', 'testing']} def SAPTemplate_test_data() -> dict: return {"name": "my_sap_template" + str(uuid.uuid4()), "description": 'My SAP test template', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def ReservationTemplate_test_data(name="my_ReservationTemplate", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={ "foo" : { "type": "string", "default": "bar" } }, required=["foo"]) return {"name": name, "description": 'My ReservationTemplate description', "schema": schema, "tags": ["TMSS", "TESTING"]} def Reservation_test_data(name="MyReservation", duration=None, start_time=None, project: models.Project = None) -> dict: if project is None: project = models.Project.objects.create(**Project_test_data()) if start_time is None: start_time = datetime.utcnow() + timedelta(hours=12) if duration is None: stop_time = None else: stop_time = start_time + timedelta(seconds=duration) specifications_template = models.ReservationTemplate.objects.create(**ReservationTemplate_test_data()) specifications_doc = get_default_json_object_for_schema(specifications_template.schema) return {"name": name, "project": project, "description": "Test Reservation", "tags": ["TMSS", "TESTING"], "start_time": start_time, "stop_time": stop_time, # can be None "specifications_doc": specifications_doc, "specifications_template": specifications_template} def ReservationStrategyTemplate_test_data(name="my_ReservationStrategyTemplate", reservation_template:models.ReservationTemplate=None, template:dict=None) -> dict: if reservation_template is None: reservation_template = models.ReservationTemplate.objects.create(**ReservationTemplate_test_data()) if template is None: template = get_default_json_object_for_schema(reservation_template.schema) return {"name": name, "description": 'My Reservation Template description', "template": template, "reservation_template": reservation_template, "tags": ["TMSS", "TESTING"]} def ProjectPermission_test_data(name=None, GET=None, PUT=None, POST=None, PATCH=None, DELETE=None) -> dict: if name is None: name = 'MyProjectPermission_%s' % uuid.uuid4() return{'name': name, 'GET': GET or [], 'PUT': PUT or [], 'PATCH': PATCH or [], 'DELETE': DELETE or [], 'POST': POST or []}