From 43249ebf794b20630230d77ba6ec6eacb1d49174 Mon Sep 17 00:00:00 2001
From: Eric Kooistra <kooistra@astron.nl>
Date: Thu, 7 Dec 2023 13:43:41 +0100
Subject: [PATCH] Add hilbert_response()

---
 applications/lofar2/model/pfb_os/dsp.py | 103 ++++++++++++++++++++++--
 1 file changed, 97 insertions(+), 6 deletions(-)

diff --git a/applications/lofar2/model/pfb_os/dsp.py b/applications/lofar2/model/pfb_os/dsp.py
index c30de5e7f0..c812a97628 100644
--- a/applications/lofar2/model/pfb_os/dsp.py
+++ b/applications/lofar2/model/pfb_os/dsp.py
@@ -58,10 +58,15 @@ def pow_db(volts):
 
 
 def is_even(n):
-    """Return True if n is even, else False when odd."""
+    """Return True if n >= 0 is even, else False when odd."""
     return n % 2 == 0
 
 
+def is_odd(n):
+    """Return True if n >= 0 is odd, else False when even."""
+    return n % 2 == 1
+
+
 def is_symmetrical(x, anti=False):
     """Return True when x[n] = +-x[N-1 - n], within tolerances, else False."""
     rtol = c_rtol
@@ -90,21 +95,53 @@ def read_coefficients_file(filepathname):
     return coefs
 
 
+def one_bit_quantizer(x):
+    """Quantize 0 and positive x to +1 and negative x to -1."""
+    return np.signbit(x) * -1 + 2
+
+
+def impulse_at_zero_crossing(x):
+    """Create signed impulse at zero crossings of x."""
+    diff = np.diff(one_bit_quantizer(x))
+    return np.concatenate((np.array([0]), diff))
+
+
 ###############################################################################
 # Filter design
 ###############################################################################
 
+def nof_taps_kaiser_window(fs, fpass, fstop, atten_db):
+    """Number of FIR LPF taps using Kaiser window based design
+
+    Reference: [HARRIS 3.2, Fig. 3.8 for beta]
+    """
+    df = fstop - fpass
+    return int((fs / df) * (atten_db - 8) / 14)
+
+
+def nof_taps_remez(fs, fpass, fstop, atten_db):
+    """Number of FIR LPF taps using remez = Parks-McClellan based design.
+
+    Reference: [HARRIS 3.3, LYONS 5.6]
+    """
+    df = fstop - fpass
+    return int((fs / df) * (atten_db / 22))
+
+
 def ideal_low_pass_filter(Npoints, Npass, bandEdgeGain=1.0):
     """Derive FIR coefficients for prototype low pass filter using ifft of
     magnitude frequency response.
 
+    The Npoints defines the double sided spectrum, so Npass = 2 * fpass / fs
+    * Npoints, where fpass is the positive cutoff frequency of the LPF.
+
     Input:
     - Npoints: Number of points of the DFT in the filterbank
-    - Npass: Number of points with gain > 0 in pass band
+    - Npass: Number of points with gain > 0 in pass band.
     - bandEdgeGain : Gain at band edge
     Return:
     - h: FIR coefficients from impulse response
-    . f: normalized frequency axis for HF, fs = 1
+    - f: normalized frequency axis for HF, fs = 1
     - HF: frequency transfer function of h
     """
     # Magnitude frequency reponse
@@ -132,6 +169,8 @@ def fourier_interpolate(HFfilter, Ncoefs):
     time shift of hInterpolated, to make it symmetrical. Similar as done in
     pfs_coeff_final.m and pfir_coeff.m. Use upper = conj(lower), because that
     is easier than using upper from HFfilter.
+
+    Reference: LYONS 13.27, 13.28
     """
     N = len(HFfilter)
     K = N // 2
@@ -173,6 +212,58 @@ def fourier_interpolate(HFfilter, Ncoefs):
     return hInterpolated.real
 
 
+###############################################################################
+# Hilbert transform filter
+###############################################################################
+
+def hilbert_response(Ntaps):
+    """Calculate impulse response of Hilbert filter truncated to Ntaps
+    coefficients.
+
+    h(t) = 1 / (pi t) ( 1 - cos(ws t / 2), l'Hoptial's rule: h(0) = 0
+
+    For n = ...-2, -1, 0, 1, 2, ..., and Ts = 1, and Ntaps is
+    . odd: t = n Ts
+        ht[n] = 1 / (pi n)) ( 1 - cos(pi n))
+              = 0, when n is even
+              = 2 / (pi n), when n is odd
+    . even: t = (n + 0.5) Ts
+        ht(n) = 1 / (pi m)) ( 1 - cos(pi m)), with m = n + 0.5
+    """
+    Npos = Ntaps // 2
+    if is_even(Ntaps):
+        ht = np.zeros(Npos)
+        for n in range(0, Npos):
+            m = n + 0.5
+            ht[n] = 1 / (np.pi * m) * (1 - np.cos(np.pi * m))
+        ht = np.concatenate((np.flip(-ht), ht))
+    else:
+        Npos += 1
+        ht = np.zeros(Npos)
+        for n in range(1, Npos, 2):
+            ht[n] = 2 / (np.pi * n)
+        ht = np.concatenate((np.flip(-ht[1:]), ht))
+    return ht
+
+
+def hilbert_delay(Ntaps):
+    """Delay impulse by (Ntaps - 1) / 2 to align with hilbert_response(Ntaps).
+
+    Analytic signal htComplex = htReal + 1j * htImag where:
+    . htReal = hilbert_delay(Ntaps)
+    . htImag = hilbert_response(Ntaps)
+
+    Only support integer delay D = (Ntaps - 1) / 2, so Ntaps is odd, then return
+    htReal, else return None.
+    """
+    if is_even(Ntaps):
+        return None
+    D = (Ntaps - 1) // 2
+    htReal = np.zeros(Ntaps)
+    htReal[D] = 1.0
+    return htReal
+
+
 ###############################################################################
 # DFT
 ###############################################################################
@@ -221,7 +312,7 @@ def dtft(coefs, Ndtft=None, zeroCenter=True, fftShift=True):
 # Plotting
 ###############################################################################
 
-def plot_time_response(h, markers=False):
+def plot_time_response(h, name='', markers=False):
     """Plot time response (= impulse response, window, FIR filter coefficients).
 
     Input:
@@ -232,7 +323,7 @@ def plot_time_response(h, markers=False):
         plt.plot(h, '-', h, 'o')
     else:
         plt.plot(h, '-')
-    plt.title('Time response')
+    plt.title('Time response %s' % name)
     plt.ylabel('Voltage')
     plt.xlabel('Sample')
     plt.grid(True)
@@ -247,7 +338,7 @@ def plot_spectra(f, HF, fs=1.0, fLim=None, dbLim=None):
     . fs: sample frequency in Hz, scale f by fs, fs >= 1
     """
     Hmag = np.abs(HF)
-    Hphs = np.angle(HF)
+    Hphs = np.unwrap(np.angle(HF))
     Hpow_dB = pow_db(HF)  # power response
     fn = f * fs
     if fs > 1:
-- 
GitLab