diff --git a/LTA/ltastorageoverview/lib/scraper.py b/LTA/ltastorageoverview/lib/scraper.py
index e73bc12bd2b89b89105a1ff4c08035a2c69ae9c5..de1d0a4218bb37f53e2b1bec73ae01c24d0f8099 100755
--- a/LTA/ltastorageoverview/lib/scraper.py
+++ b/LTA/ltastorageoverview/lib/scraper.py
@@ -123,7 +123,7 @@ class Location:
 
         # the core command: do an srmls call and parse the results
         # srmls can only yield max 900 items in a result, hence we can recurse for the next 900 by using the offset
-        lexar_nr = randint(1,4)
+        lexar_nr = randint(1,2)
         cmd = ['ssh', 'ingest@10.178.1.%d' % lexar_nr, "bash", "-c", "\'source %s;srmls -l -count=900 -offset=%d %s%s\'" % (
             '/globalhome/ingest/service/bin/init.sh' if lexar_nr <= 2 else '/globalhome/ingest/.grid/.ingest_profile',
             offset,
@@ -255,7 +255,6 @@ class ResultGetterThread(threading.Thread):
 
             dir_id = dir['dir_id']
             dir_name = dir['dir_name']
-            self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
 
             site_id = dir['site_id']
             site = self.db.site(site_id)
@@ -269,16 +268,20 @@ class ResultGetterThread(threading.Thread):
                 logger.info(result)
 
                 if result.files:
-                    result_file_tuple_set = set( (file.filename, file.size, file.created_at, dir_id) for file in result.files)
+                    result_file_tuple_set = set( (file.filename.split('/')[-1], int(file.size), file.created_at, dir_id) for file in result.files)
 
                     #check for already known files in the db
-                    known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
+                    known_file_tuple_set = set((str(file['name']), int(file['size']), file['creation_date'], dir_id) for file in self.db.filesInDirectory(dir_id))
                     new_file_tuple_set = result_file_tuple_set - known_file_tuple_set;
 
                     logger.info("%s %s: %d out of %d files are new, and %d are already known", site['name'], dir_name, len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
 
                     if new_file_tuple_set:
-                        self.db.insertFileInfos(new_file_tuple_set)
+                        file_ids = self.db.insertFileInfos(new_file_tuple_set)
+
+                        if len(file_ids) != len(new_file_tuple_set):
+                            logger.info('Rescheduling %s for new visit.' % (location.path(),))
+                            self.db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
 
             subDirectoryNames = [loc.directory for loc in result.subDirectories]
 
@@ -291,8 +294,11 @@
                 logger.info("%s %s: %d out of %d subdirs are new, and %d are already known", site['name'], dir_name, len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
 
                 if new_subdir_name_set:
-                    not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
-                    self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
+                    subdir_ids = self.db.insertSubDirectories(new_subdir_name_set, dir_id)
+
+                    if len(subdir_ids) != len(new_subdir_name_set):
+                        logger.info('Rescheduling %s for new visit.' % (location.path(),))
+                        self.db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
 
         except (SrmlsException, ParseException) as e:
             logger.error('Error while scanning %s\n%s' % (location.path(), str(e)))
@@ -308,15 +314,15 @@ class ResultGetterThread(threading.Thread):
 
 def populateDbWithLTASitesAndRootDirs(db):
     if not db.sites():
-        db.insertSite('target', 'srm://srm.target.rug.nl:8444')
-        db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
+        #db.insertSite('target', 'srm://srm.target.rug.nl:8444')
+        #db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
         db.insertSite('sara', 'srm://srm.grid.sara.nl:8443')
         db.insertSite('juelich', 'srm://lofar-srm.fz-juelich.de:8443')
         db.insertSite('poznan', 'srm://lta-head.lofar.psnc.pl:8443')
 
-        db.insertRootDirectory('target', '/lofar/ops')
-        db.insertRootDirectory('target', '/lofar/ops/disk')
-        db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
+        #db.insertRootDirectory('target', '/lofar/ops')
+        #db.insertRootDirectory('target', '/lofar/ops/disk')
+        #db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/ops')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/user')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/software')
@@ -325,9 +331,6 @@ def populateDbWithLTASitesAndRootDirs(db):
         db.insertRootDirectory('juelich', '/pnfs/fz-juelich.de/data/lofar/ops')
         db.insertRootDirectory('poznan', '/lofar/ops/projects')
 
-        for dir_id in [x['dir_id'] for x in db.rootDirectories()]:
-            db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
-
 
 def main():
     '''the main function scanning all locations and gathering the results'''
@@ -361,7 +364,7 @@ def main():
    # some helper functions
    def numLocationsInQueues():
        '''returns the total number of locations in the queues'''
-        return db.numDirectoriesNotVisitedSince(datetime.datetime.utcnow() - datetime.timedelta(days=1))
+        return db.numDirectoriesNotVisitedSince(datetime.datetime.utcnow() - datetime.timedelta(days=7))
 
    def totalNumGetters():
        '''returns the total number of parallel running ResultGetterThreads'''
@@ -391,7 +394,7 @@ def main():
            #(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
            #totalNumGetters() < 2.5*multiprocessing.cpu_count())):
            while numLocationsInQueues() > 0 and (totalNumGetters() < options.parallel):
-                sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
+                sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=7))
 
                for site_name, site_stats in sitesStats.items():
                    numGetters = len(getters[site_name])
@@ -402,9 +405,9 @@ def main():
                    site_stats['# get'] = numGetters
                    site_stats['weight'] = weight
 
-                totalWeight = sum([site_stats['weight'] for site_stats in sitesStats.values()])
+                totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
 
-                #logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
+                logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
 
                # now pick a random site using the weights
                chosen_site_name = None
@@ -422,21 +425,22 @@ def main():
                        break
 
                chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
+                db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
 
-                #logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
+                logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
 
                # make and start a new ResultGetterThread the location deque of the chosen site
                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
                newGetter.start()
                getters[chosen_site_name].append(newGetter)
 
-                logger.info('numLocationsInQueues=%d totalNumGetters=%d' % (numLocationsInQueues(), totalNumGetters()))
+                logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s' % (numLocationsInQueues(), totalNumGetters(),
+                            ' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()])))
 
            # sleep before main loop next iteration
            # to wait for some results
            # and some getters to finish
-            if numLocationsInQueues() == 0:
-                time.sleep(1)
+            time.sleep(0.25 if numLocationsInQueues() == 0 else 5)
 
 
        # all locations were processed