Skip to content
Snippets Groups Projects
Commit e61a04ce authored by Jorrit Schaap's avatar Jorrit Schaap
Browse files

Task #8721: scraper now works with new postgres db

parent db63a759
No related branches found
No related tags found
No related merge requests found
......@@ -141,6 +141,28 @@ CREATE TABLE metainfo.project_top_level_directory (
primary key (project_id, directory_id)
) WITH (OIDS=FALSE);
CREATE TABLE metainfo.observation (
id int,
PRIMARY KEY (id)
) WITH (OIDS=FALSE);
CREATE TABLE metainfo.project_observation (
id serial,
project_id integer NOT NULL REFERENCES metainfo.project ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE,
observation_id integer NOT NULL REFERENCES metainfo.observation ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE,
PRIMARY KEY (id)
) WITH (OIDS=FALSE);
CREATE TABLE metainfo.dataproducts (
id serial,
fileinfo_id integer NOT NULL REFERENCES lta.fileinfo ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE,
observation_id integer NOT NULL REFERENCES metainfo.observation ON DELETE CASCADE DEFERRABLE INITIALLY IMMEDIATE,
name text NOT NULL,
PRIMARY KEY (id)
) WITH (OIDS=FALSE);
CREATE INDEX dp_dataproduct_name_idx on metainfo.dataproducts(name);
-- END TABLES
......@@ -248,6 +270,7 @@ project_name text;
project_id int;
project_dir_name text;
project_dir_id int;
obs_id int;
BEGIN
new_dir_name := trim(trailing '/' from NEW.name);
project_pos := strpos(new_dir_name, '/projects');
......@@ -279,6 +302,20 @@ BEGIN
INSERT INTO metainfo.project_top_level_directory(project_id, directory_id)
VALUES (project_id, NEW.id)
ON CONFLICT DO NOTHING;
ELSE
dir_name_tail := substring(dir_name_tail from length(project_name)+2);
next_slash_pos := strpos(dir_name_tail, '/');
IF next_slash_pos > 0 THEN
BEGIN
obs_id := substring(dir_name_tail from 0 for next_slash_pos)::integer;
INSERT INTO metainfo.observation(id) VALUES (obs_id) ON CONFLICT DO NOTHING;
INSERT INTO metainfo.project_observation(project_id, observation_id)
VALUES (project_id, obs_id) ON CONFLICT DO NOTHING;
EXCEPTION WHEN invalid_text_representation THEN
END;
END IF;
END IF;
END IF;
END IF;
......@@ -299,6 +336,51 @@ CREATE TRIGGER trigger_on_directory_inserted_parse_project_info
--------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION lta.on_fileinfo_inserted_parse_observation_info()
RETURNS trigger AS
$BODY$
DECLARE
new_file_name text;
L_pos int;
first_underscore_pos int;
last_underscore_pos int;
obs_id int;
dataproduct_name text;
BEGIN
new_file_name := trim(leading '/' from NEW.name);
L_pos := strpos(new_file_name, 'L');
first_underscore_pos := strpos(new_file_name, '_');
IF L_pos > 0 AND first_underscore_pos > L_pos THEN
BEGIN
obs_id := substring(new_file_name from L_pos+1 for first_underscore_pos-2)::integer;
INSERT INTO metainfo.observation(id) VALUES (obs_id) ON CONFLICT DO NOTHING;
last_underscore_pos := length(new_file_name) - strpos(reverse(new_file_name), '_');
IF last_underscore_pos > L_pos THEN
dataproduct_name := substring(new_file_name from L_pos for last_underscore_pos);
INSERT INTO metainfo.dataproducts(fileinfo_id, observation_id, name)
VALUES (NEW.id, obs_id, dataproduct_name) ON CONFLICT DO NOTHING;
END IF;
EXCEPTION WHEN invalid_text_representation THEN
END;
END IF;
RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
CREATE TRIGGER trigger_on_fileinfo_inserted_parse_observation_info
AFTER INSERT
ON lta.fileinfo
FOR EACH ROW
EXECUTE PROCEDURE lta.on_fileinfo_inserted_parse_observation_info();
--------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION lta.on_site_deleted_delete_site_directories()
RETURNS trigger AS
$BODY$
......@@ -407,6 +489,15 @@ CREATE VIEW metainfo.site_directory_stats as
left join metainfo.tree_stats ts on ts.tree_root_directory_id = sdt.dir_id
order by rootdir_id, depth;
CREATE VIEW metainfo.project_observation_dataproducts as
select dp.id as dataproduct_id, dp.name as dataproduct_name, dp.observation_id as observation_id,
p.id as project_id, p.name as project_name
from metainfo.dataproducts dp
left join metainfo.project_observation po on po.observation_id = dp.observation_id
left join metainfo.project p on p.id = po.project_id
order by project_name, observation_id, dataproduct_name;
-- END VIEWS
COMMIT;
......
......@@ -299,27 +299,28 @@ class ResultGetterThread(threading.Thread):
def populateDbWithLTASitesAndRootDirs(db):
if not db.sites():
#db.insertSite('target', 'srm://srm.target.rug.nl:8444')
db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
db.insertSite('sara', 'srm://srm.grid.sara.nl:8443')
db.insertSite('juelich', 'srm://lofar-srm.fz-juelich.de:8443')
#db.insertSite('nikhef', 'srm://tbn18.nikhef.nl:8446')
#db.insertSite('sara', 'srm://srm.grid.sara.nl:8443')
#db.insertSite('juelich', 'srm://lofar-srm.fz-juelich.de:8443')
db.insertSite('poznan', 'srm://lta-head.lofar.psnc.pl:8443')
#db.insertRootDirectory('target', '/lofar/ops')
#db.insertRootDirectory('target', '/lofar/ops/disk')
db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/ops')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/user')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/software')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/storage')
db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/pulsar')
db.insertRootDirectory('juelich', '/pnfs/fz-juelich.de/data/lofar/ops')
db.insertRootDirectory('poznan', '/lofar/ops/projects')
#db.insertRootDirectory('nikhef', '/dpm/nikhef.nl/home/lofar')
#db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/ops')
#db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/user')
#db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/software')
#db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/storage')
#db.insertRootDirectory('sara', '/pnfs/grid.sara.nl/data/lofar/pulsar')
#db.insertRootDirectory('juelich', '/pnfs/fz-juelich.de/data/lofar/ops')
#db.insertRootDirectory('poznan', '/lofar/ops/projects')
db.insertRootDirectory('poznan', '/lofar/ops/projects/lt5_009')
for dir_id in [x['dir_id'] for x in db.rootDirectories()]:
db.updateDirectoryLastVisitTime(dir_id, datetime.datetime.utcnow() - datetime.timedelta(days=1000))
def main(argv):
def main():
'''the main function scanning all locations and gathering the results'''
from optparse import OptionParser
......@@ -341,7 +342,8 @@ def main(argv):
db = store.LTAStorageDb(dbcreds, options.verbose)
populateDbWithLTASitesAndRootDirs(db)
exit()
# for each site we want one or more ResultGetterThreads
# so make a dict with a list per site based on the locations
getters = dict([(site['name'],[]) for site in db.sites()])
......@@ -378,7 +380,7 @@ def main(argv):
#while numLocationsInQueues() > 0 and (totalNumGetters() <= 4 or
#(os.getloadavg()[0] < 3*multiprocessing.cpu_count() and
#totalNumGetters() < 2.5*multiprocessing.cpu_count())):
while numLocationsInQueues() > 0 and (totalNumGetters() < 10):
while numLocationsInQueues() > 0 and (totalNumGetters() < 1):
with lock:
sitesStats = db.visitStats(datetime.datetime.utcnow() - datetime.timedelta(days=1))
......@@ -433,5 +435,5 @@ def main(argv):
# all locations were processed
if __name__ == "__main__":
main(sys.argv[1:])
main()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment