-
Jörn Künsemöller authored
TMSS-241: remove units from resource models (which should always be in SI/base units on the DB level)
Jörn Künsemöller authored TMSS-241: remove units from resource models (which should always be in SI/base units on the DB level)
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_test_data_rest.py 24.28 KiB
#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
################################################################################################
# the methods below can be used to do HTTP REST calls to the django server and check the results
################################################################################################
from datetime import datetime
import uuid
import requests
import json
from lofar.common.json_utils import get_default_json_object_for_schema
class TMSSRESTTestDataCreator():
    """Create test data dicts which can be POSTed to the TMSS django REST API.

    Each <django_model_name>() method returns a plain dict for that model.
    Where such a dict refers to other objects by url and the caller did not
    supply that url, the referred object is created on the fly by POSTing
    default test data to the server at self.django_api_url.
    """

    def __init__(self, django_api_url: str, auth: "requests.auth.HTTPBasicAuth"):
        """Store the base url of the REST API and the credentials used for each request."""
        self.django_api_url = django_api_url
        self.auth = auth

    def get_response_as_json_object(self, url):
        """GET the given (complete) url, and return the response content parsed from json"""
        return json.loads(requests.get(url, auth=self.auth).content.decode('utf-8'))

    def post_data_and_get_response(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response"""
        return requests.post(self.django_api_url + url_postfix, json=data, auth=self.auth)

    def post_data_and_get_response_as_json_object(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response content parsed from json"""
        return json.loads(self.post_data_and_get_response(data, url_postfix).content.decode('utf-8'))

    def post_data_and_get_url(self, data, url_postfix):
        """POST the given data to self.django_api_url+url_postfix, and return the response's url

        Raises an Exception which contains the url_postfix and the parsed
        response when the response holds no 'url' item.
        """
        result = self.post_data_and_get_response_as_json_object(data, url_postfix)
        try:
            return result['url']
        except (KeyError, TypeError) as error:
            # Because I don't like 'Bad Request' errors, I want more content if it goes wrong.
            # TypeError covers a response which is not a json object at all (list, string, null).
            raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, result)) from error

    #######################################################
    # the methods below can be used to create test data
    # naming convention is: <django_model_name>()
    #######################################################

    def GeneratorTemplate(self, name="generatortemplate", version: str = None) -> dict:
        """Return generator template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "create_function": 'Funky',
                "tags": ["TMSS", "TESTING"]}

    def SchedulingUnitTemplate(self, name="schedulingunittemplate1", version: str = None) -> dict:
        """Return scheduling unit template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My description',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskTemplate(self, name="tasktemplate1", version: str = None) -> dict:
        """Return task template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"],
                "validation_code_js": "???"}

    def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1", version: str = None) -> dict:
        """Return task relation selection template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one observation',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def TaskConnectorType(self, role="correlator", input_of_url=None, output_of_url=None):
        """Return task connector type test data.

        A new task template is POSTed for each of input_of_url/output_of_url which is None.
        """
        if input_of_url is None:
            input_of_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        if output_of_url is None:
            output_of_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        return {"role": self.django_api_url + '/role/%s/' % role,
                "datatype": self.django_api_url + '/datatype/image/',
                "dataformats": [self.django_api_url + '/dataformat/Beamformed/'],
                "output_of": output_of_url,
                "input_of": input_of_url,
                "tags": []}

    def DefaultTemplates(self, name="defaulttemplate"):
        """Return default template test data, with the template itself left at None."""
        return {"name": name,
                "template": None,
                "tags": []}

    def Cycle(self, description="my cycle description"):
        """Return cycle test data with a unique name; start and stop are both 'now' (UTC)."""
        return {"name": 'my_cycle_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "start": datetime.utcnow().isoformat(),
                "stop": datetime.utcnow().isoformat(),
                "number": 1,
                "standard_hours": 2,
                "expert_hours": 3,
                "filler_hours": 4,
                "projects": []}

    def Project(self, description="my project description"):
        """Return project test data with a unique name."""
        return {"name": 'my_project_' + str(uuid.uuid4()),
                "description": description,
                "tags": [],
                "project_quota": [],
                "priority_rank": 1.0,
                "trigger_priority": 1000,
                "can_trigger": False,
                "private_data": True,
                "cycles": []}

    def ResourceType(self, description="my resource_type description"):
        """Return resource type test data with a unique name."""
        return {"tags": [],
                "description": description,
                "name": 'my_resource_type_' + str(uuid.uuid4())}

    def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None):
        """Return project quota test data.

        A new project and/or resource type is POSTed when the respective url is None.
        The description argument is accepted but not part of the returned data.
        """
        if project_url is None:
            project_url = self.post_data_and_get_url(self.Project(), '/project/')

        if resource_url is None:
            resource_url = self.post_data_and_get_url(self.ResourceType(), '/resource_type/')

        return {"value": 1000,
                "project": project_url,
                "resource_type": resource_url}

    def SchedulingSet(self, name="my_scheduling_set", project_url=None, generator_template_url=None):
        """Return scheduling set test data.

        A new project and/or generator template is POSTed when the respective url is None.
        """
        if project_url is None:
            project_url = self.post_data_and_get_url(self.Project(), '/project/')

        if generator_template_url is None:
            generator_template_url = self.post_data_and_get_url(self.GeneratorTemplate(), '/generator_template/')

        return {"name": name,
                "description": "This is my scheduling set",
                "tags": [],
                "generator_doc": "{}",
                "project": project_url,
                "generator_template": generator_template_url,
                "generator_source": None,
                "scheduling_unit_drafts": []}

    def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, requirements_doc=None):
        """Return scheduling unit draft test data.

        A new scheduling set and/or scheduling unit template is POSTed when the
        respective url is None. When requirements_doc is None, the template is
        fetched from template_url and the default json object for its schema is used.
        """
        if scheduling_set_url is None:
            scheduling_set_url = self.post_data_and_get_url(self.SchedulingSet(), '/scheduling_set/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')

        if requirements_doc is None:
            scheduling_unit_template = self.get_response_as_json_object(template_url)
            requirements_doc = get_default_json_object_for_schema(scheduling_unit_template['schema'])

        return {"name": name,
                "description": "This is my run draft",
                "tags": [],
                "requirements_doc": requirements_doc,
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "generator_instance_doc": "{}",
                "copies": None,
                "scheduling_set": scheduling_set_url,
                "requirements_template": template_url,
                "scheduling_unit_blueprints": [],
                "task_drafts": []}

    def TaskDraft(self, name='my_task_draft', scheduling_unit_draft_url=None, template_url=None):
        """Return task draft test data.

        A new scheduling unit draft and/or task template is POSTed when the respective url is None.
        """
        if scheduling_unit_draft_url is None:
            scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        return {"name": name,
                "description": "This is my task draft",
                "tags": [],
                "specifications_doc": "{}",
                "copy_reason": self.django_api_url + '/copy_reason/template/',
                "copies": None,
                "scheduling_unit_draft": scheduling_unit_draft_url,
                "specifications_template": template_url,
                'task_blueprints': [],
                'produced_by': [],
                'consumed_by': [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None):
        """Return task relation draft test data.

        For each url which is None a new object is POSTed: task drafts for producer
        and consumer, a task relation selection template, and task connector types
        for the input and output role.
        """
        if producer_url is None:
            producer_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if consumer_url is None:
            consumer_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        if input_role_url is None:
            input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        if output_role_url is None:
            output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + "/dataformat/Beamformed/",
                "producer": producer_url,
                "consumer": consumer_url,
                "input_role": input_role_url,
                "output_role": output_role_url,
                "selection_template": template_url,
                'related_task_relation_blueprint': []}

    def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None):
        """Return scheduling unit blueprint test data.

        A new scheduling unit draft and/or scheduling unit template is POSTed when the respective url is None.
        """
        if scheduling_unit_draft_url is None:
            scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')

        return {"name": name,
                "description": "This is my run blueprint",
                "tags": [],
                "requirements_doc": "{}",
                "do_cancel": False,
                "draft": scheduling_unit_draft_url,
                "requirements_template": template_url,
                "task_blueprints": []}

    def TaskBlueprint(self, name="my_TaskBlueprint", draft_url=None, template_url=None, scheduling_unit_blueprint_url=None):
        """Return task blueprint test data.

        A new task draft, task template and/or scheduling unit blueprint is POSTed
        when the respective url is None.
        """
        if draft_url is None:
            task_draft = self.TaskDraft()
            draft_url = self.post_data_and_get_url(task_draft, '/task_draft/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')

        if scheduling_unit_blueprint_url is None:
            scheduling_unit_blueprint_url = self.post_data_and_get_url(self.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/')

        return {"name": name,
                "description": "This is my work request blueprint",
                "tags": [],
                "specifications_doc": "{}",
                "do_cancel": False,
                "draft": draft_url,
                "specifications_template": template_url,
                "scheduling_unit_blueprint": scheduling_unit_blueprint_url,
                "subtasks": [],
                "produced_by": [],
                "consumed_by": [],
                'first_to_connect': [],
                'second_to_connect': []}

    def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None):
        """Return task relation blueprint test data.

        For each url which is None a new object is POSTed: a task relation draft,
        task blueprints for producer and consumer, a task relation selection
        template, and task connector types for the input and output role.
        """
        if draft_url is None:
            draft_url = self.post_data_and_get_url(self.TaskRelationDraft(), '/task_relation_draft/')

        if producer_url is None:
            producer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if consumer_url is None:
            consumer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if template_url is None:
            template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        if input_role_url is None:
            input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        if output_role_url is None:
            output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/')

        # test data
        return {"tags": [],
                "selection_doc": "{}",
                "dataformat": self.django_api_url + '/dataformat/MeasurementSet/',
                "input_role": input_role_url,
                "output_role": output_role_url,
                "draft": draft_url,
                "selection_template": template_url,
                "producer": producer_url,
                "consumer": consumer_url}

    def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str = None, version: str = None) -> dict:
        """Return subtask template test data.

        version defaults to a random uuid4 string, schema to an empty dict and
        subtask_type_url to the 'observation' subtask type.
        """
        if version is None:
            version = str(uuid.uuid4())

        if schema is None:
            schema = {}

        if subtask_type_url is None:
            subtask_type_url = self.django_api_url + '/subtask_type/observation/'

        return {"type": subtask_type_url,
                "name": name,
                "description": 'My one observation',
                "version": version,
                "schema": schema,
                "realtime": True,
                "queue": False,
                "tags": ["TMSS", "TESTING"]}

    def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after"):
        """Return task scheduling relation blueprint test data.

        A new task blueprint is POSTed for each of first_url/second_url which is None.
        """
        if first_url is None:
            first_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        if second_url is None:
            second_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/' % placement,
                "time_offset": 60}

    def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after"):
        """Return task scheduling relation draft test data.

        A new task draft is POSTed for each of first_url/second_url which is None.
        """
        if first_url is None:
            first_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        if second_url is None:
            second_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')

        return {"tags": [],
                "first": first_url,
                "second": second_url,
                "placement": self.django_api_url + '/scheduling_relation_placement/%s/' % placement,
                "time_offset": 60}

    def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate", version: str = None) -> dict:
        """Return dataproduct specifications template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate", version: str = None) -> dict:
        """Return dataproduct feedback template test data; version defaults to a random uuid4 string."""
        if version is None:
            version = str(uuid.uuid4())

        return {"name": name,
                "description": 'My one date',
                "version": version,
                "schema": {"mykey": "my value"},
                "tags": ["TMSS", "TESTING"]}

    def DefaultSubtaskTemplates(self, name=None, template_url=None):
        """Return default subtask template test data.

        A new subtask template is POSTed when template_url is None.
        A falsy name is replaced by a unique generated name.
        """
        if template_url is None:
            template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')

        return {"name": name if name else "default_template_%s" % uuid.uuid4(),
                "template": template_url,
                "tags": []}

    def Cluster(self, name=None):
        """Return cluster test data; a falsy name is replaced by a unique generated name."""
        return {"name": name if name else "Cluster %s" % uuid.uuid4(),
                "description": 'My one cluster',
                "location": "upstairs",
                "tags": ['tmss', 'testing']}

    def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state: str = "defining"):
        """Return subtask test data.

        A new cluster, task blueprint and/or subtask template is POSTed when the
        respective url is None. When specifications_doc is None, it is fetched
        (as text) from the template's 'default_specification/' url.
        """
        if cluster_url is None:
            cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')

        if task_blueprint_url is None:
            task_blueprint = self.TaskBlueprint()
            task_blueprint_url = self.post_data_and_get_url(task_blueprint, '/task_blueprint/')

        if specifications_template_url is None:
            specifications_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')

        if specifications_doc is None:
            specifications_doc = requests.get(specifications_template_url + 'default_specification/', auth=self.auth).content.decode('utf-8')

        return {"start_time": datetime.utcnow().isoformat(),
                "stop_time": datetime.utcnow().isoformat(),
                "state": self.django_api_url + '/subtask_state/%s/' % (state,),
                "specifications_doc": specifications_doc,
                "task_blueprint": task_blueprint_url,
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "do_cancel": datetime.utcnow().isoformat(),
                "priority": 1,
                "schedule_method": self.django_api_url + '/schedule_method/manual/',
                "cluster": cluster_url}

    def SubtaskOutput(self, subtask_url=None):
        """Return subtask output test data; a new subtask is POSTed when subtask_url is None."""
        if subtask_url is None:
            subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/')

        return {"subtask": subtask_url,
                "tags": []}

    def Dataproduct(self, filename="my_filename", directory="/tmp/", specifications_template_url=None, subtask_output_url=None, dataproduct_feedback_template_url=None, dataformat="MeasurementSet"):
        """Return dataproduct test data.

        A new dataproduct specifications template, subtask output and/or
        dataproduct feedback template is POSTed when the respective url is None.
        """
        if specifications_template_url is None:
            # the payload has to match the endpoint: a dataproduct specifications template, not a subtask template
            specifications_template_url = self.post_data_and_get_url(self.DataproductSpecificationsTemplate(), '/dataproduct_specifications_template/')

        if subtask_output_url is None:
            subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')

        if dataproduct_feedback_template_url is None:
            dataproduct_feedback_template_url = self.post_data_and_get_url(self.DataproductFeedbackTemplate(), '/dataproduct_feedback_template/')

        return {"filename": filename,
                "directory": directory,
                "dataformat": "%s/dataformat/%s/" % (self.django_api_url, dataformat),
                "deleted_since": None,
                "pinned_since": None,
                "specifications_doc": "{}",
                "specifications_template": specifications_template_url,
                "tags": ["TMSS", "TESTING"],
                "producer": subtask_output_url,
                "do_cancel": None,
                "expected_size": 1234,
                "size": 123,
                "feedback_doc": "{}",
                "feedback_template": dataproduct_feedback_template_url}

    def AntennaSet(self, name="antennaset1"):
        """Return antenna set test data for the 'core' station type."""
        return {"name": name,
                "description": 'My one observation',
                "station_type": self.django_api_url + '/station_type/core/',
                "rcus": [1, 2, 3, 4, 5],
                "inputs": ['input1', 'input2'],
                "tags": ['tmss', 'testing']}

    def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None):
        """Return dataproduct transform test data.

        A new dataproduct is POSTed for each of the input/output urls which is None.
        """
        if input_dataproduct_url is None:
            input_dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        if output_dataproduct_url is None:
            output_dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"input": input_dataproduct_url,
                "output": output_dataproduct_url,
                "identity": True,
                "tags": ['tmss', 'testing']}

    def DataproductHash(self, algorithm_url=None, hash="my_hash", dataproduct_url=None):
        """Return dataproduct hash test data.

        algorithm_url defaults to the 'md5' algorithm; a new dataproduct is POSTed
        when dataproduct_url is None.
        """
        if algorithm_url is None:
            algorithm_url = self.django_api_url + '/algorithm/md5/'

        if dataproduct_url is None:
            dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "algorithm": algorithm_url,
                "hash": hash,
                "tags": ['tmss', 'testing']}

    def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None):
        """Return dataproduct archive info test data; a new dataproduct is POSTed when dataproduct_url is None."""
        if dataproduct_url is None:
            dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')

        return {"dataproduct": dataproduct_url,
                "storage_ticket": storage_ticket,
                "public_since": datetime.utcnow().isoformat(),
                "corrupted_since": datetime.utcnow().isoformat(),
                "tags": ['tmss', 'testing']}

    def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None):
        """Return subtask input test data.

        For each url argument which is None a new object is POSTed: a subtask, a
        task relation blueprint, two dataproducts, a subtask output and a task
        relation selection template.
        """
        if subtask_url is None:
            subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/')

        if task_relation_blueprint_url is None:
            task_relation_blueprint_url = self.post_data_and_get_url(self.TaskRelationBlueprint(), '/task_relation_blueprint/')

        if dataproduct_urls is None:
            dataproduct_urls = [self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/'),
                                self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')]

        if subtask_output_url is None:
            subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')

        if task_relation_selection_template_url is None:
            task_relation_selection_template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')

        return {"subtask": subtask_url,
                "task_relation_blueprint": task_relation_blueprint_url,
                "producer": subtask_output_url,
                "dataproducts": dataproduct_urls,
                "selection_doc": {},
                "selection_template": task_relation_selection_template_url,
                "tags": []}

    def Filesystem(self, name="my_Filesystem", cluster_url=None):
        """Return filesystem test data; a new cluster is POSTed when cluster_url is None."""
        if cluster_url is None:
            cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')

        return {"name": name,
                "description": 'My one filesystem',
                "capacity": 1111111111,
                "cluster": cluster_url,
                "tags": ['tmss', 'testing']}