Skip to content
Snippets Groups Projects
Commit d0b217f5 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'L2SS-188_2021-05-27-Branched_from_master-UDP_recv-SST_client' into 'master'

Resolve L2SS-188 "2021 05 27 branched from master udp recv sst client"

Closes L2SS-188

See merge request !50
parents 6341a544 65c8c87d
No related branches found
No related tags found
1 merge request!50Resolve L2SS-188 "2021 05 27 branched from master udp recv sst client"
......@@ -36,13 +36,6 @@
"1": {
"example_device": {
"LTS/example_device/1": {
"attribute_properties": {
"Ant_mask_RW": {
"archive_period": [
"600000"
]
}
},
"properties": {
"OPC_Server_Name": [
"host.docker.internal"
......@@ -62,13 +55,6 @@
"1": {
"ini_device": {
"LTS/ini_device/1": {
"attribute_properties": {
"Ant_mask_RW": {
"archive_period": [
"600000"
]
}
},
"properties": {
"OPC_Server_Name": [
"host.docker.internal"
......@@ -88,13 +74,6 @@
"1": {
"APSCTL": {
"LTS/APSCTL/1": {
"attribute_properties": {
"Ant_mask_RW": {
"archive_period": [
"600000"
]
}
},
"properties": {
"OPC_Server_Name": [
"ltspi.astron.nl"
......@@ -110,17 +89,23 @@
}
}
},
"Statistics": {
"1": {
"SST": {
"LTS/SST/1": {
"properties": {
"SST_Port": [
"5001"
]
}
}
}
}
},
"SNMP": {
"1": {
"SNMP": {
"LTS/SNMP/1": {
"attribute_properties": {
"Ant_mask_RW": {
"archive_period": [
"600000"
]
}
},
"properties": {
"SNMP_community": [
"public"
......
# -*- coding: utf-8 -*-
#
# This file is part of the Statistics project
#
#
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" Statistics Device Server for LOFAR2.0
"""
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
from clients.sst_client import sst_client
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
import numpy
__all__ = ["SST", "main"]
@device_logging_to_python({"device": "SST"})
class SST(hardware_device):
# -----------------
# Device Properties
# -----------------
SST_Port = device_property(
dtype='DevUShort',
mandatory=True
)
# ----------
# Attributes
# ----------
# --------
# SST client annotation consists of a dict that contains the parameter name that needs to be read.
# Example: comms_annotation={"parameter": "this_value_R"}
packet_count_R = attribute_wrapper(comms_annotation={"parameter": "packet_count_R"}, datatype=numpy.int64)
last_packet_timestamp_R = attribute_wrapper(comms_annotation={"parameter": "last_packet_timestamp_R"}, datatype=numpy.int64)
queue_percentage_used_R = attribute_wrapper(comms_annotation={"parameter": "queue_percentage_used_R"}, datatype=numpy.double)
# --------
# overloaded functions
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
try:
self.sst_client.stop()
except Exception as e:
self.warn_stream("Exception while stopping sst_client in configure_for_off function: {}. Exception ignored".format(e))
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the statistics device."""
self.sst_client = sst_client("0.0.0.0", self.SST_Port, self.Fault, self)
# map an access helper class
for i in self.attr_list():
try:
i.set_comm_client(self.sst_client)
except Exception as e:
# use the pass function instead of setting read/write fails
i.set_pass_func()
self.warn_stream("error while setting the sst attribute {} read/write function. {}. using pass function instead".format(i, e))
pass
self.sst_client.start()
# --------
# Commands
# --------
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the Statistics Device module."""
return run((SST,), args=args, **kwargs)
if __name__ == '__main__':
main()
import queue
from threading import Thread
import socket
from util.comms_client import CommClient
from queue import Queue
import numpy
import logging
import socket
from datetime import datetime
from multiprocessing import Value, Array
import time
__all__ = ["sst_client"]
class sst_client(CommClient):
"""
Connects to OPC-UA in the foreground or background, and sends HELLO
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, host, port, fault_func, streams, try_interval=2, queuesize=1024, buffersize=9000):
"""
Create the sst client and connect() to it and get the object node
"""
self.host = host
self.port = port
self.timeout = 0.1
self.buffersize = buffersize
self.queuesize = queuesize
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Function used to connect to the client.
"""
if not self.connected:
self.queue = Queue(maxsize=self.queuesize)
self.udp = UDP_Receiver(self.host, self.port, self.queue, self.streams, self.buffersize, self.timeout)
self.sst = SST(self.queue, self.streams)
return super().connect()
def disconnect(self):
del self.udp
del self.sst
del self.queue
return super().disconnect()
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
parameter = annotation.get('parameter', None)
if parameter is None:
raise Exception("No SST parameter was given in the annotation: %s", annotation)
return parameter
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
the OPC ua read/write functions require the dimensionality and the type to be known
"""
return
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
SST_param = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
self.setup_value_conversion(attribute)
def read_function():
return [self.sst.parameters[SST_param]]
def write_function(value):
"""
Not used here
"""
pass
return read_function, write_function
class UDP_Receiver(Thread):
"""
This class provides a small wrapper for the OPC ua read/write functions in order to better organise the code
"""
def __init__(self, host, port, queue, streams, buffersize=9000, timeout=0.1):
self.streams = streams
self.queue = queue
self.host = host
self.port = port
self.buffersize = buffersize
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.bind((self.host, self.port))
self.sock.settimeout(timeout)
self.stream_on = True
super().__init__()
self.start()
def run(self):
# all variables are manually defined and are updated each time
self.streams.debug_stream("starting UDP thread with port {} and host {}".format(self.port, self.host))
while self.stream_on:
try:
packet = [bytearray(self.buffersize)]
self.sock.recvmsg_into(packet[0:self.buffersize])
self.queue.put(packet)
except socket.timeout:
pass
except queue.Full:
pass
def __del__(self):
self.stream_on = False
self.join()
class SST(Thread):
def __init__(self, queue, streams):
self.streams = streams
self.queue = queue
self.last_packet = None
self.parameters = {
"packet_count_R": numpy.int64(0),
"last_packet_timestamp_R": numpy.int64(0),
"queue_percentage_used_R": numpy.double(100 * self.queue.qsize() / self.queue.maxsize)
}
super().__init__()
self.start()
def run(self):
self.streams.debug_stream("starting SST thread")
while True:
packet = self.queue.get()
if packet is None:
self.queue.clear()
break
self.process_packet(packet)
def __del__(self):
self.queue.put(None)
self.join()
def process_packet(self, packet):
self.parameters["packet_count_R"] += 1
self.parameters["last_packet_timestamp_R"] = numpy.int64(int(time.time()))
self.parameters["queue_percentage_used_R"] = numpy.double(100 * self.queue.qsize() / self.queue.maxsize)
\ No newline at end of file
import socket
import time
i = 1
UDP_IP = "127.0.0.1"
UDP_PORT = 5001
MESSAGE = "{}".format(i)
print("UDP target IP: %s" % UDP_IP)
print("UDP target port: %s" % UDP_PORT)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # create UDP socket
while True:
sock.sendto(bytes(MESSAGE, "utf-8"), (UDP_IP, UDP_PORT))
i += 1
MESSAGE = "{}".format(i)
#sleep for an arbitrary amount of time. Currently 0.2 settings for visual testing.
time.sleep(0.2)
#
# Docker compose file that launches an interactive iTango session.
#
# Connect to the interactive session with 'docker attach itango'.
# Disconnect with the Docker deattach sequence: <CTRL>+<P> <CTRL>+<Q>
#
# Defines:
# - itango: iTango interactive session
#
# Requires:
# - lofar-device-base.yml
#
version: '2'
services:
device-statistics:
image: device-statistics
# build explicitly, as docker-compose does not understand a local image
# being shared among services.
build:
context: lofar-device-base
container_name: ${CONTAINER_NAME_PREFIX}device-statistics
network_mode: ${NETWORK_MODE}
volumes:
- ${TANGO_LOFAR_CONTAINER_MOUNT}
environment:
- TANGO_HOST=${TANGO_HOST}
entrypoint:
- /usr/local/bin/wait-for-it.sh
- ${TANGO_HOST}
- --timeout=30
- --strict
- --
- python3 -u ${TANGO_LOFAR_CONTAINER_DIR}/devices/Statistics.py LTS -v
restart: on-failure
......@@ -23,6 +23,10 @@ services:
volumes:
- ${TANGO_LOFAR_CONTAINER_MOUNT}
- ${HOME}:/hosthome
ports:
- "5001:5001/udp"
expose:
- "5001/udp"
environment:
- TANGO_HOST=${TANGO_HOST}
- XAUTHORITY=${XAUTHORITY}
......
%% Cell type:code id:waiting-chance tags:
``` python
import time
import numpy
```
%% Cell type:code id:moving-alexandria tags:
``` python
d=DeviceProxy("LTS/test_device/1")
d=DeviceProxy("LTS/SST/1")
```
%% Cell type:code id:ranking-aluminum tags:
``` python
state = str(d.state())
if state == "OFF":
d.initialise()
time.sleep(1)
state = str(d.state())
if state == "STANDBY":
d.on()
state = str(d.state())
if state == "ON":
print("Device is now in on state")
```
%% Output
Device is now in on state
%% Cell type:code id:beneficial-evidence tags:
``` python
attr_names = d.get_attribute_list()
for i in attr_names:
exec("value = print(i, d.{})".format(i))
```
%% Output
bool_scalar_R [False]
bool_scalar_RW [False]
int64_spectrum_R [0 0 0 0 0 0 0 0]
str_spectrum_RW ('', '', '', '', '', '', '', '')
double_image_R [[0. 0.]
[0. 0.]
[0. 0.]
[0. 0.]
[0. 0.]
[0. 0.]
[0. 0.]
[0. 0.]]
double_image_RW [[0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0.]]
int32_scalar_R [0]
uint16_spectrum_RW [0 0 0 0 0 0 0 0]
float32_image_R [[0. 0. 0. 0. 0. 0. 0. 0.]
[0. 0. 0. 0. 0. 0. 0. 0.]]
uint8_image_RW [[0 0]
[0 0]
[0 0]
[0 0]
[0 0]
[0 0]
[0 0]
[0 0]]
tr_tod_R [0]
tr_uptime_R [0]
State <function __get_command_func.<locals>.f at 0x7f1c88a29e18>
Status <function __get_command_func.<locals>.f at 0x7f1c88a5abf8>
packet_count_R [55]
last_packet_timestamp_R [1623249385]
queue_percentage_used_R [0.]
State <function __get_command_func.<locals>.f at 0x7fcb205fd0d0>
Status <function __get_command_func.<locals>.f at 0x7fcb205fd0d0>
%% Cell type:code id:sporting-current tags:
``` python
d.RCU_mask_RW = [False, False, False, False, False, False, False, False, False, False, False, False,
False, False, False, False, False, False, False, False, False, False, False, False,
False, False, False, False, False, False, False, False,]
time.sleep(1)
print(d.RCU_mask_RW)
monitor_rate = d.RCU_monitor_rate_RW
print("current monitoring rate: {}, setting to {}".format(monitor_rate, monitor_rate + 1))
monitor_rate = monitor_rate + 1
time.sleep(1)
```
%% Output
3.0
%% Cell type:code id:sharing-mechanics tags:
``` python
```
%% Cell type:code id:ruled-tracy tags:
``` python
```
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment