Commit 89420b4a authored by Klaas Kliffen

Merge branch 'add_prune_utils' into 'main'

Implement pruning mechanism for the utils script

See merge request !10
parents 58259d09 88f3c2ec
default:
image: python:3.10 # use latest for building/linting
image: ubuntu:22.04 # use latest for building/linting
before_script:
- python --version # For debugging
- python -m pip install --upgrade pip
- apt update
- apt install -y libgfal2-dev python3.10 python3-pip cmake libboost-python-dev python3.10-venv
- python3 --version # For debugging
- python3 -m pip install --upgrade pip
- pip install --upgrade tox twine
cache:
paths:
@@ -40,14 +42,8 @@ run_pylint:
- tox -e pylint
allow_failure: true
# build_extensions:
# stage: build_extensions
# script:
# - echo "build fortran/c/cpp extension source code"
run_unit_tests_coverage:
stage: test
image: python:3.7
script:
- tox -e coverage
artifacts:
@@ -60,12 +56,11 @@ run_unit_tests_coverage:
run_unit_tests:
stage: test
image: python:3.${PY_VERSION}
script:
- tox -e py3${PY_VERSION}
parallel:
matrix: # use the matrix for testing
- PY_VERSION: [7, 8, 9, 10]
- PY_VERSION: [10]
package_files:
stage: package
@@ -76,15 +71,6 @@ package_files:
script:
- tox -e build
package_docs:
stage: package
artifacts:
expire_in: 1w
paths:
- docs/* # update path to match the dest dir for documentation
script:
- echo "build and collect docs"
run_integration_tests:
stage: integration
needs:
@@ -110,47 +96,3 @@ publish_on_gitlab:
python -m twine upload \
--repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
publish_on_test_pypi:
stage: publish
environment: pypi-test
needs:
- package_files
when: manual
rules:
- if: $CI_COMMIT_TAG
script:
- echo "run twine for test pypi"
# - |
# TWINE_PASSWORD=${PIPY_TOKEN} \
# TWINE_USERNAME=${PIPY_USERNAME} \
# TODO: replace URL with a PyPI URL
# python -m twine upload \
# --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
publish_on_pypi:
stage: publish
environment: pypi
needs:
- package_files
when: manual
rules:
- if: $CI_COMMIT_TAG
script:
- echo "run twine for pypi"
# - |
# TWINE_PASSWORD=${PIPY_TOKEN} \
# TWINE_USERNAME=${PIPY_USERNAME} \
# TODO: replace URL with a PyPI URL
# python -m twine upload \
# --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
publish_to_readthedocs:
stage: publish
environment: readthedocs
needs:
- package_docs
when: manual
rules:
- if: $CI_COMMIT_TAG
script:
- echo "scp docs/* ???"
# Example Python Package
# LDV Datamanagement Task
An example repository of a CI/CD pipeline for building, testing, and publishing a Python package.
A set of utils that can be used to manage task data in ATDB.
If you find functionality missing with regard to CI/CD, testing, linting, or anything else, feel free to open a merge request with the proposed changes.
## Example of README.md contents below:
## Installation
To install the package, go into the code repository and execute:
```
pip install map
pip install .
```
## Usage
```
from map import cool_module
cool_module.greeter() # prints "Hello World"
atdb_mngr prune --workflow_id [workflow_id] --status [task_status]
```
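For example, a dry run that only reports what would be pruned for a single workflow (the workflow id and status values below are placeholders):
```
atdb_mngr --dry_run prune --workflow_id 42 --status processed
```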
## Contributing
......
......@@ -5,4 +5,4 @@ try:
except ImportError: # for Python<3.8
import importlib_metadata as metadata
__version__ = metadata.version("my-awseome-app")
__version__ = metadata.version("ldv-datamanagement-tasks")
"""
This module is responsible for the communication to and from ATDB
"""
from typing import List, Generator
from argparse import Namespace
import requests
class APIError(Exception):
"""
The APIError is an exception which is raised when the communication with the ATDB
API fails or the requested operation fails
"""
def __init__(self, reply: requests.Response):
status_code = reply.status_code
reason = reply.reason
message = reply.content
url = reply.url
        super().__init__(
            f"Failed contacting {url} [{status_code}]: {reason} - {message}"
        )
class DRFReply:
"""
    A class to represent a Django REST Framework reply
"""
def __init__(self, response):
self._content = response.json()
@property
def n_items(self) -> int:
"""
Returns the number of items in the DRF reply
"""
return self._content["count"]
@property
def results(self) -> List[dict]:
"""
Access to the results list
"""
return self._content["results"]
@property
def next_page_url(self):
"""
Access to the next page if the results are paginated
"""
return self._content["next"]
@property
def previous_page_url(self):
"""
Access to the previous page of results if results are paginated
"""
return self._content["previous"]
class APIConnector:
"""
A class to represent the connection to the API
"""
def __init__(self, url, token):
self._url = url.rstrip("/")
self._token = token
self._session = None
@staticmethod
def from_args(args: Namespace):
"""
Creates API connector from command line arguments
"""
return APIConnector(args.atdb_site, args.token)
def session(self) -> requests.Session:
"""
        Returns the HTTP session object, creating it if it is not yet initialized
"""
if self._session is None:
self._session = self.start_session()
return self._session
def start_session(self) -> requests.Session:
"""
Start a session
"""
session_instance = requests.Session()
session_instance.headers["Authorization"] = f"Token {self._token}"
session_instance.headers["content-type"] = "application/json"
session_instance.headers["cache-control"] = "no-cache"
return session_instance
def _request_url(self, method, url, query=None, content=None):
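        # Upgrade plain-http URLs to https before issuing the request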
url = url.replace("http://", "https://")
response = self.session().request(method, url, params=query, json=content)
if not response.ok:
raise APIError(response)
return response
def _request_path(self, method, item, query=None, content=None):
url = "/".join((self._url, item.lstrip("/"), ""))
return self._request_url(method, url, query=query, content=content)
def list_iter(self, object_type, query=None) -> Generator[dict, None, None]:
"""
        Yields all items of a given object_type from the REST API, following pagination
"""
response = self._request_path("get", object_type, query=query)
drf_reply = DRFReply(response)
for item in drf_reply.results:
yield item
while drf_reply.next_page_url is not None:
drf_reply = DRFReply(self._request_url("get", drf_reply.next_page_url))
for item in drf_reply.results:
yield item
def change_task_status(self, task_id, status) -> None:
"""
Change the status of a task
"""
self._request_path("PUT", f"tasks/{task_id}", content={"new_status": status})
"""
Main entry point for command line script
"""
import logging
import os
from argparse import ArgumentParser, Namespace
from configparser import ConfigParser
from typing import Tuple
from atdb.prune import prune
DEFAULT_PATH = os.path.expanduser("~/.config/ldv/services.cfg")
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s %(levelname)s : %(message)s"
)
logger = logging.getLogger("atdb_mngr")
def read_conf_file(args: Namespace, additional_location=None):
"""
    Reads the configuration file and appends the results to the args namespace
"""
parser = ConfigParser()
if additional_location is not None:
read_files = parser.read(additional_location)
else:
read_files = parser.read(DEFAULT_PATH)
if not read_files and (args.atdb_site is None or args.token is None):
raise SystemError("Missing configuration file")
    if "ATDB" in parser:
        global_config = parser["ATDB"]
        if "url" in global_config:
            args.atdb_site = global_config["url"]
        if "token" in global_config:
            args.token = global_config["token"]
return args
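# read_conf_file expects a standard ConfigParser INI file with an [ATDB]
# section; a minimal ~/.config/ldv/services.cfg could look like this
# (placeholder values):
#
#   [ATDB]
#   url = https://sdc-dev.astron.nl:5554/atdb
#   token = my-secret-token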
def parse_args() -> Tuple[Namespace, ArgumentParser]:
"""
Parse command line arguments
"""
parser = ArgumentParser(description="ATDB management tool")
parser.add_argument(
"--atdb_site", help="ATDB url", default="https://sdc-dev.astron.nl:5554/atdb"
)
parser.add_argument(
"--token",
help="ATDB token (if not provided it reads it from file",
)
parser.add_argument(
"--config",
help="Configuration file "
"(tries to load the one at ~/.config/ldv/services.cfg if not specified)",
default=None,
)
parser.add_argument("--dry_run", help="Test execution", action="store_true")
parser.add_argument("-v", action="store_true")
subparser = parser.add_subparsers(help="commands", dest="operation")
prune_parser = subparser.add_parser("prune")
prune_parser.add_argument("--workflow_id", help="Filters by workflow id")
prune_parser.add_argument("--status", help="Filter by status")
return parser.parse_args(), parser
def main():
"""
Main entry point
"""
args, parser = parse_args()
args = read_conf_file(args, args.config)
if args.v:
logger.setLevel(logging.DEBUG)
if args.operation == "prune":
prune(args)
else:
parser.print_help()
if __name__ == "__main__":
main()
"""
Prune command module
"""
import logging
from typing import Union, List
import gfal2
from atdb.communication import APIConnector
logger = logging.getLogger("prune")
def extract_surls_from_obj(
    obj: Union[dict, list], partial: List[str] = None
) -> List[str]:
"""
Iterate over a nested object to extract surl values
"""
if partial is None:
partial = []
try:
if isinstance(obj, dict) and "surl" in obj:
partial.append(obj["surl"])
elif isinstance(obj, dict):
for value in obj.values():
extract_surls_from_obj(value, partial=partial)
elif isinstance(obj, (list, tuple)):
for value in obj:
extract_surls_from_obj(value, partial=partial)
except KeyError as exception:
logger.exception(exception)
return partial
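# For example (hypothetical data):
#   extract_surls_from_obj({"a": [{"surl": "srm://x"}, [{"surl": "srm://y"}]]})
#   returns ["srm://x", "srm://y"]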
def extract_task_surls_from_field(item: dict, field_name: str) -> List[str]:
"""
    Extract the surls from the given field of a task object
"""
return extract_surls_from_obj(item[field_name])
def remove_surl_locations(surls: List[str], dry_run=False) -> None:
"""
    Removes the SURL locations; if dry_run is given, it only logs what would be removed
"""
context = gfal2.creat_context()
if not dry_run:
context.unlink(surls)
else:
for surl in surls:
logger.info("[dry-run] removing surl %s", surl)
logger.info("file stats are: %s", context.stat(surl))
def prune(args):
"""
Prune command entry point
"""
connector = APIConnector.from_args(args)
workflow_id = args.workflow_id
status = args.status
query = {}
if workflow_id is not None:
query["workflow__id"] = workflow_id
if status is not None:
query["status"] = status
surls_to_remove = []
logger.info("Toggling status of tasks in ATDB....")
toggled_items = 0
for task in connector.list_iter("tasks", query):
toggled_items += 1
if not args.dry_run:
connector.change_task_status(task["id"], "defining")
surls_to_remove += extract_task_surls_from_field(task, "outputs")
    logger.info(
        "Successfully toggled %s tasks, proceeding to remove %s files from disk",
        toggled_items,
        len(surls_to_remove),
    )
remove_surl_locations(surls_to_remove, dry_run=args.dry_run)
if args.dry_run:
logger.info("[dry-run] Successfully removed %s files", len(surls_to_remove))
else:
logger.info("Successfully removed %s files", len(surls_to_remove))
""" Cool module containing functions, classes and other useful things """
def greeter():
"""Prints a nice message"""
print("Hello World!")
numpy >= 1.20.0 # BSD
requests
gfal2-python
\ No newline at end of file
[metadata]
name = my-awseome-app
name = ldv-datamanagement-tasks
version = file: VERSION
description = An example package for CI/CD working group
description = A set of utils to manage task disk utilization
long_description = file: README.md
long_description_content_type = text/markdown
url = https://git.astron.nl/templates/python-package
url = https://git.astron.nl/ldv/ldv_datamanagement_tasks
license = Apache License 2.0
classifiers =
Development Status :: 3 - Alpha
@@ -30,8 +30,12 @@ include_package_data = true
packages = find:
python_requires = >=3.7
install_requires =
importlib-metadata>=0.12;python_version<"3.8"
numpy
importlib-metadata>=0.12
requests
gfal2-python
[options.entry_points]
console_scripts =
atdb_mngr = atdb.main:main
[flake8]
max-line-length = 88
......
"""Testing of the Cool Module"""
from unittest import TestCase
from map.cool_module import greeter
class TestCoolModule(TestCase):
"""Test Case of the Cool Module"""
def test_greeter(self):
"""Testing that the greeter does not crash"""
greeter()
self.assertEqual(2 + 2, 4)
"""
Test prune command
"""
from unittest import TestCase
from atdb.prune import extract_surls_from_obj
class TestPruneUtils(TestCase):
"""Test Case of the prune utility functions"""
def test_surl_filtering(self):
"""
Test surl filtering utility function
"""
test_data = {
"item": [{"surl": "onesurl"}, 1, ["ciao", {"surl": "another_surl"}]],
"item2": {"surl": "third_surl"},
"item3": "bla",
}
result = extract_surls_from_obj(test_data)
expected_result = ["onesurl", "another_surl", "third_surl"]
self.assertListEqual(result, expected_result)
@@ -21,7 +21,7 @@ commands =
[testenv:coverage]
commands =
{envpython} --version
{envpython} -m pytest --cov-report xml --cov-report html --cov=map
{envpython} -m pytest --cov-report xml --cov-report html --cov=atdb
# Use generative name and command prefixes to reuse the same virtualenv
# for all linting jobs.
@@ -34,8 +34,8 @@ commands =
black: {envpython} -m black --version
black: {envpython} -m black --check --diff .
pylint: {envpython} -m pylint --version
pylint: {envpython} -m pylint map tests
format: {envpython} -m autopep8 -v -aa --in-place --recursive map
pylint: {envpython} -m pylint atdb tests --extension-pkg-allow-list=gfal2
format: {envpython} -m autopep8 -v -aa --in-place --recursive atdb
format: {envpython} -m autopep8 -v -aa --in-place --recursive tests
format: {envpython} -m black -v .
......