From 577505acf76d069a3898d841323dc34bf1a2967c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20K=C3=BCnsem=C3=B6ller?= <jkuensem@physik.uni-bielefeld.de> Date: Wed, 26 Apr 2023 23:12:39 +0200 Subject: [PATCH] TMSS-2305: deal with extra actions that create on other models and scheduling_set_id url parameter --- .../tmss/tmssapp/viewsets/lofar_viewset.py | 10 +++-- .../src/tmss/tmssapp/viewsets/permissions.py | 32 ++++++++++---- .../test/t_permissions_project_roles.py | 42 ++++++++++++++++++- .../tmss_test_environment_unittest_setup.py | 16 +++---- 4 files changed, 80 insertions(+), 20 deletions(-) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py index 28cd71dabbe..3b0bb982d6d 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py @@ -106,9 +106,13 @@ class LOFARViewSet(viewsets.ModelViewSet): for method in extra_action.mapping: self.action = action # pretend to do something else and check if we are allowed to do that request.method = method.upper() # pretend to do something else and check if we are allowed to do that - if TMSSPermissions().has_permission(request=request, view=self): - if action not in allowed_methods: - allowed_methods.append(action) + try: + if TMSSPermissions().has_permission(request=request, view=self): + if action not in allowed_methods: + allowed_methods.append(action) + except: + pass + # todo: check extra actions that create on other models if allowerd when correct project is referenced self.action = actual_action request.method = actual_method diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py index 7778f77c5fa..796512542fe 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py @@ -24,6 +24,12 @@ User = get_user_model() # A. apply a filter to prevent object without permission to be included in the list with full details, or # B. customize get_queryset on the view to call check_object_permissions. +# This dictionary maps extra actions on the model that should be considered for the project reference when checking +# project-based permissions. +# todo: discuss whether this reference should be added to the ProjectPermission table, so that we can populate it +# together with the required project roles. +extra_action_project_related_model = {'create_scheduling_unit': models.SchedulingUnitDraft} + def get_project_roles_for_user(user): if not isProductionEnvironment(): @@ -70,7 +76,7 @@ def get_project_roles_for_user(user): def get_project_roles_with_permission(permission_name, method='GET'): try: - logger.info('checking permission name=%s action=%s' % (permission_name, method)) + logger.info('checking permission name=%s method=%s' % (permission_name, method)) # ...retrieve ProjectPermission object project_permission = models.ProjectPermission.objects.get(name=permission_name) # ...determine what project roles are allowed to perform the requested action @@ -154,20 +160,32 @@ class IsProjectMember(drf_permissions.DjangoObjectPermissions): # Note: the Viewflow actions are currently create actions but they do not actually create anything # (and do not contain the expected data to create an instance of the underlying model, but instead e.g. the # data required to perform a workflow step). So we exclude the offending views here. - if view.action == 'create' and request.data \ + if (view.action == 'create' or view.action in extra_action_project_related_model) and request.data is not None\ and not 'SchedulingUnitTaskExecuteViewSet' in str(view) \ and not 'SchedulingUnitTaskAssignViewSet' in str(view): obj = None - if view.serializer_class.Meta.model == models.Project: + + # Some extra actions create objects on other models. By default, we use the project reference of the base + # model of the action to check whether the user has one of the required roles in this project. On base models + # that are project-independent (e.g. SchedulingUnitObservingStrategyTemplate), we instead check the target + # model for the path to project. + model = extra_action_project_related_model.get(view.action, view.serializer_class.Meta.model) + + if model == models.Project: return False # project creation solely depends on system role - if hasattr(view.serializer_class.Meta.model, 'path_to_project'): - attrs = view.serializer_class.Meta.model.path_to_project.split('__') - elif hasattr(view.serializer_class.Meta.model, 'project'): + if hasattr(model, 'path_to_project'): + attrs = model.path_to_project.split('__') + elif hasattr(model, 'project'): attrs = ['project'] else: - raise AttributeError(f'The model {view.serializer_class.Meta.model} requires an attribute "project" or "path_to_project" to check project permissions.') + raise AttributeError(f'The model {model} requires an attribute "project" or "path_to_project" to check project permissions.') for attr in attrs: obj_resolved_in_this_iteration = False + if attr == 'scheduling_set' and 'scheduling_set_id' in request.query_params: + # some extra actions do not carry all info in their POSTed data, but include references as url parameters + from django.shortcuts import get_object_or_404 + obj = get_object_or_404(models.SchedulingSet, pk=request.query_params['scheduling_set_id']) + continue if not obj: # on first iteration, the referenced object needs to be resolved from POSTed FQDN obj_ref = request.data[attr] diff --git a/SAS/TMSS/backend/test/t_permissions_project_roles.py b/SAS/TMSS/backend/test/t_permissions_project_roles.py index a3d9a6d9c0f..a3d677c77b8 100755 --- a/SAS/TMSS/backend/test/t_permissions_project_roles.py +++ b/SAS/TMSS/backend/test/t_permissions_project_roles.py @@ -93,11 +93,15 @@ class ProjectPermissionTestCase(TestCase): su_template_url = client.get_schedulingunit_template("scheduling unit")['url'] # user is shared_support - cls.scheduling_set_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/') + set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/') + cls.scheduling_set_shared_support_url = set['url'] + cls.scheduling_set_shared_support_id = set['id'] cls.scheduling_unit_draft_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(template_url=su_template_url, scheduling_set_url=cls.scheduling_set_shared_support_url), '/scheduling_unit_draft/') # user is contact - cls.scheduling_set_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/') + set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/') + cls.scheduling_set_contact_url = set['url'] + cls.scheduling_set_contact_id = set['id'] cls.scheduling_unit_draft_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(scheduling_set_url=cls.scheduling_set_contact_url), '/scheduling_unit_draft/') # user has no role @@ -111,9 +115,12 @@ class ProjectPermissionTestCase(TestCase): cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='taskdraft-create_task_blueprint', POST=[shared_support_role_url]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project', GET=[shared_support_role_url], POST=[]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project-my_roles', GET=[shared_support_role_url, friend_of_project_role_url]), '/project_permission/') + cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='schedulingunitobservingstrategytemplate-create_scheduling_unit', GET=[shared_support_role_url], POST=[shared_support_role_url]), '/project_permission/') cls.task_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.TaskTemplate(), '/task_template/') + cls.scheduling_unit_observing_strategy_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitObservingStrategyTemplate(scheduling_unit_template_url=su_template_url), '/scheduling_unit_observing_strategy_template/') + # create a new test_data_creator with the regular 'paulus' user, not superuser as in other tests, so that project permissions of tht user are checked. cls.auth = requests.auth.HTTPBasicAuth('paulus', 'pauluspass') cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, cls.auth) @@ -252,6 +259,37 @@ class ProjectPermissionTestCase(TestCase): self.assertEqual(r.status_code, 403) self.assertNotIn('Access-Control-Allow-Methods', r.headers) + # SchedulingUnitObservingStrategyTemplate.actions + + def test_strategy_template_create_SU_draft_raises_error_if_user_has_no_permission_for_related_project(self): + # create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used + # make sure we cannot create a draft in a scheduling set connected to a project where we have no role + POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/', + {}, 403, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_contact_id}) + + def test_strategy_template_create_SU_draft_works_if_user_has_permission_for_related_project(self): + # create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used + # make sure we can create a draft in a scheduling set connected to a project where we have 'shared_support' role + POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/', + {}, 201, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_shared_support_id}) + + @unittest.skip('fix LOFARViewSet with check if extra action can be executed when correct project is referenced') + def test_access_control_allow_header_reflects_user_permissions_in_strategy_template_list_view(self): + # the frontend relies on the list view to include an extra action if the user has permission to execute it. + # Since the user has permission to create scheduling units in case they belong to the right project, the action + # should be listed here. + with requests.Session() as session: + session.verify = False + session.auth = self.auth + + r = session.get(BASE_URL + '/scheduling_unit_observing_strategy_template/') + self.assertEqual(r.status_code, 200) + allowed_methods = r.headers['Access-Control-Allow-Methods'].split(', ') + self.assertIn('create_scheduling_unit', allowed_methods) + + + # Project + def test_project_get_friend_returns_correct_user(self): """ Note: This test relies on real data from Keycloak. diff --git a/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py b/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py index 4f70c904238..a219ee97805 100644 --- a/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py +++ b/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py @@ -58,22 +58,22 @@ import lofar.sas.tmss.tmss.settings as TMSS_SETTINGS # by default we assert on requests taking longer than this timeout DEFAULT_REQUEST_TIMEOUT=10 -def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): +def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None): """ Call API method on the provided url and assert the expected code is returned and the expected content is in the response content :return: response as dict. This either contains the data of an entry or error details. If JSON cannot be parsed, return string. """ _start_request_timestamp = datetime.datetime.utcnow() if call == 'PUT': - response = requests.put(url, json=data, auth=auth, timeout=timeout) + response = requests.put(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'POST': - response = requests.post(url, json=data, auth=auth, timeout=timeout) + response = requests.post(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'GET': - response = requests.get(url, auth=auth, allow_redirects=False) + response = requests.get(url, auth=auth, allow_redirects=False, params=params) elif call == 'PATCH': - response = requests.patch(url, json=data, auth=auth, timeout=timeout) + response = requests.patch(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'DELETE': - response = requests.delete(url, auth=auth, timeout=timeout) + response = requests.delete(url, auth=auth, timeout=timeout, params=params) else: raise ValueError("The provided call '%s' is not a valid API method choice" % call) @@ -131,12 +131,12 @@ def PUT_and_assert_expected_response(test_instance, url, data, expected_code, ex return r_dict -def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): +def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None): """ POST data on url and assert the expected code is returned and the expected content is in the response content :return: response dict """ - r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout) + r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout, params=params) return r_dict -- GitLab