Skip to content
Snippets Groups Projects
Select Git revision
  • 9992dbf45245bdb622b8561fa755d93e2e3f3c5d
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_tmssapp_specification_REST_API.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    tc_bf_unit.py 18.00 KiB
    #! /usr/bin/env python
    ###############################################################################
    #
    # Copyright (C) 2012
    # ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
    # P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <http://www.gnu.org/licenses/>.
    #
    ###############################################################################
    
    """Test case for the bf_unit entity.
    
       Description:
    
       This testcase is used in conjunction with the tb_bf_unit.vhd file.
    
       Generics for the VHDL testbench are passed to modelsim to configure the
       current simulation.
       To observe the VHDL signals in the wave window modelsim needs to be
       started manually and then c_start_modelsim must be set to 0 and the generics
       also must be set manually.
    
       NOTE: Use c_write_bg, c_write_bf_weights and c_write_bf_ss_wide to en- or disable 
       the time-consuming writes to the mm-interface. WHen disabled, be sure to
       have the according .hex files in place. The .hex files can be generated 
       with the bf_unit.py script. 
       
       BEWARE: If the hex files change due to different script settings, then it is
       necessary to run the script again to ensure that Modelsim starts with the
       latest hex files.
       
       Usage:
    
       > python tc_bf_unit.py --unb 0 --fn 0 --sim 
    
    """
    
    ###############################################################################
    # System imports
    import test_case
    import node_io
    import unb_apertif as apr
    import pi_diag_block_gen
    import pi_diag_data_buffer
    import pi_ss_ss_wide
    import pi_bf_bf
    import dsp_test
    import sys, os
    import subprocess
    import pylab as pl
    import numpy as np
    import scipy as sp
    import random
    from tools import *
    from common import *  
    import mem_init_file
    
    ###############################################################################
    
    # Debug control
    c_debug_print = False 
    
    # Create a test case object
    tc = test_case.Testcase('TB - ', '')
    
    # Constants/Generics that are shared between VHDL and Python
    # Name                   Value   Default   Description
    # START_VHDL_GENERICS
    c_nof_signal_paths     =  64      # 64
    c_nof_input_streams    =  16      # 16 
    c_nof_subbands         =  24      # 24
    c_nof_weights          = 256      # 256
    c_nof_bf_units         =   4      # 4
    c_in_dat_w             =  16      # 16
    c_in_weight_w          =  16      # 16
    c_stat_data_w          =  56      # 56
    c_stat_data_sz         =   2      # 2
    c_bf_weights_file_name = "../../../src/hex/weights" #  -- "UNUSED" or relative path to e.g. the bf/src/hex/weights hex file for adr_w=8 and dat_w=32
    c_ss_wide_file_prefix  = "../../../src/hex/ss_wide"
    # END_VHDL_GENERICS
    
    tc.append_log(3, '>>>')
    tc.append_log(1, '>>> Title : Test bench for bf_unit' )
    tc.append_log(3, '>>>')
    tc.append_log(3, '')
    tc.set_result('PASSED')
    
    tc.append_log(3, '')
    tc.append_log(3, '>>> VHDL generic settings:')
    tc.append_log(3, '')
    tc.append_log(3, ' c_nof_signal_paths     = %d' % c_nof_signal_paths    )
    tc.append_log(3, ' c_nof_input_streams    = %d' % c_nof_input_streams   )
    tc.append_log(3, ' c_nof_subbands         = %d' % c_nof_subbands        )
    tc.append_log(3, ' c_nof_weights          = %d' % c_nof_weights         )
    tc.append_log(3, ' c_nof_bf_units         = %d' % c_nof_bf_units        )
    tc.append_log(3, ' c_in_dat_w             = %d' % c_in_dat_w            )
    tc.append_log(3, ' c_in_weight_w          = %s' % c_in_weight_w         )
    tc.append_log(3, ' c_stat_data_w          = %d' % c_stat_data_w         )
    tc.append_log(3, ' c_stat_data_sz         = %d' % c_stat_data_sz        )
    tc.append_log(3, ' c_bf_weights_file_name = %s' % c_bf_weights_file_name)
    tc.append_log(3, ' c_ss_wide_file_prefix  = %s' % c_ss_wide_file_prefix )
    tc.append_log(3, '')
    
    ###############################################################################
    # Python specific constants 
    
    c_nof_frames = 1             
    c_nof_signal_paths_per_stream = c_nof_signal_paths/c_nof_input_streams
    c_nof_subbands_per_stream     = c_nof_subbands*c_nof_signal_paths_per_stream 
    
    c_slice_size         = 2**ceil_log2(c_nof_weights)
    
    # Block Generator
    c_blocks_per_sync    = 7
    
    c_bg_nof_streams     = c_nof_input_streams
    c_bg_ram_size        = 2**ceil_log2(c_nof_subbands_per_stream)
    
    # Weights
    c_weight_w           = 10      # Use smaller weight values in order to stay with the limits of the 32-bit databuffers after summing c_nof_input_streams 
    
    # Data Buffer
    c_db_nof_streams     = c_nof_signal_paths
    c_db_ram_size        = c_slice_size
    
    c_dp_clk_freq_mhz    = 200
    c_block_period_ns    = c_slice_size*1e3/c_dp_clk_freq_mhz    # BSN block time in ns
    c_sync_interval_ns   = c_block_period_ns*c_blocks_per_sync   # sync interval in ns
    
    # Settings for the script in order to fasten the simulation.
    c_gen_hex_files      = True   # Use True to create hex files for BG, BF weights and SS wide, use False when the files are in place.
    
    # When following constants are set to False, be sure there are .hex files!!! 
    c_write_bg           = False  # When set to True the waveform will be written to the block generator over the MM bus. When False this will be skipped.
    c_write_bf_weights   = True  # When set to True the weights will be written via the MM bus. When False this will be skipped. 
    c_write_bf_weights   = False
    c_write_bf_ss_wide   = False  # When set to True the setting for ss_wide will be written via the MM bus. When False this will be skipped. 
    
    # Create access object for nodes
    io = node_io.NodeIO(tc.nodeImages, tc.base_ip)
    
    # Create block generator instance
    bg = pi_diag_block_gen.PiDiagBlockGen(tc, io, c_bg_nof_streams, c_bg_ram_size)
    
    # Create databuffer instances
    db_re = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'REAL', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
    db_im = pi_diag_data_buffer.PiDiagDataBuffer(tc, io, instanceName = 'IMAG', nofStreams=c_db_nof_streams, ramSizePerStream=c_db_ram_size)
    
    # Create dsp_test instance for helpful methods
    dsp_test_bg      = dsp_test.DspTest(inDatW=c_in_dat_w)
    dsp_test_weights = dsp_test.DspTest(inDatW=c_weight_w)
    
    # Create beamformer instance
    bf = pi_bf_bf.PiBfBf(tc, io, c_nof_weights, c_nof_signal_paths, c_nof_input_streams, c_stat_data_w, c_stat_data_sz)
    
    tc.append_log(3, '')
    tc.append_log(3, '>>> BF unit requantization settings:')
    tc.append_log(3, '')
    tc.append_log(3, ' bf.sum_of_prod_w  = %d' % bf.sum_of_prod_w)
    tc.append_log(3, ' bf.sign_w         = %d' % bf.sign_w       )
    tc.append_log(3, ' bf.unit_w         = %d' % bf.unit_w       )
    tc.append_log(3, ' bf.prod_w         = %d' % bf.prod_w       )
    tc.append_log(3, ' bf.sum_w          = %d' % bf.sum_w        )
    tc.append_log(3, ' bf.bst_gain_w     = %d' % bf.bst_gain_w   )
    tc.append_log(3, ' bf.bst_dat_w      = %d' % bf.bst_dat_w    )
    tc.append_log(3, ' bf.bst_lsb_w      = %d' % bf.bst_lsb_w    )
    tc.append_log(3, ' bf.out_gain_w     = %d' % bf.out_gain_w   )
    tc.append_log(3, ' bf.out_dat_w      = %d' % bf.out_dat_w    )
    tc.append_log(3, ' bf.out_lsb_w      = %d' % bf.out_lsb_w    )
    tc.append_log(3, '')
    
    # Functions to generate hex-files before simulation starts
    def gen_data_and_hex_files_bg(gen_hex=True, sel='noise', ampl=1.0):
        multiStream = []
        ampl = ampl * 1.0 # Force to float
        for i in xrange(c_nof_input_streams): 
            singleStream_real = dsp_test_bg.create_waveform(sel, ampl, seed=2*i, noiseLevel=0, length=c_nof_subbands_per_stream)
            singleStream_imag = dsp_test_bg.create_waveform(sel, ampl, seed=2*i+1, noiseLevel=0, length=c_nof_subbands_per_stream)
            singleStream_real = dsp_test_bg.quantize_waveform(singleStream_real)
            singleStream_imag = dsp_test_bg.quantize_waveform(singleStream_imag)
            singleStream      = dsp_test_bg.concatenate_two_lists(singleStream_real, singleStream_imag, c_in_dat_w)
            if c_debug_print and i==0:
                print "singleStream_real = %s" % singleStream_real
            if (gen_hex == True):
                filename = "../../src/hex/tb_bg_dat_" + str(i) + ".hex"
                mem_init_file.list_to_hex(list_in=singleStream, filename=filename, mem_width=2*c_in_dat_w, mem_depth=2**(ceil_log2(c_nof_subbands_per_stream)))
            multiStream.append(singleStream)
        return multiStream
    
    def gen_data_and_hex_files_bf_weights(gen_hex=True, sel='noise', ampl=1.0):    
        weightsBfUnit=[]
        ampl = ampl * 1.0 # Force to float
        for i in range(c_nof_signal_paths):
            singleList_real   = dsp_test_weights.create_waveform(sel, ampl, seed=2*i, noiseLevel=0, length=c_nof_weights)
            singleList_imag   = dsp_test_weights.create_waveform(sel, ampl, seed=2*i+1, noiseLevel=0, length=c_nof_weights)
            singleList_real   = dsp_test_weights.quantize_waveform(singleList_real)
            singleList_imag   = dsp_test_weights.quantize_waveform(singleList_imag)
            if c_debug_print and i==0:
                print "singleList_real = %s" % singleList_real
            weightsSignalPath = dsp_test_weights.concatenate_two_lists(singleList_real, singleList_imag, c_in_weight_w)
            if (gen_hex == True):
                filename = "../../src/hex/weights_" + str(i) + ".hex"
                mem_init_file.list_to_hex(list_in=weightsSignalPath, filename=filename, mem_width=2*c_in_weight_w, mem_depth=c_nof_weights)
            weightsBfUnit.append(weightsSignalPath)
        return weightsBfUnit    
        
    def gen_data_and_hex_files_bf_ss_wide(gen_hex=True):        
        # Apply simple SS wide scheme to select e.g. nof_beams_per_subband=4 sets of equal subbands per bf_unit.
        # . The nof_beams_per_subband must be <= c_nof_subbands=24, because that is the maximum number of different subbands that is available
        # . In this simple scheme the nof_beams_per_subband needs to be a divider of c_nof_weights=256, so a power of 2
        nof_beams_per_subband = 4
        select_buf = []
        for i in range(c_nof_signal_paths_per_stream):  # iterates over the number of single ss units
            select_buf_line = []
            for j in range(nof_beams_per_subband):
                for k in range(c_nof_weights/nof_beams_per_subband): 
                    select_buf_line.append(i*c_nof_subbands + j) 
            if (gen_hex == True): 
                filename = "../../src/hex/ss_wide_" + str(i) + ".hex"
                mem_init_file.list_to_hex(list_in=select_buf_line, filename=filename, mem_width=ceil_log2(c_nof_subbands_per_stream), mem_depth=c_nof_weights)
            select_buf.append(select_buf_line)
        return select_buf    
    
    if __name__ == "__main__":      
        #do_until_gt(io.simIO.getSimTime, 100, s_timeout=3600)
        
        ###############################################################################
        #
        # Create stimuli for the BG
        #
        ###############################################################################
        # Prepare x stimuli for block generator
        x_complex_arr = []
        x_re_arr    = []
        x_im_arr    = []
        bg_data_arr = gen_data_and_hex_files_bg(c_gen_hex_files)
        for i in xrange(c_nof_input_streams):
            dataListComplex = bg.convert_concatenated_to_complex(bg_data_arr[i], c_in_dat_w)
            x_complex_arr.append(dataListComplex)
            [x_re_list, x_im_list] = split_complex(dataListComplex)
            x_re_arr.append(x_re_list)
            x_im_arr.append(x_im_list)  
    
        ################################################################################
        ##
        ## Create settings for ss_wide in bf_unit
        ##
        ################################################################################
        select_buf = gen_data_and_hex_files_bf_ss_wide(c_gen_hex_files) 
       
        ###############################################################################
        #
        # Create weightfactors
        #
        ###############################################################################
        weight_complex_arr = []
        weight_re_arr = []
        weight_im_arr = []
        weightsListComplex = []
        generated_weights = gen_data_and_hex_files_bf_weights(c_gen_hex_files)
        for j in range(c_nof_signal_paths):
            weightsSignalPathComplex = bg.convert_concatenated_to_complex(generated_weights[j], c_in_weight_w)
            weight_complex_arr.append(weightsSignalPathComplex)
            [weight_re_list, weight_im_list] = split_complex(weightsSignalPathComplex)
            weight_re_arr.append(weight_re_list)
            weight_im_arr.append(weight_im_list)
            if c_debug_print and j==0:
                print "generated_weights = %s" % generated_weights[j]
                print "weightsSignalPathComplex = %s" % weightsSignalPathComplex
                print "weight_re_list = %s" % weight_re_list
        
        ###############################################################################
        #
        # Calculate expected results
        #
        ###############################################################################
        # Determine the data input for the multipliers. 
        bf_in_re = []
        bf_in_im = []
        for i in range(c_nof_input_streams):  
            for j in range(c_nof_signal_paths_per_stream):    
                bf_in_re.append(bf.ss_wide[i].subband_select(x_re_arr[i], select_buf[j]))
                bf_in_im.append(bf.ss_wide[i].subband_select(x_im_arr[i], select_buf[j]))  
        
        # Calculate the expected raw beamlets
        exp_beamlets_raw = []
        for i in range(c_nof_weights):
            beamlet = complex(0,0)
            for j in range(c_nof_signal_paths):
                beamlet = beamlet + complex(bf_in_re[j][i], bf_in_im[j][i]) * complex(weight_re_arr[j][i], weight_im_arr[j][i])  
            exp_beamlets_raw.append(beamlet)
        if c_debug_print:
            print "exp_beamlets_raw = %s" % exp_beamlets_raw
    
        # Calculate expected beamlet statistics
        bf_unit_stats_exp = bf.calculate_bf_unit_statistics(x_complex_arr, select_buf, weight_complex_arr, c_blocks_per_sync, use_16b=True)
        if c_debug_print:
            print "bf_unit_stats_exp = %s" % bf_unit_stats_exp
    
        #braak
    
        ################################################################################
        ##
        ## Write the selection buffers
        ##
        ################################################################################
        if c_write_bf_ss_wide:
            for i in range(c_nof_input_streams): 
                bf.ss_wide[i].write_selects(flatten(select_buf))
    
        ################################################################################
        ##
        ## Write the weights
        ##
        ################################################################################
        if c_write_bf_weights:
            for i in range(c_nof_signal_paths): 
                bf.write_weights(generated_weights[i], i) 
                bf.read_weight(i, weightNr=0)
                
        ################################################################################
        ##
        ## Write data and settings to block generator
        ##
        ################################################################################
        # Write setting for the block generator:
        bg.write_block_gen_settings(samplesPerPacket=c_nof_subbands_per_stream, blocksPerSync=c_blocks_per_sync, gapSize=160, memLowAddr=0, memHighAddr=c_nof_subbands_per_stream-1, BSNInit=0)
        
        # Write the stimuli to the block generator and enable the block generator
        if(c_write_bg):
            for i in range(c_nof_input_streams):
                bg.write_waveform_ram(data=bg_data_arr[i], channelNr= i)
        bg.write_enable()
        
        # Poll the databuffer to check if the response is there.
        # Retry after 3 seconds so we don't issue too many MM reads in case of simulation.
        do_until_ge(db_re.read_nof_words, ms_retry=3000, val=c_nof_weights, s_timeout=3600)
        
        ###############################################################################
        #
        # Read and verify beamlet output from data buffer
        #
        ###############################################################################
        db_out_re = []
        db_out_im = []
        db_out_re.append(flatten(db_re.read_data_buffer(streamNr=0, n=c_nof_weights, radix='dec', width=32, nofColumns=8)))
        db_out_im.append(flatten(db_im.read_data_buffer(streamNr=0, n=c_nof_weights, radix='dec', width=32, nofColumns=8)))
        
        db_out_re = flatten(db_out_re)
        db_out_im = flatten(db_out_im)     
        
        err = 0
        for i in range(c_nof_weights): 
            if db_out_re[i] != exp_beamlets_raw[i].real: 
                tc.append_log(3, str(i) + " Read: db_out_re[i] = " + str(db_out_re[i]) + " Expected: exp_beamlets_raw[i].real = " + str(exp_beamlets_raw[i].real))
                tc.set_result('FAILED')
                err += 1
            if db_out_im[i] != exp_beamlets_raw[i].imag:     
                tc.append_log(3, str(i) + " Read: db_out_im[i] = " + str(db_out_im[i]) + " Expected: exp_beamlets_raw[i].imag = " + str(exp_beamlets_raw[i].imag))   
                tc.set_result('FAILED')        
                err += 1
        tc.append_log(3, "number of errors: %d" % err)
        
        
        ###############################################################################
        #
        # Read and verify BST
        #
        ###############################################################################
        
        # Run simulator to into the second sync interval to ensure a fresh second capture in diag_data_buffer with stable WPFB output (because c_sync_interval_ns > c_wpfb_response_ns)
        do_until_gt(io.simIO.getSimTime, c_sync_interval_ns*1.5, s_timeout=3600)
        
        bf_unit_stats_read = flatten(bf.st.read_stats(vLevel=3))
        
        if bf_unit_stats_exp != bf_unit_stats_read: 
            tc.append_log(3, "Unexpected read BF statistics for blocks_per_sync = %d" % c_blocks_per_sync)
            tc.append_log(3, "Expected: %s" % bf_unit_stats_exp)
            tc.append_log(3, "Read: %s" % bf_unit_stats_read)
            tc.set_result('FAILED')
        
        ###############################################################################
        # End
        tc.set_section_id('')
        tc.append_log(3, '')
        tc.append_log(3, '>>>')
        tc.append_log(0, '>>> Test bench result: %s' % tc.get_result())
        tc.append_log(3, '>>>')
        
        sys.exit(tc.get_result())