Skip to content
Snippets Groups Projects
Select Git revision
  • fd3e6c6ebcbe1aa074d9c2aecba9fe0f830388f3
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

schedulechecker.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    power_plot.py 7.09 KiB
    #!/usr/bin/env python3
    
    from datetime import datetime, timedelta
    
    import sys
    import os
    import math
    import numpy as np
    import json
    
    import signal
    
    import matplotlib
    import matplotlib.pyplot as plt
    from matplotlib.widgets import Button, CheckButtons
    from matplotlib.collections import LineCollection
    from matplotlib.dates import date2num, num2date, AutoDateFormatter
    from matplotlib.figure import Figure
    
    from PyQt5.QtWidgets import QApplication, QMainWindow, QVBoxLayout, QWidget, QComboBox
    from PyQt5.QtCore import QSocketNotifier, Qt
    from PyQt5 import QtGui
    from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
    from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
    
    import select
    
    
    import time
    from datetime import datetime
    
    from argparse import ArgumentParser
    
    
    class PowerPlot:
        """A power plot with two time scales"""
    
        def __init__(self, fig, yname, offset=None):
            self.fig = fig
            self.ax = fig.add_subplot()
            self.ax.set_xlabel("Time (UTC)")
            self.ax.set_ylabel(yname)
            self.ax.xaxis_date()
    
            formatter = AutoDateFormatter(self.ax.xaxis.get_major_locator())
            formatter.scaled[1 / (24 * 60)] = "%H:%M"
    
            self.ax.xaxis.set_major_formatter(formatter)
            if offset is not None:
                self.ax.yaxis.get_major_formatter().set_useOffset(offset)
    
            self.min, self.max = None, None
            fig.suptitle(yname, fontsize=16)
            fig.tight_layout()
            (self.totalplot,) = self.ax.plot([], [], ".", color="blue")
            (self.totalplothighlight,) = self.ax.plot(
                [], [], ".", color="red", markersize=10
            )
    
        def set_totalplot_values(self, xdata, ydata):
            self.totalplot.set_data(xdata, ydata)
            self.totalplothighlight.set_data(xdata[-1], ydata[-1])
            if self.min is None or ydata[-1] < self.min:
                self.min = ydata[-1]
            if self.max is None or ydata[-1] > self.max:
                self.max = ydata[-1]
            self.ax.autoscale_view(True, True, True)
    
        def do_autoscale_y(self):
            self.ax.set_ylim(
                self.min - (self.max - self.min) * 0.1 - 0.1,
                self.max + (self.max - self.min) * 0.1 + 0.1,
            )
    
    
    class MyToolbar(NavigationToolbar):
        """Custom toolbar that sets the 'autoscale' member of its owner"""
    
        def __init__(self, canvas, powerplotwindow, parent=None):
            super(MyToolbar, self).__init__(canvas, parent)
            self.powerplotwindow = powerplotwindow
    
        def home(self, *args):
            super().home(*args)
            self.powerplotwindow.autoscale_y = True
            self.powerplotwindow.autoscale_x = True
    
        def release_zoom(self, event):
            super().release_zoom(event)
            modifiers = event.guiEvent.modifiers()
            self.powerplotwindow.autoscale_y = False
            if modifiers != Qt.ShiftModifier:
                self.powerplotwindow.autoscale_x = False
    
    
    class PowerPlotMainWindow(QMainWindow):
        """QT Application that reads data from stdin, plots in a PowerPlot"""
    
        def __init__(self, offset=None, col_num=1):
            super().__init__()
            self.setStyleSheet("background-color: white;")
            self.autoscale_x = True
            self.autoscale_y = True
            self.col_num = col_num
            self.setWindowTitle("Dwingeloo Radio Telescope")
            self.setGeometry(100, 100, 800, 600)
    
            central_widget = QWidget(self)
            layout = QVBoxLayout(central_widget)
    
            # Get metadata and ignore it
            header_lines = []
            while True:
                # TODO: this is blocking and freezes the UI, could
                # be refactored to use the QT notification mechanism
                line = sys.stdin.readline()
                if len(line.strip()) == 0:
                    continue
                header_lines.append(line)
                if line[0] != "#":
                    column_names = [name.strip() for name in line.split()]
                    break
    
            yname = column_names[self.col_num]
            yname = yname.rstrip(",").capitalize()
    
            line = sys.stdin.readline()
            values = line.split(",")
            first_time = datetime.utcfromtimestamp(float(values[0]))
            self.time_dt = np.array(date2num(first_time))
    
            self.data = np.array([float(values[self.col_num])])
    
            self.start_time = first_time
            self.stop_time = first_time + timedelta(hours=2)
    
            plt.ion()
    
            self.output_counter = 0
    
            now = time.time()
            self.last_update = now
    
            fig = Figure(figsize=(8, 6), dpi=75)
    
            canvas = FigureCanvas(fig)
            layout.addWidget(canvas)
            self.setCentralWidget(central_widget)
    
            toolbar = MyToolbar(canvas, self)
            toolbar.setIconSize(toolbar.iconSize() * 0.75)
            layout.addWidget(toolbar)
    
            # fig.canvas.manager.set_window_title("Dwingeloo Radio Telescope")
    
            self.powerplot = PowerPlot(fig, yname, offset=offset)
    
            self.read_stdin()
            self.stdin_notifier = QSocketNotifier(
                sys.stdin.fileno(), QSocketNotifier.Read, self
            )
            self.stdin_notifier.activated.connect(self.read_stdin)
    
        def read_stdin(self):
            ready_to_read, _, _ = select.select([sys.stdin], [], [], 0)
            for file in ready_to_read:
                line = file.readline().strip()
                if len(line) == 0 or line[0] == "#":
                    continue
                self.output_counter += 1
                values = line.split(",")
                if not values[0][0].isnumeric():
                    print("Skipping line: " + line.strip())
                    print(values[0], values[0].isnumeric())
                    continue
                time_now = float(values[0])
                power_now = float(values[self.col_num])
    
                self.time_dt = np.append(
                    self.time_dt, date2num(datetime.utcfromtimestamp(time_now))
                )
                self.data = np.append(self.data, power_now)
    
                self.powerplot.set_totalplot_values(self.time_dt, self.data)
    
                now = time.time()
    
                if self.autoscale_x:
                    self.powerplot.ax.set_xlim(self.start_time, self.stop_time)
                if self.autoscale_y:
                    self.powerplot.do_autoscale_y()
    
                # check if catching up on old data (set to 0.5 then)
                if ((now - self.last_update) > 0.5) or (self.output_counter % 15 == 0):
                    self.powerplot.fig.canvas.draw()
                    self.powerplot.fig.canvas.flush_events()
                self.last_update = now
    
    
    def handle_interrupt_signal(signum, frame):
        print("Ctrl+C pressed. Handling interrupt signal.")
        QApplication.quit()
        sys.exit(0)
    
    
    if __name__ == "__main__":
        parser = ArgumentParser(description="Read files and numbers from stdin and plot them")
        parser.add_argument("-o", "--offset", help="Offset along y-axis", type=float, default=None)
    
        args = parser.parse_args()
    
        app = QApplication(sys.argv)
        icon_path = os.path.join(
            os.path.dirname(os.path.realpath(__file__)), "sag_a_icon.png"
        )
        try:
            app.setWindowIcon(QtGui.QIcon(icon_path))
        except:
            pass
        main_window = PowerPlotMainWindow(offset=args.offset)
        main_window.show()
        signal.signal(signal.SIGINT, handle_interrupt_signal)
    
        sys.exit(app.exec_())