Skip to content
Snippets Groups Projects

Modified the scripts to run on Raspberry Pi.

1 file
+ 233
267
Compare changes
  • Side-by-side
  • Inline
+ 233
267
#******************************************#
# Read hardware info from UNB2c
# Created: 2021-05-10
#******************************************#
"""
Copyright 2021 Stichting Nederlandse Wetenschappelijk Onderzoek Instituten,
ASTRON Netherlands Institute for Radio Astronomy
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Created: 2021-05-10
This file contains the UniBoard2 class with all monitoring function. It can be used stand-alone to test it.
"""
import sys
import time
sys.path.insert(0,'.')
import os
if os.name =="posix":
# import time
from UniBoard2_I2C import *
if os.name == "posix":
from I2C_serial_pi import *
else:
from I2C_serial import *
sys.path.insert(0, '.')
I2CBUSNR = 3
DEBUG = False
from UniBoard2_I2C import *
I2CBUSNR=3
class c_unb2c:
class Unb2cClass:
#
# Class that contains all parts on a UniBoard2
#
def __init__(self):
self.status=False
self.nodes=[]
self.pols=[]
self.status = False
self.nodes = []
self.pols = []
for node_cnt in range(4):
self.nodes.append(c_node(node_cnt))
self.nodes.append(NodeClass(node_cnt))
for pol in list(CTR_POLS.keys()):
self.pols.append(c_pol(pol, type="central"))
self.pols.append(PolClass(pol, location="central"))
self.dev_i2c_eeprom = I2C(EEPROM)
self.dev_i2c_eeprom.bus = I2CBUSNR
self.main_switch = I2C(MAIN_I2C_SWITCH)
self.main_switch.bus = I2CBUSNR
self.front = I2C(LED_DRIVER)
self.front.bus = I2CBUSNR
def write_eeprom(self, data=0x01):
#
# Write the EEPROM with the serial number etc.
#
ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(0)
if ret_ack < 1:
print("no device found")
if DEBUG:
print("no device found")
return False
else:
self.dev_i2c_eeprom.write_bytes(0x00, data)
return True
def read_eeprom(self):
#
# Read the EEPROM with the serial number etc.
#
ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(0)
if ret_ack < 1:
print("no device found")
if DEBUG:
print("no EEPROM found")
return False
else:
ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(0x00, 1)
return ret_value
return ret_value
def wr_rd_eeprom(self, value=0x34):
#
# Write and Read the EEPROM to check functionality
#
self.write_eeprom(value)
ret_value = self.read_eeprom()
stri = "Wrote to EEPROM: 0x{0:X}, Read from EEPROM: 0x{1} ".format(value, ret_value)
print(stri)
def front_led(self, collor):
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x20,0x20) #select LED
return True
def front_led(self, color):
#
# Write the LED on the front panel,
# Input to this function is the color see UniBoard_i2c.py
#
# select LED
ret_ack = self.main_switch.write_bytes(0x20, 0x20)
if ret_ack < 1:
print("Main I2C switch not found")
if DEBUG:
print("Main I2C switch not found")
return False
else:
front = I2C(LED_DRIVER)
front.bus = I2CBUSNR
ret_ack = front.write_bytes(0x03, 0)
ret_ack = self.front.write_bytes(0x03, 0)
if ret_ack < 1:
print("Front LED driver not found")
if DEBUG:
print("Front LED driver not found")
else:
front.write_bytes(0x01, collor)
self.front.write_bytes(0x01, color)
return True
def read_all(self):
#
# Function to read all monitoring points of the UniBoard
#
for node in self.nodes:
node.read_all()
for pol in self.pols:
pol.read_all()
return True
def print_status(self):
#
# Function to dump monitoring information on the screen
#
for color in list(LED_COLORS.keys()):
print(color)
self.front_led(LED_COLORS[color])
@@ -79,22 +128,34 @@ class c_unb2c:
for pol in self.pols:
pol.print_status()
self.wr_rd_eeprom()
return True
class NodeClass:
#
# Class that contains all monitoring points of one FPGA node.
#
class c_node:
def __init__(self, number):
#
# All the monitoring points.
#
self.node_number = number
self.pols=[]
self.pols = []
self.set_i2c_switches()
for pol in list(LOC_POLS.keys()):
self.pols.append(c_pol(pol))
self.ddr=[]
self.pols.append(PolClass(pol))
self.ddr = []
for bank_cnt in range(2):
self.ddr.append(c_ddr(bank_cnt))
self.qsfp=[]
self.ddr.append(DdrClass(bank_cnt))
self.qsfp = []
for qsfp_cnt in range(6):
self.qsfp.append(c_qsfp(qsfp_cnt))
self.qsfp.append(QsfpClass(qsfp_cnt))
def read_all(self):
#
# Function to read all monitoring points of one FPGA node
#
self.set_i2c_switches()
for pol in self.pols:
pol.read_all()
@@ -104,6 +165,9 @@ class c_node:
qsfp.read_all()
def print_status(self):
#
# Function to dump all monitoring points of one FPGA node on the screen
#
stri = "Status of Node {0}".format(self.node_number)
print(stri)
for pol in self.pols:
@@ -114,20 +178,32 @@ class c_node:
qsfp.print_status()
def set_i2c_switches(self):
#
# Before a node can ba accessed the i2c switches have to be set, this function can be used.
#
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x0, 0x01<<self.node_number) # select Node
ret_ack = main_switch.write_bytes(0x0, 0x01 << self.node_number) # select Node
if ret_ack < 1:
print("Main I2C switch not found")
if DEBUG:
print("Main I2C switch not found")
else:
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, 0x20) # select DDR4
if ret_ack < 1:
print("Node I2C switch not found")
if DEBUG:
print("Node I2C switch not found")
class c_qsfp:
class QsfpClass:
#
# Class that contains all monitoring points of a QSFP cage
#
def __init__(self, port):
#
# All the monitoring points of a QSFP
#
self.port = port
self.temp = 0
self.volt = 0
@@ -137,57 +213,82 @@ class c_qsfp:
self.qsfp_cage.bus = I2CBUSNR
def select_qsfp(self):
#
# Function to set the I2C switch to access a QSFP cage
#
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, QSFP_PORT[self.port])
if ret_ack < 1:
# print("Node I2C switch not found")
if DEBUG:
print("Node I2C switch not found")
self.status = False
else:
self.status = True
def read_temp(self):
#
# Function to read the temperature of a QSFP cage
#
self.select_qsfp()
ret_ack, raw_ret = self.qsfp_cage.read_bytes(QSFP_TEMP, 2)
if (ret_ack < 1) | (raw_ret[:2]=='ff'):
# stri = "No QSFP found in port {0}".format(self.port)
# print(stri)
self.status=False
if (ret_ack < 1) | (raw_ret[:2] == 'ff'):
if DEBUG:
stri = "No QSFP found in port {0}".format(self.port)
print(stri)
self.status = False
else:
ret_value = []
ret_value.append(int(raw_ret[:2], 16))
ret_value.append(int(raw_ret[2:], 16))
self.temp = (ret_value[0] * 256 + ret_value[1]) / 256
self.status=True
self.status = True
def read_volt(self):
#
# Function to read the power input of a QSFP cage
#
self.select_qsfp()
ret_ack, raw_ret = self.qsfp_cage.read_bytes(QSFP_VOLT, 2)
if (ret_ack < 1) | (raw_ret[:2]=='ff') :
# stri = "No QSFP found in port {0}".format(self.port)
# print(stri)
self.status=False
if (ret_ack < 1) | (raw_ret[:2] == 'ff'):
if DEBUG:
stri = "No QSFP found in port {0}".format(self.port)
print(stri)
self.status = False
else:
ret_value=[]
ret_value = []
ret_value.append(int(raw_ret[:2], 16))
ret_value.append(int(raw_ret[2:], 16))
self.volt = (ret_value[0] * 256 + ret_value[1]) * 0.0001
self.status=True
self.status = True
def read_all(self):
#
# Function to read all monitoring points of a QSFP cage
#
self.read_temp()
self.read_volt()
def print_status(self):
#
# Function to dump all monitoring information of a QSFP cage on the screen
#
if self.status:
stri = "Slot {0} : QSFP Temperature QSFP {1:3.2f} gr. C Voltage {2:3.2f} V".format(self.port, self.temp, self.volt)
print(stri)
class c_ddr:
class DdrClass:
#
# Class to read all monitoring points of a DDR4 module
#
def __init__(self, bank):
#
# All monitoring points of a DDR4 module
#
self.bank = bank
self.status=False
self.status = False
self.temp = 0
if self.bank == 0:
self.ddr_dev = I2C(MB_I_TEMP_I2C_ADDR)
else:
@@ -195,47 +296,66 @@ class c_ddr:
self.ddr_dev.bus = I2CBUSNR
def set_i2c_switch(self):
#
# Function to set the I2C switch to access the DDR4 modules
#
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, DDR4)
if ret_ack < 1:
print("Node I2C switch not found")
if DEBUG:
print("Node I2C switch not found")
self.status = True
else:
self.status = False
def read_temp(self):
#
# Function to read the temperature of a DDR4 module
#
ret_ack, raw_ret = self.ddr_dev.read_bytes(MB_TEMP_REG, 2)
if ret_ack < 1:
# stri = "No DDR moduel in slot {0}".format(self.bank)
# print(stri)
self.status=False
if DEBUG:
stri = "No DDR module in slot {0}".format(self.bank)
print(stri)
self.status = False
else:
ret_value = []
ret_value.append(int(raw_ret[:2], 16))
ret_value.append(int(raw_ret[2:], 16))
self.temp = (((ret_value[0] & 0x1F) * 0x100) + (ret_value[1] & 0xFC)) * 0.0625
self.status=True
self.status = True
def read_all(self):
#
# Function to read all monitoring points of a DDR4 module
#
self.set_i2c_switch()
self.read_temp()
def print_status(self):
#
# Function to dump all monitoring points of a DDR4 module on the screen
#
if self.status:
stri = "Temperature DDR4 in slot {0} is {1:3.2f} C".format(self.bank, self.temp)
print(stri)
class c_pol:
def __init__(self, name, type="node"):
class PolClass:
#
# Class to read all monitoring points Point of Load DC/DC converter
#
def __init__(self, name, location="node"):
#
# All monitoring points Point of Load DC/DC converter
#
self.name = name
self.type = type
self.vout=0
self.iout=0
self.tem=0
if type == "node":
self.location = location
self.vout = 0
self.iout = 0
self.temp = 0
if self.location == "node":
self.set_i2c_switch()
self.pol_dev = I2C(LOC_POLS[self.name])
else:
@@ -244,24 +364,31 @@ class c_pol:
sleep(0.1)
ret_ack, ret_value = self.pol_dev.read_bytes(0)
if ret_ack < 1:
# stri = " Device {0} at address 0x{1:X} not found".format(self.name, LOC_POLS[self.name])
# print(stri)
self.status=False
if DEBUG:
stri = " Device {0} at address 0x{1:X} not found".format(self.name, LOC_POLS[self.name])
print(stri)
self.status = False
else:
self.status=True
self.status = True
def set_i2c_switch(self):
#
# Function to set the I2C switch to access the Point of Load DC/DC converter
#
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, LOC_POWER)
if ret_ack < 1:
print("Node I2C switch not found")
if DEBUG:
print("Node I2C switch not found")
self.status = True
else:
self.status = False
def read_vout(self):
#
# Function to read the output voltage of the Point of Load DC/DC converter
#
if self.status:
if type == "node":
self.set_i2c_switch()
@@ -269,48 +396,59 @@ class c_pol:
ret_ack, vout_mod = self.pol_dev.read_bytes(LP_VOUT_MODE, 1)
sleep(0.1)
ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT, 2)
vout_mod = int(vout_mod,16)
vout_mod = int(vout_mod, 16)
ret_value = []
ret_value.append(int(raw_value[:2], 16))
ret_value.append(int(raw_value[2:], 16))
self.vout = calc_lin_3bytes(ret_value, [vout_mod])
else:
self.vout=999
self.vout = 999
def read_iout(self):
#
# Function to read the output current of the Point of Load DC/DC converter
#
if self.status:
if type == "node":
self.set_i2c_switch()
sleep(0.1)
ret_ack, raw_value = self.pol_dev.read_bytes(LP_IOUT, 2)
ret_value=[]
ret_value.append(int(raw_value[:2],16))
ret_value.append(int(raw_value[2:],16))
ret_value = []
ret_value.append(int(raw_value[:2], 16))
ret_value.append(int(raw_value[2:], 16))
self.iout = calc_lin_2bytes(ret_value)
else:
self.iout=999
self.iout = 999
def read_temp(self):
#
# Function to read the temperature of the Point of Load DC/DC converter
#
if self.status:
if type == "node":
self.set_i2c_switch()
sleep(0.1)
ret_ack,raw_value = self.pol_dev.read_bytes(LP_temp, 2)
ret_value=[]
ret_value.append(int(raw_value[:2],16))
ret_value.append(int(raw_value[2:],16))
ret_ack, raw_value = self.pol_dev.read_bytes(LP_temp, 2)
ret_value = []
ret_value.append(int(raw_value[:2], 16))
ret_value.append(int(raw_value[2:], 16))
self.temp = calc_lin_2bytes(ret_value)
else:
self.temp=999
self.temp = 999
def read_all(self):
#
# Function to read all monitoring points of the Point of Load DC/DC converter
#
self.read_vout()
self.read_iout()
self.read_temp()
def print_status(self):
#
# Function to dump all monitoring points of the Point of Load DC/DC converter on the screen
#
if self.status:
# self.read_all()
stri = "POL: " + self.name + " "
stri += "Output voltage :{0: <5.2f} V ".format(self.vout)
stri += "Output Current :{0: <5.2f} A ".format(self.iout)
@@ -318,186 +456,14 @@ class c_pol:
print(stri)
def rw_eeprom(value=0xAB):
I2C_eeprom = I2C(EEPROM)
I2C_eeprom.bus = I2CBUSNR
ret_ack, ret_value = I2C_eeprom.read_bytes(0)
if ret_ack < 1:
print("no device found")
else:
pr_stri = "Found device at address 0x{:02x}".format(I2C_eeprom.I2C_Address)
print(pr_stri)
I2C_eeprom.write_bytes(0x00, value)
ret_ack, ret_value = I2C_eeprom.read_bytes(0x00, 1)
stri = "Wrote to EEPROM: 0x{0:X}, Read from EEPROM: 0x{1} ".format(value, ret_value)
print(stri)
def main():
#
# Function to test the class, read all info and dump on the screen
#
unb = Unb2cClass()
unb.read_all()
unb.print_status()
def front_led(collor):
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x20,0x20) #select LED
if ret_ack < 1:
print("Main I2C switch not found")
else:
front = I2C(LED_DRIVER)
front.bus = I2CBUSNR
ret_ack = front.write_bytes(0x03, 0)
if ret_ack < 1:
print("Front LED driver not found")
else:
front.write_bytes(0x01, collor)
sleep(1.5)
def read_pol(node_nr, i2c_addr):
if node_nr >= 0:
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x0, 0x01<<node_nr) #select Node
if ret_ack < 1:
print("Main I2C switch not found")
return False
else:
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, 0x20) #select DDR4
if ret_ack < 1:
print("Node I2C switch not found")
return False
else:
print("Central POL")
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x0, 0x10) #select Pol
if ret_ack < 1:
print("Main I2C switch not found")
return False
LOC_PWR = I2C(i2c_addr)
LOC_PWR.bus = I2CBUSNR
ret_ack, ret_value = LOC_PWR.read_bytes(0)
if ret_ack < 1:
print("no device found")
else:
sleep(0.1)
ret_ack, vout_mod = LOC_PWR.read_bytes(LP_VOUT_MODE, 1)
sleep(0.1)
ret_ack, raw_value = LOC_PWR.read_bytes(LP_VOUT, 2)
vout_mod = int(vout_mod,16)
ret_value = []
ret_value.append(int(raw_value[:2], 16))
ret_value.append(int(raw_value[2:], 16))
vout = calc_lin_3bytes(ret_value, [vout_mod])
stri = "Output voltage :{0: <5.2f} V ".format(vout)
sleep(0.1)
ret_ack, raw_value = LOC_PWR.read_bytes(LP_IOUT, 2)
ret_value=[]
ret_value.append(int(raw_value[:2],16))
ret_value.append(int(raw_value[2:],16))
iout = calc_lin_2bytes(ret_value)
stri += "Output Current :{0: <5.2f} A ".format(iout)
sleep(0.1)
ret_ack,raw_value = LOC_PWR.read_bytes(LP_temp, 2)
ret_value=[]
ret_value.append(int(raw_value[:2],16))
ret_value.append(int(raw_value[2:],16))
temp = calc_lin_2bytes(ret_value)
stri += "temperature :{0: <5.2f} Deg C".format(temp)
print(stri)
return True
def read_ddr(node_nr = 0, module=0):
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x0, 0x01<<node_nr) #select Node
if ret_ack < 1:
print("Main I2C switch not found")
else:
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, 0x10) #select DDR4
if ret_ack < 1:
print("Node I2C switch not found")
else:
if module==0:
ddr_module = I2C(MB_I_TEMP_I2C_ADDR)
else:
ddr_module = I2C(MB_II_TEMP_I2C_ADDR)
ddr_module.bus = I2CBUSNR
ret_ack, raw_ret = ddr_module.read_bytes(MB_TEMP_REG, 2)
if ret_ack < 1:
stri = "No DDR moduel in slot {0} node {1}".format(module, node_nr)
else:
ret_value=[]
ret_value.append(int(raw_ret[:2],16))
ret_value.append(int(raw_ret[2:],16))
temp = (((ret_value[0] & 0x1F) * 0x100) + (ret_value[1] & 0xFC)) * 0.0625
stri = "Temperature DDR4 in slot {0} node {1} is {2:3.2f} C".format(module, node_nr, temp)
print(stri)
def read_qsfp(node_nr = 0, module=0):
main_switch = I2C(MAIN_I2C_SWITCH)
main_switch.bus = I2CBUSNR
ret_ack = main_switch.write_bytes(0x00, 0x01<<node_nr) #select Node
if ret_ack < 1:
print("Main I2C switch not found")
else:
node_switch = I2C(NODE_I2C_SWITCH)
node_switch.bus = I2CBUSNR
ret_ack = node_switch.write_bytes(0x0, QSFP_PORT[module]) #select QSFP cage 1
if ret_ack < 1:
print("Node I2C switch not found")
else:
QSFP_cage = I2C(QSFP_I2C_ADDR)
QSFP_cage.bus = I2CBUSNR
ret_ack, raw_ret = QSFP_cage.read_bytes(QSFP_TEMP, 2)
if (ret_ack < 1) | (raw_ret[:2]=='ff'):
stri = "No QSFP module in slot {0} node {1}".format(module, node_nr)
print(stri)
else:
ret_value=[]
ret_value.append(int(raw_ret[:2],16))
ret_value.append(int(raw_ret[2:],16))
temp_in_mod = (ret_value[0] * 256 + ret_value[1]) / 256
stri = "Temperature QSFP in slot {0} node {1} is {2:3.2f} gr. C".format(module, node_nr, temp_in_mod)
print(stri)
ret_ack, raw_ret = QSFP_cage.read_bytes(QSFP_VOLT, 2)
ret_value.append(int(raw_ret[:2],16))
ret_value.append(int(raw_ret[2:],16))
Power_in_mod = (ret_value[0] * 256 + ret_value[1]) * 0.0001
stri = "Voltage QSFP in slot {0} node {1} is {2:3.2f} V".format(module, node_nr, Power_in_mod)
print(stri)
if 0:
rw_eeprom(0xCD)
for color in list(LED_COLORS.keys()):
print(color)
front_led(LED_COLORS[color])
for node_cnt in range(4):
for module_cnt in range(2):
read_ddr(node_nr=node_cnt,module=module_cnt)
for node_cnt in range(4):
for qsfp_cnt in range(6):
read_qsfp(node_nr = node_cnt, module=qsfp_cnt)
#else:
for node_cnt in range(4):
read_pol(node_cnt, LOC_POWER_CORE)
# read_pol(-1,0x01)
#main_switch = I2C(MAIN_I2C_SWITCH)
#main_switch.bus = I2CBUSNR
#ret_ack = main_switch.write_bytes(0x0, 0x01) #select Node
#if ret_ack < 1:
# print("Main I2C switch not found")
#else:
# node_switch = I2C(NODE_I2C_SWITCH)
# node_switch.bus = I2CBUSNR
# ret_ack = node_switch.write_bytes(0x0, 0x20) #select DDR4
# if ret_ack < 1:
# print("Node I2C switch not found")
#for cnt in range(2):
# node = c_node(cnt)
# node.read_all()
# node.print_status()
unb = c_unb2c()
unb.read_all()
unb.print_status()
if __name__ == "__main__":
main()
Loading