"""
This file contains the database models
"""
import os

import logging
logger = logging.getLogger(__name__)
from datetime import datetime, timedelta
from django.db.models import ForeignKey, CharField, DateTimeField, BooleanField, IntegerField, BigIntegerField, \
    ManyToManyField, CASCADE, SET_NULL, PROTECT, QuerySet
from django.contrib.postgres.fields import ArrayField, JSONField
from django.contrib.auth.models import User
from .specification import AbstractChoice, BasicCommon, Template, NamedCommon, annotate_validate_add_defaults_to_doc_using_template
from enum import Enum

from django.db.models.expressions import RawSQL

from lofar.sas.tmss.tmss.exceptions import SubtaskSchedulingException
from lofar.messaging.messagebus import ToBus, DEFAULT_BROKER, DEFAULT_BUSNAME

from lofar.messaging.messages import EventMessage
from lofar.sas.tmss.client.tmssbuslistener import DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX
from lofar.common.util import single_line_with_single_spaces

#
# I/O
#

#
# Choices
#

class SubtaskState(AbstractChoice):
    """Defines the model and predefined list of possible SubtaskStatusChoice's for Subtask.
    The items in the Choices class below are automagically populated into the database via a data migration."""
    class Choices(Enum):
        DEFINING = "defining"
        DEFINED = "defined"
        SCHEDULING = "scheduling"
        SCHEDULED = "scheduled"
        QUEUEING = "queueing"
        QUEUED = "queued"
        STARTING = "starting"
        STARTED = "started"
        FINISHING = "finishing"
        FINISHED = "finished"
        CANCELLING = "cancelling"
        CANCELLED = "cancelled"
        ERROR = "error"


class SubtaskType(AbstractChoice):
    """Defines the model and predefined list of possible SubtaskType's for Subtask.
    The items in the Choices class below are automagically populated into the database via a data migration."""
    class Choices(Enum):
        OBSERVATION = "observation"
        PIPELINE = "pipeline"
        INSPECTION = "inspection"
        QA_FILES = "qa_files"  # task which creates "adder" QA h5 file(s) from a MeasurementSet of beamformed data
        QA_PLOTS = "qa_plots"  # task which creates "adder" QA plots from an "adder" QA h5 file
        DELETION = "deletion"
        MANUAL = 'manual'
        OTHER = 'other'


class StationType(AbstractChoice):
    """Defines the model and predefined list of possible StationType's for AntennaSet.
    The items in the Choices class below are automagically populated into the database via a data migration."""
    class Choices(Enum):
        CORE = "core"
        REMOTE = "remote"
        INTERNATIONAL = "international"


class Algorithm(AbstractChoice):
    """Defines the model and predefined list of possible Algorithm's for DataproductHash.
    The items in the Choices class below are automagically populated into the database via a data migration."""
    class Choices(Enum):
        MD5 = 'md5'
        AES256 = 'aes256'


class ScheduleMethod(AbstractChoice):
    """Defines the model and predefined list of possible ScheduleMethod's for Subtask.
    The items in the Choices class below are automagically populated into the database via a data migration."""
    class Choices(Enum):
        MANUAL = 'manual'
        BATCH = 'batch'
        DYNAMIC = 'dynamic'


#
# Templates
#

class SubtaskTemplate(Template):
    type = ForeignKey('SubtaskType', null=False, on_delete=PROTECT)
    queue = BooleanField(default=False)
    realtime = BooleanField(default=False)


class DefaultSubtaskTemplate(BasicCommon):
    name = CharField(max_length=128, unique=True)
    template = ForeignKey('SubtaskTemplate', on_delete=PROTECT)


class DataproductSpecificationsTemplate(Template):
    pass


class DefaultDataproductSpecificationsTemplate(BasicCommon):
    name = CharField(max_length=128, unique=True)
    template = ForeignKey('DataproductSpecificationsTemplate', on_delete=PROTECT)


class DataproductFeedbackTemplate(Template):
    pass

# todo: do we need to specify a default?


#
# Instance Objects
#

class Subtask(BasicCommon):
    """
    Represents a low-level task, which is an atomic unit of execution, such as running an observation, running
    inspection plots on the observed data, etc. Each task has a specific configuration, will have resources allocated
    to it, and represents a single run.
    """
    start_time = DateTimeField(null=True, help_text='Start this subtask at the specified time (NULLable).')
    stop_time = DateTimeField(null=True, help_text='Stop this subtask at the specified time (NULLable).')
    state = ForeignKey('SubtaskState', null=False, on_delete=PROTECT, related_name='task_states', help_text='Subtask state (see Subtask State Machine).')
    specifications_doc = JSONField(help_text='Final specifications, as input for the controller.')
    task_blueprint = ForeignKey('TaskBlueprint', related_name='subtasks', null=True, on_delete=SET_NULL, help_text='Task Blueprint to which this Subtask belongs.')
    specifications_template = ForeignKey('SubtaskTemplate', null=False, on_delete=PROTECT, help_text='Schema used for specifications_doc.')
    do_cancel = DateTimeField(null=True, help_text='Timestamp when the subtask has been ordered to cancel (NULLable).')
    priority = IntegerField(help_text='Absolute priority of this subtask (higher value means more important).')
    schedule_method = ForeignKey('ScheduleMethod', null=False, on_delete=PROTECT, help_text='Which method to use for scheduling this Subtask. One of (MANUAL, BATCH, DYNAMIC).')
    cluster = ForeignKey('Cluster', null=True, on_delete=PROTECT, help_text='Where the Subtask is scheduled to run (NULLable).')
    # resource_claim = ForeignKey("ResourceClaim", null=False, on_delete=PROTECT) # todo <-- how is this external reference supposed to work?
    created_or_updated_by_user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who created / updated the subtask.')
    raw_feedback = CharField(null=True, max_length=1048576, help_text='The raw feedback for this Subtask')

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # keep original state for logging
        self.__original_state_id = self.state_id

    @property
    def duration(self) -> timedelta:
        '''the duration of this subtask (stop-start), or 0 if start/stop are None'''
        if self.start_time is None or self.stop_time is None:
            return timedelta(seconds=0)
        return self.stop_time - self.start_time

    @property
    def specified_duration(self) -> timedelta:
        '''get the specified (or estimated) duration of this subtask based on the specified task duration and the subtask type'''
        if self.specifications_template.type.value == SubtaskType.Choices.OBSERVATION.value:
            # observations have a specified duration, so grab it from the spec.
            return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', 0))

        if self.specifications_template.type.value == SubtaskType.Choices.PIPELINE.value:
            # pipelines usually do not have a specified duration, so make a guess (half the obs duration?).
            return timedelta(seconds=self.task_blueprint.specifications_doc.get('duration', max(p.specified_duration for p in self.predecessors).total_seconds()/2))

        # other subtasktypes usually depend on cpu/data/network etc. So, make a guess (for now)
        return timedelta(minutes=5)
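
    # Illustration (hypothetical numbers, not from the original file): for an observation subtask whose
    # task_blueprint.specifications_doc contains {'duration': 600}, specified_duration yields
    # timedelta(seconds=600); a pipeline without a specified duration is estimated at half of the longest
    # specified_duration of its predecessors, and any other subtask type defaults to 5 minutes.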

    @staticmethod
    def _send_state_change_event_message(subtask_id: int, old_state: str, new_state: str):
        with ToBus(exchange=os.environ.get("TMSS_EXCHANGE", DEFAULT_BUSNAME),
                   broker=os.environ.get("TMSS_BROKER", DEFAULT_BROKER)) as tobus:  # TODO: do we want to connect to the bus for each new message, or have some global tobus?
            msg = EventMessage(subject="%s.%s" % (DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX, new_state.capitalize()),
                               content={'subtask_id': subtask_id, 'old_state': old_state, 'new_state': new_state})
            address = tobus.remote_address
            logger.info("Sending message with subject '%s' to exchange='%s' on broker=%s:%s content: %s",
                        msg.subject, tobus.exchange, address[0], address[1], single_line_with_single_spaces(msg.content))
            tobus.send(msg)
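
    # Illustration (subtask id assumed for the example): a transition to 'finished' is published as an
    # EventMessage with subject "%s.Finished" % DEFAULT_TMSS_SUBTASK_NOTIFICATION_PREFIX and content
    # {'subtask_id': 42, 'old_state': 'finishing', 'new_state': 'finished'}.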

    @staticmethod
    def independent_subtasks() -> QuerySet:
        '''return a QuerySet of all subtasks with no input (i.e. which are "independent" because they have no predecessors)
        If you want the result, add .all() like so: Subtask.independent_subtasks().all()
        '''
        return Subtask.objects.filter(inputs=None)

    @property
    def successors(self) -> QuerySet:
        '''return the connected successor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.successors.all()
        '''
        # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL.
        return Subtask.objects.filter(id__in=RawSQL("SELECT successor_st.id FROM tmssapp_subtask as successor_st\n"
                                                    "INNER JOIN tmssapp_subtaskinput as st_input on st_input.subtask_id = successor_st.id\n"
                                                    "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.id = st_input.producer_id\n"
                                                    "WHERE st_output.subtask_id = %s", params=[self.id]))

    @property
    def predecessors(self) -> QuerySet:
        '''return the connected predecessor subtask(s) as queryset (over which you can perform extended queries, or return via the serializers/viewsets)
        If you want the result, add .all() like so: my_subtask.predecessors.all()
        '''
        # JS, 20200528: I couldn't make django do a "self-reference" query from the subtask table to the subtask table (via input, output), so I used plain SQL.
        return Subtask.objects.filter(id__in=RawSQL("SELECT predecessor_st.id FROM tmssapp_subtask as predecessor_st\n"
                                                    "INNER JOIN tmssapp_subtaskoutput as st_output on st_output.subtask_id = predecessor_st.id\n"
                                                    "INNER JOIN tmssapp_subtaskinput as st_input on st_input.producer_id = st_output.id\n"
                                                    "WHERE st_input.subtask_id = %s", params=[self.id]))
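
    # Note: an ORM-only formulation of the two raw-SQL queries above should also be possible via the reverse
    # relations declared on SubtaskInput/SubtaskOutput below. An untested sketch, assuming the related_names
    # 'inputs', 'outputs' and 'consumers' resolve as declared (duplicates may need a .distinct()):
    #   successors:   Subtask.objects.filter(inputs__producer__subtask=self)
    #   predecessors: Subtask.objects.filter(outputs__consumers__subtask=self)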

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        creating = self._state.adding  # True on create, False on update

        annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template')

        if self.state.value == SubtaskState.Choices.SCHEDULED.value and self.__original_state_id == SubtaskState.Choices.SCHEDULING.value:
            if self.start_time is None:
                if self.predecessors.all().count() == 0:
                    raise SubtaskSchedulingException("Cannot schedule subtask id=%s when start time is 'None'." % (self.pk, ))
                else:
                    self.start_time = datetime.utcnow()

        if self.state.value == SubtaskState.Choices.FINISHING.value:
            self.stop_time = datetime.utcnow()

        super().save(force_insert, force_update, using, update_fields)

        # log if either state update or new entry:
        if self.state_id != self.__original_state_id or creating:
            if self.created_or_updated_by_user is None:
                identifier = None
            else:
                identifier = self.created_or_updated_by_user.email

            log_entry = SubtaskStateLog(subtask=self, old_state=SubtaskState.objects.get(value=self.__original_state_id), new_state=self.state,
                                        user=self.created_or_updated_by_user, user_identifier=identifier)
            log_entry.save()

            try:
                self._send_state_change_event_message(self.id, log_entry.old_state.value, log_entry.new_state.value)
            except Exception as e:
                logger.error("Could not send state change to messagebus: %s", e)

        # update the previous state value
        self.__original_state_id = self.state_id
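
    # Typical state-transition usage (an illustrative sketch, not part of the original module; assumes a
    # persisted subtask instance named my_subtask):
    #   my_subtask.state = SubtaskState.objects.get(value=SubtaskState.Choices.FINISHING.value)
    #   my_subtask.save()   # sets stop_time, logs a SubtaskStateLog entry and sends a state-change EventMessage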


class SubtaskStateLog(BasicCommon):
    """
    History of state changes on subtasks.
    This is a rather specific solution, based on what SOS communicated they are regularly interested in.
    Maybe one or two additional log tables for other models are beneficial and should be added at some point.
    Note that we could of course also log on the db level, and there is also a variety of audit middlewares for Django
    available to keep track of changes more generally: https://djangopackages.org/grids/g/model-audit/
    This seems a bit overkill at the moment, and we would have to manage access to those logs etc.; this still needs to be decided.
    """
    user = ForeignKey(User, null=True, editable=False, on_delete=PROTECT, help_text='The user who changed the state of the subtask.')
    user_identifier = CharField(null=True, editable=False, max_length=128, help_text='The ID of the user who changed the state of the subtask.')
    subtask = ForeignKey('Subtask', null=False, editable=False, on_delete=CASCADE, help_text='Subtask to which this state change refers.')
    old_state = ForeignKey('SubtaskState', null=True, editable=False, on_delete=PROTECT, related_name='is_old_state_of', help_text='Subtask state before update (see Subtask State Machine).')
    new_state = ForeignKey('SubtaskState', null=False, editable=False, on_delete=PROTECT, related_name='is_new_state_of', help_text='Subtask state after update (see Subtask State Machine).')


class SubtaskInput(BasicCommon):
    subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='inputs', help_text='Subtask to which this input specification refers.')
    task_relation_blueprint = ForeignKey('TaskRelationBlueprint', null=True, on_delete=SET_NULL, help_text='Task Relation Blueprint which this Subtask Input implements (NULLable).')
    producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, related_name='consumers', help_text='The SubtaskOutput producing the input dataproducts for this SubtaskInput.')
    dataproducts = ManyToManyField('Dataproduct', help_text='The Dataproducts resulting from application of the filter at time of scheduling. Although the dataproducts are simply the result of applying the filter on immutable data, the filter application could change over time. We thus store the result of this filtering directly to retain which input was specified for the task.')
    selection_doc = JSONField(help_text='Filter to apply to the dataproducts of the producer, to derive input dataproducts when scheduling.')
    selection_template = ForeignKey('TaskRelationSelectionTemplate', on_delete=PROTECT, help_text='Schema used for selection_doc.')

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        annotate_validate_add_defaults_to_doc_using_template(self, 'selection_doc', 'selection_template')
        super().save(force_insert, force_update, using, update_fields)


class SubtaskOutput(BasicCommon):
    subtask = ForeignKey('Subtask', null=False, on_delete=CASCADE, related_name='outputs', help_text='Subtask to which this output specification refers.')


class Dataproduct(BasicCommon):
    """
    A data product represents an atomic dataset, produced and consumed by subtasks. The consumed dataproducts are those
    resulting from interpreting the Subtask Connector filters of the inputs. These links are explicitly saved, should
    the interpretation of the filter change over time. The produced dataproducts enumerate everything produced by a
    Subtask.
    """
    filename = CharField(max_length=128, help_text='Name of the file (or top-level directory) of the dataproduct. Adheres to a naming convention, but is not meant for parsing.')
    directory = CharField(max_length=1024, help_text='Directory where this dataproduct is (to be) stored.')
    dataformat = ForeignKey('Dataformat', null=False, on_delete=PROTECT)
    datatype = ForeignKey('Datatype', null=False, on_delete=PROTECT)
    deleted_since = DateTimeField(null=True, help_text='When this dataproduct was removed from disk, or NULL if not deleted (NULLable).')
    pinned_since = DateTimeField(null=True, help_text='When this dataproduct was pinned to disk, that is, forbidden to be removed, or NULL if not pinned (NULLable).')
    specifications_doc = JSONField(help_text='Dataproduct properties (f.e. beam, subband), to distinguish them when produced by the same task, and to act as input for selections in the Task Input and Work Request Relation Blueprint objects.')
    specifications_template = ForeignKey('DataproductSpecificationsTemplate', null=False, on_delete=CASCADE, help_text='Schema used for specifications_doc.')
    producer = ForeignKey('SubtaskOutput', on_delete=PROTECT, related_name="dataproducts", help_text='Subtask Output which generates this dataproduct.')
    do_cancel = DateTimeField(null=True, help_text='When this dataproduct was cancelled (NULLable). Cancelling a dataproduct triggers cleanup if necessary.')
    expected_size = BigIntegerField(null=True, help_text='Expected size of the dataproduct, in bytes. Used for scheduling purposes. NULL if size is unknown (NULLable).')
    size = BigIntegerField(null=True, help_text='Dataproduct size, in bytes. Used for accounting purposes. NULL if size is (yet) unknown (NULLable).')
    feedback_doc = JSONField(help_text='Dataproduct properties, as reported by the producing process.')
    feedback_template = ForeignKey('DataproductFeedbackTemplate', on_delete=PROTECT, help_text='Schema used for feedback_doc.')

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        annotate_validate_add_defaults_to_doc_using_template(self, 'specifications_doc', 'specifications_template')
        annotate_validate_add_defaults_to_doc_using_template(self, 'feedback_doc', 'feedback_template')
        super().save(force_insert, force_update, using, update_fields)


class AntennaSet(NamedCommon):
    station_type = ForeignKey('StationType', null=False, on_delete=PROTECT)
    rcus = ArrayField(IntegerField(), size=128, blank=False)
    inputs = ArrayField(CharField(max_length=128), size=128, blank=True)


class DataproductTransform(BasicCommon):
    """
    Each output dataproduct of a Subtask is linked to the input dataproducts that are used to produce it.
    These transforms encode the provenance information needed when tracking dependencies between dataproducts.
    """
    input = ForeignKey('Dataproduct', related_name='inputs', on_delete=PROTECT, help_text='A dataproduct that was the input of a transformation.')
    output = ForeignKey('Dataproduct', related_name='outputs', on_delete=PROTECT, help_text='A dataproduct that was produced from the input dataproduct.')
    identity = BooleanField(help_text='TRUE if this transform only copies, tars, or losslessly compresses its input, FALSE if the transform changes the data. Allows for efficient reasoning about data duplication.')
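
    # Illustration (hypothetical dataproducts): tarring or copying a dataproduct yields a transform with
    # identity=True, whereas e.g. averaging or flagging it yields identity=False.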


class Filesystem(NamedCommon):
    capacity = BigIntegerField(help_text='Capacity in bytes')
    cluster = ForeignKey('Cluster', on_delete=PROTECT, help_text='Cluster hosting this filesystem.')
    directory = CharField(max_length=1024, help_text='Root directory under which we are allowed to write our data.')

    def save(self, force_insert=False, force_update=False, using=None, update_fields=None):
        if self.directory and not self.directory.endswith('/'):
            raise ValueError('directory value must end with a trailing slash!')  # todo: ...and needs to start with slash?
        super().save(force_insert, force_update, using, update_fields)


class Cluster(NamedCommon):
    location = CharField(max_length=128, help_text='Human-readable location of the cluster.')
    archive_site = BooleanField(default=False, null=False, help_text='TRUE if this cluster is an archive site, FALSE if not (f.e. a local cluster, or user-owned cluster).')


class DataproductArchiveInfo(BasicCommon):
    dataproduct = ForeignKey('Dataproduct', on_delete=PROTECT, help_text='A dataproduct residing in the archive.')
    storage_ticket = CharField(max_length=128, help_text='Archive-system identifier.')
    public_since = DateTimeField(null=True, help_text='Dataproduct is available for public download since this moment, or NULL if dataproduct is not (NULLable).')
    corrupted_since = DateTimeField(null=True, help_text='Earliest timestamp from which this dataproduct is known to be partially or fully corrupt, or NULL if dataproduct is not known to be corrupt (NULLable).')


class DataproductHash(BasicCommon):
    dataproduct = ForeignKey('Dataproduct', on_delete=PROTECT, help_text='The dataproduct to which this hash refers.')
    algorithm = ForeignKey('Algorithm', null=False, on_delete=PROTECT, help_text='Algorithm used (MD5, AES256).')
    hash = CharField(max_length=128, help_text='Hash value.')