Select Git revision
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
hbat_pico_io.py 5.66 KiB
from .hwdev import hwdev;
import logging
import serial
import numpy as np
def NormDelays(D,offset=19,scale=20):
return [(d+offset)//scale for d in D] #shorter (inbetween) delay = waiting for FIFO
def Decode(D2):
previous="0";
res=''
for b in D2[:]:
if b>=6: #Start a new packet
#n=(8-len(res)%8)%8
n=(len(res)//8)*8
#res+=n*"0"
res=res[:n]
previous="0"
# print(n,res,previous)
else:
if previous=="0":
bit="0" if b<=2 else "01"
else:
if b==2: bit="1"
elif b==3: bit="0"
else: bit="01"
# print(previous,b,bit)
previous=bit[-1];
res+=bit;
#print(res)
res=res[2:]
# print("len",len(res)//8)
S=[]
for x in range(len(res)//8):
v1=int(res[x*8:x*8+8],2)
# print(x,res[x*8:x*8+8],v1)
S.append(v1)
return(S)
#Mlookup=[0x55,0x56,0x59,0x5A,0x65,0x66,0x69,0x6A,0x95,0x96,0x99,0x9A,0xA5,0xA6,0xA9,0xAA]
Mlookup=[0xAA,0x6A,0x9A,0x5A,0xA6,0x66,0x96,0x56,0xA9,0x69,0x99,0x59,0xA5,0x65,0x95,0x55]
def ManchesterEncode(data):
# Inverted: 0=input=high, 1=output=low. Applied at end.
# Data clocked out MSB first, send on wire LSB first!
data2=[0xff,0x0f,0xa8] #------------_______-_- Start sequence
for d in data:
data2+=[Mlookup[d//16]]
data2+=[Mlookup[d%16]]
data2+=[0xfd]
return [255-d for d in data2]
def Loopback(TX2):
#Calculte time between edges
S=''
for b in TX2:
S+="{0:08b}".format(255-b)[::-1]
print("Loopback bits:",S[:30])
T=[0]
for i in range(len(S)-1):
if (S[i]=='1') and (S[i+1]=="0"): T+=[i]
T=np.array(T[1:])
T=T[1:]-T[:-1]
print("Loopback delay:",T[:30])
return T
class hbat_pico_io(hwdev):
def __init__(self,config):
hwdev.__init__(self,config);
port=str(config['parameters'][0]);
self.ser = serial.Serial(port,115200,timeout=0.1) # open serial port
logging.info("hba-pico connecting to: "+self.ser.name) # check which port was really used
self.CurrentChannel=0;
CRCtab=np.load("CRC_HBAT1.npy")
self.CRCtabl=[d%256 for d in CRCtab]
self.CRCtabh=[d//256 for d in CRCtab]
def CRCcheck(self,S1):
crcl=0;crch=0;
for b in S1:
i=crcl ^ b
crcl=crch ^ self.CRCtabl[i]
crch=self.CRCtabh[i]
# print(i,CRCtabh[i])
# crch=crcl ^ CRCtabh[i]
# crcl= CRCtabl[i]
return crch*256+crcl
def MakeBroadcast(self,data,func=4,reg=0,serv1=1,serv2=16):
assert(len(data)==32)
data2=[func,reg,serv1,serv2]+data
l=len(data2)+1
data2=[0,l]+data2
CRC=self.CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(self.CRCcheck(data2)==0)
return data2
def MakeRequest(self,serv,data=[],func=5,reg=0):
data2=[func,reg]+data
l=len(data2)+1
data2=[serv,l]+data2
CRC=self.CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(self.CRCcheck(data2)==0)
return data2
def SetSW1(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug("SetChannelbit=%i" % channel)
self.CurrentChannel=channel
return True
def SetChannel(self,channel):
if (channel)==self.CurrentChannel: return True;
logging.debug("SetChannel=%i" % channel)
self.CurrentChannel=channel
return True
def i2csetget(self,addr,data,reg=None,read=0):
addr = self.CurrentChannel
print("I2cget",addr,data,reg,read)
# try:
if True:
if read==3:
time.sleep(data[0]/1000.)
return True
if read==1:
func=len(data)*2+1;
TX1=self.MakeRequest(addr,[],func,reg);
logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2))
data2=self.GetPackets()
logging.debug(str(("Data RX",data2)))
data[:len(data2)]=data2
return len(data2)==len(data);
# print("I2C read",addr,reg,data,read)
else:
func=len(data)*2;
TX1=self.MakeRequest(addr,data,func,reg);
logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2))
self.GetPackets();
return True;
# except:
# logging.debug("I2C failed!")
return False;
def GetDelay(self,Start=0x1000):
a=0;
D=[]
Start=0x1000
while a!=b'':
s=self.ser.readline()
# print(s)
if len(s)==2: continue;
a=s[:8]
# print(a)
if a==b'': break;
a=str(a,'UTF-8')
a=Start-int(a,16)
# print(a)
if (a==0): continue;
D.append(a)
return D;
def GetPackets(self):
D=self.GetDelay(self.ser);
D2=NormDelays(D)
#print("Received delays:",D2[:10])
NoData=True;
S=Decode(D2)
RXdata=[]
# print("Received packets:",S)
while len(S)>0:
NoData=False;
L=S[1]
S1=S[:L+3]
CRC=self.CRCcheck(S1);
logging.debug(str(("Received packet:",S1,"CRC=",CRC)))
if (CRC==0) and (S1[0]>0x80):
#print("Reply from Addr=",S1[0]-0x80," data=",S[2:-2])
RXdata=S[2:-2]
logging.info(str(("Reply from Addr=",S1[0]-0x80," data=",[hex(a) for a in RXdata])))
S=S[L+3:]
if NoData: logging.info("Communication error!")
return RXdata;