Skip to content
Snippets Groups Projects
Select Git revision
  • c5df7932858995c3911cac01035891d716ec4d53
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

create_add_virtual_instrument.sql.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    apspu_lib.py 16.12 KiB
    """
    Copyright 2021 Stichting Nederlandse Wetenschappelijk Onderzoek Instituten,
    ASTRON Netherlands Institute for Radio Astronomy
    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at
     http://www.apache.org/licenses/LICENSE-2.0
    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
    
    
      Created: 2021-05-10
    This file contains the APSPU class (ApspuClass) with all monitoring functions. It can be run stand-alone to test the hardware.
    
    """
    
    import sys
    sys.path.insert(0, '.')
    import os
    import math
     
    from APSPU_I2C import *
    if os.name == "posix":
        from I2C_serial_pi2 import *
    else:
        from I2C_serial import *
    
    
    # I2C bus number assigned to the bus_nr attribute of every I2C device object created in this file
    I2CBUSNR = 3
    # When True, the classes below print extra diagnostic information (raw register values etc.)
    DEBUG = False
    
    class ApspuClass:
        #
        # Top-level class for the APSPU: it groups the Point of Load (POL)
        # DC/DC converters, the fan monitor and the EEPROM behind one object.
        #
        def __init__(self):
            #
            # Create one PolClass per entry in CTR_POLS, the I2C device for the
            # EEPROM and the fan monitor object.
            #
            self.status = False
            self.pols = [PolClass(pol_name) for pol_name in CTR_POLS]
            self.dev_i2c_eeprom = I2C(EEPROM)
            self.dev_i2c_eeprom.bus_nr = I2CBUSNR
            self.fans = FanmonitorClass()

        def write_eeprom(self, data="APSPU", address=0):
            #
            # Write the string `data` to the EEPROM (serial number etc.),
            # one byte per write action, starting at `address`.
            # Returns False when the EEPROM does not acknowledge, else True.
            #
            ret_ack, _ = self.dev_i2c_eeprom.read_bytes(0)
            if ret_ack < 1:
                print("EEPROM not found during write")
                return False
            # Characters that cannot be encoded are silently dropped (errors="ignore")
            for offset, data_byte in enumerate(data.encode("utf-8", errors="ignore")):
                self.dev_i2c_eeprom.write_bytes(address + offset, data_byte)
                # Pause between byte writes; presumably the EEPROM write-cycle time
                sleep(0.1)
            return True

        def read_eeprom(self, address=0, nof_bytes=5):
            #
            # Read `nof_bytes` bytes from the EEPROM starting at `address` and
            # return them decoded as a UTF-8 string.
            # Returns False when the EEPROM does not acknowledge.
            #
            ret_ack, _ = self.dev_i2c_eeprom.read_last_reg(1)
            if ret_ack < 1:
                print("no EEPROM found during read")
                return False
            # read_bytes() hands back a hex string: two characters per byte
            ret_ack, ret_value = self.dev_i2c_eeprom.read_bytes(address, nof_bytes)
            return bytes.fromhex(ret_value[:nof_bytes*2]).decode('UTF-8')

        def wr_rd_eeprom(self, value="APSPU-1", address=0):
            #
            # Write `value` to the EEPROM at `address` and read it back to check
            # the functionality. The result is printed; always returns True.
            #
            # The requested address is used for both the write and the read-back,
            # so the printed register matches what was actually accessed.
            if self.write_eeprom(value, address=address):
                ret_value = self.read_eeprom(address=address, nof_bytes=len(value))
                stri = "Wrote to EEPROM register 0x{2:x} : {0}, Read from EEPROM: {1}".format(value, ret_value, address)
                print(stri)
            return True

        def read_all(self):
            #
            # Function to read all monitoring points of the APSPU
            # (every POL and the fans). Always returns True.
            #
            print("--------- \nRead APSPU Status\n---------")
            for pol in self.pols:
                pol.read_all()
            self.fans.read_all()
            return True

        def print_status(self):
            #
            # Function to dump the monitoring information of every POL and of
            # the fans on the screen. Always returns True.
            #
            for pol in self.pols:
                pol.print_status()
            self.fans.print_status()
            return True

        def set_pols(self):
            #
            # Program every POL: output voltage from VOUT_POLS, over-voltage
            # limit at 1.2 times that voltage, the on/off configuration, and
            # finally store the settings with write_to_nvm().
            #
            print("--------- \nProgram Pols\n---------")
            for pol in self.pols:
                target_vout = VOUT_POLS[pol.name]
                pol.set_vout_pol(target_vout)
                pol.set_vout_ov_limit_pol(1.2*target_vout)
                pol.set_on_off_config()
                if DEBUG:
                    # Read the settings back so they show up in the debug output
                    pol.read_vout_set()
                    pol.read_ov_limit()
                    pol.read_uv_limit()
                pol.write_to_nvm()
            print("Done")

        def check_apspu(self):
            #
            # Run the checks of every POL and of the fans and print the verdict.
            # Returns True when all checks passed, else False.
            #
            print("--------- \nCheck APSPU \n---------")
            # Run every check (no short-circuit) so that all failures get reported
            pol_results = [pol.check_pol() for pol in self.pols]
            fans_ok = self.fans.check_fans()
            check_ok = bool(all(pol_results) and fans_ok)
            if check_ok:
                print("APSPU OK")
            else:
                print(">>> APSPU NOT OK <<<")
            return check_ok

        def apspu_on_off(self, on):
            #
            # Switch all POLs on (on=True) or off (on=False). Always returns True.
            #
            for pol in self.pols:
                pol.on_off(on)
            return True
            
    
    
    class PolClass:
        #
        # Class to program and read the monitoring points of one
        # Point of Load (POL) DC/DC converter
        #
        def __init__(self, name):
            #
            # Set up the I2C device for the POL called `name` (a key of CTR_POLS)
            # and probe it; self.status tells whether the device answered.
            #
            self.name = name
            self.vout = 0
            self.vin = 0
            self.iout = 0
            self.temp = 0
            self.pol_dev = I2C(CTR_POLS[self.name])
            self.pol_dev.bus_nr = I2CBUSNR
            ret_ack, ret_value = self.pol_dev.read_bytes(1)
            if ret_ack < 1:
                stri = " Device {0} at address 0x{1:X} not found".format(self.name, CTR_POLS[self.name])
                print(stri)
                self.status = False
            else:
                self.status = True

        def _read_scaled_word(self, register):
            #
            # Helper: read a 2-byte register and scale it with 2**-11.
            # The hex string from read_bytes() is left-padded with zeros to four
            # characters; the first two characters are used as the low byte and
            # the last two as the high byte.
            # Returns (scaled value, padded hex string).
            #
            ret_ack, raw_value = self.pol_dev.read_bytes(register, 2)
            raw_value = raw_value.zfill(4)
            word = (int(raw_value[2:], 16) << 8) + int(raw_value[:2], 16)
            return word * 2**-11, raw_value

        def _write_scaled_word(self, register, value):
            #
            # Helper: scale `value` with 2**11 and write it to `register` as two
            # bytes, low byte first.
            # Raises ValueError when the scaled value does not fit in 16 bits.
            #
            set_value = int(value * (2**11))
            if not 0 <= set_value <= 0xFFFF:
                raise ValueError(f"value {value} does not fit in a 16 bit register")
            # Split numerically: slicing the hex() text breaks for words below 0x1000
            wr_data = [set_value & 0xFF, set_value >> 8]
            if DEBUG:
                print(f"Calculated wr_value is {wr_data[0]:02x}{wr_data[1]:02x}")
            return self.pol_dev.write_bytes(register, wr_data)

        def read_vout_set(self):
            #
            # Function to read back the programmed output voltage (LP_VOUT_COMMAND).
            # Returns the voltage in V, or 999 when the device was not found.
            #
            if not self.status:
                return 999
            output_value, raw_value = self._read_scaled_word(LP_VOUT_COMMAND)
            if DEBUG:
                print(f"Output set to: = {output_value:5.2f} V using hex value {raw_value}")
            return output_value

        def set_on_off_config(self):
            #
            # Function to write the on/off configuration register (LP_ON_OFF_CONFIG)
            # of the DC/DC converter. Always returns True.
            #
            if self.status:
                if DEBUG:
                    ret_ack, raw_value = self.pol_dev.read_bytes(LP_ON_OFF_CONFIG, 1)
                    print(f"Current setting ON/OFF register 0x{int(raw_value, 16):02x}")
                # One flag per bit; the names presumably follow the PMBus
                # ON_OFF_CONFIG layout - verify against the converter datasheet.
                on_off_bit   = 0 << 0
                polarity_pin = 1 << 1
                use_external = 0 << 2
                use_soft     = 1 << 3
                default_off  = 1 << 4
                wr_data = on_off_bit | polarity_pin | use_external | use_soft | default_off
                self.pol_dev.write_bytes(LP_ON_OFF_CONFIG, wr_data)
            return True

        def on_off(self, on=True):
            #
            # Function to switch the converter output on or off by writing bit 7
            # of the LP_OPERATION register. The write is done regardless of
            # self.status.
            #
            wr_data = 0x80 if on else 0x00
            self.pol_dev.write_bytes(LP_OPERATION, wr_data)

        def set_vout_pol(self, value):
            #
            # Function to set the output voltage (in V) of the Point of Load
            # DC/DC converter. Always returns True.
            #
            if self.status:
                self._write_scaled_word(LP_VOUT_COMMAND, value)
            return True

        def set_vout_ov_limit_pol(self, value):
            #
            # Function to set the output over-voltage limit (in V) of the Point of
            # Load DC/DC converter. Always returns True.
            #
            if self.status:
                self._write_scaled_word(LP_VOUT_OV_LIMIT, value)
            return True

        def write_to_nvm(self):
            #
            # Function to send the LP_STORE_USER_ALL command (store the settings
            # in the non-volatile memory of the converter). Always returns True.
            #
            self.pol_dev.write_bytes(LP_STORE_USER_ALL, 0)
            return True

        def read_vin(self):
            #
            # Function to read the input voltage of the Point of Load DC/DC
            # converter into self.vin (999 when the device was not found).
            #
            if self.status:
                ret_ack, raw_value = self.pol_dev.read_bytes(LP_VIN, 2)
                raw_value = raw_value.zfill(4)
                ret_value = [int(raw_value[:2], 16), int(raw_value[2:], 16)]
                self.vin = calc_lin_2bytes(ret_value)
            else:
                self.vin = 999

        def read_vout(self):
            #
            # Function to read the output voltage of the Point of Load DC/DC
            # converter into self.vout (999 when the device was not found).
            #
            if self.status:
                ret_ack, vout_mod = self.pol_dev.read_bytes(LP_VOUT_MODE, 1)
                ret_ack, raw_value = self.pol_dev.read_bytes(LP_VOUT, 2)
                vout_mod = int(vout_mod, 16)
                ret_value = [int(raw_value[:2], 16)]
                try:
                    ret_value.append(int(raw_value[2:], 16))
                except ValueError:
                    # Second byte missing from the hex string: treat it as zero
                    ret_value.append(0)
                self.vout = calc_lin_3bytes(ret_value, [vout_mod])
            else:
                self.vout = 999

        def read_ov_limit(self):
            #
            # Function to read and print the output over-voltage limit.
            # Returns the limit in V, or 9999 when the device was not found.
            #
            if not self.status:
                return 9999
            output_value, raw_value = self._read_scaled_word(LP_VOUT_OV_LIMIT)
            print(f"Output OV limit is set to: = {output_value:5.2f} V using hex value {raw_value}")
            return output_value

        def read_uv_limit(self):
            #
            # Function to read and print the output under-voltage limit.
            # Returns the limit in V, or 9999 when the device was not found.
            #
            if not self.status:
                return 9999
            output_value, raw_value = self._read_scaled_word(LP_VOUT_UV_LIMIT)
            print(f"Output UV limit is set to: = {output_value:5.2f} V using hex value {raw_value}")
            return output_value

        def read_iout(self):
            #
            # Function to read the output current of the Point of Load DC/DC
            # converter into self.iout (999 when the device was not found).
            #
            if self.status:
                # NOTE(review): the result of this read of register 0x39 is not
                # used; kept because the device may depend on it - confirm.
                ret_ack, raw_value = self.pol_dev.read_bytes(0x39, 2)
                ret_ack, raw_value = self.pol_dev.read_bytes(LP_IOUT, 2)
                ret_value = [int(raw_value[:2], 16), int(raw_value[2:], 16)]
                self.iout = calc_lin_2bytes(ret_value)
            else:
                self.iout = 999

        def read_temp(self):
            #
            # Function to read the temperature of the Point of Load DC/DC
            # converter into self.temp (999 when the device was not found).
            #
            if self.status:
                ret_ack, raw_value = self.pol_dev.read_bytes(LP_temp, 2)
                ret_value = [int(raw_value[:2], 16), int(raw_value[2:], 16)]
                self.temp = calc_lin_2bytes(ret_value)
            else:
                self.temp = 999

        def read_all(self):
            #
            # Function to read all monitoring points of the Point of Load DC/DC converter
            #
            self.read_vout()
            self.read_iout()
            self.read_temp()
            self.read_vin()

        def check_pol(self):
            #
            # Read all monitoring points, switch the converter on and check that
            # Vout, Vin and the temperature are within their limits.
            # Every failing check is printed; returns True only when all pass.
            #
            self.read_all()
            self.on_off(on=True)
            check_ok = True
            expected_vout = VOUT_POLS[self.name]
            if not (0.9*expected_vout < self.vout < 1.1*expected_vout):
                check_ok = False
                print(f"Vout not OK, expected {expected_vout} V, measured {self.vout} V")
            vin_low = 45
            vin_high = 50
            if not (vin_low < self.vin < vin_high):
                check_ok = False
                print(f"Vin not OK, expected {vin_low} V < measured {self.vin} V < {vin_high} V")
            temp_low = 20
            temp_high = 50
            if not (temp_low < self.temp < temp_high):
                check_ok = False
                print(f"TEMP not OK, expected {temp_low} C < measured {self.temp} C < {temp_high} C")
            return check_ok

        def print_status(self):
            #
            # Function to dump all monitoring points of the Point of Load DC/DC converter on the screen
            #
            if self.status:
                stri = f"POL: {self.name:10} :"
                stri += "Vin :{0: >5.2f} V ".format(self.vin)
                stri += "Vout :{0: >5.2f} V ".format(self.vout)
                stri += "Iout :{0: >5.2f} A ".format(self.iout)
                stri += "Temp :{0: >5.2f} \N{DEGREE SIGN}C".format(self.temp)
                print(stri)
                # Only produces output when DEBUG is set
                self.read_vout_set()
    
    class FanmonitorClass:
        #
        # Class to read the fan speeds through the MAX6620 fan monitor device
        #
        def __init__(self):
            #
            # Set up the I2C device of the fan monitor and probe it. When the
            # device answers, monitoring is activated and self.status is True.
            #
            self.rpm = []
            self.fanmonitor_dev = I2C(MAX6620)
            self.fanmonitor_dev.bus_nr = I2CBUSNR
            ret_ack, ret_value = self.fanmonitor_dev.read_bytes(1)
            if ret_ack < 1:
                stri = " Device {0} at address 0x{1:X} not found".format("MAX6620", MAX6620)
                print(stri)
                self.status = False
            else:
                if DEBUG:
                    stri = "Device {0} at address 0x{1:X} is found ".format("MAX6620", MAX6620)
                    print(stri)
                self.set_active()
                self.status = True

        def set_active(self):
            #
            # Function to activate monitoring: write RUN_MONITOR to REG_GLOBAL
            # and program two registers per fan.
            #
            ret_ack, reg_before = self.fanmonitor_dev.read_bytes(REG_GLOBAL, 1)
            self.fanmonitor_dev.write_bytes(REG_GLOBAL, RUN_MONITOR)
            ret_ack, reg_after = self.fanmonitor_dev.read_bytes(REG_GLOBAL, 1)
            if DEBUG:
                stri = "Reg at address 0x{0} before : {1} and after write action {2}".format(REG_GLOBAL, reg_before, reg_after)
                print(stri)
            # log2 of the number of tach periods goes in the upper three bits;
            # math.log2 is exact for powers of two, unlike log(x)/log(2)
            fan_config_reg = int(math.log2(TACH_PERIODS)) << 5
            for fan_cnt in range(NOF_APS_FANS):
                # NOTE(review): 0x02+n and 0x06+n are presumably the per-fan
                # configuration and dynamics registers of the MAX6620 - confirm
                self.fanmonitor_dev.write_bytes(0x02 + fan_cnt, 0x88)
                self.fanmonitor_dev.write_bytes(0x06 + fan_cnt, fan_config_reg)

        def read_fan(self, fan_nr):
            #
            # Function to read the speed of a single fan.
            # Returns the speed in RPM; 0 when the tach counter is saturated
            # (MSB register reads 255) or when the tach count is zero.
            #
            ret_ack, tach_msb = self.fanmonitor_dev.read_bytes(REG_TACH_MSP_REGS[fan_nr], 1)
            tach_msb = int(tach_msb, 16) & 0xFF
            if tach_msb > 254:
                # Placeholder values, only used for the debug print below
                tach_lsb = 255
                tach = 99999
                rpm = 0
            else:
                ret_ack, tach_lsb = self.fanmonitor_dev.read_bytes(REG_TACH_LSP_REGS[fan_nr], 1)
                # Only the upper three bits of the LSB register are used
                tach_lsb = int(tach_lsb, 16) & 0xE0
                tach = tach_msb*16 + tach_lsb/8
                if tach == 0:
                    # A zero count cannot be converted (division by zero);
                    # report 0 RPM so that check_fans() flags this fan
                    rpm = 0
                else:
                    rpm = float((TACH_COUNT_FREQ*TACH_PERIODS*60))/(FAN_TACHS*tach)
            if DEBUG:
                stri = "MSP: {0}, LSB: {1}, TACH : {2}".format(tach_msb, tach_lsb, tach)
                print(stri)
            return rpm

        def read_all(self):
            #
            # Function to read the speed of all fans into self.rpm
            #
            self.rpm = [self.read_fan(fan_counter) for fan_counter in range(NOF_APS_FANS)]

        def check_fans(self):
            #
            # Read all fans and report every fan that runs slower than 10 RPM.
            # Returns True when no fan is below that limit.
            #
            self.read_all()
            check_ok = True
            for cnt, speed in enumerate(self.rpm):
                if speed < 10:
                    print(f"Low speed on fan {cnt}")
                    check_ok = False
            return check_ok

        def print_status(self):
            #
            # Function to dump the speed of all fans on the screen
            #
            if self.status:
                fan_texts = (
                    "FAN_{0} :{1: <5.2f} RPM ".format(fan_nr, speed)
                    for fan_nr, speed in enumerate(self.rpm, start=1)
                )
                print("Fan speeds of " + "".join(fan_texts))
    
    
    def main():
        #
        # Function to test the class: power-cycle the POLs, program them,
        # read all info and dump it on the screen.
        # NOTE(review): sleep() is not imported in this file by name; it is
        # presumably provided by one of the star imports - confirm.
        #
        apspu = ApspuClass()
        # Switch all POLs off before programming them
        apspu.apspu_on_off(False)
        sleep(5)
        # Program the POL settings, then switch the POLs on
        apspu.set_pols()
        apspu.apspu_on_off(True)
        sleep(10)
        # Read, print and check the monitoring points with the POLs on
        apspu.read_all()
        apspu.print_status()
        apspu.check_apspu()
        # Switch off and read/print again with the POLs off
        apspu.apspu_on_off(False)
        sleep(10)
        apspu.read_all()
        apspu.print_status()
        # Leave the POLs switched on at the end of the test
        apspu.apspu_on_off(True)
    
    
    # Run the stand-alone test sequence only when this file is executed as a script
    if __name__ == "__main__":
        main()