Skip to content
Snippets Groups Projects
Commit 6f937cc0 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

HBA1 added

parent e585da6b
No related branches found
No related tags found
1 merge request!11Pypcc2
......@@ -294,7 +294,7 @@ variables:
- name: HBA_element_beamformer_delays
description: Delays of each frontend
driver: I2C_RCU
driver: I2C_HBAT
devreg: [HBAT1.XY,HBAT2.XY,HBAT3.XY]
bitoffset: [2,2,2]
width: 5
......@@ -302,6 +302,7 @@ variables:
dtype: uint8
dim: 3072
mask: Ant_mask
wait: 100 #ms
- name: RCU_ID
description: Unique RCU ID
......
......@@ -3,13 +3,25 @@ from .hwdev import hwdev
import logging
from .i2c_array import ApplyMask
from time import sleep
from queuetypes import *
class hba1(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
logging.info("HBA1 not tested yet!")
logging.info("HBA1 driver loaded")
# self.previousHBA=np.zeros([32,3,32],dtype='int')
def OPCUASetVariable(self,varid,var1,data,mask):
logging.info(str(("HBA set Var",var1['name'],data[:32*3],mask[:12])))
self.conf['parentcls'].SetGetVarValueMask(var1,data,mask,getalso=False)
#Wait for PPS if required else wait a bit
if var1.get('wait'):
logging.debug("Wait %i ms",var1.get('wait'))
sleep(var1['wait']/100.)
data,mask2=self.conf['parentcls'].GetVarValueMask(var1,mask)
Data=OPCUAset(varid,InstType.varSet,data.copy(),mask2.copy())
return [Data]
def i2csetget(self,addr,data,reg=None,read=0):
if read==0: return self.sethba(addr,reg,data)
elif read==1: return self.gethba(addr,reg,data)
......@@ -19,7 +31,7 @@ class hba1(hwdev):
def sethba(self,addr,reg,data):
logging.debug("SetHba addr=0x%x reg=0x%x",addr,reg)
I2Ccallback=self.conf['parentcls'].i2csetget
I2Ccallback(0x40,[],reg=0,read=1)#wakeup, do nothing
I2Ccallback(0x40,[0],read=1)#wakeup, do nothing
sleep(0.01)
I2Ccallback(addr,data[:16],reg=reg)
if len(data)>16:
......
......@@ -43,7 +43,7 @@ class i2c_array(i2c_dev):
m|=1<<self.RCU_Switch1[RCUi];
self.conf['parentcls'].SetChannel(m);
def SetGetVarValueMask(self,var1,data,mask):
def SetGetVarValueMask(self,var1,data,mask,getalso=True):
Step,Step2=GetSteps(var1);
value1=[0]*Step*Step2;
if len(data)==Step:
......@@ -77,13 +77,14 @@ class i2c_array(i2c_dev):
self.I2Cmask[RCUi]=res;
mask[RCUi*Step+Vari]=False;
continue;
value2=value1[i0:i1]
res=self.GetVarValue(devreg,width,bitoffset,value2)
if not(res):
self.I2Cmask[RCUi]=False;
mask[RCUi*Step+Vari]=False;
continue;
value1[i0:i1]=value2
if getalso:
value2=value1[i0:i1]
res=self.GetVarValue(devreg,width,bitoffset,value2)
if not(res):
self.I2Cmask[RCUi]=False;
mask[RCUi*Step+Vari]=False;
continue;
value1[i0:i1]=value2
return value1,mask
......@@ -174,8 +175,8 @@ class i2c_array(i2c_dev):
previous=storearray[self.RCUi];
for x in range(len(value)):
value[x]=ApplyMask(value[x],width,bitoffset,previous);
storearray[self.RCUi]=value[0]
logging.debug("Stored value:"+str(value[0]))
storearray[self.RCUi]=(value[0] if len(value)==1 else value[:])
logging.debug("Stored value:"+str(storearray[self.RCUi]))
# devreg['drivercls'].i2csetget
return devreg['drivercls'].i2csetget(devreg['addr'],value,reg=devreg['register_W'])
......@@ -200,8 +201,8 @@ class i2c_array(i2c_dev):
value[:]=value2[:];
if devreg['store']:
storearray=self.getstorearray(devreg,len(value));
storearray[self.RCUi]=value[0]
logging.debug(str(("Store buffer",self.RCUi,value[0])))
storearray[self.RCUi]=(value[0] if len(value)==1 else value[:])
logging.debug(str(("Store buffer",self.RCUi,storearray[self.RCUi])))
# print("Stored values:",self.getstorearray(devreg))
if (width!=l1*8) or (bitoffset>0):
if (width<8):
......
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;
logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
RunTimer=True;
#def signal_handler(sig, frame):
# logging.warn('Stop signal received!')
# global RunTimer;
# RunTimer=False
#signal.signal(signal.SIGINT, signal_handler)
logging.info("Start I2C processes")
threads=[]
I2Cclients=[]
name='RCU'
RCU_I2C=i2client.i2client(name=name)
thread1=i2cthread.start(*RCU_I2C.GetInfo())
threads.append(thread1)
I2Cclients.append(RCU_I2C)
#Load yaml so that we know the variable names
RCU_conf=yamlreader.yamlreader(RCU_I2C,yamlfile=name)
#RCU_conf.CallInit()
var1=RCU_conf.getmethodid('RCU_update');
#var1=RCU_conf.getmethodid('RCU_on');
#N=32;
#mask=[i<2 for i in range(N)];
#RCU_I2C.callmethod(var1,[])
var1=RCU_conf.getvarid('HBA_element_beamformer_delays');
N=32*3;
data=list(range(32))*N;
mask=[i<1*3 for i in range(N)];
RCU_I2C.setvar(var1,data,mask);
#RCU_I2C.readvar(var1,mask);
time.sleep(5);
while RCU_I2C.data_waiting():
varid,data,mask=RCU_I2C.readdata()
print("Results:",RCU_conf.getvar1(varid)['name'],data[:12],mask[:12])
logging.info("Stop threads")
for i2c in I2Cclients:
i2c.stop()
for thread1 in threads:
thread1.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment