Skip to content
Snippets Groups Projects
Commit ed3cdfbf authored by Taya Snijder's avatar Taya Snijder
Browse files

Merge branch 'master' of https://git.astron.nl/lofar2.0/tango

parents 07639432 d0f2f8de
No related branches found
No related tags found
1 merge request!145L2SS-397: renamed primary datavalues to just 'values' instead of sst/xst/bst _values...
...@@ -51,15 +51,12 @@ class OPCUAConnection(CommClient): ...@@ -51,15 +51,12 @@ class OPCUAConnection(CommClient):
# determine namespace used # determine namespace used
try:
if type(namespace) is str: if type(namespace) is str:
self.name_space_index = self.client.get_namespace_index(namespace) self.name_space_index = self.client.get_namespace_index(namespace)
elif type(namespace) is int: elif type(namespace) is int:
self.name_space_index = namespace self.name_space_index = namespace
else:
except Exception as e: raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}")
self.streams.error_stream("Could not determine namespace index from namespace: %s: %s", namespace, e)
raise Exception("Could not determine namespace index from namespace %s", namespace) from e
self.obj = self.client.get_objects_node() self.obj = self.client.get_objects_node()
self.check_nodes() self.check_nodes()
...@@ -135,6 +132,9 @@ class OPCUAConnection(CommClient): ...@@ -135,6 +132,9 @@ class OPCUAConnection(CommClient):
else: else:
raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation) raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation)
# prepend namespace index for each element if none is given
path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path]
try: try:
node = self.obj.get_child(path) node = self.obj.get_child(path)
except Exception as e: except Exception as e:
......
...@@ -12,17 +12,16 @@ and writing those matrices (as well as a bunch of metadata) to hdf5. ...@@ -12,17 +12,16 @@ and writing those matrices (as well as a bunch of metadata) to hdf5.
The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script. The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script.
This script can be called with the following arguments: This script can be called with the following arguments:
``` ```
--host the address to connect to -a --host the address to connect to
--port the port to use -p --port the port to use
--file file to read from (as opposed to host and port) -f --file file to read from (as opposed to host and port)
--interval The time between creating new files in hours -i --interval The time between creating new files in hours
--output_dir specifies the folder to write all the files -o --output_dir specifies the folder to write all the files
--mode sets the statistics type to be decoded options: "SST", "XST", "BST" -m --mode sets the statistics type to be decoded options: "SST", "XST", "BST"
--debug takes no arguments, when used prints a lot of extra data to help with debugging -v --debug takes no arguments, when used prints a lot of extra data to help with debugging
--fraction Configure the writer to only write one every here specified number of statistics. Saves file space -d --decimation Configure the writer to only store one every n samples. Saves storage space
``` ```
##HFD5 structure ##HFD5 structure
Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer
timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads. timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads.
......
...@@ -15,8 +15,7 @@ from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, Stati ...@@ -15,8 +15,7 @@ from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, Stati
import devices.sdp.statistics_collector as statistics_collector import devices.sdp.statistics_collector as statistics_collector
logging.basicConfig(level=logging.INFO) logger = logging.getLogger("statistics_writer")
logger = logging.getLogger("hdf5_writer")
__all__ = ["hdf5_writer"] __all__ = ["hdf5_writer"]
...@@ -26,14 +25,14 @@ class hdf5_writer: ...@@ -26,14 +25,14 @@ class hdf5_writer:
XST_MODE = "XST" XST_MODE = "XST"
BST_MODE = "BST" BST_MODE = "BST"
def __init__(self, new_file_time_interval, file_location, statistics_mode, store_fraction): def __init__(self, new_file_time_interval, file_location, statistics_mode, decimation_factor):
# all variables that deal with the matrix that's currently being decoded # all variables that deal with the matrix that's currently being decoded
self.current_matrix = None self.current_matrix = None
self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC) self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
# counter that tracks how many statistics have been received # counter that tracks how many statistics have been received
self.statistics_counter = -1 self.statistics_counter = 0
# the header of the first packet of a new matrix is written as metadata. # the header of the first packet of a new matrix is written as metadata.
# Assumes all subsequent headers of the same matrix are identical (minus index) # Assumes all subsequent headers of the same matrix are identical (minus index)
...@@ -41,7 +40,7 @@ class hdf5_writer: ...@@ -41,7 +40,7 @@ class hdf5_writer:
# file handing # file handing
self.file_location = file_location self.file_location = file_location
self.store_fraction = store_fraction self.decimation_factor = decimation_factor
self.new_file_time_interval = timedelta(seconds=new_file_time_interval) self.new_file_time_interval = timedelta(seconds=new_file_time_interval)
self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
self.file = None self.file = None
...@@ -97,16 +96,19 @@ class hdf5_writer: ...@@ -97,16 +96,19 @@ class hdf5_writer:
Creates a new hdf5 file if needed Creates a new hdf5 file if needed
updates current timestamp and statistics matrix collector updates current timestamp and statistics matrix collector
""" """
# received new statistic, so increment counter
self.statistics_counter += 1
# only write the specified fraction of statistics, skip the rest # only write the specified fraction of statistics, skip the rest
if self.statistics_counter % self.store_fraction != 0: if self.statistics_counter % self.decimation_factor != 0:
logger.info(f"Skipping statistic with timestamp: {timestamp}. Only writing 1/{self.store_fraction} statistics") logger.debug(f"Skipping statistic with timestamp: {timestamp}. Only writing 1/{self.decimation_factor} statistics")
# increment even though its skipped
self.statistics_counter += 1
return return
logger.info(f"starting new matrix with timestamp: {timestamp}") # received new statistic, so increment counter
self.statistics_counter += 1
logger.debug(f"starting new matrix with timestamp: {timestamp}")
# write the finished (and checks if its the first matrix) # write the finished (and checks if its the first matrix)
if self.current_matrix is not None: if self.current_matrix is not None:
...@@ -125,7 +127,7 @@ class hdf5_writer: ...@@ -125,7 +127,7 @@ class hdf5_writer:
self.statistics_header = None self.statistics_header = None
def write_matrix(self): def write_matrix(self):
logger.info("writing matrix to file") logger.debug("writing matrix to file")
""" """
Writes the finished matrix to the hdf5 file Writes the finished matrix to the hdf5 file
""" """
...@@ -167,12 +169,11 @@ class hdf5_writer: ...@@ -167,12 +169,11 @@ class hdf5_writer:
def process_packet(self, packet): def process_packet(self, packet):
logger.debug(f"Processing packet")
""" """
Adds the newly received statistics packet to the statistics matrix Adds the newly received statistics packet to the statistics matrix
""" """
# only process the packets of the wanted fraction # only process the packets of the wanted fraction
if self.statistics_counter % self.store_fraction != 0: if self.statistics_counter % self.decimation_factor != 0:
return return
self.current_matrix.process_packet(packet) self.current_matrix.process_packet(packet)
...@@ -235,5 +236,7 @@ class hdf5_writer: ...@@ -235,5 +236,7 @@ class hdf5_writer:
try: try:
self.write_matrix() self.write_matrix()
finally: finally:
filename = str(self.file)
self.file.close() self.file.close()
logger.debug(f"{self.file} closed") logger.debug(f"{filename} closed")
logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ")
...@@ -10,18 +10,17 @@ logging.basicConfig(level=logging.INFO) ...@@ -10,18 +10,17 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer") logger = logging.getLogger("statistics_writer")
parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.') parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.')
parser.add_argument('--host', type=str, help='the host to connect to') parser.add_argument('-a', '--host', type=str, help='the host to connect to')
parser.add_argument('--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the selected mode (default: %(default)s)') parser.add_argument('-p', '--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the selected mode (default: %(default)s)')
parser.add_argument('--file', type=str, help='the file to read from') parser.add_argument('-f', '--file', type=str, help='the file to read from')
parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: %(default)s)') parser.add_argument('-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: %(default)s)')
parser.add_argument('--interval', type=float, default=3600, nargs="?", help='The time between creating new files in seconds (default: %(default)s)') parser.add_argument('-i', '--interval', type=float, default=3600, nargs="?", help='The time between creating new files in seconds (default: %(default)s)')
parser.add_argument('--output_dir', type=str, default=".", nargs="?", help='specifies the folder to write all the files (default: %(default)s)') parser.add_argument('-o', '--output_dir', type=str, default=".", nargs="?", help='specifies the folder to write all the files (default: %(default)s)')
parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='increase log output') parser.add_argument('-v', '--debug', dest='debug', action='store_true', default=False, help='increase log output')
parser.add_argument('--fraction', type=int, default=1, help='Fraction of statistics written to file to save space. When used only writes 1/n statistics') parser.add_argument('-d', '--decimation', type=int, default=1, help='Configure the writer to only store one every n samples. Saves storage space')
# create a data dumper that creates a new file every 10s (for testing)
if __name__ == "__main__": if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
...@@ -33,7 +32,10 @@ if __name__ == "__main__": ...@@ -33,7 +32,10 @@ if __name__ == "__main__":
interval = args.interval interval = args.interval
mode = args.mode mode = args.mode
debug = args.debug debug = args.debug
fraction = args.fraction decimation = args.decimation
if decimation < 1:
raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ")
if port == 0: if port == 0:
default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 } default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 }
...@@ -53,7 +55,7 @@ if __name__ == "__main__": ...@@ -53,7 +55,7 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
# create the writer # create the writer
writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, store_fraction=fraction) writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, decimation_factor=decimation)
# start looping # start looping
try: try:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment