Commit bb33e9bb authored by Hannes Feldt's avatar Hannes Feldt

L2SS-1115: Add generic HDF file writer

parent fdf3b3df
1 merge request: !30 L2SS-1115: Add generic HDF file writer
Showing 1076 additions and 38 deletions
# Coverage generations
cover
.coverage
.coverage.*
coverage.xml
# IDE files
...

0.12.1
@@ -2,13 +2,16 @@

## Define a model

The data structure of the HDF file is defined by Python objects using decorators. Currently, there are two decorators
available:

1. `member`: defines a class property to be an HDF group or dataset depending on the type.
2. `attribute`: defines a class property to be an HDF attribute on a group or dataset.

### Dataset definition

A basic data structure to define the HDF file looks like this:

```python
class Data:
    list_of_ints: List[int] = member()
    ...
    numpy_array: ndarray = member()
```

It is important to always use type hints. They not only make the classes more self-explanatory during development, they
also allow the file reader to determine the right action to perform.

In this first example we only used arrays and lists. These types always map to a dataset within HDF. By default, the
reader looks for a dataset with the name of the variable; if the dataset is named differently, this can be overridden
by specifying the `name` parameter: `member(name='other_name_than_variable')`. Also, all members are required by
default. If they do not appear in the HDF file, an error is thrown. This behavior can be changed by specifying the
`optional` parameter: `member(optional=True)`.
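
A minimal sketch combining both parameters (the member names are illustrative):

```python
class Data:
    # stored in the file under 'other_name' instead of the variable name
    renamed: List[int] = member(name='other_name')
    # may be missing from the file; reading it then yields None instead of raising
    maybe_missing: List[int] = member(optional=True)
```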

### Group definition

HDF supports arranging the data in groups. Groups can be defined as additional classes:

```python
class SubGroup:
    list_of_ints: List[int] = member()

class Data:
    sub_group: SubGroup = member()
```

All additional settings apply in the same way as they do for datasets.

### Dictionaries

A special case is the `dict`. It allows reading a set of groups or datasets, using the name of each group or dataset as
the key.

```python
class Data:
    data_dict: Dict[str, List[int]] = member()
```
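
When such a file is read (see `read_hdf5` below), each entry is accessible by its name in the file; a short sketch with
an illustrative key:

```python
with read_hdf5('file_name.h5', Data) as data:
    first = data.data_dict["some_key"]  # the List[int] stored under that name
```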

### Attribute definition

Attributes in a HDF file can appear on groups as well as on datasets and can be defined by using `attribute()`:

```python
class Data:
    an_attr: str = attribute()
```

The file reader will look for an attribute with the name `an_attr` on the group that is represented by the class
`Data`. The name of the attribute can be overridden by specifying the `name` parameter: `attribute(name='other_name')`.
All attributes are required by default and will cause an exception to be thrown if they are not available. This
behavior can be changed by specifying the `optional` parameter: `attribute(optional=True)`.

In HDF, datasets can also contain attributes. Since datasets are usually mapped to primitive types, it would not be
possible to access these attributes directly. Therefore, `attribute` allows specifying another member in the class by
setting `from_member`.
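
A minimal sketch of `from_member`, assuming a dataset member `values` that carries a `unit` attribute (both names are
illustrative):

```python
class Data:
    values: List[int] = member()
    # read the attribute 'unit' from the dataset behind 'values',
    # not from the group represented by 'Data'
    unit: str = attribute(from_member='values')
```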

## Read a HDF file

@@ -70,5 +83,33 @@

A file can be read using `read_hdf5`:

```python
with read_hdf5('file_name.h5', Data) as data:
    a = data.an_attr
```

## Create a HDF file

A file can be created using `create_hdf5` - existing files will be overwritten:

```python
with create_hdf5('file_name.h5', Data) as data:
    data.an_attr = "data"
```
## Change a HDF file
A file can be changed using `open_hdf5` - the file must exist:
```python
with open_hdf5('file_name.h5', Data) as data:
    data.an_attr = "new value"
```

## Data write behaviour

### members

All changes to members of the object are immediately written to the underlying HDF file. Therefore, alterations of the
object should be kept to a minimum to avoid performance degradation.

### attributes

Attributes are written when `flush()` is invoked on the `FileWriter` or when the `with` scope is exited. This behaviour
is necessary because attributes can depend on the underlying members, so they can only be written after the members.
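
A short sketch of the difference, reusing the `Data` model from above:

```python
with create_hdf5('file_name.h5', Data) as data:
    data.list_of_ints = [1, 2, 3]  # member: written to the file immediately
    data.an_attr = "value"         # attribute: buffered until flush() or scope exit
# leaving the scope flushes pending attributes and closes the file
```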

@@ -6,8 +6,19 @@

"""
Contains classes to interact with (hdf5) files
"""

from ._attribute_def import attribute
from ._member_def import member
from ._readers import FileReader
from .hdf._hdf_readers import read_hdf5
from .hdf._hdf_writers import open_hdf5, create_hdf5
from ._writers import FileWriter

__all__ = [
    "FileReader",
    "FileWriter",
    "attribute",
    "member",
    "read_hdf5",
    "open_hdf5",
    "create_hdf5",
]
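
User code can then import the public API from the package root, for example:

```python
from lofar_station_client.file_access import (
    attribute,
    member,
    read_hdf5,
    open_hdf5,
    create_hdf5,
)
```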

@@ -4,21 +4,21 @@

"""
Contains HDF5 specific classes and methods to define class members as an HDF attribute
"""

from typing import Any

from ._readers import DataReader
from ._writers import DataWriter


def attribute(name: str = None, optional: bool = False, from_member: str = None):
    """
    Define a class member as an attribute within a HDF5 file
    """
    return AttributeDef(name, optional, from_member)


# pylint: disable=too-few-public-methods
class AttributeDef:
    """
    Decorator to extract attributes of HDF5 groups and datasets to pythonic objects
    """

@@ -27,7 +27,6 @@

        self.name = name
        self.from_member = from_member
        self.optional = optional
        self.owner: Any

    def __set_name__(self, owner, name):

@@ -35,25 +34,33 @@

        self.name = name
        self.owner = owner

    def __set__(self, instance, value):
        setattr(instance, self.attr_name, value)

        if hasattr(instance, "_data_writer"):
            writer: DataWriter = getattr(instance, "_data_writer")
            writer.write_attribute(
                instance, self.name, self.owner, self.from_member, self.optional, value
            )

    def __get__(self, instance, obj_type=None):
        if hasattr(instance, self.attr_name):
            return getattr(instance, self.attr_name)

        if hasattr(instance, "_data_reader"):
            reader: DataReader = getattr(instance, "_data_reader")
            attr = reader.read_attribute(
                self.name, self.owner, self.from_member, self.optional
            )
            setattr(instance, self.attr_name, attr)
            return attr

        return None

    @property
    def attr_name(self):
        """
        Name used to store the value in the owning object
        """
        if self.from_member is None:
            return f"_a_{self.name}"
        return f"_a_{self.from_member}_{self.name}"

@@ -5,11 +5,9 @@

# pylint: skip-file
"""
Contains required methods missing in older python versions.
"""

try:
    # Python >=3.8 should have these functions already
    from typing import get_args

@@ -34,3 +32,71 @@

            res = (list(res[:-1]), res[-1])
        return res
    return ()

try:
    from inspect import get_annotations
except ImportError:
    import sys
    import types
    import functools

    def get_annotations(obj, *, globals=None, locals=None, eval_str=False):
        """
        Simplified copy from the Python 3.10 inspect module, only supporting
        get_annotations on types.
        https://github.com/python/cpython/blob/3.10/Lib/inspect.py
        """
        # class
        obj_dict = getattr(obj, "__dict__", None)
        if obj_dict and hasattr(obj_dict, "get"):
            ann = obj_dict.get("__annotations__", None)
            if isinstance(ann, types.GetSetDescriptorType):
                ann = None
        else:
            ann = None

        obj_globals = None
        module_name = getattr(obj, "__module__", None)
        if module_name:
            module = sys.modules.get(module_name, None)
            if module:
                obj_globals = getattr(module, "__dict__", None)
        obj_locals = dict(vars(obj))
        unwrap = obj

        if ann is None:
            return {}

        if not isinstance(ann, dict):
            raise ValueError(f"{obj!r}.__annotations__ is neither a dict nor None")

        if not ann:
            return {}

        if not eval_str:
            return dict(ann)

        if unwrap is not None:
            while True:
                if hasattr(unwrap, "__wrapped__"):
                    unwrap = unwrap.__wrapped__
                    continue
                if isinstance(unwrap, functools.partial):
                    unwrap = unwrap.func
                    continue
                break
            if hasattr(unwrap, "__globals__"):
                obj_globals = unwrap.__globals__

        if globals is None:
            globals = obj_globals
        if locals is None:
            locals = obj_locals

        return_value = {
            key: value if not isinstance(value, str) else eval(value, globals, locals)
            for key, value in ann.items()
        }
        return return_value
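
For the model classes used in this package, the shim behaves like the standard library version; a hypothetical example,
assuming `member` and `List` are imported as elsewhere in this diff:

```python
class Data:
    values: List[int] = member()

get_annotations(Data)  # -> {'values': List[int]}
```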

@@ -8,19 +8,20 @@ of HDF5 files

from typing import Type

from ._readers import DataReader
from ._utils import _extract_type
from ._writers import DataWriter


def member(name: str = None, optional: bool = False):
    """
    Define a class member as a member of a HDF5 file
    """
    return MemberDef(name, optional)


# pylint: disable=too-few-public-methods
class MemberDef:
    """
    Decorator to handle the transformation of HDF5 groups
    and datasets to pythonic objects

@@ -29,27 +30,36 @@

    def __init__(self, name: str, optional: bool):
        self.name = name
        self.optional = optional
        self.type: Type

    def __set_name__(self, owner, name):
        if self.name is None:
            self.name = name
        self.type = _extract_type(owner, name)

    def __get__(self, instance, obj_type=None):
        if hasattr(instance, "_data_reader"):
            reader: DataReader = getattr(instance, "_data_reader")
            return reader.read_member(instance, self.name, self.type, self.optional)

        if hasattr(instance, self.attr_name):
            return getattr(instance, self.attr_name)

        return None

    def __set__(self, instance, value):
        if not hasattr(instance, "_data_writer"):
            setattr(instance, self.attr_name, value)
            return

        writer: DataWriter = getattr(instance, "_data_writer")
        writer.write_member(self.name, self.type, value)
        if hasattr(instance, self.attr_name):
            delattr(instance, self.attr_name)

    @property
    def attr_name(self):
        """
        Name used to store the value in the owning object
        """
        return f"_v_{self.name}"

# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Class wrappers for lists and dictionaries that monitor changes to themselves and
notify the registered event handler about these changes.
"""


class MonitoredWrapper:
    """
    A wrapper monitoring changes of itself and notifying the registered event handler
    about changes.
    """

    def __init__(self, event, instance):
        self._event = event
        self._instance = instance

    def __setitem__(self, key, value):
        self._instance.__setitem__(key, value)
        self._event(self._instance)

    def __getitem__(self, item):
        return self._instance.__getitem__(item)

    def __getattribute__(self, name):
        if name in ["_instance", "_event"]:
            return object.__getattribute__(self, name)
        attr = object.__getattribute__(self._instance, name)
        if hasattr(attr, "__call__"):

            def wrapper(*args, **kwargs):
                result = attr(*args, **kwargs)
                self._event(self._instance)
                return result

            return wrapper
        return attr
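
A small usage sketch (hypothetical, not part of the package): the wrapper forwards reads to the wrapped object and
fires the event after every method call on it.

```python
changes = []
wrapped = MonitoredWrapper(lambda obj: changes.append(list(obj)), [1, 2])
wrapped.append(3)  # forwards to list.append, then fires the event
print(wrapped[0])  # plain item reads via __getitem__ do not fire the event
print(changes)     # [[1, 2, 3]]
```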

@@ -2,43 +2,31 @@

# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle reading
"""

from abc import ABC, abstractmethod
from typing import TypeVar, Generic

T = TypeVar("T")


class FileReader(Generic[T], ABC):
    """
    Abstract file reader
    """

    @abstractmethod
    def read(self) -> T:
        """
        Read the opened file into a pythonic representation specified by target_type.
        Will automatically figure out if target_type is a dict or a regular object
        """

    @abstractmethod
    def close(self):
        """
        Close the underlying file
        """

    def __enter__(self):
        return self.read()

@@ -50,8 +38,19 @@

        self.close()


class DataReader(ABC):
    """
    Abstract data reader
    """

    @abstractmethod
    def read_member(self, obj, name: str, target_type, optional: bool):
        """
        Read given member from underlying file
        """

    @abstractmethod
    def read_attribute(self, name, owner, from_member, optional):
        """
        Read given attribute from underlying file
        """

@@ -6,7 +6,10 @@ General utils

"""

from typing import Optional, Type, get_type_hints

from numpy import ndarray

from ._compat_utils import get_args, get_origin
from ._monitoring import MonitoredWrapper


def _extract_type(owner: object, name: str) -> Optional[Type]:

@@ -22,3 +25,16 @@

    return [
        get_args(b)[1] for b in target_type.__orig_bases__ if get_origin(b) is dict
    ][0]


def _wrap(target_type, value, callback):
    origin_type = get_origin(target_type)
    if origin_type is dict:
        return MonitoredWrapper(callback, value)
    if get_origin(target_type) is list:
        return MonitoredWrapper(callback, value)
    if target_type is ndarray:
        return MonitoredWrapper(callback, value)
    if issubclass(target_type, dict):
        return MonitoredWrapper(callback, value)
    return value

# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file writing
"""

from abc import ABC, abstractmethod
from typing import TypeVar

from ._readers import FileReader, DataReader

T = TypeVar("T")


class FileWriter(FileReader[T], ABC):
    """
    Abstract file writer
    """

    def __init__(self, create):
        self._create = create

    @abstractmethod
    def create(self) -> T:
        """
        Create the object representing the file
        """

    @abstractmethod
    def open(self) -> T:
        """
        Open the object representing the file
        """

    def __enter__(self):
        if self._create:
            return self.create()
        return self.open()


class DataWriter(DataReader, ABC):
    """
    Abstract data writer
    """

    @abstractmethod
    def write_member(self, name: str, target_type, value):
        """
        Write given member to underlying file
        """

    @abstractmethod
    # pylint: disable=too-many-arguments
    def write_attribute(self, instance, name, owner, from_member, optional, value):
        """
        Write given attribute to underlying file
        """

@@ -4,13 +4,13 @@

"""
Utils to handle transformation of HDF5 specific classes to pythonic objects
"""

import inspect
from collections.abc import MutableMapping
from typing import Type, TypeVar

from numpy import ndarray

from .._compat_utils import get_origin, get_annotations

T = TypeVar("T")

@@ -31,46 +31,29 @@

    )


def _write_ndarray(data, key, value):
    _assert_is_group(data)
    if key in data:
        _assert_is_dataset(data[key])
        del data[key]
    data.create_dataset(key, data=value)


def _is_attachable(target_type: Type[T]):
    origin_type = get_origin(target_type)
    if origin_type is dict:
        return False
    if get_origin(target_type) is list:
        return False
    if target_type is ndarray:
        return False
    return True


def _attach_object(target_type: Type[T], instance):
    annotations = get_annotations(target_type)
    for annotation in annotations:
        attr = inspect.getattr_static(target_type, annotation)
        if hasattr(instance, attr.attr_name):
            setattr(instance, attr.name, getattr(instance, attr.attr_name))
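
As a reading aid: `_attach_object` replays values that were cached on a plain instance (under the descriptors' storage
names) through the descriptors once a writer is attached. An illustrative sketch, borrowing the model names from the
test module later in this diff:

```python
sub = DataSubSet()
sub.values = [1, 2, 3]  # no writer attached yet: the value is only cached
ds.sub_set = sub        # attaching the writer replays the cached value into the file
```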

# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file reading
"""

from inspect import getattr_static
from typing import TypeVar, Type, Dict

import h5py
from numpy import ndarray, zeros

from ._hdf5_utils import (
    _assert_is_group,
    _assert_is_dataset,
)
from .._compat_utils import get_origin
from .._readers import FileReader, DataReader
from .._utils import _extract_base_type

T = TypeVar("T")


class HdfFileReader(FileReader[T]):
    """
    HDF5 specific file reader
    """

    def __init__(self, name, target_type):
        self._is_closed = None
        self._target_type = target_type
        self._open_file(name)

    def _open_file(self, name):
        self._hdf5_file = h5py.File(name, "r")
        self._is_closed = False

    def read(self) -> T:
        """
        Read the opened file into a pythonic representation specified by target_type.
        Will automatically figure out if target_type is a dict or a regular object
        """
        reader = HdfDataReader.detect_reader(
            self._target_type, HdfDataReader(self, self._hdf5_file)
        )
        obj = reader(self._hdf5_file)
        return obj

    def close(self):
        """
        Close the underlying HDF file
        """
        if not self._is_closed:
            self._is_closed = True
            del self._hdf5_file


class HdfDataReader(DataReader):
    """
    HDF data reader
    """

    def __init__(self, file_reader: HdfFileReader, data):
        self.file_reader = file_reader
        self.data = data

    def read_member(self, obj, name, target_type, optional):
        if name not in self.data:
            if optional:
                return None
            raise KeyError(f"Could not find required key {name}")

        reader = self.detect_reader(
            target_type, self.__class__(self.file_reader, self.data[name])
        )
        return reader(self.data[name])

    def read_attribute(self, name, owner, from_member, optional):
        attrs: dict
        if from_member is None:
            attrs = self.data.attrs
        else:
            member = getattr_static(owner, from_member)
            attrs = self.data[member.name].attrs

        if name not in attrs:
            if optional:
                return None
            raise KeyError(f"Could not find required attribute key {name}")

        return attrs[name]

    @classmethod
    def _read_object(
        cls, target_type: Type[T], value, file_reader: "HdfDataReader"
    ) -> T:
        _assert_is_group(value)
        obj = target_type()
        setattr(obj, "_data_reader", cls(file_reader.file_reader, value))
        return obj

    @staticmethod
    def _read_list(value):
        _assert_is_dataset(value)
        return list(value[:])

    @staticmethod
    def _read_ndarray(value):
        _assert_is_dataset(value)
        nd_value = zeros(value.shape, value.dtype)
        # convert the data set to a numpy array
        value.read_direct(nd_value)
        return nd_value

    @classmethod
    def _read_dict(
        cls, target_type: Type[T], value, dict_type, data_reader: "HdfDataReader"
    ) -> Dict[str, T]:
        result = dict_type()
        reader = cls.detect_reader(target_type, data_reader)
        for k in value.keys():
            result[k] = reader(value[k])
        if dict_type is not dict:
            setattr(result, "_data_reader", cls(data_reader.file_reader, value))
        return result

    @classmethod
    def detect_reader(cls, target_type, data_reader: "HdfDataReader"):
        """
        Detect the required reader based on expected type
        """
        origin_type = get_origin(target_type)
        if origin_type is dict:
            return lambda value: cls._read_dict(
                _extract_base_type(target_type), value, dict, data_reader
            )
        if get_origin(target_type) is list:
            return cls._read_list
        if target_type is ndarray:
            return cls._read_ndarray
        if issubclass(target_type, dict):
            return lambda value: cls._read_dict(
                _extract_base_type(target_type), value, target_type, data_reader
            )
        return lambda value: cls._read_object(target_type, value, data_reader)


def read_hdf5(name: str, target_type: Type[T]) -> FileReader[T]:
    """
    Open a HDF5 file by name/path
    """
    return HdfFileReader[T](name, target_type)

# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

"""
Contains classes to handle file writing
"""

from inspect import getattr_static
from typing import TypeVar, Type, Dict, List

import h5py
from numpy import ndarray

from ._hdf5_utils import (
    _is_attachable,
    _attach_object,
    _write_ndarray,
    _assert_is_group,
)
from ._hdf_readers import HdfFileReader, HdfDataReader
from .._writers import FileWriter, DataWriter
from .._utils import _wrap, _extract_base_type
from .._compat_utils import get_origin

T = TypeVar("T")


class HdfFileWriter(HdfFileReader[T], FileWriter[T]):
    """
    HDF5 specific file writer
    """

    def __init__(self, name, target_type, create):
        self._create = create
        # typing.List keeps this annotation working on Python < 3.9
        self.writers: List[HdfDataWriter] = []
        super().__init__(name, target_type)

    def _open_file(self, name):
        self._hdf5_file = h5py.File(name, "w" if self._create else "a")
        self._is_closed = False

    def flush(self):
        """
        Flush all registered writers
        """
        for writer in self.writers:
            writer.flush()
        self.writers = []

    def close(self):
        self.flush()
        super().close()

    def open(self) -> T:
        return self.create()

    def create(self) -> T:
        """
        Create the object representing the HDF file
        """
        data_writer = HdfDataWriter(self, self._hdf5_file)
        reader = HdfDataWriter.detect_reader(self._target_type, data_writer)
        obj = reader(self._hdf5_file)
        if isinstance(obj, dict):
            obj = _wrap(
                self._target_type,
                obj,
                lambda value: HdfDataWriter.write_dict(
                    _extract_base_type(self._target_type),
                    self._hdf5_file,
                    value,
                    data_writer,
                ),
            )
        setattr(obj, "_data_writer", data_writer)
        return obj


class HdfDataWriter(HdfDataReader, DataWriter):
    """
    HDF data writer
    """

    def read_member(self, obj, name, target_type, optional):
        instance = super().read_member(obj, name, target_type, optional)
        return _wrap(target_type, instance, lambda a: setattr(obj, name, a))

    @classmethod
    def _read_dict(
        cls, target_type: Type[T], value, dict_type, data_reader: "HdfDataWriter"
    ) -> Dict[str, T]:
        obj = super()._read_dict(target_type, value, dict_type, data_reader)
        if dict_type is not dict:
            setattr(obj, "_data_writer", cls(data_reader.file_writer, value))
        return obj

    @classmethod
    def _read_object(
        cls, target_type: Type[T], value, file_reader: "HdfDataWriter"
    ) -> T:
        obj = super()._read_object(target_type, value, file_reader)
        setattr(obj, "_data_writer", cls(file_reader.file_writer, value))
        return obj

    def __init__(self, file_writer: HdfFileWriter, data):
        self.file_writer = file_writer
        self.file_writer.writers.append(self)
        self.data = data
        self.write_actions = []
        super().__init__(file_writer, data)

    def write_member(self, name: str, target_type: Type[T], value):
        data = self.data
        writer = self.detect_writer(target_type, self)
        writer(data, name, value)
        if _is_attachable(target_type):
            _attach_object(target_type, value)

    def flush(self):
        """
        Execute all pending write actions
        """
        for action in self.write_actions:
            action()

    # pylint: disable=too-many-arguments
    def write_attribute(self, instance, name, owner, from_member, optional, value):
        self.write_actions.append(
            lambda: self._write_attribute(name, owner, from_member, value)
        )

    def _write_attribute(self, name, owner, from_member, value):
        attrs = self._resolve_attrs(owner, from_member)
        attrs[name] = value

    def _resolve_attrs(self, owner, from_member):
        """
        Finds the right attribute to write into
        """
        if from_member is None:
            return self.data.attrs
        member = getattr_static(owner, from_member)
        return self.data[member.name].attrs

    @classmethod
    def detect_writer(cls, target_type, data_writer: "HdfDataWriter"):
        """
        Detect required writer based on expected type
        """
        origin_type = get_origin(target_type)
        if origin_type is dict:
            return lambda data, key, value: cls._write_dict_group(
                _extract_base_type(target_type), data, key, value, data_writer
            )
        if get_origin(target_type) is list:
            return _write_ndarray
        if target_type is ndarray:
            return _write_ndarray
        if issubclass(target_type, dict):
            return lambda data, key, value: cls._write_dict_group(
                _extract_base_type(target_type), data, key, value, data_writer
            )
        return lambda data, key, value: cls._write_object(
            target_type, data, key, value, data_writer
        )

    @classmethod
    # pylint: disable=too-many-arguments
    def _write_dict_group(
        cls, target_type: Type[T], data, key, value, data_writer: "HdfDataWriter"
    ):
        _assert_is_group(data)
        if key not in data:
            data.create_group(key)
        cls.write_dict(
            target_type, data[key], value, cls(data_writer.file_writer, data[key])
        )

    @classmethod
    def write_dict(
        cls, target_type: Type[T], data, value, data_writer: "HdfDataWriter"
    ):
        """
        Write given dictionary to given data group
        """
        _assert_is_group(data)
        for k in data.keys():
            if k not in value:
                del data[k]
        writer = HdfDataWriter.detect_writer(target_type, data_writer)
        for k in value.keys():
            writer(data, k, value[k])

    @classmethod
    # pylint: disable=too-many-arguments
    def _write_object(
        cls, target_type: Type[T], data, key, value: T, data_writer: "HdfDataWriter"
    ):
        _assert_is_group(data)
        if key in data:
            _assert_is_group(data[key])
        else:
            data.create_group(key)
        data_writer = cls(data_writer.file_writer, data[key])
        setattr(value, "_data_writer", data_writer)
        setattr(value, "_data_reader", data_writer)
        _attach_object(target_type, value)


def open_hdf5(name: str, target_type: Type[T]) -> FileWriter[T]:
    """
    Open a HDF5 file by name/path
    """
    return HdfFileWriter[T](name, target_type, False)


def create_hdf5(name: str, target_type: Type[T]) -> FileWriter[T]:
    """
    Create a HDF5 file by name/path
    """
    return HdfFileWriter[T](name, target_type, True)

@@ -57,24 +57,18 @@

        ) as ds:
            self.assertEqual(21, len(ds.keys()))
            item = ds["SST_2022-11-15T14:21:59.000+00:00"]
            self.assertEqual(
                [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                item.nof_payload_errors,
            )
            # double read to check if (cached) value is the same
            self.assertEqual(
                [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                item.nof_payload_errors,
            )
            self.assertEqual(
                [12, 12, 12, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
                item.nof_valid_payloads,
            )
            self.assertIsNone(item.non_existent)
            self.assertEqual(192, len(item.values))
...

# Copyright (C) 2022 ASTRON (Netherlands Institute for Radio Astronomy)
# SPDX-License-Identifier: Apache-2.0

from os.path import dirname
from typing import List, Dict

from numpy import ndarray, array

from lofar_station_client.file_access import (
    member,
    attribute,
    create_hdf5,
    read_hdf5,
    open_hdf5,
)
from tests import base


class SimpleSet:
    values: ndarray = member()


class DataSubSet:
    values: List[int] = member()
    dict_test_ndarray: Dict[str, ndarray] = member()
    dict_test_object: Dict[str, SimpleSet] = member()


class DataSet:
    observation_station: str = attribute()
    observation_source: str = attribute(from_member="sub_set")
    nof_payload_errors: List[int] = member()
    values: List[List[float]] = member()
    sub_set: DataSubSet = member(name="test")
    non_existent: DataSubSet = member(optional=True)


class TestHdf5FileWriter(base.TestCase):
    def test_simple_writing(self):
        with create_hdf5(dirname(__file__) + "/test_simple_writing.h5", DataSet) as ds:
            ds.observation_station = "CS001"
            ds.nof_payload_errors = [1, 2, 3, 4, 5, 6]
            ds.values = [[2.0], [3.0], [4.0]]
            ds.sub_set = DataSubSet()
            ds.sub_set.values = [5, 4, 3, 2]
            ds.observation_source = "CasA"

        with read_hdf5(dirname(__file__) + "/test_simple_writing.h5", DataSet) as ds:
            self.assertEqual("CS001", ds.observation_station)
            self.assertEqual([1, 2, 3, 4, 5, 6], ds.nof_payload_errors)
            self.assertEqual([[2.0], [3.0], [4.0]], ds.values)
            self.assertIsNotNone(ds.sub_set)
            self.assertEqual([5, 4, 3, 2], ds.sub_set.values)
            self.assertEqual("CasA", ds.observation_source)

    def test_list_writing(self):
        with create_hdf5(
            dirname(__file__) + "/test_list_writing.h5", DataSubSet
        ) as dss:
            dss.values = [2, 3, 4, 5]
            dss.values.append(1)

        with read_hdf5(dirname(__file__) + "/test_list_writing.h5", DataSubSet) as dss:
            self.assertEqual([2, 3, 4, 5, 1], dss.values)

    def test_dict_writing(self):
        with create_hdf5(
            dirname(__file__) + "/test_dict_writing.h5", Dict[str, ndarray]
        ) as d:
            d["test_1"] = array([1, 2, 3, 4, 5, 6])
            d["test_2"] = array([6, 5, 4, 1])

        with read_hdf5(
            dirname(__file__) + "/test_dict_writing.h5", Dict[str, ndarray]
        ) as d:
            self.assertFalse(([1, 2, 3, 4, 5, 6] - d["test_1"]).any())
            self.assertFalse(([6, 5, 4, 1] - d["test_2"]).any())

    def test_dict_altering(self):
        with create_hdf5(
            dirname(__file__) + "/test_dict_altering.h5", DataSubSet
        ) as dss:
            dss.dict_test_ndarray = {
                "test_1": array([2, 4, 6]),
                "test_2": array([1, 3, 5]),
            }
            dss.dict_test_ndarray["test_3"] = array([9, 8, 7])
            dss.dict_test_ndarray.pop("test_1")
            ss = SimpleSet()
            ss.values = array([4, 9, 3])
            dss.dict_test_object = {"test_99": ss}
            dss.dict_test_object["test_99"].values[0] = 5
            dss.dict_test_object["test_98"] = SimpleSet()
            dss.dict_test_object["test_98"].values = array([4, 9, 3])

        with read_hdf5(dirname(__file__) + "/test_dict_altering.h5", DataSubSet) as dss:
            self.assertTrue("test_2" in dss.dict_test_ndarray)
            self.assertTrue("test_3" in dss.dict_test_ndarray)
            self.assertFalse(([1, 3, 5] - dss.dict_test_ndarray["test_2"]).any())
            self.assertFalse(([9, 8, 7] - dss.dict_test_ndarray["test_3"]).any())
            self.assertTrue("test_99" in dss.dict_test_object)
            self.assertTrue("test_98" in dss.dict_test_object)
            self.assertFalse(([5, 9, 3] - dss.dict_test_object["test_99"].values).any())
            self.assertFalse(([4, 9, 3] - dss.dict_test_object["test_98"].values).any())

    def test_object_access(self):
        ds = DataSet()
        ds.observation_station = "CS001"
        ds.nof_payload_errors = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        ds.values = [[1.0]]
        ds.sub_set = DataSubSet()
        ds.sub_set.values = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
        ds.observation_source = "CasA"

        self.assertEqual("CS001", ds.observation_station)
        self.assertEqual(
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ds.nof_payload_errors
        )
        self.assertEqual([[1.0]], ds.values)
        self.assertIsNotNone(ds.sub_set)
        self.assertEqual(
            [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], ds.sub_set.values
        )
        self.assertEqual("CasA", ds.observation_source)

    def test_attach_object(self):
        with create_hdf5(dirname(__file__) + "/test_attach_object.h5", DataSet) as ds:
            sub_set = DataSubSet()
            sub_set.values = [7, 4, 9, 2, 9]
            ds.sub_set = sub_set
            ds.observation_source = "CasA"

        with read_hdf5(dirname(__file__) + "/test_attach_object.h5", DataSet) as ds:
            self.assertEqual([7, 4, 9, 2, 9], ds.sub_set.values)
            self.assertEqual("CasA", ds.observation_source)

    def test_open_write(self):
        with create_hdf5(dirname(__file__) + "/test_open_write.h5", DataSet) as ds:
            ds.observation_station = "CS001"
            ds.nof_payload_errors = [1, 2, 3, 4, 5, 6]
            ds.values = [[2.0], [3.0], [4.0]]
            ds.sub_set = DataSubSet()
            ds.sub_set.values = [5, 4, 3, 2]
            ds.observation_source = "CasA"

        with open_hdf5(dirname(__file__) + "/test_open_write.h5", DataSet) as ds:
            ds.nof_payload_errors.append(7)
            ds.values.append([5.0])
            ds.observation_source = "ACAS"
            ds.sub_set.values = [1, 2, 3]

        with read_hdf5(dirname(__file__) + "/test_open_write.h5", DataSet) as ds:
            self.assertEqual("CS001", ds.observation_station)
            self.assertEqual([1, 2, 3, 4, 5, 6, 7], ds.nof_payload_errors)
            self.assertEqual([[2.0], [3.0], [4.0], [5.0]], ds.values)
            self.assertIsNotNone(ds.sub_set)
            self.assertEqual([1, 2, 3], ds.sub_set.values)
            self.assertEqual("ACAS", ds.observation_source)

from numpy import array

from lofar_station_client.file_access._monitoring import MonitoredWrapper
from tests import base


class TestMonitoredWrapper(base.TestCase):
    def test_list(self):
        invocations = []

        def event(a):
            invocations.append(f"Invoked with {a}")

        l1 = MonitoredWrapper(event, [])
        l1.append(1)
        self.assertEqual("Invoked with [1]", invocations[0])
        l1.append(2)
        self.assertEqual("Invoked with [1, 2]", invocations[1])
        l1.pop()
        self.assertEqual("Invoked with [1]", invocations[2])

        l2 = MonitoredWrapper(event, [1, 2, 3, 4])
        l2.append(1)
        self.assertEqual("Invoked with [1, 2, 3, 4, 1]", invocations[3])
        l2.append(2)
        self.assertEqual("Invoked with [1, 2, 3, 4, 1, 2]", invocations[4])
        l2.pop()
        self.assertEqual("Invoked with [1, 2, 3, 4, 1]", invocations[5])
        l2[0] = 99
        self.assertEqual(99, l2[0])
        self.assertEqual("Invoked with [99, 2, 3, 4, 1]", invocations[6])

        na = MonitoredWrapper(event, array([2, 3, 4]))
        self.assertEqual((3,), na.shape)

@@ -7,6 +7,7 @@ skipsdist = True

[testenv]
usedevelop = True
package = editable-legacy
setenv =
    LANGUAGE=en_US
    LC_ALL=en_US.UTF-8
...