Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
L
lmc-base-classes
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
This is an archived project. Repository and other project resources are read-only.
Show more breadcrumbs
LOFAR2.0
lmc-base-classes
Commits
57e83a95
Unverified
Commit
57e83a95
authored
Sep 27, 2021
by
SKAJohanVenter
Browse files
Options
Downloads
Patches
Plain Diff
SAR-275
Updated from review comments
parent
c055956e
No related branches found
No related tags found
No related merge requests found
Changes
2
Show whitespace changes
Inline
Side-by-side
Showing
2 changed files
src/ska_tango_base/base/task_queue_component_manager.py
+23
-25
23 additions, 25 deletions
src/ska_tango_base/base/task_queue_component_manager.py
tests/test_task_queue_component_manager.py
+1
-1
1 addition, 1 deletion
tests/test_task_queue_component_manager.py
with
24 additions
and
26 deletions
src/ska_tango_base/base/task_queue_component_manager.py
+
23
−
25
View file @
57e83a95
...
...
@@ -14,7 +14,7 @@ TaskResult
**********
This is a simple convenience class to parse or format the task result. The result of a task will
be made available as a Tango device attribute named `command_result`. It will be a
list
of 3 strings.
be made available as a Tango device attribute named `command_result`. It will be a
tuple
of 3 strings.
1. The unique ID
Every command/task put on the queue for execution by background threads will have a unique ID.
...
...
@@ -30,7 +30,7 @@ be made available as a Tango device attribute named `command_result`. It will be
tr
TaskResult(result_code=<ResultCode.OK: 0>, task_result=
'
The task result
'
, unique_id=
'
UniqueID
'
)
tr.to_task_result()
[
'
UniqueID
'
,
'
0
'
,
'
The task result
'
]
(
'
UniqueID
'
,
'
0
'
,
'
The task result
'
)
*********
QueueTask
...
...
@@ -169,7 +169,7 @@ import time
import
traceback
from
queue
import
Empty
,
Queue
from
threading
import
Event
from
typing
import
Any
,
Callable
,
Dict
,
List
,
Optional
from
typing
import
Any
,
Callable
,
Dict
,
Optional
,
Tuple
from
attr
import
dataclass
import
tango
...
...
@@ -220,13 +220,13 @@ class TaskResult:
task_result
:
str
unique_id
:
str
def
to_task_result
(
self
)
->
List
[
str
]:
def
to_task_result
(
self
)
->
Tuple
[
str
]:
"""
Convert TaskResult to task_result.
:return: The task result
:rtype: list[str]
"""
return
[
f
"
{
self
.
unique_id
}
"
,
f
"
{
int
(
self
.
result_code
)
}
"
,
f
"
{
self
.
task_result
}
"
]
return
(
f
"
{
self
.
unique_id
}
"
,
f
"
{
int
(
self
.
result_code
)
}
"
,
f
"
{
self
.
task_result
}
"
)
@classmethod
def
from_task_result
(
cls
,
task_result
:
list
)
->
TaskResult
:
...
...
@@ -381,11 +381,7 @@ class QueueManager:
task
.
kwargs
[
"
update_progress_callback
"
]
=
self
.
_update_progress_callback
task_result
=
self
.
execute_task
(
task
)
result
=
TaskResult
(
task_result
[
0
],
f
"
{
task_result
[
1
]
}
"
,
unique_id
)
result
=
self
.
execute_task
(
task
,
unique_id
)
self
.
_result_callback
(
result
)
self
.
_work_queue
.
task_done
()
except
Empty
:
...
...
@@ -401,20 +397,23 @@ class QueueManager:
self
.
current_task_progress
=
progress
@classmethod
def
execute_task
(
cls
,
task
:
QueueTask
)
:
def
execute_task
(
cls
,
task
:
QueueTask
,
unique_id
:
str
)
->
TaskResult
:
"""
Execute a task, return results in a standardised format.
:param task: Task to execute
:type task: QueueTask
:return: (ResultCode, result)
:rtype: tuple
:param unique_id: The task unique ID
:type unique_id: str
:return: The result of the task
:rtype: TaskResult
"""
try
:
result
=
(
ResultCode
.
OK
,
task
.
do
())
result
=
TaskResult
(
ResultCode
.
OK
,
f
"
{
task
.
do
()
}
"
,
unique_id
)
except
Exception
as
err
:
result
=
(
result
=
TaskResult
(
ResultCode
.
FAILED
,
f
"
Error:
{
err
}
{
traceback
.
format_exc
()
}
"
,
unique_id
,
)
return
result
...
...
@@ -448,7 +447,7 @@ class QueueManager:
self
.
is_stopping
=
threading
.
Event
()
self
.
_property_update_lock
=
threading
.
Lock
()
self
.
_task_result
=
[]
self
.
_task_result
=
()
self
.
_tasks_in_queue
:
Dict
[
str
,
str
]
=
{}
# unique_id, task_name
self
.
_task_status
:
Dict
[
str
,
str
]
=
{}
# unique_id, status
self
.
_threads
=
[]
...
...
@@ -480,13 +479,13 @@ class QueueManager:
return
self
.
_work_queue
.
full
()
@property
def
task_result
(
self
)
->
list
:
def
task_result
(
self
)
->
Tuple
[
str
]
:
"""
Return the last task result.
:return: Last task result
:rtype:
list
:rtype:
Tuple
"""
return
self
.
_task_result
.
copy
()
return
self
.
_task_result
@property
def
task_ids_in_queue
(
self
)
->
list
:
...
...
@@ -541,8 +540,7 @@ class QueueManager:
# If there is no queue, just execute the command and return
if
self
.
_max_queue_size
==
0
:
self
.
update_task_state_callback
(
unique_id
,
"
IN_PROGRESS
"
)
task_result
=
self
.
Worker
.
execute_task
(
task
)
result
=
TaskResult
(
task_result
[
0
],
f
"
{
task_result
[
1
]
}
"
,
unique_id
)
result
=
self
.
Worker
.
execute_task
(
task
,
unique_id
)
self
.
result_callback
(
result
)
return
unique_id
...
...
@@ -615,12 +613,12 @@ class QueueManager:
self
.
is_stopping
.
set
()
@property
def
is_aborting
(
self
):
def
is_aborting
(
self
)
->
bool
:
"""
Return False if any of the threads are aborting.
"""
return
all
([
worker
.
is_aborting
.
is_set
()
for
worker
in
self
.
_threads
])
@classmethod
def
get_unique_id
(
cls
,
task_name
):
def
get_unique_id
(
cls
,
task_name
)
->
str
:
"""
Generate a unique ID for the task.
:param task_name: The name of the task
...
...
@@ -638,8 +636,8 @@ class QueueManager:
:return: State of the QueueTask
:rtype: TaskState
"""
if
self
.
task_result
:
_task_result
=
TaskResult
.
from_task_result
(
self
.
task_result
)
if
self
.
_
task_result
:
_task_result
=
TaskResult
.
from_task_result
(
self
.
_
task_result
)
if
unique_id
==
_task_result
.
unique_id
:
return
TaskState
.
COMPLETED
...
...
This diff is collapsed.
Click to expand it.
tests/test_task_queue_component_manager.py
+
1
−
1
View file @
57e83a95
...
...
@@ -337,7 +337,7 @@ class TestQueueManagerTasks:
assert
unique_id_two
in
qm
.
task_progress
progress_one_before
=
qm
.
task_progress
[
unique_id_one
]
progress_two_before
=
qm
.
task_progress
[
unique_id_two
]
time
.
sleep
(
0.5
)
time
.
sleep
(
1.0
)
progress_one_after
=
qm
.
task_progress
[
unique_id_one
]
progress_two_after
=
qm
.
task_progress
[
unique_id_two
]
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment