Skip to content
Snippets Groups Projects
Commit 24f5a5d8 authored by Ruud Beukema's avatar Ruud Beukema
Browse files

Task #10107: work in progress, just to test LOFAR account settings

parent 4ee0c3c3
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
"""
Unit tests for the resource assignment estimator service.

Fixed: the module used to start with an unconditional
``print "TODO: fix this test"`` / ``exit(3)`` stub which disabled every test
in this file; that dead guard is removed. The qpid broker setup belongs to
the legacy RPC-based test (kept, commented out, further down in this file)
and is therefore not executed at import time anymore.
"""
import unittest
import uuid
import datetime
import logging

from lofar.messaging import RPC, RPCException
from lofar.sas.resourceassignment.resourceassignmentestimator.service import ResourceEstimatorHandler
from lofar.sas.resourceassignment.resourceassignmentestimator.service import createService
from lofar.sas.resourceassignment.resourceassignmentestimator.test.testset import TestSet

logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class Test1(unittest.TestCase):
'''Test'''
def test(self):
'''basic test '''
self.maxDiff = None
ts = TestSet()
# test observation
ts.add_observation()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
result = rpc(ts.test_dict() )
self.assertEqual(result[0], ts.valid_dict())
# test add beams
ts.add_observation_beams()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
result = rpc( ts.test_dict() )
self.assertEqual(result[0], ts.valid_dict())
# test add flys_eye
ts.enabble_flys_eye()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
result = rpc( ts.test_dict() )
self.assertEqual(result[0], ts.valid_dict())
# test add coherent_stokes class TestEstimations(unittest.TestCase):
ts.enable_observations_coherent_stokes() """
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc: Collection of tests for verifying if the uut meets the estimation requirements
result = rpc( ts.test_dict() ) """
self.assertEqual(result[0], ts.valid_dict()) def setUp(self):
self.uut = ResourceEstimatorHandler()
# test add incoherent_stokes self.specification_tree = {
ts.enable_observations_incoherent_stokes() 'otdb_id': 0,
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc: 'task_type': '',
result = rpc( ts.test_dict() ) 'task_subtype': '',
self.assertEqual(result[0], ts.valid_dict()) 'predecessors': []
'specification': {}
# test add calibration_pipeline }
ts.enable_calibration_pipeline()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc: def test_get_subtree_estimate_for_observation_(self):
result = rpc( ts.test_dict() ) pass
self.assertEqual(result[0], ts.valid_dict())
def test_get_subtree_estimate_for_calibration_pipeline(self):
# test add longbaseline_pipeline pass
ts.enable_longbaseline_pipeline()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc: def test_get_subtree_estimate_for_image_pipeline(self):
result = rpc( ts.test_dict() ) pass
self.assertEqual(result[0], ts.valid_dict())
def test_get_subtree_estimate_for_longbaseline_pipeline(self):
# test add pulsar_pipeline pass
ts.enable_pulsar_pipeline()
with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc: def test_get_subtree_estimate_for_pulsar_pipeline(self):
result = rpc( ts.test_dict() ) pass
self.assertEqual(result[0], ts.valid_dict())
class TestServices(unittest.TestCase):
    """
    Collection of tests for verifying if the uut meets the service requirements (in other words: how it communicates to
    the outside world).
    """

    def setUp(self):
        # nothing to prepare yet -- placeholder until the service tests are written
        pass

    def test_rpc_interface(self):
        # placeholder -- will exercise the RPC interface of the service
        pass
# try:
# from qpid.messaging import Connection
# from qpidtoollibs import BrokerAgent
# except ImportError:
# print 'Cannot run test without qpid tools'
# print 'Please source qpid profile'
# exit(3)
#
# try:
# # setup broker connection
# connection = Connection.establish('127.0.0.1')
# broker = BrokerAgent(connection)
#
# # add test service busname
# busname = 'test-lofarbus-raestimator-%s' % (uuid.uuid1())
# broker.addExchange('topic', busname)
#
# class TestRAEstimator(unittest.TestCase):
# '''Test'''
#
# def test(self):
# '''basic test '''
# self.maxDiff = None
# ts = TestSet()
#
# # test observation
# ts.add_observation()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc(ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add beams
# ts.add_observation_beams()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add flys_eye
# ts.enable_flys_eye()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add coherent_stokes
# ts.enable_observations_coherent_stokes()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add incoherent_stokes
# ts.enable_observations_incoherent_stokes()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add calibration_pipeline
# ts.enable_calibration_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add longbaseline_pipeline
# ts.enable_longbaseline_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add pulsar_pipeline
# ts.enable_pulsar_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add image_pipeline
# ts.enable_image_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # create and run the service
# with createService(busname=busname):
# # and run all tests
# unittest.main()
#
# finally:
# # cleanup test bus and exit
# broker.delExchange(busname)
# connection.close()
# Fixed: a dangling `finally:` block (broker exchange cleanup) was left here
# without a matching `try:`, which is a syntax error. That cleanup belongs to
# the legacy qpid/RPC test that is kept commented out above; the plain
# unittest entry point is all that is needed now.
if __name__ == '__main__':
    unittest.main()
""" Test parameterset for resource estimator """

from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators.parameterset import ParameterSet
class TestSet(object):
    """
    Builds matching pairs of parametersets for the resource estimator tests.

    ``check_set`` holds the observation/pipeline specification that is fed to
    the estimator, ``valid_set`` holds the expected estimation result to
    compare against. The ``add_*`` / ``enable_*`` methods extend both sets so
    a test can grow a scenario incrementally.

    Fixed in this revision: every ``enable_*`` method (and
    ``add_observation_beams``) accidentally called
    ``self.check_set.import_string(checkset)`` twice; the duplicate calls are
    removed so all methods import each set exactly once, as
    ``add_observation`` already did.
    """

    def __init__(self):
        self.check_set = ParameterSet()  # specification fed to the estimator
        self.valid_set = ParameterSet()  # expected estimator output

    def clear(self):
        """Empty both the specification and the expected-result set."""
        self.check_set.clear()
        self.valid_set.clear()

    def test_dict(self):
        """Return the specification parameterset as a dict."""
        return self.check_set.get_set()

    def valid_dict(self):
        """Return the expected-result parameterset as a dict."""
        return self.valid_set.get_set()

    # for observation
    def add_observation(self):
        """Add a basic observation specification plus its expected estimates."""
        # NOTE(review): 'intergration_time' below looks like a typo of
        # 'integration_time', but the estimator may rely on this exact key --
        # confirm against the estimator code before renaming it.
        checkset = """
observation.sample_clock= 200
observation.duration=3600
observation.channels_per_subband= 61
observation.intergration_time= 1
observation.antenna_mode= HBA_DUAL
observation.stations= [CS001, CS002, RS307, RS509]
observation.flys_eye.enabled= false
observation.nr_beams= 0
# coherent_stokes.type can be: DATA_TYPE_XXYY or DATA_TYPE_STOKES_IQUV
observation.output.coherent_stokes.enabled= false
observation.output.coherent_stokes.type=
observation.output.coherent_stokes.integration_factor=
# incoherent_stokes.type can be: DATA_TYPE_STOKES_IQUV
observation.output.incoherent_stokes.enabled= false
observation.output.incoherent_stokes.type=
# for calibration-pipeline
dp.output.correlated.enabled= false
dp.output.correlated.demixing_settings.freq_step=
dp.output.correlated.demixing_settings.time_step=
dp.output.instrument_model.enabled= false
# for longbaseline-pipeline
dp.output.longbaseline.enabled= false
dp.output.longbaseline.subband_groups_per_ms=
dp.output.longbaseline.subbands_per_subband_group=
# for pulsar-pipeline
dp.output.pulsar.enabled= false
# for image-pipeline
dp.output.skyimage.enabled= false
dp.output.skyimage.slices_per_image=
dp.output.skyimage.subbands_per_image=
"""
        self.check_set.import_string(checkset)

        validset = """
observation.total_data_size=
observation.total_bandwidth=
observation.output_files.dp_correlated_uv.nr_files=
observation.output_files.dp_correlated_uv.file_size=
observation.output_files.dp_coherent_stokes.nr_files=
observation.output_files.dp_coherent_stokes.file_size=
observation.output_files.dp_incoherent_stokes.nr_files=
observation.output_files.dp_incoherent_stokes.file_size=
"""
        self.valid_set.import_string(validset)

    def add_observation_beams(self):
        """Add two beams (each with two coherent tied-array beams)."""
        checkset = """
observation.nr_beams= 2
observation.beam[0].nr_subbands= 400
observation.beam[0].nr_tab_rings= 4
observation.beam[0].tied_array_beam[0].coherent= true
observation.beam[0].tied_array_beam[1].coherent= true
observation.beam[1].nr_subbands= 400
observation.beam[1].nr_tab_rings= 4
observation.beam[1].tied_array_beam[0].coherent= true
observation.beam[1].tied_array_beam[1].coherent= true
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
observation.total_data_size=
observation.total_bandwidth=
observation.output_files.dp_correlated_uv.nr_files=
observation.output_files.dp_correlated_uv.file_size=
observation.output_files.dp_coherent_stokes.nr_files=
observation.output_files.dp_coherent_stokes.file_size=
observation.output_files.dp_incoherent_stokes.nr_files=
observation.output_files.dp_incoherent_stokes.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_observations_coherent_stokes(self):
        """Enable coherent-stokes output for the observation."""
        checkset = """
# coherent_stokes.type can be: DATA_TYPE_XXYY or DATA_TYPE_STOKES_IQUV
observation.output.coherent_stokes.enabled= true
observation.output.coherent_stokes.type= DATA_TYPE_XXYY
observation.output.coherent_stokes.integration_factor= 1
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
observation.total_data_size=
observation.total_bandwidth=
observation.output_files.dp_correlated_uv.nr_files=
observation.output_files.dp_correlated_uv.file_size=
observation.output_files.dp_coherent_stokes.nr_files=
observation.output_files.dp_coherent_stokes.file_size=
observation.output_files.dp_incoherent_stokes.nr_files=
observation.output_files.dp_incoherent_stokes.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_observations_incoherent_stokes(self):
        """Enable incoherent-stokes output for the observation."""
        checkset = """
# incoherent_stokes.type can be: DATA_TYPE_STOKES_IQUV
observation.output.incoherent_stokes.enabled= true
observation.output.incoherent_stokes.type= DATA_TYPE_STOKES_IQUV
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
observation.total_data_size=
observation.total_bandwidth=
observation.output_files.dp_correlated_uv.nr_files=
observation.output_files.dp_correlated_uv.file_size=
observation.output_files.dp_coherent_stokes.nr_files=
observation.output_files.dp_coherent_stokes.file_size=
observation.output_files.dp_incoherent_stokes.nr_files=
observation.output_files.dp_incoherent_stokes.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_flys_eye(self):
        """Enable fly's eye mode for the observation."""
        checkset = """
observation.flys_eye.enabled= true
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
observation.total_data_size=
observation.total_bandwidth=
observation.output_files.dp_correlated_uv.nr_files=
observation.output_files.dp_correlated_uv.file_size=
observation.output_files.dp_coherent_stokes.nr_files=
observation.output_files.dp_coherent_stokes.file_size=
observation.output_files.dp_incoherent_stokes.nr_files=
observation.output_files.dp_incoherent_stokes.file_size=
"""
        self.valid_set.import_string(validset)

    # for all pipelines
    def enable_calibration_pipeline(self):
        """Enable the calibration pipeline (correlated + instrument model)."""
        checkset = """
# for calibration-pipeline
dp.output.correlated.enabled= true
dp.output.correlated.demixing_settings.freq_step= 60
dp.output.correlated.demixing_settings.time_step= 10
dp.output.instrument_model.enabled= true
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
calibration_pipeline.total_data_size=
calibration_pipeline.total_bandwidth=
calibration_pipeline.dp_correlated_uv.nr_files=
calibration_pipeline.dp_correlated_uv.file_size=
calibration_pipeline.dp_instrument_model.nr_files=
calibration_pipeline.dp_instrument_model.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_longbaseline_pipeline(self):
        """Enable the long-baseline pipeline."""
        checkset = """
# for -pipeline
dp.output.longbaseline.enabled= true
dp.output.longbaseline.subband_groups_per_ms= 1
dp.output.longbaseline.subbands_per_subband_group= 1
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
longbaseline_pipeline.total_data_size=
longbaseline_pipeline.total_bandwidth=
longbaseline_pipeline.dp_correlated_uv.nr_files=
longbaseline_pipeline.dp_correlated_uv.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_pulsar_pipeline(self):
        """Enable the pulsar pipeline."""
        checkset = """
# for pulsar-pipeline
dp.output.pulsar.enabled= true
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
pulsar_pipeline.total_data_size=
pulsar_pipeline.total_bandwidth=
pulsar_pipeline.dp_pulsar.nr_files=
pulsar_pipeline.dp_pulsar.file_size=
"""
        self.valid_set.import_string(validset)

    def enable_image_pipeline(self):
        """Enable the imaging pipeline."""
        checkset = """
# for image-pipeline
dp.output.skyimage.enabled= true
dp.output.skyimage.slices_per_image= 1
dp.output.skyimage.subbands_per_image= 2
"""
        self.check_set.import_string(checkset)  # fixed: was imported twice

        validset = """
image_pipeline.total_data_size=
image_pipeline.total_bandwidth=
image_pipeline.dp_sky_image.nr_files=
image_pipeline.dp_sky_image.file_size=
"""
        self.valid_set.import_string(validset)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment