diff --git a/devices/statistics_writer/data/test.h5 b/devices/statistics_writer/data/test.h5
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/devices/statistics_writer/data/testing.h5 b/devices/statistics_writer/data/testing.h5
deleted file mode 100644
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..0000000000000000000000000000000000000000
diff --git a/devices/statistics_writer/hdf5_writer.py b/devices/statistics_writer/hdf5_writer.py
new file mode 100644
index 0000000000000000000000000000000000000000..456d36c0824220b597f7897be62d6fa545debc2c
--- /dev/null
+++ b/devices/statistics_writer/hdf5_writer.py
@@ -0,0 +1,195 @@
+# imports for working with datetime objects
+from datetime import datetime, timedelta
+import pytz
+
+# python hdf5
+import h5py
+
+import numpy
+import json
+import logging
+
+# import statistics classes with workaround
+import sys
+sys.path.append("..")
+from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
+import devices.sdp.statistics_collector as statistics_collector
+
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger("hdf5_writer")
+
+__all__ = ["hdf5_writer"]
+
+class hdf5_writer:
+
+
+    def __init__(self, new_file_time_interval, file_location, statistics_mode):
+
+        # all variables that deal with the statistics matrix that's currently being decoded
+        self.current_matrix = None
+        self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
+
+        # the header of the first packet of a new matrix is written as metadata.
+        # Assumes all subsequent headers of the same matrix are identical (minus index)
+        self.statistics_header = None
+
+        # file handling
+        self.file_location = file_location
+        # the interval is given in seconds (see the --interval option of statistics_writer.py)
+        self.new_file_time_interval = timedelta(seconds=new_file_time_interval)
+        self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
+        self.file = None
+
+        # configure the writer for the correct statistics type
+        self.collector = None
+        self.decoder = None
+        self.mode = statistics_mode.upper()
+        self.config_mode()
+
+    def next_packet(self, packet):
+        """
+        All statistics packets carry the timestamp at which they were measured, and the values of a
+        single matrix are spread across multiple packets. As long as the timestamp is the same, the
+        packets belong to the same matrix. This code collects a matrix from those packets, and
+        stores finished matrices before starting new ones.
+
+        Whenever a packet arrives whose timestamp is later than the one currently being processed,
+        the current matrix is closed and stored, and a new one is started.
+        """
+
+        # decode the packet
+        statistics_packet = self.decoder(packet)
+
+        # grab the timestamp
+        statistics_timestamp = statistics_packet.timestamp()
+
+        # check if the statistics timestamp is unexpectedly older than the current one
+        if statistics_timestamp < self.current_timestamp:
+            logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.")
+            return
+
+        # if this statistics packet has a new timestamp, we need to start a new matrix
+        if statistics_timestamp > self.current_timestamp:
+            self.start_new_matrix(statistics_timestamp)
+            self.current_timestamp = statistics_timestamp
+
+        # the first packet of a matrix supplies its header; start_new_matrix() clears it,
+        # so this must happen after the new matrix has been started
+        if not self.statistics_header:
+            self.statistics_header = statistics_packet.header()
+
+        self.process_packet(packet)
+
+    def start_new_matrix(self, timestamp):
+        """
+        Called when a statistics packet with a newer timestamp is received.
+        Writes the finished matrix to the hdf5 file, creates a new hdf5 file
+        if needed, and resets the statistics matrix collector.
+        """
+        logger.info(f"starting new matrix with timestamp: {timestamp}")
+
+        # write the finished matrix (current_matrix is None only before the very first packet)
+        if self.current_matrix is not None:
+            try:
+                self.write_matrix()
+            except Exception:
+                time = self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]
+                logger.exception(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped")
+
+        # only start a new file if it's time AND we are done with the previous matrix.
+        if timestamp >= self.new_file_time_interval + self.last_file_time:
+            self.start_new_hdf5(timestamp)
+
+        # create a new and empty current_matrix
+        self.current_matrix = self.collector()
+        self.statistics_header = None
+
+    def write_matrix(self):
+        """
+        Writes the finished matrix to the hdf5 file.
+        """
+        logger.info("writing matrix to file")
+
+        # create the new hdf5 group based on the timestamp of the packets
+        current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]))
+
+        # store the statistics values
+        # NOTE: only the SST collector is implemented so far, so the values are always "sst_values"
+        current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])
+
+        # might be optional, but they're easy to add.
+        current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
+        current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])
+
+        # get the statistics header
+        header = self.statistics_header
+
+        # attributes can't store datetime objects, convert to a string instead
+        header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
+
+        # store the header of the first packet received for this matrix as a list of attributes
+        for k, v in header.items():
+            if isinstance(v, dict):
+                for subk, subv in v.items():
+                    current_group.attrs[f"{k}_{subk}"] = subv
+            else:
+                current_group.attrs[k] = v
+
+
+    def process_packet(self, packet):
+        """
+        Adds the newly received statistics packet to the statistics matrix.
+        """
+        logger.debug("processing packet")
+        self.current_matrix.process_packet(packet)
+
+    def start_new_hdf5(self, timestamp):
+
+        if self.file is not None:
+            try:
+                self.file.close()
+            except Exception:
+                logger.exception(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity.")
+
+        current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
+        logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")
+
+        try:
+            self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
+        except Exception:
+            logger.exception("Error while creating new file")
+            raise
+
+        self.last_file_time = timestamp
+
+    def config_mode(self):
+        """
+        Configures the object for the correct statistics type to be used.
+        """
+        logger.debug(f"attempting to configure {self.mode} mode")
+
+        if self.mode == 'SST':
+            self.decoder = SSTPacket
+            self.collector = statistics_collector.SSTCollector
+        elif self.mode == 'BST':
+            # self.decoder = BSTPacket
+            raise NotImplementedError("BST collector has not yet been implemented")
+        elif self.mode == 'XST':
+            # self.decoder = XSTPacket
+            raise NotImplementedError("XST collector has not yet been implemented")
+        else:
+            # make sure the mode is valid
+            raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST'".format(self.mode))
+
+    def close_writer(self):
+        """
+        Stops the writer without data loss.
+        """
+        logger.debug("closing hdf5 file")
+        if self.file is not None:
+            try:
+                # write the last matrix, if there is one
+                if self.current_matrix is not None:
+                    self.write_matrix()
+            finally:
+                # always close the file, also if writing the last matrix failed
+                self.file.close()
+                logger.debug(f"{self.file} closed")
diff --git a/devices/statistics_writer/tcp_receiver.py b/devices/statistics_writer/receiver.py
similarity index 70%
rename from devices/statistics_writer/tcp_receiver.py
rename to devices/statistics_writer/receiver.py
index 4112f926785d5fbb5b25672113c55445f8a8952b..919357764a2196cb7955e4ec77f2487b81d24d59 100644
--- a/devices/statistics_writer/tcp_receiver.py
+++ b/devices/statistics_writer/receiver.py
@@ -3,16 +3,15 @@ import socket
 import sys
 sys.path.append("..")
 from devices.sdp.statistics_packet import StatisticsPacket
+import os
+
+class receiver:
+    """ Reads data from a file descriptor. """
 
-class tcp_receiver:
     HEADER_LENGTH = 32
 
-    def __init__(self, HOST, PORT):
-        self.host = HOST
-        self.port = PORT
-
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.connect((self.host, self.port))
+    def __init__(self, fd):
+        self.fd = fd
 
     def get_packet(self) -> bytes:
-        """ Read exactly one statistics packet from the TCP connection. """
+        """ Read exactly one statistics packet from the stream. """
@@ -36,11 +35,31 @@ class tcp_receiver:
 
             # try to read the remainder.
-            # NOTE: recv() may return less data than requested, and returns 0
-            # if there is nothing left to read (end of stream)
-            more_data = self.sock.recv(data_length - len(data))
+            # NOTE: os.read() may return less data than requested, and returns 0
+            # if there is nothing left to read (end of stream)
+            more_data = os.read(self.fd, data_length - len(data))
 
             if not more_data:
-                # connection got dropped
-                raise IOError("Connection closed by peer")
+                # end of stream, or connection got dropped
+                raise EOFError("End of stream")
 
             data += more_data
 
         return data
+
+class tcp_receiver(receiver):
+    def __init__(self, HOST, PORT):
+        self.host = HOST
+        self.port = PORT
+
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.connect((self.host, self.port))
+
+        super().__init__(fd=self.sock.fileno())
+
+class file_receiver(receiver):
+    def __init__(self, filename):
+        self.filename = filename
+        self.fileno = os.open(filename, os.O_RDONLY)
+
+        super().__init__(fd=self.fileno)
+
+    def __del__(self):
+        os.close(self.fileno)
diff --git a/devices/statistics_writer/statistics_writer.py b/devices/statistics_writer/statistics_writer.py
index a013cb779b758b8e5256aba23e1ac0ff9a24e9d7..5e06a49043a42e5ca783f6ed895c2012546728b1 100644
--- a/devices/statistics_writer/statistics_writer.py
+++ b/devices/statistics_writer/statistics_writer.py
@@ -1,187 +1,66 @@
-# imports for working with datetime objects
-from datetime import datetime, timedelta
-import pytz
+import argparse
+from receiver import tcp_receiver, file_receiver
+from hdf5_writer import hdf5_writer
 
-# python hdf5
-import h5py
-
-import numpy
-import json
-import logging
-
-# import statistics classes with workaround
 import sys
-sys.path.append("..")
-from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
-import devices.sdp.statistics_collector as statistics_collector
-
+import signal
+import logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger("statistics_writer")
 
-__all__ = ["statistics_writer"]
-
-class statistics_writer:
-
-
-    def __init__(self, new_file_time_interval, file_location, statistics_mode):
-
-        # all variables that deal with the SST matrix that's currently being decoded
-        self.current_matrix = None
-        self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
-
-        # the header of the first packet of a new matrix is written as metadata.
-        # Assumes all subsequent headers of the same matrix are identical (minus index)
-        self.statistics_header = None
-
-        # file handing
-        self.file_location = file_location
-        self.new_file_time_interval = timedelta(hours=new_file_time_interval)
-        self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
-        self.file = None
-
-        # config the writer for the correct statistics type
-        self.collector = None
-        self.decoder = None
-        self.mode = statistics_mode.upper()
-        self.config_mode()
-
-    def next_packet(self, packet):
-        """
-        All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets.
-        As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple
-        packets as well as storing matrices and starting new ones
-
-        The code receives new packets and checks the statistics timestamp of them. If the timestamp is higher than the current timestamp
-        it will close the current matrix, store it and start a new one.
-        """
-
-        # process the packet
-        statistics_packet = self.decoder(packet)
-
-        # grab the timestamp
-        statistics_timestamp = statistics_packet.timestamp()
-
-        # check if te statistics timestamp is unexpectedly older than the current one
-        if statistics_timestamp < self.current_timestamp:
-            logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.")
-            return
-
-        # if this statistics packet has a new timestamp it means we need to start a new matrix
-        if statistics_timestamp > self.current_timestamp:
-            self.statistics_header = statistics_packet.header()
-            self.start_new_matrix(statistics_timestamp)
-            self.current_timestamp = statistics_timestamp
-
-        self.process_packet(packet)
-
-    def start_new_matrix(self, timestamp):
-        logger.debug(f"starting new matrix with timestamp: {timestamp}")
-        """
-        is called when a statistics packet with a newer timestamp is received.
-        Writes the matrix to the hdf5 file
-        Creates a new hdf5 file if needed
-        updates current timestamp and statistics matrix collector
-        """
-
-        # write the finished (and checks if its the first matrix)
-        if self.current_matrix is not None:
-            try:
-                self.write_matrix()
-            except Exception as e:
-                time = str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
-                logger.error(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped: {e}")
-
-        # only start a new file if its time AND we are done with the previous matrix.
-        if timestamp >= self.new_file_time_interval + self.last_file_time:
-            self.start_new_hdf5(timestamp)
-
-        # create a new and empty current_matrix
-        self.current_matrix = self.collector()
-
-    def write_matrix(self):
-        logger.debug("writing matrix to file")
-        """
-        Writes the finished matrix to the hdf5 file
-        """
-
-        # create the new hdf5 group based on the timestamp of packets
-        current_group = self.file.create_group("{}_{}".format(self.mode, str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))))
-
-        # store the statistics values
-        current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])
-
-        # might be optional, but they're easy to add.
-        current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
-        current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])
-
-        # get the statistics header
-        header = self.statistics_header
-        # can't store datetime objects in json, converted to string instead
-        header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
-        # convert the header to JSON
-        json_header = json.dumps(header)
-        # Stores the header of the packet received for this matrix
-        current_group.create_dataset(name='first_packet_header', data=numpy.str(json_header))
-
-
-    def process_packet(self, packet):
-        logger.debug(f"Processing packet")
-        """
-        Adds the newly received statistics packet to the statistics matrix
-        """
-        self.current_matrix.process_packet(packet)
-
-    def start_new_hdf5(self, timestamp):
-
-        if self.file is not None:
-            try:
-                self.file.close()
-            except Exception as e:
-                logger.error(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity. \r\n Exception: {e}")
-
-        current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
-        logger.debug(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")
-
-        try:
-            self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
-        except Exception as e:
-            logger.error(f"Error while creating new file: {e}")
-            raise e
-
-        self.last_file_time = timestamp
-
-    def config_mode(self):
-        logger.debug(f"attempting to configure {self.mode} mode")
-
-        """
-        Configures the object for the correct statistics type to be used.
-        """
+parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.')
+parser.add_argument('--host', type=str, help='the host to connect to')
+parser.add_argument('--port', type=int, default=5101, help='the port to connect to (default: %(default)s)')
+parser.add_argument('--file', type=str, help='the file to read from')
+
+parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='the type of statistics to decode (default: %(default)s)')
+parser.add_argument('--interval', type=float, default=3600, nargs="?", help='the time between creating new files, in seconds (default: %(default)s)')
+parser.add_argument('--output_dir', type=str, default=".", nargs="?", help='the folder to write all the files to (default: %(default)s)')
+parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='increase log output')
+
+
+# parse the command line and pump packets from the receiver into the writer
+if __name__ == "__main__":
+    args = parser.parse_args()
+
+    # argparse arguments
+    host = args.host
+    port = args.port
+    filename = args.file
+    output_dir = args.output_dir
+    interval = args.interval
+    mode = args.mode
+    debug = args.debug
+
+    if debug:
+        logger.setLevel(logging.DEBUG)
+        logger.debug("Setting loglevel to DEBUG")
+
+    # create the receiver that supplies packets to the writer: a file if one is given, a TCP connection otherwise
+    if filename:
+        receiver = file_receiver(filename)
+    elif host and port:
+        receiver = tcp_receiver(host, port)
+    else:
+        logger.fatal("Must provide either a host and port, or a file to receive input from")
+        sys.exit(1)
+
+    # create the writer
+    writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode)
+
+    # start looping
+    try:
+        while True:
+            packet = receiver.get_packet()
+            writer.next_packet(packet)
+    except KeyboardInterrupt:
+        # user abort, don't complain
+        pass
+    except EOFError:
+        # done processing all input, don't complain
+        pass
+    finally:
+        writer.close_writer()
 
-        if self.mode == 'SST':
-            self.decoder = SSTPacket
-            self.collector = statistics_collector.SSTCollector
-        elif self.mode == 'BST':
-            # self.decoder = XSTPacket
-            raise NotImplementedError("BST collector has not yet been implemented")
-        elif self.mode == 'XST':
-            # self.decoder = XSTPacket
-            raise NotImplementedError("BST collector has not yet been implemented")
-        else:
-            # make sure the mode is valid
-            raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode))
-
-    def close_writer(self):
-        """
-        Function that can be used to stop the writer without data loss.
- """ - logger.debug("closing hdf5 file") - if self.file is not None: - if self.current_matrix is not None: - # Write matrix if one exists - # only creates file if there is a matrix to actually write - try: - self.write_matrix() - finally: - self.file.close() - logger.debug(f"{self.file} closed") diff --git a/devices/statistics_writer/tcp_hdf5_writer.py b/devices/statistics_writer/tcp_hdf5_writer.py deleted file mode 100644 index 8d290e293e2db82fa3e12e071e7dda4d6077ea40..0000000000000000000000000000000000000000 --- a/devices/statistics_writer/tcp_hdf5_writer.py +++ /dev/null @@ -1,59 +0,0 @@ -import argparse -from tcp_receiver import tcp_receiver -from statistics_writer import statistics_writer - -import sys -import signal - -import logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger("statistics_writer") - -parser = argparse.ArgumentParser(description='Arguments to configure the TCP connection and writer mode') -parser.add_argument('--address', type=str, help='the address to connect to') -parser.add_argument('--port', type=int, help='the port to use') - -parser.add_argument('--interval', type=float, default=1, nargs="?", help='The time between creating new files in hours') -parser.add_argument('--location', type=str, default="data", nargs="?", help='specifies the folder to write all the files') -parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], help='sets the statistics type to be decoded options: "SST", "XST", "BST"') -parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='when used stores failed packets') - - -# create a data dumper that creates a new file every 10s (for testing) -if __name__ == "__main__": - def signal_handler(signum, frame): - writer.close_writer() - sys.exit(0) - - - signal.signal(signal.SIGINT, signal_handler) - - args = parser.parse_args() - - # argparse arguments - address = args.address - port = args.port - location = args.location - interval = args.interval - mode = args.mode - debug = args.debug - - if debug: - logger.setLevel(logging.DEBUG) - logger.debug("Setting loglevel to DEBUG") - - # creates the TCP receiver that is given to the writer - receiver = tcp_receiver(address, port) - - # create the writer - writer = statistics_writer(new_file_time_interval=interval, file_location=location, statistics_mode=mode) - - # start looping - try: - while True: - packet = receiver.get_packet() - writer.next_packet(packet) - except KeyboardInterrupt: - writer.close_writer() - -
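
With this patch the writer is started via statistics_writer.py, for example "statistics_writer.py --file packets.raw --mode SST" to convert a recorded packet dump, or "statistics_writer.py --host localhost --port 5101" to read from a live TCP stream. Below is a minimal sketch of how the resulting HDF5 files could be read back, based on the layout that hdf5_writer.write_matrix() creates: one group per statistics matrix named <mode>_<timestamp>, containing the <mode>_values, nof_payload_errors and nof_valid_payloads datasets, with the header of the first packet stored as group attributes. The file and host names above and below are made-up examples, not output of an actual run.

    import h5py

    # open a file produced by the statistics writer (hypothetical name)
    with h5py.File("SST_2021-09-20-10-00-00.h5", "r") as f:
        for name, group in f.items():
            # one group per statistics matrix, named <mode>_<timestamp>
            values = group["SST_values"][:]
            print(name, values.shape)
            print("  valid payloads:", group["nof_valid_payloads"][()])
            print("  payload errors:", group["nof_payload_errors"][()])
            # header fields of the first packet of this matrix
            for key, value in group.attrs.items():
                print(f"  {key} = {value}")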