Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
assignment.py 43.60 KiB
#!/usr/bin/env python

# Copyright (C) 2015-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.    See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: assignment.py 1580 2015-09-30 14:18:57Z loose $

"""
ResourceAssigner inserts/updates tasks and assigns resources to them based on the incoming parset.
"""

import logging
from datetime import datetime, timedelta
import time

from lofar.common.util import humanreadablesize
from lofar.messaging.messages import EventMessage
from lofar.messaging.messagebus import ToBus
from lofar.messaging.RPC import RPC, RPCException
from lofar.parameterset import parameterset

from lofar.sas.resourceassignment.resourceassigner.schedulechecker import movePipelineAfterItsPredecessors

from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_BUSNAME as RADB_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentservice.config import DEFAULT_SERVICENAME as RADB_SERVICENAME

from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_BUSNAME as RE_BUSNAME
from lofar.sas.resourceassignment.resourceassignmentestimator.config import DEFAULT_SERVICENAME as RE_SERVICENAME

from lofar.sas.otdb.otdbrpc import OTDBRPC
from lofar.sas.otdb.config import DEFAULT_OTDB_SERVICE_BUSNAME, DEFAULT_OTDB_SERVICENAME

from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_BUSNAME
from lofar.sas.resourceassignment.resourceassigner.config import DEFAULT_RA_NOTIFICATION_PREFIX

from lofar.mom.momqueryservice.momqueryrpc import MoMQueryRPC
from lofar.mom.momqueryservice.config import DEFAULT_MOMQUERY_BUSNAME, DEFAULT_MOMQUERY_SERVICENAME

from lofar.sas.datamanagement.storagequery.rpc import StorageQueryRPC
from lofar.sas.datamanagement.storagequery.config import DEFAULT_BUSNAME as DEFAULT_STORAGEQUERY_BUSNAME
from lofar.sas.datamanagement.storagequery.config import DEFAULT_SERVICENAME as DEFAULT_STORAGEQUERY_SERVICENAME

from lofar.sas.datamanagement.cleanup.rpc import CleanupRPC
from lofar.sas.datamanagement.cleanup.config import DEFAULT_BUSNAME as DEFAULT_CLEANUP_BUSNAME
from lofar.sas.datamanagement.cleanup.config import DEFAULT_SERVICENAME as DEFAULT_CLEANUP_SERVICENAME

logger = logging.getLogger(__name__)

class ResourceAssigner():
    def __init__(self,
                 radb_busname=RADB_BUSNAME,
                 radb_servicename=RADB_SERVICENAME,
                 re_busname=RE_BUSNAME,
                 re_servicename=RE_SERVICENAME,
                 otdb_busname=DEFAULT_OTDB_SERVICE_BUSNAME,
                 otdb_servicename=DEFAULT_OTDB_SERVICENAME,
                 storagequery_busname=DEFAULT_STORAGEQUERY_BUSNAME,
                 storagequery_servicename=DEFAULT_STORAGEQUERY_SERVICENAME,
                 cleanup_busname=DEFAULT_CLEANUP_BUSNAME,
                 cleanup_servicename=DEFAULT_CLEANUP_SERVICENAME,
                 ra_notification_busname=DEFAULT_RA_NOTIFICATION_BUSNAME,
                 ra_notification_prefix=DEFAULT_RA_NOTIFICATION_PREFIX,
                 mom_busname=DEFAULT_MOMQUERY_BUSNAME,
                 mom_servicename=DEFAULT_MOMQUERY_SERVICENAME,
                 broker=None):
        """
        Create the RPC clients and the notification bus the assigner works with.
        open() is not called here; use open()/close() or a 'with' block.

        :param radb_busname: busname on which the radb service listens (default: lofar.ra.command)
        :param radb_servicename: servicename of the radb service (default: RADBService)
        :param re_busname: busname on which the resource estimator service listens (default: lofar.ra.command)
        :param re_servicename: servicename of the resource estimator service (default: ResourceEstimation)
        :param otdb_busname: busname passed to the OTDBRPC client
        :param otdb_servicename: servicename passed to the OTDBRPC client
        :param storagequery_busname: busname passed to the StorageQueryRPC client
        :param storagequery_servicename: servicename passed to the StorageQueryRPC client
        :param cleanup_busname: busname passed to the CleanupRPC client
        :param cleanup_servicename: servicename passed to the CleanupRPC client
        :param ra_notification_busname: address of the ToBus on which state change notifications are sent
        :param ra_notification_prefix: prefix put in front of the subject of each notification
        :param mom_busname: busname passed to the MoMQueryRPC client
        :param mom_servicename: servicename passed to the MoMQueryRPC client
        :param broker: Valid Qpid broker host (default: None, which means localhost)
        """
        # Given as 'timeout' to the radb, estimator, otdb and mom clients (presumably seconds - TODO confirm).
        # The storage query and cleanup clients are created without an explicit timeout.
        rpc_timeout = 180

        self.radbrpc = RARPC(busname=radb_busname,
                             servicename=radb_servicename,
                             broker=broker,
                             timeout=rpc_timeout)
        self.rerpc = RPC(re_servicename,
                         busname=re_busname,
                         broker=broker,
                         ForwardExceptions=True,
                         timeout=rpc_timeout)
        # ForwardExceptions=True is hardcoded in RPCWrapper right now, so it is not passed for otdb
        self.otdbrpc = OTDBRPC(busname=otdb_busname,
                               servicename=otdb_servicename,
                               broker=broker,
                               timeout=rpc_timeout)
        self.momrpc = MoMQueryRPC(busname=mom_busname,
                                  servicename=mom_servicename,
                                  broker=broker,
                                  timeout=rpc_timeout)
        self.sqrpc = StorageQueryRPC(busname=storagequery_busname,
                                     servicename=storagequery_servicename,
                                     broker=broker)
        self.curpc = CleanupRPC(busname=cleanup_busname,
                                servicename=cleanup_servicename,
                                broker=broker)

        self.ra_notification_bus = ToBus(address=ra_notification_busname, broker=broker)
        self.ra_notification_prefix = ra_notification_prefix

    def __enter__(self):
        """Open all connections via open() on entering a 'with' block and return this instance. Internal use only."""
        self.open()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close all connections via close() on leaving a 'with' block. Internal use only.

        The exception arguments are not inspected and nothing is returned,
        so an exception raised inside the 'with' block is not suppressed.
        """
        self.close()

    def open(self):
        """Open rpc connections to radb service and resource estimator service"""
        self.radbrpc.open()
        self.rerpc.open()
        self.otdbrpc.open()
        self.momrpc.open()
        self.sqrpc.open()
        self.curpc.open()
        self.ra_notification_bus.open()

    def close(self):
        """Close rpc connections to radb service and resource estimator service"""
        self.radbrpc.close()
        self.rerpc.close()
        self.otdbrpc.close()
        self.momrpc.close()
        self.sqrpc.close()
        self.curpc.close()
        self.ra_notification_bus.close()

    def doAssignment(self, specification_tree):
        """
        Insert/update the task for the given specification in the radb and, for prescheduled tasks,
        claim the resources it needs and send a state change notification.

        Steps, in order:
          1. Only specifications in state 'approved' or 'prescheduled' are handled; others are skipped.
          2. The parset is checked: exactly one, known, storage cluster name; a start time that is not in the
             past (otherwise start/stop times are auto-generated and uploaded to otdb); the processing cluster
             name is overwritten in otdb if it differs from the storage cluster name.
          3. Specification and task are inserted in the radb. 'approved' tasks stop here.
          4. Predecessor/successor relations are processed, resource estimates are requested and validated,
             claims are computed and inserted, and for pipelines data from a previous run is removed.
          5. A notification for 'scheduled', 'conflict' or 'error' is sent via _sendStateChange().

        Failures are logged and end the assignment; nothing is raised on purpose. Until the task was read
        back from the radb (step 3) a failure only results in a log line, from then on it also results in an
        'error' or 'conflict' notification.

        :param specification_tree: dict with keys 'otdb_id' and 'specification' (both required), 'state' and
                                   'task_type' (optional), and 'predecessors' (read only when start/stop times
                                   have to be auto-generated)
        :returns: None
        """
        logger.info('doAssignment: specification_tree=%s' % (specification_tree))

        otdb_id = specification_tree['otdb_id']
        status = specification_tree.get('state', '').lower()
        if status not in ['approved', 'prescheduled']:  # For approved we only do a few checks and put it in the RADB
            logger.warning('skipping specification for task otdb_id=%s, because status=%s (not prescheduled)', otdb_id, status)
            return

        try:
            mainParset = parameterset(specification_tree['specification'])
        except Exception as e:
            logger.error(str(e))
            return

        # Only assign resources for task output to known clusters
        try:
            clusterNameSet = self.getClusterNames(mainParset)
            if str() in clusterNameSet or len(clusterNameSet) != 1:
                # Empty set or name is always an error.
                # TODO: To support >1 cluster per obs,
                #   self.radbrpc.insertSpecificationAndTask() as called below and the radb would need to take >1 cluster name
                #   Also, there is only 1 processingClusterName in the parset, but we do not always want to pipeline process all obs outputs, or not on 1 cluster
                raise Exception('clusterNameSet must have a single non-empty name for all enabled DataProducts, but is: %s' % clusterNameSet)
            clusterName = clusterNameSet.pop()

            # Retrieve known cluster names (not all may be a valid storage target, but we cannot know...)
            knownClusterSet = {clus['name'] for clus in self.radbrpc.getResourceGroupNames('cluster')}
            logger.info('known clusters: %s', knownClusterSet)
            if clusterName not in knownClusterSet:
                raise Exception('skipping resource assignment for task with cluster name \'%s\' not in known clusters %s' % (clusterName, knownClusterSet))
        except Exception as e:
            logger.error(str(e))
            return


        def applySaneStartEndTime():
            """Generate start/end times, upload them to otdb and return them as (startTime, endTime).

            The start time is one minute from now, or one minute after the latest predecessor end time if
            that is later. The duration is taken from the parset; non-positive durations become one hour.
            """
            startTime = datetime.utcnow() + timedelta(minutes=1)

            maxPredecessorEndTime = self.getMaxPredecessorEndTime(specification_tree)
            if maxPredecessorEndTime is not None and maxPredecessorEndTime > startTime:
                startTime = maxPredecessorEndTime + timedelta(minutes=1)

            taskDuration = mainParset.getInt('Observation.Scheduler.taskDuration')  # no default passed on purpose
            taskDuration = timedelta(seconds=taskDuration) if taskDuration > 0 else timedelta(hours=1)

            endTime = startTime + taskDuration

            logger.warning('Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
                           startTime, endTime, otdb_id)

            logger.info('uploading auto-generated start/end time  (%s, %s) to otdb for otdb_id=%s', startTime, endTime, otdb_id)
            self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.startTime': startTime.strftime('%Y-%m-%d %H:%M:%S'),
                                                         'LOFAR.ObsSW.Observation.stopTime': endTime.strftime('%Y-%m-%d %H:%M:%S')})
            return startTime, endTime


        # TODO: don't fix this crap here. Bad start/stop time has to go to error, like any other bad spec part. Fix the cause! Idem for MoM fix up below.
        try:
            startTime = datetime.strptime(mainParset.getString('Observation.startTime'), '%Y-%m-%d %H:%M:%S')
            endTime = datetime.strptime(mainParset.getString('Observation.stopTime'), '%Y-%m-%d %H:%M:%S')
            if startTime < datetime.utcnow():
                raise ValueError('parsed startTime (UTC) lies in the past (or large system clock offset)')  # same exc type as strptime
        except ValueError:
            logger.warning('cannot parse start/stop time from specification for otdb_id=%s. Searching for sane defaults...', otdb_id)
            try:
                startTime, endTime = applySaneStartEndTime()
                if startTime < datetime.utcnow():
                    raise Exception('"sane" start/stop time turned out to be insane')
            except Exception as e:
                logger.error(str(e))
                return

        try:
            # fix for MoM bug introduced before NV's holiday
            # MoM sets ProcessingCluster.clusterName to CEP2 even when inputxml says CEP4
            # so, override it here if needed, and update to otdb
            processingClusterName = mainParset.getString('Observation.Cluster.ProcessingCluster.clusterName', '')
            if processingClusterName != clusterName:
                logger.info('overwriting and uploading processingClusterName to otdb from \'%s\' to \'%s\' for otdb_id=%s',
                            processingClusterName, clusterName, otdb_id)
                self.otdbrpc.taskSetSpecification(otdb_id, { 'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': clusterName })
        except Exception as e:
            logger.error(str(e))
            return

        logger.info('doAssignment: Accepted specification')

        # insert new task and specification in the radb
        # any existing specification and task with same otdb_id will be deleted automatically
        momId = mainParset.getInt('Observation.momID', -1)
        taskType = specification_tree.get('task_type', '')  # i.e. u'observation' or u'pipeline
        logger.info('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s cluster=%s' %
                    (momId, otdb_id, status, taskType, startTime, endTime, clusterName))
        try:
            result = self.radbrpc.insertSpecificationAndTask(momId, otdb_id, status, taskType,
                                                             startTime, endTime, str(mainParset), clusterName)
        except Exception as e:
            logger.error(str(e))
            result = dict()  # handled next
        # Use get(): result is an empty dict when the insert raised, and indexing it would
        # raise a KeyError out of doAssignment instead of taking the error path below.
        if not result.get('inserted'):
            logger.error('could not insert specification and task: result = %s', result)
            return

        specificationId = result['specification_id']
        taskId = result['task_id']
        logger.info('doAssignment: inserted specification (id=%s) and task (id=%s)' % (specificationId, taskId))

        if status != 'prescheduled':  # should only happen for approved
            logger.info('skipping resource assignment for task otdb_id=%s, because status=%s' % (otdb_id, status))
            return

        task = None
        try:
            task = self.radbrpc.getTask(taskId)
        except Exception as e:
            logger.error(str(e))
            return
        logger.info('doAssignment: task=%s', task)

        # From now on we can and have to _sendStateChange() to 'error' or otherwise on unrecov errors
        errStatus = 'error'

        try:
            self.processPredecessors(task)
            self.processSuccessors(task)
        except Exception as e:
            logger.exception(str(e))
            self._sendStateChange(task, errStatus)
            return

        # Request estimates of needed resources from Resource Estimator. Check reply.
        try:
            reReply, rerpcStatus = self.rerpc({"specification_tree" : specification_tree}, timeout=10)
            logger.info('doAssignment: Resource Estimator reply = %s', reReply)

            if str(otdb_id) not in reReply:
                raise Exception("no otdb_id %s found in estimator results %s" % (otdb_id, reReply))
            estimates = reReply[str(otdb_id)]

            if taskType not in estimates:
                raise Exception("no task type %s found in estimator results %s" % (taskType, estimates))
            estimates = estimates[taskType]

            if 'errors' in estimates and estimates['errors']:
                for error in estimates['errors']:
                    logger.error("Error from Resource Estimator: %s", error)
                raise Exception("Error(s) in estimator for otdb_id=%s radb_id=%s" % (otdb_id, taskId))

            if 'estimates' not in estimates or any('resource_types' not in est for est in estimates['estimates']):
                raise Exception("missing 'resource_types' in 'estimates' in estimator results: %s" % estimates)
            estimates = estimates['estimates']

            if not all(est_val > 0 for est in estimates for est_val in est['resource_types'].values()):
                # Avoid div by 0 and inf looping from estimate < 0 later on.
                raise Exception("at least one of the estimates is not a positive number")
        except Exception as e:
            logger.error(str(e))
            self._sendStateChange(task, errStatus)
            return

        # get resources and related info from radb
        try:
            db_resource_group_mships = self.radbrpc.getResourceGroupMemberships()
            db_rgp2rgp = db_resource_group_mships['groups']  # resource-group-to-resource-group relations
            # resource-to-resource-group relations are under db_resource_group_mships['resources']

            db_resource_list = self.radbrpc.getResources(include_availability=True)
            db_resource_claims_rcus = self.radbrpc.getResourceClaims(resource_type='rcu', status=['claimed','allocated'],
                                                                     lower_bound=startTime, upper_bound=endTime)
            db_resource_types = {rt['name']:rt['id'] for rt in self.radbrpc.getResourceTypes()}
            db_resource_prop_types = {rcpt['name']:rcpt['id'] for rcpt in self.radbrpc.getResourceClaimPropertyTypes()}
            db_resource_max_fill_ratios = self.radbrpc.getResourceAllocationConfig(sql_like_name_pattern='max_fill_ratio_%')
        except Exception as e:
            logger.error(str(e))
            self._sendStateChange(task, errStatus)
            return

        self.applyMaxFillRatios(db_resource_list, db_rgp2rgp, db_resource_types, db_resource_max_fill_ratios)

        # Assume estimates are (close enough to) accurate to determine resource claims for this task.
        # Try to get a set of non-conflicting claims from availability info in the RA DB.
        # If that fails, insert no claims and set the task straight to conflict.
        # Also, inserted claims are still automatically validated, there can be a race.
        # If not enough resources are available after all, claims are put to conflict status.
        # If any claim is in conflict state, then the task is put to conflict status as well.
        claims = self.getClaimsForTask(task, estimates, db_resource_list, db_resource_claims_rcus, db_rgp2rgp, db_resource_types,
                                       db_resource_prop_types)
        if claims is None:
            self._sendStateChange(task, errStatus)
            return
        # NOTE(review): an empty claims list skips the insertion below and the task continues towards
        # 'scheduled', while the comment above speaks of going straight to conflict - confirm which is intended.
        if claims:
            logger.info('doAssignment: inserting %d claims in the radb: %s', len(claims), claims)
            try:
                claim_ids = self.radbrpc.insertResourceClaims(task['id'], claims, 1, 'anonymous', -1)['ids']
                logger.info('doAssignment: %d claims were inserted in the radb' % len(claim_ids))
                if len(claim_ids) != len(claims):
                    raise Exception('doAssignment: too few claims were inserted in the radb')

                conflictingClaims = self.radbrpc.getResourceClaims(task_ids=taskId, status='conflict')
                if conflictingClaims:
                    # Will do _sendStateChange(task, 'conflict'), and radb sets task status to conflict automatically
                    raise Exception('doAssignment: Task cannot be scheduled, because of %d conflicting claims: %s' %
                                    (len(conflictingClaims), conflictingClaims))

                logger.info('doAssignment: all resources for task %s were succesfully claimed. Setting claim statuses to allocated' % (taskId,))
                self.radbrpc.updateTaskAndResourceClaims(taskId, claim_status='allocated')
            except Exception as e:
                logger.error(str(e))
                self._sendStateChange(task, 'conflict')
                return

        # remove any output and/or intermediate data for restarting pipelines
        if task['type'] == 'pipeline':
            try:
                du_result = self.sqrpc.getDiskUsageForOTDBId(task['otdb_id'], include_scratch_paths=True, force_update=True)
                if du_result['found'] and du_result.get('disk_usage', 0) > 0:
                    logger.info("removing data on disk from previous run for otdb_id %s", otdb_id)
                    result = self.curpc.removeTaskData(task['otdb_id'])
                    if not result['deleted']:
                        logger.warning("could not remove all data on disk from previous run for otdb_id %s: %s", otdb_id, result['message'])
            except Exception as e:
                logger.error(str(e))  # in line with failure as warning just above: allow going to scheduled state here too

        # send notification that the task was scheduled
        self._sendStateChange(task, 'scheduled')

    def _sendStateChange(self, task, status):
        """Notify listeners that the given task went to status 'scheduled', 'conflict' or 'error'.

        For 'conflict' and 'error' the task status is first updated in the radb. Then an EventMessage
        with subject 'Task' + capitalized status (prefixed with the notification prefix) is sent on
        the notification bus. Any other status is logged as a bug and nothing is sent.
        Exceptions raised while updating or sending are logged and not propagated.

        :param task: radb task dict; its 'id', 'otdb_id' and 'mom_id' entries are read
        :param status: the new status of the task
        """
        if status not in ('scheduled', 'conflict', 'error'):
            # should not end up here (bug)
            logger.error('_sendStateChange(): bug: Not sending notification as status is %s' % status)
            return

        content = {'radb_id': task['id'],
                   'otdb_id': task['otdb_id'],
                   'mom_id': task['mom_id']}
        subject = 'Task' + status.capitalize()

        try:
            # For 'scheduled', another service sets the parset spec in otdb and updates the otdb task
            # status to scheduled, which is then synced to radb. So only update the radb for the others.
            if status != 'scheduled':
                self.radbrpc.updateTask(task['id'], task_status=status)

            notification = EventMessage(context=self.ra_notification_prefix + subject, content=content)
            single_line_content = str(content).replace('\n', ' ')
            logger.info('Sending notification %s: %s', subject, single_line_content)
            self.ra_notification_bus.send(notification)
        except Exception as err:
            logger.error(str(err))

    def processPredecessors(self, task):
        """Link the given task in the radb to each of its predecessors as known by MoM.

        The predecessor mom ids are requested from the MoM query service. Each predecessor that is found
        in the radb and is not yet listed in task['predecessor_ids'] is inserted as predecessor of the
        task. Predecessors that cannot be found in the radb are logged as a warning.

        :param task: radb task dict; its 'id', 'mom_id', 'otdb_id' and 'predecessor_ids' entries are read
        """
        mom_id = task['mom_id']
        mom_key = str(mom_id)

        ids_per_mom_id = self.momrpc.getPredecessorIds(mom_id)
        predecessor_mom_ids = ids_per_mom_id[mom_key] if mom_key in ids_per_mom_id else None
        if not predecessor_mom_ids:
            logger.info('no predecessors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
            return

        logger.info('processing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', predecessor_mom_ids, task['mom_id'], task['otdb_id'])

        for pred_mom_id in predecessor_mom_ids:
            pred_task = self.radbrpc.getTask(mom_id=pred_mom_id)
            if not pred_task:
                logger.warning('could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', pred_mom_id, task['otdb_id'])
                continue

            # check if the predecessor needs to be linked to this task
            if pred_task['id'] in task['predecessor_ids']:
                continue

            logger.info('connecting predecessor task with mom_id=%s otdb_id=%s to its successor with mom_id=%s otdb_id=%s',
                        pred_task['mom_id'], pred_task['otdb_id'], task['mom_id'], task['otdb_id'])
            self.radbrpc.insertTaskPredecessor(task['id'], pred_task['id'])


    def processSuccessors(self, task):
        """Link the given task in the radb to each of its successors as known by MoM.

        The successor mom ids are requested from the MoM query service. For each successor found in the
        radb, the task is inserted as its predecessor unless the successor is already listed in
        task['successor_ids']. In both cases movePipelineAfterItsPredecessors() is called for the
        successor. Successors that cannot be found in the radb are logged as a warning.

        :param task: radb task dict; its 'id', 'mom_id', 'otdb_id' and 'successor_ids' entries are read
        """
        mom_id = task['mom_id']
        mom_key = str(mom_id)

        ids_per_mom_id = self.momrpc.getSuccessorIds(mom_id)
        successor_mom_ids = ids_per_mom_id[mom_key] if mom_key in ids_per_mom_id else None
        if not successor_mom_ids:
            logger.info('no successors for otdb_id=%s mom_id=%s', task['otdb_id'], mom_id)
            return

        logger.info('processing successor mom_ids=%s for mom_id=%s otdb_id=%s', successor_mom_ids, task['mom_id'], task['otdb_id'])

        for succ_mom_id in successor_mom_ids:
            succ_task = self.radbrpc.getTask(mom_id=succ_mom_id)
            if not succ_task:
                logger.warning('could not find successor task with mom_id=%s in radb for task otdb_id=%s', succ_mom_id, task['otdb_id'])
                continue

            # check if the successor needs to be linked to this task
            if succ_task['id'] not in task['successor_ids']:
                logger.info('connecting successor task with mom_id=%s otdb_id=%s to its predecessor with mom_id=%s otdb_id=%s',
                            succ_task['mom_id'], succ_task['otdb_id'], task['mom_id'], task['otdb_id'])
                self.radbrpc.insertTaskPredecessor(succ_task['id'], task['id'])

            # done for every successor found in the radb, whether or not it was just linked
            movePipelineAfterItsPredecessors(succ_task, self.radbrpc)

    def getMaxPredecessorEndTime(self, specification_tree):
        """Return the latest 'Observation.stopTime' found in the predecessor specifications,
        or None if the specification tree has no predecessors.

        :param specification_tree: dict with a 'predecessors' list; each item has a 'specification'
        :returns: a datetime, or None
        :raises: whatever parsing a predecessor parset or its stop time raises (e.g. ValueError)
        """
        stop_time_format = '%Y-%m-%d %H:%M:%S'

        # First turn all predecessor specifications into parsets, only then read the stop times.
        pred_parsets = []
        for pred_tree in specification_tree['predecessors']:
            pred_parsets.append(parameterset(pred_tree['specification']))

        stop_times = []
        for pred_parset in pred_parsets:
            stop_times.append(datetime.strptime(pred_parset.getString('Observation.stopTime'), stop_time_format))

        return max(stop_times) if stop_times else None

    def getClusterNames(self, parset):
        """ Collect the storage cluster names of all enabled output data product types in parset.

            A data product type counts as enabled if its '.enabled' key is true (default: False).
            For every enabled type the '.storageClusterName' key is read without a default, so a
            missing name makes the parset raise.

            :param parset: object offering getBool(key, default) and getString(key)
            :returns: set of storage cluster names (empty if no output data product type is enabled)
        """
        key_prefix = 'Observation.DataProducts.'
        product_types = ('Output_Correlated',
                         'Output_IncoherentStokes',
                         'Output_CoherentStokes',
                         'Output_InstrumentModel',
                         'Output_SkyImage',
                         'Output_Pulsar')

        # getString() may raise; don't pass a default arg
        return {parset.getString(key_prefix + product_type + '.storageClusterName')
                for product_type in product_types
                if parset.getBool(key_prefix + product_type + '.enabled', False)}

    def applyMaxFillRatios(self, db_resource_list, db_rgp2rgp, db_resource_types,
                           db_resource_max_fill_ratios):
        ''' Applies db_resource_max_fill_ratios to db_resource_list.
            db_resource_max_fill_ratios is e.g. [{'name': max_fill_ratio_CEP4_storage, 'value': 0.85}, ...]
        '''
        prefix = 'max_fill_ratio_'  # + resource_group_name + '_' + resource_type_name

        for ratio_dict in db_resource_max_fill_ratios:
            for res_type_name in db_resource_types:
                if not ratio_dict['name'].endswith('_' + res_type_name):
                    continue
                res_type_id = db_resource_types[res_type_name]
                res_group_name = ratio_dict['name'][len(prefix) : -len(res_type_name)-1]
                res_group_id = self.getResourceGroupIdByName(db_rgp2rgp, res_group_name)
                if res_group_id is None:
                    logger.warn('applyMaxFillRatios: could not find resource group for %s', ratio_dict['name'])
                    break

                res_group = db_rgp2rgp[res_group_id]
                for res_id in res_group['resource_ids']:
                    res = db_resource_list[res_id]
                    if res['type_id'] != res_type_id:
                        continue
                    try:
                        ratio = float(ratio_dict['value'])
                        if ratio < 0.0 or ratio > 1.0:
                            raise ValueError('value not a float in range [0.0, 1.0]')
                    except ValueError as err:
                        logger.error('applyMaxFillRatios: %s = %s: %s', ratio_dict['name'], ratio_dict['value'], str(err))
                        break

                    res['available_capacity'] = min(res['available_capacity'], int(ratio * res['total_capacity']))
                    logger.info('applyMaxFillRatios: applied %s = %f', ratio_dict['name'], ratio)

    def getClaimsForTask(self, task, needed_resources_list, db_resource_list, db_resource_claims_rcus,
                         db_rgp2rgp, db_resource_types, db_resource_prop_types):
        """ Return claims that satisfy needed_resources_list within db_resource_list, or an empty claim list if no 
            non-conflicting claims could be found, or None on error.
            
            :param task: an instance of an RADB task object
            :param needed_resources_list: a list of resources to be claimed
            :param db_resource_list: all resources in RADB with availability information
            :param db_resource_claims_rcus: all claims on RCUs that fall within the start/end time of this task
            :param db_rgp2rgp: all group->group relations from RADB
            :param db_resource_types: all virtual instrument resource types (and their units) from RADB   
            :param db_resource_prop_types: all resource claim property types from RADB
            
            :returns claims
        """
        # This function selects resources for a task (i.e. obs or pipeline). Keep it side-effect free!
        # Criteria:
        # * It all fits within max fill ratio per resource group (or consider failed)
        # * Avoid resources marked as unavailable
        # * At most one claim per resource; i.e. merge where possible (DB friendly)
        # * Most pipelines reduce data size. Observation output is relatively large,
        #   so spread across as many storage areas as possible (if storage is needed).
        # * Only completely fill a (e.g. storage) resource when inevitable, as this also makes other
        #   resources (e.g. write bandwidth) unavailable for future tasks until clean up.
        # * All 4 data products of complex voltage obs data (XXYY) go to a single (storage) resource.
        #
        # Parts of these criteria may be solved by the caller, e.g. by passing filtered arguments.
        #
        # Note: certain (obs) settings are more flexible than ever used (e.g. sb list per SAP for CS/IS).
        # We must support the full gamut of settings, but for scenarios that are complex *and* constrained,
        # producing 'conflict' may be ok, if this saves implementation or excessive computational complexity.
        # Users will then have to free more resources than strictly necessary before retrying.

        logger.debug('getClaimsForTask: db_rgp2rgp: %s', db_rgp2rgp)              # big!
        logger.debug('getClaimsForTask: db_resource_list: %s', db_resource_list)  # big!
        logger.debug('getClaimsForTask: db_resource_claims_rcus: %s', db_resource_claims_rcus)  # big!
        logger.debug('getClaimsForTask: db_resource_types: %s', db_resource_types)
        logger.debug('getClaimsForTask: db_resource_prop_types: %s', db_resource_prop_types)

        dtypes = {'uv', 'cs', 'is', 'im', 'img', 'pulp'}
        summablePropTypeIds = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes}
        db_storage_type_id = db_resource_types['storage']  # for sorting and to attach properties
        db_rcu_type_id = db_resource_types['rcu'] # for attaching rcu bit field to the claim

        claims = []
        for needed_resources in needed_resources_list:
            # Replace resource names by type ids: easy matching w/ other data structs
            needed_resources_by_type_id = {db_resource_types[name]: needed_resources['resource_types'][name] \
                                           for name in needed_resources['resource_types']}  # e.g. {3: 16536, 5: 170016}
            logger.info('getClaimsForTask: needed_resources_by_type_id: %s', needed_resources_by_type_id)

            # Find group id ('gid') of needed_resources['root_resource_group'],
            # then get list of claimable resources at root_gid and its children
            root_gid = self.getResourceGroupIdByName(db_rgp2rgp, needed_resources['root_resource_group'])
            if root_gid is None:
                logger.error('getClaimsForTask: cannot find resources to claim: unknown root_resource_group \'%s\'',
                             needed_resources['root_resource_group'])
                return None
            claimable_resources_list = self.getSubtreeResourcesList(root_gid, needed_resources_by_type_id,
                                           db_resource_list, db_rgp2rgp)  # e.g. [{3: <resource_dict>, 5: <resource_dict>}, ...]
            logger.info('getClaimsForTask: considering %d claimable resource dict(s)', len(claimable_resources_list))
            logger.debug('getClaimsForTask: claimable_resources_list: %s', claimable_resources_list)

            input_files  = needed_resources.get('input_files')
            output_files = needed_resources.get('output_files')
            properties =      self.getProperties(db_resource_prop_types, input_files,  'input')
            properties.extend(self.getProperties(db_resource_prop_types, output_files, 'output'))

            # Collapse needed resources if only 1 claimable resource dict, e.g. global filesystem
            if len(claimable_resources_list) == 1:
                logger.info('getClaimsForTask: collapsing needed_resources')
                for type_id in needed_resources_by_type_id:
                    needed_resources_by_type_id[type_id] *= needed_resources['resource_count']
                for prop in properties:
                    if prop['type'] in summablePropTypeIds:
                        prop['value'] *= needed_resources['resource_count']
                needed_resources['resource_count'] = 1

            if db_storage_type_id in needed_resources_by_type_id:
                sort_res_type = db_storage_type_id
            else:
                sort_res_type = needed_resources_by_type_id.keys()[0]  # some other if not storage

            for _ in xrange(needed_resources['resource_count']):
                # Sorting on every change may be slow, but for 100s of DPs, insertion of merged claims is still 3-5x slower.
                # A heapq was not faster, yet solving the lack of total ordering more elaborate.
                # Of course, big-O complexity here is terrible, but we are nowhere near (too) big.
                claimable_resources_list.sort(key=lambda res: res[sort_res_type]['available_capacity'], reverse=True)

                # Almost always iterates once. Still needed to match >1 resource types.
                claim = None
                for claimable_resources_dict in claimable_resources_list:
                    # Ignore check on claimable capacity of RCUs
                    is_claimable = self.is_claimable_capacity_wise(needed_resources_by_type_id,
                                                                   claimable_resources_dict,
                                                                   ignore_type_ids=[db_rcu_type_id])

                    is_claimable &= self.is_claimable_rcu_wise(needed_resources_by_type_id,
                                                               db_resource_claims_rcus,
                                                               db_rcu_type_id)

                    if is_claimable:
                        claim = self.makeClaim(db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task,
                                               properties, needed_resources_by_type_id, claimable_resources_dict)
                        logger.debug('getClaimsForTask: created claim: %s', claim)
                        claims.extend(claim)
                        break

                if claim is None:
                    logger.warn('getClaimsForTask: Failed to find enough claimable resources for %s (by_type_id: %s)',
                                needed_resources, needed_resources_by_type_id)
                    return None

        self.mergeClaims(summablePropTypeIds, claims)

        return claims

    def getResourceGroupIdByName(self, db_rgp2rgp, name):
        """ Look up the id of the resource group called name.

            The group tree in db_rgp2rgp is walked breadth-first, one level at a time, starting
            at the root group (id 0). Hence the shallowest match wins, and within a level the
            first one encountered.

            :param db_rgp2rgp: {group_id: {'resource_group_name': <name>, 'child_ids': [<group_id>, ...], ...}, ...}
            :param name:       name of the resource group to look for
            :return: id of the first group named name, or None if there is no such group
        """
        current_level = [0]  # root group id 0

        while current_level:
            next_level = []
            for gid in current_level:
                group = db_rgp2rgp[gid]
                if group['resource_group_name'] == name:
                    return gid
                # Children are only inspected after the whole current level has been checked
                next_level.extend(group['child_ids'])
            current_level = next_level

        return None

    def getSubtreeResourcesList(self, root_gid, needed_resources_by_type_id, db_resource_list, db_rgp2rgp):
        """ Collect, per resource group in the subtree rooted at root_gid, the usable resources
            of the needed types.

            A resource is usable if its type id is a key of needed_resources_by_type_id, it is
            'active' and its 'available_capacity' is above 0. If a group holds several usable
            resources of the same type, the last one listed in its 'resource_ids' is taken.
            A group only contributes an entry if it offers a usable resource for every needed type.

            :param root_gid:                    id of the group to start the breadth-first walk at
            :param needed_resources_by_type_id: {resource_type_id: size, ...}; only the keys are used here
            :param db_resource_list:            {resource_id: {<resource_dict>}, ...}
            :param db_rgp2rgp:                  {group_id: {'resource_ids': [...], 'child_ids': [...], ...}, ...}
            :return: [{type_id: {<resource_dict>}, ...}, ...], one dict per qualifying group,
                     in breadth-first order
        """
        needed_type_ids = set(needed_resources_by_type_id)
        resources_list = []

        # Breadth-first, one level of the group tree at a time.
        current_level = [root_gid]
        while current_level:
            next_level = []

            for gid in current_level:
                group = db_rgp2rgp[gid]

                usable_by_type = {}
                for rid in group['resource_ids']:
                    resource = db_resource_list[rid]
                    if resource['type_id'] not in needed_resources_by_type_id:
                        continue
                    if resource['active'] and resource['available_capacity'] > 0:
                        usable_by_type[resource['type_id']] = resource

                # Only keep this group if all needed types are present in it
                if set(usable_by_type) == needed_type_ids:
                    resources_list.append(usable_by_type)

                next_level.extend(group['child_ids'])

            current_level = next_level

        return resources_list

    def is_claimable_capacity_wise(self, needed_resources, claimable_resources, ignore_type_ids=None):
        """ Check whether claimable_resources offers enough capacity for all needed_resources.

            :param needed_resources:    {resource_type_id: size, ...}
            :param claimable_resources: {resource_type_id: {<resource_dict>}, ...}
            :param ignore_type_ids:     IDs of types that should not be considered (None: consider all)
            :return: True if for each considered type its size does not exceed the
                     'available_capacity' of the matching claimable resource, else False
        """
        skipped_type_ids = [] if ignore_type_ids is None else ignore_type_ids

        for res_type, claim_size in needed_resources.items():
            if res_type in skipped_type_ids:
                continue
            if not (claim_size <= claimable_resources[res_type]['available_capacity']):
                return False  # first shortage settles it

        return True

    def is_claimable_rcu_wise(self, needed_resources, resource_claims_rcus, rcu_resource_type_id):
        """ Tell whether the needed RCUs can be claimed next to the already existing RCU claims.

            NOTE: the actual check is disabled for now: none of the arguments is inspected and
            the answer is always True. The disabled draft compared the needed RCU bit pattern
            (the entry of type rcu_resource_type_id in needed_resources), position by position,
            with every 'used_rcus' bit pattern of the claims in resource_claims_rcus, and
            refused the claim as soon as both patterns had a bit set at the same position.

            :param needed_resources:     {resource_type_id: size, ...} (currently unused)
            :param resource_claims_rcus: existing claims carrying 'used_rcus' (currently unused)
            :param rcu_resource_type_id: resource type id of RCUs (currently unused)
            :return: True, always
        """
        return True

    def makeClaim(self, db_resource_prop_types, db_storage_type_id, db_rcu_type_id, task, properties,
                  needed_resources, claimable_resources):
        """ Build the claims for one data product: one claim per needed resource type.

            Note: as a side effect, the 'available_capacity' of every claimed resource in
            claimable_resources is lowered by the claimed size.

            :param db_resource_prop_types: not used by this function
            :param db_storage_type_id:     resource type id of storage; storage claims get an end time
                                           extended by 365 days and carry the properties (if any)
            :param db_rcu_type_id:         resource type id of RCUs; their needed 'size' is a bit pattern string
            :param task:                   dict providing the 'starttime' and 'endtime' of the claims
            :param properties:             list of claim properties; attached as is (not copied) to storage claims
            :param needed_resources:       {resource_type_id: size, ...}
            :param claimable_resources:    {resource_type_id: {<resource_dict>}, ...}
            :return: list of claim dicts with status 'claimed'
        """
        claims = []

        for res_type, needed in needed_resources.items():
            claim = {'starttime': task['starttime'],
                     'endtime': task['endtime'],
                     'status': 'claimed'}
            resource = claimable_resources[res_type]
            claim['resource_id'] = resource['id']

            if res_type == db_rcu_type_id:
                # The RCU claim size as returned by the ResourceEstimator is actually a bit pattern (encoding
                # which of a station's RCUs are requested to take part in a measurement and which not).
                # A claim size has to be countable, so the pattern itself is stored separately with the claim,
                # while the number of requested RCUs (the '1's in the pattern) serves as the claim size.
                claim['used_rcus'] = needed
                needed = needed.count('1')

            claim['claim_size'] = needed
            resource['available_capacity'] -= needed

            if res_type == db_storage_type_id:
                # FIXME: find proper way to extend storage time with a year
                # 2016-09-27 scisup would like to be involved in chosing these kind of defaults
                # and what to do after the claim expires
                # we now choose a default period of a year, and do nothing if the claim expires
                claim['endtime'] += timedelta(days=365)

                if properties:
                    claim['properties'] = properties

            claims.append(claim)

        return claims

    def getProperties(self, db_resource_prop_types, files_dict, io_type):
        """ Convert files_dict into a flat list of properties in claim format.

            The entry 'saps' of files_dict is a list of dicts, each with its own 'sap_nr' and
            'properties'; any other entry directly maps property type names onto values.

            :param db_resource_prop_types: {property_type_name: property_type_id, ...}
            :param files_dict:             e.g. {'cs': {'...': 123, ...}, 'saps': [{...}, {...}]}, or None
            :param io_type:                'input' or 'output'; stored with each property
            :return: list of property dicts (empty if files_dict is None)
        """
        if files_dict is None:
            return []

        logger.info('getProperties: processing %s_files: %s', io_type, files_dict)
        properties = []

        for group_name, needed_prop_group in files_dict.items():
            if group_name != 'saps':
                # Plain group, e.g. 'cs': {'...': 123, ...}; not tied to a SAP
                properties += self.makeProperties(db_resource_prop_types, needed_prop_group, None, io_type)
                continue

            # 'saps': [{...}, {...}]; each SAP brings its own number and properties
            for sap_dict in needed_prop_group:
                properties += self.makeProperties(db_resource_prop_types, sap_dict['properties'],
                                                  sap_dict['sap_nr'], io_type)

        return properties

    def makeProperties(self, db_resource_prop_types, properties_dict, sap_nr, io_type):
        """ Helper for getProperties(): convert properties_dict into a list of properties in claim format.

            Property type names that are unknown to db_resource_prop_types are logged and left out.

            :param db_resource_prop_types: {property_type_name: property_type_id, ...}
            :param properties_dict:        {property_type_name: value, ...}
            :param sap_nr:                 SAP number to store with each property, or None to omit it
            :param io_type:                stored with each property as 'io_type'
            :return: [{'type': <property_type_id>, 'value': <value>, 'io_type': <io_type>[, 'sap_nr': <sap_nr>]}, ...]
        """
        converted = []

        for prop_type_name, prop_value in properties_dict.items():
            prop_type_id = db_resource_prop_types.get(prop_type_name)

            if prop_type_id is not None:
                prop = {'type': prop_type_id, 'value': prop_value, 'io_type': io_type}
                if sap_nr is not None:
                    prop['sap_nr'] = sap_nr
                converted.append(prop)
            else:
                logger.warn('makeProperties: unknown prop_type: %s', prop_type_name)

        return converted

    # TODO: (Ruud B) check and decide if we need to be able to merge claims based on claim[used_rcus]
    def mergeClaims(self, summablePropTypeIds, claims):
        """ Merge claims that were allocated onto the same resource.

            The list claims is sorted and reduced in place: afterwards it holds one claim per
            'resource_id'. The 'claim_size' of merged claims is added up. Their properties are
            combined through mergeResourceProperties(), which uses summablePropTypeIds to decide
            which property values have to be summed.

            :param summablePropTypeIds: ids of property types whose values must be added on merge
            :param claims:              list of claim dicts; modified in place
        """
        logger.info('mergeClaims: merging claims for the same resource across %d claims', len(claims))

        # After sorting, claims on the same resource sit next to each other.
        claims.sort(key=lambda claim: (claim['resource_id'], claim.get('properties')))

        pos = 1
        while pos < len(claims):  # the list shrinks along the way, so no plain for-loop here
            kept, candidate = claims[pos - 1], claims[pos]

            if kept['resource_id'] != candidate['resource_id']:
                pos += 1
                continue

            # Fold candidate into kept. Only claim_size and properties need work:
            # starttime and endtime are always equal for the same resource_id.
            kept['claim_size'] += candidate['claim_size']

            if 'properties' in candidate and len(candidate['properties']) > 0:
                # Merge into a fresh list, so a list that is shared with other claims is not touched;
                # mergeResourceProperties() takes care of not modifying shared property dicts.
                kept['properties'] = list(kept['properties']) if 'properties' in kept else []
                self.mergeResourceProperties(summablePropTypeIds, kept['properties'], candidate['properties'])

            del claims[pos]  # can be more efficient O()-wise

    def mergeResourceProperties(self, summablePropTypeIds, props0, props1):
        """ Merge the properties in props1 into the list props0 (in place).

            Two properties are the same if their 'type', 'sap_nr' (possibly absent) and 'io_type'
            are equal. For a property of props1 that is already in props0, the values are added
            if its type is in summablePropTypeIds; otherwise the value in props0 is kept and a
            warning is logged if the two values differ. A property that is not in props0 yet is
            appended to it. Only the properties that were in props0 on entry are matched against.
            NOTE: caller has to ensure that 'not (props0 is props1)' holds.

            :param summablePropTypeIds: ids of property types whose values must be added
            :param props0:              list of property dicts to merge into; modified in place
            :param props1:              list of property dicts to merge from
        """
        # Better datastructs could easily avoid this O(N^2). len(props) is ~5
        props0len = len(props0)

        for p1 in props1:
            # Walk a snapshot of the initial part of props0, as props0 itself may change below
            for idx, p0 in enumerate(props0[:props0len]):
                same_prop = p0['type'] == p1['type'] and \
                            p0.get('sap_nr') == p1.get('sap_nr') and \
                            p0['io_type'] == p1['io_type']
                if not same_prop:
                    continue

                if p0['type'] in summablePropTypeIds:  # same and need to sum values
                    # Sum into a copy, to avoid changing p1 (or another claim) too if it refers to the same obj
                    summed = props0[idx] = dict(p0)
                    summed['value'] += p1['value']
                elif p0['value'] != p1['value']:
                    logger.warn('mergeResourceProperties: unexpected same prop pair, but with different values: %s and %s', p0, p1)
                break
            else:
                # No counterpart in props0
                props0.append(p1)