From db63a759c375545f5e73272599b8f8693ab9e16a Mon Sep 17 00:00:00 2001
From: Jorrit Schaap <schaap@astron.nl>
Date: Tue, 6 Sep 2016 09:56:27 +0000
Subject: [PATCH] Task #8721: scraper now works with new postgres db

---
 LTA/ltastorageoverview/lib/scraper.py | 93 +++++++++++++++++----------
 LTA/ltastorageoverview/lib/store.py   |  5 +-
 2 files changed, 63 insertions(+), 35 deletions(-)

diff --git a/LTA/ltastorageoverview/lib/scraper.py b/LTA/ltastorageoverview/lib/scraper.py
index 5b9cbe2bdc3..45ea9a1f59a 100755
--- a/LTA/ltastorageoverview/lib/scraper.py
+++ b/LTA/ltastorageoverview/lib/scraper.py
@@ -28,12 +28,13 @@ import logging
 import time
 import datetime
 import sys
+import socket
 import os
 import os.path
 import threading
 import multiprocessing
 from ltastorageoverview import store
-from ltastorageoverview.utils import humanreadablesize
+from lofar.common.util import humanreadablesize
 from random import random
 
 #logging.basicConfig(filename='scraper.' + time.strftime("%Y-%m-%d") + '.log', level=logging.DEBUG, format="%(asctime)-15s %(levelname)s %(message)s")
@@ -125,7 +126,11 @@ class Location:
 
         # the core command: do an srmls call and parse the results
         # srmls can only yield max 900 items in a result, hence we can recurse for the next 900 by using the offset
-        cmd = ["bash", "-c", "source %s;srmls -l -count=900 -offset=%d %s%s" % ('/globalhome/ingest/service/bin/init.sh', offset, self.srmurl, self.directory)]
+        cmd = ["bash", "-c", "\'source %s;srmls -l -count=900 -offset=%d %s%s\'" % ('/globalhome/ingest/service/bin/init.sh', offset, self.srmurl, self.directory)]
+        hostname = socket.gethostname()
+        if not 'lexar' in hostname:
+            cmd = ['ssh', 'ingest@10.178.1.2'] + cmd
+
         # logger.debug(' '.join(cmd))
         p = subprocess.Popen(cmd, stdin=open('/dev/null'), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
         logs = p.communicate()
@@ -238,10 +243,10 @@ class ResultGetterThread(threading.Thread):
     '''Helper class to query Locations asynchronously for results.
     Gets the result for the first Location in the locations deque and appends it to the results deque
     Appends the subdirectory Locations at the end of the locations deque for later processing'''
-    def __init__(self, db, dir_id):
+    def __init__(self, dbcreds, dir_id, verbose=False):
         threading.Thread.__init__(self)
         self.daemon = True
-        self.db = db
+        self.db = store.LTAStorageDb(dbcreds, verbose)
         self.dir_id = dir_id
 
     def run(self):
@@ -254,13 +259,13 @@ class ResultGetterThread(threading.Thread):
             if not dir:
                 return
 
-            dir_id = dir[0]
-            dir_name = dir[1]
+            dir_id = dir['dir_id']
+            dir_name = dir['dir_name']
 
             self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
 
-            site_id = dir[2]
+            site_id = dir['site_id']
             site = self.db.site(site_id)
-            srm_url = site[2]
+            srm_url = site['url']
 
             location = Location(srm_url, dir_name)
 
@@ -270,20 +275,14 @@ class ResultGetterThread(threading.Thread):
                 logger.info(result)
 
                 with lock:
-                    self.db.insertFileInfos([(file.filename, file.size, file.created_at, dir_id) for file in result.files])
-
-                    # skip empty nikhef dirs
-                    filteredSubDirectories = [loc for loc in result.subDirectories
-                                              if not ('nikhef' in loc.srmurl and 'generated' in loc.directory) ]
-
-                    # filteredSubDirectories = [loc for loc in filteredSubDirectories
-                    #                           if not 'lc3_007' in loc.directory ]
+                    if result.files:
+                        self.db.insertFileInfos([(file.filename, file.size, file.created_at, dir_id) for file in result.files])
 
-                    subDirectoryNames = [loc.directory for loc in filteredSubDirectories]
+                    subDirectoryNames = [loc.directory for loc in result.subDirectories]
 
                     if subDirectoryNames:
-                        self.db.insertSubDirectories(subDirectoryNames, dir_id,
-                                                     datetime.datetime.utcnow() - datetime.timedelta(days=1000))
+                        not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
+                        self.db.insertSubDirectories(subDirectoryNames, dir_id, not_visited_yet_timestamp)
 
         except (SrmlsException, ParseException) as e:
             logger.error('Error while scanning %s\n%s' % (location.path(), str(e)))
@@ -297,19 +296,16 @@ class ResultGetterThread(threading.Thread):
             logger.info('Rescheduling dir_id %d for new visit.'
                         % (self.dir_id,))
             self.db.updateDirectoryLastVisitTime(self.dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
-def main(argv):
-    '''the main function scanning all locations and gathering the results'''
-
-    db = store.LTAStorageDb('/data2/ltastorageoverview.sqlite')
-
+def populateDbWithLTASitesAndRootDirs(db):
     if not db.sites():
-        db.insertSite('target', 'srm://srm.target.rug.nl:8444')
+        #db.insertSite('target', 'srm://srm.target.rug.nl:8444')
         db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
         db.insertSite('sara', 'srm://srm.grid.sara.nl:8443')
         db.insertSite('juelich', 'srm://lofar-srm.fz-juelich.de:8443')
+        db.insertSite('poznan', 'srm://lta-head.lofar.psnc.pl:8443')
 
-        db.insertRootDirectory('target', '/lofar/ops')
-        db.insertRootDirectory('target', '/lofar/ops/disk')
+        #db.insertRootDirectory('target', '/lofar/ops')
+        #db.insertRootDirectory('target', '/lofar/ops/disk')
         db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/ops')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/user')
@@ -317,13 +313,38 @@ def main(argv):
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/storage')
         db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/pulsar')
         db.insertRootDirectory('juelich', '/pnfs/fz-juelich.de/data/lofar/ops')
+        db.insertRootDirectory('poznan', '/lofar/ops/projects')
 
-    for dir_id in [x[0] for x in db.rootDirectories()]:
+    for dir_id in [x['dir_id'] for x in db.rootDirectories()]:
         db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
 
+
+def main(argv):
+    '''the main function scanning all locations and gathering the results'''
+
+    from optparse import OptionParser
+    from lofar.common import dbcredentials
+
+    # Check the invocation arguments
+    parser = OptionParser("%prog [options]", description='runs the lta scraper and stores results in the specified database.')
+    parser.add_option('-V', '--verbose', dest='verbose', action='store_true', help='verbose logging')
+    parser.add_option_group(dbcredentials.options_group(parser))
+    parser.set_defaults(dbcredentials="LTASO")
+    (options, args) = parser.parse_args()
+
+    logging.basicConfig(format='%(asctime)s %(levelname)s %(message)s',
+                        level=logging.DEBUG if options.verbose else logging.INFO)
+
+    dbcreds = dbcredentials.parse_options(options)
+
+    logger.info("Using dbcreds: %s" % dbcreds.stringWithHiddenPassword())
+
+    db = store.LTAStorageDb(dbcreds, options.verbose)
+    populateDbWithLTASitesAndRootDirs(db)
+
     # for each site we want one or more ResultGetterThreads
     # so make a dict with a list per site based on the locations
-    getters = dict([(site[1],[]) for site in db.sites()])
+    getters = dict([(site['name'],[]) for site in db.sites()])
 
     # some helper functions
     def numLocationsInQueues():
@@ -334,6 +355,9 @@ def main(argv):
         '''returns the total number of parallel running ResultGetterThreads'''
         return sum([len(v) for v in getters.values()])
 
+    #print 'numLocationsInQueues()', numLocationsInQueues()
+    #print 'totalNumGetters()', totalNumGetters()
+
     # only enter main loop if there is anything to process
     if numLocationsInQueues() > 0:
 
@@ -351,9 +375,10 @@ def main(argv):
 
             # spawn new ResultGetterThreads
             # do not overload this host system
-            while numLocationsInQueues() > 0 and (totalNumGetters() <= 4 or
-                                                  (os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
-                                                   totalNumGetters() < 2.5*multiprocessing.cpu_count())):
+            #while numLocationsInQueues() > 0 and (totalNumGetters() <= 4 or
+            #(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
+            #totalNumGetters() < 2.5*multiprocessing.cpu_count())):
+            while numLocationsInQueues() > 0 and (totalNumGetters() < 10):
 
                 with lock:
                     sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
@@ -388,8 +413,10 @@ def main(argv):
 
                 chosen_dir_id = sitesStats[chosen_site_name]['least_recent_visited_dir_id']
 
+                #logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
+
                 # make and start a new ResultGetterThread for the location deque of the chosen site
-                newGetter = ResultGetterThread(db, chosen_dir_id)
+                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.verbose)
                 newGetter.start()
                 getters[chosen_site_name].append(newGetter)
 
diff --git a/LTA/ltastorageoverview/lib/store.py b/LTA/ltastorageoverview/lib/store.py
index a566a466212..5790413a4e8 100644
--- a/LTA/ltastorageoverview/lib/store.py
+++ b/LTA/ltastorageoverview/lib/store.py
@@ -110,7 +110,8 @@ class LTAStorageDb(PostgresDatabaseConnection):
         return fileinfo_id
 
     def insertFileInfos(self, file_infos):
-        insert_values = ','.join(self._cursor.mogrify('(%s, %s, %s, %s)', [(f[0].split('/')[-1], f[1], f[2], f[3]) for f in file_infos]))
+        insert_values = [self._cursor.mogrify('(%s, %s, %s, %s)', (f[0].split('/')[-1], f[1], f[2], f[3])) for f in file_infos]
+        insert_values = ','.join(insert_values)
 
         query = '''insert into lta.fileinfo (name, size, creation_date, directory_id)
         VALUES {values}
@@ -155,7 +156,7 @@ class LTAStorageDb(PostgresDatabaseConnection):
         '''returns lta.directory (id, name, site_id, site_name) for the given directory_id'''
         return self.executeQuery('''SELECT dir.id as dir_id, dir.name as dir_name, site.id as site_id, site.name as site_name
                                     FROM lta.storage_site_root_dir
-                                    join storage_site site on site.id = storage_site_root_dir.storage_site_id
+                                    join lta.storage_site site on site.id = storage_site_root_dir.storage_site_id
                                     join lta.directory_closure dc on dc.ancestor_id = storage_site_root_dir.directory_id
                                     join lta.directory dir on dir.id = dc.descendant_id
                                     where dc.descendant_id = %s;
-- 
GitLab
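
The batched insert in insertFileInfos above hinges on psycopg2's cursor.mogrify: each value tuple is rendered as a safely escaped SQL fragment, and the fragments are joined into one multi-row VALUES clause so all file records reach the database in a single statement. Below is a minimal standalone sketch of that technique, not the LTAStorageDb wrapper itself; the function name insert_file_infos, the bare connection handling, and the Python 2 era psycopg2 behavior (mogrify returning str) are all assumptions for illustration.

    import psycopg2

    def insert_file_infos(conn, file_infos):
        # file_infos: iterable of (path, size, creation_date, directory_id)
        # tuples, as the scraper gathers them from parsed srmls output.
        cursor = conn.cursor()
        # mogrify renders each tuple as an escaped "(name, size, date, dir_id)"
        # fragment; joining the fragments builds one multi-row VALUES clause,
        # so a single execute() inserts all rows in one round trip.
        values = ','.join(cursor.mogrify('(%s, %s, %s, %s)',
                                         (path.split('/')[-1], size, created, dir_id))
                          for (path, size, created, dir_id) in file_infos)
        cursor.execute('insert into lta.fileinfo (name, size, creation_date, directory_id) '
                       'values ' + values)
        conn.commit()

Compared with one execute() per file, this trades a little string building for far fewer server round trips, which matters when a single srmls listing can return up to 900 entries.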