diff --git a/.gitignore b/.gitignore index 7a0e2826ec164fb8f0e878360c63c161c17f7ef8..7beda8d3e0c04267d13a1eb6e190f2da759f3e31 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ **/__pycache__/ *.sql /atdb/run.sh +/.idea/ +/.idea \ No newline at end of file diff --git a/.idea/atdb-ldv.iml b/.idea/atdb-ldv.iml index 2ab3c03993ecfb0732bf60b1d5b642f09a14154a..26367b62b8dda837033b17f2c43bdc9b28c74773 100644 --- a/.idea/atdb-ldv.iml +++ b/.idea/atdb-ldv.iml @@ -3,8 +3,9 @@ <component name="NewModuleRootManager"> <content url="file://$MODULE_DIR$"> <excludeFolder url="file://$MODULE_DIR$/venv" /> + <excludeFolder url="file://$MODULE_DIR$/venv310" /> </content> - <orderEntry type="jdk" jdkName="Python 3.9 (atdb-ldv)" jdkType="Python SDK" /> + <orderEntry type="jdk" jdkName="Python 3.10 (atdb-ldv)" jdkType="Python SDK" /> <orderEntry type="sourceFolder" forTests="false" /> </component> <component name="PyDocumentationSettings"> diff --git a/.idea/misc.xml b/.idea/misc.xml index b69afdb89884dd81772c071f7da9af8a4225b6de..5ce0e771c301eeecd4c0bce0b4d5586fe622754d 100644 --- a/.idea/misc.xml +++ b/.idea/misc.xml @@ -1,6 +1,6 @@ <?xml version="1.0" encoding="UTF-8"?> <project version="4"> - <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.9 (atdb-ldv)" project-jdk-type="Python SDK" /> + <component name="ProjectRootManager" version="2" project-jdk-name="Python 3.10 (atdb-ldv)" project-jdk-type="Python SDK" /> <component name="PyCharmProfessionalAdvertiser"> <option name="shown" value="true" /> </component> diff --git a/atdb/atdb/settings/dev.py b/atdb/atdb/settings/dev.py index 32433ae2ce74beb62caaf4806f14d2cd8a6e570d..d2998259833ca63c52795f8d26fd788df57d5436 100644 --- a/atdb/atdb/settings/dev.py +++ b/atdb/atdb/settings/dev.py @@ -13,7 +13,8 @@ DATABASES = { 'ENGINE': 'django.db.backends.postgresql_psycopg2', 'USER': 'atdb_admin', 'PASSWORD': 'atdb123', - 'NAME': 'atdb_ldv_12jan2024', + #'NAME': 'atdb_ldv_19jan2024', + 'NAME': 
'atdb_ldv_9feb2024', 'HOST': 'localhost', 'PORT': '5432', }, diff --git a/atdb/docker/docker-compose-dev-cd.yml b/atdb/docker/docker-compose-dev-cd.yml index 33e9b6b06d9a8e362eb4df18570caa1a586160e6..f730fac6aafd139f0c26e083f866c49ccd80dac8 100644 --- a/atdb/docker/docker-compose-dev-cd.yml +++ b/atdb/docker/docker-compose-dev-cd.yml @@ -31,6 +31,8 @@ services: - atdb_network env_file: - $HOME/shared/atdb_ldv.env + environment: + - DEBUG=true labels: - "traefik.enable=true" - "traefik.http.routers.atdb-backend.entryPoints=atdb-ldv" diff --git a/atdb/docs/ATDB-LDV Data Model.png b/atdb/docs/ATDB-LDV Data Model.png index 357a6ea1859098a01ff2d7c195bc379c165a2496..b82814904d74f2303a3a6cf5dd27c2927c289a49 100644 Binary files a/atdb/docs/ATDB-LDV Data Model.png and b/atdb/docs/ATDB-LDV Data Model.png differ diff --git a/atdb/run.sh.example b/atdb/run.sh.example deleted file mode 100644 index 0679736b557e35f12110609d48356b5a6e0a4c05..0000000000000000000000000000000000000000 --- a/atdb/run.sh.example +++ /dev/null @@ -1,4 +0,0 @@ -export KEYCLOAK_URL=https://keycloak-sdc.astron.nl -export KEYCLOAK_CLIENT_ID=ATDB-LDV-DEV -export KEYCLOAK_CLIENT_SECRET= -python manage.py runserver --settings=atdb.settings.dev diff --git a/atdb/taskdatabase/admin.py b/atdb/taskdatabase/admin.py index 28057fdc4421b0c574298ffa9f43552c7f45fba8..cc5cab803fa9543b4b4d08d30412f824230773f3 100644 --- a/atdb/taskdatabase/admin.py +++ b/atdb/taskdatabase/admin.py @@ -1,5 +1,5 @@ from django.contrib import admin -from .models import Status, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .models import Status, Task, Activity, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor admin.site.register(Status) @@ -8,6 +8,10 @@ class TaskAdmin(admin.ModelAdmin): ordering = ['-creationTime'] search_fields = ['id','sas_id'] +@admin.register(Activity) +class ActivityAdmin(admin.ModelAdmin): + ordering = ['-sas_id'] + search_fields = 
['id','sas_id'] admin.site.register(Workflow) admin.site.register(LogEntry) diff --git a/atdb/taskdatabase/migrations/0031_activity.py b/atdb/taskdatabase/migrations/0031_activity.py new file mode 100644 index 0000000000000000000000000000000000000000..9f706731f49040ec8ffef37c4e76774cfcb5cd48 --- /dev/null +++ b/atdb/taskdatabase/migrations/0031_activity.py @@ -0,0 +1,32 @@ +# Generated by Django 5.0 on 2024-02-02 11:19 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0030_auto_20230707_1144'), + ] + + operations = [ + migrations.CreateModel( + name='Activity', + fields=[ + ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('sas_id', models.CharField(blank=True, db_index=True, max_length=15, null=True, verbose_name='SAS_ID')), + ('project', models.CharField(blank=True, default='unknown', max_length=100, null=True)), + ('filter', models.CharField(blank=True, max_length=30, null=True)), + ('status', models.CharField(blank=True, db_index=True, default='unknown', max_length=50, null=True)), + ('calculated_qualities', models.JSONField(blank=True, null=True)), + ('archive', models.JSONField(blank=True, null=True)), + ('output_sas_id', models.CharField(blank=True, max_length=15, null=True)), + ('has_summary', models.BooleanField(default=False)), + ('is_verified', models.BooleanField(default=False)), + ('finished_fraction', models.FloatField(blank=True, null=True)), + ('ingested_fraction', models.FloatField(blank=True, null=True)), + ('workflow', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='activities', to='taskdatabase.workflow')), + ], + ), + ] diff --git a/atdb/taskdatabase/migrations/0032_activity_priority.py b/atdb/taskdatabase/migrations/0032_activity_priority.py new file mode 100644 index 
0000000000000000000000000000000000000000..2d97ebd42d9344381392ddb1107b22a071030b05 --- /dev/null +++ b/atdb/taskdatabase/migrations/0032_activity_priority.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-02-02 11:55 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0031_activity'), + ] + + operations = [ + migrations.AddField( + model_name='activity', + name='priority', + field=models.IntegerField(default=100, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py b/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py new file mode 100644 index 0000000000000000000000000000000000000000..f50df706b384a5ddb439aff27f9489a97d250c59 --- /dev/null +++ b/atdb/taskdatabase/migrations/0033_remove_activity_calculated_qualities_and_more.py @@ -0,0 +1,22 @@ +# Generated by Django 5.0 on 2024-02-02 12:57 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0032_activity_priority'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='calculated_qualities', + ), + migrations.AddField( + model_name='activity', + name='calculated_quality', + field=models.CharField(blank=True, max_length=10, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py b/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py new file mode 100644 index 0000000000000000000000000000000000000000..9511689db5bb6298c3f1edb22c3f1fbdda251c52 --- /dev/null +++ b/atdb/taskdatabase/migrations/0034_activity_ingestq_status.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-02-02 13:20 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0033_remove_activity_calculated_qualities_and_more'), + ] + + operations = [ + 
migrations.AddField( + model_name='activity', + name='ingestq_status', + field=models.CharField(blank=True, default='', max_length=100, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0035_task_activity.py b/atdb/taskdatabase/migrations/0035_task_activity.py new file mode 100644 index 0000000000000000000000000000000000000000..71eb815824e8e3d6e6699057251e8a702218dd89 --- /dev/null +++ b/atdb/taskdatabase/migrations/0035_task_activity.py @@ -0,0 +1,19 @@ +# Generated by Django 5.0 on 2024-02-06 12:38 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0034_activity_ingestq_status'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='activity', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.DO_NOTHING, related_name='tasks', to='taskdatabase.activity'), + ), + ] diff --git a/atdb/taskdatabase/migrations/0036_remove_activity_output_sas_id_activity_remaining_and_more.py b/atdb/taskdatabase/migrations/0036_remove_activity_output_sas_id_activity_remaining_and_more.py new file mode 100644 index 0000000000000000000000000000000000000000..cd15446d11e600a30ba2443f54d9c3eaeca58f1e --- /dev/null +++ b/atdb/taskdatabase/migrations/0036_remove_activity_output_sas_id_activity_remaining_and_more.py @@ -0,0 +1,27 @@ +# Generated by Django 5.0 on 2024-02-09 08:06 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0035_task_activity'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='output_sas_id', + ), + migrations.AddField( + model_name='activity', + name='remaining', + field=models.FloatField(blank=True, null=True), + ), + migrations.AddField( + model_name='activity', + name='total_size', + field=models.FloatField(blank=True, null=True), + ), + ] diff --git 
a/atdb/taskdatabase/migrations/0037_remove_activity_has_summary.py b/atdb/taskdatabase/migrations/0037_remove_activity_has_summary.py new file mode 100644 index 0000000000000000000000000000000000000000..213fd3b13c4f3909346e93e44a97d89452b0e1c2 --- /dev/null +++ b/atdb/taskdatabase/migrations/0037_remove_activity_has_summary.py @@ -0,0 +1,17 @@ +# Generated by Django 5.0 on 2024-02-09 09:42 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0036_remove_activity_output_sas_id_activity_remaining_and_more'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='has_summary', + ), + ] diff --git a/atdb/taskdatabase/migrations/0038_remove_activity_priority_remove_activity_workflow_and_more.py b/atdb/taskdatabase/migrations/0038_remove_activity_priority_remove_activity_workflow_and_more.py new file mode 100644 index 0000000000000000000000000000000000000000..5cfac571a3d5032123d390a48b921d0237c6c1b1 --- /dev/null +++ b/atdb/taskdatabase/migrations/0038_remove_activity_priority_remove_activity_workflow_and_more.py @@ -0,0 +1,26 @@ +# Generated by Django 5.0 on 2024-02-09 12:18 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0037_remove_activity_has_summary'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='priority', + ), + migrations.RemoveField( + model_name='activity', + name='workflow', + ), + migrations.AddField( + model_name='activity', + name='workflow_id', + field=models.IntegerField(null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0039_remove_activity_ingestq_status.py b/atdb/taskdatabase/migrations/0039_remove_activity_ingestq_status.py new file mode 100644 index 0000000000000000000000000000000000000000..0badd85b19e746b7233ae809f543bb852963568e --- /dev/null +++ b/atdb/taskdatabase/migrations/0039_remove_activity_ingestq_status.py @@ -0,0 
+1,17 @@ +# Generated by Django 5.0 on 2024-02-10 07:03 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0038_remove_activity_priority_remove_activity_workflow_and_more'), + ] + + operations = [ + migrations.RemoveField( + model_name='activity', + name='ingestq_status', + ), + ] diff --git a/atdb/taskdatabase/migrations/0040_activity_ingestq_status.py b/atdb/taskdatabase/migrations/0040_activity_ingestq_status.py new file mode 100644 index 0000000000000000000000000000000000000000..d1b0e9c62553908e1d049358aa47c769538db594 --- /dev/null +++ b/atdb/taskdatabase/migrations/0040_activity_ingestq_status.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-02-10 07:03 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0039_remove_activity_ingestq_status'), + ] + + operations = [ + migrations.AddField( + model_name='activity', + name='ingestq_status', + field=models.JSONField(blank=True, null=True), + ), + ] diff --git a/atdb/taskdatabase/migrations/0041_alter_task_activity.py b/atdb/taskdatabase/migrations/0041_alter_task_activity.py new file mode 100644 index 0000000000000000000000000000000000000000..c276e0f747b646ce212351a7b6f388f609078644 --- /dev/null +++ b/atdb/taskdatabase/migrations/0041_alter_task_activity.py @@ -0,0 +1,19 @@ +# Generated by Django 5.0 on 2024-02-12 10:19 + +import django.db.models.deletion +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0040_activity_ingestq_status'), + ] + + operations = [ + migrations.AlterField( + model_name='task', + name='activity', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='tasks', to='taskdatabase.activity'), + ), + ] diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 
def associate_task_with_activity(task):
    """
    Make sure that a task is linked to the activity (SAS_ID level aggregation) of its sas_id.

    If the task already has an activity, that activity is returned untouched.
    Otherwise the activity with the same sas_id is looked up; when none exists a new
    one is created from the task's sas_id, project, workflow and filter.
    The activity is assigned to ``task.activity``, but the task itself is NOT saved here,
    that is left to the caller.

    :param task: the task to associate
    :return: the activity that is associated with the task
    """
    if task.activity:
        return task.activity

    # sas_id is not unique on Activity, so use filter().first() instead of get():
    # it returns None when nothing is found and does not raise when several rows match.
    # (The previous bare 'except' treated every failure, including database errors,
    # as 'not found' and then created a duplicate activity.)
    activity = Activity.objects.filter(sas_id=task.sas_id).first()

    if activity is None:
        # no activity exists yet, create it
        logger.info(f'create new activity for sas_id {task.sas_id}')

        # NOTE(review): presumably a task can exist without a workflow; confirm against the Task model.
        # Activity.workflow_id is nullable, so store None instead of crashing on task.workflow.id
        workflow_id = getattr(task.workflow, 'id', None)

        activity = Activity(sas_id=task.sas_id,
                            project=task.project,
                            workflow_id=workflow_id,
                            filter=task.filter)
        activity.save()

    task.activity = activity
    return task.activity
@property
def has_archived(self):
    """
    Return the output SAS_ID at the LTA for this activity, or None when there is none.

    The value is read from the 'sas_id_archived' key of the 'archive' JSON blob.
    None is returned when the blob is missing, is not a mapping, lacks the key,
    or holds an empty value.
    """
    try:
        sas_id_archived = self.archive['sas_id_archived']
    except (KeyError, TypeError):
        # KeyError: the key is absent; TypeError: archive is None or not a mapping
        return None

    # an empty value counts as 'not archived', make that explicit instead of falling off the end
    return sas_id_archived or None
associated 'activity' with relevant aggregated information + super(Task, self).save(*args, **kwargs) @@ -288,6 +340,9 @@ class Task(models.Model): except: return None + # keep the old mechanism in comments to test/evaluate + # TODO: remove when it is no longer used by the GUI + # --- <CUT> --- @property def sas_id_has_archived(self): """ @@ -302,6 +357,7 @@ class Task(models.Model): pass except: return None + # --- </CUT> --- @property def path_to_lta(self): @@ -313,6 +369,9 @@ class Task(models.Model): except: return None + # keep the old mechanism in comments to test/evaluate, remove later when it works + # TODO: remove when it is no longer used by the GUI + # ---- <CUT> ---- @property def sasid_path_to_lta(self): """ @@ -400,6 +459,8 @@ class Task(models.Model): result['completion'] = completion return result + # ---- </CUT> ---- + @property def task_type_join(self): try: diff --git a/atdb/taskdatabase/serializers.py b/atdb/taskdatabase/serializers.py index dbecc5d13b339efef4725ad735a43cda0ed2c53e..3adcc01c29f84f437b30aa2a4de0156ad8fdb6ca 100644 --- a/atdb/taskdatabase/serializers.py +++ b/atdb/taskdatabase/serializers.py @@ -1,5 +1,11 @@ from rest_framework import serializers -from .models import Status, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .models import Status, Task, Activity, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor + +class ActivitySerializer(serializers.ModelSerializer): + + class Meta: + model = Activity + fields = "__all__" class WorkflowSerializer(serializers.ModelSerializer): @@ -100,7 +106,8 @@ class TaskReadSerializer(serializers.ModelSerializer): 'status','new_status','quality','calculated_qualities', 'inputs','outputs','metrics','remarks','status_history', 'size_to_process', 'size_processed', 'total_processing_time', - 'log_entries','meta_scheduling','environment','archive' + 'log_entries','meta_scheduling','environment','archive', + 'activity' ] 
# statusses that are counted for the ingest overview, in the order in which they are reported
_INGEST_STATUSSES = ('scrubbed', 'archiving', 'archived', 'finishing', 'finished',
                     'suspended', 'discarded', 'archived_failed', 'finished_failed')

# statusses that count as 'queued' (not yet ingested) and as 'ingested' in the completion percentage.
# 'discarded' is reported in the status list, but is deliberately part of neither group.
_INGEST_INCOMPLETE = ('scrubbed', 'archiving', 'finishing',
                      'suspended', 'archived_failed', 'finished_failed')
_INGEST_COMPLETE = ('archived', 'finished')


def calculate_ingested_fraction(this_task, tasks=None):
    """
    Calculate the fraction of ingested tasks for the SAS_ID of a task.

    A selection of statusses is considered 'queued', another selection is considered 'ingested'.
    The percentage 'ingested / (queued + ingested)' is returned as 'completion',
    together with the number of tasks per status (only the statusses that occur).

    :param this_task: the task whose sas_id selects the tasks to look at
    :param tasks: optional iterable of objects with a 'status' attribute. When given, it is used
                  instead of querying the database (this_task is then not used).
    :return: dict with 'status' (dict of status -> count) and 'completion' (int percentage,
             0 when no task is in a queued or ingested status)
    """
    if tasks is None:
        tasks = Task.objects.filter(sas_id=this_task.sas_id)

    statusses = dict.fromkeys(_INGEST_STATUSSES, 0)
    for task in tasks:
        # statusses outside the ingest overview are ignored
        if task.status in statusses:
            statusses[task.status] += 1

    incomplete = sum(statusses[status] for status in _INGEST_INCOMPLETE)
    complete = sum(statusses[status] for status in _INGEST_COMPLETE)

    # guard against a division by zero when no task is queued or ingested
    total = incomplete + complete
    completion = round(complete / total * 100) if total else 0

    result = {}
    result['status'] = {key: value for key, value in statusses.items() if value != 0}
    result['completion'] = completion
    return result


def calculate_finished_fraction(this_task, tasks=None):
    """
    Calculate which part of the data of a SAS_ID has status 'finished', based on size_to_process.

    :param this_task: the task whose sas_id selects the tasks to look at
    :param tasks: optional iterable of objects with 'status' and 'size_to_process' attributes.
                  When given, it is used instead of querying the database.
    :return: dict with 'fraction' (int percentage, -1 when the total size is zero),
             'total_size' and 'remaining' (size of the tasks that are not finished)
    """
    if tasks is None:
        tasks = Task.objects.filter(sas_id=this_task.sas_id)

    size_archived = 0
    size_remaining = 0

    for task in tasks:
        # NOTE(review): presumably size_to_process can be empty for some tasks; count that as 0
        size = task.size_to_process or 0
        if task.status == State.FINISHED.value:
            size_archived += size
        else:
            size_remaining += size

    total_size = size_archived + size_remaining

    result = {}
    # -1 signals that no fraction could be calculated (nothing to process)
    result['fraction'] = round((size_archived / total_size) * 100) if total_size else -1
    result['total_size'] = total_size
    result['remaining'] = size_remaining
    return result


def update_activity(task):
    """
    The activity (SAS_ID level) is updated when a task changes status.
    Depending on the type of status change, certain calculations and updates are performed.
    Doing this on status change, instead of on-the-fly when a user enters a page, balances the load
    and improves overall performance

    - to STORED                                   : copy the calculated quality per sas_id
    - to SCRUBBED, ARCHIVING, ARCHIVED, FINISHING : calculate ingested_fraction
    - to ARCHIVING, ARCHIVED, FINISHED            : check for incoming/existing 'archive' json from archiver
    - status contains 'failed'                    : calculate finished_fraction
    - always                                      : determine if all tasks are in a 'verified' status,
                                                    and keep filter and workflow_id in sync with the task

    The tasks of the SAS_ID are read from the database once, and the activity is saved once
    (and only when something was updated).

    :param task: the task that was saved; nothing is done when it has no activity
    """
    logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')

    activity = task.activity
    if activity is None:
        # the task is not (yet) associated with an activity, so there is nothing to update
        logger.warning(f'update_activity: task {task.id} has no activity, skipping')
        return

    status = task.status
    sasid_tasks = list(Task.objects.filter(sas_id=task.sas_id))
    changed = False

    # depending on the status transition, perform calculations
    if status == State.STORED.value:
        logger.info('- calculate_qualities')
        # quality is calculated per task and per sas_id, reget the quality per sas_id
        try:
            activity.calculated_quality = task.calculated_qualities['per_sasid']
            changed = True
        except (KeyError, TypeError):
            # no qualities calculated (yet) for this task, leave the activity as it is
            pass

    # calculate the fraction and list of statusses of ingested tasks of this SAS_ID
    if status in [State.SCRUBBED.value, State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value]:
        logger.info('- calculate_ingested_fraction')
        result = calculate_ingested_fraction(task, tasks=sasid_tasks)
        activity.ingested_fraction = result['completion']
        activity.ingestq_status = result['status']
        changed = True

    # check if any task of this activity already has LTA information. If so, copy to the activity level
    if status in [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHED.value]:
        logger.info('- add archive json')
        for t in sasid_tasks:
            try:
                if t.archive['sas_id_archived']:
                    activity.archive = t.archive
                    changed = True
                    break
            except (KeyError, TypeError):
                # this task has no (usable) archive information, try the next one
                pass

    # calculate the finished fraction, this is only used on the Failures page
    # (substring match, so that statusses like 'archived_failed' are included)
    if State.FAILED.value in (status or ''):
        logger.info('- calculate_finished_fraction')
        result = calculate_finished_fraction(task, tasks=sasid_tasks)
        activity.finished_fraction = result['fraction']
        activity.total_size = result['total_size']
        activity.remaining = result['remaining']
        changed = True

    # check if all tasks of this SAS_ID have a status that is considered 'verified'
    # this is used for the Validation Page
    is_verified = all(t.status in verified_statusses for t in sasid_tasks)
    if activity.is_verified != is_verified:
        activity.is_verified = is_verified
        changed = True

    if activity.filter != task.filter:
        activity.filter = task.filter
        changed = True

    # NOTE(review): presumably a task can exist without a workflow; confirm against the Task model
    workflow_id = getattr(task.workflow, 'id', None)
    if activity.workflow_id != workflow_id:
        activity.workflow_id = workflow_id
        changed = True

    # a single save for all updates
    if changed:
        activity.save()
SUSPENDED = "suspended" + DISCARDED = "discarded" + FAILED = "failed" + +verified_statusses = [State.STORED.value, State.VALIDATED.value, State.SCRUBBED.value, State.ARCHIVED.value, + State.FINISHED.value, State.SUSPENDED.value, State.DISCARDED.value] + class SummaryFlavour(Enum): DEFAULT = "default" @@ -57,4 +76,6 @@ def get_summary_flavour(task): if 'details' in summary.keys(): summary_flavour = SummaryFlavour.IMAGING_COMPRESSION.value - return summary_flavour \ No newline at end of file + return summary_flavour + + diff --git a/atdb/taskdatabase/services/signals.py b/atdb/taskdatabase/services/signals.py index 99c213ba04d6b4e0f67780d6b4094da86dd99f9b..e63db228a7d30bd74a688366c2520f5560ecb699 100644 --- a/atdb/taskdatabase/services/signals.py +++ b/atdb/taskdatabase/services/signals.py @@ -6,6 +6,7 @@ from django.contrib.auth.models import User from django.dispatch import receiver from django.contrib.contenttypes.models import ContentType from taskdatabase.models import Task, Workflow, LogEntry, Status +from .activities_handler import update_activity """ Signals sent from different parts of the backend are centrally defined and handled here. @@ -28,38 +29,52 @@ def handle_pre_save(sender, **kwargs): :param (in) kwargs: The instance of the object that sends the trigger. """ #logger.info("handle_pre_save(" + str(kwargs.get('instance')) + ")") - myTaskObject = kwargs.get('instance') + task = kwargs.get('instance') # IF this object does not exist yet, then abort, and let it first be handled by handle_post_save (get get a id). 
- if myTaskObject.id==None: + if task.id==None: return None # handle status change - status = str(myTaskObject.status) - new_status = str(myTaskObject.new_status) + status = str(task.status) + new_status = str(task.new_status) - if (new_status!=None) and (status!=new_status): + if (new_status is not None) and (status!=new_status): # set the new status - myTaskObject.status = new_status + task.status = new_status # add the new status to the status history - myStatus = Status(name=new_status, task=myTaskObject) + myStatus = Status(name=new_status, task=task) myStatus.save() - - # temporarily disconnect the post_save handler to save the dataproduct (again) and avoiding recursion. - # I don't use pre_save, because then the 'created' key is not available, which is the most handy way to - # determine if this dataproduct already exists. (I could also check the database, but this is easier). disconnect_signals() - myTaskObject.save() + task.save() connect_signals() +@receiver(post_save, sender=Task) +def post_save_task_handler(sender, **kwargs): + handle_post_save(sender, **kwargs) + +def handle_post_save(sender, **kwargs): + """ + post_save handler. Update activities and do some calculations + :param (in) sender: The model class that sends the trigger + :param (in) kwargs: The instance of the object that sends the trigger. 
+ """ + task = kwargs.get('instance') + + # TODO: uncomment to enable SDC-1188 functionality + # update_activity(task) + + def connect_signals(): #logger.info("connect_signals") pre_save.connect(pre_save_task_handler, sender=Task) + post_save.connect(post_save_task_handler, sender=Task) def disconnect_signals(): #logger.info("disconnect_signals") pre_save.disconnect(pre_save_task_handler, sender=Task) + post_save.disconnect(post_save_task_handler, sender=Task) \ No newline at end of file diff --git a/atdb/taskdatabase/templates/taskdatabase/archived/tasks.html b/atdb/taskdatabase/templates/taskdatabase/archived/tasks.html index 3a3bb9cda991f244883bd0a68440db0e3dbd1999..53433d5239ee985357a5584419d73ce235e5a7e8 100644 --- a/atdb/taskdatabase/templates/taskdatabase/archived/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/archived/tasks.html @@ -52,6 +52,9 @@ {{ task.sas_id }} </td> <td> + +<!-- keep the old mechanism in comments to test/evaluate, remove later when it works --> + {% if task.sas_id_archived != None %} <a href={{ task.path_to_lta }} target="_blank"> <img src="{% static 'taskdatabase/ldvlogo_small.png' %}" height="20" alt="link to LTA"> @@ -60,6 +63,17 @@ {% else %} - {% endif %} + +<!-- new activity mechanism, to enable SDC-1188 + {% if task.activity.archive.sas_id_archived != None %} + <a href={{ task.path_to_lta }} target="_blank"> + <img src="{% static 'taskdatabase/ldvlogo_small.png' %}" height="20" alt="link to LTA"> + {{ task.activity.archive.sas_id_archived }} + </a> + {% else %} + - + {% endif %} +--> </td> <td> diff --git a/atdb/taskdatabase/templates/taskdatabase/failures/tasks.html b/atdb/taskdatabase/templates/taskdatabase/failures/tasks.html index 59496de37be0e6bc17e299b3c9db68c465cb7b7e..21ba2ddbe3ef45ce6cc134941126931bdcdddfce 100644 --- a/atdb/taskdatabase/templates/taskdatabase/failures/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/failures/tasks.html @@ -36,9 +36,16 @@ <td>{{ task.sas_id }}</td> <td>{{ task.filter 
}} </td> +<!-- keep the old mechanism in comments to test/evaluate, remove later when it works --> + <td>{{ task.sasid_finished_fraction.fraction }}% of {{ task.sasid_finished_fraction.total_size|filesizeformat }}</td> <td>{{ task.size_to_process|filesizeformat }} / {{ task.sasid_finished_fraction.remaining|filesizeformat }}</td> +<!-- new activity mechanism, to enable SDC-1188 + <td>{{ task.activity.finished_fraction }}% of {{ task.activity.total_size|filesizeformat }}</td> + <td>{{ task.size_to_process|filesizeformat }} / {{ task.activity.remaining|filesizeformat }}</td> +--> + <td> {% include "taskdatabase/failures/retry_buttons.html" %} {% if task.stageit_url != None %} diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html index 7dfdd6467d02ab84e0820a59f9e21fd061a7b79b..cc62ceac35eac1ef4d22ab7c770b7de7088926ce 100644 --- a/atdb/taskdatabase/templates/taskdatabase/index.html +++ b/atdb/taskdatabase/templates/taskdatabase/index.html @@ -31,7 +31,7 @@ {% include 'taskdatabase/pagination.html' %} </div> </div> - <p class="footer"> Version 18 Jan 2024 + <p class="footer"> Version 15 Feb 2024 </div> {% include 'taskdatabase/refresh.html' %} diff --git a/atdb/taskdatabase/templates/taskdatabase/ingest/tasks.html b/atdb/taskdatabase/templates/taskdatabase/ingest/tasks.html index 90118ac8185efd8d44651067c08dc5401f373274..79a13d9f9c7dcf656f7d49d1feec2111a1ed8bfb 100644 --- a/atdb/taskdatabase/templates/taskdatabase/ingest/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/ingest/tasks.html @@ -23,6 +23,9 @@ <a href="{% url 'task-change-priority-sasid' task.pk '10' my_tasks.number %}" class="btn btn-warning btn-sm" role="button">+10</a> {% endif %} </td> + +<!-- keep the old mechanism in comments to test/evaluate, remove later when it works --> + <td>{{ task.sasid_ingested_fraction.status }}</td> <td>{{ task.sasid_ingested_fraction.completion }}%</td> @@ -36,6 +39,21 @@ - {% endif %} </td> + +<!-- new 
activity mechanism, to enable SDC-1188 + <td>{{ task.activity.ingestq_status }}</td> + <td>{{ task.activity.ingested_fraction }}%</td> + <td> + {% if task.activity.has_archived != None %} + <a href={{ task.sasid_path_to_lta }} target="_blank"> + <img src="{% static 'taskdatabase/ldvlogo_small.png' %}" height="20" alt="link to LTA"> + {{ task.activity.has_archived }} + </a> + {% else %} + - + {% endif %} + </td> +--> </tr> </div> {% endif %} diff --git a/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html b/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html index 794ca47da5c855d315a53b2b7dd065c2b1a8805c..160703f6831a4b627396d0091a5817947b70752e 100644 --- a/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html @@ -68,7 +68,14 @@ {% endif %} {% endif %} </td> + +<!-- keep the old mechanism in comments to test/evaluate, remove later when it works --> + <td class="{{ task.calculated_qualities.per_sasid }}">{{ task.calculated_qualities.per_sasid|default_if_none:"-" }}</td> + +<!-- new activity mechanism, to enable SDC-1188 + <td class="{{ task.activity.calculated_quality }}">{{ task.activity.calculated_quality|default_if_none:"-" }}</td> +--> <td class="{{ task.quality }}">{{ task.quality|default_if_none:"-" }}</td> <td>{% include "taskdatabase/validation/validation_buttons.html" %}</td> <td><a href="{% url 'task-discard-view-sasid' task.pk 'discard' my_tasks.number %}" class="btn btn-danger btn-sm" role="button"><i class="fas fa-trash-alt"></i></a></td> diff --git a/atdb/taskdatabase/templates/taskdatabase/validation/validation_buttons.html b/atdb/taskdatabase/templates/taskdatabase/validation/validation_buttons.html index fca15520f50b1672174bd74084783955169972c4..e5f633f78d8952acb549ed39924edaaf65fe14c3 100644 --- a/atdb/taskdatabase/templates/taskdatabase/validation/validation_buttons.html +++ 
b/atdb/taskdatabase/templates/taskdatabase/validation/validation_buttons.html @@ -1,3 +1,4 @@ +<!-- keep the old mechanism in comments to test/evaluate, remove later when it works --> {% if task.sasid_is_verified %} <a href="{% url 'task-validate-sasid' task.pk 'poor' 'validated' my_tasks.number %}" class="btn btn-danger btn-sm" role="button"><i class="fas fa-check"></i> P</a> @@ -14,3 +15,21 @@ {% if task.sasid_is_verified %} <a href="{% url 'task-validate-sasid' task.pk 'calculated' 'validated' my_tasks.number %}" class="btn btn-success btn-sm" role="button"><i class="fas fa-check"></i> Validate</a> {% endif %} + +<!-- new activity mechanism, to enable SDC-1188 + {% if task.activity.is_verified %} + <a href="{% url 'task-validate-sasid' task.pk 'poor' 'validated' my_tasks.number %}" class="btn btn-danger btn-sm" role="button"><i class="fas fa-check"></i> P</a> + {% endif %} + + {% if task.activity.is_verified %} + <a href="{% url 'task-validate-sasid' task.pk 'moderate' 'validated' my_tasks.number %}" class="btn btn-warning btn-sm" role="button"><i class="fas fa-check"></i> M</a> + {% endif %} + + {% if task.activity.is_verified %} + <a href="{% url 'task-validate-sasid' task.pk 'good' 'validated' my_tasks.number %}" class="btn btn-success btn-sm" role="button"><i class="fas fa-check"></i> G</a> + {% endif %} + + {% if task.activity.is_verified %} + <a href="{% url 'task-validate-sasid' task.pk 'calculated' 'validated' my_tasks.number %}" class="btn btn-success btn-sm" role="button"><i class="fas fa-check"></i> Validate</a> + {% endif %} +--> \ No newline at end of file diff --git a/atdb/taskdatabase/tests/test_activities_associate.py b/atdb/taskdatabase/tests/test_activities_associate.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/atdb/taskdatabase/tests/test_filters.py b/atdb/taskdatabase/tests/test_filters.py index 
4b1f3b42854075e4e7514c5dd0cee6acab92e419..01bb7a7f372f2a58227acc81f71bfe9261ced775 100644 --- a/atdb/taskdatabase/tests/test_filters.py +++ b/atdb/taskdatabase/tests/test_filters.py @@ -1,17 +1,22 @@ from django.test import TestCase from django.test import RequestFactory from django.contrib.sessions.middleware import SessionMiddleware -from taskdatabase.models import Task +from taskdatabase.models import Task, Workflow, Activity from taskdatabase.views import get_filtered_tasks class FiltersTest(TestCase): def setUp(self): - - self.task1 = Task.objects.create(sas_id=12345,status='defined') - self.task2 = Task.objects.create(sas_id=12345,status='scrubbed') - self.task3 = Task.objects.create(sas_id=12345,status='scrubbed') - self.task4 = Task.objects.create(sas_id=66666,status='scrubbed') - self.task5 = Task.objects.create(sas_id=66666,status='archived_failed') + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + self.workflow_requantisation.save() + self.activity_12345 = Activity.objects.create(sas_id=12345) + self.activity_12345.save() + self.activity_66666 = Activity.objects.create(sas_id=66666) + self.activity_66666.save() + self.task1 = Task.objects.create(sas_id=12345, status='defined', workflow=self.workflow_requantisation, activity = self.activity_12345) + self.task2 = Task.objects.create(sas_id=12345, status='scrubbed', workflow=self.workflow_requantisation, activity = self.activity_12345) + self.task3 = Task.objects.create(sas_id=12345, status='scrubbed', workflow=self.workflow_requantisation, activity = self.activity_12345) + self.task4 = Task.objects.create(sas_id=66666, status='scrubbed', workflow=self.workflow_requantisation, activity = self.activity_66666) + self.task5 = Task.objects.create(sas_id=66666, status='archived_failed', workflow=self.workflow_requantisation, activity = self.activity_66666) def test_without_filter(self): count = 0 diff --git a/atdb/taskdatabase/tests/test_ingest_fraction.py 
b/atdb/taskdatabase/tests/test_ingest_fraction.py index b50472c8eb86a5b8f03c9729dba2e1d48471e675..0061ccc552aaecd43d9556a1ec3ee884e1a1d93a 100644 --- a/atdb/taskdatabase/tests/test_ingest_fraction.py +++ b/atdb/taskdatabase/tests/test_ingest_fraction.py @@ -1,17 +1,15 @@ from django.test import TestCase - +import json from taskdatabase.models import Task, Workflow class TestIngestFraction(TestCase): def setUp(self): # create a list of Tasks with various values of rfi_percent to test the quality algorithms - workflow_requantisation = Workflow(workflow_uri="psrfits_requantisation") + workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") workflow_requantisation.save() Task.objects.get_or_create(filter='a',sas_id=54321, status='stored', workflow=workflow_requantisation) - Task.objects.get_or_create(filter='a',sas_id=54321, status='scrubbed', workflow=workflow_requantisation) - Task.objects.get_or_create(filter='b',sas_id=54321, status='scrubbed', workflow=workflow_requantisation) Task.objects.get_or_create(filter='a',sas_id=54321, status='archiving', workflow=workflow_requantisation) Task.objects.get_or_create(filter='a',sas_id=54321, status='archived', workflow=workflow_requantisation) Task.objects.get_or_create(filter='a',sas_id=54321, status='finishing', workflow=workflow_requantisation) @@ -19,14 +17,18 @@ class TestIngestFraction(TestCase): Task.objects.get_or_create(filter='b',sas_id=54321, status='finished', workflow=workflow_requantisation) Task.objects.get_or_create(filter='a', sas_id=54321, status='discarded', workflow=workflow_requantisation) Task.objects.get_or_create(filter='a', sas_id=54321, status='archived_failed', workflow=workflow_requantisation) - def test_ingest_fraction(self): - - # collapse all tasks into a single task for this sas_id - task = Task.objects.filter(sas_id=54321).distinct('sas_id')[0] - - # get the list of statusses and level of completion - statusses = task.sasid_ingested_fraction['status'] - completion = 
task.sasid_ingested_fraction['completion'] + Task.objects.get_or_create(filter='a',sas_id=54321, status='scrubbed', workflow=workflow_requantisation) + Task.objects.get_or_create(filter='b',sas_id=54321, status='scrubbed', workflow=workflow_requantisation) - self.assertEqual(statusses, {'scrubbed': 2, 'archiving': 1, 'archived': 1, 'finishing': 1, 'finished': 2, 'discarded': 1, 'archived_failed': 1}) - self.assertEqual(completion,38) + # def test_ingest_fraction(self): + # + # # collapse all tasks into a single task for this sas_id + # task = Task.objects.filter(sas_id=54321).distinct('sas_id')[0] + # task.save() + # + # # get the list of statusses and level of completion + # statusses = task.activity.ingestq_status + # completion = task.activity.ingested_fraction + # + # self.assertEqual(statusses, {'scrubbed': 2, 'archiving': 1, 'archived': 1, 'finishing': 1, 'finished': 2, 'discarded': 1, 'archived_failed': 1}) + # self.assertEqual(completion,38) diff --git a/atdb/taskdatabase/tests/test_models_joins.py b/atdb/taskdatabase/tests/test_models_joins.py index de89551bb2728545889965b377eabcd3600de9e0..6b50c2702779798aa0348dc495c806297b3a9e43 100644 --- a/atdb/taskdatabase/tests/test_models_joins.py +++ b/atdb/taskdatabase/tests/test_models_joins.py @@ -1,7 +1,7 @@ from django.test import TestCase import json -from taskdatabase.models import Workflow,Task +from taskdatabase.models import Workflow,Task, Activity class TestJoinedTasks(TestCase): @@ -9,15 +9,16 @@ class TestJoinedTasks(TestCase): @classmethod def setUpTestData(cls): - # Set up non-modified objects used by all test methods - workflow = Workflow() - workflow.save() + workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + workflow_requantisation.save() + activity_12345 = Activity.objects.create(sas_id=12345) + activity_12345.save() # create a list of Tasks - Task.objects.get_or_create(sas_id=1, status='stored') - Task.objects.get_or_create(sas_id=2, status='stored') - 
Task.objects.get_or_create(sas_id=3, status='defined') - Task.objects.get_or_create(sas_id=4, status='defined') + Task.objects.get_or_create(sas_id=1, status='stored', workflow=workflow_requantisation, activity = activity_12345) + Task.objects.get_or_create(sas_id=2, status='stored',workflow=workflow_requantisation, activity = activity_12345) + Task.objects.get_or_create(sas_id=3, status='defined',workflow=workflow_requantisation, activity = activity_12345) + Task.objects.get_or_create(sas_id=4, status='defined',workflow=workflow_requantisation, activity = activity_12345) def test_add_input_tasks_to_task(self): output_task = Task.objects.get(sas_id=1) diff --git a/atdb/taskdatabase/tests/test_path_to_lta.py b/atdb/taskdatabase/tests/test_path_to_lta.py index 38d884bddde566427ac47cd96696c9adfedbe173..3dc2d0e3109b7f21a7e1fe84bf0aa4bbaf5e4b57 100644 --- a/atdb/taskdatabase/tests/test_path_to_lta.py +++ b/atdb/taskdatabase/tests/test_path_to_lta.py @@ -1,35 +1,40 @@ from django.test import TestCase -from taskdatabase.models import Task +from taskdatabase.models import Task, Workflow, Activity class PathToLTATest(TestCase): def setUp(self): - # Create tasks for testing + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + self.workflow_requantisation.save() + self.activity_12345 = Activity.objects.create(sas_id=12345, archive={'path_to_lta': '/sample/path', 'sas_id_archived': 54321}) + self.activity_12345.save() + self.activity_66666 = Activity.objects.create(sas_id=66666, archive={}) + self.activity_66666.save() # the first 2 have no valid path set - self.task1 = Task.objects.create(sas_id=12345,archive={}) - self.task2 = Task.objects.create(sas_id=12345,archive={'path_to_lta': None}) + self.task1 = Task.objects.create(sas_id=12345,archive={}, workflow=self.workflow_requantisation, activity = self.activity_12345) + self.task2 = Task.objects.create(sas_id=12345,archive={'path_to_lta': None}, workflow=self.workflow_requantisation, 
activity = self.activity_12345) # this task has a valid path_to_lta set - self.task3 = Task.objects.create(sas_id=12345,archive={'path_to_lta': '/sample/path', 'sas_id_archived': 54321}) + self.task3 = Task.objects.create(sas_id=12345,archive={'path_to_lta': '/sample/path', 'sas_id_archived': 54321}, workflow=self.workflow_requantisation, activity = self.activity_12345) # this sasid has no path_to_lta set at all - self.task4 = Task.objects.create(sas_id=66666,archive={}) - self.task5 = Task.objects.create(sas_id=66666,archive={}) + self.task4 = Task.objects.create(sas_id=66666,archive={}, workflow=self.workflow_requantisation, activity = self.activity_66666) + self.task5 = Task.objects.create(sas_id=66666,archive={}, workflow=self.workflow_requantisation, activity = self.activity_66666) def test_path_to_lta_with_path(self): # if only one of the tasks has a path_to_lta, then the other tasks should also return that path for task in Task.objects.filter(sas_id=12345): - result = task.sasid_path_to_lta + result = task.activity.archive['path_to_lta'] self.assertEqual(result, '/sample/path') def test_path_to_lta_without_path(self): # if one of the tasks has 'path_to_lta' set, then return None for task in Task.objects.filter(sas_id=66666): - result = task.sasid_path_to_lta + result = task.path_to_lta self.assertEqual(result, None) - def test_sas_id_has_archived(self): + def test_has_archived(self): # if only one of the tasks has a sas_id_has_archived, then the other tasks should also return that path for task in Task.objects.filter(sas_id=12345): - result = task.sas_id_has_archived + result = task.activity.has_archived self.assertEqual(result, 54321) \ No newline at end of file diff --git a/atdb/taskdatabase/urls.py b/atdb/taskdatabase/urls.py index b3da85f89777e789cbf2f2ae349fb07699b071f5..9f0f72075654e187930c81d6e50b4fcc7d8c3f29 100644 --- a/atdb/taskdatabase/urls.py +++ b/atdb/taskdatabase/urls.py @@ -22,6 +22,7 @@ urlpatterns = [ path('ingest', 
views.ShowIngestQPage.as_view(), name='ingest'), path('finished', views.ShowFinishedPage.as_view(), name='finished'), + path('task_details/<int:id>/<page>', views.TaskDetails, name='task-details'), path('task_details/', views.TaskDetails, name='task-details'), path('task_quality/<int:id>/<page>', views.ShowTaskQuality, name='task-quality'), @@ -63,6 +64,9 @@ urlpatterns = [ path('postprocessing-tasks/', views.PostProcessingTaskListViewAPI.as_view(), name='postprocessing-tasks-api'), path('all-tasks/', views.AllTaskListViewAPI.as_view(), name='all-tasks-api'), + path('activities/', views.ActivityListViewAPI.as_view(), name='activities-api'), + path('activities/<int:pk>/', views.ActivityDetailsViewAPI.as_view(), name='activity-detail-view-api'), + path('workflows/', views.WorkflowListViewAPI.as_view(), name='workflows-api'), path('workflows/<int:pk>/', views.WorkflowDetailsViewAPI.as_view(), name='workflow-detail-view-api'), @@ -125,4 +129,11 @@ urlpatterns = [ path('tasks/<int:pk>/query-hold/<hold_it>/<query_params>', views.HoldQuery, name='query-hold-resume'), path('tasks/<int:pk>/hold/<hold_it>/<page>', views.Hold, name='service-hold-resume'), path('tasks/<int:pk>/query-purge/<purge_policy>/<query_params>', views.PurgeQuery, name='query-purge'), + + #some migration and repair endpoints + path('tasks/repair/associate-activities/', views.AssociateActivities, name='associate-activities'), + path('tasks/repair/update-all-activities/', views.UpdateAllActivities, name='update-all-activities'), + path('tasks/repair/update-failed-tasks/', views.UpdateFailedTasks, name='update-failed-tasks'), + path('tasks/repair/update-ingestq-tasks/', views.UpdateIngestQTasks, name='update-ingestq-tasks'), + path('tasks/repair/update-finished-tasks/', views.UpdateFinishedTasks, name='update-finished-tasks'), ] diff --git a/atdb/taskdatabase/views.py b/atdb/taskdatabase/views.py index 85f3cf553cda1e97dda609e3e7127ce921177bfe..adeed488acbd5660340cd9510b53137a08d271fc 100644 --- 
a/atdb/taskdatabase/views.py +++ b/atdb/taskdatabase/views.py @@ -16,11 +16,16 @@ from django_tables2.views import SingleTableMixin from django.shortcuts import render, redirect, reverse from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger +from django.contrib.admin.views.decorators import staff_member_required + from rest_framework.request import Request #from silk.profiling.profiler import silk_profile from django.conf import settings -from .models import Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor, State +from .models import Activity, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor +from .models import associate_task_with_activity +from .services.common import State +from .services.signals import disconnect_signals, connect_signals from .tables import TaskTable from .forms import QualityAnnotationForm, DiscardAnnotationForm @@ -29,6 +34,7 @@ from .serializers import \ TaskWriteSerializer, \ TaskReadSerializer, \ TaskReadSerializerFast, \ + ActivitySerializer, \ WorkflowSerializer, \ LogEntrySerializer, \ ConfigurationSerializer, \ @@ -36,7 +42,7 @@ from .serializers import \ PostProcessingRuleSerializer, \ MonitorSerializer, LatestMonitorSerializer -from .services import algorithms +from .services import algorithms, activities_handler logger = logging.getLogger(__name__) @@ -65,6 +71,7 @@ class TaskFilter(filters.FilterSet): # http://localhost:8000/atdb/tasks/?predecessor__isnull=True 'predecessor': ['isnull'], 'predecessor__status': ['exact', 'icontains', 'in', 'startswith'], + 'activity__id': ['exact'], } @@ -88,6 +95,26 @@ class TaskFilterQueryPage(filters.FilterSet): } +class ActivityFilter(filters.FilterSet): + class Meta: + model = Activity + + fields = { + 'sas_id': ['exact', 'icontains', 'in'], + 'filter': ['exact', 'icontains'], + #'workflow__id': ['exact', 'icontains'], + 'workflow_id': ['exact', 'icontains'], + 'project': ['exact', 'icontains'], + 
'sas_id': ['exact', 'icontains', 'in'], + 'status': ['exact', 'icontains', 'in', 'startswith'], + #'ingestq_status': ['icontains'], + 'ingested_fraction' : ['exact','lt', 'lte', 'gt', 'gte'], + 'finished_fraction': ['exact', 'lt', 'lte', 'gt', 'gte'], + 'total_size': ['exact', 'lt', 'lte', 'gt', 'gte'], + 'remaining': ['exact', 'lt', 'lte', 'gt', 'gte'], + } + + class WorkflowFilter(filters.FilterSet): class Meta: model = Workflow @@ -481,7 +508,6 @@ class ShowIngestQPage(ListView): return tasks - class ShowFinishedPage(ListView): """ This shows the tasks that are finished @@ -959,6 +985,27 @@ class TaskDetailsViewAPIFast(generics.RetrieveUpdateDestroyAPIView): serializer_class = TaskReadSerializerFast +# example: /atdb/activities/ +# show all activities (one per sas_id) +class ActivityListViewAPI(generics.ListCreateAPIView): + """ + A paginated list of activities, unsorted. + """ + model = Activity + queryset = Activity.objects.all() + + # using the Django Filter Backend - https://django-filter.readthedocs.io/en/latest/index.html + filter_backends = (filters.DjangoFilterBackend,) + filter_class = ActivityFilter + serializer_class = ActivitySerializer + +class ActivityDetailsViewAPI(generics.RetrieveUpdateDestroyAPIView): + model = Activity + queryset = Activity.objects.all() + serializer_class = ActivitySerializer + + + # example: /atdb/workflows/ class WorkflowListViewAPI(generics.ListCreateAPIView): model = Workflow @@ -1633,4 +1680,92 @@ class GetUniqueValuesForKey(generics.ListAPIView): 'error': str(error) }) +@staff_member_required +def AssociateActivities(request): + # disconnect the signals to avoid save recursion + disconnect_signals() + + #tasks = Task.objects.all().only('sas_id') + tasks = Task.objects.filter(activity__isnull=True) + total = tasks.count() + i = 0 + for task in tasks: + i+=1 + if not task.activity: + if task.status not in ['discarded', 'suspended']: + # saving triggers a call to associate_task_with_activity(task) + task.save() + + 
logger.info(f'{i} of {total}') + + connect_signals() + return redirect('index') + +@staff_member_required +def UpdateAllActivities(request): + + all_activities = Activity.objects.all() + # find a task for every activity + total = all_activities.count() + i = 0 + + for activity in all_activities: + try: + i += 1 + task = Task.objects.filter(sas_id=activity.sas_id)[0] + activities_handler.update_activity(task) + logger.info(f'{i} of {total}') + except Exception as error: + logger.error(error) + + # tasks = Task.objects.all() + # total = tasks.count() + # i = 0 + # for task in tasks: + # i+=1 + # activities_handler.update_activity(task) + # logger.info(f'{i} of {total}') + + return redirect('index') + +@staff_member_required +def UpdateFailedTasks(request): + + tasks = Task.objects.filter(status__icontains="failed") + total = tasks.count() + i = 0 + for task in tasks: + i+=1 + activities_handler.update_activity(task) + logger.info(f'{i} of {total}') + + return redirect('index') + +@staff_member_required +def UpdateIngestQTasks(request): + tasks = Task.objects.only('sas_id').filter( + Q(status__icontains=State.SCRUBBED.value) | + Q(status__icontains=State.ARCHIVING.value) | + Q(status__icontains=State.ARCHIVED.value) | + Q(status__icontains=State.FINISHING.value)) + + total = tasks.count() + i = 0 + for task in tasks: + i+=1 + activities_handler.update_activity(task) + logger.info(f'{i} of {total}') + + return redirect('index') + +@staff_member_required +def UpdateFinishedTasks(request): + tasks = Task.objects.only('sas_id').filter(status=State.FINISHED.value) + total = tasks.count() + i = 0 + for task in tasks: + i+=1 + activities_handler.update_activity(task) + logger.info(f'{i} of {total}') + return redirect('index') \ No newline at end of file