Skip to content
Snippets Groups Projects
Commit ab4e1a5c authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

TMSS-2829: skeleton for copy-service test, including (sub)task definition and creation. wip.

parent 6d5559ba
No related branches found
No related tags found
1 merge request!1296Resolve TMSS-2829
......@@ -16,30 +16,25 @@
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
import threading
import time
import unittest
import uuid
import logging
logger = logging.getLogger('lofar.'+__name__)
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
from lofar.common.test_utils import skip_integration_tests
if skip_integration_tests():
exit(3)
from lofar.messaging.messagebus import TemporaryExchange, BusListenerJanitor
from lofar.messaging.messagebus import TemporaryExchange
from lofar.common.test_utils import integration_test
from lofar.common.util import single_line_with_single_spaces
from threading import Lock
import requests
import json
from collections import deque
from time import sleep
from datetime import datetime, timedelta
@integration_test
class TestSubtaskSchedulingService(unittest.TestCase):
class TestCopyService(unittest.TestCase):
'''
Tests for the SubtaskSchedulingService
Tests for the TMSSService
'''
@classmethod
def setUpClass(cls) -> None:
......@@ -48,180 +43,70 @@ class TestSubtaskSchedulingService(unittest.TestCase):
cls.tmp_exchange = TemporaryExchange("%s_%s" % (cls.__name__, cls.TEST_UUID))
cls.tmp_exchange.open()
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, start_postgres_listener=False, populate_schemas=False, populate_test_data=False)
# override DEFAULT_BUSNAME
import lofar
lofar.messaging.config.DEFAULT_BUSNAME = cls.tmp_exchange.address
lofar.messaging.DEFAULT_BUSNAME = cls.tmp_exchange.address
# import here, and not at top of module, because DEFAULT_BUSNAME needs to be set before importing
from lofar.sas.tmss.test.test_environment import TMSSTestEnvironment
cls.tmss_test_env = TMSSTestEnvironment(exchange=cls.tmp_exchange.address, populate_schemas=True, populate_test_data=False,
start_subtask_scheduler=True, start_postgres_listener=True,
start_dynamic_scheduler=False, enable_viewflow=False)
cls.tmss_test_env.start()
from lofar.sas.tmss.test.tmss_test_data_rest import TMSSRESTTestDataCreator
cls.test_data_creator = TMSSRESTTestDataCreator(cls.tmss_test_env.django_server.url,
(cls.tmss_test_env.ldap_server.dbcreds.user,
cls.tmss_test_env.ldap_server.dbcreds.password))
@classmethod
def tearDownClass(cls) -> None:
cls.tmss_test_env.stop()
cls.tmp_exchange.close()
def test_01_for_expected_behaviour(self):
'''
This test starts a TMSSPGListener service and TMSS, creates/updates/deletes subtasks/tasks/schedulingunits, and checks if the correct events are sent.
'''
logger.info(' -- test_01_for_expected_behaviour -- ')
from lofar.sas.tmss.services.postgres_listener import TMSSPGListener, TMSS_SUBTASK_OBJECT_EVENT_PREFIX, \
TMSS_SUBTASK_STATUS_EVENT_PREFIX, TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX, TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX, \
TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX, TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX, \
TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX, TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX, \
TMSS_PROJECT_OBJECT_EVENT_PREFIX, TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX, TMSS_SCHEDULINGCONSTRAINTSWEIGHTFACTOR_OBJECT_EVENT_PREFIX
class TestTMSSPGListener(TMSSPGListener):
'''Helper TMSSPGListener for this test, storing intermediate results, and providing synchronization threading.Events'''
def __init__(self, dbcreds, exchange=self.tmp_exchange.address):
super().__init__(dbcreds, exchange)
self.subjects = deque()
self.contentDicts = deque()
self.lock = Lock()
self.event_received = threading.Event()
def _sendNotification(self, subject, contentDict):
# instead of sending a notification to the messagebus, record the subject and content in queues
# so we can check in the test if the correct subjects are recorded
with self.lock:
self.event_received.clear()
logger.info("TestTMSSPGListener detected db change: %s %s", subject, single_line_with_single_spaces(contentDict))
self.subjects.append(subject)
contentDict = json.loads(contentDict) if isinstance(contentDict, str) else contentDict
if 'new' in contentDict:
del contentDict['new']
if 'old' in contentDict:
del contentDict['old']
self.contentDicts.append(contentDict)
self.event_received.set()
def test_copy_with_managed_output(self):
from lofar.sas.tmss.tmss.tmssapp.models import SchedulingUnitObservingStrategyTemplate, SchedulingSet
from lofar.sas.tmss.tmss.tmssapp.tasks import create_scheduling_unit_draft_from_observing_strategy_template, create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft
from lofar.sas.tmss.test.tmss_test_data_django_models import SchedulingSet_test_data
strategy_template = SchedulingUnitObservingStrategyTemplate.get_version_or_latest(name="Simple Observation")
strategy_template.template['tasks']['CopyPipeline'] = {
"specifications_doc": {
"destination": "localhost:/tmp",
"managed_output": True
},
"specifications_template": {
"name": "copy pipeline",
"version": 1
}
}
strategy_template.template['task_relations'].append({
"consumer": "CopyPipeline",
"input": {
"dataformat": "MeasurementSet",
"datatype": "visibilities",
"role": "any"
},
"output": {
"dataformat": "MeasurementSet",
"datatype": "visibilities",
"role": "correlator"
},
"producer": "Observation",
"selection_doc": {},
"selection_template": {"name": "all" }
})
scheduling_set = SchedulingSet.objects.create(**SchedulingSet_test_data())
scheduling_unit_draft = create_scheduling_unit_draft_from_observing_strategy_template(strategy_template, scheduling_set, "name", "description")
scheduling_unit_blueprint = create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft)
# create and start the service (the object under test)
with TestTMSSPGListener(exchange=self.tmp_exchange.address, dbcreds=self.tmss_test_env.database.dbcreds) as service:
from lofar.sas.tmss.tmss.tmssapp.models.specification import SchedulingConstraintsWeightFactor
weight_factor = SchedulingConstraintsWeightFactor.objects.create(constraint_name=str(uuid.uuid4()),
weight=1)
weight_factor.weight = 2
weight_factor.save()
# sync and check weight_factor updated
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_SCHEDULINGCONSTRAINTSWEIGHTFACTOR_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft())
self.assertEqual({'id': weight_factor.id}, service.contentDicts.popleft())
# create a Project
project = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Project(), '/project/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_PROJECT_OBJECT_EVENT_PREFIX +'.Created', service.subjects.popleft())
self.assertEqual({"name": project['name']}, service.contentDicts.popleft())
self.assertEqual(TMSS_PROJECTCYCLES_OBJECT_EVENT_PREFIX +'.Created', service.subjects.popleft())
self.assertIn("id", service.contentDicts.popleft())
# create a SchedulingUnitDraft
scheduling_set_url = self.test_data_creator.post_data_and_get_url(self.test_data_creator.SchedulingSet(project_url=project['url']), '/scheduling_set/')
su_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitDraft(scheduling_set_url=scheduling_set_url), '/scheduling_unit_draft/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_SCHEDULINGUNITDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft())
self.assertEqual({"id": su_draft['id']}, service.contentDicts.popleft())
# create a TaskDraft
task_draft = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskDraft(scheduling_unit_draft_url=su_draft['url']), '/task_draft/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_TASKDRAFT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft())
self.assertEqual({"id": task_draft['id']}, service.contentDicts.popleft())
# create a SchedulingUnitBlueprint
su_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.SchedulingUnitBlueprint(scheduling_unit_draft_url=su_draft['url']), '/scheduling_unit_blueprint/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft())
self.assertEqual({"id": su_blueprint['id']}, service.contentDicts.popleft())
# create a TaskBlueprint
task_blueprint = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.TaskBlueprint(scheduling_unit_blueprint_url=su_blueprint['url'],
draft_url=task_draft['url']), '/task_blueprint/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft())
self.assertEqual({"id": task_blueprint['id']}, service.contentDicts.popleft())
# create a SubTask
subtask = self.test_data_creator.post_data_and_get_response_as_json_object(self.test_data_creator.Subtask(task_blueprint_url=task_blueprint['url']), '/subtask/')
# sync and check
service.event_received.wait(timeout=5)
with service.lock:
self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Created', service.subjects.popleft())
self.assertEqual({"id": subtask['id']}, service.contentDicts.popleft())
# the creation of the subtask should also trigger an update of its parent TaskBlueprint and SchedulingUnitBlueprint
self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft())
self.assertEqual({"id": task_blueprint['id']}, service.contentDicts.popleft())
self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft())
self.assertEqual({"id": su_blueprint['id']}, service.contentDicts.popleft())
# update subtask status, use a nice tmss_client and the rest api.
with self.tmss_test_env.create_tmss_client() as client:
client.set_subtask_status(subtask['id'], 'defined')
# ugly, but functional. Wait for all status updates: 1 object, 1 status. both per each object (3 types) => total 6 events.
start_wait = datetime.utcnow()
while True:
with service.lock:
if len(service.subjects) == 6:
break
if datetime.utcnow() - start_wait > timedelta(seconds=5):
raise TimeoutError("timeout while waiting for status/object updates")
# sync and check
with service.lock:
self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX + '.Updated', service.subjects.popleft())
self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft())
self.assertEqual(TMSS_SUBTASK_STATUS_EVENT_PREFIX+'.Defined', service.subjects.popleft())
self.assertEqual({'id': subtask['id'], 'status': 'defined'}, service.contentDicts.popleft())
self.assertEqual(TMSS_TASKBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft())
self.assertEqual({'id': task_blueprint['id']}, service.contentDicts.popleft())
self.assertEqual(TMSS_TASKBLUEPRINT_STATUS_EVENT_PREFIX+'.Schedulable', service.subjects.popleft())
self.assertEqual({'id': task_blueprint['id'], 'status': 'schedulable'}, service.contentDicts.popleft())
from lofar.sas.tmss.services.copy_service import create_copy_service
service = create_copy_service(exchange=self.tmp_exchange.address, tmss_client_credentials_id=self.tmss_test_env.client_credentials.dbcreds_id)
with BusListenerJanitor(service):
pass
self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_OBJECT_EVENT_PREFIX+'.Updated', service.subjects.popleft())
self.assertEqual({'id': su_blueprint['id']}, service.contentDicts.popleft())
self.assertEqual(TMSS_SCHEDULINGUNITBLUEPRINT_STATUS_EVENT_PREFIX+'.Schedulable', service.subjects.popleft())
self.assertEqual({'id': su_blueprint['id'], 'status': 'schedulable'}, service.contentDicts.popleft())
# delete subtask, use direct http delete request on rest api
requests.delete(subtask['url'], auth=self.test_data_creator.auth)
time.sleep(1)
# sync and check subtask deleted
with service.lock:
self.assertEqual(TMSS_SUBTASK_OBJECT_EVENT_PREFIX+'.Deleted', service.subjects.popleft())
self.assertEqual({'id': subtask['id']}, service.contentDicts.popleft())
if __name__ == '__main__':
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment