diff --git a/methods/matching/find_pairs.py b/methods/matching/find_pairs.py index 7f17782..c1f7d96 100644 --- a/methods/matching/find_pairs.py +++ b/methods/matching/find_pairs.py @@ -1,13 +1,16 @@ import argparse +import math import os import logging from functools import partial from multiprocessing import Pool, cpu_count, set_start_method -from numba import jit # type: ignore + import numpy as np import pandas as pd +from numba import jit from methods.common.luc import luc_matching_columns +from methods.utils.kd_tree import make_kdrangetree, make_rumba_tree REPEAT_MATCH_FINDING = 100 DEFAULT_DISTANCE = 10000000.0 @@ -35,12 +38,7 @@ def find_match_iteration( logging.info("Loading K from %s", k_parquet_filename) - # Methodology 6.5.7: For a 10% sample of K k_set = pd.read_parquet(k_parquet_filename) - k_subset = k_set.sample( - frac=0.1, - random_state=rng - ).reset_index() logging.info("Loading M from %s", m_parquet_filename) m_set = pd.read_parquet(m_parquet_filename) @@ -61,11 +59,21 @@ def find_match_iteration( logging.info("Preparing s_set...") m_dist_thresholded_df = m_set[DISTANCE_COLUMNS] / thresholds_for_columns - k_subset_dist_thresholded_df = k_subset[DISTANCE_COLUMNS] / thresholds_for_columns + k_set_dist_thresholded_df = k_set[DISTANCE_COLUMNS] / thresholds_for_columns + + # Rearrange columns by variance so we throw out the least likely to match first + # except the bottom three which are deforestation CPCs and have more cross-variance between K and M + variances = np.std(m_dist_thresholded_df, axis=0) + cols = DISTANCE_COLUMNS + order = np.argsort(-variances.to_numpy()) + order = np.roll(order, 3) + new_cols = [cols[o] for o in order] + m_dist_thresholded_df = m_dist_thresholded_df[new_cols] + k_set_dist_thresholded_df = k_set_dist_thresholded_df[new_cols] # convert to float32 numpy arrays and make them contiguous for numba to vectorise m_dist_thresholded = np.ascontiguousarray(m_dist_thresholded_df, dtype=np.float32) - k_subset_dist_thresholded = np.ascontiguousarray(k_subset_dist_thresholded_df, dtype=np.float32) + k_set_dist_thresholded = np.ascontiguousarray(k_set_dist_thresholded_df, dtype=np.float32) # LUC columns are all named with the year in, so calculate the column names # for the years we are intested in @@ -76,32 +84,50 @@ def find_match_iteration( hard_match_columns = ['country', 'ecoregion', luc10, luc5, luc0] assert len(hard_match_columns) == HARD_COLUMN_COUNT - # similar to the above, make the hard match columns contiguous float32 numpy arrays - m_dist_hard = np.ascontiguousarray(m_set[hard_match_columns].to_numpy()).astype(np.int32) - k_subset_dist_hard = np.ascontiguousarray(k_subset[hard_match_columns].to_numpy()).astype(np.int32) + # Find categories in K + hard_match_categories = [k[hard_match_columns].to_numpy() for _, k in k_set.iterrows()] + hard_match_categories = {k.tobytes(): k for k in hard_match_categories} + no_potentials = [] - # Methodology 6.5.5: S should be 10 times the size of K, in order to achieve this for every - # pixel in the subsample (which is 10% the size of K) we select 100 pixels. - required = 100 + # Methodology 6.5.5: S should be 10 times the size of K + required = 10 logging.info("Running make_s_set_mask... required: %d", required) - starting_positions = rng.integers(0, int(m_dist_thresholded.shape[0]), int(k_subset_dist_thresholded.shape[0])) - s_set_mask_true, no_potentials = make_s_set_mask( - m_dist_thresholded, - k_subset_dist_thresholded, - m_dist_hard, - k_subset_dist_hard, - starting_positions, - required - ) + + s_set_mask_true = np.zeros(m_set.shape[0], dtype=np.bool_) + no_potentials = np.zeros(k_set.shape[0], dtype=np.bool_) + + # Split K and M into those categories and create masks + for values in hard_match_categories.values(): + k_selector = np.all(k_set[hard_match_columns] == values, axis=1) + m_selector = np.all(m_set[hard_match_columns] == values, axis=1) + logging.info(" category: %a |K|: %d |M|: %d", values, k_selector.sum(), m_selector.sum()) + # Make masks for each of those pairs + key_s_set_mask_true, key_no_potentials = make_s_set_mask( + m_dist_thresholded[m_selector], + k_set_dist_thresholded[k_selector], + required, + rng + ) + # Merge into one s_set_mask_true + s_set_mask_true[m_selector] = key_s_set_mask_true + # Merge into no_potentials + no_potentials[k_selector] = key_no_potentials logging.info("Done make_s_set_mask. s_set_mask.shape: %a", {s_set_mask_true.shape}) s_set = m_set[s_set_mask_true] + logging.info("Finished preparing s_set. shape: %a", {s_set.shape}) potentials = np.invert(no_potentials) - k_subset = k_subset[potentials] - logging.info("Finished preparing s_set. shape: %a", {s_set.shape}) + # Methodology 6.5.7: For a 10% sample of K + k_subset = k_set.sample( + frac=0.1, + random_state=rng + ) + k_subset = k_subset[k_subset.apply(lambda row: potentials[row.name], axis=1)] + k_subset.reset_index() + logging.info("Finished preparing k_subset. shape: %a", {k_subset.shape}) # Notes: # 1. Not all pixels may have matches @@ -173,56 +199,51 @@ def find_match_iteration( logging.info("Finished find match iteration") -@jit(nopython=True, fastmath=True, error_model="numpy") def make_s_set_mask( m_dist_thresholded: np.ndarray, - k_subset_dist_thresholded: np.ndarray, - m_dist_hard: np.ndarray, - k_subset_dist_hard: np.ndarray, - starting_positions: np.ndarray, - required: int + k_set_dist_thresholded: np.ndarray, + required: int, + rng: np.random.Generator ): + k_size = k_set_dist_thresholded.shape[0] m_size = m_dist_thresholded.shape[0] - k_size = k_subset_dist_thresholded.shape[0] s_include = np.zeros(m_size, dtype=np.bool_) k_miss = np.zeros(k_size, dtype=np.bool_) - for k in range(k_size): - matches = 0 - k_row = k_subset_dist_thresholded[k, :] - k_hard = k_subset_dist_hard[k] + m_sets = max(1, min(100, math.floor(m_size // 1e6), math.ceil(m_size / (k_size * required * 10)))) - for index in range(m_size): - m_index = (index + starting_positions[k]) % m_size + m_lookup = np.arange(m_size) + rng.shuffle(m_lookup) + m_step = math.ceil(m_size / m_sets) - m_row = m_dist_thresholded[m_index, :] - m_hard = m_dist_hard[m_index] + def m_index(m_set: int, pos: int): + return m_lookup[m_set * m_step + pos] + def m_indexes(m_set: int): + return m_lookup[m_set * m_step:(m_set + 1) * m_step] - should_include = True + m_trees = [make_kdrangetree(m_dist_thresholded[m_indexes(m_set)], np.ones(m_dist_thresholded.shape[1])) for m_set in range(m_sets)] - # check that every element of m_hard matches k_hard - hard_equals = True - for j in range(m_hard.shape[0]): - if m_hard[j] != k_hard[j]: - hard_equals = False + rumba_trees = [make_rumba_tree(m_tree, m_dist_thresholded) for m_tree in m_trees] - if not hard_equals: - should_include = False + for k in range(k_size): + k_row = k_set_dist_thresholded[k] + m_order = np.arange(m_sets) + rng.shuffle(m_order) + possible_s = [] + for m_set in m_order: + next_possible_s = rumba_trees[m_set].members_sample(k_row, required, rng) + if possible_s is None: + possible_s = [m_index(m_set, s) for s in next_possible_s] else: - for j in range(m_row.shape[0]): - if abs(m_row[j] - k_row[j]) > 1.0: - should_include = False - - if should_include: - s_include[m_index] = True - matches += 1 - - # Don't find any more M's - if matches == required: + take = min(required - len(possible_s), len(next_possible_s)) + possible_s[len(possible_s):len(possible_s)+take] = [m_index(m_set, s) for s in next_possible_s[0:take]] + if len(possible_s) == required: break - - k_miss[k] = matches == 0 + if len(possible_s) == 0: + k_miss[k] = True + else: + s_include[possible_s] = True return s_include, k_miss @@ -278,6 +299,8 @@ def greedy_match( results.append((k_idx, min_dist_idx)) s_available[min_dist_idx] = False total_available -= 1 + else: + matchless.append(k_idx) else: matchless.append(k_idx) diff --git a/methods/utils/kd_tree.py b/methods/utils/kd_tree.py new file mode 100644 index 0000000..b44d3b9 --- /dev/null +++ b/methods/utils/kd_tree.py @@ -0,0 +1,343 @@ +import math +from typing import List + +import numpy as np +from numba import float32, int32 # type: ignore +from numba.experimental import jitclass + + +class KDTree: + def __init__(self): + pass + + def contains(self, _range) -> bool: + raise NotImplemented() + + def depth(self) -> int: + return 1 + + def size(self) -> int: + return 1 + + def count(self) -> int: + return 0 + + def members(self, _range) -> np.ndarray: + raise NotImplemented() + + def dump(self, _space: str): + raise NotImplemented() + +class KDLeaf(KDTree): + def __init__(self, point, index): + self.point = point + self.index = index + def contains(self, range) -> bool: + return np.all(range[0] <= self.point) & np.all(range[1] >= self.point) # type: ignore + def members(self, range): + if self.contains(range): + return np.array([self.index]) + return np.empty(0, dtype=np.int_) + def dump(self, space: str): + print(space, f"point {self.point}") + def count(self): + return 1 + +class KDList(KDTree): + def __init__(self, points, indexes): + self.points = points + self.indexes = indexes + def contains(self, range) -> bool: + return np.any(np.all(range[0] <= self.points, axis=1) & np.all(range[1] >= self.points, axis=1)) # type: ignore + def members(self, range): + return self.indexes[np.all(range[0] <= self.points, axis=1) & np.all(range[1] >= self.points, axis=1)] + def dump(self, space: str): + print(space, f"points {self.points}") + def count(self): + return len(self.points) + +class KDSplit(KDTree): + def __init__(self, d: int, value: float, left: KDTree, right: KDTree): + self.d = d + self.value = value + self.left = left + self.right = right + def contains(self, range) -> bool: + l = self.value - range[0, self.d] # Amount on left side + r = range[1, self.d] - self.value # Amount on right side + # Either l or r must be positive, or both + # Pick the biggest first + if l >= r: + if self.left.contains(range): + return True + # Visit the rest if it is inside + if r >= 0: + if self.right.contains(range): + return True + else: + if self.right.contains(range): + return True + # Visit the rest if it is inside + if l >= 0: + if self.left.contains(range): + return True + return False + + def members(self, range): + result = None + if self.value >= range[0, self.d]: + result = self.left.members(range) + if range[1, self.d] >= self.value: + rights = self.right.members(range) + if result is None: + result = rights + else: + result = np.append(result, rights, axis=0) + return result if result is not None else np.empty(0, dtype=np.int_) + + def size(self) -> int: + return 1 + self.left.size() + self.right.size() + + def depth(self) -> int: + return 1 + max(self.left.depth(), self.right.depth()) + + def dump(self, space: str): + print(space, f"split d{self.d} at {self.value}") + print(space + " <") + self.left.dump(space + "\t") + print(space + " >") + self.right.dump(space + "\t") + def count(self): + return self.left.count() + self.right.count() + + +class KDRangeTree: + def __init__(self, tree, widths): + self.tree = tree + self.widths = widths + def contains(self, point) -> bool: + return self.tree.contains(np.array([point - self.widths, point + self.widths])) + def members(self, point) -> np.ndarray: + return self.tree.members(np.array([point - self.widths, point + self.widths])) + def dump(self, space: str): + self.tree.dump(space) + def size(self): + return self.tree.size() + def depth(self): + return self.tree.depth() + def count(self): + return self.tree.count() + +def make_kdrangetree(points, widths): + def make_kdtree_internal(points, indexes): + if len(points) == 1: + return KDLeaf(points[0], indexes[0]) + if len(points) < 30: + return KDList(points, indexes) + # Find split in dimension with most bins + dimensions = points.shape[1] + bins: float = None # type: ignore + chosen_d_min = 0 + chosen_d_max = 0 + chosen_d = 0 + for d in range(dimensions): + d_max = np.max(points[:, d]) + d_min = np.min(points[:, d]) + d_range = d_max - d_min + d_bins = d_range / widths[d] + if bins is None or d_bins > bins: + bins = d_bins + chosen_d = d + chosen_d_max = d_max + chosen_d_min = d_min + + if bins < 1.3: + # No split is very worthwhile, so dump points + return KDList(points, indexes) + + split_at = np.median(points[:, chosen_d]) + # Avoid degenerate cases + if split_at == chosen_d_max or split_at == chosen_d_min: + split_at = (chosen_d_max + chosen_d_min) / 2 + + left_side = points[:, chosen_d] <= split_at + right_side = ~left_side + lefts = points[left_side] + rights = points[right_side] + lefts_indexes = indexes[left_side] + rights_indexes = indexes[right_side] + return KDSplit(chosen_d, split_at, make_kdtree_internal(lefts, lefts_indexes), make_kdtree_internal(rights, rights_indexes)) + indexes = np.arange(len(points)) + return KDRangeTree(make_kdtree_internal(points, indexes), widths) + +@jitclass([('ds', int32[:]), ('values', float32[:]), ('items', int32[:]), ('lefts', int32[:]), ('rights', int32[:]), ('rows', float32[:, :]), ('dimensions', int32), ('widths', float32[:])]) +class RumbaTree: + def __init__(self, ds: np.ndarray, values: np.ndarray, items: np.ndarray, lefts: np.ndarray, rights: np.ndarray, rows: np.ndarray, dimensions: int, widths: np.ndarray): + self.ds = ds + self.values = values + self.items = items + self.lefts = lefts + self.rights = rights + self.rows = rows + self.dimensions = dimensions + self.widths = widths + def members(self, point: np.ndarray): + low = point - self.widths + high = point + self.widths + queue = [0] + finds = [] + while len(queue) > 0: + pos = queue.pop() + d = self.ds[pos] + value = self.values[pos] + if math.isnan(value): + i = d + item = self.items[i] + while item != -1: + # Check item + found = True + for d in range(self.dimensions): + value = self.rows[item, d] + if value < low[d]: + found = False + break + if value > high[d]: + found = False + break + if found: + finds.append(item) + i += 1 + item = self.items[i] + else: + if value <= high[d]: + queue.append(self.rights[pos]) + if value >= low[d]: + queue.append(self.lefts[pos]) + return finds + def count_members(self, point: np.ndarray): + low = point - self.widths + high = point + self.widths + queue = [0] + count = 0 + while len(queue) > 0: + pos = queue.pop() + d = self.ds[pos] + value = self.values[pos] + if math.isnan(value): + i = d + item = self.items[i] + while item != -1: + # Check item + found = True + for d in range(self.dimensions): + value = self.rows[item, d] + if value < low[d]: + found = False + break + if value > high[d]: + found = False + break + if found: + count += 1 + i += 1 + item = self.items[i] + else: + if value <= high[d]: + queue.append(self.rights[pos]) + if value >= low[d]: + queue.append(self.lefts[pos]) + return count + def members_sample(self, point: np.ndarray, count: int, rng: np.random.Generator): + low = point - self.widths + high = point + self.widths + queue = [0] + finds = [] + rand = rng.integers(0, 2**32) + while len(queue) > 0: + pos = queue.pop() + d = self.ds[pos] + value = self.values[pos] + if math.isnan(value): + i = d + item = self.items[i] + while item != -1: + # Check item + found = True + for d in range(self.dimensions): + value = self.rows[item, d] + if value < low[d]: + found = False + break + if value > high[d]: + found = False + break + if found: + if len(finds) < count: + finds.append(item) + else: + # Replace a random item in finds based on on-line search probability + pos = rand % len(finds) + rand *= 65539 + rand &= 0x7FFFFFFF + if pos < count: + finds[pos] = item + i += 1 + item = self.items[i] + else: + if value <= high[d]: + queue.append(self.rights[pos]) + if value >= low[d]: + queue.append(self.lefts[pos]) + return finds + +NAN = float('nan') +def make_rumba_tree(tree: KDRangeTree, rows: np.ndarray): + ds = [] + values = [] + items = [] + lefts = [] + rights = [] + widths = None + def recurse(node): + nonlocal widths + if isinstance(node, KDSplit): + pos = len(ds) + ds.append(node.d) + values.append(node.value) + lefts.append(pos + 1) # Next node we output will be left + rights.append(0xDEADBEEF) # Put placeholder here... + recurse(node.left) + rights[pos] = len(ds) # ..and fixup right to be the next node we output + recurse(node.right) + elif isinstance(node, KDList): + values.append(NAN) + ds.append(len(items)) + lefts.append(-1) # Specific invalid values for debugging an errors in tree build + rights.append(-2) + for item in node.indexes: + items.append(item) + items.append(-1) + elif isinstance(node, KDLeaf): + values.append(NAN) + ds.append(len(items)) + lefts.append(-3) + rights.append(-4) + items.append(node.index) + items.append(-1) + elif isinstance(node, KDRangeTree): + widths = node.widths + recurse(node.tree) + recurse(tree) + if widths is None: + raise ValueError(f"Expected KDRangeTree, got {tree}") + return RumbaTree( + np.array(ds, dtype=np.int32), + np.array(values, dtype=np.float32), + np.array(items, dtype=np.int32), + np.array(lefts, dtype=np.int32), + np.array(rights, dtype=np.int32), + np.ascontiguousarray(rows, dtype=np.float32), + rows.shape[1], + np.ascontiguousarray(widths, dtype=np.float32), + ) + diff --git a/secrets/rhm31.cap b/secrets/rhm31.cap new file mode 100644 index 0000000..b86ac15 --- /dev/null +++ b/secrets/rhm31.cap @@ -0,0 +1 @@ +capnp://sha-256:SgebTxjjj9SY29G4_I-cjSNyOBmHYq2LDYwNGb7JXWE@/tmp/ocurrent.sock/ITvMU-BpgDAdGZ63klCBtJ_dHYI \ No newline at end of file diff --git a/test/test_kd_tree.py b/test/test_kd_tree.py new file mode 100644 index 0000000..5ad6955 --- /dev/null +++ b/test/test_kd_tree.py @@ -0,0 +1,152 @@ +import math +from time import time +import numpy as np +import pandas as pd + +from methods.common.luc import luc_matching_columns +from methods.utils.kd_tree import KDRangeTree, KDTree, make_kdrangetree, make_rumba_tree + +ALLOWED_VARIATION = np.array([ + 200, + 2.5, + 10, + 0.1, + 0.1, + 0.1, + 0.1, + 0.1, + 0.1, +]) + +def test_kd_tree_matches_as_expected(): + def build_rects(items): + rects = [] + for item in items: + lefts = [] + rights = [] + for dimension, value in enumerate(item): + width = ALLOWED_VARIATION[dimension] + if width < 0: + fraction = -width + width = value * fraction + lefts.append(value - width) + rights.append(value + width) + rects.append([lefts, rights]) + return np.array(rects) + + expected_fraction = 1 / 100 # This proportion of pixels we end up matching + + def build_kdranged_tree_for_k(k_rows) -> KDRangeTree: + return make_kdrangetree(np.array([( + row.elevation, + row.slope, + row.access, + row["cpc0_u"], + row["cpc0_d"], + row["cpc5_u"], + row["cpc5_d"], + row["cpc10_u"], + row["cpc10_d"], + ) for row in k_rows + ]), ALLOWED_VARIATION) + + + luc0, luc5, luc10 = luc_matching_columns(2012) + source_pixels = pd.read_parquet("./test/data/1201-k.parquet") + + # Split source_pixels into classes + source_rows = [] + for _, row in source_pixels.iterrows(): + key = (int(row.ecoregion) << 16) | (int(row[luc0]) << 10) | (int(row[luc5]) << 5) | (int(row[luc10])) + if key != 1967137: + continue + source_rows.append(row) + + source = np.array([ + [ + row.elevation, + row.slope, + row.access, + row["cpc0_u"], + row["cpc0_d"], + row["cpc5_u"], + row["cpc5_d"], + row["cpc10_u"], + row["cpc10_d"], + ] for row in source_rows + ]) + + # Invent an array of values that matches the expected_fraction + length = 10000 + np.random.seed(42) + + ranges = np.transpose(np.array([ + np.min(source, axis=0), + np.max(source, axis=0) + ])) + + # Safe ranges (exclude 10% of outliers) + safe_ranges = np.transpose(np.array([ + np.quantile(source, 0.05, axis=0), + np.quantile(source, 0.95, axis=0) + ])) + + # Need to put an estimate here of how much of the area inside those 90% bounds is actually filled + filled_fraction = 0.775 + + # Proportion of values that should fall inside each dimension + inside_fraction = expected_fraction * math.pow(1 / filled_fraction, len(ranges)) + inside_length = math.ceil(length * inside_fraction) + inside_values = np.random.uniform(safe_ranges[:, 0], safe_ranges[:, 1], (inside_length, len(ranges))) + + widths = ranges[:, 1] - ranges[:, 0] + range_extension = 100 * widths # Width extension makes it very unlikely a random value will be inside + outside_ranges = np.transpose([ranges[:, 0] - range_extension, ranges[:, 1] + range_extension]) + + outside_length = length - inside_length + outside_values = np.random.uniform(outside_ranges[:, 0], outside_ranges[:, 1], (outside_length, len(ranges))) + + test_values = np.append(inside_values, outside_values, axis=0) + + def do_np_matching(): + source_rects = build_rects(source) + found = 0 + for i in range(length): + pos = np.all((test_values[i] >= source_rects[:, 0]) & (test_values[i] <= source_rects[:, 1]), axis=1) + found += 1 if np.any(pos) else 0 + return found + + def speed_of(what, func): + expected_finds = 946 + start = time() + value = func() + end = time() + assert value == expected_finds, f"Got wrong value {value} for method {what}, expected {expected_finds}" + print(what, ": ", (end - start) / length, "per call") + + print("making tree... (this will take a few seconds)") + start = time() + kd_tree = build_kdranged_tree_for_k(source_rows) + print("build time", time() - start) + print("tree depth", kd_tree.depth()) + print("tree size", kd_tree.size()) + + def do_kdrange_tree_matching(): + found = 0 + for i in range(length): + found += 1 if len(kd_tree.members(test_values[i])) > 0 else 0 + return found + + rumba_tree = make_rumba_tree(kd_tree, source) + + def do_rumba_tree_matching(): + found = 0 + for i in range(length): + found += 1 if len(rumba_tree.members(test_values[i])) > 0 else 0 + return found + + test_np_matching = False # This is slow but a useful check so I don't want to delete it + if test_np_matching: + speed_of("NP matching", do_np_matching) + speed_of("KD Tree matching", do_kdrange_tree_matching) + speed_of("Rumba matching", do_rumba_tree_matching)