#!/usr/bin/env python3 # Copyright (C) 2018 ASTRON (Netherlands Institute for Radio Astronomy) # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. ################################################################################################ # the methods below can be used to to HTTP REST calls to the django server and check the results ################################################################################################ from datetime import datetime, timedelta import uuid import requests import json from http import HTTPStatus from copy import deepcopy from lofar.sas.tmss.test.test_utils import minimal_json_schema class TMSSRESTTestDataCreator(): def __init__(self, django_api_url: str, auth: requests.auth.HTTPBasicAuth): self.django_api_url = django_api_url[:-1] if django_api_url.endswith('/') else django_api_url self.auth = auth def get_response_as_json_object(self, url): """GET the given data the self.django_api_url+url_postfix, and return the response""" return json.loads(requests.get(url, auth=self.auth).content.decode('utf-8')) def post_data_and_get_response(self, data, url_postfix): """POST the given data the self.django_api_url+url_postfix, and return the response""" return requests.post(self.django_api_url + url_postfix, json=data, auth=self.auth) def post_data_and_get_response_as_json_object(self, data, url_postfix): """POST the given data the self.django_api_url+url_postfix, and return the response""" response = self.post_data_and_get_response(data, url_postfix) if response.status_code == HTTPStatus.CREATED: return json.loads(response.content.decode('utf-8')) raise Exception("Error during POST request of '%s' status=%s content: %s" % (url_postfix, response.status_code, response.content.decode('utf-8'))) def post_data_and_get_url(self, data, url_postfix): """POST the given data the self.django_api_url+url_postfix, and return the response's url""" result = self.post_data_and_get_response_as_json_object(data, url_postfix) try: url = result['url'] except KeyError: # Because I don't like 'Bad Request' errors, I want more content if it goes wrong raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, result)) return url def update_schema_from_template(self, model_name:str, template_test_data:dict) -> dict: '''helper method to update the schema subdict in the template_test_data dict with the auto-injected-by-the-backend-properties if needed''' updated_test_data = deepcopy(template_test_data) if 'schema' in updated_test_data: if 'name' in template_test_data and 'version' in template_test_data: updated_test_data['schema']['$id'] = "%s/schemas/%s/%s/%s#" % (self.django_api_url, model_name, template_test_data['name'], template_test_data['version']) else: updated_test_data['schema'].pop('$id','') if 'name' in template_test_data: updated_test_data['schema']['title'] = template_test_data['name'] else: updated_test_data['schema'].pop('title','') if 'description' in template_test_data: updated_test_data['schema']['description'] = template_test_data['description'] else: updated_test_data['schema'].pop('description','') if 'version' in template_test_data: updated_test_data['schema']['version'] = template_test_data['version'] else: updated_test_data['schema'].pop('version','') return updated_test_data def update_document_from_template(self, model_name:str, data:dict, document_key:str, template_key:str) -> dict: updated_data = deepcopy(data) updated_data[document_key] = self.update_schema_from_template(model_name, updated_data[document_key]) return updated_data ####################################################### # the methods below can be used to create test data # naming convention is: <django_model_name>() ####################################################### def GeneratorTemplate(self, name="generatortemplate") -> dict: return {"name": name, "description": 'My one observation', "schema": minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}}), "create_function": 'Funky', "tags": ["TMSS", "TESTING"]} def SchedulingUnitTemplate(self, name="schedulingunittemplate1", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}}) return { "name": name, "description": 'My description', "schema": schema, "tags": ["TMSS", "TESTING"]} @property def cached_scheduling_unit_template_url(self): try: return self._scheduling_unit_template_url except AttributeError: self._scheduling_unit_template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/') return self._scheduling_unit_template_url def SchedulingConstraintsTemplate(self, name="schedulingcontraintstemplate1", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}}) return { "name": name, "description": 'My description', "schema": schema, "tags": ["TMSS", "TESTING"]} def ReservationTemplate(self, name="reservationtemplate1", schema:dict=None) -> dict: if schema is None: schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}}) return { "name": name, "description": 'My description', "schema": schema, "tags": ["TMSS", "TESTING"]} def SchedulingUnitObservingStrategyTemplate(self, name="my_SchedulingUnitObservingStrategyTemplate", scheduling_unit_template_url=None, template:dict=None) -> dict: if scheduling_unit_template_url is None: scheduling_unit_template_url = self.cached_scheduling_unit_template_url if template is None: template = self.get_response_as_json_object(scheduling_unit_template_url+'/default') return {"name": name, "description": 'My SchedulingUnitTemplate description', "template": template, "scheduling_unit_template": scheduling_unit_template_url, "tags": ["TMSS", "TESTING"]} def TaskTemplate(self, name="tasktemplate1", task_type_url: str = None) -> dict: if task_type_url is None: task_type_url = self.django_api_url + '/task_type/observation' return {"name": name, "description": 'My one observation', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"], "type": task_type_url, "validation_code_js": "???"} @property def cached_task_template_url(self): try: return self._task_template_url except AttributeError: self._task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/') return self._task_template_url def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1") -> dict: return {"name": name, "description": 'My one observation', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} @property def cached_task_relation_selection_template_url(self): try: return self._task_relation_selection_template_url except AttributeError: self._task_relation_selection_template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/') return self._task_relation_selection_template_url def TaskConnectorType(self, role="correlator", input_of_url=None, output_of_url=None): if input_of_url is None: input_of_url = self.cached_task_template_url if output_of_url is None: output_of_url = self.cached_task_template_url return {"role": self.django_api_url + '/role/%s'%role, "datatype": self.django_api_url + '/datatype/image', "dataformats": [self.django_api_url + '/dataformat/Beamformed'], "output_of": output_of_url, "input_of": input_of_url, "tags": []} def DefaultTemplates(self, name="defaulttemplate"): return {"name": name, "template": None, "tags": []} def Cycle(self, description="my cycle description"): return {"name": 'my_cycle_' + str(uuid.uuid4()), "description": description, "tags": [], "start": datetime.utcnow().isoformat(), "stop": datetime.utcnow().isoformat(), "projects": [], "quota": []} @property def cached_cycle_url(self): try: return self._cycle_url except AttributeError: self._cycle_url = self.post_data_and_get_url(self.Cycle(), '/cycle/') return self._cycle_url def Project(self, description="my project description", name=None, auto_pin=False, cycle_urls=[]): if name is None: name = 'my_project_' + str(uuid.uuid4()) if not cycle_urls: cycle_urls = [self.cached_cycle_url] return {"name": name, "description": description, "tags": [], "quota": [], "priority_rank": 1.0, "trigger_priority": 1000, "can_trigger": False, "private_data": True, "cycles": cycle_urls, "archive_subdirectory": 'my_project/', "auto_pin": auto_pin} @property def cached_project_url(self): try: return self._project_url except AttributeError: self._project_url = self.post_data_and_get_url(self.Project(), '/project/') return self._project_url def ResourceType(self, description="my resource_type description"): return { "tags": [], "description": description, "name": 'my_resource_type_' + str(uuid.uuid4()), "quantity": self.django_api_url + '/quantity/number' } @property def cached_resource_type_url(self): try: return self._resource_type_url except AttributeError: self._resource_type_url = self.post_data_and_get_url(self.ResourceType(), '/resource_type/') return self._resource_type_url def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None): if project_url is None: project_url = self.cached_project_url if resource_url is None: resource_url = self.cached_resource_type_url return { "value": 1000, "project": project_url, "resource_type": resource_url } def SchedulingSet(self, name="my_scheduling_set", project_url=None, generator_template_url=None, generator_doc=None): if project_url is None: project_url = self.cached_project_url if generator_template_url is None: generator_template_url = self.post_data_and_get_url(self.GeneratorTemplate(), '/generator_template/') if generator_doc is None: generator_doc = self.get_response_as_json_object(generator_template_url+'/default') return {"name": name, "description": "This is my scheduling set", "tags": [], "generator_doc": generator_doc, "project": project_url, "generator_template": generator_template_url, "generator_source": None, "scheduling_unit_drafts": []} @property def cached_scheduling_set_url(self): try: return self._scheduling_set_url except AttributeError: self._scheduling_set_url = self.post_data_and_get_url(self.SchedulingSet(), '/scheduling_set/') return self._scheduling_set_url def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, scheduling_constraints_template_url=None, requirements_doc=None, scheduling_constraints_doc=None, scheduling_constraints_template=None, observation_strategy_template_url=None): if scheduling_set_url is None: scheduling_set_url = self.cached_scheduling_set_url if template_url is None: template_url = self.cached_scheduling_unit_template_url if scheduling_constraints_template_url is None: scheduling_constraints_template_url = self.post_data_and_get_url(self.SchedulingConstraintsTemplate(), '/scheduling_constraints_template/') if requirements_doc is None: requirements_doc = self.get_response_as_json_object(template_url+'/default') if scheduling_constraints_doc is None: scheduling_constraints_doc = self.get_response_as_json_object(scheduling_constraints_template_url+'/default') return {"name": name, "description": "This is my run draft", "tags": [], "requirements_doc": requirements_doc, "copy_reason": self.django_api_url + '/copy_reason/template', "generator_instance_doc": {}, "copies": None, "scheduling_set": scheduling_set_url, "requirements_template": template_url, "scheduling_constraints_doc": scheduling_constraints_doc, "scheduling_constraints_template": scheduling_constraints_template_url, "observation_strategy_template": observation_strategy_template_url, "scheduling_unit_blueprints": [], "task_drafts": []} @property def cached_scheduling_unit_draft_url(self): try: return self._scheduling_unit_draft_url except AttributeError: self._scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/') return self._scheduling_unit_draft_url def TaskDraft(self, name=None, scheduling_unit_draft_url=None, template_url=None, specifications_doc=None): if name is None: name = str(uuid.uuid4()) if scheduling_unit_draft_url is None: scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url if template_url is None: template_url = self.cached_task_template_url if specifications_doc is None: specifications_doc = self.get_response_as_json_object(template_url+'/default') return {"name": name, "description": "This is my task draft", "tags": [], "specifications_doc": specifications_doc, "copy_reason": self.django_api_url + '/copy_reason/template', "copies": None, "scheduling_unit_draft": scheduling_unit_draft_url, "specifications_template": template_url, 'task_blueprints': [], 'produced_by': [], 'consumed_by': [], 'first_scheduling_relation': [], 'second_scheduling_relation': [], "output_pinned": False} @property def cached_task_draft_url(self): try: return self._task_draft_url except AttributeError: self._task_draft_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/') return self._task_draft_url def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None, selection_doc=None): if producer_url is None: producer_url = self.cached_task_draft_url if consumer_url is None: consumer_url = self.cached_task_draft_url if template_url is None: template_url = self.cached_task_relation_selection_template_url if template_url is None: template_url = self.cached_task_relation_selection_template_url if selection_doc is None: selection_doc = self.get_response_as_json_object(template_url+'/default') if input_role_url is None: input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/') if output_role_url is None: output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/') return {"tags": [], "selection_doc": selection_doc, "dataformat": self.django_api_url + "/dataformat/Beamformed", "producer": producer_url, "consumer": consumer_url, "input_role": input_role_url, "output_role": output_role_url, "selection_template": template_url, 'related_task_relation_blueprint': []} def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None, requirements_doc:dict=None): if template_url is None: template_url = self.cached_scheduling_unit_template_url if scheduling_unit_draft_url is None: scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url if requirements_doc is None: requirements_doc = self.get_response_as_json_object(template_url+'/default') return {"name": name, "description": "This is my run blueprint", "tags": [], "requirements_doc": requirements_doc, "do_cancel": False, "draft": scheduling_unit_draft_url, "requirements_template": template_url, "task_blueprints": []} @property def cached_scheduling_unit_blueprint_url(self): try: return self._scheduling_unit_blueprint_url except AttributeError: self._scheduling_unit_blueprint_url = self.post_data_and_get_url(self.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/') return self._scheduling_unit_blueprint_url def TaskBlueprint(self, name=None, draft_url=None, template_url=None, scheduling_unit_blueprint_url=None, specifications_doc=None): if name is None: name = str(uuid.uuid4()) if draft_url is None: draft_url = self.cached_task_draft_url if template_url is None: template_url = self.cached_task_template_url if specifications_doc is None: specifications_doc = self.get_response_as_json_object(template_url+'/default') if scheduling_unit_blueprint_url is None: scheduling_unit_blueprint_url = self.cached_scheduling_unit_blueprint_url return {"name": name, "description": "This is my work request blueprint", "tags": [], "specifications_doc": specifications_doc, "do_cancel": False, "draft": draft_url, "specifications_template": template_url, "scheduling_unit_blueprint": scheduling_unit_blueprint_url, "subtasks": [], "produced_by": [], "consumed_by": [], 'first_scheduling_relation': [], 'second_scheduling_relation': [], "output_pinned": False} @property def cached_task_blueprint_url(self): try: return self._task_blueprint_url except AttributeError: self._task_blueprint_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/') return self._task_blueprint_url def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None, selection_doc=None): if draft_url is None: draft_url = self.post_data_and_get_url(self.TaskRelationDraft(), '/task_relation_draft/') if producer_url is None: producer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/') if consumer_url is None: consumer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/') if template_url is None: template_url = self.cached_task_relation_selection_template_url if selection_doc is None: selection_doc = self.get_response_as_json_object(template_url+'/default') if input_role_url is None: input_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/') if output_role_url is None: output_role_url = self.post_data_and_get_url(self.TaskConnectorType(), '/task_connector_type/') # test data return {"tags": [], "selection_doc": selection_doc, "dataformat": self.django_api_url + '/dataformat/MeasurementSet', "input_role": input_role_url, "output_role": output_role_url, "draft": draft_url, "selection_template": template_url, "producer": producer_url, "consumer": consumer_url} def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str=None) -> dict: if schema is None: schema = minimal_json_schema() if subtask_type_url is None: subtask_type_url = self.django_api_url + '/subtask_type/observation' return {"type": subtask_type_url, "name": name, "description": 'My one observation', "schema": schema, "realtime": True, "queue": False, "tags": ["TMSS", "TESTING"]} @property def cached_subtask_template_url(self): try: return self._subtask_template_url except AttributeError: self._subtask_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/') return self._subtask_template_url def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after"): if first_url is None: first_url = self.cached_task_blueprint_url if second_url is None: second_url = self.cached_task_blueprint_url return {"tags": [], "first": first_url, "second": second_url, "placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement, "time_offset":60} def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after"): if first_url is None: first_url = self.cached_task_draft_url if second_url is None: second_url = self.cached_task_draft_url return {"tags": [], "first": first_url, "second": second_url, "placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement, "time_offset":60} def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate") -> dict: return {"name": name, "description": 'My one date', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate") -> dict: return {"name": name, "description": 'My one date', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def DefaultSubtaskTemplates(self, name=None, template_url=None): if template_url is None: template_url = self.cached_subtask_template_url return {"name": name if name else "default_template_%s" % uuid.uuid4(), "template": template_url, "tags": []} def Cluster(self, name=None): return {"name": name if name else "Cluster %s" % uuid.uuid4(), "description": 'My one cluster', "location": "upstairs", "archive_site": False, "tags": ['tmss', 'testing']} @property def cached_cluster_url(self): try: return self._cluster_url except AttributeError: self._cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/') return self._cluster_url def Subtask(self, cluster_url=None, task_blueprint_url=None, specifications_template_url=None, specifications_doc=None, state:str="defining", start_time: datetime=None, stop_time: datetime=None, raw_feedack:str =None): if cluster_url is None: cluster_url = self.cached_cluster_url if task_blueprint_url is None: task_blueprint_url = self.cached_task_blueprint_url if specifications_template_url is None: specifications_template_url = self.cached_subtask_template_url if specifications_doc is None: specifications_doc = self.get_response_as_json_object(specifications_template_url+'/default') if start_time is None: start_time = datetime.utcnow() if stop_time is None: stop_time = start_time + timedelta(minutes=60) if isinstance(start_time, datetime): start_time = start_time.isoformat() if isinstance(stop_time, datetime): stop_time = stop_time.isoformat() return {"start_time": start_time, "stop_time": stop_time, "state": self.django_api_url + '/subtask_state/%s' % (state,), "specifications_doc": specifications_doc, "task_blueprint": task_blueprint_url, "specifications_template": specifications_template_url, "tags": ["TMSS", "TESTING"], "do_cancel": datetime.utcnow().isoformat(), "cluster": cluster_url, "raw_feedback": raw_feedack} @property def cached_subtask_url(self): try: return self._subtask_url except AttributeError: self._subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/') return self._subtask_url def SubtaskOutput(self, subtask_url=None): if subtask_url is None: subtask_url = self.cached_subtask_url return {"subtask": subtask_url, "tags": []} @property def cached_subtask_output_url(self): try: return self._subtask_output_url except AttributeError: self._subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/') return self._subtask_output_url def Dataproduct(self, filename="my_filename", directory="/tmp/", specifications_doc=None, specifications_template_url=None, subtask_output_url=None, dataproduct_feedback_doc=None, dataproduct_feedback_template_url=None, dataformat="MeasurementSet", datatype="visibilities", sap_url=None): if specifications_template_url is None: specifications_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/dataproduct_specifications_template/') if specifications_doc is None: specifications_doc = self.get_response_as_json_object(specifications_template_url+'/default') if subtask_output_url is None: subtask_output_url = self.cached_subtask_output_url if dataproduct_feedback_template_url is None: dataproduct_feedback_template_url = self.post_data_and_get_url(self.DataproductFeedbackTemplate(), '/dataproduct_feedback_template/') if dataproduct_feedback_doc is None: dataproduct_feedback_doc = self.get_response_as_json_object(dataproduct_feedback_template_url+'/default') if sap_url is None: sap_url = self.post_data_and_get_url(self.SAP(), '/sap/') return {"filename": filename, "directory": directory, "dataformat": "%s/dataformat/%s" % (self.django_api_url, dataformat), "datatype": "%s/datatype/%s" % (self.django_api_url, datatype), "deleted_since": None, "specifications_doc": specifications_doc, "specifications_template": specifications_template_url, "tags": ["TMSS", "TESTING"], "producer": subtask_output_url, "do_cancel": None, "expected_size": 1234, "size": 123, "feedback_doc": dataproduct_feedback_doc, "feedback_template": dataproduct_feedback_template_url, "sap": sap_url } @property def cached_dataproduct_url(self): try: return self._dataproduct_url except AttributeError: self._dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/') return self._dataproduct_url def AntennaSet(self, name="antennaset1"): return {"name": name, "description": 'My one observation', "station_type": self.django_api_url + '/station_type/core', "rcus": [1,2,3,4,5], "inputs": ['input1', 'input2'], "tags": ['tmss', 'testing']} def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None): if input_dataproduct_url is None: input_dataproduct_url = self.cached_dataproduct_url if output_dataproduct_url is None: output_dataproduct_url = self.cached_dataproduct_url return {"input": input_dataproduct_url, "output": output_dataproduct_url, "identity": True, "tags": ['tmss', 'testing']} def DataproductHash(self, algorithm_url=None, hash="my_hash", dataproduct_url=None): if algorithm_url is None: algorithm_url = self.django_api_url + '/algorithm/md5' if dataproduct_url is None: dataproduct_url = self.cached_dataproduct_url return {"dataproduct": dataproduct_url, "algorithm": algorithm_url, "hash": hash, "tags": ['tmss', 'testing']} def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None): if dataproduct_url is None: dataproduct_url = self.cached_dataproduct_url return {"dataproduct": dataproduct_url, "storage_ticket": storage_ticket, "public_since": datetime.utcnow().isoformat(), "corrupted_since": datetime.utcnow().isoformat(), "tags": ['tmss', 'testing']} def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None, selection_doc=None): if subtask_url is None: subtask_url = self.cached_subtask_url if task_relation_blueprint_url is None: task_relation_blueprint_url = self.post_data_and_get_url(self.TaskRelationBlueprint(), '/task_relation_blueprint/') if dataproduct_urls is None: dataproduct_urls = [self.cached_dataproduct_url] if subtask_output_url is None: subtask_output_url = self.cached_subtask_output_url if task_relation_selection_template_url is None: task_relation_selection_template_url = self.cached_task_relation_selection_template_url if selection_doc is None: selection_doc = self.get_response_as_json_object(task_relation_selection_template_url+'/default') return {"subtask": subtask_url, "task_relation_blueprint": task_relation_blueprint_url, "producer": subtask_output_url, "dataproducts": dataproduct_urls, "selection_doc": selection_doc, "selection_template": task_relation_selection_template_url, "tags": []} def Filesystem(self, name="my_Filesystem", cluster_url=None): if cluster_url is None: cluster_url = self.cached_cluster_url return {"name": name, "description": 'My one filesystem', "capacity": 1111111111, "cluster": cluster_url, "directory": '/', "tags": ['tmss', 'testing']} def SAPTemplate(self): return {"name": "my_sap_template" + str(uuid.uuid4()), "description": 'My SAP test template', "schema": minimal_json_schema(), "tags": ["TMSS", "TESTING"]} def SAP(self, specifications_template_url=None, specifications_doc=None): if specifications_template_url is None: specifications_template_url = self.post_data_and_get_url(self.SAPTemplate(), '/sap_template/') if specifications_doc is None: specifications_doc = self.get_response_as_json_object(specifications_template_url + '/default') return {"specifications_doc": specifications_doc, "specifications_template": specifications_template_url, "tags": ['tmss', 'testing']} def Reservation(self, name="My Reservation", duration=None, start_time=None, project_url=None, specifications_template_url=None, specifications_doc=None) -> dict: if project_url is None: project_url = self.cached_project_url if start_time is None: start_time = datetime.utcnow() + timedelta(hours=12) if specifications_template_url is None: specifications_template_url = self.post_data_and_get_url(self.ReservationTemplate(), '/reservation_template/') if specifications_doc is None: specifications_doc = self.get_response_as_json_object(specifications_template_url + '/default') if isinstance(start_time, datetime): start_time = start_time.isoformat() return {"name": name, "project": project_url, "description": "Test Reservation", "tags": ["TMSS", "TESTING"], "start_time": start_time, "duration": duration, # can be None "specifications_doc": specifications_doc, "specifications_template": specifications_template_url} def ProjectPermission(self, name=None, GET=None, PUT=None, PATCH=None, DELETE=None, POST=None) -> dict: if name is None: name = 'MyProjectPermission_%s' % uuid.uuid4() return {'name': name, 'GET': GET or [], 'PUT': PUT or [], 'PATCH': PATCH or [], 'DELETE': DELETE or [], 'POST': POST or []} def wipe_cache(self): for attr in ['_dataproduct_url', '_subtask_url', '_subtask_output_url', '_subtask_template_url', '_cluster_url', '_cycle_url', '_project_url', '_resource_type_url', '_scheduling_set_url', '_scheduling_unit_blueprint_url', '_task_blueprint_url', '_task_draft_url']: if hasattr(self, attr): delattr(self, attr)