diff --git a/docker/Dockerfile b/docker/Dockerfile
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..301597e2f4184e4913fb3f3c4d3424ec99e49bfa 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -0,0 +1,28 @@
+FROM ubuntu:24.04
+
+#
+# Lightweight container with the minimum needed to run digitize3.py
+#
+ENV INSTALL_DIR=/usr/local
+
+RUN export DEBIAN_FRONTEND=noninteractive && \
+    apt-get -y update && apt-get -y upgrade && \
+    apt-get install -y python-is-python3 && \
+    apt-get -y autoremove --purge && \
+#    apt-get install -y wget python3 python3-pip python3-numpy git vim \
+#        python3-pkgconfig libhdf5-dev hdf5-tools python3-setuptools cython3 && \
+    apt-get install -y wget python3 python3-numpy git vim \
+        libhdf5-dev hdf5-tools python3-h5py cwltool && \
+    rm -rf /var/lib/apt/lists/*
+
+# Install cwltool and toil
+#RUN python3 -m pip install cwltool toil[cwl]
+
+#RUN cd ${INSTALL_DIR} && wget https://files.pythonhosted.org/packages/cc/0c/5c2b0a88158682aeafb10c1c2b735df5bc31f165bfe192f2ee9f2a23b5f1/h5py-3.12.1.tar.gz && \
+#    tar xvfz h5py-3.12.1.tar.gz && cd h5py-3.12.1 && python setup.py install --prefix=${INSTALL_DIR}
+
+#RUN cd ${INSTALL_DIR} && mkdir -p src && cd src/ && git clone https://git.astron.nl/RD/pulp2-cwl && \
+#    cd pulp2-cwl/scripts && cp digitize3.py ${INSTALL_DIR}/bin
+COPY ./scripts/digitize3.py ${INSTALL_DIR}/bin
+
+RUN digitize3.py -h
diff --git a/scripts/digitize3.py b/scripts/digitize3.py
new file mode 100755
index 0000000000000000000000000000000000000000..9e141964aec8d8cf712ef541a454be533835f97f
--- /dev/null
+++ b/scripts/digitize3.py
@@ -0,0 +1,358 @@
+#!/usr/bin/env python3
+#
+# -*- coding: utf-8 -*-
+""" Routines to convert raw LOFAR data between 4-byte floats and 1-byte integers.
+
+Version 1.4
+
+  Main routine is convert_dtype
+
+  Command line usage: digitize3.py [-h] [-o OUTDIR] [-s NSIG] [--refloat] [-nc] [-v]
+                                   file_SAPXXX_BEAMXXX_SX_name.h5
+                                   [file_SAPXXX_BEAMXXX_SX_name.h5 ...]
+
+  Notes:
+  * we can only process files with one SUB_ARRAY_POINTING, one BEAM, and one STOKES
+  * we assume the .raw files can be found from file.replace('.h5', '.raw')
+  * the float32 to int8 conversion creates a new h5 Dataset at the BEAM level
+    with the name convention STOKES_X_i2f.
+    This array provides the scaling/mapping from int8 back to float32.
+    The dataset also has attributes 'STOKES_X_nsig', preserving the NSIG argument,
+                                    'STOKES_X_recsize', preserving the read buffer size.
+    The dataset is deleted if direction='i2f', i.e. when 'refloat'ing the byte stream.
+  * digitization clips extreme fluctuations (nsig), so you may not recover the original
+    float stream. Noise will be clipped.
+  * we modify the STOKES_X 'DATATYPE' attribute, changing it from 'float' to 'int8' if direction == 'f2i'
+                                                                   'int8' to 'float' if direction == 'i2f'
+  * we also change the dtype of the dataset in the HDF5 file, so that tools can directly
+    read the integer data. Unless those tools are aware of the STOKES_X_i2f table, the
+    rescaling will not be applied. For typical pulsar observations this just results in
+    forcible channel equalization.
+"""
+
+import argparse
+import numpy as np
+import os, sys
+import re
+import shutil
+
+import h5py, h5py.h5d, h5py.h5t
+
+_lofar_dtypes = {'float': '>f4', 'int8': '>i1'}
+
+def convert_dtype(files, outdir, nsig=5., direction='f2i', check=True, verbose=0):
+    """
+    Given a list of LOFAR h5 files, digitize the raw data to 'int8',
+    outputting it in outdir.
+
+    Args:
+     files : list of h5 files
+     outdir : directory to output the digitized raw data and the h5 file
+              with the processing history
+
+     nsig : digitize/map the raw data stream from
+               (-Nsig*std(), +Nsig*std())
+            to
+               [-128, 128)  (signed int8)
+            This process clips outliers.
+            (default: 5.)
+
+     direction : One of 'f2i' or 'i2f'. The 'i2f' routine undoes the original
+                 digitization and should reproduce the original data.
+                 (default: 'f2i')
+
+     check : Check that the h5 structure is consistent with the filename.
+             (default: True)
+
+     verbose : integer verbosity level; larger values give more output
+
+    Notes:
+    * we can only process files with one SUB_ARRAY_POINTING, one BEAM, and one STOKES
+    * we assume the .raw files can be found from file.replace('.h5', '.raw')
+    * the float32 to int8 conversion creates a new h5 Dataset at the BEAM level
+      with the name convention STOKES_X_i2f.
+      This array provides the scaling/mapping from int8 back to float32.
+      The dataset also has attributes 'STOKES_X_nsig', preserving the NSIG argument,
+                                      'STOKES_X_recsize', preserving the read buffer size.
+      The dataset is deleted if direction='i2f', i.e. when 'refloat'ing the byte stream.
+    * digitization clips extreme fluctuations (nsig), so you may not recover the original
+      float stream. Noise will be clipped.
+    * we modify the STOKES_X 'DATATYPE' attribute, changing it from 'float' to 'int8' if direction == 'f2i'
+                                                                     'int8' to 'float' if direction == 'i2f'
+
+    """
+    if not isinstance(files, list):
+        files = [files]
+
+    assert direction in ['f2i', 'i2f']
+
+    # create the output directory if it does not exist
+    if not os.path.isdir(outdir):
+        os.makedirs(outdir)
+
+    for fname in files:
+        fraw = fname.replace('.h5', '.raw')
+        fraw_out = os.path.join(outdir, os.path.basename(fraw))
+        if direction == 'f2i':
+            print("Digitizing %s to %s" % (fraw, fraw_out))
+        elif direction == 'i2f':
+            print("unDigitizing %s to %s" % (fraw, fraw_out))
+
+        if os.path.abspath(fraw) == os.path.abspath(fraw_out):
+            print("Warning, this will overwrite input files")
+
+        # copy the original h5 file to the outdir
+        fnew = os.path.join(outdir, os.path.basename(fname))
+        shutil.copy2(fname, fnew)
+
+        # get the necessary processing information, possibly checking for consistency
+        nchan, ntint, dtype, nsamples, sap, beam, stokes = lofar_h5info(fnew, check=check, verbose=verbose)
+        if verbose:
+            print("\t Input data type is %s" % dtype)
+            print("\t Processing %s_%s_%s" % (sap, beam, stokes))
+        if dtype == 'int8' and direction == 'f2i':
+            raise ValueError("input data is already integers but digitization requested")
+        if dtype == 'float32' and direction == 'i2f':
+            raise ValueError("input data is already floats but undigitization requested")
+
+        recsize = ntint * nchan
+        itemsize_i = np.dtype(dtype).itemsize
+        if direction == 'f2i':
+            itemsize_o = np.dtype('>i1').itemsize
+        elif direction == 'i2f':
+            itemsize_o = np.dtype('<f4').itemsize
+
+        with open(fraw_out, 'wb', recsize*itemsize_o) as fh1out:
+            with open(fraw, 'rb', recsize*itemsize_i) as fh1:
+                with h5py.File(fnew, 'a') as h5:
+                    if verbose >= 1:
+                        print("\t", fraw, " has dtype", dtype)
+                        print("\t", fraw, " has DATATYPE", h5[sap][beam][stokes].attrs['DATATYPE'])
+                    # create/access the dataset used to convert back to floats from int8's
+                    if direction == 'f2i':
+                        convert_dataset_dtype(h5[sap][beam][stokes], 'i1')
+                        h5[sap][beam][stokes].attrs['DATATYPE'] = 'int8'
+                        # create the scaling data set
+                        diginfo = h5[sap][beam].create_dataset(
+                            "%s_i2f" % stokes,
+                            (0, nchan),
+                            maxshape=(None, nchan),
+                            dtype='f')
+                        diginfo.attrs['%s_recsize' % stokes] = recsize
+                        diginfo.attrs['%s_nsig' % stokes] = nsig
+                        out_dtype = h5[sap][beam][stokes].dtype
+                    elif direction == 'i2f':
+                        convert_dataset_dtype(h5[sap][beam][stokes], '<f4')
+                        out_dtype = h5[sap][beam][stokes].dtype
+                        h5[sap][beam][stokes].attrs['DATATYPE'] = 'float'
+                        diginfo = h5[sap][beam]["%s_i2f" % stokes]
+                    h5.flush()
+                    if verbose >= 1:
+                        print("\tInput dtype", dtype)
+                        print("\tOutput dtype", out_dtype)
+                    idx = 0
+                    while True:
+                        raw1 = np.fromfile(fh1, count=recsize, dtype=dtype)
+                        if len(raw1) == 0:
+                            break
+
+                        raw1 = raw1.reshape(-1, nchan)
+
+                        if direction == 'f2i':
+                            # record the stddev for possible recovery
+                            #std = raw1.std(axis=0)  # FIXME: assumes mean is zero
+                            std = np.sqrt(np.mean(raw1.astype(np.float32)**2, axis=0))
+                            if np.any(np.isnan(std)):
+                                raise ValueError("NaNs found in input data set; possible endianness or data type problems")
+                            if np.any(std == 0):
+                                print("Warning: {0} channels in block {1} contain all zeros".format(np.sum(std == 0), idx))
+                                std[std == 0] = nsig  # avoid division by zero later
+                            digshape = diginfo.shape
+                            diginfo.resize(digshape[0] + 1, axis=0)
+
+                            # apply the digitization, scaling each channel by its RMS
+                            scale = nsig*std
+                            intmap = (128*(raw1/scale))
+                            diginfo[idx] = scale/128.
+                            # clip
+                            if verbose >= 2:
+                                nclip = np.sum(intmap > 127) + np.sum(intmap < -128)
+                                print("Read# {0}, Clipping {1}/{2} samples".format(idx, nclip, intmap.size))
+                            raw1_o = np.clip(intmap, -128, 127).astype('>i1')
+
+                        elif direction == 'i2f':
+                            # convert back to float32
+                            scale = diginfo[idx]
+                            raw1_o = (raw1*scale).astype(out_dtype)
+
+                        raw1_o.tofile(fh1out)
+
+                        if verbose >= 3:
+                            print("f2i rescale = {0}".format(scale))
+                        idx += 1
+
+                    if direction == 'i2f':
+                        del h5[sap][beam]["%s_i2f" % stokes]
+
+def convert_dataset_dtype(dataset, dtype):
+    """
+    Delete and recreate the dataset with a new dtype, preserving everything
+    including external storage.
+    """
+    name = dataset.name.encode("utf-8")
+    parent = dataset.parent
+    space = dataset.id.get_space()
+    plist = dataset.id.get_create_plist()
+    attrs = dict(dataset.attrs)
+    del parent[name]
+    h5py.h5d.create(parent.id,
+                    name,
+                    h5py.h5t.py_create(dtype),
+                    space,
+                    dcpl=plist
+                    )
+    for k, v in attrs.items():
+        parent[name].attrs[k] = v
+
+
+def lofar_h5info(fname, check=True, verbose=0):
+    """
+    Open the LOFAR hdf5 file and return:
+        (nchan, ntint, dtype, nsamples, SUB_ARRAY_POINTING, BEAM, STOKES)
+
+    Note: we only process files with one SUB_ARRAY_POINTING
+    and one BEAM per SUB_ARRAY_POINTING
+
+    Args:
+     fname : h5 file
+     check : check that some of the h5 structure is consistent with
+             the filename. For instance, there is only one
+             SUB_ARRAY_POINTING and only one BEAM per file.
+             *_SAP???_B???_S?_*
+     verbose : integer verbosity level; larger values give more output
+
+    """
+    with h5py.File(fname, 'r') as f5:
+
+        # get SAP, beam, stokes from the filename
+        info = re.search(r'SAP\d{3}_B\d{3}_S\d{1}_', fname)
+        if info is None:
+            print("small warning, filename conventions not met. ", end='')
+            #if not check:
+            #    print("Processing first SAP, BEAM, and STOKES.")
+
+        # get SAP, beam, stokes from the hdf5 file.
+        # recall, we assume there is only one SAP, one BEAM and one STOKES per file
+        h5saps = sorted([i for i in f5.keys() if 'SUB_ARRAY_POINTING' in i])
+        s0 = f5[h5saps[0]]
+        h5beams = sorted([i for i in s0.keys() if 'BEAM' in i])
+        b0 = s0[h5beams[0]]
+        h5stokes = sorted([i for i in b0.keys() if 'STOKES' in i and 'i2f' not in i])
+        st0 = b0[h5stokes[0]]
+
+        if check:
+            force_fileconsistency(fname, h5saps, h5beams, h5stokes)
+        if verbose >= 1:
+            print("File produced by %s" % f5.attrs['BF_VERSION'])
+
+        # future fix: currently we only handle NOF_SUBBANDS
+        # with 1 channel in each subband
+        nchan = st0.attrs['NOF_SUBBANDS']
+        if st0.attrs['DATATYPE'] == 'int8':
+            # some files exist where the dtype is set wrong:
+            # in particular, the dtype can still be <f4 but the data
+            # might be integers
+            dtype = np.dtype('int8')
+        else:
+            # try to infer the correct byte order from the dtype
+            #dtype = _lofar_dtypes[st0.attrs['DATATYPE']]
+            dtype = st0.dtype
+        # process sets of ~32 MB, hence // np.log2(nchan)
+        ntint = 2**25//4//int(np.log2(nchan))  # 16=~nchan -> power of 2, but sets of ~32 MB
+        nsamples = b0.attrs['NOF_SAMPLES']
+
+    return (nchan, ntint, dtype, nsamples, h5saps[0], h5beams[0], h5stokes[0])
+
+
+def force_fileconsistency(fname, h5saps, h5beams, h5stokes):
+    """ Raise Warning if the SAP, BEAM, and STOKES inferred from the filename
+        differ from the content of the hdf5 file
+    """
+    # get the subarray_pointing information from the filename
+    info = re.search(r'SAP\d{3}_B\d{3}_S\d{1}_', fname)
+    if info is None:
+        txt = "Filename %s doesn't adhere to convention *_SAP???_B???_S?_*; " % fname
+        txt += "processing may proceed with the -nc switch"
+        raise Warning(txt)
+    fsap = re.search(r'SAP\d{3}', info.group()).group().strip('SAP')
+    fbeam = re.search(r'_B\d{3}', info.group()).group().strip('_B')
+    fstokes = re.search(r'_S\d', info.group()).group().strip('_S')
+
+    # parse the info from the hdf5 file
+    hsap = re.search(r'\d{3}', h5saps[0]).group()
+    hbeam = re.search(r'\d{3}', h5beams[0]).group()
+    hstokes = re.search(r'\d', h5stokes[0]).group()
+
+    # compare the results, raising warnings as necessary
+    if len(h5saps) == 0:
+        raise Exception("No SUB_ARRAY_POINTINGS found")
+    elif len(h5saps) > 1:
+        raise Warning("Too many SUB_ARRAY_POINTINGS in %s. Processing first." % fname)
+    elif fsap != hsap:
+        raise Warning("Filename %s had SAP %s" % (fname, hsap))
+
+    if len(h5beams) == 0:
+        raise Exception("No BEAMS found in SAP%s" % h5saps[0])
+    elif len(h5beams) > 1:
+        raise Warning("Too many BEAMS in SAP%s. Processing first." % h5saps[0])
+    elif fbeam != hbeam:
+        raise Warning("Filename %s had BEAM %s" % (fname, hbeam))
+
+    if len(h5stokes) == 0:
+        raise Exception("No STOKES found in SAP%s_B%s" % (h5saps[0], h5beams[0]))
+    elif len(h5stokes) > 1:
+        raise Warning("Too many STOKES found in SAP%s_B%s. Processing first." % (h5saps[0], h5beams[0]))
+    elif fstokes != hstokes:
+        raise Warning("Filename %s had STOKES %s" % (fname, hstokes))
+
+
+def CL_parser():
+    parser = argparse.ArgumentParser(
+        prog='digitize3.py',
+        description='Digitize beamformed LOFAR data to int8, or convert back to the original float32. '
+                    "Takes a list of LOFAR hdf5 header files; raw files are determined by filename.replace('.h5', '.raw'). "
+                    'The digitized output copies the hdf5 file to the output directory, adding the processing information. '
+                    'Note, we can only process files with one SUB_ARRAY_POINTING, one BEAM, and one STOKES.',
+        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+    parser.add_argument('files', metavar='file_SAPXXX_BEAMXXX_SX_name.h5', nargs='+',
+                        help='lofar hdf5 header files to process.')
+
+    parser.add_argument('-o', '--output_dir', type=str, dest='outdir',
+                        default='./',
+                        help='output directory for the digitized files')
+
+    parser.add_argument('-s', '--nsigma', type=float, dest='nsig',
+                        default=5.0,
+                        help='Clip raw data above this many stddevs.')
+
+    parser.add_argument('--refloat', action='store_true',
+                        help='Convert int8 back to float32 ("i2f"). '
+                             'This only works on files that have gone through the digitization process.')
+
+    parser.add_argument('-nc', '--nocheck', action='store_true',
+                        help='do not enforce filename consistency of *_SAP???_B???_S?_* with the actual hdf5 '
+                             'structure for SUB_ARRAY_POINTING_???, BEAM_???, and STOKES_? .')
+
+    parser.add_argument('-v', '--verbose', action='append_const', const=1)
+    return parser.parse_args(args=None if sys.argv[1:] else ['--help'])
+
+if __name__ == '__main__':
+    args = CL_parser()
+    check = not args.nocheck
+    args.verbose = 0 if args.verbose is None else sum(args.verbose)
+    if args.refloat:
+        direction = 'i2f'
+    else:
+        direction = 'f2i'
+    convert_dtype(args.files, args.outdir, args.nsig, direction=direction, check=check, verbose=args.verbose)
diff --git a/steps/digitize.cwl b/steps/digitize.cwl
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..ec7d415f1f159398c70569a96a437aa9ef79eb6f 100644
--- a/steps/digitize.cwl
+++ b/steps/digitize.cwl
@@ -0,0 +1,62 @@
+cwlVersion: v1.2
+class: CommandLineTool
+
+label: Converts 32-bit raw LOFAR BF XXYY data to 8-bit
+doc: |
+  This tool digitizes raw 32-bit LOFAR BF XXYY data to int8.
+  It takes a list of HDF5 header files; the raw files are determined by filename.replace('.h5','.raw').
+  The digitized output copies the HDF5 file to the output directory, adding the processing information.
+
+requirements:
+  - class: InlineJavascriptRequirement
+  - class: InitialWorkDirRequirement
+    listing:
+      - entry: $(inputs.h5in)
+      - entry: $(inputs.raw_in)
+
+baseCommand: digitize3.py
+arguments: ["--verbose"]
+
+inputs:
+  - id: h5in
+    type: File[]
+    inputBinding:
+      position: 1
+    doc: Input LOFAR BF .h5 files
+
+  - id: raw_in
+    type: File[]
+    doc: Input raw 32-bit LOFAR BF .raw files
+
+  - id: output_dir
+    type: string
+    inputBinding:
+      prefix: --output_dir=
+      separate: false
+    doc: Output directory for the digitized files
+
+  - id: nsigma
+    type: float?
+    inputBinding:
+      prefix: --nsigma=
+      separate: false
+    doc: Clip raw data above this many stddevs
+
+outputs:
+  - id: out_dir
+    type: Directory
+    doc: Directory for the output 8-bit .h5 and .raw files
+    outputBinding:
+      glob: "$(inputs.output_dir)"
+
+  - id: h5out
+    type: File[]
+    doc: Output 8-bit .h5 files
+    outputBinding:
+      glob: "$(inputs.output_dir)/*.h5"
+
+  - id: raw_out
+    type: File[]
+    doc: Output 8-bit .raw files
+    outputBinding:
+      glob: "$(inputs.output_dir)/*.raw"
diff --git a/tests/digitize_input.json b/tests/digitize_input.json
new file mode 100644
index 0000000000000000000000000000000000000000..67dd7b1d60c6a95a6c879fc4090cfd338d013d47
--- /dev/null
+++ b/tests/digitize_input.json
@@ -0,0 +1,32 @@
+{
+    "h5in": [
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S0_P000_bf.h5"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S1_P000_bf.h5"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S2_P000_bf.h5"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S3_P000_bf.h5"
+        }
+    ],
+    "raw_in": [
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S0_P000_bf.raw"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S1_P000_bf.raw"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S2_P000_bf.raw"
+        },
+        {
+            "class": "File", "path": "/data/kondratiev/pulp2-data/digitize/L2049631/32-bit/L2049631_SAP000_B000_S3_P000_bf.raw"
+        }
+    ],
+    "output_dir": "8-bit",
+    "nsigma": 5.1
+}
diff --git a/workflows/pulp2-xxyy-8bit-requantisation.cwl b/workflows/pulp2-xxyy-8bit-requantisation.cwl
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..b2856aee65e3b4740808208919a25b0261b14723 100644
--- a/workflows/pulp2-xxyy-8bit-requantisation.cwl
+++ b/workflows/pulp2-xxyy-8bit-requantisation.cwl
@@ -0,0 +1,55 @@
+cwlVersion: v1.2
+class: Workflow
+
+requirements:
+  - class: InlineJavascriptRequirement
+
+inputs:
+  - id: h5in
+    type: File[]
+    doc: Input LOFAR BF .h5 files
+
+  - id: raw_in
+    type: File[]
+    doc: Input raw 32-bit LOFAR BF .raw files
+
+  - id: output_dir
+    type: string
+    default: "8-bit"
+    doc: Output directory for the digitized files
+
+  - id: nsigma
+    type: float?
+    default: 5.0
+    doc: Clip raw data above this many stddevs
+
+steps:
+  digitize:
+    run: ../steps/digitize.cwl
+    label: Converts 32-bit raw LOFAR BF XXYY data to 8-bit
+    doc: |
+      This step digitizes raw 32-bit LOFAR BF XXYY data to int8.
+      It takes a list of HDF5 header files; the raw files are determined by filename.replace('.h5','.raw').
+      The digitized output copies the HDF5 file to the output directory, adding the processing information.
+    in:
+      h5in: h5in
+      raw_in: raw_in
+      output_dir: output_dir
+      nsigma: nsigma
+    out: [out_dir, h5out, raw_out]
+
+outputs:
+  - id: h5out
+    type: File[]
+    outputSource: digitize/h5out
+    doc: Output 8-bit .h5 files
+
+  - id: raw_out
+    type: File[]
+    outputSource: digitize/raw_out
+    doc: Output 8-bit .raw files
+
+  - id: out_dir
+    type: Directory
+    outputSource: digitize/out_dir
+    doc: Directory for the output 8-bit .h5 and .raw files
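As the digitize3.py docstring warns, tools that read the 8-bit .raw stream directly see channel-equalised data unless they apply the STOKES_X_i2f table themselves. Below is a minimal readback sketch; the file names are hypothetical (patterned on tests/digitize_input.json with output_dir "8-bit"), and the group/attribute names follow the single-SAP/BEAM/STOKES layout that digitize3.py assumes.

    import numpy as np
    import h5py

    # hypothetical 8-bit output pair written by the digitize step
    h5name = "8-bit/L2049631_SAP000_B000_S0_P000_bf.h5"
    rawname = h5name.replace('.h5', '.raw')

    with h5py.File(h5name, 'r') as f5:
        beam = f5['SUB_ARRAY_POINTING_000']['BEAM_000']
        diginfo = beam['STOKES_0_i2f']                    # scaling table created by digitize3.py (f2i)
        recsize = int(diginfo.attrs['STOKES_0_recsize'])  # elements per digitized block
        scales = diginfo[:]                               # shape (nblocks, nchan)

    nchan = scales.shape[1]
    nrow = recsize // nchan                               # time samples per block

    data = np.fromfile(rawname, dtype=np.int8).reshape(-1, nchan).astype(np.float32)

    # undo the digitization block by block, as the 'i2f' branch of convert_dtype() does
    for idx, scale in enumerate(scales):
        data[idx*nrow:(idx + 1)*nrow] *= scale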