diff --git a/atdb/taskdatabase/migrations/0049_task_is_aggregated.py b/atdb/taskdatabase/migrations/0049_task_is_aggregated.py new file mode 100644 index 0000000000000000000000000000000000000000..572666a0386aa2ae5ca38c79f3ee4fbefdb5c4a2 --- /dev/null +++ b/atdb/taskdatabase/migrations/0049_task_is_aggregated.py @@ -0,0 +1,18 @@ +# Generated by Django 5.0 on 2024-07-23 11:42 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('taskdatabase', '0048_alter_activity_storage_location'), + ] + + operations = [ + migrations.AddField( + model_name='task', + name='is_aggregated', + field=models.BooleanField(default=False), + ), + ] diff --git a/atdb/taskdatabase/models.py b/atdb/taskdatabase/models.py index 4875172ebd816cd1c53b9866a7db21e827e97bb3..6af6728dea29566905c7c5553d930907154615af 100644 --- a/atdb/taskdatabase/models.py +++ b/atdb/taskdatabase/models.py @@ -155,6 +155,10 @@ class Activity(models.Model): def __str__(self): return str(self.sas_id) + def create_storage_location(self): + workdir = Configuration.objects.get(key='executor:workdir').value + self.storage_location = workdir.replace('run', 'aggregate') + self.sas_id + def check_if_summary(task): """ @@ -196,6 +200,7 @@ class Task(models.Model): # Task control properties task_type = models.CharField(max_length=20, default="regular") is_summary = models.BooleanField(default=False) + is_aggregated = models.BooleanField(default=False) filter = models.CharField(max_length=30, blank=True, null=True) environment = models.CharField(max_length=255, blank=True, null=True) new_status = models.CharField(max_length=50, default="defining", null=True) @@ -236,9 +241,11 @@ class Task(models.Model): return str(self.id) + ' - (' + self.task_type + ') - ' + str(self.sas_id) def handle_aggregation(self): - """ depending on the aggregation_strategy for this task, different functionality is executed """ - try: + """ + depending on the aggregation_strategy for this task, different functionality is executed + """ + try: # for requantisation pipeline: HOLD summary tasks if (self.workflow.aggregation_strategy == AggregationStrategy.WAIT_FOR_SUMMARY_TASK.value): @@ -249,48 +256,22 @@ class Task(models.Model): # for image_compression_pipeline: ... - # test with WF-14, http://localhost:8000/atdb/task_details/30608/1 + if (self.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value): - # TODO: move this to update_activity later? (for now keep everything for this aggregation_stragegy together - if self.activity.storage_location == "unknown": - # file:///project/ldv/Share/aggregate/658584 - # executor:workdir - workdir = Configuration.objects.get(key='executor:workdir').value - self.activity.storage_location = workdir.replace('run','aggregate') + self.sas_id - self.activity.save() - - - """ - # what logic should we put here? perhaps this?: - # (A) check if the activity is ready - if (not activity.is_aggregated and activity.status!='aggregate'): - (1) fill activity.storage_location with a spider working location - (2) set activity.status to 'ready-to-aggregate' - - # (B) create a new aggregation task? with status 'aggregate' and task_type='aggregation'? - - to have a task to collect the aggregated output in later. - - # (C) handle the current task - (1) set the task.status to aggregate (so that the aggregator can pick it up - (2) aggregator: - - check if activity.storage_location exists on spider, if not, create it. - - copy the H5 file to the 'activity.storage_location' - - make an entry for the H5 in the inputs.txt file - - (3) aggregator: set task to ... what... - - 'processed' would be nice, but how do I prevent it being picked up again? - - aggregated? But then the datamanager should trigger on 'aggregated' also. - - # (D) do the real aggregation - - all H5 files are stored in activity.storage_location - - activity.status was set to 'aggregate' because all tasks are 'processed' - aggregator: - (1) pull the script and make it executable - (2) run it - (3) fill the aggregator task with inputs (based on inputs.txt) - and the outputs (based on outputs.txt) - """ + if not self.is_aggregated: + # set the task to AGGREGATE, + # to prevent the datamanager from picking it up, + # and to trigger aggregator service to pick it up, + # to copy its H5 files to the activity.storage_location on spider + + # the aggregator will then returns the task to PROCESSED with 'is_aggregated = true', + # so that it won't be picked up again. + + # TODO: only activate when the aggregator service actually picks this up + #self.new_status = State.AGGREGATE.value + pass + except Exception as error: # this should never happen diff --git a/atdb/taskdatabase/services/activities_handler.py b/atdb/taskdatabase/services/activities_handler.py index 6ffd497947121e7e25ff455735f384c78bb08184..790f3e53eb16c39f7840b1353c74007de1c39ddb 100644 --- a/atdb/taskdatabase/services/activities_handler.py +++ b/atdb/taskdatabase/services/activities_handler.py @@ -1,8 +1,8 @@ import logging; from django.conf import settings -from .common import State, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \ +from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \ UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN -from taskdatabase.models import Task, Activity +from taskdatabase.models import Task, Activity, Configuration status_list = settings.STATUSSES_WITH_DATA @@ -46,6 +46,7 @@ def calculate_ingested_fraction(this_task): result['completion'] = completion return result + def calculate_finished_fraction(this_task): size_archived = 0 size_remaining = 0 @@ -71,6 +72,7 @@ def calculate_finished_fraction(this_task): return result + def update_ingest_fractions(task): """ The IngestQ page shows the fraction of completed/archived tasks per SAS_ID @@ -84,6 +86,7 @@ def update_ingest_fractions(task): activity.ingestq_status = result['status'] activity.save() + def update_archive_info(task): """ The Finished page shows some information about the archiving/ingest results, @@ -105,6 +108,7 @@ def update_archive_info(task): activity.save() + def update_finished_fraction(task): """ The Failures page shows the failed tasks, @@ -121,6 +125,7 @@ def update_finished_fraction(task): activity.remaining = result['remaining'] activity.save() + def reset_activity(task): """ When a task is recycled back to DEFINED or FETCHED then the activity as a whole is no longer 'processed' or 'aggregated'. @@ -147,6 +152,13 @@ def update_processed_and_aggregate(task): logger.info(f'- update_processed') activity = task.activity + # this complicated looking piece of code checks if ALL tasks of the activity are processed, + # if yes... set activity.status = AGGREGATE + + # this needs to be done for all aggregation_strategies, + # because it signals the moment where all the input data for aggregation is (or has been) available. + # For some strategies something extra needs to be done... + current_is_processed = activity.is_processed activity.is_processed = True non_discarded_found = False @@ -170,6 +182,24 @@ def update_processed_and_aggregate(task): activity.save() + + if (task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value): + + # check if the activity is ready to collect H5 data + if (not activity.is_aggregated and activity.status != 'aggregate'): + + # check if there is already a storage_location, if not, add it. + if activity.storage_location == "unknown": + activity.create_storage_location() + + # this means that its tasks know that they should copy their H5 files to the storage_location + # (but the tasks cannot do that, the aggregator service does) + activity.status = State.COLLECT_DATA.value + + activity.save() + + + def update_is_verified(task): """ The Validation page shows Activities (SAS_ID's) that are ready to be validated, by giving them a quality. diff --git a/atdb/taskdatabase/services/common.py b/atdb/taskdatabase/services/common.py index a27c173feeb9b21fe2ff1f725fdd3aaad7a0ab78..c4da474873448c9a9399ee1c414033b602b9c1ee 100644 --- a/atdb/taskdatabase/services/common.py +++ b/atdb/taskdatabase/services/common.py @@ -28,6 +28,7 @@ class State(Enum): SUSPENDED = "suspended" DISCARDED = "discarded" FAILED = "failed" + COLLECT_DATA = "collect_data" VERIFIED_STATUSSES = [State.STORED.value, State.VALIDATED.value, State.SCRUBBED.value, State.PRE_ARCHIVED.value, State.ARCHIVED.value, State.FINISHED.value, State.SUSPENDED.value, State.DISCARDED.value]