Skip to content
Snippets Groups Projects
Commit 4f1f8979 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

HBAT2 via pico added

parent 0f420f87
Branches
Tags
No related merge requests found
File added
version: "1.0"
description: "1234"
drivers:
- name: hbaio #TCA9548
type: hbat_pico_io
parameters: ['COM3'] #serial port number
- name: hbat2
type: i2c_array #An array of similar devices connected to an I2C switch
parent: hbaio
parameters: [1,4] #start,number of RCUs
status: HBAT2_COMM_STATUS
#This is the I2C devices in the RCU
device_registers:
- name: BF
description: [BF board]
driver: hbaio
registers:
- name: X1
address: 0x0
store: True
- name: X2
address: 0x1
store: True
- name: X3
address: 0x2
store: True
- name: X4
address: 0x3
store: True
- name: Y
address: 0x4
store: True
- name: SWX
address: 0x8
store: True
- name: SWY
address: 0x9
store: True
- name: FE_FE_PWR
address: 0xA
store: True
- name: FE_BF_PWR
address: 0xB
store: True
- name: ID
address: 0x10
- name: VERSION
address: 0x14
- name: ADDR
address: 0x20
- name: VSENSE
address: 0x34
- name: SAVE_EEPROM
address: 0x40
- name: LOAD_EEPROM
address: 0x41
variables:
- name: HBAT2_ANT_mask
description: Only masked RCUs are updated
driver: hbat2
rw: variable #server RW variable, not linked to IO
dtype: boolean
dim: 16
- name: HBAT2_BF_mask
description: Only masked RCUs are updated
driver: hbat2
rw: variable #server RW variable, not linked to IO
dtype: boolean
dim: 4
- name: HBAT2_COMM_STATUS
description: 0=Good, 1=No communication, 2=error
driver: hbat2
rw: ro #server RW variable, not linked to IO
dtype: uint8
mask: HBAT2_BF_mask
dim: 4
- name: DELAYS_X_G1
description: Group1 delays
driver: hbat2
devreg: [BF.X1,BF.X2,BF.X3,BF.X4]
bitoffset: 0
width: 4
rw: rw
dtype: uint8
dim: 16
mask: HBAT2_ANT_mask
- name: DELAYS_X_G2
description: Group2 delays
driver: hbat2
devreg: [BF.X1]
bitoffset: 4
width: 4
rw: rw
dtype: uint8
dim: 4
mask: HBAT2_BF_mask
methods:
- name: LOAD_EEPROM
driver: hbat2
instructions:
- BF1.LOAD_EEPROM: 0
import numpy as np
import logging
from queuetypes import *
from .hwdev import hwdev
from .i2c_dev import i2c_dev
class hbat2_array(i2c_dev):
def __init__(self,config):
i2c_dev.__init__(self,config);
self.addr=config['parameters'];
self.N=len(self.addr);
self.I2Cmask=[0]*self.N
# self.disableI2ConError=True;
self.I2Cmaskid=config.get('maskid',None)
if self.I2Cmaskid is None: logging.warn(config['name']+" mask not found!")
from .hwdev import hwdev;
import logging
import serial
import numpy as np
def NormDelays(D,offset=19,scale=20):
return [(d+offset)//scale for d in D] #shorter (inbetween) delay = waiting for FIFO
def Decode(D2):
previous="0";
res=''
for b in D2[:]:
if b>=6: #Start a new packet
#n=(8-len(res)%8)%8
n=(len(res)//8)*8
#res+=n*"0"
res=res[:n]
previous="0"
# print(n,res,previous)
else:
if previous=="0":
bit="0" if b<=2 else "01"
else:
if b==2: bit="1"
elif b==3: bit="0"
else: bit="01"
# print(previous,b,bit)
previous=bit[-1];
res+=bit;
#print(res)
res=res[2:]
# print("len",len(res)//8)
S=[]
for x in range(len(res)//8):
v1=int(res[x*8:x*8+8],2)
# print(x,res[x*8:x*8+8],v1)
S.append(v1)
return(S)
CRCtab=np.load("CRC_HBAT1.npy")
CRCtabl=[d%256 for d in CRCtab]
CRCtabh=[d//256 for d in CRCtab]
def CRCcheck(S1):
crcl=0;crch=0;
for b in S1:
i=crcl ^ b
crcl=crch ^ CRCtabl[i]
crch=CRCtabh[i]
# print(i,CRCtabh[i])
# crch=crcl ^ CRCtabh[i]
# crcl= CRCtabl[i]
return crch*256+crcl
def MakeBroadcast(data,func=4,reg=0,serv1=1,serv2=16):
assert(len(data)==32)
data2=[func,reg,serv1,serv2]+data
l=len(data2)+1
data2=[0,l]+data2
CRC=CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(CRCcheck(data2)==0)
return data2
def MakeRequest(serv,data=[],func=5,reg=0):
data2=[func,reg]+data
l=len(data2)+1
data2=[serv,l]+data2
CRC=CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(CRCcheck(data2)==0)
return data2
#Mlookup=[0x55,0x56,0x59,0x5A,0x65,0x66,0x69,0x6A,0x95,0x96,0x99,0x9A,0xA5,0xA6,0xA9,0xAA]
Mlookup=[0xAA,0x6A,0x9A,0x5A,0xA6,0x66,0x96,0x56,0xA9,0x69,0x99,0x59,0xA5,0x65,0x95,0x55]
def ManchesterEncode(data):
# Inverted: 0=input=high, 1=output=low. Applied at end.
# Data clocked out MSB first, send on wire LSB first!
data2=[0xff,0x0f,0xa8] #------------_______-_- Start sequence
for d in data:
data2+=[Mlookup[d//16]]
data2+=[Mlookup[d%16]]
data2+=[0xfd]
return [255-d for d in data2]
def Loopback(TX2):
#Calculte time between edges
S=''
for b in TX2:
S+="{0:08b}".format(255-b)[::-1]
print("Loopback bits:",S[:30])
T=[0]
for i in range(len(S)-1):
if (S[i]=='1') and (S[i+1]=="0"): T+=[i]
T=np.array(T[1:])
T=T[1:]-T[:-1]
print("Loopback delay:",T[:30])
return T
class hbat_pico_io(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
port=str(config['parameters'][0]);
self.ser = serial.Serial(port,115200,timeout=0.1) # open serial port
logging.info("hba-pico connecting to: "+self.ser.name) # check which port was really used
self.CurrentChannel=0;
def SetSW1(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug("SetChannelbit=%i" % channel)
self.CurrentChannel=channel
return True
def SetChannel(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug("SetChannel=%i" % channel)
self.CurrentChannel=channel
return True
def i2csetget(self,addr,data,reg=None,read=0):
#addr = BF address
print("I2cget",addr,data,reg,read)
try:
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
func=len(data)*2+1;
TX1=MakeRequest(addr,[],func,reg);
logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2))
data2=self.GetPackets()
logging.debug(str(("Data RX",data2)))
data[:len(data2)]=data2
return len(data2)==len(data);
# print("I2C read",addr,reg,data,read)
else:
func=len(data)*2;
TX1=MakeRequest(addr,data,func,reg);
logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2))
self.GetPackets();
return True;
except:
logging.debug("I2C failed!")
return False;
def GetDelay(self,Start=0x1000):
a=0;
D=[]
Start=0x1000
while a!=b'':
s=self.ser.readline()
# print(s)
if len(s)==2: continue;
a=s[:8]
# print(a)
if a==b'': break;
a=str(a,'UTF-8')
a=Start-int(a,16)
# print(a)
if (a==0): continue;
D.append(a)
return D;
...@@ -56,7 +56,7 @@ class i2c_array(i2c_dev): ...@@ -56,7 +56,7 @@ class i2c_array(i2c_dev):
if (len(mask)==self.N): if (len(mask)==self.N):
mask=[m for m in mask for x in range(Step)] mask=[m for m in mask for x in range(Step)]
if not(len(mask)==Step*self.N): if not(len(mask)==Step*self.N):
print("Check mask length!"); print("Check mask length!",len(mask),Step,self.N);
return; return;
# if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N); # if (len(value1)==V1.nVars) and (self.N>1): value1=(value1*self.N);
# logging.debug(str(("Step=",Step,"Mask=",mask))) # logging.debug(str(("Step=",Step,"Mask=",mask)))
......
import logging
import argparse
from opcuaserv import opcuaserv
from opcuaserv import i2client
from opcuaserv import yamlreader
#from opcuaserv import pypcc2
from i2cserv import i2cthread
import threading
import time
import sys
import signal
from yamlconfig import Find;
logging.basicConfig(level="DEBUG",format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s')
if __name__ == '__main__':
RunTimer=True;
#def signal_handler(sig, frame):
# logging.warn('Stop signal received!')
# global RunTimer;
# RunTimer=False
#signal.signal(signal.SIGINT, signal_handler)
logging.info("Start I2C processes")
threads=[]
I2Cclients=[]
name='HBAT2'
RCU_I2C=i2client.i2client(name=name)
thread1=i2cthread.start(*RCU_I2C.GetInfo())
threads.append(thread1)
I2Cclients.append(RCU_I2C)
#Load yaml so that we know the variable names
RCU_conf=yamlreader.yamlreader(RCU_I2C,yamlfile=name)
#RCU_conf.CallInit()
var1=RCU_conf.getmethodid('LOAD_EEPROM');
#var1=RCU_conf.getmethodid('RCU_on');
#N=32;
#mask=[i<2 for i in range(N)];
#RCU_I2C.callmethod(var1,[])
var1=RCU_conf.getvarid('DELAYS_X_G1');
N=16;
data=[0,1,2,3]*4;
mask=[(i//4)==2 for i in range(N)];
print(mask,data)
RCU_I2C.setvar(var1,data,mask);
var1=RCU_conf.getvarid('DELAYS_X_G2');
N=4;
data=[0,0,0,0];
mask=[i==2 for i in range(N)];
print(mask,data)
RCU_I2C.setvar(var1,data,mask);
#RCU_I2C.readvar(var1,mask);
time.sleep(3);
while RCU_I2C.data_waiting():
varid,data,mask=RCU_I2C.readdata()
print("Results:",RCU_conf.getvar1(varid)['name'],data[:12],mask[:12])
logging.info("Stop threads")
for i2c in I2Cclients:
i2c.stop()
for thread1 in threads:
thread1.join()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment