Select Git revision
schedulechecker.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
power_plot.py 7.09 KiB
#!/usr/bin/env python3
from datetime import datetime, timedelta
import sys
import os
import math
import numpy as np
import json
import signal
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, CheckButtons
from matplotlib.collections import LineCollection
from matplotlib.dates import date2num, num2date, AutoDateFormatter
from matplotlib.figure import Figure
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
from PyQt5.QtCore import QSocketNotifier, Qt
from PyQt5 import QtGui
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
import select
import time
from datetime import datetime
from argparse import ArgumentParser
class PowerPlot:
    """Dot plot of one quantity against UTC time on a Matplotlib figure.

    All samples are drawn as small blue dots; the most recent sample is
    additionally drawn as a larger red dot.  The smallest and largest y
    values seen so far are tracked in ``self.min`` / ``self.max`` so that
    ``do_autoscale_y`` can fit the y-axis around them.
    """

    def __init__(self, fig, yname, offset=None):
        """Create the axes and the two (initially empty) line artists.

        Args:
            fig: Matplotlib figure to draw on; one subplot is added to it.
            yname: Text used both as y-axis label and as figure title.
            offset: If not None, forwarded to ``set_useOffset`` of the
                y-axis major formatter.
        """
        self.fig = fig
        self.ax = fig.add_subplot()
        self.ax.set_xlabel("Time (UTC)")
        self.ax.set_ylabel(yname)
        self.ax.xaxis_date()
        formatter = AutoDateFormatter(self.ax.xaxis.get_major_locator())
        # Keys of ``scaled`` are tick spacings in days: use HH:MM labels
        # when ticks are one minute apart.
        formatter.scaled[1 / (24 * 60)] = "%H:%M"
        self.ax.xaxis.set_major_formatter(formatter)
        if offset is not None:
            self.ax.yaxis.get_major_formatter().set_useOffset(offset)
        # Running extremes of the plotted values; None until data arrives.
        self.min, self.max = None, None
        fig.suptitle(yname, fontsize=16)
        fig.tight_layout()
        (self.totalplot,) = self.ax.plot([], [], ".", color="blue")
        (self.totalplothighlight,) = self.ax.plot(
            [], [], ".", color="red", markersize=10
        )

    def set_totalplot_values(self, xdata, ydata):
        """Replace the plotted data and update the highlight and y extremes.

        Args:
            xdata: Sequence of x values (Matplotlib date numbers).
            ydata: Sequence of y values, same length as ``xdata``.
        """
        self.totalplot.set_data(xdata, ydata)
        if len(ydata) == 0:
            # Nothing to highlight and nothing to fold into min/max.
            self.totalplothighlight.set_data([], [])
            return

        # Line2D.set_data expects sequences; bare scalars are deprecated
        # since Matplotlib 3.7 and rejected by newer releases.
        self.totalplothighlight.set_data([xdata[-1]], [ydata[-1]])

        # While no extreme is known yet, consider every value so samples
        # that were already present in the first array are not missed;
        # afterwards only the newest value can change the extremes.
        unseeded = self.min is None or self.max is None
        candidates = ydata if unseeded else ydata[-1:]
        for value in candidates:
            if math.isnan(value):
                # A NaN would make every later comparison False and freeze
                # the extremes, so it is ignored.
                continue
            if self.min is None or value < self.min:
                self.min = value
            if self.max is None or value > self.max:
                self.max = value
        self.ax.autoscale_view(True, True, True)

    def do_autoscale_y(self):
        """Fit the y-axis to the seen extremes plus a 10 % + 0.1 margin.

        Does nothing while no (non-NaN) value has been seen yet.
        """
        if self.min is None or self.max is None:
            return
        self.ax.set_ylim(
            self.min - (self.max - self.min) * 0.1 - 0.1,
            self.max + (self.max - self.min) * 0.1 + 0.1,
        )
class MyToolbar(NavigationToolbar):
    """Navigation toolbar that updates the autoscale flags of its owner.

    The owner (``powerplotwindow``) exposes the boolean attributes
    ``autoscale_x`` and ``autoscale_y``; this toolbar switches them on when
    the view is reset and off when the user zooms.
    """

    def __init__(self, canvas, powerplotwindow, parent=None):
        """Create the toolbar for *canvas* and remember the owning window.

        Args:
            canvas: Figure canvas the toolbar operates on.
            powerplotwindow: Object whose ``autoscale_x`` / ``autoscale_y``
                attributes are updated by this toolbar.
            parent: Optional Qt parent, forwarded to the base class.
        """
        super().__init__(canvas, parent)
        self.powerplotwindow = powerplotwindow

    def home(self, *args):
        """Reset the view and re-enable autoscaling on both axes."""
        super().home(*args)
        self.powerplotwindow.autoscale_y = True
        self.powerplotwindow.autoscale_x = True

    def release_zoom(self, event):
        """Finish a zoom and disable autoscaling for the zoomed axes.

        Autoscaling of y is always switched off.  Autoscaling of x is kept
        only when Shift is the sole keyboard modifier held on release.
        """
        super().release_zoom(event)
        # guiEvent is None for events that did not originate from the GUI
        # toolkit; treat those as "no modifier held".
        gui_event = event.guiEvent
        if gui_event is None:
            modifiers = Qt.NoModifier
        else:
            modifiers = gui_event.modifiers()
        self.powerplotwindow.autoscale_y = False
        if modifiers != Qt.ShiftModifier:
            self.powerplotwindow.autoscale_x = False
class PowerPlotMainWindow(QMainWindow):
    """Qt main window that reads samples from stdin and shows them in a PowerPlot.

    Input format, as consumed by this class:
      * blank lines and lines starting with ``#`` are ignored everywhere;
      * the first remaining line holds whitespace-separated column names;
      * every following line holds comma-separated numbers, where field 0
        is a Unix timestamp in seconds and field ``col_num`` is the value
        to plot.

    While ``autoscale_x`` is set, the x-axis shows a fixed two-hour window
    starting at the first sample; while ``autoscale_y`` is set, the y-axis
    follows the range of the data.
    """

    # Upper bound on the number of bytes taken from stdin per read.
    _READ_CHUNK = 65536

    def __init__(self, offset=None, col_num=1):
        """Build the window; blocks until the header and one sample are read.

        Args:
            offset: Forwarded to ``PowerPlot`` (y-axis offset setting).
            col_num: Index of the column to plot.

        Raises:
            EOFError: stdin ended before a header line and a valid sample
                were received.
            IndexError: The header has no column ``col_num``.
        """
        super().__init__()
        self.setStyleSheet("background-color: white;")
        self.autoscale_x = True
        self.autoscale_y = True
        self.col_num = col_num
        self.setWindowTitle("Dwingeloo Radio Telescope")
        self.setGeometry(100, 100, 800, 600)
        central_widget = QWidget(self)
        layout = QVBoxLayout(central_widget)

        # stdin is read through its raw file descriptor with our own line
        # buffer.  Mixing select() on the descriptor with the buffered
        # sys.stdin.readline() would leave lines stranded in Python's
        # buffer, where select() cannot see them.
        self._stdin_fd = sys.stdin.fileno()
        self._stdin_encoding = sys.stdin.encoding or "utf-8"
        self._partial_line = b""  # bytes after the last newline received
        self._queued_lines = []  # complete, decoded, not yet handled lines
        self._stdin_eof = False
        self._draining = False  # True while read_stdin is handling lines
        self.stdin_notifier = None

        # TODO: this is blocking and freezes the UI, could
        # be refactored to use the QT notification mechanism
        yname = self._read_column_name()
        first_time, first_value = self._read_first_sample()

        self.time_dt = np.array([date2num(first_time)])
        self.data = np.array([first_value])
        self.start_time = first_time
        self.stop_time = first_time + timedelta(hours=2)

        plt.ion()
        self.output_counter = 0
        self.last_update = time.time()

        fig = Figure(figsize=(8, 6), dpi=75)
        canvas = FigureCanvas(fig)
        layout.addWidget(canvas)
        self.setCentralWidget(central_widget)
        toolbar = MyToolbar(canvas, self)
        toolbar.setIconSize(toolbar.iconSize() * 0.75)
        layout.addWidget(toolbar)

        self.powerplot = PowerPlot(fig, yname, offset=offset)
        # Show the first sample right away so that it also takes part in
        # the y-range tracking.
        self._refresh_plot()
        self.read_stdin()

        self.stdin_notifier = QSocketNotifier(
            self._stdin_fd, QSocketNotifier.Read, self
        )
        self.stdin_notifier.activated.connect(self.read_stdin)
        if self._stdin_eof:
            # A closed stdin stays "readable" forever; without this the
            # notifier would fire continuously.
            self.stdin_notifier.setEnabled(False)

    def read_stdin(self):
        """Take the data currently available on stdin and plot new samples.

        Never blocks.  At end of input the notifier is disabled and the
        canvas is drawn one last time so the final samples become visible.
        """
        if not self._stdin_eof:
            ready, _, _ = select.select([self._stdin_fd], [], [], 0)
            if ready:
                self._fill_from_stdin()

        # Drawing runs the Qt event loop (flush_events), which can call
        # this slot again.  The nested call only queues data; the outer
        # call handles it afterwards so samples stay in order.
        if self._draining:
            return
        self._draining = True
        try:
            while self._queued_lines:
                lines, self._queued_lines = self._queued_lines, []
                for line in lines:
                    self._handle_line(line)
        finally:
            self._draining = False

        if self._stdin_eof:
            if self.stdin_notifier is not None:
                self.stdin_notifier.setEnabled(False)
            self._redraw()

    def _feed(self, chunk):
        """Queue the complete lines in *chunk*; keep the unterminated tail."""
        *complete, self._partial_line = (self._partial_line + chunk).split(b"\n")
        self._queued_lines.extend(
            raw.decode(self._stdin_encoding, errors="replace") for raw in complete
        )

    def _fill_from_stdin(self):
        """Read one chunk from stdin into the line queue, noting EOF."""
        chunk = os.read(self._stdin_fd, self._READ_CHUNK)
        if chunk:
            self._feed(chunk)
            return
        self._stdin_eof = True
        if self._partial_line:
            # An unterminated last line is still a line.
            self._feed(b"\n")

    def _next_line_blocking(self):
        """Return the next input line, waiting for data if necessary."""
        while not self._queued_lines:
            if self._stdin_eof:
                raise EOFError(
                    "stdin closed before a header and a first sample were read"
                )
            self._fill_from_stdin()
        return self._queued_lines.pop(0)

    def _read_column_name(self):
        """Skip blank/comment lines and return the label of column ``col_num``."""
        while True:
            line = self._next_line_blocking().strip()
            if line and not line.startswith("#"):
                break
        column_names = line.split()
        try:
            name = column_names[self.col_num]
        except IndexError:
            raise IndexError(
                f"column {self.col_num} not present in header: {line}"
            ) from None
        return name.rstrip(",").capitalize()

    def _read_first_sample(self):
        """Wait for the first valid data line; return ``(datetime, value)``."""
        while True:
            line = self._next_line_blocking().strip()
            if not line or line.startswith("#"):
                continue
            sample = self._parse_sample(line)
            if sample is not None:
                return sample
            print("Skipping line: " + line)

    def _parse_sample(self, line):
        """Parse a data line into ``(datetime, value)``; None if malformed."""
        fields = line.split(",")
        try:
            timestamp = float(fields[0])
            value = float(fields[self.col_num])
            if not math.isfinite(timestamp):
                return None
            return datetime.utcfromtimestamp(timestamp), value
        except (IndexError, ValueError, OverflowError, OSError):
            # Too few fields, non-numeric text, or a timestamp that cannot
            # be represented as a date.
            return None

    def _handle_line(self, line):
        """Append the sample on *line* (if any) and update the plot."""
        line = line.strip()
        if not line or line.startswith("#"):
            return
        self.output_counter += 1
        sample = self._parse_sample(line)
        if sample is None:
            print("Skipping line: " + line)
            return
        sample_time, value = sample
        self.time_dt = np.append(self.time_dt, date2num(sample_time))
        self.data = np.append(self.data, value)
        self._refresh_plot()
        now = time.time()
        # Throttle drawing: at most every 0.5 s, but at least every 15
        # lines while catching up on a backlog of old data.
        if (now - self.last_update) > 0.5 or self.output_counter % 15 == 0:
            self._redraw()
            self.last_update = now

    def _refresh_plot(self):
        """Push the stored samples to the plot and apply enabled autoscaling."""
        self.powerplot.set_totalplot_values(self.time_dt, self.data)
        if self.autoscale_x:
            self.powerplot.ax.set_xlim(self.start_time, self.stop_time)
        if self.autoscale_y:
            self.powerplot.do_autoscale_y()

    def _redraw(self):
        """Render the figure and let Qt process pending events."""
        canvas = self.powerplot.fig.canvas
        canvas.draw()
        canvas.flush_events()
def handle_interrupt_signal(signum, frame):
    """SIGINT handler: report the interrupt, stop Qt and exit with status 0.

    ``signum`` and ``frame`` are supplied by the ``signal`` machinery and
    are not used.
    """
    print("Ctrl+C pressed. Handling interrupt signal.")
    QApplication.quit()
    # Same effect as sys.exit(0): unwind the interpreter with exit status 0.
    raise SystemExit(0)
if __name__ == "__main__":
    from PyQt5.QtCore import QTimer

    parser = ArgumentParser(description="Read files and numbers from stdin and plot them")
    parser.add_argument("-o", "--offset", help="Offset along y-axis", type=float, default=None)
    parser.add_argument(
        "-c",
        "--column",
        help="Index of the input column to plot (default: 1)",
        type=int,
        default=1,
    )
    args = parser.parse_args()

    app = QApplication(sys.argv)

    # The icon is optional: use it only when the file sits next to this script.
    icon_path = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
    )
    if os.path.isfile(icon_path):
        app.setWindowIcon(QtGui.QIcon(icon_path))

    main_window = PowerPlotMainWindow(offset=args.offset, col_num=args.column)
    main_window.show()

    signal.signal(signal.SIGINT, handle_interrupt_signal)
    # Python-level signal handlers only run when the interpreter executes
    # bytecode.  While Qt idles in its C++ event loop nothing does, so a
    # no-op timer wakes the interpreter regularly to keep Ctrl+C responsive
    # even when no data arrives on stdin.
    interrupt_timer = QTimer()
    interrupt_timer.timeout.connect(lambda: None)
    interrupt_timer.start(200)

    sys.exit(app.exec_())