Select Git revision
tmss_test_data_rest.py

TMSS-1093: removed usage of obsolete model properties
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tmss_test_data_rest.py 37.34 KiB
#!/usr/bin/env python3
# Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
################################################################################################
# the methods below can be used to to HTTP REST calls to the django server and check the results
################################################################################################
from datetime import datetime, timedelta
import uuid
import requests
import json
from http import HTTPStatus
from copy import deepcopy
from lofar.sas.tmss.test.test_utils import minimal_json_schema
class TMSSRESTTestDataCreator():
def __init__(self, django_api_url: str, auth: requests.auth.HTTPBasicAuth):
self.django_api_url = django_api_url[:-1] if django_api_url.endswith('/') else django_api_url
self.auth = auth
def get_response_as_json_object(self, url):
"""GET the given data the self.django_api_url+url_postfix, and return the response"""
return json.loads(requests.get(url, auth=self.auth).content.decode('utf-8'))
def post_data_and_get_response(self, data, url_postfix):
"""POST the given data the self.django_api_url+url_postfix, and return the response"""
return requests.post(self.django_api_url + url_postfix, json=data, auth=self.auth)
def post_data_and_get_response_as_json_object(self, data, url_postfix):
"""POST the given data the self.django_api_url+url_postfix, and return the response"""
response = self.post_data_and_get_response(data, url_postfix)
if response.status_code == HTTPStatus.CREATED:
return json.loads(response.content.decode('utf-8'))
raise Exception("Error during POST request of '%s' status=%s content: %s" % (url_postfix, response.status_code, response.content.decode('utf-8')))
def post_data_and_get_url(self, data, url_postfix):
"""POST the given data the self.django_api_url+url_postfix, and return the response's url"""
result = self.post_data_and_get_response_as_json_object(data, url_postfix)
try:
url = result['url']
except KeyError:
# Because I don't like 'Bad Request' errors, I want more content if it goes wrong
raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, result))
return url
def update_schema_from_template(self, model_name:str, template_test_data:dict) -> dict:
'''helper method to update the schema subdict in the template_test_data dict with the auto-injected-by-the-backend-properties if needed'''
updated_test_data = deepcopy(template_test_data)
if 'schema' in updated_test_data:
if 'name' in template_test_data and 'version' in template_test_data:
updated_test_data['schema']['$id'] = "%s/schemas/%s/%s/%s#" % (self.django_api_url,
model_name,
template_test_data['name'],
template_test_data['version'])
else:
updated_test_data['schema'].pop('$id','')
if 'name' in template_test_data:
updated_test_data['schema']['title'] = template_test_data['name']
else:
updated_test_data['schema'].pop('title','')
if 'description' in template_test_data:
updated_test_data['schema']['description'] = template_test_data['description']
else:
updated_test_data['schema'].pop('description','')
if 'version' in template_test_data:
updated_test_data['schema']['version'] = template_test_data['version']
else:
updated_test_data['schema'].pop('version','')
return updated_test_data
def update_document_from_template(self, model_name:str, data:dict, document_key:str, template_key:str) -> dict:
updated_data = deepcopy(data)
updated_data[document_key] = self.update_schema_from_template(model_name, updated_data[document_key])
return updated_data
#######################################################
# the methods below can be used to create test data
# naming convention is: <django_model_name>()
#######################################################
def SchedulingUnitTemplate(self, name="schedulingunittemplate1", schema:dict=None) -> dict:
if schema is None:
schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})
return { "name": name,
"description": 'My description',
"schema": schema,
"tags": ["TMSS", "TESTING"]}
@property
def cached_scheduling_unit_template_url(self):
try:
return self._scheduling_unit_template_url
except AttributeError:
self._scheduling_unit_template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')
return self._scheduling_unit_template_url
def SchedulingConstraintsTemplate(self, name="schedulingcontraintstemplate1", schema:dict=None) -> dict:
if schema is None:
schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})
return { "name": name,
"description": 'My description',
"schema": schema,
"tags": ["TMSS", "TESTING"]}
def ReservationTemplate(self, name="reservationtemplate1", schema:dict=None) -> dict:
if schema is None:
schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})
return { "name": name,
"description": 'My description',
"schema": schema,
"tags": ["TMSS", "TESTING"]}
@property
def cached_reservation_template_url(self):
try:
return self._reservation_template_url
except AttributeError:
self._reservation_template_url = self.post_data_and_get_url(self.ReservationTemplate(), '/reservation_template/')
return self._reservation_template_url
def ReservationStrategyTemplate(self, name="my_ReservationStrategyTemplate",
reservation_template_url=None,
template:dict=None) -> dict:
if reservation_template_url is None:
reservation_template_url = self.cached_reservation_template_url
if template is None:
template = self.get_response_as_json_object(reservation_template_url+'/default')
return {"name": name,
"description": 'My ReservationTemplate description',
"template": template,
"reservation_template": reservation_template_url,
"version": "1",
"tags": ["TMSS", "TESTING"]}
def SchedulingUnitObservingStrategyTemplate(self, name="my_SchedulingUnitObservingStrategyTemplate",
scheduling_unit_template_url=None,
template:dict=None) -> dict:
if scheduling_unit_template_url is None:
scheduling_unit_template_url = self.cached_scheduling_unit_template_url
if template is None:
template = self.get_response_as_json_object(scheduling_unit_template_url+'/default')
return {"name": name,
"description": 'My SchedulingUnitTemplate description',
"template": template,
"scheduling_unit_template": scheduling_unit_template_url,
"tags": ["TMSS", "TESTING"]}
def TaskTemplate(self, name="tasktemplate1", task_type_url: str = None) -> dict:
if task_type_url is None:
task_type_url = self.django_api_url + '/task_type/observation'
return {"name": name,
"description": 'My one observation',
"schema": minimal_json_schema(),
"tags": ["TMSS", "TESTING"],
"type": task_type_url}
@property
def cached_task_template_url(self):
try:
return self._task_template_url
except AttributeError:
self._task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
return self._task_template_url
def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1") -> dict:
return {"name": name,
"description": 'My one observation',
"schema": minimal_json_schema(),
"tags": ["TMSS", "TESTING"]}
@property
def cached_task_relation_selection_template_url(self):
try:
return self._task_relation_selection_template_url
except AttributeError:
self._task_relation_selection_template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')
return self._task_relation_selection_template_url
def TaskConnectorType(self, role="correlator", iotype="output", task_template_url=None):
if task_template_url is None:
task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
return {"role": self.django_api_url + '/role/%s'%role,
"datatype": self.django_api_url + '/datatype/image',
"dataformat": self.django_api_url + '/dataformat/Beamformed',
"task_template": task_template_url,
"iotype": self.django_api_url + '/iotype/%s'%iotype}
def Cycle(self, description="my cycle description"):
return {"name": 'my_cycle_' + str(uuid.uuid4()),
"description": description,
"tags": [],
"start": datetime.utcnow().isoformat(),
"stop": datetime.utcnow().isoformat(),
"projects": [],
"quota": []}
@property
def cached_cycle_url(self):
try:
return self._cycle_url
except AttributeError:
self._cycle_url = self.post_data_and_get_url(self.Cycle(), '/cycle/')
return self._cycle_url
def Project(self, description="my project description", name=None, auto_pin=False, auto_ingest=False, cycle_urls=None, project_state="opened"):
if name is None:
name = 'my_project_' + str(uuid.uuid4())
if not cycle_urls:
cycle_urls = [self.cached_cycle_url]
return {"name": name,
"description": description,
"tags": [],
"quota": [],
"priority_rank": 1.0,
"trigger_priority": 1000,
"can_trigger": False,
"private_data": True,
"cycles": cycle_urls,
"auto_pin": auto_pin,
"auto_ingest": auto_ingest,
"project_state": self.django_api_url + '/project_state/%s' % project_state}
@property
def cached_project_url(self):
try:
return self._project_url
except AttributeError:
self._project_url = self.post_data_and_get_url(self.Project(), '/project/')
return self._project_url
def ResourceType(self, description="my resource_type description"):
return {
"tags": [],
"description": description,
"name": 'my_resource_type_' + str(uuid.uuid4()),
"quantity": self.django_api_url + '/quantity/number'
}
@property
def cached_resource_type_url(self):
try:
return self._resource_type_url
except AttributeError:
self._resource_type_url = self.post_data_and_get_url(self.ResourceType(), '/resource_type/')
return self._resource_type_url
def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None):
if project_url is None:
project_url = self.cached_project_url
if resource_url is None:
resource_url = self.cached_resource_type_url
return {
"value": 1000,
"project": project_url,
"resource_type": resource_url
}
def SchedulingSet(self, name="my_scheduling_set", project_url=None):
if project_url is None:
project_url = self.cached_project_url
return {"name": name,
"description": "This is my scheduling set",
"tags": [],
"project": project_url,
"scheduling_unit_drafts": []}
@property
def cached_scheduling_set_url(self):
try:
return self._scheduling_set_url
except AttributeError:
self._scheduling_set_url = self.post_data_and_get_url(self.SchedulingSet(), '/scheduling_set/')
return self._scheduling_set_url
def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, scheduling_constraints_template_url=None, specifications_doc=None, scheduling_constraints_doc=None, scheduling_constraints_template=None, observation_strategy_template_url=None):
if scheduling_set_url is None:
scheduling_set_url = self.cached_scheduling_set_url
if template_url is None:
template_url = self.cached_scheduling_unit_template_url
if scheduling_constraints_template_url is None:
scheduling_constraints_template_url = self.post_data_and_get_url(self.SchedulingConstraintsTemplate(), '/scheduling_constraints_template/')
if scheduling_constraints_doc is None:
scheduling_constraints_doc = self.get_response_as_json_object(scheduling_constraints_template_url+'/default')
data = {"name": name,
"description": "This is my run draft",
"tags": [],
"scheduling_set": scheduling_set_url,
"specifications_template": template_url,
"scheduling_constraints_doc": scheduling_constraints_doc,
"scheduling_constraints_template": scheduling_constraints_template_url,
"observation_strategy_template": observation_strategy_template_url,
"scheduling_unit_blueprints": [],
"task_drafts": []}
if specifications_doc is not None:
data['specifications_doc'] = specifications_doc
return data
@property
def cached_scheduling_unit_draft_url(self):
try:
return self._scheduling_unit_draft_url
except AttributeError:
self._scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')
return self._scheduling_unit_draft_url
def TaskDraft(self, name=None, scheduling_unit_draft_url=None, template_url=None, specifications_doc=None):
if name is None:
name = str(uuid.uuid4())
if scheduling_unit_draft_url is None:
scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url
if template_url is None:
template_url = self.cached_task_template_url
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(template_url+'/default')
return {"name": name,
"description": "This is my task draft",
"tags": [],
"specifications_doc": specifications_doc,
"scheduling_unit_draft": scheduling_unit_draft_url,
"specifications_template": template_url,
'task_blueprints': [],
'produced_by': [],
'consumed_by': [],
'first_scheduling_relation': [],
'second_scheduling_relation': [],
"output_pinned": False}
@property
def cached_task_draft_url(self):
try:
return self._task_draft_url
except AttributeError:
self._task_draft_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')
return self._task_draft_url
def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None, selection_doc=None):
if producer_url is None:
producer_url = self.cached_task_draft_url
if consumer_url is None:
consumer_url = self.cached_task_draft_url
if template_url is None:
template_url = self.cached_task_relation_selection_template_url
if template_url is None:
template_url = self.cached_task_relation_selection_template_url
if selection_doc is None:
selection_doc = self.get_response_as_json_object(template_url+'/default')
if input_role_url is None:
try:
input_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="input"), '/task_connector_type/')
except:
# duplicate connector created... pick first random input connector
input_role_url = next(c for c in self.get_response_as_json_object(self.django_api_url+'/task_connector_type')['results'] if c['iotype_value']=='input')['url']
if output_role_url is None:
try:
output_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="output"), '/task_connector_type/')
except:
# duplicate connector created... pick first random output connector
output_role_url = next(c for c in self.get_response_as_json_object(self.django_api_url+'/task_connector_type')['results'] if c['iotype_value']=='output')['url']
return {"tags": [],
"blueprints": [],
"selection_doc": selection_doc,
"producer": producer_url,
"consumer": consumer_url,
"input_role": input_role_url,
"output_role": output_role_url,
"selection_template": template_url}
def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None):
if template_url is None:
template_url = self.cached_scheduling_unit_template_url
if scheduling_unit_draft_url is None:
scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url
return {"name": name,
"description": "This is my run blueprint",
"tags": [],
"draft": scheduling_unit_draft_url,
"specifications_template": template_url,
"task_blueprints": []}
@property
def cached_scheduling_unit_blueprint_url(self):
try:
return self._scheduling_unit_blueprint_url
except AttributeError:
self._scheduling_unit_blueprint_url = self.post_data_and_get_url(self.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/')
return self._scheduling_unit_blueprint_url
def TaskBlueprint(self, name=None, draft_url=None, template_url=None, scheduling_unit_blueprint_url=None, specifications_doc=None):
if name is None:
name = str(uuid.uuid4())
if draft_url is None:
draft_url = self.cached_task_draft_url
if template_url is None:
template_url = self.cached_task_template_url
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(template_url+'/default')
if scheduling_unit_blueprint_url is None:
scheduling_unit_blueprint_url = self.cached_scheduling_unit_blueprint_url
return {"name": name,
"description": "This is my work request blueprint",
"tags": [],
"specifications_doc": specifications_doc,
"draft": draft_url,
"specifications_template": template_url,
"scheduling_unit_blueprint": scheduling_unit_blueprint_url,
"subtasks": [],
"produced_by": [],
"consumed_by": [],
'first_scheduling_relation': [],
'second_scheduling_relation': [],
"output_pinned": False}
@property
def cached_task_blueprint_url(self):
try:
return self._task_blueprint_url
except AttributeError:
self._task_blueprint_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')
return self._task_blueprint_url
def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None, selection_doc=None):
if draft_url is None:
draft_url = self.post_data_and_get_url(self.TaskRelationDraft(), '/task_relation_draft/')
if producer_url is None:
producer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')
if consumer_url is None:
consumer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')
if template_url is None:
template_url = self.cached_task_relation_selection_template_url
if selection_doc is None:
selection_doc = self.get_response_as_json_object(template_url+'/default')
if input_role_url is None:
try:
input_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="input"), '/task_connector_type/')
except:
# duplicate connector created... pick first random input connector
input_role_url = next(c for c in self.get_response_as_json_object(self.django_api_url+'/task_connector_type')['results'] if c['iotype_value']=='input')['url']
if output_role_url is None:
try:
output_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="output"), '/task_connector_type/')
except:
# duplicate connector created... pick first random output connector
output_role_url = next(c for c in self.get_response_as_json_object(self.django_api_url+'/task_connector_type')['results'] if c['iotype_value']=='output')['url']
# test data
return {"tags": [],
"selection_doc": selection_doc,
"input_role": input_role_url,
"output_role": output_role_url,
"draft": draft_url,
"selection_template": template_url,
"producer": producer_url,
"consumer": consumer_url}
def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str=None) -> dict:
if schema is None:
schema = minimal_json_schema()
if subtask_type_url is None:
subtask_type_url = self.django_api_url + '/subtask_type/observation'
return {"type": subtask_type_url,
"name": name,
"description": 'My one observation',
"schema": schema,
"tags": ["TMSS", "TESTING"]}
@property
def cached_subtask_template_url(self):
try:
return self._subtask_template_url
except AttributeError:
self._subtask_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')
return self._subtask_template_url
def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after"):
if first_url is None:
first_url = self.cached_task_blueprint_url
if second_url is None:
second_url = self.cached_task_blueprint_url
return {"tags": [],
"first": first_url,
"second": second_url,
"placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement,
"time_offset":60}
def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after"):
if first_url is None:
first_url = self.cached_task_draft_url
if second_url is None:
second_url = self.cached_task_draft_url
return {"tags": [],
"blueprints": [],
"first": first_url,
"second": second_url,
"placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement,
"time_offset":60}
def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate") -> dict:
return {"name": name,
"description": 'My one date',
"schema": minimal_json_schema(),
"tags": ["TMSS", "TESTING"]}
def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate") -> dict:
return {"name": name,
"description": 'My one date',
"schema": minimal_json_schema(),
"tags": ["TMSS", "TESTING"]}
def Cluster(self, name=None):
return {"name": name if name else "Cluster %s" % uuid.uuid4(),
"description": 'My one cluster',
"location": "upstairs",
"archive_site": False,
"tags": ['tmss', 'testing']}
@property
def cached_cluster_url(self):
try:
return self._cluster_url
except AttributeError:
self._cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')
return self._cluster_url
def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining", scheduled_on_sky_start_time: datetime=None, scheduled_on_sky_stop_time: datetime=None, raw_feedback:str =None, primary: bool=True):
if cluster_url is None:
cluster_url = self.cached_cluster_url
if task_blueprint_url is None:
task_blueprint_url = self.cached_task_blueprint_url
if specifications_template_url is None:
specifications_template_url = self.cached_subtask_template_url
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(specifications_template_url+'/default')
if scheduled_on_sky_start_time is None:
scheduled_on_sky_start_time = datetime.utcnow()
if scheduled_on_sky_stop_time is None:
scheduled_on_sky_stop_time = scheduled_on_sky_start_time + timedelta(minutes=60)
if isinstance(scheduled_on_sky_start_time, datetime):
scheduled_on_sky_start_time = scheduled_on_sky_start_time.isoformat()
if isinstance(scheduled_on_sky_stop_time, datetime):
scheduled_on_sky_stop_time = scheduled_on_sky_stop_time.isoformat()
return {"scheduled_on_sky_start_time": scheduled_on_sky_start_time,
"scheduled_on_sky_stop_time": scheduled_on_sky_stop_time,
"state": self.django_api_url + '/subtask_state/%s' % (state,),
"specifications_doc": specifications_doc,
"task_blueprint": task_blueprint_url,
"specifications_template": specifications_template_url,
"tags": ["TMSS", "TESTING"],
"cluster": cluster_url,
"raw_feedback": raw_feedback,
"primary": primary}
@property
def cached_subtask_url(self):
try:
return self._subtask_url
except AttributeError:
self._subtask_url = self.post_data_and_get_url(self.Subtask(task_blueprint_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')), '/subtask/')
return self._subtask_url
def SubtaskOutput(self, subtask_url=None):
if subtask_url is None:
subtask_url = self.cached_subtask_url
return {"subtask": subtask_url,
"tags": []}
@property
def cached_subtask_output_url(self):
try:
return self._subtask_output_url
except AttributeError:
self._subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')
return self._subtask_output_url
def Dataproduct(self, filename="my_filename", directory:str=None,
specifications_doc=None, specifications_template_url=None,
subtask_output_url=None,
dataproduct_feedback_doc=None, dataproduct_feedback_template_url=None,
dataformat="MeasurementSet", datatype="visibilities",
sap_url=None):
if directory is None:
directory = '/tmp/%s/' % uuid.uuid4()
if specifications_template_url is None:
specifications_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/dataproduct_specifications_template/')
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(specifications_template_url+'/default')
if subtask_output_url is None:
subtask_output_url = self.cached_subtask_output_url
if dataproduct_feedback_template_url is None:
dataproduct_feedback_template_url = self.post_data_and_get_url(self.DataproductFeedbackTemplate(), '/dataproduct_feedback_template/')
if dataproduct_feedback_doc is None:
dataproduct_feedback_doc = self.get_response_as_json_object(dataproduct_feedback_template_url+'/default')
if sap_url is None:
sap_url = self.post_data_and_get_url(self.SAP(), '/sap/')
return {"filename": filename,
"directory": directory,
"dataformat": "%s/dataformat/%s" % (self.django_api_url, dataformat),
"datatype": "%s/datatype/%s" % (self.django_api_url, datatype),
"deleted_since": None,
"specifications_doc": specifications_doc,
"specifications_template": specifications_template_url,
"tags": ["TMSS", "TESTING"],
"producer": subtask_output_url,
"expected_size": 1234,
"size": 123,
"feedback_doc": dataproduct_feedback_doc,
"feedback_template": dataproduct_feedback_template_url,
"sap": sap_url
}
@property
def cached_dataproduct_url(self):
try:
return self._dataproduct_url
except AttributeError:
self._dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')
return self._dataproduct_url
def AntennaSet(self, name="antennaset1"):
return {"name": name,
"description": 'My one observation',
"station_type": self.django_api_url + '/station_type/core',
"rcus": [1,2,3,4,5],
"inputs": ['input1', 'input2'],
"tags": ['tmss', 'testing']}
def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None):
if input_dataproduct_url is None:
input_dataproduct_url = self.cached_dataproduct_url
if output_dataproduct_url is None:
output_dataproduct_url = self.cached_dataproduct_url
return {"input": input_dataproduct_url,
"output": output_dataproduct_url,
"identity": True,
"tags": ['tmss', 'testing']}
def DataproductHash(self, hash_algorithm_url=None, hash="my_hash", dataproduct_url=None):
if hash_algorithm_url is None:
hash_algorithm_url = self.django_api_url + '/hash_algorithm/md5'
if dataproduct_url is None:
dataproduct_url = self.cached_dataproduct_url
return {"dataproduct": dataproduct_url,
"hash_algorithm": hash_algorithm_url,
"hash": hash,
"tags": ['tmss', 'testing']}
def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None):
if dataproduct_url is None:
dataproduct_url = self.cached_dataproduct_url
return {"dataproduct": dataproduct_url,
"storage_ticket": storage_ticket,
"public_since": datetime.utcnow().isoformat(),
"corrupted_since": datetime.utcnow().isoformat(),
"tags": ['tmss', 'testing']}
def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None, selection_doc=None):
if subtask_url is None:
subtask_url = self.cached_subtask_url
if task_relation_blueprint_url is None:
task_relation_blueprint_url = self.post_data_and_get_url(self.TaskRelationBlueprint(), '/task_relation_blueprint/')
if dataproduct_urls is None:
dataproduct_urls = [self.cached_dataproduct_url]
if subtask_output_url is None:
subtask_output_url = self.cached_subtask_output_url
if task_relation_selection_template_url is None:
task_relation_selection_template_url = self.cached_task_relation_selection_template_url
if selection_doc is None:
selection_doc = self.get_response_as_json_object(task_relation_selection_template_url+'/default')
return {"subtask": subtask_url,
"task_relation_blueprint": task_relation_blueprint_url,
"producer": subtask_output_url,
"dataproducts": dataproduct_urls,
"selection_doc": selection_doc,
"selection_template": task_relation_selection_template_url,
"tags": []}
def Filesystem(self, name="my_Filesystem", cluster_url=None):
if cluster_url is None:
cluster_url = self.cached_cluster_url
return {"name": name,
"description": 'My one filesystem',
"capacity": 1111111111,
"cluster": cluster_url,
"directory": '/',
"tags": ['tmss', 'testing']}
def SAPTemplate(self):
return {"name": "my_sap_template" + str(uuid.uuid4()),
"description": 'My SAP test template',
"schema": minimal_json_schema(),
"tags": ["TMSS", "TESTING"]}
def SAP(self, specifications_template_url=None, specifications_doc=None):
if specifications_template_url is None:
specifications_template_url = self.post_data_and_get_url(self.SAPTemplate(), '/sap_template/')
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(specifications_template_url + '/default')
return {"specifications_doc": specifications_doc,
"specifications_template": specifications_template_url,
"tags": ['tmss', 'testing']}
def Reservation(self, name="My Reservation", duration=None, start_time=None, project_url=None,
specifications_template_url=None, specifications_doc=None) -> dict:
if project_url is None:
project_url = self.cached_project_url
if start_time is None:
start_time = datetime.utcnow() + timedelta(hours=12)
if duration is None:
stop_time = None
else:
stop_time = start_time + timedelta(seconds=duration)
if specifications_template_url is None:
specifications_template_url = self.post_data_and_get_url(self.ReservationTemplate(), '/reservation_template/')
if specifications_doc is None:
specifications_doc = self.get_response_as_json_object(specifications_template_url + '/default')
if isinstance(start_time, datetime):
start_time = start_time.isoformat()
if isinstance(stop_time, datetime):
stop_time = stop_time.isoformat()
return {"name": name,
"project": project_url,
"description": "Test Reservation",
"tags": ["TMSS", "TESTING"],
"start_time": start_time,
"stop_time": stop_time, # can be None
"specifications_doc": specifications_doc,
"specifications_template": specifications_template_url}
def ProjectPermission(self, name=None, GET=None, PUT=None, PATCH=None, DELETE=None, POST=None) -> dict:
if name is None:
name = 'MyProjectPermission_%s' % uuid.uuid4()
return {'name': name,
'GET': GET or [],
'PUT': PUT or [],
'PATCH': PATCH or [],
'DELETE': DELETE or [],
'POST': POST or []}
def wipe_cache(self):
cached_url_attributes = [attr for attr in self.__dict__.keys() if attr.startswith('_') and attr.endswith('_url')]
for attr in cached_url_attributes:
if hasattr(self, attr):
delattr(self, attr)