Commit 16842a0b authored by Jan David Mol

Merge branch 'L2SS-390-minor-writer-fixes' into 'master'

L2SS-390: Usability improvements for statistics writer

Closes L2SS-390

See merge request !135
parents 74a671d6 506f64e3
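
With these changes the writer can read either from a live TCP stream or from a recorded packet dump. A minimal invocation sketch, assuming the main script is statistics_writer.py; the host and file names are placeholders, and the flag names come from the argparse definitions in the diff below:

    # read packets from a live TCP stream
    python3 statistics_writer.py --host localhost --port 5101 --mode SST --output_dir ./data

    # or replay a recorded packet dump from disk
    python3 statistics_writer.py --file packets.dump --mode SST
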
File suppressed by a .gitattributes entry, the file's encoding is unsupported, or the file size exceeds the limit.
File suppressed by a .gitattributes entry, the file's encoding is unsupported, or the file size exceeds the limit.
# imports for working with datetime objects
from datetime import datetime, timedelta
import pytz
# python hdf5
import h5py
import numpy
import json
import logging
# import statistics classes with workaround
import sys
sys.path.append("..")
from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
import devices.sdp.statistics_collector as statistics_collector
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("hdf5_writer")
__all__ = ["hdf5_writer"]
class hdf5_writer:

    def __init__(self, new_file_time_interval, file_location, statistics_mode):
        # all variables that deal with the statistics matrix that's currently being decoded
        self.current_matrix = None
        self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)

        # the header of the first packet of a new matrix is written as metadata.
        # Assumes all subsequent headers of the same matrix are identical (minus index)
        self.statistics_header = None

        # file handling
        self.file_location = file_location
        # the interval is given in seconds, matching the --interval argument of the writer
        self.new_file_time_interval = timedelta(seconds=new_file_time_interval)
        self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.file = None

        # configure the writer for the correct statistics type
        self.collector = None
        self.decoder = None
        self.mode = statistics_mode.upper()
        self.config_mode()
    def next_packet(self, packet):
        """
        Handles a newly received statistics packet.

        All statistics packets carry the timestamp at which they were measured, and the values
        of one measurement are spread across multiple packets. Packets that share a timestamp
        belong to the same matrix. This method collects the matrix from those packets, stores
        completed matrices, and starts new ones: when a packet arrives with a newer timestamp,
        the current matrix is closed and stored, and a new one is started.
        """
        # process the packet
        statistics_packet = self.decoder(packet)

        if not self.statistics_header:
            self.statistics_header = statistics_packet.header()

        # grab the timestamp
        statistics_timestamp = statistics_packet.timestamp()

        # check if the statistics timestamp is unexpectedly older than the current one
        if statistics_timestamp < self.current_timestamp:
            logger.warning(f"Received statistics with an earlier timestamp than the one currently being processed ({statistics_timestamp}). Packet dropped.")
            return

        # a new timestamp means we need to start a new matrix
        if statistics_timestamp > self.current_timestamp:
            self.start_new_matrix(statistics_timestamp)
            self.current_timestamp = statistics_timestamp

        self.process_packet(packet)
    def start_new_matrix(self, timestamp):
        """
        Called when a statistics packet with a newer timestamp is received.
        Writes the finished matrix to the HDF5 file, creates a new HDF5 file if needed,
        and resets the statistics matrix collector.
        """
        logger.info(f"starting new matrix with timestamp: {timestamp}")

        # write the finished matrix (unless this is the very first matrix)
        if self.current_matrix is not None:
            try:
                self.write_matrix()
            except Exception:
                time = self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]
                logger.exception(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped")

        # only start a new file if it's time AND we are done with the previous matrix.
        if timestamp >= self.new_file_time_interval + self.last_file_time:
            self.start_new_hdf5(timestamp)

        # create a new and empty current_matrix
        self.current_matrix = self.collector()
        self.statistics_header = None
    def write_matrix(self):
        """
        Writes the finished matrix to the HDF5 file.
        """
        logger.info("writing matrix to file")

        # create the new hdf5 group based on the timestamp of the packets
        current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]))

        # store the statistics values (only the SST collector exists so far)
        current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])

        # might be optional, but they're easy to add.
        current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
        current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])

        # get the statistics header
        header = self.statistics_header
        # can't store datetime objects, convert to string instead
        header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")

        # store the header of the first packet received for this matrix as a list of attributes
        for k, v in header.items():
            if isinstance(v, dict):
                for subk, subv in v.items():
                    current_group.attrs[f"{k}_{subk}"] = subv
            else:
                current_group.attrs[k] = v
    def process_packet(self, packet):
        """
        Adds the newly received statistics packet to the statistics matrix.
        """
        logger.debug("Processing packet")
        self.current_matrix.process_packet(packet)
    def start_new_hdf5(self, timestamp):
        if self.file is not None:
            try:
                self.file.close()
            except Exception:
                logger.exception(f"Error while closing hdf5 file. File {self.file} is likely incomplete, please verify its integrity.")

        current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
        logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")

        try:
            self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
        except Exception:
            logger.exception("Error while creating new file")
            raise

        self.last_file_time = timestamp
    def config_mode(self):
        """
        Configures the object for the correct statistics type to be used.
        """
        logger.debug(f"attempting to configure {self.mode} mode")

        if self.mode == 'SST':
            self.decoder = SSTPacket
            self.collector = statistics_collector.SSTCollector
        elif self.mode == 'BST':
            # self.decoder = BSTPacket
            raise NotImplementedError("BST collector has not yet been implemented")
        elif self.mode == 'XST':
            # self.decoder = XSTPacket
            raise NotImplementedError("XST collector has not yet been implemented")
        else:
            # make sure the mode is valid
            raise ValueError(f"Invalid statistics mode '{self.mode}'; please use 'SST', 'XST' or 'BST'")
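
        # hypothetical sketch (not implemented in this MR): once an XSTCollector exists
        # in devices.sdp.statistics_collector, the XST branch above could dispatch to it
        # just like the SST branch does:
        #     self.decoder = XSTPacket
        #     self.collector = statistics_collector.XSTCollector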
    def close_writer(self):
        """
        Stops the writer without data loss: writes any in-progress matrix and closes the file.
        """
        logger.debug("closing hdf5 file")

        if self.file is not None:
            try:
                if self.current_matrix is not None:
                    # write the last matrix if one exists
                    self.write_matrix()
            finally:
                # close the file even if writing the last matrix failed
                self.file.close()
                logger.debug(f"{self.file} closed")
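
For reference, write_matrix() lays each matrix out as a group named <mode>_<timestamp>, holding the values and payload-counter datasets, with the first packet's header flattened into the group attributes. A minimal read-back sketch, assuming an SST file produced by this writer (the file name is a placeholder):

    import h5py

    with h5py.File("SST_2021-05-10-12-00-00.h5", "r") as f:
        for name, matrix in f.items():
            values = matrix["SST_values"][:]          # the statistics values
            errors = matrix["nof_payload_errors"][:]  # payload error counters
            print(name, values.shape, matrix.attrs["timestamp"])
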
@@ -3,16 +3,15 @@
 import socket
 import sys
 sys.path.append("..")
 from devices.sdp.statistics_packet import StatisticsPacket
+import os

-class tcp_receiver:
+class receiver:
+    """ Reads data from a file descriptor. """
+
     HEADER_LENGTH = 32

-    def __init__(self, HOST, PORT):
-        self.host = HOST
-        self.port = PORT
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self.sock.connect((self.host, self.port))
+    def __init__(self, fd):
+        self.fd = fd

     def get_packet(self) -> bytes:
         """ Read exactly one statistics packet from the TCP connection. """
@@ -36,11 +35,31 @@ class tcp_receiver:
             # try to read the remainder.
             # NOTE: recv() may return less data than requested, and returns 0
             # if there is nothing left to read (end of stream)
-            more_data = self.sock.recv(data_length - len(data))
+            more_data = os.read(self.fd, data_length - len(data))
             if not more_data:
                 # connection got dropped
-                raise IOError("Connection closed by peer")
+                raise EOFError("End of stream")
             data += more_data

         return data
+
+
+class tcp_receiver(receiver):
+    def __init__(self, HOST, PORT):
+        self.host = HOST
+        self.port = PORT
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.connect((self.host, self.port))
+
+        super().__init__(fd=self.sock.fileno())
+
+
+class file_receiver(receiver):
+    def __init__(self, filename):
+        self.filename = filename
+        self.fileno = os.open(filename, os.O_RDONLY)
+
+        super().__init__(fd=self.fileno)
+
+    def __del__(self):
+        os.close(self.fileno)
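
Both receiver classes share the same get_packet() interface, so the writer does not care where the bytes come from. A small usage sketch, assuming a recorded dump file (the name is a placeholder):

    # replay a dump file until the stream ends
    receiver = file_receiver("packets.dump")
    try:
        while True:
            packet = receiver.get_packet()  # one complete statistics packet, as bytes
    except EOFError:
        # get_packet() raises EOFError once the input is exhausted
        pass
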
--- removed: the statistics_writer class (reworked into the hdf5_writer class above)

# imports for working with datetime objects
from datetime import datetime, timedelta
import pytz
# python hdf5
import h5py
import numpy
import json
import logging
# import statistics classes with workaround
import sys
sys.path.append("..")
from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
import devices.sdp.statistics_collector as statistics_collector

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer")

__all__ = ["statistics_writer"]

class statistics_writer:

    def __init__(self, new_file_time_interval, file_location, statistics_mode):
        # all variables that deal with the SST matrix that's currently being decoded
        self.current_matrix = None
        self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)

        # the header of the first packet of a new matrix is written as metadata.
        # Assumes all subsequent headers of the same matrix are identical (minus index)
        self.statistics_header = None

        # file handling
        self.file_location = file_location
        self.new_file_time_interval = timedelta(hours=new_file_time_interval)
        self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.file = None

        # config the writer for the correct statistics type
        self.collector = None
        self.decoder = None
        self.mode = statistics_mode.upper()
        self.config_mode()

    def next_packet(self, packet):
        """
        All statistics packets come with a timestamp of the time they were measured, and the
        values are spread across multiple packets. As long as the timestamp is the same they
        belong in the same matrix. This code collects the matrix from those packets, stores
        matrices and starts new ones: if a packet's timestamp is newer than the current one,
        the current matrix is closed, stored and a new one is started.
        """
        # process the packet
        statistics_packet = self.decoder(packet)

        # grab the timestamp
        statistics_timestamp = statistics_packet.timestamp()

        # check if the statistics timestamp is unexpectedly older than the current one
        if statistics_timestamp < self.current_timestamp:
            logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.")
            return

        # if this statistics packet has a new timestamp it means we need to start a new matrix
        if statistics_timestamp > self.current_timestamp:
            self.statistics_header = statistics_packet.header()
            self.start_new_matrix(statistics_timestamp)
            self.current_timestamp = statistics_timestamp

        self.process_packet(packet)

    def start_new_matrix(self, timestamp):
        """
        Called when a statistics packet with a newer timestamp is received.
        Writes the matrix to the hdf5 file, creates a new hdf5 file if needed,
        and updates the current timestamp and statistics matrix collector.
        """
        logger.debug(f"starting new matrix with timestamp: {timestamp}")

        # write the finished matrix (unless this is the first matrix)
        if self.current_matrix is not None:
            try:
                self.write_matrix()
            except Exception as e:
                time = str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
                logger.error(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped: {e}")

        # only start a new file if it's time AND we are done with the previous matrix.
        if timestamp >= self.new_file_time_interval + self.last_file_time:
            self.start_new_hdf5(timestamp)

        # create a new and empty current_matrix
        self.current_matrix = self.collector()

    def write_matrix(self):
        """
        Writes the finished matrix to the hdf5 file.
        """
        logger.debug("writing matrix to file")

        # create the new hdf5 group based on the timestamp of packets
        current_group = self.file.create_group("{}_{}".format(self.mode, str(self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S"))))

        # store the statistics values
        current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])

        # might be optional, but they're easy to add.
        current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
        current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])

        # get the statistics header
        header = self.statistics_header
        # can't store datetime objects in json, converted to string instead
        header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
        # convert the header to JSON
        json_header = json.dumps(header)
        # store the header of the packet received for this matrix
        current_group.create_dataset(name='first_packet_header', data=numpy.str(json_header))

    def process_packet(self, packet):
        """
        Adds the newly received statistics packet to the statistics matrix.
        """
        logger.debug("Processing packet")
        self.current_matrix.process_packet(packet)

    def start_new_hdf5(self, timestamp):
        if self.file is not None:
            try:
                self.file.close()
            except Exception as e:
                logger.error(f"Error while attempting to close hdf5 file to disk. File {self.file} likely empty, please verify integrity. Exception: {e}")

        current_time = str(timestamp.strftime("%Y-%m-%d-%H-%M-%S"))
        logger.debug(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")

        try:
            self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
        except Exception as e:
            logger.error(f"Error while creating new file: {e}")
            raise

        self.last_file_time = timestamp

    def config_mode(self):
        """
        Configures the object for the correct statistics type to be used.
        """
        logger.debug(f"attempting to configure {self.mode} mode")

        if self.mode == 'SST':
            self.decoder = SSTPacket
            self.collector = statistics_collector.SSTCollector
        elif self.mode == 'BST':
            # self.decoder = BSTPacket
            raise NotImplementedError("BST collector has not yet been implemented")
        elif self.mode == 'XST':
            # self.decoder = XSTPacket
            raise NotImplementedError("XST collector has not yet been implemented")
        else:
            # make sure the mode is valid
            raise ValueError("invalid statistics mode specified '{}', please use 'SST', 'XST' or 'BST'".format(self.mode))

    def close_writer(self):
        """
        Function that can be used to stop the writer without data loss.
        """
        logger.debug("closing hdf5 file")
        if self.file is not None:
            if self.current_matrix is not None:
                # write the matrix if one exists; only creates a file if there is a matrix to actually write
                try:
                    self.write_matrix()
                finally:
                    self.file.close()
                    logger.debug(f"{self.file} closed")

+++ added: the new main script

import argparse
from receiver import tcp_receiver, file_receiver
from hdf5_writer import hdf5_writer
import sys
import signal
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer")

parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.')
parser.add_argument('--host', type=str, help='the host to connect to')
parser.add_argument('--port', type=int, default=5101, help='the port to connect to (default: %(default)s)')
parser.add_argument('--file', type=str, help='the file to read from')
parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='the statistics type to be decoded (default: %(default)s)')
parser.add_argument('--interval', type=float, default=3600, nargs="?", help='the time between creating new files in seconds (default: %(default)s)')
parser.add_argument('--output_dir', type=str, default=".", nargs="?", help='the folder to write all the files to (default: %(default)s)')
parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='increase log output')

if __name__ == "__main__":
    args = parser.parse_args()

    # argparse arguments
    host = args.host
    port = args.port
    filename = args.file
    output_dir = args.output_dir
    interval = args.interval
    mode = args.mode
    debug = args.debug

    if debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Setting loglevel to DEBUG")

    # create the receiver that is given to the writer
    if filename:
        receiver = file_receiver(filename)
    elif host and port:
        receiver = tcp_receiver(host, port)
    else:
        logger.fatal("Must provide either a host and port, or a file to receive input from")
        sys.exit(1)

    # create the writer
    writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode)

    # start looping
    try:
        while True:
            packet = receiver.get_packet()
            writer.next_packet(packet)
    except KeyboardInterrupt:
        # user abort, don't complain
        pass
    except EOFError:
        # done processing all input, don't complain
        pass
    finally:
        writer.close_writer()
import argparse
from tcp_receiver import tcp_receiver
from statistics_writer import statistics_writer
import sys
import signal
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer")

parser = argparse.ArgumentParser(description='Arguments to configure the TCP connection and writer mode')
parser.add_argument('--address', type=str, help='the address to connect to')
parser.add_argument('--port', type=int, help='the port to use')
parser.add_argument('--interval', type=float, default=1, nargs="?", help='The time between creating new files in hours')
parser.add_argument('--location', type=str, default="data", nargs="?", help='specifies the folder to write all the files')
parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], help='sets the statistics type to be decoded, options: "SST", "XST", "BST"')
parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='when used stores failed packets')

# create a data dumper that creates a new file every 10s (for testing)
if __name__ == "__main__":

    def signal_handler(signum, frame):
        writer.close_writer()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    args = parser.parse_args()

    # argparse arguments
    address = args.address
    port = args.port
    location = args.location
    interval = args.interval
    mode = args.mode
    debug = args.debug

    if debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Setting loglevel to DEBUG")

    # creates the TCP receiver that is given to the writer
    receiver = tcp_receiver(address, port)

    # create the writer
    writer = statistics_writer(new_file_time_interval=interval, file_location=location, statistics_mode=mode)

    # start looping
    try:
        while True:
            packet = receiver.get_packet()
            writer.next_packet(packet)
    except KeyboardInterrupt:
        writer.close_writer()