diff --git a/SAS/TMSS/test/t_tmssapp_specification_django.py b/SAS/TMSS/test/t_tmssapp_specification_django.py index 2735efe87549c18d2a60926b2401df45696520bb..683445dfe4f66a15897676c9cac59c01eb672304 100755 --- a/SAS/TMSS/test/t_tmssapp_specification_django.py +++ b/SAS/TMSS/test/t_tmssapp_specification_django.py @@ -20,24 +20,36 @@ # $Id: $ import unittest -import rest_framework.test -from tmss.tmssapp.populate import populate_choices -from tmss.tmssapp import models -from django.db.utils import IntegrityError -from django.contrib.auth.models import User -from test_utils import assertDataWithUrls, assertUrlList -import uuid - from datetime import datetime # use this to create timezone-aware datetime objects: from django.utils import timezone -client = rest_framework.test.APIClient() - # todo: Tags? -> Decide how to deal with them first. # todo: Immutability of Blueprints on db level? -class GeneratorTemplateTest(rest_framework.test.APITransactionTestCase): +# before we import any django modules the DJANGO_SETTINGS_MODULE and TMSS_DBCREDENTIALS need to be known/set. +# import and start an isolated TMSSTestDjangoServerWithPostgresInstance (with fresh database and django server) +# this automagically sets the required DJANGO_SETTINGS_MODULE and TMSS_DBCREDENTIALS envvars. +from lofar.sas.tmss.test.test_utils import TMSSTestDjangoServerWithPostgresInstance +_tmss_test_server = TMSSTestDjangoServerWithPostgresInstance() +_tmss_test_server.start() + +# tell unittest to stop (and automagically cleanup) the test database and django server once all testing is done. +def tearDownModule(): + _tmss_test_server.stop() + +# now it's safe to import django modules +from lofar.sas.tmss.tmss.tmssapp import models +from django.db.utils import IntegrityError +from django.contrib.auth.models import User + +# TODO: rest API testing should be moved out of this test module. +# import rest_framework.test +# client = rest_framework.test.APIClient() +# from lofar.sas.tmss.test.test_utils import assertDataWithUrls, assertUrlList + + +class GeneratorTemplateTest(unittest.TestCase): reset_sequences = True # todo: Decide whether we want this. It slows down tests, but we can hardcode primary keys. # test data @@ -55,16 +67,7 @@ class GeneratorTemplateTest(rest_framework.test.APITransactionTestCase): "create_function": 'Trousers', "tags": []} - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - def tearDown(self): - client.logout() - - def test_GeneratorTemplate_gets_created_with_correct_creation_timestamp(self): - # setup before = datetime.utcnow() entry = models.GeneratorTemplate.objects.create(**self.test_data_1) @@ -87,45 +90,51 @@ class GeneratorTemplateTest(rest_framework.test.APITransactionTestCase): self.assertLess(before, entry.updated_at) self.assertGreater(after, entry.updated_at) - def test_GET_GeneratorTemplate_list_view_shows_entry(self): - - # setup - entry = models.GeneratorTemplate.objects.create(**self.test_data_1) - - # assert - response = client.get('/generator_template/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_GET_GeneratorTemplate_view_returns_correct_entry(self): - - # setup - id1 = models.GeneratorTemplate.objects.create(**self.test_data_1).id - id2 = models.GeneratorTemplate.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/generator_template/%s/' % id1, format='json', follow=True) - response2 = client.get('/generator_template/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - -class DefaultGeneratorTemplateTest(rest_framework.test.APITransactionTestCase): + # TODO: rest API testing should be moved out of this test module. + # def test_GET_GeneratorTemplate_list_view_shows_entry(self): + # + # # setup + # entry = models.GeneratorTemplate.objects.create(**self.test_data_1) + # + # # assert + # response = client.get('/generator_template/', format='json', follow=True) + # self.assertEqual(response.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response.data['results'][0].items()) + + # TODO: rest API testing should be moved out of this test module. + # def test_GET_GeneratorTemplate_view_returns_correct_entry(self): + # + # # setup + # id1 = models.GeneratorTemplate.objects.create(**self.test_data_1).id + # id2 = models.GeneratorTemplate.objects.create(**self.test_data_2).id + # + # # assert + # response1 = client.get('/generator_template/%s/' % id1, format='json', follow=True) + # response2 = client.get('/generator_template/%s/' % id2, format='json', follow=True) + # self.assertEqual(response1.status_code, 200) + # self.assertEqual(response2.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response1.data.items()) + # for item in self.test_data_2.items(): + # self.assertIn(item, response2.data.items()) + + + +class DefaultGeneratorTemplateTest(unittest.TestCase): test_data = {'name': 'default', 'template': None, 'tags':[]} - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - def tearDown(self): - client.logout() + # TODO: solve with LDAPServerInstance similar to DjangoServerInstance solution + # + # def setUp(self): + # user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') + # client.force_login(user) + # + # def tearDown(self): + # client.logout() def test_DefaultGeneratorTemplate_prevents_same_name(self): @@ -145,7 +154,7 @@ class DefaultGeneratorTemplateTest(rest_framework.test.APITransactionTestCase): models.DefaultGeneratorTemplate.objects.create(**test_data_2) -class SchedulingUnitTemplateTest(rest_framework.test.APITransactionTestCase): +class SchedulingUnitTemplateTest(unittest.TestCase): reset_sequences = True @@ -162,13 +171,6 @@ class SchedulingUnitTemplateTest(rest_framework.test.APITransactionTestCase): "schema": {"mykey": "my other value"}, "tags": []} - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - def tearDown(self): - client.logout() - def test_SchedulingUnitTemplate_gets_created_with_correct_creation_timestamp(self): # setup @@ -193,35 +195,37 @@ class SchedulingUnitTemplateTest(rest_framework.test.APITransactionTestCase): self.assertLess(before, entry.updated_at) self.assertGreater(after, entry.updated_at) - def test_GET_SchedulingUnitTemplate_list_view_shows_entry(self): - - # setup - entry = models.SchedulingUnitTemplate.objects.create(**self.test_data_1) - - # assert - response = client.get('/scheduling_unit_template/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_GET_SchedulingUnitTemplate_view_returns_correct_entry(self): - - # setup - id1 = models.SchedulingUnitTemplate.objects.create(**self.test_data_1).id - id2 = models.SchedulingUnitTemplate.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/scheduling_unit_template/%s/' % id1, format='json', follow=True) - response2 = client.get('/scheduling_unit_template/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - - -class TaskTemplateTest(rest_framework.test.APITransactionTestCase): + # TODO: rest API testing should be moved out of this test module. + # def test_GET_SchedulingUnitTemplate_list_view_shows_entry(self): + # + # # setup + # entry = models.SchedulingUnitTemplate.objects.create(**self.test_data_1) + # + # # assert + # response = client.get('/scheduling_unit_template/', format='json', follow=True) + # self.assertEqual(response.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response.data['results'][0].items()) + + # TODO: rest API testing should be moved out of this test module. + # def test_GET_SchedulingUnitTemplate_view_returns_correct_entry(self): + # + # # setup + # id1 = models.SchedulingUnitTemplate.objects.create(**self.test_data_1).id + # id2 = models.SchedulingUnitTemplate.objects.create(**self.test_data_2).id + # + # # assert + # response1 = client.get('/scheduling_unit_template/%s/' % id1, format='json', follow=True) + # response2 = client.get('/scheduling_unit_template/%s/' % id2, format='json', follow=True) + # self.assertEqual(response1.status_code, 200) + # self.assertEqual(response2.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response1.data.items()) + # for item in self.test_data_2.items(): + # self.assertIn(item, response2.data.items()) + + +class TaskTemplateTest(unittest.TestCase): reset_sequences = True @@ -240,14 +244,6 @@ class TaskTemplateTest(rest_framework.test.APITransactionTestCase): "schema": {"mykey": "my other value"}, "tags": []} - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - def tearDown(self): - client.logout() - - def test_TaskTemplate_gets_created_with_correct_creation_timestamp(self): # setup @@ -272,1575 +268,1577 @@ class TaskTemplateTest(rest_framework.test.APITransactionTestCase): self.assertLess(before, entry.updated_at) self.assertGreater(after, entry.updated_at) - def test_GET_TaskTemplate_list_view_shows_entry(self): - - # setup - entry = models.TaskTemplate.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_template/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_GET_TaskTemplate_view_returns_correct_entry(self): - - # setup - id1 = models.TaskTemplate.objects.create(**self.test_data_1).id - id2 = models.TaskTemplate.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_template/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_template/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - - - -class WorkRelationSelectionTemplateTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - # test data - test_data_1 = {"name": "observation", - "description": 'My one observation', - "version": 'v0.314159265359', - "schema": {"mykey": "my value"}, - "tags": ["TMSS", "TESTING"]} - - test_data_2 = {"name": "observation", - "description": 'My other observation', - "version": 'v3.14159265359', - "schema": {"mykey": "my other value"}, - "tags": []} - - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - def tearDown(self): - client.logout() - - def test_WorkRelationSelectionTemplate_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_WorkRelationSelectionTemplate_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_WorkRelationSelectionTemplate_list_view_shows_entry(self): - - # setup - entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) - - # assert - response = client.get('/work_relation_selection_template/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_GET_WorkRelationSelectionTemplate_view_returns_correct_entry(self): - - # setup - id1 = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1).id - id2 = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/work_relation_selection_template/%s/' % id1, format='json', follow=True) - response2 = client.get('/work_relation_selection_template/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - - -class TaskConnectorsTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - if populate: - populate_choices(None, None) # <- can only be called once per test case - - # test data - self.test_data_1 = {"role": models.Role.objects.get(value='calibrator'), - "datatype": models.Datatype.objects.get(value='instrument model'), - "output_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), - "input_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), - "tags": []} - - self.test_data_2 = {"role": models.Role.objects.get(value='target'), - "datatype": models.Datatype.objects.get(value='image'), - "output_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), - "input_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), - "tags": []} - - def tearDown(self): - client.logout() - - def test_GET_TaskConnectors_list_view_shows_entry(self): - - # setup - models.TaskConnectors.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_connectors/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_GET_TaskConnectors_view_returns_correct_entry(self): - - # setup - id1 = models.TaskConnectors.objects.create(**self.test_data_1).id - id2 = models.TaskConnectors.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_connectors/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_connectors/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_POST_TaskConnectors_prevents_missing_input_of(self): - - # setup - test_data_1 = dict(self.test_data_1) - test_data_1['input_of'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskConnectors.objects.create(**test_data_1) - - def test_POST_TaskConnectors_prevents_missing_output_of(self): - - # setup - test_data_1 = dict(self.test_data_1) - test_data_1['output_of'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskConnectors.objects.create(**test_data_1) - - def test_TaskConnectors_allows_setting_dataformats(self): - # Other then through the API view, we cannot assign ManyToMany on creation, but have to set it later - - test_data_1 = dict(self.test_data_1) - test_data_1['input_of'] = None - wior = models.TaskConnectors.objects.create(**self.test_data_2) - wior.dataformats.set([models.Dataformat.objects.get(value='Beamformed'), - models.Dataformat.objects.get(value='MeasurementSet')]) - wior.save() - - # assert - response = client.get('/task_connectors/%s/' % wior.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, self.test_data_2) - - -class CycleTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # test data - self.test_data_1 = {"name": 'my_cycle' + str(uuid.uuid4()), - "description": "", - "tags": [], - "start": datetime.utcnow().isoformat(), - "stop": datetime.utcnow().isoformat(), - "number": 1, - "standard_hours": 2, - "expert_hours": 3, - "filler_hours": 4} - - # test data - self.test_data_2 = {"name": 'my_cycle' + str(uuid.uuid4()), - "description": "This is my other cycle", - "tags": ['othercycle'], - "start": datetime.utcnow().isoformat(), - "stop": datetime.utcnow().isoformat(), - "number": 4, - "standard_hours": 3, - "expert_hours": 2, - "filler_hours": 1} - - def tearDown(self): - client.logout() - - def test_GET_Cycle_list_view_shows_entry(self): - - # setup - models.Cycle.objects.create(**self.test_data_1) - - # assert - response = client.get('/cycle/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_Cycle_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.Cycle.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_Cycle_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.Cycle.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_Cycle_view_returns_correct_entry(self): - - # setup - id1 = models.Cycle.objects.create(**self.test_data_1).name # name is pk - id2 = models.Cycle.objects.create(**self.test_data_2).name - - # assert - response1 = client.get('/cycle/%s/' % id1, format='json', follow=True) - response2 = client.get('/cycle/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - - def test_Cycle_constains_list_of_related_projects(self): - - self.pt = ProjectTest() - self.pt.setUp() - - # setup - cycle = models.Cycle.objects.create(**self.test_data_1) - project1 = models.Project.objects.create(**self.pt.test_data_1) - project1.cycle = cycle - project1.save() - project2 = models.Project.objects.create(**self.pt.test_data_2) - project2.cycle = cycle - project2.save() - - # assert - response = client.get('/cycle/%s/' % cycle.name, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['projects'], [project1, project2]) - - - -class ProjectTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # test data - self.test_data_1 = {"name": 'my_project_' + str(uuid.uuid4()), - "description": "", - "tags": [], - "priority": 1, - "can_trigger": False, - "private_data": True, - "expert": True, - "filler": False} - - # test data - self.test_data_2 = {"name": 'my_project_' + str(uuid.uuid4()), - "description": "This is my other project", - "tags": ['othercycle'], - "priority": 42, - "can_trigger": True, - "private_data": False, - "expert": False, - "filler": True} - - - def tearDown(self): - client.logout() - - def test_GET_Project_list_view_shows_entry(self): - - # setup - models.Project.objects.create(**self.test_data_1) - - # assert - response = client.get('/project/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response.data['results'][0].items()) - - def test_Project_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.Project.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_Project_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.Project.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_Project_view_returns_correct_entry(self): - - # setup - id1 = models.Project.objects.create(**self.test_data_1).name # name is pk - id2 = models.Project.objects.create(**self.test_data_2).name - - # assert - response1 = client.get('/project/%s/' % id1, format='json', follow=True) - response2 = client.get('/project/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - for item in self.test_data_1.items(): - self.assertIn(item, response1.data.items()) - for item in self.test_data_2.items(): - self.assertIn(item, response2.data.items()) - - def test_nested_projects_are_filtered_according_to_cycle(self): - - self.ct = CycleTest() - self.ct.setUp() - - # setup - cycle_1 = models.Cycle.objects.create(**self.ct.test_data_1) - cycle_2 = models.Cycle.objects.create(**self.ct.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['cycle'] = cycle_1 - project_1 = models.Project.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['cycle'] = cycle_2 - project_2 = models.Project.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/cycle/%s/project/' % cycle_2.name, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned, name is pk - response = client.get('/cycle/%s/project/%s/' % (cycle_2.name, project_2.name) , format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned, name is pk - response = client.get('/cycle/%s/project/%s/' % (cycle_2.name, project_1.name) , format='json', follow=True) - self.assertEqual(response.status_code, 404) - - - -class SchedulingSetTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self): - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - pt = ProjectTest() - pt.setUp() - - # test data - self.test_data_1 = {"name": 'my_scheduling_set', - "description": "", - "tags": [], - "generator_doc": "{}", - "project": models.Project.objects.create(**pt.test_data_1), - "generator_template": None, - "generator_source": None} - - # test data - self.test_data_2 = {"name": 'my_other_scheduling_set', - "description": "This is my other scheduling set", - "tags": ['othercycle'], - "generator_doc": "{}", - "project": models.Project.objects.create(**pt.test_data_2), - "generator_template": models.GeneratorTemplate.objects.create(**GeneratorTemplateTest.test_data_2), - "generator_source": None} #models.GeneratorTemplate.objects.create(**sudt.test_data_2)} # todo: circular! - - def tearDown(self): - client.logout() - - def test_GET_SchedulingSet_list_view_shows_entry(self): - - # setup - models.SchedulingSet.objects.create(**self.test_data_1) - - # assert - response = client.get('/scheduling_set/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_SchedulingSet_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.SchedulingSet.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_SchedulingSet_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.SchedulingSet.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_SchedulingSet_view_returns_correct_entry(self): - - # setup - id1 = models.SchedulingSet.objects.create(**self.test_data_1).id - id2 = models.SchedulingSet.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/scheduling_set/%s/' % id1, format='json', follow=True) - response2 = client.get('/scheduling_set/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_SchedulingSet_prevents_missing_project(self): - - # setup - test_data = dict(self.test_data_1) - test_data['project'] = None - - # assert - with self.assertRaises(IntegrityError): - models.SchedulingSet.objects.create(**test_data) - - def test_SchedulingSet_contains_list_of_related_SchedulingUnitDraft(self): - - sudt = SchedulingUnitDraftTest() - sudt.setUp(populate=True) - - # setup - scheduling_set = models.SchedulingSet.objects.create(**self.test_data_1) - scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) - scheduling_unit_draft_1.scheduling_set = scheduling_set - scheduling_unit_draft_1.save() - scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) - scheduling_unit_draft_2.scheduling_set = scheduling_set - scheduling_unit_draft_2.save() - - # assert - response = client.get('/scheduling_set/%s/' % scheduling_set.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['scheduling_unit_drafts'], [scheduling_unit_draft_1, scheduling_unit_draft_2]) - - -class SchedulingUnitDraftTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - if populate: - populate_choices(None, None) # <- can only be called once per test case - - # set up dependencies - self.sst = SchedulingSetTest() - self.sst.setUp() - - # test data - self.test_data_1 = {"name": 'my_scheduling_unit_draft', - "description": "", - "tags": [], - "requirements_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='template'), - "generator_instance_doc": "para", - "copies": None, - "scheduling_set": models.SchedulingSet.objects.create(**self.sst.test_data_1), - "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_1)} - - # test data - self.test_data_2 = {"name": 'my_other_scheduling_unit_draft', - "description": "", - "tags": [], - "requirements_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='repeated'), # todo: should this require copies to be assigned another SchedulingUnitDraft? Or isn't this implicit, actually? - "generator_instance_doc": "meter", - "copies": None, - "scheduling_set": models.SchedulingSet.objects.create(**self.sst.test_data_2), - "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_2)} - - def tearDown(self): - client.logout() - - def test_GET_SchedulingUnitDraft_list_view_shows_entry(self): - - # setup - models.SchedulingUnitDraft.objects.create(**self.test_data_1) - - # assert - response = client.get('/scheduling_unit_draft/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_SchedulingUnitDraft_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.SchedulingUnitDraft.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_SchedulingUnitDraft_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.SchedulingUnitDraft.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_SchedulingUnitDraft_view_returns_correct_entry(self): - - # setup - id1 = models.SchedulingUnitDraft.objects.create(**self.test_data_1).id - id2 = models.SchedulingUnitDraft.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/scheduling_unit_draft/%s/' % id1, format='json', follow=True) - response2 = client.get('/scheduling_unit_draft/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_SchedulingUnitDraft_prevents_missing_template(self): - - # setup - test_data = dict(self.test_data_1) - test_data['requirements_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.SchedulingUnitDraft.objects.create(**test_data) - - def test_SchedulingUnitDraft_prevents_missing_scheduling_set(self): - - # setup - test_data = dict(self.test_data_1) - test_data['scheduling_set'] = None - - # assert - with self.assertRaises(IntegrityError): - models.SchedulingUnitDraft.objects.create(**test_data) - - def test_nested_SchedulingUnitDraft_are_filtered_according_to_SchedulingSet(self): - - sst = SchedulingSetTest() - sst.setUp() - - # setup - scheduling_set_1 = models.SchedulingSet.objects.create(**sst.test_data_1) - scheduling_set_2 = models.SchedulingSet.objects.create(**sst.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['scheduling_set'] = scheduling_set_1 - scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['scheduling_set'] = scheduling_set_2 - scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/scheduling_set/%s/scheduling_unit_draft/' % scheduling_set_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/scheduling_set/%s/scheduling_unit_draft/%s/' % (scheduling_set_2.id, scheduling_unit_draft_2.id) , format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/scheduling_set/%s/scheduling_unit_draft/%s/' % (scheduling_set_2.id, scheduling_unit_draft_1.id) , format='json', follow=True) - self.assertEqual(response.status_code, 404) - - def test_SchedulingUnitDraft_contains_list_of_related_SchedulingUnitBlueprint(self): - subt = SchedulingUnitBlueprintTest() - subt.setUp(populate=False) - - # setup - scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**self.test_data_1) - scheduling_unit_blueprint_1 = models.SchedulingUnitBlueprint.objects.create(**subt.test_data_1) - scheduling_unit_blueprint_1.draft = scheduling_unit_draft - scheduling_unit_blueprint_1.save() - scheduling_unit_blueprint_2 = models.SchedulingUnitBlueprint.objects.create(**subt.test_data_2) - scheduling_unit_blueprint_2.draft = scheduling_unit_draft - scheduling_unit_blueprint_2.save() - - # assert - response = client.get('/scheduling_unit_draft/%s/' % scheduling_unit_draft.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['related_scheduling_unit_blueprint'], [scheduling_unit_blueprint_1, scheduling_unit_blueprint_2]) - - def test_SchedulingUnitDraft_contains_list_of_related_TaskDraft(self): - tdt = TaskDraftTest() - tdt.setUp(populate=False) - - # setup - scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**self.test_data_1) - task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) - task_draft_1.scheduling_unit_draft = scheduling_unit_draft - task_draft_1.save() - task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) - task_draft_2.scheduling_unit_draft = scheduling_unit_draft - task_draft_2.save() - - # assert - response = client.get('/scheduling_unit_draft/%s/' % scheduling_unit_draft.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['task_drafts'], [task_draft_1, task_draft_2]) - - -class TaskDraftTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # set up dependencies - if populate: - populate_choices(None, None) # <- can only be called once per test case - - self.sudt = SchedulingUnitDraftTest() - self.sudt.setUp(populate=False) - - # test data - self.test_data_1 = {"name": 'my_task_draft', - "description": "", - "tags": [], - "specifications_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='template'), - "copies": None, - "scheduling_unit_draft": models.SchedulingUnitDraft.objects.create(**self.sudt.test_data_1), - "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1)} - - # test data - self.test_data_2 = {"name": 'my_other_task_draft', - "description": "", - "tags": [], - "specifications_doc": "{}", - "copy_reason": models.CopyReason.objects.get(value='repeated'), # todo: should this require copies to be assigned another SchedulingUnitDraft? Or isn't this implicit, actually? - "copies": None, - "scheduling_unit_draft": models.SchedulingUnitDraft.objects.create(**self.sudt.test_data_2), - "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2)} - - def tearDown(self): - client.logout() - - def test_GET_TaskDraft_list_view_shows_entry(self): - - # setup - models.TaskDraft.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_draft/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_TaskDraft_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.TaskDraft.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_TaskDraft_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.TaskDraft.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_TaskDraft_view_returns_correct_entry(self): - - # setup - id1 = models.TaskDraft.objects.create(**self.test_data_1).id - id2 = models.TaskDraft.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_draft/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_draft/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_TaskDraft_prevents_missing_template(self): - - # setup - test_data = dict(self.test_data_1) - test_data['specifications_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskDraft.objects.create(**test_data) - - def test_TaskDraft_prevents_missing_scheduling_unit_draft(self): - - # setup - test_data = dict(self.test_data_1) - test_data['scheduling_unit_draft'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskDraft.objects.create(**test_data) - - def test_nested_TaskDraft_are_filtered_according_to_SchedulingUnitDraft(self): - sudt = SchedulingUnitDraftTest() - sudt.setUp(populate=False) - - # setup - scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) - scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['scheduling_unit_draft'] = scheduling_unit_draft_1 - task_draft_1 = models.TaskDraft.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['scheduling_unit_draft'] = scheduling_unit_draft_2 - task_draft_2 = models.TaskDraft.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/scheduling_unit_draft/%s/task_draft/' % scheduling_unit_draft_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/scheduling_unit_draft/%s/task_draft/%s/' % (scheduling_unit_draft_2.id, task_draft_2.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/scheduling_unit_draft/%s/task_draft/%s/' % (scheduling_unit_draft_2.id, task_draft_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 404) - - - def test_TaskDraft_contains_list_of_related_TaskBlueprint(self): - - tbt = TaskBlueprintTest() - tbt.setUp(populate=False) - - # setup - task_draft = models.TaskDraft.objects.create(**self.test_data_1) - task_blueprint_1 = models.TaskBlueprint.objects.create(**tbt.test_data_1) - task_blueprint_1.draft = task_draft - task_blueprint_1.save() - task_blueprint_2 = models.TaskBlueprint.objects.create(**tbt.test_data_2) - task_blueprint_2.draft = task_draft - task_blueprint_2.save() - - # assert - response = client.get('/task_draft/%s/' % task_draft.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['related_task_blueprint'], [task_blueprint_1, task_blueprint_2]) - - - def test_TaskDraft_contains_lists_of_related_TaskRelationDraft(self): - - trdt = TaskRelationDraftTest() - trdt.setUp(populate=False) - - # setup - task_draft = models.TaskDraft.objects.create(**self.test_data_1) - task_relation_draft_1 = models.TaskRelationDraft.objects.create(**trdt.test_data_1) - task_relation_draft_1.producer = task_draft - task_relation_draft_1.save() - task_relation_draft_2 = models.TaskRelationDraft.objects.create(**trdt.test_data_2) - task_relation_draft_2.consumer = task_draft - task_relation_draft_2.save() - - # assert - response = client.get('/task_draft/%s/' % task_draft.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['produced_by'], [task_relation_draft_1]) - assertUrlList(self, response.data['consumed_by'], [task_relation_draft_2]) - - -class TaskRelationDraftTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # set up dependencies - if populate: - populate_choices(None, None) # <- can only be called once per test case - - self.wrdt = TaskDraftTest() - self.wrdt.setUp(populate=False) - - self.wiort = TaskConnectorsTest() - self.wiort.setUp(populate=False) - - # test data - self.test_data_1 = {"tags": [], - "selection_doc": "{}", - "dataformat": models.Dataformat.objects.get(value='Beamformed'), - "producer": models.TaskDraft.objects.create(**self.wrdt.test_data_1), - "consumer": models.TaskDraft.objects.create(**self.wrdt.test_data_2), - "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), - "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), - "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_1)} - - # test data - self.test_data_2 = {"tags": [], - "selection_doc": "{}", - "dataformat": models.Dataformat.objects.get(value='MeasurementSet'), - "producer": models.TaskDraft.objects.create(**self.wrdt.test_data_2), - "consumer": models.TaskDraft.objects.create(**self.wrdt.test_data_1), - "input": models.TaskConnectors.objects.create(**self.wiort.test_data_2), - "output": models.TaskConnectors.objects.create(**self.wiort.test_data_1), - "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_2)} - - def tearDown(self): - client.logout() - - def test_GET_TaskRelationDraft_list_view_shows_entry(self): - - # setup - models.TaskRelationDraft.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_relation_draft/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_TaskRelationDraft_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.TaskRelationDraft.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_TaskRelationDraft_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.TaskRelationDraft.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_TaskRelationDraft_view_returns_correct_entry(self): - - # setup - id1 = models.TaskRelationDraft.objects.create(**self.test_data_1).id - id2 = models.TaskRelationDraft.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_relation_draft/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_relation_draft/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_TaskRelationDraft_prevents_missing_template(self): - - # setup - test_data = dict(self.test_data_1) - test_data['selection_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationDraft.objects.create(**test_data) - - def test_TaskRelationDraft_prevents_missing_consumer(self): - - # setup - test_data = dict(self.test_data_1) - test_data['consumer'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationDraft.objects.create(**test_data) - - def test_TaskRelationDraft_prevents_missing_producer(self): - - # setup - test_data = dict(self.test_data_1) - test_data['producer'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationDraft.objects.create(**test_data) - - def test_nested_TaskRelationDraft_are_filtered_according_to_TaskDraft(self): - tdt = TaskDraftTest() - tdt.setUp(populate=False) - - # setup - task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) - task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['producer'] = task_draft_1 - task_relation_draft_1 = models.TaskRelationDraft.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['consumer'] = task_draft_2 - task_relation_draft_2 = models.TaskRelationDraft.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/task_draft/%s/task_relation_draft/' % task_draft_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related producer is returned - response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_1.id, task_relation_draft_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_1) - - # assert an existing related consumer is returned - response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_2.id, task_relation_draft_2.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_2.id, task_relation_draft_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 404) - -class SchedulingUnitBlueprintTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # set up dependencies - if populate: - populate_choices(None, None) # <- can only be called once per test case - - self.rdt = SchedulingUnitDraftTest() - self.rdt.setUp(populate=False) - - # test data - self.test_data_1 = {"name": 'my_scheduling_unit_blueprint', - "description": "", - "tags": [], - "requirements_doc": "{}", - "do_cancel": False, - "draft": models.SchedulingUnitDraft.objects.create(**self.rdt.test_data_1), - "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_1)} - - # test data - self.test_data_2 = {"name": 'my_other_scheduling_unit_blueprint', - "description": "", - "tags": [], - "requirements_doc": "{}", - "do_cancel": True, - "draft": models.SchedulingUnitDraft.objects.create(**self.rdt.test_data_2), - "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_2)} - - def tearDown(self): - client.logout() - - def test_GET_SchedulingUnitBlueprint_list_view_shows_entry(self): - - # setup - models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) - - # assert - response = client.get('/scheduling_unit_blueprint/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_SchedulingUnitBlueprint_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_SchedulingUnitBlueprint_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_SchedulingUnitBlueprint_view_returns_correct_entry(self): - - # setup - id1 = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1).id - id2 = models.SchedulingUnitBlueprint.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/scheduling_unit_blueprint/%s/' % id1, format='json', follow=True) - response2 = client.get('/scheduling_unit_blueprint/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_SchedulingUnitBlueprint_prevents_missing_template(self): - - # setup - test_data = dict(self.test_data_1) - test_data['requirements_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.SchedulingUnitBlueprint.objects.create(**test_data) - - def test_SchedulingUnitBlueprint_prevents_missing_draft(self): - - # setup - test_data = dict(self.test_data_1) - test_data['draft'] = None - - # assert - with self.assertRaises(IntegrityError): - models.SchedulingUnitBlueprint.objects.create(**test_data) - - def test_nested_SchedulingUnitBlueprint_are_filtered_according_to_SchedulingUnitDraft(self): - - sudt = SchedulingUnitDraftTest() - sudt.setUp(populate=False) - - # setup - scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) - scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['draft'] = scheduling_unit_draft_1 - scheduling_unit_blueprint_1 = models.SchedulingUnitBlueprint.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['draft'] = scheduling_unit_draft_2 - scheduling_unit_blueprint_2 = models.SchedulingUnitBlueprint.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/' % scheduling_unit_draft_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/%s/' % (scheduling_unit_draft_2.id, scheduling_unit_blueprint_2.id) , format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/%s/' % (scheduling_unit_draft_2.id, scheduling_unit_blueprint_1.id) , format='json', follow=True) - self.assertEqual(response.status_code, 404) - - - -class TaskBlueprintTest(rest_framework.test.APITransactionTestCase): - - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # set up dependencies - if populate: - populate_choices(None, None) # <- can only be called once per test case - - self.wrdt = TaskDraftTest() - self.wrdt.setUp(populate=False) - - self.rbt = SchedulingUnitBlueprintTest() - self.rbt.setUp(populate=False) - - # test data - self.test_data_1 = {"name": 'my_task_blueprint', - "description": "", - "tags": [], - "specifications_doc": "{}", - "do_cancel": False, - "draft": models.TaskDraft.objects.create(**self.wrdt.test_data_1), - "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), - "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**self.rbt.test_data_1)} - - # test data - self.test_data_2 = {"name": 'my_other_task_blueprin', # <- Missing a 't'? Well, varchar(30) is shorter than you'd think... - "description": "", - "tags": [], - "specifications_doc": "{}", - "do_cancel": True, - "draft": models.TaskDraft.objects.create(**self.wrdt.test_data_2), - "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), - "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**self.rbt.test_data_2)} - - def tearDown(self): - client.logout() - - def test_GET_TaskBlueprint_list_view_shows_entry(self): - - # setup - models.TaskBlueprint.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_blueprint/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_TaskBlueprint_gets_created_with_correct_creation_timestamp(self): - - # setup - before = datetime.utcnow() - entry = models.TaskBlueprint.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_TaskBlueprint_update_timestamp_gets_changed_correctly(self): - - # setup - entry = models.TaskBlueprint.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_TaskBlueprint_view_returns_correct_entry(self): - - # setup - id1 = models.TaskBlueprint.objects.create(**self.test_data_1).id - id2 = models.TaskBlueprint.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_blueprint/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_blueprint/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_TaskBlueprint_prevents_missing_template(self): - - # setup - test_data = dict(self.test_data_1) - test_data['specifications_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskBlueprint.objects.create(**test_data) - - def test_TaskBlueprint_prevents_missing_draft(self): - - # setup - test_data = dict(self.test_data_1) - test_data['draft'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskBlueprint.objects.create(**test_data) - - def test_TaskBlueprint_prevents_missing_scheduling_unit_blueprint(self): - - # setup - test_data = dict(self.test_data_1) - test_data['scheduling_unit_blueprint'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskBlueprint.objects.create(**test_data) - - def test_nested_TaskBlueprint_are_filtered_according_to_TaskDraft(self): - tdt = TaskDraftTest() - tdt.setUp(populate=False) - - # setup - task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) - task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['draft'] = task_draft_1 - task_blueprint_1 = models.TaskBlueprint.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['draft'] = task_draft_2 - task_blueprint_2 = models.TaskBlueprint.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/task_draft/%s/task_blueprint/' % task_draft_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/task_draft/%s/task_blueprint/%s/' % (task_draft_2.id, task_blueprint_2.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/task_draft/%s/task_blueprint/%s/' % (task_draft_2.id, task_blueprint_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 404) - - def test_TaskBlueprint_contains_list_of_related_Subtask(self): - - from t_tmssapp_scheduling_django import SubtaskTest # Note: cannot do this on module level due to circular import - st = SubtaskTest() - st.setUp(populate=False) - - # setup - task_blueprint = models.TaskBlueprint.objects.create(**self.test_data_1) - subtask_1 = models.Subtask.objects.create(**st.test_data_1) - subtask_1.task_blueprint = task_blueprint - subtask_1.save() - subtask_2 = models.Subtask.objects.create(**st.test_data_2) - subtask_2.task_blueprint = task_blueprint - subtask_2.save() - - # assert - response = client.get('/task_blueprint/%s/' % task_blueprint.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['subtasks'], [subtask_1, subtask_2]) - - def test_TaskBlueprint_contains_lists_of_related_TaskRelationBlueprint(self): - - trbt = TaskRelationBlueprintTest() - trbt.setUp(populate=False) - - # setup - task_blueprint = models.TaskBlueprint.objects.create(**self.test_data_1) - task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**trbt.test_data_1) - task_relation_blueprint_1.producer = task_blueprint - task_relation_blueprint_1.save() - task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**trbt.test_data_2) - task_relation_blueprint_2.consumer = task_blueprint - task_relation_blueprint_2.save() - - # assert - response = client.get('/task_blueprint/%s/' % task_blueprint.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertUrlList(self, response.data['produced_by'], [task_relation_blueprint_1]) - assertUrlList(self, response.data['consumed_by'], [task_relation_blueprint_2]) - - -class TaskRelationBlueprintTest(rest_framework.test.APITransactionTestCase): - reset_sequences = True - - def setUp(self, populate=True): - - user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') - client.force_login(user) - - # set up dependencies - if populate: - populate_choices(None, None) # <- can only be called once per test case - - self.wrrdt = TaskRelationDraftTest() - self.wrrdt.setUp(populate=False) - - self.wrbt = TaskBlueprintTest() - self.wrbt.setUp(populate=False) - - self.wiort = TaskConnectorsTest() - self.wiort.setUp(populate=False) - - # test data - self.test_data_1 = {"tags": [], - "selection_doc": "{}", - "dataformat": models.Dataformat.objects.get(value='Beamformed'), - "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), - "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), - "draft": models.TaskRelationDraft.objects.create(**self.wrrdt.test_data_1), - "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_1), - "producer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_1), - "consumer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_2)} - - # test data - self.test_data_2 = {"tags": [], - "selection_doc": "{}", - "dataformat": models.Dataformat.objects.get(value='Beamformed'), - "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), - "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), - "draft": models.TaskRelationDraft.objects.create(**self.wrrdt.test_data_2), - "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_2), - "producer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_2), - "consumer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_1)} - - def tearDown(self): - client.logout() - - def test_GET_TaskRelationBlueprint_list_view_shows_entry(self): - # setup - models.TaskRelationBlueprint.objects.create(**self.test_data_1) - - # assert - response = client.get('/task_relation_blueprint/', format='json', follow=True) - self.assertEqual(response.status_code, 200) - - assertDataWithUrls(self, response.data['results'][0], self.test_data_1) - - def test_TaskRelationBlueprint_gets_created_with_correct_creation_timestamp(self): - # setup - before = datetime.utcnow() - entry = models.TaskRelationBlueprint.objects.create(**self.test_data_1) - - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.created_at) - self.assertGreater(after, entry.created_at) - - def test_TaskRelationBlueprint_update_timestamp_gets_changed_correctly(self): - # setup - entry = models.TaskRelationBlueprint.objects.create(**self.test_data_1) - before = datetime.utcnow() - entry.save() - after = datetime.utcnow() - - # assert - self.assertLess(before, entry.updated_at) - self.assertGreater(after, entry.updated_at) - - def test_GET_TaskRelationBlueprint_view_returns_correct_entry(self): - # setup - id1 = models.TaskRelationBlueprint.objects.create(**self.test_data_1).id - id2 = models.TaskRelationBlueprint.objects.create(**self.test_data_2).id - - # assert - response1 = client.get('/task_relation_blueprint/%s/' % id1, format='json', follow=True) - response2 = client.get('/task_relation_blueprint/%s/' % id2, format='json', follow=True) - self.assertEqual(response1.status_code, 200) - self.assertEqual(response2.status_code, 200) - assertDataWithUrls(self, response1.data, self.test_data_1) - assertDataWithUrls(self, response2.data, self.test_data_2) - - def test_TaskRelationBlueprint_prevents_missing_selection_template(self): - # setup - test_data = dict(self.test_data_1) - test_data['selection_template'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_TaskRelationBlueprint_prevents_missing_draft(self): - # setup - test_data = dict(self.test_data_1) - test_data['draft'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_TaskRelationBlueprint_prevents_missing_producer(self): - # setup - test_data = dict(self.test_data_1) - test_data['producer'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_TaskRelationBlueprint_prevents_missing_consumer(self): - # setup - test_data = dict(self.test_data_1) - test_data['consumer'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_TaskRelationBlueprint_prevents_missing_input(self): - # setup - test_data = dict(self.test_data_1) - test_data['input'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_TaskRelationBlueprint_prevents_missing_output(self): - # setup - test_data = dict(self.test_data_1) - test_data['output'] = None - - # assert - with self.assertRaises(IntegrityError): - models.TaskRelationBlueprint.objects.create(**test_data) - - def test_nested_TaskRelationBlueprint_are_filtered_according_to_TaskRelationDraft(self): - trdt = TaskRelationDraftTest() - trdt.setUp(populate=False) - - # setup - task_relation_draft_1 = models.TaskRelationDraft.objects.create(**trdt.test_data_1) - task_relation_draft_2 = models.TaskRelationDraft.objects.create(**trdt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['draft'] = task_relation_draft_1 - task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['draft'] = task_relation_draft_2 - task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**test_data_2) - - # assert the returned list contains related items - response = client.get('/task_relation_draft/%s/task_relation_blueprint/' % task_relation_draft_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/task_relation_draft/%s/task_relation_blueprint/%s/' % (task_relation_draft_2.id, task_relation_blueprint_2.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/task_relation_draft/%s/task_relation_blueprint/%s/' % (task_relation_draft_2.id, task_relation_blueprint_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 404) - - def test_nested_TaskRelationBlueprint_are_filtered_according_to_TaskBlueprint(self): - tbt = TaskBlueprintTest() - tbt.setUp(populate=False) - - # setup - task_blueprint_1 = models.TaskBlueprint.objects.create(**tbt.test_data_1) - task_blueprint_2 = models.TaskBlueprint.objects.create(**tbt.test_data_2) - test_data_1 = dict(self.test_data_1) - test_data_1['producer'] = task_blueprint_1 - task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**test_data_1) - test_data_2 = dict(self.test_data_2) - test_data_2['consumer'] = task_blueprint_2 - task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**test_data_2) - - # assert the returned list contains related producer - response = client.get('/task_blueprint/%s/task_relation_blueprint/' % task_blueprint_1.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_1) - - # assert the returned list contains related consumer - response = client.get('/task_blueprint/%s/task_relation_blueprint/' % task_blueprint_2.id, format='json', follow=True) - self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data['results']), 1) - assertDataWithUrls(self, response.data['results'][0], test_data_2) - - # assert an existing related item is returned - response = client.get('/task_blueprint/%s/task_relation_blueprint/%s/' % (task_blueprint_2.id, task_relation_blueprint_2.id), format='json', follow=True) - self.assertEqual(response.status_code, 200) - assertDataWithUrls(self, response.data, test_data_2) - - # assert an existing unrelated item is not returned - response = client.get('/task_blueprint/%s/task_relation_blueprint/%s/' % (task_blueprint_2.id, task_relation_blueprint_1.id), format='json', follow=True) - self.assertEqual(response.status_code, 404) - + # TODO: rest API testing should be moved out of this test module. + # def test_GET_TaskTemplate_list_view_shows_entry(self): + # + # # setup + # entry = models.TaskTemplate.objects.create(**self.test_data_1) + # + # # assert + # response = client.get('/task_template/', format='json', follow=True) + # self.assertEqual(response.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response.data['results'][0].items()) + + # TODO: rest API testing should be moved out of this test module. + # def test_GET_TaskTemplate_view_returns_correct_entry(self): + # + # # setup + # id1 = models.TaskTemplate.objects.create(**self.test_data_1).id + # id2 = models.TaskTemplate.objects.create(**self.test_data_2).id + # + # # assert + # response1 = client.get('/task_template/%s/' % id1, format='json', follow=True) + # response2 = client.get('/task_template/%s/' % id2, format='json', follow=True) + # self.assertEqual(response1.status_code, 200) + # self.assertEqual(response2.status_code, 200) + # for item in self.test_data_1.items(): + # self.assertIn(item, response1.data.items()) + # for item in self.test_data_2.items(): + # self.assertIn(item, response2.data.items()) + + + +# class WorkRelationSelectionTemplateTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# # test data +# test_data_1 = {"name": "observation", +# "description": 'My one observation', +# "version": 'v0.314159265359', +# "schema": {"mykey": "my value"}, +# "tags": ["TMSS", "TESTING"]} +# +# test_data_2 = {"name": "observation", +# "description": 'My other observation', +# "version": 'v3.14159265359', +# "schema": {"mykey": "my other value"}, +# "tags": []} +# +# def setUp(self): +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# def tearDown(self): +# client.logout() +# +# def test_WorkRelationSelectionTemplate_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_WorkRelationSelectionTemplate_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_WorkRelationSelectionTemplate_list_view_shows_entry(self): +# +# # setup +# entry = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/work_relation_selection_template/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response.data['results'][0].items()) +# +# def test_GET_WorkRelationSelectionTemplate_view_returns_correct_entry(self): +# +# # setup +# id1 = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_1).id +# id2 = models.WorkRelationSelectionTemplate.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/work_relation_selection_template/%s/' % id1, format='json', follow=True) +# response2 = client.get('/work_relation_selection_template/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response1.data.items()) +# for item in self.test_data_2.items(): +# self.assertIn(item, response2.data.items()) +# +# +# class TaskConnectorsTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# # test data +# self.test_data_1 = {"role": models.Role.objects.get(value='calibrator'), +# "datatype": models.Datatype.objects.get(value='instrument model'), +# "output_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), +# "input_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), +# "tags": []} +# +# self.test_data_2 = {"role": models.Role.objects.get(value='target'), +# "datatype": models.Datatype.objects.get(value='image'), +# "output_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), +# "input_of": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), +# "tags": []} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_TaskConnectors_list_view_shows_entry(self): +# +# # setup +# models.TaskConnectors.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/task_connectors/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_GET_TaskConnectors_view_returns_correct_entry(self): +# +# # setup +# id1 = models.TaskConnectors.objects.create(**self.test_data_1).id +# id2 = models.TaskConnectors.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/task_connectors/%s/' % id1, format='json', follow=True) +# response2 = client.get('/task_connectors/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_POST_TaskConnectors_prevents_missing_input_of(self): +# +# # setup +# test_data_1 = dict(self.test_data_1) +# test_data_1['input_of'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskConnectors.objects.create(**test_data_1) +# +# def test_POST_TaskConnectors_prevents_missing_output_of(self): +# +# # setup +# test_data_1 = dict(self.test_data_1) +# test_data_1['output_of'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskConnectors.objects.create(**test_data_1) +# +# def test_TaskConnectors_allows_setting_dataformats(self): +# # Other then through the API view, we cannot assign ManyToMany on creation, but have to set it later +# +# test_data_1 = dict(self.test_data_1) +# test_data_1['input_of'] = None +# wior = models.TaskConnectors.objects.create(**self.test_data_2) +# wior.dataformats.set([models.Dataformat.objects.get(value='Beamformed'), +# models.Dataformat.objects.get(value='MeasurementSet')]) +# wior.save() +# +# # assert +# response = client.get('/task_connectors/%s/' % wior.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, self.test_data_2) +# +# +# class CycleTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self): +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # test data +# self.test_data_1 = {"name": 'my_cycle' + str(uuid.uuid4()), +# "description": "", +# "tags": [], +# "start": datetime.utcnow().isoformat(), +# "stop": datetime.utcnow().isoformat(), +# "number": 1, +# "standard_hours": 2, +# "expert_hours": 3, +# "filler_hours": 4} +# +# # test data +# self.test_data_2 = {"name": 'my_cycle' + str(uuid.uuid4()), +# "description": "This is my other cycle", +# "tags": ['othercycle'], +# "start": datetime.utcnow().isoformat(), +# "stop": datetime.utcnow().isoformat(), +# "number": 4, +# "standard_hours": 3, +# "expert_hours": 2, +# "filler_hours": 1} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_Cycle_list_view_shows_entry(self): +# +# # setup +# models.Cycle.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/cycle/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response.data['results'][0].items()) +# +# def test_Cycle_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.Cycle.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_Cycle_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.Cycle.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_Cycle_view_returns_correct_entry(self): +# +# # setup +# id1 = models.Cycle.objects.create(**self.test_data_1).name # name is pk +# id2 = models.Cycle.objects.create(**self.test_data_2).name +# +# # assert +# response1 = client.get('/cycle/%s/' % id1, format='json', follow=True) +# response2 = client.get('/cycle/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response1.data.items()) +# for item in self.test_data_2.items(): +# self.assertIn(item, response2.data.items()) +# +# def test_Cycle_constains_list_of_related_projects(self): +# +# self.pt = ProjectTest() +# self.pt.setUp() +# +# # setup +# cycle = models.Cycle.objects.create(**self.test_data_1) +# project1 = models.Project.objects.create(**self.pt.test_data_1) +# project1.cycle = cycle +# project1.save() +# project2 = models.Project.objects.create(**self.pt.test_data_2) +# project2.cycle = cycle +# project2.save() +# +# # assert +# response = client.get('/cycle/%s/' % cycle.name, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['projects'], [project1, project2]) +# +# +# +# class ProjectTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self): +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # test data +# self.test_data_1 = {"name": 'my_project_' + str(uuid.uuid4()), +# "description": "", +# "tags": [], +# "priority": 1, +# "can_trigger": False, +# "private_data": True, +# "expert": True, +# "filler": False} +# +# # test data +# self.test_data_2 = {"name": 'my_project_' + str(uuid.uuid4()), +# "description": "This is my other project", +# "tags": ['othercycle'], +# "priority": 42, +# "can_trigger": True, +# "private_data": False, +# "expert": False, +# "filler": True} +# +# +# def tearDown(self): +# client.logout() +# +# def test_GET_Project_list_view_shows_entry(self): +# +# # setup +# models.Project.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/project/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response.data['results'][0].items()) +# +# def test_Project_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.Project.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_Project_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.Project.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_Project_view_returns_correct_entry(self): +# +# # setup +# id1 = models.Project.objects.create(**self.test_data_1).name # name is pk +# id2 = models.Project.objects.create(**self.test_data_2).name +# +# # assert +# response1 = client.get('/project/%s/' % id1, format='json', follow=True) +# response2 = client.get('/project/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# for item in self.test_data_1.items(): +# self.assertIn(item, response1.data.items()) +# for item in self.test_data_2.items(): +# self.assertIn(item, response2.data.items()) +# +# def test_nested_projects_are_filtered_according_to_cycle(self): +# +# self.ct = CycleTest() +# self.ct.setUp() +# +# # setup +# cycle_1 = models.Cycle.objects.create(**self.ct.test_data_1) +# cycle_2 = models.Cycle.objects.create(**self.ct.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['cycle'] = cycle_1 +# project_1 = models.Project.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['cycle'] = cycle_2 +# project_2 = models.Project.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/cycle/%s/project/' % cycle_2.name, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned, name is pk +# response = client.get('/cycle/%s/project/%s/' % (cycle_2.name, project_2.name) , format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned, name is pk +# response = client.get('/cycle/%s/project/%s/' % (cycle_2.name, project_1.name) , format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# +# +# class SchedulingSetTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self): +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# pt = ProjectTest() +# pt.setUp() +# +# # test data +# self.test_data_1 = {"name": 'my_scheduling_set', +# "description": "", +# "tags": [], +# "generator_doc": "{}", +# "project": models.Project.objects.create(**pt.test_data_1), +# "generator_template": None, +# "generator_source": None} +# +# # test data +# self.test_data_2 = {"name": 'my_other_scheduling_set', +# "description": "This is my other scheduling set", +# "tags": ['othercycle'], +# "generator_doc": "{}", +# "project": models.Project.objects.create(**pt.test_data_2), +# "generator_template": models.GeneratorTemplate.objects.create(**GeneratorTemplateTest.test_data_2), +# "generator_source": None} #models.GeneratorTemplate.objects.create(**sudt.test_data_2)} # todo: circular! +# +# def tearDown(self): +# client.logout() +# +# def test_GET_SchedulingSet_list_view_shows_entry(self): +# +# # setup +# models.SchedulingSet.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/scheduling_set/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_SchedulingSet_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.SchedulingSet.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_SchedulingSet_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.SchedulingSet.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_SchedulingSet_view_returns_correct_entry(self): +# +# # setup +# id1 = models.SchedulingSet.objects.create(**self.test_data_1).id +# id2 = models.SchedulingSet.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/scheduling_set/%s/' % id1, format='json', follow=True) +# response2 = client.get('/scheduling_set/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_SchedulingSet_prevents_missing_project(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['project'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.SchedulingSet.objects.create(**test_data) +# +# def test_SchedulingSet_contains_list_of_related_SchedulingUnitDraft(self): +# +# sudt = SchedulingUnitDraftTest() +# sudt.setUp(populate=True) +# +# # setup +# scheduling_set = models.SchedulingSet.objects.create(**self.test_data_1) +# scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) +# scheduling_unit_draft_1.scheduling_set = scheduling_set +# scheduling_unit_draft_1.save() +# scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) +# scheduling_unit_draft_2.scheduling_set = scheduling_set +# scheduling_unit_draft_2.save() +# +# # assert +# response = client.get('/scheduling_set/%s/' % scheduling_set.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['scheduling_unit_drafts'], [scheduling_unit_draft_1, scheduling_unit_draft_2]) +# +# +# class SchedulingUnitDraftTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# # set up dependencies +# self.sst = SchedulingSetTest() +# self.sst.setUp() +# +# # test data +# self.test_data_1 = {"name": 'my_scheduling_unit_draft', +# "description": "", +# "tags": [], +# "requirements_doc": "{}", +# "copy_reason": models.CopyReason.objects.get(value='template'), +# "generator_instance_doc": "para", +# "copies": None, +# "scheduling_set": models.SchedulingSet.objects.create(**self.sst.test_data_1), +# "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_1)} +# +# # test data +# self.test_data_2 = {"name": 'my_other_scheduling_unit_draft', +# "description": "", +# "tags": [], +# "requirements_doc": "{}", +# "copy_reason": models.CopyReason.objects.get(value='repeated'), # todo: should this require copies to be assigned another SchedulingUnitDraft? Or isn't this implicit, actually? +# "generator_instance_doc": "meter", +# "copies": None, +# "scheduling_set": models.SchedulingSet.objects.create(**self.sst.test_data_2), +# "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_2)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_SchedulingUnitDraft_list_view_shows_entry(self): +# +# # setup +# models.SchedulingUnitDraft.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/scheduling_unit_draft/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_SchedulingUnitDraft_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.SchedulingUnitDraft.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_SchedulingUnitDraft_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.SchedulingUnitDraft.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_SchedulingUnitDraft_view_returns_correct_entry(self): +# +# # setup +# id1 = models.SchedulingUnitDraft.objects.create(**self.test_data_1).id +# id2 = models.SchedulingUnitDraft.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/scheduling_unit_draft/%s/' % id1, format='json', follow=True) +# response2 = client.get('/scheduling_unit_draft/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_SchedulingUnitDraft_prevents_missing_template(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['requirements_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.SchedulingUnitDraft.objects.create(**test_data) +# +# def test_SchedulingUnitDraft_prevents_missing_scheduling_set(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['scheduling_set'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.SchedulingUnitDraft.objects.create(**test_data) +# +# def test_nested_SchedulingUnitDraft_are_filtered_according_to_SchedulingSet(self): +# +# sst = SchedulingSetTest() +# sst.setUp() +# +# # setup +# scheduling_set_1 = models.SchedulingSet.objects.create(**sst.test_data_1) +# scheduling_set_2 = models.SchedulingSet.objects.create(**sst.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['scheduling_set'] = scheduling_set_1 +# scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['scheduling_set'] = scheduling_set_2 +# scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/scheduling_set/%s/scheduling_unit_draft/' % scheduling_set_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/scheduling_set/%s/scheduling_unit_draft/%s/' % (scheduling_set_2.id, scheduling_unit_draft_2.id) , format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/scheduling_set/%s/scheduling_unit_draft/%s/' % (scheduling_set_2.id, scheduling_unit_draft_1.id) , format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# def test_SchedulingUnitDraft_contains_list_of_related_SchedulingUnitBlueprint(self): +# subt = SchedulingUnitBlueprintTest() +# subt.setUp(populate=False) +# +# # setup +# scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**self.test_data_1) +# scheduling_unit_blueprint_1 = models.SchedulingUnitBlueprint.objects.create(**subt.test_data_1) +# scheduling_unit_blueprint_1.draft = scheduling_unit_draft +# scheduling_unit_blueprint_1.save() +# scheduling_unit_blueprint_2 = models.SchedulingUnitBlueprint.objects.create(**subt.test_data_2) +# scheduling_unit_blueprint_2.draft = scheduling_unit_draft +# scheduling_unit_blueprint_2.save() +# +# # assert +# response = client.get('/scheduling_unit_draft/%s/' % scheduling_unit_draft.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['related_scheduling_unit_blueprint'], [scheduling_unit_blueprint_1, scheduling_unit_blueprint_2]) +# +# def test_SchedulingUnitDraft_contains_list_of_related_TaskDraft(self): +# tdt = TaskDraftTest() +# tdt.setUp(populate=False) +# +# # setup +# scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(**self.test_data_1) +# task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) +# task_draft_1.scheduling_unit_draft = scheduling_unit_draft +# task_draft_1.save() +# task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) +# task_draft_2.scheduling_unit_draft = scheduling_unit_draft +# task_draft_2.save() +# +# # assert +# response = client.get('/scheduling_unit_draft/%s/' % scheduling_unit_draft.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['task_drafts'], [task_draft_1, task_draft_2]) +# +# +# class TaskDraftTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # set up dependencies +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# self.sudt = SchedulingUnitDraftTest() +# self.sudt.setUp(populate=False) +# +# # test data +# self.test_data_1 = {"name": 'my_task_draft', +# "description": "", +# "tags": [], +# "specifications_doc": "{}", +# "copy_reason": models.CopyReason.objects.get(value='template'), +# "copies": None, +# "scheduling_unit_draft": models.SchedulingUnitDraft.objects.create(**self.sudt.test_data_1), +# "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1)} +# +# # test data +# self.test_data_2 = {"name": 'my_other_task_draft', +# "description": "", +# "tags": [], +# "specifications_doc": "{}", +# "copy_reason": models.CopyReason.objects.get(value='repeated'), # todo: should this require copies to be assigned another SchedulingUnitDraft? Or isn't this implicit, actually? +# "copies": None, +# "scheduling_unit_draft": models.SchedulingUnitDraft.objects.create(**self.sudt.test_data_2), +# "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_TaskDraft_list_view_shows_entry(self): +# +# # setup +# models.TaskDraft.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/task_draft/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_TaskDraft_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.TaskDraft.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_TaskDraft_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.TaskDraft.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_TaskDraft_view_returns_correct_entry(self): +# +# # setup +# id1 = models.TaskDraft.objects.create(**self.test_data_1).id +# id2 = models.TaskDraft.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/task_draft/%s/' % id1, format='json', follow=True) +# response2 = client.get('/task_draft/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_TaskDraft_prevents_missing_template(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['specifications_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskDraft.objects.create(**test_data) +# +# def test_TaskDraft_prevents_missing_scheduling_unit_draft(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['scheduling_unit_draft'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskDraft.objects.create(**test_data) +# +# def test_nested_TaskDraft_are_filtered_according_to_SchedulingUnitDraft(self): +# sudt = SchedulingUnitDraftTest() +# sudt.setUp(populate=False) +# +# # setup +# scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) +# scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['scheduling_unit_draft'] = scheduling_unit_draft_1 +# task_draft_1 = models.TaskDraft.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['scheduling_unit_draft'] = scheduling_unit_draft_2 +# task_draft_2 = models.TaskDraft.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/scheduling_unit_draft/%s/task_draft/' % scheduling_unit_draft_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/scheduling_unit_draft/%s/task_draft/%s/' % (scheduling_unit_draft_2.id, task_draft_2.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/scheduling_unit_draft/%s/task_draft/%s/' % (scheduling_unit_draft_2.id, task_draft_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# +# def test_TaskDraft_contains_list_of_related_TaskBlueprint(self): +# +# tbt = TaskBlueprintTest() +# tbt.setUp(populate=False) +# +# # setup +# task_draft = models.TaskDraft.objects.create(**self.test_data_1) +# task_blueprint_1 = models.TaskBlueprint.objects.create(**tbt.test_data_1) +# task_blueprint_1.draft = task_draft +# task_blueprint_1.save() +# task_blueprint_2 = models.TaskBlueprint.objects.create(**tbt.test_data_2) +# task_blueprint_2.draft = task_draft +# task_blueprint_2.save() +# +# # assert +# response = client.get('/task_draft/%s/' % task_draft.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['related_task_blueprint'], [task_blueprint_1, task_blueprint_2]) +# +# +# def test_TaskDraft_contains_lists_of_related_TaskRelationDraft(self): +# +# trdt = TaskRelationDraftTest() +# trdt.setUp(populate=False) +# +# # setup +# task_draft = models.TaskDraft.objects.create(**self.test_data_1) +# task_relation_draft_1 = models.TaskRelationDraft.objects.create(**trdt.test_data_1) +# task_relation_draft_1.producer = task_draft +# task_relation_draft_1.save() +# task_relation_draft_2 = models.TaskRelationDraft.objects.create(**trdt.test_data_2) +# task_relation_draft_2.consumer = task_draft +# task_relation_draft_2.save() +# +# # assert +# response = client.get('/task_draft/%s/' % task_draft.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['produced_by'], [task_relation_draft_1]) +# assertUrlList(self, response.data['consumed_by'], [task_relation_draft_2]) +# +# +# class TaskRelationDraftTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # set up dependencies +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# self.wrdt = TaskDraftTest() +# self.wrdt.setUp(populate=False) +# +# self.wiort = TaskConnectorsTest() +# self.wiort.setUp(populate=False) +# +# # test data +# self.test_data_1 = {"tags": [], +# "selection_doc": "{}", +# "dataformat": models.Dataformat.objects.get(value='Beamformed'), +# "producer": models.TaskDraft.objects.create(**self.wrdt.test_data_1), +# "consumer": models.TaskDraft.objects.create(**self.wrdt.test_data_2), +# "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), +# "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), +# "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_1)} +# +# # test data +# self.test_data_2 = {"tags": [], +# "selection_doc": "{}", +# "dataformat": models.Dataformat.objects.get(value='MeasurementSet'), +# "producer": models.TaskDraft.objects.create(**self.wrdt.test_data_2), +# "consumer": models.TaskDraft.objects.create(**self.wrdt.test_data_1), +# "input": models.TaskConnectors.objects.create(**self.wiort.test_data_2), +# "output": models.TaskConnectors.objects.create(**self.wiort.test_data_1), +# "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_2)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_TaskRelationDraft_list_view_shows_entry(self): +# +# # setup +# models.TaskRelationDraft.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/task_relation_draft/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_TaskRelationDraft_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.TaskRelationDraft.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_TaskRelationDraft_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.TaskRelationDraft.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_TaskRelationDraft_view_returns_correct_entry(self): +# +# # setup +# id1 = models.TaskRelationDraft.objects.create(**self.test_data_1).id +# id2 = models.TaskRelationDraft.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/task_relation_draft/%s/' % id1, format='json', follow=True) +# response2 = client.get('/task_relation_draft/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_TaskRelationDraft_prevents_missing_template(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['selection_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationDraft.objects.create(**test_data) +# +# def test_TaskRelationDraft_prevents_missing_consumer(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['consumer'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationDraft.objects.create(**test_data) +# +# def test_TaskRelationDraft_prevents_missing_producer(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['producer'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationDraft.objects.create(**test_data) +# +# def test_nested_TaskRelationDraft_are_filtered_according_to_TaskDraft(self): +# tdt = TaskDraftTest() +# tdt.setUp(populate=False) +# +# # setup +# task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) +# task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['producer'] = task_draft_1 +# task_relation_draft_1 = models.TaskRelationDraft.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['consumer'] = task_draft_2 +# task_relation_draft_2 = models.TaskRelationDraft.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/task_draft/%s/task_relation_draft/' % task_draft_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related producer is returned +# response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_1.id, task_relation_draft_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_1) +# +# # assert an existing related consumer is returned +# response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_2.id, task_relation_draft_2.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/task_draft/%s/task_relation_draft/%s/' % (task_draft_2.id, task_relation_draft_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# class SchedulingUnitBlueprintTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # set up dependencies +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# self.rdt = SchedulingUnitDraftTest() +# self.rdt.setUp(populate=False) +# +# # test data +# self.test_data_1 = {"name": 'my_scheduling_unit_blueprint', +# "description": "", +# "tags": [], +# "requirements_doc": "{}", +# "do_cancel": False, +# "draft": models.SchedulingUnitDraft.objects.create(**self.rdt.test_data_1), +# "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_1)} +# +# # test data +# self.test_data_2 = {"name": 'my_other_scheduling_unit_blueprint', +# "description": "", +# "tags": [], +# "requirements_doc": "{}", +# "do_cancel": True, +# "draft": models.SchedulingUnitDraft.objects.create(**self.rdt.test_data_2), +# "requirements_template": models.SchedulingUnitTemplate.objects.create(**SchedulingUnitTemplateTest.test_data_2)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_SchedulingUnitBlueprint_list_view_shows_entry(self): +# +# # setup +# models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/scheduling_unit_blueprint/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_SchedulingUnitBlueprint_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_SchedulingUnitBlueprint_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_SchedulingUnitBlueprint_view_returns_correct_entry(self): +# +# # setup +# id1 = models.SchedulingUnitBlueprint.objects.create(**self.test_data_1).id +# id2 = models.SchedulingUnitBlueprint.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/scheduling_unit_blueprint/%s/' % id1, format='json', follow=True) +# response2 = client.get('/scheduling_unit_blueprint/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_SchedulingUnitBlueprint_prevents_missing_template(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['requirements_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.SchedulingUnitBlueprint.objects.create(**test_data) +# +# def test_SchedulingUnitBlueprint_prevents_missing_draft(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['draft'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.SchedulingUnitBlueprint.objects.create(**test_data) +# +# def test_nested_SchedulingUnitBlueprint_are_filtered_according_to_SchedulingUnitDraft(self): +# +# sudt = SchedulingUnitDraftTest() +# sudt.setUp(populate=False) +# +# # setup +# scheduling_unit_draft_1 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_1) +# scheduling_unit_draft_2 = models.SchedulingUnitDraft.objects.create(**sudt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['draft'] = scheduling_unit_draft_1 +# scheduling_unit_blueprint_1 = models.SchedulingUnitBlueprint.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['draft'] = scheduling_unit_draft_2 +# scheduling_unit_blueprint_2 = models.SchedulingUnitBlueprint.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/' % scheduling_unit_draft_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/%s/' % (scheduling_unit_draft_2.id, scheduling_unit_blueprint_2.id) , format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/scheduling_unit_draft/%s/scheduling_unit_blueprint/%s/' % (scheduling_unit_draft_2.id, scheduling_unit_blueprint_1.id) , format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# +# +# class TaskBlueprintTest(rest_framework.test.APITransactionTestCase): +# +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # set up dependencies +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# self.wrdt = TaskDraftTest() +# self.wrdt.setUp(populate=False) +# +# self.rbt = SchedulingUnitBlueprintTest() +# self.rbt.setUp(populate=False) +# +# # test data +# self.test_data_1 = {"name": 'my_task_blueprint', +# "description": "", +# "tags": [], +# "specifications_doc": "{}", +# "do_cancel": False, +# "draft": models.TaskDraft.objects.create(**self.wrdt.test_data_1), +# "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_1), +# "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**self.rbt.test_data_1)} +# +# # test data +# self.test_data_2 = {"name": 'my_other_task_blueprin', # <- Missing a 't'? Well, varchar(30) is shorter than you'd think... +# "description": "", +# "tags": [], +# "specifications_doc": "{}", +# "do_cancel": True, +# "draft": models.TaskDraft.objects.create(**self.wrdt.test_data_2), +# "specifications_template": models.TaskTemplate.objects.create(**TaskTemplateTest.test_data_2), +# "scheduling_unit_blueprint": models.SchedulingUnitBlueprint.objects.create(**self.rbt.test_data_2)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_TaskBlueprint_list_view_shows_entry(self): +# +# # setup +# models.TaskBlueprint.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/task_blueprint/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_TaskBlueprint_gets_created_with_correct_creation_timestamp(self): +# +# # setup +# before = datetime.utcnow() +# entry = models.TaskBlueprint.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_TaskBlueprint_update_timestamp_gets_changed_correctly(self): +# +# # setup +# entry = models.TaskBlueprint.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_TaskBlueprint_view_returns_correct_entry(self): +# +# # setup +# id1 = models.TaskBlueprint.objects.create(**self.test_data_1).id +# id2 = models.TaskBlueprint.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/task_blueprint/%s/' % id1, format='json', follow=True) +# response2 = client.get('/task_blueprint/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_TaskBlueprint_prevents_missing_template(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['specifications_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskBlueprint.objects.create(**test_data) +# +# def test_TaskBlueprint_prevents_missing_draft(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['draft'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskBlueprint.objects.create(**test_data) +# +# def test_TaskBlueprint_prevents_missing_scheduling_unit_blueprint(self): +# +# # setup +# test_data = dict(self.test_data_1) +# test_data['scheduling_unit_blueprint'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskBlueprint.objects.create(**test_data) +# +# def test_nested_TaskBlueprint_are_filtered_according_to_TaskDraft(self): +# tdt = TaskDraftTest() +# tdt.setUp(populate=False) +# +# # setup +# task_draft_1 = models.TaskDraft.objects.create(**tdt.test_data_1) +# task_draft_2 = models.TaskDraft.objects.create(**tdt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['draft'] = task_draft_1 +# task_blueprint_1 = models.TaskBlueprint.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['draft'] = task_draft_2 +# task_blueprint_2 = models.TaskBlueprint.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/task_draft/%s/task_blueprint/' % task_draft_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/task_draft/%s/task_blueprint/%s/' % (task_draft_2.id, task_blueprint_2.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/task_draft/%s/task_blueprint/%s/' % (task_draft_2.id, task_blueprint_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# def test_TaskBlueprint_contains_list_of_related_Subtask(self): +# +# from t_tmssapp_scheduling_django import SubtaskTest # Note: cannot do this on module level due to circular import +# st = SubtaskTest() +# st.setUp(populate=False) +# +# # setup +# task_blueprint = models.TaskBlueprint.objects.create(**self.test_data_1) +# subtask_1 = models.Subtask.objects.create(**st.test_data_1) +# subtask_1.task_blueprint = task_blueprint +# subtask_1.save() +# subtask_2 = models.Subtask.objects.create(**st.test_data_2) +# subtask_2.task_blueprint = task_blueprint +# subtask_2.save() +# +# # assert +# response = client.get('/task_blueprint/%s/' % task_blueprint.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['subtasks'], [subtask_1, subtask_2]) +# +# def test_TaskBlueprint_contains_lists_of_related_TaskRelationBlueprint(self): +# +# trbt = TaskRelationBlueprintTest() +# trbt.setUp(populate=False) +# +# # setup +# task_blueprint = models.TaskBlueprint.objects.create(**self.test_data_1) +# task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**trbt.test_data_1) +# task_relation_blueprint_1.producer = task_blueprint +# task_relation_blueprint_1.save() +# task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**trbt.test_data_2) +# task_relation_blueprint_2.consumer = task_blueprint +# task_relation_blueprint_2.save() +# +# # assert +# response = client.get('/task_blueprint/%s/' % task_blueprint.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertUrlList(self, response.data['produced_by'], [task_relation_blueprint_1]) +# assertUrlList(self, response.data['consumed_by'], [task_relation_blueprint_2]) +# +# +# class TaskRelationBlueprintTest(rest_framework.test.APITransactionTestCase): +# reset_sequences = True +# +# def setUp(self, populate=True): +# +# user, _ = User.objects.get_or_create(username='paulus', email='paulus@boskabouter.com') +# client.force_login(user) +# +# # set up dependencies +# if populate: +# populate_choices(None, None) # <- can only be called once per test case +# +# self.wrrdt = TaskRelationDraftTest() +# self.wrrdt.setUp(populate=False) +# +# self.wrbt = TaskBlueprintTest() +# self.wrbt.setUp(populate=False) +# +# self.wiort = TaskConnectorsTest() +# self.wiort.setUp(populate=False) +# +# # test data +# self.test_data_1 = {"tags": [], +# "selection_doc": "{}", +# "dataformat": models.Dataformat.objects.get(value='Beamformed'), +# "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), +# "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), +# "draft": models.TaskRelationDraft.objects.create(**self.wrrdt.test_data_1), +# "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_1), +# "producer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_1), +# "consumer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_2)} +# +# # test data +# self.test_data_2 = {"tags": [], +# "selection_doc": "{}", +# "dataformat": models.Dataformat.objects.get(value='Beamformed'), +# "input": models.TaskConnectors.objects.create(**self.wiort.test_data_1), +# "output": models.TaskConnectors.objects.create(**self.wiort.test_data_2), +# "draft": models.TaskRelationDraft.objects.create(**self.wrrdt.test_data_2), +# "selection_template": models.WorkRelationSelectionTemplate.objects.create(**WorkRelationSelectionTemplateTest.test_data_2), +# "producer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_2), +# "consumer": models.TaskBlueprint.objects.create(**self.wrbt.test_data_1)} +# +# def tearDown(self): +# client.logout() +# +# def test_GET_TaskRelationBlueprint_list_view_shows_entry(self): +# # setup +# models.TaskRelationBlueprint.objects.create(**self.test_data_1) +# +# # assert +# response = client.get('/task_relation_blueprint/', format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# +# assertDataWithUrls(self, response.data['results'][0], self.test_data_1) +# +# def test_TaskRelationBlueprint_gets_created_with_correct_creation_timestamp(self): +# # setup +# before = datetime.utcnow() +# entry = models.TaskRelationBlueprint.objects.create(**self.test_data_1) +# +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.created_at) +# self.assertGreater(after, entry.created_at) +# +# def test_TaskRelationBlueprint_update_timestamp_gets_changed_correctly(self): +# # setup +# entry = models.TaskRelationBlueprint.objects.create(**self.test_data_1) +# before = datetime.utcnow() +# entry.save() +# after = datetime.utcnow() +# +# # assert +# self.assertLess(before, entry.updated_at) +# self.assertGreater(after, entry.updated_at) +# +# def test_GET_TaskRelationBlueprint_view_returns_correct_entry(self): +# # setup +# id1 = models.TaskRelationBlueprint.objects.create(**self.test_data_1).id +# id2 = models.TaskRelationBlueprint.objects.create(**self.test_data_2).id +# +# # assert +# response1 = client.get('/task_relation_blueprint/%s/' % id1, format='json', follow=True) +# response2 = client.get('/task_relation_blueprint/%s/' % id2, format='json', follow=True) +# self.assertEqual(response1.status_code, 200) +# self.assertEqual(response2.status_code, 200) +# assertDataWithUrls(self, response1.data, self.test_data_1) +# assertDataWithUrls(self, response2.data, self.test_data_2) +# +# def test_TaskRelationBlueprint_prevents_missing_selection_template(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['selection_template'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_TaskRelationBlueprint_prevents_missing_draft(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['draft'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_TaskRelationBlueprint_prevents_missing_producer(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['producer'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_TaskRelationBlueprint_prevents_missing_consumer(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['consumer'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_TaskRelationBlueprint_prevents_missing_input(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['input'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_TaskRelationBlueprint_prevents_missing_output(self): +# # setup +# test_data = dict(self.test_data_1) +# test_data['output'] = None +# +# # assert +# with self.assertRaises(IntegrityError): +# models.TaskRelationBlueprint.objects.create(**test_data) +# +# def test_nested_TaskRelationBlueprint_are_filtered_according_to_TaskRelationDraft(self): +# trdt = TaskRelationDraftTest() +# trdt.setUp(populate=False) +# +# # setup +# task_relation_draft_1 = models.TaskRelationDraft.objects.create(**trdt.test_data_1) +# task_relation_draft_2 = models.TaskRelationDraft.objects.create(**trdt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['draft'] = task_relation_draft_1 +# task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['draft'] = task_relation_draft_2 +# task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**test_data_2) +# +# # assert the returned list contains related items +# response = client.get('/task_relation_draft/%s/task_relation_blueprint/' % task_relation_draft_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/task_relation_draft/%s/task_relation_blueprint/%s/' % (task_relation_draft_2.id, task_relation_blueprint_2.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/task_relation_draft/%s/task_relation_blueprint/%s/' % (task_relation_draft_2.id, task_relation_blueprint_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# +# def test_nested_TaskRelationBlueprint_are_filtered_according_to_TaskBlueprint(self): +# tbt = TaskBlueprintTest() +# tbt.setUp(populate=False) +# +# # setup +# task_blueprint_1 = models.TaskBlueprint.objects.create(**tbt.test_data_1) +# task_blueprint_2 = models.TaskBlueprint.objects.create(**tbt.test_data_2) +# test_data_1 = dict(self.test_data_1) +# test_data_1['producer'] = task_blueprint_1 +# task_relation_blueprint_1 = models.TaskRelationBlueprint.objects.create(**test_data_1) +# test_data_2 = dict(self.test_data_2) +# test_data_2['consumer'] = task_blueprint_2 +# task_relation_blueprint_2 = models.TaskRelationBlueprint.objects.create(**test_data_2) +# +# # assert the returned list contains related producer +# response = client.get('/task_blueprint/%s/task_relation_blueprint/' % task_blueprint_1.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_1) +# +# # assert the returned list contains related consumer +# response = client.get('/task_blueprint/%s/task_relation_blueprint/' % task_blueprint_2.id, format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# self.assertEqual(len(response.data['results']), 1) +# assertDataWithUrls(self, response.data['results'][0], test_data_2) +# +# # assert an existing related item is returned +# response = client.get('/task_blueprint/%s/task_relation_blueprint/%s/' % (task_blueprint_2.id, task_relation_blueprint_2.id), format='json', follow=True) +# self.assertEqual(response.status_code, 200) +# assertDataWithUrls(self, response.data, test_data_2) +# +# # assert an existing unrelated item is not returned +# response = client.get('/task_blueprint/%s/task_relation_blueprint/%s/' % (task_blueprint_2.id, task_relation_blueprint_1.id), format='json', follow=True) +# self.assertEqual(response.status_code, 404) +# diff --git a/SAS/TMSS/test/t_tmssapp_specification_django.run b/SAS/TMSS/test/t_tmssapp_specification_django.run index 5eb858bd2f84271921204c722c25166f3f7187e9..ebb019daef4a56a1b336a787f3afd8ae55f756cc 100755 --- a/SAS/TMSS/test/t_tmssapp_specification_django.run +++ b/SAS/TMSS/test/t_tmssapp_specification_django.run @@ -1,8 +1,6 @@ #!/bin/bash -. test_funcs.sh - -setup -run_test "$LOFARROOT/lib*/python*/site-packages/lofar/sas/tmss/manage.py test --pattern=t_tmssapp_specification_django.py --testrunner=postgres_testrunner.PostgresqlTestRunner" - +# Run the unit test +source python-coverage.sh +python_coverage_test "*tmss*" t_tmssapp_specification_django.py