Skip to content
Snippets Groups Projects

Resolve L2SDP-814

Merged Reinier van der Walle requested to merge L2SDP-814 into master
1 file
+ 145
36
Compare changes
  • Side-by-side
  • Inline
+ 145
36
@@ -25,6 +25,8 @@
# . Control, monitor and test JESD interface on SDP firmware
# Description:
# . run ./jesd.py -h for help
# . See ADCfunctions for how the clock_cw_statistics are determined from the
# known common RCU input CW.
# ##########################################################################
import sys
@@ -48,7 +50,7 @@ class Jesd(object):
self.rstTestCycleDuration = 3 # Each test cycle takes rstTestCycleDuration seconds, so the test will do about round_up(--mtime / rstTestCycleDuration) cycles.
self.berTestCycleDuration = 60 # Each test cycle takes berTestCycleDuration seconds, so the test will do about round_up(--mtime / berTestCycleDuration) cycles.
def jesd_ber_test(self, node_list, sp_list, mtime):
def jesd_ber_test(self, global_sp_list, mtime):
"""Executes JESD Bit Error Rate test. Captures err0 and err1
registers every berTestCycleDuration seconds for a total time
of mtime seconds. Finally calculates the BER."""
@@ -98,11 +100,10 @@ class Jesd(object):
jesd_stats["rx_err1"].append(self.client.read("jesd204b_rx_err1"))
err0_list = []
err1_list = []
for node in node_list:
for sp in sp_list:
err0_list.append(jesd_stats["rx_err0"][-1][c_sdp.S_pn * node + sp])
err1_list.append(jesd_stats["rx_err1"][-1][c_sdp.S_pn * node + sp])
err1_list = []
for sp in global_sp_list:
err0_list.append(jesd_stats["rx_err0"][-1][sp])
err1_list.append(jesd_stats["rx_err1"][-1][sp])
elapsed_time = time.time() - startTime
@@ -112,10 +113,9 @@ class Jesd(object):
# Validate
err0_list = []
err1_list = []
for node in node_list:
for sp in sp_list:
err0_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err1"]])
for sp in global_sp_list:
err0_list.append([err[sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[sp] for err in jesd_stats["rx_err1"]])
err0_cnt = np.count_nonzero(err0_list)
err1_cnt = np.count_nonzero(err1_list)
@@ -136,11 +136,14 @@ class Jesd(object):
return result
def jesd_reset_test(self, node_list, sp_list, mtime):
def jesd_reset_test(self, node_list, global_sp_list, mtime):
"""Executes JESD Reset test. Captures databuffer data after
every reset then calculates and verifies the statistics.
executed every rstTestCycleDuration seconds for a total time
of mtime seconds. Finally plots the statistics."""
of mtime seconds. Finally plots the clock_cw_statistics
(based on CW phase in databuffer) and the JESD statistics
(from JESD IP registers). All RCU inputs have a common CW
input with cwPeriod."""
adc_h = ADCFunctions()
adc_h.set_nof_bits(c_sdp.W_adc)
@@ -183,12 +186,11 @@ class Jesd(object):
# Read JESD DB
samples = self.client.read("signal_input_data_buffer")
selected_samples = []
for node in node_list:
for sp in sp_list:
h = (c_sdp.S_pn * node + sp + 1) * c_sdp.V_si_db
l = (c_sdp.S_pn * node + sp) * c_sdp.V_si_db
selected_samples.append(samples[l:h])
selected_samples = []
for sp in global_sp_list: # Select samples for chosen ADC inputs, can be any global signal input index.
h = (sp + 1) * c_sdp.V_si_db
l = (sp) * c_sdp.V_si_db
selected_samples.append(samples[l:h])
# Process ADC samples
adc_h.jesd_words_2_samples(selected_samples)
@@ -218,19 +220,18 @@ class Jesd(object):
# Validate
err0_list = []
err1_list = []
for node in node_list:
for sp in sp_list:
err0_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err1"]])
err1_list = []
for sp in global_sp_list:
err0_list.append([err[sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[sp] for err in jesd_stats["rx_err1"]])
err0_cnt = np.count_nonzero(err0_list)
err1_cnt = np.count_nonzero(err1_list)
self.logger.info(f"Done validating JESD: {cycle_index} resets")
self.logger.info(f"amount of err0: {err0_cnt}, amount of err1: {err1_cnt}")
max_phases = [np.amax(np.transpose(adc_h.clkCwPhases)[si]) for si in range(len(node_list) * len(sp_list))]
min_phases = [np.amin(np.transpose(adc_h.clkCwPhases)[si]) for si in range(len(node_list) * len(sp_list))]
max_phases = [np.amax(np.transpose(adc_h.clkCwPhases)[si]) for si in range(len(global_sp_list))]
min_phases = [np.amin(np.transpose(adc_h.clkCwPhases)[si]) for si in range(len(global_sp_list))]
diff_phases = [max_p - min_p for (max_p, min_p) in zip(max_phases, min_phases)]
self.logger.info(f"Difference between max/min phase offset for each channel: {diff_phases}")
@@ -240,5+241,5 @@
self.logger.info("SUCCESS")
else:
self.logger.error("Failed")
self.logger.info(f"err0 list = {err0_list}\nerr1 list = {err1_list}")
self.logger.info(f"BSN running = {bsn_running}")
return result
def no_wg_jesd_reset_test(self, node_list, global_sp_list, mtime):
"""Executes JESD Reset test. Captures the JESD error registers after
every reset. Executed every rstTestCycleDuration seconds for a total time
of mtime seconds. Same as jesd_reset_test() but without reading the data
buffers and the clock_cw_phase checks. This no_wg_jesd_reset_test() can
be used without the need for a common CW input signal at the receiver
inputs."""
jesd_stats = {
"csr_rbd_count": [],
"csr_dev_syncn": [],
"rx_err0": [],
"rx_err1": [],
}
cycle_index = 0
startTime = time.time()
start = time.time()
elapsed_time = 0.0
bsn_running = True
try:
while elapsed_time < mtime:
while start > time.time():
time.sleep(0.001)
start = start + self.rstTestCycleDuration
cycle_index = cycle_index + 1
previous_bsns = self.client.read("signal_input_bsn")[node_list[0]:node_list[-1] + 1]
# Processing on / reset
self.client.write("processing_enable", [True] * self.client.n_nodes)
time.sleep(2)
# Check if BSN is increasing
current_bsns = self.client.read("signal_input_bsn")[node_list[0]:node_list[-1] + 1]
bsn_running = all([c > p for (c, p) in zip(current_bsns, previous_bsns)])
# Read JESD stats
csr_rbd_count = self.client.read("jesd204b_csr_rbd_count")
csr_dev_syncn = self.client.read("jesd204b_csr_dev_syncn")
rx_err0 = self.client.read("jesd204b_rx_err0")
rx_err1 = self.client.read("jesd204b_rx_err1")
jesd_stats["csr_rbd_count"].append(csr_rbd_count)
jesd_stats["csr_dev_syncn"].append(csr_dev_syncn)
jesd_stats["rx_err0"].append(rx_err0)
jesd_stats["rx_err1"].append(rx_err1)
# Define Error conditions
sel_err0 = [rx_err0[sp] for sp in global_sp_list]
sel_err1 = [rx_err1[sp] for sp in global_sp_list]
sel_csr_dev_syncn = [csr_dev_syncn[sp] for sp in global_sp_list]
sel_csr_rbd_count = [csr_rbd_count[sp] for sp in global_sp_list]
csr_rbd_count_too_high = [int(n > 8) for n in sel_csr_rbd_count]
# Log Errors
if np.count_nonzero(sel_err0) > 0:
self.logger.error(f"Encountered non-zero value in rx_err0:\n{sel_err0}")
if np.count_nonzero(sel_err1) > 0:
self.logger.error(f"Encountered non-zero value in rx_err1:\n{sel_err1}")
if np.count_nonzero(csr_rbd_count_too_high) > 0:
self.logger.error(f"Encountered csr_rbd_count > 8:\n{sel_csr_rbd_count}")
if np.count_nonzero(sel_csr_dev_syncn) < len(sel_csr_dev_syncn):
self.logger.error(f"Encountered dev_sync = 0 for a selected signal input:\n{sel_csr_dev_syncn}")
elapsed_time = time.time() - startTime
except KeyboardInterrupt:
print(" user hit ctrl-c")
# Validate
err0_list = []
err1_list = []
for sp in global_sp_list:
err0_list.append([err[sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[sp] for err in jesd_stats["rx_err1"]])
err0_cnt = np.count_nonzero(err0_list)
err1_cnt = np.count_nonzero(err1_list)
self.logger.info(f"Done validating JESD: {cycle_index} resets")
self.logger.info(f"amount of err0: {err0_cnt}, amount of err1: {err1_cnt}")
result = bsn_running and ((err0_cnt + err1_cnt) == 0)
if result:
self.logger.info("SUCCESS")
else:
self.logger.error("Failed")
self.logger.info(f"err0 list = {err0_list}\nerr1 list = {err1_list}")
self.logger.info(f"BSN running = {bsn_running}")
return result
def main():
exit_state = 0
client = base.OpcuaClient(args.host, args.port)
client.connect()
node_list = nodelist
node_list = args_node_list
if node_list is None:
node_list = client.node_list # if no --nodes in arguments, select all.
sp_list = splist
if sp_list is None:
sp_list = [i in range(c_sdp.S_pn)] # if no --sp in arguments, select all.
global_sp_list = args_global_sp_list
if global_sp_list is None:
global_sp_list = [c_sdp.S_pn * n + ch for n in node_list for ch in range(c_sdp.S_pn)] # if no --sp in arguments, select all.
client.set_mask(node_list)
logger.info("fpga mask = {}".format(client.get_mask())) # read back nodes set.
@@ -264,11 +356,15 @@ def main():
jesd = Jesd(client)
try:
if args.validate_jesd_reset:
result = jesd.jesd_reset_test(node_list, sp_list, args.mtime)
result = jesd.jesd_reset_test(node_list, global_sp_list, args.mtime)
# Convert False->1 and True->0
exit_state = int(not result)
if args.validate_no_wg_jesd_reset:
result = jesd.no_wg_jesd_reset_test(node_list, global_sp_list, args.mtime)
# Convert False->1 and True->0
exit_state = int(not result)
if args.validate_jesd_ber:
result = jesd.jesd_ber_test(node_list, sp_list, args.mtime)
result = jesd.jesd_ber_test(global_sp_list, args.mtime)
# Convert False->1 and True->0
exit_state = int(not result)
@@ -287,8 +383,10 @@ if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="".join(textwrap.dedent("""\
opcua client command line argument parser:
# Example for validating JESD on nodes 5:7 using channels 0:2
jesd.py --host 10.99.0.250 -n 5:7 -r 0:2 --validate-jesd-reset --mtime 3600 -vv\n""")),
# Example for validating JESD on nodes 5:7 using channels 0:2 on each of the selected nodes
jesd.py --host 10.99.0.250 -n 5:7 -r 0:2 --validate-jesd-reset --mtime 3600 -vv\n
Alternatively, you could use --globalsp to select the ADC inputs using the global index, max(0:192).
Note that only the corresponding nodes will be reset when using --gloablsp. \n""")),
formatter_class=argparse.RawTextHelpFormatter)
parser.add_argument("--host", type = str, default = "localhost", help = "host to connect to")
@@ -296,7 +394,8 @@ if __name__ == "__main__":
parser.add_argument("-n", "--nodes", type = str, help = "nodes to use")
parser.add_argument("-v", action = "count", help = "verbosity -v = WARNING, -vv = INFO, -vvv = DEBUG")
parser.add_argument("-r", "--sp", type = str, help = "signal inputs to use")
parser.add_argument("-r", "--localsp", type = str, help = "channel inputs on each of the selected nodes can be max 0:11")
parser.add_argument("-g," "--globalsp", type = str, help = "global signal inputs to use instead of -n/-r, this has priority over -n and -r. Can be max 0:192")
parser.add_argument(
"--mtime",
type=int,
@@ -306,12 +405,22 @@ if __name__ == "__main__":
)
parser.add_argument("--validate-jesd-reset", action = "store_true", help = "validate the jesd interface after reset repeatedly for --mtime seconds")
parser.add_argument("--validate-no-wg-jesd-reset", action = "store_true", help = "validate the jesd interface after reset repeatedly for --mtime seconds using only the JESD status registers")
parser.add_argument("--validate-jesd-ber", action = "store_true", help = "validate the jesd interface measuring bit error rate --mtime seconds")
args = parser.parse_args()
nodelist = base.arg_str_to_list(args.nodes) if args.nodes else None
splist = base.arg_str_to_list(args.sp) if args.sp else None
args_node_list = base.arg_str_to_list(args.nodes) if args.nodes else None
args_ch_list = base.arg_str_to_list(args.localsp) if args.localsp else None
args_global_sp_list = base.arg_str_to_list(args.globalsp) if args.globalsp else None
if args_global_sp_list is not None:
args_node_list = list(np.sort(np.unique([i // 12 for i in args_global_sp_list])))
elif args_ch_list is not None:
args_global_sp_list = [c_sdp.S_pn * n + ch for n in args_node_list for ch in args_ch_list]
else: # select all sp for all selected nodes
args_global_sp_list = [c_sdp.S_pn * n + ch for n in args_node_list for ch in range(c_sdp.S_pn)]
verbosity_nr = 0 if args.v is None else args.v
LOGLEVEL = ["ERROR", "WARNING", "INFO", "DEBUG"]
log_level = eval("logging.{}".format(LOGLEVEL[verbosity_nr]))
Loading