# t_radb_performance.py
#!/usr/bin/python
# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy)
# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it and/or
# modify it under the terms of the GNU General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
# $Id: $
import unittest
import psycopg2
import os
from datetime import datetime, timedelta
from dateutil import parser
from pprint import pformat
from random import randint
from lofar.common.datetimeutils import totalSeconds
import logging
logger = logging.getLogger(__name__)
import radb_common_testing
from lofar.sas.resourceassignment.database.radb import RADatabase, _FETCH_ONE
def setUpModule():
    """Delegate module-level test setup to the shared radb_common_testing harness."""
    return radb_common_testing.setUpModule()
def tearDownModule():
    """Delegate module-level test teardown to the shared radb_common_testing harness."""
    return radb_common_testing.tearDownModule()
class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest):
    """Performance tests for the resource assignment database (RADB).

    Inserts, updates and (cascading-)deletes tasks and resource claims in
    increasing numbers, asserting that each database round-trip stays within
    a fixed time budget. Timing results are also appended to a timestamped
    csv file for offline inspection.
    """

    def test_resource_usages_performance(self):
        """Schedule many tasks/claims and enforce timing budgets on each RADB call.

        For a range of claim counts and claims-per-resource ratios:
        insert claims (alternating normal and deliberately oversubscribed),
        resolve any oversubscription, cycle the task through its status
        lifecycle up to 'finished', and finally delete all specifications.
        Each step must complete within ELAPSED_THRESHOLD seconds (status
        updates get 2x, since they touch both tasks and claims).
        """
        ELAPSED_THRESHOLD = 2.0  # max allowed insert/update/delete time in seconds

        num_resources = self.radb._executeQuery('select count(id) from virtual_instrument.resource;', fetch=_FETCH_ONE)['count']

        # make sure all resources have 1000 units available
        MAX_CAPACITY = 1000
        self.radb._executeQuery('update resource_monitoring.resource_capacity set (available, total) = (%s, %s);',
                                (MAX_CAPACITY, MAX_CAPACITY))

        # pretend that we have an almost unlimited amount of storage space
        self.radb._executeQuery('update resource_monitoring.resource_capacity set (available, total) = (%s, %s) '
                                'where resource_id in (select id from virtual_instrument.resource_view where type_name = \'storage\');',
                                (1e9 * MAX_CAPACITY, 1e9 * MAX_CAPACITY))

        # keep a list of storage-type resource(ids), so we can create long lasting claims for these.
        storage_resource_ids = set(r['id'] for r in self.radb.getResources(resource_types='storage'))

        now = datetime.utcnow()
        now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond)  # round to full hour

        spec_ids = []

        filename = 'resource_usages_performance%s.csv' % (datetime.utcnow().strftime('%Y%m%dT%H%M%S'),)
        # NOTE: renamed local from 'file' to 'csv_file' to avoid shadowing the builtin.
        with open(filename, 'w') as csv_file:
            csv_file.write('#tasks, #claims, #claims_per_resource, #inserted_claims, elapsed_insert\n')

            counter = 0
            # it is not common to claim a single resource multiple times for the same task, but it can happen, so test for it.
            for preferred_num_claims_per_resource in [1, 2, 5, 10, 20, 50]:
                # let's test over a feasible range of #claims. A lofar observation usually has ~200 claims.
                for num_claims_to_insert in [1, 2, 5, 10, 20, 50, 100, 200, 500]:
                    num_claims_to_insert = min(num_claims_to_insert, preferred_num_claims_per_resource * num_resources)
                    num_claims_per_resource = min(preferred_num_claims_per_resource, num_claims_to_insert)

                    for oversubscription_factor in [1, 999]:
                        counter += 1

                        logger.info('*****************************************************************')
                        logger.info('starting task and claim scheduling: counter=%s num_claims_per_resource=%s num_claims_to_insert=%s oversubscription_factor=%s',
                                    counter, num_claims_per_resource, num_claims_to_insert, oversubscription_factor)

                        result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation',
                                                                      now + timedelta(hours=3 * counter),
                                                                      now + timedelta(hours=3 * counter + 1),
                                                                      'content', 'CEP4')

                        task_id = result['task_id']
                        task = self.radb.getTask(task_id)
                        spec_ids.append(task['specification_id'])

                        # BUGFIX: use floor division ('//'); under python3 a plain '/' yields
                        # float resource ids and float claim sizes (python2-era integer '/').
                        claims = [{'resource_id': q // num_claims_per_resource,
                                   'starttime': task['starttime'],
                                   'endtime': task['endtime'],
                                   'status': 'tentative',
                                   'claim_size': oversubscription_factor * MAX_CAPACITY // num_claims_per_resource}
                                  for q in range(num_claims_to_insert)]

                        # extend claims on storage resources
                        for claim in claims:
                            if claim['resource_id'] in storage_resource_ids:
                                claim['endtime'] += timedelta(days=100)

                        start = datetime.utcnow()
                        self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1)
                        elapsed_insert = totalSeconds(datetime.utcnow() - start)

                        num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
                        num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']
                        has_storage_claims = len(self.radb.getResourceClaims(task_ids=task_id, resource_type='storage')) > 0

                        # enforce performance criterion: inserting claims should take less than ELAPSED_THRESHOLD sec
                        self.assertLess(elapsed_insert, ELAPSED_THRESHOLD,
                                        msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
                                            elapsed_insert, ELAPSED_THRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))

                        if oversubscription_factor > 1:
                            # (deliberate) oversubscription of resources
                            # so, expect the claims and task to be in conflict
                            self.assertGreater(len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')), 0)
                            self.assertEqual('conflict', self.radb.getTask(task_id)['status'])

                            # solve oversubscription (floor division: see claim construction above)
                            start = datetime.utcnow()
                            self.radb.updateResourceClaims(where_task_ids=task_id, claim_size=MAX_CAPACITY // num_claims_per_resource)
                            elapsed_status_update = totalSeconds(datetime.utcnow() - start)

                            # enforce performance criterion: updating claims should take less than ELAPSED_THRESHOLD sec
                            self.assertLess(elapsed_status_update, ELAPSED_THRESHOLD,
                                            msg="updateResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
                                                elapsed_status_update, ELAPSED_THRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))

                            # check if not oversubscribed anymore
                            self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')))
                            self.assertEqual('approved', self.radb.getTask(task_id)['status'])

                        # no oversubscription (anymore), so expect all claims to be claimable...
                        start = datetime.utcnow()
                        self.radb.updateTaskAndResourceClaims(task_id=task_id, claim_status='claimed')
                        elapsed_status_update = totalSeconds(datetime.utcnow() - start)

                        # are they indeed claimed?
                        self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed')))

                        # enforce performance criterion: updating claims should take less than 2*ELAPSED_THRESHOLD sec (2* because we update both tasks and claims)
                        # BUGFIX: report the 2*threshold limit actually enforced in the failure message.
                        self.assertLess(elapsed_status_update, 2 * ELAPSED_THRESHOLD,
                                        msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % (
                                            elapsed_status_update, 2 * ELAPSED_THRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource))

                        # ... and proceed with cycling through the task status
                        for task_status in ['scheduled', 'queued', 'active', 'completing', 'finished']:
                            # update the task status
                            start = datetime.utcnow()
                            self.radb.updateTaskAndResourceClaims(task_id=task_id, task_status=task_status)
                            elapsed_status_update = totalSeconds(datetime.utcnow() - start)

                            # enforce performance criterion: updating task status should take less than 2*ELAPSED_THRESHOLD sec (2* because we update both tasks and claims)
                            # BUGFIX: report the 2*threshold limit actually enforced in the failure message.
                            self.assertLess(elapsed_status_update, 2 * ELAPSED_THRESHOLD,
                                            msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s task_status=%s" % (
                                                elapsed_status_update, 2 * ELAPSED_THRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, task_status))

                            # check task status
                            self.assertEqual(task_status, self.radb.getTask(task_id)['status'])

                        # task should now be finished
                        self.assertEqual('finished', self.radb.getTask(task_id)['status'])

                        # and all non-long-lasting (storage) claims should be removed.
                        self.assertEqual(0, len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] <= task['endtime'])))

                        if has_storage_claims:
                            # and all long-lasting (storage) claims should still be there.
                            # (they are removed by the cleanupservice ending/removing the storage claims)
                            self.assertGreater(len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] > task['endtime'])), 0)

                        logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec',
                                    num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert)
                        csv_file.write('%d, %d, %d, %d, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert))
                        csv_file.flush()

            logger.info('removing all test specs/tasks/claims from db')
            delete_elapsed_list = []

            csv_file.write('\n\n#tasks, #claims, elapsed_delete\n')
            for spec_id in spec_ids:
                num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count']
                num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count']

                start = datetime.utcnow()
                self.radb.deleteSpecification(spec_id)
                elapsed = totalSeconds(datetime.utcnow() - start)
                delete_elapsed_list.append(elapsed)

                # enforce performance criterion: (cascading) delete of spec should take less than ELAPSED_THRESHOLD sec
                self.assertLess(elapsed, ELAPSED_THRESHOLD)

                csv_file.write('%d, %d, %.3f\n' % (num_tasks, num_claims, elapsed))
                csv_file.flush()

            logger.info('average spec delete time: %.3f', sum(delete_elapsed_list) / float(len(delete_elapsed_list)))

        logger.info('Done. Results can be found in file: %s', filename)
if __name__ == "__main__":
    # Run all timestamp handling/logging in UTC.
    os.environ['TZ'] = 'UTC'
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    unittest.main()