diff --git a/SAS/TMSS/services/tmsspglistener/test/t_tmss_pglistener_service.py b/SAS/TMSS/services/tmsspglistener/test/t_tmss_pglistener_service.py index 92b97794ed52566f6bddf8e248301004566426e4..2454713bf9b5afe68d8c1ef741cbe18c84f6374f 100755 --- a/SAS/TMSS/services/tmsspglistener/test/t_tmss_pglistener_service.py +++ b/SAS/TMSS/services/tmsspglistener/test/t_tmss_pglistener_service.py @@ -30,9 +30,11 @@ from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator from lofar.messaging.messagebus import TemporaryExchange from lofar.sas.tmss.services.tmsspglistener import * from lofar.common.test_utils import integration_test -from threading import Event +from threading import Lock import requests import json +from collections import deque +from datetime import datetime, timedelta @integration_test class TestSubtaskSchedulingService(unittest.TestCase): @@ -60,60 +62,53 @@ class TestSubtaskSchedulingService(unittest.TestCase): def test_01_for_expected_behaviour(self): ''' - This test starts a scheduling service and tmss, creates a chain of subtasks, finishes the first, and checks if the successors are then scheduled. + This test starts a TMSSPGListener service and TMSS, creates/updates/deletes subtasks/tasks/schedulingunits, and checks if the correct events are sent. ''' - logger.info(' -- test_01_for_expected_behaviour -- ') - class TestTMSSPGListener(TMSSPGListener): '''Helper TMSSPGListener for this test, storing intermediate results, and providing synchronization threading.Events''' def __init__(self, dbcreds, exchange=self.tmp_exchange.address): super().__init__(dbcreds, exchange) - self.last_subject = None - self.last_contentDict = None - self.wait_event1 = Event() - self.wait_event2 = Event() + self.subjects = deque() + self.contentDicts = deque() + self.lock = Lock() def _sendNotification(self, subject, contentDict): - self.wait_event1.wait(timeout=2) - self.last_subject = subject - self.last_contentDict = json.loads(contentDict) - self.wait_event2.set() + # instead of sending a notification to the messagebus, record the subject and content in queues + # so we can check in the test if the correct subjects are recorded + with self.lock: + logger.info("detected db change: %s %s", subject, single_line_with_single_spaces(contentDict)) + self.subjects.append(subject) + self.contentDicts.append(json.loads(contentDict) if isinstance(contentDict, str) else contentDict) # create and start the service (the object under test) with TestTMSSPGListener(exchange=self.tmp_exchange.address, dbcreds=self.tmss_test_env.database.dbcreds) as service: - # sync - service.wait_event1.set() - # create a SchedulingUnitDraft su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitDraft(), '/scheduling_unit_draft/') # sync and check - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.last_subject) - self.assertEqual({"id": su_draft['id']}, service.last_contentDict) - service.wait_event1.set() + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_draft['id']}, service.contentDicts.popleft()) # create a TaskDraft task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/') # sync and check - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.last_subject) - self.assertEqual({"id": task_draft['id']}, service.last_contentDict) - service.wait_event1.set() + with service.lock: + self.assertEqual(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_draft['id']}, service.contentDicts.popleft()) # create a SchedulingUnitBlueprint su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']), '/scheduling_unit_blueprint/') # sync and check - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.last_subject) - self.assertEqual({"id": su_blueprint['id']}, service.last_contentDict) - service.wait_event1.set() + with service.lock: + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": su_blueprint['id']}, service.contentDicts.popleft()) # create a TaskBlueprint @@ -121,61 +116,59 @@ class TestSubtaskSchedulingService(unittest.TestCase): draft_url=task_draft['url']), '/task_blueprint/') # sync and check - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.last_subject) - self.assertEqual({"id": task_blueprint['id']}, service.last_contentDict) - service.wait_event1.set() + with service.lock: + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": task_blueprint['id']}, service.contentDicts.popleft()) # create a SubTask subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/') # sync and check - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', service.last_subject) - self.assertEqual({"id": subtask['id']}, service.last_contentDict) - service.wait_event1.set() - - # TODO: make test work. The TMSSPGListener does output the correct events, but not in a deterministic order (thanks to postgres triggers) - # # update subtask status - # with self.tmss_test_env.create_tmss_client() as client: - # client.set_subtask_status(subtask['id'], 'scheduled') - # - # # sync and check - # service.wait_event2.wait(timeout=2) - # # self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.last_subject) - # # self.assertEqual({'id': subtask['id']}, service.last_contentDict) - # service.wait_event1.set() - # - # - # # sync and check subtask status - # service.wait_event2.wait(timeout=2) - # # self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Scheduled', service.last_subject) - # # self.assertEqual({'id': subtask['id'], 'status': 'scheduled'}, service.last_contentDict) - # service.wait_event1.set() - # - # - # # sync and check parent task_blueprint status - # service.wait_event2.wait(timeout=2) - # self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.last_subject) - # self.assertEqual({'id': task_blueprint['id'], 'status': 'scheduled'}, service.last_contentDict) - # service.wait_event1.set() - # - # - # # sync and check parent su_blueprint status - # service.wait_event2.wait(timeout=2) - # self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.last_subject) - # self.assertEqual({'id': su_blueprint['id'], 'status': 'scheduled'}, service.last_contentDict) - # service.wait_event1.set() - - # delete subtask + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft()) + self.assertEqual({"id": subtask['id']}, service.contentDicts.popleft()) + + # update subtask status, use a nice tmss_client and the rest api. + with self.tmss_test_env.create_tmss_client() as client: + client.set_subtask_status(subtask['id'], 'scheduled') + + # ugly, but functional. Wait for all status updates: 1 object, 1 status. both per each object (3 types) => total 6 events. + start_wait = datetime.utcnow() + while True: + with service.lock: + if len(service.subjects) == 6: + break + if datetime.utcnow() - start_wait > timedelta(seconds=5): + raise TimeoutError("timeout while waiting for status/object updates") + + # sync and check + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': subtask['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': task_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id']}, service.contentDicts.popleft()) + + self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Scheduled', service.subjects.popleft()) + self.assertEqual({'id': su_blueprint['id'], 'status': 'scheduled'}, service.contentDicts.popleft()) + + # delete subtask, use direct http delete request on rest api requests.delete(subtask['url'], auth=self.test_data_creator.auth) # sync and check subtask deleted - self.assertTrue(service.wait_event2.wait(timeout=2)) - self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', service.last_subject) - self.assertEqual({'id': subtask['id']}, service.last_contentDict) - service.wait_event1.set() + with service.lock: + self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', service.subjects.popleft()) + self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft()) if __name__ == '__main__':