Select Git revision
-
Corné Lukken authoredCorné Lukken authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
common.py 2.90 KiB
"""
common helper functions
"""
import logging;
from datetime import *
import time
from enum import Enum
logger = logging.getLogger(__name__)
class State(Enum):
UNKNOWN = "unknown"
DEFINING = "defining"
DEFINED = "defined"
STAGED = "staged"
FETCHED = "fetched"
PROCESSED = "processed"
AGGREGATE = "aggregate"
AGGREGATING = "aggregating"
AGGREGATED = "aggregated"
STORING = 'storing'
STORED = 'stored'
VALIDATED = "validated"
SCRUBBED = "scrubbed"
PRE_ARCHIVING = "pre_archiving"
PRE_ARCHIVED = "pre_archived"
ARCHIVING = "archiving"
ARCHIVED = "archived"
FINISHED = "finished"
FINISHING = "finishing"
SUSPENDED = "suspended"
DISCARD = "discard"
DISCARDED = "discarded"
FAILED = "failed"
COLLECTING_DATA = "collecting_data"
IDLE = "idle"
VERIFIED_STATUSSES = [State.STORED.value, State.VALIDATED.value, State.SCRUBBED.value, State.PRE_ARCHIVED.value,
State.ARCHIVED.value, State.FINISHED.value, State.SUSPENDED.value, State.DISCARDED.value]
PROCESSED_STATUSSES = [State.PROCESSED.value, State.AGGREGATE.value, State.AGGREGATING.value, State.AGGREGATED.value, State.STORED.value, State.STORING.value,
State.DISCARDED.value]
INGEST_FRACTION_STATUSSES = [State.SCRUBBED.value, State.PRE_ARCHIVING.value, State.PRE_ARCHIVED.value,
State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHING.value, State.FINISHED.value]
UPDATE_ARCHIVE_STATUSSES = [State.ARCHIVING.value, State.ARCHIVED.value, State.FINISHED.value]
ACTIVITY_RESET_STATUSSEN = [State.DEFINED.value, State.FETCHED.value]
class SummaryFlavour(Enum):
DEFAULT = "default"
LINC_CALIBRATOR = "linc_calibrator"
LINC_TARGET = "linc_target"
IMAGING_COMPRESSION = "imaging_compression"
class AggregationStrategy(Enum):
NONE = "none"
WAIT_FOR_SUMMARY_TASK = "wait_for_summary_task"
COLLECT_H5 = "collect_h5"
def get_summary_flavour(task):
"""
not every workflow has the same summary structure
determine the flavour based on the selected task, and construct the html accordingly
this could be made implicit in the future by adding a setting to the Workflow,
but currently it is derived in a yucky way. But at least the yuck is confined to this function
"""
summary_flavour = SummaryFlavour.DEFAULT.value
# so... yikes... what distinguishes the summary flavours?
workflow_uri = task.workflow.workflow_uri
# for linc, look at the workflow_uri
if "linc_calibrator" in workflow_uri:
return SummaryFlavour.LINC_CALIBRATOR.value
if "linc_target" in workflow_uri:
return SummaryFlavour.LINC_TARGET.value
try:
summary = task.quality_json["summary"]
except:
# no summary found
return None
if 'details' in summary.keys():
summary_flavour = SummaryFlavour.IMAGING_COMPRESSION.value
return summary_flavour