Skip to content
Snippets Groups Projects
Commit e3c9bc6b authored by Nico Vermaas's avatar Nico Vermaas
Browse files

cleaned signals

parent 32e4a1c0
No related branches found
No related tags found
1 merge request!339SDC-1188 - STEP 1 of 3 (the database)
Pipeline #71865 passed
......@@ -209,7 +209,7 @@ ACTIVE_STATUSSES = ['staging','staged','processing','processed','validated','sto
STATUSSES_WITH_DATA = ['staged','fetching','fetched','processing','processed','validated','storing','stored','scrubbing','scrubbed','archiving','archived']
AGGREGATES = ['failed','active','total']
QUERY_LIMIT_MULTI_CHANGE = 5000
QUERY_LIMIT_MULTI_CHANGE = 10000
MAX_MONITORING_HISTORY_HOURS = 7 * 24
SERVICES_LATE_WARNING_SECONDS = 1800
......
......@@ -8,7 +8,11 @@ class TaskAdmin(admin.ModelAdmin):
ordering = ['-creationTime']
search_fields = ['id','sas_id']
admin.site.register(Activity)
@admin.register(Activity)
class ActivityAdmin(admin.ModelAdmin):
ordering = ['-sas_id']
search_fields = ['id','sas_id']
admin.site.register(Workflow)
admin.site.register(LogEntry)
admin.site.register(Configuration)
......
# Generated by Django 5.0 on 2024-02-12 10:19
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('taskdatabase', '0040_activity_ingestq_status'),
]
operations = [
migrations.AlterField(
model_name='task',
name='activity',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='tasks', to='taskdatabase.activity'),
),
]
......@@ -76,6 +76,25 @@ def convert_summary_to_list_for_template(task):
return list
def associate_task_with_activity(task):
if not task.activity:
try:
activity = Activity.objects.get(sas_id=task.sas_id)
except:
# no activity exists yet, create it
logger.info(f'create new activity for sas_id {task.sas_id}')
activity = Activity(sas_id=task.sas_id,
project=task.project,
workflow_id = task.workflow.id,
filter=task.filter)
activity.save()
task.activity = activity
return task.activity
class Activity(models.Model):
"""
......@@ -156,7 +175,7 @@ class Task(models.Model):
joined_output_task = models.ForeignKey('self', related_name='joined_input_tasks', on_delete=models.SET_NULL, null=True, blank=True)
# pipeline or observation
activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.DO_NOTHING, null=True, blank=True)
activity = models.ForeignKey(Activity, related_name='tasks', on_delete=models.SET_NULL, null=True, blank=True)
def __str__(self):
return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
......@@ -184,6 +203,9 @@ class Task(models.Model):
tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
# make sure that every task has an activity (backward compatibility)
associate_task_with_activity(self)
# remark:
# a post_save signal is triggered by this save()
# to update the associated 'activity' with relevant aggregated information
......
......@@ -65,27 +65,6 @@ def calculate_finished_fraction(this_task):
return result
def associate_task_with_activity(task, save_task=True):
if not task.activity:
try:
activity = Activity.objects.get(sas_id=task.sas_id)
except:
# no activity exists yet, create it
logger.info(f'create new activity for sas_id {task.sas_id}')
activity = Activity(sas_id=task.sas_id,
project=task.project,
workflow_id = task.workflow.id,
filter=task.filter)
activity.save()
task.activity = activity
if save_task:
task.save()
return task.activity
def update_activity(task):
"""
......@@ -104,7 +83,8 @@ def update_activity(task):
# do not save the task,
# because this function is called from signals where the task.save is delayed on purpose to avoid recursion
activity = associate_task_with_activity(task, save_task=False)
#activity = associate_task_with_activity(task, save_task=False)
activity = task.activity
# depending on the status transition, perform calculations
if task.status == State.STORED.value:
......
......@@ -69,13 +69,8 @@ def handle_post_save(sender, **kwargs):
"""
task = kwargs.get('instance')
# temporarily disconnect and save later, to avoid recursion.
update_activity(task)
disconnect_signals()
task.save()
connect_signals()
def connect_signals():
#logger.info("connect_signals")
......
......@@ -31,7 +31,7 @@
{% include 'taskdatabase/pagination.html' %}
</div>
</div>
<p class="footer"> Version 9 Feb 2024
<p class="footer"> Version 12 Feb 2024
</div>
{% include 'taskdatabase/refresh.html' %}
......
......@@ -23,7 +23,9 @@ from rest_framework.request import Request
from django.conf import settings
from .models import Activity, Task, Workflow, LogEntry, Configuration, Job, PostProcessingRule, Monitor, LatestMonitor
from .models import associate_task_with_activity
from .services.common import State
from .services.signals import disconnect_signals, connect_signals
from .tables import TaskTable
from .forms import QualityAnnotationForm, DiscardAnnotationForm
......@@ -69,6 +71,7 @@ class TaskFilter(filters.FilterSet):
# http://localhost:8000/atdb/tasks/?predecessor__isnull=True
'predecessor': ['isnull'],
'predecessor__status': ['exact', 'icontains', 'in', 'startswith'],
'activity__id': ['exact'],
}
......@@ -1679,16 +1682,23 @@ class GetUniqueValuesForKey(generics.ListAPIView):
@staff_member_required
def AssociateActivities(request):
# disconnect the signals to avoid save recursion
disconnect_signals()
tasks = Task.objects.all().only('sas_id')
#tasks = Task.objects.all().only('sas_id')
tasks = Task.objects.filter(activity__isnull=True)
total = tasks.count()
i = 0
for task in tasks:
i+=1
if not task.activity:
if task.status not in ['discarded', 'suspended']:
activities.associate_task_with_activity(task)
# saving triggers a call to associate_task_with_activity(task)
task.save()
logger.info(f'{i} of {total}')
connect_signals()
return redirect('index')
@staff_member_required
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment