Skip to content
Snippets Groups Projects
Select Git revision
  • be2930cb32681f7fd875b197bc9f765b50a542c6
  • main default protected
  • hookup-to-django
  • small-test-to-test-pipeline
  • connect-to-adex_cache_fastapi-database
  • improve_alta_algorithm
6 results

convert_alta_to_adex_cache.py

Blame
  • Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    i2cthread.py 4.70 KiB
    #Start the correct I2C server connect to the pipes on a new process
    import logging
    from multiprocessing import Process
    from queuetypes import *
    from time import sleep
    #from i2cserv import I2Cswitch1
    #from i2cserv import I2C_dummy as I2C
    
    #from i2cserv import RCU
    #from i2cserv import CLK
    
    import yamlconfig as yc
    
    import yaml
    class AttrDict(object):
        def __init__(self, dct):
            self.dict = dct
    
        def __repr__(self):
            return repr(self.dict)
    
        def __getattr__(self, attr):
            try:
                val = self.dict[attr]
                if isinstance(val, dict):
                    val = AttrDict(val)
                return val
            except KeyError:
                raise AttributeError
    
    def runmethod(conf,Qout,methodid,mask):
    #    print("Run method",methodid,mask)
        var1=conf.getmethod(methodid)
        drv=var1.get('drivercls');
        for inst in var1['instructions']:
            for key,value in inst.items():
                if not(isinstance(value,list)): value=[value]
                logging.info(str(("Run instruction",key,value,mask)));
                if (key=='WAIT'):
                    sleep(value[0]/1000.)
                    continue;
                v1=conf.getvarid(key)
                if v1: 
                    if value[0]=='Update':
                       mask=getvar(conf,Qout,v1,mask)
                    else:
                       mask=setvar(conf,Qout,v1,value,mask)
                    continue;
                v1=conf.getmethodid(key)
                if v1: 
                    mask=runmethod(conf,Qout,v1,mask)
                    continue;
                v1=conf.getdevreg(key)
                if v1:
                    drv2=v1['drivercls']
                    if drv:  drv.Setdevreg(v1,value,mask)
                    elif drv2: drv2.Setdevreg(v1,value,mask)
                    else: logging.warn("Driver not specified for instruction"+key)
                    continue;
    #    def Setdevreg(self,devreg,value,mask=[],width=8,bitoffset=0):
                logging.warn("Unknown instruction "+key)
    #               drv=var1.get('drivercls');
    #               if not(drv): 
    #                   logging.warn(var1['name']+" driver not found!")
    #                   continue;
    #               for data in drv.OPCUAcallMethod(item.id,var1,item.data,item.mask):
    #                   Qout.put(data)
        return mask
    
    def setvar(conf,Qout,varid,data,mask):
        var1=conf.getvars()[varid]
        drv=var1.get('drivercls');
        if not(drv): 
            logging.warn(var1['name']+" driver not found!")
            return;
        for data in drv.OPCUASetVariable(varid,var1,data,mask):
            if len(mask)==len(data.mask): mask[:]=data.mask[:];
            elif len(mask)==0: mask=data.mask.copy();
            Qout.put(data)
        return mask;
    
    def getvar(conf,Qout,varid,mask):
        var1=conf.getvars()[varid]
        drv=var1.get('drivercls');
        if not(drv): 
            logging.warn(var1['name']+" driver not found!")
            return;
        for data in drv.OPCUAReadVariable(varid,var1,mask):
            if len(mask)==len(data.mask): mask[:]=data.mask[:];
            elif len(mask)==0: mask=data.mask.copy();
            Qout.put(data)
        return mask;
    
    
    def I2Cserver(Qin,Qout,name):
            logging.info("New i2c process "+name)
            conf=yc.yamlconfig(name)
            conf.linkdevices()
            conf.loaddrivers()
            conf.linkdrivers()
            while True:
               item = Qin.get()
               if item is None: break;
    #           print("TODO: Set busy")
    #           self.statevar.set_value("busy");
               #print("SetVar",item)#,self.conf.variables[item.id])
               if (item.type==InstType.varSet):
                   setvar(conf,Qout,item.id,item.data,item.mask)
               #    self.OPCUASetVariable(item.id,var1,item.data,item.mask)
               elif (item.type==InstType.varRead):
                   getvar(conf,Qout,item.id,item.mask)
               #    var1=self.conf.variables[item.id]
               #    self.OPCUAReadVariable(item.id,var1,item.data,item.mask)
               elif (item.type==InstType.method): runmethod(conf,Qout,item.id,item.mask)
               else: print("OPCUA call not implemented!");
    #           print("TODO: Set ready")
    #           if Qin.qsize()==0: self.statevar.set_value("ready");
            logging.info("End i2c process "+name)
    
    #        if name=='RCU':
    #          SW1=I2Cswitch1.I2Cswitch1(I2C.I2C1server)
    #          RCU1=RCU.RCU1(32,I2C.I2C1server,SW1.SetChannel,settings,Qin,Qout)
    #        elif name=='CLK':
    #          SW0=I2Cswitch1.I2Cswitch0(None) #Dummy switch as their is no switch on LTS
    #          RCU1=CLK.RCU1(1,I2C.I2C4server,SW0.SetChannel)
    #        else:
    #          logging.warn(str(("Unknown name",name," thread stopping")))
    #          return
    #        RCU1.load() #Load current register values from HW
    #        RCU1.RCUthread(Qin)
    
    def start(Qout,Qin,name):  
            thread1 = Process(target=I2Cserver, args=(Qin,Qout,name))
            thread1.start()
            return thread1