From f704116e95df06463da3f385026ffa785e55bcd3 Mon Sep 17 00:00:00 2001 From: Gijs <schoonderbeek@astron.nl> Date: Tue, 18 Oct 2022 17:12:20 +0200 Subject: [PATCH] Moved classes to lib file, modified EEPROM writing --- apspu_lib.py | 483 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 apspu_lib.py diff --git a/apspu_lib.py b/apspu_lib.py new file mode 100644 index 0000000..b59bd79 --- /dev/null +++ b/apspu_lib.py @@ -0,0 +1,483 @@ +""" +Copyright 2021 Stichting Nederlandse Wetenschappelijk Onderzoek Instituten, +ASTRON Netherlands Institute for Radio Astronomy +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + + + Created: 2021-05-10 +This file contains the UniBoard2 class with all monitoring function. It can be used stand-alone to test it. + +""" + +import sys +sys.path.insert(0, '.') +import os +import math + +from APSPU_I2C import * +if os.name == "posix": + from I2C_serial_pi2 import * +else: + from I2C_serial import * + + +I2CBUSNR = 3 +DEBUG = False + +class ApspuClass: + # + # Class that contains all parts on a UniBoard2 + # + def __init__(self): + self.status = False + self.pols = [] + for pol in list(CTR_POLS.keys()): + self.pols.append(PolClass(pol)) + self.dev_i2c_eeprom = I2C(EEPROM) + self.dev_i2c_eeprom.bus_nr = I2CBUSNR + self.fans = FanmonitorClass() + + def write_eeprom(self, data="APSPU", address=0): + # + # Write the EEPROM with the serial number etc. + # + ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(0) + if ret_ack < 1: + print("EEPROM not found during write") + return False + else: + wr_data = bytearray(data.encode("utf-8", errors="ignore")) + for loc, data_byte in enumerate(wr_data): + self.dev_i2c_eeprom.write_bytes(address+loc, data_byte) + sleep(0.1) + return True + + def read_eeprom(self, address=0, nof_bytes=5): + # + # Read the EEPROM with the serial number etc. + # + ret_ack, ret_value = self.dev_i2c_eeprom.read_last_reg(1) + if ret_ack < 1: + print("no EEPROM found during read") + return False + else: + ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(address, nof_bytes) + str_return = bytes.fromhex(ret_value[:nof_bytes*2]).decode('UTF-8') + return str_return + + def wr_rd_eeprom(self, value="APSPU-1", address = 0): + # + # Write and Read the EEPROM to check functionality + # + if self.write_eeprom(value, address=0): + ret_value = self.read_eeprom(address=0, nof_bytes=len(value)) + stri = "Wrote to EEPROM register 0x{2:x} : {0}, Read from EEPROM: {1}".format(value, ret_value, address) + print(stri) + return True + + def read_all(self): + # + # Function to read all monitoring points of the UniBoard + # + print("--------- \nRead APSPU Status\n---------") + for pol in self.pols: + pol.read_all() + self.fans.read_all() + return True + + def print_status(self): + # + # Function to dump monitoring information on the screen + # + for pol in self.pols: + pol.print_status() + self.fans.print_status() + return True + + def set_pols(self): + print("--------- \nProgram Pols\n---------") + for pol in self.pols: + pol.set_vout_pol(VOUT_POLS[pol.name]) + pol.set_vout_ov_limit_pol(1.2*VOUT_POLS[pol.name]) + pol.set_on_off_config() + if DEBUG: + pol.read_vout_set() + pol.read_ov_limit() + pol.read_uv_limit() + pol.write_to_nvm() + print("Done") + + def check_apspu(self): + print("--------- \nCheck APSPU \n---------") + check_ok = True + for pol in self.pols: + check_ok = check_ok & pol.check_pol() + check_ok = check_ok & self.fans.check_fans() + if check_ok: + print("APSPU OK") + else: + print(">>> APSPU NOT OK <<<") + + def apspu_on_off(self, on): + for pol in self.pols: + pol.on_off(on) + return True + + + +class PolClass: + # + # Class to read all monitoring points Point of Load DC/DC converter + # + def __init__(self, name): + # + # All monitoring points Point of Load DC/DC converter + # + self.name = name + self.vout = 0 + self.vin = 0 + self.iout = 0 + self.temp = 0 + self.pol_dev = I2C(CTR_POLS[self.name]) + self.pol_dev.bus_nr = I2CBUSNR + ret_ack, ret_value = self.pol_dev.read_bytes(1) + if ret_ack < 1: + stri = " Device {0} at address 0x{1:X} not found".format(self.name, CTR_POLS[self.name]) + print(stri) + self.status = False + else: + self.status = True + + + def read_vout_set(self): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT_COMMAND, 2) + if len(raw_value)<4: + raw_value = '0' + raw_value + ret_value = [] + ret_value= int(raw_value[2:], 16) * 2**8 + ret_value += int(raw_value[:2], 16) + output_value = ret_value * 2**-11 + if DEBUG: + print(f"Output set to: = {output_value:5.2f} V using hex value {raw_value}") + return output_value + else: + return 999 + + def set_on_off_config(self): + # + # Function to set the output of the DC/DC converter + # + if self.status: + if DEBUG: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_ON_OFF_CONFIG, 1) + print(f"Current setting ON/OFF register 0x{int(raw_value, 16):02x}") + on_off_bit = not 1 << 0 + polarity_pin = 1 << 1 + use_external = 0 << 2 + use_soft = 1 << 3 + default_off = 1 << 4 + wr_data = on_off_bit + polarity_pin + use_external + use_soft + default_off + ret_ack = self.pol_dev.write_bytes(LP_ON_OFF_CONFIG, wr_data) + return True + + def on_off(self, on=True): + wr_data = (on << 7) + 0 + ret_ack = self.pol_dev.write_bytes(LP_OPERATION, wr_data) + + def set_vout_pol(self, value): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + set_value = int(value * (2**11)) + hex_set_value = hex(set_value) + wr_value = (hex_set_value[4:6] + hex_set_value[2:4]) + if DEBUG: + print(f"Calculated wr_value is {wr_value}") + wr_data =[] + wr_data.append(int(hex_set_value[4:6], 16)) + wr_data.append(int(hex_set_value[2:4], 16)) + ret_ack = self.pol_dev.write_bytes(LP_VOUT_COMMAND, wr_data) + return True + + def set_vout_ov_limit_pol(self, value): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + set_value = int(value * (2**11)) + hex_set_value = hex(set_value) + wr_value = (hex_set_value[4:6] + hex_set_value[2:4]) + if DEBUG: + print(f"Calculated wr_value is {wr_value}") + wr_data =[] + wr_data.append(int(hex_set_value[4:6], 16)) + wr_data.append(int(hex_set_value[2:4], 16)) + ret_ack = self.pol_dev.write_bytes(LP_VOUT_OV_LIMIT, wr_data) + return True + + def write_to_nvm(self): + ret_ack = self.pol_dev.write_bytes(LP_STORE_USER_ALL, 0) + return True + + def read_vin(self): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_VIN, 2) + if len(raw_value)<4: + raw_value = '0' + raw_value + ret_value = [] + ret_value.append(int(raw_value[:2], 16)) + ret_value.append(int(raw_value[2:], 16)) # * 2**8 + output_value = calc_lin_2bytes(ret_value) #ret_value * 2**-11 + self.vin = output_value + + def read_vout(self): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + ret_ack, vout_mod = self.pol_dev.read_bytes(LP_VOUT_MODE, 1) + ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT, 2) + vout_mod = int(vout_mod, 16) + ret_value = [] + ret_value.append(int(raw_value[:2], 16)) + try: + ret_value.append(int(raw_value[2:], 16)) + except: + ret_value.append(0) + self.vout = calc_lin_3bytes(ret_value, [vout_mod]) + else: + self.vout = 999 + + def read_ov_limit(self): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT_OV_LIMIT, 2) + if len(raw_value)<4: + raw_value = '0' + raw_value + ret_value = [] + ret_value= int(raw_value[2:], 16) * 2**8 + ret_value += int(raw_value[:2], 16) + output_value = ret_value * 2**-11 + print(f"Output OV limit is set to: = {output_value:5.2f} V using hex value {raw_value}") + + def read_uv_limit(self): + # + # Function to read the output voltage of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT_UV_LIMIT, 2) + if len(raw_value)<4: + raw_value = '0' + raw_value + ret_value = [] + ret_value= int(raw_value[2:], 16) * 2**8 + ret_value += int(raw_value[:2], 16) + output_value = ret_value * 2**-11 + print(f"Output UV limit is set to: = {output_value:5.2f} V using hex value {raw_value}") + return output_value + else: + return 9999 + + def read_iout(self): + # + # Function to read the output current of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(0x39,2) + ret_ack, raw_value = self.pol_dev.read_bytes(LP_IOUT, 2) + ret_value = [] + ret_value.append(int(raw_value[:2], 16)) + ret_value.append(int(raw_value[2:], 16)) + self.iout = calc_lin_2bytes(ret_value) + else: + self.iout = 999 + + def read_temp(self): + # + # Function to read the temperature of the Point of Load DC/DC converter + # + if self.status: + ret_ack, raw_value = self.pol_dev.read_bytes(LP_temp, 2) + ret_value = [] + ret_value.append(int(raw_value[:2], 16)) + ret_value.append(int(raw_value[2:], 16)) + self.temp = calc_lin_2bytes(ret_value) + else: + self.temp = 999 + + def read_all(self): + # + # Function to read all monitoring points of the Point of Load DC/DC converter + # + self.read_vout() + self.read_iout() + self.read_temp() + self.read_vin() + + def check_pol(self): + self.read_all() + self.on_off(on=True) + check_ok = False + expected_vout = VOUT_POLS[self.name] + if 0.9*expected_vout < self.vout < 1.1*expected_vout: + check_ok = True + else: + check_ok = False + print(f"Vout not OK, expected {expected_vout} V, measured {self.vout} V") + vin_low = 45 + vin_high = 50 + if vin_low < self.vin < vin_high: + check_ok = True + else: + check_ok = False + print(f"Vin not OK, expected {vin_low} V, measured {self.vout} V < {vin_high} V") + temp_low = 20 + temp_high = 50 + if (temp_low < self.temp < temp_high) & check_ok: + check_ok = True + else: + check_ok = False + print(f"TEMP not OK, expected {temp_low} C < measured {self.temp} C < {temp_high} C") + return check_ok + + + def print_status(self): + # + # Function to dump all monitoring points of the Point of Load DC/DC converter on the screen + # + if self.status: + stri = f"POL: {self.name:10} :" + stri += "Vin :{0: >5.2f} V ".format(self.vin) + stri += "Vout :{0: >5.2f} V ".format(self.vout) + stri += "Iout :{0: >5.2f} A ".format(self.iout) + stri += "Temp :{0: >5.2f} \N{DEGREE SIGN}C".format(self.temp) + print(stri) + self.read_vout_set() + +class FanmonitorClass: + # + # Class to read all monitoring points Point of Load DC/DC converter + # + def __init__(self): + # + # All monitoring points Point of Load DC/DC converter + # + self.rpm = [] + self.fanmonitor_dev = I2C(MAX6620) + self.fanmonitor_dev.bus_nr = I2CBUSNR + ret_ack, ret_value = self.fanmonitor_dev.read_bytes(1) + if ret_ack < 1: + stri = " Device {0} at address 0x{1:X} not found".format("MAX6620", MAX6620) + print(stri) + self.status = False + else: + if DEBUG: + stri = "Device {0} at address 0x{1:X} is found ".format("MAX6620", MAX6620) + print(stri) + self.set_active() + self.status = True + + def set_active(self): + # + # Function to activate monitoring + # + ret_ack, reg_before = self.fanmonitor_dev.read_bytes(REG_GLOBAL, 1) + self.fanmonitor_dev.write_bytes(REG_GLOBAL, RUN_MONITOR) + ret_ack, reg_after = self.fanmonitor_dev.read_bytes(REG_GLOBAL, 1) + if DEBUG: + stri = "Reg at address 0x{0} before : {1} and after write action {2}".format(REG_GLOBAL, reg_before, reg_after) + print(stri) + fan_config_reg = int((math.log(TACH_PERIODS) / math.log(2))) << 5 + for fan_cnt in range(NOF_APS_FANS): + self.fanmonitor_dev.write_bytes(0x02 + fan_cnt, 0x88) + self.fanmonitor_dev.write_bytes(0x06 + fan_cnt, fan_config_reg) + + def read_fan(self, fan_nr): + # + # Function to a single fan + # + ret_ack, tach_msb = self.fanmonitor_dev.read_bytes(REG_TACH_MSP_REGS[fan_nr], 1) + tach_msb = int(tach_msb, 16) & 0xFF + if tach_msb > 254: + if DEBUG : + tach_lsb = 255 + tach = 99999 + rpm = 0 + else: + ret_ack, tach_lsb = self.fanmonitor_dev.read_bytes(REG_TACH_LSP_REGS[fan_nr], 1) + tach_lsb = int(tach_lsb, 16) & 0xE0 + tach = tach_msb*16 + tach_lsb/8 + rpm = float((TACH_COUNT_FREQ*TACH_PERIODS*60))/(FAN_TACHS*tach) + if DEBUG: + stri = "MSP: {0}, LSB: {1}, TACH : {2}".format(tach_msb, tach_lsb, tach) + print(stri) + return rpm + + def read_all(self): + # + # Function to read all fan's + # + self.rpm=[] + for fan_counter in range(NOF_APS_FANS): + self.rpm.append(self.read_fan(fan_counter)) + + def check_fans(self): + self.read_all() + check_ok = True + for cnt, speed in enumerate(self.rpm): + if speed < 10: + print(f"Low speed on fan {cnt}") + check_ok = False + return check_ok + + def print_status(self): + # + # Function to dump all monitoring points of the Point of Load DC/DC converter on the screen + # + if self.status: + stri = "Fan speeds of " + for fan_cnt in range(len(self.rpm)): + stri += "FAN_" + str(fan_cnt+1) + stri += " :{0: <5.2f} RPM ".format(self.rpm[fan_cnt]) + print(stri) + + +def main(): + # + # Function to test the class, read all info and dump on the screen + # + apspu = ApspuClass() + apspu.apspu_on_off(False) + sleep(5) + apspu.set_pols() + apspu.apspu_on_off(True) + sleep(10) + apspu.read_all() + apspu.print_status() + apspu.check_apspu() + apspu.apspu_on_off(False) + sleep(10) + apspu.read_all() + apspu.print_status() + apspu.apspu_on_off(True) + + +if __name__ == "__main__": + main() -- GitLab