import logging
import typing
logger = logging.getLogger(__name__)
from copy import deepcopy
from functools import cmp_to_key
from collections.abc import Iterable
from math import ceil
from lofar.common.ring_coordinates import RingCoordinates
from os.path import splitext
from lofar.common.datetimeutils import formatDatetime, round_to_second_precision
from lofar.common import isProductionEnvironment
from lofar.common.lcu_utils import get_current_stations
from lofar.stationmodel.antennafields import antenna_fields
from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException, SubtaskSchedulingSpecificationException, SubtaskCancellingException, SubtaskException
from datetime import datetime, timedelta
from lofar.common.datetimeutils import parseDatetime
from lofar.common.json_utils import validate_json_against_schema
from lofar.sas.tmss.tmss.tmssapp.models import *
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict
from lofar.common.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException
from lofar.mac.observation_control_rpc import ObservationControlRPCClient
from lofar.mac.pipeline_control_rpc import PipelineControlRPCClient
from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset_and_station
from lofar.sas.tmss.tmss.exceptions import TMSSException
from django.db import transaction
# ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ====
def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool:
try:
task_blueprint.validate_json_against_this_templates_schema('specifications_doc', 'specifications_template')
except Exception as e:
logger.error("Failed to check_prerequities_for_subtask_creation: %s", e)
raise SubtaskCreationException from e
return True
def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
'''Generic create-method for subtasks. Calls the appropriate create method based on the task_blueprint specifications_template name.'''
with transaction.atomic():
logger.debug("creating subtask(s) from task_blueprint id=%s name='%s' type='%s' scheduling_unit_blueprint id=%s",
task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.type.value,
task_blueprint.scheduling_unit_blueprint.id)
check_prerequities_for_subtask_creation(task_blueprint)
subtasks = []
# recurse over predecessors, so that all dependencies in predecessor subtasks can be met.
for predecessor in task_blueprint.predecessors.all():
subtasks.extend(create_subtasks_from_task_blueprint(predecessor))
if task_blueprint.subtasks.count() > 0:
logger.debug("skipping creation of subtasks because they already exist for task_blueprint id=%s, name='%s', task_template_name='%s'",
task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.name)
return subtasks
# fixed mapping from template name to generator functions which create the list of subtask(s) for this task_blueprint
generators_mapping = {'target observation': [create_observation_control_subtask_from_task_blueprint,
create_qafile_subtask_from_task_blueprint,
create_qaplots_subtask_from_task_blueprint],
'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint],
'pulsar pipeline': [create_pulsar_pipeline_subtask_from_task_blueprint],
'ingest': [create_ingest_subtask_from_task_blueprint],
'cleanup': [create_cleanup_subtask_from_task_blueprint]}
generators_mapping['calibrator observation'] = generators_mapping['target observation']
generators_mapping['parallel calibrator target observation'] = generators_mapping['target observation']
generators_mapping['beamforming observation'] = [create_observation_control_subtask_from_task_blueprint]
template_name = task_blueprint.specifications_template.name
if template_name in generators_mapping:
generators = generators_mapping[template_name]
for generator in generators:
try:
# try to create the subtask, allow exception to bubble upwards so the creation transaction can be rolled back upon error.
subtask = generator(task_blueprint)
if subtask is not None:
logger.info("created subtask id=%s type='%s' from task_blueprint id=%s name='%s' type='%s' scheduling_unit_blueprint id=%s",
subtask.id, subtask.specifications_template.type.value,
task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.type.value,
task_blueprint.scheduling_unit_blueprint.id)
subtasks.append(subtask)
except Exception as e:
logger.exception(e)
raise SubtaskCreationException('Cannot create subtasks for task id=%s for its schema name=\'%s\' in generator \'%s\'' % (task_blueprint.pk, template_name, generator.__name__)) from e
return subtasks
else:
logger.error('Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name))
raise SubtaskCreationException('Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name))
def _filter_subbands(obs_subbands: list, selection: dict) -> [int]:
from itertools import groupby, count
    if not isinstance(selection, dict) or not selection.get('method'):
raise SubtaskCreationException('Did not get a valid subband selection. Expected dict with "method" but got %s' % selection)
if selection['method'] == 'copy':
return obs_subbands
elif selection['method'] == 'select subset':
return list(set(obs_subbands) & set(selection['list'])) # intersection
elif selection['method'] == 'largest continuous subset':
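        # subtract a running counter from each subband: consecutive subbands then map onto the
        # same key, so groupby() splits the (ascending) list into runs of consecutive subbands,
        # of which we return the longest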
c = count()
return max((list(g) for _, g in groupby(obs_subbands, lambda x: x - next(c))), key=len)
raise SubtaskCreationException('Did not get a valid subband selection: got %s' % selection["method"])
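
# A minimal sketch of what _filter_subbands returns for the three methods (hypothetical
# subband lists, not taken from a real observation spec):
#   {'method': 'copy'}                              -> the observation's subbands unchanged
#   {'method': 'select subset', 'list': [101, 200]} -> the set intersection with the observation's subbands (order not guaranteed)
#   {'method': 'largest continuous subset'}         -> the longest run of consecutive subbands, e.g. [100, 101, 102] from [100, 101, 102, 200, 201]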
def _add_pointings(pointing_a, pointing_b):
if pointing_a['direction_type'] != pointing_b['direction_type']:
raise SubtaskCreationException(
"Cannot add pointings because direction types differ pointing_a=%s; pointing_b=%s" % (pointing_a, pointing_b))
pointing = {"direction_type": pointing_a['direction_type']}
for angle in ['angle1', 'angle2']:
pointing[angle] = pointing_a.get(angle, 0.0) + pointing_b.get(angle, 0.0)
return pointing
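
# For example (hypothetical angles, in radians): adding a relative TAB pointing
# {'direction_type': 'J2000', 'angle1': 0.01, 'angle2': -0.02} to an absolute SAP pointing
# {'direction_type': 'J2000', 'angle1': 0.50, 'angle2': 0.10} yields approximately
# {'direction_type': 'J2000', 'angle1': 0.51, 'angle2': 0.08}; differing direction types raise.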
def _generate_tab_ring_pointings(pointing, tab_rings) -> [dict]:
if pointing['direction_type'] != 'J2000':
raise SubtaskCreationException('Tab rings are not supported for direction_type=%s (use J2000 or specify TABs specifically)' % pointing['direction_type'])
# Generate relative pointings according to tab rings spec
    # Note: it is unclear what the center arg represents here, because the rings don't seem to be formed around the given coordinates.
    # It seems to be only used to do some correction (morph the grid properly towards the NCP, according to Jan David).
coordinates = RingCoordinates(numrings=tab_rings['count'],
width=tab_rings['width'],
center=(pointing['angle1'], pointing['angle2']),
dirtype=pointing['direction_type']).coordinates()
relative_pointings = [{'angle1': angle1, 'angle2': angle2, 'direction_type': pointing['direction_type']} for angle1, angle2 in coordinates]
# add ring coordinates to main pointing to get absolute TAB pointings and return them
tab_pointings = [_add_pointings(pointing, relative_pointing) for relative_pointing in relative_pointings]
return tab_pointings
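
# As a rough illustration (the exact grid comes from RingCoordinates): tab_rings={'count': 1,
# 'width': 0.0087} typically yields a hexagonal ring of 6 TAB pointings around the given SAP
# pointing, morphed towards the NCP as noted above; non-J2000 pointings are rejected.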
def _get_related_target_sap_by_name(task_blueprint, sap_name):
# TODO: If we start using beamforming observations in parallel with target imaging observations, then we need to search for saps in the target imaging obs spec.
# See git history for an initial implementation.
for target_sap in task_blueprint.specifications_doc['SAPs']:
if target_sap['name'] == sap_name:
return target_sap
raise SubtaskCreationException("Cannot create beamformer subtask from task id=%s because it does not contain target SAP with name=%s in blueprint %s" % (task_blueprint.id, sap_name, task_blueprint.specifications_doc))
def _merge_subtask_beamformer_pipeline_specs(spec_a: dict, spec_b: dict) -> dict:
""" Merge beamformer pipelines if they can be run in a single pipeline.
In/out: specifications as described in subtask_spec['COBALT']['tab_pipelines'].
Throws a ValueError if the specifications could not be merged.
"""
if spec_a['stations'] != spec_b['stations']:
raise ValueError("Cannot merge subtask beamformer pipelines with different station sets: %s vs %s" % (spec_a['stations'], spec_b['stations']))
# whether we merged anything
merged = False
if 'coherent' in spec_a and 'coherent' in spec_b:
# Could be implemented later if specs are sufficiently similar
raise ValueError("Cannot merge 2 coherent beamformer pipelines")
if 'incoherent' in spec_a and 'incoherent' in spec_b:
# Could be implemented later if specs are sufficiently similar
raise ValueError("Cannot merge 2 incoherent beamformer pipelines")
"""
Coherent Stokes and Incoherent Stokes pipelines can be combined, if:
- they process the same input stations,
- they process the same subbands per SAP
"""
if 'incoherent' in spec_a and 'coherent' in spec_b:
# swap them, fall through to next if check for the reverse condition
spec_a, spec_b = spec_b, spec_a
if 'coherent' in spec_a and 'incoherent' in spec_b:
merged = True
# merge into spec_a
merged_spec = deepcopy(spec_a)
merged_spec['incoherent'] = spec_b['incoherent']
for SAP_in_b in spec_b['SAPs']:
# find all SAPs in a that have the name of this SAP in b. Should be either 0 or 1 as names should be unique.
SAP_in_a = [s for s in merged_spec['SAPs'] if s['name'] == SAP_in_b['name']]
            assert 0 <= len(SAP_in_a) <= 1, "We should find at most a single occurrence of a SAP name in the beamformer specification"
if not SAP_in_a:
# spec_b beamforms a SAP that is not yet beamformed in spec_a, so we can just add it.
merged_spec['SAPs'].append(SAP_in_b)
else:
# spec_a and spec_b both beamform this SAP, so we need to merge them
# we should find only one, anyway
SAP_in_a = SAP_in_a[0]
if SAP_in_a['subbands'] != SAP_in_b['subbands']:
raise ValueError("Cannot merge beamformer pipelines that beamform different subbands for the same SAP")
# settings are compatible -- merge TABs
SAP_in_a['tabs'].extend(SAP_in_b['tabs'])
if not merged:
raise ValueError("Could not merge beamformer pipelines")
return merged_spec
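
# A minimal sketch of a successful merge (hypothetical specs): a coherent pipeline
#   {'stations': ['CS002'], 'coherent': {...}, 'SAPs': [{'name': 'sap0', 'subbands': [0, 1], 'tabs': [...]}]}
# and an incoherent pipeline
#   {'stations': ['CS002'], 'incoherent': {...}, 'SAPs': [{'name': 'sap0', 'subbands': [0, 1], 'tabs': [...]}]}
# merge into one pipeline carrying both the 'coherent' and 'incoherent' settings, with the TABs
# per SAP concatenated. Different station sets, or different subbands for the same SAP, raise a ValueError.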
def _compress_subtask_beamformer_pipelines(pipeline_spec: list) -> list:
""" Merge beamformer pipelines if they can be run in a single pipeline.
Return a new list of pipeline specifications.
In/out: subtask_spec['COBALT']['tab_pipelines'].
"""
# resulting specifications
result_specs = pipeline_spec[:]
# whether we need to do another round of merging. keep merging until we can't.
merge_again = True
# Try to merge every pair of pipelines
while merge_again:
merge_again = False
try:
for a_idx, spec_a in enumerate(result_specs):
# Try to merge each remaining spec into spec_a
for spec_b in result_specs[a_idx+1:]:
try:
new_spec_a = _merge_subtask_beamformer_pipeline_specs(spec_a, spec_b)
# merged spec_b into spec_a, so remove original specs and add merged one
result_specs.remove(spec_a)
result_specs.remove(spec_b)
result_specs.append(new_spec_a)
# restart outer loop, since our counters are all off now
merge_again = True
raise StopIteration
except ValueError:
# couldn't merge spec_a and spec_b
pass
except StopIteration:
pass
return result_specs
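
# For example (hypothetical): given pipelines [CS-A, IS-B, CS-C] with equal station sets and
# subbands, IS-B merges into whichever coherent pipeline pairs with it first; the remaining
# pair of coherent pipelines cannot merge, so two pipelines are returned. The while loop keeps
# retrying until a full pass over all pairs yields no further merge.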
def _get_specifications_with_defaults(task_blueprint: TaskBlueprint):
return task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc)
def _get_target_and_or_calibrator_specification(task_blueprint: TaskBlueprint) -> (dict, dict):
""" Return the (target_specifications, calibrator_specifications), or None instead of either
if the calibrator or target cannot be found.
Defaults have been added to any specifications returned. """
if task_blueprint.specifications_template.name == 'parallel calibrator target observation':
specs = _get_specifications_with_defaults(task_blueprint)
return (specs['target'], specs['calibrator'])
if task_blueprint.specifications_template.name in ('target observation', 'beamforming observation'):
cal_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'calibrator observation')
return (_get_specifications_with_defaults(task_blueprint), cal_task and _get_specifications_with_defaults(cal_task))
if task_blueprint.specifications_template.name == 'calibrator observation':
target_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'target observation')
return (target_task and _get_specifications_with_defaults(target_task), _get_specifications_with_defaults(task_blueprint))
return (None, None)
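
# For instance: for a 'calibrator observation' task this returns (specs of the related target
# task or None, the task's own specs); for a 'parallel calibrator target observation' both
# elements come from the task's own specifications document.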
def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate):
"""
Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings
"""
# check if task_blueprint has an observation-like specification
if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation', 'beamforming observation', 'parallel calibrator target observation']:
raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % (
task_blueprint.id, task_blueprint.specifications_template.name))
# start with an observation subtask specification with all the defaults and the right structure according to the schema
subtask_template = SubtaskTemplate.objects.get(name='observation control')
subtask_spec = subtask_template.get_default_json_document_for_schema()
# maintain a minimum version of COBALT that is required to execute these specifications
min_COBALT_version = 1
# wipe the default pointings, these should come from the task_spec
subtask_spec['stations'].pop('analog_pointing', None)
subtask_spec['stations']['digital_pointings'] = []
# now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec
task_spec = _get_specifications_with_defaults(task_blueprint)
# extract the individual target/calibrator specs if possible
target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(task_blueprint)
# block size calculator will need to be fed all the relevant specs
cobalt_calculator_constraints = BlockConstraints(None, [], [])
# correlator
subtask_spec["COBALT"]["correlator"] = { "enabled": False }
if target_task_spec and "correlator" in target_task_spec:
subtask_spec["COBALT"]["correlator"]["enabled"] = True
subtask_spec["COBALT"]["correlator"]["channels_per_subband"] = target_task_spec["correlator"]["channels_per_subband"]
corr = CorrelatorSettings()
corr.nrChannelsPerSubband = target_task_spec["correlator"]["channels_per_subband"]
corr.integrationTime = target_task_spec["correlator"]["integration_time"]
cobalt_calculator_constraints.correlator = corr
# The beamformer obs has a beamformer-specific specification block.
    # The rest of its specs are the same as in a target observation.
# So... copy the beamformer specs first, then loop over the shared specs...
if 'beamforming' in task_blueprint.specifications_template.name.lower():
# start with empty tab/flyseye pipelines, fill them below from task spec
subtask_spec['COBALT']['beamformer']['tab_pipelines'] = []
subtask_spec['COBALT']['beamformer']['flyseye_pipelines'] = []
for task_beamformer_spec in task_spec['beamformers']:
            # the wanted/specified beamformer station list is the intersection of the observation station list with the requested beamformer stations.
            # at the moment of scheduling this list is re-evaluated for available stations, and the max_nr_missing is evaluated as well.
            # this intersection is not needed per se, because COBALT plays nicely and does similar filtering for stations that are actually available,
            # but hey, if cobalt can play nice, then so can we! :)
            # So, let's come up with the correct complete beamforming-stations-list, and ask cobalt to explicitly use these.
# combine all stations in the groups...
beamformer_station_list = sum([station_group["stations"] for station_group in task_beamformer_spec["station_groups"]], [])
            # make intersection with observing-stations...
            beamformer_station_set = set(beamformer_station_list).intersection(set(subtask_spec['stations']['station_list']))
            # make it a nice readable sorted list, using the intersection computed above
            beamformer_station_list = sorted(beamformer_station_set)
# use the beamformer_station_list below for the tab pipeline and/or flys eye
for stokes_type in ["coherent", "incoherent"]:
if stokes_type not in task_beamformer_spec or not task_beamformer_spec[stokes_type]["SAPs"]:
# nothing specified for this stokes type
continue
# SAPs
subtask_saps = []
for sap in task_beamformer_spec[stokes_type]["SAPs"]:
subtask_sap = { "name": sap["name"], "tabs": [] }
try:
target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name'])
except Exception as e:
raise ValueError("Cannot find SAP %s in beamformer spec for %s: %s, part of task_beamformer_spec: %s" % (sap['name'], stokes_type, sap, task_beamformer_spec)) from e
if stokes_type == "coherent":
# convert manual coherent TABs
for tab in sap["tabs"]:
subtask_sap["tabs"].append({
"coherent": True,
# determine absolute tab pointing for subtask by adding relative tab pointing from task to target sap pointing
"pointing": tab["pointing"] if not tab.get("relative", False) else _add_pointings(tab['pointing'], target_sap['digital_pointing'])
})
if "tab_rings" in sap:
# convert rings of coherent TABs
ring_pointings = _generate_tab_ring_pointings(target_sap["digital_pointing"], sap.pop("tab_rings"))
subtask_sap['tabs'] += [{'coherent': True, 'pointing': pointing} for pointing in ring_pointings]
else:
# an incoherent TAB, there can be only one
subtask_sap["tabs"] = [{"coherent": False}]
# construct set of subbands to beamform
subtask_sap['subbands'] = _filter_subbands(target_sap['subbands'], sap['subbands'])
# beamforming a subset of the subbands requires COBALT 2+
if subtask_sap['subbands'] != target_sap['subbands']:
min_COBALT_version = max(min_COBALT_version, 2)
subtask_saps.append(subtask_sap)
# create a pipeline item and add it to the list
beamformer_pipeline = {stokes_type: task_beamformer_spec[stokes_type]["settings"],
"stations": beamformer_station_list,
"SAPs": subtask_saps}
subtask_spec['COBALT']['beamformer']['tab_pipelines'].append(beamformer_pipeline)
# add constraints for calculator
ss = StokesSettings()
ss.nrChannelsPerSubband = task_beamformer_spec[stokes_type]["settings"]["channels_per_subband"]
ss.timeIntegrationFactor = task_beamformer_spec[stokes_type]["settings"]["time_integration_factor"]
if stokes_type == "coherent":
cobalt_calculator_constraints.coherentStokes.append(ss)
else:
cobalt_calculator_constraints.incoherentStokes.append(ss)
if 'flys eye' in task_beamformer_spec and task_beamformer_spec['flys eye']['enabled']:
# add constraints for calculator
ss = StokesSettings()
ss.nrChannelsPerSubband = task_beamformer_spec["flys eye"]["settings"]["channels_per_subband"]
ss.timeIntegrationFactor = task_beamformer_spec["flys eye"]["settings"]["time_integration_factor"]
cobalt_calculator_constraints.coherentStokes.append(ss)
flyseye_pipeline = {"coherent": task_beamformer_spec["flys eye"]["settings"],
"stations": beamformer_station_list}
subtask_spec['COBALT']['beamformer']['flyseye_pipelines'].append(flyseye_pipeline)
# todo: Clarify if we can add a subbands_selection on the flys eye task spec, to filter down for sap['subbands']
# If I got that correctly, specifying subbands is not really supported later down the chain, so whatever we do here gets ignored anyway?
# for sap in task_spec["SAPs"]:
# target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name'])
# sap['subbands'] = filter_subbands(...)
# if sap['subbands'] == target_sap['subbands']: # todo: is this really required? pseudo-code in confluence suggests so, but what harm does the list do?
# sap['subbands'] = []
# merge pipelines that can be run in a single pipeline, f.e. to combine CS+IS into a single pipeline
subtask_spec['COBALT']['beamformer']['tab_pipelines'] = _compress_subtask_beamformer_pipelines(subtask_spec['COBALT']['beamformer']['tab_pipelines'])
# if we have more than 1 pipeline, we need COBALT 2+
if (len(subtask_spec['COBALT']['beamformer']['tab_pipelines']) + len(subtask_spec['COBALT']['beamformer']['flyseye_pipelines'])) > 1:
min_COBALT_version = max(min_COBALT_version, 2)
if target_task_spec and ('target' in task_blueprint.specifications_template.name.lower() or 'beamforming' in task_blueprint.specifications_template.name.lower()):
# copy/convert the analoge/digital_pointings only for non-calibrator observations (the calibrator has its own pointing)
# for parallel calibrator&target observations the calibrator pointing is added later as a final sap.
for sap in target_task_spec.get("SAPs", []):
subtask_spec['stations']['digital_pointings'].append(
{"name": sap["name"],
"target": sap["target"],
"pointing": {"direction_type": sap["digital_pointing"]["direction_type"],
"angle1": sap["digital_pointing"]["angle1"],
"angle2": sap["digital_pointing"]["angle2"]},
"subbands": sap["subbands"]
})
if "tile_beam" in target_task_spec:
subtask_spec['stations']['analog_pointing'] = { "direction_type": target_task_spec["tile_beam"]["direction_type"],
"angle1": target_task_spec["tile_beam"]["angle1"],
"angle2": target_task_spec["tile_beam"]["angle2"] }
# The calibrator has a minimal calibration-specific specification subset.
    # The rest of its specs are 'shared' with the target observation.
# So... copy the calibrator specs first, then loop over the shared target/calibrator specs...
if calibrator_task_spec and 'calibrator' in task_blueprint.specifications_template.name.lower():
# Calibrator requires related Target Task Observation for some specifications
target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint)
if target_task_blueprint is None:
raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (task_blueprint.id, task_blueprint.specifications_template.name))
if calibrator_task_spec.get('autoselect', True):
logger.info("auto-selecting calibrator target based on elevation of target observation...")
# Get related Target Observation Task
if "tile_beam" in target_task_spec:
subtask_spec['stations']['analog_pointing'] = {
"direction_type": target_task_spec["tile_beam"]["direction_type"],
"angle1": target_task_spec["tile_beam"]["angle1"],
"angle2": target_task_spec["tile_beam"]["angle2"]}
else:
raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint "
"id=%s in auto-select mode, because the related target observation "
"task_blueprint id=%s has no tile beam pointing defined" % (
task_blueprint.id, target_task_blueprint.id))
else:
subtask_spec['stations']['analog_pointing'] = {"direction_type": calibrator_task_spec["pointing"]["direction_type"],
"angle1": calibrator_task_spec["pointing"]["angle1"],
"angle2": calibrator_task_spec["pointing"]["angle2"]}
        # for the calibrator, the subbands are the union of the subbands of the target obs
subbands = []
for SAP in target_task_spec['SAPs']:
subbands.extend(SAP['subbands'])
subbands = sorted(list(set(subbands)))
# for a plain calibrator, the digital pointing is equal to the analog pointing
subtask_spec['stations']['digital_pointings'].append({'name': calibrator_task_spec['name'],
'subbands': subbands,
'pointing': subtask_spec['stations']['analog_pointing']})
    # At this moment of subtask creation we know which stations we *want* from the task_spec,
    # but we do not know yet which stations are available at the moment of observing.
    # So, we decided to set the subtask station_list to the union of all stations in all specified groups.
    # This way, the user can see which stations are likely to be used.
    # At the moment of scheduling of this subtask, the station_list is re-evaluated, and the max_nr_missing per group is validated.
subtask_spec['stations']['station_list'] = []
if target_task_spec and "station_groups" in target_task_spec:
for station_group in target_task_spec["station_groups"]:
subtask_spec['stations']['station_list'].extend(station_group["stations"])
# make list have unique items
subtask_spec['stations']['station_list'] = sorted(list(set(subtask_spec['stations']['station_list'])))
if not subtask_spec['stations']['station_list']:
raise SubtaskCreationException("Cannot create observation subtask specifications for task_blueprint id=%s. No stations are defined." % (task_blueprint.id,))
subtask_spec['stations']["antenna_set"] = target_task_spec["antenna_set"]
subtask_spec['stations']["filter"] = target_task_spec["filter"]
# Calculate block sizes and feed those to the spec
cobalt_calculator = BlockSize(constraints=cobalt_calculator_constraints)
subtask_spec["COBALT"]["blocksize"] = cobalt_calculator.blockSize
if "correlator" in target_task_spec:
subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = cobalt_calculator.nrBlocks
subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = cobalt_calculator.nrSubblocks
subtask_spec["COBALT"]["version"] = min_COBALT_version
subtask_spec["QA"]["inspection_plots"] = target_task_spec.get("QA", {"inspection_plots": "none"}).get("inspection_plots", "none")
# make sure that the subtask_spec is valid conform the schema
validate_json_against_schema(subtask_spec, subtask_template.schema)
return subtask_spec, subtask_template
def get_stations_in_group(station_group_name: str) -> []:
'''Get a list of station names in the given station_group.
A lookup is performed in the RADB, in the virtual instrument table'''
# TODO Make names RA and TMSS spec equal: 'NL' or 'DUTCH'?
if station_group_name == "DUTCH":
station_group_name = "NL"
    # International required is by definition DE601 or DE605; take DE601 for now
    # TODO check with RA the availability of both stations
if station_group_name == "INTERNATIONAL_REQUIRED":
return ["DE601"]
with RADBRPC.create() as radbrpc:
resource_group_memberships = radbrpc.getResourceGroupMemberships()['groups']
station_resource_group = next(rg for rg in resource_group_memberships.values()
if (rg['resource_group_type'] == 'station_group' or rg['resource_group_type'] == 'virtual') and rg['resource_group_name'] == station_group_name)
station_names = set(resource_group_memberships[rg_id]['resource_group_name'] for rg_id in station_resource_group['child_ids']
if resource_group_memberships[rg_id]['resource_group_type'] == 'station')
# HACK, RS408 should be removed from the RADB
if 'RS408' in station_names:
station_names.remove('RS408')
# HACK remove TEST1 from station list otherwise validate will fail
if 'TEST1' in station_names:
station_names.remove('TEST1')
return sorted(list(station_names))
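
# For example (illustrative; actual membership comes from the RADB virtual instrument table):
# get_stations_in_group("DUTCH") is mapped to the RADB group "NL" and returns the sorted names
# of the stations in that group, minus the hacked-out RS408 and TEST1.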
def get_related_calibrator_observation_task_blueprint(target_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement):
"""
get the related calibrator observation task_blueprint and the relative placement for the given target task_blueprint
if nothing found return None
"""
if 'target' not in target_task_blueprint.specifications_template.name.lower():
        raise ValueError("Cannot get a related calibrator observation task_blueprint for non-target task_blueprint id=%s template_name='%s'" %
                         (target_task_blueprint.id, target_task_blueprint.specifications_template.name))
return _get_related_observation_task_blueprint(target_task_blueprint, ['calibrator observation', 'parallel calibrator target observation'])
def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement):
"""
get the related target observation task_blueprint and the relative placement for the given calibrator or beamformer task_blueprint
if nothing found return None
"""
if 'calibrator' not in calibrator_or_beamformer_task_blueprint.specifications_template.name.lower() and \
'beamformer' not in calibrator_or_beamformer_task_blueprint.specifications_template.name.lower():
        raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator/beamformer task_blueprint id=%s template_name='%s'" %
                         (calibrator_or_beamformer_task_blueprint.id, calibrator_or_beamformer_task_blueprint.specifications_template.name))
return _get_related_observation_task_blueprint(calibrator_or_beamformer_task_blueprint, ['target observation', 'parallel calibrator target observation'])
def _get_related_observation_task_blueprint(task_blueprint: TaskBlueprint, related_template_name: typing.Union[str, list, tuple]) -> (TaskBlueprint, SchedulingRelationPlacement):
related_template_names = [related_template_name] if isinstance(related_template_name, str) else related_template_name
try:
return next((relation.second, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(first=task_blueprint).all()
if relation.second is not None and relation.second.specifications_template.name.lower() in related_template_names)
except StopIteration:
try:
return next((relation.first, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(second=task_blueprint).all()
if relation.first is not None and relation.first.specifications_template.name.lower() in related_template_names)
except StopIteration:
if task_blueprint.specifications_template.name.lower() in related_template_names:
# the 'related' task blueprint we are looking for is itself.
return task_blueprint, None
else:
logger.debug("No related %s task_blueprint found for task_blueprint id=%d", related_template_names, task_blueprint.id)
return None, None
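
# For example: if a TaskSchedulingRelationBlueprint links a target task ('first') to a
# calibrator task ('second'), calling this with the target task and name 'calibrator observation'
# returns (calibrator_task_blueprint, relation.placement); a 'parallel calibrator target
# observation' task is considered its own related task and is returned with placement None.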
def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
"""
    Create an observation control subtask.
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
"""
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
# step 0a: check specification. Json should be valid according to schema, but needs some additional sanity checks
specifications_doc, subtask_template = create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint)
# sanity check: total number of subbands should not exceed 488
all_subbands = set(sum([dp['subbands'] for dp in specifications_doc['stations']['digital_pointings']], []))
if len(all_subbands) > 488:
raise SubtaskCreationException("Total number of subbands %d exceeds the maximum of 488 for task_blueprint id=%s" % (len(all_subbands), task_blueprint.id))
# step 1: create subtask in defining state
cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"specifications_doc": specifications_doc,
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"tags": [],
"primary": True,
"cluster": Cluster.objects.get(name=cluster_name)
}
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input/output
# an observation has no input, it just produces output data
subtask_output = SubtaskOutput.objects.create(subtask=subtask)
# step 3: set state to DEFINED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.save()
return subtask
def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
observation_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
if not observation_subtasks:
raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % (
SubtaskType.Choices.QA_FILES.value, task_blueprint.pk))
observation_subtask = observation_subtasks[-1] # TODO: decide what to do when there are multiple observation subtasks?
return create_qafile_subtask_from_observation_subtask(observation_subtask)
def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) -> Subtask:
''' Create a subtask to convert the observation output to a QA h5 file.
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(observation_subtask.task_blueprint)
if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
SubtaskType.Choices.QA_FILES.value, observation_subtask.pk,
observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value))
obs_task_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask)
obs_task_qafile_spec = obs_task_spec.get("QA", {}).get("file_conversion", {})
if not obs_task_qafile_spec.get("enabled", False):
logger.debug("Skipping creation of qafile_subtask because QA.file_conversion is not enabled")
return None
# step 1: create subtask in defining state, with filled-in subtask_template
qafile_subtask_template = SubtaskTemplate.objects.get(name="QA file conversion")
qafile_subtask_spec = qafile_subtask_template.get_default_json_document_for_schema()
qafile_subtask_spec['nr_of_subbands'] = obs_task_qafile_spec.get("nr_of_subbands")
qafile_subtask_spec['nr_of_timestamps'] = obs_task_qafile_spec.get("nr_of_timestamps")
validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema)
qafile_subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": observation_subtask.task_blueprint,
"specifications_template": qafile_subtask_template,
"specifications_doc": qafile_subtask_spec,
"primary": False,
"cluster": observation_subtask.cluster}
qafile_subtask = Subtask.objects.create(**qafile_subtask_data)
# step 2: create and link subtask input/output
selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
selection_doc = selection_template.get_default_json_document_for_schema()
for obs_out in observation_subtask.outputs.all():
qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask,
producer=obs_out, # TODO: determine proper producer based on spec in task_relation_blueprint
selection_doc=selection_doc,
selection_template=selection_template)
qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask)
# step 3: set state to DEFINED
qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
qafile_subtask.save()
# done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qa_file_subtask
return qafile_subtask
def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
if 'calibrator' in task_blueprint.specifications_template.name.lower():
# Calibrator requires related Target Task Observation for some specifications
target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint)
if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot retrieve specifications for task id=%d because no related target observation is found" % task_blueprint.pk)
else:
target_task_blueprint = task_blueprint
if not target_task_blueprint.specifications_doc.get("QA", {}).get("file_conversion", {}).get("enabled", False):
logger.debug("Skipping creation of qaplots_subtask because QA.file_conversion is not enabled")
return None
qafile_subtasks = [st for st in task_blueprint.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value]
if qafile_subtasks:
qafile_subtask = qafile_subtasks[0] # TODO: decide what to do when there are multiple qafile subtasks?
return create_qaplots_subtask_from_qafile_subtask(qafile_subtask)
else:
raise SubtaskCreationException('Cannot create QA plotting subtask for task id=%s because no predecessor QA file conversion subtask exists.' % (task_blueprint.pk, ))
def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subtask:
''' Create a subtask to create inspection plots from the QA h5 file.
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint)
if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk,
qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))
obs_task_spec = get_observation_task_specification_with_check_for_calibrator(qafile_subtask)
obs_task_qaplots_spec = obs_task_spec.get("QA", {}).get("plots", {})
if not obs_task_qaplots_spec.get("enabled", False):
logger.debug("Skipping creation of qaplots_subtask because QA.plots is not enabled")
return None
# step 1: create subtask in defining state, with filled-in subtask_template
qaplots_subtask_template = SubtaskTemplate.objects.get(name="QA plots")
qaplots_subtask_spec_doc = qaplots_subtask_template.get_default_json_document_for_schema()
qaplots_subtask_spec_doc['autocorrelation'] = obs_task_qaplots_spec.get("autocorrelation")
qaplots_subtask_spec_doc['crosscorrelation'] = obs_task_qaplots_spec.get("crosscorrelation")
validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema)
qaplots_subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": qafile_subtask.task_blueprint,
"specifications_template": qaplots_subtask_template,
"specifications_doc": qaplots_subtask_spec_doc,
"primary": False,
"cluster": qafile_subtask.cluster}
qaplots_subtask = Subtask.objects.create(**qaplots_subtask_data)
# step 2: create and link subtask input/output
selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
selection_doc = selection_template.get_default_json_document_for_schema()
qaplots_subtask_input = SubtaskInput.objects.create(subtask=qaplots_subtask,
producer=qafile_subtask.outputs.first(),
selection_doc=selection_doc,
selection_template=selection_template)
qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask)
# step 3: set state to DEFINED
qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
qaplots_subtask.save()
# done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qaplots_subtask
return qaplots_subtask
def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, subtask_template_name: str, generate_subtask_specs_from_task_spec_func) -> Subtask:
    ''' Create a pipeline subtask (e.g. a preprocessing or pulsar pipeline) for the given task_blueprint.
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
    # TODO: implement a more elegant lookup of the predecessor observation task
# TODO: do not require the input to come from an observation
observation_predecessor_tasks = [t for t in task_blueprint.predecessors.all() if any(st for st in t.subtasks.all()
if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)]
if not observation_predecessor_tasks:
raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected "
"to an observation predecessor (sub)task." % task_blueprint.pk)
# step 1: create subtask in defining state, with filled-in subtask_template
subtask_template = SubtaskTemplate.objects.get(name=subtask_template_name)
default_subtask_specs = subtask_template.get_default_json_document_for_schema()
task_specs_with_defaults = task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc)
subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs)
cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
subtask_data = { "scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"specifications_doc": subtask_specs,
"primary": True,
"cluster": Cluster.objects.get(name=cluster_name) }
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input/output
for task_relation_blueprint in task_blueprint.produced_by.all():
producing_task_blueprint = task_relation_blueprint.producer
# create inputs for all predecessor observation subtask outputs that belong to the producer task of this task relation
# TODO: more filtering needed?
predecessor_observation_subtasks = [st for st in producing_task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
for predecessor_obs_subtask in predecessor_observation_subtasks:
for predecessor_subtask_output in predecessor_obs_subtask.outputs.all():
subtask_input = SubtaskInput.objects.create(subtask=subtask,
producer=predecessor_subtask_output,
selection_doc=task_relation_blueprint.selection_doc,
selection_template=task_relation_blueprint.selection_template)
subtask_output = SubtaskOutput.objects.create(subtask=subtask)
# step 3: set state to DEFINED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.save()
    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this pipeline subtask
return subtask
def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
return create_pipeline_subtask_from_task_blueprint(task_blueprint, "preprocessing pipeline", _generate_subtask_specs_from_preprocessing_task_specs)
def create_pulsar_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
return create_pipeline_subtask_from_task_blueprint(task_blueprint, "pulsar pipeline", _generate_subtask_specs_from_pulsar_pipeline_task_specs)
def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create a subtask for an ingest job.
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
# step 1: create subtask in defining state, with filled-in subtask_template
subtask_template = SubtaskTemplate.objects.get(name='ingest control')
default_subtask_specs = subtask_template.get_default_json_document_for_schema()
subtask_specs = default_subtask_specs # todo: translate specs from task to subtask once we have non-empty templates
cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
subtask_data = {"scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"specifications_doc": subtask_specs,
"primary": True,
"cluster": Cluster.objects.get(name=cluster_name)}
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input
for task_relation_blueprint in task_blueprint.produced_by.all():
producing_task_blueprint = task_relation_blueprint.producer
predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()]
for predecessor_subtask in predecessor_subtasks:
for predecessor_subtask_output in predecessor_subtask.outputs.all():
SubtaskInput.objects.create(subtask=subtask,
producer=predecessor_subtask_output,
selection_doc=task_relation_blueprint.selection_doc,
selection_template=task_relation_blueprint.selection_template)
# step 3: set state to DEFINED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.save()
# done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this ingest
return subtask
def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
''' Create a subtask for a cleanup job
This method implements "Instantiate subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_subtask_creation(task_blueprint)
# step 1: create subtask in defining state, with filled-in subtask_template
subtask_template = SubtaskTemplate.objects.get(name='cleanup')
subtask_specs = subtask_template.get_default_json_document_for_schema()
try:
cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
except KeyError:
cluster_name = "CEP4"
subtask_data = {"scheduled_on_sky_start_time": None,
"scheduled_on_sky_stop_time": None,
"state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
"task_blueprint": task_blueprint,
"specifications_template": subtask_template,
"specifications_doc": subtask_specs,
"primary": True,
"cluster": Cluster.objects.get(name=cluster_name)}
subtask = Subtask.objects.create(**subtask_data)
# step 2: create and link subtask input
# for this cleanup subtask an 'input' seems a bit weird, but it actually makes sense!
# this cleanup subtask will cleanup the output data of all linked input predecessors.
for task_relation_blueprint in task_blueprint.produced_by.all():
producing_task_blueprint = task_relation_blueprint.producer
predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()]
for predecessor_subtask in predecessor_subtasks:
for predecessor_subtask_output in predecessor_subtask.outputs.all():
SubtaskInput.objects.create(subtask=subtask,
producer=predecessor_subtask_output,
selection_doc=task_relation_blueprint.selection_doc,
selection_template=task_relation_blueprint.selection_template)
# step 3: set state to DEFINED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.save()
    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this cleanup
return subtask
# ==== various schedule* methods to schedule a Subtasks (if possible) ====
def schedule_subtask(subtask: Subtask) -> Subtask:
    '''Generic scheduling method for subtasks. Calls the appropriate scheduling method based on the subtask's type.'''
check_prerequities_for_scheduling(subtask)
if (subtask.scheduled_on_sky_start_time is None or subtask.scheduled_on_sky_start_time < datetime.utcnow()) and subtask.predecessors.count() > 0:
# this is a successor task that can start now. Auto assign nice start_time just a bit in the future.
subtask.scheduled_on_sky_start_time = round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))
subtask.save()
try:
if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
return schedule_pipeline_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
return schedule_observation_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value:
return schedule_qafile_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value:
return schedule_qaplots_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value:
return schedule_ingest_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.CLEANUP.value:
return schedule_cleanup_subtask(subtask)
if subtask.specifications_template.type.value == SubtaskType.Choices.COPY.value:
return schedule_copy_subtask(subtask)
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
(subtask.pk, subtask.specifications_template.type.value))
except Exception as e:
try:
logger.exception(e)
if isinstance(e, SubtaskSchedulingSpecificationException):
# set the subtask to state 'UNSCHEDULABLE' in case of a specification exception
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULABLE.value)
subtask.save()
else:
# set the subtask to state 'ERROR'.
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
subtask.error_reason = f'{e}'
subtask.save()
except Exception as e2:
logger.error(e2)
finally:
# ... and re-raise the original exception (wrapped)
raise SubtaskSchedulingException("Error while scheduling subtask id=%d : %s" % (subtask.pk, e)) from e
def unschedule_subtask(subtask: Subtask, post_state: SubtaskState=None) -> Subtask:
'''unschedule the given subtask, removing all output dataproducts,
and setting its state afterwards to the post_state (which is 'defined' if None given).'''
if subtask.state.value != SubtaskState.Choices.SCHEDULED.value:
raise SubtaskSchedulingException("Cannot unschedule subtask id=%d because it is not SCHEDULED. Current state=%s" % (subtask.pk, subtask.state.value))
try:
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULING.value)
subtask.save()
for output in subtask.outputs.all():
            # delete all output transforms, and then the dataproducts themselves
DataproductTransform.objects.filter(output__in=output.dataproducts.all()).all().delete()
output.dataproducts.all().delete()
if subtask.specifications_template.type.value in (SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value):
assign_or_unassign_resources(subtask)
if post_state is None:
post_state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
subtask.state = post_state
subtask.save()
except Exception as e:
try:
            # set the subtask to state 'UNSCHEDULABLE'...
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULABLE.value)
subtask.save()
except Exception as e2:
logger.error(e2)
finally:
# ... and re-raise the original exception
raise
def unschedule_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint):
'''Convenience method: Unschedule (and return) all scheduled subtasks in the task_blueprint'''
scheduled_subtasks = list(task_blueprint.subtasks.filter(state__value=SubtaskState.Choices.SCHEDULED.value).all())
for subtask in scheduled_subtasks:
unschedule_subtask(subtask)
def schedule_subtask_and_update_successor_start_times(subtask: Subtask) -> Subtask:
scheduled_subtask = schedule_subtask(subtask)
shift_successors_until_after_stop_time(scheduled_subtask)
return scheduled_subtask
def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, start_time: datetime):
for task_blueprint in scheduling_unit.task_blueprints.all():
        defined_independent_subtasks = task_blueprint.subtasks.filter(state__value='defined').filter(inputs=None).all()
        for subtask in defined_independent_subtasks:
update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + subtask.task_blueprint.relative_start_time)
def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime):
subtask.scheduled_on_sky_start_time = start_time
subtask.scheduled_on_sky_stop_time = subtask.scheduled_on_sky_start_time + subtask.specified_duration
subtask.save()
shift_successors_until_after_stop_time(subtask)
def shift_successors_until_after_stop_time(subtask: Subtask):
for successor in subtask.successors:
        # by default, let the successor directly follow this task...
successor_start_time = subtask.scheduled_on_sky_stop_time
# ... but adjust it if there is a scheduling_relation with an offset.
# so, check if these successive subtasks have different task_blueprint parents
# Note: subtasks either have the same parent task(s) or different ones, no partial overlap.
        # we now need to look up all combinations between subtask and successor blueprints
        # to find if there's a relation with a time offset between the tasks...
time_offsets = []
if subtask.task_blueprint.id != successor.task_blueprint.id:
relations = (TaskSchedulingRelationBlueprint.objects.filter(first=subtask.task_blueprint, second=successor.task_blueprint) |
TaskSchedulingRelationBlueprint.objects.filter(first=successor.task_blueprint, second=subtask.task_blueprint)).all()
if relations:
# there should be only one scheduling relation between the tasks
time_offsets += [relations[0].time_offset]
if len(time_offsets) > 0:
successor_start_time += timedelta(seconds=max(time_offsets))
        # update the start time and recurse to shift the successor's successors as well
update_start_time_and_shift_successors_until_after_stop_time(successor, successor_start_time)
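
# For example (hypothetical times): if a subtask is scheduled to stop at 12:00:00 and a
# TaskSchedulingRelationBlueprint between its task and a successor's task carries
# time_offset=60, that successor's start time is shifted to 12:01:00; the shift then
# recurses into the successor's own successors.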
def clear_defined_subtasks_start_stop_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint):
'''set start/stop times of all the subtasks in the scheduling unit to None'''
for task_blueprint in scheduling_unit.task_blueprints.all():
defined_subtasks = task_blueprint.subtasks.filter(state__value='defined').all()
for subtask in defined_subtasks:
subtask.scheduled_on_sky_start_time = None
subtask.scheduled_on_sky_stop_time = None
subtask.save()
def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
if subtask.state.value != SubtaskState.Choices.DEFINED.value:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value))
if subtask.obsolete_since is not None:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is marked as obsolete since %s" % (subtask.pk, subtask.obsolete_since))
for predecessor in subtask.predecessors.all():
if predecessor.state.value != SubtaskState.Choices.FINISHED.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s is not FINISHED but state=%s"
                                             % (subtask.pk, predecessor.pk, predecessor.state.value))
return True
def check_prerequities_for_cancelling(subtask: Subtask) -> bool:
if not SubtaskAllowedStateTransitions.objects.filter(old_state=subtask.state, new_state__value=SubtaskState.Choices.CANCELLING.value).exists():
        # this check and exception is on top of the database trigger function which blocks any illegal transition.
        # It's here to signal the intent that we do not allow cancelling from just any random state.
raise SubtaskCancellingException("Cannot cancel subtask id=%d because it currently has state=%s" % (subtask.pk, subtask.state.value))
return True
def _create_ra_specification(_subtask):
    # Should we do something with the station list? For 'detecting' conflicts it can be empty.
parset_dict = convert_to_parset_dict(_subtask)
return { 'tmss_id': _subtask.id,
'task_type': _subtask.specifications_template.type.value.lower(),
'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
'status': 'prescheduled' if _subtask.state.value == SubtaskState.Choices.SCHEDULING.value else 'approved',
'starttime': _subtask.scheduled_on_sky_start_time,
'endtime': _subtask.scheduled_on_sky_stop_time,
'cluster': _subtask.cluster.name,
'station_requirements': [],
'specification': parset_dict }
def assign_or_unassign_resources(subtask: Subtask):
"""
Assign/unassign resources for subtasks. If resources are not available or they do not meet requirements,
a SubtaskSchedulingException is raised.
:param subtask:
"""
if subtask.specifications_template.type.value not in (SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value):
raise SubtaskSchedulingException("Cannot assign/unassign resources for subtask id=%d because it is not an observation/pipeline. type=%s" % (subtask.pk, subtask.specifications_template.type.value))
if subtask.state.value not in (SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.UNSCHEDULING.value):
raise SubtaskSchedulingException("Cannot assign/unassign resources for subtask id=%d because it is not in SCHEDULING state. "
"Current state=%s" % (subtask.pk, subtask.state.value))
ra_spec = _create_ra_specification(subtask)
ra_spec['predecessors'] = []
for pred in subtask.predecessors.all():
try:
ra_spec['predecessors'].append(_create_ra_specification(pred))
except Exception as e:
# don't fail the whole assignment if a predecessor's spec cannot be created; just log it
logger.warning("Could not create RA specification for predecessor id=%s of subtask id=%s: %s", pred.id, subtask.id, e)
# Reason about stations only for observations with a station list
if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value and \
"stations" in subtask.specifications_doc and "station_list" in subtask.specifications_doc["stations"]:
_do_assignment_for_observations_with_required_station_check(subtask, ra_spec)
else:
with RARPC.create() as rarpc:
try:
rarpc.do_assignment(ra_spec)
except ScheduleException as e:
raise SubtaskSchedulingException("Cannot schedule/unschedule subtask id=%d. The required resources are not (fully) available." % subtask.pk) from e
def _do_assignment_for_observations_with_required_station_check(subtask: Subtask, ra_spec) -> bool:
"""
Try to detect conflicts and re-assign if possible.
:param subtask:
:param ra_spec:
"""
assigned = False
with RARPC.create() as rarpc:
# Try to re-assign till it succeeds. If the requirements are not met, an exception will be raised.
while not assigned:
try:
assigned = rarpc.do_assignment(ra_spec)
except ScheduleException as e:
logger.exception(e)
if not assigned:
logger.info("Conflicts in assignment detected, checking stations in conflict and re-assign if possible")
lst_stations_in_conflict = get_stations_in_conflict(subtask.id)
lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict)
ra_spec = update_specification(ra_spec, lst_stations)
return assigned
def get_stations_in_conflict(subtask_id):
"""
Retrieve a list of station names which RADB 'marked' as resources in conflict after the last resource assignment
:param subtask_id: The subtask id
:return: lst_stations_in_conflict List of station names (string) which are in conflict
"""
lst_stations_in_conflict = []
with RADBRPC.create() as radbrpc:
task_id = radbrpc.getTask(tmss_id=subtask_id)['id']
conflict_claims = radbrpc.getResourceClaims(task_ids=[task_id], status="conflict", extended=True)
# conflict_claims are resources which are in conflict. Determine the resource names in conflict,
# for example ['CS001rcu', 'CS001chan0', 'CS001bw0', 'CS001chan1', 'CS001bw1']
resource_names_in_conflict = []
for resc in conflict_claims:
# cross check on status in conflict
if resc["status"] == "conflict":
resource_names_in_conflict.append(resc["resource_name"])
logger.info("Resource names with conflict %s" % resource_names_in_conflict)
# Now get the parent_id of each resource in conflict. Every parent_id whose resource_group_type is
# 'station' gives us the name of a station in conflict, which is what we need
resource_group_memberships = radbrpc.getResourceGroupMemberships()
parent_ids = []
for resc in resource_group_memberships["resources"].values():
if resc["resource_name"] in resource_names_in_conflict:
parent_ids.extend(resc['parent_group_ids'])
logger.info("Parent group ids with conflict %s" % parent_ids)
for parent_id in list(set(parent_ids)):
resc_group_item = resource_group_memberships["groups"][parent_id]
if resc_group_item["resource_group_type"] == "station":
lst_stations_in_conflict.append(resc_group_item["resource_group_name"])
logger.info("Stations in conflict %s", lst_stations_in_conflict)
return lst_stations_in_conflict
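# Illustrative example (hypothetical RADB content): a conflict claim on resource 'CS001bw0'
# has a parent group 'CS001' with resource_group_type 'station', so 'CS001' would end up
# in the returned list of stations in conflict.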
def determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict):
"""
Determine which stations can be assigned when station conflicts have occurred.
Stations in conflict should be removed.
Use the max_nr_missing from the task specifications and the conflicting station list to create a station list
which should be possible to assign. If the number of missing stations in a station group exceeds max_nr_missing,
a new assignment is not possible, so a SubtaskSchedulingException is raised with context.
:param subtask:
:param lst_stations_in_conflict:
:return: lst_stations: list of stations which can be assigned
"""
# Get the station list from specification and remove the conflict stations
lst_specified_stations = subtask.specifications_doc["stations"]["station_list"]
lst_stations = list(set(lst_specified_stations) - set(lst_stations_in_conflict))
logger.info("Determine stations which can be assigned %s" % lst_stations)
# Check whether, after removing the conflicting stations, the max_nr_missing requirement per station_group
# is still fulfilled. If so we are done, otherwise raise an exception
stations_groups = get_station_groups(subtask)
for sg in stations_groups:
nbr_missing = len(set(sg["stations"]) & set(lst_stations_in_conflict))
if nbr_missing > sg["max_nr_missing"]:
raise SubtaskSchedulingException("There are more stations in conflict than the specification is given "
"(%d is larger than %d). The stations that are in conflict are '%s'."
"Please check station of subtask %d " %
(nbr_missing, sg["max_nr_missing"], lst_stations_in_conflict, subtask.pk))
return lst_stations
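# Illustrative example: with specified stations ['CS001', 'CS002', 'CS003'] and 'CS002' in conflict,
# this returns ['CS001', 'CS003'], provided every station group containing 'CS002' allows at least
# one missing station (max_nr_missing >= 1); otherwise a SubtaskSchedulingException is raised.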
def get_station_groups(subtask):
"""
Retrieve the station_groups specification of the given subtask.
For a calibrator, it needs to be retrieved from the related Target Observation task.
Note: the list can be empty (in some test cases), which results in no max_nr_missing check.
:param subtask:
:return: station_groups: a list of dicts { stations, max_nr_missing }
"""
station_groups = []
if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower():
# Calibrator requires related Target Task Observation for some specifications
target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint)
if target_task_blueprint is None:
raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" %
(subtask.task_blueprint.id, subtask.id))
if "station_groups" in target_task_blueprint.specifications_doc.keys():
station_groups = target_task_blueprint.specifications_doc["station_groups"]
else:
if "station_groups" in subtask.task_blueprint.specifications_doc.keys():
station_groups = subtask.task_blueprint.specifications_doc["station_groups"]
return station_groups
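# Illustrative shape of the returned station_groups (hypothetical values):
# [ { "stations": ["CS001", "CS002", "RS106"], "max_nr_missing": 1 },
#   { "stations": ["DE601", "DE602"], "max_nr_missing": 0 } ]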
def update_specification(ra_spec, lst_stations):
"""
Update the RA Specification dictionary with the correct list of stations
:param ra_spec: Dictionary of the RA specification
:param lst_stations: List of stations to 'assign'
:return: Dictionary with updated RA specification
"""
if len(lst_stations) == 0:
raise SubtaskSchedulingSpecificationException("Cannot re-assign resources after conflict for subtask id=%d "
"because there are no stations left to assign. " % ra_spec["tmss_id"])
updated_ra_spec = ra_spec
updated_ra_spec["specification"]["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in lst_stations)
# ?? should the station_requirements also be updated, or can we just leave them empty '[]'? Assume for now they can be empty
return updated_ra_spec
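# Illustrative example: lst_stations=['CS001', 'CS002'] yields
# updated_ra_spec["specification"]["Observation.VirtualInstrument.stationList"] == "[CS001,CS002]"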
def schedule_qafile_subtask(qafile_subtask: Subtask):
''' Schedule the given qafile_subtask (which converts the observation output to a QA h5 file)
This method should typically be called upon the event of the observation_subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(qafile_subtask)
if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk,
qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))
if qafile_subtask.inputs.count() != 1:
raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qafile_subtask.id, qafile_subtask.inputs.count()))
# step 1: set state to SCHEDULING
qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
qafile_subtask.save()
# step 2: link input dataproducts
qa_input = qafile_subtask.inputs.first()
qa_input.dataproducts.set(qa_input.producer.dataproducts.all())
# step 3: resource assigner
# is a no-op for QA
# step 4: create output dataproducts, and link these to the output
# TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
if qafile_subtask.outputs.first():
dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty")
dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ),
directory="/data/qa/qa_files",
dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value),
datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct?
producer=qafile_subtask.outputs.first(),
specifications_doc=dp_spec_template.get_default_json_document_for_schema(),
specifications_template=dp_spec_template,
feedback_doc=dp_feedback_template.get_default_json_document_for_schema(),
feedback_template=dp_feedback_template,
sap=None # todo: do we need to point to a SAP here? Of which dataproduct then?
)
qafile_subtask_dataproduct.save()
# step 5: set state to SCHEDULED (resulting in the qaservice picking this subtask up and running it)
qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
qafile_subtask.save()
return qafile_subtask
def schedule_qaplots_subtask(qaplots_subtask: Subtask):
''' Schedule the given qaplots_subtask (which creates inspection plots from a QA h5 file)
This method should typically be called upon the event of the qafile_subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(qaplots_subtask)
if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk,
qaplots_subtask.specifications_template.type,
SubtaskType.Choices.QA_PLOTS.value))
if qaplots_subtask.inputs.count() != 1:
raise SubtaskSchedulingSpecificationException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, qaplots_subtask.inputs.count()))
# step 1: set state to SCHEDULING
qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
qaplots_subtask.save()
# step 2: link input dataproducts
# this should typically be a single input with a single dataproduct (the qa h5 file)
qa_input = qaplots_subtask.inputs.first()
qa_input.dataproducts.set(qa_input.producer.dataproducts.all())
# step 3: resource assigner
# is a no-op for QA
# step 4: create output dataproducts, and link these to the output
# TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
qafile_subtask = qaplots_subtask.predecessors.first()
obs_subtask = qafile_subtask.predecessors.first()
dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty")
dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
qaplots_subtask_dataproduct = Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ),
dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value),
datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value), # todo: is this correct?
producer=qaplots_subtask.outputs.first(),
specifications_doc=dp_spec_template.get_default_json_document_for_schema(),
specifications_template=dp_spec_template,
feedback_doc=dp_feedback_template.get_default_json_document_for_schema(),
feedback_template=dp_feedback_template,
sap=None # todo: do we need to point to a SAP here? Of which dataproduct then?
)
qaplots_subtask_dataproduct.save()
# step 5: set state to SCHEDULED (resulting in the qaservice picking this subtask up and running it)
qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
qaplots_subtask.save()
return qaplots_subtask
# todo: this can probably go when we switch to the new start time calculation in the model properties (which is based on this logic)
def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
"""
Retrieve the previous related task blueprint object (if any).
If nothing is found, return None, 0.
:param task_blueprint:
:return: previous_related_task_blueprint,
time_offset (in seconds)
"""
logger.info("get_previous_related_task_blueprint_with_time_offset %s (id=%s)", task_blueprint.name, task_blueprint.pk)
previous_related_task_blueprint = None
time_offset = 0
scheduling_relations = list(task_blueprint.first_scheduling_relation.all()) + list(task_blueprint.second_scheduling_relation.all())
for scheduling_relation in scheduling_relations:
if scheduling_relation.first.id == task_blueprint.id and scheduling_relation.placement.value == "after":
previous_related_task_blueprint = TaskBlueprint.objects.get(id=scheduling_relation.second.id)
time_offset = scheduling_relation.time_offset
if scheduling_relation.second.id == task_blueprint.id and scheduling_relation.placement.value == "before":
previous_related_task_blueprint = TaskBlueprint.objects.get(id=scheduling_relation.first.id)
time_offset = scheduling_relation.time_offset
return previous_related_task_blueprint, time_offset
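# Illustrative example: given a scheduling relation (first=A, second=B, placement='after', time_offset=60),
# calling this for task A returns (B, 60), i.e. A should be placed 60 seconds after B.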
def _bulk_create_dataproducts_with_global_identifiers(dataproducts: list) -> list:
"""
Bulk create the provided dataproducts in the database, and give each of them a unique global identifier.
:return: the created dataproduct objects
"""
# Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
# This is needed as bulk_create needs to have any relations resolved.
dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
dp.global_identifier = global_identifier
return Dataproduct.objects.bulk_create(dataproducts)
def _output_root_directory(subtask: Subtask) -> str:
""" Return the directory under which output needs to be stored. """
# Support for several projects will be added in TMSS-689
directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects",
subtask.project.name,
subtask.id)
return directory
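# Illustrative example: on production this yields "/data/projects/<project_name>/L<subtask_id>",
# and on test systems "/data/test-projects/<project_name>/L<subtask_id>".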
def schedule_observation_subtask(observation_subtask: Subtask):
''' Schedule the given observation_subtask
For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(observation_subtask)
if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
observation_subtask.specifications_template.type,
SubtaskType.Choices.OBSERVATION.value))
# step 1: set state to SCHEDULING
observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
observation_subtask.save()
# step 1a: check start/stop times
# start time should be known. If not, raise. Then the user and/or scheduling service should supply a properly calculated/estimated start_time first.
if observation_subtask.scheduled_on_sky_start_time is None:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk,
observation_subtask.specifications_template.type))
if observation_subtask.specified_duration < timedelta(seconds=1):
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (observation_subtask.pk,
observation_subtask.specifications_template.type,
observation_subtask.specified_duration))
# always update the stop_time according to the spec
observation_subtask.scheduled_on_sky_stop_time = observation_subtask.scheduled_on_sky_start_time + observation_subtask.specified_duration
# step 2: define input dataproducts
# NOOP: observations take no inputs
# step 3: create output dataproducts, and link these to the output
dataproducts = []
specifications_doc = observation_subtask.specifications_doc
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP")
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
dataproduct_feedback_doc = dataproduct_feedback_template.get_default_json_document_for_schema()
# select correct output for each pointing based on name
subtask_output_dict = {}
output = observation_subtask.outputs.first()
if not output:
raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it is missing the output' % (observation_subtask.id,))
target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(observation_subtask.task_blueprint)
if target_task_spec and 'SAPs' in target_task_spec: # target
for sap in target_task_spec['SAPs']:
subtask_output_dict[sap['name']] = output
if calibrator_task_spec and 'pointing' in calibrator_task_spec: # calibrator
subtask_output_dict[calibrator_task_spec['name']] = output
# create SAP objects, as observations create new beams
antennaset = specifications_doc['stations']['antenna_set']
antennafields = []
for station in specifications_doc['stations']['station_list']:
fields = antennafields_for_antennaset_and_station(antennaset, station)
antennafields += [{"station": station, "field": field, "type": antennaset.split('_')[0]} for field in fields]
saps = [SAP.objects.create(specifications_doc={ "name": "L%s_SAP%03d_%s" % (observation_subtask.id, sap_nr, pointing['name']),
"pointing": pointing['pointing'],
"time": {"start_time": observation_subtask.scheduled_on_sky_start_time.isoformat()+'Z',
"duration": (observation_subtask.scheduled_on_sky_stop_time - observation_subtask.scheduled_on_sky_start_time).total_seconds()},
"antennas": {
"antenna_set": antennaset,
"fields": antennafields
}
},
specifications_template=SAPTemplate.objects.get(name="SAP")) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings'])]
# store everything below this directory
directory = _output_root_directory(observation_subtask)
# create correlated dataproducts
if specifications_doc['COBALT']['correlator']['enabled']:
dataformat = Dataformat.objects.get(value=Dataformat.Choices.MEASUREMENTSET.value)
datatype = Datatype.objects.get(value=Datatype.Choices.VISIBILITIES.value)
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities")
sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs
for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
if pointing['name'] in subtask_output_dict:
subtask_output = subtask_output_dict[pointing['name']]
else:
raise SubtaskSchedulingException('Cannot schedule subtask id=%s because the output for pointing name=%s cannot be determined.' % (observation_subtask.id, pointing['name']))
for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset):
dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
directory=os.path.join(directory, "uv"),
dataformat=dataformat,
datatype=datatype,
producer=subtask_output,
specifications_doc={"sap": pointing["name"], "subband": subband},
specifications_template=dataproduct_specifications_template,
feedback_doc=dataproduct_feedback_doc,
feedback_template=dataproduct_feedback_template,
size=0,
expected_size=0,
sap=saps[sap_nr],
global_identifier=None))
sb_nr_offset += len(pointing['subbands'])
# create beamformer dataproducts
dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="time series")
def _sap_index(saps: list, sap_name: str) -> int:
""" Return the SAP index in the observation given a certain SAP name. """
sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name]
# needs to be exactly one hit
if len(sap_indices) != 1:
raise SubtaskSchedulingException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps]))
return sap_indices[0]
def tab_dataproducts(sap_nr, pipeline_nr, tab_nr, stokes_settings, coherent):
nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_nr]['subbands'])
nr_stokes = len(stokes_settings['stokes'])
nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])
cobalt_version = specifications_doc['COBALT']['version']
return [Dataproduct(filename=f"L{observation_subtask.id}_SAP{sap_nr:03}_B{tab_nr:03}_S{stokes_nr}_P{part_nr:03}_bf.h5" if cobalt_version == 1 else
f"L{observation_subtask.id}_SAP{sap_nr:03}_N{pipeline_nr:03}_B{tab_nr:03}_S{stokes_nr:03}_P{part_nr:03}_bf.h5",
directory=directory+("/cs" if coherent else "/is"),
dataformat=Dataformat.objects.get(value="Beamformed"),
datatype=Datatype.objects.get(value="time series"),
producer=observation_subtask.outputs.first(), # todo: select correct output. I tried "subtask_output_dict[sap['name']]" but tests fail because the sap's name is not in the task blueprint. Maybe it's just test setup and this should work?
specifications_doc={
"sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"],
"coherent": coherent,
"stokes_set": stokes_settings["stokes"],
"identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}
},
specifications_template=dataproduct_specifications_template_timeseries,
feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
feedback_template=dataproduct_feedback_template,
size=0,
expected_size=1024*1024*1024*tab_nr,
sap=saps[sap_nr],
global_identifier=None)
for part_nr in range(nr_parts) for stokes_nr in range(nr_stokes)]
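# Illustrative filenames produced by tab_dataproducts (hypothetical ids):
#   COBALT version 1:      L123456_SAP000_B000_S0_P000_bf.h5
#   later COBALT versions: L123456_SAP000_N000_B000_S000_P000_bf.h5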
# beamformer pipelines: one set of dataproducts per TAB.
pipeline_nr_offset = 0
for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['tab_pipelines'], start=pipeline_nr_offset):
for sap in pipeline['SAPs']:
sap_idx = _sap_index(specifications_doc['stations']['digital_pointings'], sap['name'])
for tab_idx, tab in enumerate(sap['tabs']):
dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'] if tab['coherent'] else pipeline['incoherent'], tab['coherent'])
# fly's eye pipelines: one set of dataproducts per antenna field.
pipeline_nr_offset += len(specifications_doc['COBALT']['beamformer']['tab_pipelines'])
for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_nr_offset):
for sap_idx, sap in enumerate(specifications_doc['stations']['digital_pointings']):
stations = pipeline['stations'] or specifications_doc['stations']['station_list']
fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])
for tab_idx, tab in enumerate(fields):
dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'], True)
# create the dataproducts
_bulk_create_dataproducts_with_global_identifiers(dataproducts)
# step 4: resource assigner (if possible)
assign_or_unassign_resources(observation_subtask)
# TODO: TMSS-382: evaluate the scheduled stations and see if the requirements given in the subtask.task_blueprint.specifications_doc are met for the station_groups and max_nr_missing.
# step 5: set state to SCHEDULED (so that the responsible service picks this subtask up and runs it)
observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
# step 6: set scheduled_process_start/stop_time for MACScheduler
observation_subtask.scheduled_process_start_time = observation_subtask.scheduled_on_sky_start_time - timedelta(minutes=3)
observation_subtask.scheduled_process_stop_time = observation_subtask.scheduled_on_sky_stop_time + timedelta(minutes=1)
observation_subtask.save()
return observation_subtask
def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list):
# select subtask output the new dataproducts will be linked to
pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output?
# TODO: create them from the spec, instead of "copying" the input filename
dataformat = Dataformat.objects.get(value="MeasurementSet")
datatype = Datatype.objects.get(value="visibilities")
# TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty"
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities")
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
directory = os.path.join(_output_root_directory(pipeline_subtask), "uv")
# input:output mapping is 1:1
def output_dataproduct_filename(input_dp: Dataproduct) -> str:
""" Construct the output filename to produce for an input. """
if '_' in input_dp.filename and input_dp.filename.startswith('L'):
return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
else:
return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)
output_dataproducts = [Dataproduct(filename=output_dataproduct_filename(input_dp),
directory=directory,
dataformat=dataformat,
datatype=datatype,
producer=pipeline_subtask_output,
specifications_doc=input_dp.specifications_doc,
specifications_template=dataproduct_specifications_template,
feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
feedback_template=dataproduct_feedback_template,
sap=input_dp.sap,
global_identifier=None) for input_dp in input_dataproducts]
# create the dataproducts
output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts)
pipeline_subtask_output.dataproducts.set(output_dataproducts)
transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dataproducts, output_dataproducts)]
DataproductTransform.objects.bulk_create(transforms)
return output_dataproducts
def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list):
# select subtask output the new dataproducts will be linked to
pipeline_subtask_output = pipeline_subtask.outputs.first() # TODO: if we have several, how to map input to output?
dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
directory = os.path.join(_output_root_directory(pipeline_subtask), "pulp")
# ----- output tarball per input dataproduct
# we process per data type, as later on we produce summaries per data type
# different datatypes need different treatment
data_types = ["cs", "is", "cv"]
# sort input dataproducts by data type
input_dataproducts = {
"is": [input_dp for input_dp in input_dataproducts
if input_dp.specifications_doc["coherent"] == False],
"cs": [input_dp for input_dp in input_dataproducts
if input_dp.specifications_doc["coherent"] == True
and input_dp.specifications_doc["stokes_set"] != "XXYY"],
"cv": [input_dp for input_dp in input_dataproducts
if input_dp.specifications_doc["coherent"] == True
and input_dp.specifications_doc["stokes_set"] == "XXYY"],
}
# in which subdirectory the output will be written
output_subdir = {
"is": "is",
"cs": "cs",
"cv": "cs",
}
# suffix used in filename of summary outputs
summary_filename_suffix = {
"cs": "CS",
"is": "IS",
"cv": "CV"
}
for data_type in data_types:
# do not generate output for a data type that is not in the input
if not input_dataproducts[data_type]:
continue
dataformat = Dataformat.objects.get(value="pulp analysis")
datatype = Datatype.objects.get(value="pulsar profile")
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="time series")
# how the output is constructed from the input. we do this based on filenames: input dataproducts that result in the same filename
# are combined to produce the output with that filename.
#
# format: { output_dp_filename: {"output_dp": output_dp, "input_dps": [input_dps]} }
transformation_map = {}
for input_dp in input_dataproducts[data_type]:
# the filename doesn't contain the stokes number if there is only 1 stokes for this data type or when recording cv:
#
# type example filename input:output mapping
# CV pulp/cs/L808292_SAP000_B000_P008_bf.tar 4:1
# IS I pulp/is/L808290_SAP000_B001_P000_bf.tar 1:1
# IS IQUV pulp/is/L808290_SAP000_B001_S2_P000_bf.tar 1:1
# CS I pulp/cs/L808286_SAP000_B000_P000_bf.tar 1:1
# CS IQUV pulp/cs/L808288_SAP000_B000_S2_P000_bf.tar 1:1
output_filename = ("L{subtask_id}_SAP{sap_index:03}_B{tab_index:03}_S{stokes_index}_P{part_index:03}_bf.tar" if input_dp.specifications_doc["stokes_set"] == "IQUV" else
"L{subtask_id}_SAP{sap_index:03}_B{tab_index:03}_P{part_index:03}_bf.tar").format(
subtask_id=pipeline_subtask.id,
sap_index=input_dp.specifications_doc["identifiers"]["sap_index"],
tab_index=input_dp.specifications_doc["identifiers"]["tab_index"],
stokes_index=input_dp.specifications_doc["identifiers"]["stokes_index"],
part_index=input_dp.specifications_doc["identifiers"]["part_index"])
if output_filename in transformation_map:
# we already modelled this output. add this input_dp to the input list
#
# This happens with CV only, where we've generated the same output_filename for each stokes, yet have more than one stokes (Xre,Xim,Yre,Yim)
transformation_map[output_filename]["input_dps"].append(input_dp)
else:
# a new output to model
output_dp = Dataproduct(filename=output_filename,
directory=os.path.join(directory, output_subdir[data_type]),
dataformat=dataformat,
datatype=datatype,
producer=pipeline_subtask_output,
specifications_doc=input_dp.specifications_doc,
specifications_template=dataproduct_specifications_template,
feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
feedback_template=dataproduct_feedback_template,
sap=input_dp.sap,
global_identifier=None)
transformation_map[output_filename] = { "output_dp": output_dp, "input_dps": [input_dp] }
# we don't need the keys anymore; ditch them and work with a list, which is easier
transformations = transformation_map.values()
# list and create the dataproducts
output_dataproducts = [t["output_dp"] for t in transformations]
output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts)
pipeline_subtask_output.dataproducts.set(output_dataproducts)
# put the actually saved output_dataproducts back in, so we can refer to the actually created ones.
# by iterating in the same order that we requested to save them, we can now reiterate and replace 1:1
for nr, transform in enumerate(transformations):
transform["output_dp"] = output_dataproducts[nr]
# construct the transform objects
transformation_objects = [[DataproductTransform(input=input_dp, output=transform["output_dp"], identity=False) for input_dp in transform["input_dps"]] for transform in transformations]
# create the transforms (needs a flattened list)
DataproductTransform.objects.bulk_create(sum(transformation_objects,[]))
# ----- summary tarballs
# there is a tarball for cv, cs, and is separately.
# pulp only processes input from a single observation, so there is no possibility of needing separate
# summaries for input from different observations.
dataformat = Dataformat.objects.get(value="pulp summary")
datatype = Datatype.objects.get(value="quality")
dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="pulp summary")
# 1. create dataproducts
# type example filename
# CV pulp/cs/L808292_summaryCV.tar
# IS pulp/is/L808290_summaryIS.tar
# CS pulp/cs/L808288_summaryCS.tar
summary_dataproduct = Dataproduct(filename="L%s_summary%s.tar" % (pipeline_subtask.id, summary_filename_suffix[data_type]),
directory=os.path.join(directory, output_subdir[data_type]),
dataformat=dataformat,
datatype=datatype,
producer=pipeline_subtask_output,
specifications_doc={ "coherent": data_type != "is", "identifiers": { "data_type": data_type } },
specifications_template=dataproduct_specifications_template,
feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
feedback_template=dataproduct_feedback_template,
sap=None, # TODO: Can we say anything here, as summaries cover all SAPs?
global_identifier=None)
# create the dataproducts
summary_dataproduct = _bulk_create_dataproducts_with_global_identifiers([summary_dataproduct])[0]
pipeline_subtask_output.dataproducts.add(summary_dataproduct)
# 2. create transforms from the input
# populate the transform, each input_dp of this datatype is input for this summary
transforms = [DataproductTransform(input=input_dp, output=summary_dataproduct, identity=False) for input_dp in input_dataproducts[data_type]]
DataproductTransform.objects.bulk_create(transforms)
return None
def schedule_pipeline_subtask(pipeline_subtask: Subtask):
''' Schedule the given pipeline_subtask
This method should typically be called upon the event of a predecessor (observation) subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(pipeline_subtask)
if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk,
pipeline_subtask.specifications_template.type,
SubtaskType.Choices.PIPELINE.value))
# step 1: set state to SCHEDULING
pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
pipeline_subtask.save()
# step 1a: check start/stop times
# not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
if pipeline_subtask.scheduled_on_sky_start_time is None:
now = datetime.utcnow()
logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
pipeline_subtask.scheduled_on_sky_start_time = now
if pipeline_subtask.specified_duration < timedelta(seconds=1):
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (pipeline_subtask.pk,
pipeline_subtask.specifications_template.type,
pipeline_subtask.specified_duration))
# always update the stop_time according to the spec
pipeline_subtask.scheduled_on_sky_stop_time = pipeline_subtask.scheduled_on_sky_start_time + pipeline_subtask.specified_duration
# step 2: link input dataproducts
if pipeline_subtask.inputs.count() == 0:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk,
pipeline_subtask.specifications_template.type))
# iterate over all inputs
input_dataproducts = []
for pipeline_subtask_input in pipeline_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]
if len(dataproducts) == 0:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because input id=%s has no (filtered) dataproducts" % (pipeline_subtask.pk,
pipeline_subtask.specifications_template.type,
pipeline_subtask_input.id))
pipeline_subtask_input.dataproducts.set(dataproducts)
input_dataproducts.extend(dataproducts)
# step 3: create output dataproducts, and link these to the output
if pipeline_subtask.specifications_template.name == "preprocessing pipeline":
_create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)
elif pipeline_subtask.specifications_template.name == "pulsar pipeline":
_create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)
# step 4: resource assigner (if possible)
assign_or_unassign_resources(pipeline_subtask)
# step 5: set state to SCHEDULED (so that the responsible service picks this subtask up and runs it)
pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
pipeline_subtask.save()
return pipeline_subtask
def schedule_ingest_subtask(ingest_subtask: Subtask):
''' Schedule the given ingest_subtask
This method should typically be called upon the event of a predecessor (pipeline or observation) subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(ingest_subtask)
if ingest_subtask.specifications_template.type.value != SubtaskType.Choices.INGEST.value:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (ingest_subtask.pk,
ingest_subtask.specifications_template.type,
SubtaskType.Choices.INGEST.value))
# check permission pre-requisites
scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint # an ingest subtask belongs to a single task_blueprint, and thus to a single scheduling unit
if scheduling_unit_blueprint.ingest_permission_required:
if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow():
raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,))
# step 1: set state to SCHEDULING
ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
ingest_subtask.save()
# step 1a: set start/stop times
# not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled.
# please note that an ingest subtask may idle for some time while it is in the ingest queue.
# the actual start/stop times are set by the IngestTMSSAdapter when the subtask starts and stops.
ingest_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in ingest_subtask.predecessors] + [datetime.utcnow()])
ingest_subtask.scheduled_on_sky_stop_time = ingest_subtask.scheduled_on_sky_start_time + timedelta(hours=6)
# step 2: link input dataproducts
if ingest_subtask.inputs.count() == 0:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (ingest_subtask.pk,
ingest_subtask.specifications_template.type))
if not any(ingest_subtask_input.producer.dataproducts.exists() for ingest_subtask_input in ingest_subtask.inputs.all()):
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input dataproduct(s)" % (ingest_subtask.pk,
ingest_subtask.specifications_template.type))
# iterate over all inputs
for ingest_subtask_input in ingest_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
input_dataproducts = [dataproduct for dataproduct in ingest_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, ingest_subtask_input.selection_doc)]
ingest_subtask_input.dataproducts.set(input_dataproducts)
# define output and create output dataproducts.
ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask)
# prepare identifiers in bulk for each output_dataproduct
dp_gids = [SIPidentifier(source="TMSS") for _ in input_dataproducts]
SIPidentifier.objects.bulk_create(dp_gids)
output_dataproducts = [Dataproduct(filename=input_dp.filename, # overwritten later by ingest 'feedback'. Is determined at transfer time by the LTA.
directory="LTA", # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
dataformat=input_dp.dataformat,
datatype=input_dp.datatype,
specifications_doc=input_dp.specifications_doc,
specifications_template=input_dp.specifications_template,
producer=ingest_subtask_output,
size=None, # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
feedback_doc=input_dp.feedback_doc, # copy dp feedback from input dp. The ingest subtask does not alter the feedback/dataproducts.
feedback_template=input_dp.feedback_template,
sap=input_dp.sap,
global_identifier=dp_gid) for input_dp, dp_gid in zip(input_dataproducts, dp_gids)]
Dataproduct.objects.bulk_create(output_dataproducts)
# link each input to each corresponding output dataproduct. identity=True because this is "just a copy".
dataproduct_transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=True)
for input_dp, output_dp in zip(input_dataproducts, output_dataproducts)]
DataproductTransform.objects.bulk_create(dataproduct_transforms)
# skip step 4: ingest does not need to have resources assigned
# step 5: set state to SCHEDULED (so that the responsible service picks this subtask up and runs it)
ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
ingest_subtask.save()
return ingest_subtask
def schedule_cleanup_subtask(cleanup_subtask: Subtask):
''' Schedule the given cleanup_subtask
This method should typically be called upon the event of a predecessor (pipeline or observation) subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(cleanup_subtask)
if cleanup_subtask.specifications_template.type.value != SubtaskType.Choices.CLEANUP.value:
raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (cleanup_subtask.pk,
cleanup_subtask.specifications_template.type,
SubtaskType.Choices.CLEANUP.value))
# step 1: set state to SCHEDULING
cleanup_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
cleanup_subtask.save()
# step 1a: set start/stop times
# not very relevant for cleanup subtasks, but it's nice for the user to see when the cleanup task was scheduled.
# please note that a cleanup subtask may idle for some time while it is in the cleanup queue.
# the actual start/stop times are set when the subtask actually starts and stops.
cleanup_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in cleanup_subtask.predecessors] + [datetime.utcnow()])
cleanup_subtask.scheduled_on_sky_stop_time = cleanup_subtask.scheduled_on_sky_start_time + timedelta(hours=1)
# step 2: link input dataproducts
if cleanup_subtask.inputs.count() == 0:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (cleanup_subtask.pk,
cleanup_subtask.specifications_template.type))
# iterate over all inputs
for cleanup_subtask_input in cleanup_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
input_dataproducts = [dataproduct for dataproduct in cleanup_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, cleanup_subtask_input.selection_doc)]
cleanup_subtask_input.dataproducts.set(input_dataproducts)
# cleanup has no outputs
# skip step 4: cleanup does not need to have resources assigned
# step 5: set state to SCHEDULED (resulting in the cleanup_service picking this subtask up and running it)
cleanup_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
cleanup_subtask.save()
return cleanup_subtask
def schedule_copy_subtask(copy_subtask: Subtask):
''' Schedule the given copy_subtask
This method should typically be called upon the event of a predecessor (pipeline or observation) subtask being finished.
This method implements "Scheduling subtasks" step from the "Specification Flow"
https://support.astron.nl/confluence/display/TMSS/Specification+Flow
'''
# step 0: check pre-requisites
check_prerequities_for_scheduling(copy_subtask)
if copy_subtask.specifications_template.type.value != SubtaskType.Choices.COPY.value:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (copy_subtask.pk,
copy_subtask.specifications_template.type,
SubtaskType.Choices.COPY.value))
# step 1: set state to SCHEDULING
copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
copy_subtask.save()
# step 1a: check start/stop times
# not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
if copy_subtask.scheduled_on_sky_start_time is None:
now = datetime.utcnow()
logger.info("copy id=%s has no starttime. assigned default: %s", copy_subtask.pk, formatDatetime(now))
copy_subtask.scheduled_on_sky_start_time = now
if copy_subtask.scheduled_on_sky_stop_time is None:
stop_time = copy_subtask.scheduled_on_sky_start_time + timedelta(hours=+1)
logger.info("copy id=%s has no stop_time. assigned default: %s", copy_subtask.pk, formatDatetime(stop_time))
copy_subtask.scheduled_on_sky_stop_time = stop_time
# step 2: link input dataproducts
if copy_subtask.inputs.count() == 0:
raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (copy_subtask.pk,
copy_subtask.specifications_template.type))
# iterate over all inputs
for copy_subtask_input in copy_subtask.inputs.all():
# select and set input dataproducts that meet the filter defined in selection_doc
dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
copy_subtask_input.dataproducts.set(dataproducts)
# todo: I assume that there is no RA involvement here? If there is, how does a copy parset look like?
# step 4: resource assigner (if possible)
#_assign_resources(copy_subtask)
# step 5: set state to SCHEDULED (so that the responsible service picks this subtask up and runs it)
copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
copy_subtask.save()
return copy_subtask
# === Misc ===
def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None) -> [Subtask]:
'''Convenience method: Schedule (and return) the subtasks in the task_blueprint that are not dependent on any predecessors'''
independent_subtasks = list(Subtask.independent_subtasks().filter(task_blueprint__id=task_blueprint.id, state__value=SubtaskState.Choices.DEFINED.value).all())
for subtask in independent_subtasks:
if start_time is not None:
subtask.scheduled_on_sky_start_time = start_time
schedule_subtask_and_update_successor_start_times(subtask)
return independent_subtasks
def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs):
subtask_specs = default_subtask_specs
subtask_specs['storagemanager'] = preprocessing_task_specs['storagemanager']
subtask_specs['cluster_resources'] = preprocessing_task_specs['cluster_resources']
# averaging (performed by the demixer)
subtask_specs["demixer"]["enabled"] = True
subtask_specs['demixer']["frequency_steps"] = preprocessing_task_specs['average']['frequency_steps']
subtask_specs['demixer']["time_steps"] = preprocessing_task_specs['average']['time_steps']
# demixing
subtask_specs['demixer']["demix_frequency_steps"] = preprocessing_task_specs['demix']['frequency_steps']
subtask_specs['demixer']["demix_time_steps"] = preprocessing_task_specs['demix']['time_steps']
subtask_specs['demixer']["ignore_target"] = preprocessing_task_specs['demix']['ignore_target']
subtask_specs['demixer']["demix_always"] = preprocessing_task_specs['demix']['sources']
subtask_specs['demixer']["demix_if_needed"] = []
# flagging
if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none':
subtask_specs["aoflagger"]["enabled"] = True
subtask_specs["aoflagger"]["strategy"] = preprocessing_task_specs["flag"]["rfi_strategy"]
else:
subtask_specs["aoflagger"]["enabled"] = False
if preprocessing_task_specs["flag"]["outerchannels"]:
subtask_specs["preflagger0"]["enabled"] = True
subtask_specs["preflagger0"]["channels"] = "0..nchan/32-1,31*nchan/32..nchan-1"
else:
subtask_specs["preflagger0"]["enabled"] = False
if preprocessing_task_specs["flag"]["autocorrelations"]:
subtask_specs["preflagger1"]["enabled"] = True
subtask_specs["preflagger1"]["corrtype"] = "auto"
else:
subtask_specs["preflagger1"]["enabled"] = False
return subtask_specs
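# Illustrative mapping (hypothetical task specs): flag.rfi_strategy='HBAdefault' enables the
# aoflagger with that strategy; flag.outerchannels=True enables preflagger0 on the outermost
# 1/32 of the channels at both band edges (see the channels expression above).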
def _generate_subtask_specs_from_pulsar_pipeline_task_specs(pipeline_task_specs, default_subtask_specs):
subtask_specs = {}
subtask_specs['cluster_resources'] = pipeline_task_specs['cluster_resources']
# Pulsar to fold
if pipeline_task_specs["pulsar"]["strategy"] == "manual":
# pulsar is specified explicitly
subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["name"]
else:
# search for the pulsar (e.g. in a library, based on the SAP direction)
subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["strategy"]
subtask_specs["single_pulse"] = pipeline_task_specs["single_pulse_search"]
# PRESTO
presto_specs = pipeline_task_specs["presto"]
subtask_specs["presto"] = {}
subtask_specs["presto"]["2bf2fits_extra_opts"] = "-nsamples={samples_per_block}".format(**presto_specs["input"])
subtask_specs["presto"]["decode_nblocks"] = presto_specs["input"]["nr_blocks"]
subtask_specs["presto"]["decode_sigma"] = presto_specs["input"]["decode_sigma"]
subtask_specs["presto"]["nofold"] = not presto_specs["fold_profile"]
subtask_specs["presto"]["skip_prepfold"] = not presto_specs["prepfold"]
subtask_specs["presto"]["rrats"] = presto_specs["rrats"]["enabled"]
subtask_specs["presto"]["rrats_dm_range"] = presto_specs["rrats"]["dm_range"]
subtask_specs["presto"]["prepdata_extra_opts"] = " ".join([
"-dm {dm}".format(**presto_specs["prepdata"]) if presto_specs["prepdata"]["dm"] != -1 else "",
])
subtask_specs["presto"]["prepfold_extra_opts"] = ""
subtask_specs["presto"]["prepsubband_extra_opts"] = ""
subtask_specs["presto"]["rfifind_extra_opts"] = "".join([
"-blocks {blocks}".format(**presto_specs["rfifind"]),
])
# DSPSR
dspsr_specs = pipeline_task_specs["dspsr"]
digifil_specs = dspsr_specs["digifil"]
filterbank_specs = dspsr_specs["filterbank"]
subtask_specs["dspsr"] = {}
subtask_specs["dspsr"]["skip_dspsr"] = not dspsr_specs["enabled"]
subtask_specs["dspsr"]["digifil_extra_opts"] = " ".join([
"-D {dm}".format(**digifil_specs) if digifil_specs["dm"] != -1 else "",
"-t {integration_time}".format(**digifil_specs),
"-f {frequency_channels}{dedispersion}".format(**digifil_specs, dedispersion=":D" if digifil_specs["coherent_dedispersion"] else ""),
])
subtask_specs["dspsr"]["nopdmp"] = not dspsr_specs["optimise_period_dm"]
subtask_specs["dspsr"]["norfi"] = not dspsr_specs["rfi_excision"]
subtask_specs["dspsr"]["tsubint"] = dspsr_specs["subintegration_length"]
subtask_specs["dspsr"]["dspsr_extra_opts"] = " ".join([
"-U minX1 -t 1", # minimise memory usage, and use 1 thread
"-s -K" if dspsr_specs["single_pulse_subintegration"] else "",
"-F {frequency_channels}{dedispersion}".format(**filterbank_specs, dedispersion=":D" if filterbank_specs["coherent_dedispersion"] else "") if filterbank_specs["enabled"] else "",
])
# output
output_specs = pipeline_task_specs["output"]
subtask_specs["output"] = {}
subtask_specs["output"]["raw_to_8bit"] = output_specs["quantisation"]["enabled"]
subtask_specs["output"]["8bit_conversion_sigma"] = output_specs["quantisation"]["scale"]
subtask_specs["output"]["skip_dynamic_spectrum"] = not output_specs["dynamic_spectrum"]["enabled"]
subtask_specs["output"]["dynamic_spectrum_time_average"] = output_specs["dynamic_spectrum"]["time_average"]
return subtask_specs
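# Illustrative example (hypothetical dspsr specs): digifil with dm=10, integration_time=0.1,
# frequency_channels=256 and coherent_dedispersion=True yields
# digifil_extra_opts == "-D 10 -t 0.1 -f 256:D".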
def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
"""
Filter specs by selection. This requires the specifications_doc to...
A) ...contain ALL KEYS that we select / filter for
B) ...contain NO ADDITIONAL VALUES that are not selected / filtered for
:param specifications_doc: dataproduct specification as dict
:param selection_doc: selection filter as dict
:return: True when the input specifications_doc meets a filter described in selection_doc, False otherwise
"""
meets_criteria = True
for k, v in selection_doc.items():
if k.startswith('$'): # ignore stuff like $schema
continue
if k not in specifications_doc.keys():
meets_criteria = False
else:
spec = specifications_doc[k]
if isinstance(spec, list) and isinstance(v, list):
for spec_v in spec:
if spec_v not in v:
meets_criteria = False
elif isinstance(v, list):
if spec not in v:
meets_criteria = False
else:
if spec != v:
meets_criteria = False
logger.debug("specs %s matches selection %s: %s" % (specifications_doc, selection_doc, meets_criteria))
return meets_criteria
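# Illustrative examples:
#   specs {"sap": "target0"}  vs selection {"sap": ["target0"]}   -> True
#   specs {"sap": "cal0"}     vs selection {"sap": ["target0"]}   -> False
#   specs {"subband": [1, 2]} vs selection {"subband": [1, 2, 3]} -> True  (all spec values are selected)
#   specs {}                  vs selection {"sap": ["target0"]}   -> False (selected key is missing)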
def get_observation_task_specification_with_check_for_calibrator(subtask):
"""
Retrieve the observation task blueprint specifications_doc from the given subtask object
If the Task is a calibrator then the related Target Observation specification should be returned
:param: subtask object
:return: task_spec: the specifications_doc of the blueprint task, which is always a target observation
"""
if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower():
# Calibrator requires related Target Task Observation for some specifications
target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint)
if target_task_blueprint is None:
raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk)
task_spec = target_task_blueprint.specifications_doc
logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s",
subtask.task_blueprint.id, target_task_blueprint.id)
else:
task_spec = subtask.task_blueprint.specifications_doc
return task_spec
def cancel_subtask(subtask: Subtask) -> Subtask:
'''Generic cancelling method for subtasks. Calls the appropriate cancel method based on the subtask's type.'''
if subtask.state.value == SubtaskState.Choices.CANCELLED.value:
logger.info("cancel_subtask: subtask id=%s type=%s is already cancelled.", subtask.id, subtask.specifications_template.type.value)
return subtask
# check prerequisites, blocks illegal state transitions, like from any -ING state.
check_prerequities_for_cancelling(subtask)
try:
if subtask.state.value == SubtaskState.Choices.SCHEDULED.value:
# the scheduled subtask still claims a timeslot and future resources.
# unschedule the subtask, and make sure the post_state is CANCELLING and not DEFINED in order to not trigger any (dynamic) schedulers.
logger.info("Unscheduling subtask subtask id=%s type=%s before it can be cancelled...", subtask.id, subtask.specifications_template.type.value)
unschedule_subtask(subtask, post_state=SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLING.value))
else:
# no need to unschedule, but we may need to kill the running subtask...
needs_to_kill_subtask = subtask.state.value in (SubtaskState.Choices.QUEUED.value, SubtaskState.Choices.STARTED.value)
# set the state to CANCELLING
logger.info("Cancelling subtask id=%s type=%s state=%s", subtask.id, subtask.specifications_template.type.value, subtask.state.value)
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLING.value)
subtask.save()
if needs_to_kill_subtask:
# kill the queued/started subtask, depending on type
if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
kill_observation_subtask(subtask)
elif subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
kill_pipeline_subtask(subtask)
else:
raise SubtaskCancellingException("Cannot kill subtask id=%s of type=%s" % (subtask.id, subtask.specifications_template.type.value))
# finished cancelling, set to CANCELLED
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLED.value)
subtask.save()
logger.info("Cancelled subtask id=%s type=%s state=%s", subtask.id, subtask.specifications_template.type.value, subtask.state.value)
except Exception as e:
logger.error("Error while cancelling subtask id=%s type=%s state=%s '%s'", subtask.id, subtask.specifications_template.type.value, subtask.state.value, e)
subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
subtask.error_reason = f'{e}'
subtask.save()
if isinstance(e, SubtaskCancellingException):
# we intentionally raised the SubtaskCancellingException, so re-raise it and let the caller handle it
raise
return subtask
def cancel_subtask_and_successors(subtask: Subtask) -> Subtask:
'''cancel this given subtask and all the downstream successor subtasks (recurses, following the successors of successors of successors of... etc.)'''
cancel_subtask(subtask)
cancel_subtask_successors(subtask)
return subtask
def cancel_subtask_successors(subtask: Subtask):
'''cancel all the downstream successor subtasks (recurses, following the successors of successors of successors of... etc.)'''
for successor in subtask.successors:
cancel_subtask_and_successors(successor)
def kill_observation_subtask(subtask: Subtask) -> bool:
'''Kill the observation subtask. Return True if actually killed.'''
if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
with ObservationControlRPCClient.create() as obs_control_client:
return obs_control_client.abort_observation(subtask.id)['aborted']
return False
def kill_pipeline_subtask(subtask: Subtask) -> bool:
'''Kill the pipeline subtask. Return True if actually killed.'''
if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
with PipelineControlRPCClient.create() as pipeline_control_client:
return pipeline_control_client.cancel_pipeline(subtask.id)['canceled']
return False