diff --git a/gamma/utils.py b/gamma/utils.py index 9ddbed2..fd4bfd1 100644 --- a/gamma/utils.py +++ b/gamma/utils.py @@ -1,5 +1,6 @@ import multiprocessing as mp import platform +import time from collections import Counter from datetime import datetime @@ -80,6 +81,35 @@ def convert_picks_csv(picks, stations, config): ) +class LockWithTimeout: + def __init__(self, lock, timeout=5 * 60, max_hold_time=5 * 60): + self.lock = lock + self.timeout = timeout + self.max_hold_time = max_hold_time + self.acquired = False + self.hold_start_time = None + + def __enter__(self): + start_time = time.time() + while time.time() - start_time < self.timeout: + try: + if self.lock.acquire(): + self.acquired = True + self.hold_start_time = time.time() + return self + except: + pass + time.sleep(0.1) + return None + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.acquired: + if time.time() - self.hold_start_time > self.max_hold_time: + print(f"\nLock held too long ({time.time() - self.hold_start_time:.1f}s), releasing") + self.lock.release() + self.acquired = False + + def hierarchical_dbscan_clustering(data, phase_loc, phase_type, phase_weight, vel, eps=15, min_samples=3): def dbscan2(t, xy, w, ph, vel, eps, min_samples, ratio=1.1): @@ -152,110 +182,122 @@ def association(picks, stations, config, event_idx0=0, method="BGMM", **kwargs): else: config["eikonal"] = initialize_eikonal(config["eikonal"]) - if ("use_dbscan" in config) and config["use_dbscan"]: - # db = DBSCAN(eps=config["dbscan_eps"], min_samples=config["dbscan_min_samples"]).fit( - # np.hstack([data[:, 0:1], locs[:, :2] / np.average(vel["p"])]), - # sample_weight=np.squeeze(phase_weight), - # ) - # labels = db.labels_ - labels = hierarchical_dbscan_clustering( - data, - locs, - phase_type, - phase_weight, - vel, - eps=config["dbscan_eps"], - min_samples=config["dbscan_min_samples"], - ) - unique_labels = set(labels) - unique_labels = unique_labels.difference([-1]) - else: - labels = np.zeros(len(data)) - unique_labels = [0] - - if "ncpu" not in config: - config["ncpu"] = max(1, min(len(unique_labels) // 4, min(32, mp.cpu_count() - 1))) - else: - config["ncpu"] = min(mp.cpu_count(), config["ncpu"]) - - if config["ncpu"] == 1: - print(f"Associating {len(data)} picks with {config['ncpu']} CPUs") - event_idx = 0 - events, assignment = [], [] - for unique_label in list(unique_labels): - events_, assignment_ = associate( - unique_label, - labels, + try: + if ("use_dbscan" in config) and config["use_dbscan"]: + # db = DBSCAN(eps=config["dbscan_eps"], min_samples=config["dbscan_min_samples"]).fit( + # np.hstack([data[:, 0:1], locs[:, :2] / np.average(vel["p"])]), + # sample_weight=np.squeeze(phase_weight), + # ) + # labels = db.labels_ + labels = hierarchical_dbscan_clustering( data, locs, phase_type, phase_weight, - pick_idx, - pick_station_id, - config, - timestamp0, vel, - method, - event_idx, + eps=config["dbscan_eps"], + min_samples=config["dbscan_min_samples"], ) - event_idx += len(events_) - events.extend(events_) - assignment.extend(assignment_) - else: - manager = mp.Manager() - lock = manager.Lock() - # event_idx0 - 1 as event_idx is increased before use - event_idx = manager.Value("i", event_idx0 - 1) + unique_labels = set(labels) + unique_labels = unique_labels.difference([-1]) + else: + labels = np.zeros(len(data)) + unique_labels = [0] - print(f"Associating {len(unique_labels)} clusters with {config['ncpu']} CPUs") + if "ncpu" not in config: + config["ncpu"] = max(1, min(len(unique_labels) // 4, min(32, mp.cpu_count() - 1))) + else: + config["ncpu"] = min(mp.cpu_count(), config["ncpu"]) - # the following sort and shuffle is to make sure jobs are distributed evenly - counter = Counter(labels) - unique_labels = sorted(unique_labels, key=lambda x: counter[x], reverse=True) - np.random.shuffle(unique_labels) + if config["ncpu"] == 1: + print(f"Associating {len(data)} picks with {config['ncpu']} CPUs") + event_idx = 0 + events, assignment = [], [] + for unique_label in list(unique_labels): + events_, assignment_ = associate( + unique_label, + labels, + data, + locs, + phase_type, + phase_weight, + pick_idx, + pick_station_id, + config, + timestamp0, + vel, + method, + event_idx, + ) + event_idx += len(events_) + events.extend(events_) + assignment.extend(assignment_) + + # release each batch result after processing + del events_, assignment_ + else: + manager = mp.Manager() + lock = manager.Lock() + # event_idx0 - 1 as event_idx is increased before use + event_idx = manager.Value("i", event_idx0 - 1) - # the default chunk_size is len(unique_labels)//(config["ncpu"]*4), which makes some jobs very heavy - chunk_size = max(len(unique_labels) // (config["ncpu"] * 20), 1) + print(f"Associating {len(unique_labels)} clusters with {config['ncpu']} CPUs") - # Check for OS to start a child process in multiprocessing - # https://superfastpython.com/multiprocessing-context-in-python/ - if platform.system().lower() in ["darwin", "windows"]: - context = "spawn" - else: - context = "fork" + # the following sort and shuffle is to make sure jobs are distributed evenly + counter = Counter(labels) + unique_labels = sorted(unique_labels, key=lambda x: counter[x], reverse=True) + np.random.shuffle(unique_labels) - with mp.get_context(context).Pool(config["ncpu"]) as p: - results = p.starmap( - associate, - [ - [ - k, - labels, - data, - locs, - phase_type, - phase_weight, - pick_idx, - pick_station_id, - config, - timestamp0, - vel, - method, - event_idx, - lock, - ] - for k in unique_labels - ], - chunksize=chunk_size, - ) - # resuts is a list of tuples, each tuple contains two lists events and assignment - # here we flatten the list of tuples into two lists - events, assignment = [], [] - for each_events, each_assignment in results: - events.extend(each_events) - assignment.extend(each_assignment) + # the default chunk_size is len(unique_labels)//(config["ncpu"]*4), which makes some jobs very heavy + chunk_size = min(max(len(unique_labels) // (config["ncpu"] * 20), 1), 200) - return events, assignment # , event_idx.value + # Check for OS to start a child process in multiprocessing + # https://superfastpython.com/multiprocessing-context-in-python/ + if platform.system().lower() in ["darwin", "windows"]: + context = "spawn" + else: + context = "fork" + + with mp.get_context(context).Pool(config["ncpu"]) as p: + results = p.starmap( + associate, + [ + [ + k, + labels, + data, + locs, + phase_type, + phase_weight, + pick_idx, + pick_station_id, + config, + timestamp0, + vel, + method, + event_idx, + lock, + ] + for k in unique_labels + ], + chunksize=chunk_size, + ) + # resuts is a list of tuples, each tuple contains two lists events and assignment + # here we flatten the list of tuples into two lists + events, assignment = [], [] + for each_events, each_assignment in results: + events.extend(each_events) + assignment.extend(each_assignment) + + # Release each batch result after processing + del each_events, each_assignment + del results + + return events, assignment # , event_idx.value + + finally: + # Clean up large arrays + del labels, data, locs, phase_type, phase_weight, pick_idx, pick_station_id def associate( @@ -276,228 +318,259 @@ def associate( ): print(".", end="") - data_ = data[labels == k] - locs_ = locs[labels == k] - phase_type_ = phase_type[labels == k] - phase_weight_ = phase_weight[labels == k] - pick_idx_ = pick_idx[labels == k] - pick_station_id_ = pick_station_id[labels == k] + try: + # Create mask and filtered data + mask = labels == k + data_ = data[mask] + locs_ = locs[mask] + phase_type_ = phase_type[mask] + phase_weight_ = phase_weight[mask] + pick_idx_ = pick_idx[mask] + pick_station_id_ = pick_station_id[mask] - max_num_event = max(Counter(pick_station_id_).values()) + max_num_event = max(Counter(pick_station_id_).values()) - if len(pick_idx_) < max(3, config["min_picks_per_eq"]): - return [], [] + if len(pick_idx_) < max(3, config["min_picks_per_eq"]): + return [], [] - time_range = max(data_[:, 0].max() - data_[:, 0].min(), 1) - if config["use_amplitude"]: - amp_range = max(data_[:, 1].max() - data_[:, 1].min(), 1) + time_range = max(data_[:, 0].max() - data_[:, 0].min(), 1) + if config["use_amplitude"]: + amp_range = max(data_[:, 1].max() - data_[:, 1].min(), 1) - ## initialization with [1,1,1] horizontal points and N time points - centers_init = init_centers(config, data_, locs_, time_range, max_num_event) + ## initialization with [1,1,1] horizontal points and N time points + centers_init = init_centers(config, data_, locs_, time_range, max_num_event) - ## run clustering - # mean_precision_prior = 0.01 / time_range - if "covariance_prior" in config: - covariance_prior_pre = config["covariance_prior"] - else: - # covariance_prior_pre = [5.0, 2.0] - ## TODO: design a smark way to set covariance_prior - weight = np.squeeze(phase_weight_) - x_mean = np.average(locs_[:, 0], weights=weight) - y_mean = np.average(locs_[:, 1], weights=weight) - x_std = np.sqrt(np.average((locs_[:, 0] - x_mean) ** 2, weights=weight)) - y_std = np.sqrt(np.average((locs_[:, 1] - y_mean) ** 2, weights=weight)) - # x_std = np.std(locs_[:, 0]) - # y_std = np.std(locs_[:, 1]) - # t_std = np.std(data_[:, 0]) - ## option 1 - # scaler = max(np.sqrt(x_std**2 + y_std**2) / 6.0 , 0.1) - ## option 2 - # scaler = max(np.sqrt(x_std**2 + y_std**2) / 6.0 / t_std, 0.1) * 10 - ## option 3 - # d, v = 50, 6.0 - # scaler = max((np.exp(np.sqrt(x_std**2 + y_std**2)/d) - 1)/(np.exp(1) - 1) * d / v / t_std, 0.2) * 10 - ## option 4 - rstd = np.sqrt(x_std**2 + y_std**2) - # scaler = max(10.0, (rstd / 6.0) * (rstd / 60.0)) # 6.0 km/s, 60 km - scaler = max(1.0, (rstd / 6.0) * (rstd / 30.0)) # 6.0 km/s, 30 km - if config["use_amplitude"]: - # covariance_prior_pre = [time_range * 10.0, amp_range * 10.0] - covariance_prior_pre = [scaler, scaler] + ## run clustering + # mean_precision_prior = 0.01 / time_range + if "covariance_prior" in config: + covariance_prior_pre = config["covariance_prior"] else: - # covariance_prior_pre = [time_range * 10.0] - covariance_prior_pre = [scaler] - # print(f"covariance_prior_pre: {covariance_prior_pre}") - if config["use_amplitude"]: - covariance_prior = np.array([[covariance_prior_pre[0], 0.0], [0.0, covariance_prior_pre[1]]]) - else: - covariance_prior = np.array([[covariance_prior_pre[0]]]) - data_ = data_[:, 0:1] - - if method == "BGMM": - gmm = BayesianGaussianMixture( - n_components=len(centers_init), - weight_concentration_prior=1.0 / len(centers_init), - # mean_precision_prior=mean_precision_prior, - covariance_prior=covariance_prior, - init_params="centers", - centers_init=centers_init.copy(), - station_locs=locs_, - phase_type=phase_type_, - phase_weight=phase_weight_, - vel=vel, - eikonal=config["eikonal"], - bounds=config["bfgs_bounds"], - ).fit(data_) - elif method == "GMM": - gmm = GaussianMixture( - n_components=len(centers_init) + 1, - init_params="centers", - centers_init=centers_init.copy(), - station_locs=locs_, - phase_type=phase_type_, - phase_weight=phase_weight_, - vel=vel, - eikonal=config["eikonal"], - bounds=config["bfgs_bounds"], - dummy_comp=True, - dummy_prob=1 / (1 * np.sqrt(2 * np.pi)) * np.exp(-1 / 2), - dummy_quantile=0.1, - ).fit(data_) - else: - raise (f"Unknown method {method}; Should be 'BGMM' or 'GMM'") - - ## run prediction - pred = gmm.predict(data_) - prob = np.exp(gmm.score_samples(data_)) - prob_matrix = gmm.predict_proba(data_) - prob_eq = prob_matrix.sum(axis=0) - # prob = prob_matrix[range(len(data_)), pred] - # score = gmm.score(data_) - # score_sample = gmm.score_samples(data_) - - ## filtering - events = [] - assignment = [] - - for i in range(len(centers_init)): - tmp_data = data_[pred == i] - tmp_locs = locs_[pred == i] - tmp_pick_station_id = pick_station_id_[pred == i] - tmp_phase_type = phase_type_[pred == i] - # tmp_phase_weight = phase_weight_[pred == i] - if (len(tmp_data) == 0) or (len(tmp_data) < config["min_picks_per_eq"]): - # if (len(tmp_data) == 0) or (np.sum(tmp_phase_weight) < config["min_picks_per_eq"]): - continue - # idx_filter = np.ones(len(tmp_data)).astype(bool) - - ## filter by time - t_ = calc_time( - gmm.centers_[i : i + 1, : len(config["dims"]) + 1], - tmp_locs, - tmp_phase_type, - vel=vel, - eikonal=config["eikonal"], - ) - diff_t = np.abs(t_ - tmp_data[:, 0:1]) - idx_t = (diff_t < config["max_sigma11"]).squeeze(axis=1) - idx_filter = idx_t - if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: - # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: - continue - - ## filter multiple picks at the same station - unique_sta_id = {} - for j, k in enumerate(tmp_pick_station_id): - if (k not in unique_sta_id) or (diff_t[j] < unique_sta_id[k][1]): - unique_sta_id[k] = (j, diff_t[j]) - idx_s = np.zeros(len(idx_t)).astype(bool) ## based on station - for k in unique_sta_id: - idx_s[unique_sta_id[k][0]] = True - idx_filter = idx_filter & idx_s - if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: - # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: - continue - gmm.covariances_[i, 0, 0] = np.mean((diff_t[idx_t]) ** 2) - - ## filter by amplitude + # covariance_prior_pre = [5.0, 2.0] + ## TODO: design a smark way to set covariance_prior + weight = np.squeeze(phase_weight_) + x_mean = np.average(locs_[:, 0], weights=weight) + y_mean = np.average(locs_[:, 1], weights=weight) + x_std = np.sqrt(np.average((locs_[:, 0] - x_mean) ** 2, weights=weight)) + y_std = np.sqrt(np.average((locs_[:, 1] - y_mean) ** 2, weights=weight)) + # x_std = np.std(locs_[:, 0]) + # y_std = np.std(locs_[:, 1]) + # t_std = np.std(data_[:, 0]) + ## option 1 + # scaler = max(np.sqrt(x_std**2 + y_std**2) / 6.0 , 0.1) + ## option 2 + # scaler = max(np.sqrt(x_std**2 + y_std**2) / 6.0 / t_std, 0.1) * 10 + ## option 3 + # d, v = 50, 6.0 + # scaler = max((np.exp(np.sqrt(x_std**2 + y_std**2)/d) - 1)/(np.exp(1) - 1) * d / v / t_std, 0.2) * 10 + ## option 4 + rstd = np.sqrt(x_std**2 + y_std**2) + # scaler = max(10.0, (rstd / 6.0) * (rstd / 60.0)) # 6.0 km/s, 60 km + scaler = max(1.0, (rstd / 6.0) * (rstd / 30.0)) # 6.0 km/s, 30 km + if config["use_amplitude"]: + # covariance_prior_pre = [time_range * 10.0, amp_range * 10.0] + covariance_prior_pre = [scaler, scaler] + else: + # covariance_prior_pre = [time_range * 10.0] + covariance_prior_pre = [scaler] + # print(f"covariance_prior_pre: {covariance_prior_pre}") if config["use_amplitude"]: - a_ = calc_amp( - gmm.centers_[i : i + 1, len(config["dims"]) + 1 : len(config["dims"]) + 2], - gmm.centers_[i : i + 1, : len(config["dims"]) + 1], - tmp_locs, - ) - diff_a = np.abs(a_ - tmp_data[:, 1:2]) - idx_a = (diff_a < config["max_sigma22"]).squeeze() - idx_filter = idx_filter & idx_a - if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: - # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: - continue - - if "max_sigma12" in config: - idx_cov = np.abs(gmm.covariances_[i, 0, 1]) < config["max_sigma12"] - idx_filter = idx_filter & idx_cov + covariance_prior = np.array([[covariance_prior_pre[0], 0.0], [0.0, covariance_prior_pre[1]]]) + else: + covariance_prior = np.array([[covariance_prior_pre[0]]]) + data_ = data_[:, 0:1] + + if method == "BGMM": + gmm = BayesianGaussianMixture( + n_components=len(centers_init), + weight_concentration_prior=1.0 / len(centers_init), + # mean_precision_prior=mean_precision_prior, + covariance_prior=covariance_prior, + init_params="centers", + centers_init=centers_init.copy(), + station_locs=locs_, + phase_type=phase_type_, + phase_weight=phase_weight_, + vel=vel, + eikonal=config["eikonal"], + bounds=config["bfgs_bounds"], + ).fit(data_) + elif method == "GMM": + gmm = GaussianMixture( + n_components=len(centers_init) + 1, + init_params="centers", + centers_init=centers_init.copy(), + station_locs=locs_, + phase_type=phase_type_, + phase_weight=phase_weight_, + vel=vel, + eikonal=config["eikonal"], + bounds=config["bfgs_bounds"], + dummy_comp=True, + dummy_prob=1 / (1 * np.sqrt(2 * np.pi)) * np.exp(-1 / 2), + dummy_quantile=0.1, + ).fit(data_) + else: + raise (f"Unknown method {method}; Should be 'BGMM' or 'GMM'") + + ## run prediction + pred = gmm.predict(data_) + prob = np.exp(gmm.score_samples(data_)) + prob_matrix = gmm.predict_proba(data_) + prob_eq = prob_matrix.sum(axis=0) + # prob = prob_matrix[range(len(data_)), pred] + # score = gmm.score(data_) + # score_sample = gmm.score_samples(data_) + + ## filtering + events = [] + assignment = [] + + for i in range(len(centers_init)): + try: + mask_i = pred == i + tmp_data = data_[mask_i] + tmp_locs = locs_[mask_i] + tmp_pick_station_id = pick_station_id_[mask_i] + tmp_phase_type = phase_type_[mask_i] + # tmp_phase_weight = phase_weight_[mask_i] + if (len(tmp_data) == 0) or (len(tmp_data) < config["min_picks_per_eq"]): + # if (len(tmp_data) == 0) or (np.sum(tmp_phase_weight) < config["min_picks_per_eq"]): + continue + # idx_filter = np.ones(len(tmp_data)).astype(bool) + + ## filter by time + t_ = calc_time( + gmm.centers_[i : i + 1, : len(config["dims"]) + 1], + tmp_locs, + tmp_phase_type, + vel=vel, + eikonal=config["eikonal"], + ) + diff_t = np.abs(t_ - tmp_data[:, 0:1]) + idx_t = (diff_t < config["max_sigma11"]).squeeze(axis=1) + idx_filter = idx_t if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: continue - gmm.covariances_[i, 1, 1] = np.mean((diff_a[idx_a]) ** 2) - - if "min_p_picks_per_eq" in config: - if len(tmp_data[idx_filter & (tmp_phase_type == "p")]) < config["min_p_picks_per_eq"]: - # if np.sum(tmp_phase_weight[idx_filter & (tmp_phase_type == "p")]) < config["min_p_picks_per_eq"]: - continue - if "min_s_picks_per_eq" in config: - if len(tmp_data[idx_filter & (tmp_phase_type == "s")]) < config["min_s_picks_per_eq"]: - # if np.sum(tmp_phase_weight[idx_filter & (tmp_phase_type == "s")]) < config["min_s_picks_per_eq"]: - continue - if "min_stations" in config: - if len(np.unique(tmp_locs[idx_filter], axis=0)) < config["min_stations"]: - continue - - - if lock is not None: - with lock: - if not isinstance(event_idx, int): - event_idx.value += 1 - event_idx_value = event_idx.value + ## filter multiple picks at the same station + unique_sta_id = {} + for j, k in enumerate(tmp_pick_station_id): + if (k not in unique_sta_id) or (diff_t[j] < unique_sta_id[k][1]): + unique_sta_id[k] = (j, diff_t[j]) + idx_s = np.zeros(len(idx_t)).astype(bool) ## based on station + for k in unique_sta_id: + idx_s[unique_sta_id[k][0]] = True + idx_filter = idx_filter & idx_s + if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: + # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: + continue + gmm.covariances_[i, 0, 0] = np.mean((diff_t[idx_t]) ** 2) + + ## filter by amplitude + if config["use_amplitude"]: + a_ = calc_amp( + gmm.centers_[i : i + 1, len(config["dims"]) + 1 : len(config["dims"]) + 2], + gmm.centers_[i : i + 1, : len(config["dims"]) + 1], + tmp_locs, + ) + diff_a = np.abs(a_ - tmp_data[:, 1:2]) + idx_a = (diff_a < config["max_sigma22"]).squeeze() + idx_filter = idx_filter & idx_a + if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: + # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: + continue + + if "max_sigma12" in config: + idx_cov = np.abs(gmm.covariances_[i, 0, 1]) < config["max_sigma12"] + idx_filter = idx_filter & idx_cov + if len(tmp_data[idx_filter]) < config["min_picks_per_eq"]: + # if np.sum(tmp_phase_weight[idx_filter]) < config["min_picks_per_eq"]: + continue + + gmm.covariances_[i, 1, 1] = np.mean((diff_a[idx_a]) ** 2) + + if "min_p_picks_per_eq" in config: + if len(tmp_data[idx_filter & (tmp_phase_type == "p")]) < config["min_p_picks_per_eq"]: + # if np.sum(tmp_phase_weight[idx_filter & (tmp_phase_type == "p")]) < config["min_p_picks_per_eq"]: + continue + if "min_s_picks_per_eq" in config: + if len(tmp_data[idx_filter & (tmp_phase_type == "s")]) < config["min_s_picks_per_eq"]: + # if np.sum(tmp_phase_weight[idx_filter & (tmp_phase_type == "s")]) < config["min_s_picks_per_eq"]: + continue + if "min_stations" in config: + if len(np.unique(tmp_locs[idx_filter], axis=0)) < config["min_stations"]: + continue + + + if lock is not None: + with LockWithTimeout(lock) as lock_with_timeout: + if lock_with_timeout is None: + print(f"\nFailed to acquire lock for cluster {k}, skipping event") + continue + + if not isinstance(event_idx, int): + event_idx.value += 1 + event_idx_value = event_idx.value + else: + event_idx += 1 + event_idx_value = event_idx else: - event_idx += 1 - event_idx_value = event_idx - else: - if not isinstance(event_idx, int): - event_idx.value += 1 - event_idx_value = event_idx.value - else: - event_idx += 1 - event_idx_value = event_idx - - event = { - # "time": from_seconds(gmm.centers_[i, len(config["dims"])]), - "time": datetime.utcfromtimestamp(gmm.centers_[i, len(config["dims"])] + timestamp0).isoformat( - timespec="milliseconds" - ), - # "time(s)": gmm.centers_[i, len(config["dims"])], - "magnitude": gmm.centers_[i, len(config["dims"]) + 1] if config["use_amplitude"] else 999, - "sigma_time": np.sqrt(gmm.covariances_[i, 0, 0]), - "sigma_amp": np.sqrt(gmm.covariances_[i, 1, 1]) if config["use_amplitude"] else 0, - "cov_time_amp": gmm.covariances_[i, 0, 1] if config["use_amplitude"] else 0, - "gamma_score": prob_eq[i], - "num_picks": len(tmp_data[idx_filter]), - "num_p_picks": len(tmp_data[idx_filter & (tmp_phase_type == "p")]), - "num_s_picks": len(tmp_data[idx_filter & (tmp_phase_type == "s")]), - "event_index": event_idx_value, - } - for j, k in enumerate(config["dims"]): ## add location - event[k] = gmm.centers_[i, j] - events.append(event) - - for pi, pr in zip(pick_idx_[pred == i][idx_filter], prob): - assignment.append((pi, event_idx_value, pr)) - - if (event_idx_value + 1) % 100 == 0: - print(f"\nAssociated {event_idx_value + 1} events") - return events, assignment + if not isinstance(event_idx, int): + event_idx.value += 1 + event_idx_value = event_idx.value + else: + event_idx += 1 + event_idx_value = event_idx + + event = { + # "time": from_seconds(gmm.centers_[i, len(config["dims"])]), + "time": datetime.utcfromtimestamp(gmm.centers_[i, len(config["dims"])] + timestamp0).isoformat( + timespec="milliseconds" + ), + # "time(s)": gmm.centers_[i, len(config["dims"])], + "magnitude": gmm.centers_[i, len(config["dims"]) + 1] if config["use_amplitude"] else 999, + "sigma_time": np.sqrt(gmm.covariances_[i, 0, 0]), + "sigma_amp": np.sqrt(gmm.covariances_[i, 1, 1]) if config["use_amplitude"] else 0, + "cov_time_amp": gmm.covariances_[i, 0, 1] if config["use_amplitude"] else 0, + "gamma_score": prob_eq[i], + "num_picks": len(tmp_data[idx_filter]), + "num_p_picks": len(tmp_data[idx_filter & (tmp_phase_type == "p")]), + "num_s_picks": len(tmp_data[idx_filter & (tmp_phase_type == "s")]), + "event_index": event_idx_value, + } + + for j, k in enumerate(config["dims"]): ## add location + event[k] = gmm.centers_[i, j] + events.append(event) + + filtered_pick_idx = pick_idx_[mask_i][idx_filter] + assignment.extend(zip(filtered_pick_idx, [event_idx_value] * len(filtered_pick_idx), prob)) + # for pi, pr in zip(pick_idx_[mask_i][idx_filter], prob): + # assignment.append((pi, event_idx_value, pr)) + + if (event_idx_value + 1) % 100 == 0: + print(f"\nAssociated {event_idx_value + 1} events") + + finally: + # Clean up temporary arrays in the loop + del tmp_data, tmp_locs, tmp_pick_station_id, tmp_phase_type + if config["use_amplitude"] and 'a_' in locals(): + del a_, diff_a, idx_a + if 't_' in locals(): + del t_, diff_t, idx_t, idx_filter + + return events, assignment + + finally: + # Release large arrays + del data_, locs_, phase_type_, phase_weight_, pick_idx_, pick_station_id_ + for var in ['pred', 'prob', 'prob_matrix', 'prob_eq', 'gmm']: + try: + del locals()[var] + except KeyError: + pass + # def init_centers(config, data_, locs_, time_range, max_num_event=1): @@ -592,7 +665,7 @@ def init_centers(config, data_, locs_, time_range, max_num_event=1): # x_init = np.broadcast_to(x_init, (num_t_init)).reshape(-1) # y_init = np.broadcast_to(y_init, (num_t_init)).reshape(-1) z_init = np.linspace(config["z(km)"][0], config["z(km)"][1], initial_points[2] + 2)[1:-1] - z_init = np.broadcast_to(z_init, (num_t_init)).reshape(-1) + z_init = np.tile(z_init, num_t_init)[:num_t_init] if config["dims"] == ["x(km)", "y(km)", "z(km)"]: centers_init = np.vstack([x_init, y_init, z_init, t_init]).T