diff --git a/docs/source/api/queue_manager.rst b/docs/source/api/queue_manager.rst index 71dfdcd85a76d4a95a2363ab14a552a44ed069db..202c6a7c833d63fe910856cb655bf872c6abff80 100644 --- a/docs/source/api/queue_manager.rst +++ b/docs/source/api/queue_manager.rst @@ -2,5 +2,5 @@ Queue Manager ============= -.. automodule:: ska_tango_base.base.task_queue_component_manager +.. automodule:: ska_tango_base.base.task_queue_manager :members: diff --git a/pogo/CspSubElementController.xmi b/pogo/CspSubElementController.xmi index 74529ce22a43629c194923847c27677ea2d3df2d..c861cbdb54bde0004aa37903408b1df8606b27fa 100644 --- a/pogo/CspSubElementController.xmi +++ b/pogo/CspSubElementController.xmi @@ -381,31 +381,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/CspSubElementObsDevice.xmi b/pogo/CspSubElementObsDevice.xmi index 75b40d045187ad5e5aaa9a6beed68dc7cfa30488..7befab6dceac94b1e8e3f94fc07f58ff46aab16f 100644 --- a/pogo/CspSubElementObsDevice.xmi +++ b/pogo/CspSubElementObsDevice.xmi @@ -286,31 +286,31 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="Flag reporting if the SDP link is active.
True: active
False:down" label="sdpLinkActive" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/CspSubElementSubarray.xmi b/pogo/CspSubElementSubarray.xmi index c39f6928a5e4ce39534f15b894a63eaecd35d02b..489b6908ab263dd26f752aaba5d17c2a405119c1 100644 --- a/pogo/CspSubElementSubarray.xmi +++ b/pogo/CspSubElementSubarray.xmi @@ -436,31 +436,31 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="Flag reporting if the SDP links are active." label="sdpLinkActive" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKAAlarmHandler.xmi b/pogo/SKAAlarmHandler.xmi index 74a0c53579515a5cb6dd41b07a578cc39db5a01d..db01bd184e51687cbe12657178050a89ab9d0a5e 100644 --- a/pogo/SKAAlarmHandler.xmi +++ b/pogo/SKAAlarmHandler.xmi @@ -260,31 +260,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKABaseDevice.xmi b/pogo/SKABaseDevice.xmi index 149f11bcb79b92b5a207bce340356eefe5a9e8ec..a7f7ed3d33b5aab3b45766fea89602a568004058 100644 --- a/pogo/SKABaseDevice.xmi +++ b/pogo/SKABaseDevice.xmi @@ -167,31 +167,31 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> diff --git a/pogo/SKACapability.xmi b/pogo/SKACapability.xmi index 7e251b91e921543d103158db7720f4acd1930601..b71acefedfa4738c44587657b2a95208db2102fa 100644 --- a/pogo/SKACapability.xmi +++ b/pogo/SKACapability.xmi @@ -207,31 +207,31 @@ <status abstract="false" inherited="false" concrete="true" concreteHere="true"/> <properties description="A list of components with no. of instances in use on this Capability." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKAController.xmi b/pogo/SKAController.xmi index 88007f362265b1137fd0e65e018891c64f3ee5cb..46679e7d55b18d1c0a3fa52b503aaab808362dad 100644 --- a/pogo/SKAController.xmi +++ b/pogo/SKAController.xmi @@ -222,31 +222,31 @@ <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKALogger.xmi b/pogo/SKALogger.xmi index 522f094930d43c514e8db441f69c848c37b21155..267dbfb092f18f98b15fad112d09289811805fa7 100644 --- a/pogo/SKALogger.xmi +++ b/pogo/SKALogger.xmi @@ -148,31 +148,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKAObsDevice.xmi b/pogo/SKAObsDevice.xmi index 4c8b697095044c5a9f59ada2f27c09b4861aba31..3512f14737da381a38d4b94adb0fafb894e6406b 100644 --- a/pogo/SKAObsDevice.xmi +++ b/pogo/SKAObsDevice.xmi @@ -186,31 +186,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKASubarray.xmi b/pogo/SKASubarray.xmi index d7f2fee1992bdb3abfabdfe6b1bb93b60ddd5818..f13224cf3843b06f84b9c7c66a3d9ab52546f98b 100644 --- a/pogo/SKASubarray.xmi +++ b/pogo/SKASubarray.xmi @@ -284,31 +284,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/pogo/SKATelState.xmi b/pogo/SKATelState.xmi index 73e5383d2246507083b6ab7883e331cbbd937135..009fed6b9224161dd27335abf828212d706e4023 100644 --- a/pogo/SKATelState.xmi +++ b/pogo/SKATelState.xmi @@ -143,31 +143,31 @@ <status abstract="false" inherited="true" concrete="true"/> <properties description="Logging targets for this device, excluding ska_ser_logging defaults - 
initialises to LoggingTargetsDefault on startup" label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Keep track of which commands are in the queue. 
Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="98" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandIDsInQueue" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="false" libCheckCriteria="false"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="Every client that executes a command will receive a command ID as response. 
Keep track of IDs in the queue. Pop off from front as they complete." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandStatus" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, status pair of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandProgress" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="100" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> <properties description="ID, progress of the currently executing command. 
Clients can subscribe to on_change event and wait for the ID they are interested in." label="" unit="" standardUnit="" displayUnit="" format="" maxValue="" minValue="" maxAlarm="" minAlarm="" maxWarning="" minWarning="" deltaTime="" deltaValue=""/> </attributes> - <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="2" maxY="" allocReadMember="true" isDynamic="false"> + <attributes name="longRunningCommandResult" attType="Spectrum" rwType="READ" displayLevel="OPERATOR" polledPeriod="0" maxX="3" maxY="" allocReadMember="true" isDynamic="false"> <dataType xsi:type="pogoDsl:StringType"/> <changeEvent fire="true" libCheckCriteria="true"/> <status abstract="false" inherited="true" concrete="true" concreteHere="false"/> diff --git a/src/ska_tango_base/base/base_device.py b/src/ska_tango_base/base/base_device.py index 02f8a15804f763ce273012a271f576bbffd40574..093a7ecd7ad426c77e2cdd9920f78d4051c15f58 100644 --- a/src/ska_tango_base/base/base_device.py +++ b/src/ska_tango_base/base/base_device.py @@ -58,6 +58,7 @@ from ska_tango_base.faults import ( LoggingTargetError, LoggingLevelError, ) +from ska_tango_base.base.task_queue_manager import MAX_WORKER_COUNT, MAX_QUEUE_SIZE LOG_FILE_SIZE = 1024 * 1024 # Log file size 1MB. _DEBUGGER_PORT = 5678 @@ -432,6 +433,17 @@ class SKABaseDevice(Device): device.set_change_event("status", True, True) device.set_archive_event("status", True, True) + device.set_change_event("longRunningCommandsInQueue", True, True) + device.set_archive_event("longRunningCommandsInQueue", True, True) + device.set_change_event("longRunningCommandIDsInQueue", True, True) + device.set_archive_event("longRunningCommandIDsInQueue", True, True) + device.set_change_event("longRunningCommandStatus", True, True) + device.set_archive_event("longRunningCommandStatus", True, True) + device.set_change_event("longRunningCommandProgress", True, True) + device.set_archive_event("longRunningCommandProgress", True, True) + device.set_change_event("longRunningCommandResult", True, True) + device.set_archive_event("longRunningCommandResult", True, True) + device._health_state = HealthState.OK device._control_mode = ControlMode.REMOTE device._simulation_mode = SimulationMode.FALSE @@ -691,7 +703,7 @@ class SKABaseDevice(Device): longRunningCommandsInQueue = attribute( dtype=("str",), - max_dim_x=98, + max_dim_x=MAX_QUEUE_SIZE, access=AttrWriteType.READ, doc="Keep track of which commands are in the queue. \n" "Pop off from front as they complete.", @@ -700,7 +712,7 @@ class SKABaseDevice(Device): longRunningCommandIDsInQueue = attribute( dtype=("str",), - max_dim_x=98, + max_dim_x=MAX_QUEUE_SIZE, access=AttrWriteType.READ, doc="Every client that executes a command will receive a command ID as response. \n" "Keep track of IDs in the queue. Pop off from front as they complete.", @@ -709,7 +721,7 @@ class SKABaseDevice(Device): longRunningCommandStatus = attribute( dtype=("str",), - max_dim_x=2, + max_dim_x=MAX_WORKER_COUNT * 2, # 2 per thread access=AttrWriteType.READ, doc="ID, status pair of the currently executing command. \n" "Clients can subscribe to on_change event and wait for the ID they are interested in.", @@ -718,7 +730,7 @@ class SKABaseDevice(Device): longRunningCommandProgress = attribute( dtype=("str",), - max_dim_x=2, + max_dim_x=MAX_WORKER_COUNT * 2, # 2 per thread access=AttrWriteType.READ, doc="ID, progress of the currently executing command. \n" "Clients can subscribe to on_change event and wait for the ID they are interested in..", @@ -727,9 +739,9 @@ class SKABaseDevice(Device): longRunningCommandResult = attribute( dtype=("str",), - max_dim_x=2, + max_dim_x=3, # Always the last result (unique_id, result_code, task_result) access=AttrWriteType.READ, - doc="ID, result pair. \n" + doc="unique_id, result_code, task_result. \n" "Clients can subscribe to on_change event and wait for the ID they are interested in.", ) """Device attribute for long running commands.""" @@ -1111,9 +1123,9 @@ class SKABaseDevice(Device): """ Read the long running commands in the queue. - :return: commands in the device queue + :return: tasks in the queue """ - return self.component_manager.commands_in_queue + return self.component_manager.tasks_in_queue def read_longRunningCommandIDsInQueue(self): # PROTECTED REGION ID(SKABaseDevice.longRunningCommandIDsInQueue_read) ENABLED START # @@ -1122,16 +1134,16 @@ class SKABaseDevice(Device): :return: unique ids for the enqueued commands """ - return self.component_manager.command_ids_in_queue + return self.component_manager.task_ids_in_queue def read_longRunningCommandStatus(self): # PROTECTED REGION ID(SKABaseDevice.longRunningCommandStatus_read) ENABLED START # """ - Read the status of the currently executing long running command. + Read the status of the currently executing long running commands. - :return: ID, status pair of the currently executing command + :return: ID, status pairs of the currently executing commands """ - return self.component_manager.command_status + return self.component_manager.task_status def read_longRunningCommandProgress(self): # PROTECTED REGION ID(SKABaseDevice.longRunningCommandProgress_read) ENABLED START # @@ -1140,16 +1152,16 @@ class SKABaseDevice(Device): :return: ID, progress of the currently executing command. """ - return self.component_manager.command_progress + return self.component_manager.task_progress def read_longRunningCommandResult(self): # PROTECTED REGION ID(SKABaseDevice.longRunningCommandResult_read) ENABLED START # """ Read the result of the completed long running command. - :return: ID, result pair. + :return: ID, ResultCode, result. """ - return self.component_manager.command_result + return self.component_manager.task_result # -------- # Commands @@ -1224,17 +1236,6 @@ class SKABaseDevice(Device): self.logger.info(message) return (ResultCode.OK, message) - def is_Reset_allowed(self): - """ - Whether the ``Reset()`` command is allowed to be run in the current state. - - :returns: whether the ``Reset()`` command is allowed to be run in the - current state - :rtype: boolean - """ - command = self.get_command_object("Reset") - return command.is_allowed(raise_if_disallowed=True) - @command( dtype_out="DevVarLongStringArray", doc_out="(ReturnType, 'informational message')", @@ -1253,8 +1254,9 @@ class SKABaseDevice(Device): :rtype: (ResultCode, str) """ command = self.get_command_object("Reset") - (return_code, message) = command() - return [[return_code], [message]] + unique_id, return_code = self.component_manager.enqueue(command) + + return [[return_code], [unique_id]] class StandbyCommand(StateModelCommand, ResponseCommand): """A class for the SKABaseDevice's Standby() command.""" @@ -1291,18 +1293,6 @@ class SKABaseDevice(Device): self.logger.info(message) return (ResultCode.OK, message) - def is_Standby_allowed(self): - """ - Check if command Standby is allowed in the current device state. - - :raises :py:exc:`CommandError`: if the command is not allowed - - :return: ``True`` if the command is allowed - :rtype: boolean - """ - command = self.get_command_object("Standby") - return command.is_allowed(raise_if_disallowed=True) - @command( dtype_out="DevVarLongStringArray", doc_out="(ReturnType, 'informational message')", @@ -1321,8 +1311,9 @@ class SKABaseDevice(Device): :rtype: (ResultCode, str) """ command = self.get_command_object("Standby") - (return_code, message) = command() - return [[return_code], [message]] + unique_id, return_code = self.component_manager.enqueue(command) + + return [[return_code], [unique_id]] class OffCommand(StateModelCommand, ResponseCommand): """A class for the SKABaseDevice's Off() command.""" @@ -1359,18 +1350,6 @@ class SKABaseDevice(Device): self.logger.info(message) return (ResultCode.OK, message) - def is_Off_allowed(self): - """ - Check if command `Off` is allowed in the current device state. - - :raises :py:exc:`CommandError`: if the command is not allowed - - :return: ``True`` if the command is allowed - :rtype: boolean - """ - command = self.get_command_object("Off") - return command.is_allowed(raise_if_disallowed=True) - @command( dtype_out="DevVarLongStringArray", doc_out="(ReturnType, 'informational message')", @@ -1389,8 +1368,9 @@ class SKABaseDevice(Device): :rtype: (ResultCode, str) """ command = self.get_command_object("Off") - (return_code, message) = command() - return [[return_code], [message]] + unique_id, return_code = self.component_manager.enqueue(command) + + return [[return_code], [unique_id]] class OnCommand(StateModelCommand, ResponseCommand): """A class for the SKABaseDevice's On() command.""" @@ -1427,19 +1407,6 @@ class SKABaseDevice(Device): self.logger.info(message) return (ResultCode.OK, message) - def is_On_allowed(self): - """ - Check if command `On` is allowed in the current device state. - - :raises :py:exc:`CommandError`: if the command is not - allowed - - :return: ``True`` if the command is allowed - :rtype: boolean - """ - command = self.get_command_object("On") - return command.is_allowed(raise_if_disallowed=True) - @command( dtype_out="DevVarLongStringArray", doc_out="(ReturnType, 'informational message')", @@ -1458,8 +1425,9 @@ class SKABaseDevice(Device): :rtype: (ResultCode, str) """ command = self.get_command_object("On") - (return_code, message) = command() - return [[return_code], [message]] + unique_id, return_code = self.component_manager.enqueue(command) + + return [[return_code], [unique_id]] class AbortCommandsCommand(ResponseCommand): """The command class for the AbortCommand command.""" @@ -1482,7 +1450,7 @@ class SKABaseDevice(Device): Abort the currently executing LRC and remove all enqueued LRCs. """ - # implementation details to be added + self.target.abort_tasks() return (ResultCode.OK, "Aborting") @command( @@ -1491,8 +1459,8 @@ class SKABaseDevice(Device): @DebugIt() def AbortCommands(self): """Empty out long running commands in queue.""" - handler = self.get_command_object("AbortCommands") - (return_code, message) = handler() + command = self.get_command_object("AbortCommands") + (return_code, message) = command() return [[return_code], [message]] class CheckLongRunningCommandStatusCommand(ResponseCommand): @@ -1520,23 +1488,23 @@ class SKABaseDevice(Device): :param argin: The command ID :type argin: str - :return: The resultcode for this command and the code for the state + :return: The resultcode for this command and the string of the TaskState :rtype: tuple - (ResultCode.OK, LongRunningCommandState) + (ResultCode.OK, str) """ - # implementation details to be added - return (ResultCode.OK, LongRunningCommandState.NOT_FOUND) + result = self.target.get_task_state(argin) + return (ResultCode.OK, f"{result}") @command( dtype_in=str, - dtype_out="DevVarShortArray", + dtype_out="DevVarLongStringArray", ) @DebugIt() def CheckLongRunningCommandStatus(self, argin): """Check the status of a long running command by ID.""" - handler = self.get_command_object("CheckLongRunningCommandStatus") - (return_code, command_state) = handler(argin) - return [return_code, command_state] + command = self.get_command_object("CheckLongRunningCommandStatus") + (return_code, command_state) = command(argin) + return [[return_code], [command_state]] class DebugDeviceCommand(BaseCommand): """A class for the SKABaseDevice's DebugDevice() command.""" diff --git a/src/ska_tango_base/base/component_manager.py b/src/ska_tango_base/base/component_manager.py index 6707bdec3088257064d6d76fd98c8dcee3296dfc..824c994f0a92dcaba38266e7745b824bbec9f035 100644 --- a/src/ska_tango_base/base/component_manager.py +++ b/src/ska_tango_base/base/component_manager.py @@ -23,7 +23,12 @@ The basic model is: the component to change behaviour and/or state; and it *monitors* its component by keeping track of its state. """ +from typing import Any, Optional, Tuple + +from ska_tango_base.commands import BaseCommand, ResultCode + from ska_tango_base.control_model import PowerMode +from ska_tango_base.base.task_queue_manager import QueueManager, TaskState class BaseComponentManager: @@ -49,6 +54,7 @@ class BaseComponentManager: manager """ self.op_state_model = op_state_model + self._queue_manager = self.create_queue_manager() def start_communicating(self): """ @@ -113,6 +119,51 @@ class BaseComponentManager: """ raise NotImplementedError("BaseComponentManager is abstract.") + @property + def tasks_in_queue(self): + """ + Read the long running commands in the queue. + + :return: tasks in the queue + """ + return self._queue_manager.tasks_in_queue + + @property + def task_ids_in_queue(self): + """ + Read the IDs of the long running commands in the queue. + + :return: unique ids for the enqueued commands + """ + return self._queue_manager.task_ids_in_queue + + @property + def task_status(self): + """ + Read the status of the currently executing long running commands. + + :return: ID, status pairs of the currently executing commands + """ + return self._queue_manager.task_status + + @property + def task_progress(self): + """ + Read the progress of the currently executing long running command. + + :return: ID, progress of the currently executing command. + """ + return self._queue_manager.task_progress + + @property + def task_result(self): + """ + Read the result of the completed long running command. + + :return: ID, ResultCode, result. + """ + return list(self._queue_manager.task_result) + def off(self): """Turn the component off.""" raise NotImplementedError("BaseComponentManager is abstract.") @@ -155,3 +206,38 @@ class BaseComponentManager: This is a callback hook. """ self.op_state_model.perform_action("component_fault") + + def create_queue_manager(self) -> QueueManager: + """Create a QueueManager. + + By default the QueueManager will not have a queue or workers. Thus + tasks enqueued will execute synchronously. + + :return: The queue manager. + :rtype: QueueManager + """ + return QueueManager(max_queue_size=0, num_workers=0) + + def enqueue( + self, + task: BaseCommand, + argin: Optional[Any] = None, + ) -> Tuple[str, ResultCode]: + """Put `task` on the queue. The unique ID for it is returned. + + :param task: The task to execute in the thread + :type task: BaseCommand + :param argin: The parameter for the command + :type argin: Any + :return: The unique ID of the queued command and the ResultCode + :rtype: tuple + """ + return self._queue_manager.enqueue_task(task, argin=argin) + + def abort_tasks(self) -> None: + """Start aborting tasks on the queue.""" + self._queue_manager.abort_tasks() + + def get_task_state(self, unique_id: str) -> TaskState: + """Attempt to get state of QueueTask.""" + return self._queue_manager.get_task_state(unique_id) diff --git a/src/ska_tango_base/base/reference_component_manager.py b/src/ska_tango_base/base/reference_component_manager.py index 065a348bd38821aa72c3006ecfe63b5322a4221a..8a8f94e33f543212c84592e0123a5f4f307dc185 100644 --- a/src/ska_tango_base/base/reference_component_manager.py +++ b/src/ska_tango_base/base/reference_component_manager.py @@ -1,12 +1,15 @@ """ -This module provided a reference implementation of a BaseComponentManager. +This module provided reference implementations of a BaseComponentManager. It is provided for explanatory purposes, and to support testing of this package. """ import functools +import logging +from typing import Optional, Callable -from ska_tango_base.base import BaseComponentManager +from ska_tango_base.base import BaseComponentManager, OpStateModel +from ska_tango_base.base.task_queue_manager import QueueManager from ska_tango_base.control_model import PowerMode from ska_tango_base.faults import ComponentFault @@ -389,3 +392,51 @@ class ReferenceBaseComponentManager(BaseComponentManager): This is a callback hook. """ self.op_state_model.perform_action("component_fault") + + +class QueueWorkerComponentManager(ReferenceBaseComponentManager): + """A component manager that configures the queue manager.""" + + def __init__( + self, + op_state_model: Optional[OpStateModel], + logger: logging.Logger, + max_queue_size: int, + num_workers: int, + push_change_event: Optional[Callable], + *args, + **kwargs + ): + """Component manager that configures the queue. + + :param op_state_model: The ops state model + :type op_state_model: OpStateModel + :param logger: Logger to use + :type logger: logging.Logger + :param max_queue_size: The size of the queue + :type max_queue_size: int + :param num_workers: The number of workers + :type num_workers: int + :param push_change_event: A method that will be called when attributes are updated + :type push_change_event: Callable + """ + self.logger = logger + self.max_queue_size = max_queue_size + self.num_workers = num_workers + self.push_change_event = push_change_event + super().__init__(op_state_model, *args, logger=logger, **kwargs) + + def create_queue_manager(self) -> QueueManager: + """Create a QueueManager. + + Create the QueueManager with the queue configured as needed. + + :return: The queue manager + :rtype: QueueManager + """ + return QueueManager( + max_queue_size=self.max_queue_size, + num_workers=self.num_workers, + logger=self.logger, + push_change_event=self.push_change_event, + ) diff --git a/src/ska_tango_base/base/task_queue_component_manager.py b/src/ska_tango_base/base/task_queue_manager.py similarity index 55% rename from src/ska_tango_base/base/task_queue_component_manager.py rename to src/ska_tango_base/base/task_queue_manager.py index d729bf074b18f6c8200a16dc89a91adedea93278..dc4a384902ea53e0b24b8d5ee167b968a36a2b9f 100644 --- a/src/ska_tango_base/base/task_queue_component_manager.py +++ b/src/ska_tango_base/base/task_queue_manager.py @@ -1,7 +1,10 @@ """ This module provides a QueueManager, TaskResult and QueueTask classes. -* **TaskResult**: is a convenience `dataclass` for parsing and formatting the +* **TaskUniqueId**: is a convenience class for parsing and generating the IDs used + to identify the tasks. + +* **TaskResult**: is a convenience class for parsing and formatting the results of a task. * **QueueTask**: is a class that instances of which can be added to the queue for execution @@ -9,6 +12,12 @@ This module provides a QueueManager, TaskResult and QueueTask classes. * **QueueManager**: that implements the queue and thread worker functionality. +************ +TaskUniqueId +************ + +This is a simple convenience class for generating and parsing the IDs that identify tasks. + ********** TaskResult ********** @@ -26,76 +35,12 @@ be made available as a Tango device attribute named `command_result`. It will be .. code-block:: py from ska_tango_base.base.task_queue_component_manager import TaskResult - tr = TaskResult.from_task_result(["UniqueID", "0", "The task result"]) + tr = TaskResult.from_task_result(("UniqueID", "0", "The task result")) tr TaskResult(result_code=<ResultCode.OK: 0>, task_result='The task result', unique_id='UniqueID') tr.to_task_result() ('UniqueID', '0', 'The task result') -********* -QueueTask -********* - -This class should be subclassed and the `do` method implemented with the required functionality. -The `do` method will be executed by the background worker in a thread. - -`get_task_name` can be overridden if you want to change the name of the task as it would appear in -the `tasks_in_queue` property. - -Simple example: - -.. code-block:: py - - class SimpleTask(QueueTask): - def do(self): - num_one = self.args[0] - num_two = self.kwargs.get("num_two") - return num_one + num_two - - return SimpleTask(2, num_two=3) - -3 items are added dynamically by the worker thread and is available for use in the class instance. - -* **is_aborting_event**: can be check periodically to determine whether - the queue tasks have been aborted to gracefully complete the task in progress. - The thread will stay active and once `is_aborting_event` has been unset, - new tasks will be fetched from the queue for execution. - -.. code-block:: py - - class AbortTask(QueueTask): - def do(self): - sleep_time = self.args[0] - while not self.is_aborting_event.is_set(): - time.sleep(sleep_time) - - return AbortTask(0.2) - -* **is_stopping_event**: can be check periodically to determine whether - the queue tasks have been stopped. In this case the thread will complete. - -.. code-block:: py - - class StopTask(QueueTask): - def do(self): - assert not self.is_stopping_event.is_set() - while not self.is_stopping_event.is_set(): - pass - - return StopTask() - -* **update_progress**: a callback that can be called wth the current progress - of the task in progress - -.. code-block:: py - - class ProgressTask(QueueTask): - def do(self): - for i in range(100): - self.update_progress(str(i)) - time.sleep(0.5) - - return ProgressTask() ************ QueueManager @@ -134,8 +79,8 @@ Aborting tasks When `abort_tasks` is called on the queue manager the following will happen. -* Any tasks in progress will complete. Tasks that check `is_aborting_event` will know to complete otherwise - it will complete as per normal. +* Any tasks in progress will complete. Tasks that check `aborting_event` periodically will know to complete + otherwise it will complete as per normal. * Any tasks on the queue will be removed and their result set to ABORTED. They will not be executed. @@ -150,7 +95,7 @@ Stopping tasks Once `stop_tasks` is called the worker threads completes as soon as possible. -* Any tasks in progress will complete. Tasks that check `is_stopping_event` will know to exit gracefully. +* Any tasks in progress will complete. Tasks that check `stopping_event` will know to exit gracefully. * The thread will cease. @@ -167,15 +112,19 @@ import logging import threading import time import traceback +from uuid import uuid4 from queue import Empty, Queue +from datetime import datetime from threading import Event -from typing import Any, Callable, Dict, Optional, Tuple -from attr import dataclass +from inspect import signature +from typing import Any, Callable, Dict, Optional, Tuple, Union import tango -from ska_tango_base.base.component_manager import BaseComponentManager -from ska_tango_base.commands import ResultCode +from ska_tango_base.commands import BaseCommand, ResultCode + +MAX_QUEUE_SIZE = 100 # Maximum supported size of the queue +MAX_WORKER_COUNT = 50 # Maximum number of workers supported class TaskState(enum.IntEnum): @@ -212,7 +161,40 @@ class TaskState(enum.IntEnum): """ -@dataclass +class TaskUniqueId: + """Convenience class for the unique ID of a task.""" + + def __init__(self, id_uuid: str, id_datetime: datetime, id_task_name: str) -> None: + """Create a TaskUniqueId instance. + + :param id_uuid: The uuid portion of the task identifier + :type id_uuid: str + :param id_datetime: The datetime portion of the task identifier + :type id_datetime: datetime + :param id_task_name: The task name portion of the task identifier + :type id_task_name: str + """ + self.id_uuid = id_uuid + self.id_datetime = id_datetime + self.id_task_name = id_task_name + + @classmethod + def generate_unique_id(cls, task_name: str) -> str: + """Return a new unique ID.""" + return f"{time.time()}_{uuid4().fields[-1]}_{task_name}" + + @classmethod + def from_unique_id(cls, unique_id: str): + """Parse a unique ID.""" + parts = unique_id.split("_") + id_uuid = parts[1] + id_datetime = datetime.fromtimestamp(float(parts[0])) + id_task_name = "_".join(parts[2:]) + return TaskUniqueId( + id_uuid=id_uuid, id_datetime=id_datetime, id_task_name=id_task_name + ) + + class TaskResult: """Convenience class for results.""" @@ -220,25 +202,41 @@ class TaskResult: task_result: str unique_id: str - def to_task_result(self) -> Tuple[str]: + def __init__( + self, result_code: ResultCode, task_result: str, unique_id: str + ) -> None: + """Create the TaskResult. + + :param result_code: The ResultCode of the task result + :type result_code: ResultCode + :param task_result: The string of the task result + :type task_result: str + :param unique_id: The unique identifier of a task. + :type unique_id: str + """ + self.result_code = result_code + self.task_result = task_result + self.unique_id = unique_id + + def to_task_result(self) -> Tuple[str, str, str]: """Convert TaskResult to task_result. :return: The task result - :rtype: list[str] + :rtype: tuple[str, str, str] """ - return (f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}") + return f"{self.unique_id}", f"{int(self.result_code)}", f"{self.task_result}" @classmethod - def from_task_result(cls, task_result: list) -> TaskResult: - """Convert task_result list to TaskResult. + def from_task_result(cls, task_result: Tuple[str, str, str]) -> TaskResult: + """Convert task_result tuple to TaskResult. - :param task_result: The task_result [unique_id, result_code, task_result] - :type task_result: list + :param task_result: The task_result (unique_id, result_code, task_result) + :type task_result: tuple :return: The task result :rtype: TaskResult :raises: ValueError """ - if len(task_result) != 3: + if not task_result or len(task_result) != 3: raise ValueError(f"Cannot parse task_result {task_result}") return TaskResult( @@ -247,59 +245,28 @@ class TaskResult: unique_id=task_result[0], ) + @classmethod + def from_response_command(cls, command_result: Tuple[str, str]) -> TaskResult: + """Convert from ResponseCommand to TaskResult. -class QueueTask: - """A task that can be put on the queue.""" - - def __init__(self: QueueTask, *args, **kwargs) -> None: - """Create the task. args and kwargs are stored and should be referenced in the `do` method.""" - self.args = args - self.kwargs = kwargs - self._update_progress_callback = None - - @property - def is_aborting_event(self) -> threading.Event: - """Worker adds is_aborting_event threading event. - - Indicates whether task execution have been aborted. - - :return: The is_aborted event. - :rtype: threading.Event - """ - return self.kwargs.get("is_aborting_event") - - @property - def is_stopping_event(self) -> threading.Event: - """Worker adds is_stopping_event threading event. - - Indicates whether task execution have been stopped. - - :return: The is_stopping event. - :rtype: threading.Event - """ - return self.kwargs.get("is_stopping_event") - - def update_progress(self, progress: str): - """Call the callback to update the progress. - - :param progress: String that to indicate progress of task - :type progress: str + :param command_result: The task_result (unique_id, result_code) + :type command_result: tuple + :return: The task result + :rtype: TaskResult + :raises: ValueError """ - self._update_progress_callback = self.kwargs.get("update_progress_callback") - if self._update_progress_callback: - self._update_progress_callback(progress) - - def get_task_name(self) -> str: - """Return a custom task name. + if not command_result or len(command_result) != 2: + raise ValueError(f"Cannot parse task_result {command_result}") - :return: The name of the task - :rtype: str - """ - return self.__class__.__name__ + return TaskResult( + result_code=ResultCode(int(command_result[1])), + task_result="", + unique_id=command_result[0], + ) - def do(self: QueueTask) -> Any: - """Implement this method with your functionality.""" - raise NotImplementedError + def get_task_unique_id(self) -> TaskUniqueId: + """Convert from the unique_id string to TaskUniqueId.""" + return TaskUniqueId.from_unique_id(self.unique_id) class QueueManager: @@ -313,8 +280,10 @@ class QueueManager: queue: Queue, logger: logging.Logger, stopping_event: Event, + aborting_event: Event, result_callback: Callable, update_command_state_callback: Callable, + update_progress_callback: Callable, queue_fetch_timeout: int = 0.1, ) -> None: """Initiate a worker. @@ -327,16 +296,19 @@ class QueueManager: :type logger: logging.Logger :param stopping_event: Indicates whether to get more tasks off the queue :type stopping_event: Event + :param aborting_event: Indicates whether the queue is being aborted + :type aborting_event: Event :param update_command_state_callback: Callback to update command state :type update_command_state_callback: Callable """ super().__init__() self._work_queue = queue self._logger = logger - self.is_stopping = stopping_event - self.is_aborting = threading.Event() + self.stopping_event = stopping_event + self.aborting_event = aborting_event self._result_callback = result_callback self._update_command_state_callback = update_command_state_callback + self._update_progress_callback = update_progress_callback self._queue_fetch_timeout = queue_fetch_timeout self.current_task_progress: Optional[str] = None self.current_task_id: Optional[str] = None @@ -346,19 +318,19 @@ class QueueManager: """Run in the thread. Tasks are fetched off the queue and executed. - if _is_stopping is set the thread wil exit. - If _is_aborting is set the queue will be emptied. All new commands will be aborted until - is_aborting cleared. + if stopping_event is set the thread will exit. + If aborting_event is set the queue will be emptied. All new commands will be aborted until + aborting_event cleared. """ with tango.EnsureOmniThread(): - while not self.is_stopping.is_set(): + while not self.stopping_event.is_set(): self.current_task_id = None self.current_task_progress = "" - if self.is_aborting.is_set(): - # Drain the Queue since self.is_aborting is set + if self.aborting_event.is_set(): + # Drain the Queue since self.aborting_event is set while not self._work_queue.empty(): - unique_id, _ = self._work_queue.get() + unique_id, _, _ = self._work_queue.get() self.current_task_id = unique_id self._logger.warning("Aborting task ID [%s]", unique_id) result = TaskResult( @@ -369,46 +341,68 @@ class QueueManager: time.sleep(self._queue_fetch_timeout) continue # Don't try and get work off the queue below, continue next loop try: - (unique_id, task) = self._work_queue.get( + (unique_id, task, argin) = self._work_queue.get( block=True, timeout=self._queue_fetch_timeout ) self._update_command_state_callback(unique_id, "IN_PROGRESS") self.current_task_id = unique_id - # Inject is_aborting, is_stopping, progress_update into task - task.kwargs["is_aborting_event"] = self.is_aborting - task.kwargs["is_stopping_event"] = self.is_stopping - task.kwargs[ - "update_progress_callback" - ] = self._update_progress_callback - result = self.execute_task(task, unique_id) + setattr(task, "update_progress", self._update_task_progress) + result = self.execute_task(task, argin, unique_id) self._result_callback(result) self._work_queue.task_done() except Empty: continue return - def _update_progress_callback(self, progress: str) -> None: + def _update_task_progress(self, progress: str) -> None: """Update the current task progress. :param progress: An indication of progress :type progress: str """ self.current_task_progress = progress + self._update_progress_callback() @classmethod - def execute_task(cls, task: QueueTask, unique_id: str) -> TaskResult: + def execute_task( + cls, task: BaseCommand, argin: Any, unique_id: str + ) -> TaskResult: """Execute a task, return results in a standardised format. :param task: Task to execute - :type task: QueueTask + :type task: BaseCommand + :param argin: The argument for the command + :type argin: Any :param unique_id: The task unique ID :type unique_id: str :return: The result of the task :rtype: TaskResult """ try: - result = TaskResult(ResultCode.OK, f"{task.do()}", unique_id) + if hasattr(task, "is_allowed"): + is_allowed_signature = signature(task.is_allowed) + if "raise_if_disallowed" in is_allowed_signature.parameters: + is_task_allowed = task.is_allowed(raise_if_disallowed=True) + else: + is_task_allowed = task.is_allowed() + if not is_task_allowed: + return TaskResult( + ResultCode.NOT_ALLOWED, "Command not allowed", unique_id + ) + if argin: + result = task(argin) + else: + result = task() + # If the response is (ResultCode, Any) + if ( + isinstance(result, tuple) + and len(result) == 2 + and isinstance(result[0], ResultCode) + ): + return TaskResult(result[0], f"{result[1]}", unique_id) + # else set as OK and return the string of whatever the result was + result = TaskResult(ResultCode.OK, f"{result}", unique_id) except Exception as err: result = TaskResult( ResultCode.FAILED, @@ -419,35 +413,42 @@ class QueueManager: def __init__( self: QueueManager, - logger: logging.Logger, max_queue_size: int = 0, queue_fetch_timeout: float = 0.1, num_workers: int = 0, - on_property_update_callback: Optional[Callable] = None, + logger: Optional[logging.Logger] = None, + push_change_event: Optional[Callable] = None, ): """Init QueryManager. Creates the queue and starts the thread that will execute tasks from it. - :param logger: Python logger - :type logger: logging.Logger :param max_queue_size: The maximum size of the queue :type max_queue_size: int :param max_queue_size: The time to wait for items in the queue :type max_queue_size: float :param num_workers: The number of worker threads to start :type num_workers: float + :param logger: Python logger + :type logger: logging.Logger """ - self._logger = logger + if max_queue_size > MAX_QUEUE_SIZE: + raise ValueError(f"A maximum queue size of {MAX_QUEUE_SIZE} is supported") + if num_workers > MAX_WORKER_COUNT: + raise ValueError( + f"A maximum number of {MAX_WORKER_COUNT} workers is supported" + ) self._max_queue_size = max_queue_size self._work_queue = Queue(self._max_queue_size) self._queue_fetch_timeout = queue_fetch_timeout - self._on_property_update_callback = on_property_update_callback - self.is_stopping = threading.Event() + self._push_change_event = push_change_event + self.stopping_event = threading.Event() + self.aborting_event = threading.Event() self._property_update_lock = threading.Lock() + self._logger = logger if logger else logging.getLogger(__name__) - self._task_result = () + self._task_result: Union[Tuple[str, str, str], Tuple[()]] = () self._tasks_in_queue: Dict[str, str] = {} # unique_id, task_name self._task_status: Dict[str, str] = {} # unique_id, status self._threads = [] @@ -460,9 +461,11 @@ class QueueManager: self.Worker( self._work_queue, self._logger, - self.is_stopping, + self.stopping_event, + self.aborting_event, self.result_callback, self.update_task_state_callback, + self.update_progress_callback, ) for _ in range(num_workers) ] @@ -479,7 +482,7 @@ class QueueManager: return self._work_queue.full() @property - def task_result(self) -> Tuple[str]: + def task_result(self) -> Union[Tuple[str, str, str], Tuple[()]]: """Return the last task result. :return: Last task result @@ -488,74 +491,99 @@ class QueueManager: return self._task_result @property - def task_ids_in_queue(self) -> list: + def task_ids_in_queue( + self, + ) -> Tuple[str,]: # noqa: E231 """Task IDs in the queue. :return: The task IDs in the queue - :rtype: list + :rtype: tuple """ - return list(self._tasks_in_queue.keys()) + return tuple(self._tasks_in_queue.keys()) @property - def tasks_in_queue(self) -> list: + def tasks_in_queue( + self, + ) -> Tuple[str,]: # noqa: E231 """Task names in the queue. :return: The list of task names in the queue - :rtype: list + :rtype: tuple """ - return list(self._tasks_in_queue.values()) + return tuple(self._tasks_in_queue.values()) @property - def task_status(self) -> Dict[str, str]: + def task_status( + self, + ) -> Tuple[str,]: # noqa: E231 """Return task status. - :return: The task status - :rtype: Dict[str, str] + :return: The task status pairs (id, status) + :rtype: tuple(str,) """ - return self._task_status.copy() + statuses = [] + for u_id, status in self._task_status.copy().items(): + statuses.append(u_id) + statuses.append(status) + return tuple(statuses) @property - def task_progress(self) -> Dict[str, str]: + def task_progress( + self, + ) -> Tuple[Optional[str],]: # noqa: E231 """Return the task progress. - :return: The task progress - :rtype: Dict[str, str] + :return: The task progress pairs (id, progress) + :rtype: tuple(str,) """ - progress = {} + progress = [] for worker in self._threads: if worker.current_task_id: - progress[worker.current_task_id] = worker.current_task_progress - return progress + progress.append(worker.current_task_id) + progress.append(worker.current_task_progress) + return tuple(progress) - def enqueue_task(self, task: QueueTask) -> str: + def enqueue_task( + self, task: BaseCommand, argin: Optional[Any] = None + ) -> Tuple[str, ResultCode]: """Add the task to be done onto the queue. :param task: The task to execute in a thread - :type task: QueueTask + :type task: BaseCommand + :param argin: The parameter for the command + :type argin: Any :return: The unique ID of the command :rtype: string """ - unique_id = self.get_unique_id(task.get_task_name()) + unique_id = self.generate_unique_id(task.__class__.__name__) + + # Inject the events into the task + setattr(task, "aborting_event", self.aborting_event) + setattr(task, "stopping_event", self.stopping_event) # If there is no queue, just execute the command and return if self._max_queue_size == 0: self.update_task_state_callback(unique_id, "IN_PROGRESS") - result = self.Worker.execute_task(task, unique_id) + + # This task blocks, so no need to update progress + setattr(task, "update_progress", lambda x: None) + + result = self.Worker.execute_task(task, argin, unique_id) self.result_callback(result) - return unique_id + return unique_id, result.result_code if self.queue_full: self.result_callback( TaskResult(ResultCode.REJECTED, "Queue is full", unique_id) ) - return unique_id + return unique_id, ResultCode.REJECTED - self._work_queue.put([unique_id, task]) + self._work_queue.put([unique_id, task, argin]) with self._property_update_lock: - self._tasks_in_queue[unique_id] = task.get_task_name() - self._on_property_change("tasks_in_queue") - self._on_property_change("task_ids_in_queue") - return unique_id + self._tasks_in_queue[unique_id] = task.__class__.__name__ + self._on_property_change("longRunningCommandsInQueue", self.tasks_in_queue) + self._on_property_change("longRunningCommandIDsInQueue", self.task_ids_in_queue) + return unique_id, ResultCode.QUEUED def result_callback(self, task_result: TaskResult): """Run when the task, taken from the queue, have completed to update the appropriate attributes. @@ -567,7 +595,13 @@ class QueueManager: if task_result.unique_id in self._task_status: del self._task_status[task_result.unique_id] self._task_result = task_result.to_task_result() - self._on_property_change("task_result") + + # Once the queue is cleared and all the work in progress have completed, clear + # the aborting state. + if self.is_aborting and self._work_queue.empty() and (not self.task_status): + self.resume_tasks() + + self._on_property_change("longRunningCommandResult", self.task_result) def update_task_state_callback(self, unique_id: str, status: str): """Update the executing task state. @@ -580,45 +614,49 @@ class QueueManager: if unique_id in self._tasks_in_queue: with self._property_update_lock: del self._tasks_in_queue[unique_id] - self._on_property_change("task_ids_in_queue") - self._on_property_change("tasks_in_queue") + self._on_property_change("longRunningCommandsInQueue", self.tasks_in_queue) + self._on_property_change( + "longRunningCommandIDsInQueue", self.task_ids_in_queue + ) with self._property_update_lock: self._task_status[unique_id] = status - self._on_property_change("task_status") + self._on_property_change("longRunningCommandStatus", self.task_status) + + def update_progress_callback(self): + """Trigger the property change callback back to the device.""" + self._on_property_change("longRunningCommandProgress", self.task_progress) - def _on_property_change(self, property_name: str): + def _on_property_change(self, property_name: str, property_value: Any): """Trigger when a property changes value. :param property_name: The property name :type property_name: str + :param property_name: The property value + :type property_name: Any """ - if self._on_property_update_callback: - self._on_property_update_callback( - property_name, getattr(self, property_name) - ) + if self._push_change_event: + self._push_change_event(property_name, property_value) def abort_tasks(self): """Start aborting tasks.""" - for worker in self._threads: - worker.is_aborting.set() + self.aborting_event.set() def resume_tasks(self): """Unsets aborting so tasks can be picked up again.""" - for worker in self._threads: - worker.is_aborting.clear() + self.aborting_event.clear() def stop_tasks(self): - """Set is_stopping on each thread so it exists out. Killing the thread.""" - self.is_stopping.set() + """Set stopping_event on each thread so it exists out. Killing the thread.""" + self.stopping_event.set() @property def is_aborting(self) -> bool: - """Return False if any of the threads are aborting.""" - return all([worker.is_aborting.is_set() for worker in self._threads]) + """Return whether we are in aborting state.""" + return self.aborting_event.is_set() @classmethod - def get_unique_id(cls, task_name) -> str: + def generate_unique_id(cls, task_name) -> str: """Generate a unique ID for the task. :param task_name: The name of the task @@ -626,7 +664,7 @@ class QueueManager: :return: The unique ID of the task :rtype: string """ - return f"{time.time()}_{task_name}" + return TaskUniqueId.generate_unique_id(task_name) def get_task_state(self, unique_id: str) -> TaskState: """Attempt to get state of QueueTask. @@ -644,27 +682,11 @@ class QueueManager: if unique_id in self.task_ids_in_queue: return TaskState.QUEUED - if unique_id in self.task_status.keys(): + if unique_id in self.task_status: return TaskState.IN_PROGRESS return TaskState.NOT_FOUND - def __del__(self) -> None: - """Release resources prior to instance deletion. - - - Set the workers to aborting, this will empty out the queue and set the result code - for each task to `Aborted`. - - Wait for the queues to empty out. - - Set the workers to stopping, this will exit out the running thread. - """ - if not self._threads: - return - - self.abort_tasks() - self._work_queue.join() - for worker in self._threads: - worker.is_stopping.set() - def __len__(self) -> int: """Approximate length of the queue. @@ -673,37 +695,10 @@ class QueueManager: """ return self._work_queue.qsize() + def __bool__(self): + """Ensure `if QueueManager()` works as expected with `__len__` being overridden. -class TaskQueueComponentManager(BaseComponentManager): - """A component manager that provides message queue functionality.""" - - def __init__( - self: TaskQueueComponentManager, - message_queue: QueueManager, - op_state_model: Any, - *args: Any, - **kwargs: Any, - ) -> None: - """Create a new component manager that puts tasks on the queue. - - :param message_queue: The queue manager instance - :type message_queue: QueueManager - :param op_state_model: The ops state model - :type op_state_model: Any - """ - self.message_queue = message_queue - - super().__init__(op_state_model, *args, **kwargs) - - def enqueue( - self, - task: QueueTask, - ) -> str: - """Put `task` on the queue. The unique ID for it is returned. - - :param task: The task to execute in the thread - :type task: QueueTask - :return: The unique ID of the queued command - :rtype: str + :return: True + :rtype: bool """ - return self.message_queue.enqueue_task(task) + return True diff --git a/src/ska_tango_base/commands.py b/src/ska_tango_base/commands.py index 7ebf3f582072ff211eafd94514c64cdf13ca515a..ff14c111acd3c128799c5bf62a68edad170cb015 100644 --- a/src/ska_tango_base/commands.py +++ b/src/ska_tango_base/commands.py @@ -381,7 +381,7 @@ class ResponseCommand(BaseCommand): f"Exiting command {self.name} with return_code " f"{return_code!s}, message: '{message}'.", ) - return (return_code, message) + return return_code, message class CompletionCommand(StateModelCommand): diff --git a/src/ska_tango_base/utils.py b/src/ska_tango_base/utils.py index 1cdfe12da26026d7f6ccd42dba7e0709402439e6..c020787c1a8730b820a54b0a65de216107c57f49 100644 --- a/src/ska_tango_base/utils.py +++ b/src/ska_tango_base/utils.py @@ -4,12 +4,16 @@ import ast import functools import inspect import json +import logging import pydoc import traceback import sys +import uuid import warnings +from dataclasses import dataclass from datetime import datetime +from typing import Any, Callable, Dict, List import tango from tango import ( @@ -20,10 +24,13 @@ from tango import ( AttrWriteType, Except, ErrSeverity, + EventData, + EventType, ) from tango import DevState from contextlib import contextmanager from ska_tango_base.faults import GroupDefinitionsError, SKABaseError +from ska_tango_base.base.task_queue_manager import TaskResult int_types = { tango._tango.CmdArgType.DevUShort, @@ -539,3 +546,171 @@ def for_testing_only(func, _testing_check=lambda: "pytest" in sys.modules): return func(*args, **kwargs) return _wrapper + + +@dataclass +class StoredCommand: + """Used to keep track of commands scheduled across devices. + + command_name: The Tango command to execute across devices. + command_id: Every Tango device will return the command ID for the + long running command submitted to it. + is_completed: Whether the command is done or not + """ + + command_name: str + command_id: str + is_completed: bool + + +class LongRunningDeviceInterface: + """This class is a convenience class for long running command devices. + + The intent of this class is that clients should not have to keep + track of command IDs or the various attributes + to determine long running command progress/results. + + This class is also useful when you want to run a long running + command across various devices. Once they all complete a callback + supplied by the user is fired. + + Using this class, a client would need to: + - Supply the Tango devices to connect to that implements long + running commands + - The Long running commands to run (including parameter) + - Optional callback that should be executed when the command + completes + + The callback will be executed once the command completes across all + devices. Thus there's no need to watch attribute changes or keep + track of commands IDs. They are handled here. + """ + + def __init__(self, tango_devices: List[str], logger: logging.Logger) -> None: + """Init LRC device interface.""" + self._logger = logger + self._tango_devices = tango_devices + self._long_running_device_proxies = [] + self._result_subscriptions = [] + self._stored_commands: Dict[str, List[StoredCommand]] = {} + self._stored_callbacks: Dict[str, Callable] = {} + + def setup(self): + """Only create the device proxy and subscribe when a command is invoked.""" + if not self._long_running_device_proxies: + for device in self._tango_devices: + self._long_running_device_proxies.append(tango.DeviceProxy(device)) + + if not self._result_subscriptions: + for device_proxy in self._long_running_device_proxies: + self._result_subscriptions.append( + device_proxy.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + self, + wait=True, + ) + ) + + def push_event(self, ev: EventData): + """Handle the attribute change events. + + For every event that comes in: + + - Update command state: + - Make sure that it's a longRunningCommandResult + - Check to see if the command ID we get from the event + is one we are keeping track of. + - If so, set that command to completed + + - Check if we should fire the callback: + Once the command across all devices have completed + (for that command) + - Check whether all have completed + - If so, fire the callback + - Clean up + """ + if ev.err: + self._logger.error("Event system DevError(s) occured: %s", str(ev.errors)) + return + + if ev.attr_value and ev.attr_value.name == "longrunningcommandresult": + if ev.attr_value.value: + # push change event to new attribute for all tango devices + # for tango_dev in self._tango_devices: + # tango_dev.push_change_event("lastResultCommandIDs", ev.attr_value.value[0]) + # tango_dev.push_change_event("lastResultCommandName", ev.attr_value.value[1]) + + event_command_id = ev.attr_value.value[0] + for stored_commands in self._stored_commands.values(): + for stored_command in stored_commands: + if stored_command.command_id == event_command_id: + stored_command.is_completed = True + + completed_group_keys = [] + for key, stored_command_group in self._stored_commands.items(): + if stored_command_group: + # Determine if all the commands in this group have completed + commands_are_completed = [ + stored_command.is_completed + for stored_command in stored_command_group + ] + if all(commands_are_completed): + completed_group_keys.append(key) + + # Get the command IDs + command_ids = [ + stored_command.command_id + for stored_command in stored_command_group + ] + command_name = stored_command_group[0].command_name + + # Trigger the callback, send command_name and command_ids + # as paramater + self._stored_callbacks[key](command_name, command_ids) + # Remove callback as the group completed + + # Clean up + # Remove callback and commands no longer needed + for key in completed_group_keys: + del self._stored_callbacks[key] + del self._stored_commands[key] + + def execute_long_running_command( + self, + command_name: str, + command_arg: Any = None, + on_completion_callback: Callable = None, + ): + """Execute the long running command with an argument if any. + + Once the commmand completes, then the `on_completion_callback` + will be executed with the EventData as parameter. + This class keeps track of the command ID and events + used to determine when this commmand has completed. + + :param command_name: A long running command that exists on the + target Tango device. + :type command_name: str + :param command_arg: The argument to be used in the long running + command method. + :type command_arg: Any, optional + :param on_completion_callback: The method to execute when the + long running command has completed. + :type on_completion_callback: callable, optional + """ + self.setup() + unique_id = uuid.uuid4() + self._stored_callbacks[unique_id] = on_completion_callback + self._stored_commands[unique_id] = [] + for device_proxy in self._long_running_device_proxies: + response = TaskResult.from_response_command( + device_proxy.command_inout(command_name, command_arg) + ) + self._stored_commands[unique_id].append( + StoredCommand( + command_name, + response.unique_id, + False, + ) + ) diff --git a/tests/long_running_tasks/conftest.py b/tests/long_running_tasks/conftest.py new file mode 100644 index 0000000000000000000000000000000000000000..1172b35468d1a23025326304dc64a9b7cee9cd2e --- /dev/null +++ b/tests/long_running_tasks/conftest.py @@ -0,0 +1,46 @@ +"""Fixtures for tests.""" +import pytest +import socket + +import tango + +from tango.test_context import get_host_ip, MultiDeviceTestContext + + +@pytest.fixture(scope="module") +def devices_to_test(request): + """Fixture for devices to test.""" + yield getattr(request.module, "devices_to_test") + + +@pytest.fixture(scope="function") +def multi_device_tango_context( + mocker, devices_to_test # pylint: disable=redefined-outer-name +): + """ + Create and return a TANGO MultiDeviceTestContext object. + + tango.DeviceProxy patched to work around a name-resolving issue. + """ + + def _get_open_port(): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.bind(("", 0)) + s.listen(1) + port = s.getsockname()[1] + s.close() + return port + + HOST = get_host_ip() + PORT = _get_open_port() + _DeviceProxy = tango.DeviceProxy + mocker.patch( + "tango.DeviceProxy", + wraps=lambda fqdn, *args, **kwargs: _DeviceProxy( + "tango://{0}:{1}/{2}#dbase=no".format(HOST, PORT, fqdn), *args, **kwargs + ), + ) + with MultiDeviceTestContext( + devices_to_test, host=HOST, port=PORT, process=True + ) as context: + yield context diff --git a/tests/long_running_tasks/reference_base_device.py b/tests/long_running_tasks/reference_base_device.py new file mode 100644 index 0000000000000000000000000000000000000000..329d1c54e8652add3617931b6ae347bdbe6ba4bb --- /dev/null +++ b/tests/long_running_tasks/reference_base_device.py @@ -0,0 +1,243 @@ +""" +This module provided a reference implementation of a BaseDevice. + +There are two versions used for testing long running commands. + - BlockingBaseDevice - Uses the default QueueManager. No threads, + thus blocking commands. + - AsyncBaseDevice - Uses the custom QueueManager. Multiple threads, + async commands/responses. + +It is provided to support testing of the BaseDevice. +""" +import time +import tango +from tango.server import command, device_property +from tango import DebugIt + +from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager +from ska_tango_base.base.base_device import SKABaseDevice +from ska_tango_base.base.task_queue_manager import ResultCode +from ska_tango_base.commands import ResponseCommand + + +class LongRunningCommandBaseTestDevice(SKABaseDevice): + """Implement commands to test queued work.""" + + client_devices = device_property(dtype="DevVarStringArray") + + def init_command_objects(self): + """Initialise the command handlers.""" + super().init_command_objects() + + self.register_command_object( + "Short", + self.ShortCommand(self.component_manager, logger=self.logger), + ) + self.register_command_object( + "NonAbortingLongRunning", + self.NonAbortingLongRunningCommand( + self.component_manager, logger=self.logger + ), + ) + self.register_command_object( + "AbortingLongRunning", + self.AbortingLongRunningCommand(self.component_manager, logger=self.logger), + ) + self.register_command_object( + "LongRunningException", + self.LongRunningExceptionCommand( + self.component_manager, logger=self.logger + ), + ) + + self.register_command_object( + "TestProgress", + self.TestProgressCommand(self.component_manager, logger=self.logger), + ) + + self.register_command_object( + "CallChildren", + self.CallChildrenCommand( + self.component_manager, logger=self.logger, devices=self.client_devices + ), + ) + + class ShortCommand(ResponseCommand): + """The command class for the Short command.""" + + def do(self, argin): + """Do command.""" + self.logger.info("In ShortCommand") + result = argin + 2 + return ResultCode.OK, result + + @command( + dtype_in=int, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def Short(self, argin): + """Short command.""" + handler = self.get_command_object("Short") + (return_code, message) = self.component_manager.enqueue(handler, argin=argin) + return f"{return_code}", f"{message}" + + class NonAbortingLongRunningCommand(ResponseCommand): + """The command class for the NonAbortingLongRunning command.""" + + def do(self, argin): + """NOTE This is an example of what _not_ to do. + + Always check self.is_aborting periodically so that the command + will exit out if long running commands are aborted. + + See the implementation of AnotherLongRunningCommand. + """ + retries = 45 + while retries > 0: + retries -= 1 + time.sleep(argin) # This command takes long + self.logger.info( + "In NonAbortingTask repeating %s", + retries, + ) + return ResultCode.OK, "Done" + + @command( + dtype_in=float, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def NonAbortingLongRunning(self, argin): + """Non AbortingLongRunning command.""" + handler = self.get_command_object("NonAbortingLongRunning") + (return_code, message) = self.component_manager.enqueue(handler, argin) + return f"{return_code}", f"{message}" + + class AbortingLongRunningCommand(ResponseCommand): + """The command class for the AbortingLongRunning command.""" + + def do(self, argin): + """Abort.""" + retries = 45 + while (not self.aborting_event.is_set()) and retries > 0: + retries -= 1 + time.sleep(argin) # This command takes long + self.logger.info("In NonAbortingTask repeating %s", retries) + + if retries == 0: # Normal finish + return ( + ResultCode.OK, + f"NonAbortingTask completed {argin}", + ) + else: # Aborted finish + return ( + ResultCode.ABORTED, + f"NonAbortingTask Aborted {argin}", + ) + + @command( + dtype_in=float, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def AbortingLongRunning(self, argin): + """AbortingLongRunning.""" + handler = self.get_command_object("AbortingLongRunning") + (return_code, message) = self.component_manager.enqueue(handler, argin) + return f"{return_code}", f"{message}" + + class LongRunningExceptionCommand(ResponseCommand): + """The command class for the LongRunningException command.""" + + def do(self): + """Throw an exception.""" + raise Exception("An error occurred") + + @command( + dtype_in=None, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def LongRunningException(self): + """Command that queues a task that raises an exception.""" + handler = self.get_command_object("LongRunningException") + (return_code, message) = self.component_manager.enqueue(handler) + return f"{return_code}", f"{message}" + + class TestProgressCommand(ResponseCommand): + """The command class for the TestProgress command.""" + + def do(self, argin): + """Do the task.""" + for progress in [1, 25, 50, 74, 100]: + self.update_progress(f"{progress}") + time.sleep(argin) + return ResultCode.OK, "OK" + + @command( + dtype_in=float, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def TestProgress(self, argin): + """Command to test the progress indicator.""" + handler = self.get_command_object("TestProgress") + (return_code, message) = self.component_manager.enqueue(handler, argin) + return f"{return_code}", f"{message}" + + class CallChildrenCommand(ResponseCommand): + """The command class for the TestProgress command.""" + + def __init__(self, target, *args, logger=None, **kwargs): + """Create ResponseCommand, add devices. + + :param target: component manager + :type target: BaseComponentManager + :param logger: logger, defaults to None + :type logger: logging.Logger, optional + """ + self.devices = kwargs.pop("devices") + super().__init__(target, *args, logger=logger, **kwargs) + + def do(self, argin): + """Call `CallChildren` on children, or block if not.""" + if self.devices: + for device in self.devices: + proxy = tango.DeviceProxy(device) + proxy.CallChildren(argin) + return ResultCode.QUEUED, f"Called children: {self.devices}" + else: + time.sleep(argin) + return ResultCode.OK, f"Slept {argin}" + + @command( + dtype_in=float, + dtype_out="DevVarStringArray", + ) + @DebugIt() + def CallChildren(self, argin): + """Command to call `CallChildren` on children, or block if not.""" + command = self.get_command_object("CallChildren") + (return_code, message) = self.component_manager.enqueue(command, argin) + return f"{return_code}", f"{message}" + + +class BlockingBaseDevice(LongRunningCommandBaseTestDevice): + """Test device that has a component manager with the default queue manager that has no workers.""" + + pass + + +class AsyncBaseDevice(LongRunningCommandBaseTestDevice): + """Test device that has a component manager with workers.""" + + def create_component_manager(self: SKABaseDevice): + """Create the component manager with a queue manager that has workers.""" + return QueueWorkerComponentManager( + op_state_model=self.op_state_model, + logger=self.logger, + max_queue_size=20, + num_workers=3, + push_change_event=self.push_change_event, + ) diff --git a/tests/long_running_tasks/test_multi_device.py b/tests/long_running_tasks/test_multi_device.py new file mode 100644 index 0000000000000000000000000000000000000000..01ab65638defc952c87209dd72882ea41894628d --- /dev/null +++ b/tests/long_running_tasks/test_multi_device.py @@ -0,0 +1,216 @@ +"""Test various Tango devices with long running commmands working together.""" +import time +import pytest + +from io import StringIO +from unittest.mock import MagicMock + +from tango.utils import EventCallback +from tango import EventType + +from reference_base_device import AsyncBaseDevice +from ska_tango_base.base.task_queue_manager import TaskResult, TaskState +from ska_tango_base.commands import ResultCode +from ska_tango_base.utils import LongRunningDeviceInterface + +# Testing a chain of calls +# On command `CallChildren` +# If the device has children: +# Call `CallChildren` on each child +# If no children: +# Sleep the time specified, simulating blocking work +# +# test/toplevel/1 +# test/midlevel/1 +# test/lowlevel/1 +# test/lowlevel/2 +# test/midlevel/2 +# test/lowlevel/3 +# test/lowlevel/4 +# test/midlevel/3 +# test/lowlevel/5 +# test/lowlevel/6 + + +devices_to_test = [ + { + "class": AsyncBaseDevice, + "devices": [ + { + "name": "test/toplevel/1", + "properties": { + "client_devices": [ + "test/midlevel/1", + "test/midlevel/2", + "test/midlevel/3", + ], + }, + }, + { + "name": "test/midlevel/1", + "properties": { + "client_devices": [ + "test/lowlevel/1", + "test/lowlevel/2", + ], + }, + }, + { + "name": "test/midlevel/2", + "properties": { + "client_devices": [ + "test/lowlevel/3", + "test/lowlevel/4", + ], + }, + }, + { + "name": "test/midlevel/3", + "properties": { + "client_devices": [ + "test/lowlevel/5", + "test/lowlevel/6", + ], + }, + }, + {"name": "test/lowlevel/1"}, + {"name": "test/lowlevel/2"}, + {"name": "test/lowlevel/3"}, + {"name": "test/lowlevel/4"}, + {"name": "test/lowlevel/5"}, + {"name": "test/lowlevel/6"}, + ], + }, +] + + +class TestMultiDevice: + """Multi-device tests.""" + + @pytest.mark.forked + @pytest.mark.timeout(6) + def test_chain(self, multi_device_tango_context): + """Test that commands flow from top to middle to low level.""" + # Top level + top_device = multi_device_tango_context.get_device("test/toplevel/1") + top_device_result_events = EventCallback(fd=StringIO()) + top_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + top_device_result_events, + wait=True, + ) + top_device_queue_events = EventCallback(fd=StringIO()) + top_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + top_device_queue_events, + wait=True, + ) + + # Mid level + mid_device = multi_device_tango_context.get_device("test/midlevel/3") + mid_device_result_events = EventCallback(fd=StringIO()) + mid_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + mid_device_result_events, + wait=True, + ) + mid_device_queue_events = EventCallback(fd=StringIO()) + mid_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + mid_device_queue_events, + wait=True, + ) + + # Low level + low_device = multi_device_tango_context.get_device("test/lowlevel/6") + low_device_result_events = EventCallback(fd=StringIO()) + low_device.subscribe_event( + "longRunningCommandResult", + EventType.CHANGE_EVENT, + low_device_result_events, + wait=True, + ) + low_device_queue_events = EventCallback(fd=StringIO()) + low_device.subscribe_event( + "longRunningCommandsInQueue", + EventType.CHANGE_EVENT, + low_device_queue_events, + wait=True, + ) + + # Call the toplevel command + # Sleep for 4 so that if a task is not queued the Tango command will time out + tr = TaskResult.from_response_command(top_device.CallChildren(4)) + assert tr.result_code == ResultCode.QUEUED + + # Get all the events + top_result_events = self.get_events(top_device_result_events, 1) + top_queue_events = self.get_events(top_device_queue_events, 1) + mid_result_events = self.get_events(mid_device_result_events, 1) + mid_queue_events = self.get_events(mid_device_queue_events, 1) + low_result_events = self.get_events(low_device_result_events, 1) + low_queue_events = self.get_events(low_device_queue_events, 1) + + # Make sure every level device command gets queued + top_queue_events[0] == ("CallChildrenCommand",) + mid_queue_events[0] == ("CallChildrenCommand",) + low_queue_events[0] == ("CallChildrenCommand",) + + top_level_taskresult = TaskResult.from_task_result(top_result_events[0]) + mid_level_taskresult = TaskResult.from_task_result(mid_result_events[0]) + low_level_taskresult = TaskResult.from_task_result(low_result_events[0]) + + # Make sure the command moved from top level to lowest level + assert ( + top_level_taskresult.task_result + == "Called children: ['test/midlevel/1', 'test/midlevel/2', 'test/midlevel/3']" + ) + assert ( + mid_level_taskresult.task_result + == "Called children: ['test/lowlevel/5', 'test/lowlevel/6']" + ) + assert low_level_taskresult.task_result == "Slept 4.0" + + @pytest.mark.forked + @pytest.mark.timeout(8) + def test_util_interface(self, multi_device_tango_context): + """Test LongRunningDeviceInterface.""" + devices = [] + for i in range(1, 5): + devices.append(f"test/lowlevel/{i}") + + mock_dunc = MagicMock() + + dev_interface = LongRunningDeviceInterface(devices, logger=None) + dev_interface.execute_long_running_command( + "CallChildren", 1.0, on_completion_callback=mock_dunc + ) + time.sleep(2) + assert mock_dunc.called + assert mock_dunc.call_args[0][0] == "CallChildren" + + task_ids = mock_dunc.call_args[0][1] + + low_device = multi_device_tango_context.get_device("test/lowlevel/1") + for id in task_ids: + res = low_device.CheckLongRunningCommandStatus(id) + if int(res[1][0]) == TaskState.COMPLETED: + break + else: + assert 0, "At least one task should be completed on device test/lowlevel/1" + + def get_events(self, event_callback: EventCallback, min_required: int): + """Keep reading events until the required count is found.""" + events = [] + while len(events) < min_required: + events = [ + i.attr_value.value + for i in event_callback.get_events() + if i.attr_value and i.attr_value.value + ] + time.sleep(0.2) + return events diff --git a/tests/long_running_tasks/test_reference_base_device.py b/tests/long_running_tasks/test_reference_base_device.py new file mode 100644 index 0000000000000000000000000000000000000000..470c5e52ffb3e141009903367dbf266a18d5fa9b --- /dev/null +++ b/tests/long_running_tasks/test_reference_base_device.py @@ -0,0 +1,201 @@ +"""Tests for the reference base device that uses queue manager.""" + +from io import StringIO + +import pytest +import time + +from unittest import mock +from tango import EventType +from tango.test_context import DeviceTestContext +from tango.utils import EventCallback +from reference_base_device import ( + BlockingBaseDevice, + AsyncBaseDevice, +) +from ska_tango_base.base.task_queue_manager import TaskResult +from ska_tango_base.commands import ResultCode + + +class TestCommands: + """Check that blocking and async commands behave the same way. + + BlockingBaseDevice - QueueManager has no threads and blocks tasks + AsyncBaseDevice - QueueManager has multiple threads, tasks run from queue + """ + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_short_command(self): + """Test a simple command.""" + for class_name in [BlockingBaseDevice, AsyncBaseDevice]: + with DeviceTestContext(class_name, process=True) as proxy: + proxy.Short(1) + # Wait for a result, if the task does not abort, we'll time out here + while not proxy.longRunningCommandResult: + time.sleep(0.1) + + result = TaskResult.from_task_result(proxy.longRunningCommandResult) + assert result.result_code == ResultCode.OK + assert result.get_task_unique_id().id_task_name == "ShortCommand" + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_non_aborting_command(self): + """Test tasks that does not abort.""" + for class_name in [BlockingBaseDevice, AsyncBaseDevice]: + with DeviceTestContext(class_name, process=True) as proxy: + proxy.NonAbortingLongRunning(0.01) + # Wait for a result, if the task does not abort, we'll time out here + while not proxy.longRunningCommandResult: + time.sleep(0.1) + result = TaskResult.from_task_result(proxy.longRunningCommandResult) + assert result.result_code == ResultCode.OK + assert ( + result.get_task_unique_id().id_task_name + == "NonAbortingLongRunningCommand" + ) + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_aborting_command(self): + """Test Abort. + + BlockingBaseDevice will block on `AbortingLongRunning` so calling + AbortCommands after that makes no sense. + """ + with DeviceTestContext(AsyncBaseDevice, process=True) as proxy: + unique_id, _ = proxy.AbortingLongRunning(0.5) + # Wait for the task to be in progress + while not proxy.longRunningCommandStatus: + time.sleep(0.1) + # Abort the tasks + proxy.AbortCommands() + # Wait for a result, if the task does not abort, we'll time out here + while not proxy.longRunningCommandResult: + time.sleep(0.1) + result = TaskResult.from_task_result(proxy.longRunningCommandResult) + assert result.unique_id == unique_id + assert result.result_code == ResultCode.ABORTED + assert "Aborted" in result.task_result + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_exception_command(self): + """Test the task that throws an error.""" + for class_name in [BlockingBaseDevice, AsyncBaseDevice]: + with DeviceTestContext(class_name, process=True) as proxy: + unique_id, _ = proxy.LongRunningException() + while not proxy.longRunningCommandResult: + time.sleep(0.1) + result = TaskResult.from_task_result(proxy.longRunningCommandResult) + assert result.unique_id == unique_id + assert result.result_code == ResultCode.FAILED + assert ( + "An error occurred Traceback (most recent call last)" + in result.task_result + ) + + +@pytest.mark.forked +def test_callbacks(): + """Check that the callback is firing that sends the push change event.""" + with mock.patch.object(AsyncBaseDevice, "push_change_event") as my_cb: + with DeviceTestContext(AsyncBaseDevice, process=False) as proxy: + # Execute some commands + proxy.TestProgress(0.5) + while not proxy.longRunningCommandResult: + time.sleep(0.1) + assert my_cb.called + + called_args = [ + (_call[0][0], _call[0][1]) for _call in my_cb.call_args_list[5:] + ] + + attribute_names = [arg[0] for arg in called_args] + assert attribute_names == [ + "longRunningCommandsInQueue", + "longRunningCommandIDsInQueue", + "longRunningCommandsInQueue", + "longRunningCommandIDsInQueue", + "longRunningCommandStatus", + "longRunningCommandProgress", + "longRunningCommandProgress", + "longRunningCommandProgress", + "longRunningCommandProgress", + "longRunningCommandProgress", + "longRunningCommandResult", + ] + + # longRunningCommandsInQueue + attribute_values = [arg[1] for arg in called_args] + assert len(attribute_values[0]) == 1 + assert attribute_values[0] == ("TestProgressCommand",) + + # longRunningCommandIDsInQueue + assert len(attribute_values[1]) == 1 + assert attribute_values[1][0].endswith("TestProgressCommand") + + # longRunningCommandsInQueue + assert not attribute_values[2] + + # longRunningCommandIDsInQueue + assert not attribute_values[3] + + # longRunningCommandStatus + assert len(attribute_values[4]) == 2 + assert attribute_values[4][0].endswith("TestProgressCommand") + assert attribute_values[4][1] == "IN_PROGRESS" + + # longRunningCommandProgress + for (index, progress) in zip(range(5, 9), ["1", "25", "50", "74", "100"]): + assert len(attribute_values[index]) == 2 + assert attribute_values[index][0].endswith("TestProgressCommand") + assert attribute_values[index][1] == progress + + # longRunningCommandResult + assert len(attribute_values[10]) == 3 + tr = TaskResult.from_task_result(attribute_values[10]) + assert tr.get_task_unique_id().id_task_name == "TestProgressCommand" + assert tr.result_code == ResultCode.OK + assert tr.task_result == "OK" + + +@pytest.mark.forked +@pytest.mark.timeout(10) +def test_events(): + """Testing the events. + + NOTE: Adding more than 1 event subscriptions leads to inconsistent results. + Sometimes misses events. + + Full callback tests (where the push events are triggered) are covered + in `test_callbacks` + """ + with DeviceTestContext(AsyncBaseDevice, process=True) as proxy: + progress_events = EventCallback(fd=StringIO()) + + proxy.subscribe_event( + "longRunningCommandProgress", + EventType.CHANGE_EVENT, + progress_events, + wait=True, + ) + + proxy.TestProgress(0.2) + + # Wait for task to finish + while not proxy.longRunningCommandResult: + time.sleep(0.1) + + # Wait for progress events + while not progress_events.get_events(): + time.sleep(0.5) + + progress_event_values = [ + event.attr_value.value + for event in progress_events.get_events() + if event.attr_value and event.attr_value.value + ] + for index, progress in enumerate(["1", "25", "50", "74", "100"]): + assert progress_event_values[index][1] == progress diff --git a/tests/long_running_tasks/test_task_queue_manager.py b/tests/long_running_tasks/test_task_queue_manager.py new file mode 100644 index 0000000000000000000000000000000000000000..f20b0e56df428ff6591fb4c31d8e4d26e692b659 --- /dev/null +++ b/tests/long_running_tasks/test_task_queue_manager.py @@ -0,0 +1,601 @@ +"""Tests for QueueManager and its component manager.""" +import logging +import time +import pytest +from unittest.mock import patch + +from ska_tango_base.commands import ResultCode +from ska_tango_base.base.task_queue_manager import ( + QueueManager, + TaskResult, + TaskState, +) +from ska_tango_base.base.reference_component_manager import QueueWorkerComponentManager +from ska_tango_base.commands import BaseCommand + +logger = logging.getLogger(__name__) + + +@pytest.fixture +def not_allowed_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class NotAllowedTask(BaseCommand): + def do(self): + pass + + def is_allowed(self): + return False + + return NotAllowedTask(target=None) + + return get_task + + +@pytest.fixture +def not_allowed_exc_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class NotAllowedErrorTask(BaseCommand): + def do(self): + pass + + def is_allowed(self, raise_if_disallowed=True): + raise Exception("Not allowed") + + return NotAllowedErrorTask(target=None) + + return get_task + + +@pytest.fixture +def progress_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class ProgressTask(BaseCommand): + def do(self): + for i in range(100): + self.update_progress(str(i)) + time.sleep(0.5) + + return ProgressTask(target=None) + + return get_task + + +@pytest.fixture +def exc_task(): + """Fixture for a test that throws an exception.""" + + def get_task(): + class ExcTask(BaseCommand): + def do(self): + raise Exception("An error occurred") + + return ExcTask(target=None) + + return get_task + + +@pytest.fixture +def slow_task(): + """Fixture for a test that takes long.""" + + def get_task(): + class SlowTask(BaseCommand): + def do(self): + time.sleep(1) + + return SlowTask(target=None) + + return get_task + + +@pytest.fixture +def simple_task(): + """Fixture for a very simple task.""" + + def get_task(): + class SimpleTask(BaseCommand): + def do(self, argin): + return argin + 2 + + return SimpleTask(2) + + return get_task + + +@pytest.fixture +def abort_task(): + """Fixture for a task that aborts.""" + + def get_task(): + class AbortTask(BaseCommand): + def do(self, argin): + sleep_time = argin + while not self.aborting_event.is_set(): + time.sleep(sleep_time) + + return AbortTask(target=None) + + return get_task + + +@pytest.fixture +def stop_task(): + """Fixture for a task that stops.""" + + def get_task(): + class StopTask(BaseCommand): + def do(self): + assert not self.stopping_event.is_set() + while not self.stopping_event.is_set(): + time.sleep(0.1) + + return StopTask(target=None) + + return get_task + + +@pytest.mark.forked +class TestQueueManager: + """General QueueManager checks.""" + + def test_threads_start(self): + """Test that threads start up. Set stop and exit.""" + qm = QueueManager(max_queue_size=2, num_workers=2, logger=logger) + assert len(qm._threads) == 2 + for worker in qm._threads: + assert worker.is_alive() + assert worker.daemon + + for worker in qm._threads: + worker.stopping_event.set() + + +@pytest.mark.forked +class TestQueueManagerTasks: + """QueueManager checks for tasks executed.""" + + @pytest.mark.timeout(5) + def test_task_ids(self, simple_task): + """Check ids.""" + qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger) + unique_id_one, result_code = qm.enqueue_task(simple_task(), 2) + unique_id_two, _ = qm.enqueue_task(simple_task(), 2) + assert unique_id_one.endswith("SimpleTask") + assert unique_id_one != unique_id_two + assert result_code == ResultCode.QUEUED + + @pytest.mark.timeout(5) + def test_task_is_executed(self, simple_task): + """Check that tasks are executed.""" + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger) + unique_id_one, _ = qm.enqueue_task(simple_task(), 3) + unique_id_two, _ = qm.enqueue_task(simple_task(), 3) + + while my_cb.call_count != 2: + time.sleep(0.5) + result_one = my_cb.call_args_list[0][0][0] + result_two = my_cb.call_args_list[1][0][0] + + assert result_one.unique_id.endswith("SimpleTask") + assert result_one.unique_id == unique_id_one + assert result_two.unique_id.endswith("SimpleTask") + assert result_two.unique_id == unique_id_two + + assert result_one.result_code == ResultCode.OK + assert result_two.result_code == ResultCode.OK + + assert result_one.task_result == "5" + assert result_two.task_result == "5" + + @pytest.mark.timeout(5) + def test_task_result(self, simple_task, exc_task): + """Check task results are what's expected.""" + qm = QueueManager(max_queue_size=5, num_workers=2, logger=logger) + add_task_one = simple_task() + exc_task = exc_task() + + qm.enqueue_task(add_task_one, 3) + while not qm.task_result: + time.sleep(0.5) + task_result = TaskResult.from_task_result(qm.task_result) + assert task_result.unique_id.endswith("SimpleTask") + assert task_result.result_code == ResultCode.OK + assert task_result.task_result == "5" + + qm.enqueue_task(exc_task) + while qm.task_result[0].endswith("SimpleTask"): + time.sleep(0.5) + task_result = TaskResult.from_task_result(qm.task_result) + assert task_result.unique_id.endswith("ExcTask") + assert task_result.result_code == ResultCode.FAILED + assert task_result.task_result.startswith( + "Error: An error occurred Traceback (" + ) + + @pytest.mark.timeout(10) + def test_full_queue(self, slow_task): + """Check full queues rejects new commands.""" + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(max_queue_size=1, num_workers=1, logger=logger) + for i in range(10): + qm.enqueue_task(slow_task()) + + while len(my_cb.call_args_list) != 10: + time.sleep(0.5) + + results = [i[0][0].result_code for i in my_cb.call_args_list] + # 9/10 should be rejected since the first is busy and the queue length is 1 + # Give a buffer of 2 just in case a task finishes up quicker than expected + assert results[-1] == ResultCode.OK + for res in results[:-3]: + assert res == ResultCode.REJECTED + + with patch.object(QueueManager, "result_callback") as my_cb: + qm = QueueManager(max_queue_size=2, num_workers=2, logger=logger) + for i in range(10): + qm.enqueue_task(slow_task()) + + while len(my_cb.call_args_list) != 10: + time.sleep(0.5) + results = [i[0][0].result_code for i in my_cb.call_args_list] + # 8/10 should be rejected since two are taken to be processed. + # Give a buffer of 2 just in case a task finishes up quicker than expected + assert results[-1] == ResultCode.OK + assert results[-2] == ResultCode.OK + for res in results[:-4]: + assert res == ResultCode.REJECTED + + @pytest.mark.timeout(5) + def test_zero_queue(self, simple_task): + """Check task_result is the same between queue and non queue.""" + expected_name = "SimpleTask" + expected_result_code = ResultCode.OK + expected_result = "5" + + # No Queue + qm = QueueManager(max_queue_size=0, num_workers=1, logger=logger) + assert len(qm._threads) == 0 + res, _ = qm.enqueue_task(simple_task(), 3) + assert res.endswith(expected_name) + assert qm.task_result[0].endswith(expected_name) + assert int(qm.task_result[1]) == expected_result_code + assert qm.task_result[2] == expected_result + + # Queue + qm = QueueManager(max_queue_size=2, num_workers=1, logger=logger) + res, _ = qm.enqueue_task(simple_task(), 3) + assert res.endswith(expected_name) + + # Wait for the task to be picked up + while not qm.task_result: + time.sleep(0.5) + assert qm.task_result[0].endswith(expected_name) + assert int(qm.task_result[1]) == expected_result_code + assert qm.task_result[2] == expected_result + + @pytest.mark.timeout(5) + def test_multi_jobs(self, slow_task): + """Test that multiple threads are working. Test that attribute updates fires.""" + num_of_workers = 3 + + with patch.object(QueueManager, "_on_property_change") as call_back_func: + + qm = QueueManager( + max_queue_size=5, + num_workers=num_of_workers, + logger=logger, + ) + unique_ids = [] + for _ in range(4): + unique_id, _ = qm.enqueue_task(slow_task()) + unique_ids.append(unique_id) + + # Wait for a item on the queue + while not qm.task_ids_in_queue: + time.sleep(0.1) + + # Wait for the queue to empty + while not qm.task_status: + time.sleep(0.1) + + # Wait for all the callbacks to fire + while len(call_back_func.call_args_list) < 24: + time.sleep(0.1) + + all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] + tasks_in_queue = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandsInQueue" + ] + task_ids_in_queue = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandIDsInQueue" + ] + task_status = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandStatus" + ] + task_result = [ + a_call[1] + for a_call in all_passed_params + if a_call[0] == "longRunningCommandResult" + ] + task_result_ids = [res[0] for res in task_result] + + assert len(tasks_in_queue) == 8 + assert len(task_ids_in_queue) == 8 + + # Since there's 3 workers there should at least once be 3 in progress + for status in task_status: + if len(status) == 2 * num_of_workers: + break + else: + assert 0, f"Length of {num_of_workers} in task_status not found" + assert len(task_result) == 4 + for unique_id in unique_ids: + assert unique_id in task_result_ids + + def test_task_get_state_completed(self, simple_task): + """Test the QueueTask get state is completed.""" + qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) + unique_id_one, _ = qm.enqueue_task(simple_task(), 3) + while not qm.task_result: + time.sleep(0.1) + assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED + + def test_task_get_state_in_queued(self, slow_task): + """Test the QueueTask get state is queued.""" + qm = QueueManager(max_queue_size=8, num_workers=1, logger=logger) + qm.enqueue_task(slow_task(), 2) + qm.enqueue_task(slow_task(), 2) + unique_id_last, _ = qm.enqueue_task(slow_task()) + + assert qm.get_task_state(unique_id=unique_id_last) == TaskState.QUEUED + + def test_task_get_state_in_progress(self, progress_task): + """Test the QueueTask get state is in progress.""" + qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) + unique_id_one, _ = qm.enqueue_task(progress_task()) + while not qm.task_progress: + time.sleep(0.1) + + assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS + + def test_task_get_state_in_not_found(self): + """Test the QueueTask get state not found.""" + qm = QueueManager(max_queue_size=8, num_workers=2, logger=logger) + assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND + + +class TestQueueManagerExit: + """Test the stopping and aborting.""" + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_exit_abort(self, abort_task, slow_task): + """Test aborting exit.""" + results = [] + + def catch_updates(name, result): + if name == "longRunningCommandResult": + tr = TaskResult.from_task_result(result) + results.append( + ( + tr.unique_id, + tr.result_code, + ) + ) + + cm = QueueWorkerComponentManager( + op_state_model=None, + logger=logger, + max_queue_size=10, + num_workers=2, + push_change_event=catch_updates, + child_devices=[], + ) + + cm.enqueue(abort_task(), 0.1) + + # Wait for the command to start + while not cm.task_status: + time.sleep(0.1) + # Start aborting + cm._queue_manager.abort_tasks() + + # Wait for the exit + while not cm.task_result: + time.sleep(0.1) + # aborting state should be cleaned up since the queue is empty and + # nothing is in progress + while cm._queue_manager.is_aborting: + time.sleep(0.1) + + # When aborting this should be rejected + # Fill up the workers + cm.enqueue(slow_task()) + cm.enqueue(slow_task()) + + assert not cm._queue_manager.is_aborting + # Abort tasks + cm._queue_manager.abort_tasks() + + assert cm._queue_manager.is_aborting + + # Load up some tasks that should be aborted + cm.enqueue(slow_task()) + cm.enqueue(slow_task()) + unique_id, _ = cm.enqueue(slow_task()) + + while True: + if (unique_id, ResultCode.ABORTED) in results: + break + time.sleep(0.1) + + # Resume the commands + cm._queue_manager.resume_tasks() + assert not cm._queue_manager.is_aborting + + # Wait for my slow command to finish + unique_id, _ = cm.enqueue(slow_task()) + + while True: + if (unique_id, ResultCode.OK) in results: + break + time.sleep(0.1) + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_exit_stop(self, stop_task): + """Test stopping exit.""" + cm = QueueWorkerComponentManager( + op_state_model=None, + logger=logger, + max_queue_size=5, + num_workers=2, + push_change_event=None, + child_devices=[], + ) + cm.enqueue(stop_task()) + + # Wait for the command to start + while not cm.task_status: + time.sleep(0.1) + + # Stop all threads + cm._queue_manager.stop_tasks() + # Wait for the exit + while not cm.task_result: + time.sleep(0.5) + + # Wait for all the workers to stop + while any([worker.is_alive() for worker in cm._queue_manager._threads]): + time.sleep(0.1) + + @pytest.mark.forked + @pytest.mark.timeout(5) + def test_delete_queue(self, slow_task, stop_task, abort_task): + """Test deleting the queue.""" + cm = QueueWorkerComponentManager( + op_state_model=None, + logger=logger, + max_queue_size=8, + num_workers=2, + push_change_event=None, + child_devices=[], + ) + cm.enqueue(slow_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + cm.enqueue(stop_task()) + cm.enqueue(abort_task()) + + del cm._queue_manager + del cm + + +@pytest.mark.forked +class TestComponentManager: + """Tests for the component manager.""" + + def test_init(self): + """Test that we can init the component manager.""" + cm = QueueWorkerComponentManager( + op_state_model=None, + logger=logger, + max_queue_size=0, + num_workers=1, + push_change_event=None, + child_devices=[], + ) + assert cm.task_ids_in_queue == () + + +@pytest.mark.forked +class TestStress: + """Stress test the queue manager.""" + + @pytest.mark.timeout(30) + def test_stress(self, slow_task): + """Stress test the queue manager.""" + qm = QueueManager(max_queue_size=100, num_workers=50, logger=logger) + assert len(qm._threads) == 50 + for worker in qm._threads: + assert worker.is_alive() + for _ in range(500): + qm.enqueue_task(slow_task()) + + assert qm._work_queue.qsize() > 90 + + # Wait for the queue to drain + while qm._work_queue.qsize(): + time.sleep(0.1) + del qm + + +class TestNotAllowed: + """Tests for `is_allowed`.""" + + @pytest.mark.timeout(5) + def test_not_allowed(self, not_allowed_task): + """Check is_allowed.""" + results = [] + + def catch_updates(name, result): + if name == "longRunningCommandResult": + tr = TaskResult.from_task_result(result) + results.append(tr.result_code) + + qm = QueueManager( + max_queue_size=2, + num_workers=2, + logger=logger, + push_change_event=catch_updates, + ) + qm.enqueue_task(not_allowed_task()) + + while ResultCode.NOT_ALLOWED not in results: + time.sleep(0.5) + + @pytest.mark.timeout(5) + def test_not_allowed_exc(self, not_allowed_exc_task): + """Check is_allowed error.""" + results = [] + + def catch_updates(name, result): + if name == "longRunningCommandResult": + tr = TaskResult.from_task_result(result) + results.append( + ( + tr.result_code, + tr.task_result, + ) + ) + + qm = QueueManager( + max_queue_size=2, + num_workers=2, + logger=logger, + push_change_event=catch_updates, + ) + qm.enqueue_task(not_allowed_exc_task()) + + while not results: + time.sleep(0.5) + + assert ResultCode.FAILED == results[0][0] + assert "Error: Not allowed Traceback (most recent call last)" in results[0][1] diff --git a/tests/test_base_device.py b/tests/test_base_device.py index c16219c5d85c3f9b6da50d1b8ea3b0696b4cc931..9a328bd56997b38d6ed5daac7ade56f5d503f572 100644 --- a/tests/test_base_device.py +++ b/tests/test_base_device.py @@ -455,8 +455,12 @@ class TestSKABaseDevice(object): # PROTECTED REGION ID(SKABaseDevice.test_Reset) ENABLED START # # The main test of this command is # TestSKABaseDevice_commands::test_ResetCommand - with pytest.raises(DevFailed): - device_under_test.Reset() + device_under_test.Reset() + assert f"{ResultCode.FAILED}" == device_under_test.longRunningCommandResult[1] + assert ( + "Action reset_invoked is not allowed in op_state OFF" + in device_under_test.longRunningCommandResult[2] + ) # PROTECTED REGION END # // SKABaseDevice.test_Reset def test_On(self, device_under_test, tango_change_event_helper): diff --git a/tests/test_task_queue_component_manager.py b/tests/test_task_queue_component_manager.py deleted file mode 100644 index 4b343afc565c0f46010469133f27e890afec597c..0000000000000000000000000000000000000000 --- a/tests/test_task_queue_component_manager.py +++ /dev/null @@ -1,487 +0,0 @@ -"""Tests for QueueManager and its component manager.""" -import logging -import time -import pytest -from unittest.mock import MagicMock, patch - -from ska_tango_base.commands import ResultCode -from ska_tango_base.base.task_queue_component_manager import ( - QueueManager, - TaskResult, - TaskQueueComponentManager, - QueueTask, - TaskState, -) - -logger = logging.getLogger(__name__) - - -def check_matching_pattern(list_to_check=()): - """Check that lengths go 1,2,3,2,1 for example.""" - list_to_check = list(list_to_check) - if not list_to_check[-1]: - list_to_check.pop() - assert len(list_to_check) > 2 - while len(list_to_check) > 2: - last_e = list_to_check.pop() - first_e = list_to_check.pop(0) - assert len(last_e) == len(first_e) - - -@pytest.fixture -def progress_task(): - """Fixture for a test that throws an exception.""" - - def get_task(): - class ProgressTask(QueueTask): - def do(self): - for i in range(100): - self.update_progress(str(i)) - time.sleep(0.5) - - return ProgressTask() - - return get_task - - -@pytest.fixture -def exc_task(): - """Fixture for a test that throws an exception.""" - - def get_task(): - class ExcTask(QueueTask): - def do(self): - raise Exception("An error occurred") - - return ExcTask() - - return get_task - - -@pytest.fixture -def slow_task(): - """Fixture for a test that takes long.""" - - def get_task(): - class SlowTask(QueueTask): - def do(self): - time.sleep(2) - - return SlowTask() - - return get_task - - -@pytest.fixture -def simple_task(): - """Fixture for a very simple task.""" - - def get_task(): - class SimpleTask(QueueTask): - def do(self): - num_one = self.args[0] - num_two = self.kwargs.get("num_two") - return num_one + num_two - - return SimpleTask(2, num_two=3) - - return get_task - - -@pytest.fixture -def abort_task(): - """Fixture for a task that aborts.""" - - def get_task(): - class AbortTask(QueueTask): - def do(self): - sleep_time = self.args[0] - while not self.is_aborting_event.is_set(): - time.sleep(sleep_time) - - return AbortTask(0.2) - - return get_task - - -@pytest.fixture -def stop_task(): - """Fixture for a task that stops.""" - - def get_task(): - class StopTask(QueueTask): - def do(self): - assert not self.is_stopping_event.is_set() - while not self.is_stopping_event.is_set(): - pass - - return StopTask() - - return get_task - - -class TestQueueTask: - """Test QueueTask.""" - - def test_simple(self, simple_task): - """Test simple task.""" - assert simple_task().do() == 5 - - def test_exception(self, exc_task): - """Test that exception is thrown.""" - with pytest.raises(Exception): - exc_task().do() - - -class TestQueueManager: - """General QueueManager checks.""" - - def test_threads_start(self): - """Test that threads start up. Set stop and exit.""" - qm = QueueManager(logger, max_queue_size=2, num_workers=2) - assert len(qm._threads) == 2 - for worker in qm._threads: - assert worker.is_alive() - assert worker.daemon - - for worker in qm._threads: - worker.is_stopping.set() - - -class TestQueueManagerTasks: - """QueueManager checks for tasks executed.""" - - @pytest.mark.timeout(5) - def test_task_ids(self, simple_task): - """Check ids.""" - qm = QueueManager(logger, max_queue_size=5, num_workers=2) - unique_id_one = qm.enqueue_task(simple_task()) - unique_id_two = qm.enqueue_task(simple_task()) - assert unique_id_one.endswith("SimpleTask") - assert unique_id_one != unique_id_two - - @pytest.mark.timeout(5) - def test_task_is_executed(self, simple_task): - """Check that tasks are executed.""" - with patch.object(QueueManager, "result_callback") as my_cb: - qm = QueueManager(logger, max_queue_size=5, num_workers=2) - unique_id_one = qm.enqueue_task(simple_task()) - unique_id_two = qm.enqueue_task(simple_task()) - - while my_cb.call_count != 2: - time.sleep(0.5) - result_one = my_cb.call_args_list[0][0][0] - result_two = my_cb.call_args_list[1][0][0] - - assert result_one.unique_id.endswith("SimpleTask") - assert result_one.unique_id == unique_id_one - assert result_two.unique_id.endswith("SimpleTask") - assert result_two.unique_id == unique_id_two - - assert result_one.result_code == ResultCode.OK - assert result_two.result_code == ResultCode.OK - - assert result_one.task_result == "5" - assert result_two.task_result == "5" - - @pytest.mark.timeout(5) - def test_task_result(self, simple_task, exc_task): - """Check task results are what's expected.""" - qm = QueueManager(logger, max_queue_size=5, num_workers=2) - add_task_one = simple_task() - exc_task = exc_task() - - qm.enqueue_task(add_task_one) - while not qm.task_result: - time.sleep(0.5) - task_result = TaskResult.from_task_result(qm.task_result) - assert task_result.unique_id.endswith("SimpleTask") - assert task_result.result_code == ResultCode.OK - assert task_result.task_result == "5" - - qm.enqueue_task(exc_task) - while qm.task_result[0].endswith("SimpleTask"): - time.sleep(0.5) - task_result = TaskResult.from_task_result(qm.task_result) - assert task_result.unique_id.endswith("ExcTask") - assert task_result.result_code == ResultCode.FAILED - assert task_result.task_result.startswith( - "Error: An error occurred Traceback (" - ) - - @pytest.mark.timeout(10) - def test_full_queue(self, slow_task): - """Check full queues rejects new commands.""" - with patch.object(QueueManager, "result_callback") as my_cb: - qm = QueueManager(logger, max_queue_size=1, num_workers=1) - for i in range(10): - qm.enqueue_task(slow_task()) - - while len(my_cb.call_args_list) != 10: - time.sleep(0.5) - - results = [i[0][0].result_code for i in my_cb.call_args_list] - # 9/10 should be rejected since the first is busy and the queue length is 1 - assert results[-1] == ResultCode.OK - for res in results[:-1]: - assert res == ResultCode.REJECTED - - with patch.object(QueueManager, "result_callback") as my_cb: - qm = QueueManager(logger, max_queue_size=2, num_workers=2) - for i in range(10): - qm.enqueue_task(slow_task()) - - while len(my_cb.call_args_list) != 10: - time.sleep(0.5) - results = [i[0][0].result_code for i in my_cb.call_args_list] - # 8/10 should be rejected since two are taken to be processed. - assert results[-1] == ResultCode.OK - assert results[-2] == ResultCode.OK - for res in results[:-2]: - assert res == ResultCode.REJECTED - - @pytest.mark.timeout(5) - def test_zero_queue(self, simple_task): - """Check task_result is the same between queue and non queue.""" - expected_name = "SimpleTask" - expected_result_code = ResultCode.OK - expected_result = "5" - - # No Queue - qm = QueueManager(logger, max_queue_size=0, num_workers=1) - assert len(qm._threads) == 0 - res = qm.enqueue_task(simple_task()) - assert res.endswith(expected_name) - assert qm.task_result[0].endswith(expected_name) - assert int(qm.task_result[1]) == expected_result_code - assert qm.task_result[2] == expected_result - - # Queue - qm = QueueManager(logger, max_queue_size=2, num_workers=1) - res = qm.enqueue_task(simple_task()) - assert res.endswith(expected_name) - - # Wait for the task to be picked up - while not qm.task_result: - time.sleep(0.5) - assert qm.task_result[0].endswith(expected_name) - assert int(qm.task_result[1]) == expected_result_code - assert qm.task_result[2] == expected_result - - @pytest.mark.timeout(5) - def test_multi_jobs(self, slow_task): - """Test that multiple threads are working. Test that attribute updates fires.""" - num_of_workers = 3 - - call_back_func = MagicMock() - qm = QueueManager( - logger, - max_queue_size=5, - num_workers=num_of_workers, - on_property_update_callback=call_back_func, - ) - unique_ids = [] - for _ in range(4): - unique_id = qm.enqueue_task(slow_task()) - unique_ids.append(unique_id) - - # Wait for a item on the queue - while not qm.task_ids_in_queue: - pass - - while not qm.task_result: - pass - - # Wait for last task to finish - while unique_ids[-1] != TaskResult.from_task_result(qm.task_result).unique_id: - pass - - all_passed_params = [a_call[0] for a_call in call_back_func.call_args_list] - tasks_in_queue = [ - a_call[1] for a_call in all_passed_params if a_call[0] == "tasks_in_queue" - ] - task_ids_in_queue = [ - a_call[1] - for a_call in all_passed_params - if a_call[0] == "task_ids_in_queue" - ] - task_status = [ - a_call[1] for a_call in all_passed_params if a_call[0] == "task_status" - ] - task_result = [ - a_call[1] for a_call in all_passed_params if a_call[0] == "task_result" - ] - task_result_ids = [res[0] for res in task_result] - - check_matching_pattern(tuple(tasks_in_queue)) - check_matching_pattern(tuple(task_ids_in_queue)) - - # Since there's 3 workers there should at least once be 3 in progress - for status in task_status: - if len(status) == num_of_workers: - break - else: - assert 0, f"Length of {num_of_workers} in task_status not found" - assert len(task_result) == 4 - for unique_id in unique_ids: - assert unique_id in task_result_ids - - def test_task_progress(self, progress_task): - """Test the progress updates.""" - qm = QueueManager(logger, max_queue_size=8, num_workers=2) - unique_id_one = qm.enqueue_task(progress_task()) - unique_id_two = qm.enqueue_task(progress_task()) - - time.sleep(0.5) - assert unique_id_one in qm.task_progress - assert unique_id_two in qm.task_progress - progress_one_before = qm.task_progress[unique_id_one] - progress_two_before = qm.task_progress[unique_id_two] - time.sleep(1.0) - progress_one_after = qm.task_progress[unique_id_one] - progress_two_after = qm.task_progress[unique_id_two] - - assert int(progress_one_after) > int(progress_one_before) - assert int(progress_two_after) > int(progress_two_before) - - def test_task_get_state_completed(self, simple_task): - """Test the QueueTask get state is completed.""" - qm = QueueManager(logger, max_queue_size=8, num_workers=2) - unique_id_one = qm.enqueue_task(simple_task()) - while not qm.task_result: - pass - assert qm.get_task_state(unique_id=unique_id_one) == TaskState.COMPLETED - - def test_task_get_state_in_queued(self, slow_task): - """Test the QueueTask get state is queued.""" - qm = QueueManager(logger, max_queue_size=8, num_workers=1) - qm.enqueue_task(slow_task()) - qm.enqueue_task(slow_task()) - unique_id_last = qm.enqueue_task(slow_task()) - - assert qm.get_task_state(unique_id=unique_id_last) == TaskState.QUEUED - - def test_task_get_state_in_progress(self, progress_task): - """Test the QueueTask get state is in progress.""" - qm = QueueManager(logger, max_queue_size=8, num_workers=2) - unique_id_one = qm.enqueue_task(progress_task()) - while not qm.task_progress: - pass - - assert qm.get_task_state(unique_id=unique_id_one) == TaskState.IN_PROGRESS - - def test_task_get_state_in_not_found(self): - """Test the QueueTask get state not found.""" - qm = QueueManager(logger, max_queue_size=8, num_workers=2) - assert qm.get_task_state(unique_id="non_existing_id") == TaskState.NOT_FOUND - - -class TestQueueManagerExit: - """Test the stopping and aborting.""" - - @pytest.mark.timeout(5) - def test_exit_abort(self, abort_task, slow_task): - """Test aborting exit.""" - call_back_func = MagicMock() - qm = QueueManager( - logger, - max_queue_size=5, - num_workers=2, - on_property_update_callback=call_back_func, - ) - cm = TaskQueueComponentManager( - message_queue=qm, op_state_model=None, logger=None - ) - cm.enqueue(abort_task()) - - # Wait for the command to start - while not qm.task_status: - pass - # Start aborting - cm.message_queue.abort_tasks() - # Wait for the exit - while not qm.task_result: - pass - assert qm.is_aborting - # When aborting this should be rejected - unique_id = cm.enqueue(slow_task()) - while True: - tr = TaskResult.from_task_result(qm.task_result) - if tr.unique_id == unique_id and tr.result_code == ResultCode.ABORTED: - break - - # Resume the commands - qm.resume_tasks() - assert not qm.is_aborting - - # Wait for my slow command to finish - unique_id = cm.enqueue(slow_task()) - while True: - tr = TaskResult.from_task_result(qm.task_result) - if tr.unique_id == unique_id: - break - - @pytest.mark.timeout(10) - def test_exit_stop(self, stop_task): - """Test stopping exit.""" - call_back_func = MagicMock() - qm = QueueManager( - logger, - max_queue_size=5, - num_workers=2, - on_property_update_callback=call_back_func, - ) - cm = TaskQueueComponentManager( - message_queue=qm, op_state_model=None, logger=None - ) - cm.enqueue(stop_task()) - - # Wait for the command to start - while not qm.task_status: - pass - # Stop all threads - cm.message_queue.stop_tasks() - # Wait for the exit - while not qm.task_result: - pass - # Wait for all the workers to stop - while not any([worker.is_alive() for worker in qm._threads]): - pass - - @pytest.mark.timeout(5) - def test_delete_queue(self, slow_task, stop_task, abort_task): - """Test deleting the queue.""" - call_back_func = MagicMock() - qm = QueueManager( - logger, - max_queue_size=8, - num_workers=2, - on_property_update_callback=call_back_func, - ) - cm = TaskQueueComponentManager( - message_queue=qm, op_state_model=None, logger=None - ) - cm.enqueue(slow_task()) - cm.enqueue(stop_task()) - cm.enqueue(abort_task()) - cm.enqueue(stop_task()) - cm.enqueue(abort_task()) - cm.enqueue(stop_task()) - cm.enqueue(abort_task()) - cm.enqueue(stop_task()) - cm.enqueue(abort_task()) - - del cm.message_queue - del cm - - -class TestComponentManager: - """Tests for the component manager.""" - - def test_init(self): - """Test that we can init the component manager.""" - qm = QueueManager(logger, max_queue_size=0, num_workers=1) - cm = TaskQueueComponentManager( - message_queue=qm, op_state_model=None, logger=logger - ) - assert cm.message_queue.task_ids_in_queue == []