Skip to content
Snippets Groups Projects
Commit 7c6bbb2d authored by Taya Snijder's avatar Taya Snijder
Browse files

removed a few stray files

parent 75a0de8a
No related branches found
No related tags found
1 merge request!27Resolve #2021 "04 16 branched from master apsctl device"
from src.comms_client import CommClient
import configparser
import numpy
numpy_to_ini_dict = {
numpy.int64: int,
numpy.double: float,
numpy.bool_: bool,
str: str
ini_to_numpy_dict = {
int: numpy.int64,
float: numpy.double,
bool: numpy.bool_,
str: str
import os
class ini_client(CommClient):
this class provides an example implementation of a comms_client.
Durirng initialisation it creates a correctly shaped zero filled value. on read that value is returned and on write its modified.
def start(self):
def __init__(self, filename, fault_func, streams, try_interval=2):
initialises the class and tries to connect to the client.
self.config = configparser.ConfigParser()
self.filename = filename
if not filename.endswith(".ini"):
filename = filename + ".ini"
super().__init__(fault_func, streams, try_interval)
# Explicitly connect
if not self.connect():
# hardware or infra is down -- needs fixing first
def connect(self):
files_path = [os.path.abspath(x) for x in os.listdir()]
self.streams.debug_stream(" %s", files_path)
self.config_file = open(self.filename, "rw")
self.connected = True # set connected to true
return True # if succesfull, return true. otherwise return false
def disconnect(self):
self.connected = False # always force a reconnect, regardless of a successful disconnect
self.streams.debug_stream("disconnected from the 'client' ")
def _setup_annotation(self, annotation):
this function gives the client access to the comm client annotation data given to the attribute wrapper.
The annotation data can be used to provide whatever extra data is necessary in order to find/access the monitor/control point.
the annotation can be in whatever format may be required. it is up to the user to handle its content
example annotation may include:
- a file path and file line/location
- COM object path
# as this is an example, just print the annotation
self.streams.debug_stream("annotation: {}".format(annotation))
name = annotation.get('name')
if name is None:
AssertionError("ini client requires a variable name to set/get")
section = annotation.get('section')
if section is None:
AssertionError("requires a section to open")
return section, name
def _setup_value_conversion(self, attribute):
gives the client access to the attribute_wrapper object in order to access all
necessary data such as dimensionality and data type
if attribute.dim_y > 1:
dims = (attribute.dim_y, attribute.dim_x)
dims = (attribute.dim_x,)
dtype = attribute.numpy_type
return dims, dtype
def _setup_mapping(self, name, section, dtype):
takes all gathered data to configure and return the correct read and write functions
def read_function():
value = self.config.get(section, name)
value = ini_to_numpy_dict[dtype](value)
return value
def write_function(write_value):
self.config.set(section, name, write_value)
fp = open(self.filename, 'w')
return read_function, write_function
def setup_attribute(self, annotation=None, attribute=None):
MANDATORY function: is used by the attribute wrapper to get read/write functions.
must return the read and write functions
# process the comms_annotation
section, name = self._setup_annotation(annotation)
# get all the necessary data to set up the read/write functions from the attribute_wrapper
dims, dtype = self._setup_value_conversion(attribute)
# configure and return the read/write functions
read_function, write_function = self._setup_mapping(name, section, dtype)
# return the read/write functions
return read_function, write_function
def write_config():
config = configparser.ConfigParser()
config['scalar'] = {}
config['scalar']['double_scalar'] = '1.2'
config['scalar']['double_scalar'] = '3.4'
config['scalar']['bool_scalar'] = 'True'
config['scalar']['bool_scalar'] = 'False'
config['scalar']['int_scalar'] = '5'
config['scalar']['int_scalar'] = '6'
config['scalar']['str_scalar'] = 'this is'
config['scalar']['str_scalar'] = 'a test'
config['spectrum'] = {}
config['spectrum']['double_scalar'] = '[1.2, 2.3, 3.4]'
config['spectrum']['double_scalar'] = '[5.6, 6.7, 7.8]'
config['spectrum']['bool_scalar'] = '[True, True, False]'
config['spectrum']['bool_scalar'] = '[False, False, True]'
config['spectrum']['int_scalar'] = '[5'
config['spectrum']['int_scalar'] = '[6,7,8,9]'
config['spectrum']['str_scalar'] = '["a", "b", "c"]'
config['spectrum']['str_scalar'] = '["D", "E", "F"]'
with open('example.ini', 'w') as configfile:
# -*- coding: utf-8 -*-
# This file wraps around a tango device class and provides a number of abstractions useful for hardware devices. It works together
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
# PyTango imports
from tango.server import run
from tango.server import device_property
from tango import AttrWriteType
from tango import DevState
# Additional import
from src.attribute_wrapper import attribute_wrapper
from src.hardware_device import hardware_device
from clients.ini_client import *
__all__ = ["ini_device"]
class ini_device(hardware_device):
This class is the minimal (read empty) implementation of a class using 'hardware_device'
# ----------
# Attributes
# ----------
attribute wrapper objects can be declared here. All attribute wrapper objects will get automatically put in a list (attr_list) for easy access
example = attribute_wrapper(comms_annotation="this is an example", datatype=numpy.double, dims=(8, 2), access=AttrWriteType.READ_WRITE)
double_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double, access=AttrWriteType.READ_WRITE)
double_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "double_scalar"}, datatype=numpy.double)
bool_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_, access=AttrWriteType.READ_WRITE)
bool_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "bool_scalar"}, datatype=numpy.bool_)
int_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64, access=AttrWriteType.READ_WRITE)
int_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "int_scalar"}, datatype=numpy.int64)
str_scalar_RW = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str, access=AttrWriteType.READ_WRITE)
str_scalar_R = attribute_wrapper(comms_annotation={"section": "scalar", "name": "str_scalar"}, datatype=numpy.str)
double_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,), access=AttrWriteType.READ_WRITE)
double_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "double_spectrum"}, datatype=numpy.double, dims=(4,))
bool_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,), access=AttrWriteType.READ_WRITE)
bool_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "bool_spectrum"}, datatype=numpy.bool_, dims=(4,))
int_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,), access=AttrWriteType.READ_WRITE)
int_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "int_spectrum"}, datatype=numpy.int64, dims=(4,))
str_spectrum_RW = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,), access=AttrWriteType.READ_WRITE)
str_spectrum_R = attribute_wrapper(comms_annotation={"section": "spectrum", "name": "str_spectrum"}, datatype=numpy.str, dims=(4,))
double_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2), access=AttrWriteType.READ_WRITE)
double_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "double_image"}, datatype=numpy.double, dims=(3, 2))
bool_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2), access=AttrWriteType.READ_WRITE)
bool_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "bool_image"}, datatype=numpy.bool_, dims=(3, 2))
int_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2), access=AttrWriteType.READ_WRITE)
int_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "int_image"}, datatype=numpy.int64, dims=(3, 2))
str_image_RW = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2), access=AttrWriteType.READ_WRITE)
str_image_R = attribute_wrapper(comms_annotation={"section": "image", "name": "str_image"}, datatype=numpy.str, dims=(3, 2))
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
self.debug_stream("Shutting down...")
self.debug_stream("Shut down. Good bye.")
# --------
# overloaded functions
# --------
def initialise(self):
""" user code here. is called when the sate is set to INIT """
"""Initialises the attributes and properties of the PCC."""
# set up the OPC ua client
self.ini_client = ini_client("example/example.ini", self.Fault, self)
# map an access helper class
for i in self.attr_list():
# ----------
# Run server
# ----------
def main(args=None, **kwargs):
"""Main function of the hardware device module."""
return run((ini_device,), args=args, **kwargs)
if __name__ == '__main__':
# -*- coding: utf-8 -*-
# This file is part of the PCC project
# Distributed under the terms of the APACHE license.
# See LICENSE.txt for more info.
""" PCC Device Server for LOFAR2.0
# PyTango imports
from tango.server import Device, command
from tango import DevState, DebugIt
# Additional import
from util.attribute_wrapper import attribute_wrapper
from util.lofar_logging import log_exceptions
__all__ = ["hardware_device"]
from util.wrappers import only_in_states, fault_on_error
class hardware_device(Device):
States are as follows:
INIT = Device is initialising.
STANDBY = Device is initialised, but pends external configuration and an explicit turning on,
ON = Device is fully configured, functional, controls the hardware, and is possibly actively running,
FAULT = Device detected an unrecoverable error, and is thus malfunctional,
OFF = Device is turned off, drops connection to the hardware,
The following state transitions are implemented:
boot -> OFF: Triggered by tango. Device will be instantiated,
OFF -> INIT: Triggered by device. Device will initialise (connect to hardware, other devices),
INIT -> STANDBY: Triggered by device. Device is initialised, and is ready for additional configuration by the user,
STANDBY -> ON: Triggered by user. Device reports to be functional,
* -> FAULT: Triggered by device. Device has degraded to malfunctional, for example because the connection to the hardware is lost,
* -> FAULT: Triggered by user. Emulate a forced malfunction for integration testing purposes,
* -> OFF: Triggered by user. Device is turned off. Triggered by the Off() command,
FAULT -> INIT: Triggered by user. Device is reinitialised to recover from an error,
The user triggers their transitions by the commands reflecting the target state (Initialise(), On(), Fault()).
def attr_list(cls):
""" Return a list of all the attribute_wrapper members of this class. """
return [v for k, v in cls.__dict__.items() if type(v) == attribute_wrapper]
def setup_value_dict(self):
""" set the initial value for all the attribute wrapper objects"""
self.value_dict = {i: i.initial_value() for i in self.attr_list()}
def init_device(self):
""" Instantiates the device in the OFF state. """
# NOTE: Will delete_device first, if necessary
# --------
# Commands
# --------
@only_in_states([DevState.FAULT, DevState.OFF])
def Initialise(self):
Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.
def On(self):
Command to ask for initialisation of this device. Can only be called in FAULT or OFF state.
def Off(self):
Command to ask for shutdown of this device.
if self.get_state() == DevState.OFF:
# Already off. Don't complain.
# Turn off
# Turn off again, in case of race conditions through reconnecting
@only_in_states([DevState.ON, DevState.INIT, DevState.STANDBY])
def Fault(self):
FAULT state is used to indicate our connection with the OPC-UA server is down.
This device will try to reconnect once, and transition to the ON state on success.
If reconnecting fails, the user needs to call Initialise() to retry to restart this device.
# functions that can be overloaded
def fault(self):
def off(self):
def on(self):
def initialise(self):
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command (a Tango built-in).
self.debug_stream("Shutting down...")
self.debug_stream("Shut down. Good bye.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment