diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py index 44e4b2d0eab0c39e36bbef4b067701ac785d222f..4f0f1255c31393d1fb5838118c6a8eb20e585c93 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/ltacp.py @@ -55,12 +55,27 @@ def createNetCatCmd(listener, user=None, host=None): if user and host: cmd = ['ssh', '-n', '-x', '%s@%s' % (user, host)] + cmd p = Popen(cmd, stdout=PIPE, stderr=PIPE) - out, err = p.communicate() + out, err = wait_for_utf8_output(p) if 'invalid option' not in err: return nc_variant raise LtacpException('could not determine remote netcat version') +def wait_for_utf8_output(proc): + """Helper function around subprocess.communicate() which changed from python2 to python3. + This function waits for the subprocess to finish and returns the stdout and stderr as utf-8 strings, just like python2 did.""" + out, err = proc.communicate() + if isinstance(out, bytes): + out = out.decode('UTF-8') + if isinstance(err, bytes): + err = err.decode('UTF-8') + if out is None: + out = '' + if err is None: + err = '' + + return out, err + class LtaCp: def __init__(self, src_host, @@ -125,7 +140,7 @@ class LtaCp: self.started_procs[proc] = cmd_login_to_source_host # block until find is finished - out, err = proc.communicate() + out, err = wait_for_utf8_output(proc) del self.started_procs[proc] if proc.returncode==0: @@ -190,7 +205,7 @@ class LtaCp: self.started_procs[p_remote_filetype] = cmd_remote_filetype # block until find is finished - output_remote_filetype = p_remote_filetype.communicate() + output_remote_filetype = wait_for_utf8_output(p_remote_filetype) del self.started_procs[p_remote_filetype] if p_remote_filetype.returncode != 0: raise LtacpException('ltacp %s: determining source type failed: \nstdout: %s\nstderr: %s' % (self.logId, @@ -247,7 +262,7 @@ class LtaCp: self.started_procs[p_remote_du] = cmd_remote_du # block until du is finished - output_remote_du = p_remote_du.communicate() + output_remote_du = wait_for_utf8_output(p_remote_du) del self.started_procs[p_remote_du] if p_remote_du.returncode != 0: raise LtacpException('ltacp %s: remote du failed: \nstdout: %s\nstderr: %s' % (self.logId, @@ -312,7 +327,7 @@ class LtaCp: if len(finished_procs): msg = '' for p, cl in list(finished_procs.items()): - o, e = p.communicate() + o, e = wait_for_utf8_output(p) msg += " process pid:%d exited prematurely with exit code %d. cmdline: %s\nstdout: %s\nstderr: %s\n" % (p.pid, p.returncode, cl, @@ -339,7 +354,7 @@ class LtaCp: self.started_procs[p_remote_mkfifo] = cmd_remote_mkfifo # block until fifo is created - output_remote_mkfifo = p_remote_mkfifo.communicate() + output_remote_mkfifo = wait_for_utf8_output(p_remote_mkfifo) del self.started_procs[p_remote_mkfifo] if p_remote_mkfifo.returncode != 0: raise LtacpException('ltacp %s: remote fifo creation failed: \nstdout: %s\nstderr: %s' % (self.logId, output_remote_mkfifo[0],output_remote_mkfifo[1])) @@ -465,7 +480,7 @@ class LtaCp: raise LtacpException('ltacp %s: %s did not finish within %s.' % (self.logId, proc_log_name, timeout)) waitForSubprocess(p_data_out, timedelta(seconds=self.globus_timeout), 'globus-url-copy', logging.INFO) - output_data_out = p_data_out.communicate() + output_data_out = wait_for_utf8_output(p_data_out) if p_data_out.returncode != 0: if 'file exist' in output_data_out[1].lower(): raise LtacpDestinationExistsException('ltacp %s: data transfer via globus-url-copy to LTA failed, file already exists at %s.' % (self.logId, dst_turl)) @@ -473,19 +488,19 @@ class LtaCp: logger.info('ltacp %s: data transfer via globus-url-copy to LTA complete.' % self.logId) waitForSubprocess(p_remote_data, timedelta(seconds=60), 'remote data transfer') - output_remote_data = p_remote_data.communicate() + output_remote_data = wait_for_utf8_output(p_remote_data) if p_remote_data.returncode != 0: raise LtacpException('ltacp %s: Error in remote data transfer: %s' % (self.logId, output_remote_data[1])) logger.debug('ltacp %s: remote data transfer finished...' % self.logId) waitForSubprocess(p_remote_checksum, timedelta(seconds=60), 'remote md5 checksum computation') - output_remote_checksum = p_remote_checksum.communicate() + output_remote_checksum = wait_for_utf8_output(p_remote_checksum) if p_remote_checksum.returncode != 0: raise LtacpException('ltacp %s: Error in remote md5 checksum computation: %s' % (self.logId, output_remote_checksum[1])) logger.debug('ltacp %s: remote md5 checksum computation finished.' % self.logId) waitForSubprocess(p_md5a32bc, timedelta(seconds=60), 'local computation of md5 adler32 and byte_count') - output_md5a32bc_local = p_md5a32bc.communicate() + output_md5a32bc_local = wait_for_utf8_output(p_md5a32bc) if p_md5a32bc.returncode != 0: raise LtacpException('ltacp %s: Error while computing md5 adler32 and byte_count: %s' % (self.logId, output_md5a32bc_local[1])) logger.debug('ltacp %s: computed local md5 adler32 and byte_count.' % self.logId) @@ -584,7 +599,7 @@ class LtaCp: time.sleep(0.5) if p_listen.poll() is not None: # nc returned prematurely, pick another port to listen to - o, e = p_listen.communicate() + o, e = wait_for_utf8_output(p_listen) logger.info('ltacp %s: nc returned prematurely: %s' % (self.logId, e.strip())) port = str(random.randint(49152, 65535)) else: diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py index 51f8056faea529be29d1b3d602ba66b70fb174cb..94b1b53f0fc12a8000cbf6a31acd38683d090b81 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/lib/momclient.py @@ -16,7 +16,7 @@ try: import mechanize except ImportError as e: print(e) - print("please install python 'mechanize' package: sudo pip install mechanize") + print("please install python 'mechanize' package: sudo pip3 install mechanize") print() exit(1) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/ltastubs.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/ltastubs.py index ea9f8b7c5829ab0a9eb0da5767af51050be3cb60..a4e7c6fdf7e1660a67083876798d087159e96bfd 100644 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/ltastubs.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/ltastubs.py @@ -51,7 +51,7 @@ def stub(): #determine filesize and a32 checksum from localy stored 'globus' file with open(_local_globus_file_path) as file: p = subprocess.Popen(['md5a32bc', '/dev/null'], stdin=file, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - o, e = p.communicate() + o, e = tuple(x.decode('ascii') for x in p.communicate()) items = e.strip().split() a32cs = items[1].strip().zfill(8) diff --git a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py index 727241bfc406cb735e55e753487290eb6c091411..bf6a702c12338803af8ee27e6a8b0e4ddf878986 100755 --- a/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py +++ b/LTA/LTAIngest/LTAIngestServer/LTAIngestTransferServer/test/t_ingestpipeline.py @@ -3,316 +3,288 @@ import logging import unittest import uuid -import os, os.path -import subprocess - -try: - from qpid.messaging.exceptions import * - from qpid.messaging import Connection - from qpidtoollibs import BrokerAgent -except ImportError: - print('Cannot run test without qpid tools') - print('Please source qpid profile') - exit(3) +import os.path +import shutil +from unittest.mock import patch -try: - from mock import MagicMock - from mock import patch -except ImportError: - print('Cannot run test without python MagicMock') - print('Please install MagicMock: pip install mock') - exit(3) +logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) +logger = logging.getLogger(__name__) from subprocess import call if call(['ssh', '-o', 'PasswordAuthentication=no', '-o', 'PubkeyAuthentication=yes', '-o', 'ConnectTimeout=1', 'localhost', 'true']) != 0: print('this test depends on keybased ssh login to localhost, which is not setup correctly. skipping test...') exit(3) - -connection = None -broker = None +from lofar.messaging.messagebus import TemporaryQueue testname = 't_ingestpipeline_%s' % uuid.uuid1() -#overwrite some defaults in the config to run this as an isolated test -import lofar.lta.ingest.common.config as config -config.DEFAULT_INGEST_NOTIFICATION_BUSNAME = '%s.%s' % (config.DEFAULT_INGEST_NOTIFICATION_BUSNAME, testname) - -try: - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO) - logger = logging.getLogger(__name__) - - # setup broker connection - connection = Connection.establish('127.0.0.1') - broker = BrokerAgent(connection) - - # add test exchanges/queues - logger.info('adding test exchange to broker: %s', config.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - broker.addExchange('topic', config.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - - # patch (mock) the LTAClient class during these tests. - # when the ingestpipeline instantiates an LTAClient it will get the mocked class. - with patch('lofar.lta.ingest.server.ltaclient.LTAClient', autospec=True) as MockLTAClient: - ltamock = MockLTAClient.return_value - - # patch (mock) the MoMClient class during these tests. - # when the ingestpipeline instantiates an MoMClient it will get the mocked class. - with patch('lofar.lta.ingest.server.momclient.MoMClient', autospec=True) as MockMoMClient: - mommock = MockMoMClient.return_value - # modify the return values of the various MoMClient methods with pre-cooked answers - mommock.setStatus.return_value = True - - # patch (mock) the convert_surl_to_turl method during these tests. - with patch('lofar.lta.ingest.server.ltacp.convert_surl_to_turl') as mock_convert_surl_to_turl: - mock_convert_surl_to_turl.side_effect = lambda surl: surl.replace('srm', 'gsiftp') - - from lofar.lta.ingest.common.job import createJobXml, parseJobXml - from lofar.lta.ingest.server.ltaclient import LTAClient # <-- thanks to magick mock, we get the mocked ltaclient - from lofar.lta.ingest.server.momclient import MoMClient # <-- thanks to magick mock, we get the mocked momclient - from lofar.lta.ingest.server.ingestpipeline import * - import ltastubs - - logger = logging.getLogger() - - class TestIngestPipeline(unittest.TestCase): - def setUp(self): - ltastubs.stub() - self.ltaclient = LTAClient() - self.momclient = MoMClient() - - def tearDown(self): - ltastubs.un_stub() - - def test_single_file(self): - try: - project_name = 'test-project' - obs_id = 987654321 - dpname = 'L%s_SAP000_SB000_im.h5' % obs_id - test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1()) - - def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): - return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s' % (project, obs_id, dpname), - 'result': 'ok', - 'error': '', - 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} - ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket - - def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): - #return unpecified sip with proper details - from lofar.lta.ingest.server.unspecifiedSIP import makeSIP - return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') - mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP - - os.makedirs(test_dir_path) - test_file_path = os.path.join(test_dir_path, dpname) + +# patch (mock) the LTAClient class during these tests. +# when the ingestpipeline instantiates an LTAClient it will get the mocked class. +with patch('lofar.lta.ingest.server.ltaclient.LTAClient', autospec=True) as MockLTAClient: + ltamock = MockLTAClient.return_value + + # patch (mock) the MoMClient class during these tests. + # when the ingestpipeline instantiates an MoMClient it will get the mocked class. + with patch('lofar.lta.ingest.server.momclient.MoMClient', autospec=True) as MockMoMClient: + mommock = MockMoMClient.return_value + # modify the return values of the various MoMClient methods with pre-cooked answers + mommock.setStatus.return_value = True + + # patch (mock) the convert_surl_to_turl method during these tests. + with patch('lofar.lta.ingest.common.srm.convert_surl_to_turl') as mock_convert_surl_to_turl: + mock_convert_surl_to_turl.side_effect = lambda surl: surl.replace('srm', 'gsiftp') + + from lofar.lta.ingest.common.job import createJobXml, parseJobXml + from lofar.lta.ingest.server.ltaclient import LTAClient # <-- thanks to magick mock, we get the mocked ltaclient + from lofar.lta.ingest.server.momclient import MoMClient # <-- thanks to magick mock, we get the mocked momclient + from lofar.lta.ingest.server.ingestpipeline import * + import ltastubs + + class TestIngestPipeline(unittest.TestCase): + def setUp(self): + self.test_dir_path = None + + self.tmp_queue = TemporaryQueue(testname) + self.tmp_queue.open() + + ltastubs.stub() + self.ltaclient = LTAClient() + self.momclient = MoMClient() + + def tearDown(self): + ltastubs.un_stub() + self.tmp_queue.close() + + if self.test_dir_path and os.path.exists(self.test_dir_path): + logger.info("removing test dir: %s", self.test_dir_path) + shutil.rmtree(self.test_dir_path, True) + + def test_single_file(self): + try: + project_name = 'test-project' + obs_id = 987654321 + dpname = 'L%s_SAP000_SB000_im.h5' % obs_id + self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1()) + + def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): + return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s' % (project, obs_id, dpname), + 'result': 'ok', + 'error': '', + 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} + ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket + + def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): + #return unpecified sip with proper details + from lofar.lta.ingest.server.unspecifiedSIP import makeSIP + return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') + mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP + + os.makedirs(self.test_dir_path) + test_file_path = os.path.join(self.test_dir_path, dpname) + with open(test_file_path, 'w') as file: + file.write(4096*'a') + + job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_file_path) + logger.info('job xml: %s', job_xml) + job = parseJobXml(job_xml) + + pl = IngestPipeline(job, self.momclient, self.ltaclient, + notification_busname=self.tmp_queue.address) + pl.run() + + except Exception as e: + self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) + finally: + # the 'stub-transfered' file ended up in out local stub lta + # with the path: ltastubs._local_globus_file_path + #check extension + self.assertEqual(os.path.splitext(test_file_path)[-1], + os.path.splitext(ltastubs._local_globus_file_path)[-1]) + + #compare with original + with open(test_file_path) as input, open(ltastubs._local_globus_file_path) as output: + self.assertEqual(input.read(), output.read()) + + for f in os.listdir(self.test_dir_path): + os.remove(os.path.join(self.test_dir_path, f)) + os.removedirs(self.test_dir_path) + + def test_h5_plus_raw_file(self): + #beam formed h5 files are always accompanied by a raw file + #these should be tarred togheter + try: + project_name = 'test-project' + obs_id = 987654321 + dpname = 'L%s_SAP000_SB000_bf.h5' % obs_id + rawname = dpname.replace('.h5', '.raw') + self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1()) + + def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): + return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), + 'result': 'ok', + 'error': '', + 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} + ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket + + def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): + #return unpecified sip with proper details + from lofar.lta.ingest.server.unspecifiedSIP import makeSIP + return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') + mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP + + os.makedirs(self.test_dir_path) + test_file_path = os.path.join(self.test_dir_path, dpname) + with open(test_file_path, 'w') as file: + file.write(4096*'a') + raw_test_file_path = os.path.join(self.test_dir_path, dpname.replace('.h5', '.raw')) + with open(raw_test_file_path, 'w') as file: + file.write(4096*'b') + + job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_file_path) + logger.info('job xml: %s', job_xml) + job = parseJobXml(job_xml) + + pl = IngestPipeline(job, self.momclient, self.ltaclient, + notification_busname=self.tmp_queue.address) + pl.run() + + except Exception as e: + self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) + finally: + # the 'stub-transfered' file ended up in out local stub lta + # with the path: ltastubs._local_globus_file_path + #check extension + self.assertEqual('.tar', os.path.splitext(ltastubs._local_globus_file_path)[-1]) + + #check tar contents + tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + tar_file_list, err = tuple(x.decode('ascii') for x in tar.communicate()) + self.assertEqual(tar.returncode, 0) + logger.info('file list in tar:\n%s', tar_file_list) + + self.assertTrue(os.path.basename(test_file_path) in tar_file_list) + self.assertTrue(os.path.basename(raw_test_file_path) in tar_file_list) + logger.info('all expected source files are in tar!') + + os.remove(test_file_path) + os.remove(raw_test_file_path) + os.removedirs(self.test_dir_path) + + + def test_directory(self): + try: + project_name = 'test-project' + obs_id = 987654321 + dpname = 'L%s_SAP000_SB000_uv.MS' % obs_id + self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), dpname) + + def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): + return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), + 'result': 'ok', + 'error': '', + 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} + ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket + + def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): + #return unpecified sip with proper details + from lofar.lta.ingest.server.unspecifiedSIP import makeSIP + return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') + mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP + + os.makedirs(self.test_dir_path) + test_file_paths = [] + for i in range(10): + test_file_path = os.path.join(self.test_dir_path, 'testfile_%s.txt' % i) + test_file_paths.append(test_file_path) with open(test_file_path, 'w') as file: - file.write(4096*'a') - - job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_file_path) - logger.info('job xml: %s', job_xml) - job = parseJobXml(job_xml) - - pl = IngestPipeline(job, self.momclient, self.ltaclient) - pl.run() - - except Exception as e: - self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) - finally: - # the 'stub-transfered' file ended up in out local stub lta - # with the path: ltastubs._local_globus_file_path - #check extension - self.assertEqual(os.path.splitext(test_file_path)[-1], - os.path.splitext(ltastubs._local_globus_file_path)[-1]) - - #compare with original - with open(test_file_path) as input, open(ltastubs._local_globus_file_path) as output: - self.assertEqual(input.read(), output.read()) - - for f in os.listdir(test_dir_path): - os.remove(os.path.join(test_dir_path, f)) - os.removedirs(test_dir_path) - - def test_h5_plus_raw_file(self): - #beam formed h5 files are always accompanied by a raw file - #these should be tarred togheter - try: - project_name = 'test-project' - obs_id = 987654321 - dpname = 'L%s_SAP000_SB000_bf.h5' % obs_id - rawname = dpname.replace('.h5', '.raw') - test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1()) - - def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): - return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), - 'result': 'ok', - 'error': '', - 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} - ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket - - def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): - #return unpecified sip with proper details - from lofar.lta.ingest.server.unspecifiedSIP import makeSIP - return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') - mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP - - os.makedirs(test_dir_path) - test_file_path = os.path.join(test_dir_path, dpname) + file.write(1000*'a') + + job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % self.test_dir_path) + logger.info('job xml: %s', job_xml) + job = parseJobXml(job_xml) + + pl = IngestPipeline(job, self.momclient, self.ltaclient, + notification_busname=self.tmp_queue.address) + pl.run() + except Exception as e: + self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) + finally: + # the 'stub-transfered' file ended up in out local stub lta + # with the path: ltastubs._local_globus_file_path + #check extension + self.assertTrue('.tar' == os.path.splitext(ltastubs._local_globus_file_path)[-1]) + + #check tar contents + tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + tar_file_list, err = tuple(x.decode('ascii') for x in tar.communicate()) + self.assertEqual(tar.returncode, 0) + logger.info('file list in tar:\n%s', tar_file_list) + + for test_file_path in test_file_paths: + self.assertTrue(os.path.basename(test_file_path) in tar_file_list) + logger.info('all expected source files are in tar!') + + for f in os.listdir(self.test_dir_path): + os.remove(os.path.join(self.test_dir_path, f)) + os.removedirs(self.test_dir_path) + + def test_directory_with_odd_dataproduct_name(self): + #sometimes somebody has data in a odd directory + #and gives the dataproduct a different name than it's directory + try: + project_name = 'test-project' + obs_id = 987654321 + dpname = 'my_funky_dp_name' + self.test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), 'my_data_dir') + + def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): + #return unpecified sip with proper details + from lofar.lta.ingest.server.unspecifiedSIP import makeSIP + return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') + mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP + + os.makedirs(self.test_dir_path) + test_file_paths = [] + for i in range(10): + test_file_path = os.path.join(self.test_dir_path, 'testfile_%s.txt' % i) + test_file_paths.append(test_file_path) with open(test_file_path, 'w') as file: - file.write(4096*'a') - raw_test_file_path = os.path.join(test_dir_path, dpname.replace('.h5', '.raw')) - with open(raw_test_file_path, 'w') as file: - file.write(4096*'b') - - job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_file_path) - logger.info('job xml: %s', job_xml) - job = parseJobXml(job_xml) - - pl = IngestPipeline(job, self.momclient, self.ltaclient) - pl.run() - - except Exception as e: - self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) - finally: - # the 'stub-transfered' file ended up in out local stub lta - # with the path: ltastubs._local_globus_file_path - #check extension - self.assertEqual('.tar', os.path.splitext(ltastubs._local_globus_file_path)[-1]) - - #check tar contents - tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE) - tar_file_list, err = tar.communicate() - self.assertEqual(tar.returncode, 0) - logger.info('file list in tar:\n%s', tar_file_list) - + file.write(1000*'a') + + job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % self.test_dir_path) + logger.info('job xml: %s', job_xml) + job = parseJobXml(job_xml) + + pl = IngestPipeline(job, self.momclient, self.ltaclient, + notification_busname=self.tmp_queue.address) + pl.run() + except Exception as e: + self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) + finally: + # the 'stub-transfered' file ended up in out local stub lta + # with the path: ltastubs._local_globus_file_path + #check extension + self.assertTrue('.tar' == os.path.splitext(ltastubs._local_globus_file_path)[-1]) + + #check tar contents + tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + tar_file_list, err = tuple(x.decode('ascii') for x in tar.communicate()) + self.assertEqual(tar.returncode, 0) + logger.info('file list in tar:\n%s', tar_file_list) + + for test_file_path in test_file_paths: self.assertTrue(os.path.basename(test_file_path) in tar_file_list) - self.assertTrue(os.path.basename(raw_test_file_path) in tar_file_list) - logger.info('all expected source files are in tar!') - - os.remove(test_file_path) - os.remove(raw_test_file_path) - os.removedirs(test_dir_path) - - - def test_directory(self): - try: - project_name = 'test-project' - obs_id = 987654321 - dpname = 'L%s_SAP000_SB000_uv.MS' % obs_id - test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), dpname) - - def stub_GetStorageTicket(project, filename, filesize, archive_id, job_id, obs_id, check_mom_id=True, id_source='MoM'): - return { 'primary_uri_rnd': 'srm://some.site.name:8443/some/path/data/lofar/ops/projects/%s/%s/%s.tar' % (project, obs_id, dpname), - 'result': 'ok', - 'error': '', - 'ticket': '3E0A47ED860D6339E053B316A9C3BEE2'} - ltamock.GetStorageTicket.side_effect = stub_GetStorageTicket - - def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): - #return unpecified sip with proper details - from lofar.lta.ingest.server.unspecifiedSIP import makeSIP - return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') - mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP - - os.makedirs(test_dir_path) - test_file_paths = [] - for i in range(10): - test_file_path = os.path.join(test_dir_path, 'testfile_%s.txt' % i) - test_file_paths.append(test_file_path) - with open(test_file_path, 'w') as file: - file.write(1000*'a') - - job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_dir_path) - logger.info('job xml: %s', job_xml) - job = parseJobXml(job_xml) - - pl = IngestPipeline(job, self.momclient, self.ltaclient) - pl.run() - except Exception as e: - self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) - finally: - # the 'stub-transfered' file ended up in out local stub lta - # with the path: ltastubs._local_globus_file_path - #check extension - self.assertTrue('.tar' == os.path.splitext(ltastubs._local_globus_file_path)[-1]) - - #check tar contents - tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE) - tar_file_list, err = tar.communicate() - self.assertEqual(tar.returncode, 0) - logger.info('file list in tar:\n%s', tar_file_list) - - for test_file_path in test_file_paths: - self.assertTrue(os.path.basename(test_file_path) in tar_file_list) - logger.info('all expected source files are in tar!') - - for f in os.listdir(test_dir_path): - os.remove(os.path.join(test_dir_path, f)) - os.removedirs(test_dir_path) - - def test_directory_with_odd_dataproduct_name(self): - #sometimes somebody has data in a odd directory - #and gives the dataproduct a different name than it's directory - try: - project_name = 'test-project' - obs_id = 987654321 - dpname = 'my_funky_dp_name' - test_dir_path = os.path.join(os.getcwd(), 'testdir_%s' % uuid.uuid1(), 'my_data_dir') - - def stub_uploadDataAndGetSIP(archive_id, storage_ticket, filename, uri, filesize, md5_checksum, adler32_checksum, validate=True): - #return unpecified sip with proper details - from lofar.lta.ingest.server.unspecifiedSIP import makeSIP - return makeSIP(project_name, obs_id, archive_id, storage_ticket, filename, filesize, md5_checksum, adler32_checksum, 'TEST') - mommock.uploadDataAndGetSIP.side_effect = stub_uploadDataAndGetSIP - - os.makedirs(test_dir_path) - test_file_paths = [] - for i in range(10): - test_file_path = os.path.join(test_dir_path, 'testfile_%s.txt' % i) - test_file_paths.append(test_file_path) - with open(test_file_path, 'w') as file: - file.write(1000*'a') - - job_xml = createJobXml(testname, 123456789, obs_id, dpname, 918273645, 'localhost:%s' % test_dir_path) - logger.info('job xml: %s', job_xml) - job = parseJobXml(job_xml) - - pl = IngestPipeline(job, self.momclient, self.ltaclient) - pl.run() - except Exception as e: - self.assertTrue(False, 'Unexpected exception in pipeline: %s' % e) - finally: - # the 'stub-transfered' file ended up in out local stub lta - # with the path: ltastubs._local_globus_file_path - #check extension - self.assertTrue('.tar' == os.path.splitext(ltastubs._local_globus_file_path)[-1]) - - #check tar contents - tar = subprocess.Popen(['tar', '--list', '-f', ltastubs._local_globus_file_path], stdout=subprocess.PIPE) - tar_file_list, err = tar.communicate() - self.assertEqual(tar.returncode, 0) - logger.info('file list in tar:\n%s', tar_file_list) - - for test_file_path in test_file_paths: - self.assertTrue(os.path.basename(test_file_path) in tar_file_list) - logger.info('all expected source files are in tar!') - - for f in os.listdir(test_dir_path): - os.remove(os.path.join(test_dir_path, f)) - os.removedirs(test_dir_path) - - - if __name__ == '__main__': - logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', - level=logging.DEBUG) - unittest.main() - -except ConnectError as ce: - logger.error(ce) - exit(3) -finally: - # cleanup test bus and exit - if broker: - logger.info('removing test exchange from broker: %s', config.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - broker.delExchange(config.DEFAULT_INGEST_NOTIFICATION_BUSNAME) - if connection: - connection.close() + logger.info('all expected source files are in tar!') + + for f in os.listdir(self.test_dir_path): + os.remove(os.path.join(self.test_dir_path, f)) + os.removedirs(self.test_dir_path) + + + if __name__ == '__main__': + logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', + level=logging.DEBUG) + unittest.main() +