diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py index 28cd71dabbeb2674f6c4f7f489b806a350f515df..a76205f35621e2c9712c1fcc5b384f84976e834c 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/lofar_viewset.py @@ -73,6 +73,54 @@ class LOFARViewSet(viewsets.ModelViewSet): filter_fields = '__all__' ordering_fields = '__all__' + # Override extra_action_permission_specs to change the way that project-based permission checks are performed on + # extra actions on this viewset. This is required when an action operates on a different model and the permission + # check needs to check the there-referenced project. Or when the reference is handed as a url parameter instead pf + # the POST data. Format: + # {<extra_action_name>: {'model': models.<_name_of_model_this_action_operates_on>, + # 'project_ref_override_pk_url_param': <url_parameter_name>, + # 'project_ref_override_attr': <path_to_project_attribute_name>, + # 'project_ref_override_model': models.<name_of_model_the_path_to_project_attribute_refers_to>}} + extra_action_permission_specs = {} + + @staticmethod + def _user_has_permission_for_any_project(user, method, permission_name): + permitted_project_roles = get_project_roles_with_permission(permission_name, method) + user_project_roles = set([project_role['role'] for project_role in get_project_roles_for_user(user)]) + for role in permissions.ProjectRole.objects.filter(value__in=list(user_project_roles)).all(): + if role in permitted_project_roles: + return True + return False + + def _get_permitted_extra_actions(self, request): + # determined the extra actions a user is allowed to perform (these may have different permissions than the base + # model, so we list which ones are allowed next to the allowed REST methods. This should be fine as actions only + # support either POST or GET so there is no ambiguity.) + + permission_name = self.serializer_class.Meta.model.__name__.lower() + allowed_extra_actions = [] + extra_actions = self.get_extra_actions() + actual_method = request.method # keep the actually requested method so that we can set it when we are done + actual_action = self.action # keep the actually requested action so that we can set it when we are done + for extra_action in extra_actions: + action = extra_action.__name__ + for method in extra_action.mapping: + self.action = action # pretend to do something else and check if we are allowed to do that + request.method = method.upper() # pretend to do something else and check if we are allowed to do that + if TMSSPermissions().has_permission(request=request, view=self): + if action not in allowed_extra_actions: + allowed_extra_actions.append(action) + # show extra action as permitted if the user is allowed to perform it in case the correct project is + # referenced (which then only works if the user posts data that links the object to the right project). + if actual_action == 'list' and action not in allowed_extra_actions: + action_permission_name = '%s-%s' % (permission_name, action) + if self._user_has_permission_for_any_project(request.user, method.upper(), action_permission_name): + allowed_extra_actions.append(action) + self.action = actual_action + request.method = actual_method + + return allowed_extra_actions + def _get_permitted_methods(self, request, pk=None): # Django returns an "Allow" header that reflects what methods the model supports in principle, but not what # the current user is actually has permission to perform. We use the "Access-Control-Allow-Methods" header @@ -96,21 +144,7 @@ class LOFARViewSet(viewsets.ModelViewSet): allowed_methods.append(method) request.method = actual_method - # check allowed extra actions (these may have different permissions than the base model, so we list which ones - # are allowed next to the allowed REST methods. This should be fine as actions only support either POST or GET - # so there is no ambiguity.) - extra_actions = self.get_extra_actions() - actual_action = self.action # keep the actually requested action so that we can set it when we are done - for extra_action in extra_actions: - action = extra_action.__name__ - for method in extra_action.mapping: - self.action = action # pretend to do something else and check if we are allowed to do that - request.method = method.upper() # pretend to do something else and check if we are allowed to do that - if TMSSPermissions().has_permission(request=request, view=self): - if action not in allowed_methods: - allowed_methods.append(action) - self.action = actual_action - request.method = actual_method + permission_name = self.serializer_class.Meta.model.__name__.lower() # add POST permission if project permission allows creation if correct project is referenced if self.action == 'list' and 'POST' not in allowed_methods: @@ -118,18 +152,14 @@ class LOFARViewSet(viewsets.ModelViewSet): # generally. Hence we need to check if there is the theoretical possibility of POSTing an object based # on project permissions (which then only works if the user posts data that links the object to the # right project). - permission_name = self.serializer_class.Meta.model.__name__.lower() - permitted_project_roles = get_project_roles_with_permission(permission_name, 'POST') - user_project_roles = set([project_role['role'] for project_role in get_project_roles_for_user(request.user)]) - for role in permissions.ProjectRole.objects.filter(value__in=list(user_project_roles)).all(): - if role in permitted_project_roles: - allowed_methods.append('POST') - # optimization: we grant POST/PUT/PATCH/DELETE permissions en block, so we only need to check one - # of them and skip the expensive actual permission check for the rest. Note: this only affects the - # header and the permission will still be properly checked when the user performs one of these. - allowed_methods += ['PUT', 'PATCH', 'DELETE'] - break - + if self._user_has_permission_for_any_project(request.user, 'POST', permission_name): + allowed_methods.append('POST') + # optimization: we grant POST/PUT/PATCH/DELETE permissions en block, so we only need to check one + # of them and skip the expensive actual permission check for the rest. Note: this only affects the + # header and the permission will still be properly checked when the user performs one of these. + allowed_methods += ['PUT', 'PATCH', 'DELETE'] + + allowed_methods += self._get_permitted_extra_actions(request) return allowed_methods @swagger_auto_schema(responses={403: 'forbidden'}) diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py index 7778f77c5fae60fa28a47a97408b1099105dbe53..458ce3447fa0b31597fcc1cfd59b059ddecb0509 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/permissions.py @@ -15,6 +15,7 @@ from lofar.common import isProductionEnvironment, isDevelopmentEnvironment from django.db.models import Q from django.contrib.auth import get_user_model User = get_user_model() +from django.shortcuts import get_object_or_404 # # Permissions @@ -24,6 +25,7 @@ User = get_user_model() # A. apply a filter to prevent object without permission to be included in the list with full details, or # B. customize get_queryset on the view to call check_object_permissions. + def get_project_roles_for_user(user): if not isProductionEnvironment(): @@ -70,7 +72,7 @@ def get_project_roles_for_user(user): def get_project_roles_with_permission(permission_name, method='GET'): try: - logger.info('checking permission name=%s action=%s' % (permission_name, method)) + logger.info('checking permission name=%s method=%s' % (permission_name, method)) # ...retrieve ProjectPermission object project_permission = models.ProjectPermission.objects.get(name=permission_name) # ...determine what project roles are allowed to perform the requested action @@ -154,20 +156,40 @@ class IsProjectMember(drf_permissions.DjangoObjectPermissions): # Note: the Viewflow actions are currently create actions but they do not actually create anything # (and do not contain the expected data to create an instance of the underlying model, but instead e.g. the # data required to perform a workflow step). So we exclude the offending views here. - if view.action == 'create' and request.data \ + regular_create_action = view.action == 'create' and request.data \ and not 'SchedulingUnitTaskExecuteViewSet' in str(view) \ - and not 'SchedulingUnitTaskAssignViewSet' in str(view): + and not 'SchedulingUnitTaskAssignViewSet' in str(view) + extra_action_specs = view.extra_action_permission_specs.get(view.action, None) if hasattr(view, 'extra_action_permission_specs') else None + create_via_extra_action = extra_action_specs and (request.data or + extra_action_specs['project_ref_override_pk_url_param'] in request.query_params) + + if regular_create_action or create_via_extra_action: obj = None - if view.serializer_class.Meta.model == models.Project: + + # Some extra actions create objects on other models. By default, we use the project reference of the base + # model of the action to check whether the user has one of the required roles in this project. On base models + # that are project-independent (e.g. SchedulingUnitObservingStrategyTemplate), we instead check the target + # model for the path to project. + model = extra_action_specs.get('model') if create_via_extra_action else view.serializer_class.Meta.model + + if model == models.Project: return False # project creation solely depends on system role - if hasattr(view.serializer_class.Meta.model, 'path_to_project'): - attrs = view.serializer_class.Meta.model.path_to_project.split('__') - elif hasattr(view.serializer_class.Meta.model, 'project'): + if hasattr(model, 'path_to_project'): + attrs = model.path_to_project.split('__') + elif hasattr(model, 'project'): attrs = ['project'] else: - raise AttributeError(f'The model {view.serializer_class.Meta.model} requires an attribute "project" or "path_to_project" to check project permissions.') + raise AttributeError(f'The model {model} requires an attribute "project" or "path_to_project" to check project permissions.') for attr in attrs: obj_resolved_in_this_iteration = False + if extra_action_specs \ + and attr == extra_action_specs.get('project_ref_override_attr') and \ + extra_action_specs.get('project_ref_override_pk_url_param') in request.query_params: + # some extra actions do not carry all info in their POSTed data, but include a url parameter + # reference to the related project. Use that to resolve the project here. + obj = get_object_or_404(extra_action_specs.get('project_ref_override_model'), + pk=request.query_params[extra_action_specs.get('project_ref_override_pk_url_param')]) + continue if not obj: # on first iteration, the referenced object needs to be resolved from POSTed FQDN obj_ref = request.data[attr] diff --git a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py index e27b5d7f5cbc71c0471574ded29957273ca44e5b..7579c9e41db1933155804dc44213ed4a34ad89fb 100644 --- a/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py +++ b/SAS/TMSS/backend/src/tmss/tmssapp/viewsets/specification.py @@ -97,6 +97,12 @@ class SchedulingUnitObservingStrategyTemplateViewSet(AbstractTemplateViewSet): queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all() serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer + # override default method, to make sure that we perform the correct project-based permission checks + extra_action_permission_specs = {'create_scheduling_unit': {'model': models.SchedulingUnitDraft, + 'project_ref_override_pk_url_param': 'scheduling_set_id', + 'project_ref_override_attr': 'scheduling_set', + 'project_ref_override_model': models.SchedulingSet}} + @swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created scheduling unit', status.HTTP_403_FORBIDDEN: 'forbidden'}, operation_description="Create a new SchedulingUnit based on this SchedulingUnitObservingStrategyTemplate, with the given <name> and <description> and make it a child of the given <scheduling_set_id>", diff --git a/SAS/TMSS/backend/test/t_permissions_project_roles.py b/SAS/TMSS/backend/test/t_permissions_project_roles.py index a3d9a6d9c0fbf4790f705ca818a50fedd178e6dc..9b4b6933f0be70fef233a52984817950fd589bf4 100755 --- a/SAS/TMSS/backend/test/t_permissions_project_roles.py +++ b/SAS/TMSS/backend/test/t_permissions_project_roles.py @@ -93,11 +93,15 @@ class ProjectPermissionTestCase(TestCase): su_template_url = client.get_schedulingunit_template("scheduling unit")['url'] # user is shared_support - cls.scheduling_set_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/') + set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/') + cls.scheduling_set_shared_support_url = set['url'] + cls.scheduling_set_shared_support_id = set['id'] cls.scheduling_unit_draft_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(template_url=su_template_url, scheduling_set_url=cls.scheduling_set_shared_support_url), '/scheduling_unit_draft/') # user is contact - cls.scheduling_set_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/') + set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/') + cls.scheduling_set_contact_url = set['url'] + cls.scheduling_set_contact_id = set['id'] cls.scheduling_unit_draft_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(scheduling_set_url=cls.scheduling_set_contact_url), '/scheduling_unit_draft/') # user has no role @@ -111,9 +115,12 @@ class ProjectPermissionTestCase(TestCase): cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='taskdraft-create_task_blueprint', POST=[shared_support_role_url]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project', GET=[shared_support_role_url], POST=[]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project-my_roles', GET=[shared_support_role_url, friend_of_project_role_url]), '/project_permission/') + cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='schedulingunitobservingstrategytemplate-create_scheduling_unit', GET=[shared_support_role_url], POST=[shared_support_role_url]), '/project_permission/') cls.task_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.TaskTemplate(), '/task_template/') + cls.scheduling_unit_observing_strategy_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitObservingStrategyTemplate(scheduling_unit_template_url=su_template_url), '/scheduling_unit_observing_strategy_template/') + # create a new test_data_creator with the regular 'paulus' user, not superuser as in other tests, so that project permissions of tht user are checked. cls.auth = requests.auth.HTTPBasicAuth('paulus', 'pauluspass') cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, cls.auth) @@ -252,6 +259,36 @@ class ProjectPermissionTestCase(TestCase): self.assertEqual(r.status_code, 403) self.assertNotIn('Access-Control-Allow-Methods', r.headers) + # SchedulingUnitObservingStrategyTemplate.actions + + def test_strategy_template_create_SU_draft_raises_error_if_user_has_no_permission_for_related_project(self): + # create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used + # make sure we cannot create a draft in a scheduling set connected to a project where we have no role + POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/', + {}, 403, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_contact_id}) + + def test_strategy_template_create_SU_draft_works_if_user_has_permission_for_related_project(self): + # create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used + # make sure we can create a draft in a scheduling set connected to a project where we have 'shared_support' role + POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/', + {}, 201, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_shared_support_id}) + + def test_access_control_allow_header_reflects_user_permissions_in_strategy_template_list_view(self): + # the frontend relies on the list view to include an extra action if the user has permission to execute it. + # Since the user has permission to create scheduling units in case they belong to the right project, the action + # should be listed here. + with requests.Session() as session: + session.verify = False + session.auth = self.auth + + r = session.get(BASE_URL + '/scheduling_unit_observing_strategy_template/') + self.assertEqual(r.status_code, 200) + allowed_methods = r.headers['Access-Control-Allow-Methods'].split(', ') + self.assertIn('create_scheduling_unit', allowed_methods) + + + # Project + def test_project_get_friend_returns_correct_user(self): """ Note: This test relies on real data from Keycloak. diff --git a/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py b/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py index 4f70c90423820f2529077cdae49d1d404615ce8b..a219ee978057c7683cafc6a3189accb6517f7bed 100644 --- a/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py +++ b/SAS/TMSS/backend/test/tmss_test_environment_unittest_setup.py @@ -58,22 +58,22 @@ import lofar.sas.tmss.tmss.settings as TMSS_SETTINGS # by default we assert on requests taking longer than this timeout DEFAULT_REQUEST_TIMEOUT=10 -def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): +def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None): """ Call API method on the provided url and assert the expected code is returned and the expected content is in the response content :return: response as dict. This either contains the data of an entry or error details. If JSON cannot be parsed, return string. """ _start_request_timestamp = datetime.datetime.utcnow() if call == 'PUT': - response = requests.put(url, json=data, auth=auth, timeout=timeout) + response = requests.put(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'POST': - response = requests.post(url, json=data, auth=auth, timeout=timeout) + response = requests.post(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'GET': - response = requests.get(url, auth=auth, allow_redirects=False) + response = requests.get(url, auth=auth, allow_redirects=False, params=params) elif call == 'PATCH': - response = requests.patch(url, json=data, auth=auth, timeout=timeout) + response = requests.patch(url, json=data, auth=auth, timeout=timeout, params=params) elif call == 'DELETE': - response = requests.delete(url, auth=auth, timeout=timeout) + response = requests.delete(url, auth=auth, timeout=timeout, params=params) else: raise ValueError("The provided call '%s' is not a valid API method choice" % call) @@ -131,12 +131,12 @@ def PUT_and_assert_expected_response(test_instance, url, data, expected_code, ex return r_dict -def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): +def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None): """ POST data on url and assert the expected code is returned and the expected content is in the response content :return: response dict """ - r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout) + r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout, params=params) return r_dict