Skip to content
Snippets Groups Projects
Commit 28ba98a5 authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8887: made the observation estimator work for correlated data. Commented out...

Task #8887: made the observation estimator work for correlated data. Commented out the other estimators since they cause crashes.
First get the services going, then fix the estimators.
parent 74c10e1e
No related branches found
No related tags found
No related merge requests found
......@@ -12,38 +12,28 @@ class BaseResourceEstimator(object):
self.name = name
self.error = ""
self.parset = {}
self.used_keys = ()
self.required_keys = ()
self.input_files = {}
self.output_files = {}
self.duration = 0 # in seconds
self.total_data_size = 0 # size in bytes
self.total_bandwidth = 0 # in bytes/second
# NOTE(review): pre-change implementation of the key check, superseded by
# checkParsetForRequiredKeys() below. Shown truncated by the diff — the
# return/self.error handling that presumably followed is not visible here.
def check_parset(self):
""" check if all keys needed are available
"""
missing_keys = False
# Each used key is dotted (e.g. 'correlated.enabled'); walk the nested
# parset dict one path component at a time.
for key in self.used_keys:
key_set = key.split('.')
parset = self.parset
for k in key_set:
if k in parset:
# descend one level into the nested dict
parset = parset[k]
else:
# any missing component marks the whole key as absent
logger.error("missing key [{}]".format(key))
missing_keys = True
def checkParsetForRequiredKeys(self):
    """Verify that every key in self.required_keys is present in self.parset.

    Logs the required and actual key sets for debugging. When one or more
    keys are absent, records a message on self.error, logs the missing
    keys, and returns False; returns True only for a complete parset.
    """
    logger.debug("required keys: %s" % ', '.join(self.required_keys))
    logger.debug("parset keys: %s" % ', '.join(self.parset.keys()))
    absent = set(self.required_keys) - set(self.parset.keys())
    if not absent:
        return True
    self.error = 'missing key(s)'
    logger.error("missing keys: %s" % ', '.join(absent))
    return False
def estimate(self):
    """Abstract estimation hook; subclasses must override.

    Raises:
        NotImplementedError: always — the base class deliberately fails
            loudly so a missing subclass implementation cannot go unnoticed.
    """
    # The diff residue left the old body (set self.error, log, return) in
    # front of the raise, making the raise unreachable and turning a
    # programming error into a silent no-op. Keep only the raise.
    raise NotImplementedError('estimate() in base class is called. Please implement estimate() in your subclass')
def result_as_dict(self):
""" return estimated values as dict """
......
......@@ -18,9 +18,9 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
self.parset = ParameterSet(kwargs).make_subset('dp.output')
self.duration = int(kwargs.get('observation.duration', 1))
self.input_files = input_files
self.used_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
self.required_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
'correlated.demixing_settings.time_step', 'instrument_model.enabled')
if self.check_parset():
if self.checkParsetForRequiredKeys():
self.estimate()
return
......
......@@ -17,8 +17,8 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
self.parset = ParameterSet(kwargs).make_subset('dp.output')
self.duration = int(kwargs.get('observation.duration', 1))
self.input_files = input_files
self.used_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
if self.check_parset():
self.required_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
if self.checkParsetForRequiredKeys():
self.estimate()
return
......
......@@ -18,9 +18,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
self.parset = ParameterSet(kwargs).make_subset('dp.output')
self.duration = int(kwargs.get('observation.duration', 1))
self.input_files = input_files
self.used_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
self.required_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
'longbaseline.subbands_per_subband_group')
if self.check_parset():
if self.checkParsetForRequiredKeys():
self.estimate()
return
......
......@@ -4,7 +4,9 @@
import logging
from math import ceil, floor
from base_resource_estimator import BaseResourceEstimator
from .parameterset import ParameterSet
from lofar.parameterset import parameterset
from lofar.common.datetimeutils import totalSeconds
from datetime import datetime, timedelta
logger = logging.getLogger(__name__)
......@@ -12,37 +14,52 @@ logger = logging.getLogger(__name__)
class ObservationResourceEstimator(BaseResourceEstimator):
""" ObservationResourceEstimator
"""
def __init__(self, kwargs):
BaseResourceEstimator.__init__(self, name='observation')
logger.debug("init ObservationResourceEstimator")
self.parset = ParameterSet(kwargs).make_subset('observation')
self.used_keys = ('sample_clock', 'duration', 'channels_per_subband', 'intergration_time', 'antenna_mode',
'stations', 'flys_eye', 'output.coherent_stokes.enabled', 'output.coherent_stokes.type',
'output.coherent_stokes.integration_factor', 'output.incoherent_stokes.enabled',
'output.incoherent_stokes.type')
if self.check_parset():
def __init__(self, parsetDict):
    """Set up the observation estimator from a raw parset dict.

    Keeps only the 'Observation.' subtree of the supplied parset,
    records the keys the estimate depends on, and runs estimate()
    immediately when all of them are present.
    """
    logger.info("init ObservationResourceEstimator")
    super(ObservationResourceEstimator, self).__init__(name='Observation')
    logger.info('parsetDict: %s ' % parsetDict)
    self.parset = parameterset(parsetDict).makeSubset('Observation.')
    logger.info('parset: %s ' % self.parset)
    # Keys estimate() reads; the two '.type' keys are deliberately
    # disabled for now (kept below as comments).
    required = ('sampleClock',
                'startTime',
                'stopTime',
                'antennaSet',
                'ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband',
                'ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime',
                'ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye',
                'ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor',
                'ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor',
                'VirtualInstrument.stationList',
                'DataProducts.Output_CoherentStokes.enabled',
                #'DataProducts.Output_CoherentStokes.type',
                'DataProducts.Output_IncoherentStokes.enabled',
                #'DataProducts.Output_IncoherentStokes.type'
                )
    self.required_keys = required
    if self.checkParsetForRequiredKeys():
        self.estimate()
def estimate(self):
    """Run all per-dataproduct estimators for this observation.

    Populates self.output_files / self.total_data_size /
    self.total_bandwidth via the three helpers below.
    """
    # Diff residue left both the old logger.debug and the new logger.info
    # versions of the start message, logging it twice; keep only one.
    logger.info("start estimate '{}'".format(self.name))
    self.correlated()
    self.coherentstokes()
    self.incoherentstokes()
def correlated(self):
""" Estimate number of files and size of file"""
# NOTE(review): this span interleaves pre- and post-change lines of the
# diff. Each setting below appears twice — first the old dict-style
# .get() read, then the new parameterset getInt/getFloat/getString read.
logger.debug("calculate correlated datasize")
size_of_header = self.parset.get('size_of_header', 512)
size_of_overhead = self.parset.get('size_of_overhead', 600000)
size_of_short = self.parset.get('size_of_short', 2)
size_of_complex = self.parset.get('size_of_complex', 8)
duration = int(self.parset.get('duration', 0))
nr_polarizations = int(self.parset.get('nr_polarizations', 2))
channels_per_subband = int(self.parset.get('channels_per_subband', 64))
intergration_time = int(self.parset.get('intergration_time', 1))
logger.info("calculating correlated datasize")
size_of_header = self.parset.getInt('size_of_header', 512)
size_of_overhead = self.parset.getInt('size_of_overhead', 600000)
size_of_short = self.parset.getInt('size_of_short', 2)
size_of_complex = self.parset.getInt('size_of_complex', 8)
# duration now derived from the start/stop timestamps in the parset
startTime = datetime.strptime(self.parset.getString('startTime'), '%Y-%m-%d %H:%M:%S')
endTime = datetime.strptime(self.parset.getString('stopTime'), '%Y-%m-%d %H:%M:%S')
duration = totalSeconds(endTime - startTime)
nr_polarizations = self.parset.getInt('nr_polarizations', 2)
# NOTE(review): reads the BeamFormer.CoherentStokes channel count for the
# correlator estimate — looks like a copy/paste of the wrong key; the
# required_keys list expects Correlator.nrChannelsPerSubband. Confirm.
channels_per_subband = self.parset.getInt('ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.nrChannelsPerSubband', 64)
intergration_time = self.parset.getFloat('ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime', 1)
no_virtual_stations = self._virtual_stations()
# number of integration periods over the whole observation
integrated_seconds = floor(duration / intergration_time)
......@@ -52,30 +69,36 @@ class ObservationResourceEstimator(BaseResourceEstimator):
# sum of all subbands in all digital beams
nr_subbands = 0
beam_nr = 0
while 'beam[{}]'.format(beam_nr) in self.parset:
nr_subbands += int(self.parset['beam[{}]'.format(beam_nr)]['nr_subbands'])
beam_nr += 1
for beam_nr in range(1024):
try:
subbandList = self.parset.getStringVector('Beam[%d].subbandList' % beam_nr)
nr_subbands += len(subbandList)
except RuntimeError: #why does parset not raise a KeyError???
break
file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead
self.output_files['dp_correlated_uv'] = {'nr_files': nr_subbands, 'file_size': int(file_size)}
logger.debug("dp_correlated_uv: {} files {} bytes each".format(nr_subbands, int(file_size)))
logger.info("dp_correlated_uv: {} files {} bytes each".format(nr_subbands, int(file_size)))
self.total_data_size += ceil(file_size * nr_subbands) # bytes
self.total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/second
return
def coherentstokes(self):
""" calculate coherent stokes
"""
# NOTE(review): interleaved diff lines — both the old ('coherent_stokes')
# and new ('CoherentStokes') key spellings appear below; this is diff
# residue, not real control flow. Method is truncated at the next hunk.
if self.parset['output']['coherent_stokes']['enabled'] == 'false':
#TODO: implement properly
return
if self.parset['output']['CoherentStokes']['enabled'] == 'false':
return
logger.debug("calculate coherentstokes datasize")
coherent_type = self.parset['output']['coherent_stokes']['type']
logger.info("calculate coherentstokes datasize")
coherent_type = self.parset['output']['CoherentStokes']['type']
# beam-formed output is split over files of at most 512 subbands each
subbands_per_file = 512.
samples_per_second = self._samples_per_second()
duration = int(self.parset.get('duration', 0))
integration_factor = self.parset['output']['coherent_stokes']['integration_factor']
integration_factor = self.parset['output']['CoherentStokes']['integration_factor']
total_files_summed = 0
total_files = 0
......@@ -92,13 +115,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
beam_nr = 0
while 'beam[{}]'.format(beam_nr) in self.parset:
beam = self.parset['beam[{}]'.format(beam_nr)]
logger.debug("add beam {}".format(beam_nr))
logger.info("add beam {}".format(beam_nr))
max_nr_subbands = max(max_nr_subbands, int(beam['nr_subbands']))
tied_array_beam_nr = 0
while 'tied_array_beam[{}]'.format(tied_array_beam_nr) in beam:
tied_array_beam = beam['tied_array_beam[{}]'.format(tied_array_beam_nr)]
logger.debug("add tied_array_beam {}".format(tied_array_beam_nr))
logger.info("add tied_array_beam {}".format(tied_array_beam_nr))
if tied_array_beam['coherent'] == 'true':
total_files_min = nr_coherent
total_files_summed += total_files_min
......@@ -107,13 +130,13 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_tab_rings = int(beam['nr_tab_rings'])
if nr_tab_rings > 0:
logger.debug("add size for {} tab_rings".format(nr_tab_rings))
logger.info("add size for {} tab_rings".format(nr_tab_rings))
total_files_min = (3 * nr_tab_rings * (nr_tab_rings + 1) + 1) * nr_coherent
total_files_summed += total_files_min
total_files += total_files_min * ceil(int(beam['nr_subbands']) / subbands_per_file)
if self.parset['flys_eye']['enabled'] == 'true':
logger.debug("calculate flys eye data size")
logger.info("calculate flys eye data size")
total_files_min = self._virtual_stations() * nr_coherent
total_files_summed += total_files_min
total_files += total_files_min * ceil(int(beam['nr_subbands']) / subbands_per_file)
......@@ -122,8 +145,8 @@ class ObservationResourceEstimator(BaseResourceEstimator):
nr_subbands_per_file = min(subbands_per_file, max_nr_subbands)
size_per_file = nr_subbands_per_file * size_per_subband
self.output_files['dp_coherent_stokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)}
logger.debug("dp_coherent_stokes: {} files {} bytes each".format(int(total_files), int(size_per_file)))
self.output_files['dp_CoherentStokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)}
logger.info("dp_CoherentStokes: {} files {} bytes each".format(int(total_files), int(size_per_file)))
self.total_data_size += ceil(total_files_summed * max_nr_subbands * size_per_subband)
self.total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/second
......@@ -134,10 +157,14 @@ class ObservationResourceEstimator(BaseResourceEstimator):
:return:
"""
if self.parset['output']['incoherent_stokes']['enabled'] == 'false':
#TODO: implement properly
return
if self.parset['output']['inCoherentStokes']['enabled'] == 'false':
return
logger.debug("calculate incoherentstokes data size")
incoherent_type = self.parset['output']['incoherent_stokes']['type']
logger.info("calculate incoherentstokes data size")
incoherent_type = self.parset['output']['inCoherentStokes']['type']
subbands_per_file = 512.0
samples_per_second = self._samples_per_second()
duration = int(self.parset.get('duration', 0))
......@@ -154,12 +181,12 @@ class ObservationResourceEstimator(BaseResourceEstimator):
beam_nr = 0
while 'beam[{}]'.format(beam_nr) in self.parset:
beam = self.parset['beam[{}]'.format(beam_nr)]
logger.debug("add beam {}".format(beam_nr))
logger.info("add beam {}".format(beam_nr))
max_nr_subbands = max(max_nr_subbands, int(beam['nr_subbands']))
tied_array_beam_nr = 0
while 'tied_array_beam[{}]'.format(tied_array_beam_nr) in beam:
tied_array_beam = beam['tied_array_beam[{}]'.format(tied_array_beam_nr)]
logger.debug("add tied_array_beam {}".format(tied_array_beam_nr))
logger.info("add tied_array_beam {}".format(tied_array_beam_nr))
if tied_array_beam['coherent'] == 'false':
total_files_summed += nr_incoherent
total_files += nr_incoherent * ceil(int(beam['nr_subbands']) / subbands_per_file)
......@@ -176,8 +203,8 @@ class ObservationResourceEstimator(BaseResourceEstimator):
size_per_subband = (samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration
size_per_file = nr_subbands_per_file * size_per_subband
self.output_files['dp_incoherent_stokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)}
logger.debug("dp_incoherent_stokes: {} files {} bytes each".format(int(total_files), int(size_per_file)))
self.output_files['dp_inCoherentStokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)}
logger.info("dp_inCoherentStokes: {} files {} bytes each".format(int(total_files), int(size_per_file)))
self.total_data_size += ceil(total_files_summed * max_nr_subbands * size_per_subband) # bytes
self.total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/sec
......@@ -189,21 +216,22 @@ class ObservationResourceEstimator(BaseResourceEstimator):
samples_160mhz = 155648
samples_200mhz = 196608
samples = samples_160mhz if '160' in self.parset['sample_clock'] else samples_200mhz
logger.debug("samples per second for {} MHz clock = {}".format(self.parset['sample_clock'], samples))
logger.info("samples per second for {} MHz clock = {}".format(self.parset['sample_clock'], samples))
return samples
def _virtual_stations(self):
    """Count the number of virtual stations in this observation.

    Core stations ('CS' in the name) count double when the antenna set is
    HBA_DUAL or HBA_DUAL_INNER — presumably because each core station then
    delivers two independent station streams (TODO: confirm upstream);
    otherwise every listed station counts once.

    :return: number of virtual stations (int)
    """
    # Diff residue kept the old lines reading the removed 'antenna_mode' /
    # 'stations' keys next to the new 'antennaSet' / stationList lines;
    # under the new parset the old reads would fail. Keep only the new ones.
    stationList = self.parset.getStringVector('VirtualInstrument.stationList')
    nr_virtual_stations = 0
    if self.parset['antennaSet'] in ('HBA_DUAL', 'HBA_DUAL_INNER'):
        for station in stationList:
            if 'CS' in station:
                nr_virtual_stations += 2
            else:
                nr_virtual_stations += 1
    else:
        nr_virtual_stations = len(stationList)
    logger.info("number of virtual stations = {}".format(nr_virtual_stations))
    return nr_virtual_stations
......@@ -18,8 +18,8 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
self.duration = int(kwargs.get('observation.duration', 1))
self.coherent_stokes_type = kwargs.get('observation.coherent_stokes.type')
self.input_files = input_files
self.used_keys = ('pulsar.enabled',)
if self.check_parset():
self.required_keys = ('pulsar.enabled',)
if self.checkParsetForRequiredKeys():
self.estimate()
return
......
......@@ -21,26 +21,27 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
def handle_message(self, msg):
    """Bus callback: treat the message payload as a parset and return the estimated resources."""
    resources = self._get_estimated_resources(msg)
    return resources
def _get_estimated_resources(self, parsetDict):
    """Estimate resources for the observation described by parsetDict.

    Currently only the observation estimator runs; the pipeline
    estimators are disabled (they crash) and kept commented out until
    they are fixed — see Task #8887.

    :param parsetDict: flat dict of parset key/value pairs
    :return: dict of estimator results, merged per estimator name
    """
    # Diff residue kept the pre-change active lines (using the old
    # 'parset_str' parameter and 'pipeline_input_files') alongside their
    # commented-out replacements; the active lines would raise NameError
    # under the new signature. Keep only the commented new versions.
    logger.info('get_estimated_resources on: %s' % parsetDict)
    result = {}
    observation = ObservationResourceEstimator(parsetDict)
    result.update(observation.result_as_dict())

    #TODO: implement properly
    #pipeline_input_files = result['observation']['output_files']

    #longbaseline = LongBaselinePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
    #result.update(longbaseline.result_as_dict())

    #calibration = CalibrationPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
    #result.update(calibration.result_as_dict())

    #pulsar = PulsarPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
    #result.update(pulsar.result_as_dict())

    #image = ImagePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
    #result.update(image.result_as_dict())

    return result
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Please register or sign in to comment