activities_handler.py
    import logging
    from django.conf import settings
    from .common import State, AggregationStrategy, VERIFIED_STATUSSES, PROCESSED_STATUSSES, INGEST_FRACTION_STATUSSES, \
        UPDATE_ARCHIVE_STATUSSES, ACTIVITY_RESET_STATUSSEN
    from taskdatabase.models import Task, Activity, Configuration
    
    status_list = settings.STATUSSES_WITH_DATA
    
    logger = logging.getLogger(__name__)
    
    def calculate_ingested_fraction(this_task):
        """
        This 'property' of a task returns the fraction of queued/ingested tasks per SAS_ID
        and a dict of statuses of the other tasks belonging to the same SAS_ID.
        It is implemented as a 'property' so that it can be used in HTML pages like this:
           <td>{{ task.sasid_ingested_fraction.status }}</td>
           <td>{{ task.sasid_ingested_fraction.completion }}%</td>

        A selection of statuses is considered 'queued', and another selection is considered 'ingested'.
        The ratio of those two is returned as a completion percentage.
        A dict of the non-zero statuses of the other tasks that belong to this SAS_ID is also returned.
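
        Illustrative example of the returned dict (hypothetical counts):
            {'status': {'archived': 3, 'scrubbed': 1}, 'completion': 75}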
    
        """
        result = {}
        statusses = {'scrubbed': 0, 'pre_archiving': 0, 'pre_archived': 0, 'archiving': 0, 'archived': 0,
                     'finishing': 0, 'finished': 0, 'suspended': 0, 'discarded': 0,
                     'pre_archived_failed': 0, 'archived_failed': 0, 'finished_failed': 0}
    
        tasks = Task.objects.filter(sas_id=this_task.sas_id)
    
        for task in tasks:
            try:
                statusses[task.status] += 1
            except KeyError:
                # statuses outside the dict above are not counted
                pass
    
        incomplete = statusses['scrubbed'] + statusses['pre_archiving'] + statusses['pre_archived'] + \
                     statusses['archiving'] + statusses['finishing'] + statusses['suspended'] + \
                     statusses['pre_archived_failed'] + statusses['archived_failed'] + statusses['finished_failed']
        complete = statusses['archived'] + statusses['finished']
        try:
            completion = round(complete / (incomplete + complete) * 100)
        except ZeroDivisionError:
            # can happen when all tasks of this SAS_ID are discarded, so there is nothing to complete
            completion = 0
    
        non_zero_statusses = {key: value for key, value in statusses.items() if value != 0}
    
        result['status'] = non_zero_statusses
        result['completion'] = completion
        return result
    
    
    def calculate_finished_fraction(this_task):
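        """
        Calculate which fraction of the data volume of this SAS_ID has reached the FINISHED state,
        and also return the total and remaining sizes (used for the Failures page).
        """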
        size_archived = 0
        size_remaining = 0
        total_size = 0

        tasks = Task.objects.filter(sas_id=this_task.sas_id)

        for task in tasks:
            if task.status == State.FINISHED.value:
                size_archived += task.size_to_process
            else:
                size_remaining += task.size_to_process
            total_size += task.size_to_process

        result = {}
        try:
            result['fraction'] = round((size_archived / (size_remaining + size_archived)) * 100)
        except ZeroDivisionError:
            # no sizes known (yet) for this SAS_ID
            result['fraction'] = -1

        result['total_size'] = total_size
        result['remaining'] = size_remaining

        return result
    
    
    def update_ingest_fractions(task):
        """
        The IngestQ page shows the fraction of completed/archived tasks per SAS_ID.
        This function recalculates that fraction when a relevant status transition occurs.
        """
        logger.info('- update_ingest_fractions')
        activity = task.activity
    
        result = calculate_ingested_fraction(task)
        activity.ingested_fraction = result['completion']
        activity.ingestq_status = result['status']
        activity.save()
    
    
    def update_archive_info(task):
        """
        The Finished page shows some information about the archiving/ingest results,
        like the new SAS_ID that the pipeline output received in the LTA.

        This function retrieves that information from the tasks and stores it in the Activity
        when a relevant status transition occurs.
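
        The task's 'archive' JSON is expected to contain at least a 'sas_id_archived' key;
        an illustrative (hypothetical) example: {'sas_id_archived': '123456'}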
        """
        logger.info('- update_archive_info')
        activity = task.activity
    
        for t in Task.objects.filter(sas_id=task.sas_id):
            try:
                if t.archive['sas_id_archived']:
                    activity.archive = t.archive
                    break
            except (TypeError, KeyError):
                # tasks without (or with incomplete) archive information are skipped
                pass
    
        activity.save()
    
    
    def update_finished_fraction(task):
        """
        The Failures page shows the failed tasks,
        but also the fraction of the tasks that were successfully ingested for the same SAS_ID (activity).

        This function recalculates that fraction when a relevant status transition occurs.
        """
        logger.info('- update_finished_fraction')
        activity = task.activity
    
        result = calculate_finished_fraction(task)
        activity.finished_fraction = result['fraction']
        activity.total_size = result['total_size']
        activity.remaining = result['remaining']
        activity.save()
    
    
    def reset_activity(task):
        """
        When a task is recycled back to DEFINED or FETCHED, the activity as a whole is no longer 'processed' or 'aggregated'.
        Reset those fields accordingly.
        """
        try:
            logger.info('- reset_activity')
            activity = task.activity
            activity.status = task.status
            activity.is_processed = False
            activity.is_aggregated = False
            activity.save()
    
        except AttributeError:
            # only bother with it if the task actually has an activity attached,
            # which is always... except in some simpler unittests
            pass
    
    def create_aggregation_task(task):
        """
        create a new aggregation task based on this task
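        The new task inherits the filter, project, sas_id, workflow and activity of the given
        task and starts in IDLE; the aggregator service uses it to collect the aggregated output.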
        """
        aggregation_task = Task(
            task_type='aggregation',
            filter=task.filter,
            project=task.project,
            sas_id=task.sas_id,
            workflow=task.workflow,
            status=State.IDLE.value,
            new_status=State.IDLE.value,
            activity=task.activity)
    
        aggregation_task.save()
        return aggregation_task
    
    def update_processed_and_aggregate(task):
        """
        Check if the whole SAS_ID (activity) is processed.
        Currently this is used to set the activity.aggregate status, which triggers the aggregator service.
        """
        logger.info('- update_processed_and_aggregate')
        activity = task.activity
    
        # this complicated-looking piece of code checks if ALL tasks of the activity are processed;
        # if so... set activity.status = AGGREGATE

        # this needs to be done for all aggregation_strategies,
        # because it signals the moment when all the input data for aggregation is (or has been) available.
        # For some strategies something extra needs to be done...
    
        current_is_processed = activity.is_processed
        activity.is_processed = True
        non_discarded_found = False
        for t in Task.objects.filter(sas_id=task.sas_id, task_type='regular'):
            if t.status not in PROCESSED_STATUSSES:
                activity.is_processed = False
                break
    
            # at least one of the tasks should NOT be in discarded,
            # otherwise a fully discarded SAS_ID will also register as 'is_processed' and ready to 'AGGREGATE'
            if t.status != State.DISCARDED.value:
                non_discarded_found = True
    
        # only save when changed
        if activity.is_processed != current_is_processed:
            # if the whole activity has become processed, then set the status of this activity to 'AGGREGATE'
            # unless it was already aggregated
            if activity.is_processed and non_discarded_found and not activity.is_aggregated:
                activity.status = State.AGGREGATE.value
    
            activity.save()
    
    
        if task.workflow.aggregation_strategy == AggregationStrategy.COLLECT_H5.value:
    
            # check if the activity is ready to collect H5 data
            if (not activity.is_aggregated and
                    activity.status != State.COLLECTING_DATA.value and
                    activity.status != State.AGGREGATE.value):
    
                # create a new 'aggregate_task' that is used to collect the aggregated output
                # this has to be done only once, so this is a good place to do it.
                aggregation_tasks = Task.objects.filter(sas_id=task.sas_id, task_type='aggregation')
                if aggregation_tasks.count() == 0:
                    aggregation_task = create_aggregation_task(task)
                else:
                    aggregation_task = aggregation_tasks[0]
    
                # check if there is already a storage_location, if not, add it.
                if not activity.storage_location:
                    # for this aggregation_strategy, the activity storage_location is the workdir of the aggregation task
                    activity.create_storage_location()
    
                # this means that its tasks know that they should copy their H5 files to the storage_location
                # (but the tasks cannot do that, the aggregator service does)
                activity.status = State.COLLECTING_DATA.value
    
                activity.save()
    
    
    
    def update_is_verified(task):
        """
        The Validation page shows Activities (SAS_ID's) that are ready to be validated, by giving them a quality.
        All tasks belonging to the SAS_ID must be 'verified' for that.
        If that is the case, the activity itself gets the 'is_verified' flag.
    
        This is the function that checks the verified state and updates the activity.is_verified flag.
        """
    
        logger.info('- update_is_verified')
        activity = task.activity
        # check if all tasks of this SAS_ID have a status that is considered 'verified'
        # this is used for the Validation Page
        current_is_verified = activity.is_verified
        activity.is_verified = True
        for t in Task.objects.filter(sas_id=task.sas_id):
            if t.status not in VERIFIED_STATUSSES:
                activity.is_verified = False
                break
    
        # only save when changed
        if activity.is_verified != current_is_verified:
            activity.save()
    
    def update_changed_fields(task):
        """
        It shouldn't normally happen, but users can technically still change the filter (or workflow)
        after specification; keep the activity in sync with its tasks.
        """
        logger.info('- update_changed_fields')
        activity = task.activity
    
        if activity.filter != task.filter:
            activity.filter = task.filter
            activity.save()
    
        try:
            if activity.workflow_id != task.workflow.id:
                activity.workflow_id = task.workflow.id
                activity.save()
        except AttributeError:
            # continue, a workflow is not needed initially
            logger.error(f'no workflow found for task {task.id}')
    
    def update_activity(task):
        """
        The activity (SAS_ID level) is updated when a task changes status.
        Depending on the type of status change, certain calculations and updates are performed.
        Doing this on status change, instead of on-the-fly when a user opens a page, balances the load
        and improves overall performance.
    
        - to 'ARCHIVING, ARCHIVED, FINISHED'         : check for incoming/existing 'archive' json from archiver
        - to SCRUBBED .. FINISHED                    : calculate ingested_fraction
        - to _FAILED                                 : calculate finished_fraction
        - to (processed_statusses)                   : check if all tasks are processed, set Activity to is_processed and AGGREGATE
        - always                                     : determine if a task is in a 'verified' status
    
        """
        logger.info(f'update_activity for task {task.id} with sas_id {task.sas_id} and status {task.status}')
    
        # depending on the status transition, perform the appropriate action
    
        # calculate the fraction and dict of statuses of ingested tasks of this SAS_ID
        if task.status in INGEST_FRACTION_STATUSSES:
            update_ingest_fractions(task)
    
        # check if any task of this activity already has LTA information; if so, copy it to the activity level
        if task.status in UPDATE_ARCHIVE_STATUSSES:
            update_archive_info(task)
    
        if task.status in ACTIVITY_RESET_STATUSSEN:
            reset_activity(task)
    
        # calculate the finished fraction for failed tasks
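        # (substring match, so this catches e.g. 'pre_archived_failed' and 'archived_failed')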
        if State.FAILED.value in task.status:
            update_finished_fraction(task)
    
    
        # check if all tasks of this SAS_ID have a status that is considered 'processed'
        # this is used as a trigger for the aggregator service
        if task.status in PROCESSED_STATUSSES:
            update_processed_and_aggregate(task)
    
        # check if all tasks for this activity are in a verified status.
        update_is_verified(task)
    
        # check if users have changed certain fields after the specification step
        update_changed_fields(task)
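

    # A minimal sketch (an assumption for illustration, not part of this module) of how
    # update_activity() could be wired to run on every task status change, e.g. via a
    # Django post_save signal; the actual hook in the taskdatabase app lives elsewhere:
    #
    #   from django.db.models.signals import post_save
    #   from django.dispatch import receiver
    #
    #   @receiver(post_save, sender=Task)
    #   def handle_task_saved(sender, instance, **kwargs):
    #       update_activity(instance)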