diff --git a/.gitattributes b/.gitattributes index 5fa260b9c2caa97136c18806ea2c3f6693fe43e8..cf79b26ea3313eae2a8ca97cc6420f6fcaa83355 100644 --- a/.gitattributes +++ b/.gitattributes @@ -4717,10 +4717,12 @@ SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.ini -text SAS/ResourceAssignment/ResourceAssignmentDatabase/radbpglistener.py -text SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_common_testing.py -text -SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_performance_test.py -text -SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py -text -SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.run -text -SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.sh -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.py -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.run -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.sh -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.py -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.run -text +SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.sh -text SAS/ResourceAssignment/ResourceAssignmentEditor/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssignmentEditor/bin/CMakeLists.txt -text SAS/ResourceAssignment/ResourceAssignmentEditor/bin/raewebservice -text diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py index e84053fdb515b1c66421a592e9999f6ae8051574..75be797753f76263f52941fd1c3cafbc7cb38c63 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb.py @@ -68,6 +68,7 @@ class RADatabase: port=self.dbcreds.port, connect_timeout=5) 
self.cursor = self.conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) + self.conn.notices = collections.deque() def _queryAsSingleLine(self, query, qargs=None): line = ' '.join(query.replace('\n', ' ').split()) @@ -124,7 +125,7 @@ class RADatabase: if self.conn.notices: for notice in self.conn.notices: logger.info('database log message %s', notice.strip()) - del self.conn.notices[:] + self.conn.notices.clear() def commit(self): logger.info('commit') @@ -1458,22 +1459,23 @@ class RADatabase: raise ValueError('please provide either "where_resource_claim_ids" and/or "where_task_ids" argument for updateResourceClaims') conditions = [] + condition_values = [] if where_resource_claim_ids is not None: if isinstance(where_resource_claim_ids, int): # just a single id conditions.append('id = %s') - values.append(where_resource_claim_ids) + condition_values.append(where_resource_claim_ids) elif len(where_resource_claim_ids): #assume a list/enumerable of id's conditions.append('id in %s') - values.append(tuple(where_resource_claim_ids)) + condition_values.append(tuple(where_resource_claim_ids)) if where_task_ids is not None: if isinstance(where_task_ids, int): # just a single id conditions.append('task_id = %s') - values.append(where_task_ids) + condition_values.append(where_task_ids) elif len(where_task_ids): #assume a list/enumerable of id's conditions.append('task_id in %s') - values.append(tuple(where_task_ids)) + condition_values.append(tuple(where_task_ids)) if where_resource_types is not None: if isinstance(where_resource_types, basestring) or isinstance(where_resource_types, int): @@ -1491,16 +1493,21 @@ class RADatabase: where_resource_type_ids = [x for x in where_resource_types] conditions.append('resource_id in (SELECT r.id FROM virtual_instrument.resource r WHERE r.type_id in %s)') - values.append(tuple(where_resource_type_ids)) + condition_values.append(tuple(where_resource_type_ids)) query += ' WHERE ' + ' AND '.join(conditions) - 
self._executeQuery(query, values) + self._executeQuery(query, values + condition_values) if commit: self.commit() + if self.cursor.rowcount == 0: + # nothing updated, so let's check if there was nothing to update, or that the update failed + query = 'SELECT count(id) FROM resource_allocation.resource_claim WHERE ' + ' AND '.join(conditions) + return self._executeQuery(query, condition_values, fetch=_FETCH_ONE).get('count', 0) == 0 + return self.cursor.rowcount > 0 @@ -1759,9 +1766,12 @@ and/or claim_statuses. if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) + usages_per_resource[resource_ids] = {} # append default empty result dict elif resource_ids: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) + for resource_id in resource_ids: + usages_per_resource[resource_id] = {} # append default empty result dict if claim_statuses is not None: if isinstance(claim_statuses, basestring): @@ -1779,6 +1789,10 @@ and/or claim_statuses. 
conditions.append('status_id in %s') qargs.append(tuple(claim_status_ids)) + for rcs in self.getResourceClaimStatuses(): + for resource_id, result_dict in usages_per_resource.items(): + result_dict[rcs['id']] = [] # add default empty list for each requested resource_id for each known status + if conditions: query += ' WHERE ' + ' AND '.join(conditions) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql index 56504320b0c87b8c630ea7f6b6fd668908fef162..2242ac5e375ceb19c80d64d99db1863fcd48ae1e 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_functions_and_triggers.sql @@ -13,6 +13,8 @@ DECLARE claim_claimed_status_id int := 1; --beware: hard coded instead of lookup for performance task_approved_status_id int := 300; --beware: hard coded instead of lookup for performance task_conflict_status_id int := 335; --beware: hard coded instead of lookup for performance + task_finished_status_id int := 1000; --beware: hard coded instead of lookup for performance + task_aborted_status_id int := 1100; --beware: hard coded instead of lookup for performance BEGIN IF NEW.status_id <> OLD.status_id THEN IF NEW.status_id = task_approved_status_id OR NEW.status_id = task_conflict_status_id THEN @@ -25,7 +27,14 @@ BEGIN RAISE EXCEPTION 'Cannot update task status from % to % when not all its claims are claimed', OLD.status_id, NEW.status_id; END IF; END IF; -END IF; + + IF NEW.status_id = task_finished_status_id OR NEW.status_id = task_aborted_status_id THEN + -- if task ends, remove obsolete claims (keep long lasting claims) + DELETE FROM resource_allocation.resource_claim rc + WHERE rc.task_id=NEW.id + AND rc.endtime <= (SELECT sp.endtime FROM resource_allocation.specification sp WHERE sp.id=NEW.specification_id LIMIT 1); + 
END IF; + END IF; RETURN NEW; END; $BODY$ @@ -239,55 +248,14 @@ CREATE OR REPLACE FUNCTION resource_allocation.process_new_claim_into_resource_u RETURNS void AS $$ DECLARE - usage_at_or_before_start resource_allocation.resource_usage; - usage_at_or_before_end resource_allocation.resource_usage; - intermediate_usage resource_allocation.resource_usage; BEGIN - -- find resource_usage at claim starttime - SELECT * FROM resource_allocation.get_resource_usage_at_or_before(new_claim.resource_id, new_claim.status_id, new_claim.starttime, false, false, false) into usage_at_or_before_start; - SELECT * FROM resource_allocation.get_resource_usage_at_or_before(new_claim.resource_id, new_claim.status_id, new_claim.endtime, false, false, false) into usage_at_or_before_end; - - --add new_claim.claim_size at claim starttime to resource_usage depending on state of usage_at_or_before_start - IF usage_at_or_before_start IS NOT NULL THEN - IF usage_at_or_before_start.as_of_timestamp = new_claim.starttime THEN - --update at the claim starttime the already existing usage value at the claim starttime + new claim size - UPDATE resource_allocation.resource_usage ru SET usage = usage_at_or_before_start.usage + new_claim.claim_size - WHERE ru.id = usage_at_or_before_start.id; - ELSE - --insert at the claim starttime the existing usage value before claim starttime + new claim size - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (new_claim.resource_id, new_claim.status_id, new_claim.starttime, usage_at_or_before_start.usage + new_claim.claim_size); - END IF; - ELSE - -- no previous usage known, so insert this claim's size as the first usage - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (new_claim.resource_id, new_claim.status_id, new_claim.starttime, new_claim.claim_size); - END IF; + -- insert the claim's start and end delta + INSERT INTO 
resource_allocation.resource_usage_delta (claim_id, resource_id, status_id, moment, delta) + VALUES (new_claim.id, new_claim.resource_id, new_claim.status_id, new_claim.starttime, new_claim.claim_size), + (new_claim.id, new_claim.resource_id, new_claim.status_id, new_claim.endtime, -new_claim.claim_size); - --close resource_usage for this new_claim claim at claim endtime depending on state of usage_at_or_before_end - IF usage_at_or_before_end IS NOT NULL THEN - IF usage_at_or_before_end.as_of_timestamp <> new_claim.endtime THEN - --insert at the claim endtime the existing usage value before claim endtime - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (new_claim.resource_id, new_claim.status_id, new_claim.endtime, usage_at_or_before_end.usage); - END IF; - --TODO: 20180709; why no else with an upate? - ELSE - -- no previous usage known, so insert 0 as the last usage - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (new_claim.resource_id, new_claim.status_id, new_claim.endtime, 0); - END IF; - - --now modify any existing usages between new_claim claim starttime and endtime - FOR intermediate_usage IN SELECT * FROM resource_allocation.resource_usage ru - WHERE ru.resource_id = new_claim.resource_id - AND ru.status_id = new_claim.status_id - AND ru.as_of_timestamp > new_claim.starttime - AND ru.as_of_timestamp < new_claim.endtime - LOOP - UPDATE resource_allocation.resource_usage ru SET usage = intermediate_usage.usage + new_claim.claim_size - WHERE ru.id = intermediate_usage.id; - END LOOP; + -- with the two new delta entries, use the deltas table to rebuild the usages table from the claim's starttime onwards + PERFORM resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(new_claim.resource_id, new_claim.status_id, new_claim.starttime); END; $$ LANGUAGE plpgsql; ALTER FUNCTION 
resource_allocation.process_new_claim_into_resource_usages(new_claim resource_allocation.resource_claim) OWNER TO resourceassignment; @@ -296,16 +264,6 @@ COMMENT ON FUNCTION resource_allocation.process_new_claim_into_resource_usages(n --------------------------------------------------------------------------------------------------------------------- --- 20180903: brainstorm with AK & JS: the resource_usages table is useful because it makes lookups faster. However, --- there are known bugs in inserting/updating the resource_usages table upon changes in resource_claims. --- We discussed the idea of using an additional deltas helper table: claims -> deltas -> usages. --- the current implementation goes diretly from claims -> usages, and loops over claims "opening" and "closing" in the usage table. --- Introducing the intermediate deltas table has the benefit of using simple sql sum's, and not keeping track of opening/closing claims. --- Highly recommended to give this a try in JIRA SW-35. 
- ---------------------------------------------------------------------------------------------------------------------- - - CREATE OR REPLACE FUNCTION resource_allocation.rebuild_resource_usages_from_claims() RETURNS void AS $$ @@ -343,269 +301,120 @@ COMMENT ON FUNCTION resource_allocation.rebuild_resource_usages_from_claims_for_ CREATE OR REPLACE FUNCTION resource_allocation.rebuild_resource_usages_from_claims_for_resource_of_status(_resource_id int, _status_id int) RETURNS void AS $$ -DECLARE - claim resource_allocation.resource_claim; - finished_claim resource_allocation.resource_claim; - tmp_usage resource_allocation.resource_usage; - new_usage_value bigint := 0; - new_usage_id integer; BEGIN - -- make sure nobody thouches the these tables while running this function - LOCK TABLE resource_allocation.resource_claim IN ACCESS SHARE MODE; - LOCK TABLE resource_allocation.resource_usage IN EXCLUSIVE MODE; - LOCK TABLE resource_allocation._rebuild_usages_active_claims IN EXCLUSIVE MODE; - - -- delete the relevant usages (so we can re-enter them in this method) - DELETE FROM resource_allocation.resource_usage WHERE resource_id = _resource_id AND status_id = _status_id; - - -- make sure the helper tables are empty - TRUNCATE resource_allocation._rebuild_usages_active_claims; --tracks the 'open'/'active' claims (starttime < loop_timestamp < endtime) - TRUNCATE resource_allocation._rebuild_usages_active_usages; --will be filled with small subset of usages-table for faster lookups than in the big reource_usage table. - - -- process each claim for this _resource_id with _status_id - FOR claim IN (SELECT * FROM resource_allocation.resource_claim - WHERE resource_id = _resource_id - AND status_id = _status_id - ORDER BY starttime, endtime) - LOOP - -- keep helper table _rebuild_usages_active_usages small and quick-to-search in each iteration. 
- -- delete all 'closed'/'obsolete' usages from - -- any usage before the first usage before min(endtime) of the active_claims is obsolete. (yes, that's twice before) - SELECT * FROM resource_allocation._rebuild_usages_active_usages ru - WHERE ru.as_of_timestamp < (SELECT MIN(endtime) FROM resource_allocation._rebuild_usages_active_claims) - ORDER BY ru.as_of_timestamp DESC - LIMIT 1 - INTO tmp_usage; - IF tmp_usage IS NOT NULL THEN - -- remember from above? any usage before the first usage before min(starttime) of the active_claims is obsolete. - -- so, select the first usage before the usage we just found. - SELECT * FROM resource_allocation._rebuild_usages_active_usages ru - WHERE ru.as_of_timestamp < tmp_usage.as_of_timestamp - ORDER BY ru.as_of_timestamp DESC - LIMIT 1 - INTO tmp_usage; - - IF tmp_usage IS NOT NULL THEN - DELETE FROM resource_allocation._rebuild_usages_active_usages ru WHERE ru.as_of_timestamp < tmp_usage.as_of_timestamp; - END IF; - END IF; - - --'close' all finished claims (if any) - FOR finished_claim IN (SELECT * FROM resource_allocation._rebuild_usages_active_claims ac - WHERE ac.endtime <= claim.starttime - ORDER BY endtime) - LOOP - --(quick) search in the (small) _rebuild_usages_active_usages which holds only relevant usages - --find last usage at or before finished_claim.endtime - SELECT * FROM resource_allocation._rebuild_usages_active_usages ru - WHERE ru.as_of_timestamp <= finished_claim.endtime - ORDER BY ru.as_of_timestamp DESC - LIMIT 1 - INTO tmp_usage; - - IF tmp_usage IS NULL THEN - RAISE EXCEPTION 'tmp_usage should not be NULL while finishing active claims for claim % in rebuild_resource_usages_from_claims_for_resource_of_status(%, %)', finished_claim, _resource_id, _status_id; - END IF; - - -- integrate (add current value to previous value) - new_usage_value := tmp_usage.usage - finished_claim.claim_size; - - --a finished claim is 'closed' by subtracting the claim_size from the last usage value - --this happens either at an 
already existing usage timestamp, or at a new usage timestamp. - IF finished_claim.endtime = tmp_usage.as_of_timestamp THEN - --claim's endtime coincides with existing usage timestamp - --update the existing usage into the table - UPDATE resource_allocation.resource_usage - SET usage = new_usage_value - WHERE id = tmp_usage.id; - - --also update the usage in the the small _rebuild_usages_active_usages table. - UPDATE resource_allocation._rebuild_usages_active_usages - SET usage = new_usage_value - WHERE id = tmp_usage.id; - ELSE - --claim's endtime does not coincide with existing usage timestamp - --insert the new usage into the table - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (_resource_id, _status_id, finished_claim.endtime, new_usage_value) RETURNING id INTO new_usage_id; - - --also add the usage to the small _rebuild_usages_active_usages table, so it can be (quickly) searched. - INSERT INTO resource_allocation._rebuild_usages_active_usages (id, resource_id, status_id, as_of_timestamp, usage) - VALUES (new_usage_id, _resource_id, _status_id, finished_claim.endtime, new_usage_value); - END IF; - - --now that the claim has been 'closed', remove it from the active claims - DELETE FROM resource_allocation._rebuild_usages_active_claims WHERE id = finished_claim.id; - END LOOP; -- end loop over finished claims - - --all claims which finished at or before this claim's starttime are now closed. - --now, handle the new 'active' claim - - --(quick) search in the (small) _rebuild_usages_active_usages which holds only relevant usages - --find last usage at or before claim.starttime - SELECT * FROM resource_allocation._rebuild_usages_active_usages ru - WHERE ru.as_of_timestamp <= claim.starttime - ORDER BY ru.as_of_timestamp DESC - LIMIT 1 - INTO tmp_usage; - - --this 'active' claim 'opens' also either at an already existing usage timestamp or at a new usage timestamp. 
- IF tmp_usage IS NOT NULL AND claim.starttime = tmp_usage.as_of_timestamp THEN - --claim's starttime coincides with existing usage timestamp - -- integrate (add current value to previous value) - new_usage_value := tmp_usage.usage + claim.claim_size; - - --update the existing usage with the new_usage_value - UPDATE resource_allocation.resource_usage - SET usage = new_usage_value - WHERE id = tmp_usage.id; - - --also update the small _rebuild_usages_active_usages table, so it can be (quickly) searched. - UPDATE resource_allocation._rebuild_usages_active_usages - SET usage = new_usage_value - WHERE id = tmp_usage.id; - ELSE - --claim's starttime does not coincide with existing usage timestamp - IF tmp_usage IS NULL THEN - -- integrate (no previous value, so start of integral) - new_usage_value := claim.claim_size; - ELSE - -- integrate (add current value to previous value) - new_usage_value := tmp_usage.usage + claim.claim_size; - END IF; - - --and insert the new usage into the table - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (_resource_id, _status_id, claim.starttime, new_usage_value) RETURNING id INTO new_usage_id; - - --also add the usage to the small _rebuild_usages_active_usages table, so it can be (quickly) searched. 
- INSERT INTO resource_allocation._rebuild_usages_active_usages (id, resource_id, status_id, as_of_timestamp, usage) - VALUES (new_usage_id, _resource_id, _status_id, claim.starttime, new_usage_value); - END IF; - - --now that the claim has been 'opened', add it to the active claims - INSERT INTO resource_allocation._rebuild_usages_active_claims (id, resource_id, task_id, starttime, endtime, status_id, claim_size) - VALUES (claim.id, claim.resource_id, claim.task_id, claim.starttime, claim.endtime, claim.status_id, claim.claim_size); - END LOOP; - - --all claims were processed and at least opened - --so, conclude with 'closing' all still active claims - FOR finished_claim IN (SELECT * FROM resource_allocation._rebuild_usages_active_claims ac - ORDER BY endtime) - LOOP - -- (quick) search in the (small) _rebuild_usages_active_usages which holds only relevant usages - SELECT * FROM resource_allocation._rebuild_usages_active_usages ru - WHERE ru.as_of_timestamp <= finished_claim.endtime - ORDER BY ru.as_of_timestamp DESC - LIMIT 1 - INTO tmp_usage; - - IF tmp_usage IS NULL THEN - RAISE EXCEPTION 'tmp_usage should not be NULL while finishing processing opened claims for claim % in rebuild_resource_usages_from_claims_for_resource_of_status(%, %)', finished_claim, _resource_id, _status_id; - END IF; - - -- integrate (add current value to previous value) - new_usage_value := tmp_usage.usage - finished_claim.claim_size; - - --a finished claim is 'closed' by subtracting the claim_size from the last_usage_value - --this happens either at an already existing usage timestamp, or at a new usage timestamp. - IF finished_claim.endtime = tmp_usage.as_of_timestamp THEN - --claim's endtime coincides with existing usage timestamp - UPDATE resource_allocation.resource_usage - SET usage = new_usage_value - WHERE id = tmp_usage.id; - - --also update the small _rebuild_usages_active_usages table, so it can be (quickly) searched. 
- UPDATE resource_allocation._rebuild_usages_active_usages - SET usage = new_usage_value - WHERE id = tmp_usage.id; - ELSE - --claim's endtime does not coincide with existing usage timestamp - --insert the new usage into the table - INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) - VALUES (_resource_id, _status_id, finished_claim.endtime, new_usage_value) RETURNING id INTO new_usage_id; - - --also add the usage to the small _rebuild_usages_active_usages table, so it can be (quickly) searched. - INSERT INTO resource_allocation._rebuild_usages_active_usages (id, resource_id, status_id, as_of_timestamp, usage) - VALUES (new_usage_id, _resource_id, _status_id, finished_claim.endtime, new_usage_value); - END IF; - - --now that the claim has been 'closed', remove it from the active claims - DELETE FROM resource_allocation._rebuild_usages_active_claims WHERE id = finished_claim.id; - END LOOP; - - -- wipe the helper tables - TRUNCATE resource_allocation._rebuild_usages_active_claims; - TRUNCATE resource_allocation._rebuild_usages_active_usages; + -- delete all the relevant deltas (so we can re-enter them in this method) + DELETE FROM resource_allocation.resource_usage_delta WHERE resource_id = _resource_id AND status_id = _status_id; + + -- build up the delta's table by inserting positive claim_size delta's at all claim starttimes... 
+ INSERT INTO resource_allocation.resource_usage_delta (claim_id, resource_id, status_id, moment, delta) + (SELECT rc.id, _resource_id, _status_id, rc.starttime, rc.claim_size + FROM resource_allocation.resource_claim rc + WHERE rc.resource_id = _resource_id + AND rc.status_id = _status_id); + + -- ...and by inserting negative claim_size delta's at all claim endtimes + INSERT INTO resource_allocation.resource_usage_delta (claim_id, resource_id, status_id, moment, delta) + (SELECT rc.id, _resource_id, _status_id, rc.endtime, -rc.claim_size + FROM resource_allocation.resource_claim rc + WHERE rc.resource_id = _resource_id + AND rc.status_id = _status_id); + + -- now that the deltas table has been rebuild, use it to rebuild the usages table + PERFORM resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(_resource_id, _status_id); END; $$ LANGUAGE plpgsql; ALTER FUNCTION resource_allocation.rebuild_resource_usages_from_claims_for_resource_of_status(int, int) OWNER TO resourceassignment; COMMENT ON FUNCTION resource_allocation.rebuild_resource_usages_from_claims_for_resource_of_status(int, int) IS 'function which rebuilds the resource_usages table for the claims with a specific status for a specific resource.'; ---------------------------------------------------------------------------------------------------------------------- -CREATE OR REPLACE FUNCTION resource_allocation.process_old_claim_outof_resource_usages(old_claim resource_allocation.resource_claim) + +CREATE OR REPLACE FUNCTION resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(_resource_id int, _status_id int, _since timestamp default NULL) RETURNS void AS $$ DECLARE - usage_at_start RECORD; - usage_before_start RECORD; - usage_at_end RECORD; - intermediate_usage RECORD; + combined_delta_row record; + running_usage_sum bigint; + usage_before_since resource_allocation.resource_usage; BEGIN - -- find resource_usage at claim starttime - SELECT * FROM 
resource_allocation.get_resource_usage_at_or_before(old_claim.resource_id, old_claim.status_id, old_claim.starttime, true, false, true) into usage_at_start; - - IF usage_at_start IS NULL THEN - RAISE EXCEPTION 'process_old_claim_outof_resource_usages(%) cannot find usage_at_start', old_claim; - END IF; - - - -- and find resource_usage at claim endtime - SELECT * FROM resource_allocation.get_resource_usage_at_or_before(old_claim.resource_id, old_claim.status_id, old_claim.endtime, true, false, true) into usage_at_end; + -- here are two versions of the same algorithm + -- if _since is NULL, then run over the entire timespan + -- else, do time-bound queries which are slightly slower. + IF _since IS NULL THEN + -- delete the relevant usages + DELETE FROM resource_allocation.resource_usage + WHERE resource_id = _resource_id + AND status_id = _status_id; + + running_usage_sum := 0; + + -- perform integration over delta's and insert into resource_usage + FOR combined_delta_row in (SELECT rud.moment, SUM(rud.delta) as summed_delta + FROM resource_allocation.resource_usage_delta rud + WHERE rud.resource_id = _resource_id + AND rud.status_id = _status_id + GROUP BY rud.moment + ORDER BY rud.moment) LOOP + --integrate + running_usage_sum := running_usage_sum + combined_delta_row.summed_delta; + + --and insert into resource_usage + INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) + VALUES (_resource_id, _status_id, combined_delta_row.moment, running_usage_sum); + END LOOP; + ELSE + -- same algorithm as above, but now timerange-bound as of _since + -- delete the relevant usages + DELETE FROM resource_allocation.resource_usage + WHERE resource_id = _resource_id + AND status_id = _status_id + AND as_of_timestamp >= _since; + + -- get the usage_before_since to initialize running_usage_sum with + SELECT * FROM resource_allocation.get_resource_usage_at_or_before(_resource_id, _status_id, _since, false, true, false) + INTO 
usage_before_since; + + IF usage_before_since is NULL THEN + running_usage_sum := 0; + ELSE + running_usage_sum := usage_before_since.usage; + END IF; - IF usage_at_end IS NULL THEN - RAISE EXCEPTION 'process_old_claim_outof_resource_usages(%) cannot find usage_at_end', old_claim; + -- perform integration over delta's since _since and insert into resource_usage + FOR combined_delta_row in (SELECT rud.moment, SUM(rud.delta) as summed_delta + FROM resource_allocation.resource_usage_delta rud + WHERE rud.resource_id = _resource_id + AND rud.status_id = _status_id + AND rud.moment >= _since + GROUP BY rud.moment + ORDER BY rud.moment) LOOP + --integrate + running_usage_sum := running_usage_sum + combined_delta_row.summed_delta; + + --and insert into resource_usage + INSERT INTO resource_allocation.resource_usage (resource_id, status_id, as_of_timestamp, usage) + VALUES (_resource_id, _status_id, combined_delta_row.moment, running_usage_sum); + END LOOP; END IF; +END; +$$ LANGUAGE plpgsql; +ALTER FUNCTION resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(int, int, timestamp) OWNER TO resourceassignment; +COMMENT ON FUNCTION resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(int, int, timestamp) + IS 'function which rebuilds the resource_usages table from the resource_usage_deltas table with a specific status for a specific resource since a given timestamp.'; +--------------------------------------------------------------------------------------------------------------------- - IF usage_at_start.usage = old_claim.claim_size THEN - IF usage_at_end.usage = 0 THEN - -- find resource_usage before claim starttime - SELECT * FROM resource_allocation.get_resource_usage_at_or_before(old_claim.resource_id, old_claim.status_id, old_claim.starttime, false, true, false) into usage_before_start; - - IF usage_before_start IS NULL OR (usage_before_start IS NOT NULL AND usage_before_start.usage = 0) THEN - --usage_at_start was 
'caused' by this deleted claim only, so delete it - DELETE FROM resource_allocation.resource_usage ru WHERE ru.id = usage_at_start.id; - ELSE - UPDATE resource_allocation.resource_usage ru SET usage = 0 WHERE ru.id = usage_at_start.id; - END IF; - ELSE - --usage_at_start was 'caused' by this deleted claim only, so delete it - DELETE FROM resource_allocation.resource_usage ru WHERE ru.id = usage_at_start.id; - END IF; - ELSE - --update the usage_at_start.usage by subtracting the deleted claim size - UPDATE resource_allocation.resource_usage ru SET usage = usage_at_start.usage - old_claim.claim_size - WHERE ru.id = usage_at_start.id; - END IF; - IF usage_at_end.usage = 0 THEN - --usage_at_end was 'caused' by this deleted claim only, so delete it - --TODO:20180704 do not delete if another claim with this status and timestamp also causes this 0 - DELETE FROM resource_allocation.resource_usage ru WHERE ru.id = usage_at_end.id; - END IF; +--------------------------------------------------------------------------------------------------------------------- +CREATE OR REPLACE FUNCTION resource_allocation.process_old_claim_outof_resource_usages(old_claim resource_allocation.resource_claim) + RETURNS void AS +$$ +DECLARE +BEGIN + -- get rid of claim in delta's table (this should delete two entries, one for the starttime, and one for the endtime) + DELETE FROM resource_allocation.resource_usage_delta WHERE claim_id = old_claim.id; - --now modify any existing usages between old_claim claim starttime and endtime - FOR intermediate_usage IN SELECT * FROM resource_allocation.resource_usage ru - WHERE ru.resource_id = old_claim.resource_id - AND ru.status_id = old_claim.status_id - AND ru.as_of_timestamp > old_claim.starttime - AND ru.as_of_timestamp < old_claim.endtime - LOOP - UPDATE resource_allocation.resource_usage ru SET usage = intermediate_usage.usage - old_claim.claim_size - WHERE ru.id = intermediate_usage.id; - END LOOP; + -- with the two removed delta entries, use the 
deltas table to rebuild the usages table from the claim's starttime onwards + PERFORM resource_allocation.rebuild_resource_usages_from_deltas_for_resource_of_status(old_claim.resource_id, old_claim.status_id, old_claim.starttime); END; $$ LANGUAGE plpgsql; ALTER FUNCTION resource_allocation.process_old_claim_outof_resource_usages(old_claim resource_allocation.resource_claim) OWNER TO resourceassignment; @@ -697,8 +506,6 @@ DECLARE max_resource_usage_in_time_window resource_allocation.resource_usage; max_resource_at_or_before_starttime resource_allocation.resource_usage; BEGIN - SELECT * FROM resource_allocation.get_resource_usage_at_or_before(_resource_id, _claim_status_id, _lower, false, false, false) into max_resource_at_or_before_starttime; - SELECT * FROM resource_allocation.resource_usage ru WHERE ru.resource_id = _resource_id AND ru.status_id = _claim_status_id @@ -707,20 +514,23 @@ BEGIN ORDER BY ru.usage DESC LIMIT 1 INTO max_resource_usage_in_time_window; - IF max_resource_usage_in_time_window IS NOT NULL THEN - IF max_resource_at_or_before_starttime IS NULL THEN - RETURN max_resource_usage_in_time_window; - ELSE - IF max_resource_usage_in_time_window.usage > max_resource_at_or_before_starttime.usage THEN - RETURN max_resource_usage_in_time_window; - ELSE - RETURN max_resource_at_or_before_starttime; + IF max_resource_usage_in_time_window IS NULL THEN + -- no usages within given window, so return first usage before window (which extends in time into this window) + SELECT * FROM resource_allocation.get_resource_usage_at_or_before(_resource_id, _claim_status_id, _lower, false, false, false) INTO max_resource_at_or_before_starttime; + RETURN max_resource_at_or_before_starttime; + END IF; + + IF max_resource_usage_in_time_window.as_of_timestamp > _lower THEN -- Skips as_of_timestamp = _lower on purpose + -- check if the usage at_or_before_starttime is higher than in_time_window + SELECT * FROM resource_allocation.get_resource_usage_at_or_before(_resource_id, 
_claim_status_id, _lower, false, false, false) INTO max_resource_at_or_before_starttime; + IF max_resource_at_or_before_starttime IS NOT NULL THEN + IF max_resource_at_or_before_starttime.usage > max_resource_usage_in_time_window.usage THEN + RETURN max_resource_at_or_before_starttime; END IF; END IF; - ELSE - -- could also be NULL but that is checked for elsewhere - RETURN max_resource_at_or_before_starttime; END IF; + + RETURN max_resource_usage_in_time_window; END; $$ LANGUAGE plpgsql; ALTER FUNCTION resource_allocation.get_max_resource_usage_between(_resource_id int, _claim_status_id int, _lower timestamp, _upper timestamp) OWNER TO resourceassignment; @@ -750,23 +560,26 @@ BEGIN -- available_capacity is a truly measured metric (by tools like df (disk-free)) SELECT available, total FROM resource_monitoring.resource_capacity WHERE resource_id = _resource_id LIMIT 1 INTO available_capacity, total_capacity; - -- determine how much of the used_capacity is 'accounted for' by claims. - -- this is a best guess of the amount of data which we know that should be on the resource. - -- we can only 'measure' that at this moment, - -- so take the current resource usage - SELECT usage FROM resource_allocation.get_current_resource_usage(_resource_id, claimed_status_id) INTO current_claimed_usage; - IF available_capacity = total_capacity THEN --this is not a monitored resource, and hence we do not know how much space is actually available. --make a best guess by subtracting the current_claimed_usage from the total_capacity - RETURN total_capacity - max_resource_usage_value; ELSE --this is a monitored resource, and the claimable_capacity is not just the free space (available_capacity) at this moment! 
-- we have to take into account what we know about already claimed portions, -- both at this moment (current_claimed_usage) and for the planned claim (max_resource_usage_value, between _lower and _upper) - RETURN available_capacity + current_claimed_usage - max_resource_usage_value; + -- determine how much of the used_capacity is 'accounted for' by claims. + -- this is a best guess of the amount of data which we know that should be on the resource. + -- we can only 'measure' that at this moment, + -- so take the current resource usage + SELECT usage FROM resource_allocation.get_current_resource_usage(_resource_id, claimed_status_id) INTO current_claimed_usage; + + IF current_claimed_usage IS NOT NULL THEN + RETURN available_capacity + current_claimed_usage - max_resource_usage_value; + END IF; + + RETURN available_capacity - max_resource_usage_value; END IF; END; $$ LANGUAGE plpgsql; @@ -853,7 +666,6 @@ DECLARE claim_has_conflicts boolean; BEGIN --order of following steps is important, do not reorder the steps - IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN IF NEW.starttime >= NEW.endtime THEN -- Conceptually, you can't claim and release a resource at the same timestamp. @@ -896,9 +708,7 @@ BEGIN --update the resource usages affected by this claim --do this before we check for conflicts, because this claim might be shifted for example --which might influence the resource_usages which determine wheter a claim fits. - IF OLD.resource_id <> 117 THEN --20180903: skip checking of cep4 storage until JIRA SW-35 is solved. 
- PERFORM resource_allocation.process_old_claim_outof_resource_usages(OLD); - END IF; + PERFORM resource_allocation.process_old_claim_outof_resource_usages(OLD); END IF; --only check claim if status and/or claim_size and/or start/end time changed @@ -906,33 +716,29 @@ BEGIN OLD.claim_size <> NEW.claim_size OR OLD.starttime <> NEW.starttime OR OLD.endtime <> NEW.endtime)) THEN - IF NEW.resource_id <> 117 THEN --20180903: skip checking of cep4 storage until JIRA SW-35 is solved. - --check if claim fits or has conflicts - SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(NEW) INTO claim_has_conflicts; - - IF claim_has_conflicts THEN - IF NEW.status_id <> claim_conflict_status_id THEN - -- only set claims to conflict if task status <= queued - -- when a claim goes to conflict, then so does it's task, and we don't want that for running/finished/aborted tasks - IF EXISTS (SELECT 1 FROM resource_allocation.task - WHERE id=NEW.task_id - AND status_id = ANY(ARRAY[300, 335, 350, 400, 500])) THEN -- hardcoded tasks statuses <= queued - -- conflict with others, so set claim status to conflict - NEW.status_id := claim_conflict_status_id; - END IF; - END IF; - ELSE - -- no conflict (anymore) with others, so set claim status to tentative if currently in conflict - IF NEW.status_id = claim_conflict_status_id THEN - NEW.status_id := claim_tentative_status_id; + --check if claim fits or has conflicts + SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(NEW) INTO claim_has_conflicts; + + IF claim_has_conflicts THEN + IF NEW.status_id <> claim_conflict_status_id THEN + -- only set claims to conflict if task status <= queued + -- when a claim goes to conflict, then so does it's task, and we don't want that for running/finished/aborted tasks + IF EXISTS (SELECT 1 FROM resource_allocation.task + WHERE id=NEW.task_id + AND status_id = ANY(ARRAY[task_approved_status_id, task_conflict_status_id, task_prescheduled_status_id, task_scheduled_status_id, 
task_queued_status_id])) THEN -- hardcoded tasks statuses <= queued + -- conflict with others, so set claim status to conflict + NEW.status_id := claim_conflict_status_id; END IF; END IF; + ELSE + -- no conflict (anymore) with others, so set claim status to tentative if currently in conflict + IF NEW.status_id = claim_conflict_status_id THEN + NEW.status_id := claim_tentative_status_id; + END IF; END IF; - IF TG_OP = 'INSERT' OR TG_OP = 'UPDATE' THEN - --update the resource usages affected by this claim - PERFORM resource_allocation.process_new_claim_into_resource_usages(NEW); - END IF; + --update the resource usages affected by this claim + PERFORM resource_allocation.process_new_claim_into_resource_usages(NEW); END IF; IF TG_OP = 'DELETE' THEN @@ -967,6 +773,8 @@ DECLARE claim_conflict_status_id int := 2; --beware: hard coded instead of lookup for performance task_approved_status_id int := 300; --beware: hard coded instead of lookup for performance task_conflict_status_id int := 335; --beware: hard coded instead of lookup for performance + task_finished_status_id int := 1000; --beware: hard coded instead of lookup for performance + task_aborted_status_id int := 1100; --beware: hard coded instead of lookup for performance affected_claim resource_allocation.resource_claim; claim_has_conflicts boolean; BEGIN @@ -981,6 +789,7 @@ BEGIN IF NOT EXISTS (SELECT id FROM resource_allocation.resource_claim WHERE task_id = NEW.task_id AND status_id = claim_conflict_status_id) THEN + IF NOT EXISTS (SELECT id FROM resource_allocation.task WHERE id = NEW.task_id AND status_id = task_approved_status_id) THEN @@ -1002,14 +811,12 @@ BEGIN AND rc.endtime >= OLD.starttime AND rc.starttime < OLD.endtime LOOP - IF affected_claim.resource_id <> 117 THEN --20180903: skip checking of cep4 storage until JIRA SW-35 is solved. 
- --check if claim fits or has conflicts - SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(affected_claim) INTO claim_has_conflicts; + --check if claim fits or has conflicts + SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(affected_claim) INTO claim_has_conflicts; - IF NOT claim_has_conflicts THEN - -- no conflict (anymore) with others, so set claim status to tentative - UPDATE resource_allocation.resource_claim SET status_id=claim_tentative_status_id WHERE id = affected_claim.id; - END IF; + IF NOT claim_has_conflicts THEN + -- no conflict (anymore) with others, so set claim status to tentative + UPDATE resource_allocation.resource_claim SET status_id=claim_tentative_status_id WHERE id = affected_claim.id; END IF; END LOOP; END IF; @@ -1025,18 +832,31 @@ BEGIN AND rc.endtime >= NEW.starttime AND rc.starttime < NEW.endtime LOOP - IF affected_claim.resource_id <> 117 THEN --20180903: skip checking of cep4 storage until JIRA SW-35 is solved. - --check if claim fits or has conflicts - SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(affected_claim) INTO claim_has_conflicts; + --check if claim fits or has conflicts + SELECT * FROM resource_allocation.has_conflict_with_overlapping_claims(affected_claim) INTO claim_has_conflicts; - IF claim_has_conflicts THEN - -- new conflict for affected_claim because this NEW claim is now claimed - UPDATE resource_allocation.resource_claim SET status_id=claim_conflict_status_id WHERE id = affected_claim.id; - END IF; + IF claim_has_conflicts THEN + -- new conflict for affected_claim because this NEW claim is now claimed + UPDATE resource_allocation.resource_claim SET status_id=claim_conflict_status_id WHERE id = affected_claim.id; END IF; END LOOP; END IF; + IF TG_OP = 'UPDATE' THEN + -- delete obsolete claim when task is finished/aborted + IF NEW.status_id = claim_claimed_status_id AND + NEW.endtime <> OLD.endtime AND + NEW.endtime <= (SELECT * FROM utcnow() LIMIT 
1) THEN + -- this claim is obsolete... + IF EXISTS (SELECT id FROM resource_allocation.task t WHERE t.id = NEW.task_id AND t.status_id IN (task_finished_status_id, task_aborted_status_id) ) THEN + -- ...and it's task is finished/aborted + -- so, delete this claim + DELETE FROM resource_allocation.resource_claim rc + WHERE rc.id=NEW.id; + END IF; + END IF; + END IF; + IF TG_OP = 'DELETE' THEN RETURN OLD; END IF; diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_resource_allocation_statics.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_resource_allocation_statics.sql index e56bd04d19edfee931c0222d488011d4c306fd92..8b18e25d296e969da10180d5c5567346e3fefafd 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_resource_allocation_statics.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/add_resource_allocation_statics.sql @@ -2,6 +2,8 @@ -- psql resourceassignment -U resourceassignment -f add_resource_allocation_statics.sql -W BEGIN; + +-- General Warning: if you change any of these values, make sure you update the hardcoded values in add_functions_and_triggers.sql too. INSERT INTO resource_allocation.task_status VALUES (200, 'prepared'), (300, 'approved'), (320, 'on_hold'), (335, 'conflict'), (350, 'prescheduled'), (400, 'scheduled'), (500, 'queued'), (600, 'active'), (900, 'completing'), (1000, 'finished'), (1100, 'aborted'), (1150, 'error'), (1200, 'obsolete'); -- This is the list from OTDB, we'll need to merge it with the list from MoM in the future, might use different indexes? 
diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_database.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_database.sql index 40337a9dc5f0bebe742f0ca55cda5d47dd44d6c9..a82dfe3779bc0bd495cdef58acc7d4601daeb4ca 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_database.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/radb/sql/create_database.sql @@ -33,6 +33,7 @@ DROP TABLE IF EXISTS resource_monitoring.resource_group_availability CASCADE; DROP TABLE IF EXISTS resource_monitoring.resource_availability CASCADE; DROP TABLE IF EXISTS resource_monitoring.resource_capacity CASCADE; DROP TABLE IF EXISTS resource_allocation.resource_usage CASCADE; +DROP TABLE IF EXISTS resource_allocation.resource_usage_delta CASCADE; DROP TABLE IF EXISTS resource_allocation.resource_claim_property CASCADE; DROP TABLE IF EXISTS resource_allocation.resource_claim_property_type CASCADE; DROP TABLE IF EXISTS resource_allocation.resource_claim_property_io_type CASCADE; @@ -251,14 +252,6 @@ CREATE INDEX resource_claim_resource_id_idx CREATE INDEX resource_claim_status_id_idx ON resource_allocation.resource_claim (status_id); -CREATE TABLE resource_allocation._rebuild_usages_active_claims ( LIKE resource_allocation.resource_claim INCLUDING INDEXES INCLUDING CONSTRAINTS ); -DROP INDEX resource_allocation._rebuild_usages_active_claims_resource_id_idx; --remove unnecessary index -DROP INDEX resource_allocation._rebuild_usages_active_claims_status_id_idx; --remove unnecessary index -DROP INDEX resource_allocation._rebuild_usages_active_claims_task_id_idx; --remove unnecessary index -ALTER TABLE resource_allocation._rebuild_usages_active_claims - OWNER TO resourceassignment; -COMMENT ON TABLE resource_allocation._rebuild_usages_active_claims - IS 'helper table for the rebuild_resource_usages_from_claims_for_resource_of_status method.'; CREATE TABLE resource_allocation.conflict_reason ( id serial NOT NULL, @@ 
-359,13 +352,30 @@ CREATE INDEX resource_usage_resource_id_idx CREATE INDEX resource_usage_status_id_idx ON resource_allocation.resource_usage (status_id); -CREATE TABLE resource_allocation._rebuild_usages_active_usages ( LIKE resource_allocation.resource_usage INCLUDING INDEXES INCLUDING CONSTRAINTS ); -DROP INDEX resource_allocation._rebuild_usages_active_usages_status_id_idx; --remove unnecessary index -DROP INDEX resource_allocation._rebuild_usages_active_usages_resource_id_idx; --remove unnecessary index -ALTER TABLE resource_allocation._rebuild_usages_active_usages +CREATE TABLE resource_allocation.resource_usage_delta ( + id serial NOT NULL, + claim_id integer NOT NULL, -- yes, this is a reference to resource_allocation.resource_claim.id, but it's not defined as a reference because it is already used in the before_insert trigger when the claim id does not exist in the claim table yet. We do the consistent bookkeeping in the trigger functions ourselves. + resource_id integer NOT NULL REFERENCES virtual_instrument.resource ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE, + status_id integer NOT NULL REFERENCES resource_allocation.resource_claim_status ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE, + moment timestamp NOT NULL, + delta bigint NOT NULL, + PRIMARY KEY (id) +) WITH (OIDS=FALSE); +ALTER TABLE resource_allocation.resource_usage_delta OWNER TO resourceassignment; -COMMENT ON TABLE resource_allocation._rebuild_usages_active_usages - IS 'helper table for the rebuild_resource_usages_from_claims_for_resource_of_status method.'; +COMMENT ON TABLE resource_allocation.resource_usage_delta + IS 'intermediate helper table to quickly compute resource_usage from resource_claim.'; + +CREATE INDEX resource_usage_delta_moment_idx + ON resource_allocation.resource_usage_delta + USING btree (moment); + +CREATE INDEX resource_usage_delta_idx + ON resource_allocation.resource_usage_delta (claim_id, resource_id, status_id); + + + + CREATE TABLE 
resource_monitoring.resource_availability ( id serial NOT NULL, diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/CMakeLists.txt b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/CMakeLists.txt index 439c9ccb70041b11f720e5bdb0aaebdd9b4f2a6b..7bc26cd3333a391eebc200387ef92a1e2cae8961 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/CMakeLists.txt +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/CMakeLists.txt @@ -5,5 +5,6 @@ include(FindPythonModule) find_python_module(testing.postgresql) find_python_module(mock) -lofar_add_test(t_radb) +lofar_add_test(t_radb_functionality) +lofar_add_test(t_radb_performance) diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_performance_test.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_performance_test.py deleted file mode 100755 index 300e97077358c0d3824f45b153b4a922a2e22a9c..0000000000000000000000000000000000000000 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/radb_performance_test.py +++ /dev/null @@ -1,113 +0,0 @@ -#!/usr/bin/python - -# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) -# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands -# -# This file is part of the LOFAR software suite. -# The LOFAR software suite is free software: you can redistribute it and/or -# modify it under the terms of the GNU General Public License as published -# by the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. -# -# The LOFAR software suite is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License along -# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. 
- -# $Id: $ -from optparse import OptionParser -import os -from datetime import datetime, timedelta -import logging -from random import randint - -logger = logging.getLogger(__name__) - -from lofar.common import dbcredentials -from lofar.sas.resourceassignment.database.radb import RADatabase -from lofar.common.datetimeutils import totalSeconds - -def test_resource_usages_performance(radb): - radb.updateResourceAvailability(117, available_capacity=10000000, total_capacity=10000000) - - now = datetime.utcnow() - now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour - spec_ids = [] - filename = 'resource_usages_performance%s.csv' % (datetime.utcnow().strftime('%Y%m%dT%H%M%S'),) - with open(filename, 'w') as file: - file.write('#claims, elapsed_insert, elapsed_rebuild\n') - counter = 0 - for k in range(20): - num_claims_to_insert = 20 - num_insert_repeats = 10 - elapsed_insert = 0 - for i in range(num_insert_repeats): - counter += 1 - result = radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation', - now+timedelta(hours=3*counter), - now + timedelta(hours=1 + 3*counter), - 'content', 'CEP4') - task_id = result['task_id'] - task = radb.getTask(task_id) - spec_ids.append(task['specification_id']) - - claims = [{'resource_id': 117, - 'starttime': task['starttime']-timedelta(minutes=randint(0, 1800)), - 'endtime': task['starttime']+timedelta(seconds=randint(1801, 3600)), - 'status': 'tentative', - 'claim_size': q} - for q in range(num_claims_to_insert)] - - start = datetime.utcnow() - radb.insertResourceClaims(task_id, claims, 'foo', 1, 1) - elapsed_insert += totalSeconds(datetime.utcnow() - start) - elapsed_insert /= 10 - - start = datetime.utcnow() - # make sure the usage table is wiped, so asserts fail when rebuild_resource_usages_from_claims is erroneously roll'ed back. 
- radb.rebuild_resource_usages_from_claims(117, 'tentative') - elapsed_rebuild = totalSeconds(datetime.utcnow() - start) - - logger.info('TEST RESULT: radb now contains %d claims, insert of %d claims takes on average %.3fsec and a rebuild of the whole usage table takes %.3fsec', - len(radb.getResourceClaims()), num_claims_to_insert, elapsed_insert, elapsed_rebuild) - file.write('%d, %.3f, %.3f\n' % (len(radb.getResourceClaims()), elapsed_insert, elapsed_rebuild)) - file.flush() - - logger.info('removing all test specs/tasks/claims from db') - - for spec_id in spec_ids: - radb.deleteSpecification(spec_id) - - logger.info('Done. Results can be found in file: %s', filename) - -if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',level=logging.INFO) - - # Check the invocation arguments - parser = OptionParser("%prog [options]", description='runs some test queries on the radb') - parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging') - parser.add_option_group(dbcredentials.options_group(parser)) - parser.set_defaults(dbcredentials="RADB") - (options, args) = parser.parse_args() - - dbcreds = dbcredentials.parse_options(options) - - print - print 'Using dbcreds: %s' % dbcreds.stringWithHiddenPassword() - print 'Are you sure you want to run the performance tests on this database? Tables will be modified! Precious data might be lost!' - print 'This test gives the most reproducable results when run on a clean database.' - print - answer = raw_input('CONTINUE? y/<n>: ') - if 'y' not in answer.lower(): - print 'Exiting without running the test...' - exit(1) - - print 'Starting test....' 
- radb = RADatabase(dbcreds=dbcreds, log_queries=options.verbose) - - test_resource_usages_performance(radb) - - diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.run b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.run deleted file mode 100755 index 7895b4e99028fba6866f5d359567fe3a9483f912..0000000000000000000000000000000000000000 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.run +++ /dev/null @@ -1,5 +0,0 @@ -#!/bin/bash - -# Run the unit test -source python-coverage.sh -python_coverage_test "ResourceAssignmentDatabase/*" t_radb.py diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.sh b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.sh deleted file mode 100755 index d7aa7a173a26cde54d69dbbfffd7d825a2450292..0000000000000000000000000000000000000000 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/sh - -./runctest.sh t_radb diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.py similarity index 93% rename from SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py rename to SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.py index 98170ed7993acf3371c59071338484814d4f1259..26ae9f3ad8b655717bb149a402e391cfa062f416 100755 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb.py +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.py @@ -1055,15 +1055,15 @@ class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest): # when a task is > queued (for example, finished) # then we don't want conflict statuses anymore if we update start/endtimes # test here with weird starttime shift back to overlap with task1 - self.assertTrue(self.radb.updateTask(task_id2, task_status='finished')) - 
self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertTrue(self.radb.updateTask(task_id2, task_status='active')) + self.assertEqual('active', self.radb.getTask(task_id2)['status']) self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=task1['starttime'])) - self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertEqual('active', self.radb.getTask(task_id2)['status']) self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) #ok, that works, now set the start/end time back to 'normal' for some later test cases self.assertTrue(self.radb.updateTaskAndResourceClaims(task_id2, starttime=now+timedelta(hours=2), endtime=now+timedelta(hours=3))) - self.assertEqual('finished', self.radb.getTask(task_id2)['status']) + self.assertEqual('active', self.radb.getTask(task_id2)['status']) self.assertEqual('claimed', self.radb.getResourceClaim(t2_claim_ids[0])['status']) @@ -1689,37 +1689,36 @@ class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest): # assume the data has NOT been written YET, and that the claim of size 60 does NOT occupy 60 YET on the resource # this happens during the observation. Data is being written, but the system does not know it yet. 
- # then there should be still be 100-nothing=100 claimable_capacity left - # self.assertEqual(100, self.radb.get_resource_claimable_capacity(cep4_id, task['starttime'], task['endtime'])) + # then there should be still be 100-60(claimed)-nothing=40 claimable_capacity left + self.assertEqual(40, self.radb.get_resource_claimable_capacity(cep4_id, task['starttime'], task['endtime'])) # assume the observation finished, and data has been written # so, the claim of size 60 now occupies 60 on the resource # that would be detected (by the storagequeryservice) and propagated into the radb # so, let's update the available_capacity self.assertTrue(self.radb.updateResourceAvailability(cep4_id, available_capacity=100-60)) - # and there should be 100-60=40 claimable_capacity left - self.assertEqual(40, self.radb.get_resource_claimable_capacity(cep4_id, task['starttime'], task['endtime'])) - # check the capacities of the resource # please note that the misc_used_capacity=0 because we used exactly the same amount of diskspace as was claimed (as it should be) self.assertEqual(100, self.radb.getResources(cep4_id, include_availability=True)[0]['total_capacity']) self.assertEqual( 40, self.radb.getResources(cep4_id, include_availability=True)[0]['available_capacity']) self.assertEqual( 60, self.radb.getResources(cep4_id, include_availability=True)[0]['used_capacity']) self.assertEqual( 0, self.radb.getResources(cep4_id, include_availability=True)[0]['misc_used_capacity']) + # and there should be 100-60=40 claimable_capacity left, because there is 60 claimed and 60 used (which matches fine, like it should) + self.assertEqual(40, self.radb.get_resource_claimable_capacity(cep4_id, task['starttime'], task['endtime'])) # so far, so good... # now onto the situation in practice.... 
- # suppose there is some additional (20) miscelaneous data on cep4, which is not known in claims (like backups/logs/other_data) - # this should be reflected in the available_capacity and misc_used_capacity + # suppose there is some additional (20) miscellaneous data on cep4, which is not known in claims (like backups/logs/other_data) + # this should be reflected in the available_capacity and misc_used_capacity # available_capacity = 100-60-20 : 60 is claimed and in use, and 20 is other unaccounted for data. self.assertTrue(self.radb.updateResourceAvailability(cep4_id, available_capacity=100-60-20)) self.assertEqual(100, self.radb.getResources(cep4_id, include_availability=True)[0]['total_capacity']) self.assertEqual( 20, self.radb.getResources(cep4_id, include_availability=True)[0]['available_capacity']) self.assertEqual( 80, self.radb.getResources(cep4_id, include_availability=True)[0]['used_capacity']) - # and the used_capacity of 70 should be build up of the parts: resource_usage=50 and misc_used_capacity=20 + # and the used_capacity of 60 should be build up of the parts: resource_usage=60 and misc_used_capacity=20 self.assertEqual( 60, self.radb.get_resource_usage_at_or_before(cep4_id, start+timedelta(hours=0.5), 'claimed')['usage']) self.assertEqual( 20, self.radb.getResources(cep4_id, include_availability=True)[0]['misc_used_capacity']) - # and the resource_usage should still be 50 (cause no claims were added/changed) + # and the resource_usage should still be 60 (because no claims were added/changed) self.assertEqual(60, self.radb.get_max_resource_usage_between(cep4_id, task['starttime'], task['endtime'], 'claimed')['usage']) self.assertEqual(60, self.radb.get_current_resource_usage(cep4_id, 'claimed')['usage']) # but, there should be less claimable capacity left: 100 -60 (claim) -20 (misc_data) = 20 claimable_capacity left @@ -2477,8 +2476,159 @@ class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest): self.assertEqual('approved', 
self.radb.getTask(task1_id)['status']) self.assertEqual('approved', self.radb.getTask(task2_id)['status']) + def test_obsolete_claims_are_removed(self): + '''Test if obsolete claims from finished tasks are removed automatically''' + # start with clean database + for spec in self.radb.getSpecifications(): + self.radb.deleteSpecification(spec['id']) # cascades into tasks and claims + + base_time = datetime.utcnow() + # round to current full hour (for readability in logging) + base_time = base_time - timedelta(minutes=base_time.minute, seconds=base_time.second, + microseconds=base_time.microsecond) + + # insert a first task and full claim on a resource... + spec_task = self.radb.insertSpecificationAndTask(0, 0, 'approved', 'observation', + base_time + timedelta(minutes=-20), + base_time + timedelta(minutes=-10), 'foo', 'CEP4') + self.assertTrue(spec_task['inserted']) + task_id = spec_task['task_id'] + task = self.radb.getTask(task_id) + self.assertEqual('approved', task['status']) + + claim1_id = self.radb.insertResourceClaim(0, task_id, + task['starttime'], task['endtime'], + 1, 'foo', 1) + + # insert second (long-lasting) claim + claim2_id = self.radb.insertResourceClaim(1, task_id, + task['starttime'], task['endtime'] + timedelta(days=100), + 1, 'foo', 1) + + # task should have the two inserted claims + self.assertEqual(set([claim1_id, claim2_id]), set(c['id'] for c in self.radb.getResourceClaims(task_ids=task_id))) + + # claim them, and check it. Should succeed. 
+ self.radb.updateTaskAndResourceClaims(task_id, task_status='scheduled', claim_status='claimed') + self.assertEqual('claimed', self.radb.getResourceClaim(claim1_id)['status']) + self.assertEqual('claimed', self.radb.getResourceClaim(claim2_id)['status']) + self.assertEqual('scheduled', self.radb.getTask(task_id)['status']) + # task should still have the two inserted claims + self.assertEqual(set([claim1_id, claim2_id]), set(c['id'] for c in self.radb.getResourceClaims(task_ids=task_id))) + + # now, let's do the actual test... + # finish the task, and check if claims are removed + self.radb.updateTask(task_id, task_status='finished') + self.assertEqual('finished', self.radb.getTask(task_id)['status']) + # only the long lasting claim2 should remain + self.assertEqual(set([claim2_id]), set(c['id'] for c in self.radb.getResourceClaims(task_ids=task_id))) + + # end the long-lasting claim + self.radb.updateResourceClaim(claim2_id, endtime=task['endtime'] + timedelta(minutes=5)) + # task should still be finished... 
+ self.radb.updateTask(task_id, task_status='finished') + # ...and now claims should remain + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id))) + + def test_20181108_bugfix_resource_usages(self): + # start with clean database + for spec in self.radb.getSpecifications(): + self.radb.deleteSpecification(spec['id']) # cascades into tasks and claims + + now = datetime.utcnow() + now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour + now += timedelta(hours=3) + + NUM_CLAIMS = 2 + NUM_CLAIMS_PER_RESOURCE = 2 + RESOURCE_ID = 0 + resource_max_cap = long(self.radb.get_resource_claimable_capacity(RESOURCE_ID, now, now)) + + task1_id = self.radb.insertSpecificationAndTask(1, 1, 'approved', 'observation', + now+timedelta(hours=1), + now + timedelta(hours=2), + 'content', 'CEP4')['task_id'] + task1 = self.radb.getTask(task1_id) + + claims1 = [{'resource_id': RESOURCE_ID, + 'starttime': task1['starttime'], + 'endtime': task1['endtime'], + 'status': 'tentative', + 'claim_size': resource_max_cap/NUM_CLAIMS_PER_RESOURCE} + for _ in range(NUM_CLAIMS)] + + self.radb.insertResourceClaims(task1_id, claims1, 'foo', 1, 1) + + # there should be NUM_CLAIMS tentative claims, + # and usage should be one 'block' from start->endtime + self.assertEqual(NUM_CLAIMS, len(self.radb.getResourceClaims(task_ids=task1_id, status='tentative'))) + self.assertEqual([{'as_of_timestamp': task1['starttime'], 'usage': resource_max_cap }, + {'as_of_timestamp': task1['endtime'], 'usage': 0L}], + self.radb.getResourceUsages(task1['starttime'], task1['endtime'], RESOURCE_ID)[RESOURCE_ID]['tentative']) + + # update the claims to 'claimed' status + self.radb.updateResourceClaims(where_task_ids=task1_id, status='claimed') + + # now, there should be zero tentative claims, but NUM_CLAIMS 'claimed' claims + # and usage should be one 'block' from start->endtime for claimed status + self.assertEqual(0, 
len(self.radb.getResourceClaims(task_ids=task1_id, status='tentative'))) + self.assertEqual(NUM_CLAIMS, len(self.radb.getResourceClaims(task_ids=task1_id, status='claimed'))) + # self.assertEqual([], + # self.radb.getResourceUsages(task1['starttime'], task1['endtime'], RESOURCE_ID)[RESOURCE_ID]['tentative']) + self.assertEqual([{'as_of_timestamp': task1['starttime'], 'usage': resource_max_cap }, + {'as_of_timestamp': task1['endtime'], 'usage': 0L}], + self.radb.getResourceUsages(task1['starttime'], task1['endtime'], RESOURCE_ID)[RESOURCE_ID]['claimed']) + + # finish the task... + self.radb.updateTask(task_id=task1_id, task_status='finished') + + # ... as a result there should be no more claims, and usages should be clean + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task1_id))) + self.assertEqual([], + self.radb.getResourceUsages(task1['starttime'], task1['endtime'], RESOURCE_ID)[RESOURCE_ID]['tentative']) + self.assertEqual([], + self.radb.getResourceUsages(task1['starttime'], task1['endtime'], RESOURCE_ID)[RESOURCE_ID]['claimed']) + + # insert second task after the first one (not overlapping) + task2_id = self.radb.insertSpecificationAndTask(2, 2, 'approved', 'observation', + now + timedelta(hours=3), + now + timedelta(hours=4), + 'content', 'CEP4')['task_id'] + task2 = self.radb.getTask(task2_id) + + # and insert claims for the second task + claims2 = [{'resource_id': RESOURCE_ID, + 'starttime': task2['starttime'], + 'endtime': task2['endtime'], + 'status': 'tentative', + 'claim_size': resource_max_cap / NUM_CLAIMS_PER_RESOURCE} + for _ in range(NUM_CLAIMS)] + + self.radb.insertResourceClaims(task2_id, claims2, 'foo', 1, 1) + + # there should be NUM_CLAIMS tentative claims, + # and usage should be one 'block' from start->endtime + self.assertEqual(NUM_CLAIMS, len(self.radb.getResourceClaims(task_ids=task2_id, status='tentative'))) + self.assertEqual([{'as_of_timestamp': task2['starttime'], 'usage': resource_max_cap }, + {'as_of_timestamp': 
task2['endtime'], 'usage': 0L}], + self.radb.getResourceUsages(task2['starttime'], task2['endtime'], RESOURCE_ID)[RESOURCE_ID]['tentative']) + + # update the claims to 'claimed' status + self.radb.updateResourceClaims(where_task_ids=task2_id, status='claimed') + + # now, there should be zero tentative claims, but NUM_CLAIMS 'claimed' claims + # and usage should be one 'block' from start->endtime for claimed status + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task2_id, status='tentative'))) + self.assertEqual(NUM_CLAIMS, len(self.radb.getResourceClaims(task_ids=task2_id, status='claimed'))) + # self.assertEqual([], + # self.radb.getResourceUsages(task2['starttime'], task2['endtime'], RESOURCE_ID)[RESOURCE_ID]['tentative']) + self.assertEqual([{'as_of_timestamp': task2['starttime'], 'usage': resource_max_cap }, + {'as_of_timestamp': task2['endtime'], 'usage': 0L}], + self.radb.getResourceUsages(task2['starttime'], task2['endtime'], RESOURCE_ID)[RESOURCE_ID]['claimed']) + if __name__ == "__main__": os.environ['TZ'] = 'UTC' logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) unittest.main() + diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.run b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.run new file mode 100755 index 0000000000000000000000000000000000000000..ef3148622ed62d851e5164ad5b9a13405c3f9492 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.run @@ -0,0 +1,5 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "ResourceAssignmentDatabase/*" t_radb_functionality.py diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.sh b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.sh new file mode 100755 index 0000000000000000000000000000000000000000..86b69a5b65c1f3c361a3d4b96a4553fdf05d73e6 --- 
/dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_functionality.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_radb_functionality diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.py b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.py new file mode 100755 index 0000000000000000000000000000000000000000..903a930572ee6700c694397b0ca663985a4a59c3 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.py @@ -0,0 +1,198 @@ +#!/usr/bin/python + +# Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) +# P.O. Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This file is part of the LOFAR software suite. +# The LOFAR software suite is free software: you can redistribute it and/or +# modify it under the terms of the GNU General Public License as published +# by the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# The LOFAR software suite is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License along +# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. 
+ +# $Id: $ +import unittest +import psycopg2 +import os +from datetime import datetime, timedelta +from dateutil import parser +from pprint import pformat +from random import randint +from lofar.common.datetimeutils import totalSeconds +import logging +logger = logging.getLogger(__name__) + + +import radb_common_testing +from lofar.sas.resourceassignment.database.radb import RADatabase, _FETCH_ONE + +def setUpModule(): + return radb_common_testing.setUpModule() + +def tearDownModule(): + return radb_common_testing.tearDownModule() + +class ResourceAssignmentDatabaseTest(radb_common_testing.RADBCommonTest): + def test_resource_usages_performance(self): + ELAPSED_TRESHOLD = 2.0 #max allowed insert/update/delete time in seconds + + num_resources = self.radb._executeQuery('select count(id) from virtual_instrument.resource;', fetch=_FETCH_ONE)['count'] + # make sure all resources have 1000 units available + MAX_CAPACITY=1000 + self.radb._executeQuery('update resource_monitoring.resource_capacity set (available, total) = (%s, %s);', (MAX_CAPACITY,MAX_CAPACITY)) + + # pretend that we have an almost unlimited amount of storage space + self.radb._executeQuery('update resource_monitoring.resource_capacity set (available, total) = (%s, %s) ' \ + 'where resource_id in (select id from virtual_instrument.resource_view where type_name = \'storage\');', + (1e9*MAX_CAPACITY,1e9*MAX_CAPACITY)) + + # keep a list of storage-type resource(ids), so we can create long lasting claims for these. 
+ storage_resource_ids = set(r['id'] for r in self.radb.getResources(resource_types='storage')) + + now = datetime.utcnow() + now -= timedelta(minutes=now.minute, seconds=now.second, microseconds=now.microsecond) # round to full hour + spec_ids = [] + filename = 'resource_usages_performance%s.csv' % (datetime.utcnow().strftime('%Y%m%dT%H%M%S'),) + with open(filename, 'w') as file: + file.write('#tasks, #claims, #claims_per_resource, #inserted_claims, elapsed_insert\n') + counter = 0 + # it is not common to claim a single resource multiple times for the same task, but it can happen, so test for it. + for preferred_num_claims_per_resource in [1, 2, 5, 10, 20, 50]: + # let's test over a feasible range of #claims. A lofar observation usually has ~200 claims. + for num_claims_to_insert in [1, 2, 5, 10, 20, 50, 100, 200, 500]: + num_claims_to_insert = min(num_claims_to_insert, preferred_num_claims_per_resource*num_resources) + num_claims_per_resource = min(preferred_num_claims_per_resource, num_claims_to_insert) + + for oversubscription_factor in [1, 999]: + counter += 1 + + logger.info('*****************************************************************') + logger.info('starting task and claim scheduling: counter=%s num_claims_per_resource=%s num_claims_to_insert=%s oversubscription_factor=%s', + counter, num_claims_per_resource, num_claims_to_insert, oversubscription_factor) + + result = self.radb.insertSpecificationAndTask(counter, counter, 'approved', 'observation', + now+timedelta(hours=3*counter), + now + timedelta(hours=3*counter + 1), + 'content', 'CEP4') + task_id = result['task_id'] + task = self.radb.getTask(task_id) + spec_ids.append(task['specification_id']) + + claims = [{'resource_id': q/num_claims_per_resource, + 'starttime': task['starttime'], + 'endtime': task['endtime'], + 'status': 'tentative', + 'claim_size': oversubscription_factor*MAX_CAPACITY/num_claims_per_resource} + for q in range(num_claims_to_insert)] + + # extend claims on storage resources + 
for claim in claims: + if claim['resource_id'] in storage_resource_ids: + claim['endtime'] += timedelta(days=100) + + start = datetime.utcnow() + self.radb.insertResourceClaims(task_id, claims, 'foo', 1, 1) + elapsed_insert = totalSeconds(datetime.utcnow() - start) + + num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count'] + num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count'] + has_storage_claims = len(self.radb.getResourceClaims(task_ids=task_id, resource_type='storage')) > 0 + + # enforce perfomance criterion: inserting claims should take less than ELAPSED_TRESHOLD sec + self.assertLess(elapsed_insert, ELAPSED_TRESHOLD, msg="insertResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" %( + elapsed_insert, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource)) + + if oversubscription_factor > 1: + # (deliberate) oversubscription of resources + # so, expect the claims and task to be in conflict + self.assertGreater(len(self.radb.getResourceClaims(task_ids=task_id, status='conflict')), 0) + self.assertEqual('conflict', self.radb.getTask(task_id)['status']) + + # solve oversubscription + start = datetime.utcnow() + self.radb.updateResourceClaims(where_task_ids=task_id, claim_size=MAX_CAPACITY/num_claims_per_resource) + elapsed_status_update = totalSeconds(datetime.utcnow() - start) + + # enforce perfomance criterion: updating claims should take less than ELAPSED_TRESHOLD sec + self.assertLess(elapsed_status_update, ELAPSED_TRESHOLD, + msg="updateResourceClaims took longer than allowed. 
(%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % ( + elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource)) + + # check if not oversubscribed anymore + self.assertEqual(0, len(self.radb.getResourceClaims(task_ids=task_id, status='conflict'))) + self.assertEqual('approved', self.radb.getTask(task_id)['status']) + + # no oversubscription (anymore), so expect all claims to be claimable... + start = datetime.utcnow() + self.radb.updateTaskAndResourceClaims(task_id=task_id, claim_status='claimed') + elapsed_status_update = totalSeconds(datetime.utcnow() - start) + + # are they indeed claimed? + self.assertEqual(num_claims_to_insert, len(self.radb.getResourceClaims(task_ids=task_id, status='claimed'))) + + # enforce perfomance criterion: updating claims should take less than 2*ELAPSED_TRESHOLD sec (2* because we update both tasks and claims) + self.assertLess(elapsed_status_update, 2*ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. (%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s" % ( + elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource)) + + # ... and proceed with cycling through the task status + for task_status in ['scheduled', 'queued', 'active', 'completing', 'finished']: + # update the task status + start = datetime.utcnow() + self.radb.updateTaskAndResourceClaims(task_id=task_id, task_status=task_status) + elapsed_status_update = totalSeconds(datetime.utcnow() - start) + + # enforce perfomance criterion: updating task status should take less than 2*ELAPSED_TRESHOLD sec (2* because we update both tasks and claims) + self.assertLess(elapsed_status_update, 2*ELAPSED_TRESHOLD, msg="updateTaskAndResourceClaims took longer than allowed. 
(%ssec > %ssec) num_tasks=%s num_claims=%s num_claims_to_insert=%s num_claims_per_resource=%s task_status=%s" % ( + elapsed_status_update, ELAPSED_TRESHOLD, num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, task_status)) + + # check task status + self.assertEqual(task_status, self.radb.getTask(task_id)['status']) + + # task should now be finished + self.assertEqual('finished', self.radb.getTask(task_id)['status']) + # and all non-long-lasting (storage) claims should be removed. + self.assertEqual(0, len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] <= task['endtime']))) + + if has_storage_claims: + # and all long-lasting (storage) claims should still be there. + # (they are removed by the cleanupservice ending/removing the storage claims) + self.assertGreater(len(list(c for c in self.radb.getResourceClaims(task_ids=task_id) if c['endtime'] > task['endtime'])), 0) + + logger.info('TEST RESULT: radb now contains %d tasks, %d claims, insert of %d claims with %d claims per resource takes on average %.3fsec', + num_tasks, num_claims, num_claims_to_insert, num_claims_per_resource, elapsed_insert) + file.write('%d, %d, %d, %d, %.3f\n' % (num_tasks, num_claims, num_claims_per_resource, num_claims_to_insert, elapsed_insert)) + file.flush() + + logger.info('removing all test specs/tasks/claims from db') + delete_elapsed_list = [] + + file.write('\n\n#tasks, #claims, elapsed_delete\n') + + for spec_id in spec_ids: + num_tasks = self.radb._executeQuery('select count(id) from resource_allocation.task;', fetch=_FETCH_ONE)['count'] + num_claims = self.radb._executeQuery('select count(id) from resource_allocation.resource_claim;', fetch=_FETCH_ONE)['count'] + start = datetime.utcnow() + self.radb.deleteSpecification(spec_id) + elapsed = totalSeconds(datetime.utcnow() - start) + delete_elapsed_list.append(elapsed) + + # enforce perfomance criterion: (cascading) delete of spec should take less than ELAPSED_TRESHOLD sec + 
self.assertLess(elapsed, ELAPSED_TRESHOLD) + + file.write('%d, %d, %.3f\n' % (num_tasks, num_claims, elapsed)) + file.flush() + + logger.info('average spec delete time: %.3f', sum(delete_elapsed_list)/float(len(delete_elapsed_list))) + logger.info('Done. Results can be found in file: %s', filename) + +if __name__ == "__main__": + os.environ['TZ'] = 'UTC' + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) + unittest.main() diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.run b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.run new file mode 100755 index 0000000000000000000000000000000000000000..ca855f9d9e93e0c79e069b1c00053666b66a80ce --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.run @@ -0,0 +1,5 @@ +#!/bin/bash + +# Run the unit test +source python-coverage.sh +python_coverage_test "ResourceAssignmentDatabase/*" t_radb_performance.py diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.sh b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.sh new file mode 100755 index 0000000000000000000000000000000000000000..a6bb3bf30765da05105acd8340ea52c6f1dbf0e5 --- /dev/null +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/tests/t_radb_performance.sh @@ -0,0 +1,3 @@ +#!/bin/sh + +./runctest.sh t_radb_performance