diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py index 9a8a714739b1ce7e9b5c7677311a6d3227ad5823..126a7059d4b3192ba03f680030f923536fd9e8a6 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py @@ -137,8 +137,7 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): sql_basepath + "/add_resource_allocation_statics.sql", sql_basepath + "/add_virtual_instrument.sql", sql_basepath + "/add_notifications.sql", - sql_basepath + "/add_functions_and_triggers.sql" - ] + sql_basepath + "/add_functions_and_triggers.sql"] for sql_path in sql_createdb_paths: with open(sql_path) as sql: @@ -650,8 +649,8 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): 'starttime': parser.parse(sample_starttime), 'endtime': parser.parse(sample_endtime), 'cluster': 'CEP4', - 'status': 'conflict', - 'status_id': 335, + 'status': 'approved', + 'status_id': 300, 'type': 'observation', 'type_id': 0, 'mom_id': 0, @@ -2005,8 +2004,13 @@ class ResourceAssignmentDatabaseTest(unittest.TestCase): 'claim_size': 96 } claim2_id = self.radb.insertResourceClaims(task2_id, [claim2], 'foo', 1, 1)[0] - self.radb.updateResourceClaims(claim2_id, status='claimed') + # task1 is partially in the way, so claim2 and task2 should have conflict status + self.assertEqual('conflict', self.radb.getResourceClaim(claim2_id)['status']) + self.assertEqual('conflict', self.radb.getTask(task2_id)['status']) + # updating claim2's status to claimed should not succeed + self.radb.updateResourceClaims(claim2_id, status='claimed') + self.assertEqual('conflict', self.radb.getResourceClaim(claim2_id)['status']) self.assertEqual('conflict', self.radb.getTask(task2_id)['status']) def test_double_claim_should_result_in_conflict_overlap_in_the_past_and_future(self): @@ -2169,6 +2173,441 @@ class 
ResourceAssignmentDatabaseTest(unittest.TestCase):
 
         self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
 
+    def test_dwellscheduler_high_low_priority_scenario(self):
+        """special test case to prove and solve bug: https://support.astron.nl/jira/browse/SW-426
+        """
+        # start with clean database
+        for spec in self.radb.getSpecifications():
+            self.radb.deleteSpecification(spec['id']) # cascades into tasks and claims
+
+        ######################################################################################
+        # setup phase, create tasks and claims. should just work.
+        # we replay a responsive telescope trigger event, as performed by the dwellscheduler.
+        # We have two tasks, one with high prio, and one with low.
+        # the high prio task will have a conflict with the low one.
+        ######################################################################################
+
+        base_time = datetime.utcnow()
+        # round to current full hour (for readability in logging)
+        base_time = base_time - timedelta(minutes=base_time.minute, seconds=base_time.second, microseconds=base_time.microsecond)
+
+        RESOURCE_ID = 252
+        resource_max_cap = self.radb.get_resource_claimable_capacity(RESOURCE_ID, base_time, base_time)
+
+        # insert the 'low prio' spec, task...
+        spec_task_low = self.radb.insertSpecificationAndTask(1, 1, 'prescheduled', 'observation',
+                                                             base_time + timedelta(minutes=5),
+                                                             base_time + timedelta(minutes=10), 'foo', 'CEP4')
+        task_low_id = spec_task_low['task_id']
+        task_low = self.radb.getTask(task_low_id)
+
+
+        # the dwellscheduler inserts the claim(s)...
+        self.radb.insertResourceClaims(task_low_id, [{ 'resource_id': RESOURCE_ID,
+                                                       'starttime': task_low['starttime'],
+                                                       'endtime': task_low['endtime'],
+                                                       'status': 'tentative',
+                                                       'claim_size': resource_max_cap }],
+                                       'user', 1)
+
+        # ... and then the dwellscheduler sets the claims status to claimed...
+        self.radb.updateResourceClaims(where_task_ids=[task_low_id], status="claimed")
+
+        logger.info("task_low's claims: %s", self.radb.getResourceClaims(task_ids=task_low_id))
+
+        self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_low_id)))
+        self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_low_id, status='claimed')))
+
+        # ... and updates the spec's start and endtime to the already specified start and endtime
+        # (why? not needed, but should not do any harm either)
+        self.radb.updateSpecification(task_low['specification_id'],
+                                      starttime=task_low['starttime'],
+                                      endtime=task_low['endtime'])
+
+        # finally make the task scheduled. Should still work.
+        self.radb.updateTask(task_low_id, task_status='scheduled')
+
+        # so far so good. Everything should be normal and fine. Let's check.
+        self.assertEqual('scheduled', self.radb.getTask(id=task_low_id)['status'])
+        self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_low_id, status='claimed')))
+
+        # now insert a second task, the so called high priority task,
+        # overlapping with the beginning of task_low
+        # so, the dwellscheduler finds task_low in task_high's way
+        # so, task_low is aborted by the dwellscheduler (later in the code).
+        spec_task_high1 = self.radb.insertSpecificationAndTask(2, 2, 'approved', 'observation',
+                                                               base_time,
+                                                               base_time + timedelta(minutes=7), 'foo', 'CEP4')
+        task_high1_id = spec_task_high1['task_id']
+        task_high1 = self.radb.getTask(task_high1_id)
+
+        # the dwellscheduler inserts the claim(s)...
+        self.radb.insertResourceClaims(task_high1_id, [{ 'resource_id': RESOURCE_ID,
+                                                         'starttime': task_high1['starttime'],
+                                                         'endtime': task_high1['endtime'],
+                                                         'status': 'tentative',
+                                                         'claim_size': resource_max_cap }],
+                                       'user', 1)
+
+        logger.info("task_high1's claims: %s", self.radb.getResourceClaims(task_ids=task_high1_id))
+
+        # we expect task_high1 to have one claim in conflict (with the claim of task_low)
+        self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_high1_id)))
+        self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_high1_id, status='claimed')))
+        self.assertEqual(1, len(self.radb.getResourceClaims(task_ids=task_high1_id, status='conflict')))
+
+        claim_in_conflict = self.radb.getResourceClaims(task_ids=task_high1_id, status='conflict')[0]
+        overlapping_claims = self.radb.get_overlapping_claims(claim_id=claim_in_conflict['id'])
+        logger.info('claim_in_conflict: %s', claim_in_conflict)
+        logger.info('overlapping_claims: %s', overlapping_claims)
+        self.assertEqual(1, len(overlapping_claims))
+        self.assertEqual(task_low_id, overlapping_claims[0]['task_id'])
+
+        ########################################################################
+        # end of setup phase, now let's (try to) reproduce the bug...
+        # the dwellscheduler tries to abort task_low, to make room for task_high
+        # this caused an erroneous database exception on the production system
+        # but strangely enough we cannot repeat it here,
+        # even though we follow the same code path.
+        #
+        # This leads us to the conclusion that there was a strange set of
+        # circumstances in the data in the resource_usage table causing the bug in production.
+        #
+        # While examining the bug we did discover some errors in the sql code,
+        # for which we added additional tests:
+        #  - test_task_releases_claims_when_set_to_approved
+        #  - test_task_in_conflict_releases_claimed_claims
+        #  - test_duplicate_full_claims_on_one_resource
+        #  - test_task_and_claim_with_zero_duration
+        #  - test_are_claims_in_conflict_released_by_removing_conflict_causing_claims
+        #
+        # Even though this test could not reproduce the error as it happened on production,
+        # we'll keep it for future reference, and as future proof that the code still works.
+        #
+        ########################################################################
+
+        with mock.patch('lofar.sas.resourceassignment.database.radb.logger') as mocked_logger:
+            self.radb.updateTaskAndResourceClaims(task_id=task_low_id, task_status='aborted',
+                                                  endtime=task_low['starttime']) # yes, the endtime is set to the starttime
+
+            # on production the above call produced the following log line:
+            # 2018-06-29 09:46:16,240 ERROR Rolling back query='UPDATE resource_allocation.resource_claim SET (endtime) = (2018-06-29 11:59:17) WHERE task_id = 148052' due to error: 'duplicate key value violates unique constraint "usage_unique"
+            # but unfortunately this error is not reproduced here,
+            # the only thing we can test for is if a rollback occurs
+
+            # test if there was a log line containing the database log message for 'claim starttime >= endtime'
+            self.assertTrue(any('Rolling back' in ca[0][0]
+                                and 'claim starttime >= endtime' in ca[0][0]
+                                for ca in mocked_logger.error.call_args_list))
+
+    def test_task_releases_claims_when_set_to_approved(self):
+        """test whether the claimed claims of a task are released (put back to tentative) when the task is set back to approved."""
+        now = datetime.utcnow()
+        now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour
+
+        result = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation',
+                                                      now, now+timedelta(hours=1), 'foo', 'CEP4')
+        self.assertTrue(result['inserted'])
+        self.assertIsNotNone(result['task_id'])
+        task_id = result['task_id']
+        task = self.radb.getTask(task_id)
+        self.assertEqual('approved', task['status'])
+
+        # select first (arbitrary) resource
+        resource = self.radb.getResources(include_availability=True)[0]
+
+        self.radb.insertResourceClaim(resource['id'], task_id, task['starttime'], task['endtime'],
+                                      0.5*resource['available_capacity'], 'foo', 1)
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        self.assertEqual(1, len(tentative_claims))
+
+        # set status to claimed
+        self.radb.updateResourceClaims(where_task_ids=task_id, status='claimed')
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        self.assertEqual(0, len(tentative_claims))
+        self.assertEqual(1, len(claimed_claims))
+
+        # when setting the task to prescheduled and back to approved, all claimed claims should be released
+        self.radb.updateTask(task_id=task_id, task_status='prescheduled')
+        self.radb.updateTask(task_id=task_id, task_status='approved')
+        task = self.radb.getTask(task_id)
+        self.assertEqual('approved', task['status'])
+
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        self.assertEqual(1, len(tentative_claims))
+        self.assertEqual(0, len(claimed_claims))
+
+
+    def test_task_in_conflict_releases_claimed_claims(self):
+        """tests whether a task with multiple claims releases the claimed claims when the task goes to conflict.
+        This is wanted behaviour, because when a single claim goes to conflict, then the task cannot be scheduled.
+        So, it makes sense to release the other already claimed claims for other tasks.
+        """
+        now = datetime.utcnow()
+        now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour
+
+        result = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation',
+                                                      now, now+timedelta(hours=1), 'foo', 'CEP4')
+        self.assertTrue(result['inserted'])
+        self.assertIsNotNone(result['task_id'])
+        task_id = result['task_id']
+        task = self.radb.getTask(task_id)
+        self.assertEqual('approved', task['status'])
+
+        # select first two (arbitrary) resources
+        resources = self.radb.getResources(include_availability=True)
+        resource1 = resources[0]
+        resource2 = resources[1]
+
+        # and insert a claim for each resource.
+        # one claim should fit and be set to claimed...
+        self.radb.insertResourceClaim(resource1['id'], task_id, task['starttime'], task['endtime'],
+                                      0.5*resource1['available_capacity'], 'foo', 1)
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        self.assertEqual(1, len(tentative_claims))
+
+        # set status to claimed
+        self.radb.updateResourceClaims(where_task_ids=task_id, status='claimed')
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        self.assertEqual(0, len(tentative_claims))
+        self.assertEqual(0, len(conflict_claims))
+        self.assertEqual(1, len(claimed_claims))
+
+        # the other claim should not fit and cause a conflict...
+        self.radb.insertResourceClaim(resource2['id'], task_id, task['starttime'], task['endtime'],
+                                      2.0*resource2['available_capacity'], 'foo', 1)
+
+        # ... result should be that the task also goes to conflict ...
+        task = self.radb.getTask(task_id)
+        self.assertEqual('conflict', task['status'])
+
+        # ... and that all the task's claimed claims should be released
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        self.assertEqual(1, len(tentative_claims))
+        self.assertEqual(1, len(conflict_claims))
+        self.assertEqual(0, len(claimed_claims))
+        conflict_claim = conflict_claims[0]
+
+        # a user/operator action could be to set the task back to approved
+        # all claimed claims which were already set back to tentative should still be tentative
+        # and claims in conflict should remain in conflict
+        self.radb.updateTask(task_id=task_id, task_status='approved')
+        task = self.radb.getTask(task_id)
+        self.assertEqual('approved', task['status'])
+
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        self.assertEqual(1, len(tentative_claims))
+        self.assertEqual(1, len(conflict_claims))
+        self.assertEqual(0, len(claimed_claims))
+        self.assertEqual(conflict_claim['id'], conflict_claims[0]['id'])
+
+    def test_duplicate_full_claims_on_one_resource(self):
+        """special test case to prove and solve bug: https://support.astron.nl/jira/browse/SW-426
+        We found out that inserting two duplicate claims for one resource does not result in the two claims
+        having the conflict status, even though at least one of them should have it.
+        """
+        # start with clean database
+        for spec in self.radb.getSpecifications():
+            self.radb.deleteSpecification(spec['id'])  # cascades into tasks and claims
+
+        now = datetime.utcnow()
+        # round to next full hour (for readability in logging)
+        now = now - timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond)
+        now = now + timedelta(hours=1)
+
+        spec_task = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation',
+                                                         now, now + timedelta(minutes=10),
+                                                         'foo', 'CEP4')
+
+        task_id = spec_task['task_id']
+        task = self.radb.getTask(task_id)
+
+        RESOURCE_ID = 252
+        resource_max_cap = self.radb.get_resource_claimable_capacity(RESOURCE_ID, now, now)
+
+        # create one claim, with claim_size of max capacity
+        claim = {'resource_id': RESOURCE_ID,
+                 'starttime': task['starttime'],
+                 'endtime': task['endtime'],
+                 'status': 'tentative',
+                 'claim_size': resource_max_cap}
+
+        # insert the same claim twice, so two times the maxcap should not fit in total,
+        # but should fit if only one is claimed
+        self.radb.insertResourceClaims(task_id, [claim, claim], 'user', 1)
+
+        # get the claims from the db, and check if there are 2, and check their status.
+        # Both should have tentative status, and not conflict status,
+        # because we did not claim anything yet.
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        self.assertEqual(2, len(tentative_claims))
+        self.assertEqual(0, len(conflict_claims))
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+
+        # let's try to claim them both in one call.
+        self.radb.updateResourceClaims(where_task_ids=[task_id], status='claimed')
+
+        # Get the claims again from the db, and check if there are 2
+        # one was successfully claimed, but put back to tentative,
+        # because for the other there was no room, so it should be in conflict.
+        # As a result of the claim in conflict, the task is in conflict as well.
+        # And as a result of the task in conflict, all claimed claims are released and put back to tentative.
+        # And because the claimed claim was put back to tentative, this frees up room for the claim in conflict,
+        # which should not be in conflict anymore, but also tentative.
+        # (Yes, this is quite confusing, but correct.)
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        self.assertEqual(2, len(tentative_claims))
+        self.assertEqual(0, len(claimed_claims))
+        self.assertEqual(0, len(conflict_claims))
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+
+        # let's try to claim only one.
+        # One should fit, but as a result the other won't fit anymore and will go to conflict
+        # which causes the task to go to conflict, which causes the claimed claim to be released,
+        # which frees up space to the other which will be put to tentative after being in conflict.
+        # (Yes, this is also quite confusing, but correct.)
+        self.radb.updateResourceClaim(tentative_claims[0]['id'], status='claimed')
+        tentative_claims = self.radb.getResourceClaims(task_ids=task_id, status='tentative')
+        claimed_claims = self.radb.getResourceClaims(task_ids=task_id, status='claimed')
+        conflict_claims = self.radb.getResourceClaims(task_ids=task_id, status='conflict')
+        self.assertEqual(2, len(tentative_claims))
+        self.assertEqual(0, len(claimed_claims))
+        self.assertEqual(0, len(conflict_claims))
+        self.assertEqual('approved', self.radb.getTask(task_id)['status'])
+
+
+    def test_task_and_claim_with_zero_duration(self):
+        """claims which claim a resource and release it at the same moment are not allowed (it's a paradox).
+        """
+        # start with clean database
+        for spec in self.radb.getSpecifications():
+            self.radb.deleteSpecification(spec['id'])  # cascades into tasks and claims
+
+        now = datetime.utcnow()
+
+        spec_task = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation',
+                                                         now, now,  # tasks can have zero duration
+                                                         'foo', 'CEP4')
+
+        task_id = spec_task['task_id']
+        task = self.radb.getTask(task_id)
+        self.assertIsNotNone(task)
+        self.assertEqual(now, task['starttime'])
+        self.assertEqual(now, task['endtime'])
+
+        with mock.patch('lofar.sas.resourceassignment.database.radb.logger') as mocked_logger:
+            RESOURCE_ID = 252
+            inserted_claim_id = self.radb.insertResourceClaim(RESOURCE_ID, task_id,
+                                                              now, now,  # claims cannot have zero duration, test that!
+                                                              1, 'foo', 1)
+            self.assertIsNone(inserted_claim_id)
+            mocked_logger.error.assert_any_call('One or more claims could not be inserted. Rolling back.')
+            # test if there was a log line containing the database log message for 'claim starttime >= endtime'
+            self.assertTrue(
+                any('claim starttime >= endtime' in ca[0][0] for ca in mocked_logger.error.call_args_list))
+
+        with mock.patch('lofar.sas.resourceassignment.database.radb.logger') as mocked_logger:
+            # try again, with multi-claim insert
+            inserted_claim_ids = self.radb.insertResourceClaims(task_id, [{'resource_id': RESOURCE_ID,
+                                                                           'starttime': now,
+                                                                           'endtime': now,
+                                                                           'status': 'tentative',
+                                                                           'claim_size': 1}],
+                                                                'foo', 1)
+            self.assertEqual([], inserted_claim_ids)
+            # the multi-claim insert should log the same rollback message as the single-claim insert
+            mocked_logger.error.assert_any_call('One or more claims could not be inserted. Rolling back.')
+            # test if there was a log line containing the database log message for 'claim starttime >= endtime'
+            self.assertTrue(
+                any('claim starttime >= endtime' in ca[0][0] for ca in mocked_logger.error.call_args_list))
+
+    def test_are_claims_in_conflict_released_by_removing_conflict_causing_claims(self):
+        """test whether a claim which is in conflict is put automatically to tentative when the conflict-causing claim is released.
+        """
+        # start with clean database
+        for spec in self.radb.getSpecifications():
+            self.radb.deleteSpecification(spec['id'])  # cascades into tasks and claims
+
+        base_time = datetime.utcnow()
+        # round to current full hour (for readability in logging)
+        base_time = base_time - timedelta(minutes=base_time.minute, seconds=base_time.second,
+                                          microseconds=base_time.microsecond)
+
+        RESOURCE_ID = 252
+        resource_max_cap = self.radb.get_resource_claimable_capacity(RESOURCE_ID, base_time, base_time)
+
+        # insert a first task and full claim on a resource...
+        spec_task1 = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation',
+                                                          base_time + timedelta(minutes=+0),
+                                                          base_time + timedelta(minutes=+10), 'foo', 'CEP4')
+        self.assertTrue(spec_task1['inserted'])
+        task1_id = spec_task1['task_id']
+        task1 = self.radb.getTask(task1_id)
+        self.assertEqual('approved', task1['status'])
+
+        claim1_id = self.radb.insertResourceClaim(RESOURCE_ID, task1_id,
+                                                  task1['starttime'], task1['endtime'],
+                                                  resource_max_cap, 'foo', 1)
+        # claim it, and check it. Should succeed.
+        self.radb.updateResourceClaim(claim1_id, status='claimed')
+        self.assertEqual('claimed', self.radb.getResourceClaim(claim1_id)['status'])
+
+        # insert second (partially overlapping) task and claim on same resource, which we expect to get a conflict status
+        # because the first claim already claims the resource fully.
+        spec_task2 = self.radb.insertSpecificationAndTask(1, 1, 'approved', 'observation',
+                                                          base_time + timedelta(minutes=+5),
+                                                          base_time + timedelta(minutes=+15), 'foo', 'CEP4')
+        self.assertTrue(spec_task2['inserted'])
+        task2_id = spec_task2['task_id']
+        task2 = self.radb.getTask(task2_id)
+        self.assertEqual('approved', task2['status'])
+
+        claim2_id = self.radb.insertResourceClaim(RESOURCE_ID, task2_id,
+                                                  task2['starttime'], task2['endtime'],
+                                                  resource_max_cap, 'foo', 1)
+        self.assertEqual('conflict', self.radb.getResourceClaim(claim2_id)['status'])
+        self.assertEqual('conflict', self.radb.getTask(task2_id)['status'])
+
+        # now let's see if releasing claim1 results in claim2 not having conflict state anymore
+        self.radb.updateResourceClaim(claim1_id, status='tentative')
+        self.assertEqual('tentative', self.radb.getResourceClaim(claim1_id)['status'])
+        self.assertEqual('tentative', self.radb.getResourceClaim(claim2_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task1_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
+
+        # claim claim1 again, and check it. Should succeed.
+        # and claim2 should go to conflict again.
+        self.radb.updateResourceClaim(claim1_id, status='claimed')
+        self.assertEqual('claimed', self.radb.getResourceClaim(claim1_id)['status'])
+        self.assertEqual('conflict', self.radb.getResourceClaim(claim2_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task1_id)['status'])
+        self.assertEqual('conflict', self.radb.getTask(task2_id)['status'])
+
+        # this time, resolve the conflict by shifting the endtime of claim1
+        self.radb.updateResourceClaim(claim1_id, endtime=task2['starttime'])
+        self.assertEqual('claimed', self.radb.getResourceClaim(claim1_id)['status'])
+        self.assertEqual('tentative', self.radb.getResourceClaim(claim2_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task1_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
+
+        # and finally, we should be able to claim claim2 as well
+        self.radb.updateResourceClaim(claim2_id, status='claimed')
+        self.assertEqual('claimed', self.radb.getResourceClaim(claim1_id)['status'])
+        self.assertEqual('claimed', self.radb.getResourceClaim(claim2_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task1_id)['status'])
+        self.assertEqual('approved', self.radb.getTask(task2_id)['status'])
+
 
 if __name__ == "__main__":
     os.environ['TZ'] = 'UTC'