Skip to content
Snippets Groups Projects
Commit 368f2c1a authored by Tammo Jan Dijkema's avatar Tammo Jan Dijkema
Browse files

Initial commit

parents
Branches
Tags
No related merge requests found
#!/usr/bin/env python3
from astropy.table import Table
from astropy.time import Time
from astropy.coordinates import ICRS, AltAz, EarthLocation, SkyCoord
import astropy.units as u
import astropy.constants
import sys
import os
import math
import numpy as np
import json
import signal
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.widgets import Button, CheckButtons
from matplotlib.collections import LineCollection
from matplotlib.ticker import FuncFormatter
from matplotlib.dates import HourLocator, date2num, num2date, AutoDateLocator
from matplotlib.figure import Figure
from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
from PyQt5.QtCore import QSocketNotifier, Qt
from PyQt5 import QtGui
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
import select
from IPython.display import clear_output
import time
from datetime import datetime
from argparse import ArgumentParser
# make this a cli option
last_seconds = 10 * 60
class PowerPlot:
"""A power plot with two time scales"""
def __init__(self, fig, freq):
self.fig = fig
self.ax1 = fig.add_subplot(2, 1, 1)
self.ax2 = fig.add_subplot(2, 1, 2)
self.ax1.set_xlabel("time (s)")
self.ax1.set_ylabel("power (uncalibrated)")
self.ax2.set_xlabel("time (UTC)")
self.ax2.set_ylabel("power (uncalibrated)")
self.ax2.xaxis_date()
self.min1, self.max1 = None, None
self.min2, self.max2 = None, None
self.ax2.xaxis.set_major_formatter(
FuncFormatter(lambda x, pos: f"{num2date(x):%H:%M}")
)
fig.suptitle(f"Sgr A Occultation ({freq:.0f} MHz)", fontsize=16)
fig.tight_layout()
(self.zoomplot,) = self.ax1.plot([], [], "-", color="blue")
(self.zoomplothighlight,) = self.ax1.plot(
[], [], ".", color="red", markersize=10
)
(self.totalplot,) = self.ax2.plot([], [], "-", color="blue")
(self.totalplothighlight,) = self.ax2.plot(
[], [], ".", color="red", markersize=10
)
def set_zoomplot_values(self, xdata, ydata):
self.zoomplot.set_data(xdata, ydata)
self.zoomplothighlight.set_data(xdata[-1], ydata[-1])
self.min1 = np.nanmin(ydata)
self.max1 = np.nanmax(ydata)
if np.isnan(self.min1) or np.isnan(self.max1):
self.min1 = None
self.max1 = None
self.ax1.autoscale_view(True, True, True)
def set_totalplot_values(self, xdata, ydata):
self.totalplot.set_data(xdata, ydata)
self.totalplothighlight.set_data(xdata[-1], ydata[-1])
if self.min2 is None or ydata[-1] < self.min2:
self.min2 = ydata[-1]
if self.max2 is None or ydata[-1] > self.max2:
self.max2 = ydata[-1]
self.ax2.autoscale_view(True, True, True)
def do_autoscale_y(self):
self.ax1.set_ylim(
self.min1 - (self.max1 - self.min1) * 0.1 - 0.1,
self.max1 + (self.max1 - self.min1) * 0.1 + 0.1,
)
self.ax2.set_ylim(
self.min2 - (self.max2 - self.min2) * 0.1 - 0.1,
self.max2 + (self.max2 - self.min2) * 0.1 + 0.1,
)
class MyToolbar(NavigationToolbar):
"""Custom toolbar that sets the 'autoscale' member of its owner"""
def __init__(self, canvas, powerplotwindow, parent=None):
super(MyToolbar, self).__init__(canvas, parent)
self.powerplotwindow = powerplotwindow
def home(self, *args):
super().home(*args)
self.powerplotwindow.autoscale_y = True
self.powerplotwindow.autoscale_x = True
def release_zoom(self, event):
super().release_zoom(event)
modifiers = event.guiEvent.modifiers()
self.powerplotwindow.autoscale_y = False
if modifiers != Qt.ShiftModifier:
self.powerplotwindow.autoscale_x = False
class PowerPlotMainWindow(QMainWindow):
"""QT Application that reads data from stdin, plots in a PowerPlot"""
def __init__(self):
super().__init__()
self.setStyleSheet("background-color: white;")
self.autoscale_x = True
self.autoscale_y = True
self.setWindowTitle("Dwingeloo Radio Telescope")
self.setGeometry(100, 100, 800, 600)
central_widget = QWidget(self)
layout = QVBoxLayout(central_widget)
# Get metadata
header_lines = []
while True:
line = sys.stdin.readline()
header_lines.append(line)
if line[0] != "#":
column_names = line.split()
break
table = Table.read("".join(header_lines), format="ascii.ecsv")
self.col_first_bin = table.meta["spectrum"]["col_first_bin"]
freqs = (
np.array([float(col) for col in table.columns[self.col_first_bin :]]) / 1e6
) # In MHz
num_bins = table.meta["spectrum"]["bins"]
in_db = table.meta["spectrum"]["db"]
freq = table.meta["vrt"]["frequency"] / 1e6
line = sys.stdin.readline()
values = line.split(",")
self.time_dt = np.array(Time(values[0], format="unix").to_datetime())
self.last_x_buffer = np.empty(last_seconds + 1)
self.last_x_buffer[:] = np.nan
self.min_range = np.arange(-last_seconds, 1, 1)
self.data = np.array(
[(np.array(values[self.col_first_bin :], dtype=float).mean())]
)
self.start_time = Time("2023-12-13T09:15:00").to_datetime()
self.stop_time = Time("2023-12-13T11:30:00").to_datetime()
#self.start_time = Time(values[0], format="unix").to_datetime()
#self.stop_time = (Time(values[0], format="unix") + 2 * u.h).to_datetime()
plt.ion()
self.output_counter = 0
now = time.time()
self.last_update = now
fig = Figure(figsize=(8, 6), dpi=75)
canvas = FigureCanvas(fig)
layout.addWidget(canvas)
self.setCentralWidget(central_widget)
toolbar = MyToolbar(canvas, self)
toolbar.setIconSize(toolbar.iconSize() * 0.75)
layout.addWidget(toolbar)
# fig.canvas.manager.set_window_title("Dwingeloo Radio Telescope")
self.powerplot = PowerPlot(fig, freq)
self.powerplot.ax1.set_xlim(-last_seconds - 1, 5)
self.read_stdin()
self.stdin_notifier = QSocketNotifier(
sys.stdin.fileno(), QSocketNotifier.Read, self
)
self.stdin_notifier.activated.connect(self.read_stdin)
def read_stdin(self):
ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
for file in ready_to_read:
line = file.readline().strip()
if len(line) == 0 or line[0] == "#":
continue
self.output_counter += 1
values = line.split(",")
time_now = float(values[0])
power_now = (
np.array(values[self.col_first_bin :], dtype=float)[0]
)
self.time_dt = np.append(
self.time_dt, Time(values[0], format="unix").to_datetime()
)
self.data = np.append(self.data, power_now)
self.last_x_buffer = np.roll(self.last_x_buffer, -1)
self.last_x_buffer[-1] = power_now
self.powerplot.set_zoomplot_values(self.min_range, self.last_x_buffer)
self.powerplot.set_totalplot_values(self.time_dt, self.data)
now = time.time()
if self.autoscale_x:
self.powerplot.ax2.set_xlim(self.start_time, self.stop_time)
self.powerplot.ax1.set_xlim(-last_seconds, 5)
if self.autoscale_y:
self.powerplot.do_autoscale_y()
# check if catching up on old data (set to 0.5 then)
if ((now - self.last_update) > 0.5) or (self.output_counter % 15 == 0):
self.powerplot.fig.canvas.draw()
self.powerplot.fig.canvas.flush_events()
self.last_update = now
# time.sleep(1)
def handle_interrupt_signal(signum, frame):
print("Ctrl+C pressed. Handling interrupt signal.")
QApplication.quit()
sys.exit(0)
if __name__ == "__main__":
app = QApplication(sys.argv)
icon_path = os.path.join(
os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
)
try:
app.setWindowIcon(QtGui.QIcon(icon_path))
except:
pass
main_window = PowerPlotMainWindow()
main_window.show()
signal.signal(signal.SIGINT, handle_interrupt_signal)
sys.exit(app.exec_())
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment