Commit f9ae2c7c authored by Corné Lukken's avatar Corné Lukken
Browse files

Merge branch 'L2SS-829' into 'main'

Resolve L2SS-818

Closes L2SS-818

See merge request !3
parents 4596b4e7 9e35ae62
Pipeline #32552 passed with stages
in 3 minutes and 30 seconds
......@@ -10,6 +10,7 @@ coverage.xml
.stestr
# Documentation
docs/source/source_documentation
docs/build
# Virtual environments
......@@ -21,4 +22,4 @@ venv
dist
# Caches
__pycache__
\ No newline at end of file
__pycache__
......@@ -8,12 +8,9 @@ default:
cache:
paths:
- .cache/pip
- .tox
stages:
- lint
# check if this needs to be a separate step
# - build_extensions
- test
- package
- integration
......@@ -75,6 +72,12 @@ coverage:
script:
- echo "run python3.7 unit tests /w coverage"
- tox -e coverage
artifacts:
reports:
cobertura: coverage.xml
paths:
- cover/*
package_files:
stage: package
......@@ -100,3 +103,16 @@ run_integration_tests:
- package_files
script:
- tox -e integration
run_package_publish:
stage: publish
needs:
- package_files
only:
- main
variables:
TWINE_PASSWORD: "${CI_JOB_TOKEN}"
TWINE_USERNAME: "gitlab-ci-token"
script:
- pip install twine
- python -m twine upload --repository-url ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/pypi dist/*
\ No newline at end of file
......@@ -4,4 +4,5 @@ include VERSION
recursive-include docs/source *
recursive-exclude tests *
recursive-exclude integration *
recursive-exclude scripts *
[![Pipeline Status](https://git.astron.nl/lofar2.0/lofar-station-client/badges/main/pipeline.svg)](https://git.astron.nl/lofar2.0/lofar-station-client/-/pipelines)
[![Coverage Status](https://git.astron.nl/lofar2.0/lofar-station-client/badges/main/coverage.svg)](https://git.astron.nl/lofar2.0/lofar-station-client/-/jobs/artifacts/main/download?job=coverage)
[![Python Versions](https://img.shields.io/badge/python-3.7%20%7C%203.8%20%7C%203.9%20%7C%203.10-informational)](https://git.astron.nl/lofar2.0/lofar-station-client)
[![Latest Documentation](https://img.shields.io/badge/docs-download-informational)](https://git.astron.nl/lofar2.0/lofar-station-client/-/jobs/artifacts/main/download?job=package_docs)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![Latest Release](https://git.astron.nl/lofar2.0/lofar-station-client/-/badges/release.svg)](https://git.astron.nl/lofar2.0/lofar-station-client/-/releases)
[//]: # (TODO Corne, Update badges to use dynamic types such as those from pypi once available)
# Station Client Library
Client library for using Tango Station Control.
## Installation
To install this library, use either:
Wheel distributions are available from the [gitlab package registry](https://git.astron.nl/lofar2.0/lofar-station-client/-/packages/),
after downloading, install using:
```shell
pip3 install lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client
pip install *.whl
```
Or install directly from this source:
Alternatively, install the latest version from the main branch using:
```shell
pip install ./
pip3 install lofar-station-client@git+https://git.astron.nl/lofar2.0/lofar-station-client
```
Wheel distributions are also available from the gitlab job artifacts;
install using:
Or install directly from the source at any branch or commit:
```shell
pip install *.whl
pip install ./
```
## Usage
For more thorough usage explanation please consult the documentation
```python
import lofar_station_client
print(lofar_station_client.__version__)
from lofar_station_client import get_attribute_history
# Metrics from the last hour
get_attribute_history("Pointing_direction_R", "stat/digitalbeam/1")
# Specify range in epoch
get_attribute_history("Pointing_direction_R", "stat/digitalbeam/1", start=1655725200.0, end=1655815200.0)
```
## Development
......@@ -46,12 +60,18 @@ tox -e docs
tox -e build
```
Finally, after running `tox -e build`, the packaged distribution can be installed
in a virtual environment to run the integration tests
Running integration tests is also possible.
```shell
virtualenv venv
source venv/bin/activate
pip install dist/*.whl
tox -e build
tox -e integration
```
## Debug
Place `import pdb; pdb.set_trace()` on desired breakpoints and execute:
```shell
tox -e debug path.to.test
tox -e debug tests.requests.test_prometheus
```
0.1
\ No newline at end of file
0.2
\ No newline at end of file
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# TODO(Corne): Fix paths in a way that both support tox debug & integration job
import base
class PrometheusTest(base.TestCase):
    """Integration test: the installed package must expose the public API."""

    def test_get_attribute_history_exported(self):
        # Local import so a broken/missing package install fails this test
        # rather than test collection (see the path TODO at the top of the
        # file regarding tox debug vs. the integration job).
        from lofar_station_client import get_attribute_history

        self.assertIsNotNone(get_attribute_history)
......@@ -19,4 +19,10 @@ try:
except ImportError: # for Python<3.8
import importlib_metadata as metadata
from lofar_station_client.requests.prometheus import PrometheusRequests

# Resolve the version from installed package metadata so it always matches
# the distribution instead of a hard-coded string.
__version__ = metadata.version("lofar-station-client")

# Re-export the prometheus helper at the package root for user convenience.
get_attribute_history = PrometheusRequests.get_attribute_history

# Explicitly declared public API of the package root.
__all__ = ["get_attribute_history"]
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Type parsing and validation functionality"""
from typing import List
from typing import Tuple
from typing import Union
def validate_type(param, allowed_types: List[Union[type, None]]) -> bool:
    """Ensure that ``param`` is an instance of a type in ``allowed_types``.

    A bare ``None`` entry is accepted as shorthand for ``type(None)`` so
    callers can declare optional arguments naturally.

    :param param: value whose type is checked
    :param allowed_types: accepted types; ``None`` entries match ``None``
    :return: True if ``param`` is an instance of a listed type, False otherwise
    """
    # Substitute NoneType for bare None entries because isinstance(None, None)
    # raises TypeError; isinstance natively accepts a tuple of types.
    checked = tuple(
        entry if entry is not None else type(None) for entry in allowed_types
    )
    return isinstance(param, checked)


def validate_types(
    parameters: List[Tuple], message: Union[str, None] = None
):
    """Validate function parameter types for a user-facing function.

    Pass as a list with ``[(argument, [type1, type2], 'name of argument'), ...]``

    :param parameters: tuples of (value, allowed types, argument name)
    :param message: optional override of the error message; formatted with
        ``{0}`` = argument name, ``{1}`` = actual type, ``{2}`` = allowed types
    :raises TypeError: for any argument that does not have a type in the
        specified list
    """
    if not message:
        message = "Argument {0} of type {1} must be one of {2}"
    for value, allowed, name in parameters:
        if not validate_type(value, allowed):
            raise TypeError(message.format(name, type(value), allowed))
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Contains classes and method to facilitate interacting with prometheus instances"""
# too-few-public-methods, too-many-method-arguments, too-many-local-variables
# pylint: disable=R0903, R0913, R0914
from collections import OrderedDict
from datetime import datetime
from datetime import timedelta
from http import HTTPStatus
from typing import Callable
from typing import List
from typing import Tuple
from typing import Union
import json
import urllib
import requests
from lofar_station_client.parsing.parse import validate_types
class PrometheusRequests:
    """Make requests to prometheus instances"""

    # Time arguments are accepted as datetime objects or epoch floats.
    datetime_or_epoch = Union[datetime, float]

    # Prometheus range-query endpoint; the host is prepended at request time.
    QUERY_PATH = "/api/v1/query_range?query="
    DEFAULT_HOST = "http://prometheus:9090"
    DEFAULT_DURATION = 3600  # in seconds
    DEFAULT_STEP_SIZE = 60  # in seconds

    @staticmethod
    def get_attribute_history(
        attribute: str,
        device: str,
        host: str = DEFAULT_HOST,
        start: datetime_or_epoch = None,
        end: datetime_or_epoch = None,
        step: int = DEFAULT_STEP_SIZE,
    ) -> List[List[List[Tuple[datetime, any]]]]:
        """Retrieve prometheus history for a specific attribute from a specific device

        :param attribute: The device attribute to retrieve history about
        :param device: The target device
        :param host: The prometheus host, can be left blank
        :param start: Beginning of range to retrieve history between
        :param end: End of range to retrieve history between
        :param step: Time between individual values in seconds, can be left blank
        :return: multidimensional array of values and timestamp tuples for the
                 specified device and attribute where timestamps are between
                 start and end
        :raises TypeError: Raised if any arguments do not adhere to the type hints
        :raises ValueError: Raised if start time in the future or url invalid
        :raises RuntimeError: Raised upon http or prometheus request error
        """
        path = PrometheusRequests.QUERY_PATH

        # This is an end user input function so validate argument types
        _type_validators = [
            (attribute, [str], "attribute"),
            (device, [str], "device"),
            (host, [str], "host"),
            (start, [None, datetime, float], "start"),
            (end, [None, datetime, float], "end"),
            (step, [int], "step"),
        ]
        validate_types(_type_validators)

        # Determine duration; after this both start and end are datetimes
        start, end = PrometheusRequests._default_duration(start, end)

        # Verify start argument is not set in the future
        if start and isinstance(start, datetime) and start > datetime.now():
            raise ValueError("Start time can not be in the future.")

        # Convert datetime objects to strings, shadow previous variables
        # NOTE(review): naive local datetimes (datetime.now()) are formatted
        # with a 'Z' (UTC) suffix here, so the queried range may be shifted
        # by the local UTC offset — confirm against the prometheus host TZ.
        start = start.strftime("%Y-%m-%dT%H:%M:%SZ")
        end = end.strftime("%Y-%m-%dT%H:%M:%SZ")

        # URL-encode the prometheus label-matcher query; device names are
        # lowercased to match how the exporter labels them.
        query = urllib.parse.quote(
            f'device_attribute{{name="{attribute}", device="{device.lower()}"}}'
        )

        # Perform the actual get request
        http_result = requests.get(
            f"{host}{path}{query}&start={start}&end={end}&step={step}"
        )

        # Check status code for any errors, only OK 200 accepted
        if HTTPStatus(http_result.status_code) is not HTTPStatus.OK:
            prometheus_error = ""
            if http_result.text:
                # NOTE(review): assumes every non-OK body is JSON with an
                # "error" key; a plain-text error page would raise here.
                prometheus_error = json.loads(http_result.text)["error"]
            raise RuntimeError(
                f"Failed to retrieve attribute history HTTP error: "
                f"{HTTPStatus(http_result.status_code).description}, "
                f"{prometheus_error}"
            )

        json_results = json.loads(http_result.text)

        # These objects can be quite large so 'free' memory as soon as possible
        http_result = None

        # Verify prometheus response indicates success
        if json_results["status"] != "success":
            raise RuntimeError(
                f"Failed to retrieve attribute history Prometheus error: "
                f"{json_results['status']}, "
                f"{json_results['error']}"
            )

        # Update json_results to only contain results array, reduce memory
        json_results = json_results["data"]["result"]

        # In-place conversion of all string types based on their str_value
        PrometheusRequests._convert_str_value(json_results)

        # Merge split results identified by x and y indices, shallow copies
        # unsetting json_results has no effect on memory
        result_sets = PrometheusRequests._merge_result_data(json_results)

        # Expand the results into [x][y][tuple[datetime, value]] with converted values
        return PrometheusRequests._expand_convert_results_3d(result_sets)

    @staticmethod
    def _default_duration(
        start: datetime_or_epoch, end: datetime_or_epoch
    ) -> Tuple[datetime, datetime]:
        """Generate the default duration if start and / or end is unset.

        Missing endpoints default to a DEFAULT_DURATION-second window anchored
        on whichever endpoint is given (or on the current time if neither is).

        :return: tuple containing the start and stop datetime objects
        """
        # Convert floats to datetime objects
        if isinstance(start, float):
            start = datetime.fromtimestamp(start)
        if isinstance(end, float):
            end = datetime.fromtimestamp(end)

        # If start is set but no end, adjust end to default duration from start
        if start and end is None:
            end = start + timedelta(seconds=PrometheusRequests.DEFAULT_DURATION)

        # If end is still None start must have been unset, base duration on end
        if end is None:
            end = datetime.now()

        # If start is None base duration on end
        if start is None:
            start = end - timedelta(seconds=PrometheusRequests.DEFAULT_DURATION)

        return start, end

    @staticmethod
    def _get_type_converter(type_str: str) -> Callable:
        """Convert the type string provided by prometheus to a callable converter

        Cannot be used for str_value conversion, only operates on values.
        """
        if type_str == "bool":
            # Lowercasing lets Python-style "True"/"False" strings parse as
            # the JSON literals true/false.
            return lambda x: json.loads(x.lower())
        if type_str in ("float", "state"):
            return float
        # Unknown types pass through unchanged.
        return lambda x: x

    @staticmethod
    def _convert_str_value(json_results: dict):
        """In place conversion of string values

        :param: json_results Dictionary with prometheus json result array
                pass as `json["data"]["result"]` from raw json.
        """
        for json_result in json_results:
            # Only convert type string
            if json_result["metric"]["type"] != "string":
                continue

            # Get str_value or empty as default
            str_value = json_result.get("metric").get("str_value")
            if not str_value:
                str_value = ""

            # Replace every sample's value with the metric's str_value.
            for value in json_result["values"]:
                value[1] = str_value

    @staticmethod
    def _merge_result_data(json_results: dict) -> OrderedDict:
        """Merge all result entries for matching x,y,attribute,station combinations

        :param json_results: Dictionary with prometheus json result array
            pass as `json["data"]["result"]` from raw json.
        """

        def ord_index(metric: dict) -> str:
            """Generate the ordered dict index from json metric"""
            return f"{metric['x']}.{metric['y']}"

        result_dict = OrderedDict()
        for json_result in json_results:
            dict_key = ord_index(json_result["metric"])

            # New coordinate
            if result_dict.get(dict_key) is None:
                result_dict[dict_key] = json_result
                continue

            # Existing coordinate, extend
            result_dict[dict_key]["values"].extend(json_result["values"])

        return result_dict

    @staticmethod
    def _expand_convert_results_3d(
        merged_results: OrderedDict,
    ) -> List[List[List[Tuple[datetime, any]]]]:
        """Expand the results into [x][y][tuple[datetime, value]] and convert values

        :param merged_results: Result from _merge_result_data
        """

        def str_to_coords(coordinate: str) -> (int, int):
            """Convert the string "x.y" coordinate into an (x, y) int tuple"""
            split = coordinate.split(".", 1)
            return int(split[0]), int(split[1])

        def convert_row(row: List[any], converter: Callable):
            # A row is [epoch, raw_value]; convert both members in one pass.
            return datetime.fromtimestamp(row[0]), converter(row[1])

        results = [[[]]]
        for key, values in merged_results.items():
            x_index, y_index = str_to_coords(key)

            # Ensure outermost array sufficiently sized
            while len(results) <= x_index:
                results.append([[]])

            # Ensure internal array sufficiently sized
            while len(results[x_index]) <= y_index:
                results[x_index].append([])

            # Per result determine the callable type converter
            type_converter = PrometheusRequests._get_type_converter(
                values["metric"]["type"]
            )
            result = [convert_row(row, type_converter) for row in values["values"]]
            results[x_index][y_index].extend(result)

        return results
# TODO(Corne): Uncomment below when L2SS-832 is fixed
# tangostationcontrol@git+https://git.astron.nl/lofar2.0/tango.git#egg=tangostationcontrol&subdirectory=tangostationcontrol
\ No newline at end of file
# tangostationcontrol@git+https://git.astron.nl/lofar2.0/tango.git#egg=tangostationcontrol&subdirectory=tangostationcontrol
requests>=2.0 # Apache 2
......@@ -29,7 +29,7 @@ classifiers =
[options]
include_package_data = true
packages = lofar_station_client
packages = find:
python_requires = >=3.7
install_requires =
importlib-metadata>=0.12;python_version<"3.8"
......
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lofar_station_client.parsing import parse
from tests import base
class ParseTest(base.TestCase):
    """Unit tests for lofar_station_client.parsing.parse."""

    def test_validate_type(self):
        """A value whose type appears in the allowed list is accepted."""
        self.assertTrue(parse.validate_type("", [str]))

    def test_validate_type_invalid(self):
        """A value whose type is absent from the allowed list is rejected."""
        self.assertFalse(parse.validate_type("", [int, None]))

    def test_validate_types(self):
        """validate_types raises nothing for well-typed arguments."""
        well_typed = [
            ("", [str], "test"),
            (1234, [int, None], "test"),
            (None, [None], "empty"),
        ]
        parse.validate_types(well_typed)
# -*- coding: utf-8 -*-
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import math
from datetime import datetime
from datetime import timedelta
import json
from unittest import mock
from lofar_station_client.requests import prometheus
from lofar_station_client.requests.prometheus import PrometheusRequests
from tests import base
class PrometheusTest(base.TestCase):
# Multiple results for identical (x, y, attribute, station) combinations
DUMMY_RESULT_DATA_DUPLICATE = """
{
"status":"success",
"data":{
"resultType":"matrix",
"result":[{
"metric":{
"__name__":"device_attribute",
"device":"stat/recv/1",
"host":"localhost","idx":"000",
"instance":"tango-prometheus-exporter:8000",
"job":"tango",
"name":"RCU_TEMP_R",
"station":"DevStation",
"type":"float",
"x":"00",