Skip to content
Snippets Groups Projects
Commit 6a539712 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

UNB2 switch added

parent f2b3f508
No related branches found
No related tags found
1 merge request!11Pypcc2
......@@ -3,7 +3,7 @@ description: "UNB2 DTS first draft"
drivers:
- name: I2C1
type: UNB2_switch
type: i2c_switch2
devreg: [APSCT_SWITCH.MASK,UB2_SWITCH1.MASK,UB2_SWITCH2.MASK]
parameters: [1,0,0,0] #I2C port number, 3x switch reset pins
- name: switch_UNB2
......@@ -11,27 +11,27 @@ drivers:
parent: I2C1
parameters: [0,1]
- name: switch_PS
type: i2c_array
type: i2c_array2
parent: I2C1
parameters: [0,1, 4]
parameters: [0,1, 4,4]
- name: switch_FP
type: i2c_array
type: i2c_array2
parent: I2C1
parameters: [0,1, 5]
parameters: [0,1, 5,5]
- name: switch_QSFP
type: i2c_array #An array of similar devices connected to an I2C switch
type: i2c_array2 #An array of similar devices connected to an I2C switch
parent: I2C1
parameters: [0,1, 0,3, 0,3, 6,7] #Parameters: APSCT_Switch, main switch, 2nd switch, 2nd switch
- name: switch_DDR4
type: i2c_array
type: i2c_array2
parent: I2C1
parameters: [0,1, 0,3, 4,4]
- name: switch_FPGA_PS
type: i2c_array
type: i2c_array2
parent: I2C1
parameters: [0,1, 0,3, 5,5]
- name: GPIO
type: GPIO
type: gpio
#This is the I2C devices in the RCU
......
import numpy as np
from .hwdev import hwdev;
import logging
class gpio(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
logging.info("gpio todo")
......@@ -17,7 +17,7 @@ class i2c(hwdev):
self.I2Ccounter=0
def i2csetget(self,addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
# logging.debug(str(("I2C",addr,reg,data,read)))
bus=None;
try:
if read==3:
......
#from . import Vars
import numpy as np
import logging
from .spibitbang1 import *
#import threading
import time
#from multiprocessing import Process
#from pcctypes import *
from .spibitbang1 import spibitbang1
from queuetypes import *
from .hwdev import hwdev;
from .hwdev import hwdev
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
......@@ -142,8 +137,11 @@ class i2c_array(hwdev):
Data=OPCUAset(varid,InstType.varSet,data,mask)
return [Data]
def OPCUAcallMethod(self,var1,data,mask):
print("Call Method",var1)
# def OPCUAcallMethod(self,var1,data,mask):
# print("Call Method",var1)
def i2csetget(self,*args,**kwargs):
self.conf['parentcls'].i2csetget(*args,**kwargs)
def SetSwitch(self,RCUi):
self.conf['parentcls'].SetChannel(1<<self.RCU_Switch1[RCUi]);
......
import logging
from .i2c_array import i2c_array;
from .hwdev import hwdev
class i2c_array2(i2c_array):
def __init__(self,config):
hwdev.__init__(self,config);
# self.Qout=Qout;
# self.Qin=Qin;
# self.I2Ccallback=I2Ccallback
# self.SWcallback=Switchcallback
# self.previousHBA=np.zeros([number,3,32],dtype='int')
# Make array of switch states
pars=config['parameters'];
self.Nswitch=(len(pars)+1)//2;
sw1=list(range(pars[0],pars[1]+1));
sw2=([] if len(pars)<4 else list(range(pars[2],pars[3]+1)))
sw3=([] if len(pars)<6 else list(range(pars[4],pars[5]+1)))
sw4=([] if len(pars)<8 else list(range(pars[6],pars[7]+1)))
sw3=sw3+sw4
if self.Nswitch>3: self.Nswitch=3;
self.RCU_Switch1=range(pars[0],pars[1]+1);
self.N=len(sw1)
if len(sw2)>0: self.N*=len(sw2)
if len(sw3)>0: self.N*=len(sw3)
self.sw1=[x for x in sw1 for i in range(self.N//len(sw1))]
self.sw2=[x for x in sw2 for i in range(self.N//len(sw1)//len(sw2))]*len(sw1)
self.sw3=[x for x in sw3]*len(sw1)*len(sw2)
# self.devregs,RCU_storeReg=DevRegList(yaml)
logging.debug(str(("Init",config['name'],' len=',self.N,'Nswitch=',self.Nswitch,self.sw1,self.sw2,self.sw3)))
# self.previous =np.zeros([self.N,RCU_storeReg],dtype='int')
def SetSwitch(self,RCUi):
#print("Set switch element",RCUi,'=',self.sw1[RCUi],self.sw2[RCUi],(self.sw3[RCUi] if len(self.sw3)<0 else 'x'))
self.conf['parentcls'].SetSW1(self.sw1[RCUi]);
self.conf['parentcls'].SetSW2(self.sw2[RCUi]);
if len(self.sw3)>0:
self.conf['parentcls'].SetSW3(self.sw3[RCUi]);
def SetSwitchMask(self,mask):
logging.warn("Not implemented!!")
# m=0;
# for RCUi in range(self.N):
# if mask[RCUi]: m|=1<<self.RCU_Switch1[RCUi];
# self.conf['parentcls'].SetChannel(m);
#3 switches of UNB2
import logging
from .i2c import i2c
class i2c_switch2(i2c):
def __init__(self,config):
i2c.__init__(self,config)
self.SWaddr1=config['devreg'][0]['addr']
self.SWaddr2=config['devreg'][1]['addr']
self.SWaddr3=config['devreg'][2]['addr']
self.channel1=0
self.channel2=0
self.channel3=0
logging.info("i2c switch2 at address %i,%i,%i" % (self.SWaddr1,self.SWaddr2,self.SWaddr3))
def SetSW1(self,channelbit):
channel=1<<(channelbit)
if (channel)==self.channel1: return True;
logging.debug("SetChannel1=%i" % channelbit)
self.channel1=channel
self.channel2=0
self.channel3=0
return self.i2csetget(self.SWaddr1,[channel])
def SetSW2(self,channelbit):
channel=1<<(channelbit)
if (channel)==self.channel2: return True;
logging.debug("SetChannel2=%i" % channelbit)
self.channel2=channel
self.channel3=0
return self.i2csetget(self.SWaddr2,[channel])
def SetSW3(self,channelbit):
channel=1<<(channelbit)
if (channel)==self.channel3: return True;
logging.debug("SetChannel3=%i" % channelbit)
self.channel3=channel
return self.i2csetget(self.SWaddr3,[channel])
......@@ -20,8 +20,8 @@ class i2client():
# os.close(self.Output)
# self.Input.close()
def readvar(self,id1):
Data=OPCUAset(id1,InstType.varRead,[],[])
def readvar(self,id1,mask=[]):
Data=OPCUAset(id1,InstType.varRead,[],mask)
self.Qout.put(Data);
def setvar(self,id1,data=[],mask=[]):
......
......@@ -11,7 +11,7 @@ import sys
import signal
from yamlconfig import Find;
logging.basicConfig(level="INFO",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
RunTimer=True;
#def signal_handler(sig, frame):
......
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;
logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
RunTimer=True;
#def signal_handler(sig, frame):
# logging.warn('Stop signal received!')
# global RunTimer;
# RunTimer=False
#signal.signal(signal.SIGINT, signal_handler)
logging.info("Start I2C processes")
threads=[]
I2Cclients=[]
name='UNB2'
RCU_I2C=i2client.i2client(name=name)
thread1=i2cthread.start(*RCU_I2C.GetInfo())
threads.append(thread1)
I2Cclients.append(RCU_I2C)
#Load yaml so that we know the variable names
RCU_conf=yamlreader.yamlreader(RCU_I2C,yamlfile=name)
var1=RCU_conf.getvarid('UNB2_DC_DC_48V_12V_VIN');
N=2;
mask=[i<2 for i in range(N)];
print("mask=",mask);
RCU_I2C.readvar(var1,mask);
var1=RCU_conf.getvarid('UNB2_FPGA_QSFP_CAGE_LOS');
N=48;
mask=[i<8 for i in range(N)];
print("mask=",mask);
RCU_I2C.readvar(var1,mask);
#var1=RCU_conf.getmethodid('RCU_on');
#N=32;
#mask=[i<2 for i in range(N)];
#RCU_I2C.callmethod(var1,mask)
time.sleep(2);
while RCU_I2C.data_waiting():
varid,data,mask=RCU_I2C.readdata()
print("Results:",RCU_conf.getvar1(varid)['name'],data)
logging.info("Stop threads")
for i2c in I2Cclients:
i2c.stop()
for thread1 in threads:
thread1.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment