Skip to content
Snippets Groups Projects
Commit 9792e012 authored by Stefano Di Frischia's avatar Stefano Di Frischia
Browse files

Merge branch 'master' into L2SS-362-Switch-Env-Mode

parents 6f121603 1e6be6bc
No related branches found
No related tags found
1 merge request!133Resolve L2SS-362 "Switch env mode"
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
"recv-sim" "recv-sim"
], ],
"OPC_Server_Port": [ "OPC_Server_Port": [
"4842" "4843"
], ],
"OPC_Time_Out": [ "OPC_Time_Out": [
"5.0" "5.0"
...@@ -250,10 +250,10 @@ ...@@ -250,10 +250,10 @@
"LTS/UNB2/1": { "LTS/UNB2/1": {
"properties": { "properties": {
"OPC_Server_Name": [ "OPC_Server_Name": [
"despi.astron.nl" "unb2-sim"
], ],
"OPC_Server_Port": [ "OPC_Server_Port": [
"4842" "4844"
], ],
"OPC_Time_Out": [ "OPC_Time_Out": [
"5.0" "5.0"
......
...@@ -51,15 +51,12 @@ class OPCUAConnection(CommClient): ...@@ -51,15 +51,12 @@ class OPCUAConnection(CommClient):
# determine namespace used # determine namespace used
try:
if type(namespace) is str: if type(namespace) is str:
self.name_space_index = self.client.get_namespace_index(namespace) self.name_space_index = self.client.get_namespace_index(namespace)
elif type(namespace) is int: elif type(namespace) is int:
self.name_space_index = namespace self.name_space_index = namespace
else:
except Exception as e: raise TypeError(f"namespace must be of type str or int, but is of type {type(namespace).__name__}")
self.streams.error_stream("Could not determine namespace index from namespace: %s: %s", namespace, e)
raise Exception("Could not determine namespace index from namespace %s", namespace) from e
self.obj = self.client.get_objects_node() self.obj = self.client.get_objects_node()
self.check_nodes() self.check_nodes()
...@@ -135,6 +132,9 @@ class OPCUAConnection(CommClient): ...@@ -135,6 +132,9 @@ class OPCUAConnection(CommClient):
else: else:
raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation) raise Exception("OPC-ua mapping requires either a list of the path or dict with the path. Was given %s type containing: %s", type(annotation), annotation)
# prepend namespace index for each element if none is given
path = [name if ':' in name else f'{self.name_space_index}:{name}' for name in path]
try: try:
node = self.obj.get_child(path) node = self.obj.get_child(path)
except Exception as e: except Exception as e:
......
...@@ -12,16 +12,16 @@ and writing those matrices (as well as a bunch of metadata) to hdf5. ...@@ -12,16 +12,16 @@ and writing those matrices (as well as a bunch of metadata) to hdf5.
The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script. The TCP statistics writer can be called with the `tcp_hdf5_writer.py` script.
This script can be called with the following arguments: This script can be called with the following arguments:
``` ```
--host the address to connect to -a --host the address to connect to
--port the port to use -p --port the port to use
--file file to read from (as opposed to host and port) -f --file file to read from (as opposed to host and port)
--interval The time between creating new files in hours -i --interval The time between creating new files in hours
--output_dir specifies the folder to write all the files -o --output_dir specifies the folder to write all the files
--mode sets the statistics type to be decoded options: "SST", "XST", "BST" -m --mode sets the statistics type to be decoded options: "SST", "XST", "BST"
--debug takes no arguments, when used prints a lot of extra data to help with debugging -v --debug takes no arguments, when used prints a lot of extra data to help with debugging
-d --decimation Configure the writer to only store one every n samples. Saves storage space
``` ```
##HFD5 structure ##HFD5 structure
Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer Statistics packets are collected by the StatisticsCollector in to a matrix. Once the matrix is done or a newer
timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads. timestamp arrives this matrix along with the header of first packet header, nof_payload_errors and nof_valid_payloads.
......
...@@ -15,8 +15,7 @@ from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, Stati ...@@ -15,8 +15,7 @@ from devices.sdp.statistics_packet import SSTPacket, XSTPacket, BSTPacket, Stati
import devices.sdp.statistics_collector as statistics_collector import devices.sdp.statistics_collector as statistics_collector
logging.basicConfig(level=logging.INFO) logger = logging.getLogger("statistics_writer")
logger = logging.getLogger("hdf5_writer")
__all__ = ["hdf5_writer"] __all__ = ["hdf5_writer"]
...@@ -26,19 +25,22 @@ class hdf5_writer: ...@@ -26,19 +25,22 @@ class hdf5_writer:
XST_MODE = "XST" XST_MODE = "XST"
BST_MODE = "BST" BST_MODE = "BST"
def __init__(self, new_file_time_interval, file_location, statistics_mode, decimation_factor):
def __init__(self, new_file_time_interval, file_location, statistics_mode):
# all variables that deal with the matrix that's currently being decoded # all variables that deal with the matrix that's currently being decoded
self.current_matrix = None self.current_matrix = None
self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC) self.current_timestamp = datetime.min.replace(tzinfo=pytz.UTC)
# counter that tracks how many statistics have been received
self.statistics_counter = 0
# the header of the first packet of a new matrix is written as metadata. # the header of the first packet of a new matrix is written as metadata.
# Assumes all subsequent headers of the same matrix are identical (minus index) # Assumes all subsequent headers of the same matrix are identical (minus index)
self.statistics_header = None self.statistics_header = None
# file handing # file handing
self.file_location = file_location self.file_location = file_location
self.decimation_factor = decimation_factor
self.new_file_time_interval = timedelta(seconds=new_file_time_interval) self.new_file_time_interval = timedelta(seconds=new_file_time_interval)
self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC) self.last_file_time = datetime.min.replace(tzinfo=pytz.UTC)
self.file = None self.file = None
...@@ -88,7 +90,6 @@ class hdf5_writer: ...@@ -88,7 +90,6 @@ class hdf5_writer:
self.process_packet(packet) self.process_packet(packet)
def start_new_matrix(self, timestamp): def start_new_matrix(self, timestamp):
logger.info(f"starting new matrix with timestamp: {timestamp}")
""" """
is called when a statistics packet with a newer timestamp is received. is called when a statistics packet with a newer timestamp is received.
Writes the matrix to the hdf5 file Writes the matrix to the hdf5 file
...@@ -96,6 +97,19 @@ class hdf5_writer: ...@@ -96,6 +97,19 @@ class hdf5_writer:
updates current timestamp and statistics matrix collector updates current timestamp and statistics matrix collector
""" """
# only write the specified fraction of statistics, skip the rest
if self.statistics_counter % self.decimation_factor != 0:
logger.debug(f"Skipping statistic with timestamp: {timestamp}. Only writing 1/{self.decimation_factor} statistics")
# increment even though its skipped
self.statistics_counter += 1
return
# received new statistic, so increment counter
self.statistics_counter += 1
logger.debug(f"starting new matrix with timestamp: {timestamp}")
# write the finished (and checks if its the first matrix) # write the finished (and checks if its the first matrix)
if self.current_matrix is not None: if self.current_matrix is not None:
try: try:
...@@ -113,7 +127,7 @@ class hdf5_writer: ...@@ -113,7 +127,7 @@ class hdf5_writer:
self.statistics_header = None self.statistics_header = None
def write_matrix(self): def write_matrix(self):
logger.info("writing matrix to file") logger.debug("writing matrix to file")
""" """
Writes the finished matrix to the hdf5 file Writes the finished matrix to the hdf5 file
""" """
...@@ -155,10 +169,13 @@ class hdf5_writer: ...@@ -155,10 +169,13 @@ class hdf5_writer:
def process_packet(self, packet): def process_packet(self, packet):
logger.debug(f"Processing packet")
""" """
Adds the newly received statistics packet to the statistics matrix Adds the newly received statistics packet to the statistics matrix
""" """
# only process the packets of the wanted fraction
if self.statistics_counter % self.decimation_factor != 0:
return
self.current_matrix.process_packet(packet) self.current_matrix.process_packet(packet)
def start_new_hdf5(self, timestamp): def start_new_hdf5(self, timestamp):
...@@ -219,5 +236,7 @@ class hdf5_writer: ...@@ -219,5 +236,7 @@ class hdf5_writer:
try: try:
self.write_matrix() self.write_matrix()
finally: finally:
filename = str(self.file)
self.file.close() self.file.close()
logger.debug(f"{self.file} closed") logger.debug(f"{filename} closed")
logger.debug(f"Received a total of {self.statistics_counter} statistics while running. With {int(self.statistics_counter/self.decimation_factor)} written to disk ")
...@@ -10,17 +10,17 @@ logging.basicConfig(level=logging.INFO) ...@@ -10,17 +10,17 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("statistics_writer") logger = logging.getLogger("statistics_writer")
parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.') parser = argparse.ArgumentParser(description='Converts a stream of statistics packets into HDF5 files.')
parser.add_argument('--host', type=str, help='the host to connect to') parser.add_argument('-a', '--host', type=str, help='the host to connect to')
parser.add_argument('--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the selected mode (default: %(default)s)') parser.add_argument('-p', '--port', type=int, default=0, help='the port to connect to, or 0 to use default port for the selected mode (default: %(default)s)')
parser.add_argument('--file', type=str, help='the file to read from') parser.add_argument('-f', '--file', type=str, help='the file to read from')
parser.add_argument('--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: %(default)s)') parser.add_argument('-m', '--mode', type=str, choices=['SST', 'XST', 'BST'], default='SST', help='sets the statistics type to be decoded options (default: %(default)s)')
parser.add_argument('--interval', type=float, default=3600, nargs="?", help='The time between creating new files in seconds (default: %(default)s)') parser.add_argument('-i', '--interval', type=float, default=3600, nargs="?", help='The time between creating new files in seconds (default: %(default)s)')
parser.add_argument('--output_dir', type=str, default=".", nargs="?", help='specifies the folder to write all the files (default: %(default)s)') parser.add_argument('-o', '--output_dir', type=str, default=".", nargs="?", help='specifies the folder to write all the files (default: %(default)s)')
parser.add_argument('--debug', dest='debug', action='store_true', default=False, help='increase log output') parser.add_argument('-v', '--debug', dest='debug', action='store_true', default=False, help='increase log output')
parser.add_argument('-d', '--decimation', type=int, default=1, help='Configure the writer to only store one every n samples. Saves storage space')
# create a data dumper that creates a new file every 10s (for testing)
if __name__ == "__main__": if __name__ == "__main__":
args = parser.parse_args() args = parser.parse_args()
...@@ -32,6 +32,10 @@ if __name__ == "__main__": ...@@ -32,6 +32,10 @@ if __name__ == "__main__":
interval = args.interval interval = args.interval
mode = args.mode mode = args.mode
debug = args.debug debug = args.debug
decimation = args.decimation
if decimation < 1:
raise ValueError("Please use an integer --Decimation value 1 or higher to only store one every n statistics' ")
if port == 0: if port == 0:
default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 } default_ports = { "SST": 5101, "XST": 5102, "BST": 5103 }
...@@ -51,7 +55,7 @@ if __name__ == "__main__": ...@@ -51,7 +55,7 @@ if __name__ == "__main__":
sys.exit(1) sys.exit(1)
# create the writer # create the writer
writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode) writer = hdf5_writer(new_file_time_interval=interval, file_location=output_dir, statistics_mode=mode, decimation_factor=decimation)
# start looping # start looping
try: try:
......
...@@ -8,7 +8,7 @@ fi ...@@ -8,7 +8,7 @@ fi
# Start and stop sequence # Start and stop sequence
cd "$LOFAR20_DIR/docker-compose" || exit 1 cd "$LOFAR20_DIR/docker-compose" || exit 1
make stop device-sdp device-recv device-sst device-unb2 sdptr-sim recv-sim unb2-sim make stop device-sdp device-recv device-sst device-unb2 device-xst sdptr-sim recv-sim unb2-sim
make start databaseds dsconfig jupyter elk make start databaseds dsconfig jupyter elk
# Give dsconfig and databaseds time to start # Give dsconfig and databaseds time to start
...@@ -19,12 +19,12 @@ cd "$TANGO_LOFAR_LOCAL_DIR" || exit 1 ...@@ -19,12 +19,12 @@ cd "$TANGO_LOFAR_LOCAL_DIR" || exit 1
sbin/update_ConfigDb.sh CDB/integration_ConfigDb.json sbin/update_ConfigDb.sh CDB/integration_ConfigDb.json
cd "$LOFAR20_DIR/docker-compose" || exit 1 cd "$LOFAR20_DIR/docker-compose" || exit 1
make start sdptr-sim recv-sim make start sdptr-sim recv-sim unb2-sim
# Give the simulators time to start # Give the simulators time to start
sleep 5 sleep 5
make start device-sdp device-recv device-sst device-unb2 make start device-sdp device-recv device-sst device-unb2 device-xst
# Give the devices time to start # Give the devices time to start
sleep 5 sleep 5
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment