Skip to content
Snippets Groups Projects
Commit 70547097 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-44: cleanup of test code.

parent b4b26771
Branches
Tags
No related merge requests found
......@@ -41,7 +41,7 @@ def tearDownModule():
class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest):
def test_resource_usages_performance(self):
ELAPSED_TRESHOLD = 1.0
ELAPSED_TRESHOLD = 2.0 #max allowed insert/update/delete time in seconds
num_resources = self.radb._executeQuery('select count(id) from virtual_instrument.resource;', fetch=_FETCH_ONE)['count']
# make sure all resources have 1000 units available
......@@ -63,113 +63,112 @@ class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest):
with open(filename, 'w') as file:
file.write('#tasks, #claims, #claims_per_resource, #inserted_claims, elapsed_insert\n')
counter = 0
for repeat_nr in range(10):
# it is not common to claim a single resource multiple times for the same task, but it can happen, so test for it.
for preferred_num_claims_per_resource in [1, 2, 5, 10, 20, 50]:
# let's test over a feasible range of #claims. A lofar observation usually has ~200 claims.
for num_claims_to_insert in [1, 2, 5, 10, 20, 50, 100, 200, 500]:
num_claims_to_insert = min(num_claims_to_insert, preferred_num_claims_per_resource*num_resources)
num_claims_per_resource = min(preferred_num_claims_per_resource, num_claims_to_insert)
for oversubscription_factor in [1, 10]:
counter += 1
logger.info('*****************************************************************')
logger.info('starting task and claim scheduling: counter=%s repeat_nr=%s num_claims_per_resource=%s num_claims_to_insert=%s claim_factor=%s',
counter, repeat_nr, num_claims_per_resource, num_claims_to_insert, oversubscription_factor)
result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
now+timedelta(hours=3*counter),
now + timedelta(hours=3*counter + 1),
'content', 'CEP4')
task_id = result['task_id']
task = self.radb.getTask(task_id)
spec_ids.append(task['specification_id'])
claims = [{'resource_id': q/num_claims_per_resource,
'starttime': task['starttime'],
'endtime': task['endtime'],
'status': 'tentative',
'claim_size': oversubscription_factor*MAX_CAPACITY/num_claims_per_resource}
for q in range(num_claims_to_insert)]
# extend claims on storage resources
for claim in claims:
if claim['resource_id'] in storage_resource_ids:
claim['endtime'] += timedelta(days=100)
# it is not common to claim a single resource multiple times for the same task, but it can happen, so test for it.
for preferred_num_claims_per_resource in [1, 2, 5, 10, 20, 50]:
# let's test over a feasible range of #claims. A lofar observation usually has ~200 claims.
for num_claims_to_insert in [1, 2, 5, 10, 20, 50, 100, 200, 500]:
num_claims_to_insert = min(num_claims_to_insert, preferred_num_claims_per_resource*num_resources)
num_claims_per_resource = min(preferred_num_claims_per_resource, num_claims_to_insert)
for oversubscription_factor in [1, 999]:
counter += 1
logger.info('*****************************************************************')
logger.info('starting task and claim scheduling: counter=%s num_claims_per_resource=%s num_claims_to_insert=%s oversubscription_factor=%s',
counter, num_claims_per_resource, num_claims_to_insert, oversubscription_factor)
result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
now+timedelta(hours=3*counter),
now + timedelta(hours=3*counter + 1),
'content', 'CEP4')
task_id = result['task_id']
task = self.radb.getTask(task_id)
spec_ids.append(task['specification_id'])
claims = [{'resource_id': q/num_claims_per_resource,
'starttime': task['starttime'],
'endtime': task['endtime'],
'status': 'tentative',
'claim_size': oversubscription_factor*MAX_CAPACITY/num_claims_per_resource}
for q in range(num_claims_to_insert)]
# extend claims on storage resources
for claim in claims:
if claim['resource_id'] in storage_resource_ids:
claim['endtime'] += timedelta(days=100)
start = datetime.utcnow()
self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
elapsed_insert = totalSeconds(datetime.utcnow() - start)
num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']
has_storage_claims = len(self.radb.getResourceClaims(task_ids=task_id, resource_type='storage')) > 0
# enforce perfomance criterion: inserting claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_insert, ELAPSED_TRESHOLD, msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" %(
elapsed_insert, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
if oversubscription_factor > 1:
# (deliberate) oversubscription of resources
# so, expect the claims and task to be in conflict
self.assertGreater(len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')), 0)
self.assertEqual('conflict', self.radb.getTask(task_id)['status'])
# solve oversubscription
start = datetime.utcnow()
self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
elapsed_insert = totalSeconds(datetime.utcnow() - start)
num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']
has_storage_claims = len(self.radb.getResourceClaims(task_ids=task_id, resource_type='storage')) > 0
# enforce perfomance criterion: inserting claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_insert, ELAPSED_TRESHOLD, msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" %(
elapsed_insert, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
if oversubscription_factor > 1:
# (deliberate) oversubscription of resources
# so, expect the claims and task to be in conflict
self.assertGreater(len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')), 0)
self.assertEqual('conflict', self.radb.getTask(task_id)['status'])
# solve oversubscription
start = datetime.utcnow()
self.radb.updateResourceClaims(where_task_ids=task_id, claim_size=MAX_CAPACITY/num_claims_per_resource)
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# enforce perfomance criterion: updating claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD,
msg="updateResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
# check if not oversubscribed anymore
self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')))
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
# no oversubscription (anymore), so expect all claims to be claimable...
self.radb.updateResourceClaims(where_task_ids=task_id, claim_size=MAX_CAPACITY/num_claims_per_resource)
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# enforce perfomance criterion: updating claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD,
msg="updateResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
# check if not oversubscribed anymore
self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')))
self.assertEqual('approved', self.radb.getTask(task_id)['status'])
# no oversubscription (anymore), so expect all claims to be claimable...
start = datetime.utcnow()
self.radb.updateTaskAndResourceClaims(task_id=task_id, claim_status='claimed')
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# are they indeed claimed?
self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
# enforce perfomance criterion: updating claims should take less than 2*ELAPSED_TRESHOLD sec (2* because we update both tasks and claims)
self.assertLess(elapsed_status_update, 2*ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
# ... and proceed with cycling through the task status
for task_status in ['scheduled', 'queued', 'active', 'completing', 'finished']:
# update the task status
start = datetime.utcnow()
self.radb.updateTaskAndResourceClaims(task_id=task_id, claim_status='claimed')
self.radb.updateTaskAndResourceClaims(task_id=task_id, task_status=task_status)
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# are they indeed claimed?
self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
# enforce perfomance criterion: updating task status should take less than 2*ELAPSED_TRESHOLD sec (2* because we update both tasks and claims)
self.assertLess(elapsed_status_update, 2*ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s task_status=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, task_status))
# enforce perfomance criterion: updating claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
# ... and proceed with cycling through the task status
for task_status in ['scheduled', 'queued', 'active', 'completing', 'finished']:
# update the task status
start = datetime.utcnow()
self.radb.updateTaskAndResourceClaims(task_id=task_id, task_status=task_status)
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# enforce perfomance criterion: updating task status should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s task_status=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, task_status))
# check task status
self.assertEqual(task_status, self.radb.getTask(task_id)['status'])
# task should now be finished
self.assertEqual('finished', self.radb.getTask(task_id)['status'])
# and all non-long-lasting (storage) claims should be removed.
self.assertEqual(0, len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] <= task['endtime'])))
if has_storage_claims:
# and all long-lasting (storage) claims should still be there.
# (they are removed by the cleanupservice ending/removing the storage claims)
self.assertGreater(len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] > task['endtime'])), 0)
logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec',
num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert)
file.write('%d, %d, %d, %d, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert))
file.flush()
# check task status
self.assertEqual(task_status, self.radb.getTask(task_id)['status'])
# task should now be finished
self.assertEqual('finished', self.radb.getTask(task_id)['status'])
# and all non-long-lasting (storage) claims should be removed.
self.assertEqual(0, len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] <= task['endtime'])))
if has_storage_claims:
# and all long-lasting (storage) claims should still be there.
# (they are removed by the cleanupservice ending/removing the storage claims)
self.assertGreater(len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] > task['endtime'])), 0)
logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec',
num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert)
file.write('%d, %d, %d, %d, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert))
file.flush()
logger.info('removing all test specs/tasks/claims from db')
delete_elapsed_list = []
......@@ -197,35 +196,3 @@ if __name__ == "__main__":
os.environ['TZ'] = 'UTC'
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
unittest.main()
exit()
from lofar.common import dbcredentials
radb = RADatabase(dbcredentials.DBCredentials().get('RADB'), log_queries=True)
now = datetime.utcnow()
now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour
counter = 0
for k in range(25000):
num_claims_to_insert = 5
num_insert_repeats = 10
elapsed_insert = 0
for i in range(num_insert_repeats):
counter += 1
result = radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
now+timedelta(hours=3*counter),
now + timedelta(hours=1 + 3*counter),
'content', 'CEP4')
task_id = result['task_id']
task = radb.getTask(task_id)
claims = [{'resource_id': 117,
'starttime': task['starttime']-timedelta(minutes=randint(0, 1800)),
'endtime': task['starttime']+timedelta(seconds=randint(1801, 3600)),
'status': 'tentative',
'claim_size': q}
for q in range(num_claims_to_insert)]
radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment