Commit accb34d3 authored by Boudewijn Hut's avatar Boudewijn Hut
Browse files

Initial commit of user libraries for lsc

parent f9ae2c7c
Pipeline #33871 failed
RCU2L = [0, 1, 2, 3, 4, 5]
RCU2H = [8, 9]
# RCU2H = [8]
RCU2Hpwr = [8] # Should be even numbers
RCU2Hctl = [9] # Should be even numbers
# RCU2Hctl = [rcu + 1 for rcu in RCU2Hpwr] # This is then odd numbers
This diff is collapsed.
# processing_lib.py: Module with generic functions for processing
# LOFAR2 station data
#
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
""" module processing_lib
This module contains generic functions for processing LOFAR2 station data.
"""
# import h5py
# from os import listdir
# from os.path import isfile, join
import datetime
import numpy as np
# from mpl_toolkits.mplot3d import axes3d
# import matplotlib.pyplot as plt
cst_n_sub = 512 # number of subbands as output of subband generation in firmware
cst_fs = 100e6 # sampling frequency in Hz
bandlims = {'LB': [0, 100e6], 'HB1': [100e6, 200e6], 'HB2': [200e6, 300e6]}
LB_aliases = ['LB1', 'LB2']
def signal_index_2_processing_node_index(signal_index):
'''
Convert signal index to processing node index.
Can be used to determine on which processing node the signal is processed.
'''
return (signal_index % 192)//12
def signal_index_2_processing_node_input_index(signal_index):
'''
Convert signal index to processing node input index.
Can be used to determine wich input to the processing node is used for the signal.
'''
return (signal_index % 192) % 12
def signal_index_2_uniboard_index(signal_index):
'''
Convert signal index to UniBoard index.
Can be used to determine on which UniBoard the signal is processed.
'''
return signal_index//48
def signal_index_2_rcu_index(signal_index):
'''q
Convert signal index to RCU index.
Can be used to determine which RCU is used to provide the signal.
'''
return (signal_index//6)*2 + signal_index % 2
def signal_index_2_rcu_input_index(signal_index):
'''
Convert signal index to RCU input index.
Can be used to determine which input to the RCU is used for the signal.
'''
return (signal_index//2) % 3
def signal_index_2_aps_index(signal_index):
'''
Convert signal index to APS index.
Can be used to determine in which Antenna Processing Subrack this signal is
processed.
'''
return signal_index//96
def signal_input_2_rcu_index_and_rcu_input_index(signal_index):
'''
Given the gsi, get the RCU index and the RCU input index in one call.
'''
return (signal_index_2_rcu_index(signal_index),
signal_index_2_rcu_input_index(signal_index))
# the inverse:
def rcu_index_2_signal_index(rcu_index):
'''
Convert RCU index to signal indices.
'''
return rcu_index_and_rcu_input_index_2_signal_index(rcu_index)
def rcu_index_and_rcu_input_index_2_signal_index(rcu_index,
rcu_input_index=np.array([0, 1, 2])):
'''
Convert RCU index and RCU input index to signal index.
'''
return rcu_index + rcu_input_index * 2 + (rcu_index//2) * 4
# backward compatible methods:
def gsi_2_rcu_input_index(gsi):
print('Warning! Do not use gsi_2_rcu_input_index().')
print('Use signal_index_2_rcu_input_index() instead')
return signal_index_2_rcu_input_index(gsi)
def gsi_2_rcu_index(gsi):
print('Warning! Do not use gsi_2_rcu_index().')
print('Use signal_index_2_rcu_index() instead')
return signal_index_2_rcu_index(gsi)
def gsi_2_rcu_index_and_rcu_input_index(gsi):
print('Warning! Do not use gsi_2_rcu_index_and_rcu_input_index().')
print('Use signal_input_2_rcu_index_and_rcu_input_index() instead')
return signal_input_2_rcu_index_and_rcu_input_index(gsi)
def rcu_index_2_gsi(rcu_index):
print('Warning! Do not use rcu_index_2_gsi().')
print('Use rcu_index_2_signal_index() instead')
return rcu_index_2_signal_index(rcu_index)
def rcu_index_and_rcu_input_index_2_gsi(rcu_index, rcu_input_index=np.array([0, 1, 2])):
print('Warning! Do not use rcu_index_and_rcu_input_index_2_gsi().')
print('Use rcu_index_and_rcu_input_index_2_signal_index() instead')
return rcu_index_and_rcu_input_index_2_signal_index(rcu_index, rcu_input_index)
def test_rcu_index_2_gsi():
test_input = []
test_output = []
# test 1
test_input.append(np.array([0]))
test_output.append(np.array([0, 2, 4]))
# test 2
test_input.append(np.array([1]))
test_output.append(np.array([1, 3, 5]))
for input_rcu_index, expected_output in zip(test_input, test_output):
actual_output = rcu_index_2_gsi(input_rcu_index)
if not np.all(expected_output == actual_output):
print(expected_output)
print(actual_output)
raise Exception('Tests fails!')
def test_rcu_index_and_rcu_input_index_2_gsi():
test_input = []
test_output = []
# test 1
test_input.append([np.array([0]), np.array([0])])
test_output.append(np.array([0]))
# test 2
test_input.append([np.array([0]), np.array([0, 1, 2])])
test_output.append(np.array([0, 2, 4]))
for inputs, expected_output in zip(test_input, test_output):
input_rcu_index = inputs[0]
input_rcu_input_index = inputs[1]
actual_output = rcu_index_and_rcu_input_index_2_gsi(input_rcu_index,
input_rcu_input_index)
if not np.all(expected_output == actual_output):
print(expected_output)
print(actual_output)
raise Exception('Tests fails!')
def test_gsi_2_rcu_index_and_rcu_input_index():
test_input = []
test_output = []
# test 1
test_input.append(np.array([0]))
test_output.append((np.array([0]), np.array([0])))
# test 2
test_input.append(np.array([1]))
test_output.append((np.array([1]), np.array([0])))
for input_signal_index, expected_output in zip(test_input, test_output):
actual_output = gsi_2_rcu_index_and_rcu_input_index(input_signal_index)
print(actual_output)
print(expected_output)
if not np.all(expected_output == actual_output):
print(expected_output)
print(actual_output)
raise Exception('Tests fails!')
def get_timestamp(f=None):
'''
Get the timestamp in standard format
Input argument:
f = format specifier.
iso format (default)
'filename': filename without spaces and special characters,
format: yyyymmddThhmmss
Output argument:
timestamp = timestamp in correct format
'''
timestamp = datetime.datetime.isoformat(datetime.datetime.now())
if f == 'filename':
timestamp = datetime.datetime.now().strftime("%Y%m%dT%H%M%S")
return timestamp
def log(string):
print(f'{get_timestamp()} : {string}')
def get_valid_band_names():
band_names = []
for band_name in bandlims:
band_names.append(band_name)
return band_names
def get_band_name_and_subband_index_for_frequency(freqs):
'''
Get band name and subband index for one or more frequencies.
Input arguments:
freqs = one or more frequencies in Hz
bandlims = dict with band names and their limits (default: LOFAR2 bands)
Output arguments:
band_names = list of band names (see get_valid_band_names)
sbi = one or more indices of the subbands for which the frequencies
should be returned
'''
# make sure freqs is a list
if not isinstance(freqs, list):
freqs = [freqs]
# get band name and subband index
freqs = np.array(freqs)
band_names = []
for freq in freqs:
band_name = get_band_name_for_frequency(freq)
band_names.append(band_name)
# get indices for band_name 'LB'
sbis = np.round(freqs*cst_n_sub/cst_fs)
# and update for the other bands:
for idx in np.where(np.array(band_names) == 'HB1')[0]:
sbis[idx] = 2*cst_n_sub - sbis[idx]
for idx in np.where(np.array(band_names) == 'HB2')[0]:
sbis[idx] = sbis[idx] - 2*cst_n_sub
return band_names, sbis
def get_band_name_for_frequency(freq, bandlims=bandlims):
'''
Get band name for frequency
Input arguments:
freq = one frequency in Hz, float
bandlims = dict with band names and their limits (default: LOFAR2 bands)
Output arguments:
band_name = name of analog band (see get_valid_band_names),
empty string ('') if unsuccessful
'''
for band_name in bandlims:
if np.min(bandlims[band_name]) <= freq and freq <= np.max(bandlims[band_name]):
return band_name
return ''
def get_frequency_for_band_name(band_name):
'''
Get all frequencies for given band name
Input arguments:
band_name = name of analog band (see get_valid_band_names)
Output arguments:
freqs = frequencies in Hz for the selected band
'''
return get_frequency_for_band_name_and_subband_index(band_name, sbi=np.arange(512))
def get_frequency_for_band_name_and_subband_index(band_name, sbi):
'''
Get frequency for band name and subband(s)
Input arguments:
band_name = name of analog band (see get_valid_band_names)
sbi = one or more indices of the subbands for which the frequencies
should be returned
Output arguments:
freqs = frequencies in Hz for the selected band and subband
'''
# check input
valid_band_names = get_valid_band_names()
if band_name not in valid_band_names and band_name not in LB_aliases:
raise Exception('Unknown band_name "%s". Valid band_names are: %s' %
(band_name, valid_band_names))
# generate frequencies
freqs = sbi*cst_fs/cst_n_sub
if band_name == 'HB1':
return 200e6 - freqs
elif band_name == 'HB2':
return 200e6 + freqs
# else: # LB
return freqs
def get_band_names(RCU_band_select_R, RCU_PCB_version_R,
LB_tags=['RCU2L'], HB_tags=['RCU2H']):
'''
Get the frequency band names per receiver, based on their band selection
and RCU PCB name.
PCB name is used to determine the band (Low Band 'LB' or High Band 'HB',
None otherwise). The band selection is added as a postfix.
RCU PCB version can also be used, but then the default tags will not be sufficient
Input Arguments:
RCU_band_select_R = as returned by recv.RCU_band_select_R
RCU_PCB_version_R = as returned by recv.RCU_PCB_version_R
LB_tags = substrings in RCU_PCB_version_R fields to indicate Low Band
HB_tags = substrings in RCU_PCB_version_R fields to indicate High Band
Note:
RCU_PCB_version_R can also hold IDs as returned by recv.RCU_PCB_ID_R
In that case, the LB_tags and HB_tags should be passed on by the user
Output Arguments:
band_name = list of strings indicating the band name per receiver.
containts None if no band name could be determined
'''
# if IDs are passed on, convert to list of strings
if isinstance(RCU_PCB_version_R, np.ndarray):
RCU_PCB_version_R = [str(x) for x in RCU_PCB_version_R]
#
signal_indices = np.arange(RCU_band_select_R.shape[0] * RCU_band_select_R.shape[1])
band_name_per_signal_index = [None]*len(signal_indices)
for rcu_index in range(len(RCU_band_select_R)):
for rcu_input_index in range(len(RCU_band_select_R[rcu_index])):
signal_index = rcu_index_and_rcu_input_index_2_gsi(rcu_index,
rcu_input_index)
this_band = None
for tag in LB_tags:
if tag in RCU_PCB_version_R[rcu_index]:
this_band = 'LB%1s' % RCU_band_select_R[rcu_index][rcu_input_index]
break
if this_band is None: # continue with high band
for tag in HB_tags:
if tag in RCU_PCB_version_R[rcu_index]:
this_band = 'HB%1s' % (
RCU_band_select_R[rcu_index][rcu_input_index])
break
band_name_per_signal_index[signal_index] = this_band
return band_name_per_signal_index
# some basic test methods, for internal use:
def test_get_band_name_and_subband_index_for_frequency():
input_frequencies = [138.1e6, 137.9e6, 0.1, 0, 195312.5, 234e6]
expected_band_names = ['HB1', 'HB1', 'LB', 'LB', 'LB', 'HB2']
expected_subband_index = np.array([317., 318., 0., 0., 1., 174.])
actual_band_names, actual_subband_index = \
get_band_name_and_subband_index_for_frequency(input_frequencies)
if not expected_band_names == actual_band_names:
print(expected_band_names)
print(actual_band_names)
raise Exception('Tests fails!')
if not (expected_subband_index == actual_subband_index).all():
print(expected_subband_index)
print(actual_subband_index)
raise Exception('Tests fails!')
def test_get_frequency_for_band_name_and_subband_index():
for input_band_name, input_subband_index, expected_frequencies in zip(
['HB1', 'HB1', 'LB', 'LB', 'LB', 'HB2', 'LB1'],
[195., 194., 0., 0., 1., 174., 1.],
[161914062.5, 162109375.0, 0., 0., 195312.5, 233984375, 195312.5]):
actual_frequencies = get_frequency_for_band_name_and_subband_index(
input_band_name, input_subband_index)
if not expected_frequencies == actual_frequencies:
print(expected_frequencies)
print(actual_frequencies)
raise Exception('Tests fails!')
def test():
test_rcu_index_2_gsi()
test_rcu_index_and_rcu_input_index_2_gsi()
test_gsi_2_rcu_index_and_rcu_input_index()
test_get_band_name_and_subband_index_for_frequency()
test_get_frequency_for_band_name_and_subband_index()
def main():
return False
if __name__ == '__main__':
exit(main())
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment