diff --git a/lofar_stingray/aggregate.py b/lofar_stingray/aggregate.py index dcf6dbf1d8cd8aad9d121be126f726e3b0b5a861..cb420b52d7651c384d68ecd8b2bb60384c0b1d9e 100644 --- a/lofar_stingray/aggregate.py +++ b/lofar_stingray/aggregate.py @@ -95,12 +95,17 @@ def main() -> int: minio_client, ) - # wait for data to arrive on S3 + # wait for metadata to arrive on S3. files can have data 10 minutes old, + # so we need to wait for files 10 minutes younger. logger.info("Waiting for metadata to arrive on S3 for %s", args.begin) - if not metadata_storage.wait_for_packets(args.begin): + if not metadata_storage.wait_for_filenames_after( + args.begin + timedelta(minutes=10) + ): logger.error("Metadata not available on S3") + + # wait for data to arrive on S3. logger.info("Waiting for statistics to arrive on S3 for %s", args.end) - if not statistics_storage.wait_for_packets(args.end): + if not statistics_storage.wait_for_filenames_after(args.end + timedelta(seconds=5)): logger.error("Statistics not available on S3") return 1 # this is fatal diff --git a/lofar_stingray/reader/_s3_packet_loader.py b/lofar_stingray/reader/_s3_packet_loader.py index 41c18b1f1806d0b2cba76b30af5ab5d5fe3b83f0..dfaac02206d621e95331178832242c2fe395ba92 100644 --- a/lofar_stingray/reader/_s3_packet_loader.py +++ b/lofar_stingray/reader/_s3_packet_loader.py @@ -60,15 +60,15 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods f"{timestamp.day:02d}/{timestamp.isoformat()}.json", ) - def wait_for_packets(self, timestamp: datetime) -> bool: - """Wait until packets for the given timestamp are on disk. + def wait_for_filenames_after(self, timestamp: datetime) -> bool: + """Wait until filenames with the given timestamp are on disk. - Returns whether such packets were found.""" + Returns whether such files were found.""" # if the packets are there, always return True if next(self._list_objects_after(timestamp), False): logger.info( - "wait_for_packets: Requested timestamp are available at startup" + "wait_for_filenames_after: Requested timestamp are available at startup" ) return True @@ -81,7 +81,7 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods max_wait_time := max_arrival_time - datetime.now(tz=timestamp.tzinfo) ) > timedelta(0): logger.info( - "wait_for_packets: Requested timestamp are not available," + "wait_for_filenames_after: Requested timestamp are not available, " "will wait at most %s for them", max_wait_time, ) @@ -91,13 +91,14 @@ class S3PacketLoader: # pylint: disable=too-few-public-methods if next(self._list_objects_after(timestamp), False): logger.info( - "wait_for_packets: Requested timestamp appeared after waiting %s", + "wait_for_filenames_after: Requested timestamp appeared " + "after waiting %s", datetime.now(tz=timestamp.tzinfo) - start_time, ) return True logger.info( - "wait_for_packets: Requested timestamp are NOT available, " + "wait_for_filenames_after: Requested timestamp are NOT available, " "even after waiting for %s", datetime.now(tz=timestamp.tzinfo) - start_time, ) diff --git a/tests/reader/test_s3_packet_loader.py b/tests/reader/test_s3_packet_loader.py index 48946a395d9e132dd533f92ce51b869961628980..3e856dcb26236679ea833991a514e20d1c0246de 100644 --- a/tests/reader/test_s3_packet_loader.py +++ b/tests/reader/test_s3_packet_loader.py @@ -43,8 +43,8 @@ class TestS3PacketLoader(TestCase): seconds=5 ) - def test_wait_for_packets_already_there(self): - """Test wait_for_packets for data from the distant past.""" + def test_wait_for_filenames_after_already_there(self): + """Test wait_for_filenames_after for data from the distant past.""" timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") self.mock_minio.list_objects.return_value = iter( @@ -57,7 +57,7 @@ class TestS3PacketLoader(TestCase): packet_loader = S3PacketLoader("bucket", "prefix/", self.mock_minio) - result = packet_loader.wait_for_packets(timestamp) + result = packet_loader.wait_for_filenames_after(timestamp) self.assertTrue(result) # objects were returned self.mock_minio.list_objects.assert_called_with( @@ -67,8 +67,8 @@ class TestS3PacketLoader(TestCase): start_after="prefix/2024/06/07/2024-06-07T13:21:32+00:00.json", ) - def test_wait_for_packets_never_arrive(self): - """Test wait_for_packets for future packets that do not arrive.""" + def test_wait_for_filenames_after_never_arrive(self): + """Test wait_for_filenames_after for future packets that do not arrive.""" timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") packet_loader = S3PacketLoader( @@ -82,11 +82,12 @@ class TestS3PacketLoader(TestCase): ] self.mock_minio.list_objects.side_effect = [iter([]) for n in range(50)] - result = packet_loader.wait_for_packets(timestamp) + result = packet_loader.wait_for_filenames_after(timestamp) self.assertFalse(result) # no objects were returned - def test_wait_for_packets_arrive_during_wait(self): - """Test wait_for_packets for future packets that arrive while waiting.""" + def test_wait_for_filenames_after_arrive_during_wait(self): + """Test wait_for_filenames_after for future packets that + arrive while waiting.""" timestamp = datetime.fromisoformat("2024-06-07T13:21:32+00:00") packet_loader = S3PacketLoader( @@ -101,7 +102,7 @@ class TestS3PacketLoader(TestCase): self.mock_minio.list_objects.side_effect = [iter([]) for n in range(5)] + [ iter([Mock(object_name="file3")]), ] - result = packet_loader.wait_for_packets(timestamp) + result = packet_loader.wait_for_filenames_after(timestamp) self.assertTrue(result) # objects were returned def test_load_packets(self):