hdf5_writer.py

# imports for working with datetime objects
from datetime import datetime, timedelta
import pytz

# python hdf5
import h5py

import numpy
import json
import logging

# import statistics classes with workaround
import sys
sys.path.append("..")
from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, StatisticsPacket
import devices.sdp.statistics_collector as statistics_collector


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("hdf5_writer")

__all__ = ["hdf5_writer"]
class hdf5_writer:

    def __init__(self, new_file_time_interval, file_location, statistics_mode):

        # all variables that deal with the statistics matrix that's currently being decoded
        self.current_matrix = None
        self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)

        # the header of the first packet of a new matrix is written as metadata.
        # Assumes all subsequent headers of the same matrix are identical (minus index)
        self.statistics_header = None

        # file handling
        self.file_location = file_location
        self.new_file_time_interval = timedelta(hours=new_file_time_interval)
        self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
        self.file = None

        # configure the writer for the correct statistics type
        self.collector = None
        self.decoder = None
        self.mode = statistics_mode.upper()
        self.config_mode()
    
    def next_packet(self, packet):
        """
        Every statistics packet carries the timestamp of the moment it was measured, and the
        values of a single measurement are spread across multiple packets. Packets that share
        a timestamp belong to the same matrix. This method collects a matrix from those packets,
        stores finished matrices, and starts new ones.

        Each incoming packet's statistics timestamp is checked: if it is newer than the current
        timestamp, the current matrix is closed and stored, and a new one is started.
        """

        # process the packet
        statistics_packet = self.decoder(packet)

        if not self.statistics_header:
            self.statistics_header = statistics_packet.header()

        # grab the timestamp
        statistics_timestamp = statistics_packet.timestamp()

        # ignore packets with no timestamp, as they indicate FPGA processing was disabled
        # and are useless anyway.
        if statistics_packet.block_serial_number == 0:
            logger.warning("Received statistics with no timestamp. Packet dropped.")
            return

        # check if the statistics timestamp is unexpectedly older than the current one
        if statistics_timestamp < self.current_timestamp:
            logger.warning(f"Received statistics with an earlier timestamp than the one currently being processed ({statistics_timestamp}). Packet dropped.")
            return

        # a newer timestamp on this statistics packet means we need to start a new matrix
        if statistics_timestamp > self.current_timestamp:
            self.start_new_matrix(statistics_timestamp)
            self.current_timestamp = statistics_timestamp

        self.process_packet(packet)
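
    # Illustration (a sketch, not part of the original file): packets arriving with
    # timestamps t1, t1, t2 are handled as follows:
    #   next_packet(packet @ t1) -> start_new_matrix(t1), process packet
    #   next_packet(packet @ t1) -> process packet into the same matrix
    #   next_packet(packet @ t2) -> write the t1 matrix, start_new_matrix(t2), process packet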
    
    def start_new_matrix(self, timestamp):
        """
        Called when a statistics packet with a newer timestamp is received.
        Writes the finished matrix to the hdf5 file, creates a new hdf5 file
        if needed, and resets the statistics matrix and header.
        """
        logger.info(f"starting new matrix with timestamp: {timestamp}")

        # write the finished matrix (unless this is the very first one and none exists yet)
        if self.current_matrix is not None:
            try:
                self.write_matrix()
            except Exception:
                time = self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]
                logger.exception(f"Exception while attempting to write matrix to HDF5. Matrix: {time} dropped")

        # only start a new file if it's time AND we are done with the previous matrix.
        if timestamp >= self.new_file_time_interval + self.last_file_time:
            self.start_new_hdf5(timestamp)

        # create a new and empty current_matrix
        self.current_matrix = self.collector()
        self.statistics_header = None
    
    def write_matrix(self):
        """
        Writes the finished matrix to the hdf5 file
        """
        logger.info("writing matrix to file")

        # create the new hdf5 group based on the timestamp of packets
        current_group = self.file.create_group("{}_{}".format(self.mode, self.current_timestamp.strftime("%Y-%m-%d-%H-%M-%S-%f")[:-3]))

        # store the statistics values
        # (the collector exposes its values under "sst_values"; only SST is currently implemented)
        current_group.create_dataset(name=f"{self.mode}_values", data=self.current_matrix.parameters["sst_values"])

        # might be optional, but they're easy to add.
        current_group.create_dataset(name="nof_payload_errors", data=self.current_matrix.parameters["nof_payload_errors"])
        current_group.create_dataset(name="nof_valid_payloads", data=self.current_matrix.parameters["nof_valid_payloads"])

        # get the statistics header
        header = self.statistics_header

        # can't store datetime objects, convert to string instead
        header["timestamp"] = header["timestamp"].isoformat(timespec="milliseconds")

        # store the header of the packet received for this matrix as a list of attributes
        for k, v in header.items():
            if isinstance(v, dict):
                for subk, subv in v.items():
                    current_group.attrs[f"{k}_{subk}"] = subv
            else:
                current_group.attrs[k] = v
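
    # Resulting HDF5 layout (an illustrative sketch for SST mode; the group name is
    # derived from the matrix timestamp as formatted above):
    #   /SST_2021-01-01-12-00-00-000
    #       SST_values            dataset of statistics values
    #       nof_payload_errors    dataset
    #       nof_valid_payloads    dataset
    #       ...                   packet header fields stored as group attributes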
    
    
    def process_packet(self, packet):
        """
        Adds the newly received statistics packet to the statistics matrix
        """
        logger.debug("Processing packet")
        self.current_matrix.process_packet(packet)
    
    def start_new_hdf5(self, timestamp):

        if self.file is not None:
            try:
                self.file.close()
            except Exception:
                logger.exception(f"Error while attempting to close hdf5 file. File {self.file} may be incomplete, please verify its integrity.")

        current_time = timestamp.strftime("%Y-%m-%d-%H-%M-%S")
        filename = f"{self.file_location}/{self.mode}_{current_time}.h5"
        logger.info(f"creating new file: {filename}")

        try:
            self.file = h5py.File(filename, 'w')
        except Exception:
            logger.exception("Error while creating new file")
            raise

        self.last_file_time = timestamp
    
    def config_mode(self):
        """
        Configures the object for the correct statistics type to be used.
        """
        logger.debug(f"attempting to configure {self.mode} mode")

        if self.mode == 'SST':
            self.decoder = SSTPacket
            self.collector = statistics_collector.SSTCollector
        elif self.mode == 'BST':
            # self.decoder = BSTPacket
            raise NotImplementedError("BST collector has not yet been implemented")
        elif self.mode == 'XST':
            # self.decoder = XSTPacket
            raise NotImplementedError("XST collector has not yet been implemented")
        else:
            # make sure the mode is valid
            raise ValueError(f"invalid statistics mode specified '{self.mode}', please use 'SST', 'XST' or 'BST'")
    
    def close_writer(self):
        """
        Function that can be used to stop the writer without data loss.
        """
        logger.debug("closing hdf5 file")
        if self.file is not None:
            try:
                if self.current_matrix is not None:
                    # write the last matrix before closing, so no data is lost
                    self.write_matrix()
            finally:
                self.file.close()
                logger.debug(f"{self.file} closed")