forked from lmarena/arena-hard-auto
-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathfactor_reliability_improved.py
More file actions
889 lines (740 loc) · 36.6 KB
/
factor_reliability_improved.py
File metadata and controls
889 lines (740 loc) · 36.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
#!/usr/bin/env python3
import numpy as np
import pandas as pd
from scipy.stats import pearsonr
from sklearn.preprocessing import scale
import os
from pathlib import Path
import argparse
from tqdm import tqdm
# Ensure compatibility with numpy 2.0+: the np.NAN alias was removed in
# NumPy 2.x, so restore it in case any downstream code still references it.
if not hasattr(np, 'NAN'):
    np.NAN = np.nan
def cronbachs_alpha(item_scores):
    """
    Calculate Cronbach's alpha for a set of item scores on a single factor.

    Vectorized masked-array implementation with handling of missing data and
    a mild downward correction when items are extremely intercorrelated.

    Parameters
    ----------
    item_scores : numpy.ndarray
        Matrix of scores, where rows are models and columns are
        questions/items. May contain NaN for missing judgments.

    Returns
    -------
    float
        Cronbach's alpha coefficient (capped at 0.95), or NaN when there is
        not enough valid data to compute it.
    """
    # Ensure item_scores is 2D (a 1-D input is a single item, rejected below).
    if len(item_scores.shape) == 1:
        item_scores = item_scores.reshape(-1, 1)

    # Alpha is undefined for fewer than 2 items.
    n_items = item_scores.shape[1]
    if n_items <= 1:
        print("Warning: Need at least 2 items to calculate Cronbach's alpha")
        return np.nan

    try:
        # Mask NaNs so variance/sum operations skip missing entries.
        scores = np.ma.masked_array(item_scores, np.isnan(item_scores))

        # Keep only rows (models) that answered at least half of the items.
        valid_counts = scores.count(axis=1)
        min_valid_items = max(2, n_items // 2)
        valid_rows = valid_counts >= min_valid_items
        if np.sum(valid_rows) < 3:  # need >= 3 rows for a meaningful variance
            print("Warning: Not enough valid data to calculate Cronbach's alpha")
            return np.nan
        scores = scores[valid_rows]

        # Z-score each item column. In theory this does not change alpha, but
        # it avoids numerical artifacts from scale differences between items.
        z_scores = np.zeros_like(scores)
        for j in range(scores.shape[1]):
            col = scores[:, j]
            if not col.mask.all():  # skip fully-missing columns
                valid_mask = ~col.mask
                col_valid = col[valid_mask].data
                if np.std(col_valid) > 0:  # only standardize if there's variance
                    z_scores[valid_mask, j] = (col_valid - np.mean(col_valid)) / np.std(col_valid)
                else:
                    z_scores[valid_mask, j] = 0  # constant column contributes nothing
        # Re-apply the missing-data mask to the standardized scores.
        z_scores = np.ma.masked_array(z_scores, scores.mask)

        # Classic alpha ingredients: per-item variances and total-score variance.
        item_vars = np.ma.var(z_scores, axis=0, ddof=1)
        total_scores = np.ma.sum(z_scores, axis=1)
        total_var = np.ma.var(total_scores, ddof=1)

        # Mild downward correction when the sum of item variances is tiny
        # relative to the total variance (very high item intercorrelations).
        # (The unused avg_item_var computation from the original was removed.)
        correction_factor = 1.0
        if total_var > 0:
            var_ratio = np.sum(item_vars) / total_var
            if var_ratio < 0.3:  # suggests very high item intercorrelations
                correction_factor = 0.95

        if total_var == 0:
            return np.nan
        alpha = (n_items / (n_items - 1)) * (1 - np.sum(item_vars) / total_var)
        # Cap at 0.95 to avoid reporting unrealistically high reliability.
        return min(alpha * correction_factor, 0.95)
    except Exception as e:
        # Fallback: Spearman-Brown prophecy from the mean pairwise correlation.
        try:
            print(f"Primary alpha calculation failed: {e}")
            print("Falling back to alternative Cronbach's alpha calculation")
            # Drop rows that are entirely NaN.
            mask = ~np.isnan(item_scores).all(axis=1)
            scores = item_scores[mask]
            valid_pairs = 0
            sum_corr = 0
            for i in range(n_items):
                for j in range(i + 1, n_items):
                    mask_ij = ~(np.isnan(scores[:, i]) | np.isnan(scores[:, j]))
                    if np.sum(mask_ij) >= 3:  # need at least 3 paired observations
                        x = scores[mask_ij, i]
                        y = scores[mask_ij, j]
                        corr = np.corrcoef(x, y)[0, 1]
                        if not np.isnan(corr):
                            sum_corr += corr
                            valid_pairs += 1
            if valid_pairs == 0:
                return np.nan
            avg_corr = sum_corr / valid_pairs
            avg_corr = min(avg_corr, 0.8)  # cap to avoid inflated estimates
            # Spearman-Brown formula: alpha = (n*avg_r) / (1 + (n-1)*avg_r)
            alpha = (n_items * avg_corr) / (1 + (n_items - 1) * avg_corr)
            return min(alpha, 0.95)  # cap at 0.95
        except Exception as nested_e:
            print(f"Error calculating Cronbach's alpha: {e} -> {nested_e}")
            return np.nan
def cross_loadings(factor_scores):
    """
    Analyze cross-loadings to assess discriminant validity.

    Parameters
    ----------
    factor_scores : dict
        Dictionary where keys are factor names and values are score matrices
        (rows = models, columns = items); 1-D arrays are treated as one item.

    Returns
    -------
    pandas.DataFrame
        Matrix of cross-loadings (correlations between factor averages;
        the diagonal is each factor's primary loading).
    dict
        Per-factor ratio of |primary loading| to the highest cross-loading.
    """
    factor_names = list(factor_scores.keys())
    n_factors = len(factor_names)
    if n_factors <= 1:
        print("Warning: Need at least 2 factors to calculate cross-loadings")
        return pd.DataFrame(), {}

    loadings_df = pd.DataFrame(index=factor_names, columns=factor_names)

    # Average score per model for each factor (NaN-aware), with tiny noise
    # added to avoid degenerate perfect correlations.
    noise_scale = 1e-5  # small enough to not meaningfully affect correlations
    factor_avgs = {}
    for factor in factor_names:
        # BUG FIX: the original seeded with `hash(factor)`, but Python string
        # hashing is randomized per process (PYTHONHASHSEED), so the "noise"
        # changed between runs. Derive the seed from character codes instead,
        # which is stable across processes.
        np.random.seed(42 + (sum(ord(c) for c in str(factor)) % 1000))
        if len(factor_scores[factor].shape) > 1:
            # Mean across items, ignoring NaNs; ensure float64 for the noise add.
            factor_avg = np.nanmean(factor_scores[factor], axis=1).astype(np.float64)
        else:
            factor_avg = factor_scores[factor].astype(np.float64)
        noise = np.random.normal(0, noise_scale, factor_avg.shape).astype(np.float64)
        factor_avgs[factor] = factor_avg + noise

    # Loadings = pairwise correlations between factor averages.
    for factor1 in factor_names:
        for factor2 in factor_names:
            try:
                # Use only rows where both factor averages are present.
                mask = ~np.isnan(factor_avgs[factor1]) & ~np.isnan(factor_avgs[factor2])
                if np.sum(mask) < 3:  # need >= 3 points for a meaningful correlation
                    loadings_df.loc[factor1, factor2] = np.nan
                    continue
                corr, _ = pearsonr(factor_avgs[factor1][mask], factor_avgs[factor2][mask])
                loadings_df.loc[factor1, factor2] = corr
            except Exception as e:
                print(f"Error calculating correlation between {factor1} and {factor2}: {e}")
                loadings_df.loc[factor1, factor2] = np.nan

    # Ratio of primary loading (diagonal) to the strongest cross-loading.
    factor_ratios = {}
    for factor in factor_names:
        primary_loading = loadings_df.loc[factor, factor]
        cross_loads = loadings_df.loc[factor].drop(factor).abs()
        if not cross_loads.empty and not cross_loads.isna().all():
            secondary_loading = cross_loads.max()
            if not np.isnan(primary_loading) and not np.isnan(secondary_loading) and secondary_loading > 0:
                factor_ratios[factor] = abs(primary_loading) / secondary_loading
            else:
                factor_ratios[factor] = np.nan
        else:
            factor_ratios[factor] = np.nan
    return loadings_df, factor_ratios
def _pairwise_item_corrs(mat_a, mat_b, pairs, std_label, err_label, debug=False):
    """Pearson correlations for the given (i, j) column pairs of mat_a/mat_b.

    Pairs with fewer than 3 jointly-valid observations, zero variance, or a
    NaN correlation are skipped. Returns a list of correlation coefficients.
    """
    corrs = []
    for i, j in pairs:
        # Rows where both columns have observations.
        mask = ~np.isnan(mat_a[:, i]) & ~np.isnan(mat_b[:, j])
        if np.sum(mask) < 3:  # need at least 3 valid observations
            continue
        x1 = mat_a[mask, i]
        x2 = mat_b[mask, j]
        if np.std(x1) == 0 or np.std(x2) == 0:
            if debug:
                print(f"Warning: Zero standard deviation in {std_label} items {i},{j}")
            continue
        try:
            corr = np.corrcoef(x1, x2)[0, 1]
            if not np.isnan(corr):
                corrs.append(corr)
        except Exception as e:
            if debug:
                print(f"Error in {err_label} correlation calculation: {e}")
    return corrs


def htmt_ratio(factor1_scores, factor2_scores, debug=False):
    """
    Calculate the Heterotrait-Monotrait (HTMT) ratio between two factors.

    Small, reproducible random noise is added to the scores so that perfectly
    correlated items do not produce degenerate correlations.

    Parameters
    ----------
    factor1_scores : numpy.ndarray
        Matrix of scores for factor 1 (rows = models, columns = items).
    factor2_scores : numpy.ndarray
        Matrix of scores for factor 2.
    debug : bool, optional
        Whether to print debug information.

    Returns
    -------
    float
        HTMT ratio (lower is better; < 0.85 indicates good discriminant
        validity), capped at 2.0, or NaN when it cannot be computed.
    """
    # Ensure 2D arrays (1-D input becomes a single item).
    if len(factor1_scores.shape) == 1:
        factor1_scores = factor1_scores.reshape(-1, 1)
    if len(factor2_scores.shape) == 1:
        factor2_scores = factor2_scores.reshape(-1, 1)

    # HTMT needs within-factor correlations, hence >= 2 items per factor.
    if factor1_scores.shape[1] <= 1 or factor2_scores.shape[1] <= 1:
        if debug:
            print("Warning: Need at least 2 items per factor to calculate HTMT ratio")
        return np.nan
    try:
        n_items1 = factor1_scores.shape[1]
        n_items2 = factor2_scores.shape[1]

        # Add tiny reproducible noise to avoid division by zero from perfect
        # correlations. (The unused n_models computation was removed.)
        np.random.seed(42)  # for reproducibility
        noise_scale = 1e-5  # small enough to not meaningfully affect correlations
        factor1_with_noise = factor1_scores.copy().astype(np.float64)
        noise1 = np.random.normal(0, noise_scale, factor1_scores.shape).astype(np.float64)
        mask1 = ~np.isnan(factor1_scores)
        factor1_with_noise[mask1] += noise1[mask1]
        factor2_with_noise = factor2_scores.copy().astype(np.float64)
        noise2 = np.random.normal(0, noise_scale, factor2_scores.shape).astype(np.float64)
        mask2 = ~np.isnan(factor2_scores)
        factor2_with_noise[mask2] += noise2[mask2]

        # Monotrait (within-factor) and heterotrait (between-factor) item
        # correlations; the three loops of the original are now one helper.
        within1_pairs = [(i, j) for i in range(n_items1) for j in range(i + 1, n_items1)]
        within2_pairs = [(i, j) for i in range(n_items2) for j in range(i + 1, n_items2)]
        between_pairs = [(i, j) for i in range(n_items1) for j in range(n_items2)]
        within1_corrs = _pairwise_item_corrs(factor1_with_noise, factor1_with_noise,
                                             within1_pairs, "factor1", "within1", debug)
        within2_corrs = _pairwise_item_corrs(factor2_with_noise, factor2_with_noise,
                                             within2_pairs, "factor2", "within2", debug)
        between_corrs = _pairwise_item_corrs(factor1_with_noise, factor2_with_noise,
                                             between_pairs, "between factor", "between", debug)

        if not within1_corrs or not within2_corrs or not between_corrs:
            if debug:
                print(f"Insufficient correlations to calculate HTMT ratio: within1={len(within1_corrs)}, within2={len(within2_corrs)}, between={len(between_corrs)}")
            return np.nan

        within1_mean = np.mean(within1_corrs)
        within2_mean = np.mean(within2_corrs)
        between_mean = np.mean(between_corrs)

        # The denominator takes a square root, so within-construct means
        # must be strictly positive.
        if within1_mean <= 0 or within2_mean <= 0:
            if debug:
                print(f"Non-positive within-construct correlations: within1={within1_mean}, within2={within2_mean}")
            return np.nan

        htmt = between_mean / np.sqrt(within1_mean * within2_mean)
        # Cap pathological values.
        if htmt > 2.0:
            if debug:
                print(f"Warning: Extreme HTMT ratio ({htmt:.4f}) - capping at 2.0")
            htmt = 2.0
        return htmt
    except Exception as e:
        if debug:
            print(f"Error calculating HTMT ratio: {e}")
        return np.nan
def calculate_factor_reliability(factor_scores, debug=False):
    """
    Calculate reliability and discriminant validity metrics for each factor.

    Parameters
    ----------
    factor_scores : dict
        Dictionary where keys are factor names and values are score matrices
        (rows = models, columns = items).
    debug : bool, optional
        Whether to print debug information.

    Returns
    -------
    pandas.DataFrame
        One row per factor with columns: cronbachs_alpha,
        cross_loading_ratio, avg_htmt_ratio, reliability_score.
    """
    from tqdm.auto import tqdm

    factor_names = list(factor_scores.keys())
    n_factors = len(factor_names)
    results = pd.DataFrame(index=factor_names,
                           columns=["cronbachs_alpha", "cross_loading_ratio",
                                    "avg_htmt_ratio", "reliability_score"])

    # --- Internal consistency (Cronbach's alpha) per factor ---
    print("Calculating Cronbach's alpha for each factor...")
    for factor in tqdm(factor_names, desc="Cronbach's Alpha"):
        results.loc[factor, "cronbachs_alpha"] = cronbachs_alpha(factor_scores[factor])

    # --- Discriminant validity via cross-loadings ---
    if n_factors > 1:
        loadings_df, factor_ratios = cross_loadings(factor_scores)
        for factor in factor_names:
            results.loc[factor, "cross_loading_ratio"] = factor_ratios.get(factor, np.nan)
    else:
        for factor in factor_names:
            results.loc[factor, "cross_loading_ratio"] = np.nan

    # --- Discriminant validity via pairwise HTMT ratios ---
    if n_factors > 1:
        print("Calculating HTMT ratios between factors...")
        all_htmt_ratios = {}
        htmt_failures = {}  # (factor1, factor2) -> failure reason
        total_pairs = n_factors * (n_factors - 1) // 2
        with tqdm(total=total_pairs, desc="HTMT Ratios") as progress_bar:
            for i, factor1 in enumerate(factor_names):
                for factor2 in factor_names[i+1:]:
                    # BUG FIX: define the key BEFORE the try block. Previously
                    # `key` was assigned after the htmt_ratio call inside try,
                    # so the except handler referenced an unbound (first
                    # iteration) or stale key when htmt_ratio raised.
                    key = (factor1, factor2)
                    try:
                        ratio = htmt_ratio(factor_scores[factor1], factor_scores[factor2], debug=debug)
                        all_htmt_ratios[key] = ratio
                        if np.isnan(ratio):
                            htmt_failures[key] = "NaN result"
                    except Exception as e:
                        if debug:
                            print(f"Error calculating HTMT ratio for {factor1} vs {factor2}: {e}")
                        htmt_failures[key] = str(e)
                    progress_bar.update(1)
        if debug and htmt_failures:
            print(f"HTMT ratio calculation failed for {len(htmt_failures)} factor pairs:")
            for (f1, f2), reason in htmt_failures.items():
                print(f" {f1} vs {f2}: {reason}")

        # Average each factor's HTMT ratio over all other factors. Each
        # unordered pair is stored once, so look in both key orders.
        for i, factor in enumerate(factor_names):
            htmt_ratios = []
            missing_pairs = []
            # Pairs where this factor is listed first.
            for other_factor in factor_names[i+1:]:
                key = (factor, other_factor)
                if key in all_htmt_ratios and not np.isnan(all_htmt_ratios[key]):
                    htmt_ratios.append(all_htmt_ratios[key])
                else:
                    missing_pairs.append(other_factor)
            # Pairs where this factor is listed second.
            for other_factor in factor_names[:i]:
                key = (other_factor, factor)
                if key in all_htmt_ratios and not np.isnan(all_htmt_ratios[key]):
                    htmt_ratios.append(all_htmt_ratios[key])
                else:
                    missing_pairs.append(other_factor)
            if missing_pairs and debug:
                print(f"Factor '{factor}' is missing HTMT comparisons with: {', '.join(missing_pairs)}")
            if htmt_ratios:
                results.loc[factor, "avg_htmt_ratio"] = np.mean(htmt_ratios)
            else:
                results.loc[factor, "avg_htmt_ratio"] = np.nan
    else:
        for factor in factor_names:
            results.loc[factor, "avg_htmt_ratio"] = np.nan

    # --- Combined reliability score ---
    # Higher alpha is better, higher cross-loading ratio is better, lower
    # HTMT is better; each metric is mapped into [0, 1] and then averaged.
    for factor in factor_names:
        alpha = results.loc[factor, "cronbachs_alpha"]
        cross_ratio = results.loc[factor, "cross_loading_ratio"]
        htmt_ratio_val = results.loc[factor, "avg_htmt_ratio"]
        valid_metrics = []
        if not np.isnan(alpha):
            # Most alphas fall in 0.5-0.95; rescale that range onto [0, 1].
            valid_metrics.append(min(max((alpha - 0.5) / 0.45, 0), 1))
        if not np.isnan(cross_ratio):
            valid_metrics.append(min(cross_ratio / 2, 1))  # cap at 1.0
        if not np.isnan(htmt_ratio_val):
            # Invert so lower HTMT scores higher (0.85 is a common threshold).
            valid_metrics.append(min(max(1 - htmt_ratio_val, 0), 1))
        # NOTE: the original "fallback cross-loading" branch here was
        # unreachable — it required cross_ratio to be simultaneously NaN
        # (to get len(valid_metrics) == 1) and non-NaN (its own guard) —
        # so it has been removed without behavior change.
        if valid_metrics:
            results.loc[factor, "reliability_score"] = np.mean(valid_metrics)
        else:
            results.loc[factor, "reliability_score"] = np.nan
    return results
def bootstrap_factor_reliability(factor_scores, n_bootstrap=1000, debug=False):
    """
    Use bootstrap resampling of models to estimate confidence intervals for
    the factor reliability metrics.

    Parameters
    ----------
    factor_scores : dict
        Dictionary where keys are factor names and values are score matrices
        (rows = models).
    n_bootstrap : int
        Number of bootstrap samples.
    debug : bool, optional
        Accepted for interface compatibility; the inner reliability
        calculation is always run silently.

    Returns
    -------
    dict
        factor -> metric -> {"mean", "median", "lower", "upper"}, where
        lower/upper are the 2.5th/97.5th percentiles (95% CI).
    """
    # Hoisted out of the loop: the original re-imported sys/StringIO and
    # manually swapped sys.stdout on every iteration.
    import contextlib
    import io
    from tqdm.auto import tqdm

    factor_names = list(factor_scores.keys())
    n_models = next(iter(factor_scores.values())).shape[0]

    metric_names = ["reliability_scores", "cronbachs_alpha",
                    "cross_loading_ratio", "avg_htmt_ratio"]
    bootstrap_results = {
        factor: {metric: [] for metric in metric_names} for factor in factor_names
    }

    print(f"Performing {n_bootstrap} bootstrap iterations to compute confidence intervals...")
    for _ in tqdm(range(n_bootstrap), desc="Bootstrap Progress"):
        # Resample models (rows) with replacement.
        indices = np.random.choice(n_models, size=n_models, replace=True)
        bootstrap_scores = {
            factor: scores[indices] for factor, scores in factor_scores.items()
        }
        # Suppress prints from the inner reliability calculation; this is the
        # idiomatic equivalent of the original manual stdout swap. (tqdm bars
        # write to stderr, so — as before — they are not affected.)
        with contextlib.redirect_stdout(io.StringIO()):
            sample_results = calculate_factor_reliability(bootstrap_scores, debug=False)
        # Accumulate each metric for this bootstrap sample.
        for factor in factor_names:
            bootstrap_results[factor]["reliability_scores"].append(
                sample_results.loc[factor, "reliability_score"])
            bootstrap_results[factor]["cronbachs_alpha"].append(
                sample_results.loc[factor, "cronbachs_alpha"])
            bootstrap_results[factor]["cross_loading_ratio"].append(
                sample_results.loc[factor, "cross_loading_ratio"])
            bootstrap_results[factor]["avg_htmt_ratio"].append(
                sample_results.loc[factor, "avg_htmt_ratio"])

    # Percentile confidence intervals over the non-NaN bootstrap draws.
    confidence_intervals = {}
    for factor in factor_names:
        confidence_intervals[factor] = {}
        for metric in metric_names:
            values = [v for v in bootstrap_results[factor][metric] if not np.isnan(v)]
            if values:
                confidence_intervals[factor][metric] = {
                    "mean": np.mean(values),
                    "median": np.median(values),
                    "lower": np.percentile(values, 2.5),
                    "upper": np.percentile(values, 97.5)
                }
            else:
                confidence_intervals[factor][metric] = {
                    "mean": np.nan,
                    "median": np.nan,
                    "lower": np.nan,
                    "upper": np.nan
                }
    return confidence_intervals
def load_factor_scores_from_jsonl(directory_path):
    """
    Load factor scores from JSONL files in the specified directory.

    Each ``<model>.jsonl`` file contributes one matrix row per factor; every
    line with a ``question_id`` and a ``games`` list is scanned for keys
    containing ``_score``. Later games for the same question overwrite
    earlier ones.

    Parameters
    ----------
    directory_path : str
        Path to directory containing JSONL files.

    Returns
    -------
    dict or None
        Factor name -> matrix (rows = models sorted by name, columns =
        questions sorted by id, NaN where missing), or None when no JSONL
        files are found.
    """
    import json
    import glob

    # Mapping for letter verdicts like "A>B". Hoisted out of the per-value
    # parsing loop so the dict is built once, not once per score.
    score_mapping = {
        '': 3,
        'A>>B': 1,
        'A>B': 2,
        'A=B': 3,
        'B>A': 4,
        'B>>A': 5,
        'A<<B': 5,
        'A<B': 4,
        'B=A': 3,
        'B<A': 2,
        'B<<A': 1
    }

    # NOTE(review): the "_ct.txt" exclusion is a no-op — the glob already
    # requires a ".jsonl" suffix, so nothing can end with "_ct.txt". Possibly
    # "_ct.jsonl" was intended; kept as-is to preserve behavior.
    jsonl_files = [f for f in glob.glob(os.path.join(directory_path, "*.jsonl"))
                   if not f.endswith("_ct.txt")]
    if not jsonl_files:
        print(f"No JSONL files found in {directory_path}")
        return None
    print(f"Found {len(jsonl_files)} JSONL files")

    # Pass 1: discover all models, question ids, and factor keys so that the
    # score matrices can be fully initialized before filling.
    all_model_names = set()
    all_question_ids = set()
    all_factor_keys = set()
    question_counts = {}
    for jsonl_file in jsonl_files:
        model_name = os.path.basename(jsonl_file).replace(".jsonl", "")
        all_model_names.add(model_name)
        try:
            with open(jsonl_file, 'r', encoding='utf-8') as f:
                model_data = [json.loads(line) for line in f]
            file_question_ids = []
            for item in model_data:
                if "question_id" in item:
                    qid = item["question_id"]
                    all_question_ids.add(qid)
                    file_question_ids.append(qid)
                # Any game key containing "_score" defines a factor.
                if "games" in item:
                    for game in item["games"]:
                        for key in game:
                            if "_score" in key:
                                all_factor_keys.add(key)
            question_counts[model_name] = len(file_question_ids)
        except Exception as e:
            print(f"Error reading {jsonl_file}: {e}")
    print(f"Found {len(all_model_names)} models, {len(all_question_ids)} questions, and {len(all_factor_keys)} factor keys")

    # Sort for a consistent row/column ordering across runs.
    all_question_ids = sorted(list(all_question_ids))
    all_model_names = sorted(list(all_model_names))
    question_id_to_idx = {qid: i for i, qid in enumerate(all_question_ids)}

    # Initialize every (factor, model) row with NaN for all questions.
    factor_data = {}
    for factor_key in all_factor_keys:
        factor_data[factor_key] = {}
        for model_name in all_model_names:
            factor_data[factor_key][model_name] = [np.nan] * len(all_question_ids)

    # Pass 2: fill in the observed scores.
    for jsonl_file in jsonl_files:
        model_name = os.path.basename(jsonl_file).replace(".jsonl", "")
        try:
            with open(jsonl_file, 'r', encoding='utf-8') as f:
                model_data = [json.loads(line) for line in f]
            for item in model_data:
                if "question_id" not in item or "games" not in item:
                    continue
                question_idx = question_id_to_idx.get(item["question_id"])
                if question_idx is None:
                    continue  # skip if question ID not in our master list
                for game in item["games"]:
                    for key, value in game.items():
                        if key in all_factor_keys:
                            if isinstance(value, str):
                                # Unknown verdict strings default to a tie (3).
                                numeric_value = score_mapping.get(value, 3)
                            else:
                                numeric_value = value
                            factor_data[key][model_name][question_idx] = float(numeric_value)
        except Exception as e:
            print(f"Error processing {jsonl_file}: {e}")
            continue

    # Convert to matrices (rows = models, columns = questions) and report
    # per-factor missing-data rates.
    result = {}
    for factor, model_scores in factor_data.items():
        matrix = np.array([model_scores[model] for model in all_model_names], dtype=np.float64)
        result[factor] = matrix
        nan_count = np.isnan(matrix).sum()
        total_elements = matrix.size
        print(f"Factor {factor}: {nan_count}/{total_elements} NaN values ({100 * nan_count / total_elements:.1f}%)")
    return result
def main():
    """Command-line entry point.

    Loads judgment scores from a directory of JSONL files, computes
    per-factor reliability metrics (optionally with bootstrap confidence
    intervals), saves both as CSVs, and prints a human-readable summary.
    """
    parser = argparse.ArgumentParser(description='Calculate factor reliability metrics from model judgment data.')
    parser.add_argument('directory', type=str, help='Directory containing processed JSONL files with judgment scores')
    parser.add_argument('--output-dir', type=str, help='Directory to save output files (defaults to the input directory)')
    parser.add_argument('--n-bootstrap', type=int, default=1000, help='Number of bootstrap samples for confidence intervals')
    parser.add_argument('--skip-bootstrap', action='store_true', help='Skip bootstrap confidence interval calculation')
    parser.add_argument('--quick', action='store_true', help='Quick mode (fewer bootstrap samples)')
    parser.add_argument('--debug', action='store_true', help='Enable extra debug logging')
    args = parser.parse_args()
    # Announce debug mode (individual functions receive debug=args.debug below).
    if args.debug:
        print("Debug mode enabled - will print detailed diagnostics")
    # Output defaults to the input directory when --output-dir is not given.
    output_dir = args.output_dir if args.output_dir else args.directory
    os.makedirs(output_dir, exist_ok=True)
    # Load factor score matrices; None means no JSONL files were found.
    print(f"Loading factor scores from {args.directory}...")
    factor_scores = load_factor_scores_from_jsonl(args.directory)
    if factor_scores is None:
        print("Failed to load factor scores. Exiting.")
        return
    print(f"Loaded scores for {len(factor_scores)} factors:")
    for factor, scores in factor_scores.items():
        print(f" {factor}: {scores.shape[0]} models, {scores.shape[1]} questions")
    # Core reliability metrics (alpha, cross-loadings, HTMT, combined score).
    print("\nCalculating factor reliability metrics...")
    reliability_metrics = calculate_factor_reliability(factor_scores, debug=args.debug)
    # Persist the per-factor metrics table.
    metrics_path = os.path.join(output_dir, "factor_reliability_metrics.csv")
    reliability_metrics.to_csv(metrics_path)
    print(f"Saved reliability metrics to {metrics_path}")
    # Bootstrap confidence intervals (optional; None when skipped).
    confidence_intervals = None
    if args.skip_bootstrap:
        print("\nSkipping bootstrap confidence intervals as requested.")
    else:
        # Quick mode overrides --n-bootstrap with a small sample count.
        n_bootstrap = 100 if args.quick else args.n_bootstrap
        print(f"\nCalculating bootstrap confidence intervals with {n_bootstrap} samples...")
        confidence_intervals = bootstrap_factor_reliability(factor_scores, n_bootstrap, debug=args.debug)
        # Flatten the nested factor -> metric -> stats dict into CSV rows.
        ci_data = []
        for factor, metrics in confidence_intervals.items():
            for metric, values in metrics.items():
                ci_data.append({
                    "factor": factor,
                    "metric": metric,
                    "mean": values["mean"],
                    "median": values["median"],
                    "lower_ci": values["lower"],
                    "upper_ci": values["upper"]
                })
    # Save confidence intervals if they were calculated
    if not args.skip_bootstrap:
        ci_df = pd.DataFrame(ci_data)
        ci_path = os.path.join(output_dir, "factor_reliability_confidence_intervals.csv")
        ci_df.to_csv(ci_path, index=False)
        print(f"Saved confidence intervals to {ci_path}")
    # Summary: combined reliability score per factor, with CI when available.
    print("\nReliability Score Summary (higher is better):")
    for factor in reliability_metrics.index:
        reliability = reliability_metrics.loc[factor, "reliability_score"]
        if not np.isnan(reliability):
            if confidence_intervals and factor in confidence_intervals:
                ci = confidence_intervals[factor]["reliability_scores"]
                print(f" {factor}: {reliability:.3f} (95% CI: {ci['lower']:.3f}-{ci['upper']:.3f})")
            else:
                print(f" {factor}: {reliability:.3f}")
    # Full metrics table on request.
    if args.debug:
        print("\nDetailed Factor Metrics:")
        print(reliability_metrics)
    # Rule-of-thumb thresholds for interpreting the combined score.
    print("\nInterpretation Guide:")
    print(" Reliability Score > 0.8: Excellent - factor is consistent and distinct")
    print(" Reliability Score 0.6-0.8: Good - factor is generally reliable")
    print(" Reliability Score 0.4-0.6: Moderate - factor may need refinement")
    print(" Reliability Score < 0.4: Poor - factor may not be reliably measured")


if __name__ == "__main__":
    main()