Skip to content
Snippets Groups Projects
Commit 3edacee0 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

i2c debug1

parent 6fe7c6ff
No related branches found
No related tags found
No related merge requests found
from . import Vars
import numpy as np
import logging
from .spibitbang2 import *
import threading
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def strs2bytes(var):
# print("str2bytes",var)
if len(var)==0: return var;
if isinstance(var[0],str): #make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return [c.encode('utf-8')[0] for s in var for c in s]
return var
def bytes2strs(var,step,dtype):
if not(dtype==Vars.datatype.dstring): return var
cnt=int(len(var)/step)
print(var)
return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)]
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous=np.zeros([number,Vars.RCU_storeReg],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def SetVar(self,Instr,Mask=[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if (self.N==1): Mask=[True]
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
mask=0;
RCU0=-1;
for RCUi in range(self.N):
if (Mask[RCUi]):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.get_data_value().Value.Value=Instr.value
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if (self.N>1):
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_data_value().Value.Value
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_data_value().Value.Value)
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if (Instr.type==Vars.DevType.Var) or ((self.N==1) and (Instr.type==Vars.DevType.VarUpdate)):
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
if not(Mask[RCUi*Step+Vari]): continue
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1])
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
elif Instr.type==Vars.DevType.VarUpdate:
self.GetVarValueAll(V1,value1)
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
def GetVarValue(self,RCUi,var,value):
logging.info(str(("RCU1 Get ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value)
# self.GetSPIBB(RCUi,var.devreg,var.width,var.bitoffset,value)
# logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def GetVarValueAll(self,V1,value1):
mask=0;
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
self.SWcallback(mask)
self.SetI2CAddr(self,DevReg)
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
var=V1.Vars[Vari]
# print(Step,Step2,i0,i1,value2,len(value1))
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# print(value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[i0:i1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
else:
logging.error("Type not implemented")
# print(value1)
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
self.SetI2C(RCUi,dev,width,bitoffset,value,buffer=buffer)
def GetBit(RCUixx,dev,width,bitoffset,buffer=False):
value=[0 for RCUi in range(self.N)]
value2=[0]
if buffer:
for RCUi in range(self.N):
self.GetI2Cbuffer(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value
self.SWcallback(mask)
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
#for i in range(len(value2)): value2[i]*=0;
value2[0]=0
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
devs=[V1.Vars[Vari].devreg for Vari in range(V1.nVars)]
GetSPIbb2(SetBit,GetBit,0,devs,value1)
def SetVarValue(self,RCUi,var,value):
if var.devreg.Register_W==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("RCU1 Set ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def SetI2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
if dev.store>0:
previous=self.previous[RCUi,dev.store-1];
value[0]=ApplyMask(value[0],width,bitoffset,previous);
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1=int(np.floor((width+bitoffset+7)/8))
# print(width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if reg>255: #This is for the monitor ADC
if not(self.I2Ccallback(dev.Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
else:
if not(self.I2Ccallback(dev.Addr,value2,reg=reg,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
value2=[value]
self.I2CGet(RCUi,dev,1,1<<pin,value2)
return value2[0]
def SetI2CAddr(self,RCUi,dev):
return self.I2Ccallback(dev.Addr,int2bytes(dev.Register_R),read=2)
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
# if width<8:
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
item = Q1.get()
if item is None: break;
self.SetVar(item)
logging.info("End RCU thread")
RCUthread1 = threading.Thread(target=RCUthread, args=(Q1,))
RCUthread1.start()
return RCUthread1
def Queue_Monitor(self,Q1):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size
dim3=int(v.size/v.nVars)
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
var1=AddVarW(v.name+"_RW",varvalue2,v,Q1)
v.OPCW=var1
def AddMethod(self,Q1,Addmethod):
for v in Vars.OPC_methods:
Inst1=Vars.Instr(Vars.DevType.Instr,v,0,[])
Addmethod(v.name,Inst1,Q1)
#from collections import namedtuple
#from enum import Enum
from pcctypes import *
#Mid plane address
RCU_MPaddr=MPaddr(1,[1],1,[7])
CLK_IO3_OUT1=DevReg(0x20,0,2,1)
CLK_IO3_OUT2=DevReg(0x20,1,3,2)
CLK_IO3_CONF1=DevReg(0x20,6,6,3)
CLK_IO3_CONF2=DevReg(0x20,7,7,4)
RCU_storeReg=4; #Number of stored registers
SPIBB_PLL=BBdev(4,[CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT2],[4,7,5,6],0) #CLK,SDI,SDO,CS
CLK_PLL_lock =DevReg(SPIBB_PLL,0X00,0X00,0) # PLL locked status
from .HWconf import *
CLKmod=I2Cmodules.CLK
CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_lock1 = VarArray("CLK_Lock" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_lock,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1,CLK_lock1]
RCU_init=Instrs("ReadRegisters",2,[
Instr(DevType.VarUpdate,CLK_IGNORE_PPS,1,[0]),
Instr(DevType.VarUpdate,CLK_Enable_PWR,1,[0])
])
CLK_on=Instrs("CLK_on",2,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x42]),
])
CLK_off=Instrs("CLK_off",2,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x40]),
])
def SPIinst(reg,value):
return Instr(DevType.SPIbb,DevReg(SPIBB_PLL,0,reg,0),1,[value]),
CLK_PLL_setup=Instrs("CLK_PLL_setup",2,[
SPIinst(0x03,0x08),
SPIinst(0x05,0x97),
SPIinst(0x06,0x10),
SPIinst(0x07,0x04),
SPIinst(0x08,0x01),
SPIinst(0x07,0x00),
SPIinst(0x09,0x10),
SPIinst(0x0A,0x14),
SPIinst(0x09,0x00),
SPIinst(0x0D,0x01),#was 2
SPIinst(0x0F,0x01),
SPIinst(0x11,0x01),
SPIinst(0x13,0x01)
])
OPC_methods=[CLK_on,CLK_off,CLK_PLL_setup]
from enum import Enum
import logging
import numpy as np
class SPIBB_pins(Enum):
CLK = 0
SDI = 1
SDO = 2
CS = 3
def SetSPIbb(SetI2C,RCUi,dev,value):
ADC_address=dev.Register_W
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb set",ADC_address,value)))
# dev_rw = 0x00 # 0 for write, 1 for read
# data2 = ( reg_address << 9 ) + ( dev_rw << 8 ) + value[0]
data2 = ( ADC_address << 9 ) + value[0]
bit_array = "{0:{fill}16b}".format(data2, fill='0')
# print(bit_array)
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
for bit in bit_array:
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
return True;
def GetSPIbb(SetI2C,GetI2C,RCUi,dev,value):
ADC_reg_address=dev.Register_R
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb get",ADC_reg_address)))
ADC_bytes = 0x00
# ADC_rw = 0x01 # 0 for write, 1 for read
data = (ADC_reg_address << 1) + 1 #was 7??
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
bit_array = "{0:{fill}8b}".format(data, fill='0')
for bit in bit_array:
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
# print("read byte")
a=[0]
N=1 #len(value)
ret_value=[0]
for i in range(N): value[i]=0
for cnt in range(8*(ADC_bytes+1)):
GetI2C(RCUi,SDOdev,1,SDOpin,ret_value)
for i in range(N): value[i]=(value[i]<<1)+ ret_value[i]
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0]) #read after falling edge
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
return True;
if os.sys.platform is 'linux':
import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0
return False;
def I2C2server(addr,data,reg=None,read=0):
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C2",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-2',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0
return False;
#import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
global I2Ccounter;
logging.debug(str(("I2C",addr,reg,data,read)))
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
data[0]=0
if reg: I2Ccounter+=1;
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
def I2C2server(addr,data,reg=None,read=0):
global I2Ccounter;
logging.debug(str(("I2C2",addr,reg,data,read)))
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
data[0]=0
if reg: I2Ccounter+=1;
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
import numpy as np
import logging
#import HWconf
SWaddr=0x70
class I2Cswitch1():
def __init__(self,callback1):
self.callback1=callback1;
self.CurrentChannel=0;
def SetChannel(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug(str(("SetChannel",channel,self.CurrentChannel)))
self.CurrentChannel=channel
return self.callback1(SWaddr,[channel])
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
self.callback1(addr,data,reg,read)
class I2Cswitch0():
def __init__(self,callback1):
return
def SetChannel(self,channel):
return True
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
return
\ No newline at end of file
from I2C import *
import time
RCU=1
Ver="RCU2H v0.2"
R1=0
ROM=0x50
#Set switch
print("Set switch")
if not(I2C1server(0x70,[1<<RCU],reg=None,read=0)): exit() #select RCU
#exit()
#Get ID
print("Get ID")
ID=[0]*4
if not(I2C1server(ROM,ID,reg=0xFC,read=1)): exit() #select RCU
print(ID)
exit()
#Upload version
Ver2=[(c.encode('utf-8')[0]) for c in Ver]
print(len(Ver),Ver,Ver2)
V2=[0]
for i,v in enumerate(Ver2):
time.sleep(0.1)
I2C1server(ROM,[v],reg=R1+i,read=0) #select RCU
# I2C1server(ROM,[v],reg=None,read=0) #select RCU
time.sleep(0.1)
I2C1server(ROM,V2,reg=R1+i,read=1) #select RCU
print(i,v,V2)
#Download version
Ver2=[0]*10
I2C1server(ROM,Ver2,reg=R1,read=1) #select RCU
print(Ver2)
#from collections import namedtuple
#from enum import Enum
from pcctypes import *
#Mid plane address
#MPaddr=namedtuple("MPaddr","nI2C I2C nSwitch Switch");
RCU_MPaddr=MPaddr(1,[1],32,[0,1,2,3,4,5,6,7,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5,5])
#CLK_MPaddr=MPaddr(1,[1],1,[7])
#DevReg=namedtuple("DevReg","Addr Register_R Register_W store");
#I2C:Addr=I2C addr (int)
#BBdev: Addr=BBdev (pointer)
#Control board switch
#dev: TCA9548
SW1_ch=DevReg(0x70,0,0,0)
#I2C IO-Expanders
#Device: TCA9539
RCU_IO1_OUT1=DevReg(0x75,0,2,1)
RCU_IO1_OUT2=DevReg(0x75,1,3,2)
RCU_IO1_CONF1=DevReg(0x75,6,6,3)
RCU_IO1_CONF2=DevReg(0x75,7,7,4)
#Device: TCA9539
RCU_IO2_OUT1=DevReg(0x76,0,2,5)
RCU_IO2_OUT2=DevReg(0x76,1,3,6)
RCU_IO2_CONF1=DevReg(0x76,6,6,7)
RCU_IO2_CONF2=DevReg(0x76,7,7,8)
#Device: TCA6416
RCU_IO3_OUT1=DevReg(0x20,0,2,9)
RCU_IO3_OUT2=DevReg(0x20,1,3,10)
RCU_IO3_CONF1=DevReg(0x20,6,6,11)
RCU_IO3_CONF2=DevReg(0x20,7,7,12)
RCU_storeReg=13; #Number of stored registers
#I2C monitor ADC
RCU_AN_Ch0=DevReg(0x14,0xB080,-1,0)
RCU_AN_Ch1=DevReg(0x14,0xB880,-1,0)
RCU_AN_Ch2=DevReg(0x14,0xB180,-1,0)
#etc
RCU_AN_Temp=DevReg(0x14,0xA0C0,-1,0)
#HBA1
RCU_HBA1=DevReg(0x41,0,0,1)
RCU_HBA2=DevReg(0x42,0,0,2)
RCU_HBA3=DevReg(0x43,0,0,3)
RCU_ROM_ID =DevReg(0x50,0xfc,0xfc,0) #32 bit ID=4 bytes
RCU_ROM_Ver=DevReg(0x50,0,0,0) #String
#Bitbang devices
#BBdev=namedtuple("BBdev","nPins devs pins addr")
I2CBB_dth3=BBdev(3,[RCU_IO1_OUT1,RCU_IO2_OUT2,RCU_IO2_CONF2],[6,3,3],0x70); #SCL,SDIO,SDIOdir
I2CBB_dth2=BBdev(3,[RCU_IO1_OUT2,RCU_IO2_OUT1,RCU_IO1_CONF1],[7,7,7],0x70);
I2CBB_dth1=BBdev(3,[RCU_IO1_OUT2,RCU_IO2_OUT1,RCU_IO1_CONF1],[7,7,7],0x70);
SPIBB_ADC1=BBdev(4,[RCU_IO3_OUT1,RCU_IO3_OUT1,RCU_IO3_CONF1,RCU_IO3_OUT2],[1,0,0,0],0) #CLK,SDIO,SDIOdir,CS
SPIBB_ADC2=BBdev(4,[RCU_IO3_OUT1,RCU_IO3_OUT1,RCU_IO3_CONF1,RCU_IO3_OUT2],[3,2,2,1],0) #CLK,SDIO,SDIOdir,CS
SPIBB_ADC3=BBdev(4,[RCU_IO3_OUT1,RCU_IO3_OUT1,RCU_IO3_CONF1,RCU_IO3_OUT2],[5,4,4,2],0) #CLK,SDIO,SDIOdir,CS
#SPI ADC
#Dev: AD9683
RCU_ADC1_PLL_stat =DevReg(SPIBB_ADC1,0X0A,0X0A,0) # PLL locked status
RCU_ADC1_JESD_ctr =DevReg(SPIBB_ADC1,0X5F,0X5F,0) #JESD link control, ILAS mode
RCU_ADC1_CML_level=DevReg(SPIBB_ADC1,0X15,0X15,0) #CML output adjust
RCU_ADC1_SYNC_ctr =DevReg(SPIBB_ADC1,0X3A,0X3A,0) #SYNC / SYSREF control
RCU_ADC1_update =DevReg(SPIBB_ADC1,0XFF,0xFF,0) # Global device update
RCU_ADC2_PLL_stat =DevReg(SPIBB_ADC2,0X0A,0X0A,0) # PLL locked status
RCU_ADC2_JESD_ctr =DevReg(SPIBB_ADC2,0X5F,0X5F,0) #JESD link control, ILAS mode
RCU_ADC2_CML_level=DevReg(SPIBB_ADC2,0X15,0X15,0) #CML output adjust
RCU_ADC2_SYNC_ctr =DevReg(SPIBB_ADC2,0X3A,0X3A,0) #SYNC / SYSREF control
RCU_ADC2_update =DevReg(SPIBB_ADC2,0XFF,0xFF,0) # Global device update
RCU_ADC3_PLL_stat =DevReg(SPIBB_ADC3,0X0A,0X0A,0) # PLL locked status
RCU_ADC3_JESD_ctr =DevReg(SPIBB_ADC3,0X5F,0X5F,0) #JESD link control, ILAS mode
RCU_ADC3_CML_level=DevReg(SPIBB_ADC3,0X15,0X15,0) #CML output adjust
RCU_ADC3_SYNC_ctr =DevReg(SPIBB_ADC3,0X3A,0X3A,0) #SYNC / SYSREF control
RCU_ADC3_update =DevReg(SPIBB_ADC3,0XFF,0xFF,0) # Global device update
#I2C_dither
#Dev: SI4010
RCU_Dth1_Freq =DevReg(I2CBB_dth1,0x1140,0x1141,0) ##TBC
RCU_Dth1_Prop =DevReg(I2CBB_dth1,0x11 ,0x11,0)
RCU_Dth1_Start=DevReg(I2CBB_dth1,0x62 ,0x62,0)
RCU_Dth1_Stop =DevReg(I2CBB_dth1,0x67 ,0x67,0)
RCU_Dth2_Freq =DevReg(I2CBB_dth2,0x1140,0x1141,0) ##TBC
RCU_Dth2_Prop =DevReg(I2CBB_dth2,0x11 ,0x11,0)
RCU_Dth2_Start=DevReg(I2CBB_dth2,0x62 ,0x62,0)
RCU_Dth2_Stop =DevReg(I2CBB_dth2,0x67 ,0x67,0)
RCU_Dth3_Freq =DevReg(I2CBB_dth3,0x1140,0x1141,0) ##TBC
RCU_Dth3_Prop =DevReg(I2CBB_dth3,0x11 ,0x11,0)
RCU_Dth3_Start=DevReg(I2CBB_dth3,0x62 ,0x62,0)
RCU_Dth3_Stop =DevReg(I2CBB_dth3,0x67 ,0x67,0)
#class DevType(Enum):
# Var = 0
# I2C = 1
# SPIbb= 2
# I2Cbb= 3
# Instr =4
# VarUpdate = 5
# Internal = 6
from . import Vars
import numpy as np
import logging
from .spibitbang1 import *
import threading
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def strs2bytes(var):
# print("str2bytes",var)
if len(var)==0: return var;
if isinstance(var[0],str): #make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return [c.encode('utf-8')[0] for s in var for c in s]
return var
def bytes2strs(var,step,dtype):
if not(dtype==Vars.datatype.dstring): return var
cnt=int(len(var)/step)
print(var)
return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)]
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous=np.zeros([number,Vars.RCU_storeReg],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def SetVar(self,Instr,Mask=[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_data_value().Value.Value
mask=0;
RCU0=-1;
for RCUi in range(self.N):
if (Mask[RCUi]):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.get_data_value().Value.Value=Instr.value
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_data_value().Value.Value
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_data_value().Value.Value)
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if Instr.type==Vars.DevType.Var:
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
if not(Mask[RCUi*Step+Vari]): continue
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1])
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
elif Instr.type==Vars.DevType.VarUpdate:
self.GetVarValueAll(V1,value1)
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
def GetVarValue(self,RCUi,var,value):
logging.info(str(("RCU1 Get ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def GetVarValueAll(self,V1,value1):
mask=0;
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
self.SWcallback(mask)
self.SetI2CAddr(self,DevReg)
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
var=V1.Vars[Vari]
# print(Step,Step2,i0,i1,value2,len(value1))
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# print(value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[i0:i1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
else:
logging.error("Type not implemented")
# print(value1)
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
self.SetI2C(RCUi,dev,width,bitoffset,value,buffer=buffer)
def GetBit(RCUixx,dev,width,bitoffset,buffer=False):
value=[0 for RCUi in range(self.N)]
value2=[0]
if buffer:
for RCUi in range(self.N):
self.GetI2Cbuffer(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value
self.SWcallback(mask)
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
#for i in range(len(value2)): value2[i]*=0;
value2[0]=0
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
devs=[V1.Vars[Vari].devreg for Vari in range(V1.nVars)]
GetSPIbb2(SetBit,GetBit,0,devs,value1)
def SetVarValue(self,RCUi,var,value):
if var.devreg.Register_W==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("RCU1 Set ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def SetI2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
if dev.store>0:
previous=self.previous[RCUi,dev.store-1];
value[0]=ApplyMask(value[0],width,bitoffset,previous);
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1=int(np.floor((width+bitoffset+7)/8))
# print(width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if reg>255: #This is for the monitor ADC
if not(self.I2Ccallback(dev.Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
else:
if not(self.I2Ccallback(dev.Addr,value2,reg=reg,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
value2=[value]
self.I2CGet(RCUi,dev,1,1<<pin,value2)
return value2[0]
def SetI2CAddr(self,RCUi,dev):
return self.I2Ccallback(dev.Addr,int2bytes(dev.Register_R),read=2)
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
# if width<8:
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
item = Q1.get()
if item is None: break;
self.SetVar(item)
logging.info("End RCU thread")
RCUthread1 = threading.Thread(target=RCUthread, args=(Q1,))
RCUthread1.start()
return RCUthread1
def Queue_Monitor(self,Q1):
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32)
Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size
dim3=int(v.size/v.nVars)
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
var1=AddVarW(v.name+"_RW",varvalue2,v,Q1)
v.OPCW=var1
def AddMethod(self,Q1,Addmethod):
for v in Vars.OPC_methods:
Inst1=Vars.Instr(Vars.DevType.Instr,v,0,[])
Addmethod(v.name,Inst1,Q1)
#from collections import namedtuple
from recordclass import recordclass
#from enum import Enum
from .HWconf import *
#OPCUA variables
RCUmod=I2Cmodules.RCU
#Var2dev=namedtuple("Var2dev","name MPaddr type devreg width bitoffset Scale")
#VarArray=recordclass("VarArray","name nVars Vars RW type OPCR OPCW") #OPCR and OPCW linked at runtime
RCU_att1= Var2dev("RCU_att1" ,RCUmod,DevType.I2C,RCU_IO1_OUT1,5 ,0,1)
RCU_att2= Var2dev("RCU_att2" ,RCUmod,DevType.I2C,RCU_IO1_OUT2,5 ,0,1)
RCU_att3= Var2dev("RCU_att3" ,RCUmod,DevType.I2C,RCU_IO2_OUT1,5 ,0,1)
RCU_band1=Var2dev("RCU_band1",RCUmod,DevType.I2C,RCU_IO2_OUT2,2 ,0,1)
RCU_band2=Var2dev("RCU_band2",RCUmod,DevType.I2C,RCU_IO2_OUT2,2 ,2,1)
RCU_band3=Var2dev("RCU_band3",RCUmod,DevType.I2C,RCU_IO2_OUT2,2 ,4,1)
RCU_led0= Var2dev("RCU_led0" ,RCUmod,DevType.I2C,RCU_IO2_OUT2,2 ,6,1)
RCU_pwrd1=Var2dev("RCU_pwrd1",RCUmod,DevType.I2C,RCU_IO2_OUT1,1 ,6,1)
RCU_temp1=Var2dev("RCU_temp1",RCUmod,DevType.I2C,RCU_AN_Temp ,23,0,4.21e-3)
dummy=Var2dev("Dummy",RCUmod,DevType.Internal,None,8,0,1)
Ant_mask=VarArray("Ant_mask" ,3,[dummy,dummy,dummy] ,RW.WriteOnly,datatype.dInt,3,None,None)
RCU_att =VarArray("RCU_attenuator" ,3,[RCU_att1 ,RCU_att2 ,RCU_att3] ,RW.ReadWrite,datatype.dInt,3,None,None)
RCU_band=VarArray("RCU_band" ,3,[RCU_band1,RCU_band2,RCU_band3],RW.ReadWrite,datatype.dInt,3,None,None)
RCU_mask=VarArray("RCU_mask" ,1,[dummy] ,RW.WriteOnly,datatype.dInt ,1,None,None)
RCU_temp=VarArray("RCU_temperature",1,[RCU_temp1],RW.ReadOnly ,datatype.dfloat,1,None,None)
RCU_pwrd=VarArray("RCU_Pwr_dig" ,1,[RCU_pwrd1],RW.ReadOnly ,datatype.dInt ,1,None,None)
RCU_LED =VarArray("RCU_LED0" ,1,[RCU_led0] ,RW.ReadWrite,datatype.dInt ,1,None,None)
RCU_Dth3_freq=Var2dev("RCU_dth1_freq",RCUmod,DevType.I2Cbb,RCU_Dth3_Freq,32,0,1e-6)
RCU_Dth2_freq=Var2dev("RCU_dth1_freq",RCUmod,DevType.I2Cbb,RCU_Dth2_Freq,32,0,1e-6)
RCU_Dth1_freq=Var2dev("RCU_dth1_freq",RCUmod,DevType.I2Cbb,RCU_Dth1_Freq,32,0,1e-6)
RCU_dth_freq=VarArray("RCU_dither_freq",3,[RCU_Dth1_freq,RCU_Dth2_freq,RCU_Dth3_freq],RW.ReadWrite,datatype.dfloat,3,None,None)
HBA1_Delay=Var2dev("",RCUmod,DevType.HBA1,RCU_HBA1,5,2,1)
HBA1_Pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA1,1,1,1)
HBA2_Delay=Var2dev("",RCUmod,DevType.HBA1,RCU_HBA2,5,2,1)
HBA2_Pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA2,1,1,1)
HBA3_Delay=Var2dev("",RCUmod,DevType.HBA1,RCU_HBA3,5,2,1)
HBA3_Pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA3,1,1,1)
HBA1_Delay=VarArray("HBA_element_beamformer_delays",3,[HBA1_Delay,HBA2_Delay,HBA3_Delay],RW.ReadWrite,datatype.dInt,96,None,None)
HBA1_Pwr =VarArray("HBA_element_pwr" ,3,[HBA1_Pwr ,HBA2_Pwr ,HBA3_Pwr ],RW.ReadWrite,datatype.dInt,96,None,None)
#RCU_ID0=Var2dev("",RCUmod,DevType.I2C,RCU_ROM,8,0,1)
#RCU_ID=VarArray("RCU_ID",1,[RCU_ID0],RW.ReadOnly,datatype.dInt,4,None,None)
RCU_ID0=Var2dev("",RCUmod,DevType.I2C,RCU_ROM_ID,32,0,1)
RCU_ID=VarArray("RCU_ID",1,[RCU_ID0],RW.ReadOnly,datatype.dInt,1,None,None)
RCU_Ver0=Var2dev("",RCUmod,DevType.I2C,RCU_ROM_Ver,7,0,1)
RCU_VER=VarArray("RCU_version",1,[RCU_Ver0],RW.ReadOnly,datatype.dstring,10,None,None)
RCU_ADC1_lock=Var2dev("RCU_ADC1_lock",RCUmod,DevType.SPIbb,RCU_ADC1_PLL_stat,8,0,1)
RCU_ADC2_lock=Var2dev("RCU_ADC2_lock",RCUmod,DevType.SPIbb,RCU_ADC2_PLL_stat,8,0,1)
RCU_ADC3_lock=Var2dev("RCU_ADC3_lock",RCUmod,DevType.SPIbb,RCU_ADC3_PLL_stat,8,0,1)
RCU_ADC_lock=VarArray("RCU_ADC_lock",3,[RCU_ADC1_lock,RCU_ADC2_lock,RCU_ADC3_lock],RW.ReadOnly,datatype.dInt,3,None,None)
RCU_ADC1_SYNC=Var2dev("RCU_ADC1_SYNC",RCUmod,DevType.SPIbb,RCU_ADC1_SYNC_ctr,8,0,1)
RCU_ADC2_SYNC=Var2dev("RCU_ADC2_SYNC",RCUmod,DevType.SPIbb,RCU_ADC2_SYNC_ctr,8,0,1)
RCU_ADC3_SYNC=Var2dev("RCU_ADC3_SYNC",RCUmod,DevType.SPIbb,RCU_ADC3_SYNC_ctr,8,0,1)
RCU_ADC_SYNC=VarArray("RCU_ADC_SYNC",3,[RCU_ADC1_SYNC,RCU_ADC2_SYNC,RCU_ADC3_SYNC],RW.ReadOnly,datatype.dInt,3,None,None)
RCU_ADC1_JESD=Var2dev("RCU_ADC1_SYNC",RCUmod,DevType.SPIbb,RCU_ADC1_JESD_ctr,8,0,1)
RCU_ADC2_JESD=Var2dev("RCU_ADC2_SYNC",RCUmod,DevType.SPIbb,RCU_ADC2_JESD_ctr,8,0,1)
RCU_ADC3_JESD=Var2dev("RCU_ADC3_SYNC",RCUmod,DevType.SPIbb,RCU_ADC3_JESD_ctr,8,0,1)
RCU_ADC_JESD=VarArray("RCU_ADC_JESD",3,[RCU_ADC1_JESD,RCU_ADC2_JESD,RCU_ADC3_JESD],RW.ReadOnly,datatype.dInt,3,None,None)
RCU_ADC1_CML=Var2dev("RCU_ADC1_SYNC",RCUmod,DevType.SPIbb,RCU_ADC1_CML_level,8,0,1)
RCU_ADC2_CML=Var2dev("RCU_ADC2_SYNC",RCUmod,DevType.SPIbb,RCU_ADC2_CML_level,8,0,1)
RCU_ADC3_CML=Var2dev("RCU_ADC3_SYNC",RCUmod,DevType.SPIbb,RCU_ADC3_CML_level,8,0,1)
RCU_ADC_CML=VarArray("RCU_ADC_CML",3,[RCU_ADC1_CML,RCU_ADC2_CML,RCU_ADC3_CML],RW.ReadOnly,datatype.dInt,3,None,None)
RCU_IO1_1= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO1_OUT1,8,0,1)
RCU_IO1_2= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO1_OUT2,8,0,1)
#RCU_IO1_3= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO1_CONF1,8,0,1)
#RCU_IO1_4= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO1_CONF2,8,0,1)
#RCU_IO1=VarArray("RCU_IO1",3,[RCU_IO1_1,RCU_IO1_2,RCU_IO1_3],RW.ReadOnly,datatype.dInt,None,None)
RCU_IO2_1= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO2_OUT1,8,0,1)
RCU_IO2_2= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO2_OUT2,8,0,1)
#RCU_IO2_3= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO2_CONF1,8,0,1)
#RCU_IO2_4= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO2_CONF2,8,0,1)
#RCU_IO2=VarArray("RCU_IO2",3,[RCU_IO2_1,RCU_IO2_2,RCU_IO2_3],RW.ReadOnly,datatype.dInt,None,None)
RCU_IO3_1= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO3_OUT1,8,0,1)
RCU_IO3_2= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO3_OUT2,8,0,1)
#RCU_IO3_3= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO3_CONF1,8,0,1)
#RCU_IO3_4= Var2dev("" ,RCUmod,DevType.I2C,RCU_IO3_CONF2,8,0,1)
#RCU_IO3=VarArray("RCU_IO3",3,[RCU_IO3_1,RCU_IO3_2,RCU_IO3_3],RW.ReadOnly,datatype.dInt,None,None)
RCU_OUT1=VarArray("RCU_OUT1",3,[RCU_IO1_1,RCU_IO2_1,RCU_IO3_1],RW.ReadOnly,datatype.dInt,3,None,None)
RCU_OUT2=VarArray("RCU_OUT2",3,[RCU_IO1_2,RCU_IO2_2,RCU_IO3_2],RW.ReadOnly,datatype.dInt,3,None,None)
#RCU_CNF1=VarArray("RCU_CONF1",3,[RCU_IO1_3,RCU_IO2_3,RCU_IO3_3],RW.ReadOnly,datatype.dInt,None,None)
#RCU_CNF2=VarArray("RCU_CONF2",3,[RCU_IO1_4,RCU_IO2_4,RCU_IO3_4],RW.ReadOnly,datatype.dInt,None,None)
OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ADC_lock,RCU_ADC_SYNC,RCU_ADC_JESD,RCU_ADC_CML,RCU_OUT1,RCU_OUT2,RCU_ID,RCU_VER,HBA1_Delay,HBA1_Pwr]#,RCU_CNF1,RCU_CNF2]
#Instr=namedtuple("DevInstr","type dev nvalue value")
#Instrs=namedtuple("Instr","name ninstr instr")
#OPCUA methods
RCU_init=Instrs("ReadRegisters",2,[
Instr(DevType.VarUpdate,RCU_OUT1,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_OUT2,3,[0,0,0])
])
ADC_on=Instrs("ADC_on",16,[
Instr(DevType.SPIbb,RCU_ADC1_JESD_ctr,1,[0x14]),
Instr(DevType.SPIbb,RCU_ADC1_CML_level,1,[0x7]),
Instr(DevType.SPIbb,RCU_ADC1_SYNC_ctr,1,[1]),
Instr(DevType.SPIbb,RCU_ADC1_update,1,[1]),
Instr(DevType.SPIbb,RCU_ADC2_JESD_ctr,1,[0x14]),
Instr(DevType.SPIbb,RCU_ADC2_CML_level,1,[0x7]),
Instr(DevType.SPIbb,RCU_ADC2_SYNC_ctr,1,[1]),
Instr(DevType.SPIbb,RCU_ADC2_update,1,[1]),
Instr(DevType.SPIbb,RCU_ADC3_JESD_ctr,1,[0x14]),
Instr(DevType.SPIbb,RCU_ADC3_CML_level,1,[0x7]),
Instr(DevType.SPIbb,RCU_ADC3_SYNC_ctr,1,[1]),
Instr(DevType.SPIbb,RCU_ADC3_update,1,[1]),
Instr(DevType.VarUpdate,RCU_ADC_SYNC,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_JESD,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_CML,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_lock,3,[0,0,0])
])
RCU_on=Instrs("RCU_on",12,[
Instr(DevType.I2C,RCU_IO2_CONF1,1,[0]),
Instr(DevType.I2C,RCU_IO2_CONF2,1,[0]),
Instr(DevType.I2C,RCU_IO3_CONF1,1,[0]),
Instr(DevType.I2C,RCU_IO3_CONF2,1,[0]),
Instr(DevType.I2C,RCU_IO1_CONF1,1,[0]),
Instr(DevType.I2C,RCU_IO1_CONF2,1,[0]),
Instr(DevType.I2C,RCU_IO2_OUT1,1,[0x4A]),
Instr(DevType.I2C,RCU_IO2_OUT2,1,[0x55]),
Instr(DevType.I2C,RCU_IO3_OUT1,1,[0x15]),
Instr(DevType.I2C,RCU_IO3_OUT2,1,[0x47]),
Instr(DevType.I2C,RCU_IO1_OUT1,1,[0xCA]),
Instr(DevType.I2C,RCU_IO1_OUT2,1,[0xCA]),
# Instr(DevType.VarUpdate,RCU_att,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_pwrd,1,[0]),
Instr(DevType.VarUpdate,RCU_OUT1,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_OUT2,3,[0,0,0]),
# Instr(DevType.Instr,ADC1_on,0,[]),
# Instr(DevType.Instr,ADC2_on,0,[]),
# Instr(DevType.Instr,ADC3_on,0,[])
# Instr(DevType.VarUpdate,RCU_ADC_lock,3,[0,0,0])
])
RCU_off=Instrs("RCU_off",1,[
# Instr(DevType.Var,RCU_mask,4,[1,1,1,1]),
Instr(DevType.Var,RCU_pwrd,32,[0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0]),
# Instr(DevType.Var,RCU_mask,4,[0,0,0,0])
])
RCU_update=Instrs("RCU_update",11,[
Instr(DevType.VarUpdate,RCU_pwrd,1,[0]),
Instr(DevType.VarUpdate,RCU_OUT1,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_OUT2,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ID,1,[0]),
# Instr(DevType.VarUpdate,RCU_VER,1,[0]*10),
Instr(DevType.VarUpdate,RCU_att,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_band,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_LED,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_SYNC,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_JESD,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_CML,3,[0,0,0]),
Instr(DevType.VarUpdate,RCU_ADC_lock,3,[0,0,0])
])
OPC_methods=[RCU_on,RCU_off,ADC_on,RCU_update]
from enum import Enum
import logging
import numpy as np
class SPIBB_pins(Enum):
CLK = 0
SDIO = 1
SDIOdir = 2
CS = 3
def SetSPIbb(SetI2C,RCUi,dev,value):
ADC_address=dev.Register_W
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDIO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDIO.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb set",ADC_address,value)))
ADC_bytes = 0x00
ADC_rw = 0x00 # 0 for write, 1 for read
data2 = ( ADC_rw << 23 ) + ( ADC_bytes << 21 ) + ( ADC_address << 8 ) + value[0]
bit_array = "{0:{fill}24b}".format(data2, fill='0')
# print(bit_array)
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
for bit in bit_array:
SetI2C(RCUi,SDOdev,1,SDOpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
SetI2C(RCUi,SDOdev,1,SDOpin,[1]) #high when finished
return True;
def GetSPIbb(SetI2C,GetI2C,RCUi,dev,value):
ADC_reg_address=dev.Register_R
CSdev=dev.Addr.devs[Vars.SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[Vars.SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[Vars.SPIBB_pins.SDIO.value]
SDOpin=dev.Addr.pins[Vars.SPIBB_pins.SDIO.value]
CLKdev=dev.Addr.devs[Vars.SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[Vars.SPIBB_pins.CLK.value]
SDIOdirdev=dev.Addr.devs[Vars.SPIBB_pins.SDIOdir.value]
SDIOdirpin=dev.Addr.pins[Vars.SPIBB_pins.SDIOdir.value]
logging.info(str(("SPIbb get",ADC_reg_address)))
ADC_bytes = 0x00
ADC_rw = 0x01 # 0 for write, 1 for read
data = ( ADC_rw << 15) + ( ADC_bytes << 13 ) + ADC_reg_address
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
bit_array = "{0:{fill}16b}".format(data, fill='0')
for bit in bit_array:
SetI2C(RCUi,SDOdev,1,SDOpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
# print("read byte")
SetI2C(RCUi,SDIOdirdev,1,SDIOdirpin,[1]) #input
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
a=[0]
N=len(value)
for i in range(N): value[i]=0
for cnt in range(8*(ADC_bytes+1)):
ret_value=GetI2C(RCUi,SDOdev,1,SDOpin) #enable
for i in range(N): value[i]=(value[i]<<1)+ ret_value[i]
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0]) #read after falling edge
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
SetI2C(RCUi,SDIOdirdev,1,SDIOdirpin,[0]) #output
return True;
def GetSPIbb2(SetI2C,GetI2C,RCUi,devs,value):
ADC_reg_address=devs[0].Register_R
def Setbit(pintype,value):
for dev in devs:
dev1=dev.Addr.devs[pintype.value]
pin1=dev.Addr.pins[pintype.value]
SetI2C(RCUi,dev1,1,pin1,[value],buffer=True)
SetI2C(RCUi,dev1,1,pin1,[value])
def Getbit(pintype):
retvalue=np.zeros_like(value)
step=len(devs)
dev1=devs[0].Addr.devs[pintype.value]
pin1=devs[0].Addr.pins[pintype.value]
retvalue[0::step]=GetI2C(RCUi,dev1,1,pin1)
for i,dev in enumerate(devs[1:]):
dev1=dev.Addr.devs[pintype.value]
pin1=dev.Addr.pins[pintype.value]
retvalue[i+1::step]=GetI2C(RCUi,dev1,1,pin1,buffer=True)
return retvalue
CS=SPIBB_pins.CS
SDIO=SPIBB_pins.SDIO
CLK=SPIBB_pins.CLK
SDIOdir=SPIBB_pins.SDIOdir
logging.debug(str(("SPIbb get",ADC_reg_address)))
ADC_bytes = 0x00
ADC_rw = 0x01 # 0 for write, 1 for read
data = ( ADC_rw << 15) + ( ADC_bytes << 13 ) + ADC_reg_address
Setbit(CS,0) #enable
bit_array = "{0:{fill}16b}".format(data, fill='0')
logging.debug(str(("SPI TX",bit_array)))
for bit in bit_array:
Setbit(SDIO,int(bit))
Setbit(CLK,1)
Setbit(CLK,0)
Setbit(CS,1) #disable
# print("read byte")
Setbit(SDIOdir,1) #input
Setbit(CS,0) #enable
a=[0]
N=len(value)
for i in range(N): value[i]=0
for cnt in range(8*(ADC_bytes+1)):
ret_value=Getbit(SDIO)
for i in range(N): value[i]=(value[i]<<1)+ ret_value[i]
Setbit(CLK,1)
Setbit(CLK,0) #read after falling edge
Setbit(CS,1) #disable
Setbit(SDIO,1)#High when finished
Setbit(SDIOdir,0) #output
return True;
\ No newline at end of file
#include "i2c.h" #include "i2c.h"
#include <iostream> #include <iostream>
#include <linux/i2c-dev.h> #include <linux/i2c-dev.h>
#include <i2c/smbus.h> #include <linux/i2c.h>
#include <stdlib.h> #include <stdlib.h>
#include <fcntl.h> #include <fcntl.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <linux/types.h> #include <linux/types.h>
/* //https://www.pololu.com/docs/0J73/15.8
__s32 i2c_smbus_write_block_data(int file, __u8 command, __u8 length, bool i2c_smbus_write_block_data(int file,uint8_t adress,uint8_t len,uint8_t* buff){
const __u8 *values) struct i2c_msg message = {adress,0,len,buff};
{ struct i2c_rdwr_ioctl_data ioctl_data = {&message,1};
union i2c_smbus_data data; ioctl(file,I2C_RDWR,&ioctl_data);
int i;
if (length > I2C_SMBUS_BLOCK_MAX)
length = I2C_SMBUS_BLOCK_MAX;
for (i = 1; i <= length; i++)
data.block[i] = values[i-1];
data.block[0] = length;
return i2c_smbus_access(file, I2C_SMBUS_WRITE, command, I2C_SMBUS_BLOCK_DATA, &data);
} }
__s32 i2c_smbus_write_i2c_block_data(int file, __u8 command, __u8 length,
const __u8 *values)
{
union i2c_smbus_data data;
int i;
if (length > I2C_SMBUS_BLOCK_MAX)
length = I2C_SMBUS_BLOCK_MAX;
for (i = 1; i <= length; i++)
data.block[i] = values[i-1];
data.block[0] = length;
return i2c_smbus_access(file, I2C_SMBUS_WRITE, command,
I2C_SMBUS_I2C_BLOCK_DATA, &data);
}
__s32 i2c_smbus_read_i2c_block_data(int file, __u8 command, __u8 length,
__u8 *values)
{
union i2c_smbus_data data;
int i, err;
data.block[0] = length;
err = i2c_smbus_access(file, I2C_SMBUS_READ, command, I2C_SMBUS_I2C_BLOCK_DATA, &data);
if (err < 0)
return false;
for (i = 1; i <= data.block[0]; i++)
values[i-1] = data.block[i];
return (data.block[0]==length);
}
*/
c_i2c::c_i2c(const t_driver config1) : drvbase (config1){ c_i2c::c_i2c(const t_driver config1) : drvbase (config1){
std::cout << config.name <<": i2c server, connecting to device " << config.parameters[0] << "\n"; std::cout << config.name <<": i2c server, connecting to device " << config.parameters[0] << "\n";
char filename[20]; char filename[20];
...@@ -63,8 +28,8 @@ c_i2c::c_i2c(const t_driver config1) : drvbase (config1){ ...@@ -63,8 +28,8 @@ c_i2c::c_i2c(const t_driver config1) : drvbase (config1){
bool c_i2c::I2Csend_reg(int addr,int reg,int len,t_buffer* data){ bool c_i2c::I2Csend_reg(int addr,int reg,int len,t_buffer* data){
std::cout << config.name <<": i2c send to addr="<<addr<<" reg="<<reg<<" len="<<len<<" value="<<int(data[0]) <<"\n"; std::cout << config.name <<": i2c send to addr="<<addr<<" reg="<<reg<<" len="<<len<<" value="<<int(data[0]) <<"\n";
if (ioctl(file, I2C_SLAVE, addr)<0) return false; if (ioctl(file, I2C_SLAVE, addr)<0) return false;
return true; // return true;
// return (i2c_smbus_write_block_data(file,reg,len,data)>=0); return (i2c_smbus_write_block_data(file,reg,len,data)>=0);
} }
bool c_i2c::I2Cget_reg(int addr,int reg,int len,t_buffer* data){ bool c_i2c::I2Cget_reg(int addr,int reg,int len,t_buffer* data){
......
// Generated by Cap'n Proto compiler, DO NOT EDIT
// source: pypcc.capnp
#ifndef CAPNP_INCLUDED_9f2c99de8c7edd7f_
#define CAPNP_INCLUDED_9f2c99de8c7edd7f_
#include <capnp/generated-header-support.h>
#if CAPNP_VERSION != 6001
#error "Version mismatch between generated code and library headers. You must use the same version of the Cap'n Proto compiler and library."
#endif
namespace capnp {
namespace schemas {
CAPNP_DECLARE_SCHEMA(b1dfe08305a4b99c);
enum class InstType_b1dfe08305a4b99c: uint16_t {
VAR_SET,
VAR_READ,
METHOD,
};
CAPNP_DECLARE_ENUM(InstType, b1dfe08305a4b99c);
CAPNP_DECLARE_SCHEMA(aaca83afea5f387e);
} // namespace schemas
} // namespace capnp
typedef ::capnp::schemas::InstType_b1dfe08305a4b99c InstType;
struct OPCUAset {
OPCUAset() = delete;
class Reader;
class Builder;
class Pipeline;
struct _capnpPrivate {
CAPNP_DECLARE_STRUCT_HEADER(aaca83afea5f387e, 1, 2)
#if !CAPNP_LITE
static constexpr ::capnp::_::RawBrandedSchema const* brand() { return &schema->defaultBrand; }
#endif // !CAPNP_LITE
};
};
// =======================================================================================
class OPCUAset::Reader {
public:
typedef OPCUAset Reads;
Reader() = default;
inline explicit Reader(::capnp::_::StructReader base): _reader(base) {}
inline ::capnp::MessageSize totalSize() const {
return _reader.totalSize().asPublic();
}
#if !CAPNP_LITE
inline ::kj::StringTree toString() const {
return ::capnp::_::structString(_reader, *_capnpPrivate::brand());
}
#endif // !CAPNP_LITE
inline ::uint8_t getId() const;
inline ::InstType getType() const;
inline bool hasData() const;
inline ::capnp::List< ::uint8_t>::Reader getData() const;
inline bool hasMask() const;
inline ::capnp::List<bool>::Reader getMask() const;
private:
::capnp::_::StructReader _reader;
template <typename, ::capnp::Kind>
friend struct ::capnp::ToDynamic_;
template <typename, ::capnp::Kind>
friend struct ::capnp::_::PointerHelpers;
template <typename, ::capnp::Kind>
friend struct ::capnp::List;
friend class ::capnp::MessageBuilder;
friend class ::capnp::Orphanage;
};
class OPCUAset::Builder {
public:
typedef OPCUAset Builds;
Builder() = delete; // Deleted to discourage incorrect usage.
// You can explicitly initialize to nullptr instead.
inline Builder(decltype(nullptr)) {}
inline explicit Builder(::capnp::_::StructBuilder base): _builder(base) {}
inline operator Reader() const { return Reader(_builder.asReader()); }
inline Reader asReader() const { return *this; }
inline ::capnp::MessageSize totalSize() const { return asReader().totalSize(); }
#if !CAPNP_LITE
inline ::kj::StringTree toString() const { return asReader().toString(); }
#endif // !CAPNP_LITE
inline ::uint8_t getId();
inline void setId( ::uint8_t value);
inline ::InstType getType();
inline void setType( ::InstType value);
inline bool hasData();
inline ::capnp::List< ::uint8_t>::Builder getData();
inline void setData( ::capnp::List< ::uint8_t>::Reader value);
inline void setData(::kj::ArrayPtr<const ::uint8_t> value);
inline ::capnp::List< ::uint8_t>::Builder initData(unsigned int size);
inline void adoptData(::capnp::Orphan< ::capnp::List< ::uint8_t>>&& value);
inline ::capnp::Orphan< ::capnp::List< ::uint8_t>> disownData();
inline bool hasMask();
inline ::capnp::List<bool>::Builder getMask();
inline void setMask( ::capnp::List<bool>::Reader value);
inline void setMask(::kj::ArrayPtr<const bool> value);
inline ::capnp::List<bool>::Builder initMask(unsigned int size);
inline void adoptMask(::capnp::Orphan< ::capnp::List<bool>>&& value);
inline ::capnp::Orphan< ::capnp::List<bool>> disownMask();
private:
::capnp::_::StructBuilder _builder;
template <typename, ::capnp::Kind>
friend struct ::capnp::ToDynamic_;
friend class ::capnp::Orphanage;
template <typename, ::capnp::Kind>
friend struct ::capnp::_::PointerHelpers;
};
#if !CAPNP_LITE
class OPCUAset::Pipeline {
public:
typedef OPCUAset Pipelines;
inline Pipeline(decltype(nullptr)): _typeless(nullptr) {}
inline explicit Pipeline(::capnp::AnyPointer::Pipeline&& typeless)
: _typeless(kj::mv(typeless)) {}
private:
::capnp::AnyPointer::Pipeline _typeless;
friend class ::capnp::PipelineHook;
template <typename, ::capnp::Kind>
friend struct ::capnp::ToDynamic_;
};
#endif // !CAPNP_LITE
// =======================================================================================
inline ::uint8_t OPCUAset::Reader::getId() const {
return _reader.getDataField< ::uint8_t>(
::capnp::bounded<0>() * ::capnp::ELEMENTS);
}
inline ::uint8_t OPCUAset::Builder::getId() {
return _builder.getDataField< ::uint8_t>(
::capnp::bounded<0>() * ::capnp::ELEMENTS);
}
inline void OPCUAset::Builder::setId( ::uint8_t value) {
_builder.setDataField< ::uint8_t>(
::capnp::bounded<0>() * ::capnp::ELEMENTS, value);
}
inline ::InstType OPCUAset::Reader::getType() const {
return _reader.getDataField< ::InstType>(
::capnp::bounded<1>() * ::capnp::ELEMENTS);
}
inline ::InstType OPCUAset::Builder::getType() {
return _builder.getDataField< ::InstType>(
::capnp::bounded<1>() * ::capnp::ELEMENTS);
}
inline void OPCUAset::Builder::setType( ::InstType value) {
_builder.setDataField< ::InstType>(
::capnp::bounded<1>() * ::capnp::ELEMENTS, value);
}
inline bool OPCUAset::Reader::hasData() const {
return !_reader.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS).isNull();
}
inline bool OPCUAset::Builder::hasData() {
return !_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS).isNull();
}
inline ::capnp::List< ::uint8_t>::Reader OPCUAset::Reader::getData() const {
return ::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::get(_reader.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS));
}
inline ::capnp::List< ::uint8_t>::Builder OPCUAset::Builder::getData() {
return ::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::get(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS));
}
inline void OPCUAset::Builder::setData( ::capnp::List< ::uint8_t>::Reader value) {
::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::set(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS), value);
}
inline void OPCUAset::Builder::setData(::kj::ArrayPtr<const ::uint8_t> value) {
::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::set(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS), value);
}
inline ::capnp::List< ::uint8_t>::Builder OPCUAset::Builder::initData(unsigned int size) {
return ::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::init(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS), size);
}
inline void OPCUAset::Builder::adoptData(
::capnp::Orphan< ::capnp::List< ::uint8_t>>&& value) {
::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::adopt(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS), kj::mv(value));
}
inline ::capnp::Orphan< ::capnp::List< ::uint8_t>> OPCUAset::Builder::disownData() {
return ::capnp::_::PointerHelpers< ::capnp::List< ::uint8_t>>::disown(_builder.getPointerField(
::capnp::bounded<0>() * ::capnp::POINTERS));
}
inline bool OPCUAset::Reader::hasMask() const {
return !_reader.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS).isNull();
}
inline bool OPCUAset::Builder::hasMask() {
return !_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS).isNull();
}
inline ::capnp::List<bool>::Reader OPCUAset::Reader::getMask() const {
return ::capnp::_::PointerHelpers< ::capnp::List<bool>>::get(_reader.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS));
}
inline ::capnp::List<bool>::Builder OPCUAset::Builder::getMask() {
return ::capnp::_::PointerHelpers< ::capnp::List<bool>>::get(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS));
}
inline void OPCUAset::Builder::setMask( ::capnp::List<bool>::Reader value) {
::capnp::_::PointerHelpers< ::capnp::List<bool>>::set(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS), value);
}
inline void OPCUAset::Builder::setMask(::kj::ArrayPtr<const bool> value) {
::capnp::_::PointerHelpers< ::capnp::List<bool>>::set(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS), value);
}
inline ::capnp::List<bool>::Builder OPCUAset::Builder::initMask(unsigned int size) {
return ::capnp::_::PointerHelpers< ::capnp::List<bool>>::init(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS), size);
}
inline void OPCUAset::Builder::adoptMask(
::capnp::Orphan< ::capnp::List<bool>>&& value) {
::capnp::_::PointerHelpers< ::capnp::List<bool>>::adopt(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS), kj::mv(value));
}
inline ::capnp::Orphan< ::capnp::List<bool>> OPCUAset::Builder::disownMask() {
return ::capnp::_::PointerHelpers< ::capnp::List<bool>>::disown(_builder.getPointerField(
::capnp::bounded<1>() * ::capnp::POINTERS));
}
#endif // CAPNP_INCLUDED_9f2c99de8c7edd7f_
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment