Skip to content
Snippets Groups Projects
Commit ee65fa64 authored by Felix  Semler's avatar Felix Semler
Browse files

Working indexer

parent b455eea3
No related branches found
No related tags found
1 merge request!2Merge Master
import codecs import csv
import struct import os
import datetime
file = open('Reading_MT/F000001.MT','rb')
data = file.read()
file.close()
""" """
Dataset contains fixed blocks of blocklength 3840 bytes (characters in EBCDIC code) Dataset contains fixed blocks of blocklength 3840 bytes (characters in EBCDIC code)
each block 30 records of 128 bytes each block 30 records of 128 bytes
...@@ -17,6 +10,13 @@ Block nr | Type ...@@ -17,6 +10,13 @@ Block nr | Type
1 | File Descriptor 1 | File Descriptor
2,3 | Observation Header 2,3 | Observation Header
Types (little endian):
C - UTF-8
H - Int16
F - Int32
... ...
...@@ -24,13 +24,130 @@ Block nr | Type ...@@ -24,13 +24,130 @@ Block nr | Type
Use codecs.decode(data, 'cp500') to read strings (C) Use codecs.decode(data, 'cp500') to read strings (C)
Use struct.unpack('>l',data) to read 4byte long (F?) Use struct.unpack('>l',data) to read 4byte long (F?)
""" """
#Read first block of data def get_metadata(byte_data, full=False):
block_1 = data[:3840:] """Gets metadata of MT file according to Tape Version 7"""
metadata = {}
#Read first block of data - File Descriptor
block_1 = byte_data[:3840:]
#Seperate Records
record_size = 128 record_size = 128
records = 30 records = 30
id_data = block_1[:24] block_1 = [block_1[i:i+record_size] for i in range(0,records*record_size, record_size)]
block_1 = [block_1[24+i:24+i+record_size] for i in range(0,records*record_size, record_size)]
print((block_1[0:2])) metadata['FD_dataset_civil_year'] = int(str(int.from_bytes(block_1[0][116:120],"little"))[:2])
#print(codecs.decode(block_1[0],'unicode-escape'))
print(type(block_1[0])) metadata['FD_ut_day_nr'] = int.from_bytes(block_1[0][16:18],"little")
\ No newline at end of file #Below in units of 10 seconds
metadata['FD_ut_time_sec'] = int.from_bytes(block_1[0][18:20],"little")*10
if full: #Not required as one can discriminate using only the above 2
metadata['FD_tape_format'] = int.from_bytes(block_1[0][24:26],"little")
#The below are with respect to the creation of the dataset
metadata['FD_dataset_civil_day'] = int(str(int.from_bytes(block_1[0][116:120],"little"))[-3:])
metadata['FD_tape_name'] = block_1[0][104:110].decode('utf-8')
metadata['FD_dataset_name'] = block_1[0][110:114].decode('utf-8')
metadata['OH_day'] = int.from_bytes(block_1[1][16:18],'little')
metadata['OH_record_number'] = int.from_bytes(block_1[1][4:8],'little')
metadata['OH_start_UT'] = int.from_bytes(block_1[1][18:20],'little')*10
all = block_1[1][48:60]
all = [int.from_bytes(all[i*2:i*2+2],'little') for i in range(len(all)//2)]
#This is wrong for the test file used! Different tape versions 6 (manual) vs 7 (file)
metadata['OH_start(Epoch,Year,Month,Day,Hour,Min)'] = all
return metadata
def unpack_csv(csv=None):
if csv == None:
csv = 'Reading_MT//wsrtobs.csv'
file_ = open(csv)
data = file_.read()
file_.close()
data = data.split('\n')
csv = [data[i].split(',') for i in range(len(data))]
del data
return csv
def get_SEQNUMBER(file_meta, csv=None):
"""
Gets Record number and SEQNUMBER of Magnetic Tape file by checking against wsrtobs-des.txt
The file_meta (dict) parameter is the metadata as returned by get_metadata
csv (str/list of lists) is the wsrtobs-des.txt file, not required to be provided if in the same directory,
otherwise give relative or absolute path, list of lists is given by unpack_csv
returns (record number, SEQNUMBER)
"""
#Load csv file if required
if csv is None or type(csv) == str:
csv = unpack_csv(csv=csv)
#Below in timedelta -1 as we want days from year 0 0 rather than 1 1
date = datetime.date((int('19'+str(file_meta['FD_dataset_civil_year']))),1,1) + datetime.timedelta(file_meta['FD_ut_day_nr']-1)
#Format like in the csv
date_str = '"{}"'.format(date.strftime("%d/%m/%Y"))
for i in range(len(csv)):
#First check the date
try:
if csv[i][39] == date_str:
#Then check time in seconds
if int(csv[i][40]) == file_meta['FD_ut_time_sec']:
return (int(csv[i][0]), int(csv[i][1]))
except:
return None
def do_dir(parent_dir, csv=None):
"""
Iterates entire directory and checks files for Record number and SEQNUMBER
csv (path like) -- the csv file against which to check (wsrtobs.csv) if None will look for it in current directory
parent_dir (path like) -- the directory to be checked
"""
#this will contain a lists with the filepath and its corresponding values
data = [['filepath', 'Record', 'SEQNUMBER']]
#Get csv file once
csv=unpack_csv(csv=csv)
#Iterate over all files
for subdir, dirs, files in os.walk(parent_dir):
for file in files:
if file[-3::] == '.MT':
try:
#Open and read MT file
file_ = open(os.path.join(subdir, file),'rb')
data_ = file_.read()
file_.close()
#Get relevant metadata
metadata = get_metadata(data_,full=False)
del data_
#Get indexers
indexers = get_SEQNUMBER(metadata,csv = csv)
#Append to data
data.append([str(os.path.join(subdir, file)), indexers[0], indexers[1]])
except Exception as e:
print('An error occured with {}'.format(os.path.join(subdir, file)),e)
return data
data = do_dir('Reading_MT')
print(data)
'''#To check one file:
file = open('Reading_MT/F000001.MT','rb')
data = file.read()
file.close()
metadata = get_metadata(data)
for key in metadata:
print(key, metadata[key])
indexers = get_SEQNUMBER(metadata)
do_dir(csv=1, parent_dir='.')
print(indexers)'''
\ No newline at end of file
Source diff could not be displayed: it is too large. Options to address this: view the blob.
test 0 → 100644
%% Cell type:code id: tags:
``` python
import os
for subdir, dirs, files in os.walk('Reading_MT'):
for file in files:
print(os.path.join(subdir, file))
```
%% Output
Reading_MT/F000001.MT
Reading_MT/wsrtobs.csv
Reading_MT/read_headers.py
Reading_MT/3/34
Reading_MT/3/33
Reading_MT/1/11
Reading_MT/2/23
Reading_MT/2/22
%% Cell type:code id: tags:
``` python
file_ = 'sometext.MT'
file_[-3::]
```
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment