input_validation.py
    import logging
    import os.path
    
    logger = logging.getLogger(__name__)
    
    # copied from ldv_ingest module
    # https://git.astron.nl/astron-sdc/ldv-services/-/blob/master/atdb_services_pip/atdb_services/ldv_ingest.py?ref_type=heads#L1721
    def get_filename_without_hash(file_name):
        """
        Remove magic number (hash) between latest '_' and '.'
        The hash should be 8 characters long otherwise the original file_name will be returned
        e.g. LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar will result in
             LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar
        :param file_name:
        :return file_name_without_hash:
        """
        file_name_without_hash = file_name
        if "_" and "." in file_name:
            file_start_of_hash = file_name.rsplit("_", 1)[0]
            file_end_of_hash = file_name.rsplit(".", 1)[1]
            file_name_without_hash = f"{file_start_of_hash}.{file_end_of_hash}"
    
        # the removed part should be an 8-character hash plus the '_' separator (9 characters)
        if (len(file_name) - len(file_name_without_hash)) != 9:
            logger.warning(f"No valid hash found in '{file_name_without_hash}', using the original filename")
            file_name_without_hash = file_name
    
        # only return the filename without the path
        filename = os.path.basename(file_name_without_hash)
        return filename
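
    # Illustrative behaviour, following the docstring example above (not an exhaustive
    # test; the second case is inferred from the length check in the code):
    #   get_filename_without_hash("LOFAR_PULSAR_ARCHIVE_locus004_L231550_red_c122bb36.tar")
    #     -> "LOFAR_PULSAR_ARCHIVE_locus004_L231550_red.tar"
    #   get_filename_without_hash("plain_name.tar")  # stripped part would not be 9 chars
    #     -> "plain_name.tar"                        # original name is kept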
    
    def check_duplicates(data):
        """
        task.inputs can contain double entries (caused by faulty ingests during the 'mom era'). Remove them.
        The following is an example of an inputs section with a duplicate 'surl', the duplicate will be removed
    
        :return: found_duplicates is True when a duplicate was found
        :return:
        [
            {
                "size": 33638400,
                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
                "type": "File",
                "location": "srm.grid.sara.nl"
            },
            {
                "size": 33638400,
                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB001_uv.dppp.MS_74948c4c.tar",
                "type": "File",
                "location": "srm.grid.sara.nl"
            },
            {
                "size": 33638400,
                "surl": "srm://srm.grid.sara.nl:8443/pnfs/grid.sara.nl/data/lofar/ops/projects/lc5_020/432340/L432340_SB000_uv.dppp.MS_caf35c3d.tar",
                "type": "File",
                "location": "srm.grid.sara.nl"
            }]
    
        """
        logger.info('check_duplicates')

        if not data:
            # tasks without inputs should just return
            return False, None, None
    
        found_duplicates = False
        seen_surls = set()
        unique_inputs = []
        duplicates = []
    
        for item in data:
    
            filename_without_hash = get_filename_without_hash(item["surl"])
    
            if filename_without_hash not in seen_surls:
                unique_inputs.append(item)
                seen_surls.add(filename_without_hash)
            else:
                duplicates.append(filename_without_hash)
                found_duplicates = True
                logger.info(f'duplicate found: {filename_without_hash}')
    
        return found_duplicates, duplicates, unique_inputs
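
    # Sketch of typical usage (names are illustrative, mirroring validate_inputs below):
    #   found_duplicates, duplicates, unique_inputs = check_duplicates(task.inputs)
    #   if found_duplicates:
    #       logger.warning(f"duplicates removed: {duplicates}")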
    
    
    def recalculate_size(data):
        """
        Operators or ATDB could have removed files from the inputs json blob.
        If that has happened, the task.size_to_process needs to be recalculated
        """
        logger.info('recalculate_size')
        new_size = sum(e["size"] for e in data)
        return new_size
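
    # For example, recalculate_size([{"size": 10}, {"size": 20}]) returns 30; every
    # entry is assumed to carry a 'size' key, as in the docstring example above.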
    
    
    def validate_inputs(task):
        """
        check if the task.inputs is valid
        - currently only duplicates are checked, but this functionality can be expanded
        """
    
        # --- check for duplicate files in task.inputs ---
        found_duplicates, duplicates, unique_inputs = check_duplicates(task.inputs)
    
        # if a duplicate was found, remove it from the task's inputs and set a failed state
        if found_duplicates:

            # remove the duplicate entries, so the recalculated size below only
            # covers the unique inputs
            task.inputs = unique_inputs

            # set a failed state
            task.new_status = task.new_status + "_failed"

            # add some information for the user
            # (abuse the unused 'task.environment' field as a 'cache' to transport this
            #  to a 'signal', avoiding circular imports)
            task.environment = f"inputs_duplicates::{duplicates}"
    
    
        # --- recalculate sizes ---
        size_to_process = recalculate_size(task.inputs)
        task.size_to_process = size_to_process
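

    if __name__ == "__main__":
        # Minimal smoke test, a sketch only: the real 'task' is an ATDB object with
        # 'inputs', 'new_status', 'environment' and 'size_to_process' attributes;
        # here a SimpleNamespace stands in for it and the surls are made up.
        from types import SimpleNamespace

        logging.basicConfig(level=logging.INFO)

        task = SimpleNamespace(
            new_status="defined",
            environment="",
            size_to_process=0,
            inputs=[
                {"size": 100, "surl": "srm://host/path/L1_SB000_uv.dppp.MS_caf35c3d.tar",
                 "type": "File", "location": "srm.grid.sara.nl"},
                {"size": 100, "surl": "srm://host/path/L1_SB000_uv.dppp.MS_deadbeef.tar",
                 "type": "File", "location": "srm.grid.sara.nl"},
            ],
        )
        validate_inputs(task)
        print(task.new_status, task.size_to_process)  # expected: defined_failed 100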