Skip to content
Snippets Groups Projects
Commit f2d0d781 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Add various fixes

parent a277998e
No related branches found
No related tags found
1 merge request!11Fix output sizes
Pipeline #40633 failed
...@@ -123,11 +123,12 @@ class APIConnector: ...@@ -123,11 +123,12 @@ class APIConnector:
for item in drf_reply.results: for item in drf_reply.results:
yield item yield item
def update_task(self, task_id, content): def update_task_processed_size(self, task_id, processed_size):
""" """
Change the whole task content Change the whole task content
""" """
self._request_path("PUT", f"tasks/{task_id}", content=content) return self._request_path("PUT", f"tasks/{task_id}",
content={"size_processed": processed_size})
def change_task_status(self, task_id, status) -> None: def change_task_status(self, task_id, status) -> None:
""" """
......
import atdb.communication as com import atdb.communication as com
import tqdm import logging
logger = logging.getLogger('fix')
def aggregate_on_tree(tree, field): def aggregate_on_tree(tree, field):
if isinstance(tree, dict) and field in tree: if isinstance(tree, dict) and field in tree:
return tree[field] return tree[field]
...@@ -19,17 +20,25 @@ def aggregate_on_tree(tree, field): ...@@ -19,17 +20,25 @@ def aggregate_on_tree(tree, field):
def compute_output_sizes(outputs): def compute_output_sizes(outputs):
if outputs is not None: if outputs is not None:
return aggregate_on_tree({key: value for key, value in outputs.items if key != 'ingest'}, 'size') return aggregate_on_tree({key: value for key, value in outputs.items() if key != 'ingest'}, 'size')
else: else:
return 0 return 0
def fix_computed_sizes(connector): def fix_computed_sizes(connector, dry_run=True):
for task in tqdm.tqdm(connector.list_iter('tasks')): for task in connector.list_iter('tasks'):
task_id = task['id'] task_id = task['id']
size_before = task['size_processed']
total_output_size = compute_output_sizes(task['outputs']) total_output_size = compute_output_sizes(task['outputs'])
task['size_processed'] = total_output_size task['size_processed'] = total_output_size
connector.update_task(task_id, task) if not dry_run:
if size_before != total_output_size:
connector.update_task_processed_size(task_id, total_output_size)
else:
if size_before != total_output_size:
logger.info('Dry run: Size updated for %s from %s to %s', task_id,
size_before, total_output_size)
def fix(args): def fix(args):
connector = com.APIConnector.from_args(args) connector = com.APIConnector.from_args(args)
fix_computed_sizes(connector) fix_computed_sizes(connector, dry_run=args.dry_run)
\ No newline at end of file \ No newline at end of file
...@@ -8,6 +8,7 @@ from argparse import ArgumentParser, Namespace ...@@ -8,6 +8,7 @@ from argparse import ArgumentParser, Namespace
from configparser import ConfigParser from configparser import ConfigParser
from atdb.prune import prune from atdb.prune import prune
from atdb.fix import fix
DEFAULT_PATH = os.path.expanduser("~/.config/ldv/services.cfg") DEFAULT_PATH = os.path.expanduser("~/.config/ldv/services.cfg")
logging.basicConfig( logging.basicConfig(
...@@ -78,6 +79,8 @@ def main(): ...@@ -78,6 +79,8 @@ def main():
if args.operation == "prune": if args.operation == "prune":
prune(args) prune(args)
elif args.operation == "fix":
fix(args)
else: else:
parser.print_help() parser.print_help()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment