Skip to content
Snippets Groups Projects
Commit 529b39e7 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

CLK SPI bitbang added, not tested

parent 598642cd
No related branches found
No related tags found
1 merge request!10Pypcc2
from . import Vars
import numpy as np
import logging
#from .spibitbang1 import *
from .spibitbang2 import *
import threading
......@@ -30,17 +30,31 @@ def int2bytes(i):
i>>=8;
return [i]+b;
def strs2bytes(var):
# print("str2bytes",var)
if len(var)==0: return var;
if isinstance(var[0],str): #make string a byte array
# return [ord(c.encode('utf-8')[0]) for s in var for c in s]
return [c.encode('utf-8')[0] for s in var for c in s]
return var
def bytes2strs(var,step,dtype):
if not(dtype==Vars.datatype.dstring): return var
cnt=int(len(var)/step)
print(var)
return [(bytearray(var[i*step:(i+1)*step]).decode("utf-8")) for i in range(cnt)]
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous=np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previous =np.zeros([number,Vars.RCU_storeReg],dtype='int')
self.previousHBA=np.zeros([number,3,32],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
......@@ -61,59 +75,72 @@ class RCU1():
#print(Vars.RCU)
def SetVar(self,Instr):
def SetVar(self,Instr,Mask=[]):
# Instr.value=strs2bytes(Instr.value) #Alwast be an array of bytes
if (self.N==1): Mask=[True]
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i)
self.SetVar(i,Mask=Mask)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
if len(Mask)==0: Mask=Vars.RCU_mask.OPCW.get_value()
mask=0;
RCU0=-1;
mask=0
for RCUi in range(self.N):
if (Mask[RCUi]):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,Instr.value)
self.SetI2C(RCU0,Instr.dev,8,0,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,Instr.value)
SetSPIbb(self.SetI2C,RCU0,Instr.dev,strs2bytes(Instr.value))
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,Instr.value)
# self.SetI2C(RCUi,Instr.dev,8,0,strs2bytes(Instr.value))
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.nVars*self.N):
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.size*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.get_data_value().Value.Value=Instr.value
Instr.dev.OPCW.set_value(Instr.value)
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
value1=Instr.value if V1.OPCR is None else V1.OPCR.get_data_value().Value.Value
Step2=int(V1.size/V1.nVars)
if (self.N>1):
Mask=(Vars.RCU_mask if Step==1 else Vars.Ant_mask)
Mask=Mask.OPCW.get_value()
value1=strs2bytes(Instr.value) if V1.OPCR is None else strs2bytes(V1.OPCR.get_value())
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if Instr.type==Vars.DevType.Var:
if (Instr.type==Vars.DevType.Var) or ((self.N==1) and (Instr.type==Vars.DevType.VarUpdate)):
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[RCUi*Step+Vari:RCUi*Step+Vari+1])
value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1]
if not(Mask[RCUi*Step+Vari]): continue
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[i0:i1])
value2=value1[i0:i1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1
value1[i0:i1]=value2
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
elif Instr.type==Vars.DevType.VarUpdate:
self.GetVarValueAll(V1,value1)
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1
# if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=bytes2strs(value1,Step2,V1.type)
if not(V1.OPCR is None): V1.OPCR.set_value(bytes2strs(value1,Step2,V1.type))
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
......@@ -123,10 +150,14 @@ class RCU1():
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
GetSPIbb(self.SetI2C,self.GetI2C,RCUi,var.devreg,value)
else:
logging.error("Not Implemented")
......@@ -135,6 +166,7 @@ class RCU1():
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
Step2=int(V1.size/V1.nVars)
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
......@@ -143,16 +175,37 @@ class RCU1():
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1]
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
var=V1.Vars[Vari]
# print(Step,Step2,i0,i1,value2,len(value1))
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# print(value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2
value1[i0:i1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
elif V1.Vars[0].type==Vars.DevType.HBA1:
for Vari in range(Step):
var=V1.Vars[Vari]
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
# print(Step,Step2,len(value1),V1.size)
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
self.GetI2CHBA1(RCUi,var.devreg,var.width,var.bitoffset,value2)
value1[i0:i1]=value2
# i0=(RCUi*Step+ Vari)*Step2+16
# i1=(RCUi*Step+(Vari+1))*Step2
# value2=value1[i0:i1]
# self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
# value1[i0:i1]=value2
else:
logging.error("Type not implemented")
# print(value1)
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
......@@ -169,6 +222,8 @@ class RCU1():
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
#for i in range(len(value2)): value2[i]*=0;
value2[0]=0
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
......@@ -182,6 +237,10 @@ class RCU1():
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.HBA1:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetHBA1I2C(RCUi,var.devreg,var.width,var.bitoffset,value)
# print("RCU1 Set",RCUi,var,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
......@@ -199,13 +258,28 @@ class RCU1():
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def SetHBA1I2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
# if dev.store>0:
previous=self.previousHBA[RCUi,dev.store-1];
L=len(value)
for i in range(L):
value[i]=ApplyMask(value[i],width,bitoffset,previous[i]);
self.previousHBA[RCUi,dev.store-1]=value
# if buffer: return True;
self.I2Ccallback(dev.Addr,value[:16],reg=dev.Register_W)
if L>16: self.I2Ccallback(dev.Addr,value[16:],reg=dev.Register_W+16)
self.I2Ccallback(dev.Addr,[500],reg=dev.Register_W,read=3) #Wait 500ms
return True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
......@@ -231,7 +305,8 @@ class RCU1():
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
......@@ -246,6 +321,7 @@ class RCU1():
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
......@@ -258,11 +334,29 @@ class RCU1():
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
# if width<8:
if (width!=l1*8) or (bitoffset>0):
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
# value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
#if (len(value)>1) and (width<8): print value
return True;
def GetI2CHBA1(self,RCUi,dev,width,bitoffset,value):
#print(width,len(value))
value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if dev.store>0:
self.previousHBA[RCUi,dev.store-1]=value
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
......@@ -276,21 +370,29 @@ class RCU1():
return RCUthread1
# def Queue_Monitor(self,Q1):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32)
# Q1.put(Inst1)
def Queue_Monitor(self,Q1,NRCU):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
return
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
varvalue2=(dim*[0.0] if v.type==Vars.datatype.dfloat else dim*[0])
dim1=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
dim2=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.size
dim3=int(v.size/v.nVars)
#print(v.name,dim)
varvalue2=0
if v.type==Vars.datatype.dInt: varvalue2=dim2*[0]
elif v.type==Vars.datatype.dbool: varvalue2=dim2*[False]
elif v.type==Vars.datatype.dfloat: varvalue2=dim2*[0.0]
elif v.type==Vars.datatype.dstring: varvalue2=dim1*[" "*dim3]
print(len(varvalue2),varvalue2)
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
# print(len(varvalue1),len(varvalue2),v.size,dim2)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim,varvalue2)
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim2,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
......
......@@ -11,3 +11,9 @@ CLK_IO3_CONF1=DevReg(0x20,6,6,3)
CLK_IO3_CONF2=DevReg(0x20,7,7,4)
RCU_storeReg=4; #Number of stored registers
SPIBB_PLL=BBdev(4,[CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT1,CLK_IO3_OUT2],[4,7,5,6],0) #CLK,SDI,SDO,CS
CLK_PLL_lock =DevReg(SPIBB_PLL,0X00,0X00,0) # PLL locked status
from .HWconf import *
CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLKmod=I2Cmodules.CLK
OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1]
CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",CLKmod,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_lock1 = VarArray("CLK_Lock" ,1,[Var2dev("",CLKmod,DevType.SPIbb,CLK_PLL_lock,8 ,0,1)],RW.ReadOnly,datatype.dInt,1,None,None)
OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1,CLK_lock1]
RCU_init=Instrs("ReadRegisters",2,[
Instr(DevType.VarUpdate,CLK_IGNORE_PPS,1,[0]),
......@@ -21,4 +26,22 @@ CLK_off=Instrs("CLK_off",2,[
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x40]),
])
OPC_methods=[CLK_on]
def SPIinst(reg,value):
return Instr(DevType.SPIbb,DevReg(SPIBB_PLL,0,reg,0),1,[value]),
CLK_PLL_setup=Instrs("CLK_PLL_setup",2,[
SPIinst(0x03,0x08),
SPIinst(0x05,0x97),
SPIinst(0x06,0x10),
SPIinst(0x07,0x04),
SPIinst(0x08,0x01),
SPIinst(0x07,0x00),
SPIinst(0x09,0x10),
SPIinst(0x0A,0x14),
SPIinst(0x09,0x00),
SPIinst(0x0D,0x01),#was 2
SPIinst(0x0F,0x01),
SPIinst(0x11,0x01),
SPIinst(0x13,0x01)
])
OPC_methods=[CLK_on,CLK_off,CLK_PLL_setup]
from enum import Enum
import logging
import numpy as np
class SPIBB_pins(Enum):
CLK = 0
SDI = 1
SDO = 2
CS = 3
def SetSPIbb(SetI2C,RCUi,dev,value):
ADC_address=dev.Register_W
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb set",ADC_address,value)))
# dev_rw = 0x00 # 0 for write, 1 for read
# data2 = ( reg_address << 9 ) + ( dev_rw << 8 ) + value[0]
data2 = ( ADC_address << 9 ) + value[0]
bit_array = "{0:{fill}16b}".format(data2, fill='0')
# print(bit_array)
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
for bit in bit_array:
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
return True;
def GetSPIbb(SetI2C,GetI2C,RCUi,dev,value):
ADC_reg_address=dev.Register_R
CSdev=dev.Addr.devs[SPIBB_pins.CS.value]
CSpin=dev.Addr.pins[SPIBB_pins.CS.value]
SDOdev=dev.Addr.devs[SPIBB_pins.SDO.value]
SDOpin=dev.Addr.pins[SPIBB_pins.SDO.value]
SDIdev=dev.Addr.devs[SPIBB_pins.SDI.value]
SDIpin=dev.Addr.pins[SPIBB_pins.SDI.value]
CLKdev=dev.Addr.devs[SPIBB_pins.CLK.value]
CLKpin=dev.Addr.pins[SPIBB_pins.CLK.value]
logging.info(str(("SPIbb get",ADC_reg_address)))
ADC_bytes = 0x00
# ADC_rw = 0x01 # 0 for write, 1 for read
data = (ADC_reg_address << 1) + 1 #was 7??
SetI2C(RCUi,CSdev,1,CSpin,[0]) #enable
bit_array = "{0:{fill}8b}".format(data, fill='0')
for bit in bit_array:
SetI2C(RCUi,SDIdev,1,SDIpin,[int(bit)])
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0])
# SetI2C(RCUi,SDIdev,1,SDIpin,[1]) #high when finished
# print("read byte")
a=[0]
N=1 #len(value)
ret_value=[0]
for i in range(N): value[i]=0
for cnt in range(8*(ADC_bytes+1)):
GetI2C(RCUi,SDOdev,1,SDOpin,ret_value)
for i in range(N): value[i]=(value[i]<<1)+ ret_value[i]
SetI2C(RCUi,CLKdev,1,CLKpin,[1])
SetI2C(RCUi,CLKdev,1,CLKpin,[0]) #read after falling edge
SetI2C(RCUi,CSdev,1,CSpin,[1]) #disable
return True;
......@@ -36,3 +36,31 @@ def I2C1server(addr,data,reg=None,read=0):
# data[0]=0xff
return False;
def I2C2server(addr,data,reg=None,read=0):
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0xff
return False;
......@@ -19,3 +19,16 @@ def I2C1server(addr,data,reg=None,read=0):
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
def I2C2server(addr,data,reg=None,read=0):
global I2Ccounter;
logging.debug(str(("I2C2",addr,reg,data,read)))
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
data[0]=0
if reg: I2Ccounter+=1;
I2Ccounter+=len(data)
# logging.debug(str(("I2C",addr,reg,data,read)))
return True
......@@ -16,3 +16,12 @@ class I2Cswitch1():
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
self.callback1(addr,data,reg,read)
class I2Cswitch0():
def __init__(self,callback1):
return
def SetChannel(self,channel):
return True
def I2Ccallback(self,RCU,addr,data,reg=None,read=0):
return
\ No newline at end of file
......@@ -43,13 +43,14 @@ opcuaserv.InitServer(port=args.port)
logging.info("OPC-UA Server started")
SW1=I2Cswitch1.I2Cswitch1(I2C.I2C1server)
SW0=I2Cswitch1.I2Cswitch0(I2C.I2C2server) #Dummy switch as their is no switch on LTS
RCU=RCU.RCU1(NRCU,I2C.I2C1server,SW1.SetChannel)
RCU.AddVars(Q1,opcuaserv.AddVarR,opcuaserv.AddVarW)
RCU.AddMethod(Q1,opcuaserv.Addmethod)
RCU.load() #Load current register values from HW
#CLK=CLK.RCU1(1,I2C.I2C1server,SW1.SetChannel)
#CLK=CLK.RCU1(1,I2C.I2C2server,SW0.SetChannel)
#CLK.AddVars(Q2,opcuaserv.AddVarR,opcuaserv.AddVarW)
#CLK.AddMethod(Q2,opcuaserv.Addmethod)
#CLK.load() #Load current register values from HW
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment