From 4d87d3fd167a53264b03bb060c08edcbd3330f8c Mon Sep 17 00:00:00 2001 From: Vermaas <vermaas@astron.nl> Date: Fri, 2 Feb 2024 15:15:08 +0100 Subject: [PATCH] add update_activity functionality triggered by a signal --- atdb/taskdatabase/admin.py | 4 +- atdb/taskdatabase/migrations/0031_activity.py | 32 +++++++ .../migrations/0032_activity_priority.py | 18 ++++ ..._activity_calculated_qualities_and_more.py | 22 +++++ .../0034_activity_ingestq_status.py | 18 ++++ atdb/taskdatabase/models.py | 42 +++++---- atdb/taskdatabase/serializers.py | 8 +- atdb/taskdatabase/services/activities.py | 92 +++++++++++++++++++ atdb/taskdatabase/services/common.py | 19 +++- atdb/taskdatabase/services/signals.py | 21 +++-- atdb/taskdatabase/urls.py | 3 + atdb/taskdatabase/views.py | 39 +++++++- 12 files changed, 283 insertions(+), 35 deletions(-) create mode 100644 atdb/taskdatabase/migrations/0031_activity.py create mode 100644 atdb/taskdatabase/migrations/0032_activity_priority.py create mode 100644 atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py create mode 100644 atdb/taskdatabase/migrations/0034_activity_ingestq_status.py create mode 100644 atdb/taskdatabase/services/activities.py diff --git a/atdb/taskdatabase/admin.py b/atdb/taskdatabase/admin.py index 28057fdc..b659eaf4 100644 --- a/atdb/taskdatabase/admin.py +++ b/atdb/taskdatabase/admin.py @@ -1,5 +1,5 @@ from django.contrib import admin -from .models import Status, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .models import Status, Task, Activity, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor admin.site.register(Status) @@ -8,7 +8,7 @@ class TaskAdmin(admin.ModelAdmin): ordering = ['-creationTime'] search_fields = ['id','sas_id'] - +admin.site.register(Activity) admin.site.register(Workflow) admin.site.register(LogEntry) admin.site.register(Configuration) diff --git a/atdb/taskdatabase/migrations/0031_activity.py b/atdb/taskdatabase/migrations/0031_activity.py new file mode 100644 index 00000000..9f706731 --- /dev/null +++ b/atdb/taskdatabase/migrations/0031_activity.py @@ -0,0 +1,32 @@ +# Generated by Django 5.0 on 2024-02-02 11:19 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0030_auto_20230707_1144'), + ] + + operations = [ + migrations.CreateModel( + name='Activity', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('sas_id', models.CharField(blank=True, db_index=True, max_length=15, null=True, verbose_name='SAS_ID')), + ('project', models.CharField(blank=True, default='unknown', max_length=100, null=True)), + ('filter', models.CharField(blank=True, max_length=30, null=True)), + ('status', models.CharField(blank=True, db_index=True, default='unknown', max_length=50, null=True)), + ('calculated_qualities', models.JSONField(blank=True, null=True)), + ('archive', models.JSONField(blank=True, null=True)), + ('output_sas_id', models.CharField(blank=True, max_length=15, null=True)), + ('has_summary', models.BooleanField(default=False)), + ('is_verified', models.BooleanField(default=False)), + ('finished_fraction', models.FloatField(blank=True, null=True)), + ('ingested_fraction', models.FloatField(blank=True, null=True)), + ('workflow', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='activities', to='taskdatabase.workflow')), + ], + ), + ] diff --git a/atdb/taskdatabase/migrations/0032_activity_priority.py b/atdb/taskdatabase/migrations/0032_activity_priority.py new file mode 100644 index 00000000..2d97ebd4 --- /dev/null +++ b/atdb/taskdatabase/migrations/0032_activity_priority.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-02-02 11:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0031_activity'), + ] + + operations = [ + migrations.AddField( + model_name='activity', + name='priority', + field=models.IntegerField(default=100, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py b/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py new file mode 100644 index 00000000..f50df706 --- /dev/null +++ b/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py @@ -0,0 +1,22 @@ +# Generated by Django 5.0 on 2024-02-02 12:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0032_activity_priority'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='calculated_qualities', + ), + migrations.AddField( + model_name='activity', + name='calculated_quality', + field=models.CharField(blank=True, max_length=10, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py b/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py new file mode 100644 index 00000000..9511689d --- /dev/null +++ b/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-02-02 13:20 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0033_remove_activity_calculated_qualities_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='activity', + name='ingestq_status', + field=models.CharField(blank=True, default='', max_length=100, null=True), + ), + ] diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 01d9a97d..0b9a5a78 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -5,30 +5,14 @@ from django.utils.timezone import datetime, timedelta from django.conf import settings import json import logging -from enum import Enum from .services import calculated_qualities as qualities +from .services.common import State logger = logging.getLogger(__name__) # constants -class State(Enum): - DEFINED = "defined" - STAGED = "staged" - FETCHED = "fetched" - PROCESSED = "processed" - STORED = 'stored' - VALIDATED = "validated" - SCRUBBED = "scrubbed" - ARCHIVING = "archiving" - ARCHIVED = "archived" - FINISHED = "finished" - FINISHING = "finishing" - SUSPENDED = "suspended" - DISCARDED = "discarded" - FAILED = "failed" - datetime_format_string = '%Y-%m-%dT%H:%M:%SZ' verified_statusses = ['stored','validated','scrubbed','archived','finished','suspended','discarded'] @@ -94,7 +78,6 @@ def convert_summary_to_list_for_template(task): return list - class Activity(models.Model): """ update when tasks status changes from ??? to ???: @@ -107,12 +90,13 @@ class Activity(models.Model): # fields used in overview pages sas_id = models.CharField(db_index=True, verbose_name="SAS_ID", max_length=15, blank=True, null=True) - workflow = models.ForeignKey(Workflow, related_name='tasks', on_delete=models.CASCADE, null=True, blank=True) + workflow = models.ForeignKey(Workflow, related_name='activities', on_delete=models.CASCADE, null=True, blank=True) project = models.CharField(max_length=100, blank=True, null=True, default="unknown") filter = models.CharField(max_length=30, blank=True, null=True) + priority = models.IntegerField(default=100, null=True) status = models.CharField(db_index=True, default="unknown", max_length=50, blank=True, null=True) - calculated_qualities = models.JSONField(null=True, blank=True) + calculated_quality = models.CharField(max_length=10, blank=True, null=True) archive = models.JSONField(null=True, blank=True) # output sas_id at LTA @@ -121,6 +105,20 @@ class Activity(models.Model): is_verified = models.BooleanField(default=False) finished_fraction = models.FloatField(blank=True, null=True) ingested_fraction = models.FloatField(blank=True, null=True) + ingestq_status = models.CharField(default="", max_length=100, blank=True, null=True) + + @property + def has_archived(self): + """ + check if any task belonging to this sas_id already has an output SAS_ID at the LTA + """ + + try: + if self.archive['sas_id_archived']: + return self.archive['sas_id_archived'] + + except: + return None class Task(models.Model): @@ -186,6 +184,10 @@ class Task(models.Model): tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id) self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds) + # add information to the aggregated activity (SAS_ID) level + # moved to signals to avoid circular import + # activities.update_activity(self) + super(Task, self).save(*args, **kwargs) diff --git a/atdb/taskdatabase/serializers.py b/atdb/taskdatabase/serializers.py index dbecc5d1..7a2f4312 100644 --- a/atdb/taskdatabase/serializers.py +++ b/atdb/taskdatabase/serializers.py @@ -1,5 +1,11 @@ from rest_framework import serializers -from .models import Status, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .models import Status, Task, Activity, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor + +class ActivitySerializer(serializers.ModelSerializer): + + class Meta: + model = Activity + fields = "__all__" class WorkflowSerializer(serializers.ModelSerializer): diff --git a/atdb/taskdatabase/services/activities.py b/atdb/taskdatabase/services/activities.py new file mode 100644 index 00000000..a34cd5bf --- /dev/null +++ b/atdb/taskdatabase/services/activities.py @@ -0,0 +1,92 @@ + +from .common import State +from taskdatabase.models import Task, Activity + + +def calculate_ingested_fraction(this_task): + """ + This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID + and a list of statusses of other tasks belonging to the same SAS_ID. + It is implemented as 'property', because then it can be used in html pages like this: + <td>{{ task.sasid_ingested_fraction.status }}</td> + <td>{{ task.sasid_ingested_fraction.completion }}%</td> + + A selection of statusses are considered 'queued', and another selection is considered 'ingested'. + The division of those 2 are returned as a 'completed %'. + A limited list of statusses for the other tasks that belong to this SAS_ID is also returned. + + """ + result = {} + statusses = {'scrubbed': 0, 'archiving': 0, 'archived': 0, 'finishing': 0, 'finished': 0, + 'suspended': 0, 'discarded': 0, 'archived_failed': 0, 'finished_failed': 0} + + tasks = Task.objects.filter(sas_id=this_task.sas_id) + + for task in tasks: + try: + statusses[task.status] += 1 + except: + pass + + incomplete = int(statusses['scrubbed']) + int(statusses['archiving']) + int(statusses['finishing']) + \ + int(statusses['suspended']) + int(statusses['archived_failed']) + int(statusses['finished_failed']) + complete = int(statusses['archived']) + int(statusses['finished']) + completion = round(complete / (incomplete + complete) * 100) + + non_zero_statusses = {key: value for key, value in statusses.items() if value != 0} + + result['status'] = non_zero_statusses + result['completion'] = completion + return result + + +def update_activity(task): + """ + The activity (SAS_ID level) is updated whenever a task change status. + Depending on the type of status change, certain calculations and updates are performed. + Doing this on status change, instead of on-the-fly when a user enters a page, balances the load. + + - to '???' : check for incoming 'archive' json from archiver + - to STORED : calculate quality + - to ??? : calculate finished_fraction + - to SCRUBBED, ARCHIVING, ARCHIVED, FINISHED : calculate ingested_fraction + + """ + try: + activity = Activity.objects.get(sas_id=task.sas_id) + except: + # no activity exists yet, create it + activity = Activity(sas_id=task.sas_id, + workflow=task.workflow, + project=task.project, + filter=task.filter, + priority=task.priority, + archive=task.archive) + activity.save() + + # depending on the status transition, perform calculations + if task.status == State.STORED.value: + # quality is calculated per task and per sas_id, reget the quality per sas_id + activity.calculated_quality = task.calculated_qualities['per_sasid'] + activity.save() + + # calculate the fraction and list of statusses of ingested tasks of this SAS_ID + if task.status in [State.SCRUBBED.value, State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value]: + result = calculate_ingested_fraction(task) + activity.ingested_fraction = result['completion'] + activity.ingestq_status = result['status'] + activity.save() + + + # check of any task of this activity already has LTA information. If so, copy to the activity level + if task.status in [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value]: + + for t in Task.objects.filter(sas_id=task.sas_id): + try: + if t.archive['sas_id_archived']: + activity.archive = t.archive + break + except: + pass + + activity.save() \ No newline at end of file diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py index 84a09434..f0ada76f 100644 --- a/atdb/taskdatabase/services/common.py +++ b/atdb/taskdatabase/services/common.py @@ -7,6 +7,21 @@ import time from enum import Enum logger = logging.getLogger(__name__) +class State(Enum): + DEFINED = "defined" + STAGED = "staged" + FETCHED = "fetched" + PROCESSED = "processed" + STORED = 'stored' + VALIDATED = "validated" + SCRUBBED = "scrubbed" + ARCHIVING = "archiving" + ARCHIVED = "archived" + FINISHED = "finished" + FINISHING = "finishing" + SUSPENDED = "suspended" + DISCARDED = "discarded" + FAILED = "failed" class SummaryFlavour(Enum): DEFAULT = "default" @@ -57,4 +72,6 @@ def get_summary_flavour(task): if 'details' in summary.keys(): summary_flavour = SummaryFlavour.IMAGING_COMPRESSION.value - return summary_flavour \ No newline at end of file + return summary_flavour + + diff --git a/atdb/taskdatabase/services/signals.py b/atdb/taskdatabase/services/signals.py index 99c213ba..42ae560a 100644 --- a/atdb/taskdatabase/services/signals.py +++ b/atdb/taskdatabase/services/signals.py @@ -6,6 +6,7 @@ from django.contrib.auth.models import User from django.dispatch import receiver from django.contrib.contenttypes.models import ContentType from taskdatabase.models import Task, Workflow, LogEntry, Status +from .activities import update_activity """ Signals sent from different parts of the backend are centrally defined and handled here. @@ -28,31 +29,31 @@ def handle_pre_save(sender, **kwargs): :param (in) kwargs: The instance of the object that sends the trigger. """ #logger.info("handle_pre_save(" + str(kwargs.get('instance')) + ")") - myTaskObject = kwargs.get('instance') + task = kwargs.get('instance') # IF this object does not exist yet, then abort, and let it first be handled by handle_post_save (get get a id). - if myTaskObject.id==None: + if task.id==None: return None # handle status change - status = str(myTaskObject.status) - new_status = str(myTaskObject.new_status) + status = str(task.status) + new_status = str(task.new_status) if (new_status!=None) and (status!=new_status): # set the new status - myTaskObject.status = new_status + task.status = new_status # add the new status to the status history - myStatus = Status(name=new_status, task=myTaskObject) + myStatus = Status(name=new_status, task=task) myStatus.save() - # temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion. - # I don't use pre_save, because then the 'created' key is not available, which is the most handy way to - # determine if this dataproduct already exists. (I could also check the database, but this is easier). + # temporarily disconnect to avoid recursion. + update_activity(task) + disconnect_signals() - myTaskObject.save() + task.save() connect_signals() diff --git a/atdb/taskdatabase/urls.py b/atdb/taskdatabase/urls.py index b3da85f8..9ee0ea9c 100644 --- a/atdb/taskdatabase/urls.py +++ b/atdb/taskdatabase/urls.py @@ -63,6 +63,9 @@ urlpatterns = [ path('postprocessing-tasks/', views.PostProcessingTaskListViewAPI.as_view(), name='postprocessing-tasks-api'), path('all-tasks/', views.AllTaskListViewAPI.as_view(), name='all-tasks-api'), + path('activities/', views.ActivityListViewAPI.as_view(), name='activities-api'), + path('activities/<int:pk>/', views.ActivityDetailsViewAPI.as_view(), name='activity-detail-view-api'), + path('workflows/', views.WorkflowListViewAPI.as_view(), name='workflows-api'), path('workflows/<int:pk>/', views.WorkflowDetailsViewAPI.as_view(), name='workflow-detail-view-api'), diff --git a/atdb/taskdatabase/views.py b/atdb/taskdatabase/views.py index 85f3cf55..f6325cc5 100644 --- a/atdb/taskdatabase/views.py +++ b/atdb/taskdatabase/views.py @@ -20,7 +20,8 @@ from rest_framework.request import Request #from silk.profiling.profiler import silk_profile from django.conf import settings -from .models import Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor, State +from .models import Activity, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .services.common import State from .tables import TaskTable from .forms import QualityAnnotationForm, DiscardAnnotationForm @@ -29,6 +30,7 @@ from .serializers import \ TaskWriteSerializer, \ TaskReadSerializer, \ TaskReadSerializerFast, \ + ActivitySerializer, \ WorkflowSerializer, \ LogEntrySerializer, \ ConfigurationSerializer, \ @@ -88,6 +90,20 @@ class TaskFilterQueryPage(filters.FilterSet): } +class ActivityFilter(filters.FilterSet): + class Meta: + model = Activity + + fields = { + 'sas_id': ['exact', 'icontains', 'in'], + 'filter': ['exact', 'icontains'], + 'workflow__id': ['exact', 'icontains'], + 'project': ['exact', 'icontains'], + 'sas_id': ['exact', 'icontains', 'in'], + 'status': ['exact', 'icontains', 'in', 'startswith'], + } + + class WorkflowFilter(filters.FilterSet): class Meta: model = Workflow @@ -959,6 +975,27 @@ class TaskDetailsViewAPIFast(generics.RetrieveUpdateDestroyAPIView): serializer_class = TaskReadSerializerFast +# example: /atdb/activities/ +# show all tasks (regular and postprocessing) +class ActivityListViewAPI(generics.ListCreateAPIView): + """ + A pagination list of tasks, unsorted. + """ + model = Activity + queryset = Activity.objects.all() + + # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html + filter_backends = (filters.DjangoFilterBackend,) + filter_class = ActivityFilter + serializer_class = ActivitySerializer + +class ActivityDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): + model = Activity + queryset = Activity.objects.all() + serializer_class = ActivitySerializer + + + # example: /atdb/workflows/ class WorkflowListViewAPI(generics.ListCreateAPIView): model = Workflow -- GitLab