-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathzst_head_reader.py
More file actions
125 lines (102 loc) · 3.92 KB
/
zst_head_reader.py
File metadata and controls
125 lines (102 loc) · 3.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
'''
Code Purpose: Read and parse the header of a zstd-compressed PRESTO/SIGPROC filterbank file.
Note: Adapted from https://github.com/evanocathain/mockHeader/tree/master and modified to read zstd-compressed files from Python.
'''
#!/usr/bin/env python3
import argparse
import json
import struct
import subprocess
from typing import BinaryIO, Dict, Any, Optional
# Header keywords grouped by the binary type of the value that follows them.
# parseheader() looks each keyword up in these sets to decide how to decode
# its value; a keyword found in none of them is rejected with KeyError.

# Value is a length-prefixed ASCII string.
STRING_VALUE_KEYS = {"rawdatafile", "source_name"}  # add more if present

# Value is a 32-bit signed integer.
INT_VALUE_KEYS = {
    "machine_id", "telescope_id", "data_type",
    "nchans", "nbits", "nifs", "nbeams", "ibeam",
    # Standard SIGPROC integer keywords; without them, headers that carry
    # these fields would be rejected as having an unknown key.
    "barycentric", "pulsarcentric", "nsamples",
}

# Value is a 64-bit float.
DOUBLE_VALUE_KEYS = {
    "src_raj", "src_dej", "az_start", "za_start",
    "tstart", "tsamp", "fch1", "foff", "refdm", "period",
}
def read_exact(stream: BinaryIO, n: int) -> bytes:
    """Read exactly *n* bytes from *stream*.

    Keeps calling ``stream.read`` until *n* bytes have been collected, so a
    stream that legitimately returns fewer bytes than requested per call
    (raw/unbuffered pipes, sockets) is not mistaken for a truncated file.

    Args:
        stream: Binary stream to read from.
        n: Number of bytes required; must be non-negative.

    Returns:
        A ``bytes`` object of length exactly *n* (``b""`` when *n* is 0).

    Raises:
        ValueError: If *n* is negative.
        EOFError: If the stream ends (or returns ``None``) before *n* bytes
            have been read.
    """
    if n < 0:
        raise ValueError(f"Cannot read a negative number of bytes ({n})")

    chunks = []
    remaining = n
    while remaining > 0:
        chunk = stream.read(remaining)
        # b"" signals EOF; None is what a non-blocking raw stream returns
        # when no data is available. Either way the request cannot be met.
        if not chunk:
            raise EOFError(f"Unexpected EOF while reading {n} bytes")
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)
def read_i32(stream: BinaryIO) -> int:
    """Read a little-endian signed 32-bit integer from *stream*."""
    raw = read_exact(stream, 4)
    (value,) = struct.unpack("<i", raw)
    return value
def read_f64(stream: BinaryIO) -> float:
    """Read a little-endian 64-bit float from *stream*."""
    raw = read_exact(stream, 8)
    (value,) = struct.unpack("<d", raw)
    return value
def read_sigproc_string(stream: BinaryIO) -> str:
    """Read one length-prefixed string from *stream*.

    The string is stored as a 32-bit length followed by that many bytes.
    Lengths outside 0..10,000,000 are rejected with ValueError before any
    payload is read. Bytes that are not valid ASCII are replaced rather
    than raising.
    """
    length = read_i32(stream)
    if not 0 <= length <= 10_000_000:
        raise ValueError(f"Unreasonable string length {length}")
    payload = read_exact(stream, length)
    return payload.decode("ascii", errors="replace")
def hexdump_c(data: bytes, width: int = 16) -> str:
    """Format *data* as a hex dump, one line per *width* bytes.

    Each line consists of the 8-digit hexadecimal offset, the bytes as
    space-separated two-digit hex values (padded so the last line aligns
    with the others), and the printable-ASCII rendering between ``|``
    characters, with every non-printable byte shown as ``.``.

    Args:
        data: Bytes to dump.
        width: Number of bytes per output line; must be positive.

    Returns:
        The dump as a single newline-joined string (empty for empty *data*).

    Raises:
        ValueError: If *width* is not positive. Previously a negative width
            silently produced an empty dump.
    """
    if width <= 0:
        raise ValueError(f"width must be a positive integer, got {width}")

    hex_columns = width * 3 - 1  # two hex digits per byte plus separators
    lines = []
    for off in range(0, len(data), width):
        chunk = data[off:off + width]
        hexpart = " ".join(f"{byte:02x}" for byte in chunk).ljust(hex_columns)
        asciipart = "".join(chr(byte) if 32 <= byte <= 126 else "." for byte in chunk)
        lines.append(f"{off:08x} {hexpart} |{asciipart}|")
    return "\n".join(lines)
def parseheader(stream: BinaryIO) -> Dict[str, Any]:
    """Parse a filterbank header from *stream* into a dict.

    The stream must begin with the string ``HEADER_START``. After that,
    keyword strings are read one at a time until ``HEADER_END``; each
    keyword's value is decoded as a string, 32-bit int or 64-bit float
    according to which of STRING_VALUE_KEYS, INT_VALUE_KEYS or
    DOUBLE_VALUE_KEYS contains it.

    Raises:
        ValueError: If the first string is not ``HEADER_START``.
        KeyError: If a keyword is in none of the three key sets.
    """
    opening = read_sigproc_string(stream)
    if opening != "HEADER_START":
        raise ValueError(f"Expected HEADER_START, got {opening!r}")

    fields: Dict[str, Any] = {}
    # Keep pulling keywords until the HEADER_END sentinel is read.
    for key in iter(lambda: read_sigproc_string(stream), "HEADER_END"):
        if key in STRING_VALUE_KEYS:
            read_value = read_sigproc_string
        elif key in INT_VALUE_KEYS:
            read_value = read_i32
        elif key in DOUBLE_VALUE_KEYS:
            read_value = read_f64
        else:
            raise KeyError(
                f"Unknown key {key!r}. Add it to STRING_VALUE_KEYS / INT_VALUE_KEYS / DOUBLE_VALUE_KEYS."
            )
        fields[key] = read_value(stream)
    return fields
def valid_header(path: str, nbytes: int = 512) -> bool:
    """Report whether ``HEADER_START`` appears early in the decompressed file.

    Runs ``zstd -dc`` on *path*, reads up to *nbytes* bytes of decompressed
    output and checks whether they contain the bytes ``HEADER_START``.
    zstd's own error output is discarded, so an unreadable or non-zstd file
    simply yields ``False``.

    Args:
        path: Path to the zstd-compressed file.
        nbytes: How many decompressed bytes to inspect.

    Returns:
        True if ``HEADER_START`` occurs in the inspected bytes, else False.

    Raises:
        OSError: If the ``zstd`` executable cannot be launched.
    """
    # "--" stops option parsing so a path starting with "-" is still a file.
    # The context manager closes the pipe and reaps the child on exit;
    # kill() alone would leave a zombie process and an open file descriptor.
    with subprocess.Popen(
        ["zstd", "-dc", "--", path],
        stdout=subprocess.PIPE,
        stderr=subprocess.DEVNULL,
    ) as proc:
        assert proc.stdout is not None  # guaranteed by stdout=PIPE
        try:
            data = proc.stdout.read(nbytes)
        finally:
            # Only the head is needed; stop zstd instead of letting it
            # decompress the rest of the file into a pipe nobody reads.
            proc.kill()
    return b"HEADER_START" in data
def zstheader(path: str) -> Dict[str, Any]:
    """Read and return the SIGPROC header from a zstd-compressed filterbank file.

    First checks with valid_header() that ``HEADER_START`` appears in the
    first 512 decompressed bytes, then decompresses the file again with
    ``zstd -dc`` and parses the header from that stream.

    Args:
        path: Path to the zstd-compressed filterbank file.

    Returns:
        The parsed header as a dict mapping keyword to value.

    Raises:
        ValueError: If ``HEADER_START`` is not found in the first 512 bytes.
        Exception: Whatever parseheader() raises on a corrupt header is
            propagated unchanged.
    """
    if not valid_header(path):
        raise ValueError("HEADER_START not found in first 512 bytes")

    # "--" stops option parsing so a path starting with "-" is still a file.
    # The context manager closes the pipe and reaps the child on exit;
    # kill() alone would leave a zombie process and an open file descriptor.
    with subprocess.Popen(
        ["zstd", "-dc", "--", path], stdout=subprocess.PIPE
    ) as proc:
        assert proc.stdout is not None  # guaranteed by stdout=PIPE
        try:
            return parseheader(proc.stdout)
        finally:
            # The header is all that is needed; stop zstd before it
            # decompresses the data section.
            proc.kill()
def main():
    """Command-line entry point: print a .fil.zst header as JSON.

    Decompresses the given file with ``zstd -dc``, parses the header and
    prints it as indented, key-sorted JSON. If the header cannot be parsed,
    a one-line "corrupt header" message is printed instead.

    With ``--dump512`` a hex dump of the first 512 bytes is printed before
    the header is parsed, so it is shown even when parsing fails.
    """
    ap = argparse.ArgumentParser(description="Parse PRESTO/SIGPROC filterbank header from .fil.zst")
    ap.add_argument("path", help="Input file")
    ap.add_argument("--dump512", action="store_true", help="Dump first 512 bytes of file in hex")
    args = ap.parse_args()

    # "--" stops option parsing so a path starting with "-" is still a file.
    cmd = ["zstd", "-dc", "--", args.path]

    if args.dump512:
        # The flag used to be accepted but silently ignored.
        # NOTE(review): this dumps the *decompressed* bytes (the same bytes
        # valid_header() inspects) - confirm that is the intended meaning of
        # "first 512 bytes of file".
        with subprocess.Popen(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
        ) as dump_proc:
            assert dump_proc.stdout is not None  # guaranteed by stdout=PIPE
            try:
                head = dump_proc.stdout.read(512)
            finally:
                dump_proc.kill()
        print(hexdump_c(head))

    # The context manager closes the pipe and reaps the child on exit;
    # kill() alone would leave a zombie process and an open file descriptor.
    with subprocess.Popen(cmd, stdout=subprocess.PIPE) as proc:
        assert proc.stdout is not None  # guaranteed by stdout=PIPE
        try:
            header = parseheader(proc.stdout)
        except (EOFError, ValueError, KeyError, struct.error):
            print(f"{args.path}: corrupt header, skipping file")
            return
        finally:
            # Only the header is needed; stop zstd before it decompresses
            # the data section.
            proc.kill()

    print(json.dumps(header, indent=2, sort_keys=True))
if __name__ == "__main__":
main()