diff --git a/LTA/ltastorageoverview/lib/create_db_ltastorageoverview.sql b/LTA/ltastorageoverview/lib/create_db_ltastorageoverview.sql
index 09ddfb079c3cb8ac669f4dbe6f7e050475087def..ef2b044c969805fa6a3cdacde0bcf75eb1d4c359 100644
--- a/LTA/ltastorageoverview/lib/create_db_ltastorageoverview.sql
+++ b/LTA/ltastorageoverview/lib/create_db_ltastorageoverview.sql
@@ -200,6 +200,26 @@ CREATE TRIGGER trigger_on_directory_inserted_add_directory_closure_entry
 
 --------------------------------------------------------------------------------
 
+CREATE OR REPLACE FUNCTION lta.on_fileinfo_inserting_lock_changed_fileinfo_cache()
+    RETURNS trigger AS
+$BODY$
+DECLARE
+BEGIN
+    LOCK TABLE lta._changed_fileinfo_cache IN EXCLUSIVE MODE;
+    RETURN NEW;
+END;
+$BODY$
+    LANGUAGE plpgsql VOLATILE
+    COST 100;
+
+CREATE TRIGGER trigger_on_fileinfo_inserting_lock_changed_fileinfo_cache
+    BEFORE INSERT
+    ON lta.fileinfo
+    FOR EACH STATEMENT
+    EXECUTE PROCEDURE lta.on_fileinfo_inserting_lock_changed_fileinfo_cache();
+
+--------------------------------------------------------------------------------
+
 CREATE OR REPLACE FUNCTION lta.on_fileinfo_inserted_add_to_cache()
     RETURNS trigger AS
 $BODY$
diff --git a/LTA/ltastorageoverview/lib/scraper.py b/LTA/ltastorageoverview/lib/scraper.py
index 597fe4471321fc03ef3818c1bdab8eea1dd54e09..e73bc12bd2b89b89105a1ff4c08035a2c69ae9c5 100755
--- a/LTA/ltastorageoverview/lib/scraper.py
+++ b/LTA/ltastorageoverview/lib/scraper.py
@@ -234,10 +234,6 @@ class LocationResult:
         return sum([fileinfo.size for fileinfo in self.files])
 
 
-# our lock for safe guarding locations and results
-# which will be queried in parallel
-lock = threading.Lock()
-
 class ResultGetterThread(threading.Thread):
     '''Helper class to query Locations asynchronously for results.
     Gets the result for the first Location in the locations deque and appends it to the results deque
@@ -252,19 +248,18 @@ class ResultGetterThread(threading.Thread):
         '''A single location is pop\'ed from the locations deque and the results are queried.
         Resulting subdirectories are appended to the locations deque'''
         try:
-            with lock:
-                dir = self.db.directory(self.dir_id)
+            dir = self.db.directory(self.dir_id)
 
-                if not dir:
-                    return
+            if not dir:
+                return
 
-                dir_id = dir['dir_id']
-                dir_name = dir['dir_name']
-                self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
+            dir_id = dir['dir_id']
+            dir_name = dir['dir_name']
+            self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
 
-                site_id = dir['site_id']
-                site = self.db.site(site_id)
-                srm_url = site['url']
+            site_id = dir['site_id']
+            site = self.db.site(site_id)
+            srm_url = site['url']
 
             location = Location(srm_url, dir_name)
 
@@ -273,32 +268,31 @@ class ResultGetterThread(threading.Thread):
             result = location.getResult()
             logger.info(result)
 
-            with lock:
-                if result.files:
-                    result_file_tuple_set = set( (file.filename, file.size, file.created_at, dir_id) for file in result.files)
+            if result.files:
+                result_file_tuple_set = set((file.filename, file.size, file.created_at, dir_id) for file in result.files)
 
-                    #check for already known files in the db
-                    known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
-                    new_file_tuple_set = result_file_tuple_set - known_file_tuple_set;
+                # check for already known files in the db
+                known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
+                new_file_tuple_set = result_file_tuple_set - known_file_tuple_set
 
-                    logger.info("%d out of %d files are new, and %d are already known", len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
+                logger.info("%s %s: %d out of %d files are new, and %d are already known", site['name'], dir_name, len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
 
-                    if new_file_tuple_set:
-                        self.db.insertFileInfos(new_file_tuple_set)
+                if new_file_tuple_set:
+                    self.db.insertFileInfos(new_file_tuple_set)
 
-                subDirectoryNames = [loc.directory for loc in result.subDirectories]
+            subDirectoryNames = [loc.directory for loc in result.subDirectories]
 
-                if subDirectoryNames:
-                    #check for already known subdirectories in the db
-                    known_subDirectoryNames_set = set(subdir['name'] for subdir in self.db.subDirectories(dir_id))
+            if subDirectoryNames:
+                # check for already known subdirectories in the db
+                known_subDirectoryNames_set = set(subdir['name'] for subdir in self.db.subDirectories(dir_id))
 
-                    new_subdir_name_set = set(subDirectoryNames) - known_subDirectoryNames_set;
+                new_subdir_name_set = set(subDirectoryNames) - known_subDirectoryNames_set
 
-                    logger.info("%d out of %d subdirs are new, and %d are already known", len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
+                logger.info("%s %s: %d out of %d subdirs are new, and %d are already known", site['name'], dir_name, len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
 
-                    if new_subdir_name_set:
-                        not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
-                        self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
+                if new_subdir_name_set:
+                    not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
+                    self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
 
         except (SrmlsException, ParseException) as e:
             logger.error('Error while scanning %s\n%s' % (location.path(), str(e)))
 
@@ -397,8 +391,7 @@ def main():
             #(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
             #totalNumGetters() < 2.5*multiprocessing.cpu_count())):
         while numLocationsInQueues() > 0 and (totalNumGetters() < options.parallel):
-            with lock:
-                sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
+            sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
 
             for site_name, site_stats in sitesStats.items():
                 numGetters = len(getters[site_name])
@@ -433,19 +426,17 @@ def main():
                 #logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
 
                 # make and start a new ResultGetterThread the location deque of the chosen site
-                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.verbose)
+                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
                 newGetter.start()
                 getters[chosen_site_name].append(newGetter)
 
                 logger.info('numLocationsInQueues=%d totalNumGetters=%d' % (numLocationsInQueues(), totalNumGetters()))
 
-                # small sleep between starting multiple getters
-                time.sleep(0.25)
-
         # sleep before main loop next iteration
         # to wait for some results
-        # and some getters to finis
-        time.sleep(1)
+        # and some getters to finish
+        if numLocationsInQueues() == 0:
+            time.sleep(1) # all locations were processed
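
Note on the locking change (commentary, not part of the patch): the removed Python-side threading.Lock only serialized threads within a single scraper process. The new BEFORE INSERT ... FOR EACH STATEMENT trigger instead takes an EXCLUSIVE lock on lta._changed_fileinfo_cache once per INSERT statement, held until the surrounding transaction ends, so concurrent writers are serialized inside PostgreSQL regardless of which process they run in. A minimal sketch of the equivalent explicit transaction, assuming psycopg2, a hypothetical connection string, and fileinfo columns matching the tuples passed to insertFileInfos:

    import psycopg2

    conn = psycopg2.connect('dbname=ltaso')  # hypothetical DSN
    with conn, conn.cursor() as cur:
        # what the trigger does implicitly on every INSERT into lta.fileinfo:
        # one table-level lock per statement, released at commit
        cur.execute('LOCK TABLE lta._changed_fileinfo_cache IN EXCLUSIVE MODE')
        cur.execute('INSERT INTO lta.fileinfo (filename, size, created_at, dir_id) '
                    'VALUES (%s, %s, %s, %s)',
                    ('L123456_SB000.tar', 1024, '2016-01-01', 42))
    # leaving the `with conn` block commits the transaction and releases the lock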
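
Note on the main-loop change (commentary, not part of the patch): without the client-side lock there is no need to throttle getter start-up, so the fixed 0.25 s pause per new getter and the unconditional 1 s sleep per iteration are dropped; the loop now spawns getters back-to-back up to options.parallel and only sleeps when the location queues are empty, i.e. while waiting for running getters to report new subdirectories. A simplified sketch of that behaviour with hypothetical stand-in names (not the real scraper API):

    import time
    from collections import deque

    locations = deque(['dir_a', 'dir_b', 'dir_c'])  # stand-in for the per-site queues
    getters = []                                    # stand-in for running threads
    MAX_PARALLEL = 8                                # stand-in for options.parallel

    def start_getter(dir_name):
        getters.append(dir_name)  # a real getter would be a ResultGetterThread

    while locations or getters:
        # spawn as many getters as the parallelism budget allows,
        # with no artificial delay between starts
        while locations and len(getters) < MAX_PARALLEL:
            start_getter(locations.popleft())
        if not locations:
            time.sleep(1)    # nothing left to schedule; wait for results
            getters.clear()  # stand-in for reaping finished threads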