diff --git a/atdb/atdb/static/taskdatabase/style.css b/atdb/atdb/static/taskdatabase/style.css
index 5e4bce6b5f3f230602792cbd6eddbac6d8018acd..55982d9b4d32155c7999546788b8ec51f7df2f71 100644
--- a/atdb/atdb/static/taskdatabase/style.css
+++ b/atdb/atdb/static/taskdatabase/style.css
@@ -8,7 +8,7 @@ TD {
     color: green;
 }
 
-.defined,.staged,.fetched,.processed,.stored,.validated,.scrubbed,.archived,.finished,.aggregated {
+.defined,.staged,.fetched,.processed,.stored,.validated,.scrubbed,.archived,.finished,.aggregate,.aggregated {
     background-color: lemonchiffon;
     color: blue;
 }
@@ -27,11 +27,6 @@ TD {
     background-color: lightgreen;
 }
 
-.aggregate {
-    font-weight: bold;
-    background-color: lightgreen;
-}
-
 .aggregate_failed,.aggregating_failed {
     color: red;
     font-weight: bold;
diff --git a/atdb/requirements/base.txt b/atdb/requirements/base.txt
index fcc3da67820b11bdff135ff597f5e5ff589b0da6..3bfd52ab41348792a26fdb0d76551189575c1d02 100644
--- a/atdb/requirements/base.txt
+++ b/atdb/requirements/base.txt
@@ -12,6 +12,7 @@ fontawesome-free==5.15.2
 matplotlib==3.8.3
 oauthlib==3.2.2
 psycopg2-binary==2.9.3
+python-bidi==0.4.2
 python3-openid==3.2.0
 requests-oauthlib==1.3.1
 six==1.15.0
diff --git a/atdb/run_coverage.bat b/atdb/run_coverage.bat
new file mode 100644
index 0000000000000000000000000000000000000000..faa9fc6688cc8cc4b95a0b4727ca2b9e63dff969
--- /dev/null
+++ b/atdb/run_coverage.bat
@@ -0,0 +1,3 @@
+coverage run --omit=taskdatabase/tests/*,taskdatabase/migrations/* manage.py test --settings=atdb.settings.dev
+coverage report -m
+coverage html
\ No newline at end of file
diff --git a/atdb/taskdatabase/migrations/0049_task_is_aggregated.py b/atdb/taskdatabase/migrations/0049_task_is_aggregated.py
new file mode 100644
index 0000000000000000000000000000000000000000..572666a0386aa2ae5ca38c79f3ee4fbefdb5c4a2
--- /dev/null
+++ b/atdb/taskdatabase/migrations/0049_task_is_aggregated.py
@@ -0,0 +1,18 @@
+# Generated by Django 5.0 on 2024-07-23 11:42
+
+from django.db import migrations, models
+
+
+class Migration(migrations.Migration):
+
+    dependencies = [
+        ('taskdatabase', '0048_alter_activity_storage_location'),
+    ]
+
+    operations = [
+        migrations.AddField(
+            model_name='task',
+            name='is_aggregated',
+            field=models.BooleanField(default=False),
+        ),
+    ]
diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py
index 9f94f94f2fc5fbe18878e41c670c025aee85cdd2..a289b34c9326c011703d073aa5b81648a3cbeafb 100644
--- a/atdb/taskdatabase/models.py
+++ b/atdb/taskdatabase/models.py
@@ -8,7 +8,7 @@ import json
 import logging
 
 from .services import calculated_qualities as qualities
-from .services.common import State
+from .services.common import State, AggregationStrategy, ACTIVITY_RESET_STATUSSEN
 
 logger = logging.getLogger(__name__)
 
@@ -16,9 +16,9 @@ logger = logging.getLogger(__name__)
 datetime_format_string = '%Y-%m-%dT%H:%M:%SZ'
 
 AGGREGATION_STRATEGY_CHOICES = (
-    ("none", "none"),
-    ("wait_for_summary_task", "wait_for_summary_task"),
-    ("collect_h5", "collect_h5"),
+    (AggregationStrategy.NONE.value, AggregationStrategy.NONE.value),
+    (AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value, AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value),
+    (AggregationStrategy.COLLECT_H5.value, AggregationStrategy.COLLECT_H5.value),
 )
 
 class Workflow(models.Model):
@@ -34,7 +34,7 @@ class Workflow(models.Model):
     prefetch = models.BooleanField(null=True, default=True)
 
     # this is a fixed list of options, because when an option is added it also requires changes in the aggregator service
-    aggregation_strategy =
models.CharField(max_length=50, choices = AGGREGATION_STRATEGY_CHOICES, default="none") + aggregation_strategy = models.CharField(max_length=50, choices = AGGREGATION_STRATEGY_CHOICES, default=AggregationStrategy.NONE.value) aggregation_script = models.CharField(max_length=100, blank=True, null=True) quality_thresholds = models.CharField(max_length=50, blank=True, null=True) @@ -42,51 +42,6 @@ class Workflow(models.Model): return str(self.id) + ' - ' + str(self.workflow_uri) -# convert the quality information from the JSONfield into a easy parsable list for the template -def convert_quality_to_list_for_template(task): - list = [] - - try: - list.append(str(task.quality_json['uv-coverage'])) - except: - list.append("-") - - try: - list.append(str(task.quality_json['sensitivity'])) - except: - list.append("-") - - try: - list.append(str(task.quality_json['observing-conditions'])) - except: - list.append("-") - - return list - - -def convert_quality_to_shortlist_for_template(task): - list = [] - - try: - list.append(str(task.quality_json['uv-coverage'])) - list.append(str(task.quality_json['sensitivity'])) - list.append(str(task.quality_json['observing-conditions'])) - except Exception as err: - pass - - return list - -def convert_summary_to_list_for_template(task): - list = [] - - try: - summary = task.quality_json['summary'] - - except Exception as err: - pass - - return list - def associate_task_with_activity(task): if not task.activity: @@ -155,12 +110,32 @@ class Activity(models.Model): def __str__(self): return str(self.sas_id) + def create_storage_location(self): + workdir = Configuration.objects.get(key='executor:workdir').value + self.storage_location = workdir.replace('run', 'aggregate') + str(self.sas_id) + def check_if_summary(task): """ check if this task is a summary task for backward compatiblity reasons this is done very ugly, by looking if certain filenames contain the substring 'summary' """ + # look for the new 'is_summary' flag + try: + tars = task.outputs['summary'] + + for tar in tars: + if tar['is_summary']: + # a summary tarball was found, this task is a summary task + logger.info(f'task {task.id} with workflow {task.workflow} is a summary task') + return True + return False + + except: + # no 'is_summary' field found, but then this is probably an older task. 
+        # for backward compatibility, ignore the error and continue with the old filename-based method
+        pass
+
     # look in the outputs.tar_archive
     try:
         tars = task.outputs['tar_archive']
@@ -180,6 +155,7 @@ class Task(models.Model):
     # Task control properties
     task_type = models.CharField(max_length=20, default="regular")
     is_summary = models.BooleanField(default=False)
+    is_aggregated = models.BooleanField(default=False)
     filter = models.CharField(max_length=30, blank=True, null=True)
     environment = models.CharField(max_length=255, blank=True, null=True)
     new_status = models.CharField(max_length=50, default="defining", null=True)
@@ -219,8 +195,50 @@ class Task(models.Model):
     def __str__(self):
         return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id)
 
+    def handle_aggregation(self):
+        """
+        depending on the aggregation_strategy for this task, different functionality is executed
+        """
+
+        try:
+            # for requantisation pipeline: HOLD summary tasks
+            if (self.workflow.aggregation_strategy == AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value):
+
+                if (self.activity.status != State.AGGREGATED.value):
+                    # if this is a summary task, temporarily put it on hold so that the ancillary service can pick it up
+                    if (self.is_summary and not self.activity.is_aggregated):
+                        self.resume = False
+
+
+            # for image_compression_pipeline: ...
+
+            if (self.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value):
+
+                if not self.is_aggregated:
+                    # set the task to AGGREGATE,
+                    # to prevent the datamanager from picking it up,
+                    # and to trigger the aggregator service to pick it up,
+                    # to copy its H5 files to the activity.storage_location on spider
+
+                    # the aggregator will then return the task to PROCESSED with 'is_aggregated = true',
+                    # so that it won't be picked up again.
+
+                    # TODO: only activate when the aggregator service actually picks this up
+                    self.new_status = State.AGGREGATE.value
+                    #pass
+
+
+        except Exception as error:
+            # this should never happen
+            # But it can happen that tasks are inserted directly in an advanced status without going through
+            # the proper steps (like in tests). If that happens, just log the error and continue.
+            logger.error(error)
+
     def save(self, *args, **kwargs):
+
+        # make sure that every task has an activity (also for backward compatibility)
+        associate_task_with_activity(self)
+
         # calculate the qualities for this task
         if (self.status != State.STORED.value) & (self.new_status == State.STORED.value):
@@ -238,63 +256,21 @@ class Task(models.Model):
             tasks_for_this_sasid = Task.objects.filter(sas_id=self.sas_id)
             self.calculated_qualities = qualities.calculate_qualities(self, tasks_for_this_sasid, quality_thresholds)
 
-        # when tasks are put back on defined (resetting the flow), also reset some activity values
-        if (self.status != State.DEFINED.value) & (self.new_status == State.DEFINED.value):
-            try:
-                self.activity.status = State.DEFINED.value
-                self.activity.is_processed = False
-                self.activity.is_aggregated = False
-                self.activity.save()
-            except:
-                # only bother with it if the task actually has an activity attached
-                # which is always...
except in some simpler unittests - pass - - # when tasks are put back on fetched (resetting the flow), also reset some activity values - if (self.status != State.FETCHED.value) & (self.new_status == State.FETCHED.value): - try: - self.activity.status = State.FETCHED.value - self.activity.is_processed = False - self.activity.is_aggregated = False - self.activity.save() - except: - # only bother with it if the task actually has an activity attached - # which is always... except in some simpler unittests - pass - - # when a task goes to PROCESSED... + # when a task goes to PROCESSED... handle the (potential) aggregation functionality if (self.status != State.PROCESSED.value) & (self.new_status == State.PROCESSED.value): - try: - # ...but only when the tasks of this activity are not yet aggregated - # (because the aggregator service also sets tasks back to processed) + self.handle_aggregation() - if (self.activity.status != State.AGGREGATED.value): - # check if this task is a summary task - self.is_summary = check_if_summary(self) - - # if so, temporarily put it on hold so that the ancillary service can grab it with it - if (self.is_summary and not self.activity.is_aggregated): - self.resume = False + if self.new_status in ACTIVITY_RESET_STATUSSEN: + self.is_aggregated = False - except Exception as error: - # this should never happen - # But it can happen that tasks are inserted directly in an advanced status without going through - # the proper steps (like in tests). If that happens, just log the error and continue. - logger.error(error) + # check in the outputs if this task should be considered to be summary task + self.is_summary = check_if_summary(self) - # nv:20feb2024, same as above, but for backward compatibilty reasons. - # For tasks that are already beyond PROCESSED, but not yet ingested. 
- # But it has no use to put them on 'hold' for the ancillary service, - # because at this point the spider work directory has already been deleted by datamanager - if (self.status != State.VALIDATED.value) & (self.new_status == State.VALIDATED.value): - self.is_summary = check_if_summary(self) - - # make sure that every task has an activity (also for backward compatibility) - associate_task_with_activity(self) # remark: - # a post_save signal is triggered by this save() + # a post_save signal is triggered, which in turn executes the activities_handler.update_activity() function # to update the associated 'activity' with relevant aggregated information + # see documentation: https://support.astron.nl/confluence/display/SDCP/Activities super(Task, self).save(*args, **kwargs) @@ -392,29 +368,6 @@ class Task(models.Model): except: return None - @property - def quality_as_list(self): - try: - q = convert_quality_to_list_for_template(self) - return q - except: - return None - - @property - def quality_as_shortlist(self): - try: - q = convert_quality_to_shortlist_for_template(self) - return q - except: - return None - - @property - def summary_as_list(self): - try: - q = convert_summary_to_list_for_template(self) - return q - except: - return None @property def sas_id_archived(self): diff --git a/atdb/taskdatabase/serializers.py b/atdb/taskdatabase/serializers.py index 55484a36048d1b2872f560b55f2615de05f1c2f5..75fb231ab3a334bc9b7d3a98708e9b197158a0ba 100644 --- a/atdb/taskdatabase/serializers.py +++ b/atdb/taskdatabase/serializers.py @@ -45,7 +45,7 @@ class TaskWriteSerializer(serializers.ModelSerializer): model = Task fields = ('id','task_type','filter','predecessor','successors', 'joined_output_task', - 'project','sas_id','priority','purge_policy','cleanup_policy','resume', + 'project','sas_id','priority','purge_policy','cleanup_policy','resume','is_aggregated', 'new_workflow_id','new_workflow_uri','workflow', 'stage_request_id', 'status','new_status','quality','calculated_qualities', @@ -97,7 +97,7 @@ class TaskReadSerializer(serializers.ModelSerializer): class Meta: model = Task - fields = ['id','task_type','is_summary','creationTime','filter', + fields = ['id','task_type','is_summary','is_aggregated','creationTime','filter', 'predecessor','predecessor_status','successors', 'joined_input_tasks','joined_output_task','joined_status', 'project','sas_id','priority','purge_policy','cleanup_policy','resume', @@ -132,7 +132,7 @@ class TaskReadSerializerFast(serializers.ModelSerializer): """ class Meta: model = Task - fields = ['id','task_type','is_summary','creationTime','filter','predecessor','predecessor_status', + fields = ['id','task_type','is_summary','is_aggregated','creationTime','filter','predecessor','predecessor_status', #'joined_input_tasks', 'joined_output_task', 'joined_status', 'project','sas_id','priority','purge_policy','cleanup_policy','resume', 'workflow', diff --git a/atdb/taskdatabase/services/activities_handler.py b/atdb/taskdatabase/services/activities_handler.py index a98e6b33467b8466b76205dea6feb1a1a79abb4d..45cfd5d6d816271b3fe0d9fd3a2d53aab7f2ed22 100644 --- a/atdb/taskdatabase/services/activities_handler.py +++ b/atdb/taskdatabase/services/activities_handler.py @@ -1,6 +1,10 @@ import logging; -from .common import State, verified_statusses, processed_statusses -from taskdatabase.models import Task, Activity +from django.conf import settings +from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \ + 
UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
+from taskdatabase.models import Task, Activity, Configuration
+
+status_list = settings.STATUSSES_WITH_DATA
 
 logger = logging.getLogger(__name__)
 
@@ -42,6 +46,7 @@ def calculate_ingested_fraction(this_task):
     result['completion'] = completion
     return result
 
+
 def calculate_finished_fraction(this_task):
     size_archived = 0
     size_remaining = 0
@@ -68,92 +73,178 @@
     return result
 
-def update_activity(task):
+def update_ingest_fractions(task):
     """
-    The activity (SAS_ID level) is updated when a task changes status.
-    Depending on the type of status change, certain calculations and updates are performed.
-    Doing this on status change, instead of on-the-fly when a user enters a page, balances the load
-    and improves overall performance
+    The IngestQ page shows the fraction of completed/archived tasks per SAS_ID
+    This is the function that calculates that fraction when a relevant status transition was done.
+    """
+    logger.info(f'- update_ingest_fractions')
+    activity = task.activity
+
+    result = calculate_ingested_fraction(task)
+    activity.ingested_fraction = result['completion']
+    activity.ingestq_status = result['status']
+    activity.save()
 
-    - to 'ARCHIVING, ARCHIVED, FINISHED' : check for incoming/existing 'archive' json from archiver
-    - to SCRUBBED, ARCHIVING, ARCHIVED, FINISHED : calculate ingested_fraction
-    - to _FAILED : calculate finished_fraction
-    - to (processed_statusses) : check if all tasks are processed, set Activity to is_processed and AGGREGATE
-    - always : determine if a task is in a 'verified' status
 
+def update_archive_info(task):
     """
-    logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')
+    The Finished page shows some information about the archiving/ingest results,
+    like the new SAS_ID that the resulting output of the pipeline got in the LTA
+    This is the function that retrieves that information from the tasks and stores it in the Activity
+    when a relevant status transition was done.
+    """
+    logger.info(f'- update_archive_info')
     activity = task.activity
 
-    # depending on the status transition, perform calculations
+    for t in Task.objects.filter(sas_id=task.sas_id):
+        try:
+            if t.archive['sas_id_archived']:
+                activity.archive = t.archive
+                break
+        except:
+            pass
 
-    # calculate the fraction and list of statusses of ingested tasks of this SAS_ID
-    if task.status in [State.SCRUBBED.value, State.PRE_ARCHIVING.value, State.PRE_ARCHIVED.value,
-                       State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value, State.FINISHED.value]:
-        logger.info(f'- calculate_ingested_fraction')
-        result = calculate_ingested_fraction(task)
-        activity.ingested_fraction = result['completion']
-        activity.ingestq_status = result['status']
-        activity.save()
+    activity.save()
 
-    # check of any task of this activity already has LTA information. If so, copy to the activity level
-    if task.status in [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHED.value]:
-        logger.info(f'- add archive json')
-        for t in Task.objects.filter(sas_id=task.sas_id):
-            try:
-                if t.archive['sas_id_archived']:
-                    activity.archive = t.archive
-                    break
-            except:
-                pass
 
+def update_finished_fraction(task):
+    """
+    The Failures page shows the failed tasks,
+    but also the fraction of the tasks that were successfully ingested for the same SAS_ID (activity)
+
+    This is the function that calculates that fraction when a relevant status transition was done.
+ """ + logger.info(f'- update_finished_fraction') + activity = task.activity + + result = calculate_finished_fraction(task) + activity.finished_fraction = result['fraction'] + activity.total_size = result['total_size'] + activity.remaining = result['remaining'] + activity.save() + +def reset_activity(task): + """ + When a task is recycled back to DEFINED or FETCHED then the activity as a whole is no longer 'processed' or 'aggregated'. + Reset those fields accordingly + """ + try: + logger.info(f'- reset activity') + activity = task.activity + activity.status = task.status + activity.is_processed = False + activity.is_aggregated = False activity.save() + except: + # only bother with it if the task actually has an activity attached + # which is always... except in some simpler unittests + pass + +def create_aggregation_task(task): + """ + create a new aggregation task based on this task + """ + aggregation_task = Task( + task_type='aggregation', + filter=task.filter, + project=task.project, + sas_id=task.sas_id, + workflow=task.workflow, + status=State.IDLE.value, + new_status=State.IDLE.value, + activity=task.activity) + + aggregation_task.save() + return aggregation_task + +def update_processed_and_aggregate(task): + """ + Check if the whole SAS_ID (activity) is processed. + Currently this is used to set the activity.aggregate status, which triggers the aggregator service. + """ + logger.info(f'- update_processed') + activity = task.activity + + # this complicated looking piece of code checks if ALL tasks of the activity are processed, + # if yes... set activity.status = AGGREGATE + + # this needs to be done for all aggregation_strategies, + # because it signals the moment where all the input data for aggregation is (or has been) available. + # For some strategies something extra needs to be done... 
+ + current_is_processed = activity.is_processed + activity.is_processed = True + non_discarded_found = False + for t in Task.objects.filter(sas_id=task.sas_id,task_type='regular'): + if t.status not in PROCESSED_STATUSSES: + activity.is_processed = False + break + + # at least one of the tasks should NOT be in discarded, + # otherwise a fully discarded SAS_ID will also register as 'is_processed' and ready to 'AGGREGATE' + if t.status != State.DISCARDED.value: + non_discarded_found = True + + # only save when changed + if activity.is_processed != current_is_processed: + # if the whole activity has become processed, then set the status of this activity to 'AGGREGATE' + # unless it was already aggregated + if (activity.is_processed & non_discarded_found): + if not (activity.is_aggregated): + activity.status = State.AGGREGATE.value - # calculate the finished fraction, this is only used on the Failures page - if State.FAILED.value in task.status: - logger.info(f'- calculate_finished_fraction') - result = calculate_finished_fraction(task) - activity.finished_fraction = result['fraction'] - activity.total_size = result['total_size'] - activity.remaining = result['remaining'] activity.save() - # check if all tasks of this SAS_ID have a status that is considered 'processed' - # this is used as a trigger for the ancillary service - if task.status in processed_statusses: - current_is_processed = activity.is_processed - activity.is_processed = True - non_discarded_found = False - for t in Task.objects.filter(sas_id=task.sas_id): - if t.status not in processed_statusses: - activity.is_processed = False - break + if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value): + + # check if there is already a storage_location, if not, add it. + if not activity.storage_location: + # for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task + activity.create_storage_location() + + # check if the activity is ready to collect H5 data + if (not activity.is_aggregated and + activity.status != State.COLLECTING_DATA.value and + activity.status != State.AGGREGATE.value): - # at least one of the tasks should NOT be in discarded, - # otherwise a fully discarded SAS_ID will also register as 'is_processed' and ready to 'AGGREGATE' - if t.status != State.DISCARDED.value: - non_discarded_found = True + # create a new 'aggregate_task' that is used to collect the aggregated output + # this has to be done only once, so this is a good place to do it. + aggregation_tasks = Task.objects.filter(sas_id=task.sas_id,task_type='aggregation') + if aggregation_tasks.count()==0: + aggregation_task = create_aggregation_task(task) + else: + aggregation_task = aggregation_tasks[0] - # only save when changed - if activity.is_processed != current_is_processed: - # if the whole activity has become processed, then set the status of this activity to 'AGGREGATE' - # unless it was already aggregated - if (activity.is_processed & non_discarded_found): - if not (activity.is_aggregated): - activity.status = State.AGGREGATE.value + # this means that its tasks know that they should copy their H5 files to the storage_location + # (but the tasks cannot do that, the aggregator service does) + activity.status = State.COLLECTING_DATA.value activity.save() + + +def update_is_verified(task): + """ + The Validation page shows Activities (SAS_ID's) that are ready to be validated, by giving them a quality. + All tasks belonging to the SAS_ID must be 'verified' for that. 
+ If that is the case, the activity itself gets the 'is_verified' flag. + + This is the function that checks the verified state and updates the activity.is_verified flag. + """ + + logger.info(f'- update_verified') + activity = task.activity # check if all tasks of this SAS_ID have a status that is considered 'verified' # this is used for the Validation Page current_is_verified = activity.is_verified activity.is_verified = True for t in Task.objects.filter(sas_id=task.sas_id): - if t.status not in verified_statusses: + if t.status not in VERIFIED_STATUSSES: activity.is_verified = False break @@ -161,6 +252,13 @@ def update_activity(task): if activity.is_verified != current_is_verified: activity.save() +def update_changed_fields(task): + """ + It shouldn't happen, but technically it is possible that users change the filter + """ + logger.info(f'- update_changed_fields') + activity = task.activity + if activity.filter != task.filter: activity.filter = task.filter activity.save() @@ -171,4 +269,49 @@ def update_activity(task): activity.save() except: # continue, workflow is not initially needed - logger.error(f'no workflow found for this task') \ No newline at end of file + logger.error(f'no workflow found for this task') + +def update_activity(task): + """ + The activity (SAS_ID level) is updated when a task changes status. + Depending on the type of status change, certain calculations and updates are performed. + Doing this on status change, instead of on-the-fly when a user enters a page, balances the load + and improves overall performance + + - to 'ARCHIVING, ARCHIVED, FINISHED' : check for incoming/existing 'archive' json from archiver + - to SCRUBBED .. FINISHED : calculate ingested_fraction + - to _FAILED : calculate finished_fraction + - to (processed_statusses) : check if all tasks are processed, set Activity to is_processed and AGGREGATE + - always : determine if a task is in a 'verified' status + + """ + logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}') + + # depending on the status transition, perform the appropriate action + + # calculate the fraction and list of statusses of ingested tasks of this SAS_ID + if task.status in INGEST_FRACTION_STATUSSES: + update_ingest_fractions(task) + + # check of any task of this activity already has LTA information. If so, copy to the activity level + if task.status in UPDATE_ARCHIVE_STATUSSES: + update_archive_info(task) + + if task.status in ACTIVITY_RESET_STATUSSEN: + reset_activity(task) + + # calculate the finished fraction for failed tasks + if State.FAILED.value in task.status: + update_finished_fraction(task) + + + # check if all tasks of this SAS_ID have a status that is considered 'processed' + # this is used as a trigger for the ancillary service + if task.status in PROCESSED_STATUSSES: + update_processed_and_aggregate(task) + + # check if all tasks for this activity are in a verified status. 
+ update_is_verified(task) + + # check if users have changed certain fields after the specification step + update_changed_fields(task) diff --git a/atdb/taskdatabase/services/algorithms.py b/atdb/taskdatabase/services/algorithms.py index b95e5429371ee5b3b1ddad7458997e5ac1a0ce3c..6f93f982072752c3e4ac72704a7ced3ebc78b4d4 100644 --- a/atdb/taskdatabase/services/algorithms.py +++ b/atdb/taskdatabase/services/algorithms.py @@ -9,7 +9,7 @@ import base64 from datetime import datetime from django.db.models import Q, Sum import logging -from .common import timeit, get_summary_flavour, SummaryFlavour +from .common import get_summary_flavour, SummaryFlavour from django.urls import reverse from ..models import Task, LogEntry, Workflow, Configuration from django.conf import settings @@ -22,7 +22,6 @@ DJANGO_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" logger = logging.getLogger(__name__) -@timeit def get_size(status_list, type): """ aggregate the sizes of all task with a status in the list @@ -46,7 +45,6 @@ def get_size(status_list, type): return sum_value -@timeit def get_min_start_and_max_end_time(sas_id): """ Retrieve the minimum start time en maximum end time of a set of taskids (sas_id) diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py index a55b3347e896c9d9659b46076e0dd58e2522149a..e3fb9ecd4b9cbfeed41626ef67d6f02c7a4b88a2 100644 --- a/atdb/taskdatabase/services/common.py +++ b/atdb/taskdatabase/services/common.py @@ -28,11 +28,20 @@ class State(Enum): SUSPENDED = "suspended" DISCARDED = "discarded" FAILED = "failed" + COLLECTING_DATA = "collecting_data" + IDLE = "idle" -verified_statusses = [State.STORED.value, State.VALIDATED.value, State.SCRUBBED.value, State.PRE_ARCHIVED.value, +VERIFIED_STATUSSES = [State.STORED.value, State.VALIDATED.value, State.SCRUBBED.value, State.PRE_ARCHIVED.value, State.ARCHIVED.value, State.FINISHED.value, State.SUSPENDED.value, State.DISCARDED.value] -processed_statusses = [State.PROCESSED.value, State.AGGREGATING.value, State.AGGREGATED.value, State.STORED.value, State.DISCARDED.value] +PROCESSED_STATUSSES = [State.PROCESSED.value, State.AGGREGATE.value, State.AGGREGATING.value, State.AGGREGATED.value, State.STORED.value, + State.DISCARDED.value] + +INGEST_FRACTION_STATUSSES = [State.SCRUBBED.value, State.PRE_ARCHIVING.value, State.PRE_ARCHIVED.value, + State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value, State.FINISHED.value] + +UPDATE_ARCHIVE_STATUSSES = [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHED.value] +ACTIVITY_RESET_STATUSSEN = [State.DEFINED.value, State.FETCHED.value] class SummaryFlavour(Enum): DEFAULT = "default" @@ -40,20 +49,11 @@ class SummaryFlavour(Enum): LINC_TARGET = "linc_target" IMAGING_COMPRESSION = "imaging_compression" -# this is a decorator that can be put in front (around) a function all to measure its execution time -def timeit(method): - def timed(*args, **kw): - ts = time.time() - result = method(*args, **kw) - te = time.time() - if 'log_time' in kw: - name = kw.get('log_name', method.__name__.upper()) - kw['log_time'][name] = int((te - ts) * 1000) - else: - print('execution time: %r %2.2f ms' % \ - (method.__name__, (te - ts) * 1000)) - return result - return timed +class AggregationStrategy(Enum): + NONE = "none" + WAIT_FOR_SUMMARY_TASK = "wait_for_summary_task" + COLLECT_H5 = "collect_h5" + def get_summary_flavour(task): """ diff --git a/atdb/taskdatabase/static/taskdatabase/style.css b/atdb/taskdatabase/static/taskdatabase/style.css index 
5e4bce6b5f3f230602792cbd6eddbac6d8018acd..55982d9b4d32155c7999546788b8ec51f7df2f71 100644 --- a/atdb/taskdatabase/static/taskdatabase/style.css +++ b/atdb/taskdatabase/static/taskdatabase/style.css @@ -8,7 +8,7 @@ TD { color: green; } -.defined,.staged,.fetched,.processed,.stored,.validated,.scrubbed,.archived,.finished,.aggregated { +.defined,.staged,.fetched,.processed,.stored,.validated,.scrubbed,.archived,.finished,.aggregate,.aggregated { background-color: lemonchiffon; color: blue; } @@ -27,11 +27,6 @@ TD { background-color: lightgreen; } -.aggregate { - font-weight: bold; - background-color: lightgreen; -} - .aggregate_failed,.aggregating_failed { color: red; font-weight: bold; diff --git a/atdb/taskdatabase/templates/taskdatabase/index.html b/atdb/taskdatabase/templates/taskdatabase/index.html index e5941648725060db6ee8041b9dd4990a098f0db4..0e67068a191fb73ea3e9dc61c3cf0e2d2689fd2c 100644 --- a/atdb/taskdatabase/templates/taskdatabase/index.html +++ b/atdb/taskdatabase/templates/taskdatabase/index.html @@ -31,7 +31,7 @@ {% include 'taskdatabase/pagination.html' %} </div> </div> - <p class="footer"> Version 16 Jul 2024 + <p class="footer"> Version 8 Aug 2024 </div> {% include 'taskdatabase/refresh.html' %} diff --git a/atdb/taskdatabase/templates/taskdatabase/tasks.html b/atdb/taskdatabase/templates/taskdatabase/tasks.html index ecf378640b5913ef23eeb895a6d0add7f6ab2297..3bc7d44829cc51c670ae40d3fe6a756c1853a0d2 100644 --- a/atdb/taskdatabase/templates/taskdatabase/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/tasks.html @@ -1,6 +1,6 @@ {% load static %} {% for task in my_tasks %} - {% if task.status != "discarded" %} + {% if task.status != "discarded" and task.task_type == 'regular' %} <div class="row"> <tr class="{{ task.status }}"> @@ -17,7 +17,7 @@ {% if task.has_quality %} - {% if task.is_summary %} + {% if task.is_summary or task.task_type == 'aggregation' %} <a class="open-modal btn btn-secondary btn-sm" href="{% url 'task-quality' task.id my_tasks.number %}" data-popup-url="{% url 'task-quality' task.id my_tasks.number %}" diff --git a/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html b/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html index 85b5007cef9df74b1976f5b9bec1980148e9b2b5..856161cad8212d406104187221d06c39e6d720db 100644 --- a/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html +++ b/atdb/taskdatabase/templates/taskdatabase/validation/tasks.html @@ -44,7 +44,7 @@ {% endif %} {% else %} - <td>-</td><td>-</td><td>-</td><td>-</td><td>-</td> + <td>-</td><td>-</td> {% endif %} diff --git a/atdb/taskdatabase/tests/test_models_processed_summary.py b/atdb/taskdatabase/tests/test_models_processed_summary.py index 81f6abe6309cec2b60e4a614748f393c9310672b..9774545cfeef1145d93bf5da1b64288a3626ce87 100644 --- a/atdb/taskdatabase/tests/test_models_processed_summary.py +++ b/atdb/taskdatabase/tests/test_models_processed_summary.py @@ -1,6 +1,6 @@ from django.test import TestCase import json -from taskdatabase.models import Task, Workflow, Activity +from taskdatabase.models import Configuration, Task, Workflow, Activity from taskdatabase.services.common import State class TestProcessedSummary(TestCase): @@ -9,29 +9,65 @@ class TestProcessedSummary(TestCase): """ initialize test data """ - self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/") + + self.workflow_requantisation = Workflow(id=22, 
workflow_uri="psrfits_requantisation",
+                                                       aggregation_strategy="wait_for_summary_task")
         self.workflow_requantisation.save()
 
+        self.workflow_imaging_compression = Workflow(id=14, workflow_uri="imaging_compression",
+                                                     aggregation_strategy="collect_h5")
+        self.workflow_imaging_compression.save()
+
         self.task1 = Task.objects.create(sas_id=222,
                                          filter="test_blabla",
                                          new_status=State.PROCESSED.value,
                                          workflow=self.workflow_requantisation,
                                          is_summary=False)
         self.task1.save()
+
+        # this is a summary task (old style)
         self.task2 = Task.objects.create(sas_id=222,
                                          new_status=State.PROCESSED.value,
                                          workflow=self.workflow_requantisation,
                                          outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]})
         self.task2.save()
 
+        # this is a summary task (new style, using is_summary)
+        self.task3 = Task.objects.create(sas_id=222,
+                                         new_status=State.PROCESSED.value,
+                                         workflow=self.workflow_requantisation,
+                                         outputs={
+                                             "summary": [{"is_summary": True}],
+                                             "quality": {
+                                                 "summary" : {"L441006_summaryCS.tar" : {"is_summary" : True} },
+                                                 "plots" : [{"basename": "L441006_CS_quick_summary.pdf"}]
+                                             }
+                                         })
+
+        self.task3.save()
+
+        # this is a summary task, but it uses a workflow with an aggregation_strategy that should not hold the task
+        self.task4 = Task.objects.create(sas_id=333,
+                                         new_status=State.PROCESSED.value,
+                                         workflow=self.workflow_imaging_compression,
+                                         outputs={"summary": [{"is_summary": True}]})
+        self.task4.save()
+
     def test_processed_not_on_hold(self):
         """
         task 1 is processed, but not a summary dataproduct. Should NOT go on hold
         """
         actual = self.task1.resume
-        self.assertEqual(actual, True)
+        self.assertTrue(actual)
 
+    def test_processed_not_on_hold_for_different_strategy(self):
+        """
+        this is a summary task, but it uses a workflow with an aggregation_strategy that should not hold the task
+        """
+        actual = self.task4.resume
+        self.assertTrue(actual)
     def test_processed_on_hold(self):
         """
@@ -39,8 +75,7 @@
         """
         actual = self.task2.resume
 
-        # this test fails, because "self.resume = False" is still commented out in models.py L249
-        self.assertEqual(actual, False)
+        self.assertFalse(actual)
 
 
     def test_activity_is_processed(self):
@@ -49,4 +84,46 @@
         """
 
         actual = self.task1.activity.is_processed
-        self.assertEqual(actual, True)
\ No newline at end of file
+        self.assertTrue(actual)
+
+    def test_has_summary_substring(self):
+        """
+        task 2 only has the old summary filename test. Check if the task indeed gets seen as a summary_task
+        """
+        actual = self.task2.is_summary
+        self.assertTrue(actual)
+
+    def test_is_summary(self):
+        """
+        task 3 only has the new 'is_summary' test. Check if the task indeed gets seen as a summary_task
+        """
+        actual = self.task3.is_summary
+        self.assertTrue(actual)
+
+    def test_has_quality_summary(self):
+        """
+        task 3 has both the quality.summary field and summary field filled
+        It is the quality.summary field that is used as the real source of truth.
+ """ + actual = self.task3.has_summary + self.assertTrue(actual) + + def test_has_plots(self): + """ + task 3 has quality.plots field to test the has_plots function + """ + actual = self.task3.has_plots + self.assertTrue(actual) + + def test_has_no_plots(self): + """ + task 4 has no quality.plots field to test the has_plots function + """ + actual = self.task4.has_plots + self.assertFalse(actual) + def test_predecessor_status(self): + """ + test prececessor_status + """ + actual = self.task3.predecessor_status + self.assertEqual(actual, "no_predecessor") \ No newline at end of file diff --git a/atdb/taskdatabase/tests/test_summary_tasks.py b/atdb/taskdatabase/tests/test_summary_tasks.py index e607fd0cdb27f62b130014c53e8e245e2e5eb47e..d4d568dfea5cd5aef854f2cef47dd010785945a2 100644 --- a/atdb/taskdatabase/tests/test_summary_tasks.py +++ b/atdb/taskdatabase/tests/test_summary_tasks.py @@ -7,15 +7,15 @@ class TestSummaryTasks(TestCase): """ initialize test data """ - self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation", + aggregation_strategy="wait_for_summary_task") self.workflow_requantisation.save() self.no_summary_task_77777 = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation, outputs={"tar_archive": [{"size": 4885985280, "basename": "L621240_SAP002_B073_P000_bf.tar", "nameroot": "L621240_SAP002_B073_P000_bf"}]}) - self.summary_task_defined_77777 = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation, - outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) + self.summary_task_defined_77777 = Task.objects.create(sas_id=77777, new_status=State.DEFINED.value, workflow=self.workflow_requantisation) self.summary_task_validated_77777 = Task.objects.create(sas_id=77777, new_status=State.VALIDATED.value, workflow=self.workflow_requantisation, - outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) + outputs={"summary": [{"is_summary": True}]}) self.summary_task_processed_77777 = Task.objects.create(sas_id=77777, new_status=State.PROCESSED.value, workflow=self.workflow_requantisation, outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) diff --git a/atdb/taskdatabase/tests/test_update_activity.py b/atdb/taskdatabase/tests/test_update_activity.py index 7197ee95e807d01d58d0f53841d1ff1a01e7a51b..364f1036649393295b067a4b30d0a95a01a58d1d 100644 --- a/atdb/taskdatabase/tests/test_update_activity.py +++ b/atdb/taskdatabase/tests/test_update_activity.py @@ -1,7 +1,7 @@ from django.test import TestCase import json -from taskdatabase.models import Task, Workflow, Activity -from taskdatabase.services.common import State +from taskdatabase.models import Configuration, Task, Workflow, Activity +from taskdatabase.services.common import State, AggregationStrategy class TestUpdateActivity(TestCase): @@ -9,9 +9,16 @@ class TestUpdateActivity(TestCase): """ initialize test data """ + # used to create the activity.storage_location + Configuration.objects.create(key="executor:workdir", value="/project/ldv/Share/run/") + self.workflow_requantisation = Workflow(id=22, workflow_uri="psrfits_requantisation") self.workflow_requantisation.save() + self.workflow_imaging_compression = Workflow(id=28, 
workflow_uri="imaging_compression", + aggregation_strategy = AggregationStrategy.COLLECT_H5.value) + self.workflow_imaging_compression.save() + self.task1 = Task.objects.create(sas_id=12345, filter="test_blabla", status='stored', @@ -83,6 +90,13 @@ class TestUpdateActivity(TestCase): outputs={"tar_archive": [{"size": 4885985280, "basename": "L185619_summaryCS.tar", "nameroot": "L185619_summaryCS"}]}) self.task10.save() + # to test imaging + self.task11 = Task.objects.create(sas_id=113, + new_status='fetched', + workflow=self.workflow_imaging_compression, + outputs={"inspect": {"location": "file:///project/ldv/Share/run/2023/3/26/331_30608/inspect.h5", "basename": "inspect.h5", "nameroot": "inspect"}}) + self.task11.save() + def test_created_activity(self): """ test if activity is created @@ -90,8 +104,7 @@ class TestUpdateActivity(TestCase): activity = self.task1.activity # test if an activity with the correct sas_id was created - actual = activity.sas_id - self.assertEqual(actual, 12345) + self.assertEqual(activity.sas_id, 12345) def test_scrubbed(self): @@ -100,11 +113,8 @@ class TestUpdateActivity(TestCase): """ activity = self.task2.activity - actual = activity.ingestq_status - self.assertEqual(actual, {'scrubbed': 1}) - - actual = activity.ingested_fraction - self.assertEqual(actual, 0) + self.assertEqual(activity.ingestq_status, {'scrubbed': 1}) + self.assertEqual(activity.ingested_fraction, 0) def test_archived(self): @@ -113,11 +123,8 @@ class TestUpdateActivity(TestCase): """ activity = self.task3.activity - actual = activity.ingestq_status - self.assertEqual(actual, {'scrubbed': 1, 'archived': 1}) - - actual = activity.archive['sas_id_archived'] - self.assertEqual(actual, "1219995") + self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'archived': 1}) + self.assertEqual(activity.archive['sas_id_archived'], "1219995") def test_finished(self): @@ -127,11 +134,8 @@ class TestUpdateActivity(TestCase): activity = self.task4.activity - actual = activity.ingestq_status - self.assertEqual(actual, {'scrubbed': 1, 'finished': 1, 'archived': 1}) - - actual = activity.archive['sas_id_archived'] - self.assertEqual(actual, "1219995") + self.assertEqual(activity.ingestq_status, {'scrubbed': 1, 'finished': 1, 'archived': 1}) + self.assertEqual(activity.archive['sas_id_archived'], "1219995") def test_failed(self): @@ -141,14 +145,9 @@ class TestUpdateActivity(TestCase): activity = self.task5.activity - actual = activity.finished_fraction - self.assertEqual(actual, 33) - - actual = activity.total_size - self.assertEqual(actual, 3000) - - actual = activity.remaining - self.assertEqual(actual, 2000) + self.assertEqual(activity.finished_fraction, 33) + self.assertEqual(activity.total_size, 3000) + self.assertEqual(activity.remaining, 2000) def test_filter_and_workflow(self): @@ -157,12 +156,8 @@ class TestUpdateActivity(TestCase): """ activity = self.task1.activity - - actual = activity.filter - self.assertEqual(actual, "test_blabla") - - actual = activity.workflow_id - self.assertEqual(actual, 22) + self.assertEqual(activity.filter, "test_blabla") + self.assertEqual(activity.workflow_id, 22) def test_is_not_processed(self): """ @@ -171,9 +166,7 @@ class TestUpdateActivity(TestCase): """ activity = self.task9.activity - - actual = activity.is_processed - self.assertEqual(actual, False) + self.assertFalse(activity.is_processed) def test_is_processed(self): @@ -181,11 +174,46 @@ class TestUpdateActivity(TestCase): task 6, 7 and 8 are processed, activity.is_processed should be true and activity 
status should go to 'aggregate' """ - activity = self.task6.activity + self.assertTrue(activity.is_processed) + self.assertEqual(activity.status, State.AGGREGATE.value) + + def test_reset_activity(self): + """ + when a task is set to DEFINED or FETCHED, the Activity is reset + """ + activity = self.task1.activity + + # simulate post aggregation state + activity.is_aggregated = True + activity.is_processed = True + + # simulate task to FETCHED + self.task1.new_status = State.FETCHED.value + self.task1.save() + + # check if the activity is reset + self.assertFalse(activity.is_aggregated) + self.assertFalse(activity.is_processed) + + def test_create_storage_location(self): + """ + create activity.storage_location + + WHEN a task goes to processed, + and its workflow has the COLLECT_H5 aggregation strategy + and its activity does not have a 'storage_location' yet + """ + + activity = self.task11.activity + + # check initial state + self.assertEqual(activity.storage_location, None) - actual = activity.is_processed - self.assertEqual(actual, True) + # simulate task to PROCESSED + self.task11.new_status = State.PROCESSED.value + self.task11.save() - actual = activity.status - self.assertEqual(actual, State.AGGREGATE.value) \ No newline at end of file + expected = "/project/ldv/Share/aggregate/113" + actual = self.task11.activity.storage_location + self.assertEqual(actual, expected) diff --git a/atdb/taskdatabase/tests/test_views_postprocessing_page.py b/atdb/taskdatabase/tests/test_views_postprocessing_page.py new file mode 100644 index 0000000000000000000000000000000000000000..69b603d4f01c582792f66256228e3cc63a24b211 --- /dev/null +++ b/atdb/taskdatabase/tests/test_views_postprocessing_page.py @@ -0,0 +1,19 @@ +from django.test import TestCase +from django.urls import reverse + +from taskdatabase.models import Task, Workflow +class PostProcessingPageViewTest(TestCase): + + + def test_url_exists_at_desired_location(self): + response = self.client.get('/atdb/postprocessing-tasks') + self.assertEqual(response.status_code, 200) + + def test_url_accessible_by_name(self): + response = self.client.get(reverse('postprocessing-tasks')) + self.assertEqual(response.status_code, 200) + + def test_uses_correct_template(self): + response = self.client.get(reverse('postprocessing-tasks')) + self.assertEqual(response.status_code, 200) + self.assertTemplateUsed(response, 'taskdatabase/postprocessing.html') \ No newline at end of file diff --git a/atdb/taskdatabase/views.py b/atdb/taskdatabase/views.py index 29137bc94be6bae3a94cdc3172e93ddd46c2c01f..67b6bbb95dc31343e6e3bf35972d7fb843333e3c 100644 --- a/atdb/taskdatabase/views.py +++ b/atdb/taskdatabase/views.py @@ -73,6 +73,7 @@ class TaskFilter(filters.FilterSet): fields = { 'task_type': ['exact', 'icontains', 'in'], 'is_summary': ['exact'], + 'is_aggregated': ['exact'], 'creationTime': ['icontains'], 'filter': ['exact', 'icontains'], 'workflow__id': ['exact', 'icontains'],