Skip to content
Snippets Groups Projects
Select Git revision
  • 4c3acc8792b96e0b7430f9774c9f7906d71219bc
  • master default protected
  • sdptr_lift
  • v1.5.0
  • v1.4.0
  • v1.3.0
  • v1.2.0
  • v1.1.2
  • v1.1.1
  • v1.1.0
  • v1.0.0
11 results

fpga.cpp

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tmss_test_data_django_models.py 19.51 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    '''
    By importing this helper module in your unittest module you get a TMSSTestDatabaseInstance
    which is automatically destroyed at the end of the unittest session.
    '''
    
    #######################################################
    # the methods below can be used to create test data
    # naming convention is: <django_model_name>_test_data()
    #######################################################
    
    from lofar.sas.tmss.tmss.tmssapp import models
    from lofar.common.json_utils import get_default_json_object_for_schema
    
    from datetime import datetime, timedelta
    import uuid
    import json
    
    def GeneratorTemplate_test_data(name="my_GeneratorTemplate", version:str=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "create_function": 'Funky',
                "tags": ["TMSS", "TESTING"]}
    
    def DefaultGeneratorTemplate_test_data(name=None, template=None) -> dict:
        return {'name': name if name is not None else "DefaultGeneratorTemplate_"+str(uuid.uuid4()),
                'template': template,
                'tags':[]}
    
    def SchedulingUnitTemplate_test_data(name="my_SchedulingUnitTemplate", version:str=None, schema:dict=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        if schema is None:
            schema = { "$schema": "https://json-schema.org/draft/2019-09/schema",
                       "type": "object",
                       "properties": { "foo" : { "type": "string", "default": "bar" } },
                       "required": ["foo"],
                       "default": {}
                       }
    
        return {"name": name,
                "description": 'My SchedulingUnitTemplate description',
                "version": version,
                "schema": schema,
                "tags": ["TMSS", "TESTING"]}
    
    def SchedulingUnitObservingStrategyTemplate_test_data(name="my_SchedulingUnitObservingStrategyTemplate", version:str=None,
                                                          scheduling_unit_template:models.SchedulingUnitTemplate=None,
                                                          template:dict=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        if scheduling_unit_template is None:
            scheduling_unit_template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())
    
        if template is None:
            template = get_default_json_object_for_schema(scheduling_unit_template.schema)
    
        return {"name": name,
                "description": 'My SchedulingUnitTemplate description',
                "version": version,
                "template": template,
                "scheduling_unit_template": scheduling_unit_template,
                "tags": ["TMSS", "TESTING"]}
    
    def TaskTemplate_test_data(name="my TaskTemplate", version:str=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        return {"type": models.TaskType.objects.get(value='observation'),
                "validation_code_js":"",
                "name": name,
                "description": 'My TaskTemplate description',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}
    
    def TaskRelationSelectionTemplate_test_data(name="my_TaskRelationSelectionTemplate", version:str=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        return  {"name": name,
                   "description": 'My TaskRelationSelectionTemplate description',
                   "version": version,
                   "schema": {"mykey": "my value"},
                   "tags": ["TMSS", "TESTING"]}
    
    def TaskConnectorType_test_data() -> dict:
        return {"role": models.Role.objects.get(value='calibrator'),
                "datatype": models.Datatype.objects.get(value='instrument model'),
                "output_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
                "input_of": models.TaskTemplate.objects.create(**TaskTemplate_test_data()),
                "tags": []}
    
    def Cycle_test_data() -> dict:
        return {"name": 'my_cycle' + str(uuid.uuid4()),
                "description": "",
                "tags": [],
                "start": datetime.utcnow().isoformat(),
                "stop": datetime.utcnow().isoformat()}
    
    def Project_test_data(archive_subdirectory="my_project/") -> dict:
        return  { #"cycles": [models.Cycle.objects.create(**Cycle_test_data())], # ManyToMany, use set()
                  "name": 'my_project_' + str(uuid.uuid4()),
                   "description": 'my description ' + str(uuid.uuid4()),
                   "tags": [],
                   "priority_rank": 1.0,
                   "trigger_priority": 1000,
                   "can_trigger": False,
                   "private_data": True,
                   "expert": True,
                   "filler": False,
                   "archive_subdirectory": archive_subdirectory}
    
    def ResourceType_test_data() -> dict:
        return  {
            "tags": [],
            "description": 'my description ' + str(uuid.uuid4()),
            "name": 'my_resource_type_' + str(uuid.uuid4()),
            "quantity": models.Quantity.objects.get(value=models.Quantity.Choices.NUMBER.value)
         }
    
    def ProjectQuota_test_data() -> dict:
       return  { 
            "value": '1000',
            "project": models.Project.objects.create(**Project_test_data()),
            "resource_type": models.ResourceType.objects.create(**ResourceType_test_data())
        }
      
    def SchedulingSet_test_data(name="my_scheduling_set", project: models.Project=None) -> dict:
        if project is None:
            project = models.Project.objects.create(**Project_test_data())
    
        return {"name": name,
                "description": "",
                "tags": [],
                "generator_doc": {},
                "project": project,
                "generator_template": models.GeneratorTemplate.objects.create(**GeneratorTemplate_test_data()),
                "generator_source": None}
    
    def SchedulingUnitDraft_test_data(name="my_scheduling_unit_draft", scheduling_set: models.SchedulingSet=None,
                                      template: models.SchedulingUnitTemplate=None, requirements_doc: dict=None,
                                      observation_strategy_template: models.SchedulingUnitObservingStrategyTemplate=None) -> dict:
        if scheduling_set is None:
            scheduling_set = models.SchedulingSet.objects.create(**SchedulingSet_test_data())
    
        if template is None:
            template = models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())
    
        if requirements_doc is None:
            requirements_doc = get_default_json_object_for_schema(template.schema)
    
        if observation_strategy_template is None:
            observation_strategy_template = models.SchedulingUnitObservingStrategyTemplate.objects.create(**SchedulingUnitObservingStrategyTemplate_test_data())
    
        return {"name": name,
                "description": "",
                "tags": [],
                "requirements_doc": requirements_doc,
                "copy_reason": models.CopyReason.objects.get(value='template'),
                "generator_instance_doc": "para",
                "copies": None,
                "scheduling_set": scheduling_set,
                "requirements_template": template,
                "observation_strategy_template": observation_strategy_template }
    
    def TaskDraft_test_data(name: str="my_task_draft", specifications_template: models.TaskTemplate=None, specifications_doc: dict=None, scheduling_unit_draft: models.SchedulingUnitDraft=None) -> dict:
        if specifications_template is None:
            specifications_template = models.TaskTemplate.objects.create(**TaskTemplate_test_data())
    
        if specifications_doc is None:
            specifications_doc = get_default_json_object_for_schema(specifications_template.schema)
    
        if scheduling_unit_draft is None:
            scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data())
    
        return {"name": name,
                "description": "",
                "tags": [],
                "specifications_doc": specifications_doc,
                "copy_reason": models.CopyReason.objects.get(value='template'),
                "copies": None,
                "scheduling_unit_draft": scheduling_unit_draft,
                "specifications_template": specifications_template }
    
    def TaskRelationDraft_test_data(producer: models.TaskDraft = None, consumer: models.TaskDraft = None) -> dict:
        if producer is None:
            producer = models.TaskDraft.objects.create(**TaskDraft_test_data())
    
        if consumer is None:
            consumer = models.TaskDraft.objects.create(**TaskDraft_test_data())
    
        return {"tags": [],
                "selection_doc": {},
                "dataformat": models.Dataformat.objects.get(value='Beamformed'),
                "producer": producer,
                "consumer": consumer,
                "input_role":  models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
                "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
                "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data())}
    
    def SchedulingUnitBlueprint_test_data(name='my_scheduling_unit_blueprint') -> dict:
        return {"name": name,
                "description": "",
                "tags": [],
                "requirements_doc": {},
                "do_cancel": False,
                "draft": models.SchedulingUnitDraft.objects.create(**SchedulingUnitDraft_test_data()),
                "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplate_test_data())}
    
    def TaskBlueprint_test_data(name='my_task_blueprint', task_draft: models.TaskDraft = None) -> dict:
        if task_draft is None:
            task_draft = models.TaskDraft.objects.create(**TaskDraft_test_data())
    
        return {"name": name,
                "description": "",
                "tags": [],
                "specifications_doc": task_draft.specifications_doc,
                "do_cancel": False,
                "draft": task_draft,
                "specifications_template": task_draft.specifications_template,
                "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**SchedulingUnitBlueprint_test_data())}
    
    def TaskRelationBlueprint_test_data(producer: models.TaskBlueprint = None, consumer: models.TaskBlueprint = None) -> dict:
        if producer is None:
            producer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
    
        if consumer is None:
            consumer = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
    
        return {"tags": [],
                "selection_doc": {},
                "dataformat": models.Dataformat.objects.get(value='Beamformed'),
                "input_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
                "output_role": models.TaskConnectorType.objects.create(**TaskConnectorType_test_data()),
                "draft": models.TaskRelationDraft.objects.create(**TaskRelationDraft_test_data()),
                "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
                "producer": producer,
                "consumer": consumer}
    
    
    def SubtaskTemplate_test_data(schema: object=None, version:str=None) -> dict:
        if schema is None:
            schema = {}
    
        if version is None:
            version = str(uuid.uuid4())
    
        return {"type": models.SubtaskType.objects.get(value='copy'),
                "name": "observation",
                "description": 'My one observation',
                "version": version,
                "schema": schema,
                "realtime": True,
                "queue": False,
                "tags": ["TMSS", "TESTING"]}
    
    def TaskSchedulingRelationDraft_test_data(first: models.TaskDraft = None, second: models.TaskDraft = None) -> dict:
        if first is None:
            first = models.TaskDraft.objects.create(**TaskDraft_test_data())
    
        if second is None:
            second = models.TaskDraft.objects.create(**TaskDraft_test_data())
        return {"tags": [],
                "first": first,
                "second": second,
                "placement": models.SchedulingRelationPlacement.objects.get(value='after'),
                "time_offset":60}
    
    def TaskSchedulingRelationBlueprint_test_data(first: models.TaskBlueprint = None, second: models.TaskBlueprint = None) -> dict:
        if first is None:
            first = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
    
        if second is None:
            second = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
    
        return {"tags": [],
                "first": first,
                "second": second,
                "placement": models.SchedulingRelationPlacement.objects.get(value='after'),
                "time_offset":60}
    
    def DataproductSpecificationsTemplate_test_data(version:str=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        return {"name": "data",
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}
    
    def DataproductFeedbackTemplate_test_data(version:str=None) -> dict:
        if version is None:
            version = str(uuid.uuid4())
    
        return {"name": "data",
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}
    
    def SubtaskOutput_test_data(subtask: models.Subtask=None) -> dict:
        if subtask is None:
            subtask = models.Subtask.objects.create(**Subtask_test_data())
    
        return {"subtask": subtask,
                "tags":[]}
    
    def SubtaskInput_test_data(subtask: models.Subtask=None, producer: models.SubtaskOutput=None, selection_doc=None) -> dict:
        if subtask is None:
            subtask = models.Subtask.objects.create(**Subtask_test_data())
    
        if producer is None:
            producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())
    
        if selection_doc is None:
            selection_doc = {}
    
        return {"subtask": subtask,
                "task_relation_blueprint": models.TaskRelationBlueprint.objects.create(**TaskRelationBlueprint_test_data()),
                "producer": producer,
                "selection_doc": selection_doc,
                "selection_template": models.TaskRelationSelectionTemplate.objects.create(**TaskRelationSelectionTemplate_test_data()),
                "tags":[]}
    
    def Subtask_test_data(task_blueprint: models.TaskBlueprint=None, subtask_template: models.SubtaskTemplate=None,
                          specifications_doc: dict=None, start_time=None, stop_time=None, cluster=None, state=None) -> dict:
    
        if task_blueprint is None:
            task_blueprint = models.TaskBlueprint.objects.create(**TaskBlueprint_test_data())
    
        if subtask_template is None:
            subtask_template = models.SubtaskTemplate.objects.create(**SubtaskTemplate_test_data())
    
        if specifications_doc is None:
            specifications_doc = get_default_json_object_for_schema(subtask_template.schema)
    
         # Type need to be a datetime object not a str so do not add .isoformat()
        if start_time is None:
            start_time = datetime.utcnow()
    
        if stop_time is None:
            stop_time = datetime.utcnow() + timedelta(minutes=10)
    
        if cluster is None:
            cluster = models.Cluster.objects.create(name="dummy cluster", location="downstairs", archive_site=True, tags=[])
    
        if state is None:
            state = models.SubtaskState.objects.get(value='defining')
    
        return { "start_time": start_time,
                 "stop_time": stop_time,
                 "state": state,
                 "specifications_doc": specifications_doc,
                 "task_blueprint": task_blueprint,
                 "specifications_template": subtask_template,
                 "tags": ["TMSS", "TESTING"],
                 "do_cancel": datetime.utcnow(),
                 "priority": 1,
                 "schedule_method": models.ScheduleMethod.objects.get(value='manual'),
                 "cluster": cluster}
    
    def Dataproduct_test_data(producer: models.SubtaskOutput=None,
                              filename: str="my_file.ext",
                              directory: str="/data/test-projects",
                              dataformat: models.Dataformat=None,
                              datatype: models.Datatype=None,
                              specifications_doc: object=None) -> dict:
    
        if producer is None:
            producer = models.SubtaskOutput.objects.create(**SubtaskOutput_test_data())
    
        if dataformat is None:
            dataformat = models.Dataformat.objects.get(value="MeasurementSet")
    
        if datatype is None:
            datatype = models.Datatype.objects.get(value="visibilities")
    
        if specifications_doc is None:
            specifications_doc={}
    
        return {"filename": filename,
                "directory": directory,
                "dataformat": dataformat,
                "datatype": datatype,
                "deleted_since": None,
                "pinned_since": None,
                "specifications_doc": specifications_doc,
                "specifications_template": models.DataproductSpecificationsTemplate.objects.create(**DataproductSpecificationsTemplate_test_data()),
                "tags": ["TMSS", "TESTING"],
                "producer": producer,
                "do_cancel": None,
                "expected_size": 1234,
                "size": 123,
                "feedback_doc": {},
                "feedback_template": models.DataproductFeedbackTemplate.objects.create(**DataproductFeedbackTemplate_test_data())}
    
    def AntennaSet_test_data() -> dict:
        return {"name": "observation",
                "description": 'My one observation',
                "station_type": models.StationType.objects.get(value='core'),
                "rcus": [1,2,3,4,5],
                "inputs": ['input1', 'input2'],
                "tags": ['tmss', 'testing']}
    
    
    def DataproductTransform_test_data() -> dict:
        return {"input": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                            "output": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                            "identity": True,
                            "tags": ['tmss', 'testing']}
    
    def Filesystem_test_data(directory="/") -> dict:
        return {"capacity": 1111111111,
                            "cluster": models.Cluster.objects.create(**Cluster_test_data()),
                            "directory": directory,
                            "tags": ['tmss', 'testing']}
    
    def Cluster_test_data(name="default cluster") -> dict:
        return {"name": name,
                "location": "upstairs",
                "archive_site": True,
                "tags": ['tmss', 'testing']}
    
    def DataproductArchiveInfo_test_data() -> dict:
        return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                "storage_ticket": "myticket_1",
                "public_since": datetime.utcnow().isoformat(),
                "corrupted_since": datetime.utcnow().isoformat(),
                "tags": ['tmss', 'testing']}
    
    def DataproductHash_test_data() -> dict:
        return {"dataproduct": models.Dataproduct.objects.create(**Dataproduct_test_data()),
                "algorithm": models.Algorithm.objects.get(value='md5'),
                "hash": "myhash_1",
                "tags": ['tmss', 'testing']}