Skip to content
Snippets Groups Projects
Commit a006e122 authored by Jan David Mol's avatar Jan David Mol
Browse files

TMSS-528: Refactoring, touch ups.

parent 6587ee2d
No related branches found
No related tags found
1 merge request!366Resolve TMSS-528 "Beamformer support"
...@@ -31,10 +31,6 @@ from math import ceil ...@@ -31,10 +31,6 @@ from math import ceil
import logging import logging
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
def add_prefix(parset: dict, prefix: str) -> dict:
""" Add a prefix to all the keys in the given parset """
return {prefix+k: v for k,v in parset.items()}
# placeholder for dataproducts for which we could find no location # placeholder for dataproducts for which we could find no location
class null_dataproduct: class null_dataproduct:
filename = "null:" filename = "null:"
...@@ -42,7 +38,11 @@ class null_dataproduct: ...@@ -42,7 +38,11 @@ class null_dataproduct:
null_dataproduct = null_dataproduct() null_dataproduct = null_dataproduct()
def stokes_to_parset(stokes_spec: dict) -> dict: def _add_prefix(parset: dict, prefix: str) -> dict:
""" Add a prefix to all the keys in the given parset """
return {prefix+k: v for k,v in parset.items()}
def _stokes_settings_parset_subkeys(stokes_spec: dict) -> dict:
""" Convert stokes specifications to parset keys. """ """ Convert stokes specifications to parset keys. """
parset = {} parset = {}
...@@ -60,7 +60,7 @@ def stokes_to_parset(stokes_spec: dict) -> dict: ...@@ -60,7 +60,7 @@ def stokes_to_parset(stokes_spec: dict) -> dict:
return parset return parset
def sap_index(saps: dict, sap_name: str) -> int: def _sap_index(saps: dict, sap_name: str) -> int:
""" Return the SAP index in the observation given a certain SAP name. """ """ Return the SAP index in the observation given a certain SAP name. """
sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name] sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name]
...@@ -71,87 +71,15 @@ def sap_index(saps: dict, sap_name: str) -> int: ...@@ -71,87 +71,15 @@ def sap_index(saps: dict, sap_name: str) -> int:
return sap_indices[0] return sap_indices[0]
def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtask) -> dict:
# make sure the spec is complete (including all non-filled in properties with default)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_refs(subtask.specifications_template.schema))
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
# MAC turned out to be very sensitive for having specific keys with very specific prefixes etc.
# As a result, the generated parset contains many "duplicate"(nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
# Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
# ---------------------------- def _convert_correlator_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
# Generic settings """ Provide the parset keys for the COBALT correlator. """
# ----------------------------
parset = dict() # parameterset has no proper assignment operators, so take detour via dict...
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = 0 # Needed by MACScheduler; should/can this be the same as subtask.pk?
parset["Observation.tmssID"] = subtask.pk
parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize()
parset["Observation.processSubtype"] = "Beam Observation"
parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name
parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
parset["Observation.strategy"] = "default" # maybe not mandatory?
# ----------------------------
# Station settings
# ----------------------------
parset["Observation.VirtualInstrument.minimalNrStations"] = 1 # maybe not mandatory?
parset["Observation.VirtualInstrument.stationSet"] = "Custom" # maybe not mandatory?
parset["Observation.VirtualInstrument.stationList"] = [s for s in spec["stations"]["station_list"]]
parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory?
parset["Observation.antennaSet"] = spec["stations"]["antenna_set"]
parset["Observation.bandFilter"] = spec["stations"]["filter"]
parset["Observation.sampleClock"] = 200 # fixed value, no other values are supported
parset["Observation.nrBitsPerSample"] = 8 # fixed value, no other values are supported.
# Digital beams
digi_beams = spec['stations']['digital_pointings']
parset["Observation.nrBeams"] = len(digi_beams)
for beam_nr, digi_beam in enumerate(digi_beams):
beam_prefix = "Observation.Beam[%d]." % beam_nr
parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type']
parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1']
parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2']
parset[beam_prefix+"target"] = digi_beam['name']
parset[beam_prefix+"subbandList"] = digi_beam['subbands']
parset[beam_prefix+"nrTiedArrayBeams"] = 0
parset[beam_prefix+"nrTabRings"] = 0
# Analog beam (=HBA tile beam)
analog_beam = spec['stations']['analog_pointing']
parset["Observation.nrAnaBeams"] = 1
beam_prefix = "Observation.AnaBeam[0]."
parset[beam_prefix+"directionType"] = analog_beam['direction_type']
parset[beam_prefix+"angle1"] = analog_beam['angle1']
parset[beam_prefix+"angle2"] = analog_beam['angle2']
# ----------------------------
# COBALT settings
# ----------------------------
correlator_enabled = spec['COBALT']['correlator_enabled']
cobalt_version = spec['COBALT']['version'] cobalt_version = spec['COBALT']['version']
digi_beams = spec['stations']['digital_pointings']
parset["Cobalt.realTime"] = True parset = {}
parset["Cobalt.blockSize"] = spec['COBALT']['blocksize']
parset["Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction']
parset["Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation']
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name
# Correlator settings
correlator_enabled = spec['COBALT']['correlator_enabled']
parset["Observation.DataProducts.Output_Correlated.enabled"] = correlator_enabled parset["Observation.DataProducts.Output_Correlated.enabled"] = correlator_enabled
parset["Observation.DataProducts.Output_Correlated.filenames"] = [] parset["Observation.DataProducts.Output_Correlated.filenames"] = []
parset["Observation.DataProducts.Output_Correlated.locations"] = [] parset["Observation.DataProducts.Output_Correlated.locations"] = []
...@@ -164,12 +92,12 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -164,12 +92,12 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1 parset["Cobalt.Correlator.nrIntegrationsPerBlock"] = spec['COBALT']['correlator']['integrations_per_block'] if correlator_enabled else 1
if correlator_enabled: if correlator_enabled:
if 'phase_centers' in spec['COBALT']['correlator']: if cobalt_version >= 2 and 'phase_centers' in spec['COBALT']['correlator']:
for beam_nr, digi_beam in enumerate(digi_beams): for beam_nr, digi_beam in enumerate(digi_beams):
beam_prefix = "Observation.Beam[%d]." % beam_nr
if cobalt_version >= 2:
phase_centers = spec['COBALT']['correlator']['phase_centers'] phase_centers = spec['COBALT']['correlator']['phase_centers']
if phase_centers: if phase_centers:
beam_prefix = "Observation.Beam[%d]." % beam_nr
# for now, cobalt can handle only one phase_center # for now, cobalt can handle only one phase_center
# assume the first is the one # assume the first is the one
phase_center = phase_centers[0] phase_center = phase_centers[0]
...@@ -191,19 +119,39 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -191,19 +119,39 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
# mimic MoM placeholder thingy (the resource estimator parses this) # mimic MoM placeholder thingy (the resource estimator parses this)
parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] parset["Observation.DataProducts.Output_Correlated.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
# Beamformer settings return parset
def beamformer_pipeline_to_parset(pipeline, coherent_dataproducts, incoherent_dataproducts): def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: dict) -> dict:
""" Return parset settings for a beamformer pipeline in the subtask, and augments (in)coherent_dataproducts with any files produced.""" """ Provide the parset keys for the COBALT beamformer. """
cobalt_version = spec['COBALT']['version']
digi_beams = spec['stations']['digital_pointings']
parset = {}
# TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
subtask_output_ids = [o.id for o in subtask_outputs]
# TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order
dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.TIME_SERIES.value).order_by('filename'))
# Lists of coherent and incoherent dataproducts that will be produced, in the order COBALT wants them
coherent_dataproducts = []
incoherent_dataproducts = []
# List of beamformer pipelines, staged to be added to the parset later
beamformer_pipeline_parsets = []
# Process beamformer pipelines
for pipeline in spec['COBALT']['beamformer']['tab_pipelines']:
pipeline_parset = {} pipeline_parset = {}
pipeline_parset.update(add_prefix(stokes_to_parset(pipeline['coherent']), "CoherentStokes.")) pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
pipeline_parset.update(add_prefix(stokes_to_parset(pipeline['incoherent']), "IncoherentStokes.")) pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['incoherent']), "IncoherentStokes."))
pipeline_parset['nrBeams'] = len(pipeline['SAPs']) pipeline_parset['nrBeams'] = len(pipeline['SAPs'])
for sap in pipeline['SAPs']: for sap in pipeline['SAPs']:
sap_idx = sap_index(spec['stations']['digital_pointings'], sap['name']) sap_idx = _sap_index(digi_beams, sap['name'])
nr_subbands = len(sap['subbands']) or len(spec['stations']['digital_pointings'][sap_idx]['subbands'])
pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = len(sap['tabs']) pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = len(sap['tabs'])
for tab_idx, tab in enumerate(sap['tabs']): for tab_idx, tab in enumerate(sap['tabs']):
...@@ -219,6 +167,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -219,6 +167,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent' % (sap_idx, tab_idx)] = False pipeline_parset['Beam[%s].TiedArrayBeam[%s].coherent' % (sap_idx, tab_idx)] = False
stokes_settings = pipeline['incoherent'] stokes_settings = pipeline['incoherent']
nr_subbands = len(sap['subbands']) or len(digi_beams[sap_idx]['subbands'])
nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file']) nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
nr_stokes = len(stokes_settings['stokes']) nr_stokes = len(stokes_settings['stokes'])
...@@ -234,23 +183,25 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -234,23 +183,25 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
if cobalt_version >= 2: if cobalt_version >= 2:
pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands'] pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands']
if cobalt_version >= 2: if cobalt_version == 1:
# This won't overwrite anything, since COBALT1 supports only one beamformer pipeline
parset["Cobalt.BeamFormer.stationList"] = pipeline['stations']
else:
pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations'] pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']
return pipeline_parset beamformer_pipeline_parsets.append(pipeline_parset)
def flyseye_pipeline_to_parset(pipeline, coherent_dataproducts):
""" Return parset settings for a fly's eye pipeline in the subtask, and augments coherent_dataproducts with any files produced."""
# Process fly's eye pipelines
for pipeline in spec['COBALT']['beamformer']['flyseye_pipelines']:
pipeline_parset = {} pipeline_parset = {}
pipeline_parset.update(add_prefix(stokes_to_parset(pipeline['coherent']), "CoherentStokes.")) pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
pipeline_parset['flysEye'] = True pipeline_parset['flysEye'] = True
pipeline_parset['nrBeams'] = len(spec['stations']['digital_pointings']) pipeline_parset['nrBeams'] = len(digi_beams)
for sap_idx, sap in enumerate(spec['stations']['digital_pointings']): for sap_idx, sap in enumerate(digi_beams):
sap_idx = sap_index(spec['stations']['digital_pointings'], sap['name']) sap_idx = _sap_index(digi_beams, sap['name'])
nr_subbands = len(sap['subbands'])
# Generate coherent TABs for each antenna field
stations = pipeline['stations'] or spec['stations']['station_list'] stations = pipeline['stations'] or spec['stations']['station_list']
antennaset = spec['stations']['antenna_set'] antennaset = spec['stations']['antenna_set']
fields = sum([antenna_fields(station, antennaset) for station in stations], []) fields = sum([antenna_fields(station, antennaset) for station in stations], [])
...@@ -258,6 +209,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -258,6 +209,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
for field in fields: for field in fields:
stokes_settings = pipeline['coherent'] stokes_settings = pipeline['coherent']
nr_subbands = len(sap['subbands'])
nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file']) nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
nr_stokes = len(stokes_settings['stokes']) nr_stokes = len(stokes_settings['stokes'])
...@@ -272,24 +224,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -272,24 +224,7 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = 0 pipeline_parset['Beam[%s].nrTiedArrayBeams' % sap_idx] = 0
return pipeline_parset beamformer_pipeline_parsets.append(pipeline_parset)
# TODO: do not use SubtaskOutput.objects.filter but make subtask.subtask_outputs work
subtask_outputs = list(models.SubtaskOutput.objects.filter(subtask_id=subtask.id))
subtask_output_ids = [o.id for o in subtask_outputs]
# TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order
dataproducts = list(models.Dataproduct.objects.filter(producer_id__in=subtask_output_ids).filter(dataformat=Dataformat.Choices.MEASUREMENTSET.value).filter(datatype=Datatype.Choices.TIME_SERIES.value).order_by('filename'))
coherent_dataproducts = []
incoherent_dataproducts = []
beamformer_pipeline_parsets = []
for pipeline in spec['COBALT']['beamformer']['tab_pipelines']:
beamformer_pipeline_parsets.append(beamformer_pipeline_to_parset(pipeline, coherent_dataproducts, incoherent_dataproducts))
for pipeline in spec['COBALT']['beamformer']['flyseye_pipelines']:
beamformer_pipeline_parsets.append(flyseye_pipeline_to_parset(pipeline, coherent_dataproducts))
# global parset also needs flys eye set if any pipeline uses it # global parset also needs flys eye set if any pipeline uses it
parset['Cobalt.BeamFormer.flysEye'] = (len(spec['COBALT']['beamformer']['flyseye_pipelines']) > 0) parset['Cobalt.BeamFormer.flysEye'] = (len(spec['COBALT']['beamformer']['flyseye_pipelines']) > 0)
...@@ -303,13 +238,12 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -303,13 +238,12 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
raise ConversionException("COBALT1 only supports one beamformer pipeline. %d were specified." % len(beamformer_pipeline_parsets)) raise ConversionException("COBALT1 only supports one beamformer pipeline. %d were specified." % len(beamformer_pipeline_parsets))
# Beam keys are merged under Observation # Beam keys are merged under Observation
print(beamformer_pipeline_parsets[0]) parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if not k.startswith("Beam")}, "Cobalt.BeamFormer."))
parset.update(add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if not k.startswith("Beam")}, "Cobalt.BeamFormer.")) parset.update(_add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if k.startswith("Beam")}, "Observation."))
parset.update(add_prefix({k:v for k,v in beamformer_pipeline_parsets[0].items() if k.startswith("Beam")}, "Observation."))
else: else:
parset['Cobalt.BeamFormer.nrPipelines'] = len(beamformer_pipeline_parsets) parset['Cobalt.BeamFormer.nrPipelines'] = len(beamformer_pipeline_parsets)
for pipeline_idx, pipeline_parset in enumerate(beamformer_pipeline_parsets): for pipeline_idx, pipeline_parset in enumerate(beamformer_pipeline_parsets):
parset.update(add_prefix(pipeline_parset, "Cobalt.BeamFormer.Pipeline[%s]." % pipeline_idx)) parset.update(_add_prefix(pipeline_parset, "Cobalt.BeamFormer.Pipeline[%s]." % pipeline_idx))
# Filenames & locations are split for coherent & incoherent dataproducts. The following order is used, from slowest to fastest changing dimension: # Filenames & locations are split for coherent & incoherent dataproducts. The following order is used, from slowest to fastest changing dimension:
# #
...@@ -333,6 +267,95 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -333,6 +267,95 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
parset["Observation.DataProducts.Output_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] parset["Observation.DataProducts.Output_CoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
parset["Observation.DataProducts.Output_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))] parset["Observation.DataProducts.Output_IncoherentStokes.identifications"] = ["TMSS_subtask_%s.SAP%03d" % (subtask.id, sap_nr) for sap_nr in range(len(digi_beams))]
return parset
def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtask) -> dict:
# make sure the spec is complete (including all non-filled in properties with default)
spec = add_defaults_to_json_object_for_schema(subtask.specifications_doc, resolved_refs(subtask.specifications_template.schema))
# -----------------------------------------------------------------------------------------------
# Historic rationale: in TMSS-183 we made MAC run an actual observation from a TMSS specification.
# With the help of Auke and Jan-David I could generate the parset as defined below.
# MAC turned out to be very sensitive for having specific keys with very specific prefixes etc.
# As a result, the generated parset contains many "duplicate"(nested) keys.
# We all agree that this is ugly, and we should not want this, but hey... it works.
# We decided to keep it like this, and maybe do more tuning/pruning later in the TMSS project.
# Or, we can just get rid of this to-parset-adaper when MAC has been rewritten to the new station API.
# -----------------------------------------------------------------------------------------------
# ----------------------------
# Generic settings
# ----------------------------
parset = dict() # parameterset has no proper assignment operators, so take detour via dict...
parset["Observation.ObsID"] = subtask.pk
parset["Observation.momID"] = 0 # Needed by MACScheduler
parset["Observation.otdbID"] = 0 # Needed by MACScheduler; should/can this be the same as subtask.pk?
parset["Observation.tmssID"] = subtask.pk
parset["Observation.processType"] = subtask.specifications_template.type.value.capitalize()
parset["Observation.processSubtype"] = "Beam Observation"
parset["Observation.Campaign.name"] = subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name
parset["Observation.startTime"] = formatDatetime(subtask.start_time) if isinstance(subtask.start_time, datetime) else subtask.start_time
parset["Observation.stopTime"] = formatDatetime(subtask.stop_time) if isinstance(subtask.stop_time, datetime) else subtask.stop_time
parset["Observation.strategy"] = "default" # maybe not mandatory?
# ----------------------------
# Station settings
# ----------------------------
parset["Observation.VirtualInstrument.minimalNrStations"] = 1 # maybe not mandatory?
parset["Observation.VirtualInstrument.stationSet"] = "Custom" # maybe not mandatory?
parset["Observation.VirtualInstrument.stationList"] = spec["stations"]["station_list"]
parset["Observation.antennaArray"] = "HBA" if "HBA" in spec["stations"]["antenna_set"] else "LBA" # maybe not mandatory?
parset["Observation.antennaSet"] = spec["stations"]["antenna_set"]
parset["Observation.bandFilter"] = spec["stations"]["filter"]
parset["Observation.sampleClock"] = 200 # fixed value, no other values are supported
parset["Observation.nrBitsPerSample"] = 8 # fixed value, no other values are supported.
# Digital beams
digi_beams = spec['stations']['digital_pointings']
parset["Observation.nrBeams"] = len(digi_beams)
for beam_nr, digi_beam in enumerate(digi_beams):
beam_prefix = "Observation.Beam[%d]." % beam_nr
parset[beam_prefix+"directionType"] = digi_beam['pointing']['direction_type']
parset[beam_prefix+"angle1"] = digi_beam['pointing']['angle1']
parset[beam_prefix+"angle2"] = digi_beam['pointing']['angle2']
parset[beam_prefix+"target"] = digi_beam['name']
parset[beam_prefix+"subbandList"] = digi_beam['subbands']
parset[beam_prefix+"nrTiedArrayBeams"] = 0
parset[beam_prefix+"nrTabRings"] = 0
# Analog beam (=HBA tile beam)
analog_beam = spec['stations']['analog_pointing']
parset["Observation.nrAnaBeams"] = 1
beam_prefix = "Observation.AnaBeam[0]."
parset[beam_prefix+"directionType"] = analog_beam['direction_type']
parset[beam_prefix+"angle1"] = analog_beam['angle1']
parset[beam_prefix+"angle2"] = analog_beam['angle2']
# ----------------------------
# COBALT settings
# ----------------------------
cobalt_version = spec['COBALT']['version']
parset["Cobalt.realTime"] = True
parset["Cobalt.blockSize"] = spec['COBALT']['blocksize']
parset["Cobalt.correctBandPass"] = spec['COBALT']['bandpass_correction']
parset["Cobalt.delayCompensation"] = spec['COBALT']['delay_compensation']
parset["Observation.Cluster.ProcessingCluster.clusterName"] = subtask.cluster.name
# Correlator settings
parset.update(_convert_correlator_settings_to_parset_dict(subtask, spec))
# Beamformer settings
parset.update(_convert_beamformer_settings_to_parset_dict(subtask, spec))
# ResourceEstimator wants all Cobalt keys to start with Observation.ObservationControl.OnlineControl.
parset.update(_add_prefix({k:v for k,v in parset.items() if k.startswith("Cobalt.")}, "Observation.ObservationControl.OnlineControl."))
# ---------------------------- # ----------------------------
# MAC settings # MAC settings
...@@ -360,9 +383,6 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas ...@@ -360,9 +383,6 @@ def _convert_to_parset_dict_for_observationcontrol_schema(subtask: models.Subtas
parset[prefix+"ObservationControl.StationControl.aartfaacPiggybackAllowed"] = False parset[prefix+"ObservationControl.StationControl.aartfaacPiggybackAllowed"] = False
parset[prefix+"ObservationControl.StationControl.tbbPiggybackAllowed"] = False parset[prefix+"ObservationControl.StationControl.tbbPiggybackAllowed"] = False
# ResourceEstimator wants all Cobalt keys to start with Observation.ObservationControl.OnlineControl.
parset.update(add_prefix({k:v for k,v in parset.items() if k.startswith("Cobalt.")}, "Observation.ObservationControl.OnlineControl."))
return parset return parset
......
...@@ -81,8 +81,7 @@ class ObservationParsetAdapterTest(unittest.TestCase): ...@@ -81,8 +81,7 @@ class ObservationParsetAdapterTest(unittest.TestCase):
subtask = self.create_subtask(specifications_doc) subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask) parset = convert_to_parset_dict(subtask)
logger.info("test_correlator parset:",parset)
print("correlator parset:",parset)
self.assertEqual(True, parset["Observation.DataProducts.Output_Correlated.enabled"]) self.assertEqual(True, parset["Observation.DataProducts.Output_Correlated.enabled"])
self.assertEqual(False, parset["Observation.DataProducts.Output_CoherentStokes.enabled"]) self.assertEqual(False, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
...@@ -122,8 +121,7 @@ class ObservationParsetAdapterTest(unittest.TestCase): ...@@ -122,8 +121,7 @@ class ObservationParsetAdapterTest(unittest.TestCase):
subtask = self.create_subtask(specifications_doc) subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask) parset = convert_to_parset_dict(subtask)
logger.info("test_flyseye parset:",parset)
print("FE parset:",parset)
self.assertEqual(True, parset["Cobalt.BeamFormer.flysEye"]) self.assertEqual(True, parset["Cobalt.BeamFormer.flysEye"])
self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"]) self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
...@@ -182,8 +180,7 @@ class ObservationParsetAdapterTest(unittest.TestCase): ...@@ -182,8 +180,7 @@ class ObservationParsetAdapterTest(unittest.TestCase):
subtask = self.create_subtask(specifications_doc) subtask = self.create_subtask(specifications_doc)
parset = convert_to_parset_dict(subtask) parset = convert_to_parset_dict(subtask)
logger.info("test_beamformer parset:",parset)
print("beamformer parset:",parset)
self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"]) self.assertEqual(True, parset["Observation.DataProducts.Output_CoherentStokes.enabled"])
self.assertEqual(nr_cs_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"])) self.assertEqual(nr_cs_files, len(parset["Observation.DataProducts.Output_CoherentStokes.filenames"]))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment