#!/usr/bin/python # Copyright (C) 2012-2015 ASTRON (Netherlands Institute for Radio Astronomy) # P.O. Box 2, 7990 AA Dwingeloo, The Netherlands # # This file is part of the LOFAR software suite. # The LOFAR software suite is free software: you can redistribute it and/or # modify it under the terms of the GNU General Public License as published # by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # The LOFAR software suite is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License along # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>. # $Id$ ''' TODO: documentation ''' import logging import psycopg2 import psycopg2.extras from datetime import datetime, timedelta import time import collections from optparse import OptionParser from lofar.common import dbcredentials from lofar.common.util import to_csv_string from lofar.common.datetimeutils import totalSeconds logger = logging.getLogger(__name__) _FETCH_NONE=0 _FETCH_ONE=1 _FETCH_ALL=2 class RADatabase: def __init__(self, dbcreds=None, log_queries=False): self.dbcreds = dbcreds self.conn = None self.cursor = None self.log_queries = log_queries self._taskStatusName2IdCache = {} self._taskTypeName2IdCache = {} self._claimStatusName2IdCache = {} self._claimStatusId2NameCache = {} def _connect(self): self.conn = None self.cursor = None self.conn = psycopg2.connect(host=self.dbcreds.host, user=self.dbcreds.user, password=self.dbcreds.password, database=self.dbcreds.database, port=self.dbcreds.port, connect_timeout=5) self.cursor = self.conn.cursor(cursor_factory = psycopg2.extras.RealDictCursor) def _queryAsSingleLine(self, query, qargs=None): line = ' '.join(query.replace('\n', ' ').split()) if qargs: line = line % tuple(qargs) return line def _executeQuery(self, query, qargs=None, fetch=_FETCH_NONE): ''' Execute the query and reconnect upon OperationalError ''' # Allow for 5 connection retries for i in range(5): try: start = datetime.utcnow() self.cursor.execute(query, qargs) if self.log_queries: elapsed = datetime.utcnow() - start elapsed_ms = 1000.0 * totalSeconds(elapsed) logger.info('executed query in %.1fms%s yielding %s rows: %s', elapsed_ms, ' (SLOW!)' if elapsed_ms > 250 else '', # for easy log grep'ing self.cursor.rowcount, self._queryAsSingleLine(query, qargs)) break except (psycopg2.OperationalError, AttributeError) as e: if isinstance(e, psycopg2.OperationalError): logger.error(str(e)) logger.info("(re)trying to connect to radb") self._connect() if self.conn: logger.info("connected to radb") time.sleep(i*i) except (psycopg2.IntegrityError, psycopg2.ProgrammingError, psycopg2.InternalError, psycopg2.DataError)as e: self._log_database_notifications() logger.error("Rolling back query=\'%s\' due to error: \'%s\'" % (self._queryAsSingleLine(query, qargs), e)) self.rollback() return [] self._log_database_notifications() if fetch == _FETCH_ONE: return self.cursor.fetchone() if fetch == _FETCH_ALL: return self.cursor.fetchall() def _log_database_notifications(self): if self.conn.notices: for notice in self.conn.notices: logger.info('database log message %s', notice.strip()) del self.conn.notices[:] def commit(self): logger.info('commit') self.conn.commit() def rollback(self): logger.info('rollback') self.conn.rollback() def getTaskStatuses(self): query = '''SELECT * from resource_allocation.task_status;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getTaskStatusNames(self): return [x['name'] for x in self.getTaskStatuses()] def getTaskStatusId(self, status_name, from_cache=True): if from_cache and status_name in self._taskStatusName2IdCache: return self._taskStatusName2IdCache[status_name] query = '''SELECT id from resource_allocation.task_status WHERE name = %s;''' result = self._executeQuery(query, [status_name], fetch=_FETCH_ONE) if result: self._taskStatusName2IdCache[status_name] = result['id'] return result['id'] raise KeyError('No such status: %s Valid values are: %s' % (status_name, ', '.join(self.getTaskStatusNames()))) def getTaskTypes(self): query = '''SELECT * from resource_allocation.task_type;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getTaskTypeNames(self): return [x['name'] for x in self.getTaskTypes()] def getTaskTypeId(self, type_name, from_cache=True): if from_cache and type_name in self._taskTypeName2IdCache: return self._taskTypeName2IdCache[type_name] query = '''SELECT id from resource_allocation.task_type WHERE name = %s;''' result = self._executeQuery(query, [type_name], fetch=_FETCH_ONE) if result: self._taskTypeName2IdCache[type_name] = result['id'] return result['id'] raise KeyError('No such type: %s Valid values are: %s' % (type_name, ', '.join(self.getTaskTypeNames()))) def getResourceClaimStatuses(self): query = '''SELECT * from resource_allocation.resource_claim_status;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceClaimStatusNames(self): return [x['name'] for x in self.getResourceClaimStatuses()] def getResourceClaimStatusId(self, status_name, from_cache=True): if from_cache and status_name in self._claimStatusName2IdCache: return self._claimStatusName2IdCache[status_name] query = '''SELECT id from resource_allocation.resource_claim_status WHERE name = %s;''' result = self._executeQuery(query, [status_name], fetch=_FETCH_ONE) if result: self._claimStatusName2IdCache[status_name] = result['id'] self._claimStatusId2NameCache[result['id']] = status_name return result['id'] raise KeyError('No such status: %s. Valid values are: %s' % (status_name, ', '.join(self.getResourceClaimStatusNames()))) def getResourceClaimStatusName(self, status_id, from_cache=True): if from_cache and status_id in self._claimStatusId2NameCache: return self._claimStatusId2NameCache[status_id] query = '''SELECT name from resource_allocation.resource_claim_status WHERE id = %s;''' result = self._executeQuery(query, [status_id], fetch=_FETCH_ONE) if result: self._claimStatusId2NameCache[status_id] = result['name'] self._claimStatusName2IdCache[result['name']] = status_id return result['name'] raise KeyError('No such status_id: %s. Valid values are: %s' % (status_id, ', '.join([x['id'] for x in self.getResourceClaimStatuses()]))) def getTasksTimeWindow(self, task_ids=None, mom_ids=None, otdb_ids=None): if len([x for x in [task_ids, mom_ids, otdb_ids] if x != None]) > 1: raise KeyError("Provide either task_ids or mom_ids or otdb_ids, not multiple kinds.") query = '''SELECT min(starttime) as min_starttime, max(endtime) as max_endtime from resource_allocation.task_view''' conditions = [] qargs = [] if task_ids is not None: if isinstance(task_ids, int): # just a single id conditions.append('id = %s') qargs.append(task_ids) elif len(task_ids) > 0: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(task_ids)) elif len(task_ids) == 0: #assume a list/enumerable of id's, length 0 return [] if mom_ids is not None: if isinstance(mom_ids, int): # just a single id conditions.append('mom_id = %s') qargs.append(mom_ids) elif len(mom_ids) > 0: #assume a list/enumerable of id's conditions.append('mom_id in %s') qargs.append(tuple(mom_ids)) elif len(mom_ids) == 0: #assume a list/enumerable of id's, length 0 return [] if otdb_ids is not None: if isinstance(otdb_ids, int): # just a single id conditions.append('otdb_id = %s') qargs.append(otdb_ids) elif len(otdb_ids) > 0: #assume a list/enumerable of id's conditions.append('otdb_id in %s') qargs.append(tuple(otdb_ids)) elif len(otdb_ids) == 0: #assume a list/enumerable of id's, length 0 return [] if conditions: query += ' WHERE ' + ' AND '.join(conditions) result = self._executeQuery(query, qargs, fetch=_FETCH_ALL) if result and len(result) == 1: result = dict(result[0]) else: result = {'max_endtime': datetime.utcnow(), 'min_starttime': datetime.utcnow()} return result def getTasks(self, lower_bound=None, upper_bound=None, task_ids=None, task_status=None, task_type=None, mom_ids=None, otdb_ids=None, cluster=None): if len([x for x in [task_ids, mom_ids, otdb_ids] if x != None]) > 1: raise KeyError("Provide either task_ids or mom_ids or otdb_ids, not multiple kinds.") query = '''SELECT * from resource_allocation.task_view''' conditions = [] qargs = [] if lower_bound is not None: conditions.append('endtime >= %s') qargs.append(lower_bound) if upper_bound is not None: conditions.append('starttime <= %s') qargs.append(upper_bound) if task_ids is not None: if isinstance(task_ids, int): # just a single id conditions.append('id = %s') qargs.append(task_ids) elif len(task_ids) > 0: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(task_ids)) elif len(task_ids) == 0: #assume a list/enumerable of id's, length 0 return [] if mom_ids is not None: if isinstance(mom_ids, int): # just a single id conditions.append('mom_id = %s') qargs.append(mom_ids) elif len(mom_ids) > 0: #assume a list/enumerable of id's conditions.append('mom_id in %s') qargs.append(tuple(mom_ids)) elif len(mom_ids) == 0: #assume a list/enumerable of id's, length 0 return [] if otdb_ids is not None: if isinstance(otdb_ids, int): # just a single id conditions.append('otdb_id = %s') qargs.append(otdb_ids) elif len(otdb_ids) > 0: #assume a list/enumerable of id's conditions.append('otdb_id in %s') qargs.append(tuple(otdb_ids)) elif len(otdb_ids) == 0: #assume a list/enumerable of id's, length 0 return [] task_status, task_type = self._convertTaskTypeAndStatusToIds(task_status, task_type) if task_status is not None: if isinstance(task_status, int): # just a single id conditions.append('status_id = %s') qargs.append(task_status) elif len(task_status) > 0: #assume a list/enumerable of id's conditions.append('status_id in %s') qargs.append(tuple(task_status)) elif len(task_status) == 0: #assume a list/enumerable of id's, length 0 return [] if task_type is not None: if isinstance(task_type, int): # just a single id conditions.append('type_id = %s') qargs.append(task_type) elif len(task_type) > 0: #assume a list/enumerable of id's conditions.append('type_id in %s') qargs.append(tuple(task_type)) elif len(task_type) == 0: #assume a list/enumerable of id's, length 0 return [] if cluster is not None: conditions.append('cluster = %s') qargs.append(cluster) if conditions: query += ' WHERE ' + ' AND '.join(conditions) tasks = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) for task in tasks: if task['predecessor_ids'] is None: task['predecessor_ids'] = [] if task['successor_ids'] is None: task['successor_ids'] = [] return tasks def getTask(self, id=None, mom_id=None, otdb_id=None, specification_id=None): '''get a task for either the given (task)id, or for the given mom_id, or for the given otdb_id, or for the given specification_id''' ids = [id, mom_id, otdb_id, specification_id] validIds = [x for x in ids if x != None] if len(validIds) != 1: raise KeyError("Provide one and only one id: id=%s, mom_id=%s, otdb_id=%s, specification_id=%s" % (id, mom_id, otdb_id, specification_id)) query = '''SELECT * from resource_allocation.task_view tv ''' if id is not None: query += '''where tv.id = (%s);''' elif mom_id is not None: query += '''where tv.mom_id = (%s);''' elif otdb_id is not None: query += '''where tv.otdb_id = (%s);''' elif specification_id is not None: query += '''where tv.specification_id = (%s);''' result = self._executeQuery(query, validIds, fetch=_FETCH_ONE) task = dict(result) if result else None if task: if task['predecessor_ids'] is None: task['predecessor_ids'] = [] if task['successor_ids'] is None: task['successor_ids'] = [] return task def _convertTaskStatusToId(self, task_status): '''converts task_status to id in case it is a string or list of strings''' if task_status is not None: if isinstance(task_status, basestring): return self.getTaskStatusId(task_status, True) else: #assume iterable return [self._convertTaskStatusToId(x) for x in task_status] return task_status def _convertTaskTypeToId(self, task_type): '''converts task_status to id in case it is a string or list of strings''' if task_type is not None: if isinstance(task_type, basestring): return self.getTaskTypeId(task_type, True) else: #assume iterable return [self._convertTaskTypeToId(x) for x in task_type] return task_type def _convertTaskTypeAndStatusToIds(self, task_status, task_type): '''converts task_status and task_type to id's in case one and/or the other are strings''' return self._convertTaskStatusToId(task_status), self._convertTaskTypeToId(task_type) def insertTask(self, mom_id, otdb_id, task_status, task_type, specification_id, commit=True): if isinstance(mom_id, int) and mom_id < 0: mom_id = None if isinstance(otdb_id, int) and otdb_id < 0: otdb_id = None logger.info('insertTask mom_id=%s, otdb_id=%s, task_status=%s, task_type=%s, specification_id=%s' % (mom_id, otdb_id, task_status, task_type, specification_id)) task_status, task_type = self._convertTaskTypeAndStatusToIds(task_status, task_type) query = '''INSERT INTO resource_allocation.task (mom_id, otdb_id, status_id, type_id, specification_id) VALUES (%s, %s, %s, %s, %s) RETURNING id;''' id = self._executeQuery(query, (mom_id, otdb_id, task_status, task_type, specification_id), fetch=_FETCH_ONE)['id'] if commit: self.commit() return id def deleteTask(self, task_id, commit=True): logger.info('deleteTask task_id=%s' % task_id) query = '''DELETE FROM resource_allocation.task WHERE resource_allocation.task.id = %s;''' self._executeQuery(query, [task_id]) if commit: self.commit() return self.cursor.rowcount > 0 def updateTaskStatusForOtdbId(self, otdb_id, task_status, commit=True): '''converts task_status and task_type to id's in case one and/or the other are strings''' if task_status is not None and isinstance(task_status, basestring): #convert task_status string to task_status.id task_status = self.getTaskStatusId(task_status, True) query = '''UPDATE resource_allocation.task SET (status_id) = (%s) WHERE resource_allocation.task.otdb_id = %s;''' self._executeQuery(query, [task_status, otdb_id]) if commit: self.commit() return self.cursor.rowcount > 0 def updateTask(self, task_id, mom_id=None, otdb_id=None, task_status=None, task_type=None, specification_id=None, commit=True): '''Update the given paramenters for the task with given task_id. Inside the database consistency checks are made. When one or more claims of a task are in conflict status, then its task is set to conflict as well, and hence cannot be scheduled. When all claims of a task are not in conflict status anymore, then the task is set to approved, and hence it is possible the schedule the task. When a task is unscheduled (set to approved) then the claimed claims are set to tentative. ''' task_status, task_type = self._convertTaskTypeAndStatusToIds(task_status, task_type) fields = [] values = [] if mom_id is not None : fields.append('mom_id') values.append(mom_id) if otdb_id is not None : fields.append('otdb_id') values.append(otdb_id) if task_status is not None : fields.append('status_id') values.append(task_status) if task_type is not None : fields.append('type_id') values.append(task_type) if specification_id is not None : fields.append('specification_id') values.append(specification_id) values.append(task_id) query = '''UPDATE resource_allocation.task SET ({fields}) = ({value_placeholders}) WHERE resource_allocation.task.id = {task_id_placeholder};'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields), task_id_placeholder='%s') self._executeQuery(query, values) if commit: self.commit() return self.cursor.rowcount > 0 def updateTaskStartEndTimes(self, task_id, starttime=None, endtime=None, commit=True): fields = [] values = [] if starttime: fields.append('starttime') values.append(starttime) if endtime: fields.append('endtime') values.append(endtime) if not fields: return False values.append(task_id) query = '''UPDATE resource_allocation.specification SET ({fields}) = ({value_placeholders}) WHERE resource_allocation.specification.id in (SELECT t.specification_id FROM resource_allocation.task t WHERE t.id={id_placeholder});'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields), id_placeholder='%s') self._executeQuery(query, values) if commit: self.commit() return self.cursor.rowcount > 0 def getTaskPredecessorIds(self, id=None): query = '''SELECT * from resource_allocation.task_predecessor tp''' if id is not None : query += ' WHERE id=%s' items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL)) predIdDict = {} for item in items: taskId = item['task_id'] if taskId not in predIdDict: predIdDict[taskId] = [] predIdDict[taskId].append(item['predecessor_id']) return predIdDict def getTaskSuccessorIds(self, id=None): query = '''SELECT * from resource_allocation.task_predecessor tp''' if id is not None: query += ' WHERE id=%s' items = list(self._executeQuery(query, [id] if id is not None else None, fetch=_FETCH_ALL)) succIdDict = {} for item in items: predId = item['predecessor_id'] if predId not in succIdDict: succIdDict[predId] = [] succIdDict[predId].append(item['task_id']) return succIdDict def getTaskPredecessorIdsForTask(self, task_id): query = '''SELECT * from resource_allocation.task_predecessor tp WHERE tp.task_id = %s;''' items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL)) return [x['predecessor_id'] for x in items] def getTaskSuccessorIdsForTask(self, task_id): query = '''SELECT * from resource_allocation.task_predecessor tp WHERE tp.predecessor_id = %s;''' items = list(self._executeQuery(query, [task_id], fetch=_FETCH_ALL)) return [x['task_id'] for x in items] def insertTaskPredecessor(self, task_id, predecessor_id, commit=True): query = '''INSERT INTO resource_allocation.task_predecessor (task_id, predecessor_id) VALUES (%s, %s) RETURNING id;''' result = self._executeQuery(query, (task_id, predecessor_id), fetch=_FETCH_ONE) if commit: self.commit() if result and 'id' in result: return result['id'] return None def insertTaskPredecessors(self, task_id, predecessor_ids, commit=True): ids = [self.insertTaskPredecessor(task_id, predecessor_id, False) for predecessor_id in predecessor_ids] ids = [x for x in ids if x is not None] if commit: self.commit() return ids def getSpecifications(self, specification_ids = None): query = '''SELECT * from resource_allocation.specification''' conditions = [] qargs = [] if specification_ids is not None: if isinstance(specification_ids, int): # just a single id conditions.append('id = %s') qargs.append(specification_ids) else: #assume a list/enumerable of id's if len(specification_ids): conditions.append('id in %s') qargs.append(tuple(specification_ids)) if conditions: query += ' WHERE ' + ' AND '.join(conditions) return list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) def getSpecification(self, specification_id): query = '''SELECT * from resource_allocation.specification spec WHERE spec.id = (%s);''' return self._executeQuery(query, [specification_id], fetch=_FETCH_ONE) def insertSpecification(self, starttime, endtime, content, cluster, commit=True): logger.info('insertSpecification starttime=%s, endtime=%s cluster=%s' % (starttime, endtime, cluster)) query = '''INSERT INTO resource_allocation.specification (starttime, endtime, content, cluster) VALUES (%s, %s, %s, %s) RETURNING id;''' id = self._executeQuery(query, (starttime, endtime, content, cluster), fetch=_FETCH_ONE)['id'] if commit: self.commit() return id def deleteSpecification(self, specification_id, commit=True): logger.info('deleteSpecification specification_id=%s' % (specification_id)) query = '''DELETE FROM resource_allocation.specification WHERE resource_allocation.specification.id = %s;''' self._executeQuery(query, [specification_id]) if commit: self.commit() return self.cursor.rowcount > 0 def updateSpecification(self, specification_id, starttime=None, endtime=None, content=None, cluster=None, commit=True): fields = [] values = [] if starttime: fields.append('starttime') values.append(starttime) if endtime: fields.append('endtime') values.append(endtime) if content is not None : fields.append('content') values.append(content) if cluster is not None : fields.append('cluster') values.append(cluster) values.append(specification_id) query = '''UPDATE resource_allocation.specification SET ({fields}) = ({value_placeholders}) WHERE resource_allocation.specification.id = {id_placeholder};'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields), id_placeholder='%s') self._executeQuery(query, values) if commit: self.commit() return self.cursor.rowcount > 0 def getResourceTypes(self): query = '''SELECT rt.*, rtu.units as unit from virtual_instrument.resource_type rt inner join virtual_instrument.unit rtu on rtu.id = rt.unit_id; ''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceTypeNames(self): return [x['name'] for x in self.getResourceTypes()] def getResourceTypeId(self, type_name): query = '''SELECT id from virtual_instrument.resource_type WHERE name = %s;''' result = self._executeQuery(query, [type_name], fetch=_FETCH_ONE) if result: return result['id'] raise KeyError('No such type: %s Valid values are: %s' % (type_name, ', '.join(self.getResourceTypeNames()))) def getResourceGroupTypes(self): query = '''SELECT * from virtual_instrument.resource_group_type;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceGroupTypeNames(self): return [x['name'] for x in self.getResourceGroupTypes()] def getResourceGroupTypeId(self, type_name): query = '''SELECT id from virtual_instrument.resource_group_type WHERE name = %s;''' result = self._executeQuery(query, [type_name], fetch=_FETCH_ONE) if result: return result['id'] raise KeyError('No such type: %s Valid values are: %s' % (type_name, ', '.join(self.getResourceGroupTypeNames()))) def getUnits(self): query = '''SELECT * from virtual_instrument.unit;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getUnitNames(self): return [x['units'] for x in self.getUnits()] def getUnitId(self, unit_name): query = '''SELECT * from virtual_instrument.unit WHERE units = %s;''' result = self._executeQuery(query, [unit_name], fetch=_FETCH_ONE) if result: return result['id'] raise KeyError('No such unit: %s Valid values are: %s' % (unit_name, ', '.join(self.getUnitNames()))) def getResources(self, resource_ids=None, resource_types=None, include_availability=False, claimable_capacity_lower_bound=None, claimable_capacity_upper_bound=None): '''get list of resources for the requested resource_ids and/or resource_types (may be None). By default, for each resource, no availability and total-, used- and available capacity are returned. Specify include_availability=True to get those as well. By default, for each resource, no claimable_capacity is returned. Specify claimable_capacity_lower_bound and claimable_capacity_upper_bound to get the claimable_capacity as well. :param resource_ids: only get the resources for the given ids :param resource_types: only get the resources for the given resource types :param resource_ids: when include_availability=True, also retreive the total-, used- and available capacity per resource. These number are only valid at 'now', because these numbers come from the system monitor which fill in the current situation. We can't monitor into the future. :param claimable_capacity_lower_bound: when claimable_capacity_lower_bound and claimable_capacity_upper_bound are given, then also get the (maximal) claimable_capacity per resource within the time window between lower- and claimable_capacity_upper_bound. The (maximal) claimable_capacity is only valid within the requested time window, and depends on the available_capacity and the peak of the claimed claimes within the time window. :param claimable_capacity_upper_bound: see claimable_capacity_lower_bound ''' if include_availability: query = '''SELECT * from resource_monitoring.resource_view''' else: query = '''SELECT * from virtual_instrument.resource_view''' conditions = [] qargs = [] if resource_ids is not None: if isinstance(resource_ids, int): # just a single id conditions.append('id = %s') qargs.append(resource_ids) elif resource_ids: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(resource_ids)) if resource_types is not None: if isinstance(resource_types, basestring): resource_types = [resource_types] elif not isinstance(resource_types, collections.Iterable): resource_types = [resource_types] # convert any resource_type name to id resource_type_names = set([x for x in resource_types if isinstance(x, basestring)]) if resource_type_names: resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} resource_types = [resource_type_name_to_id[x] if isinstance(x, basestring) else x for x in resource_types] conditions.append('type_id in %s') qargs.append(tuple(resource_types)) if conditions: query += ' WHERE ' + ' AND '.join(conditions) resources = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) if claimable_capacity_lower_bound or claimable_capacity_upper_bound: if isinstance(claimable_capacity_lower_bound, datetime) and isinstance(claimable_capacity_upper_bound, datetime): # TODO: possibility for performance improvement: make single db call instead of looping. for resource in resources: resource['claimable_capacity'] = self.get_resource_claimable_capacity(resource['id'], claimable_capacity_lower_bound, claimable_capacity_upper_bound) else: raise ValueError('you should supply both claimable_capacity_lower_bound and claimable_capacity_upper_bound (as datetimes)') if include_availability: # compute unaccounted-for usage, # which is the actual used_capacity minus the currently allocated total claim size # defaults to used_capacity if no currently allocated total claim size utcnow = datetime.utcnow() recent_claimed_usages = self.getResourceUsages(resource_ids=[r['id'] for r in resources], lower_bound=utcnow-timedelta(days=7), upper_bound=utcnow, claim_statuses=['claimed']) for resource in resources: resource['misc_used_capacity'] = resource['used_capacity'] if resource['id'] in recent_claimed_usages and 'claimed' in recent_claimed_usages[resource['id']] and recent_claimed_usages[resource['id']]['claimed']: current_claimed_usage = recent_claimed_usages[resource['id']]['claimed'][-1] resource['misc_used_capacity'] = resource['used_capacity'] - current_claimed_usage['usage'] return resources def get_current_resource_usage(self, resource_id, claim_status='claimed'): if isinstance(claim_status, basestring): claim_status_id = self.getResourceClaimStatusId(claim_status) else: claim_status_id = claim_status query = '''SELECT * from resource_allocation.get_current_resource_usage(%s, %s)''' return self._executeQuery(query, (resource_id, claim_status_id), fetch=_FETCH_ONE) def get_resource_usage_at_or_before(self, resource_id, timestamp, claim_status='claimed', exactly_at=False, only_before=False): if isinstance(claim_status, basestring): claim_status_id = self.getResourceClaimStatusId(claim_status) else: claim_status_id = claim_status query = '''SELECT * from resource_allocation.get_resource_usage_at_or_before(%s, %s, %s, %s, %s, %s)''' return self._executeQuery(query, (resource_id, claim_status_id, timestamp, exactly_at, only_before, False), fetch=_FETCH_ONE) def updateResourceAvailability(self, resource_id, active=None, available_capacity=None, total_capacity=None, commit=True): if active is not None: query = '''UPDATE resource_monitoring.resource_availability SET (available) = (%s) WHERE resource_id = %s;''' self._executeQuery(query, (active, resource_id)) if available_capacity is not None and total_capacity is not None: query = '''UPDATE resource_monitoring.resource_capacity SET (available, total) = (%s, %s) WHERE resource_id = %s;''' self._executeQuery(query, (available_capacity, total_capacity, resource_id)) elif available_capacity is not None: query = '''UPDATE resource_monitoring.resource_capacity SET (available) = (%s) WHERE resource_id = %s;''' self._executeQuery(query, (available_capacity, resource_id)) elif total_capacity is not None: query = '''UPDATE resource_monitoring.resource_capacity SET (total) = (%s) WHERE resource_id = %s;''' self._executeQuery(query, (total_capacity, resource_id)) if commit: self.commit() return self.cursor.rowcount > 0 def getResourceGroups(self): query = '''SELECT rg.*, rgt.name as type from virtual_instrument.resource_group rg inner join virtual_instrument.resource_group_type rgt on rgt.id = rg.type_id; ''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceGroupNames(self, resourceGroupTypeName): query = '''SELECT rg.name from virtual_instrument.resource_group rg inner join virtual_instrument.resource_group_type rgt on rgt.id = rg.type_id where rgt.name = %s; ''' return list(self._executeQuery(query, (resourceGroupTypeName,), fetch=_FETCH_ALL)) def getResourceGroupMemberships(self): '''get a dict containing the resource->group and group->group relations''' query = '''select prg.id as resource_group_parent_id, prg.name as resource_group_parent_name, crg.id as resource_group_id, crg.name as resource_group_name from virtual_instrument.resource_group_to_resource_group rg2rg left join virtual_instrument.resource_group prg on rg2rg.parent_id = prg.id inner join virtual_instrument.resource_group crg on rg2rg.child_id = crg.id ''' relations = self._executeQuery(query, fetch=_FETCH_ALL) rg_items = {} # loop over list of relations # for each unique resource_group item create a dict, and add parent_ids to it for relation in relations: rg_item_id = relation['resource_group_id'] if not rg_item_id in rg_items: rg_item = {k:relation[k] for k in ('resource_group_id', 'resource_group_name')} rg_item['child_ids'] = [] rg_item['parent_ids'] = [] rg_item['resource_ids'] = [] rg_items[rg_item_id] = rg_item parent_id = relation['resource_group_parent_id'] if parent_id != None: rg_items[rg_item_id]['parent_ids'].append(parent_id) # now that we have a full list (dict.values) of rg_items... # add a child_id reference to each item's parent # this gives us a full bidirectional graph for rg_item in rg_items.values(): parentIds = rg_item['parent_ids'] rg_item_id = rg_item['resource_group_id'] for parentId in parentIds: if parentId in rg_items: parentNode = rg_items[parentId] parentNode['child_ids'].append(rg_item_id) query = '''select prg.id as resource_group_parent_id, prg.name as resource_group_parent_name, cr.id as resource_id, cr.name as resource_name from virtual_instrument.resource_to_resource_group r2rg left join virtual_instrument.resource_group prg on r2rg.parent_id = prg.id inner join virtual_instrument.resource cr on r2rg.child_id = cr.id ''' relations = self._executeQuery(query, fetch=_FETCH_ALL) r_items = {} # loop over list of relations # for each unique resource item create a dict, and add parent_ids to it for relation in relations: r_item_id = relation['resource_id'] if not r_item_id in r_items: r_item = {k:relation[k] for k in ('resource_id', 'resource_name')} r_item['parent_group_ids'] = [] r_items[r_item_id] = r_item parent_id = relation['resource_group_parent_id'] if parent_id != None: r_items[r_item_id]['parent_group_ids'].append(parent_id) rg_items[parent_id]['resource_ids'].append(r_item_id) result = {'groups': rg_items, 'resources': r_items } return result def getResourceClaimPropertyTypes(self): query = '''SELECT * from resource_allocation.resource_claim_property_type;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceClaimPropertyTypeNames(self): return [x['name'] for x in self.getResourceClaimPropertyTypes()] def getResourceClaimPropertyTypeId(self, type_name): query = '''SELECT id from resource_allocation.resource_claim_property_type WHERE name = %s;''' result = self._executeQuery(query, [type_name], fetch=_FETCH_ONE) if result: return result['id'] raise KeyError('No such resource_claim_property_type: %s Valid values are: %s' % (type_name, ', '.join(self.getResourceClaimPropertyTypeNames()))) def getResourceClaimPropertyIOTypes(self): query = '''SELECT * from resource_allocation.resource_claim_property_io_type;''' return list(self._executeQuery(query, fetch=_FETCH_ALL)) def getResourceClaimPropertyIOTypeNames(self): return [x['name'] for x in self.getResourceClaimPropertyIOTypes()] def getResourceClaimPropertyIOTypeId(self, io_type_name): query = '''SELECT id from resource_allocation.resource_claim_property_io_type WHERE name = %s;''' result = self._executeQuery(query, [io_type_name], fetch=_FETCH_ONE) if result: return result['id'] raise KeyError('No such resource_claim_property_io_type: %s Valid values are: %s' % (io_type_name, ', '.join(self.getResourceClaimPropertyIOTypeNames()))) def getResourceClaimProperties(self, claim_ids=None, task_id=None): query = '''SELECT rcpv.id, rcpv.resource_claim_id, rcpv.value, rcpv.type_id, rcpv.type_name, rcpv.io_type_id, rcpv.io_type_name, sap.number as sap_nr FROM resource_allocation.resource_claim_property_view rcpv LEFT JOIN resource_allocation.sap sap on rcpv.sap_id = sap.id''' conditions = [] qargs = [] if claim_ids is not None: if isinstance(claim_ids, int): # just a single id conditions.append('rcpv.resource_claim_id = %s') qargs.append(claim_ids) else: #assume a list/enumerable of id's conditions.append('rcpv.resource_claim_id in %s') qargs.append(tuple(claim_ids)) if task_id is not None: query += ' JOIN resource_allocation.resource_claim rc on rc.id = rcpv.resource_claim_id' conditions.append('rc.task_id = %s') qargs.append(task_id) if conditions: query += ' WHERE ' + ' AND '.join(conditions) properties = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) for p in properties: if p['sap_nr'] is None: del p['sap_nr'] return properties def insertResourceClaimProperty(self, claim_id, property_type, value, io_type, commit=True): return self.insertResourceClaimProperties([(claim_id, property_type, value, io_type)], commit) def insertResourceClaimProperties(self, props, commit=True): if not props: return [] # props is a list of tuples # each tuple prop is encoded as: (claim_id, type, value, io_type, sap_nr) # index: (0 , 1 , 2 , 3 , 4 ) # first insert unique sap numbers claim_sap_nrs = list(set([(p[0], p[4]) for p in props if p[4] is not None])) sap_ids = self.insertSAPNumbers(claim_sap_nrs, False) if sap_ids == None: return None # make sap_nr to sap_id mapping per claim_id claim_id2sap_nr2sap_id = {} for claim_sap_nr,sap_id in zip(claim_sap_nrs, sap_ids): claim_id = claim_sap_nr[0] sap_nr = claim_sap_nr[1] if claim_id not in claim_id2sap_nr2sap_id: claim_id2sap_nr2sap_id[claim_id] = {} claim_id2sap_nr2sap_id[claim_id][sap_nr] = sap_id logger.info('insertResourceClaimProperties inserting %d properties' % len(props)) # convert all property type strings to id's type_strings = set([p[1] for p in props if isinstance(p[1], basestring)]) type_string2id = {t:self.getResourceClaimPropertyTypeId(t) for t in type_strings} # convert all property io_type strings to id's io_type_strings = set([p[3] for p in props if isinstance(p[3], basestring)]) io_type_string2id = {t:self.getResourceClaimPropertyIOTypeId(t) for t in io_type_strings} # finally we have all the info we need, # so we can build the bulk property insert query insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s, %s, %s)', (p[0], type_string2id[p[1]] if isinstance(p[1], basestring) else p[1], p[2], io_type_string2id[p[3]] if isinstance(p[3], basestring) else p[3], claim_id2sap_nr2sap_id[p[0]].get(p[4]) if p[0] in claim_id2sap_nr2sap_id else None)) for p in props) query = '''INSERT INTO resource_allocation.resource_claim_property (resource_claim_id, type_id, value, io_type_id, sap_id) VALUES {values} RETURNING id;'''.format(values=insert_values) ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if [x for x in ids if x < 0]: logger.error("One or more properties could not be inserted. Rolling back.") self.rollback() return None if commit: self.commit() return ids def insertSAPNumbers(self, sap_numbers, commit=True): if not sap_numbers: return [] logger.info('insertSAPNumbers inserting %d sap numbers' % len(sap_numbers)) insert_values = ','.join(self.cursor.mogrify('(%s, %s)', rcid_sapnr) for rcid_sapnr in sap_numbers) query = '''INSERT INTO resource_allocation.sap (resource_claim_id, number) VALUES {values} RETURNING id;'''.format(values=insert_values) sap_ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if [x for x in sap_ids if x < 0]: logger.error("One or more sap_nr's could not be inserted. Rolling back.") self.rollback() return None if commit: self.commit() return sap_ids def getResourceClaims(self, claim_ids=None, lower_bound=None, upper_bound=None, resource_ids=None, task_ids=None, status=None, resource_type=None, extended=False, include_properties=False): extended |= resource_type is not None query = '''SELECT * from %s''' % ('resource_allocation.resource_claim_extended_view' if extended else 'resource_allocation.resource_claim_view') if lower_bound and not isinstance(lower_bound, datetime): lower_bound = None if upper_bound and not isinstance(upper_bound, datetime): upper_bound = None if resource_type is not None and isinstance(resource_type, basestring): #convert resource_type string to resource_type.id resource_type = self.getResourceTypeId(resource_type) conditions = [] qargs = [] if status is not None: def _claimStatusId(s): #convert status string to status.id, if it is a string return self.getResourceClaimStatusId(s) if isinstance(s, basestring) else s if isinstance(status, (int, basestring)): # just a single id conditions.append('status_id = %s') #convert status string to status.id, if it is a string qargs.append(_claimStatusId(status)) else: #assume a list/enumerable of id's conditions.append('status_id in %s') #convert status string to status.id, if they are strings qargs.append(tuple([_claimStatusId(s) for s in status])) if claim_ids is not None: if isinstance(claim_ids, int): # just a single id conditions.append('id = %s') qargs.append(claim_ids) else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(claim_ids)) if lower_bound: conditions.append('endtime >= %s') qargs.append(lower_bound) if upper_bound: conditions.append('starttime <= %s') qargs.append(upper_bound) if resource_ids is not None: if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) else: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) if task_ids is not None: #if task_id is normal positive we do a normal inclusive filter #if task_id is negative we do an exclusive filter if isinstance(task_ids, int): # just a single id conditions.append('task_id = %s' if task_ids >= 0 else 'task_id != %s') qargs.append(abs(task_ids)) else: inclusive_task_ids = [t for t in task_ids if t >= 0] exclusive_task_ids = [-t for t in task_ids if t < 0] if inclusive_task_ids: conditions.append('task_id in %s') qargs.append(tuple(inclusive_task_ids)) if exclusive_task_ids: conditions.append('task_id not in %s') qargs.append(tuple(exclusive_task_ids)) if resource_type is not None and extended: conditions.append('resource_type_id = %s') qargs.append(resource_type) if conditions: query += ' WHERE ' + ' AND '.join(conditions) claims = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) if self.log_queries: logger.info("found %s claims" % len(claims)) if include_properties and claims: claimDict = {c['id']: c for c in claims} claim_ids = claimDict.keys() properties = self.getResourceClaimProperties(claim_ids=claim_ids) for p in properties: try: claim = claimDict[p['resource_claim_id']] del p['resource_claim_id'] if 'sap_nr' in p: if not 'saps' in claim: claim['saps'] = {} if not p['sap_nr'] in claim['saps']: claim['saps'][p['sap_nr']] = [] claim['saps'][p['sap_nr']].append(p) del p['sap_nr'] else: if not 'properties' in claim: claim['properties'] = [] claim['properties'].append(p) except KeyError: pass for claim in claims: if 'saps' in claim: claim['saps'] = [{'sap_nr':sap_nr, 'properties':props} for sap_nr, props in claim['saps'].items()] return claims def getResourceClaim(self, id): query = '''SELECT * from resource_allocation.resource_claim_view rcv where rcv.id = %s; ''' result = self._executeQuery(query, [id], fetch=_FETCH_ONE) return dict(result) if result else None def insertResourceClaim(self, resource_id, task_id, starttime, endtime, claim_size, username, user_id, used_rcus=None, properties=None, commit=True): ''' insert one resource claim for the given task :param resource_id: id of the resource which is claimed :param task_id: id of the task for which this claim is made :param starttime: when should this claim start? (can be different than the task's starttime) :param endtime: when should this claim end? (can be different than the task's endtime) :param claim_size: how much do you want to claim? :param username: the name of the user who inserts these claims (to link the this insert action to a user) (Not used yet, fill in any name) :param user_id: the id of the user who inserts these claims (to link the this insert action to a user) (Not used yet, fill in any name) :param used_rcus: ?? :param properties: optional, <list of tuples> #see insertResourceClaimProperties :param commit: do commit, or keep transaction open. :return: id of the inserted claim. ''' # for code re-use: # put the one claim in a list # and do a bulk insert of the one-item-list claim = {'resource_id': resource_id, 'starttime': starttime, 'endtime': endtime, 'claim_size': claim_size, 'used_rcus': used_rcus} if properties: claim['properties'] = properties result = self.insertResourceClaims(task_id, [claim], username, user_id, commit) if result: return result[0] def insertResourceClaims(self, task_id, claims, username, user_id, commit=True): '''bulk insert of a list of resource claims for a task(_id). All claims are inserted with status tentative. :param task_id: the task(_id) for which these claims are inserted. Each claim always belongs to one task, and one task only. :param claims: list of claims. each claim is defined by the following dict: {'resource_id': <int>, 'starttime': <datetime>, 'endtime': <datetime>, 'claim_size': <int>, 'used_rcus': <??>, 'properties': <list of tuples> #see insertResourceClaimProperties } :param username: the name of the user who inserts these claims (to link the this insert action to a user) (Not used yet, fill in any name) :param userid: the id of the user who inserts these claims (to link the this insert action to a user) (Not used yet, fill in any int) :return: list of ints with the new claim id's claims is a list of dicts. Each dict is a claim for one resource containing the fields: starttime, endtime, status, claim_size ''' logger.info('insertResourceClaims for task_id=%d with %d claim(s)' % (task_id, len(claims))) status_strings = set([c['status'] for c in claims if isinstance(c['status'], basestring)]) if status_strings: status_string2id = {s:self.getResourceClaimStatusId(s) for s in status_strings} for c in claims: if isinstance(c['status'], basestring): c['status_id'] = status_string2id[c['status']] elif isinstance(c['status'], int): c['status_id'] = c['status'] try: claim_values = [(c['resource_id'], task_id, c['starttime'], c['endtime'], c.get('status_id', 0), c['claim_size'], c.get('used_rcus'), username, user_id) for c in claims] except Exception as e: logger.error("Invalid claim dict, rolling back. %s" % e) self.rollback() return [] try: # use psycopg2 mogrify to parse and build the insert values # this way we can insert many values in one insert query, returning the id's of each inserted row. # this is much faster than psycopg2's executeMany method insert_values = ','.join(self.cursor.mogrify('(%s, %s, %s, %s, %s, %s, %s, %s, %s)', cv) for cv in claim_values) except Exception as e: logger.error("Invalid input, rolling back: %s\n%s" % (claim_values, e)) self.rollback() return [] query = '''INSERT INTO resource_allocation.resource_claim (resource_id, task_id, starttime, endtime, status_id, claim_size, used_rcus, username, user_id) VALUES {values} RETURNING id;'''.format(values=insert_values) claimIds = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if not claimIds or [x for x in claimIds if x < 0]: logger.error("One or more claims could not be inserted. Rolling back.") self.rollback() return [] # gather all properties for all claims # store them as list of (claim_id, prop_type, prop_value, io_type, sap_nr) tuples properties = [] for claim_id, claim in zip(claimIds, claims): if 'properties' in claim and len(claim['properties']) > 0: claim_props = [(claim_id, p['type'], p['value'], p.get('io_type', 0), p.get('sap_nr')) for p in claim['properties']] properties += claim_props if properties: property_ids = self.insertResourceClaimProperties(properties, False) if property_ids == None: return [] if commit: self.commit() logger.info('inserted %d resource claim(s) for task_id=%d' % (len(claimIds), task_id)) return claimIds def deleteResourceClaim(self, resource_claim_id, commit=True): self.deleteResourceClaims(resource_claim_ids=[resource_claim_id], commit=commit) def deleteResourceClaims(self, resource_claim_ids, commit=True): if resource_claim_ids: query = '''DELETE FROM resource_allocation.resource_claim WHERE resource_allocation.resource_claim.id in %s;''' self._executeQuery(query, [tuple(resource_claim_ids)]) if commit: self.commit() return self.cursor.rowcount > 0 return True def updateResourceClaim(self, resource_claim_id, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): return self.updateResourceClaims([resource_claim_id], None, None, resource_id, task_id, starttime, endtime, status, claim_size, username, used_rcus, user_id, commit) def updateResourceClaims(self, where_resource_claim_ids=None, where_task_ids=None, where_resource_types=None, resource_id=None, task_id=None, starttime=None, endtime=None, status=None, claim_size=None, username=None, used_rcus=None, user_id=None, commit=True): '''Update the given paramenters on all resource claims given/delimited by where_resource_claim_ids and/or where_task_ids. Inside the database consistency checks are made. For example, in case you want to set a claim's status to 'claimed', but it does not fit in the free capacity of the claim's resource, then the claim goes to 'conflict'. A claim fits, (and hence can be claimed) only if the claim_size < resource.free_capacity within the claims time window. When a claim is released (claimed->tentative) then all overlapping conflicting claims are checked again if they fit because there is more capacity available. When a claim is claimed (tentative->claimed) then all overlapping tentative claims are checked again if they still fit because there is less capacity available. When one or more claims of a task are in conflict status, then its task is set to conflict as well, and hence cannot be scheduled. When all claims of a task are not in conflict status anymore, then the task is set to approved, and hence it is possible the schedule the task. ''' status_id = status if status is not None and isinstance(status, basestring): #convert status string to status.id status_id = self.getResourceClaimStatusId(status) fields = [] values = [] if resource_id is not None: fields.append('resource_id') values.append(resource_id) if task_id is not None: fields.append('task_id') values.append(task_id) if starttime: fields.append('starttime') values.append(starttime) if endtime: fields.append('endtime') values.append(endtime) if status_id is not None: fields.append('status_id') values.append(status_id) if claim_size is not None: fields.append('claim_size') values.append(claim_size) if username is not None: fields.append('username') values.append(username) if used_rcus is not None: fields.append('used_rcus') values.append(used_rcus) if user_id is not None: fields.append('user_id') values.append(user_id) query = '''UPDATE resource_allocation.resource_claim SET ({fields}) = ({value_placeholders})'''.format(fields=', '.join(fields), value_placeholders=', '.join('%s' for x in fields)) if where_resource_claim_ids is None and where_task_ids is None: raise ValueError('please provide either "where_resource_claim_ids" and/or "where_task_ids" argument for updateResourceClaims') conditions = [] if where_resource_claim_ids is not None: if isinstance(where_resource_claim_ids, int): # just a single id conditions.append('id = %s') values.append(where_resource_claim_ids) elif len(where_resource_claim_ids): #assume a list/enumerable of id's conditions.append('id in %s') values.append(tuple(where_resource_claim_ids)) if where_task_ids is not None: if isinstance(where_task_ids, int): # just a single id conditions.append('task_id = %s') values.append(where_task_ids) elif len(where_task_ids): #assume a list/enumerable of id's conditions.append('task_id in %s') values.append(tuple(where_task_ids)) if where_resource_types is not None: if isinstance(where_resource_types, basestring) or isinstance(where_resource_types, int): where_resource_types = [where_resource_types] elif not isinstance(where_resource_types, collections.Iterable): where_resource_types = [where_resource_types] # convert any resource_type name to id resource_type_names = set([x for x in where_resource_types if isinstance(x, basestring)]) if resource_type_names: resource_type_name_to_id = {x['name']:x['id'] for x in self.getResourceTypes()} where_resource_type_ids = [resource_type_name_to_id[x] if isinstance(x, basestring) else x for x in where_resource_types] else: where_resource_type_ids = [x for x in where_resource_types] conditions.append('resource_id in (SELECT r.id FROM virtual_instrument.resource r WHERE r.type_id in %s)') values.append(tuple(where_resource_type_ids)) query += ' WHERE ' + ' AND '.join(conditions) self._executeQuery(query, values) if commit: self.commit() return self.cursor.rowcount > 0 def updateTaskAndResourceClaims(self, task_id, starttime=None, endtime=None, task_status=None, claim_status=None, username=None, used_rcus=None, user_id=None, where_resource_types=None, commit=True): '''combination of updateResourceClaims and updateTask in one transaction''' updated = True if (starttime or endtime or claim_status is not None or username is not None or used_rcus is not None or user_id is not None): # update the claims as well updated &= self.updateResourceClaims(where_task_ids=task_id, where_resource_types=where_resource_types, starttime=starttime, endtime=endtime, status=claim_status, username=username, used_rcus=used_rcus, user_id=user_id, commit=False) if starttime or endtime: updated &= self.updateTaskStartEndTimes(task_id, starttime=starttime, endtime=endtime, commit=False) if task_status is not None : updated &= self.updateTask(task_id, task_status=task_status, commit=False) if commit: self.commit() return updated def get_overlapping_claims(self, claim_id, claim_status='claimed'): '''returns a list of claimed claims which overlap with given claim and which prevent the given claim to be claimed (cause it to be in conflict)''' if isinstance(claim_status, basestring): claim_status_id = self.getResourceClaimStatusId(claim_status) else: claim_status_id = claim_status query = '''SELECT * from resource_allocation.get_overlapping_claims(%s, %s)''' return list(self._executeQuery(query, (claim_id, claim_status_id), fetch=_FETCH_ALL)) def get_overlapping_tasks(self, claim_id, claim_status='claimed'): '''returns a list of tasks which overlap with given claim(s) and which prevent the given claim(s) to be claimed (cause it to be in conflict)''' conflicting_claims = self.get_overlapping_claims(claim_id, claim_status) task_ids = set([c['task_id'] for c in conflicting_claims]) return self.getTasks(task_ids=task_ids) def get_max_resource_usage_between(self, resource_id, lower_bound, upper_bound, claim_status='claimed'): if isinstance(claim_status, basestring): claim_status_id = self.getResourceClaimStatusId(claim_status) else: claim_status_id = claim_status result = {'usage': 0, 'status_id': claim_status_id, 'as_of_timestamp': lower_bound, 'resource_id': resource_id} query = '''SELECT * from resource_allocation.get_max_resource_usage_between(%s, %s, %s, %s)''' qresult = self._executeQuery(query, (resource_id, claim_status_id, lower_bound, upper_bound), fetch=_FETCH_ONE) if qresult and qresult.get('usage') is not None: result['usage'] = qresult.get('usage') return result def get_resource_claimable_capacity(self, resource_id, lower_bound, upper_bound): '''get the claimable capacity for the given resource within the timewindow given by lower_bound and upper_bound. this is the resource's available capacity (total-used) minus the maximum allocated usage in that timewindow.''' if resource_id is None or lower_bound is None or upper_bound is None: raise ValueError('resource_id and/or lower_bound and/or upper_bound cannot be None') query = '''SELECT * from resource_allocation.get_resource_claimable_capacity_between(%s, %s, %s)''' qresult = self._executeQuery(query, (resource_id, lower_bound, upper_bound), fetch=_FETCH_ONE) if qresult: return qresult.get('get_resource_claimable_capacity_between', 0) else: return 0 def rebuild_resource_usages_from_claims(self, resource_id=None, claim_status=None): '''(re)builds the resource_usages table from all currently known resource_claims''' if isinstance(claim_status, basestring): claim_status_id = self.getResourceClaimStatusId(claim_status) else: claim_status_id = claim_status if resource_id is None and claim_status_id is None: self._executeQuery('SELECT * from resource_allocation.rebuild_resource_usages_from_claims()', fetch=_FETCH_NONE) elif claim_status_id is None: self._executeQuery('SELECT * from resource_allocation.rebuild_resource_usages_from_claims_for_resource(%s)', (resource_id,), fetch=_FETCH_NONE) else: self._executeQuery('SELECT * from resource_allocation.rebuild_resource_usages_from_claims_for_resource_of_status(%s, %s)', (resource_id, claim_status_id), fetch=_FETCH_NONE) def insertSpecificationAndTask(self, mom_id, otdb_id, task_status, task_type, starttime, endtime, content, cluster, commit=True): ''' Insert a new specification and task in one transaction. Removes existing task with same otdb_id if present in the same transaction. Removes existing task with same mom_id if present in the same transaction. ''' try: existing_task = self.getTask(otdb_id=otdb_id) if existing_task: # delete old specification, task, and resource claims using cascaded delete self.deleteSpecification(existing_task['specification_id'], False) else: existing_task = self.getTask(mom_id=mom_id) if existing_task: # delete old specification, task, and resource claims using cascaded delete self.deleteSpecification(existing_task['specification_id'], False) specId = self.insertSpecification(starttime, endtime, content, cluster, False) taskId = self.insertTask(mom_id, otdb_id, task_status, task_type, specId, False) if specId >= 0 and taskId >= 0: # restore "old" predecessor/successor relationships if needed if existing_task: if existing_task['predecessor_ids']: self.insertTaskPredecessors(taskId, existing_task['predecessor_ids'], False) if existing_task['successor_ids']: for suc_id in existing_task['successor_ids']: self.insertTaskPredecessor(suc_id, taskId, False) if commit: self.commit() return {'inserted': True, 'specification_id': specId, 'task_id': taskId} except Exception as e: logger.error(e) self.rollback() return {'inserted': False, 'specification_id': None, 'task_id': None} def getTaskConflictReasons(self, task_ids=None): query = '''SELECT * from resource_allocation.task_conflict_reason_view''' conditions = [] qargs = [] if task_ids is not None: if isinstance(task_ids, int): # just a single id conditions.append('task_id = %s') qargs.append(task_ids) else: #assume a list/enumerable of id's conditions.append('task_id in %s') qargs.append(tuple(task_ids)) if conditions: query += ' WHERE ' + ' AND '.join(conditions) conflict_reasons = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) return conflict_reasons def insertTaskConflicts(self, task_id, conflict_reason_ids, commit=True): if not self.cursor: self._connect() insert_values = ','.join(self.cursor.mogrify('(%s, %s)', (task_id, cr_id)) for cr_id in conflict_reason_ids) query = '''INSERT INTO resource_allocation.task_conflict_reason (task_id, conflict_reason_id) VALUES {values} RETURNING id;'''.format(values=insert_values) ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if [x for x in ids if x < 0]: logger.error("One or more conflict reasons could not be inserted. Rolling back.") self.rollback() return None if commit: self.commit() return ids def getResourceClaimConflictReasons(self, claim_ids=None, resource_ids=None, task_ids=None): query = '''SELECT * from resource_allocation.resource_claim_conflict_reason_view''' conditions = [] qargs = [] if claim_ids is not None: if isinstance(claim_ids, int): # just a single id conditions.append('id = %s') qargs.append(claim_ids) else: #assume a list/enumerable of id's conditions.append('id in %s') qargs.append(tuple(claim_ids)) if resource_ids is not None: if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) else: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) if task_ids is not None: if isinstance(task_ids, int): # just a single id conditions.append('task_id = %s') qargs.append(task_ids) else: #assume a list/enumerable of id's conditions.append('task_id in %s') qargs.append(tuple(task_ids)) if conditions: query += ' WHERE ' + ' AND '.join(conditions) conflict_reasons = list(self._executeQuery(query, qargs, fetch=_FETCH_ALL)) return conflict_reasons def insertResourceClaimConflicts(self, claim_id, conflict_reason_ids, commit=True): if not self.cursor: self._connect() insert_values = ','.join(self.cursor.mogrify('(%s, %s)', (claim_id, cr_id)) for cr_id in conflict_reason_ids) query = '''INSERT INTO resource_allocation.resource_claim_conflict_reason (resource_claim_id, conflict_reason_id) VALUES {values} RETURNING id;'''.format(values=insert_values) ids = [x['id'] for x in self._executeQuery(query, fetch=_FETCH_ALL)] if [x for x in ids if x < 0]: logger.error("One or more conflict reasons could not be inserted. Rolling back.") self.rollback() return None if commit: self.commit() return ids def getResourceUsages(self, lower_bound=None, upper_bound=None, resource_ids=None, claim_statuses=None): """ Get the resource usages over time within the optionally given time window, resource_ids, and/or claim_statuses. :param lower_bound: filter for usages newer than lower_bound :param upper_bound: filter for usages older than upper_bound :param resource_ids: filter for usages for the given resource id(s) :param claim_statuses: filter for usages for the given claim status(es) :return: a nested dict with resource_id at the first level, then claim_status(name) at the second level, and then a list of time ordered usages. """ usages_per_resource = {} query = '''SELECT * from resource_allocation.resource_usage''' conditions = [] qargs = [] if lower_bound is not None: conditions.append('as_of_timestamp >= %s') qargs.append(lower_bound) if upper_bound is not None: conditions.append('as_of_timestamp <= %s') qargs.append(upper_bound) if resource_ids is not None: if isinstance(resource_ids, int): # just a single id conditions.append('resource_id = %s') qargs.append(resource_ids) elif resource_ids: #assume a list/enumerable of id's conditions.append('resource_id in %s') qargs.append(tuple(resource_ids)) if claim_statuses is not None: if isinstance(claim_statuses, basestring): claim_statuses = [claim_statuses] elif not isinstance(claim_statuses, collections.Iterable): claim_statuses = [claim_statuses] # convert any claim_status name to id claim_status_names = set([x for x in claim_statuses if isinstance(x, basestring)]) if claim_status_names: claim_status_name_to_id = {x['name']:x['id'] for x in self.getResourceClaimStatuses()} claim_status_ids = [claim_status_name_to_id[x] if isinstance(x, basestring) else x for x in claim_status_names] conditions.append('status_id in %s') qargs.append(tuple(claim_status_ids)) if conditions: query += ' WHERE ' + ' AND '.join(conditions) query += ' ORDER BY as_of_timestamp' usages = self._executeQuery(query, qargs, fetch=_FETCH_ALL) for usage in usages: resource_id = usage['resource_id'] if resource_id not in usages_per_resource: usages_per_resource[resource_id] = {} status_id = usage['status_id'] if status_id not in usages_per_resource[resource_id]: usages_per_resource[resource_id][status_id] = [] usages_per_resource[resource_id][status_id].append({'as_of_timestamp':usage['as_of_timestamp'], 'usage':usage['usage']}) # replace resource claim status id's by names for resource_id, resource_usages_per_status in usages_per_resource.items(): for status_id, usages in resource_usages_per_status.items(): resource_usages_per_status[self.getResourceClaimStatusName(status_id)] = usages del resource_usages_per_status[status_id] return usages_per_resource def getResourceAllocationConfig(self, sql_like_name_pattern=None): ''' The argument sql_like_name_pattern can be e.g. 'max_fill_ratio_%' ''' query = "SELECT name, value FROM resource_allocation.config" if sql_like_name_pattern is not None: query += " WHERE name LIKE '%s'" % sql_like_name_pattern return list(self._executeQuery(query, fetch=_FETCH_ALL)) if __name__ == '__main__': logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',level=logging.INFO) # Check the invocation arguments parser = OptionParser("%prog [options]", description='runs some test queries on the radb') parser.add_option_group(dbcredentials.options_group(parser)) parser.set_defaults(dbcredentials="RADB") (options, args) = parser.parse_args() dbcreds = dbcredentials.parse_options(options) logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword()) db = RADatabase(dbcreds=dbcreds, log_queries=True) def resultPrint(method): print '\n-- ' + str(method.__name__) + ' --' print '\n'.join([str(x) for x in method()]) resultPrint(db.getTaskStatuses) resultPrint(db.getTaskStatusNames) resultPrint(db.getTaskTypes) resultPrint(db.getTaskTypeNames) resultPrint(db.getResourceClaimStatuses) resultPrint(db.getResourceClaimStatusNames) resultPrint(db.getUnits) resultPrint(db.getUnitNames) resultPrint(db.getResourceTypes) resultPrint(db.getResourceTypeNames) resultPrint(db.getResourceGroupTypes) resultPrint(db.getResourceGroupTypeNames) resultPrint(db.getResources) resultPrint(db.getResourceGroups) resultPrint(db.getResourceGroupNames('cluster')) resultPrint(db.getResourceGroupMemberships) resultPrint(db.getTasks) resultPrint(db.getSpecifications) resultPrint(db.getResourceClaims) resultPrint(db.getResourceClaimProperties) resultPrint(db.getResourceAllocationConfig)