Skip to content
Snippets Groups Projects
Commit 5549e923 authored by Jan David Mol's avatar Jan David Mol
Browse files

Merge branch 'wait-for-old-files' into 'main'

Wait for longer for data to appear

See merge request !13
parents 48cb8e20 41383d82
No related branches found
Tags v0.0.20
1 merge request!13Wait for longer for data to appear
Pipeline #89943 passed
Pipeline: Stingray

#89944

    ......@@ -95,12 +95,17 @@ def main() -> int:
    minio_client,
    )
    # wait for data to arrive on S3
    # wait for metadata to arrive on S3. files can have data 10 minutes old,
    # so we need to wait for files 10 minutes younger.
    logger.info("Waiting for metadata to arrive on S3 for %s", args.begin)
    if not metadata_storage.wait_for_packets(args.begin):
    if not metadata_storage.wait_for_filenames_after(
    args.begin + timedelta(minutes=10)
    ):
    logger.error("Metadata not available on S3")
    # wait for data to arrive on S3.
    logger.info("Waiting for statistics to arrive on S3 for %s", args.end)
    if not statistics_storage.wait_for_packets(args.end):
    if not statistics_storage.wait_for_filenames_after(args.end + timedelta(seconds=5)):
    logger.error("Statistics not available on S3")
    return 1 # this is fatal
    ......
    ......@@ -60,15 +60,15 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
    f"{timestamp.day:02d}/{timestamp.isoformat()}.json",
    )
    def wait_for_packets(self, timestamp: datetime) -> bool:
    """Wait until packets for the given timestamp are on disk.
    def wait_for_filenames_after(self, timestamp: datetime) -> bool:
    """Wait until filenames with the given timestamp are on disk.
    Returns whether such packets were found."""
    Returns whether such files were found."""
    # if the packets are there, always return True
    if next(self._list_objects_after(timestamp), False):
    logger.info(
    "wait_for_packets: Requested timestamp are available at startup"
    "wait_for_filenames_after: Requested timestamp are available at startup"
    )
    return True
    ......@@ -81,7 +81,7 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
    max_wait_time := max_arrival_time - datetime.now(tz=timestamp.tzinfo)
    ) > timedelta(0):
    logger.info(
    "wait_for_packets: Requested timestamp are not available,"
    "wait_for_filenames_after: Requested timestamp are not available, "
    "will wait at most %s for them",
    max_wait_time,
    )
    ......@@ -91,13 +91,14 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
    if next(self._list_objects_after(timestamp), False):
    logger.info(
    "wait_for_packets: Requested timestamp appeared after waiting %s",
    "wait_for_filenames_after: Requested timestamp appeared "
    "after waiting %s",
    datetime.now(tz=timestamp.tzinfo) - start_time,
    )
    return True
    logger.info(
    "wait_for_packets: Requested timestamp are NOT available, "
    "wait_for_filenames_after: Requested timestamp are NOT available, "
    "even after waiting for %s",
    datetime.now(tz=timestamp.tzinfo) - start_time,
    )
    ......
    ......@@ -43,8 +43,8 @@ class TestS3PacketLoader(TestCase):
    seconds=5
    )
    def test_wait_for_packets_already_there(self):
    """Test wait_for_packets for data from the distant past."""
    def test_wait_for_filenames_after_already_there(self):
    """Test wait_for_filenames_after for data from the distant past."""
    timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
    self.mock_minio.list_objects.return_value = iter(
    ......@@ -57,7 +57,7 @@ class TestS3PacketLoader(TestCase):
    packet_loader = S3PacketLoader("bucket", "prefix/", self.mock_minio)
    result = packet_loader.wait_for_packets(timestamp)
    result = packet_loader.wait_for_filenames_after(timestamp)
    self.assertTrue(result) # objects were returned
    self.mock_minio.list_objects.assert_called_with(
    ......@@ -67,8 +67,8 @@ class TestS3PacketLoader(TestCase):
    start_after="prefix/2024/06/07/2024-06-07T13:21:32+00:00.json",
    )
    def test_wait_for_packets_never_arrive(self):
    """Test wait_for_packets for future packets that do not arrive."""
    def test_wait_for_filenames_after_never_arrive(self):
    """Test wait_for_filenames_after for future packets that do not arrive."""
    timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
    packet_loader = S3PacketLoader(
    ......@@ -82,11 +82,12 @@ class TestS3PacketLoader(TestCase):
    ]
    self.mock_minio.list_objects.side_effect = [iter([]) for n in range(50)]
    result = packet_loader.wait_for_packets(timestamp)
    result = packet_loader.wait_for_filenames_after(timestamp)
    self.assertFalse(result) # no objects were returned
    def test_wait_for_packets_arrive_during_wait(self):
    """Test wait_for_packets for future packets that arrive while waiting."""
    def test_wait_for_filenames_after_arrive_during_wait(self):
    """Test wait_for_filenames_after for future packets that
    arrive while waiting."""
    timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
    packet_loader = S3PacketLoader(
    ......@@ -101,7 +102,7 @@ class TestS3PacketLoader(TestCase):
    self.mock_minio.list_objects.side_effect = [iter([]) for n in range(5)] + [
    iter([Mock(object_name="file3")]),
    ]
    result = packet_loader.wait_for_packets(timestamp)
    result = packet_loader.wait_for_filenames_after(timestamp)
    self.assertTrue(result) # objects were returned
    def test_load_packets(self):
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment