    # observation.py
    #
    # Copyright (C) 2016
    # ASTRON (Netherlands Institute for Radio Astronomy)
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This file is part of the LOFAR software suite.
    # The LOFAR software suite is free software: you can redistribute it
    # and/or modify it under the terms of the GNU General Public License as
    # published by the Free Software Foundation, either version 3 of the
    # License, or (at your option) any later version.
    #
    # The LOFAR software suite is distributed in the hope that it will be
    # useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License along
    # with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
    #
    # $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $
    
    import logging
    from math import ceil, floor
    from base_resource_estimator import BaseResourceEstimator
    
    logger = logging.getLogger(__name__)
    
    DATAPRODUCTS = "Observation.DataProducts."
    COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
    
    class ObservationResourceEstimator(BaseResourceEstimator):
        """ ResourceEstimator for LOFAR Observations
        """
        def __init__(self):
            logger.info("init ObservationResourceEstimator")
            super(ObservationResourceEstimator, self).__init__(name='observation')
            self.required_keys = ('Observation.sampleClock',
                                  'Observation.startTime',
                                  'Observation.stopTime',
                                  'Observation.antennaSet',
                                  COBALT + 'Correlator.nrChannelsPerSubband',
                                  COBALT + 'Correlator.integrationTime',
                                  COBALT + 'BeamFormer.flysEye',
                                  COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor',
                                  COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor',
                                  'Observation.VirtualInstrument.stationList',
                                  DATAPRODUCTS + 'Output_Correlated.enabled',
                                  DATAPRODUCTS + 'Output_Correlated.identifications',
                                  DATAPRODUCTS + 'Output_CoherentStokes.enabled',
                                  DATAPRODUCTS + 'Output_CoherentStokes.identifications',
                                  COBALT + 'BeamFormer.CoherentStokes.which',
                                  DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
                                  DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
                                  COBALT + 'BeamFormer.IncoherentStokes.which'
                                  )
    
        def _calculate(self, parset, input_files=None):
            """ Calculate the combined resources needed by the different observation types that
            can be in a single observation.
            reply is something along the lines of:
            {'bandwidth': {'total_size': 19021319494},
            'storage': {'total_size': 713299481024,
            'output_files': 
              {'uv': {'nr_of_uv_files': 481, 'uv_file_size': 1482951104},
              'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 319}},
                       {'sap_nr': 1, 'properties': {'nr_of_uv_files': 81}}, 
                       {'sap_nr': 2, 'properties': {'nr_of_uv_files': 81}}
            ]}}}
            The base_resource_estimator adds an {'observation': } around this.
            """
            logger.info("start estimate '{}'".format(self.name))
            logger.info("parset: %s", parset)
            duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
            
            result = {}
            output_files = {}
            correlated_size, correlated_bandwidth, output_files_uv, correlated_saps = self.correlated(parset, duration)
            coherentstokes_size, coherentstokes_bandwidth, output_files_cs, coherentstokes_saps = self.coherentstokes(parset, duration)
            incoherentstokes_size, incoherentstokes_bandwidth, output_files_is, incoherentstokes_saps = self.incoherentstokes(parset, duration)
            
            if output_files_uv:
                output_files['uv'] = output_files_uv
            if output_files_cs:
                output_files['cs'] = output_files_cs
            if output_files_is:
                output_files['is'] = output_files_is
    
            output_files['saps'] = []
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                sap = {'sap_nr': sap_nr, 'properties': {}}
                if sap_nr in correlated_saps:
                    sap['properties'].update(correlated_saps[sap_nr])
                if sap_nr in coherentstokes_saps:
                    sap['properties'].update(coherentstokes_saps[sap_nr])
                if sap_nr in incoherentstokes_saps:
                    sap['properties'].update(incoherentstokes_saps[sap_nr])
                output_files['saps'].append(sap)
    
            total_data_size = correlated_size + coherentstokes_size + incoherentstokes_size
            result['storage'] = {'total_size': total_data_size, 'output_files': output_files}
            result['bandwidth'] = {'total_size': correlated_bandwidth + coherentstokes_bandwidth + incoherentstokes_bandwidth}
            return result
    
        def correlated(self, parset, duration):
            """ Estimate number of files, file size and bandwidth needed for correlated data
            """
            logger.info("calculating correlated datasize")
            size_of_header   = 512 #TODO More magic numbers (probably from Alwin). ScS needs to check these. They look ok though.
            size_of_overhead = 600000
            size_of_short    = 2
            size_of_complex  = 8
            nr_polarizations = 2
            channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) #TODO should these have defaults?
            integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
            nr_virtual_stations = self._virtual_stations(parset)
    
            integrated_seconds = floor(duration / integration_time)
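            # a baseline for every station pair, including autocorrelations: n * (n + 1) / 2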
            nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1.0) / 2.0 #TODO Why is this done in float?
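            # ceil(x / 512.0) * 512.0 rounds the size up to the next multiple of 512 bytes (presumably block alignment)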
            data_size = ceil((nr_baselines * channels_per_subband * nr_polarizations**2 * size_of_complex) / 512.0) * 512.0 #TODO What's the 512.0 magic numbers?
            n_sample_size = ceil((nr_baselines * channels_per_subband * size_of_short) / 512.0) * 512.0
    
            # sum of all subbands in all digital beams
            total_files = 0
            sap_files = {}
            
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_files = len(subbandList)
                sap_files[sap_nr] = {'nr_of_uv_files': nr_files, 'start_sb_nr': total_files}
                total_files += nr_files
    
            file_size = int((data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead)
            output_files = {'nr_of_uv_files': total_files, 'uv_file_size': file_size, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')}
            logger.info("correlated_uv: {} files {} bytes each".format(total_files, file_size))
    
            total_data_size = int(ceil(file_size * total_files))  # bytes
            total_bandwidth = int(ceil((total_data_size * 8) / duration))  # bits/second
            return (total_data_size, total_bandwidth, output_files, sap_files)
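
        # Worked example (editor's sketch, not from the original source): the file-size
        # formula above for a hypothetical 1-hour observation with 30 virtual stations,
        # 64 channels per subband and 1 s integration time:
        #
        #   nr_baselines       = 30 * 31 / 2                            = 465
        #   data_size          = ceil(465 * 64 * 2**2 * 8 / 512) * 512  = 952320 bytes
        #   n_sample_size      = ceil(465 * 64 * 2 / 512) * 512         = 59904 bytes
        #   integrated_seconds = floor(3600 / 1)                        = 3600
        #   file_size          = (952320 + 59904 + 512) * 3600 + 600000 = 3646449600 bytes
        #
        # i.e. roughly 3.4 GiB per subband file.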
    
        def coherentstokes(self, parset, duration):
            """  Estimate number of files, file size and bandwidth needed for coherent stokes
            """
            if not parset.getBool(DATAPRODUCTS + 'Output_CoherentStokes.enabled'):
                return (0, 0, {}, {})
                
            logger.info("calculate coherentstokes datasize")
            coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
            subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
            samples_per_second = self._samples_per_second(parset)
            integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
    
            nr_coherent = 4 if coherent_type in ('XXYY', 'IQUV') else 1
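            # 4.0 below is the assumed size of one sample in bytes; XXYY data are raw
            # complex voltages, so no time-integration factor is applied to them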
            if coherent_type in ('XXYY',):
                size_per_subband = samples_per_second * 4.0 * duration
            else:
                size_per_subband = (samples_per_second * 4.0 * duration) / integration_factor
    
            total_nr_stokes = 0
            total_files     = 0
            max_nr_subbands = 0
            sap_files       = {}
            
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                logger.info("checking SAP {}".format(sap_nr))
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_subbands = len(subbandList)
                max_nr_subbands = max(nr_subbands, max_nr_subbands)
                nr_files = 0
                total_nr_tabs = 0
                
                for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
                    logger.info("checking TAB {}".format(tab_nr))
                    if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
                        logger.info("adding coherentstokes size")
                        nr_stokes = nr_coherent #TODO what does min mean here?
                        total_nr_tabs += 1
                        total_nr_stokes += nr_stokes
                        nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                nr_tab_rings = parset.getInt('Observation.Beam[%d].nrTabRings' % sap_nr)
                if nr_tab_rings > 0:
                    logger.info("adding size for {} tab_rings".format(nr_tab_rings))
                    nr_tabs = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1)
                    total_nr_tabs += nr_tabs
                    nr_stokes = nr_tabs * nr_coherent
                    total_nr_stokes += nr_stokes
                    nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                if parset.getBool(COBALT + 'BeamFormer.flysEye'):
                    logger.info("adding flys eye data size")
                    nr_tabs = self._virtual_stations(parset)
                    total_nr_tabs += nr_tabs
                    nr_stokes = nr_tabs * nr_coherent
                    total_nr_stokes += nr_stokes
                    nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
    
                sap_files[sap_nr] = {'nr_of_cs_files': nr_files, 'nr_of_tabs': total_nr_tabs}
                total_files += nr_files
    
            nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
            size_per_file = int(nr_subbands_per_file * size_per_subband)
    
            output_files = {'nr_of_cs_files': total_files, 'nr_of_cs_stokes': nr_coherent, 'cs_file_size': size_per_file, 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')}
            logger.info("coherentstokes: {} files {} bytes each".format(total_files, size_per_file))
    
            total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband))
            total_bandwidth = int(ceil((total_data_size * 8) / duration)) # bits/second
            return (total_data_size, total_bandwidth, output_files, sap_files)
    
        def incoherentstokes(self, parset, duration):
            """  Estimate number of files, file size and bandwidth needed for incoherentstokes
            """
            if not parset.getBool(DATAPRODUCTS + 'Output_IncoherentStokes.enabled'):
                return (0, 0, {}, {})
                
            logger.info("calculate incoherentstokes data size")
            incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
            subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
            samples_per_second = self._samples_per_second(parset)
            time_integration_factor = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor')
            channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) #TODO should these have defaults?
            incoherent_channels_per_subband = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.nrChannelsPerSubband', 0)
    
            nr_incoherent = 4 if incoherent_type in ('IQUV',) else 1
    
            total_nr_stokes = 0
            total_files     = 0
            max_nr_subbands = 0
            sap_files       = {}
    
            for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
                logger.info("checking SAP {}".format(sap_nr))
                subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
                nr_subbands = len(subbandList)
                max_nr_subbands = max(nr_subbands, max_nr_subbands)
                nr_files = 0
                total_nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)
                for tab_nr in xrange(total_nr_tabs):
                    logger.info("checking TAB {}".format(tab_nr))
                    if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
                        logger.info("adding incoherentstokes size")
                        total_nr_stokes += nr_incoherent
                        nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))
    
                sap_files[sap_nr] = {'nr_of_is_files': nr_files, 'nr_of_tabs': total_nr_tabs}
                total_files += nr_files
    
            if incoherent_channels_per_subband > 0:
                channel_integration_factor = channels_per_subband / incoherent_channels_per_subband
            else:
                channel_integration_factor = 1
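            # e.g. 64 correlator channels collapsed into 16 incoherent-stokes channels
            # yields a channel integration factor of 4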
    
            # make sure the sizes are defined even when there are no incoherent TABs
            size_per_subband = 0
            size_per_file = 0
            if total_files > 0:
                nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
                size_per_subband = int((samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration)
                size_per_file = nr_subbands_per_file * size_per_subband
    
            output_files = {'nr_of_is_files': total_files, 'nr_of_is_stokes': nr_incoherent, 'is_file_size': int(size_per_file), 'identifications': parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')}
            logger.info("incoherentstokes: {} files {} bytes each".format(total_files, size_per_file))
    
            total_data_size = int(ceil(total_nr_stokes * max_nr_subbands * size_per_subband))  # bytes
            total_bandwidth = int(ceil((total_data_size * 8) / duration))  # bits/sec
            return (total_data_size, total_bandwidth, output_files, sap_files)
    
        def _samples_per_second(self, parset):
            """ set samples per second
            """
            samples_160mhz = 155648
            samples_200mhz = 196608
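            # per-subband sample counts used by COBALT for the two clocks; these differ
            # slightly from sampleClock / 1024 (156250 and 195312.5), presumably because
            # COBALT processes data in fixed-size blocks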
            sample_clock = parset.getInt('Observation.sampleClock')
            samples = samples_160mhz if 160 == sample_clock else samples_200mhz
            logger.info("samples per second for {} MHz clock = {}".format(sample_clock, samples))
            return samples
    
        def _virtual_stations(self, parset):
            """ calculate virtualnumber of stations
            """
            stationList = parset.getStringVector('Observation.VirtualInstrument.stationList')
            nr_virtual_stations = 0
            if parset.getString('Observation.antennaSet') in ('HBA_DUAL', 'HBA_DUAL_INNER'):
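                # in HBA_DUAL modes each core (CS) station is split into two HBA
                # sub-fields ("ears") that are correlated separately, so it counts twice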
                for station in stationList:
                    if 'CS' in station:
                        nr_virtual_stations += 2
                    else:
                        nr_virtual_stations += 1
            else:
                nr_virtual_stations = len(stationList)
            logger.info("number of virtual stations = {}".format(nr_virtual_stations))
            return nr_virtual_stations
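
    # --------------------------------------------------------------------------
    # Usage sketch (editor's illustration, not part of the original module). It
    # is assumed, based on required_keys and the _calculate() docstring, that the
    # base class exposes an entry point that validates the parset and wraps the
    # result in {'observation': ...}; the method name below is hypothetical.
    #
    #     estimator = ObservationResourceEstimator()
    #     result = estimator.verify_and_estimate(parset)  # hypothetical entry point
    #     print(result['observation']['storage']['total_size'])
    #
    # where 'parset' is a LOFAR parameterset containing all of required_keys.
    # --------------------------------------------------------------------------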