diff --git a/README.md b/README.md new file mode 100644 index 0000000000000000000000000000000000000000..689d62f0ea58b440693a42055a63512e31998778 --- /dev/null +++ b/README.md @@ -0,0 +1,77 @@ +# Station Digital Processor Translator (SDPTR) +The SDPTR translates between the OPCUA interface [1] with the Local Control Unit (LCU) machine of the user and the UniBoard Control Protocol (UPC) interface with the memory mapped registers interface [2] on the FPGAs of the UniBoard2 in a LOFAR2 Station. + + +## 1. Build OPCUA lib + SDPTR [1] +Build OPCUA lib in git/sdptr: +``` +./build_open62541_lib +``` +Build SDPTR in git/sdptr. First time: +``` +./build_sdptr_program --> result: sdptr executable in git/sdptr/src +``` +If OPCUA lib is not here yet, then it will also build that first. Next times when file changed do: +``` +make +``` + + +## 2. Simulate SDPTR with UniBoard2 stub and test scripts: +Start SDPTR in a terminal: + +``` +git/sdptr/src$ ./sdptr -s -g 0 -f 4 -d +``` + - Using ./ forces to look for SDPTR executable in this directory instead of in search environment PATH + - Run -d is run not as deamon, to see logging in terminal + - With -d the SDPTR can be closed using ctrl-C. + +Start UniBoard2 stub in a terminal: +``` +git/sdptr/test/py/stub$ ./unb2_stub.py -h +``` + - Uses test/py/stub/unb2_stub.map for memory map of the FPGA + - ctrl-C stops the stub and the SDPTR will disconnect + - after restart the stub the SDPTR will reconnect + +Do tests in terminal: +``` +sdp_rw.py --host localhost -l +``` + +## 3. Run SDPTR with UniBoard2 hardware +Run SDPTR e.g. on test machine dop421 = 10.87.0.221 and connect via 1GbE network card with one UniBoard2 (with 4 FPGAs). + +Stop SDPTR deamon: +``` +ps -ef|grep sdptr +kill process_number +``` +Start SDPTR deamon in git/sdptr/src: +``` +./sdptr --port 4840 -g 0 -f 4 +ps -ef|grep sdptr +``` +Access SDPTR and run test from other computer in network. First setup Python virtual environment from git/sdptr: +``` +. ./init_sdptr_python +``` +then e.g. use sdp_rw.py to access control points and monitor points [1]: +``` +sdp_rw.py --host 10.87.0.221 --port 4840 -l version +sdp_rw.py --host 10.87.0.221 --port 4840 -r software_version +``` + +## 4. Run on LCU +* Station Control (SC) builds SDPTR executable from tag in Gitlab at [3] +* For SDPTR and SDPFW without TBuf functionality use tag number: v1.5.0 + + +## References +1. https://support.astron.nl/confluence/display/L2M/L2+STAT+Decision%3A+SC+-+SDP+OPC-UA+interface +2. https://support.astron.nl/confluence/pages/viewpage.action?spaceKey=L2M&title=L3+SDP+Decision%3A+SDPTR+-+SDPFW+register+map+interface +3. https://git.astron.nl/lofar2.0/sdptr/-/tags +4. readme_sdptr_cpp.md +5. readme_sdptr_jupyter.md +6. readme_sdptr_regressiontest.md diff --git a/readme_sdptr_cpp,md b/readme_sdptr_cpp.md similarity index 100% rename from readme_sdptr_cpp,md rename to readme_sdptr_cpp.md diff --git a/restart_sdptr b/restart_sdptr deleted file mode 100755 index 88864fc6784ab217ae2b8fc507fd5135e7a483ec..0000000000000000000000000000000000000000 --- a/restart_sdptr +++ /dev/null @@ -1,30 +0,0 @@ -#!/usr/bin/env bash - -# check if this pi is for running the sdp translator -if [[ ${HOSTNAME} == *"sdptr"* ]]; then - # get proccess id of sdptr-lba and kill - case "$(pidof sdptr-lba | wc -w)" in - 1) echo "Stop LBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - kill $(pidof sdptr-lba | awk '{print $0}') 2>&1 & - sleep 1 - ;; - *) echo "LBA SDP not running" >> /usr/local/bin/sdptr.txt - ;; - esac - # start sdptr-lba again - echo "Start LBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - /usr/local/bin/sdptr-lba --port 4840 --first_gn 0 --fpgas 16 2>&1 - - # get proccess id of sdptr-hba and kill - case "$(pidof sdptr-hba | wc -w)" in - 1) echo "Stop HBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - kill $(pidof sdptr-hba | awk '{print $0}') 2>&1 & - sleep 1 - ;; - *) echo "HBA SDP not running" >> /usr/local/bin/sdptr.txt - ;; - esac - # start sdptr-hba again - echo "Start HBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - /usr/local/bin/sdptr-hba --port 4842 --first_gn 16 --fpgas 16 2>&1 -fi diff --git a/run_sdptr b/run_sdptr deleted file mode 100755 index 042fe9437a2ed2893a318a7f462a12389355e1ae..0000000000000000000000000000000000000000 --- a/run_sdptr +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env bash - -# check if this pi is for running the sdp translator -if [[ ${HOSTNAME} == *"sdptr"* ]]; then - case "$(pidof sdptr-lba | wc -w)" in - 0) echo "Restarting LBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - /usr/local/bin/sdptr-lba --port 4840 --first_gn 0 --fpgas 16 & - ;; - 1) # all ok - ;; - *) echo "Removed double LBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - kill $(pidof sdptr-lba | awk '{print $1}') - ;; - esac - - case "$(pidof sdptr-hba | wc -w)" in - 0) echo "Restarting HBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - /usr/local/bin/sdptr-hba --port 4842 --first_gn 16 --fpgas 16 & - ;; - 1) # all ok - ;; - *) echo "Removed double HBA SDP Translator: $(date)" >> /usr/local/bin/sdptr.txt - kill $(pidof sdptr-hba | awk '{print $1}') - ;; - esac -fi diff --git a/src/periph/fpga.cpp b/src/periph/fpga.cpp index b77f50e757c9e37384c709f8d16ba9b3eee1f7c4..5995ea7abb4bc0d98c9b569d8be0eb9a50b6c1b3 100644 --- a/src/periph/fpga.cpp +++ b/src/periph/fpga.cpp @@ -3290,6 +3290,7 @@ bool Periph_fpga::write_wg_enable(const char *data) scheduled_bsn = (uint64_t)wg_scheduled_bsn; LOG_F(INFO, "use wg_scheduled_bsn"); } + wg_scheduled_bsn = (int64_t)scheduled_bsn; LOG_F(INFO, "new bsn=%lu", (long)scheduled_bsn); sdp_data[0] = (uint32_t)(scheduled_bsn & 0xffffffff); sdp_data[1] = (uint32_t)((scheduled_bsn >> 32) & 0xffffffff); diff --git a/test/py/base/transient_reader.py b/test/py/base/transient_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..a8cb8e8305a230a0d3cf36cc0f1cbadc98849284 --- /dev/null +++ b/test/py/base/transient_reader.py @@ -0,0 +1,750 @@ +############################################################################### +# +# Copyright (C) 2012 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# +############################################################################### + +# Author: P.Donker +# Date : mei 2025 created + +""" + tbuf reader for transient data + + -------------------------------------------------------------------------------------- + | Tbuf Reader class (used by user) | + | | + | - start/stop all processes and threads | + | - to get received data the StreamReader.data.functions() can be used | + -------------------------------------------------------------------------------------- + [start] [start] [start] ^ + [stop] [stop] [stop] | + | n_workers | | + | | | | + | v | | + | ----------- | | + | | Unpack | | | + v --->| Transient |---> v [functions] + --------- | | Process | | ------------ + | Udp | | ----------- | | Transient | + | Reader | --> [raw_data] -- : --> [unpacket_data] --> | Data | + | Process | queue | ----------- | queue | Thread | + --------- | | Unpack | | ------------ + --->| Transient |---> + | Process | + ----------- + +The TransientReader class is the class that is included by the user, it starts the needed processes +and threads. The received data can be accessed using functions of data. +example: + transient_reader = TransientReader(port=5001) + transient_reader.start() + + received_rsn_numbers = transient_reader.data.get_all_rsn() + received_rsn_indexes = transient_reader.data.get_all_index(received_rsn_numbers[0]) + + header = transient_reader.data.get_header(rsn, index) + data = transient_reader.data.get_data(rsn, index) + + transient_reader.stop() + +It is live data, so the returned get_all_rsn() list is growing. + +References: +[1] ICD SC-SDP packet definition in +https://plm.astron.nl/polarion/#/project/LOFAR2System/wiki/L2%20Interface%20Control%20Documents/SC%20to%20SDP%20ICD + +[2] ICD STAT-SDP inbeamlet output packet definition in +https://plm.astron.nl/polarion/#/project/LOFAR2System/wiki/L1%20Interface%20Control%20Documents/STAT%20to%20CEP%20ICD + +""" +import struct +import logging +import time +import datetime +import random +from multiprocessing import Process, RLock, active_children +from threading import Thread +from queue import Empty, Full +from faster_fifo import Queue + +from .udp import UdpServer +from .handle_exceptions import handle_exception +from .attribute_dict import AttributeDict + +MARKER_TYPES = {0x72: "RAW"} + + +def extract_antenna_field_index(station_info): + """Extract antenna_field_index[5:0] from station_info[15:10]""" + return (station_info & 0xFC00) >> 10 + + +def extract_station_id(station_info): + """Extract station_id[9:0] from station_info[9:0]""" + return station_info & 0x3FF + + +def transient_header_data(hdr): + """ get header data from hdr """ + data = {} + data["marker"] = hdr[0] + data["data_type"] = MARKER_TYPES.get(hdr[0], "??") + data["version_id"] = hdr[1] + data["observation_id"] = hdr[2] + data["station_info"] = hdr[3] + data["antenna_field_index"] = extract_antenna_field_index(hdr[3]) + data["station_id"] = extract_station_id(hdr[3]) + data["source_info"] = hdr[4] + data["gn_index"] = hdr[4] & 0x00FF + data["sample_width"] = (hdr[4] & 0x0F00) >> 8 + data["f_adc"] = (hdr[4] & 0x1000) >> 12 + data["nyquist_zone_index"] = (hdr[4] & 0x6000) >> 13 + data["antenna_band_index"] = (hdr[4] & 0x8000) >> 15 + data["reserved_octet"] = hdr[5:8] + data["antenna_input_index"] = hdr[8] + data["nof_raw_data_per_packet"] = hdr[9] + data["rsn"] = hdr[10] + return data + + +def print_header(hdr, stdout=None, recv_time=None): + """ + print header info to: + """ + str_list = [] + if recv_time: + str_list.append(f"=== {recv_time}") + str_list.append(f"- marker = {hdr.marker}") + str_list.append(f"- version_id = {hdr.version_id}") + str_list.append(f"- observation_id = {hdr.observation_id}") + str_list.append(f"- antenna_field_index = {extract_antenna_field_index(hdr.station_info)}") + str_list.append(f"- station_id = {extract_station_id(hdr.station_info)}") + str_list.append(f"- si.gn_index = {hdr.gn_index}") + str_list.append(f"- si.sample_width = {hdr.sample_width}") + str_list.append(f"- si.f_adc = {hdr.f_adc}") + str_list.append(f"- si.nyquist_zone_index = {hdr.nyquist_zone_index}") + str_list.append(f"- si.antenna_band_index = {hdr.antenna_band_index}") + str_list.append(f"- reserved_octets = {hdr.reserved_octets}") + str_list.append(f"- antenna_input_index = {hdr.antenna_input_index}") + str_list.append(f"- nof_raw_data_per_packet = {hdr.nof_raw_data_per_packet}") + str_list.append(f"- rsn = {hdr.rsn}") + + if stdout is True: + print('\n'.join(str_list)) + + logger = logging.getLogger("print_header") + for _s in str_list: + logger.info(_s) + + +class _TransientData(Thread): + """ + This class hold the unpacked received data, and is a Thread so the data can be accessed. + """ + def __init__(self, in_data): + super().__init__() + self.logger = logging.getLogger("TransientData") + self.data_type = "RAW" + self.in_data = in_data + self.lock = RLock() + self.received_data = {} + self._n_packets_in = 0 + self._n_packets_valid = 0 + self._n_packets_duplicate = 0 + self._input_order = [] + self._last_new_data_time = time.time() + self._keep_running = True + + def stop(self): + """ stop """ + self._keep_running = False + self.logger.info("=== Transient data status") + self.logger.info("- read %d unpacked packets from in_queue", self._n_packets_in) + self.logger.info("- n_valid %d packets", self._n_packets_valid) + self.logger.info("- n_duplicate %d packets", self._n_packets_duplicate) + + def run(self): + """run """ + while self._keep_running: + try: + many_data = self.in_data.get_many_nowait(max_messages_to_get=4000) + self.add_to_received_data(many_data) + except Empty: + time.sleep(0.001) + except BaseException as err: + if handle_exception(err) == 1: + self._keep_running = False + self.in_data.close() + + def reset_data(self): + with self.lock: + self.received_data = {} + self._n_packets_in = 0 + self._n_packets_valid = 0 + self._n_packets_duplicate = 0 + self._input_order = [] + + def busy(self): + """ busy function + SDPFW sends all statistics data for an integration interval within 0.1 s, so after statistics + output disable it may take at most 0.1 s for the stream to become idle. + SDPFW continuously sends beamlet data packets, so after beamlet output disable it will take + less than 1 ms for the stream to become idle. + Use some more time margin to account for processing all recv data packets in the _data buffer. + """ + return time.time() < (self._last_new_data_time + 0.5) + + def _get_index(self, hdr): + return hdr.antenna_input_index + + def add_to_received_data(self, many_data): + """ add data from many_data into the received data dictonary """ + with self.lock: + # self.logger.info(f"append {len(many_data)} unpacked packets") + for recv_time, hdr, data in many_data: + self._n_packets_in += 1 + header = AttributeDict(hdr) + rsn = header.rsn + index = self._get_index(header) + if rsn in self.received_data and index in self.received_data[rsn]: + self.logger.error("%s, index %s is duplicate", str(rsn), str(index)) + self._n_packets_duplicate += 1 + return + + if rsn not in self.received_data: + self.received_data[rsn] = {} + if index not in self.received_data[rsn]: + self.received_data[rsn][index] = {} + + self._input_order.append((rsn, index)) + self.received_data[rsn][index]["recv_time"] = recv_time + self.received_data[rsn][index]["header"] = header + self.received_data[rsn][index]["data"] = data + self._n_packets_valid += 1 + self._last_new_data_time = time.time() + + def nof_received_indexes(self, rsn): + """ Return current number of indices that has been received for rsn interval.""" + with self.lock: + return len(self.received_data[rsn]) + + def get_packet_rsn_index(self, _nr): + """ get_packet_rsn_index """ + with self.lock: + if _nr < self._n_packets_valid: + return self._input_order[_nr] + return (None, None) + + def get_all_rsn(self): + """ get_all_rsn + returns a list with all rsn numbers available in the received data dictonary + """ + with self.lock: + return list(sorted(self.received_data.keys())) + + def nof_received_rsn(self): + """ Return number of rsn that has been recveived + + for transients number of rsn counts the number. + """ + with self.lock: + return len(self.received_data) + + def get_all_index(self, rsn): + """ get_all_index + returns a list with all index numbers available in the received data dictonary + for requested rsn number. + """ + with self.lock: + return list(sorted(self.received_data[rsn].keys())) + + def is_available(self, rsn, index): + """ is_available + checks in received data dictonary if rsn is available and index is available in rsn. + return True of available els False + """ + with self.lock: + if rsn in self.received_data: + if index in self.received_data[rsn]: + return True + return False + + def get_receive_time(self, rsn, index): + """ get_receive_time + return received-timestamp of packet with rsn and index number + """ + with self.lock: + return self.received_data[rsn][index]["recv_time"] + + def get_header(self, rsn, index): + """ get_header + return header of packet with rsn and index number + """ + with self.lock: + return self.received_data[rsn][index]["header"] + + def get_data(self, rsn, index): + """ get_data + return data of packet with rsn and index number + """ + with self.lock: + data = self.received_data[rsn][index]["data"] + return data + + def wait_for_nof_rsn(self, exp_nof_rsn, timeout=2.0): + """ Wait until expected number of rsn intervals has been received, or timeout. + return True if exp_nof_rsn or more is received + return False on timeout + """ + ref_time = time.time() + while True: + wait_time = time.time() - ref_time + nof_rsn = len(self.received_data) + if nof_rsn >= exp_nof_rsn: + self.logger.info("wait_for_nof_rsn(%d) obtained in %f s", exp_nof_rsn, wait_time) + return True # success + if wait_time < timeout: + time.sleep(0.000001) + else: + self.logger.error("wait_for_nof_rsn() timeout occured") + return False # timeout + + def wait_for_all_indices(self, rsn, exp_nof_index, timeout=12.0): + """ Wait until expected number of indices has been received for rsn interval, or timeout. + return True if exp_nof_index or more is received + return False on timeout + """ + ref_time = time.time() + while True: + wait_time = time.time() - ref_time + nof_index = len(self.received_data[rsn]) + if nof_index >= exp_nof_index: + self.logger.info("wait_for_all_indices() obtained in %f s", wait_time) + return True # success + if wait_time < timeout: + time.sleep(0.000001) + else: + self.logger.error("wait_for_all_indices() timeout occured (%d != %d)", nof_index, exp_nof_index) + return False # timeout + + +class _UdpReader(Process): + """UdpReader + This process reads on given udp port and stores all data into the given queue. + The commands and response queue is used to communicate with this process. + Commands that can be used: + stop : stop UDP reader + clear-buffer : clear input buffer (network) + is-buffer-cleared: if input-buffer cleared returns True in response queue + recv-on : start receiving data + recv-off : stop receiving data + received-packets : return number of received packets in response queue + cleared-packets : return number of cleared packets in response queue + """ + + def __init__(self, commands, response, port, out_data): + super().__init__() + self.logger = logging.getLogger("UdpReader") + self.commands = commands + self.response = response + self.port = port + self.out_data = out_data + self.packet_size = 7070 + self.stream = UdpServer(port=self.port) + self.stream.expected_n_bytes(self.packet_size) + self.keep_running = True + self.recv = False + self.clear_buffer = False + self.max_packets = 0 + self.recv_packages_cnt = 0 + self.cleared_packages_cnt = 0 + + def check_control_commands(self): + try: + # if the commands queue is empty it will throw a Empty exception + cmd = self.commands.get_nowait() + self.logger.debug("udp reader cmd = %s", cmd) + if cmd == "stop": + self.logger.debug("stop udp server") + self.recv = False + self.keep_running = False + elif cmd == "clear-buffer": + self.logger.debug("clear input buffer") + self.recv = False + self.clear_buffer = True + elif cmd == "is-buffer-cleared": + self.logger.debug("is buffer cleared") + self.response.put(not self.clear_buffer) + elif cmd == "recv-on": + self.logger.debug("start receiving udp packages") + self.recv = True + self.clear_buffer = False + elif cmd == "recv-off": + self.logger.debug("stop receiving udp packages") + self.recv = False + elif "max-packets" in cmd: + value = cmd.split("=")[1] + self.max_packets = int(value) + elif cmd == "received-packets": + self.response.put(self.recv_packages_cnt) + elif cmd == "cleared-packets": + self.response.put(self.cleared_packages_cnt) + except Empty: + pass + + def run(self): + self.logger.info("start udp server") + + while self.keep_running: + # check for control commands + self.check_control_commands() + + try: + if self.recv: + # recv-on command is received. + for _ in range(100): + data = self.stream.recv() # recv timeout is set to 1 sec + if data: + recv_time = datetime.datetime.now() + self.out_data.put((recv_time, data)) + self.recv_packages_cnt += 1 + else: + break + if self.max_packets and self.recv_packages_cnt >= self.max_packets: + self.recv = False + continue + + if self.clear_buffer: + # clear-buffer command is received. + for _ in range(200): + data = self.stream.recv() # recv timeout is set to 1 sec + if not data: + self.clear_buffer = False + break + else: + self.cleared_packages_cnt += 1 + continue + + if not self.recv or not self.clear_buffer: + time.sleep(0.001) + + except Full: + self.logger.warning("raw_data queue full") + self.recv = False + + # close udp_server (stream) and output queue + self.stream.close() + self.out_data.close() + + +class _UnpackTransientPacket(Process): + """ + This process listen to the in_data Queue and proccesses the received data (recvtime and rawdata) + after processing it will put (recvtime, header, data) in the out_data Queue + recvtime: time of receiving the packet in iso format + header: simple class with all unpacked header info + data: list with formatted data + The commands queue is used to communicate with this process, used commands: + stop : stop this process + unpack-on : start unpacking data from in_data and put unpacked in out_data + unpack-off: stop unpacking data + """ + + def __init__(self, id, commands, in_data, out_data): + super().__init__() + self.logger = logging.getLogger("UnpackPacket") + self.id = id + self.commands = commands + self.data_type = "RAW" + self.in_data = in_data + self.out_data = out_data + self.header_size = 32 + + def run(self): + self.logger.info("start unpack") + keep_running = True + unpack = False + many_cnt = [] + while keep_running: + # check for commands + try: + cmd = self.commands.get_nowait() + self.logger.debug("unpack cmd = %s", cmd) + if cmd == "stop": + self.logger.debug("stop udpack") + unpack = False + keep_running = False + elif cmd == "unpack-on": + self.logger.debug("start unpacking received udp packets") + many_cnt = [] + unpack = True + elif cmd == "unpack-off": + self.logger.debug("stop unpacking receiving udp packets") + self.logger.debug(f"id={self.id}: many_cnt={many_cnt}") + unpack = False + except Empty: + pass + + if unpack: + try: + many_data = self.in_data.get_many_nowait(max_messages_to_get=2000) + many_cnt.append(len(many_data)) + self.unpack(many_data) + time.sleep(random.uniform(0.00001, 0.01)) + except Empty: + time.sleep(random.uniform(0.0001, 0.1)) + except BaseException as err: + self.logger.error(err) + if handle_exception(err) == 1: + raise err + else: + time.sleep(0.01) + + # close input and output queue + self.in_data.close() + self.out_data.close() + + def unpack(self, many_data): + """ unpack raw data and put data in the out_queue """ + for recv_time, data in many_data: + try: + packet_hdr, packet_data = self.split_header_data(data) + unpacked = self.unpack_transient(packet_hdr, packet_data) + if unpacked is not None: + hdr = unpacked[0] + data = unpacked[1] + self.out_data.put((recv_time.isoformat(), hdr, data)) + except Full: + self.logger.warning("unpacked_data queue full") + time.sleep(0.001) + except BaseException as err: + self.logger.error(err) + if handle_exception(err) == 1: + raise err + + def split_header_data(self, packet): + """ split_header_data """ + # if len(packet) >= self.header_size: + try: + _header = packet[:self.header_size] + _data = packet[self.header_size:] + return _header, _data + except IndexError: + self.logger.error("Wrong packet size") + return None, None + + def unpack_transient(self, packet_hdr, packet_data): + """ Unpacking the transient header and data, see [1] + + https://docs.python.org/3/library/struct.html?highlight=struct#module-struct + """ + hdr = None + data = None + try: + fmt = ">BBLHHBBBBHQ" + hdr_list = struct.unpack(fmt, packet_hdr) + hdr = transient_header_data(hdr_list) + except struct.error: + self.logger.error("unpacking sst header failed") + return None + + if hdr["data_type"] != self.data_type: + return None + + try: + n_words = hdr["nof_raw_data_per_packet"] + fmt = ">" + "Q" * n_words # > = big-endian, Q = 8byte unsigned long long + data_list = list(struct.unpack(fmt, packet_data)) + + # Purpose: Wrap an integer value to w bits + data_w = 54 + wrap_sign = 2 ** (data_w - 1) + wrap_mask = 2 ** (data_w - 1) - 1 + + data = [] + for _data in data_list: + if _data & wrap_sign: + data.append((_data & wrap_mask) - wrap_sign) + else: + data.append(_data & wrap_mask) + except struct.error: + self.logger.error("unpacking sst data failed") + return None + return hdr, data + + +class TransientReader: + """ + This is the only class the user should use. + + This TransientReader class can read: + - Transient data + The received data is unpacked and can be accessed from self.data + calling start() starts receiving and unpacking data + calling stop() stops receiving data and stops + + """ + + def __init__(self, port, n_workers=None): + """ + port: used udp port to receive data + """ + self.logger = logging.getLogger("TransientReader") + self.port = port + self._reader = None + self._unpack_workers = [] + self._n_workers = 1 if n_workers is None else n_workers + self._running = False + + self._raw_data = Queue(int(1e10)) + self._unpacked_data = Queue(int(1e8)) + self._reader_commands = Queue(256) + self._reader_response = Queue(256) + self._unpack_commands = [Queue(256) for i in range(self._n_workers)] + self.data = _TransientData(self._unpacked_data) + + def __del__(self): + """ on closing this class the data class and queue are closed """ + self.data.stop() + self._raw_data.close() + self._unpacked_data.close() + self._running = False + + def done(self): + """ checks if all received data is processed and return True if done else False """ + return (self._raw_data.qsize() == 0) and (self._unpacked_data.qsize() == 0) and not self.data.busy() + + def isactive(self): + """ return True if Streamreader is active """ + return self._running + + def set_max_packets(self, max_packets): + self._reader_commands.put(f"max-packets={max_packets}") + + def clear_reader_buffer(self): + """ clear udp reader input buffer (network) + return number of cleared packets + """ + self._reader_commands.put("clear-buffer") + while True: + self._reader_commands.put("is-buffer-cleared") + try: + cleared = self._reader_response.get(timeout=2.0) + except Empty: + break + if cleared: + break + time.sleep(0.1) + self._reader_commands.put("cleared-packets") + try: + cleared_cnt = self._reader_response.get(timeout=2.0) + except Empty: + cleared_cnt = 0 + return cleared_cnt + + def empty_queue(self, q): + """ empty_queue + queues are used by more than one process, the queue is closed before + closing the processes to prefent hanging. + """ + while True: + try: + q.get_many_nowait() + except Empty: + break + + def start(self): + """ start stream reader + clear old udp network packets + setup 1 data thread + setup n unpack_workers processes + setup 1 udp reader process + start unpacking + start receiving + """ + self.logger.debug("== Start stream reader with %d workers ==", self._n_workers) + self.data.start() + # start unpack and reader process + for worker_nr in range(self._n_workers): + _p = _UnpackTransientPacket(worker_nr, self._unpack_commands[worker_nr], self._raw_data, self._unpacked_data) + self._unpack_workers.append(_p) + _p.start() + self._reader = _UdpReader(self._reader_commands, self._reader_response, self.port, self._raw_data) + self._reader.start() + + cleared_cnt = self.clear_reader_buffer() + self.logger.debug(f"start cleared {cleared_cnt} udp packets") + + # start unpacking and receiving + for worker_nr in range(self._n_workers): + (self._unpack_commands[worker_nr]).put("unpack-on") + self._reader_commands.put("recv-on") + self._running = True + + def stop(self): + """ stop stream reader + stop receiving + clear udp network buffer + stop unpacking + log some statistics + empty queues + stop udp reader + stop unpack workers + """ + self.logger.info("== stop stream reader ==") + + # stop receiving udp data + self.logger.debug("== stop receiving udp data ==") + self._reader_commands.put("recv-off") + + cleared_cnt = self.clear_reader_buffer() + self.logger.debug(f"stop cleared {cleared_cnt} udp packets") + + # stop receiving and unpacking + self.logger.debug("== stop unpacking raw data ==") + for worker_nr in range(self._n_workers): + (self._unpack_commands[worker_nr]).put("unpack-off") + + time.sleep(0.5) + # log received packages by reader + self.logger.debug(f"not processed {self._raw_data.qsize()} raw packets") + self.logger.debug(f"not processed {self._unpacked_data.qsize()} unpacked packets") + + self._reader_commands.put("received-packets") + cnt = self._reader_response.get(timeout=2.0) + self.logger.info(f"Received {cnt} udp packets") + + # empty the queue's + self.logger.debug("empty raw queue") + self.empty_queue(self._raw_data) + self.logger.debug("empty unpacked queue") + self.empty_queue(self._unpacked_data) + + # stop reader and unpacking process + self.logger.debug("== stop reader and workers ==") + self._reader_commands.put("stop") + for worker_nr in range(self._n_workers): + (self._unpack_commands[worker_nr]).put("stop") + time.sleep(0.5) + + # terminate reader and unpack processes if still running after stop command + self.logger.debug("== Terminating children ==") + for child in active_children(): + self.logger.debug(f"Terminating: {child}") + child.terminate() + time.sleep(0.5) diff --git a/test/py/control/sdp_ring_test.txt b/test/py/control/sdp_ring_test.txt new file mode 100644 index 0000000000000000000000000000000000000000..04b42d9bb7e4705fc37a74cd8058627cef14923d --- /dev/null +++ b/test/py/control/sdp_ring_test.txt @@ -0,0 +1,136 @@ +# ########################################################################## +# Copyright 2025 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ########################################################################## + +# ########################################################################## +# Author: +# . Eric Kooistra, 12 june 2025. +# Purpose: +# . Ring test using LOFAR2/SDP image with beamlets lane and crosslets lane. +# Description: +# . Use sdp_rw.py commands to manually control and monitor the ring. +# . The ring_rx_total_nof_packets_discarded yields the detected packet loss, that can be compared to the nof +# received packet of ring_rx_total_nof_packets_received, to get an impression of the error rate of the lane. +# - The beamlets packets are (3 + 549) * 8 = 4416 octets long. +# - The crosslet packets are (3 + 42) * 8 = 360 octets long. +# - A packet gets discarded due to CRC error. Assume a CRC error is cause by one bit error. Packets that are +# lost completely (so not received at all) are not detected. +# . The nof seconds of measurement is given by bf_ring_rx_total_nof_sync_received, because the beamlets lane uses +# BF_NOF_HOPS = 1, so each beamlet packet travels only one hop (i.e. to the next not, but not beyond that). +# +# Remarks: +# . To run a command manually copy paste the parameters and the command line from this sdp_ring_test.txt into the +# terminal. +# +# References: +# [1] https://support.astron.nl/confluence/display/L2M/L2+STAT+Decision%3A+SC+-+SDP+OPC-UA+interface +# +# ########################################################################## + +############################################################################ +# Test script input parameters + +# cd git/sdptr +# . ./init_sdptr_python.sh + +SDPTR_IP=localhost +SDPTR_PORT=4840 + +N_FPGA=4 +FIRST_GN=0 +P_SQ=${N_FPGA}//2+1 +BF_NOF_HOPS=1 +XST_NOF_HOPS=$P_SQ-1 + +############################################################################ +# Version +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r software_version +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r firmware_version + +############################################################################ +# Optional SDP FW reboot +# . Uncomment the --write line and set the PATH_TO_RBF to flash another SDPFW image + +PATH_TO_RBF=/home/kooistra/git/hdl/build_result/2025-04-28T09u11_lofar2_unb2c_sdp_station_lift_201MHz/lofar2_unb2c_sdp_station_lift-084e2cd6a.rbf + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r firmware_version +sdp_firmware.py --host ${SDPTR_IP} --port ${SDPTR_PORT} --reboot --image FACT +sleep 20 +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r firmware_version +#sdp_firmware.py --host ${SDPTR_IP} --port ${SDPTR_PORT} --write --image USER --file ${PATH_TO_RBF} +sdp_firmware.py --host ${SDPTR_IP} --port ${SDPTR_PORT} --reboot --image USER +sleep 20 +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r firmware_version + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r processing_enable + +############################################################################ +# Setup ring for cable between end nodes and for beamlets lane (BF) and for crosslets lane (XST) + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w ring_node_offset [${FIRST_GN}]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w ring_nof_nodes [${N_FPGA}]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w ring_use_cable_to_next_rn [False]*(${N_FPGA}-1)+[True] +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w ring_use_cable_to_previous_rn [True]+[False]*(${N_FPGA}-1) + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w xst_ring_nof_transport_hops [${XST_NOF_HOPS}]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w bf_ring_nof_transport_hops [${BF_NOF_HOPS}]*${N_FPGA} + +############################################################################ +# Enable SDP processing + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w processing_enable [True]*${N_FPGA} + +sleep 5 +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w xst_processing_enable [True]*${N_FPGA} + +sleep 5 +############################################################################ +# Monitor SDP processing + +# Input samples +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r signal_input_bsn + +# Ring lane latency +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r bf_ring_rx_latency +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r xst_ring_rx_latency + +############################################################################ +# Clear total counts + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w bf_ring_rx_clear_total_counts [True]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w xst_ring_rx_clear_total_counts [True]*${N_FPGA} + +############################################################################ +# Monitor packet loss + +# Beamlets lane +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r bf_ring_rx_total_nof_packets_received +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r bf_ring_rx_total_nof_packets_discarded + +# Crosslets lane +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r xst_ring_rx_total_nof_packets_received +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r xst_ring_rx_total_nof_packets_discarded + +############################################################################ +# Monitor sync interval loss + +# Beamlets lane +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r bf_ring_rx_total_nof_sync_received +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r bf_ring_rx_total_nof_sync_discarded +# Crosslets lane +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r xst_ring_rx_total_nof_sync_received +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r xst_ring_rx_total_nof_sync_discarded diff --git a/test/py/control/tbuf_dump.py b/test/py/control/tbuf_dump.py index 82be8f24a4c4af0f30c4a692234877b0ae3bd95b..021b01f2e89bba2f7de25973cd8fae9df8aa15b8 100755 --- a/test/py/control/tbuf_dump.py +++ b/test/py/control/tbuf_dump.py @@ -38,37 +38,37 @@ import sys import logging import time -import numpy as np +# import numpy as np from base.handle_exceptions import handle_exception from base.opcua_client import OpcuaClient from base.base_tools import arg_str_to_list from base.constants import C_SDP, C_TBUF -from base.sdp_functions import hostname_to_mac_ip, start_processing, stop_all_streams -# from base.stream_reader import StreamReader, print_header +from base.sdp_functions import hostname_to_mac_ip, start_processing +from base.transient_reader import TransientReader, print_header # from base.stream_control import StreamControl from wg import WaveformGenerator class Tbuf: def __init__(self, client, sdp, dest_mac, dest_ip, dest_udp_port): - self.logger = logging.getLogger("TbufStatisticStream") + self.logger = logging.getLogger("Tbuf") self.client = client self.sdp = sdp self.n_nodes = self.client.n_nodes # get number of nodes from server self.dest_mac = dest_mac self.dest_ip = dest_ip self.dest_udp_port = dest_udp_port - self.stream_reader = None + self.transient_reader = None self.stream_control = None self.next_packet_nr = 0 self.stdout = self.logger.getEffectiveLevel() > logging.INFO - stop_all_streams(self.client) + self.print_state = False def print_log(self, fstr): print(fstr) if self.stdout else self.logger.info(fstr) - + def setup_output_hdr(self, sdp, mac=None, ip=None, port=None): """Setup TBUF output destination address (same to all n_nodes).""" # Use default or argument @@ -86,56 +86,180 @@ class Tbuf: sdp.tbuf_output_hdr_ip_destination_address = _ip print_result("setup_output_hdr", True) - # def start_stream_reader(self): - # self.stream_reader = StreamReader(self.dest_udp_port, "TBUF", n_workers=1) - # self.stream_control = StreamControl(self.client, self.stream_reader) - # self.logger.debug("start stream-reader") - # self.stream_reader.start() - # print_result("start_stream_reader", True) - - # def stop_stream_reader(self, timeout=60): - # if self.stream_reader: - # print_result("stop_stream_reader", True) - # # Wait until SDPFW has stopped stream output - # timeout_time = time.time() + timeout - # while time.time() < timeout_time and not self.stream_reader.done(): - # time.sleep(0.0001) - # self.stream_reader.stop() - - # def start_io(self): - # """Start stream_reader for input and start SDPFW output - - # Start stream_reader before SDPFW statistics offload, to receive all statistics - # packets per integration interval. - # """ - # self.start_stream_reader() - # self.stream_control.stream_on() - # self.logger.info("Started io") - # print_result("start_io", True) + def start_transient_reader(self): + self.transient_reader = TransientReader(self.dest_udp_port, n_workers=1) + # self.stream_control = StreamControl(self.client, self.transient_reader) + self.logger.debug("start transient-reader") + self.transient_reader.start() + print_result("start_transient_reader", True) + + def stop_transient_reader(self, timeout=60): + if self.transient_reader: + print_result("stop_transient_reader", True) + # Wait until SDPFW has stopped stream output + timeout_time = time.time() + timeout + while time.time() < timeout_time and not self.transient_reader.done(): + time.sleep(0.0001) + self.transient_reader.stop() + + def start_io(self): + """Start transient_reader for input and start SDPFW output + + Start transient_reader before SDPFW statistics offload, to receive all statistics + packets per integration interval. + """ + self.start_transient_reader() + # self.stream_control.stream_on() + self.logger.info("Started io") + print_result("start_io", True) + + def stop_io(self): + """Stop SDPFW output and stop transient_reader input + + Stop transient_reader after when SDPFW statistics offload has stopped, to receive all + statistics packets per integration interval. + """ + # self.stream_control.stream_off() + self.stop_transient_reader() + self.logger.info("Stopped io") + print_result("", True) + + def print_transient_header(self): + success = True + rsn, index = self.transient_reader.data.get_packet_rsn_index(self.next_packet_nr) + if rsn is not None and index is not None: + hdr = self.transient_reader.data.get_header(rsn, index) + recv_time = self.transient_reader.data.get_receive_time(rsn, index) + print_header(hdr, stdout=self.stdout, recv_time=recv_time) + self.next_packet_nr += 1 + else: + success = False + time.sleep(0.0001) + print_result("print_transient_header", success) + + def setup(self, nodes): + self.sdp.tbuf_clear_counts = [True] * self.n_nodes + self.sdp.tbuf_output_hdr_app_reserved = [0xFFFFFFFF] * self.n_nodes + self.sdp.tbuf_output_enable = [False] * (self.n_nodes - 1) + [True] + + def record(self): + self.sdp.tbuf_record_enable = [True] * self.n_nodes + self.print_log(f"tbuf_record_busy: {self.sdp.tbuf_record_busy}") + + def freeze(self): + self.sdp.tbuf_record_enable = [False] * self.n_nodes + self.print_log(f"tbuf_record_busy: {self.sdp.tbuf_record_busy}") + self.print_log(f"tbuf_nof_pages_in_buffer: {self.sdp.tbuf_nof_pages_in_buffer}") + self.print_recorded_state() + + def prepare_dump(self, m_bps, dump_offset, dump_interval): + self.sdp.tbuf_dump_nof_bps = [m_bps] * self.n_nodes + + dump_start_timestamp = self.sdp.tbuf_recorded_first_timestamp[0] + dump_offset + + self.sdp.tbuf_dump_start_timestamp = [dump_start_timestamp] * self.n_nodes + self.sdp.tbuf_dump_time_interval = [dump_interval*10e-6] * self.n_nodes + self.print_log(f"tbuf_dump_start_timestamp: {self.sdp.tbuf_dump_start_timestamp}") + self.print_log(f"tbuf_dump_time_interval: {self.sdp.tbuf_dump_time_interval}") + + # Read dump pages range + self.print_log(f"tbuf_dump_start_page: {self.sdp.tbuf_dump_start_page}") + self.print_log(f"tbuf_dump_nof_packets: {self.sdp.tbuf_dump_nof_packets}") + + def dump(self, nodes, antennas): + self.sdp.tbuf_clear_counts = [True] * self.n_nodes + + dump_enables = 0 + for i in antennas: + dump_enables += 1 << i + + en = [] + dump_done = [False] * self.n_nodes + for i in range(self.n_nodes): + if i in nodes: + en.append(dump_enables) + dump_done[i] = True + else: + en.append(0) + + self.print_log(f"tbuf_dump_enables: {en}") + self.sdp.tbuf_dump_enables = en + if self.print_state: + self.print_tbuf_state() + + # Wait until dump done + while self.sdp.tbuf_dump_done != dump_done: + self.print_log(f"tbuf_dump_done: {self.sdp.tbuf_dump_done}") + self.print_memory_state() + self.print_log(f"tbuf_output_nof_packets: {self.sdp.tbuf_output_nof_packets}") + self.print_log(f"beamlet_output_10gbe_tx_nof_frames: {self.sdp.beamlet_output_10gbe_tx_nof_frames}") + if self.print_state: + self.print_tbuf_state() + else: + self.print_log(f"tbuf_dump_done: {self.sdp.tbuf_dump_done}") + + def finish_dump(self): + # Disable dump + self.sdp.tbuf_dump_enables = [0] * self.n_nodes + self.print_log(f"tbuf_dump_enables: {self.sdp.tbuf_dump_enables}") + self.print_ddr_state() + if self.print_state: + self.print_tbuf_state() + + def print_tbuf_state(self, nodes=None): + nodes = range(self.n_nodes) if nodes is None else nodes + arbiter_states = ["s_idle", "s_recording", "s_dumping"] + writer_states = ["s_idle", "s_write_start", "s_write_busy", "s_write_done"] + reader_states = ["s_idle", "s_dump_start", "s_dump_antenna", "s_read_block", "s_read_gap", "s_read_done", "s_dump_done"] + + all_state = self.sdp.tbuf_state + for node in nodes: + state = all_state[node] + self.print_log(f"=== tbuf-state node-{node} ===") + self.print_log(f" io_ddr.ddr4_i_cal_ok = {state & 1}") + self.print_log(f" io_ddr.dvr_cipo.done = {(state >> 1) & 1}") + self.print_log(f" tbuf_arbiter.state = {arbiter_states[(state >> 2) & 0x03]}") + self.print_log(f" tbuf_writer.state = {writer_states[(state >> 4 & 0x03)]}") + self.print_log(f" tbuf_writer.record_done = {(state >> 6) & 1}") + self.print_log(f" tbuf_writer.record_busy = {(state >> 7) & 1}") + self.print_log(f" tbuf_writer.record_all = {(state >> 8) & 1}") + self.print_log(f" tbuf_writer.record_buf_full = {(state >> 9) & 1}") + self.print_log(f" tbuf_writer.record_sop_fifo_usedw = {(state >> 10) & 0x0f}") + self.print_log(f" tbuf_reader.state = {reader_states[(state >> 14 & 0x07)]}") + self.print_log(f" tbuf_reader.dump_enables = b{(state >> 17) & 0x1f:06b}") + self.print_log(f" tbuf_reader.dump_en = {(state >> 23) & 1}") + self.print_log(f" tbuf_reader.dump_busy = {(state >> 24) & 1}") + self.print_log(f" tbuf_reader.dump_done = {(state >> 25) & 1}") + self.print_log(f" tbuf_reader.dump_all = {(state >> 26) & 0x07}") + + def print_ddr_state(self): + self.print_log("=== ddr-state ===") + self.print_log(f" ddr_fifo_full = {self.sdp.ddr_fifo_full}") + self.print_log(f" ddr_wr_fifo_usedw = {self.sdp.ddr_wr_fifo_usedw}") + self.print_log(f" ddr_rd_fifo_usedw = {self.sdp.ddr_rd_fifo_usedw}") + + def print_recorded_state(self): + self.print_log("=== recorded-state ===") + self.print_log(f" tbuf_recorded_nof_pages = {self.sdp.tbuf_recorded_nof_pages}") + self.print_log(f" tbuf_recorded_first_page = {self.sdp.tbuf_recorded_first_page}") + self.print_log(f" tbuf_recorded_last_page = {self.sdp.tbuf_recorded_last_page}") + self.print_log(f" tbuf_recorded_time_interval = {self.sdp.tbuf_recorded_time_interval}") + self.print_log(f" tbuf_recorded_first_timestamp = {self.sdp.tbuf_recorded_first_timestamp}") + self.print_log(f" tbuf_recorded_last_timestamp = {self.sdp.tbuf_recorded_last_timestamp}") + + def print_memory_state(self): + self.print_log("=== memory-state ===") + self.print_log(f" tbuf_memory_read_nof_packets = {self.sdp.tbuf_memory_read_nof_packets}") + self.print_log(f" tbuf_memory_read_nof_crc_errors = {self.sdp.tbuf_memory_read_nof_crc_errors}") + self.print_log(f" tbuf_memory_read_nof_rsn_errors = {self.sdp.tbuf_memory_read_nof_rsn_errors}") + + def print_signal_input_state(self): + self.print_log("=== signal-input-state ===") + self.print_log(f" tbuf_signal_input_rsn = {self.sdp.tbuf_signal_input_rsn}") + self.print_log(f" tbuf_signal_input_nof_samples = {self.sdp.tbuf_signal_input_nof_samples}") + self.print_log(f" tbuf_signal_input_nof_blocks = {self.sdp.tbuf_signal_input_nof_blocks}") - # def stop_io(self): - # """Stop SDPFW output and stop stream_reader input - # Stop stream_reader after when SDPFW statistics offload has stopped, to receive all - # statistics packets per integration interval. - # """ - # self.stream_control.stream_off() - # self.stop_stream_reader() - # self.logger.info("Stopped io") - # print_result("", True) - - # def print_tbuf_header(self): - # success = True - # bsn, index = self.stream_reader.data.get_packet_bsn_index(self.next_packet_nr) - # if bsn is not None and index is not None: - # hdr = self.stream_reader.data.get_header(bsn, index) - # recv_time = self.stream_reader.data.get_receive_time(bsn, index) - # print_header(hdr, stdout=self.stdout, recv_time=recv_time) - # self.next_packet_nr += 1 - # else: - # success = False - # time.sleep(0.0001) - # print_result("print_tbuf_header", success) # def test_cp(self): # """ @@ -172,6 +296,8 @@ class Tbuf: # functions used, if this script is called direct + + def start_wg(client, ampl, phase, freq, scheduled_bsn=None): wg = WaveformGenerator(client) wg.disable() @@ -188,33 +314,57 @@ def stop_wg(client): wg.disable() -def run_record(client, sdp, tbuf, recording_time, m_bps, wg_periods, scheduled_bsn): +def run_record(tbuf, recording_time, wg_periods): + client = tbuf.client + sdp = tbuf.sdp + wg_periods = 1 if wg_periods is None else wg_periods if recording_time: # start waveform generator logger.debug("setup waveform generator") wg_ampl = 1.0 wg_phase = 0.0 wg_periods_per_block = wg_periods - wg_freq = wg_periods_per_block / C_TBUF.N_SAMPLES_PER_PAGE / C_TBUF.T_ADC + wg_freq = wg_periods_per_block / C_TBUF.N_SAMPLES_PER_PAGE / C_SDP.T_adc + logger.debug("start waveform generator") + scheduled_bsn = sdp.current_bsn[0] + 50000 start_wg(client, ampl=wg_ampl, phase=wg_phase, freq=wg_freq, scheduled_bsn=scheduled_bsn) logger.debug("sleep 10 seconds") time.sleep(10.0) - # setup tbuf + logger.debug("start processing") sdp.processing_enable = [True] * client.n_nodes - logger.debug("setup dump") - sdp.tbuf_clear_counts = [True] * client.n_nodes - sdp.tbuf_output_hdr_app_reserved = [0xFFFFFFFF] * client.n_nodes - sdp.tbuf_dump_nof_bps = [m_bps] * client.n_nodes - sdp.tbuf_output_enable = [True] * client.n_nodes - sdp.tbuf_output_enable = [False] * client.n_nodes - sdp.tbuf_output_enable = [False] * (client.n_nodes - 1) + [True] - # record - sdp.tbuf_record_enable = [True] * client.n_nodes - time.sleep(recording_time) - sdp.tbuf_record_enable = [False] * client.n_nodes - # stop_wg(client) - + + logger.debug("setup tbuf") + tbuf.setup(args.nodes) + + logger.debug("start recording") + tbuf.record() + stop_time = time.time() + recording_time + while time.time() < stop_time: + tbuf.print_signal_input_state() + if tbuf.print_state: + tbuf.print_tbuf_state() + + logger.debug("freeze recording") + tbuf.freeze() + + logger.debug("stop waveform generator") + stop_wg(client) + + +def run_dump(tbuf, nodes, m_bps, antennas, dump_offset, dump_interval): + if dump_interval: + logger.debug("prepare dump") + tbuf.prepare_dump(m_bps, dump_offset, dump_interval) + logger.debug("dump") + tbuf.dump(nodes, antennas) + logger.debug("finish dump") + tbuf.finish_dump() + + +def run_tbuf_state(tbuf, print_state, nodes): + if print_state: + tbuf.print_tbuf_state(nodes) # def run_stream(_stream, _args, tbuf): # if _stream: @@ -256,9 +406,9 @@ def run_record(client, sdp, tbuf, recording_time, m_bps, wg_periods, scheduled_b # def run_test_header(_test_header, _args, tbuf): # if _test_header: -# tbuf.start_stream_reader() +# tbuf.start_transient_reader() # tbuf.test_header(_args.mtime) -# tbuf.stop_stream_reader() +# tbuf.stop_transient_reader() # def run_test_data(_test_data, _args, client, tbuf): @@ -270,9 +420,9 @@ def run_record(client, sdp, tbuf, recording_time, m_bps, wg_periods, scheduled_b # start_wg(client, ampl=wg_ampl, phase=wg_phase, freq=wg_freq) # time.sleep(10.0) -# # tbuf.start_stream_reader() # = --stream ON +# # tbuf.start_transient_reader() # = --stream ON # # tbuf.test_data() -# # tbuf.stop_stream_reader() # = --stream OFF +# # tbuf.stop_transient_reader() # = --stream OFF # stop_wg(client) @@ -300,15 +450,17 @@ def main(): try: # Ensure correct destination mac, ip, udp port tbuf.setup_output_hdr(sdp) - if args.record: - run_record(client, sdp, tbuf, args.record, args.mbps, args.wgperiods, 20000) - - # run tests + tbuf.print_state = args.state + + # The following functions chech themselves whether they are executed + run_record(tbuf, args.record, args.wgperiods) + run_dump(tbuf, dumpnode_list, args.mbps, args.antennas, args.dump_page_offset, args.dump_nof_pages) + run_tbuf_state(tbuf, args.state, node_list) except BaseException as err: exit_state = handle_exception(err) - stop_all_streams(client) + # stop_all_streams(client) client.disconnect() return exit_state @@ -337,8 +489,15 @@ if __name__ == "__main__": parser.add_argument("--dest", type=str, help="hostname of destination machine") parser.add_argument("--udpport", type=int, default=5003, help="udp port of destination machine to use for output") parser.add_argument("--record", type=float, help="Record for number of seconds") - parser.add_argument("--mbps", type=int, help="Dump rate in Mbps") + parser.add_argument("--mbps", type=float, help="Dump rate in Mbps") parser.add_argument("--wgperiods", type=float, help="Number of WG sinus periods per raw samples blocks") + parser.add_argument("--dump_page_offset", type=int, help="Dump start page offset in number of pages with respect to tbuf_recorded_first_page.") + parser.add_argument("--dump_nof_pages", type=int, default=0,help="Dump interval in number of pages, one packet (= page) contains 10 us of raw data. Default 0 pages for no dump.") + parser.add_argument("--dump_nodes", type=str, help="Nodes in range(N_FPGA=16) that will dump. Default empty range for no dump") + parser.add_argument("--antennas", type=int, nargs="+", help="Antenna inputs in range(A_PN=6) that will be dumped per node. Default empty range for no dump") + parser.add_argument("--sequential", type=bool, default=True, help="Default True to dump from nodes in sequential order, else dump from all nodes in parallel") + parser.add_argument("--state", action="store_true", help="Print tbuf_state") + # parser.add_argument("--plots", action="store_true", help="print TBUF plots") # parser.add_argument("--no_wg", action="store_true", help="default use WG signal input for --plots, else use RCU2 inputs") @@ -350,6 +509,7 @@ if __name__ == "__main__": args = parser.parse_args() node_list = arg_str_to_list(args.nodes) if args.nodes else None + dumpnode_list = arg_str_to_list(args.dump_nodes) if args.dump_nodes else None LOGLEVEL = ["ERROR", "WARNING", "INFO", "DEBUG"] verbosity_nr = 0 if args.v is None else args.v diff --git a/test/py/control/tbuf_dump.txt b/test/py/control/tbuf_dump.txt index 21cbc27b54caaa0f73fd753aaee4bc2bf297c614..a7a3c7f91238ac8869398de20c0aeeab312efd55 100644 --- a/test/py/control/tbuf_dump.txt +++ b/test/py/control/tbuf_dump.txt @@ -45,8 +45,8 @@ SDPTR_IP=localhost SDPTR_PORT=4840 N_FPGA=4 -SRC_MACs="['00:22:86:08:00:00','00:22:86:08:00:00','00:22:86:08:00:00','00:22:86:08:00:00']" -SRC_IPs="['192.168.0.11','192.168.0.12','192.168.0.13','192.168.0.14']" +SRC_MACs="['00:22:86:08:00:00','00:22:86:08:00:01','00:22:86:08:00:02','00:22:86:08:00:03']" +SRC_IPs="['192.168.0.01','192.168.0.02','192.168.0.03','192.168.0.04']" SRC_PORTs="[0xD000,0xD001,0xD002,0xD003]" # dop421, 5000 = 0x1388 @@ -109,8 +109,9 @@ sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_clear_counts [True] # Setup WG # Use full scale WG aplitude +PI=3.141592654 WG_AMPL=1.0 -WG_PHASE=0.0 +WG_PHASE=PI/2 WG_PERIODS_PER_BLOCK=1.0 WG_FREQ=${WG_PERIODS_PER_BLOCK}/${N_RS}/${T_ADC} @@ -275,16 +276,141 @@ sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r ddr_rd_fifo_usedw # --host ${SDPTR_IP} : Control computer IP address # --port ${SDPTR_PORT} : Control computer port # -vv : verbosity level +# --dest dop421 : Combines --ip and --mac of dump destination computer # --wgPeriods 1.0 : Number of WG sinus periods per raw samples block. # --record 5 : Record for number of seconds -# --dest dop421 : Combines --ip and --mac of dump destination computer # --Mbps 10 : Dump rate in Mbps per node # --dumpOffset 0 : Dump start page offset in number of pages with respect to tbuf_recorded_first_page. # --dumpInterval 10 : Dump interval in number of pages, one packet (= page) contains 10 us of raw data. Default # 0 pages for no dump. # --nodes 0,1 : Nodes in range(N_FPGA=16) that will dump. Default empty range for no dump # --antennas 0,1,5 : Antenna inputs in range(A_PN=6) that will be dumped per node. Default empty range for no dump +# --repeat 1 : Repeat record, freeze, dump process repeat times, default repeat = 1 # --sequential True : Default True to dump from nodes in sequential order, else dump from all nodes in parallel +############################################################################ +# Demo + +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -r firmware_version + +SDPTR_IP=localhost +SDPTR_PORT=4840 + +N_FPGA=4 +SRC_MACs="['00:22:86:08:00:00','00:22:86:08:00:01','00:22:86:08:00:02','00:22:86:08:00:03']" +SRC_IPs="['192.168.0.01','192.168.0.02','192.168.0.03','192.168.0.04']" +SRC_PORTs="[0xD000,0xD001,0xD002,0xD003]" + +# dop421, 5000 = 0x1388 +DEST_MACs="['00:15:17:AA:22:9C']*${N_FPGA}" +DEST_IPs="['10.99.0.254']*${N_FPGA}" +DEST_PORTs=[5000]*${N_FPGA} +DUMP_NOF_BPS=[4000000000]*${N_FPGA} + +S_PN=12 +T_ADC=5e-9 +N_RS=2000 +T_PAGE=${N_RS}*${T_ADC} + +# Monitor DDR4, expect calibrated, never FIFO full, current FIFOs empty, buffer size +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r ddr_calibrated +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r ddr_fifo_full +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r ddr_wr_fifo_usedw +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r ddr_rd_fifo_usedw +# Disable recording +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_record_enable "[False]*${N_FPGA}" +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_record_enable + +# Disable dump +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_dump_enables [0]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_enables + +# Read TBuf state (ecpect 515 for idle and tbuf_full) +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_state +tbuf_dump.py --host ${SDPTR_IP} --port ${SDPTR_PORT} --dest dop421 --state -v -n 0 + +############################################################################ +# Record + +# Enable recording +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_record_enable [True]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_record_busy + +# Monitor input RSN +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_signal_input_rsn +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_signal_input_nof_blocks + +# Read TBuf state +tbuf_dump.py --host ${SDPTR_IP} --port ${SDPTR_PORT} --dest dop421 --state -v -n 0 + +############################################################################ +# Freeze + +# Disable recording +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_record_enable "[False]*${N_FPGA}" +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_record_busy + +# Read buffer status +# . tbuf_recorded_nof_pages = tbuf_nof_pages_in_buffer +# tbuf_recorded_last_page = tbuf_recorded_first_page +# . tbuf_recorded_time_interval = tbuf_recorded_nof_pages * tbuf_nof_samples_per_page * T_adc +# tbuf_recorded_time_interval = tbuf_recorded_last_timestamp - tbuf_recorded_first_timestamp +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_nof_pages_in_buffer +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_nof_pages +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_first_page +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_last_page +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_time_interval +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_first_timestamp +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_recorded_last_timestamp + +############################################################################ +# Dump + +# Dump parameters, one page is T_PAGE = 10 us +T_DUMP_OFFSET=1e-3 +T_DUMP_INTERVAL=170e-6 + +# Fill in read tbuf_recorded_first_timestamp for T_RECORDED_FIRST_TIMESTAMP +T_RECORDED_FIRST_TIMESTAMP=1748738195.5965724 +T_DUMP_START_TIMESTAMP=${T_RECORDED_FIRST_TIMESTAMP}+${T_DUMP_OFFSET} + +# Dump enables for one or max A_PN = 6 antennas per node +A_EN=0x1 + +# Set dump start time and dump interval +T_DUMP_START_TIMESTAMP=${T_RECORDED_FIRST_TIMESTAMP}+${T_DUMP_OFFSET} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_dump_start_timestamp [${T_DUMP_START_TIMESTAMP}]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_dump_time_interval [${T_DUMP_INTERVAL}]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_start_timestamp +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_time_interval + +# Read dump pages range +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_start_page +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_nof_packets + +# Start tcpdump capture in other terminal +sudo tcpdump -vvXXSnelfi enp1s0f0np0 port 5000 > tcpdump.txt + +# Clear total counts +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_clear_counts [True]*${N_FPGA} + +# Dump disable +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_dump_enables [0]*${N_FPGA} +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_state + +# Dump enable +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -w tbuf_dump_enables [${A_EN}]*${N_FPGA} + +# Wait until dump done +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_dump_done + +# Read TBuf dump state +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_memory_read_nof_packets +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_memory_read_nof_crc_errors +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_memory_read_nof_rsn_errors +sdp_rw.py --host ${SDPTR_IP} --port ${SDPTR_PORT} -v -r tbuf_output_nof_packets +# Grep tcpdump capture in other terminal +cat tcpdump.txt | grep IPv4 +cat tcpdump.txt | grep IPv4 | wc diff --git a/test/py/control/view_data_buffer.py b/test/py/control/view_data_buffer.py new file mode 100644 index 0000000000000000000000000000000000000000..50b11c06c54e3ad3a3bc8784d828903b47f56453 --- /dev/null +++ b/test/py/control/view_data_buffer.py @@ -0,0 +1,65 @@ +#! /usr/bin/env python3 + +# ########################################################################## +# Copyright 2023 +# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/> +# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ########################################################################## + +# ########################################################################## +# Author: +# . Eric Kooistra +# Purpose: +# . Read data buffer file +# Description: +# . Capture signal input data buffer: +# sdp_rw.py --host 10.151.255.1 --port 4842 -r signal_input_data_buffer > x.txt +# . The data has format [4, 12*1024](int16) +# . Print first data of each signal input to see their alignment, in case +# ADC is setup to output synchronized test sequence. +# ########################################################################## + +import argparse +import numpy as np + +# Parse arguments to derive user parameters +_parser = argparse.ArgumentParser('view_data_buffer') +_parser.add_argument('-f', default='x.txt', type=str, help='Filename') +args = _parser.parse_args() + +# Read data captured with sdp_rw.py -r signal_input_data_buffer +with open(args.f, "r") as fp: + fileLines = fp.readlines() + +# Parse x.txt into signal_inputs array +N_pn = len(fileLines) - 1 +S_pn = 12 +S_adc = N_pn * S_pn +N_points = 1024 +shape = (N_pn * S_pn, N_points) +signal_inputs = np.zeros(shape, np.int16) +for li, line in enumerate(fileLines): + # skip first line + ni = li - 1 + if ni >= 0: + data = line.split(':')[1] + data = data.split() + for si in range(S_pn): + for di in range(N_points): + signal_inputs[ni * S_pn + si][di] = data[si * N_points + di] + +# Print first data for each signal input +for si in range(S_adc): + print('%2d : %s' % (si, signal_inputs[si][9:15]))