Skip to content
Snippets Groups Projects
Select Git revision
  • master
  • sdptr_lift
  • v1.5.0
  • v1.4.0
  • v1.3.0
  • v1.2.0
  • v1.1.2
  • v1.1.1
  • v1.1.0
  • v1.0.0
10 results

generic.sh

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    power_plot.py 6.45 KiB
    #!/usr/bin/env python3
    
    from datetime import datetime
    
    import sys
    import os
    import math
    import numpy as np
    import json
    
    import signal
    
    import matplotlib
    import matplotlib.pyplot as plt
    from matplotlib.widgets import Button, CheckButtons
    from matplotlib.collections import LineCollection
    from matplotlib.ticker import FuncFormatter
    from matplotlib.dates import HourLocator, date2num, num2date, AutoDateLocator
    from matplotlib.figure import Figure
    
    from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
    from PyQt5.QtCore import QSocketNotifier, Qt
    from PyQt5 import QtGui
    from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
    from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
    
    import select
    
    
    import time
    from datetime import datetime
    
    from argparse import ArgumentParser
    
    # make this a cli option
    last_seconds = 10 * 60
    
    
    class PowerPlot:
        """A power plot with two time scales"""
    
        def __init__(self, fig):
            self.fig = fig
            self.ax = fig.add_subplot()
            self.ax.set_xlabel("Time (UTC)")
            self.ax.set_ylabel("Frequency")
            self.ax.xaxis_date()
            self.min, self.max = None, None
            self.ax.xaxis.set_major_formatter(
                FuncFormatter(lambda x, pos: f"{num2date(x):%H:%M}")
            )
            fig.suptitle(f"Maximum frequency", fontsize=16)
            fig.tight_layout()
            (self.totalplot,) = self.ax.plot([], [], "-", color="blue")
            (self.totalplothighlight,) = self.ax.plot(
                [], [], ".", color="red", markersize=10
            )
    
        def set_totalplot_values(self, xdata, ydata):
            self.totalplot.set_data(xdata, ydata)
            self.totalplothighlight.set_data(xdata[-1], ydata[-1])
            if self.min is None or ydata[-1] < self.min:
                self.min = ydata[-1]
            if self.max is None or ydata[-1] > self.max:
                self.max = ydata[-1]
            self.ax.autoscale_view(True, True, True)
    
        def do_autoscale_y(self):
            self.ax.set_ylim(
                self.min - (self.max - self.min) * 0.1 - 0.1,
                self.max + (self.max - self.min) * 0.1 + 0.1,
            )
    
    
    class MyToolbar(NavigationToolbar):
        """Custom toolbar that sets the 'autoscale' member of its owner"""
    
        def __init__(self, canvas, powerplotwindow, parent=None):
            super(MyToolbar, self).__init__(canvas, parent)
            self.powerplotwindow = powerplotwindow
    
        def home(self, *args):
            super().home(*args)
            self.powerplotwindow.autoscale_y = True
            self.powerplotwindow.autoscale_x = True
    
        def release_zoom(self, event):
            super().release_zoom(event)
            modifiers = event.guiEvent.modifiers()
            self.powerplotwindow.autoscale_y = False
            if modifiers != Qt.ShiftModifier:
                self.powerplotwindow.autoscale_x = False
    
    
    class PowerPlotMainWindow(QMainWindow):
        """QT Application that reads data from stdin, plots in a PowerPlot"""
    
        def __init__(self):
            super().__init__()
            self.setStyleSheet("background-color: white;")
            self.autoscale_x = True
            self.autoscale_y = True
            self.setWindowTitle("Dwingeloo Radio Telescope")
            self.setGeometry(100, 100, 800, 600)
    
            central_widget = QWidget(self)
            layout = QVBoxLayout(central_widget)
    
            # Get metadata
            header_lines = []
            while True:
                line = sys.stdin.readline()
                header_lines.append(line)
                if line[0] != "#":
                    column_names = line.split()
                    break
    
            line = sys.stdin.readline()
            values = line.split(",")
            self.time_dt = np.array(date2num(datetime.utcfromtimestamp(float(values[0]))))
    
            self.last_x_buffer = np.empty(last_seconds + 1)
            self.last_x_buffer[:] = np.nan
    
            self.min_range = np.arange(-last_seconds, 1, 1)
    
            self.data = np.array([float(values[1])])
    
            self.start_time = datetime(2023, 12, 13, 9, 15)
            self.stop_time = datetime(2023, 12, 13, 11, 30)
    
            plt.ion()
    
            self.output_counter = 0
    
            now = time.time()
            self.last_update = now
    
            fig = Figure(figsize=(8, 6), dpi=75)
    
            canvas = FigureCanvas(fig)
            layout.addWidget(canvas)
            self.setCentralWidget(central_widget)
    
            toolbar = MyToolbar(canvas, self)
            toolbar.setIconSize(toolbar.iconSize() * 0.75)
            layout.addWidget(toolbar)
    
            # fig.canvas.manager.set_window_title("Dwingeloo Radio Telescope")
    
            self.powerplot = PowerPlot(fig)
    
            self.read_stdin()
            self.stdin_notifier = QSocketNotifier(
                sys.stdin.fileno(), QSocketNotifier.Read, self
            )
            self.stdin_notifier.activated.connect(self.read_stdin)
    
        def read_stdin(self):
            ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
            for file in ready_to_read:
                line = file.readline().strip()
                if len(line) == 0 or line[0] == "#":
                    continue
                self.output_counter += 1
                values = line.split(",")
                time_now = float(values[0])
                power_now = float(values[1])
    
                self.time_dt = np.append(
                    self.time_dt, date2num(datetime.utcfromtimestamp(time_now))
                )
                self.data = np.append(self.data, power_now)
    
                self.last_x_buffer = np.roll(self.last_x_buffer, -1)
                self.last_x_buffer[-1] = power_now
    
                self.powerplot.set_totalplot_values(self.time_dt, self.data)
    
                now = time.time()
    
                if self.autoscale_x:
                    self.powerplot.ax.set_xlim(self.start_time, self.stop_time)
                if self.autoscale_y:
                    self.powerplot.do_autoscale_y()
    
                # check if catching up on old data (set to 0.5 then)
                if ((now - self.last_update) > 0.5) or (self.output_counter % 15 == 0):
                    self.powerplot.fig.canvas.draw()
                    self.powerplot.fig.canvas.flush_events()
                self.last_update = now
    
    
    def handle_interrupt_signal(signum, frame):
        print("Ctrl+C pressed. Handling interrupt signal.")
        QApplication.quit()
        sys.exit(0)
    
    
    if __name__ == "__main__":
        app = QApplication(sys.argv)
        icon_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
        )
        try:
            app.setWindowIcon(QtGui.QIcon(icon_path))
        except:
            pass
        main_window = PowerPlotMainWindow()
        main_window.show()
        signal.signal(signal.SIGINT, handle_interrupt_signal)
        sys.exit(app.exec_())