Skip to content
Snippets Groups Projects
Commit 9ec0408f authored by Paulus Kruger's avatar Paulus Kruger
Browse files

pytr added

parent a5e36eb5
No related branches found
No related tags found
No related merge requests found
Pipeline #69214 failed
from asyncua.sync import ua, Server
from time import sleep
import subprocess
import signal
import logging
from queue import Queue
port=4899
logging.basicConfig(encoding='utf-8', level=logging.INFO)
#stop program neatly when stopped
#global running
#running=True;
datachanged=Queue()
def signal_handler(sig, frame):
global running
logging.warn('Stop signal received!')
datachanged.put(-1)
# server.stop()
# running=False
signal.signal(signal.SIGINT, signal_handler)
class SubHandler(object):
"""
Subscription Handler. To receive events from server for a subscription
"""
# def __init__(self):
# self.datachange_queue=Queue()
def check_datachange(self,timeout):
while True:
try:
node,val=self.datachange_queue.get(timeout=timeout)
except:
break
nodeid=node.nodeid.Identifier
# vname,myvar,v,reader,opcvar=Vars_W[nodeid]
# val=(val if isinstance(val, list) else [val] )
logging.info(str(("Datachange callback2",nodeid)))
# datachanged.put(
# node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
def datachange_notification(self, node, val, data):
logging.info(str(("Datachange callback",node.nodeid.Identifier,data.monitored_item.Value.StatusCode)))
datachanged.put(node.nodeid.Identifier)
def event_notification(self, event):
logging.info(str(("Python: New event", event)))
def start_translator(ObjectId,tr_action):
serv=opcua_translator.get_value()+'.service'
logging.info(str(("start translator:",serv,tr_action)))
subprocess.run(['sudo','systemctl',tr_action,serv])
if True:
pcb_type=''
i2c_nr=-1
i2c1=subprocess.run(['i2cdetect','-y','1'],stdout=subprocess.PIPE).stdout.decode()
i2c5=subprocess.run(['i2cdetect','-y','5'],stdout=subprocess.PIPE).stdout.decode()
i2c1=i2c1.split('\n')[6].split()
i2c5=i2c5.split('\n')[6].split()
if i2c5[1]=='50':
logging.info("EEPROM detected on I2C 5")
pcb_type='APSCT'
i2c_nr=5
elif i2c1[1]=='50':
logging.info("EEPROM detected on I2C 1")
pcb_type='CCD'
i2c_nr=1
# print(i2c1)
# print(i2c5)
def get_i2c(register,length):
if i2c_nr<0: return
i2c1=subprocess.run(['i2ctransfer','-y',str(i2c_nr),'w1@0x50',str(register),'r%i@0x50'%length],stdout=subprocess.PIPE).stdout.decode()
i2c1=i2c1.split('\n')[0] #first line
i2c1=[int(s,16) for s in i2c1.split()]
return i2c1
# print(i2c1)
def byte2int(A):
r=A[0]
for r2 in A[1:]: r=r*256+r2
return r
#print('ID=',byte2int(get_i2c('0xfc',4)))
#print('Version=',get_i2c('0x0',16))
#print('Serial=',get_i2c('0x20',16))
def EEPROM_code_changed(value):
if value!='4899':
logging.warning("Wrong EEPROM_code:"+str(value))
return
# try:
EEPROM_ID.set_value(byte2int(get_i2c('0xfc',4)))
# except:
# logging.error("Get ID failed")
if True:
# global server,running,PCCobj,DEBUGobj,idx,sub;
server = Server()
server.set_endpoint("opc.tcp://0.0.0.0:{}/".format(port))
idx = server.register_namespace("http://lofar.eu")
obj = server.nodes.objects
# starting!
logging.info("Start server");
server.start()
handler = SubHandler()
sub = server.create_subscription(50, handler)
opcua_type = obj.add_variable(idx, 'pytr_type', pcb_type)
#Start/stop/enable/disable translators
opcua_translator = obj.add_variable(idx, 'pytr_translator_select_RW', 'CCDTR')
# print("***",opcua_translator,opcua_translator.get_value())
opcua_translator.set_writable()
mth_start = obj.add_method(idx, 'pytr_start',
lambda ObjectId,action="restart" : start_translator(ObjectId,action), [],[] )
mth_stop = obj.add_method(idx, 'pytr_stop',
lambda ObjectId,action="stop" : start_translator(ObjectId,action), [],[] )
mth_enable = obj.add_method(idx, 'pytr_enable',
lambda ObjectId,action="enable" : start_translator(ObjectId,action), [],[] )
mth_disable = obj.add_method(idx, 'pytr_disable',
lambda ObjectId,action="disable" : start_translator(ObjectId,action), [],[] )
#Program EEPROM
EEPROM_code = obj.add_variable(idx, 'pytr_EEPROM_passcode_RW', 'None')
EEPROM_code.set_writable()
handle = sub.subscribe_data_change(EEPROM_code)
EEPROM_ID = obj.add_variable(idx, 'pytr_EEPROM_ID_R', 0)
EEPROM_number = obj.add_variable(idx, 'pytr_EEPROM_number_R', 'None')
EEPROM_number_new = obj.add_variable(idx, 'pytr_EEPROM_number_RW', 'None')
EEPROM_number_new.set_writable()
EEPROM_version = obj.add_variable(idx, 'pytr_EEPROM_version_R', 'None')
EEPROM_version_new = obj.add_variable(idx, 'pytr_EEPROM_version_RW', 'None')
EEPROM_version_new.set_writable()
# logging.info("Add variables:")
nodeid=0
while nodeid!=-1:
nodeid=datachanged.get()
if (nodeid==EEPROM_code.nodeid.Identifier): EEPROM_code_changed(EEPROM_code.get_value())
# sleep(10)
#P1.GetVarNames("",AddVar);
server.stop()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment