Skip to content
Snippets Groups Projects

Resolve #2021 "04 16 branched from master ini file device"

Merged Taya Snijder requested to merge 2021-04-16-Branched_from_master-INI_file_device into master
Files
16
+ 237
0
 
from util.comms_client import CommClient
 
import configparser
 
import numpy
 
 
 
numpy_to_ini_dict = {
 
numpy.int64: int,
 
numpy.double: float,
 
numpy.float64: float,
 
numpy.bool_: bool,
 
str: str
 
}
 
 
numpy_to_ini_get_dict = {
 
numpy.int64: configparser.ConfigParser.getint,
 
numpy.double: configparser.ConfigParser.getfloat,
 
numpy.float64: configparser.ConfigParser.getfloat,
 
numpy.bool_: configparser.ConfigParser.getboolean,
 
str: str
 
}
 
 
ini_to_numpy_dict = {
 
int: numpy.int64,
 
float: numpy.float64,
 
bool: numpy.bool_,
 
str: numpy.str_
 
}
 
 
import os
 
 
class ini_client(CommClient):
 
"""
 
this class provides an example implementation of a comms_client.
 
Durirng initialisation it creates a correctly shaped zero filled value. on read that value is returned and on write its modified.
 
"""
 
 
def start(self):
 
super().start()
 
 
def __init__(self, filename, fault_func, streams, try_interval=2):
 
"""
 
initialises the class and tries to connect to the client.
 
"""
 
self.config = configparser.ConfigParser()
 
self.filename = filename
 
 
if not filename.endswith(".ini"):
 
filename = filename + ".ini"
 
 
 
super().__init__(fault_func, streams, try_interval)
 
 
# Explicitly connect
 
if not self.connect():
 
# hardware or infra is down -- needs fixing first
 
fault_func()
 
return
 
 
def connect(self):
 
files_path = [os.path.abspath(x) for x in os.listdir()]
 
self.streams.debug_stream(" %s", files_path)
 
 
try:
 
write_config(self.filename)
 
self.config_file = open(self.filename, "r")
 
except FileNotFoundError:
 
write_config(self.filename)
 
self.config_file = open(self.filename, "r")
 
 
self.connected = True # set connected to true
 
return True # if succesfull, return true. otherwise return false
 
 
def disconnect(self):
 
self.connected = False # always force a reconnect, regardless of a successful disconnect
 
self.streams.debug_stream("disconnected from the 'client' ")
 
 
def _setup_annotation(self, annotation):
 
"""
 
this function gives the client access to the comm client annotation data given to the attribute wrapper.
 
The annotation data can be used to provide whatever extra data is necessary in order to find/access the monitor/control point.
 
 
the annotation can be in whatever format may be required. it is up to the user to handle its content
 
example annotation may include:
 
- a file path and file line/location
 
- COM object path
 
"""
 
 
# as this is an example, just print the annotation
 
self.streams.debug_stream("annotation: {}".format(annotation))
 
name = annotation.get('name')
 
if name is None:
 
AssertionError("ini client requires a variable name to set/get")
 
section = annotation.get('section')
 
if section is None:
 
AssertionError("requires a section to open")
 
 
return section, name
 
 
 
def _setup_value_conversion(self, attribute):
 
"""
 
gives the client access to the attribute_wrapper object in order to access all
 
necessary data such as dimensionality and data type
 
"""
 
 
dim_y = attribute.dim_y
 
dim_x = attribute.dim_x
 
 
dtype = attribute.numpy_type
 
 
return dim_y, dim_x, dtype
 
 
def _setup_mapping(self, name, section, dtype, dim_y, dim_x):
 
"""
 
takes all gathered data to configure and return the correct read and write functions
 
"""
 
 
def read_function():
 
self.config.read_file(self.config_file)
 
value = self.config.get(section, name)
 
 
value = array(value, dtype)
 
 
if dim_y > 1:
 
# if data is an image, slice it according to the y dimensions
 
value = numpy.array(numpy.split(value, indices_or_sections=dim_y))
 
 
return value
 
 
def write_function(value):
 
 
if type(value) is list:
 
write_value = ""
 
 
for i in value:
 
write_value = write_value + str(value) + ", "
 
 
# strip the extra ", " at the end
 
write_value = write_value[:-2]
 
else:
 
write_value = str(value)
 
 
self.config.read_file(self.config_file)
 
self.config.set(section, name, write_value)
 
fp = open(self.filename, 'w')
 
self.config.write(fp)
 
 
return read_function, write_function
 
 
def setup_attribute(self, annotation=None, attribute=None):
 
"""
 
MANDATORY function: is used by the attribute wrapper to get read/write functions.
 
must return the read and write functions
 
"""
 
 
# process the comms_annotation
 
section, name = self._setup_annotation(annotation)
 
 
# get all the necessary data to set up the read/write functions from the attribute_wrapper
 
dim_y, dim_x, dtype = self._setup_value_conversion(attribute)
 
 
# configure and return the read/write functions
 
read_function, write_function = self._setup_mapping(name, section, dtype, dim_y, dim_x)
 
 
# return the read/write functions
 
return read_function, write_function
 
 
def array(string, dtype):
 
array = []
 
 
if dtype is numpy.bool_:
 
# Handle special case for Bools
 
for i in string.split(","):
 
i.replace("", "")
 
if "True" in i:
 
array.append(True)
 
elif "False" in i:
 
array.append(False)
 
else:
 
raise ValueError("String to bool failed. String is not True/False, but is: '{}'".format(i))
 
 
array = dtype(array)
 
 
elif dtype is numpy.str_:
 
for i in string.split(","):
 
value = numpy.str_(i)
 
array.append(value)
 
 
array = numpy.array(array)
 
 
else:
 
# regular case, go through the separator
 
for i in string.split(","):
 
i.replace(" ", "")
 
value = dtype(i)
 
array.append(value)
 
 
# convert values from buildin type to numpy type
 
array = dtype(array)
 
 
return array
 
 
def write_config(filename):
 
with open(filename, 'w') as configfile:
 
 
config = configparser.ConfigParser()
 
config['scalar'] = {}
 
config['scalar']['double_scalar_R'] = '1.2'
 
config['scalar']['double_scalar_RW'] = '3.4'
 
config['scalar']['bool_scalar_R'] = 'True'
 
config['scalar']['bool_scalar_RW'] = 'False'
 
config['scalar']['int_scalar_R'] = '5'
 
config['scalar']['int_scalar_RW'] = '6'
 
config['scalar']['str_scalar_R'] = 'this is'
 
config['scalar']['str_scalar_RW'] = 'a test'
 
 
config['spectrum'] = {}
 
config['spectrum']['double_spectrum_R'] = '1.2, 2.3, 3.4, 4.5'
 
config['spectrum']['double_spectrum_RW'] = '5.6, 6.7, 7.8, 9.0'
 
config['spectrum']['bool_spectrum_R'] = 'True, True, False, False'
 
config['spectrum']['bool_spectrum_RW'] = 'False, False, True, True'
 
config['spectrum']['int_spectrum_R'] = '1, 2, 3, 4'
 
config['spectrum']['int_spectrum_RW'] = '6, 7, 8, 9'
 
config['spectrum']['str_spectrum_R'] = '"a", "b", "c", "d"'
 
config['spectrum']['str_spectrum_RW'] = '"D", "E", "F", "G"'
 
 
config['image'] = {}
 
config['image']['double_image_R'] = '1.2, 2.3, 3.4, 4.5, 5.6, 6.7'
 
config['image']['double_image_RW'] = '5.6, 6.7, 7.8, 9.0, 1.2, 3.4'
 
config['image']['bool_image_R'] = 'True, True, False, False, True, False'
 
config['image']['bool_image_RW'] = 'False, False, True, True, False, Trie'
 
config['image']['int_image_R'] = '1, 2, 3, 4, 5, 6'
 
config['image']['int_image_RW'] = '6, 7, 8, 9, 10, 11'
 
config['image']['str_image_R'] = '"a", "b", "c", "d", "e", "f"'
 
config['image']['str_image_RW'] = '"D", "E", "F", "G", "H", "I"'
 
 
config.write(configfile)
Loading