Skip to content
Snippets Groups Projects
Commit ee9c206e authored by Taya Snijder's avatar Taya Snijder
Browse files

Started work on Tango controls

parent 7f0101d8
Branches
No related tags found
No related merge requests found
import yaml_reader
from opcua import ua, uamethod, Server
from opcua.common.callback import CallbackType
import sys
import time
import logging
import numpy
from tango import AttrQuality, AttrWriteType, DispLevel, DevState, DebugIt
from tango.server import Device, attribute, command, pipe, device_property
class real_values:
access_incrementer = 0
class mock_device(Device):
voltage = attribute(label="Voltage", dtype="float", display_level=DispLevel.OPERATOR,
access=AttrWriteType.READ, unit="V", format="8.4f", doc="The power supply voltage")
current = attribute(label="Current", dtype="float", display_level=DispLevel.EXPERT,
access=AttrWriteType.READ_WRITE, unit="A", format="8.4f",
min_value=0.0, max_value=8.4, min_alarm=0.1, max_alarm=8.4,
min_warning=0.5, max_warning=8.0, fget="get_current", fset="set_current", doc="The power supply current")
noise = attribute(label="Noise",
dtype=((int,),),
max_dim_x=1024, max_dim_y=1024)
info = pipe(label='Info')
host = device_property(dtype=str)
port = device_property(dtype=int, default_value=9788)
def init_device(self):
Device.init_device(self)
self.__current = 0.0
self.set_state(DevState.STANDBY)
def read_voltage(self):
self.info_stream("read_voltage(%s, %d)", self.host, self.port)
return 9.99, time.time(), AttrQuality.ATTR_WARNING
def get_current(self):
return self.__current
def set_current(self, current):
# should set the power supply current
self.__current = current
def read_info(self):
return 'Information', dict(manufacturer='Tango',
model='PS2000',
version_number=123)
@DebugIt()
def read_noise(self):
return numpy.random.random_integers(1000, size=(100, 100))
@command
def TurnOn(self):
# turn on the actual power supply here
self.set_state(DevState.ON)
@command
def TurnOff(self):
# turn off the actual power supply here
self.set_state(DevState.OFF)
@command(dtype_in=float, doc_in="Ramp target current",
dtype_out=bool, doc_out="True if ramping went well, "
"False otherwise")
def Ramp(self, target_current):
# should do the ramping
return True
def get_real_data():
real_values.access_incrementer += 1
return real_values.access_incrementer
def device_callback(event, dispatcher):
print(dispatcher)
dispatcher.set_value(get_real_data())
def add_monitor_point(device, node):
write_val = int(get_real_data())
node = obj.add_variable(idx, i.name, value)
print("Added: {}, description: {}".format(i.name, i.description))
OPC_nodes.append(node)
def add_control_point(device, node):
pass
class mock_server:
......@@ -41,17 +113,19 @@ idx = server.register_namespace(uri)
# get Objects node, this is where we should put our custom stuff
obj = server.get_objects_node()
# starting!
server.start()
OPC_nodes = []
for i in yaml_reader.device_list:
print("name: {}, description: {}".format(i.name, i.description))
OPC_nodes.append(obj.add_variable(idx, i.name, get_real_data(), ua.VariantType.Int64))
value = int(get_real_data())
var_node = obj.add_variable(idx, i.name, value)
OPC_nodes.append(var_node)
# starting!
server.start()
for i in OPC_nodes:
print(i.get_browse_name(), ": ", i.get_value())
server.subscribe_server_callback(CallbackType, device_callback)
while 1:
time.sleep(0.1)
\ No newline at end of file
name, description, RW, max, min, scale, width
"Att_0","Gain of ADC0","W","24","None","None","5"
"Att_1","Gain of ADC0","W","24","None","None","5"
"Att_2","Gain of ADC0","W","24","None","None","5"
"Pwr_dig_0","Enable LDOs","W","None","None","None","1"
"Pwr_dig_1","Enable LDOs","W","None","None","None","1"
"Pwr_dig_2","Enable LDOs","W","None","None","None","1"
"Pwr_Ant_0","Switch power of Antenna 1","W","None","None","None","1"
"Pwr_Ant_1","Switch power of Antenna 1","W","None","None","None","1"
"Pwr_Ant_2","Switch power of Antenna 1","W","None","None","None","1"
"Band_0","10/30MHz band select","W","None","None","None","2"
"Band_1","10/30MHz band select","W","None","None","None","2"
"Band_2","10/30MHz band select","W","None","None","None","2"
"LED_0","LEDs on PCB","W","None","None","None","1"
"LED_1","LEDs on PCB","W","None","None","None","1"
"ADC_lock_0","Check ADC lock and JESD link ready","R","None","None","None","8"
"ADC_lock_1","Check ADC lock and JESD link ready","R","None","None","None","8"
"ADC_lock_2","Check ADC lock and JESD link ready","R","None","None","None","8"
"V_2v5_0","2.5V on PCB","R","None","None","1.97e-07","24"
"V_2v5_1","2.5V on PCB","R","None","None","1.97e-07","24"
"V_2v5_2","2.5V on PCB","R","None","None","1.97e-07","24"
......@@ -33,7 +33,7 @@ class var_Param:
var_scale = "var_scale" # used to determine the datatype currently (subject to change)
class device:
class yaml_generated_device:
def __init__(self, name="", description="", RW="R", maxval=None, minval=None, scale=None, width=None):
self.name = name
self.description = description
......@@ -65,7 +65,16 @@ class explorer:
self.current_device = None # create an empty device for temporarily holding data
self.current_Device_instances = 1 # some devices need to be instantiated more than once
self.log_file = open("test_output.txt", 'w')
def log_data(self):
file = open("output_devices.csv", "w")
file.write("name, description, RW, max, min, scale, width\n")
for i in self.device_list:
file.write("\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\",\"{}\"\n".format(i.name, i.description, i.RW, i.max, i.min, i.scale, i.width))
file.close()
def new_device(self):
# add the device or devices to the list
......@@ -75,11 +84,11 @@ class explorer:
if self.current_Device_instances != 0:
self.current_device.name += "_x"
for i in range(self.current_Device_instances):
self.device_list.append(device())
self.device_list.append(yaml_generated_device())
self.device_list[-1].device_copy(self.current_device)
self.device_list[-1].name = self.current_device.name[:-1] + str(i)
else:
self.device_list.append(device())
self.device_list.append(yaml_generated_device())
self.device_list[-1].device_copy(self.current_device)
def add_device_data(self, param, data):
......@@ -89,7 +98,7 @@ class explorer:
self.new_device()
# reset the current device to its default
self.current_device = device()
self.current_device = yaml_generated_device()
self.current_device.name = data
# add the other parameters
if param == var_Param.var_RW:
......@@ -154,6 +163,8 @@ class explorer:
sub_data = yaml.load(str(v), Loader=yaml.FullLoader)
print(k)
self.explore(sub_data, str(k), depth)
self.log_data()
return self.device_list
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment