diff --git a/l2json/minio_obsid2time.py b/l2json/minio_obsid2time.py
new file mode 100644
index 0000000000000000000000000000000000000000..cacc6775b4189107b063a5880c30cebec60101f2
--- /dev/null
+++ b/l2json/minio_obsid2time.py
@@ -0,0 +1,78 @@
+"""Report when the requested observation IDs were running on each station."""
+
+import argparse
+
+import datetime
+import dateutil.parser
+
+import l2json.sources.args as l2args
+from l2json.sources.minio import fetch_minio_entries, get_minio_client
+from l2json.data_types.typed_json_dicts import CapturedMetadataDict
+
+
+# export MINIO_HOSTNAME=filefish.lofar.net:9000; export MINIO_BUCKET=central-statistics; export MINIO_ACCESS_KEY=***REMOVED***; export MINIO_SECRET_KEY=***REMOVED***
+def main() -> None:
+    """Print the first and last metadata timestamp of each requested observation.
+
+    For every station, the metadata entries in the requested time window are
+    walked from newest to oldest. The first entry that lists a requested
+    observation ID gives its end time; the oldest entry that still lists it
+    gives its start time. Observation IDs that were not requested are ignored.
+    """
+    parser = argparse.ArgumentParser()
+    parser = l2args.add_arguments_to_parser(parser, prefix="", discard_flags=["output_pattern", "split_by_obs_id", "types"])
+
+    args = parser.parse_args()
+    args = l2args.parse_input_args(args, check_env_args=True)  # type: ignore
+
+    target_obs_ids: set[int] = set(args.observation_ids)
+
+    client, client_details = get_minio_client(None, args.security)
+    for station in args.stations:
+        metadata: list[CapturedMetadataDict] = fetch_minio_entries(  # type: ignore
+            client=client,
+            bucket=client_details["bucket"],
+            prefix=f"{station}/metadata/",
+            tstart=args.tstart,
+            tend=args.tend,
+            dt=datetime.timedelta(seconds=15 * 60),
+        )
+        # Newest first, so the first sighting of an ID is its last appearance in time.
+        metadata.sort(key=lambda entry: dateutil.parser.parse(entry["timestamp"]), reverse=True)  # type: ignore
+        end_times: dict[int, str] = {}
+        start_times: dict[int, str] = {}
+        tracking_obs_ids: set[int] = set()
+        previous_time: str = ""
+        for entry in metadata:
+            # Start from every tracked ID and strike the ones this (older) entry still lists.
+            dropped_obs_ids = tracking_obs_ids.copy()
+            for obs_id in entry["stat/observationcontrol/1"]["running_observations_r"]:
+                if obs_id in target_obs_ids and obs_id not in end_times:
+                    end_times[obs_id] = entry["timestamp"]
+                    tracking_obs_ids.add(obs_id)
+                else:
+                    # discard, not remove: IDs that are not being tracked also reach this branch.
+                    dropped_obs_ids.discard(obs_id)
+
+            for obs_id in dropped_obs_ids:
+                start_times[obs_id] = previous_time
+                tracking_obs_ids.remove(obs_id)
+
+            previous_time = entry["timestamp"]
+
+        # Still running in the oldest entry: the real start lies before the window.
+        for obs_id in tracking_obs_ids:
+            start_times[obs_id] = previous_time
+            end_times[obs_id] = f"{end_times[obs_id]} (OBSERVATION STARTED BEFORE METADATA WINDOW)"
+
+        if start_times:
+            print(f"\n\n{station}: We saw the following observation IDs:")
+            for key in start_times:
+                print(f"\t - {key:12d}:\t{start_times[key]} - {end_times[key]}")
+            print("\n\n")
+        else:
+            print(f"{station}: Observation IDs {target_obs_ids} were not found.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/l2json/minio_time2obsid.py b/l2json/minio_time2obsid.py
new file mode 100644
index 0000000000000000000000000000000000000000..73787704c3931611b98c9b796e464d4ff9467a8e
--- /dev/null
+++ b/l2json/minio_time2obsid.py
@@ -0,0 +1,76 @@
+"""Report which observation IDs were running on each station in a time window."""
+
+import argparse
+
+import datetime
+import dateutil.parser
+
+import l2json.sources.args as l2args
+from l2json.sources.minio import fetch_minio_entries, get_minio_client
+from l2json.data_types.typed_json_dicts import CapturedMetadataDict
+
+
+# export MINIO_HOSTNAME=filefish.lofar.net:9000; export MINIO_BUCKET=central-statistics; export MINIO_ACCESS_KEY=***REMOVED***; export MINIO_SECRET_KEY=***REMOVED***
+def main() -> None:
+    """Print every observation ID seen in the window with its first and last timestamp.
+
+    For every station, the metadata entries in the requested time window are
+    walked from newest to oldest. The first entry that lists an observation
+    ID gives its end time; the oldest entry that still lists it gives its
+    start time.
+    """
+    parser = argparse.ArgumentParser()
+    parser = l2args.add_arguments_to_parser(parser, prefix="", discard_flags=["output_pattern", "split_by_obs_id", "types"])
+
+    args = parser.parse_args()
+    args = l2args.parse_input_args(args, check_env_args=True)  # type: ignore
+
+    client, client_details = get_minio_client(None, args.security)
+    for station in args.stations:
+        metadata: list[CapturedMetadataDict] = fetch_minio_entries(  # type: ignore
+            client=client,
+            bucket=client_details["bucket"],
+            prefix=f"{station}/metadata/",
+            tstart=args.tstart,
+            tend=args.tend,
+            dt=datetime.timedelta(seconds=15 * 60),
+        )
+        # Newest first, so the first sighting of an ID is its last appearance in time.
+        metadata.sort(key=lambda entry: dateutil.parser.parse(entry["timestamp"]), reverse=True)  # type: ignore
+        end_times: dict[int, str] = {}
+        start_times: dict[int, str] = {}
+        tracking_obs_ids: set[int] = set()
+        previous_time: str = ""
+        for entry in metadata:
+            # Start from every tracked ID and strike the ones this (older) entry still lists.
+            dropped_obs_ids = tracking_obs_ids.copy()
+            for obs_id in entry["stat/observationcontrol/1"]["running_observations_r"]:
+                if obs_id not in end_times:
+                    end_times[obs_id] = entry["timestamp"]
+                    tracking_obs_ids.add(obs_id)
+                else:
+                    # discard, not remove: an ID that dropped out earlier may be listed again here.
+                    dropped_obs_ids.discard(obs_id)
+
+            for obs_id in dropped_obs_ids:
+                start_times[obs_id] = previous_time
+                tracking_obs_ids.remove(obs_id)
+
+            previous_time = entry["timestamp"]
+
+        # Still running in the oldest entry: the real start lies before the window.
+        for obs_id in tracking_obs_ids:
+            start_times[obs_id] = previous_time
+            end_times[obs_id] = f"{end_times[obs_id]} (OBSERVATION STARTED BEFORE METADATA WINDOW)"
+
+        if start_times:
+            print(f"\n\n{station}: We saw the following observation IDs:")
+            for key in start_times:
+                print(f"\t - {key:12d}:\t{start_times[key]} - {end_times[key]}")
+            print("\n\n")
+        else:
+            print(f"{station}: No observation IDs were found in the processed window.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/pyproject.toml b/pyproject.toml
index e99a9199868a514b88e99cb6973f850f06854876..79b4b285e56327ccb5f5514c7645dc198ad1579c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -58,6 +58,8 @@ docstest = [
 [project.scripts]
 l2json_minio_pull = "l2json:minio_pull.main"
 l2json_minio_sync = "l2json:minio_sync.main"
+l2json_time2obsid = "l2json:minio_time2obsid.main"
+l2json_obsid2time = "l2json:minio_obsid2time.main"
 
 [tool.setuptools]
 packages = ["l2json"]