diff --git a/devices/.simulator.py.swo b/devices/.simulator.py.swo new file mode 100644 index 0000000000000000000000000000000000000000..69857d03c2fef9da1c3f1d9b632e7c0ef5de06b4 Binary files /dev/null and b/devices/.simulator.py.swo differ diff --git a/devices/.simulator.py.swp b/devices/.simulator.py.swp new file mode 100644 index 0000000000000000000000000000000000000000..9c61e2219b5f523237c09e97b0d0231d6bb4eec9 Binary files /dev/null and b/devices/.simulator.py.swp differ diff --git a/devices/clients/statistics_client.py b/devices/clients/statistics_client.py index 5d45ac472b52ac2f024dfd4a338cb3d03f4d3c77..29f432a5459b0216785976654d8527cbc4bf53a0 100644 --- a/devices/clients/statistics_client.py +++ b/devices/clients/statistics_client.py @@ -7,6 +7,8 @@ import queue from .comms_client import CommClient from .udp_receiver import UDPReceiver +from devices.sdp.statistics_collector import StatisticsConsumer + logger = logging.getLogger() @@ -19,11 +21,11 @@ class StatisticsClient(CommClient): def start(self): super().start() - def __init__(self, statistics_collector_class, host, port, fault_func, streams, try_interval=2, queuesize=1024): + def __init__(self, collector, host, port, fault_func, streams, try_interval=2, queuesize=1024): """ Create the statistics client and connect() to it and get the object node. - statistics_collector_class: a subclass of StatisticsCollector that specialises in processing the received packets. + collector: a subclass of StatisticsCollector that specialises in processing the received packets. host: hostname to listen on port: port number to listen on """ @@ -31,7 +33,7 @@ class StatisticsClient(CommClient): self.port = port self.poll_timeout = 0.1 self.queuesize = queuesize - self.statistics_collector_class = statistics_collector_class + self.collector = collector super().__init__(fault_func, streams, try_interval) @@ -55,7 +57,7 @@ class StatisticsClient(CommClient): if not self.connected: self.queue = Queue(maxsize=self.queuesize) self.udp = UDPReceiver(self.host, self.port, self.queue, self.poll_timeout) - self.statistics = self.statistics_collector_class(self.queue) + self.statistics = StatisticsConsumer(self.queue, self.collector) return super().connect() @@ -106,7 +108,7 @@ class StatisticsClient(CommClient): # redirect to right object. this works as long as the parameter names are unique among them. if annotation["type"] == "statistics": def read_function(): - return self.statistics.parameters[parameter] + return self.collector.parameters[parameter] elif annotation["type"] == "udp": def read_function(): return self.udp.parameters[parameter] diff --git a/devices/devices/sdp/statistics.py b/devices/devices/sdp/statistics.py index 5d10aae8b866acc0b30598856cb63b1ecc6d233a..b1a2681052c95650056afb08b8dd4dbf0e1dcf00 100644 --- a/devices/devices/sdp/statistics.py +++ b/devices/devices/sdp/statistics.py @@ -120,7 +120,8 @@ class Statistics(hardware_device, metaclass=ABCMeta): """ user code here. is called when the sate is set to INIT """ """Initialises the attributes and properties of the statistics device.""" - self.statistics_client = StatisticsClient(self.STATISTICS_COLLECTOR_CLASS, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self) + self.statistics_collector = self.STATISTICS_COLLECTOR_CLASS() + self.statistics_client = StatisticsClient(self.statistics_collector, "0.0.0.0", self.Statistics_Client_Port, self.Fault, self) self.OPCUA_client = OPCUAConnection("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), "http://lofar.eu", self.OPC_Time_Out, self.Fault, self) diff --git a/devices/devices/sdp/statistics_collector.py b/devices/devices/sdp/statistics_collector.py index f3aac3c1982b03b169eaddedce52b50c939ddc45..4d845b13f1b70785443f2783d8de9f7d85f9d9d9 100644 --- a/devices/devices/sdp/statistics_collector.py +++ b/devices/devices/sdp/statistics_collector.py @@ -7,8 +7,8 @@ from .statistics_packet import SSTPacket logger = logging.getLogger() -class StatisticsCollector(Thread): - """ Base class to process statistics packets from a queue, asynchronously. """ +class StatisticsCollector: + """ Base class to process statistics packets into parameters matrices. """ # Maximum number of antenna inputs we support (used to determine array sizes) MAX_INPUTS = 192 @@ -16,18 +16,9 @@ class StatisticsCollector(Thread): # Maximum number of subbands we support (used to determine array sizes) MAX_SUBBANDS = 512 - # Maximum time to wait for the Thread to get unstuck, if we want to stop - DISCONNECT_TIMEOUT = 10.0 - - def __init__(self, queue: Queue): - self.queue = queue - self.last_packet = None - + def __init__(self): self.parameters = self._default_parameters() - super().__init__() - self.start() - def _default_parameters(self): return { "nof_packets": numpy.uint64(0), @@ -39,48 +30,18 @@ class StatisticsCollector(Thread): "last_invalid_packet": numpy.zeros((9000,), dtype=numpy.uint8), } - def run(self): - logger.info("Starting statistics thread") - - while True: - self.last_packet = self.queue.get() - - # This is the exception/slow path, but python doesn't allow us to optimise that - if self.last_packet is None: - # None is the magic marker to stop processing - break - - self.parameters["nof_packets"] += numpy.uint64(1) - - try: - self.process_packet(self.last_packet) - except Exception as e: - logger.exception("Could not parse statistics UDP packet") - - self.parameters["last_invalid_packet"] = numpy.frombuffer(self.last_packet, dtype=numpy.uint8) - self.parameters["nof_invalid_packets"] += numpy.uint64(1) - - logger.info("Stopped statistics thread") - - def join(self, timeout=0): - # insert magic marker - self.queue.put(None) - logger.info("Sent shutdown to statistics thread") - - super().join(timeout) - - def disconnect(self): - if not self.is_alive(): - return + def process_packet(self, packet): + self.parameters["nof_packets"] += numpy.uint64(1) - # try to get the thread shutdown, but don't stall forever - self.join(self.DISCONNECT_TIMEOUT) + try: + self.parse_packet(packet) + except Exception as e: + self.parameters["last_invalid_packet"] = numpy.frombuffer(packet, dtype=numpy.uint8) + self.parameters["nof_invalid_packets"] += numpy.uint64(1) - if self.is_alive(): - # there is nothing we can do except wait (stall) longer, which could be indefinitely. - logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") + raise ValueError("Could not parse statistics packet") from e - def process_packet(self, packet): + def parse_packet(self, packet): """ Update any information based on this packet. """ raise NotImplementedError @@ -113,7 +74,7 @@ class SSTCollector(StatisticsCollector): return defaults - def process_packet(self, packet): + def parse_packet(self, packet): fields = SSTPacket(packet) # determine which input this packet contains data for @@ -135,3 +96,58 @@ class SSTCollector(StatisticsCollector): self.parameters["sst_values"][input_index][:fields.nof_statistics_per_packet] = fields.payload self.parameters["sst_timestamps"][input_index] = numpy.float64(fields.timestamp().timestamp()) self.parameters["integration_intervals"][input_index] = fields.integration_interval() + + +class StatisticsConsumer(Thread): + """ Base class to process statistics packets from a queue, asynchronously. """ + + # Maximum time to wait for the Thread to get unstuck, if we want to stop + DISCONNECT_TIMEOUT = 10.0 + + def __init__(self, queue: Queue, collector: StatisticsCollector): + self.queue = queue + self.collector = collector + self.last_packet = None + + super().__init__() + self.start() + + def run(self): + logger.info("Starting statistics thread") + + while True: + self.last_packet = self.queue.get() + + # This is the exception/slow path, but python doesn't allow us to optimise that + if self.last_packet is None: + # None is the magic marker to stop processing + break + + + try: + self.collector.process_packet(self.last_packet) + except ValueError as e: + logger.exception("Could not parse statistics packet") + + # continue processing + + logger.info("Stopped statistics thread") + + def join(self, timeout=0): + # insert magic marker + self.queue.put(None) + logger.info("Sent shutdown to statistics thread") + + super().join(timeout) + + def disconnect(self): + if not self.is_alive(): + return + + # try to get the thread shutdown, but don't stall forever + self.join(self.DISCONNECT_TIMEOUT) + + if self.is_alive(): + # there is nothing we can do except wait (stall) longer, which could be indefinitely. + logger.error(f"Statistics thread did not shut down after {self.DISCONNECT_TIMEOUT} seconds, just leaving it dangling. Please attach a debugger to thread ID {self.ident}.") + diff --git a/devices/statistics_writer/README.md b/devices/statistics_writer/README.md new file mode 100644 index 0000000000000000000000000000000000000000..ea722c6cf552443364b196264034d768690955be --- /dev/null +++ b/devices/statistics_writer/README.md @@ -0,0 +1,58 @@ +# TCP to HDF5 statistics writer +The TCP to HDF5 statistics writer can be started with `tcp_hdf5_writer.py` This script imports +`tcp_receiver.py` and `statistics_writer.py`. `tcp_receiver.py` only takes care of receiving packets. +`statistics_writer.py` takes the receive function from the tcp_receiver and uses it to obtain packets. +Any function that can deliver statistics packets can be used by this code. +`statistics_writer.py` takes care of processing the packets it receives, filling statistics matrices +and writing those matrices (as well as a bunch of metadata) to hdf5. + + +### TCP Statistics writer + +The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script. +This script can be called with the following arguments: + ``` + --address the address to connect to + --port the port to use + --interval The time between creating new files in hours + --location specifies the folder to write all the files + --mode sets the statistics type to be decoded options: "SST", "XST", "BST" + --debug takes no arguments, when used prints a lot of extra data to help with debugging + ``` + + +##HFD5 structure +Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer +timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads. +The file will be named after the mode it is in and the timestamp of the statistics packets. For example: `SST_1970-01-01-00-00-00.h5`. + + +``` +File +| +|------ {mode_timestamp} |- {statistics matrix} +| |- {first packet header} +| |- {nof_valid_payloads} +| |- {nof_payload_errors} +| +|------ {mode_timestamp} |- {statistics matrix} +| |- {first packet header} +| |- {nof_valid_payloads} +| |- {nof_payload_errors} +| +... +``` + +###explorer +There is an hdf5 explorer that will walk through specified hdf5 files. +Its called `hdf5_explorer.py` and can be called with a `--file` argument +ex: `python3 hdf5_explorer.py --file data/SST_1970-01-01-00-00-00.h5` This allows for easy manual checking +of the structure and content of hdf5 files. useful for testing and debugging. +Can also be used as example of how to read the HDF5 statistics data files. +Provides a number of example functions inside that go through the file in various ways. + +###test server +There is a test server that will continuously send out the same statistics packet. +Its called `test_server.py`. Takes `--host`, `--port` and `--file` as optional input arguments. +Defaults to address `'127.0.0.1'`, port `65433` and file `devices_test_SDP_SST_statistics_packets.bin` + diff --git a/devices/statistics_writer/data/test.h5 b/devices/statistics_writer/data/test.h5 new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/devices/statistics_writer/data/testing.h5 b/devices/statistics_writer/data/testing.h5 new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/devices/statistics_writer/statistics_writer.py b/devices/statistics_writer/statistics_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..a013cb779b758b8e5256aba23e1ac0ff9a24e9d7 --- /dev/null +++ b/devices/statistics_writer/statistics_writer.py @@ -0,0 +1,187 @@ +# imports for working with datetime objects +from datetime import datetime, timedelta +import pytz + +# python hdf5 +import h5py + +import numpy +import json +import logging + +# import statistics classes with workaround +import sys +sys.path.append("..") +from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket +import devices.sdp.statistics_collector as statistics_collector + + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("statistics_writer") + +__all__ = ["statistics_writer"] + +class statistics_writer: + + + def __init__(self, new_file_time_interval, file_location, statistics_mode): + + # all variables that deal with the SST matrix that's currently being decoded + self.current_matrix = None + self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC) + + # the header of the first packet of a new matrix is written as metadata. + # Assumes all subsequent headers of the same matrix are identical (minus index) + self.statistics_header = None + + # file handing + self.file_location = file_location + self.new_file_time_interval = timedelta(hours=new_file_time_interval) + self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) + self.file = None + + # config the writer for the correct statistics type + self.collector = None + self.decoder = None + self.mode = statistics_mode.upper() + self.config_mode() + + def next_packet(self, packet): + """ + All statistics packets come with a timestamp of the time they were measured. All the values will be spread across multiple packets. + As long as the timestamp is the same they belong in the same matrix. This code handles collecting the matrix from those multiple + packets as well as storing matrices and starting new ones + + The code receives new packets and checks the statistics timestamp of them. If the timestamp is higher than the current timestamp + it will close the current matrix, store it and start a new one. + """ + + # process the packet + statistics_packet = self.decoder(packet) + + # grab the timestamp + statistics_timestamp = statistics_packet.timestamp() + + # check if te statistics timestamp is unexpectedly older than the current one + if statistics_timestamp < self.current_timestamp: + logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.") + return + + # if this statistics packet has a new timestamp it means we need to start a new matrix + if statistics_timestamp > self.current_timestamp: + self.statistics_header = statistics_packet.header() + self.start_new_matrix(statistics_timestamp) + self.current_timestamp = statistics_timestamp + + self.process_packet(packet) + + def start_new_matrix(self, timestamp): + logger.debug(f"starting new matrix with timestamp: {timestamp}") + """ + is called when a statistics packet with a newer timestamp is received. + Writes the matrix to the hdf5 file + Creates a new hdf5 file if needed + updates current timestamp and statistics matrix collector + """ + + # write the finished (and checks if its the first matrix) + if self.current_matrix is not None: + try: + self.write_matrix() + except Exception as e: + time = str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + logger.error(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped: {e}") + + # only start a new file if its time AND we are done with the previous matrix. + if timestamp >= self.new_file_time_interval + self.last_file_time: + self.start_new_hdf5(timestamp) + + # create a new and empty current_matrix + self.current_matrix = self.collector() + + def write_matrix(self): + logger.debug("writing matrix to file") + """ + Writes the finished matrix to the hdf5 file + """ + + # create the new hdf5 group based on the timestamp of packets + current_group = self.file.create_group("{}_{}".format(self.mode, str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S")))) + + # store the statistics values + current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"]) + + # might be optional, but they're easy to add. + current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"]) + current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"]) + + # get the statistics header + header = self.statistics_header + # can't store datetime objects in json, converted to string instead + header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds") + # convert the header to JSON + json_header = json.dumps(header) + # Stores the header of the packet received for this matrix + current_group.create_dataset(name='first_packet_header', data=numpy.str(json_header)) + + + def process_packet(self, packet): + logger.debug(f"Processing packet") + """ + Adds the newly received statistics packet to the statistics matrix + """ + self.current_matrix.process_packet(packet) + + def start_new_hdf5(self, timestamp): + + if self.file is not None: + try: + self.file.close() + except Exception as e: + logger.error(f"Error while attempting to close hdf5 file to disk. file {self.file} likely empty, please verify integrity. \r\n Exception: {e}") + + current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + logger.debug(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5") + + try: + self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w') + except Exception as e: + logger.error(f"Error while creating new file: {e}") + raise e + + self.last_file_time = timestamp + + def config_mode(self): + logger.debug(f"attempting to configure {self.mode} mode") + + """ + Configures the object for the correct statistics type to be used. + """ + + if self.mode == 'SST': + self.decoder = SSTPacket + self.collector = statistics_collector.SSTCollector + elif self.mode == 'BST': + # self.decoder = XSTPacket + raise NotImplementedError("BST collector has not yet been implemented") + elif self.mode == 'XST': + # self.decoder = XSTPacket + raise NotImplementedError("BST collector has not yet been implemented") + else: + # make sure the mode is valid + raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST' ".format(self.mode)) + + def close_writer(self): + """ + Function that can be used to stop the writer without data loss. + """ + logger.debug("closing hdf5 file") + if self.file is not None: + if self.current_matrix is not None: + # Write matrix if one exists + # only creates file if there is a matrix to actually write + try: + self.write_matrix() + finally: + self.file.close() + logger.debug(f"{self.file} closed") diff --git a/devices/statistics_writer/tcp_hdf5_writer.py b/devices/statistics_writer/tcp_hdf5_writer.py new file mode 100644 index 0000000000000000000000000000000000000000..8d290e293e2db82fa3e12e071e7dda4d6077ea40 --- /dev/null +++ b/devices/statistics_writer/tcp_hdf5_writer.py @@ -0,0 +1,59 @@ +import argparse +from tcp_receiver import tcp_receiver +from statistics_writer import statistics_writer + +import sys +import signal + +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("statistics_writer") + +parser = argparse.ArgumentParser(description='Arguments to configure the TCP connection and writer mode') +parser.add_argument('--address', type=str, help='the address to connect to') +parser.add_argument('--port', type=int, help='the port to use') + +parser.add_argument('--interval', type=float, default=1, nargs="?", help='The time between creating new files in hours') +parser.add_argument('--location', type=str, default="data", nargs="?", help='specifies the folder to write all the files') +parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], help='sets the statistics type to be decoded options: "SST", "XST", "BST"') +parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='when used stores failed packets') + + +# create a data dumper that creates a new file every 10s (for testing) +if __name__ == "__main__": + def signal_handler(signum, frame): + writer.close_writer() + sys.exit(0) + + + signal.signal(signal.SIGINT, signal_handler) + + args = parser.parse_args() + + # argparse arguments + address = args.address + port = args.port + location = args.location + interval = args.interval + mode = args.mode + debug = args.debug + + if debug: + logger.setLevel(logging.DEBUG) + logger.debug("Setting loglevel to DEBUG") + + # creates the TCP receiver that is given to the writer + receiver = tcp_receiver(address, port) + + # create the writer + writer = statistics_writer(new_file_time_interval=interval, file_location=location, statistics_mode=mode) + + # start looping + try: + while True: + packet = receiver.get_packet() + writer.next_packet(packet) + except KeyboardInterrupt: + writer.close_writer() + + diff --git a/devices/statistics_writer/tcp_receiver.py b/devices/statistics_writer/tcp_receiver.py new file mode 100644 index 0000000000000000000000000000000000000000..4112f926785d5fbb5b25672113c55445f8a8952b --- /dev/null +++ b/devices/statistics_writer/tcp_receiver.py @@ -0,0 +1,46 @@ +import socket + +import sys +sys.path.append("..") +from devices.sdp.statistics_packet import StatisticsPacket + +class tcp_receiver: + HEADER_LENGTH = 32 + + def __init__(self, HOST, PORT): + self.host = HOST + self.port = PORT + + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.connect((self.host, self.port)) + + def get_packet(self) -> bytes: + """ Read exactly one statistics packet from the TCP connection. """ + + # read only the header, to compute the size of the packet + header = self.read_data(self.HEADER_LENGTH) + packet = StatisticsPacket(header) + + # read the rest of the packet (payload) + payload_length = packet.expected_size() - len(header) + payload = self.read_data(payload_length) + + # add payload to the header, and return the full packet + return header + payload + + def read_data(self, data_length: int) -> bytes: + """ Read exactly data_length bytes from the TCP connection. """ + + data = b'' + while len(data) < data_length: + # try to read the remainder. + # NOTE: recv() may return less data than requested, and returns 0 + # if there is nothing left to read (end of stream) + more_data = self.sock.recv(data_length - len(data)) + if not more_data: + # connection got dropped + raise IOError("Connection closed by peer") + + data += more_data + + return data diff --git a/devices/statistics_writer/test/devices_test_SDP_SST_statistics_packets.bin b/devices/statistics_writer/test/devices_test_SDP_SST_statistics_packets.bin new file mode 100644 index 0000000000000000000000000000000000000000..e94347b86a0a03b940eb84980ec8f6d3b6d4e2d7 Binary files /dev/null and b/devices/statistics_writer/test/devices_test_SDP_SST_statistics_packets.bin differ diff --git a/devices/statistics_writer/test/hdf5_explorer.py b/devices/statistics_writer/test/hdf5_explorer.py new file mode 100644 index 0000000000000000000000000000000000000000..29cc88049086f5bea22c441d1ca12f91769c7135 --- /dev/null +++ b/devices/statistics_writer/test/hdf5_explorer.py @@ -0,0 +1,132 @@ +import h5py +import numpy + +import argparse + +parser = argparse.ArgumentParser(description='Select a file to explore') +parser.add_argument('--file', type=str, help='the name and path of the file') + +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("hdf5_explorer") +logger.setLevel(logging.DEBUG) + + +class statistics_data: + """ + Example class not used by anything + This class takes the file and the statistics name as its __init__ arguments and then stores the + the datasets in them. + """ + + + NOF_PAYLOAD_ERRORS = "nof_payload_errors" + NOF_VALID_PAYLOADS = "nof_valid_payloads" + FIRST_PACKET_HEADER = "first_packet_header" + STATISTICS_VALUES = "statistics_values" + + def __init__(self, file, statistics_name): + self.nof_valid_payloads = file.get(f"{statistics_name}/{statistics_data.NOF_VALID_PAYLOADS}") + self.nof_payload_errors = file.get(f"{statistics_name}/{statistics_data.NOF_PAYLOAD_ERRORS}") + self.first_packet_header = file.get(f"{statistics_name}/{statistics_data.FIRST_PACKET_HEADER}") + self.statistics_values = file.get(f"{statistics_name}/{statistics_data.STATISTICS_VALUES}") + + +class explorer: + """ + This class serves both as a tool to test and verify the content of HDF5 files as well as provide an example + of how you can go through HDF5 files. + + + The first 2 functions, print_high_level and print_full both call the hdf5 file.visititems function. this function + takes another function as argument and then calls that function for each and every group and dataset in the file. + + The last 2 functions do this without this file.visititems function and instead have knowledge of how we structure the + statistics data. + """ + + + def __init__(self, filename): + self.file = h5py.File(filename, 'r') + + def print_high_level(self): + """Calls a function that will go through all groups and datasets in the file and pass data along to another specified function""" + self.file.visititems(self._high_level_explorer) + + def print_full(self): + """Calls a function that will go through all groups and datasets in the file and pass data along to another specified function""" + self.file.visititems(self._full_explorer) + + def _full_explorer(self, name, obj): + """ + Called by the file.visititems(func) function. Gets called for each and every group and dataset. + Prints all groups and datasets including their content. + """ + + shift = name.count('/') * ' ' + data = self.file.get(name) + logger.debug(f"{shift}{name}: {data}") + logger.debug(numpy.array(data)) + + def _high_level_explorer(self, name, obj): + """ + Called by the file.visititems(func) function. Gets called for each and every group and dataset. + Only lists the groups and datasets without the actual content. + """ + shift = name.count('/') * ' ' + data = self.file.get(name) + logger.debug(f"{shift}{name}: {data}") + + def print_all_statistics_full(self): + """ + Explores the file with knowledge of the file structure. assumes all top level groups are statistics + and that all statistics groups are made up of datasets. + Prints the groups, the datasets and the content of the datasets. + """ + + # List all groups + logger.debug("Keys: %s" % self.file.keys()) + + for group_key in self.file.keys(): + dataset = list(self.file[group_key]) + for i in dataset: + data = self.file.get(f"{group_key}/{i}") + logger.debug(group_key) + logger.debug(numpy.array(data)) + + def print_all_statistics_top_level(self): + """ + Explores the file with knowledge of the file structure. assumes all top level groups are statistics + and that all statistics groups are made up of datasets. + This function prints only the top level groups, AKA all the statistics collected. Useful when dealing with + potentially hundreds of statistics. + """ + # List all groups + logger.debug("Listing all statistics stored in this file:") + + for group_key in self.file.keys(): + logger.debug(group_key) + + + +# create a data dumper that creates a new file every 10s (for testing) +if __name__ == "__main__": + args = parser.parse_args() + Explorer = explorer(args.file) + + """ + Print the entire files content + """ + Explorer.print_all_statistics_full() + + """ + Print only the names of all the statistics in this file + """ + Explorer.print_all_statistics_top_level() + + + + + + + diff --git a/devices/statistics_writer/test/test_server.py b/devices/statistics_writer/test/test_server.py new file mode 100644 index 0000000000000000000000000000000000000000..eec9ec3eed992b03ee809ca37de012bad43bd213 --- /dev/null +++ b/devices/statistics_writer/test/test_server.py @@ -0,0 +1,52 @@ +import socket +import time + +import argparse + +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger("statistics_test_server") +logger.setLevel(logging.DEBUG) + +parser = argparse.ArgumentParser(description='Select what hostname to use and what port to use') +parser.add_argument('--port', type=int, help='port to use', default=65433) +parser.add_argument('--host', help='host to use', default='127.0.0.1') +parser.add_argument('--file', help='file to use as data', default='devices_test_SDP_SST_statistics_packets.bin') +parser.add_argument('--interval', type=int, help='ime between sending entire files content', default=1) + +args = parser.parse_args() +HOST = args.host +PORT = args.port +FILE = args.file +INTERVAL = args.interval + + +while True: + try: + f = open(FILE, "rb") + data = f.read() + except Exception as e: + logger.error(f"File not found, are you sure: '{FILE}' is a valid path, Exception: {e}") + exit() + + try: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + logger.debug(f"Starting TCP test server on {HOST} {PORT}") + logger.debug("To interrupt the script, press Ctrl-C twice within a second") + + s.bind((HOST, PORT)) + s.listen() + conn, addr = s.accept() + + with conn: + logger.debug(f'Connected by: {addr}') + + while True: + time.sleep(INTERVAL) + conn.sendall(data) + + except Exception as e: + logger.warning(f"Exception occurred: {e}") + + # just do 2 interrupt within a second to quit the program + time.sleep(1) diff --git a/devices/statistics_writer/udp_dev/udp_client.py b/devices/statistics_writer/udp_dev/udp_client.py new file mode 100644 index 0000000000000000000000000000000000000000..cef6a079d17dc0fb45d71f181ee2be908e9bd091 --- /dev/null +++ b/devices/statistics_writer/udp_dev/udp_client.py @@ -0,0 +1,62 @@ +import socket +import sys +import netifaces as ni +from datetime import datetime +import time + +class UDP_Client: + + def __init__(self, server_ip:str, server_port:int): + self.server_ip = server_ip + self.server_port = server_port + self.server_data = None + self.server_addr = None # tuple of address info + + def run(self): + # Create socket for server + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 0) + print("Do Ctrl+c to exit the program !!") + print('\n\n*** This Client keeps sending the same SST packet with an interval of 1s ***') + + # Let's send data through UDP protocol + while True: + + #Old interactive interface + #send_data = input("Type some text to send =>"); + #s.sendto(send_data.encode('utf-8'), (self.server_ip, self.server_port)) + #print("\n\n 1. Client Sent : ", send_data, "\n\n") + #self.server_data, self.server_addr = s.recvfrom(4096) + #print("\n\n 2. Client received : ", self.server_data.decode('utf-8'), "\n\n") + + time.sleep(1) + + f = open("../../test/SDP_SST_statistics_packet.bin", "rb") + send_data = f.read() + s.sendto(send_data, (self.server_ip, self.server_port)) + print("\n\n 1. Client Sent SST Packet at: ", datetime.now()) + self.server_data, self.server_addr = s.recvfrom(4096) + print("\n\n 2. Client received : ", self.server_data.decode('utf-8'), "\n\n") + + # close the socket + s.close() + +if __name__ == '__main__': + + if len(sys.argv) == 3: + if sys.argv[1]=='localhost': + server_ip = ni.ifaddresses('eth0')[ni.AF_INET][0]['addr'] + else : + server_ip = sys.argv[1] + server_port = int(sys.argv[2]) + #local_ip = local_ip = ni.ifaddresses('eth0')[ni.AF_INET][0]['addr'] + #server_ip = local_ip + else: + print("Run like : python3 udp_client.py <server_ip> <server_port>") + exit(1) + + client = UDP_Client(server_ip,server_port) + client.run() + + + + \ No newline at end of file diff --git a/devices/statistics_writer/udp_dev/udp_server.py b/devices/statistics_writer/udp_dev/udp_server.py new file mode 100644 index 0000000000000000000000000000000000000000..45624761519287b13bbce5c73cf8d8cb7dff9201 --- /dev/null +++ b/devices/statistics_writer/udp_dev/udp_server.py @@ -0,0 +1,50 @@ +import socket +import sys +import time +import netifaces as ni +from datetime import datetime + +class UDP_Server: + + def __init__(self, ip:str, port:int, buffer_size:int = 8192): + self.ip = ip + self.port = port + self.buffer_size = buffer_size + self.recv_data = None + self.recv_addr = None + + def run(self): + # Create a UDP socket + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # Bind the socket to the port + server_address = (self.ip, self.port) + s.bind(server_address) + print("Do Ctrl+c to exit the program !!") + print("\n\n####### Server is listening on %s - port %s #######" % (self.ip,self.port)) + + while True: + + self.recv_data, self.recv_addr = s.recvfrom(self.buffer_size) + print("\n\n 2. Server received at: ", datetime.now(), "\n\n") + + '''Server response''' + #send_data = input("Type some text to send => ") + send_data = 'Packet received. Waiting for the next one.' + s.sendto(send_data.encode('utf-8'), self.recv_addr) + print("\n\n 1. Server sent : ", send_data,"\n\n") + + #time.sleep(10) + #s.close() + + break + + # close the socket + s.close() + + def get_recv_data(self): + return self.recv_data + +if __name__ == '__main__': + local_ip = ni.ifaddresses('eth0')[ni.AF_INET][0]['addr'] + server = UDP_Server(local_ip,5600) + server.run() diff --git a/devices/statistics_writer/udp_dev/udp_write_manager.py b/devices/statistics_writer/udp_dev/udp_write_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..0c11f6a82dc11f8151eb771b90033feb38ef9c42 --- /dev/null +++ b/devices/statistics_writer/udp_dev/udp_write_manager.py @@ -0,0 +1,81 @@ +from datetime import datetime +import time +import os +import h5py +import numpy as np +from statistics_writer.udp_dev import udp_server as udp +import netifaces as ni +from statistics_packet import SSTPacket + +__all__ = ["statistics_writer"] + + +class Statistics_Writer: + + def __init__(self, new_file_time_interval): + + self.new_file_time_interval = new_file_time_interval + self.packet_cnt = 0 + + # Define ip and port of the receiver + self.local_ip = ni.ifaddresses('eth0')[ni.AF_INET][0]['addr'] + self.server = udp.UDP_Server(self.local_ip, 5600) + + # Create data directory if not exists + try: + os.makedirs('../data') + except: + print('Data directory already created') + + # create initial file + self.last_file_time = time.time() + self.file = None + self.new_hdf5() + + def write_packet(self, raw_data): + # create new file if the file was created more than the allowed time ago + if time.time() >= self.new_file_time_interval + self.last_file_time: + self.new_hdf5() + + self.packet_cnt += 1 + + # create dataset with the raw data in it + self.write_raw(raw_data) + self.write_metadata(raw_data) + + def new_hdf5(self): + + if self.file is not None: + self.file.close() + + timestamp = datetime.now() + current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S")) + print("creating new file: data/{}.h5".format(current_time)) + self.file = h5py.File("data/{}.h5".format(current_time), 'w') + self.last_file_time = time.time() + + def write_metadata(self, packet): + # decode packet + self.sst = SSTPacket(packet) + header = self.sst.header() + header_bytes = bytes(str(header), "utf-8") + header_bytes = np.frombuffer(header_bytes, dtype=np.uint8) + self.file.create_dataset('packet_{}_header'.format(self.packet_cnt), data=header_bytes) + + def write_raw(self, packet): + # create dataset with the raw data in it + data = np.frombuffer(packet, dtype=np.uint8) + self.file.create_dataset('packet_{}_raw'.format(self.packet_cnt), data=data) + + +if __name__ == "__main__": + # create a data dumper that creates a new file every 10s (for testing) + test = Statistics_Writer(new_file_time_interval=10) + + # simple loop to write data every second + while True: + test.server.run() + data = test.server.get_recv_data() + test.write_packet(data) + + # time.sleep(1) diff --git a/devices/test/SDP_SST_statistics_packet.bin b/devices/test/SDP_SST_statistics_packet.bin index ade2d62c32eb6cbf4fb9b5ec2d7c0368ab0af408..a45b77587a8104cbeb756d85cbb757f02abf39bf 100644 Binary files a/devices/test/SDP_SST_statistics_packet.bin and b/devices/test/SDP_SST_statistics_packet.bin differ