Skip to content
Snippets Groups Projects
Commit 11631e1b authored by Ruud Beukema's avatar Ruud Beukema
Browse files

Task #10107: Added last of the testcases and cleaned up code

parent d7fae7f1
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
import os
import unittest
import uuid
import datetime
from pprint import pprint
import logging
from lofar.messaging import RPC, RPCException
from lofar.sas.resourceassignment.resourceassignmentestimator.resource_estimators import *
from lofar.sas.resourceassignment.resourceassignmentestimator.service import ResourceEstimatorHandler
from lofar.sas.resourceassignment.resourceassignmentestimator.service import createService
from lofar.sas.resourceassignment.resourceassignmentestimator.test.testset import TestSet
logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s', level=logging.INFO)
logger = logging.getLogger(__name__)
class TestEstimations(unittest.TestCase):
# Set to True to (re-)generate golden outputs for all data sets in ./data_sets that start with
# "t_resource_estimator.in_"
# ----------------------------------------------------------------------------------------------------------------------
DO_GENERATE_GOLDEN_OUTPUTS = False
class TestEstimationsAgainstGoldenOutput(unittest.TestCase):
"""
Collection of tests for verifying if the uut meets the estimation requirements
Collection of tests to verify that the uut meets the estimation requirements.
Verifications are performed against (pre-)generated golden outputs. Please change DO_GENERATE_GOLDEN_OUTPUTS to True
in order to (re-)generate golden outputs from input parsets - stored in ./data_sets and with filenames starting with
"t_resource_estimator.in_"
"""
def setUp(self):
self.uut = ResourceEstimatorHandler()
self.specification_tree = {
'otdb_id': 0,
'task_type': '',
'task_subtype': '',
'predecessors': []
'specification': {}
""" Actions to be performed before executing each test """
self.unique_otdb_id = 0
self.data_sets_dir = os.path.join(os.environ.get('srcdir', os.path.dirname(os.path.abspath(__file__))),
"data_sets")
# ------------------------------------------------------------------------------------------------------------------
# Test estimation for observations
def test_estimate_for_beam_observation(self):
""" Verify estimation for a beam observation specification tree against the golden output. """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_beam_observation')
golden_output_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.out_beam_observation')
task_type = 'observation'
task_subtype = 'bfmeasurement'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
def test_estimate_for_interferometer_observation(self):
""" Verify estimation for a interferometer observation specification tree against the golden output. """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_interferometer_observation')
golden_output_filepath = os.path.join(self.data_sets_dir,
't_resource_estimator.out_interferometer_observation')
task_type = 'observation'
task_subtype = 'interferometer'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
# ------------------------------------------------------------------------------------------------------------------
# Test estimation for pipelines
def test_estimate_for_pulsar_pipeline(self):
""" Verify estimation for a pulsar pipeline specification tree agains the golden output. Pulsar pipelines need a
beamformer observation as their predecessor. """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_pulsar_pipeline')
golden_output_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.out_pulsar_observation')
task_type = 'pipeline'
task_subtype = 'pulsar pipeline'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
# Pulsar pipelines need a beamformer observation as their predecessor
predecessor_data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_beam_observation')
predecessor_task_type = 'observation'
predecessor_task_subtype = 'bfmeasurement'
self.add_predecessor_to_specification_tree(predecessor_data_set_filepath,
predecessor_task_type,
predecessor_task_subtype,
specification_tree['predecessors'])
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
def test_estimate_for_preprocessing_pipeline(self):
""" Verify estimation for a preprocessing pipeline specification tree against the golden output. Preprocessing
pipelines need an interferometer observation as their predecessor. """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_preprocessing_pipeline')
golden_output_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.out_preprocessing_pipeline')
task_type = 'pipeline'
task_subtype = 'averaging pipeline'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
# Pipelines need a predecessor so give it one
predecessor_data_set_filepath = os.path.join(self.data_sets_dir,
't_resource_estimator.in_interferometer_observation')
predecessor_task_type = 'observation'
predecessor_task_subtype = 'interferometer'
self.add_predecessor_to_specification_tree(predecessor_data_set_filepath,
predecessor_task_type,
predecessor_task_subtype,
specification_tree['predecessors'])
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
def test_estimate_for_calibration_pipeline(self):
""" Verify estimation for a calibration pipeline specification tree against the golden output """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_calibration_pipeline')
golden_output_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.out_calibration_pipeline')
task_type = 'pipeline'
task_subtype = 'calibration pipeline'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
self.add_predecessor_to_specification_tree(os.path.join(self.data_sets_dir,
't_resource_estimator.in_calibration_pipeline_predecessor_558022'),
'observation',
'bfmeasurement',
specification_tree['predecessors'])
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
def test_estimate_for_long_baseline_pipeline(self):
""" Verify estimation for a long baseline pipeline specification tree against the golden output """
# Arrange
data_set_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.in_long_baseline_pipeline')
golden_output_filepath = os.path.join(self.data_sets_dir, 't_resource_estimator.out_long_baseline_pipeline')
task_type = 'pipeline'
task_subtype = 'long baseline pipeline'
specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
tree = specification_tree
predecessor_tree = self.get_specification_tree(
os.path.join(
self.data_sets_dir,
't_resource_estimator.in_long_baseline_pipeline_predecessor_556601'),
'pipeline',
'averaging pipeline')
tree['predecessors'].append(predecessor_tree)
tree = predecessor_tree
predecessor_tree = self.get_specification_tree(
os.path.join(
self.data_sets_dir,
't_resource_estimator.in_long_baseline_pipeline_predecessor_556601_556429'),
'pipeline',
'calibration pipeline')
tree['predecessors'].append(predecessor_tree)
tree = predecessor_tree
predecessor_tree_branch_a = self.get_specification_tree(
os.path.join(
self.data_sets_dir,
't_resource_estimator.in_long_baseline_pipeline_predecessor_556601_556429_556373'),
'pipeline',
'calibration pipeline')
tree['predecessors'].append(predecessor_tree_branch_a)
predecessor_tree_branch_b = self.get_specification_tree(
os.path.join(
self.data_sets_dir,
't_resource_estimator.in_long_baseline_pipeline_predecessor_556601_556429_556375'),
'observation',
'bfmeasurement')
tree['predecessors'].append(predecessor_tree_branch_b)
predecessor_tree = self.get_specification_tree(
os.path.join(
self.data_sets_dir,
't_resource_estimator.in_long_baseline_pipeline_predecessor_556601_556429_xxxxxx_556371'),
'observation',
'bfmeasurement')
predecessor_tree_branch_a['predecessors'].append(predecessor_tree)
predecessor_tree_branch_b['predecessors'].append(predecessor_tree)
uut = ResourceEstimatorHandler()
golden_estimation = self.get_golden_estimate(golden_output_filepath,
uut._get_estimated_resources,
specification_tree)
# Act
uut = ResourceEstimatorHandler()
estimation = uut.handle_message({'specification_tree': specification_tree})
# Assert
error_messages = self.get_uut_errors(estimation)
self.assertEqual(len(error_messages), 0, "\nThe uut reported errors:\n" + '\n- '.join(error_messages))
self.assertEqual(self.get_datastructure_as_string(estimation), golden_estimation)
# ------------------------------------------------------------------------------------------------------------------
def get_datastructure_as_string(self, datastructure={}):
""" Get the string-representation of a data structure.
:param datastructure: The datastructure to stringify
:return: The datastructure in string format
"""
return repr(datastructure).strip()
def add_predecessor_to_specification_tree(self, data_set_filepath, task_type, task_subtype, predecessor_list):
""" Adds a predecessor specification tree to an existing specification tree
:param data_set_filepath: predecessor specification data set filepath
:param task_type: predecessor task's type
:param task_subtype: predecessor task's subtype
:param specification_tree: specification tree to with to add predecessor specification tree
"""
predecessor_specification_tree = self.get_specification_tree(data_set_filepath, task_type, task_subtype)
predecessor_list.append(predecessor_specification_tree)
def get_specification_tree(self, data_set_filepath, task_type, task_subtype):
""" Create a specification tree from a specification data set text file
:param data_set_filename: specification data set's filepath
:param task_type: the task's type
:param task_subtype: the task's subtype
:return: specification tree
"""
parset = eval(open(data_set_filepath).read().strip())
return {
'otdb_id': self.get_unique_otdb_id(),
'task_type': task_type,
'task_subtype': task_subtype,
'predecessors': [],
'specification': parset
}
def test_get_subtree_estimate_for_observation_(self):
pass
def get_unique_otdb_id(self):
""" Generates a unique OTDB ID (for use in parsets with predecessors)
def test_get_subtree_estimate_for_calibration_pipeline(self):
pass
:return: unique OTDB ID
"""
self.unique_otdb_id += 1
return self.unique_otdb_id
def test_get_subtree_estimate_for_image_pipeline(self):
pass
def get_uut_errors(self, uut_return_value):
""" Get error messages returned by uut
def test_get_subtree_estimate_for_longbaseline_pipeline(self):
pass
:param uut_return_value: The uut's return value (a dict with the error status per task type)
:return: A list of error messages in string format
"""
error_messages = []
for otdb_id in uut_return_value.keys():
for task_type in uut_return_value[otdb_id]:
message_list = uut_return_value[otdb_id][task_type].get('errors', [])
message_prefix = "otdb_id: %s, task_type: %s, message: " % (otdb_id, task_type)
for message in message_list:
error_messages.append(message_prefix + message)
def test_get_subtree_estimate_for_pulsar_pipeline(self):
pass
return error_messages
class TestServices(unittest.TestCase):
def get_golden_estimate(self, golden_output_filepath, estimator_function=None, *estimator_args):
""" Obtain the golden estimation from file (and create one if DO_GENERATE_GOLDEN_OUTPUTS is True)
:param golden_output_filepath: the path to the golden estimate output file
:param estimator_function: the estimator function to be called with estimator_args as it argument(S)
:param estimator_args: the estimator function's arguments
:return: the golden estimate
"""
Collection of tests for verifying if the uut meets the service requirements (in other words: how it communicates to
the outside world).
# Generate the golden output prior to fetching it if user requests so
if DO_GENERATE_GOLDEN_OUTPUTS:
estimation = estimator_function(*estimator_args)
error_messages = self.get_uut_errors(estimation)
# Make sure that there no errors are returned by uut
if len(error_messages) == 0:
self.store_datastructure(estimation, golden_output_filepath)
else:
raise Exception("\nThe uut reported errors:\n" + '\n- '.join(error_messages))
# Fetch the golden output
f = open(golden_output_filepath, "r")
golden_output = f.read()
f.close()
# Remove trailing newline, and trailing and heading double-quotes
stringified = golden_output.strip()
stringified = stringified.strip('\n')
stringified = stringified.strip('"')
return stringified
def store_datastructure(self, estimation={}, output_file=""):
""" Stores the estimation data structure such that it can be used as golden output to verify against.
:param estimation: resource estimator data structure
:param output_file: file name to store the estimation to
"""
def setUp(self):
pass
def test_rpc_interface(self):
pass
# try:
# from qpid.messaging import Connection
# from qpidtoollibs import BrokerAgent
# except ImportError:
# print 'Cannot run test without qpid tools'
# print 'Please source qpid profile'
# exit(3)
#
# try:
# # setup broker connection
# connection = Connection.establish('127.0.0.1')
# broker = BrokerAgent(connection)
#
# # add test service busname
# busname = 'test-lofarbus-raestimator-%s' % (uuid.uuid1())
# broker.addExchange('topic', busname)
#
# class TestRAEstimator(unittest.TestCase):
# '''Test'''
#
# def test(self):
# '''basic test '''
# self.maxDiff = None
# ts = TestSet()
#
# # test observation
# ts.add_observation()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc(ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add beams
# ts.add_observation_beams()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add flys_eye
# ts.enable_flys_eye()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add coherent_stokes
# ts.enable_observations_coherent_stokes()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add incoherent_stokes
# ts.enable_observations_incoherent_stokes()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add calibration_pipeline
# ts.enable_calibration_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add longbaseline_pipeline
# ts.enable_longbaseline_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add pulsar_pipeline
# ts.enable_pulsar_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # test add image_pipeline
# ts.enable_image_pipeline()
# with RPC('ResourceEstimation', busname=busname, timeout=3) as rpc:
# result = rpc( ts.test_dict() )
# self.assertEqual(result[0], ts.valid_dict())
#
# # create and run the service
# with createService(busname=busname):
# # and run all tests
# unittest.main()
#
# finally:
# # cleanup test bus and exit
# broker.delExchange(busname)
# connection.close()
output_filepath = os.path.join(self.data_sets_dir, output_file)
f = open(output_filepath, 'w+')
pprint(repr(estimation).strip(), stream=f)
f.close()
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment