Select Git revision
I2C_serial_pi.py
-
Gijs Schoonderbeek authored
Update rd_unb2c.py + I2C_serial_PI.py now EEPROM + Frontpanel LED + Nodes QSFP + Nodes DDR are working
Gijs Schoonderbeek authoredUpdate rd_unb2c.py + I2C_serial_PI.py now EEPROM + Frontpanel LED + Nodes QSFP + Nodes DDR are working
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
pypcc2.py 2.99 KiB
import opcuaserv
try:
import queue
except ImportError:
import Queue as queue;
from rcu import RCU
from clk import CLK
from i2c import I2Cswitch1
import threading
import signal
import sys
import time
#import Vars
import logging
import argparse
# --- Command-line interface -------------------------------------------------
parser = argparse.ArgumentParser()
parser.add_argument(
    "-s", "--simulator",
    action="store_true",
    help="Do not connect to I2c, but simulate behaviour.",
)
parser.add_argument(
    "-p", "--port",
    type=int,
    default=4842,
    help="Port number to listen on [%(default)s].",
)
parser.add_argument(
    "-l", "--loglevel",
    type=str,
    choices=["DEBUG","INFO","WARNING","ERROR"],
    default="INFO",
    help="Log level [%(default)s].",
)
args = parser.parse_args()

# --- Logging ----------------------------------------------------------------
# Map the textual level given on the command line onto the numeric constant
# of the logging module; anything that does not resolve to an int is rejected.
loglevel_nr = getattr(logging, args.loglevel.upper(), None)
if not isinstance(loglevel_nr, int):
    raise ValueError('Invalid log level: %s' % args.loglevel)
logging.basicConfig(
    format='%(asctime)s [%(levelname)-8s,%(filename)-20s:%(lineno)-3d] %(message)s',
    level=loglevel_nr,
)

# --- I2C backend ------------------------------------------------------------
# With --simulator the dummy implementation is bound to the name I2C, so the
# rest of the script uses the same name for either backend.
if args.simulator:
    from i2c import I2C_dummy as I2C
else:
    from i2c import I2C
# Instruction queues: the OPC-UA server side puts work on them, the hardware
# threads take it off again.
Q1 = queue.Queue()  # RCUs
Q2 = queue.Queue()  # CLK

# Set up the OPC-UA server (running in its own thread)
opcuaserv.InitServer(port=args.port)
logging.info("OPC-UA Server started")

SW1 = I2Cswitch1.I2Cswitch1(I2C.I2C1server)

# NOTE: this rebinds the name RCU from the imported module to the RCU1
# instance; every later use of RCU in this script refers to the instance.
RCU = RCU.RCU1(32, I2C.I2C1server, SW1.SetChannel)
RCU.AddVars(Q1, opcuaserv.AddVarR, opcuaserv.AddVarW)
RCU.AddMethod(Q1, opcuaserv.Addmethod)
RCU.load()  # Load current register values from HW

# CLK handling is currently switched off; the intended calls are kept here
# for reference.
#CLK=CLK.RCU1(1,I2C.I2C1server,SW1.SetChannel)
#CLK.AddVars(Q2,opcuaserv.AddVarR,opcuaserv.AddVarW)
#CLK.AddMethod(Q2,opcuaserv.Addmethod)
#CLK.load() #Load current register values from HW

#logging.debug(str(("I2C bytes=",I2C.I2Ccounter)))

# Disabled early exit: flipping the condition stops the server and quits
# right after the initial load, before any worker thread is started.
if False:
    opcuaserv.server.stop()
    exit()

RCUthread1 = RCU.start(Q1)
#CLKthread1=CLK.start(Q2)

# Run flag polled by the timer thread and by the main wait loop; the SIGINT
# handler clears it.
RunTimer = True
def TimerThread(Q1,RCU):
    """Periodically ask the RCU object to queue a monitoring request.

    Registers the OPC-UA variable "RCU_monitor_rate" (created with the value
    10) and then wakes up once per second for as long as the module-level
    flag RunTimer is true. On every wake-up the current value of that
    variable is read:

    * a value of 0 skips the tick entirely (the tick counter is not advanced);
    * otherwise the tick counter is advanced, and once it has reached the
      configured value RCU.Queue_Monitor(Q1) is called and the counter is
      reset -- unless Q1 already holds more than 3 entries, in which case
      the request is postponed and retried on the next tick.

    Q1  -- instruction queue passed on to RCU.Queue_Monitor.
    RCU -- object providing Queue_Monitor(queue).
    """
    rate_var = opcuaserv.AddVar("RCU_monitor_rate", 10)
    ticks = 0  # one-second ticks counted since the last monitor request
    while RunTimer:
        time.sleep(1)
        period = rate_var.get_data_value().Value.Value
        if period == 0:
            continue
        ticks += 1
        due = ticks >= period
        # Not yet time, or the queue is still backed up: try again next tick.
        if not due or Q1.qsize() > 3:
            continue
        ticks = 0
        logging.debug(str(("I2C bytes=",I2C.I2Ccounter," Qlength=",Q1.qsize())))
        RCU.Queue_Monitor(Q1)
    logging.info("End Timer thread")
# Run the periodic monitor trigger in a thread of its own.
Timerthread1 = threading.Thread(
    args=(Q1, RCU),
    target=TimerThread,
)
Timerthread1.start()
# on SIGINT: stop thread(s) by adding None to instruction queue(s)
def signal_handler(sig, frame):
    """Request an orderly shutdown of the worker threads.

    Puts None on both instruction queues (Q1 first, then Q2) and clears the
    module-level RunTimer flag, which ends the timer thread's loop and the
    main wait loop. The sig and frame arguments are accepted because the
    signal module passes them to every handler; they are not used.
    """
    global RunTimer
    logging.info('Stop RCU thread')
    for instruction_queue in (Q1, Q2):
        instruction_queue.put(None)
    logging.info('Stop timer thread')
    RunTimer = False
# Main section: install the SIGINT handler, start serving, and idle until a
# shutdown is requested. The cleanup below runs on every exit path.
signal.signal(signal.SIGINT, signal_handler)
time.sleep(1)
try:
    # Started inside the try so that a failure here still releases the
    # worker threads instead of leaving them running.
    opcuaserv.start()
    # Do nothing; the work happens in the server and worker threads.
    while RunTimer:
        time.sleep(1)
finally:
    if RunTimer:
        # We got here without the SIGINT handler having run (e.g. because of
        # an exception). Signal the workers ourselves, otherwise the joins
        # below would wait forever: the timer thread loops while RunTimer is
        # true and the RCU thread is stopped by a None on its queue.
        RunTimer = False
        Q1.put(None)
    try:
        logging.info("Stop OPC-UA server")
        opcuaserv.server.stop()
    finally:
        # Always wait for the workers, even if stopping the server failed.
        RCUthread1.join()
        # CLKthread1.join()
        Timerthread1.join()