Commit 0a879313 authored by Alexey Mints

Task #2699 First commit for the new GSM database (under construction).

parent 41dafc60
Showing with 1032 additions and 0 deletions
@@ -741,6 +741,66 @@ CEP/DP3/DPPP/test/CS1_IDPPP.parset -text
CEP/DP3/DPPP/test/tNDPPP.in_MS.tgz -text svneol=unset#application/x-compressed-tar
CEP/DP3/DPPP/test/tmwflagger.in_cd -text
CEP/DP3/DPPP/test/tmwflagger.in_vd -text
CEP/GSM/bremen/sql/create.function.alpha.sql -text
CEP/GSM/bremen/sql/create.procedure.BuildFrequencyBands.sql -text
CEP/GSM/bremen/sql/create.procedure.fill_temp_assoc_kind.sql -text
CEP/GSM/bremen/sql/tables/create.table.assoccatsources.sql -text
CEP/GSM/bremen/sql/tables/create.table.assocxtrsources.sql -text
CEP/GSM/bremen/sql/tables/create.table.datasets.sql -text
CEP/GSM/bremen/sql/tables/create.table.detections.sql -text
CEP/GSM/bremen/sql/tables/create.table.extractedsources.sql -text
CEP/GSM/bremen/sql/tables/create.table.frequencybands.sql -text
CEP/GSM/bremen/sql/tables/create.table.images.sql -text
CEP/GSM/bremen/sql/tables/create.table.runningcatalog.sql -text
CEP/GSM/bremen/sql/tables/create.table.runningcatalog_fluxes.sql -text
CEP/GSM/bremen/sql/tables/create.table.temp_associations.sql -text
CEP/GSM/bremen/sql/tables/create.table.temprunningcatalog.sql -text
CEP/GSM/bremen/sql/tables/recreate_tables.py -text
CEP/GSM/bremen/src/__init__.py -text
CEP/GSM/bremen/src/bbsfilesource.py -text
CEP/GSM/bremen/src/connectionMonet.py -text
CEP/GSM/bremen/src/connectionPostgres.py -text
CEP/GSM/bremen/src/errors.py -text
CEP/GSM/bremen/src/grouper.py -text
CEP/GSM/bremen/src/gsmapi.py -text
CEP/GSM/bremen/src/gsmconnectionmanager.py -text
CEP/GSM/bremen/src/gsmlogger.py -text
CEP/GSM/bremen/src/gsmparset.py -text
CEP/GSM/bremen/src/gsmutils.py -text
CEP/GSM/bremen/src/pipeline.py -text
CEP/GSM/bremen/src/queries.py -text
CEP/GSM/bremen/src/sqllist.py -text
CEP/GSM/bremen/src/sqllist.sql -text
CEP/GSM/bremen/src/sqllist_api.sql -text
CEP/GSM/bremen/tests/__init__.py -text
CEP/GSM/bremen/tests/bbsfiletest.py -text
CEP/GSM/bremen/tests/data/bbs_field.dat -text
CEP/GSM/bremen/tests/data/bbs_field0.dat -text
CEP/GSM/bremen/tests/data/field2.dat -text
CEP/GSM/bremen/tests/data/field_multy.dat -text
CEP/GSM/bremen/tests/data/field_multy2.dat -text
CEP/GSM/bremen/tests/data/image1.dat -text
CEP/GSM/bremen/tests/data/image2.dat -text
CEP/GSM/bremen/tests/data/image3.dat -text
CEP/GSM/bremen/tests/data/image4.dat -text
CEP/GSM/bremen/tests/data/image5.dat -text
CEP/GSM/bremen/tests/data/image6.dat -text
CEP/GSM/bremen/tests/data/new_field.dat -text
CEP/GSM/bremen/tests/gsmconnection.py -text
CEP/GSM/bremen/tests/image1.parset -text
CEP/GSM/bremen/tests/image2.parset -text
CEP/GSM/bremen/tests/image3.parset -text
CEP/GSM/bremen/tests/image4.parset -text
CEP/GSM/bremen/tests/matching.py -text
CEP/GSM/bremen/tests/parset.py -text
CEP/GSM/bremen/tests/pipeline.py -text
CEP/GSM/bremen/tests/pipeline.py.orig -text
CEP/GSM/bremen/tests/pipeline1.parset -text
CEP/GSM/bremen/tests/sample.parset -text
CEP/GSM/bremen/tests/tempparset.py -text
CEP/GSM/bremen/tests/testlib.py -text
CEP/GSM/bremen/tests/wrong1.parset -text
CEP/GSM/bremen/tests/wrong2.parset -text
CEP/GSM/db/batches/setup.db.Dec2011.batch -text
CEP/GSM/db/batches/setup.db.batch -text
CEP/GSM/db/functions/create.function.alpha.sql -text
@@ -797,6 +857,9 @@ CEP/GSM/db/tables/create.table.temprunningcatalog.sql -text
CEP/GSM/db/tables/create.table.versions.sql -text
CEP/GSM/src/gsmutils.py -text
CEP/GSM/src/lsm.py -text
CEP/GSM/src/lsm_upgrade/bbs_parameters.sql -text
CEP/GSM/src/lsm_upgrade/bbs_params.dat.gz -text
CEP/GSM/src/lsm_upgrade/new_lsm.py -text
CEP/GSM/src/ms3_script.py -text
CEP/GSM/src/msssprocess.py -text
CEP/Imager/LofarFT/TODO.txt -text
...
--DROP FUNCTION alpha;
/**
* This function computes the ra expansion for a given theta at
* a given declination.
* theta and decl are both in degrees.
*/
CREATE FUNCTION alpha(theta DOUBLE, decl DOUBLE) RETURNS DOUBLE
BEGIN
IF ABS(decl) + theta > 89.9 THEN
RETURN 180;
ELSE
RETURN DEGREES(ABS(ATAN(SIN(RADIANS(theta)) / SQRT(ABS(COS(RADIANS(decl - theta)) * COS(RADIANS(decl + theta))))))) ;
END IF ;
END;
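
For reference, the same computation can be checked outside the database; this is a minimal Python sketch mirroring the SQL above (an illustration, not part of the commit):

import math

def alpha(theta, decl):
    # RA expansion (in degrees) of a cone of radius theta at declination
    # decl; mirrors the SQL function above.
    if abs(decl) + theta > 89.9:
        return 180.0  # too close to a pole: search the full RA range
    return math.degrees(abs(math.atan(
        math.sin(math.radians(theta)) /
        math.sqrt(abs(math.cos(math.radians(decl - theta)) *
                      math.cos(math.radians(decl + theta)))))))

print alpha(1.0, 45.0)  # about 1.41 degrees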
--DROP PROCEDURE BuildFrequencyBands;
/**
* Populates the frequencybands table: a null band (id 0) for sources
* without spectral information, plus the standard observing bands.
*/
CREATE PROCEDURE BuildFrequencyBands()
BEGIN
/* Some cataloged sources have no spectral information
(e.g. exoplanets); these are assigned to frequency band 0.
*/
INSERT INTO frequencybands
(freqbandid
,freq_central
,freq_low
,freq_high
)
VALUES
(0
,NULL
,NULL
,NULL
)
;
INSERT INTO frequencybands
(freq_central
,freq_low
,freq_high
)
VALUES
(30000000
,30000000 - 900000 / 2
,30000000 + 900000 / 2
)
,
(34000000
,34000000 - 960000 / 2
,34000000 + 960000 / 2
)
,
(38000000
,38000000 - 1040000 / 2
,38000000 + 1040000 / 2
)
,
(42000000
,42000000 - 1100000 / 2
,42000000 + 1100000 / 2
)
,
(120000000
,120000000 - 350000 / 2
,120000000 + 350000 / 2
)
,
(130000000
,130000000 - 450000 / 2
,130000000 + 450000 / 2
)
,
(140000000
,140000000 - 550000 / 2
,140000000 + 550000 / 2
)
,
(150000000
,150000000 - 700000 / 2
,150000000 + 700000 / 2
)
,
(160000000
,160000000 - 850000 / 2
,160000000 + 850000 / 2
)
,
(170000000
,170000000 - 1100000 / 2
,170000000 + 1100000 / 2
)
,
(325000000
,325000000 - 10000000 / 2
,325000000 + 10000000 / 2
)
,
(352000000
,352000000 - 20000000 / 2
,352000000 + 20000000 / 2
)
,
(640000000
,640000000 - 100000000 / 2
,640000000 + 100000000 / 2
)
,
(850000000
,850000000 - 100000000 / 2
,850000000 + 100000000 / 2
)
,
(1400000000
,1400000000 - 260000000 / 2
,1400000000 + 260000000 / 2
)
,
(2300000000
,2300000000 - 250000000 / 2
,2300000000 + 250000000 / 2
)
,
(4800000000
,4800000000 - 250000000 / 2
,4800000000 + 250000000 / 2
)
,
(8500000000
,8500000000 - 250000000 / 2
,8500000000 + 250000000 / 2
)
;
END;
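
This procedure is meant to be run once at setup time. A hedged invocation sketch, assuming the same MonetDB connection defaults that recreate_tables.py below uses:

import monetdb.sql as db

# Hypothetical one-off seeding step; host and credentials mirror the
# defaults in recreate_tables.py and may differ in a real deployment.
conn = db.connect(hostname='localhost', database='test',
                  username='monetdb', password='monetdb', port=50000)
cur = conn.cursor()
cur.execute('CALL BuildFrequencyBands();')  # populate frequencybands
conn.commit()
cur.close()
conn.close()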
drop procedure fill_temp_assoc_kind;
create procedure fill_temp_assoc_kind()
begin
update temp_associations
set xtr_count = (select count(xtrsrc_id)
from temp_associations as ta
where ta.xtrsrc_id = temp_associations.xtrsrc_id);
update temp_associations
set run_count = (select count(runcat_id)
from temp_associations as ta
where ta.runcat_id = temp_associations.runcat_id);
update temp_associations
set kind = 1
where xtr_count = 1
and run_count = 1;
update temp_associations
set kind = 2
where xtr_count > 1
and run_count = 1;
update temp_associations
set kind = 3
where xtr_count = 1
and run_count > 1;
update temp_associations
set kind = 4
where xtr_count > 1
and run_count > 1;
--propagate kind 4 (many-to-many) to every member of an affected group
update temp_associations
set kind = 4
where kind <> 4
and exists (select kind from temp_associations ta
where ta.runcat_id = temp_associations.runcat_id
and kind = 4);
update temp_associations
set kind = 4
where kind <> 4
and exists (select kind from temp_associations ta
where ta.xtrsrc_id = temp_associations.xtrsrc_id
and kind = 4);
update temp_associations
set group_head_id = runcat_id
where kind = 4
and group_head_id is null;
end;
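
The kind codes match the comment in temp_associations below: 1 is one-to-one, 2 one-to-many, 3 many-to-one, 4 many-to-many. A hypothetical sanity check of the resulting distribution, assuming an open cursor cur on the same database:

# Hypothetical check (not part of the commit): how the associations
# were classified after fill_temp_assoc_kind() has run.
cur.execute("select kind, count(*) from temp_associations group by kind;")
for kind, count in cur.fetchall():
    print '%s associations of kind %s' % (count, kind)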
/**
* This table stores the information about the sources that
* could be associated.
* src_type: Either 'X' or 'C', for associations
* in extractedsources or catalogedsources
* xtrsrc_id: This is the xtrsrcid that corresponds to the
* first detection
* assoc_xtrsrcid: This is the id of the source that could be
* associated with a previous detection
* (corresponding to assoc_xtrsrcid)
* assoc_lr_method The applied method to determine the likelihood ratio
* (0) default, no method applied
* (1) de Ruiter
* (2) Rutledge
* (3) Rutledge/Masci
* (4) Sutherland & Saunders (1992)
* (5) TKP (see doc)
*/
--CREATE SEQUENCE "seq_assoccatsources" AS INTEGER;
CREATE TABLE assoccatsources
/*(id INT DEFAULT NEXT VALUE FOR "seq_assoccatsources"*/
(xtrsrc_id INT NOT NULL
,assoc_catsrc_id INT NOT NULL
,assoc_weight DOUBLE NULL
,assoc_distance_arcsec DOUBLE NULL
,assoc_lr_method INT NULL DEFAULT 0
,assoc_r DOUBLE NULL
,assoc_loglr DOUBLE NULL
/*,PRIMARY KEY (id)
,FOREIGN KEY (xtrsrc_id) REFERENCES extractedsources (xtrsrcid)
,FOREIGN KEY (assoc_catsrc_id) REFERENCES catalogedsources (catsrcid)*/
)
;
--drop table assocxtrsources;
/**
* This table stores the information about the sources that
* could be associated.
* src_type: Either 'X' or 'C', for associations
* in extractedsources or catalogedsources
* xtrsrc_id: This is the xtrsrcid that corresponds to the
* first detection
* assoc_xtrsrcid: This is the id of the source that could be
* associated with a previous detection
* (corresponding to assoc_xtrsrcid)
*/
--CREATE SEQUENCE "seq_assocxtrsources" AS INTEGER;
CREATE TABLE assocxtrsources
/*(id INT DEFAULT NEXT VALUE FOR "seq_assocxtrsources"*/
(xtrsrc_id INT NOT NULL
,runcat_id INT NULL
,weight DOUBLE NULL
,distance_arcsec DOUBLE NULL
,lr_method INT NULL DEFAULT 0
,r DOUBLE NULL
,lr DOUBLE NULL
/*,PRIMARY KEY (id)
,FOREIGN KEY (xtrsrc_id) REFERENCES extractedsources (xtrsrcid)
,FOREIGN KEY (assoc_xtrsrc_id) REFERENCES extractedsources (xtrsrcid)*/
)
;
/**
* This table contains the information about the dataset that is produced by LOFAR.
* A dataset has an integration time and consists of multiple frequency layers.
* taustart_timestamp: the start time of the integration
*/
CREATE SEQUENCE "seq_datasets" AS INTEGER;
CREATE TABLE datasets (
dsid INT DEFAULT NEXT VALUE FOR "seq_datasets",
rerun INT NOT NULL DEFAULT '0',
dstype TINYINT NOT NULL,
process_ts TIMESTAMP NOT NULL,
dsinname VARCHAR(64) NOT NULL,
dsoutname VARCHAR(64) DEFAULT NULL,
description VARCHAR(100) DEFAULT NULL,
PRIMARY KEY (dsid)
);
/**
* This is a temporary table, used to load
* the detections from the source-extraction step.
*/
CREATE TABLE detections
(image_id VARCHAR(64) NOT NULL
,lra DOUBLE NOT NULL
,ldecl DOUBLE NOT NULL
,lra_err DOUBLE NOT NULL
,ldecl_err DOUBLE NOT NULL
,lf_peak DOUBLE NULL
,lf_peak_err DOUBLE NULL
,lf_int DOUBLE NULL
,lf_int_err DOUBLE NULL
,ldet_sigma DOUBLE NOT NULL
)
;
/**
* This table contains all the extracted sources during an observation.
* 'Source' may not be the right description, because some measurements
* are erroneous and do not represent a real source.
*
* This table is empty BEFORE an observation.
* DURING an observation new sources are inserted into this table.
* AFTER an observation this table is dumped and transported to the
* catalog database.
*
* xtrsrcid Every inserted source/measurement gets a unique id.
* image_id The reference id to the image from which this source
* was extracted.
* zone The zone number in which the source declination resides.
* The width of the zones is determined by the "zoneheight"
* parameter defined in the zoneheight table.
* ra Right ascension of the measurement [in degrees]
* decl Declination of the measurement [in degrees]
* ra_err The 1sigma error of the ra measurement [in arcsec]
* decl_err The 1sigma error of the declination measurement [in arcsec]
* x, y, z: Cartesian coordinate representation of (ra,decl)
* margin Used for association procedures to take into
* account sources that lie close to ra=0 & ra=360 meridian.
* True: source is close to ra=0 meridian
* False: source is far enough away from the ra=0 meridian
* TODO: This is not implemented yet.
* det_sigma: The sigma level of the detection,
* 20*(I_peak/det_sigma) gives the rms of the detection.
* semimajor Semi-major axis that was used for gauss fitting
* [in arcsec]
* semiminor Semi-minor axis that was used for gauss fitting
* [in arcsec]
* pa Position Angle that was used for gauss fitting
* [from north through local east, in degrees]
* f_peak peak flux values (refer to images table for Stokes param)
* f_int integrated flux values
* err 1sigma errors
* Fluxes and flux errors are in Jy
*
*/
CREATE SEQUENCE "seq_extractedsources" AS INTEGER
START WITH 1
INCREMENT BY 1
;
CREATE TABLE extractedsources
(xtrsrcid INT DEFAULT NEXT VALUE FOR "seq_extractedsources"
,image_id INT NOT NULL
,zone INT NOT NULL
,ra DOUBLE NOT NULL
,decl DOUBLE NOT NULL
,ra_err DOUBLE NOT NULL
,decl_err DOUBLE NOT NULL
,x DOUBLE NOT NULL
,y DOUBLE NOT NULL
,z DOUBLE NOT NULL
,margin BOOLEAN NOT NULL DEFAULT 0
,det_sigma DOUBLE NOT NULL
,source_kind smallint not null default 0 -- 0-Point; 1-Gaussian;
,g_major DOUBLE NULL
,g_minor DOUBLE NULL
,g_pa DOUBLE NULL
,f_peak DOUBLE NULL
,f_peak_err DOUBLE NULL
,f_int DOUBLE NULL
,f_int_err DOUBLE NULL
,PRIMARY KEY (xtrsrcid)
--,FOREIGN KEY (image_id) REFERENCES images (imageid)
)
;
/**
* This table contains the frequencies at which the extracted sources
* were detected. It might also be preloaded with the frequencies
* at which the stokes of the catalog sources were measured.
*/
CREATE SEQUENCE "seq_frequencybands" AS INTEGER;
CREATE TABLE frequencybands (
freqbandid INT NOT NULL DEFAULT NEXT VALUE FOR "seq_frequencybands",
freq_central DOUBLE DEFAULT NULL,
freq_low DOUBLE DEFAULT NULL,
freq_high DOUBLE DEFAULT NULL,
PRIMARY KEY (freqbandid)
);
/**
* This table contains the images that are being processed.
* The only format for now is FITS. The HDF5 format will be implemented
* later.
* An image is characterised by
* - integration time (tau)
* - frequency band (band)
* - timestamp (seq_nr).
* A group of images that belong together (not specified any further)
* are in the same data set (they have the same ds_id).
* tau_time in seconds (ref. tau)
* freq_eff in Hz (ref. band)
* taustart_timestamp in YYYY-MM-DD-HH:mm:ss:nnnnnn, but without
* punctuation (ref. seq_nr)
* bmaj, bmin, bpa are the major axis, minor axis and position angle
* of the synthesized beam in degrees;
* NOTE that these are NOT the semi-major axes.
* centr_ra and _decl are the central coordinates (J2000) of the image in degrees.
*/
CREATE SEQUENCE "seq_images" AS INTEGER;
CREATE TABLE images
(imageid INT DEFAULT NEXT VALUE FOR "seq_images"
,ds_id INT NOT NULL
,tau INT NOT NULL
,band INT NOT NULL
,stokes CHAR(1) NOT NULL DEFAULT 'I'
,imagename CHAR(64) NOT NULL --unique LOFAR image id
--,tau_time DOUBLE NOT NULL
--,freq_eff DOUBLE NOT NULL
--,freq_bw DOUBLE NULL
--,taustart_ts TIMESTAMP NOT NULL
--,bmaj DOUBLE NOT NULL
--,bmin DOUBLE NOT NULL
--,bpa DOUBLE NOT NULL
,centr_ra DOUBLE NOT NULL
,centr_decl DOUBLE NOT NULL
,fov_radius double null -- field of view size
--,x DOUBLE NOT NULL
--,y DOUBLE NOT NULL
--,z DOUBLE NOT NULL
,url VARCHAR(120) NULL
,reprocessing INT NOT NULL DEFAULT 0
,obsolete boolean not null default false
,PRIMARY KEY (imageid)
--,FOREIGN KEY (ds_id) REFERENCES datasets (dsid)
,FOREIGN KEY (band) REFERENCES frequencybands (freqbandid)
)
;
--DROP TABLE runningcatalog;
/* This table contains the unique sources that were detected
* during an observation.
* TODO: The resolution element (from images table) is not implemented yet
* Extracted sources not yet in this table are appended when there is no
* positional match, or when a source was detected in a higher-resolution image.
*
* We maintain weighted averages for the sources (see ch4, Bevington)
* wm_ := weighted mean
*
* wm_ra := avg_wra/avg_weight_ra
* wm_decl := avg_wdecl/avg_weight_decl
* wm_ra_err := 1/(N * avg_weight_ra)
* wm_decl_err := 1/(N * avg_weight_decl)
* avg_wra := avg(ra/ra_err^2)
* avg_wdecl := avg(decl/decl_err^2)
* avg_weight_ra := avg(1/ra_err^2)
* avg_weight_decl := avg(1/decl_err^2)
*/
CREATE SEQUENCE "seq_runningcatalog" AS INTEGER;
CREATE TABLE runningcatalog
(runcatid INT DEFAULT NEXT VALUE FOR "seq_runningcatalog"
,first_xtrsrc_id int not null -- id of the first observation
,ds_id INT NULL
,band INT NULL -- not null for group members ONLY
,stokes CHAR(1) NULL -- not null for group members ONLY
,datapoints INT NOT NULL
,decl_zone INT NULL
,wm_ra DOUBLE NOT NULL
,wm_decl DOUBLE NOT NULL
,wm_ra_err DOUBLE NOT NULL
,wm_decl_err DOUBLE NOT NULL
,avg_wra DOUBLE NULL
,avg_wdecl DOUBLE NULL
,avg_weight_ra DOUBLE NULL
,avg_weight_decl DOUBLE NULL
,source_kind smallint not null default 0 -- 0-Point; 1-Gaussian, 2-Group head, 3-Dummy;
,is_group BOOLEAN NOT NULL DEFAULT FALSE -- to be used for groups
,group_head_id INT NULL --reference to the group head
,deleted BOOLEAN NOT NULL DEFAULT FALSE -- deletion flag
,x DOUBLE NOT NULL
,y DOUBLE NOT NULL
,z DOUBLE NOT NULL
--,beam_semimaj DOUBLE NULL
--,beam_semimin DOUBLE NULL
--,beam_pa DOUBLE NULL
--,stokes CHAR(1) NOT NULL DEFAULT 'I'
)
;
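
The bookkeeping in the comment above amounts to a running update per new datapoint; a small Python sketch of the same arithmetic (illustration only, with hypothetical variable names):

def update_ra(avg_wra, avg_weight_ra, n, ra, ra_err):
    # Fold one new (ra, ra_err) measurement into the running averages.
    w = 1.0 / ra_err ** 2
    avg_wra = (avg_wra * n + ra * w) / (n + 1)         # avg(ra/ra_err^2)
    avg_weight_ra = (avg_weight_ra * n + w) / (n + 1)  # avg(1/ra_err^2)
    n += 1
    wm_ra = avg_wra / avg_weight_ra                    # weighted mean
    wm_ra_err = 1.0 / (n * avg_weight_ra)              # its quoted error
    return avg_wra, avg_weight_ra, n, wm_ra, wm_ra_err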
CREATE TABLE runningcatalog_fluxes(
-- Unique combination: runcat_id + band + stokes
runcat_id INT NOT NULL --reference to the running catalog (positions)
,band INT NOT NULL
,stokes CHAR(1) NOT NULL DEFAULT 'I'
,datapoints INT NOT NULL -- number of observations for this band + stokes
--Flux information
,avg_f_peak DOUBLE NULL
,avg_f_peak_sq DOUBLE NULL
,avg_weight_f_peak DOUBLE NULL
,avg_weighted_f_peak DOUBLE NULL
,avg_weighted_f_peak_sq DOUBLE NULL
);
drop table temp_associations;
create table temp_associations(
xtrsrc_id int not null
,runcat_id int not null
,weight double null
,distance_arcsec double null
,lr_method int null default 0
,r double null
,lr double null
,xtr_count int null
,run_count int null
,kind int null -- 1: 1-1, 2: 1-n; 3: n-1; 4: n-n
,group_head_id int null
);
--DROP TABLE temprunningcatalog;
/* This table contains the unique sources that were detected
* during an observation.
* TODO: The resolution element (from images table) is not implemented yet
* Extracted sources not yet in this table are appended when there is no
* positional match, or when a source was detected in a higher-resolution image.
*/
--DROP TABLE tempbasesources;
CREATE TABLE temprunningcatalog
(xtrsrc_id INT NOT NULL
,assoc_xtrsrc_id INT NOT NULL
,ds_id INT NOT NULL
,band INT NOT NULL
,datapoints INT NOT NULL
,source_kind smallint not null default 0 -- 0-Point; 1-Gaussian;
,zone INT NOT NULL
,wm_ra DOUBLE NOT NULL
,wm_decl DOUBLE NOT NULL
,wm_ra_err DOUBLE NOT NULL
,wm_decl_err DOUBLE NOT NULL
,avg_wra DOUBLE NOT NULL
,avg_wdecl DOUBLE NOT NULL
,avg_weight_ra DOUBLE NOT NULL
,avg_weight_decl DOUBLE NOT NULL
,x DOUBLE NOT NULL
,y DOUBLE NOT NULL
,z DOUBLE NOT NULL
,margin BOOLEAN NOT NULL DEFAULT 0
,beam_semimaj DOUBLE NULL
,beam_semimin DOUBLE NULL
,beam_pa DOUBLE NULL
,stokes CHAR(1) NOT NULL DEFAULT 'I'
,avg_f_peak DOUBLE NULL
,avg_f_peak_sq DOUBLE NULL
,avg_weight_f_peak DOUBLE NULL
,avg_weighted_f_peak DOUBLE NULL
,avg_weighted_f_peak_sq DOUBLE NULL
)
;
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import copy
import re
import monetdb.sql as db


def drop_table(conn, tab_name):
    cur = conn.cursor()
    cur.execute("select count(*) from sys.tables where name = '%s';" % tab_name)
    data = cur.fetchone()
    if data[0] == 1:
        cur.execute("drop table %s;" % tab_name)
        print 'Table %s dropped' % tab_name
    cur.close()


def create_table(conn, tab_name):
    sql_file = open("create.table.%s.sql" % tab_name, 'r')
    sql_lines = ''.join(sql_file.readlines())
    sql_file.close()
    # Strip /* ... */ and -- comments before passing the DDL to MonetDB.
    sql_lines = re.sub(r'/\*.*?\*/', '', sql_lines, flags=re.DOTALL)
    sql_lines = re.sub(r'--.*$', '', sql_lines, flags=re.MULTILINE)
    conn.execute(sql_lines)
    print "Table %s recreated" % tab_name


def main():
    db_host = "localhost"
    db_dbase = "test"
    db_user = "monetdb"
    db_passwd = "monetdb"
    db_port = 50000
    db_autocommit = True
    TABLES = ['frequencybands', 'datasets', 'images',
              'assocxtrsources', 'detections',
              'runningcatalog', 'runningcatalog_fluxes', 'temprunningcatalog',
              'temp_associations']
    try:
        conn = db.connect(hostname=db_host, database=db_dbase,
                          username=db_user, password=db_passwd,
                          port=db_port, autocommit=db_autocommit)
        # Drop in reverse order of creation.
        drop_tables = copy.copy(TABLES)
        drop_tables.reverse()
        print '=' * 20
        for table in drop_tables:
            drop_table(conn, table)
        print '=' * 20
        for table in TABLES:
            create_table(conn, table)
    except db.Error:
        raise
    return 0


if __name__ == '__main__':
    main()
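
Note that create_table() opens create.table.<name>.sql by a relative path, so the script is presumably meant to be run from the sql/tables directory where those files live.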
#!/usr/bin/python
"""
BBS-format file source object for GSM.
Author: Alexey Mints (2012).
"""
from src.errors import SourcePartMissingException
from src.gsmlogger import get_gsm_logger


class GSMBBSFileSource(object):
    """
    Reads a source list from a BBS file.
    """
    # Size hint (in bytes) for each readlines() batch.
    BLOCK_SIZE = 100

    # Maps BBS column names to detections column names.
    FIELD_NAMES = {
        'ra': 'ra',
        'e_ra': 'ra_err',
        'dec': 'decl',
        'e_dec': 'decl_err',
        'total_flux': 'flux',
        'e_total_flux': 'flux_err',
    }

    def __init__(self, file_id, filename, fileformat="default"):
        self.filename = filename
        self.file_id = file_id
        self.fileformat = fileformat
        # Header is a hash of column headers.
        # For each column name a (column-number, default) tuple is stored.
        self.header = {}
        self.sources = 0
        self.log = get_gsm_logger('bbsfiles', 'import.log')
        self.log.info('BBS file source created for name: %s' % filename)

    def get_part(self, name, line):
        """
        Get a named part from the line using the header.
        """
        if name in self.header:
            aheader = self.header[name]
            if aheader[0] < len(line) and line[aheader[0]]:
                return line[aheader[0]]
            elif aheader[1]:
                # Fall back to the default value from the header.
                return aheader[1]
            else:
                raise SourcePartMissingException(
                    "Missing data for column %s in file %s"
                    % (name, self.filename))
        else:
            raise SourcePartMissingException("No column %s in file %s"
                                             % (name, self.filename))

    def process_line(self, line):
        """
        Turn a line into a hash-dataset using header information.
        """
        answer = {}
        for part in ["ra", "decl", "flux", "flux_err"]:
            answer[part] = self.get_part(part, line)
        return answer

    def read_and_store_data(self, conn):
        """
        Read everything from the BBS file and load it into detections.
        """
        line = None
        datafile = open(self.filename, 'r')
        if self.fileformat == 'test':
            # The first line holds the column list after an '=' sign;
            # items may carry quoted default values.
            header = datafile.readline().split('=', 1)[1]
            header = header.strip(' ').lower().split(',')
            ind = 0
            for head_parts in header:
                head_part = head_parts.split('=')
                if len(head_part) == 1:
                    # No default value.
                    self.header[head_parts.strip()] = (ind, None)
                else:
                    self.header[head_part[0].strip()] = \
                        (ind, head_part[1].strip("'").strip())
                ind = ind + 1
        elif self.fileformat == 'default':
            # Skip leading comment lines; the last one holds the column names.
            line = datafile.readline()
            while line.startswith('#'):
                comments = line
                line = datafile.readline()
            header = comments[2:].strip().lower().split(' ')
            ind = 0
            for head_part in header:
                self.header[self.FIELD_NAMES[head_part]] = (ind, head_part)
                ind = ind + 1
        sql_data = []
        commit = conn.autocommit
        conn.set_autocommit(False)
        sql_insert = 'insert into detections (image_id, lra, ldecl, lra_err, '\
                     'ldecl_err, lf_peak, lf_peak_err, ldet_sigma) values'
        while True:
            data_lines = datafile.readlines(self.BLOCK_SIZE)
            if not data_lines:
                break
            for data_line in data_lines:
                self.sources = self.sources + 1
                dhash = self.process_line(data_line.split())
                sql_data.append("('%s', %s, %s, 0.1, 0.1, %s, %s, 3.0)" %
                                (self.file_id, dhash['ra'],
                                 dhash['decl'], dhash['flux'],
                                 dhash['flux_err']))
        datafile.close()
        sql = "%s %s;" % (sql_insert, ',\n'.join(sql_data))
        result = conn.execute(sql)
        self.log.info('%s sources loaded' % str(result).strip())
        conn.commit()
        # Restore autocommit.
        conn.set_autocommit(commit)
        return True
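
A hypothetical usage sketch (file id, file name and format are placeholders; the connection object would come from one of the connection classes below):

# Hypothetical usage, not part of the commit: load one BBS file into
# the detections table through an existing connection `conn`.
source = GSMBBSFileSource('image1', 'tests/data/field2.dat',
                          fileformat='test')
source.read_and_store_data(conn)
print '%s sources read from %s' % (source.sources, source.filename)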
#!/usr/bin/python
"""
Database connection with logging.
Overrides the MonetDB connection object.
"""
from exceptions import ValueError
import time
import logging
import monetdb.sql as db
from monetdb.monetdb_exceptions import DatabaseError
from src.gsmlogger import get_gsm_logger


class MonetLoggedConnection(db.connections.Connection):
    """
    Connection with logging.
    Overrides the MonetDB connection object.
    """
    def __init__(self, **params):
        super(MonetLoggedConnection, self).__init__(**params)
        self.profile = False
        self.log = get_gsm_logger('sql', 'sql.log')
        self.log.setLevel(logging.DEBUG)

    def execute(self, query):
        """
        Overriding execute method with logging.
        """
        if self.profile:
            start = time.time()
        self.log.debug(query.replace('\n', ' '))
        try:
            result = super(MonetLoggedConnection, self).execute(query)
        except DatabaseError as oerr:
            self.log.error(oerr)
            raise oerr
        if self.profile:
            self.log.debug('Time spent: %s' % (time.time() - start))
        return result

    def execute_set(self, query_set):
        """
        Execute several SQL statements and return the last result.
        """
        if not isinstance(query_set, list):
            if not isinstance(query_set, str):
                raise ValueError("Got %s instead of list of SQLs" %
                                 str(query_set))
            else:
                query_set = query_set.strip('; ').split(';')
        cursor = self.cursor()
        for query in query_set:
            lastcount = cursor.execute(query)
        if lastcount > 0:
            return cursor.fetchall()
        else:
            return True

    def exec_return(self, query):
        """
        Run a single query and return the first value from the result set.
        """
        result = []
        try:
            cursor = self.cursor()
            cursor.execute(query)
            result = cursor.fetchone()[0]
        except db.Error, exc:
            self.log.error("Failed on query: %s. Error: %s" % (query, exc))
            raise exc
        finally:
            cursor.close()
        return result

    def get_cursor(self, query):
        """
        Create and return a cursor for a given query.
        """
        cur = self.cursor()
        cur.execute(query)
        return cur

    def established(self):
        """
        :returns: True if the connection is active.
        """
        if self.mapi:
            return True
        else:
            return False

    def call_procedure(self, procname):
        """
        Proper procedure call (for MonetDB/PostgreSQL compatibility).
        """
        self.execute('call %s' % procname)
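
In the pipeline a connection presumably comes from gsmconnectionmanager.py; constructing one directly would look roughly like this (parameters are assumptions mirroring recreate_tables.py):

# Hypothetical usage, not part of the commit.
conn = MonetLoggedConnection(hostname='localhost', database='test',
                             username='monetdb', password='monetdb',
                             port=50000, autocommit=True)
conn.profile = True  # also log per-query timings to sql.log
band_count = conn.exec_return('select count(*) from frequencybands;')
conn.call_procedure('fill_temp_assoc_kind()')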
#!/usr/bin/python
from exceptions import ValueError
import time
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT, \
                                ISOLATION_LEVEL_READ_COMMITTED
from src.gsmlogger import get_gsm_logger


class PgConnection(object):
    """
    Connection object for PostgreSQL.
    """
    def __init__(self, **params):
        par = self.map_params(params)
        self.conn = psycopg2.connect(**par)
        self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        self.autocommit = True
        self.log = get_gsm_logger('sql', 'sql.log')
        self.profile = False

    @staticmethod
    def is_monet():
        """
        For quick distinction between MonetDB and PostgreSQL.
        """
        return False

    @staticmethod
    def map_params(somedict):
        """
        Map MonetDB connection params to PostgreSQL ones.
        If a value is None in the mapper, the parameter is removed.
        """
        mapper = {
            'hostname': 'host',
            'username': 'user',
            'database': 'dbname',
            'autocommit': None,
            'port': None
        }
        result = {}
        for key, value in somedict.iteritems():
            if key in mapper:
                if mapper[key] is not None:
                    result[mapper[key]] = value
            else:
                result[key] = value
        return result

    def set_autocommit(self, value):
        """
        Change the commit level for the connection.
        """
        if value:
            self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
        else:
            self.conn.set_isolation_level(ISOLATION_LEVEL_READ_COMMITTED)
        self.autocommit = value

    def commit(self):
        """
        Commit only if it is needed.
        """
        if not self.autocommit:
            self.conn.commit()

    def execute(self, query):
        """
        Overriding execute method with logging.
        """
        if self.profile:
            start = time.time()
        self.log.debug(query.replace('\n', ' '))
        cur = self.conn.cursor()
        result = cur.execute(query)
        cur.close()
        if self.profile:
            self.log.debug('Time spent: %s' % (time.time() - start))
        return result

    def execute_set(self, query_set):
        """
        Execute several SQL statements and return the last result.
        """
        if not isinstance(query_set, list):
            if not isinstance(query_set, str):
                raise ValueError("Got %s instead of list of SQLs" %
                                 str(query_set))
            else:
                query_set = query_set.strip('; ').split(';')
        cursor = self.conn.cursor()
        for query in query_set:
            cursor.execute(query)
        # We have to be sure that there is anything to fetch.
        if cursor.rowcount > 0 and cursor.statusmessage.split()[0] == 'SELECT':
            return cursor.fetchall()
        else:
            return None

    def exec_return(self, query):
        """
        Run a single query and return the first value from the result set.
        """
        result = []
        try:
            cursor = self.conn.cursor()
            cursor.execute(query)
            result = cursor.fetchone()[0]
            if isinstance(result, long):
                result = int(result)
        except psycopg2.Error, exc:
            self.log.error("Failed on query: %s. Error: %s" % (query, exc))
            raise exc
        finally:
            cursor.close()
        return result

    def get_cursor(self, query):
        """
        Create and return a cursor for a given query.
        """
        cur = self.conn.cursor()
        cur.execute(query)
        return cur

    def cursor(self):
        """
        Create and return a plain cursor.
        """
        return self.conn.cursor()

    def close(self):
        """
        Close the connection.
        """
        self.conn.close()

    def established(self):
        """
        :returns: True if the connection is active.
        """
        if self.conn:
            return not self.conn.closed
        else:
            return False

    def call_procedure(self, procname):
        """
        Proper procedure call (for MonetDB/PostgreSQL compatibility).
        """
        cur = self.conn.cursor()
        cur.execute('select %s' % procname)
        cur.close()
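
Because map_params() translates the MonetDB-style keywords, the same parameter dictionary can construct either backend; a hedged sketch:

# Hypothetical usage, not part of the commit: MonetDB-style parameters
# are remapped to psycopg2 keywords by map_params().
params = {'hostname': 'localhost', 'database': 'test',
          'username': 'monetdb', 'password': 'monetdb',
          'port': 50000, 'autocommit': True}
pg_conn = PgConnection(**params)  # psycopg2 under the hood
print pg_conn.established()       # True while the connection is open
pg_conn.close()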