Skip to content
Snippets Groups Projects

Added BER test

Merged Reinier van der Walle requested to merge L2SDP-599 into master
+ 110
13
@@ -49,10 +49,102 @@ class Jesd(object):
self.logger = logging.getLogger("Jesd")
self.cwPeriod = 16 # nof 200MHz samples per CW sinus period (12.5 MHz)
self.testCycleDuration = 3 # Each test cycle takes testCycleDuration seconds, so the test will do about round_up(--mtime / testCycleDuration) cycles.
self.rstTestCycleDuration = 3 # Each test cycle takes rstTestCycleDuration seconds, so the test will do about round_up(--mtime / rstTestCycleDuration) cycles.
self.berTestCycleDuration = 60 # Each test cycle takes berTestCycleDuration seconds, so the test will do about round_up(--mtime / berTestCycleDuration) cycles.
def jesd_ber_test(self, node_list, sp_list, mtime):
"""Executes JESD Bit Error Rate test. Captures err0 and err1
registers every berTestCycleDuration seconds for a total time
of mtime seconds. Finally calculates the BER."""
jesd_stats = {
"csr_rbd_count": [],
"csr_dev_syncn": [],
"rx_err0": [],
"rx_err1": [],
}
cycle_index = 0
startTime = time.time()
start = time.time()
elapsed_time = 0.0
err_cnt = 0
bit_err_cnt = 0 # Counts all bit errors, fatal or non-fatal.
fatal_bit_err_cnt = 0 # Counts only fatal bit errors.
err0_list = []
err1_list = []
BER = 0 # Bit Error Rate including fatal and non-fatal bit errors.
fatalBER = 0 # Bit Error Rate including the fatal bit errors only.
try:
while elapsed_time < mtime: # Run for mtime seconds
while start > time.time(): # Sleep until berTestCycleDuration seconds are passed.
time.sleep(0.001)
start = start + self.berTestCycleDuration
cycle_index = cycle_index + 1
# Reset when encountered error
if (np.count_nonzero(err0_list) + np.count_nonzero(err1_list)) > 0:
self.logger.error(f"Error detected at cycle {cycle_index - 1}. err0 = {err0_list} , err1 = {err1_list}")
wasBitError = [((err & 0x300) >> 8) != 0 for err in err1_list] # ECC or Fatal ECC error detected
wasCorrected = [((err & 0x100) >> 8) != 0 for err in err1_list] # No Fatal ECC error detected, only corrected ECC.
if wasBitError:
self.logger.error(f"Error included {['UN',''][int(wasCorrected)]}recoverable bit error!")
bit_err_cnt += 1
fatal_bit_err_cnt = (fatal_bit_err_cnt + 1) if not wasCorrected else fatal_bit_err_cnt
self.client.write("processing_enable", [True] * self.client.n_nodes) # use processing_enable = True to reset JESD IP.
time.sleep(2)
# Read JESD stats
jesd_stats["csr_rbd_count"].append(self.client.read("jesd204b_csr_rbd_count"))
jesd_stats["csr_dev_syncn"].append(self.client.read("jesd204b_csr_dev_syncn"))
jesd_stats["rx_err0"].append(self.client.read("jesd204b_rx_err0"))
jesd_stats["rx_err1"].append(self.client.read("jesd204b_rx_err1"))
err0_list = []
err1_list = []
for node in node_list:
for sp in sp_list:
err0_list.append(jesd_stats["rx_err0"][-1][c_sdp.S_pn * node + sp])
err1_list.append(jesd_stats["rx_err1"][-1][c_sdp.S_pn * node + sp])
elapsed_time = time.time() - startTime
except KeyboardInterrupt:
print(" user hit ctrl-c")
# Validate
err0_list = []
err1_list = []
for node in node_list:
for sp in sp_list:
err0_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err0"]])
err1_list.append([err[c_sdp.S_pn * node + sp] for err in jesd_stats["rx_err1"]])
err0_cnt = np.count_nonzero(err0_list)
err1_cnt = np.count_nonzero(err1_list)
self.logger.info(f"Done validating JESD: {cycle_index} cycles")
self.logger.info(f"amount of err0: {err0_cnt}, amount of err1: {err1_cnt}")
BER = bit_err_cnt / (c_sdp.W_adc * c_sdp.f_adc_MHz * 10**6 * elapsed_time)
fatalBER = fatal_bit_err_cnt / (c_sdp.W_adc * c_sdp.f_adc_MHz * 10**6 * elapsed_time)
self.logger.info(f"BER = {BER}")
self.logger.info(f"Fatal BER = {fatalBER}")
result = ((err0_cnt + err1_cnt) == 0)
if result:
self.logger.info("SUCCESS")
else:
self.logger.error("Failed")
self.logger.info(f"err0 list = {err0_list}\nerr1 list = {err1_list}")
return result
def jesd_reset_test(self, node_list, sp_list, mtime):
"""mtime: monitor runtime in seconds"""
"""Executes JESD Reset test. Captures databuffer data after
every reset then calculates and verifies the statistics.
executed every rstTestCycleDuration seconds for a total time
of mtime seconds. Finally plots the statistics."""
adc_h = ADCFunctions()
adc_h.set_nof_bits(c_sdp.W_adc)
@@ -66,7 +158,7 @@ class Jesd(object):
"rx_err0": [],
"rx_err1": [],
}
nof_runs = 0
cycle_index = 0
startTime = time.time()
start = time.time()
elapsed_time = 0.0
@@ -75,8 +167,8 @@ class Jesd(object):
while elapsed_time < mtime:
while start > time.time():
time.sleep(0.001)
start = start + self.testCycleDuration
nof_runs = nof_runs + 1
start = start + self.rstTestCycleDuration
cycle_index = cycle_index + 1
previous_bsns = self.client.read("signal_input_bsn")[node_list[0]:node_list[-1] + 1]
# Processing on / reset
@@ -122,11 +214,11 @@ class Jesd(object):
# Plot
self.logger.info("plot results")
adc_h.plot_clock_cw_statistics(nof_runs, fignr = 2, statStr = 'dcs')
adc_h.plot_clock_cw_statistics(nof_runs, fignr = 3, statStr = 'phases', cwPeriod = self.cwPeriod)
adc_h.plot_clock_cw_statistics(nof_runs, fignr = 4, statStr = 'amplitudes')
adc_h.plot_clock_cw_statistics(nof_runs, fignr = 5, statStr = 'noisepeaks')
adc_h.plot_clock_cw_statistics(nof_runs, fignr = 6, statStr = 'snrs')
adc_h.plot_clock_cw_statistics(cycle_index, fignr = 2, statStr = 'dcs')
adc_h.plot_clock_cw_statistics(cycle_index, fignr = 3, statStr = 'phases', cwPeriod = self.cwPeriod)
adc_h.plot_clock_cw_statistics(cycle_index, fignr = 4, statStr = 'amplitudes')
adc_h.plot_clock_cw_statistics(cycle_index, fignr = 5, statStr = 'noisepeaks')
adc_h.plot_clock_cw_statistics(cycle_index, fignr = 6, statStr = 'snrs')
# Validate
err0_list = []
@@ -138,7 +230,7 @@ class Jesd(object):
err0_cnt = np.count_nonzero(err0_list)
err1_cnt = np.count_nonzero(err1_list)
self.logger.info(f"Done validating JESD: {nof_runs} resets")
self.logger.info(f"Done validating JESD: {cycle_index} resets")
self.logger.info(f"amount of err0: {err0_cnt}, amount of err1: {err1_cnt}")
max_phases = [np.amax(np.transpose(adc_h.clkCwPhases)[si]) for si in range(len(node_list) * len(sp_list))]
@@ -152,8 +244,8 @@ class Jesd(object):
self.logger.info("SUCCESS")
else:
self.logger.error("Failed")
self.logger.error(f"err0 list = {err0_list}\nerr1 list = {err1_list}")
self.logger.error(f"BSN running = {bsn_running}")
self.logger.info(f"err0 list = {err0_list}\nerr1 list = {err1_list}")
self.logger.info(f"BSN running = {bsn_running}")
return result
@@ -179,6 +271,10 @@ def main():
result = jesd.jesd_reset_test(node_list, sp_list, args.mtime)
# Convert False->1 and True->0
exit_state = int(not result)
if args.validate_jesd_ber:
result = jesd.jesd_ber_test(node_list, sp_list, args.mtime)
# Convert False->1 and True->0
exit_state = int(not result)
except BaseException as err:
exit_state = base.handle_exception(err)
@@ -211,6 +307,7 @@ if __name__ == "__main__":
)
parser.add_argument("--validate-jesd-reset", action = "store_true", help = "validate the jesd interface after reset repeatedly for --mtime seconds")
parser.add_argument("--validate-jesd-ber", action = "store_true", help = "validate the jesd interface measuring bit error rate --mtime seconds")
args = parser.parse_args()
Loading