Commit 801485e1 authored by Vlad Kondratiev's avatar Vlad Kondratiev

added logic of using -s options in dspsr for single pulses addition; added new...

added logic of using -s options in dspsr for single pulses addition; added new argument for CEP2Info class for number of Slurm nodes for summaries. Default number is 2, but for Dragnet cluster 1 is used
parent 42a0414e
......@@ -211,8 +211,8 @@ class CVUnit(PipeUnit):
dspsr_popens=[] # list of dspsr Popen objects
for cc in range(bb, bb+self.nrChanPerSub):
input_file=bf2puma_outfiles[cc]
cmd="dspsr -m %s -b %d -A -L %d %s -E %s/%s.par -O %s_%s_SB%s -t %d %s %s" % \
(obsmjd, dspsr_nbins, cmdline.opts.tsubint, verbose, tmpdir, psr2, psr, self.output_prefix, \
cmd="dspsr -m %s -b %d -A %s %s -E %s/%s.par -O %s_%s_SB%s -t %d %s %s" % \
(obsmjd, dspsr_nbins, self.dspsr_folding_options, verbose, tmpdir, psr2, psr, self.output_prefix, \
input_file.split("_SB")[1], cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, input_file)
dspsr_popen = self.start_and_go(cmd, workdir=self.curdir)
dspsr_popens.append(dspsr_popen)
......
......@@ -196,8 +196,8 @@ class CVUnitPart(PipeUnitPart, CVUnit):
dspsr_popens=[] # list of dspsr Popen objects
for cc in range(bb, bb+self.nrChanPerSub):
input_file=bf2puma_outfiles[cc]
cmd="dspsr -m %s -b %d -A -L %d %s -E %s/%s.par -O %s_%s_SB%s -t %d %s %s" % \
(obsmjd, dspsr_nbins, cmdline.opts.tsubint, verbose, tmpdir, psr2, psr, self.output_prefix, \
cmd="dspsr -m %s -b %d -A %s %s -E %s/%s.par -O %s_%s_SB%s -t %d %s %s" % \
(obsmjd, dspsr_nbins, self.dspsr_folding_options, verbose, tmpdir, psr2, psr, self.output_prefix, \
input_file.split("_SB")[1], cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, input_file)
dspsr_popen = self.start_and_go(cmd, workdir=self.curdir)
dspsr_popens.append(dspsr_popen)
......
......@@ -425,8 +425,8 @@ class Pipeline:
if cmdline.opts.is_slurm:
# adding 1 more because the main Pulp process is also run under Slurm
#need_only_nnodes=len(self.summary_dirs.items()) + 1
need_only_nnodes=2 # maximum we will need 3 job steps (cpus), and original allocation should have 20 tasks per node,
# so we should just fit in 1 node
need_only_nnodes=cep2.slurm_number_summary_nodes # maximum we will need 3 job steps (cpus), and original allocation should have 20 tasks per node,
# so we should just fit in 1 node
log.info("Shrinking Slurm job %s (%s) allocation to %d node(s) to finish up with summaries..." % (cep2.slurm_jobid, cep2.slurm_jobname, need_only_nnodes))
try:
docker_cmd_prefix=cep2.docker_cmd_prefix
......
......@@ -49,7 +49,7 @@ def dspsr_postproc(root, ref, cmdline, obs, psr, total_chan, nsubs_eff, curdir,
if not cmdline.opts.is_norfi:
# first we check how large the dataset is (product of number of channels and subints)
# this is necessary as clean.py uses a lot of memory. If som then we use old-fashioned paz -r
if (obs.duration/cmdline.opts.tsubint) * total_chan >= 256000:
if (obs.duration/cmdline.opts.tsubint) * total_chan >= 512000:
root.log.info("Zapping channels using median smoothed difference algorithm...")
cmd="paz -r -e paz.ar %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
......@@ -65,12 +65,19 @@ def dspsr_postproc(root, ref, cmdline, obs, psr, total_chan, nsubs_eff, curdir,
root.execute(cmd, workdir=curdir)
# dedispersing
root.log.info("Dedispersing...")
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.ar" % (curdir, psr, output_prefix)):
cmd="pam -D -m %s_%s.paz.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
cmd="pam -D -e dd %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
# checking if there was already an option -K. That means we do not need to run dedispersion as all sub-integrations
# have been aligned already
if re.match("^\-K$", cmdline.opts.dspsr_extra_opts) or re.match("\s+\-K$", cmdline.opts.dspsr_extra_opts) or \
re.match("\s+\-K\s+", cmdline.opts.dspsr_extra_opts) or re.match("^\-K\s+", cmdline.opts.dspsr_extra_opts): \
cmd="mv %s_%s.ar %s_%s.dd" % (psr, output_prefix, psr, output_prefix)
root.execute(cmd, workdir=curdir)
else:
root.log.info("Dedispersing...")
if not cmdline.opts.is_norfi or os.path.exists("%s/%s_%s.paz.ar" % (curdir, psr, output_prefix)):
cmd="pam -D -m %s_%s.paz.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
cmd="pam -D -e dd %s_%s.ar" % (psr, output_prefix)
root.execute(cmd, workdir=curdir)
# scrunching in frequency
root.log.info("Scrunching in frequency to have %d channels in the output ar-file..." % (nsubs_eff))
......@@ -306,6 +313,14 @@ class PipeUnit:
# to be sure that we have unique list of pulsars (especially relevant for tabfind+ option)
self.psrs = np.unique(self.psrs)
# dspsr folding options
# making choice between -L %d and "-s"
# by default -L is used, but if -s is given in the dspsr_extra_opts, then we should get rid of -L
self.dspsr_folding_options="-L %d" % (cmdline.opts.tsubint)
if re.match("^\-s$", cmdline.opts.dspsr_extra_opts) or re.match("\s+\-s$", cmdline.opts.dspsr_extra_opts) or \
re.match("\s+\-s\s+", cmdline.opts.dspsr_extra_opts) or re.match("^\-s\s+", cmdline.opts.dspsr_extra_opts): \
self.dspsr_folding_options=""
# function to set outdir and curdir directories
def set_outdir(self, obs, cep2, cmdline):
if len(self.tab.location) == 0: # when raw data are erased but still want to run only summaries
......@@ -1374,8 +1389,8 @@ self.output_prefix, cmdline.opts.bf2fits_extra_opts, input_file)
for psr in self.psrs: # pulsar list is empty f --nofold is used
psr2=re.sub(r'^[BJ]', '', psr)
dspsr_nbins=self.get_best_nbins("%s/%s.par" % (tmpdir, psr2))
cmd="dspsr -b %d -A -L %d -E %s/%s.par %s -O %s_%s -t %d %s %s.fits" % \
(dspsr_nbins, cmdline.opts.tsubint, tmpdir, psr2, verbose, psr, \
cmd="dspsr -b %d -A %s -E %s/%s.par %s -O %s_%s -t %d %s %s.fits" % \
(dspsr_nbins, self.dspsr_folding_options, tmpdir, psr2, verbose, psr, \
self.output_prefix, cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, self.output_prefix)
dspsr_popen = self.start_and_go(cmd, workdir=self.curdir)
dspsr_popens.append(dspsr_popen)
......@@ -1632,8 +1647,8 @@ self.output_prefix, cmdline.opts.bf2fits_extra_opts, input_file)
self.hoover_mounting(cep2, self.tab.rawfiles[loc][0], loc)
fpart=int(s0_files[ii].split("_P")[-1].split("_")[0])
if not cmdline.opts.is_nofold and not cmdline.opts.is_skip_dspsr:
cmd="dspsr -b %d -A -L %d %s -E %s/%s.par -O %s_%s_P%d -t %d %s %s" % \
(dspsr_nbins, cmdline.opts.tsubint, verbose, tmpdir, psr2, \
cmd="dspsr -b %d -A %s %s -E %s/%s.par -O %s_%s_P%d -t %d %s %s" % \
(dspsr_nbins, self.dspsr_folding_options, verbose, tmpdir, psr2, \
psr, self.output_prefix, fpart, cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, s0_files[ii])
self.execute(cmd, workdir=self.curdir)
# run digifil with coherent dedispersion for further single-pulse analysis
......
......@@ -190,8 +190,8 @@ class PipeUnitPart(PipeUnit):
dspsr_nbins=self.get_best_nbins("%s/%s.par" % (tmpdir, psr2))
if not cmdline.opts.is_nofold and not cmdline.opts.is_skip_dspsr:
self.log.info("Running dspsr for pulsar %s..." % (psr))
cmd="dspsr -b %d -A -L %d %s -E %s/%s.par -O %s_%s -t %d %s %s" % \
(dspsr_nbins, cmdline.opts.tsubint, verbose, tmpdir, psr2, psr, self.output_prefix, \
cmd="dspsr -b %d -A %s %s -E %s/%s.par -O %s_%s -t %d %s %s" % \
(dspsr_nbins, self.dspsr_folding_options, verbose, tmpdir, psr2, psr, self.output_prefix, \
cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, s0_file)
self.execute(cmd, workdir=self.curdir)
# run digifil with coherent dedispersion for further single-pulse analysis
......@@ -432,8 +432,8 @@ self.output_prefix, cmdline.opts.bf2fits_extra_opts, input_file)
for psr in self.psrs: # pulsar list is empty if --nofold is used
psr2=re.sub(r'^[BJ]', '', psr)
dspsr_nbins=self.get_best_nbins("%s/%s.par" % (tmpdir, psr2))
cmd="dspsr -b %d -A -L %d -E %s/%s.par %s -O %s_%s -t %d %s %s.fits" % \
(dspsr_nbins, cmdline.opts.tsubint, tmpdir, psr2, verbose, psr, \
cmd="dspsr -b %d -A %s -E %s/%s.par %s -O %s_%s -t %d %s %s.fits" % \
(dspsr_nbins, self.dspsr_folding_options, tmpdir, psr2, verbose, psr, \
self.output_prefix, cmdline.opts.nthreads, cmdline.opts.dspsr_extra_opts, self.output_prefix)
dspsr_popen = self.start_and_go(cmd, workdir=self.curdir)
dspsr_popens.append(dspsr_popen)
......
......@@ -95,6 +95,8 @@ class CEP2Info:
self.slurm_jobid=""
# job name
self.slurm_jobname="Pulp"
# number of nodes needed for summaries (default is 2)
self.slurm_number_summary_nodes=2
#
# DOCKER related
#
......@@ -167,6 +169,8 @@ class CEP2Info:
# extra options for summary nodes
#self.slurm_summaries_extra_opts="-p proc"
self.slurm_summaries_extra_opts="-N 1 -w dragproc --mem-per-cpu=8192"
# number of nodes needed for summaries (default is 2)
self.slurm_number_summary_nodes=1
# rawdata specificators
self.rawdir_suffix_specificator="*/"
self.rawdir_prefix_specificator="*/"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment