Newer
Older
"""
This file contains the database models
"""
import os

Jorrit Schaap
committed
import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta
from django.db.models import Model, ForeignKey, OneToOneField, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \
ManyToManyField, CASCADE, SET_NULL, PROTECT, QuerySet, BigAutoField, UniqueConstraint
from django.contrib.postgres.fields import ArrayField, JSONField
Jörn Künsemöller
committed
from .permissions import TMSSUser as User
from .common import AbstractChoice, BasicCommon, Template, NamedCommon, TemplateSchemaMixin, ProjectPropertyMixin, RefreshFromDbInvalidatesCachedPropertiesMixin
from enum import Enum

Jorrit Schaap
committed
from django.db.models.expressions import RawSQL
Jörn Künsemöller
committed
from django.core.exceptions import ValidationError

Jorrit Schaap
committed
from django.db.utils import InternalError
from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException, SubtaskIllegalStateTransitionException
from django.conf import settings
from lofar.sas.resourceassignment.resourceassignmentservice.rpc import RADBRPC
import uuid
Jörn Künsemöller
committed
from django.db.models.functions import Coalesce
from django.utils.functional import cached_property
Jörn Künsemöller
committed
#
# I/O
#
#
# Choices
#
class SubtaskState(AbstractChoice):
    """Choice model holding the predefined set of states a Subtask can be in.

    The members of the nested ``Choices`` enum are automatically populated
    into the database via a data migration.
    """

    class Choices(Enum):
        # each member's value is the string stored for that state
        DEFINING = "defining"
        DEFINED = "defined"
        SCHEDULING = "scheduling"
        SCHEDULED = "scheduled"
        QUEUEING = "queueing"
        QUEUED = "queued"
        STARTING = "starting"
        STARTED = "started"
        FINISHING = "finishing"
        FINISHED = "finished"
        CANCELLING = "cancelling"
        CANCELLED = "cancelled"
        ERROR = "error"
        UNSCHEDULABLE = "unschedulable"
class SubtaskType(AbstractChoice):
    """Defines the model and predefined list of possible SubtaskType's for Subtask.

    The items in the Choices class below are automagically populated into the database via a data migration.
    """

    class Choices(Enum):
        OBSERVATION = "observation"
        PIPELINE = "pipeline"
        INSPECTION = "inspection"
        QA_FILES = "qa_files"  # task which creates "adder" QA h5 file(s) from a MeasurementSet of beamformed data
        QA_PLOTS = "qa_plots"  # task which creates "adder" QA plots from an "adder" QA h5 file h5
        # Subtask.progress branches on this member; without it that property raises AttributeError.
        INGEST = "ingest"
        CLEANUP = "cleanup"
        MANUAL = 'manual'
        OTHER = 'other'
class StationType(AbstractChoice):
    """Choice model holding the predefined station types used by AntennaSet.

    The members of the nested ``Choices`` enum are automatically populated
    into the database via a data migration.
    """

    class Choices(Enum):
        # each member's value is the string stored for that station type
        CORE = "core"
        REMOTE = "remote"
        INTERNATIONAL = "international"
class HashAlgorithm(AbstractChoice):
    """Choice model holding the predefined hash algorithms used by DataproductHash.

    The members of the nested ``Choices`` enum are automatically populated
    into the database via a data migration.
    """

    class Choices(Enum):
        # each member's value is the string stored for that algorithm
        MD5 = 'md5'
        AES256 = 'aes256'
        ADLER32 = 'adler32'
Jörn Künsemöller
committed
#
# Templates
#
Jörn Künsemöller
committed
class SubtaskTemplate(Template):
    """Template providing the schema for Subtask.specifications_doc, tagged with a SubtaskType."""

    # PROTECT: a SubtaskType cannot be deleted while templates still refer to it
    type = ForeignKey(
        'SubtaskType',
        null=False,
        on_delete=PROTECT,
    )
class DataproductSpecificationsTemplate(Template):
    """Template providing the schema for Dataproduct.specifications_doc.

    Adds nothing on top of Template; the docstring also serves as the class body,
    which was missing and left the class statement syntactically incomplete.
    """
Jörn Künsemöller
committed
class DataproductFeedbackTemplate(Template):
    """Template providing the schema for Dataproduct.feedback_doc.

    Adds nothing on top of Template; the docstring also serves as the class body,
    which was missing and left the class statement syntactically incomplete.
    """
Jörn Künsemöller
committed
class SAPTemplate(Template):
    """Template providing the schema for SAP.specifications_doc; adds nothing on top of Template."""
# todo: do we need to specify a default?
class SIPidentifier(Model):
    '''A SIPidentifier is a global unique id used to build provenance chains in the SIP for the LTA.
    It is derived from Model and not from BasicCommon to keep a small footprint.'''

    source = CharField(
        null=False,
        max_length=128,
        help_text='Source name',
    )
    unique_identifier = BigAutoField(
        primary_key=True,
        help_text='Unique global identifier.',
    )

    @staticmethod
    def assign_new_id_to_model(model):
        """
        Give the model a freshly created SIPidentifier as its ``global_identifier``.

        Only acts when the model instance is being created (``model._state.adding``);
        instances that are merely being updated are left untouched.
        """
        if not model._state.adding:
            return
        new_identifier = SIPidentifier.objects.create(source="TMSS")
        model.global_identifier = new_identifier

    @staticmethod
    def assign_new_parset_id_to_model(model):
        """
        Give the model a freshly created SIPidentifier as its ``global_parset_identifier``.

        Only acts when the model instance is being created (``model._state.adding``);
        instances that are merely being updated are left untouched.
        """
        if not model._state.adding:
            return
        new_identifier = SIPidentifier.objects.create(source="TMSS")
        model.global_parset_identifier = new_identifier
#
# Instance Objects
#
Jörn Künsemöller
committed
class Subtask(BasicCommon, ProjectPropertyMixin, TemplateSchemaMixin):
    """
    Represents a low-level task, which is an atomic unit of execution, such as running an observation, running
    inspection plots on the observed data, etc. Each task has a specific configuration, will have resources allocated
    to it, and represents a single run.
    """
    scheduled_process_start_time = DateTimeField(null=True, help_text='The time the system will (try to) start the process (NULLable).')
    scheduled_process_stop_time = DateTimeField(null=True, help_text='The time the process is expected to stop (NULLable).')
    actual_process_start_time = DateTimeField(null=True, help_text='The time the process actually started (NULLable).')
    actual_process_stop_time = DateTimeField(null=True, help_text='The time the process actually stopped (NULLable).')
    scheduled_on_sky_start_time = DateTimeField(null=True, help_text='The time the observation will start recording (NULLable).')  # previously start_time
    scheduled_on_sky_stop_time = DateTimeField(null=True, help_text='The time the observation will stop recording (NULLable).')  # previously stop_time
    actual_on_sky_start_time = DateTimeField(null=True, help_text='The time the observation actually started recording (NULLable).')
    actual_on_sky_stop_time = DateTimeField(null=True, help_text='The time the observation actually stopped recording (NULLable).')
    state = ForeignKey('SubtaskState', null=False, on_delete=PROTECT, related_name='task_states', help_text='Subtask state (see Subtask State Machine).')
    primary = BooleanField(default=False, db_index=True, help_text='TRUE if this is the one-and-only primary subtask in a parent TaskBlueprint.')
    specifications_doc = JSONField(help_text='Final specifications, as input for the controller.')
    task_blueprint = ForeignKey('TaskBlueprint', null=False, on_delete=PROTECT, related_name='subtasks', help_text='The parent TaskBlueprint.')
    specifications_template = ForeignKey('SubtaskTemplate', null=False, on_delete=PROTECT, help_text='Schema used for specifications_doc.')
    cluster = ForeignKey('Cluster', null=True, on_delete=PROTECT, help_text='Where the Subtask is scheduled to run (NULLable).')
    # resource_claim = ForeignKey("ResourceClaim", null=False, on_delete=PROTECT) # todo <-- how is this external reference supposed to work?
    created_or_updated_by_user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who created / updated the subtask.')
    error_reason = CharField(null=True, max_length=200, help_text='Reason why the Subtask went to error.')
    raw_feedback = CharField(null=True, max_length=1048576, help_text='The raw feedback for this Subtask')
    global_identifier = OneToOneField('SIPidentifier', null=False, editable=False, on_delete=PROTECT, help_text='The global unique identifier for LTA SIP.')
    global_parset_identifier = OneToOneField('SIPidentifier', related_name='related_subtask', null=False, editable=False, on_delete=PROTECT, help_text='The global unique identifier of this Subtask\'s parset for LTA SIP.')
    path_to_project = 'task_blueprint__scheduling_unit_blueprint__draft__scheduling_set__project'
    obsolete_since = DateTimeField(null=True, help_text='When this subtask was marked obsolete, or NULL if not obsolete (NULLable).')

    def __init__(self, *args, **kwargs):
        '''Construct the subtask and remember its initial state and obsolete_since value, so save() can detect changes.'''
        super().__init__(*args, **kwargs)

        # keep original state for logging
        self.__original_state_id = self.state_id

        # keep original obsolete_since to detect changes on save
        # Note: we cannot use self.obsolete_since here since that causes an infinite loop of update_from_db
        if 'obsolete_since' in kwargs:
            self.__original_obsolete_since = kwargs['obsolete_since']
        else:
            field_names = [f.name for f in self._meta.fields]
            if len(args) == len(field_names):
                # all field values were passed positionally, in field order
                self.__original_obsolete_since = args[field_names.index('obsolete_since')]
            else:
                self.__original_obsolete_since = None

    @property
    def is_obsolete(self) -> bool:
        '''convenience property turning the obsolete_since timestamp into a boolean'''
        return self.obsolete_since is not None

    @property
    def duration(self) -> timedelta:
        '''the duration of this subtask (scheduled on-sky stop-start), or 0 if start/stop are None'''
        if self.scheduled_on_sky_start_time is None or self.scheduled_on_sky_stop_time is None:
            return timedelta(seconds=0)
        return self.scheduled_on_sky_stop_time - self.scheduled_on_sky_start_time

    @property
    def specified_duration(self) -> timedelta:
        '''get the specified (or estimated) duration of this subtask based on the specified task duration and the subtask type'''
        subtask_type = self.specifications_template.type.value

        if subtask_type == SubtaskType.Choices.OBSERVATION.value:
            # observations have a specified duration, so grab it from the spec.
            # In case we have several associated tasks: use the longest duration, since we assume that tasks will run in parallel (there would be no reason to combine them into a subtask).
            task_spec = self.task_blueprint.specifications_doc
            return timedelta(seconds=task_spec.get('duration', task_spec.get('target', {}).get('duration', 0)))

        if subtask_type == SubtaskType.Choices.PIPELINE.value:
            # pipelines usually do not have a specified duration, so make a guess (half the obs duration?).
            # In case we have several associated tasks: this guess is probably in no way accurate anyway, so we assume it does not really matter which task blueprint we refer to here
            task_spec = self.task_blueprint.specifications_doc
            if 'duration' in task_spec:
                # an explicitly specified duration wins; do not evaluate the predecessors at all
                # (the guess used to be computed eagerly, discarding the specified value when there were no predecessors)
                return timedelta(seconds=task_spec['duration'])
            try:
                return timedelta(seconds=max(p.specified_duration.total_seconds() for p in self.predecessors) / 2)
            except ValueError:
                # no predecessors (should not happen). Return a default guess.
                return timedelta(minutes=1)

        # other subtasktypes usually depend on cpu/data/network etc. So, make a guess (for now)
        return timedelta(minutes=5)

    @staticmethod
    def independent_subtasks() -> QuerySet:
        '''return a QuerySet of all subtasks with no input (i.e. which are "independent" because they have no predecessors)
        If you want the result, add .all() like so: Subtask.independent_subtasks().all()
        '''
        return Subtask.objects.filter(inputs=None)

    @property
    def successors(self) -> QuerySet:
        '''return the connect successor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.successors.all()
        '''
        # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL.
        return Subtask.objects.filter(id__in=RawSQL("SELECT successor_st.id FROM tmssapp_subtask as successor_st\n"
                                                    "INNER JOIN tmssapp_subtaskinput as st_input on st_input.subtask_id = successor_st.id\n"
                                                    "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.id = st_input.producer_id\n"
                                                    "WHERE st_output.subtask_id = %s", params=[self.id]))

    @property
    def predecessors(self) -> QuerySet:
        '''return the connect predecessor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.predecessors.all()
        '''
        # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL.
        return Subtask.objects.filter(id__in=RawSQL("SELECT predecessor_st.id FROM tmssapp_subtask as predecessor_st\n"
                                                    "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.subtask_id = predecessor_st.id\n"
                                                    "INNER JOIN tmssapp_subtaskinput as st_input on st_input.producer_id = st_output.id\n"
                                                    "WHERE st_input.subtask_id = %s", params=[self.id]))

    @property
    def related_primary_observation_subtask(self) -> 'Subtask':
        '''return this subtask if it is itself a primary 'observation control' subtask, else the first primary
        'observation control' subtask found among its (indirect) predecessors, or None if there is none.'''
        if self.specifications_template.name == 'observation control' and self.primary is True:
            return self

        def _recursively_search_for_primary_predecessor_observation_subtask(subtask):
            direct_hit = subtask.predecessors.filter(specifications_template__name='observation control', primary=True).first()
            if direct_hit is not None:
                return direct_hit
            # search every predecessor branch; only stop as soon as one of them yields a result
            # (returning unconditionally from the first iteration would ignore all other branches)
            for predecessor in subtask.predecessors:
                indirect_hit = _recursively_search_for_primary_predecessor_observation_subtask(predecessor)
                if indirect_hit is not None:
                    return indirect_hit
            return None

        return _recursively_search_for_primary_predecessor_observation_subtask(self)

    @property
    def input_dataproducts(self) -> QuerySet:
        '''return the input dataproducts(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.input_dataproducts.all()
        '''
        return Dataproduct.objects.filter(consumers__subtask_id=self.id)

    @property
    def output_dataproducts(self) -> QuerySet:
        '''return the output dataproducts(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.output_dataproducts.all()
        '''
        return Dataproduct.objects.filter(producer__subtask_id=self.id)

    @property
    def SAPs(self) -> QuerySet:
        '''return the SAP's (SubArrayPointings) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.SAPs.all()
        '''
        return SAP.objects.filter(dataproducts__producer__subtask_id=self.id).distinct()

    def get_transformed_input_dataproduct(self, output_dataproduct_id: int) -> 'Dataproduct':
        '''return the transformed input dataproduct for the given output_dataproduct_id.'''
        return self.input_dataproducts.get(consumer_transforms__output_id=output_dataproduct_id)

    def get_transformed_output_dataproduct(self, input_dataproduct_id: int) -> 'Dataproduct':
        '''return the transformed output dataproduct for the given input_dataproduct_id.'''
        return self.output_dataproducts.get(producer_transforms__input_id=input_dataproduct_id)

    @property
    def is_feedback_complete(self) -> bool:
        '''returns True if the feedback for all (>0) output dataproducts is filled in for a non-"empty"-template'''
        nr_of_output_dataproducts = self.output_dataproducts.count()
        return nr_of_output_dataproducts > 0 and self.output_dataproducts.filter(feedback_template__isnull=False).exclude(feedback_template__name="empty").count() == nr_of_output_dataproducts

    @property
    def progress(self) -> float:
        '''Get the progress of this subtask ranging from 0.0 before it is started, up to 1.0 when finished.

        Raises NotImplementedError for state/type combinations for which no progress computation is implemented.
        '''
        state_value = self.state.value

        # DEFINED is part of the not-yet-started states (it was missing because DEFINING was listed twice)
        if state_value in [SubtaskState.Choices.DEFINING.value, SubtaskState.Choices.DEFINED.value,
                           SubtaskState.Choices.SCHEDULING.value, SubtaskState.Choices.SCHEDULED.value,
                           SubtaskState.Choices.QUEUEING.value, SubtaskState.Choices.QUEUED.value,
                           SubtaskState.Choices.STARTING.value]:
            return 0.0

        if state_value == SubtaskState.Choices.FINISHED.value:
            return 1.0

        if state_value in [SubtaskState.Choices.STARTED.value, SubtaskState.Choices.FINISHING.value]:
            # subtask is running, compute progress if possible.
            subtask_type = self.specifications_template.type.value

            if subtask_type == SubtaskType.Choices.INGEST.value:
                # progress for an ingest subtask is the ratio of archived output dataproducts over the total
                num_dataproducts = self.output_dataproducts.count()
                if num_dataproducts == 0:
                    # nothing to archive (yet): avoid a ZeroDivisionError and report no progress
                    return 0.0
                num_archived_dataproducts = self.output_dataproducts.filter(archive_info__isnull=False).distinct('id').count()
                return float(num_archived_dataproducts) / float(num_dataproducts)

            if subtask_type == SubtaskType.Choices.OBSERVATION.value:
                # progress for an observation is just how far we are into the duration
                total_seconds = self.duration.total_seconds()
                if total_seconds <= 0:
                    # self.duration is 0 when the scheduled start/stop time is None: avoid dividing by zero
                    # (or subtracting None) and report no progress
                    return 0.0
                num_seconds_running = max(0, (datetime.utcnow() - self.scheduled_on_sky_start_time).total_seconds())
                return min(1.0, float(num_seconds_running) / float(total_seconds))

        # TODO: add more progress computations for more subtask types if possible
        raise NotImplementedError("Could not get progress for subtask id=%s, type=%s state=%s" % (self.id,
                                                                                                  self.specifications_template.type.value,
                                                                                                  self.state))

    @property
    def on_sky_start_time(self) -> datetime:
        '''the actual on-sky start time if known, else the scheduled one'''
        return self.scheduled_on_sky_start_time if self.actual_on_sky_start_time is None else self.actual_on_sky_start_time

    @property
    def on_sky_stop_time(self) -> datetime:
        '''the actual on-sky stop time if known, else the scheduled one'''
        return self.scheduled_on_sky_stop_time if self.actual_on_sky_stop_time is None else self.actual_on_sky_stop_time

    @property
    def process_start_time(self) -> datetime:
        '''the actual process start time if known, else the scheduled one'''
        return self.scheduled_process_start_time if self.actual_process_start_time is None else self.actual_process_start_time

    @property
    def process_stop_time(self) -> datetime:
        '''the actual process stop time if known, else the scheduled one'''
        return self.scheduled_process_stop_time if self.actual_process_stop_time is None else self.actual_process_stop_time

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        '''Validate and save this subtask, and log a SubtaskStateLog entry upon creation or state change.

        Raises ValidationError on duplicate pointing names, on changing an already set obsolete_since,
        or when there would not be exactly one primary subtask in the parent task_blueprint.
        Raises SubtaskSchedulingException when going from scheduling to scheduled without a start time.
        Raises SubtaskIllegalStateTransitionException when the database rejects the state transition.
        '''
        creating = self._state.adding  # True on create, False on update

        self.annotate_validate_add_defaults_to_doc_using_template('specifications_doc', 'specifications_template')
        SIPidentifier.assign_new_id_to_model(self)
        SIPidentifier.assign_new_parset_id_to_model(self)

        # check for uniqueness of SAP names:
        # todo: this is a very specific check, that depends on the template. On the task level, we have a javascript
        #  field for complex validation on the template (I suspect that's for the frontend), so maybe we want to have
        #  a similar mechanism to describe complex validation on the backend?
        if self.specifications_doc and 'stations' in self.specifications_doc and 'digital_pointings' in self.specifications_doc['stations']:
            sap_names = [pointing['name'] for pointing in self.specifications_doc['stations']['digital_pointings']]
            duplicate_names = [name for name in set(sap_names) if sap_names.count(name) > 1]
            if duplicate_names:
                raise ValidationError("Pointings defined in the same Subtask must have unique names. Duplicate names %s in subtask id=%s." % (duplicate_names, self.pk))

        # check if we have a start time when scheduling
        if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state_id == SubtaskState.Choices.SCHEDULING.value:
            if self.scheduled_on_sky_start_time is None:
                raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start time is 'None'." % (self.pk, ))

        # make sure that obsolete_since can only be set when it is None, but prevents changes:
        if self.obsolete_since != self.__original_obsolete_since and self.__original_obsolete_since is not None:
            raise ValidationError("This Subtask has been marked obsolete on %s and that cannot be changed to %s" % (self.__original_obsolete_since, self.obsolete_since))

        # set actual_process_start_time when subtask goes to STARTED state
        if self.state.value == SubtaskState.Choices.STARTED.value and self.__original_state_id == SubtaskState.Choices.STARTING.value:
            self.actual_process_start_time = datetime.utcnow()

        # set actual_process_stop_time when subtask goes to FINISHED/ERROR/CANCELLED state
        # (all three states are listed explicitly: 'ERROR or CANCELLED' evaluates to just ERROR)
        if self.state.value in [SubtaskState.Choices.FINISHED.value, SubtaskState.Choices.ERROR.value, SubtaskState.Choices.CANCELLED.value] \
                and self.actual_process_start_time and not self.actual_process_stop_time:
            self.actual_process_stop_time = datetime.utcnow()

        # ensure there is and will be exactly one primary subtask per parent task_blueprint
        # quite a complex check, luckily we have a test for that.
        # (the condition is the same for create and update, so 'creating' plays no role here)
        nr_of_primary_siblings = 0 if self.task_blueprint is None else self.task_blueprint.subtasks.filter(primary=True).exclude(id=self.id).count()
        if (self.primary and nr_of_primary_siblings != 0) or (not self.primary and nr_of_primary_siblings == 0):
            raise ValidationError("There should be exactly one primary subtask per parent task_blueprint")

        try:
            super().save(force_insert, force_update, using, update_fields)
        except InternalError as db_error:
            # wrap in TMSS SubtaskIllegalStateTransitionException if needed
            if 'ILLEGAL SUBTASK STATE TRANSITION' in str(db_error):
                raise SubtaskIllegalStateTransitionException(str(db_error)) from db_error
            # else just reraise
            raise

        # log if either state update or new entry:
        if self.state_id != self.__original_state_id or creating:
            if self.created_or_updated_by_user is None:
                identifier = None
            else:
                identifier = self.created_or_updated_by_user.email

            log_entry = SubtaskStateLog(subtask=self, old_state=SubtaskState.objects.get(value=self.__original_state_id), new_state=self.state,
                                        user=self.created_or_updated_by_user, user_identifier=identifier)
            log_entry.save()

            # update the previous state value
            self.__original_state_id = self.state_id

Jorrit Schaap
committed
class SubtaskAllowedStateTransitions(Model):
    """
    Table with the allowed subtask state transitions. See also the SQL trigger in populate which blocks any subtask state transitions which are not in this table, thus not allowed.
    """
    old_state = ForeignKey(
        'SubtaskState',
        null=True,
        editable=False,
        on_delete=PROTECT,
        related_name='allowed_transition_from',
        help_text='Subtask state before update (see Subtask State Machine).',
    )
    new_state = ForeignKey(
        'SubtaskState',
        null=False,
        editable=False,
        on_delete=PROTECT,
        related_name='allowed_transition_to',
        help_text='Subtask state after update (see Subtask State Machine).',
    )

    @staticmethod
    def allowed_new_states(old_state: SubtaskState) -> [SubtaskState]:
        '''list the new_state of every transition row that starts at the given old_state'''
        matching_transitions = SubtaskAllowedStateTransitions.objects.filter(old_state=old_state).all()
        reachable_states = []
        for transition in matching_transitions:
            reachable_states.append(transition.new_state)
        return reachable_states

    @staticmethod
    def illegal_new_states(old_state: SubtaskState) -> [SubtaskState]:
        '''list every state that is neither an allowed new state for the given old_state, nor the old_state itself'''
        allowed_values = [state.value for state in SubtaskAllowedStateTransitions.allowed_new_states(old_state)]
        not_allowed = SubtaskState.objects.exclude(value__in=allowed_values)
        not_allowed_and_different = not_allowed.exclude(pk=old_state.pk)
        return list(not_allowed_and_different.all())

    @staticmethod
    def allowed_old_states(new_state: SubtaskState) -> [SubtaskState]:
        '''list the old_state of every transition row that ends at the given new_state'''
        matching_transitions = SubtaskAllowedStateTransitions.objects.filter(new_state=new_state).all()
        originating_states = []
        for transition in matching_transitions:
            originating_states.append(transition.old_state)
        return originating_states

Jorrit Schaap
committed
class SubtaskStateLog(BasicCommon):
    """
    History of state changes on subtasks

    This is now a very specific solution and based on what SOS communicated is what they are regularly interested in.
    Maybe one or two additional log tables for other models are beneficial and should be added at some point.

    Note that we could of course also log on the db level and there is also a variety of audit middlewares for Django
    available to keep track of changes more generally: https://djangopackages.org/grids/g/model-audit/
    This seems a bit overkill at the moment and we have to manage access to those logs etc., this needs tbd.
    """
    # NOTE: the docstring above must be terminated before the fields; otherwise the field
    # definitions below become part of the string and the model has no columns at all.
    user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who changed the state of the subtask.')
    user_identifier = CharField(null=True, editable=False, max_length=128, help_text='The ID of the user who changed the state of the subtask.')
    subtask = ForeignKey('Subtask', null=False, editable=False, on_delete=CASCADE, help_text='Subtask to which this state change refers.')
    old_state = ForeignKey('SubtaskState', null=True, editable=False, on_delete=PROTECT, related_name='is_old_state_of', help_text='Subtask state before update (see Subtask State Machine).')
    new_state = ForeignKey('SubtaskState', null=False, editable=False, on_delete=PROTECT, related_name='is_new_state_of', help_text='Subtask state after update (see Subtask State Machine).')
class SubtaskInput(BasicCommon, TemplateSchemaMixin):
    """Input specification of a Subtask: links it to a producing SubtaskOutput and the selected input dataproducts."""

    subtask = ForeignKey(
        'Subtask',
        null=False,
        on_delete=CASCADE,
        related_name='inputs',
        help_text='Subtask to which this input specification refers.',
    )
    input_role = ForeignKey(
        'TaskConnectorType',
        null=True,
        related_name='subtask_inputs',
        on_delete=CASCADE,
        help_text='Input connector type (what kind of data can be consumed).',
    )
    producer = ForeignKey(
        'SubtaskOutput',
        on_delete=PROTECT,
        related_name='consumers',
        help_text='The SubtaskOutput producing the input dataproducts for this SubtaskInput.',
    )
    dataproducts = ManyToManyField(
        'Dataproduct',
        related_name='consumers',
        help_text='The Dataproducts resulting from application of the filter at time of scheduling Although the dataproducts are simply the result of applying the filter on immutable data, the filter application could change over time. We thus store the result of this filtering directly to retain which input was specified for the task..',
    )
    selection_doc = JSONField(
        help_text='Filter to apply to the dataproducts of the producer, to derive input dataproducts when scheduling.',
    )
    selection_template = ForeignKey(
        'TaskRelationSelectionTemplate',
        on_delete=PROTECT,
        help_text='Schema used for selection_doc.',
    )

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """Run the template-based annotate/validate/add-defaults step on selection_doc, then save."""
        doc_field, template_field = 'selection_doc', 'selection_template'
        self.annotate_validate_add_defaults_to_doc_using_template(doc_field, template_field)
        super().save(force_insert, force_update, using, update_fields)
Jörn Künsemöller
committed
class SubtaskOutput(BasicCommon):
    """Output specification of a Subtask; dataproducts and SubtaskInputs refer to it as their producer."""

    subtask = ForeignKey(
        'Subtask',
        null=False,
        on_delete=CASCADE,
        related_name='outputs',
        help_text='Subtask to which this output specification refers.',
    )
    output_role = ForeignKey(
        'TaskConnectorType',
        null=True,
        related_name='subtask_outputs',
        on_delete=CASCADE,
        help_text='Output connector type (what kind of data is taken from the producer).',
    )
class SAP(BasicCommon, TemplateSchemaMixin):
    """A SAP (SubArrayPointing) with its properties document and a global identifier for the LTA SIP."""

    specifications_doc = JSONField(
        help_text='SAP properties.',
    )
    specifications_template = ForeignKey(
        'SAPTemplate',
        null=False,
        on_delete=CASCADE,
        help_text='Schema used for specifications_doc.',
    )
    global_identifier = OneToOneField(
        'SIPidentifier',
        null=False,
        editable=False,
        on_delete=PROTECT,
        help_text='The global unique identifier for LTA SIP.',
    )

    # Note: all fields become immutable once the subtask producing this SAP's dataproducts
    # is finished (see the SQL triggers).

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """Run the template-based step on specifications_doc, assign a global identifier on creation, then save."""
        doc_field, template_field = 'specifications_doc', 'specifications_template'
        self.annotate_validate_add_defaults_to_doc_using_template(doc_field, template_field)
        SIPidentifier.assign_new_id_to_model(self)
        super().save(force_insert, force_update, using, update_fields)
class Dataproduct(TemplateSchemaMixin, RefreshFromDbInvalidatesCachedPropertiesMixin, BasicCommon):
    """
    A data product represents an atomic dataset, produced and consumed by subtasks. The consumed dataproducts are those
    resulting from interpreting the Subtask Connector filters of the inputs. These links are explicitly saved, should
    the interpretation of the filter change over time. The produced dataproducts enumerate everything produced by a
    Subtask.
    """
    # --- location on disk ---
    filename = CharField(
        max_length=128,
        help_text='Name of the file (or top-level directory) of the dataproduct. Adheres to a naming convention, but is not meant for parsing.',
    )
    directory = CharField(
        max_length=1024,
        help_text='Directory where this dataproduct is (to be) stored.',
    )

    # --- kind of data ---
    dataformat = ForeignKey('Dataformat', null=False, on_delete=PROTECT)
    datatype = ForeignKey('Datatype', null=False, on_delete=PROTECT)

    deleted_since = DateTimeField(
        null=True,
        help_text='When this dataproduct was removed from disk, or NULL if not deleted (NULLable).',
    )

    # --- specification ---
    specifications_doc = JSONField(
        help_text='Dataproduct properties (f.e. beam, subband), to distinguish them when produced by the same task, and to act as input for selections in the Task Input and Work Request Relation Blueprint objects.',
    )
    specifications_template = ForeignKey(
        'DataproductSpecificationsTemplate',
        null=False,
        on_delete=CASCADE,
        help_text='Schema used for specifications_doc.',
    )

    producer = ForeignKey(
        'SubtaskOutput',
        on_delete=PROTECT,
        related_name="dataproducts",
        help_text='Subtask Output which generates this dataproduct.',
    )

    # --- sizes ---
    expected_size = BigIntegerField(
        null=True,
        help_text='Expected size of dataproduct size, in bytes. Used for scheduling purposes. NULL if size is unknown (NULLable).',
    )
    size = BigIntegerField(
        null=True,
        help_text='Dataproduct size, in bytes. Used for accounting purposes. NULL if size is (yet) unknown (NULLable).',
    )

    # --- feedback ---
    feedback_doc = JSONField(
        help_text='Dataproduct properties, as reported by the producing process.',
    )
    feedback_template = ForeignKey(
        'DataproductFeedbackTemplate',
        on_delete=PROTECT,
        help_text='Schema used for feedback_doc.',
    )

    sap = ForeignKey(
        'SAP',
        on_delete=PROTECT,
        null=True,
        related_name="dataproducts",
        help_text='SAP this dataproduct was generated out of (NULLable).',
    )
    global_identifier = OneToOneField(
        'SIPidentifier',
        editable=False,
        null=False,
        on_delete=PROTECT,
        help_text='The global unique identifier for LTA SIP.',
    )

    # Note: all fields except deleted_since and expected_size become immutable once the
    # subtask producing this dataproduct is finished (see the SQL triggers).

    class Meta(BasicCommon.Meta):
        # a path (directory + filename) may be used by only one dataproduct
        constraints = [
            UniqueConstraint(fields=['directory', 'filename'], name='%(class)s_unique_path'),
        ]

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """Run the template-based step on both JSON documents, assign a global identifier on creation, then save."""
        self.annotate_validate_add_defaults_to_doc_using_template('specifications_doc', 'specifications_template')
        self.annotate_validate_add_defaults_to_doc_using_template('feedback_doc', 'feedback_template')
        SIPidentifier.assign_new_id_to_model(self)
        super().save(force_insert, force_update, using, update_fields)

    @cached_property
    def filepath(self):
        '''the directory and filename of this dataproduct joined into one path (cached after first access)'''
        full_path = os.path.join(self.directory, self.filename)
        return full_path
class AntennaSet(NamedCommon):
    """Named antenna set for a station type, with its arrays of rcus and inputs (at most 128 entries each)."""

    station_type = ForeignKey(
        'StationType',
        null=False,
        on_delete=PROTECT,
    )
    rcus = ArrayField(
        IntegerField(),
        size=128,
        blank=False,
    )
    inputs = ArrayField(
        CharField(max_length=128),
        size=128,
        blank=True,
    )
Jörn Künsemöller
committed
class DataproductTransform(BasicCommon):
    """
    Each output dataproduct of a Subtask is linked to the input dataproducts that are used to produce it.
    These transforms encode the provenance information needed when tracking dependencies between dataproducts.
    """
    input = ForeignKey(
        'Dataproduct',
        related_name='consumer_transforms',
        on_delete=PROTECT,
        help_text='A dataproduct that was the input of a transformation.',
    )
    output = ForeignKey(
        'Dataproduct',
        related_name='producer_transforms',
        on_delete=PROTECT,
        help_text='A dataproduct that was produced from the input dataproduct.',
    )
    identity = BooleanField(
        help_text='TRUE if this transform only copies, tars, or losslessly compresses its input, FALSE if the transform changes the data. Allows for efficient reasoning about data duplication.',
    )
Jörn Künsemöller
committed
class Filesystem(NamedCommon):
    """A named filesystem on a Cluster, with its capacity and the root directory we may write to."""
    capacity = BigIntegerField(help_text='Capacity in bytes')
    cluster = ForeignKey('Cluster', on_delete=PROTECT, help_text='Cluster hosting this filesystem.')
    directory = CharField(max_length=1024, help_text='Root directory under which we are allowed to write our data.')

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        """Save the filesystem, making sure a non-empty directory ends with a '/'."""
        if self.directory and not self.directory.endswith('/'):
            # the body of this check was missing; appending the separator is what the condition tests for
            self.directory += '/'

        super().save(force_insert, force_update, using, update_fields)
Jörn Künsemöller
committed
class Cluster(NamedCommon):
    """A named cluster with a human-readable location and a flag telling whether it is an archive site."""

    location = CharField(
        max_length=128,
        help_text='Human-readable location of the cluster.',
    )
    archive_site = BooleanField(
        default=False,
        null=False,
        help_text='TRUE if this cluster is an archive site, FALSE if not (f.e. a local cluster, or user-owned cluster).',
    )
Jörn Künsemöller
committed
class DataproductArchiveInfo(BasicCommon):
    """Archive bookkeeping for exactly one Dataproduct: storage ticket, public-since and corrupted-since timestamps."""

    dataproduct = OneToOneField(
        'Dataproduct',
        related_name='archive_info',
        on_delete=PROTECT,
        help_text='A dataproduct residing in the archive.',
    )
    storage_ticket = CharField(
        max_length=128,
        help_text='Archive-system identifier.',
    )
    public_since = DateTimeField(
        null=True,
        help_text='Dataproduct is available for public download since this moment, or NULL if dataproduct is not (NULLable).',
    )
    corrupted_since = DateTimeField(
        null=True,
        help_text='Earliest timestamp from which this dataproduct is known to be partially or fully corrupt, or NULL if dataproduct is not known to be corrupt (NULLable).',
    )

    # Note: the dataproduct and storage_ticket fields become immutable once the subtask
    # producing the dataproduct is finished (see the SQL triggers).
Jörn Künsemöller
committed
class DataproductHash(BasicCommon):
    """A hash value of a Dataproduct, together with the algorithm that was used to compute it."""

    dataproduct = ForeignKey(
        'Dataproduct',
        related_name='hashes',
        on_delete=PROTECT,
        help_text='The dataproduct to which this hash refers.',
    )
    hash_algorithm = ForeignKey(
        'HashAlgorithm',
        null=False,
        on_delete=PROTECT,
        help_text='Algorithm used for hashing (MD5, AES256).',
    )
    hash = CharField(
        max_length=128,
        help_text='Hash value.',
    )

    # Note: the dataproduct, hash_algorithm and hash fields become immutable once the subtask
    # producing the dataproduct is finished (see the SQL triggers).