Skip to content
Snippets Groups Projects
Commit 0b0f6a17 authored by Mattia Mancini's avatar Mattia Mancini
Browse files

Merge branch 'add_atdb_inspect_transition' into 'master'

Add atdb inspect transition

See merge request !4
parents f2fdf7b1 87b7b523
Branches
Tags
1 merge request!4Add atdb inspect transition
...@@ -50,3 +50,19 @@ atdb_result_gatherer [sas_id] --print_first ...@@ -50,3 +50,19 @@ atdb_result_gatherer [sas_id] --print_first
# If you want to specify a different atdb url you can do it with # If you want to specify a different atdb url you can do it with
atdb_result_gatherer --atdb_url https://youratdbserver/atdb/ [sas_id] [paths] atdb_result_gatherer --atdb_url https://youratdbserver/atdb/ [sas_id] [paths]
``` ```
## ATDB Inspect Transition
Inspect the time elapsed between the transition of two states given a sas id.
### Installation
```bash
# Using pip install (requires valid ssh key, until the repo is set to public)
pip install pip install -e "git+https://git.astron.nl/ldv/ldv_utils.git#egg=atdb-inspect-transition&subdirectory=atdb_inspect-transition"
```
### Running
```bash
atdb_inspect_transition [sas_id] [from status] [to status]
# More info and flags
atdb_csv_gen -h
```
from .inspect_transition import main
\ No newline at end of file
import requests
from argparse import ArgumentParser
from typing import Dict, List
import json
from datetime import datetime
import re
def parse_args():
parser = ArgumentParser(description='Gather the results of tasks with a give SAS ID')
parser.add_argument('--atdb_url', help='url to the atdb website', default='https://sdc.astron.nl:5554/atdb')
parser.add_argument('sas_id', help='sas id of the observation')
parser.add_argument('--print_first', help='print first result', action='store_true')
parser.add_argument('from_state', help='from state')
parser.add_argument('to_state', help='to state')
return parser.parse_args()
class MissingFieldException(Exception):
def __init__(self, key):
super(MissingFieldException, self).__init__(f'missing field {key}')
self.key = key
def path_to_nested_dict(content: dict, path: str):
leaf = content
uri = path.split('.')
for item in uri[:-1]:
try:
leaf = leaf[item]
except KeyError:
raise MissingFieldException(item)
if isinstance(leaf, list):
return list(map(lambda x: path_to_nested_dict(x, uri[-1]), leaf))
return leaf[uri[-1]]
def _get_paginated(url, params):
partial_result = []
while True:
http_response = requests.get(url.replace('http:', 'https:'), params=params)
if not http_response.ok:
raise Exception(http_response.reason)
data_block = http_response.json()
partial_result += data_block['results']
next_page_url = data_block['next']
if next_page_url is None:
return partial_result
else:
url = next_page_url
def get_tasks(url: str, sas_id: Dict):
tasks = _get_paginated(f'{url}/tasks', params=dict(sas_id=sas_id))
return tasks
def find_latest_status(item, status):
status_history: List[str] = path_to_nested_dict(item, 'status_history')
status_history.reverse()
for last_status in status_history:
if status in last_status:
return last_status
def parse_datetime_from_status(status):
date_time_str, = re.findall('\w*\s\((.*)Z\)', status)
return datetime.strptime(date_time_str, '%Y-%m-%dT%H:%M:%S')
def last_status_time(item, status):
return parse_datetime_from_status(find_latest_status(item, status))
def extract_result_from_tasks(tasks, from_state, to_state):
result = map(
lambda item: (path_to_nested_dict(item, 'id'),
(last_status_time(item, to_state) - last_status_time(item, from_state)).seconds),
tasks
)
return result
def main():
args = parse_args()
tasks = get_tasks(args.atdb_url, args.sas_id)
if args.print_first:
print(json.dumps(tasks[0], indent=4))
else:
results = extract_result_from_tasks(tasks, args.from_state, args.to_state)
print('task_id', 'elapsed_time(s)')
for result in results:
print(*result)
#!/usr/bin/env python3
import atdb_inspect_transition
atdb_inspect_transition.main()
\ No newline at end of file
requests
\ No newline at end of file
# SPDX-License-Identifier: Apache-2.0
from setuptools import setup
def get_requirements() -> str:
with open("requirements.txt") as reqs:
return reqs.read().split("\n")
setup(
name="atdb-inspect-transition",
version="1.0.0",
description="Tool to gather ATDB transition time",
author="ASTRON",
packages=["atdb_inspect_transition"],
install_requires=get_requirements(),
scripts=["bin/atdb_inspect_transition"],
license="Apache License, Version 2.0",
zip=False,
python_requires=">=3.7"
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment