Skip to content
Snippets Groups Projects
Select Git revision
  • 2e8b733bde2e7cc8b8c48ebbaf7554a74c97505b
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

observation.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    observation.py 25.86 KiB
    # observation.py
    #
    # Copyright (C) 2016-2017
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $
    
    import logging
    from math import ceil
    from base_resource_estimator import BaseResourceEstimator
    
    logger = logging.getLogger(__name__)
    
    DATAPRODUCTS = "Observation.DataProducts."
    COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
    
    class ObservationResourceEstimator(BaseResourceEstimator):
        """ ResourceEstimator for LOFAR Observations
        """
        def __init__(self):
            logger.info("init ObservationResourceEstimator")
            super(ObservationResourceEstimator, self).__init__(name='observation')
            self.required_keys = ('Observation.sampleClock',
                                  'Observation.startTime',
                                  'Observation.stopTime',
                                  'Observation.antennaSet',
                                  'Observation.nrBeams',
                                  COBALT + 'Correlator.nrChannelsPerSubband',
                                  COBALT + 'Correlator.integrationTime',
                                  COBALT + 'BeamFormer.flysEye',
                                  COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor',
                                  COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor',
                                  'Observation.VirtualInstrument.stationList',
                                  DATAPRODUCTS + 'Output_Correlated.enabled',
                                  DATAPRODUCTS + 'Output_Correlated.identifications',
                                  DATAPRODUCTS + 'Output_Correlated.storageClusterName',
                                  DATAPRODUCTS + 'Output_CoherentStokes.enabled',
                                  DATAPRODUCTS + 'Output_CoherentStokes.identifications',
                                  DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName',
                                  COBALT + 'BeamFormer.CoherentStokes.which',
                                  DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
                                  DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
                                  DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName',
                                  COBALT + 'BeamFormer.IncoherentStokes.which'
                                  )
    
        def _calculate(self, parset, input_files={}):
            """ Calculate the resources needed by the different data product types that can be in a single observation.
    
                The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
                Here we have UV data for CEP4 (100 files), and we have for DRAGNET 2 IS (IQUV) and 7 CS TABs (also IQUV),
                each split across 5 parts (i.e. 7*4*5 + 2*4*5 = 180 files).
                If estimate sizes and root_resource_groups are the same, estimates can be combined into 1 estimate (partly done here).
                (Not shown here, but note that for CS complex voltage data (XXYY), we must produce 1 estimate per 4 files (XXYY),
                 such that the XXYY files for a part end up on the same storage. Such a beam can still be split across >1 parts.)
    
                For a global filesystem and enough network bandwidth (e.g. CEP4), we produce 1 big estimate having count 1.
                For other clusters (e.g. DRAGNET) or multi cluster observations, we produce multiple estimates, as also shown.
    
                More examples at scu001:/opt/lofar/var/log/resourceassigner.log
            {
                'errors': [],
                'estimates': [{
                    'resource_types': {'bandwidth': 536870912, 'storage': 128849018880},
                    'count': 1, 'root_resource_groups': ['CEP4'],  # count is per root resource group!
                    'output_files': {
                        'uv': {'nr_of_uv_files': 100, 'uv_file_size': 1073741824, 'identifications': [...]},
                        'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 80, 'start_sb_nr': 0}},
                                 {'sap_nr': 1, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 80}},
                                 {'sap_nr': 2, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 100}}
                        ]
                    }
                }, {'resources': {'bandwidth': 8947849, 'storage': 2147483648},
                    'count': 180, 'root_resource_groups': ['DRAGNET'],  # count is per root resource group!
                    'output_files': {
                        'cs': {'nr_of_cs_files': 140, 'cs_file_size': 2147483648, 'nr_of_cs_stokes': 4, 'identifications': [...]},
                        'is': {'nr_of_is_files': 40,  'is_file_size': 2147483648, 'nr_of_is_stokes': 4, 'identifications': [...]},
                        'saps': [{'sap_nr': 0, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0,      # 1 IS: IQUV, 5 parts,
                                                              'nr_of_cs_files': 140, 'nr_of_tabs': 8}},  # + 7 CS: IQUV, 5 parts
                                 {'sap_nr': 2, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0}}     # 1 IS: IQUV, 5 parts
                        ]
                    }
                }]
            }
    
            The base_resource_estimator adds an {'observation': } around this.
            """
            logger.info("start estimate '{}'".format(self.name))
            logger.info('parset: %s ' % parset)
            # NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
            # being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
            duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
    
            errors = []
            estimates = []
            uv_estimate = self.correlated(parset, duration)
            if uv_estimate is not None:
                if uv_estimate['count'] == 0 or uv_estimate['resource_types']['bandwidth'] == 0 or \
                                                uv_estimate['resource_types']['storage'] == 0:
                    errors.append('empty UV resource estimate!')
                    logger.error('resource estimate for UV data is empty!')
                else:
                    estimates.append(uv_estimate)
            cs_estimate = self.coherentstokes(parset, duration)
            if cs_estimate is not None:
                if cs_estimate['count'] == 0 or cs_estimate['resource_types']['bandwidth'] == 0 or \
                                                cs_estimate['resource_types']['storage'] == 0:
                    errors.append('empty CS resource estimate!')
                    logger.error('resource estimate for CS data is empty!')
                else:
                    estimates.append(cs_estimate)
            is_estimate = self.incoherentstokes(parset, duration)
            if is_estimate is not None:
                if is_estimate['count'] == 0 or is_estimate['resource_types']['bandwidth'] == 0 or \
                                                is_estimate['resource_types']['storage'] == 0:
                    errors.append('empty IS resource estimate!')
                    logger.error('resource estimate for IS data is empty!')
                else:
                    estimates.append(is_estimate)
            if not estimates:
                errors.append('Produced observation resource estimate list is empty!')
                logger.error('empty observation resource estimate list!')
    
            logger.info('Observation resource estimate(s) before merge: {}'.format(estimates))
    
            # Merge estimates if possible to alleviate the Resource Assigner and RADB.
            self._merge_estimates(estimates)
    
            result = {'errors': errors, 'estimates': estimates}
            return result
    
        def correlated(self, parset, duration):
            """ Estimate storage size and bandwidth needed for correlated (i.e. uv)
                data products. Also add SAP properties needed by the propagator.
            """
            if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
                logger.info("correlated data output not enabled")
                return None
    
            logger.info("calculating correlated datasize")
            storage_unit     = 512  # all sizes in bytes
            size_of_header   = 512
            size_of_overhead = 600000  # COBALT parset in MS HISTORY subtable + misc
            size_of_short    = 2
            size_of_visib    = 8  # a visibility is stored as a complex FP32
            nr_polarizations = 2
            channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64)  # defaults as in COBALT
            integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
            nr_virtual_stations = self._virtual_stations(parset)
    
            # Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan)
            integrated_seconds = int(duration / integration_time)
            nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2
            data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \
                         size_of_visib + storage_unit-1) / storage_unit * storage_unit 
            n_sample_size = (nr_baselines * channels_per_subband * size_of_short + storage_unit-1) / \
                            storage_unit * storage_unit
            file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead  # bytes
    
            # sum of all subbands in all digital beams
            total_files = 0
            sap_files = []
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_files = len(subbandList)
                sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_uv_files': nr_files,
                                                                   'start_sb_nr': total_files}})
                total_files += nr_files
            if total_files == 0:
                logger.warn("Correlated data output enabled, but no UV files across all SAPs")
                return None
            bandwidth = int(ceil(8 * file_size / duration))  # bits/second
    
            idents = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')
            output_files = {'uv': {'nr_of_uv_files': total_files, 'uv_file_size': file_size,
                                   'identifications': idents},
                            'saps': sap_files}
    
            root_resource_group = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
            if self._hasGlobalStorage(root_resource_group):
                # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
                file_size *= total_files
                bandwidth *= total_files
                total_files = 1
    
            estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                        'count': total_files, 'root_resource_groups': [root_resource_group],
                        'output_files': output_files}
            logger.debug("Correlated data estimate: {}".format(estimate))
            return estimate
    
        def coherentstokes(self, parset, duration):
            """ Estimate storage size and bandwidth needed for Coherent Stokes (i.e. cs)
                data products. Also add SAP properties needed by the propagator.
            """
            if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
                logger.info("coherent stokes data output not enabled")
                return None
    
            logger.info("calculate coherentstokes datasize")
            size_of_sample = 4  # FP32
            coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
            subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
            samples_per_second = self._samples_per_second(parset)
            time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
            # Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1)
            size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
            nr_coherent = len(coherent_type)  # 'I' or 'IQUV' or 'XXYY'
    
            total_nr_stokes = 0
            total_files     = 0
            max_nr_subbands = 0
            sap_files       = []
    
            doFlysEye = parset.getBool(COBALT + 'BeamFormer.flysEye')
    
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                logger.info("checking SAP {}".format(sap_nr))
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_subbands = len(subbandList)
                max_nr_subbands = max(nr_subbands, max_nr_subbands)
                nr_files = 0
                total_nr_tabs = 0
    
                for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
                    logger.info("checking TAB {}".format(tab_nr))
                    if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
                        logger.info("adding coherentstokes size")
                        nr_stokes = nr_coherent # TODO, there used to be a function with min() here?
                        total_nr_tabs += 1
                        total_nr_stokes += nr_stokes
                        nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                nr_tab_rings = parset.getInt('Observation.Beam[%d].nrTabRings' % sap_nr)
                if nr_tab_rings > 0:
                    logger.info("adding size for {} tab_rings".format(nr_tab_rings))
                    nr_tabs = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1)
                    total_nr_tabs += nr_tabs
                    nr_stokes = nr_tabs * nr_coherent
                    total_nr_stokes += nr_stokes
                    nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                if doFlysEye:
                    logger.info("adding flys eye data size")
                    nr_tabs = self._virtual_stations(parset)
                    total_nr_tabs += nr_tabs
                    nr_stokes = nr_tabs * nr_coherent
                    total_nr_stokes += nr_stokes
                    nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                if nr_files:
                    sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_cs_files': nr_files,
                                                                       'nr_of_tabs': total_nr_tabs}})
                    total_files += nr_files
            if total_files == 0:
                logger.warn("Coherent Stokes data output enabled, but no CS files across all SAPs")
                return None
            # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
            # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
            nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
            file_size = int(nr_subbands_per_file * size_per_subband)  # bytes
            bandwidth = int(ceil(8 * file_size / duration))  # bits/second
    
            idents = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')
            output_files = {'cs': {'nr_of_cs_files': total_files, 'cs_file_size': file_size,
                                   'identifications': idents},
                            'saps': sap_files}
    
            root_resource_group = parset.getString(DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName')
            if self._hasGlobalStorage(root_resource_group):
                # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
                file_size *= total_files
                bandwidth *= total_files
                total_files = 1
    
            estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                        'count': total_files, 'root_resource_groups': [root_resource_group],
                        'output_files': output_files}
            estimate['output_files']['cs']['nr_of_cs_stokes'] = nr_coherent
    
            logger.debug("Coherent Stokes data estimate: {}".format(estimate))
            return estimate
    
        def incoherentstokes(self, parset, duration):
            """ Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
                data products. Also add SAP properties needed by the propagator.
            """
            if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
                logger.info("incoherent stokes data output not enabled")
                return None
    
            logger.info("calculate incoherentstokes data size")
            size_of_sample = 4  # FP32
            incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
            subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
            samples_per_second = self._samples_per_second(parset)
            time_integration_factor = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor')
            size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
            nr_incoherent = len(incoherent_type)  # 'I' or 'IQUV' ('XXYY' only possible for coherent stokes)
    
            total_nr_stokes = 0
            total_files     = 0
            max_nr_subbands = 0
            sap_files       = []
    
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                logger.info("checking SAP {}".format(sap_nr))
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_subbands = len(subbandList)
                max_nr_subbands = max(nr_subbands, max_nr_subbands)
                nr_files = 0
    
                # Atm can have 1 IS TAB per SAP, because its pointing is equal to the SAP pointing.
                # (When we support online coh dedisp and on multiple DMs, we can have >1 IS per SAP.)
                is_tab_nr = -1
                for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
                    logger.info("checking TAB {}".format(tab_nr))
                    if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): #not coherent is incoherent
                        logger.info("Found incoherent stokes TAB: %i" % tab_nr)
                        if is_tab_nr >= 0:
                            # Could get here to produce >1 IS TAB copies, maybe for some software test
                            logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
                            return None
                        is_tab_nr = tab_nr
                        total_nr_stokes += nr_incoherent
                        nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))
    
                if nr_files:
                    sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_is_files': nr_files,
                                                                       'nr_of_tabs': 1,
                                                                       'is_tab_nr': is_tab_nr}})
                    total_files += nr_files
            if total_files == 0:
                logger.warn("Incoherent Stokes data output enabled, but no IS files across all SAPs")
                return None
            # NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
            # the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
            nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
            file_size = int(nr_subbands_per_file * size_per_subband)  # bytes
            bandwidth = int(ceil(8 * file_size / duration))  # bits/second
    
            idents = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')
            output_files = {'is': {'nr_of_is_files': total_files, 'is_file_size': file_size,
                                   'identifications': idents},
                            'saps': sap_files}
    
            root_resource_group = parset.getString(DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName')
            if self._hasGlobalStorage(root_resource_group):
                # global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
                file_size *= total_files
                bandwidth *= total_files
                total_files = 1
    
            estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
                        'count': total_files, 'root_resource_groups': [root_resource_group],
                        'output_files': output_files}
            estimate['output_files']['is']['nr_of_is_stokes'] = nr_incoherent
    
            logger.debug("Incoherent Stokes data estimate: {}".format(estimate))
            return estimate
    
        def _merge_estimates(self, estimates):
            """ Estimates can only be merged if same root_resource_groups and bandwidth and storage,
                or if the root_resource_groups have a (single) global filesystem.
    
                NOTE: assumed good enough to only merge conseq pairs, not all pairs.
            """
            i = 1
            while i < len(estimates):  # careful iterating while modifying
                if estimates[i-1]['root_resource_groups'] == estimates[i]['root_resource_groups'] and \
                   ((estimates[i-1]['resource_types']['bandwidth'] == estimates[i]['resource_types']['bandwidth'] and \
                     estimates[i-1]['resource_types']['storage']   == estimates[i]['resource_types']['storage']) or \
                    all(self._hasGlobalStorage(rg) for rg in root_resource_groups)
                   ):
                    # Mergeable. Add uv, cs, is from estimates[i] into estimates[i-1]
                    if 'uv' in estimates[i]['output_files']:
                        estimates[i-1]['output_files']['uv'] = estimates[i]['output_files']['uv']
                    if 'cs' in estimates[i]['output_files']:
                        estimates[i-1]['output_files']['cs'] = estimates[i]['output_files']['cs']
                    if 'is' in estimates[i]['output_files']:
                        estimates[i-1]['output_files']['is'] = estimates[i]['output_files']['is']
    
                    # Merge saps. Saps are listed in order, but a sap_nr may be missing.
                    # Extend one list by the other, sort, then merge any conseq duplicates.
                    estimates[i-1]['output_files']['saps'].extend(estimates[i]['output_files']['saps'])
                    estimates[i-1]['output_files']['saps'].sort(key=lambda sap: sap['sap_nr'])
                    j = 1
                    while j < len(estimates[i-1]['output_files']['saps']):
                        sap0 = estimates[i-1]['output_files']['saps'][j-1]
                        sap1 = estimates[i-1]['output_files']['saps'][j  ]
                        if sap0['sap_nr'] == sap1['sap_nr']:
                            # Sum nr_of_tabs, other keys must be from another data product type
                            if 'nr_of_tabs' in sap0['properties'] and \
                               'nr_of_tabs' in sap1['properties']:
                                sap1['properties']['nr_of_tabs'] += sap0['properties']['nr_of_tabs']
                            sap0['properties'].update(sap1['properties'])  # includes nr_of_tabs
    
                            estimates[i-1]['output_files']['saps'].pop(j)
                        else:
                            j += 1
    
                    if all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups']):
                        # for global fs, collapse regardless
                        estimates[i-1]['resource_types']['bandwidth'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
                        estimates[i-1]['resource_types']['bandwidth'] += estimates[i]['resource_types']['bandwidth'] * estimates[i]['count']
                        estimates[i-1]['resource_types']['storage'] *= estimates[i-1]['count']  # *= 1, but to be robust and clear
                        estimates[i-1]['resource_types']['storage'] += estimates[i]['resource_types']['storage'] * estimates[i]['count']
                        estimates[i-1]['count'] = 1  # already 1, but to be robust and clear
                    else:
                        # root_resource_groups and values of bandwidth and storage are equal for both estimates
                        estimates[i-1]['count'] += estimates[i]['count']
     
                    logger.info('Merged observation resource estimate {} into {}'.format(i, i-1))
                    estimates.pop(i)
                else:
                    i += 1
    
            return estimates
    
        def _samples_per_second(self, parset):
            """ set samples per second
            """
            samples_160mhz = 155648
            samples_200mhz = 196608
            sample_clock = parset.getInt('Observation.sampleClock')
            samples = samples_160mhz if 160 == sample_clock else samples_200mhz
            logger.info("samples per second for {} MHz clock = {}".format(sample_clock, samples))
            return samples
    
        def _virtual_stations(self, parset):
            """ calculate virtualnumber of stations
            """
            stationList = parset.getStringVector('Observation.VirtualInstrument.stationList')
            nr_virtual_stations = 0
            if parset.getString('Observation.antennaSet') in ('HBA_DUAL', 'HBA_DUAL_INNER'):
                for station in stationList:
                    if 'CS' in station:
                        nr_virtual_stations += 2
                    else:
                        nr_virtual_stations += 1
            else:
                nr_virtual_stations = len(stationList)
            logger.info("number of virtual stations = {}".format(nr_virtual_stations))
            return nr_virtual_stations
    
        def _hasGlobalStorage(self, resource_group_name):
            return resource_group_name == 'CEP4'