diff --git a/devices/statistics_writer/README.md b/devices/statistics_writer/README.md
index e2111f3d203158706f96a3eaee6004f3121f00ea..9c3e24a6ed360701778e023a9cc42d46b4b5dc8e 100644
--- a/devices/statistics_writer/README.md
+++ b/devices/statistics_writer/README.md
@@ -44,13 +44,20 @@
 File
 ...
 ```
-###explorer
-There is an hdf5 explorer that will walk through specified hdf5 files.
-Its called `hdf5_explorer.py` and can be called with a `--file` argument
-ex: `python3 hdf5_explorer.py --file data/SST_1970-01-01-00-00-00.h5` This allows for easy manual checking
-of the structure and content of hdf5 files. useful for testing and debugging.
-Can also be used as example of how to read the HDF5 statistics data files.
-Provides a number of example functions inside that go through the file in various ways.
+###reader
+There is a statistics reader that is capable of parsing multiple HDF5 statistics files into
+a more easily usable format. It also allows filtering between certain timestamps.
+`statistics_reader.py` takes the following arguments:
+`--files list of files to parse`
+`--end_time highest timestamp to process, in isoformat`
+`--start_time lowest timestamp to process, in isoformat`
+
+ex: `python3 statistics_reader.py --files SST_2021-10-04-07-36-52.h5 --end_time 2021-10-04#07:50:08.937+00:00`
+This will parse all the statistics in the file `SST_2021-10-04-07-36-52.h5` up to the timestamp `2021-10-04#07:50:08.937+00:00`.
+
+This file can be used as both a testing tool and an example of how to deal with HDF5 statistics.
+The code can serve as a starting point for further development. To help with these purposes, a number
+of simple helper functions are provided.
 
 ###test server
 There is a test server that will continuously send out the same statistics packet.
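The reader can also be used as a library from other Python code. Below is a minimal sketch of such use, built on the `statistics_parser` API added in this change; the import assumes the script is run from `devices/statistics_writer/`, and the file name and timestamps are examples only:

```python
from statistics_reader import statistics_parser

# create the parser and (optionally) restrict the time range before parsing
sts = statistics_parser()
sts.set_start_time("2021-10-04#07:40:00.000+00:00")  # example timestamp
sts.set_end_time("2021-10-04#07:50:08.937+00:00")    # example timestamp

# parse one or more files; sorting helps when files are given out of order
sts.parse_file(["SST_2021-10-04-07-36-52.h5"])
sts.sort_by_timestamp()

print(f"parsed {sts.get_statistics_count()} statistics")
print(list(sts.list_statistics()))  # the timestamps of all parsed statistics

# stack every statistic's values into one numpy array (time on the first axis)
values = sts.collect_values()
print(values.shape)
```

Note that `collect_values()` roughly doubles memory use, since the values make up the bulk of it.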
diff --git a/devices/statistics_writer/SST_2021-10-04-07-36-52.h5 b/devices/statistics_writer/SST_2021-10-04-07-36-52.h5
new file mode 100644
index 0000000000000000000000000000000000000000..26179fc59a2fb032bb35d779676befd4ebe26356
Binary files /dev/null and b/devices/statistics_writer/SST_2021-10-04-07-36-52.h5 differ
diff --git a/devices/statistics_writer/hdf5_writer.py b/devices/statistics_writer/hdf5_writer.py
index 197c3242fe48a8f99d4d1e79eb5412a6b8d90e2a..6715dd870608a0202610ea52c417695844f0d1c9 100644
--- a/devices/statistics_writer/hdf5_writer.py
+++ b/devices/statistics_writer/hdf5_writer.py
@@ -133,7 +133,7 @@ class hdf5_writer:
         """
 
         # create the new hdf5 group based on the timestamp of packets
-        current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]))
+        current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.isoformat(timespec="milliseconds")))
 
         # store the statistics values for the current group
         self.store_function(current_group)
@@ -158,11 +158,11 @@ class hdf5_writer:
 
     def write_sst_matrix(self, current_group):
         # store the SST values
-        current_group.create_dataset(name="sst_values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip")
+        current_group.create_dataset(name="values", data=self.current_matrix.parameters["sst_values"].astype(numpy.float32), compression="gzip")
 
     def write_xst_matrix(self, current_group):
         # requires a function call to transform the xst_blocks in to the right structure
-        current_group.create_dataset(name="xst_values", data=self.current_matrix.xst_values().astype(numpy.cfloat), compression="gzip")
+        current_group.create_dataset(name="values", data=self.current_matrix.xst_values().astype(numpy.cfloat), compression="gzip")
 
     def write_bst_matrix(self, current_group):
         raise NotImplementedError("BST values not implemented")
diff --git a/devices/statistics_writer/statistics_reader.py b/devices/statistics_writer/statistics_reader.py
new file mode 100644
index 0000000000000000000000000000000000000000..c974fff75db6dd127f0506bf45d611ae531b9464
--- /dev/null
+++ b/devices/statistics_writer/statistics_reader.py
@@ -0,0 +1,258 @@
+import h5py
+import numpy
+import datetime
+import argparse
+import os
+import psutil
+import pytz
+import time
+
+process = psutil.Process(os.getpid())
+
+parser = argparse.ArgumentParser(description='Select one or more statistics files to parse')
+parser.add_argument('--files', type=str, nargs="+", help='the name and path of the files, takes one or more files')
+parser.add_argument('--start_time', type=str, help='lowest timestamp to process (uses isoformat, ex: 2021-10-04#07:50:08.937+00:00)')
+parser.add_argument('--end_time', type=str, help='highest timestamp to process (uses isoformat, ex: 2021-10-04#07:50:08.937+00:00)')
+
+
+import logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("statistics_reader")
+logger.setLevel(logging.DEBUG)
+
+
+def timeit(method):
+    """
+    Simple decorator function to log time, function and process memory usage
+    """
+
+    def timed(*args, **kw):
+        s = datetime.datetime.now()
+        result = method(*args, **kw)
+        e = datetime.datetime.now()
+
+        sizeMb = process.memory_info().rss / 1024 / 1024
+        sizeMbStr = "{0:,}".format(round(sizeMb, 2))
+
+        logger.debug('Time taken = %s, %s, size = %s MB' % (e - s, method.__name__, sizeMbStr))
+        return result
+
+    return timed
+
+
+class statistics_parser:
+    """
+    This class goes through the given files and creates a list of all the
+    statistics they contain
+    """
+
+    def __init__(self):
+
+        # list of all statistics
+        self.statistics = []
+
+        # dict of all statistics, allows for easier access.
+        self.statistics_dict = {}
+
+        # for setting the range of times to parse. Initialise with the built-in minimum and maximum values
+        self.start_time = datetime.datetime.min.replace(tzinfo=pytz.UTC)
+        self.end_time = datetime.datetime.max.replace(tzinfo=pytz.UTC)
+
+    def set_start_time(self, start_time):
+        """
+        set the lowest statistics timestamp to store
+        """
+        self.start_time = datetime.datetime.fromisoformat(start_time)
+
+    def set_end_time(self, end_time):
+        """
+        set the highest statistics timestamp to store
+        """
+        self.end_time = datetime.datetime.fromisoformat(end_time)
+
+    @timeit
+    def parse_file(self, files):
+        """
+        This function opens and parses the given statistics HDF5 files and adds their contents to self.statistics.
+        """
+
+        # if it's just a single file the type could be string
+        if type(files) is str:
+            files = [files]
+
+        for file in files:
+            hdf5_file = h5py.File(file, 'r')
+
+            # go through all the groups
+            logger.debug(f"Parsing hdf5 statistics file: {file}")
+
+            for group_key in hdf5_file.keys():
+                try:
+                    # first get the statistic
+                    statistic = statistics_data(hdf5_file, group_key)
+
+                    # extract the timestamp and convert to datetime
+                    statistic_time = statistic.timestamp
+
+                    # check if the timestamp is before the start time
+                    if statistic_time < self.start_time:
+                        continue
+
+                    # check if the timestamp is after the end time
+                    if statistic_time > self.end_time:
+                        # Exit, we're done
+                        logger.debug(f"Parsed {len(self.statistics)} statistics")
+                        return
+
+                    # append to the statistics list
+                    self.statistics.append(statistic)
+                    self.statistics_dict[statistic.timestamp.isoformat(timespec="milliseconds")] = statistic
+
+                except Exception:
+                    logger.warning(f"Encountered an error while parsing statistic. Skipped: {group_key}")
+
+        logger.debug(f"Parsed {len(self.statistics)} statistics")
+
+    @timeit
+    def collect_values(self):
+        """
+        Collects all of the statistics values into a single giant numpy array
+        Uses a lot more memory (basically double, since the values make up the bulk of the memory)
+        """
+
+        lst = []
+        for i in self.statistics:
+            lst.append(i.values)
+
+        value_array = numpy.stack(lst)
+        return value_array
+
+    def sort_by_timestamp(self):
+        """
+        Ensures the statistics are correctly sorted.
+        In case files aren't given in sequential order.
+        """
+        self.statistics.sort(key=lambda r: r.timestamp)
+
+    def get_statistic(self, timestamp):
+        """
+        Returns a statistic object based on the timestamp given.
+        """
+        for i in self.statistics:
+            if i.timestamp == datetime.datetime.fromisoformat(timestamp):
+                return i
+
+        raise ValueError(f"No statistic with timestamp {timestamp} found, make sure to use the isoformat")
+
+    def list_statistics(self):
+        """
+        Returns a list of the timestamps of all statistics
+        """
+        return self.statistics_dict.keys()
+
+    def get_statistics_count(self):
+        """
+        Simply returns the number of statistics
+        """
+        return len(self.statistics)
+
+
+class statistics_data:
+    """
+    This class takes the file and the statistics group name as its __init__ arguments and then
+    stores the header attributes and datasets of that group.
+    """
+
+    # we will be creating potentially tens of thousands of these objects. Using __slots__ makes them faster
+    # and uses less memory, at the cost of having to list all self attributes here.
+ __slots__ = "version_id", "timestamp", "station_id", "source_info_t_adc", "source_info_subband_calibrated_flag", "source_info_payload_error", \ + "source_info_payload_error", "source_info_payload_error", "source_info_nyquist_zone_index", "source_info_gn_index", \ + "source_info_fsub_type", "source_info_beam_repositioning_flag", "source_info_antenna_band_index", "source_info__raw",\ + "observation_id", "nof_statistics_per_packet", "nof_signal_inputs", "nof_bytes_per_statistic", "marker", "integration_interval_raw", \ + "integration_interval", "data_id__raw", "block_serial_number", "block_period_raw", "block_period", "data_id_signal_input_index", \ + "data_id_subband_index", "data_id_first_baseline", "data_id_beamlet_index", "nof_valid_payloads", "nof_payload_errors", "values" + + + def __init__(self, file, group_key): + + # get all the general header info + self.version_id = file[group_key].attrs["version_id"] + self.station_id = file[group_key].attrs["station_id"] + + # convert string timestamp to datetime object + self.timestamp = datetime.datetime.fromisoformat(file[group_key].attrs["timestamp"]) + + self.source_info_t_adc = file[group_key].attrs["source_info_t_adc"] + self.source_info_subband_calibrated_flag = file[group_key].attrs["source_info_subband_calibrated_flag"] + self.source_info_payload_error = file[group_key].attrs["source_info_payload_error"] + self.source_info_nyquist_zone_index = file[group_key].attrs["source_info_payload_error"] + self.source_info_gn_index = file[group_key].attrs["source_info_gn_index"] + self.source_info_fsub_type = file[group_key].attrs["source_info_fsub_type"] + self.source_info_beam_repositioning_flag = file[group_key].attrs["source_info_beam_repositioning_flag"] + self.source_info_antenna_band_index = file[group_key].attrs["source_info_antenna_band_index"] + self.source_info__raw = file[group_key].attrs["source_info__raw"] + + self.observation_id = file[group_key].attrs["observation_id"] + self.nof_statistics_per_packet = file[group_key].attrs["nof_statistics_per_packet"] + self.nof_signal_inputs = file[group_key].attrs["nof_signal_inputs"] + self.nof_bytes_per_statistic = file[group_key].attrs["nof_bytes_per_statistic"] + self.marker = file[group_key].attrs["marker"] + self.integration_interval_raw = file[group_key].attrs["integration_interval_raw"] + self.integration_interval = file[group_key].attrs["integration_interval"] + self.data_id__raw = file[group_key].attrs["data_id__raw"] + + self.block_serial_number = file[group_key].attrs["block_serial_number"] + self.block_period_raw = file[group_key].attrs["block_period_raw"] + self.block_period = file[group_key].attrs["block_period"] + + # get SST specific stuff + if self.marker == "S": + self.data_id_signal_input_index = file[group_key].attrs["data_id_signal_input_index"] + + # get XST specific stuff + if self.marker == "X": + self.data_id_subband_index = file[group_key].attrs["data_id_subband_index"] + self.data_id_first_baseline = file[group_key].attrs["data_id_first_baseline"] + + # get BST specific stuff + if self.marker == "B": + self.data_id_beamlet_index = file[group_key].attrs["data_id_beamlet_index"] + + # get the datasets + self.nof_valid_payloads = numpy.array(file.get(f"{group_key}/nof_valid_payloads")) + self.nof_payload_errors = numpy.array(file.get(f"{group_key}/nof_payload_errors")) + self.values = numpy.array(file.get(f"{group_key}/values")) + + +# create a data dumper that creates a new file every 10s (for testing) +if __name__ == "__main__": + args = parser.parse_args() + files 
= args.files + end_time = args.end_time + start_time = args.start_time + + # create the parser + parser = statistics_parser() + + # set the correct time ranges + if end_time is not None: + parser.set_end_time(end_time) + if start_time is not None: + parser.set_start_time(start_time) + + # parse all the files + parser.parse_file(files) + + # for good measure sort all the statistics by timestamp. Useful when multiple files are given out of order + parser.sort_by_timestamp() + + # get a single numpy array of all the statistics stored. + parser.collect_values() + + + + + + + + + diff --git a/devices/statistics_writer/test/SST_10m_test_1.h5 b/devices/statistics_writer/test/SST_10m_test_1.h5 new file mode 100644 index 0000000000000000000000000000000000000000..2d04a526e1ef73d7bd636e3b564192d95e49cef5 Binary files /dev/null and b/devices/statistics_writer/test/SST_10m_test_1.h5 differ diff --git a/devices/statistics_writer/test/SST_10m_test_2.h5 b/devices/statistics_writer/test/SST_10m_test_2.h5 new file mode 100644 index 0000000000000000000000000000000000000000..45fd32d831508f8d632c6f1778d4d9bb73059294 Binary files /dev/null and b/devices/statistics_writer/test/SST_10m_test_2.h5 differ diff --git a/devices/statistics_writer/test/SST_10m_test_3.h5 b/devices/statistics_writer/test/SST_10m_test_3.h5 new file mode 100644 index 0000000000000000000000000000000000000000..5c971e8e2cea131d6c9ba8b7e6b1d645f205f276 Binary files /dev/null and b/devices/statistics_writer/test/SST_10m_test_3.h5 differ diff --git a/devices/statistics_writer/test/hdf5_explorer.py b/devices/statistics_writer/test/hdf5_explorer.py deleted file mode 100644 index 102c36b79f7beeb6a34ffba9b95a495a85a76f6e..0000000000000000000000000000000000000000 --- a/devices/statistics_writer/test/hdf5_explorer.py +++ /dev/null @@ -1,95 +0,0 @@ -import h5py -import numpy - -import argparse - -parser = argparse.ArgumentParser(description='Select a file to explore') -parser.add_argument('--file', type=str, help='the name and path of the file') - -import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("hdf5_explorer") -logger.setLevel(logging.DEBUG) - - -class statistics_data: - """ - Example class not used by anything - This class takes the file and the statistics name as its __init__ arguments and then stores the - the datasets in them. - """ - -class explorer: - """ - This class serves both as a tool to test and verify the content of HDF5 files as well as provide an example - of how you can go through HDF5 files. - """ - - - def __init__(self, filename): - self.file = h5py.File(filename, 'r') - - def print_all_statistics_full(self): - """ - Explores the file with knowledge of the file structure. assumes all top level groups are statistics - and that all statistics groups are made up of datasets. - Prints the groups, the datasets and the content of the datasets. - - Can easily be modified to instead of just logging all the data, store it in whatever structure is needed. 
- """ - - for group_key in self.file.keys(): - dataset = list(self.file[group_key]) - - #print group name - logger.debug(f" \n\ngroup: {group_key}") - - # Go through all the datasets - for i in dataset: - data = self.file.get(f"{group_key}/{i}") - logger.debug(f" dataset: {i}") - logger.debug(f" Data: {numpy.array(data)}") - - # go through all the attributes in the group (This is the header info) - attr_keys = self.file[group_key].attrs.keys() - for i in attr_keys: - attr = self.file[group_key].attrs[i] - - logger.debug(f" {i}: {attr}") - - def print_all_statistics_top_level(self): - """ - Explores the file with knowledge of the file structure. assumes all top level groups are statistics - and that all statistics groups are made up of datasets. - This function prints only the top level groups, AKA all the statistics collected. Useful when dealing with - potentially hundreds of statistics. - """ - # List all groups - logger.debug("Listing all statistics stored in this file:") - - for group_key in self.file.keys(): - logger.debug(group_key) - - -# create a data dumper that creates a new file every 10s (for testing) -if __name__ == "__main__": - args = parser.parse_args() - Explorer = explorer(args.file) - - """ - Print the entire files content - """ - Explorer.print_all_statistics_full() - - """ - Print only the names of all the statistics in this file - """ - logger.debug("--------------Top level groups--------------") - Explorer.print_all_statistics_top_level() - - - - - - -