#!/usr/bin/env python3
"""Live plot of a value streamed over stdin in a Qt window.

Input format, as consumed by this script:

* zero or more header lines starting with ``#``;
* one line that does not start with ``#`` (whitespace-separated column
  names);
* comma-separated data lines whose first field is a Unix timestamp in
  seconds and whose second field is the value to plot.
"""

import json
import math
import os
import select
import signal
import sys
import time
from argparse import ArgumentParser
from datetime import datetime, timezone

import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, CheckButtons
from matplotlib.collections import LineCollection
from matplotlib.ticker import FuncFormatter
from matplotlib.dates import HourLocator, date2num, num2date, AutoDateLocator
from matplotlib.figure import Figure
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
from PyQt5.QtCore import QSocketNotifier, Qt, QTimer
from PyQt5 import QtGui
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar

# TODO: make this a CLI option.
last_seconds = 10 * 60

# Default x-axis window shown while x autoscaling is enabled.
DEFAULT_START_TIME = datetime(2023, 12, 13, 9, 15)
DEFAULT_STOP_TIME = datetime(2023, 12, 13, 11, 30)


def _timestamp_to_datenum(timestamp):
    """Convert a Unix timestamp in seconds to a Matplotlib date number (UTC).

    Uses a timezone-aware conversion instead of the deprecated
    ``datetime.utcfromtimestamp``.
    """
    return date2num(datetime.fromtimestamp(timestamp, tz=timezone.utc))


class PowerPlot:
    """Line plot of the streamed values against time.

    Draws the full series as a blue line and marks the most recent sample
    with a red dot. Also keeps the running minimum and maximum of the
    plotted values (``self.min`` / ``self.max``) for y-axis scaling.
    """

    def __init__(self, fig):
        """Create the axes and the (initially empty) artists on *fig*."""
        self.fig = fig
        self.ax = fig.add_subplot()
        self.ax.set_xlabel("Time (UTC)")
        self.ax.set_ylabel("Frequency")
        self.ax.xaxis_date()
        self.min, self.max = None, None
        # Tick labels show only hours and minutes.
        self.ax.xaxis.set_major_formatter(
            FuncFormatter(lambda x, pos: f"{num2date(x):%H:%M}")
        )
        fig.suptitle("Maximum frequency", fontsize=16)
        fig.tight_layout()
        (self.totalplot,) = self.ax.plot([], [], "-", color="blue")
        (self.totalplothighlight,) = self.ax.plot(
            [], [], ".", color="red", markersize=10
        )

    def set_totalplot_values(self, xdata, ydata):
        """Replace the plotted series and update the running min/max.

        Args:
            xdata: Sequence of x values (Matplotlib date numbers).
            ydata: Sequence of y values, same length as *xdata*.
        """
        self.totalplot.set_data(xdata, ydata)
        if len(ydata) == 0:
            self.totalplothighlight.set_data([], [])
            return

        # Line2D.set_data requires sequences; wrap the newest sample.
        self.totalplothighlight.set_data([xdata[-1]], [ydata[-1]])

        # Fold every finite value into the running extrema so that samples
        # other than the newest one (e.g. the very first) are not missed.
        values = np.asarray(ydata, dtype=float)
        finite = values[np.isfinite(values)]
        if finite.size:
            lowest, highest = float(finite.min()), float(finite.max())
            self.min = lowest if self.min is None else min(self.min, lowest)
            self.max = highest if self.max is None else max(self.max, highest)

        self.ax.autoscale_view(True, True, True)

    def do_autoscale_y(self):
        """Set the y-limits to the running min/max plus a 10 % + 0.1 margin.

        Does nothing while no finite value has been plotted yet.
        """
        if self.min is None or self.max is None:
            return
        margin = (self.max - self.min) * 0.1 + 0.1
        self.ax.set_ylim(self.min - margin, self.max + margin)


class MyToolbar(NavigationToolbar):
    """Custom toolbar that sets the autoscale flags of its owner window."""

    def __init__(self, canvas, powerplotwindow, parent=None):
        """Attach to *canvas*; *powerplotwindow* receives the flag updates."""
        super().__init__(canvas, parent)
        self.powerplotwindow = powerplotwindow

    def home(self, *args):
        """Reset the view and re-enable autoscaling on both axes."""
        super().home(*args)
        self.powerplotwindow.autoscale_y = True
        self.powerplotwindow.autoscale_x = True

    def release_zoom(self, event):
        """Finish a zoom and disable autoscaling accordingly.

        Y autoscaling is always disabled. X autoscaling is disabled as well
        unless Shift (and only Shift) was held when the zoom was released.
        """
        super().release_zoom(event)
        gui_event = getattr(event, "guiEvent", None)
        # Events without a GUI event behind them carry no modifier info.
        modifiers = gui_event.modifiers() if gui_event is not None else Qt.NoModifier
        self.powerplotwindow.autoscale_y = False
        if modifiers != Qt.ShiftModifier:
            self.powerplotwindow.autoscale_x = False


class PowerPlotMainWindow(QMainWindow):
    """Qt main window that reads data from stdin and plots it in a PowerPlot."""

    def __init__(self, start_time=None, stop_time=None):
        """Build the window, read the header and first sample, start listening.

        Blocks until the header and the first data line have been read from
        stdin.

        Args:
            start_time: Left x-limit used while x autoscaling is on; defaults
                to ``DEFAULT_START_TIME``.
            stop_time: Right x-limit used while x autoscaling is on; defaults
                to ``DEFAULT_STOP_TIME``.

        Raises:
            EOFError: stdin ended before the header or first data line.
            ValueError: the first data line could not be parsed as numbers.
        """
        super().__init__()
        self.setStyleSheet("background-color: white;")

        # Toggled by MyToolbar when the user zooms or presses "home".
        self.autoscale_x = True
        self.autoscale_y = True

        self.setWindowTitle("Dwingeloo Radio Telescope")
        self.setGeometry(100, 100, 800, 600)

        central_widget = QWidget(self)
        layout = QVBoxLayout(central_widget)

        # Get metadata
        self.header_lines, self.column_names = self._read_header(sys.stdin)

        first_line = sys.stdin.readline()
        if first_line == "":
            raise EOFError("stdin ended before the first data line")
        values = first_line.split(",")

        self.time_dt = np.array([_timestamp_to_datenum(float(values[0]))])
        # Rolling buffer of the most recent values, NaN until filled.
        self.last_x_buffer = np.full(last_seconds + 1, np.nan)
        self.min_range = np.arange(-last_seconds, 1, 1)
        self.data = np.array([float(values[1])])

        self.start_time = DEFAULT_START_TIME if start_time is None else start_time
        self.stop_time = DEFAULT_STOP_TIME if stop_time is None else stop_time

        plt.ion()

        self.output_counter = 0
        self.last_update = time.time()

        fig = Figure(figsize=(8, 6), dpi=75)
        canvas = FigureCanvas(fig)
        layout.addWidget(canvas)
        self.setCentralWidget(central_widget)

        toolbar = MyToolbar(canvas, self)
        toolbar.setIconSize(toolbar.iconSize() * 0.75)
        layout.addWidget(toolbar)

        self.powerplot = PowerPlot(fig)

        # Consume anything already waiting, then let Qt notify us of more.
        self.read_stdin()
        self.stdin_notifier = QSocketNotifier(
            sys.stdin.fileno(), QSocketNotifier.Read, self
        )
        self.stdin_notifier.activated.connect(self.read_stdin)

    @staticmethod
    def _read_header(stream):
        """Read lines from *stream* up to and including the column-name line.

        Returns:
            ``(header_lines, column_names)`` where *header_lines* holds every
            line read (the ``#`` lines plus the column-name line) and
            *column_names* is the whitespace-split first non-``#`` line.

        Raises:
            EOFError: the stream ended before a non-``#`` line was found.
        """
        header_lines = []
        while True:
            line = stream.readline()
            if line == "":
                raise EOFError("stdin ended before the column-name line")
            header_lines.append(line)
            if not line.startswith("#"):
                return header_lines, line.split()

    def read_stdin(self, *_args):
        """Read one pending line from stdin, if any, and update the plot.

        Connected to the stdin ``QSocketNotifier``; extra positional
        arguments passed by the signal are ignored. Blank lines, ``#``
        comment lines and lines that cannot be parsed are skipped. On end of
        input the notifier is disabled so it stops firing.
        """
        ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
        for file in ready_to_read:
            raw_line = file.readline()
            if raw_line == "":
                # EOF: a closed stdin stays "readable" forever, so stop
                # listening instead of spinning. The notifier does not exist
                # yet during the initial call from __init__.
                notifier = getattr(self, "stdin_notifier", None)
                if notifier is not None:
                    notifier.setEnabled(False)
                continue

            line = raw_line.strip()
            if not line or line.startswith("#"):
                continue

            values = line.split(",")
            try:
                time_now = float(values[0])
                power_now = float(values[1])
            except (IndexError, ValueError):
                # An exception escaping a Qt slot can abort the application.
                print(f"Skipping malformed input line: {line!r}", file=sys.stderr)
                continue

            self.output_counter += 1

            self.time_dt = np.append(self.time_dt, _timestamp_to_datenum(time_now))
            self.data = np.append(self.data, power_now)

            self.last_x_buffer = np.roll(self.last_x_buffer, -1)
            self.last_x_buffer[-1] = power_now

            self.powerplot.set_totalplot_values(self.time_dt, self.data)

            now = time.time()

            if self.autoscale_x:
                self.powerplot.ax.set_xlim(self.start_time, self.stop_time)
            if self.autoscale_y:
                self.powerplot.do_autoscale_y()

            # Redraw when the previous sample arrived more than 0.5 s ago,
            # and otherwise (samples arriving in a burst) only on every 15th.
            if (now - self.last_update) > 0.5 or self.output_counter % 15 == 0:
                self.powerplot.fig.canvas.draw()
                self.powerplot.fig.canvas.flush_events()

            self.last_update = now


def handle_interrupt_signal(signum, frame):
    """SIGINT handler: quit the Qt application and exit with status 0."""
    print("Ctrl+C pressed. Handling interrupt signal.")
    QApplication.quit()
    sys.exit(0)


def main():
    """Create the application and main window and run the Qt event loop.

    Returns:
        The exit status of the Qt event loop.
    """
    app = QApplication(sys.argv)

    icon_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
    )
    # The icon is optional; run without one if the file is missing.
    if os.path.exists(icon_path):
        app.setWindowIcon(QtGui.QIcon(icon_path))

    main_window = PowerPlotMainWindow()
    main_window.show()

    signal.signal(signal.SIGINT, handle_interrupt_signal)

    # Python-level signal handlers only run when the interpreter executes
    # bytecode; wake it up periodically so Ctrl+C works while Qt is idle.
    keepalive_timer = QTimer()
    keepalive_timer.timeout.connect(lambda: None)
    keepalive_timer.start(200)

    return app.exec_()


if __name__ == "__main__":
    sys.exit(main())