Commit f091c502 authored by Jorrit Schaap

Task #8721: do locking in db, not in client code.

parent bd7bcf4f
@@ -200,6 +200,26 @@ CREATE TRIGGER trigger_on_directory_inserted_add_directory_closure_entry
 --------------------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION lta.on_fileinfo_inserting_lock_changed_fileinfo_cache()
+    RETURNS trigger AS
+$BODY$
+DECLARE
+BEGIN
+    LOCK TABLE lta._changed_fileinfo_cache IN EXCLUSIVE MODE;
+    RETURN NEW;
+END;
+$BODY$
+LANGUAGE plpgsql VOLATILE
+COST 100;
+
+CREATE TRIGGER trigger_on_fileinfo_inserting_lock_changed_fileinfo_cache
+    BEFORE INSERT
+    ON lta.fileinfo
+    FOR EACH STATEMENT
+    EXECUTE PROCEDURE lta.on_fileinfo_inserting_lock_changed_fileinfo_cache();
+
+--------------------------------------------------------------------------------
 CREATE OR REPLACE FUNCTION lta.on_fileinfo_inserted_add_to_cache()
     RETURNS trigger AS
 $BODY$
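
Note for readers: the new statement-level trigger above takes an EXCLUSIVE lock on lta._changed_fileinfo_cache inside the inserting transaction, and PostgreSQL holds that lock until the transaction commits or rolls back. Concurrent INSERTs on lta.fileinfo are therefore serialized by the database itself, which is what makes the client-side threading.Lock removed below redundant. A minimal sketch of two threads relying on this, assuming psycopg2, a placeholder DSN, and guessed column names (this code is not part of the commit):

    # Sketch only: two threads insert into lta.fileinfo with no client-side lock.
    # Assumptions: psycopg2 is installed, the lta schema from this commit exists,
    # "dbname=ltaso" is a placeholder DSN, and the column names are guesses.
    import threading
    import psycopg2

    def insert_rows(rows):
        conn = psycopg2.connect("dbname=ltaso")  # placeholder DSN
        try:
            with conn:  # one transaction; commits on success
                with conn.cursor() as cur:
                    # The BEFORE INSERT ... FOR EACH STATEMENT trigger locks
                    # lta._changed_fileinfo_cache until this transaction ends,
                    # so an overlapping insert from the other thread just waits.
                    cur.executemany(
                        "INSERT INTO lta.fileinfo (filename, size, created_at, dir_id) "
                        "VALUES (%s, %s, %s, %s)", rows)
        finally:
            conn.close()

    t1 = threading.Thread(target=insert_rows, args=([('a.tar', 10, '2016-01-01', 1)],))
    t2 = threading.Thread(target=insert_rows, args=([('b.tar', 20, '2016-01-01', 1)],))
    t1.start(); t2.start()
    t1.join(); t2.join()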
@@ -234,10 +234,6 @@ class LocationResult:
         return sum([fileinfo.size for fileinfo in self.files])
 
-# our lock for safe guarding locations and results
-# which will be queried in parallel
-lock = threading.Lock()
-
 class ResultGetterThread(threading.Thread):
     '''Helper class to query Locations asynchronously for results.
     Gets the result for the first Location in the locations deque and appends it to the results deque
@@ -252,19 +248,18 @@ class ResultGetterThread(threading.Thread):
         '''A single location is pop\'ed from the locations deque and the results are queried.
         Resulting subdirectories are appended to the locations deque'''
         try:
-            with lock:
-                dir = self.db.directory(self.dir_id)
+            dir = self.db.directory(self.dir_id)
 
-                if not dir:
-                    return
+            if not dir:
+                return
 
-                dir_id = dir['dir_id']
-                dir_name = dir['dir_name']
-                self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
+            dir_id = dir['dir_id']
+            dir_name = dir['dir_name']
+            self.db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow())
 
-                site_id = dir['site_id']
-                site = self.db.site(site_id)
-                srm_url = site['url']
+            site_id = dir['site_id']
+            site = self.db.site(site_id)
+            srm_url = site['url']
 
             location = Location(srm_url, dir_name)
@@ -273,32 +268,31 @@ class ResultGetterThread(threading.Thread):
             result = location.getResult()
             logger.info(result)
 
-            with lock:
-                if result.files:
-                    result_file_tuple_set = set( (file.filename, file.size, file.created_at, dir_id) for file in result.files)
+            if result.files:
+                result_file_tuple_set = set( (file.filename, file.size, file.created_at, dir_id) for file in result.files)
 
-                    #check for already known files in the db
-                    known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
-                    new_file_tuple_set = result_file_tuple_set - known_file_tuple_set;
+                #check for already known files in the db
+                known_file_tuple_set = set((file['filename'], file['size'], file['created_at'], dir_id) for file in self.db.filesInDirectory(dir_id))
+                new_file_tuple_set = result_file_tuple_set - known_file_tuple_set;
 
-                    logger.info("%d out of %d files are new, and %d are already known", len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
+                logger.info("%s %s: %d out of %d files are new, and %d are already known", site['name'], dir_name, len(new_file_tuple_set), len(result_file_tuple_set), len(known_file_tuple_set))
 
-                    if new_file_tuple_set:
-                        self.db.insertFileInfos(new_file_tuple_set)
+                if new_file_tuple_set:
+                    self.db.insertFileInfos(new_file_tuple_set)
 
-                subDirectoryNames = [loc.directory for loc in result.subDirectories]
+            subDirectoryNames = [loc.directory for loc in result.subDirectories]
 
-                if subDirectoryNames:
-                    #check for already known subdirectories in the db
-                    known_subDirectoryNames_set = set(subdir['name'] for subdir in self.db.subDirectories(dir_id))
+            if subDirectoryNames:
+                #check for already known subdirectories in the db
+                known_subDirectoryNames_set = set(subdir['name'] for subdir in self.db.subDirectories(dir_id))
 
-                    new_subdir_name_set = set(subDirectoryNames) - known_subDirectoryNames_set;
+                new_subdir_name_set = set(subDirectoryNames) - known_subDirectoryNames_set;
 
-                    logger.info("%d out of %d subdirs are new, and %d are already known", len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
+                logger.info("%s %s: %d out of %d subdirs are new, and %d are already known", site['name'], dir_name, len(new_subdir_name_set), len(subDirectoryNames), len(known_subDirectoryNames_set))
 
-                    if new_subdir_name_set:
-                        not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
-                        self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
+                if new_subdir_name_set:
+                    not_visited_yet_timestamp = datetime.datetime.utcnow() - datetime.timedelta(days=1000)
+                    self.db.insertSubDirectories(new_subdir_name_set, dir_id, not_visited_yet_timestamp)
 
         except (SrmlsException, ParseException) as e:
             logger.error('Error while scanning %s\n%s' % (location.path(), str(e)))
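
The hunk above is plain set arithmetic: the scraper builds (filename, size, created_at, dir_id) tuples from the scan result, subtracts the tuples already known in the db, and only inserts the difference (and likewise for subdirectory names). A standalone illustration with made-up values (not part of the commit):

    # Made-up values illustrating the set-difference dedup used above.
    found = {('a.tar', 10, '2016-01-01', 7),
             ('b.tar', 20, '2016-01-01', 7)}   # tuples listed at the site
    known = {('a.tar', 10, '2016-01-01', 7)}   # tuples already in the db

    new = found - known   # only the unseen tuple survives
    assert new == {('b.tar', 20, '2016-01-01', 7)}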
@@ -397,8 +391,7 @@ def main():
         #(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
         #totalNumGetters() < 2.5*multiprocessing.cpu_count())):
         while numLocationsInQueues() > 0 and (totalNumGetters() < options.parallel):
-            with lock:
-                sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
+            sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
 
             for site_name, site_stats in sitesStats.items():
                 numGetters = len(getters[site_name])
@@ -433,19 +426,17 @@ def main():
                 #logger.debug("chosen_site_name: %s chosen_dir_id: %s", chosen_site_name, chosen_dir_id)
 
                 # make and start a new ResultGetterThread for the location deque of the chosen site
-                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.verbose)
+                newGetter = ResultGetterThread(dbcreds, chosen_dir_id, options.log_queries)
                 newGetter.start()
                 getters[chosen_site_name].append(newGetter)
 
                 logger.info('numLocationsInQueues=%d totalNumGetters=%d' % (numLocationsInQueues(), totalNumGetters()))
 
                 # small sleep between starting multiple getters
                 time.sleep(0.25)
 
             # sleep before main loop next iteration
             # to wait for some results
-            # and some getters to finis
-            time.sleep(1)
+            # and some getters to finish
+            if numLocationsInQueues() == 0:
+                time.sleep(1)
 
             # all locations were processed
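
With the change above, the one-second back-off no longer happens unconditionally on every pass of the main loop, only once the location queues are empty; the quarter-second pause between thread starts is unchanged. A toy, self-contained rendering of that pacing (queues and getter bookkeeping stubbed out; not the actual source):

    # Toy model of the main-loop pacing after this commit (stand-in names).
    import time

    locations = ['dirA', 'dirB', 'dirC']   # stand-in for the per-site queues
    getters = []                           # stand-in for running ResultGetterThreads
    MAX_PARALLEL = 2                       # stand-in for options.parallel

    while locations and len(getters) < MAX_PARALLEL:
        getters.append(locations.pop(0))   # stand-in for starting a ResultGetterThread
        time.sleep(0.25)                   # small sleep between starting multiple getters

    if not locations:
        time.sleep(1)                      # wait for results and getters to finish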