Skip to content
Snippets Groups Projects
Commit c159e1b9 authored by Taya Snijder's avatar Taya Snijder
Browse files

updated ini example and hardware_device function names

parent e8f09f33
No related branches found
No related tags found
1 merge request!24Resolve #2021 "04 16 branched from master ini file device"
......@@ -15,18 +15,19 @@
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
#attribute extention and hardware device imports
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
import numpy
# Additional import
from clients.opcua_connection import OPCUAConnection
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from util.lofar_logging import device_logging_to_python, log_exceptions
import numpy
__all__ = ["APSCTL", "main"]
@device_logging_to_python({"device": "APSCTL"})
class APSCTL(hardware_device):
"""
......@@ -132,7 +133,7 @@ class APSCTL(hardware_device):
UNB2_FPGA_POL_PGM_IOUT_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_PGM_IOUT_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
UNB2_FPGA_POL_PGM_TEMP_R = attribute_wrapper(comms_annotation=["2:UNB2_FPGA_POL_PGM_TEMP_R"], datatype=numpy.double, dims=(N_unb,N_fpga))
@log_exceptions()
def delete_device(self):
"""Hook to delete resources allocated in init_device.
......@@ -148,13 +149,15 @@ class APSCTL(hardware_device):
# --------
# overloaded functions
# --------
def off(self):
@log_exceptions()
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
self.opcua_connection.stop()
def initialise(self):
@log_exceptions()
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
......
# -*- coding: utf-8 -*-
#
# This file wraps around a tango device class and provides a number of abstractions useful for hardware devices. It works together
#
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
"""
"""
# PyTango imports
from tango.server import run
from tango import AttrWriteType
# Additional import
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
__all__ = ["HW_dev"]
class HW_dev(hardware_device):
"""
This class is the minimal (read empty) implementation of a class using 'hardware_device'
"""
# ----------
# Attributes
# ----------
"""
attribute wrapper objects can be declared here. All attribute wrapper objects will get automatically put in a list (attr_list) for easy access
example = attribute_wrapper(comms_annotation="this is an example", datatype=numpy.double, dims=(8, 2), access=AttrWriteType.READ_WRITE)
...
"""
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
pass
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
"""
self.debug_stream("Shutting down...")
self.Off()
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def fault(self):
""" user code here. is called when the state is set to FAULT """
pass
def off(self):
""" user code here. is called when the state is set to OFF """
pass
def on(self):
""" user code here. is called when the state is set to ON """
pass
def standby(self):
""" user code here. is called when the state is set to STANDBY """
pass
def initialise(self):
""" user code here. is called when the sate is set to INIT """
pass
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the hardware device module."""
return run((HW_dev,), args=args, **kwargs)
if __name__ == '__main__':
main()
......@@ -117,13 +117,13 @@ class PCC(hardware_device):
# overloaded functions
# --------
@log_exceptions()
def off(self):
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
self.OPCua_client.stop()
@log_exceptions()
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the state is set to INIT """
# Init the dict that contains function to OPC-UA function mappings.
......
......@@ -99,14 +99,14 @@ class SDP(hardware_device):
# overloaded functions
# --------
@log_exceptions()
def off(self):
def configure_for_off(self):
""" user code here. is called when the state is set to OFF """
# Stop keep-alive
self.opcua_connection.stop()
@log_exceptions()
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the SDP."""
......
......@@ -15,11 +15,11 @@
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
# Additional import
# Additional import
from clients.SNMP_client import SNMP_client
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
import numpy
......@@ -70,7 +70,7 @@ class SNMP(hardware_device):
ip_route_mask_127_0_0_1_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.4.21.1.11.127.0.0.1", "type": "IpAddress"}, datatype=numpy.str_)
TCP_active_open_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.5.0", "type": "Counter32"}, datatype=numpy.str_)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access= AttrWriteType.READ_WRITE)
sys_contact_RW = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
sys_contact_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.1.4.0"}, datatype=numpy.str_)
TCP_Curr_estab_R = attribute_wrapper(comms_annotation={"oids": "1.3.6.1.2.1.6.9.0", "type": "Gauge"}, datatype=numpy.int64)
......@@ -97,7 +97,7 @@ class SNMP(hardware_device):
# --------
# overloaded functions
# --------
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the state is set to STANDBY """
# set up the SNMP ua client
......
from util.comms_client import CommClient
import snmp
import numpy
import traceback
__all__ = ["SNMP_client"]
snmp_to_numpy_dict = {
snmp.types.INTEGER: numpy.int64,
snmp.types.TimeTicks: numpy.int64,
snmp.types.OCTET_STRING: str,
snmp.types.OID: str
}
snmp_types = {
"Integer": numpy.int64,
"Gauge": numpy.int64,
"TimeTick": numpy.int64,
"Counter32": numpy.int64,
"OctetString": str,
"IpAddress": str,
"OID": str,
}
# numpy_to_snmp_dict = {
# numpy.int64,
# numpy.int64,
# str,
# }
class SNMP_client(CommClient):
"""
messages to keep a check on the connection. On connection failure, reconnects once.
"""
def start(self):
super().start()
def __init__(self, community, host, timeout, fault_func, streams, try_interval=2):
"""
Create the SNMP and connect() to it
"""
super().__init__(fault_func, streams, try_interval)
self.community = community
self.host = host
self.manager = snmp.Manager(community=bytes(community, "utf8"))
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
fault_func()
return
def connect(self):
"""
Try to connect to the client
"""
self.streams.debug_stream("Connecting to community: %s, host: %s", self.community, self.host)
self.connected = True
return True
def ping(self):
"""
ping the client to make sure the connection with the client is still functional.
"""
pass
def _setup_annotation(self, annotation):
"""
This class's Implementation of the get_mapping function. returns the read and write functions
"""
if isinstance(annotation, dict):
# check if required path inarg is present
if annotation.get('oids') is None:
AssertionError("SNMP get attributes require an oid")
oids = annotation.get("oids") # required
else:
TypeError("SNMP attributes require a dict with oid(s)")
return
if annotation.get('type') is not None:
dtype = annotation.get("type") # required
# actual_type = snmp_types[dtype]
else:
dtype = None
return oids, dtype
def setup_value_conversion(self, attribute):
"""
gives the client access to the attribute_wrapper object in order to access all data it could potentially need.
"""
dim_x = attribute.dim_x
dim_y = attribute.dim_y
return dim_x, dim_y
def get_oids(self, x, y, in_oid):
if x == 0:
x = 1
if y == 0:
y = 1
nof_oids = x * y
if nof_oids == 1:
# is scalar
if type(in_oid) is str:
# for ease of handling put single oid in a 1 element list
in_oid = [in_oid]
return in_oid
elif type(in_oid) is list and len(in_oid) == nof_oids:
# already is an array and of the right length
return in_oid
elif type(in_oid) is list and len(in_oid) != nof_oids:
# already is an array but the wrong length. Unable to handle this
raise ValueError("SNMP oids need to either be a single value or an array the size of the attribute dimensions. got: {} expected: {}x{}={}".format(len(in_oid),x,y,x*y))
else:
out_oids = []
for i in range(nof_oids):
out_oids.append(in_oid + ".{}".format(i+1))
return out_oids
def setup_attribute(self, annotation, attribute):
"""
MANDATORY function: is used by the attribute wrapper to get read/write functions. must return the read and write functions
"""
# process the annotation
oids, dtype = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dim_x, dim_y = self.setup_value_conversion(attribute)
oids = self.get_oids(dim_x, dim_y, oids)
if dtype is None:
def _read_function():
vars = self.manager.get(self.host, *oids)
value = []
for i in vars:
val = snmp_to_numpy_dict[type(i.value)](str(i.value))
value.append(val)
return value
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
else:
for i in range(len(oids)):
self.manager.set(self.host, oids[i], value[i])
else:
def _read_function():
vars = self.manager.get(self.host, *oids)
value = []
for i in vars:
val = snmp_to_numpy_dict[type(i.value)](str(i.value))
value.append(val)
return value
def _write_function(value):
if len(oids) == 1 and type(value) != list:
value = [value]
else:
for i in range(len(oids)):
self.manager.set(self.host, oids[i], snmp_types[dtype](value[i]))
# return the read/write functions
return _read_function, _write_function
from src.comms_client import CommClient
from util.comms_client import CommClient
import configparser
import numpy
......@@ -6,19 +6,28 @@ import numpy
numpy_to_ini_dict = {
numpy.int64: int,
numpy.double: float,
numpy.float64: float,
numpy.bool_: bool,
str: str
}
numpy_to_ini_get_dict = {
numpy.int64: configparser.ConfigParser.getint,
numpy.double: configparser.ConfigParser.getfloat,
numpy.float64: configparser.ConfigParser.getfloat,
numpy.bool_: configparser.ConfigParser.getboolean,
str: str
}
ini_to_numpy_dict = {
int: numpy.int64,
float: numpy.double,
float: numpy.float64,
bool: numpy.bool_,
str: str
str: numpy.str_
}
import os
class ini_client(CommClient):
"""
this class provides an example implementation of a comms_client.
......@@ -50,7 +59,13 @@ class ini_client(CommClient):
def connect(self):
files_path = [os.path.abspath(x) for x in os.listdir()]
self.streams.debug_stream(" %s", files_path)
self.config_file = open(self.filename, "rw")
try:
write_config(self.filename)
self.config_file = open(self.filename, "r")
except FileNotFoundError:
write_config(self.filename)
self.config_file = open(self.filename, "r")
self.connected = True # set connected to true
return True # if succesfull, return true. otherwise return false
......@@ -88,26 +103,48 @@ class ini_client(CommClient):
necessary data such as dimensionality and data type
"""
if attribute.dim_y > 1:
dims = (attribute.dim_y, attribute.dim_x)
else:
dims = (attribute.dim_x,)
dim_y = attribute.dim_y
dim_x = attribute.dim_x
dtype = attribute.numpy_type
return dims, dtype
return dim_y, dim_x, dtype
def _setup_mapping(self, name, section, dtype):
def _setup_mapping(self, name, section, dtype, dim_y, dim_x):
"""
takes all gathered data to configure and return the correct read and write functions
"""
def read_function():
self.config.read_file(self.config_file)
value = self.config.get(section, name)
value = ini_to_numpy_dict[dtype](value)
print("pre conversion: ", value, type(value)) # pre conversion
value = array(value, dtype)
# convert values from buildin type to numpy type
value = dtype(value)
print("post conversion: ", value, type(value)) # pre conversion
if dim_y > 1:
# if data is an image, slice it according to the y dimensions
value = numpy.array(numpy.split(numpy.array(value), indices_or_sections=dim_y))
return value
def write_function(write_value):
def write_function(value):
if type(value) is list:
write_value = ""
for i in value:
write_value = write_value + str(value) + ", "
# strip the extra ", " at the end
write_value = write_value[:-2]
else:
write_value = str(value)
self.config.read_file(self.config_file)
self.config.set(section, name, write_value)
fp = open(self.filename, 'w')
self.config.write(fp)
......@@ -124,38 +161,72 @@ class ini_client(CommClient):
section, name = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dims, dtype = self._setup_value_conversion(attribute)
dim_y, dim_x, dtype = self._setup_value_conversion(attribute)
# configure and return the read/write functions
read_function, write_function = self._setup_mapping(name, section, dtype)
read_function, write_function = self._setup_mapping(name, section, dtype, dim_y, dim_x)
# return the read/write functions
return read_function, write_function
def array(string, dtype):
array = []
if dtype is numpy.bool_:
# Handle special case for Bools
for i in string.split(","):
i.replace("", "")
if "True" in i:
array.append(True)
elif "False" in i:
array.append(False)
else:
raise ValueError("String to bool failed. String is not True/False, but is: '{}'".format(i))
elif dtype is numpy.str_:
for i in string.split(","):
value = i
array.append(value)
else:
# regular case, go through the separator
for i in string.split(","):
i.replace(" ", "")
value = dtype(i)
array.append(value)
return array
def write_config(filename):
with open(filename, 'w') as configfile:
config = configparser.ConfigParser()
config['scalar'] = {}
config['scalar']['double_scalar_R'] = '1.2'
config['scalar']['double_scalar_RW'] = '3.4'
config['scalar']['bool_scalar_R'] = 'True'
config['scalar']['bool_scalar_RW'] = 'False'
config['scalar']['int_scalar_R'] = '5'
config['scalar']['int_scalar_RW'] = '6'
config['scalar']['str_scalar_R'] = 'this is'
config['scalar']['str_scalar_RW'] = 'a test'
config['spectrum'] = {}
config['spectrum']['double_spectrum_R'] = '1.2, 2.3, 3.4, 4.5'
config['spectrum']['double_spectrum_RW'] = '5.6, 6.7, 7.8, 9.0'
config['spectrum']['bool_spectrum_R'] = 'True, True, False, False'
config['spectrum']['bool_spectrum_RW'] = 'False, False, True, True'
config['spectrum']['int_spectrum_R'] = '1, 2, 3, 4'
config['spectrum']['int_spectrum_RW'] = '6, 7, 8, 9'
config['spectrum']['str_spectrum_R'] = '"a", "b", "c", "d"'
config['spectrum']['str_spectrum_RW'] = '"D", "E", "F", "G"'
config['image'] = {}
config['image']['double_image_R'] = '1.2, 2.3, 3.4, 4.5, 5.6, 6.7'
config['image']['double_image_RW'] = '5.6, 6.7, 7.8, 9.0, 1.2, 3.4'
config['image']['bool_image_R'] = 'True, True, False, False, True, False'
config['image']['bool_image_RW'] = 'False, False, True, True, False, Trie'
config['image']['int_image_R'] = '1, 2, 3, 4, 5, 6'
config['image']['int_image_RW'] = '6, 7, 8, 9, 10, 11'
config['image']['str_image_R'] = '"a", "b", "c", "d", "e", "f"'
config['image']['str_image_RW'] = '"D", "E", "F", "G", "H", "I"'
def write_config():
config = configparser.ConfigParser()
config['scalar'] = {}
config['scalar']['double_scalar'] = '1.2'
config['scalar']['double_scalar'] = '3.4'
config['scalar']['bool_scalar'] = 'True'
config['scalar']['bool_scalar'] = 'False'
config['scalar']['int_scalar'] = '5'
config['scalar']['int_scalar'] = '6'
config['scalar']['str_scalar'] = 'this is'
config['scalar']['str_scalar'] = 'a test'
config['spectrum'] = {}
config['spectrum']['double_scalar'] = '[1.2, 2.3, 3.4]'
config['spectrum']['double_scalar'] = '[5.6, 6.7, 7.8]'
config['spectrum']['bool_scalar'] = '[True, True, False]'
config['spectrum']['bool_scalar'] = '[False, False, True]'
config['spectrum']['int_scalar'] = '[5'
config['spectrum']['int_scalar'] = '[6,7,8,9]'
config['spectrum']['str_scalar'] = '["a", "b", "c"]'
config['spectrum']['str_scalar'] = '["D", "E", "F"]'
with open('example.ini', 'w') as configfile:
config.write(configfile)
......@@ -15,10 +15,8 @@ from tango.server import device_property
from tango import AttrWriteType
from tango import DevState
# Additional import
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
from util.attribute_wrapper import attribute_wrapper
from util.hardware_device import hardware_device
from clients.ini_client import *
......@@ -41,32 +39,37 @@ class ini_device(hardware_device):
...
"""
double_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double, access=AttrWriteType.READ_WRITE)
double_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double)
bool_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
bool_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_)
int_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
int_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64)
str_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE)
str_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str)
double_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,), access=AttrWriteType.READ_WRITE)
double_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,))
bool_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,), access=AttrWriteType.READ_WRITE)
bool_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,))
int_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,), access=AttrWriteType.READ_WRITE)
int_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,))
str_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,), access=AttrWriteType.READ_WRITE)
str_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,))
double_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2), access=AttrWriteType.READ_WRITE)
double_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2))
bool_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2), access=AttrWriteType.READ_WRITE)
bool_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2))
int_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2), access=AttrWriteType.READ_WRITE)
int_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2))
str_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2), access=AttrWriteType.READ_WRITE)
str_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2))
double_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar_RW"}, datatype=numpy.double, access=AttrWriteType.READ_WRITE)
double_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar_R"}, datatype=numpy.double)
bool_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar_RW"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
bool_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar_R"}, datatype=numpy.bool_)
int_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar_RW"}, datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
int_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar_R"}, datatype=numpy.int64)
str_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar_RW"}, datatype=numpy.str_, access=AttrWriteType.READ_WRITE)
str_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar_R"}, datatype=numpy.str_)
double_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum_RW"}, datatype=numpy.double, dims=(4,), access=AttrWriteType.READ_WRITE)
double_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum_R"}, datatype=numpy.double, dims=(4,))
bool_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum_RW"}, datatype=numpy.bool_, dims=(4,), access=AttrWriteType.READ_WRITE)
bool_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum_R"}, datatype=numpy.bool_, dims=(4,))
int_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum_RW"}, datatype=numpy.int64, dims=(4,), access=AttrWriteType.READ_WRITE)
int_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum_R"}, datatype=numpy.int64, dims=(4,))
str_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum_RW"}, datatype=numpy.str_, dims=(4,), access=AttrWriteType.READ_WRITE)
str_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum_R"}, datatype=numpy.str_, dims=(4,))
double_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image_RW"}, datatype=numpy.double, dims=(3, 2), access=AttrWriteType.READ_WRITE)
double_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image_R"}, datatype=numpy.double, dims=(3, 2))
bool_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image_RW"}, datatype=numpy.bool_, dims=(3, 2), access=AttrWriteType.READ_WRITE)
bool_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image_R"}, datatype=numpy.bool_, dims=(3, 2))
int_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image_RW"}, datatype=numpy.int64, dims=(3, 2), access=AttrWriteType.READ_WRITE)
int_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image_R"}, datatype=numpy.int64, dims=(3, 2))
str_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image_RW"}, datatype=numpy.str_, dims=(3, 2), access=AttrWriteType.READ_WRITE)
str_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image_R"}, datatype=numpy.str_, dims=(3, 2))
@classmethod
def attr_list(cls):
""" Return a list of all the attribute_wrapper members of this class. """
return [v for k, v in cls.__dict__.items() if type(v) == attribute_wrapper]
def always_executed_hook(self):
......@@ -88,14 +91,14 @@ class ini_device(hardware_device):
# --------
# overloaded functions
# --------
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
self.set_state(DevState.INIT)
# set up the OPC ua client
self.ini_client = ini_client("example/example.ini", self.Fault, self)
self.ini_client = ini_client("example.ini", self.Fault, self)
# map an access helper class
for i in self.attr_list():
......
......@@ -64,7 +64,7 @@ class test_device(hardware_device):
# --------
# overloaded functions
# --------
def initialise(self):
def configure_for_initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
......
......@@ -23,7 +23,7 @@ __all__ = ["hardware_device"]
from util.wrappers import only_in_states, fault_on_error
#@log_exceptions
#@log_exceptions()
class hardware_device(Device):
"""
......@@ -85,7 +85,7 @@ class hardware_device(Device):
self.set_state(DevState.INIT)
self.setup_value_dict()
self.initialise()
self.configure_for_initialise()
self.set_state(DevState.STANDBY)
......@@ -100,7 +100,7 @@ class hardware_device(Device):
:return:None
"""
self.on()
self.configure_for_on()
self.set_state(DevState.ON)
@command()
......@@ -119,7 +119,7 @@ class hardware_device(Device):
# Turn off
self.set_state(DevState.OFF)
self.off()
self.configure_for_off()
# Turn off again, in case of race conditions through reconnecting
self.set_state(DevState.OFF)
......@@ -138,18 +138,18 @@ class hardware_device(Device):
:return:None
"""
self.fault()
self.configure_for_fault()
self.set_state(DevState.FAULT)
# functions that can be overloaded
def fault(self):
def configure_for_fault(self):
pass
def off(self):
def configure_for_off(self):
pass
def on(self):
def configure_for_on(self):
pass
def initialise(self):
def configure_for_initialise(self):
pass
def always_executed_hook(self):
......
......@@ -63,7 +63,7 @@ def log_exceptions():
try:
return func(self, *args, **kwargs)
except Exception as e:
self.error_stream("Caught exception: %s: %s", e.__class__.__name__, e, exc_info=1)
self.error_stream("Caught exception: %s: %s", e.__class__.__name__, e)
raise e
return inner
......
......@@ -12,7 +12,7 @@ def startup(device: str, force_restart: bool):
if force_restart is True:
print("Forcing device {} restart.".format(device))
proxy.off()
proxy.configure_for_off()
state = proxy.state()
if state is not tango._tango.DevState.OFF:
print("Device {} cannot perform off although restart has been enforced, state = {}. Please investigate.".format(device, state))
......@@ -21,13 +21,13 @@ def startup(device: str, force_restart: bool):
print("Device {} is not in OFF state, cannot start it. state = {}".format(device, state))
return proxy
print("Device {} is in OFF, performing initialisation.".format(device))
proxy.initialise()
proxy.configure_for_initialise()
state = proxy.state()
if state is not tango._tango.DevState.STANDBY:
print("Device {} cannot perform initialise, state = {}. Please investigate.".format(device, state))
return proxy
print("Device {} is in STANDBY, performing on.".format(device))
proxy.on()
proxy.configure_for_on()
state = proxy.state()
if state is not tango._tango.DevState.ON:
print("Device {} cannot perform on, state = {}. Please investigate.".format(device, state))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment