-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtool_identify_bad_data.py
More file actions
440 lines (374 loc) Β· 17 KB
/
tool_identify_bad_data.py
File metadata and controls
440 lines (374 loc) Β· 17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
import os
import polars as pl
import pandas as pd
from datetime import timedelta, datetime
import numpy as np
# Base directory for raw data files
datasets_dir = "datasets/raw/full_scope"

# Map each interval folder to timedelta for accurate frequency.
# Keys double as the sub-folder names under `datasets_dir`.
INTERVALS = {
    '1m': timedelta(minutes=1),
    '3m': timedelta(minutes=3),
    '5m': timedelta(minutes=5),
    '15m': timedelta(minutes=15),
    '30m': timedelta(minutes=30),
    '1h': timedelta(hours=1),
    '2h': timedelta(hours=2),
    '4h': timedelta(hours=4),
    '6h': timedelta(hours=6),
    '8h': timedelta(hours=8),
    '12h': timedelta(hours=12),
}

# Expected columns for data quality checks (Binance-style kline CSV layout —
# TODO confirm against the actual downloader that produces these files).
EXPECTED_COLUMNS = [
    'timestamp', 'open', 'high', 'low', 'close', 'volume',
    'close_time', 'quote_asset_volume', 'number_of_trades',
    'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume', 'ignore'
]

# Price/volume columns that should be numeric and positive
PRICE_COLUMNS = ['open', 'high', 'low', 'close']
VOLUME_COLUMNS = ['volume', 'quote_asset_volume', 'taker_buy_base_asset_volume', 'taker_buy_quote_asset_volume']
# Every column that gets cast to Float64 during quality analysis.
NUMERIC_COLUMNS = PRICE_COLUMNS + VOLUME_COLUMNS + ['close_time', 'number_of_trades']
def _error_result(path: str, interval: str, error: str, actual_count: int,
                  quality_report: dict | None = None) -> dict:
    """Build a uniform result dict for error/short-circuit paths.

    Always includes every key the success path produces (missing_percentage,
    start_date, end_date, ...) so callers can index results without guarding.
    """
    return {
        'file': os.path.basename(path),
        'interval': interval,
        'error': error,
        'missing_count': 0,
        'total_expected': 0,
        'actual_count': actual_count,
        'missing_percentage': 0,
        'start_date': None,
        'end_date': None,
        'data_quality': quality_report if quality_report is not None else {},
    }


def _parse_timestamps(df: pl.DataFrame) -> pl.DataFrame:
    """Parse the 'timestamp' column to pl.Datetime, trying several formats.

    BUG FIX: ``str.strptime(..., strict=False)`` never raises on unparseable
    rows — it yields nulls — so the original try/except fallback chain could
    never reach its ISO-format branch, silently dropping every row of an
    ISO-formatted file.  Instead, try each candidate format and keep the
    parse that recovers the most non-null timestamps.
    """
    if df.schema['timestamp'] == pl.Datetime:
        return df  # already parsed
    candidate_formats = [
        "%Y-%m-%d %H:%M:%S",       # space-separated (2019-01-01 06:00:00)
        "%Y-%m-%dT%H:%M:%S%.f",    # ISO (2019-01-01T06:00:00.000000)
    ]
    best = None
    best_valid = -1
    for fmt in candidate_formats:
        try:
            parsed = df.with_columns(
                pl.col('timestamp').str.strptime(pl.Datetime, fmt, strict=False).alias('timestamp')
            )
        except Exception:
            continue  # column is not a string type, or format rejected outright
        valid = parsed.select(pl.col('timestamp').is_not_null().sum()).item()
        if valid > best_valid:
            best, best_valid = parsed, valid
    if best is None or best_valid == 0:
        # Last resorts: polars' generic parser, then a raw cast.
        try:
            best = df.with_columns(
                pl.col('timestamp').str.to_datetime(strict=False).alias('timestamp')
            )
        except Exception:
            best = df.with_columns(
                pl.col('timestamp').cast(pl.Datetime, strict=False).alias('timestamp')
            )
    return best


def _check_numeric_quality(df: pl.DataFrame, quality_report: dict) -> pl.DataFrame:
    """Cast numeric columns to Float64; record null/infinite/type issues.

    Returns the dataframe with the casts applied (nulls where a value could
    not be converted).  Mutates ``quality_report`` in place.
    """
    for col in NUMERIC_COLUMNS:
        if col not in df.columns:
            continue
        try:
            df = df.with_columns(pl.col(col).cast(pl.Float64, strict=False).alias(col))
            null_count = df.select(pl.col(col).is_null().sum()).item()
            quality_report['null_counts'][col] = null_count
            # Only check infinities when there is at least one non-null value.
            if null_count < len(df):
                inf_count = df.select(pl.col(col).is_infinite().sum()).item()
                quality_report['infinite_counts'][col] = inf_count
        except Exception as e:
            quality_report['data_type_issues'][col] = str(e)
    return df


def _check_value_ranges(df: pl.DataFrame, quality_report: dict) -> None:
    """Record negative prices, zero volumes and inconsistent OHLC rows."""
    for col in PRICE_COLUMNS:
        if col in df.columns:
            try:
                negative_count = df.select((pl.col(col) < 0).sum()).item()
                if negative_count > 0:
                    quality_report['negative_prices'] += negative_count
            except Exception:
                pass  # column not comparable (cast failed earlier); already reported
    for col in VOLUME_COLUMNS:
        if col in df.columns:
            try:
                zero_count = df.select((pl.col(col) == 0).sum()).item()
                if zero_count > 0:
                    quality_report['zero_volumes'] += zero_count
            except Exception:
                pass
    # OHLC consistency: high must bound open/close/low from above,
    # low must bound open/close from below.  Counts total violations.
    if all(col in df.columns for col in ['open', 'high', 'low', 'close']):
        try:
            invalid_ohlc = df.select([
                (pl.col('high') < pl.col('low')).alias('high_lt_low'),
                (pl.col('high') < pl.col('open')).alias('high_lt_open'),
                (pl.col('high') < pl.col('close')).alias('high_lt_close'),
                (pl.col('low') > pl.col('open')).alias('low_gt_open'),
                (pl.col('low') > pl.col('close')).alias('low_gt_close'),
            ]).sum_horizontal().sum().item()
            quality_report['invalid_ohlc'] = invalid_ohlc
        except Exception:
            pass


def _to_pydatetime(value) -> datetime:
    """Coerce a polars/pandas/python datetime scalar to datetime.datetime."""
    if hasattr(value, 'to_py'):
        value = value.to_py()
    if not isinstance(value, datetime):
        value = pd.to_datetime(value).to_pydatetime()
    return value


def analyze_data_quality(path: str, interval: str, freq: timedelta) -> dict:
    """Comprehensive analysis of a kline CSV for missing rows and data quality.

    Parameters:
        path: CSV file to analyze.
        interval: interval label (e.g. '1m'), echoed into the result.
        freq: expected spacing between consecutive rows.

    Returns a dict with keys: file, interval, error, missing_count,
    total_expected, actual_count, missing_percentage, start_date, end_date,
    data_quality (a nested report of null/inf/OHLC/timestamp issues).
    Never raises — any unexpected failure is reported via the 'error' key.
    """
    try:
        df = pl.read_csv(path)
        if 'timestamp' not in df.columns:
            return _error_result(path, interval, 'Missing timestamp column', len(df))

        quality_report = {
            'initial_rows': len(df),
            'missing_columns': [col for col in EXPECTED_COLUMNS if col not in df.columns],
            'null_counts': {},
            'infinite_counts': {},
            'negative_prices': 0,
            'zero_volumes': 0,
            'invalid_ohlc': 0,  # high < low, close outside [low, high], etc.
            'duplicate_timestamps': 0,
            'unparseable_timestamps': 0,
            'data_type_issues': {},
        }

        # Parse timestamps and drop rows where no format could parse them.
        original_ts_count = len(df)
        df = _parse_timestamps(df)
        df = df.filter(pl.col('timestamp').is_not_null())
        quality_report['unparseable_timestamps'] = original_ts_count - len(df)
        if len(df) == 0:
            return _error_result(path, interval, 'No valid timestamps found', 0, quality_report)

        # Deduplicate timestamps, then sort chronologically.
        before_dedup = len(df)
        df = df.unique(subset=['timestamp']).sort('timestamp')
        quality_report['duplicate_timestamps'] = before_dedup - len(df)

        df = _check_numeric_quality(df, quality_report)
        _check_value_ranges(df, quality_report)

        if len(df) < 2:
            return _error_result(path, interval, 'Insufficient data for timeline analysis',
                                 len(df), quality_report)

        start_dt = _to_pydatetime(df.select(pl.col('timestamp').min()).item())
        end_dt = _to_pydatetime(df.select(pl.col('timestamp').max()).item())

        # pd.date_range accepts a timedelta as freq directly, so any interval
        # works here — the original hard-coded timedelta->string map (which
        # raised for unlisted frequencies) is unnecessary.
        expected_count = len(pd.date_range(start=start_dt, end=end_dt, freq=freq))
        actual_count = len(df)
        missing_count = expected_count - actual_count

        return {
            'file': os.path.basename(path),
            'interval': interval,
            'missing_count': missing_count,
            'total_expected': expected_count,
            'actual_count': actual_count,
            'missing_percentage': round((missing_count / expected_count) * 100, 2) if expected_count > 0 else 0,
            'start_date': start_dt.strftime('%Y-%m-%d %H:%M:%S'),
            'end_date': end_dt.strftime('%Y-%m-%d %H:%M:%S'),
            'error': None,
            'data_quality': quality_report,
        }
    except Exception as e:
        # Narrowed from bare `except:` so Ctrl-C / SystemExit still propagate.
        return _error_result(path, interval, str(e), 0)
def format_data_quality_summary(quality_report: dict) -> str:
    """Render a quality report as a short one-line summary.

    Returns "" for an empty report, "Clean" when no issues were recorded,
    otherwise a " | "-separated list of issue descriptions in a fixed order.
    """
    if not quality_report:
        return ""

    issues = []

    missing_cols = quality_report.get('missing_columns')
    if missing_cols:
        issues.append(f"Missing cols: {', '.join(missing_cols)}")

    # Scalar counters rendered as "Label: count" when non-zero.
    for key, label in (
        ('unparseable_timestamps', 'Bad timestamps'),
        ('duplicate_timestamps', 'Duplicate timestamps'),
    ):
        count = quality_report.get(key, 0)
        if count > 0:
            issues.append(f"{label}: {count}")

    # Per-column counters: only columns with a non-zero count are listed.
    for key, label in (('null_counts', 'Nulls'), ('infinite_counts', 'Infinite')):
        flagged = {col: n for col, n in quality_report.get(key, {}).items() if n > 0}
        if flagged:
            detail = ', '.join(f"{col}:{n}" for col, n in flagged.items())
            issues.append(f"{label}: {detail}")

    for key, label in (
        ('negative_prices', 'Negative prices'),
        ('zero_volumes', 'Zero volumes'),
        ('invalid_ohlc', 'Invalid OHLC'),
    ):
        count = quality_report.get(key, 0)
        if count > 0:
            issues.append(f"{label}: {count}")

    type_issues = quality_report.get('data_type_issues')
    if type_issues:
        issues.append(f"Type issues: {len(type_issues)} cols")

    return " | ".join(issues) if issues else "Clean"
def main():
    """Scan every interval folder, analyze each CSV, and print a report.

    For each `*_historical_data.csv` under `datasets_dir/<interval>/`, runs
    analyze_data_quality and prints one status line per file, then a
    per-interval summary, then overall summaries of assets with 500+
    missing rows and files with other data-quality issues.

    BUG FIX: two f-strings in the original were split across physical
    lines (a SyntaxError); they are rejoined here with identical content.
    """
    all_results = []
    problem_assets = []        # files with > 500 missing rows
    data_quality_issues = []   # files with < 500 missing but other problems
    print("π Analyzing missing data and data quality across all intervals...\n")

    for interval, freq in INTERVALS.items():
        folder = os.path.join(datasets_dir, interval)
        if not os.path.isdir(folder):
            print(f"β οΈ Folder not found: {folder}")
            continue
        print(f"--- Analyzing interval: {interval} ---")
        interval_results = []
        for fname in sorted(os.listdir(folder)):
            if not fname.endswith('_historical_data.csv'):
                continue
            file_path = os.path.join(folder, fname)
            result = analyze_data_quality(file_path, interval, freq)
            all_results.append(result)
            interval_results.append(result)
            quality_summary = format_data_quality_summary(result.get('data_quality', {}))

            if result['missing_count'] > 500:
                # Strip the file-name suffixes to recover the asset symbol.
                asset_name = fname.replace('_historical_data.csv', '').replace(f'_{interval}', '')
                problem_assets.append({
                    'asset': asset_name,
                    'interval': interval,
                    'missing_count': result['missing_count'],
                    'missing_percentage': result['missing_percentage'],
                    'file': result['file'],
                    'quality_issues': quality_summary
                })
                print(
                    f"β {fname}: {result['missing_count']} missing ({result['missing_percentage']}%) | {quality_summary}")
            elif result['error']:
                print(f"π₯ {fname}: ERROR - {result['error']}")
            elif quality_summary != "Clean":
                data_quality_issues.append({
                    'file': fname,
                    'interval': interval,
                    'quality_issues': quality_summary
                })
                print(
                    f"β οΈ {fname}: {result['missing_count']} missing ({result['missing_percentage']}%) | {quality_summary}")
            else:
                print(f"β {fname}: {result['missing_count']} missing ({result['missing_percentage']}%) | Clean")

        # Per-interval summary line.
        total_files = len(interval_results)
        files_with_errors = sum(1 for r in interval_results if r['error'])
        files_with_missing_500plus = sum(1 for r in interval_results if r['missing_count'] > 500)
        files_with_quality_issues = sum(
            1 for r in interval_results
            if format_data_quality_summary(r.get('data_quality', {})) != "Clean")
        print(f" π Summary: {total_files} files, {files_with_errors} errors, "
              f"{files_with_missing_500plus} with 500+ missing, {files_with_quality_issues} with data quality issues\n")

    # Final summary: assets with heavy missing data, grouped by asset.
    print("=" * 100)
    print("π¨ ASSETS WITH MORE THAN 500 MISSING ENTRIES:")
    print("=" * 100)
    if not problem_assets:
        print("β No assets found with more than 500 missing entries!")
    else:
        asset_groups = {}
        for item in problem_assets:
            asset_groups.setdefault(item['asset'], []).append(item)
        for asset, issues in asset_groups.items():
            print(f"\nπ΄ {asset}:")
            for issue in sorted(issues, key=lambda x: x['interval']):
                print(
                    f" {issue['interval']}: {issue['missing_count']} missing ({issue['missing_percentage']}%) | {issue['quality_issues']}")

    # Files with acceptable missing counts but other quality problems.
    if data_quality_issues:
        print("\n" + "=" * 100)
        print("β οΈ DATA QUALITY ISSUES (files with <500 missing but other problems):")
        print("=" * 100)
        for issue in data_quality_issues[:20]:  # Show first 20
            print(f" {issue['file']} ({issue['interval']}): {issue['quality_issues']}")
        if len(data_quality_issues) > 20:
            print(f" ... and {len(data_quality_issues) - 20} more files with quality issues")

    # Overall stats.
    total_problem_files = len(problem_assets)
    unique_problem_assets = len(set(item['asset'] for item in problem_assets))
    total_quality_issues = len(data_quality_issues)
    print(f"\nπ OVERALL STATISTICS:")
    print(f" β’ Total files analyzed: {len(all_results)}")
    print(f" β’ Files with 500+ missing: {total_problem_files}")
    print(f" β’ Files with data quality issues: {total_quality_issues}")
    print(f" β’ Unique assets affected (missing): {unique_problem_assets}")
    if problem_assets:
        print(f" β’ Most problematic intervals: {', '.join(set(item['interval'] for item in problem_assets))}")


if __name__ == '__main__':
    main()