Skip to content
Snippets Groups Projects
Commit ef5d2655 authored by Mario Raciti's avatar Mario Raciti
Browse files

TMSS-556: Update t_websocket tests to test the WS service with auth rules

parent 2eb97169
No related branches found
No related tags found
1 merge request!822Resolve TMSS-556
...@@ -59,9 +59,9 @@ class TMSSWebSocket(WebSocket): ...@@ -59,9 +59,9 @@ class TMSSWebSocket(WebSocket):
if token_obj: if token_obj:
self.user = token_obj.user self.user = token_obj.user
self.authenticated = True self.authenticated = True
logger.info('client authenticated %s' % self.address[0]) logger.info('Client authenticated')
else: else:
logger.info('client unauthenticated %s' % self.address[0]) logger.info('client unauthenticated')
self.close(1011, u'unauthenticated') self.close(1011, u'unauthenticated')
# NOTE: We just ignore incoming messages as we treat the communication as one-way only, except for the auth msg. # NOTE: We just ignore incoming messages as we treat the communication as one-way only, except for the auth msg.
...@@ -121,12 +121,14 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler): ...@@ -121,12 +121,14 @@ class TMSSEventMessageHandlerForWebsocket(TMSSEventMessageHandler):
self.t.join() self.t.join()
def _get_authorised_clients_for_object_in_websocket(self, obj_name): def _get_authorised_clients_for_object_in_websocket(self, obj_name):
from lofar.sas.tmss.tmss.tmssapp.models import User from django.contrib.auth import get_user_model
User = get_user_model()
auth_clients = [] auth_clients = []
for ws in self._ws_server.connections.values(): for ws in self._ws_server.connections.values():
if ws.authenticated: # Check user permissions for the object if ws.authenticated: # Check user permissions for the object
user = User.objects.get(username=ws.user) user = User.objects.get(username=ws.user)
if user.has_perm('tmssapp.view_%s' % obj_name.replace("_", "")): if user.has_perm("tmssapp.view_%s" % obj_name.replace('_','')):
auth_clients.append(ws) auth_clients.append(ws)
return auth_clients return auth_clients
......
...@@ -68,8 +68,7 @@ class TestSubtaskSchedulingService(unittest.TestCase): ...@@ -68,8 +68,7 @@ class TestSubtaskSchedulingService(unittest.TestCase):
logger.info('Connected to ws') logger.info('Connected to ws')
# Send auth token as first message after the WS handshake # Send auth token as first message after the WS handshake
response = requests.post(self.test_data_creator.django_api_url + '/token-auth/', response = requests.post(self.test_data_creator.django_api_url + '/token-auth/',
json={'username': self.tmss_test_env.client_credentials.dbcreds.user, json={'username': 'paulus', 'password': 'pauluspass'})
'password': self.tmss_test_env.client_credentials.dbcreds.password})
ws.send(JSONdumps(response.json())) ws.send(JSONdumps(response.json()))
def on_close(ws): def on_close(ws):
...@@ -98,6 +97,19 @@ class TestSubtaskSchedulingService(unittest.TestCase): ...@@ -98,6 +97,19 @@ class TestSubtaskSchedulingService(unittest.TestCase):
cls.test_data_creator = cls.tmss_test_env.create_test_data_creator() cls.test_data_creator = cls.tmss_test_env.create_test_data_creator()
# Create group and permissions and add to user 'paulus' to test auth rules for WS messages
from django.contrib.auth.models import Group, Permission
from django.contrib.auth import get_user_model
User = get_user_model()
ws_test_group, _ = Group.objects.get_or_create(name='ws_test')
for model_name in ('schedulingunitdraft', 'taskdraft', 'schedulingunitblueprint', 'taskblueprint', 'subtask'):
ws_test_group.permissions.add(Permission.objects.get(codename='view_%s' % model_name))
ws_test_user, _ = User.objects.get_or_create(username='paulus', password='pauluspass')
ws_test_user.groups.add(ws_test_group)
while not ws_test_user.has_perm('tmssapp.view_subtask'):
ws_test_user = User.objects.get(username='paulus')
@classmethod @classmethod
def tearDownClass(cls) -> None: def tearDownClass(cls) -> None:
cls.tmss_test_env.stop() cls.tmss_test_env.stop()
...@@ -118,10 +130,14 @@ class TestSubtaskSchedulingService(unittest.TestCase): ...@@ -118,10 +130,14 @@ class TestSubtaskSchedulingService(unittest.TestCase):
self.start_ws_client(websocket_port) # Start ws client self.start_ws_client(websocket_port) # Start ws client
def test_object(json_test, obj_type, action): # Check if the correct/expected json_blobs arrive in the ws client def test_object(json_test, obj_type, action, auth=True): # Check if the correct/expected json_blobs arrive in the ws client
# Wait for incoming ws message # Wait for incoming ws message
if not self.sync_event.wait(timeout=50): if not self.sync_event.wait(timeout=50):
if auth: # TODO: Improve auth checks.
raise TimeoutError() raise TimeoutError()
else:
self.sync_event.clear()
return
self.sync_event.clear() self.sync_event.clear()
# Assert json_blobs # Assert json_blobs
expected_json_blob = {'object_details': {'id': json_test['id']}, 'object_type': obj_type.value, 'action': action.value} expected_json_blob = {'object_details': {'id': json_test['id']}, 'object_type': obj_type.value, 'action': action.value}
...@@ -142,6 +158,11 @@ class TestSubtaskSchedulingService(unittest.TestCase): ...@@ -142,6 +158,11 @@ class TestSubtaskSchedulingService(unittest.TestCase):
# Test creations # Test creations
# Test reservation create not authorised to receive messages
reservation = self.test_data_creator.post_data_and_get_response_as_json_object(
self.test_data_creator.Reservation(), '/reservation/')
test_object(reservation, self.ObjTypes.RESERVATION, self.ObjActions.CREATE, False)
# Test scheduling_unit_draft create # Test scheduling_unit_draft create
su_draft = self.test_data_creator.post_data_and_get_response_as_json_object( su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(
self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/') self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/')
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment