diff --git a/spf/data_collector.py b/spf/data_collector.py
index 883ee96..c643222 100644
--- a/spf/data_collector.py
+++ b/spf/data_collector.py
@@ -19,7 +19,7 @@
 from spf.dataset.v4_data import v4rx_2xf64_keys, v4rx_f64_keys, v4rx_new_dataset
 from spf.dataset.v5_data import v5rx_2xf64_keys, v5rx_f64_keys, v5rx_new_dataset
 from spf.dataset.wall_array_v2_idxs import v2_column_names
-from spf.rf import beamformer_given_steering, get_avg_phase, precompute_steering_vectors
+from spf.rf import beamformer_given_steering, get_avg_phase, get_avg_phase_fast, get_avg_phase_fast2, precompute_steering_vectors
 from spf.scripts.zarr_utils import zarr_shrink
 from spf.sdrpluto.sdr_controller import (
     EmitterConfig,
@@ -266,7 +266,7 @@ def get_data(self):
         if sdr_rx is None:
             raise ValueError("SDR RX is None, aborting.")
         # process the data
-        signal_matrix = np.vstack(sdr_rx["signal_matrix"])
+        signal_matrix = np.vstack(sdr_rx["signal_matrix"], dtype=np.complex64)
         current_time = time.time() - self.time_offset  # timestamp
         return data_to_snapshot(
@@ -285,10 +285,10 @@ def get_data(self):
         sdr_rx = self.get_rx()

         # process the data
-        signal_matrix = np.vstack(sdr_rx["signal_matrix"])
+        signal_matrix = np.vstack(sdr_rx["signal_matrix"]).astype(np.complex64)
         current_time = time.time() - self.time_offset  # timestamp after sample arrives

-        avg_phase_diff = get_avg_phase(signal_matrix)
+        avg_phase_diff = get_avg_phase_fast2(signal_matrix)
         assert self.pplus.rx_config.rx_spacing > 0.001
         return self.snapshot_class(
             signal_matrix=signal_matrix,
@@ -541,12 +541,9 @@ def write_to_record_matrix(self, thread_idx, record_idx, data):
             data.gps_long = current_pos_heading_and_time["gps"][0]
             data.gps_lat = current_pos_heading_and_time["gps"][1]
             data.gps_timestamp = current_pos_heading_and_time["gps_time"]
-
         if self.realtime_v5inf is not None:
             data_dict=asdict(data)
-            print(data_dict['signal_matrix'].dtype,"XXA")
             data_dict['signal_matrix']=data_dict['signal_matrix'].reshape(1,1,*data_dict['signal_matrix'].shape).astype(np.complex64)
-            print(data_dict['signal_matrix'].dtype,"XXA")
             self.realtime_v5inf.write_to_idx(record_idx, thread_idx, data_dict)
         if self.data_filename is not None:
             z = self.zarr[f"receivers/r{thread_idx}"]
@@ -556,10 +553,11 @@ def write_to_record_matrix(self, thread_idx, record_idx, data):

     def close(self):
-        self.zarr.store.close()
-        self.zarr = None
-        logging.info(f"Trying to shrink... {self.data_filename}")
-        zarr_shrink(self.data_filename)
+        if self.data_filename is not None:
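+            # A zarr store is only opened when recording to disk;
+            # realtime-only runs have nothing to close or shrink.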
{self.data_filename}") + zarr_shrink(self.data_filename) # V5 data format diff --git a/spf/dataset/segmentation.py b/spf/dataset/segmentation.py index cf80abb..af248b2 100644 --- a/spf/dataset/segmentation.py +++ b/spf/dataset/segmentation.py @@ -24,6 +24,8 @@ pi_norm, reduce_theta_to_positive_y, windowed_trimmed_circular_mean_and_stddev, + windowed_trimmed_circular_mean_and_stddev_fast, + windowed_trimmed_circular_mean_and_stddev_fast2, ) from spf.scripts.zarr_utils import ( new_yarr_dataset, @@ -375,6 +377,7 @@ def segment_session( skip_beamformer=False, skip_detrend=False, skip_segmentation=False, + fast_beamformer=False, **kwrgs, ): """ @@ -507,9 +510,13 @@ def segment_session( .T ) else: + + beamformer_fn=beamformer_given_steering_nomean + if fast_beamformer: + beamformer_fn=beamformer_given_steering_nomean_fast # CPU version of beamforming (same algorithm but slower) segmentation_results["windowed_beamformer"] = ( - beamformer_given_steering_nomean_fast( + beamformer_fn( steering_vectors=kwrgs["steering_vectors"], signal_matrix=v.astype(np.complex64), ) @@ -646,9 +653,13 @@ def get_all_windows_stats( # - Trimmed circular mean of phase differences # - Trimmed standard deviation of phase differences # - Median absolute signal amplitude - step_idxs, step_stats = windowed_trimmed_circular_mean_and_stddev( + step_idxs, step_stats = windowed_trimmed_circular_mean_and_stddev_fast( v, pd, window_size=window_size, stride=stride, trim=trim ) + # step_idxs2, step_stats2 = windowed_trimmed_circular_mean_and_stddev_fast2( + # v, pd, window_size=window_size, stride=stride, trim=trim + # ) + #breakpoint() return step_idxs, step_stats diff --git a/spf/dataset/spf_dataset.py b/spf/dataset/spf_dataset.py index 7bbb2f5..69419ec 100644 --- a/spf/dataset/spf_dataset.py +++ b/spf/dataset/spf_dataset.py @@ -754,6 +754,7 @@ def render_session(self, ridx, data): skip_beamformer=False, skip_detrend=self.skip_detrend, skip_segmentation=self.skip_segmentation, + fast_beamformer=True, **{ "steering_vectors": self.steering_vectors[ridx], **DEFAULT_SEGMENT_ARGS, diff --git a/spf/dataset/spf_nn_dataset_wrapper.py b/spf/dataset/spf_nn_dataset_wrapper.py index 7956c06..25bf5a4 100644 --- a/spf/dataset/spf_nn_dataset_wrapper.py +++ b/spf/dataset/spf_nn_dataset_wrapper.py @@ -94,7 +94,7 @@ def to_absolute_north(self, sample): sample[ridx]["paired"] = paired_nn_inference_rotated return sample - @lru_cache + @lru_cache(4) def get_inference_for_idx(self, idx): if not self.ds.realtime: return [ @@ -103,7 +103,7 @@ def get_inference_for_idx(self, idx): ] return self.get_and_annotate_entry_at_idx(idx) - @lru_cache + @lru_cache(4) def get_and_annotate_entry_at_idx(self, idx): sample = self.ds[idx] if not self.ds.realtime: @@ -135,7 +135,7 @@ def __next__(self): self.serving_idx += 1 return sample - @lru_cache + @lru_cache(4) def __getitem__(self, idx): return self.get_and_annotate_entry_at_idx(idx) diff --git a/spf/rf.py b/spf/rf.py index 9163136..f0d6c42 100644 --- a/spf/rf.py +++ b/spf/rf.py @@ -77,11 +77,59 @@ def torch_pi_norm_pi(x): def torch_pi_norm(x: torch.Tensor, max_angle: float = torch.pi): return ((x + max_angle) % (2 * max_angle)) - max_angle +#@njit +def fast_percentile(x, percentile): + n = x.shape[0] + if n == 0: + return 0.0 + rank = (percentile / 100.0) * (n - 1) + k = int(np.floor(rank)) # equivalent to method='lower' + k = max(0, min(k, n - 1)) + partitioned = np.partition(x, k) + return partitioned[k] + +@njit +def circular_stddev_fast(v, mean, trim=50.0): + n = v.shape[0] + if n <= 1: + return 0.0, 0.0 + 
+    # Compute distance from mean
+    diffs = circular_diff_to_mean_single_fast(v, mean)
+
+    # Precompute sums
+    total_sum_sq = 0.0
+    for i in range(n):
+        total_sum_sq += diffs[i] * diffs[i]
+
+    stddev = np.sqrt(total_sum_sq / (n - 1))
+
+    # Early exit if no trimming needed
+    if trim == 0.0:
+        return stddev, stddev
+
+    # Find threshold for trimming
+    threshold = fast_percentile_1d(diffs, 100.0 - trim)
+
+    trimmed_sum_sq = 0.0
+    trimmed_count = 0
+
+    for i in range(n):
+        if diffs[i] <= threshold:
+            trimmed_sum_sq += diffs[i] * diffs[i]
+            trimmed_count += 1
+
+    if trimmed_count <= 1:
+        trimmed_stddev = 0.0
+    else:
+        trimmed_stddev = np.sqrt(trimmed_sum_sq / (trimmed_count - 1))
+
+    return stddev, trimmed_stddev

 # returns circular_stddev and trimmed circular stddev
 @njit
 def circular_stddev(v, u, trim=50.0):
-    diff_from_mean = circular_diff_to_mean_single(angles=v, mean=u)
+    diff_from_mean = circular_diff_to_mean_single_fast(angles=v, mean=u)

     diff_from_mean_squared = diff_from_mean**2
@@ -179,6 +227,19 @@ def circular_diff_to_mean_single(angles, mean: float):
     return b

+@njit
+def circular_diff_to_mean_single_fast(angles, mean: float):
+    n = angles.shape[0]
+    out = np.empty(n, dtype=np.float32)
+    two_pi = 2.0 * np.pi
+
+    for i in range(n):
+        diff = np.abs(mean - angles[i]) % two_pi
+        out[i] = diff if diff < (two_pi - diff) else (two_pi - diff)
+
+    return out
+
+
 # @njit
 def circular_diff_to_mean(angles, means):
     assert means.ndim == 1
     b = 2 * np.pi - a
     return np.min(np.vstack([a[None], b[None]]), axis=0)

+@njit
+def circular_mean_simple_fast(angles):
+    n_rows, n_cols = angles.shape
+    result = np.empty(n_rows, dtype=np.float32)
+
+    for i in range(n_rows):
+        sx = 0.0
+        sy = 0.0
+        for j in range(n_cols):
+            angle = angles[i, j]
+            sx += np.cos(angle)
+            sy += np.sin(angle)
+        mean_angle = np.arctan2(sy, sx)
+        result[i] = pi_norm(mean_angle)
+
+    return result, result

 # TODO remove
 def circular_mean(angles, trim, weights=None):
@@ -213,6 +290,61 @@
     return pi_norm(cm), pi_norm(_cm)

+@njit
+def circular_mean_single_fast(angles, trim, weights=None):
+    n = angles.shape[0]
+    two_pi = 2.0 * np.pi
+
+    # Early exit for empty
+    if n == 0:
+        return 0.0, 0.0
+
+    # Precompute sin and cos
+    _sin_angles = np.sin(angles)
+    _cos_angles = np.cos(angles)
+
+    if weights is not None:
+        _sin_angles = _sin_angles * weights
+        _cos_angles = _cos_angles * weights
+
+    # Compute untrimmed circular mean
+    sum_sin = _sin_angles.sum()
+    sum_cos = _cos_angles.sum()
+    cm = np.arctan2(sum_sin, sum_cos) % two_pi
+
+    if trim == 0.0:
+        return pi_norm(cm), pi_norm(cm)
+
+    # Compute deviations
+    dists = circular_diff_to_mean_single_fast(angles, cm)
+
+    # Fast percentile instead of full sort
+    threshold = fast_percentile_1d(dists, 100.0 - trim)
+
+    # Compute trimmed sin and cos sums
+    trimmed_sum_sin = 0.0
+    trimmed_sum_cos = 0.0
+
+    for i in range(n):
+        if dists[i] <= threshold:
+            trimmed_sum_sin += _sin_angles[i]
+            trimmed_sum_cos += _cos_angles[i]
+
+    trimmed_cm = np.arctan2(trimmed_sum_sin, trimmed_sum_cos) % two_pi
+
+    return pi_norm(cm), pi_norm(trimmed_cm)
+
+
+# Helper used by the fast trimmed statistics above:
+@njit
+def fast_percentile_1d(x, percentile):
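+    # Nearest-rank percentile: k = ceil(p/100 * n) - 1, selected with an
+    # O(n) np.partition instead of a full sort.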
+    n = x.shape[0]
+    if n == 0:
+        return 0.0
+    k = int(np.ceil((percentile / 100.0) * n)) - 1
+    k = max(0, min(k, n - 1))
+    partitioned = np.partition(x, k)
+    return partitioned[k]

 @njit
 def circular_mean_single(angles, trim, weights=None):
@@ -228,7 +360,7 @@ def circular_mean_single(angles, trim, weights=None):
         r = pi_norm(cm)
         return r, r

-    dists = circular_diff_to_mean_single(angles=angles, mean=cm)
+    dists = circular_diff_to_mean_single_fast(angles=angles, mean=cm)
     mask = dists <= np.percentile(dists, 100.0 - trim)

     _cm = np.arctan2(_sin_angles[mask].sum(), _cos_angles[mask].sum()) % (2 * np.pi)
@@ -326,12 +458,29 @@ def torch_get_stats_for_signal(v: torch.Tensor, pd: torch.Tensor, trim: float):
     )
     return torch.hstack((trimmed_cm, trimmed_stddev, abs_signal_median))

+@njit
+def fast_median_abs(x):
+    n = x.shape[0]
+    if n == 0:
+        return 0.0
+    abs_x = np.empty(n, dtype=np.float32)
+    for i in range(n):
+        abs_x[i] = abs(x[i])
+
+    mid = n // 2
+    partitioned = np.partition(abs_x, mid)
+
+    if n % 2 == 1:
+        return partitioned[mid]
+    else:
+        return 0.5 * (np.partition(abs_x, mid - 1)[mid - 1] + partitioned[mid])

 @njit
 def get_stats_for_signal(v, pd, trim):
-    trimmed_cm = circular_mean_single(pd, trim=trim)[1]  # get the value for trimmed
-    trimmed_stddev = circular_stddev(pd, trimmed_cm, trim=trim)[1]
-    abs_signal_median = np.median(np.abs(v)) if v.size > 0 else 0
+    trimmed_cm = circular_mean_single_fast(pd, trim=trim)[1]  # get the value for trimmed
+    trimmed_stddev = circular_stddev_fast(pd, trimmed_cm, trim=trim)[1]
+    abs_signal_median = fast_median_abs(v.flatten()) if v.size > 0 else 0.0

     return trimmed_cm, trimmed_stddev, abs_signal_median
@@ -361,7 +510,170 @@ def torch_windowed_trimmed_circular_mean_and_stddev(
             _v, _pd, trim
         )  # trimmed_cm, trimmed_stddev, abs_signal_median
     return step_idxs, step_stats

+@njit
+def fast_abs_median(x):
+    n = x.shape[0]
+    if n == 0:
+        return 0.0
+
+    abs_x = np.empty(n, dtype=np.float32)
+    for i in range(n):
+        abs_x[i] = abs(x[i])
+    mid = n // 2
+    partitioned = np.partition(abs_x, mid)
+
+    if n % 2 == 1:
+        return partitioned[mid]
+    else:
+        # For even n, average the two middle values
+        return 0.5 * (np.partition(abs_x, mid - 1)[mid - 1] + partitioned[mid])
+
+
+@njit
+def circular_diff_to_mean_single_scalar(angle, mean):
+    two_pi = 2.0 * np.pi
+    diff = (mean - angle) % two_pi
+    if diff > np.pi:
+        diff -= two_pi
+    return abs(diff)
+
+
+# @njit(parallel=True)
+def windowed_trimmed_circular_mean_and_stddev_fast2(v, pd, window_size, stride, trim):
+    n_samples = pd.shape[0]
+    # No windows if input too short or invalid stride
+    if n_samples < window_size or stride <= 0:
+        return np.empty(0, np.int64), np.empty((0, 4), np.float64)
+    n_windows = (n_samples - window_size) // stride + 1
+
+    # Prepare output arrays
+    step_idxs = np.empty(n_windows, np.int64)
+    step_stats = np.empty((n_windows, 4), np.float64)
+
+    for j in prange(n_windows):
+        start = j * stride
+        end = start + window_size
+
+        # Compute initial circular mean of pd in this window
+        sx = 0.0
+        sy = 0.0
+        for k in range(start, end):
+            angle = pd[k]
+            sx += math.cos(angle)
+            sy += math.sin(angle)
+        mean_angle = math.atan2(sy, sx)

+        # Compute absolute circular differences to the initial mean
+        absdiffs = np.empty(window_size, np.float64)
+        idx = 0
+        for k in range(start, end):
+            absdiffs[idx] = circular_diff_to_mean_single_scalar(pd[k], mean_angle)
+            idx += 1
+
+        # Determine trimming threshold based on 'trim' parameter
+        if trim > 0:
+            if trim < 1:
+                # trim is a fraction (proportion to trim each tail)
+                keep_frac = 1.0 - 2.0 * trim
+            else:
+                # trim is an absolute count of elements to trim on each side
+                keep_frac = float(max(0, (window_size - 2 * int(trim)))) / window_size
+            if keep_frac <= 0:
+                threshold = 0.0
+            elif keep_frac >= 1:
+                threshold = np.max(absdiffs)
+            else:
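+                # keep_frac of the window survives trimming; the matching
+                # percentile of the absolute deviations becomes the cutoff.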
+                threshold = fast_percentile_1d(absdiffs, keep_frac * 100.0)
+        else:
+            # No trimming
+            threshold = np.inf
+
+        # Compute trimmed circular mean (include only values within threshold)
+        sx2 = 0.0
+        sy2 = 0.0
+        count2 = 0
+        for k in range(start, end):
+            diff = circular_diff_to_mean_single_scalar(pd[k], mean_angle)
+            if diff <= threshold:
+                angle = pd[k]
+                sx2 += math.cos(angle)
+                sy2 += math.sin(angle)
+                count2 += 1
+        circ_mean = math.atan2(sy2, sx2) if count2 > 0 else mean_angle
+
+        # Compute circular standard deviation of the trimmed values
+        if count2 > 0:
+            sum_sq = 0.0
+            for k in range(start, end):
+                diff2 = circular_diff_to_mean_single_scalar(pd[k], circ_mean)
+                if diff2 <= threshold:
+                    sum_sq += diff2 * diff2
+            circ_std = math.sqrt(sum_sq / count2)
+        else:
+            circ_std = 0.0
+
+        # Compute median of absolute values for each of the two signal channels
+        abs_vals0 = np.empty(window_size, np.float64)
+        abs_vals1 = np.empty(window_size, np.float64)
+        idx = 0
+        for k in range(start, end):
+            val0 = v[0, k]
+            val1 = v[1, k]
+            abs_vals0[idx] = val0 if val0 >= 0 else -val0
+            abs_vals1[idx] = val1 if val1 >= 0 else -val1
+            idx += 1
+        med0 = fast_abs_median(abs_vals0)
+        med1 = fast_abs_median(abs_vals1)
+
+        # Store results
+        step_idxs[j] = start + window_size // 2
+        step_stats[j, 0] = circ_mean
+        step_stats[j, 1] = circ_std
+        step_stats[j, 2] = med0
+        step_stats[j, 3] = med1
+
+    return step_idxs, step_stats
+
+
+@njit
+def windowed_trimmed_circular_mean_and_stddev_fast(v, pd, window_size, stride, trim=50.0):
+    n_samples = pd.shape[0]
+    assert (n_samples - window_size) % stride == 0
+    n_steps = 1 + (n_samples - window_size) // stride
+
+    step_stats = np.zeros((n_steps, 3), dtype=np.float32)
+
+    for step in prange(n_steps):
+        start_idx = step * stride
+        end_idx = start_idx + window_size
+
+        # Instead of slicing arrays, pass start/end indices
+        step_stats[step] = get_stats_for_signal_window(
+            v, pd, start_idx, end_idx, trim
+        )
+
+    # After loop: create step_idxs
+    step_idxs = np.empty((n_steps, 2), dtype=np.int32)
+    for step in range(n_steps):
+        start_idx = step * stride
+        end_idx = start_idx + window_size
+        step_idxs[step, 0] = start_idx
+        step_idxs[step, 1] = end_idx
+
+    return step_idxs, step_stats
+
+
+# Modified get_stats_for_signal
+@njit
+def get_stats_for_signal_window(v, pd, start_idx, end_idx, trim):
+    _pd = pd[start_idx:end_idx]
+    _v = v[:, start_idx:end_idx]
+
+    trimmed_cm = circular_mean_single_fast(_pd, trim=trim)[1]
+    trimmed_stddev = circular_stddev_fast(_pd, trimmed_cm, trim=trim)[1]
+    abs_signal_median = fast_median_abs(_v.flatten()) if _v.size > 0 else 0.0
+
+    return np.array([trimmed_cm, trimmed_stddev, abs_signal_median], dtype=np.float32)

 @njit
 def windowed_trimmed_circular_mean_and_stddev(v, pd, window_size, stride, trim=50.0):
@@ -427,6 +739,27 @@ def get_avg_phase(signal_matrix, trim=0.0):
     ).reshape(-1)

+# @njit
+def get_avg_phase_fast(signal_matrix):
+    return np.array(
+        circular_mean_simple_fast(get_phase_diff(signal_matrix=signal_matrix)[None])
+    ).reshape(-1)
+
+
+@njit
+def get_avg_phase_fast2(signal_matrix):
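+    # Single-pass njit circular mean over the per-sample phase differences.
+    # The mean is returned twice so the output shape matches get_avg_phase's
+    # (mean, trimmed_mean) pair.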
+    pd = get_phase_diff(signal_matrix)
+
+    sx = 0.0
+    sy = 0.0
+    for i in range(pd.shape[0]):
+        angle = pd[i]
+        sx += np.cos(angle)
+        sy += np.sin(angle)
+
+    mean_angle = pi_norm(np.arctan2(sy, sx))
+
+    return np.array([mean_angle, mean_angle], dtype=np.float32)

 # @torch.jit.script
 def torch_get_avg_phase_notrim(signal_matrix: torch.Tensor):
     return torch.hstack(
@@ -863,7 +1196,6 @@ def beamformer_given_steering_nomean_fast(steering_vectors, signal_matrix):
     beamformer_given_steering_nomean_fast_core(steering_vectors, signal_matrix, output)
     return output

-
 @njit(parallel=True)
 def beamformer_given_steering_nomean_fast_core(steering_vectors, signal_matrix, output):
     n_thetas, n_antennas = steering_vectors.shape
diff --git a/spf/utils.py b/spf/utils.py
index e926f24..9c9e1fd 100644
--- a/spf/utils.py
+++ b/spf/utils.py
@@ -11,7 +11,7 @@
 from deepdiff.diff import DeepDiff
 from torch.utils.data import BatchSampler, DistributedSampler

-SEGMENTATION_VERSION = 3.6
+SEGMENTATION_VERSION = 3.7

 warnings.simplefilter(action="ignore", category=FutureWarning)
diff --git a/tests/test_beamformer.py b/tests/test_beamformer.py
index e16fe34..65e6e52 100644
--- a/tests/test_beamformer.py
+++ b/tests/test_beamformer.py
@@ -7,8 +7,15 @@
     UCADetector,
     ULADetector,
     beamformer,
+    beamformer_given_steering_nomean,
+    beamformer_given_steering_nomean_fast,
     c,
+    circular_diff_to_mean_single,
+    circular_diff_to_mean_single_fast,
     circular_mean,
+    circular_mean_simple_fast,
+    fast_median_abs,
+    fast_percentile,
     rotation_matrix,
     torch_circular_mean,
 )
@@ -93,135 +100,3 @@ def test_beamformer():
     # plt.axhline(y=beam_former_outputs_at_t.max()-potential_error)
     # plt.show()
-
-
-def test_simple_segment():
-    rng = np.random.default_rng(12345)
-    signal_matrix = random_signal_matrix(1200, rng=rng).reshape(2, 600)
-
-    ground_truth_windows = [
-        {"start_idx": 0, "end_idx": 100, "mean": 0.5},
-        {"start_idx": 300, "end_idx": 400, "mean": 1.3},
-        {"start_idx": 500, "end_idx": 600, "mean": 1.25},
-    ]
-
-    for window in ground_truth_windows:
-        signal_matrix[0, window["start_idx"] : window["end_idx"]] *= 10
-        signal_matrix[1, window["start_idx"] : window["end_idx"]] = signal_matrix[
-            0, window["start_idx"] : window["end_idx"]
-        ] * np.exp(-1j * window["mean"])
-
-    mean_diff_threshold = 0.05
-    segmented_windows = simple_segment(
-        signal_matrix,
-        window_size=100,
-        stride=50,
-        trim=10,
-        # mean_diff_threshold=mean_diff_threshold,
-        max_stddev_threshold=0.1,
-        drop_less_than_size=0,
-        min_abs_signal=10,
-    )["simple_segmentation"]
-
-    assert segmented_windows[0]["start_idx"] == ground_truth_windows[0]["start_idx"]
-    assert segmented_windows[0]["end_idx"] == ground_truth_windows[0]["end_idx"]
-    assert np.isclose(
-        segmented_windows[0]["mean"],
-        ground_truth_windows[0]["mean"],
-        mean_diff_threshold,
-    )
-
-    assert segmented_windows[1]["start_idx"] == ground_truth_windows[1]["start_idx"]
-    # assert segmented_windows[1]["end_idx"] == ground_truth_windows[2]["end_idx"]
-    assert np.isclose(
-        segmented_windows[1]["mean"],
-        ground_truth_windows[1]["mean"],
-        mean_diff_threshold,
-    )
-
-
-def test_simple_segment_separate():
-    rng = np.random.default_rng(12345)
-    signal_matrix = random_signal_matrix(800, rng=rng).reshape(2, 400)
-
-    ground_truth_windows = [
-        {"start_idx": 0, "end_idx": 100, "mean": 0.5},
-        {"start_idx": 200, "end_idx": 300, "mean": 1.3},
-        {"start_idx": 300, "end_idx": 400, "mean": 1.1},
-    ]
-
-    for window in ground_truth_windows:
-        signal_matrix[0, window["start_idx"] : window["end_idx"]] *= 10
-        signal_matrix[1, window["start_idx"] : window["end_idx"]] = signal_matrix[
-            0, window["start_idx"] : window["end_idx"]
-        ] * np.exp(-1j * window["mean"])
-
-    mean_diff_threshold = 0.05
-    segmented_windows = simple_segment(
-        signal_matrix,
-        window_size=100,
-        stride=50,
-        trim=10,
-        # mean_diff_threshold=mean_diff_threshold,
-        max_stddev_threshold=0.1,
-        drop_less_than_size=0,
-        min_abs_signal=10,
)["simple_segmentation"] - - assert segmented_windows[0]["start_idx"] == ground_truth_windows[0]["start_idx"] - assert segmented_windows[0]["end_idx"] == ground_truth_windows[0]["end_idx"] - assert np.isclose( - segmented_windows[0]["mean"], - ground_truth_windows[0]["mean"], - mean_diff_threshold, - ) - - assert segmented_windows[1]["start_idx"] == ground_truth_windows[1]["start_idx"] - assert segmented_windows[1]["end_idx"] == ground_truth_windows[1]["end_idx"] - assert np.isclose( - segmented_windows[1]["mean"], - ground_truth_windows[1]["mean"], - mean_diff_threshold, - ) - - assert segmented_windows[2]["start_idx"] == ground_truth_windows[2]["start_idx"] - assert segmented_windows[2]["end_idx"] == ground_truth_windows[2]["end_idx"] - assert np.isclose( - segmented_windows[2]["mean"], - ground_truth_windows[2]["mean"], - mean_diff_threshold, - ) - - -def test_circular_mean(): - gt_mean = np.array([0, np.pi / 2]).reshape(-1, 1) - - gt_mean = np.random.uniform(low=-np.pi, high=np.pi, size=(128, 1)) - - for n in [1000]: - angles = np.random.randn(gt_mean.shape[0], n) * 0.2 + gt_mean - m, _m = circular_mean(angles, 20) - assert np.isclose(_m, gt_mean[:, 0], atol=1e-1).all() - - _angles = torch.from_numpy(angles) - m, _m = torch_circular_mean(_angles, 20) - assert torch.isclose( - _m.double(), torch.from_numpy(gt_mean[:, 0]), atol=1e-1 - ).all() - - # try with and without weights - add_noise_angles = np.hstack( - [angles, np.random.randn(gt_mean.shape[0], n * 10) + 0.5] - ) - # add_noise_angles = 10*np.random.randn(2, n*40) - weights = np.zeros(add_noise_angles.shape) - weights[:, :n] = 1 - - m, _m = circular_mean(add_noise_angles, 20, weights=weights) - assert np.isclose(_m, gt_mean[:, 0], atol=1e-1).all() - - m, _m = torch_circular_mean( - torch.from_numpy(add_noise_angles), 20, weights=weights - ) - assert torch.isclose( - _m.double(), torch.from_numpy(gt_mean[:, 0]), atol=1e-1 - ).all() diff --git a/tests/test_rf.py b/tests/test_rf.py new file mode 100644 index 0000000..86ddd0c --- /dev/null +++ b/tests/test_rf.py @@ -0,0 +1,169 @@ +import numpy as np +import torch + +from spf.rf import ( + beamformer_given_steering_nomean, + beamformer_given_steering_nomean_fast, + circular_diff_to_mean_single, + circular_diff_to_mean_single_fast, + circular_mean, + circular_mean_simple_fast, + fast_median_abs, + fast_percentile, + torch_circular_mean, +) + +# lots of test code from chat gpt + +def test_circular_mean(): + gt_mean = np.array([0, np.pi / 2]).reshape(-1, 1) + + gt_mean = np.random.uniform(low=-np.pi, high=np.pi, size=(128, 1)) + + for n in [1000]: + angles = np.random.randn(gt_mean.shape[0], n) * 0.2 + gt_mean + m, _m = circular_mean(angles, 20) + assert np.isclose(_m, gt_mean[:, 0], atol=1e-1).all() + + _angles = torch.from_numpy(angles) + m, _m = torch_circular_mean(_angles, 20) + assert torch.isclose( + _m.double(), torch.from_numpy(gt_mean[:, 0]), atol=1e-1 + ).all() + + # try with and without weights + add_noise_angles = np.hstack( + [angles, np.random.randn(gt_mean.shape[0], n * 10) + 0.5] + ) + # add_noise_angles = 10*np.random.randn(2, n*40) + weights = np.zeros(add_noise_angles.shape) + weights[:, :n] = 1 + + m, _m = circular_mean(add_noise_angles, 20, weights=weights) + assert np.isclose(_m, gt_mean[:, 0], atol=1e-1).all() + + m, _m = torch_circular_mean( + torch.from_numpy(add_noise_angles), 20, weights=weights + ) + assert torch.isclose( + _m.double(), torch.from_numpy(gt_mean[:, 0]), atol=1e-1 + ).all() + + +def test_beamformer_functions(): + np.random.seed(42) # for 
+    np.random.seed(42)  # for reproducibility
+
+    # Generate random inputs
+    n_thetas = 65
+    n_antennas = 2
+    n_samples = 1000
+    n_iter = 10
+
+    for _ in range(n_iter):
+        steering_vectors = np.random.randn(n_thetas, n_antennas) + 1j * np.random.randn(n_thetas, n_antennas)
+        steering_vectors = steering_vectors.astype(np.complex64)
+
+        signal_matrix = np.random.randn(n_antennas, n_samples) + 1j * np.random.randn(n_antennas, n_samples)
+        signal_matrix = signal_matrix.astype(np.complex64)
+
+        # Run both versions
+        slow_output = beamformer_given_steering_nomean(steering_vectors, signal_matrix)
+        fast_output = beamformer_given_steering_nomean_fast(steering_vectors, signal_matrix)
+
+        assert np.isclose(slow_output, fast_output, rtol=1e-4).all()
+
+
+def test_fast_percentile():
+    np.random.seed(42)
+
+    # Test cases
+    arrays = [
+        np.array([1, 2, 3, 4, 5]),
+        np.array([10, 20, 30]),
+        np.random.randn(100),
+        np.random.uniform(-10, 10, size=1000),
+        # np.array([])  # Empty array case
+    ]
+
+    percentiles = [0.0, 25.0, 50.0, 75.0, 100.0]
+
+    for arr_idx, arr in enumerate(arrays):
+        arr_float32 = arr.astype(np.float32)
+
+        print(f"Testing array #{arr_idx}, size {arr_float32.size}")
+        for p in percentiles:
+            fast_val = fast_percentile(arr_float32, p)
+            numpy_val = np.percentile(arr_float32, p, method='lower')  # match fast_percentile's 'lower' rank
+
+            diff = abs(fast_val - numpy_val)
+            print(f"  Percentile {p:.1f}% -> fast: {fast_val:.5f}, numpy: {numpy_val:.5f}, diff: {diff:.2e}")
+
+            # Allow small floating point rounding errors
+            if arr_float32.size > 0:
+                assert diff < 1e-3, f"Mismatch at array {arr_idx}, percentile {p}"
+            else:
+                assert fast_val == 0.0, "Empty array should return 0.0"
+
+
+def test_circular_diff_to_mean_single():
+    np.random.seed(42)
+
+    means = [0.0, np.pi / 2, np.pi, 3 * np.pi / 2, 2 * np.pi]
+    for mean in means:
+        print(f"Testing with mean = {mean:.4f}")
+
+        angles = np.random.uniform(0, 2 * np.pi, size=1000).astype(np.float32)
+
+        output_orig = circular_diff_to_mean_single(angles, mean)
+        output_opt = circular_diff_to_mean_single_fast(angles, mean)
+
+        max_abs_error = np.max(np.abs(output_orig - output_opt))
+        print(f"  Max abs error: {max_abs_error:.2e}")
+
+        # Allow only tiny floating point differences
+        assert max_abs_error < 1e-6, f"Mismatch! Max error: {max_abs_error}"
+
+
+def test_fast_median_abs():
+    np.random.seed(42)
+
+    arrays = [
+        np.array([], dtype=np.float32),  # Empty
+        np.array([3.0], dtype=np.float32),  # Single element
+        np.array([-2.0, 5.0], dtype=np.float32),  # Two elements
+        np.random.uniform(-10, 10, size=5).astype(np.float32),  # Odd size
+        np.random.uniform(-10, 10, size=6).astype(np.float32),  # Even size
+        np.random.randn(1000).astype(np.float32),  # Large array
+    ]
+
+    for idx, arr in enumerate(arrays):
+        computed = fast_median_abs(arr)
+
+        if arr.size == 0:
+            # Special case for empty array
+            print(f"Array #{idx}: Empty array, Computed={computed:.6f}")
+            assert computed == 0.0, f"Empty array should return 0.0, got {computed}"
+        else:
+            expected = np.median(np.abs(arr))
+            diff = abs(expected - computed)
+            print(f"Array #{idx}: Expected={expected:.6f}, Computed={computed:.6f}, Diff={diff:.2e}")
+            assert diff < 1e-6, f"Mismatch in array #{idx}! Diff={diff}"
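+
+
+# circular_mean_simple_fast computes only the untrimmed mean, so it is
+# checked against circular_mean with trim=0.0.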
Diff={diff}" + +def test_circular_mean(): + np.random.seed(42) + + test_cases = [ + np.random.uniform(0, 2*np.pi, size=(1, 5)).astype(np.float32), # small 1×5 + np.random.uniform(0, 2*np.pi, size=(10, 64)).astype(np.float32), # medium 10×64 + np.random.uniform(0, 2*np.pi, size=(100, 128)).astype(np.float32), # larger 100×128 + ] + + for idx, angles in enumerate(test_cases): + cm_orig, _ = circular_mean(angles, trim=0.0) # Your original slow + cm_opt, _ = circular_mean_simple_fast(angles) # Your new fast + + # Correct circular difference! + diff = np.abs((cm_orig - cm_opt + np.pi) % (2*np.pi) - np.pi) + max_diff = np.max(diff) + + assert max_diff < 1e-6, f"Mismatch in test case {idx}! Max circular diff = {max_diff}" + diff --git a/tests/test_segmentation.py b/tests/test_segmentation.py new file mode 100644 index 0000000..0f75679 --- /dev/null +++ b/tests/test_segmentation.py @@ -0,0 +1,101 @@ +import numpy as np +from spf.dataset.segmentation import simple_segment +from spf.utils import random_signal_matrix + +def test_simple_segment(): + rng = np.random.default_rng(12345) + signal_matrix = random_signal_matrix(1200, rng=rng).reshape(2, 600) + + ground_truth_windows = [ + {"start_idx": 0, "end_idx": 100, "mean": 0.5}, + {"start_idx": 300, "end_idx": 400, "mean": 1.3}, + {"start_idx": 500, "end_idx": 600, "mean": 1.25}, + ] + + for window in ground_truth_windows: + signal_matrix[0, window["start_idx"] : window["end_idx"]] *= 10 + signal_matrix[1, window["start_idx"] : window["end_idx"]] = signal_matrix[ + 0, window["start_idx"] : window["end_idx"] + ] * np.exp(-1j * window["mean"]) + + mean_diff_threshold = 0.05 + segmented_windows = simple_segment( + signal_matrix, + window_size=100, + stride=50, + trim=10, + # mean_diff_threshold=mean_diff_threshold, + max_stddev_threshold=0.1, + drop_less_than_size=0, + min_abs_signal=10, + )["simple_segmentation"] + + assert segmented_windows[0]["start_idx"] == ground_truth_windows[0]["start_idx"] + assert segmented_windows[0]["end_idx"] == ground_truth_windows[0]["end_idx"] + assert np.isclose( + segmented_windows[0]["mean"], + ground_truth_windows[0]["mean"], + mean_diff_threshold, + ) + + assert segmented_windows[1]["start_idx"] == ground_truth_windows[1]["start_idx"] + # assert segmented_windows[1]["end_idx"] == ground_truth_windows[2]["end_idx"] + assert np.isclose( + segmented_windows[1]["mean"], + ground_truth_windows[1]["mean"], + mean_diff_threshold, + ) + + +def test_simple_segment_separate(): + rng = np.random.default_rng(12345) + signal_matrix = random_signal_matrix(800, rng=rng).reshape(2, 400) + + ground_truth_windows = [ + {"start_idx": 0, "end_idx": 100, "mean": 0.5}, + {"start_idx": 200, "end_idx": 300, "mean": 1.3}, + {"start_idx": 300, "end_idx": 400, "mean": 1.1}, + ] + + for window in ground_truth_windows: + signal_matrix[0, window["start_idx"] : window["end_idx"]] *= 10 + signal_matrix[1, window["start_idx"] : window["end_idx"]] = signal_matrix[ + 0, window["start_idx"] : window["end_idx"] + ] * np.exp(-1j * window["mean"]) + + mean_diff_threshold = 0.05 + segmented_windows = simple_segment( + signal_matrix, + window_size=100, + stride=50, + trim=10, + # mean_diff_threshold=mean_diff_threshold, + max_stddev_threshold=0.1, + drop_less_than_size=0, + min_abs_signal=10, + )["simple_segmentation"] + + assert segmented_windows[0]["start_idx"] == ground_truth_windows[0]["start_idx"] + assert segmented_windows[0]["end_idx"] == ground_truth_windows[0]["end_idx"] + assert np.isclose( + segmented_windows[0]["mean"], + 
ground_truth_windows[0]["mean"], + mean_diff_threshold, + ) + + assert segmented_windows[1]["start_idx"] == ground_truth_windows[1]["start_idx"] + assert segmented_windows[1]["end_idx"] == ground_truth_windows[1]["end_idx"] + assert np.isclose( + segmented_windows[1]["mean"], + ground_truth_windows[1]["mean"], + mean_diff_threshold, + ) + + assert segmented_windows[2]["start_idx"] == ground_truth_windows[2]["start_idx"] + assert segmented_windows[2]["end_idx"] == ground_truth_windows[2]["end_idx"] + assert np.isclose( + segmented_windows[2]["mean"], + ground_truth_windows[2]["mean"], + mean_diff_threshold, + ) +