Skip to content
Snippets Groups Projects
Commit d5b8c7ec authored by David McKenna's avatar David McKenna
Browse files

STASH: obsid <-> times scripts

parent 95534001
No related branches found
No related tags found
1 merge request!36Metadata matching
Pipeline #119885 passed
Pipeline: l2json

#119886

    import argparse
    import datetime
    import dateutil.parser
    import l2json.sources.args as l2args
    from l2json.sources.minio import fetch_minio_entries, get_minio_client
    from l2json.data_types.typed_json_dicts import CapturedMetadataDict
    # export MINIO_HOSTNAME=filefish.lofar.net:9000; export MINIO_BUCKET=central-statistics; export MINIO_ACCESS_KEY=***REMOVED***; export MINIO_SECRET_KEY=***REMOVED***
    def main() -> None:
    parser = argparse.ArgumentParser()
    parser = l2args.add_arguments_to_parser(parser, prefix="", discard_flags=["output_pattern", "split_by_obs_id", "types"])
    args = parser.parse_args()
    args = l2args.parse_input_args(args, check_env_args=True) # type: ignore
    target_obs_ids: set[int] = set(args.observation_ids)
    client, client_details = get_minio_client(None, args.security)
    for station in args.stations:
    metadata: list[CapturedMetadataDict] = fetch_minio_entries( # type: ignore
    client=client,
    bucket=client_details["bucket"],
    prefix=f"{station}/metadata/",
    tstart=args.tstart,
    tend=args.tend,
    dt=datetime.timedelta(seconds=15 * 60),
    )
    metadata.sort(key=lambda entry: dateutil.parser.parse(entry["timestamp"]), reversed=True) # type: ignore
    end_times: dict[int, str] = {}
    start_times: dict[int, str] = {}
    tracking_obs_ids: set[int] = set()
    last_entry_obs_ids: set[int] = set()
    previous_time: str = ""
    for entry in metadata:
    last_entry_obs_ids = tracking_obs_ids.copy()
    for obs_id in entry["stat/observationcontrol/1"]["running_observations_r"]:
    if obs_id in target_obs_ids and obs_id not in end_times:
    end_times[obs_id] = entry["timestamp"]
    tracking_obs_ids.add(obs_id)
    else:
    last_entry_obs_ids.remove(obs_id)
    for obs_id in last_entry_obs_ids:
    start_times[obs_id] = previous_time
    tracking_obs_ids.remove(obs_id)
    previous_time = entry["timestamp"]
    for obs_id in last_entry_obs_ids:
    start_times[obs_id] = f"{previous_time}"
    end_times[obs_id] = f"{end_times[obs_id]} (OBSERVATION STARTED BEFORE MEATDATA WINDOW)"
    if start_times:
    print(f"\n\n{station}: We saw the following observation IDs:")
    for key in start_times:
    print(f"\t - {key:12d}:\t{start_times[key]} - {end_times[key]}")
    print("\n\n")
    else:
    print(f"{station}: Observation IDs {target_obs_ids} were not found.")
    if __name__ == "__main__":
    main()
    import argparse
    import datetime
    import dateutil.parser
    import l2json.sources.args as l2args
    from l2json.sources.minio import fetch_minio_entries, get_minio_client
    from l2json.data_types.typed_json_dicts import CapturedMetadataDict
    # export MINIO_HOSTNAME=filefish.lofar.net:9000; export MINIO_BUCKET=central-statistics; export MINIO_ACCESS_KEY=***REMOVED***; export MINIO_SECRET_KEY=***REMOVED***
    def main() -> None:
    parser = argparse.ArgumentParser()
    parser = l2args.add_arguments_to_parser(parser, prefix="", discard_flags=["output_pattern", "split_by_obs_id", "types"])
    args = parser.parse_args()
    args = l2args.parse_input_args(args, check_env_args=True) # type: ignore
    client, client_details = get_minio_client(None, args.security)
    for station in args.stations:
    metadata: list[CapturedMetadataDict] = fetch_minio_entries( # type: ignore
    client=client,
    bucket=client_details["bucket"],
    prefix=f"{station}/metadata/",
    tstart=args.tstart,
    tend=args.tend,
    dt=datetime.timedelta(seconds=15 * 60),
    )
    metadata.sort(key=lambda entry: dateutil.parser.parse(entry["timestamp"]), reversed=True) # type: ignore
    end_times: dict[int, str] = {}
    start_times: dict[int, str] = {}
    tracking_obs_ids: set[int] = set()
    last_entry_obs_ids: set[int] = set()
    previous_time: str = ""
    for entry in metadata:
    last_entry_obs_ids = tracking_obs_ids.copy()
    for obs_id in entry["stat/observationcontrol/1"]["running_observations_r"]:
    if obs_id not in end_times:
    end_times[obs_id] = entry["timestamp"]
    tracking_obs_ids.add(obs_id)
    else:
    last_entry_obs_ids.remove(obs_id)
    for obs_id in last_entry_obs_ids:
    start_times[obs_id] = previous_time
    tracking_obs_ids.remove(obs_id)
    previous_time = entry["timestamp"]
    for obs_id in last_entry_obs_ids:
    start_times[obs_id] = f"{previous_time}"
    end_times[obs_id] = f"{end_times[obs_id]} (OBSERVATION STARTED BEFORE MEATDATA WINDOW)"
    if start_times:
    print(f"\n\n{station}: We saw the following observation IDs:")
    for key in start_times:
    print(f"\t - {key:12d}:\t{start_times[key]} - {end_times[key]}")
    print("\n\n")
    else:
    print(f"{station}: No observation IDs were found in the processed window.")
    if __name__ == "__main__":
    main()
    ...@@ -58,6 +58,8 @@ docstest = [ ...@@ -58,6 +58,8 @@ docstest = [
    [project.scripts] [project.scripts]
    l2json_minio_pull = "l2json:minio_pull.main" l2json_minio_pull = "l2json:minio_pull.main"
    l2json_minio_sync = "l2json:minio_sync.main" l2json_minio_sync = "l2json:minio_sync.main"
    l2json_time2obsid = "l2json:minio_time2obsid.main"
    l2json_obsid2time = "l2json:minio_obsid2time.main"
    [tool.setuptools] [tool.setuptools]
    packages = ["l2json"] packages = ["l2json"]
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Please register or to comment