-
Jorrit Schaap authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
subtasks.py 64.70 KiB
import logging
logger = logging.getLogger(__name__)
from functools import cmp_to_key
from collections.abc import Iterable
from lofar.common.datetimeutils import formatDatetime
from lofar.common import isProductionEnvironment
from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema
from lofar.common.lcu_utils import get_current_stations
from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException
from datetime import datetime, timedelta
from lofar.common.datetimeutils import parseDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
from lofar.sas.tmss.tmss.tmssapp.models import *
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, BlockConstraints, BlockSize
from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset_and_station
# ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ====
def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool:
    '''Verify that subtasks may be created for the given task_blueprint.

    :param task_blueprint: the task blueprint for which subtasks are about to be created.
    :return: True when nothing prevents the creation of subtasks.
    :raises SubtaskCreationException: when the task_blueprint has its do_cancel flag set.
    '''
    if not task_blueprint.do_cancel:
        return True

    message = "Cannot create subtasks from task_blueprint id=%d, because the task_blueprint is explicit set to cancel." % task_blueprint.id
    raise SubtaskCreationException(message)
def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Generic create-method for subtasks.

    Selects the subtask generator function(s) by the name of the task_blueprint's specifications_template
    and calls them one by one. The predecessors of the task_blueprint are handled first (recursively).

    :param task_blueprint: the task blueprint to create the subtasks for.
    :return: the list of subtasks created for the predecessors and for this task_blueprint.
             Subtasks which already existed for this task_blueprint are not included.
    :raises SubtaskCreationException: when the task_blueprint may not be instantiated,
                                      or when no generator is known for its template name.
    '''
    check_prerequities_for_subtask_creation(task_blueprint)

    # depth first: handle the predecessors, so that all dependencies in predecessor subtasks can be met.
    created_subtasks = []
    for predecessor_blueprint in task_blueprint.predecessors.all():
        created_subtasks += create_subtasks_from_task_blueprint(predecessor_blueprint)

    if task_blueprint.subtasks.count() > 0:
        logger.debug("skipping creation of subtasks because they already exist for task_blueprint id=%s, name='%s', task_template_name='%s'",
                     task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.name)
        return created_subtasks

    # fixed mapping from template name to the generator functions which create the subtask(s) for this task_blueprint.
    # target and calibrator observations share the very same list of generators.
    observation_generators = [create_observation_control_subtask_from_task_blueprint,
                              create_qafile_subtask_from_task_blueprint,
                              create_qaplots_subtask_from_task_blueprint]
    generators_by_template_name = {'target observation': observation_generators,
                                   'calibrator observation': observation_generators,
                                   'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint]}

    template_name = task_blueprint.specifications_template.name
    if template_name not in generators_by_template_name:
        message = 'Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name)
        logger.error(message)
        raise SubtaskCreationException(message)

    for generate in generators_by_template_name[template_name]:
        try:
            new_subtask = generate(task_blueprint)
        except SubtaskCreationException as e:
            # a failing generator is logged, and the remaining generators are still tried
            logger.error(e)
            continue

        # generators return None when there is nothing to create (for example: QA is not enabled)
        if new_subtask is not None:
            created_subtasks.append(new_subtask)

    return created_subtasks
def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate):
    """
    Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings

    :param task_blueprint: a task blueprint with a 'target observation' or 'calibrator observation' specifications template.
    :return: a tuple of the filled-in subtask specification document and the 'observation control' SubtaskTemplate.
    :raises SubtaskCreationException: when the task_blueprint is not an observation, when a calibrator has no
                                      related target observation, when the pointing cannot be determined in
                                      auto-select mode, or when the resulting station list is empty.
    """
    # check if task_blueprint has an observation-like specification
    if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation']:
        raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % (
                                       task_blueprint.id, task_blueprint.specifications_template.name))

    # start with an observation subtask specification with all the defaults and the right structure according to the schema
    subtask_template = SubtaskTemplate.objects.get(name='observation control')
    subtask_spec = get_default_json_object_for_schema(subtask_template.schema)

    # wipe the default pointings, these should come from the task_spec
    subtask_spec['stations']['analog_pointing'] = {}
    subtask_spec['stations']['digital_pointings'] = []

    # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec
    task_spec = task_blueprint.specifications_doc

    # The calibrator has a minimal calibration-specific specification subset.
    # The rest of its specs are 'shared' with the target observation.
    # So... copy the calibrator specs first, then loop over the shared target/calibrator specs...
    if 'calibrator' in task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (
                                           task_blueprint.id, task_blueprint.specifications_template.name))
        target_task_spec = target_task_blueprint.specifications_doc

        # 'autoselect' defaults to True when it is absent from the calibrator task specification
        if task_spec.get('autoselect', True):
            logger.info("auto-selecting calibrator target based on elevation of target observation...")
            # NOTE(review): despite the log message, the code below just copies the tile beam of the related
            # target observation; no elevation-based selection is visible here.
            if "tile_beam" in target_task_spec:
                subtask_spec['stations']['analog_pointing'] = {
                    "direction_type": target_task_spec["tile_beam"]["direction_type"],
                    "angle1": target_task_spec["tile_beam"]["angle1"],
                    "angle2": target_task_spec["tile_beam"]["angle2"]}
            else:
                raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint "
                                               "id=%s in auto-select mode, because the related target observation "
                                               "task_blueprint id=%s has no tile beam pointing defined" % (
                                               task_blueprint.id, target_task_blueprint.id))
        else:
            # not auto-selecting: use the pointing which is given in the calibrator's own task specification
            subtask_spec['stations']['analog_pointing'] = {"direction_type": task_spec["pointing"]["direction_type"],
                                                           "angle1": task_spec["pointing"]["angle1"],
                                                           "angle2": task_spec["pointing"]["angle2"]}

        # for the calibrator, the digital pointing is equal to the analog pointing
        # (the very same dict object is referenced, it is not copied)
        subtask_spec['stations']['digital_pointings'] = [ {'name': task_spec['name'],
                                                           'subbands': list(range(0,488)), # there are no subbands for the calibrator pointing in the task spec
                                                           'pointing': subtask_spec['stations']['analog_pointing'] } ]
        # Use the Task Specification of the Target Observation
        # (from here on, task_spec refers to the target's specification, not the calibrator's)
        task_spec = target_task_spec
        logger.info("Using station and correlator settings for calibrator observation task_blueprint id=%s from target observation task_blueprint id=%s",
                    task_blueprint.id, target_task_blueprint.id)

    subtask_spec['stations']["antenna_set"] = task_spec["antenna_set"]
    subtask_spec['stations']["filter"] = task_spec["filter"]

    # At this moment of subtask creation we know which stations we *want* from the task_spec
    # But we do not know yet which stations are available at the moment of observing.
    # So, we decided that we set the subtask station_list as the union of all stations in all specified groups.
    # This way, the user can see which stations are (likely) to be used.
    # At the moment of scheduling of this subtask, then station_list is re-evaluated, and the max_nr_missing per group is validated.
    subtask_spec['stations']['station_list'] = []
    if "station_groups" in task_spec:
        for station_group in task_spec["station_groups"]:
            subtask_spec['stations']['station_list'].extend(station_group["stations"])
        # make list have unique items (and sort it, so the order is deterministic)
        subtask_spec['stations']['station_list'] = sorted(list(set(subtask_spec['stations']['station_list'])))

    if not subtask_spec['stations']['station_list']:
        raise SubtaskCreationException("Cannot create observation subtask specifications for task_blueprint id=%s. No stations are defined." % (task_blueprint.id,))

    if 'calibrator' not in task_blueprint.specifications_template.name.lower():
        # copy/convert the analog/digital_pointings only for non-calibrator observations (the calibrator has its own pointing)
        for sap in task_spec.get("SAPs", []):
            subtask_spec['stations']['digital_pointings'].append(
                {"name": sap["name"],
                 "target": sap["target"],
                 "pointing": {"direction_type": sap["digital_pointing"]["direction_type"],
                              "angle1": sap["digital_pointing"]["angle1"],
                              "angle2": sap["digital_pointing"]["angle2"]},
                 "subbands": sap["subbands"]
                 })

        if "tile_beam" in task_spec:
            subtask_spec['stations']['analog_pointing'] = { "direction_type": task_spec["tile_beam"]["direction_type"],
                                                            "angle1": task_spec["tile_beam"]["angle1"],
                                                            "angle2": task_spec["tile_beam"]["angle2"] }

    if "correlator" in task_spec:
        # let the BlockSize calculator derive the COBALT block settings from the
        # requested number of channels per subband and the integration time
        corr = CorrelatorSettings()
        corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"]
        corr.integrationTime = task_spec["correlator"]["integration_time"]
        calculator = BlockSize(constraints=BlockConstraints(correlatorSettings=corr))
        subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = calculator.nrBlocks
        subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = calculator.nrSubblocks

    # make sure that the subtask_spec is valid conform the schema
    validate_json_against_schema(subtask_spec, subtask_template.schema)

    return subtask_spec, subtask_template
def get_stations_in_group(station_group_name: str) -> []:
    '''Get a sorted list of the names of the stations in the given station_group.

    The group memberships are requested via the RADBRPC, except for the
    "INTERNATIONAL_REQUIRED" group which has a fixed answer.

    :param station_group_name: name of the station group. "DUTCH" is looked up as "NL".
    :return: sorted list of station names, without the RS408 and TEST1 stations.
    '''
    # TODO Make names RA and TMSS spec equal: 'NL' or 'DUTCH'?
    radb_group_name = "NL" if station_group_name == "DUTCH" else station_group_name

    # International required is by definition DE601 or DE605, take 601 for now
    # TODO check with RA the availability of both stations
    if radb_group_name == "INTERNATIONAL_REQUIRED":
        return ["DE601"]

    with RADBRPC.create() as radbrpc:
        memberships = radbrpc.getResourceGroupMemberships()['groups']

        # find the one group with the requested name, which is either a 'station_group' or a 'virtual' group
        group = next(rg for rg in memberships.values()
                     if rg['resource_group_type'] in ('station_group', 'virtual')
                     and rg['resource_group_name'] == radb_group_name)

        # collect the names of only those children which are stations
        station_names = set()
        for child_id in group['child_ids']:
            child = memberships[child_id]
            if child['resource_group_type'] == 'station':
                station_names.add(child['resource_group_name'])

        # HACK, RS408 should be removed from the RADB
        station_names.discard('RS408')

        # HACK remove TEST1 from station list otherwise validate will fail
        station_names.discard('TEST1')

        return sorted(station_names)
def get_related_target_observation_task_blueprint(calibrator_task_blueprint: TaskBlueprint) -> TaskBlueprint:
    """
    Get the related target observation task_blueprint for the given calibrator task_blueprint.

    The scheduling relations are searched in both directions: first the relations in which the calibrator
    is the 'first' task, and if that yields nothing, the relations in which it is the 'second' task.

    :param calibrator_task_blueprint: a task blueprint whose specifications template name contains 'calibrator'.
    :return: the first related task blueprint with a 'target observation' template, or None if nothing is found.
    :raises ValueError: when the given task blueprint is not a calibrator task blueprint.
    """
    if 'calibrator' not in calibrator_task_blueprint.specifications_template.name.lower():
        # format the message with the %-operator: exceptions do not interpolate extra arguments like the logger does
        raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator task_blueprint id=%s template_name='%s'" % (
                         calibrator_task_blueprint.id, calibrator_task_blueprint.specifications_template.name))

    try:
        # the calibrator is the 'first' task in the relation, so the target is the 'second'
        return next(relation.second for relation in TaskSchedulingRelationBlueprint.objects.filter(first=calibrator_task_blueprint).all()
                    if relation.second is not None and relation.second.specifications_template.name.lower() == 'target observation')
    except StopIteration:
        try:
            # ... or the other way around
            return next(relation.first for relation in TaskSchedulingRelationBlueprint.objects.filter(second=calibrator_task_blueprint).all()
                        if relation.first is not None and relation.first.specifications_template.name.lower() == 'target observation')
        except StopIteration:
            logger.info("No related target observation task_blueprint found for calibrator observation task_blueprint id=%d", calibrator_task_blueprint.id)

    return None
def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    """
    Create the observation control subtask for the given (observation) task_blueprint.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param task_blueprint: the observation task blueprint to create the subtask for.
    :return: the newly created subtask, in DEFINED state, with one (empty) output.
    """
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 1: create subtask in defining state
    specifications_doc, subtask_template = create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint)
    # the storage cluster defaults to CEP4 when the task specification does not name one
    cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    subtask = Subtask.objects.create(start_time=None,
                                     stop_time=None,
                                     state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                     specifications_doc=specifications_doc,
                                     task_blueprint=task_blueprint,
                                     specifications_template=subtask_template,
                                     tags=[],
                                     priority=1,
                                     schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                     cluster=Cluster.objects.get(name=cluster_name))

    # step 2: create and link subtask input/output
    # an observation has no input, it just produces output data
    SubtaskOutput.objects.create(subtask=subtask)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()
    return subtask
def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    '''Create a QA file conversion subtask for the most recently created observation subtask of the task_blueprint.

    :param task_blueprint: the task blueprint which should already have an observation subtask.
    :return: the result of create_qafile_subtask_from_observation_subtask for that observation subtask.
    :raises SubtaskCreationException: when the task_blueprint has no observation subtask.
    '''
    observation_type = SubtaskType.Choices.OBSERVATION.value
    observation_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all()
                            if st.specifications_template.type.value == observation_type]

    if observation_subtasks:
        # ordered by id, so the last one has the highest id
        # TODO: decide what to do when there are multiple observation subtasks?
        return create_qafile_subtask_from_observation_subtask(observation_subtasks[-1])

    raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % (
                                   SubtaskType.Choices.QA_FILES.value, task_blueprint.pk))
def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) -> Subtask:
    ''' Create the subtask which converts the output of the given observation subtask into a QA h5 file.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param observation_subtask: the observation subtask which produces the data to be converted.
    :return: the new QA file conversion subtask in DEFINED state,
             or None when QA.file_conversion is not enabled in the observation task specification.
    :raises ValueError: when the given subtask is not an observation subtask, or when it is still in DEFINING state.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(observation_subtask.task_blueprint)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
                         SubtaskType.Choices.QA_FILES.value, observation_subtask.pk,
                         observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value))

    if observation_subtask.state.value == SubtaskState.Choices.DEFINING.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED" % (
                         SubtaskType.Choices.QA_FILES.value, observation_subtask.pk))

    # the QA settings are part of the observation task specification; absent settings count as 'not enabled'
    obs_task_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask)
    file_conversion_spec = obs_task_spec.get("QA", {}).get("file_conversion", {})
    if not file_conversion_spec.get("enabled", False):
        logger.debug("Skipping creation of qafile_subtask because QA.file_conversion is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    qafile_subtask_template = SubtaskTemplate.objects.get(name="QA file conversion")
    qafile_subtask_spec = add_defaults_to_json_object_for_schema({}, qafile_subtask_template.schema)
    for key in ('nr_of_subbands', 'nr_of_timestamps'):
        qafile_subtask_spec[key] = file_conversion_spec.get(key)
    validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema)

    qafile_subtask = Subtask.objects.create(start_time=None,
                                            stop_time=None,
                                            state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                            task_blueprint=observation_subtask.task_blueprint,
                                            specifications_template=qafile_subtask_template,
                                            specifications_doc=qafile_subtask_spec,
                                            priority=1,
                                            schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                            cluster=observation_subtask.cluster)

    # step 2: create and link subtask input/output
    selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
    selection_doc = get_default_json_object_for_schema(selection_template.schema)
    SubtaskInput.objects.create(subtask=qafile_subtask,
                                producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint
                                selection_doc=selection_doc,
                                selection_template=selection_template)
    SubtaskOutput.objects.create(subtask=qafile_subtask)

    # step 3: set state to DEFINED
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qafile_subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qa_file_subtask
    return qafile_subtask
def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    '''Create a QA plots subtask for the first QA file conversion subtask found in the task_blueprint.

    :param task_blueprint: the task blueprint which should already have a QA file conversion subtask.
    :return: the result of create_qaplots_subtask_from_qafile_subtask for that QA file conversion subtask.
    :raises SubtaskCreationException: when the task_blueprint has no QA file conversion subtask.
    '''
    qa_files_type = SubtaskType.Choices.QA_FILES.value
    qafile_subtasks = [st for st in task_blueprint.subtasks.all()
                       if st.specifications_template.type.value == qa_files_type]

    if not qafile_subtasks:
        raise SubtaskCreationException('Cannot create QA plotting subtask for task id=%s because no predecessor QA file conversion subtask exists.' % (task_blueprint.pk, ))

    # TODO: decide what to do when there are multiple qafile subtasks?
    return create_qaplots_subtask_from_qafile_subtask(qafile_subtasks[0])
def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subtask:
    ''' Create the subtask which creates the inspection plots from the QA h5 file of the given qafile_subtask.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param qafile_subtask: the QA file conversion subtask which produces the h5 file to plot.
    :return: the new QA plots subtask in DEFINED state,
             or None when QA.plots is not enabled in the observation task specification.
    :raises ValueError: when the given subtask is not a QA file conversion subtask.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint)

    if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
                         SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk,
                         qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

    # the QA settings are part of the observation task specification; absent settings count as 'not enabled'
    obs_task_spec = get_observation_task_specification_with_check_for_calibrator(qafile_subtask)
    plots_spec = obs_task_spec.get("QA", {}).get("plots", {})
    if not plots_spec.get("enabled", False):
        logger.debug("Skipping creation of qaplots_subtask because QA.plots is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    qaplots_subtask_template = SubtaskTemplate.objects.get(name="QA plots")
    qaplots_subtask_spec_doc = add_defaults_to_json_object_for_schema({}, qaplots_subtask_template.schema)
    for key in ('autocorrelation', 'crosscorrelation'):
        qaplots_subtask_spec_doc[key] = plots_spec.get(key)
    validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema)

    qaplots_subtask = Subtask.objects.create(start_time=None,
                                             stop_time=None,
                                             state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                             task_blueprint=qafile_subtask.task_blueprint,
                                             specifications_template=qaplots_subtask_template,
                                             specifications_doc=qaplots_subtask_spec_doc,
                                             priority=1,
                                             schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                             cluster=qafile_subtask.cluster)

    # step 2: create and link subtask input/output
    selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
    selection_doc = get_default_json_object_for_schema(selection_template.schema)
    SubtaskInput.objects.create(subtask=qaplots_subtask,
                                producer=qafile_subtask.outputs.first(),
                                selection_doc=selection_doc,
                                selection_template=selection_template)
    SubtaskOutput.objects.create(subtask=qaplots_subtask)

    # step 3: set state to DEFINED
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qaplots_subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qaplots_subtask
    return qaplots_subtask
def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create the pipeline control subtask for the given preprocessing pipeline task_blueprint.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param task_blueprint: the preprocessing pipeline task blueprint to create the subtask for.
    :return: the new pipeline subtask in DEFINED state, which has an input for each output
             of each observation subtask of its producing task blueprints, and one output.
    :raises SubtaskCreationException: when none of the predecessor tasks has an observation subtask.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # TODO: go more elegant lookup of predecessor observation task
    observation_predecessor_tasks = [predecessor for predecessor in task_blueprint.predecessors.all()
                                     if any(st for st in predecessor.subtasks.all()
                                            if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)]
    if not observation_predecessor_tasks:
        raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected "
                                       "to an observation predecessor (sub)task." % task_blueprint.pk)

    # step 1: create subtask in defining state, with filled-in subtask_template
    subtask_template = SubtaskTemplate.objects.get(name='pipeline control')
    default_subtask_specs = get_default_json_object_for_schema(subtask_template.schema)
    subtask_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_blueprint.specifications_doc, default_subtask_specs)
    # the storage cluster defaults to CEP4 when the task specification does not name one
    cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    subtask = Subtask.objects.create(start_time=None,
                                     stop_time=None,
                                     state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                     task_blueprint=task_blueprint,
                                     specifications_template=subtask_template,
                                     specifications_doc=subtask_specs,
                                     priority=1,
                                     schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                     cluster=Cluster.objects.get(name=cluster_name))

    # step 2: create and link subtask input/output
    for task_relation_blueprint in task_blueprint.produced_by.all():
        # TODO: apply some better filtering. Now we're just connecting it to all predecessor observation subtasks
        producer_observation_subtasks = [st for st in task_relation_blueprint.producer.subtasks.order_by('id').all()
                                         if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
        for observation_subtask in producer_observation_subtasks:
            for observation_output in observation_subtask.outputs.all():
                # the selection of the task relation is passed on unchanged to each subtask input
                SubtaskInput.objects.create(subtask=subtask,
                                            producer=observation_output,
                                            selection_doc=task_relation_blueprint.selection_doc,
                                            selection_template=task_relation_blueprint.selection_template)
    SubtaskOutput.objects.create(subtask=subtask)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this pipeline subtask
    return subtask
# ==== various schedule* methods to schedule Subtasks (if possible) ====
def schedule_subtask(subtask: Subtask) -> Subtask:
    '''Generic scheduling method for subtasks. Calls the appropriate scheduling method based on the subtask's type.

    :param subtask: the subtask to schedule.
    :return: the result of the type-specific schedule method.
    :raises SubtaskSchedulingException: when the pre-requisites for scheduling are not met, when there is no
                                        schedule method for the subtask's type, or when the type-specific
                                        schedule method fails (the original error is wrapped).
    '''
    check_prerequities_for_scheduling(subtask)

    try:
        subtask_type = subtask.specifications_template.type.value

        if subtask_type == SubtaskType.Choices.PIPELINE.value:
            return schedule_pipeline_subtask(subtask)
        elif subtask_type == SubtaskType.Choices.OBSERVATION.value:
            return schedule_observation_subtask(subtask)
        elif subtask_type == SubtaskType.Choices.QA_FILES.value:
            return schedule_qafile_subtask(subtask)
        elif subtask_type == SubtaskType.Choices.QA_PLOTS.value:
            return schedule_qaplots_subtask(subtask)
        else:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
                                             (subtask.pk, subtask.specifications_template.type.value))
    except Exception as e:
        try:
            # try to put the subtask in state 'ERROR'...
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
            subtask.save()
        except Exception as e2:
            # a failure to store the ERROR state is only logged
            logger.error(e2)
        finally:
            # ... and always raise the original exception (wrapped), whether or not the state could be stored
            raise SubtaskSchedulingException("Error while scheduling subtask id=%d: %s" % (subtask.pk, str(e)))
def unschedule_subtask(subtask: Subtask) -> Subtask:
    '''unschedule the given subtask, removing all output dataproducts, and setting its state back to 'defined'.

    The subtask passes through the UNSCHEDULING state while its dataproducts are deleted and its
    resources are handed to _assign_or_unassign_resources. If anything fails, an attempt is made to
    put the subtask in ERROR state, and the original exception is re-raised.

    :param subtask: the subtask to unschedule; it has to be in SCHEDULED state.
    :return: the given subtask, now in DEFINED state.
    :raises SubtaskSchedulingException: when the subtask is not in SCHEDULED state.
    '''
    if subtask.state.value != SubtaskState.Choices.SCHEDULED.value:
        raise SubtaskSchedulingException("Cannot unschedule subtask id=%d because it is not SCHEDULED. Current state=%s" % (subtask.pk, subtask.state.value))

    try:
        subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULING.value)
        subtask.save()

        for output in subtask.outputs.all():
            output.dataproducts.all().delete()
        #TODO: delete dataproduct transforms

        _assign_or_unassign_resources(subtask)

        subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
        subtask.save()
    except Exception as e:
        try:
            # set the subtask to state 'ERROR'...
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
            subtask.save()
        except Exception as e2:
            # a failure to store the ERROR state is only logged
            logger.error(e2)
        finally:
            # ... and re-raise the original exception
            raise

    # honour the annotated return type: hand the unscheduled subtask back to the caller
    return subtask
def unschedule_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Convenience method: Unschedule (and return) all scheduled subtasks in the task_blueprint

    :param task_blueprint: the task blueprint whose subtasks in SCHEDULED state are to be unscheduled.
    :return: the list of subtasks that were unscheduled (empty when none were in SCHEDULED state).
    '''
    # materialize the queryset into a list first, because unscheduling changes the state which is filtered on
    scheduled_subtasks = list(task_blueprint.subtasks.filter(state__value=SubtaskState.Choices.SCHEDULED.value).all())
    for subtask in scheduled_subtasks:
        unschedule_subtask(subtask)
    return scheduled_subtasks
def schedule_subtask_and_update_successor_start_times(subtask: Subtask) -> Subtask:
    '''Schedule the given subtask, and then shift its successors so that they start after it has stopped.

    :param subtask: the subtask to schedule.
    :return: the scheduled subtask as returned by schedule_subtask.
    '''
    result = schedule_subtask(subtask)
    shift_successors_until_after_stop_time(result)
    return result
def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, start_time: datetime):
    '''Set the start time of all 'defined' subtasks without inputs in the scheduling unit, and shift their successors.

    :param scheduling_unit: the scheduling unit blueprint whose subtasks are to be (re)timed.
    :param start_time: the start time of the scheduling unit. Each subtask starts at this time
                       plus the relative_start_time of its task blueprint.
    '''
    for task_blueprint in scheduling_unit.task_blueprints.all():
        # subtasks without inputs do not depend on other subtasks, so they are the starting points of the chains
        independent_subtasks = task_blueprint.subtasks.filter(state__value='defined').filter(inputs=None).all()
        for independent_subtask in independent_subtasks:
            subtask_start_time = start_time + independent_subtask.task_blueprint.relative_start_time
            update_start_time_and_shift_successors_until_after_stop_time(independent_subtask, subtask_start_time)
def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime):
    '''Store the new start (and derived stop) time of the subtask, and shift its successors accordingly.

    :param subtask: the subtask to (re)time.
    :param start_time: the new start time. The stop time becomes the start time plus the specified_duration.
    '''
    subtask.start_time = start_time
    duration = subtask.specified_duration
    subtask.stop_time = subtask.start_time + duration
    subtask.save()

    # the successors have to follow the new stop time
    shift_successors_until_after_stop_time(subtask)
def shift_successors_until_after_stop_time(subtask: Subtask):
    '''Recursively move the start times of the successors of the given subtask to after its stop time.

    A successor in another task blueprint which has a scheduling relation with this subtask's
    task blueprint starts that relation's time_offset (in seconds) after the stop time.

    :param subtask: the subtask whose successors are to be shifted.
    '''
    for next_subtask in subtask.successors:
        # by default, let the successor directly follow this subtask...
        next_start_time = subtask.stop_time

        # ... but adjust it if there is a scheduling_relation with an offset.
        # such a relation can only apply when the successive subtasks have different task_blueprint parents
        if subtask.task_blueprint.id != next_subtask.task_blueprint.id:
            # the relation may have been specified in either direction
            forward_relations = TaskSchedulingRelationBlueprint.objects.filter(first=subtask.task_blueprint, second=next_subtask.task_blueprint)
            backward_relations = TaskSchedulingRelationBlueprint.objects.filter(first=next_subtask.task_blueprint, second=subtask.task_blueprint)
            relations = (forward_relations | backward_relations).all()
            if relations:
                # there should be only one scheduling relation between the tasks
                next_start_time = next_start_time + timedelta(seconds=relations[0].time_offset)

        # update the start time and recurse to shift the successor's successors as well
        update_start_time_and_shift_successors_until_after_stop_time(next_subtask, next_start_time)
def clear_defined_subtasks_start_stop_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint):
    '''Set the start/stop times of all subtasks in state 'defined' in the scheduling unit to None.

    :param scheduling_unit: the scheduling unit blueprint whose 'defined' subtasks lose their times.
    '''
    for task_blueprint in scheduling_unit.task_blueprints.all():
        for defined_subtask in task_blueprint.subtasks.filter(state__value='defined').all():
            defined_subtask.start_time = defined_subtask.stop_time = None
            defined_subtask.save()
def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
    '''Verify that the given subtask may be scheduled.

    :param subtask: the subtask which is about to be scheduled.
    :return: True when all pre-requisites are met.
    :raises SubtaskSchedulingException: when the subtask is not in DEFINED state, when one of its predecessors
                                        is not FINISHED, or when the 'allow_scheduling_observations' setting
                                        has a false value.
    '''
    current_state = subtask.state.value
    if current_state != SubtaskState.Choices.DEFINED.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, current_state))

    for pred in subtask.predecessors.all():
        pred_state = pred.state.value
        if pred_state != SubtaskState.Choices.FINISHED.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s in not FINISHED but state=%s"
                                             % (subtask.pk, pred.pk, pred_state))

    # check if settings allow scheduling observations
    setting = Setting.objects.get(name='allow_scheduling_observations')
    if setting.value:
        return True

    raise SubtaskSchedulingException("Cannot schedule subtask id=%d because setting %s=%s does not allow that." %
                                     (subtask.pk, setting.name, setting.value))
def _assign_or_unassign_resources(subtask: Subtask):
    '''Hand the specification of the subtask (and of its predecessors) to the resource assigner via RARPC.

    The specification is offered with status 'prescheduled' when the subtask is in SCHEDULING state,
    and with status 'approved' otherwise (UNSCHEDULING).

    :param subtask: the subtask to (un)assign resources for; it has to be in SCHEDULING or UNSCHEDULING state.
    :raises SubtaskSchedulingException: when the subtask is in neither of these states,
                                        or when the resource assigner reports that the assignment did not succeed.
    '''
    if subtask.state.value not in [SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.UNSCHEDULING.value]:
        raise SubtaskSchedulingException("Cannot assign resources for subtask id=%d because it is not in (UN)SCHEDULING state. "
                                         "Current state=%s" % (subtask.pk, subtask.state.value))

    def create_ra_specification(_subtask):
        '''Build the resource assigner specification dict for the given _subtask.'''
        parset_dict = convert_to_parset_dict(_subtask)
        return { 'tmss_id': _subtask.id,
                 'task_type': _subtask.specifications_template.type.value.lower(),
                 'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
                 # NOTE(review): the status is derived from the outer subtask's state, also for predecessors - confirm this is intended
                 'status': 'prescheduled' if subtask.state.value == SubtaskState.Choices.SCHEDULING.value else 'approved',
                 'starttime': _subtask.start_time,
                 'endtime': _subtask.stop_time,
                 'cluster': _subtask.cluster.name,
                 'station_requirements': [],
                 'specification': parset_dict }

    ra_spec = create_ra_specification(subtask)
    ra_spec['predecessors'] = []
    for pred in subtask.predecessors.all():
        try:
            ra_spec['predecessors'].append(create_ra_specification(pred))
        except Exception as e:
            # best effort: a predecessor which cannot be converted is left out of the specification,
            # but the reason is logged so that the failure is no longer invisible
            logger.warning("Skipping predecessor subtask id=%s in the resource assigner specification for subtask id=%s: %s",
                           pred.pk, subtask.pk, e)

    with RARPC.create() as rarpc:
        assigned = rarpc.do_assignment(ra_spec)

    if not assigned:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because the required resources are not (fully) available." % (subtask.pk, ))
def schedule_qafile_subtask(qafile_subtask: Subtask):
    ''' Schedule the given qafile_subtask (which converts the observation output to a QA h5 file)
    This method should typically be called upon the event of the observation_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param qafile_subtask: the subtask of type QA_FILES to schedule. It should have exactly one input.
    :return: the given qafile_subtask, now in SCHEDULED state.
    :raises SubtaskSchedulingException: if the subtask has the wrong type or does not have exactly one input.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qafile_subtask)

    if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk,
                                                                                                          qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

    # count via the queryset: calling len() on the related manager itself raises a TypeError
    nr_of_inputs = qafile_subtask.inputs.count()
    if nr_of_inputs != 1:
        raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qafile_subtask.id, nr_of_inputs))

    # step 1: set state to SCHEDULING
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qafile_subtask.save()

    # step 2: link input dataproducts
    qa_input = qafile_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    qafile_subtask_output = qafile_subtask.outputs.first()
    if qafile_subtask_output:
        # fetch each "empty" template once, and use it for both the doc and the template reference
        specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
        feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")

        Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ),
                                   directory="/data/qa/qa_files",
                                   dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value),
                                   datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                   producer=qafile_subtask_output,
                                   specifications_doc=get_default_json_object_for_schema(specifications_template.schema),
                                   specifications_template=specifications_template,
                                   feedback_doc=get_default_json_object_for_schema(feedback_template.schema),
                                   feedback_template=feedback_template,
                                   sap=None  # todo: do we need to point to a SAP here? Of which dataproduct then?
                                   )

    # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qafile_subtask.save()

    return qafile_subtask
def schedule_qaplots_subtask(qaplots_subtask: Subtask):
    ''' Schedule the given qaplots_subtask (which creates inspection plots from a QA h5 file)
    This method should typically be called upon the event of the qafile_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param qaplots_subtask: the subtask of type QA_PLOTS to schedule. It should have exactly one input.
    :return: the given qaplots_subtask, now in SCHEDULED state.
    :raises SubtaskSchedulingException: if the subtask has the wrong type or does not have exactly one input.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qaplots_subtask)

    if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk,
                                                                                                          qaplots_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.QA_PLOTS.value))

    # count via the queryset: calling len() on the related manager itself raises a TypeError
    nr_of_inputs = qaplots_subtask.inputs.count()
    if nr_of_inputs != 1:
        raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, nr_of_inputs))

    # step 1: set state to SCHEDULING
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qaplots_subtask.save()

    # step 2: link input dataproducts
    # this should typically be a single input with a single dataproduct (the qa h5 file)
    qa_input = qaplots_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    # the plots directory is named after the subtask two steps up the predecessor chain (qaplots -> qafile -> obs)
    qafile_subtask = qaplots_subtask.predecessors.first()
    obs_subtask = qafile_subtask.predecessors.first()

    # fetch each "empty" template once, and use it for both the doc and the template reference
    specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
    feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")

    Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ),
                               dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value),
                               datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                               producer=qaplots_subtask.outputs.first(),
                               specifications_doc=get_default_json_object_for_schema(specifications_template.schema),
                               specifications_template=specifications_template,
                               feedback_doc=get_default_json_object_for_schema(feedback_template.schema),
                               feedback_template=feedback_template,
                               sap=None  # todo: do we need to point to a SAP here? Of which dataproduct then?
                               )

    # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qaplots_subtask.save()

    return qaplots_subtask
# todo: this can probably go when we switch to the new start time calculation in the model properties (which is based on this logic)
def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
    """
    Find the task blueprint that is related to the given one as its 'previous' task (if any).

    All scheduling relations in which the given task_blueprint is either the first or the second
    party are inspected. The other party is the previous task when the given task_blueprint is
    first and placed "after", or when it is second and the relation is placed "before".
    When several relations qualify, the last one inspected wins.

    :param task_blueprint: the task blueprint to find the previous related task blueprint for
    :return: previous_related_task_blueprint,
             time_offset (in seconds)
             or None, 0 when there is no such relation.
    """
    logger.info("get_previous_related_task_blueprint_with_time_offset %s (id=%s)", task_blueprint.name, task_blueprint.pk)

    own_id = task_blueprint.id
    found_blueprint, found_offset = None, 0

    relations = [*task_blueprint.first_scheduling_relation.all(),
                 *task_blueprint.second_scheduling_relation.all()]

    for relation in relations:
        # (our side of the relation, the other side, the placement that makes the other side 'previous')
        candidates = ((relation.first, relation.second, "after"),
                      (relation.second, relation.first, "before"))
        for own_side, other_side, required_placement in candidates:
            if own_side.id == own_id and relation.placement.value == required_placement:
                found_blueprint = TaskBlueprint.objects.get(id=other_side.id)
                found_offset = relation.time_offset

    return found_blueprint, found_offset
# todo: maybe this can now be replaced by subtask.relative_start_time
def calculate_start_time(observation_subtask: Subtask):
    """
    Calculate the start time of an observation subtask. It should calculate the starttime in case of 'C-T-C train'
    The start time of an observation depends on the start_time+duration and offset time of the previous observation
    and so its scheduling relations should be known.
    If there is no previous observation the 'default' start time is now plus two minutes, truncated to the whole minute.
    For demo purposes, will be changed into dynamic scheduled in the future
    Note that parallel observations are not supported yet
    :param observation_subtask: the observation subtask to compute the start time for
    :return: start_time (utc time)
    :raises SubtaskSchedulingException: if the previous task has no observation subtask, or if that subtask has no start_time
    """
    previous_related_task_blueprint, time_offset = get_previous_related_task_blueprint_with_time_offset(observation_subtask.task_blueprint)

    if previous_related_task_blueprint is None:
        # This is the first observation so take start time 2 minutes from now (dropping the seconds and microseconds)
        now = datetime.utcnow()
        return now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond)

    # Get the duration of last/previous observation
    duration_in_sec = previous_related_task_blueprint.specifications_doc["duration"]
    logger.info("Duration of previous observation '%s' (id=%s) is %d seconds",
                previous_related_task_blueprint.name, previous_related_task_blueprint.pk, duration_in_sec)

    # Get the previous observation subtask, should actually be one
    lst_previous_subtasks_obs = [st for st in previous_related_task_blueprint.subtasks.all()
                                 if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
    if not lst_previous_subtasks_obs:
        # report this as a scheduling problem instead of failing with a bare IndexError
        raise SubtaskSchedulingException("Cannot compute start_time for subtask id=%s because the previous task_blueprint id=%s has no observation subtask" % (observation_subtask.id, previous_related_task_blueprint.pk))

    previous_subtask_obs = lst_previous_subtasks_obs[0]
    logger.info("The previous observation subtask is id=%s", previous_subtask_obs.pk)
    if previous_subtask_obs.start_time is None:
        raise SubtaskSchedulingException("Cannot compute start_time for subtask id=%s because the its predecessor id=%s has not start_time" %(observation_subtask.id, previous_subtask_obs.id))

    return previous_subtask_obs.start_time + timedelta(seconds=duration_in_sec+time_offset)
def schedule_observation_subtask(observation_subtask: Subtask):
    ''' Schedule the given observation_subtask
    For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
    For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param observation_subtask: the subtask of type OBSERVATION to schedule. Its start_time has to be set.
    :return: the given observation_subtask, now in SCHEDULED state.
    :raises SubtaskSchedulingException: if the subtask has the wrong type, has no start_time,
                                        or if the resource assignment fails.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(observation_subtask)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
                                                                                                          observation_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.OBSERVATION.value))

    # step 1: set state to SCHEDULING
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    observation_subtask.save()

    # step 1a: check start/stop times
    # start time should be known. If not raise. Then the user and/or scheduling service should supply a properly calculated/estimated start_time first.
    if observation_subtask.start_time is None:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk,
                                                                                                                 observation_subtask.specifications_template.type))

    # always update the stop_time according to the spec
    observation_subtask.stop_time = observation_subtask.start_time + observation_subtask.specified_duration

    # step 2: define input dataproducts
    # TODO: are there any observations that take input dataproducts?

    # step 3: create output dataproducts, and link these to the output
    specifications_doc = observation_subtask.specifications_doc
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP")  # todo: should this be derived from the task relation specification template?
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    subtask_output = observation_subtask.outputs.first()  # TODO: make proper selection, not default first()
    directory = "/data/%s/%s/L%s/uv" % ("projects" if isProductionEnvironment() else "test-projects",
                                        observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name,
                                        observation_subtask.id)

    # these lookups are the same for every SAP and every dataproduct, so do them once
    # instead of once per subband inside the loops below
    sap_template = SAPTemplate.objects.get(name="SAP")
    dataformat = Dataformat.objects.get(value="MeasurementSet")
    datatype = Datatype.objects.get(value="visibilities")

    for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
        # collect the antenna fields of all stations in use; built per SAP so each SAP document has its own list
        antennaset = specifications_doc['stations']['antenna_set']
        antennafields = []
        for station in specifications_doc['stations']['station_list']:
            fields = antennafields_for_antennaset_and_station(antennaset, station)
            antennafields += [{"station": station, "field": field, "type": antennaset.split('_')[0]} for field in fields]

        sap = SAP.objects.create(specifications_doc={ "name": "%s_%s" % (observation_subtask.id, pointing['name']),
                                                      "identifiers": {},  # todo: TMSS-324
                                                      "pointing": pointing['pointing'],
                                                      "time": {"start_time": observation_subtask.start_time.isoformat(),
                                                               "duration": (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()},
                                                      "antennas": {
                                                          "antenna_set": antennaset,
                                                          "fields": antennafields
                                                      }
                                                    },
                                 specifications_template=sap_template)

        # one dataproduct per subband of this pointing
        Dataproduct.objects.bulk_create([Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                                     directory=directory,
                                                     dataformat=dataformat,
                                                     datatype=datatype,
                                                     producer=subtask_output,
                                                     specifications_doc={"sap": [str(sap_nr)]},
                                                     specifications_template=dataproduct_specifications_template,
                                                     feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                                     feedback_template=dataproduct_feedback_template,
                                                     size=0 if sb_nr%10==0 else 1024*1024*1024*sb_nr,
                                                     expected_size=1024*1024*1024*sb_nr,
                                                     sap=sap) for sb_nr in pointing['subbands']])

    # step 4: resource assigner (if possible)
    _assign_or_unassign_resources(observation_subtask)

    # TODO: TMSS-382: evaluate the scheduled stations and see if the requiments given in the subtask.task_bluepring.specifications_doc are met for the station_groups and max_nr_missing.

    # step 5: set state to SCHEDULED
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    observation_subtask.save()

    return observation_subtask
def schedule_pipeline_subtask(pipeline_subtask: Subtask):
    ''' Schedule the given pipeline_subtask
    This method should typically be called upon the event of an predecessor (observation) subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param pipeline_subtask: the subtask of type PIPELINE to schedule. It should have at least one input.
    :return: the given pipeline_subtask, now in SCHEDULED state.
    :raises SubtaskSchedulingException: if the subtask has the wrong type, has no inputs,
                                        or if the resource assignment fails.
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(pipeline_subtask)

    if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk,
                                                                                                          pipeline_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.PIPELINE.value))

    # step 1: set state to SCHEDULING
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    pipeline_subtask.save()

    # step 1a: check start/stop times
    # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
    if pipeline_subtask.start_time is None:
        now = datetime.utcnow()
        logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
        pipeline_subtask.start_time = now

    # always update the stop_time according to the spec
    pipeline_subtask.stop_time = pipeline_subtask.start_time + pipeline_subtask.specified_duration

    # step 2: link input dataproducts
    if pipeline_subtask.inputs.count() == 0:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk,
                                                                                                               pipeline_subtask.specifications_template.type))

    # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty"
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")

    # these are the same for every input and every dataproduct, so look them up once
    dataformat = Dataformat.objects.get(value="MeasurementSet")
    datatype = Datatype.objects.get(value="visibilities")  # todo: is this correct?

    # select subtask output the new dataproducts will be linked to
    pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?

    # iterate over all inputs
    for pipeline_subtask_input in pipeline_subtask.inputs.all():

        # select and set input dataproducts that meet the filter defined in selection_doc
        dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
                        if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]
        pipeline_subtask_input.dataproducts.set(dataproducts)

        # step 3: create output dataproducts, and link these to the output
        # TODO: create them from the spec, instead of "copying" the input filename
        # materialize the inputs once: the same list (and thus the same order) is used to create
        # the outputs and to pair inputs with outputs in the transforms below.
        input_dps = list(pipeline_subtask_input.dataproducts.all())
        producer_subtask_pk = str(pipeline_subtask_input.producer.subtask.pk)

        output_dp_objects = []
        for input_dp in input_dps:
            # replace the leading L<producer id> of the input filename by L<pipeline id>, or prefix it if there is none
            if '_' in input_dp.filename and input_dp.filename.startswith('L'):
                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
            else:
                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)

            output_dp_objects.append(Dataproduct(filename=filename,
                                                 directory=input_dp.directory.replace(producer_subtask_pk, str(pipeline_subtask.pk)),
                                                 dataformat=dataformat,
                                                 datatype=datatype,
                                                 producer=pipeline_subtask_output,
                                                 specifications_doc=get_default_json_object_for_schema(dataproduct_specifications_template.schema),
                                                 specifications_template=dataproduct_specifications_template,
                                                 feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                                 feedback_template=dataproduct_feedback_template,
                                                 sap=input_dp.sap))

        output_dps = Dataproduct.objects.bulk_create(output_dp_objects)
        pipeline_subtask_output.dataproducts.set(output_dps)

        transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dps, output_dps)]
        DataproductTransform.objects.bulk_create(transforms)

    # step 4: resource assigner (if possible)
    # done once, after all inputs are linked: the assigner requires the subtask to (still) be in SCHEDULING state
    _assign_or_unassign_resources(pipeline_subtask)

    # step 5: set state to SCHEDULED
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    pipeline_subtask.save()

    return pipeline_subtask
# === Misc ===
def create_and_schedule_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Convenience method: create the subtasks from the task_blueprint, then schedule the ones that do not depend on predecessors.

    :param task_blueprint: the task blueprint to create and schedule the subtasks for
    :return: the subtasks as returned by schedule_independent_subtasks_in_task_blueprint
    '''
    # the created subtasks themselves are not needed here, only the ones that get scheduled are returned
    create_subtasks_from_task_blueprint(task_blueprint)
    scheduled_subtasks = schedule_independent_subtasks_in_task_blueprint(task_blueprint)
    return scheduled_subtasks
def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None) -> [Subtask]:
    '''Convenience method: schedule (and return) the subtasks in the task_blueprint that do not depend on any predecessors.

    Only subtasks that are in DEFINED state are considered.

    :param task_blueprint: the task blueprint of which the independent subtasks are scheduled
    :param start_time: when given, it is set as start_time on each subtask before it is scheduled
    :return: list of the subtasks that were handed to the scheduler
    '''
    defined_state_value = SubtaskState.Choices.DEFINED.value
    selection = Subtask.independent_subtasks().filter(task_blueprint_id=task_blueprint.id,
                                                      state__value=defined_state_value)

    # fetch the subtasks up front, before scheduling starts changing their state
    subtasks_to_schedule = list(selection.all())
    use_given_start_time = start_time is not None

    for independent_subtask in subtasks_to_schedule:
        if use_given_start_time:
            independent_subtask.start_time = start_time
        schedule_subtask_and_update_successor_start_times(independent_subtask)

    return subtasks_to_schedule
def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs):
    '''Translate a preprocessing task specification into a pipelinecontrol subtask specification.

    :param preprocessing_task_specs: the specification dict of the preprocessing task
    :param default_subtask_specs: the default pipelinecontrol subtask specification dict. It is only read, never modified.
    :return: a new dict with the subtask specification. It only contains a 'demixer' section when the
             task specifies 'demix' and/or 'average'.
    '''
    # preprocessing task default spec: {
    #   "storagemanager": "dysco",
    #   "flag": {"outerchannels": true, "autocorrelations": true, "rfi_strategy": "auto"},
    #   "demix": {"frequency_steps": 64, "time_steps": 10, "ignore_target": false, "sources": {}},
    #   "average": {"frequency_steps": 4, "time_steps": 1}}
    # pipelinecontrol subtask default spec: {
    #   "storagemanager": "dysco",
    #   "demixer": {"baselines": "CS*,RS*&", "frequency_steps": 4, "time_steps": 1, "demix_frequency_steps": 4,
    #               "demix_time_steps": 1, "ignore_target": false, "demix_always": [], "demix_if_needed": []},
    #   "aoflagger": {"strategy": "HBAdefault"},
    #    "preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"},
    #    "preflagger1": {"corrtype": "auto"}}

    # todo: check that this is actually how these need to be translated
    # todo: especially check when defaults are NOT supposed to be set because the task implies to not include them
    # todo: translate task "sources": {} - I guess this is demix_always/demix_if_needed?
    # todo: set subtask demixer properties "baselines": "CS*,RS*&", "demix_always": [], "demix_if_needed": []

    subtask_specs = {}
    subtask_specs['storagemanager'] = preprocessing_task_specs.get('storagemanager',
                                                                   default_subtask_specs.get('storagemanager'))

    # todo: we depend on valid json here with knowledge about required properties. To generalize, we need to expect things to not be there.
    if 'demix' in preprocessing_task_specs or 'average' in preprocessing_task_specs:
        # todo: should we exclude defaults in subtask.demixer if only one of these is defined on the task?
        # start from a copy, so updating it below does not modify the caller's defaults
        demixer_specs = dict(default_subtask_specs['demixer'])

        if 'demix' in preprocessing_task_specs:
            demix_task_specs = preprocessing_task_specs['demix']
            demixer_specs.update({
                "demix_frequency_steps": demix_task_specs['frequency_steps'],
                "demix_time_steps": demix_task_specs['time_steps'],
                "ignore_target": demix_task_specs['ignore_target']
            })

        if 'average' in preprocessing_task_specs:
            # only the averaging properties are set here: the demix properties are handled above,
            # and 'demix' is not necessarily part of the task specs.
            average_task_specs = preprocessing_task_specs['average']
            demixer_specs.update({
                "frequency_steps": average_task_specs['frequency_steps'],
                "time_steps": average_task_specs['time_steps']
            })

        subtask_specs['demixer'] = demixer_specs

    if 'flag' in preprocessing_task_specs:
        flag_task_specs = preprocessing_task_specs["flag"]

        if flag_task_specs["rfi_strategy"] != 'none':
            subtask_specs.update({"aoflagger": {"strategy": flag_task_specs["rfi_strategy"]}})

            if flag_task_specs["rfi_strategy"] == 'auto':
                # todo: handle 'auto' properly: we need to determine input dataproduct type and set LBA or HBA accordingly
                #       either here or allow 'auto' in subtask json and translate it when we connect obs to pipe subtask
                default_strategy = default_subtask_specs['aoflagger']['strategy']
                subtask_specs.update({"aoflagger": {"strategy": default_strategy}})
                logger.warning('Translating aoflagger "auto" strategy to "%s" without knowing whether that makes sense!', default_strategy)

        if flag_task_specs["outerchannels"]:
            subtask_specs.update({"preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"}})

        if flag_task_specs["autocorrelations"]:
            subtask_specs.update({"preflagger1": {"corrtype": "auto"}})

    return subtask_specs
def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
    """
    Filter specs by selection. This requires the specification_doc to...
    A) ...contain ALL KEYS that we select / filter for
    B) ...contain NO ADDITIONAL VALUES that are not selected / filtered for
    Keys in the selection_doc that start with '$' (like $schema) are ignored.
    Strings are compared as one atomic value, and not character by character.
    :param specifications_doc: dataproduct specification as dict
    :param selection_doc: selection filter as dict
    :return: True when the input specifications_doc meets a filter described in selection_doc, False otherwise
    """
    def as_values(value):
        # a str/bytes is an Iterable as well, but it represents a single value:
        # wrap it, so it is matched as a whole instead of per character / by substring
        return [value] if isinstance(value, (str, bytes)) else value

    meets_criteria = True
    for k, v in selection_doc.items():
        if k.startswith('$'):  # ignore stuff like $schema
            continue

        if k not in specifications_doc:
            meets_criteria = False
            continue

        spec = specifications_doc[k]
        spec_values = as_values(spec)
        selected_values = as_values(v)

        if isinstance(spec_values, Iterable) and isinstance(selected_values, Iterable):
            # every specified value has to be one of the selected values
            if any(spec_v not in selected_values for spec_v in spec_values):
                meets_criteria = False
        elif spec != v:
            meets_criteria = False

    logger.debug("specs %s matches selection %s: %s", specifications_doc, selection_doc, meets_criteria)
    return meets_criteria
def get_observation_task_specification_with_check_for_calibrator(subtask):
    """
    Get the observation specifications_doc to use for the given subtask.
    For a subtask of a calibrator task (the task's specifications template name contains 'calibrator')
    the specifications_doc of the related target observation task blueprint is returned.
    For any other subtask the specifications_doc of its own task blueprint is returned.
    :param: subtask object
    :return: task_spec: the specifications_doc of the (related) task blueprint
    :raises SubtaskCreationException: when the task is a calibrator and no related target observation is found
    """
    own_task_blueprint = subtask.task_blueprint
    is_calibrator = 'calibrator' in own_task_blueprint.specifications_template.name.lower()

    if not is_calibrator:
        return own_task_blueprint.specifications_doc

    # Calibrator requires related Target Task Observation for some specifications
    target_task_blueprint = get_related_target_observation_task_blueprint(own_task_blueprint)
    if target_task_blueprint is None:
        raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk)

    target_spec = target_task_blueprint.specifications_doc
    logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s",
                own_task_blueprint.id, target_task_blueprint.id)
    return target_spec