From d62f0535940f46f2d4d41e6582d431d36a728002 Mon Sep 17 00:00:00 2001 From: mancini <mancini@astron.nl> Date: Wed, 9 Nov 2022 15:09:18 +0100 Subject: [PATCH] Implement pruning mechanism for the utils script --- README.md | 16 ++++---- {map => atdb}/__init__.py | 0 atdb/communication.py | 81 +++++++++++++++++++++++++++++++++++++++ atdb/main.py | 57 +++++++++++++++++++++++++++ atdb/prune.py | 67 ++++++++++++++++++++++++++++++++ map/cool_module.py | 6 --- requirements.txt | 3 +- setup.cfg | 11 +++--- tests/test_cool_module.py | 13 ------- tests/test_prune.py | 16 ++++++++ 10 files changed, 236 insertions(+), 34 deletions(-) rename {map => atdb}/__init__.py (100%) create mode 100644 atdb/communication.py create mode 100644 atdb/main.py create mode 100644 atdb/prune.py delete mode 100644 map/cool_module.py delete mode 100644 tests/test_cool_module.py create mode 100644 tests/test_prune.py diff --git a/README.md b/README.md index 979ee10..012df21 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,20 @@ -# Example Python Package +# LDV Datamanagement Task -An example repository of an CI/CD pipeline for building, testing and publishing a python package. +A set of utils that can be used to manage the tasks data in ATDB -If you find some missing functionality with regards to CI/CD, testing, linting or something else, feel free to make a merge request with the proposed changes. -## Example of README.md contents below: - ## Installation + +To install the package go into the code repository and +execute: ``` -pip install map +pip install . ``` ## Usage ``` -from map import cool_module - -cool_module.greeter() # prints "Hello World" +atdb_mng clean --workflow [workflow_id] --status [tasks_status] ``` ## Contributing diff --git a/map/__init__.py b/atdb/__init__.py similarity index 100% rename from map/__init__.py rename to atdb/__init__.py diff --git a/atdb/communication.py b/atdb/communication.py new file mode 100644 index 0000000..676b42a --- /dev/null +++ b/atdb/communication.py @@ -0,0 +1,81 @@ +import requests + + +class APIError(Exception): + def __init__(self, reply: requests.Response): + status_code = reply.status_code + reason = reply.reason + message = reply.content + url = reply.url + + super().__init__(self, f'Failed contacting {url} [{status_code}]: {reason} - {message} ') + + +class DRFReply: + def __init__(self, response): + self._content = response.json() + + @property + def n_items(self): + return self._content['count'] + + @property + def results(self): + return self._content['results'] + + @property + def next_page_url(self): + return self._content['next'] + + @property + def previous_page_url(self): + return self._content['previous'] + + +class APIConnector: + def __init__(self, url, token): + self._url = url.rstrip('/') + self._token = token + self._session = None + + @staticmethod + def from_args(args): + return APIConnector(args.atdb_site, args.token) + + def session(self): + if self._session is None: + self._session = self.start_session() + return self._session + + def start_session(self): + s = requests.Session() + s.headers['Authorization'] = f'Token {self._token}' + s.headers['content-type'] = "application/json" + s.headers['cache-control'] = "no-cache" + return s + + def _request_url(self, method, url, query=None, content=None): + url = url.replace('http://', 'https://') + response = self.session().request(method, url, params=query, json=content) + if not response.ok: + raise APIError(response) + return response + + def _request_path(self, method, item, query=None, content=None): + url = '/'.join((self._url, item.lstrip('/'), '')) + return self._request_url(method, url, query=query, content=content) + + def list_iter(self, item, query=None): + response = self._request_path('get', item, query=query) + drf_reply = DRFReply(response) + + for r in drf_reply.results: + yield r + + while drf_reply.next_page_url is not None: + drf_reply = DRFReply(self._request_url('get', drf_reply.next_page_url)) + for r in drf_reply.results: + yield r + + def change_task_status(self, task_id, status): + self._request_path('PUT', f'tasks/{task_id}', content={'new_status': status}) diff --git a/atdb/main.py b/atdb/main.py new file mode 100644 index 0000000..f67ab7a --- /dev/null +++ b/atdb/main.py @@ -0,0 +1,57 @@ +import logging +from argparse import ArgumentParser +from configparser import ConfigParser +from atdb.prune import prune +import os +logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s') + +logger = logging.getLogger('atdb_mngr') + + +def read_conf_file(args, additional_location=None): + DEFAULT_PATH = os.path.expanduser('~/.config/ldv/services.cfg') + parser = ConfigParser() + if additional_location is not None: + parser.read(additional_location) + else: + parser.read(DEFAULT_PATH) + + global_config = parser['ATDB'] + if 'url' in global_config: + args.atdb_site = parser['ATDB']['url'] + if 'token' in global_config: + args.token = parser['ATDB']['token'] + return args + + +def parse_args(): + parser = ArgumentParser(description='ATDB management tool') + parser.add_argument('--atdb_site', help='ATDB url', default='https://sdc-dev.astron.nl:5554/atdb') + parser.add_argument('--token', help='ATDB token (if not provided it reads it from file', ) + parser.add_argument('--config', help='Configuration file ' + '(tries to load the one at ~/.config/ldv/services.cfg if not specified)', + default=None) + parser.add_argument('--dry_run', help='Test execution', action='store_true') + parser.add_argument('-v', action='store_true') + subparser = parser.add_subparsers(help='commands', dest='operation') + prune_parser = subparser.add_parser('prune') + prune_parser.add_argument('--workflow_id', help='Filters by workflow id') + prune_parser.add_argument('--status', help='Filter by status') + return parser.parse_args(), parser + + + + +def main(): + args, parser = parse_args() + args = read_conf_file(args, args.config) + if args.v: + logger.setLevel(logging.DEBUG) + + if args.operation == 'prune': + prune(args) + else: + parser.print_help() + +if __name__ == '__main__': + main() diff --git a/atdb/prune.py b/atdb/prune.py new file mode 100644 index 0000000..00e8cef --- /dev/null +++ b/atdb/prune.py @@ -0,0 +1,67 @@ +import logging + +import gfal2 + +from atdb.communication import APIConnector + +logger = logging.getLogger('prune') + + +def extract_surls_from_obj(obj, partial=None): + if partial is None: + partial = [] + + try: + if isinstance(obj, dict) and 'surl' in obj: + partial.append(obj['surl']) + elif isinstance(obj, dict): + for key, value in obj.items(): + extract_surls_from_obj(value, partial=partial) + elif isinstance(obj, list) or isinstance(obj, tuple): + for value in obj: + extract_surls_from_obj(value, partial=partial) + except Exception as e: + logging.exception(e) + print(obj, partial) + raise SystemExit(1) + return partial + + +def extract_task_surls_from_field(item, field_name): + return extract_surls_from_obj(item[field_name]) + + +def remove_surl_locations(surls, dry_run=False): + context = gfal2.creat_context() + + for surl in surls: + if not dry_run: + logging.debug('removing file: %s', context.stat(surl)) + context.release(surl) + else: + logging.info('removing file: %s', context.stat(surl)) + + +def prune(args): + connector = APIConnector.from_args(args) + workflow_id = args.workflow_id + status = args.status + query = {} + if workflow_id is not None: + query['workflow__id'] = workflow_id + if status is not None: + query['status'] = status + + surls_to_remove = [] + logger.info('Toggling status of tasks in ATDB....') + toggled_items = 0 + for task in connector.list_iter('tasks', query): + toggled_items += 1 + if not args.dry_run: + connector.change_task_status(task['id'], 'defining') + surls_to_remove += extract_task_surls_from_field(task, 'outputs') + logger.info('Successfully toggled %s tasks proceeding in removing files (%s) from disk', + toggled_items, len(surls_to_remove)) + + remove_surl_locations(surls_to_remove, dry_run=args.dry_run) + logger.info('Successfully removed %s files', len(surls_to_remove)) diff --git a/map/cool_module.py b/map/cool_module.py deleted file mode 100644 index bd9416c..0000000 --- a/map/cool_module.py +++ /dev/null @@ -1,6 +0,0 @@ -""" Cool module containing functions, classes and other useful things """ - - -def greeter(): - """Prints a nice message""" - print("Hello World!") diff --git a/requirements.txt b/requirements.txt index 56aacd7..7961efe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -numpy >= 1.20.0 # BSD +requests +gfal2-python \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 8233788..9fdb45e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,10 +1,10 @@ [metadata] -name = my-awseome-app +name = ldv-datamanagement-tasks version = file: VERSION -description = An example package for CI/CD working group +description = A set of utils to manage task disk utilization long_description = file: README.md long_description_content_type = text/markdown -url = https://git.astron.nl/templates/python-package +url = https://git.astron.nl/ldv/ldv_datamanagement_tasks license = Apache License 2.0 classifiers = Development Status :: 3 - Alpha @@ -30,8 +30,9 @@ include_package_data = true packages = find: python_requires = >=3.7 install_requires = - importlib-metadata>=0.12;python_version<"3.8" - numpy + importlib-metadata>=0.12 + requests + gfal2-python [flake8] max-line-length = 88 diff --git a/tests/test_cool_module.py b/tests/test_cool_module.py deleted file mode 100644 index da1002b..0000000 --- a/tests/test_cool_module.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Testing of the Cool Module""" -from unittest import TestCase - -from map.cool_module import greeter - - -class TestCoolModule(TestCase): - """Test Case of the Cool Module""" - - def test_greeter(self): - """Testing that the greeter does not crash""" - greeter() - self.assertEqual(2 + 2, 4) diff --git a/tests/test_prune.py b/tests/test_prune.py new file mode 100644 index 0000000..f57d317 --- /dev/null +++ b/tests/test_prune.py @@ -0,0 +1,16 @@ +from unittest import TestCase +from atdb.prune import extract_surls_from_obj + + +class TestPruneUtils(TestCase): + """Test Case of the prune utility functions""" + + def test_surl_filtering(self): + test_data = { + 'item': [{'surl': 'onesurl'}, 1, ['ciao', {'surl': 'another_surl'}]], + 'item2': {'surl': 'third_surl'}, + 'item3': 'bla' + } + result = extract_surls_from_obj(test_data) + expected_result = ['onesurl', 'another_surl', 'third_surl'] + self.assertListEqual(result, expected_result) -- GitLab