Skip to content
Snippets Groups Projects
Commit 76ea35ea authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8721: various minor tweaks

parent e0656b44
No related branches found
No related tags found
No related merge requests found
......@@ -123,7 +123,7 @@ class Location:
# the core command: do an srmls call and parse the results
# srmls can only yield max 900 items in a result, hence we can recurse for the next 900 by using the offset
lexar_nr = randint(1,4)
lexar_nr = randint(1,2)
cmd = ['ssh', 'ingest@10.178.1.%d' % lexar_nr, "bash", "-c", "\'source %s;srmls -l -count=900 -offset=%d %s%s\'" % (
'/globalhome/ingest/service/bin/init.sh' if lexar_nr <= 2 else '/globalhome/ingest/.grid/.ingest_profile',
offset,
......@@ -255,7 +255,6 @@ class ResultGetterThread(threading.Thread):
dir_id = dir['dir_id']
dir_name = dir['dir_name']
self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
site_id = dir['site_id']
site = self.db.site(site_id)
......@@ -269,16 +268,20 @@ class ResultGetterThread(threading.Thread):
logger.info(result)
if result.files:
result_file_tuple_set = set( (file.filename, file.size, file.created_at, dir_id) for file in result.files)
result_file_tuple_set = set( (file.filename.split('/')[-1], int(file.size), file.created_at, dir_id) for file in result.files)
#check for already known files in the db
known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
known_file_tuple_set = set((str(file['name']), int(file['size']), file['creation_date'], dir_id) for file in self.db.filesInDirectory(dir_id))
new_file_tuple_set = result_file_tuple_set - known_file_tuple_set;
logger.info("%s %s: %d out of %d files are new, and %d are already known", site['name'], dir_name, len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
if new_file_tuple_set:
self.db.insertFileInfos(new_file_tuple_set)
file_ids = self.db.insertFileInfos(new_file_tuple_set)
if len(file_ids) != len(new_file_tuple_set):
logger.info('Rescheduling %s for new visit.' % (location.path(),))
self.db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
subDirectoryNames = [loc.directory for loc in result.subDirectories]
......@@ -291,8 +294,11 @@ class ResultGetterThread(threading.Thread):
logger.info("%s %s: %d out of %d subdirs are new, and %d are already known", site['name'], dir_name, len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
if new_subdir_name_set:
not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
subdir_ids = self.db.insertSubDirectories(new_subdir_name_set, dir_id)
if len(subdir_ids) != len(new_subdir_name_set):
logger.info('Rescheduling %s for new visit.' % (location.path(),))
self.db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
except (SrmlsException, ParseException) as e:
logger.error('Error while scanning %s\n%s' % (location.path(), str(e)))
......@@ -308,15 +314,15 @@ class ResultGetterThread(threading.Thread):
def populateDbWithLTASitesAndRootDirs(db):
if not db.sites():
db.insertSite('target', 'srm://srm.target.rug.nl:8444')
db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
#db.insertSite('target', 'srm://srm.target.rug.nl:8444')
#db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
db.insertSite('sara', 'srm://srm.grid.sara.nl:8443')
db.insertSite('juelich', 'srm://lofar-srm.fz-juelich.de:8443')
db.insertSite('poznan', 'srm://lta-head.lofar.psnc.pl:8443')
db.insertRootDirectory('target', '/lofar/ops')
db.insertRootDirectory('target', '/lofar/ops/disk')
db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
#db.insertRootDirectory('target', '/lofar/ops')
#db.insertRootDirectory('target', '/lofar/ops/disk')
#db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/ops')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/user')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/software')
......@@ -325,9 +331,6 @@ def populateDbWithLTASitesAndRootDirs(db):
db.insertRootDirectory('juelich', '/pnfs/fz-juelich.de/data/lofar/ops')
db.insertRootDirectory('poznan', '/lofar/ops/projects')
for dir_id in [x['dir_id'] for x in db.rootDirectories()]:
db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
def main():
'''the main function scanning all locations and gathering the results'''
......@@ -361,7 +364,7 @@ def main():
# some helper functions
def numLocationsInQueues():
'''returns the total number of locations in the queues'''
return db.numDirectoriesNotVisitedSince(datetime.datetime.utcnow() - datetime.timedelta(days=1))
return db.numDirectoriesNotVisitedSince(datetime.datetime.utcnow() - datetime.timedelta(days=7))
def totalNumGetters():
'''returns the total number of parallel running ResultGetterThreads'''
......@@ -391,7 +394,7 @@ def main():
#(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
#totalNumGetters() < 2.5*multiprocessing.cpu_count())):
while numLocationsInQueues() > 0 and (totalNumGetters() < options.parallel):
sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=7))
for site_name, site_stats in sitesStats.items():
numGetters = len(getters[site_name])
......@@ -402,9 +405,9 @@ def main():
site_stats['# get'] = numGetters
site_stats['weight'] = weight
totalWeight = sum([site_stats['weight'] for site_stats in sitesStats.values()])
totalWeight = max(1.0, sum([site_stats['weight'] for site_stats in sitesStats.values()]))
#logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
logger.debug("siteStats:\n%s" % str('\n'.join([str((k, v)) for k, v in sitesStats.items()])))
# now pick a random site using the weights
chosen_site_name = None
......@@ -422,21 +425,22 @@ def main():
break
chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
db.updateDirectoryLastVisitTime(chosen_dir_id, datetime.datetime.utcnow())
#logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
# make and start a new ResultGetterThread the location deque of the chosen site
newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
newGetter.start()
getters[chosen_site_name].append(newGetter)
logger.info('numLocationsInQueues=%d totalNumGetters=%d' % (numLocationsInQueues(), totalNumGetters()))
logger.info('numLocationsInQueues=%d totalNumGetters=%d siteQueueLengths: %s' % (numLocationsInQueues(), totalNumGetters(),
' '.join(['%s:%d' % (name, stats['queue_length']) for name, stats in sitesStats.items()])))
# sleep before main loop next iteration
# to wait for some results
# and some getters to finish
if numLocationsInQueues() == 0:
time.sleep(1)
time.sleep(0.25 if numLocationsInQueues() == 0 else 5)
# all locations were processed
......
Loading…
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment