Skip to content
Snippets Groups Projects
Commit 7db26624 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

HBAT2 pico first working

parent 4f1f8979
No related branches found
No related tags found
No related merge requests found
...@@ -4,11 +4,11 @@ description: "1234" ...@@ -4,11 +4,11 @@ description: "1234"
drivers: drivers:
- name: hbaio #TCA9548 - name: hbaio #TCA9548
type: hbat_pico_io type: hbat_pico_io
parameters: ['COM3'] #serial port number parameters: ['/dev/ttyACM0'] #serial port number
- name: hbat2 - name: hbat2
type: i2c_array #An array of similar devices connected to an I2C switch type: i2c_array #An array of similar devices connected to an I2C switch
parent: hbaio parent: hbaio
parameters: [1,4] #start,number of RCUs parameters: [2,5] #start,number of RCUs
status: HBAT2_COMM_STATUS status: HBAT2_COMM_STATUS
#This is the I2C devices in the RCU #This is the I2C devices in the RCU
......
...@@ -37,39 +37,8 @@ def Decode(D2): ...@@ -37,39 +37,8 @@ def Decode(D2):
S.append(v1) S.append(v1)
return(S) return(S)
CRCtab=np.load("CRC_HBAT1.npy")
CRCtabl=[d%256 for d in CRCtab]
CRCtabh=[d//256 for d in CRCtab]
def CRCcheck(S1):
crcl=0;crch=0;
for b in S1:
i=crcl ^ b
crcl=crch ^ CRCtabl[i]
crch=CRCtabh[i]
# print(i,CRCtabh[i])
# crch=crcl ^ CRCtabh[i]
# crcl= CRCtabl[i]
return crch*256+crcl
def MakeBroadcast(data,func=4,reg=0,serv1=1,serv2=16):
assert(len(data)==32)
data2=[func,reg,serv1,serv2]+data
l=len(data2)+1
data2=[0,l]+data2
CRC=CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(CRCcheck(data2)==0)
return data2
def MakeRequest(serv,data=[],func=5,reg=0):
data2=[func,reg]+data
l=len(data2)+1
data2=[serv,l]+data2
CRC=CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(CRCcheck(data2)==0)
return data2
#Mlookup=[0x55,0x56,0x59,0x5A,0x65,0x66,0x69,0x6A,0x95,0x96,0x99,0x9A,0xA5,0xA6,0xA9,0xAA] #Mlookup=[0x55,0x56,0x59,0x5A,0x65,0x66,0x69,0x6A,0x95,0x96,0x99,0x9A,0xA5,0xA6,0xA9,0xAA]
Mlookup=[0xAA,0x6A,0x9A,0x5A,0xA6,0x66,0x96,0x56,0xA9,0x69,0x99,0x59,0xA5,0x65,0x95,0x55] Mlookup=[0xAA,0x6A,0x9A,0x5A,0xA6,0x66,0x96,0x56,0xA9,0x69,0x99,0x59,0xA5,0x65,0x95,0x55]
...@@ -105,6 +74,39 @@ class hbat_pico_io(hwdev): ...@@ -105,6 +74,39 @@ class hbat_pico_io(hwdev):
self.ser = serial.Serial(port,115200,timeout=0.1) # open serial port self.ser = serial.Serial(port,115200,timeout=0.1) # open serial port
logging.info("hba-pico connecting to: "+self.ser.name) # check which port was really used logging.info("hba-pico connecting to: "+self.ser.name) # check which port was really used
self.CurrentChannel=0; self.CurrentChannel=0;
CRCtab=np.load("CRC_HBAT1.npy")
self.CRCtabl=[d%256 for d in CRCtab]
self.CRCtabh=[d//256 for d in CRCtab]
def CRCcheck(self,S1):
crcl=0;crch=0;
for b in S1:
i=crcl ^ b
crcl=crch ^ self.CRCtabl[i]
crch=self.CRCtabh[i]
# print(i,CRCtabh[i])
# crch=crcl ^ CRCtabh[i]
# crcl= CRCtabl[i]
return crch*256+crcl
def MakeBroadcast(self,data,func=4,reg=0,serv1=1,serv2=16):
assert(len(data)==32)
data2=[func,reg,serv1,serv2]+data
l=len(data2)+1
data2=[0,l]+data2
CRC=self.CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(self.CRCcheck(data2)==0)
return data2
def MakeRequest(self,serv,data=[],func=5,reg=0):
data2=[func,reg]+data
l=len(data2)+1
data2=[serv,l]+data2
CRC=self.CRCcheck(data2)
data2=data2+[CRC%256,CRC//256]
assert(self.CRCcheck(data2)==0)
return data2
def SetSW1(self,channel): def SetSW1(self,channel):
if (channel)==self.CurrentChannel: return True; if (channel)==self.CurrentChannel: return True;
...@@ -119,15 +121,16 @@ class hbat_pico_io(hwdev): ...@@ -119,15 +121,16 @@ class hbat_pico_io(hwdev):
return True return True
def i2csetget(self,addr,data,reg=None,read=0): def i2csetget(self,addr,data,reg=None,read=0):
#addr = BF address addr = self.CurrentChannel
print("I2cget",addr,data,reg,read) print("I2cget",addr,data,reg,read)
try: # try:
if True:
if read==3: if read==3:
time.sleep(data[0]/1000.) time.sleep(data[0]/1000.)
return True return True
if read==1: if read==1:
func=len(data)*2+1; func=len(data)*2+1;
TX1=MakeRequest(addr,[],func,reg); TX1=self.MakeRequest(addr,[],func,reg);
logging.debug(str(("Packet to TX",TX1))) logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1) TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2)) self.ser.write(bytearray(TX2))
...@@ -138,14 +141,14 @@ class hbat_pico_io(hwdev): ...@@ -138,14 +141,14 @@ class hbat_pico_io(hwdev):
# print("I2C read",addr,reg,data,read) # print("I2C read",addr,reg,data,read)
else: else:
func=len(data)*2; func=len(data)*2;
TX1=MakeRequest(addr,data,func,reg); TX1=self.MakeRequest(addr,data,func,reg);
logging.debug(str(("Packet to TX",TX1))) logging.debug(str(("Packet to TX",TX1)))
TX2=ManchesterEncode(TX1) TX2=ManchesterEncode(TX1)
self.ser.write(bytearray(TX2)) self.ser.write(bytearray(TX2))
self.GetPackets(); self.GetPackets();
return True; return True;
except: # except:
logging.debug("I2C failed!") # logging.debug("I2C failed!")
return False; return False;
def GetDelay(self,Start=0x1000): def GetDelay(self,Start=0x1000):
...@@ -166,4 +169,24 @@ class hbat_pico_io(hwdev): ...@@ -166,4 +169,24 @@ class hbat_pico_io(hwdev):
D.append(a) D.append(a)
return D; return D;
def GetPackets(self):
D=self.GetDelay(self.ser);
D2=NormDelays(D)
#print("Received delays:",D2[:10])
NoData=True;
S=Decode(D2)
RXdata=[]
# print("Received packets:",S)
while len(S)>0:
NoData=False;
L=S[1]
S1=S[:L+3]
CRC=self.CRCcheck(S1);
logging.debug(str(("Received packet:",S1,"CRC=",CRC)))
if (CRC==0) and (S1[0]>0x80):
#print("Reply from Addr=",S1[0]-0x80," data=",S[2:-2])
RXdata=S[2:-2]
logging.info(str(("Reply from Addr=",S1[0]-0x80," data=",[hex(a) for a in RXdata])))
S=S[L+3:]
if NoData: logging.info("Communication error!")
return RXdata;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment