Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
T
tango
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Wiki
Requirements
Jira issues
Open Jira
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Build
Pipelines
Jobs
Pipeline schedules
Test cases
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
LOFAR2.0
tango
Commits
234b3125
Commit
234b3125
authored
3 years ago
by
Stefano Di Frischia
Browse files
Options
Downloads
Patches
Plain Diff
L2SS-398
: improve subscribers handling
parent
7ecdac7f
Branches
Branches containing commit
Tags
Tags containing commit
1 merge request
!161
Resolve L2SS-398 "Archiver multi es"
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
devices/toolkit/archiver.py
+64
-11
64 additions, 11 deletions
devices/toolkit/archiver.py
jupyter-notebooks/Archiving_load_test.ipynb
+209
-435
209 additions, 435 deletions
jupyter-notebooks/Archiving_load_test.ipynb
with
273 additions
and
446 deletions
devices/toolkit/archiver.py
+
64
−
11
View file @
234b3125
...
@@ -2,8 +2,6 @@
...
@@ -2,8 +2,6 @@
#from logging import raiseExceptions
#from logging import raiseExceptions
import
logging
import
logging
import
traceback
from
clients.attribute_wrapper
import
attribute_wrapper
from
tango
import
DeviceProxy
,
AttributeProxy
from
tango
import
DeviceProxy
,
AttributeProxy
from
datetime
import
datetime
,
timedelta
from
datetime
import
datetime
,
timedelta
...
@@ -30,6 +28,20 @@ def parse_attribute_name(attribute_name:str):
...
@@ -30,6 +28,20 @@ def parse_attribute_name(attribute_name:str):
else
:
else
:
return
attribute_name
return
attribute_name
def
parse_device_name
(
device_name
:
str
,
tango_host
:
str
=
'
databaseds:10000
'
):
"""
For some operations Tango devices must be transformed from the form
'
domain/family/name
'
to
'
tango://db:port/domain/family/name
'
"""
chunk_num
=
len
(
device_name
.
split
(
'
/
'
))
if
(
chunk_num
==
3
):
return
'
tango://
'
+
tango_host
+
'
/
'
+
device_name
elif
(
chunk_num
==
6
and
device_name
.
split
(
'
/
'
)[
0
]
==
'
tango:
'
):
return
device_name
else
:
raise
Exception
(
f
'
{
device_name
}
is a wrong device name
'
)
class
Archiver
():
class
Archiver
():
"""
"""
The Archiver class implements the basic operations to perform attributes archiving
The Archiver class implements the basic operations to perform attributes archiving
...
@@ -48,29 +60,41 @@ class Archiver():
...
@@ -48,29 +60,41 @@ class Archiver():
raise
Exception
(
"
Configuration Manager is in FAULT state
"
)
raise
Exception
(
"
Configuration Manager is in FAULT state
"
)
except
Exception
as
e
:
except
Exception
as
e
:
raise
Exception
(
"
Connection failed with Configuration Manager device
"
)
from
e
raise
Exception
(
"
Connection failed with Configuration Manager device
"
)
from
e
self
.
es_list
=
[
es_name
for
es_name
in
self
.
get_subscribers
(
from_db
=
Tru
e
)]
or
[]
self
.
es_list
=
[
es_name
for
es_name
in
self
.
get_subscribers
(
from_db
=
Fals
e
)]
or
[]
self
.
cm
.
write_attribute
(
'
Context
'
,
context
)
# Set default Context Archiving for all the subscribers
self
.
cm
.
write_attribute
(
'
Context
'
,
context
)
# Set default Context Archiving for all the subscribers
self
.
selector
=
Selector
()
if
selector_filename
is
None
else
Selector
(
selector_filename
)
# Create selector for customized strategies
self
.
selector
=
Selector
()
if
selector_filename
is
None
else
Selector
(
selector_filename
)
# Create selector for customized strategies
try
:
try
:
self
.
apply_selector
()
self
.
apply_selector
()
except
Exception
as
e
:
except
Exception
as
e
:
raise
Exception
(
"
Error in selecting configuration! Archiving framework will not be updated!
"
)
from
e
raise
Exception
(
"
Error in selecting configuration! Archiving framework will not be updated!
"
)
from
e
def
get_
h
db
pp_libname
(
self
,
device_name
:
str
):
def
get_db
_config
(
self
,
device_name
:
str
):
"""
"""
Get the hdbpp library name used by the Configuration Manager or by the EventSubscribers
Retrieve the DB credentials from the Tango properties of Configuration Manager or EventSubscribers
Useful in the case of different DBMS architectures (e.g. MySQL, TimescaleDB)
"""
"""
device
=
DeviceProxy
(
device_name
)
device
=
DeviceProxy
(
device_name
)
config_list
=
device
.
get_property
(
'
LibConfiguration
'
)[
'
LibConfiguration
'
]
# dictionary {'LibConfiguration': list of strings}
config_list
=
device
.
get_property
(
'
LibConfiguration
'
)[
'
LibConfiguration
'
]
# dictionary {'LibConfiguration': list of strings}
host
=
str
([
s
for
s
in
config_list
if
"
host
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
libname
=
str
([
s
for
s
in
config_list
if
"
libname
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
libname
=
str
([
s
for
s
in
config_list
if
"
libname
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
return
libname
dbname
=
str
([
s
for
s
in
config_list
if
"
dbname
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
port
=
str
([
s
for
s
in
config_list
if
"
port
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
user
=
str
([
s
for
s
in
config_list
if
"
user
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
pw
=
str
([
s
for
s
in
config_list
if
"
password
"
in
s
][
0
].
split
(
'
=
'
)[
1
])
return
[
host
,
libname
,
dbname
,
port
,
user
,
pw
]
def
get_hdbpp_libname
(
self
,
device_name
:
str
):
"""
Get the hdbpp library name used by the Configuration Manager or by the EventSubscribers
Useful in the case of different DBMS architectures (e.g. MySQL, TimescaleDB)
"""
config_list
=
self
.
get_db_config
(
device_name
)
return
config_list
[
1
]
def
get_subscribers
(
self
,
from_db
:
bool
=
Tru
e
):
def
get_subscribers
(
self
,
from_db
:
bool
=
Fals
e
):
"""
"""
Get the list of Event Subscribers managed by the Configuration Manager.
Get the list of Event Subscribers managed by the Configuration Manager.
It can be retrieved as a device property (stored in TangoDB) or as a device attribute.
It can be retrieved as a device property (stored in TangoDB) or as a device attribute.
The first option seems to be the safest one
.
Choose from_db=True only if new subscribers are not added dynamically while ConfManager is running
.
"""
"""
if
from_db
:
if
from_db
:
es_list
=
self
.
cm
.
get_property
(
'
ArchiverList
'
)[
'
ArchiverList
'
]
or
[]
es_list
=
self
.
cm
.
get_property
(
'
ArchiverList
'
)[
'
ArchiverList
'
]
or
[]
...
@@ -84,7 +108,19 @@ class Archiver():
...
@@ -84,7 +108,19 @@ class Archiver():
TODO: the choice among subscribers should be done analysing their load
TODO: the choice among subscribers should be done analysing their load
"""
"""
es_list
=
self
.
get_subscribers
()
es_list
=
self
.
get_subscribers
()
return
es_list
[
0
]
# Only one subscriber in ConfManager list
if
len
(
es_list
)
==
1
:
return
es_list
[
0
]
else
:
# Choose the best subscriber analysing their load
load_dict
=
{}
for
es_name
in
es_list
:
es
=
DeviceProxy
(
es_name
)
load_dict
[
es_name
]
=
float
(
es
.
AttributeRecordFreq
)
min_load
=
min
(
load_dict
.
values
())
# Return the subscriber's name with min load
min_es
=
list
(
load_dict
.
keys
())[
list
(
load_dict
.
values
()).
index
(
min_load
)]
return
min_es
def
apply_selector
(
self
):
def
apply_selector
(
self
):
"""
"""
...
@@ -127,6 +163,23 @@ class Archiver():
...
@@ -127,6 +163,23 @@ class Archiver():
raise
Exception
from
e
raise
Exception
from
e
return
env_dict
return
env_dict
def
add_event_subscriber
(
self
,
es_name
:
str
=
None
):
"""
Add an additional Event Subscriber to the Configuration Manager
"""
# If the subscriber name is not defined, generate the name by incrementing the existing one
if
es_name
is
None
:
last_es_name
=
self
.
get_subscribers
()[
-
1
]
last_es_idx
=
int
(
last_es_name
[
-
2
:])
es_name
=
last_es_name
[:
-
2
]
+
'
0
'
+
str
(
last_es_idx
+
1
)
try
:
self
.
cm
.
ArchiverAdd
(
parse_device_name
(
es_name
))
except
Exception
as
e
:
if
'
already_present
'
in
str
(
e
):
logger
.
warning
(
f
"
Subscriber
{
es_name
}
already present in Configuration Manager
"
)
else
:
raise
Exception
from
e
def
add_attribute_to_archiver
(
self
,
attribute_name
:
str
,
polling_period
:
int
,
event_period
:
int
,
strategy
:
str
=
'
RUN
'
,
es_name
:
str
=
None
):
def
add_attribute_to_archiver
(
self
,
attribute_name
:
str
,
polling_period
:
int
,
event_period
:
int
,
strategy
:
str
=
'
RUN
'
,
es_name
:
str
=
None
):
"""
"""
Takes as input the attribute name, polling period (ms), event period (ms) and archiving strategy,
Takes as input the attribute name, polling period (ms), event period (ms) and archiving strategy,
...
...
This diff is collapsed.
Click to expand it.
jupyter-notebooks/Archiving_load_test.ipynb
+
209
−
435
View file @
234b3125
Source diff could not be displayed: it is too large. Options to address this:
view the blob
.
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment