Commit 83337646 authored by Hannes Feldt's avatar Hannes Feldt

Migrate code from station client

parent 2ceac589
1 merge request: !1 Migrate code from station client
Pipeline #109590 passed (downstream pipeline: lotus #109591)
Showing 1337 additions and 97 deletions
@@ -10,8 +10,6 @@ default:
 stages:
   - prepare
   - lint
-  # check if this needs to be a separate step
-  # - build_extensions
   - test
   - package
   - images
@@ -34,29 +32,12 @@ trigger_prepare:
     strategy: depend
     include: .prepare.gitlab-ci.yml

-run_black:
+run_lint:
   stage: lint
   script:
-    - tox -e black
+    - tox -e lint
   allow_failure: true

-run_flake8:
-  stage: lint
-  script:
-    - tox -e pep8
-  allow_failure: true
-
-run_pylint:
-  stage: lint
-  script:
-    - tox -e pylint
-  allow_failure: true
-
-# build_extensions:
-#   stage: build_extensions
-#   script:
-#     - echo "build fortran/c/cpp extension source code"
-
 sast:
   variables:
     SAST_EXCLUDED_ANALYZERS: brakeman, flawfinder, kubesec, nodejs-scan, phpcs-security-audit,
@@ -89,7 +70,7 @@ run_unit_tests:
     - tox -e py3${PY_VERSION}
   parallel:
     matrix: # use the matrix for testing
-      - PY_VERSION: [9, 10, 11, 12, 13]
+      - PY_VERSION: [10, 11, 12, 13]

 # Run code coverage on the base image thus also performing unit tests
 run_unit_tests_coverage:
@@ -123,31 +104,6 @@ package_docs:
   script:
     - tox -e docs

-docker_build:
-  stage: images
-  image: docker:latest
-  needs:
-    - package_files
-  tags:
-    - dind
-  before_script: []
-  script:
-    - docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
-    - docker build -f docker/lofar_lotus/Dockerfile . --build-arg BUILD_ENV=copy --tag $CI_REGISTRY_IMAGE/lofar_lotus:$CI_COMMIT_REF_SLUG
-    # enable this push line once you have configured docker registry cleanup policy
-    # - docker push $CI_REGISTRY_IMAGE/lofar_lotus:$CI_COMMIT_REF_SLUG
-
-run_integration_tests:
-  stage: integration
-  allow_failure: true
-  needs:
-    - package_files
-  script:
-    - echo "make sure to move out of source dir"
-    - echo "install package from filesystem (or use the artefact)"
-    - echo "run against foreign systems (e.g. databases, cwl etc.)"
-    - exit 1
-
 publish_on_gitlab:
   stage: publish
   environment: gitlab
@@ -212,14 +168,3 @@ publish_to_readthedocs:
   script:
     - echo "scp docs/* ???"
     - exit 1
-
-release_job:
-  stage: publish
-  image: registry.gitlab.com/gitlab-org/release-cli:latest
-  rules:
-    - if: '$CI_COMMIT_TAG && $CI_COMMIT_REF_PROTECTED == "true"'
-  script:
-    - echo "running release_job"
-  release:
-    tag_name: '$CI_COMMIT_TAG'
-    description: '$CI_COMMIT_TAG - $CI_COMMIT_TAG_MESSAGE'
-default_stages: [ commit, push ]
+default_stages: [ pre-commit, pre-push ]
 default_language_version:
   python: python3
 exclude: '^docs/.*\.py$'
@@ -14,25 +14,9 @@ repos:
       - id: detect-private-key
   - repo: local
     hooks:
-      - id: tox-black
-        name: tox-black (local)
+      - id: tox-lint
+        name: tox-lint (local)
         entry: tox
         language: python
         types: [file, python]
-        args: ["-e", "black", "--"]
+        args: ["-e", "lint", "--"]
-  - repo: local
-    hooks:
-      - id: tox-pep8
-        name: tox-pep8 (local)
-        entry: tox
-        language: python
-        types: [file, python]
-        args: ["-e", "pep8", "--"]
-  - repo: local
-    hooks:
-      - id: tox-pylint
-        name: tox-pylint (local)
-        entry: tox
-        language: python
-        types: [file, python]
-        args: ["-e", "pylint", "--"]
@@ -4,36 +4,38 @@
 ![Test coverage](git.astron.nl/lofar2.0/lotus/badges/main/coverage.svg)
 <!-- ![Latest release](https://git.astron.nl/templates/python-package/badges/main/release.svg) -->

-An example repository of an CI/CD pipeline for building, testing and publishing a python package.
+Common library containing various stuff for LOFAR2.

 ## Installation

-```
-pip install .
-```
+Wheel distributions are available from the [gitlab package registry](https://git.astron.nl/lofar2.0/lotus/-/packages/),
+install after downloading using:
+
+```shell
+python -m pip install *.whl
+```
+
+Alternatively install the latest version on master using:
+
+```shell
+python -m pip install lofar-lotus@git+https://git.astron.nl/lofar2.0/lotus
+```

-## Setup
-One time template setup should include configuring the docker registry to regularly cleanup old images of
-the CI/CD pipelines. And you can consider creating protected version tags for software releases:
-
-1. [Cleanup Docker Registry Images](https://git.astron.nl/groups/templates/-/wikis/Cleanup-Docker-Registry-Images)
-2. [Setup Protected Verson Tags](https://git.astron.nl/groups/templates/-/wikis/Setting-up-Protected-Version-Tags)
-
-Once the cleanup policy for docker registry is setup you can uncomment the `docker push` comment in the `.gitlab-ci.yml`
-file from the `docker_build` job. This will allow to download minimal docker images with your Python package installed.
+Or install directly from the source at any branch or commit:
+
+```shell
+python -m pip install ./
+```

 ## Usage

-```python
-from lofar_lotus import cool_module
-cool_module.greeter()  # prints "Hello World"
-```
+For more thorough usage explanation please consult the documentation.

 ## Development

 ### Development environment

-To setup and activte the develop environment run ```source ./setup.sh``` from within the source directory.
+To set up and activate the develop environment run ```source ./setup.sh``` from within the source directory.
 If PyCharm is used, this only needs to be done once.
 Afterward the Python virtual env can be setup within PyCharm.
@@ -46,9 +48,9 @@ should be assigned.
 Verify your changes locally and be sure to add tests. Verifying local
 changes is done through `tox`.

-```pip install tox```
+```python -m pip install tox```

-With tox the same jobs as run on the CI/CD pipeline can be ran. These
+With tox the same jobs as run on the CI/CD pipeline can be executed. These
 include unit tests and linting.

 ```tox```
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""Common classes used in station"""

from ._case_insensitive_dict import CaseInsensitiveDict, ReversibleKeysView
from ._case_insensitive_string import CaseInsensitiveString

__all__ = ["CaseInsensitiveDict", "CaseInsensitiveString", "ReversibleKeysView"]
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""Provides a special dictionary with case-insensitive keys"""

import abc
from collections import UserDict
from typing import List
from typing import Tuple
from typing import Union

from ._case_insensitive_string import CaseInsensitiveString


def _case_insensitive_comprehend_keys(data: dict) -> List[CaseInsensitiveString]:
    return [CaseInsensitiveString(key) for key in data]


def _case_insensitive_comprehend_items(
    data: dict,
) -> List[Tuple[CaseInsensitiveString, any]]:
    return [(CaseInsensitiveString(key), value) for key, value in data.items()]


class ReversibleIterator:
    """Reversible iterator using instance of self method

    See real-python for yield iterator method:
    https://realpython.com/python-reverse-list/#the-special-method-__reversed__
    """

    def __init__(self, data: List, start: int, stop: int, step: int):
        self.data = data
        self.current = start
        self.stop = stop
        self.step = step

    def __iter__(self):
        return self

    def __next__(self):
        if self.current == self.stop:
            raise StopIteration
        elem = self.data[self.current]
        self.current += self.step
        return elem

    def __reversed__(self):
        return ReversibleIterator(self.data, self.stop, self.current, -1)


class AbstractReversibleView(abc.ABC):
    """An abstract reversible view"""

    def __init__(self, data: UserDict):
        self.data = data
        self.len = len(data)

    def __repr__(self):
        return f"{self.__class__.__name__}({self.data})"

    @abc.abstractmethod
    def __iter__(self):
        pass

    @abc.abstractmethod
    def __reversed__(self):
        pass


class ReversibleItemsView(AbstractReversibleView):
    """Reversible view on items"""

    def __iter__(self):
        return ReversibleIterator(
            _case_insensitive_comprehend_items(self.data.data), 0, self.len, 1
        )

    def __reversed__(self):
        return ReversibleIterator(
            _case_insensitive_comprehend_items(self.data.data), self.len - 1, -1, -1
        )


class ReversibleKeysView(AbstractReversibleView):
    """Reversible view on keys"""

    def __iter__(self):
        return ReversibleIterator(
            _case_insensitive_comprehend_keys(self.data.data), 0, self.len, 1
        )

    def __reversed__(self):
        return ReversibleIterator(
            _case_insensitive_comprehend_keys(self.data.data), self.len - 1, -1, -1
        )


class ReversibleValuesView(AbstractReversibleView):
    """Reversible view on values"""

    def __iter__(self):
        return ReversibleIterator(list(self.data.data.values()), 0, self.len, 1)

    def __reversed__(self):
        return ReversibleIterator(list(self.data.data.values()), self.len - 1, -1, -1)


class CaseInsensitiveDict(UserDict):
    """Special dictionary that ignores key casing if string

    While UserDict is the least performant / flexible it ensures __set_item__ and
    __get_item__ are used in all code paths reducing LoC severely.

    Background reference:
    https://realpython.com/inherit-python-dict/#creating-dictionary-like-classes-in-python

    Alternative (should this stop working at some point):
    https://github.com/DeveloperRSquared/case-insensitive-dict/blob/main/case_insensitive_dict/case_insensitive_dict.py
    """

    def __setitem__(self, key, value):
        if isinstance(key, str):
            key = CaseInsensitiveString(key)
        super().__setitem__(key, value)

    def __getitem__(self, key: Union[int, str]):
        if isinstance(key, str):
            key = CaseInsensitiveString(key)
        return super().__getitem__(key)

    def __iter__(self):
        return ReversibleIterator(
            _case_insensitive_comprehend_keys(self.data), 0, len(self.data), 1
        )

    def __contains__(self, key):
        if isinstance(key, str):
            key = CaseInsensitiveString(key)
        return super().__contains__(key)

    def keys(self) -> ReversibleKeysView:
        return ReversibleKeysView(self)

    def values(self) -> ReversibleValuesView:
        return ReversibleValuesView(self)

    def items(self) -> ReversibleItemsView:
        return ReversibleItemsView(self)
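# Usage sketch (illustrative, not part of the committed module):
#
#     d = CaseInsensitiveDict()
#     d["Frequency"] = 200
#     assert d["FREQUENCY"] == 200      # lookups ignore key casing
#     assert "frequency" in d           # containment checks do too
#     assert list(reversed(d.keys()))   # views support reversed()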
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""Special string that ignores casing in comparison"""


class CaseInsensitiveString(str):
    """Special string that ignores casing in comparison"""

    def __eq__(self, other):
        if isinstance(other, str):
            return self.casefold() == other.casefold()
        return self.casefold() == other

    def __hash__(self):
        return hash(self.__str__())

    def __contains__(self, key):
        if isinstance(key, str):
            return key.casefold() in str(self)
        return key in str(self)

    def __str__(self) -> str:
        return self.casefold().__str__()

    def __repr__(self) -> str:
        return self.casefold().__repr__()
# HDF file reader

## Define a model

The data structure of the HDF file is defined by python objects using decorators. Currently, there are two decorators available:

1. `member`: defines a class property to be an HDF group or dataset depending on the type.
2. `attribute`: defines a class property to be an HDF attribute on a group or dataset.
### Dataset definition

A basic data structure to define the HDF file looks like this:

```python
class Data:
    list_of_ints: List[int] = member()
    list_of_floats: List[float] = member()
    numpy_array: ndarray = member()
```
It is important to always use type hints. They not only make the classes more self-explanatory during development, they are also important for the file reader to guesstimate the right action to perform.
In this first example we only used arrays and lists. These types always map to a dataset within HDF. By default, the reader looks for a dataset with the name of the variable; if the dataset is named differently, this can be overridden by specifying the `name` parameter: `member(name='other_name_than_variable')`. Also, all members are required by default: if they don't appear in the HDF file, an error is thrown. This behavior can be changed by specifying the `optional` parameter: `member(optional=True)`.
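
For illustration, the two parameters can be combined like this (the dataset and property names here are made up):

```python
class Data:
    # maps to the dataset "other_name_than_variable"; if it is absent, the
    # member resolves to None instead of raising an error
    values: List[int] = member(name="other_name_than_variable", optional=True)
```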
### Group definition

HDF supports arranging the data in groups. Groups can be defined as additional classes:

```python
class SubGroup:
    list_of_ints: List[int] = member()


class Data:
    sub_group: SubGroup = member()
```
All other settings apply in the same way as they do for datasets.
### Dictionaries

A special case is the `dict`. It allows reading a set of groups or datasets, using the name of each group or dataset as the key.

```python
class Data:
    data_dict: Dict[str, List[int]] = member()
```
### Attribute definition

Attributes in a HDF file can appear on groups as well as on datasets and can be defined by using `attribute()`:

```python
class Data:
    an_attr: str = attribute()
```

The file reader will look for an attribute with the name `an_attr` on the group that is represented by the class `Data`. The name of the attribute can be overridden by specifying the `name` parameter: `attribute(name='other_name')`. All attributes are required by default and will cause an exception to be thrown if they are not available. This behavior can be changed by specifying the `optional` parameter: `attribute(optional=True)`.
In HDF, datasets can also contain attributes. Since datasets are usually mapped to primitive types, it would otherwise not be possible to access these attributes. Therefore, `attribute` allows specifying another member in the class by setting `from_member`, as sketched below.
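
A minimal sketch of `from_member`, assuming a `Data` class with a dataset member called `values` (the names are illustrative):

```python
class Data:
    values: ndarray = member()
    # reads the attribute "unit" from the "values" dataset
    # instead of from the group represented by Data
    unit: str = attribute(from_member="values", optional=True)
```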
## Read a HDF file

A file can be read using `read_hdf5`:

```python
with read_hdf5('file_name.h5', Data) as data:
    a = data.an_attr
```
## Create a HDF file

A file can be created using `create_hdf5` - existing files will be overwritten:

```python
with create_hdf5('file_name.h5', Data) as data:
    data.an_attr = "data"
```

NB:

1. Writes are cached until `flush()` is called or the file is closed.
2. Reading back attributes will read them from disk.
## Change a HDF file

A file can be changed using `open_hdf5` - the file must exist:

```python
with open_hdf5('file_name.h5', Data) as data:
    data.an_attr = "new value"
```
## Data write behaviour

### members

All changes to members of the object are immediately written to the underlying HDF file. Altering the object should therefore be kept to a minimum to avoid performance degradation.

### attributes

Attributes are written when `flush()` is invoked on the `FileWriter` or when the `with` scope is exited. This behaviour is necessary because attributes depend on the underlying members; therefore, the attributes can only be written after the members.
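
Putting this together, a hedged sketch of the resulting write order (assuming a `Data` class with a member `values` and an attribute `an_attr`):

```python
with create_hdf5('file_name.h5', Data) as data:
    data.values = [1, 2, 3]   # member: written to the file immediately
    data.an_attr = "comment"  # attribute: cached by the FileWriter
    # cached attributes are flushed here, after the members,
    # when the with scope exits (or on an explicit flush())
```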
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to interact with (hdf5) files
"""

from ._attribute_def import attribute
from ._member_def import member
from ._readers import FileReader
from .hdf._hdf_readers import read_hdf5
from .hdf._hdf_writers import open_hdf5, create_hdf5
from ._writers import FileWriter

__all__ = [
    "FileReader",
    "FileWriter",
    "attribute",
    "member",
    "read_hdf5",
    "open_hdf5",
    "create_hdf5",
]
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains HDF5 specific classes and methods to define class members as an HDF attribute
"""

from typing import Any, Type

from ._readers import DataReader
from ._utils import _extract_type
from ._writers import DataWriter


def attribute(name: str = None, optional: bool = False, from_member: str = None):
    """
    Define a class member as an attribute within a HDF5 file
    """
    return AttributeDef(name, optional, from_member)


# pylint: disable=too-few-public-methods
class AttributeDef:
    """
    Decorator to extract attributes of HDF5 groups and datasets to pythonic objects
    """

    def __init__(self, name: str, optional: bool, from_member: str = None):
        self.name = name
        self.property_name: str
        self.from_member = from_member
        self.optional = optional
        self.owner: Any
        self.type: Type

    def __set_name__(self, owner, name):
        if self.name is None:
            self.name = name
        self.property_name = name
        self.owner = owner
        self.type = _extract_type(owner, name)

    def __set__(self, instance, value):
        setattr(instance, self.attr_name, value)
        if hasattr(instance, "_data_writer"):
            writer: DataWriter = getattr(instance, "_data_writer")
            writer.write_attribute(
                instance, self.name, self.owner, self.from_member, self.optional, value
            )

    def __get__(self, instance, obj_type=None):
        if instance is None:
            # attribute is accessed as a class attribute
            return self
        if hasattr(instance, self.attr_name):
            return getattr(instance, self.attr_name)
        if hasattr(instance, "_data_reader"):
            reader: DataReader = getattr(instance, "_data_reader")
            attr = reader.read_attribute(
                self.name, self.owner, self.from_member, self.optional
            )
            setattr(instance, self.attr_name, attr)
            return attr
        return None

    @property
    def attr_name(self):
        """
        Name used to store the value in the owning object
        """
        if self.from_member is None:
            return f"_a_{self.name}"
        return f"_a_{self.from_member}_{self.name}"
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Provides a dictionary that dynamically resolves its values to reduce memory usage
"""

from abc import abstractmethod
from typing import TypeVar, Dict, Type

K = TypeVar("K")
V = TypeVar("V")


class LazyDict:
    """
    Lazy evaluated dictionary
    """

    @abstractmethod
    def setup_write(self, writer):
        """
        Set up the lazy dict to support write actions
        """

    @classmethod
    def __subclasshook__(cls, subclass):
        return (
            hasattr(subclass, "setup_write")
            and callable(subclass.setup_write)
            or NotImplemented
        )


def lazy_dict(base_dict: Type[Dict[K, V]], reader):
    """
    Dynamically derive lazy dict of given type
    """

    class LazyDictImpl(base_dict, LazyDict):
        """
        Implementation of the lazy dict dynamically derived from base dict
        """

        def __init__(self, reader, *args, **kwargs):
            super().__init__(*args, **kwargs)
            self._reader = reader
            self._writer = None

        def __setitem__(self, item, value):
            if callable(value):
                super().__setitem__(item, value)
                return
            # write value somewhere
            if self._writer is not None:
                self._writer(item, value)
            super().__setitem__(item, lambda: self._reader(item))

        def __getitem__(self, item):
            return super().__getitem__(item)()

        def items(self):
            """D.items() -> a set-like object providing a view on D's items"""
            for key, value in super().items():
                yield key, value()

        def setup_write(self, writer):
            self._writer = writer

    return LazyDictImpl(reader)
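# Usage sketch (illustrative, not part of the committed module):
#
#     d = lazy_dict(dict, lambda key: f"read {key}")
#     d["a"] = lambda: 42   # callables are stored as-is, resolved on access
#     assert d["a"] == 42
#     d["b"] = 13           # plain values are re-read through the reader
#     assert d["b"] == "read b"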
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains HDF5 specific classes and methods to define class members as members
of HDF5 files
"""

from typing import Type

from ._readers import DataReader
from ._utils import _extract_type
from ._writers import DataWriter


def member(name: str = None, optional: bool = False, compression: str = None):
    """
    Define a class member as a member of a HDF5 file
    """
    return MemberDef(name, optional, compression)


# pylint: disable=too-few-public-methods
class MemberDef:
    """
    Decorator to handle the transformation of HDF5 groups
    and datasets to pythonic objects
    """

    def __init__(self, name: str, optional: bool, compression: str):
        self.name = name
        self.property_name: str
        self.optional = optional
        self.compression = compression
        self.type: Type

    def __set_name__(self, owner, name):
        if self.name is None:
            self.name = name
        self.property_name = name
        self.type = _extract_type(owner, name)

    def __get__(self, instance, obj_type=None):
        if instance is None:
            # attribute is accessed as a class attribute
            return self
        if hasattr(instance, "_data_reader"):
            reader: DataReader = getattr(instance, "_data_reader")
            return reader.read_member(instance, self.name, self.type, self.optional)
        if hasattr(instance, self.attr_name):
            return getattr(instance, self.attr_name)
        return None

    def __set__(self, instance, value):
        if not hasattr(instance, "_data_writer"):
            setattr(instance, self.attr_name, value)
            return
        writer: DataWriter = getattr(instance, "_data_writer")
        writer.write_member(self.name, self.type, value)
        if hasattr(instance, self.attr_name):
            delattr(instance, self.attr_name)

    @property
    def attr_name(self):
        """
        Name used to store the value in the owning object
        """
        return f"_v_{self.name}"
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Class wrappers for lists and dictionaries monitoring changes of itself and notifying
the registered event handler about these changes.
"""

from typing import Any


class MonitoredWrapper:
    """
    A wrapper monitoring changes of itself and notifying the registered event handler
    about changes.
    """

    def __init__(self, event, instance):
        self._event = event
        self._instance = instance

    def __setitem__(self, key, value):
        self._instance.__setitem__(key, value)
        self._event(self._instance)

    def __getitem__(self, item):
        return self._instance.__getitem__(item)

    def __setattr__(self, name: str, value: Any) -> None:
        if name in ["_instance", "_event"]:
            object.__setattr__(self, name, value)
        else:
            self._instance.__setattr__(name, value)
            self._event(self._instance)

    def __getattribute__(self, name):
        if name in ["_instance", "_event"]:
            return object.__getattribute__(self, name)
        attr = object.__getattribute__(self._instance, name)
        if hasattr(attr, "__call__"):

            def wrapper(*args, **kwargs):
                result = attr(*args, **kwargs)
                self._event(self._instance)
                return result

            return wrapper
        return attr
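# Usage sketch (illustrative, not part of the committed module):
#
#     w = MonitoredWrapper(lambda inst: print("changed:", inst), [1, 2, 3])
#     w.append(4)   # method calls notify the handler: changed: [1, 2, 3, 4]
#     w[0] = 0      # item assignment notifies too:    changed: [0, 2, 3, 4]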
# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle reading
"""

from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar("T")


class FileReader(Generic[T], ABC):
    """
    Abstract file reader
    """

    @abstractmethod
    def read(self) -> T:
        """
        Read the opened file into a pythonic representation specified by target_type.
        Will automatically figure out if target_type is a dict or a regular object
        """

    @abstractmethod
    def close(self):
        """
        Close the underlying file
        """

    def load(self, instance: T):
        """
        Load all the data from the underlying HDF file
        to preserve it in the objects after closing the
        file.
        """

    def __enter__(self):
        return self.read()

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    def __del__(self):
        self.close()


class DataReader(ABC):
    """
    Abstract data reader
    """

    @abstractmethod
    def read_member(self, obj, name: str, target_type, optional: bool):
        """
        Read given member from underlying file
        """

    @abstractmethod
    def read_attribute(self, name, owner, from_member, optional):
        """
        Read given attribute from underlying file
        """
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
General utils
"""

from typing import Optional, Type, get_type_hints, get_args, get_origin

from numpy import ndarray

from ._monitoring import MonitoredWrapper


def _extract_type(owner: object, name: str) -> Optional[Type]:
    type_hints = get_type_hints(owner)
    return type_hints[name] if name in type_hints else None


def _extract_base_type(target_type: Type):
    args = get_args(target_type)
    if len(args) >= 2:
        return args[1]
    return [
        get_args(b)[1] for b in target_type.__orig_bases__ if get_origin(b) is dict
    ][0]


def _wrap(target_type, value, callback):
    if get_origin(target_type) is list:
        return MonitoredWrapper(callback, value)
    if target_type is ndarray:
        return MonitoredWrapper(callback, value)
    return value
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file writing
"""

from abc import ABC, abstractmethod
from typing import TypeVar

from ._readers import FileReader, DataReader

T = TypeVar("T")


class FileWriter(FileReader[T], ABC):
    """
    Abstract file writer
    """

    def __init__(self, create):
        self._create = create

    @abstractmethod
    def create(self) -> T:
        """
        Create the object representing the file
        """

    @abstractmethod
    def open(self) -> T:
        """
        Open the object representing the file
        """

    def __enter__(self):
        if self._create:
            return self.create()
        return self.open()


class DataWriter(DataReader, ABC):
    """
    Abstract data writer
    """

    @abstractmethod
    def write_member(self, name: str, target_type, value):
        """
        Write given member to underlying file
        """

    @abstractmethod
    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def write_attribute(self, instance, name, owner, from_member, optional, value):
        """
        Write given attribute to underlying file
        """
# Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

""" Cool module containing functions, classes and other useful things """


def greeter():
    """Prints a nice message"""
    print("Hello World!")
# Copyright (C) 2025 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Utils to handle transformation of HDF5 specific classes to pythonic objects
"""

from collections.abc import MutableMapping
from inspect import get_annotations, getattr_static
from typing import Type, TypeVar, get_origin

from numpy import ndarray

T = TypeVar("T")


def _assert_is_dataset(value):
    if issubclass(type(value), MutableMapping):
        raise TypeError(
            f"Only <Dataset> can be mapped to primitive type while "
            f"value is of type <{type(value).__name__}>"
        )


def _assert_is_group(value):
    if not issubclass(type(value), MutableMapping):
        raise TypeError(
            "Only Group can be mapped to <object> while value"
            f" is of type <{type(value).__name__}>"
        )


def _is_attachable(target_type: Type[T]):
    origin_type = get_origin(target_type)
    if origin_type is dict:
        return False
    if get_origin(target_type) is list:
        return False
    if target_type is ndarray:
        return False
    return True


def _attach_object(target_type: Type[T], instance):
    for cls in target_type.mro():
        annotations = get_annotations(cls)
        for annotation in annotations:
            attr = getattr_static(target_type, annotation)
            if hasattr(instance, attr.attr_name):
                setattr(instance, attr.property_name, getattr(instance, attr.attr_name))
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file reading
"""

import inspect
import weakref
from inspect import getattr_static
from typing import TypeVar, Type, List, Dict, get_origin

import h5py
from numpy import ndarray, zeros

from ._hdf5_utils import (
    _assert_is_group,
    _assert_is_dataset,
)
from .._attribute_def import AttributeDef
from .._lazy_dict import lazy_dict
from .._member_def import MemberDef
from .._readers import FileReader, DataReader
from .._utils import _extract_base_type

T = TypeVar("T")


class HdfFileReader(FileReader[T]):
    """
    HDF5 specific file reader
    """

    def __init__(self, name, target_type):
        self.file_name = name
        self._is_closed = None
        self._target_type = target_type
        self._open_file(name)
        self._references: List[weakref] = []

    def _open_file(self, name):
        self._hdf5_file = h5py.File(name, "r")
        self._is_closed = False

    def read(self) -> T:
        """
        Read the opened file into a pythonic representation specified by target_type.
        Will automatically figure out if target_type is a dict or a regular object
        """
        reader = HdfDataReader.detect_reader(
            self._target_type, HdfDataReader(self, self._hdf5_file)
        )
        obj = reader(self._hdf5_file)
        return obj

    def close(self):
        """
        Close the underlying HDF file
        """
        for ref in self._references:
            obj = ref()
            if obj is not None:
                self._detach_object(obj)
        self._references = []
        if not self._is_closed:
            self._is_closed = True
            self._hdf5_file.close()
            del self._hdf5_file

    def load(self, instance: T):
        """
        Load all the data from the underlying HDF file
        to preserve it in the objects after closing the
        file.
        """
        self._references.append(weakref.ref(instance))
        target_type = type(instance)
        for annotation in [
            m[0] for m in inspect.getmembers(instance) if not m[0].startswith("_")
        ]:
            attr = inspect.getattr_static(target_type, annotation)
            if isinstance(attr, (MemberDef, AttributeDef)):
                setattr(instance, attr.attr_name, getattr(instance, attr.property_name))

    def _detach_object(self, instance):
        if not hasattr(instance, "_data_reader"):
            return
        delattr(instance, "_data_reader")
        for attr in [
            m[0]
            for m in inspect.getmembers(instance)
            if not m[0].startswith("_") and m[0] != "T"
        ]:
            item = getattr(instance, attr)
            item_type = type(item)
            if (
                item is not None
                and item is object
                and not (item_type is ndarray or item_type is str)
            ):
                self._detach_object(item)


class HdfDataReader(DataReader):
    """
    HDF data reader
    """

    def __init__(self, file_reader: HdfFileReader, data):
        self.file_reader = file_reader
        self.data = data

    def read_member(self, obj, name, target_type, optional):
        if name not in self.data:
            if optional:
                return None
            raise KeyError(f"Could not find required key {name}")
        reader = self.detect_reader(
            target_type, self.__class__(self.file_reader, self.data[name])
        )
        return reader(self.data[name])

    def read_attribute(self, name, owner, from_member, optional):
        attrs: dict
        if from_member is None:
            attrs = self.data.attrs
        else:
            member = getattr_static(owner, from_member)
            attrs = self.data[member.name].attrs
        if name not in attrs:
            if optional:
                return None
            raise KeyError(f"Could not find required attribute key {name}")
        return attrs[name]

    @classmethod
    def _read_object(
        cls, target_type: Type[T], value, file_reader: "HdfDataReader"
    ) -> T:
        _assert_is_group(value)
        obj = target_type()
        setattr(obj, "_data_reader", cls(file_reader.file_reader, value))
        return obj

    @staticmethod
    def _read_list(value):
        _assert_is_dataset(value)
        return list(value[:])

    @classmethod
    def _read_ndarray(cls, target_type: Type[T], value, file_reader: "HdfDataReader"):
        _assert_is_dataset(value)
        nd_value = zeros(value.shape, value.dtype)
        # convert the data set to a numpy array
        value.read_direct(nd_value)
        if target_type is ndarray:
            return nd_value
        obj = nd_value.view(target_type)
        setattr(obj, "_data_reader", cls(file_reader.file_reader, value))
        return obj

    @classmethod
    def _read_dict(
        cls, target_type: Type[T], value, dict_type, data_reader: "HdfDataReader"
    ) -> Dict[str, T]:
        reader = cls.detect_reader(target_type, data_reader)
        result = lazy_dict(dict_type, lambda k: reader(value[k]))
        for k in value.keys():
            result[k] = lambda n=k: reader(value[n])
        if dict_type is not dict:
            setattr(result, "_data_reader", cls(data_reader.file_reader, value))
        return result

    @classmethod
    def detect_reader(cls, target_type, data_reader: "HdfDataReader"):
        """
        Detect the required reader based on expected type
        """
        origin_type = get_origin(target_type)
        if origin_type is dict:
            return lambda value: cls._read_dict(
                _extract_base_type(target_type), value, dict, data_reader
            )
        if get_origin(target_type) is list:
            return cls._read_list
        if issubclass(target_type, ndarray):
            return lambda value: cls._read_ndarray(target_type, value, data_reader)
        if issubclass(target_type, dict):
            return lambda value: cls._read_dict(
                _extract_base_type(target_type), value, target_type, data_reader
            )
        return lambda value: cls._read_object(target_type, value, data_reader)


def read_hdf5(name, target_type: Type[T]) -> FileReader[T]:
    """
    Open a HDF5 file by name/path
    """
    return HdfFileReader[T](name, target_type)
# Copyright (C) 2024 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file writing
"""

from inspect import getattr_static
from typing import TypeVar, Type, Dict, get_origin

import h5py
from numpy import ndarray

from ._hdf5_utils import (
    _is_attachable,
    _attach_object,
    _assert_is_group,
    _assert_is_dataset,
)
from ._hdf_readers import HdfFileReader, HdfDataReader
from .._lazy_dict import LazyDict
from .._utils import _wrap, _extract_base_type
from .._writers import FileWriter, DataWriter

T = TypeVar("T")


class HdfFileWriter(HdfFileReader[T], FileWriter[T]):
    """
    HDF5 specific file writer
    """

    def __init__(self, name, target_type, create):
        self._create = create
        self.writers: list[HdfDataWriter] = []
        super().__init__(name, target_type)

    def _open_file(self, name):
        self._hdf5_file = h5py.File(name, "w" if self._create else "a")
        self._is_closed = False

    def flush(self):
        """
        Flush all registered writers
        """
        for writer in self.writers:
            writer.flush()
        self.writers = []
        if not self._is_closed:
            self._hdf5_file.flush()

    def close(self):
        self.flush()
        super().close()

    def open(self) -> T:
        return self.create()

    def create(self) -> T:
        """
        Create the object representing the HDF file
        """
        data_writer = HdfDataWriter(self, self._hdf5_file)
        reader = HdfDataWriter.detect_reader(self._target_type, data_writer)
        obj = reader(self._hdf5_file)
        if isinstance(obj, dict):
            obj = _wrap(
                self._target_type,
                obj,
                lambda value: HdfDataWriter.write_dict(
                    self._target_type,
                    self._hdf5_file,
                    value,
                    data_writer,
                ),
            )
        try:
            setattr(obj, "_data_writer", data_writer)
        except AttributeError:
            pass
        return obj


class HdfDataWriter(HdfDataReader, DataWriter):
    """
    HDF data writer
    """

    def read_member(self, obj, name, target_type, optional):
        instance = super().read_member(obj, name, target_type, optional)
        return _wrap(
            target_type,
            instance,
            lambda a: setattr(obj, name, a),
        )

    @classmethod
    def _read_dict(
        cls, target_type: Type[T], value, dict_type, data_reader: "HdfDataWriter"
    ) -> Dict[str, T]:
        obj = super()._read_dict(target_type, value, dict_type, data_reader)
        data_writer = cls(data_reader.file_writer, value)
        if dict_type is not dict:
            setattr(obj, "_data_writer", data_writer)
        if isinstance(obj, LazyDict):
            obj.setup_write(
                lambda k, v: cls.write_dict_member(
                    target_type, value, k, v, data_writer
                )
            )
        return obj

    @classmethod
    def _read_object(
        cls, target_type: Type[T], value, file_reader: "HdfDataWriter"
    ) -> T:
        obj = super()._read_object(target_type, value, file_reader)
        setattr(obj, "_data_writer", cls(file_reader.file_writer, value))
        return obj

    def __init__(self, file_writer: HdfFileWriter, data):
        self.file_writer = file_writer
        self.file_writer.writers.append(self)
        self.data = data
        self.write_actions = []
        super().__init__(file_writer, data)
        super(HdfDataReader, self).__init__()

    def write_member(self, name: str, target_type: Type[T], value):
        data = self.data
        writer = self.detect_writer(target_type, self)
        writer(data, name, value)
        if _is_attachable(target_type):
            _attach_object(target_type, value)

    def flush(self):
        """
        Execute all pending write actions
        """
        for action in self.write_actions:
            action()

    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def write_attribute(self, instance, name, owner, from_member, optional, value):
        self.write_actions.append(
            lambda: self._write_attribute(name, owner, from_member, value)
        )

    def _write_attribute(self, name, owner, from_member, value):
        attrs = self._resolve_attrs(owner, from_member)
        try:
            attrs[name] = value
        except (RuntimeError, TypeError) as exc:
            raise ValueError(
                f"Failed to write to attribute {self.data.name}.{name}"
            ) from exc

    def _resolve_attrs(self, owner, from_member):
        """
        Finds the right attribute to write into
        """
        if from_member is None:
            return self.data.attrs
        member = getattr_static(owner, from_member)
        return self.data[member.name].attrs

    @classmethod
    def detect_writer(cls, target_type, data_writer: "HdfDataWriter"):
        """
        Detect required writer based on expected type
        """
        origin_type = get_origin(target_type)
        if origin_type is dict:
            return lambda data, key, value: cls._write_dict_group(
                target_type, data, key, value, data_writer
            )
        if get_origin(target_type) is list:
            return lambda data, key, value: cls._write_ndarray(
                list, data, key, value, data_writer
            )
        if target_type is ndarray or issubclass(target_type, ndarray):
            return lambda data, key, value: cls._write_ndarray(
                target_type, data, key, value, data_writer
            )
        if issubclass(target_type, dict):
            return lambda data, key, value: cls._write_dict_group(
                target_type, data, key, value, data_writer
            )
        return lambda data, key, value: cls._write_object(
            target_type, data, key, value, data_writer
        )

    @classmethod
    def _write_ndarray(
        cls, target_type: Type[T], data, key, value, data_writer: "HdfDataWriter"
    ):
        _assert_is_group(data)
        if key in data:
            _assert_is_dataset(data[key])
            del data[key]
        # GZIP filter ("gzip"). Available with every installation of HDF5.
        # compression_opts sets the compression level and may be an integer
        # from 0 to 9, default is 4.
        # https://docs.h5py.org/en/stable/high/dataset.html#lossless-compression-filters
        data.create_dataset(key, data=value, compression="gzip", compression_opts=9)
        if target_type is not ndarray and issubclass(target_type, ndarray):
            data_writer = cls(data_writer.file_writer, data[key])
            setattr(value, "_data_writer", data_writer)
            setattr(value, "_data_reader", data_writer)
            _attach_object(target_type, value)

    @classmethod
    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def _write_dict_group(
        cls, target_type: Type[T], data, key, value, data_writer: "HdfDataWriter"
    ):
        _assert_is_group(data)
        if key not in data:
            data.create_group(key)
        try:
            data_writer = cls(data_writer.file_writer, data[key])
            setattr(value, "_data_writer", data_writer)
            setattr(value, "_data_reader", data_writer)
            _attach_object(target_type, value)
        except AttributeError:
            pass
        cls.write_dict(
            target_type, data[key], value, cls(data_writer.file_writer, data[key])
        )

    @classmethod
    def write_dict(
        cls, target_type: Type[T], data, value, data_writer: "HdfDataWriter"
    ):
        """
        Write given dictionary to given data group
        """
        _assert_is_group(data)
        for k in data.keys():
            if k not in value:
                del data[k]
        writer = HdfDataWriter.detect_writer(
            _extract_base_type(target_type), data_writer
        )
        for k in value.keys():
            writer(data, k, value[k])

    @classmethod
    def write_dict_member(
        cls, target_type: Type[T], data, key, value, data_writer: "HdfDataWriter"
    ):
        """
        Write single given dictionary member to given data group
        """
        _assert_is_group(data)
        writer = HdfDataWriter.detect_writer(target_type, data_writer)
        writer(data, key, value)

    @classmethod
    # pylint: disable=too-many-arguments,too-many-positional-arguments
    def _write_object(
        cls, target_type: Type[T], data, key, value: T, data_writer: "HdfDataWriter"
    ):
        _assert_is_group(data)
        if key in data:
            _assert_is_group(data[key])
        else:
            data.create_group(key)
        data_writer = cls(data_writer.file_writer, data[key])
        setattr(value, "_data_writer", data_writer)
        setattr(value, "_data_reader", data_writer)
        _attach_object(target_type, value)


def open_hdf5(name, target_type: Type[T]) -> FileWriter[T]:
    """
    Open a HDF5 file by name/path
    """
    return HdfFileWriter[T](name, target_type, False)


def create_hdf5(name, target_type: Type[T]) -> FileWriter[T]:
    """
    Create a HDF5 file by name/path
    """
    return HdfFileWriter[T](name, target_type, True)