Skip to content
Snippets Groups Projects
Commit 41383d82 authored by Jan David Mol
Browse files

Wait for longer for data to appear

parent 48cb8e20
No related branches found
No related tags found
1 merge request: !13 "Wait for longer for data to appear"
...@@ -95,12 +95,17 @@ def main() -> int: ...@@ -95,12 +95,17 @@ def main() -> int:
minio_client, minio_client,
) )
# wait for data to arrive on S3 # wait for metadata to arrive on S3. files can have data 10 minutes old,
# so we need to wait for files 10 minutes younger.
logger.info("Waiting for metadata to arrive on S3 for %s", args.begin) logger.info("Waiting for metadata to arrive on S3 for %s", args.begin)
if not metadata_storage.wait_for_packets(args.begin): if not metadata_storage.wait_for_filenames_after(
args.begin + timedelta(minutes=10)
):
logger.error("Metadata not available on S3") logger.error("Metadata not available on S3")
# wait for data to arrive on S3.
logger.info("Waiting for statistics to arrive on S3 for %s", args.end) logger.info("Waiting for statistics to arrive on S3 for %s", args.end)
if not statistics_storage.wait_for_packets(args.end): if not statistics_storage.wait_for_filenames_after(args.end + timedelta(seconds=5)):
logger.error("Statistics not available on S3") logger.error("Statistics not available on S3")
return 1 # this is fatal return 1 # this is fatal
......
...@@ -60,15 +60,15 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods ...@@ -60,15 +60,15 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
f"{timestamp.day:02d}/{timestamp.isoformat()}.json", f"{timestamp.day:02d}/{timestamp.isoformat()}.json",
) )
def wait_for_packets(self, timestamp: datetime) -> bool: def wait_for_filenames_after(self, timestamp: datetime) -> bool:
"""Wait until packets for the given timestamp are on disk. """Wait until filenames with the given timestamp are on disk.
Returns whether such packets were found.""" Returns whether such files were found."""
# if the packets are there, always return True # if the packets are there, always return True
if next(self._list_objects_after(timestamp), False): if next(self._list_objects_after(timestamp), False):
logger.info( logger.info(
"wait_for_packets: Requested timestamp are available at startup" "wait_for_filenames_after: Requested timestamp are available at startup"
) )
return True return True
...@@ -81,7 +81,7 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods ...@@ -81,7 +81,7 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
max_wait_time := max_arrival_time - datetime.now(tz=timestamp.tzinfo) max_wait_time := max_arrival_time - datetime.now(tz=timestamp.tzinfo)
) > timedelta(0): ) > timedelta(0):
logger.info( logger.info(
"wait_for_packets: Requested timestamp are not available," "wait_for_filenames_after: Requested timestamp are not available, "
"will wait at most %s for them", "will wait at most %s for them",
max_wait_time, max_wait_time,
) )
...@@ -91,13 +91,14 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods ...@@ -91,13 +91,14 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods
if next(self._list_objects_after(timestamp), False): if next(self._list_objects_after(timestamp), False):
logger.info( logger.info(
"wait_for_packets: Requested timestamp appeared after waiting %s", "wait_for_filenames_after: Requested timestamp appeared "
"after waiting %s",
datetime.now(tz=timestamp.tzinfo) - start_time, datetime.now(tz=timestamp.tzinfo) - start_time,
) )
return True return True
logger.info( logger.info(
"wait_for_packets: Requested timestamp are NOT available, " "wait_for_filenames_after: Requested timestamp are NOT available, "
"even after waiting for %s", "even after waiting for %s",
datetime.now(tz=timestamp.tzinfo) - start_time, datetime.now(tz=timestamp.tzinfo) - start_time,
) )
......
...@@ -43,8 +43,8 @@ class TestS3PacketLoader(TestCase): ...@@ -43,8 +43,8 @@ class TestS3PacketLoader(TestCase):
seconds=5 seconds=5
) )
def test_wait_for_packets_already_there(self): def test_wait_for_filenames_after_already_there(self):
"""Test wait_for_packets for data from the distant past.""" """Test wait_for_filenames_after for data from the distant past."""
timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
self.mock_minio.list_objects.return_value = iter( self.mock_minio.list_objects.return_value = iter(
...@@ -57,7 +57,7 @@ class TestS3PacketLoader(TestCase): ...@@ -57,7 +57,7 @@ class TestS3PacketLoader(TestCase):
packet_loader = S3PacketLoader("bucket", "prefix/", self.mock_minio) packet_loader = S3PacketLoader("bucket", "prefix/", self.mock_minio)
result = packet_loader.wait_for_packets(timestamp) result = packet_loader.wait_for_filenames_after(timestamp)
self.assertTrue(result) # objects were returned self.assertTrue(result) # objects were returned
self.mock_minio.list_objects.assert_called_with( self.mock_minio.list_objects.assert_called_with(
...@@ -67,8 +67,8 @@ class TestS3PacketLoader(TestCase): ...@@ -67,8 +67,8 @@ class TestS3PacketLoader(TestCase):
start_after="prefix/2024/06/07/2024-06-07T13:21:32+00:00.json", start_after="prefix/2024/06/07/2024-06-07T13:21:32+00:00.json",
) )
def test_wait_for_packets_never_arrive(self): def test_wait_for_filenames_after_never_arrive(self):
"""Test wait_for_packets for future packets that do not arrive.""" """Test wait_for_filenames_after for future packets that do not arrive."""
timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
packet_loader = S3PacketLoader( packet_loader = S3PacketLoader(
...@@ -82,11 +82,12 @@ class TestS3PacketLoader(TestCase): ...@@ -82,11 +82,12 @@ class TestS3PacketLoader(TestCase):
] ]
self.mock_minio.list_objects.side_effect = [iter([]) for n in range(50)] self.mock_minio.list_objects.side_effect = [iter([]) for n in range(50)]
result = packet_loader.wait_for_packets(timestamp) result = packet_loader.wait_for_filenames_after(timestamp)
self.assertFalse(result) # no objects were returned self.assertFalse(result) # no objects were returned
def test_wait_for_packets_arrive_during_wait(self): def test_wait_for_filenames_after_arrive_during_wait(self):
"""Test wait_for_packets for future packets that arrive while waiting.""" """Test wait_for_filenames_after for future packets that
arrive while waiting."""
timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00")
packet_loader = S3PacketLoader( packet_loader = S3PacketLoader(
...@@ -101,7 +102,7 @@ class TestS3PacketLoader(TestCase): ...@@ -101,7 +102,7 @@ class TestS3PacketLoader(TestCase):
self.mock_minio.list_objects.side_effect = [iter([]) for n in range(5)] + [ self.mock_minio.list_objects.side_effect = [iter([]) for n in range(5)] + [
iter([Mock(object_name="file3")]), iter([Mock(object_name="file3")]),
] ]
result = packet_loader.wait_for_packets(timestamp) result = packet_loader.wait_for_filenames_after(timestamp)
self.assertTrue(result) # objects were returned self.assertTrue(result) # objects were returned
def test_load_packets(self): def test_load_packets(self):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment