# hdf5_writer.py
# imports for working with datetime objects
from datetime import datetime, timedelta
import pytz
# python hdf5
import h5py
import numpy
import json
import logging
# import statistics classes with workaround
import sys
sys.path.append("..")
from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
import devices.sdp.statistics_collector as statistics_collector
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("hdf5_writer")
__all__ = ["hdf5_writer"]
class hdf5_writer:
def __init__(self, new_file_time_interval, file_location, statistics_mode):
        # all variables that deal with the statistics matrix that's currently being decoded
self.current_matrix = None
self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
# the header of the first packet of a new matrix is written as metadata.
# Assumes all subsequent headers of the same matrix are identical (minus index)
self.statistics_header = None
        # file handling
self.file_location = file_location
self.new_file_time_interval = timedelta(hours=new_file_time_interval)
self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
self.file = None
# config the writer for the correct statistics type
self.collector = None
self.decoder = None
self.mode = statistics_mode.upper()
self.config_mode()
def next_packet(self, packet):
"""
        All statistics packets carry the timestamp of the moment they were measured, and the values of one
        measurement are spread across multiple packets. Packets that share a timestamp belong to the same
        matrix. This code collects the matrix from those packets, stores completed matrices and starts new
        ones: when a packet arrives with a newer timestamp than the matrix currently being built, the
        current matrix is closed and stored, and a new one is started.
"""
# process the packet
statistics_packet = self.decoder(packet)
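        # keep the header of the first packet of this matrix; write_matrix() stores it as HDF5 attributes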
if not self.statistics_header:
self.statistics_header = statistics_packet.header()
# grab the timestamp
statistics_timestamp = statistics_packet.timestamp()
# ignore packets with no timestamp, as they indicate FPGA processing was disabled
# and are useless anyway.
if statistics_packet.block_serial_number == 0:
logger.warning(f"Received statistics with no timestamp. Packet dropped.")
return
        # check if the statistics timestamp is unexpectedly older than the current one
if statistics_timestamp < self.current_timestamp:
logger.warning(f"Received statistics with earlier timestamp than is currently being processed ({statistics_timestamp}). Packet dropped.")
return
# if this statistics packet has a new timestamp it means we need to start a new matrix
if statistics_timestamp > self.current_timestamp:
self.start_new_matrix(statistics_timestamp)
self.current_timestamp = statistics_timestamp
self.process_packet(packet)
    def start_new_matrix(self, timestamp):
        """
        Called when a statistics packet with a newer timestamp is received.
        Writes the current matrix to the hdf5 file, creates a new hdf5 file if needed,
        and resets the statistics matrix collector and header.
        """
        logger.info(f"starting new matrix with timestamp: {timestamp}")
        # write the finished matrix to file, unless this is the very first matrix and there is nothing to write yet
if self.current_matrix is not None:
try:
self.write_matrix()
            except Exception:
time = self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]
logger.exception(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped")
        # only start a new file if it's time AND we are done with the previous matrix.
if timestamp >= self.new_file_time_interval + self.last_file_time:
self.start_new_hdf5(timestamp)
# create a new and empty current_matrix
self.current_matrix = self.collector()
self.statistics_header = None
    def write_matrix(self):
        """
        Writes the finished matrix to the hdf5 file
        """
        logger.info("writing matrix to file")
# create the new hdf5 group based on the timestamp of packets
current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]))
# store the statistics values
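        # NOTE: the collector exposes the values under the key "sst_values"; only SST mode is currently implemented (see config_mode)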
current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])
# might be optional, but they're easy to add.
current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])
# get the statistics header
header = self.statistics_header
# can't store datetime objects, convert to string instead
header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")
        # Stores the header of the packet received for this matrix as a list of attributes
        for k, v in header.items():
            if isinstance(v, dict):
for subk, subv in v.items():
current_group.attrs[f"{k}_{subk}"] = subv
else:
current_group.attrs[k] = v
    def process_packet(self, packet):
        """
        Adds the newly received statistics packet to the statistics matrix
        """
        logger.debug("Processing packet")
self.current_matrix.process_packet(packet)
def start_new_hdf5(self, timestamp):
if self.file is not None:
try:
self.file.close()
            except Exception:
                logger.exception(f"Error while attempting to close hdf5 file {self.file}; the file may be incomplete or corrupt, please verify its integrity.")
        current_time = timestamp.strftime("%Y-%m-%d-%H-%M-%S")
logger.info(f"creating new file: {self.file_location}/{self.mode}_{current_time}.h5")
try:
self.file = h5py.File(f"{self.file_location}/{self.mode}_{current_time}.h5", 'w')
        except Exception:
            logger.exception("Error while creating new file")
            raise
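        # remember when this file was started; start_new_matrix() uses this to decide when to rotate to a new file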
self.last_file_time = timestamp
    def config_mode(self):
        """
        Configures the object for the correct statistics type to be used.
        """
        logger.debug(f"attempting to configure {self.mode} mode")
if self.mode == 'SST':
self.decoder = SSTPacket
self.collector = statistics_collector.SSTCollector
elif self.mode == 'BST':
            # self.decoder = BSTPacket
raise NotImplementedError("BST collector has not yet been implemented")
elif self.mode == 'XST':
# self.decoder = XSTPacket
raise NotImplementedError("BST collector has not yet been implemented")
else:
# make sure the mode is valid
            raise ValueError(f"invalid statistics mode specified '{self.mode}', please use 'SST', 'XST' or 'BST'")
def close_writer(self):
"""
Function that can be used to stop the writer without data loss.
"""
logger.debug("closing hdf5 file")
if self.file is not None:
if self.current_matrix is not None:
                # write the last matrix, if one exists, so no data is lost on shutdown
try:
self.write_matrix()
finally:
self.file.close()
logger.debug(f"{self.file} closed")