Skip to content
Snippets Groups Projects
Commit 03649d9a authored by Paulus Kruger's avatar Paulus Kruger
Browse files

working version March 2021

parent a8c3a697
No related branches found
No related tags found
1 merge request!10Pypcc2
import pylibi2c;
import time
import logging
#bus = pylibi2c.I2CDevice('/dev/i2c-1'
#read=0: write to register
#read=1: read from register
#read=2: write to register (common in group)
#read=3: wait ms second
I2Ccounter=0;
def I2C1server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-1',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print("I2C read",addr,reg,data,read)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
# data[0]=0xff
return False;
def I2C4server(addr,data,reg=None,read=0):
# print("I2C4",addr,reg,data,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
# print("I2C",addr,reg,data,read)
# return True;
bus=pylibi2c.I2CDevice('/dev/i2c-4',addr)
if read==1:
length=len(data)
bus.iaddr_bytes=0
if not(reg is None):
bus.ioctl_write(0,str(bytearray([reg])))
data[:]=[int(x) for x in bus.ioctl_read(0,length)]
# print(data)
else:
if reg is None:
bus.iaddr_bytes=0
reg=0;
bus.ioctl_write(reg,str(bytearray(data)))
bus.close()
return True;
except:
if bus: bus.close()
print("I2C4 error")
# data[0]=0xff
return False;
......@@ -21,9 +21,8 @@ def I2C1server(addr,data,reg=None,read=0):
time.sleep(data[0]/1000.)
return True
logging.debug(str(("I2C",addr,reg,data,read)))
if read==1:
if not(reg is None): i2c1.transfer(addr,[I2C.Message(reg)])
if not(reg is None): i2c1.transfer(addr,[I2C.Message([reg])])
msgs=[I2C.Message(data,read=True)]
i2c1.transfer(addr,msgs)
data[:]=msgs[0].data
......@@ -31,15 +30,15 @@ def I2C1server(addr,data,reg=None,read=0):
if reg is None:
msgs=[I2C.Message(data)]
else:
msgs=[I2C.Message(reg),I2C.Message(data)]
msgs=[I2C.Message([reg]+data)]
i2c1.transfer(addr,msgs)
return True;
except:
return False;
i2c4=I2C("/dev/i2c-1")
i2c4=I2C("/dev/i2c-4")
def I2C1server(addr,data,reg=None,read=0):
def I2C4server(addr,data,reg=None,read=0):
# print("I2C",addr,reg,data,read)
try:
if read==3:
......@@ -48,7 +47,7 @@ def I2C1server(addr,data,reg=None,read=0):
logging.debug(str(("I2C",addr,reg,data,read)))
if read==1:
if not(reg is None): i2c4.transfer(addr,[I2C.Message(reg)])
if not(reg is None): i2c4.transfer(addr,[I2C.Message([reg])])
msgs=[I2C.Message(data,read=True)]
i2c4.transfer(addr,msgs)
data[:]=msgs[0].data
......@@ -56,7 +55,7 @@ def I2C1server(addr,data,reg=None,read=0):
if reg is None:
msgs=[I2C.Message(data)]
else:
msgs=[I2C.Message(reg),I2C.Message(data)]
msgs=[I2C.Message([reg]+data)]
i2c4.transfer(addr,msgs)
return True;
except:
......
from I2C import *
import time
RCU=2
RCU=3
Ver="RCU2H v0.2"
R1=0
ROM=0x50
#Set switch
print("Set switch")
if not(I2C1server(0x70,[1<<RCU],reg=None,read=0)): exit() #select RCU
if not(I2C1server(0x70,[1<<RCU],reg=None,read=0)):
print("Error setting switch!")
exit() #select RCU
exit()
#Get ID
print("Get ID")
......
......@@ -33,6 +33,7 @@ if args.simulator:
from i2c import I2C_dummy as I2C
else:
from i2c import I2C
# from i2c import I2Cv2 as I2C
#Queue used to pass instructions from opc-ua server to RCU
Q1=queue.Queue() #RCUs
......
......@@ -198,6 +198,9 @@ class RCU1():
for RCUi in range(self.N):
self.SWcallback(1<<Vars.RCU_MPaddr.Switch[RCUi])
# print(Step,Step2,len(value1),V1.size)
WX=[0]
self.I2Ccallback(0x40,WX,reg=0,read=1)#wakeup, do nothing
self.I2Ccallback(0x40,[10],read=3)
i0=(RCUi*Step+ Vari)*Step2
i1=(RCUi*Step+(Vari+1))*Step2
value2=value1[i0:i1]
......@@ -271,6 +274,7 @@ class RCU1():
value[i]=ApplyMask(value[i],width,bitoffset,previous[i]);
self.previousHBA[RCUi,dev.store-1]=value
# if buffer: return True;
print("HBA set:",value);
XX=[0]
self.I2Ccallback(0x40,XX,reg=0,read=1)#wakeup, do nothing
self.I2Ccallback(dev.Addr,[10],read=3)
......@@ -278,7 +282,7 @@ class RCU1():
if L>16:
self.I2Ccallback(dev.Addr,[10],read=3)
self.I2Ccallback(dev.Addr,value[16:],reg=dev.Register_W+16)
self.I2Ccallback(dev.Addr,[600],reg=dev.Register_W,read=3) #Wait 500ms
self.I2Ccallback(dev.Addr,[600],read=3) #Wait 500ms
return True
# return self.I2Ccallback(dev.Addr,value,reg=dev.Register_W)
......@@ -357,11 +361,13 @@ class RCU1():
#print(width,len(value))
value2=value
reg=dev.Register_R
# if not(self.I2Ccallback(dev.Addr,[],reg=0x0)): return False;
if not(self.I2Ccallback(dev.Addr,value2,read=1)): return False;
if value2[0] is None: return False
value[:]=value2[:];
if dev.store>0:
self.previousHBA[RCUi,dev.store-1]=value
print("HBA ",RCUi,dev.Addr," received:",value);
# if dev.store>0: #This is disabled due to noise on readback
# self.previousHBA[RCUi,dev.store-1]=value[:]
for i in range(len(value)):
value[i]=UnMask(value[i],width,bitoffset)
return True;
......@@ -383,11 +389,11 @@ class RCU1():
return RCUthread1
def Queue_Monitor(self,Q1,NRCU):
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
Q1.put(Inst1)
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
Q1.put(Inst1)
return
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_temp,NRCU,[0]*NRCU)
Q1.put(Inst1)
Inst1=Vars.Instr(Vars.DevType.VarUpdate,Vars.RCU_ADC_lock,96,[0]*96)
Q1.put(Inst1)
return
def AddVars(self,Q1,AddVarR,AddVarW,AddVar):
self.statevar=AddVar("RCU_state_R","busy")
for v in Vars.OPC_devvars:
......
......@@ -44,9 +44,13 @@ HBA3_led =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA3,1,0,1)
HBA1_pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA1,1,7,1)
HBA2_pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA2,1,7,1)
HBA3_pwr =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA3,1,7,1)
HBA1_pwr2 =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA1,1,1,1)
HBA2_pwr2 =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA2,1,1,1)
HBA3_pwr2 =Var2dev("",RCUmod,DevType.HBA1,RCU_HBA3,1,1,1)
HBA_Delay=VarArray("HBA_element_beamformer_delays",3,[HBA1_Delay,HBA2_Delay,HBA3_Delay],RW.ReadWrite,datatype.dInt,96,None,None)
HBA_led =VarArray("HBA_element_led" ,3,[HBA1_led ,HBA2_led ,HBA3_led ],RW.ReadWrite,datatype.dInt,96,None,None)
HBA_pwr =VarArray("HBA_element_pwr" ,3,[HBA1_pwr ,HBA2_pwr ,HBA3_pwr ],RW.ReadWrite,datatype.dInt,96,None,None)
HBA_pwr2 =VarArray("HBA_element_pwr2" ,3,[HBA1_pwr2 ,HBA2_pwr2 ,HBA3_pwr2 ],RW.ReadWrite,datatype.dInt,96,None,None)
#RCU_ID0=Var2dev("",RCUmod,DevType.I2C,RCU_ROM,8,0,1)
#RCU_ID=VarArray("RCU_ID",1,[RCU_ID0],RW.ReadOnly,datatype.dInt,4,None,None)
......@@ -109,7 +113,7 @@ RCU_OUT2=VarArray("RCU_OUT2",3,[RCU_IO1_2,RCU_IO2_2,RCU_IO3_2],RW.ReadOnly,datat
#OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ID,RCU_VER,HBA_Delay,HBA_led]
#OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ID,RCU_VER,HBA_Delay,HBA_led,RCU_uCV_ID]
OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ADC_lock,RCU_ADC_SYNC,RCU_ADC_JESD,RCU_ADC_CML,RCU_OUT1,RCU_OUT2,RCU_ID,RCU_VER,HBA_Delay,HBA_led,HBA_pwr,RCU_uCV_ID]
OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_ADC_lock,RCU_ADC_SYNC,RCU_ADC_JESD,RCU_ADC_CML,RCU_OUT1,RCU_OUT2,RCU_ID,RCU_VER,HBA_Delay,HBA_led,HBA_pwr,HBA_pwr2,RCU_uCV_ID]
#OPC_devvars=[RCU_mask,Ant_mask,RCU_att,RCU_band,RCU_temp,RCU_pwrd,RCU_LED,RCU_uCV_ID,RCU_uCV_spd0,RCU_uCV_RXspd1,RCU_uCV_RXspd2,RCU_uCV_TXspd1,RCU_uCV_debug ]#,HBA1_Pwr]#,RCU_CNF1,RCU_CNF2]
#Instr=namedtuple("DevInstr","type dev nvalue value")
#Instrs=namedtuple("Instr","name ninstr instr")
......
from test_common import *
RCUs=[2,3];
RCUs=[0,1,2,3];
setRCUmask(RCUs)
callmethod("RCU_off")
#callmethod("RCU_off")
#exit()
time.sleep(1)
#time.sleep(2)
callmethod("RCU_on")
#callmethod("RCU_on")
#time.sleep(1)
......
from test_common import *
name="RCU_attenuator"
RCU=0;
Att=[5,5,5]
RCU=[0];
Att=[1,1,1]
setAntmask([RCU])
setAntmask(RCU)
att=get_value(name+"_R")
print("Att old:",att[3*RCU:3*RCU+3])
att[3*RCU:3*RCU+3]=Att
print("Att old:",att[:12])
for r in RCU:
att[3*r:3*r+3]=Att
set_value(name+"_RW",att)
time.sleep(0.5)
att=get_value(name+"_R")
print("Att new:",att[3*RCU:3*RCU+3])
print("Att new:",att[:12])
disconnect()
\ No newline at end of file
from test_common import *
name="RCU_band"
RCU=3;
Att=[3,3,3]
setAntmask([RCU])
att=get_value(name+"_R")
print("Att old:",att[3*RCU:3*RCU+3])
att[3*RCU:3*RCU+3]=Att
set_value(name+"_RW",att)
time.sleep(0.5)
att=get_value(name+"_R")
print("Att new:",att[3*RCU:3*RCU+3])
disconnect()
\ No newline at end of file
......@@ -2,7 +2,7 @@ from test_common import *
name="RCU_LED0"
RCU=0;
LEDvalue=0;
LEDvalue=2;
setRCUmask([RCU])
......
from test_common import *
RCUs=[0,1,2,3];
RCUs=[3];
setRCUmask(RCUs)
#for RCU in RCUs:
setAntmask(RCUs,[True,True,True])
......
......@@ -16,7 +16,7 @@ i=(RCU*3+HBAT)*32
val=get_value(name+"_R")
print("old:",val[i:i+32])
val[i:i+32]=np.array(range(32))[::]*0
val[i:i+32]=np.array(range(32))[::]*0+1
set_value(name+"_RW",val)
time.sleep(1)
......
......@@ -26,15 +26,16 @@ set_value(name+"_RW",val)
#print(busy)
for x in range(10):
busy=get_value("RCU_state_R")
print(busy)
# print(busy)
if (busy=='busy'): break;
time.sleep(0.05)
while not(busy=='ready'):
busy=get_value("RCU_state_R")
print(busy)
# print(busy)
time.sleep(0.05)
val=get_value(name+"_R")
print("new:",val[i:i+32])
disconnect()
RCU=3
HBAT=1 #HBAT on RCU 0..2
#HBA=5; #HBA Element in HBAT
#BFX=11 #delay in 0.5ns
#BFY=BFX+1
name="HBA_element_pwr"
from test_common import *
import numpy as np
AntMask=[(x==HBAT) for x in range(3)]
setAntmask([RCU],AntMask)
i=(RCU*3+HBAT)*32
val=get_value(name+"_R")
print("old:",val[i:i+32])
for ii in range(10):
val[i:i+32]=np.ones([32])
#val[i:i+32]=np.zeros([32])
set_value(name+"_RW",val)
#time.sleep(0.5)
#busy=get_value("RCU_state_R")
#print(busy)
for x in range(10):
busy=get_value("RCU_state_R")
# print(busy)
if (busy=='busy'): break;
time.sleep(0.05)
while not(busy=='ready'):
busy=get_value("RCU_state_R")
# print(busy)
time.sleep(0.05)
val=get_value(name+"_R")
print("new:",val[i:i+32])
#val[i:i+32]=np.ones([32])
val[i:i+32]=np.zeros([32])
set_value(name+"_RW",val)
#time.sleep(0.5)
#busy=get_value("RCU_state_R")
#print(busy)
for x in range(10):
busy=get_value("RCU_state_R")
# print(busy)
if (busy=='busy'): break;
time.sleep(0.05)
while not(busy=='ready'):
busy=get_value("RCU_state_R")
# print(busy)
time.sleep(0.05)
val=get_value(name+"_R")
print("new:",val[i:i+32])
disconnect()
from test_common import *
rate= 60 #seconds
rate= 10 #seconds
name="RCU_monitor_rate_RW"
print("old:",get_value(name))
......
#from gpiozero import LED,Button
import RPi.GPIO as GPIO
#SCL=3
#SDA=2
SCL=24
SDA=23
GPIO.setmode(GPIO.BCM)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA ",GPIO.input(SDA))
print("SCL ",GPIO.input(SCL))
GPIO.cleanup()
#from gpiozero import LED,Button
import RPi.GPIO as GPIO
SCL=3
SDA=2
GPIO.setmode(GPIO.BCM)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA=",GPIO.input(SDA))
print("SCL=",GPIO.input(SCL))
GPIO.cleanup()
exit()
GPIO.setmode(GPIO.BCM)
print("Clock")
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.OUT)
for x in range(10):
GPIO.output(SCL,0)
print("SDA=",GPIO.input(SDA))
GPIO.output(SCL,1)
print("SDA=",GPIO.input(SDA))
print("STOP")
#GPIO.setup(SCL,GPIO.OUT)
GPIO.output(SCL,0)
GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,1)
GPIO.output(SDA,1)
GPIO.setup(SDA,GPIO.IN)
print("SDA=",GPIO.input(SDA))
GPIO.cleanup()
#SCL=LED(9)
#SDA=Button(8)
#if Button.is_pressed: print("High")
#SCL.on()
#SCL.off()
#from gpiozero import LED,Button
import RPi.GPIO as GPIO
SCL=3
SDA=2
Addr=0x70 #SWITCH
#Addr=0x20 #io expander
Value=0
data=[(Addr<<1)+0,Value] #Write
#data=[Addr<<1+1] #Read
GPIO.setmode(GPIO.BCM)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
#GPIO.setup(SDA,GPIO.OUT)
GPIO.setup(SCL,GPIO.OUT)
#GPIO.output(SDA,1)
GPIO.output(SCL,1)
#Start
#GPIO.output(SDA,0)
GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,0)
for b in data:
ba="{0:{fill}8b}".format(b,fill='0')
print("Sending",ba)
for bit in ba:
if int(bit)==0:
GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
else:
GPIO.setup(SDA,GPIO.IN)
# GPIO.output(SDA,int(bit))
GPIO.output(SCL,1)
GPIO.output(SCL,0)
GPIO.setup(SDA,GPIO.IN)
# GPIO.output(SDA,0)
# print("Ack",GPIO.input(SDA)==0)
# GPIO.setup(SDA,GPIO.OUT)
#ack clock
GPIO.output(SCL,1)
# GPIO.setup(SDA,GPIO.IN)
print("Ack",GPIO.input(SDA)==0)
# GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SCL,0)
print("Ack released",GPIO.input(SDA)==1)
#Stop
#GPIO.output(SDA,0)
GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,1)
GPIO.output(SDA,1)
#GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
GPIO.cleanup()
#from gpiozero import LED,Button
import RPi.GPIO as GPIO
SCL=3
SDA=2
Addr=0x70 #SWITCH
#Addr=0x20 #io expander
Value=0
data=[(Addr<<1)+0,Value] #Write
#data=[Addr<<1+1] #Read
GPIO.setmode(GPIO.BCM)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
GPIO.setup(SDA,GPIO.OUT)
GPIO.setup(SCL,GPIO.OUT)
GPIO.output(SDA,1)
GPIO.output(SCL,1)
#Start
#GPIO.output(SDA,0)
#GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,0)
for b in data:
ba="{0:{fill}8b}".format(b,fill='0')
print("Sending",ba)
for bit in ba:
# if int(bit)==0:
# GPIO.setup(SDA,GPIO.OUT)
# GPIO.output(SDA,0)
# else:
# GPIO.setup(SDA,GPIO.IN)
GPIO.output(SDA,int(bit))
GPIO.output(SCL,1)
GPIO.output(SCL,0)
GPIO.setup(SDA,GPIO.IN)
# GPIO.output(SDA,0)
# print("Ack",GPIO.input(SDA)==0)
# GPIO.setup(SDA,GPIO.OUT)
#ack clock
GPIO.output(SCL,1)
# GPIO.setup(SDA,GPIO.IN)
print("Ack",GPIO.input(SDA)==0)
GPIO.output(SCL,0)
print("Ack released",GPIO.input(SDA)==1)
GPIO.setup(SDA,GPIO.OUT)
#Stop
#GPIO.output(SDA,0)
#GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,1)
GPIO.output(SDA,1)
#GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
GPIO.cleanup()
#from gpiozero import LED,Button
import RPi.GPIO as GPIO
SCL=24
SDA=23
Addr=0x20 #SWITCH
#Addr=0x20 #io expander
Value=0
data=[(Addr<<1)+0,Value] #Write
#data=[Addr<<1+1] #Read
GPIO.setmode(GPIO.BCM)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
GPIO.setup(SDA,GPIO.OUT)
GPIO.setup(SCL,GPIO.OUT)
GPIO.output(SDA,1)
GPIO.output(SCL,1)
#Start
#GPIO.output(SDA,0)
#GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,0)
for b in data:
ba="{0:{fill}8b}".format(b,fill='0')
print("Sending",ba)
for bit in ba:
# if int(bit)==0:
# GPIO.setup(SDA,GPIO.OUT)
# GPIO.output(SDA,0)
# else:
# GPIO.setup(SDA,GPIO.IN)
GPIO.output(SDA,int(bit))
GPIO.output(SCL,1)
GPIO.output(SCL,0)
GPIO.setup(SDA,GPIO.IN)
# GPIO.output(SDA,0)
# print("Ack",GPIO.input(SDA)==0)
# GPIO.setup(SDA,GPIO.OUT)
#ack clock
GPIO.output(SCL,1)
# GPIO.setup(SDA,GPIO.IN)
print("Ack",GPIO.input(SDA)==0)
GPIO.output(SCL,0)
print("Ack released",GPIO.input(SDA)==1)
GPIO.setup(SDA,GPIO.OUT)
#Stop
#GPIO.output(SDA,0)
#GPIO.setup(SDA,GPIO.OUT)
GPIO.output(SDA,0)
GPIO.output(SCL,1)
GPIO.output(SDA,1)
#GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SDA,GPIO.IN)
GPIO.setup(SCL,GPIO.IN)
print("SDA high",GPIO.input(SDA)==1)
print("SCL high",GPIO.input(SCL)==1)
GPIO.cleanup()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment