Skip to content
Snippets Groups Projects
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
tasks.py 76.79 KiB
from lofar.sas.tmss.tmss.exceptions import *
from lofar.sas.tmss.tmss.tmssapp import models
from lofar.sas.tmss.tmss.tmssapp.subtasks import unschedule_subtasks_in_task_blueprint, cancel_subtask, mark_independent_subtasks_in_task_blueprint_as_unschedulable, mark_subtasks_in_task_blueprint_as_schedulable, set_task_blueprint_start_times, restart_on_hold_subtask, convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations
from lofar.sas.tmss.tmss.tmssapp.models.common import TemplateState
from lofar.sas.tmss.tmss.tmssapp.models.specification import TaskBlueprint, SchedulingUnitBlueprint, SchedulingUnitDraft, IOType, TaskTemplate, TaskType, TaskRelationSelectionTemplate
from lofar.sas.tmss.tmss.tmssapp.subtasks import create_or_update_subtasks_from_task_blueprint, schedule_independent_subtasks_in_task_blueprint, update_subtasks_start_times_for_scheduling_unit, get_gaps_to_previous_and_next_observations
from lofar.common.datetimeutils import round_to_minute_precision
from functools import cmp_to_key
from collections.abc import Iterable
import os
from copy import deepcopy
from typing import Tuple
import logging
from datetime import datetime, timedelta
from django.db.utils import IntegrityError
from django.db.models.deletion import ProtectedError
from django.db import transaction
from django.db.models import Q, Max
from lofar.common.util import dict_with_overrides, subdict_of_pointer_items
from lofar.common.toposort import Graph, toposorted
from dateutil import parser
from functools import lru_cache

logger = logging.getLogger(__name__)


def _remove_specs_to_fully_override(base_specs_element, base_override_element, items_to_fully_override):
    # remove leaf elements at given paths from specs if present in overrides
    # :param base_specs_element: specs element as dict
    # :param base_override_element: overrides element as dict
    # :param items_to_fully_override: '__'-separated paths as list of str
    for item in items_to_fully_override:
        override_value = base_override_element
        specs_value = base_specs_element
        elems = item.split('__')
        try:
            while (elems):
                key = elems.pop(0)
                if len(elems) == 0:
                    # remove the item from specs so it gets later replaced by override
                    if key in override_value:
                        specs_value.pop(key)
                else:
                    # traverse further
                    specs_value = specs_value[key]
                    override_value = override_value[key]
        except:
            # item not in specs or overrides
            continue

def create_scheduling_unit_draft_from_observing_strategy_template(strategy_template: models.SchedulingUnitObservingStrategyTemplate, scheduling_set: models.SchedulingSet, name: str=None, description: str=None, rank: float=None, priority_queue: models.PriorityQueueType=None, specifications_doc_overrides: dict=None) -> models.SchedulingUnitDraft:
    '''create a new SchedulingUnitDraft from the given strategy_template with 'parent' scheduling_set

    :param strategy_template: the observing strategy template to instantiate. It must not be obsolete.
    :param scheduling_set: the scheduling_set in which the new draft is created.
    :param name: name for the new draft. Defaults to the strategy_template's name.
    :param description: description for the new draft. Defaults to the strategy_template's description.
    :param rank: rank for the new draft. Defaults to models.SchedulingUnitRank.DEFAULT.value.
    :param priority_queue: priority queue for the new draft. Defaults to priority queue 'A'.
    :param specifications_doc_overrides: optional partial specifications_doc applied on top of the template's doc.
           Only the json pointers listed in the strategy template's 'parameters' (plus '#/$schema' and '#/$id') may be overridden.
    :return: the new SchedulingUnitDraft, including its instantiated task graph.
    :raises ObsoleteTemplateException: if the strategy_template is obsolete.
    :raises InvalidSpecificationException: if specifications_doc_overrides contains non-overridable items.
    '''
    if strategy_template.state.value == TemplateState.Choices.OBSOLETE.value:
        raise ObsoleteTemplateException("Template id=%s name='%s' version=%s is obsolete" % (strategy_template.id,
                                                                                             strategy_template.name,
                                                                                             strategy_template.version))
    if priority_queue is None:
        priority_queue = models.PriorityQueueType.objects.get(value=models.PriorityQueueType.Choices.A.value)
    if rank is None:
        rank = models.SchedulingUnitRank.DEFAULT.value

    # Work on a private deep copy: the doc is mutated in place below (items removed for full overrides,
    # constraints popped). This protects the template's doc in case the property hands out a shared/cached object.
    specifications_doc = deepcopy(strategy_template.template_doc_complete_with_defaults)

    # apply overrides on template specifications_doc if given
    if specifications_doc_overrides:
        # only use allowed overrides
        allowed_parameter_pointers = [ref for parameter in strategy_template.template['parameters'] for ref in parameter['refs']]
        allowed_parameter_pointers.extend(['#/$schema', '#/$id']) # and always accept standard json document keywords
        allowed_overrides = subdict_of_pointer_items(specifications_doc_overrides, allowed_parameter_pointers)

        if allowed_overrides != specifications_doc_overrides:
            # create a nice 'diff' view between allowed and submitted.
            from json import dumps
            from difflib import Differ
            diff = '\n'.join(Differ().compare(dumps(allowed_overrides, indent=4, sort_keys=True).split('\n'),
                                              dumps(specifications_doc_overrides, indent=4, sort_keys=True).split('\n')))

            raise InvalidSpecificationException("Specified faulty overrides. These are the allowed overridable parameters: \n%s\n\ndiff between allowed and submitted. Hint, seek added lines with the '+' symbol:\n%s" % (
                                                '\n'.join([str(p) for p in allowed_parameter_pointers]), diff))

        # remove items from specs that should not be updated but completely replaced by the override.
        items_to_fully_override_in_all_tasks = ["specifications_doc__station_configuration__station_groups"]
        if 'tasks' in allowed_overrides:
            for task in allowed_overrides['tasks'].keys():
                _remove_specs_to_fully_override(specifications_doc['tasks'][task], allowed_overrides['tasks'][task], items_to_fully_override_in_all_tasks)

        # do the actual override of the remaining allowed items
        specifications_doc = dict_with_overrides(specifications_doc, allowed_overrides)

    if 'scheduling_constraints_template' in specifications_doc:
        scheduling_constraints_template_name_version = specifications_doc.pop('scheduling_constraints_template')
        scheduling_constraints_template = models.SchedulingConstraintsTemplate.get_version_or_latest(name=scheduling_constraints_template_name_version['name'],
                                                                                                     version=scheduling_constraints_template_name_version.get('version'))
    else:
        # use the latest scheduling_constraints_template (can be None, which is acceptable)
        scheduling_constraints_template = models.SchedulingConstraintsTemplate.objects.all().order_by('created_at').last()

    # extract the scheduling_constraints_doc from the template_doc,
    # so we can feed it separately into the SchedulingUnitDraft.
    scheduling_constraints_doc = specifications_doc.pop('scheduling_constraints_doc', {})

    with transaction.atomic():
        scheduling_unit_draft = models.SchedulingUnitDraft.objects.create(name=name or strategy_template.name,
                                                                          description=description or strategy_template.description,
                                                                          scheduling_set=scheduling_set,
                                                                          specifications_template=strategy_template.scheduling_unit_template,
                                                                          observation_strategy_template=strategy_template,
                                                                          scheduling_constraints_doc=scheduling_constraints_doc,
                                                                          scheduling_constraints_template=scheduling_constraints_template,
                                                                          rank=rank,
                                                                          priority_queue=priority_queue)

        # instantiate the tasks graph
        scheduling_unit_draft = update_task_graph_from_specifications_doc(scheduling_unit_draft, specifications_doc)

        return scheduling_unit_draft


def create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft: models.SchedulingUnitDraft) -> models.SchedulingUnitBlueprint:
    """
    Instantiate a new SchedulingUnitBlueprint from the given SchedulingUnitDraft.

    The draft's specifications_doc is validated first, with obsolete templates rejected. The blueprint
    then takes the draft's name, description, specifications_template, ingest/piggyback flags, rank
    and interrupts_telescope flag.
    :raises ValidationException if the draft's specifications_doc does not validate (logged, then re-raised).
    :raises Exception if instantiate fails.
    """
    draft = scheduling_unit_draft

    try:
        # only blueprint drafts whose specifications_doc is valid
        draft.specifications_template.validate_document(draft.specifications_doc, raise_on_obsolete=True)
    except ValidationException as err:
        logger.error("cannot create blueprint for scheduling_unit_draft id=%s name='%s': %s", draft.pk, draft.name, err)
        raise

    logger.debug("create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft.id=%s name='%s')", draft.pk, draft.name)

    blueprint_fields = dict(name=draft.name,
                            description=draft.description,
                            draft=draft,
                            specifications_template=draft.specifications_template,
                            ingest_permission_required=draft.ingest_permission_required,
                            piggyback_allowed_tbb=draft.piggyback_allowed_tbb,
                            piggyback_allowed_aartfaac=draft.piggyback_allowed_aartfaac,
                            rank=draft.rank,
                            interrupts_telescope=draft.interrupts_telescope)
    new_blueprint = SchedulingUnitBlueprint.objects.create(**blueprint_fields)

    logger.info("create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft.id=%s name='%s') created scheduling_unit_blueprint id=%s name='%s' project='%s'",
                draft.pk, draft.name, new_blueprint.pk, new_blueprint.name, new_blueprint.project.name)
    return new_blueprint


def copy_scheduling_unit_draft(scheduling_unit_draft: models.SchedulingUnitDraft, scheduling_set_dst: models.SchedulingSet) -> models.SchedulingUnitDraft:
    """
    Copy a SchedulingUnitDraft, including its task graph, into scheduling_set_dst.

    The copy gets " (Copy)" appended to its name and a description that refers back to the source draft.
    interrupts_telescope is always False on the copy.
    :raises Exception if instantiate fails.
    """
    source = scheduling_unit_draft

    with transaction.atomic():
        copy_name = "%s (Copy)" % (source.name,)
        copy_description = "%s (Copy from draft '%s' id=%s)" % (source.description or "<no description>", source.name, source.id)

        new_draft = models.SchedulingUnitDraft.objects.create(name=copy_name,
                                                              description=copy_description,
                                                              scheduling_set=scheduling_set_dst,
                                                              specifications_template=source.specifications_template,
                                                              observation_strategy_template=source.observation_strategy_template,
                                                              scheduling_constraints_doc=source.scheduling_constraints_doc,
                                                              scheduling_constraints_template=source.scheduling_constraints_template,
                                                              ingest_permission_required=source.ingest_permission_required,
                                                              piggyback_allowed_tbb=source.piggyback_allowed_tbb,
                                                              piggyback_allowed_aartfaac=source.piggyback_allowed_aartfaac,
                                                              rank=source.rank,
                                                              priority_queue=source.priority_queue,
                                                              # only real submitted triggers may interrupt the telescope, so a copy never does.
                                                              interrupts_telescope=False)

        logger.info("copy_scheduling_unit_draft(scheduling_unit_draft.id=%s) created copy_scheduling_unit_draft id=%s",
                    source.pk, new_draft.pk)

        # the tasks and their relations are (re)built from the source's specifications_doc
        return update_task_graph_from_specifications_doc(new_draft, source.specifications_doc)


def create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint: models.SchedulingUnitBlueprint) -> models.SchedulingUnitDraft:
    """
    Create a new SchedulingUnitDraft as a copy of the given SchedulingUnitBlueprint, including its task graph.

    The copy is placed in the scheduling_set of the blueprint's own draft. It uses that draft's
    observation_strategy_template and the blueprint's specifications_doc. interrupts_telescope is always
    False on the copy.
    :raises Exception if instantiate fails.
    """
    blueprint = scheduling_unit_blueprint
    logger.debug("create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint.id=%s)", blueprint.pk)

    with transaction.atomic():
        origin_draft = blueprint.draft
        new_draft = models.SchedulingUnitDraft.objects.create(name="%s (Copy from blueprint)" % (blueprint.name,),
                                                              description="%s (Copy from blueprint '%s' id=%s)" % (blueprint.description or "<no description>", blueprint.name, blueprint.id),
                                                              scheduling_set=origin_draft.scheduling_set,
                                                              observation_strategy_template=origin_draft.observation_strategy_template,
                                                              specifications_template=blueprint.specifications_template,
                                                              scheduling_constraints_doc=blueprint.scheduling_constraints_doc,
                                                              scheduling_constraints_template=blueprint.scheduling_constraints_template,
                                                              ingest_permission_required=blueprint.ingest_permission_required,
                                                              piggyback_allowed_tbb=blueprint.piggyback_allowed_tbb,
                                                              piggyback_allowed_aartfaac=blueprint.piggyback_allowed_aartfaac,
                                                              rank=blueprint.rank,
                                                              priority_queue=blueprint.priority_queue,
                                                              # only real submitted triggers may interrupt the telescope, so a copy never does.
                                                              interrupts_telescope=False)

        logger.info("create_scheduling_unit_draft_from_scheduling_unit_blueprint(scheduling_unit_blueprint.id=%s) created copy_scheduling_unit_draft id=%s",
                    blueprint.pk, new_draft.pk)

        # rebuild the tasks graph in the new draft from the blueprint's specifications_doc
        return update_task_graph_from_specifications_doc(new_draft, blueprint.specifications_doc)


def copy_task_draft(task_draft: models.TaskDraft) -> models.TaskDraft:
    '''
    Create a copy of the given task draft in the same scheduling_unit_draft.

    Every task_relation and task_scheduling_relation that involves the given task draft is duplicated too.
    On the side of the relation where the original task was, the duplicate points to the copy; the other
    side keeps pointing at the same task draft. All of this happens in one transaction.
    :return: the new TaskDraft
    '''
    logger.debug("copy_task_draft(task_draft.id=%s) copying...", task_draft.pk)
    source_id = task_draft.id

    with transaction.atomic():
        task_draft_copy = models.TaskDraft.objects.create(name="%s (Copy)" % (task_draft.name,),
                                                          description="%s (Copy from task_draft id=%s)" % (task_draft.description, source_id),
                                                          short_description=task_draft.short_description,
                                                          specifications_doc=task_draft.specifications_doc,
                                                          scheduling_unit_draft=task_draft.scheduling_unit_draft,
                                                          specifications_template=task_draft.specifications_template)

        def redirect_to_copy(task):
            # references to the copied task now point at the copy; references to any other task are kept
            return task_draft_copy if task.id == source_id else task

        relations = models.TaskRelationDraft.objects.filter(Q(producer__id=source_id) | Q(consumer__id=source_id)).all()
        for relation in relations:
            models.TaskRelationDraft.objects.create(producer=redirect_to_copy(relation.producer),
                                                    consumer=redirect_to_copy(relation.consumer),
                                                    input_role=relation.input_role,
                                                    output_role=relation.output_role,
                                                    selection_doc=relation.selection_doc,
                                                    selection_template=relation.selection_template)

        scheduling_relations = models.TaskSchedulingRelationDraft.objects.filter(Q(first__id=source_id) | Q(second_id=source_id)).all()
        for scheduling_relation in scheduling_relations:
            models.TaskSchedulingRelationDraft.objects.create(first=redirect_to_copy(scheduling_relation.first),
                                                              second=redirect_to_copy(scheduling_relation.second),
                                                              placement=scheduling_relation.placement,
                                                              time_offset=scheduling_relation.time_offset)

        logger.info("copy_task_draft(task_draft.id=%s) created copy_task_draft id=%s", task_draft.pk, task_draft_copy.pk)
        return task_draft_copy


def copy_task_blueprint_to_task_draft(task_blueprint: models.TaskBlueprint) -> models.TaskDraft:
    '''
    create a (draft)copy of the given task_blueprint. Also copy the task_relation(s) and/or the task_scheduling_relation(s)

    The new task draft is added to the draft of the task_blueprint's scheduling_unit_blueprint.
    For each copied relation:
    - the side that referred to the given task_blueprint now refers to the new task draft;
    - the other side refers to the draft from which that other task blueprint was created.
    Everything happens in one transaction.
    :return: the new TaskDraft
    '''
    logger.debug("copy_task_blueprint_to_task_draft(task_blueprint.id=%s) copying...", task_blueprint.pk)

    with transaction.atomic():
        task_draft_copy = models.TaskDraft.objects.create(name="%s (Copy)" % (task_blueprint.name,),
                                                          description="%s (Copy from task_blueprint id=%s)" % (task_blueprint.description, task_blueprint.id),
                                                          short_description=task_blueprint.short_description,
                                                          specifications_doc=task_blueprint.specifications_doc,
                                                          scheduling_unit_draft=task_blueprint.scheduling_unit_blueprint.draft,
                                                          specifications_template=task_blueprint.specifications_template)

        def as_task_draft(other_task_blueprint: models.TaskBlueprint) -> models.TaskDraft:
            # map a task blueprint onto the task draft used in the copied relation:
            # the copied blueprint maps onto the new draft copy, any other blueprint onto its own draft.
            return task_draft_copy if other_task_blueprint.id == task_blueprint.id else other_task_blueprint.draft

        for task_rel in models.TaskRelationBlueprint.objects.filter(Q(producer__id=task_blueprint.id) | Q(consumer__id=task_blueprint.id)).all():
            models.TaskRelationDraft.objects.create(producer=as_task_draft(task_rel.producer),
                                                    consumer=as_task_draft(task_rel.consumer),
                                                    input_role=task_rel.input_role,
                                                    output_role=task_rel.output_role,
                                                    selection_doc=task_rel.selection_doc,
                                                    selection_template=task_rel.selection_template)

        # Map the non-copied side via the task blueprint's own draft, exactly like the task relations above.
        # (The previous code went via 'task_sched_rel.draft', an attribute of the scheduling relation blueprint itself.)
        for task_sched_rel in models.TaskSchedulingRelationBlueprint.objects.filter(Q(first__id=task_blueprint.id) | Q(second__id=task_blueprint.id)).all():
            models.TaskSchedulingRelationDraft.objects.create(first=as_task_draft(task_sched_rel.first),
                                                              second=as_task_draft(task_sched_rel.second),
                                                              placement=task_sched_rel.placement,
                                                              time_offset=task_sched_rel.time_offset)

        logger.info("copy_task_blueprint_to_task_draft(task_blueprint.id=%s) created copy_task_draft id=%s", task_blueprint.pk, task_draft_copy.pk)
        return task_draft_copy


def copy_task_blueprint_via_task_draft_to_new_task_blueprint(task_blueprint: models.TaskBlueprint) -> models.TaskBlueprint:
    '''
    Create a new (blueprint)copy of the given task_blueprint in two steps, both in one transaction:
    1. copy it into a new task draft, including its task_relation(s) and/or task_scheduling_relation(s);
    2. create a task blueprint from that new task draft.
    :return: the new TaskBlueprint
    '''
    logger.debug("copy_task_blueprint_via_task_draft_to_new_task_blueprint(task_blueprint.id=%s)", task_blueprint.pk)

    with transaction.atomic():
        return create_task_blueprint_from_task_draft(copy_task_blueprint_to_task_draft(task_blueprint))


def update_task_graph_from_specifications_doc(scheduling_unit_draft: models.SchedulingUnitDraft, specifications_doc: dict) -> models.SchedulingUnitDraft:
    """
    Update the current task graph in this scheduling_unit_draft according to the given specifications_doc: remove tasks from the graph that are not in the doc, update tasks with the same name, create tasks that are in the doc but not in the graph yet.

    Task relations and task scheduling relations are synchronized the same way:
    - obsolete ones are deleted;
    - specified ones are created, or updated in place if they already exist.
    Obsolete items that are already referenced by a blueprint (ProtectedError) are kept, and a warning is logged.
    All database changes happen in one transaction. A failure, for example in the final scheduling
    constraints validation, therefore reverts the whole update.

    :param scheduling_unit_draft: the draft whose task graph is updated.
    :param specifications_doc: the scheduling unit specifications_doc; it is validated against the draft's specifications_template first.
    :return: the refreshed scheduling_unit_draft.
    """
    logger.debug("update_task_graph_from_specifications_doc(scheduling_unit_draft.id=%s, name='%s') ...", scheduling_unit_draft.pk, scheduling_unit_draft.name)

    # make sure the given specifications_doc validates
    scheduling_unit_draft.specifications_template.validate_document(specifications_doc)

    with transaction.atomic():
        specified_tasks = specifications_doc.get("tasks", {})
        specified_task_relations = specifications_doc.get("task_relations", [])
        specified_task_scheduling_relations = specifications_doc.get("task_scheduling_relations", [])

        # remove existing task relations which are not specified (anymore).
        # An existing relation is considered 'specified' when its (producer name, consumer name) pair is in the doc.
        specified_task_relation_names = {(rel['producer'], rel['consumer']) for rel in specified_task_relations}
        for task_rel in models.TaskRelationDraft.objects.filter(Q(producer__scheduling_unit_draft__id=scheduling_unit_draft.id) |
                                                                Q(consumer__scheduling_unit_draft__id=scheduling_unit_draft.id)).all():
            if (task_rel.producer.name, task_rel.consumer.name) not in specified_task_relation_names:
                try:
                    # existing task_rel is not specified (anymore). Remove it.
                    task_rel.delete()
                    logger.info("deleted obsolete task_relation_draft id=%s producer=%s,'%s' consumer=%s,'%s' from scheduling_unit_draft id=%s name='%s'",
                                task_rel.id, task_rel.producer.id, task_rel.producer.name, task_rel.consumer.id, task_rel.consumer.name, scheduling_unit_draft.id, scheduling_unit_draft.name)
                except ProtectedError:
                    # keep the relation, it's already used in a blueprint
                    logger.warning("could not delete obsolete task_relation_draft id=%s producer=%s,'%s' consumer=%s,'%s' from scheduling_unit_draft id=%s name='%s' because it's referenced by a blueprint",
                                   task_rel.id, task_rel.producer.id, task_rel.producer.name, task_rel.consumer.id, task_rel.consumer.name, scheduling_unit_draft.id, scheduling_unit_draft.name)


        # remove existing task scheduling relations which are not specified (anymore).
        # Select relations where either the first or the second task belongs to this scheduling unit draft.
        specified_task_scheduling_relation_names = {(rel['first'], rel['second']) for rel in specified_task_scheduling_relations}
        for task_sched_rel in models.TaskSchedulingRelationDraft.objects.filter(Q(first__scheduling_unit_draft__id=scheduling_unit_draft.id) |
                                                                                Q(second__scheduling_unit_draft__id=scheduling_unit_draft.id)).all():
            if (task_sched_rel.first.name, task_sched_rel.second.name) not in specified_task_scheduling_relation_names:
                try:
                    # existing task_sched_rel is not specified (anymore). Remove it.
                    task_sched_rel.delete()
                    logger.info("deleted obsolete task_scheduling_relation_draft id=%s first=%s,'%s' second=%s,'%s' from scheduling_unit_draft id=%s name='%s'",
                                task_sched_rel.id, task_sched_rel.first.id, task_sched_rel.first.name, task_sched_rel.second.id, task_sched_rel.second.name, scheduling_unit_draft.id, scheduling_unit_draft.name)
                except ProtectedError:
                    # keep the relation, it's already used in a blueprint
                    logger.warning("could not delete obsolete task_scheduling_relation_draft id=%s first=%s,'%s' second=%s,'%s' from scheduling_unit_draft id=%s name='%s' because it's referenced by a blueprint",
                                   task_sched_rel.id, task_sched_rel.first.id, task_sched_rel.first.name, task_sched_rel.second.id, task_sched_rel.second.name, scheduling_unit_draft.id, scheduling_unit_draft.name)

        # remove existing tasks which are not specified (anymore).
        for task in scheduling_unit_draft.task_drafts.all():
            if task.name not in specified_tasks:
                try:
                    # existing task is not specified (anymore). Remove it.
                    task.delete()
                    logger.info("deleted obsolete task_draft id=%s name='%s' from scheduling_unit_draft id=%s name='%s'",
                                task.id, task.name, scheduling_unit_draft.id, scheduling_unit_draft.name)
                except ProtectedError:
                    # keep the task, it's already used in a blueprint
                    logger.warning("could not deleted obsolete task_draft id=%s name='%s' from scheduling_unit_draft id=%s name='%s' because it's referenced by a blueprint",
                                   task.id, task.name, scheduling_unit_draft.id, scheduling_unit_draft.name)

        # sort and process the tasks in 'data-flow'-order to create a natural order in which we create the task drafts
        tasks_graph = Graph(list(specified_tasks.keys()))

        # add an edge from producer to consumer for each data-flow relation
        for task_rel in specified_task_relations:
            tasks_graph.add_edge(task_rel['producer'], task_rel['consumer'])

        # we like earlier observations to be instantiated before later ones
        for task_sched_rel in specified_task_scheduling_relations:
            if task_sched_rel["placement"] == "before":
                tasks_graph.add_edge(task_sched_rel["first"], task_sched_rel["second"])
            elif task_sched_rel["placement"] in ["after", "parallel"]: # for parallel, the choice is arbitrary
                tasks_graph.add_edge(task_sched_rel["second"], task_sched_rel["first"])

        # sort the drafts according to the given criteria
        ordered_task_names = toposorted(tasks_graph)

        # add/update new/updated tasks
        for task_name in ordered_task_names:
            task_definition = specified_tasks[task_name]

            task_template_name = task_definition["specifications_template"]["name"]
            task_template_version = task_definition["specifications_template"].get("version")
            task_template = models.TaskTemplate.get_version_or_latest(name=task_template_name, version=task_template_version)

            task_specifications_doc = task_definition.get("specifications_doc", {})
            task_specifications_doc = task_template.add_defaults_to_json_object_for_schema(task_specifications_doc)

            logger.debug("creating/updating task draft... task_name='%s', task_template_name='%s', task_template_version=%s", task_name, task_template_name, task_template_version)

            try:
                task_draft = scheduling_unit_draft.task_drafts.get(name=task_name)
                task_draft.description = task_definition.get("description", "")
                task_draft.short_description = task_definition.get("short_description", "")
                task_draft.specifications_doc = task_specifications_doc
                task_draft.specifications_template = task_template
                task_draft.save()

                logger.info("updated task draft id=%s task_name='%s' su_draft_id=%s task_template_name='%s', task_template_version=%s",
                            task_draft.pk, task_name, scheduling_unit_draft.id, task_template_name, task_template_version)
            except models.TaskDraft.DoesNotExist:
                task_draft = models.TaskDraft.objects.create(name=task_name,
                                                             description=task_definition.get("description", ""),
                                                             short_description=task_definition.get("short_description", ""),
                                                             scheduling_unit_draft=scheduling_unit_draft,
                                                             specifications_doc = task_specifications_doc,
                                                             specifications_template=task_template)
                logger.info("created task draft id=%s task_name='%s' su_draft_id=%s task_template_name='%s', task_template_version=%s, task_template_state=%s",
                            task_draft.pk, task_name, scheduling_unit_draft.id, task_template_name, task_template_version, task_template.state.value)

        scheduling_unit_draft.refresh_from_db()

        # Now create task relations
        for task_relation_definition in specified_task_relations:
            try:
                producer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["producer"])
                consumer_task_draft = scheduling_unit_draft.task_drafts.get(name=task_relation_definition["consumer"])
                input_role = models.TaskConnectorType.objects.get(task_template=consumer_task_draft.specifications_template,
                                                                  role=task_relation_definition["input"]["role"],
                                                                  datatype=task_relation_definition["input"]["datatype"],
                                                                  dataformat=task_relation_definition["input"]["dataformat"],
                                                                  iotype=models.IOType.Choices.INPUT.value)
                output_role = models.TaskConnectorType.objects.get(task_template=producer_task_draft.specifications_template,
                                                                   role=task_relation_definition["output"]["role"],
                                                                   datatype=task_relation_definition["output"]["datatype"],
                                                                   dataformat=task_relation_definition["output"]["dataformat"],
                                                                   iotype=models.IOType.Choices.OUTPUT.value)

                selection_template_name = task_relation_definition["selection_template"]["name"]
                selection_template_version = task_relation_definition["selection_template"].get("version")
                selection_template = models.TaskRelationSelectionTemplate.get_version_or_latest(name=selection_template_name, version=selection_template_version)
                selection_doc = task_relation_definition.get("selection_doc", selection_template.get_default_json_document_for_schema())
            except Exception as e:
                logger.error("Could not determine Task Relations for %s. Error: %s", task_relation_definition, e)
                raise

            # A relation is identified by its producer, consumer and connector roles; the remaining fields
            # go in 'defaults' so an existing relation is updated in place instead of duplicated.
            task_relation, created = models.TaskRelationDraft.objects.update_or_create(producer=producer_task_draft,
                                                                                       consumer=consumer_task_draft,
                                                                                       input_role=input_role,
                                                                                       output_role=output_role,
                                                                                       defaults={'tags': task_relation_definition.get("tags",[]),
                                                                                                 'selection_doc': selection_doc,
                                                                                                 'selection_template': selection_template})
            logger.info("%s task_relation id=%s between task draft id=%s name='%s' and id=%s name='%s",
                        "created" if created else "updated",
                        task_relation.pk, producer_task_draft.id, producer_task_draft.name, consumer_task_draft.id, consumer_task_draft.name)

        # task_scheduling_relation
        for task_scheduling_relation_definition in specified_task_scheduling_relations:
            placement = models.SchedulingRelationPlacement.objects.get(value=task_scheduling_relation_definition["placement"])
            time_offset = task_scheduling_relation_definition.get("time_offset", 0)
            first_task_draft = scheduling_unit_draft.task_drafts.get(name=task_scheduling_relation_definition["first"])
            second_task_draft = scheduling_unit_draft.task_drafts.get(name=task_scheduling_relation_definition["second"])

            # A scheduling relation is identified by its (first, second) task pair, matching the obsolete-removal
            # above; placement and time_offset are updated in place.
            task_scheduling_relation, created = models.TaskSchedulingRelationDraft.objects.update_or_create(first=first_task_draft,
                                                                                                            second=second_task_draft,
                                                                                                            defaults={'placement': placement,
                                                                                                                      'time_offset': time_offset})
            logger.info("%s task_scheduling_relation id=%s between task draft id=%s name='%s' and id=%s name='%s",
                        "created" if created else "updated",
                        task_scheduling_relation.pk, first_task_draft.id, first_task_draft.name, second_task_draft.id, second_task_draft.name)

        if 'scheduling_constraints_doc' in specifications_doc:
            scheduling_unit_draft.scheduling_constraints_doc = specifications_doc['scheduling_constraints_doc']
            scheduling_unit_draft.save()

        # finally, check the scheduling_constraints (may raise, reverting the transaction)
        scheduling_unit_draft.validate_scheduling_constraints()

    scheduling_unit_draft.refresh_from_db()
    return scheduling_unit_draft


def create_task_blueprint_from_task_draft(task_draft: models.TaskDraft) -> models.TaskBlueprint:
    """
    Create a task_blueprint from the task_draft, or reuse the existing one with the same name in the draft's latest scheduling_unit_blueprint.

    Steps (after validating the draft's specifications_doc, all within one transaction):
    - use the last scheduling_unit_blueprint of the task_draft's scheduling_unit_draft, creating one if there is none,
    - look up a task_blueprint with the task_draft's name in that scheduling_unit_blueprint; if found, (re)link it to this task_draft,
      otherwise create a new task_blueprint copying the draft's descriptions, name, specifications and output_pinned flag,
    - for every TaskRelationDraft / TaskSchedulingRelationDraft of this task_draft whose other end already has a task_blueprint in the
      same scheduling_unit_blueprint, create the corresponding TaskRelationBlueprint / TaskSchedulingRelationBlueprint
      (relations that already exist are logged and skipped).

    :param task_draft: the task draft to blueprint.
    :return: the created or reused task_blueprint.
    :raises Exception if instantiate fails, e.g. when validate_specifications_doc rejects the draft's specifications_doc.
    """
    # make sure the task draft's specifications_doc validates before we blueprint it
    task_draft.validate_specifications_doc()

    logger.debug("creating task_blueprint from task_draft id=%s", task_draft.pk)

    with transaction.atomic():
        # get or create a scheduling_unit_blueprint from the scheduling_unit_draft
        scheduling_unit_blueprint = task_draft.scheduling_unit_draft.scheduling_unit_blueprints.last()
        if scheduling_unit_blueprint is None:
            scheduling_unit_blueprint = create_scheduling_unit_blueprint_from_scheduling_unit_draft(task_draft.scheduling_unit_draft)

        try:
            # reuse an existing blueprint with the same name in this unit; just make sure it points at this draft
            task_blueprint = TaskBlueprint.objects.get(name=task_draft.name, scheduling_unit_blueprint=scheduling_unit_blueprint)
            if task_blueprint.draft != task_draft:
                task_blueprint.draft = task_draft
                task_blueprint.save()
        except TaskBlueprint.DoesNotExist:
            task_blueprint = TaskBlueprint.objects.create(description=task_draft.description,
                                                          short_description=task_draft.short_description,
                                                          name=task_draft.name,
                                                          draft=task_draft,
                                                          scheduling_unit_blueprint=scheduling_unit_blueprint,
                                                          specifications_doc=task_draft.specifications_doc,
                                                          specifications_template=task_draft.specifications_template,
                                                          output_pinned=task_draft.output_pinned)

            logger.info("created task_blueprint id=%s from task_draft id=%s name=%s", task_blueprint.pk, task_draft.pk, task_draft.name)

        # now that we have a task_blueprint, its time to refresh the task_draft so we get the non-cached fields
        task_draft.refresh_from_db()

        # loop over consumers/producers, and 'copy' the TaskRelationBlueprint from the TaskRelationDraft
        # this is only possible if both 'ends' of the task_relation are converted to a TaskBlueprint
        # so, when converting two TaskDrafts (for example an observation and a pipeline), then for the conversion
        # of the first TaskDraft->TaskBlueprint no relation is setup,
        # and for the second TaskDraft->TaskBlueprint conversion we have both endpoints available, so we can connect them.
        task_draft_relations = models.TaskRelationDraft.objects.filter(Q(consumer=task_draft) | Q(producer=task_draft)).all()
        for task_relation_draft in task_draft_relations:
            try:
                producing_task_blueprint = task_relation_draft.producer.task_blueprints.get(scheduling_unit_blueprint=task_blueprint.scheduling_unit_blueprint)
                consuming_task_blueprint = task_relation_draft.consumer.task_blueprints.get(scheduling_unit_blueprint=task_blueprint.scheduling_unit_blueprint)

                # nested atomic block = savepoint: if the create below violates a constraint, only this savepoint is rolled back,
                # so the IntegrityError can be handled below without breaking the outer transaction.
                with transaction.atomic():
                    task_relation_blueprint = models.TaskRelationBlueprint.objects.create(draft=task_relation_draft,
                                                                                          input_role=task_relation_draft.input_role,
                                                                                          output_role=task_relation_draft.output_role,
                                                                                          producer=producing_task_blueprint,
                                                                                          consumer=consuming_task_blueprint,
                                                                                          selection_doc=task_relation_draft.selection_doc,
                                                                                          selection_template=task_relation_draft.selection_template)
                    logger.info("created task_relation_blueprint id=%s which connects task_blueprints producer_id=%s and consumer_id=%s",
                                task_relation_blueprint.pk, producing_task_blueprint.pk, consuming_task_blueprint.pk)
            except models.TaskBlueprint.DoesNotExist:
                # either the producer or the consumer has no task_blueprint (yet)
                pass
            except IntegrityError as e:
                # only a violation of the unique-relation constraint means "already exists"; any other integrity error is re-raised
                if 'TaskRelationBlueprint_unique_relation' in str(e):
                    logger.info("task_relation_blueprint with producer_id=%s and consumer_id=%s already exists",
                                producing_task_blueprint.pk, consuming_task_blueprint.pk)
                else:
                    raise


        # Do the same 'trick' for Task Scheduling Relation Draft to Blueprint
        task_draft_scheduling_relations = models.TaskSchedulingRelationDraft.objects.filter(Q(first=task_draft) | Q(second=task_draft)).all()
        for task_scheduling_relation_draft in task_draft_scheduling_relations:
            try:
                first_task_blueprint = task_scheduling_relation_draft.first.task_blueprints.get(scheduling_unit_blueprint=task_blueprint.scheduling_unit_blueprint)
                second_task_blueprint = task_scheduling_relation_draft.second.task_blueprints.get(scheduling_unit_blueprint=task_blueprint.scheduling_unit_blueprint)

                # savepoint, for the same reason as above
                with transaction.atomic():
                    task_scheduling_relation_blueprint = models.TaskSchedulingRelationBlueprint.objects.create(draft=task_scheduling_relation_draft,
                                                                                                               first=first_task_blueprint,
                                                                                                               second=second_task_blueprint,
                                                                                                               time_offset=task_scheduling_relation_draft.time_offset,
                                                                                                               placement=task_scheduling_relation_draft.placement)
                    logger.info("created task_scheduling_relation_blueprint id=%s which connects task_blueprints first_id=%s and second_id=%s, placement=%s time_offset=%s[sec]",
                                task_scheduling_relation_blueprint.pk, first_task_blueprint.pk, second_task_blueprint.pk, task_scheduling_relation_draft.placement, task_scheduling_relation_draft.time_offset)
            except models.TaskBlueprint.DoesNotExist:
                # either the first or the second has no task_blueprint (yet)
                pass
            except IntegrityError as e:
                # an already existing relation is fine; anything else is re-raised
                if 'TaskSchedulingRelationBlueprint_unique_relation' in str(e):
                    logger.info("task_scheduling_relation_blueprint with first_id=%s and second_id=%s already exists",
                                first_task_blueprint.pk, second_task_blueprint.pk)
                else:
                    raise

        return task_blueprint


def create_scheduling_unit_blueprint_and_tasks_and_subtasks_from_scheduling_unit_draft(scheduling_unit_draft: models.SchedulingUnitDraft) -> models.SchedulingUnitBlueprint:
    '''Convenience method: instantiate a scheduling_unit_blueprint from the given draft, including its task_blueprints and their subtasks.

    All steps run in a single transaction:
    1. create the scheduling_unit_blueprint from the scheduling_unit_draft,
    2. create the task_blueprint graph and the subtasks of each task_blueprint,
    3. if the unit has a time constraint ('at', 'after', or the earliest 'from' of the 'between' windows), set the subtasks' start times
       so the unit starts there. This is only an initial spot where the user expects it to pop up; the dynamic scheduler may move it.
    4. for units that interrupt the telescope, check the trigger accounting (which may put the unit in ERROR status; up to the user to handle).

    :param scheduling_unit_draft: the draft to instantiate.
    :return: the newly created scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        unit_blueprint = create_scheduling_unit_blueprint_from_scheduling_unit_draft(scheduling_unit_draft)
        unit_blueprint = update_task_blueprints_and_subtasks_graph_from_draft(unit_blueprint)

        constraints_doc = unit_blueprint.scheduling_constraints_doc
        if constraints_doc is not None:
            time_constraints = (constraints_doc or dict()).get('time', {})

            # derive the earliest possible start from the first applicable time constraint
            initial_start_time = None
            if 'at' in time_constraints:
                initial_start_time = parser.parse(time_constraints['at'], ignoretz=True)
            elif 'after' in time_constraints:
                initial_start_time = parser.parse(time_constraints['after'], ignoretz=True)
            elif 'between' in time_constraints:
                window_starts = [parser.parse(window['from'], ignoretz=True)
                                 for window in time_constraints['between']
                                 if 'from' in window]
                if window_starts:
                    initial_start_time = min(window_starts)

            if initial_start_time is not None:
                set_scheduling_unit_blueprint_start_times(unit_blueprint, initial_start_time)

        if unit_blueprint.interrupts_telescope:
            unit_blueprint.check_trigger_accounting()

        return unit_blueprint


def create_task_blueprint_and_subtasks_from_task_draft(task_draft: models.TaskDraft) -> models.TaskBlueprint:
    '''Convenience method: create the task_blueprint from the task_draft, then create/update the task_blueprint's subtasks, in one transaction.

    Note: no subtasks are scheduled here; use schedule_independent_subtasks_in_task_blueprint for that.

    :param task_draft: the task draft to blueprint.
    :return: the (refreshed) task_blueprint.
    '''
    with transaction.atomic():
        task_blueprint = create_task_blueprint_from_task_draft(task_draft)
        create_or_update_subtasks_from_task_blueprint(task_blueprint)
    task_blueprint.refresh_from_db()
    return task_blueprint


def update_task_blueprint_graph_from_draft(scheduling_unit_blueprint: models.SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Update this scheduling_unit_blueprint's task_blueprints graph from the linked scheduling_unit_draft's graph.
    - New task_drafts that are not in the blueprint graph (yet) are added to this blueprint graph.
    - Modified task_drafts that are in the blueprint graph are NOT updated to the task_blueprint, because task_blueprints are immutable.
    - Deleted task_drafts that are in the blueprint graph are NOT removed from the blueprint graph, because task_blueprints are immutable and carved in stone and reflect what happened.

    The task_drafts are blueprinted in topological order: predecessors (data-flow) first, and for scheduling relations
    the task that is placed earlier first (for 'parallel' the order is arbitrary).

    :return: the refreshed scheduling_unit_blueprint.
    '''

    with transaction.atomic():
        scheduling_unit_draft = scheduling_unit_blueprint.draft
        task_drafts = list(scheduling_unit_draft.task_drafts.all())

        # sort them in 'data-flow'-order,
        task_draft_graph = Graph(task_drafts)

        # because successors can depend on predecessors, so the first task drafts need to be blueprinted first.
        task_draft_graph.predecessor_key(lambda draft: draft.predecessors)

        # furthermore, we like observations that are to be scheduled earlier, to be blueprinted first.
        task_sched_rels = models.TaskSchedulingRelationDraft.objects.filter(Q(first__scheduling_unit_draft__id=scheduling_unit_draft.id) &
                                                                            Q(second__scheduling_unit_draft__id=scheduling_unit_draft.id)).select_related('placement', 'first', 'second')
        for task_sched_rel in task_sched_rels:
            # placement is a related SchedulingRelationPlacement object; compare its value, not the object itself,
            # with the choice's string value (a model instance never equals a plain string).
            placement_value = task_sched_rel.placement.value
            if placement_value == models.SchedulingRelationPlacement.Choices.BEFORE.value:
                task_draft_graph.add_edge(task_sched_rel.first, task_sched_rel.second)
            elif placement_value in [models.SchedulingRelationPlacement.Choices.AFTER.value, models.SchedulingRelationPlacement.Choices.PARALLEL.value]: # for parallel, the choice is arbitrary
                task_draft_graph.add_edge(task_sched_rel.second, task_sched_rel.first)

        # sort the drafts according to the given criteria
        task_drafts = toposorted(task_draft_graph)

        # convert task_draft(s) to task_blueprint(s)
        for task_draft in task_drafts:
            create_task_blueprint_from_task_draft(task_draft)

        # refresh so all related fields are updated.
        scheduling_unit_blueprint.refresh_from_db()
        return scheduling_unit_blueprint


def update_task_blueprints_and_subtasks_graph_from_draft(scheduling_unit_blueprint: models.SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Convenience method: bring the scheduling_unit_blueprint's task_blueprint graph up to date with its draft, create/update each
    task_blueprint's subtasks, and give the subtasks a default start time so the unit can be displayed in a timeline view.

    The default start time (rounded to minute precision) is:
    - the 'time.at' constraint, for units with scheduler 'manual' that specify it,
    - 3 minutes after the latest on_sky_stop_time of the other units in the same project, for units with scheduler 'dynamic' (if there are any),
    - otherwise 10 minutes from now.
    Operators and/or dynamic scheduling may override it later. Finally the scheduling constraints are validated;
    if that raises, the whole transaction is rolled back.

    :return: the refreshed scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        scheduling_unit_blueprint = update_task_blueprint_graph_from_draft(scheduling_unit_blueprint)

        for task_blueprint in scheduling_unit_blueprint.task_blueprints.all():
            create_or_update_subtasks_from_task_blueprint(task_blueprint)

        start_time_guess = round_to_minute_precision(datetime.utcnow() + timedelta(minutes=10))

        constraints = scheduling_unit_blueprint.scheduling_constraints_doc
        if constraints:
            scheduler = constraints.get('scheduler', '')
            if scheduler == 'manual':
                # fixed-time scheduling: start at the requested 'at' time, if given
                if 'time' in constraints and 'at' in constraints['time']:
                    start_time_guess = parser.parse(constraints['time']['at'], ignoretz=True)
            elif scheduler == 'dynamic':
                # dynamic scheduling: line up just after the last other unit in this project
                other_units_in_project = models.SchedulingUnitBlueprint.objects.filter(draft__scheduling_set__project=scheduling_unit_blueprint.project).exclude(id=scheduling_unit_blueprint.id)
                latest_stop_time = other_units_in_project.aggregate(Max('on_sky_stop_time'))['on_sky_stop_time__max']
                if latest_stop_time is not None:
                    start_time_guess = latest_stop_time + timedelta(minutes=3)

        # round for readability, then apply
        update_subtasks_start_times_for_scheduling_unit(scheduling_unit_blueprint, round_to_minute_precision(start_time_guess))

        scheduling_unit_blueprint.validate_scheduling_constraints()

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint, start_time: datetime=None, misc_unavailable_stations: Iterable[str]=None) -> models.SchedulingUnitBlueprint:
    '''Convenience method: for each task_blueprint in the unit, schedule the subtasks that do not depend on predecessors.

    :param start_time: start of the unit; each task is scheduled at start_time + its relative_start_time.
                       Defaults to the unit's scheduled_start_time, or to one minute from now if that is not set either.
    :param misc_unavailable_stations: passed unchanged to schedule_independent_subtasks_in_task_blueprint for every task.
    :return: the refreshed scheduling_unit_blueprint.
    '''
    if start_time is None:
        start_time = scheduling_unit_blueprint.scheduled_start_time
        if start_time is None:
            start_time = datetime.utcnow()+timedelta(minutes=1)

    for task_blueprint in list(scheduling_unit_blueprint.task_blueprints.all()):
        task_start_time = start_time + task_blueprint.relative_start_time
        schedule_independent_subtasks_in_task_blueprint(task_blueprint, start_time=task_start_time, misc_unavailable_stations=misc_unavailable_stations)

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Convenience method: unschedule the scheduled subtasks of every task_blueprint in the unit, all in one transaction.

    :return: the refreshed scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        for task_blueprint in list(scheduling_unit_blueprint.task_blueprints.all()):
            unschedule_subtasks_in_task_blueprint(task_blueprint)
        scheduling_unit_blueprint.refresh_from_db()

    return scheduling_unit_blueprint


def reschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint, misc_unavailable_stations: Iterable[str]=None) -> models.SchedulingUnitBlueprint:
    '''Convenience method: unschedule all scheduled subtasks of the unit and schedule the independent ones again, in one transaction.

    :param misc_unavailable_stations: passed on to schedule_independent_subtasks_in_scheduling_unit_blueprint.
    :return: the refreshed scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        scheduling_unit_blueprint = unschedule_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint)
        scheduling_unit_blueprint = schedule_independent_subtasks_in_scheduling_unit_blueprint(scheduling_unit_blueprint,
                                                                                               misc_unavailable_stations=misc_unavailable_stations)
    return scheduling_unit_blueprint


def set_scheduling_unit_blueprint_start_times(scheduling_unit_blueprint: SchedulingUnitBlueprint, first_start_time: datetime) -> models.SchedulingUnitBlueprint:
    '''Convenience method: set the subtasks' scheduled_start_time such that all scheduling_relations and dependencies are taken into account
    and the first subtask starts at first_start_time.

    Each task_blueprint is positioned at first_start_time + its relative_start_time. Tasks without a relative_start_time are skipped,
    and nothing is changed when first_start_time is None.

    :return: the refreshed scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        for task_blueprint in scheduling_unit_blueprint.task_blueprints.all():
            if first_start_time is None or task_blueprint.relative_start_time is None:
                continue
            set_task_blueprint_start_times(task_blueprint, first_start_time=first_start_time+task_blueprint.relative_start_time)

        scheduling_unit_blueprint.refresh_from_db()

    return scheduling_unit_blueprint

def mark_independent_subtasks_in_scheduling_unit_blueprint_as_unschedulable(scheduling_unit_blueprint: SchedulingUnitBlueprint, reason: str) -> models.SchedulingUnitBlueprint:
    '''Convenience method: for every task_blueprint in the unit, mark the subtasks that do not depend on predecessors as unschedulable.

    :param reason: explanation, logged here and passed on to mark_independent_subtasks_in_task_blueprint_as_unschedulable.
    :return: the refreshed scheduling_unit_blueprint.
    '''
    logger.info("marking unit with id=%s as unschedulable with reason: %s", scheduling_unit_blueprint.id, reason)

    with transaction.atomic():
        task_blueprints = scheduling_unit_blueprint.task_blueprints.all()
        for task_blueprint in task_blueprints:
            mark_independent_subtasks_in_task_blueprint_as_unschedulable(task_blueprint, reason)

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def mark_subtasks_in_scheduling_unit_blueprint_as_schedulable(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Convenience method: mark the subtasks of every task_blueprint in the unit as schedulable, in one transaction.

    :return: the refreshed scheduling_unit_blueprint.
    '''
    with transaction.atomic():
        task_blueprints = scheduling_unit_blueprint.task_blueprints.all()
        for task_blueprint in task_blueprints:
            mark_subtasks_in_task_blueprint_as_schedulable(task_blueprint)

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def cancel_task_blueprint(task_blueprint: TaskBlueprint) -> TaskBlueprint:
    '''Convenience method: cancel every subtask of the task_blueprint whose current state allows a transition to CANCELLING.

    :return: the refreshed task_blueprint.
    '''
    cancelling_state = models.SubtaskState.objects.get(value=models.SubtaskState.Choices.CANCELLING.value)
    states_that_can_go_to_cancelling = models.SubtaskAllowedStateTransitions.allowed_old_states(cancelling_state)

    cancellable_subtasks = task_blueprint.subtasks.filter(state__in=states_that_can_go_to_cancelling)
    for subtask in cancellable_subtasks.all():
        cancel_subtask(subtask)

    task_blueprint.refresh_from_db()
    return task_blueprint


def mark_task_blueprint_as_obsolete(task_blueprint: TaskBlueprint) -> TaskBlueprint:
    '''Convenience method: mark all not-yet-obsolete subtasks in the task_blueprint as obsolete, regardless of their state.

    All affected subtasks get the same obsolete_since timestamp (utcnow at the time of the call); subtasks that are already
    obsolete keep their original timestamp. The updates are done in one transaction, so either all or none are marked.

    :return: the refreshed task_blueprint.
    '''
    now = datetime.utcnow()
    with transaction.atomic():
        for subtask in task_blueprint.subtasks.filter(obsolete_since__isnull=True).all():
            subtask.obsolete_since = now
            subtask.save()
    task_blueprint.refresh_from_db()
    return task_blueprint


def restart_on_hold_task_blueprint(task_blueprint: TaskBlueprint) -> TaskBlueprint:
    '''Convenience method: restart every subtask of the task_blueprint that is in the ON_HOLD state.

    :return: the refreshed task_blueprint.
    '''
    on_hold_subtasks = task_blueprint.subtasks.filter(state__value=models.SubtaskState.Choices.ON_HOLD.value)
    for subtask in on_hold_subtasks.all():
        restart_on_hold_subtask(subtask)

    task_blueprint.refresh_from_db()
    return task_blueprint


def cancel_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> SchedulingUnitBlueprint:
    '''Convenience method: cancel the cancellable subtasks of every task_blueprint in the scheduling_unit_blueprint.

    :return: the refreshed scheduling_unit_blueprint.
    '''
    task_blueprints = scheduling_unit_blueprint.task_blueprints.all()
    for task_blueprint in task_blueprints:
        cancel_task_blueprint(task_blueprint)

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def mark_scheduling_unit_blueprint_as_obsolete(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> SchedulingUnitBlueprint:
    '''Convenience method: mark the subtasks of every task_blueprint in the scheduling_unit_blueprint as obsolete (task by task).

    :return: the refreshed scheduling_unit_blueprint.
    '''
    task_blueprints = scheduling_unit_blueprint.task_blueprints.all()
    for task_blueprint in task_blueprints:
        mark_task_blueprint_as_obsolete(task_blueprint)

    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def _generate_available_name(prefix: str, existing_objects: list):
    '''returns prefix if not used as name by any of the objects in existing_objects or otherwise the first available name with integer suffix'''
    existing_names = [obj.name for obj in existing_objects]
    name = prefix
    i = 1
    while name in existing_names:
        i += 1
        name = prefix + str(i)
    return name


def create_cleanuptask_for_scheduling_unit_blueprint(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Create a cleanup task for the given scheduling_unit_blueprint, connected to the outputs of its producing tasks, so it cleans up their output dataproducts.

    A cleanup TaskBlueprint (default specifications of the "cleanup" task template, named "Cleanup", "Cleanup2", ... whichever is free)
    is added to the blueprint graph. A TaskRelationBlueprint (selection template "all") is created from every output connector of every
    task_blueprint that is not itself a cleanup or ingest task to the cleanup task's matching input connector.
    Blueprints require a draft, so temporary drafts are created first. The cleanup task is then copied to the scheduling_unit_draft via
    copy_task_blueprint_to_task_draft (which is expected to also create the matching TaskRelationDrafts, as the lookup below relies on),
    the blueprints are re-linked to those drafts, and the temporary drafts are deleted.
    Finally the cleanup task's subtasks are created so it can actually run. Everything happens in one transaction.

    :param scheduling_unit_blueprint: the unit to add the cleanup task to.
    :return: the refreshed scheduling_unit_blueprint.
    '''

    # Rationale:
    # adding a cleanup task(blueprint) to a scheduling_unit_blueprint adds a task to the graph (which breaks the immutable blueprint concept),
    # but it does not modify observation/pipeline behaviour, hence we allow it.
    # Regard this as a convenience function to allow users to simplify cleaning up after themselves if they forgot to specify a cleanup task.

    with transaction.atomic():
        # create a cleanup task draft and blueprint....
        cleanup_template = models.TaskTemplate.get_version_or_latest(name="cleanup")
        cleanup_spec_doc = cleanup_template.get_default_json_document_for_schema()

        name = _generate_available_name("Cleanup", existing_objects=scheduling_unit_blueprint.task_blueprints.all())

        # create a temporary draft task, which we need to instantiate the blueprint. Remove it after updating the schedunit draft.
        tmp_cleanup_task_draft = models.TaskDraft.objects.create(
            name=name,
            description="Cleaning up all output dataproducts for this scheduling unit",
            scheduling_unit_draft=scheduling_unit_blueprint.draft,
            specifications_doc=cleanup_spec_doc,
            specifications_template=cleanup_template)

        cleanup_task_blueprint = TaskBlueprint.objects.create(
            name=name,
            description="Cleaning up all output dataproducts for this scheduling unit",
            draft=tmp_cleanup_task_draft, # is replaced at the end of this transaction...
            scheduling_unit_blueprint=scheduling_unit_blueprint,
            specifications_doc=cleanup_spec_doc,
            specifications_template=cleanup_template,
            output_pinned=False)

        logger.info("Created Cleanup Task id=%d for scheduling_unit id=%s, adding the outputs of all producing tasks in the scheduling unit to the cleanup...", cleanup_task_blueprint.id, scheduling_unit_blueprint.id)

        # ... and connect the outputs of the producing tasks to the cleanup, so the cleanup task knows what to remove.
        selection_template = TaskRelationSelectionTemplate.get_version_or_latest(name="all")
        selection_doc = selection_template.get_default_json_document_for_schema()

        # loop-invariant lookups, fetched once instead of once per connector
        output_iotype = models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value)
        input_iotype = models.IOType.objects.get(value=models.IOType.Choices.INPUT.value)
        any_role = models.Role.objects.get(value=models.Role.Choices.ANY.value)

        # exclude cleanup and ingest tasks by the choice's *value* (like all other choice lookups in this module),
        # not by the enum member itself.
        producer_task_blueprints = scheduling_unit_blueprint.task_blueprints.exclude(specifications_template__type=TaskType.Choices.CLEANUP.value) \
                                                                           .exclude(specifications_template__type=TaskType.Choices.INGEST.value)

        tmp_task_relation_drafts = []
        task_relation_blueprints = []
        for producer_task_blueprint in producer_task_blueprints.all():
            for connector_type in producer_task_blueprint.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all():
                # define what the producer_task_blueprint is producing
                output_role = models.TaskConnectorType.objects.get(task_template=producer_task_blueprint.specifications_template,
                                                                   role=connector_type.role,
                                                                   datatype=connector_type.datatype,
                                                                   iotype=output_iotype)

                # define what the cleanup task accepts/consumes
                input_role = models.TaskConnectorType.objects.filter(dataformat=connector_type.dataformat).get(task_template=cleanup_task_blueprint.specifications_template,
                                                                                                               role=any_role,
                                                                                                               datatype=connector_type.datatype,
                                                                                                               iotype=input_iotype)

                # connect the two
                # create a temporary draft, which we need to instantiate the blueprint. Remove it after updating the schedunit draft.
                tmp_task_relation_draft = models.TaskRelationDraft.objects.create(producer=producer_task_blueprint.draft,
                                                                                  consumer=cleanup_task_blueprint.draft,
                                                                                  input_role=input_role,
                                                                                  output_role=output_role,
                                                                                  selection_doc=selection_doc,
                                                                                  selection_template=selection_template)
                tmp_task_relation_drafts.append(tmp_task_relation_draft)

                task_relation_blueprint = models.TaskRelationBlueprint.objects.create(draft=tmp_task_relation_draft, # is replaced at the end of this transaction...
                                                                                      producer=producer_task_blueprint,
                                                                                      consumer=cleanup_task_blueprint,
                                                                                      input_role=input_role,
                                                                                      output_role=output_role,
                                                                                      selection_doc=selection_doc,
                                                                                      selection_template=selection_template)
                task_relation_blueprints.append(task_relation_blueprint)

                logger.info("created task_relation id=%s between task blueprint id=%s name='%s' and id=%s name='%s'",
                            task_relation_blueprint.pk, task_relation_blueprint.producer.id, task_relation_blueprint.producer.name, task_relation_blueprint.consumer.id, task_relation_blueprint.consumer.name)

        # add the same to the draft graph
        cleanup_task_draft = copy_task_blueprint_to_task_draft(cleanup_task_blueprint)

        # update dummy draft references
        cleanup_task_blueprint.draft = cleanup_task_draft  # update reference to point to the now existing draft
        cleanup_task_blueprint.save()

        for task_relation_blueprint in task_relation_blueprints:
            task_relation_blueprint.draft = models.TaskRelationDraft.objects.get(producer__id=task_relation_blueprint.producer.draft.id, consumer__id=cleanup_task_draft.id, input_role=task_relation_blueprint.input_role, output_role=task_relation_blueprint.output_role)
            task_relation_blueprint.save()

        # and wipe the dummies (relation drafts first, they reference the temporary task draft)
        for tmp_task_relation_draft in tmp_task_relation_drafts:
            tmp_task_relation_draft.delete()
        tmp_cleanup_task_draft.delete()

        # and finally also create the executable subtask for the cleanup_task_blueprint, so it can actually run.
        create_or_update_subtasks_from_task_blueprint(cleanup_task_blueprint)

        # return the modified scheduling_unit
        scheduling_unit_blueprint.refresh_from_db()
        return scheduling_unit_blueprint


def create_cleanuptask_for_scheduling_unit_draft(scheduling_unit_draft: SchedulingUnitDraft) -> models.SchedulingUnitDraft:
    '''Create a cleanup task draft in the given scheduling_unit_draft, connected to the outputs of its producing tasks, so it cleans up their output dataproducts.

    A TaskDraft with the default specifications of the "cleanup" task template is added, named "Cleanup" (or "Cleanup2", ... if taken).
    A TaskRelationDraft (selection template "all") is created from every output connector of every task draft that is not itself
    a cleanup or ingest task to the cleanup task's matching input connector. Everything happens in one transaction.

    :param scheduling_unit_draft: the unit to add the cleanup task draft to.
    :return: the refreshed scheduling_unit_draft.
    '''

    with transaction.atomic():
        # create a cleanup task draft....
        cleanup_template = models.TaskTemplate.get_version_or_latest(name="cleanup")
        cleanup_spec_doc = cleanup_template.get_default_json_document_for_schema()

        name = _generate_available_name("Cleanup", existing_objects=scheduling_unit_draft.task_drafts.all())

        cleanup_task_draft = models.TaskDraft.objects.create(
            name=name,
            description="Cleaning up all output dataproducts for this scheduling unit",
            scheduling_unit_draft=scheduling_unit_draft,
            specifications_doc=cleanup_spec_doc,
            specifications_template=cleanup_template)

        logger.info("Created Cleanup Task id=%d for scheduling_unit id=%s, adding the outputs of all producing tasks in the scheduling unit to the cleanup...", cleanup_task_draft.id, scheduling_unit_draft.id)

        # ... and connect the outputs of the producing tasks to the cleanup, so the cleanup task knows what to remove.
        selection_template = TaskRelationSelectionTemplate.get_version_or_latest(name="all")
        selection_doc = selection_template.get_default_json_document_for_schema()

        # loop-invariant lookups, fetched once instead of once per connector
        output_iotype = models.IOType.objects.get(value=models.IOType.Choices.OUTPUT.value)
        input_iotype = models.IOType.objects.get(value=models.IOType.Choices.INPUT.value)
        any_role = models.Role.objects.get(value=models.Role.Choices.ANY.value)

        # exclude cleanup and ingest tasks by the choice's *value* (like all other choice lookups in this module),
        # not by the enum member itself.
        producer_task_drafts = scheduling_unit_draft.task_drafts.exclude(specifications_template__type=TaskType.Choices.CLEANUP.value) \
                                                               .exclude(specifications_template__type=TaskType.Choices.INGEST.value)

        for producer_task_draft in producer_task_drafts.all():
            for connector_type in producer_task_draft.specifications_template.connector_types.filter(iotype__value=IOType.Choices.OUTPUT.value).all():
                # define what the producer_task_draft is producing
                output_role = models.TaskConnectorType.objects.get(task_template=producer_task_draft.specifications_template,
                                                                   role=connector_type.role,
                                                                   datatype=connector_type.datatype,
                                                                   iotype=output_iotype)

                # define what the cleanup task accepts/consumes
                input_role = models.TaskConnectorType.objects.filter(dataformat=connector_type.dataformat).get(task_template=cleanup_task_draft.specifications_template,
                                                                                                               role=any_role,
                                                                                                               datatype=connector_type.datatype,
                                                                                                               iotype=input_iotype)

                # connect the two
                task_relation_draft = models.TaskRelationDraft.objects.create(producer=producer_task_draft,
                                                                              consumer=cleanup_task_draft,
                                                                              input_role=input_role,
                                                                              output_role=output_role,
                                                                              selection_doc=selection_doc,
                                                                              selection_template=selection_template)

                logger.info("created task_relation id=%s between task draft id=%s name='%s' and id=%s name='%s'",
                            task_relation_draft.pk, task_relation_draft.producer.id, task_relation_draft.producer.name, task_relation_draft.consumer.id, task_relation_draft.consumer.name)

        # return the modified scheduling_unit
        scheduling_unit_draft.refresh_from_db()
        return scheduling_unit_draft


def get_gaps_to_previous_and_next_observations_in_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, include_schedulable_unschedulable: bool=False) -> ((datetime, datetime), (datetime, datetime)):
    '''get the gaps between the outer observation subtask(s) in the given scheduling_unit and its respective previous and next non-cancelled/obsolete observation subtask.
    returns a tuple of tuple's of datetimes: ((start_of_gap_to_previous_obs, end_of_gap_to_previous_obs), (start_of_gap_to_next_obs, end_of_gap_to_next_obs))
    If there are no prev/next observation(s), then the gap is an open-interval, like (datetime.min, end_of_gap_to_previous_obs) or (start_of_gap_to_next_obs, datetime.max)
    If scheduling_unit is None, or it has no non-obsolete observation subtasks, both gaps are fully open: ((datetime.min, datetime.max), (datetime.min, datetime.max))
    :param scheduling_unit: the unit whose observation subtasks are inspected (may be None)
    :param include_schedulable_unschedulable: passed on as include_defined_unschedulable to get_gaps_to_previous_and_next_observations
    '''
    fully_open_gaps = ((datetime.min, datetime.max), (datetime.min, datetime.max))
    if scheduling_unit is None:
        return fully_open_gaps

    own_observation_subtasks = scheduling_unit.subtasks.filter(specifications_template__type__value='observation').filter(obsolete_since__isnull=True).all()

    # get the gaps per observation subtask, excluding the observations in this unit itself
    # (the unit's own observations must not bound the unit's gaps)
    results = [get_gaps_to_previous_and_next_observations(obs_subtask,
                                                          exclude_subtasks=own_observation_subtasks,
                                                          include_defined_unschedulable=include_schedulable_unschedulable,
                                                          stations=obs_subtask.stations)
               for obs_subtask in own_observation_subtasks]

    if not results:
        # no (non-obsolete) observation subtasks: nothing bounds the gaps, so treat it like a None unit
        # instead of failing on results[0]
        return fully_open_gaps

    # the overall gap is the intersection of all per-observation gaps: the latest start and the earliest end
    gap_to_prev = (max(result[0][0] for result in results), min(result[0][1] for result in results))
    gap_to_next = (max(result[1][0] for result in results), min(result[1][1] for result in results))

    # return as immutable tuple
    return (gap_to_prev, gap_to_next)


@lru_cache(10000)
def get_schedulable_stations(observation_task: TaskBlueprint, proposed_start_time: datetime=None) -> Tuple[str]:
    '''Return the sorted, unique stations that can be used by the given observation task if it would start at proposed_start_time,
    i.e. the stations from the observation subtasks' station specification without the reserved and/or already used stations in that period.
    When proposed_start_time is None, the task's on_sky_start_time is used instead.
    Per observation subtask, the checked period runs from the start time up to start time + that subtask's specified_duration.
    NOTE(review): results are memoized by lru_cache on (observation_task, proposed_start_time). Reservations or station usage that change
    after a key was cached are not reflected for that key. Confirm this staleness is acceptable for all callers.
    '''
    start_time = observation_task.on_sky_start_time if proposed_start_time is None else proposed_start_time

    observation_subtasks = observation_task.subtasks.filter(specifications_template__type__value=models.SubtaskType.Choices.OBSERVATION.value).all()

    schedulable = set()
    for obs_subtask in observation_subtasks:
        end_time = start_time + obs_subtask.specified_duration
        # do not raise when too many stations are missing; just collect whatever is still available
        available_for_subtask = convert_task_station_groups_specification_to_station_list_without_used_and_or_reserved_stations(obs_subtask,
                                                                                                                                remove_reserved_stations=True,
                                                                                                                                remove_used_stations=True,
                                                                                                                                raise_when_too_many_missing=False,
                                                                                                                                lower_bound=start_time,
                                                                                                                                upper_bound=end_time)
        schedulable.update(available_for_subtask)

    return tuple(sorted(schedulable))


def mark_scheduling_unit_fixed_time_scheduled_at_scheduled_starttime(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Convenience method: pin a schedulable or scheduled unit to its current scheduled_start_time.
    Sets the scheduling constraints' 'scheduler' to 'fixed_time' and 'time.at' to the scheduled_start_time, then saves and reloads the unit,
    so the scheduler will try to schedule it using these new constraints. Units in any other status are returned unchanged.
    This is the 'inverse' of mark_scheduling_unit_dynamically_scheduled'''
    pinnable_statuses = (models.SchedulingUnitStatus.Choices.SCHEDULABLE.value, models.SchedulingUnitStatus.Choices.SCHEDULED.value)
    if scheduling_unit_blueprint.status.value not in pinnable_statuses:
        return scheduling_unit_blueprint

    logger.info("marking unit id=%s scheduling_constraints as fixed_time scheduled at starttime='%s'", scheduling_unit_blueprint.id, scheduling_unit_blueprint.scheduled_start_time)
    constraints = scheduling_unit_blueprint.scheduling_constraints_doc
    constraints['scheduler'] = 'fixed_time'
    # NOTE(review): appending 'Z' assumes scheduled_start_time is a naive UTC datetime; an aware datetime would yield '...+00:00Z' - confirm
    constraints['time']['at'] = scheduling_unit_blueprint.scheduled_start_time.isoformat()+'Z'
    scheduling_unit_blueprint.save()
    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def mark_scheduling_unit_dynamically_scheduled(scheduling_unit_blueprint: SchedulingUnitBlueprint) -> models.SchedulingUnitBlueprint:
    '''Convenience method: release a scheduled, schedulable or unschedulable unit back to dynamic scheduling.
    Sets the scheduling constraints' 'scheduler' to 'dynamic' and drops any 'time.at' constraint, then saves and reloads the unit,
    so the scheduler will try to schedule it using these new constraints. Units in any other status are returned unchanged.
    This is the 'inverse' of mark_scheduling_unit_fixed_time_scheduled_at_scheduled_starttime'''
    releasable_statuses = (models.SchedulingUnitStatus.Choices.SCHEDULED.value,
                           models.SchedulingUnitStatus.Choices.SCHEDULABLE.value,
                           models.SchedulingUnitStatus.Choices.UNSCHEDULABLE.value)
    if scheduling_unit_blueprint.status.value not in releasable_statuses:
        return scheduling_unit_blueprint

    logger.info("marking unit id=%s scheduling_constraints as dynamically scheduled", scheduling_unit_blueprint.id)
    constraints = scheduling_unit_blueprint.scheduling_constraints_doc
    constraints['scheduler'] = 'dynamic'
    # a fixed 'at' time makes no sense for dynamic scheduling; remove it if present
    constraints['time'].pop('at', None)
    scheduling_unit_blueprint.save()
    scheduling_unit_blueprint.refresh_from_db()
    return scheduling_unit_blueprint


def convert_station_groups_to_list_of_available_stations(station_groups: Iterable[dict], unavailable_stations: Iterable[str], raise_when_too_many_missing: bool=True) -> list:
    '''Create a sorted list of unique stations from the given station_groups that are not in unavailable_stations.
    :param station_groups: iterable of dicts, each with a 'stations' list and an optional 'max_nr_missing' (default 0)
    :param unavailable_stations: stations that are reserved/used/unavailable and must be left out
    :param raise_when_too_many_missing: if True, raise when any group misses more than its max_nr_missing stations
    :return: sorted list of the available unique stations over all groups
    :raises TooManyStationsUnavailableException: when raise_when_too_many_missing is True and a group misses more than max_nr_missing stations
    '''
    # materialize once: unavailable_stations may be a one-shot iterator (e.g. a generator),
    # which would otherwise be exhausted after the first group
    unavailable = set(unavailable_stations)

    # create station_list with stations that are actually available (and not reserved/used/unavailable)
    available_over_all_groups = set()
    for i, station_group in enumerate(station_groups):
        requested_stations = set(station_group['stations'])
        available_stations = requested_stations - unavailable
        missing_stations = requested_stations - available_stations
        max_nr_missing = station_group.get('max_nr_missing', 0)
        if raise_when_too_many_missing and len(missing_stations) > max_nr_missing:
            # early exit. No need to evaluate more groups when one groups does not meet the requirements
            raise TooManyStationsUnavailableException('missing more than max_nr_missing=%s stations\nrequested: %s\navailable: %s\nmissing: %s' % (
                                                      max_nr_missing,
                                                      ','.join(sorted(requested_stations)),
                                                      ','.join(sorted(available_stations)),
                                                      ','.join(sorted(missing_stations))))

        available_over_all_groups.update(available_stations)

        if missing_stations and logger.isEnabledFor(logging.DEBUG):
            if len(missing_stations) <= max_nr_missing:
                logger.debug('unavailable requested stations in group \'%s\': %s (which is within the allowed max_nr_missing=%d)', i, sorted(missing_stations), max_nr_missing)
            else:
                # only reachable when raise_when_too_many_missing=False
                logger.debug('unavailable requested stations in group \'%s\': %s (which exceeds the allowed max_nr_missing=%d)', i, sorted(missing_stations), max_nr_missing)

    # collapse to sorted list of available unique stations
    return sorted(available_over_all_groups)

def enough_stations_available_for_task(task: TaskBlueprint, unavailable_stations: Iterable[str]) -> bool:
    '''Check whether the task still has enough stations when the given unavailable_stations are taken away.
    Every station group of the task's specified_station_groups may miss at most its max_nr_missing stations.
    Returns True when all groups satisfy that, False otherwise (the reason is logged at debug level).'''
    try:
        convert_station_groups_to_list_of_available_stations(task.specified_station_groups,
                                                             unavailable_stations,
                                                             raise_when_too_many_missing=True)
        enough = True
    except TooManyStationsUnavailableException as too_many_missing:
        logger.debug(too_many_missing)
        enough = False
    return enough


def enough_stations_available_for_scheduling_unit(scheduling_unit: SchedulingUnitBlueprint, unavailable_stations: Iterable[str]) -> bool:
    '''Are there enough stations available for all observation tasks of the unit if the given unavailable_stations are removed?
    Checks all stations_groups of each observation task's spec; each group may miss at most (not more than) its max_nr_missing stations.
    Returns True if every observation task passes (also when the unit has no observation tasks).'''
    # materialize once: the same collection is checked against every observation task, and a one-shot
    # iterator (e.g. a generator) would otherwise be exhausted by the first task, making later tasks look fully available
    unavailable_stations = frozenset(unavailable_stations)
    return all(enough_stations_available_for_task(obs_task, unavailable_stations) for obs_task in scheduling_unit.observation_tasks.all())