Select Git revision
01-devices.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
prune.py 2.65 KiB
"""
Prune command module
"""
import logging
from typing import Union, List
import gfal2
from atdb.communication import APIConnector
# Module-level logger shared by every function in this file; it is registered
# under the fixed name "prune" rather than the module's __name__.
logger = logging.getLogger("prune")
def extract_surls_from_obj(
    obj: Union[dict, list, tuple], partial: Union[List[str], None] = None
) -> List[str]:
    """
    Iterate over a nested object to extract surl values

    The object is walked depth-first, in iteration order:

    * a dict holding a "surl" key contributes that value and is NOT descended
      into any further (its other values are ignored);
    * any other dict has each of its values visited;
    * lists and tuples have each of their items visited;
    * every other kind of value (str, int, None, ...) is skipped.

    :param obj: nested structure of dicts / lists / tuples to search
    :param partial: optional accumulator the found values are appended to;
        a fresh list is created when omitted
    :return: the accumulator (the very same list object when ``partial`` was
        given) holding the surl values in discovery order. Values are stored
        as found; the rest of this module treats them as strings.
    """
    if partial is None:
        # Build the accumulator per call so results never leak between calls.
        partial = []

    if isinstance(obj, dict):
        if "surl" in obj:
            # Membership was just checked, so this lookup cannot raise
            # KeyError and needs no exception handler.
            partial.append(obj["surl"])
            return partial
        children = obj.values()
    elif isinstance(obj, (list, tuple)):
        children = obj
    else:
        # Scalars and unknown types carry no surl.
        return partial

    for child in children:
        extract_surls_from_obj(child, partial=partial)
    return partial
def extract_task_surls_from_field(
    item: dict, field_name: str
) -> List[Union[dict, list]]:
    """
    Collect the surl values found under one field of a task object

    The value stored at ``item[field_name]`` is handed to
    extract_surls_from_obj and its result is returned unchanged.
    A field that is absent from ``item`` raises KeyError.
    """
    field_content = item[field_name]
    return extract_surls_from_obj(field_content)
def remove_surl_locations(surls: List[str], dry_run=False) -> None:
    """
    Unlink every given SURL through a gfal2 context

    For each location the file stats are fetched and logged first. With
    dry_run set nothing is unlinked: the location and its stats are only
    reported (at INFO level, prefixed with "[dry-run]"). Otherwise the
    same information is logged at DEBUG level and the location is unlinked.

    No exception is caught here, so a failing stat or unlink call stops
    the loop and propagates to the caller.
    """
    # `creat_context` (no trailing "e") is how the gfal2 bindings spell it.
    gfal_context = gfal2.creat_context()
    for location in surls:
        if dry_run:
            logger.info("[dry-run] removing surl %s", location)
            logger.info("file stats are: %s", gfal_context.stat(location))
            continue

        logger.debug("removing surl %s", location)
        logger.debug("file stats are: %s", gfal_context.stat(location))
        gfal_context.unlink(location)
def prune(args):
    """
    Prune command entry point

    Steps performed:

    1. build an API connector from the parsed arguments;
    2. list the tasks, filtered by ``args.workflow_id`` and ``args.status``
       (each filter is only applied when it is not None);
    3. unless ``args.dry_run`` is set, switch every listed task to the
       "defining" status;
    4. gather the surls found in each task's "outputs" field and pass them
       to remove_surl_locations, forwarding the dry-run flag.
    """
    connector = APIConnector.from_args(args)

    # Keep only the filters that were actually supplied.
    candidate_filters = {
        "workflow__id": args.workflow_id,
        "status": args.status,
    }
    query = {
        key: value for key, value in candidate_filters.items() if value is not None
    }

    collected_surls = []
    logger.info("Toggling status of tasks in ATDB....")
    task_count = 0
    for task_count, task in enumerate(connector.list_iter("tasks", query), start=1):
        if not args.dry_run:
            connector.change_task_status(task["id"], "defining")
        # Surls are collected in dry-run mode too, so they can be reported.
        collected_surls.extend(extract_task_surls_from_field(task, "outputs"))

    logger.info(
        "Successfully toggled %s tasks proceeding in removing files (%s) from disk",
        task_count,
        len(collected_surls),
    )

    remove_surl_locations(collected_surls, dry_run=args.dry_run)

    if args.dry_run:
        summary = "[dry-run] Successfully removed %s files"
    else:
        summary = "Successfully removed %s files"
    logger.info(summary, len(collected_surls))