Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
LOFAR
Manage
Activity
Members
Labels
Plan
Issues
Wiki
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
RadioObservatory
LOFAR
Commits
fe8ae886
Commit
fe8ae886
authored
4 years ago
by
Jorrit Schaap
Browse files
Options
Downloads
Patches
Plain Diff
TMSS-190
: reshuffled layout of module. Added documentation
parent
4be41d8c
No related branches found
No related tags found
1 merge request
!252
Resolve TMSS-190
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py
+77
-53
77 additions, 53 deletions
SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py
with
77 additions
and
53 deletions
SAS/TMSS/services/scheduling/lib/dynamic_scheduling.py
+
77
−
53
View file @
fe8ae886
...
...
@@ -42,56 +42,14 @@ from lofar.sas.tmss.services.scheduling.constraints import *
# LOFAR needs to have a gap in between observations to (re)initialize hardware.
DEFAULT_INTER_OBSERVATION_GAP
=
timedelta
(
seconds
=
60
)
def
get_schedulable_scheduling_units
()
->
[
models
.
SchedulingUnitBlueprint
]:
'''
get a list of all schedulable scheduling_units
'''
defined_independend_subtasks
=
models
.
Subtask
.
independent_subtasks
().
filter
(
state__value
=
'
defined
'
)
defined_independend_subtask_ids
=
defined_independend_subtasks
.
values
(
'
task_blueprint__scheduling_unit_blueprint_id
'
).
distinct
().
all
()
scheduling_units
=
models
.
SchedulingUnitBlueprint
.
objects
.
filter
(
id__in
=
defined_independend_subtask_ids
).
select_related
(
'
draft
'
,
'
draft__scheduling_constraints_template
'
).
all
()
return
[
su
for
su
in
scheduling_units
if
su
.
status
==
'
schedulable
'
]
def
get_scheduled_scheduling_units
(
lower
:
datetime
=
None
,
upper
:
datetime
=
None
)
->
[
models
.
SchedulingUnitBlueprint
]:
'''
get a list of all scheduled scheduling_units
'''
scheduled_subtasks
=
models
.
Subtask
.
objects
.
filter
(
state__value
=
'
scheduled
'
)
if
lower
is
not
None
:
scheduled_subtasks
=
scheduled_subtasks
.
filter
(
stop_time__gte
=
lower
)
if
upper
is
not
None
:
scheduled_subtasks
=
scheduled_subtasks
.
filter
(
start_time__lte
=
upper
)
return
list
(
models
.
SchedulingUnitBlueprint
.
objects
.
filter
(
id__in
=
scheduled_subtasks
.
values
(
'
task_blueprint__scheduling_unit_blueprint_id
'
).
distinct
()).
all
())
def
unschededule_blocking_scheduled_units_if_needed_and_possible
(
candidate
:
ScoredSchedulingUnit
)
->
bool
:
'''
check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.
'''
# check any previously scheduled units, and unschedule if needed/allowed
scheduled_scheduling_units
=
get_scheduled_scheduling_units
(
lower
=
candidate
.
start_time
,
upper
=
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
# check if we can and need to unschedule the blocking units
for
scheduled_scheduling_unit
in
scheduled_scheduling_units
:
scheduled_score
=
compute_scores
(
scheduled_scheduling_unit
,
candidate
.
start_time
,
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
if
candidate
.
weighted_score
>
scheduled_score
.
weighted_score
:
# ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled
logger
.
info
(
"
unscheduling id=%s
'
%s
'
because it is in the way and has a lower score than the best candidate id=%s
'
%s
'
score=%s start_time=%s
"
,
scheduled_scheduling_unit
.
id
,
scheduled_scheduling_unit
.
name
,
candidate
.
scheduling_unit
.
id
,
candidate
.
scheduling_unit
.
name
,
candidate
.
weighted_score
,
candidate
.
scheduling_unit
.
start_time
)
unschedule_subtasks_in_scheduling_unit_blueprint
(
scheduled_scheduling_unit
)
# check again... are still there any scheduled_scheduling_units in the way?
scheduled_scheduling_units
=
get_scheduled_scheduling_units
(
lower
=
candidate
.
start_time
,
upper
=
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
if
scheduled_scheduling_units
:
# accept current solution with current scheduled_scheduling_units
logger
.
info
(
"
keeping current scheduled unit(s) which have a better (or equal) score: %s
"
,
"
;
"
.
join
(
"
id=%s
'
%s
'
start_time=
'
%s
'"
%
(
su
.
id
,
su
.
name
,
su
.
start_time
)
for
su
in
scheduled_scheduling_units
))
# indicate there are still blocking units
return
False
# all clear, nothing is blocking anymore
return
True
################## core dynamic scheduling methods ################################################
# #
# This module starts with the core dynamic scheduling methods which are used in the dynamic #
# scheduling service. These high level methods only filter/score/sort in a generic way. #
# The detailed concrete filter/score/sort methods are pick by a strategy pattern in the #
# constraints package based on each scheduling unit's scheduling_constrains template. #
# #
###################################################################################################
def
find_best_next_schedulable_unit
(
scheduling_units
:[
models
.
SchedulingUnitBlueprint
],
lower_bound_start_time
:
datetime
,
upper_bound_stop_time
:
datetime
)
->
ScoredSchedulingUnit
:
"""
...
...
@@ -236,8 +194,16 @@ def schedule_next_scheduling_unit_and_assign_start_stop_times_to_remaining_sched
return
scheduled_unit
class
TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler
(
TMSSEventMessageHandler
):
################## service/messagebug handler class ###############################################
class
TMSSDynamicSchedulingMessageHandler
(
TMSSEventMessageHandler
):
'''
The TMSSDynamicSchedulingMessageHandler reacts to TMSS EventMessages by triggering a new full update of the dynamic
schedule.
The actual schedule-update method runs on a backround thread, and can take some time to complete ranging from a
few seconds to several minutes. In the mean time new EventMessages may be received. These are handled by raising a flag
that signals the schedule-update-thread that a new full update is needed. This way, a burst of Events results in
a single update, and it also ensures that we always compute the schedule with the latest data.
'''
def
__init__
(
self
):
...
...
@@ -248,7 +214,7 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag
def
start_handling
(
self
):
# start the background thread which waits until the _do_schedule_event event is set upon receiving to the correct TMSS EVentMessages.
self
.
_scheduling_thread
=
Thread
(
target
=
TMSS
SchedulingUnitBlueprint
DynamicSchedulingMessageHandler
.
_scheduling_loop
,
kwargs
=
{
'
self
'
:
self
})
self
.
_scheduling_thread
=
Thread
(
target
=
TMSSDynamicSchedulingMessageHandler
.
_scheduling_loop
,
kwargs
=
{
'
self
'
:
self
})
self
.
_scheduling_thread
.
daemon
=
True
self
.
_scheduling_thread_running
=
True
self
.
_scheduling_thread
.
start
()
...
...
@@ -297,8 +263,66 @@ class TMSSSchedulingUnitBlueprintDynamicSchedulingMessageHandler(TMSSEventMessag
def
create_dynamic_scheduling_service
(
exchange
:
str
=
DEFAULT_BUSNAME
,
broker
:
str
=
DEFAULT_BROKER
):
return
TMSSBusListener
(
handler_type
=
TMSS
SchedulingUnitBlueprint
DynamicSchedulingMessageHandler
,
return
TMSSBusListener
(
handler_type
=
TMSSDynamicSchedulingMessageHandler
,
handler_kwargs
=
None
,
exchange
=
exchange
,
broker
=
broker
)
################## helper methods #################################################################
def
get_schedulable_scheduling_units
()
->
[
models
.
SchedulingUnitBlueprint
]:
'''
get a list of all schedulable scheduling_units
'''
defined_independend_subtasks
=
models
.
Subtask
.
independent_subtasks
().
filter
(
state__value
=
'
defined
'
)
defined_independend_subtask_ids
=
defined_independend_subtasks
.
values
(
'
task_blueprint__scheduling_unit_blueprint_id
'
).
distinct
().
all
()
scheduling_units
=
models
.
SchedulingUnitBlueprint
.
objects
.
filter
(
id__in
=
defined_independend_subtask_ids
).
select_related
(
'
draft
'
,
'
draft__scheduling_constraints_template
'
).
all
()
return
[
su
for
su
in
scheduling_units
if
su
.
status
==
'
schedulable
'
]
def
get_scheduled_scheduling_units
(
lower
:
datetime
=
None
,
upper
:
datetime
=
None
)
->
[
models
.
SchedulingUnitBlueprint
]:
'''
get a list of all scheduled scheduling_units
'''
scheduled_subtasks
=
models
.
Subtask
.
objects
.
filter
(
state__value
=
'
scheduled
'
)
if
lower
is
not
None
:
scheduled_subtasks
=
scheduled_subtasks
.
filter
(
stop_time__gte
=
lower
)
if
upper
is
not
None
:
scheduled_subtasks
=
scheduled_subtasks
.
filter
(
start_time__lte
=
upper
)
return
list
(
models
.
SchedulingUnitBlueprint
.
objects
.
filter
(
id__in
=
scheduled_subtasks
.
values
(
'
task_blueprint__scheduling_unit_blueprint_id
'
).
distinct
()).
all
())
def
unschededule_blocking_scheduled_units_if_needed_and_possible
(
candidate
:
ScoredSchedulingUnit
)
->
bool
:
'''
check if there are any already scheduled units in the way, and unschedule them if allowed. Return True if nothing is blocking anymore.
'''
# check any previously scheduled units, and unschedule if needed/allowed
scheduled_scheduling_units
=
get_scheduled_scheduling_units
(
lower
=
candidate
.
start_time
,
upper
=
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
# check if we can and need to unschedule the blocking units
for
scheduled_scheduling_unit
in
scheduled_scheduling_units
:
scheduled_score
=
compute_scores
(
scheduled_scheduling_unit
,
candidate
.
start_time
,
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
if
candidate
.
weighted_score
>
scheduled_score
.
weighted_score
:
# ToDo: also check if the scheduled_scheduling_unit is manually/dynamically scheduled
logger
.
info
(
"
unscheduling id=%s
'
%s
'
because it is in the way and has a lower score than the best candidate id=%s
'
%s
'
score=%s start_time=%s
"
,
scheduled_scheduling_unit
.
id
,
scheduled_scheduling_unit
.
name
,
candidate
.
scheduling_unit
.
id
,
candidate
.
scheduling_unit
.
name
,
candidate
.
weighted_score
,
candidate
.
scheduling_unit
.
start_time
)
unschedule_subtasks_in_scheduling_unit_blueprint
(
scheduled_scheduling_unit
)
# check again... are still there any scheduled_scheduling_units in the way?
scheduled_scheduling_units
=
get_scheduled_scheduling_units
(
lower
=
candidate
.
start_time
,
upper
=
candidate
.
start_time
+
candidate
.
scheduling_unit
.
duration
)
if
scheduled_scheduling_units
:
# accept current solution with current scheduled_scheduling_units
logger
.
info
(
"
keeping current scheduled unit(s) which have a better (or equal) score: %s
"
,
"
;
"
.
join
(
"
id=%s
'
%s
'
start_time=
'
%s
'"
%
(
su
.
id
,
su
.
name
,
su
.
start_time
)
for
su
in
scheduled_scheduling_units
))
# indicate there are still blocking units
return
False
# all clear, nothing is blocking anymore
return
True
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment