Skip to content
Snippets Groups Projects
Commit c69e3d15 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Improve code quality

parent f2d0d781
Branches
Tags
1 merge request!11Fix output sizes
Pipeline #40726 passed with warnings
...@@ -127,8 +127,9 @@ class APIConnector: ...@@ -127,8 +127,9 @@ class APIConnector:
""" """
Change the whole task content Change the whole task content
""" """
return self._request_path("PUT", f"tasks/{task_id}", return self._request_path(
content={"size_processed": processed_size}) "PUT", f"tasks/{task_id}", content={"size_processed": processed_size}
)
def change_task_status(self, task_id, status) -> None: def change_task_status(self, task_id, status) -> None:
""" """
......
import atdb.communication as com """
Fix command module
"""
import logging import logging
import atdb.communication as com
logger = logging.getLogger("fix")
logger = logging.getLogger('fix')
def aggregate_on_tree(tree, field): def aggregate_on_tree(tree, field):
"""
Aggregated values with a given field name from a dict tree
"""
if isinstance(tree, dict) and field in tree: if isinstance(tree, dict) and field in tree:
return tree[field] return tree[field]
elif isinstance(tree, dict): if isinstance(tree, dict):
total = 0 total = 0
for value in tree.values(): for value in tree.values():
total += aggregate_on_tree(value, field) total += aggregate_on_tree(value, field)
return total return total
elif isinstance(tree, list): if isinstance(tree, list):
total = 0 total = 0
for item in tree: for item in tree:
total += aggregate_on_tree(item, field) total += aggregate_on_tree(item, field)
return total return total
else:
return 0 return 0
def compute_output_sizes(outputs): def compute_output_sizes(outputs):
"""
Computes the size of the output files
"""
if outputs is not None: if outputs is not None:
return aggregate_on_tree({key: value for key, value in outputs.items() if key != 'ingest'}, 'size') return aggregate_on_tree(
else: {key: value for key, value in outputs.items() if key != "ingest"}, "size"
)
return 0 return 0
def fix_computed_sizes(connector, dry_run=True): def fix_computed_sizes(connector, dry_run=True):
for task in connector.list_iter('tasks'): """
task_id = task['id'] Fix the size of the computed task
size_before = task['size_processed'] """
total_output_size = compute_output_sizes(task['outputs']) for task in connector.list_iter("tasks"):
task['size_processed'] = total_output_size task_id = task["id"]
size_before = task["size_processed"]
total_output_size = compute_output_sizes(task["outputs"])
task["size_processed"] = total_output_size
if not dry_run: if not dry_run:
if size_before != total_output_size: if size_before != total_output_size:
connector.update_task_processed_size(task_id, total_output_size) connector.update_task_processed_size(task_id, total_output_size)
else: else:
if size_before != total_output_size: if size_before != total_output_size:
logger.info('Dry run: Size updated for %s from %s to %s', task_id, logger.info(
size_before, total_output_size) "Dry run: Size updated for %s from %s to %s",
task_id,
size_before,
total_output_size,
)
def fix(args): def fix(args):
"""
Fix command
Changes task fields to be consistent with each others
"""
connector = com.APIConnector.from_args(args) connector = com.APIConnector.from_args(args)
fix_computed_sizes(connector, dry_run=args.dry_run) fix_computed_sizes(connector, dry_run=args.dry_run)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment