Select Git revision
power_plot.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
power_plot.py 6.45 KiB
#!/usr/bin/env python3
from datetime import datetime
import sys
import os
import math
import numpy as np
import json
import signal
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, CheckButtons
from matplotlib.collections import LineCollection
from matplotlib.ticker import FuncFormatter
from matplotlib.dates import HourLocator, date2num, num2date, AutoDateLocator
from matplotlib.figure import Figure
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
from PyQt5.QtCore import QSocketNotifier, Qt
from PyQt5 import QtGui
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
import select
import time
from datetime import datetime
from argparse import ArgumentParser
# TODO: make this a CLI option.
# Length of the "recent samples" window. PowerPlotMainWindow uses it to size
# its rolling buffer (last_seconds + 1 entries) and its matching offset range.
# NOTE(review): the name and the 10 * 60 value suggest seconds (10 minutes),
# which only holds if the input delivers one sample per second -- confirm.
last_seconds = 10 * 60
class PowerPlot:
    """Time-series plot with a highlighted latest sample.

    Owns one axes on the given figure containing a blue line for the whole
    series and a red dot for the most recent point. It also tracks the
    smallest and largest "latest" value it has been handed, which
    ``do_autoscale_y`` uses to set the y-limits.
    """

    def __init__(self, fig):
        """Create the axes, labels, tick formatter and empty artists on *fig*.

        Args:
            fig: The matplotlib figure to draw into.
        """
        self.fig = fig
        self.ax = fig.add_subplot()
        self.ax.set_xlabel("Time (UTC)")
        self.ax.set_ylabel("Frequency")
        self.ax.xaxis_date()
        # Running extremes of the latest samples; None until the first sample.
        self.min, self.max = None, None
        # Tick labels show only hours and minutes.
        self.ax.xaxis.set_major_formatter(
            FuncFormatter(lambda x, pos: f"{num2date(x):%H:%M}")
        )
        fig.suptitle("Maximum frequency", fontsize=16)
        fig.tight_layout()
        (self.totalplot,) = self.ax.plot([], [], "-", color="blue")
        (self.totalplothighlight,) = self.ax.plot(
            [], [], ".", color="red", markersize=10
        )

    def set_totalplot_values(self, xdata, ydata):
        """Replace the plotted series and move the highlight to its last point.

        Also folds the last y value into the running ``min``/``max``.

        Args:
            xdata: Sequence of x values (matplotlib date numbers).
            ydata: Sequence of y values, same length as *xdata*.
        """
        if len(xdata) == 0 or len(ydata) == 0:
            # Nothing to show yet; indexing [-1] below would raise.
            return
        latest_x, latest_y = xdata[-1], ydata[-1]
        self.totalplot.set_data(xdata, ydata)
        # Line2D.set_data requires sequences; bare scalars are rejected by
        # newer Matplotlib releases, so wrap the single point in lists.
        self.totalplothighlight.set_data([latest_x], [latest_y])
        if self.min is None or latest_y < self.min:
            self.min = latest_y
        if self.max is None or latest_y > self.max:
            self.max = latest_y
        self.ax.autoscale_view(True, True, True)

    def do_autoscale_y(self):
        """Set the y-limits to the observed range plus a 10 % + 0.1 margin.

        Does nothing while no sample has been recorded yet.
        """
        if self.min is None or self.max is None:
            return
        span = self.max - self.min
        self.ax.set_ylim(
            self.min - span * 0.1 - 0.1,
            self.max + span * 0.1 + 0.1,
        )
class MyToolbar(NavigationToolbar):
    """Navigation toolbar that keeps its owner's autoscale flags in sync.

    The owner is expected to expose writable ``autoscale_x`` and
    ``autoscale_y`` attributes.
    """

    def __init__(self, canvas, powerplotwindow, parent=None):
        """Build the stock toolbar and remember the owning window.

        Args:
            canvas: Figure canvas the toolbar controls.
            powerplotwindow: Object whose autoscale flags are updated.
            parent: Optional parent widget passed to the base toolbar.
        """
        super().__init__(canvas, parent)
        self.powerplotwindow = powerplotwindow

    def home(self, *args):
        """Reset the view and switch autoscaling back on for both axes."""
        super().home(*args)
        owner = self.powerplotwindow
        owner.autoscale_y = True
        owner.autoscale_x = True

    def release_zoom(self, event):
        """Finish a zoom drag and freeze autoscaling accordingly.

        The y axis always stops autoscaling. The x axis stops too, unless
        the modifier state of the GUI event equals exactly Shift.
        """
        super().release_zoom(event)
        freeze_x = event.guiEvent.modifiers() != Qt.ShiftModifier
        owner = self.powerplotwindow
        owner.autoscale_y = False
        if freeze_x:
            owner.autoscale_x = False
class PowerPlotMainWindow(QMainWindow):
    """Qt main window that reads samples from stdin and plots them in a PowerPlot.

    Input as consumed by this class: leading lines starting with "#" are
    skipped, the first line that does not start with "#" is consumed as a
    column-header row (its content is not used), and every later line is
    comma-separated with a Unix timestamp in field 0 and the plotted value
    in field 1.
    """

    def __init__(self, start_time=None, stop_time=None):
        """Build the widgets, consume the stdin preamble and start listening.

        Args:
            start_time: Left x-limit applied while x-autoscaling is on.
                Defaults to ``datetime(2023, 12, 13, 9, 15)``.
            stop_time: Right x-limit applied while x-autoscaling is on.
                Defaults to ``datetime(2023, 12, 13, 11, 30)``.

        Raises:
            EOFError: If stdin ends before the header row or the first
                data line has been read.
        """
        super().__init__()
        self.setStyleSheet("background-color: white;")
        self.autoscale_x = True
        self.autoscale_y = True
        self.setWindowTitle("Dwingeloo Radio Telescope")
        self.setGeometry(100, 100, 800, 600)
        central_widget = QWidget(self)
        layout = QVBoxLayout(central_widget)

        values = self._read_first_sample()
        # Kept 1-d (like self.data) so the arrays are consistent from the
        # first sample on.
        # NOTE(review): datetime.utcfromtimestamp is deprecated since
        # Python 3.12; consider a timezone-aware replacement.
        self.time_dt = np.array(
            [date2num(datetime.utcfromtimestamp(float(values[0])))]
        )
        # Rolling buffer of the most recent last_seconds + 1 values,
        # NaN where no sample has been stored yet.
        self.last_x_buffer = np.full(last_seconds + 1, np.nan)
        self.min_range = np.arange(-last_seconds, 1, 1)
        self.data = np.array([float(values[1])])
        self.start_time = (
            datetime(2023, 12, 13, 9, 15) if start_time is None else start_time
        )
        self.stop_time = (
            datetime(2023, 12, 13, 11, 30) if stop_time is None else stop_time
        )
        plt.ion()
        self.output_counter = 0
        self.last_update = time.time()

        fig = Figure(figsize=(8, 6), dpi=75)
        canvas = FigureCanvas(fig)
        layout.addWidget(canvas)
        self.setCentralWidget(central_widget)
        toolbar = MyToolbar(canvas, self)
        toolbar.setIconSize(toolbar.iconSize() * 0.75)
        layout.addWidget(toolbar)
        self.powerplot = PowerPlot(fig)

        # read_stdin() looks at this attribute, so it must exist before the
        # first (direct) call below.
        self.stdin_notifier = None
        self.read_stdin()
        self.stdin_notifier = QSocketNotifier(
            sys.stdin.fileno(), QSocketNotifier.Read, self
        )
        self.stdin_notifier.activated.connect(self.read_stdin)

    @staticmethod
    def _read_first_sample():
        """Skip the stdin preamble and return the first data line split on ","."""
        while True:
            line = sys.stdin.readline()
            if not line:
                raise EOFError("stdin ended before the column header line")
            if line[0] != "#":
                # First non-comment line is the header row; it is consumed
                # and otherwise ignored.
                break
        line = sys.stdin.readline()
        if not line:
            raise EOFError("stdin ended before the first data line")
        return line.split(",")

    def read_stdin(self):
        """Consume one pending stdin line, if any, and update the plot.

        Called once directly from ``__init__`` and afterwards whenever the
        stdin socket notifier fires. Blank lines, "#" comment lines and
        lines that cannot be parsed are skipped. On end of input the
        notifier is disabled so it does not keep firing.

        NOTE(review): the source's indentation was lost; the plot update is
        assumed to belong inside the per-line loop.
        """
        ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
        for file in ready_to_read:
            raw_line = file.readline()
            if raw_line == "":
                # EOF keeps the descriptor permanently "readable"; without
                # this the notifier would call us in a tight loop.
                if self.stdin_notifier is not None:
                    self.stdin_notifier.setEnabled(False)
                continue
            line = raw_line.strip()
            if len(line) == 0 or line[0] == "#":
                continue
            values = line.split(",")
            try:
                time_now = float(values[0])
                power_now = float(values[1])
                time_num = date2num(datetime.utcfromtimestamp(time_now))
            except (ValueError, IndexError, OverflowError, OSError):
                # Malformed or out-of-range sample: drop it rather than
                # raise from inside a Qt slot.
                continue
            self.output_counter += 1

            self.time_dt = np.append(self.time_dt, time_num)
            self.data = np.append(self.data, power_now)
            self.last_x_buffer = np.roll(self.last_x_buffer, -1)
            self.last_x_buffer[-1] = power_now
            self.powerplot.set_totalplot_values(self.time_dt, self.data)

            now = time.time()
            if self.autoscale_x:
                self.powerplot.ax.set_xlim(self.start_time, self.stop_time)
            if self.autoscale_y:
                self.powerplot.do_autoscale_y()
            # Throttle redraws: repaint when more than 0.5 s has passed
            # since the last repaint, or on every 15th accepted sample
            # (keeps catching up on a backlog of old data cheap).
            if ((now - self.last_update) > 0.5) or (self.output_counter % 15 == 0):
                canvas = self.powerplot.fig.canvas
                canvas.draw()
                canvas.flush_events()
                self.last_update = now
def handle_interrupt_signal(signum, frame):
    """SIGINT handler: announce the interrupt, quit Qt, exit with status 0.

    Args:
        signum: Signal number passed in by the signal machinery (unused).
        frame: Interrupted stack frame passed in by the signal machinery
            (unused).
    """
    print("Ctrl+C pressed. Handling interrupt signal.")
    QApplication.quit()
    # Same effect as sys.exit(0).
    raise SystemExit(0)
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the plot window
    # and run the event loop until it exits.
    app = QApplication(sys.argv)
    # The icon file is looked up next to this script.
    icon_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
    )
    try:
        app.setWindowIcon(QtGui.QIcon(icon_path))
    except Exception as exc:
        # The icon is purely cosmetic, so carry on without it -- but do not
        # swallow KeyboardInterrupt/SystemExit as a bare ``except`` would.
        print(f"Could not set window icon: {exc}", file=sys.stderr)
    main_window = PowerPlotMainWindow()
    main_window.show()
    signal.signal(signal.SIGINT, handle_interrupt_signal)
    sys.exit(app.exec_())