#!/usr/bin/env python3

# Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.

################################################################################################
# the methods below can be used to do HTTP REST calls to the django server and check the results
################################################################################################

from datetime import datetime
import uuid
import requests
import json
from lofar.common.json_utils import get_default_json_object_for_schema


class TMSSRESTTestDataCreator():
    """Build JSON-serializable test data dicts for the REST API at ``django_api_url``.

    The upper-case named methods each return a plain dict that can be POSTed to the
    matching endpoint. When a method needs the url of a related object and the caller
    did not supply one, the related object is created on the server first by POSTing
    default test data for it, and the resulting url is used.
    """

    def __init__(self, django_api_url: str, auth: requests.auth.HTTPBasicAuth):
        """Store the API base url (used as prefix for all endpoint postfixes) and the credentials."""
        self.django_api_url = django_api_url
        self.auth = auth

    def get_response_as_json_object(self, url):
        """GET the given url and return the response body parsed as JSON.

        The url is used as-is; it is NOT prefixed with self.django_api_url.
        """
        return json.loads(requests.get(url, auth=self.auth).content.decode('utf-8'))

    def post_data_and_get_response(self, data, url_postfix) -> requests.Response:
        """POST the given data to self.django_api_url+url_postfix, and return the response"""
        return requests.post(self.django_api_url + url_postfix, json=data, auth=self.auth)

    def post_data_and_get_response_as_json_object(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response body parsed as JSON"""
        return json.loads(self.post_data_and_get_response(data, url_postfix).content.decode('utf-8'))

    def post_data_and_get_url(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response's url

        :raises Exception: when the JSON response carries no 'url' entry; the message
                           contains the full response so the cause can be diagnosed.
        """
        result = self.post_data_and_get_response_as_json_object(data, url_postfix)
        try:
            return result['url']
        except (KeyError, TypeError) as e:
            # Because I don't like 'Bad Request' errors, I want more content if it goes wrong.
            # TypeError covers a response body that is valid JSON but not an object (list/str/...).
            raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, result)) from e

    #######################################################
    # the methods below can be used to create test data
    # naming convention is: <django_model_name>()
    #######################################################

    def GeneratorTemplate(self, name="generatortemplate", version: str = None) -> dict:
        """Return test data for a generator template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "create_function": 'Funky',
                "tags": ["TMSS", "TESTING"]}

    def SchedulingUnitTemplate(self, name="schedulingunittemplate1", version: str = None) -> dict:
        """Return test data for a scheduling unit template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My description',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskTemplate(self, name="tasktemplate1", version: str = None) -> dict:
        """Return test data for a task template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"],
                "validation_code_js": "???"}

    def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1", version: str = None) -> dict:
        """Return test data for a task relation selection template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskConnectorType(self, role="correlator", input_of_url=None, output_of_url=None) -> dict:
        """Return test data for a task connector type.

        A task template is POSTed for each of input_of_url/output_of_url that is None.
        """
        if input_of_url is None:
            input_of_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        if output_of_url is None:
            output_of_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        return {"role": self.django_api_url + '/role/%s/' % role,
                "datatype": self.django_api_url + '/datatype/image/',
                "dataformats": [self.django_api_url + '/dataformat/Beamformed/'],
                "output_of": output_of_url,
                "input_of": input_of_url,
                "tags": []}

    def DefaultTemplates(self, name="defaulttemplate") -> dict:
        """Return test data for a default template entry; the 'template' reference is left None."""
        return {"name": name,
                "template": None,
                "tags": []}

    def Cycle(self, description="my cycle description") -> dict:
        """Return test data for a cycle with a unique (uuid4-suffixed) name and start/stop set to 'now' (utc)."""
        return {"name": 'my_cycle_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "start": datetime.utcnow().isoformat(),
                "stop": datetime.utcnow().isoformat(),
                "number": 1,
                "standard_hours": 2,
                "expert_hours": 3,
                "filler_hours": 4,
                "projects": []}

    def Project(self, description="my project description") -> dict:
        """Return test data for a project with a unique (uuid4-suffixed) name."""
        return {"name": 'my_project_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "project_quota": [],
                "priority_rank": 1.0,
                "trigger_priority": 1000,
                "can_trigger": False,
                "private_data": True,
                "cycles": []}

    def ResourceType(self, description="my resource_type description") -> dict:
        """Return test data for a resource type with a unique (uuid4-suffixed) name."""
        return {"tags": [],
                "description": description,
                "name": 'my_resource_type_' + str(uuid.uuid4())}

    def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None) -> dict:
        """Return test data for a project quota.

        A project and/or resource type is POSTed when the corresponding url is None.
        Note: the description argument is accepted but not part of the returned data.
        """
        if project_url is None:
            project_url = self.post_data_and_get_url(self.Project(), '/project/')

        if resource_url is None:
            resource_url = self.post_data_and_get_url(self.ResourceType(), '/resource_type/')

        return {"value": 1000,
                "project": project_url,
                "resource_type": resource_url}

    def SchedulingSet(self, name="my_scheduling_set", project_url=None, generator_template_url=None) -> dict:
        """Return test data for a scheduling set.

        A project and/or generator template is POSTed when the corresponding url is None.
        """
        if project_url is None:
            project_url = self.post_data_and_get_url(self.Project(), '/project/')

        if generator_template_url is None:
            generator_template_url = self.post_data_and_get_url(self.GeneratorTemplate(), '/generator_template/')

        return {"name": name,
                "description": "This is my scheduling set",
                "tags": [],
                "generator_doc": "{}",
                "project": project_url,
                "generator_template": generator_template_url,
                "generator_source": None,
                "scheduling_unit_drafts": []}

    def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, requirements_doc=None) -> dict:
        """Return test data for a scheduling unit draft.

        A scheduling set and/or scheduling unit template is POSTed when the corresponding
        url is None. When requirements_doc is None, the template is fetched from
        template_url and the doc is derived from the template's 'schema' via
        get_default_json_object_for_schema.
        """
        if scheduling_set_url is None:
            scheduling_set_url = self.post_data_and_get_url(self.SchedulingSet(), '/scheduling_set/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')

        if requirements_doc is None:
            scheduling_unit_template = self.get_response_as_json_object(template_url)
            requirements_doc = get_default_json_object_for_schema(scheduling_unit_template['schema'])

        return {"name": name,
                "description": "This is my run draft",
                "tags": [],
                "requirements_doc": requirements_doc,
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "generator_instance_doc": "{}",
                "copies": None,
                "scheduling_set": scheduling_set_url,
                "requirements_template": template_url,
                "scheduling_unit_blueprints": [],
                "task_drafts": []}

    def TaskDraft(self, name='my_task_draft', scheduling_unit_draft_url=None, template_url=None) -> dict:
        """Return test data for a task draft.

        A scheduling unit draft and/or task template is POSTed when the corresponding url is None.
        """
        if scheduling_unit_draft_url is None:
            scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        return {"name": name,
                "description": "This is my task draft",
                "tags": [],
                "specifications_doc": "{}",
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "copies": None,
                "scheduling_unit_draft": scheduling_unit_draft_url,
                "specifications_template": template_url,
                'task_blueprints': [],
                'produced_by': [],
                'consumed_by': [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None) -> dict:
        """Return test data for a task relation draft.

        For each url argument that is None the related object (task draft, selection
        template, task connector type) is POSTed first.
        """
        if producer_url is None:
            producer_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if consumer_url is None:
            consumer_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        if input_role_url is None:
            input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        if output_role_url is None:
            output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + "/dataformat/Beamformed/",
                "producer": producer_url,
                "consumer": consumer_url,
                "input_role": input_role_url,
                "output_role": output_role_url,
                "selection_template": template_url,
                'related_task_relation_blueprint': []}

    def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None) -> dict:
        """Return test data for a scheduling unit blueprint.

        A scheduling unit draft and/or scheduling unit template is POSTed when the corresponding url is None.
        """
        if scheduling_unit_draft_url is None:
            scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')

        return {"name": name,
                "description": "This is my run blueprint",
                "tags": [],
                "requirements_doc": "{}",
                "do_cancel": False,
                "draft": scheduling_unit_draft_url,
                "requirements_template": template_url,
                "task_blueprints": []}

    def TaskBlueprint(self, name="my_TaskBlueprint", draft_url=None, template_url=None, scheduling_unit_blueprint_url=None) -> dict:
        """Return test data for a task blueprint.

        A task draft, task template and/or scheduling unit blueprint is POSTed when the
        corresponding url is None.
        """
        if draft_url is None:
            task_draft = self.TaskDraft()
            draft_url = self.post_data_and_get_url(task_draft, '/task_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        if scheduling_unit_blueprint_url is None:
            scheduling_unit_blueprint_url = self.post_data_and_get_url(self.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/')

        return {"name": name,
                "description": "This is my work request blueprint",
                "tags": [],
                "specifications_doc": "{}",
                "do_cancel": False,
                "draft": draft_url,
                "specifications_template": template_url,
                "scheduling_unit_blueprint": scheduling_unit_blueprint_url,
                "subtasks": [],
                "produced_by": [],
                "consumed_by": [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None) -> dict:
        """Return test data for a task relation blueprint.

        For each url argument that is None the related object (task relation draft, task
        blueprint, selection template, task connector type) is POSTed first.
        """
        if draft_url is None:
            draft_url = self.post_data_and_get_url(self.TaskRelationDraft(), '/task_relation_draft/')

        if producer_url is None:
            producer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if consumer_url is None:
            consumer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        if input_role_url is None:
            input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        if output_role_url is None:
            output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        # test data
        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + '/dataformat/MeasurementSet/',
                "input_role": input_role_url,
                "output_role": output_role_url,
                "draft": draft_url,
                "selection_template": template_url,
                "producer": producer_url,
                "consumer": consumer_url}

    def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str = None, version: str = None) -> dict:
        """Return test data for a subtask template.

        Defaults: version is a fresh uuid4 string, schema is an empty dict, and the
        subtask type url points to '/subtask_type/observation/' under the API base url.
        """
        if version is None:
            version = str(uuid.uuid4())

        if schema is None:
            schema = {}

        if subtask_type_url is None:
            subtask_type_url = self.django_api_url + '/subtask_type/observation/'

        return {"type": subtask_type_url,
                "name": name,
                "description": 'My one observation',
                "version": version,
                "schema": schema,
                "realtime": True,
                "queue": False,
                "tags": ["TMSS", "TESTING"]}

    def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after") -> dict:
        """Return test data for a scheduling relation between two task blueprints.

        A task blueprint is POSTed for each of first_url/second_url that is None.
        """
        if first_url is None:
            first_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if second_url is None:
            second_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/' % placement,
                "time_offset": 60}

    def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after") -> dict:
        """Return test data for a scheduling relation between two task drafts.

        A task draft is POSTed for each of first_url/second_url that is None.
        """
        if first_url is None:
            first_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if second_url is None:
            second_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/' % placement,
                "time_offset": 60}

    def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate", version: str = None) -> dict:
        """Return test data for a dataproduct specifications template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate", version: str = None) -> dict:
        """Return test data for a dataproduct feedback template; version defaults to a fresh uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DefaultSubtaskTemplates(self, name=None, template_url=None) -> dict:
        """Return test data for a default subtask template entry.

        A subtask template is POSTed when template_url is None; a falsy name is replaced
        by a unique (uuid4-suffixed) one.
        """
        if template_url is None:
            template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')

        return {"name": name if name else "default_template_%s" % uuid.uuid4(),
                "template": template_url,
                "tags": []}

    def Cluster(self, name=None) -> dict:
        """Return test data for a cluster; a falsy name is replaced by a unique (uuid4-suffixed) one."""
        return {"name": name if name else "Cluster %s" % uuid.uuid4(),
                "description": 'My one cluster',
                "location": "upstairs",
                "tags": ['tmss', 'testing']}

    def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state: str = "defining") -> dict:
        """Return test data for a subtask.

        A cluster, task blueprint and/or subtask template is POSTed when the corresponding
        url is None. When specifications_doc is None, it is set to the (utf-8 decoded, not
        JSON-parsed) response body of a GET on
        specifications_template_url + 'default_specification/'.
        """
        if cluster_url is None:
            cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')

        if task_blueprint_url is None:
            task_blueprint = self.TaskBlueprint()
            task_blueprint_url = self.post_data_and_get_url(task_blueprint, '/task_blueprint/')

        if specifications_template_url is None:
            specifications_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')

        if specifications_doc is None:
            # plain concatenation: this relies on specifications_template_url ending with a '/'
            specifications_doc = requests.get(specifications_template_url + 'default_specification/', auth=self.auth).content.decode('utf-8')

        return {"start_time": datetime.utcnow().isoformat(),
                "stop_time": datetime.utcnow().isoformat(),
                "state": self.django_api_url + '/subtask_state/%s/' % (state,),
                "specifications_doc": specifications_doc,
                "task_blueprint": task_blueprint_url,
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "do_cancel": datetime.utcnow().isoformat(),
                "priority": 1,
                "schedule_method": self.django_api_url + '/schedule_method/manual/',
                "cluster": cluster_url}

    def SubtaskOutput(self, subtask_url=None) -> dict:
        """Return test data for a subtask output; a subtask is POSTed when subtask_url is None."""
        if subtask_url is None:
            subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/')

        return {"subtask": subtask_url,
                "tags": []}

    def Dataproduct(self, filename="my_filename", directory="/tmp/", specifications_template_url=None, subtask_output_url=None, dataproduct_feedback_template_url=None, dataformat="MeasurementSet") -> dict:
        """Return test data for a dataproduct.

        A dataproduct specifications template, subtask output and/or dataproduct feedback
        template is POSTed when the corresponding url is None.
        """
        if specifications_template_url is None:
            # POST the payload that belongs to this endpoint (not a SubtaskTemplate, which
            # carries subtask-only keys like 'type', 'realtime' and 'queue')
            specifications_template_url = self.post_data_and_get_url(self.DataproductSpecificationsTemplate(), '/dataproduct_specifications_template/')

        if subtask_output_url is None:
            subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')

        if dataproduct_feedback_template_url is None:
            dataproduct_feedback_template_url = self.post_data_and_get_url(self.DataproductFeedbackTemplate(), '/dataproduct_feedback_template/')

        return {"filename": filename,
                "directory": directory,
                "dataformat": "%s/dataformat/%s/" % (self.django_api_url, dataformat),
                "deleted_since": None,
                "pinned_since": None,
                "specifications_doc": "{}",
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "producer": subtask_output_url,
                "do_cancel": None,
                "expected_size": 1234,
                "size": 123,
                "feedback_doc": "{}",
                "feedback_template": dataproduct_feedback_template_url}

    def AntennaSet(self, name="antennaset1") -> dict:
        """Return test data for an antenna set of station type 'core'."""
        return {"name": name,
                "description": 'My one observation',
                "station_type": self.django_api_url + '/station_type/core/',
                "rcus": [1, 2, 3, 4, 5],
                "inputs": ['input1', 'input2'],
                "tags": ['tmss', 'testing']}

    def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None) -> dict:
        """Return test data for a dataproduct transform.

        A dataproduct is POSTed for each of input_dataproduct_url/output_dataproduct_url that is None.
        """
        if input_dataproduct_url is None:
            input_dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        if output_dataproduct_url is None:
            output_dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"input": input_dataproduct_url,
                "output": output_dataproduct_url,
                "identity": True,
                "tags": ['tmss', 'testing']}

    def DataproductHash(self, algorithm_url=None, hash="my_hash", dataproduct_url=None) -> dict:
        """Return test data for a dataproduct hash.

        algorithm_url defaults to '/algorithm/md5/' under the API base url; a dataproduct
        is POSTed when dataproduct_url is None.
        """
        # NOTE: the 'hash' parameter shadows the builtin, but it is part of the public
        # signature (callers may pass it by keyword), so the name is kept.
        if algorithm_url is None:
            algorithm_url = self.django_api_url + '/algorithm/md5/'

        if dataproduct_url is None:
            dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "algorithm": algorithm_url,
                "hash": hash,
                "tags": ['tmss', 'testing']}

    def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None) -> dict:
        """Return test data for dataproduct archive info; a dataproduct is POSTed when dataproduct_url is None."""
        if dataproduct_url is None:
            dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "storage_ticket": storage_ticket,
                "public_since": datetime.utcnow().isoformat(),
                "corrupted_since": datetime.utcnow().isoformat(),
                "tags": ['tmss', 'testing']}

    def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None) -> dict:
        """Return test data for a subtask input.

        For each url argument that is None the related object is POSTed first; when
        dataproduct_urls is None, two dataproducts are POSTed.
        """
        if subtask_url is None:
            subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/')

        if task_relation_blueprint_url is None:
            task_relation_blueprint_url = self.post_data_and_get_url(self.TaskRelationBlueprint(), '/task_relation_blueprint/')

        if dataproduct_urls is None:
            dataproduct_urls = [self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/'),
                                self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')]

        if subtask_output_url is None:
            subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')

        if task_relation_selection_template_url is None:
            task_relation_selection_template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        return {"subtask": subtask_url,
                "task_relation_blueprint": task_relation_blueprint_url,
                "producer": subtask_output_url,
                "dataproducts": dataproduct_urls,
                "selection_doc": {},
                "selection_template": task_relation_selection_template_url,
                "tags": []}

    def Filesystem(self, name="my_Filesystem", cluster_url=None) -> dict:
        """Return test data for a filesystem; a cluster is POSTed when cluster_url is None."""
        if cluster_url is None:
            cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')

        return {"name": name,
                "description": 'My one filesystem',
                "capacity": 1111111111,
                "cluster": cluster_url,
                "tags": ['tmss', 'testing']}