Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_test_data_rest.py 24.28 KiB
#!/usr/bin/env python3

# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

################################################################################################
# the methods below can be used to do HTTP REST calls to the django server and check the results
################################################################################################

from datetime import datetime
import uuid
import requests
import json
from lofar.common.json_utils import get_default_json_object_for_schema

class TMSSRESTTestDataCreator():
    """Creator of test data for the TMSS django REST API.

    The methods which are named after a django model return a dict which can be POSTed
    to the REST endpoint of that model. When the url of a related object is not given,
    then that related object is first created on the server via a POST request, so the
    returned dict refers to an existing object.
    """

    def __init__(self, django_api_url: str, auth: "requests.auth.HTTPBasicAuth"):
        """
        :param django_api_url: the base url of the REST API. Url postfixes like '/project/' are appended to it.
        :param auth: the credentials which are sent along with each request.
        """
        self.django_api_url = django_api_url
        self.auth = auth

    def get_response_as_json_object(self, url):
        """GET the given (full) url, and return the response content parsed as json object"""
        return json.loads(requests.get(url, auth=self.auth).content.decode('utf-8'))

    def post_data_and_get_response(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response"""
        return requests.post(self.django_api_url + url_postfix, json=data, auth=self.auth)

    def post_data_and_get_response_as_json_object(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response content parsed as json object"""
        return json.loads(self.post_data_and_get_response(data, url_postfix).content.decode('utf-8'))

    def post_data_and_get_url(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response's url

        :raises Exception: when the response contains no 'url'. The message holds the response content.
        """
        result = self.post_data_and_get_response_as_json_object(data, url_postfix)
        try:
            return result['url']
        except (KeyError, TypeError):
            # KeyError: the response is a dict without 'url'.
            # TypeError: the response is not a dict at all (for example a list of messages).
            # Because I don't like 'Bad Request' errors, I want more content if it goes wrong
            raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, result))

    def _url_or_post(self, url, test_data_factory, url_postfix):
        """Return the given url, or when it is None: POST fresh test data and return the url of the created object.

        :param url: the url of an existing object, or None.
        :param test_data_factory: callable without arguments which returns the data to POST.
                                  It is only called when the url is None, because it may do POSTs itself.
        :param url_postfix: the endpoint to POST the data to.
        """
        if url is None:
            url = self.post_data_and_get_url(test_data_factory(), url_postfix)
        return url

    #######################################################
    # the methods below can be used to create test data
    # naming convention is: <django_model_name>()
    #######################################################

    def GeneratorTemplate(self, name="generatortemplate", version:str=None) -> dict:
        """Return the test data for a GeneratorTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "create_function": 'Funky',
                "tags": ["TMSS", "TESTING"]}

    def SchedulingUnitTemplate(self, name="schedulingunittemplate1", version:str=None) -> dict:
        """Return the test data for a SchedulingUnitTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My description',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskTemplate(self, name="tasktemplate1", version:str=None) -> dict:
        """Return the test data for a TaskTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"],
                "validation_code_js": "???"}

    def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1", version:str=None) -> dict:
        """Return the test data for a TaskRelationSelectionTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskConnectorType(self, role="correlator", input_of_url=None, output_of_url=None):
        """Return the test data for a TaskConnectorType. A TaskTemplate is POSTed for each url which is not given."""
        input_of_url = self._url_or_post(input_of_url, self.TaskTemplate, '/task_template/')
        output_of_url = self._url_or_post(output_of_url, self.TaskTemplate, '/task_template/')

        return {"role": self.django_api_url + '/role/%s/'%role,
                "datatype": self.django_api_url + '/datatype/image/',
                "dataformats": [self.django_api_url + '/dataformat/Beamformed/'],
                "output_of": output_of_url,
                "input_of": input_of_url,
                "tags": []}

    def DefaultTemplates(self, name="defaulttemplate"):
        """Return the test data for a default template entry, which refers to no template."""
        return {"name": name,
                "template": None,
                "tags": []}

    def Cycle(self, description="my cycle description"):
        """Return the test data for a Cycle with a unique (uuid based) name."""
        return {"name": 'my_cycle_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "start": datetime.utcnow().isoformat(),
                "stop": datetime.utcnow().isoformat(),
                "number": 1,
                "standard_hours": 2,
                "expert_hours": 3,
                "filler_hours": 4,
                "projects": []}

    def Project(self, description="my project description"):
        """Return the test data for a Project with a unique (uuid based) name."""
        return {"name": 'my_project_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "project_quota": [],
                "priority_rank": 1.0,
                "trigger_priority": 1000,
                "can_trigger": False,
                "private_data": True,
                "cycles": []}

    def ResourceType(self, description="my resource_type description"):
        """Return the test data for a ResourceType with a unique (uuid based) name."""
        return {"tags": [],
                "description": description,
                "name": 'my_resource_type_' + str(uuid.uuid4())}

    def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None):
        """Return the test data for a ProjectQuota. A Project and/or ResourceType is POSTed when its url is not given.
        Please note that the given description is not part of the returned data."""
        project_url = self._url_or_post(project_url, self.Project, '/project/')
        resource_url = self._url_or_post(resource_url, self.ResourceType, '/resource_type/')

        return {"value": 1000,
                "project": project_url,
                "resource_type": resource_url}

    def SchedulingSet(self, name="my_scheduling_set", project_url=None, generator_template_url=None):
        """Return the test data for a SchedulingSet. A Project and/or GeneratorTemplate is POSTed when its url is not given."""
        project_url = self._url_or_post(project_url, self.Project, '/project/')
        generator_template_url = self._url_or_post(generator_template_url, self.GeneratorTemplate, '/generator_template/')

        return {"name": name,
                "description": "This is my scheduling set",
                "tags": [],
                "generator_doc": "{}",
                "project": project_url,
                "generator_template": generator_template_url,
                "generator_source": None,
                "scheduling_unit_drafts": []}

    def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, requirements_doc=None):
        """Return the test data for a SchedulingUnitDraft. A SchedulingSet and/or SchedulingUnitTemplate is POSTed when its url is not given.
        When no requirements_doc is given, then the default json object for the schema of the template is used."""
        scheduling_set_url = self._url_or_post(scheduling_set_url, self.SchedulingSet, '/scheduling_set/')
        template_url = self._url_or_post(template_url, self.SchedulingUnitTemplate, '/scheduling_unit_template/')

        if requirements_doc is None:
            # fetch the template from the server, because only its url is known here
            scheduling_unit_template = self.get_response_as_json_object(template_url)
            requirements_doc = get_default_json_object_for_schema(scheduling_unit_template['schema'])

        return {"name": name,
                "description": "This is my run draft",
                "tags": [],
                "requirements_doc": requirements_doc,
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "generator_instance_doc": "{}",
                "copies": None,
                "scheduling_set": scheduling_set_url,
                "requirements_template": template_url,
                "scheduling_unit_blueprints": [],
                "task_drafts": []}

    def TaskDraft(self, name='my_task_draft', scheduling_unit_draft_url=None, template_url=None):
        """Return the test data for a TaskDraft. A SchedulingUnitDraft and/or TaskTemplate is POSTed when its url is not given."""
        scheduling_unit_draft_url = self._url_or_post(scheduling_unit_draft_url, self.SchedulingUnitDraft, '/scheduling_unit_draft/')
        template_url = self._url_or_post(template_url, self.TaskTemplate, '/task_template/')

        return {"name": name,
                "description": "This is my task draft",
                "tags": [],
                "specifications_doc": "{}",
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "copies": None,
                "scheduling_unit_draft": scheduling_unit_draft_url,
                "specifications_template": template_url,
                'task_blueprints': [],
                'produced_by': [],
                'consumed_by': [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None):
        """Return the test data for a TaskRelationDraft. Each related object is POSTed when its url is not given."""
        producer_url = self._url_or_post(producer_url, self.TaskDraft, '/task_draft/')
        consumer_url = self._url_or_post(consumer_url, self.TaskDraft, '/task_draft/')
        template_url = self._url_or_post(template_url, self.TaskRelationSelectionTemplate, '/task_relation_selection_template/')
        input_role_url = self._url_or_post(input_role_url, self.TaskConnectorType, '/task_connector_type/')
        output_role_url = self._url_or_post(output_role_url, self.TaskConnectorType, '/task_connector_type/')

        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + "/dataformat/Beamformed/",
                "producer": producer_url,
                "consumer": consumer_url,
                "input_role": input_role_url,
                "output_role": output_role_url,
                "selection_template": template_url,
                'related_task_relation_blueprint': []}

    def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None):
        """Return the test data for a SchedulingUnitBlueprint. A SchedulingUnitDraft and/or SchedulingUnitTemplate is POSTed when its url is not given."""
        scheduling_unit_draft_url = self._url_or_post(scheduling_unit_draft_url, self.SchedulingUnitDraft, '/scheduling_unit_draft/')
        template_url = self._url_or_post(template_url, self.SchedulingUnitTemplate, '/scheduling_unit_template/')

        return {"name": name,
                "description": "This is my run blueprint",
                "tags": [],
                "requirements_doc": "{}",
                "do_cancel": False,
                "draft": scheduling_unit_draft_url,
                "requirements_template": template_url,
                "task_blueprints": []}

    def TaskBlueprint(self, name="my_TaskBlueprint", draft_url=None, template_url=None, scheduling_unit_blueprint_url=None):
        """Return the test data for a TaskBlueprint. Each related object is POSTed when its url is not given."""
        draft_url = self._url_or_post(draft_url, self.TaskDraft, '/task_draft/')
        template_url = self._url_or_post(template_url, self.TaskTemplate, '/task_template/')
        scheduling_unit_blueprint_url = self._url_or_post(scheduling_unit_blueprint_url, self.SchedulingUnitBlueprint, '/scheduling_unit_blueprint/')

        return {"name": name,
                "description": "This is my work request blueprint",
                "tags": [],
                "specifications_doc": "{}",
                "do_cancel": False,
                "draft": draft_url,
                "specifications_template": template_url,
                "scheduling_unit_blueprint": scheduling_unit_blueprint_url,
                "subtasks": [],
                "produced_by": [],
                "consumed_by": [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None):
        """Return the test data for a TaskRelationBlueprint. Each related object is POSTed when its url is not given."""
        draft_url = self._url_or_post(draft_url, self.TaskRelationDraft, '/task_relation_draft/')
        producer_url = self._url_or_post(producer_url, self.TaskBlueprint, '/task_blueprint/')
        consumer_url = self._url_or_post(consumer_url, self.TaskBlueprint, '/task_blueprint/')
        template_url = self._url_or_post(template_url, self.TaskRelationSelectionTemplate, '/task_relation_selection_template/')
        input_role_url = self._url_or_post(input_role_url, self.TaskConnectorType, '/task_connector_type/')
        output_role_url = self._url_or_post(output_role_url, self.TaskConnectorType, '/task_connector_type/')

        # test data
        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + '/dataformat/MeasurementSet/',
                "input_role": input_role_url,
                "output_role": output_role_url,
                "draft": draft_url,
                "selection_template": template_url,
                "producer": producer_url,
                "consumer": consumer_url}

    def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str=None, version:str=None) -> dict:
        """Return the test data for a SubtaskTemplate.
        Defaults: a random uuid as version, an empty schema, and the 'observation' subtask type."""
        if version is None:
            version = str(uuid.uuid4())

        if schema is None:
            schema = {}

        if subtask_type_url is None:
            subtask_type_url = self.django_api_url + '/subtask_type/observation/'

        return {"type": subtask_type_url,
                "name": name,
                "description": 'My one observation',
                "version": version,
                "schema": schema,
                "realtime": True,
                "queue": False,
                "tags": ["TMSS", "TESTING"]}

    def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after"):
        """Return the test data for a TaskSchedulingRelationBlueprint. A TaskBlueprint is POSTed for each url which is not given."""
        first_url = self._url_or_post(first_url, self.TaskBlueprint, '/task_blueprint/')
        second_url = self._url_or_post(second_url, self.TaskBlueprint, '/task_blueprint/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/'%placement,
                "time_offset":60}

    def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after"):
        """Return the test data for a TaskSchedulingRelationDraft. A TaskDraft is POSTed for each url which is not given."""
        first_url = self._url_or_post(first_url, self.TaskDraft, '/task_draft/')
        second_url = self._url_or_post(second_url, self.TaskDraft, '/task_draft/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/'%placement,
                "time_offset":60}

    def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate", version:str=None) -> dict:
        """Return the test data for a DataproductSpecificationsTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate", version:str=None) -> dict:
        """Return the test data for a DataproductFeedbackTemplate. A random uuid is used when no version is given."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DefaultSubtaskTemplates(self, name=None, template_url=None):
        """Return the test data for a default subtask template entry. A SubtaskTemplate is POSTed when no template_url is given.
        A unique (uuid based) name is used when no name is given."""
        template_url = self._url_or_post(template_url, self.SubtaskTemplate, '/subtask_template/')

        return {"name": name if name else "default_template_%s" % uuid.uuid4(),
                "template": template_url,
                "tags": []}

    def Cluster(self, name=None):
        """Return the test data for a Cluster. A unique (uuid based) name is used when no name is given."""
        return {"name": name if name else "Cluster %s" % uuid.uuid4(),
                "description": 'My one cluster',
                "location": "upstairs",
                "tags": ['tmss', 'testing']}

    def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining"):
        """Return the test data for a Subtask. Each related object is POSTed when its url is not given.
        When no specifications_doc is given, then it is fetched from the 'default_specification/' sub-url of the template."""
        cluster_url = self._url_or_post(cluster_url, self.Cluster, '/cluster/')
        task_blueprint_url = self._url_or_post(task_blueprint_url, self.TaskBlueprint, '/task_blueprint/')
        specifications_template_url = self._url_or_post(specifications_template_url, self.SubtaskTemplate, '/subtask_template/')

        if specifications_doc is None:
            # the response content is used as is (a string), it is not parsed into a json object
            specifications_doc = requests.get(specifications_template_url + 'default_specification/', auth=self.auth).content.decode('utf-8')

        return {"start_time": datetime.utcnow().isoformat(),
                "stop_time": datetime.utcnow().isoformat(),
                "state": self.django_api_url + '/subtask_state/%s/' % (state,),
                "specifications_doc": specifications_doc,
                "task_blueprint": task_blueprint_url,
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "do_cancel": datetime.utcnow().isoformat(),
                "priority": 1,
                "schedule_method": self.django_api_url + '/schedule_method/manual/',
                "cluster": cluster_url}

    def SubtaskOutput(self, subtask_url=None):
        """Return the test data for a SubtaskOutput. A Subtask is POSTed when no subtask_url is given."""
        subtask_url = self._url_or_post(subtask_url, self.Subtask, '/subtask/')

        return {"subtask": subtask_url,
                "tags": []}

    def Dataproduct(self, filename="my_filename", directory="/tmp/", specifications_template_url=None, subtask_output_url=None, dataproduct_feedback_template_url=None, dataformat="MeasurementSet"):
        """Return the test data for a Dataproduct. Each related object is POSTed when its url is not given."""
        # POST the data of a DataproductSpecificationsTemplate (and not of a SubtaskTemplate) to the dataproduct_specifications_template endpoint
        specifications_template_url = self._url_or_post(specifications_template_url, self.DataproductSpecificationsTemplate, '/dataproduct_specifications_template/')
        subtask_output_url = self._url_or_post(subtask_output_url, self.SubtaskOutput, '/subtask_output/')
        dataproduct_feedback_template_url = self._url_or_post(dataproduct_feedback_template_url, self.DataproductFeedbackTemplate, '/dataproduct_feedback_template/')

        return {"filename": filename,
                "directory": directory,
                "dataformat": "%s/dataformat/%s/" % (self.django_api_url, dataformat),
                "deleted_since": None,
                "pinned_since": None,
                "specifications_doc": "{}",
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "producer": subtask_output_url,
                "do_cancel": None,
                "expected_size": 1234,
                "size": 123,
                "feedback_doc": "{}",
                "feedback_template": dataproduct_feedback_template_url}

    def AntennaSet(self, name="antennaset1"):
        """Return the test data for an AntennaSet."""
        return {"name": name,
                "description": 'My one observation',
                "station_type": self.django_api_url + '/station_type/core/',
                "rcus": [1,2,3,4,5],
                "inputs": ['input1', 'input2'],
                "tags": ['tmss', 'testing']}

    def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None):
        """Return the test data for a DataproductTransform. A Dataproduct is POSTed for each url which is not given."""
        input_dataproduct_url = self._url_or_post(input_dataproduct_url, self.Dataproduct, '/dataproduct/')
        output_dataproduct_url = self._url_or_post(output_dataproduct_url, self.Dataproduct, '/dataproduct/')

        return {"input": input_dataproduct_url,
                "output": output_dataproduct_url,
                "identity": True,
                "tags": ['tmss', 'testing']}

    def DataproductHash(self, algorithm_url=None, hash="my_hash", dataproduct_url=None):
        """Return the test data for a DataproductHash. The md5 algorithm is used when no algorithm_url is given.
        A Dataproduct is POSTed when no dataproduct_url is given."""
        if algorithm_url is None:
            algorithm_url = self.django_api_url + '/algorithm/md5/'

        dataproduct_url = self._url_or_post(dataproduct_url, self.Dataproduct, '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "algorithm": algorithm_url,
                "hash": hash,
                "tags": ['tmss', 'testing']}

    def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None):
        """Return the test data for a DataproductArchiveInfo. A Dataproduct is POSTed when no dataproduct_url is given."""
        dataproduct_url = self._url_or_post(dataproduct_url, self.Dataproduct, '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "storage_ticket": storage_ticket,
                "public_since": datetime.utcnow().isoformat(),
                "corrupted_since": datetime.utcnow().isoformat(),
                "tags": ['tmss', 'testing']}

    def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None):
        """Return the test data for a SubtaskInput. Each related object is POSTed when its url is not given.
        Two Dataproducts are POSTed when no dataproduct_urls are given."""
        subtask_url = self._url_or_post(subtask_url, self.Subtask, '/subtask/')
        task_relation_blueprint_url = self._url_or_post(task_relation_blueprint_url, self.TaskRelationBlueprint, '/task_relation_blueprint/')

        if dataproduct_urls is None:
            dataproduct_urls = [self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/'),
                                self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')]

        subtask_output_url = self._url_or_post(subtask_output_url, self.SubtaskOutput, '/subtask_output/')
        task_relation_selection_template_url = self._url_or_post(task_relation_selection_template_url, self.TaskRelationSelectionTemplate, '/task_relation_selection_template/')

        return {"subtask": subtask_url,
                "task_relation_blueprint": task_relation_blueprint_url,
                "producer": subtask_output_url,
                "dataproducts": dataproduct_urls,
                "selection_doc": {},
                "selection_template": task_relation_selection_template_url,
                "tags": []}

    def Filesystem(self, name="my_Filesystem", cluster_url=None):
        """Return the test data for a Filesystem. A Cluster is POSTed when no cluster_url is given."""
        cluster_url = self._url_or_post(cluster_url, self.Cluster, '/cluster/')

        return {"name": name,
                "description": 'My one filesystem',
                "capacity": 1111111111,
                "cluster": cluster_url,
                "tags": ['tmss', 'testing']}