diff --git a/.dockerignore b/.dockerignore index b596139559a2b0fdd4c5d14321db2878838d14f4..d6eeb976a84bda64a4321814e0b63aba78c99877 100644 --- a/.dockerignore +++ b/.dockerignore @@ -5,4 +5,4 @@ build/** **/__pycache__/** *.hdf5 *.hdf5_* -**.hdf5 \ No newline at end of file +**.hdf5 diff --git a/.gitignore b/.gitignore index 6ec11e0377fa473c64b3b57eb1caebd1295142e1..d297770400998e2f9ca31e93526ae3341d11d25f 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,4 @@ build/** *.hdf5 *.hdf5_* -l2json/_version.py \ No newline at end of file +l2json/_version.py diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml new file mode 100644 index 0000000000000000000000000000000000000000..1e87448c97f74a3289d0258960275c4d391d8350 --- /dev/null +++ b/.pre-commit-config.yaml @@ -0,0 +1,18 @@ +repos: +- repo: https://github.com/pre-commit/pre-commit-hooks + rev: v5.0.0 + hooks: + - id: check-merge-conflict + - id: check-toml + - id: check-yaml + - id: detect-private-key + - id: end-of-file-fixer + - id: trailing-whitespace + - id: mixed-line-ending +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.9.4 + hooks: + - id: ruff + args: [--fix, --show-fixes, l2json, tests] + - id: ruff-format + args: [l2json, tests] diff --git a/README.md b/README.md index b5a306655b286ee2fc61fccf7f8c1edb181e1da8..183e98f673b02619ea309c2e41714734da9f8f9e 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,15 @@ L2 JSON parser, handler, and general helper ## Installation ``` pip install . +pip install git+https://git.astron.nl/mckenna/l2json +``` + +For developers, you can additionally install the docstest target and configure the pre-commit hooks. + +```bash +pip install --editable ".[docstest]" +pre-commit install +pre-commit run --all-files ``` ## Setup @@ -41,7 +50,7 @@ l2json_minio_pull --station CS001 RS307 \ $(# Multiple stations accepted) # When using data directly from a station, you will need to include `--disable_security` export MINIO_HOSTNAME=cs001.control.lofar:9000 -l2json_minio_pull --station CS001 \ +l2json_minio_pull --station CS001 \ --type BST XST \ --field HBA0 \ --tstart 2025-04-23T12:50:00 \ diff --git a/docker/ci-runner/Dockerfile b/docker/ci-runner/Dockerfile index 1a10261a5b47a56dad5e7d9387a0e9bb2f202fcd..4ecde1753791ceb61cb7fd422f84d596cd00e106 100644 --- a/docker/ci-runner/Dockerfile +++ b/docker/ci-runner/Dockerfile @@ -14,4 +14,4 @@ RUN --mount=type=cache,target=/root/.cache/uv \ uv pip install --system --upgrade tox /data/l2json/ && \ l2json_minio_pull -h && \ l2json_minio_sync -h && \ - which l2json_minio_pull \ No newline at end of file + which l2json_minio_pull diff --git a/l2json/__init__.py b/l2json/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/__init__.py +++ b/l2json/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/l2json/data_types/__init__.py b/l2json/data_types/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/data_types/__init__.py +++ b/l2json/data_types/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/l2json/data_types/dataset.py b/l2json/data_types/dataset.py index dec412b6b85941e1b5c41c3c4b30499eccd6054c..3195691c4d3315c17c26eba6e38a2508a047b745 100644 --- a/l2json/data_types/dataset.py +++ b/l2json/data_types/dataset.py @@ -34,8 +34,10 @@ class XSTDataset(Dataset): subband: int frequency: float + BSX_Dataset = BSTDataset | SSTDataset | XSTDataset + @dataclass(frozen=True) class DatasetSharedMetadata: frequency_band: list[str] @@ -54,7 +56,7 @@ class DatasetSharedMetadata: # station_id: int # Payload # station_info: int # Payload - reference: str # Custom mapping back to data source + reference: str # Custom mapping back to data source dataset: Dataset @@ -84,8 +86,10 @@ class HBADataset(DatasetSharedMetadata): tile_beam_pointing_direction: list[str] tile_beam_tracking_enabled: list[bool] + Field_Dataset = LBADataset | HBADataset + @dataclass class StationInformation: antenna_to_sdp_mapping: list[str] @@ -107,9 +111,7 @@ data_type_dset_mapping: dict[DataTypeStr, type[BSX_Dataset]] = { } -def get_dset_type( - field: str, data_type: DataTypeStr -) -> tuple[type[Field_Dataset], type[BSX_Dataset]]: +def get_dset_type(field: str, data_type: DataTypeStr) -> tuple[type[Field_Dataset], type[BSX_Dataset]]: antfield = field[:3].upper() data_type = data_type @@ -137,20 +139,19 @@ class LOFAR2Observation: del self_dict["station_information"] self_dict["datasets"] = {} - for dset in sorted(self.datasets, key = lambda x: (x.unix_timestamp + 0.5 * x.integration_interval)): - if not isinstance( - dset.dataset, data_type_dset_mapping[self.data_type] - ): + for dset in sorted( + self.datasets, + key=lambda x: (x.unix_timestamp + 0.5 * x.integration_interval), + ): + if not isinstance(dset.dataset, data_type_dset_mapping[self.data_type]): raise RuntimeError( - f"Dataset type {type(dset.dataset)=} does not match expected type for {self.data_type} ({ data_type_dset_mapping[self.data_type]})" + f"Dataset type {type(dset.dataset)=} does not match expected type for {self.data_type} ({data_type_dset_mapping[self.data_type]})" ) dtype_suffix = "" if isinstance(dset.dataset, XSTDataset): dtype_suffix = f"_SB{dset.dataset.subband}" - self_dict["datasets"][ - f"{self.data_type}_{dset.timestamp}{dtype_suffix}" - ] = dset + self_dict["datasets"][f"{self.data_type}_{dset.timestamp}{dtype_suffix}"] = dset return self_dict diff --git a/l2json/data_types/other_data_types.py b/l2json/data_types/other_data_types.py index e94eb3ac53445041a798e48089806837b0fa926a..91eac9dee24899ffec2f29d893e80b0b0f2c7dee 100644 --- a/l2json/data_types/other_data_types.py +++ b/l2json/data_types/other_data_types.py @@ -9,7 +9,8 @@ type GenericDeviceName = Literal[ "tilebeam", "observationcontrol", ] -type DeviceName = Literal["stat/afh/hba", +type DeviceName = Literal[ + "stat/afh/hba", "stat/afh/hba0", "stat/afh/hba1", "stat/afl/lba", @@ -28,7 +29,7 @@ type DeviceName = Literal["stat/afh/hba", "stat/sdpfirmware/hba0", "stat/sdpfirmware/hba1", "stat/sdpfirmware/lba", - "stat/observationcontrol/1" + "stat/observationcontrol/1", ] type MappableAttributeName = Literal[ "antenna_to_sdp_mapping", @@ -54,4 +55,4 @@ type MappableAttributeName = Literal[ "digital_beam_tracking_enabled", "subbands", "subband_frequencies", -] \ No newline at end of file +] diff --git a/l2json/data_types/typed_json_dicts.py b/l2json/data_types/typed_json_dicts.py index 62acbfeb414bb7a19e9c17d37b40abfa9c3f7862..cd015a4b6883e89e85364652c0a31986b090b92a 100644 --- a/l2json/data_types/typed_json_dicts.py +++ b/l2json/data_types/typed_json_dicts.py @@ -23,6 +23,7 @@ class CapturedDatasetDict(TypedDict): xst_data_imag: NotRequired[list[list[float]]] reference: str + type DatasetAttributes = Literal[ "timestamp", "station", @@ -43,6 +44,7 @@ type DatasetAttributes = Literal[ "reference", ] + class ParsedDatasetDict(TypedDict): data: ndarray integration_interval: float @@ -52,15 +54,77 @@ class ParsedDatasetDict(TypedDict): subband: NotRequired[int] - # Derived from output of https://json2pyi.pages.dev/ + manual merging of HBA fields -_TypedDict_stat_antennafield = TypedDict("_TypedDict_stat_antennafield", {"antenna_to_sdp_mapping_r": list[list[int]], "antenna_names_r": list[str], "rcu_pcb_id_r": list[list[int]], "rcu_pcb_version_r": list[list[str]], "antenna_usage_mask_r": list[bool], "antenna_reference_itrf_r": list[list[float]], "frequency_band_rw": list[list[str]], "rcu_attenuator_db_r": list[list[int]], "rcu_dth_on_r": list[bool], "rcu_dth_freq_r": list[int], "antenna_status_r": list[int], "hbat_pwr_on_r": NotRequired[list[list[bool]]]}) +_TypedDict_stat_antennafield = TypedDict( + "_TypedDict_stat_antennafield", + { + "antenna_to_sdp_mapping_r": list[list[int]], + "antenna_names_r": list[str], + "rcu_pcb_id_r": list[list[int]], + "rcu_pcb_version_r": list[list[str]], + "antenna_usage_mask_r": list[bool], + "antenna_reference_itrf_r": list[list[float]], + "frequency_band_rw": list[list[str]], + "rcu_attenuator_db_r": list[list[int]], + "rcu_dth_on_r": list[bool], + "rcu_dth_freq_r": list[int], + "antenna_status_r": list[int], + "hbat_pwr_on_r": NotRequired[list[list[bool]]], + }, +) _TypedDict_stat_sdp = TypedDict("_TypedDict_stat_sdp", {"subband_frequency_r": list[list[float]]}) -_TypedDict_stat_tilebeam = TypedDict("_TypedDict_stat_tilebeam", {"pointing_direction_str_r": list[str], "tracking_enabled_r": bool}) -_TypedDict_stat_digitalbeam = TypedDict("_TypedDict_stat_digitalbeam", {"pointing_direction_str_r": list[str], "tracking_enabled_r": bool, "subband_select_rw": list[int]}) -_TypedDict_stat_sdpfirmware = TypedDict("_TypedDict_stat_sdpfirmware", {"fpga_firmware_version_r": list[str], "fpga_hardware_version_r": list[str], "nr_signal_inputs_r": int, "first_signal_input_index_r": int}) -_TypedDict_stat_observationcontrol = TypedDict("_TypedDict_stat_observationcontrol", {"running_observations_r": list[int], "active_antenna_fields_r": list[str]}) -CapturedMetadataDict = TypedDict("CapturedMetadataDict", {"stat/afh/hba": NotRequired[_TypedDict_stat_antennafield], "stat/afh/hba0": NotRequired[_TypedDict_stat_antennafield], "stat/afh/hba1": NotRequired[_TypedDict_stat_antennafield], "stat/afl/lba": _TypedDict_stat_antennafield, "stat/sdp/hba": NotRequired[_TypedDict_stat_sdp], "stat/sdp/hba0": NotRequired[_TypedDict_stat_sdp], "stat/sdp/hba1": NotRequired[_TypedDict_stat_sdp], "stat/sdp/lba": _TypedDict_stat_sdp, "stat/tilebeam/hba": NotRequired[_TypedDict_stat_tilebeam], "stat/tilebeam/hba0": NotRequired[_TypedDict_stat_tilebeam], "stat/tilebeam/hba1": NotRequired[_TypedDict_stat_tilebeam], "stat/digitalbeam/hba": NotRequired[_TypedDict_stat_digitalbeam], "stat/digitalbeam/hba0": NotRequired[_TypedDict_stat_digitalbeam], "stat/digitalbeam/hba1": NotRequired[_TypedDict_stat_digitalbeam], "stat/digitalbeam/lba": _TypedDict_stat_digitalbeam, "stat/sdpfirmware/hba": NotRequired[_TypedDict_stat_sdpfirmware], "stat/sdpfirmware/hba0": NotRequired[_TypedDict_stat_sdpfirmware], "stat/sdpfirmware/hba1": NotRequired[_TypedDict_stat_sdpfirmware], "stat/sdpfirmware/lba": _TypedDict_stat_sdpfirmware, "stat/observationcontrol/1": _TypedDict_stat_observationcontrol, "timestamp": str, "reference": str}) +_TypedDict_stat_tilebeam = TypedDict( + "_TypedDict_stat_tilebeam", + {"pointing_direction_str_r": list[str], "tracking_enabled_r": bool}, +) +_TypedDict_stat_digitalbeam = TypedDict( + "_TypedDict_stat_digitalbeam", + { + "pointing_direction_str_r": list[str], + "tracking_enabled_r": bool, + "subband_select_rw": list[int], + }, +) +_TypedDict_stat_sdpfirmware = TypedDict( + "_TypedDict_stat_sdpfirmware", + { + "fpga_firmware_version_r": list[str], + "fpga_hardware_version_r": list[str], + "nr_signal_inputs_r": int, + "first_signal_input_index_r": int, + }, +) +_TypedDict_stat_observationcontrol = TypedDict( + "_TypedDict_stat_observationcontrol", + {"running_observations_r": list[int], "active_antenna_fields_r": list[str]}, +) +CapturedMetadataDict = TypedDict( + "CapturedMetadataDict", + { + "stat/afh/hba": NotRequired[_TypedDict_stat_antennafield], + "stat/afh/hba0": NotRequired[_TypedDict_stat_antennafield], + "stat/afh/hba1": NotRequired[_TypedDict_stat_antennafield], + "stat/afl/lba": _TypedDict_stat_antennafield, + "stat/sdp/hba": NotRequired[_TypedDict_stat_sdp], + "stat/sdp/hba0": NotRequired[_TypedDict_stat_sdp], + "stat/sdp/hba1": NotRequired[_TypedDict_stat_sdp], + "stat/sdp/lba": _TypedDict_stat_sdp, + "stat/tilebeam/hba": NotRequired[_TypedDict_stat_tilebeam], + "stat/tilebeam/hba0": NotRequired[_TypedDict_stat_tilebeam], + "stat/tilebeam/hba1": NotRequired[_TypedDict_stat_tilebeam], + "stat/digitalbeam/hba": NotRequired[_TypedDict_stat_digitalbeam], + "stat/digitalbeam/hba0": NotRequired[_TypedDict_stat_digitalbeam], + "stat/digitalbeam/hba1": NotRequired[_TypedDict_stat_digitalbeam], + "stat/digitalbeam/lba": _TypedDict_stat_digitalbeam, + "stat/sdpfirmware/hba": NotRequired[_TypedDict_stat_sdpfirmware], + "stat/sdpfirmware/hba0": NotRequired[_TypedDict_stat_sdpfirmware], + "stat/sdpfirmware/hba1": NotRequired[_TypedDict_stat_sdpfirmware], + "stat/sdpfirmware/lba": _TypedDict_stat_sdpfirmware, + "stat/observationcontrol/1": _TypedDict_stat_observationcontrol, + "timestamp": str, + "reference": str, + }, +) T_Captured = TypeVar("T_Captured", CapturedMetadataDict, CapturedDatasetDict) @@ -79,32 +143,27 @@ type AntennafieldAttributes = Literal[ "hbat_pwr_on_r", ] -type SDPAttributes = Literal[ - "subband_frequency_r" -] +type SDPAttributes = Literal["subband_frequency_r"] -type TilebeamAttributes = Literal[ - "pointing_direction_str_r", - "tracking_enabled_r" -] +type TilebeamAttributes = Literal["pointing_direction_str_r", "tracking_enabled_r"] -type DigitalbeamAttributes = Literal[ - "pointing_direction_str_r", - "tracking_enabled_r", - "subband_select_rw" -] +type DigitalbeamAttributes = Literal["pointing_direction_str_r", "tracking_enabled_r", "subband_select_rw"] type SDPFirmwareAttributes = Literal[ "fpga_firmware_version_r", "fpga_hardware_version_r", "nr_signal_inputs_r", - "first_signal_input_index_r" + "first_signal_input_index_r", ] -type ObservationControlAttributes = Literal[ - "running_observations_r", - "active_antenna_fields_r" -] +type ObservationControlAttributes = Literal["running_observations_r", "active_antenna_fields_r"] -MetadataDeviceAttributes: TypeAlias = AntennafieldAttributes | SDPAttributes | TilebeamAttributes | DigitalbeamAttributes | SDPFirmwareAttributes | ObservationControlAttributes +MetadataDeviceAttributes: TypeAlias = ( + AntennafieldAttributes + | SDPAttributes + | TilebeamAttributes + | DigitalbeamAttributes + | SDPFirmwareAttributes + | ObservationControlAttributes +) diff --git a/l2json/minio_pull.py b/l2json/minio_pull.py index cefb4f1b14ca4d4976bbc670dae8b6c6dbaa71ac..74609ebb4d2e695dccef4876854171ed0ec2ac55 100644 --- a/l2json/minio_pull.py +++ b/l2json/minio_pull.py @@ -6,12 +6,13 @@ import l2json.sources.generators as l2generators import l2json.sinks.hdf5 as hdf5_writer import l2json.sources.args as l2args + # export MINIO_HOSTNAME=filefish.lofar.net:9000; export MINIO_BUCKET=central-statistics; export MINIO_ACCESS_KEY=***REMOVED***; export MINIO_SECRET_KEY=***REMOVED*** def main(): parser = argparse.ArgumentParser() - parser = l2args.add_arguments_to_parser(parser, prefix = "") + parser = l2args.add_arguments_to_parser(parser, prefix="") args = parser.parse_args() - args = l2args.parse_input_args(args, check_env_args = True) + args = l2args.parse_input_args(args, check_env_args=True) ststart = datetime.strftime(args.tstart, "%FT%T") stend = datetime.strftime(args.tend, "%FT%T") @@ -19,20 +20,31 @@ def main(): for var, key in zip(["{station}", "{field}", "{datatype}"], [args.stations, args.fields, args.types]): if len(key) > 1: if var not in args.output_pattern: - print( - f"WARNING: Did you forget to add {var} to the output file name pattern? {args.output_pattern=}" - ) + print(f"WARNING: Did you forget to add {var} to the output file name pattern? {args.output_pattern=}") if args.paths: - print( - f"Fetching data from disk from {ststart} to {stend}" - ) + print(f"Fetching data from disk from {ststart} to {stend}") else: print( - f"Fetching data from minio from {ststart} to {stend} for station(s) {", ".join(args.stations)}, field(s) {", ".join(args.fields)} for data type(s) {", ".join(args.types)}." + f"Fetching data from minio from {ststart} to {stend} for station(s) {', '.join(args.stations)}, field(s) {', '.join(args.fields)} for data type(s) {', '.join(args.types)}." ) - for (station, field, datatype, obs_id), l2obs in l2generators.yield_l2obs_from_params(args.stations, args.fields, args.types, args.paths, args.tstart, args.tend, args.security, args.observation_ids, args.split_by_obs_id): + for ( + station, + field, + datatype, + obs_id, + ), l2obs in l2generators.yield_l2obs_from_params( + args.stations, + args.fields, + args.types, + args.paths, + args.tstart, + args.tend, + args.security, + args.observation_ids, + args.split_by_obs_id, + ): if not obs_id: for dset in l2obs.datasets: if dset.observation_id: @@ -40,8 +52,13 @@ def main(): observation_id = f"L{obs_id:06d}" output_file_name = args.output_pattern.format( - start=ststart, end=stend, station=station, field=field, datatype=datatype, - observation_id=observation_id, antenna_type=l2obs.antenna_type, + start=ststart, + end=stend, + station=station, + field=field, + datatype=datatype, + observation_id=observation_id, + antenna_type=l2obs.antenna_type, ) if not output_file_name.endswith(".hdf5") and not output_file_name.endswith(".h5"): diff --git a/l2json/minio_sync.py b/l2json/minio_sync.py index 09c9088583de1ae584973280df89d28ada4f0a1d..728bf8a1a7aa4ee5b3c97fc595db2c8bb34eefe9 100644 --- a/l2json/minio_sync.py +++ b/l2json/minio_sync.py @@ -8,75 +8,95 @@ from urllib3 import BaseHTTPResponse import os import tqdm -def manually_transfer_minio_buckets(src_client: Minio, dst_client: Minio, src_bucket: str, dst_bucket: str, maximum_age: float = float("inf")): - src_objects = client_src.list_objects(src_bucket, recursive=True) - now = datetime.datetime.now().astimezone(datetime.UTC) - - pbar = tqdm.tqdm(src_objects, desc = "Paring objects") - last_dir = '' - for obj in pbar: - obj_dir = '/'.join(obj.object_name.split('/')[:-1]) - if obj_dir != last_dir: - pbar.set_description(obj_dir) - last_dir = obj_dir - - timestamp = dateutil.parser.parse(os.path.basename(obj.object_name).rstrip(".json")).astimezone(datetime.UTC) - if (now - timestamp).total_seconds() < maximum_age: - response_data: BaseHTTPResponse | None = None - # HACK: If the file exists, skip it. - try: - dst_client.stat_object(dst_bucket, obj.object_name) - except S3Error: - try: - response_data = src_client.get_object(src_bucket, obj.object_name) - raw_data = response_data.data.decode("utf-8") - data = BytesIO(raw_data.encode("utf-8")) - data_len = len(raw_data) - content_type = response_data.headers.get("Content-Type", "") - if content_type: - dst_client.put_object(dst_bucket, obj.object_name, data, length=data_len, content_type=content_type) - else: - dst_client.put_object(dst_bucket, obj.object_name, data, length=data_len) - finally: - if response_data: - response_data.close() - response_data.release_conn() - pbar.close() + +def manually_transfer_minio_buckets( + src_client: Minio, + dst_client: Minio, + src_bucket: str, + dst_bucket: str, + maximum_age: float = float("inf"), +): + src_objects = src_client.list_objects(src_bucket, recursive=True) + now = datetime.datetime.now().astimezone(datetime.UTC) + + pbar = tqdm.tqdm(src_objects, desc="Paring objects") + last_dir = "" + for obj in pbar: + obj_dir = "/".join(obj.object_name.split("/")[:-1]) + if obj_dir != last_dir: + pbar.set_description(obj_dir) + last_dir = obj_dir + + timestamp = dateutil.parser.parse(os.path.basename(obj.object_name).rstrip(".json")).astimezone(datetime.UTC) + if (now - timestamp).total_seconds() < maximum_age: + response_data: BaseHTTPResponse | None = None + # HACK: If the file exists, skip it. + try: + dst_client.stat_object(dst_bucket, obj.object_name) + except S3Error: + try: + response_data = src_client.get_object(src_bucket, obj.object_name) + raw_data = response_data.data.decode("utf-8") + data = BytesIO(raw_data.encode("utf-8")) + data_len = len(raw_data) + content_type = response_data.headers.get("Content-Type", "") + if content_type: + dst_client.put_object( + dst_bucket, + obj.object_name, + data, + length=data_len, + content_type=content_type, + ) + else: + dst_client.put_object(dst_bucket, obj.object_name, data, length=data_len) + finally: + if response_data: + response_data.close() + response_data.release_conn() + pbar.close() + def main(): - parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser() + + parser.add_argument("--src", type=str, required=True) + parser.add_argument("--dst", type=str, default="filefish.lofar.net:9000") + parser.add_argument("--src_bucket", type=str, default="statistics") + parser.add_argument("--dst_bucket", type=str, default="central-statistics") - parser.add_argument("--src", type=str, required=True) - parser.add_argument("--dst", type=str, default="filefish.lofar.net:9000") - parser.add_argument("--src_bucket", type=str, default="statistics") - parser.add_argument("--dst_bucket", type=str, default="central-statistics") + parser.add_argument("--src_user", required=True) + parser.add_argument("--dst_user", required=True) + parser.add_argument("--src_key", required=True) + parser.add_argument("--dst_key", required=True) - parser.add_argument("--src_user", required=True) - parser.add_argument("--dst_user", required=True) - parser.add_argument("--src_key", required=True) - parser.add_argument("--dst_key", required=True) + parser.add_argument( + "--disable_src_security", + dest="src_security", + default=True, + action="store_false", + ) - parser.add_argument("--disable_src_security", dest="src_security", default=True, action="store_false") + parser.add_argument("--maximum_age", type=float, default=600) - parser.add_argument("--maximum_age", type=float, default=600) + args = parser.parse_args() - args = parser.parse_args() + client_src = Minio( + args.src, + access_key=args.src_user, + secret_key=args.src_key, + secure=args.src_security, + ) - client_src = Minio( - args.src, - access_key=args.src_user, - secret_key=args.src_key, - secure=args.src_security, - ) + client_dst = Minio( + args.dst, + access_key=args.dst_user, + secret_key=args.dst_key, + secure=True, + ) - client_dst = Minio( - args.dst, - access_key=args.dst_user, - secret_key=args.dst_key, - secure=True, - ) + manually_transfer_minio_buckets(client_src, client_dst, args.src_bucket, args.dst_bucket, args.maximum_age) - manually_transfer_minio_buckets(client_src, client_dst, args.src_bucket, args.dst_bucket, args.maximum_age) -if __name__ == '__main__': - main() \ No newline at end of file +if __name__ == "__main__": + main() diff --git a/l2json/parsers/__init__.py b/l2json/parsers/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/parsers/__init__.py +++ b/l2json/parsers/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/l2json/parsers/filters.py b/l2json/parsers/filters.py index 532ee2ee499bb2286d5193c7ed346a20d13d47a3..0bfc07885b968202d9b2edb0f37b79579827123c 100644 --- a/l2json/parsers/filters.py +++ b/l2json/parsers/filters.py @@ -1,19 +1,24 @@ from collections import defaultdict from l2json.data_types.typed_json_dicts import CapturedDatasetDict -def filter_by_obs_id(input_data: list[CapturedDatasetDict], target_obs_ids: None | set[int] = None, split_obs_ids: bool = True) -> dict[int, list[CapturedDatasetDict]]: - if target_obs_ids is None: - target_obs_ids = set() - if not len(target_obs_ids) and not split_obs_ids: - return {0: input_data} - filtered_data: dict[int, list[CapturedDatasetDict]] = defaultdict(lambda: list()) - for dataset in input_data: - obs_id = dataset["observation_id"] - target_obs_id: bool = obs_id in target_obs_ids - if split_obs_ids and (target_obs_id or not len(target_obs_ids)): - filtered_data[obs_id].append(dataset) - elif target_obs_id: - filtered_data[0].append(dataset) +def filter_by_obs_id( + input_data: list[CapturedDatasetDict], + target_obs_ids: None | set[int] = None, + split_obs_ids: bool = True, +) -> dict[int, list[CapturedDatasetDict]]: + if target_obs_ids is None: + target_obs_ids = set() + if not len(target_obs_ids) and not split_obs_ids: + return {0: input_data} - return filtered_data + filtered_data: dict[int, list[CapturedDatasetDict]] = defaultdict(lambda: list()) + for dataset in input_data: + obs_id = dataset["observation_id"] + target_obs_id: bool = obs_id in target_obs_ids + if split_obs_ids and (target_obs_id or not len(target_obs_ids)): + filtered_data[obs_id].append(dataset) + elif target_obs_id: + filtered_data[0].append(dataset) + + return filtered_data diff --git a/l2json/parsers/mappings.py b/l2json/parsers/mappings.py index 652c88fcc392172e714353195ab7bf7ecfcdf6c5..46fa8fed67f24bc9355e234dfec8059112698d61 100644 --- a/l2json/parsers/mappings.py +++ b/l2json/parsers/mappings.py @@ -1,4 +1,8 @@ -from l2json.data_types.other_data_types import DeviceName, GenericDeviceName, MappableAttributeName +from l2json.data_types.other_data_types import ( + DeviceName, + GenericDeviceName, + MappableAttributeName, +) from l2json.data_types.typed_json_dicts import MetadataDeviceAttributes devices_by_antennafield: dict[str, dict[GenericDeviceName, DeviceName]] = { @@ -35,7 +39,10 @@ devices_by_antennafield: dict[str, dict[GenericDeviceName, DeviceName]] = { }, } -stationinformation_header_mapping: dict[MappableAttributeName, tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]]] = { +stationinformation_header_mapping: dict[ + MappableAttributeName, + tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]], +] = { "antenna_to_sdp_mapping": ( "antennafield", "antenna_to_sdp_mapping_r", @@ -111,4 +118,4 @@ dataset_metadata_attribute_mapping: dict[str, tuple[GenericDeviceName, MetadataD "digital_beam_tracking_enabled": ("digitalbeam", "tracking_enabled_r", [bool]), "subbands": ("digitalbeam", "subband_select_rw", [list, int]), "subband_frequencies": ("sdp", "subband_frequency_r", [list, list, float]), -} \ No newline at end of file +} diff --git a/l2json/parsers/misc.py b/l2json/parsers/misc.py index ea9d4dd890b83c9de7be448f6ed466cd36484800..579069e79f32c532947aaea353bbbc86c36fa432 100644 --- a/l2json/parsers/misc.py +++ b/l2json/parsers/misc.py @@ -1,17 +1,22 @@ import datetime -def sanitise_time_limits(tstart: None | datetime.datetime, tend: None | datetime.datetime, always_write: bool = False) -> tuple[datetime.datetime, datetime.datetime]: - if tstart is None and tend is None: - print("WARNING: No time limits provided, both start and tend are None.") - if tstart is None: - tstart = datetime.datetime.now() - if tend is None: - tend = datetime.datetime.now() - if (tend < tstart) and not always_write: - print(f"WARNING: No data can be extracted (invalid timestamps provided {tstart=} > {tend=}, {always_write=}.") +def sanitise_time_limits( + tstart: None | datetime.datetime, + tend: None | datetime.datetime, + always_write: bool = False, +) -> tuple[datetime.datetime, datetime.datetime]: + if tstart is None and tend is None: + print("WARNING: No time limits provided, both start and tend are None.") + if tstart is None: + tstart = datetime.datetime.now() + if tend is None: + tend = datetime.datetime.now() + if (tend < tstart) and not always_write: + print(f"WARNING: No data can be extracted (invalid timestamps provided {tstart=} > {tend=}, {always_write=}.") - return tstart, tend + return tstart, tend -def isot_to_datetime(timestamp: str, tz = datetime.UTC) -> datetime.datetime: - return datetime.datetime.fromisoformat(timestamp).replace(tzinfo=tz) \ No newline at end of file + +def isot_to_datetime(timestamp: str, tz=datetime.UTC) -> datetime.datetime: + return datetime.datetime.fromisoformat(timestamp).replace(tzinfo=tz) diff --git a/l2json/parsers/parse_file.py b/l2json/parsers/parse_file.py index 09ed672f411f9d0706dfacd6a0b366020f8d6c67..b2dbd97bc7dcce3646fd86656cc61df9fb16f2af 100644 --- a/l2json/parsers/parse_file.py +++ b/l2json/parsers/parse_file.py @@ -5,23 +5,29 @@ from l2json.data_types.typed_json_dicts import CapturedDatasetDict, CapturedMeta from l2json.parsers.misc import sanitise_time_limits, isot_to_datetime -def load_from_string(lines: list[str], tstart: None | datetime = None, tend: None | datetime = None, always_write: bool = False, reference: str = "") -> list[CapturedMetadataDict | CapturedDatasetDict]: - data: list[CapturedMetadataDict | CapturedDatasetDict] = [] +def load_from_string( + lines: list[str], + tstart: None | datetime = None, + tend: None | datetime = None, + always_write: bool = False, + reference: str = "", +) -> list[CapturedMetadataDict | CapturedDatasetDict]: + data: list[CapturedMetadataDict | CapturedDatasetDict] = [] - tstart, tend = sanitise_time_limits(tstart, tend) + tstart, tend = sanitise_time_limits(tstart, tend) - for line in lines: - if not len(line): - continue - d: CapturedMetadataDict | CapturedDatasetDict = json.loads(line) - if 'type' in d: # type: ignore[typeddict-unknown-key,typeddict-item] - d['type'] = d['type'].upper() # type: ignore[arg-type,typeddict-item] - if "ts" in d.keys(): # type: ignore[typeddict-unknown-key,typeddict-item] - d["timestamp"] = d.pop("ts") # type: ignore[typeddict-unknown-key,typeddict-item] - t = isot_to_datetime(d["timestamp"]) + for line in lines: + if not len(line): + continue + d: CapturedMetadataDict | CapturedDatasetDict = json.loads(line) + if "type" in d: # type: ignore[typeddict-unknown-key,typeddict-item] + d["type"] = d["type"].upper() # type: ignore[arg-type,typeddict-item] + if "ts" in d.keys(): # type: ignore[typeddict-unknown-key,typeddict-item] + d["timestamp"] = d.pop("ts") # type: ignore[typeddict-unknown-key,typeddict-item] + t = isot_to_datetime(d["timestamp"]) - d['reference'] = reference + d["reference"] = reference - if always_write or ((t >= tstart) and (t <= tend)): - data.append(d) - return data \ No newline at end of file + if always_write or ((t >= tstart) and (t <= tend)): + data.append(d) + return data diff --git a/l2json/parsers/parse_json.py b/l2json/parsers/parse_json.py index 64bdeac1ba121b2c8111544d37e96b3cff2ea500..114b967c1e81b71d130ce1f504ceaf7a7dd24a02 100644 --- a/l2json/parsers/parse_json.py +++ b/l2json/parsers/parse_json.py @@ -10,18 +10,30 @@ from l2json.data_types.dataset import ( get_dset_type, XSTDataset, BSX_Dataset, - DatasetSharedMetadata + DatasetSharedMetadata, +) +from l2json.data_types.other_data_types import DataTypeStr, MappableAttributeName +from l2json.data_types.typed_json_dicts import ( + CapturedDatasetDict, + ParsedDatasetDict, + CapturedMetadataDict, + T_Captured, +) +from l2json.parsers.mappings import ( + GenericDeviceName, + MetadataDeviceAttributes, + devices_by_antennafield, + stationinformation_header_mapping, + dataset_attribute_mapping, + dataset_alt_attribute_mapping, + dataset_metadata_attribute_mapping, ) -from l2json.data_types.other_data_types import DataTypeStr -from l2json.data_types.typed_json_dicts import CapturedDatasetDict, ParsedDatasetDict, CapturedMetadataDict, T_Captured -from l2json.parsers.mappings import * from l2json.parsers.misc import isot_to_datetime T = TypeVar("T") def validate_attribute_type(value: Any, attribute_type: list[type]) -> bool: - # print(value, attribute_type) if len(attribute_type) == 1: return isinstance(value, attribute_type[0]) @@ -31,13 +43,13 @@ def validate_attribute_type(value: Any, attribute_type: list[type]) -> bool: return False return all(validate_attribute_type(val, attribute_type[1:]) for val in value) -def get_valid_reversed_metadata( - filtered_input: list[T_Captured], timestamp: float -) -> Iterable[T_Captured]: + +def get_valid_reversed_metadata(filtered_input: list[T_Captured], timestamp: float) -> Iterable[T_Captured]: if timestamp <= 0.0: return filtered_input + def timestamp_key_func(entry: T_Captured) -> float: - ts: datetime.datetime | str = entry['timestamp'] + ts: datetime.datetime | str = entry["timestamp"] if isinstance(ts, datetime.datetime): return ts.timestamp() @@ -55,7 +67,10 @@ def get_value_from_metadata( metadata: CapturedMetadataDict, antennafield: str, output_key: MappableAttributeName, - key_device_mapping: dict[MappableAttributeName, tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]]], + key_device_mapping: dict[ + MappableAttributeName, + tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]], + ], ) -> Any | None: device, attribute, types = key_device_mapping[output_key] device_name = devices_by_antennafield[antennafield][device] @@ -78,13 +93,14 @@ def get_last_valid_device_value( output_key: MappableAttributeName, antennafield: str, filtered_metadata: list[CapturedMetadataDict], - device_mapping: dict[MappableAttributeName, tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]]], - timestamp: float = 0.0 + device_mapping: dict[ + MappableAttributeName, + tuple[GenericDeviceName, MetadataDeviceAttributes, list[type]], + ], + timestamp: float = 0.0, ) -> Any: for metadata in get_valid_reversed_metadata(filtered_metadata, timestamp): - value = get_value_from_metadata( - metadata, antennafield, output_key, device_mapping - ) + value = get_value_from_metadata(metadata, antennafield, output_key, device_mapping) if value is not None: return value raise KeyError(f"No value found for {output_key=} in provided metadata dicts.") @@ -101,7 +117,10 @@ def get_last_valid_device_value( def get_static_metadata( - filtered_metadata: list[CapturedMetadataDict], antennafield: str, keys: set[MappableAttributeName], timestamp: float = 0.0 + filtered_metadata: list[CapturedMetadataDict], + antennafield: str, + keys: set[MappableAttributeName], + timestamp: float = 0.0, ) -> dict[MappableAttributeName, str]: values: dict[MappableAttributeName, str] = {} for key in keys: @@ -109,15 +128,16 @@ def get_static_metadata( raise KeyError(f"We do not have an existing mapping for {key}.") values[key] = get_last_valid_device_value( - key, antennafield, filtered_metadata, stationinformation_header_mapping, timestamp + key, + antennafield, + filtered_metadata, + stationinformation_header_mapping, + timestamp, ) return values -def get_data_type( - filtered_data: list[CapturedDatasetDict], data_type_key: str = "type" -) -> DataTypeStr: - +def get_data_type(filtered_data: list[CapturedDatasetDict], data_type_key: str = "type") -> DataTypeStr: base_type: DataTypeStr = filtered_data[0][data_type_key] # type: ignore[literal-required] for idx, entry in enumerate(filtered_data[1:]): if data_type_key not in entry: @@ -129,6 +149,7 @@ def get_data_type( ) return base_type + def parse_data_payload(data_entry: CapturedDatasetDict) -> np.ndarray: dtype = get_data_type([data_entry]) @@ -159,7 +180,7 @@ def parse_data_dicts( for output_key, (input_key, expected_type) in dataset_attribute_mapping.items(): if not input_key: continue - extracted_data[output_key] = capture[input_key] # type: ignore[literal-required] + extracted_data[output_key] = capture[input_key] # type: ignore[literal-required] if not isinstance(extracted_data[output_key], expected_type): raise TypeError( f"Expected {output_key} of entry {idx} to be of type {expected_type} (got {type(extracted_data[output_key])=})." @@ -172,7 +193,7 @@ def parse_data_dicts( if capture["type"].upper() == "XST": extracted_data["subband"] = capture["subband"] - extracted_dsets.append(ParsedDatasetDict(**extracted_data)) # type: ignore + extracted_dsets.append(ParsedDatasetDict(**extracted_data)) # type: ignore return extracted_dsets @@ -214,11 +235,12 @@ def parse_data_dicts( # return kept_datasets -def subband_to_frequency( - active_filter: str, subband: int | np.ndarray -) -> float | list[float]: +def subband_to_frequency(active_filter: str, subband: int | np.ndarray) -> float | list[float]: field, lower, upper = active_filter.split("_") - filter_split: tuple[int, int] = (int(lower), int(upper)) # Ugly? yes. But mypy did not approve of map-ing the data + filter_split: tuple[int, int] = ( + int(lower), + int(upper), + ) # Ugly? yes. But mypy did not approve of map-ing the data subband_width: float = 100.0 / 512.0 subband_offset: float = 0.0 @@ -251,33 +273,31 @@ def create_base_dset( if sb_arr and freq_arr: if issubclass(base_type, XSTDataset): - raise RuntimeError( - f"Expected SST or BST data based on sb/freq attribute type, but got {base_type} instead." - ) + raise RuntimeError(f"Expected SST or BST data based on sb/freq attribute type, but got {base_type} instead.") if not isinstance(subband, list) or not isinstance(frequency, list): raise ValueError(f"{subband=} or {frequency=} is not a list for type={base_type}") return base_type(data=data, subbands=subband, frequencies=frequency) elif sb_arr or freq_arr: - raise RuntimeError( - f"Type mismatch between {type(subband)} and {type(frequency)}." - ) + raise RuntimeError(f"Type mismatch between {type(subband)} and {type(frequency)}.") if not isinstance(subband, int) or not isinstance(frequency, float): - raise ValueError(f"{subband=} ({type(subband)}) or {frequency=} {type(frequency)} is not a int/float for type={base_type}") + raise ValueError( + f"{subband=} ({type(subband)}) or {frequency=} {type(frequency)} is not a int/float for type={base_type}" + ) if not issubclass(base_type, XSTDataset): - raise RuntimeError( - f"Expected XST data based on sb/freq attribute type, but got {base_type} instead." - ) + raise RuntimeError(f"Expected XST data based on sb/freq attribute type, but got {base_type} instead.") # I cannot use the base_type variable here no matter what I try, I always get syntax errors. - return XSTDataset(data = data, subband = subband, frequency = frequency) + return XSTDataset(data=data, subband=subband, frequency=frequency) def get_base_metadata( - filtered_metadata: list[CapturedMetadataDict], antennafield: str, keys: Iterable[MappableAttributeName], timestamp: float = 0.0 + filtered_metadata: list[CapturedMetadataDict], + antennafield: str, + keys: Iterable[MappableAttributeName], + timestamp: float = 0.0, ) -> dict[str, Any]: - attributes: dict[str, Any] = {} allowed_to_fail: dict[str, list[str | int]] = { "active_antenna_fields": [], @@ -285,9 +305,7 @@ def get_base_metadata( } for key in keys: if key not in dataset_metadata_attribute_mapping: - raise KeyError( - f"Requested dataset attribute {key=} does not have a mapping." - ) + raise KeyError(f"Requested dataset attribute {key=} does not have a mapping.") device, device_attr, attr_type = dataset_metadata_attribute_mapping[key] device_name = devices_by_antennafield[antennafield][device] for meta in get_valid_reversed_metadata(filtered_metadata, timestamp): @@ -312,15 +330,11 @@ def get_base_metadata( return attributes -def get_base_dataset_metadata( - filtered_datasets: list[CapturedDatasetDict], keys: Iterable[str] -) -> dict[str, Any]: +def get_base_dataset_metadata(filtered_datasets: list[CapturedDatasetDict], keys: Iterable[str]) -> dict[str, Any]: attributes: dict[str, Any] = {} for key in keys: if key not in dataset_alt_attribute_mapping: - raise KeyError( - f"Requested dataset attribute {key=} does not have a mapping." - ) + raise KeyError(f"Requested dataset attribute {key=} does not have a mapping.") dset_attr, attr_type = dataset_alt_attribute_mapping[key] for dset in filtered_datasets: if dset_attr not in dset: @@ -328,9 +342,7 @@ def get_base_dataset_metadata( attr = dset[dset_attr] # type: ignore[literal-required] if not validate_attribute_type(attr, [attr_type]): - raise TypeError( - f"Attribute {key=} ({dset_attr=}) has invalid type {attr_type=} != {attr}." - ) + raise TypeError(f"Attribute {key=} ({dset_attr=}) has invalid type {attr_type=} != {attr}.") attributes[key] = attr if key not in attributes: @@ -339,12 +351,10 @@ def get_base_dataset_metadata( def build_lofar2_observation( - filtered_metadata: list[CapturedMetadataDict], filtered_data: list[CapturedDatasetDict] + filtered_metadata: list[CapturedMetadataDict], + filtered_data: list[CapturedDatasetDict], ) -> LOFAR2Observation: - - base_observation_metadata: dict[str, str] = get_base_dataset_metadata( - filtered_data, keys={"station_name", "antenna_field"} - ) + base_observation_metadata: dict[str, str] = get_base_dataset_metadata(filtered_data, keys={"station_name", "antenna_field"}) station_information: StationInformation = StationInformation( **get_static_metadata( filtered_metadata, @@ -364,9 +374,7 @@ def build_lofar2_observation( ) ) data_type: DataTypeStr = get_data_type(filtered_data) - field_type, base_dset_type = get_dset_type( - base_observation_metadata["antenna_field"], data_type - ) + field_type, base_dset_type = get_dset_type(base_observation_metadata["antenna_field"], data_type) data_meta_keys: set[MappableAttributeName] = { "frequency_band", @@ -395,7 +403,7 @@ def build_lofar2_observation( filtered_metadata, base_observation_metadata["antenna_field"], data_meta_keys, - timestamp = isot_to_datetime(entry["timestamp"]).timestamp(), + timestamp=isot_to_datetime(entry["timestamp"]).timestamp(), ) if data_type == "XST": frequency = subband_to_frequency( @@ -419,9 +427,9 @@ def build_lofar2_observation( frequency=frequencies, ) del entry["data"] # type: ignore[misc] - del metadata["subbands"] # type: ignore[misc] + del metadata["subbands"] # type: ignore[misc] - complete_dataset = field_type(dataset=dataset, **entry, **metadata) # type: ignore[misc] + complete_dataset = field_type(dataset=dataset, **entry, **metadata) # type: ignore[misc] parsed_datasets.append(complete_dataset) observation = LOFAR2Observation( diff --git a/l2json/sinks/__init__.py b/l2json/sinks/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/sinks/__init__.py +++ b/l2json/sinks/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/l2json/sinks/hdf5.py b/l2json/sinks/hdf5.py index 6fb5b2268ff7676a5b68679170a8e46bd00ee0a0..2f2b5de298b2d936e995d049ee3b36de2da2f376 100644 --- a/l2json/sinks/hdf5.py +++ b/l2json/sinks/hdf5.py @@ -1,4 +1,4 @@ -import h5py #type: ignore[import-untyped] +import h5py # type: ignore[import-untyped] import os import shutil from l2json.data_types.dataset import LOFAR2Observation @@ -7,16 +7,10 @@ from l2json.data_types.dataset import LOFAR2Observation def write_obs(obs: LOFAR2Observation, output_file: str) -> str: if os.path.exists(output_file): count_files = len( - [ - file - for file in os.listdir(os.path.dirname(output_file) or os.getcwd()) - if file in os.path.basename(output_file) - ] + [file for file in os.listdir(os.path.dirname(output_file) or os.getcwd()) if file in os.path.basename(output_file)] ) if count_files and os.path.exists(output_file): - print( - f"Output file already exists at {output_file}, moving the old file to {output_file}_{count_files}" - ) + print(f"Output file already exists at {output_file}, moving the old file to {output_file}_{count_files}") shutil.move(output_file, f"{output_file}_{count_files}") obs_dict = obs.prepare_output_dict() diff --git a/l2json/sources/__init__.py b/l2json/sources/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/sources/__init__.py +++ b/l2json/sources/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/l2json/sources/args.py b/l2json/sources/args.py index cc2c9ea4b274e3254dca554ed346aaed0f4c74d4..5df53f1742c9eb3673417f05065d2125550f52af 100644 --- a/l2json/sources/args.py +++ b/l2json/sources/args.py @@ -11,77 +11,124 @@ default_env_vars: dict[str, str] = { "MINIO_SECRET_KEY": "UNSET", } -def add_arguments_to_parser(parser: ArgumentParser, prefix = "l2json_", ref_time: None | datetime = None, discard_flags: None | list[str] = None) -> ArgumentParser: - if ref_time is None: - ref_time = datetime.now() - - l2group = parser.add_argument_group("L2Json Parameters", description = "Parameters consumed by l2json to grab data from minio or on disk. Minio data requires that the MINIO_* environment variables are set.") - l2group.add_argument(f"--{prefix}paths", nargs='*', type=str, default=[], - help="Paths to search (if looking for data on disk, do not use to request data from minio)") - - l2group.add_argument(f"--{prefix}disable_security", dest=f"{prefix}security", default=True, action='store_false', - help="Disable SSL checks (currently needed for direct station access)") - l2group.add_argument( - f"--{prefix}tstart", - default=ref_time.isoformat(), - type=str, - help="Start time to get data from", - ) - - l2group.add_argument(f"--{prefix}observation_ids", nargs='*', default=[], type=int, - help="Filter parsed data by observation_id.") - - l2group.add_argument( - f"--{prefix}output_pattern", default="./{start}_{end}_{observation_id}_{station}{field}_{antenna_type}_{datatype}.hdf5", - type=str, help="Output directory location, using any subset of the listed formatting variables" - ) - l2group.add_argument(f"--{prefix}split_by_obs_id", default=False, action='store_true', - help="Split output data by observation ID as opposed to producing a single output. By default, if no obs IDs are provided, this will return all observations in the given window.") - - end_group = l2group.add_mutually_exclusive_group() - default_duration = 60 * 2 - end_group.add_argument( - f"--{prefix}tend", - default=(ref_time + timedelta(seconds=default_duration)).isoformat(), - type=str, - help="End time to get data from", - ) - end_group.add_argument( - f"--{prefix}duration", type=int, help="Duration of data to get in seconds" - ) - - l2group.add_argument(f"--{prefix}stations", nargs='+', default=["CS001", "CS032", "RS307"], type=str, - help="Stations to fetch") - l2group.add_argument(f"--{prefix}fields", nargs='+', default=["LBA", "HBA", "HBA0", "HBA1"], type=str, - help="Antenna fields to fetch") - l2group.add_argument(f"--{prefix}types", nargs='+', default=["BST", "SST", "XST"], type=str, help="Data types to fetch") - - if discard_flags: - discard_flags = [f"--{prefix}{flag}" for flag in discard_flags] - for action in parser._actions + getattr(parser, "_group_actions", []): - for arg_key in action.option_strings: - if arg_key in discard_flags: - parser._handle_conflict_resolve(None, [(arg_key, action)]) # type: ignore[arg-type] - - return parser + +def add_arguments_to_parser( + parser: ArgumentParser, + prefix="l2json_", + ref_time: None | datetime = None, + discard_flags: None | list[str] = None, +) -> ArgumentParser: + if ref_time is None: + ref_time = datetime.now() + + l2group = parser.add_argument_group( + "L2Json Parameters", + description="Parameters consumed by l2json to grab data from minio or on disk. Minio data requires that the MINIO_* environment variables are set.", + ) + l2group.add_argument( + f"--{prefix}paths", + nargs="*", + type=str, + default=[], + help="Paths to search (if looking for data on disk, do not use to request data from minio)", + ) + + l2group.add_argument( + f"--{prefix}disable_security", + dest=f"{prefix}security", + default=True, + action="store_false", + help="Disable SSL checks (currently needed for direct station access)", + ) + l2group.add_argument( + f"--{prefix}tstart", + default=ref_time.isoformat(), + type=str, + help="Start time to get data from", + ) + + l2group.add_argument( + f"--{prefix}observation_ids", + nargs="*", + default=[], + type=int, + help="Filter parsed data by observation_id.", + ) + + l2group.add_argument( + f"--{prefix}output_pattern", + default="./{start}_{end}_{observation_id}_{station}{field}_{antenna_type}_{datatype}.hdf5", + type=str, + help="Output directory location, using any subset of the listed formatting variables", + ) + l2group.add_argument( + f"--{prefix}split_by_obs_id", + default=False, + action="store_true", + help="Split output data by observation ID as opposed to producing a single output. By default, if no obs IDs are provided, this will return all observations in the given window.", + ) + + end_group = l2group.add_mutually_exclusive_group() + default_duration = 60 * 2 + end_group.add_argument( + f"--{prefix}tend", + default=(ref_time + timedelta(seconds=default_duration)).isoformat(), + type=str, + help="End time to get data from", + ) + end_group.add_argument(f"--{prefix}duration", type=int, help="Duration of data to get in seconds") + + l2group.add_argument( + f"--{prefix}stations", + nargs="+", + default=["CS001", "CS032", "RS307"], + type=str, + help="Stations to fetch", + ) + l2group.add_argument( + f"--{prefix}fields", + nargs="+", + default=["LBA", "HBA", "HBA0", "HBA1"], + type=str, + help="Antenna fields to fetch", + ) + l2group.add_argument( + f"--{prefix}types", + nargs="+", + default=["BST", "SST", "XST"], + type=str, + help="Data types to fetch", + ) + + if discard_flags: + discard_flags = [f"--{prefix}{flag}" for flag in discard_flags] + for action in parser._actions + getattr(parser, "_group_actions", []): + for arg_key in action.option_strings: + if arg_key in discard_flags: + parser._handle_conflict_resolve(None, [(arg_key, action)]) # type: ignore[arg-type] + + return parser + def parse_input_args(args: ArgumentParser, prefix: str = "", check_env_args: bool = False) -> ArgumentParser: - setattr(args, f"{prefix}tstart", isot_to_datetime(getattr(args, f"{prefix}tstart"))) - - if getattr(args, f"{prefix}duration"): - setattr(args, f"{prefix}tend", getattr(args, f"{prefix}tstart") + timedelta(seconds=getattr(args, f"{prefix}duration"))) - else: - setattr(args, f"{prefix}tend", isot_to_datetime(getattr(args, f"{prefix}tend"))) - - if check_env_args: - if not getattr(args, f"{prefix}paths"): - for key, val in default_env_vars.items(): - if key not in os.environ: - if val != "UNSET": - os.environ[key] = val - else: - raise RuntimeError( - "MINIO_ACCESS_KEY and MINIO_SECRET_KEY env vars are not set." - ) - - return args \ No newline at end of file + setattr(args, f"{prefix}tstart", isot_to_datetime(getattr(args, f"{prefix}tstart"))) + + if getattr(args, f"{prefix}duration"): + setattr( + args, + f"{prefix}tend", + getattr(args, f"{prefix}tstart") + timedelta(seconds=getattr(args, f"{prefix}duration")), + ) + else: + setattr(args, f"{prefix}tend", isot_to_datetime(getattr(args, f"{prefix}tend"))) + + if check_env_args: + if not getattr(args, f"{prefix}paths"): + for key, val in default_env_vars.items(): + if key not in os.environ: + if val != "UNSET": + os.environ[key] = val + else: + raise RuntimeError("MINIO_ACCESS_KEY and MINIO_SECRET_KEY env vars are not set.") + + return args diff --git a/l2json/sources/disk.py b/l2json/sources/disk.py index 347345e84ec4733da5c7bb8e57558b040081a444..3ede0764dc38767434856dcb504687af48883fee 100644 --- a/l2json/sources/disk.py +++ b/l2json/sources/disk.py @@ -23,32 +23,38 @@ def fetch_files_on_disk( path_pattern: None | re.Pattern = None, always_write: bool = False, early_exit: bool = True, - store_filename: bool = False + store_filename: bool = False, ) -> list[CapturedMetadataDict | CapturedDatasetDict]: - tstart, tend = sanitise_time_limits(tstart, tend) entries: list[CapturedMetadataDict | CapturedDatasetDict] = [] - for root, __, files in sorted(os.walk(base_path), key = lambda x: x[0]): - full_path = os.path.abspath(root).upper().split('/') - metadata_criteria_met = \ - station.upper() in full_path and \ - (field.upper() in full_path if field else True) and \ - (dtype.upper() in full_path if dtype else True) + for root, __, files in sorted(os.walk(base_path), key=lambda x: x[0]): + full_path = os.path.abspath(root).upper().split("/") + metadata_criteria_met = ( + station.upper() in full_path + and (field.upper() in full_path if field else True) + and (dtype.upper() in full_path if dtype else True) + ) if path_pattern is None or path_pattern.search(root) and metadata_criteria_met: for file in sorted(files): if file_pattern.match(file): - tstr = os.path.basename(file).split('.json')[0] + tstr = os.path.basename(file).split(".json")[0] t = datetime.datetime.fromisoformat(tstr) if (t >= tstart - dt) & (t < tend + dt): file_path = os.path.join(root, file) if file.endswith(".gz"): - raw_string = gzip.GzipFile(file_path, mode = 'rb').read().decode().split('\n') + raw_string = gzip.GzipFile(file_path, mode="rb").read().decode().split("\n") else: with open(file_path, "r") as ref: raw_string = ref.readlines() - data = [line.rstrip('\n') for line in raw_string if len(line.rstrip('\n'))] - entries += load_from_string(data, tstart, tend, always_write, reference = file if store_filename else "") + data = [line.rstrip("\n") for line in raw_string if len(line.rstrip("\n"))] + entries += load_from_string( + data, + tstart, + tend, + always_write, + reference=file if store_filename else "", + ) else: if len(entries) and early_exit: return entries @@ -58,33 +64,47 @@ def fetch_files_on_disk( # args.station, args.field, args.type, args.tstart, args.tend # args.paths, args.station, args.field, args.type, args.tstart, args.tend def fetch_from_disk( - base_paths: list[str], - station: str, - field: str, - dtype: str, - tstart: None | datetime.datetime = None, - tend: None | datetime.datetime = None, - write_interval_data: timedelta = timedelta(minutes=5), - write_interval_metadata: timedelta = timedelta(minutes=15), - metadata_match_str: str = r"/metadata/", - fetch_metadata: bool = True, - store_filename: bool = False + base_paths: list[str], + station: str, + field: str, + dtype: str, + tstart: None | datetime.datetime = None, + tend: None | datetime.datetime = None, + write_interval_data: timedelta = timedelta(minutes=5), + write_interval_metadata: timedelta = timedelta(minutes=15), + metadata_match_str: str = r"/metadata/", + fetch_metadata: bool = True, + store_filename: bool = False, ) -> tuple[list[CapturedMetadataDict], list[CapturedDatasetDict]]: - metadata: list[CapturedMetadataDict] = [] data: list[CapturedDatasetDict] = [] - pbar = tqdm.tqdm(base_paths, desc = "Walking paths...") + pbar = tqdm.tqdm(base_paths, desc="Walking paths...") for path in pbar: if fetch_metadata: - metadata += fetch_files_on_disk( # type: ignore[arg-type] - path, station, "", "metadata", tstart, tend, write_interval_metadata, - path_pattern=re.compile(f"(?i){metadata_match_str}"), always_write=True, store_filename = store_filename + metadata += fetch_files_on_disk( # type: ignore[arg-type] + path, + station, + "", + "metadata", + tstart, + tend, + write_interval_metadata, + path_pattern=re.compile(f"(?i){metadata_match_str}"), + always_write=True, + store_filename=store_filename, ) # Corrected regex for data files (exclude metadata paths) - data += fetch_files_on_disk( # type: ignore[arg-type] - path, station, field, dtype, tstart, tend, write_interval_data, - path_pattern=re.compile(f"(?!{metadata_match_str})"), always_write=False, # Corrected line - store_filename = store_filename + data += fetch_files_on_disk( # type: ignore[arg-type] + path, + station, + field, + dtype, + tstart, + tend, + write_interval_data, + path_pattern=re.compile(f"(?!{metadata_match_str})"), + always_write=False, # Corrected line + store_filename=store_filename, ) return metadata, data diff --git a/l2json/sources/generators.py b/l2json/sources/generators.py index d0b70249e236b7d457d8ec6ff075dcd943f49e27..ae0d4a07f3fcd16792821d0b0c2d4f53b4c2777b 100644 --- a/l2json/sources/generators.py +++ b/l2json/sources/generators.py @@ -9,50 +9,103 @@ from l2json.data_types.dataset import LOFAR2Observation from l2json.data_types.typed_json_dicts import CapturedMetadataDict, CapturedDatasetDict -def yield_data_from_params(stations: list[str], fields: list[str], types: list[str], paths: None | list[str] = None, tstart: None | datetime.datetime = None, tend: None | datetime.datetime = None, minio_security: bool = True, silent: bool = False, store_filename: bool = False) -> Generator[tuple[tuple[str, str, str], tuple[list[CapturedMetadataDict], list[CapturedDatasetDict]]], None, None]: - for station in stations: - # Cache metadata between fields, not stations - metadata: list[CapturedMetadataDict] = [] - for field in fields: - for datatype in types: - if paths: - working_metadata, data = l2disk.fetch_from_disk( - paths, station, field, datatype, tstart, tend, fetch_metadata = not bool(metadata), store_filename = store_filename - ) - else: - working_metadata, data = l2minio.fetch_from_minio( - station, field, datatype, tstart, tend, security = minio_security, fetch_metadata = not bool(metadata), store_filename = store_filename - ) - - # fetch_from_minio prints if no data was found - if not data: - continue - - if not metadata and not working_metadata: - raise RuntimeError(f"Unable to get metadata from {station}{field} between {tstart} and {tend}") - if not metadata: - metadata = working_metadata - - if not silent: - print( - f"Loaded {len(metadata)} metadata entries and {len(data)} {datatype} entries from minio for {station}{field}." - ) - - yield (station, field, datatype), (metadata, data) - - -def yield_l2obs_from_params(stations: list[str], fields: list[str], types: list[str], paths: None | list[str] = None, tstart: None | datetime.datetime = None, tend: None | datetime.datetime = None, minio_security: bool = True, observation_ids: Iterable[int] = (), split_by_obs_id: bool = False, silent: bool = False, store_filename: bool = False) -> Generator[tuple[tuple[str, str, str, int], LOFAR2Observation], None, None]: - for (station, field, datatype), (metadata, data) in yield_data_from_params(stations, fields, types, paths, tstart, tend, minio_security, silent, store_filename): - # Don't filter metadata for the moment - filtered_metadata = metadata - filtered_datasets = l2filter.filter_by_obs_id(data, set(observation_ids), split_by_obs_id) - - for obs_id, filtered_data in filtered_datasets.items(): - if not filtered_data: - print(f"While {len(data)} entries were found in the requested time range, none had {obs_id=}.") - continue - - # filtered_metadata, filtered_data = metadata, data - l2obs = parse_json.build_lofar2_observation(filtered_metadata, filtered_data) - - yield (station, field, datatype, obs_id), l2obs \ No newline at end of file +def yield_data_from_params( + stations: list[str], + fields: list[str], + types: list[str], + paths: None | list[str] = None, + tstart: None | datetime.datetime = None, + tend: None | datetime.datetime = None, + minio_security: bool = True, + silent: bool = False, + store_filename: bool = False, +) -> Generator[ + tuple[ + tuple[str, str, str], + tuple[list[CapturedMetadataDict], list[CapturedDatasetDict]], + ], + None, + None, +]: + for station in stations: + # Cache metadata between fields, not stations + metadata: list[CapturedMetadataDict] = [] + for field in fields: + for datatype in types: + if paths: + working_metadata, data = l2disk.fetch_from_disk( + paths, + station, + field, + datatype, + tstart, + tend, + fetch_metadata=not bool(metadata), + store_filename=store_filename, + ) + else: + working_metadata, data = l2minio.fetch_from_minio( + station, + field, + datatype, + tstart, + tend, + security=minio_security, + fetch_metadata=not bool(metadata), + store_filename=store_filename, + ) + + # fetch_from_minio prints if no data was found + if not data: + continue + + if not metadata and not working_metadata: + raise RuntimeError(f"Unable to get metadata from {station}{field} between {tstart} and {tend}") + if not metadata: + metadata = working_metadata + + if not silent: + print( + f"Loaded {len(metadata)} metadata entries and {len(data)} {datatype} entries from minio for {station}{field}." + ) + + yield (station, field, datatype), (metadata, data) + + +def yield_l2obs_from_params( + stations: list[str], + fields: list[str], + types: list[str], + paths: None | list[str] = None, + tstart: None | datetime.datetime = None, + tend: None | datetime.datetime = None, + minio_security: bool = True, + observation_ids: Iterable[int] = (), + split_by_obs_id: bool = False, + silent: bool = False, + store_filename: bool = False, +) -> Generator[tuple[tuple[str, str, str, int], LOFAR2Observation], None, None]: + for (station, field, datatype), (metadata, data) in yield_data_from_params( + stations, + fields, + types, + paths, + tstart, + tend, + minio_security, + silent, + store_filename, + ): + # Don't filter metadata for the moment + filtered_metadata = metadata + filtered_datasets = l2filter.filter_by_obs_id(data, set(observation_ids), split_by_obs_id) + + for obs_id, filtered_data in filtered_datasets.items(): + if not filtered_data: + print(f"While {len(data)} entries were found in the requested time range, none had {obs_id=}.") + continue + + # filtered_metadata, filtered_data = metadata, data + l2obs = parse_json.build_lofar2_observation(filtered_metadata, filtered_data) + + yield (station, field, datatype, obs_id), l2obs diff --git a/l2json/sources/minio.py b/l2json/sources/minio.py index 22f46d9e0a0b6682c9ea24a783149a7c3b0329f5..caccb576c9be437c66a561d66db2634ea1cfbf8b 100644 --- a/l2json/sources/minio.py +++ b/l2json/sources/minio.py @@ -11,127 +11,142 @@ from l2json.parsers.parse_file import load_from_string def load_file_from_minio(client: Minio, bucket: str, objname: str) -> list[str]: - response_data: None | BaseHTTPResponse = None - try: - response_data = client.get_object(bucket, objname) - lines = response_data.data.decode().split("\n") - finally: - if response_data: - response_data.close() - response_data.release_conn() - return lines + response_data: None | BaseHTTPResponse = None + try: + response_data = client.get_object(bucket, objname) + lines = response_data.data.decode().split("\n") + finally: + if response_data: + response_data.close() + response_data.release_conn() + return lines + def fetch_minio_entries( - client: Minio, - bucket: str, - prefix: str, - tstart: datetime, - tend: datetime, - dt: timedelta, - always_write: bool = False, - tqdm_desc: str = "", - store_filename: bool = False, + client: Minio, + bucket: str, + prefix: str, + tstart: datetime, + tend: datetime, + dt: timedelta, + always_write: bool = False, + tqdm_desc: str = "", + store_filename: bool = False, ) -> list[CapturedDatasetDict | CapturedMetadataDict]: - # Get objects from S3 bucket - objects = client.list_objects(bucket, prefix=prefix, recursive=True) - - # Loop over objects and download - data: list[CapturedDatasetDict | CapturedMetadataDict] = [] - if tqdm_desc: - objects = tqdm.tqdm(objects, desc=tqdm_desc) - for obj in objects: - # Get timestamp - objname = obj.object_name - tstr = os.path.splitext(os.path.basename(objname))[0] - t = datetime.fromisoformat(tstr) - - - - # Download and read - if (t >= tstart - dt) & (t < tend + dt): - lines = load_file_from_minio(client, bucket, objname) - data += load_from_string(lines, tstart, tend, always_write, reference = objname if store_filename else "") - - return data + # Get objects from S3 bucket + objects = client.list_objects(bucket, prefix=prefix, recursive=True) + + # Loop over objects and download + data: list[CapturedDatasetDict | CapturedMetadataDict] = [] + if tqdm_desc: + objects = tqdm.tqdm(objects, desc=tqdm_desc) + for obj in objects: + # Get timestamp + objname = obj.object_name + tstr = os.path.splitext(os.path.basename(objname))[0] + t = datetime.fromisoformat(tstr) + + # Download and read + if (t >= tstart - dt) & (t < tend + dt): + lines = load_file_from_minio(client, bucket, objname) + data += load_from_string( + lines, + tstart, + tend, + always_write, + reference=objname if store_filename else "", + ) + + return data def get_minio_client(client_details: None | dict[str, str] = None, security: bool = True) -> tuple[Minio, dict[str, str]]: - if client_details is None: - client_details = { - "hostname": str(os.getenv("MINIO_HOSTNAME")), - "bucket": str(os.getenv("MINIO_BUCKET")), - "access_key": str(os.getenv("MINIO_ACCESS_KEY")), - "secret_key": str(os.getenv("MINIO_SECRET_KEY")) - } - - minio_client = Minio( - client_details["hostname"], - access_key=client_details["access_key"], - secret_key=client_details["secret_key"], - secure = security - ) - return minio_client, client_details + if client_details is None: + client_details = { + "hostname": str(os.getenv("MINIO_HOSTNAME")), + "bucket": str(os.getenv("MINIO_BUCKET")), + "access_key": str(os.getenv("MINIO_ACCESS_KEY")), + "secret_key": str(os.getenv("MINIO_SECRET_KEY")), + } + + minio_client = Minio( + client_details["hostname"], + access_key=client_details["access_key"], + secret_key=client_details["secret_key"], + secure=security, + ) + return minio_client, client_details + def fetch_from_minio( - station: str, - antennafield: str, - statistics_type: str, - tstart: None | datetime = None, - tend: None | datetime = None, - client_details: None | dict[str, str] = None, - write_interval_data: timedelta = timedelta(minutes=5, seconds = 5), - write_interval_metadata: timedelta = timedelta(minutes=15), - security: bool = True, - fetch_metadata: bool = True, - dont_save_data: bool = False, - store_filename: bool = False + station: str, + antennafield: str, + statistics_type: str, + tstart: None | datetime = None, + tend: None | datetime = None, + client_details: None | dict[str, str] = None, + write_interval_data: timedelta = timedelta(minutes=5, seconds=5), + write_interval_metadata: timedelta = timedelta(minutes=15), + security: bool = True, + fetch_metadata: bool = True, + dont_save_data: bool = False, + store_filename: bool = False, ) -> tuple[list[CapturedMetadataDict], list[CapturedDatasetDict]]: - station = station.lower() - antennafield = antennafield.lower() - statistics_type = statistics_type.lower() - - if antennafield.upper() not in ["LBA", "HBA0", "HBA1", "HBA"]: - print(f"WARNING: {antennafield=} is not recognised.") - if statistics_type.upper() not in ["BST", "SST", "XST"]: - print(f"WARNING: {statistics_type=} is not recognised.") - - tstart, tend = sanitise_time_limits(tstart, tend) - - minio_client, client_details = get_minio_client(client_details, security) - - prefix = os.path.join(station, statistics_type, antennafield, "") - data: list[CapturedDatasetDict] = [] - - flag_data_keys: None | list[str] = None - res: CapturedDatasetDict - for res in fetch_minio_entries( # type: ignore - minio_client, client_details["bucket"], prefix, tstart, tend, write_interval_data, tqdm_desc="Parsing data", store_filename = store_filename - ): - if dont_save_data: - if flag_data_keys is None: - flag_data_keys = [key for key in res if 'data' in key] - for attr in flag_data_keys: - setattr(res, attr, [[]]) - data.append(res) - if not len(data): - print(f"No data was found for {station}/{statistics_type}/{antennafield} @ {tstart} - {tend}") - - if fetch_metadata: - metadata_prefix = os.path.join(station, "metadata") - metadata: list[CapturedMetadataDict] = [CapturedMetadataDict(**res) for res in fetch_minio_entries( # type: ignore - minio_client, - client_details["bucket"], - metadata_prefix, - tstart, - tend, - write_interval_metadata, - always_write=True, - tqdm_desc="Parsing metadata", - )] - if not len(metadata): - raise KeyError(f"No metadata was found for {station}/{statistics_type}/{antennafield} @ {tstart} - {tend}") - else: - metadata = [] - - - return metadata, data \ No newline at end of file + station = station.lower() + antennafield = antennafield.lower() + statistics_type = statistics_type.lower() + + if antennafield.upper() not in ["LBA", "HBA0", "HBA1", "HBA"]: + print(f"WARNING: {antennafield=} is not recognised.") + if statistics_type.upper() not in ["BST", "SST", "XST"]: + print(f"WARNING: {statistics_type=} is not recognised.") + + tstart, tend = sanitise_time_limits(tstart, tend) + + minio_client, client_details = get_minio_client(client_details, security) + + prefix = os.path.join(station, statistics_type, antennafield, "") + data: list[CapturedDatasetDict] = [] + + flag_data_keys: None | list[str] = None + res: CapturedDatasetDict + for res in fetch_minio_entries( # type: ignore + minio_client, + client_details["bucket"], + prefix, + tstart, + tend, + write_interval_data, + tqdm_desc="Parsing data", + store_filename=store_filename, + ): + if dont_save_data: + if flag_data_keys is None: + flag_data_keys = [key for key in res if "data" in key] + for attr in flag_data_keys: + setattr(res, attr, [[]]) + data.append(res) + if not len(data): + print(f"No data was found for {station}/{statistics_type}/{antennafield} @ {tstart} - {tend}") + + if fetch_metadata: + metadata_prefix = os.path.join(station, "metadata") + metadata: list[CapturedMetadataDict] = [ + CapturedMetadataDict(**res) + for res in fetch_minio_entries( # type: ignore + minio_client, + client_details["bucket"], + metadata_prefix, + tstart, + tend, + write_interval_metadata, + always_write=True, + tqdm_desc="Parsing metadata", + ) + ] + if not len(metadata): + raise KeyError(f"No metadata was found for {station}/{statistics_type}/{antennafield} @ {tstart} - {tend}") + else: + metadata = [] + + return metadata, data diff --git a/l2json/splitters/__init__.py b/l2json/splitters/__init__.py index e73ac131e272dce14f05cf642e06fb264604e8c7..9bea3ad22e3870d3419097bcc17bc13f61871999 100644 --- a/l2json/splitters/__init__.py +++ b/l2json/splitters/__init__.py @@ -1,4 +1,4 @@ # Copyright (C) 2023 ASTRON (Netherlands Institute for Radio Astronomy) # SPDX-License-Identifier: Apache-2.0 -""" l2json """ +"""l2json""" diff --git a/pyproject.toml b/pyproject.toml index 8691dfa19addc746916d4d191cc5597d334150be..f20e22e7e95c6905f21c734cb5d7ef3ca63650b3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -34,6 +34,26 @@ dependencies = [ "tqdm" ] + +[project.optional-dependencies] +docstest = [ + # Lint/Format/Test + "mypy>=1.8.0", # MIT, PSFLv2 + "pre-commit>=4.0.1", # MIT + "pytest>=7.0.0", # MIT + "pytest-cov>=3.0.0", # MIT + "pytest-doctestplus>=1.2.1", # All rights reserved derived + "pytest-ruff>=0.4.1", # MIT + "ruff>=0.9.4", # MIT + "tox>=4.23.0", # MIT + # Docs + "sphinx!=1.6.6,!=1.6.7,>=1.6.5", # BSD + "sphinx-rtd-theme>=0.4.3", # MIT + "sphinxcontrib-apidoc>=0.3.0", # BSD + "myst-parser>=2.0", # MIT + "docutils>=0.17", # BSD +] + [project.scripts] l2json_minio_pull = "l2json:minio_pull.main" l2json_minio_sync = "l2json:minio_sync.main" @@ -49,3 +69,12 @@ version_file = "l2json/_version.py" [tool.pylint] ignore = "_version.py" + +[tool.ruff] +line-length = 130 +respect-gitignore = true + +[tool.ruff.format] +exclude = ["_version.py"] +docstring-code-format = false +docstring-code-line-length = 79 diff --git a/tests/test_cool_module.py b/tests/test_cool_module.py index d3191c48ea3e21622a55cb17265bce8ee4c98804..f674955d7a6d5e67cae6c3aaf61514b675c92456 100644 --- a/tests/test_cool_module.py +++ b/tests/test_cool_module.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 """Testing of the Cool Module""" + from unittest import TestCase from l2json.cool_module import greeter