diff --git a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py index d17b25e59712a5811366ba1222d5db5dd302823d..b13a1c3e7dc362ccfe5cb0987d99cbcb74d2b42c 100755 --- a/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py +++ b/SAS/ResourceAssignment/RAtoOTDBTaskSpecificationPropagator/lib/translator.py @@ -189,7 +189,7 @@ class RAtoOTDBTranslator(): if prop.get('saps') is None or 'nr_of_cs_stokes' not in prop: continue - nr_cs_stokes = prop['nr_of_cs_stokes'] + nr_cs_stokes = prop['nr_of_cs_stokes'] # the 'cs_stokes' term here can also mean cv XXYY for sap in prop['saps']: if 'nr_of_cs_files' not in sap['properties'] or sap['properties']['nr_of_cs_files'] < 1: continue @@ -204,20 +204,15 @@ class RAtoOTDBTranslator(): filenames_per_sap.extend([[]] * added_sap_nrs) # idem if nr_parts_per_tab_per_sap[sap_nr] == 0: # be robust to out of order sap nrs - nr_parts_per_tab_per_sap[sap_nr] = sap['properties']['nr_of_cs_parts'] # independent of claim size + nr_parts_per_tab_per_sap[sap_nr] = sap['properties']['nr_of_cs_parts'] # regardless of claim size if 'is_tab_nr' in sap['properties']: is_tab_nrs_per_sap[sap_nr] = sap['properties']['is_tab_nr'] - # Determine the SAP's nr of CS tabs and parts in the prop's claim. - nr_cs_tabs = sap['properties']['nr_of_tabs'] # next, subtract IS tab if any - if 'nr_of_is_stokes' in prop and 'nr_of_is_files' in prop['properties']: - nr_cs_tabs -= prop['properties']['nr_of_is_files'] / prop['nr_of_is_stokes'] - nr_parts = sap['properties']['nr_of_cs_files'] / (nr_cs_stokes * nr_cs_tabs) # in this prop's claim! - # Stokes (IQUV/XXYY) always come together in a claim, but we must match COBALT's filename order, # which is parts within stokes. We don't yet know the total nr of files. - # First, do stokes within parts, then reorder when we have all names. + # First, do stokes within parts, then later reorder when we have all names. # The tab nr dim must be and remain the outer dim, even though it's sliced from the parts dim. + nr_parts = sap['properties']['nr_of_cs_files'] / nr_cs_stokes # in this prop's claim! nparts_tab = nr_parts_per_tab_per_sap[sap_nr] # alias for readability; this is also per stokes while nr_parts > 0: # loops over tab nrs tab_nr = next_tab_part_nrs_per_sap[sap_nr] / nparts_tab @@ -234,8 +229,8 @@ class RAtoOTDBTranslator(): (otdb_id, sap_nr, tab_nr, stokes_nr, part_nr)) skip.append("0") # what's this for? - next_tab_part_nrs_per_sap[sap_nr] += nr_parts - total_nr_files += nr_parts * nr_cs_stokes + next_tab_part_nrs_per_sap[sap_nr] += nparts_remain + total_nr_files += nparts_remain * nr_cs_stokes nr_parts -= nparts_remain if max_cs_sap_nr == -1: @@ -243,7 +238,7 @@ class RAtoOTDBTranslator(): logger.info('CreateCoherentStokes: total_nr_files = %d', total_nr_files) - # Concat lists in locations_per_sap and in filenames_per_sap, and reorder parts and stokes dims. + # Reorder parts and stokes dims, then concat lists in locations_per_sap and in filenames_per_sap locations2_per_sap = [[]] * len(locations_per_sap) filenames2_per_sap = [[]] * len(filenames_per_sap) for sap_nr in xrange(max_cs_sap_nr + 1): @@ -259,7 +254,7 @@ class RAtoOTDBTranslator(): locations_per_sap[sap_nr][tab_nr * nr_parts * nr_cs_stokes + part_nr * nr_cs_stokes + stokes_nr] filenames2_per_sap[sap_nr][tab_nr * nr_parts * nr_cs_stokes + stokes_nr * nr_parts + part_nr] = \ filenames_per_sap[sap_nr][tab_nr * nr_parts * nr_cs_stokes + part_nr * nr_cs_stokes + stokes_nr] - + locations = [] filenames = [] for i in xrange(len(locations_per_sap)): @@ -302,17 +297,10 @@ class RAtoOTDBTranslator(): locations_per_sap.extend([[]] * added_sap_nrs) # list of parts per sap (max 1 IS tab per sap) filenames_per_sap.extend([[]] * added_sap_nrs) # idem - #nr_parts_per_tab = sap['properties']['nr_of_cs_parts'] # independent of claim size (atm not needed for IS, as max 1 IS tab per sap) - - # Determine the SAP's nr of IS tabs and parts in the prop's claim. - nr_is_tabs = sap['properties']['nr_of_tabs'] # next, subtract CS tabs if any - if 'nr_of_cs_stokes' in prop and 'nr_of_cs_files' in prop['properties']: - nr_is_tabs -= prop['properties']['nr_of_cs_files'] / prop['nr_of_cs_stokes'] - nr_parts = sap['properties']['nr_of_is_files'] / (nr_is_stokes * nr_is_tabs) # in this prop's claim! - # Stokes (IQUV) always come together in a claim, but we must match COBALT's filename order, # which is parts within stokes. We don't yet know the total nr of files. - # First, do stokes within parts, then reorder when we have all names. + # First, do stokes within parts, then later reorder when we have all names. + nr_parts = sap['properties']['nr_of_is_files'] / nr_is_stokes # in this prop's claim! next_part_nr = next_tab_part_nrs_per_sap[sap_nr] for part_nr in xrange(next_part_nr, next_part_nr + nr_parts): for stokes_nr in xrange(nr_is_stokes): diff --git a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py index dd4bffa3ad3b9f81063a97e45af069d2f654426a..7ebc74002f846b9db8cea2e8180cecad4e30f21b 100755 --- a/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py +++ b/SAS/ResourceAssignment/ResourceAssigner/lib/assignment.py @@ -708,7 +708,6 @@ class ResourceAssigner(): logger.info('mergeClaims: merging claims for the same resource across %d claims', len(claims)) summablePropTypes = {db_resource_prop_types['nr_of_' + dt + '_files'] for dt in dtypes} - summablePropTypes.add(db_resource_prop_types['nr_of_tabs']) claims.sort( key=lambda claim: (claim['resource_id'], claim.get('properties')) ) i = 1 diff --git a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql index 6ec79bb2041c08d2764b21af01a5fabc3e614d24..611f7b5f8ff60c83ac09d335d9aa9bf2852191c7 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql +++ b/SAS/ResourceAssignment/ResourceAssignmentDatabase/sql/add_resource_allocation_statics.sql @@ -7,7 +7,7 @@ INSERT INTO resource_allocation.task_status VALUES (200, 'prepared'), (300, 'app (1150, 'error'), (1200, 'obsolete'); -- This is the list from OTDB, we'll need to merge it with the list from MoM in the future, might use different indexes? INSERT INTO resource_allocation.task_type VALUES (0, 'observation'),(1, 'pipeline'); -- We'll need more types INSERT INTO resource_allocation.resource_claim_status VALUES (0, 'claimed'), (1, 'allocated'), (2, 'conflict'); -INSERT INTO resource_allocation.resource_claim_property_type VALUES (0, 'nr_of_is_files'),(1, 'nr_of_cs_files'),(2, 'nr_of_uv_files'),(3, 'nr_of_im_files'),(4, 'nr_of_img_files'),(5, 'nr_of_pulp_files'),(6, 'nr_of_cs_stokes'),(7, 'nr_of_is_stokes'),(8, 'is_file_size'),(9, 'cs_file_size'),(10, 'uv_file_size'),(11, 'im_file_size'),(12, 'img_file_size'),(13, 'nr_of_pulp_files'),(14, 'nr_of_tabs'),(15, 'start_sb_nr'),(16,'uv_otdb_id'),(17,'cs_otdb_id'),(18,'is_otdb_id'),(19,'im_otdb_id'),(20,'img_otdb_id'),(21,'pulp_otdb_id'),(22, 'is_tab_nr'),(23, 'start_sbg_nr'),(24, 'pulp_file_size'),(25, 'nr_of_cs_parts'),(26, 'nr_of_is_parts'); +INSERT INTO resource_allocation.resource_claim_property_type VALUES (0, 'nr_of_is_files'),(1, 'nr_of_cs_files'),(2, 'nr_of_uv_files'),(3, 'nr_of_im_files'),(4, 'nr_of_img_files'),(5, 'nr_of_pulp_files'),(6, 'nr_of_cs_stokes'),(7, 'nr_of_is_stokes'),(8, 'is_file_size'),(9, 'cs_file_size'),(10, 'uv_file_size'),(11, 'im_file_size'),(12, 'img_file_size'),(13, 'nr_of_pulp_files'),(14, 'nr_of_cs_parts'),(15, 'start_sb_nr'),(16,'uv_otdb_id'),(17,'cs_otdb_id'),(18,'is_otdb_id'),(19,'im_otdb_id'),(20,'img_otdb_id'),(21,'pulp_otdb_id'),(22, 'is_tab_nr'),(23, 'start_sbg_nr'),(24, 'pulp_file_size'); INSERT INTO resource_allocation.resource_claim_property_io_type VALUES (0, 'output'),(1, 'input'); INSERT INTO resource_allocation.config VALUES (0, 'max_fill_ratio_CEP4_storage', '0.85'), (1, 'claim_timeout', '172800'), (2, 'min_inter_task_delay', '60'), (3, 'max_fill_ratio_CEP4_bandwidth', '0.75'); -- Just some values 172800 is two days in seconds INSERT INTO resource_allocation.conflict_reason diff --git a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py index 9f63f91f08f40a927f95a732006f16d2b4acc7a7..10229182a7524f43f1a212fa0a9c40699c2d9b47 100644 --- a/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py +++ b/SAS/ResourceAssignment/ResourceAssignmentEstimator/resource_estimators/observation.py @@ -64,7 +64,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): The following return value example is for an obs duration of 240.0 s and 3 data product types for 2 clusters. All values under saps are for the estimate only for that sap. - However, nr_of_cs_parts is per tab per stokes across all CS estimates for this sap. Idem for nr_of_is_parts. + However, nr_of_cs_parts is per SAP (per stokes component). More examples at scu001:/opt/lofar/var/log/raestimatorservice.log { 'errors': [], @@ -78,26 +78,26 @@ class ObservationResourceEstimator(BaseResourceEstimator): {'sap_nr': 2, 'properties': {'uv_file_size': 1073741824, 'nr_of_uv_files': 1, 'start_sb_nr': 80}} ] } - }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part in an IQUV/XXYY quad - 'resource_count': 35, 'root_resource_group': 'DRAGNET', - 'output_files': { # per tab part + }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part, incl all stokes + 'resource_count': 34, 'root_resource_group': 'DRAGNET', + 'output_files': { 'cs': {'nr_of_cs_stokes': 4, 'identifications': [...]}, 'saps': [{'sap_nr': 0, 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, - 'nr_of_tabs': 1, 'nr_of_cs_parts': 2}}] + 'nr_of_cs_parts': 2}}] # parts per tab for this sap } - }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part in an IQUV/XXYY quad + }, {'resource_types': {'bandwidth': 71582789, 'storage': 2147483648}, # per tab part, incl all stokes 'resource_count': 6, 'root_resource_group': 'DRAGNET', - 'output_files': { # per tab part + 'output_files': { 'cs': {'nr_of_cs_stokes': 4, 'identifications': [...]}, 'saps': [{'sap_nr': 1, 'properties': {'cs_file_size': 536870912, 'nr_of_cs_files': 4, - 'nr_of_tabs': 1, 'nr_of_cs_parts': 1}}] + 'is_tab_nr': 0, 'nr_of_cs_parts': 1}}] # parts per tab for this sap } - }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # per tab part + }, {'resource_types': {'bandwidth': 17895698, 'storage': 536870912}, # per tab part, incl all stokes 'resource_count': 1, 'root_resource_group': 'DRAGNET', - 'output_files': { # per tab part + 'output_files': { 'is': {'nr_of_is_stokes': 1, 'identifications': [...]}, 'saps': [{'sap_nr': 1, 'properties': {'is_file_size': 536870912, 'nr_of_is_files': 1, - 'nr_of_tabs': 1, 'nr_of_is_parts': 1, 'is_tab_nr': 0}}] + 'is_tab_nr': 0}}] # IS can have >1 parts, but atm max 1 IS TAB per SAP } }] } @@ -134,7 +134,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): errors.append('Produced observation resource estimate list is empty!') logger.error('empty observation resource estimate list!') - logger.info('Observation resource estimate(s): {}'.format(estimates)) + logger.debug('Observation resource estimate(s): {}'.format(estimates)) result = {'errors': errors, 'estimates': estimates} return result @@ -242,10 +242,12 @@ class ObservationResourceEstimator(BaseResourceEstimator): nr_subbands_per_file = min(subbands_per_file, nr_subbands) nr_coherent_tabs = 0 + is_tab_nr = -1 nr_tabs = parset.getInt('Observation.Beam[%d].nrTiedArrayBeams' % sap_nr) for tab_nr in xrange(nr_tabs): if not parset.getBool("Observation.Beam[%d].TiedArrayBeam[%d].coherent" % (sap_nr, tab_nr)): + is_tab_nr = tab_nr logger.info("coherentstokes: skipping incoherent tab") continue nr_coherent_tabs += 1 @@ -267,14 +269,14 @@ class ObservationResourceEstimator(BaseResourceEstimator): return None # Keep XXYY/IQUV together (>1 parts still possible). - # Else translator to parset filenames cannot know which stokes (nr_of_stokes property too coarse). + # Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse). # Also for complex voltages (XXYY) only: pipeline needs all 4 XXYY accessible from the same node. # # NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands. # Simplify: compute a single (i.e. max) file size for all TABs or TAB parts. file_size = int(nr_subbands_per_file * size_per_subband) # bytes storage = file_size * nr_coherent # bytes - bandwidth = int(ceil(8 * file_size / duration)) # bits/second + bandwidth = int(ceil(8 * storage / duration)) # bits/second nr_parts_per_tab = int(ceil(nr_subbands / float(nr_subbands_per_file))) # thus per tab per stokes est = {'resource_types': {}, 'root_resource_group': root_resource_group, @@ -283,7 +285,9 @@ class ObservationResourceEstimator(BaseResourceEstimator): est['resource_types']['bandwidth'] = bandwidth est['resource_count'] = nr_coherent_tabs * nr_parts_per_tab est['output_files']['saps'] = [{'sap_nr': sap_nr, 'properties': {'cs_file_size': file_size, - 'nr_of_cs_files': nr_coherent, 'nr_of_tabs': 1, 'nr_of_cs_parts': nr_parts_per_tab}}] + 'nr_of_cs_files': nr_coherent, 'nr_of_cs_parts': nr_parts_per_tab}}] + if is_tab_nr != -1: # translator to filenames needs to know: it may not have all CS+IS info in one claim + est['output_files']['saps'][0]['properties']['is_tab_nr'] = is_tab_nr estimates.append(est) logger.debug("Coherent Stokes data estimates: {}".format(estimates)) @@ -349,14 +353,14 @@ class ObservationResourceEstimator(BaseResourceEstimator): return None # Keep IQUV together (>1 parts still possible). - # Else translator to parset filenames cannot know which stokes (nr_of_stokes property too coarse). + # Else translator to parset filenames cannot know which stokes (nr_of_XX_stokes property too coarse). # # NOTE: If a TAB is split into parts, then the last TAB part may contain fewer subbands. # Simplify: compute a single (i.e. max) file size for all TABs or TAB parts. - file_size = int(subbands_per_file * size_per_subband) # bytes + file_size = int(nr_subbands_per_file * size_per_subband) # bytes storage = file_size * nr_incoherent # bytes - bandwidth = int(ceil(8 * file_size / duration)) # bits/second - nr_parts_per_tab = int(ceil(nr_subbands / float(subbands_per_file))) # thus per tab per stokes + bandwidth = int(ceil(8 * storage / duration)) # bits/second + nr_parts_per_tab = int(ceil(nr_subbands / float(nr_subbands_per_file))) # thus per tab per stokes est = {'resource_types': {}, 'root_resource_group': root_resource_group, 'output_files': {'is': {'nr_of_is_stokes': nr_incoherent, 'identifications': idents}}} @@ -364,7 +368,7 @@ class ObservationResourceEstimator(BaseResourceEstimator): est['resource_types']['bandwidth'] = bandwidth est['resource_count'] = nr_incoherent_tabs * nr_parts_per_tab est['output_files']['saps'] = [{'sap_nr': sap_nr, 'properties': {'is_file_size': file_size, - 'nr_of_is_files': nr_incoherent, 'nr_of_tabs': 1, 'nr_of_is_parts': nr_parts_per_tab, 'is_tab_nr': is_tab_nr}}] + 'nr_of_is_files': nr_incoherent, 'is_tab_nr': is_tab_nr}}] estimates.append(est) logger.debug("Incoherent Stokes data estimates: {}".format(estimates))