diff --git a/LTA/ltastorageoverview/lib/scraper.py b/LTA/ltastorageoverview/lib/scraper.py
index 318eb781e086c92d9765345a75affd7aab870d2a..64ca33f03a8b3eec556263224b95daf6e218d2ea 100755
--- a/LTA/ltastorageoverview/lib/scraper.py
+++ b/LTA/ltastorageoverview/lib/scraper.py
@@ -131,7 +131,7 @@ class Location:
         foundFiles = []
         foundDirectories = []
 
-        logger.info("Scanning %s", self.path())
+        logger.info("Scanning %s with offset=%s", self.path(), offset)
 
         # the core command: do an srmls call and parse the results
         # srmls can only yield max 900 items in a result, hence we can recurse for the next 900 by using the offset
@@ -410,10 +410,25 @@ def main():
 
     from optparse import OptionParser
     from lofar.common import dbcredentials
+    from lofar.messaging import setQpidLogLevel
+    from lofar.lta.ltastorageoverview.ingesteventhandler import IngestEventHandler
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_BROKER
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_INGEST_NOTIFICATION_QUEUE
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_INGEST_NOTIFICATION_SUBJECTS
 
     # Check the invocation arguments
     parser = OptionParser("%prog [options]", description='runs the lta scraper and stores results in the speficied database.')
     parser.add_option('-j', '--parallel', dest='parallel', type='int', default=8, help='number of parallel srmls jobs to run, default: %default')
+
+    parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
+                      help='Address of the qpid broker, default: %default')
+    parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string',
+                      default=DEFAULT_INGEST_NOTIFICATION_QUEUE,
+                      help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
+    parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string',
+                      default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS,
+                      help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default')
+
     parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
     parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all pqsl queries')
     parser.add_option_group(dbcredentials.options_group(parser))
@@ -422,7 +437,7 @@ def main():
 
     logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                         level=logging.DEBUG if options.verbose else logging.INFO)
-
+    setQpidLogLevel(logging.INFO)
     options.parallel = max(1, min(8*multiprocessing.cpu_count(), options.parallel))
     logger.info("Using maximum number of parallel srmls jobs: %d" % options.parallel)
 
@@ -457,68 +472,70 @@ def main():
     # loop over the locations and spawn ResultGetterThreads to get the results parallel
     # use load balancing over the different sites and with respect to queue lengths
    # do not overload this host system
-    while True:
-
-        cleanupFinishedGetters()
-
-        # spawn new ResultGetterThreads
-        # do not overload this host system
-        while (numLocationsInQueues() > 0 and
-               totalNumGetters() < options.parallel and
-               os.getloadavg()[0] < 4*multiprocessing.cpu_count()):
-            sitesStats = db.visitStats(datetime.datetime.utcnow() - VISIT_INTERVAL)
-
-            for site_name, site_stats in sitesStats.items():
-                numGetters = len(getters[site_name])
-                queue_length = site_stats['queue_length']
-                weight = float(queue_length) / float(20 * (numGetters + 1))
-                if numGetters == 0 and queue_length > 0:
-                    weight = 1e6 # make getterless sites extra important, so each site keeps flowing
-                site_stats['# get'] = numGetters
-                site_stats['weight'] = weight
-
-            totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
-
-            logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
-
-            # now pick a random site using the weights
-            chosen_site_name = None
-            cumul = 0.0
-            r = random()
-            for site_name,site_stats in sitesStats.items():
-                ratio = site_stats['weight']/totalWeight
-                cumul += ratio
-
-                if r <= cumul and site_stats['queue_length'] > 0:
-                    chosen_site_name = site_name
-                    break
+    with IngestEventHandler(dbcreds=dbcreds, busname=options.ingest_notification_busname,
+                            subjects=options.ingest_notification_subjects, broker=options.broker):
+        while True:
 
-            if not chosen_site_name:
-                break
+            cleanupFinishedGetters()
+
+            # spawn new ResultGetterThreads
+            # do not overload this host system
+            while (numLocationsInQueues() > 0 and
+                   totalNumGetters() < options.parallel and
+                   os.getloadavg()[0] < 4*multiprocessing.cpu_count()):
+                sitesStats = db.visitStats(datetime.datetime.utcnow() - VISIT_INTERVAL)
+
+                for site_name, site_stats in sitesStats.items():
+                    numGetters = len(getters[site_name])
+                    queue_length = site_stats['queue_length']
+                    weight = float(queue_length) / float(20 * (numGetters + 1))
+                    if numGetters == 0 and queue_length > 0:
+                        weight = 1e6 # make getterless sites extra important, so each site keeps flowing
+                    site_stats['# get'] = numGetters
+                    site_stats['weight'] = weight
+
+                totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
+
+                logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
+
+                # now pick a random site using the weights
+                chosen_site_name = None
+                cumul = 0.0
+                r = random()
+                for site_name,site_stats in sitesStats.items():
+                    ratio = site_stats['weight']/totalWeight
+                    cumul += ratio
+
+                    if r <= cumul and site_stats['queue_length'] > 0:
+                        chosen_site_name = site_name
+                        break
+
+                if not chosen_site_name:
+                    break
 
-            chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
-            db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
+                chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
+                db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
 
-            logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
+                logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
 
-            # make and start a new ResultGetterThread the location deque of the chosen site
-            newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
-            newGetter.start()
-            getters[chosen_site_name].append(newGetter)
+                # make and start a new ResultGetterThread the location deque of the chosen site
+                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
+                newGetter.start()
+                getters[chosen_site_name].append(newGetter)
 
-        cleanupFinishedGetters()
+            cleanupFinishedGetters()
 
-        logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (numLocationsInQueues(),
-                                                                                                         totalNumGetters(),
-                                                                                                         ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()]),
-                                                                                                         os.getloadavg()[0]))
+            logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (numLocationsInQueues(),
+                                                                                                             totalNumGetters(),
+                                                                                                             ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()]),
+                                                                                                             os.getloadavg()[0]))
 
-        # sleep before main loop next iteration
-        # to wait for some results
-        # and some getters to finish
-        time.sleep(5 if numLocationsInQueues() <= options.parallel else 0.25)
+            # sleep before main loop next iteration
+            # to wait for some results
+            # and some getters to finish
+            time.sleep(5 if numLocationsInQueues() <= options.parallel else 0.25)
 
-    # all locations were processed
+        # all locations were processed
 
 if __name__ == "__main__":
     main()
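A note on the first hunk: the `offset` now included in the log line exists because, as the surrounding comment says, srmls can return at most 900 items per call, so a directory is read in chunks by re-issuing the call with a growing offset. A minimal standalone sketch of that paging pattern; `list_chunk` is a hypothetical stand-in for the real srmls invocation, not part of the scraper's API:

# Paging sketch assuming a 900-items-per-call listing command such as srmls.
# list_chunk(path, offset) is hypothetical and must return at most
# MAX_ITEMS_PER_CALL entries starting at the given offset.
MAX_ITEMS_PER_CALL = 900

def list_all(path, list_chunk):
    """Gather a complete directory listing chunk by chunk,
    advancing the offset after every full chunk."""
    items = []
    offset = 0
    while True:
        chunk = list_chunk(path, offset)     # at most 900 entries per call
        items.extend(chunk)
        if len(chunk) < MAX_ITEMS_PER_CALL:  # a short chunk means we are done
            return items
        offset += MAX_ITEMS_PER_CALL         # ask for the next 900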
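The load-balancing comments in the last hunk describe weighted random (roulette-wheel) site selection: a site's weight grows with its queue length, shrinks with the number of getters already running for it, and a getterless site with pending work gets a 1e6 boost so every site keeps flowing. A self-contained sketch of just that selection step, with the weight formula taken from the diff; `pick_site` itself is a hypothetical helper, not a function in scraper.py:

import random

def pick_site(sites_stats):
    """Pick a site name by roulette-wheel selection, weighted as in the
    scraper's main loop; returns None when no queued work is picked.

    sites_stats maps site name -> {'queue_length': int, '# get': int}.
    """
    # weight ~ queue_length / (20 * (numGetters + 1)); a site with queued
    # work but no running getters gets a huge weight so it keeps flowing
    for stats in sites_stats.values():
        num_getters = stats.get('# get', 0)
        queue_length = stats['queue_length']
        weight = float(queue_length) / float(20 * (num_getters + 1))
        if num_getters == 0 and queue_length > 0:
            weight = 1e6
        stats['weight'] = weight

    total_weight = max(1.0, sum(s['weight'] for s in sites_stats.values()))

    # walk the cumulative distribution until the random draw r is covered
    r = random.random()
    cumul = 0.0
    for site_name, stats in sites_stats.items():
        cumul += stats['weight'] / total_weight
        if r <= cumul and stats['queue_length'] > 0:
            return site_name
    return None

# example: the getterless 'siteB' with pending work wins almost always
print(pick_site({'siteA': {'queue_length': 100, '# get': 2},
                 'siteB': {'queue_length': 5, '# get': 0}}))

As in the diff, a draw can come back empty (None), which is exactly what lets the spawn loop break when no site has queued work.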
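Finally, the bulk of the last hunk is pure re-indentation: the scrape loop moves inside a `with IngestEventHandler(...)` block, so the ingest-notification listener lives for exactly as long as the scraper runs and is torn down even if the loop raises. That relies on the standard context-manager contract, illustrated below with a dummy class; this is not the real IngestEventHandler from lofar.lta.ltastorageoverview.ingesteventhandler:

# Illustrative only: a dummy with the same context-manager shape
# that IngestEventHandler is used in above.
class DummyIngestListener:
    def __enter__(self):
        print("listening for ingest notifications")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        print("listener stopped, resources released")
        return False  # propagate any exception raised by the scrape loop

with DummyIngestListener():
    pass  # the scraper's 'while True' loop would run here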