diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index b17457db642b7eceb4eed0f04724adb973ba7f4e..50b2bf0e76e8b525a507604b605bfc2ec65ce660 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -1,13 +1,15 @@ default: - image: python:3.10 # use latest for building/linting + image: ubuntu:22.04 # use latest for building/linting before_script: - - python --version # For debugging - - python -m pip install --upgrade pip + - apt update + - apt install -y libgfal2-dev python3.10 python3-pip cmake libboost-python-dev python3.10-venv + - python3 --version # For debugging + - python3 -m pip install --upgrade pip - pip install --upgrade tox twine cache: paths: - .cache/pip - # Do not cache .tox, to recreate virtualenvs for every step + # Do not cache .tox, to recreate virtual envs for every step stages: - lint @@ -40,14 +42,8 @@ run_pylint: - tox -e pylint allow_failure: true -# build_extensions: -# stage: build_extensions -# script: -# - echo "build fortran/c/cpp extension source code" - run_unit_tests_coverage: stage: test - image: python:3.7 script: - tox -e coverage artifacts: @@ -60,12 +56,11 @@ run_unit_tests_coverage: run_unit_tests: stage: test - image: python:3.${PY_VERSION} script: - tox -e py3${PY_VERSION} parallel: matrix: # use the matrix for testing - - PY_VERSION: [7, 8, 9, 10] + - PY_VERSION: [10] package_files: stage: package @@ -76,15 +71,6 @@ package_files: script: - tox -e build -package_docs: - stage: package - artifacts: - expire_in: 1w - paths: - - docs/* # update path to match the dest dir for documentation - script: - - echo "build and collect docs" - run_integration_tests: stage: integration needs: @@ -110,47 +96,3 @@ publish_on_gitlab: python -m twine upload \ --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* -publish_on_test_pypi: - stage: publish - environment: pypi-test - needs: - - package_files - when: manual - rules: - - if: $CI_COMMIT_TAG - script: - - echo "run twine for test pypi" - # - | - # 
TWINE_PASSWORD=${PIPY_TOKEN} \ - # TWINE_USERNAME=${PIPY_USERNAME} \ - # TODO: replace URL with a pipy URL - # python -m twine upload \ - # --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* - -publish_on_pypi: - stage: publish - environment: pypi - needs: - - package_files - when: manual - rules: - - if: $CI_COMMIT_TAG - script: - - echo "run twine for pypi" - # - | - # TWINE_PASSWORD=${PIPY_TOKEN} \ - # TWINE_USERNAME=${PIPY_USERNAME} \ - # TODO: replace URL with a pipy URL - # python -m twine upload \ - # --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/* - -publish_to_readthedocs: - stage: publish - environment: readthedocs - needs: - - package_docs - when: manual - rules: - - if: $CI_COMMIT_TAG - script: - - echo "scp docs/* ???" diff --git a/README.md b/README.md index 979ee10941a6ed6da6e3617a93241d1d32c8cc6e..012df21afcac2dd243f63f9ae6a37236d173c2e6 100644 --- a/README.md +++ b/README.md @@ -1,22 +1,20 @@ -# Example Python Package +# LDV Datamanagement Task -An example repository of an CI/CD pipeline for building, testing and publishing a python package. +A set of utils that can be used to manage the tasks data in ATDB -If you find some missing functionality with regards to CI/CD, testing, linting or something else, feel free to make a merge request with the proposed changes. -## Example of README.md contents below: - ## Installation + +To install the package, go into the code repository and +execute: ``` -pip install map +pip install . 
``` ## Usage ``` -from map import cool_module - -cool_module.greeter() # prints "Hello World" +atdb_mng clean --workflow [workflow_id] --status [tasks_status] ``` ## Contributing diff --git a/map/__init__.py b/atdb/__init__.py similarity index 71% rename from map/__init__.py rename to atdb/__init__.py index 6681798345ae231c5cf5bfee036b809e2e598e1c..6dd3f98669522168daf25a1f30d22e49228ea4e2 100644 --- a/map/__init__.py +++ b/atdb/__init__.py @@ -5,4 +5,4 @@ try: except ImportError: # for Python<3.8 import importlib_metadata as metadata -__version__ = metadata.version("my-awseome-app") +__version__ = metadata.version("ldv-datamanagement-tasks") diff --git a/atdb/communication.py b/atdb/communication.py new file mode 100644 index 0000000000000000000000000000000000000000..a7cb8e86070569df1ffd6b2d3733b4f0e244a1b2 --- /dev/null +++ b/atdb/communication.py @@ -0,0 +1,129 @@ +""" +This module is responsible for the communication to and from ATDB +""" +from typing import List, Generator +from argparse import Namespace +import requests + + +class APIError(Exception): + """ + The APIError is an exception which is raised when the communication with the ATDB + API fails or the requested operation fails + """ + + def __init__(self, reply: requests.Response): + status_code = reply.status_code + reason = reply.reason + message = reply.content + url = reply.url + + super().__init__( + self, f"Failed contacting {url} [{status_code}]: {reason} - {message}" + ) + + +class DRFReply: + """ + A class to represent the DJANGO REST framework reply + """ + + def __init__(self, response): + self._content = response.json() + + @property + def n_items(self) -> int: + """ + Returns the number of items in the DRF reply + """ + return self._content["count"] + + @property + def results(self) -> List[dict]: + """ + Access to the results list + """ + return self._content["results"] + + @property + def next_page_url(self): + """ + Access to the next page if the results are paginated + """ + return 
self._content["next"] + + @property + def previous_page_url(self): + """ + Access to the previous page of results if results are paginated + """ + return self._content["previous"] + + +class APIConnector: + """ + A class to represent the connection to the API + """ + + def __init__(self, url, token): + self._url = url.rstrip("/") + self._token = token + self._session = None + + @staticmethod + def from_args(args: Namespace): + """ + Creates API connector from command line arguments + """ + return APIConnector(args.atdb_site, args.token) + + def session(self) -> requests.Session: + """ + Returns an HTTP session object, creating it if it is not yet initialized + """ + if self._session is None: + self._session = self.start_session() + return self._session + + def start_session(self) -> requests.Session: + """ + Start a session + """ + session_instance = requests.Session() + session_instance.headers["Authorization"] = f"Token {self._token}" + session_instance.headers["content-type"] = "application/json" + session_instance.headers["cache-control"] = "no-cache" + return session_instance + + def _request_url(self, method, url, query=None, content=None): + url = url.replace("http://", "https://") + response = self.session().request(method, url, params=query, json=content) + if not response.ok: + raise APIError(response) + return response + + def _request_path(self, method, item, query=None, content=None): + url = "/".join((self._url, item.lstrip("/"), "")) + return self._request_url(method, url, query=query, content=content) + + def list_iter(self, object_type, query=None) -> Generator[dict, None, None]: + """ + Returns a list iterator to a specific object_type in the REST API + + """ + response = self._request_path("get", object_type, query=query) + drf_reply = DRFReply(response) + + for item in drf_reply.results: + yield item + + while drf_reply.next_page_url is not None: + drf_reply = DRFReply(self._request_url("get", drf_reply.next_page_url)) + for item in drf_reply.results: 
+ yield item + + def change_task_status(self, task_id, status) -> None: + """ + Change the status of a task + """ + self._request_path("PUT", f"tasks/{task_id}", content={"new_status": status}) diff --git a/atdb/main.py b/atdb/main.py new file mode 100644 index 0000000000000000000000000000000000000000..2268a3a9ed2a2457cc5aae4d26636e175ac674c0 --- /dev/null +++ b/atdb/main.py @@ -0,0 +1,83 @@ +""" +Main entry point for command line script +""" + +import logging +import os +from argparse import ArgumentParser, Namespace +from configparser import ConfigParser + +from atdb.prune import prune + +DEFAULT_PATH = os.path.expanduser("~/.config/ldv/services.cfg") +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s %(levelname)s : %(message)s" +) + +logger = logging.getLogger("atdb_mngr") + + +def read_conf_file(args: Namespace, additional_location=None): + """ + Reads configuration files and appends results to args namespace + """ + parser = ConfigParser() + if additional_location is not None: + read_files = parser.read(additional_location) + else: + read_files = parser.read(DEFAULT_PATH) + + if not read_files and (args.atdb_site is None or args.token is None): + raise SystemError("Missing configuration file") + global_config = parser["ATDB"] + if "url" in global_config: + args.atdb_site = parser["ATDB"]["url"] + if "token" in global_config: + args.token = parser["ATDB"]["token"] + return args + + +def parse_args() -> (Namespace, ArgumentParser): + """ + Parse command line arguments + """ + parser = ArgumentParser(description="ATDB management tool") + parser.add_argument( + "--atdb_site", help="ATDB url", default="https://sdc-dev.astron.nl:5554/atdb" + ) + parser.add_argument( + "--token", + help="ATDB token (if not provided it reads it from file)", + ) + parser.add_argument( + "--config", + help="Configuration file " + "(tries to load the one at ~/.config/ldv/services.cfg if not specified)", + default=None, + ) + parser.add_argument("--dry_run", help="Test 
execution", action="store_true") + parser.add_argument("-v", action="store_true") + subparser = parser.add_subparsers(help="commands", dest="operation") + prune_parser = subparser.add_parser("prune") + prune_parser.add_argument("--workflow_id", help="Filters by workflow id") + prune_parser.add_argument("--status", help="Filter by status") + return parser.parse_args(), parser + + +def main(): + """ + Main entry point + """ + args, parser = parse_args() + args = read_conf_file(args, args.config) + if args.v: + logger.setLevel(logging.DEBUG) + + if args.operation == "prune": + prune(args) + else: + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/atdb/prune.py b/atdb/prune.py new file mode 100644 index 0000000000000000000000000000000000000000..b4723c5599a332fa1d295844b5fd04b8efd71686 --- /dev/null +++ b/atdb/prune.py @@ -0,0 +1,90 @@ +""" +Prune command module +""" +import logging +from typing import Union, List + +import gfal2 + +from atdb.communication import APIConnector + +logger = logging.getLogger("prune") + + +def extract_surls_from_obj( + obj: Union[dict, list], partial: List[Union[dict, list]] = None +) -> List[Union[dict, list]]: + """ + Iterate over a nested object to extract surl values + """ + if partial is None: + partial = [] + + try: + if isinstance(obj, dict) and "surl" in obj: + partial.append(obj["surl"]) + elif isinstance(obj, dict): + for value in obj.values(): + extract_surls_from_obj(value, partial=partial) + elif isinstance(obj, (list, tuple)): + for value in obj: + extract_surls_from_obj(value, partial=partial) + except KeyError as exception: + logger.exception(exception) + return partial + + +def extract_task_surls_from_field( + item: dict, field_name: str +) -> List[Union[dict, list]]: + """ + Extract from task object field the surl + """ + return extract_surls_from_obj(item[field_name]) + + +def remove_surl_locations(surls: List[str], dry_run=False) -> None: + """ + Removes SURL location if dry_run is specified it 
only tests + """ + context = gfal2.creat_context() + if not dry_run: + context.unlink(surls) + else: + for surl in surls: + logger.info("[dry-run] removing surl %s", surl) + logger.info("file stats are: %s", context.stat(surl)) + + +def prune(args): + """ + Prune command entry point + """ + connector = APIConnector.from_args(args) + workflow_id = args.workflow_id + status = args.status + query = {} + if workflow_id is not None: + query["workflow__id"] = workflow_id + if status is not None: + query["status"] = status + + surls_to_remove = [] + logger.info("Toggling status of tasks in ATDB....") + toggled_items = 0 + for task in connector.list_iter("tasks", query): + toggled_items += 1 + if not args.dry_run: + connector.change_task_status(task["id"], "defining") + surls_to_remove += extract_task_surls_from_field(task, "outputs") + logger.info( + "Successfully toggled %s tasks proceeding in removing files (%s) from disk", + toggled_items, + len(surls_to_remove), + ) + + remove_surl_locations(surls_to_remove, dry_run=args.dry_run) + if args.dry_run: + logger.info("[dry-run] Successfully removed %s files", len(surls_to_remove)) + else: + logger.info("Successfully removed %s files", len(surls_to_remove)) diff --git a/map/cool_module.py b/map/cool_module.py deleted file mode 100644 index bd9416c291b7ed52a278bc6966f2dac155e4aa05..0000000000000000000000000000000000000000 --- a/map/cool_module.py +++ /dev/null @@ -1,6 +0,0 @@ -""" Cool module containing functions, classes and other useful things """ - - -def greeter(): - """Prints a nice message""" - print("Hello World!") diff --git a/requirements.txt b/requirements.txt index 56aacd774b88e370f0e2fad347abb341df445332..7961efe3e3e3a2ade42c7f84f3777acb72711dbb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1 +1,2 @@ -numpy >= 1.20.0 # BSD +requests +gfal2-python \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 8233788c085fb0962d36440627a54954eb17c4e1..499d392fa78c8379b76757b6329ecff9da661b3f 100644 
--- a/setup.cfg +++ b/setup.cfg @@ -1,10 +1,10 @@ [metadata] -name = my-awseome-app +name = ldv-datamanagement-tasks version = file: VERSION -description = An example package for CI/CD working group +description = A set of utils to manage task disk utilization long_description = file: README.md long_description_content_type = text/markdown -url = https://git.astron.nl/templates/python-package +url = https://git.astron.nl/ldv/ldv_datamanagement_tasks license = Apache License 2.0 classifiers = Development Status :: 3 - Alpha @@ -30,8 +30,12 @@ include_package_data = true packages = find: python_requires = >=3.7 install_requires = - importlib-metadata>=0.12;python_version<"3.8" - numpy + importlib-metadata>=0.12 + requests + gfal2-python +[options.entry_points] +console_scripts = + atdb_mngr = atdb.main:main [flake8] max-line-length = 88 diff --git a/tests/test_cool_module.py b/tests/test_cool_module.py deleted file mode 100644 index da1002b8b66176f7efb5e24c3b4748d0eb49ed51..0000000000000000000000000000000000000000 --- a/tests/test_cool_module.py +++ /dev/null @@ -1,13 +0,0 @@ -"""Testing of the Cool Module""" -from unittest import TestCase - -from map.cool_module import greeter - - -class TestCoolModule(TestCase): - """Test Case of the Cool Module""" - - def test_greeter(self): - """Testing that the greeter does not crash""" - greeter() - self.assertEqual(2 + 2, 4) diff --git a/tests/test_prune.py b/tests/test_prune.py new file mode 100644 index 0000000000000000000000000000000000000000..bd0f9eae52fb7d596b8f66c24c03264998fd0577 --- /dev/null +++ b/tests/test_prune.py @@ -0,0 +1,22 @@ +""" +Test prune command +""" +from unittest import TestCase +from atdb.prune import extract_surls_from_obj + + +class TestPruneUtils(TestCase): + """Test Case of the prune utility functions""" + + def test_surl_filtering(self): + """ + Test surl filtering utility function + """ + test_data = { + "item": [{"surl": "onesurl"}, 1, ["ciao", {"surl": "another_surl"}]], + "item2": {"surl": 
"third_surl"}, + "item3": "bla", + } + result = extract_surls_from_obj(test_data) + expected_result = ["onesurl", "another_surl", "third_surl"] + self.assertListEqual(result, expected_result) diff --git a/tox.ini b/tox.ini index c0530645e8f647ea7d94921271c1227dcd3522b0..49c39c0fb289bfa445fb8f21b1908275018fe1fc 100644 --- a/tox.ini +++ b/tox.ini @@ -21,7 +21,7 @@ commands = [testenv:coverage] commands = {envpython} --version - {envpython} -m pytest --cov-report xml --cov-report html --cov=map + {envpython} -m pytest --cov-report xml --cov-report html --cov=atdb # Use generative name and command prefixes to reuse the same virtualenv # for all linting jobs. @@ -34,8 +34,8 @@ commands = black: {envpython} -m black --version black: {envpython} -m black --check --diff . pylint: {envpython} -m pylint --version - pylint: {envpython} -m pylint map tests - format: {envpython} -m autopep8 -v -aa --in-place --recursive map + pylint: {envpython} -m pylint atdb tests --extension-pkg-allow-list=gfal2 + format: {envpython} -m autopep8 -v -aa --in-place --recursive atdb format: {envpython} -m autopep8 -v -aa --in-place --recursive tests format: {envpython} -m black -v .