Commit db09ffc9 authored by Jorrit Schaap


SW-283: start IngestEventHandler with configurable and proper default options in scraper, so that the scraper will automatically visit the recent additions from ingest.
parent df48ede2
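
This commit wraps the scraper's main loop in an IngestEventHandler, so that notifications from ingest schedule freshly added directories for a visit. A minimal sketch of the usage pattern introduced here, assuming a dbcreds object from lofar.common.dbcredentials; scrape_sites and run_scraper are hypothetical stand-ins, not the actual scraper API:

    from lofar.lta.ltastorageoverview.ingesteventhandler import (
        IngestEventHandler, DEFAULT_BROKER,
        DEFAULT_INGEST_NOTIFICATION_QUEUE, DEFAULT_INGEST_NOTIFICATION_SUBJECTS)

    def scrape_sites():
        pass  # hypothetical stand-in for the spawn/visit loop in main() below

    def run_scraper(dbcreds):
        # While the 'with' block is active the handler listens for ingest
        # notifications, so the scraper automatically visits recent additions
        # from ingest (per the commit message).
        with IngestEventHandler(dbcreds=dbcreds,
                                busname=DEFAULT_INGEST_NOTIFICATION_QUEUE,
                                subjects=DEFAULT_INGEST_NOTIFICATION_SUBJECTS,
                                broker=DEFAULT_BROKER):
            scrape_sites()
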
@@ -131,7 +131,7 @@ class Location:
         foundFiles = []
         foundDirectories = []

-        logger.info("Scanning %s", self.path())
+        logger.info("Scanning %s with offset=%s", self.path(), offset)

         # the core command: do an srmls call and parse the results
         # srmls can only yield max 900 items in a result, hence we can recurse for the next 900 by using the offset
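
The comment above describes offset-based paging: srmls returns at most 900 entries per call, so the scan repeats with a growing offset until a short page signals the end. A generic sketch of that pattern, with list_page and PAGE_SIZE as illustrative names rather than the scraper's actual API:

    PAGE_SIZE = 900  # srmls yields at most 900 items per call

    def list_all(path, list_page):
        # Collect every entry under 'path' by paging with an offset;
        # list_page(path, offset) stands in for the srmls call and is assumed
        # to return up to PAGE_SIZE entries starting at 'offset'.
        entries = []
        offset = 0
        while True:
            page = list_page(path, offset)
            entries.extend(page)
            if len(page) < PAGE_SIZE:  # short page: nothing left
                return entries
            offset += PAGE_SIZE
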
@@ -410,10 +410,25 @@ def main():
     from optparse import OptionParser
     from lofar.common import dbcredentials
     from lofar.messaging import setQpidLogLevel
+    from lofar.lta.ltastorageoverview.ingesteventhandler import IngestEventHandler
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_BROKER
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_INGEST_NOTIFICATION_QUEUE
+    from lofar.lta.ltastorageoverview.ingesteventhandler import DEFAULT_INGEST_NOTIFICATION_SUBJECTS

     # Check the invocation arguments
     parser = OptionParser("%prog [options]", description='runs the lta scraper and stores results in the specified database.')
     parser.add_option('-j', '--parallel', dest='parallel', type='int', default=8, help='number of parallel srmls jobs to run, default: %default')
+    parser.add_option('-q', '--broker', dest='broker', type='string', default=DEFAULT_BROKER,
+                      help='Address of the qpid broker, default: %default')
+    parser.add_option('--ingest_notification_busname', dest='ingest_notification_busname', type='string',
+                      default=DEFAULT_INGEST_NOTIFICATION_QUEUE,
+                      help='Name of the notification bus exchange on the qpid broker on which the ingest notifications are published, default: %default')
+    parser.add_option('--ingest_notification_subjects', dest='ingest_notification_subjects', type='string',
+                      default=DEFAULT_INGEST_NOTIFICATION_SUBJECTS,
+                      help='Subject(s) to listen for on the ingest notification bus exchange on the qpid broker, default: %default')
     parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
     parser.add_option('-Q', '--log-queries', dest='log_queries', action='store_true', help='log all psql queries')
     parser.add_option_group(dbcredentials.options_group(parser))
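
With these options the broker and notification exchange are configurable at start-up and fall back to the package defaults. A hypothetical invocation (the executable name, broker host, and bus name are illustrative; the flags are the ones added above):

    ltastorageoverviewscraper --broker=myqpidbroker.example.com \
        --ingest_notification_busname=lofar.lta.ingest.notification \
        -j 8 -V
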
@@ -422,7 +437,7 @@ def main():
     logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
                         level=logging.DEBUG if options.verbose else logging.INFO)
     setQpidLogLevel(logging.INFO)

     options.parallel = max(1, min(8*multiprocessing.cpu_count(), options.parallel))

     logger.info("Using maximum number of parallel srmls jobs: %d" % options.parallel)
@@ -457,68 +472,70 @@ def main():
     # loop over the locations and spawn ResultGetterThreads to get the results parallel
     # use load balancing over the different sites and with respect to queue lengths
     # do not overload this host system
-    while True:
-        cleanupFinishedGetters()
-
-        # spawn new ResultGetterThreads
-        # do not overload this host system
-        while (numLocationsInQueues() > 0 and
-               totalNumGetters() < options.parallel and
-               os.getloadavg()[0] < 4*multiprocessing.cpu_count()):
-            sitesStats = db.visitStats(datetime.datetime.utcnow() - VISIT_INTERVAL)
-
-            for site_name, site_stats in sitesStats.items():
-                numGetters = len(getters[site_name])
-                queue_length = site_stats['queue_length']
-                weight = float(queue_length) / float(20 * (numGetters + 1))
-                if numGetters == 0 and queue_length > 0:
-                    weight = 1e6  # make getterless sites extra important, so each site keeps flowing
-
-                site_stats['# get'] = numGetters
-                site_stats['weight'] = weight
-
-            totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
-            logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
-
-            # now pick a random site using the weights
-            chosen_site_name = None
-            cumul = 0.0
-            r = random()
-
-            for site_name,site_stats in sitesStats.items():
-                ratio = site_stats['weight']/totalWeight
-                cumul += ratio
-
-                if r <= cumul and site_stats['queue_length'] > 0:
-                    chosen_site_name = site_name
-                    break
-
-            if not chosen_site_name:
-                break
-
-            chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
-            db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
-
-            logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
-
-            # make and start a new ResultGetterThread for the location deque of the chosen site
-            newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
-            newGetter.start()
-            getters[chosen_site_name].append(newGetter)
-
-            cleanupFinishedGetters()
-
-        logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (numLocationsInQueues(),
-                                                                                                          totalNumGetters(),
-                                                                                                          ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()]),
-                                                                                                          os.getloadavg()[0]))
-
-        # sleep before main loop next iteration
-        # to wait for some results
-        # and some getters to finish
-        time.sleep(5 if numLocationsInQueues() <= options.parallel else 0.25)
-
-    # all locations were processed
+    with IngestEventHandler(dbcreds=dbcreds, busname=options.ingest_notification_busname,
+                            subjects=options.ingest_notification_subjects, broker=options.broker):
+        while True:
+            cleanupFinishedGetters()
+
+            # spawn new ResultGetterThreads
+            # do not overload this host system
+            while (numLocationsInQueues() > 0 and
+                   totalNumGetters() < options.parallel and
+                   os.getloadavg()[0] < 4*multiprocessing.cpu_count()):
+                sitesStats = db.visitStats(datetime.datetime.utcnow() - VISIT_INTERVAL)
+
+                for site_name, site_stats in sitesStats.items():
+                    numGetters = len(getters[site_name])
+                    queue_length = site_stats['queue_length']
+                    weight = float(queue_length) / float(20 * (numGetters + 1))
+                    if numGetters == 0 and queue_length > 0:
+                        weight = 1e6  # make getterless sites extra important, so each site keeps flowing
+
+                    site_stats['# get'] = numGetters
+                    site_stats['weight'] = weight
+
+                totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
+                logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
+
+                # now pick a random site using the weights
+                chosen_site_name = None
+                cumul = 0.0
+                r = random()
+
+                for site_name,site_stats in sitesStats.items():
+                    ratio = site_stats['weight']/totalWeight
+                    cumul += ratio
+
+                    if r <= cumul and site_stats['queue_length'] > 0:
+                        chosen_site_name = site_name
+                        break
+
+                if not chosen_site_name:
+                    break
+
+                chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
+                db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
+
+                logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
+
+                # make and start a new ResultGetterThread for the location deque of the chosen site
+                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
+                newGetter.start()
+                getters[chosen_site_name].append(newGetter)
+
+                cleanupFinishedGetters()
+
+            logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s load_5min: %.1f' % (numLocationsInQueues(),
+                                                                                                              totalNumGetters(),
+                                                                                                              ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()]),
+                                                                                                              os.getloadavg()[0]))
+
+            # sleep before main loop next iteration
+            # to wait for some results
+            # and some getters to finish
+            time.sleep(5 if numLocationsInQueues() <= options.parallel else 0.25)
+
+        # all locations were processed

 if __name__ == "__main__":
     main()
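
The main loop above balances load with weighted random site selection: a site's weight grows with its srmls queue length and shrinks with the number of getters already serving it, a site that has queued work but no getters gets a huge weight so it never starves, and the winner is drawn roulette-wheel style in proportion to its weight. A standalone sketch of that selection, with an illustrative 'sites' structure instead of the real visitStats() result:

    import random

    def choose_site(sites):
        # sites: {name: {'queue_length': int, 'num_getters': int}} (illustrative)
        weights = {}
        for name, stats in sites.items():
            weight = float(stats['queue_length']) / float(20 * (stats['num_getters'] + 1))
            if stats['num_getters'] == 0 and stats['queue_length'] > 0:
                weight = 1e6  # idle site with work: top priority so every site keeps flowing
            weights[name] = weight

        total = max(1.0, sum(weights.values()))

        # roulette-wheel draw over the normalized weights
        r = random.random()
        cumul = 0.0
        for name, weight in weights.items():
            cumul += weight / total
            if r <= cumul and sites[name]['queue_length'] > 0:
                return name
        return None  # no site has queued work

    # choose_site({'sara': {'queue_length': 40, 'num_getters': 2},
    #              'juelich': {'queue_length': 5, 'num_getters': 0}})
    # nearly always returns 'juelich' because of its 1e6 idle-site weight.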