Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
LOFAR
Manage
Activity
Members
Labels
Plan
Issues
Wiki
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
GitLab community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
RadioObservatory
LOFAR
Commits
8f417e3d
Commit
8f417e3d
authored
8 years ago
by
Auke Klazema
Browse files
Options
Downloads
Patches
Plain Diff
Task #10106: Added second part of the unit tests
parent
cb12bbfb
No related branches found
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py
+528
-92
528 additions, 92 deletions
...rceAssignment/ResourceAssigner/test/t_resourceassigner.py
with
528 additions
and
92 deletions
SAS/ResourceAssignment/ResourceAssigner/test/t_resourceassigner.py
+
528
−
92
View file @
8f417e3d
...
...
@@ -3,11 +3,11 @@
import
unittest
import
mock
import
datetime
from
copy
import
deepcopy
from
lofar.sas.resourceassignment.resourceassigner.assignment
import
ResourceAssigner
from
lofar.parameterset
import
parameterset
ra_notification_prefix
=
"
ra_notification_prefix
"
class
TestingResourceAssigner
(
ResourceAssigner
):
def
__init__
(
self
,
rarpc
,
rerpc
,
otdbrpc
,
momrpc
,
curpc
,
ra_notification_bus
):
...
...
@@ -18,9 +18,17 @@ class TestingResourceAssigner(ResourceAssigner):
self
.
momrpc
=
momrpc
self
.
curpc
=
curpc
self
.
ra_notification_bus
=
ra_notification_bus
self
.
ra_notification_prefix
=
ra_notification_prefix
class
ResourceAssignerTest
(
unittest
.
TestCase
):
mom_id
=
351557
otdb_id
=
1290494
specification_id
=
2323
state
=
u
'
prescheduled
'
task_type
=
u
'
pipeline
'
specification_tree
=
{}
non_approved_or_prescheduled_status
=
u
'
opened
'
non_approved_or_prescheduled_otdb_id
=
1
...
...
@@ -67,21 +75,23 @@ class ResourceAssignerTest(unittest.TestCase):
task_mom_id
=
351543
task_otdb_id
=
1290472
task_id
=
2299
task_end_time
=
datetime
.
datetime
(
2016
,
3
,
25
,
22
,
47
,
31
)
task_start_time
=
datetime
.
datetime
(
2016
,
3
,
25
,
21
,
47
,
31
)
task
=
{
"
mom_id
"
:
task_mom_id
,
"
otdb_id
"
:
task_otdb_id
,
"
id
"
:
task_id
,
"
endtime
"
:
datetime
.
datetime
(
2016
,
3
,
25
,
22
,
47
,
31
)
,
"
endtime
"
:
task_end_time
,
"
name
"
:
"
IS HBA_DUAL
"
,
"
predecessor_ids
"
:
[],
"
project_mom_id
"
:
2
,
"
project_name
"
:
"
test-lofar
"
,
"
specification_id
"
:
2323
,
"
starttime
"
:
datetime
.
datetime
(
2016
,
3
,
25
,
21
,
47
,
31
)
,
"
specification_id
"
:
specification_id
,
"
starttime
"
:
task_start_time
,
"
status
"
:
"
prescheduled
"
,
"
status_id
"
:
350
,
"
successor_ids
"
:
[],
"
type
"
:
"
observation
"
,
"
type
"
:
"
pipeline
"
,
"
type_id
"
:
0
}
...
...
@@ -102,7 +112,7 @@ class ResourceAssignerTest(unittest.TestCase):
"
status
"
:
"
prescheduled
"
,
"
status_id
"
:
350
,
"
successor_ids
"
:
[],
"
type
"
:
"
observation
"
,
"
type
"
:
"
pipeline
"
,
"
type_id
"
:
0
}
...
...
@@ -123,23 +133,143 @@ class ResourceAssignerTest(unittest.TestCase):
"
status
"
:
"
prescheduled
"
,
"
status_id
"
:
350
,
"
successor_ids
"
:
[],
"
type
"
:
"
observation
"
,
"
type
"
:
"
pipeline
"
,
"
type_id
"
:
0
}
mom_id
=
351557
otdb_id
=
1290494
state
=
u
'
prescheduled
'
task_type
=
u
'
pipeline
'
resources_with_errors_otdb_id
=
1290496
resource_error1
=
"
error 1
"
resource_error2
=
"
error 2
"
unknown_resource_type_name
=
"
fuel
"
unknown_resource_type_otdb_id
=
123489
resource_unknown_property
=
'
unknown_property
'
rerpc_status
=
0
rerpc_needed_claim_for_bandwidth_size
=
19021319494
rerpc_needed_claim_for_bandwidth
=
{
'
total_size
'
:
rerpc_needed_claim_for_bandwidth_size
}
specification_tree
=
{
u
'
otdb_id
'
:
otdb_id
,
u
'
task_type
'
:
task_type
,
u
'
state
'
:
state
,
rerpc_needed_claim_for_storage_output_files
=
{
'
uv
'
:
{
'
nr_of_uv_files
'
:
481
,
'
uv_file_size
'
:
1482951104
},
'
saps
'
:
[
{
'
sap_nr
'
:
0
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
319
}
},
{
'
sap_nr
'
:
1
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
81
,
resource_unknown_property
:
-
1
}
},
{
'
sap_nr
'
:
2
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
81
}
}
]
}
rerpc_needed_claim_for_storage_size
=
713299481024
rerpc_needed_claim_for_storage
=
{
'
total_size
'
:
rerpc_needed_claim_for_storage_size
,
'
output_files
'
:
rerpc_needed_claim_for_storage_output_files
}
rerpc_replymessage
=
{
str
(
otdb_id
):
{
'
pipeline
'
:
{
'
bandwidth
'
:
rerpc_needed_claim_for_bandwidth
,
'
storage
'
:
rerpc_needed_claim_for_storage
}
},
str
(
resources_with_errors_otdb_id
):
{
'
pipeline
'
:
{
'
bandwidth
'
:
{
'
total_size
'
:
19021319494
},
'
storage
'
:
{
'
total_size
'
:
713299481024
,
'
output_files
'
:
{
'
uv
'
:
{
'
nr_of_uv_files
'
:
481
,
'
uv_file_size
'
:
1482951104
},
'
saps
'
:
[{
'
sap_nr
'
:
0
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
319
}
},
{
'
sap_nr
'
:
1
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
81
}
},
{
'
sap_nr
'
:
2
,
'
properties
'
:
{
'
nr_of_uv_files
'
:
81
}
}]
}
},
},
'
errors
'
:
[
resource_error1
,
resource_error2
]
},
str
(
unknown_resource_type_otdb_id
):
{
'
pipeline
'
:
{
str
(
unknown_resource_type_name
):
{
}
}
}
}
cep4bandwidth_resource_id
=
116
cep4storage_resource_id
=
117
storage_claim
=
{
'
resource_id
'
:
cep4storage_resource_id
,
'
starttime
'
:
task_start_time
,
'
endtime
'
:
task_end_time
+
datetime
.
timedelta
(
days
=
365
),
'
status
'
:
'
claimed
'
,
'
claim_size
'
:
rerpc_needed_claim_for_storage_size
,
'
properties
'
:
[
{
'
type
'
:
2
,
'
io_type
'
:
'
output
'
,
'
value
'
:
481
},
{
'
type
'
:
10
,
'
io_type
'
:
'
output
'
,
'
value
'
:
1482951104
},
{
'
type
'
:
2
,
'
io_type
'
:
'
output
'
,
'
sap_nr
'
:
0
,
'
value
'
:
319
},
{
'
type
'
:
2
,
'
io_type
'
:
'
output
'
,
'
sap_nr
'
:
1
,
'
value
'
:
81
},
{
'
type
'
:
2
,
'
io_type
'
:
'
output
'
,
'
sap_nr
'
:
2
,
'
value
'
:
81
}
]
}
bandwith_claim
=
{
'
resource_id
'
:
cep4bandwidth_resource_id
,
'
starttime
'
:
task_start_time
,
'
endtime
'
:
task_end_time
,
'
status
'
:
'
claimed
'
,
'
claim_size
'
:
rerpc_needed_claim_for_bandwidth_size
}
specifaction_claims
=
[
bandwith_claim
,
storage_claim
]
def
reset_specification_tree
(
self
):
self
.
specification_tree
=
{
u
'
otdb_id
'
:
self
.
otdb_id
,
u
'
task_type
'
:
self
.
task_type
,
u
'
state
'
:
self
.
state
,
u
'
specification
'
:
{
u
'
Observation.momID
'
:
str
(
mom_id
),
u
'
Observation.startTime
'
:
future_start_time
,
u
'
Observation.stopTime
'
:
future_stop_time
,
u
'
Observation.momID
'
:
str
(
self
.
mom_id
),
u
'
Observation.startTime
'
:
self
.
future_start_time
,
u
'
Observation.stopTime
'
:
self
.
future_stop_time
,
u
'
Observation.DataProducts.Output_InstrumentModel.enabled
'
:
False
,
u
'
Observation.VirtualInstrument.stationList
'
:
[],
u
'
Observation.DataProducts.Input_CoherentStokes.enabled
'
:
False
,
...
...
@@ -200,7 +330,8 @@ class ResourceAssignerTest(unittest.TestCase):
u
'
Observation.DataProducts.Output_InstrumentModel.enabled
'
:
False
,
u
'
Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor
'
:
u
'
1
'
,
u
'
Observation.stopTime
'
:
u
'
2016-03-26 00:33:31
'
,
u
'
Observation.VirtualInstrument.stationList
'
:
[
u
'
RS205
'
,
u
'
RS503
'
,
u
'
CS013
'
,
u
'
RS508
'
,
u
'
RS106
'
],
u
'
Observation.VirtualInstrument.stationList
'
:
[
u
'
RS205
'
,
u
'
RS503
'
,
u
'
CS013
'
,
u
'
RS508
'
,
u
'
RS106
'
],
u
'
Observation.DataProducts.Input_CoherentStokes.enabled
'
:
False
,
u
'
Observation.DataProducts.Output_CoherentStokes.enabled
'
:
False
,
u
'
Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband
'
:
u
'
64
'
,
...
...
@@ -257,10 +388,58 @@ class ResourceAssignerTest(unittest.TestCase):
self
.
addCleanup
(
rarpc_patcher
.
stop
)
self
.
rarpc_mock
=
rarpc_patcher
.
start
()
self
.
rarpc_mock
.
getTask
.
side_effect
=
get_task_side_effect
self
.
rarpc_mock
.
insertSpecificationAndTask
.
return_value
=
{
'
inserted
'
:
True
,
'
specification_id
'
:
self
.
specification_id
,
'
task_id
'
:
self
.
task_id
}
self
.
rarpc_mock
.
getResourceClaimPropertyTypes
.
return_value
=
[
{
'
id
'
:
0
,
'
name
'
:
'
nr_of_is_files
'
},
{
'
id
'
:
1
,
'
name
'
:
'
nr_of_cs_files
'
},
{
'
id
'
:
2
,
'
name
'
:
'
nr_of_uv_files
'
},
{
'
id
'
:
3
,
'
name
'
:
'
nr_of_im_files
'
},
{
'
id
'
:
4
,
'
name
'
:
'
nr_of_img_files
'
},
{
'
id
'
:
5
,
'
name
'
:
'
nr_of_pulp_files
'
},
{
'
id
'
:
6
,
'
name
'
:
'
nr_of_cs_stokes
'
},
{
'
id
'
:
7
,
'
name
'
:
'
nr_of_is_stokes
'
},
{
'
id
'
:
8
,
'
name
'
:
'
is_file_size
'
},
{
'
id
'
:
9
,
'
name
'
:
'
cs_file_size
'
},
{
'
id
'
:
10
,
'
name
'
:
'
uv_file_size
'
},
{
'
id
'
:
11
,
'
name
'
:
'
im_file_size
'
},
{
'
id
'
:
12
,
'
name
'
:
'
img_file_size
'
},
{
'
id
'
:
13
,
'
name
'
:
'
nr_of_pulp_files
'
},
{
'
id
'
:
14
,
'
name
'
:
'
nr_of_tabs
'
},
{
'
id
'
:
15
,
'
name
'
:
'
start_sb_nr
'
},
{
'
id
'
:
16
,
'
name
'
:
'
uv_otdb_id
'
},
{
'
id
'
:
17
,
'
name
'
:
'
cs_otdb_id
'
},
{
'
id
'
:
18
,
'
name
'
:
'
is_otdb_id
'
},
{
'
id
'
:
19
,
'
name
'
:
'
im_otdb_id
'
},
{
'
id
'
:
20
,
'
name
'
:
'
img_otdb_id
'
},
{
'
id
'
:
21
,
'
name
'
:
'
pulp_otdb_id
'
},
{
'
id
'
:
22
,
'
name
'
:
'
is_tab_nr
'
},
{
'
id
'
:
23
,
'
name
'
:
'
start_sbg_nr
'
}
]
self
.
rarpc_mock
.
getResourceTypes
.
return_value
=
[
{
'
id
'
:
0
,
'
name
'
:
'
rsp
'
,
'
unit_id
'
:
0
,
'
units
'
:
'
rsp_channel_bit
'
},
{
'
id
'
:
1
,
'
name
'
:
'
tbb
'
,
'
unit_id
'
:
1
,
'
units
'
:
'
bytes
'
},
{
'
id
'
:
2
,
'
name
'
:
'
rcu
'
,
'
unit_id
'
:
2
,
'
units
'
:
'
rcu_board
'
},
{
'
id
'
:
3
,
'
name
'
:
'
bandwidth
'
,
'
unit_id
'
:
3
,
'
units
'
:
'
bits/second
'
},
{
'
id
'
:
4
,
'
name
'
:
'
processor
'
,
'
unit_id
'
:
4
,
'
units
'
:
'
cores
'
},
{
'
id
'
:
5
,
'
name
'
:
'
storage
'
,
'
unit_id
'
:
1
,
'
units
'
:
'
bytes
'
},
]
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[
1
,
2
]}
# incomplete response but good enough for tests
self
.
rarpc_mock
.
getResources
.
return_value
=
[
{
'
id
'
:
self
.
cep4bandwidth_resource_id
,
'
name
'
:
'
cep4bandwidth
'
,
'
type_id
'
:
3
,
'
type_name
'
:
'
bandwidth
'
,
'
unit_id
'
:
3
,
'
unit
'
:
'
bits/second
'
},
{
'
id
'
:
self
.
cep4storage_resource_id
,
'
name
'
:
'
cep4storage
'
,
'
type_id
'
:
5
,
'
type_name
'
:
'
storage
'
,
'
unit_id
'
:
1
,
'
unit
'
:
'
bytes
'
}
]
self
.
rarpc_mock
.
getResourceClaims
.
return_value
=
[]
rerpc_patcher
=
mock
.
patch
(
'
lofar.messaging.RPC
'
)
self
.
addCleanup
(
rerpc_patcher
.
stop
)
self
.
rerpc_mock
=
rerpc_patcher
.
start
()
self
.
rerpc_mock
.
return_value
=
self
.
rerpc_replymessage
,
self
.
rerpc_status
otdbrpc_patcher
=
mock
.
patch
(
'
lofar.sas.otdb.otdbrpc
'
)
self
.
addCleanup
(
otdbrpc_patcher
.
stop
)
...
...
@@ -292,6 +471,7 @@ class ResourceAssignerTest(unittest.TestCase):
self
.
resourceAssigner
=
TestingResourceAssigner
(
self
.
rarpc_mock
,
self
.
rerpc_mock
,
self
.
otdbrpc_mock
,
self
.
momrpc_mock
,
self
.
curpc_mock
,
self
.
ra_notification_bus_mock
)
self
.
reset_specification_tree
()
def
assert_all_services_opened
(
self
):
self
.
assertTrue
(
self
.
rarpc_mock
.
open
.
called
,
"
RARPC.open was not called
"
)
...
...
@@ -494,21 +674,20 @@ class ResourceAssignerTest(unittest.TestCase):
@mock.patch
(
'
lofar.sas.resourceassignment.resourceassigner.assignment.datetime
'
)
def
test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future
(
self
,
datetime_mock
):
now
=
datetime
.
datetime
.
utcnow
()
+
datetime
.
timedelta
(
days
=
1
)
now
=
self
.
strip_ms
(
now
)
now
=
self
.
_
strip_ms
(
now
)
datetime_mock
.
utcnow
.
return_value
=
now
datetime_mock
.
strptime
.
side_effect
=
\
lambda
date_string
,
format_string
:
datetime
.
datetime
.
strptime
(
date_string
,
format_string
)
future_predecessor_stop_time
=
now
+
datetime
.
timedelta
(
hours
=
1
)
tree_with_predecessor_observation_stop_time
=
deepcopy
(
self
.
specification_tree
)
tree_with_predecessor_observation_stop_time
[
'
predecessors
'
][
0
][
'
specification
'
][
'
Observation.stopTime
'
]
=
\
self
.
specification_tree
[
'
predecessors
'
][
0
][
'
specification
'
][
'
Observation.stopTime
'
]
=
\
future_predecessor_stop_time
.
strftime
(
'
%Y-%m-%d %H:%M:%S
'
)
new_starttime
=
future_predecessor_stop_time
+
datetime
.
timedelta
(
minutes
=
1
)
new_endtime
=
new_starttime
+
datetime
.
timedelta
(
hours
=
1
)
self
.
resourceAssigner
.
doAssignment
(
tree_with_predecessor_observation_stop_tim
e
)
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tre
e
)
self
.
logger_mock
.
warning
.
assert_any_call
(
'
Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s
'
,
...
...
@@ -525,7 +704,7 @@ class ResourceAssignerTest(unittest.TestCase):
'
LOFAR.ObsSW.Observation.stopTime
'
:
new_endtime
.
strftime
(
'
%Y-%m-%d %H:%M:%S
'
)
})
def
strip_ms
(
self
,
now
):
def
_
strip_ms
(
self
,
now
):
return
datetime
.
datetime
.
strptime
(
now
.
strftime
(
'
%Y-%m-%d %H:%M:%S
'
),
'
%Y-%m-%d %H:%M:%S
'
)
@mock.patch
(
'
lofar.sas.resourceassignment.resourceassigner.assignment.datetime
'
)
...
...
@@ -534,14 +713,13 @@ class ResourceAssignerTest(unittest.TestCase):
datetime_mock
.
utcnow
.
return_value
=
now
tree_with_observation_duraction
=
deepcopy
(
self
.
specification_tree
)
duration
=
100
tree_with_observation_duraction
[
'
specification
'
][
'
Observation.Scheduler.taskDuration
'
]
=
duration
self
.
specification_tree
[
'
specification
'
][
'
Observation.Scheduler.taskDuration
'
]
=
duration
new_starttime
=
now
+
datetime
.
timedelta
(
minutes
=
1
)
new_endtime
=
new_starttime
+
datetime
.
timedelta
(
seconds
=
duration
)
self
.
resourceAssigner
.
doAssignment
(
tree_with_observation_duraction
)
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
warning
.
assert_any_call
(
'
Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s
'
,
...
...
@@ -558,6 +736,21 @@ class ResourceAssignerTest(unittest.TestCase):
'
LOFAR.ObsSW.Observation.stopTime
'
:
new_endtime
.
strftime
(
'
%Y-%m-%d %H:%M:%S
'
)
})
def
test_do_assignment_should_log_insertion_of_specification_and_task
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, startTime=%s, endTime=%s
'
'
cluster=%s
'
%
(
self
.
mom_id
,
self
.
otdb_id
,
self
.
state
,
self
.
task_type
,
self
.
future_start_time
,
self
.
future_stop_time
,
"
CEP4
"
))
def
test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
doAssignment: inserted specification (id=%s) and task (id=%s)
'
%
(
self
.
specification_id
,
self
.
task_id
))
def
test_do_assignment_should_log_on_non_presceduled_cep4_tasks
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
non_approved_or_prescheduled_specification_tree
)
...
...
@@ -567,9 +760,252 @@ class ResourceAssignerTest(unittest.TestCase):
self
.
non_approved_or_prescheduled_status
))
def
test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
non_approved_or_prescheduled_specification_tree
)
self
.
resourceAssigner
.
doAssignment
(
self
.
non_approved_or_prescheduled_specification_tree
)
self
.
rarpc_mock
.
insertResourceClaims
.
assert_not_called
()
def
test_do_assginement_should_request_needed_resources
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rerpc_mock
.
assert_any_call
(
{
"
specification_tree
"
:
self
.
specification_tree
},
timeout
=
10
)
def
test_do_assignemnet_logs_needed_resources
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
(
'
getNeededResouces: %s
'
%
self
.
rerpc_replymessage
)
def
test_do_assignment_logs_when_otdb_id_not_needed_resources
(
self
):
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
otdb_id
+
1
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
error
.
assert_any_call
(
"
no otdb_id %s found in estimator results %s
"
%
(
self
.
otdb_id
+
1
,
self
.
rerpc_replymessage
))
def
test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources
(
self
):
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
otdb_id
+
1
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
insertResourceClaims
.
assert_not_called
()
def
test_do_assignment_logs_when_task_type_not_in_needed_resources
(
self
):
wrong_task_type
=
"
observation
"
self
.
specification_tree
[
"
task_type
"
]
=
wrong_task_type
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
error
.
assert_any_call
(
"
no task type %s found in estimator results %s
"
%
(
wrong_task_type
,
self
.
rerpc_replymessage
[
str
(
self
.
otdb_id
)]))
def
test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources
(
self
):
wrong_task_type
=
"
observation
"
self
.
specification_tree
[
"
task_type
"
]
=
wrong_task_type
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
insertResourceClaims
.
assert_not_called
()
def
test_do_assignment_should_log_error_in_needed_resources
(
self
):
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
resources_with_errors_otdb_id
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
error
.
assert_any_call
(
"
Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to
'
error
'"
,
self
.
resources_with_errors_otdb_id
,
self
.
task_id
)
def
test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources
(
self
):
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
resources_with_errors_otdb_id
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
updateTask
.
assert_any_call
(
self
.
task_id
,
status
=
'
error
'
)
def
test_do_assignment_should_notify_bus_on_errors_in_needed_resources
(
self
):
content
=
{
'
radb_id
'
:
self
.
task_id
,
'
otdb_id
'
:
self
.
task_otdb_id
,
'
mom_id
'
:
self
.
task_mom_id
}
subject
=
ra_notification_prefix
+
'
TaskError
'
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
resources_with_errors_otdb_id
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
assertTrue
(
self
.
ra_notification_bus_send_called_with
(
content
,
subject
))
def
ra_notification_bus_send_called_with
(
self
,
content
,
subject
):
found
=
False
for
call
in
self
.
ra_notification_bus_mock
.
send
.
call_args_list
:
if
call
[
0
][
0
].
subject
==
subject
and
call
[
0
][
0
].
content
==
content
:
found
=
True
return
found
def
test_do_assignment_should_log_start_of_claim_resources
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: task %s needed_resources=%s
'
%
(
self
.
task
,
self
.
rerpc_replymessage
[
str
(
str
(
self
.
otdb_id
))]))
def
test_do_assigment_should_log_when_claiming_unknown_resource_type
(
self
):
self
.
specification_tree
[
"
otdb_id
"
]
=
self
.
unknown_resource_type_otdb_id
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
error
.
assert_any_call
(
'
claimResources: unknown resource_type:%s
'
%
self
.
unknown_resource_type_name
)
def
test_do_assignment_logs_claimable_resources_in_specification
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: processing resource_type: %s contents: %s
'
%
(
'
storage
'
,
self
.
rerpc_needed_claim_for_storage
))
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: processing resource_type: %s contents: %s
'
%
(
'
bandwidth
'
,
self
.
rerpc_needed_claim_for_bandwidth
))
def
test_do_assignment_logs_created_claim_per_needed_resource_type
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: created claim:%s
'
%
self
.
storage_claim
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: created claim:%s
'
%
self
.
bandwith_claim
)
def
test_do_assignment_logs_amount_of_claims_inserted_in_radb
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: inserting %d claims in the radb
'
%
2
)
def
test_do_assignment_inserts_resource_claims_in_radb
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
insertResourceClaims
.
assert_any_call
(
self
.
task_id
,
self
.
specifaction_claims
,
1
,
'
anonymous
'
,
-
1
)
def
test_do_assignment_logs_amount_claims_inserted
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: %d claims were inserted in the radb
'
%
2
)
def
test_do_assignment_logs_unknown_property_on_needed_resources
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
error
.
assert_any_call
(
'
claimResources: unknown prop_type:%s
'
%
self
.
resource_unknown_property
)
def
test_do_assignment_logs_multiple_properties_on_needed_resource
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
claimResources: processing resource_type: %s subdict_name:
\'
%s
\'
subdict_contents: %s
'
%
(
"
storage
"
,
"
output_files
"
,
self
.
rerpc_needed_claim_for_storage_output_files
))
def
test_do_assignment_logs_when_it_was_unable_to_claim_all_resources
(
self
):
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
warning
.
assert_any_call
(
'
doAssignment: No claims could be made. Setting task %s status to error
'
%
self
.
task_id
)
def
test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources
(
self
):
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
updateTask
.
assert_any_call
(
self
.
task_id
,
status
=
'
error
'
)
def
test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources
(
self
):
content
=
{
'
radb_id
'
:
self
.
task_id
,
'
otdb_id
'
:
self
.
task_otdb_id
,
'
mom_id
'
:
self
.
task_mom_id
}
subject
=
ra_notification_prefix
+
'
Task
'
+
'
Error
'
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
assertTrue
(
self
.
ra_notification_bus_send_called_with
(
content
,
subject
))
def
test_do_assignment_logs_when_it_was_unable_to_claim_some_resources
(
self
):
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[
1
]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
warning
.
assert_any_call
(
'
doAssignment: Not all claims could be inserted. Setting task %s status to conflict
'
%
self
.
task_id
)
def
test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources
(
self
):
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[
1
]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
updateTask
.
assert_any_call
(
self
.
task_id
,
status
=
'
conflict
'
)
def
test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources
(
self
):
content
=
{
'
radb_id
'
:
self
.
task_id
,
'
otdb_id
'
:
self
.
task_otdb_id
,
'
mom_id
'
:
self
.
task_mom_id
}
subject
=
ra_notification_prefix
+
'
Task
'
+
'
Conflict
'
self
.
rarpc_mock
.
insertResourceClaims
.
return_value
=
{
'
ids
'
:
[
1
]}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
assertTrue
(
self
.
ra_notification_bus_send_called_with
(
content
,
subject
))
def
test_do_assignment_logs_when_there_are_conflicting_claims
(
self
):
conflicting_claims
=
[{}]
self
.
rarpc_mock
.
getResourceClaims
.
return_value
=
conflicting_claims
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
warning
.
assert_any_call
(
'
doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s
'
%
(
len
(
conflicting_claims
),
conflicting_claims
))
def
test_do_assignment_notifies_bus_when_there_are_conflicting_claims
(
self
):
content
=
{
'
radb_id
'
:
self
.
task_id
,
'
otdb_id
'
:
self
.
task_otdb_id
,
'
mom_id
'
:
self
.
task_mom_id
}
subject
=
ra_notification_prefix
+
'
Task
'
+
'
Conflict
'
conflicting_claims
=
[{}]
self
.
rarpc_mock
.
getResourceClaims
.
return_value
=
conflicting_claims
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
assertTrue
(
self
.
ra_notification_bus_send_called_with
(
content
,
subject
))
def
test_do_assignment_logs_when_all_resources_were_claimed
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
info
.
assert_any_call
(
'
doAssignment: all claims for task %s were succesfully claimed. Setting claim statuses to allocated
'
%
self
.
task_id
)
def
test_do_assignment_updates_task_and_resources_as_claimed_in_radb
(
self
):
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
rarpc_mock
.
updateTaskAndResourceClaims
.
assert_any_call
(
self
.
task_id
,
claim_status
=
'
allocated
'
)
def
test_do_assignment_removes_task_data_if_task_is_pipeline
(
self
):
self
.
curpc_mock
.
getPathForOTDBId
.
return_value
=
{
'
found
'
:
True
}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
curpc_mock
.
removeTaskData
.
assert_any_call
(
self
.
task_otdb_id
)
def
test_do_assignment_logs_when_taks_data_could_not_be_deleted
(
self
):
message
=
"
file was locked
"
self
.
curpc_mock
.
getPathForOTDBId
.
return_value
=
{
'
found
'
:
True
}
self
.
curpc_mock
.
removeTaskData
.
return_value
=
{
'
deleted
'
:
False
,
'
message
'
:
message
}
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
logger_mock
.
warning
.
assert_any_call
(
"
could not remove all data on disk from previous run for otdb_id %s: %s
"
,
self
.
otdb_id
,
message
)
def
test_do_assignment_notifies_bus_when_task_is_scheduled
(
self
):
content
=
{
'
radb_id
'
:
self
.
task_id
,
'
otdb_id
'
:
self
.
task_otdb_id
,
'
mom_id
'
:
self
.
task_mom_id
}
subject
=
ra_notification_prefix
+
'
Task
'
+
'
Scheduled
'
self
.
resourceAssigner
.
doAssignment
(
self
.
specification_tree
)
self
.
assertTrue
(
self
.
ra_notification_bus_send_called_with
(
content
,
subject
))
if
__name__
==
'
__main__
'
:
unittest
.
main
()
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment