Skip to content
Snippets Groups Projects
Select Git revision
  • 123d2767d7402d1c1b390c69da18be513b25c1bb
  • master default protected
  • plot_using_locky
  • poppy_integration_v50
  • optimize_workflow
  • hotfix/disable_shm
  • poppy_integration
  • releases/v5.0 protected
  • use-versioned-releases
  • releases/v5.0rc2 protected
  • releases/v5.0rc1 protected
  • releases/ldv_v407_atdb protected
  • ldv_v407_debug
  • releases/ldv_v406_debug protected
  • releases/ldv_v405 protected
  • releases/ldv_v404 protected
  • v5.0
  • v5.0rc2
  • v5.0rc1
  • ldv_v406_debug
  • ldv_v405_debug
  • ldv_v404
  • ldv_v403
  • ldv_v402
  • v4.0
  • ldv_v401
  • ldv_v40
  • ldv_v031
  • ldv_v03
  • ldv_v01
30 results

LoSoTo.ClockTec.cwl

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    subtasks.py 58.66 KiB
    import logging
    logger = logging.getLogger(__name__)
    
    from functools import cmp_to_key
    from collections.abc import Iterable
    
    from lofar.common.datetimeutils import formatDatetime
    from lofar.common import isProductionEnvironment
    from lofar.common.json_utils import add_defaults_to_json_object_for_schema, get_default_json_object_for_schema
    from lofar.common.lcu_utils import get_current_stations
    
    from lofar.sas.tmss.tmss.exceptions import SubtaskCreationException, SubtaskSchedulingException
    
    from datetime import datetime, timedelta
    from lofar.common.datetimeutils import parseDatetime
    from lofar.common.json_utils import add_defaults_to_json_object_for_schema
    from lofar.sas.tmss.tmss.tmssapp.models import *
    from lofar.sas.resourceassignment.resourceassigner.rarpc import RARPC
    from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
    from lofar.sas.tmss.tmss.tmssapp.adapters.parset import convert_to_parset_dict
    from lofar.sas.resourceassignment.taskprescheduler.cobaltblocksize import CorrelatorSettings, BlockConstraints, BlockSize
    
    # ==== various create* methods to convert/create a TaskBlueprint into one or more Subtasks ====
    
    def check_prerequities_for_subtask_creation(task_blueprint: TaskBlueprint) -> bool:
        """Verify that subtasks may be created for the given task_blueprint.

        :return: True when all prerequisites are met.
        :raises SubtaskCreationException: when the task_blueprint is flagged with do_cancel.
        """
        cancel_requested = task_blueprint.do_cancel
        if not cancel_requested:
            return True
        raise SubtaskCreationException("Cannot create subtasks from task_blueprint id=%d, because the task_blueprint is explicit set to cancel." % task_blueprint.id)
    
    def create_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
        '''Generic create-method for subtasks: create all subtasks for task_blueprint, and (recursively) for its predecessors.

        Predecessor task_blueprints are handled first, so that their subtasks exist when this task_blueprint's
        subtasks are created. If this task_blueprint already has subtasks, none are created for it and only the
        predecessors' newly created subtasks are returned. A SubtaskCreationException raised by an individual
        generator is logged and that generator is skipped.

        :return: list of newly created subtasks (predecessors' subtasks first).
        :raises SubtaskCreationException: if the task_blueprint is set to cancel, or if no generator is known
                                          for its specifications_template name.
        '''
        check_prerequities_for_subtask_creation(task_blueprint)

        # recurse over predecessors, so that all dependencies in predecessor subtasks can be met.
        created_subtasks = [new_subtask
                            for predecessor in task_blueprint.predecessors.all()
                            for new_subtask in create_subtasks_from_task_blueprint(predecessor)]

        if task_blueprint.subtasks.count() > 0:
            logger.debug("skipping creation of subtasks because they already exist for task_blueprint id=%s, name='%s', task_template_name='%s'",
                         task_blueprint.id, task_blueprint.name, task_blueprint.specifications_template.name)
            return created_subtasks

        # target and calibrator observations share the same (ordered) list of generator functions
        observation_generators = [create_observation_control_subtask_from_task_blueprint,
                                  create_qafile_subtask_from_task_blueprint,
                                  create_qaplots_subtask_from_task_blueprint]
        generators_per_template = {'target observation': observation_generators,
                                   'preprocessing pipeline': [create_preprocessing_subtask_from_task_blueprint],
                                   'calibrator observation': observation_generators}

        template_name = task_blueprint.specifications_template.name
        generators = generators_per_template.get(template_name)
        if generators is None:
            message = 'Cannot create subtasks for task id=%s because no generator exists for its schema name=%s' % (task_blueprint.pk, template_name)
            logger.error(message)
            raise SubtaskCreationException(message)

        for generate in generators:
            try:
                new_subtask = generate(task_blueprint)
            except SubtaskCreationException as e:
                logger.error(e)
                continue
            # a generator may return None when its subtask is not enabled in the spec
            if new_subtask is not None:
                created_subtasks.append(new_subtask)
        return created_subtasks
    
    
    def create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint: TaskBlueprint) -> (dict, SubtaskTemplate):
        """
        Create a valid observation subtask specification ('observation control' SubtaskTemplate schema) based on the task_blueprint's settings

        For a 'calibrator observation' task_blueprint, the analog/digital pointing comes from the calibrator's own
        'pointing' spec (or, when 'autoselect' is on, from the related target observation's 'tile_beam'). All
        station and correlator settings are then taken from the related 'target observation' task_blueprint.
        For a 'target observation', digital pointings come from its 'SAPs' and the analog pointing from its 'tile_beam'.

        :param task_blueprint: a task_blueprint whose specifications_template is 'target observation' or 'calibrator observation'
        :return: tuple of (subtask specification doc, the 'observation control' SubtaskTemplate). The doc is
                 validated against the template's schema before it is returned.
        :raises SubtaskCreationException: for non-observation task_blueprints; when a calibrator has no related target
                                          observation, or (in autoselect mode) that target has no 'tile_beam'; or when
                                          the stations of a station group cannot be determined.
        """

        # check if task_blueprint has an observation-like specification
        if task_blueprint.specifications_template.name.lower() not in ['target observation', 'calibrator observation']:
            raise SubtaskCreationException("Cannot create observation subtask specifications from task_blueprint id=%s with template name='%s'" % (
                                           task_blueprint.id, task_blueprint.specifications_template.name))

        # start with an observation subtask specification with all the defaults and the right structure according to the schema
        subtask_template = SubtaskTemplate.objects.get(name='observation control')
        subtask_spec = get_default_json_object_for_schema(subtask_template.schema)

        # wipe the default pointings, these should come from the task_spec
        subtask_spec['stations']['analog_pointing'] = {}
        subtask_spec['stations']['digital_pointings'] = []

        # now go over the settings in the task_spec and 'copy'/'convert' them to the subtask_spec
        task_spec = task_blueprint.specifications_doc

        # The calibrator has a minimal calibration-specific specification subset.
        # The rest of its specs are 'shared' with the target observation.
        # So... copy the calibrator specs first, then loop over the shared target/calibrator specs...
        if 'calibrator' in task_blueprint.specifications_template.name.lower():
            # Calibrator requires related Target Task Observation for some specifications
            target_task_blueprint = get_related_target_observation_task_blueprint(task_blueprint)
            if target_task_blueprint is None:
                raise SubtaskCreationException("Cannot create calibrator observation subtask specifications from task_blueprint id=%s with template name='%s' because no related target observation task_blueprint is found" % (
                                               task_blueprint.id, task_blueprint.specifications_template.name))
            target_task_spec = target_task_blueprint.specifications_doc

            # 'autoselect' is treated as enabled when it is absent from the calibrator's spec
            if task_spec.get('autoselect', True):
                logger.info("auto-selecting calibrator target based on elevation of target observation...")
                # use the tile_beam of the related target observation as the calibrator's analog pointing.
                # NOTE(review): the log message mentions elevation-based selection, but no elevation is computed here.
                if "tile_beam" in target_task_spec:
                    subtask_spec['stations']['analog_pointing'] = {
                        "direction_type": target_task_spec["tile_beam"]["direction_type"],
                        "angle1": target_task_spec["tile_beam"]["angle1"],
                        "angle2": target_task_spec["tile_beam"]["angle2"]}
                else:
                    raise SubtaskCreationException("Cannot determine the pointing specification from task_blueprint "
                                                   "id=%s in auto-select mode, because the related target observation "
                                                   "task_blueprint id=%s has no tile beam pointing defined" % (
                                                    task_blueprint.id, target_task_blueprint.id))
            else:
                # autoselect disabled: use the calibrator's own explicit pointing
                subtask_spec['stations']['analog_pointing'] = {"direction_type": task_spec["pointing"]["direction_type"],
                                                               "angle1": task_spec["pointing"]["angle1"],
                                                               "angle2": task_spec["pointing"]["angle2"]}

            # for the calibrator, the digital pointing is equal to the analog pointing
            subtask_spec['stations']['digital_pointings'] = [ {'name': 'calibrator', # there is no name for the calibrator pointing in the task spec
                                                               'subbands': list(range(0,488)), # there are no subbands for the calibrator pointing in the task spec
                                                               'pointing': subtask_spec['stations']['analog_pointing'] } ]
            # Use the Task Specification of the Target Observation
            # (from here on, all remaining settings are read from the target observation's spec)
            task_spec = target_task_spec
            logger.info("Using station and correlator settings for calibrator observation task_blueprint id=%s from target observation task_blueprint id=%s",
                        task_blueprint.id, target_task_blueprint.id)

        subtask_spec['stations']["antenna_set"] = task_spec["antenna_set"]
        subtask_spec['stations']["filter"] = task_spec["filter"]

        # task_spec["stations"] either contains a "group" (resolved to a station list via the RADB),
        # or is otherwise copied as-is into the station_list
        if "stations" in task_spec:
            if "group" in task_spec["stations"]:
                try:
                    # retrieve stations in group from RADB virtual instrument
                    station_group_name = task_spec["stations"]["group"]
                    subtask_spec['stations']['station_list'] = get_stations_in_group(station_group_name)
                except Exception as e:
                    raise SubtaskCreationException("Could not determine stations in group '%s' for task_blueprint id=%s. Error: %s" % (
                        station_group_name, task_blueprint.id, e))
            else:
                subtask_spec['stations']['station_list'] = task_spec["stations"]

        if 'calibrator' not in task_blueprint.specifications_template.name.lower():
            # copy/convert the analoge/digital_pointings only for non-calibrator observations (the calibrator has its own pointing)
            # each SAP becomes one digital pointing with its own subbands
            for sap in task_spec.get("SAPs", []):
                subtask_spec['stations']['digital_pointings'].append(
                    {"name": sap["name"],
                     "pointing": {"direction_type": sap["digital_pointing"]["direction_type"],
                                  "angle1": sap["digital_pointing"]["angle1"],
                                  "angle2": sap["digital_pointing"]["angle2"]},
                     "subbands": sap["subbands"]
                     })

            if "tile_beam" in task_spec:
                subtask_spec['stations']['analog_pointing'] = { "direction_type": task_spec["tile_beam"]["direction_type"],
                                                                "angle1": task_spec["tile_beam"]["angle1"],
                                                                "angle2": task_spec["tile_beam"]["angle2"] }

        # derive the COBALT correlator block settings from the requested channels per subband and integration time
        if "correlator" in task_spec:
            corr = CorrelatorSettings()
            corr.nrChannelsPerSubband = task_spec["correlator"]["channels_per_subband"]
            corr.integrationTime      = task_spec["correlator"]["integration_time"]
            calculator = BlockSize(constraints=BlockConstraints(correlatorSettings=corr))
            subtask_spec["COBALT"]["correlator"]["blocks_per_integration"] = calculator.nrBlocks
            subtask_spec["COBALT"]["correlator"]["integrations_per_block"] = calculator.nrSubblocks

        # make sure that the subtask_spec is valid conform the schema
        validate_json_against_schema(subtask_spec, subtask_template.schema)

        return subtask_spec, subtask_template
    
    
    def get_stations_in_group(station_group_name: str) -> list:
        '''Get a sorted list of station names in the given station_group.
        A lookup is performed in the RADB, in the virtual instrument table.

        :param station_group_name: name of a resource group of type 'station_group' or 'virtual'
        :return: sorted list of the names of the child resource groups of type 'station' (RS408 excluded)
        :raises ValueError: if no 'station_group'/'virtual' resource group with that name exists
        '''
        with RADBRPC.create() as radbrpc:
            resource_group_memberships = radbrpc.getResourceGroupMemberships()['groups']

        # use a default instead of a bare next(): a bare StopIteration carries no message, which made the
        # caller's "Error: %s" report empty for an unknown group name
        station_resource_group = next((rg for rg in resource_group_memberships.values()
                                       if rg['resource_group_type'] in ('station_group', 'virtual')
                                       and rg['resource_group_name'] == station_group_name), None)
        if station_resource_group is None:
            raise ValueError("No resource group of type 'station_group' or 'virtual' named '%s' found in the RADB" % station_group_name)

        station_names = {resource_group_memberships[rg_id]['resource_group_name']
                         for rg_id in station_resource_group['child_ids']
                         if resource_group_memberships[rg_id]['resource_group_type'] == 'station'}

        # HACK, RS408 should be removed from the RADB
        station_names.discard('RS408')

        return sorted(station_names)
    
    
    def get_related_target_observation_task_blueprint(calibrator_task_blueprint: TaskBlueprint) -> TaskBlueprint:
        """
        Get the related 'target observation' task_blueprint for the given calibrator task_blueprint.

        Scheduling relations in which the calibrator is 'first' are searched before those in which it is 'second'.

        :return: the first related 'target observation' task_blueprint found, or None if there is none.
        :raises ValueError: if the given task_blueprint is not a calibrator task_blueprint.
        """
        if 'calibrator' not in calibrator_task_blueprint.specifications_template.name.lower():
            # format the message explicitly; exception constructors do not apply logging-style %-args
            raise ValueError("Cannot get a related target observation task_blueprint for non-calibrator task_blueprint id=%s template_name='%s'" % (
                             calibrator_task_blueprint.id, calibrator_task_blueprint.specifications_template.name))

        def _is_target_observation(task_blueprint) -> bool:
            return task_blueprint is not None and task_blueprint.specifications_template.name.lower() == 'target observation'

        # the calibrator may be on either side of a scheduling relation
        seconds = (relation.second for relation in TaskSchedulingRelationBlueprint.objects.filter(first=calibrator_task_blueprint).all())
        target = next((tb for tb in seconds if _is_target_observation(tb)), None)
        if target is None:
            firsts = (relation.first for relation in TaskSchedulingRelationBlueprint.objects.filter(second=calibrator_task_blueprint).all())
            target = next((tb for tb in firsts if _is_target_observation(tb)), None)

        if target is None:
            logger.info("No related target observation task_blueprint found for calibrator observation task_blueprint id=%d", calibrator_task_blueprint.id)
        return target
    
    
    def create_observation_control_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
        """
        Create an observation control subtask for the given (target or calibrator) observation task_blueprint.
        This method implements "Instantiate subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :return: the new subtask in state DEFINED, with one SubtaskOutput and no inputs.
        """
        # pre-requisites: raises when the task_blueprint is set to cancel
        check_prerequities_for_subtask_creation(task_blueprint)

        # create the subtask in the DEFINING state
        specs_doc, observation_template = create_observation_subtask_specifications_from_observation_task_blueprint(task_blueprint)
        cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
        observation_subtask = Subtask.objects.create(
            start_time=None,
            stop_time=None,
            state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
            specifications_doc=specs_doc,
            task_blueprint=task_blueprint,
            specifications_template=observation_template,
            tags=[],
            priority=1,
            schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
            cluster=Cluster.objects.get(name=cluster_name))

        # an observation consumes no input; it only produces output data
        SubtaskOutput.objects.create(subtask=observation_subtask)

        # everything is linked, so promote the subtask to DEFINED
        observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
        observation_subtask.save()
        return observation_subtask
    
    
    def create_qafile_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
        """Create the QA file conversion subtask for the given observation task_blueprint.

        The QA subtask is derived from the task_blueprint's observation subtask with the highest id.

        :return: the new QA file subtask, or None if QA file conversion is not enabled in the task spec.
        :raises SubtaskCreationException: if the task_blueprint has no observation subtask.
        """
        latest_observation_subtask = None
        for candidate in task_blueprint.subtasks.order_by('id').all():
            if candidate.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
                latest_observation_subtask = candidate

        if latest_observation_subtask is None:
            raise SubtaskCreationException("Cannot create %s subtask for task_blueprint id=%d because it has no observation subtask(s)" % (
                SubtaskType.Choices.QA_FILES.value, task_blueprint.pk))

        # TODO: decide what to do when there are multiple observation subtasks?
        return create_qafile_subtask_from_observation_subtask(latest_observation_subtask)
    
    
    def create_qafile_subtask_from_observation_subtask(observation_subtask: Subtask) -> Subtask:
        ''' Create a subtask to convert the output of the given observation subtask into a QA h5 file.
        This method implements "Instantiate subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :return: the new QA file subtask in state DEFINED (fed by the observation's first output),
                 or None when QA.file_conversion is not enabled in the observation task specification.
        :raises ValueError: if observation_subtask is not an observation subtask, or is still DEFINING.
        '''
        check_prerequities_for_subtask_creation(observation_subtask.task_blueprint)

        qa_files_type = SubtaskType.Choices.QA_FILES.value
        observation_type = SubtaskType.Choices.OBSERVATION.value
        if observation_subtask.specifications_template.type.value != observation_type:
            raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
                qa_files_type, observation_subtask.pk,
                observation_subtask.specifications_template.type, observation_type))

        if observation_subtask.state.value == SubtaskState.Choices.DEFINING.value:
            raise ValueError("Cannot create %s subtask for subtask id=%d because it is not DEFINED" % (
                qa_files_type, observation_subtask.pk))

        observation_spec = get_observation_task_specification_with_check_for_calibrator(observation_subtask)
        qafile_settings = observation_spec.get("QA", {}).get("file_conversion", {})
        if not qafile_settings.get("enabled", False):
            logger.debug("Skipping creation of qafile_subtask because QA.file_conversion is not enabled")
            return None

        # build and validate the specification doc for the QA file conversion template
        template = SubtaskTemplate.objects.get(name="QA file conversion")
        spec_doc = add_defaults_to_json_object_for_schema({}, template.schema)
        spec_doc['nr_of_subbands'] = qafile_settings.get("nr_of_subbands")
        spec_doc['nr_of_timestamps'] = qafile_settings.get("nr_of_timestamps")
        validate_json_against_schema(spec_doc, template.schema)

        qafile_subtask = Subtask.objects.create(start_time=None,
                                                stop_time=None,
                                                state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                                task_blueprint=observation_subtask.task_blueprint,
                                                specifications_template=template,
                                                specifications_doc=spec_doc,
                                                priority=1,
                                                schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                                cluster=observation_subtask.cluster)

        # link an input selecting 'all' of the observation's first output, plus one output
        select_all = TaskRelationSelectionTemplate.objects.get(name="all")
        select_all_doc = get_default_json_object_for_schema(select_all.schema)
        SubtaskInput.objects.create(subtask=qafile_subtask,
                                    producer=observation_subtask.outputs.first(), # TODO: determine proper producer based on spec in task_relation_blueprint
                                    selection_doc=select_all_doc,
                                    selection_template=select_all)
        SubtaskOutput.objects.create(subtask=qafile_subtask)

        qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
        qafile_subtask.save()

        # the system waits for the predecessors to be finished before this qafile_subtask is scheduled
        return qafile_subtask
    
    
    def create_qaplots_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
        """Create the QA plots subtask for the given observation task_blueprint.

        The plots subtask is derived from the task_blueprint's QA file conversion subtask with the lowest id.

        :return: the new QA plots subtask, or None if QA plots are not enabled in the task spec.
        :raises SubtaskCreationException: if the task_blueprint has no QA file conversion subtask.
        """
        # order by id so the selected qafile subtask is deterministic; an unordered queryset has no guaranteed
        # order (consistent with create_qafile_subtask_from_task_blueprint, which also orders by id)
        qafile_subtasks = [st for st in task_blueprint.subtasks.order_by('id').all()
                           if st.specifications_template.type.value == SubtaskType.Choices.QA_FILES.value]
        if not qafile_subtasks:
            raise SubtaskCreationException('Cannot create QA plotting subtask for task id=%s because no predecessor QA file conversion subtask exists.' % (task_blueprint.pk, ))

        qafile_subtask = qafile_subtasks[0] # TODO: decide what to do when there are multiple qafile subtasks?
        return create_qaplots_subtask_from_qafile_subtask(qafile_subtask)
    
    
    def create_qaplots_subtask_from_qafile_subtask(qafile_subtask: Subtask) -> Subtask:
        ''' Create a subtask that makes inspection plots from the QA h5 file produced by the given qafile_subtask.
        This method implements "Instantiate subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :return: the new QA plots subtask in state DEFINED (fed by the qafile subtask's first output),
                 or None when QA.plots is not enabled in the observation task specification.
        :raises ValueError: if qafile_subtask is not a QA file conversion subtask.
        '''
        check_prerequities_for_subtask_creation(qafile_subtask.task_blueprint)

        template_type = qafile_subtask.specifications_template.type
        if template_type.value != SubtaskType.Choices.QA_FILES.value:
            raise ValueError("Cannot create %s subtask for subtask id=%d type=%s because it is not an %s" % (
                SubtaskType.Choices.QA_PLOTS.value, qafile_subtask.pk,
                template_type, SubtaskType.Choices.QA_FILES.value))

        observation_spec = get_observation_task_specification_with_check_for_calibrator(qafile_subtask)
        plots_settings = observation_spec.get("QA", {}).get("plots", {})
        if not plots_settings.get("enabled", False):
            logger.debug("Skipping creation of qaplots_subtask because QA.plots is not enabled")
            return None

        # build and validate the specification doc for the QA plots template
        template = SubtaskTemplate.objects.get(name="QA plots")
        spec_doc = add_defaults_to_json_object_for_schema({}, template.schema)
        spec_doc['autocorrelation'] = plots_settings.get("autocorrelation")
        spec_doc['crosscorrelation'] = plots_settings.get("crosscorrelation")
        validate_json_against_schema(spec_doc, template.schema)

        qaplots_subtask = Subtask.objects.create(start_time=None,
                                                 stop_time=None,
                                                 state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                                 task_blueprint=qafile_subtask.task_blueprint,
                                                 specifications_template=template,
                                                 specifications_doc=spec_doc,
                                                 priority=1,
                                                 schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                                 cluster=qafile_subtask.cluster)

        # link an input selecting 'all' of the qafile subtask's first output, plus one output
        select_all = TaskRelationSelectionTemplate.objects.get(name="all")
        select_all_doc = get_default_json_object_for_schema(select_all.schema)
        SubtaskInput.objects.create(subtask=qaplots_subtask,
                                    producer=qafile_subtask.outputs.first(),
                                    selection_doc=select_all_doc,
                                    selection_template=select_all)
        SubtaskOutput.objects.create(subtask=qaplots_subtask)

        qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
        qaplots_subtask.save()

        # the system waits for the predecessors to be finished before this qaplots_subtask is scheduled
        return qaplots_subtask
    
    
    def create_preprocessing_subtask_from_task_blueprint(task_blueprint: TaskBlueprint) -> Subtask:
        ''' Create a subtask for the preprocessing pipeline.
        This method implements "Instantiate subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        The new 'pipeline control' subtask gets one SubtaskInput per output of every observation subtask of every
        producing task_blueprint (using that relation's selection), and a single SubtaskOutput.

        :return: the new pipeline subtask in state DEFINED.
        :raises SubtaskCreationException: if the task_blueprint is set to cancel, or none of its predecessors has an observation subtask.
        '''
        check_prerequities_for_subtask_creation(task_blueprint)

        observation_type = SubtaskType.Choices.OBSERVATION.value

        # TODO: go more elegant lookup of predecessor observation task
        has_observation_predecessor = any(st.specifications_template.type.value == observation_type
                                          for predecessor in task_blueprint.predecessors.all()
                                          for st in predecessor.subtasks.all())
        if not has_observation_predecessor:
            raise SubtaskCreationException("Cannot create a subtask for task_blueprint id=%s because it is not connected "
                                           "to an observation predecessor (sub)task." % task_blueprint.pk)

        # create the subtask in the DEFINING state, with specs derived from the task's specs and the template defaults
        pipeline_template = SubtaskTemplate.objects.get(name='pipeline control')
        template_defaults = get_default_json_object_for_schema(pipeline_template.schema)
        pipeline_specs = _generate_subtask_specs_from_preprocessing_task_specs(task_blueprint.specifications_doc, template_defaults)
        cluster_name = task_blueprint.specifications_doc.get("storage_cluster", "CEP4")
        pipeline_subtask = Subtask.objects.create(start_time=None,
                                                  stop_time=None,
                                                  state=SubtaskState.objects.get(value=SubtaskState.Choices.DEFINING.value),
                                                  task_blueprint=task_blueprint,
                                                  specifications_template=pipeline_template,
                                                  specifications_doc=pipeline_specs,
                                                  priority=1,
                                                  schedule_method=ScheduleMethod.objects.get(value=ScheduleMethod.Choices.DYNAMIC.value),
                                                  cluster=Cluster.objects.get(name=cluster_name))

        # TODO: apply some better filtering. Now we're just connecting it to all predecessor observation subtasks
        for relation in task_blueprint.produced_by.all():
            producer_observations = [st for st in relation.producer.subtasks.order_by('id').all()
                                     if st.specifications_template.type.value == observation_type]
            for observation in producer_observations:
                for produced_output in observation.outputs.all():
                    SubtaskInput.objects.create(subtask=pipeline_subtask,
                                                producer=produced_output,
                                                selection_doc=relation.selection_doc,
                                                selection_template=relation.selection_template)
        SubtaskOutput.objects.create(subtask=pipeline_subtask)

        pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.DEFINED.value)
        pipeline_subtask.save()

        # the system waits for the predecessors to be finished before this pipeline subtask is scheduled
        return pipeline_subtask
    
    
    # ==== various schedule* methods to schedule a Subtasks (if possible) ====
    
    def schedule_subtask(subtask: Subtask) -> Subtask:
        '''Generic scheduling method for subtasks. Calls the appropriate scheduling method based on the subtask's type.

        If the type-specific scheduling fails, the subtask is set to the ERROR state (best-effort) and the original
        exception is re-raised. A failing prerequisite check raises without changing the subtask's state.

        :return: the result of the type-specific schedule method.
        :raises SubtaskSchedulingException: if the prerequisites are not met, or no schedule method exists for the subtask type.
        '''
        check_prerequities_for_scheduling(subtask)

        try:
            # fixed mapping from subtask type to its type-specific schedule method
            schedule_methods = {SubtaskType.Choices.PIPELINE.value: schedule_pipeline_subtask,
                                SubtaskType.Choices.OBSERVATION.value: schedule_observation_subtask,
                                SubtaskType.Choices.QA_FILES.value: schedule_qafile_subtask,
                                SubtaskType.Choices.QA_PLOTS.value: schedule_qaplots_subtask}
            subtask_type = subtask.specifications_template.type.value
            schedule_method = schedule_methods.get(subtask_type)
            if schedule_method is None:
                raise SubtaskSchedulingException("Cannot schedule subtask id=%d because there is no schedule-method known for this subtasktype=%s." %
                                                 (subtask.pk, subtask_type))
            return schedule_method(subtask)
        except Exception:
            try:
                # set the subtask to state 'ERROR'...
                subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.ERROR.value)
                subtask.save()
            except Exception as e2:
                # failing to record the ERROR state must not mask the original exception
                logger.error(e2)
            # ... and re-raise the original exception (the inner handler has completed, so this is the outer one)
            raise
    
    
    def check_prerequities_for_scheduling(subtask: Subtask) -> bool:
        '''Verify that the given subtask can be scheduled.

        :return: True when all prerequisites are met.
        :raises SubtaskSchedulingException: if the subtask is not DEFINED, if any predecessor is not FINISHED, or if it is
                                            an observation subtask while the 'allow_scheduling_observations' setting is off.
        '''
        if subtask.state.value != SubtaskState.Choices.DEFINED.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because it is not DEFINED. Current state=%s" % (subtask.pk, subtask.state.value))

        for predecessor in subtask.predecessors.all():
            if predecessor.state.value != SubtaskState.Choices.FINISHED.value:
                raise SubtaskSchedulingException("Cannot schedule subtask id=%d because its predecessor id=%s is not FINISHED but state=%s"
                                                 % (subtask.pk, predecessor.pk, predecessor.state.value))

        # check if settings allow scheduling observations.
        # As its name says, this setting only gates observation subtasks; pipeline and QA subtasks are not blocked by it.
        if subtask.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
            setting = Setting.objects.get(name='allow_scheduling_observations')
            if not setting.value:
                raise SubtaskSchedulingException("Cannot schedule subtask id=%d because setting %s=%s does not allow that." %
                                                 (subtask.pk, setting.name, setting.value))

        return True
    
    def _assign_resources(subtask: Subtask):
        '''Ask the resource assigner (via RARPC) to assign resources for the given subtask.
        The predecessors of the subtask are passed along in the RA specification on a best-effort basis:
        a predecessor that cannot be converted is logged and left out.
        :param subtask: the subtask, which must be in state SCHEDULING
        :raises SubtaskSchedulingException: when the subtask is not in SCHEDULING state, or when the
                                            resource assigner reports that it did not assign the resources.
        '''
        if subtask.state.value != SubtaskState.Choices.SCHEDULING.value:
            raise SubtaskSchedulingException("Cannot assign resources for subtask id=%d because it is not in SCHEDULING state. "
                                             "Current state=%s" % (subtask.pk, subtask.state.value))

        def create_ra_specification(_subtask):
            '''Build the resource assigner specification dict for the given subtask (includes its parset).'''
            parset_dict = convert_to_parset_dict(_subtask)
            return { 'tmss_id': _subtask.id,
                     'task_type': _subtask.specifications_template.type.value.lower(),
                     'task_subtype': parset_dict.get("Observation.processSubtype","").lower(),
                     'status': 'prescheduled',
                     'starttime': _subtask.start_time,
                     'endtime': _subtask.stop_time,
                     'cluster': _subtask.cluster.name,
                     'station_requirements': [],
                     'specification': parset_dict }

        ra_spec = create_ra_specification(subtask)
        ra_spec['predecessors'] = []
        for pred in subtask.predecessors.all():
            try:
                ra_spec['predecessors'].append(create_ra_specification(pred))
            except Exception as e:
                # best-effort: leave this predecessor out of the RA specification, but do not hide why.
                # (A bare 'except: pass' also swallowed KeyboardInterrupt/SystemExit and gave no trace at all.)
                logger.warning("Could not create resource assigner specification for predecessor id=%s of subtask id=%s; "
                               "leaving it out. Error: %s", pred.pk, subtask.pk, e)

        with RARPC.create() as rarpc:
            assigned = rarpc.do_assignment(ra_spec)

        if not assigned:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d because the required resources are not (fully) available." % (subtask.pk, ))
    
    
    def schedule_qafile_subtask(qafile_subtask: Subtask):
        ''' Schedule the given qafile_subtask (which converts the observation output to a QA h5 file)
        This method should typically be called upon the event of the observation_subtask being finished.
        This method implements "Scheduling subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :param qafile_subtask: a DEFINED subtask of type QA_FILES with exactly one input
        :return: the same subtask, now in state SCHEDULED
        :raises SubtaskSchedulingException: when the scheduling pre-requisites are not met, the subtask has the
                                            wrong type, or it does not have exactly one input
        '''

        # step 0: check pre-requisites
        check_prerequities_for_scheduling(qafile_subtask)

        if qafile_subtask.specifications_template.type.value != SubtaskType.Choices.QA_FILES.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qafile_subtask.pk,
                                                                                                              qafile_subtask.specifications_template.type, SubtaskType.Choices.QA_FILES.value))

        # use count(): len() does not work on a (related) manager, and would turn this check into a TypeError
        nr_of_inputs = qafile_subtask.inputs.count()
        if nr_of_inputs != 1:
            raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qafile_subtask.id, nr_of_inputs))

        # step 1: set state to SCHEDULING
        qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
        qafile_subtask.save()

        # step 2: link input dataproducts
        qa_input = qafile_subtask.inputs.first()
        qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

        # step 3: resource assigner
        # is a no-op for QA

        # step 4: create output dataproducts, and link these to the output
        # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
        qafile_subtask_output = qafile_subtask.outputs.first()
        if qafile_subtask_output is not None:
            empty_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
            empty_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
            Dataproduct.objects.create(filename="L%s_QA.h5" % (qa_input.producer.subtask_id, ),
                                       directory="/data/qa/qa_files",
                                       dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_HDF5.value),
                                       datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                       producer=qafile_subtask_output,
                                       specifications_doc=get_default_json_object_for_schema(empty_specifications_template.schema),
                                       specifications_template=empty_specifications_template,
                                       feedback_doc=get_default_json_object_for_schema(empty_feedback_template.schema),
                                       feedback_template=empty_feedback_template,
                                       SAP=None  # todo: do we need to point to a SAP here?
                                       )

        # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
        qafile_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
        qafile_subtask.save()

        return qafile_subtask
    
    
    def schedule_qaplots_subtask(qaplots_subtask: Subtask):
        ''' Schedule the given qaplots_subtask (which creates inspection plots from a QA h5 file)
        This method should typically be called upon the event of the qafile_subtask being finished.
        This method implements "Scheduling subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :param qaplots_subtask: a DEFINED subtask of type QA_PLOTS with exactly one input
        :return: the same subtask, now in state SCHEDULED
        :raises SubtaskSchedulingException: when the scheduling pre-requisites are not met, the subtask has the
                                            wrong type, or it does not have exactly one input
        '''

        # step 0: check pre-requisites
        check_prerequities_for_scheduling(qaplots_subtask)

        if qaplots_subtask.specifications_template.type.value != SubtaskType.Choices.QA_PLOTS.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (qaplots_subtask.pk,
                                                                                                              qaplots_subtask.specifications_template.type,
                                                                                                              SubtaskType.Choices.QA_PLOTS.value))

        # use count(): len() does not work on a (related) manager, and would turn this check into a TypeError
        nr_of_inputs = qaplots_subtask.inputs.count()
        if nr_of_inputs != 1:
            raise SubtaskSchedulingException("QA subtask id=%s should have 1 input, but it has %s" % (qaplots_subtask.id, nr_of_inputs))

        # step 1: set state to SCHEDULING
        qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
        qaplots_subtask.save()

        # step 2: link input dataproducts
        # this should typically be a single input with a single dataproduct (the qa h5 file)
        qa_input = qaplots_subtask.inputs.first()
        qa_input.dataproducts.set(qa_input.producer.dataproducts.all())

        # step 3: resource assigner
        # is a no-op for QA

        # step 4: create output dataproducts, and link these to the output
        # TODO: Should the output and/or dataproduct be determined by the specification in task_relation_blueprint?
        # the plots directory is named after the observation, which is the predecessor of the qafile predecessor
        qafile_subtask = qaplots_subtask.predecessors.first()
        obs_subtask = qafile_subtask.predecessors.first()
        empty_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
        empty_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
        Dataproduct.objects.create(directory="/data/qa/plots/L%s" % (obs_subtask.id, ),
                                   dataformat=Dataformat.objects.get(value=Dataformat.Choices.QA_PLOTS.value),
                                   datatype=Datatype.objects.get(value=Datatype.Choices.QUALITY.value),   # todo: is this correct?
                                   producer=qaplots_subtask.outputs.first(),
                                   specifications_doc=get_default_json_object_for_schema(empty_specifications_template.schema),
                                   specifications_template=empty_specifications_template,
                                   feedback_doc=get_default_json_object_for_schema(empty_feedback_template.schema),
                                   feedback_template=empty_feedback_template,
                                   SAP=None  # todo: do we need to point to a SAP here?
                                   )

        # step 5: set state to SCHEDULED (resulting in the qaservice to pick this subtask up and run it)
        qaplots_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
        qaplots_subtask.save()

        return qaplots_subtask
    
    # todo: this can probably go when we switch to the new start time calculation in the model properties (which is based on this logic)
    def get_previous_related_task_blueprint_with_time_offset(task_blueprint):
        """
        Look up the task_blueprint that the given task_blueprint is scheduled relative to (the "previous" one).
        A scheduling relation points to a previous task when the given task is its 'first' with placement "after",
        or its 'second' with placement "before". When several relations match, the last matching one wins.
        :param task_blueprint: the task_blueprint whose previous related task is wanted
        :return: tuple (previous_related_task_blueprint, time_offset); (None, 0) when no relation matches
        """
        logger.info("get_previous_related_task_blueprint_with_time_offset %s (id=%s)", task_blueprint.name, task_blueprint.pk)
        found_blueprint, found_offset = None, 0

        relations = [*task_blueprint.first_to_connect.all(), *task_blueprint.second_to_connect.all()]
        own_id = task_blueprint.id
        for relation in relations:
            is_first_and_after = relation.first.id == own_id and relation.placement.value == "after"
            if is_first_and_after:
                found_blueprint = TaskBlueprint.objects.get(id=relation.second.id)
                found_offset = relation.time_offset

            is_second_and_before = relation.second.id == own_id and relation.placement.value == "before"
            if is_second_and_before:
                found_blueprint = TaskBlueprint.objects.get(id=relation.first.id)
                found_offset = relation.time_offset

        return found_blueprint, found_offset
    
    # todo: maybe this can now be replaced by subtask.relative_start_time
    def calculate_start_time(observation_subtask: Subtask):
        """
        Calculate the start time of an observation subtask. It should calculate the starttime in case of 'C-T-C train'
        The start time of an observation depends on the start_time+duration and offset time of the previous observation
        and so its scheduling relations should be known.
        If there is no previous observation the 'default' start time is in two minutes from now (rounded down to whole minutes).
        For demo purposes, will be changed into dynamic scheduled in the future.
        Parallel observations are not supported yet.
        :param observation_subtask:
        :return: start_time (utc time)
        :raises SubtaskSchedulingException: when the previous related task has no observation subtask, or when the
                                            start time of that previous observation subtask is unknown
        """
        previous_related_task_blueprint, time_offset = get_previous_related_task_blueprint_with_time_offset(observation_subtask.task_blueprint)
        if previous_related_task_blueprint is None:
            # This is the first observation so take start time 2 minutes from now
            now = datetime.utcnow()
            return now + timedelta(minutes=+2, seconds=-now.second, microseconds=-now.microsecond)

        # Get the duration of last/previous observation
        duration_in_sec = previous_related_task_blueprint.specifications_doc["duration"]
        logger.info("Duration of previous observation '%s' (id=%s) is %d seconds",
                    previous_related_task_blueprint.name, previous_related_task_blueprint.pk, duration_in_sec)

        # Get the previous observation subtask, should actually be one
        previous_observation_subtasks = [st for st in previous_related_task_blueprint.subtasks.all()
                                         if st.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value]
        if not previous_observation_subtasks:
            raise SubtaskSchedulingException("Cannot calculate start time for subtask id=%s because the previous related task_blueprint id=%s has no observation subtask"
                                             % (observation_subtask.pk, previous_related_task_blueprint.pk))
        previous_subtask_obs = previous_observation_subtasks[0]
        logger.info("The previous observation subtask is id=%s", previous_subtask_obs.pk)

        if previous_subtask_obs.start_time is None:
            # without a known previous start time the next start time cannot be derived; fail clearly instead of a TypeError
            raise SubtaskSchedulingException("Cannot calculate start time for subtask id=%s because the start time of the previous observation subtask id=%s is unknown"
                                             % (observation_subtask.pk, previous_subtask_obs.pk))

        return previous_subtask_obs.start_time + timedelta(seconds=duration_in_sec+time_offset)
    
    
    def schedule_observation_subtask(observation_subtask: Subtask):
        ''' Schedule the given observation_subtask
        For first observations in a 'train' of subtasks this method is typically called by hand, or by the short-term-scheduler.
        For subsequent observation subtasks this method is typically called by the subtask_scheduling_service upon the predecessor finished event.
        This method implements "Scheduling subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :param observation_subtask: a DEFINED subtask of type OBSERVATION
        :return: the same subtask, now in state SCHEDULED
        :raises SubtaskSchedulingException: when the scheduling pre-requisites are not met, the subtask has the wrong
                                            type, or the resources could not be assigned
        '''
        # step 0: check pre-requisites
        check_prerequities_for_scheduling(observation_subtask)

        if observation_subtask.specifications_template.type.value != SubtaskType.Choices.OBSERVATION.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (observation_subtask.pk,
                                                                                                              observation_subtask.specifications_template.type,
                                                                                                              SubtaskType.Choices.OBSERVATION.value))

        # step 1: set state to SCHEDULING
        observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
        observation_subtask.save()

        # step 1a: check start/stop times
        if observation_subtask.start_time is None:
            next_start_time = calculate_start_time(observation_subtask)
            logger.info("observation id=%s has no starttime. assigned default: %s", observation_subtask.pk, formatDatetime(next_start_time))
            observation_subtask.start_time = next_start_time

        if observation_subtask.stop_time is None:
            duration_in_sec = observation_subtask.task_blueprint.specifications_doc["duration"]
            logger.info("Duration of observation id=%s is %d seconds", observation_subtask.pk, duration_in_sec)
            stop_time = observation_subtask.start_time + timedelta(seconds=duration_in_sec)
            logger.info("observation id=%s has no stop_time. assigned default: %s", observation_subtask.pk, formatDatetime(stop_time))
            observation_subtask.stop_time = stop_time

        # step 2: define input dataproducts
        # TODO: are there any observations that take input dataproducts?

        # step 3: create output dataproducts, and link these to the output
        specifications_doc = observation_subtask.specifications_doc
        dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="SAP")  # todo: should this be derived from the task relation specification template?
        dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
        # loop-invariant lookups: done once here instead of once per SAP / per subband
        sap_template = SAPTemplate.objects.get(name="SAP")
        measurementset_dataformat = Dataformat.objects.get(value="MeasurementSet")
        visibilities_datatype = Datatype.objects.get(value="visibilities")  # todo: is this correct?
        sap_start_time = observation_subtask.start_time.isoformat()
        sap_duration = (observation_subtask.stop_time - observation_subtask.start_time).total_seconds()

        subtask_output = observation_subtask.outputs.first() # TODO: make proper selection, not default first()
        directory = "/data/%s/%s/L%s/uv" % ("projects" if isProductionEnvironment() else "test-projects",
                                            observation_subtask.task_blueprint.scheduling_unit_blueprint.draft.scheduling_set.project.name,
                                            observation_subtask.id)
        for sap_nr, pointing in enumerate(specifications_doc['stations']['digital_pointings']):
            sap_name = "%s_%s" % (observation_subtask.id, pointing['name'])
            sap = SAP.objects.create(name=sap_name,
                                     specifications_doc={ "name": sap_name,
                                                          "identifiers": {},  # todo: TMSS-324
                                                          "pointing": pointing['pointing'],
                                                          "time": {"start_time": sap_start_time,
                                                                   "duration": sap_duration},
                                                          "antennas": {
                                                              "set": specifications_doc['stations']['antenna_set'],
                                                              "stations": specifications_doc['stations']['station_list'],
                                                              "fields": []   # todo: do we really have to calculate an object like {"station": "CS001", "field": "HBA", "type": "HBA"} here again for all involved stations? Isn't that info derived later anyway?
                                                          }
                                                        },
                                     specifications_template=sap_template)
            for sb_nr in pointing['subbands']:
                Dataproduct.objects.create(filename="L%d_SAP%03d_SB%03d_uv.MS" % (observation_subtask.id, sap_nr, sb_nr),
                                           directory=directory,
                                           dataformat=measurementset_dataformat,
                                           datatype=visibilities_datatype,
                                           producer=subtask_output,
                                           specifications_doc={"sap": [sap_nr]},  # todo: set correct value. This will be provided by the RA somehow
                                           specifications_template=dataproduct_specifications_template,
                                           feedback_doc=get_default_json_object_for_schema(dataproduct_feedback_template.schema),
                                           feedback_template=dataproduct_feedback_template,
                                           # NOTE(review): these sizes look like placeholder values - confirm before relying on them
                                           size=0 if sb_nr%10==0 else 1024*1024*1024*sb_nr,
                                           expected_size=1024*1024*1024*sb_nr,
                                           SAP=sap)

        # step 4: resource assigner (if possible)
        _assign_resources(observation_subtask)

        # step 5: set state to SCHEDULED
        observation_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
        observation_subtask.save()

        return observation_subtask
    
    
    def schedule_pipeline_subtask(pipeline_subtask: Subtask):
        ''' Schedule the given pipeline_subtask
        This method should typically be called upon the event of an predecessor (observation) subtask being finished.
        This method implements "Scheduling subtasks" step from the "Specification Flow"
        https://support.astron.nl/confluence/display/TMSS/Specification+Flow

        :param pipeline_subtask: a DEFINED subtask of type PIPELINE with at least one input
        :return: the same subtask, now in state SCHEDULED
        :raises SubtaskSchedulingException: when the scheduling pre-requisites are not met, the subtask has the wrong
                                            type, it has no inputs, or the resources could not be assigned
        '''
        # step 0: check pre-requisites
        check_prerequities_for_scheduling(pipeline_subtask)

        if pipeline_subtask.specifications_template.type.value != SubtaskType.Choices.PIPELINE.value:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s but type should be %s" % (pipeline_subtask.pk,
                                                                                                              pipeline_subtask.specifications_template.type,
                                                                                                              SubtaskType.Choices.PIPELINE.value))

        # step 1: set state to SCHEDULING
        pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULING.value)
        pipeline_subtask.save()

        # step 1a: check start/stop times
        # not very relevant for tmss/dynamic scheduling, but the resource assigner demands it.
        if pipeline_subtask.start_time is None:
            now = datetime.utcnow()
            logger.info("pipeline id=%s has no starttime. assigned default: %s", pipeline_subtask.pk, formatDatetime(now))
            pipeline_subtask.start_time = now

        if pipeline_subtask.stop_time is None:
            stop_time = pipeline_subtask.start_time  + timedelta(hours=+1)
            logger.info("pipeline id=%s has no stop_time. assigned default: %s", pipeline_subtask.pk, formatDatetime(stop_time))
            pipeline_subtask.stop_time = stop_time

        # step 2: link input dataproducts
        if pipeline_subtask.inputs.count() == 0:
            raise SubtaskSchedulingException("Cannot schedule subtask id=%d type=%s because it has no input(s)" % (pipeline_subtask.pk,
                                                                                                                   pipeline_subtask.specifications_template.type))

        # TODO: use existing and reasonable selection and specification templates for output when we have those, for now, use "empty"
        dataproduct_specifications_template = DataproductSpecificationsTemplate.objects.get(name="empty")
        dataproduct_feedback_template = DataproductFeedbackTemplate.objects.get(name="empty")
        # loop-invariant lookups: done once here instead of once per created dataproduct
        measurementset_dataformat = Dataformat.objects.get(value="MeasurementSet")
        visibilities_datatype = Datatype.objects.get(value="visibilities")  # todo: is this correct?

        # select subtask output the new dataproducts will be linked to
        pipeline_subtask_output = pipeline_subtask.outputs.first()  # TODO: if we have several, how to map input to output?

        # output dataproducts created for ALL inputs; linked to the output once, after the loop
        output_dps = []

        # iterate over all inputs
        for pipeline_subtask_input in pipeline_subtask.inputs.all():

            # select and set input dataproducts that meet the filter defined in selection_doc
            dataproducts = [dataproduct for dataproduct in pipeline_subtask_input.producer.dataproducts.all()
                            if specifications_doc_meets_selection_doc(dataproduct.specifications_doc, pipeline_subtask_input.selection_doc)]
            pipeline_subtask_input.dataproducts.set(dataproducts)

            # step 3: create output dataproducts, and link these to the output
            # TODO: create them from the spec, instead of "copying" the input filename
            for input_dp in pipeline_subtask_input.dataproducts.all():
                if '_' in input_dp.filename and input_dp.filename.startswith('L'):
                    # replace the producer's 'L<id>' prefix by this pipeline's id
                    filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename.split('_', 1)[1])
                else:
                    filename = "L%s_%s" % (pipeline_subtask.pk, input_dp.filename)

                output_dp = Dataproduct.objects.create(filename=filename,
                                                       directory=input_dp.directory.replace(str(pipeline_subtask_input.producer.subtask.pk), str(pipeline_subtask.pk)),
                                                       dataformat=measurementset_dataformat,
                                                       datatype=visibilities_datatype,
                                                       producer=pipeline_subtask_output,
                                                       specifications_doc={},
                                                       specifications_template=dataproduct_specifications_template,
                                                       feedback_doc="",
                                                       feedback_template=dataproduct_feedback_template,
                                                       SAP=input_dp.SAP)
                DataproductTransform.objects.create(input=input_dp, output=output_dp, identity=False)
                output_dps.append(output_dp)

        pipeline_subtask_output.dataproducts.set(output_dps)

        # step 4: resource assigner (if possible)
        # Done once, after ALL inputs are processed. Inside the loop, a second input would fail because the
        # subtask would already be SCHEDULED, and resources would be assigned before all outputs exist.
        _assign_resources(pipeline_subtask)

        # step 5: set state to SCHEDULED
        pipeline_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.SCHEDULED.value)
        pipeline_subtask.save()

        return pipeline_subtask
    
    # === Misc ===
    
    def create_and_schedule_subtasks_from_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
        '''Convenience method: create the subtasks from the task_blueprint, then schedule the ones that are
        not dependent on (unfinished) predecessors.
        :return: the result of schedule_independent_subtasks_in_task_blueprint for this task_blueprint'''
        create_subtasks_from_task_blueprint(task_blueprint)
        subtasks = schedule_independent_subtasks_in_task_blueprint(task_blueprint)
        return subtasks
    
    def schedule_independent_subtasks_in_task_blueprint(task_blueprint: TaskBlueprint) -> [Subtask]:
        '''Convenience method: Schedule the subtasks in the task_blueprint whose predecessors (if any) are all finished.
        :return: list of all subtasks of the task_blueprint, whether they were scheduled or not'''
        subtasks = list(task_blueprint.subtasks.all())

        for subtask in subtasks:
            # let the database count, instead of loading every predecessor object just to take len()
            nr_of_predecessors = subtask.predecessors.count()
            nr_of_finished_predecessors = subtask.predecessors.filter(state__value=SubtaskState.Choices.FINISHED.value).count()
            if nr_of_predecessors == nr_of_finished_predecessors:
                schedule_subtask(subtask)
        return subtasks
    
    
    def _generate_subtask_specs_from_preprocessing_task_specs(preprocessing_task_specs, default_subtask_specs):
        # preprocessing task default spec: {
        #   "storagemanager": "dysco",
        #   "flag": {"outerchannels": true, "autocorrelations": true, "rfi_strategy": "auto"},
        #   "demix": {"frequency_steps": 64, "time_steps": 10, "ignore_target": false, "sources": {}},
        #   "average": {"frequency_steps": 4, "time_steps": 1}}
        # pipelinecontrol subtask default spec: {
        #   "storagemanager": "dysco",
        #   "demixer": {"baselines": "CS*,RS*&", "frequency_steps": 4, "time_steps": 1, "demix_frequency_steps": 4,
        #               "demix_time_steps": 1, "ignore_target": false, "demix_always": [], "demix_if_needed": []},
        #   "aoflagger": {"strategy": "HBAdefault"},
        #    "preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"},
        #    "preflagger1": {"corrtype": "auto"}}
    
        # todo: check that this is actually how these need to be translated
        # todo: especially check when defaults are NOT supposed to be set because the task implies to not include them
    
        # todo: translate task "sources": {} - I guess this is demix_always/demix_if_needed?
        # todo: set subtask demixer properties "baselines": "CS*,RS*&", "demix_always": [], "demix_if_needed": []
    
        subtask_specs = {}
        subtask_specs['storagemanager'] = preprocessing_task_specs.get('storagemanager',
                                                                       default_subtask_specs.get('storagemanager'))
    
        # todo: we depend on valid json here with knowledge about required properties. To generalize, we need to expect things to not be there.
        if 'demix' or 'average' in preprocessing_task_specs:
            # todo: should we exclude defaults in subtask.demixer if only one of these is defined on the task?
            subtask_specs['demixer'] = default_subtask_specs['demixer']
            if 'demix' in preprocessing_task_specs:
                subtask_specs['demixer'].update({
                    "demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'],
                    "demix_time_steps": preprocessing_task_specs['demix']['time_steps'],
                    "ignore_target": preprocessing_task_specs['demix']['ignore_target']
                }),
            if 'average' in preprocessing_task_specs:
                subtask_specs['demixer'].update({
                    "demix_frequency_steps": preprocessing_task_specs['demix']['frequency_steps'],
                    "frequency_steps": preprocessing_task_specs['average']['frequency_steps'],
                    "demix_time_steps": preprocessing_task_specs['demix']['time_steps'],
                    "time_steps": preprocessing_task_specs['average']['time_steps'],
                    "ignore_target": preprocessing_task_specs['demix']['ignore_target']
                }),
        if 'flag' in preprocessing_task_specs:
            if preprocessing_task_specs["flag"]["rfi_strategy"] != 'none':
                subtask_specs.update({"aoflagger": {"strategy": preprocessing_task_specs["flag"]["rfi_strategy"]}})
    
                if preprocessing_task_specs["flag"]["rfi_strategy"] == 'auto':
                    # todo: handle 'auto' properly: we need to determine input dataproduct type and set LBA or HBA accordingly
                    #   either here or allow 'auto' in subtask json and translate it when we connect obs to pipe subtask
                    default_strategy = default_subtask_specs['aoflagger']['strategy']
                    subtask_specs.update({"aoflagger": {"strategy": default_strategy}})
                    logger.warning('Translating aoflagger "auto" strategy to "%s" without knowing whether that makes sense!' % default_strategy)
    
            if preprocessing_task_specs["flag"]["outerchannels"]:
                subtask_specs.update({"preflagger0": {"channels": "0..nchan/32-1,31*nchan/32..nchan-1"}})
    
            if preprocessing_task_specs["flag"]["autocorrelations"]:
                subtask_specs.update({"preflagger1": {"corrtype": "auto"}})
    
        return subtask_specs
    
    
    def specifications_doc_meets_selection_doc(specifications_doc, selection_doc):
        """
        Filter specs by selection. This requires the specification_doc to...
        A) ...contain ALL KEYS that we select / filter for
        B) ...contain NO ADDITIONAL VALUES that are not selected / filtered for
        A collection value (list, tuple, dict, ...) meets the selection when every item of it is contained in the
        selected collection. Any other value, including strings, must be equal to the selected value.
        :param specifications_doc: dataproduct specification as dict
        :param selection_doc: selection filter as dict
        :return: True when the input specifications_doc meets a filter described in selection_doc, False otherwise
        """
        def _is_collection(value):
            # str/bytes are Iterable too, but must be compared as a whole, not character by character
            return isinstance(value, Iterable) and not isinstance(value, (str, bytes))

        def _value_meets_selection(spec_value, selected_value):
            if _is_collection(spec_value) and _is_collection(selected_value):
                return all(item in selected_value for item in spec_value)
            return spec_value == selected_value

        meets_criteria = all(key in specifications_doc and _value_meets_selection(specifications_doc[key], selected_value)
                             for key, selected_value in selection_doc.items())

        logger.debug("specs %s matches selection %s: %s", specifications_doc, selection_doc, meets_criteria)
        return meets_criteria
    
    
    def get_observation_task_specification_with_check_for_calibrator(subtask):
        """
        Return the specifications_doc of the task_blueprint of the given subtask.
        When that task is a calibrator (its specifications_template name contains 'calibrator'), the specifications_doc
        of the related target observation task_blueprint is returned instead.
        :param subtask: the subtask object
        :return: task_spec: the specifications_doc of the (target) observation task_blueprint
        :raises SubtaskCreationException: when the task is a calibrator without a related target observation
        """
        own_blueprint = subtask.task_blueprint
        is_calibrator = 'calibrator' in own_blueprint.specifications_template.name.lower()
        if not is_calibrator:
            return own_blueprint.specifications_doc

        # Calibrator requires related Target Task Observation for some specifications
        target_blueprint = get_related_target_observation_task_blueprint(own_blueprint)
        if target_blueprint is None:
            raise SubtaskCreationException("Cannot retrieve specifications for subtask id=%d because no related target observation is found " % subtask.pk)
        target_spec = target_blueprint.specifications_doc
        logger.info("Using specifications for calibrator observation (id=%s) from target observation task_blueprint id=%s",
                    own_blueprint.id, target_blueprint.id)
        return target_spec