Skip to content
Snippets Groups Projects
Commit 467889c8 authored by Jörn Künsemöller's avatar Jörn Künsemöller
Browse files

Merge branch 'TMSS-2305' into 'master'

Resolve TMSS-2305

Closes TMSS-2305

See merge request !1079
parents e0e37bd0 85b74d44
No related branches found
No related tags found
1 merge request!1079Resolve TMSS-2305
...@@ -73,6 +73,54 @@ class LOFARViewSet(viewsets.ModelViewSet): ...@@ -73,6 +73,54 @@ class LOFARViewSet(viewsets.ModelViewSet):
filter_fields = '__all__' filter_fields = '__all__'
ordering_fields = '__all__' ordering_fields = '__all__'
# Override extra_action_permission_specs to change the way that project-based permission checks are performed on
# extra actions on this viewset. This is required when an action operates on a different model and the permission
# check needs to check the there-referenced project. Or when the reference is handed as a url parameter instead pf
# the POST data. Format:
# {<extra_action_name>: {'model': models.<_name_of_model_this_action_operates_on>,
# 'project_ref_override_pk_url_param': <url_parameter_name>,
# 'project_ref_override_attr': <path_to_project_attribute_name>,
# 'project_ref_override_model': models.<name_of_model_the_path_to_project_attribute_refers_to>}}
extra_action_permission_specs = {}
@staticmethod
def _user_has_permission_for_any_project(user, method, permission_name):
permitted_project_roles = get_project_roles_with_permission(permission_name, method)
user_project_roles = set([project_role['role'] for project_role in get_project_roles_for_user(user)])
for role in permissions.ProjectRole.objects.filter(value__in=list(user_project_roles)).all():
if role in permitted_project_roles:
return True
return False
def _get_permitted_extra_actions(self, request):
# determined the extra actions a user is allowed to perform (these may have different permissions than the base
# model, so we list which ones are allowed next to the allowed REST methods. This should be fine as actions only
# support either POST or GET so there is no ambiguity.)
permission_name = self.serializer_class.Meta.model.__name__.lower()
allowed_extra_actions = []
extra_actions = self.get_extra_actions()
actual_method = request.method # keep the actually requested method so that we can set it when we are done
actual_action = self.action # keep the actually requested action so that we can set it when we are done
for extra_action in extra_actions:
action = extra_action.__name__
for method in extra_action.mapping:
self.action = action # pretend to do something else and check if we are allowed to do that
request.method = method.upper() # pretend to do something else and check if we are allowed to do that
if TMSSPermissions().has_permission(request=request, view=self):
if action not in allowed_extra_actions:
allowed_extra_actions.append(action)
# show extra action as permitted if the user is allowed to perform it in case the correct project is
# referenced (which then only works if the user posts data that links the object to the right project).
if actual_action == 'list' and action not in allowed_extra_actions:
action_permission_name = '%s-%s' % (permission_name, action)
if self._user_has_permission_for_any_project(request.user, method.upper(), action_permission_name):
allowed_extra_actions.append(action)
self.action = actual_action
request.method = actual_method
return allowed_extra_actions
def _get_permitted_methods(self, request, pk=None): def _get_permitted_methods(self, request, pk=None):
# Django returns an "Allow" header that reflects what methods the model supports in principle, but not what # Django returns an "Allow" header that reflects what methods the model supports in principle, but not what
# the current user is actually has permission to perform. We use the "Access-Control-Allow-Methods" header # the current user is actually has permission to perform. We use the "Access-Control-Allow-Methods" header
...@@ -96,21 +144,7 @@ class LOFARViewSet(viewsets.ModelViewSet): ...@@ -96,21 +144,7 @@ class LOFARViewSet(viewsets.ModelViewSet):
allowed_methods.append(method) allowed_methods.append(method)
request.method = actual_method request.method = actual_method
# check allowed extra actions (these may have different permissions than the base model, so we list which ones permission_name = self.serializer_class.Meta.model.__name__.lower()
# are allowed next to the allowed REST methods. This should be fine as actions only support either POST or GET
# so there is no ambiguity.)
extra_actions = self.get_extra_actions()
actual_action = self.action # keep the actually requested action so that we can set it when we are done
for extra_action in extra_actions:
action = extra_action.__name__
for method in extra_action.mapping:
self.action = action # pretend to do something else and check if we are allowed to do that
request.method = method.upper() # pretend to do something else and check if we are allowed to do that
if TMSSPermissions().has_permission(request=request, view=self):
if action not in allowed_methods:
allowed_methods.append(action)
self.action = actual_action
request.method = actual_method
# add POST permission if project permission allows creation if correct project is referenced # add POST permission if project permission allows creation if correct project is referenced
if self.action == 'list' and 'POST' not in allowed_methods: if self.action == 'list' and 'POST' not in allowed_methods:
...@@ -118,18 +152,14 @@ class LOFARViewSet(viewsets.ModelViewSet): ...@@ -118,18 +152,14 @@ class LOFARViewSet(viewsets.ModelViewSet):
# generally. Hence we need to check if there is the theoretical possibility of POSTing an object based # generally. Hence we need to check if there is the theoretical possibility of POSTing an object based
# on project permissions (which then only works if the user posts data that links the object to the # on project permissions (which then only works if the user posts data that links the object to the
# right project). # right project).
permission_name = self.serializer_class.Meta.model.__name__.lower() if self._user_has_permission_for_any_project(request.user, 'POST', permission_name):
permitted_project_roles = get_project_roles_with_permission(permission_name, 'POST') allowed_methods.append('POST')
user_project_roles = set([project_role['role'] for project_role in get_project_roles_for_user(request.user)]) # optimization: we grant POST/PUT/PATCH/DELETE permissions en block, so we only need to check one
for role in permissions.ProjectRole.objects.filter(value__in=list(user_project_roles)).all(): # of them and skip the expensive actual permission check for the rest. Note: this only affects the
if role in permitted_project_roles: # header and the permission will still be properly checked when the user performs one of these.
allowed_methods.append('POST') allowed_methods += ['PUT', 'PATCH', 'DELETE']
# optimization: we grant POST/PUT/PATCH/DELETE permissions en block, so we only need to check one
# of them and skip the expensive actual permission check for the rest. Note: this only affects the allowed_methods += self._get_permitted_extra_actions(request)
# header and the permission will still be properly checked when the user performs one of these.
allowed_methods += ['PUT', 'PATCH', 'DELETE']
break
return allowed_methods return allowed_methods
@swagger_auto_schema(responses={403: 'forbidden'}) @swagger_auto_schema(responses={403: 'forbidden'})
......
...@@ -15,6 +15,7 @@ from lofar.common import isProductionEnvironment, isDevelopmentEnvironment ...@@ -15,6 +15,7 @@ from lofar.common import isProductionEnvironment, isDevelopmentEnvironment
from django.db.models import Q from django.db.models import Q
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
User = get_user_model() User = get_user_model()
from django.shortcuts import get_object_or_404
# #
# Permissions # Permissions
...@@ -24,6 +25,7 @@ User = get_user_model() ...@@ -24,6 +25,7 @@ User = get_user_model()
# A. apply a filter to prevent object without permission to be included in the list with full details, or # A. apply a filter to prevent object without permission to be included in the list with full details, or
# B. customize get_queryset on the view to call check_object_permissions. # B. customize get_queryset on the view to call check_object_permissions.
def get_project_roles_for_user(user): def get_project_roles_for_user(user):
if not isProductionEnvironment(): if not isProductionEnvironment():
...@@ -70,7 +72,7 @@ def get_project_roles_for_user(user): ...@@ -70,7 +72,7 @@ def get_project_roles_for_user(user):
def get_project_roles_with_permission(permission_name, method='GET'): def get_project_roles_with_permission(permission_name, method='GET'):
try: try:
logger.info('checking permission name=%s action=%s' % (permission_name, method)) logger.info('checking permission name=%s method=%s' % (permission_name, method))
# ...retrieve ProjectPermission object # ...retrieve ProjectPermission object
project_permission = models.ProjectPermission.objects.get(name=permission_name) project_permission = models.ProjectPermission.objects.get(name=permission_name)
# ...determine what project roles are allowed to perform the requested action # ...determine what project roles are allowed to perform the requested action
...@@ -154,20 +156,40 @@ class IsProjectMember(drf_permissions.DjangoObjectPermissions): ...@@ -154,20 +156,40 @@ class IsProjectMember(drf_permissions.DjangoObjectPermissions):
# Note: the Viewflow actions are currently create actions but they do not actually create anything # Note: the Viewflow actions are currently create actions but they do not actually create anything
# (and do not contain the expected data to create an instance of the underlying model, but instead e.g. the # (and do not contain the expected data to create an instance of the underlying model, but instead e.g. the
# data required to perform a workflow step). So we exclude the offending views here. # data required to perform a workflow step). So we exclude the offending views here.
if view.action == 'create' and request.data \ regular_create_action = view.action == 'create' and request.data \
and not 'SchedulingUnitTaskExecuteViewSet' in str(view) \ and not 'SchedulingUnitTaskExecuteViewSet' in str(view) \
and not 'SchedulingUnitTaskAssignViewSet' in str(view): and not 'SchedulingUnitTaskAssignViewSet' in str(view)
extra_action_specs = view.extra_action_permission_specs.get(view.action, None) if hasattr(view, 'extra_action_permission_specs') else None
create_via_extra_action = extra_action_specs and (request.data or
extra_action_specs['project_ref_override_pk_url_param'] in request.query_params)
if regular_create_action or create_via_extra_action:
obj = None obj = None
if view.serializer_class.Meta.model == models.Project:
# Some extra actions create objects on other models. By default, we use the project reference of the base
# model of the action to check whether the user has one of the required roles in this project. On base models
# that are project-independent (e.g. SchedulingUnitObservingStrategyTemplate), we instead check the target
# model for the path to project.
model = extra_action_specs.get('model') if create_via_extra_action else view.serializer_class.Meta.model
if model == models.Project:
return False # project creation solely depends on system role return False # project creation solely depends on system role
if hasattr(view.serializer_class.Meta.model, 'path_to_project'): if hasattr(model, 'path_to_project'):
attrs = view.serializer_class.Meta.model.path_to_project.split('__') attrs = model.path_to_project.split('__')
elif hasattr(view.serializer_class.Meta.model, 'project'): elif hasattr(model, 'project'):
attrs = ['project'] attrs = ['project']
else: else:
raise AttributeError(f'The model {view.serializer_class.Meta.model} requires an attribute "project" or "path_to_project" to check project permissions.') raise AttributeError(f'The model {model} requires an attribute "project" or "path_to_project" to check project permissions.')
for attr in attrs: for attr in attrs:
obj_resolved_in_this_iteration = False obj_resolved_in_this_iteration = False
if extra_action_specs \
and attr == extra_action_specs.get('project_ref_override_attr') and \
extra_action_specs.get('project_ref_override_pk_url_param') in request.query_params:
# some extra actions do not carry all info in their POSTed data, but include a url parameter
# reference to the related project. Use that to resolve the project here.
obj = get_object_or_404(extra_action_specs.get('project_ref_override_model'),
pk=request.query_params[extra_action_specs.get('project_ref_override_pk_url_param')])
continue
if not obj: if not obj:
# on first iteration, the referenced object needs to be resolved from POSTed FQDN # on first iteration, the referenced object needs to be resolved from POSTed FQDN
obj_ref = request.data[attr] obj_ref = request.data[attr]
......
...@@ -97,6 +97,12 @@ class SchedulingUnitObservingStrategyTemplateViewSet(AbstractTemplateViewSet): ...@@ -97,6 +97,12 @@ class SchedulingUnitObservingStrategyTemplateViewSet(AbstractTemplateViewSet):
queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all() queryset = models.SchedulingUnitObservingStrategyTemplate.objects.all()
serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer serializer_class = serializers.SchedulingUnitObservingStrategyTemplateSerializer
# override default method, to make sure that we perform the correct project-based permission checks
extra_action_permission_specs = {'create_scheduling_unit': {'model': models.SchedulingUnitDraft,
'project_ref_override_pk_url_param': 'scheduling_set_id',
'project_ref_override_attr': 'scheduling_set',
'project_ref_override_model': models.SchedulingSet}}
@swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created scheduling unit', @swagger_auto_schema(responses={status.HTTP_201_CREATED: 'The newly created scheduling unit',
status.HTTP_403_FORBIDDEN: 'forbidden'}, status.HTTP_403_FORBIDDEN: 'forbidden'},
operation_description="Create a new SchedulingUnit based on this SchedulingUnitObservingStrategyTemplate, with the given <name> and <description> and make it a child of the given <scheduling_set_id>", operation_description="Create a new SchedulingUnit based on this SchedulingUnitObservingStrategyTemplate, with the given <name> and <description> and make it a child of the given <scheduling_set_id>",
......
...@@ -93,11 +93,15 @@ class ProjectPermissionTestCase(TestCase): ...@@ -93,11 +93,15 @@ class ProjectPermissionTestCase(TestCase):
su_template_url = client.get_schedulingunit_template("scheduling unit")['url'] su_template_url = client.get_schedulingunit_template("scheduling unit")['url']
# user is shared_support # user is shared_support
cls.scheduling_set_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/') set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_shared_support_url), '/scheduling_set/')
cls.scheduling_set_shared_support_url = set['url']
cls.scheduling_set_shared_support_id = set['id']
cls.scheduling_unit_draft_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(template_url=su_template_url, scheduling_set_url=cls.scheduling_set_shared_support_url), '/scheduling_unit_draft/') cls.scheduling_unit_draft_shared_support_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(template_url=su_template_url, scheduling_set_url=cls.scheduling_set_shared_support_url), '/scheduling_unit_draft/')
# user is contact # user is contact
cls.scheduling_set_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/') set = cls.test_data_creator.post_data_and_get_response_as_json_object(cls.test_data_creator.SchedulingSet(project_url=cls.project_contact_url), '/scheduling_set/')
cls.scheduling_set_contact_url = set['url']
cls.scheduling_set_contact_id = set['id']
cls.scheduling_unit_draft_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(scheduling_set_url=cls.scheduling_set_contact_url), '/scheduling_unit_draft/') cls.scheduling_unit_draft_contact_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitDraft(scheduling_set_url=cls.scheduling_set_contact_url), '/scheduling_unit_draft/')
# user has no role # user has no role
...@@ -111,9 +115,12 @@ class ProjectPermissionTestCase(TestCase): ...@@ -111,9 +115,12 @@ class ProjectPermissionTestCase(TestCase):
cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='taskdraft-create_task_blueprint', POST=[shared_support_role_url]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='taskdraft-create_task_blueprint', POST=[shared_support_role_url]), '/project_permission/')
cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project', GET=[shared_support_role_url], POST=[]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project', GET=[shared_support_role_url], POST=[]), '/project_permission/')
cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project-my_roles', GET=[shared_support_role_url, friend_of_project_role_url]), '/project_permission/') cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='project-my_roles', GET=[shared_support_role_url, friend_of_project_role_url]), '/project_permission/')
cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.ProjectPermission(name='schedulingunitobservingstrategytemplate-create_scheduling_unit', GET=[shared_support_role_url], POST=[shared_support_role_url]), '/project_permission/')
cls.task_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.TaskTemplate(), '/task_template/') cls.task_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.TaskTemplate(), '/task_template/')
cls.scheduling_unit_observing_strategy_template_url = cls.test_data_creator.post_data_and_get_url(cls.test_data_creator.SchedulingUnitObservingStrategyTemplate(scheduling_unit_template_url=su_template_url), '/scheduling_unit_observing_strategy_template/')
# create a new test_data_creator with the regular 'paulus' user, not superuser as in other tests, so that project permissions of tht user are checked. # create a new test_data_creator with the regular 'paulus' user, not superuser as in other tests, so that project permissions of tht user are checked.
cls.auth = requests.auth.HTTPBasicAuth('paulus', 'pauluspass') cls.auth = requests.auth.HTTPBasicAuth('paulus', 'pauluspass')
cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, cls.auth) cls.test_data_creator = TMSSRESTTestDataCreator(BASE_URL, cls.auth)
...@@ -252,6 +259,36 @@ class ProjectPermissionTestCase(TestCase): ...@@ -252,6 +259,36 @@ class ProjectPermissionTestCase(TestCase):
self.assertEqual(r.status_code, 403) self.assertEqual(r.status_code, 403)
self.assertNotIn('Access-Control-Allow-Methods', r.headers) self.assertNotIn('Access-Control-Allow-Methods', r.headers)
# SchedulingUnitObservingStrategyTemplate.actions
def test_strategy_template_create_SU_draft_raises_error_if_user_has_no_permission_for_related_project(self):
# create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used
# make sure we cannot create a draft in a scheduling set connected to a project where we have no role
POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/',
{}, 403, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_contact_id})
def test_strategy_template_create_SU_draft_works_if_user_has_permission_for_related_project(self):
# create SU draft via the extra action on strategy template, to test that the path to project of the SU draft model is used
# make sure we can create a draft in a scheduling set connected to a project where we have 'shared_support' role
POST_and_assert_expected_response(self, self.scheduling_unit_observing_strategy_template_url + '/create_scheduling_unit/',
{}, 201, {}, auth=self.auth, params={'scheduling_set_id': self.scheduling_set_shared_support_id})
def test_access_control_allow_header_reflects_user_permissions_in_strategy_template_list_view(self):
# the frontend relies on the list view to include an extra action if the user has permission to execute it.
# Since the user has permission to create scheduling units in case they belong to the right project, the action
# should be listed here.
with requests.Session() as session:
session.verify = False
session.auth = self.auth
r = session.get(BASE_URL + '/scheduling_unit_observing_strategy_template/')
self.assertEqual(r.status_code, 200)
allowed_methods = r.headers['Access-Control-Allow-Methods'].split(', ')
self.assertIn('create_scheduling_unit', allowed_methods)
# Project
def test_project_get_friend_returns_correct_user(self): def test_project_get_friend_returns_correct_user(self):
""" """
Note: This test relies on real data from Keycloak. Note: This test relies on real data from Keycloak.
......
...@@ -58,22 +58,22 @@ import lofar.sas.tmss.tmss.settings as TMSS_SETTINGS ...@@ -58,22 +58,22 @@ import lofar.sas.tmss.tmss.settings as TMSS_SETTINGS
# by default we assert on requests taking longer than this timeout # by default we assert on requests taking longer than this timeout
DEFAULT_REQUEST_TIMEOUT=10 DEFAULT_REQUEST_TIMEOUT=10
def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): def _call_API_and_assert_expected_response(test_instance, url, call, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None):
""" """
Call API method on the provided url and assert the expected code is returned and the expected content is in the response content Call API method on the provided url and assert the expected code is returned and the expected content is in the response content
:return: response as dict. This either contains the data of an entry or error details. If JSON cannot be parsed, return string. :return: response as dict. This either contains the data of an entry or error details. If JSON cannot be parsed, return string.
""" """
_start_request_timestamp = datetime.datetime.utcnow() _start_request_timestamp = datetime.datetime.utcnow()
if call == 'PUT': if call == 'PUT':
response = requests.put(url, json=data, auth=auth, timeout=timeout) response = requests.put(url, json=data, auth=auth, timeout=timeout, params=params)
elif call == 'POST': elif call == 'POST':
response = requests.post(url, json=data, auth=auth, timeout=timeout) response = requests.post(url, json=data, auth=auth, timeout=timeout, params=params)
elif call == 'GET': elif call == 'GET':
response = requests.get(url, auth=auth, allow_redirects=False) response = requests.get(url, auth=auth, allow_redirects=False, params=params)
elif call == 'PATCH': elif call == 'PATCH':
response = requests.patch(url, json=data, auth=auth, timeout=timeout) response = requests.patch(url, json=data, auth=auth, timeout=timeout, params=params)
elif call == 'DELETE': elif call == 'DELETE':
response = requests.delete(url, auth=auth, timeout=timeout) response = requests.delete(url, auth=auth, timeout=timeout, params=params)
else: else:
raise ValueError("The provided call '%s' is not a valid API method choice" % call) raise ValueError("The provided call '%s' is not a valid API method choice" % call)
...@@ -131,12 +131,12 @@ def PUT_and_assert_expected_response(test_instance, url, data, expected_code, ex ...@@ -131,12 +131,12 @@ def PUT_and_assert_expected_response(test_instance, url, data, expected_code, ex
return r_dict return r_dict
def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT): def POST_and_assert_expected_response(test_instance, url, data, expected_code, expected_content, auth=AUTH, timeout:float=DEFAULT_REQUEST_TIMEOUT, params=None):
""" """
POST data on url and assert the expected code is returned and the expected content is in the response content POST data on url and assert the expected code is returned and the expected content is in the response content
:return: response dict :return: response dict
""" """
r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout) r_dict = _call_API_and_assert_expected_response(test_instance, url, 'POST', data, expected_code, expected_content, auth=auth, timeout=timeout, params=params)
return r_dict return r_dict
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment