Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
LOFAR
Manage
Activity
Members
Labels
Plan
Issues
Wiki
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
RadioObservatory
LOFAR
Commits
7d0a987b
Commit
7d0a987b
authored
4 years ago
by
Fabio Vitello
Browse files
Options
Downloads
Patches
Plain Diff
TMSS-506
: Remove sleep from test, using now synchronization with threading events
parent
2787063b
No related branches found
No related tags found
1 merge request
!302
Resolve TMSS-506
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
SAS/TMSS/services/workflow_service/lib/workflow_service.py
+2
-2
2 additions, 2 deletions
SAS/TMSS/services/workflow_service/lib/workflow_service.py
SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
+100
-56
100 additions, 56 deletions
SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
with
102 additions
and
58 deletions
SAS/TMSS/services/workflow_service/lib/workflow_service.py
+
2
−
2
View file @
7d0a987b
...
@@ -54,8 +54,8 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler):
...
@@ -54,8 +54,8 @@ class SchedulingUnitEventMessageHandler(TMSSEventMessageHandler):
logger
.
error
(
e
)
logger
.
error
(
e
)
def
create_workflow_service
(
exchange
:
str
=
DEFAULT_BUSNAME
,
broker
:
str
=
DEFAULT_BROKER
):
def
create_workflow_service
(
handler_type
=
SchedulingUnitEventMessageHandler
,
exchange
:
str
=
DEFAULT_BUSNAME
,
broker
:
str
=
DEFAULT_BROKER
):
return
TMSSBusListener
(
handler_type
=
SchedulingUnitEventMessageH
andler
,
return
TMSSBusListener
(
handler_type
=
h
andler
_type
,
handler_kwargs
=
{},
handler_kwargs
=
{},
exchange
=
exchange
,
broker
=
broker
)
exchange
=
exchange
,
broker
=
broker
)
...
...
This diff is collapsed.
Click to expand it.
SAS/TMSS/src/tmss/workflowapp/tests/t_workflow_qaworkflow.py
+
100
−
56
View file @
7d0a987b
...
@@ -10,12 +10,17 @@ from lofar.common.test_utils import skip_integration_tests
...
@@ -10,12 +10,17 @@ from lofar.common.test_utils import skip_integration_tests
if
skip_integration_tests
():
if
skip_integration_tests
():
exit
(
3
)
exit
(
3
)
from
lofar.messaging.messagebus
import
TemporaryExchange
from
lofar.messaging.messagebus
import
TemporaryExchange
,
BusListenerJanitor
import
uuid
from
lofar.sas.tmss.services.workflow_service
import
create_workflow_service
,
SchedulingUnitEventMessageHandler
import
uuid
from
threading
import
Thread
,
Event
from
lofar.sas.tmss.client.tmssbuslistener
import
*
class
SchedulingUnitFlowTest
(
unittest
.
TestCase
):
class
SchedulingUnitFlowTest
(
unittest
.
TestCase
):
@classmethod
@classmethod
def
setUpClass
(
cls
)
->
None
:
def
setUpClass
(
cls
)
->
None
:
cls
.
TEST_UUID
=
uuid
.
uuid1
()
cls
.
TEST_UUID
=
uuid
.
uuid1
()
...
@@ -47,7 +52,6 @@ class SchedulingUnitFlowTest(unittest.TestCase):
...
@@ -47,7 +52,6 @@ class SchedulingUnitFlowTest(unittest.TestCase):
cls
.
ra_test_env
.
stop
()
cls
.
ra_test_env
.
stop
()
cls
.
tmp_exchange
.
close
()
cls
.
tmp_exchange
.
close
()
def
test_qa_workflow
(
self
):
def
test_qa_workflow
(
self
):
from
lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow
import
SchedulingUnitFlow
from
lofar.sas.tmss.tmss.workflowapp.flows.schedulingunitflow
import
SchedulingUnitFlow
...
@@ -58,67 +62,107 @@ class SchedulingUnitFlowTest(unittest.TestCase):
...
@@ -58,67 +62,107 @@ class SchedulingUnitFlowTest(unittest.TestCase):
from
lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow
import
SchedulingUnitProcess
from
lofar.sas.tmss.tmss.workflowapp.models.schedulingunitflow
import
SchedulingUnitProcess
from
viewflow.models
import
Task
from
viewflow.models
import
Task
import
time
import
time
sync_event_bp_status_changed
=
Event
()
sync_event_bp_cannot_proceed
=
Event
()
class
TestSchedulingUnitEventMessageHandler
(
SchedulingUnitEventMessageHandler
):
def
onSchedulingUnitBlueprintStatusChanged
(
self
,
id
:
int
,
status
:
str
):
sync_event_bp_status_changed
.
set
()
#check if one QA Workflow is created after scheduling unit blueprint creation
def
onSchedulingUnitBlueprintCannotProceed
(
self
,
id
:
int
):
self
.
assertEqual
(
0
,
len
(
SchedulingUnitProcess
.
objects
.
all
()))
sync_event_bp_cannot_proceed
.
set
()
strategy_template
=
models
.
SchedulingUnitObservingStrategyTemplate
.
objects
.
get
(
name
=
"
UC1 CTC+pipelines
"
)
scheduling_unit_draft
=
models
.
SchedulingUnitDraft
.
objects
.
create
(
name
=
"
Test Scheduling Unit UC1
"
,
requirements_doc
=
strategy_template
.
template
,
requirements_template
=
strategy_template
.
scheduling_unit_template
,
observation_strategy_template
=
strategy_template
,
copy_reason
=
models
.
CopyReason
.
objects
.
get
(
value
=
'
template
'
),
generator_instance_doc
=
"
para
"
,
copies
=
None
,
scheduling_set
=
models
.
SchedulingSet
.
objects
.
create
(
**
SchedulingSet_test_data
()))
scheduling_unit_blueprint
=
create_task_blueprints_and_subtasks_from_scheduling_unit_draft
(
scheduling_unit_draft
)
scheduling_unit_draft
.
refresh_from_db
()
task_drafts
=
scheduling_unit_draft
.
task_drafts
.
all
()
task_blueprints
=
scheduling_unit_blueprint
.
task_blueprints
.
all
()
qa_workflow
=
SchedulingUnitProcess
.
objects
.
all
()
service
=
create_workflow_service
(
handler_type
=
TestSchedulingUnitEventMessageHandler
,
self
.
assertEqual
(
1
,
len
(
qa_workflow
))
exchange
=
self
.
tmp_exchange
.
address
)
with
BusListenerJanitor
(
service
):
#test that QA workflow have two tasks
self
.
assertEqual
(
2
,
len
(
Task
.
objects
.
all
()))
#get the actual number of Scheduling Unit Process in the db
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
1
).
flow_task
.
name
,
'
start
'
)
scheduling_unit_process
=
len
(
SchedulingUnitProcess
.
objects
.
all
())
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
1
).
status
,
'
DONE
'
)
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
2
).
flow_task
.
name
,
'
wait_scheduled
'
)
strategy_template
=
models
.
SchedulingUnitObservingStrategyTemplate
.
objects
.
get
(
name
=
"
UC1 CTC+pipelines
"
)
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
2
).
status
,
'
NEW
'
)
scheduling_unit_draft
=
models
.
SchedulingUnitDraft
.
objects
.
create
(
#Change subtask status to scheduled
name
=
"
Test Scheduling Unit UC1
"
,
for
task_blueprint
in
task_blueprints
:
requirements_doc
=
strategy_template
.
template
,
requirements_template
=
strategy_template
.
scheduling_unit_template
,
observation_strategy_template
=
strategy_template
,
copy_reason
=
models
.
CopyReason
.
objects
.
get
(
value
=
'
template
'
),
generator_instance_doc
=
"
para
"
,
copies
=
None
,
scheduling_set
=
models
.
SchedulingSet
.
objects
.
create
(
**
SchedulingSet_test_data
()))
scheduling_unit_blueprint
=
create_task_blueprints_and_subtasks_from_scheduling_unit_draft
(
scheduling_unit_draft
)
scheduling_unit_draft
.
refresh_from_db
()
task_drafts
=
scheduling_unit_draft
.
task_drafts
.
all
()
task_blueprints
=
scheduling_unit_blueprint
.
task_blueprints
.
all
()
#check if the scheduling unit Process are incremented after the creation of a new Scheduling Unit Blueprint
scheduling_unit_process
+=
1
self
.
assertEqual
(
scheduling_unit_process
,
len
(
SchedulingUnitProcess
.
objects
.
all
()))
process_id
=
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
id
#test that QA workflow have two tasks
self
.
assertEqual
(
2
,
len
(
Task
.
objects
.
filter
(
process
=
process_id
)))
#test that there is jut one active task
self
.
assertEqual
(
1
,
len
(
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()))
#check the active task name
self
.
assertEqual
(
"
wait_scheduled
"
,
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()[
0
].
flow_task
.
name
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
0
].
flow_task
.
name
,
'
start
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
0
].
status
,
'
DONE
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
1
].
flow_task
.
name
,
'
wait_scheduled
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
1
].
status
,
'
NEW
'
)
#Change subtask status to scheduled
for
task_blueprint
in
task_blueprints
:
for
subtask
in
task_blueprint
.
subtasks
.
all
():
for
subtask
in
task_blueprint
.
subtasks
.
all
():
subtask
.
state
=
models
.
SubtaskState
.
objects
.
get
(
value
=
'
scheduled
'
)
subtask
.
state
=
models
.
SubtaskState
.
objects
.
get
(
value
=
'
scheduled
'
)
subtask
.
save
()
subtask
.
save
()
subtask
.
refresh_from_db
()
subtask
.
refresh_from_db
()
time
.
sleep
(
5
)
if
not
sync_event_bp_status_changed
.
wait
(
timeout
=
10
):
#Check the QA Workflow is now with 3 Task
raise
TimeoutError
()
self
.
assertEqual
(
3
,
len
(
Task
.
objects
.
all
()))
sync_event_bp_status_changed
.
clear
()
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
2
).
flow_task
.
name
,
'
wait_scheduled
'
)
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
2
).
status
,
'
DONE
'
)
#Check the QA Workflow is now with 3 Task
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
3
).
flow_task
.
name
,
'
wait_processed
'
)
self
.
assertEqual
(
3
,
len
(
Task
.
objects
.
filter
(
process
=
process_id
)))
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
3
).
status
,
'
NEW
'
)
#test that there is jut one active task
self
.
assertEqual
(
1
,
len
(
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()))
#Change subtask status to cancelled
#check the active task name
for
task_blueprint
in
task_blueprints
:
self
.
assertEqual
(
"
wait_processed
"
,
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()[
0
].
flow_task
.
name
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
1
].
flow_task
.
name
,
'
wait_scheduled
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
1
].
status
,
'
DONE
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
2
].
flow_task
.
name
,
'
wait_processed
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
2
].
status
,
'
NEW
'
)
#Change subtask status to cancelled
for
task_blueprint
in
task_blueprints
:
for
subtask
in
task_blueprint
.
subtasks
.
all
():
for
subtask
in
task_blueprint
.
subtasks
.
all
():
subtask
.
state
=
models
.
SubtaskState
.
objects
.
get
(
value
=
'
cancelled
'
)
subtask
.
state
=
models
.
SubtaskState
.
objects
.
get
(
value
=
'
cancelled
'
)
subtask
.
save
()
subtask
.
save
()
subtask
.
refresh_from_db
()
subtask
.
refresh_from_db
()
time
.
sleep
(
5
)
if
not
sync_event_bp_cannot_proceed
.
wait
(
timeout
=
10
):
#Check the QA Workflow is now with 4 Task
raise
TimeoutError
()
self
.
assertEqual
(
4
,
len
(
Task
.
objects
.
all
()))
sync_event_bp_cannot_proceed
.
clear
()
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
3
).
flow_task
.
name
,
'
wait_processed
'
)
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
3
).
status
,
'
DONE
'
)
#Check the QA Workflow is now with 4 Task
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
4
).
flow_task
.
name
,
'
qa_reporting_to
'
)
self
.
assertEqual
(
4
,
len
(
Task
.
objects
.
filter
(
process
=
process_id
)))
self
.
assertEqual
(
Task
.
objects
.
get
(
id
=
4
).
status
,
'
NEW
'
)
#test that there is jut one active task
self
.
assertEqual
(
1
,
len
(
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()))
#check the active task name
self
.
assertEqual
(
"
qa_reporting_to
"
,
SchedulingUnitProcess
.
objects
.
get
(
su
=
scheduling_unit_blueprint
.
id
).
active_tasks
()[
0
].
flow_task
.
name
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
2
].
flow_task
.
name
,
'
wait_processed
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
2
].
status
,
'
DONE
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
3
].
flow_task
.
name
,
'
qa_reporting_to
'
)
self
.
assertEqual
(
Task
.
objects
.
filter
(
process
=
process_id
).
order_by
(
'
id
'
)[
3
].
status
,
'
NEW
'
)
if
__name__
==
'
__main__
'
:
if
__name__
==
'
__main__
'
:
#run the unit tests
#run the unit tests
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment