Commit 2ec97a93 authored by mancini's avatar mancini

First commit

parents
import logging
import argparse
from time import sleep
from lib.opc_interface import define_get_crosslet_statistics_parameters, setup_server_metadata, register_namespace,\
define_rcu_object
# ----------------------------------MAIN CODE LOGIC
def setup_logging():
"""
Setup the logging system
"""
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(name)s: %(message)s',
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logging.INFO)
__MODE_NOT_SET_DEFAULT = -2
def init():
"""
Init phase of the program
"""
setup_logging()
def setup_command_argument_parser():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Record cross correlation statistics and offers it through OPC UA")
parser.add_argument('--port', help='tcp port', type=int)
return parser
def main():
init()
parser = setup_command_argument_parser()
arguments = parser.parse_args()
server = setup_server_metadata()
namespace_idx = register_namespace(server)
rcu_obj = define_rcu_object(server, namespace_idx)
define_get_crosslet_statistics_parameters(rcu_obj, namespace_idx)
try:
# Start the server.
server.start()
while True:
sleep(1)
except:
pass
finally:
# close connection, remove subcsriptions, etc
server.stop()
if __name__=='__main__':
main()
\ No newline at end of file
import logging
import argparse
# ----------------------------------MAIN CODE LOGIC
def setup_logging():
"""
Setup the logging system
"""
logging.basicConfig(
format='%(asctime)s - %(levelname)s - %(name)s: %(message)s',
datefmt="%m/%d/%Y %I:%M:%S %p",
level=logging.INFO)
__MODE_NOT_SET_DEFAULT = -2
def init():
"""
Init phase of the program
"""
setup_logging()
def setup_command_argument_parser():
parser = argparse.ArgumentParser(
formatter_class=argparse.RawDescriptionHelpFormatter,
description="Read cross correlation statistics and offers it through OPC UA")
parser.add_argument('url', help='url to connect to', type=int)
return parser
def main():
init()
parser = setup_command_argument_parser()
arguments = parser.parse_args()
if __name__=='__main__':
main()
\ No newline at end of file
import logging
from opcua import ua, uamethod, Server
import numpy
DEFAULT_URI = "http://lofar.eu"
# method to be exposed through server
# uses a decorator to automatically convert to and from variants
@uamethod
def get_crosslet_statistics(parent, subband, integration_time):
import time
print("Ask to record subband {} with integration time {}", subband,
integration_time)
crosslets = numpy.zeros([96, 96]).tolist()
timestamp = time.localtime()
return timestamp, crosslets
def define_get_crosslet_statistics_parameters(object, idx):
subband = ua.Argument()
subband.Name = "subband"
subband.DataType = ua.NodeId(ua.ObjectIds.Int16)
subband.ValueRank = -1
subband.ArrayDimensions = []
subband.Description = ua.LocalizedText("subband to use to collect the crosslet statistics")
integration_time = ua.Argument()
integration_time.Name = "integration_time"
integration_time.DataType = ua.NodeId(ua.ObjectIds.Int16)
integration_time.ValueRank = -1
integration_time.ArrayDimensions = []
integration_time.Description = ua.LocalizedText("integration time")
cross = ua.Argument()
cross.Name = "crosslet_statistics"
cross.DataType = ua.NodeId(ua.ObjectIds.ComplexNumberType)
cross.ValueRank = 2
cross.ArrayDimensions = [96, 96]
cross.Description = ua.LocalizedText("crosscorrelation statistics")
timestamp = ua.Argument()
timestamp.Name = "timestamp"
timestamp.DataType = ua.NodeId(ua.ObjectIds.Int64)
timestamp.ValueRank = -1
timestamp.ArrayDimensions = []
timestamp.Description = ua.LocalizedText("timestamp")
return object.add_method(idx, "record_cross", get_crosslet_statistics,
[subband, integration_time],
[timestamp, cross])
def setup_server_metadata(server=None, port=55555):
if server is None:
server = Server()
server.set_endpoint("opc.tcp://localhost:%d/LOFAR2.0/CROSSECHO" % port)
server.set_server_name("LOFAR2.0 Crosscorrelations statistic echo server (CROSSECHO)")
return server
def register_namespace(server, uri=DEFAULT_URI):
idx = server.register_namespace(uri=uri)
return idx
def define_rcu_object(server, idx):
# get Objects node, this is where we should put our custom stuff
objects = server.get_objects_node()
# populating our address space
station_metric_folder = objects.add_folder(idx, "StationMetrics")
rcu = station_metric_folder.add_object(idx, "RCU")
return rcu
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment