-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmg_create_arrow_db_file.py
More file actions
162 lines (126 loc) Β· 5.78 KB
/
mg_create_arrow_db_file.py
File metadata and controls
162 lines (126 loc) Β· 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
"""
Convert a big CSV (timestamp + float columns) to an uncompressed Arrow IPC file.
Optimized for MAXIMUM READ SPEED with memory-mapping support.
Usage:
python csv_to_arrow.py /path/to/file.csv [--out /path/out.arrow]
"""
# =============================================================================
# CONFIGURATION - CHANGE THESE PATHS AS NEEDED
# =============================================================================
INPUT_PATH = "datasets/refined/unified_1_prep/1m/unified_dataset.csv" # Change this to your CSV file
OUTPUT_PATH = "datasets/arrow_1m/unified_1m_dataset.arrow" # Change this to your desired output
# =============================================================================
import argparse
import os
import sys
import time
from pathlib import Path

import polars as pl
def convert(csv_file: str, out_file: str | None = None) -> str:
    """Convert a CSV file to an uncompressed Arrow IPC file.

    Uncompressed IPC is what makes memory-mapped reads possible via
    ``pl.scan_ipc(..., memory_map=True)``.

    Args:
        csv_file: Path to the input CSV file.
        out_file: Destination ``.arrow`` path. Defaults to *csv_file*
            with its extension replaced by ``.arrow``.

    Returns:
        The path of the written Arrow file.
    """
    if out_file is None:
        out_file = os.path.splitext(csv_file)[0] + ".arrow"
    print(f"🚀 Converting {csv_file} for maximum streaming speed...")
    start_time = time.time()
    # Load CSV optimized for speed, not compression.
    df = pl.read_csv(
        csv_file,
        try_parse_dates=True,       # automatically parse timestamp columns
        low_memory=False,           # keep full-width dtypes for maximum speed
        infer_schema_length=10000,  # better type inference for large files
    )
    load_time = time.time() - start_time
    print(f"📊 Loaded shape: {df.shape} in {load_time:.2f}s")
    # Write uncompressed Arrow; compression would defeat memory-mapping.
    write_start = time.time()
    df.write_ipc(
        out_file,
        compression=None,  # CRITICAL: no compression for memory-mapping support
    )
    write_time = time.time() - write_start
    # File size info, in MB.
    input_size = os.path.getsize(csv_file) / (1024 * 1024)
    output_size = os.path.getsize(out_file) / (1024 * 1024)
    total_time = time.time() - start_time
    # NOTE: the original emoji string was mangled in transit and split across
    # two lines, which broke the f-string literal; restored as one valid line.
    print(f"✅ {csv_file} -> {out_file}")
    print(f"📦 CSV: {input_size:.1f}MB -> Arrow: {output_size:.1f}MB")
    print(f"⏱️ Total: {total_time:.2f}s (load: {load_time:.2f}s, write: {write_time:.2f}s)")
    print("🔥 Ready for memory-mapped streaming with:")
    print(f"   lazy = pl.scan_ipc('{out_file}', memory_map=True)")
    return out_file
def batch_convert(input_dir: str, output_dir: str | None = None) -> None:
    """Convert every ``*.csv`` file in a directory to Arrow IPC format.

    Files that fail to convert are reported and skipped; the batch
    continues (best-effort semantics).

    Args:
        input_dir: Directory scanned (non-recursively) for CSV files.
        output_dir: Directory for the ``.arrow`` outputs. Defaults to an
            ``arrow_streaming`` subdirectory of *input_dir*.
    """
    input_path = Path(input_dir)
    if not input_path.exists():
        print(f"❌ Input directory does not exist: {input_dir}")
        return
    # Set up the output directory; parents=True so a deep user-supplied
    # path does not raise FileNotFoundError.
    if output_dir is None:
        output_path = input_path / "arrow_streaming"
    else:
        output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    print(f"📁 Output directory: {output_path}")
    # Find all CSV files (top level only, not recursive).
    csv_files = list(input_path.glob("*.csv"))
    if not csv_files:
        print(f"⚠️ No CSV files found in {input_dir}")
        return
    print(f"🚀 Converting {len(csv_files)} CSV files for streaming...")
    total_start = time.time()
    successful = 0
    for csv_file in csv_files:
        arrow_file = output_path / f"{csv_file.stem}.arrow"
        try:
            convert(str(csv_file), str(arrow_file))
            successful += 1
            # NOTE: the original emoji print was mangled and split across two
            # lines, breaking the string literal; restored as one valid line.
            print(f"✅ {csv_file.name} converted")
        except Exception as e:
            # Best-effort batch: report the failure and keep going.
            print(f"❌ Failed to convert {csv_file.name}: {e}")
    total_time = time.time() - total_start
    print("\n🎉 Batch conversion complete!")
    print(f"✅ Successfully converted: {successful}/{len(csv_files)} files")
    print(f"⏱️ Total time: {total_time:.2f}s")
    print("\n🔥 All files ready for streaming with memory-mapping!")
def verify_streaming_performance(arrow_file: str) -> None:
    """Smoke-test memory-mapped read performance of *arrow_file*.

    Times the lazy (memory-mapped) open and a small ``head(1000)``
    query, then prints the timings and the sample's shape.

    Args:
        arrow_file: Path to an uncompressed Arrow IPC file.
    """
    # NOTE: the emoji in these prints were mojibake in the scraped source
    # ("π§ͺ", "β‘", ...); restored to the evidently intended characters.
    print(f"\n🧪 Testing streaming performance: {arrow_file}")
    # Time the memory-mapped lazy open (should be near-instant: no data read).
    start_time = time.time()
    lazy_df = pl.scan_ipc(arrow_file, memory_map=True)
    lazy_time = time.time() - start_time
    # Time a small materializing query.
    query_start = time.time()
    sample = lazy_df.head(1000).collect()
    query_time = time.time() - query_start
    print(f"⚡ Lazy loading: {lazy_time * 1000:.2f}ms")
    print(f"⚡ Sample query (1000 rows): {query_time * 1000:.2f}ms")
    print(f"📊 Shape: {sample.shape}")
    print("🎯 Memory-mapped streaming: READY!")
if __name__ == "__main__":
    # No CLI arguments: quick conversion using the paths configured at the
    # top of the script. (Uses sys.argv directly; the original reached argv
    # through the undocumented `os.sys` alias.)
    if len(sys.argv) == 1:
        print("🔥 Using default paths from script configuration...")
        if os.path.exists(INPUT_PATH):
            output_file = convert(INPUT_PATH, OUTPUT_PATH)
            verify_streaming_performance(output_file)
        else:
            print(f"❌ Input file not found: {INPUT_PATH}")
            print("💡 Update INPUT_PATH at the top of this script")
    else:
        # Command-line interface.
        ap = argparse.ArgumentParser(description="Convert CSV to streaming-optimized Arrow")
        ap.add_argument("csv", help="Input CSV file path")
        ap.add_argument("--out", help="Output .arrow file path (optional)")
        ap.add_argument("--batch", action="store_true",
                        help="Treat input as directory for batch conversion")
        ap.add_argument("--test", action="store_true",
                        help="Test streaming performance after conversion")
        args = ap.parse_args()
        if args.batch:
            # Batch-convert every CSV in the given directory.
            batch_convert(args.csv, args.out if args.out else None)
        else:
            # Single-file conversion, with optional performance check.
            output_file = convert(args.csv, args.out)
            if args.test:
                verify_streaming_performance(output_file)