Skip to content
Snippets Groups Projects
Select Git revision
  • 816299ee3d47dc246e888797d6c23cce07daf962
  • master default protected
  • control-single-hba-and-lba
  • stabilise-landing-page
  • all-stations-lofar2
  • L2SS-2357-fix-ruff
  • v0.39.7-backports
  • Move-sdptr-to-v1.5.0
  • fix-build-ubuntu
  • tokens-in-env-files
  • fix-build
  • L2SS-2214-deploy-cdb
  • fix-missing-init
  • add-power-hardware-apply
  • L2SS-2129-Add-Subrack-Routine
  • Also-listen-internal-to-rpc
  • fix-build-dind
  • L2SS-2153--Improve-Error-Handling
  • L2SS-2153-Add-Grpc-Gateway-support
  • L2SS-1970-apsct-lol
  • DNM-pytango10.0.1rc1-test
  • remove-snmp-client
  • v0.52.3 protected
  • v0.52.3dev0 protected
  • 0.53.1dev0
  • v0.52.2-rc3 protected
  • v0.52.2-rc2 protected
  • v0.52.2-rc1 protected
  • v0.52.1.1 protected
  • v0.52.1 protected
  • v0.52.1-rc1 protected
  • v0.51.9-6 protected
  • v0.51.9-5 protected
  • v0.51.9-4 protected
  • v0.51.9-3 protected
  • v0.51.9-2 protected
  • v0.51.9-1 protected
  • v0.51.9 protected
  • v0.51.8 protected
  • v0.39.15-wsrttwo protected
  • v0.39.15-wsrt protected
41 results

01-devices.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    prune.py 2.65 KiB
    """
    Prune command module
    """
    import logging
    from typing import Union, List
    
    import gfal2
    
    from atdb.communication import APIConnector
    
    # Logger for the prune command. It uses the fixed name "prune" (not
    # __name__), so logging configuration must target that name.
    logger = logging.getLogger("prune")
    
    
    def extract_surls_from_obj(
        obj: object, partial: Union[List[str], None] = None
    ) -> List[str]:
        """
        Recursively collect every ``surl`` value found in a nested object.

        A dict that has a ``"surl"`` key contributes that value and is not
        descended into any further. Any other dict has its values searched.
        Lists and tuples have their items searched. All other values (strings,
        numbers, ``None``, ...) are ignored.

        :param obj: the (possibly nested) structure to search
        :param partial: accumulator to append the found values to; a fresh
            list is created when omitted. The same list object is returned.
        :return: the accumulator with the surl values appended in traversal
            order (the values are expected to be SURL strings)
        """
        if partial is None:
            partial = []

        if isinstance(obj, dict):
            if "surl" in obj:
                # The membership test guards the lookup, so no KeyError can
                # occur here; stop at the first level that carries a surl.
                partial.append(obj["surl"])
            else:
                for value in obj.values():
                    extract_surls_from_obj(value, partial=partial)
        elif isinstance(obj, (list, tuple)):
            for value in obj:
                extract_surls_from_obj(value, partial=partial)
        return partial
    
    
    def extract_task_surls_from_field(
        item: dict, field_name: str
    ) -> List[str]:
        """
        Collect the surl values contained anywhere inside one field of a task.

        :param item: task record, such as those yielded by the ATDB ``tasks``
            listing in ``prune``
        :param field_name: key of the field to search (``prune`` uses
            ``"outputs"``)
        :return: a new list with the surl values found in ``item[field_name]``;
            empty when the field holds no surls (including a ``None`` value)
        :raises KeyError: if ``item`` has no ``field_name`` key
        """
        return extract_surls_from_obj(item[field_name])
    
    
    def remove_surl_locations(surls: List[str], dry_run=False) -> None:
        """
        Delete every SURL in ``surls`` through gfal2, or only inspect them on a dry run.

        On a dry run each SURL is stat-ed and its stats are logged at INFO
        level, but nothing is deleted. Otherwise each SURL is unlinked. Its
        stats are fetched and logged only when DEBUG logging is enabled, so
        normal runs avoid an extra (potentially remote) ``stat`` per file.

        No exception is caught here. The first error raised by gfal2 propagates
        to the caller, and the SURLs after the failing one are left untouched.

        :param surls: storage URLs to remove
        :param dry_run: when true, log what would be removed without deleting
        """
        context = gfal2.creat_context()

        for surl in surls:
            if dry_run:
                logger.info("[dry-run] removing surl %s", surl)
                logger.info("file stats are: %s", context.stat(surl))
                continue

            logger.debug("removing surl %s", surl)
            # The log argument would be evaluated eagerly even with DEBUG off,
            # so only pay for the stat call when it will actually be logged.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug("file stats are: %s", context.stat(surl))
            context.unlink(surl)
    
    
    def prune(args):
        """
        Prune command entry point.

        Selects ATDB tasks, optionally filtered by ``args.workflow_id`` and
        ``args.status``. Each selected task's status is set to ``"defining"``.
        The files referenced by the surls in the tasks' ``outputs`` field are
        then removed. With ``args.dry_run`` set, no status is changed and no
        file is deleted; the intended actions are only logged.

        :param args: parsed command-line arguments; must provide
            ``workflow_id``, ``status`` and ``dry_run``, plus whatever
            ``APIConnector.from_args`` reads
        """
        connector = APIConnector.from_args(args)
        workflow_id = args.workflow_id
        status = args.status
        query = {}
        if workflow_id is not None:
            query["workflow__id"] = workflow_id
        if status is not None:
            query["status"] = status

        logger.info("Toggling status of tasks in ATDB....")
        # Fetch the full task list before changing any status. The query may
        # filter on status, so mutating matching tasks while still iterating
        # could shift a server-side paginated result and skip tasks.
        # NOTE(review): assumes list_iter pages lazily; materialising is
        # harmless either way.
        tasks = list(connector.list_iter("tasks", query))

        surls_to_remove = []
        for task in tasks:
            if not args.dry_run:
                connector.change_task_status(task["id"], "defining")
            surls_to_remove += extract_task_surls_from_field(task, "outputs")
        logger.info(
            "Successfully toggled %s tasks proceeding in removing files (%s) from disk",
            len(tasks),
            len(surls_to_remove),
        )

        remove_surl_locations(surls_to_remove, dry_run=args.dry_run)
        if args.dry_run:
            logger.info("[dry-run] Successfully removed %s files", len(surls_to_remove))
        else:
            logger.info("Successfully removed %s files", len(surls_to_remove))