Skip to content
Snippets Groups Projects
Commit a436c96b authored by Aleksandar Shulevski's avatar Aleksandar Shulevski
Browse files

Add beam contour overplotting

parent 7e220aa7
Branches main
No related tags found
No related merge requests found
...@@ -3,559 +3,322 @@ import numpy as np ...@@ -3,559 +3,322 @@ import numpy as np
import numpy.ma as ma import numpy.ma as ma
import argparse import argparse
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.lines import Line2D
import matplotlib.dates as mdates import matplotlib.dates as mdates
import matplotlib.cm as mcm
import warnings import warnings
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
import h5py
import lofarantpos.db import lofarantpos.db
from lofarantpos.geo import localnorth_to_etrs, geographic_array_from_xyz, xyz_from_geographic, geographic_from_xyz from lofarantpos.geo import localnorth_to_etrs, geographic_array_from_xyz, xyz_from_geographic, geographic_from_xyz
import ephem from lofar_beams import voltage_beam, skycoord_to_lmn, lmn_to_skycoord
from photutils.detection import DAOStarFinder
from astropy.stats import mad_std from astropy.stats import mad_std
import astropy.units as u
from astropy import log from astropy import log
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord, AltAz, get_sun, GCRS
from astropy.coordinates import SkyOffsetFrame, CartesianRepresentation, get_icrs_coordinates
def voltage_beam(lgrid, mgrid, lbeam, mbeam, c, p, q, r, freq, weights): def read_single_bst(bst_filename):
"""Compute the beam response.""" """Read a single beamlet statistics (BST) file.
# n-coordinates
ngrid = np.sqrt(1 - lgrid**2 - mgrid**2)
nbeam = np.sqrt(1 - lbeam**2 - mbeam**2)
# lmn-uvw product
u, v, w = np.array([p, q, r]) * freq / c
prod = 2j * np.pi * (u[:, np.newaxis, np.newaxis] * (lgrid - lbeam) +
v[:, np.newaxis, np.newaxis] * (mgrid - mbeam) +
w[:, np.newaxis, np.newaxis] * (ngrid - nbeam))
return np.sum(weights * np.exp(prod), axis=0)
def createFixedBody(ra, dec, epoch):
"""Fixed body adapted to us."""
fixedBody = ephem.FixedBody()
fixedBody._ra = ra
fixedBody._dec = dec
fixedBody._epoch = epoch
return fixedBody
class Target(object):
"""Encapsulates celestial bodies."""
def __init__(self, name="", body=None, elevation=[], note = ""):
self.name = name
self.body = body
self.elevation = elevation
self.note = note
def compute_fixed_sources_altaz(station, end_of_day, source):
"""Compute the azimuth and elevation for a given source on the sky.
Parameters Parameters
---------- ----------
station : str bst_filename : str
'CS001' 'LXXX_CS001HBA0_BST.h5'
end_of_day : date
End of the day specified
source : str
Source name, 'CasA' etc.
Returns Returns
------- -------
az : array metadata : dict
Array containing the azimuth of the source Dictionary containing some of the BST metadata
el : array data : list
Array containing the altitude of the source List containing the BST data
time : array
Array containing the time instant for which the calculation is performed
""" """
# Open HDF5 file
h5 = h5py.File(bst_filename, "r")
# Store main header meta data
metadata = {}
for key in ["antenna_names", "antenna_type", "station_name"]:
metadata[key] = h5["/"].attrs[key]
# Antennafield
metadata["antennafield"] = h5["/"].attrs["antenna_field"]
# Number of subintegrations
nsub = len(h5.keys())
metadata["nsub"] = nsub
# Loop over keys
data_dict = []
for key in h5.keys():
# Keys in group
attribute_keys = list(h5[key].attrs.keys())
# Keys to store in dict
keys_to_store = ["frequency_band", "subbands", "timestamp", "integration_interval", "digital_beam_pointing_direction"]
# Create dict
d = {"data": np.array(h5[key])}
for key_to_store in keys_to_store:
if key_to_store in attribute_keys:
d[key_to_store] = h5[key].attrs[key_to_store]
else:
print(f"Failed to find {key_to_store} in {key}")
d[key_to_store] = None
# Store tile beam for HBA
if "HBA" in metadata["antennafield"]:
d["tile_beam_pointing_direction"] = h5[key].attrs["tile_beam_pointing_direction"]
# Store dict
data_dict.append(d)
# Store arrays
metadata["timestamps"] = np.array([d["timestamp"] for d in data_dict])
metadata["durations"] = np.array([d["integration_interval"] for d in data_dict])
# Assume first entry holds for all (ASSUMPTION!!)
metadata["subbands"] = data_dict[0]["subbands"]
metadata["frequency_bands"] = data_dict[0]["frequency_band"]
metadata["digital_beam_pointing_directions"] = data_dict[0]["digital_beam_pointing_direction"][0]
if "HBA" in metadata["antennafield"]:
metadata["tile_beam_pointing_direction"] = data_dict[0]["tile_beam_pointing_direction"][0]
else:
metadata["tile_beam_pointing_direction"] = ""
# Data shape
nchan, npol = data_dict[0]["data"].shape
metadata["nchan"] = nchan
# Store as nsub x npol x nchan
data = np.zeros((nsub, 2, nchan))
for isub in range(nsub):
data[isub, 0] = data_dict[isub]["data"][:, 0::2].T
data[isub, 1] = data_dict[isub]["data"][:, 1::2].T
# Close file
h5.close()
az, el, time = [], [], [] return metadata, data
while station.date < end_of_day - ephem.minute:
station.date += ephem.minute * timestep
source.body.compute(station)
az.append(ephem.degrees(source.body.az))
el.append(source.body.alt * (180. / np.pi))
time.append(station.date.tuple()[3])
return az, el, time
def get_dynspec_for_source(station, end_of_day, pointing, source, freq_arr, c, p, q, r):
"""Construct the dynamic spectrum for a station. def read_bst(bst_filenames):
"""Read multiple beamlet statistics (BST) files.
Parameters Parameters
---------- ----------
station : str bst_filenames : list
'CS001' ['LXXX_CS001HBA0_BST.h5', 'LXXX_CS002HBA0_BST.h5']
end_of_day : date
End of the day specified
pointing : str
RA, DEC of the pointing direction
source : str
Source name, 'CasA' etc.
freq_arr : array
Array of frequencies for which we compute the dynamic spectrum
c : float
Speed of light, m/s
p, q, r : floats
Antenna position parameters
Returns Returns
------- -------
dynspec : array metadata : dict
2D array containing the dynamic spectrum Dictionary containing some of the BST metadata
data : list
List containing the BST data
"""
first, metadata, data = True, None, None
for bst_filename in bst_filenames:
# Get data
if first:
metadata, data = read_single_bst(bst_filename)
first = False
else:
newmetadata, newdata = read_single_bst(bst_filename)
# Concatenate timestamps
metadata["timestamps"] = np.hstack((metadata["timestamps"],
newmetadata["timestamps"]))
# Concatenate integration_interval
metadata["durations"] = np.hstack((metadata["durations"],
newmetadata["durations"]))
# Concatenate data
data = np.vstack((data, newdata))
return metadata, data
def freq_from_sb(sb: int, band: str) -> float:
""" """
Convert central frequency to subband number
dynspec = [] Args:
for fr in freq_arr: sb: subband number
time = [] band: filter band
station.date = start_date
while station.date < end_of_day - ephem.minute:
station.date += ephem.minute * timestep
pointing.body.compute(station)
source.body.compute(station)
az_src = ephem.degrees(source.body.az)
el_src = source.body.alt * (180. / np.pi)
az, el = ephem.degrees(pointing.body.az) * (180. / np.pi), ephem.degrees(pointing.body.alt) * (180. / np.pi)
if el < min_elevation:
continue
ltile, mtile = np.sin(az * np.pi / 180) * np.cos(el * np.pi / 180), np.cos(az * np.pi / 180) * np.cos(el * np.pi / 180)
l_src, m_src = np.sin(az_src * np.pi / 180) * np.cos(el_src * np.pi / 180), np.cos(az_src * np.pi / 180) * np.cos(el_src * np.pi / 180)
# Compute beam
log.debug('Compute beam for {} Hz'.format(fr))
beam = voltage_beam(lgrid, mgrid, ltile, mtile, c, p, q, r, fr, 1.0)
masked_beam = ma.masked_invalid(np.abs(beam))
# Select beam values around source position - is delta of 0.01 right?
# TODO: Normalize beam?
coord_idx = np.argwhere((lgrid > l_src - 0.01) & (lgrid < l_src + 0.01) & (mgrid > m_src - 0.01) & (mgrid < m_src + 0.01))
time.append(np.sum(masked_beam[coord_idx[0], coord_idx[1]]))
dynspec.append(time)
return dynspec
def compute_lobes(station, end_of_day, pointing, min_elevation, freq, c, p, q, r):
"""Compute the position of the grating lobes.
Parameters Returns:
---------- float: frequency in Hz
station : str
'e.g. CS001'
end_of_day : date
End of the day specified
pointing : Object
Pointing direction
min_elevation : float
Minimum elevation below which we do not calculate, in degrees
freq: float
Frequency for which we are performing the calculation
c : float
Speed of light, m/s
p, q, r : floats
Antenna position parameters
Returns Example:
------- >>> freq_from_sb(297, '30_90')
long_lobe : array 58007812.5
Array containing the longitude of the lobe (azimuth or RA)
lat_lobe : array
Array containing the latitude of the lobe (altitude or DEC)
time_lobe : array
Array containing the time instant for which the calculation is performed
""" """
if band not in ["LBA_10_90", "LBA_30_90", "HBA_110_190", "HBA_170_230", "HBA_210_250"]:
return None
long_lobe, lat_lobe, time_lobe = [], [], [] if band == "LBA_10_90" or band == "LBA_30_90":
clock, zone = 200e6, 1
while station.date < end_of_day - ephem.minute: elif band == "HBA_110_190":
station.date += ephem.minute * timestep clock, zone = 200e6, 2
pointing.body.compute(station) elif band == "HBA_170_230":
az, el = ephem.degrees(pointing.body.az) * (180. / np.pi), ephem.degrees(pointing.body.alt) * (180. / np.pi) clock, zone = 160e6, 3
if el < min_elevation: elif band == "HBA_210_250":
continue clock, zone = 200e6, 3
ltile, mtile = np.sin(az * np.pi / 180) * np.cos(el * np.pi / 180), np.cos(az * np.pi / 180) * np.cos(el * np.pi / 180)
x = (ltile + 1) * int(npix / 2)
y = (mtile + 1) * int(npix / 2)
# Compute beam
beam = voltage_beam(lgrid, mgrid, ltile, mtile, c, p, q, r, freq, 1.0)
masked_beam = ma.masked_invalid(np.abs(beam))
bkg_sigma = mad_std(masked_beam)
# Extract grating lobe maxima
daofind = DAOStarFinder(fwhm=4.0, threshold=20.0 * bkg_sigma)
sources = daofind(masked_beam)
if sources is not None:
for col in sources.colnames:
sources[col].info.format = '%.8g' # for consistent table output
for source in sources:
l_rec, m_rec = (int(source["xcentroid"]) - int(npix / 2)) / int(npix / 2), (int(source["ycentroid"]) - int(npix / 2)) / int(npix / 2)
pix_distance = int(np.sqrt(np.power(x - source["xcentroid"], 2) + np.power(y - source["ycentroid"], 2)))
az_rec = np.arctan2(l_rec, m_rec)
el_rec = np.arccos(l_rec / np.sin(az_rec))
grating_distance = ephem.separation((ephem.degrees(az_rec), ephem.degrees(el_rec)),(az * (np.pi / 180.), el * (np.pi / 180.))) * (180. / np.pi)
# Convert to degrees
az_rec = az_rec * (180./np.pi)
el_rec = el_rec * (180./np.pi)
if(grating_distance > 10):
if altaz:
# Return AZ and ALT
long_lobe.append(az_rec * (np.pi / 180.))
lat_lobe.append(el_rec)
time_lobe.append(station.date.tuple()[3])
else:
# Return RA and DEC
ra, dec = station.radec_of(az_rec * (np.pi / 180.), el_rec * (np.pi / 180.))
long_lobe.append(ephem.degrees(ra))
lat_lobe.append(dec * (180. / np.pi))
time_lobe.append(station.date.tuple()[3])
else:
continue
else:
continue
if len(time_lobe) == 0:
continue
return long_lobe, lat_lobe, time_lobe
def init_bodies(station_name, fixed_body_names, sol_system_body_names, fixed_body_coordinates, pointing_coordinates, epoch): sb_bandwidth = 0.5 * clock / 512.
"""Initialize the fixed bodies and get the station information. freq_offset = 0.5 * clock * (zone -1)
freq = (sb * sb_bandwidth) + freq_offset
return freq
def decibel(x):
return 10 * np.log10(x)
def plot_bst_dynamic_spectrum(metadata, data, pol, contours, plot_contours):
"""Plot a dynamic spectrum of the data.
Parameters Parameters
---------- ----------
station_name : str metadata : dict
Name of a LOFAR station, e.g. 'CS001' Dictionary containing some of the BST metadata
fixed_body_names : array data : list
Array containing the names of the fixed body celestial sources List containing the BST data
sol_system_body_names: array pol : string
Array containing the names of the Sol system body sources - the Sun in this case Polarization to plot
fixed_body_coordinates : array contours: dict
Array containing coordinate tuples (RA, DEC) of the fixed body sources Dictionary containing the computed station beam
pointing_coordinates : str
Pointing direction Ra, DEC coordinates
epoch : str
Coordinate epoch, J2000
Returns Returns
------- -------
a_team : array metadata : dict
Array containing the fixed body sources Dictionary containing some of the BST metadata
sol_system: array data : list
Array containing the Sol system body sources List containing the BST data
pointing_direction : object
The object representation of the pointing direction
station : object
The object representation of a station
""" """
# Timestamps
t = mdates.date2num(metadata["timestamps"])
# Frequencies
band_x, band_y = metadata["frequency_bands"][0]
freq_x = np.array([freq_from_sb(sb, band_x) for sb in metadata["subbands"]])
freq_y = np.array([freq_from_sb(sb, band_y) for sb in metadata["subbands"]])
data_mean = np.mean(data, axis=0)
data /= data_mean[np.newaxis, :, :]
#data = decibel(data)
if pol.lower() == "x":
p = data[:, 0, :].squeeze()
elif pol.lower() == "y":
p = data[:, 1, :].squeeze()
else:
p = np.sum(data[:, :, :], axis=1).squeeze()
fig, ax = plt.subplots(figsize=(10, 6))
date_format = mdates.DateFormatter("%F\n%H:%M:%S")
fig.autofmt_xdate(rotation=0, ha="center")
vmin, vmax = np.percentile(p, (5, 99))
if plot_contours:
legend_lines, sources = [], []
ax.imshow(p.T, origin="lower", aspect="auto", interpolation="None", vmin=vmin, vmax=vmax, cmap="Blues_r",
extent=[np.nanmin(t), np.nanmax(t), np.min(freq_x)/1.e6, np.max(freq_x)/1.e6])
if plot_contours:
for color, contour in contours.items():
contours = plt.contour(t, freq_x/1.e6, contour[1], 1, linewidths=3., alpha=0.5, colors=[color])
legend_lines.append(Line2D([],[], color=color))
sources.append(contour[0])
plt.legend(legend_lines, sources, bbox_to_anchor=(1, 1), loc='upper left')
ax.xaxis_date()
ax.xaxis.set_major_formatter(date_format)
ax.set_xlabel("Date (UTC)")
ax.set_ylabel("Frequency (MHz)")
plt.title("Beamlet statistics for " + metadata['station_name'] + metadata['antennafield'])
#plt.show()
plt.tight_layout()
log.info("Saving plots...")
fig.savefig(metadata['station_name'] + metadata['antennafield'] + "_dynspec.png", bbox_inches="tight")
log.info("Done.")
a_team, sol_system = [], []
station = ephem.Observer() # Set station geographic location below
geo_station_position = geographic_from_xyz(db.phase_centres[station_name])
station.lon = str(geo_station_position['lon_rad'] * (180. / np.pi))
station.lat = str(geo_station_position['lat_rad'] * (180. / np.pi))
station.elevation = 15. # Could not read elevation (altitude) properly from atenna position DB, set to fixed value
station.pressure = 0. # No refraction at horizon
station.horizon = '-0:34' # Idem
pointing_direction = Target()
pointing_direction.name = 'Target'
pointing_direction.elevation = []
pointing_direction.body = createFixedBody(pointing_coordinates[0], pointing_coordinates[1], epoch)
pointing_direction.body.compute(station)
for i, name in enumerate(fixed_body_names):
body = Target()
body.name = name
body.elevation = []
body.body = createFixedBody(fixed_body_coordinates[2*i], fixed_body_coordinates[2*i + 1], epoch)
body.body.compute(station)
a_team.append(body)
for name in sol_system_body_names:
sun = Target()
sun.name = name
sun.elevation = []
sun.body = ephem.Sun()
sun.body.compute(station)
sol_system.append(sun)
return a_team, sol_system, pointing_direction, station # Speed of light
c = 299792458.0 # m/s
if __name__ == "__main__":
desc = 'Plot bright sources in grating lobes over BST HBA dynamic spectra.'
if __name__ == "__main__": parser = argparse.ArgumentParser(description=desc)
parser = argparse.ArgumentParser(description=' Compute the sky tracks (ALT-AZ or RA-DEC) and dynamic spectra of HBA grating lobes.') parser.add_argument("filenames", help="SST filenames", nargs="*", metavar="FILE")
parser.add_argument("-p", "--pol", help="Polarization to plot dynamic spectrum [stokes/X/Y, default: stokes]", default="stokes")
parser.add_argument('--pointing_coordinates', type=str, nargs='+', default=[], parser.add_argument("-s", "--sources", type=str, nargs='+', default=[],
help='Pointing direction coordinates, RA DEC')
parser.add_argument('--frequency_range', type=float, nargs='+', default=[],
help='The frequency range over which we compute the sidelobes, MHz, e.g. 135 144.')
parser.add_argument('--freq_step', type=float, default=None,
help='Frequency step for calculation, MHz.')
parser.add_argument('--stations', type=str, default=None,
help='Station groups: ST, CS, RS, IS (super-terp, core, remote or international).')
parser.add_argument('--min_elevation', type=int, default=None,
help='Minimum elevation to which the calculation is performed in degrees.')
parser.add_argument('--timestep', type=int, default=None,
help='Timestep for calculation, in minutes.')
parser.add_argument('--fixed_body_names', type=str, nargs='+', default=[],
help='Array of names for the fixed sources, e.g. CasA CygA...') help='Array of names for the fixed sources, e.g. CasA CygA...')
parser.add_argument('--fixed_body_coordinates', type=str, nargs='+', default=[], parser.add_argument("-c", "--contours", default=True, action='store_false')
help='Array of RA, DEC coordinates for the fixed body sources ["19:59:28:3566", "+40:44:02.096", ...]')
parser.add_argument('--sol_system_body_names', type=str, nargs='+', default=[],
help='Solar system body names, "[Sun]".')
parser.add_argument('--start_date', type=str, default=None,
help='Date for which the calculation is performed e.g. 2024/05/08.')
parser.add_argument('--epoch', type=str, default='2000',
help='Epoch of the pointing direction coordinates, e.g. 2000')
parser.add_argument('--altaz', type=bool, default=False,
help='Is the plotting to be done in Equatorial or local coordinates; True/False.')
parser.add_argument('--dynspec_source', type=str, default=None,
help='Which source to plot a dynspec for; one of the fixed_body_names, e.g. "CasA".')
args = parser.parse_args() args = parser.parse_args()
# All sky grid
npix = 205
width = 1.0
lgrid, mgrid = np.meshgrid(np.linspace(-width, width, npix),
np.linspace(-width, width, npix))
# Load the LOFAR antenna database # Load the LOFAR antenna database
db = lofarantpos.db.LofarAntennaDatabase() db = lofarantpos.db.LofarAntennaDatabase()
# Speed of light (in m/s)
c = 299792458.0
# Station groups
# Super-terp stations
st_stations =["CS002HBA0", "CS002HBA1",
"CS003HBA0", "CS003HBA1",
"CS004HBA0", "CS004HBA1",
"CS005HBA0", "CS005HBA1",
"CS006HBA0", "CS006HBA1",
"CS007HBA0", "CS007HBA1"]
# Core minus super-terp stations
core_stations = ["CS001HBA0", "CS001HBA1",
"CS011HBA0", "CS011HBA1",
"CS013HBA0", "CS013HBA1",
"CS001HBA0", "CS001HBA1",
"CS017HBA0", "CS017HBA1",
"CS021HBA0", "CS021HBA1",
"CS024HBA0", "CS024HBA1",
"CS026HBA0", "CS026HBA1",
"CS028HBA0", "CS028HBA1",
"CS030HBA0", "CS030HBA1",
"CS031HBA0", "CS031HBA1",
"CS032HBA0", "CS032HBA1",
"CS101HBA0", "CS101HBA1",
"CS103HBA0", "CS103HBA1",
"CS201HBA0", "CS201HBA1",
"CS301HBA0", "CS301HBA1",
"CS302HBA0", "CS302HBA1",
"CS401HBA0", "CS401HBA1",
"CS501HBA0", "CS501HBA1"]
remote_stations = ["RS106HBA", "RS205HBA",
"RS208HBA", "RS210HBA",
"RS305HBA", "RS306HBA",
"RS307HBA", "RS310HBA",
"RS406HBA", "RS407HBA",
"RS409HBA", "RS503HBA",
"RS508HBA", "RS509HBA"]
international_stations = ["DE601HBA", "DE602HBA",
"DE603HBA", "DE604HBA",
"DE605HBA", "DE609HBA",
"FR606HBA", "IE613HBA",
"PL610HBA", "PL611HBA",
"PL612HBA", "LV614HBA",
"SE607HBA", "UK608HBA"]
try: try:
if len(args.pointing_coordinates) == 0: if args.filenames is None or len(args.filenames) == 0:
raise ValueError('Please enter a valid pointing direction, hh:mm:ss.s dd:mm:ss.s".') raise ValueError('Please provide valid data file(s)')
else:
pointing_coordinates = args.pointing_coordinates
log.info("Pointing: {}".format(pointing_coordinates))
except ValueError as e:
log.error(e)
if len(args.frequency_range) == 0:
frequency_range = [135., 144.]
log.info("Setting frequency range to {} MHz".format(frequency_range))
else:
frequency_range = args.frequency_range
log.info("Frequency range: {}".format(frequency_range))
if args.stations is None:
stations = 'ST'
log.info("Selected superterp stations.")
else:
stations = args.stations
if args.min_elevation is None:
min_elevation = 15
log.info("Setting minimum elevation to {} degrees".format(min_elevation))
else:
min_elevation = args.min_elevation
if args.timestep is None:
timestep = 5
log.info("Setting timestep to {} minutes".format(timestep))
else:
timestep = args.timestep
if args.freq_step is None:
freq_step = 1.
log.info("Setting frequency step to {} MHz".format(freq_step))
else:
freq_step = args.freq_step
if len(args.fixed_body_names) == 0:
fixed_body_names = ['CygA', 'CasA', 'TauA', 'VirA']
log.info("Setting celestial sources to {}".format(fixed_body_names))
else:
fixed_body_names = args.fixed_body_names
log.info("Provided celestial sources: {}".format(fixed_body_names))
if len(args.fixed_body_coordinates) == 0:
fixed_body_coordinates = ['19:59:28:3566', '+40:44:02.096',
'23:23:27.94', '+58:48:42.4',
'05:34:31.971', '+22:00:52.06',
'12:30:49.4233', '+12:23:28.043']
else:
fixed_body_coordinates = args.fixed_body_coordinates
log.info("Provided coordinates for celestial sources: {}".format(fixed_body_coordinates))
if len(args.sol_system_body_names) == 0:
sol_system_body_names = ['Sun']
log.info("Setting solar system bodies to {}".format(sol_system_body_names))
else:
sol_system_body_names = args.sol_system_body_names
try:
if args.start_date is None:
raise ValueError('Please enter a valid start date.')
else: else:
start_date = args.start_date filenames = args.filenames
log.info("Start date: {}".format(start_date)) log.info("Filename(s): {}".format(filenames))
except ValueError as e: except ValueError as e:
log.error(e) log.error(e)
if args.dynspec_source is None: if len(args.sources) == 0:
dynspec_source = 'CasA' sources = ['Cyg A', 'Cas A', 'Tau A', 'Vir A']
log.info("Creating dynspec plot for {}".format(dynspec_source)) log.info("Overplotting sidelobe contours for: {}".format(sources))
else:
dynspec_source = args.dynspec_source
if stations == 'ST':
stations = st_stations
elif stations == 'CS':
stations = core_stations
elif stations == 'RS':
stations = remote_stations
elif stations == 'IS':
stations = international_stations
else: else:
log.error('Invalid station group name provided.') sources = args.sources
raise ValueError('Invalid station group name provided.') log.info("Overplotting sidelobe contours for: {}".format(sources))
epoch = args.epoch log.info("Overplotting beam contours: {}".format(args.contours))
altaz = args.altaz metadata, data = read_bst(args.filenames)
freq_arr = np.arange(frequency_range[0], frequency_range[1], freq_step) * 1.e6 sap_pointings = metadata["digital_beam_pointing_directions"]
tile_pointing = metadata["tile_beam_pointing_direction"]
rows, cols = len(stations), 1 # TODO implement logic for more than one SAP
psaps = SkyCoord(sap_pointings.split(', ')[0].split(' (')[1].split('rad')[0], sap_pointings.split(', ')[1].split('rad')[0], unit="rad", frame="icrs")
ptile = SkyCoord(tile_pointing.split(', ')[0].split(' (')[1].split('rad')[0], tile_pointing.split(', ')[1].split('rad')[0], unit="rad", frame="icrs")
if stations in ['ST', 'CS']: log.info("SAP pointing(s): {}".format(psaps))
cols = 2 log.info("Tile pointing: {}".format(ptile))
fig, ax = plt.subplots(rows, cols, figsize=(10, 25), subplot_kw={'projection': 'polar'}) log.info("Station name: {}".format(metadata['station_name']+metadata['antennafield']))
ax = ax.ravel() station_name = metadata['station_name']+metadata['antennafield']
loc = db.phase_centres[station_name]
loc = EarthLocation.from_geocentric(*loc * u.m)
fig1, ax1 = plt.subplots(rows, cols, figsize=(10, 25)) log.info("Station location: {}".format(loc))
ax1 = ax1.ravel() t = metadata["timestamps"]
ntime = len(t)
band_x, band_y = metadata["frequency_bands"][0]
freq = np.array([freq_from_sb(sb, band_x) for sb in metadata["subbands"]])
if altaz: # Compute zenith
log.info("Plotting ALT-AZ sky plots.") zenith = SkyCoord(az=np.zeros(ntime), alt=90 * np.ones(ntime), unit="deg",
else: obstime=t, location=loc, frame="altaz").transform_to(GCRS)
log.info("Plotting RA-DEC sky plots.")
# Loop over stations
for i, station_name in enumerate(stations):
log.info("Calculating for station {}".format(station_name)) # Get LMN coordinates of the tile and SAP pointings, and the A-team source
ltile, mtile, _ = skycoord_to_lmn(ptile, zenith)
lsap, msap, _ = skycoord_to_lmn(psaps, zenith) # handle more SAPs
# Get antenna positions # Get element positions within tile
p, q, r = db.hba_dipole_pqr(station_name).T p, q, r = db.hba_dipole_pqr(station_name).T
# Initialize p, q, r = p[:16], q[:16], r[:16]
a_team, sol_system, pointing_direction, station = init_bodies(station_name, fixed_body_names, sol_system_body_names, fixed_body_coordinates, pointing_coordinates, epoch)
# Sidelobe sources
log.info("Calculating positions and dynspec for A-team sources...") beam_maps = {}
for at in a_team: if args.contours:
log.info("Calculating for {}...".format(at.name)) cmap = mcm.get_cmap('Spectral')
station.date = start_date for i in range(len(sources)):
end_of_day = station.date psrc = get_icrs_coordinates(sources[i])
end_of_day += 1. lsrc, msrc, _ = skycoord_to_lmn(psrc, zenith)
az, el, time = compute_fixed_sources_altaz(station, end_of_day, at) tilebeam = voltage_beam(lsrc, msrc, ltile, mtile, p, q, r, freq, 1.0)
# Compute beam per source
if altaz: p, q, r = db.antenna_pqr(station_name).T
ax[i].plot(az, el, linestyle=None, label=at.name) beam = voltage_beam(lsrc, msrc, lsap, msap, p, q, r, freq, tilebeam)
ax[i].annotate(str(time[0])+'h', (az[0], el[0]), textcoords="offset points", xytext=(0,10), ha='center') beam_map = np.abs(beam.T)**2
ax[i].annotate(str(time[-2])+'h', (az[-2], el[-2]), textcoords="offset points", xytext=(0,10), ha='center') color = cmap(i / len(sources))
if dynspec_source == at.name: beam_maps[color] = (sources[i], beam_map)
dynspec = get_dynspec_for_source(station, end_of_day, pointing_direction, at, freq_arr, c, p, q, r)
ax1[i].imshow(dynspec, origin='lower', extent=[time[0], time[-2], freq_arr[0] / 1.e6, freq_arr[-1] / 1.e6], aspect='equal') plot_bst_dynamic_spectrum(metadata, data, args.pol, beam_maps, args.contours)
else:
ax[i].scatter(ephem.degrees(at.body.ra), at.body.dec * (180. / np.pi), label=at.name)
if dynspec_source == at.name:
log.info("Calculating dynspec plot for {}".format(dynspec_source))
dynspec = get_dynspec_for_source(station, end_of_day, pointing_direction, at, freq_arr, c, p, q, r)
ax1[i].imshow(dynspec, origin='lower', extent=[time[0], time[-2], freq_arr[0] / 1.e6, freq_arr[-1] / 1.e6], aspect='equal')
ax1[i].set_title("Dynspec for station " + station_name, pad=10)
ax1[i].set_xlabel("Time [UT]")
ax1[i].set_ylabel("Frequency [MHz]")
log.info("Calculating positions and dynspec for Solar system sources...")
for sol in sol_system:
station.date = start_date
end_of_day = station.date
end_of_day += 1.
az, el, time = compute_fixed_sources_altaz(station, end_of_day, sol)
if altaz:
ax[i].plot(az, el, linestyle=None, label=sol.name)
ax[i].annotate(str(time[0])+'h', (az[0], el[0]), textcoords="offset points", xytext=(0,10), ha='center')
ax[i].annotate(str(time[-2])+'h', (az[-2], el[-2]), textcoords="offset points", xytext=(0,10), ha='center')
if dynspec_source == sol.name:
dynspec = get_dynspec_for_source(station, end_of_day, pointing_direction, sol, freq_arr, c, p, q, r)
ax1[i].imshow(dynspec, origin='lower', extent=[time[0], time[-2], freq_arr[0] / 1.e6, freq_arr[-1] / 1.e6], aspect='equal')
else:
ax[i].scatter(ephem.degrees(sol.body.ra), sol.body.dec * (180. /np.pi), label=sol.name)
if dynspec_source == sol.name:
dynspec = get_dynspec_for_source(station, end_of_day, pointing_direction, sol, freq_arr, c, p, q, r)
ax1[i].imshow(dynspec, origin='lower', extent=[time[0], time[-2], freq_arr[0] / 1.e6, freq_arr[-1] / 1.e6], aspect='equal')
ax1[i].set_title("Dynspec for station " + station_name, pad=10)
ax1[i].set_xlabel("Time [UT]")
ax1[i].set_ylabel("Frequency [MHz]")
log.info("Calculating frequency dependence of grating lobe positions...")
for freq in freq_arr:
station.date = start_date
end_of_day = station.date
end_of_day += 1.
long, lat, time = compute_lobes(station, end_of_day, pointing_direction, min_elevation, freq * 1.e6, c, p, q, r)
if len(long) != 0 and len(lat) != 0 and len(time) != 0:
if altaz:
ax[i].scatter(long, lat, marker='x', s=40, label='lobe: ' + str(freq) + ' MHz')
else:
ax[i].scatter(long, lat, marker='x', s=40, label='lobe: ' + str(freq) + ' MHz')
ax[i].annotate(str(time[0])+'h', (long[0], lat[0]), textcoords="offset points", xytext=(0,10), ha='center')
if len(time) > 4:
ax[i].annotate(str(time[-2])+'h', (long[-2], lat[-2]), textcoords="offset points", xytext=(0,10), ha='center')
else:
continue
leg_angle = np.deg2rad(10.5)
ax[i].legend(loc="lower left", bbox_to_anchor=(.5 + np.cos(leg_angle)/2, .5 + np.sin(leg_angle)/2))
if altaz:
ax[i].set_rlim(bottom=90, top=min_elevation)
ax[i].set_title("Grating lobe positions for station " + station_name + " [ALT/AZ]", pad=10)
#ax[i].set_aspect(1.0)
else:
ax[i].set_rlim(bottom=90, top=-30)
ax[i].set_title("Grating lobe positions for station " + station_name + " [RA/DEC]", pad=10)
plt.tight_layout()
log.info("Saving plots...")
fig.savefig("Sky_positions.png", bbox_inches="tight", overwrite=True)
fig1.savefig("Dynamic_spectrum.png", bbox_inches="tight", overwrite=True)
log.info("Done.")
#plt.show()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment