"readme.md" did not exist on "55b5a71d13f6936e52325e376f4519c67483b3d8"
Select Git revision
LoSoTo.ClockTec.cwl
-
Mattia Mancini authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
subtasks.py 58.66 KiB
import logging
logger = logging.getLogger(__name__)
from functools import cmp_to_key
from collections.abc import Iterable
from lofar.common.datetimeutils import formatDatetime
from lofar.common import isProductionEnvironment
from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema
from lofar.common.lcu_utils import get_current_stations
from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException
from datetime import datetime, timedelta
from lofar.common.datetimeutils import parseDatetime
from lofar.common.json_utils import add_defaults_to_json_object_for_schema
from lofar.sas.tmss.tmss.tmssapp.models import *
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict
from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, BlockConstraints, BlockSize
# ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ====
def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool:
    """Verify that subtasks may be created for the given task_blueprint.

    :param task_blueprint: the task_blueprint for which subtasks are about to be created
    :return: True when all prerequisites are met
    :raises SubtaskCreationException: when the task_blueprint is flagged with do_cancel
    """
    cancelled = task_blueprint.do_cancel
    if not cancelled:
        return True
    raise SubtaskCreationException("Cannot create subtasks from task_blueprint id=%d, because the task_blueprint is explicit set to cancel." % task_blueprint.id)
def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Create all subtasks for the given task_blueprint, after recursively doing so for its predecessors.

    The subtask generators are selected by the name of the task_blueprint's specifications_template.
    Nothing is created for a task_blueprint that already has subtasks.
    A generator that raises SubtaskCreationException is logged and skipped; the remaining generators still run.

    :param task_blueprint: the task_blueprint to create subtasks for
    :return: the newly created subtasks, including those created for predecessor task_blueprints
    :raises SubtaskCreationException: when the task_blueprint may not get subtasks (see check_prerequities_for_subtask_creation),
            or when no generator is known for its template name
    '''
    check_prerequities_for_subtask_creation(task_blueprint)

    # first handle all predecessors, so that this task's subtasks can connect to their (new) subtasks.
    created_subtasks = []
    for predecessor_blueprint in task_blueprint.predecessors.all():
        created_subtasks.extend(create_subtasks_from_task_blueprint(predecessor_blueprint))

    if task_blueprint.subtasks.count() > 0:
        logger.debug("skipping creation of subtasks because they already exist for task_blueprint id=%s, name='%s', task_template_name='%s'",
                     task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.name)
        return created_subtasks

    # fixed mapping from template name to the ordered generator functions which create the subtask(s) for this task_blueprint.
    # order matters: the qa generators look up the subtasks created by the generators before them.
    observation_generators = [create_observation_control_subtask_from_task_blueprint,
                              create_qafile_subtask_from_task_blueprint,
                              create_qaplots_subtask_from_task_blueprint]
    generators_per_template = {'target observation': observation_generators,
                               'calibrator observation': observation_generators,
                               'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint]}

    template_name = task_blueprint.specifications_template.name
    generators = generators_per_template.get(template_name)
    if generators is None:
        message = 'Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name)
        logger.error(message)
        raise SubtaskCreationException(message)

    for generate in generators:
        try:
            new_subtask = generate(task_blueprint)
        except SubtaskCreationException as e:
            logger.error(e)
            continue
        # a generator returns None when its (optional) subtask is not enabled in the specification
        if new_subtask is not None:
            created_subtasks.append(new_subtask)

    return created_subtasks
def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate):
    """
    Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings

    For a calibrator observation the analog and digital pointing come from the calibrator task spec ('pointing'),
    or, in 'autoselect' mode (the default), from the tile_beam of the related target observation.
    All other settings (antenna_set, filter, stations, correlator) of a calibrator are taken from the related
    target observation's task spec.

    :param task_blueprint: a task_blueprint whose specifications_template is named 'target observation' or
                           'calibrator observation' (case-insensitive)
    :return: tuple (subtask_spec, subtask_template); subtask_spec is validated against subtask_template.schema
    :raises SubtaskCreationException: when the task_blueprint is not an observation, when a calibrator has no related
            target observation (or, in autoselect mode, that target has no tile_beam), or when the stations of the
            requested station group cannot be determined.
            NOTE(review): validate_json_against_schema (defined elsewhere) presumably raises its own exception type on an invalid spec.
    """

    # check if task_blueprint has an observation-like specification
    if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation']:
        raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % (
                                       task_blueprint.id, task_blueprint.specifications_template.name))

    # start with an observation subtask specification with all the defaults and the right structure according to the schema
    subtask_template = SubtaskTemplate.objects.get(name='observation control')
    subtask_spec = get_default_json_object_for_schema(subtask_template.schema)

    # wipe the default pointings, these should come from the task_spec
    subtask_spec['stations']['analog_pointing'] = {}
    subtask_spec['stations']['digital_pointings'] = []

    # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec
    task_spec = task_blueprint.specifications_doc

    # The calibrator has a minimal calibration-specific specification subset.
    # The rest of its specs are 'shared' with the target observation.
    # So... copy the calibrator specs first, then loop over the shared target/calibrator specs...
    if 'calibrator' in task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (
                                           task_blueprint.id, task_blueprint.specifications_template.name))
        target_task_spec = target_task_blueprint.specifications_doc

        if task_spec.get('autoselect', True):
            logger.info("auto-selecting calibrator target based on elevation of target observation...")
            # NOTE(review): despite the log message, no elevation-based selection happens here;
            # the analog pointing is simply copied from the tile_beam of the related target observation.
            if "tile_beam" in target_task_spec:
                subtask_spec['stations']['analog_pointing'] = {
                    "direction_type": target_task_spec["tile_beam"]["direction_type"],
                    "angle1": target_task_spec["tile_beam"]["angle1"],
                    "angle2": target_task_spec["tile_beam"]["angle2"]}
            else:
                raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint "
                                               "id=%s in auto-select mode, because the related target observation "
                                               "task_blueprint id=%s has no tile beam pointing defined" % (
                                               task_blueprint.id, target_task_blueprint.id))
        else:
            # explicit pointing given in the calibrator's own task spec
            subtask_spec['stations']['analog_pointing'] = {"direction_type": task_spec["pointing"]["direction_type"],
                                                           "angle1": task_spec["pointing"]["angle1"],
                                                           "angle2": task_spec["pointing"]["angle2"]}

        # for the calibrator, the digital pointing is equal to the analog pointing
        subtask_spec['stations']['digital_pointings'] = [ {'name': 'calibrator', # there is no name for the calibrator pointing in the task spec
                                                           'subbands': list(range(0,488)), # there are no subbands for the calibrator pointing in the task spec
                                                           'pointing': subtask_spec['stations']['analog_pointing'] } ]
        # Use the Task Specification of the Target Observation for all remaining (shared) settings below
        task_spec = target_task_spec
        logger.info("Using station and correlator settings for calibrator observation task_blueprint id=%s from target observation task_blueprint id=%s",
                    task_blueprint.id, target_task_blueprint.id)

    subtask_spec['stations']["antenna_set"] = task_spec["antenna_set"]
    subtask_spec['stations']["filter"] = task_spec["filter"]

    if "stations" in task_spec:
        # task_spec["stations"] is either a station group reference ({"group": <name>}) or, presumably, an explicit list of station names
        if "group" in task_spec["stations"]:
            try:
                # retrieve stations in group from RADB virtual instrument
                station_group_name = task_spec["stations"]["group"]
                subtask_spec['stations']['station_list'] = get_stations_in_group(station_group_name)
            except Exception as e:
                raise SubtaskCreationException("Could not determine stations in group '%s' for task_blueprint id=%s. Error: %s" % (
                    station_group_name, task_blueprint.id, e))
        else:
            subtask_spec['stations']['station_list'] = task_spec["stations"]

    if 'calibrator' not in task_blueprint.specifications_template.name.lower():
        # copy/convert the analog/digital_pointings only for non-calibrator observations (the calibrator has its own pointing)
        for sap in task_spec.get("SAPs", []):
            subtask_spec['stations']['digital_pointings'].append(
                {"name": sap["name"],
                 "pointing": {"direction_type": sap["digital_pointing"]["direction_type"],
                              "angle1": sap["digital_pointing"]["angle1"],
                              "angle2": sap["digital_pointing"]["angle2"]},
                 "subbands": sap["subbands"]
                 })

        # without a tile_beam, the analog_pointing stays the empty dict set above
        if "tile_beam" in task_spec:
            subtask_spec['stations']['analog_pointing'] = { "direction_type": task_spec["tile_beam"]["direction_type"],
                                                            "angle1": task_spec["tile_beam"]["angle1"],
                                                            "angle2": task_spec["tile_beam"]["angle2"] }

    if "correlator" in task_spec:
        # derive the COBALT correlator block settings from the requested channels and integration time
        corr = CorrelatorSettings()
        corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"]
        corr.integrationTime = task_spec["correlator"]["integration_time"]
        calculator = BlockSize(constraints=BlockConstraints(correlatorSettings=corr))
        subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = calculator.nrBlocks
        subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = calculator.nrSubblocks

    # make sure that the subtask_spec is valid conform the schema
    validate_json_against_schema(subtask_spec, subtask_template.schema)

    return subtask_spec, subtask_template
def get_stations_in_group(station_group_name: str) -> []:
    '''Get a sorted list of the names of the stations in the given station_group.

    A lookup is performed in the RADB, in the virtual instrument table: the resource group of type
    'station_group' or 'virtual' with the given name is found, and the names of its direct children
    of type 'station' are returned.

    :param station_group_name: name of the station group to look up
    :return: alphabetically sorted list of unique station names
    :raises ValueError: when no station group with the given name exists in the RADB
    '''
    with RADBRPC.create() as radbrpc:
        resource_group_memberships = radbrpc.getResourceGroupMemberships()['groups']

    station_resource_group = next((rg for rg in resource_group_memberships.values()
                                   if rg['resource_group_type'] in ('station_group', 'virtual')
                                   and rg['resource_group_name'] == station_group_name), None)
    if station_resource_group is None:
        # raise a descriptive error instead of leaking a bare (message-less) StopIteration
        raise ValueError("No station group named '%s' found in the RADB" % (station_group_name,))

    station_names = {resource_group_memberships[rg_id]['resource_group_name'] for rg_id in station_resource_group['child_ids']
                     if resource_group_memberships[rg_id]['resource_group_type'] == 'station'}

    # HACK, RS408 should be removed from the RADB
    station_names.discard('RS408')

    return sorted(station_names)
def get_related_target_observation_task_blueprint(calibrator_task_blueprint: TaskBlueprint) -> TaskBlueprint:
    """
    Get the related target observation task_blueprint for the given calibrator task_blueprint.

    Scheduling relations in which the calibrator is 'first' are searched before those in which it is 'second'.
    :param calibrator_task_blueprint: a task_blueprint whose template name contains 'calibrator' (case-insensitive)
    :return: the related 'target observation' task_blueprint, or None if nothing is found
    :raises ValueError: when the given task_blueprint is not a calibrator
    """
    template_name = calibrator_task_blueprint.specifications_template.name
    if 'calibrator' not in template_name.lower():
        # format the message explicitly: ValueError does not %-interpolate extra args like a logger does
        raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator task_blueprint id=%s template_name='%s'" %
                         (calibrator_task_blueprint.id, template_name))

    def _is_target_observation(task_blueprint) -> bool:
        return task_blueprint is not None and task_blueprint.specifications_template.name.lower() == 'target observation'

    target_task_blueprint = next((relation.second for relation in TaskSchedulingRelationBlueprint.objects.filter(first=calibrator_task_blueprint)
                                  if _is_target_observation(relation.second)), None)
    if target_task_blueprint is None:
        target_task_blueprint = next((relation.first for relation in TaskSchedulingRelationBlueprint.objects.filter(second=calibrator_task_blueprint)
                                      if _is_target_observation(relation.first)), None)

    if target_task_blueprint is None:
        logger.info("No related target observation task_blueprint found for calibrator observation task_blueprint id=%d", calibrator_task_blueprint.id)
    return target_task_blueprint
def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    """
    Create an 'observation control' subtask for the given observation task_blueprint.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    The subtask gets one (empty) output and no inputs, and is returned in state DEFINED.
    The cluster is taken from the task spec's "storage_cluster" (default "CEP4").
    """
    # step 0: verify that subtasks may be created for this task_blueprint at all
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 1: create the subtask in DEFINING state, with specifications derived from the task_blueprint
    specifications_doc, subtask_template = create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint)
    storage_cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    subtask = Subtask.objects.create(start_time=None,
                                     stop_time=None,
                                     state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                     specifications_doc=specifications_doc,
                                     task_blueprint=task_blueprint,
                                     specifications_template=subtask_template,
                                     tags=[],
                                     priority=1,
                                     schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                     cluster=Cluster.objects.get(name=storage_cluster_name))

    # step 2: an observation has no inputs, it only produces output data
    SubtaskOutput.objects.create(subtask=subtask)

    # step 3: the subtask is complete, so mark it DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    return subtask
def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    '''Create the QA file conversion subtask for the given task_blueprint, based on its most recently created (highest id) observation subtask.

    :return: the new subtask, or None when QA file conversion is not enabled
    :raises SubtaskCreationException: when the task_blueprint has no observation subtask(s)
    '''
    observation_type = SubtaskType.Choices.OBSERVATION.value
    candidates = [candidate for candidate in task_blueprint.subtasks.order_by('id').all()
                  if candidate.specifications_template.type.value == observation_type]
    if len(candidates) == 0:
        raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % (
            SubtaskType.Choices.QA_FILES.value, task_blueprint.pk))

    # TODO: decide what to do when there are multiple observation subtasks?
    return create_qafile_subtask_from_observation_subtask(candidates[-1])
def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) -> Subtask:
    ''' Create a 'QA file conversion' subtask which converts the output of the given observation subtask into a QA h5 file.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param observation_subtask: a subtask of type observation which is no longer DEFINING
    :return: the new subtask in state DEFINED, or None when QA.file_conversion is not enabled in the observation task spec
    :raises ValueError: when observation_subtask is not an observation, or is still DEFINING
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(observation_subtask.task_blueprint)

    observation_template_type = observation_subtask.specifications_template.type
    if observation_template_type.value != SubtaskType.Choices.OBSERVATION.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
            SubtaskType.Choices.QA_FILES.value, observation_subtask.pk,
            observation_template_type, SubtaskType.Choices.OBSERVATION.value))

    if observation_subtask.state.value == SubtaskState.Choices.DEFINING.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED" % (
            SubtaskType.Choices.QA_FILES.value, observation_subtask.pk))

    observation_task_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask)
    conversion_settings = observation_task_spec.get("QA", {}).get("file_conversion", {})
    if not conversion_settings.get("enabled", False):
        logger.debug("Skipping creation of qafile_subtask because QA.file_conversion is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    conversion_template = SubtaskTemplate.objects.get(name="QA file conversion")
    conversion_spec = add_defaults_to_json_object_for_schema({}, conversion_template.schema)
    conversion_spec['nr_of_subbands'] = conversion_settings.get("nr_of_subbands")
    conversion_spec['nr_of_timestamps'] = conversion_settings.get("nr_of_timestamps")
    validate_json_against_schema(conversion_spec, conversion_template.schema)

    qafile_subtask = Subtask.objects.create(start_time=None,
                                            stop_time=None,
                                            state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                            task_blueprint=observation_subtask.task_blueprint,
                                            specifications_template=conversion_template,
                                            specifications_doc=conversion_spec,
                                            priority=1,
                                            schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                            cluster=observation_subtask.cluster)

    # step 2: create and link subtask input/output
    select_all_template = TaskRelationSelectionTemplate.objects.get(name="all")
    select_all_doc = get_default_json_object_for_schema(select_all_template.schema)
    SubtaskInput.objects.create(subtask=qafile_subtask,
                                producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint
                                selection_doc=select_all_doc,
                                selection_template=select_all_template)
    SubtaskOutput.objects.create(subtask=qafile_subtask)

    # step 3: set state to DEFINED
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qafile_subtask.save()

    # the system waits for the predecessors to finish before this qafile_subtask is scheduled
    return qafile_subtask
def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    '''Create the QA plots subtask for the given task_blueprint, based on its first QA file conversion subtask.

    :return: the new subtask, or None when QA plots are not enabled
    :raises SubtaskCreationException: when the task_blueprint has no QA file conversion subtask
    '''
    # TODO: decide what to do when there are multiple qafile subtasks?
    qafile_subtask = next((candidate for candidate in task_blueprint.subtasks.all()
                           if candidate.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value), None)
    if qafile_subtask is None:
        raise SubtaskCreationException('Cannot create QA plotting subtask for task id=%s because no predecessor QA file conversion subtask exists.' % (task_blueprint.pk, ))
    return create_qaplots_subtask_from_qafile_subtask(qafile_subtask)
def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subtask:
    ''' Create a 'QA plots' subtask which creates inspection plots from the QA h5 file produced by the given qafile_subtask.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :param qafile_subtask: a subtask of type QA files
    :return: the new subtask in state DEFINED, or None when QA.plots is not enabled in the observation task spec
    :raises ValueError: when qafile_subtask is not a QA files subtask
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint)

    qafile_template_type = qafile_subtask.specifications_template.type
    if qafile_template_type.value != SubtaskType.Choices.QA_FILES.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
            SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk,
            qafile_template_type, SubtaskType.Choices.QA_FILES.value))

    observation_task_spec = get_observation_task_specification_with_check_for_calibrator(qafile_subtask)
    plots_settings = observation_task_spec.get("QA", {}).get("plots", {})
    if not plots_settings.get("enabled", False):
        logger.debug("Skipping creation of qaplots_subtask because QA.plots is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    plots_template = SubtaskTemplate.objects.get(name="QA plots")
    plots_spec = add_defaults_to_json_object_for_schema({}, plots_template.schema)
    plots_spec['autocorrelation'] = plots_settings.get("autocorrelation")
    plots_spec['crosscorrelation'] = plots_settings.get("crosscorrelation")
    validate_json_against_schema(plots_spec, plots_template.schema)

    qaplots_subtask = Subtask.objects.create(start_time=None,
                                             stop_time=None,
                                             state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                             task_blueprint=qafile_subtask.task_blueprint,
                                             specifications_template=plots_template,
                                             specifications_doc=plots_spec,
                                             priority=1,
                                             schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                             cluster=qafile_subtask.cluster)

    # step 2: create and link subtask input/output
    select_all_template = TaskRelationSelectionTemplate.objects.get(name="all")
    select_all_doc = get_default_json_object_for_schema(select_all_template.schema)
    SubtaskInput.objects.create(subtask=qaplots_subtask,
                                producer=qafile_subtask.outputs.first(),
                                selection_doc=select_all_doc,
                                selection_template=select_all_template)
    SubtaskOutput.objects.create(subtask=qaplots_subtask)

    # step 3: set state to DEFINED
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qaplots_subtask.save()

    # the system waits for the predecessors to finish before this qaplots_subtask is scheduled
    return qaplots_subtask
def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create a 'pipeline control' subtask for the given preprocessing pipeline task_blueprint.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    The subtask gets one input for each output of each observation subtask of each producing task_blueprint,
    plus one output, and is returned in state DEFINED.
    :raises SubtaskCreationException: when the task_blueprint may not get subtasks, or none of its predecessors has an observation subtask
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    def _is_observation(candidate) -> bool:
        return candidate.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value

    # TODO: go more elegant lookup of predecessor observation task
    has_observation_predecessor = any(any(_is_observation(st) for st in predecessor.subtasks.all())
                                      for predecessor in task_blueprint.predecessors.all())
    if not has_observation_predecessor:
        raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected "
                                       "to an observation predecessor (sub)task." % task_blueprint.pk)

    # step 1: create subtask in defining state, with filled-in subtask_template
    pipeline_template = SubtaskTemplate.objects.get(name='pipeline control')
    pipeline_defaults = get_default_json_object_for_schema(pipeline_template.schema)
    pipeline_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_blueprint.specifications_doc, pipeline_defaults)
    storage_cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    pipeline_subtask = Subtask.objects.create(start_time=None,
                                              stop_time=None,
                                              state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                              task_blueprint=task_blueprint,
                                              specifications_template=pipeline_template,
                                              specifications_doc=pipeline_specs,
                                              priority=1,
                                              schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                              cluster=Cluster.objects.get(name=storage_cluster_name))

    # step 2: create and link subtask input/output
    for relation in task_blueprint.produced_by.all():
        # TODO: apply some better filtering. Now we're just connecting it to all predecessor observation subtasks
        producer_observation_subtasks = [st for st in relation.producer.subtasks.order_by('id').all() if _is_observation(st)]
        for producer_subtask in producer_observation_subtasks:
            for producer_output in producer_subtask.outputs.all():
                SubtaskInput.objects.create(subtask=pipeline_subtask,
                                            producer=producer_output,
                                            selection_doc=relation.selection_doc,
                                            selection_template=relation.selection_template)
    SubtaskOutput.objects.create(subtask=pipeline_subtask)

    # step 3: set state to DEFINED
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    pipeline_subtask.save()

    # the system waits for the predecessors to finish before this pipeline subtask is scheduled
    return pipeline_subtask
# ==== various schedule* methods to schedule a Subtasks (if possible) ====
def schedule_subtask(subtask: Subtask) -> Subtask:
    '''Schedule the given subtask by dispatching to the schedule-method that matches its subtask type.

    Any exception raised after the prerequisite check first puts the subtask in state ERROR (best effort;
    a failure to do so is only logged) and is then re-raised unchanged.
    :raises SubtaskSchedulingException: when the prerequisites are not met (see check_prerequities_for_scheduling),
            or when no schedule-method is known for the subtask type
    '''
    check_prerequities_for_scheduling(subtask)

    try:
        subtask_type = subtask.specifications_template.type.value
        schedule_methods = {SubtaskType.Choices.PIPELINE.value: schedule_pipeline_subtask,
                            SubtaskType.Choices.OBSERVATION.value: schedule_observation_subtask,
                            SubtaskType.Choices.QA_FILES.value: schedule_qafile_subtask,
                            SubtaskType.Choices.QA_PLOTS.value: schedule_qaplots_subtask}
        schedule_method = schedule_methods.get(subtask_type)
        if schedule_method is None:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
                                             (subtask.pk, subtask_type))
        return schedule_method(subtask)
    except Exception:
        try:
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
            subtask.save()
        except Exception as e2:
            logger.error(e2)
        # re-raise the original exception (not e2)
        raise
def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
    '''Verify that the given subtask may be scheduled now.

    Requires the subtask to be DEFINED, all its predecessors to be FINISHED, and the
    'allow_scheduling_observations' setting to be truthy (note: this setting is checked for every subtask type).
    :return: True when all prerequisites are met
    :raises SubtaskSchedulingException: when any prerequisite is not met
    '''
    if subtask.state.value != SubtaskState.Choices.DEFINED.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value))

    unfinished_predecessor = next((p for p in subtask.predecessors.all() if p.state.value != SubtaskState.Choices.FINISHED.value), None)
    if unfinished_predecessor is not None:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s in not FINISHED but state=%s"
                                         % (subtask.pk, unfinished_predecessor.pk, unfinished_predecessor.state.value))

    allow_setting = Setting.objects.get(name='allow_scheduling_observations')
    if allow_setting.value:
        return True
    raise SubtaskSchedulingException("Cannot schedule subtask id=%d because setting %s=%s does not allow that." %
                                     (subtask.pk, allow_setting.name, allow_setting.value))
def _assign_resources(subtask: Subtask):
    '''Ask the resource assigner (via RARPC) to assign resources for the given subtask.

    The RA specification contains the subtask's parset and an RA specification for each predecessor.
    Predecessors whose RA specification cannot be created are skipped (best effort), with a warning.
    :raises SubtaskSchedulingException: when the subtask is not in SCHEDULING state, or when the resources could not be assigned
    '''
    if subtask.state.value != SubtaskState.Choices.SCHEDULING.value:
        raise SubtaskSchedulingException("Cannot assign resources for subtask id=%d because it is not in SCHEDULING state. "
                                         "Current state=%s" % (subtask.pk, subtask.state.value))

    def create_ra_specification(_subtask):
        parset_dict = convert_to_parset_dict(_subtask)
        return { 'tmss_id': _subtask.id,
                 'task_type': _subtask.specifications_template.type.value.lower(),
                 'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
                 'status': 'prescheduled',
                 'starttime': _subtask.start_time,
                 'endtime': _subtask.stop_time,
                 'cluster': _subtask.cluster.name,
                 'station_requirements': [],
                 'specification': parset_dict }

    ra_spec = create_ra_specification(subtask)
    ra_spec['predecessors'] = []
    for pred in subtask.predecessors.all():
        try:
            ra_spec['predecessors'].append(create_ra_specification(pred))
        except Exception as e:
            # best effort: skip predecessors that cannot be converted, but don't hide why
            # (the former bare 'except: pass' also swallowed KeyboardInterrupt/SystemExit)
            logger.warning("Could not create RA specification for predecessor subtask id=%s of subtask id=%s, skipping it. Error: %s",
                           pred.pk, subtask.pk, e)

    with RARPC.create() as rarpc:
        assigned = rarpc.do_assignment(ra_spec)

    if not assigned:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because the required resources are not (fully) available." % (subtask.pk, ))
def schedule_qafile_subtask(qafile_subtask: Subtask):
    ''' Schedule the given qafile_subtask (which converts the observation output to a QA h5 file)
    This method should typically be called upon the event of the observation_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    :return: the qafile_subtask, now in state SCHEDULED
    :raises SubtaskSchedulingException: when the prerequisites are not met, the subtask is not a QA files subtask,
            or it does not have exactly one input
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qafile_subtask)

    if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk,
                                                                                                          qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

    # use count(): a related manager has no len(), so len(qafile_subtask.inputs) raised a TypeError
    # which masked this intended error message
    nr_of_inputs = qafile_subtask.inputs.count()
    if nr_of_inputs != 1:
        raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qafile_subtask.id, nr_of_inputs))

    # step 1: set state to SCHEDULING
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qafile_subtask.save()

    # step 2: link input dataproducts
    qa_input = qafile_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    qafile_output = qafile_subtask.outputs.first()
    if qafile_output:
        empty_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
        empty_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
        Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ),
                                   directory="/data/qa/qa_files",
                                   dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value),
                                   datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                   producer=qafile_output,
                                   specifications_doc=get_default_json_object_for_schema(empty_specifications_template.schema),
                                   specifications_template=empty_specifications_template,
                                   feedback_doc=get_default_json_object_for_schema(empty_feedback_template.schema),
                                   feedback_template=empty_feedback_template,
                                   SAP=None  # todo: do we need to point to a SAP here?
                                   )

    # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qafile_subtask.save()

    return qafile_subtask
def schedule_qaplots_subtask(qaplots_subtask: Subtask):
    ''' Schedule the given qaplots_subtask (which creates inspection plots from a QA h5 file)
    This method should typically be called upon the event of the qafile_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    The output dataproduct directory is named after the observation subtask that precedes the qafile subtask.
    :return: the qaplots_subtask, now in state SCHEDULED
    :raises SubtaskSchedulingException: when the prerequisites are not met, the subtask is not a QA plots subtask,
            or it does not have exactly one input
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qaplots_subtask)

    if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk,
                                                                                                          qaplots_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.QA_PLOTS.value))

    # use count(): a related manager has no len(), so len(qaplots_subtask.inputs) raised a TypeError
    # which masked this intended error message
    nr_of_inputs = qaplots_subtask.inputs.count()
    if nr_of_inputs != 1:
        raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, nr_of_inputs))

    # step 1: set state to SCHEDULING
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qaplots_subtask.save()

    # step 2: link input dataproducts
    # this should typically be a single input with a single dataproduct (the qa h5 file)
    qa_input = qaplots_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    qafile_subtask = qaplots_subtask.predecessors.first()
    obs_subtask = qafile_subtask.predecessors.first()
    empty_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
    empty_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ),
                               dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value),
                               datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                               producer=qaplots_subtask.outputs.first(),
                               specifications_doc=get_default_json_object_for_schema(empty_specifications_template.schema),
                               specifications_template=empty_specifications_template,
                               feedback_doc=get_default_json_object_for_schema(empty_feedback_template.schema),
                               feedback_template=empty_feedback_template,
                               SAP=None  # todo: do we need to point to a SAP here?
                               )

    # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qaplots_subtask.save()

    return qaplots_subtask
# todo: this can probably go when we switch to the new start time calculation in the model properties (which is based on this logic)
def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
    """
    Find the task_blueprint that is scheduled directly before the given one, via its scheduling relations.

    A relation matches when the given task_blueprint is 'first' with placement "after" (the previous one is 'second'),
    or when it is 'second' with placement "before" (the previous one is 'first').
    When several relations match, the last one encountered wins.
    :param task_blueprint: the task_blueprint to find the previous related task_blueprint for
    :return: tuple (previous_related_task_blueprint, time_offset) where time_offset is the relation's offset (in seconds);
             (None, 0) when nothing is found
    """
    logger.info("get_previous_related_task_blueprint_with_time_offset %s (id=%s)", task_blueprint.name, task_blueprint.pk)

    found = (None, 0)
    relations = [*task_blueprint.first_to_connect.all(), *task_blueprint.second_to_connect.all()]
    for relation in relations:
        if relation.first.id == task_blueprint.id and relation.placement.value == "after":
            found = (TaskBlueprint.objects.get(id=relation.second.id), relation.time_offset)
        if relation.second.id == task_blueprint.id and relation.placement.value == "before":
            found = (TaskBlueprint.objects.get(id=relation.first.id), relation.time_offset)
    return found
# todo: maybe this can now be replaced by subtask.relative_start_time
def calculate_start_time(observation_subtask: Subtask):
    """
    Calculate the start time of an observation subtask, taking a 'C-T-C train' into account.

    The start time of an observation depends on the start_time, the duration and the scheduling-relation
    time offset of the previous related observation, so its scheduling relations should be known.
    If there is no previous observation, the 'default' start time is two minutes from now, truncated to the
    whole minute.
    For demo purposes; this will be changed into dynamic scheduling in the future.
    Parallel observations are not supported yet.

    :param observation_subtask: the observation Subtask to calculate the start time for
    :return: start_time (naive datetime, UTC)
    :raises SubtaskSchedulingException: when the previous related task_blueprint has no observation subtask,
             or when that observation subtask has no start_time (yet), so the start time cannot be derived
    """
    previous_related_task_blueprint, time_offset = get_previous_related_task_blueprint_with_time_offset(observation_subtask.task_blueprint)

    if previous_related_task_blueprint is None:
        # This is the first observation, so take the start time 2 minutes from now (truncated to the whole minute)
        now = datetime.utcnow()
        return now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond)

    # Get the duration of last/previous observation
    duration_in_sec = previous_related_task_blueprint.specifications_doc["duration"]
    logger.info("Duration of previous observation '%s' (id=%s) is %d seconds",
                previous_related_task_blueprint.name, previous_related_task_blueprint.pk, duration_in_sec)

    # Get the previous observation subtask, there should actually be exactly one
    previous_subtasks_obs = [st for st in previous_related_task_blueprint.subtasks.all()
                             if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
    if not previous_subtasks_obs:
        raise SubtaskSchedulingException("Cannot calculate start_time for subtask id=%s because the previous related task_blueprint id=%s has no observation subtask" %
                                         (observation_subtask.pk, previous_related_task_blueprint.pk))
    previous_subtask_obs = previous_subtasks_obs[0]
    logger.info("The previous observation subtask is id=%s", previous_subtask_obs.pk)

    if previous_subtask_obs.start_time is None:
        # Without the previous start time our own start time cannot be derived; fail explicitly
        # (previously this fell through to `None + timedelta`, raising an obscure TypeError).
        raise SubtaskSchedulingException("Cannot calculate start_time for subtask id=%s because the previous observation subtask id=%s has no start_time" %
                                         (observation_subtask.pk, previous_subtask_obs.pk))

    return previous_subtask_obs.start_time + timedelta(seconds=duration_in_sec+time_offset)
def schedule_observation_subtask(observation_subtask: Subtask):
    ''' Schedule the given observation_subtask
    For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
    For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    Missing start/stop times are filled in (start via calculate_start_time, stop via the task's duration),
    one SAP is created per digital pointing, and one MeasurementSet dataproduct per subband of that pointing.

    :param observation_subtask: the Subtask to schedule; its specifications_template type must be 'observation'
    :return: the same observation_subtask, in state SCHEDULED
    :raises SubtaskSchedulingException: when the subtask is not an observation
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(observation_subtask)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
                                                                                                          observation_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.OBSERVATION.value))

    # step 1: set state to SCHEDULING
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    observation_subtask.save()

    # step 1a: check start/stop times
    if observation_subtask.start_time is None:
        next_start_time = calculate_start_time(observation_subtask)
        logger.info("observation id=%s has no starttime. assigned default: %s", observation_subtask.pk, formatDatetime(next_start_time))
        observation_subtask.start_time = next_start_time

    if observation_subtask.stop_time is None:
        duration_in_sec = observation_subtask.task_blueprint.specifications_doc["duration"]
        logger.info("Duration of observation id=%s is %d seconds", observation_subtask.pk, duration_in_sec)
        stop_time = observation_subtask.start_time + timedelta(seconds=duration_in_sec)
        logger.info("observation id=%s has no stop_time. assigned default: %s", observation_subtask.pk, formatDatetime(stop_time))
        observation_subtask.stop_time = stop_time

    # step 2: define input dataproducts
    # TODO: are there any observations that take input dataproducts?

    # step 3: create output dataproducts, and link these to the output
    specifications_doc = observation_subtask.specifications_doc
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP")  # todo: should this be derived from the task relation specification template?
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    subtask_output = observation_subtask.outputs.first()  # TODO: make proper selection, not default first()
    directory = "/data/%s/%s/L%s/uv" % ("projects" if isProductionEnvironment() else "test-projects",
                                        observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name,
                                        observation_subtask.id)

    # These values are identical for every SAP and every subband: look them up once, instead of
    # issuing the same database queries for each of the (possibly hundreds of) created objects.
    sap_template = SAPTemplate.objects.get(name="SAP")
    dataformat = Dataformat.objects.get(value="MeasurementSet")
    datatype = Datatype.objects.get(value="visibilities")  # todo: is this correct?
    observation_duration_in_sec = (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()

    for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
        sap_name = "%s_%s" % (observation_subtask.id, pointing['name'])
        sap = SAP.objects.create(name=sap_name,
                                 specifications_doc={"name": sap_name,
                                                     "identifiers": {},  # todo: TMSS-324
                                                     "pointing": pointing['pointing'],
                                                     "time": {"start_time": observation_subtask.start_time.isoformat(),
                                                              "duration": observation_duration_in_sec},
                                                     "antennas": {
                                                         "set": specifications_doc['stations']['antenna_set'],
                                                         "stations": specifications_doc['stations']['station_list'],
                                                         "fields": []  # todo: do we really have to calculate an object like {"station": "CS001", "field": "HBA", "type": "HBA"} here again for all involved stations? Isn't that info derived later anyway?
                                                     }
                                                     },
                                 specifications_template=sap_template)

        for sb_nr in pointing['subbands']:
            Dataproduct.objects.create(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                       directory=directory,
                                       dataformat=dataformat,
                                       datatype=datatype,
                                       producer=subtask_output,
                                       specifications_doc={"sap": [sap_nr]},  # todo: set correct value. This will be provided by the RA somehow
                                       specifications_template=dataproduct_specifications_template,
                                       # a fresh default object per dataproduct, so that they do not share one mutable doc
                                       feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                       feedback_template=dataproduct_feedback_template,
                                       # NOTE(review): placeholder sizes (every 10th subband gets size 0) - presumably
                                       # dummy values until real sizes are known; confirm before relying on them.
                                       size=0 if sb_nr % 10 == 0 else 1024*1024*1024*sb_nr,
                                       expected_size=1024*1024*1024*sb_nr,
                                       SAP=sap)

    # step 4: resource assigner (if possible)
    _assign_resources(observation_subtask)

    # step 5: set state to SCHEDULED (so that it can be picked up and run)
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    observation_subtask.save()

    return observation_subtask
def schedule_pipeline_subtask(pipeline_subtask: Subtask):
    ''' Schedule the given pipeline_subtask
    This method should typically be called upon the event of an predecessor (observation) subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow

    For each input, the producer's dataproducts that meet the input's selection_doc are linked to that input,
    and for each of them one output dataproduct is created (linked to the input via a DataproductTransform).

    :param pipeline_subtask: the Subtask to schedule; its specifications_template type must be 'pipeline'
    :return: the same pipeline_subtask, in state SCHEDULED
    :raises SubtaskSchedulingException: when the subtask is not a pipeline, or when it has no inputs
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(pipeline_subtask)

    if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk,
                                                                                                          pipeline_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.PIPELINE.value))

    # step 1: set state to SCHEDULING
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    pipeline_subtask.save()

    # step 1a: check start/stop times
    # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
    if pipeline_subtask.start_time is None:
        now = datetime.utcnow()
        logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
        pipeline_subtask.start_time = now

    if pipeline_subtask.stop_time is None:
        stop_time = pipeline_subtask.start_time + timedelta(hours=+1)
        logger.info("pipeline id=%s has no stop_time. assigned default: %s", pipeline_subtask.pk, formatDatetime(stop_time))
        pipeline_subtask.stop_time = stop_time

    # step 2: link input dataproducts
    if pipeline_subtask.inputs.count() == 0:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk,
                                                                                                               pipeline_subtask.specifications_template.type))

    # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty"
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")

    # loop-invariant lookups: fetch once instead of once per output dataproduct
    dataformat = Dataformat.objects.get(value="MeasurementSet")
    datatype = Datatype.objects.get(value="visibilities")  # todo: is this correct?

    # select subtask output the new dataproducts will be linked to
    pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?

    # Collect the output dataproducts of ALL inputs, so that linking them to the output once (after the loop)
    # cannot discard the dataproducts created for earlier inputs.
    output_dps = []

    # iterate over all inputs
    for pipeline_subtask_input in pipeline_subtask.inputs.all():

        # select and set input dataproducts that meet the filter defined in selection_doc
        dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
                        if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]
        pipeline_subtask_input.dataproducts.set(dataproducts)

        # The output directory is derived from the input directory by swapping the producer's subtask id for ours.
        # Only swap whole path components ('<id>' or 'L<id>', as in '/data/<projects>/<project>/L<id>/uv');
        # a plain str.replace would also rewrite those digits inside e.g. the project name.
        producer_id = str(pipeline_subtask_input.producer.subtask.pk)
        directory_component_replacements = {producer_id: str(pipeline_subtask.pk),
                                            "L" + producer_id: "L%s" % pipeline_subtask.pk}

        # step 3: create output dataproducts, and link these to the output
        # TODO: create them from the spec, instead of "copying" the input filename
        for input_dp in pipeline_subtask_input.dataproducts.all():
            if '_' in input_dp.filename and input_dp.filename.startswith('L'):
                # replace the leading 'L<id>' part of the input filename by our own id
                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
            else:
                filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)

            directory = "/".join(directory_component_replacements.get(component, component)
                                 for component in input_dp.directory.split("/"))

            output_dp = Dataproduct.objects.create(filename=filename,
                                                   directory=directory,
                                                   dataformat=dataformat,
                                                   datatype=datatype,
                                                   producer=pipeline_subtask_output,
                                                   specifications_doc={},
                                                   specifications_template=dataproduct_specifications_template,
                                                   # use the template's default doc, consistent with the other dataproducts created in this module
                                                   feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                                   feedback_template=dataproduct_feedback_template,
                                                   SAP=input_dp.SAP)
            DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False)
            output_dps.append(output_dp)

    pipeline_subtask_output.dataproducts.set(output_dps)

    # step 4: resource assigner (if possible)
    _assign_resources(pipeline_subtask)

    # step 5: set state to SCHEDULED (so that it can be picked up and run)
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    pipeline_subtask.save()

    return pipeline_subtask
# === Misc ===
def create_and_schedule_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Convenience method: create the subtasks from the task_blueprint, then schedule those whose predecessors are all finished.

    :param task_blueprint: the TaskBlueprint to create (and partially schedule) subtasks for
    :return: all subtasks of the task_blueprint, as returned by schedule_independent_subtasks_in_task_blueprint
    '''
    create_subtasks_from_task_blueprint(task_blueprint)
    all_subtasks = schedule_independent_subtasks_in_task_blueprint(task_blueprint)
    return all_subtasks
def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Convenience method: schedule the subtasks in the task_blueprint that do not depend on unfinished predecessors.

    A subtask is scheduled when every one of its predecessors is in state 'finished'
    (which trivially holds for subtasks without predecessors).

    :param task_blueprint: the TaskBlueprint whose subtasks are considered
    :return: ALL subtasks of the task_blueprint, whether they were scheduled or not
    '''
    subtasks = list(task_blueprint.subtasks.all())
    for subtask in subtasks:
        # One EXISTS query, instead of fetching all predecessors and all finished predecessors and comparing their counts.
        has_unfinished_predecessors = subtask.predecessors.exclude(state__value='finished').exists()
        if not has_unfinished_predecessors:
            schedule_subtask(subtask)
    return subtasks
def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs):
# preprocessing task default spec: {
# "storagemanager": "dysco",
# "flag": {"outerchannels": true, "autocorrelations": true, "rfi_strategy": "auto"},
# "demix": {"frequency_steps": 64, "time_steps": 10, "ignore_target": false, "sources": {}},
# "average": {"frequency_steps": 4, "time_steps": 1}}
# pipelinecontrol subtask default spec: {
# "storagemanager": "dysco",
# "demixer": {"baselines": "CS*,RS*&", "frequency_steps": 4, "time_steps": 1, "demix_frequency_steps": 4,
# "demix_time_steps": 1, "ignore_target": false, "demix_always": [], "demix_if_needed": []},
# "aoflagger": {"strategy": "HBAdefault"},
# "preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"},
# "preflagger1": {"corrtype": "auto"}}
# todo: check that this is actually how these need to be translated
# todo: especially check when defaults are NOT supposed to be set because the task implies to not include them
# todo: translate task "sources": {} - I guess this is demix_always/demix_if_needed?
# todo: set subtask demixer properties "baselines": "CS*,RS*&", "demix_always": [], "demix_if_needed": []
subtask_specs = {}
subtask_specs['storagemanager'] = preprocessing_task_specs.get('storagemanager',
default_subtask_specs.get('storagemanager'))
# todo: we depend on valid json here with knowledge about required properties. To generalize, we need to expect things to not be there.
if 'demix' or 'average' in preprocessing_task_specs:
# todo: should we exclude defaults in subtask.demixer if only one of these is defined on the task?
subtask_specs['demixer'] = default_subtask_specs['demixer']
if 'demix' in preprocessing_task_specs:
subtask_specs['demixer'].update({
"demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'],
"demix_time_steps": preprocessing_task_specs['demix']['time_steps'],
"ignore_target": preprocessing_task_specs['demix']['ignore_target']
}),
if 'average' in preprocessing_task_specs:
subtask_specs['demixer'].update({
"demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'],
"frequency_steps": preprocessing_task_specs['average']['frequency_steps'],
"demix_time_steps": preprocessing_task_specs['demix']['time_steps'],
"time_steps": preprocessing_task_specs['average']['time_steps'],
"ignore_target": preprocessing_task_specs['demix']['ignore_target']
}),
if 'flag' in preprocessing_task_specs:
if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none':
subtask_specs.update({"aoflagger": {"strategy": preprocessing_task_specs["flag"]["rfi_strategy"]}})
if preprocessing_task_specs["flag"]["rfi_strategy"] == 'auto':
# todo: handle 'auto' properly: we need to determine input dataproduct type and set LBA or HBA accordingly
# either here or allow 'auto' in subtask json and translate it when we connect obs to pipe subtask
default_strategy = default_subtask_specs['aoflagger']['strategy']
subtask_specs.update({"aoflagger": {"strategy": default_strategy}})
logger.warning('Translating aoflagger "auto" strategy to "%s" without knowing whether that makes sense!' % default_strategy)
if preprocessing_task_specs["flag"]["outerchannels"]:
subtask_specs.update({"preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"}})
if preprocessing_task_specs["flag"]["autocorrelations"]:
subtask_specs.update({"preflagger1": {"corrtype": "auto"}})
return subtask_specs
def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
    """
    Filter specs by selection. This requires the specification_doc to...
    A) ...contain ALL KEYS that we select / filter for
    B) ...contain NO ADDITIONAL VALUES that are not selected / filtered for

    Iterable values (e.g. lists) match when every element of the spec value occurs in the selected value.
    Strings and bytes are compared as whole values (they are iterable, but must not be matched character by character).
    All other values must be equal.

    :param specifications_doc: dataproduct specification as dict
    :param selection_doc: selection filter as dict
    :return: True when the input specifications_doc meets a filter described in selection_doc, False otherwise
    """
    def _is_collection(value):
        # str/bytes are Iterable too, but "ab" must not match a selection of "ba"
        return isinstance(value, Iterable) and not isinstance(value, (str, bytes))

    meets_criteria = True
    for key, selected in selection_doc.items():
        if key not in specifications_doc:
            meets_criteria = False
            break

        spec = specifications_doc[key]
        if _is_collection(spec) and _is_collection(selected):
            if any(spec_item not in selected for spec_item in spec):
                meets_criteria = False
                break
        elif spec != selected:
            meets_criteria = False
            break

    logger.debug("specs %s matches selection %s: %s", specifications_doc, selection_doc, meets_criteria)
    return meets_criteria
def get_observation_task_specification_with_check_for_calibrator(subtask):
    """
    Return the observation task specifications_doc to use for the given subtask.

    When the subtask's task_blueprint uses a specifications_template whose name contains 'calibrator'
    (case-insensitive), the specifications_doc of the related target observation task_blueprint is returned,
    because a calibrator takes some of its specifications from its target. Otherwise the subtask's own
    task_blueprint.specifications_doc is returned.

    :param subtask: the observation Subtask
    :return: the specifications_doc of a target observation task_blueprint
    :raises SubtaskCreationException: when the subtask belongs to a calibrator task but no related target observation is found
    """
    own_task_blueprint = subtask.task_blueprint
    if 'calibrator' not in own_task_blueprint.specifications_template.name.lower():
        return own_task_blueprint.specifications_doc

    # Calibrator requires related Target Task Observation for some specifications
    target_task_blueprint = get_related_target_observation_task_blueprint(own_task_blueprint)
    if target_task_blueprint is None:
        raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk)

    target_spec = target_task_blueprint.specifications_doc
    logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s",
                own_task_blueprint.id, target_task_blueprint.id)
    return target_spec