Skip to content
Snippets Groups Projects
Commit 8fd224e4 authored by Johan Venter's avatar Johan Venter
Browse files

Merge branch 'sar-287_subarray_commands_to_lrc' into 'main'

SAR-287 Change the remaining subarray commands to long running

See merge request ska-telescope/ska-tango-base!69
parents 833c8b26 c8f268e7
Branches
No related tags found
No related merge requests found
...@@ -641,7 +641,7 @@ class SKASubarray(SKAObsDevice): ...@@ -641,7 +641,7 @@ class SKASubarray(SKAObsDevice):
dtype_in="DevString", dtype_in="DevString",
doc_in="JSON-encoded string with the resources to add to subarray", doc_in="JSON-encoded string with the resources to add to subarray",
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def AssignResources(self, argin): def AssignResources(self, argin):
...@@ -654,31 +654,19 @@ class SKASubarray(SKAObsDevice): ...@@ -654,31 +654,19 @@ class SKASubarray(SKAObsDevice):
:param argin: the resources to be assigned :param argin: the resources to be assigned
:type argin: list of str :type argin: list of str
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
""" """
handler = self.get_command_object("AssignResources") handler = self.get_command_object("AssignResources")
args = json.loads(argin) args = json.loads(argin)
unique_id, return_code = self.component_manager.enqueue(handler, args) unique_id, return_code = self.component_manager.enqueue(handler, args)
return [[return_code], [unique_id]] return ([return_code], [unique_id])
def is_ReleaseResources_allowed(self):
"""
Check if command `ReleaseResources` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
"""
command = self.get_command_object("ReleaseResources")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_in="DevString", dtype_in="DevString",
doc_in="JSON-encoded string with the resources to remove from the subarray", doc_in="JSON-encoded string with the resources to remove from the subarray",
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def ReleaseResources(self, argin): def ReleaseResources(self, argin):
...@@ -691,29 +679,17 @@ class SKASubarray(SKAObsDevice): ...@@ -691,29 +679,17 @@ class SKASubarray(SKAObsDevice):
:param argin: the resources to be released :param argin: the resources to be released
:type argin: list of str :type argin: list of str
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
""" """
command = self.get_command_object("ReleaseResources") handler = self.get_command_object("ReleaseResources")
args = json.loads(argin) args = json.loads(argin)
(return_code, message) = command(args) unique_id, return_code = self.component_manager.enqueue(handler, args)
return [[return_code], [message]] return ([return_code], [unique_id])
def is_ReleaseAllResources_allowed(self):
"""
Check if command `ReleaseAllResources` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
"""
command = self.get_command_object("ReleaseAllResources")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def ReleaseAllResources(self): def ReleaseAllResources(self):
...@@ -723,30 +699,18 @@ class SKASubarray(SKAObsDevice): ...@@ -723,30 +699,18 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
"""
command = self.get_command_object("ReleaseAllResources")
(return_code, message) = command()
return [[return_code], [message]]
def is_Configure_allowed(self):
"""
Check if command `Configure` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
""" """
command = self.get_command_object("Configure") handler = self.get_command_object("ReleaseAllResources")
return command.is_allowed(raise_if_disallowed=True) unique_id, return_code = self.component_manager.enqueue(handler)
return ([return_code], [unique_id])
@command( @command(
dtype_in="DevString", dtype_in="DevString",
doc_in="JSON-encoded string with the scan configuration", doc_in="JSON-encoded string with the scan configuration",
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def Configure(self, argin): def Configure(self, argin):
...@@ -759,31 +723,19 @@ class SKASubarray(SKAObsDevice): ...@@ -759,31 +723,19 @@ class SKASubarray(SKAObsDevice):
:param argin: configuration specification :param argin: configuration specification
:type argin: string :type argin: string
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
""" """
command = self.get_command_object("Configure") handler = self.get_command_object("Configure")
args = json.loads(argin) args = json.loads(argin)
(return_code, message) = command(args) unique_id, return_code = self.component_manager.enqueue(handler, args)
return [[return_code], [message]] return ([return_code], [unique_id])
def is_Scan_allowed(self):
"""
Check if command `Scan` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
"""
command = self.get_command_object("Scan")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_in="DevString", dtype_in="DevString",
doc_in="JSON-encoded string with the per-scan configuration", doc_in="JSON-encoded string with the per-scan configuration",
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def Scan(self, argin): def Scan(self, argin):
...@@ -796,29 +748,17 @@ class SKASubarray(SKAObsDevice): ...@@ -796,29 +748,17 @@ class SKASubarray(SKAObsDevice):
:param argin: Information about the scan :param argin: Information about the scan
:type argin: Array of str :type argin: Array of str
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
""" """
command = self.get_command_object("Scan") handler = self.get_command_object("Scan")
args = json.loads(argin) args = json.loads(argin)
(return_code, message) = command(args) unique_id, return_code = self.component_manager.enqueue(handler, args)
return [[return_code], [message]] return ([return_code], [unique_id])
def is_EndScan_allowed(self):
"""
Check if command `EndScan` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
"""
command = self.get_command_object("EndScan")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def EndScan(self): def EndScan(self):
...@@ -828,28 +768,16 @@ class SKASubarray(SKAObsDevice): ...@@ -828,28 +768,16 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
"""
command = self.get_command_object("EndScan")
(return_code, message) = command()
return [[return_code], [message]]
def is_End_allowed(self):
"""
Check if command `End` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
""" """
command = self.get_command_object("End") handler = self.get_command_object("EndScan")
return command.is_allowed(raise_if_disallowed=True) unique_id, return_code = self.component_manager.enqueue(handler)
return ([return_code], [unique_id])
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def End(self): def End(self):
...@@ -860,28 +788,16 @@ class SKASubarray(SKAObsDevice): ...@@ -860,28 +788,16 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
"""
command = self.get_command_object("End")
(return_code, message) = command()
return [[return_code], [message]]
def is_Abort_allowed(self):
""" """
Check if command `Abort` is allowed in the current device state. handler = self.get_command_object("End")
unique_id, return_code = self.component_manager.enqueue(handler)
:return: ``True`` if the command is allowed return ([return_code], [unique_id])
:rtype: boolean
"""
command = self.get_command_object("Abort")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def Abort(self): def Abort(self):
...@@ -891,28 +807,16 @@ class SKASubarray(SKAObsDevice): ...@@ -891,28 +807,16 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
"""
command = self.get_command_object("Abort")
(return_code, message) = command()
return [[return_code], [message]]
def is_ObsReset_allowed(self):
""" """
Check if command `ObsReset` is allowed in the current device state. handler = self.get_command_object("Abort")
unique_id, return_code = self.component_manager.enqueue(handler)
:return: ``True`` if the command is allowed return ([return_code], [unique_id])
:rtype: boolean
"""
command = self.get_command_object("ObsReset")
return command.is_allowed(raise_if_disallowed=True)
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def ObsReset(self): def ObsReset(self):
...@@ -922,28 +826,16 @@ class SKASubarray(SKAObsDevice): ...@@ -922,28 +826,16 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
"""
command = self.get_command_object("ObsReset")
(return_code, message) = command()
return [[return_code], [message]]
def is_Restart_allowed(self):
"""
Check if command `Restart` is allowed in the current device state.
:return: ``True`` if the command is allowed
:rtype: boolean
""" """
command = self.get_command_object("Restart") handler = self.get_command_object("ObsReset")
return command.is_allowed(raise_if_disallowed=True) unique_id, return_code = self.component_manager.enqueue(handler)
return ([return_code], [unique_id])
@command( @command(
dtype_out="DevVarLongStringArray", dtype_out="DevVarLongStringArray",
doc_out="(ReturnType, 'informational message')", doc_out="([Command ResultCode], [Unique ID of the command])",
) )
@DebugIt() @DebugIt()
def Restart(self): def Restart(self):
...@@ -953,14 +845,12 @@ class SKASubarray(SKAObsDevice): ...@@ -953,14 +845,12 @@ class SKASubarray(SKAObsDevice):
To modify behaviour for this command, modify the do() method of To modify behaviour for this command, modify the do() method of
the command class. the command class.
:return: A tuple containing a return code and a string :return: A tuple containing a result code and the unique ID of the command
message indicating status. The message is for :rtype: ([ResultCode], [str])
information purpose only.
:rtype: (ResultCode, str)
""" """
command = self.get_command_object("Restart") handler = self.get_command_object("Restart")
(return_code, message) = command() unique_id, return_code = self.component_manager.enqueue(handler)
return [[return_code], [message]] return ([return_code], [unique_id])
# ---------- # ----------
......
...@@ -96,15 +96,19 @@ class TestSKASubarray: ...@@ -96,15 +96,19 @@ class TestSKASubarray:
result_callback.wait_for_lrc_id(on_tr.unique_id) result_callback.wait_for_lrc_id(on_tr.unique_id)
result_callback.wait_for_lrc_id(assign_tr.unique_id) result_callback.wait_for_lrc_id(assign_tr.unique_id)
device_under_test.Configure('{"BAND1": 2}') conf_tr = device_under_test.Configure('{"BAND1": 2}')
result_callback.wait_for_lrc_id(conf_tr.unique_id)
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.READY) obs_state_callback.assert_call(ObsState.READY)
assert device_under_test.Abort() == [ abort_tr = device_under_test.Abort()
[ResultCode.OK], result_callback.wait_for_lrc_id(abort_tr.unique_id)
["Abort command completed OK"], assert (
] device_under_test.longRunningCommandResult[2]
== "Abort command completed OK"
)
assert int(device_under_test.longRunningCommandResult[1]) == ResultCode.OK
obs_state_callback.assert_calls([ObsState.ABORTING, ObsState.ABORTED]) obs_state_callback.assert_calls([ObsState.ABORTING, ObsState.ABORTED])
# PROTECTED REGION END # // SKASubarray.test_Abort # PROTECTED REGION END # // SKASubarray.test_Abort
...@@ -125,7 +129,8 @@ class TestSKASubarray: ...@@ -125,7 +129,8 @@ class TestSKASubarray:
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.IDLE) obs_state_callback.assert_call(ObsState.IDLE)
device_under_test.Configure('{"BAND1": 2}') conf_tr = device_under_test.Configure('{"BAND1": 2}')
result_callback.wait_for_lrc_id(conf_tr.unique_id)
obs_state_callback.assert_calls([ObsState.CONFIGURING, ObsState.READY]) obs_state_callback.assert_calls([ObsState.CONFIGURING, ObsState.READY])
assert device_under_test.obsState == ObsState.READY assert device_under_test.obsState == ObsState.READY
...@@ -192,7 +197,8 @@ class TestSKASubarray: ...@@ -192,7 +197,8 @@ class TestSKASubarray:
assert device_under_test.ObsState == ObsState.IDLE assert device_under_test.ObsState == ObsState.IDLE
assert list(device_under_test.assignedResources) == resources_to_assign assert list(device_under_test.assignedResources) == resources_to_assign
device_under_test.ReleaseAllResources() release_tr = device_under_test.ReleaseAllResources()
result_callback.wait_for_lrc_id(release_tr.unique_id)
obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.EMPTY]) obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.EMPTY])
assert device_under_test.ObsState == ObsState.EMPTY assert device_under_test.ObsState == ObsState.EMPTY
...@@ -214,15 +220,19 @@ class TestSKASubarray: ...@@ -214,15 +220,19 @@ class TestSKASubarray:
result_callback.wait_for_lrc_id(on_tr.unique_id) result_callback.wait_for_lrc_id(on_tr.unique_id)
result_callback.wait_for_lrc_id(assign_tr.unique_id) result_callback.wait_for_lrc_id(assign_tr.unique_id)
device_under_test.Configure('{"BAND1": 2}') conf_tr = device_under_test.Configure('{"BAND1": 2}')
result_callback.wait_for_lrc_id(conf_tr.unique_id)
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.READY) obs_state_callback.assert_call(ObsState.READY)
assert device_under_test.End() == [ end_tr = device_under_test.End()
[ResultCode.OK], result_callback.wait_for_lrc_id(end_tr.unique_id)
["End command completed OK"], assert (
] device_under_test.longRunningCommandResult[2] == "End command completed OK"
)
assert int(device_under_test.longRunningCommandResult[1]) == ResultCode.OK
obs_state_callback.assert_call(ObsState.IDLE) obs_state_callback.assert_call(ObsState.IDLE)
# PROTECTED REGION END # // SKASubarray.test_EndSB # PROTECTED REGION END # // SKASubarray.test_EndSB
...@@ -238,19 +248,24 @@ class TestSKASubarray: ...@@ -238,19 +248,24 @@ class TestSKASubarray:
on_tr = device_under_test.On() on_tr = device_under_test.On()
assign_tr = device_under_test.AssignResources(json.dumps(["BAND1"])) assign_tr = device_under_test.AssignResources(json.dumps(["BAND1"]))
conf_tr = device_under_test.Configure('{"BAND1": 2}')
scan_tr = device_under_test.Scan('{"id": 123}')
result_callback.wait_for_lrc_id(on_tr.unique_id) result_callback.wait_for_lrc_id(on_tr.unique_id)
result_callback.wait_for_lrc_id(assign_tr.unique_id) result_callback.wait_for_lrc_id(assign_tr.unique_id)
result_callback.wait_for_lrc_id(conf_tr.unique_id)
device_under_test.Configure('{"BAND1": 2}') result_callback.wait_for_lrc_id(scan_tr.unique_id)
device_under_test.Scan('{"id": 123}')
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.SCANNING) obs_state_callback.assert_call(ObsState.SCANNING)
assert device_under_test.EndScan() == [ endscan_tr = device_under_test.EndScan()
[ResultCode.OK], result_callback.wait_for_lrc_id(endscan_tr.unique_id)
["EndScan command completed OK"], assert (
] device_under_test.longRunningCommandResult[2]
== "EndScan command completed OK"
)
assert int(device_under_test.longRunningCommandResult[1]) == ResultCode.OK
obs_state_callback.assert_call(ObsState.READY) obs_state_callback.assert_call(ObsState.READY)
...@@ -274,7 +289,8 @@ class TestSKASubarray: ...@@ -274,7 +289,8 @@ class TestSKASubarray:
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.IDLE) obs_state_callback.assert_call(ObsState.IDLE)
device_under_test.ReleaseAllResources() release_tr = device_under_test.ReleaseAllResources()
result_callback.wait_for_lrc_id(release_tr.unique_id)
obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.EMPTY]) obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.EMPTY])
assert device_under_test.assignedResources is None assert device_under_test.assignedResources is None
...@@ -297,7 +313,8 @@ class TestSKASubarray: ...@@ -297,7 +313,8 @@ class TestSKASubarray:
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.IDLE) obs_state_callback.assert_call(ObsState.IDLE)
device_under_test.ReleaseResources(json.dumps(["BAND1"])) release_tr = device_under_test.ReleaseResources(json.dumps(["BAND1"]))
result_callback.wait_for_lrc_id(release_tr.unique_id)
obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.IDLE]) obs_state_callback.assert_calls([ObsState.RESOURCING, ObsState.IDLE])
assert device_under_test.ObsState == ObsState.IDLE assert device_under_test.ObsState == ObsState.IDLE
...@@ -318,16 +335,22 @@ class TestSKASubarray: ...@@ -318,16 +335,22 @@ class TestSKASubarray:
result_callback.wait_for_lrc_id(on_tr.unique_id) result_callback.wait_for_lrc_id(on_tr.unique_id)
result_callback.wait_for_lrc_id(assign_tr.unique_id) result_callback.wait_for_lrc_id(assign_tr.unique_id)
device_under_test.Configure('{"BAND1": 2}') conf_tr = device_under_test.Configure('{"BAND1": 2}')
device_under_test.Abort() result_callback.wait_for_lrc_id(conf_tr.unique_id)
abort_tr = device_under_test.Abort()
result_callback.wait_for_lrc_id(abort_tr.unique_id)
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.ABORTED) obs_state_callback.assert_call(ObsState.ABORTED)
assert device_under_test.ObsReset() == [ obs_reset_tr = device_under_test.ObsReset()
[ResultCode.OK], result_callback.wait_for_lrc_id(obs_reset_tr.unique_id)
["ObsReset command completed OK"], assert (
] device_under_test.longRunningCommandResult[2]
== "ObsReset command completed OK"
)
assert int(device_under_test.longRunningCommandResult[1]) == ResultCode.OK
obs_state_callback.assert_calls([ObsState.RESETTING, ObsState.IDLE]) obs_state_callback.assert_calls([ObsState.RESETTING, ObsState.IDLE])
assert device_under_test.obsState == ObsState.IDLE assert device_under_test.obsState == ObsState.IDLE
...@@ -347,20 +370,23 @@ class TestSKASubarray: ...@@ -347,20 +370,23 @@ class TestSKASubarray:
result_callback.wait_for_lrc_id(on_tr.unique_id) result_callback.wait_for_lrc_id(on_tr.unique_id)
result_callback.wait_for_lrc_id(assign_tr.unique_id) result_callback.wait_for_lrc_id(assign_tr.unique_id)
device_under_test.Configure('{"BAND1": 2}') conf_tr = device_under_test.Configure('{"BAND1": 2}')
result_callback.wait_for_lrc_id(conf_tr.unique_id)
obs_state_callback = tango_change_event_helper.subscribe("obsState") obs_state_callback = tango_change_event_helper.subscribe("obsState")
obs_state_callback.assert_call(ObsState.READY) obs_state_callback.assert_call(ObsState.READY)
assert device_under_test.Scan('{"id": 123}') == [ scan_tr = device_under_test.Scan('{"id": 123}')
[ResultCode.STARTED], result_callback.wait_for_lrc_id(scan_tr.unique_id)
["Scan command started"], assert device_under_test.longRunningCommandResult[2] == "Scan command started"
] assert int(device_under_test.longRunningCommandResult[1]) == ResultCode.STARTED
obs_state_callback.assert_call(ObsState.SCANNING) obs_state_callback.assert_call(ObsState.SCANNING)
assert device_under_test.obsState == ObsState.SCANNING assert device_under_test.obsState == ObsState.SCANNING
device_under_test.EndScan() endscan_tr = device_under_test.EndScan()
result_callback.wait_for_lrc_id(endscan_tr.unique_id)
with pytest.raises(DevFailed): with pytest.raises(DevFailed):
device_under_test.Scan("Invalid JSON") device_under_test.Scan("Invalid JSON")
# PROTECTED REGION END # // SKASubarray.test_Scan # PROTECTED REGION END # // SKASubarray.test_Scan
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment