Skip to content
Snippets Groups Projects
Commit bc2df747 authored by Paulus Kruger's avatar Paulus Kruger
Browse files

order OPCUA changes using priority queue

parent 2320ec09
Branches
No related tags found
No related merge requests found
Pipeline #84818 passed
...@@ -6,7 +6,7 @@ import time ...@@ -6,7 +6,7 @@ import time
#from opcua import ua, Server #from opcua import ua, Server
from asyncua.sync import ua, Server from asyncua.sync import ua, Server
from datetime import datetime; from datetime import datetime;
from queue import Queue from queue import PriorityQueue
import logging import logging
#import Vars #import Vars
#import HWconf #import HWconf
...@@ -22,17 +22,17 @@ class SubHandler(object): ...@@ -22,17 +22,17 @@ class SubHandler(object):
Subscription Handler. To receive events from server for a subscription Subscription Handler. To receive events from server for a subscription
""" """
def __init__(self): def __init__(self):
self.datachange_queue=Queue() self.datachange_queue=PriorityQueue()
def check_datachange(self,timeout): def check_datachange(self,timeout):
while True: while True:
try: try:
node,val=self.datachange_queue.get(timeout=timeout) _,val,node=self.datachange_queue.get(timeout=timeout)
except: except:
break break
nodeid=node.nodeid.Identifier nodeid=node.nodeid.Identifier
vname,myvar,v,reader,opcvar=Vars_W[nodeid] vname,myvar,v,reader,opcvar=Vars_W[nodeid]
# val=(val if isinstance(val, list) else [val] ) # val=(val if isinstance(val, list) else [val] )
logging.info(str(("Datachange callback",nodeid,vname,val))) logging.warn(str(("Datachange",vname,val[:16] if isinstance(val,list) else val)))
node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously))) node.set_value(ua.DataValue(val, ua.StatusCode(ua.StatusCodes.GoodCompletesAsynchronously)))
for r in reader: for r in reader:
r.setvar(v,val) r.setvar(v,val)
...@@ -41,10 +41,11 @@ class SubHandler(object): ...@@ -41,10 +41,11 @@ class SubHandler(object):
# NOTE: OPC variables can not be updates in the datachange_notification when using asyncua.sync!! So we put them in a queue. # NOTE: OPC variables can not be updates in the datachange_notification when using asyncua.sync!! So we put them in a queue.
# print("Python: New data change event", node, val,data) # print("Python: New data change event", node, val,data)
if not(running): return if not(running): return
logging.info(str(("Datachange callback",node.nodeid.Identifier,data.monitored_item.Value.StatusCode))) sourcetime=data.monitored_item.Value.SourceTimestamp
logging.info(str(("Datachange callback",node.nodeid.Identifier,None if sourcetime is None else sourcetime.timestamp(),data.monitored_item.Value.StatusCode)))
if data.monitored_item.Value.StatusCode != ua.StatusCode(ua.StatusCodes.Good): return if data.monitored_item.Value.StatusCode != ua.StatusCode(ua.StatusCodes.Good): return
# logging.warning(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode))) # logging.warning(str(("Python: New client data change event", node, val, data.monitored_item.Value.StatusCode)))
self.datachange_queue.put([node,val]) self.datachange_queue.put((0 if sourcetime is None else sourcetime.timestamp(),val,node))
# myvar2.Value.Value=val # myvar2.Value.Value=val
# myvar2.SourceTimestamp = datetime.utcnow() # myvar2.SourceTimestamp = datetime.utcnow()
......
...@@ -190,7 +190,7 @@ class yamlreader(yamlconfig): ...@@ -190,7 +190,7 @@ class yamlreader(yamlconfig):
if not(mask): continue; if not(mask): continue;
mask=Find(self.conf['variables'],'name',mask) mask=Find(self.conf['variables'],'name',mask)
if not(mask): continue; if not(mask): continue;
mask=mask.get('OPCW',None) mask=mask.get('OPCR',None)
if (mask==None): continue; if (mask==None): continue;
v['maskOPC']=mask v['maskOPC']=mask
if not self.monitorvarid is None: if not self.monitorvarid is None:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment