Skip to content
Snippets Groups Projects
Commit 8efa0b2a authored by Paulus Kruger's avatar Paulus Kruger
Browse files

Merge branch 'pypcc2' into 'master'

Pypcc2

See merge request !8
parents b558fc04 91840ed2
No related branches found
No related tags found
1 merge request!8Pypcc2
from I2Cdevices import *;
import numpy as np
import logging
class I2Cbitbang(I2Cdevices):
def I2CSet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
# def I2Cset(self,D,name,reg,data,I2Ccallback,dev_number=0):
Dev=Find(self.D['I2C_devices'],'dev_name',dev)
Reg=Find(Dev['dev_registers'],'reg_name',reg)
regnr=Reg['reg_addr']
Addr=Dev['dev_address'];
# data2=[Addr<<1,regnr]+data;
data2=[Addr<<1]+int2bytes(regnr)+value
# logging.info("%s",data2)
I2Ccallback('SDO',0)
I2Ccallback('SCL',1)
if not(I2CbbTX(data2,I2Ccallback)): return False;
data2=[(Addr<<1)+1];
data3=[0]
if not(I2CbbRX(data2,data3,I2Ccallback)): return False;
if data3[0]!=0x80:
logging.error("Expected Ack 0x80, received: %s",data3);
return False;
logging.info("Si4012 TX success!")
return True;
def I2CGet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
SPI=Find(self.D['I2C_devices'],'dev_name',dev)
dev_number=0;
Reg=Find(SPI['dev_registers'],'reg_name',reg)
Addr=SPI['dev_address'];
regnr=Reg['reg_addr']
data2=[Addr<<1]+int2bytes(regnr)
if (len(data2)==3) and (data2[1]==17):
data2[1]=18; #SI4012 hack as get is one higher than set
width+=1; #hack to ignore first ack byte
# while regnr>255:
# data2=[regnr%256]+data2;
# regnr=int(regnr/256);
# data2=[Addr<<1,regnr]+data2;
# logging.info("%s %s %s",data2,Addr,Reg['reg_addr']);
if not(I2CbbTX(data2,I2Ccallback)): return False;
# return False;
data2=[(Addr<<1)+1];
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: data3=[0 for x in range(l1)]
else: data3=value
if not(I2CbbRX(data2,data3,I2Ccallback)): return False;
if makesinglevalue: value[0]=bytes2int(data3)
else: value[:]=data3[:];
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
#logging.info("Received: %s",data3);
return True;
def I2CbbTX(data2,SetBit):
SetBit('SDIOdir',1) #Input, should be high
if SetBit('SDI',0,True)==0:
logging.error("I2C line low!")
return False;
#Start
SetBit('SDIOdir',0) #Output = low
SetBit('SCL',0)
for b in data2:
bit_array = "{0:{fill}8b}".format(b, fill='0')
#logging.info(bit_array)
for bit in bit_array:
SetBit('SDIOdir',int(bit)) #input=high, output=low
SetBit('SCL',1)
SetBit('SCL',0)
SetBit('SDIOdir',1) #Input
#logging.info(GetBit(SDI))
if SetBit('SDI',0,True)==1:
logging.error("I2C: Not ACK")
return False;
SetBit('SCL',1)
#logging.info(GetBit(SDI))
SetBit('SCL',0)
# SetBit(SDIOdir,0) #Output
#Stop
SetBit('SDIOdir',0) #Low
SetBit('SCL',1)
SetBit('SDIOdir',1) #High
return True
def I2CbbRX(data1,data2,SetBit):
SetBit('SDIOdir',1) #Input, should be high
if SetBit('SDI',0,True)==0:
logging.error("I2C ack: line low!")
return False;
#Start
SetBit('SDIOdir',0) #Output = low
SetBit('SCL',0)
for b in data1:
bit_array = "{0:{fill}8b}".format(b, fill='0')
#logging.info(bit_array)
for bit in bit_array:
SetBit('SDIOdir',int(bit)) #input=high, output=low
SetBit('SCL',1)
SetBit('SCL',0)
SetBit('SDIOdir',1) #Input
#logging.info(GetBit(SDI))
if SetBit('SDI',0,True)==1:
logging.error("I2C: Not ACK")
return False;
SetBit('SCL',1)
#logging.info(GetBit(SDI))
SetBit('SCL',0)
for x in range(len(data2)):
readbit='';
SetBit('SDIOdir',1) #Input
for bit in range(8):
SetBit('SCL',1)
readbit+=str(SetBit('SDI',0,True))
SetBit('SCL',0)
if x<len(data2)-1:
SetBit('SDIOdir',0) #Output
else: #last one = not-ack
SetBit('SDIOdir',1) #Output
SetBit('SCL',1)
SetBit('SCL',0)
data2[x]=int(readbit,2)
#Stop
SetBit('SDIOdir',0) #Output = low
SetBit('SCL',1)
SetBit('SDIOdir',1) #Input = high
return True;
from hwdev import *;
import numpy as np
import logging
def GetField(D,name,dev_number):
X=D.get(name)
return X[dev_number] if isinstance(X,list) else X;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
def I2CName2pin(s):
dev,reg,pin=s.split('.')
return dev,reg,int(pin)
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
class I2Cdevices(hwdev):
def GetVarValues(names,value,I2Ccallback):
return True
def CallMethod(self,name,params,I2Ccallback):
# logging.info("RCU1 call method %s",name)
for Mth in self.D['Methods']:
if Mth['method_name']==name:
logging.info("RCU1 Run method %s",name)
for regs in Mth['registers']:
for key,value in regs.items():
self.SetVarValue(key,value,I2Ccallback)
return True
return hwdev.CallMethod(self,name,params,I2Ccallback)
def SetChildValue(self,D,child,name,value,I2Ccallback):
Pins={}
for p in D['child_GPIO']:
for key,regvalue in p.items():
Pins[key]=I2CName2pin(regvalue)
def SetGetPin(pin,value,Get=False):
Pin=Pins[pin]
if not(Get): return self.I2CSet(Pin[0],Pin[1],[value],I2Ccallback,1,Pin[2])
a=[0]
self.I2CGet(Pin[0],Pin[1],a,I2Ccallback,1,Pin[2])
return a[0];
return child.SetVarValue(name,value,SetGetPin)
def GetChildValue(self,D,child,name,value,I2Ccallback):
Pins={}
for p in D['child_GPIO']:
for key,regvalue in p.items():
Pins[key]=I2CName2pin(regvalue)
def SetGetPin(pin,value,Get=False):
Pin=Pins[pin]
if not(Get): return self.I2CSet(Pin[0],Pin[1],[value],I2Ccallback,1,Pin[2])
a=[0]
self.I2CGet(Pin[0],Pin[1],a,I2Ccallback,1,Pin[2])
return a[0];
logging.info("Set Child %s",child)
return child.GetVarValue(name,value,SetGetPin)
def SetVarValue(self,name,value,I2Ccallback):
logging.info("RCU1 Set %s=%s",name,value)
D=self.D
Var1=Find(D['Variables'],'var_name',name)
if (Var1):
dev_number=0
Dev=GetField(Var1,'var_dev',dev_number)
Dev=Dev.split('.')
vmax=GetField(Var1,'var_max',dev_number)
if vmax:
if value[0]>vmax: value[0]=vmax;
vmin=GetField(Var1,'var_min',dev_number)
if vmin:
if value[0]<vmin: value[0]=vmin;
value[0]=value-vmin;
width=GetField(Var1,'var_width',dev_number)
bitoffset=GetField(Var1,'var_bitoffset',dev_number)
scale=GetField(Var1,'var_scale',dev_number)
elif (Find(D['Methods'],'method_name',name)):
return self.CallMethod(name,value,I2Ccallback)
elif len(name.split('.'))==2:
Dev=name.split('.');
width=False;
bitoffset=False;
scale=False;
else:
childname=name.split('_')[0]
for c in self.D['dev_children']:
if childname==c['child_name']:
return self.SetChildValue(c,c['obj'],name[len(childname)+1:],value,I2Ccallback)
# return hwdev.SetVarValue(self,name,value,I2Ccallback)
logging.error("Unknown variable %s",name);
return False;
logging.info("%s",Dev)
if scale: value=int2bytes(int(value[0]/float(scale)))
if Find(D['I2C_devices'],'dev_name',Dev[0]):
if not(width): width=8
if not(bitoffset): bitoffset=0
logging.info('SetVar %s I2C %s to %s (width=%s bitoffset=%s)',name,Dev,value,width,bitoffset)
return self.I2CSet(Dev[0],Dev[1],value,I2Ccallback,width,bitoffset)
if Find(D['SPI_devices'],'dev_name',Dev[0]):
logging.info('SetVar %s SPI %S to %s',name,Dev,value)
return SPIset(D,Dev[0],Dev[1],value,I2Ccallback,dev_number)
hwdev.SetVarValue(self,name,value,I2Ccallback)
# return SetVar(self.D,name,value,I2Ccallback)
def GetVarValue(self,name,value,I2Ccallback):
#def GetVar(D,name,value,I2Ccallback,dev_number=0):
dev_number=0;D=self.D
logging.info("RCU1 Get %s=%s",name,value)
Var1=Find(D['Variables'],'var_name',name)
if (Var1):
Dev=GetField(Var1,'var_dev',dev_number)
Dev=Dev.split('.')
width=GetField(Var1,'var_width',dev_number)
bitoffset=GetField(Var1,'var_bitoffset',dev_number)
scale=GetField(Var1,'var_scale',dev_number)
elif len(name.split('.'))==2:
Dev=name.split('.');
width=False;
bitoffset=False;
scale=False;
else:
childname=name.split('_')[0]
for c in self.D['dev_children']:
if childname==c['child_name']:
return self.GetChildValue(c,c['obj'],name[len(childname)+1:],value,I2Ccallback)
# return hwdev.SetVarValue(self,name,value,I2Ccallback)
logging.error("Unknown variable %s",name);
return False
logging.info("%s",Dev)
if not(width): width=8
if not(bitoffset): bitoffset=0
if Find(D['I2C_devices'],'dev_name',Dev[0]):
logging.info('GetVar %s I2C %s value %s (width=%s bitoffset=%s)',name,Dev,value,width,bitoffset)
result=self.I2CGet(Dev[0],Dev[1],value,I2Ccallback,width,bitoffset)
if not(result): return False;
if scale: value[0]=float(value[0])*float(scale);
return True
if Find(D['SPI_devices'],'dev_name',Dev[0]):
logging.info('GetVar %s SPI %s value %s',name,Dev,value)
return SPIget(D,Dev[0],Dev[1],value,I2Ccallback,dev_number)
if Find(D['I2C_bitbang'],'dev_name',Dev[0]):
logging.info('GetVar %s I2C bb %s value %s',name,Dev,value)
if not(I2Cbbget(D,Dev[0],Dev[1],value,I2Ccallback,dev_number,width,bitoffset)): return False
if scale: value[0]=float(value[0])*float(scale);
return True;
logging.error("Unknown variable %s",name)
return False;
# return GetVar(self.D,name,value,I2Ccallback)
def GetVarNames(self,parentname,callback):
for V in self.D['Variables']:
dtype=(1 if V.get('var_scale') else 0)
RWs={'RO' : 1,'WO' : 2,'RW' : 3}
RW=(RWs[V.get('var_R/W')] if V.get('var_R/W') else 0)
callback(parentname+V['var_name'],dtype,RW)
return hwdev.GetVarNames(self,parentname,callback);
def GetMethodNames(self,parentname,callback):
for V in self.D['Methods']:
if not(V.get('method_invisible')): callback(parentname+V['method_name'])
return hwdev.GetMethodNames(self,parentname,callback);
def I2CSet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
#remember of values, add new ones with mask
# logging.info('I2CSet: %s=%s (mask=%s)',dev,value,mask)
Dev=Find(self.D['I2C_devices'],'dev_name',dev)
Addr=Dev['dev_address']
Reg=Find(Dev['dev_registers'],'reg_name',reg)
# logging.info("%s %s %s",dev,reg,value)
if (width<8) or (bitoffset>0):
previous=Reg.get('previous')
if not(previous):
previous=Reg.get('reg_default')
if not(previous):
previous=0;
value[0]=ApplyMask(value[0],width,bitoffset,previous);
# logging.info('masked to %s',value)
# logging.info("Set %s %s to %s",dev,reg,value)
result=I2Ccallback(Addr,value,reg=Reg['reg_addr'],)
#if result:
Reg['previous']=value[0]
return result
def I2CGet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
Dev=Find(self.D['I2C_devices'],'dev_name',dev)
Addr=Dev['dev_address']
Reg=Find(Dev['dev_registers'],'reg_name',reg)
l1=int(np.floor((width+bitoffset+7)/8))
# logging.info("%s %s %s",width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=Reg['reg_addr']
if reg>255: #This is for the monitor ADC
if not(I2Ccallback(Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(I2Ccallback(Addr,value2,read=1)): return False;
else:
if not(I2Ccallback(Addr,value2,reg=Reg['reg_addr'],read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
Reg['previous']=value[0]
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
else: value[0]=value2[0]
return True;
#def I2CGetBuff(D,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
# Dev=Find(D['I2C_devices'],'dev_name',dev)
# Addr=Dev['dev_address']
# Reg=Find(Dev['dev_registers'],'reg_name',reg)
# value[0]=Reg.get('previous')
# logging.info("Get Value=%s",value[0])
# if (width<8) or (bitoffset>0):
# value[0]=UnMask(value[0],width,bitoffset)
# return True;
from hwdev import *;
import numpy as np
import logging
def I2C_dummy(addr,data,reg=None,read=0):
# logging.info("I2C dummy addr=%s data=%s",addr,data)
return True;
class I2Cswitch(hwdev):
def SetChannel(self,channel,I2Ccallback):
if not(hasattr(self,'CurrentChannel')): self.CurrentChannel=0
logging.info("SetChannel %s %s",channel,self.CurrentChannel)
if (channel)==self.CurrentChannel: return True;
self.CurrentChannel=channel
return I2Ccallback(self.D['dev_address'],[channel])
# def GetVarNames(self,parentname,callback):
# for c in self.D['dev_children']:
# c['obj'].GetVarNames(parentname+c['name']+'_',callback)
# return True;
# def GetMethodNames(self,parentname,callback):
# for c in self.D['dev_children']:
# c['obj'].GetMethodNames(parentname+c['name']+'_',callback)
# return True;
def GetVarValue(self,name,value,I2Ccallback):
def I2Ccallback2(addr,data,reg=None,read=0):
# logging.info("Getvarcallback %s",x)
if (read==2):
if (cnt>0): return True;
if not(self.SetChannel(gchannel,I2Ccallback)): return False;
elif (read==3):
if (cnt>0): return True;
return I2Ccallback(addr,data,reg,read)
elif not(self.SetChannel(cchannel,I2Ccallback)): return False;
return I2Ccallback(addr,data,reg,read)
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].GetVarValue(name[len(childname)+1:],value,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
devcnt=len(group['group_members'])
stride=len(value)//devcnt;
gchannel=0;
for childname2 in group['group_members']:
gchannel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
for cnt,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
logging.info("Get: %s",childname2)
cchannel=1<<child['child_addr']
value2=value[cnt*stride:(cnt+1)*stride]
if not(child['obj'].GetVarValue(name[len(childname)+1:],value2,I2Ccallback2)): return False;
value[cnt*stride:(cnt+1)*stride]=value2
return True;
return False
def SetVarValue(self,name,value,I2Ccallback):
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].SetVarValue(name[len(childname)+1:],value,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
devcnt=len(group['group_members'])
stride=len(value)//devcnt;
AllSame=True;
for x in range(devcnt-1):
for y in range(stride):
if value[x*stride+y]!=value[(x+1)*stride+y]: AllSame=False;
if AllSame:
logging.info("Allsame");
channel=0;
for childname2 in group['group_members']:
channel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
logging.info("%s",channel)
if not(self.SetChannel(channel,I2Ccallback)): return False;
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
callback=(I2Ccallback if x==0 else I2C_dummy)
if not(child['obj'].SetVarValue(name[len(childname)+1:],value[:stride],callback)): return false;
return True;
else:
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
logging.info("Set: %s",childname2)
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
if not(child['obj'].SetVarValue(name[len(childname)+1:],value[x*stride:(x+1)*stride],I2Ccallback)): return false;
return True;
return False
def CallMethod(self,name,params,I2Ccallback):
childname=name.split('_')[0]
child=Find(self.D['dev_children'],'child_name',childname)
if child:
if not(self.SetChannel(1<<child['child_addr'],I2Ccallback)): return False;
return child['obj'].CallMethod(name[len(childname)+1:],params,I2Ccallback)
group=Find(self.D['dev_groups'],'group_name',childname)
if group:
channel=0;
for childname2 in group['group_members']:
channel+=1<<(Find(self.D['dev_children'],'child_name',childname2)['child_addr'])
if not(self.SetChannel(channel,I2Ccallback)): return False;
for x,childname2 in enumerate(group['group_members']):
child=Find(self.D['dev_children'],'child_name',childname2)
if not(child): return False;
callback=(I2Ccallback if x==0 else I2C_dummy)
if not(child['obj'].CallMethod(name[len(childname)+1:],params,callback)): return false;
return True;
return False;
def GetVarNames(self,parentname,callback):
if not(hwdev.GetVarNames(self,parentname,callback)): return False;
def AddVar(name,dtype=0,RW=3,cnt=1):
callback(name,dtype,RW,cnt*devcnt)
for group in self.D['dev_groups']:
childname=group['group_members'][0]
devcnt=len(group['group_members'])
child=Find(self.D['dev_children'],'child_name',childname)
if not(child): return False;
child['obj'].GetVarNames(parentname+group['group_name']+'_',AddVar)
return True;
......@@ -11,7 +11,7 @@ Python OPC-UA server to control the I2C devices in the LTS.
This Python3 code uses other Python3 modules:
- opcua
- pyyaml
- recordclass
- numpy
We recommend to install a virtual environment and then install the dependencies there. For convenience we have added the relevant modules to the file `requirements.txt` that can be used to install them with pip3: `python3 -m pip install -r requirements.txt`
......@@ -39,10 +39,10 @@ The software can be simply executed with Python3: `python3 opcuaserv.py`
Optional parameters are explained when the `-h` or `--help` parameter is supplied:
```bash
python3 opcuaserv.py --help
python3 pypcc2.py --help
cryptography is not installed, use of crypto disabled
cryptography is not installed, use of crypto disabled
usage: opcuaserv.py [-h] [-s] [--no-lib-hack] [-p PORT]
usage: pypcc2.py [-h] [-s] [--no-lib-hack] [-p PORT]
optional arguments:
-h, --help show this help message and exit
......@@ -51,19 +51,3 @@ optional arguments:
-p PORT, --port PORT Port number to listen on [4842].
```
# LTS structure
Raspberry pi (LTS_pypcc.yaml -> I2C controller on raspberry pi)
> Control PCB (LTS_switch.yaml -> I2C switch)
>
> > RCU2 PCB (LTS_RCUx.yaml -> I2C devices)
> >
> > > RCU2 Dither source (LTS_RCU2_dither.yaml -> I2C bitbang)
> >
> > > ADC (LTS_RCU2_ADC.yaml -> SPI bitbang2)
> >
> > Clock PCB (LTS_clk.yaml -> I2C device)
> >
> > > PLL (LTS_clkPLL.yaml -> SPI bitbang1)
from I2Cdevices import *;
import numpy as np
import logging
class SPIbitbang1(I2Cdevices):
def I2CSet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
SPI=Find(self.D['I2C_devices'],'dev_name',dev)
Reg=Find(SPI['dev_registers'],'reg_name',reg)
ADC_address=Reg['reg_addr']
SetBit=I2Ccallback
ADC_bytes = 0x00
ADC_rw = 0x00 # 0 for write, 1 for read
data2 = ( ADC_rw << 23 ) + ( ADC_bytes << 21 ) + ( ADC_address << 8 ) + value[0]
bit_array = "{0:{fill}24b}".format(data2, fill='0')
# logging.debug("%s",bit_array)
SetBit('CS',0) #enable
for bit in bit_array:
SetBit('SDO',int(bit))
SetBit('CLK',1)
SetBit('CLK',0)
SetBit('CS',1)
return True;
def I2CGet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
SPI=Find(self.D['I2C_devices'],'dev_name',dev)
Reg=Find(SPI['dev_registers'],'reg_name',reg)
ADC_reg_address=Reg['reg_addr']
# logging.debug("%s %s %s %s %s %s",ADC_reg_address,CS,SDIOdir,SDI,SDO,CLK)
SetBit=I2Ccallback
ADC_bytes = 0x00
ADC_rw = 0x01 # 0 for write, 1 for read
data = ( ADC_rw << 15) + ( ADC_bytes << 13 ) + ADC_reg_address
SetBit('CS',0) #enable
bit_array = "{0:{fill}16b}".format(data, fill='0')
for bit in bit_array:
SetBit('SDO',int(bit))
SetBit('CLK',1)
SetBit('CLK',0)
SetBit('CS',1) #disable
# logging.debug("read byte")
SetBit('SDIOdir',1) #inpute
SetBit('CS',0) #enable
a=[0]
read_bit = ''
for cnt in range(8*(ADC_bytes+1)):
ret_value=SetBit('SDI',0,True)
read_bit += str(ret_value)
SetBit('CLK',1)
SetBit('CLK',0) #read after falling edge
SetBit('CS',1) #disable
SetBit('SDIOdir',0) #output
# SetBit(SDO,1)
# SetBit(CLK,1)
# logging.debug("%s",read_bit)
# stri = "Read back data is: {0:2x} ".format(int(read_bit, 2))
# logging.debug("%s",stri)
value[0]=int(read_bit, 2)
return True;
from I2Cdevices import *;
import numpy as np
import logging
class SPIbitbang2(I2Cdevices):
def I2CSet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
SPI=Find(self.D['I2C_devices'],'dev_name',dev)
Reg=Find(SPI['dev_registers'],'reg_name',reg)
reg_address=(Reg['reg_addr'] if Reg else int(reg))
SetBit=I2Ccallback
# dev_rw = 0x00 # 0 for write, 1 for read
# data2 = ( reg_address << 9 ) + ( dev_rw << 8 ) + value[0]
data2 = ( reg_address << 9 ) + value[0]
bit_array = "{0:{fill}16b}".format(data2, fill='0')
# logging.info("%s",bit_array)
SetBit('CS',0) #enable
for bit in bit_array:
SetBit('SDI',int(bit))
SetBit('CLK',1)
SetBit('CLK',0)
SetBit('CS',1)
return True;
def I2CGet(self,dev,reg,value,I2Ccallback,width=8,bitoffset=0):
SPI=Find(self.D['I2C_devices'],'dev_name',dev)
Reg=Find(SPI['dev_registers'],'reg_name',reg)
reg_address=(Reg['reg_addr'] if Reg else int(reg))
# logging.info("%s %s %s %s %s %s",ADC_reg_address,CS,SDIOdir,SDI,SDO,CLK)
SetBit=I2Ccallback
ADC_bytes = 0x00
# ADC_rw = 0x01 # 0 for write, 1 for read
data = ( reg_address << 7 ) + 1
SetBit('CS',0) #enable
bit_array = "{0:{fill}8b}".format(data, fill='0')
for bit in bit_array:
SetBit('SDI',int(bit))
SetBit('CLK',1)
SetBit('CLK',0)
# SetBit('CS',1) #disable
# logging.info("read byte")
# SetBit('SDIOdir',1) #inpute
# SetBit('CS',0) #enable
a=[0]
read_bit = ''
for cnt in range(8*(ADC_bytes+1)):
ret_value=SetBit('SDO',0,True)
read_bit += str(ret_value)
SetBit('CLK',1)
SetBit('CLK',0) #read after falling edge
SetBit('CS',1) #disable
# SetBit(SDO,1)
# SetBit(CLK,1)
# logging.info("%s",read_bit)
stri = "Read back data is: {0:2x} ".format(int(read_bit, 2))
logging.info("%s",stri)
value[0]=int(read_bit, 2)
return True;
dev_children:
- child_name: Dither3
child_dev : I2Cbitbang
child_conf: LTS_RCU2_dither
child_GPIO:
- SCL : IO1.OUT1.6
- SDO : IO2.OUT2.3
- SDI : IO2.IN2.3
- SDIOdir : IO2.CONF2.3
- child_name: Dither2
child_dev : I2Cbitbang
child_conf: LTS_RCU2_dither
child_GPIO:
- SCL : IO1.OUT2.7
- SDO : IO1.OUT1.7
- SDI : IO1.IN1.7
- SDIOdir : IO1.CONF1.7
- child_name: ADC1
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.1
- SDO : IO3.OUT1.0
- SDI : IO3.IN1.0
- SDIOdir: IO3.CONF1.0
- CS : IO3.OUT2.0
- child_name: ADC2
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.3
- SDO : IO3.OUT1.2
- SDI : IO3.IN1.2
- SDIOdir: IO3.CONF1.2
- CS : IO3.OUT2.1
- child_name: ADC3
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.5
- SDO : IO3.OUT1.4
- SDI : IO3.IN1.4
- SDIOdir: IO3.CONF1.4
- CS : IO3.OUT2.2
#This is the I2C devices in the RCU
I2C_devices:
- dev_name: IO1
dev_description: IO-Expander for filter selection
dev_address: 0x75
dev_device: TCA9539
dev_registers:
- reg_name: CONF1
reg_addr: 6
- reg_name: CONF2
reg_addr: 7
- reg_name: OUT1
reg_addr: 2
ref_default: 0
- reg_name: OUT2
reg_addr: 3
ref_default: 0
- reg_name: IN1
reg_addr: 0
- reg_name: IN2
reg_addr: 1
- dev_name: IO3
dev_description: IO-Expander for ADC control
dev_address: 0x20
dev_device: TCA6416
dev_registers:
- reg_name: CONF1
reg_description: Direction of IO pins 0..7
reg_addr: 6
- reg_name: CONF2
reg_description: Direction of IO pints 8..15
reg_addr: 7
- reg_name: IN1
reg_description: Ouput port register 0..7
reg_addr: 0
- reg_name: IN2
reg_description: Ouput port register 0..7
reg_addr: 1
- reg_name: OUT1
reg_description: Ouput port register 0..7
reg_addr: 2
reg_default: 0x0F
- reg_name: OUT2
reg_description: Ouput port register 8..15
reg_addr: 3
reg_default: 0x0F
- dev_name: IO2
dev_description: IO-Expander for ON/OFF, Band, BUFx2
dev_address: 0x76
dev_device: TCA9539
dev_registers:
- reg_name: CONF1
reg_addr: 6
- reg_name: CONF2
reg_addr: 7
- reg_name: OUT1
reg_addr: 2
ref_default: 0
- reg_name: OUT2
reg_addr: 3
ref_default: 0
- reg_name: IN1
reg_addr: 0
- reg_name: IN2
reg_addr: 1
- dev_name: UC
dev_description: RCU microcontroller
dev_address: 0x40
dev_registers:
- reg_name: ID
reg_description: Device ID
reg_addr: 0
- dev_name: ROM
dev_description: EEPROM memory
dev_address: 0x53
dev_registers:
- reg_name: ctr_len
reg_description: Length of control data
reg_addr: 0
- reg_name: ctr_dat
reg_description: Control data (protocol buffers)
reg_addr: 2
- dev_name: AN
dev_description: Monitor ADC on RCU
dev_address: 0x14
dev_device: LTC2495
dev_registers:
- reg_name: Ch0
reg_addr: 0xB080
- reg_name: Ch1
reg_addr: 0xB8
- reg_name: Ch2
reg_addr: 0xB1
- reg_name: Ch3
reg_addr: 0xB9
- reg_name: Ch4
reg_addr: 0xB2
- reg_name: Ch5
reg_addr: 0xBA
- reg_name: Ch6
reg_addr: 0xB3
- reg_name: Ch7
reg_addr: 0xBB
- reg_name: Temp
reg_addr: 0xA0C0
Variables:
- var_name: Attenuator1
var_dev: IO1.OUT1
var_width: 5
var_max: 21
var_R/W: RW
- var_name: Attenuator2
var_dev: IO1.OUT2
var_width: 5
var_max: 21
var_R/W: RW
- var_name: Attenuator3
var_dev: IO2.OUT1
var_width: 5
var_max: 24
var_R/W: RW
- var_name: Pwr_dig
var_description: Enable LDOs
var_dict: {0 : Off, 1 : On}
var_dev: IO2.OUT1
var_width: 1
var_bitoffset: 6
var_R/W: RO
- var_name: Band3
var_dev: IO2.OUT2
var_dict: {0 : 10MHz, 1 : 30MHz}
var_width: 1
var_bitoffset: 0
var_R/W: RW
- var_name: Band1
var_dev: IO2.OUT2
var_dict: {0 : 10MHz, 1 : 30MHz}
var_width: 1
var_bitoffset: 1
var_R/W: RW
- var_name: Band2
var_dev: IO2.OUT2
var_dict: {0 : 10MHz, 1 : 30MHz}
var_width: 1
var_bitoffset: 4
var_R/W: RW
- var_name: LED0
var_description: Front panel LEDs, 0=On
var_dict: {0 : Off, 1 : Yellow, 2: Red, 3: Orange}
var_dev: IO2.OUT2
var_width: 2
var_bitoffset: 6
var_R/W: RW
- var_name: Temperature
var_dev: AN.Temp
var_width: 23
var_scale: 4.21e-3
var_R/W: RO
- var_name: Dth3_Pwr
var_dev: IO2.OUT2
var_width: 1
var_bitoffset: 2
- var_name: Dth2_Pwr
var_dev: IO1.OUT2
var_width: 1
var_bitoffset: 6
Methods:
- method_name: RCU_on
registers:
- IO2.CONF1: [0] #Setup IO expanders to output with default values
- IO2.OUT1: [0x4A]
- IO2.OUT2: [0x55]
- IO3.OUT1: [0x7f]
- IO3.OUT2: [0x47]
- IO1.OUT1: [0xCA]
- IO1.OUT2: [0xCA]
- IO2.CONF2: [0]
- IO3.CONF1: [0]
- IO3.CONF2: [0]
- IO1.CONF1: [0]
- IO1.CONF2: [0]
- ADC1_Setup: 1
- ADC2_Setup: 1
- ADC3_Setup: 1
- method_name: RCU_off
registers:
- IO2.OUT1: [0x00] #Switch all off
- IO2.OUT2: [0x00]
- IO3.OUT1: [0x00]
- IO3.OUT2: [0x00]
- IO1.OUT1: [0x00]
- IO1.OUT2: [0x00]
- method_name: Dither_on
method_invisible: 1
registers:
- Dth3_Pwr: [1]
- Dth2_Pwr: [1]
- Dither3_Setup: 1
- Dither2_Setup: 1
- method_name: Dither_off
method_invisible: 1
registers:
- Dth3_Pwr: [0]
- Dth2_Pwr: [0]
I2C_devices:
- dev_name: ADC1
dev_description: I2C-SPI bridge to ADC
dev_device: AD9683
dev_registers:
- reg_name: PLL_stat
reg_description: PLL locked status
reg_addr: 0x0A
- reg_name: JESD_control1
reg_description: JESD link control, ILAS mode
reg_addr: 0x5F
- reg_name: CML_level
reg_description: CML output adjust
reg_addr: 0x15
- reg_name: SYNC_control
reg_description: SYNC / SYSREF control
reg_addr: 0x3A
- reg_name: Update
reg_description: Global device update
reg_addr: 0xFF
Variables:
- var_name: locked
var_dev: ADC1.PLL_stat
var_width: 8
# var_R/W: RO
- var_name: SYNC
var_dev: ADC1.SYNC_control
var_width: 8
- var_name: CML
var_dev: ADC1.CML_level
var_width: 8
- var_name: JESD
var_dev: ADC1.JESD_control1
var_width: 8
var_R/W: RO
Methods:
- method_name: Setup
method_invisible: 1
registers:
- ADC1.JESD_control1: [14] #Setup ADCs
- ADC1.SYNC_control: [1] #Setup ADCs
- ADC1.CML_level: [0x7]
- ADC1.Update: [1] #Needed to update ADC registers
I2C_devices:
- dev_name: DS1
dev_description: Dither Source, SI4010
dev_address: 0x70
dev_registers:
- reg_name: TX_Stop
reg_addr: 0x67
- reg_name: TX_Start
reg_addr: 0x62
- reg_name: property
reg_addr: 0x11
- reg_name: TX_FREQ
reg_addr: 0x1140
Variables:
- var_name: Frequency
var_dev: DS1.TX_FREQ
var_width: 32
var_scale: 1e-6
var_bitoffset: 0
Methods:
- method_name: Setup
method_invisible: 1
registers:
- DS1.property: [0x60,1,0,0,0,125,127] #Power level (1,0)
- DS1.property: [0x40,0x06,0x0c,0xc4,0x60] #Frequency 101.5MHz
- DS1.property: [0x21,0,0] #tuning off
- DS1.TX_Start: [0,0,0,0,1]
dev_children:
- child_name: ADC1
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.1
- SDO : IO3.OUT1.0
- SDI : IO3.IN1.0
- SDIOdir: IO3.CONF1.0
- CS : IO3.OUT2.0
- child_name: ADC2
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.3
- SDO : IO3.OUT1.2
- SDI : IO3.IN1.2
- SDIOdir: IO3.CONF1.2
- CS : IO3.OUT2.1
- child_name: ADC3
child_dev : SPIbitbang1
child_conf: LTS_RCU2_ADC
child_GPIO:
- CLK : IO3.OUT1.5
- SDO : IO3.OUT1.4
- SDI : IO3.IN1.4
- SDIOdir: IO3.CONF1.4
- CS : IO3.OUT2.2
#This is the I2C devices in the RCU
I2C_devices:
- dev_name: IO1
dev_description: IO-Expander for filter selection
dev_address: 0x75
dev_device: TCA9539
dev_registers:
- reg_name: CONF1
reg_addr: 6
- reg_name: CONF2
reg_addr: 7
- reg_name: OUT1
reg_addr: 2
ref_default: 0
- reg_name: OUT2
reg_addr: 3
ref_default: 0
- reg_name: IN1
reg_addr: 0
- reg_name: IN2
reg_addr: 1
- dev_name: IO3
dev_description: IO-Expander for ADC control
dev_address: 0x20
dev_device: TCA6416
dev_registers:
- reg_name: CONF1
reg_description: Direction of IO pins 0..7
reg_addr: 6
- reg_name: CONF2
reg_description: Direction of IO pints 8..15
reg_addr: 7
- reg_name: IN1
reg_description: Ouput port register 0..7
reg_addr: 0
- reg_name: IN2
reg_description: Ouput port register 0..7
reg_addr: 1
- reg_name: OUT1
reg_description: Ouput port register 0..7
reg_addr: 2
reg_default: 0x0F
- reg_name: OUT2
reg_description: Ouput port register 8..15
reg_addr: 3
reg_default: 0x0F
- dev_name: IO2
dev_description: IO-Expander for ON/OFF, Band, BUFx2
dev_address: 0x76
dev_device: TCA9539
dev_registers:
- reg_name: CONF1
reg_addr: 6
- reg_name: CONF2
reg_addr: 7
- reg_name: OUT1
reg_addr: 2
ref_default: 0
- reg_name: OUT2
reg_addr: 3
ref_default: 0
- reg_name: IN1
reg_addr: 0
- reg_name: IN2
reg_addr: 1
- dev_name: UC
dev_description: RCU microcontroller
dev_address: 0x40
dev_registers:
- reg_name: ID
reg_description: Device ID
reg_addr: 0
- dev_name: ROM
dev_description: EEPROM memory
dev_address: 0x53
dev_registers:
- reg_name: ctr_len
reg_description: Length of control data
reg_addr: 0
- reg_name: ctr_dat
reg_description: Control data (protocol buffers)
reg_addr: 2
- dev_name: AN
dev_description: Monitor ADC on RCU
dev_address: 0x14
dev_device: LTC2495
dev_registers:
- reg_name: Ch0
reg_addr: 0xB080
- reg_name: Ch1
reg_addr: 0xB8
- reg_name: Ch2
reg_addr: 0xB1
- reg_name: Ch3
reg_addr: 0xB9
- reg_name: Ch4
reg_addr: 0xB2
- reg_name: Ch5
reg_addr: 0xBA
- reg_name: Ch6
reg_addr: 0xB3
- reg_name: Ch7
reg_addr: 0xBB
- reg_name: Temp
reg_addr: 0xA0C0
Variables:
- var_name: Attenuator1
var_dev: IO1.OUT1
var_width: 5
var_max: 21
var_R/W: RW
- var_name: Attenuator2
var_dev: IO1.OUT2
var_width: 5
var_max: 21
var_R/W: RW
- var_name: Attenuator3
var_dev: IO2.OUT1
var_width: 5
var_max: 24
var_R/W: RW
- var_name: Pwr_dig
var_description: Enable LDOs
var_dict: {0 : Off, 1 : On}
var_dev: IO2.OUT1
var_width: 1
var_bitoffset: 6
var_R/W: RO
- var_name: LED0
var_description: Front panel LEDs, 0=On
var_dict: {0 : Off, 1 : Yellow, 2: Red, 3: Orange}
var_dev: IO2.OUT2
var_width: 2
var_bitoffset: 6
var_R/W: RW
- var_name: Temperature
var_dev: AN.Temp
var_width: 23
var_scale: 4.21e-3
var_R/W: RO
Methods:
- method_name: RCU_on
registers:
- IO2.CONF1: [0] #Setup IO expanders to output with default values
- IO2.OUT1: [0x4A]
- IO2.OUT2: [0x55]
- IO3.OUT1: [0x7f]
- IO3.OUT2: [0x47]
- IO1.OUT1: [0xCA]
- IO1.OUT2: [0xCA]
- IO2.CONF2: [0]
- IO3.CONF1: [0]
- IO3.CONF2: [0]
- IO1.CONF1: [0]
- IO1.CONF2: [0]
- ADC1_Setup: 1
- ADC2_Setup: 1
- ADC3_Setup: 1
- method_name: RCU_off
registers:
- IO2.OUT1: [0x00] #Switch all off
- IO2.OUT2: [0x00]
- IO3.OUT1: [0x00]
- IO3.OUT2: [0x00]
- IO1.OUT1: [0x00]
- IO1.OUT2: [0x00]
dev_children:
- child_name: PLL
child_dev : SPIbitbang2
child_conf: LTS_clkPLL
child_GPIO:
- CLK : IO3.OUT1.4
- SDO : IO3.IN1.5
- SDI : IO3.OUT1.7
- CS : IO3.OUT1.6
#This is the I2C devices in the RCU
I2C_devices:
- dev_name: IO3
dev_description: IO-Expander
dev_address: 0x20
dev_device: TCA6416
dev_registers:
- reg_name: CONF1
reg_addr: 6
- reg_name: CONF2
reg_addr: 7
- reg_name: IN1
reg_addr: 0
- reg_name: IN2
reg_addr: 1
- reg_name: OUT1
reg_addr: 2
reg_default: 0x0F
- reg_name: OUT2
reg_addr: 3
reg_default: 0x0F
Variables:
- var_name: IGNORE_PPS
var_dev: IO3.OUT1
var_width: 1
var_bitoffset: 6
# var_R/W: RW
- var_name: ENABLE_PWR
var_dev: IO3.OUT1
var_width: 1
var_bitoffset: 7
# var_R/W: RW
- var_name: STAT1
var_dev: IO3.OUT1
var_width: 1
var_bitoffset: 4
# var_R/W: RO
# - var_name: STAT2
# var_dev: IO3.OUT1
# var_width: 1
# var_bitoffset: 5
# var_R/W: RO
- var_name: CONF
var_dev: IO3.CONF1
var_width: 8
- var_name: OUT
var_dev: IO3.OUT1
var_width: 8
- var_name: IN
var_dev: IO3.IN1
var_width: 8
Methods:
- method_name: on1
registers:
- IO3.CONF1: [0x2C]
- IO3.OUT1: [0x42]
- method_name: off1
method_invisible: 1
registers:
- IO3.CONF1: [0x2C]
- IO3.OUT1: [0x40]
#Output High=SDI(x2), PPS (0x40) #Read 0x42,
# IN=0x46: High: SDI, CLK, Enable_PWR
I2C_devices:
- dev_name: PLL1
dev_description: I2C-SPI bridge to PLL
dev_device: LTC6950
dev_registers:
- reg_name: a0
reg_addr: 0x00
Variables:
- var_name: locked
var_dev: PLL1.a0
var_R/W: RO
var_width: 23
Methods:
- method_name: Power_off
method_invisible: 1
registers:
- PLL1.3: [0x88]
- method_name: Power_on
method_invisible: 1
registers:
- PLL1.3: [0x08]
- method_name: Reset
method_invisible: 1
registers:
- PLL1.3: [0x0C]
- method_name: Setup
method_invisible: 1
registers:
- PLL1.3: [0x08]
- PLL1.5: [0x97]
- PLL1.6: [0x10]
- PLL1.7: [0x04]
- PLL1.8: [0x01]
- PLL1.7: [0x00]
- PLL1.9: [0x10]
- PLL1.10: [0x14] #0x0A
- PLL1.9: [0x00]
- PLL1.13: [0x01] #0x0D
- PLL1.15: [0x01] #0x0F
- PLL1.17: [0x01] #0x11
- PLL1.19: [0x01] #0x13
dev_name: PCC_I2C
dev_children:
- child_dev : I2Cswitch
child_conf: LTS_switch
child_addr: 1 #I2C port on raspberry pi
\ No newline at end of file
dev_name: Switch1
dev_address: 0x70
dev_device: TCA9548
# LTS midplane: I2C 0-5 = RCU 0-5, I2C 6 = UNB2_N0, I2C 7 = UNB2
dev_children:
- child_name: RCU01
child_dev: I2Cdevices
child_conf: LTS_RCU2L
child_addr: 1
- child_name: RCU02
child_dev: I2Cdevices
child_conf: LTS_RCU2L
child_addr: 2
- child_name: RCU03
child_dev: I2Cdevices
child_conf: LTS_RCU2L
child_addr: 3
- child_name: CLK
child_dev: I2Cdevices
child_conf: LTS_clk
child_addr: 7
dev_groups: #Must have the same config!
- group_name: RCUs
group_members: [RCU01,RCU02,RCU03]
# group_members: [RCU01]
from . import Vars
import numpy as np
import logging
#from .spibitbang1 import *
import threading
def ApplyMask(value,width=8,bitoffset=0,previous=0):
mask=(1<<width)-1
if bitoffset>0:
value<<=bitoffset;
mask<<=bitoffset;
return (value & mask) + (previous - (previous & mask));
def UnMask(value,width=8,bitoffset=0):
mask=(1<<width)-1
if bitoffset>0:
value>>=bitoffset;
value=value&mask;
return value;
def bytes2int(bts):
x=0;
for b in bts:
x=x*256+b;
return x;
def int2bytes(i):
b=[];
while i>255:
b=[i%256]+b;
i>>=8;
return [i]+b;
class RCU1():
def __init__(self,number,I2Ccallback,Switchcallback):
self.N=number;
self.I2Ccallback=I2Ccallback
self.SWcallback=Switchcallback
self.previous=np.zeros([number,Vars.RCU_storeReg],dtype='int')
def load(self):
Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
self.SetVar(Inst1)
#Vars.RCU_mask.OPCW.get_data_value().Value.Value=[1,1,0,0]
#Vars.Ant_mask.OPCW.get_data_value().Value.Value=[1,1,0,0,0,0,0,0,0,0,1,0]
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_init,0,[]) #Read the current status of GPIO IOs
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Var,Vars.RCU_att,12,[0,1,2,3,4,5,6,7,8,9,11])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_off,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.RCU_on,0,[])
#RCU.SetVar(Inst1)
#Inst1=Vars.Instr(Vars.DevType.Instr,Vars.ADC1_on,0,[])
#RCU.SetVar(Inst1)
#print(Vars.RCU)
def SetVar(self,Instr):
if Instr.type==Vars.DevType.Instr:
#Execute instructions
Iset=Instr.dev;
for i in Iset.instr:
logging.debug(str(("Inst",i)))
self.SetVar(i)
return;
if Instr.type in [Vars.DevType.I2C,Vars.DevType.SPIbb,Vars.DevType.I2Cbb]:
RCU0=-1;
mask=0
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
if RCU0<0: RCU0=RCUi;
if RCU0<0: return; #Mask all zero
self.SWcallback(mask)
if Instr.type==Vars.DevType.I2C:
logging.info(str(('** Set I2C:',Instr.dev,Instr.value)))
self.SetI2C(RCU0,Instr.dev,8,0,Instr.value)
return;
elif Instr.type==Vars.DevType.SPIbb:
logging.debug(str(('** Set SPIbb:',Instr.dev,Instr.value)))
SetSPIbb(self.SetI2C,RCU0,Instr.dev,Instr.value)
return;
elif Instr.type==Vars.DevType.I2Cbb:
logging.info(str(('** Set I2Cbb:',Instr.dev,Instr.value)))
# self.SetI2C(RCUi,Instr.dev,8,0,Instr.value)
return;
V1=Instr.dev
if not((Instr.nvalue==V1.nVars) and (Instr.type==Vars.DevType.VarUpdate)) and not(Instr.nvalue==V1.nVars*self.N):
logging.error("Wrong size of value")
return False
if V1.Vars[0].type==Vars.DevType.Internal:
Instr.dev.OPCW.get_data_value().Value.Value=Instr.value
logging.debug("Update internal variable")
return
# if V1.Vars[0].type==Vars.DevType.Internal: return;
Step=V1.nVars
value1=Instr.value if V1.OPCR is None else V1.OPCR.get_data_value().Value.Value
if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
if Instr.type==Vars.DevType.Var:
logging.info(str(('** Set Var:',V1.name,value1)))
for RCUi in range(self.N):
for Vari in range(Step):
self.SetVarValue(RCUi,V1.Vars[Vari],Instr.value[RCUi*Step+Vari:RCUi*Step+Vari+1])
value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1]
self.GetVarValue(RCUi,V1.Vars[Vari],value2)
value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1
elif Instr.type==Vars.DevType.VarUpdate:
self.GetVarValueAll(V1,value1)
if not(V1.OPCR is None): V1.OPCR.get_data_value().Value.Value=value1
# V1.OPCR.get_data_value().Value.Value=value1
logging.info(str(('** Readback:',V1.name,value1)))
def GetVarValue(self,RCUi,var,value):
logging.info(str(("RCU1 Get ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def GetVarValueAll(self,V1,value1):
mask=0;
for RCUi in range(self.N):
mask|=1<<Vars.RCU_MPaddr.Switch[RCUi]
Step=V1.nVars
if V1.Vars[0].type==Vars.DevType.I2C:
for Vari in range(Step):
DevReg=V1.Vars[Vari].devreg
self.SWcallback(mask)
self.SetI2CAddr(self,DevReg)
if DevReg.Register_R>255: self.I2Ccallback(DevReg.Addr,[250],read=3) #Wait for ADC
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
value2=value1[RCUi*Step+Vari:RCUi*Step+Vari+1]
var=V1.Vars[Vari]
self.GetI2Cnoreg(RCUi,var.devreg,var.width,var.bitoffset,value2)
if (var.Scale!=0): value2[0]*=var.Scale;
value1[RCUi*Step+Vari:RCUi*Step+Vari+1]=value2
elif V1.Vars[0].type==Vars.DevType.SPIbb:
self.GetBBValueAll(V1,value1,mask)
# logging.info("SPIbb all not implemented yet")
else:
logging.error("Type not implemented")
def GetBBValueAll(self,V1,value1,mask):
def SetBit(RCUi,dev,width,bitoffset,value,buffer=False):
if not(buffer): self.SWcallback(mask)
self.SetI2C(RCUi,dev,width,bitoffset,value,buffer=buffer)
def GetBit(RCUixx,dev,width,bitoffset,buffer=False):
value=[0 for RCUi in range(self.N)]
value2=[0]
if buffer:
for RCUi in range(self.N):
self.GetI2Cbuffer(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value
self.SWcallback(mask)
self.SetI2CAddr(self,dev)
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.GetI2Cnoreg(RCUi,dev,width,bitoffset,value2)
value[RCUi]=value2[0]
return value;
devs=[V1.Vars[Vari].devreg for Vari in range(V1.nVars)]
GetSPIbb2(SetBit,GetBit,0,devs,value1)
def SetVarValue(self,RCUi,var,value):
if var.devreg.Register_W==-1: return True; #We can not set it, only read it e.g. temperature
logging.debug(str(("RCU1 Set ",RCUi,var,value)))
if var.type==Vars.DevType.I2C:
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
self.SetI2C(RCUi,var.devreg,var.width,var.bitoffset,value)
elif var.type==Vars.DevType.I2Cbb:
logging.error("I2Cbb Implemented")
elif var.type==Vars.DevType.SPIbb:
logging.error("SPIbb Implemented")
else:
logging.error("Not Implemented")
def SetI2C(self,RCUi,dev,width,bitoffset,value,buffer=False):
if dev.store>0:
previous=self.previous[RCUi,dev.store-1];
value[0]=ApplyMask(value[0],width,bitoffset,previous);
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(('masked to',value)))
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if buffer: return True;
return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
def GetI2Cbuffer(self,RCUi,dev,width,bitoffset,value):
if not(dev.store>0): return False;
value[0]=self.previous[RCUi,dev.store-1];
# logging.debug(str(("GetI2Cbuffer",RCUi,dev.store,value)))
l1=int(np.floor((width+bitoffset+7)/8))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
return True
def GetI2C(self,RCUi,dev,width,bitoffset,value):
# if dev.store>0:
# value[0]=self.previous[RCUi,dev.store-1]
# return True
l1=int(np.floor((width+bitoffset+7)/8))
# print(width,bitoffset,l1)
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if reg>255: #This is for the monitor ADC
if not(self.I2Ccallback(dev.Addr,int2bytes(reg),read=2)): return False;
I2Ccallback(Addr,[250],read=3)
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
else:
if not(self.I2Ccallback(dev.Addr,value2,reg=reg,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
else: value[0]=value2[0]
return True;
def GetI2Cbit(self,RCUi,dev,pin):
value2=[value]
self.I2CGet(RCUi,dev,1,1<<pin,value2)
return value2[0]
def SetI2CAddr(self,RCUi,dev):
return self.I2Ccallback(dev.Addr,int2bytes(dev.Register_R),read=2)
def GetI2Cnoreg(self,RCUi,dev,width,bitoffset,value):
l1=int(np.floor((width+bitoffset+7)/8))
makesinglevalue=((len(value)==1) and (l1>1));
if makesinglevalue: value2=[0 for x in range(l1)]
else: value2=value
reg=dev.Register_R
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
if makesinglevalue: value[0]=bytes2int(value2)
else: value[:]=value2[:];
if dev.store>0:
self.previous[RCUi,dev.store-1]=value[0]
# logging.debug(str(("Store buffer",RCUi,dev.store,value[0])))
if (width!=l1*8) or (bitoffset>0):
value[0]=UnMask(value[0],width,bitoffset)
#else: value[0]=value2[0]
return True;
def start(self,Q1):
def RCUthread(Q1):
while True:
item = Q1.get()
if item is None: break;
self.SetVar(item)
logging.info("End RCU thread")
RCUthread1 = threading.Thread(target=RCUthread, args=(Q1,))
RCUthread1.start()
return RCUthread1
# def Queue_Monitor(self,Q1):
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,32,[0]*32)
# Q1.put(Inst1)
# Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
# Q1.put(Inst1)
def AddVars(self,Q1,AddVarR,AddVarW):
for v in Vars.OPC_devvars:
dim=Vars.RCU_MPaddr.nI2C*Vars.RCU_MPaddr.nSwitch*v.nVars
varvalue2=(dim*[0.0] if v.type==Vars.datatype.dfloat else dim*[0])
if v.RW in [Vars.RW.ReadOnly,Vars.RW.ReadWrite]:
var1=AddVarR(v.name+"_R",varvalue2,v)
v.OPCR=var1
Inst=Vars.Instr(Vars.DevType.VarUpdate,v,dim,varvalue2)
Q1.put(Inst)
if v.RW in [Vars.RW.WriteOnly,Vars.RW.ReadWrite]:
var1=AddVarW(v.name+"_RW",varvalue2,v,Q1)
v.OPCW=var1
def AddMethod(self,Q1,Addmethod):
for v in Vars.OPC_methods:
Inst1=Vars.Instr(Vars.DevType.Instr,v,0,[])
Addmethod(v.name,Inst1,Q1)
#from collections import namedtuple
#from enum import Enum
from pcctypes import *
#Mid plane address
RCU_MPaddr=MPaddr(1,[1],1,[7])
CLK_IO3_OUT1=DevReg(0x20,0,2,1)
CLK_IO3_OUT2=DevReg(0x20,1,3,2)
CLK_IO3_CONF1=DevReg(0x20,6,6,3)
CLK_IO3_CONF2=DevReg(0x20,7,7,4)
RCU_storeReg=4; #Number of stored registers
from .HWconf import *
CLK_IGNORE_PPS= VarArray("CLK_Ignore_PPS",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,6,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Enable_PWR= VarArray("CLK_Enable_PWR",1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,7,1)],RW.ReadOnly,datatype.dInt,1,None,None)
CLK_Stat1 = VarArray("CLK_Stat" ,1,[Var2dev("",I2Cmodules.CLK,DevType.I2C,CLK_IO3_OUT1,1 ,4,1)],RW.ReadOnly,datatype.dInt,1,None,None)
OPC_devvars=[CLK_IGNORE_PPS,CLK_Enable_PWR,CLK_Stat1]
RCU_init=Instrs("ReadRegisters",2,[
Instr(DevType.VarUpdate,CLK_IGNORE_PPS,1,[0]),
Instr(DevType.VarUpdate,CLK_Enable_PWR,1,[0])
])
CLK_on=Instrs("CLK_on",2,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x42]),
])
CLK_off=Instrs("CLK_off",2,[
Instr(DevType.I2C,CLK_IO3_CONF1,1,[0x2C]),
Instr(DevType.I2C,CLK_IO3_OUT1 ,1,[0x40]),
])
OPC_methods=[CLK_on]
import yaml;
import importlib
import sys, inspect
import logging
YAMLDIR='YAML/'
class hwdev:
def __init__(self,configfile):
logging.info("Loading: %s",configfile)
self.D=yaml.load(open(YAMLDIR+configfile))
self.loadchildren(self.D)
def loadchildren(self,D):
self.children=[]
self.childnames=[]
if "dev_children" in D:
for c in D['dev_children']:
childname=(c['child_name']+"_" if c.get('child_name') else '');
objnm=c['child_dev']
yamlname=c['child_conf']
logging.info("Add child: %s %s %s",childname,objnm,yamlname)
i = importlib.import_module(objnm)
for name, obj in inspect.getmembers(i,inspect.isclass):
if name==objnm:
# yamlname=(c['conf_name'] if c.get('conf_name') else nm)
# logging.debug(" Load child: %s conf=%s.yaml",childname,yamlname)
c['obj']=obj(yamlname+".yaml")
self.children.append([childname,c['obj']])
def GetVarNames(self,parentname,callback):
for name,child in self.children:
child.GetVarNames(parentname+name,callback)
return True;
def GetMethodNames(self,parentname,callback):
for name,child in self.children:
child.GetMethodNames(parentname+name,callback)
return True;
def GetVarValue(self,name,value,I2Ccallback):
return False
def SetVarValue(self,name,value,I2Ccallback):
return False
# def CallMethod(self,name,params,I2Ccallback):
# for name,child in self.children:
# child.CallMethod(name,params,I2Ccallback)
# return True;
def CallMethod(self,name,params,I2Ccallback):
childname=name.split('_')[0]
for cname,child in self.children:
# logging.debug("hwdev callmethod %s %s",cname,childname)
if cname=="": child.CallMethod(name,params,I2Ccallback)
elif cname[:-1]==childname: return child.CallMethod(name[len(childname)+1:],params,I2Ccallback)
return False
def Find(L,name,value):
for x in L:
if x[name]==value:
return x;
return False;
from hwdev import hwdev;
import pylibi2c;
import time
import logging
......@@ -7,12 +6,17 @@ import logging
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
try:
if read==3:
time.sleep(data[0]/1000.)
return True
# logging.debug("I2C addr=%s reg=%s data=%s read=%s",addr,reg,data,read)
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
......@@ -25,23 +29,10 @@ def I2C1server(addr,data,reg=None,read=0):
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0
return False;
class pypcc(hwdev):
def GetVarValue(self,name,value):
for cname,child in self.children:
if not(child.GetVarValue(name,value,I2C1server)): return False
return True
def SetVarValue(self,name,value):
for cname,child in self.children:
if not(child.SetVarValue(name,value,I2C1server)): return False
return True
def CallMethod(self,name,params):
for cname,child in self.children:
child.CallMethod(name,params,I2C1server)
return True;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment