Commit d2058b39 authored by sarrvesh's avatar sarrvesh

pylinted code

parent f29860fa
[MASTER]
ignore=
[BASIC]
good-names =
lv,
ie,
d,
i,
c,
ra,
dec,
ax,
bandwidth,
n
[FORMAT]
max-line-length = 84
[TYPECHECK]
ignored-modules=astropy,ephem
"""Functions to validate user-input"""
import numpy as np
from astropy.coordinates import SkyCoord
def compute_baselines(nCore, nRemote, nInt, hbaMode):
"""For a given number of core, remote, and international stations
and the HBA mode, compute the number of baselines formed by
the array. The number of baselines includes autocorrelations."""
if 'hba' in hbaMode:
nStations = (2*nCore)+nRemote+nInt
else:
nStations = nCore+nRemote+nInt
return (nStations*(nStations+1))/2
def compute_baselines(n_core, n_remote, n_int, hba_mode):
"""For a given number of core, remote, and international stations
and the HBA mode, compute the number of baselines formed by
the array. The number of baselines includes autocorrelations."""
if 'hba' in hba_mode:
n_stations = (2*n_core)+n_remote+n_int
else:
n_stations = n_core+n_remote+n_int
return (n_stations*(n_stations+1))/2
def calculate_im_noise(n_core, n_remote, n_int, hba_mode, obs_t, n_sb):
"""Calculate the image sensitivity for a given number of stations, HBA/LBA mode,
observation time, and number of subbands."""
# Hardcoded value for subband width
sb_width = 195.3125 # kHz
# Hardcoded values for station SEFD
core_sefd = {'lba' : 38160, 'hba' : 2820}
remote_sefd = {'lba' : 38160, 'hba' : 1410}
int_sefd = {'lba' : 18840, 'hba' : 710}
# Figure out whether the user wants to observe with LBA or HBA.
if 'hba' in hba_mode:
mode = 'hba'
n_core *= 2
else:
mode = 'lba'
# Calculate the bandwidth in MHz
bandwidth = n_sb * sb_width * 1.E3
bandwidth /= 1.E6
def calculate_im_noise(nCore, nRemote, nInt, hbaMode, obsT, nSB):
"""Calculate the image sensitivity for a given number of stations, HBA/LBA mode,
observation time, and number of subbands."""
# Hardcoded value for subband width
sbWidth = 195.3125 # kHz
# Hardcoded values for station SEFD
coresefd = { 'lba' : 38160, 'hba' : 2820 }
remotesefd = { 'lba' : 38160, 'hba' : 1410 }
intsefd = { 'lba' : 18840, 'hba' : 710 }
# Figure out whether the user wants to observe with LBA or HBA.
if 'hba' in hbaMode:
mode = 'hba'
nCore *= 2
else:
mode = 'lba'
# Calculate the bandwidth in MHz
bandwidth = nSB * sbWidth * 1.E3
bandwidth /= 1.E6
# Calculate the sensitivity
prodcc = coresefd[mode]
if hbaMode == 'hbadualinner':
# SEFD of the tapered remote station is the same as a core station
prodrr = coresefd[mode]
else:
prodrr = remotesefd[mode]
prodii = intsefd[mode]
prodcr = np.sqrt(prodcc) * np.sqrt(prodrr)
prodci = np.sqrt(prodcc) * np.sqrt(prodii)
prodri = np.sqrt(prodrr) * np.sqrt(prodii)
nccbl = nCore*(nCore-1)/2
nrrbl = nRemote*(nRemote-1)/2
niibl = nInt*(nInt-1)/2
ncrbl = nCore * nRemote
ncibl = nCore * nInt
nribl = nRemote * nInt
denom = 4 * bandwidth * obsT * 1.E6 * ( (nccbl/prodcc**2) + (nrrbl/prodrr**2) + \
(niibl/prodii**2) + (ncrbl/prodcr**2) + \
(ncibl/prodci**2) + (nribl/prodri**2) )
imNoise = 1/np.sqrt(denom)
imNoise *= 1.E6 # In uJy
return '{:0.2f}'.format(imNoise)
# Calculate the sensitivity
prodcc = core_sefd[mode]
if hba_mode == 'hbadualinner':
# SEFD of the tapered remote station is the same as a core station
prodrr = core_sefd[mode]
else:
prodrr = remote_sefd[mode]
prodii = int_sefd[mode]
prodcr = np.sqrt(prodcc) * np.sqrt(prodrr)
prodci = np.sqrt(prodcc) * np.sqrt(prodii)
prodri = np.sqrt(prodrr) * np.sqrt(prodii)
nccbl = n_core*(n_core-1)/2
nrrbl = n_remote*(n_remote-1)/2
niibl = n_int*(n_int-1)/2
ncrbl = n_core * n_remote
ncibl = n_core * n_int
nribl = n_remote * n_int
denom = 4 * bandwidth * obs_t * 1.E6 * \
((nccbl/prodcc**2) + (nrrbl/prodrr**2) + \
(niibl/prodii**2) + (ncrbl/prodcr**2) + \
(ncibl/prodci**2) + (nribl/prodri**2))
im_noise = 1/np.sqrt(denom)
im_noise *= 1.E6 # In uJy
return '{:0.2f}'.format(im_noise)
def calculate_raw_size(obsT, intTime, nBaselines, nChan, nSB):
"""Compute the datasize of a raw LOFAR measurement set given the
length of the observation, correlator integration time, number
of baselines, number of channels per subband, and number of subbands"""
nRows = int( nBaselines * (obsT / intTime) ) - nBaselines
# A single row in LofarStMan format contains
# - 32-bit sequence number (4 bytes)
# - nChan*16-bit samples for weight and sigma calculation (2*nChan bytes)
# - 4*nChan*2*float data array (4*nChan*2*4 bytes)
sbSize = nRows * ((4) + (2*nChan) + (4*nChan*2*4))/(1024*1024*1024) # in GB
totSize = sbSize * nSB
return '{:0.2f}'.format(totSize)
def calculate_raw_size(obs_t, int_time, n_baselines, n_chan, n_sb):
"""Compute the datasize of a raw LOFAR measurement set given the
length of the observation, correlator integration time, number
of baselines, number of channels per subband, and number of subbands"""
n_rows = int(n_baselines * (obs_t / int_time)) - n_baselines
# A single row in LofarStMan format contains
# - 32-bit sequence number (4 bytes)
# - n_chan*16-bit samples for weight and sigma calculation (2*n_chan bytes)
# - 4*n_chan*2*float data array (4*n_chan*2*4 bytes)
sb_size = n_rows * ((4) + (2*n_chan) + (4*n_chan*2*4))/(1024*1024*1024) # in GB
tot_size = sb_size * n_sb
return '{:0.2f}'.format(tot_size)
def calculate_proc_size(obsT, intTime, nBaselines, nChan, nSB, pipeType, tAvg,
fAvg, dyCompress):
"""Compute the datasize of averaged LOFAR measurement set given the
length of the observation, integration time, number of baselines,
pipeline type, time and frequency averaging factor, and
enable dysco compression."""
if pipeType == 'none':
return ''
elif pipeType == 'preprocessing':
# Change nChan to account for fAvg
nChan //= fAvg
# Change integT to account for tAvg
intTime *= tAvg
nRows = int( nBaselines * (obsT / intTime) ) - nBaselines
# What does a single row in an averaged MS contain?
sbSize = nRows * ((7*8) + \
(4+(4*nChan)) + \
(4*11) + \
(8*1) + \
(4) + \
(4 * (8 + 8*nChan + 4*nChan)) )
# Convert byte length to GB
sbSize /= (1024*1024*1024)
totSize = sbSize * nSB
# Reduce the data size if dysco is enabled.
if dyCompress == 'enable':
totSize = totSize/3.
return '{:0.2f}'.format(totSize)
else:
pass
def calculate_proc_size(obs_t, int_time, n_baselines, n_chan, n_sb, pipe_type,
t_avg, f_avg, dy_compress):
"""Compute the datasize of averaged LOFAR measurement set given the
length of the observation, integration time, number of baselines,
pipeline type, time and frequency averaging factor, and
enable dysco compression."""
if pipe_type == 'none':
return ''
elif pipe_type == 'preprocessing':
# Change n_chan to account for f_avg
n_chan //= f_avg
# Change integ_t to account for t_avg
int_time *= t_avg
n_rows = int(n_baselines * (obs_t / int_time)) - n_baselines
# What does a single row in an averaged MS contain?
sb_size = n_rows * ((7*8) + \
(4+(4*n_chan)) + \
(4*11) + \
(8*1) + \
(4) + \
(4 * (8 + 8*n_chan + 4*n_chan)))
# Convert byte length to GB
sb_size /= (1024*1024*1024)
tot_size = sb_size * n_sb
# Reduce the data size if dysco is enabled.
if dy_compress == 'enable':
tot_size = tot_size/3.
return '{:0.2f}'.format(tot_size)
def validate_inputs(obsT, nCore, nRemote, nInt, nSB, integT, tAvg,
fAvg, srcName, coord, hbaMode):
"""Valid text input supplied by the user: observation time, number of
subbands, and integration time. Following checks will be performed:
- obsTime is a valid positive number
- nCore is not None
- nRemote is not None
- nInt is not None
- nCore+nRemote+nInt is at least 1
- nSB is an integer and is at least 1 or greater
- integT is a valid positive number greater than or equal to 0.16
- tAvg is an integer
- fAvg is an integer
- srcName is a string
def validate_inputs(obs_t, n_core, n_remote, n_int, n_sb, integ_t, t_avg,
f_avg, src_name, coord, hba_mode):
"""Valid text input supplied by the user: observation time, number of
subbands, and integration time. Following checks will be performed:
- obs_time is a valid positive number
- n_core is not None
- n_remote is not None
- n_int is not None
- n_core+n_remote+n_int is at least 1
- n_sb is an integer and is at least 1 or greater
- integ_t is a valid positive number greater than or equal to 0.16
- t_avg is an integer
- f_avg is an integer
- src_name is a string
- coord is a valid AstroPy coordinate
- While observing with HBA, check if the targets are inside the tile beam.
Return state=True/False accompanied by an error msg
Note: all input parameters are still strings."""
msg = ''
# Validate the length of the observing time
try:
float(obsT)
if not float(obsT) > 0:
msg += 'Observation time cannot be zero or negative.\n'
except ValueError:
msg += 'Invalid observation time specified.\n'
# Validate the number of stations
if nCore < 0 or nCore > 24:
msg += 'Number of core stations must be between 0 and 24.\n'
if nRemote < 0 or nRemote > 14:
msg += 'Number of remote stations must be between 0 and 14.\n'
if nInt < 0 or nInt > 14:
msg += 'Number of international stations must be between 0 and 14.\n'
if nCore + nRemote + nInt < 2:
msg += 'At least 2 station must be included.\n'
# Validate the number of subbands
try:
int(nSB)
if int(nSB) < 1:
msg += 'Number of subbands cannot be less than 1.\n'
if int(nSB) > 488:
msg += 'Number of subbands cannot be larger than 488.\n'
except ValueError:
msg += 'Invalid number of subbands specified.\n'
# Validate integration time
try:
float(integT)
if float(integT) < 0.16:
msg += 'Invalid integration time specified. Must be >= 0.16\n'
except:
msg += 'Invalid integration time specified.\n'
# Validate time averaging factor
try:
int(str(tAvg))
except ValueError:
msg += 'Invalid time averaging factor specified.'
# Validate frequency averaging factor
try:
int(str(fAvg))
except ValueError:
msg += 'Invalid frequency averaging factor specified.'
# Validate the coordinates specified under target setup
if coord is not '':
# Warn if the number of targets do not match the number of coordinates
if len(srcName.split(',')) != len(coord.split(',')):
msg += 'Number of target names do not match the number of coordinates. '
# Check if the coordinates are valid
try:
for i in range(len(coord.split(','))):
SkyCoord(coord.split(',')[i])
except:
msg += 'Invalid coodinate value under Target setup. Please make ' +\
'sure it is compatible with the AstroPy formats.'
# While observing with HBA, check if the specified targets all lie within 10 deg
coord_list = coord.split(',')
if 'hba' in hbaMode and len(coord_list) > 1:
refPoint = SkyCoord(coord_list[0])
angDistance = []
for i in range(1, len(coord_list)):
thisPoint = SkyCoord(coord_list[i])
angDistance.append(thisPoint.separation(refPoint).deg)
maxDistance = np.max(np.asarray(angDistance))
if maxDistance > 10.:
msg += 'Maximum angular separation between specified target pointings ' + \
'is {:.2f} degrees. This is not allowed while '.format(maxDistance) + \
'observing with the High Band Antenna'
# If any error has been triggered above, return the error message
if msg is not '':
return False, msg
else:
return True, msg
Return state=True/False accompanied by an error msg
Note: all input parameters are still strings."""
msg = ''
# Validate the length of the observing time
try:
float(obs_t)
if float(obs_t) <= 0:
msg += 'Observation time cannot be zero or negative.\n'
except ValueError:
msg += 'Invalid observation time specified.\n'
# Validate the number of stations
if n_core < 0 or n_core > 24:
msg += 'Number of core stations must be between 0 and 24.\n'
if n_remote < 0 or n_remote > 14:
msg += 'Number of remote stations must be between 0 and 14.\n'
if n_int < 0 or n_int > 14:
msg += 'Number of international stations must be between 0 and 14.\n'
if n_core + n_remote + n_int < 2:
msg += 'At least 2 station must be included.\n'
# Validate the number of subbands
try:
int(n_sb)
if int(n_sb) < 1:
msg += 'Number of subbands cannot be less than 1.\n'
if int(n_sb) > 488:
msg += 'Number of subbands cannot be larger than 488.\n'
except ValueError:
msg += 'Invalid number of subbands specified.\n'
# Validate integration time
try:
float(integ_t)
if float(integ_t) < 0.16:
msg += 'Invalid integration time specified. Must be >= 0.16\n'
except:
msg += 'Invalid integration time specified.\n'
# Validate time averaging factor
try:
int(str(t_avg))
except ValueError:
msg += 'Invalid time averaging factor specified.'
# Validate frequency averaging factor
try:
int(str(f_avg))
except ValueError:
msg += 'Invalid frequency averaging factor specified.'
# Validate the coordinates specified under target setup
if coord is not '':
# Warn if the number of targets do not match the number of coordinates
if len(src_name.split(',')) != len(coord.split(',')):
msg += 'Number of target names do not match the number of coordinates. '
# Check if the coordinates are valid
try:
for i in range(len(coord.split(','))):
SkyCoord(coord.split(',')[i])
except:
msg += 'Invalid coodinate value under Target setup. Please make ' +\
'sure it is compatible with the AstroPy formats.'
# While observing with HBA, check if the specified targets all lie within 10 deg
coord_list = coord.split(',')
if 'hba' in hba_mode and len(coord_list) > 1:
ref_point = SkyCoord(coord_list[0])
ang_distance = []
for i in range(1, len(coord_list)):
this_point = SkyCoord(coord_list[i])
ang_distance.append(this_point.separation(ref_point).deg)
max_distance = np.max(np.asarray(ang_distance))
if max_distance > 10.:
msg += 'Maximum angular separation between specified target ' + \
'pointings is {:.2f} degrees. This is '.format(max_distance) + \
'not allowed while observing with the High Band Antenna'
# If any error has been triggered above, return the error message
if msg is not '':
return False, msg
else:
return True, msg
This diff is collapsed.
from datetime import datetime, timedelta
"""Functions to generate PDF file"""
from datetime import datetime
import os
from fpdf import FPDF, HTMLMixin
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import os
# Dummy class needed to generate the PDF file
class MyFPDF(FPDF, HTMLMixin): pass
class MyFPDF(FPDF, HTMLMixin):
"""Dummy class"""
pass
def convert_figure_to_axis_info(figure):
"""For a given Graph Figure object, return
xaxis (a list of datetime.datetime objects),
yaxis (a list of source elevation), and
label (name of the source as a string)."""
time_axis = figure['x']
xaxis = []
for val in time_axis:
d = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S')
xaxis.append(d)
yaxis = figure['y']
label = figure['name']
return xaxis, yaxis, label
def make_pdf_plot(elevation_fig, outfilename):
"""For a given elevation_fig object and output filename, generate a
matplotlib plot and write it to disk."""
fig, ax = plt.subplots(1, 1, figsize=(8, 5))
for figure in elevation_fig['data']:
xaxis, yaxis, label = convert_figure_to_axis_info(figure)
ax.plot(xaxis, yaxis, label=label)
hour_loc = (0, 3, 6, 9, 12, 15, 18, 21)
ax.xaxis.set_major_locator(mdates.HourLocator(hour_loc))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
plt.xlabel('Time (UTC)')
plt.ylabel('Elevation (deg)')
plt.title('Target visibility plot')
if len(elevation_fig['data']) > 1:
ax.legend()
plt.tight_layout()
plt.savefig(outfilename, dpi=100)
def generate_pdf(pdf_file, obs_t, n_core, n_remote, n_int, n_chan, n_sb, integ_t,
antenna_set, pipe_type, t_avg, f_avg, is_dysco, im_noise_val,
raw_size, proc_size, pipe_time, elevation_fig, distance_table):
"""Function to generate a pdf file summarizing the content of the calculator.
Return nothing."""
# Create an A4 sheet
pdf = MyFPDF('P', 'mm', 'A4')
pdf.add_page()
pdf.set_font('Arial', '', 16)
# Generate an html string to be written to the file
string = '<table border="0" align="left" width="80%">'
string += '<thead><tr><th width="70%" align="left">Parameter</th>'
string += '<th width="30%" align="left">Value</th></tr></thead>'
string += '<tbody>'
string += '<tr><td>Observation time (in seconds)</td>'
string += ' <td>{}</td></tr>'.format(obs_t)
string += '<tr><td>No. of stations</td>'
string += ' <td>({}, {}, {})</td></tr>'.format(n_core, n_remote, n_int)
string += '<tr><td>No. of subbands</td>'
string += ' <td>{}</td></tr>'.format(n_sb)
string += '<tr><td>No. of channels per subband</td>'
string += ' <td>{}</td></tr>'.format(n_chan)
string += '<tr><td>Integration time (in seconds)</td>'
string += ' <td>{}</td></tr>'.format(integ_t)
string += '<tr><td>Antenna set</td>'
string += ' <td>{}</td></tr>'.format(antenna_set)
string += '<tr></tr>'
string += '<tr><td>Pipeline type</td>'
if pipe_type == 'none':
string += ' <td>{}</td></tr>'.format('None')
else:
string += ' <td>{}</td></tr>'.format('Preprocessing')
string += '<tr><td>Averaging factor (time, freq)</td>'
string += ' <td>{}, {}</td></tr>'.format(t_avg, f_avg)
string += '<tr><td>Dysco compression</td>'
if is_dysco == 'enable':
string += ' <td>{}</td></tr>'.format('enabled')
else:
string += ' <td>{}</td></tr>'.format('disabled')
string += '<tr></tr>'
string += '<tr><td>Theoretical image sensitivity (uJy/beam)</td>'
string += ' <td>{}</td></tr>'.format(im_noise_val)
string += '<tr><td>Raw data size (in GB)</td>'
string += ' <td>{}</td></tr>'.format(raw_size)
if pipe_type != 'none':
string += '<tr><td>Processed data size (in GB)</td>'
string += ' <td>{}</td></tr>'.format(proc_size)
string += '<tr><td>Pipeline processing time (in hours)</td>'
string += ' <td>{}</td></tr>'.format(pipe_time)
string += '</tbody>'
string += '</table>'
# Generate a matplotlib plot showing the same plot as in the target
# visibility plot
if elevation_fig != {}:
# User has specified at least one source in the target setup
png_file_name = pdf_file.replace('summary', 'plot').replace('pdf', 'png')
make_pdf_plot(elevation_fig, png_file_name)
# Add the elevation plot to html
string += '<center>'
string += '<img src={} width=400 height=250>'.format(png_file_name)
string += '</center>'
def convertFigureToAxisInfo(figure):
"""For a given Graph Figure object, return
xaxis (a list of datetime.datetime objects),
yaxis (a list of source elevation), and
label (name of the source as a string)."""
time_axis = figure['x']
xaxis = []
for val in time_axis:
d = datetime.strptime(val, '%Y-%m-%dT%H:%M:%S')
xaxis.append( d )
yaxis = figure['y']
label = figure['name']
return xaxis, yaxis, label
# Add the distance table to the PDF
if distance_table != {}:
title = distance_table['layout']['title']
string += '<center><b>{}</b></center>'.format(title)
string += '<table border="0" align="left" width="80%">'
col_titles = distance_table['data'][0]['header']['values']
col_width = 100//len(col_titles)
string += '<thead><tr>'
for item in col_titles:
string += '<th width="{}%" align="left">'.format(col_width) + \
item + '</th>'
string += '</tr></thead>'
string += '<tbody>'
tab_data = distance_table['data'][0]['cells']['values']
# Transpose tab_data and write cells to the table
tab_data = list(map(list, zip(*tab_data)))
for row in tab_data:
string += '<tr>'
for item in row:
string += '<td>{}</td>'.format(item)
string += '</tr>'
string += '</tbody>'
string += '</table>'
def makePdfPlot(elevation_fig, outfilename):
"""For a given elevation_fig object and output filename, generate a
matplotlib plot and write it to disk."""
fig, ax = plt.subplots(1, 1, figsize=(8,5))
for figure in elevation_fig['data']:
xaxis, yaxis,label = convertFigureToAxisInfo(figure)
ax.plot(xaxis, yaxis, label=label)
hour_loc = (0, 3, 6, 9, 12, 15, 18, 21)
ax.xaxis.set_major_locator(mdates.HourLocator(hour_loc))
ax.xaxis.set_major_formatter(mdates.DateFormatter('%H:%M'))
plt.xlabel('Time (UTC)')
plt.ylabel('Elevation (deg)')
plt.title('Target visibility plot')
if len(elevation_fig['data']) > 1:
ax.legend()
plt.tight_layout()
plt.savefig(outfilename, dpi=100)
# Write text to the pdf file
pdf.write_html(string)
def generatepdf(pdffile, obsT, nCore, nRemote, nInt, nChan, nSb, integT,
antSet, pipeType, tAvg, fAvg, isDysco, imNoiseVal, rawSize,
procSize, pipeTime, elevation_fig, distance_table, isMsgBoxOpen):
"""Function to generate a pdf file summarizing the content of the calculator.
Return nothing."""
# Create an A4 sheet
pdf = MyFPDF('P', 'mm', 'A4')
pdf.add_page()
pdf.set_font('Arial', '', 16)
# Generate an html string to be written to the file
string = '<table border="0" align="left" width="80%">'
string += '<thead><tr><th width="70%" align="left">Parameter</th>'
string += '<th width="30%" align="left">Value</th></tr></thead>'
string += '<tbody>'
string += '<tr><td>Observation time (in seconds)</td>'
string += ' <td>{}</td></tr>'.format(obsT)
string += '<tr><td>No. of stations</td>'
string += ' <td>({}, {}, {})</td></tr>'.format(nCore, nRemote, nInt)
string += '<tr><td>No. of subbands</td>'
string += ' <td>{}</td></tr>'.format(nSb)
string += '<tr><td>No. of channels per subband</td>'
string += ' <td>{}</td></tr>'.format(nChan)
string += '<tr><td>Integration time (in seconds)</td>'
string += ' <td>{}</td></tr>'.format(integT)
string += '<tr><td>Antenna set</td>'