-
Thomas Juerges authored
I am using the standard CORBA::ULong goe now. But note that this is mapped per machine and not per universe!
Thomas Juerges authoredI am using the standard CORBA::ULong goe now. But note that this is mapped per machine and not per universe!
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
Crossecho.py 5.15 KiB
# -*- coding: utf-8 -*-
#
# This file is part of the Crossecho project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" LOFAR2.0 Station Software
Implementation of a Tango device on top of an existing OPC-UA server in Python3.
"""
# PyTango imports
import PyTango
from PyTango import DebugIt
from PyTango.server import run
from PyTango.server import Device, DeviceMeta
from PyTango.server import attribute, command, pipe
from PyTango.server import device_property
from PyTango import AttrQuality, DispLevel, DevState
from PyTango import AttrWriteType, PipeWriteType
# Additional import
# PROTECTED REGION ID(Crossecho.additionnal_import) ENABLED START #
from opcua import Client
import numpy
# PROTECTED REGION END # // Crossecho.additionnal_import
__all__ = ["Crossecho", "main"]
class Crossecho(Device):
"""
Implementation of a Tango device on top of an existing OPC-UA server in Python3.
"""
__metaclass__ = DeviceMeta
# PROTECTED REGION ID(Crossecho.class_variable) ENABLED START #
# PROTECTED REGION END # // Crossecho.class_variable
# -----------------
# Device Properties
# -----------------
OPC_Server_Name = device_property(
dtype='str', default_value="localhost"
)
OPC_Server_Port = device_property(
dtype='str', default_value="55555"
)
OPC_time_out = device_property(
dtype='uint', default_value=1000
)
# ----------
# Attributes
# ----------
RCU_modes = attribute(
dtype=('str',),
max_dim_x=1024,
)
crosslet_stat = attribute(
dtype=('double',),
max_dim_x=10240,
)
# -----
# Pipes
# -----
xlt_stat = pipe(
label="xlt",
)
# ---------------
# General methods
# ---------------
def init_device(self):
Device.init_device(self)
# PROTECTED REGION ID(Crossecho.init_device) ENABLED START #
try:
self.client = Client("opc.tcp://{}:{}/".format(self.OPC_Server_Name, self.OPC_Server_Port), self.OPC_time_out)
self.client.connect()
self.client.load_type_definitions()
objects = self.client.get_objects_node()
idx = self.client.get_namespace_index(DEFAULT_URI)
# Now getting a variable node using its browse path
self.opc_obj = self.client.get_root_node().get_child(["0:Objects",
"{}:StationMetrics".format(idx),
"{}:RCU".format(idx)])
self.logger.info("Connected to the OPC-UA server %s", self.OPC_Server_Name)
except:
self.logger.error("Failed to connect to the OPC-UA server %s. Traceback: %s", self.OPC_Server_Name, traceback.format_exc())
# PROTECTED REGION END # // Crossecho.init_device
def always_executed_hook(self):
# PROTECTED REGION ID(Crossecho.always_executed_hook) ENABLED START #
pass
# PROTECTED REGION EloggND # // Crossecho.always_executed_hook
def delete_device(self):
# PROTECTED REGION ID(Crossecho.delete_device) ENABLED START #
self.client.close_session()
self.client.close_secure_channel()
# PROTECTED REGION END # // Crossecho.delete_device
# ------------------
# Attributes methods
# ------------------
def read_RCU_modes(self):
# PROTECTED REGION ID(Crossecho.RCU_modes_read) ENABLED START #
return self.RCU_modes
# PROTECTED REGION END # // Crossecho.RCU_modes_read
def read_crosslet_stat(self):
# PROTECTED REGION ID(Crossecho.crosslet_stat_read) ENABLED START #
return crosslet_stat
# PROTECTED REGION END # // Crossecho.crosslet_stat_read
# -------------
# Pipes methods
# -------------
def read_xlt_stat(self):
# PROTECTED REGION ID(Crossecho.xlt_stat_read) ENABLED START #
return dict(x=self.crosslet_stat, y=self.RCU_modes)
# PROTECTED REGION END # // Crossecho.xlt_stat_read
# --------
# Commands
# --------
@command(
)
@DebugIt()
def record_cross(self):
# PROTECTED REGION ID(Crossecho.record_cross) ENABLED START #
if self.is_record_cross_allowed() is True:
timeStamp, self.crosslet_stat, self.RCU_modes = self.opc_obj.call_method("{}:record_cross".format(idx), self.subBand, self.integrationTime)
print("Timestamp is ", timeStamp)
print("Crosscorrelations are ", self.crosslet_stat)
print("RCU modes are", self.RCU_modes)
# PROTECTED REGION END # // Crossecho.record_cross
def is_record_cross_allowed(self):
# PROTECTED REGION ID(Crossecho.is_record_cross_allowed) ENABLED START #
return self.get_state() not in [DevState.OFF,DevState.INIT,DevState.FAULT]
# PROTECTED REGION END # // Crossecho.is_record_cross_allowed
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
# PROTECTED REGION ID(Crossecho.main) ENABLED START #
return run((Crossecho,), args=args, **kwargs)
# PROTECTED REGION END # // Crossecho.main
if __name__ == '__main__':
main()