Skip to content
Snippets Groups Projects
Commit 6293ff36 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

SW-41: also track claim and task status changes and measure performance.

parent 06dd347b
No related branches found
No related tags found
No related merge requests found
......@@ -41,61 +41,102 @@ def tearDownModule():
class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest):
def test_resource_usages_performance(self):
num_resources = self.radb._executeQuery('select count(id) from virtual_instrument.resource;', fetch=_FETCH_ONE)['count']
ELAPSED_TRESHOLD = 1.0
insert_elapsed_list = []
num_resources = self.radb._executeQuery('select count(id) from virtual_instrument.resource;', fetch=_FETCH_ONE)['count']
# make sure all resources have 1000 units available
MAX_CAPACITY=1000
self.radb._executeQuery('update resource_monitoring.resource_capacity set (available, total) = (%s, %s);', (MAX_CAPACITY,MAX_CAPACITY))
now = datetime.utcnow()
now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour
spec_ids = []
filename = 'resource_usages_performance%s.csv' % (datetime.utcnow().strftime('%Y%m%dT%H%M%S'),)
with open(filename, 'w') as file:
file.write('#tasks, #claims, #claims_per_resource, #inserted_claims, elapsed_insert, elapsed_rebuild\n')
file.write('#tasks, #claims, #claims_per_resource, #inserted_claims, elapsed_insert\n')
counter = 0
for repeat_nr in range(10):
for repeat_nr in range(1):
# it is not common to claim a single resource multiple times for the same task, but it can happen, so test for it.
for num_claims_per_resource in [1, 2, 5, 10, 20]:
for num_claims_per_resource in [1, 2, 5, 10, 20, 50]:
# let's test over a feasible range of #claims. A lofar observation usually has ~200 claims.
for num_claims_to_insert in [1, 2, 5, 10, 20, 50, 100, 200, 500]:
num_claims_to_insert = min(num_claims_to_insert, num_claims_per_resource*num_resources)
counter += 1
result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
now+timedelta(hours=3*counter),
now + timedelta(hours=1 + 3*counter),
'content', 'CEP4')
task_id = result['task_id']
task = self.radb.getTask(task_id)
spec_ids.append(task['specification_id'])
claims = [{'resource_id': q/num_claims_per_resource,
'starttime': task['starttime']-timedelta(minutes=randint(0, 1800)),
'endtime': task['starttime']+timedelta(seconds=randint(1801, 3600)),
'status': 'tentative',
'claim_size': q}
for q in range(num_claims_to_insert)]
start = datetime.utcnow()
self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
elapsed_insert = totalSeconds(datetime.utcnow() - start)
insert_elapsed_list.append(elapsed_insert)
num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']
# enforce perfomance criterion: inserting claims should take less than 1.0 sec
self.assertLess(elapsed_insert, 1.0, msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" %(
elapsed_insert, 1.0, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
start = datetime.utcnow()
# make sure the usage table is wiped, so asserts fail when rebuild_resource_usages_from_claims is erroneously roll'ed back.
# self.radb.rebuild_resource_usages_from_claims(RESOURCE_ID, 'tentative')
elapsed_rebuild = -1 #totalSeconds(datetime.utcnow() - start)
logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec and a rebuild of the whole usage table takes %.3fsec',
num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert, elapsed_rebuild)
file.write('%d, %d, %d, %d, %.3f, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert, elapsed_rebuild))
file.flush()
num_claims_per_resource = min(num_claims_per_resource, num_claims_to_insert)
for claim_factor in [1, 2]:
counter += 1
logger.info('*****************************************************************')
logger.info('starting task and claim scheduling: counter=%s repeat_nr=%s num_claims_per_resource=%s num_claims_to_insert=%s claim_factor=%s',
counter, repeat_nr, num_claims_per_resource, num_claims_to_insert, claim_factor)
result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
now+timedelta(hours=3*counter),
now + timedelta(hours=3*counter + 1),
'content', 'CEP4')
task_id = result['task_id']
task = self.radb.getTask(task_id)
spec_ids.append(task['specification_id'])
claims = [{'resource_id': q/num_claims_per_resource,
'starttime': task['starttime'],
'endtime': task['endtime'], #TODO: extend storage claims into the future
'status': 'tentative',
'claim_size': claim_factor*MAX_CAPACITY/num_claims_per_resource}
for q in range(num_claims_to_insert)]
start = datetime.utcnow()
self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
elapsed_insert = totalSeconds(datetime.utcnow() - start)
num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']
# enforce perfomance criterion: inserting claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_insert, ELAPSED_TRESHOLD, msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" %(
elapsed_insert, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
if claim_factor == 1:
# no oversubscription, so expect all claims to be claimable...
start = datetime.utcnow()
self.radb.updateTaskAndResourceClaims(task_id=task_id, claim_status='claimed')
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# are they indeed claimed?
self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))
# enforce perfomance criterion: updating claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))
# ... and proceed with cycling through the task status
for task_status in ['scheduled', 'queued', 'active', 'completing', 'finished']:
# update the task status
start = datetime.utcnow()
self.radb.updateTaskAndResourceClaims(task_id=task_id, task_status=task_status)
elapsed_status_update = totalSeconds(datetime.utcnow() - start)
# check task status
self.assertEqual(task_status, self.radb.getTask(task_id)['status'])
# enforce perfomance criterion: inserting claims should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s task_status=%s" % (
elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, task_status))
# task should now be finished
self.assertEqual('finished', self.radb.getTask(task_id)['status'])
# and all claims should be removed.
# self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id)))
else:
# (deliberate) oversubscription of resources
# so, expect the claims and task to be in conflict
self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')))
self.assertEqual('conflict', self.radb.getTask(task_id)['status'])
logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec',
num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert)
file.write('%d, %d, %d, %d, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert))
file.flush()
logger.info('removing all test specs/tasks/claims from db')
delete_elapsed_list = []
......@@ -110,13 +151,12 @@ class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest):
elapsed = totalSeconds(datetime.utcnow() - start)
delete_elapsed_list.append(elapsed)
# enforce perfomance criterion: (cascading) delete of spec should take less than 1.0 sec
self.assertLess(elapsed, 1.0)
# enforce perfomance criterion: (cascading) delete of spec should take less than ELAPSED_TRESHOLD sec
self.assertLess(elapsed, ELAPSED_TRESHOLD)
file.write('%d, %d, %.3f\n' % (num_tasks, num_claims, elapsed))
file.flush()
logger.info('average claim insert time: %.3f', sum(insert_elapsed_list)/float(len(insert_elapsed_list)))
logger.info('average spec delete time: %.3f', sum(delete_elapsed_list)/float(len(delete_elapsed_list)))
logger.info('Done. Results can be found in file: %s', filename)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment