Skip to content
Snippets Groups Projects
Select Git revision
  • af6896702ebde971477cff9cabc91130275c00d1
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

resourceassignementeditorwebservice

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    hbat_pico_io.py 5.66 KiB
    import logging
    import time

    import numpy as np
    import serial

    from .hwdev import hwdev
    
    def NormDelays(D,offset=19,scale=20):
        """Quantise raw delays into integer units.

        Each entry of D has *offset* added and is then floor-divided by
        *scale*.  A new list with one value per input entry is returned;
        D itself is not modified.
        """
        # Original author's note: shorter (inbetween) delay = waiting for FIFO
        normalised = []
        for raw in D:
            normalised.append((raw + offset) // scale)
        return normalised
    
    def Decode(D2):
        """Turn a list of normalised edge delays into a list of byte values.

        D2 holds integer delay classes (see NormDelays).  The delays are
        translated into a bit string:

        * a delay >= 6 is treated as the start of a new packet: any partial
          byte collected so far is dropped (the bit string is cut back to a
          multiple of 8) and the "previous bit" state is reset to "0";
        * otherwise the emitted bits depend on the last bit emitted:
            - after a "0": delay <= 2 gives "0", anything longer gives "01";
            - after a "1": delay 2 gives "1", delay 3 gives "0", anything
              else gives "01".

        The first two bits of the complete string are discarded, and the
        rest is cut into 8-bit groups (MSB first); leftover bits that do not
        fill a byte are ignored.  Returns the list of byte values.
        """
        last_bit = "0"
        bits = ''
        for delay in D2:
            if delay >= 6:
                # Start of a new packet: keep only the whole bytes seen so far.
                whole = (len(bits) // 8) * 8
                bits = bits[:whole]
                last_bit = "0"
                continue

            if last_bit == "0":
                symbol = "0" if delay <= 2 else "01"
            elif delay == 2:
                symbol = "1"
            elif delay == 3:
                symbol = "0"
            else:
                symbol = "01"

            last_bit = symbol[-1]
            bits += symbol

        # The two leading bits are not part of the payload.
        payload = bits[2:]
        return [int(payload[8 * k:8 * k + 8], 2) for k in range(len(payload) // 8)]
    
    
    
    
    #Mlookup=[0x55,0x56,0x59,0x5A,0x65,0x66,0x69,0x6A,0x95,0x96,0x99,0x9A,0xA5,0xA6,0xA9,0xAA]
    # Nibble -> Manchester byte.  Index is the 4-bit value; nibble bit k
    # (k=0 is the LSB) lands in the k-th bit pair counted from the MSB of the
    # table entry, encoded as 10 for a 0 bit and 01 for a 1 bit.
    Mlookup=[
        0xAA,0x6A,0x9A,0x5A,
        0xA6,0x66,0x96,0x56,
        0xA9,0x69,0x99,0x59,
        0xA5,0x65,0x95,0x55,
    ]

    def ManchesterEncode(data):
        """Manchester-encode a list of byte values for transmission.

        The output starts with the fixed start sequence 0xff,0x0f,0xa8
        (------------_______-_-), followed by two table bytes per input byte
        (high nibble first, then low nibble) and a closing 0xfd.  Finally
        every byte is inverted (255 - value).

        Original author's notes:
          Inverted: 0=input=high, 1=output=low. Applied at end.
          Data clocked out MSB first, send on wire LSB first!
        """
        encoded = [0xff, 0x0f, 0xa8]  # start sequence
        for byte in data:
            high, low = divmod(byte, 16)
            encoded.append(Mlookup[high])
            encoded.append(Mlookup[low])
        encoded.append(0xfd)
        # Inversion is applied to the whole frame in one go.
        return [255 - value for value in encoded]
    
    def Loopback(TX2):
        """Calculate the time between consecutive falling edges of a frame.

        TX2 is a list of byte values as produced for transmission.  Each byte
        is inverted (255 - value), written as 8 bits and reversed so that the
        LSB comes first.  Every position where a '1' is followed by a '0' is
        recorded, and the differences between successive positions are
        returned as a numpy array.  The first 30 bits and the first 30
        differences are printed.
        """
        bits = "".join("{0:08b}".format(255 - byte)[::-1] for byte in TX2)
        print("Loopback bits:",bits[:30])

        # Index of every '1' that is immediately followed by a '0'.
        falling = [
            pos
            for pos in range(len(bits) - 1)
            if bits[pos] == '1' and bits[pos + 1] == "0"
        ]
        edges = np.array(falling)
        gaps = edges[1:] - edges[:-1]
        print("Loopback delay:",gaps[:30])
        return gaps
    
    class hbat_pico_io(hwdev):
        """Serial-port I/O backend (logged as "hba-pico").

        Requests are framed as [server, length, function, register, payload...,
        CRC low, CRC high], Manchester-encoded with ManchesterEncode and written
        to the serial port.  Replies are read back as one hexadecimal counter
        value per line, converted to delays, normalised (NormDelays), decoded to
        bytes (Decode) and CRC-checked.
        """

        def __init__(self,config):
            """Open the serial port and load the CRC lookup table.

            config: passed on to hwdev.__init__; config['parameters'][0] is
                used as the serial port name.
            """
            hwdev.__init__(self,config)
            port=str(config['parameters'][0])
            # 115200 baud, 0.1 s read timeout (GetDelay stops on the first empty read).
            self.ser = serial.Serial(port,115200,timeout=0.1)  # open serial port
            logging.info("hba-pico connecting to: "+self.ser.name)         # check which port was really used
            self.CurrentChannel=0
            # NOTE(review): relative path, so the table is resolved against the
            # current working directory of the process.
            CRCtab=np.load("CRC_HBAT1.npy")
            # Split every table entry into its low and high byte.
            self.CRCtabl=[d%256 for d in CRCtab]
            self.CRCtabh=[d//256 for d in CRCtab]

        def CRCcheck(self,S1):
            """Return the table-driven CRC of the byte sequence S1 as high*256+low.

            MakeRequest/MakeBroadcast assert that a packet with its own CRC
            appended (low byte first) checks to 0.
            """
            crcl=0
            crch=0
            for b in S1:
                i=crcl ^ b
                crcl=crch ^ self.CRCtabl[i]
                crch=self.CRCtabh[i]
            return crch*256+crcl

        def _MakePacket(self,serv,body):
            """Prefix body with [serv, length] and append its CRC, low byte first."""
            packet=[serv,len(body)+1]+body
            CRC=self.CRCcheck(packet)
            packet=packet+[CRC%256,CRC//256]
            # Sanity check: a packet followed by its own CRC must check to zero.
            assert(self.CRCcheck(packet)==0)
            return packet

        def MakeBroadcast(self,data,func=4,reg=0,serv1=1,serv2=16):
            """Build a broadcast packet (server address 0) carrying 32 data bytes.

            data: list of exactly 32 values (asserted).
            func, reg, serv1, serv2: header bytes placed in front of the data.
            Returns the complete packet as a list, CRC included.
            """
            assert(len(data)==32)
            return self._MakePacket(0,[func,reg,serv1,serv2]+data)

        def MakeRequest(self,serv,data=None,func=5,reg=0):
            """Build a request packet addressed to server `serv`.

            data: optional list of payload bytes (defaults to no payload).
            func, reg: function code and register placed in front of the payload.
            Returns the complete packet as a list, CRC included.
            """
            # None instead of a shared mutable [] default.
            payload=[] if data is None else data
            return self._MakePacket(serv,[func,reg]+payload)

        def SetSW1(self,channel):
            """Remember `channel` as the current channel; always returns True."""
            if (channel)==self.CurrentChannel: return True
            logging.debug("SetChannelbit=%i" % channel)
            self.CurrentChannel=channel
            return True

        def SetChannel(self,channel):
            """Remember `channel` as the current channel; always returns True."""
            if (channel)==self.CurrentChannel: return True
            logging.debug("SetChannel=%i" % channel)
            self.CurrentChannel=channel
            return True

        def _Transmit(self,packet):
            """Log, Manchester-encode and write one packet to the serial port."""
            logging.debug(str(("Packet to TX",packet)))
            self.ser.write(bytearray(ManchesterEncode(packet)))

        def i2csetget(self,addr,data,reg=None,read=0):
            """Perform one transfer, selected by `read`.

            addr: ignored; the current channel (SetChannel/SetSW1) is used as
                the server address instead.
            data: list of byte values.  For a read it is filled in place with
                the reply; for a write it is the payload; for read==3 data[0]
                is a delay in milliseconds.
            reg:  register number placed in the request.
            read: 3 = just sleep data[0] ms; 1 = read; anything else = write.

            The function code sent is 2*len(data) for a write and
            2*len(data)+1 for a read.

            Returns True on success.  A read returns False when the reply
            length does not match len(data).
            """
            addr = self.CurrentChannel
            logging.debug("I2cget %s %s %s %s",addr,data,reg,read)
            if read==3:
                time.sleep(data[0]/1000.)
                return True
            if read==1:
                func=len(data)*2+1
                self._Transmit(self.MakeRequest(addr,[],func,reg))
                data2=self.GetPackets()
                logging.debug(str(("Data RX",data2)))
                # Reply is copied into the caller's buffer.
                data[:len(data2)]=data2
                return len(data2)==len(data)
            func=len(data)*2
            self._Transmit(self.MakeRequest(addr,data,func,reg))
            # Drain whatever comes back so it does not pollute the next transfer.
            self.GetPackets()
            return True

        def GetDelay(self,Start=0x1000):
            """Read delay values from the serial port until a read returns nothing.

            Every line is expected to start with a hexadecimal number (at most
            the first 8 bytes are used); the delay is Start minus that number.
            Lines of exactly 2 bytes and zero delays are skipped.

            Start: counter start value the received numbers are subtracted from.
            Returns the list of delays in the order received.
            """
            D=[]
            while True:
                s=self.ser.readline()
                if len(s)==2: continue   # presumably a bare line terminator - TODO confirm
                a=s[:8]
                if a==b'': break         # nothing received: end of data
                a=Start-int(str(a,'UTF-8'),16)
                if (a==0): continue
                D.append(a)
            return D

        def GetPackets(self):
            """Read, decode and CRC-check all pending packets.

            Returns the payload (bytes between the 2-byte header and the 2-byte
            CRC) of the last packet that has a valid CRC and a first byte above
            0x80, or [] when there is none.  Logs "Communication error!" when
            nothing at all was decoded.
            """
            D=self.GetDelay()
            D2=NormDelays(D)
            S=Decode(D2)
            RXdata=[]
            if not S: logging.info("Communication error!")
            while len(S)>0:
                if len(S)<2:
                    # A lone trailing byte has no length field; nothing to parse.
                    logging.debug(str(("Discarding truncated packet:",S)))
                    break
                L=S[1]
                S1=S[:L+3]   # header (2) + L-1 body bytes + CRC (2)
                CRC=self.CRCcheck(S1)
                logging.debug(str(("Received packet:",S1,"CRC=",CRC)))
                if (CRC==0) and (S1[0]>0x80):
                    # Payload of this packet only, not of everything still queued in S.
                    RXdata=S1[2:-2]
                    logging.info(str(("Reply from Addr=",S1[0]-0x80," data=",[hex(a) for a in RXdata])))
                S=S[L+3:]
            return RXdata