Commit cbadb0e1 authored by Jan David Mol

Merge branch 'wait-for-data-on-s3' into 'main'

L2SS-1969: Wait for data to be available on S3

See merge request !12
parents 4dcf4797 94f3312c
Pipeline #89430 passed
@@ -67,6 +67,7 @@ def main():
     minio_client = get_minio_client(args)
     logger.info("Using source %s", args.source)

+    # connect to storage
     metadata_storage = S3PacketLoader(
         args.source.netloc,
         f"{args.station}/metadata",
@@ -79,6 +80,13 @@ def main():
         minio_client,
     )

+    # wait for data to arrive on S3
+    logger.info("Waiting for metadata to arrive on S3 for %s", args.begin)
+    _ = metadata_storage.wait_for_packets(args.begin)
+    logger.info("Waiting for statistics to arrive on S3 for %s", args.end)
+    _ = statistics_storage.wait_for_packets(args.end)
+
+    # aggregate data
     aggregator_args = (
         args.station,
         args.antennafield,
@@ -124,12 +132,12 @@ def main():
     logger.info("Aggregation completed.")

     if args.destination.scheme == "s3":
         tf.close()
-        logger.info("Upload file.")
+        logger.info("Uploading %s", args.destination)
         minio_client.fput_object(
             args.destination.netloc, args.destination.path, tf.name
         )
         os.unlink(tf.name)
-        logger.info("File upload completed.")
+        logger.info("Upload completed for %s", args.destination)

     logger.info("Shutting down.")
    ......
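Taken together, the main() changes gate aggregation on data availability. A minimal sketch of the resulting control flow, assuming the setup shown in the diff (the statistics loader's prefix is assumed to mirror the metadata one):

# Sketch, not the verbatim script: block until both streams have had
# time to land on S3, then aggregate.
metadata_storage = S3PacketLoader(
    args.source.netloc, f"{args.station}/metadata", minio_client
)
statistics_storage = S3PacketLoader(
    args.source.netloc, f"{args.station}/statistics", minio_client  # prefix assumed
)

_ = metadata_storage.wait_for_packets(args.begin)
_ = statistics_storage.wait_for_packets(args.end)

Note that the boolean result of wait_for_packets() is deliberately discarded: a timeout degrades to a best-effort aggregation over whatever packets did arrive, rather than an error.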
@@ -7,10 +7,22 @@ of n minutes and storing them on a S3 backend
 """

 import json
-from datetime import datetime
+import logging
+from datetime import datetime, timedelta
+import time

 from minio import Minio

+logger = logging.getLogger()
+
+# Interval with which to check the Minio store for data
+MINIO_POLL_INTERVAL = timedelta(seconds=20)
+
+# Delay with which blocks appear on S3 after being generated.
+# The blocks are synced at 5-minute intervals, and require
+# a few seconds to copy.
+MAX_BLOCK_SYNC_DELAY = timedelta(minutes=5, seconds=10)


 class S3PacketLoader:  # pylint: disable=too-few-public-methods
     """
@@ -18,20 +30,79 @@ class S3PacketLoader:  # pylint: disable=too-few-public-methods
     of n minutes and storing them on a S3 backend
     """

-    def __init__(self, bucket: str, prefix: str, client: Minio):
+    def __init__(
+        self,
+        bucket: str,
+        prefix: str,
+        client: Minio,
+        block_duration: timedelta = timedelta(minutes=5),
+    ):
         self._minio_client = client
         self.bucket = bucket
         self.prefix = prefix.strip("/")
+        self.block_duration = block_duration

-    def load_packets(self, start: datetime, end: datetime, ts_field="timestamp"):
-        """Loads packets from the S3 storage until no matching packets are found"""
-
-        for obj in self._minio_client.list_objects(
+    def _list_objects_after(self, timestamp: datetime) -> list:
+        """Return a list of objects that (should) contain packets of and after
+        the given timestamp."""
+
+        # NB: The timestamp in the filename is the time when the file was
+        # completed, so after the last timestamp recorded in the file.
+        return self._minio_client.list_objects(
             self.bucket,
             recursive=True,
             prefix=f"{self.prefix}",
-            start_after=f"{self.prefix}/{start.year}/{start.month:02d}/"
-            f"{start.day:02d}/{start.isoformat()}.json",
-        ):
+            start_after=f"{self.prefix}/{timestamp.year}/{timestamp.month:02d}/"
+            f"{timestamp.day:02d}/{timestamp.isoformat()}.json",
+        )
+
+    def wait_for_packets(self, timestamp: datetime) -> bool:
+        """Wait until packets for the given timestamp are on disk.
+        Returns whether such packets were found."""
+
+        # if the packets are already there, return True immediately
+        if self._list_objects_after(timestamp):
+            logger.info(
+                "wait_for_packets: Requested timestamps are available at startup"
+            )
+            return True
+
+        # the latest time at which the data can still arrive
+        max_arrival_time = timestamp + self.block_duration + MAX_BLOCK_SYNC_DELAY
+        start_time = datetime.now(tz=timestamp.tzinfo)
+
+        # wait for the end block to hit the disk, to make sure all data is there
+        while (
+            max_wait_time := max_arrival_time - datetime.now(tz=timestamp.tzinfo)
+        ) > timedelta(0):
+            logger.info(
+                "wait_for_packets: Requested timestamps are not available, "
+                "will wait at most %s for them",
+                max_wait_time,
+            )
+
+            sleep_interval = min(max_wait_time, MINIO_POLL_INTERVAL)
+            time.sleep(sleep_interval.total_seconds())
+
+            if self._list_objects_after(timestamp):
+                logger.info(
+                    "wait_for_packets: Requested timestamps appeared after waiting %s",
+                    datetime.now(tz=timestamp.tzinfo) - start_time,
+                )
+                return True
+
+        logger.info(
+            "wait_for_packets: Requested timestamps are NOT available, "
+            "even after waiting for %s",
+            datetime.now(tz=timestamp.tzinfo) - start_time,
+        )
+        return False
+
+    def load_packets(self, start: datetime, end: datetime, ts_field="timestamp"):
+        """Loads packets from the S3 storage until no matching packets are found"""
+
+        for obj in self._list_objects_after(start):
             packets = []
             response = None
             stop = False
    ......
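The waiting logic bounds how long a caller can block. A worked example of the timeout arithmetic, using the defaults above (5-minute blocks, 5m10s sync delay; the timestamp is illustrative):

from datetime import datetime, timedelta, timezone

# A packet stamped 13:02:00 lands in a block that completes no later than
# timestamp + block_duration = 13:07:00, and syncing that block to S3
# takes at most MAX_BLOCK_SYNC_DELAY = 5m10s more:
timestamp = datetime(2024, 6, 7, 13, 2, 0, tzinfo=timezone.utc)
max_arrival_time = (
    timestamp + timedelta(minutes=5) + timedelta(minutes=5, seconds=10)
)
print(max_arrival_time.time())  # 13:12:10

wait_for_packets() therefore polls list_objects() every MINIO_POLL_INTERVAL (20 seconds) until max_arrival_time, and returns False if nothing has appeared by then.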
@@ -2,14 +2,17 @@
 # SPDX-License-Identifier: Apache-2.0
 """ S3PacketLoader tests """

-from datetime import datetime
+from datetime import datetime, timedelta
 from unittest import TestCase
-from unittest.mock import create_autospec, Mock, MagicMock
+from unittest.mock import create_autospec, Mock, MagicMock, patch

 from minio import Minio

+import lofar_stingray.reader._s3_packet_loader
 from lofar_stingray.reader import S3PacketLoader

+# pylint: disable=protected-access


 class TestS3PacketLoader(TestCase):
     """Test cases of the S3PacketLoader class"""
@@ -32,6 +35,73 @@ class TestS3PacketLoader(TestCase):
     def setUp(self):
         self.mock_minio = create_autospec(Minio)

+        # speed up polling
+        lofar_stingray.reader._s3_packet_loader.MINIO_POLL_INTERVAL = timedelta(
+            microseconds=1000
+        )
+        lofar_stingray.reader._s3_packet_loader.MAX_BLOCK_SYNC_DELAY = timedelta(
+            seconds=5
+        )
+
+    def test_wait_for_packets_already_there(self):
+        """Test wait_for_packets for data from the distant past."""
+        timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
+
+        self.mock_minio.list_objects.return_value = [
+            Mock(object_name="file1"),
+            Mock(object_name="file2"),
+            Mock(object_name="file3"),
+        ]
+
+        packet_loader = S3PacketLoader("bucket", "prefix/", self.mock_minio)
+        result = packet_loader.wait_for_packets(timestamp)
+
+        self.assertTrue(result)  # objects were returned
+        self.mock_minio.list_objects.assert_called_with(
+            "bucket",
+            recursive=True,
+            prefix="prefix",
+            start_after="prefix/2024/06/07/2024-06-07T13:21:32+00:00.json",
+        )
+
+    def test_wait_for_packets_never_arrive(self):
+        """Test wait_for_packets for future packets that do not arrive."""
+        timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
+
+        packet_loader = S3PacketLoader(
+            "bucket", "prefix/", self.mock_minio, block_duration=timedelta(seconds=10)
+        )
+
+        with patch("lofar_stingray.reader._s3_packet_loader.datetime") as mock_datetime:
+            mock_datetime.now.side_effect = [
+                datetime.fromisoformat(f"2024-06-07T13:21:{seconds}+00:00")
+                for seconds in range(32, 50)
+            ]
+            self.mock_minio.list_objects.side_effect = [[]] * 50
+
+            result = packet_loader.wait_for_packets(timestamp)
+
+        self.assertFalse(result)  # no objects were returned
+
+    def test_wait_for_packets_arrive_during_wait(self):
+        """Test wait_for_packets for future packets that arrive while waiting."""
+        timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
+
+        packet_loader = S3PacketLoader(
+            "bucket", "prefix/", self.mock_minio, block_duration=timedelta(seconds=10)
+        )
+
+        with patch("lofar_stingray.reader._s3_packet_loader.datetime") as mock_datetime:
+            mock_datetime.now.side_effect = [
+                datetime.fromisoformat(f"2024-06-07T13:21:{seconds}+00:00")
+                for seconds in range(32, 50)
+            ]
+            self.mock_minio.list_objects.side_effect = [[]] * 5 + [
+                [Mock(object_name="file3")],
+            ]
+
+            result = packet_loader.wait_for_packets(timestamp)
+
+        self.assertTrue(result)  # objects were returned
+
     def test_load_packets(self):
         """Test loading packets from the S3 storage"""
         start = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
    ......
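The assertions above also pin down the object-key layout the loader depends on: keys are date-partitioned and end in an ISO 8601 timestamp, so S3's lexicographic start_after doubles as a time filter. A small illustration (the helper is hypothetical; the key format matches the assertion in test_wait_for_packets_already_there):

from datetime import datetime

def object_key(prefix: str, ts: datetime) -> str:
    # Hypothetical helper mirroring the key format the loader lists against.
    return f"{prefix}/{ts.year}/{ts.month:02d}/{ts.day:02d}/{ts.isoformat()}.json"

ts = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
print(object_key("prefix", ts))
# prefix/2024/06/07/2024-06-07T13:21:32+00:00.json

Because each file is named for the moment it was completed (after its last recorded packet), every key that sorts after this one belongs to a file that can contain packets at or after the requested timestamp.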