Select Git revision
dynamic_scheduling.py
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
unb_gui_commands.py 16.33 KiB
###############################################################################
#
# Copyright (C) 2016
# ASTRON (Netherlands Institute for Radio Astronomy) <http://www.astron.nl/>
# P.O.Box 2, 7990 AA Dwingeloo, The Netherlands
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
###############################################################################
#-------------------------------------------------------------------------------
# Name: unb_gui_commands.py
# Purpose: function class for Uniboard gui
#
# Author: walle (ASTRON)
#
# Created: 25-02-2016
#-------------------------------------------------------------------------------
import Tkinter as tk
from Tkinter import *
import subprocess
import sys
import time
from pinger import get_iplist
from commands import *
from shell import ssh_cmd_bkgnd
import tkMessageBox
class Commands:
    """Actions behind the buttons of the UniBoard GUI.

    The object holds references to the GUI input variables and text widgets
    and turns them into command lines for the peripheral utilities
    (``util_system_info.py``, ``util_epcs.py`` or a user selected util).
    A command is run either in a local shell (LCU name ``'local'``) or via
    ssh on a remote LCU.
    """

    def __init__(self, perdir_path, lcu_nrs, unb_nrs, fn_nrs, bn_nrs, autofind_max_unb, nvar, rvar, svar, rbfvar, utilvar, atext, gtext, lftext):
        """Store the GUI variables and widgets used by the commands.

        All ``*var`` / ``*_nrs`` / ``perdir_path`` / ``autofind_max_unb``
        arguments are only ever read through their ``get()`` method.
        ``atext`` and ``gtext`` are the two text widgets that receive the
        terminal output; ``lftext`` is the text widget whose line N holds
        the host name of LCU number N.
        """
        self.perdir_path = perdir_path
        self.lcu_nrs = lcu_nrs
        self.unb_nrs = unb_nrs
        self.fn_nrs = fn_nrs
        self.bn_nrs = bn_nrs
        self.autofind_max_unb = autofind_max_unb
        self.nvar = nvar
        self.rvar = rvar
        self.svar = svar
        self.rbfvar = rbfvar
        self.utilvar = utilvar
        self.atext = atext
        self.gtext = gtext
        self.lftext = lftext

    def set_lcupath(self, lcu_name):
        """Return the peripherals directory on a remote LCU.

        Reads the environment variable $UPE on ``lcu_name`` over ssh and
        appends ``/peripherals/`` to it.  Raises IndexError when the ssh
        command produces no output at all.
        """
        cmd = "ssh " + lcu_name + " 'printf $UPE'"
        return self.run_command(cmd)[0] + "/peripherals/"

    def print_to_terminal(self, s):
        """Append ``s`` to both text widgets and scroll them to the end."""
        widgets = (self.atext, self.gtext)
        for widget in widgets:
            widget.insert(END, s)
        # Redraw before scrolling so output appears while a command is running.
        for widget in widgets:
            widget.update_idletasks()
        for widget in widgets:
            widget.see(END)

    def get_lcu_nrs(self):
        """Convert the LCU input field to a sorted list of unique numbers.

        Example: "0:3,5" -> [0, 1, 2, 3, 5].  A range "a:b" includes both
        ends.  Empty fields (empty input, trailing or doubled commas) are
        skipped and duplicates are dropped, so no LCU is addressed twice.
        Raises ValueError when a field is not a number.
        """
        numbers = set()
        for field in self.lcu_nrs.get().split(','):
            field = field.strip()
            if not field:
                continue
            if ':' in field:
                bounds = field.split(':')
                numbers.update(range(int(bounds[0]), int(bounds[1]) + 1))
            else:
                numbers.add(int(field))
        return sorted(numbers)

    def make_lcu_list(self):
        """Return the list of LCU names selected by the user.

        Number 0 stands for the local machine and yields ``'local'``; every
        other number N is looked up as line N of the ``lftext`` widget.
        Empty lines are skipped.  An empty selection yields an empty list.
        """
        lcu_nrs = self.get_lcu_nrs()
        lcu_list = []
        if lcu_nrs and lcu_nrs[0] == 0:
            lcu_list.append('local')
            lcu_nrs = lcu_nrs[1:]
        for nr in lcu_nrs:
            lcu = str(self.lftext.get('%d.0' % nr, '%d.end' % nr))
            if lcu != '':
                lcu_list.append(lcu)
        return lcu_list

    def remote_cmd(self, machine, cmd, backgnd = False):
        """Execute ``cmd`` on a remote LCU.

        With ``backgnd`` set the command is handed to ``ssh_cmd_bkgnd`` and
        nothing is returned.  Otherwise the command is run through ssh and
        its output is returned as a list of lines without line endings.
        """
        if backgnd:
            ssh_cmd_bkgnd(machine, cmd)
            return None
        cmd = 'ssh -f -f -ty' + ' ' + machine + ' ' + cmd
        output = getstatusoutput(cmd)[1]
        return output.rstrip().split('\n')

    def check_fn(self, nodelist):
        """Select the front nodes from a list of node numbers.

        Node numbers 1..4 map to fn 0..3, all other numbers are dropped:
        [1,2,3,4,5,6,7,8] -> [0,1,2,3]
        """
        return [n - 1 for n in nodelist if 0 < n < 5]

    def check_bn(self, nodelist):
        """Select the back nodes from a list of node numbers.

        Node numbers 5..8 map to bn 0..3, all other numbers are dropped:
        [1,2,3,4,5,6,7,8] -> [0,1,2,3]
        """
        return [n - 5 for n in nodelist if 4 < n < 9]

    def autofind(self):
        """Print the UniBoards and nodes found via each selected LCU.

        Every address returned by ``get_iplist`` is split on '.'; the third
        field is used as UniBoard number and the fourth as node number.
        """
        max_unbs = self.autofind_max_unb.get()
        for lcu in self.make_lcu_list():
            for number in range(0, max_unbs):
                # presumably the addresses that answered for this number -
                # verify against pinger.get_iplist
                iplist = get_iplist(number, lcu)
                # The index in unblist is equal to the board number.
                unblist = [[] for _ in range(max_unbs)]
                for ip in iplist:
                    fields = ip.split('.')
                    unblist[int(fields[2])].append(int(fields[3]))
                for i, unb in enumerate(unblist):
                    if unb:
                        fnlist = self.check_fn(unb)
                        bnlist = self.check_bn(unb)
                        s = "Unb " + str(i) + ": fn" + str(fnlist) + ' bn' + str(bnlist) + '\n'
                        self.print_to_terminal('%s: %s' % (lcu, s))

    def setcmd(self, util, lcu_name = 'local'):
        """Build the command line for ``util`` on the given LCU.

        The util is looked up in the local peripherals path for ``'local'``
        and in the path reported by the LCU itself otherwise.  The --unb,
        --fn and --bn options are added for every non-empty input field.
        """
        if lcu_name == 'local':
            path = self.perdir_path.get()
        else:
            path = self.set_lcupath(lcu_name)
        r = 'python %s' % path
        if not r.endswith('/'):
            r += '/'
        r += util
        for option, var in (('--unb', self.unb_nrs), ('--fn', self.fn_nrs), ('--bn', self.bn_nrs)):
            if var.get() != '':
                r += ' %s %s' % (option, var.get())
        return r

    def _local_lines(self, cmd):
        """Run ``cmd`` in a local shell and yield its stdout line by line.

        The pipe is closed and the child is waited for when the output is
        exhausted, so no zombie process or open file descriptor is left.
        """
        proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        try:
            while True:
                line = proc.stdout.readline()
                if not line:  # end of file
                    break
                yield line
        finally:
            proc.stdout.close()
            proc.wait()

    def run_command(self, cmd, lcu_name = 'local', back = False):
        """Run ``cmd`` on the given LCU and echo the output to the terminal.

        Returns the list of output lines, each ending in a newline.  When
        ``back`` is True and the LCU is remote, the command is started in
        the background and None is returned without any feedback.  ``back``
        has no effect for ``'local'``: local commands always run in the
        foreground.
        """
        self.print_to_terminal('%s:\n' % lcu_name)
        if lcu_name == 'local':
            lines = self._local_lines(cmd)
        elif back:
            self.remote_cmd(lcu_name, cmd, True)
            return None
        else:
            lines = (line + '\n' for line in self.remote_cmd(lcu_name, cmd))
        line_list = []
        for line in lines:
            line_list.append(line)
            self.print_to_terminal(line)
        self.print_to_terminal('\n')
        return line_list

    def _run_on_all_lcus(self, util, options):
        """Run ``util`` with the extra ``options`` on every selected LCU."""
        for lcu in self.make_lcu_list():
            self.run_command(self.setcmd(util, lcu) + options, lcu)

    def run_util(self):
        """Run Util button: run the selected util with the -n, -r and -s fields."""
        for lcu in self.make_lcu_list():
            s = self.setcmd(self.utilvar.get(), lcu)
            for option, var in (('-n', self.nvar), ('-r', self.rvar), ('-s', self.svar)):
                if var.get() != '':
                    s += ' %s %s' % (option, var.get())
            # Parenthesised so the line is valid as print statement and as print function.
            print(s)
            self.run_command(s, lcu)

    def _regmap_ok(self, cmd, lcu):
        """Run the register map command once; True when no line contains 'FAILED'."""
        return not any('FAILED' in line for line in self.run_command(cmd, lcu))

    def regmap(self):
        """REGMAP button, also used by flash_rbf(): read the register maps.

        A failing LCU is retried once after one second.  Returns True only
        when the register map could be read on every selected LCU, so a
        failure on one LCU is not hidden by a later LCU that succeeds.
        """
        all_ok = True
        for lcu in self.make_lcu_list():
            s = self.setcmd('util_system_info.py', lcu) + ' -n 4'
            if self._regmap_ok(s, lcu):
                continue
            # If the first time fails, wait one second and try again.
            self.print_to_terminal('Retrying...\n')
            time.sleep(1)
            if not self._regmap_ok(s, lcu):
                self.print_to_terminal('Not able to read register map\n')
                all_ok = False
        return all_ok

    def info(self):
        """INFO button: run util_system_info.py -n 0 on every selected LCU."""
        self._run_on_all_lcus('util_system_info.py', ' -n 0')

    def wdi(self):
        """WDI button, also used by flash_rbf(): run util_epcs.py -n 9."""
        self._run_on_all_lcus('util_epcs.py', ' -n 9')
        time.sleep(3) #to avoid too fast reading out of the register map

    def remu(self):
        """REMU button, also used by flash_rbf(): run util_epcs.py -n 8."""
        self._run_on_all_lcus('util_epcs.py', ' -n 8')
        time.sleep(3) #to avoid too fast reading out of the register map

    def _send_rbf(self, rbf_file, lcu_list):
        """Copy the rbf file to /tmp/tmp.rbf on every remote LCU in ``lcu_list``."""
        self.print_to_terminal('Sending rbf file to LCUs\n')
        for lcu in lcu_list:
            if lcu != 'local':
                # scp is started from the local machine and runs in the foreground.
                self.run_command('scp ' + rbf_file + ' ' + lcu + ':/tmp/tmp.rbf')
        self.print_to_terminal('Done sending rbf file to LCUs\n')

    def _flash_user_image(self, rbf_file, lcu_list):
        """Flash the rbf file with util_epcs.py -n 7 on every LCU in ``lcu_list``."""
        for lcu in lcu_list:
            s = self.setcmd('util_epcs.py', lcu)
            s += ' -n 7'
            if lcu == 'local':
                s += ' -s %s' % rbf_file
            else:
                s += ' -s /tmp/tmp.rbf'
            if lcu == lcu_list[-1]:
                self.run_command(s, lcu) #get output of last lcu
            else:
                self.run_command(s, lcu, True) #run parallel

    def advflash(self):
        """FLASH (uses -s) button: flash the rbf file selected in the -s field."""
        lcu_list = self.make_lcu_list()
        # Check if the selected path ends with '.rbf'.
        rbf_file = self.svar.get()
        if not rbf_file.endswith('.rbf'):
            tkMessageBox.showwarning("No .rbf file selected","Please select a .rbf file")
            return
        self._send_rbf(rbf_file, lcu_list)
        self._flash_user_image(rbf_file, lcu_list)

    def show_info(self, p = True):
        """Show system info button, also used by flash_rbf().

        Reads the system info of every selected LCU and filters the
        UniBoard nr, FN/BN nr, design name and firmware version out of it.
        Returns one list per LCU with one summary line per node; the
        summary lines are also printed to the terminal when ``p`` is True.
        """
        return_list = []
        for lcu in self.make_lcu_list():
            self.print_to_terminal("%s:\n" % lcu)
            s = self.setcmd('util_system_info.py', lcu)
            s += ' -n 0'
            if lcu == 'local':
                lines = self._local_lines(s)
            else:
                lines = self.remote_cmd(lcu, s)
            line_list = []
            # Start every LCU with empty fields, so that an incomplete report
            # neither fails nor shows values left over from the previous LCU.
            r1 = r3 = r4 = ''
            for line in lines:
                # Local lines end in a newline, remote lines do not. Blank
                # lines are processed like any other line and end nothing.
                line = line.rstrip('\n')
                if "UniBoard nr" in line:
                    # NOTE(review): this replace is a no-op as written; it looks
                    # like it was meant to collapse repeated spaces - confirm.
                    r1 = line[line.find("UniBoard nr"):].replace(' ',' ')
                if "Design" in line:
                    start = line.find("Design")
                    end = line.find('\x00', start)
                    # The design name runs up to the first NUL character, or
                    # to the end of the line when there is none.
                    if end == -1:
                        r3 = line[start:]
                    else:
                        r3 = line[start:end]
                if "Firmware version" in line:
                    r4 = line[line.find("Firmware version"):]
                # 'FN nr' or 'BN nr' are the last lines needed from system info so print here
                for tag in ("FN nr", "BN nr"):
                    if tag in line:
                        r2 = line[line.find(tag):].replace(' ',' ')
                        r = '%s, %s, %s, %s\n' % (r1, r2, r3, r4)
                        line_list.append(r)
                        if p:
                            self.print_to_terminal(r)
            return_list.append(line_list)
        return return_list

    def check_rbf(self, rbf_path):
        """Check the rbf file name for 'bn_'/'fn_' against the selected nodes.

        Shows a warning and returns False when a back node image is
        combined with a front node selection or the other way around;
        returns True otherwise.
        """
        rbf_name = rbf_path.split('/')[-1]
        if 'bn_' in rbf_name and self.fn_nrs.get() != '':
            tkMessageBox.showwarning("Warning","You can't flash front nodes with a rbf for backnodes")
            return False
        if 'fn_' in rbf_name and self.bn_nrs.get() != '':
            tkMessageBox.showwarning("Warning","You can't flash back nodes with a rbf for frontnodes")
            return False
        return True

    def flash_rbf(self):
        """FLASH button on the general tab.

        Checks the selected rbf file, makes sure all nodes run unb_factory
        (after asking the user), copies the file to the LCUs, flashes it
        and finally loads the user image and reads the register map.
        """
        rbf_file = self.rbfvar.get()
        if not rbf_file.endswith('.rbf'):
            tkMessageBox.showwarning("No .rbf file selected","Please select a .rbf file")
            return
        print(rbf_file)
        # Check rbf file for bn_ / fn_.
        if not self.check_rbf(rbf_file):
            return
        # Get regmap.
        if not self.regmap():
            return
        # Check unb_factory.
        er = any('unb_factory' not in line
                 for line_list in self.show_info(False)
                 for line in line_list)
        if er:
            if tkMessageBox.askyesno('continue?',"There are one or more nodes that are currently not set to unb_factory. Do you want to set these nodes to unb_factory and continue?"):
                self.wdi()
                if not self.regmap():
                    return
            else:
                self.print_to_terminal('image flashing terminated\n')
                return
        lcu_list = self.make_lcu_list()
        self._send_rbf(rbf_file, lcu_list)
        # Flashing user image.
        self.print_to_terminal('Start flashing user image\n')
        self._flash_user_image(rbf_file, lcu_list)
        self.print_to_terminal('Loading user image\n')
        self.remu()
        self.regmap()