Skip to content
Snippets Groups Projects
Select Git revision
  • f4eaaa9ab601af66635ab83c3a5cbcdf54e88ad6
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

schedulingunitflow.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tmss_test_data_rest.py 38.83 KiB
    #!/usr/bin/env python3
    
    # Copyright (C) 2018    ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it and/or
    # modify it under the terms of the GNU General Public License as published
    # by the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    
    ################################################################################################
    # the methods below can be used to do HTTP REST calls to the django server and check the results
    ################################################################################################
    
    from datetime import datetime, timedelta
    import uuid
    import requests
    import json
    from http import HTTPStatus
    from copy import deepcopy
    from lofar.sas.tmss.test.test_utils import minimal_json_schema
    
    
    class TMSSRESTTestDataCreator():
        def __init__(self, django_api_url: str, auth: requests.auth.HTTPBasicAuth):
            """Remember the base url of the REST API and the credentials to use for all requests.

            :param django_api_url: base url of the django REST API. One trailing '/' (if present) is cut off,
                                   so url postfixes that start with a '/' can be appended directly.
            :param auth: the credentials which are passed along with every GET/POST done by this object.
            """
            if django_api_url.endswith('/'):
                # remove exactly one trailing slash
                django_api_url = django_api_url[:-1]

            self.django_api_url = django_api_url
            self.auth = auth
    
        def get_response_as_json_object(self, url):
            """GET the given url (using self.auth) and return the utf-8 decoded response body parsed as json.

            The url is used as-is: self.django_api_url is NOT prepended.
            The http status code of the response is not inspected here.
            """
            response = requests.get(url, auth=self.auth)
            body_text = response.content.decode('utf-8')
            return json.loads(body_text)
    
        def post_data_and_get_response(self, data, url_postfix):
            """POST the given data as json to self.django_api_url+url_postfix, and return the raw response object.

            The status code is not checked here; that is left to the caller.
            """
            target_url = self.django_api_url + url_postfix
            return requests.post(target_url, json=data, auth=self.auth)
    
        def post_data_and_get_response_as_json_object(self, data, url_postfix):
            """POST the given data to self.django_api_url+url_postfix, and return the response body parsed as json.

            :raises Exception: when the response status is anything other than 201 (CREATED).
                               The message holds the url postfix, the status code and the response content.
            """
            response = self.post_data_and_get_response(data, url_postfix)

            # guard clause: anything but CREATED is treated as a failure
            if response.status_code != HTTPStatus.CREATED:
                raise Exception("Error during POST request of '%s' status=%s content: %s" % (url_postfix, response.status_code, response.content.decode('utf-8')))

            return json.loads(response.content.decode('utf-8'))
    
        def post_data_and_get_url(self, data, url_postfix):
            """POST the given data to self.django_api_url+url_postfix, and return the 'url' of the created object.

            :raises Exception: when the POST itself fails, or when the json result has no 'url' key.
            """
            created = self.post_data_and_get_response_as_json_object(data, url_postfix)
            try:
                return created['url']
            except KeyError:
                # a plain KeyError says little; report the postfix and the complete result instead
                raise Exception("Error during POST request of '%s' result is '%s'" % (url_postfix, created))
    
        def update_schema_from_template(self, model_name:str, template_test_data:dict) -> dict:
            '''helper method to update the schema subdict in the template_test_data dict with the auto-injected-by-the-backend-properties if needed'''
            updated_test_data = deepcopy(template_test_data)
    
            if 'schema' in updated_test_data:
                if 'name' in template_test_data and 'version' in template_test_data:
                    updated_test_data['schema']['$id'] = "%s/schemas/%s/%s/%s#" % (self.django_api_url,
                                                                                   model_name,
                                                                                   template_test_data['name'],
                                                                                   template_test_data['version'])
                else:
                    updated_test_data['schema'].pop('$id','')
    
    
                if 'name' in template_test_data:
                    updated_test_data['schema']['title'] = template_test_data['name']
                else:
                    updated_test_data['schema'].pop('title','')
    
                if 'description' in template_test_data:
                    updated_test_data['schema']['description'] = template_test_data['description']
                else:
                    updated_test_data['schema'].pop('description','')
    
                if 'version' in template_test_data:
                    updated_test_data['schema']['version'] = template_test_data['version']
                else:
                    updated_test_data['schema'].pop('version','')
    
            return updated_test_data
    
        def update_document_from_template(self, model_name:str, data:dict, document_key:str, template_key:str) -> dict:
            updated_data = deepcopy(data)
            updated_data[document_key] = self.update_schema_from_template(model_name, updated_data[document_key])
            return updated_data
    
        #######################################################
        # the methods below can be used to create test data
        # naming convention is: <django_model_name>()
        #######################################################
        
        
        def GeneratorTemplate(self, name="generatortemplate") -> dict:
            """Return test data (dict) for a GeneratorTemplate with the given name.

            The schema is a minimal_json_schema with a single string property 'foo' (default 'bar').
            """
            schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})
            return dict(name=name,
                        description='My one observation',
                        schema=schema,
                        create_function='Funky',
                        tags=["TMSS", "TESTING"])
        
        def SchedulingUnitTemplate(self, name="schedulingunittemplate1", schema:dict=None) -> dict:
            """Return test data (dict) for a SchedulingUnitTemplate.

            :param name: the name of the template.
            :param schema: the json schema to use. When None, a minimal_json_schema with a single
                           string property 'foo' (default 'bar') is used.
            """
            template = {"name": name,
                        "description": 'My description'}
            template["schema"] = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}}) if schema is None else schema
            template["tags"] = ["TMSS", "TESTING"]
            return template
    
        @property
        def cached_scheduling_unit_template_url(self):
            try:
                return self._scheduling_unit_template_url
            except AttributeError:
                self._scheduling_unit_template_url = self.post_data_and_get_url(self.SchedulingUnitTemplate(), '/scheduling_unit_template/')
                return self._scheduling_unit_template_url
    
    
        def SchedulingConstraintsTemplate(self, name="schedulingcontraintstemplate1", schema:dict=None) -> dict:
            """Return test data (dict) for a SchedulingConstraintsTemplate.

            :param name: the name of the template.
            :param schema: the json schema to use. When None, a minimal_json_schema with a single
                           string property 'foo' (default 'bar') is used.
            """
            use_default_schema = schema is None
            if use_default_schema:
                schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})

            return dict(name=name,
                        description='My description',
                        schema=schema,
                        tags=["TMSS", "TESTING"])
    
    
        def ReservationTemplate(self, name="reservationtemplate1", schema:dict=None) -> dict:
            """Return test data (dict) for a ReservationTemplate.

            :param name: the name of the template.
            :param schema: the json schema to use. When None, a minimal_json_schema with a single
                           string property 'foo' (default 'bar') is used.
            """
            if schema is not None:
                chosen_schema = schema
            else:
                chosen_schema = minimal_json_schema(properties={"foo": {"type": "string", "default": "bar"}})

            return {"name": name,
                    "description": 'My description',
                    "schema": chosen_schema,
                    "tags": ["TMSS", "TESTING"]}
    
        @property
        def cached_reservation_template_url(self):
            try:
                return self._reservation_template_url
            except AttributeError:
                self._reservation_template_url = self.post_data_and_get_url(self.ReservationTemplate(), '/reservation_template/')
                return self._reservation_template_url
    
        def ReservationStrategyTemplate(self, name="my_ReservationStrategyTemplate",
                                              reservation_template_url=None,
                                              template:dict=None) -> dict:
            """Return test data (dict) for a ReservationStrategyTemplate.

            :param name: the name of the strategy template.
            :param reservation_template_url: url of the reservation template to refer to.
                                             When None, self.cached_reservation_template_url is used.
            :param template: the template document. When None, the json document served at
                             <reservation_template_url>/default is fetched and used.
            """
            reservation_template_url = self.cached_reservation_template_url if reservation_template_url is None else reservation_template_url
            template = self.get_response_as_json_object(reservation_template_url+'/default') if template is None else template

            return dict(name=name,
                        description='My ReservationTemplate description',
                        template=template,
                        reservation_template=reservation_template_url,
                        version="1",
                        tags=["TMSS", "TESTING"])
    
        def SchedulingUnitObservingStrategyTemplate(self, name="my_SchedulingUnitObservingStrategyTemplate",
                                                          scheduling_unit_template_url=None,
                                                          template:dict=None) -> dict:
            """Return test data (dict) for a SchedulingUnitObservingStrategyTemplate.

            :param name: the name of the strategy template.
            :param scheduling_unit_template_url: url of the scheduling unit template to refer to.
                                                 When None, self.cached_scheduling_unit_template_url is used.
            :param template: the template document. When None, the json document served at
                             <scheduling_unit_template_url>/default is fetched and used.
            """
            su_template_url = scheduling_unit_template_url
            if su_template_url is None:
                su_template_url = self.cached_scheduling_unit_template_url

            if template is None:
                template = self.get_response_as_json_object(su_template_url+'/default')

            strategy = {"name": name,
                        "description": 'My SchedulingUnitTemplate description',
                        "template": template}
            strategy["scheduling_unit_template"] = su_template_url
            strategy["tags"] = ["TMSS", "TESTING"]
            return strategy
    
        def TaskTemplate(self, name="tasktemplate1", task_type_url: str = None) -> dict:
            """Return test data (dict) for a TaskTemplate with a minimal_json_schema.

            :param name: the name of the template.
            :param task_type_url: url of the task type. When None, the 'observation' task type
                                  (self.django_api_url + '/task_type/observation') is used.
            """
            type_url = task_type_url if task_type_url is not None else self.django_api_url + '/task_type/observation'

            return dict([("name", name),
                         ("description", 'My one observation'),
                         ("schema", minimal_json_schema()),
                         ("tags", ["TMSS", "TESTING"]),
                         ("type", type_url),
                         ("validation_code_js", "???")])
    
        @property
        def cached_task_template_url(self):
            try:
                return self._task_template_url
            except AttributeError:
                self._task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
                return self._task_template_url
    
    
        def TaskRelationSelectionTemplate(self, name="taskrelationselectiontemplate1") -> dict:
            """Return test data (dict) for a TaskRelationSelectionTemplate with the given name and a minimal_json_schema."""
            template = {"name": name}
            template["description"] = 'My one observation'
            template["schema"] = minimal_json_schema()
            template["tags"] = ["TMSS", "TESTING"]
            return template
    
        @property
        def cached_task_relation_selection_template_url(self):
            try:
                return self._task_relation_selection_template_url
            except AttributeError:
                self._task_relation_selection_template_url = self.post_data_and_get_url(self.TaskRelationSelectionTemplate(), '/task_relation_selection_template/')
                return self._task_relation_selection_template_url
    
    
        def TaskConnectorType(self, role="correlator", iotype="output", task_template_url=None):
            if task_template_url is None:
                task_template_url = self.cached_task_template_url
        
            return {"role": self.django_api_url + '/role/%s'%role,
                    "datatype": self.django_api_url + '/datatype/image',
                    "dataformat": self.django_api_url + '/dataformat/Beamformed',
                    "task_template": task_template_url,
                    "iotype": self.django_api_url + '/iotype/%s'%iotype,
                    "tags": []}
    
    
        def DefaultTemplates(self, name="defaulttemplate"):
            """Return test data (dict) for a default-template entry with the given name, no template (None) and no tags."""
            return dict(name=name, template=None, tags=[])
    
        def Cycle(self, description="my cycle description"):
            """Return test data (dict) for a Cycle with a unique (uuid based) name.

            'start' and 'stop' are both iso-formatted 'utcnow' timestamps, each taken with its own clock read.
            """
            cycle = {"name": 'my_cycle_%s' % uuid.uuid4(),
                     "description": description,
                     "tags": []}
            # two separate clock reads, start first
            cycle["start"] = datetime.utcnow().isoformat()
            cycle["stop"] = datetime.utcnow().isoformat()
            cycle["projects"] = []
            cycle["quota"] = []
            return cycle
        
        @property
        def cached_cycle_url(self):
            try:
                return self._cycle_url
            except AttributeError:
                self._cycle_url = self.post_data_and_get_url(self.Cycle(), '/cycle/')
                return self._cycle_url
    
        def Project(self, description="my project description", name=None, auto_pin=False, auto_ingest=False, cycle_urls=[]):
            if name is None:
                name = 'my_project_' + str(uuid.uuid4())
    
            if not cycle_urls:
                cycle_urls = [self.cached_cycle_url]
    
            return {"name": name,
                    "description": description,
                    "tags": [],
                    "quota": [],
                    "priority_rank": 1.0,
                    "trigger_priority": 1000,
                    "can_trigger": False,
                    "private_data": True,
                    "cycles": cycle_urls,
                    "auto_pin": auto_pin,
                    "auto_ingest": auto_ingest}
    
        @property
        def cached_project_url(self):
            try:
                return self._project_url
            except AttributeError:
                self._project_url = self.post_data_and_get_url(self.Project(), '/project/')
                return self._project_url
    
        def ResourceType(self, description="my resource_type description"):
            return {
                "tags": [],
                "description": description,
                "name": 'my_resource_type_' + str(uuid.uuid4()),
                "quantity": self.django_api_url + '/quantity/number'
            }
    
        @property
        def cached_resource_type_url(self):
            try:
                return self._resource_type_url
            except AttributeError:
                self._resource_type_url = self.post_data_and_get_url(self.ResourceType(), '/resource_type/')
                return self._resource_type_url
    
        def ProjectQuota(self, description="my project quota description", project_url=None, resource_url=None):
            if project_url is None:
                project_url = self.cached_project_url
    
            if resource_url is None:
                resource_url = self.cached_resource_type_url
    
            return {
                "value": 1000,
                "project": project_url,
                "resource_type": resource_url
                }
        
    
        def SchedulingSet(self, name="my_scheduling_set", project_url=None, generator_template_url=None, generator_doc=None):
            """Return test data (dict) for a SchedulingSet.

            :param name: the name of the scheduling set.
            :param project_url: url of the project. When None, self.cached_project_url is used.
            :param generator_template_url: url of the generator template. When None, a new
                                           self.GeneratorTemplate() is POSTed to '/generator_template/' and its url is used.
            :param generator_doc: the generator document. When None, the json document served at
                                  <generator_template_url>/default is fetched and used.
            """
            project_url = self.cached_project_url if project_url is None else project_url

            if generator_template_url is None:
                # not cached: every call without a url creates a new generator template
                generator_template_url = self.post_data_and_get_url(self.GeneratorTemplate(), '/generator_template/')

            generator_doc = self.get_response_as_json_object(generator_template_url+'/default') if generator_doc is None else generator_doc

            return dict(name=name,
                        description="This is my scheduling set",
                        tags=[],
                        generator_doc=generator_doc,
                        project=project_url,
                        generator_template=generator_template_url,
                        generator_source=None,
                        scheduling_unit_drafts=[])
    
        @property
        def cached_scheduling_set_url(self):
            try:
                return self._scheduling_set_url
            except AttributeError:
                self._scheduling_set_url = self.post_data_and_get_url(self.SchedulingSet(), '/scheduling_set/')
                return self._scheduling_set_url
    
    
        def SchedulingUnitDraft(self, name="my_scheduling_unit_draft", scheduling_set_url=None, template_url=None, scheduling_constraints_template_url=None, requirements_doc=None, scheduling_constraints_doc=None, scheduling_constraints_template=None, observation_strategy_template_url=None):
            if scheduling_set_url is None:
                scheduling_set_url = self.cached_scheduling_set_url
        
            if template_url is None:
                template_url = self.cached_scheduling_unit_template_url
    
            if scheduling_constraints_template_url is None:
                scheduling_constraints_template_url = self.post_data_and_get_url(self.SchedulingConstraintsTemplate(), '/scheduling_constraints_template/')
    
            if requirements_doc is None:
                requirements_doc = self.get_response_as_json_object(template_url+'/default')
    
            if scheduling_constraints_doc is None:
                scheduling_constraints_doc = self.get_response_as_json_object(scheduling_constraints_template_url+'/default')
    
            return {"name": name,
                    "description": "This is my run draft",
                    "tags": [],
                    "requirements_doc": requirements_doc,
                    "copy_reason": self.django_api_url + '/copy_reason/template',
                    "generator_instance_doc": {},
                    "copies": None,
                    "scheduling_set": scheduling_set_url,
                    "requirements_template": template_url,
                    "scheduling_constraints_doc": scheduling_constraints_doc,
                    "scheduling_constraints_template": scheduling_constraints_template_url,
                    "observation_strategy_template": observation_strategy_template_url,
                    "scheduling_unit_blueprints": [],
                    "task_drafts": []}
    
        @property
        def cached_scheduling_unit_draft_url(self):
            try:
                return self._scheduling_unit_draft_url
            except AttributeError:
                self._scheduling_unit_draft_url = self.post_data_and_get_url(self.SchedulingUnitDraft(), '/scheduling_unit_draft/')
                return self._scheduling_unit_draft_url
    
        def TaskDraft(self, name=None, scheduling_unit_draft_url=None, template_url=None, specifications_doc=None):
            if name is None:
                name = str(uuid.uuid4())
    
            if scheduling_unit_draft_url is None:
                scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url
        
            if template_url is None:
                template_url = self.cached_task_template_url
    
            if specifications_doc is None:
                specifications_doc = self.get_response_as_json_object(template_url+'/default')
    
            return {"name": name,
                    "description": "This is my task draft",
                    "tags": [],
                    "specifications_doc": specifications_doc,
                    "copy_reason": self.django_api_url + '/copy_reason/template',
                    "copies": None,
                    "scheduling_unit_draft": scheduling_unit_draft_url,
                    "specifications_template": template_url,
                    'task_blueprints': [],
                    'produced_by': [],
                    'consumed_by': [],
                    'first_scheduling_relation': [],
                    'second_scheduling_relation': [],
                    "output_pinned": False}
    
        @property
        def cached_task_draft_url(self):
            try:
                return self._task_draft_url
            except AttributeError:
                self._task_draft_url = self.post_data_and_get_url(self.TaskDraft(), '/task_draft/')
                return self._task_draft_url
    
        def TaskRelationDraft(self, producer_url=None, consumer_url=None, template_url=None, input_role_url=None, output_role_url=None, selection_doc=None):
            if producer_url is None:
                producer_url = self.cached_task_draft_url
        
            if consumer_url is None:
                consumer_url = self.cached_task_draft_url
    
            if template_url is None:
                template_url = self.cached_task_relation_selection_template_url
    
            if template_url is None:
                template_url = self.cached_task_relation_selection_template_url
    
            if selection_doc is None:
                selection_doc = self.get_response_as_json_object(template_url+'/default')
    
            if input_role_url is None:
                try:
                    input_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="input"), '/task_connector_type/')
                except:
                    task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
                    input_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="input", task_template_url=task_template_url), '/task_connector_type/')
    
            if output_role_url is None:
                try:
                    output_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="output"), '/task_connector_type/')
                except:
                    task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
                    output_role_url = self.post_data_and_get_url(self.TaskConnectorType(iotype="output", task_template_url=task_template_url), '/task_connector_type/')
    
            return {"tags": [],
                    "selection_doc": selection_doc,
                    "producer": producer_url,
                    "consumer": consumer_url,
                    "input_role": input_role_url,
                    "output_role": output_role_url,
                    "selection_template": template_url,
                    'related_task_relation_blueprint': []}
        
        def SchedulingUnitBlueprint(self, name="my_scheduling_unit_blueprint", scheduling_unit_draft_url=None, template_url=None, requirements_doc:dict=None):
            """Return test data (dict) for a SchedulingUnitBlueprint.

            :param name: the name of the blueprint.
            :param scheduling_unit_draft_url: url of the draft. When None, self.cached_scheduling_unit_draft_url is used.
            :param template_url: url of the requirements template. When None, self.cached_scheduling_unit_template_url is used.
            :param requirements_doc: when None, the json document served at <template_url>/default is fetched and used.
            """
            # note the order: the template is resolved before the draft
            template_url = self.cached_scheduling_unit_template_url if template_url is None else template_url
            scheduling_unit_draft_url = self.cached_scheduling_unit_draft_url if scheduling_unit_draft_url is None else scheduling_unit_draft_url
            requirements_doc = self.get_response_as_json_object(template_url+'/default') if requirements_doc is None else requirements_doc

            return dict(name=name,
                        description="This is my run blueprint",
                        tags=[],
                        requirements_doc=requirements_doc,
                        draft=scheduling_unit_draft_url,
                        requirements_template=template_url,
                        task_blueprints=[])
        
        @property
        def cached_scheduling_unit_blueprint_url(self):
            try:
                return self._scheduling_unit_blueprint_url
            except AttributeError:
                self._scheduling_unit_blueprint_url = self.post_data_and_get_url(self.SchedulingUnitBlueprint(), '/scheduling_unit_blueprint/')
                return self._scheduling_unit_blueprint_url
    
    
        def TaskBlueprint(self, name=None, draft_url=None, template_url=None, scheduling_unit_blueprint_url=None, specifications_doc=None):
            """Return test data (dict) for a TaskBlueprint.

            :param name: the name of the task blueprint. When None, a random uuid string is used.
            :param draft_url: url of the task draft. When None, self.cached_task_draft_url is used.
            :param template_url: url of the specifications template. When None, self.cached_task_template_url is used.
            :param scheduling_unit_blueprint_url: url of the parent scheduling unit blueprint.
                                                  When None, self.cached_scheduling_unit_blueprint_url is used.
            :param specifications_doc: when None, the json document served at <template_url>/default is fetched and used.
            """
            name = str(uuid.uuid4()) if name is None else name
            draft_url = self.cached_task_draft_url if draft_url is None else draft_url
            template_url = self.cached_task_template_url if template_url is None else template_url
            specifications_doc = self.get_response_as_json_object(template_url+'/default') if specifications_doc is None else specifications_doc
            scheduling_unit_blueprint_url = self.cached_scheduling_unit_blueprint_url if scheduling_unit_blueprint_url is None else scheduling_unit_blueprint_url

            task_blueprint = {"name": name,
                              "description": "This is my work request blueprint",
                              "tags": [],
                              "specifications_doc": specifications_doc,
                              "draft": draft_url,
                              "specifications_template": template_url,
                              "scheduling_unit_blueprint": scheduling_unit_blueprint_url}

            # the related-object lists all start out empty
            for related in ("subtasks", "produced_by", "consumed_by", 'first_scheduling_relation', 'second_scheduling_relation'):
                task_blueprint[related] = []

            task_blueprint["output_pinned"] = False
            return task_blueprint
    
        @property
        def cached_task_blueprint_url(self):
            try:
                return self._task_blueprint_url
            except AttributeError:
                self._task_blueprint_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')
                return self._task_blueprint_url
    
        def TaskRelationBlueprint(self, draft_url=None, template_url=None, input_role_url=None, output_role_url=None, consumer_url=None, producer_url=None, selection_doc=None):
            """Return test data (dict) for a TaskRelationBlueprint.

            Every argument which is left at None is filled in:
            :param draft_url: defaults to the url of a new self.TaskRelationDraft() POSTed to '/task_relation_draft/'.
            :param template_url: url of the selection template; defaults to self.cached_task_relation_selection_template_url.
            :param input_role_url: defaults to the url of a task connector type (iotype 'input') which is POSTed on the spot.
            :param output_role_url: defaults to the url of a task connector type (iotype 'output') which is POSTed on the spot.
            :param consumer_url: defaults to the url of a new self.TaskBlueprint() POSTed to '/task_blueprint/'.
            :param producer_url: defaults to the url of a new self.TaskBlueprint() POSTed to '/task_blueprint/'
                                 (a different one than the default consumer).
            :param selection_doc: defaults to the json document served at <template_url>/default.

            :raises Exception: when a required POST request fails (for the roles: when the retry fails as well).
            """
            def create_task_connector_type_url(iotype):
                # First try to create the connector for the cached task template.
                # When that fails, retry once with a freshly created task template.
                # NOTE(review): presumably the first POST is rejected when such a connector already exists
                # for the cached task template -- confirm against the backend's constraints.
                try:
                    return self.post_data_and_get_url(self.TaskConnectorType(iotype=iotype), '/task_connector_type/')
                except Exception:
                    # 'except Exception' and not a bare 'except', so KeyboardInterrupt/SystemExit are not swallowed.
                    task_template_url = self.post_data_and_get_url(self.TaskTemplate(), '/task_template/')
                    return self.post_data_and_get_url(self.TaskConnectorType(iotype=iotype, task_template_url=task_template_url), '/task_connector_type/')

            if draft_url is None:
                draft_url = self.post_data_and_get_url(self.TaskRelationDraft(), '/task_relation_draft/')

            if producer_url is None:
                producer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

            if consumer_url is None:
                consumer_url = self.post_data_and_get_url(self.TaskBlueprint(), '/task_blueprint/')

            if template_url is None:
                template_url = self.cached_task_relation_selection_template_url

            if selection_doc is None:
                selection_doc = self.get_response_as_json_object(template_url+'/default')

            if input_role_url is None:
                input_role_url = create_task_connector_type_url("input")

            if output_role_url is None:
                output_role_url = create_task_connector_type_url("output")

            # test data
            return {"tags": [],
                    "selection_doc": selection_doc,
                    "input_role": input_role_url,
                    "output_role": output_role_url,
                    "draft": draft_url,
                    "selection_template": template_url,
                    "producer": producer_url,
                    "consumer": consumer_url}
        
        def SubtaskTemplate(self, name="subtask_template_1", schema=None, subtask_type_url: str=None) -> dict:
            if schema is None:
                schema = minimal_json_schema()
    
            if subtask_type_url is None:
                subtask_type_url = self.django_api_url + '/subtask_type/observation'
    
            return {"type": subtask_type_url,
                           "name": name,
                           "description": 'My one observation',
                           "schema": schema,
                           "realtime": True,
                           "queue": False,
                           "tags": ["TMSS", "TESTING"]}
    
        @property
        def cached_subtask_template_url(self):
            try:
                return self._subtask_template_url
            except AttributeError:
                self._subtask_template_url = self.post_data_and_get_url(self.SubtaskTemplate(), '/subtask_template/')
                return self._subtask_template_url
    
    
        def TaskSchedulingRelationBlueprint(self, first_url=None, second_url=None, placement="after"):
            
            if first_url is None:
                first_url = self.cached_task_blueprint_url
        
            if second_url is None:
                second_url = self.cached_task_blueprint_url
    
            return {"tags": [],
                    "first": first_url,
                    "second": second_url,
                    "placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement,
                    "time_offset":60}
    
        def TaskSchedulingRelationDraft(self, first_url=None, second_url=None, placement="after"):
            if first_url is None:
                first_url = self.cached_task_draft_url
        
            if second_url is None:
                second_url = self.cached_task_draft_url
            return {"tags": [],
                    "first": first_url,
                    "second": second_url,
                    "placement": self.django_api_url + '/scheduling_relation_placement/%s'%placement,
                    "time_offset":60}
    
        def DataproductSpecificationsTemplate(self, name="my_DataproductSpecificationsTemplate") -> dict:
            """Return test data (dict) for a DataproductSpecificationsTemplate with the given name and a minimal_json_schema."""
            return dict(name=name,
                        description='My one date',
                        schema=minimal_json_schema(),
                        tags=["TMSS", "TESTING"])
        
        def DataproductFeedbackTemplate(self, name="my_DataproductFeedbackTemplate") -> dict:
            """Return test data (dict) for a DataproductFeedbackTemplate with the given name and a minimal_json_schema."""
            template = {"name": name}
            template["description"] = 'My one date'
            template["schema"] = minimal_json_schema()
            template["tags"] = ["TMSS", "TESTING"]
            return template
        
        def DefaultSubtaskTemplates(self, name=None, template_url=None):
            """Return test data (dict) for a default-subtask-template entry.

            :param name: the name of the entry. When empty or None, a unique (uuid based) name is generated.
            :param template_url: url of the subtask template. When None, self.cached_subtask_template_url is used.
            """
            template_url = self.cached_subtask_template_url if template_url is None else template_url

            # any falsy name (None, '') is replaced by a generated one
            entry_name = name or "default_template_%s" % uuid.uuid4()

            return dict(name=entry_name, template=template_url, tags=[])
        
        def Cluster(self, name=None):
            """Build the request payload for a cluster.

            :param name: value for the 'name' field. A falsy value is replaced by 'Cluster <uuid4>'.
            :return: dict with the keys name, description, location, archive_site and tags.
            """
            cluster_name = name or "Cluster %s" % uuid.uuid4()
            return dict(name=cluster_name,
                        description='My one cluster',
                        location="upstairs",
                        archive_site=False,
                        tags=['tmss', 'testing'])
    
        @property
        def cached_cluster_url(self):
            """Url of a default cluster, posted to '/cluster/' on first access and kept in self._cluster_url afterwards."""
            if not hasattr(self, '_cluster_url'):
                # first access: post the default Cluster() payload and remember the resulting url
                self._cluster_url = self.post_data_and_get_url(self.Cluster(), '/cluster/')
            return self._cluster_url
    
    
        def Subtask(self, cluster_url=None, task_blueprint_urls=None, specifications_template_url=None, specifications_doc=None, state:str="defining", start_time: datetime=None, stop_time: datetime=None, raw_feedback:str =None):
            """Build the request payload for a subtask.

            :param cluster_url: url of the cluster. Defaults to self.cached_cluster_url.
            :param task_blueprint_urls: list of task blueprint urls. Defaults to [self.cached_task_blueprint_url].
            :param specifications_template_url: url of the subtask template. Defaults to self.cached_subtask_template_url.
            :param specifications_doc: specifications document. Defaults to the response fetched from '<template url>/default'.
            :param state: state name, appended to the '/subtask_state/' url.
            :param start_time: start of the subtask. Defaults to datetime.utcnow().
            :param stop_time: end of the subtask. Defaults to start_time plus 60 minutes.
            :param raw_feedback: value for the 'raw_feedback' field.
            :return: dict with the subtask fields; datetime start/stop values are given in isoformat.
            """
            def _as_text(moment):
                # datetimes are converted with isoformat(); any other value is passed through untouched
                return moment.isoformat() if isinstance(moment, datetime) else moment

            cluster = self.cached_cluster_url if cluster_url is None else cluster_url
            blueprints = [self.cached_task_blueprint_url] if task_blueprint_urls is None else task_blueprint_urls
            template = self.cached_subtask_template_url if specifications_template_url is None else specifications_template_url
            doc = self.get_response_as_json_object(template+'/default') if specifications_doc is None else specifications_doc

            begin = datetime.utcnow() if start_time is None else start_time
            end = (begin + timedelta(minutes=60)) if stop_time is None else stop_time

            return {"start_time": _as_text(begin),
                    "stop_time": _as_text(end),
                    "state": self.django_api_url + '/subtask_state/%s' % (state,),
                    "specifications_doc": doc,
                    "task_blueprints": blueprints,
                    "specifications_template": template,
                    "tags": ["TMSS", "TESTING"],
                    "cluster": cluster,
                    "raw_feedback": raw_feedback}
    
        @property
        def cached_subtask_url(self):
            """Url of a default subtask, posted to '/subtask/' on first access and kept in self._subtask_url afterwards."""
            if not hasattr(self, '_subtask_url'):
                # first access: post the default Subtask() payload and remember the resulting url
                self._subtask_url = self.post_data_and_get_url(self.Subtask(), '/subtask/')
            return self._subtask_url
    
        def SubtaskOutput(self, subtask_url=None, task_blueprint_url=None):
            """Build the request payload for a subtask output.

            :param subtask_url: url of the subtask. Defaults to self.cached_subtask_url.
            :param task_blueprint_url: url of the task blueprint. Defaults to self.cached_task_blueprint_url.
            :return: dict with the keys subtask, task_blueprint and tags.
            """
            subtask = self.cached_subtask_url if subtask_url is None else subtask_url
            blueprint = self.cached_task_blueprint_url if task_blueprint_url is None else task_blueprint_url
            return dict(subtask=subtask, task_blueprint=blueprint, tags=[])
    
        @property
        def cached_subtask_output_url(self):
            """Url of a default subtask output, posted to '/subtask_output/' on first access and kept in self._subtask_output_url afterwards."""
            if not hasattr(self, '_subtask_output_url'):
                # first access: post the default SubtaskOutput() payload and remember the resulting url
                self._subtask_output_url = self.post_data_and_get_url(self.SubtaskOutput(), '/subtask_output/')
            return self._subtask_output_url
    
        def Dataproduct(self, filename="my_filename", directory="/tmp/",
                        specifications_doc=None, specifications_template_url=None,
                        subtask_output_url=None,
                        dataproduct_feedback_doc=None, dataproduct_feedback_template_url=None,
                        dataformat="MeasurementSet", datatype="visibilities",
                        sap_url=None):
            """Build the request payload for a dataproduct.

            Every referenced object that is not supplied is either taken from a cached url or posted on the fly.

            :param filename: value for the 'filename' field.
            :param directory: value for the 'directory' field.
            :param specifications_doc: specifications document. Defaults to the response fetched from
                                       '<specifications template url>/default'.
            :param specifications_template_url: url of the dataproduct specifications template. When None, a
                                                DataproductSpecificationsTemplate() payload is posted to
                                                '/dataproduct_specifications_template/'.
            :param subtask_output_url: url of the producing subtask output. Defaults to self.cached_subtask_output_url.
            :param dataproduct_feedback_doc: feedback document. Defaults to the response fetched from
                                             '<feedback template url>/default'.
            :param dataproduct_feedback_template_url: url of the feedback template. When None, a
                                                      DataproductFeedbackTemplate() payload is posted to
                                                      '/dataproduct_feedback_template/'.
            :param dataformat: dataformat name, appended to the '/dataformat/' url.
            :param datatype: datatype name, appended to the '/datatype/' url.
            :param sap_url: url of the SAP. When None, a SAP() payload is posted to '/sap/'.
            :return: dict with the dataproduct fields.
            """
            if specifications_template_url is None:
                # use the payload builder that belongs to the endpoint being posted to
                specifications_template_url = self.post_data_and_get_url(self.DataproductSpecificationsTemplate(), '/dataproduct_specifications_template/')

            if specifications_doc is None:
                specifications_doc = self.get_response_as_json_object(specifications_template_url+'/default')

            if subtask_output_url is None:
                subtask_output_url = self.cached_subtask_output_url

            if dataproduct_feedback_template_url is None:
                dataproduct_feedback_template_url = self.post_data_and_get_url(self.DataproductFeedbackTemplate(), '/dataproduct_feedback_template/')

            if dataproduct_feedback_doc is None:
                dataproduct_feedback_doc = self.get_response_as_json_object(dataproduct_feedback_template_url+'/default')

            if sap_url is None:
                sap_url = self.post_data_and_get_url(self.SAP(), '/sap/')

            return {"filename": filename,
                    "directory": directory,
                    "dataformat": "%s/dataformat/%s" % (self.django_api_url, dataformat),
                    "datatype": "%s/datatype/%s" % (self.django_api_url, datatype),
                    "deleted_since": None,
                    "specifications_doc": specifications_doc,
                    "specifications_template": specifications_template_url,
                    "tags": ["TMSS", "TESTING"],
                    "producer": subtask_output_url,
                    "expected_size": 1234,
                    "size": 123,
                    "feedback_doc": dataproduct_feedback_doc,
                    "feedback_template": dataproduct_feedback_template_url,
                    "sap": sap_url
                    }
        
        @property
        def cached_dataproduct_url(self):
            """Url of a default dataproduct, posted to '/dataproduct/' on first access and kept in self._dataproduct_url afterwards."""
            if not hasattr(self, '_dataproduct_url'):
                # first access: post the default Dataproduct() payload and remember the resulting url
                self._dataproduct_url = self.post_data_and_get_url(self.Dataproduct(), '/dataproduct/')
            return self._dataproduct_url
    
        def AntennaSet(self, name="antennaset1"):
            """Build the request payload for an antenna set.

            :param name: value for the 'name' field.
            :return: dict with the keys name, description, station_type, rcus, inputs and tags.
            """
            station_type_url = self.django_api_url + '/station_type/core'
            return dict(name=name,
                        description='My one observation',
                        station_type=station_type_url,
                        rcus=[1,2,3,4,5],
                        inputs=['input1', 'input2'],
                        tags=['tmss', 'testing'])
        
        def DataproductTransform(self, input_dataproduct_url=None, output_dataproduct_url=None):
            """Build the request payload for a dataproduct transform.

            :param input_dataproduct_url: url of the input dataproduct. Defaults to self.cached_dataproduct_url.
            :param output_dataproduct_url: url of the output dataproduct. Defaults to self.cached_dataproduct_url.
            :return: dict with the keys input, output, identity (always True) and tags.
            """
            source = self.cached_dataproduct_url if input_dataproduct_url is None else input_dataproduct_url
            target = self.cached_dataproduct_url if output_dataproduct_url is None else output_dataproduct_url

            transform = {"input": source, "output": target}
            transform["identity"] = True
            transform["tags"] = ['tmss', 'testing']
            return transform
        
        def DataproductHash(self, hash_algorithm_url=None, hash="my_hash", dataproduct_url=None):
            """Build the request payload for a dataproduct hash.

            :param hash_algorithm_url: url of the hash algorithm. Defaults to '<django_api_url>/hash_algorithm/md5'.
            :param hash: value for the 'hash' field.
            :param dataproduct_url: url of the dataproduct. Defaults to self.cached_dataproduct_url.
            :return: dict with the keys dataproduct, hash_algorithm, hash and tags.
            """
            algorithm = hash_algorithm_url
            if algorithm is None:
                algorithm = self.django_api_url + '/hash_algorithm/md5'

            dataproduct = self.cached_dataproduct_url if dataproduct_url is None else dataproduct_url

            return dict(dataproduct=dataproduct,
                        hash_algorithm=algorithm,
                        hash=hash,
                        tags=['tmss', 'testing'])
        
        
        def DataproductArchiveInfo(self, storage_ticket="my_storage_ticket", dataproduct_url=None):
            """Build the request payload for a dataproduct archive info.

            :param storage_ticket: value for the 'storage_ticket' field.
            :param dataproduct_url: url of the dataproduct. Defaults to self.cached_dataproduct_url.
            :return: dict with the keys dataproduct, storage_ticket, public_since, corrupted_since and tags.
                     Both timestamps are datetime.utcnow() values in isoformat.
            """
            dataproduct = self.cached_dataproduct_url if dataproduct_url is None else dataproduct_url

            # the clock is read separately for each of the two timestamp fields
            public_since = datetime.utcnow().isoformat()
            corrupted_since = datetime.utcnow().isoformat()

            return dict(dataproduct=dataproduct,
                        storage_ticket=storage_ticket,
                        public_since=public_since,
                        corrupted_since=corrupted_since,
                        tags=['tmss', 'testing'])
        
        def SubtaskInput(self, subtask_url=None, task_relation_blueprint_url=None, dataproduct_urls=None, subtask_output_url=None, task_relation_selection_template_url=None, selection_doc=None):
            """Build the request payload for a subtask input.

            :param subtask_url: url of the subtask. Defaults to self.cached_subtask_url.
            :param task_relation_blueprint_url: url of the task relation blueprint. When None, a
                                                TaskRelationBlueprint() payload is posted to '/task_relation_blueprint/'.
            :param dataproduct_urls: list of dataproduct urls. Defaults to [self.cached_dataproduct_url].
            :param subtask_output_url: url of the producing subtask output. Defaults to self.cached_subtask_output_url.
            :param task_relation_selection_template_url: url of the selection template.
                                                         Defaults to self.cached_task_relation_selection_template_url.
            :param selection_doc: selection document. Defaults to the response fetched from
                                  '<selection template url>/default'.
            :return: dict with the keys subtask, task_relation_blueprint, producer, dataproducts, selection_doc,
                     selection_template and tags.
            """
            subtask = self.cached_subtask_url if subtask_url is None else subtask_url

            relation = task_relation_blueprint_url
            if relation is None:
                relation = self.post_data_and_get_url(self.TaskRelationBlueprint(), '/task_relation_blueprint/')

            dataproducts = [self.cached_dataproduct_url] if dataproduct_urls is None else dataproduct_urls
            producer = self.cached_subtask_output_url if subtask_output_url is None else subtask_output_url

            template = task_relation_selection_template_url
            if template is None:
                template = self.cached_task_relation_selection_template_url

            # the default selection document is looked up on whichever template url ended up being used
            selection = self.get_response_as_json_object(template+'/default') if selection_doc is None else selection_doc

            return dict(subtask=subtask,
                        task_relation_blueprint=relation,
                        producer=producer,
                        dataproducts=dataproducts,
                        selection_doc=selection,
                        selection_template=template,
                        tags=[])
        
        def Filesystem(self, name="my_Filesystem", cluster_url=None):
            """Build the request payload for a filesystem.

            :param name: value for the 'name' field.
            :param cluster_url: url of the cluster. Defaults to self.cached_cluster_url.
            :return: dict with the keys name, description, capacity, cluster, directory and tags.
            """
            cluster = self.cached_cluster_url if cluster_url is None else cluster_url
            return dict(name=name,
                        description='My one filesystem',
                        capacity=1111111111,
                        cluster=cluster,
                        directory='/',
                        tags=['tmss', 'testing'])
    
        def SAPTemplate(self):
            """Build the request payload for a SAP template.

            :return: dict with the keys name ('my_sap_template' followed by a uuid4), description,
                     schema (from minimal_json_schema()) and tags.
            """
            template = {"name": "my_sap_template" + str(uuid.uuid4())}
            template["description"] = 'My SAP test template'
            template["schema"] = minimal_json_schema()
            template["tags"] = ["TMSS", "TESTING"]
            return template
    
        def SAP(self, specifications_template_url=None, specifications_doc=None):
            """Build the request payload for a SAP.

            :param specifications_template_url: url of the SAP template. When None, a SAPTemplate() payload is
                                                posted to '/sap_template/'.
            :param specifications_doc: specifications document. Defaults to the response fetched from
                                       '<template url>/default'.
            :return: dict with the keys specifications_doc, specifications_template and tags.
            """
            template = specifications_template_url
            if template is None:
                template = self.post_data_and_get_url(self.SAPTemplate(), '/sap_template/')

            doc = specifications_doc
            if doc is None:
                doc = self.get_response_as_json_object(template + '/default')

            return dict(specifications_doc=doc,
                        specifications_template=template,
                        tags=['tmss', 'testing'])
    
        def Reservation(self, name="My Reservation", duration=None, start_time=None, project_url=None,
                        specifications_template_url=None, specifications_doc=None) -> dict:
            """Build the request payload for a reservation.

            :param name: value for the 'name' field.
            :param duration: length in seconds, used to derive stop_time from start_time. When None, stop_time is None.
            :param start_time: start of the reservation. Defaults to datetime.utcnow() plus 12 hours.
            :param project_url: url of the project. Defaults to self.cached_project_url.
            :param specifications_template_url: url of the reservation template. When None, a ReservationTemplate()
                                                payload is posted to '/reservation_template/'.
            :param specifications_doc: specifications document. Defaults to the response fetched from
                                       '<template url>/default'.
            :return: dict with the reservation fields; datetime start/stop values are given in isoformat.
            """
            project = self.cached_project_url if project_url is None else project_url

            begin = (datetime.utcnow() + timedelta(hours=12)) if start_time is None else start_time
            # without a duration there is no stop time to derive
            end = None if duration is None else begin + timedelta(seconds=duration)

            template = specifications_template_url
            if template is None:
                template = self.post_data_and_get_url(self.ReservationTemplate(), '/reservation_template/')

            doc = specifications_doc
            if doc is None:
                doc = self.get_response_as_json_object(template + '/default')

            # datetimes are converted with isoformat(); any other value (including None) is passed through
            begin, end = [moment.isoformat() if isinstance(moment, datetime) else moment
                          for moment in (begin, end)]

            return {"name": name,
                    "project": project,
                    "description": "Test Reservation",
                    "tags": ["TMSS", "TESTING"],
                    "start_time": begin,
                    "stop_time": end, # can be None
                    "specifications_doc": doc,
                    "specifications_template": template}
    
        def ProjectPermission(self, name=None, GET=None, PUT=None, PATCH=None, DELETE=None, POST=None) -> dict:
            """Build the request payload for a project permission.

            :param name: value for the 'name' field. Defaults to 'MyProjectPermission_<uuid4>'.
            :param GET: value for the 'GET' field; a falsy value becomes an empty list.
            :param PUT: value for the 'PUT' field; a falsy value becomes an empty list.
            :param PATCH: value for the 'PATCH' field; a falsy value becomes an empty list.
            :param DELETE: value for the 'DELETE' field; a falsy value becomes an empty list.
            :param POST: value for the 'POST' field; a falsy value becomes an empty list.
            :return: dict with the keys name, GET, PUT, PATCH, DELETE and POST.
            """
            permission = {'name': 'MyProjectPermission_%s' % uuid.uuid4() if name is None else name}

            per_method = (('GET', GET), ('PUT', PUT), ('PATCH', PATCH), ('DELETE', DELETE), ('POST', POST))
            for method, allowed in per_method:
                permission[method] = allowed or []

            return permission
    
        def wipe_cache(self):
            """Delete every instance attribute whose name starts with '_' and ends with '_url'."""
            # take a snapshot of the matching names first, because deleting changes self.__dict__
            stale = tuple(key for key in self.__dict__ if key.startswith('_') and key.endswith('_url'))
            for key in stale:
                if not hasattr(self, key):
                    continue
                delattr(self, key)