Skip to content
Snippets Groups Projects
Select Git revision
  • df2925096741e0a9680293a03743cecf7aeb51c2
  • master default protected
2 results

atdb_stats.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    atdb_stats.py 19.77 KiB
    """
        File name: atdb_stats.py
        version: 1.0.0 (28 mar 2019)
        Author: Copyright (C) 2019 - Nico Vermaas - ASTRON
        Description: Plot data from ATDB
    
        This program is free software: you can redistribute it and/or modify
        it under the terms of the GNU General Public License as published by
        the Free Software Foundation, either version 3 of the License, or
        (at your option) any later version.
    
        This program is distributed in the hope that it will be useful,
        but WITHOUT ANY WARRANTY; without even the implied warranty of
        MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
        GNU General Public License for more details.
    
        You should have received a copy of the GNU General Public License
        along with this program.  If not, see <https://www.gnu.org/licenses/>.
    """
    
    import os
    import sys
    import datetime
    import time
    
    import plotly
    import requests
    import json
    import psycopg2
    
    import argparse
    import plotly.graph_objs as go
    from atdb_statistics import atdb_plot
    
    #import numpy as np
    
    # some constants

    # Base URLs of the ATDB REST API for the DEV / TEST / ACC / PROD deployments.
    # NOTE(review): not referenced elsewhere in this file; do_ingest_speeds builds its
    # request URL from the --atdb_host argument instead.
    ATDB_API_DEV = "http://localhost:8000/atdb"
    ATDB_API_TEST = "http://192.168.22.22/atdb"
    ATDB_API_ACC = "http://195.169.22.25/atdb"
    ATDB_API_PROD = "http://195.169.63.121/atdb"

    # Base URLs of the ALTA API for the DEV / TEST / ACC / PROD deployments.
    # NOTE(review): not referenced elsewhere in this file.
    ALTA_API_DEV = "http://localhost:8000/altapi"
    ALTA_API_TEST = "http://alta-sys.astron.nl/altapi"
    ALTA_API_ACC = "https://alta-acc.astron.nl/altapi"
    ALTA_API_PROD = "https://alta.astron.nl/altapi"

    # Format of the --starttime / --endtime arguments (e.g. '2019-01-12 00:00');
    # main() parses both values with it.
    TIME_FORMAT = "%Y-%m-%d %H:%M"
    
    #--- common functions ---
    # timeit is a decorator that can be put in front of (around) a function to measure the execution time of each call
    def timeit(method):
        """
        Decorator that measures how long each call of ``method`` takes.

        When the decorated function is called with a ``log_time`` keyword argument (a mapping),
        the elapsed time in whole milliseconds is stored in it under the key given by the
        ``log_name`` keyword argument, or under ``method.__name__.upper()`` when ``log_name``
        is absent. Without ``log_time`` the elapsed time is printed instead.

        Note: all keyword arguments, including ``log_time``/``log_name``, are forwarded to
        ``method`` unchanged, so a method used with ``log_time`` must accept them (e.g. **kw).

        :param method: the function to wrap.
        :return: the wrapping function; it returns whatever ``method`` returns.
        """
        import functools

        # wraps() keeps __name__ / __doc__ of the decorated function intact
        @functools.wraps(method)
        def timed(*args, **kw):
            # perf_counter is monotonic and high-resolution, unlike time.time()
            ts = time.perf_counter()
            result = method(*args, **kw)
            te = time.perf_counter()
            elapsed_ms = (te - ts) * 1000
            if 'log_time' in kw:
                name = kw.get('log_name', method.__name__.upper())
                kw['log_time'][name] = int(elapsed_ms)
            else:
                print('execution time: %r  %2.2f ms' % (method.__name__, elapsed_ms))
            return result
        return timed
    
    def isCalibrator(name):
        """
        Tell whether a source name refers to one of the hardcoded Apertif calibrators.

        The test is a substring match, so a name such as '3C147_36' counts as calibrator 3C147.

        :param name: the name of a source (string).
        :return: True when any known calibrator name occurs inside ``name``, otherwise False.
        """
        known_calibrators = ('3C48', '3C048', '3C138', '3C147', '3C196', '3C286', '3C295', 'CTD93')
        return any(name.find(cal) != -1 for cal in known_calibrators)
    
    
    def scp_filename(host, source, target):
        """
        Copy a file from a remote host to a local path with scp.

        The command line is built by plain string concatenation and run through the shell
        (os.system), so host, source and target must be trusted values; they are not quoted.

        :param host: the remote host, as accepted by scp.
        :param source: path of the file on the remote host.
        :param target: local destination path.
        :return: the exit status reported by os.system for the scp command.
        """
        cmd = 'scp ' + host + ':' + source + ' ' + target
        # log exactly what is executed, so the printed remote path matches the copied one
        print("Executing '{}'".format(cmd))
        return os.system(cmd)
    
    
    def execute_remote_command(host, cmd):
        """
        Run a command on a remote host over ssh. Assumes ssh keys have been set up.

        The command is wrapped in double quotes and the resulting ssh command line is handed
        to the local shell via os.system.

        :param host: the ssh destination.
        :param cmd: the command line to run on the remote host.
        :return: the exit status reported by os.system.
        """
        ssh_cmd = 'ssh {0} "{1}"'.format(host, cmd)
        print("Executing '%s'" % ssh_cmd)
        exit_status = os.system(ssh_cmd)
        return exit_status
    
    
    # --- presentation functions ---
    
    def _sum_dataproduct_size(cursor, filename_prefix):
        """
        Return SUM(size) over all dataproducts whose filename starts with filename_prefix.

        :param cursor: an open psycopg2 cursor on the ATDB database.
        :param filename_prefix: filename prefix, e.g. 'ARTS190612' (must not contain LIKE wildcards).
        :return: the raw sum returned by PostgreSQL, or None when no dataproduct matches.
        """
        # the prefix is passed as a query parameter instead of being formatted into the SQL text
        cursor.execute(
            "SELECT sum(size) FROM public.taskdatabase_dataproduct WHERE filename LIKE %s",
            (filename_prefix + '%',))
        return cursor.fetchone()[0]


    def do_ingest_sizes(args, starttime, endtime, plot_engine='plotly'):
        """
        Plot the daily (or cumulative) ingested dataproduct size for one observing mode.

        For every day from starttime up to and including endtime, the sizes of all dataproducts
        whose filename starts with 'ARTS<yymmdd>' (ARTS) and 'WSRTA<yymmdd>' (imaging) are summed
        in the ATDB database and divided by 1e12 (presumably bytes -> TB; confirm against the
        schema). Days without dataproducts count as 0. The per-day values, or their running
        totals when args.data_aggregation is 'cumulative', are plotted with atdb_plot.do_plot.
        Errors are printed, not raised.

        :param args: parsed command line arguments; uses the atdb_database_* connection settings,
                     observing_mode ('IMAGING' or 'ARTS' substring), data_aggregation and the
                     plot settings (plot_engine, title, plot_type, color, output_html, y_axis_title).
        :param starttime: first day (datetime). When None, args.starttime is parsed instead.
        :param endtime: last day (datetime). When None, args.endtime is parsed instead.
        :param plot_engine: unused; the engine is taken from args.plot_engine.
        """
        import itertools

        connection = None
        try:
            # main() already resolved the range (including --mode and the default endtime 'now');
            # only fall back to the raw arguments when no range was passed in
            if starttime is None:
                starttime = datetime.datetime.strptime(args.starttime, TIME_FORMAT)
            if endtime is None:
                endtime = datetime.datetime.strptime(args.endtime, TIME_FORMAT)

            # connect to the PostgreSQL server
            connection = psycopg2.connect(host=args.atdb_database_host,
                                          port=args.atdb_database_port,
                                          database=args.atdb_database_name,
                                          user=args.atdb_database_user,
                                          password=args.atdb_database_password)

            dates = []
            arts_list = []
            imaging_list = []

            print('DATE ARTS IMAGING')
            # the cursor is closed when the block ends, also when a query fails
            with connection.cursor() as cursor:
                curr_date = starttime
                while curr_date <= endtime:
                    datestr = curr_date.strftime('%y%m%d')
                    arts = _sum_dataproduct_size(cursor, 'ARTS' + datestr)
                    imaging = _sum_dataproduct_size(cursor, 'WSRTA' + datestr)
                    print(datestr, arts, imaging)

                    dates.append(curr_date)
                    # a day without dataproducts yields a NULL sum; count it as 0
                    arts_list.append(float(arts) / 1e12 if arts is not None else 0)
                    imaging_list.append(float(imaging) / 1e12 if imaging is not None else 0)

                    curr_date += datetime.timedelta(days=1)

            # running totals in a single pass
            arts_cumu = list(itertools.accumulate(arts_list))
            imaging_cumu = list(itertools.accumulate(imaging_list))

            # NOTE(review): leftover diagnostic output; the meaning of the 134.40 divisor is
            # not documented here. Guarded so an empty date range does not raise IndexError.
            if arts_cumu:
                print(arts_cumu[-1] / 134.40)

            # select the series to show
            mode = (args.observing_mode or '').upper()
            cumulative = args.data_aggregation == 'cumulative'
            if 'IMAGING' in mode:
                values = imaging_cumu if cumulative else imaging_list
            elif 'ARTS' in mode:
                values = arts_cumu if cumulative else arts_list
            else:
                values = None
                print("observing_mode %r not supported: use a mode containing 'IMAGING' or 'ARTS'"
                      % args.observing_mode)

            if values is not None:
                atdb_plot.do_plot(args.plot_engine, args.title, dates, values, args.plot_type,
                                  args.color, args.output_html, args.y_axis_title)

        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            if connection is not None:
                connection.close()
                print('Database connection closed.')
    
    
    def do_sky(args, starttime, endtime):
        """
        Plot the sky positions of the target observations within a time range.

        Runs, on the ATDB database:
            SELECT field_ra, field_dec, field_name, starttime, endtime
            FROM public.taskdatabase_observation
            WHERE starttime > <starttime> AND endtime < <endtime> AND field_ha IS NULL;
        Observations whose field_name is a calibrator (see isCalibrator) are skipped. For each
        remaining observation the duration is collected in whole hours (duration_list) and in
        whole units of 360 s (sizes_list, presumably used as marker size), and everything is
        passed to atdb_plot.do_sky_plot. Errors are printed, not raised.

        :param args: parsed command line arguments; uses the atdb_database_* connection settings
                     and the plot settings (plot_engine, title, output_html, y_axis_title, colormap).
        :param starttime: start of the range (datetime). When None, args.starttime is parsed.
        :param endtime: end of the range (datetime). When None, args.endtime is parsed.
        """
        ra_list = []
        dec_list = []
        duration_list = []
        sizes_list = []
        connection = None

        try:
            # main() already resolved the range (including --mode and the default endtime 'now');
            # only fall back to the raw arguments when no range was passed in
            if starttime is None:
                starttime = datetime.datetime.strptime(args.starttime, TIME_FORMAT)
            if endtime is None:
                endtime = datetime.datetime.strptime(args.endtime, TIME_FORMAT)

            # connect to the PostgreSQL server
            connection = psycopg2.connect(host=args.atdb_database_host,
                                          port=args.atdb_database_port,
                                          database=args.atdb_database_name,
                                          user=args.atdb_database_user,
                                          password=args.atdb_database_password)

            # the time range is passed as query parameters instead of being concatenated into the SQL
            query = ("SELECT field_ra, field_dec, field_name, starttime, endtime "
                     "FROM public.taskdatabase_observation "
                     "WHERE starttime > %s AND endtime < %s AND field_ha IS NULL;")
            with connection.cursor() as cursor:
                cursor.execute(query, (starttime, endtime))
                records = cursor.fetchall()

            for field_ra, field_dec, field_name, t1, t2 in records:
                # only plot information about the targets, not about the calibrators
                if isCalibrator(field_name):
                    continue

                # total_seconds() includes whole days; timedelta.seconds would wrap after 24 hours
                duration = (t2 - t1).total_seconds()
                duration_list.append(int(duration / 3600))
                sizes_list.append(int(duration / 360))
                ra_list.append(field_ra)
                dec_list.append(field_dec)

            # show the plot
            atdb_plot.do_sky_plot(args.plot_engine, args.title, ra_list, dec_list, duration_list,
                                  sizes_list, args.output_html, args.y_axis_title, args.colormap)

        except (Exception, psycopg2.DatabaseError) as error:
            print(error)

        finally:
            if connection is not None:
                connection.close()
                print('Database connection closed.')
    
    def _parse_atdb_timestamp(value):
        """
        Parse a timestamp string of the form 'YYYY-MM-DDTHH:MM:SS[.fraction][Z]' into a naive datetime.

        Everything from the first '.' on (fractional seconds and any suffix after them) is
        dropped, as is a trailing 'Z', so values without fractional seconds are accepted too.
        """
        without_fraction = value.partition('.')[0]
        if without_fraction.endswith('Z'):
            without_fraction = without_fraction[:-1]
        return datetime.datetime.strptime(without_fraction, '%Y-%m-%dT%H:%M:%S')


    @timeit
    def do_ingest_speeds(args):
        """
        Plot observing (write) and ingest speeds of tasks fetched from the ATDB REST API.

        Requests '<args.atdb_host>/times?<args.query>' and turns every result into datapoints:
          * 'observing'    when write_speed is set and > 0, from starttime to starttime + duration;
          * 'ingesting'    when ingest_speed is set, from timestamp_ingesting to
                           timestamp_ingesting + ingest_duration;
          * 'ingest_error' when timestamp_ingest_error is set, drawn at the speed of the most
                           recent 'ingesting' datapoint (0 when there was none yet).
        Speeds are multiplied by 8 / 1000 (presumably bytes/s -> kbit/s; confirm against the API).
        The datapoints are sorted by timestamp and passed to atdb_plot.do_speed_plot.

        :param args: parsed command line arguments (atdb_host, query, title, y_axis_title, annotate).
        :raises Exception: when the response cannot be parsed as JSON with a 'results' list.
        """
        # The request header
        # NOTE(review): hard-coded basic-auth credentials (base64 of 'admin:admin');
        # consider making them configurable.
        ATDB_HEADER = {
            'content-type': "application/json",
            'cache-control': "no-cache",
            'authorization': "Basic YWRtaW46YWRtaW4="
        }

        url = args.atdb_host + "/times?" + str(args.query)

        # do the request to the ATDB backend
        print('request to ' + url)
        response = requests.request("GET", url, headers=ATDB_HEADER)

        # parse the response
        try:
            results = json.loads(response.text)["results"]
        except Exception as err:
            print("Exception : " + str(err))
            raise Exception(
                "ERROR: " + str(response.status_code) + ", " + str(response.reason) + ', '
                + str(response.content)) from err

        # analyse the results
        print('analyse the results.')
        datapoints = []
        # speed of the most recent ingest, used to position ingest errors in the plot;
        # initialised so an error reported before any ingest no longer raises UnboundLocalError
        prev_ingest_speed = 0
        for result in results:
            write_speed = result['write_speed']
            if write_speed is not None and write_speed > 0:
                timestamp = datetime.datetime.strptime(result['starttime'], '%Y-%m-%dT%H:%M:%SZ')
                datapoints.append({
                    'taskid': result['taskID'],
                    'timestamp': timestamp,
                    'type': 'observing',
                    'timestamp_end': timestamp + datetime.timedelta(seconds=result['duration']),
                    'speed_bps': write_speed * 8 / 1000,
                })

            if result['ingest_speed'] is not None:
                timestamp = _parse_atdb_timestamp(result['timestamp_ingesting'])
                ingest_end = timestamp + datetime.timedelta(seconds=result['ingest_duration'])
                prev_ingest_speed = result['ingest_speed'] * 8 / 1000
                datapoints.append({
                    'taskid': result['taskID'],
                    'timestamp': timestamp,
                    'type': 'ingesting',
                    'duration': result['ingest_duration'],
                    'timestamp_end': ingest_end,
                    'speed_bps': prev_ingest_speed,
                })

            if result['timestamp_ingest_error'] is not None:
                datapoints.append({
                    'taskid': result['taskID'],
                    'timestamp': _parse_atdb_timestamp(result['timestamp_ingest_error']),
                    'type': 'ingest_error',
                    'speed_bps': prev_ingest_speed,
                })

        sorted_datapoints = sorted(datapoints, key=lambda k: k['timestamp'])

        # plot the results
        atdb_plot.do_speed_plot(args.title, args.y_axis_title, args.query, args.annotate, sorted_datapoints)
    
    
    
    def get_arguments(parser):
        """
        Parse the command line, optionally merged with an argument file.

        When --argfile is given, that file is read first (through the parser's '@' prefix) and
        the command-line arguments are applied after it, so they override values from the file.

        :param parser: the argparse parser; expected to be created with fromfile_prefix_chars='@'
                       (as main() does).
        :return: the parsed arguments (argparse.Namespace).
        :raises Exception: when the argument file does not exist.
        """
        args = parser.parse_args()
        if not args.argfile:
            return args

        args_file = args.argfile
        if not os.path.exists(args_file):
            raise Exception("Can not find argument file " + args_file)

        # sys.argv[0] is the script itself; everything after it overrides the file contents
        return parser.parse_args(['@' + args_file] + sys.argv[1:])
    
    
    def _build_parser():
        """Create the argparse parser with all command line options of atdb_stats.py."""
        parser = argparse.ArgumentParser(fromfile_prefix_chars='@')

        # IO parameters
        parser.add_argument("--atdb_database_host",
                            default="192.168.22.25",
                            help="database host")
        parser.add_argument("--atdb_database_port",
                            default="5432",
                            help="database port")
        parser.add_argument("--atdb_database_name",
                            default="atdb",
                            help="database name")
        parser.add_argument("--atdb_database_user",
                            default="atdbread",
                            help="database username")
        parser.add_argument("--atdb_database_password",
                            default="atdbread123",
                            help="database password")
        parser.add_argument("--atdb_api",
                            default="192.168.22.25/atdb",
                            help="ATDB ReST API")
        parser.add_argument("--plot_engine",
                            default="plotly",
                            help="options are: 'plotly' (for webpage)or 'mathplotlib")
        parser.add_argument("--filename",
                            default=None,
                            help="txt or qbx file to parse (when txt file parsing is used)")
        parser.add_argument("--dataset",
                            default=None,
                            help="dataset to parse (when qbackend is used). Possible options: gas, consumption, generation")
        parser.add_argument("--atdb_host",
                            default=None,
                            help="remote ssh/scp host where the files are stored (if None, then they are assumed to be local)")
        parser.add_argument("--remote_dir",
                            default=None,
                            help="remote directory where the files are stored")
        parser.add_argument("--remote_pre_command",
                            default=None,
                            help="execute this command on the remote host before downloading the data files")
        parser.add_argument("--remote_post_command",
                            default=None,
                            help="execute this command on the remote host after generating the html results.")
        parser.add_argument("--local_dir",
                            default='',
                            help="local directory where the data files are stored or read")

        # visualisation parameters
        parser.add_argument("--legends",
                            default="verbruik,teruglevering,totaal",
                            help="Legends for consumption, redelivery and totals.")
        parser.add_argument("--output_html",
                            default="atdb_plot.html",
                            help="output html file")
        parser.add_argument("--presentation",
                            default=None,
                            help="Possible options: ingest_sizes, sky, ingest_speed")
        parser.add_argument("--data_aggregation",
                            default="standard",
                            help="Possible options: cumulative, standard")
        parser.add_argument("--mode",
                            default=None,
                            help="Default modes. Possible options: today, this_week, this_month, this_year")
        parser.add_argument("--observing_mode",
                            default=None,
                            help="Observingmode")
        parser.add_argument("--starttime",
                            default=None,
                            help="Format like 2019-01-12 00:00")
        parser.add_argument("--endtime",
                            default=None,
                            help="Format like 2019-01-12 00:00")
        parser.add_argument("--query",
                            default=None,
                            help="query for the REST API, like 'taskID__contains=190607'")
        parser.add_argument("--annotate",
                            default=None,
                            help="field to annotate datapoints in the (speed) plot, like 'taskid'")
        parser.add_argument("--interval",
                            default="day",
                            help="Shows bars per interval. Possible options: minute, hour, day, month")

        # plot parameters
        parser.add_argument("--title",
                            default="Title",
                            help="Title of the Plot")
        parser.add_argument("--y_axis_title",
                            default="y-axis",
                            help="Title on the Y axis")
        parser.add_argument("--plot_type",
                            default="bar",
                            help="Chart type. Possible options: bar, scatter")
        parser.add_argument("--color",
                            default="#0081C9",
                            help="Color of the plot in hex value")
        parser.add_argument("--colormap",
                            default="viridis",
                            help="see: https://matplotlib.org/examples/color/colormaps_reference.html")

        # All parameters in a file
        parser.add_argument('--argfile',
                            nargs='?',
                            type=str,
                            help='Parameter file containing all the parameters')
        parser.add_argument("--version",
                            default=False,
                            help="Show current version of this program.",
                            action="store_true")
        return parser


    def _resolve_time_range(args):
        """
        Determine the (starttime, endtime) datetimes to report on.

        --starttime / --endtime are parsed with TIME_FORMAT. A missing endtime means 'now';
        a missing starttime stays None. A --mode of today, this_week, this_month or this_year
        overrides both: endtime becomes 'now' and starttime the start (00:00:00) of the current
        day, week (starting Monday), month or year.
        """
        starttime = None
        if args.starttime is not None:
            starttime = datetime.datetime.strptime(args.starttime, TIME_FORMAT)

        # if no endtime is specified, then the endtime is now
        if args.endtime is not None:
            endtime = datetime.datetime.strptime(args.endtime, TIME_FORMAT)
        else:
            endtime = datetime.datetime.now()

        # some default modes
        if args.mode in ('today', 'this_week', 'this_month', 'this_year'):
            endtime = datetime.datetime.now()
            start_of_today = endtime.replace(hour=0, minute=0, second=0, microsecond=0)
            if args.mode == 'today':
                starttime = start_of_today
            elif args.mode == 'this_week':
                # weekday() is 0 on Monday
                starttime = start_of_today - datetime.timedelta(days=start_of_today.weekday())
            elif args.mode == 'this_month':
                starttime = start_of_today.replace(day=1)
            else:
                starttime = start_of_today.replace(month=1, day=1)

        return starttime, endtime


    def main():
        """
        The main module.

        Parses the arguments (optionally from an --argfile), resolves the time range, runs the
        optional remote pre-command, executes the requested presentation (ingest_sizes, sky or
        ingest_speed) and finally runs the optional remote post-command.
        """
        parser = _build_parser()
        args = get_arguments(parser)

        # --------------------------------------------------------------------------------------------------------
        print('--- atdb_stats.py - version 1.0.0 - 9 jun 2019 ---')
        print('Copyright (C) 2019 - Nico Vermaas - ASTRON. This program comes with ABSOLUTELY NO WARRANTY;')
        if args.version:
            return

        # starttime is None when neither --starttime nor a known --mode was given
        starttime, endtime = _resolve_time_range(args)

        if args.remote_pre_command is not None:
            execute_remote_command(args.atdb_host, args.remote_pre_command)

        # determine the type of presentation
        presentation = args.presentation
        if presentation == "ingest_sizes":
            do_ingest_sizes(args, starttime, endtime)
        elif presentation == "sky":
            do_sky(args, starttime, endtime)
        elif presentation == "ingest_speed":
            do_ingest_speeds(args)
        else:
            print("Unknown presentation %r. Possible options: ingest_sizes, sky, ingest_speed"
                  % presentation)

        if args.remote_post_command is not None:
            execute_remote_command(args.atdb_host, args.remote_post_command)
    
    
    if __name__ == "__main__":
        # entry point when run as a script: parse the arguments and run the requested presentation
        main()