Skip to content
Snippets Groups Projects
Select Git revision
  • 03b2a810ae418022c56cea807dfe7b099f1a2740
  • master default protected
  • L2SS-1914-fix_job_dispatch
  • TMSS-3170
  • TMSS-3167
  • TMSS-3161
  • TMSS-3158-Front-End-Only-Allow-Changing-Again
  • TMSS-3133
  • TMSS-3319-Fix-Templates
  • test-fix-deploy
  • TMSS-3134
  • TMSS-2872
  • defer-state
  • add-custom-monitoring-points
  • TMSS-3101-Front-End-Only
  • TMSS-984-choices
  • SDC-1400-Front-End-Only
  • TMSS-3079-PII
  • TMSS-2936
  • check-for-max-244-subbands
  • TMSS-2927---Front-End-Only-PXII
  • Before-Remove-TMSS
  • LOFAR-Release-4_4_318 protected
  • LOFAR-Release-4_4_317 protected
  • LOFAR-Release-4_4_316 protected
  • LOFAR-Release-4_4_315 protected
  • LOFAR-Release-4_4_314 protected
  • LOFAR-Release-4_4_313 protected
  • LOFAR-Release-4_4_312 protected
  • LOFAR-Release-4_4_311 protected
  • LOFAR-Release-4_4_310 protected
  • LOFAR-Release-4_4_309 protected
  • LOFAR-Release-4_4_308 protected
  • LOFAR-Release-4_4_307 protected
  • LOFAR-Release-4_4_306 protected
  • LOFAR-Release-4_4_304 protected
  • LOFAR-Release-4_4_303 protected
  • LOFAR-Release-4_4_302 protected
  • LOFAR-Release-4_4_301 protected
  • LOFAR-Release-4_4_300 protected
  • LOFAR-Release-4_4_299 protected
41 results

t_resourceassigner.py

Blame
    • Alexander van Amesfoort's avatar
      03b2a810
      Task #9893: merge trunk into this branch. · 03b2a810
      Alexander van Amesfoort authored
      Fix merge conflict on momqueryservice.py. Moved on trunk, but changed on this branch.
      Solve by replacing the moved file with the changed file (thus, move the changed file over the moved file).
      Careful checking of (JD's) changes shows this is correct; the changes are preserved.
      Other moved files did not cause conflicts, since they were not changed.
      03b2a810
      History
      Task #9893: merge trunk into this branch.
      Alexander van Amesfoort authored
      Fix merge conflict on momqueryservice.py. Moved on trunk, but changed on this branch.
      Solve by replacing the moved file with the changed file (thus, move the changed file over the moved file).
      Careful checking of (JD's) changes shows this is correct; the changes are preserved.
      Other moved files did not cause conflicts, since they were not changed.
    Code owners
    Assign users and groups as approvers for specific file changes. Learn more.
    t_resourceassigner.py 55.89 KiB
    #!/usr/bin/env python
    
    import unittest
    import mock
    import datetime
    
    from lofar.sas.resourceassignment.resourceassigner.assignment import ResourceAssigner
    from lofar.parameterset import parameterset
    
    ra_notification_prefix = "ra_notification_prefix"
    
    
    class TestingResourceAssigner(ResourceAssigner):
        """ResourceAssigner variant for unit tests: every collaborator is handed in by the caller."""

        def __init__(self, rarpc, rerpc, otdbrpc, momrpc, curpc, sqrpc, ra_notification_bus):
            """Store the supplied collaborators on the instance.

            The parent __init__ is deliberately NOT invoked: the mocks have to be in place
            as early as possible, because (per the original author) the RPCs otherwise
            block unit testing.
            """
            # notification side; the prefix comes from the module-level constant
            self.ra_notification_prefix = ra_notification_prefix
            self.ra_notification_bus = ra_notification_bus

            # rpc side; note that the 'rarpc' argument is stored as 'radbrpc'
            self.sqrpc = sqrpc
            self.curpc = curpc
            self.momrpc = momrpc
            self.otdbrpc = otdbrpc
            self.rerpc = rerpc
            self.radbrpc = rarpc
    
    
    class ResourceAssignerTest(unittest.TestCase):
        """Unit tests that drive a TestingResourceAssigner whose collaborators are all mocks (see setUp).

        The class attributes below are shared fixture data for the tests.
        """
        # identifiers and state used for the default specification tree (see reset_specification_tree)
        mom_id = 351557
        otdb_id = 1290494
        specification_id = 2323
        state = u'prescheduled'
        task_type = u'pipeline'

        # class-level placeholder; setUp replaces it per test via reset_specification_tree()
        specification_tree = {}

        # a state/otdb_id pair for a specification that is neither approved nor prescheduled
        non_approved_or_prescheduled_status = u'opened'
        non_approved_or_prescheduled_otdb_id = 1

        # start/stop times one and two hours after "now" (UTC), formatted as '%Y-%m-%d %H:%M:%S'.
        # NOTE: evaluated once, when the class body is executed, not per test.
        future_start_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=1)).strftime('%Y-%m-%d %H:%M:%S')
        future_stop_time = (datetime.datetime.utcnow() + datetime.timedelta(hours=2)).strftime('%Y-%m-%d %H:%M:%S')
    
        # minimal pipeline specification whose state is 'opened' (neither approved nor prescheduled)
        non_approved_or_prescheduled_specification_tree = {
            u'otdb_id': non_approved_or_prescheduled_otdb_id,
            u'task_type': u'pipeline',
            u'state': non_approved_or_prescheduled_status,
            u'specification': {
                u'Observation.startTime': future_start_time,
                u'Observation.stopTime': future_stop_time
            }
        }

        # minimal pipeline specification in state 'approved'
        approved_status = u'approved'
        approved_otdb_id = 22
        approved_specification_tree = {
            u'otdb_id': approved_otdb_id,
            u'task_type': u'pipeline',
            u'state': approved_status,
            u'specification': {
                u'Observation.startTime': future_start_time,
                u'Observation.stopTime': future_stop_time
            }
        }

        # prescheduled pipeline whose (enabled) pulsar output names 'CEP2' as storage cluster
        cep2_specification_tree = {
            u'otdb_id': otdb_id,
            u'task_type': u'pipeline',
            u'state': u'prescheduled',
            u'specification': {
                u'Observation.startTime': future_start_time,
                u'Observation.stopTime': future_stop_time,
                u'Observation.DataProducts.Output_Pulsar.enabled': True,
                u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP2'
            }
        }

        # prescheduled pipeline where the storage cluster ('CEP4') and the processing cluster ('CEP2') differ.
        # NOTE(review): presumably reproduces a known MoM inconsistency, going by the name - confirm
        # against the tests that use it (not visible in this part of the file).
        mom_bug_processing_cluster_name = 'CEP2'
        mom_bug_otbd_id = 1234
        mom_bug_specification_tree = {
            u'otdb_id': mom_bug_otbd_id,
            u'task_type': u'pipeline',
            u'state': u'prescheduled',
            u'specification': {
                u'Observation.startTime': future_start_time,
                u'Observation.stopTime': future_stop_time,
                u'Observation.DataProducts.Output_Pulsar.enabled': True,
                u'Observation.DataProducts.Output_Pulsar.storageClusterName': u'CEP4',
                u'Observation.Cluster.ProcessingCluster.clusterName': mom_bug_processing_cluster_name
            }
        }
    
        # the task dict that the mocked RARPC.getTask returns by default (see get_task_side_effect in setUp)
        task_mom_id = 351543
        task_otdb_id = 1290472
        task_id = 2299
        task_end_time = datetime.datetime(2016, 3, 25, 22, 47, 31)
        task_start_time = datetime.datetime(2016, 3, 25, 21, 47, 31)
        task = {
            "mom_id": task_mom_id,
            "otdb_id": task_otdb_id,
            "id": task_id,
            "endtime": task_end_time,
            "name": "IS HBA_DUAL",
            "predecessor_ids": [],
            "project_mom_id": 2,
            "project_name": "test-lofar",
            "specification_id": specification_id,
            "starttime": task_start_time,
            "status": "prescheduled",
            "status_id": 350,
            "successor_ids": [],
            "type": "pipeline",
            "type_id": 0
        }

        # mom_id for which the mocked RARPC.getTask returns None (see setUp)
        non_existing_task_mom_id = -1

        # task returned by the mocked RARPC.getTask for mom_id == predecessor_task_mom_id;
        # its 'predecessor_ids' and 'successor_ids' start out empty
        predecessor_task_mom_id = 1
        predecessor_task_otdb_id = 2
        predecessor_task_id = 3
        predecessor_task = {
            "mom_id": predecessor_task_mom_id,
            "otdb_id": predecessor_task_otdb_id,
            "id": predecessor_task_id,
            "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31),
            "name": "IS HBA_DUAL",
            "predecessor_ids": [],
            "project_mom_id": 2,
            "project_name": "test-lofar",
            "specification_id": 2323,
            "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31),
            "status": "prescheduled",
            "status_id": 350,
            "successor_ids": [],
            "type": "pipeline",
            "type_id": 0
        }

        # task returned by the mocked RARPC.getTask for mom_id == successor_task_mom_id;
        # its 'predecessor_ids' and 'successor_ids' start out empty
        successor_task_mom_id = 4
        successor_task_otdb_id = 5
        successor_task_id = 6
        successor_task = {
            "mom_id": successor_task_mom_id,
            "otdb_id": successor_task_otdb_id,
            "id": successor_task_id,
            "endtime": datetime.datetime(2016, 3, 25, 22, 47, 31),
            "name": "IS HBA_DUAL",
            "predecessor_ids": [],
            "project_mom_id": 2,
            "project_name": "test-lofar",
            "specification_id": 2323,
            "starttime": datetime.datetime(2016, 3, 25, 21, 47, 31),
            "status": "prescheduled",
            "status_id": 350,
            "successor_ids": [],
            "type": "pipeline",
            "type_id": 0
        }
    
        # otdb_id whose entry in rerpc_replymessage carries an 'errors' list (resource_error1/2)
        resources_with_errors_otdb_id = 1290496
        resource_error1 = "error 1"
        resource_error2 = "error 2"
        # otdb_id whose entry in rerpc_replymessage holds a resource type name ("fuel")
        # that is absent from the resource types returned by the RARPC mock in setUp
        unknown_resource_type_name = "fuel"
        unknown_resource_type_otdb_id = 123489

        # property name that is absent from the claim property types returned by the RARPC mock in setUp
        resource_unknown_property = 'unknown_property'

        # second element of the (reply, status) tuple returned by the mocked resource estimator rpc (see setUp)
        rerpc_status = 0
        rerpc_needed_claim_for_bandwidth_size = 19021319494
        rerpc_needed_claim_for_bandwidth = {
            'total_size': rerpc_needed_claim_for_bandwidth_size
        }

        # storage output file properties: overall 'uv' properties plus per-SAP properties;
        # SAP 1 additionally carries the unknown property with value -1
        rerpc_needed_claim_for_storage_output_files = {
            'uv': {
                'nr_of_uv_files': 481,
                'uv_file_size': 1482951104
            },
            'saps': [
                {
                    'sap_nr': 0,
                    'properties': {
                        'nr_of_uv_files': 319
                    }
                },
                {
                    'sap_nr': 1,
                    'properties': {
                        'nr_of_uv_files': 81,
                        resource_unknown_property: -1
                    }
                },
                {
                    'sap_nr': 2,
                    'properties': {
                        'nr_of_uv_files': 81
                    }
                }
            ]
        }
        rerpc_needed_claim_for_storage_size = 713299481024
        rerpc_needed_claim_for_storage = {
            'total_size': rerpc_needed_claim_for_storage_size,
            'output_files': rerpc_needed_claim_for_storage_output_files
        }
        # first element of the (reply, status) tuple returned by the mocked resource estimator rpc;
        # keyed by str(otdb_id), then by task type, then by resource type name
        rerpc_replymessage = {
            # regular estimate for the default specification tree
            str(otdb_id): {
                'pipeline': {
                    'bandwidth': rerpc_needed_claim_for_bandwidth,
                    'storage': rerpc_needed_claim_for_storage
                }
            },
            # estimate that additionally reports errors
            str(resources_with_errors_otdb_id): {
                'pipeline': {
                    'bandwidth': {
                        'total_size': 19021319494
                    },
                    'storage': {
                        'total_size': 713299481024,
                        'output_files': {
                            'uv': {
                                'nr_of_uv_files': 481,
                                'uv_file_size': 1482951104
                            },
                            'saps': [{
                                'sap_nr': 0,
                                'properties': {
                                    'nr_of_uv_files': 319
                                }
                            },
                                {
                                    'sap_nr': 1,
                                    'properties': {
                                        'nr_of_uv_files': 81
                                    }
                                },
                                {
                                    'sap_nr': 2,
                                    'properties': {
                                        'nr_of_uv_files': 81
                                    }
                                }]
                        }
                    },
                },
                'errors': [resource_error1, resource_error2]
            },
            # estimate for a resource type name that the RARPC mock does not list
            str(unknown_resource_type_otdb_id): {
                'pipeline': {
                    str(unknown_resource_type_name): {
                    }
                }
            }
        }
    
        # ids of the two resources returned by the mocked RARPC.getResources (see setUp)
        cep4bandwidth_resource_id = 116
        cep4storage_resource_id = 117

        # storage claim fixture: sized like the estimated storage need, ending 365 days after the task ends.
        # 'type' 2 and 10 correspond to the ids of 'nr_of_uv_files' and 'uv_file_size' in the
        # getResourceClaimPropertyTypes mock of setUp; the unknown property of SAP 1 does not appear here.
        storage_claim = {
            'resource_id': cep4storage_resource_id,
            'starttime': task_start_time,
            'endtime': task_end_time + datetime.timedelta(days=365),
            'status': 'claimed',
            'claim_size': rerpc_needed_claim_for_storage_size,
            'properties': [
                {'type': 2, 'io_type': 'output', 'value': 481},
                {'type': 10, 'io_type': 'output', 'value': 1482951104},
                {'type': 2, 'io_type': 'output', 'sap_nr': 0, 'value': 319},
                {'type': 2, 'io_type': 'output', 'sap_nr': 1, 'value': 81},
                {'type': 2, 'io_type': 'output', 'sap_nr': 2, 'value': 81}
            ]
        }

        # bandwidth claim fixture: sized like the estimated bandwidth need, spanning exactly the task's start..end
        # NOTE(review): attribute name is misspelled ('bandwith'); kept as is, other code may reference it.
        bandwith_claim = {
            'resource_id': cep4bandwidth_resource_id,
            'starttime': task_start_time,
            'endtime': task_end_time,
            'status': 'claimed',
            'claim_size': rerpc_needed_claim_for_bandwidth_size
        }

        # both claim fixtures together
        # NOTE(review): attribute name is misspelled ('specifaction'); kept as is, other code may reference it.
        specifaction_claims = [bandwith_claim, storage_claim]
    
        def reset_specification_tree(self):
            self.specification_tree = {
                u'otdb_id': self.otdb_id,
                u'task_type': self.task_type,
                u'state': self.state,
                u'specification': {
                    u'Observation.momID': str(self.mom_id),
                    u'Observation.startTime': self.future_start_time,
                    u'Observation.stopTime': self.future_stop_time,
                    u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
                    u'Observation.VirtualInstrument.stationList': [],
                    u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
                    u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
                    u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0],
                    u'Observation.antennaSet': u'LBA_INNER',
                    u'Observation.nrBitsPerSample': u'16',
                    u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'2',
                    u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
                    u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
                    u'Observation.DataProducts.Input_Correlated.enabled': True,
                    u'Observation.DataProducts.Output_Pulsar.enabled': False,
                    u'Observation.DataProducts.Input_CoherentStokes.skip': [],
                    u'Observation.DataProducts.Output_SkyImage.enabled': False,
                    u'Version.number': u'33774',
                    u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'2',
                    u'Observation.nrBeams': u'0',
                    u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
                    u'Observation.DataProducts.Output_Correlated.enabled': True,
                    u'Observation.DataProducts.Output_Correlated.storageClusterName': 'CEP4',
                    u'Observation.sampleClock': u'200',
                    u'Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'
                },
                u'task_subtype': u'long baseline pipeline',
                u'predecessors': [{
                    u'task_subtype': u'averaging pipeline',
                    u'specification': {
                        u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
                        u'Observation.stopTime': u'2016-03-25 13:51:05',
                        u'Observation.VirtualInstrument.stationList': [],
                        u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
                        u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
                        u'Observation.DataProducts.Output_SkyImage.enabled': False,
                        u'Observation.DataProducts.Input_Correlated.skip': [0, 0, 0, 0],
                        u'Observation.antennaSet': u'LBA_INNER',
                        u'Observation.nrBitsPerSample': u'16',
                        u'Observation.ObservationControl.PythonControl.LongBaseline.subbandgroups_per_ms': u'1',
                        u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
                        u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
                        u'Observation.DataProducts.Input_Correlated.enabled': True,
                        u'Observation.DataProducts.Output_Pulsar.enabled': False,
                        u'Observation.DataProducts.Input_CoherentStokes.skip': [],
                        u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixtimestep': u'10',
                        u'Version.number': u'33774',
                        u'Observation.momID': u'351556',
                        u'Observation.startTime': u'2016-03-25 13:49:55',
                        u'Observation.ObservationControl.PythonControl.LongBaseline.subbands_per_subbandgroup': u'1',
                        u'Observation.nrBeams': u'0',
                        u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
                        u'Observation.ObservationControl.PythonControl.DPPP.demixer.demixfreqstep': u'64',
                        u'Observation.DataProducts.Output_Correlated.enabled': True,
                        u'Observation.sampleClock': u'200'
                    },
                    u'task_type': u'pipeline',
                    u'otdb_id': 1290496,
                    u'predecessors': [{
                        u'task_subtype': u'bfmeasurement',
                        u'specification': {
                            u'Observation.DataProducts.Output_InstrumentModel.enabled': False,
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.timeIntegrationFactor': u'1',
                            u'Observation.stopTime': u'2016-03-26 00:33:31',
                            u'Observation.VirtualInstrument.stationList': [u'RS205', u'RS503', u'CS013', u'RS508',
                                                                           u'RS106'],
                            u'Observation.DataProducts.Input_CoherentStokes.enabled': False,
                            u'Observation.DataProducts.Output_CoherentStokes.enabled': False,
                            u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.nrChannelsPerSubband': u'64',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.which': u'I',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.which': u'I',
                            u'Observation.Beam[0].subbandList': [100, 101, 102, 103],
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.IncoherentStokes.subbandsPerFile': u'512',
                            u'Observation.DataProducts.Input_Correlated.skip': [],
                            u'Observation.antennaSet': u'HBA_DUAL',
                            u'Observation.nrBitsPerSample': u'8',
                            u'Observation.Beam[0].nrTabRings': u'0',
                            u'Observation.Beam[0].nrTiedArrayBeams': u'0',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.flysEye': False,
                            u'Observation.nrBeams': u'1',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.Correlator.integrationTime': u'1.0',
                            u'Observation.DataProducts.Output_IncoherentStokes.enabled': False,
                            u'Observation.DataProducts.Input_IncoherentStokes.enabled': False,
                            u'Observation.DataProducts.Input_Correlated.enabled': False,
                            u'Observation.DataProducts.Output_Pulsar.enabled': False,
                            u'Observation.DataProducts.Input_CoherentStokes.skip': [],
                            u'Observation.DataProducts.Output_SkyImage.enabled': False,
                            u'Version.number': u'33774',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.timeIntegrationFactor': u'1',
                            u'Observation.momID': u'351539',
                            u'Observation.startTime': u'2016-03-26 00:31:31',
                            u'Observation.ObservationControl.OnlineControl.Cobalt.BeamFormer.CoherentStokes.subbandsPerFile': u'512',
                            u'Observation.DataProducts.Input_IncoherentStokes.skip': [],
                            u'Observation.DataProducts.Output_Correlated.enabled': True,
                            u'Observation.sampleClock': u'200'
                        },
                        u'task_type': u'observation',
                        u'otdb_id': 1290476,
                        u'predecessors': []
                    }]
                }]
            }
    
        def setUp(self):
            """Patch every collaborator of the assigner and build the assigner under test.

            Each patch is registered for cleanup, so it is undone after the test. The started
            mocks are kept on self (rarpc_mock, rerpc_mock, otdbrpc_mock, momrpc_mock, curpc_mock,
            sqrpc_mock, ra_notification_bus_mock, logger_mock, movePipelineAfterItsPredecessors_mock)
            and the default specification tree is (re)built at the end.
            """
            def start_patch(target):
                # patch 'target', schedule the un-patching, and hand back the started mock
                patcher = mock.patch(target)
                self.addCleanup(patcher.stop)
                return patcher.start()

            def get_task_side_effect(*args, **kwargs):
                # without a 'mom_id' keyword the default task is returned
                if 'mom_id' not in kwargs:
                    return self.task

                requested_mom_id = kwargs['mom_id']
                if requested_mom_id == self.successor_task_mom_id:
                    return self.successor_task
                if requested_mom_id == self.predecessor_task_mom_id:
                    return self.predecessor_task
                if requested_mom_id == self.non_existing_task_mom_id:
                    return None
                return self.task

            self.successor_task_mom_ids = [self.successor_task_mom_id]
            self.predecessor_task_mom_ids = [self.predecessor_task_mom_id]

            # --- radb rpc ---
            self.rarpc_mock = start_patch('lofar.sas.resourceassignment.resourceassignmentservice.rpc.RARPC')
            self.rarpc_mock.getTask.side_effect = get_task_side_effect
            self.rarpc_mock.insertSpecificationAndTask.return_value = {
                'inserted': True,
                'specification_id': self.specification_id,
                'task_id': self.task_id
            }

            # the position in this list is the property type id (0..23)
            # NOTE(review): 'nr_of_pulp_files' occurs twice (ids 5 and 13) - confirm this is intended
            claim_property_type_names = [
                'nr_of_is_files', 'nr_of_cs_files', 'nr_of_uv_files', 'nr_of_im_files',
                'nr_of_img_files', 'nr_of_pulp_files', 'nr_of_cs_stokes', 'nr_of_is_stokes',
                'is_file_size', 'cs_file_size', 'uv_file_size', 'im_file_size',
                'img_file_size', 'nr_of_pulp_files', 'nr_of_tabs', 'start_sb_nr',
                'uv_otdb_id', 'cs_otdb_id', 'is_otdb_id', 'im_otdb_id',
                'img_otdb_id', 'pulp_otdb_id', 'is_tab_nr', 'start_sbg_nr'
            ]
            self.rarpc_mock.getResourceClaimPropertyTypes.return_value = [
                {'id': type_id, 'name': type_name}
                for type_id, type_name in enumerate(claim_property_type_names)
            ]

            # (name, unit_id, units); the position in this list is the resource type id (0..5)
            resource_type_rows = [
                ('rsp', 0, 'rsp_channel_bit'),
                ('tbb', 1, 'bytes'),
                ('rcu', 2, 'rcu_board'),
                ('bandwidth', 3, 'bits/second'),
                ('processor', 4, 'cores'),
                ('storage', 1, 'bytes'),
            ]
            self.rarpc_mock.getResourceTypes.return_value = [
                {'id': type_id, 'name': type_name, 'unit_id': unit_id, 'units': units}
                for type_id, (type_name, unit_id, units) in enumerate(resource_type_rows)
            ]
            self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1, 2]}

            # incomplete response but good enough for tests
            bandwidth_resource = {'id': self.cep4bandwidth_resource_id, 'name': 'cep4bandwidth', 'type_id': 3,
                                  'type_name': 'bandwidth', 'unit_id': 3, 'unit': 'bits/second'}
            storage_resource = {'id': self.cep4storage_resource_id, 'name': 'cep4storage', 'type_id': 5,
                                'type_name': 'storage', 'unit_id': 1, 'unit': 'bytes'}
            self.rarpc_mock.getResources.return_value = [bandwidth_resource, storage_resource]
            self.rarpc_mock.getResourceClaims.return_value = []

            # --- resource estimator rpc: calling the mock yields a (reply, status) tuple ---
            self.rerpc_mock = start_patch('lofar.messaging.RPC')
            self.rerpc_mock.return_value = (self.rerpc_replymessage, self.rerpc_status)

            # --- otdb rpc ---
            self.otdbrpc_mock = start_patch('lofar.sas.otdb.otdbrpc')

            # --- mom query rpc: predecessors/successors are keyed by str(task_mom_id) ---
            self.momrpc_mock = start_patch('lofar.mom.momqueryservice.momqueryrpc')
            task_key = str(self.task_mom_id)
            self.momrpc_mock.getPredecessorIds.return_value = {task_key: self.predecessor_task_mom_ids}
            self.momrpc_mock.getSuccessorIds.return_value = {task_key: self.successor_task_mom_ids}

            # --- cleanup rpc, storage query rpc, notification bus ---
            self.curpc_mock = start_patch('lofar.sas.datamanagement.cleanup.rpc')
            self.sqrpc_mock = start_patch('lofar.sas.datamanagement.storagequery.rpc')
            self.ra_notification_bus_mock = start_patch('lofar.messaging.messagebus')

            # --- names patched inside the assignment module itself ---
            self.logger_mock = start_patch('lofar.sas.resourceassignment.resourceassigner.assignment.logger')
            self.movePipelineAfterItsPredecessors_mock = start_patch(
                'lofar.sas.resourceassignment.resourceassigner.assignment.movePipelineAfterItsPredecessors')

            # the object under test, wired to all of the mocks above
            self.resourceAssigner = TestingResourceAssigner(
                self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock,
                self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock)
            self.reset_specification_tree()
    
        def assert_all_services_opened(self):
            self.assertTrue(self.rarpc_mock.open.called, "RARPC.open was not called")
            self.assertTrue(self.rerpc_mock.open.called, "RPC.open was not called")
            self.assertTrue(self.otdbrpc_mock.open.called, "OTDBRPC.open was not called")
            self.assertTrue(self.momrpc_mock.open.called, "MOMRPC.open was not called")
            self.assertTrue(self.curpc_mock.open.called, "CURPC.open was not called")
            self.assertTrue(self.ra_notification_bus_mock.open.called, "ra_notification_bus.open was not called")
    
        def assert_all_services_closed(self):
            self.assertTrue(self.rarpc_mock.close.called, "RARPC.close was not called")
            self.assertTrue(self.rerpc_mock.close.called, "RPC.close was not called")
            self.assertTrue(self.otdbrpc_mock.close.called, "OTDBRPC.close was not called")
            self.assertTrue(self.momrpc_mock.close.called, "MOMRPC.close was not called")
            self.assertTrue(self.curpc_mock.close.called, "CURPC.close was not called")
            self.assertTrue(self.ra_notification_bus_mock.close.called, "ra_notification_bus.close was not called")
    
        def test_open_opens_all_services(self):
            """open() on the assigner has to open every checked service."""
            assigner = self.resourceAssigner
            assigner.open()

            self.assert_all_services_opened()
    
        def test_close_closes_all_services(self):
            """close() on the assigner has to close every checked service."""
            assigner = self.resourceAssigner
            assigner.close()

            self.assert_all_services_closed()
    
        def test_contextManager_opens_and_closes_all_services(self):
            """Used as a context manager, the assigner opens the services on entry and closes them on exit."""
            assigner = TestingResourceAssigner(
                self.rarpc_mock, self.rerpc_mock, self.otdbrpc_mock, self.momrpc_mock,
                self.curpc_mock, self.sqrpc_mock, self.ra_notification_bus_mock)

            with assigner:
                # inside the block: everything must have been opened
                self.assert_all_services_opened()

            # after leaving the block: everything must have been closed
            self.assert_all_services_closed()
    
        def test_do_assignment_logs_specification(self):
            """doAssignment has to log the specification tree it received at info level."""
            self.resourceAssigner.doAssignment(self.specification_tree)

            # the message is formatted up front (after the call), so it is matched as one ready-made string
            expected_message = 'doAssignment: specification_tree=%s' % self.specification_tree
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assignment_log_non_approved_or_prescheduled_states(self):
            """A specification in a state other than approved/prescheduled has to be logged as skipped."""
            self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)

            # otdb_id and status are expected as ONE tuple argument, not as two separate arguments.
            # NOTE(review): presumably mirrors how the production code calls logger.info - verify.
            skipped_id_and_status = (self.non_approved_or_prescheduled_otdb_id,
                                     self.non_approved_or_prescheduled_status)
            self.logger_mock.info.assert_any_call('skipping specification for otdb_id=%s because status=%s',
                                                  skipped_id_and_status)
    
        def test_do_assignment_non_approved_or_prescheduled_states_should_be_skipped(self):
            """No method of any service mock may be called for a non approved/prescheduled specification."""
            self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)

            untouched_services = (
                (self.otdbrpc_mock, "OTDBRPC"),
                (self.rarpc_mock, "RARPC"),
                (self.momrpc_mock, "MOMRPC"),
                (self.rerpc_mock, "RERPC"),
                (self.curpc_mock, "CURPC"),
                (self.ra_notification_bus_mock, "RA notification bus"),
            )
            for service, label in untouched_services:
                self.assertEqual(len(service.method_calls), 0,
                                 "%s was called for non approved or scheduled specification tree" % label)
    
        def test_do_assignment_inserts_specification_and_task_in_radb(self):
            """doAssignment has to insert the specification and task into the radb with the expected arguments."""
            self.resourceAssigner.doAssignment(self.specification_tree)

            time_format = '%Y-%m-%d %H:%M:%S'
            expected_start = datetime.datetime.strptime(self.future_start_time, time_format)
            expected_stop = datetime.datetime.strptime(self.future_stop_time, time_format)
            # the specification is expected in its parameterset string form
            expected_parset = str(parameterset(self.specification_tree['specification']))

            expected_args = (self.mom_id, self.otdb_id, self.state, self.task_type,
                             expected_start, expected_stop, expected_parset, "CEP4")
            self.rarpc_mock.insertSpecificationAndTask.assert_any_call(*expected_args)
    
        def test_do_assignment_logs_when_insertion_of_specification_and_task_in_radb_failed(self):
            """When the radb reports that nothing was inserted, an error has to be logged."""
            insert_mock = self.rarpc_mock.insertSpecificationAndTask
            insert_mock.return_value = {'inserted': False}

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.logger_mock.error.assert_any_call('could not insert specification and task')
    
        def test_do_assignment_logs_when_no_predecessors_found(self):
            # getPredecessorIds answers with an empty list for this task's mom id.
            no_predecessors = {str(self.task_mom_id): []}
            self.momrpc_mock.getPredecessorIds.return_value = no_predecessors

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_info = self.logger_mock.info
            log_info.assert_any_call('no predecessors for otdb_id=%s', self.task_otdb_id)
    
        def test_do_assignment_logs_when_predecessors_are_found(self):
            # The predecessor mom ids must show up in an info log entry.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.predecessor_task_mom_ids, self.task_mom_id, self.task_otdb_id)
            self.logger_mock.info.assert_any_call(
                'proccessing predecessor mom_ids=%s for mom_id=%s otdb_id=%s', *expected_args)
    
        def test_do_assignment_logs_when_predecessors_are_found_but_its_task_is_missing_in_radb(self):
            # The only predecessor returned is the non-existing task mom id.
            unknown_predecessor = {str(self.task_mom_id): [self.non_existing_task_mom_id]}
            self.momrpc_mock.getPredecessorIds.return_value = unknown_predecessor

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.non_existing_task_mom_id, self.task_otdb_id)
            self.logger_mock.warning.assert_any_call(
                'could not find predecessor task with mom_id=%s in radb for task otdb_id=%s', *expected_args)
    
        def test_do_assignment_logs_when_predecessors_are_found_that_need_to_be_linked_to_task(self):
            # Linking a predecessor to the task must be announced in an info log entry.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.predecessor_task_mom_id, self.predecessor_task_otdb_id,
                             self.task_mom_id, self.task_otdb_id)
            self.logger_mock.info.assert_any_call(
                "connecting predecessor task with mom_id=%s otdb_id=%s to it's successor with mom_id=%s otdb_id=%s",
                *expected_args)
    
        def test_do_assignment_insert_predecessor_into_task_when_not_linked_to_task(self):
            # insertTaskPredecessor must be called with (task id, predecessor task id).
            self.resourceAssigner.doAssignment(self.specification_tree)

            insert_predecessor = self.rarpc_mock.insertTaskPredecessor
            insert_predecessor.assert_any_call(self.task_id, self.predecessor_task_id)
    
        def test_do_assignment_logs_when_no_successors_found(self):
            # getSuccessorIds answers with an empty list for this task's mom id.
            no_successors = {str(self.task_mom_id): []}
            self.momrpc_mock.getSuccessorIds.return_value = no_successors

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_info = self.logger_mock.info
            log_info.assert_any_call('no successors for otdb_id=%s', self.task_otdb_id)
    
        def test_do_assignment_logs_when_successors_are_found(self):
            # The successor mom ids must show up in an info log entry.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.successor_task_mom_ids, self.task_mom_id, self.task_otdb_id)
            self.logger_mock.info.assert_any_call(
                'proccessing successor mom_ids=%s for mom_id=%s otdb_id=%s', *expected_args)
    
        def test_do_assignment_logs_when_successors_are_found_but_its_task_is_missing_in_radb(self):
            # The only successor returned is the non-existing task mom id.
            unknown_successor = {str(self.task_mom_id): [self.non_existing_task_mom_id]}
            self.momrpc_mock.getSuccessorIds.return_value = unknown_successor

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.non_existing_task_mom_id, self.task_mom_id, self.task_otdb_id)
            self.logger_mock.warning.assert_any_call(
                'could not find successor task with mom_id=%s in radb for task mom_id=%s otdb_id=%s',
                *expected_args)
    
        def test_do_assignment_logs_when_successors_are_found_that_need_to_be_linked_to_task(self):
            # Linking a successor to the task must be announced in an info log entry.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.successor_task_mom_id, self.successor_task_otdb_id,
                             self.task_mom_id, self.task_otdb_id)
            self.logger_mock.info.assert_any_call(
                "connecting successor task with mom_id=%s otdb_id=%s to it's predecessor with mom_id=%s otdb_id=%s",
                *expected_args)
    
        def test_do_assignment_insert_successor_into_task_when_not_linked_to_task(self):
            # insertTaskPredecessor must be called with (successor task id, task id).
            self.resourceAssigner.doAssignment(self.specification_tree)

            insert_predecessor = self.rarpc_mock.insertTaskPredecessor
            insert_predecessor.assert_any_call(self.successor_task_id, self.task_id)
    
        def test_do_assignment_moves_pipeline_of_successor_after_predecessor(self):
            # The movePipelineAfterItsPredecessors mock must have been invoked at least once.
            self.resourceAssigner.doAssignment(self.specification_tree)

            was_called = self.movePipelineAfterItsPredecessors_mock.called
            self.assertTrue(was_called)
    
        def test_do_assignment_logs_mom_bug(self):
            # For the mom-bug tree, overwriting the processing cluster name with CEP4 must be logged.
            self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)

            expected_args = (self.mom_bug_processing_cluster_name, 'CEP4', self.mom_bug_otbd_id)
            self.logger_mock.info.assert_any_call(
                'overwriting and uploading processingClusterName to otdb from %s to %s for otdb_id=%s',
                *expected_args)
    
        def test_do_assignment_resets_ProcessingCluster_clusterName_on_mom_bug(self):
            # For the mom-bug tree, the cluster name must be uploaded to OTDB as CEP4.
            self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)

            expected_update = {'LOFAR.ObsSW.Observation.Cluster.ProcessingCluster.clusterName': 'CEP4'}
            self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.mom_bug_otbd_id, expected_update)
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_should_reset_observation_period_when_in_past_without_predecessor_and_duration(
                self, datetime_mock):
            # utcnow() of the patched datetime returns a moment one day ahead of the real clock.
            frozen_now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
            datetime_mock.utcnow.return_value = frozen_now

            # Expected defaults: start one minute after "now", stop one hour after the start.
            expected_start = frozen_now + datetime.timedelta(minutes=1)
            expected_stop = expected_start + datetime.timedelta(hours=1)
            time_format = '%Y-%m-%d %H:%M:%S'

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.logger_mock.warning.assert_any_call(
                'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)
            self.logger_mock.info.assert_any_call(
                'uploading auto-generated start/end time  (%s, %s) to otdb for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)

            expected_update = {
                'LOFAR.ObsSW.Observation.startTime': expected_start.strftime(time_format),
                'LOFAR.ObsSW.Observation.stopTime': expected_stop.strftime(time_format),
            }
            self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, expected_update)
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_should_reset_observation_period_when_in_past_with_predecessor_in_future(self, datetime_mock):
            time_format = '%Y-%m-%d %H:%M:%S'
            frozen_now = self.freeze_time_one_day_in_the_future(datetime_mock)

            # The first predecessor stops one hour after the frozen "now".
            predecessor_stop = frozen_now + datetime.timedelta(hours=1)
            predecessor_specification = self.specification_tree['predecessors'][0]['specification']
            predecessor_specification['Observation.stopTime'] = predecessor_stop.strftime(time_format)

            # Expected defaults: start one minute after the predecessor stops, stop one hour later.
            expected_start = predecessor_stop + datetime.timedelta(minutes=1)
            expected_stop = expected_start + datetime.timedelta(hours=1)

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.logger_mock.warning.assert_any_call(
                'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)
            self.logger_mock.info.assert_any_call(
                'uploading auto-generated start/end time  (%s, %s) to otdb for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)

            expected_update = {
                'LOFAR.ObsSW.Observation.startTime': expected_start.strftime(time_format),
                'LOFAR.ObsSW.Observation.stopTime': expected_stop.strftime(time_format),
            }
            self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, expected_update)
    
        def freeze_time_one_day_in_the_future(self, datetime_mock):
            """Make the given datetime mock report a fixed "now" one day ahead and return it.

            The frozen moment is the real utcnow() plus one day, passed through _strip_ms.
            strptime on the mock is delegated to the real datetime.datetime.strptime.

            :param datetime_mock: mock whose utcnow and strptime are configured
            :return: the datetime that datetime_mock.utcnow() is set to return
            """
            tomorrow = self._strip_ms(datetime.datetime.utcnow() + datetime.timedelta(days=1))

            def real_strptime(date_string, format_string):
                return datetime.datetime.strptime(date_string, format_string)

            datetime_mock.utcnow.return_value = tomorrow
            datetime_mock.strptime.side_effect = real_strptime
            return tomorrow
    
        def _strip_ms(self, now):
            return datetime.datetime.strptime(now.strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S')
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_should_reset_observation_period_when_in_past_with_task_duration(self, datetime_mock):
            # utcnow() of the patched datetime returns a moment one day ahead of the real clock.
            frozen_now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
            datetime_mock.utcnow.return_value = frozen_now

            # The specification carries an explicit task duration (in seconds, see timedelta below).
            duration = 100
            self.specification_tree['specification']['Observation.Scheduler.taskDuration'] = duration

            # Expected defaults: start one minute after "now", stop `duration` seconds later.
            expected_start = frozen_now + datetime.timedelta(minutes=1)
            expected_stop = expected_start + datetime.timedelta(seconds=duration)
            time_format = '%Y-%m-%d %H:%M:%S'

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.logger_mock.warning.assert_any_call(
                'Applying sane defaults (%s, %s) for start/end time from specification for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)
            self.logger_mock.info.assert_any_call(
                'uploading auto-generated start/end time  (%s, %s) to otdb for otdb_id=%s',
                expected_start, expected_stop, self.otdb_id)

            expected_update = {
                'LOFAR.ObsSW.Observation.startTime': expected_start.strftime(time_format),
                'LOFAR.ObsSW.Observation.stopTime': expected_stop.strftime(time_format),
            }
            self.otdbrpc_mock.taskSetSpecification.assert_any_call(self.otdb_id, expected_update)
    
        def test_do_assignment_should_log_insertion_of_specification_and_task(self):
            # The insertSpecification log line is pre-formatted, so compare against the final text.
            self.resourceAssigner.doAssignment(self.specification_tree)

            template = ('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, '
                        'startTime=%s, endTime=%s cluster=%s')
            values = (self.mom_id, self.otdb_id, self.state, self.task_type,
                      self.future_start_time, self.future_stop_time, "CEP4")
            self.logger_mock.info.assert_any_call(template % values)
    
        def test_do_assignment_should_log_when_insertion_of_specification_and_task_is_done(self):
            # The ids of the inserted specification and task must be logged.
            self.resourceAssigner.doAssignment(self.specification_tree)

            inserted_ids = (self.specification_id, self.task_id)
            expected_message = 'doAssignment: inserted specification (id=%s) and task (id=%s)' % inserted_ids
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assignment_should_log_on_non_presceduled_cep4_tasks(self):
            # For the approved tree, skipping of the resource assignment must be logged.
            self.resourceAssigner.doAssignment(self.approved_specification_tree)

            skipped_task = (self.approved_otdb_id, self.approved_status)
            expected_message = \
                'skipping resource assignment for CEP4 task otdb_id=%s because status=%s' % skipped_task
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assignment_should_not_claim_resources_on_CEP2_tasks(self):
            # No resource claims may be inserted for the CEP2 specification tree.
            self.resourceAssigner.doAssignment(self.cep2_specification_tree)

            insert_claims = self.rarpc_mock.insertResourceClaims
            insert_claims.assert_not_called()
    
        def test_do_assignment_should_not_claim_resources_on_non_presceduled_cep4_tasks(self):
            # No resource claims may be inserted for a tree that is neither approved nor prescheduled.
            self.resourceAssigner.doAssignment(self.non_approved_or_prescheduled_specification_tree)

            insert_claims = self.rarpc_mock.insertResourceClaims
            insert_claims.assert_not_called()
    
        def test_do_assignment_should_log_if_not_all_storageClusters_are_seen_as_CEP4(self):
            # Rejection of the CEP2 tree is checked on logger.warn (not logger.warning).
            self.resourceAssigner.doAssignment(self.cep2_specification_tree)

            expected_message = "storageClusterName not CEP4, rejecting specification."
            self.logger_mock.warn.assert_any_call(expected_message)
    
        def test_do_assignment_should_log_if_all_storageClusters_are_seen_as_CEP4(self):
            # Acceptance of the default specification tree must be logged at info level.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = "all enabled storageClusterName's are CEP4, accepting specification."
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assginement_should_request_needed_resources(self):
            # The rerpc mock must be called with the specification tree and a timeout of 10.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_request = {"specification_tree": self.specification_tree}
            self.rerpc_mock.assert_any_call(expected_request, timeout=10)
    
        def test_do_assignemnet_logs_needed_resources(self):
            # The rerpc reply must be logged twice, once per message template below.
            self.resourceAssigner.doAssignment(self.specification_tree)

            for template in ('getNeededResouces: %s', 'doAssignment: getNeededResouces=%s'):
                self.logger_mock.info.assert_any_call(template % self.rerpc_replymessage)
    
        def test_do_assignment_logs_when_otdb_id_not_needed_resources(self):
            # Use an otdb id that differs from self.otdb_id by one.
            self.specification_tree["otdb_id"] = self.otdb_id + 1

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = "no otdb_id %s found in estimator results %s" % (
                self.otdb_id + 1, self.rerpc_replymessage)
            self.logger_mock.error.assert_any_call(expected_message)
    
        def test_do_assignment_should_not_claim_resouces_when_otdb_id_not_needed_resources(self):
            # Use an otdb id that differs from self.otdb_id by one; nothing may be claimed.
            shifted_otdb_id = self.otdb_id + 1
            self.specification_tree["otdb_id"] = shifted_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            insert_claims = self.rarpc_mock.insertResourceClaims
            insert_claims.assert_not_called()
    
        def test_do_assignment_logs_when_task_type_not_in_needed_resources(self):
            # Override the tree's task type with "observation" and expect an error log entry.
            wrong_task_type = "observation"
            self.specification_tree["task_type"] = wrong_task_type

            self.resourceAssigner.doAssignment(self.specification_tree)

            estimates_for_task = self.rerpc_replymessage[str(self.otdb_id)]
            expected_message = "no task type %s found in estimator results %s" % (
                wrong_task_type, estimates_for_task)
            self.logger_mock.error.assert_any_call(expected_message)
    
        def test_do_assignment_should_not_claim_resources_when_task_type_not_in_needed_resources(self):
            # Override the tree's task type with "observation"; nothing may be claimed.
            self.specification_tree["task_type"] = "observation"

            self.resourceAssigner.doAssignment(self.specification_tree)

            insert_claims = self.rarpc_mock.insertResourceClaims
            insert_claims.assert_not_called()
    
        def test_do_assignment_should_log_single_errors_in_needed_resources(self):
            # Switch the tree to the otdb id for which the fixture holds estimator errors.
            self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            # Each individual error must get its own log entry.
            for estimator_error in (self.resource_error1, self.resource_error2):
                self.logger_mock.error.assert_any_call("Error in estimator: %s", estimator_error)
    
        def test_do_assignment_should_log_error_in_needed_resources(self):
            # Switch the tree to the otdb id for which the fixture holds estimator errors.
            failing_otdb_id = self.resources_with_errors_otdb_id
            self.specification_tree["otdb_id"] = failing_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.resources_with_errors_otdb_id, self.task_id)
            self.logger_mock.error.assert_any_call(
                "Error(s) in estimator for otdb_id=%s radb_id=%s, setting task status to 'error'",
                *expected_args)
    
        def test_do_assignment_should_update_task_with_error_on_errors_in_needed_resources(self):
            # Switch the tree to the otdb id for which the fixture holds estimator errors.
            failing_otdb_id = self.resources_with_errors_otdb_id
            self.specification_tree["otdb_id"] = failing_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            # The task status must be updated to 'error'.
            update_task = self.rarpc_mock.updateTask
            update_task.assert_any_call(self.task_id, task_status='error')
    
        def test_do_assignment_should_notify_bus_on_errors_in_needed_resources(self):
            # A TaskError notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskError'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            self.specification_tree["otdb_id"] = self.resources_with_errors_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def ra_notification_bus_send_called_with(self, content, subject):
            """Tell whether send() on the notification bus mock received a matching message.

            :param content: expected value of the sent message's ``content`` attribute
            :param subject: expected value of the sent message's ``subject`` attribute
            :return: True when the first positional argument of at least one recorded
                     send() call has both the given subject and content, False otherwise
            """
            # call[0] holds the positional arguments of a recorded call; its first
            # element is the object that was handed to send().
            sent_messages = (call[0][0] for call in self.ra_notification_bus_mock.send.call_args_list)
            # any() stops at the first match instead of scanning every recorded call.
            return any(message.subject == subject and message.content == content
                       for message in sent_messages)
    
        def test_do_assignment_should_log_start_of_claim_resources(self):
            # The task and its needed resources (looked up by otdb id) must be logged.
            self.resourceAssigner.doAssignment(self.specification_tree)

            needed_resources = self.rerpc_replymessage[str(self.otdb_id)]
            expected_message = 'claimResources: task %s needed_resources=%s' % (self.task, needed_resources)
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assigment_should_log_when_claiming_unknown_resource_type(self):
            # Switch the tree to the otdb id the fixture reserves for an unknown resource type.
            self.specification_tree["otdb_id"] = self.unknown_resource_type_otdb_id

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = 'claimResources: unknown resource_type:%s' % self.unknown_resource_type_name
            self.logger_mock.warn.assert_any_call(expected_message)
    
        def test_do_assignment_logs_claimable_resources_in_specification(self):
            # Both the storage and the bandwidth resource type must be logged with their contents.
            self.resourceAssigner.doAssignment(self.specification_tree)

            template = 'claimResources: processing resource_type: %s contents: %s'
            expected_resources = (
                ('storage', self.rerpc_needed_claim_for_storage),
                ('bandwidth', self.rerpc_needed_claim_for_bandwidth),
            )
            for resource in expected_resources:
                self.logger_mock.info.assert_any_call(template % resource)
    
        def test_do_assignment_logs_created_claim_per_needed_resource_type(self):
            # A "created claim" log entry is expected for the storage claim and the bandwidth claim.
            self.resourceAssigner.doAssignment(self.specification_tree)

            log_info = self.logger_mock.info
            log_info.assert_any_call('claimResources: created claim:%s' % self.storage_claim)
            log_info.assert_any_call('claimResources: created claim:%s' % self.bandwith_claim)
    
        def test_do_assignment_logs_amount_of_claims_inserted_in_radb(self):
            # Two claims are expected to be announced for insertion.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_claim_count = 2
            expected_message = 'claimResources: inserting %d claims in the radb' % expected_claim_count
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assignment_inserts_resource_claims_in_radb(self):
            # insertResourceClaims must receive the task id, the fixture's claims and the fixed trailing arguments.
            self.resourceAssigner.doAssignment(self.specification_tree)

            insert_claims = self.rarpc_mock.insertResourceClaims
            insert_claims.assert_any_call(self.task_id, self.specifaction_claims, 1, 'anonymous', -1)
    
        def test_do_assignment_logs_amount_claims_inserted(self):
            # Two claims are expected to be reported as inserted.
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_claim_count = 2
            expected_message = 'claimResources: %d claims were inserted in the radb' % expected_claim_count
            self.logger_mock.info.assert_any_call(expected_message)
    
        def test_do_assignment_logs_unknown_property_on_needed_resources(self):
            # The unknown property is checked on logger.warn (not logger.warning).
            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = 'claimResources: unknown prop_type:%s' % self.resource_unknown_property
            self.logger_mock.warn.assert_any_call(expected_message)
    
        def test_do_assignment_logs_multiple_properties_on_needed_resource(self):
            # The 'output_files' sub-dict of the storage resource must be logged.
            self.resourceAssigner.doAssignment(self.specification_tree)

            template = "claimResources: processing resource_type: %s subdict_name: '%s' subdict_contents: %s"
            values = ("storage", "output_files", self.rerpc_needed_claim_for_storage_output_files)
            self.logger_mock.info.assert_any_call(template % values)
    
        def test_do_assignment_logs_when_it_was_unable_to_claim_all_resources(self):
            # insertResourceClaims returns no claim ids at all.
            nothing_inserted = {'ids': []}
            self.rarpc_mock.insertResourceClaims.return_value = nothing_inserted

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = \
                'doAssignment: No claims could be made. Setting task %s status to error' % self.task_id
            self.logger_mock.warning.assert_any_call(expected_message)
    
        def test_do_assignment_updates_task_when_it_was_unable_to_claim_all_resources(self):
            # insertResourceClaims returns no claim ids at all; the task must end up in 'error'.
            nothing_inserted = {'ids': []}
            self.rarpc_mock.insertResourceClaims.return_value = nothing_inserted

            self.resourceAssigner.doAssignment(self.specification_tree)

            update_task = self.rarpc_mock.updateTask
            update_task.assert_any_call(self.task_id, task_status='error')
    
        def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_all_resources(self):
            # A TaskError notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskError'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            # insertResourceClaims returns no claim ids at all.
            self.rarpc_mock.insertResourceClaims.return_value = {'ids': []}

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def test_do_assignment_logs_when_it_was_unable_to_claim_some_resources(self):
            # insertResourceClaims returns a single claim id.
            partially_inserted = {'ids': [1]}
            self.rarpc_mock.insertResourceClaims.return_value = partially_inserted

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = \
                'doAssignment: Not all claims could be inserted. Setting task %s status to conflict' % self.task_id
            self.logger_mock.warning.assert_any_call(expected_message)
    
        def test_do_assignment_updates_task_when_it_was_unable_to_claim_some_resources(self):
            # insertResourceClaims returns a single claim id; the task must end up in 'conflict'.
            partially_inserted = {'ids': [1]}
            self.rarpc_mock.insertResourceClaims.return_value = partially_inserted

            self.resourceAssigner.doAssignment(self.specification_tree)

            update_task = self.rarpc_mock.updateTask
            update_task.assert_any_call(self.task_id, task_status='conflict')
    
        def test_do_assignment_notifies_bus_when_it_was_unable_to_claim_some_resources(self):
            # A TaskConflict notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskConflict'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            # insertResourceClaims returns a single claim id.
            self.rarpc_mock.insertResourceClaims.return_value = {'ids': [1]}

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def test_do_assignment_logs_when_there_are_conflicting_claims(self):
            # getResourceClaims returns one (empty) claim.
            conflicting_claims = [{}]
            self.rarpc_mock.getResourceClaims.return_value = conflicting_claims

            self.resourceAssigner.doAssignment(self.specification_tree)

            conflict_count = len(conflicting_claims)
            expected_message = \
                'doAssignment: %s conflicting claims detected. Task cannot be scheduled. %s' % (
                    conflict_count, conflicting_claims)
            self.logger_mock.warning.assert_any_call(expected_message)
    
        def test_do_assignment_notifies_bus_when_there_are_conflicting_claims(self):
            # A TaskConflict notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskConflict'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            # getResourceClaims returns one (empty) claim.
            self.rarpc_mock.getResourceClaims.return_value = [{}]

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def test_do_assignment_logs_when_all_resources_were_claimed(self):
            # With the default fixtures, successful claiming must be logged for the task.
            self.resourceAssigner.doAssignment(self.specification_tree)

            template = ('doAssignment: all claims for task %s were succesfully claimed. '
                        'Setting claim statuses to allocated')
            self.logger_mock.info.assert_any_call(template % self.task_id)
    
        def test_do_assignment_updates_task_and_resources_as_claimed_in_radb(self):
            # The task's claims must be updated to status 'allocated'.
            self.resourceAssigner.doAssignment(self.specification_tree)

            update_claims = self.rarpc_mock.updateTaskAndResourceClaims
            update_claims.assert_any_call(self.task_id, claim_status='allocated')
    
        def test_do_assignment_logs_task_data_removal_if_task_is_pipeline(self):
            # The disk usage query reports existing data for the otdb id.
            existing_data = {'found': True, 'disk_usage': 10}
            self.sqrpc_mock.getDiskUsageForOTDBId.return_value = existing_data

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_info = self.logger_mock.info
            log_info.assert_any_call("removing data on disk from previous run for otdb_id %s", self.otdb_id)
    
        def test_do_assignment_removes_task_data_if_task_is_pipeline(self):
            # The disk usage query reports existing data for the otdb id.
            existing_data = {'found': True, 'disk_usage': 10}
            self.sqrpc_mock.getDiskUsageForOTDBId.return_value = existing_data

            self.resourceAssigner.doAssignment(self.specification_tree)

            remove_task_data = self.curpc_mock.removeTaskData
            remove_task_data.assert_any_call(self.task_otdb_id)
    
        def test_do_assignment_logs_when_taks_data_could_not_be_deleted(self):
            # Existing data is reported, but removeTaskData answers that nothing was deleted.
            failure_reason = "file was locked"
            self.sqrpc_mock.getDiskUsageForOTDBId.return_value = {'found': True, 'disk_usage': 10}
            self.curpc_mock.removeTaskData.return_value = {'deleted': False, 'message': failure_reason}

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_args = (self.otdb_id, failure_reason)
            self.logger_mock.warning.assert_any_call(
                "could not remove all data on disk from previous run for otdb_id %s: %s", *expected_args)
    
        def test_do_assignment_notifies_bus_when_task_is_scheduled(self):
            # A TaskScheduled notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskScheduled'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def assertBusNotificationAndLogging(self, content, subject):
            """Assert that a notification was sent on the bus mock and logged at info level.

            The bus is checked for ra_notification_prefix + subject; the log entry is
            checked for the bare subject and the content with newlines replaced by spaces.
            """
            prefixed_subject = ra_notification_prefix + subject
            was_sent = self.ra_notification_bus_send_called_with(content, prefixed_subject)
            self.assertTrue(was_sent)

            flattened_content = str(content).replace('\n', ' ')
            expected_message = 'Sending notification %s: %s' % (subject, flattened_content)
            self.logger_mock.info.assert_any_call(expected_message)
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification(self, datetime_mock):
            self.freeze_time_one_day_in_the_future(datetime_mock)

            # taskSetSpecification raises; the very same exception object must be logged.
            failure = Exception("Error something went wrong")
            self.otdbrpc_mock.taskSetSpecification.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_error = self.logger_mock.error
            log_error.assert_any_call(failure)
    
        def test_do_assignment_logs_unparsable_start_time(self):
            # Replace the start time with text that is not a timestamp.
            specification = self.specification_tree[u'specification']
            specification[u'Observation.startTime'] = "non parse"

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_warning = self.logger_mock.warning
            log_warning.assert_any_call(
                'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...',
                self.otdb_id)
    
        def test_do_assignment_logs_unparsable_stop_time(self):
            # Replace the stop time with text that is not a timestamp.
            specification = self.specification_tree[u'specification']
            specification[u'Observation.stopTime'] = "non parse"

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_warning = self.logger_mock.warning
            log_warning.assert_any_call(
                'cannot parse for start/end time from specification for otdb_id=%s. searching for sane defaults...',
                self.otdb_id)
    
        def test_do_assignment_logs_exception_from_otdbrpc_taskSetSpecification_with_mom_bug(self):
            # taskSetSpecification raises while the mom-bug tree is processed.
            failure = Exception("Error something went wrong")
            self.otdbrpc_mock.taskSetSpecification.side_effect = failure

            self.resourceAssigner.doAssignment(self.mom_bug_specification_tree)

            log_error = self.logger_mock.error
            log_error.assert_any_call(failure)
    
        def test_do_assignment_logs_exception_from_rerpc(self):
            # Calling the rerpc mock raises; the very same exception object must be logged.
            failure = Exception("Error something went wrong")
            self.rerpc_mock.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_error = self.logger_mock.error
            log_error.assert_any_call(failure)
    
        def test_do_assignment_updates_task_on_exception_from_rerpc(self):
            # Calling the rerpc mock raises; the task must end up in 'error'.
            failure = Exception("Error something went wrong")
            self.rerpc_mock.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            update_task = self.rarpc_mock.updateTask
            update_task.assert_any_call(self.task_id, task_status='error')
    
        def test_do_assignment_notifies_bus_on_exception_from_rerpc(self):
            # A TaskError notification carrying the task's ids is expected on the bus.
            expected_subject = 'TaskError'
            expected_content = {
                'radb_id': self.task_id,
                'otdb_id': self.task_otdb_id,
                'mom_id': self.task_mom_id,
            }

            # Calling the rerpc mock raises.
            self.rerpc_mock.side_effect = Exception("Error something went wrong")

            self.resourceAssigner.doAssignment(self.specification_tree)

            self.assertBusNotificationAndLogging(expected_content, expected_subject)
    
        def test_do_assignment_logs_when_notifies_bus_thows_exception(self):
            # send() on the notification bus raises; here the exception's text (not the object) is logged.
            failure = Exception("Error something went wrong")
            self.ra_notification_bus_mock.send.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            expected_message = str(failure)
            self.logger_mock.error.assert_any_call(expected_message)
    
        def test_do_assignment_logs_when_momrpc_getPredecessorIds_throws_exception(self):
            # getPredecessorIds raises; the very same exception object must be logged.
            failure = Exception("Error something went wrong")
            self.momrpc_mock.getPredecessorIds.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_error = self.logger_mock.error
            log_error.assert_any_call(failure)
    
        def test_do_assignment_logs_when_momrpc_getSuccessorIds_throws_exception(self):
            # getSuccessorIds raises; the very same exception object must be logged.
            failure = Exception("Error something went wrong")
            self.momrpc_mock.getSuccessorIds.side_effect = failure

            self.resourceAssigner.doAssignment(self.specification_tree)

            log_error = self.logger_mock.error
            log_error.assert_any_call(failure)
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_logs_exception_stop_time_parsing_on_predecessor(self, datetime_mock):
            # An unparsable stopTime on the first predecessor must result in the parsing
            # ValueError being passed to logger.error.
            self.freeze_time_one_day_in_the_future(datetime_mock)

            self.specification_tree[u'predecessors'][0]['specification'][u'Observation.stopTime'] = 'non parse'
            exception = ValueError('time data \'non parse\' does not match format \'%Y-%m-%d %H:%M:%S\'')
            self.resourceAssigner.doAssignment(self.specification_tree)

            # Workaround: assert_any_call(exception) cannot match here because two distinct
            # exception instances do not compare equal, so compare type and args of the
            # first positional argument of every logger.error call instead.
            found = any(type(args[0]) is type(exception) and args[0].args == exception.args
                        for args, _kwargs in self.logger_mock.error.call_args_list)

            self.assertTrue(found, 'expected ValueError was not passed to logger.error')
    
        @mock.patch('lofar.sas.resourceassignment.resourceassigner.assignment.datetime')
        def test_do_assignment_sets_start_and_end_time_on_unparsable_datetime(self, datetime_mock):
            frozen_now = self.freeze_time_one_day_in_the_future(datetime_mock)

            # Replace the stop time with text that is not a timestamp.
            self.specification_tree['specification'][u'Observation.stopTime'] = 'non parse'

            # Expected defaults: start one minute after the frozen "now", stop one hour later.
            expected_start = frozen_now + datetime.timedelta(minutes=1)
            expected_stop = expected_start + datetime.timedelta(hours=1)

            self.resourceAssigner.doAssignment(self.specification_tree)

            template = ('doAssignment: insertSpecification momId=%s, otdb_id=%s, status=%s, taskType=%s, '
                        'startTime=%s, endTime=%s cluster=%s')
            values = (self.mom_id, self.otdb_id, self.state, self.task_type,
                      expected_start, expected_stop, "CEP4")
            self.logger_mock.info.assert_any_call(template % values)
    
    if __name__ == '__main__':
        # Run the tests of this module when the file is executed as a script.
        unittest.main()