# observation.py
#
# Copyright (C) 2016-2017
# ASTRON (Netherlands Institute for Radio Astronomy)
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This file is part of the LOFAR software suite.
# The LOFAR software suite is free software: you can redistribute it
# and/or modify it under the terms of the GNU General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# The LOFAR software suite is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with the LOFAR software suite. If not, see <http://www.gnu.org/licenses/>.
#
# $Id: observation.py 33534 2016-02-08 14:28:26Z schaap $
import logging
from math import ceil
from base_resource_estimator import BaseResourceEstimator
logger = logging.getLogger(__name__)
DATAPRODUCTS = "Observation.DataProducts."
COBALT = "Observation.ObservationControl.OnlineControl.Cobalt."
class ObservationResourceEstimator(BaseResourceEstimator):
""" ResourceEstimator for LOFAR Observations
"""
def __init__(self):
logger.info("init ObservationResourceEstimator")
super(ObservationResourceEstimator, self).__init__(name='observation')
self.required_keys = ('Observation.sampleClock',
'Observation.startTime',
'Observation.stopTime',
'Observation.antennaSet',
'Observation.nrBeams',
COBALT + 'Correlator.nrChannelsPerSubband',
COBALT + 'Correlator.integrationTime',
COBALT + 'BeamFormer.flysEye',
COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor',
COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor',
'Observation.VirtualInstrument.stationList',
DATAPRODUCTS + 'Output_Correlated.enabled',
DATAPRODUCTS + 'Output_Correlated.identifications',
DATAPRODUCTS + 'Output_Correlated.storageClusterName',
DATAPRODUCTS + 'Output_CoherentStokes.enabled',
DATAPRODUCTS + 'Output_CoherentStokes.identifications',
DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName',
COBALT + 'BeamFormer.CoherentStokes.which',
DATAPRODUCTS + 'Output_IncoherentStokes.enabled',
DATAPRODUCTS + 'Output_IncoherentStokes.identifications',
DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName',
COBALT + 'BeamFormer.IncoherentStokes.which'
)
    def _calculate(self, parset, input_files=None):
""" Calculate the resources needed by the different data product types that can be in a single observation.
The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters.
        Here we have UV data on CEP4 (100 files), and on DRAGNET 2 IS TABs (IQUV) and 7 CS TABs (also IQUV),
        each split across 5 parts (i.e. 7*4*5 + 2*4*5 = 180 files).
        If estimates have equal sizes and root_resource_groups, they can be combined into 1 estimate (partly done here).
(Not shown here, but note that for CS complex voltage data (XXYY), we must produce 1 estimate per 4 files (XXYY),
such that the XXYY files for a part end up on the same storage. Such a beam can still be split across >1 parts.)
For a global filesystem and enough network bandwidth (e.g. CEP4), we produce 1 big estimate having count 1.
For other clusters (e.g. DRAGNET) or multi cluster observations, we produce multiple estimates, as also shown.
More examples at scu001:/opt/lofar/var/log/resourceassigner.log
{
'errors': [],
'estimates': [{
'resource_types': {'bandwidth': 536870912, 'storage': 128849018880},
'count': 1, 'root_resource_groups': ['CEP4'], # count is per root resource group!
'output_files': {
'uv': {'nr_of_uv_files': 100, 'uv_file_size': 1073741824, 'identifications': [...]},
'saps': [{'sap_nr': 0, 'properties': {'nr_of_uv_files': 80, 'start_sb_nr': 0}},
{'sap_nr': 1, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 80}},
{'sap_nr': 2, 'properties': {'nr_of_uv_files': 20, 'start_sb_nr': 100}}
]
}
        }, {'resource_types': {'bandwidth': 8947849, 'storage': 2147483648},
'count': 180, 'root_resource_groups': ['DRAGNET'], # count is per root resource group!
'output_files': {
'cs': {'nr_of_cs_files': 140, 'cs_file_size': 2147483648, 'nr_of_cs_stokes': 4, 'identifications': [...]},
'is': {'nr_of_is_files': 40, 'is_file_size': 2147483648, 'nr_of_is_stokes': 4, 'identifications': [...]},
'saps': [{'sap_nr': 0, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0, # 1 IS: IQUV, 5 parts,
'nr_of_cs_files': 140, 'nr_of_tabs': 8}}, # + 7 CS: IQUV, 5 parts
{'sap_nr': 2, 'properties': {'nr_of_is_files': 20, 'is_tab_nr': 0}} # 1 IS: IQUV, 5 parts
]
}
}]
}
The base_resource_estimator adds an {'observation': } around this.
"""
logger.info("start estimate '{}'".format(self.name))
        logger.info('parset: %s', parset)
# NOTE: obs estimates appear quite accurate. Diffs come mostly from Observation.stopTime
# being planned instead of real stop time, because of Cobalt block size not being exactly 1.0 s.
duration = self._getDuration(parset.getString('Observation.startTime'), parset.getString('Observation.stopTime'))
errors = []
estimates = []
        # Estimate each enabled data product type in turn; reject empty (all-zero) estimates.
        for data_type, estimator in (('UV', self.correlated),
                                     ('CS', self.coherentstokes),
                                     ('IS', self.incoherentstokes)):
            estimate = estimator(parset, duration)
            if estimate is None:
                continue
            if estimate['count'] == 0 or estimate['resource_types']['bandwidth'] == 0 or \
               estimate['resource_types']['storage'] == 0:
                errors.append('empty %s resource estimate!' % data_type)
                logger.error('resource estimate for %s data is empty!' % data_type)
            else:
                estimates.append(estimate)
if not estimates:
errors.append('Produced observation resource estimate list is empty!')
logger.error('empty observation resource estimate list!')
logger.info('Observation resource estimate(s) before merge: {}'.format(estimates))
# Merge estimates if possible to alleviate the Resource Assigner and RADB.
self._merge_estimates(estimates)
result = {'errors': errors, 'estimates': estimates}
return result
def correlated(self, parset, duration):
""" Estimate storage size and bandwidth needed for correlated (i.e. uv)
data products. Also add SAP properties needed by the propagator.
"""
if not parset.getBool('Observation.DataProducts.Output_Correlated.enabled'):
logger.info("correlated data output not enabled")
return None
logger.info("calculating correlated datasize")
storage_unit = 512 # all sizes in bytes
size_of_header = 512
size_of_overhead = 600000 # COBALT parset in MS HISTORY subtable + misc
size_of_short = 2
size_of_visib = 8 # a visibility is stored as a complex FP32
nr_polarizations = 2
channels_per_subband = parset.getInt(COBALT + 'Correlator.nrChannelsPerSubband', 64) # defaults as in COBALT
integration_time = parset.getFloat(COBALT + 'Correlator.integrationTime', 1)
nr_virtual_stations = self._virtual_stations(parset)
# Reflects MeasurementSets produced by the casacore LOFAR storage manager (LofarStMan)
integrated_seconds = int(duration / integration_time)
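        # N * (N + 1) / 2 baselines between N virtual stations, including the N autocorrelations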
nr_baselines = nr_virtual_stations * (nr_virtual_stations + 1) / 2
data_size = (nr_baselines * channels_per_subband * nr_polarizations * nr_polarizations * \
size_of_visib + storage_unit-1) / storage_unit * storage_unit
n_sample_size = (nr_baselines * channels_per_subband * size_of_short + storage_unit-1) / \
storage_unit * storage_unit
file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead # bytes
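        # Illustrative (assumed values): 50 virtual stations, 64 channels/subband, 1 s integration,
        # 240 s duration gives (2611200 + 163328 + 512) * 240 + 600000 = 666609600 bytes
        # (~0.62 GiB) per subband MS.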
# sum of all subbands in all digital beams
total_files = 0
sap_files = []
for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_files = len(subbandList)
sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_uv_files': nr_files,
'start_sb_nr': total_files}})
total_files += nr_files
if total_files == 0:
logger.warn("Correlated data output enabled, but no UV files across all SAPs")
return None
bandwidth = int(ceil(8 * file_size / duration)) # bits/second
idents = parset.getStringVector(DATAPRODUCTS + 'Output_Correlated.identifications')
output_files = {'uv': {'nr_of_uv_files': total_files, 'uv_file_size': file_size,
'identifications': idents},
'saps': sap_files}
root_resource_group = parset.getString(DATAPRODUCTS + 'Output_Correlated.storageClusterName')
if self._hasGlobalStorage(root_resource_group):
# global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
file_size *= total_files
bandwidth *= total_files
total_files = 1
estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
'count': total_files, 'root_resource_groups': [root_resource_group],
'output_files': output_files}
logger.debug("Correlated data estimate: {}".format(estimate))
return estimate
def coherentstokes(self, parset, duration):
""" Estimate storage size and bandwidth needed for Coherent Stokes (i.e. cs)
data products. Also add SAP properties needed by the propagator.
"""
if not parset.getBool('Observation.DataProducts.Output_CoherentStokes.enabled'):
logger.info("coherent stokes data output not enabled")
return None
logger.info("calculate coherentstokes datasize")
size_of_sample = 4 # FP32
coherent_type = parset.getString(COBALT + 'BeamFormer.CoherentStokes.which')
subbands_per_file = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.subbandsPerFile', 512)
samples_per_second = self._samples_per_second(parset)
time_integration_factor = parset.getInt(COBALT + 'BeamFormer.CoherentStokes.timeIntegrationFactor')
# Note that complex voltages (XXYY) cannot be meaningfully integrated (i.e. time_integration_factor 1)
size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
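        # Illustrative (assumed values): 200 MHz clock (196608 samples/s), 240 s duration and
        # time_integration_factor 6 gives 196608 * 4 * 240 / 6 = 31457280 bytes (30 MiB) per subband.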
nr_coherent = len(coherent_type) # 'I' or 'IQUV' or 'XXYY'
total_nr_stokes = 0
total_files = 0
max_nr_subbands = 0
sap_files = []
doFlysEye = parset.getBool(COBALT + 'BeamFormer.flysEye')
for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
logger.info("checking SAP {}".format(sap_nr))
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_subbands = len(subbandList)
max_nr_subbands = max(nr_subbands, max_nr_subbands)
nr_files = 0
total_nr_tabs = 0
for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
logger.info("checking TAB {}".format(tab_nr))
if parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)):
logger.info("adding coherentstokes size")
nr_stokes = nr_coherent # TODO, there used to be a function with min() here?
total_nr_tabs += 1
total_nr_stokes += nr_stokes
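                    # each Stokes product is split into ceil(nr_subbands / subbands_per_file) part files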
nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
nr_tab_rings = parset.getInt('Observation.Beam[%d].nrTabRings' % sap_nr)
if nr_tab_rings > 0:
logger.info("adding size for {} tab_rings".format(nr_tab_rings))
nr_tabs = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1)
total_nr_tabs += nr_tabs
nr_stokes = nr_tabs * nr_coherent
total_nr_stokes += nr_stokes
nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
if doFlysEye:
logger.info("adding flys eye data size")
nr_tabs = self._virtual_stations(parset)
total_nr_tabs += nr_tabs
nr_stokes = nr_tabs * nr_coherent
total_nr_stokes += nr_stokes
nr_files += int(nr_stokes * ceil(nr_subbands / float(subbands_per_file)))
if nr_files:
sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_cs_files': nr_files,
'nr_of_tabs': total_nr_tabs}})
total_files += nr_files
if total_files == 0:
logger.warn("Coherent Stokes data output enabled, but no CS files across all SAPs")
return None
# NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
# the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
file_size = int(nr_subbands_per_file * size_per_subband) # bytes
bandwidth = int(ceil(8 * file_size / duration)) # bits/second
idents = parset.getStringVector(DATAPRODUCTS + 'Output_CoherentStokes.identifications')
output_files = {'cs': {'nr_of_cs_files': total_files, 'cs_file_size': file_size,
'identifications': idents},
'saps': sap_files}
root_resource_group = parset.getString(DATAPRODUCTS + 'Output_CoherentStokes.storageClusterName')
if self._hasGlobalStorage(root_resource_group):
# global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
file_size *= total_files
bandwidth *= total_files
total_files = 1
estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
'count': total_files, 'root_resource_groups': [root_resource_group],
'output_files': output_files}
estimate['output_files']['cs']['nr_of_cs_stokes'] = nr_coherent
logger.debug("Coherent Stokes data estimate: {}".format(estimate))
return estimate
def incoherentstokes(self, parset, duration):
""" Estimate storage size and bandwidth needed for Incoherent Stokes (i.e. is)
data products. Also add SAP properties needed by the propagator.
"""
if not parset.getBool('Observation.DataProducts.Output_IncoherentStokes.enabled'):
logger.info("incoherent stokes data output not enabled")
return None
logger.info("calculate incoherentstokes data size")
size_of_sample = 4 # FP32
incoherent_type = parset.getString(COBALT + 'BeamFormer.IncoherentStokes.which')
subbands_per_file = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.subbandsPerFile', 512)
samples_per_second = self._samples_per_second(parset)
time_integration_factor = parset.getInt(COBALT + 'BeamFormer.IncoherentStokes.timeIntegrationFactor')
size_per_subband = (samples_per_second * size_of_sample * duration) / time_integration_factor
nr_incoherent = len(incoherent_type) # 'I' or 'IQUV' ('XXYY' only possible for coherent stokes)
total_nr_stokes = 0
total_files = 0
max_nr_subbands = 0
sap_files = []
for sap_nr in xrange(parset.getInt('Observation.nrBeams')):
logger.info("checking SAP {}".format(sap_nr))
subbandList = parset.getStringVector('Observation.Beam[%d].subbandList' % sap_nr)
nr_subbands = len(subbandList)
max_nr_subbands = max(nr_subbands, max_nr_subbands)
nr_files = 0
            # At the moment there can be at most 1 IS TAB per SAP, because its pointing equals the SAP pointing.
            # (Once online coherent dedispersion on multiple DMs is supported, there can be >1 IS TAB per SAP.)
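            # sentinel; becomes the TAB nr of the (single) IS TAB, if any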
is_tab_nr = -1
for tab_nr in xrange(parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr)):
logger.info("checking TAB {}".format(tab_nr))
if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): #not coherent is incoherent
logger.info("Found incoherent stokes TAB: %i" % tab_nr)
if is_tab_nr >= 0:
# Could get here to produce >1 IS TAB copies, maybe for some software test
logger.error("SAP %i: >1 incoherent TAB not supported: TAB nrs %i and %i" % (sap_nr, tab_nr, is_tab_nr))
return None
is_tab_nr = tab_nr
total_nr_stokes += nr_incoherent
nr_files += int(nr_incoherent * ceil(nr_subbands / float(subbands_per_file)))
if nr_files:
sap_files.append({'sap_nr': sap_nr, 'properties': {'nr_of_is_files': nr_files,
'nr_of_tabs': 1,
'is_tab_nr': is_tab_nr}})
total_files += nr_files
if total_files == 0:
logger.warn("Incoherent Stokes data output enabled, but no IS files across all SAPs")
return None
# NOTE: TABs in different SAPs can have different subband lists and if a TAB is split into parts (i.e. multiple files),
# the last TAB part may contain fewer subbands. But simplify: compute a single (i.e. max) file size for all TABs or TAB parts.
nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
file_size = int(nr_subbands_per_file * size_per_subband) # bytes
bandwidth = int(ceil(8 * file_size / duration)) # bits/second
idents = parset.getStringVector(DATAPRODUCTS + 'Output_IncoherentStokes.identifications')
output_files = {'is': {'nr_of_is_files': total_files, 'is_file_size': file_size,
'identifications': idents},
'saps': sap_files}
root_resource_group = parset.getString(DATAPRODUCTS + 'Output_IncoherentStokes.storageClusterName')
if self._hasGlobalStorage(root_resource_group):
# global fs and enough bandwidth: collapsed estimate is easier for Resource Assigner
file_size *= total_files
bandwidth *= total_files
total_files = 1
estimate = {'resource_types': {'bandwidth': bandwidth, 'storage': file_size},
'count': total_files, 'root_resource_groups': [root_resource_group],
'output_files': output_files}
estimate['output_files']['is']['nr_of_is_stokes'] = nr_incoherent
logger.debug("Incoherent Stokes data estimate: {}".format(estimate))
return estimate
def _merge_estimates(self, estimates):
""" Estimates can only be merged if same root_resource_groups and bandwidth and storage,
or if the root_resource_groups have a (single) global filesystem.
NOTE: assumed good enough to only merge conseq pairs, not all pairs.
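            E.g. the CS estimate (140 files) and IS estimate (40 files) on ['DRAGNET'] from the
            _calculate() docstring example merge into 1 estimate with count 180 and combined output_files.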
"""
i = 1
while i < len(estimates): # careful iterating while modifying
if estimates[i-1]['root_resource_groups'] == estimates[i]['root_resource_groups'] and \
((estimates[i-1]['resource_types']['bandwidth'] == estimates[i]['resource_types']['bandwidth'] and \
estimates[i-1]['resource_types']['storage'] == estimates[i]['resource_types']['storage']) or \
                 all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups'])
):
# Mergeable. Add uv, cs, is from estimates[i] into estimates[i-1]
if 'uv' in estimates[i]['output_files']:
estimates[i-1]['output_files']['uv'] = estimates[i]['output_files']['uv']
if 'cs' in estimates[i]['output_files']:
estimates[i-1]['output_files']['cs'] = estimates[i]['output_files']['cs']
if 'is' in estimates[i]['output_files']:
estimates[i-1]['output_files']['is'] = estimates[i]['output_files']['is']
# Merge saps. Saps are listed in order, but a sap_nr may be missing.
# Extend one list by the other, sort, then merge any conseq duplicates.
estimates[i-1]['output_files']['saps'].extend(estimates[i]['output_files']['saps'])
estimates[i-1]['output_files']['saps'].sort(key=lambda sap: sap['sap_nr'])
j = 1
while j < len(estimates[i-1]['output_files']['saps']):
sap0 = estimates[i-1]['output_files']['saps'][j-1]
sap1 = estimates[i-1]['output_files']['saps'][j ]
if sap0['sap_nr'] == sap1['sap_nr']:
# Sum nr_of_tabs, other keys must be from another data product type
if 'nr_of_tabs' in sap0['properties'] and \
'nr_of_tabs' in sap1['properties']:
sap1['properties']['nr_of_tabs'] += sap0['properties']['nr_of_tabs']
sap0['properties'].update(sap1['properties']) # includes nr_of_tabs
estimates[i-1]['output_files']['saps'].pop(j)
else:
j += 1
if all(self._hasGlobalStorage(rg) for rg in estimates[i]['root_resource_groups']):
# for global fs, collapse regardless
estimates[i-1]['resource_types']['bandwidth'] *= estimates[i-1]['count'] # *= 1, but to be robust and clear
estimates[i-1]['resource_types']['bandwidth'] += estimates[i]['resource_types']['bandwidth'] * estimates[i]['count']
estimates[i-1]['resource_types']['storage'] *= estimates[i-1]['count'] # *= 1, but to be robust and clear
estimates[i-1]['resource_types']['storage'] += estimates[i]['resource_types']['storage'] * estimates[i]['count']
estimates[i-1]['count'] = 1 # already 1, but to be robust and clear
else:
# root_resource_groups and values of bandwidth and storage are equal for both estimates
estimates[i-1]['count'] += estimates[i]['count']
logger.info('Merged observation resource estimate {} into {}'.format(i, i-1))
estimates.pop(i)
else:
i += 1
return estimates
def _samples_per_second(self, parset):
""" set samples per second
"""
samples_160mhz = 155648
samples_200mhz = 196608
sample_clock = parset.getInt('Observation.sampleClock')
samples = samples_160mhz if 160 == sample_clock else samples_200mhz
logger.info("samples per second for {} MHz clock = {}".format(sample_clock, samples))
return samples
def _virtual_stations(self, parset):
""" calculate virtualnumber of stations
"""
stationList = parset.getStringVector('Observation.VirtualInstrument.stationList')
nr_virtual_stations = 0
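        # In HBA_DUAL(_INNER), each core (CS) station is split into its two HBA sub-fields ("ears"),
        # so it counts as 2 virtual stations; remote and international stations count as 1.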
if parset.getString('Observation.antennaSet') in ('HBA_DUAL', 'HBA_DUAL_INNER'):
for station in stationList:
if 'CS' in station:
nr_virtual_stations += 2
else:
nr_virtual_stations += 1
else:
nr_virtual_stations = len(stationList)
logger.info("number of virtual stations = {}".format(nr_virtual_stations))
return nr_virtual_stations
def _hasGlobalStorage(self, resource_group_name):
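        """ Return True if the resource group has a single, global filesystem (currently only CEP4),
            in which case per-file estimates can be collapsed into one (see the collapse steps above).
        """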
return resource_group_name == 'CEP4'