import logging
import typing

logger = logging.getLogger(__name__)

from copy import deepcopy
from functools import cmp_to_key
from collections.abc import Iterable
from math import ceil
from lofar.common.ring_coordinates import RingCoordinates
from os.path import splitext

from lofar.common.datetimeutils import formatDatetime, round_to_second_precision
from lofar.common import isProductionEnvironment
from lofar.common.lcu_utils import get_current_stations
from lofar.stationmodel.antennafields import antenna_fields

from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException, SubtaskSchedulingSpecificationException, SubtaskException

from datetime import datetime, timedelta
from lofar.common.datetimeutils import parseDatetime
from lofar.sas.tmss.tmss.tmssapp.models import *
from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict
from lofar.common.cobaltblocksize import CorrelatorSettings, StokesSettings, BlockConstraints, BlockSize
from lofar.sas.resourceassignment.resourceassigner.schedulers import ScheduleException
from lofar.mac.observation_control_rpc import ObservationControlRPCClient
from lofar.mac.pipeline_control_rpc import PipelineControlRPCClient

from lofar.sas.tmss.tmss.tmssapp.conversions import antennafields_for_antennaset_and_station
from lofar.sas.tmss.tmss.exceptions import TMSSException
from django.db import transaction

# ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ====

def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool:
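    '''Check whether the given task_blueprint is fit for subtask creation.
    Currently this only validates the task_blueprint's specifications_doc against its template schema.
    Raises a SubtaskCreationException on failure, returns True otherwise.'''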
    try:
        task_blueprint.validate_json_against_this_templates_schema('specifications_doc', 'specifications_template')
    except Exception as e:
        logger.error("Failed to check_prerequities_for_subtask_creation: %s", e)
        raise SubtaskCreationException from e

    return True

def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
    '''Generic create-method for subtasks. Calls the appropriate create method based on the task_blueprint specifications_template name.'''
    with transaction.atomic():
        logger.debug("creating subtask(s) from task_blueprint id=%s name='%s' type='%s' scheduling_unit_blueprint id=%s",
            task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.type.value,
            task_blueprint.scheduling_unit_blueprint.id)
        check_prerequities_for_subtask_creation(task_blueprint)

        subtasks = []

        # recurse over predecessors, so that all dependencies in predecessor subtasks can be met.
        for predecessor in task_blueprint.predecessors.all():
            subtasks.extend(create_subtasks_from_task_blueprint(predecessor))

        if task_blueprint.subtasks.count() > 0:
            logger.debug("skipping creation of subtasks because they already exist for task_blueprint id=%s, name='%s', task_template_name='%s'",
                         task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.name)
            return subtasks

        # fixed mapping from template name to generator functions which create the list of subtask(s) for this task_blueprint
        generators_mapping = {'target observation': [create_observation_control_subtask_from_task_blueprint,
                                                     create_qafile_subtask_from_task_blueprint,
                                                     create_qaplots_subtask_from_task_blueprint],
                              'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint],
                              'pulsar pipeline': [create_pulsar_pipeline_subtask_from_task_blueprint],
                              'ingest': [create_ingest_subtask_from_task_blueprint],
                              'cleanup': [create_cleanup_subtask_from_task_blueprint]}
        generators_mapping['calibrator observation'] = generators_mapping['target observation']
        generators_mapping['parallel calibrator target observation'] = generators_mapping['target observation']
        generators_mapping['beamforming observation'] = [create_observation_control_subtask_from_task_blueprint]

        template_name = task_blueprint.specifications_template.name
        if template_name in generators_mapping:
            generators = generators_mapping[template_name]
            for generator in generators:
                try:
                    # try to create the subtask, allow exception to bubble upwards so the creation transaction can be rolled back upon error.
                    subtask = generator(task_blueprint)
                    if subtask is not None:
                        logger.info("created subtask id=%s type='%s' from task_blueprint id=%s name='%s' type='%s' scheduling_unit_blueprint id=%s",
                                    subtask.id, subtask.specifications_template.type.value,
                                    task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.type.value,
                                    task_blueprint.scheduling_unit_blueprint.id)
                        subtasks.append(subtask)
                except Exception as e:
                    logger.exception(e)
                    raise SubtaskCreationException("Cannot create subtasks for task id=%s with schema name='%s' in generator '%s'" % (task_blueprint.pk, template_name, generator.__name__)) from e
            return subtasks
        else:
            msg = 'Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name)
            logger.error(msg)
            raise SubtaskCreationException(msg)


def _filter_subbands(obs_subbands: list, selection: dict) -> [int]:
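    '''Filter the given observation subbands according to the selection dict (keys per the subtask schema: 'method', plus 'list' for the 'select subset' method).

    A minimal illustration of the 'largest continuous subset' method (subband values are hypothetical):
    >>> _filter_subbands([100, 101, 102, 110, 111], {'method': 'largest continuous subset'})
    [100, 101, 102]

    Note: the 'largest continuous subset' method assumes obs_subbands is sorted in ascending order.
    '''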
    from itertools import groupby, count
    if not isinstance(selection, dict) or not selection.get('method', None):
        raise SubtaskCreationException('Did not get a valid subband selection. Expected dict with "method" but got %s' % selection)
    if selection['method'] == 'copy':
        return obs_subbands
    elif selection['method'] == 'select subset':
        return sorted(set(obs_subbands) & set(selection['list']))  # sorted intersection
    elif selection['method'] == 'largest continuous subset':
        c = count()
        return max((list(g) for _, g in groupby(obs_subbands, lambda x: x - next(c))), key=len)

    raise SubtaskCreationException('Did not get a valid subband selection: got %s' % selection["method"])


def _add_pointings(pointing_a, pointing_b):
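    '''Return the component-wise sum of the two given pointings, which must share the same direction_type.

    A minimal illustration (angle values are hypothetical, chosen to add exactly in floating point):
    >>> _add_pointings({'direction_type': 'J2000', 'angle1': 0.5, 'angle2': 1.0},
    ...                {'direction_type': 'J2000', 'angle1': 0.25, 'angle2': 0.5})
    {'direction_type': 'J2000', 'angle1': 0.75, 'angle2': 1.5}
    '''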
    if pointing_a['direction_type'] != pointing_b['direction_type']:
        raise SubtaskCreationException(
            "Cannot add pointings because direction types differ pointing_a=%s; pointing_b=%s" % (pointing_a, pointing_b))
    pointing = {"direction_type": pointing_a['direction_type']}
    for angle in ['angle1', 'angle2']:
        pointing[angle] = pointing_a.get(angle, 0.0) + pointing_b.get(angle, 0.0)
    return pointing


def _generate_tab_ring_pointings(pointing, tab_rings) -> [dict]:

    if pointing['direction_type'] != 'J2000':
        raise SubtaskCreationException('Tab rings are not supported for direction_type=%s (use J2000 or specify TABs specifically)' % pointing['direction_type'])

    # Generate relative pointings according to tab rings spec
    # Note: Not sure what the center arg resembles here, because the rings don't seem to be formed around the given coordinates.
    #  Seems to be only used to do some correction (morph the grid properly towards the NCP, according to Jan David).
    coordinates = RingCoordinates(numrings=tab_rings['count'],
                                  width=tab_rings['width'],
                                  center=(pointing['angle1'], pointing['angle2']),
                                  dirtype=pointing['direction_type']).coordinates()
    relative_pointings = [{'angle1': angle1, 'angle2': angle2, 'direction_type': pointing['direction_type']} for angle1, angle2 in coordinates]

    # add ring coordinates to main pointing to get absolute TAB pointings and return them
    tab_pointings = [_add_pointings(pointing, relative_pointing) for relative_pointing in relative_pointings]
    return tab_pointings


def _get_related_target_sap_by_name(task_blueprint, sap_name):
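    '''Return the SAP with the given name from the task_blueprint's specifications_doc['SAPs'], or raise a SubtaskCreationException if no such SAP exists.'''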
    # TODO: If we start using beamforming observations in parallel with target imaging observations, then we need to search for saps in the target imaging obs spec.
    # See git history for an initial implementation.
    for target_sap in task_blueprint.specifications_doc['SAPs']:
        if target_sap['name'] == sap_name:
            return target_sap
    raise SubtaskCreationException("Cannot create beamformer subtask from task id=%s because it does not contain target SAP with name=%s in blueprint %s" % (task_blueprint.id, sap_name, task_blueprint.specifications_doc))

def _merge_subtask_beamformer_pipeline_specs(spec_a: dict, spec_b: dict) -> dict:
    """ Merge beamformer pipelines if they can be run in a single pipeline.

        In/out: specifications as described in subtask_spec['COBALT']['tab_pipelines'].

        Throws a ValueError if the specifications could not be merged.
    """

    if spec_a['stations'] != spec_b['stations']:
        raise ValueError("Cannot merge subtask beamformer pipelines with different station sets: %s vs %s" % (spec_a['stations'], spec_b['stations']))

    # whether we merged anything
    merged = False

    if 'coherent' in spec_a and 'coherent' in spec_b:
        # Could be implemented later if specs are sufficiently similar
        raise ValueError("Cannot merge 2 coherent beamformer pipelines")

    if 'incoherent' in spec_a and 'incoherent' in spec_b:
        # Could be implemented later if specs are sufficiently similar
        raise ValueError("Cannot merge 2 incoherent beamformer pipelines")

    """
      Coherent Stokes and Incoherent Stokes pipelines can be combined, if:

      - they process the same input stations,
      - they process the same subbands per SAP
    """

    if 'incoherent' in spec_a and 'coherent' in spec_b:
        # swap them, fall through to next if check for the reverse condition
        spec_a, spec_b = spec_b, spec_a

    if 'coherent' in spec_a and 'incoherent' in spec_b:
        merged = True

        # merge into spec_a
        merged_spec = deepcopy(spec_a)
        merged_spec['incoherent'] = spec_b['incoherent']

        for SAP_in_b in spec_b['SAPs']:
            # find all SAPs in a that have the name of this SAP in b. Should be either 0 or 1 as names should be unique.
            SAP_in_a = [s for s in merged_spec['SAPs'] if s['name'] == SAP_in_b['name']]
            assert 0 <= len(SAP_in_a) <= 1, "We should find at most a single occurrence of a SAP name in the beamformer specification"

            if not SAP_in_a:
                # spec_b beamforms a SAP that is not yet beamformed in spec_a, so we can just add it.
                merged_spec['SAPs'].append(SAP_in_b)
            else:
                # spec_a and spec_b both beamform this SAP, so we need to merge them

                # we should find only one, anyway
                SAP_in_a = SAP_in_a[0]

                if SAP_in_a['subbands'] != SAP_in_b['subbands']:
                    raise ValueError("Cannot merge beamformer pipelines that beamform different subbands for the same SAP")

                # settings are compatible -- merge TABs
                SAP_in_a['tabs'].extend(SAP_in_b['tabs'])

    if not merged:
        raise ValueError("Could not merge beamformer pipelines")

    return merged_spec


def _compress_subtask_beamformer_pipelines(pipeline_spec: list) -> list:
    """ Merge beamformer pipelines if they can be run in a single pipeline.

        Return a new list of pipeline specifications.

        In/out: subtask_spec['COBALT']['tab_pipelines'].
    """

    # resulting specifications
    result_specs = pipeline_spec[:]

    # whether we need to do another round of merging. keep merging until we can't.
    merge_again = True

    # Try to merge every pair of pipelines
    while merge_again:
        merge_again = False

        try:
            for a_idx, spec_a in enumerate(result_specs):
                # Try to merge each remaining spec into spec_a
                for spec_b in result_specs[a_idx+1:]:
                    try:
                        new_spec_a = _merge_subtask_beamformer_pipeline_specs(spec_a, spec_b)

                        # merged spec_b into spec_a, so remove original specs and add merged one
                        result_specs.remove(spec_a)
                        result_specs.remove(spec_b)
                        result_specs.append(new_spec_a)

                        # restart outer loop, since our counters are all off now
                        merge_again = True
                        raise StopIteration

                    except ValueError:
                        # couldn't merge spec_a and spec_b
                        pass
        except StopIteration:
            pass

    return result_specs

def _get_specifications_with_defaults(task_blueprint: TaskBlueprint):
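    '''Return the task_blueprint's specifications_doc with all defaults from its template schema filled in.'''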
    return task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc)

def _get_target_and_or_calibrator_specification(task_blueprint: TaskBlueprint) -> (dict, dict):
    """ Return the (target_specifications, calibrator_specifications), or None instead of either
        if the calibrator or target cannot be found. 

        Defaults have been added to any specifications returned. """

    if task_blueprint.specifications_template.name == 'parallel calibrator target observation':
        specs = _get_specifications_with_defaults(task_blueprint)
        return (specs['target'], specs['calibrator'])

    if task_blueprint.specifications_template.name in ('target observation', 'beamforming observation'):
        cal_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'calibrator observation')
        return (_get_specifications_with_defaults(task_blueprint), cal_task and _get_specifications_with_defaults(cal_task))

    if task_blueprint.specifications_template.name == 'calibrator observation':
        target_task, _ = _get_related_observation_task_blueprint(task_blueprint, 'target observation')
        return (target_task and _get_specifications_with_defaults(target_task), _get_specifications_with_defaults(task_blueprint))

    return (None, None)


def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate):
    """
    Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings
    """
    # check if task_blueprint has an observation-like specification
    if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation', 'beamforming observation', 'parallel calibrator target observation']:
        raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % (
                                       task_blueprint.id, task_blueprint.specifications_template.name))


    # start with an observation subtask specification with all the defaults and the right structure according to the schema
    subtask_template = SubtaskTemplate.objects.get(name='observation control')
    subtask_spec = subtask_template.get_default_json_document_for_schema()


    # maintain a minimum version of COBALT that is required to execute these specifications
    min_COBALT_version = 1

    # wipe the default pointings, these should come from the task_spec
    subtask_spec['stations'].pop('analog_pointing', None)
    subtask_spec['stations']['digital_pointings'] = []

    # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec
    task_spec = _get_specifications_with_defaults(task_blueprint)
    # extract the individual target/calibrator specs if possible
    target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(task_blueprint)

    # block size calculator will need to be fed all the relevant specs
    cobalt_calculator_constraints = BlockConstraints(None, [], [])

    # correlator
    subtask_spec["COBALT"]["correlator"] = { "enabled": False }

    if target_task_spec and "correlator" in target_task_spec:
        subtask_spec["COBALT"]["correlator"]["enabled"] = True
        subtask_spec["COBALT"]["correlator"]["channels_per_subband"]  = target_task_spec["correlator"]["channels_per_subband"]

        corr = CorrelatorSettings()
        corr.nrChannelsPerSubband = target_task_spec["correlator"]["channels_per_subband"]
        corr.integrationTime      = target_task_spec["correlator"]["integration_time"]
        cobalt_calculator_constraints.correlator = corr

    # The beamformer obs has a beamformer-specific specification block.
    # The rest of its specs is the same as in a target observation.
    # So... copy the beamformer specs first, then loop over the shared specs...
    if 'beamforming' in task_blueprint.specifications_template.name.lower():
        # start with empty tab/flyseye pipelines, fill them below from task spec
        subtask_spec['COBALT']['beamformer']['tab_pipelines'] = []
        subtask_spec['COBALT']['beamformer']['flyseye_pipelines'] = []

        for task_beamformer_spec in task_spec['beamformers']:
            # the wanted/specified beamformer station list is the intersection of the observation station list with the requested beamformer stations.
            # at the moment of scheduling this list is re-evaluated for available stations, and the max_nr_missing is evaluated as well.
            # this intersection is not needed per se, because COBALT plays nicely and does similar filtering for stations that are actually available,
            # but hey, if cobalt can play nice, then so can we! :)
            # So, let's come up with the correct complete beamforming-stations-list, and ask cobalt to explicitly use these.

            # combine all stations in the groups...
            beamformer_station_list = sum([station_group["stations"] for station_group in task_beamformer_spec["station_groups"]], [])

            # make intersection with observing-stations...
            beamformer_station_set = set(beamformer_station_list).intersection(set(subtask_spec['stations']['station_list']))

            # make it a nice readable sorted list from the intersection computed above.
            beamformer_station_list = sorted(beamformer_station_set)
            # use the beamformer_station_list below for the tab pipeline and/or flys eye

            for stokes_type in ["coherent", "incoherent"]:
                if stokes_type not in task_beamformer_spec or not task_beamformer_spec[stokes_type]["SAPs"]:
                    # nothing specified for this stokes type
                    continue

                # SAPs
                subtask_saps = []
                for sap in task_beamformer_spec[stokes_type]["SAPs"]:
                    subtask_sap = { "name": sap["name"], "tabs": [] }

                    try:
                        target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name'])
                    except Exception as e:
                        raise ValueError("Cannot find SAP %s in beamformer spec for %s: %s, part of task_beamformer_spec: %s"  % (sap['name'], stokes_type, sap, task_beamformer_spec)) from e

                    if stokes_type == "coherent":
                        # convert manual coherent TABs
                        for tab in sap["tabs"]:
                            subtask_sap["tabs"].append({
                                "coherent": True,
                                # determine absolute tab pointing for subtask by adding relative tab pointing from task to target sap pointing
                                "pointing": tab["pointing"] if not tab.get("relative", False) else _add_pointings(tab['pointing'], target_sap['digital_pointing'])
                            })

                        if "tab_rings" in sap:
                            # convert rings of coherent TABs
                            ring_pointings = _generate_tab_ring_pointings(target_sap["digital_pointing"], sap.pop("tab_rings"))
                            subtask_sap['tabs'] += [{'coherent': True, 'pointing': pointing} for pointing in ring_pointings]
                    else:
                        # an incoherent TAB, there can be only one
                        subtask_sap["tabs"] = [{"coherent": False}]

                    # construct set of subbands to beamform
                    subtask_sap['subbands'] = _filter_subbands(target_sap['subbands'], sap['subbands'])

                    # beamforming a subset of the subbands requires COBALT 2+
                    if subtask_sap['subbands'] != target_sap['subbands']:
                        min_COBALT_version = max(min_COBALT_version, 2)

                    subtask_saps.append(subtask_sap)

                # create a pipeline item and add it to the list
                beamformer_pipeline = {stokes_type: task_beamformer_spec[stokes_type]["settings"],
                                       "stations": beamformer_station_list,
                                       "SAPs": subtask_saps}
                subtask_spec['COBALT']['beamformer']['tab_pipelines'].append(beamformer_pipeline)

                # add constraints for calculator
                ss = StokesSettings()
                ss.nrChannelsPerSubband = task_beamformer_spec[stokes_type]["settings"]["channels_per_subband"]
                ss.timeIntegrationFactor = task_beamformer_spec[stokes_type]["settings"]["time_integration_factor"]
                if stokes_type == "coherent":
                    cobalt_calculator_constraints.coherentStokes.append(ss)
                else:
                    cobalt_calculator_constraints.incoherentStokes.append(ss)

            if 'flys eye' in task_beamformer_spec and task_beamformer_spec['flys eye']['enabled']:
                # add constraints for calculator
                ss = StokesSettings()
                ss.nrChannelsPerSubband = task_beamformer_spec["flys eye"]["settings"]["channels_per_subband"]
                ss.timeIntegrationFactor = task_beamformer_spec["flys eye"]["settings"]["time_integration_factor"]
                cobalt_calculator_constraints.coherentStokes.append(ss)

                flyseye_pipeline = {"coherent": task_beamformer_spec["flys eye"]["settings"],
                                    "stations": beamformer_station_list}
                subtask_spec['COBALT']['beamformer']['flyseye_pipelines'].append(flyseye_pipeline)
                # todo: Clarify if we can add a subbands_selection on the flys eye task spec, to filter down for sap['subbands']
                #  If I got that correctly, specifying subbands is not really supported later down the chain, so whatever we do here gets ignored anyway?
                # for sap in task_spec["SAPs"]:
                    # target_sap = _get_related_target_sap_by_name(task_blueprint, sap['name'])
                    # sap['subbands'] = filter_subbands(...)
                    # if sap['subbands'] == target_sap['subbands']:  # todo: is this really required? pseudo-code in confluence suggests so, but what harm does the list do?
                    #    sap['subbands'] = []

    # merge pipelines that can be run as a single pipeline, e.g. to combine CS+IS into a single pipeline
    subtask_spec['COBALT']['beamformer']['tab_pipelines'] = _compress_subtask_beamformer_pipelines(subtask_spec['COBALT']['beamformer']['tab_pipelines'])

    # if we have more than 1 pipeline, we need COBALT 2+
    if (len(subtask_spec['COBALT']['beamformer']['tab_pipelines']) + len(subtask_spec['COBALT']['beamformer']['flyseye_pipelines'])) > 1:
        min_COBALT_version = max(min_COBALT_version, 2)

    if target_task_spec and ('target' in task_blueprint.specifications_template.name.lower() or 'beamforming' in task_blueprint.specifications_template.name.lower()):
        # copy/convert the analog/digital pointings only for non-calibrator observations (the calibrator has its own pointing)
        # for parallel calibrator&target observations the calibrator pointing is added later as a final sap.
        for sap in target_task_spec.get("SAPs", []):
            subtask_spec['stations']['digital_pointings'].append(
                {"name": sap["name"],
                 "target": sap["target"],
                 "pointing": {"direction_type": sap["digital_pointing"]["direction_type"],
                              "angle1": sap["digital_pointing"]["angle1"],
                              "angle2": sap["digital_pointing"]["angle2"]},
                 "subbands": sap["subbands"]
                 })

        if "tile_beam" in target_task_spec:
            subtask_spec['stations']['analog_pointing'] = { "direction_type": target_task_spec["tile_beam"]["direction_type"],
                                                            "angle1": target_task_spec["tile_beam"]["angle1"],
                                                            "angle2": target_task_spec["tile_beam"]["angle2"] }



    # The calibrator has a minimal calibration-specific specification subset.
    # The rest of its specs are 'shared' with the target observation.
    # So... copy the calibrator specs first, then loop over the shared target/calibrator specs...
    if calibrator_task_spec and 'calibrator' in task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (task_blueprint.id, task_blueprint.specifications_template.name))

        if calibrator_task_spec.get('autoselect', True):
            logger.info("auto-selecting calibrator target based on elevation of target observation...")
            # Get related Target Observation Task
            if "tile_beam" in target_task_spec:
                subtask_spec['stations']['analog_pointing'] = {
                    "direction_type": target_task_spec["tile_beam"]["direction_type"],
                    "angle1": target_task_spec["tile_beam"]["angle1"],
                    "angle2": target_task_spec["tile_beam"]["angle2"]}
            else:
                raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint "
                                               "id=%s in auto-select mode, because the related target observation "
                                               "task_blueprint id=%s has no tile beam pointing defined" % (
                                                task_blueprint.id, target_task_blueprint.id))
        else:
            subtask_spec['stations']['analog_pointing'] = {"direction_type": calibrator_task_spec["pointing"]["direction_type"],
                                                           "angle1": calibrator_task_spec["pointing"]["angle1"],
                                                           "angle2": calibrator_task_spec["pointing"]["angle2"]}

        # for the calibrator, the subbands are the union of the subbands of the target obs
        subbands = []
        for SAP in target_task_spec['SAPs']:
            subbands.extend(SAP['subbands'])
        subbands = sorted(list(set(subbands)))

        # for a plain calibrator, the digital pointing is equal to the analog pointing
        subtask_spec['stations']['digital_pointings'].append({'name': calibrator_task_spec['name'],
                                                              'subbands': subbands,
                                                              'pointing': subtask_spec['stations']['analog_pointing']})

    # At this moment of subtask creation we know which stations we *want* from the task_spec,
    # but we do not know yet which stations will be available at the moment of observing.
    # So, we decided to set the subtask station_list as the union of all stations in all specified groups.
    # This way, the user can see which stations are likely to be used.
    # At the moment of scheduling of this subtask, the station_list is re-evaluated, and the max_nr_missing per group is validated.
    subtask_spec['stations']['station_list'] = []
    if target_task_spec and "station_groups" in target_task_spec:
        for station_group in target_task_spec["station_groups"]:
            subtask_spec['stations']['station_list'].extend(station_group["stations"])
        # make list have unique items
        subtask_spec['stations']['station_list'] = sorted(list(set(subtask_spec['stations']['station_list'])))

    if not subtask_spec['stations']['station_list']:
        raise SubtaskCreationException("Cannot create observation subtask specifications for task_blueprint id=%s. No stations are defined." % (task_blueprint.id,))


    subtask_spec['stations']["antenna_set"] = target_task_spec["antenna_set"]
    subtask_spec['stations']["filter"] = target_task_spec["filter"]

    # Calculate block sizes and feed those to the spec
    cobalt_calculator = BlockSize(constraints=cobalt_calculator_constraints)
    subtask_spec["COBALT"]["blocksize"] = cobalt_calculator.blockSize

    if "correlator" in target_task_spec:
        subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = cobalt_calculator.nrBlocks
        subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = cobalt_calculator.nrSubblocks

    subtask_spec["COBALT"]["version"] = min_COBALT_version

    subtask_spec["QA"]["inspection_plots"] = target_task_spec.get("QA", {"inspection_plots": "none"}).get("inspection_plots", "none")

    # make sure that the subtask_spec is valid conform the schema
    validate_json_against_schema(subtask_spec, subtask_template.schema)

    return subtask_spec, subtask_template


def get_stations_in_group(station_group_name: str) -> []:
    '''Get a list of station names in the given station_group.
    A lookup is performed in the RADB, in the virtual instrument table'''

    # TODO Make names RA and TMSS spec equal: 'NL' or 'DUTCH'?
    if station_group_name == "DUTCH":
        station_group_name = "NL"

    # INTERNATIONAL_REQUIRED is by definition DE601 or DE605; take DE601 for now
    # TODO check with RA the availability of both stations
    if station_group_name == "INTERNATIONAL_REQUIRED":
        return ["DE601"]

    with RADBRPC.create() as radbrpc:
        resource_group_memberships = radbrpc.getResourceGroupMemberships()['groups']
        station_resource_group = next(rg for rg in resource_group_memberships.values()
                                      if (rg['resource_group_type'] == 'station_group' or rg['resource_group_type'] == 'virtual') and rg['resource_group_name'] == station_group_name)
        station_names = set(resource_group_memberships[rg_id]['resource_group_name'] for rg_id in station_resource_group['child_ids']
                            if resource_group_memberships[rg_id]['resource_group_type'] == 'station')

        # HACK, RS408 should be removed from the RADB
        if 'RS408' in station_names:
            station_names.remove('RS408')

        # HACK remove TEST1 from station list otherwise validate will fail
        if 'TEST1' in station_names:
            station_names.remove('TEST1')

        return sorted(list(station_names))
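
# A hypothetical usage sketch (actual group and station names depend on the RADB virtual instrument content):
#   get_stations_in_group('CORE')                   -> e.g. ['CS001', 'CS002', ...]
#   get_stations_in_group('DUTCH')                  -> looked up as RADB group 'NL'
#   get_stations_in_group('INTERNATIONAL_REQUIRED') -> ['DE601'] (short-circuited above)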


def get_related_calibrator_observation_task_blueprint(target_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement):
    """
    get the related calibrator observation task_blueprint and the relative placement for the given target task_blueprint
    if nothing found return None
    """
    if 'target' not in target_task_blueprint.specifications_template.name.lower():
        raise ValueError("Cannot get a related calibrator observation task_blueprint for non-target task_blueprint id=%s template_name='%s'",
                        target_task_blueprint.id, target_task_blueprint.specifications_template.name)

    return _get_related_observation_task_blueprint(target_task_blueprint, ['calibrator observation', 'parallel calibrator target observation'])


def get_related_target_observation_task_blueprint(calibrator_or_beamformer_task_blueprint: TaskBlueprint) -> (TaskBlueprint, SchedulingRelationPlacement):
    """
    get the related target observation task_blueprint and the relative placement for the given calibrator or beamformer task_blueprint
    if nothing found return None
    """
    if 'calibrator' not in calibrator_or_beamformer_task_blueprint.specifications_template.name.lower() and \
       'beamformer' not in calibrator_or_beamformer_task_blueprint.specifications_template.name.lower():
        raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator/beamformer task_blueprint id=%s template_name='%s'",
                        calibrator_or_beamformer_task_blueprint.id, calibrator_or_beamformer_task_blueprint.specifications_template.name)

    return _get_related_observation_task_blueprint(calibrator_or_beamformer_task_blueprint, ['target observation', 'parallel calibrator target observation'])


def _get_related_observation_task_blueprint(task_blueprint: TaskBlueprint, related_template_name: typing.Union[str, list, tuple]) -> (TaskBlueprint, SchedulingRelationPlacement):
    related_template_names = [related_template_name] if isinstance(related_template_name, str) else related_template_name

    try:
        return next((relation.second, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(first=task_blueprint).all()
                    if relation.second is not None and relation.second.specifications_template.name.lower() in related_template_names)
    except StopIteration:
        try:
            return next((relation.first, relation.placement) for relation in TaskSchedulingRelationBlueprint.objects.filter(second=task_blueprint).all()
                        if relation.first is not None and relation.first.specifications_template.name.lower() in related_template_names)
        except StopIteration:
            if task_blueprint.specifications_template.name.lower() in related_template_names:
                # the 'related' task blueprint we are looking for is itself.
                return task_blueprint, None
            else:
                logger.debug("No related %s task_blueprint found for task_blueprint id=%d", related_template_names, task_blueprint.id)

    return None, None


def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    """
    Create an observation control subtask.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    """
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 0a: check specification. The JSON should be valid according to the schema, but needs some additional sanity checks
    specifications_doc, subtask_template = create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint)
    # sanity check: total number of subbands should not exceed 488
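    # (488 is assumed to be the maximum number of beamlets a station can deliver in the standard 8-bit mode)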
    all_subbands = set(sum([dp['subbands'] for dp in specifications_doc['stations']['digital_pointings']], []))
    if len(all_subbands) > 488:
        raise SubtaskCreationException("Total number of subbands %d exceeds the maximum of 488 for task_blueprint id=%s" % (len(all_subbands), task_blueprint.id))

    # step 1: create subtask in defining state
    cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    subtask_data = { "scheduled_on_sky_start_time": None,
                     "scheduled_on_sky_stop_time": None,
                     "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                     "specifications_doc": specifications_doc,
                     "task_blueprint": task_blueprint,
                     "specifications_template": subtask_template,
                     "tags": [],
                     "primary": True,
                     "cluster": Cluster.objects.get(name=cluster_name)
                     }

    subtask = Subtask.objects.create(**subtask_data)

    # step 2: create and link subtask input/output
    # an observation has no input, it just produces output data
    subtask_output = SubtaskOutput.objects.create(subtask=subtask)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()
    return subtask


def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    observation_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
    if not observation_subtasks:
        raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % (
            SubtaskType.Choices.QA_FILES.value, task_blueprint.pk))

    observation_subtask = observation_subtasks[-1] # TODO: decide what to do when there are multiple observation subtasks?
    return create_qafile_subtask_from_observation_subtask(observation_subtask)


def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) -> Subtask:
    ''' Create a subtask to convert the observation output to a QA h5 file.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(observation_subtask.task_blueprint)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
            SubtaskType.Choices.QA_FILES.value, observation_subtask.pk,
            observation_subtask.specifications_template.type, SubtaskType.Choices.OBSERVATION.value))

    obs_task_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask)
    obs_task_qafile_spec = obs_task_spec.get("QA", {}).get("file_conversion", {})

    if not obs_task_qafile_spec.get("enabled", False):
        logger.debug("Skipping creation of qafile_subtask because QA.file_conversion is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    qafile_subtask_template = SubtaskTemplate.objects.get(name="QA file conversion")
    qafile_subtask_spec = qafile_subtask_template.get_default_json_document_for_schema()
    qafile_subtask_spec['nr_of_subbands'] = obs_task_qafile_spec.get("nr_of_subbands")
    qafile_subtask_spec['nr_of_timestamps'] = obs_task_qafile_spec.get("nr_of_timestamps")
    validate_json_against_schema(qafile_subtask_spec, qafile_subtask_template.schema)

    qafile_subtask_data = { "scheduled_on_sky_start_time": None,
                            "scheduled_on_sky_stop_time": None,
                            "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                            "task_blueprint": observation_subtask.task_blueprint,
                            "specifications_template": qafile_subtask_template,
                            "specifications_doc": qafile_subtask_spec,
                            "primary": False,
                            "cluster": observation_subtask.cluster}
    qafile_subtask = Subtask.objects.create(**qafile_subtask_data)

    # step 2: create and link subtask input/output
    selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
    selection_doc = selection_template.get_default_json_document_for_schema()

    for obs_out in observation_subtask.outputs.all():
        qafile_subtask_input = SubtaskInput.objects.create(subtask=qafile_subtask,
                                                           producer=obs_out,  # TODO: determine proper producer based on spec in task_relation_blueprint
                                                           selection_doc=selection_doc,
                                                           selection_template=selection_template)

    qafile_subtask_output = SubtaskOutput.objects.create(subtask=qafile_subtask)

    # step 3: set state to DEFINED
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qafile_subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qa_file_subtask
    return qafile_subtask


def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    if 'calibrator' in task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint, _ = get_related_target_observation_task_blueprint(task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot retrieve specifications for task id=%d because no related target observation is found " % task.pk)
    else:
        target_task_blueprint = task_blueprint

    if not target_task_blueprint.specifications_doc.get("QA", {}).get("file_conversion", {}).get("enabled", False):
        logger.debug("Skipping creation of qaplots_subtask because QA.file_conversion is not enabled")
        return None

    qafile_subtasks = [st for st in task_blueprint.subtasks.all() if st.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value]
    if qafile_subtasks:
        qafile_subtask = qafile_subtasks[0] # TODO: decide what to do when there are multiple qafile subtasks?
        return create_qaplots_subtask_from_qafile_subtask(qafile_subtask)
    else:
        raise SubtaskCreationException('Cannot create QA plotting subtask for task id=%s because no predecessor QA file conversion subtask exists.' % (task_blueprint.pk, ))


def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subtask:
    ''' Create a subtask to create inspection plots from the QA h5 file.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint)

    if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
        raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
            SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk,
            qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

    obs_task_spec = get_observation_task_specification_with_check_for_calibrator(qafile_subtask)
    obs_task_qaplots_spec = obs_task_spec.get("QA", {}).get("plots", {})

    if not obs_task_qaplots_spec.get("enabled", False):
        logger.debug("Skipping creation of qaplots_subtask because QA.plots is not enabled")
        return None

    # step 1: create subtask in defining state, with filled-in subtask_template
    qaplots_subtask_template = SubtaskTemplate.objects.get(name="QA plots")
    qaplots_subtask_spec_doc = qaplots_subtask_template.get_default_json_document_for_schema()
    qaplots_subtask_spec_doc['autocorrelation'] = obs_task_qaplots_spec.get("autocorrelation")
    qaplots_subtask_spec_doc['crosscorrelation'] = obs_task_qaplots_spec.get("crosscorrelation")
    validate_json_against_schema(qaplots_subtask_spec_doc, qaplots_subtask_template.schema)

    qaplots_subtask_data = { "scheduled_on_sky_start_time": None,
                             "scheduled_on_sky_stop_time": None,
                             "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                             "task_blueprint": qafile_subtask.task_blueprint,
                             "specifications_template": qaplots_subtask_template,
                             "specifications_doc": qaplots_subtask_spec_doc,
                             "primary": False,
                             "cluster": qafile_subtask.cluster}
    qaplots_subtask = Subtask.objects.create(**qaplots_subtask_data)

    # step 2: create and link subtask input/output
    selection_template = TaskRelationSelectionTemplate.objects.get(name="all")
    selection_doc = selection_template.get_default_json_document_for_schema()
    qaplots_subtask_input = SubtaskInput.objects.create(subtask=qaplots_subtask,
                                                        producer=qafile_subtask.outputs.first(),
                                                        selection_doc=selection_doc,
                                                        selection_template=selection_template)

    qaplots_subtask_output = SubtaskOutput.objects.create(subtask=qaplots_subtask)

    # step 3: set state to DEFINED
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    qaplots_subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this qaplots_subtask
    return qaplots_subtask


def create_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint, subtask_template_name: str, generate_subtask_specs_from_task_spec_func) -> Subtask:
    ''' Create a subtask for the given pipeline.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)
    # TODO: use a more elegant lookup of the predecessor observation task
    # TODO: do not require the input to come from an observation
    observation_predecessor_tasks = [t for t in task_blueprint.predecessors.all() if any(st for st in t.subtasks.all()
                                                                                         if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value)]
    if not observation_predecessor_tasks:
        raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected "
                                       "to an observation predecessor (sub)task." % task_blueprint.pk)

    # step 1: create subtask in defining state, with filled-in subtask_template
    subtask_template = SubtaskTemplate.objects.get(name=subtask_template_name)
    default_subtask_specs = subtask_template.get_default_json_document_for_schema()
    task_specs_with_defaults = task_blueprint.specifications_template.add_defaults_to_json_object_for_schema(task_blueprint.specifications_doc)
    subtask_specs = generate_subtask_specs_from_task_spec_func(task_specs_with_defaults, default_subtask_specs)

    cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
    subtask_data = { "scheduled_on_sky_start_time": None,
                     "scheduled_on_sky_stop_time": None,
                     "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                     "task_blueprint": task_blueprint,
                     "specifications_template": subtask_template,
                     "specifications_doc": subtask_specs,
                     "primary": True,
                     "cluster": Cluster.objects.get(name=cluster_name) }
    subtask = Subtask.objects.create(**subtask_data)

    # step 2: create and link subtask input/output
    for task_relation_blueprint in task_blueprint.produced_by.all():
        producing_task_blueprint = task_relation_blueprint.producer

        # create inputs for all predecessor observation subtask outputs that belong to the producer task of this task relation
        # TODO: more filtering needed?
        predecessor_observation_subtasks = [st for st in producing_task_blueprint.subtasks.order_by('id').all() if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
        for predecessor_obs_subtask in predecessor_observation_subtasks:
            for predecessor_subtask_output in predecessor_obs_subtask.outputs.all():
                subtask_input = SubtaskInput.objects.create(subtask=subtask,
                                                            producer=predecessor_subtask_output,
                                                            selection_doc=task_relation_blueprint.selection_doc,
                                                            selection_template=task_relation_blueprint.selection_template)

    subtask_output = SubtaskOutput.objects.create(subtask=subtask)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this pipeline subtask
    return subtask


def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    return create_pipeline_subtask_from_task_blueprint(task_blueprint, "preprocessing pipeline", _generate_subtask_specs_from_preprocessing_task_specs)


def create_pulsar_pipeline_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    return create_pipeline_subtask_from_task_blueprint(task_blueprint, "pulsar pipeline", _generate_subtask_specs_from_pulsar_pipeline_task_specs)


def create_ingest_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create a subtask for an ingest job.
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 1: create subtask in defining state, with filled-in subtask_template
    subtask_template = SubtaskTemplate.objects.get(name='ingest control')
    default_subtask_specs = subtask_template.get_default_json_document_for_schema()
    subtask_specs = default_subtask_specs  # todo: translate specs from task to subtask once we have non-empty templates
    cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
    subtask_data = {"scheduled_on_sky_start_time": None,
                    "scheduled_on_sky_stop_time": None,
                    "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                    "task_blueprint": task_blueprint,
                    "specifications_template": subtask_template,
                    "specifications_doc": subtask_specs,
                    "primary": True,
                    "cluster": Cluster.objects.get(name=cluster_name)}
    subtask = Subtask.objects.create(**subtask_data)

    # step 2: create and link subtask input
    for task_relation_blueprint in task_blueprint.produced_by.all():
        producing_task_blueprint = task_relation_blueprint.producer

        predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()]
        for predecessor_subtask in predecessor_subtasks:
            for predecessor_subtask_output in predecessor_subtask.outputs.all():
                SubtaskInput.objects.create(subtask=subtask,
                                            producer=predecessor_subtask_output,
                                            selection_doc=task_relation_blueprint.selection_doc,
                                            selection_template=task_relation_blueprint.selection_template)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this ingest
    return subtask


def create_cleanup_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
    ''' Create a subtask for a cleanup job
    This method implements "Instantiate subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_subtask_creation(task_blueprint)

    # step 1: create subtask in defining state, with filled-in subtask_template
    subtask_template = SubtaskTemplate.objects.get(name='cleanup')
    subtask_specs = subtask_template.get_default_json_document_for_schema()

    try:
        cluster_name = task_blueprint.specifications_doc['cluster_resources']['where']['cluster']
    except KeyError:
        cluster_name = "CEP4"

    subtask_data = {"scheduled_on_sky_start_time": None,
                    "scheduled_on_sky_stop_time": None,
                    "state": SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                    "task_blueprint": task_blueprint,
                    "specifications_template": subtask_template,
                    "specifications_doc": subtask_specs,
                    "primary": True,
                    "cluster": Cluster.objects.get(name=cluster_name)}
    subtask = Subtask.objects.create(**subtask_data)

    # step 2: create and link subtask input
    # for this cleanup subtask an 'input' seems a bit weird, but it actually makes sense!
    # this cleanup subtask will cleanup the output data of all linked input predecessors.
    for task_relation_blueprint in task_blueprint.produced_by.all():
        producing_task_blueprint = task_relation_blueprint.producer

        predecessor_subtasks = [st for st in producing_task_blueprint.subtasks.filter(specifications_template__type__value__in=(SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value)).order_by('id').all()]
        for predecessor_subtask in predecessor_subtasks:
            for predecessor_subtask_output in predecessor_subtask.outputs.all():
                SubtaskInput.objects.create(subtask=subtask,
                                            producer=predecessor_subtask_output,
                                            selection_doc=task_relation_blueprint.selection_doc,
                                            selection_template=task_relation_blueprint.selection_template)

    # step 3: set state to DEFINED
    subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
    subtask.save()

    # done, now return the subtask, and allow the system to wait for the predecessors to be finished before we schedule this cleanup subtask
    return subtask


# ==== various schedule* methods to schedule a Subtasks (if possible) ====

def schedule_subtask(subtask: Subtask) -> Subtask:
    '''Generic scheduling method for subtasks. Calls the appropriate scheduling method based on the subtask's type.'''
    check_prerequities_for_scheduling(subtask)

    if (subtask.scheduled_on_sky_start_time is None or subtask.scheduled_on_sky_start_time < datetime.utcnow()) and subtask.predecessors.count() > 0:
        # this is a successor task that can start now. Auto assign nice start_time just a bit in the future.
        subtask.scheduled_on_sky_start_time = round_to_second_precision(datetime.utcnow()+timedelta(seconds=30))
        subtask.save()

    try:
        if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
            return schedule_pipeline_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
            return schedule_observation_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value:
            return schedule_qafile_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.QA_PLOTS.value:
            return schedule_qaplots_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.INGEST.value:
            return schedule_ingest_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.CLEANUP.value:
            return schedule_cleanup_subtask(subtask)

        if subtask.specifications_template.type.value == SubtaskType.Choices.COPY.value:
            return schedule_copy_subtask(subtask)

        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
                                         (subtask.pk, subtask.specifications_template.type.value))
    except Exception as e:
        try:
            logger.exception(e)

            if isinstance(e, SubtaskSchedulingSpecificationException):
                # set the subtask to state 'UNSCHEDULABLE' in case of a specification exception
                subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULABLE.value)
                subtask.save()
            else:
                # set the subtask to state 'ERROR'.
                subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
                subtask.error_reason = f'{e}'
                subtask.save()
        except Exception as e2:
            logger.error(e2)
        finally:
            # ... and re-raise the original exception (wrapped)
            raise SubtaskSchedulingException("Error while scheduling subtask id=%d : %s" % (subtask.pk, e)) from e

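# The if-chain in schedule_subtask above can equivalently be expressed as a dispatch
# table. A minimal illustrative sketch (not used by the production flow; it refers to
# the same schedule_* functions defined in this module):
def _example_scheduler_dispatch(subtask: Subtask) -> Subtask:
    schedulers = {SubtaskType.Choices.PIPELINE.value: schedule_pipeline_subtask,
                  SubtaskType.Choices.OBSERVATION.value: schedule_observation_subtask,
                  SubtaskType.Choices.QA_FILES.value: schedule_qafile_subtask,
                  SubtaskType.Choices.QA_PLOTS.value: schedule_qaplots_subtask,
                  SubtaskType.Choices.INGEST.value: schedule_ingest_subtask,
                  SubtaskType.Choices.CLEANUP.value: schedule_cleanup_subtask,
                  SubtaskType.Choices.COPY.value: schedule_copy_subtask}
    try:
        return schedulers[subtask.specifications_template.type.value](subtask)
    except KeyError:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
                                         (subtask.pk, subtask.specifications_template.type.value))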

def unschedule_subtask(subtask: Subtask, post_state: SubtaskState=None) -> Subtask:
    '''unschedule the given subtask, removing all output dataproducts,
    and setting its state afterwards to the post_state (which is 'defined' if None given).'''
    if subtask.state.value != SubtaskState.Choices.SCHEDULED.value:
        raise SubtaskSchedulingException("Cannot unschedule subtask id=%d because it is not SCHEDULED. Current state=%s" % (subtask.pk, subtask.state.value))

    try:
        subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.UNSCHEDULING.value)
        subtask.save()

        for output in subtask.outputs.all():
            # delete all output transforms, and then the dataproducts themselves
            DataproductTransform.objects.filter(output__in=output.dataproducts.all()).all().delete()
            output.dataproducts.all().delete()

        if subtask.specifications_template.type.value in (SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value):
            assign_or_unassign_resources(subtask)

        if post_state is None:
            post_state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)

        subtask.state = post_state
        subtask.save()
    except Exception as e:
        try:
            # set the subtask to state 'ERROR'...
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
            subtask.save()
        except Exception as e2:
            logger.error(e2)
        finally:
            # ... and re-raise the original exception
            raise

def unschedule_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint):
    '''Convenience method: Unschedule (and return) all scheduled subtasks in the task_blueprint'''
    scheduled_subtasks = list(task_blueprint.subtasks.filter(state__value=SubtaskState.Choices.SCHEDULED.value).all())
    for subtask in scheduled_subtasks:
        unschedule_subtask(subtask)


def schedule_subtask_and_update_successor_start_times(subtask: Subtask) -> Subtask:
    scheduled_subtask = schedule_subtask(subtask)
    shift_successors_until_after_stop_time(scheduled_subtask)
    return scheduled_subtask


def update_subtasks_start_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, start_time: datetime):
    for task_blueprint in scheduling_unit.task_blueprints.all():
        defined_independent_subtasks = task_blueprint.subtasks.filter(state__value='defined').filter(inputs=None).all()
        for subtask in defined_independent_subtasks:
            update_start_time_and_shift_successors_until_after_stop_time(subtask, start_time + subtask.task_blueprint.relative_start_time)


def update_start_time_and_shift_successors_until_after_stop_time(subtask: Subtask, start_time: datetime):
    subtask.scheduled_on_sky_start_time = start_time
    subtask.scheduled_on_sky_stop_time = subtask.scheduled_on_sky_start_time + subtask.specified_duration
    subtask.save()

    shift_successors_until_after_stop_time(subtask)


def shift_successors_until_after_stop_time(subtask: Subtask):
    for successor in subtask.successors:
        # by default, let the successor directly follow this task...
        successor_start_time = subtask.scheduled_on_sky_stop_time

        # ... but adjust it if there is a scheduling_relation with an offset.
        # so, check if these successive subtasks have different task_blueprint parents
        # Note: subtasks either have the same parent task(s) or different ones, no partial overlap.
        #  we now need to look up all combinations between subtask and successor blueprints
        #  to find if there's a relation with a time offset between the tasks...
        time_offsets = []
        if subtask.task_blueprint.id != successor.task_blueprint.id:
            relations = (TaskSchedulingRelationBlueprint.objects.filter(first=subtask.task_blueprint, second=successor.task_blueprint) |
                         TaskSchedulingRelationBlueprint.objects.filter(first=successor.task_blueprint, second=subtask.task_blueprint)).all()

            if relations:
                # there should be only one scheduling relation between the tasks
                time_offsets += [relations[0].time_offset]

        if len(time_offsets) > 0:
            successor_start_time += timedelta(seconds=max(time_offsets))

        # update the starttime and recurse to shift the successor successors as well
        update_start_time_and_shift_successors_until_after_stop_time(successor, successor_start_time)

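# Worked example for the offset logic above (hypothetical times and offset, sketch only):
def _example_successor_start_time() -> datetime:
    stop_time = datetime(2030, 1, 1, 12, 0, 0)  # hypothetical scheduled_on_sky_stop_time of the predecessor
    time_offsets = [60]                         # hypothetical scheduling relation offset in seconds
    successor_start_time = stop_time
    if len(time_offsets) > 0:
        successor_start_time += timedelta(seconds=max(time_offsets))
    return successor_start_time                 # -> datetime(2030, 1, 1, 12, 1, 0)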

def clear_defined_subtasks_start_stop_times_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint):
    '''set start/stop times of all the subtasks in the scheduling unit to None'''
    for task_blueprint in scheduling_unit.task_blueprints.all():
        defined_subtasks = task_blueprint.subtasks.filter(state__value='defined').all()
        for subtask in defined_subtasks:
            subtask.scheduled_on_sky_start_time = None
            subtask.scheduled_on_sky_stop_time = None
            subtask.save()


def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
    if subtask.state.value != SubtaskState.Choices.DEFINED.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value))

    if subtask.obsolete_since is not None:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is marked as obsolete since %s" % (subtask.pk, subtask.obsolete_since))

    for predecessor in subtask.predecessors.all():
        if predecessor.state.value != SubtaskState.Choices.FINISHED.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s in not FINISHED but state=%s"
                                             % (subtask.pk, predecessor.pk, predecessor.state.value))

    return True


def check_prerequities_for_cancelling(subtask: Subtask) -> bool:
    if not SubtaskAllowedStateTransitions.objects.filter(old_state=subtask.state, new_state__value=SubtaskState.Choices.CANCELLING.value).exists():
        # this check and exception is on top of the database trigger function which blocks any illegal transition.
        # It's here to signal the intent that we do not allow cancelling from just any random state.
        raise SubtaskCancellingException("Cannot cancel subtask id=%d because it currently has state=%s" % (subtask.pk, subtask.state.value))

    return True


def _create_ra_specification(_subtask):
    # Should we do something with the station list? For 'detecting' conflicts it can be empty.
    parset_dict = convert_to_parset_dict(_subtask)
    return { 'tmss_id': _subtask.id,
             'task_type': _subtask.specifications_template.type.value.lower(),
             'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
             'status': 'prescheduled' if _subtask.state.value == SubtaskState.Choices.SCHEDULING.value else 'approved',
             'starttime': _subtask.scheduled_on_sky_start_time,
             'endtime': _subtask.scheduled_on_sky_stop_time,
             'cluster': _subtask.cluster.name,
             'station_requirements': [],
             'specification': parset_dict }

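# Shape sketch of the RA specification produced above (hypothetical values, illustration only):
#
#   {'tmss_id': 123456,
#    'task_type': 'observation',
#    'task_subtype': '<Observation.processSubtype, lower-cased>',
#    'status': 'prescheduled',                     # or 'approved' when not in SCHEDULING state
#    'starttime': ..., 'endtime': ...,
#    'cluster': 'CEP4',
#    'station_requirements': [],
#    'specification': <parset dict>}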

def assign_or_unassign_resources(subtask: Subtask):
    """
    Assign/unassign resources for subtasks. If resources are not available or they do not meet requirements,
    a SubtaskSchedulingException is raised.
    :param subtask:
    """

    if subtask.specifications_template.type.value not in (SubtaskType.Choices.OBSERVATION.value, SubtaskType.Choices.PIPELINE.value):
        raise SubtaskSchedulingException("Cannot assign/unassign resources for subtask id=%d because it is not an observation/pipeline. type=%s" % (subtask.pk, subtask.specifications_template.type.value))

    if subtask.state.value not in (SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.UNSCHEDULING.value):
        raise SubtaskSchedulingException("Cannot assign/unassign resources for subtask id=%d because it is not in SCHEDULING state. "
                                         "Current state=%s" % (subtask.pk, subtask.state.value))

    ra_spec = _create_ra_specification(subtask)
    ra_spec['predecessors'] = []
    for pred in subtask.predecessors.all():
        try:
            ra_spec['predecessors'].append(_create_ra_specification(pred))
        except Exception as e:
            logger.warning("Could not create RA specification for predecessor subtask id=%s: %s", pred.id, e)

    # Reason about stations only for observations with a station list
    if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value and \
            "stations" in subtask.specifications_doc and "station_list" in subtask.specifications_doc["stations"]:
        _do_assignment_for_observations_with_required_station_check(subtask, ra_spec)
    else:
        with RARPC.create() as rarpc:
            try:
                rarpc.do_assignment(ra_spec)
            except ScheduleException as e:
                raise SubtaskSchedulingException("Cannot schedule/unschedule subtask id=%d. The required resources are not (fully) available." % subtask.pk)


def _do_assignment_for_observations_with_required_station_check(subtask: Subtask, ra_spec) -> bool:
    """
    Try to detect conflicts and re-assign if possible.
    :param subtask:
    :param ra_spec:
    """
    assigned = False
    with RARPC.create() as rarpc:
        # Try to re-assign till it succeeds. If the requirements are not met, an exception will be raised.
        while not assigned:
            try:
                assigned = rarpc.do_assignment(ra_spec)
            except ScheduleException as e:
                logger.exception(e)
            if not assigned:
                logger.info("Conflicts in assignment detected, checking stations in conflict and re-assign if possible")
                lst_stations_in_conflict = get_stations_in_conflict(subtask.id)
                lst_stations = determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict)
                ra_spec = update_specification(ra_spec, lst_stations)
    return assigned

def get_stations_in_conflict(subtask_id):
    """
    Retrieve a list of station names which RADB 'marked' as a resource in conflict after the last resource assignment
    :param subtask_id: The subtask id
    :return: lst_stations_in_conflict List of station names (string) which are in conflict
    """
    lst_stations_in_conflict = []
    with RADBRPC.create() as radbrpc:
        task_id = radbrpc.getTask(tmss_id=subtask_id)['id']
        conflict_claims = radbrpc.getResourceClaims(task_ids=[task_id], status="conflict", extended=True)
        # conflict_claims are the resource claims which are in conflict. Determine the resource names in conflict,
        # for example ['CS001rcu', 'CS001chan0', 'CS001bw0', 'CS001chan1', 'CS001bw1']
        resource_names_in_conflict = []
        for resc in conflict_claims:
            # cross check on status in conflict
            if resc["status"] == "conflict":
                resource_names_in_conflict.append(resc["resource_name"])
        logger.info("Resource names with conflict %s" % resource_names_in_conflict)

        # Now get the parent_id(s) of each resource in conflict. Every parent whose resource_group_type is
        # 'station' gives the name of a station in conflict, which is what we need
        resource_group_memberships = radbrpc.getResourceGroupMemberships()
        parent_ids = []
        for resc in resource_group_memberships["resources"].values():
            if resc["resource_name"] in resource_names_in_conflict:
                parent_ids.extend(resc['parent_group_ids'])

        logger.info("Parent group ids with conflict %s" % parent_ids)
        for parent_id in list(set(parent_ids)):
            resc_group_item = resource_group_memberships["groups"][parent_id]
            if resc_group_item["resource_group_type"] == "station":
                lst_stations_in_conflict.append(resc_group_item["resource_group_name"])
        logger.info("Stations in conflict %s", lst_stations_in_conflict)
    return lst_stations_in_conflict

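# Illustrative sketch with fabricated RADB-like data (hypothetical names and ids, not real
# RADB output): how resource names in conflict are mapped to their parent 'station'
# resource groups, as done in get_stations_in_conflict above.
def _example_station_conflict_lookup() -> list:
    memberships = {"resources": {1: {"resource_name": "CS001rcu", "parent_group_ids": [10]},
                                 2: {"resource_name": "CS002bw0", "parent_group_ids": [20]}},
                   "groups": {10: {"resource_group_type": "station", "resource_group_name": "CS001"},
                              20: {"resource_group_type": "station", "resource_group_name": "CS002"}}}
    resource_names_in_conflict = ["CS001rcu"]
    parent_ids = [parent_id for resc in memberships["resources"].values()
                  if resc["resource_name"] in resource_names_in_conflict
                  for parent_id in resc["parent_group_ids"]]
    return [memberships["groups"][parent_id]["resource_group_name"]
            for parent_id in set(parent_ids)
            if memberships["groups"][parent_id]["resource_group_type"] == "station"]  # -> ['CS001']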

def determine_stations_which_can_be_assigned(subtask, lst_stations_in_conflict):
    """
    Determine which stations can still be assigned when station conflicts have occurred.
    Stations in conflict should be removed.
    Use max_nr_missing from the task specifications and the conflicting station list to create a station list
    which should be possible to assign. If the number of stations to be skipped in a station group exceeds
    its max_nr_missing, then a new assignment is not possible, so raise a SubtaskSchedulingException with context
    :param subtask:
    :param lst_stations_in_conflict:
    :return: lst_stations: List of station which can be assigned
    """
    # Get the station list from specification and remove the conflict stations
    lst_specified_stations = subtask.specifications_doc["stations"]["station_list"]
    lst_stations = list(set(lst_specified_stations) - set(lst_stations_in_conflict))
    logger.info("Determine stations which can be assigned %s" % lst_stations)

    # Check whether, after removing the conflicting stations, the max_nr_missing requirement per station_group is
    # still fulfilled. If so we are done, otherwise we raise an exception
    stations_groups = get_station_groups(subtask)
    for sg in stations_groups:
        nbr_missing = len(set(sg["stations"]) & set(lst_stations_in_conflict))
        if nbr_missing > sg["max_nr_missing"]:
            raise SubtaskSchedulingException("There are more stations in conflict than the specification is given "
                                             "(%d is larger than %d). The stations that are in conflict are '%s'."
                                             "Please check station of subtask %d " %
                                             (nbr_missing, sg["max_nr_missing"], lst_stations_in_conflict, subtask.pk))
    return lst_stations

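# Illustrative sketch (hypothetical station group and conflict list): skipping conflicting
# stations is only acceptable while the number of missing stations in each station group
# stays within that group's max_nr_missing, which is the check performed above.
def _example_max_nr_missing_check() -> bool:
    station_group = {"stations": ["CS001", "CS002", "CS003"], "max_nr_missing": 1}
    lst_stations_in_conflict = ["CS002"]
    nbr_missing = len(set(station_group["stations"]) & set(lst_stations_in_conflict))
    return nbr_missing <= station_group["max_nr_missing"]  # True: 1 missing <= 1 allowed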

def get_station_groups(subtask):
    """
    Retrieve the station_groups specification of the given subtask.
    For a calibrator it needs to be retrieved from the related target observation task.
    Note: the list can be empty (in some test cases), in which case max_nr_missing is not checked.
    :param subtask:
    :return: station_groups which is a list of dict. { station_list, max_nr_missing }
    """
    station_groups = []
    if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskException("Cannot retrieve related target observation of task_blueprint %d (subtask %d)" %
                                   (subtask.task_blueprint.id, subtask.id))
        if "station_groups" in target_task_blueprint.specifications_doc.keys():
            station_groups = target_task_blueprint.specifications_doc["station_groups"]
    else:
        if "station_groups" in subtask.task_blueprint.specifications_doc.keys():
            station_groups = subtask.task_blueprint.specifications_doc["station_groups"]
    return station_groups


def update_specification(ra_spec, lst_stations):
    """
    Update the RA Specification dictionary with the correct list of stations
    :param ra_spec: Dictionary of the RA specification
    :param lst_stations: List of stations to 'assign'
    :return: Dictionary with updated RA specification
    """
    if len(lst_stations) == 0:
        raise SubtaskSchedulingSpecificationException("Cannot re-assign resources after conflict for subtask id=%d "
                                                      "because there are no stations left to assign. " % ra_spec["tmss_id"])
    updated_ra_spec = ra_spec
    updated_ra_spec["specification"]["Observation.VirtualInstrument.stationList"] = "[%s]" % ','.join(s for s in lst_stations)
    # ?? should the station_requirements also be updated, or can we just leave it empty '[]'? Assume for now it can be empty
    return updated_ra_spec

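# Illustrative sketch (hypothetical station names): the parset stationList value built
# above is a bracketed, comma-separated string rather than a Python list.
def _example_station_list_formatting() -> str:
    lst_stations = ["CS001", "CS002"]
    return "[%s]" % ','.join(lst_stations)  # -> '[CS001,CS002]'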

def schedule_qafile_subtask(qafile_subtask: Subtask):
    ''' Schedule the given qafile_subtask (which converts the observation output to a QA h5 file)
    This method should typically be called upon the event of the observation_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''

    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qafile_subtask)

    if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk,
                                                                                                          qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

    if qafile_subtask.inputs.count() != 1:
        raise SubtaskSchedulingSpecificationException("QA subtask id=%s should have 1 input, but it has %s" % (qafile_subtask.id, qafile_subtask.inputs.count()))

    # step 1: set state to SCHEDULING
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qafile_subtask.save()

    # step 2: link input dataproducts
    qa_input = qafile_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    if qafile_subtask.outputs.first():
        dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty")
        dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
        qafile_subtask_dataproduct = Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ),
                                                                directory="/data/qa/qa_files",
                                                                dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value),
                                                                datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                                                producer=qafile_subtask.outputs.first(),
                                                                specifications_doc=dp_spec_template.get_default_json_document_for_schema(),
                                                                specifications_template=dp_spec_template,
                                                                feedback_doc=dp_feedback_template.get_default_json_document_for_schema(),
                                                                feedback_template=dp_feedback_template,
                                                                sap=None  # todo: do we need to point to a SAP here? Of which dataproduct then?
                                                                )
        qafile_subtask_dataproduct.save()

    # step 5: set state to SCHEDULED (resulting in the qaservice picking this subtask up and running it)
    qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qafile_subtask.save()

    return qafile_subtask


def schedule_qaplots_subtask(qaplots_subtask: Subtask):
    ''' Schedule the given qaplots_subtask (which creates inspection plots from a QA h5 file)
    This method should typically be called upon the event of the qafile_subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''

    # step 0: check pre-requisites
    check_prerequities_for_scheduling(qaplots_subtask)

    if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk,
                                                                                                          qaplots_subtask.specifications_template.type,
                                                                                                          SubtaskType.Choices.QA_PLOTS.value))

    if qaplots_subtask.inputs.count() != 1:
        raise SubtaskSchedulingSpecificationException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, qaplots_subtask.inputs.count()))

    # step 1: set state to SCHEDULING
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    qaplots_subtask.save()

    # step 2: link input dataproducts
    # this should typically be a single input with a single dataproduct (the qa h5 file)
    qa_input = qaplots_subtask.inputs.first()
    qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

    # step 3: resource assigner
    # is a no-op for QA

    # step 4: create output dataproducts, and link these to the output
    # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
    qafile_subtask = qaplots_subtask.predecessors.first()
    obs_subtask = qafile_subtask.predecessors.first()
    dp_spec_template = DataproductSpecificationsTemplate.objects.get(name="empty")
    dp_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")

    qaplots_subtask_dataproduct = Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ),
                                                             dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value),
                                                             datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                                             producer=qaplots_subtask.outputs.first(),
                                                             specifications_doc=dp_spec_template.get_default_json_document_for_schema(),
                                                             specifications_template=dp_spec_template,
                                                             feedback_doc=dp_feedback_template.get_default_json_document_for_schema(),
                                                             feedback_template=dp_feedback_template,
                                                             sap=None  # todo: do we need to point to a SAP here? Of which dataproduct then?
                                                             )
    qaplots_subtask_dataproduct.save()

    # step 5: set state to SCHEDULED (resulting in the qaservice picking this subtask up and running it)
    qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    qaplots_subtask.save()

    return qaplots_subtask

# todo: this can probably go when we switch to the new start time calculation in the model properties (which is based on this logic)
def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
    """
    Retrieve the previous related blueprint task object (if any)
    if nothing found return None, 0.
    :param task_blueprint:
    :return: previous_related_task_blueprint,
             time_offset (in seconds)
    """
    logger.info("get_previous_related_task_blueprint_with_time_offset %s (id=%s)", task_blueprint.name, task_blueprint.pk)
    previous_related_task_blueprint = None
    time_offset = 0

    scheduling_relations = list(task_blueprint.first_scheduling_relation.all()) + list(task_blueprint.second_scheduling_relation.all())
    for scheduling_relation in scheduling_relations:
        if scheduling_relation.first.id == task_blueprint.id and scheduling_relation.placement.value == "after":
            previous_related_task_blueprint = TaskBlueprint.objects.get(id=scheduling_relation.second.id)
            time_offset = scheduling_relation.time_offset

        if scheduling_relation.second.id == task_blueprint.id and scheduling_relation.placement.value == "before":
            previous_related_task_blueprint = TaskBlueprint.objects.get(id=scheduling_relation.first.id)
            time_offset = scheduling_relation.time_offset

    return previous_related_task_blueprint, time_offset


def _bulk_create_dataproducts_with_global_identifiers(dataproducts: list) -> list:
    """
    Bulk create the provided dataproducts in the database, and give each of them a unique global identifier.
    
    :return: the created dataproduct objects
    """

    # Bulk create identifiers, and then update the dataproducts with a link to the actual created objects.
    # This is needed as bulk_create needs to have any relations resolved.
    dp_global_identifiers = SIPidentifier.objects.bulk_create([SIPidentifier(source="TMSS") for _ in dataproducts])
    for dp, global_identifier in zip(dataproducts, dp_global_identifiers):
        dp.global_identifier = global_identifier

    return Dataproduct.objects.bulk_create(dataproducts)


def _output_root_directory(subtask: Subtask) -> str:
    """ Return the directory under which output needs to be stored. """

    # Support for several projects will be added in TMSS-689
    directory = "/data/%s/%s/L%s" % ("projects" if isProductionEnvironment() else "test-projects",
                                     subtask.project.name,
                                     subtask.id)

    return directory

def schedule_observation_subtask(observation_subtask: Subtask):
    ''' Schedule the given observation_subtask
    For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
    For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(observation_subtask)

    if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
                                                                                                                       observation_subtask.specifications_template.type,
                                                                                                                       SubtaskType.Choices.OBSERVATION.value))

    # step 1: set state to SCHEDULING
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    observation_subtask.save()

    # step 1a: check start/stop times
    # start time should be known. If not raise. Then the user and/or scheduling service should supply a properly calculated/estimated start_time first.
    if observation_subtask.scheduled_on_sky_start_time is None:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no start_time" % (observation_subtask.pk,
                                                                                                                              observation_subtask.specifications_template.type))

    if observation_subtask.specified_duration < timedelta(seconds=1):
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (observation_subtask.pk,
                                                                                                                                                 observation_subtask.specifications_template.type,
                                                                                                                                                 observation_subtask.specified_duration))

    # always update the stop_time according to the spec
    observation_subtask.scheduled_on_sky_stop_time = observation_subtask.scheduled_on_sky_start_time + observation_subtask.specified_duration

    # step 2: define input dataproducts
    # NOOP: observations take no inputs

    # step 3: create output dataproducts, and link these to the output
    dataproducts = []
    specifications_doc = observation_subtask.specifications_doc
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP")
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    dataproduct_feedback_doc = dataproduct_feedback_template.get_default_json_document_for_schema()


    # select correct output for each pointing based on name
    subtask_output_dict = {}
    output = observation_subtask.outputs.first()
    if not output:
        raise SubtaskSchedulingException('Cannot schedule subtask id=%s because it is missing the output' % (observation_subtask.id,))

    target_task_spec, calibrator_task_spec = _get_target_and_or_calibrator_specification(observation_subtask.task_blueprint)

    if target_task_spec and 'SAPs' in target_task_spec:  # target
        for sap in target_task_spec['SAPs']:
            subtask_output_dict[sap['name']] = output
    if calibrator_task_spec and 'pointing' in calibrator_task_spec:  # calibrator
        subtask_output_dict[calibrator_task_spec['name']] = output

    # create SAP objects, as observations create new beams
    antennaset = specifications_doc['stations']['antenna_set']
    antennafields = []
    for station in specifications_doc['stations']['station_list']:
        fields = antennafields_for_antennaset_and_station(antennaset, station)
        antennafields += [{"station": station, "field": field, "type": antennaset.split('_')[0]} for field in fields]

    saps = [SAP.objects.create(specifications_doc={ "name": "L%s_SAP%03d_%s" % (observation_subtask.id, sap_nr, pointing['name']),
                                                    "pointing": pointing['pointing'],
                                                    "time": {"start_time": observation_subtask.scheduled_on_sky_start_time.isoformat()+'Z',
                                                             "duration": (observation_subtask.scheduled_on_sky_stop_time - observation_subtask.scheduled_on_sky_start_time).total_seconds()},
                                                    "antennas": {
                                                      "antenna_set": antennaset,
                                                      "fields": antennafields
                                                    }
                                                  },
                               specifications_template=SAPTemplate.objects.get(name="SAP")) for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings'])]

    # store everything below this directory
    directory = _output_root_directory(observation_subtask)

    # create correlated dataproducts
    if specifications_doc['COBALT']['correlator']['enabled']:
        dataformat = Dataformat.objects.get(value=Dataformat.Choices.MEASUREMENTSET.value)
        datatype = Datatype.objects.get(value=Datatype.Choices.VISIBILITIES.value)
        dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities")
        sb_nr_offset = 0 # subband numbers run from 0 to (nr_subbands-1), increasing across SAPs

        for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
            if pointing['name'] in subtask_output_dict:
                subtask_output = subtask_output_dict[pointing['name']]
            else:
                raise SubtaskSchedulingException('Cannot schedule subtask id=%s because the output for pointing name=%s cannot be determined.' % (observation_subtask.id, pointing['name']))
            for sb_nr, subband in enumerate(pointing['subbands'], start=sb_nr_offset):
                dataproducts.append(Dataproduct(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                                         directory=os.path.join(directory, "uv"),
                                                         dataformat=dataformat,
                                                         datatype=datatype,
                                                         producer=subtask_output,
                                                         specifications_doc={"sap": pointing["name"], "subband": subband},
                                                         specifications_template=dataproduct_specifications_template,
                                                         feedback_doc=dataproduct_feedback_doc,
                                                         feedback_template=dataproduct_feedback_template,
                                                         size=0,
                                                         expected_size=0,
                                                         sap=saps[sap_nr],
                                                         global_identifier=None))

            sb_nr_offset += len(pointing['subbands'])


    # create beamformer dataproducts
    dataproduct_specifications_template_timeseries = DataproductSpecificationsTemplate.objects.get(name="time series")

    def _sap_index(saps: list, sap_name: str) -> int:
        """ Return the SAP index in the observation given a certain SAP name. """
        sap_indices = [idx for idx,sap in enumerate(saps) if sap['name'] == sap_name]

        # needs to be exactly one hit
        if len(sap_indices) != 1:
            raise SubtaskSchedulingException("SAP name %s must appear exactly once in the specification. It appeared %d times. Available names: %s" % (sap_name, len(sap_indices), [sap['name'] for sap in saps]))

        return sap_indices[0]

    def tab_dataproducts(sap_nr, pipeline_nr, tab_nr, stokes_settings, coherent):
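        # note: 'sap' below is a free variable bound by the enclosing loops further down,
        # i.e. the pipeline SAP entry for TAB pipelines, or the digital pointing for fly's eye pipelines.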
        nr_subbands = len(sap['subbands']) or len(specifications_doc['stations']['digital_pointings'][sap_nr]['subbands'])
        nr_stokes = len(stokes_settings['stokes'])
        nr_parts = ceil(1.0 * nr_subbands / stokes_settings['subbands_per_file'])

        cobalt_version = specifications_doc['COBALT']['version']

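        # the filename convention differs per COBALT version (derived from the format strings below):
        # v1 omits the pipeline index (N###) and does not zero-pad the Stokes index; later versions include both.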
        return [Dataproduct(filename=f"L{observation_subtask.id}_SAP{sap_nr:03}_B{tab_nr:03}_S{stokes_nr}_P{part_nr:03}_bf.h5" if cobalt_version == 1 else
                                     f"L{observation_subtask.id}_SAP{sap_nr:03}_N{pipeline_nr:03}_B{tab_nr:03}_S{stokes_nr:03}_P{part_nr:03}_bf.h5",
                                         directory=directory+("/cs" if coherent else "/is"),
                                         dataformat=Dataformat.objects.get(value="Beamformed"),
                                         datatype=Datatype.objects.get(value="time series"),
                                         producer=observation_subtask.outputs.first(),  # todo: select correct output. I tried "subtask_output_dict[sap['name']]" but tests fail because the sap's name is not in the task blueprint. Maybe it's just test setup and this should work?
                                         specifications_doc={
                                             "sap": specifications_doc['stations']['digital_pointings'][sap_nr]["name"],
                                             "coherent": coherent,
                                             "stokes_set": stokes_settings["stokes"],
                                             "identifiers": {"sap_index": sap_nr, "pipeline_index": pipeline_nr, "tab_index": tab_nr, "stokes_index": stokes_nr, "part_index": part_nr}
                                         },
                                         specifications_template=dataproduct_specifications_template_timeseries,
                                         feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
                                         feedback_template=dataproduct_feedback_template,
                                         size=0,
                                         expected_size=1024*1024*1024*tab_nr,
                                         sap=saps[sap_nr],
                                         global_identifier=None)
                                     for part_nr in range(nr_parts) for stokes_nr in range(nr_stokes)]


    # beamformer pipelines: one set of dataproducts per TAB.
    pipeline_nr_offset = 0
    for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['tab_pipelines'], start=pipeline_nr_offset):
        for sap in pipeline['SAPs']:
            sap_idx = _sap_index(specifications_doc['stations']['digital_pointings'], sap['name'])

            for tab_idx, tab in enumerate(sap['tabs']):
                dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'] if tab['coherent'] else pipeline['incoherent'], tab['coherent'])

    # fly's eye pipelines: one set of dataproducts per antenna field.
    pipeline_nr_offset += len(specifications_doc['COBALT']['beamformer']['tab_pipelines'])
    for pipeline_nr, pipeline in enumerate(specifications_doc['COBALT']['beamformer']['flyseye_pipelines'], start=pipeline_nr_offset):
        for sap_idx, sap in enumerate(specifications_doc['stations']['digital_pointings']):
            stations = pipeline['stations'] or specifications_doc['stations']['station_list']
            fields = sum([list(antenna_fields(station, antennaset)) for station in stations], [])
            for tab_idx, tab in enumerate(fields):
                dataproducts += tab_dataproducts(sap_idx, pipeline_nr, tab_idx, pipeline['coherent'], True)

    # create the dataproducts
    _bulk_create_dataproducts_with_global_identifiers(dataproducts)

    # step 4: resource assigner (if possible)
    assign_or_unassign_resources(observation_subtask)

    # TODO: TMSS-382: evaluate the scheduled stations and see if the requirements given in the subtask.task_blueprint.specifications_doc are met for the station_groups and max_nr_missing.

    # step 5: set state to SCHEDULED (so this observation subtask can be picked up and run)
    observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)

    # step 6: set scheduled_process_start/stop_time for MACScheduler
    observation_subtask.scheduled_process_start_time = observation_subtask.scheduled_on_sky_start_time - timedelta(minutes=3)
    observation_subtask.scheduled_process_stop_time = observation_subtask.scheduled_on_sky_stop_time + timedelta(minutes=1)
    observation_subtask.save()

    return observation_subtask

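# Illustrative sketch (hypothetical pointings): file subband numbers (SB###) keep increasing
# across SAPs, as implemented with sb_nr_offset above. A SAP with 3 subbands followed by one
# with 2 yields SB000..SB004.
def _example_global_subband_numbering() -> list:
    pointings = [{"subbands": [100, 101, 102]}, {"subbands": [200, 201]}]
    filenames = []
    sb_nr_offset = 0
    for sap_nr, pointing in enumerate(pointings):
        for sb_nr, _subband in enumerate(pointing["subbands"], start=sb_nr_offset):
            filenames.append("SAP%03d_SB%03d" % (sap_nr, sb_nr))
        sb_nr_offset += len(pointing["subbands"])
    return filenames  # -> ['SAP000_SB000', 'SAP000_SB001', 'SAP000_SB002', 'SAP001_SB003', 'SAP001_SB004']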

def _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list):
    # select subtask output the new dataproducts will be linked to
    pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?

    # TODO: create them from the spec, instead of "copying" the input filename
    dataformat = Dataformat.objects.get(value="MeasurementSet")
    datatype = Datatype.objects.get(value="visibilities")

    # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty"
    dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="visibilities")
    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    directory = os.path.join(_output_root_directory(pipeline_subtask), "uv")

    # input:output mapping is 1:1
    def output_dataproduct_filename(input_dp: Dataproduct) -> str:
        """ Construct the output filename to produce for an input. """
        if '_' in input_dp.filename and input_dp.filename.startswith('L'):
            return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
        else:
            return "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)

    output_dataproducts = [Dataproduct(filename=output_dataproduct_filename(input_dp),
                                directory=directory,
                                dataformat=dataformat,
                                datatype=datatype,
                                producer=pipeline_subtask_output,
                                specifications_doc=input_dp.specifications_doc,
                                specifications_template=dataproduct_specifications_template,
                                feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
                                feedback_template=dataproduct_feedback_template,
                                sap=input_dp.sap,
                                global_identifier=None) for input_dp in input_dataproducts]

    # create the dataproducts
    output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts)
    pipeline_subtask_output.dataproducts.set(output_dataproducts)

    transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=False) for input_dp,output_dp in zip(input_dataproducts, output_dataproducts)]
    DataproductTransform.objects.bulk_create(transforms)

    return output_dataproducts

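# Illustrative sketch (hypothetical ids and filename): the preprocessing output keeps the
# input filename but swaps in the pipeline's own L-number, as output_dataproduct_filename
# above does.
def _example_pipeline_output_filename() -> str:
    input_filename = "L123456_SAP000_SB000_uv.MS"  # hypothetical observation output filename
    pipeline_pk = 234567                           # hypothetical pipeline subtask pk
    if '_' in input_filename and input_filename.startswith('L'):
        return "L%s_%s" % (pipeline_pk, input_filename.split('_', 1)[1])  # -> 'L234567_SAP000_SB000_uv.MS'
    return "L%s_%s" % (pipeline_pk, input_filename)
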
def _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask: Subtask, input_dataproducts: list):
    # select subtask output the new dataproducts will be linked to
    pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?

    dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
    directory = os.path.join(_output_root_directory(pipeline_subtask), "pulp")

    # ----- output tarball per input dataproduct
    # we process per data type, as later on we produce summaries per data type

    # different datatypes need different treatment
    data_types = ["cs", "is", "cv"]

    # group input dataproducts by data type
    input_dataproducts = {
        "is": [input_dp for input_dp in input_dataproducts
               if input_dp.specifications_doc["coherent"] == False],
        "cs": [input_dp for input_dp in input_dataproducts
               if input_dp.specifications_doc["coherent"] == True
               and input_dp.specifications_doc["stokes_set"] != "XXYY"],
        "cv": [input_dp for input_dp in input_dataproducts
               if input_dp.specifications_doc["coherent"] == True
               and input_dp.specifications_doc["stokes_set"] == "XXYY"],
    }

    # in which subdirectory the output will be written
    output_subdir = {
        "is": "is",
        "cs": "cs",
        "cv": "cs",
    }

    # suffix used in filename of summary outputs
    summary_filename_suffix = {
        "cs": "CS",
        "is": "IS",
        "cv": "CV"
    }

    for data_type in data_types:
        # do not generate output for a data type that is not in the input
        if not input_dataproducts[data_type]:
            continue

        dataformat = Dataformat.objects.get(value="pulp analysis")
        datatype = Datatype.objects.get(value="pulsar profile")
        dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="time series")

        # how the output is constructed from the input. we do this based on filenames: input dataproducts that result in the same filename
        # are combined to produce the output with that filename.
        #
        # format: { output_dp_filename: {"output_dp": output_dp, "input_dps": [input_dps]} }
        transformation_map = {}

        for input_dp in input_dataproducts[data_type]:
            # the filename doesn't contain the stokes number if there is only 1 stokes for this data type or when recording cv:
            #
            # type                example filename                                 input:output mapping
            # CV                  pulp/cs/L808292_SAP000_B000_P008_bf.tar          4:1
            # IS I                pulp/is/L808290_SAP000_B001_P000_bf.tar          1:1
            # IS IQUV             pulp/is/L808290_SAP000_B001_S2_P000_bf.tar       1:1
            # CS I                pulp/cs/L808286_SAP000_B000_P000_bf.tar          1:1
            # CS IQUV             pulp/cs/L808288_SAP000_B000_S2_P000_bf.tar       1:1
            output_filename = ("L{subtask_id}_SAP{sap_index:03}_B{tab_index:03}_S{stokes_index}_P{part_index:03}_bf.tar" if input_dp.specifications_doc["stokes_set"] == "IQUV" else
                               "L{subtask_id}_SAP{sap_index:03}_B{tab_index:03}_P{part_index:03}_bf.tar").format(
                                                     subtask_id=pipeline_subtask.id,
                                                     sap_index=input_dp.specifications_doc["identifiers"]["sap_index"],
                                                     tab_index=input_dp.specifications_doc["identifiers"]["tab_index"],
                                                     stokes_index=input_dp.specifications_doc["identifiers"]["stokes_index"],
                                                     part_index=input_dp.specifications_doc["identifiers"]["part_index"])

            if output_filename in transformation_map:
                # we already modelled this output. add this input_dp to the input list
                #
                # This happens with CV only, where we've generated the same output_filename for each stokes, yet have more than one stokes (Xre,Xim,Yre,Yim)
                transformation_map[output_filename]["input_dps"].append(input_dp)
            else:
                # a new output to model
                output_dp = Dataproduct(filename=output_filename,
                                directory=os.path.join(directory, output_subdir[data_type]),
                                dataformat=dataformat,
                                datatype=datatype,
                                producer=pipeline_subtask_output,
                                specifications_doc=input_dp.specifications_doc,
                                specifications_template=dataproduct_specifications_template,
                                feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
                                feedback_template=dataproduct_feedback_template,
                                sap=input_dp.sap,
                                global_identifier=None)

                transformation_map[output_filename] = { "output_dp": output_dp, "input_dps": [input_dp] }

        # we don't need the keys anymore; ditch them and work with a list, which is easier
        transformations = transformation_map.values()

        # list and create the dataproducts
        output_dataproducts = [t["output_dp"] for t in transformations]
        output_dataproducts = _bulk_create_dataproducts_with_global_identifiers(output_dataproducts)
        pipeline_subtask_output.dataproducts.set(output_dataproducts)

        # put the actually saved output_dataproducts back in, so we can refer to the actually created ones.
        # By iterating in the same order in which we requested to save them, we can reiterate and replace 1:1
        for nr, transform in enumerate(transformations):
            transform["output_dp"] = output_dataproducts[nr]

        # construct the transform objects
        transformation_objects = [[DataproductTransform(input=input_dp, output=transform["output_dp"], identity=False) for input_dp in transform["input_dps"]] for transform in transformations]
        # create the transforms (needs a flattened list)
        DataproductTransform.objects.bulk_create(sum(transformation_objects,[]))

        # ----- summary tarballs
        # there is a tarball for cv, cs, and is separately.
        # pulp only processes input from a single observation, so there is no possibility of needing separate
        # summaries for input from different observations.

        dataformat = Dataformat.objects.get(value="pulp summary")
        datatype = Datatype.objects.get(value="quality")
        dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="pulp summary")

        # 1. create dataproducts
        # type                example filename
        # CV                  pulp/cs/L808292_summaryCV.tar
        # IS                  pulp/is/L808290_summaryIS.tar
        # CS                  pulp/cs/L808288_summaryCS.tar
        summary_dataproduct = Dataproduct(filename="L%s_summary%s.tar" % (pipeline_subtask.id, summary_filename_suffix[data_type]),
                                    directory=os.path.join(directory, output_subdir[data_type]),
                                    dataformat=dataformat,
                                    datatype=datatype,
                                    producer=pipeline_subtask_output,
                                    specifications_doc={ "coherent": data_type != "is", "identifiers": { "data_type": data_type } },
                                    specifications_template=dataproduct_specifications_template,
                                    feedback_doc=dataproduct_feedback_template.get_default_json_document_for_schema(),
                                    feedback_template=dataproduct_feedback_template,
                                    sap=None, # TODO: Can we say anything here, as summaries cover all SAPs?
                                    global_identifier=None)

        # create the dataproducts
        summary_dataproduct = _bulk_create_dataproducts_with_global_identifiers([summary_dataproduct])[0]
        pipeline_subtask_output.dataproducts.add(summary_dataproduct)

        # 2. create transforms from the input
        # populate the transform, each input_dp of this datatype is input for this summary
        transforms = [DataproductTransform(input=input_dp, output=summary_dataproduct, identity=False) for input_dp in input_dataproducts[data_type]]
        DataproductTransform.objects.bulk_create(transforms)

    return None

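# Illustrative sketch: how a beamformed dataproduct's specifications map onto the pulp
# data types used above ('is', 'cs', 'cv'); hypothetical inputs, for illustration only.
def _example_pulp_data_type() -> list:
    def data_type(coherent: bool, stokes_set: str) -> str:
        if not coherent:
            return "is"  # incoherent stokes
        return "cv" if stokes_set == "XXYY" else "cs"  # complex voltages vs coherent stokes
    return [data_type(False, "I"), data_type(True, "IQUV"), data_type(True, "XXYY")]  # -> ['is', 'cs', 'cv']
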
def schedule_pipeline_subtask(pipeline_subtask: Subtask):
    ''' Schedule the given pipeline_subtask
    This method should typically be called upon the event of a predecessor (observation) subtask being finished.
    This method implements "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(pipeline_subtask)

    if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk,
                                                                                                                       pipeline_subtask.specifications_template.type,
                                                                                                                       SubtaskType.Choices.PIPELINE.value))

    # step 1: set state to SCHEDULING
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    pipeline_subtask.save()

    # step 1a: check start/stop times
    # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
    if pipeline_subtask.scheduled_on_sky_start_time is None:
        now = datetime.utcnow()
        logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
        pipeline_subtask.scheduled_on_sky_start_time = now

    if pipeline_subtask.specified_duration < timedelta(seconds=1):
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because its specified duration is too short: %s" % (pipeline_subtask.pk,
                                                                                                                                                 pipeline_subtask.specifications_template.type,
                                                                                                                                                 pipeline_subtask.specified_duration))

    # always update the stop_time according to the spec
    pipeline_subtask.scheduled_on_sky_stop_time = pipeline_subtask.scheduled_on_sky_start_time + pipeline_subtask.specified_duration

    # step 2: link input dataproducts
    if pipeline_subtask.inputs.count() == 0:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk,
                                                                                                                            pipeline_subtask.specifications_template.type))

    # iterate over all inputs
    input_dataproducts = []
    for pipeline_subtask_input in pipeline_subtask.inputs.all():

        # select and set input dataproducts that meet the filter defined in selection_doc
        dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
                        if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]

        if len(dataproducts) == 0:
            raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s because input id=%s has no (filtered) dataproducts" % (pipeline_subtask.pk,
                                                                                                                                                        pipeline_subtask.specifications_template.type,
                                                                                                                                                        pipeline_subtask_input.id))

        pipeline_subtask_input.dataproducts.set(dataproducts)
        input_dataproducts.extend(dataproducts)

    # step 3: create output dataproducts, and link these to the output
    if pipeline_subtask.specifications_template.name == "preprocessing pipeline":
        _create_preprocessing_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)
    elif pipeline_subtask.specifications_template.name == "pulsar pipeline":
        _create_pulsar_pipeline_output_dataproducts_and_transforms(pipeline_subtask, input_dataproducts)

    # step 4: resource assigner (if possible)
    assign_or_unassign_resources(pipeline_subtask)

    # step 5: set state to SCHEDULED (so that the pipeline control service can pick this subtask up and run it)
    pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    pipeline_subtask.save()

    return pipeline_subtask


def schedule_ingest_subtask(ingest_subtask: Subtask):
    ''' Schedule the given ingest_subtask
    This method should typically be called when a predecessor (pipeline or observation) subtask has finished.
    This method implements the "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(ingest_subtask)

    if ingest_subtask.specifications_template.type.value != SubtaskType.Choices.INGEST.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (ingest_subtask.pk,
                                                                                                                       ingest_subtask.specifications_template.type,
                                                                                                                       SubtaskType.Choices.INGEST.value))

    # check permission pre-requisites
    scheduling_unit_blueprint = ingest_subtask.task_blueprint.scheduling_unit_blueprint    # we assume an ingest subtask does not serve tasks across scheduling unit boundaries
    if scheduling_unit_blueprint.ingest_permission_required:
        if scheduling_unit_blueprint.ingest_permission_granted_since is None or scheduling_unit_blueprint.ingest_permission_granted_since > datetime.utcnow():
            raise SubtaskSchedulingException("Cannot schedule ingest subtask id=%d because it requires explicit permission and the permission has not been granted (yet)" % (ingest_subtask.pk,))

    # step 1: set state to SCHEDULING
    ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    ingest_subtask.save()

    # step 1a: set start/stop times
    # not very relevant for ingest subtasks, but it's nice for the user to see when the ingest task was scheduled.
    # please note that an ingest subtask may idle for some time while it is in the ingest queue.
    # the actual start/stop times are set by the IngestTMSSAdapter when the subtask starts and stops.
    ingest_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in ingest_subtask.predecessors] + [datetime.utcnow()])
    ingest_subtask.scheduled_on_sky_stop_time = ingest_subtask.scheduled_on_sky_start_time + timedelta(hours=6)
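    # (the 6 hour window is only a rough placeholder; the real duration depends on the ingest queue and transfer speed)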

    # step 2: link input dataproducts
    if ingest_subtask.inputs.count() == 0:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (ingest_subtask.pk,
                                                                                                               ingest_subtask.specifications_template.type))

    # note: count() ignores select_related, so explicitly check whether any input's producer has dataproducts.
    if not any(ingest_subtask_input.producer.dataproducts.exists() for ingest_subtask_input in ingest_subtask.inputs.all()):
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input dataproduct(s)" % (ingest_subtask.pk,
                                                                                                                           ingest_subtask.specifications_template.type))

    # iterate over all inputs
    for ingest_subtask_input in ingest_subtask.inputs.all():

        # select and set input dataproducts that meet the filter defined in selection_doc
        input_dataproducts = [dataproduct for dataproduct in ingest_subtask_input.producer.dataproducts.all()
                              if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, ingest_subtask_input.selection_doc)]
        ingest_subtask_input.dataproducts.set(input_dataproducts)

        # step 3: define output and create output dataproducts.
        ingest_subtask_output = SubtaskOutput.objects.create(subtask=ingest_subtask)

        # prepare identifiers in bulk for each output_dataproduct
        dp_gids = [SIPidentifier(source="TMSS") for _ in input_dataproducts]
        SIPidentifier.objects.bulk_create(dp_gids)

        output_dataproducts = [Dataproduct(filename=input_dp.filename, # overwritten later by ingest 'feedback'. Is determined at transfer time by the LTA.
                                           directory="LTA", # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
                                           dataformat=input_dp.dataformat,
                                           datatype=input_dp.datatype,
                                           specifications_doc=input_dp.specifications_doc,
                                           specifications_template=input_dp.specifications_template,
                                           producer=ingest_subtask_output,
                                           size=None,  # filled in later by ingest 'feedback'. Is determined at transfer time by the LTA.
                                           feedback_doc=input_dp.feedback_doc, # copy dp feedback from input dp. The ingest subtask does not alter the feedback/dataproducts.
                                           feedback_template=input_dp.feedback_template,
                                           sap=input_dp.sap,
                                           global_identifier=dp_gid) for input_dp, dp_gid in zip(input_dataproducts, dp_gids)]
        Dataproduct.objects.bulk_create(output_dataproducts)

        # link each input to each corresponding output dataproduct. identity=True because this is "just a copy".
        dataproduct_transforms = [DataproductTransform(input=input_dp, output=output_dp, identity=True)
                                  for input_dp, output_dp in zip(input_dataproducts, output_dataproducts)]
        DataproductTransform.objects.bulk_create(dataproduct_transforms)


    # skip step 4: ingest does not need to have resources assigned

    # step 5: set state to SCHEDULED (so that the ingest service can pick this subtask up and run it)
    ingest_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    ingest_subtask.save()

    return ingest_subtask


def schedule_cleanup_subtask(cleanup_subtask: Subtask):
    ''' Schedule the given cleanup_subtask
    This method should typically be called when a predecessor (pipeline or observation) subtask has finished.
    This method implements the "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(cleanup_subtask)

    if cleanup_subtask.specifications_template.type.value != SubtaskType.Choices.CLEANUP.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (cleanup_subtask.pk,
                                                                                                                       cleanup_subtask.specifications_template.type,
                                                                                                                       SubtaskType.Choices.CLEANUP.value))

    # step 1: set state to SCHEDULING
    cleanup_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    cleanup_subtask.save()

    # step 1a: set start/stop times
    # not very relevant for cleanup subtasks, but it's nice for the user to see when the cleanup task was scheduled.
    # please note that a cleanup subtask may idle for some time while it is in the cleanup queue.
    # the actual start/stop times are set by the cleanup service when the subtask starts and stops.
    cleanup_subtask.scheduled_on_sky_start_time = max([pred.scheduled_on_sky_stop_time for pred in cleanup_subtask.predecessors] + [datetime.utcnow()])
    cleanup_subtask.scheduled_on_sky_stop_time = cleanup_subtask.scheduled_on_sky_start_time + timedelta(hours=1)

    # step 2: link input dataproducts
    if cleanup_subtask.inputs.count() == 0:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (cleanup_subtask.pk,
                                                                                                               cleanup_subtask.specifications_template.type))

    # iterate over all inputs
    for cleanup_subtask_input in cleanup_subtask.inputs.all():
        # select and set input dataproducts that meet the filter defined in selection_doc
        input_dataproducts = [dataproduct for dataproduct in cleanup_subtask_input.producer.dataproducts.all()
                              if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, cleanup_subtask_input.selection_doc)]
        cleanup_subtask_input.dataproducts.set(input_dataproducts)

    # cleanup has no outputs

    # skip step 4: cleanup does not need to have resources assigned

    # step 5: set state to SCHEDULED (so that the cleanup_service can pick this subtask up and run it)
    cleanup_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    cleanup_subtask.save()

    return cleanup_subtask


def schedule_copy_subtask(copy_subtask: Subtask):
    ''' Schedule the given copy_subtask
    This method should typically be called when a predecessor (pipeline or observation) subtask has finished.
    This method implements the "Scheduling subtasks" step from the "Specification Flow"
    https://support.astron.nl/confluence/display/TMSS/Specification+Flow
    '''
    # step 0: check pre-requisites
    check_prerequities_for_scheduling(copy_subtask)

    if copy_subtask.specifications_template.type.value != SubtaskType.Choices.COPY.value:
        raise SubtaskSchedulingSpecificationException("Cannot schedule subtask id=%d type=%s but type should be %s" % (copy_subtask.pk,
                                                                                                                       copy_subtask.specifications_template.type,
                                                                                                                       SubtaskType.Choices.COPY.value))

    # step 1: set state to SCHEDULING
    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
    copy_subtask.save()

    # step 1a: check start/stop times
    # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
    if copy_subtask.scheduled_on_sky_start_time is None:
        now = datetime.utcnow()
        logger.info("copy id=%s has no starttime. assigned default: %s", copy_subtask.pk, formatDatetime(now))
        copy_subtask.scheduled_on_sky_start_time = now

    if copy_subtask.scheduled_on_sky_stop_time is None:
        stop_time = copy_subtask.scheduled_on_sky_start_time + timedelta(hours=1)
        logger.info("copy id=%s has no stop_time. assigned default: %s", copy_subtask.pk, formatDatetime(stop_time))
        copy_subtask.scheduled_on_sky_stop_time = stop_time

    # step 2: link input dataproducts
    if copy_subtask.inputs.count() == 0:
        raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (copy_subtask.pk,
                                                                                                               copy_subtask.specifications_template.type))

    # iterate over all inputs
    for copy_subtask_input in copy_subtask.inputs.all():

        # select and set input dataproducts that meet the filter defined in selection_doc
        dataproducts = [dataproduct for dataproduct in copy_subtask_input.producer.dataproducts.all()
                        if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, copy_subtask_input.selection_doc)]
        copy_subtask_input.dataproducts.set(dataproducts)

    # todo: I assume that there is no RA involvement here? If there is, what would a copy parset look like?
    # step 4: resource assigner (if possible)
    #_assign_resources(copy_subtask)

    # step 5: set state to SCHEDULED (so that the copy service can pick this subtask up and run it)
    copy_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
    copy_subtask.save()

    return copy_subtask

# === Misc ===

def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint, start_time: datetime=None) -> [Subtask]:
    '''Convenience method: Schedule (and return) the subtasks in the task_blueprint that are not dependent on any predecessors'''
    independent_subtasks = list(Subtask.independent_subtasks().filter(task_blueprint__id=task_blueprint.id, state__value=SubtaskState.Choices.DEFINED.value).all())

    for subtask in independent_subtasks:
        if start_time is not None:
            subtask.scheduled_on_sky_start_time = start_time
        schedule_subtask_and_update_successor_start_times(subtask)

    return independent_subtasks
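
# illustrative example (hypothetical variables, not executed): schedule the independent subtasks
# of a task blueprint five minutes from now:
#   schedule_independent_subtasks_in_task_blueprint(my_task_blueprint, start_time=datetime.utcnow() + timedelta(minutes=5))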


def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs):
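    # note: the given default_subtask_specs dict is updated in place and returned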
    subtask_specs = default_subtask_specs
    subtask_specs['storagemanager'] = preprocessing_task_specs['storagemanager']
    subtask_specs['cluster_resources'] = preprocessing_task_specs['cluster_resources']

    # averaging (performed by the demixer)
    subtask_specs["demixer"]["enabled"]         = True
    subtask_specs['demixer']["frequency_steps"] = preprocessing_task_specs['average']['frequency_steps']
    subtask_specs['demixer']["time_steps"]      = preprocessing_task_specs['average']['time_steps']

    # demixing
    subtask_specs['demixer']["demix_frequency_steps"] = preprocessing_task_specs['demix']['frequency_steps']
    subtask_specs['demixer']["demix_time_steps"]      = preprocessing_task_specs['demix']['time_steps']
    subtask_specs['demixer']["ignore_target"]         = preprocessing_task_specs['demix']['ignore_target']
    subtask_specs['demixer']["demix_always"]          = preprocessing_task_specs['demix']['sources']
    subtask_specs['demixer']["demix_if_needed"]       = []

    # flagging
    if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none':
        subtask_specs["aoflagger"]["enabled"] = True
        subtask_specs["aoflagger"]["strategy"] = preprocessing_task_specs["flag"]["rfi_strategy"]
    else:
        subtask_specs["aoflagger"]["enabled"] = False

    if preprocessing_task_specs["flag"]["outerchannels"]:
        subtask_specs["preflagger0"]["enabled"] = True
        subtask_specs["preflagger0"]["channels"] = "0..nchan/32-1,31*nchan/32..nchan-1"
    else:
        subtask_specs["preflagger0"]["enabled"] = False

    if preprocessing_task_specs["flag"]["autocorrelations"]:
        subtask_specs["preflagger1"]["enabled"] = True
        subtask_specs["preflagger1"]["corrtype"] = "auto"
    else:
        subtask_specs["preflagger1"]["enabled"] = False

    return subtask_specs


def _generate_subtask_specs_from_pulsar_pipeline_task_specs(pipeline_task_specs, default_subtask_specs):
    subtask_specs = {}
    subtask_specs['cluster_resources'] = pipeline_task_specs['cluster_resources']

    # Pulsar to fold
    if pipeline_task_specs["pulsar"]["strategy"] == "manual":
        # pulsar is specified explicitly
        subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["name"]
    else:
        # search for the pulsar (e.g. in a library, based on the SAP direction)
        subtask_specs["pulsar"] = pipeline_task_specs["pulsar"]["strategy"]

    subtask_specs["single_pulse"] = pipeline_task_specs["single_pulse_search"]

    # PRESTO
    presto_specs = pipeline_task_specs["presto"]
    subtask_specs["presto"] = {}
    subtask_specs["presto"]["2bf2fits_extra_opts"] = "-nsamples={samples_per_block}".format(**presto_specs["input"])
    subtask_specs["presto"]["decode_nblocks"]      = presto_specs["input"]["nr_blocks"]
    subtask_specs["presto"]["decode_sigma"]        = presto_specs["input"]["decode_sigma"]
    subtask_specs["presto"]["nofold"]              = not presto_specs["fold_profile"]
    subtask_specs["presto"]["skip_prepfold"]       = not presto_specs["prepfold"]
    subtask_specs["presto"]["rrats"]               = presto_specs["rrats"]["enabled"]
    subtask_specs["presto"]["rrats_dm_range"]      = presto_specs["rrats"]["dm_range"]
    subtask_specs["presto"]["prepdata_extra_opts"] = " ".join([
              "-dm {dm}".format(**presto_specs["prepdata"]) if presto_specs["prepdata"]["dm"] != -1 else "",
              ])

    subtask_specs["presto"]["prepfold_extra_opts"] = ""
    subtask_specs["presto"]["prepsubband_extra_opts"] = ""
    subtask_specs["presto"]["rfifind_extra_opts"]  = "".join([
              "-blocks {blocks}".format(**presto_specs["rfifind"]),
              ])


    # DSPSR
    dspsr_specs = pipeline_task_specs["dspsr"]
    digifil_specs = dspsr_specs["digifil"]
    filterbank_specs = dspsr_specs["filterbank"]
    subtask_specs["dspsr"] = {}
    subtask_specs["dspsr"]["skip_dspsr"]           = not dspsr_specs["enabled"]
    subtask_specs["dspsr"]["digifil_extra_opts"]   = " ".join([
              "-D {dm}".format(**digifil_specs) if digifil_specs["dm"] != -1 else "",
              "-t {integration_time}".format(**digifil_specs),
              "-f {frequency_channels}{dedispersion}".format(**digifil_specs, dedispersion=":D" if digifil_specs["coherent_dedispersion"] else ""),
              ])

    subtask_specs["dspsr"]["nopdmp"]               = not dspsr_specs["optimise_period_dm"]
    subtask_specs["dspsr"]["norfi"]                = not dspsr_specs["rfi_excision"]
    subtask_specs["dspsr"]["tsubint"]              = dspsr_specs["subintegration_length"]
    subtask_specs["dspsr"]["dspsr_extra_opts"]     = " ".join([
              "-U minX1 -t 1", # minimise memory usage, and use 1 thread
              "-s -K" if dspsr_specs["single_pulse_subintegration"] else "",
              "-F {frequency_channels}{dedispersion}".format(**filterbank_specs, dedispersion=":D" if filterbank_specs["coherent_dedispersion"] else "") if filterbank_specs["enabled"] else "",
              ])

    # output
    output_specs = pipeline_task_specs["output"]
    subtask_specs["output"] = {}
    subtask_specs["output"]["raw_to_8bit"]                   = output_specs["quantisation"]["enabled"]
    subtask_specs["output"]["8bit_conversion_sigma"]         = output_specs["quantisation"]["scale"]
    subtask_specs["output"]["skip_dynamic_spectrum"]         = not output_specs["dynamic_spectrum"]["enabled"]
    subtask_specs["output"]["dynamic_spectrum_time_average"] = output_specs["dynamic_spectrum"]["time_average"]

    return subtask_specs


def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
    """
    Filter specs by selection. This requires the specifications_doc to...
    A) ...contain ALL KEYS that we select / filter for
    B) ...contain NO ADDITIONAL VALUES that are not selected / filtered for
    :param specifications_doc: dataproduct specification as dict
    :param selection_doc: selection filter as dict
    :return: True when the given specifications_doc meets the filter described in selection_doc, False otherwise
    """
    meets_criteria = True
    for k, v in selection_doc.items():
        if k.startswith('$'):  # ignore stuff like $schema
            continue
        if k not in specifications_doc.keys():
            meets_criteria = False
        else:
            spec = specifications_doc[k]
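            # three cases: list vs list (every spec value must be in the selection),
            # scalar vs list (the spec value must be in the selection),
            # and scalar vs scalar (the values must be equal)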
            if isinstance(spec, list) and isinstance(v, list):
                for spec_v in spec:
                    if spec_v not in v:
                        meets_criteria = False
            elif isinstance(v, list):
                if spec not in v:
                    meets_criteria = False
            else:
                if spec != v:
                    meets_criteria = False

    logger.debug("specs %s matches selection %s: %s", specifications_doc, selection_doc, meets_criteria)
    return meets_criteria
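
# illustrative example (hypothetical field names, not executed):
#   specifications_doc_meets_selection_doc({'sap': ['target1']}, {'sap': ['target1', 'target2']})    # -> True
#   specifications_doc_meets_selection_doc({'sap': ['calibrator']}, {'sap': ['target1', 'target2']}) # -> False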


def get_observation_task_specification_with_check_for_calibrator(subtask):
    """
    Retrieve the observation task blueprint specifications_doc from the given subtask object.
    If the task is a calibrator, then the specifications of the related target observation are returned instead.
    :param: subtask object
    :return: task_spec: the specifications_doc of the task blueprint, which is always a target observation
    """
    if 'calibrator' in subtask.task_blueprint.specifications_template.name.lower():
        # Calibrator requires related Target Task Observation for some specifications
        target_task_blueprint, _ = get_related_target_observation_task_blueprint(subtask.task_blueprint)
        if target_task_blueprint is None:
            raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found" % subtask.pk)
        task_spec = target_task_blueprint.specifications_doc
        logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s",
                    subtask.task_blueprint.id, target_task_blueprint.id)
    else:
        task_spec = subtask.task_blueprint.specifications_doc
    return task_spec


def cancel_subtask(subtask: Subtask) -> Subtask:
    '''Generic cancelling method for subtasks. Calls the appropriate cancel method based on the subtask's type.'''

    if subtask.state.value == SubtaskState.Choices.CANCELLED.value:
        logger.info("cancel_subtask: subtask id=%s type=%s is already cancelled.", subtask.id, subtask.specifications_template.type.value)
        return subtask

    # check prerequisites, blocks illegal state transitions, like from any -ING state.
    check_prerequities_for_cancelling(subtask)

    try:
        if subtask.state.value == SubtaskState.Choices.SCHEDULED.value:
            # the scheduled subtask still claims a timeslot and future resources.
            # unschedule the subtask, and make sure the post_state is CANCELLING and not DEFINED in order to not trigger any (dynamic) schedulers.
            logger.info("Unscheduling subtask subtask id=%s type=%s before it can be cancelled...", subtask.id, subtask.specifications_template.type.value)
            unschedule_subtask(subtask, post_state=SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLING.value))
        else:
            # no need to unschedule, but we may need to kill the running subtask...
            needs_to_kill_subtask = subtask.state.value in (SubtaskState.Choices.QUEUED.value, SubtaskState.Choices.STARTED.value)

            # set the state to CANCELLING
            logger.info("Cancelling subtask id=%s type=%s state=%s", subtask.id, subtask.specifications_template.type.value, subtask.state.value)
            subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLING.value)
            subtask.save()

            if needs_to_kill_subtask:
                # kill the queued/started subtask, depending on type
                if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
                    kill_observation_subtask(subtask)
                elif subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
                    kill_pipeline_subtask(subtask)
                else:
                    raise SubtaskCancellingException("Cannot kill subtask id=%s of type=%s" % (subtask.id, subtask.specifications_template.type.value))

        # finished cancelling, set to CANCELLED
        subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.CANCELLED.value)
        subtask.save()
        logger.info("Cancelled subtask id=%s type=%s state=%s", subtask.id, subtask.specifications_template.type.value, subtask.state.value)
    except Exception as e:
        logger.error("Error while cancelling subtask id=%s type=%s state=%s '%s'", subtask.id, subtask.specifications_template.type.value, subtask.state.value, e)
        subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
        subtask.error_reason = f'{e}'
        subtask.save()
        if isinstance(e, SubtaskCancellingException):
            # we intentionally raised the SubtaskCancellingException, so re-raise it and let the caller handle it
            raise

    return subtask


def cancel_subtask_and_successors(subtask: Subtask) -> Subtask:
    '''Cancel the given subtask and all of its downstream successor subtasks (recursive: follows successors of successors, etc.)'''
    cancel_subtask(subtask)
    cancel_subtask_successors(subtask)
    return subtask


def cancel_subtask_successors(subtask: Subtask):
    '''Cancel all of the given subtask's downstream successor subtasks (recursive: follows successors of successors, etc.)'''
    for successor in subtask.successors:
        cancel_subtask_and_successors(successor)
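
# illustrative example (hypothetical id, not executed): cancel an observation subtask and
# everything downstream of it (e.g. its pipeline/ingest successors):
#   cancel_subtask_and_successors(Subtask.objects.get(id=some_subtask_id))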


def kill_observation_subtask(subtask: Subtask) -> bool:
    '''Kill the observation subtask. Return True if actually killed.'''
    if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
        with ObservationControlRPCClient.create() as obs_control_client:
            return obs_control_client.abort_observation(subtask.id)['aborted']
    return False


def kill_pipeline_subtask(subtask: Subtask) -> bool:
    '''Kill the pipeline subtask. Return True if actually killed.'''
    if subtask.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
        with PipelineControlRPCClient.create() as pipeline_control_client:
            return pipeline_control_client.cancel_pipeline(subtask.id)['canceled']
    return False