diff --git a/bin/define_tasks_from_archive.py b/bin/define_tasks_from_archive.py
index a70c25032fe7234dfdfce6448e7b77c0741df50d..4f75b16c8d8dda6d0216daed237d7719cf16fb43 100644
--- a/bin/define_tasks_from_archive.py
+++ b/bin/define_tasks_from_archive.py
@@ -24,6 +24,7 @@ def parse_args():
     parser.add_argument('--config', help='path to configuration file', type=str)
     parser.add_argument('workflow', help='workflow uri', type=str)
     parser.add_argument('filter', help='filter string', type=str)
+    parser.add_argument('--averaging_window', help='size of the averaging window in seconds', type=float, default=1)
     parser.add_argument('sas_ids', help='list of SAS IDS', nargs='+')
     return parser.parse_args()
 
@@ -44,7 +45,6 @@ SELECT FO.OBJECT_ID AS OBJECT_ID,
 
 def read_user_credentials(file_paths: Union[str, List[str]]) -> Dict:
     file_paths = list(map(os.path.expanduser, map(os.path.expandvars, file_paths)))
-
     parser = ConfigParser()
     parser.read(file_paths)
     credentials = {'user': parser['global']['database_user'], 'password': parser['global']['database_password']}
@@ -85,15 +85,17 @@ def parse_surl_for_info(surl):
     return site, result
 
 
-def create_payload_from_entry(entry, filter_str, workflow, purge_policy='no', predecessor=None):
+def create_payload_from_entry(entry, filter_str, workflow, avg_window, purge_policy='no', predecessor=None):
     size = entry['filesize']
     site, payload = parse_surl_for_info(entry['primary_url'])
-    inputs_payload = [
-        {'size': size,
+    inputs_payload = {
+        'surls':
+            [{'size': size,
          'surl': entry['primary_url'],
          'type': 'File',
-         'location': site}
-    ]
+              'location': site}],
+        'averaging_window': avg_window
+    }
 
     ## default values
     payload['task_type'] = 'regular'
@@ -148,7 +150,7 @@ def main():
         table = find_surl_and_size_per_sasid(engine, sas_id)
         for index, line in table.iterrows():
            logging.info('creating task for sas_id %s - dataset %s', sas_id, index)
-            payload = create_payload_from_entry(line, args.filter, args.workflow)
+            payload = create_payload_from_entry(line, args.filter, args.workflow, args.averaging_window)
            atdb_interface.submit_task(payload)
 
 
diff --git a/bin/scintillation_utils.py b/bin/scintillation_utils.py
old mode 100644
new mode 100755
index 818ae97437644015c1e8645a38cd563d44692589..ae8cc32caa6dd73177e3a17a391c0c545bb2a82d
--- a/bin/scintillation_utils.py
+++ b/bin/scintillation_utils.py
@@ -1,3 +1,4 @@
+#!/home/mevius/bin/python3.8
 from argparse import ArgumentParser
 import os
 import logging
@@ -14,7 +15,7 @@ def parse_args():
     parser.add_argument('--samples_size', help='Samples size in seconds', default=3600, type=float)
     parser.add_argument('--averaging_window', help='Averaging window in seconds', default=1, type=float)
-    parser.add_argument('--export_only_station', help='Selects only one station to be exported', default=None)
+    parser.add_argument('--export_stations', help='Selects stations to be exported, separated by spaces', default=None, type=str)
     return parser.parse_args()
 
 
@@ -23,8 +24,13 @@ def main():
     dataset = averaging.open_dataset(args.scintillation_dataset)
     metadata = averaging.extract_metadata(dataset)
     os.makedirs(args.output_directory, exist_ok=True)
+    if args.export_stations:
+        stations = args.export_stations.split()
+    else:
+        stations = []
     for dynspec in metadata:
-        if args.export_only_station and args.export_only_station not in metadata[dynspec]['BEAM_STATIONS_LIST']:
+
+        if stations and metadata[dynspec]['BEAM_STATIONS_LIST'][0] not in stations:
             continue
         averaging.split_samples(dynspec,
                                 metadata[dynspec],
diff --git a/scintillation/Calibrationlib.py b/scintillation/Calibrationlib.py
new file mode 100644
index 0000000000000000000000000000000000000000..856cf5377f1d028d6da3c5d97a663a1e39dd6323
--- /dev/null
+++ b/scintillation/Calibrationlib.py
@@ -0,0 +1,287 @@
+import numpy as np
+from scipy.ndimage import median_filter, gaussian_filter1d
+from scipy.ndimage.filters import uniform_filter1d
+from scipy.interpolate import interp1d
+from numpy.lib.stride_tricks import sliding_window_view
+from astropy.convolution import interpolate_replace_nans
+
+
+def model_flux(calibrator, frequency):
+    '''
+    Calculates the model flux for flux calibration for a range of known calibrators:
+    J0133-3629, 3C48, Fornax A, 3C 123, J0444-2809, 3C138, Pictor A, Taurus A, 3C147, 3C196, Hydra A, Virgo A,
+    3C286, 3C295, Hercules A, 3C353, 3C380, Cygnus A, 3C444, Cassiopeia A
+
+    Input: the calibrator name and the frequency range (MHz)
+    Output: the model flux (in Jy)
+    Code adapted from an original by Peijin Zhang
+    '''
+    parameters = []
+
+    Cal_dict = {'J0133-3629': [1.0440, -0.662, -0.225],
+                '3C48': [1.3253, -0.7553, -0.1914, 0.0498],
+                'For A': [2.218, -0.661],
+                'ForA': [2.218, -0.661],
+                '3C123': [1.8017, -0.7884, -0.1035, -0.0248, 0.0090],
+                'J0444-2809': [0.9710, -0.894, -0.118],
+                '3C138': [1.0088, -0.4981, -0.155, -0.010, 0.022],
+                'Pic A': [1.9380, -0.7470, -0.074],
+                'Tau A': [2.9516, -0.217, -0.047, -0.067],
+                'PicA': [1.9380, -0.7470, -0.074],
+                'TauA': [2.9516, -0.217, -0.047, -0.067],
+                '3C147': [1.4516, -0.6961, -0.201, 0.064, -0.046, 0.029],
+                '3C196': [1.2872, -0.8530, -0.153, -0.0200, 0.0201],
+                'Hyd A': [1.7795, -0.9176, -0.084, -0.0139, 0.030],
+                'Vir A': [2.4466, -0.8116, -0.048],
+                'HydA': [1.7795, -0.9176, -0.084, -0.0139, 0.030],
+                'VirA': [2.4466, -0.8116, -0.048],
+                '3C286': [1.2481, -0.4507, -0.1798, 0.0357],
+                '3C295': [1.4701, -0.7658, -0.2780, -0.0347, 0.0399],
+                'Her A': [1.8298, -1.0247, -0.0951],
+                'HerA': [1.8298, -1.0247, -0.0951],
+                '3C353': [1.8627, -0.6938, -0.100, -0.032],
+                '3C380': [1.2320, -0.791, 0.095, 0.098, -0.18, -0.16],
+                'Cyg A': [3.3498, -1.0022, -0.225, 0.023, 0.043],
+                'CygA': [3.3498, -1.0022, -0.225, 0.023, 0.043],
+                '3C444': [3.3498, -1.0022, -0.22, 0.023, 0.043],
+                'Cas A': [3.3584, -0.7518, -0.035, -0.071],
+                'CasA': [3.3584, -0.7518, -0.035, -0.071]}
+    if calibrator in Cal_dict.keys():
+        parameters = Cal_dict[calibrator]
+    else: raise ValueError(calibrator, "is not in the calibrators list")
+
+    flux_model = 0
+    freqs = frequency*1.e-3  # convert from MHz to GHz
+    for j, p in enumerate(parameters):
+        flux_model += p*np.log10(freqs)**j  # coefficients are defined for log10(frequency in GHz)
+    flux_model = 10**flux_model  # because at first the flux is in log10
+    return flux_model
+
+def filter_rfi(data, nsigma=5, ntpts=50, nchans=10):
+    '''
+    RFI mitigation strategy for dynamic spectra:
+     - median filter the data in time (every ntpts samples) and in frequency (nchans channels)
+     - flatten the data by dividing by this smoothed bandpass
+     - mitigate remaining spikes by a median-absolute-deviation check across the pass-band for each time sample
+    Assumes data are transposed such that [x,y] are [time,frequency].
+    Input:
+     - A 2-D array of [time,frequency]
+     - number of sigma to use in thresholding
+     - number of time points for flattening the data, which must be small enough not to also flatten the scintillation
+     - number of frequency channels for additional flattening of the data
+    Output: An array with flags
+    '''
+
+    ntimepts, nfreqs = data.shape
+
+    # flatten = median_filter(data,(ntpts,1),mode='constant',cval=1)
+    # faster to median filter every ntpts samples and interpolate
+    cutoff = ntimepts - ntimepts % ntpts
+    medfilter = np.median(data[:cutoff].reshape((-1, ntpts, nfreqs)), axis=1)
+    xnew = np.arange(ntimepts)
+    x = xnew[:cutoff][::ntpts]
+    f = interp1d(x, medfilter, axis=0, fill_value='extrapolate')  # interpolate to all skipped samples
+    flatten = f(xnew)
+    flatten = median_filter(flatten, (1, nchans), mode='constant', cval=1)
+    flatdata = data/flatten
+    nanmed = np.nanmedian(flatdata)
+    diff = abs(flatdata - nanmed)
+    sd = np.nanmedian(diff)  # median absolute deviation
+    maskdata = np.where(diff > sd*nsigma, 1, 0)
+
+    return maskdata
+
+
+def apply_bandpass(data, freqs, freqaxis=0, timeaxis=1, target="Cas A", sample_rate=180, filter_length=300, rfi_filter_length=10, flag_value=10, replace_value=1):
+    '''apply a bandpass correction to the data, scaling the data to the nominal flux of the calibrator; also flag outliers and replace flagged data with replace_value
+    Input:
+    data: the numpy array with the data
+    freqs: numpy array of the freqs (MHz, for flux calibration)
+    freqaxis: frequency axis
+    timeaxis: time axis
+    target: name of the target (for flux calibration)
+    sample_rate: sample the data with this rate to speed up filtering
+    filter_length: length of the smoothing filter, typically half an hour, to follow beam changes
+    flag_value: flag data above flag_value times the nominal value
+    replace_value: replace flagged data with this value; if "interpolate", interpolate over the flagged samples
+
+    Output:
+    scaled data
+    array of flags
+    array of model flux values and the applied bandpass
+
+    '''
+
+    flux = model_flux(target, freqs)
+    data = np.swapaxes(data, 0, timeaxis)
+
+    # medfilter = gaussian_filter1d(data[::sample_rate], axis=0, sigma=filter_length)  # or use median_filter?
+    medfilter = median_filter(data[::sample_rate], size=(filter_length,)+(1,)*(len(data.shape)-1))  # or use gaussian_filter?
+    xnew = np.arange(data.shape[0])
+    x = xnew[::sample_rate]
+    f = interp1d(x, medfilter, axis=0, fill_value='extrapolate')  # interpolate to all skipped samples
+    bandpass = f(xnew)
+    newshape = [1,]*len(data.shape)
+    if freqaxis == 0:
+        newshape[timeaxis] = flux.shape[0]
+    else:
+        newshape[freqaxis] = flux.shape[0]
+    data /= bandpass
+    flags = filter_rfi(data, nsigma=flag_value, ntpts=rfi_filter_length, nchans=10)
+    tmpdata = np.copy(data)
+    if replace_value == "interpolate":
+        tmpdata[flags > 0] = np.nan
+        tmpdata = interpolate_replace_nans(tmpdata, np.ones((21, 21)))
+    else:
+
+        tmpdata[flags > 0] = replace_value
+
+    return np.swapaxes(tmpdata, 0, timeaxis), np.swapaxes(flags, 0, timeaxis), flux, np.swapaxes(bandpass, 0, timeaxis)
+
+def getS4_fast(data, window_size, skip_sample_time=65, axis=1, has_nan=False):
+    '''Calculate the S4 value for data along axis. Could be fast if it works with numba
+    Input:
+    data: numpy array with data (maximum resolution but RFI flagged)
+    window_size: int: the window to calculate S4, typically 60s and 180s
+    skip_sample_time: int: only calculate S4 every skip_sample_time (in samples, typically 1s)
+    has_nan: boolean, if True use the much slower nanmean function to ignore nans
+    axis: int, the time axis along which S4 is calculated
+    output:
+    numpy array with S4 values
+    sampled every skip_sample_time samples along the time axis
+    '''
+    # make sure axis to sample = 0-th axis:
+    tmpdata = np.swapaxes(data, 0, axis)
+    ntimes = tmpdata.shape[0]
+    indices = np.arange(window_size)
+
+    idx_step = np.arange(0, ntimes-window_size, skip_sample_time)
+
+    idx = idx_step.reshape((-1, 1)) + indices.reshape((1, -1))
+    S4 = np.zeros((idx.shape[0],)+tmpdata.shape[1:], dtype=tmpdata.dtype)
+    if has_nan:
+        for i in range(idx.shape[0]):
+            avgsqdata = np.nansum(tmpdata[idx[i]]**2, axis=0)/(window_size - np.sum(np.isnan(tmpdata[idx[i]]), axis=0))
+            avgdatasq = (np.nansum(tmpdata[idx[i]], axis=0)/(window_size - np.sum(np.isnan(tmpdata[idx[i]]), axis=0)))**2
+            S4[i] = np.sqrt((avgsqdata-avgdatasq)/avgdatasq)
+    else:
+        for i in range(idx.shape[0]):
+            avgsqdata = np.sum(tmpdata[idx[i]]**2, axis=0)/window_size
+            avgdatasq = np.sum(tmpdata[idx[i]], axis=0)**2/window_size**2
+            S4[i] = np.sqrt((avgsqdata-avgdatasq)/avgdatasq)
+
+    return np.swapaxes(S4, 0, axis)
+
+
+def window_stdv_mean(arr, window):
+    # Cool trick: you can compute the standard deviation and mean
+    # given just the sum of squared values and the sum of values in the window.
+    c1 = uniform_filter1d(arr, window, mode='constant', axis=0)
+    c2 = uniform_filter1d(arr*arr, window, mode='constant', axis=0)
+    return (np.abs(c2 - c1*c1)**.5)[window//2:-window//2+1], c1[window//2:-window//2+1]
+
+def getS4(data, window_size, skip_sample_time=65, axis=1, has_nan=False):
+    '''Calculate the S4 value for data along axis
+    Input:
+    data: numpy array with data (maximum resolution but RFI flagged)
+    window_size: int: the window to calculate S4, typically 60s and 180s
+    skip_sample_time: int: only calculate S4 every skip_sample_time (in samples, typically 1s)
+    has_nan: boolean, if True use the much slower nanmean function to ignore nans
+    axis: int, the time axis along which S4 is calculated
+    output:
+    numpy array with S4 values
+    sampled every skip_sample_time samples along the time axis
+    '''
+    # make sure axis to sample = 0-th axis:
+    tmpdata = np.swapaxes(data, 0, axis)
+    stddata, avgdata = window_stdv_mean(tmpdata, window_size)
+    S4 = stddata[::skip_sample_time]/avgdata[::skip_sample_time]
+    return np.swapaxes(S4, 0, axis)
+
+def getS4_slow(data, window_size, skip_sample_time=65, axis=1, has_nan=False):
+    '''Calculate the S4 value for data along axis
+    Input:
+    data: numpy array with data (maximum resolution but RFI flagged)
+    window_size: int: the window to calculate S4, typically 60s and 180s
+    skip_sample_time: int: only calculate S4 every skip_sample_time (in samples, typically 1s)
+    has_nan: boolean, if True use the much slower nanmean function to ignore nans
+    axis: int, the time axis along which S4 is calculated
+    output:
+    numpy array with S4 values
+    sampled every skip_sample_time samples along the time axis
+    '''
+    # make sure axis to sample = 0-th axis:
+    tmpdata = np.swapaxes(data, 0, axis)
+    ntimes = tmpdata.shape[0]
+    slides = sliding_window_view(tmpdata, window_size, axis=0)[::skip_sample_time]
+    if has_nan:
+        avgsqdata = np.nanmean(slides**2, axis=-1)
+        avgdatasq = np.nanmean(slides, axis=-1)**2
+    else:
+        avgsqdata = np.mean(slides**2, axis=-1)
+        avgdatasq = np.mean(slides, axis=-1)**2
+    S4 = np.sqrt(np.abs(avgsqdata-avgdatasq)/avgdatasq)
+    return np.swapaxes(S4, 0, axis)
+
+def getS4_medium(data, window_size, skip_sample_time=65, axis=1, has_nan=False):
+    '''Calculate the S4 value for data along axis
+    Input:
+    data: numpy array with data (maximum resolution but RFI flagged)
+    window_size: int: the window to calculate S4, typically 60s and 180s
+    skip_sample_time: int: only calculate S4 every skip_sample_time (in samples, typically 1s)
+    has_nan: boolean, if True use the much slower nanmean function to ignore nans
+    axis: int, the time axis along which S4 is calculated
+    output:
+    numpy array with S4 values
+    sampled every skip_sample_time samples along the time axis
+    '''
+    # make sure axis to sample = 0-th axis:
+    tmpdata = np.swapaxes(data, 0, axis)
+    ntimes = tmpdata.shape[0]
+    indices = np.arange(window_size)
+
+    idx_step = np.arange(0, ntimes-window_size, skip_sample_time)
+
+    idx = idx_step[:, np.newaxis]+indices[np.newaxis]
+
+    S4 = []
+    if has_nan:
+        for i in range(idx.shape[0]):
+            avgsqdata = np.nanmean(tmpdata[idx[i]]**2, axis=0)
+            avgdatasq = np.nanmean(tmpdata[idx[i]], axis=0)**2
+            S4.append(np.sqrt(np.abs(avgsqdata-avgdatasq)/avgdatasq))
+    else:
+        for i in range(idx.shape[0]):
+            avgsqdata = np.mean(tmpdata[idx[i]]**2, axis=0)
+            avgdatasq = np.mean(tmpdata[idx[i]], axis=0)**2
+            S4.append(np.sqrt(np.abs(avgsqdata-avgdatasq)/avgdatasq))
+
+    S4 = np.array(S4)
+    return np.swapaxes(S4, 0, axis)
+
+
+def getMedian(data, Nsamples, axis=0, flags=None, bandpass=None, has_nan=False):
+    '''average the data using the median over Nsamples samples; ignore the trailing samples if fewer than
+    Nsamples remain. If has_nan, use the much slower nanmedian function'''
+    # flags is None or has the same shape as data
+    # make sure the averaging axis is first
+    tmpdata = np.swapaxes(data, 0, axis)
+    cutoff = (tmpdata.shape[0]//Nsamples)*Nsamples
+
+    tmpdata = tmpdata[:cutoff].reshape((-1, Nsamples)+tmpdata.shape[1:])
+    if has_nan:
+        avgdata = np.nanmedian(tmpdata, axis=1)
+    else:
+        avgdata = np.median(tmpdata, axis=1)
+    if flags is not None:
+        flags = np.swapaxes(flags, 0, axis).astype(float)
+        flags = np.average(flags[:cutoff].reshape((-1, Nsamples)+avgdata.shape[1:]), axis=1)
+    if bandpass is not None:
+        bandpass = np.swapaxes(bandpass, 0, axis).astype(float)
+        bandpass = np.average(bandpass[:cutoff].reshape((-1, Nsamples)+avgdata.shape[1:]), axis=1)
+
+    return np.swapaxes(avgdata, 0, axis), np.swapaxes(flags, 0, axis) if flags is not None else None, np.swapaxes(bandpass, 0, axis) if bandpass is not None else None  # swap back
+
+
+
+
diff --git a/scintillation/averaging.py b/scintillation/averaging.py
index 09f2ac34a058ffb2473701a69d8da13cc2bc4b99..220bc7506178a43d6559e08c4616d0bdd31166a0
--- a/scintillation/averaging.py
+++ b/scintillation/averaging.py
@@ -9,8 +9,13 @@ import logging
 from astropy.coordinates import EarthLocation, SkyCoord, AltAz
 from astropy.time import Time
 import astropy.io.fits as fits
+import matplotlib
+matplotlib.use('agg')
 import matplotlib.pyplot as plt
 import matplotlib.dates as mdates
+from scintillation.Calibrationlib import apply_bandpass, getMedian, getS4
+
+
 
 
 logging.basicConfig(format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO)
@@ -115,7 +120,7 @@ def parse_args():
 
 def open_dataset(path):
     if not os.path.exists(path):
-        raise FileNotFoundError(f'Cannot find file at {path}')
+        raise FileNotFoundError('Cannot find file at {}'.format(path))
     return h5py.File(path, mode='r')
 
 
@@ -205,7 +210,7 @@ def extract_metadata(dataset):
     return metadata_per_dynspec
 
 
-def create_fits_from_dataset(sample_info, data_array, output_path):
+def create_fits_from_dataset(sample_info, data_array, S4data, S4data180, flags, flux, bandpass, output_path):
     start_datetime = sample_info['sample_start_datetime']
     end_datetime = sample_info['sample_end_datetime']
     delta_seconds = (end_datetime - start_datetime).seconds / data_array.shape[0]
@@ -215,7 +220,7 @@ def create_fits_from_dataset(sample_info, data_array, output_path):
     delta_freq = (end_freq - start_freq) / data_array.shape[1]
 
     hdu_lofar = fits.PrimaryHDU()
-    hdu_lofar.data = data_array[:, :, 0].T
+    hdu_lofar.data = data_array.T
     hdu_lofar.header['SIMPLE'] = True
     hdu_lofar.header['BITPIX'] = 8
     hdu_lofar.header['NAXIS '] = 2
@@ -252,34 +257,72 @@
     hdu_lofar.header['DEC'] = sample_info['POINT_DEC']
     hdu_lofar.header['HISTORY'] = '        '
 
-
-    full_hdu = fits.HDUList([hdu_lofar])
+    S4_hdu = fits.ImageHDU(S4data.T, name='S4_60s')  # TODO: add header info
+    S4_hdu.header['CRVAL1'] = start_datetime.timestamp()
+    S4_hdu.header['CRPIX1'] = 0
+    S4_hdu.header['CTYPE1'] = 'Time [UT]'
+    S4_hdu.header['CDELT1'] = delta_seconds
+    S4_hdu.header['CRVAL2'] = start_freq
+    S4_hdu.header['CRPIX2'] = 0
+    S4_hdu.header['CTYPE2'] = 'Frequency [MHz]'
+    S4_hdu.header['CDELT2'] = delta_freq
+
+
+    S4_180hdu = fits.ImageHDU(S4data180.T, name='S4_180s')  # TODO: add header info
+    S4_180hdu.header['CRVAL1'] = start_datetime.timestamp()
+    S4_180hdu.header['CRPIX1'] = 0
+    S4_180hdu.header['CTYPE1'] = 'Time [UT]'
+    S4_180hdu.header['CDELT1'] = delta_seconds
+    S4_180hdu.header['CRVAL2'] = start_freq
+    S4_180hdu.header['CRPIX2'] = 0
+    S4_180hdu.header['CTYPE2'] = 'Frequency [MHz]'
+    S4_180hdu.header['CDELT2'] = delta_freq
+    flag_hdu = fits.ImageHDU(flags.T, name='flag_percentage')
+    flag_hdu.header['CRVAL1'] = start_datetime.timestamp()
+    flag_hdu.header['CRPIX1'] = 0
+    flag_hdu.header['CTYPE1'] = 'Time [UT]'
+    flag_hdu.header['CDELT1'] = delta_seconds
+    flag_hdu.header['CRVAL2'] = start_freq
+    flag_hdu.header['CRPIX2'] = 0
+    flag_hdu.header['CTYPE2'] = 'Frequency [MHz]'
+    flag_hdu.header['CDELT2'] = delta_freq
+    bp_hdu = fits.ImageHDU(bandpass.T, name='bandpass')
+    bp_hdu.header['CRVAL1'] = start_datetime.timestamp()
+    bp_hdu.header['CRPIX1'] = 0
+    bp_hdu.header['CTYPE1'] = 'Time [UT]'
+    bp_hdu.header['CDELT1'] = delta_seconds
+    bp_hdu.header['CRVAL2'] = start_freq
+    bp_hdu.header['CRPIX2'] = 0
+    bp_hdu.header['CTYPE2'] = 'Frequency [MHz]'
+    bp_hdu.header['CDELT2'] = delta_freq
+    flux_hdu = fits.ImageHDU(flux, name='Flux_spectrum')
+    flux_hdu.header['CRVAL1'] = start_freq
+    flux_hdu.header['CRPIX1'] = 0
+    flux_hdu.header['CTYPE1'] = 'Frequency [MHz]'
+    flux_hdu.header['CDELT1'] = delta_freq
+    full_hdu = fits.HDUList([hdu_lofar, S4_hdu, S4_180hdu, flag_hdu, flux_hdu, bp_hdu])
     full_hdu.writeto(output_path, overwrite=True)
 
 
-def make_plot(data_array, time_axis, frequency_axis, station_name, plot_full_path):
+def make_plot(data_array, time_axis, frequency_axis, station_name, plot_full_path, label="Dynspec"):
     fig = plt.figure(figsize=(6, 4), dpi=120)
     ax = plt.gca()
     start_time = datetime.fromtimestamp(time_axis[0]).strftime("%Y/%m/%d %H:%M:%S")
     datetime_axis = [datetime.fromtimestamp(time_s) for time_s in time_axis]
 
-    high_freq = frequency_axis > 30
     times = mdates.date2num(datetime_axis)
-    title = f'Dynspec {station_name} - {start_time}'
-    data_fits_new = data_array - numpy.nanmean(
-        numpy.sort(data_array, 0)[
-        int(data_array.shape[0] * 0.1):int(data_array.shape[0] * 0.3), :], 0)
+    title = '{label} {station_name} - {start_time}'.format(label=label, station_name=station_name, start_time=start_time)
+    data_fits_new = data_array
+    vmin = 0.7
+    vmax = 1.5
 
-    high_freq_mean = data_fits_new[:, high_freq, 0].mean()
-    high_freq_std = data_fits_new[:, high_freq, 0].std()
-    vmin = (high_freq_mean - 2 * high_freq_std)
-    vmax = (high_freq_mean + 3 * high_freq_std)
-    ax.imshow(data_fits_new[:, :, 0].T, origin='lower', aspect='auto',
+    im = ax.imshow(data_fits_new.T, origin='lower', interpolation='none', aspect='auto',
               vmin=vmin,
              vmax=vmax,
              extent=[times[0], times[-1], frequency_axis[0], frequency_axis[-1]],
              cmap='inferno')
+    fig.colorbar(im)
     plt.suptitle(title)
     ax.xaxis_date()
     ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
@@ -294,26 +337,51 @@ def make_plot(data_array, time_axis, frequency_axis, station_name, plot_full_pat
     plt.savefig(plot_full_path)
     plt.close(fig)
 
+def make_S4plot(data_array, time_axis, frequency_axis, station_name, plot_full_path, label="S4"):
+    fig = plt.figure(figsize=(6, 4), dpi=120)
+    ax = plt.gca()
+    start_time = datetime.fromtimestamp(time_axis[0]).strftime("%Y/%m/%d %H:%M:%S")
+    datetime_axis = [datetime.fromtimestamp(time_s) for time_s in time_axis]
+
+    times = mdates.date2num(datetime_axis)
+    title = '{label} {station_name} - {start_time}'.format(label=label, station_name=station_name, start_time=start_time)
+
+    vmin = 0.
+    vmax = 0.3
 
-def create_averaged_dataset(sample_info, start_index, data_array):
+    im = ax.imshow(data_array.T, origin='lower', interpolation='none', aspect='auto',
+                   vmin=vmin,
+                   vmax=vmax,
+                   extent=[times[0], times[-1], frequency_axis[0], frequency_axis[-1]],
+                   cmap='plasma')
+    fig.colorbar(im)
+    plt.suptitle(title)
+    ax.xaxis_date()
+    ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
+    ax.set_xlim(
+        [datetime(year=datetime_axis[0].year, month=datetime_axis[0].month, day=datetime_axis[0].day,
+                  hour=datetime_axis[0].hour),
+         datetime(year=datetime_axis[0].year, month=datetime_axis[0].month, day=datetime_axis[0].day,
+                  hour=datetime_axis[0].hour) + timedelta(hours=1)]
+    )
+    ax.set_xlabel('Time (UT)')
+    ax.set_ylabel('Frequency (MHz)')
+    plt.savefig(plot_full_path)
+    plt.close(fig)
+
+def create_averaged_dataset(sample_info, data_array, flags=None, bandpass=None):
     average_window = sample_info['average_window_samples']
     start_datetime, end_datetime = sample_info['sample_start_datetime'], sample_info['sample_end_datetime']
     start_freq, end_freq = sample_info['sample_start_frequency'], sample_info['sample_end_frequency']
     output_samples = sample_info['sample_time_samples']
-    tmp_array = numpy.zeros([output_samples, *data_array.shape[1:]], dtype=numpy.float64)
-
     time_axis = numpy.linspace(start_datetime.timestamp(), end_datetime.timestamp(), output_samples)
     frequency_axis = numpy.linspace(start_freq, end_freq, data_array.shape[1])
-    for i in range(output_samples):
-        index = i * average_window + start_index
-
-        tmp_array[i: i + 1, :, :] = numpy.median(data_array[index:index + average_window, :, :], axis=0)
-
-    tmp_array[tmp_array > 0] = 10.0 * numpy.log10(tmp_array[tmp_array > 0])
-
-    return numpy.array(tmp_array, dtype=numpy.float32), time_axis, frequency_axis
+    avgdata, avgflags, avgbandpass = getMedian(data_array, average_window, axis=0, flags=flags, bandpass=bandpass,
+                                               has_nan=numpy.any(numpy.isnan(data_array)))
+
+    return numpy.array(avgdata, dtype=numpy.float32), time_axis, frequency_axis, avgflags, avgbandpass
 
 
 def round_up_datetime(datet, interval):
@@ -323,7 +391,6 @@
 def round_down_datetime(datet, interval):
     return datetime.fromtimestamp(numpy.floor(datet.timestamp() / interval) * interval)
 
-
 def split_samples(dynspec_name,
                   metadata,
                   dataset: h5py.File,
@@ -332,7 +399,7 @@ def split_samples(dynspec_name,
                   out_directory):
     """
 
-    :param dynspec_name: Dynspec tab naem
+    :param dynspec_name: Dynspec tab name
    :param dataset: dynspec dataset
    :param sample_window: sample window in seconds
    :param averaging_window: averaging window in seconds
@@ -352,8 +419,10 @@ def split_samples(dynspec_name,
     station_name = decode_str(station_name)
     averaging_window_in_samples = int(numpy.ceil(averaging_window / time_delta))
     averaging_window_in_seconds = averaging_window_in_samples * time_delta
+    S4_60s_window_in_samples = int(60. / time_delta)
+    S4_180s_window_in_samples = int(180. / time_delta)
 
-    data_array = dataset[dynspec_name]['DATA']
+    data_array = dataset[dynspec_name]['DATA'][:, :, 0]
     total_time_samples = data_array.shape[0]
 
     start_obs_datetime = round_down_datetime(obs_start_time, sample_window)
@@ -365,7 +434,6 @@ def split_samples(dynspec_name,
 
         start_sample_datetime = round_down_datetime(start_obs_datetime + timedelta(seconds=sample_window * i),
                                                     averaging_window)
         end_sample_datetime = round_up_datetime(start_obs_datetime + timedelta(seconds=sample_window * (i + 1)),
-
start_sample_datetime.timestamp(), @@ -386,17 +454,41 @@ def split_samples(dynspec_name, 'sample_end_datetime': datetime.fromtimestamp(time_obs[end_index]), 'n_time_samples': len(indexs), 'sample_start_frequency': start_frequency, - 'sample_end_frequency': end_frequency, - **metadata - } - - averaged_data_array, time_axis, frequency_axis = create_averaged_dataset(sample_info, start_index, - data_array) - make_plot(averaged_data_array, time_axis, frequency_axis, station_name, full_path + '.png') - create_fits_from_dataset(sample_info, averaged_data_array, full_path + '.fits') + 'sample_end_frequency': end_frequency} + sample_info.update(metadata) + + + frequency_axis = numpy.linspace(start_frequency, end_frequency, data_array.shape[1]) + filtered_data,flags,flux,bandpass = apply_bandpass(data_array[start_index:end_index], frequency_axis, + freqaxis=1, timeaxis=0, target = metadata["TARGET"], + sample_rate= int(averaging_window_in_samples*3./averaging_window_in_seconds), + #sample every 3 seconds + filter_length = 600, # window size 30 minutes + rfi_filter_length = averaging_window_in_samples//2, + # window size in time to prevent flagging scintillation + flag_value=8, replace_value=1) + S4_data_array = getS4(filtered_data, + window_size = S4_60s_window_in_samples, #180 seconds + skip_sample_time = averaging_window_in_samples, + #create S4 every averaging time + axis=0, has_nan=False) + S4_data_array180 = getS4(filtered_data, + window_size = S4_180s_window_in_samples, #180 seconds + skip_sample_time = averaging_window_in_samples, + #create S4 every averaging time + axis=0, has_nan=False) + averaged_data_array, time_axis, frequency_axis,flags, bandpass = create_averaged_dataset(sample_info, + data_array[start_index:end_index], + flags, + bandpass) #make sure the data is shaped to contain integer of window + + make_plot(averaged_data_array, + time_axis, frequency_axis, station_name, full_path + '.png') + make_S4plot(S4_data_array, time_axis, frequency_axis, station_name, full_path + '_S4.png',label = 'S4 60s') + make_S4plot(S4_data_array180, time_axis, frequency_axis, station_name, full_path + '_S4_180.png',label = 'S4 180s') + create_fits_from_dataset(sample_info, averaged_data_array, S4_data_array, S4_data_array180, flags, flux, bandpass, full_path + '.fits') store_metadata(sample_info, full_path + '.json') - def store_metadata(metadata, path): with open(path, 'w') as fout: json.dump(metadata, fout, cls=SmartJsonEncoder, indent=True)