Skip to content
Snippets Groups Projects
Commit d62f0535 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Implement pruning mechanism for the utils script

parent 58259d09
No related branches found
No related tags found
1 merge request!10Implement pruning mechanism for the utils script
Pipeline #39170 failed
# Example Python Package
# LDV Datamanagement Task
An example repository of a CI/CD pipeline for building, testing and publishing a Python package.
A set of utils that can be used to manage the tasks data in ATDB
If you find some missing functionality with regards to CI/CD, testing, linting or something else, feel free to make a merge request with the proposed changes.
## Example of README.md contents below:
## Installation
To install the package go into the code repository and
execute:
```
pip install map
pip install .
```
## Usage
```
from map import cool_module
cool_module.greeter() # prints "Hello World"
atdb_mng prune --workflow_id [workflow_id] --status [tasks_status]
```
## Contributing
......
File moved
import requests
class APIError(Exception):
    """Raised when a request to the remote API comes back unsuccessful.

    The exception message is assembled from the reply's URL, status code,
    reason phrase and raw body.
    """

    def __init__(self, reply: 'requests.Response'):
        """Build the error message from the failed ``reply``.

        :param reply: response of the failed request; its ``status_code``,
            ``reason``, ``content`` and ``url`` attributes are read.
        """
        status_code = reply.status_code
        reason = reply.reason
        message = reply.content
        url = reply.url
        # Only the message is handed to Exception.__init__: passing ``self``
        # explicitly would store the instance itself in ``args`` and make
        # ``str(error)`` render a tuple instead of the message.
        super().__init__(f'Failed contacting {url} [{status_code}]: {reason} - {message} ')
class DRFReply:
    """Read-only accessor over the decoded JSON body of a paginated list reply.

    The body is indexed with the ``count``, ``results``, ``next`` and
    ``previous`` keys (presumably a Django REST Framework paginated payload;
    verify against the server).
    """

    def __init__(self, response):
        """Decode ``response`` through its ``json()`` method and keep the payload."""
        payload = response.json()
        self._content = payload

    @property
    def n_items(self):
        """Value stored under the ``count`` key of the payload."""
        body = self._content
        return body['count']

    @property
    def results(self):
        """Value stored under the ``results`` key of the payload."""
        body = self._content
        return body['results']

    @property
    def next_page_url(self):
        """Value stored under the ``next`` key of the payload."""
        body = self._content
        return body['next']

    @property
    def previous_page_url(self):
        """Value stored under the ``previous`` key of the payload."""
        body = self._content
        return body['previous']
class APIConnector:
    """Small client for a token-authenticated REST API accessed through ``requests``."""

    def __init__(self, url, token):
        """Store the base ``url`` (trailing slashes removed) and the access ``token``.

        The HTTP session is not opened here; it is created on first use.
        """
        self._session = None
        self._token = token
        self._url = url.rstrip('/')

    @staticmethod
    def from_args(args):
        """Create a connector from an object exposing ``atdb_site`` and ``token``."""
        site, token = args.atdb_site, args.token
        return APIConnector(site, token)

    def session(self):
        """Return the cached session, creating it on the first call."""
        if self._session is not None:
            return self._session
        self._session = self.start_session()
        return self._session

    def start_session(self):
        """Open a new ``requests`` session carrying the token and JSON headers."""
        new_session = requests.Session()
        new_session.headers.update({
            'Authorization': f'Token {self._token}',
            'content-type': "application/json",
            'cache-control': "no-cache",
        })
        return new_session

    def _request_url(self, method, url, query=None, content=None):
        """Issue ``method`` against an absolute ``url`` and return the response.

        Any ``http://`` occurrence in the URL is rewritten to ``https://``
        before the call. ``query`` is sent as query parameters and ``content``
        as the JSON body.

        :raises APIError: when the response is not ``ok``.
        """
        secure_url = url.replace('http://', 'https://')
        reply = self.session().request(method, secure_url, params=query, json=content)
        if reply.ok:
            return reply
        raise APIError(reply)

    def _request_path(self, method, item, query=None, content=None):
        """Issue ``method`` against ``item`` resolved relative to the base URL.

        Leading slashes of ``item`` are dropped and a trailing slash is appended.
        """
        relative = item.lstrip('/')
        target = f'{self._url}/{relative}/'
        return self._request_url(method, target, query=query, content=content)

    def list_iter(self, item, query=None):
        """Yield every entry of the ``item`` listing, following the next-page links.

        Pages are requested lazily: the following page is fetched only after
        the entries of the current one have been consumed.
        """
        page = DRFReply(self._request_path('get', item, query=query))
        while True:
            for entry in page.results:
                yield entry
            following = page.next_page_url
            if following is None:
                break
            page = DRFReply(self._request_url('get', following))

    def change_task_status(self, task_id, status):
        """Send a ``PUT`` on ``tasks/<task_id>`` carrying ``status`` as ``new_status``."""
        payload = {'new_status': status}
        self._request_path('PUT', f'tasks/{task_id}', content=payload)
import logging
from argparse import ArgumentParser
from configparser import ConfigParser
from atdb.prune import prune
import os
# Configure the root logger at import time: DEBUG threshold and
# "<timestamp> <level> : <message>" formatted records.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s %(levelname)s : %(message)s')
# Named logger used by this command line module.
logger = logging.getLogger('atdb_mngr')
def read_conf_file(args, additional_location=None):
    """Overlay the ``[ATDB]`` settings of a configuration file onto ``args``.

    The file at ``additional_location`` is read when given, otherwise the one
    at ``~/.config/ldv/services.cfg``. When the ``[ATDB]`` section defines
    ``url`` and/or ``token`` they replace ``args.atdb_site`` and ``args.token``
    respectively.

    :param args: namespace-like object whose attributes are updated in place.
    :param additional_location: optional path of the configuration file.
    :return: the same ``args`` object.
    """
    DEFAULT_PATH = os.path.expanduser('~/.config/ldv/services.cfg')
    config_path = DEFAULT_PATH if additional_location is None else additional_location

    parser = ConfigParser()
    parser.read(config_path)

    # ConfigParser.read() silently ignores files it cannot open, so a missing
    # file (or one lacking the section) must not abort with a KeyError: the
    # values already present in ``args`` are simply kept.
    if not parser.has_section('ATDB'):
        return args

    global_config = parser['ATDB']
    if 'url' in global_config:
        args.atdb_site = global_config['url']
    if 'token' in global_config:
        args.token = global_config['token']
    return args
def parse_args(argv=None):
    """Build the command line parser and parse the arguments.

    Global options: ``--atdb_site``, ``--token``, ``--config``, ``--dry_run``
    and ``-v``. The selected sub-command is stored in ``operation``; the only
    sub-command is ``prune``, which accepts ``--workflow_id`` and ``--status``.

    :param argv: optional list of argument strings; when ``None`` the
        arguments are taken from the process command line.
    :return: a ``(namespace, parser)`` tuple; the parser is returned so the
        caller can print the help message.
    """
    parser = ArgumentParser(description='ATDB management tool')
    parser.add_argument('--atdb_site', help='ATDB url',
                        default='https://sdc-dev.astron.nl:5554/atdb')
    parser.add_argument('--token',
                        help='ATDB token (if not provided it reads it from file)')
    parser.add_argument('--config',
                        help='Configuration file '
                             '(tries to load the one at ~/.config/ldv/services.cfg if not specified)',
                        default=None)
    parser.add_argument('--dry_run', help='Test execution', action='store_true')
    parser.add_argument('-v', help='Verbose logging', action='store_true')

    subparser = parser.add_subparsers(help='commands', dest='operation')
    prune_parser = subparser.add_parser('prune')
    prune_parser.add_argument('--workflow_id', help='Filters by workflow id')
    prune_parser.add_argument('--status', help='Filter by status')

    return parser.parse_args(argv), parser
def main():
    """Command line entry point.

    Parses the arguments, overlays the configuration file values on them,
    raises the module logger to DEBUG when ``-v`` is set and then runs the
    requested operation. Without a known operation the help is printed.
    """
    cli_args, cli_parser = parse_args()
    cli_args = read_conf_file(cli_args, cli_args.config)

    if cli_args.v:
        logger.setLevel(logging.DEBUG)

    if cli_args.operation != 'prune':
        cli_parser.print_help()
        return
    prune(cli_args)


if __name__ == '__main__':
    main()
import logging
import gfal2
from atdb.communication import APIConnector
# Named logger for the pruning routines of this module.
logger = logging.getLogger('prune')
def extract_surls_from_obj(obj, partial=None):
    """Recursively collect the values stored under ``'surl'`` keys in ``obj``.

    Dictionaries, lists and tuples are walked depth-first in iteration order.
    A dictionary that has a ``'surl'`` key contributes that value and is not
    descended into any further. Any other kind of value is ignored.

    :param obj: arbitrarily nested structure to inspect.
    :param partial: optional list the found values are appended to; a new
        list is created when omitted.
    :return: the list holding the collected values (``partial`` itself when
        it was supplied).
    :raises SystemExit: with code 1 when an unexpected error occurs while
        walking the structure; the failure is logged first.
    """
    if partial is None:
        partial = []
    try:
        if isinstance(obj, dict):
            if 'surl' in obj:
                partial.append(obj['surl'])
            else:
                for value in obj.values():
                    extract_surls_from_obj(value, partial=partial)
        elif isinstance(obj, (list, tuple)):
            for value in obj:
                extract_surls_from_obj(value, partial=partial)
    except Exception as error:
        # Report the offending object through the logger (with traceback)
        # rather than printing it to stdout, then stop the program.
        logging.getLogger('prune').exception(
            'Failed extracting surls from %r (collected so far: %r)', obj, partial)
        raise SystemExit(1) from error
    return partial
def extract_task_surls_from_field(item, field_name):
    """Return the surls found inside ``item[field_name]``.

    The field value is looked up by key and handed to
    ``extract_surls_from_obj``.
    """
    field_value = item[field_name]
    return extract_surls_from_obj(field_value)
def remove_surl_locations(surls, dry_run=False):
    """Stat every surl and, unless ``dry_run`` is set, release it via gfal2.

    :param surls: iterable of surls to process.
    :param dry_run: when true the stat result is only logged (INFO level)
        and nothing is released.

    NOTE(review): the log message says "removing file" but the call made is
    ``context.release`` - confirm against the gfal2 documentation that this
    is the intended operation (as opposed to e.g. an unlink).
    """
    context = gfal2.creat_context()
    for surl in surls:
        # The stat is performed in both modes, before anything is released.
        file_stat = context.stat(surl)
        if dry_run:
            logging.info('removing file: %s', file_stat)
            continue
        logging.debug('removing file: %s', file_stat)
        context.release(surl)
def prune(args):
    """Reset the selected tasks to ``defining`` and release their output files.

    Tasks are listed through the API, optionally filtered by
    ``args.workflow_id`` and ``args.status``. Unless ``args.dry_run`` is set
    each task status is changed to ``'defining'``. The surls found in the
    ``outputs`` field of every task are then passed to
    ``remove_surl_locations`` (which receives the same dry-run flag).

    :param args: parsed command line namespace; ``atdb_site``, ``token``,
        ``workflow_id``, ``status`` and ``dry_run`` are read.
    """
    connector = APIConnector.from_args(args)

    query = {}
    if args.workflow_id is not None:
        query['workflow__id'] = args.workflow_id
    if args.status is not None:
        query['status'] = args.status

    logger.info('Toggling status of tasks in ATDB....')
    # Fetch every page BEFORE changing anything: the listing is paginated
    # lazily and may be filtered by status, so modifying the status of tasks
    # while still paging would shift the remaining pages and skip tasks.
    tasks = list(connector.list_iter('tasks', query))

    surls_to_remove = []
    for task in tasks:
        if not args.dry_run:
            connector.change_task_status(task['id'], 'defining')
        surls_to_remove += extract_task_surls_from_field(task, 'outputs')

    logger.info('Successfully toggled %s tasks proceeding in removing files (%s) from disk',
                len(tasks), len(surls_to_remove))
    remove_surl_locations(surls_to_remove, dry_run=args.dry_run)
    logger.info('Successfully removed %s files', len(surls_to_remove))
""" Cool module containing functions, classes and other useful things """
def greeter():
    """Write the greeting to standard output."""
    greeting = "Hello World!"
    print(greeting)
numpy >= 1.20.0 # BSD
requests
gfal2-python
\ No newline at end of file
[metadata]
name = my-awseome-app
name = ldv-datamanagement-tasks
version = file: VERSION
description = An example package for CI/CD working group
description = A set of utils to manage task disk utilization
long_description = file: README.md
long_description_content_type = text/markdown
url = https://git.astron.nl/templates/python-package
url = https://git.astron.nl/ldv/ldv_datamanagement_tasks
license = Apache License 2.0
classifiers =
Development Status :: 3 - Alpha
......@@ -30,8 +30,9 @@ include_package_data = true
packages = find:
python_requires = >=3.7
install_requires =
importlib-metadata>=0.12;python_version<"3.8"
numpy
importlib-metadata>=0.12
requests
gfal2-python
[flake8]
max-line-length = 88
......
"""Testing of the Cool Module"""
from unittest import TestCase
from map.cool_module import greeter
class TestCoolModule(TestCase):
    """Checks for the functions of the cool module"""

    def test_greeter(self):
        """The greeter must run without raising"""
        greeter()
        expected_sum = 4
        self.assertEqual(2 + 2, expected_sum)
from unittest import TestCase
from atdb.prune import extract_surls_from_obj
class TestPruneUtils(TestCase):
    """Checks for the helper functions of the prune module"""

    def test_surl_filtering(self):
        """Surls nested in dicts and lists are collected in traversal order"""
        nested_list = ['ciao', {'surl': 'another_surl'}]
        test_data = {
            'item': [{'surl': 'onesurl'}, 1, nested_list],
            'item2': {'surl': 'third_surl'},
            'item3': 'bla',
        }
        expected_result = ['onesurl', 'another_surl', 'third_surl']

        result = extract_surls_from_obj(test_data)

        self.assertListEqual(result, expected_result)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment