diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
index dc63be64d6d060635340eed0c018fe396cccb415..c0b16e5dd717da8b2030eb2d178e5bf4d54451af 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/base_resource_estimator.py
@@ -12,38 +12,28 @@ class BaseResourceEstimator(object):
         self.name = name
         self.error = ""
         self.parset = {}
-        self.used_keys = ()
+        self.required_keys = ()
         self.input_files = {}
         self.output_files = {}
         self.duration = 0 # in seconds
         self.total_data_size = 0 # size in bytes
         self.total_bandwidth = 0 # in bytes/second
 
-    def check_parset(self):
-        """ check if all keys needed are available
-        """
-        missing_keys = False
-        for key in self.used_keys:
-            key_set = key.split('.')
-            parset = self.parset
-            for k in key_set:
-                if k in parset:
-                    parset = parset[k]
-                else:
-                    logger.error("missing key [{}]".format(key))
-                    missing_keys = True
+    def checkParsetForRequiredKeys(self):
+        """ check if all required keys are available """
+        logger.debug("required keys: %s" % ', '.join(self.required_keys))
+        logger.debug("parset keys: %s" % ', '.join(self.parset.keys()))
+
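+        # note: this is a flat comparison; each dotted name in required_keys
+        # must appear verbatim as a key in the parset dict for the check to pass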
+        missing_keys = set(self.required_keys) - set(self.parset.keys())
         if missing_keys:
-            self.error = 'missing key(s)'
+            logger.error("missing keys: %s" % ', '.join(missing_keys))
             return False
+
         return True
 
     def estimate(self):
-        """ empty estimate function
-        """
-        self.error = "estimate function not defined"
-        logger.info("estimate in base class is called")
-        return
+        raise NotImplementedError('estimate() in base class is called. Please implement estimate() in your subclass')
 
     def result_as_dict(self):
         """ return estimated values as dict """
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
index a9487013ae9fb87926a5d1b5544034be21ab720c..8abb90cc1d1ff66d8ee5aecd3d238b195e7407ad 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/calibration_pipeline.py
@@ -18,9 +18,9 @@ class CalibrationPipelineResourceEstimator(BaseResourceEstimator):
         self.parset = ParameterSet(kwargs).make_subset('dp.output')
         self.duration = int(kwargs.get('observation.duration', 1))
         self.input_files = input_files
-        self.used_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
+        self.required_keys = ('correlated.enabled', 'correlated.demixing_settings.freq_step',
                           'correlated.demixing_settings.time_step', 'instrument_model.enabled')
-        if self.check_parset():
+        if self.checkParsetForRequiredKeys():
             self.estimate()
 
         return
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
index a913be890ec908858a7d14e1be07c88a650faa02..9cf9c20f8e25f8e941c6a31f77089d784157cd0a 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/image_pipeline.py
@@ -17,8 +17,8 @@ class ImagePipelineResourceEstimator(BaseResourceEstimator):
         self.parset = ParameterSet(kwargs).make_subset('dp.output')
         self.duration = int(kwargs.get('observation.duration', 1))
         self.input_files = input_files
-        self.used_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
-        if self.check_parset():
+        self.required_keys = ('skyimage.enabled', 'skyimage.slices_per_image', 'skyimage.subbands_per_image')
+        if self.checkParsetForRequiredKeys():
             self.estimate()
 
         return
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
index 598f781ca0b4ab3cb72ec275e45bcee1fc6a6ab3..0ef237a206494726e80d4d16a424b35324b1bab3 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/longbaseline_pipeline.py
@@ -18,9 +18,9 @@ class LongBaselinePipelineResourceEstimator(BaseResourceEstimator):
         self.parset = ParameterSet(kwargs).make_subset('dp.output')
         self.duration = int(kwargs.get('observation.duration', 1))
         self.input_files = input_files
-        self.used_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
+        self.required_keys = ('correlated.enabled', 'longbaseline.subband_groups_per_ms',
                           'longbaseline.subbands_per_subband_group')
-        if self.check_parset():
+        if self.checkParsetForRequiredKeys():
             self.estimate()
 
         return
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
index 1a6b9939797ca51cf18b0aa36f76868eb8cb6ed2..3820d458ebbdb5ade5fd7b41031f7d90f9d9d93a 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py
@@ -4,7 +4,9 @@ import logging
 from math import ceil, floor
 from base_resource_estimator import BaseResourceEstimator
-from .parameterset import ParameterSet
+from lofar.parameterset import parameterset
+from lofar.common.datetimeutils import totalSeconds
+from datetime import datetime, timedelta
 
 logger = logging.getLogger(__name__)
 
@@ -12,37 +14,52 @@ logger = logging.getLogger(__name__)
 class ObservationResourceEstimator(BaseResourceEstimator):
     """ ObservationResourceEstimator
     """
-    def __init__(self, kwargs):
-        BaseResourceEstimator.__init__(self, name='observation')
-        logger.debug("init ObservationResourceEstimator")
-        self.parset = ParameterSet(kwargs).make_subset('observation')
-        self.used_keys = ('sample_clock', 'duration', 'channels_per_subband', 'intergration_time', 'antenna_mode',
-                          'stations', 'flys_eye', 'output.coherent_stokes.enabled', 'output.coherent_stokes.type',
-                          'output.coherent_stokes.integration_factor', 'output.incoherent_stokes.enabled',
-                          'output.incoherent_stokes.type')
-
-        if self.check_parset():
+    def __init__(self, parsetDict):
+        logger.info("init ObservationResourceEstimator")
+        super(ObservationResourceEstimator, self).__init__(name='Observation')
+        logger.info('parsetDict: %s ' % parsetDict)
+        self.parset = parameterset(parsetDict).makeSubset('Observation.')
+        logger.info('parset: %s ' % self.parset)
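+        # makeSubset('Observation.') presumably strips the prefix, so the
+        # required keys below are given relative to 'Observation.'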
+        self.required_keys = ('sampleClock',
+                              'startTime',
+                              'stopTime',
+                              'antennaSet',
+                              'ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband',
+                              'ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime',
+                              'ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye',
+                              'ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor',
+                              'ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor',
+                              'VirtualInstrument.stationList',
+                              'DataProducts.Output_CoherentStokes.enabled',
+                              #'DataProducts.Output_CoherentStokes.type',
+                              'DataProducts.Output_IncoherentStokes.enabled',
+                              #'DataProducts.Output_IncoherentStokes.type'
+                              )
+
+        if self.checkParsetForRequiredKeys():
             self.estimate()
 
     def estimate(self):
         """ estimate """
-        logger.debug("start estimate '{}'".format(self.name))
+        logger.info("start estimate '{}'".format(self.name))
         self.correlated()
         self.coherentstokes()
         self.incoherentstokes()
 
     def correlated(self):
         """ Estimate number of files and size of file"""
-        logger.debug("calculate correlated datasize")
-        size_of_header = self.parset.get('size_of_header', 512)
-        size_of_overhead = self.parset.get('size_of_overhead', 600000)
-        size_of_short = self.parset.get('size_of_short', 2)
-        size_of_complex = self.parset.get('size_of_complex', 8)
-        duration = int(self.parset.get('duration', 0))
-        nr_polarizations = int(self.parset.get('nr_polarizations', 2))
-        channels_per_subband = int(self.parset.get('channels_per_subband', 64))
-        intergration_time = int(self.parset.get('intergration_time', 1))
+        logger.info("calculating correlated datasize")
+        size_of_header = self.parset.getInt('size_of_header', 512)
+        size_of_overhead = self.parset.getInt('size_of_overhead', 600000)
+        size_of_short = self.parset.getInt('size_of_short', 2)
+        size_of_complex = self.parset.getInt('size_of_complex', 8)
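+        # duration is now derived from startTime/stopTime instead of a
+        # 'duration' key; totalSeconds() presumably returns the difference as a float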
+        startTime = datetime.strptime(self.parset.getString('startTime'), '%Y-%m-%d %H:%M:%S')
+        endTime = datetime.strptime(self.parset.getString('stopTime'), '%Y-%m-%d %H:%M:%S')
+        duration = totalSeconds(endTime - startTime)
+        nr_polarizations = self.parset.getInt('nr_polarizations', 2)
+        channels_per_subband = self.parset.getInt('ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband', 64)
+        intergration_time = self.parset.getFloat('ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime', 1)
 
         no_virtual_stations = self._virtual_stations()
         integrated_seconds = floor(duration / intergration_time)
@@ -52,30 +69,36 @@ class ObservationResourceEstimator(BaseResourceEstimator):
 
         # sum of all subbands in all digital beams
         nr_subbands = 0
-        beam_nr = 0
-        while 'beam[{}]'.format(beam_nr) in self.parset:
-            nr_subbands += int(self.parset['beam[{}]'.format(beam_nr)]['nr_subbands'])
-            beam_nr += 1
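+        # the parset does not expose a beam count, so scan a bounded range;
+        # 1024 is an assumed upper limit on the number of digital beams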
+        for beam_nr in range(1024):
+            try:
+                subbandList = self.parset.getStringVector('Beam[%d].subbandList' % beam_nr)
+                nr_subbands += len(subbandList)
+            except RuntimeError: #why does parset not raise a KeyError???
+                break
         file_size = (data_size + n_sample_size + size_of_header) * integrated_seconds + size_of_overhead
         self.output_files['dp_correlated_uv'] = {'nr_files': nr_subbands, 'file_size': int(file_size)}
-        logger.debug("dp_correlated_uv: {} files {} bytes each".format(nr_subbands, int(file_size)))
+        logger.info("dp_correlated_uv: {} files {} bytes each".format(nr_subbands, int(file_size)))
 
         self.total_data_size += ceil(file_size * nr_subbands) # bytes
         self.total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/second
-        return
+
 
     def coherentstokes(self):
         """ calculate coherent stokes """
-        if self.parset['output']['coherent_stokes']['enabled'] == 'false':
+
+        #TODO: implement properly
+        return
+
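+        # note: everything below the early return above is unreachable and
+        # still uses the old nested-dict parset access; it needs porting too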
{}".format(beam_nr)) max_nr_subbands = max(max_nr_subbands, int(beam['nr_subbands'])) tied_array_beam_nr = 0 while 'tied_array_beam[{}]'.format(tied_array_beam_nr) in beam: tied_array_beam = beam['tied_array_beam[{}]'.format(tied_array_beam_nr)] - logger.debug("add tied_array_beam {}".format(tied_array_beam_nr)) + logger.info("add tied_array_beam {}".format(tied_array_beam_nr)) if tied_array_beam['coherent'] == 'false': total_files_summed += nr_incoherent total_files += nr_incoherent * ceil(int(beam['nr_subbands']) / subbands_per_file) @@ -176,8 +203,8 @@ class ObservationResourceEstimator(BaseResourceEstimator): size_per_subband = (samples_per_second * 4) / time_integration_factor / channel_integration_factor * duration size_per_file = nr_subbands_per_file * size_per_subband - self.output_files['dp_incoherent_stokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)} - logger.debug("dp_incoherent_stokes: {} files {} bytes each".format(int(total_files), int(size_per_file))) + self.output_files['dp_inCoherentStokes'] = {'nr_files': int(total_files), 'file_size': int(size_per_file)} + logger.info("dp_inCoherentStokes: {} files {} bytes each".format(int(total_files), int(size_per_file))) self.total_data_size += ceil(total_files_summed * max_nr_subbands * size_per_subband) # bytes self.total_bandwidth = ceil((self.total_data_size * 8) / duration) # bits/sec @@ -189,21 +216,22 @@ class ObservationResourceEstimator(BaseResourceEstimator): samples_160mhz = 155648 samples_200mhz = 196608 samples = samples_160mhz if '160' in self.parset['sample_clock'] else samples_200mhz - logger.debug("samples per second for {} MHz clock = {}".format(self.parset['sample_clock'], samples)) + logger.info("samples per second for {} MHz clock = {}".format(self.parset['sample_clock'], samples)) return samples def _virtual_stations(self): """ calculate virtualnumber of stations """ + stationList = self.parset.getStringVector('VirtualInstrument.stationList') nr_virtual_stations = 0 - if self.parset['antenna_mode'] in ('HBA_DUAL', 'HBA_DUAL_INNER'): - for station in self.parset['stations']: + if self.parset['antennaSet'] in ('HBA_DUAL', 'HBA_DUAL_INNER'): + for station in stationList: if 'CS' in station: nr_virtual_stations += 2 else: nr_virtual_stations += 1 else: - nr_virtual_stations = len(self.parset['stations']) - logger.debug("number of virtual stations = {}".format(nr_virtual_stations)) + nr_virtual_stations = len(stationList) + logger.info("number of virtual stations = {}".format(nr_virtual_stations)) return nr_virtual_stations diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py index 90ee55a92dfaf3b0e650d87a583212893ce3bd8e..019cf5efcb9a20efe367a8b0160343436d146a9e 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py @@ -18,8 +18,8 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator): self.duration = int(kwargs.get('observation.duration', 1)) self.coherent_stokes_type = kwargs.get('observation.coherent_stokes.type') self.input_files = input_files - self.used_keys = ('pulsar.enabled',) - if self.check_parset(): + self.required_keys = ('pulsar.enabled',) + if self.checkParsetForRequiredKeys(): self.estimate() return diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py 
         nr_virtual_stations = 0
-        if self.parset['antenna_mode'] in ('HBA_DUAL', 'HBA_DUAL_INNER'):
-            for station in self.parset['stations']:
+        if self.parset['antennaSet'] in ('HBA_DUAL', 'HBA_DUAL_INNER'):
+            for station in stationList:
                 if 'CS' in station:
                     nr_virtual_stations += 2
                 else:
                     nr_virtual_stations += 1
         else:
-            nr_virtual_stations = len(self.parset['stations'])
-            logger.debug("number of virtual stations = {}".format(nr_virtual_stations))
+            nr_virtual_stations = len(stationList)
+        logger.info("number of virtual stations = {}".format(nr_virtual_stations))
         return nr_virtual_stations
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
index 90ee55a92dfaf3b0e650d87a583212893ce3bd8e..019cf5efcb9a20efe367a8b0160343436d146a9e 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/pulsar_pipeline.py
@@ -18,8 +18,8 @@ class PulsarPipelineResourceEstimator(BaseResourceEstimator):
         self.duration = int(kwargs.get('observation.duration', 1))
         self.coherent_stokes_type = kwargs.get('observation.coherent_stokes.type')
         self.input_files = input_files
-        self.used_keys = ('pulsar.enabled',)
-        if self.check_parset():
+        self.required_keys = ('pulsar.enabled',)
+        if self.checkParsetForRequiredKeys():
             self.estimate()
 
         return
diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
index 58c015f672f9ff6271d61950779f11b9d6410662..45ce0a28998ffe5a143d21e2ea6695399aaf3fe2 100644
--- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
+++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/service.py
@@ -21,26 +21,27 @@ class ResourceEstimatorHandler(MessageHandlerInterface):
     def handle_message(self, msg):
         return self._get_estimated_resources(msg)
 
-    def _get_estimated_resources(self, parset_str):
-        logger.info('get_estimated_resources on: %s' % parset_str)
+    def _get_estimated_resources(self, parsetDict):
+        logger.info('get_estimated_resources on: %s' % parsetDict)
         result = {}
 
-        observation = ObservationResourceEstimator(parset_str)
+        observation = ObservationResourceEstimator(parsetDict)
         result.update(observation.result_as_dict())
 
-        pipeline_input_files = result['observation']['output_files']
+        #TODO: implement properly
+        #pipeline_input_files = result['observation']['output_files']
 
-        longbaseline = LongBaselinePipelineResourceEstimator(parset_str, input_files=pipeline_input_files)
-        result.update(longbaseline.result_as_dict())
+        #longbaseline = LongBaselinePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
+        #result.update(longbaseline.result_as_dict())
 
-        calibration = CalibrationPipelineResourceEstimator(parset_str, input_files=pipeline_input_files)
-        result.update(calibration.result_as_dict())
+        #calibration = CalibrationPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
+        #result.update(calibration.result_as_dict())
 
-        pulsar = PulsarPipelineResourceEstimator(parset_str, input_files=pipeline_input_files)
-        result.update(pulsar.result_as_dict())
+        #pulsar = PulsarPipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
+        #result.update(pulsar.result_as_dict())
 
-        image = ImagePipelineResourceEstimator(parset_str, input_files=pipeline_input_files)
-        result.update(image.result_as_dict())
+        #image = ImagePipelineResourceEstimator(parsetDict, input_files=pipeline_input_files)
+        #result.update(image.result_as_dict())
 
         return result