Commit 4a0745b7 authored by Jan David Mol's avatar Jan David Mol

TMSS-604: Initial implementation of scheduling beamformed dataproducts and putting those in the right order in the parset
parent 61ffa596
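
For illustration, here is a minimal standalone sketch of the ordering scheme this commit introduces: each beamformed dataproduct carries explicit (sap, pipeline, tab, stokes, part) identifiers in its specifications_doc, and the parset generator looks dataproducts up by those identifiers instead of relying on filename order. All names below (find_dataproduct, NULL_DATAPRODUCT, the sample data) are invented for this sketch and are not TMSS code:

    # Invented stand-in for the parset's placeholder for missing files.
    NULL_DATAPRODUCT = {"filename": "null:"}

    def find_dataproduct(dataproducts, sap_idx, pipeline_idx, tab_idx, stokes_idx, part_idx):
        """Return the unique dataproduct matching the given identifiers, or None."""
        matches = [dp for dp in dataproducts
                   if dp["identifiers"] == {"sap_index": sap_idx,
                                            "pipeline_index": pipeline_idx,
                                            "tab_index": tab_idx,
                                            "stokes_index": stokes_idx,
                                            "part_index": part_idx}]
        return matches[0] if matches else None

    # Two parts of the same TAB, deliberately listed out of order.
    dataproducts = [
        {"filename": "L1_SAP000_N000_B000_S000_P001_bf.h5",
         "identifiers": {"sap_index": 0, "pipeline_index": 0, "tab_index": 0,
                         "stokes_index": 0, "part_index": 1}},
        {"filename": "L1_SAP000_N000_B000_S000_P000_bf.h5",
         "identifiers": {"sap_index": 0, "pipeline_index": 0, "tab_index": 0,
                         "stokes_index": 0, "part_index": 0}},
    ]

    # Emit parts in parset order regardless of list order; missing
    # combinations fall back to the null placeholder.
    ordered = [find_dataproduct(dataproducts, 0, 0, 0, 0, p) or NULL_DATAPRODUCT
               for p in range(3)]
    assert [dp["filename"] for dp in ordered] == ["L1_SAP000_N000_B000_S000_P000_bf.h5",
                                                  "L1_SAP000_N000_B000_S000_P001_bf.h5",
                                                  "null:"]
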
@@ -176,11 +176,16 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
                 # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
                 for s in range(nr_stokes):
                     for p in range(nr_parts):
-                        # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order
+                        dataproduct = [dp for dp in dataproducts
+                                       if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
+                                       and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
+                                       and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
+                                       and dp.specifications_doc["identifiers"]["stokes_index"] == s
+                                       and dp.specifications_doc["identifiers"]["part_index"] == p]
                         if tab['coherent']:
-                            coherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct)
+                            coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)
                         else:
-                            incoherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct)
+                            incoherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)

             if cobalt_version >= 2:
                 pipeline_parset['Beam[%s].subbandList' % sap_idx] = sap['subbands']
@@ -194,7 +199,8 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
         beamformer_pipeline_parsets.append(pipeline_parset)

     # Process fly's eye pipelines
-    for pipeline in spec['COBALT']['beamformer']['flyseye_pipelines']:
+    pipeline_idx_offset = len(beamformer_pipeline_parsets)
+    for pipeline_idx, pipeline in enumerate(spec['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_idx_offset):
         pipeline_parset = {}
         pipeline_parset.update(_add_prefix(_stokes_settings_parset_subkeys(pipeline['coherent']), "CoherentStokes."))
         pipeline_parset['flysEye'] = True
@@ -208,7 +214,7 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
         antennaset = spec['stations']['antenna_set']
         fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])

-        for field in fields:
+        for field_idx, field in enumerate(fields):
             stokes_settings = pipeline['coherent']

             nr_subbands = len(sap['subbands'])
@@ -218,8 +224,13 @@ def _convert_beamformer_settings_to_parset_dict(subtask: models.Subtask, spec: d
             # marshall dataproducts, but only if they're supplied. in some use cases, we want a parset before the subtask is scheduled.
             for s in range(nr_stokes):
                 for p in range(nr_parts):
-                    # TODO: don't assume ordering by filename is sufficient: we need to inspect the dataproduct properties to make sure saps and subbands are in the correct order
-                    coherent_dataproducts.append(dataproducts.pop(0) if dataproducts else null_dataproduct)
+                    dataproduct = [dp for dp in dataproducts
+                                   if dp.specifications_doc["identifiers"]["sap_index"] == sap_idx
+                                   and dp.specifications_doc["identifiers"]["pipeline_index"] == pipeline_idx
+                                   and dp.specifications_doc["identifiers"]["tab_index"] == field_idx
+                                   and dp.specifications_doc["identifiers"]["stokes_index"] == s
+                                   and dp.specifications_doc["identifiers"]["part_index"] == p]
+                    coherent_dataproducts.append(dataproduct[0] if dataproduct else null_dataproduct)

         if cobalt_version >= 2:
             pipeline_parset['Beam[%s].stationList' % sap_idx] = pipeline['stations']
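
The parset changes above number the fly's eye pipelines after the regular TAB pipelines via enumerate(..., start=pipeline_idx_offset), so pipeline_index is unique across both lists. A tiny standalone sketch of that numbering (list contents invented for illustration):

    # Invented stand-ins: the real items are COBALT pipeline specification dicts.
    tab_pipelines = ["tab_pipeline_0", "tab_pipeline_1"]
    flyseye_pipelines = ["flyseye_pipeline_0"]

    indexed = list(enumerate(tab_pipelines))                      # indices 0, 1
    offset = len(indexed)                                         # continue after the TAB pipelines
    indexed += list(enumerate(flyseye_pipelines, start=offset))   # index 2

    # every pipeline gets a unique, stable pipeline_index across both lists
    assert [idx for idx, _ in indexed] == [0, 1, 2]
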
@@ -10,6 +10,7 @@ from lofar.common.datetimeutils import formatDatetime, round_to_second_precision
 from lofar.common import isProductionEnvironment
 from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema
 from lofar.common.lcu_utils import get_current_stations
+from lofar.stationmodel.antennafields import antenna_fields
 from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException, SubtaskException
@@ -1142,7 +1143,7 @@ def schedule_observation_subtask(observation_subtask: Subtask):
                                observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name,
                                observation_subtask.id)

-    # create correlated dataproducts, in bulk
+    # create correlated dataproducts
     if specifications_doc['COBALT']['correlator']['enabled']:
         dataproduct_specifications_template_visibilities = DataproductSpecificationsTemplate.objects.get(name="visibilities")
@@ -1167,12 +1168,64 @@ def schedule_observation_subtask(observation_subtask: Subtask):
             sb_nr_offset += len(pointing['subbands'])

-        # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
-        # This is needed as bulk_create needs to have any relations resolved.
-        dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
-        for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
-            dp.global_identifier = global_identifier
-        Dataproduct.objects.bulk_create(dataproducts)
+    # create beamformer dataproducts
+    dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="timeseries")
+
+    pipeline_nr_offset = 0
+
+    def _sap_index(saps: dict, sap_name: str) -> int:
+        """ Return the SAP index in the observation given a certain SAP name. """
+
+        sap_indices = [idx for idx, sap in enumerate(saps) if sap['name'] == sap_name]
+
+        # needs to be exactly one hit
+        if len(sap_indices) != 1:
+            raise SubtaskSchedulingException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps]))
+
+        return sap_indices[0]
+
+    def add_tab_dataproducts(sap_nr, pipeline_nr, tab_nr, stokes_settings, coherent):
+        nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_nr]['subbands'])
+        nr_stokes = len(stokes_settings['stokes'])
+        nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
+
+        dataproducts.extend([Dataproduct(filename="L%d_SAP%03d_N%03d_B%03d_S%03d_P%03d_bf.h5" % (observation_subtask.id, sap_nr, pipeline_nr, tab_nr, stokes_nr, part_nr),
+                                         directory=directory+("/cs" if coherent else "/is"),
+                                         dataformat=Dataformat.objects.get(value="Beamformed"),
+                                         datatype=Datatype.objects.get(value="time series"),
+                                         producer=subtask_output,
+                                         specifications_doc={"identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}},
+                                         specifications_template=dataproduct_specifications_template_timeseries,
+                                         feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
+                                         feedback_template=dataproduct_feedback_template,
+                                         size=0,
+                                         expected_size=1024*1024*1024*sb_nr,
+                                         sap=saps[sap_nr],
+                                         global_identifier=None)
+                             for part_nr in range(nr_parts) for stokes_nr in range(nr_stokes)])
+
+    for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['tab_pipelines'], start=pipeline_nr_offset):
+        for sap in pipeline['SAPs']:
+            sap_idx = _sap_index(specifications_doc['stations']['digital_pointings'], sap['name'])

+            for tab_idx, tab in enumerate(sap['tabs']):
+                add_tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'] if tab['coherent'] else pipeline['incoherent'], tab['coherent'])
+
+    pipeline_nr_offset += len(specifications_doc['COBALT']['beamformer']['tab_pipelines'])
+
+    for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_nr_offset):
+        for sap_nr, sap in enumerate(specifications_doc['stations']['digital_pointings']):
+            stations = pipeline['stations'] or specifications_doc['stations']['station_list']
+            antennaset = specifications_doc['stations']['antenna_set']
+            fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])
+
+            for tab_idx, tab in enumerate(fields):
+                add_tab_dataproducts(sap_nr, pipeline_nr, tab_idx, pipeline['coherent'], True)
+
+    # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
+    # This is needed as bulk_create needs to have any relations resolved.
+    dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
+    for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
+        dp.global_identifier = global_identifier
+    Dataproduct.objects.bulk_create(dataproducts)

     # step 4: resource assigner (if possible)
     assign_or_unassign_resources(observation_subtask)
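
To see how many files add_tab_dataproducts creates per TAB, here is a small runnable sketch of the same part/stokes bookkeeping, with invented numbers (17 subbands, at most 10 subbands per file, 4 Stokes parameters):

    from math import ceil

    nr_subbands = 17          # invented: subbands in this SAP
    subbands_per_file = 10    # invented: from the stokes settings
    nr_stokes = 4             # invented: e.g. Stokes I, Q, U, V

    nr_parts = ceil(1.0 * nr_subbands / subbands_per_file)   # -> 2 parts
    filenames = ["L%d_SAP%03d_N%03d_B%03d_S%03d_P%03d_bf.h5" % (750406, 0, 0, 0, stokes_nr, part_nr)
                 for part_nr in range(nr_parts)
                 for stokes_nr in range(nr_stokes)]

    # one file per (stokes, part) combination
    assert len(filenames) == nr_stokes * nr_parts == 8
    assert filenames[0] == "L750406_SAP000_N000_B000_S000_P000_bf.h5"
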