Skip to content
Snippets Groups Projects

L2SS-1966: Add mutex around state commands in AsyncDevice

Merged Jan David Mol requested to merge L2SS-1966-mutex-tango-commands into master
Compare and
25 files
+ 470
153
Compare changes
  • Side-by-side
  • Inline
Files
25
@@ -2,12 +2,18 @@
@@ -2,12 +2,18 @@
# SPDX-License-Identifier: Apache-2.0
# SPDX-License-Identifier: Apache-2.0
import asyncio
import asyncio
 
import datetime
import logging
import logging
from contextlib import suppress
from contextlib import suppress
from concurrent.futures import Future, CancelledError
from concurrent.futures import Future, CancelledError
from typing import Callable
from typing import Callable, Dict
 
 
from prometheus_client import Histogram
 
from prometheus_client.utils import INF
from tangostationcontrol.common.threading import OmniThread
from tangostationcontrol.common.threading import OmniThread
 
from tangostationcontrol.common.device_decorators import DurationMetric
 
from tangostationcontrol.metrics import AttributeMetric
logger = logging.getLogger()
logger = logging.getLogger()
@@ -139,3 +145,64 @@ class PeriodicTask:
@@ -139,3 +145,64 @@ class PeriodicTask:
"""Return whether the periodic call is still scheduled."""
"""Return whether the periodic call is still scheduled."""
return self.task and not self.task.done() and not self.done
return self.task and not self.task.done() and not self.done
 
 
 
class MonitoredLock(asyncio.Lock):
 
HISTOGRAM_BUCKETS = (
 
0.1,
 
0.2,
 
0.5,
 
1.0,
 
2.0,
 
3.0,
 
5.0,
 
10.0,
 
30.0,
 
60.0,
 
90.0,
 
120.0,
 
300.0,
 
INF,
 
)
 
 
def __init__(self, name: str, metric_labels: Dict[str, str], *args, **kwargs):
 
super().__init__(*args, **kwargs)
 
 
self.metric_labels = metric_labels
 
self.name = name
 
self.acquired_at = datetime.datetime.min
 
 
self.hold_duration_metric = AttributeMetric(
 
f"duration_{self.name}_locked",
 
"How long the {self.name} lock was held",
 
metric_labels,
 
Histogram,
 
metric_class_init_kwargs={"buckets": self.HISTOGRAM_BUCKETS},
 
).get_metric()
 
 
self.acquiring_count_metric = AttributeMetric(
 
f"{self.name}_acquiring",
 
"How many threads are currently waiting for the {self.name} lock",
 
metric_labels,
 
).get_metric()
 
self.acquiring_count_metric.set(0)
 
 
@DurationMetric(
 
get_metric_labels=lambda self: self.metric_labels,
 
get_metric_name=lambda obj, _: f"{obj.name}_acquire",
 
buckets=HISTOGRAM_BUCKETS,
 
)
 
async def acquire(self):
 
with self.acquiring_count_metric.track_inprogress():
 
await super().acquire()
 
 
self.acquired_at = datetime.datetime.now()
 
 
def release(self):
 
released_at = datetime.datetime.now()
 
self.hold_duration_metric.observe(
 
(released_at - self.acquired_at).total_seconds()
 
)
 
self.acquired_at = datetime.datetime.min
 
 
super().release()
Loading