From bffafd3c6e739619f5b8357473d8601cbcbdc058 Mon Sep 17 00:00:00 2001 From: Lixi Zhou Date: Fri, 25 Jul 2025 14:14:36 -0700 Subject: [PATCH] upload files --- velox/optimizer/python/model2Vec/CREDIT | 1 + .../python/model2Vec/database_util.py | 658 +++++++++++++ velox/optimizer/python/model2Vec/dataset.py | 434 +++++++++ velox/optimizer/python/model2Vec/model.py | 280 ++++++ velox/optimizer/python/model2Vec/trainer.py | 381 ++++++++ velox/optimizer/python/model2Vec/util.py | 58 ++ velox/optimizer/python/query2Vec/CREDIT | 1 + .../python/query2Vec/database_util.py | 915 ++++++++++++++++++ velox/optimizer/python/query2Vec/dataset.py | 469 +++++++++ velox/optimizer/python/query2Vec/model.py | 492 ++++++++++ velox/optimizer/python/query2Vec/trainer.py | 396 ++++++++ velox/optimizer/python/query2Vec/util.py | 51 + 12 files changed, 4136 insertions(+) create mode 100644 velox/optimizer/python/model2Vec/CREDIT create mode 100644 velox/optimizer/python/model2Vec/database_util.py create mode 100644 velox/optimizer/python/model2Vec/dataset.py create mode 100644 velox/optimizer/python/model2Vec/model.py create mode 100644 velox/optimizer/python/model2Vec/trainer.py create mode 100644 velox/optimizer/python/model2Vec/util.py create mode 100644 velox/optimizer/python/query2Vec/CREDIT create mode 100644 velox/optimizer/python/query2Vec/database_util.py create mode 100644 velox/optimizer/python/query2Vec/dataset.py create mode 100644 velox/optimizer/python/query2Vec/model.py create mode 100644 velox/optimizer/python/query2Vec/trainer.py create mode 100644 velox/optimizer/python/query2Vec/util.py diff --git a/velox/optimizer/python/model2Vec/CREDIT b/velox/optimizer/python/model2Vec/CREDIT new file mode 100644 index 000000000..271aa3219 --- /dev/null +++ b/velox/optimizer/python/model2Vec/CREDIT @@ -0,0 +1 @@ +CREDIT: The code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer diff --git a/velox/optimizer/python/model2Vec/database_util.py 
b/velox/optimizer/python/model2Vec/database_util.py new file mode 100644 index 000000000..2f4295775 --- /dev/null +++ b/velox/optimizer/python/model2Vec/database_util.py @@ -0,0 +1,658 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +import numpy as np +import pandas as pd +import csv +import torch +import logging +import re +import json +import warnings +import faiss +import random +from tqdm.auto import tqdm + +def read_json(path): + with open(path, "r") as f: + return json.load(f) + + +## bfs shld be enough +def floyd_warshall_rewrite(adjacency_matrix): + (nrows, ncols) = adjacency_matrix.shape + assert nrows == ncols + M = adjacency_matrix.copy().astype("long") + for i in range(nrows): + for j in range(ncols): + if i == j: + M[i][j] = 0 + elif M[i][j] == 0: + M[i][j] = 60 + + for k in range(nrows): + for i in range(nrows): + for j in range(nrows): + M[i][j] = min(M[i][j], M[i][k] + M[k][j]) + return M + + +def get_job_table_sample(workload_file_name, num_materialized_samples=1000): + + tables = [] + samples = [] + + # Load queries + with open(workload_file_name + ".csv", "r") as f: + data_raw = list(list(rec) for rec in csv.reader(f, delimiter="#")) + for row in data_raw: + tables.append(row[0].split(",")) + + if int(row[3]) < 1: + print("Queries must have non-zero cardinalities") + exit(1) + + print("Loaded queries with len ", len(tables)) + + # Load bitmaps + num_bytes_per_bitmap = int((num_materialized_samples + 7) >> 3) + with open(workload_file_name + ".bitmaps", "rb") as f: + for i in range(len(tables)): + four_bytes = f.read(4) + if not four_bytes: + print("Error while reading 'four_bytes'") + exit(1) + num_bitmaps_curr_query = int.from_bytes(four_bytes, byteorder="little") + bitmaps = np.empty( + (num_bitmaps_curr_query, num_bytes_per_bitmap * 8), dtype=np.uint8 + ) + for j in range(num_bitmaps_curr_query): + # Read bitmap + bitmap_bytes = f.read(num_bytes_per_bitmap) + if not 
bitmap_bytes: + print("Error while reading 'bitmap_bytes'") + exit(1) + bitmaps[j] = np.unpackbits(np.frombuffer(bitmap_bytes, dtype=np.uint8)) + samples.append(bitmaps) + print("Loaded bitmaps") + table_sample = [] + for ts, ss in zip(tables, samples): + d = {} + for t, s in zip(ts, ss): + tf = t.split(" ")[0] # remove alias + d[tf] = s + table_sample.append(d) + + return table_sample + + +def get_hist_file(hist_path, bin_number=50): + hist_file = pd.read_csv(hist_path) + for i in range(len(hist_file)): + freq = hist_file["freq"][i] + freq_np = np.frombuffer(bytes.fromhex(freq), dtype=float) + hist_file["freq"][i] = freq_np + + table_column = [] + for i in range(len(hist_file)): + table = hist_file["table"][i] + col = hist_file["column"][i] + table_alias = "".join([tok[0] for tok in table.split("_")]) + if table == "movie_info_idx": + table_alias = "mi_idx" + combine = ".".join([table_alias, col]) + table_column.append(combine) + hist_file["table_column"] = table_column + + for rid in range(len(hist_file)): + hist_file["bins"][rid] = [ + int(i) for i in hist_file["bins"][rid][1:-1].split(" ") if len(i) > 0 + ] + + if bin_number != 50: + hist_file = re_bin(hist_file, bin_number) + + return hist_file + + +def re_bin(hist_file, target_number): + for i in range(len(hist_file)): + freq = hist_file["freq"][i] + bins = freq2bin(freq, target_number) + hist_file["bins"][i] = bins + return hist_file + + +def freq2bin(freqs, target_number): + freq = freqs.copy() + maxi = len(freq) - 1 + + step = 1.0 / target_number + mini = 0 + while freq[mini + 1] == 0: + mini += 1 + pointer = mini + 1 + cur_sum = 0 + res_pos = [mini] + residue = 0 + while pointer < maxi + 1: + cur_sum += freq[pointer] + freq[pointer] = 0 + if cur_sum >= step: + cur_sum -= step + res_pos.append(pointer) + else: + pointer += 1 + + if len(res_pos) == target_number: + res_pos.append(maxi) + + return res_pos + + +class Batch: + def __init__(self, attn_bias, rel_pos, heights, x, y=None): + super(Batch, 
self).__init__() + + self.heights = heights + self.x, self.y = x, y + self.attn_bias = attn_bias + self.rel_pos = rel_pos + + def to(self, device): + + self.heights = self.heights.to(device) + self.x = self.x.to(device) + + self.attn_bias, self.rel_pos = self.attn_bias.to(device), self.rel_pos.to( + device + ) + + return self + + def __len__(self): + return self.in_degree.size(0) + + +def pad_1d_unsqueeze(x, padlen): + x = x + 1 # pad id = 0 + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen], dtype=x.dtype) + new_x[:xlen] = x + x = new_x + return x.unsqueeze(0) + + +def pad_2d_unsqueeze(x, padlen): + # dont know why add 1, comment out first + # x = x + 1 # pad id = 0 + xlen, xdim = x.size() + if xlen < padlen: + new_x = x.new_zeros([padlen, xdim], dtype=x.dtype) + 1 + new_x[:xlen, :] = x + x = new_x + return x.unsqueeze(0) + + +def pad_rel_pos_unsqueeze(x, padlen): + x = x + 1 + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen, padlen], dtype=x.dtype) + new_x[:xlen, :xlen] = x + x = new_x + return x.unsqueeze(0) + + +def pad_attn_bias_unsqueeze(x, padlen): + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen, padlen], dtype=x.dtype).fill_(float("-inf")) + new_x[:xlen, :xlen] = x + new_x[xlen:, :xlen] = 0 + x = new_x + return x.unsqueeze(0) + + +def collator(small_set): + y = small_set[1] + xs = [s["x"] for s in small_set[0]] + + num_graph = len(y) + x = torch.cat(xs) + attn_bias = torch.cat([s["attn_bias"] for s in small_set[0]]) + rel_pos = torch.cat([s["rel_pos"] for s in small_set[0]]) + heights = torch.cat([s["heights"] for s in small_set[0]]) + + return Batch(attn_bias, rel_pos, heights, x), y + + +def filterDict2Hist(hist_file, filterDict, encoder): + buckets = len(hist_file["bins"][0]) + empty = np.zeros(buckets - 1) + ress = np.zeros((3, buckets - 1)) + # iterate over each filter + for i in range(len(filterDict["colId"])): + colId = filterDict["colId"][i] + col = encoder.idx2col[colId] + if col == "NA": + 
ress[i] = empty + continue + bins = hist_file.loc[hist_file["column"] == col, "bins"].item() + + opId = filterDict["opId"][0] + op = encoder.idx2op[opId] + + val = filterDict["val"][0] + + left = 0 + right = len(bins) - 1 + + if col in encoder.categorical_vals_mapping: + # categorical column + # print("col: ", col, " val: ", val) + left = right = val + # print("left: ", left, " right: ", right) + elif col in encoder.column_min_max_vals: + # numerical column + mini, maxi = encoder.column_min_max_vals[col] + val_unnorm = val * (maxi - mini) + mini + + for j in range(len(bins)): + if bins[j] < val_unnorm: + left = j + if bins[j] > val_unnorm: + right = j + break + else: + logging.warning(f"Column {col} not found in encoder") + ress = ress.flatten() + return ress + + res = np.zeros(len(bins) - 1) + + if op == "eq": + res[left:right] = 1 + elif op == "lt": + res[:left] = 1 + elif op == "gt": + res[right:] = 1 + elif op == "lte": + res[: right + 1] = 1 + elif op == "gte": + res[left:] = 1 + + ress[i] = res + + ress = ress.flatten() + return ress + + +def format_join(plan): + if "joinType" in plan: + return plan["joinType"] + else: + return None + + +def format_filter(plan): + filters = [] + alias = None + try: + if "filter" in plan: + if plan["filter"]["functionName"] in [ + "eq", + "lt", + "gt", + "gte", + "lte", + "like", + ]: + if plan["filter"]["functionName"] == "like": + # remove the % in the value + plan["filter"]["inputs"][1]["value"]["value"] = plan["filter"][ + "inputs" + ][1]["value"]["value"].strip("%") + if "nullOnFailure" in plan["filter"]["inputs"][0]: + filter = "{} {} {}".format( + plan["filter"]["inputs"][0]["inputs"][0]["fieldName"], + plan["filter"]["functionName"], + plan["filter"]["inputs"][1]["value"]["value"], + ) + else: + filter = "{} {} {}".format( + plan["filter"]["inputs"][0]["fieldName"], + plan["filter"]["functionName"], + plan["filter"]["inputs"][1]["value"]["value"], + ) + filters.append(filter) + else: + raise ValueError( + "Unsupported 
filter function: {}".format( + plan["filter"]["functionName"] + ) + ) + + # print("[INFO-format_filter] filter: ", plan["filter"], " filters: ", filters) + except Exception as e: + print("[ERROR-format_filter] plan: ", plan["filter"]) + raise e + + return filters, alias + + +def extract_ml_operators(node, ml_ops=None, ml_op_dims=None, ml_nested_kernels=None): + if ml_ops is None: + ml_ops = [] + if ml_op_dims is None: + ml_op_dims = [] + if ml_nested_kernels is None: + ml_nested_kernels = [] + + # Define a regex pattern to match ML operators with suffixes + ml_operator_pattern = re.compile( + r"^(relu|mat_mul|mat_vector_add|softmax|argmax|batch_norm|torchdnn)" + ) + + # Check if the node contains a "functionName" and matches the pattern + if "functionName" in node: + function_name = node["functionName"] + if ml_operator_pattern.match(function_name): + ml_ops.append(function_name) + ml_op_dims.append(node.get("dims", [0])) + ml_nested_kernels.append(node.get("torchdnn_kernels", [])) + + # Recursively check the "inputs" if they exist + if "inputs" in node and isinstance(node["inputs"], list): + for child in node["inputs"]: + extract_ml_operators(child, ml_ops, ml_op_dims, ml_nested_kernels) + # reverse the order: starting from the first layer + return ml_ops[::-1], ml_op_dims[::-1], ml_nested_kernels[::-1] + + +def format_ml_ops(plan): + list_ml_ops = [] + list_ml_op_dims = [] + list_ml_nested_kernels = [] + if "projections" in plan: + for projection in plan["projections"]: + # Check if the node contains a "functionName" and matches the pattern + if "functionName" in projection: + ml_ops, ml_op_dims, ml_nested_kernels = extract_ml_operators(projection) + list_ml_ops.extend(ml_ops) + list_ml_op_dims.extend(ml_op_dims) + list_ml_nested_kernels.extend(ml_nested_kernels) + + return list_ml_ops, list_ml_op_dims, list_ml_nested_kernels + + +def format_ml_op_name(op_name): + if "mat_mul" in op_name: + return "mat_mul" + elif "mat_vector_add" in op_name: + return 
"mat_vector_add" + elif "relu" in op_name: + return "relu" + elif "softmax" in op_name: + return "softmax" + elif "argmax" in op_name: + return "argmax" + elif "batch_norm" in op_name: + return "batch_norm" + elif "torchdnn" in op_name: + return "torchdnn" + elif "embedding" in op_name: + return "embedding" + elif "sigmoid" in op_name: + return "sigmoid" + else: + warnings.warn(f"[MLOP-Format] Unsupported ML operator: {op_name}") + return "NA" + + +def compute_torchdnn_computation_complexity(ml_nested_op_dims, ml_nested_ops): + complexity = 0 + for idx, op in enumerate(ml_nested_ops): + if op == "MatMul": + complexity += ml_nested_op_dims[idx * 2] * ml_nested_op_dims[idx * 2 + 1] + elif op == "MatAdd": + complexity += ml_nested_op_dims[idx * 2] + elif op == "ReLU": + complexity += ml_nested_op_dims[idx * 2] + elif op == "Softmax": + complexity += ml_nested_op_dims[idx * 2] + elif op == "Argmax": + complexity += ml_nested_op_dims[idx * 2] + elif op == "BatchNorm": + complexity += ml_nested_op_dims[idx * 2] + else: + logging.warning( + f"[TorchDNN-Complex.-Comput.] 
Unsupported ML operator: {op}" + ) + return complexity + +class ModelGraphEncoder: + def __init__( + self, + mlop2idx={ + "NA": 0, + "mat_mul": 1, + "mat_vector_add": 2, + "relu": 3, + "softmax": 4, + "argmax": 5, + "batch_norm": 6, + "torchdnn": 7, + "sigmoid": 8, + "embedding": 9, + "svd": 10, + }, + ml_op_flops_min_max=(0, 100000 * 2048), + ml_op_dims_min_max=(0, 100000) + ): + self.mlop2idx = mlop2idx + self.idx2mlop = {v: k for k, v in mlop2idx.items()} + self.ml_op_flops_min_max = ml_op_flops_min_max + self.ml_op_dims_min_max = ml_op_dims_min_max + + def set_ml_op_flops_min_max(self, ml_op_flops_min_max): + self.ml_op_flops_min_max = ml_op_flops_min_max + + def set_ml_op_dims_min_max(self, ml_op_dims_min_max): + self.ml_op_dims_min_max = ml_op_dims_min_max + + def encode_ml_op(self, ml_op): + if ml_op not in self.mlop2idx: + self.mlop2idx[ml_op] = len(self.mlop2idx) + self.idx2mlop[self.mlop2idx[ml_op]] = ml_op + return self.mlop2idx[ml_op] + + def encode_ml_op_dims(self, ml_op_dims, length=20): + if isinstance(ml_op_dims, list): + ml_op_dims = np.array(ml_op_dims) + encoded_dims = (ml_op_dims - self.ml_op_dims_min_max[0]) / ( + self.ml_op_dims_min_max[1] - self.ml_op_dims_min_max[0] + ) + encoded_dims = np.pad( + encoded_dims, (0, length - len(encoded_dims)), "constant", constant_values=0 + ) + + if (encoded_dims > 1).any(): + abnormal_idx = np.where(encoded_dims > 1)[0] + warnings.warn( + "Encoded dimensions exceed 1.0, which may indicate an issue with the normalization. 
Original dimensions: {}, Encoded dimensions: {}".format( + ml_op_dims[abnormal_idx], encoded_dims[abnormal_idx] + ) + ) + return encoded_dims + else: + raise ValueError( + "ml_op_dims should be a list or numpy array, got {}".format( + type(ml_op_dims) + ) + ) + + def encode_ml_op_flops(self, ml_flops): + # Normalize the flops value to [0, 1] using min-max scaling + min_flops, max_flops = self.ml_op_flops_min_max + encoded_flops = (ml_flops - min_flops) / (max_flops - min_flops) + if encoded_flops > 1: + warnings.warn( + "Encoded FLOPs value is out of bounds [0, 1]. " + "This may indicate an issue with the normalization. Original FLOPs: {}, Encoded FLOPs: {}".format( + ml_flops, encoded_flops)) + return encoded_flops + +from collections import defaultdict +import hashlib + +def binary_search_with_range(arr, target, range_percent=0.1): + """ + Perform a binary search to find the index of the value in a sorted array + where the target is within a specified percentage range of the value. + + Args: + arr: Sorted list or numpy array of numeric values. + target: The value to search for. + range_percent: Acceptable percentage difference (default 0.1 for 10%). + + Returns: + The index of the value if found within the range, else -1. + """ + low, high = 0, len(arr) - 1 + while low <= high: + mid = (low + high) // 2 + val = arr[mid] + if abs(val - target) <= abs(val) * range_percent: + return mid + elif val < target: + low = mid + 1 + else: + high = mid - 1 + return -1 + +class WeisfeilerLehmanEncoder: + def __init__(self, range_percent=0.1, num_iterations=2): + # Maintain groups of labels, where the key is the kernel type + # and the value is a list of nodes with FLOPs. The final label + # is constructed as kernel_type + "_" + flops. The values are + # sorted to ensure deterministic ordering. 
+ self.label_groups = defaultdict(list) + self.range_percent = range_percent + self.num_iterations = num_iterations + self.unique_labels = set() # To track unique labels at final iteration + + def inital_node_label(self, node): + node_type = node.ml_op_type + node_flop = node.ml_op_flop + group_flops = self.label_groups[node_type] + + # Find the index of the node's FLOPs in the sorted list of FLOPs + flops_idx = binary_search_with_range(group_flops, node_flop, self.range_percent) + if flops_idx == -1: + label = f"{node_type}_{node_flop}" + group_flops.append(node_flop) + group_flops.sort() # Keep the list sorted for future searches + self.label_groups[node_type] = group_flops + else: + label = f"{node_type}_{group_flops[flops_idx]}" + + return label + + def hash_label(label): + """Deterministically hash a string label into a fixed string.""" + return hashlib.md5(label.encode()).hexdigest() + + def get_wl_subtree_features(self, root): + """ + Extract WL subtree features from a model tree. 
+ """ + label_dict = {} # node -> current label + label_history = {} # node -> list of labels at each iteration + node_list = [] + + # Step 1: Traverse tree to initialize labels + def dfs(node): + label = self.inital_node_label(node) + # label = str(node.ml_op_type) # Initial version, using only the type + label_dict[node] = label + label_history[node] = [label] + node_list.append(node) + for child in node.children: + dfs(child) + dfs(root) + + # Step 2: WL iterations + for i in range(self.num_iterations): + new_labels = {} + for node in node_list: + child_labels = sorted([label_dict[child] for child in node.children]) + neighbor_str = label_dict[node] + "_" + "_".join(child_labels) + # new_label = hash_label(neighbor_str) + new_label = neighbor_str + new_labels[node] = new_label + label_history[node].append(new_label) + label_dict = new_labels + + # Step 3: Count all labels across iterations and add labels to unique labels set + feature_counter = defaultdict(int) + for node in node_list: + for lbl in label_history[node]: + feature_counter[lbl] += 1 + self.unique_labels.update(label_history[node]) + + return feature_counter + + def assign_init_label_for_dataset(self, dataset): + """ + Encode all trees in the dataset and return a list of feature dictionaries. 
+ + Args: + dataset : ModelComputationGraphDataset + """ + for i in range(len(dataset)): + root_node = dataset.rootNodes[i] + wl_feature = self.get_wl_subtree_features(root_node) + dataset.wl_features.append(wl_feature) + + def obtain_wl_feature_for_dataset(self, dataset): + self.assign_init_label_for_dataset(dataset) + wl_sorted_labels = sorted(self.unique_labels) + dataset.wl_feature_vectors = np.zeros((len(dataset), len(wl_sorted_labels)), dtype=np.float32) + wl_labels2idx = {label: idx for idx, label in enumerate(wl_sorted_labels)} + + for i in tqdm(range(len(dataset))): + wl_feature = dataset.wl_features[i] + for label, count in wl_feature.items(): + if label in wl_labels2idx: + dataset.wl_feature_vectors[i, wl_labels2idx[label]] = count + + def construct_similar_dissimilar_pairs_for_dataset(self, dataset, sim_threshold=0.8, dissim_threshold=0.2): + faiss_index = faiss.index_factory( + dataset.wl_feature_vectors.shape[1], "Flat", faiss.METRIC_INNER_PRODUCT + ) + data_to_add = dataset.wl_feature_vectors.copy() + faiss.normalize_L2(data_to_add) + faiss_index.add(data_to_add) + + query_vectors = dataset.wl_feature_vectors[:].copy() + faiss.normalize_L2(query_vectors) + D, I = faiss_index.search(query_vectors, len(dataset)) + for i in range(len(dataset)): + max_sim = D[i][1] + min_sim = D[i][-1] + max_sim_idx = I[i][1] + min_sim_idx = I[i][-1] + if max_sim > sim_threshold: + dataset.similar_model_idx[i] = max_sim_idx + else: + dataset.similar_model_idx[i] = i + if min_sim < dissim_threshold: + dataset.dissimilar_model_idx[i] = min_sim_idx + else: + if i > 0: + dataset.dissimilar_query_idx[i] = dataset.similar_query_idx[i - 1] + else: + dataset.dissimilar_query_idx[i] = random.randint(0, len(dataset) - 1) + warnings.warn(f"[WL-Encoder] No dissimilar query found for model {i}, minimum similarity: {min_sim}") \ No newline at end of file diff --git a/velox/optimizer/python/model2Vec/dataset.py b/velox/optimizer/python/model2Vec/dataset.py new file mode 100644 index 
000000000..ba56bb441 --- /dev/null +++ b/velox/optimizer/python/model2Vec/dataset.py @@ -0,0 +1,434 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +import torch +from torch.utils.data import Dataset +import numpy as np +import json +import pandas as pd +import sys, os +import ast +import warnings +from collections import deque +from .database_util import ( + format_filter, + format_join, + filterDict2Hist, + ModelGraphEncoder, +) +from .database_util import * +from model_cactus.util import Normalizer + + +def get_numerical_min_max_mapping(df_stat): + numerical_min_max_mapping = {} + for idx, row in df_stat.iterrows(): + if row["type"] == "Numerical": + numerical_min_max_mapping[row["column"]] = (row["bins"][0], row["bins"][-1]) + return numerical_min_max_mapping + + +def get_categorical_mapping(df_stat): + categorical_mapping = {} + for idx, row in df_stat.iterrows(): + if row["type"] == "Categorical": + bins = row["bins"] + # filter out None + bins = [str(x) for x in bins if x != ""] + column_mapping = {} + for idx, val in enumerate(bins): + column_mapping[val] = idx + categorical_mapping[row["column"]] = column_mapping + return categorical_mapping + + +def calculate_height(adj_list, tree_size): + if tree_size == 1: + return np.array([0]) + + adj_list = np.array(adj_list) + node_ids = np.arange(tree_size, dtype=int) + node_order = np.zeros(tree_size, dtype=int) + uneval_nodes = np.ones(tree_size, dtype=bool) + + parent_nodes = adj_list[:, 0] + child_nodes = adj_list[:, 1] + + n = 0 + while uneval_nodes.any(): + uneval_mask = uneval_nodes[child_nodes] + unready_parents = parent_nodes[uneval_mask] + + node2eval = uneval_nodes & ~np.isin(node_ids, unready_parents) + node_order[node2eval] = n + uneval_nodes[node2eval] = False + n += 1 + return node_order + + +def node2dict(treeNode): + + adj_list, num_child, features = topo_sort(treeNode) + heights = calculate_height(adj_list, len(features)) + 
+ return { + "features": torch.FloatTensor(np.array(features)), + "heights": torch.LongTensor(heights), + "adjacency_list": torch.LongTensor(np.array(adj_list)), + } + + +def topo_sort(root_node): + # nodes = [] + adj_list = [] # from parent to children + num_child = [] + features = [] + + toVisit = deque() + toVisit.append((0, root_node)) + next_id = 1 + while toVisit: + idx, node = toVisit.popleft() + # nodes.append(node) + features.append(node.feature) + num_child.append(len(node.children)) + for child in node.children: + toVisit.append((next_id, child)) + adj_list.append((idx, next_id)) + next_id += 1 + + return adj_list, num_child, features + + +def pad_2d_unsqueeze(x, padlen): + # dont know why add 1, comment out first + # x = x + 1 # pad id = 0 + xlen, xdim = x.size() + if xlen < padlen: + new_x = x.new_zeros([padlen, xdim], dtype=x.dtype) + 1 + new_x[:xlen, :] = x + x = new_x + return x.unsqueeze(0) + + +def pad_rel_pos_unsqueeze(x, padlen): + x = x + 1 + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen, padlen], dtype=x.dtype) + new_x[:xlen, :xlen] = x + x = new_x + return x.unsqueeze(0) + + +def pad_attn_bias_unsqueeze(x, padlen): + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen, padlen], dtype=x.dtype).fill_(float("-inf")) + new_x[:xlen, :xlen] = x + new_x[xlen:, :xlen] = 0 + x = new_x + return x.unsqueeze(0) + + +def floyd_warshall_rewrite(adjacency_matrix): + (nrows, ncols) = adjacency_matrix.shape + assert nrows == ncols + M = adjacency_matrix.copy().astype("long") + for i in range(nrows): + for j in range(ncols): + if i == j: + M[i][j] = 0 + elif M[i][j] == 0: + M[i][j] = 60 + + for k in range(nrows): + for i in range(nrows): + for j in range(nrows): + M[i][j] = min(M[i][j], M[i][k] + M[k][j]) + return M + + +def pad_1d_unsqueeze(x, padlen): + x = x + 1 # pad id = 0 + xlen = x.size(0) + if xlen < padlen: + new_x = x.new_zeros([padlen], dtype=x.dtype) + new_x[:xlen] = x + x = new_x + return x.unsqueeze(0) + + +def 
pre_collate(the_dict, max_node=50, rel_pos_max=20): + + x = pad_2d_unsqueeze(the_dict["features"], max_node) + N = len(the_dict["features"]) + attn_bias = torch.zeros([N + 1, N + 1], dtype=torch.float) + + edge_index = the_dict["adjacency_list"].t() + if len(edge_index) == 0: + shortest_path_result = np.array([[0]]) + path = np.array([[0]]) + adj = torch.tensor([[0]]).bool() + else: + adj = torch.zeros([N, N], dtype=torch.bool) + adj[edge_index[0, :], edge_index[1, :]] = True + + shortest_path_result = floyd_warshall_rewrite(adj.numpy()) + + rel_pos = torch.from_numpy((shortest_path_result)).long() + + attn_bias[1:, 1:][rel_pos >= rel_pos_max] = float("-inf") + + attn_bias = pad_attn_bias_unsqueeze(attn_bias, max_node + 1) + rel_pos = pad_rel_pos_unsqueeze(rel_pos, max_node) + + heights = pad_1d_unsqueeze(the_dict["heights"], max_node) + + return {"x": x, "attn_bias": attn_bias, "rel_pos": rel_pos, "heights": heights} + + +def read_and_process_histograms(path): + df_stat_columns = ["table", "column", "table_column", "type", "freqs", "bins"] + df_stat = pd.read_csv(path, sep="|", header=None, names=df_stat_columns) + for idx, row in df_stat.iterrows(): + df_stat.loc[idx, "freqs"] = ast.literal_eval(row["freqs"]) + if row["type"] == "Numerical": + df_stat.loc[idx, "bins"] = [float(x) for x in ast.literal_eval(row["bins"])] + else: + df_stat.loc[idx, "bins"] = ast.literal_eval(row["bins"]) + return df_stat + + +def remove_type_attribute(obj): + if isinstance(obj, dict): + # Use list() to avoid 'RuntimeError: dictionary changed size during iteration' + for key in list(obj.keys()): + if key == "type" or key == "outputType": + del obj[key] + elif key == "functionName" and obj[key] is None: + del obj[key] + else: + # Recursively call the function on nested dictionaries + remove_type_attribute(obj[key]) + elif isinstance(obj, list): + # Recursively call the function on each element of the list + for item in obj: + remove_type_attribute(item) + + +def 
extract_dim_for_ml_op(ml_node): + if "dims" in ml_node: + return ml_node["dims"] + else: + # for the op like relu, sigmoid, argmax, etc. the dims is not specified + # we give it a default value of -1, a future step is to infer the dims from the input + return [-1] + + +def compute_flops_from_ml_dims(ml_op_dims): + # compute the flops based on the dims + if isinstance(ml_op_dims, list): + if len(ml_op_dims) == 1: + return ml_op_dims[0] # for the op like relu, sigmoid, argmax, etc. + else: + return sum(a * b for a, b in zip(ml_op_dims, ml_op_dims[1:])) + # return np.prod(ml_op_dims) + else: + raise ValueError( + "ml_op_dims should be a list or numpy array, got {}".format( + type(ml_op_dims) + ) + ) + + +import re + +ml_kernel_pattern = r"(relu|mat_mul|mat_vector_add|softmax|argmax|batch_norm|torchdnn|svd|embedding|sigmoid)" + + +def find_ml_express_node_from_plan(node, search_through_source): + if "projections" in node: + for proj in node["projections"]: + if "functionName" in proj and proj["functionName"] is not None: + function_name = proj["functionName"] + if re.search(ml_kernel_pattern, function_name, re.IGNORECASE): + return True, proj + # if search_through_source is False, we only search the current node + # otherwise, we search through the sources + if search_through_source and "sources" in node: + for subplan in node["sources"]: + found, returned_node = find_ml_express_node_from_plan( + subplan, search_through_source + ) + if found: + return True, returned_node + return False, None + + +class ModelGraphTreeNode: + def __init__(self, ml_op_type, ml_op_type_id, ml_op_dims, ml_op_flop): + self.ml_op_type = ml_op_type + self.ml_op_type_id = ml_op_type_id + self.ml_op_dims = ml_op_dims + self.ml_op_flop = ml_op_flop + + self.parent = None + self.feature = None + self.children = [] + + def addChild(self, treeNode): + self.children.append(treeNode) + + def __str__(self): + # return TreeNode.print_nested(self) + return "{} with {}, {}, {}, {} children".format( + 
self.ml_op_type, + self.ml_op_type_id, + self.ml_op_dims, + self.ml_op_flop, + len(self.children), + ) + + def __repr__(self): + return self.__str__() + + @staticmethod + def print_nested(node, indent=0): + print( + "--" * indent + + "{} with {}, {}, {}, {} children".format( + node.ml_op_type, + node.ml_op_type_id, + node.ml_op_dims, + node.ml_op_flop, + len(node.children), + ) + ) + for k in node.children: + ModelGraphTreeNode.print_nested(k, indent + 1) + + +def modelGraphNode2feature(node, encoder : ModelGraphEncoder, length : int = 50): + ml_op_id = node.ml_op_type_id + ml_op_dims_encoded = encoder.encode_ml_op_dims(node.ml_op_dims, length) + ml_op_flop_encoded = encoder.encode_ml_op_flops(node.ml_op_flop) + + return np.concatenate( + (np.array([ml_op_id]), ml_op_dims_encoded, np.array([ml_op_flop_encoded])) + ) + + +class ModelComputationGraphDataset(Dataset): + def __init__( + self, + cost_normalizer: Normalizer, + model_graph_encoder: ModelGraphEncoder, + df: pd.DataFrame = None, + query_plans: json = None, + to_predict: str = "cost", + dir_path: str = "/home/velox/velox/optimizer/tests/", + mode: str = "train", + ml_op_dim_length: int = 50, + device: torch.device = "cpu", + ): + self.df = df + self.query_plans = query_plans + self.model_graph_encoder = model_graph_encoder + + self.dir_path = dir_path + + self.cost_normalizer = cost_normalizer + self.mode = mode + self.ml_op_dim_length = ml_op_dim_length + self.device = device + + if df is not None: + self.cards = np.zeros(len(df)) + self.costs = df["executionTime"].values + self.card_labels = torch.from_numpy(self.cards) # FIXME: + self.cost_labels = torch.from_numpy( + self.cost_normalizer.normalize_labels(self.costs) + ) + self.query_plans = [ + read_json(os.path.join(dir_path, x)) for x in df["serializedPlanPath"] + ] + elif query_plans is not None: + self.cards = np.zeros(len(query_plans)) + self.costs = np.zeros(len(query_plans)) + self.card_labels = torch.from_numpy(self.cards) + self.cost_labels = 
torch.from_numpy( + self.cost_normalizer.normalize_labels(self.costs) + ) + + self.length = len(self.query_plans) + idxs = range(self.length) + + self.to_predict = to_predict + if to_predict == "cost": + self.gts = self.costs + self.labels = self.cost_labels + else: + raise ValueError("Invalid to_predict value: {}".format(to_predict)) + + self.treeNodes = [] ## for mem collection + self.rootNodes = [] + self.collated_dicts = [ + self.js_node2dict(i, plan) for i, plan in zip(idxs, self.query_plans) + ] + + self.similar_model_idx = {} + self.dissimilar_model_idx = {} + self.wl_features = [] + + def js_node2dict(self, idx, plan): + try: + # if it is train mode, we search the plan through the sources + search_through_source = self.mode == "train" + find_ml_node, ml_node = find_ml_express_node_from_plan( + plan, search_through_source + ) + treeNode = self.traverse_model_graph(ml_node) + self.rootNodes.append(treeNode) + _dict = node2dict(treeNode) + collated_dict = pre_collate(_dict) + except Exception as e: + print("Error in js_node2dict idx: ", idx) + raise e + + # self.treeNodes.clear() + # del self.treeNodes[:] + + return collated_dict + + def __len__(self): + return self.length + + def __getitem__(self, idx): + + return self.collated_dicts[idx], (self.cost_labels[idx], self.card_labels[idx]) + + def traverse_model_graph(self, ml_node): + # extract ml op name + ml_op = format_ml_op_name(ml_node["functionName"]) + ml_type_id = self.model_graph_encoder.encode_ml_op(ml_op) + ml_op_dims = extract_dim_for_ml_op(ml_node) + ml_flops = compute_flops_from_ml_dims(ml_op_dims) + root = ModelGraphTreeNode(ml_op, ml_type_id, ml_op_dims, ml_flops) + try: + root.feature = modelGraphNode2feature(root, self.model_graph_encoder, self.ml_op_dim_length) + except Exception as e: + print("Error in modelGraphNode2feature for node: ", ml_node) + raise e + self.treeNodes.append(root) + if "inputs" in ml_node: + for sub_op in ml_node["inputs"]: + if "functionName" not in sub_op or 
sub_op["functionName"] is None: + # skip the node without functionName + continue + # sub_op is a dict with functionName and dims + sub_op["parent"] = ml_node + node = self.traverse_model_graph(sub_op) + node.parent = root + root.addChild(node) + return root diff --git a/velox/optimizer/python/model2Vec/model.py b/velox/optimizer/python/model2Vec/model.py new file mode 100644 index 000000000..e9b91d491 --- /dev/null +++ b/velox/optimizer/python/model2Vec/model.py @@ -0,0 +1,280 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +from .database_util import collator +import torch +from torch.utils.data import Dataset +import json +import pandas as pd +import torch.nn as nn +import torch.nn.functional as F + + +class Prediction(nn.Module): + def __init__( + self, in_feature=69, hid_units=256, contract=1, mid_layers=True, res_con=True + ): + super(Prediction, self).__init__() + self.mid_layers = mid_layers + self.res_con = res_con + + self.out_mlp1 = nn.Linear(in_feature, hid_units) + + self.mid_mlp1 = nn.Linear(hid_units, hid_units // contract) + self.mid_mlp2 = nn.Linear(hid_units // contract, hid_units) + + self.out_mlp2 = nn.Linear(hid_units, 1) + + def forward(self, features): + + hid = F.relu(self.out_mlp1(features)) + if self.mid_layers: + mid = F.relu(self.mid_mlp1(hid)) + mid = F.relu(self.mid_mlp2(mid)) + if self.res_con: + hid = hid + mid + else: + hid = mid + out = torch.sigmoid(self.out_mlp2(hid)) + + return out + + +class FeatureEmbed(nn.Module): + def __init__( + self, + embed_size=32, + kernels=100, + dims_length=50, + flops_length=1, + ): + super(FeatureEmbed, self).__init__() + + self.embed_size = embed_size + self.dims_length = dims_length + self.flops_length = flops_length + self.kernelEmbed = nn.Embedding(kernels, embed_size) + + self.kernelDimsLinear = nn.Linear(dims_length, embed_size) + self.kernelFlopsLinear = nn.Linear(flops_length, embed_size) + + self.project = 
class FeatureEmbed(nn.Module):
    """Embed one packed kernel-node row [kernel id | dim vector | flop scalar]
    into a single vector of size 3 * embed_size."""

    def __init__(
        self,
        embed_size=32,
        kernels=100,
        dims_length=50,
        flops_length=1,
    ):
        super(FeatureEmbed, self).__init__()

        self.embed_size = embed_size
        self.dims_length = dims_length
        self.flops_length = flops_length

        # One learned embedding per kernel type; linear maps for the numeric fields.
        self.kernelEmbed = nn.Embedding(kernels, embed_size)
        self.kernelDimsLinear = nn.Linear(dims_length, embed_size)
        self.kernelFlopsLinear = nn.Linear(flops_length, embed_size)

        self.project = nn.Linear(embed_size * 3, embed_size * 3)

    def forward(self, feature):
        # Slice the packed row back into its three fields.
        split_sizes = (1, self.dims_length, self.flops_length)
        kernel_id, kernel_dims, kernel_flops = torch.split(feature, split_sizes, dim=-1)

        pieces = (
            self.getKernel(kernel_id),
            self.getKernelDims(kernel_dims),
            self.getKernelFlops(kernel_flops),
        )
        return F.leaky_relu(self.project(torch.cat(pieces, dim=1)))

    def getKernel(self, kernelId):
        return self.kernelEmbed(kernelId.long()).squeeze(1)

    def getKernelDims(self, dims):
        return F.leaky_relu(self.kernelDimsLinear(dims)).squeeze(1)

    def getKernelFlops(self, flops):
        return F.leaky_relu(self.kernelFlopsLinear(flops)).squeeze(1)


class Model2Vec(nn.Module):
    """Tree-structured transformer encoder over kernel-node features.

    Returns the embedding of a virtual "super token" prepended to every graph;
    two Prediction heads are attached for (multi-task) regression.
    """

    def __init__(
        self,
        emb_size=32,
        ffn_dim=32,
        head_size=8,
        dropout=0.1,
        attention_dropout_rate=0.1,
        n_layers=8,
        bin_number=50,
        pred_hid=256,
    ):
        super(Model2Vec, self).__init__()
        hidden_dim = emb_size * 3
        self.hidden_dim = hidden_dim
        self.head_size = head_size

        # Structural encodings: pairwise tree distance and node height.
        self.rel_pos_encoder = nn.Embedding(64, head_size, padding_idx=0)
        self.height_encoder = nn.Embedding(64, hidden_dim, padding_idx=0)

        self.input_dropout = nn.Dropout(dropout)
        self.layers = nn.ModuleList(
            EncoderLayer(hidden_dim, ffn_dim, dropout, attention_dropout_rate, head_size)
            for _ in range(n_layers)
        )

        self.final_ln = nn.LayerNorm(hidden_dim)

        # Virtual token summarizing the whole graph, plus its learned distance bias.
        self.super_token = nn.Embedding(1, hidden_dim)
        self.super_token_virtual_distance = nn.Embedding(1, head_size)

        self.embbed_layer = FeatureEmbed(emb_size)

        self.pred = Prediction(hidden_dim, pred_hid)

        # if multi-task
        self.pred2 = Prediction(hidden_dim, pred_hid)

    def forward(self, batched_data):
        # Unpack the collated batch: attention bias, tree distances, features, heights.
        attn_bias, rel_pos, x = (
            batched_data.attn_bias,
            batched_data.rel_pos,
            batched_data.x,
        )
        heights = batched_data.heights

        n_batch, n_node = x.size()[:2]

        # One copy of the bias per attention head.
        bias = attn_bias.clone().unsqueeze(1).repeat(1, self.head_size, 1, 1)

        # Add the tree-distance bias to all real-node pairs (index 0 is the super token):
        # [n_batch, n_node, n_node, n_head] -> [n_batch, n_head, n_node, n_node]
        dist_bias = self.rel_pos_encoder(rel_pos).permute(0, 3, 1, 2)
        bias[:, :, 1:, 1:] = bias[:, :, 1:, 1:] + dist_bias

        # Learned virtual distance between the super token and every node.
        t = self.super_token_virtual_distance.weight.view(1, self.head_size, 1)
        bias[:, :, 1:, 0] = bias[:, :, 1:, 0] + t
        bias[:, :, 0, :] = bias[:, :, 0, :] + t

        # Embed per-node features (rows are 1 + 50 + 1 = 52 wide) and add heights.
        node_feature = self.embbed_layer(x.view(-1, 52)).view(n_batch, -1, self.hidden_dim)
        node_feature = node_feature + self.height_encoder(heights)

        # Prepend the super token and run the transformer encoder stack.
        super_token_feature = self.super_token.weight.unsqueeze(0).repeat(n_batch, 1, 1)
        output = self.input_dropout(torch.cat([super_token_feature, node_feature], dim=1))
        for enc_layer in self.layers:
            output = enc_layer(output, bias)
        output = self.final_ln(output)

        # The super-token embedding summarizes the graph.
        return output[:, 0, :]

    def get_pred1(self, embeddings):
        return self.pred(embeddings)

    def get_pred2(self, embeddings):
        return self.pred2(embeddings)

    def get_embeddings(self, dataset, device):
        """Embed an entire dataset in a single forward pass (no grad)."""
        batch, _ = collator(list(zip(*[dataset[i] for i in range(len(dataset))])))
        with torch.no_grad():
            batch = batch.to(device)
            embeddings = self.forward(batch)
        return embeddings


class FeedForwardNetwork(nn.Module):
    """Position-wise two-layer GELU MLP used inside each encoder layer."""

    def __init__(self, hidden_size, ffn_size, dropout_rate):
        super(FeedForwardNetwork, self).__init__()

        self.layer1 = nn.Linear(hidden_size, ffn_size)
        self.gelu = nn.GELU()
        self.layer2 = nn.Linear(ffn_size, hidden_size)

    def forward(self, x):
        return self.layer2(self.gelu(self.layer1(x)))


class MultiHeadAttention(nn.Module):
    """Multi-head scaled dot-product attention with an additive pre-softmax bias."""

    def __init__(self, hidden_size, attention_dropout_rate, head_size):
        super(MultiHeadAttention, self).__init__()

        self.head_size = head_size

        self.att_size = att_size = hidden_size // head_size
        self.scale = att_size**-0.5

        self.linear_q = nn.Linear(hidden_size, head_size * att_size)
        self.linear_k = nn.Linear(hidden_size, head_size * att_size)
        self.linear_v = nn.Linear(hidden_size, head_size * att_size)
        self.att_dropout = nn.Dropout(attention_dropout_rate)

        self.output_layer = nn.Linear(head_size * att_size, hidden_size)

    def forward(self, q, k, v, attn_bias=None):
        orig_q_size = q.size()
        n = q.size(0)
        dim = self.att_size

        # Project, then reshape to [batch, heads, len, dim]; keys are further
        # transposed so the score computation is a single matmul.
        q = self.linear_q(q).view(n, -1, self.head_size, dim).transpose(1, 2)
        v = self.linear_v(v).view(n, -1, self.head_size, dim).transpose(1, 2)
        k = self.linear_k(k).view(n, -1, self.head_size, dim).transpose(1, 2).transpose(2, 3)

        # softmax(Q K^T / sqrt(d)) V, with the structural bias added before softmax.
        scores = torch.matmul(q * self.scale, k)  # [b, h, q_len, k_len]
        if attn_bias is not None:
            scores = scores + attn_bias
        weights = self.att_dropout(torch.softmax(scores, dim=3))
        ctx = weights.matmul(v)  # [b, h, q_len, dim]

        # Merge heads back: [b, q_len, h * dim], then project out.
        ctx = ctx.transpose(1, 2).contiguous().view(n, -1, self.head_size * dim)
        out = self.output_layer(ctx)

        assert out.size() == orig_q_size
        return out
+ # Attention(Q, K, V) = softmax((QK^T)/sqrt(d_k))V + q = q * self.scale + x = torch.matmul(q, k) # [b, h, q_len, k_len] + if attn_bias is not None: + x = x + attn_bias + + x = torch.softmax(x, dim=3) + x = self.att_dropout(x) + x = x.matmul(v) # [b, h, q_len, attn] + + x = x.transpose(1, 2).contiguous() # [b, q_len, h, attn] + x = x.view(batch_size, -1, self.head_size * d_v) + + x = self.output_layer(x) + + assert x.size() == orig_q_size + return x + + +class EncoderLayer(nn.Module): + def __init__( + self, hidden_size, ffn_size, dropout_rate, attention_dropout_rate, head_size + ): + super(EncoderLayer, self).__init__() + + self.self_attention_norm = nn.LayerNorm(hidden_size) + self.self_attention = MultiHeadAttention( + hidden_size, attention_dropout_rate, head_size + ) + self.self_attention_dropout = nn.Dropout(dropout_rate) + + self.ffn_norm = nn.LayerNorm(hidden_size) + self.ffn = FeedForwardNetwork(hidden_size, ffn_size, dropout_rate) + self.ffn_dropout = nn.Dropout(dropout_rate) + + def forward(self, x, attn_bias=None): + y = self.self_attention_norm(x) + y = self.self_attention(y, y, y, attn_bias) + y = self.self_attention_dropout(y) + x = x + y + + y = self.ffn_norm(x) + y = self.ffn(y) + y = self.ffn_dropout(y) + x = x + y + return x diff --git a/velox/optimizer/python/model2Vec/trainer.py b/velox/optimizer/python/model2Vec/trainer.py new file mode 100644 index 000000000..d69783a7c --- /dev/null +++ b/velox/optimizer/python/model2Vec/trainer.py @@ -0,0 +1,381 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from .database_util import collator, get_job_table_sample +import os +import time +import torch +from scipy.stats import pearsonr + + +def chunks(l, n): + """Yield successive n-sized chunks from l.""" + for i in range(0, len(l), n): + yield l[i : i + n] + + +def print_qerror(preds_unnorm, labels_unnorm, 
prints=False): + qerror = [] + for i in range(len(preds_unnorm)): + if preds_unnorm[i] > float(labels_unnorm[i]): + qerror.append(preds_unnorm[i] / float(labels_unnorm[i])) + else: + qerror.append(float(labels_unnorm[i]) / float(preds_unnorm[i])) + + e_50, e_90 = np.median(qerror), np.percentile(qerror, 90) + e_mean = np.mean(qerror) + + if prints: + print("[Q-ERROR] Median: {}".format(e_50)) + print("[Q-ERROR] Mean: {}".format(e_mean)) + + res = { + "q_median": e_50, + "q_90": e_90, + "q_mean": e_mean, + } + + return res + + +def get_corr(ps, ls): # unnormalised + ps = np.array(ps) + ls = np.array(ls) + corr, _ = pearsonr(np.log(ps), np.log(ls)) + + return corr + + +# def eval_workload(workload, methods): + +# get_table_sample = methods['get_sample'] + +# workload_file_name = './data/imdb/workloads/' + workload +# table_sample = get_table_sample(workload_file_name) +# plan_df = pd.read_csv('./data/imdb/{}_plan.csv'.format(workload)) +# workload_csv = pd.read_csv('./data/imdb/workloads/{}.csv'.format(workload),sep='#',header=None) +# workload_csv.columns = ['table','join','predicate','cardinality'] +# ds = PlanTreeDataset(plan_df, workload_csv, \ +# methods['encoding'], methods['hist_file'], methods['cost_norm'], \ +# methods['cost_norm'], 'cost', table_sample) + +# eval_score = evaluate(methods['model'], ds, methods['bs'], methods['cost_norm'], methods['device'],True) +# return eval_score, ds + + +def evaluate_model2vec(model, ds, bs, norm, device, prints=False): + model.eval() + cost_predss = np.empty(0) + + with torch.no_grad(): + for i in range(0, len(ds), bs): + batch, batch_labels = collator( + list(zip(*[ds[j] for j in range(i, min(i + bs, len(ds)))])) + ) + + batch = batch.to(device) + + model_embeds = model(batch) + cost_preds = model.get_pred1(model_embeds) + cost_preds = cost_preds.squeeze() + + cost_predss = np.append(cost_predss, cost_preds.cpu().detach().numpy()) + scores = print_qerror(norm.unnormalize_labels(cost_predss), ds.costs, prints) + corr = 
get_corr(norm.unnormalize_labels(cost_predss), ds.costs) + if prints: + print("Corr: ", corr) + return scores, corr + + +def train_model2vec( + model, + train_ds, + val_ds, + crit, + cost_norm, + args, + optimizer=None, + scheduler=None, + log_best=False, + best_metric="q_mean", +): + + to_pred, bs, device, epochs, clip_size = ( + args.to_predict, + args.bs, + args.device, + args.epochs, + args.clip_size, + ) + lr = args.lr + + if not optimizer: + optimizer = torch.optim.Adam(model.parameters(), lr=lr) + if not scheduler: + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 20, 0.7) + + t0 = time.time() + + rng = np.random.default_rng() + + best_prev = 999999 + best_model_path = None + + for epoch in range(epochs): + losses = 0 + cost_predss = np.empty(0) + + model.train() + + train_idxs = rng.permutation(len(train_ds)) + + cost_labelss = np.array(train_ds.costs)[train_idxs] + + for idxs in chunks(train_idxs, bs): + optimizer.zero_grad() + + batch, batch_labels = collator(list(zip(*[train_ds[j] for j in idxs]))) + + l, r = zip(*(batch_labels)) + + batch_cost_label = torch.FloatTensor(l).to(device) + batch = batch.to(device) + + model_embeds = model(batch) + cost_preds = model.get_pred1(model_embeds) + cost_preds = cost_preds.squeeze() + + loss = crit(cost_preds, batch_cost_label) + + loss.backward() + + torch.nn.utils.clip_grad_norm_(model.parameters(), clip_size) + + optimizer.step() + # SQ: added the following 3 lines to fix the out of memory issue + del batch + del batch_labels + torch.cuda.empty_cache() + + losses += loss.item() + cost_predss = np.append(cost_predss, cost_preds.detach().cpu().numpy()) + + if epoch > 10: + test_scores, corrs = evaluate_model2vec( + model, val_ds, bs, cost_norm, device, False + ) + + test_scores["corr"] = corrs + if (best_metric != "corr" and test_scores[best_metric] < best_prev) or ( + best_metric == "corr" and test_scores[best_metric] > best_prev + ): + if log_best: + best_model_path = logging( + args, + epoch, + 
# (continuation of train_model2vec from the previous mangled patch line,
#  preserved verbatim as comments)
#                         test_scores,
#                         filename="log.txt",
#                         save_model=True,
#                         model=model,
#                     )
#                 best_prev = test_scores[best_metric]
#
#         if epoch % 5 == 0:
#             print(
#                 "Epoch: {} Avg Loss: {}, Time: {}".format(
#                     epoch, losses / len(train_ds), time.time() - t0
#                 )
#             )
#             # train_scores = print_qerror(cost_norm.unnormalize_labels(cost_predss),cost_labelss, True)
#             test_scores, corrs = evaluate_model2vec(
#                 model, val_ds, bs, cost_norm, device, True
#             )
#
#         scheduler.step()
#
#     return model, best_model_path


import torch.nn.functional as F


def cosine_contrastive_loss(anchor, positive, negative, margin=0.2):
    """Triplet-style contrastive loss on cosine similarity.

    Pulls anchor toward positive and pushes it from negative; zero when
    pos_sim exceeds neg_sim by at least `margin`.
    """
    # Normalize to unit vectors
    anchor = F.normalize(anchor, dim=1)
    positive = F.normalize(positive, dim=1)
    negative = F.normalize(negative, dim=1)

    pos_sim = F.cosine_similarity(anchor, positive)
    neg_sim = F.cosine_similarity(anchor, negative)

    loss = F.relu(margin + neg_sim - pos_sim).mean()
    return loss


def train_model2vec_with_contrastive(
    model,
    train_ds,
    val_ds,
    crit,
    cost_norm,
    args,
    optimizer=None,
    scheduler=None,
    log_best=False,
    best_metric="q_mean",
):
    """Train with MSE cost loss plus a contrastive loss over similar/dissimilar
    models (indices read from train_ds.similar_model_idx / dissimilar_model_idx).
    Returns (model, best_model_path)."""
    to_pred, bs, device, epochs, clip_size = (
        args.to_predict,
        args.bs,
        args.device,
        args.epochs,
        args.clip_size,
    )
    lr = args.lr

    if not optimizer:
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    if not scheduler:
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 20, 0.7)

    t0 = time.time()

    rng = np.random.default_rng()

    # BUGFIX: "corr" is higher-is-better; a large initial best_prev made the
    # `> best_prev` comparison never true, so a corr-selected best model was
    # never logged. Seed according to the metric's direction.
    best_prev = float("-inf") if best_metric == "corr" else float("inf")
    best_model_path = None

    for epoch in range(epochs):
        losses = 0
        mse_losses = 0
        contrastive_losses = 0
        cost_predss = np.empty(0)

        model.train()

        train_idxs = rng.permutation(len(train_ds))

        cost_labels = np.array(train_ds.costs)[train_idxs]

        for idxs in chunks(train_idxs, bs):
            optimizer.zero_grad()

            batch, batch_labels = collator(list(zip(*[train_ds[j] for j in idxs])))

            l, r = zip(*(batch_labels))

            batch_cost_label = torch.FloatTensor(l).to(device)
            batch = batch.to(device)

            model_embeds = model(batch)
            cost_preds = model.get_pred1(model_embeds)
            cost_preds = cost_preds.squeeze()

            # Embed one similar and one dissimilar model per anchor.
            similar_models_batch, _ = collator(
                list(zip(*[train_ds[train_ds.similar_model_idx[j]] for j in idxs]))
            )
            disimilar_models_batch, _ = collator(
                list(zip(*[train_ds[train_ds.dissimilar_model_idx[j]] for j in idxs]))
            )
            similar_models_batch = similar_models_batch.to(device)
            disimilar_models_batch = disimilar_models_batch.to(device)
            similar_models_embeds = model(similar_models_batch)
            disimilar_models_embeds = model(disimilar_models_batch)

            mse_loss = crit(cost_preds, batch_cost_label)
            contrastive_loss = cosine_contrastive_loss(
                model_embeds, similar_models_embeds, disimilar_models_embeds
            )
            loss = (
                mse_loss + contrastive_loss
            )  # Adjust the weight of the contrastive loss as needed

            loss.backward()

            torch.nn.utils.clip_grad_norm_(model.parameters(), clip_size)

            optimizer.step()
            # SQ: added the following 3 lines to fix the out of memory issue
            # del batch
            # del batch_labels
            torch.cuda.empty_cache()

            losses += loss.item()
            mse_losses += mse_loss.item()
            contrastive_losses += contrastive_loss.item()

            cost_predss = np.append(cost_predss, cost_preds.detach().cpu().numpy())
            # break

        # break
        if epoch > 10:
            test_scores, corrs = evaluate_model2vec(
                model, val_ds, bs, cost_norm, device, False
            )
            test_scores["corr"] = corrs
            if (best_metric != "corr" and test_scores[best_metric] < best_prev) or (
                best_metric == "corr" and test_scores[best_metric] > best_prev
            ):
                if log_best:
                    best_model_path = logging(
                        args,
                        epoch,
                        test_scores,
                        filename="log.txt",
                        save_model=True,
                        model=model,
                    )
                best_prev = test_scores[best_metric]

        if epoch % 5 == 0:
            print(
                "Epoch: {} Avg Loss: {}, MSE Loss: {}, Contrastive Loss: {}, Time: {}".format(
                    epoch,
                    losses / len(train_ds),
                    mse_losses / len(train_ds),
                    contrastive_losses / len(train_ds),
                    time.time() - t0,
                )
            )
            # train_scores = print_qerror(cost_norm.unnormalize_labels(cost_predss),cost_labelss, True)
            test_scores, corrs = evaluate_model2vec(
                model, val_ds, bs, cost_norm, device, True
            )

        scheduler.step()

    return model, best_model_path


# NOTE(review): this function shadows the stdlib `logging` module within this
# file's namespace; renaming would change the public interface, so it is kept.
def logging(args, epoch, qscores, filename=None, save_model=False, model=None):
    """Append run metadata + scores to a CSV log and optionally checkpoint the
    model; returns the checkpoint file name (relative, without args.newpath)."""
    arg_keys = [attr for attr in dir(args) if not attr.startswith("__")]
    arg_vals = [getattr(args, attr) for attr in arg_keys]

    res = dict(zip(arg_keys, arg_vals))
    model_checkpoint = str(hash(tuple(arg_vals))) + ".pt"

    res["epoch"] = epoch
    res["model"] = model_checkpoint

    res = {**res, **qscores}

    filename = args.newpath + filename
    model_checkpoint = args.newpath + model_checkpoint

    if filename is not None:
        if os.path.isfile(filename):
            df = pd.read_csv(filename)
            res_df = pd.DataFrame([res])
            df = pd.concat([df, res_df], ignore_index=True)
            df.to_csv(filename, index=False)
        else:
            df = pd.DataFrame(res, index=[0])
            df.to_csv(filename, index=False)
    if save_model:
        torch.save({"model": model.state_dict(), "args": args}, model_checkpoint)

    return res["model"]

# diff --git a/velox/optimizer/python/model2Vec/util.py b/velox/optimizer/python/model2Vec/util.py
# new file mode 100644 index 000000000..fc602ff5a --- /dev/null +++ b/velox/optimizer/python/model2Vec/util.py @@ -0,0 +1,58 @@
"""
CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer
"""
import numpy as np
import torch


class Normalizer():
    """Min-max normalizer for label values (presumably already log-scaled —
    the printouts say "log(label)"; confirm against callers).

    A 0.001 offset is added before normalizing (and removed on the way back)
    for numerical stability.
    """

    def __init__(self, mini=None, maxi=None):
        self.mini = mini
        self.maxi = maxi

    def normalize_labels(self, labels, reset_min_max=False):
        ## added 0.001 for numerical stability
        labels = np.array(labels) + 0.001
        if self.mini is None or reset_min_max:
            self.mini = labels.min()
            print("min log(label): {}".format(self.mini))
        if self.maxi is None or reset_min_max:
            self.maxi = labels.max()
            print("max log(label): {}".format(self.maxi))
        labels_norm = (labels - self.mini) / (self.maxi - self.mini)
        # Threshold labels <-- but why...
        labels_norm = np.minimum(labels_norm, 1)
        labels_norm = np.maximum(labels_norm, 0.001)

        return labels_norm

    def unnormalize_labels(self, labels_norm):
        labels_norm = np.array(labels_norm, dtype=np.float32)
        labels = (labels_norm * (self.maxi - self.mini)) + self.mini
        return np.array(labels) - 0.001
(labels - self.mini) / (self.maxi - self.mini) + # Threshold labels <-- but why... + labels_norm = np.minimum(labels_norm, 1) + labels_norm = np.maximum(labels_norm, 0.001) + + return labels_norm + + def unnormalize_labels(self, labels_norm): + labels_norm = np.array(labels_norm, dtype=np.float32) + labels = (labels_norm * (self.maxi - self.mini)) + self.mini + return np.array(labels) - 0.001 + + + +def seed_everything(): + torch.manual_seed(0) + import random + random.seed(0) + np.random.seed(0) + torch.backends.cudnn.benchmark = False + + + +def normalize_data(val, column_name, column_min_max_vals): + min_val = column_min_max_vals[column_name][0] + max_val = column_min_max_vals[column_name][1] + val = float(val) + val_norm = 0.0 + if max_val > min_val: + val_norm = (val - min_val) / (max_val - min_val) + return np.array(val_norm, dtype=np.float32) + + + + + + + diff --git a/velox/optimizer/python/query2Vec/CREDIT b/velox/optimizer/python/query2Vec/CREDIT new file mode 100644 index 000000000..271aa3219 --- /dev/null +++ b/velox/optimizer/python/query2Vec/CREDIT @@ -0,0 +1 @@ +CREDIT: The code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer diff --git a/velox/optimizer/python/query2Vec/database_util.py b/velox/optimizer/python/query2Vec/database_util.py new file mode 100644 index 000000000..ac30839e3 --- /dev/null +++ b/velox/optimizer/python/query2Vec/database_util.py @@ -0,0 +1,915 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +import numpy as np +import pandas as pd +import csv +import torch +import logging +import re +import json +import warnings +from tqdm import tqdm +import hashlib +import faiss +import warnings +import random +from collections import defaultdict + + +def read_json(path): + with open(path, "r") as f: + return json.load(f) + + +## bfs shld be enough +def floyd_warshall_rewrite(adjacency_matrix): + (nrows, ncols) = 
adjacency_matrix.shape + assert nrows == ncols + M = adjacency_matrix.copy().astype("long") + for i in range(nrows): + for j in range(ncols): + if i == j: + M[i][j] = 0 + elif M[i][j] == 0: + M[i][j] = 60 + + for k in range(nrows): + for i in range(nrows): + for j in range(nrows): + M[i][j] = min(M[i][j], M[i][k] + M[k][j]) + return M + + +def get_job_table_sample(workload_file_name, num_materialized_samples=1000): + + tables = [] + samples = [] + + # Load queries + with open(workload_file_name + ".csv", "r") as f: + data_raw = list(list(rec) for rec in csv.reader(f, delimiter="#")) + for row in data_raw: + tables.append(row[0].split(",")) + + if int(row[3]) < 1: + print("Queries must have non-zero cardinalities") + exit(1) + + print("Loaded queries with len ", len(tables)) + + # Load bitmaps + num_bytes_per_bitmap = int((num_materialized_samples + 7) >> 3) + with open(workload_file_name + ".bitmaps", "rb") as f: + for i in range(len(tables)): + four_bytes = f.read(4) + if not four_bytes: + print("Error while reading 'four_bytes'") + exit(1) + num_bitmaps_curr_query = int.from_bytes(four_bytes, byteorder="little") + bitmaps = np.empty( + (num_bitmaps_curr_query, num_bytes_per_bitmap * 8), dtype=np.uint8 + ) + for j in range(num_bitmaps_curr_query): + # Read bitmap + bitmap_bytes = f.read(num_bytes_per_bitmap) + if not bitmap_bytes: + print("Error while reading 'bitmap_bytes'") + exit(1) + bitmaps[j] = np.unpackbits(np.frombuffer(bitmap_bytes, dtype=np.uint8)) + samples.append(bitmaps) + print("Loaded bitmaps") + table_sample = [] + for ts, ss in zip(tables, samples): + d = {} + for t, s in zip(ts, ss): + tf = t.split(" ")[0] # remove alias + d[tf] = s + table_sample.append(d) + + return table_sample + + +def get_hist_file(hist_path, bin_number=50): + hist_file = pd.read_csv(hist_path) + for i in range(len(hist_file)): + freq = hist_file["freq"][i] + freq_np = np.frombuffer(bytes.fromhex(freq), dtype=float) + hist_file["freq"][i] = freq_np + + table_column = [] + 
def get_hist_file(hist_path, bin_number=50):
    """Load a histogram CSV; decode hex `freq` arrays, derive `table_column`
    aliases, parse `bins` strings to int lists, and optionally re-bin."""
    hist_file = pd.read_csv(hist_path)
    for i in range(len(hist_file)):
        freq = hist_file["freq"][i]
        freq_np = np.frombuffer(bytes.fromhex(freq), dtype=float)
        # NOTE(review): chained assignment; relies on pandas returning a view here.
        hist_file["freq"][i] = freq_np

    table_column = []
    for i in range(len(hist_file)):
        table = hist_file["table"][i]
        col = hist_file["column"][i]
        # Alias is the first letter of each underscore-separated token.
        table_alias = "".join([tok[0] for tok in table.split("_")])
        if table == "movie_info_idx":
            table_alias = "mi_idx"
        combine = ".".join([table_alias, col])
        table_column.append(combine)
    hist_file["table_column"] = table_column

    for rid in range(len(hist_file)):
        hist_file["bins"][rid] = [
            int(i) for i in hist_file["bins"][rid][1:-1].split(" ") if len(i) > 0
        ]

    if bin_number != 50:
        hist_file = re_bin(hist_file, bin_number)

    return hist_file


def re_bin(hist_file, target_number):
    """Re-bucket every histogram row to `target_number` bins in place."""
    for i in range(len(hist_file)):
        freq = hist_file["freq"][i]
        bins = freq2bin(freq, target_number)
        hist_file["bins"][i] = bins
    return hist_file


def freq2bin(freqs, target_number):
    """Convert a cumulative-frequency array into ~equi-depth bin boundaries
    (positions where each 1/target_number mass step is crossed)."""
    freq = freqs.copy()
    maxi = len(freq) - 1

    step = 1.0 / target_number
    mini = 0
    while freq[mini + 1] == 0:
        mini += 1
    pointer = mini + 1
    cur_sum = 0
    res_pos = [mini]
    residue = 0
    while pointer < maxi + 1:
        cur_sum += freq[pointer]
        freq[pointer] = 0
        if cur_sum >= step:
            cur_sum -= step
            res_pos.append(pointer)
        else:
            pointer += 1

    if len(res_pos) == target_number:
        res_pos.append(maxi)

    return res_pos


class Batch:
    """Container for one collated batch of tree-encoded plans."""

    def __init__(self, attn_bias, rel_pos, heights, x, y=None):
        super(Batch, self).__init__()

        self.heights = heights
        self.x, self.y = x, y
        self.attn_bias = attn_bias
        self.rel_pos = rel_pos

    def to(self, device):
        self.heights = self.heights.to(device)
        self.x = self.x.to(device)

        self.attn_bias, self.rel_pos = self.attn_bias.to(device), self.rel_pos.to(
            device
        )

        return self

    def __len__(self):
        # BUGFIX: previously returned self.in_degree.size(0), but no code ever
        # sets `in_degree`, so len(batch) always raised AttributeError. The
        # number of graphs in the batch is x's leading dimension.
        return self.x.size(0)


def pad_1d_unsqueeze(x, padlen):
    """Shift ids by +1 (0 becomes the pad id) and right-pad to `padlen`;
    returns shape [1, padlen]."""
    x = x + 1  # pad id = 0
    xlen = x.size(0)
    if xlen < padlen:
        new_x = x.new_zeros([padlen], dtype=x.dtype)
        new_x[:xlen] = x
        x = new_x
    return x.unsqueeze(0)


def pad_2d_unsqueeze(x, padlen):
    """Right-pad a [len, dim] feature matrix to [padlen, dim] with ones;
    returns shape [1, padlen, dim]."""
    # dont know why add 1, comment out first
    # x = x + 1 # pad id = 0
    xlen, xdim = x.size()
    if xlen < padlen:
        new_x = x.new_zeros([padlen, xdim], dtype=x.dtype) + 1
        new_x[:xlen, :] = x
        x = new_x
    return x.unsqueeze(0)


def pad_rel_pos_unsqueeze(x, padlen):
    """Shift a [len, len] relative-position matrix by +1 and pad to
    [padlen, padlen]; returns shape [1, padlen, padlen]."""
    x = x + 1
    xlen = x.size(0)
    if xlen < padlen:
        new_x = x.new_zeros([padlen, padlen], dtype=x.dtype)
        new_x[:xlen, :xlen] = x
        x = new_x
    return x.unsqueeze(0)


def pad_attn_bias_unsqueeze(x, padlen):
    """Pad a [len, len] attention-bias matrix to [padlen, padlen]: padded
    columns get -inf (masked keys), padded rows get 0; returns [1, padlen, padlen]."""
    xlen = x.size(0)
    if xlen < padlen:
        new_x = x.new_zeros([padlen, padlen], dtype=x.dtype).fill_(float("-inf"))
        new_x[:xlen, :xlen] = x
        new_x[xlen:, :xlen] = 0
        x = new_x
    return x.unsqueeze(0)


def collator(small_set):
    """Stack a list of pre-collated sample dicts into one Batch plus labels."""
    y = small_set[1]
    xs = [s["x"] for s in small_set[0]]

    num_graph = len(y)
    x = torch.cat(xs)
    attn_bias = torch.cat([s["attn_bias"] for s in small_set[0]])
    rel_pos = torch.cat([s["rel_pos"] for s in small_set[0]])
    heights = torch.cat([s["heights"] for s in small_set[0]])

    return Batch(attn_bias, rel_pos, heights, x), y


def filterDict2Hist(hist_file, filterDict, encoder):
    """Encode up to 3 filters as concatenated histogram-selectivity masks.

    Each filter marks the histogram buckets its predicate can touch; unknown
    columns short-circuit with an all-zero result.
    """
    buckets = len(hist_file["bins"][0])
    empty = np.zeros(buckets - 1)
    ress = np.zeros((3, buckets - 1))
    # iterate over each filter
    for i in range(len(filterDict["colId"])):
        colId = filterDict["colId"][i]
        col = encoder.idx2col[colId]
        if col == "NA":
            ress[i] = empty
            continue
        bins = hist_file.loc[hist_file["column"] == col, "bins"].iloc[0]

        # BUGFIX: opId and val were indexed with [0] instead of [i], so every
        # filter after the first reused the first filter's operator and value.
        opId = filterDict["opId"][i]
        op = encoder.idx2op[opId]

        val = filterDict["val"][i]

        left = 0
        right = len(bins) - 1

        if col in encoder.categorical_vals_mapping:
            # categorical column
            # print("col: ", col, " val: ", val)
            left = right = val
            # print("left: ", left, " right: ", right)
        elif col in encoder.column_min_max_vals:
            # numerical column: un-normalize, then locate the bucket range.
            mini, maxi = encoder.column_min_max_vals[col]
            val_unnorm = val * (maxi - mini) + mini

            for j in range(len(bins)):
                if bins[j] < val_unnorm:
                    left = j
                if bins[j] > val_unnorm:
                    right = j
                    break
        else:
            # BUGFIX: was logging.warning(f"...:", map) — an f-string plus a
            # stray positional argument; use lazy %-style formatting instead.
            logging.warning(
                "Column %s not found in encoder, current encoder map: %s",
                col,
                encoder.column_min_max_vals,
            )
            ress = ress.flatten()
            return ress

        res = np.zeros(len(bins) - 1)

        if op == "eq":
            res[left:right] = 1
        elif op == "lt":
            res[:left] = 1
        elif op == "gt":
            res[right:] = 1
        elif op == "lte":
            res[: right + 1] = 1
        elif op == "gte":
            res[left:] = 1

        ress[i] = res

    ress = ress.flatten()
    return ress


def format_join(plan):
    """Return the plan's joinType, or None when absent."""
    if "joinType" in plan:
        return plan["joinType"]
    else:
        return None


def format_filter(plan):
    """Extract "<field> <op> <value>" strings from a plan's filter expression.

    Handles optional null-on-failure wrappers and CastTypedExpr casts; raises
    ValueError for unsupported filter functions. Returns (filters, alias).
    """
    filters = []
    alias = None
    try:
        if "filter" in plan:
            if plan["filter"]["functionName"] in [
                "eq",
                "lt",
                "gt",
                "gte",
                "lte",
                "like",
            ]:
                if plan["filter"]["functionName"] == "like":
                    # remove the % in the value
                    plan["filter"]["inputs"][1]["value"]["value"] = plan["filter"][
                        "inputs"
                    ][1]["value"]["value"].strip("%")
                if "nullOnFailure" in plan["filter"]["inputs"][0]:
                    if plan["filter"]["inputs"][1]["name"] == "CastTypedExpr":
                        # there is a casting
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["inputs"][0]["value"]["value"],
                        )
                    else:
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["value"]["value"],
                        )
                else:
                    if plan["filter"]["inputs"][1]["name"] == "CastTypedExpr":
                        # there is a casting
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["inputs"][0]["value"]["value"],
                        )
                    else:
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["value"]["value"],
                        )
                filters.append(filter)
            else:
                raise ValueError(
                    "Unsupported filter function: {}".format(
                        plan["filter"]["functionName"]
                    )
                )

        # print("[INFO-format_filter] filter: ", plan["filter"], " filters: ", filters)
    except Exception as e:
        print("[ERROR-format_filter] plan: ", plan["filter"])
        raise e

    return filters, alias
def format_filter(plan):
    """Extract "<field> <op> <value>" strings from a plan's filter expression.

    Handles optional null-on-failure wrappers and CastTypedExpr casts; raises
    ValueError for unsupported filter functions. Returns (filters, alias).
    """
    filters = []
    alias = None
    try:
        if "filter" in plan:
            if plan["filter"]["functionName"] in [
                "eq",
                "lt",
                "gt",
                "gte",
                "lte",
                "like",
            ]:
                if plan["filter"]["functionName"] == "like":
                    # remove the % in the value
                    plan["filter"]["inputs"][1]["value"]["value"] = plan["filter"][
                        "inputs"
                    ][1]["value"]["value"].strip("%")
                if "nullOnFailure" in plan["filter"]["inputs"][0]:
                    if plan["filter"]["inputs"][1]["name"] == "CastTypedExpr":
                        # there is a casting
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["inputs"][0]["value"]["value"],
                        )
                    else:
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["value"]["value"],
                        )
                else:
                    if plan["filter"]["inputs"][1]["name"] == "CastTypedExpr":
                        # there is a casting
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["inputs"][0]["value"]["value"],
                        )
                    else:
                        filter = "{} {} {}".format(
                            plan["filter"]["inputs"][0]["fieldName"],
                            plan["filter"]["functionName"],
                            plan["filter"]["inputs"][1]["value"]["value"],
                        )
                filters.append(filter)
            else:
                raise ValueError(
                    "Unsupported filter function: {}".format(
                        plan["filter"]["functionName"]
                    )
                )

        # print("[INFO-format_filter] filter: ", plan["filter"], " filters: ", filters)
    except Exception as e:
        print("[ERROR-format_filter] plan: ", plan["filter"])
        raise e

    return filters, alias


def extract_ml_operators(node, ml_ops=None, ml_op_dims=None, ml_nested_kernels=None):
    """Collect ML operator names, their dims, and nested torchdnn kernels from
    an expression tree; results are reversed so the first layer comes first."""
    if ml_ops is None:
        ml_ops = []
    if ml_op_dims is None:
        ml_op_dims = []
    if ml_nested_kernels is None:
        ml_nested_kernels = []

    # Define a regex pattern to match ML operators with suffixes
    ml_operator_pattern = re.compile(
        r"^(relu|mat_mul|mat_vector_add|softmax|argmax|batch_norm|torchdnn)"
    )

    # Check if the node contains a "functionName" and matches the pattern
    if "functionName" in node:
        function_name = node["functionName"]
        if ml_operator_pattern.match(function_name):
            ml_ops.append(function_name)
            ml_op_dims.append(node.get("dims", [0]))
            ml_nested_kernels.append(node.get("torchdnn_kernels", []))

    # Recursively check the "inputs" if they exist
    if "inputs" in node and isinstance(node["inputs"], list):
        for child in node["inputs"]:
            extract_ml_operators(child, ml_ops, ml_op_dims, ml_nested_kernels)
    # reverse the order: starting from the first layer
    return ml_ops[::-1], ml_op_dims[::-1], ml_nested_kernels[::-1]


def format_ml_ops(plan):
    """Gather ML operators (plus dims and nested kernels) from every
    projection expression of a plan node."""
    list_ml_ops = []
    list_ml_op_dims = []
    list_ml_nested_kernels = []
    if "projections" in plan:
        for projection in plan["projections"]:
            # Check if the node contains a "functionName" and matches the pattern
            if "functionName" in projection:
                ml_ops, ml_op_dims, ml_nested_kernels = extract_ml_operators(projection)
                list_ml_ops.extend(ml_ops)
                list_ml_op_dims.extend(ml_op_dims)
                list_ml_nested_kernels.extend(ml_nested_kernels)

    return list_ml_ops, list_ml_op_dims, list_ml_nested_kernels


def format_ml_op_name(op_name):
    """Map a suffixed operator name (e.g. "relu_3") to its canonical family;
    unknown names log a warning and map to "NA"."""
    if "mat_mul" in op_name:
        return "mat_mul"
    elif "mat_vector_add" in op_name:
        return "mat_vector_add"
    elif "relu" in op_name:
        return "relu"
    elif "softmax" in op_name:
        return "softmax"
    elif "argmax" in op_name:
        return "argmax"
    elif "batch_norm" in op_name:
        return "batch_norm"
    elif "torchdnn" in op_name:
        return "torchdnn"
    else:
        logging.warning(f"[MLOP-Format] Unsupported ML operator: {op_name}")
        return "NA"


def compute_torchdnn_computation_complexity(ml_nested_op_dims, ml_nested_ops):
    """Estimate a torchdnn expression's cost: each op contributes the product
    (MatMul) or the first (others) of its (dim, dim) pair in the flat dims list."""
    complexity = 0
    for idx, op in enumerate(ml_nested_ops):
        if op == "MatMul":
            complexity += ml_nested_op_dims[idx * 2] * ml_nested_op_dims[idx * 2 + 1]
        elif op == "MatAdd":
            complexity += ml_nested_op_dims[idx * 2]
        elif op == "ReLU":
            complexity += ml_nested_op_dims[idx * 2]
        elif op == "Softmax":
            complexity += ml_nested_op_dims[idx * 2]
        elif op == "Argmax":
            complexity += ml_nested_op_dims[idx * 2]
        elif op == "BatchNorm":
            complexity += ml_nested_op_dims[idx * 2]
        else:
            logging.warning(
                f"[TorchDNN-Complex.-Comput.] Unsupported ML operator: {op}"
            )
    return complexity


def parse_predicate(predicate):
    """Split "column OP value" into its three parts ("store_id" is renamed to
    "store"); raises ValueError when no known operator is found."""
    # Define known operators (longer ones first to prevent partial match issues)
    operators = ['!=', '>=', '<=', '=', '>', '<', 'like', 'eq', 'gt', 'lt', 'gte', 'lte']
    # Sort operators by length to match longest operator first
    operators = sorted(operators, key=len, reverse=True)
    # BUGFIX: alphabetic operators ("gt", "lt", "eq", ...) previously matched
    # inside identifiers (e.g. "mgt_col = 7" split at the embedded "gt").
    # Guard them with letter/underscore lookarounds; symbolic operators are
    # left as-is.
    alternatives = []
    for op in operators:
        escaped = re.escape(op)
        if op.isalpha():
            escaped = r'(?<![A-Za-z_])' + escaped + r'(?![A-Za-z_])'
        alternatives.append(escaped)
    pattern = r'\s*(' + '|'.join(alternatives) + r')\s*'

    match = re.split(pattern, predicate, maxsplit=1)
    if len(match) == 3:
        column, operator, value = match
        column = column.strip()
        operator = operator.strip()
        value = value.strip()
        if column == "store_id":
            column = "store"
        return column, operator, value
    else:
        raise ValueError(f"Could not parse predicate: {predicate}")

# --- class Encoder continues on the following mangled patch lines; its head is
# --- preserved here as a comment because the remainder is outside this chunk.
# class Encoder:
#     def __init__(
#         self,
#         column_min_max_vals,
#         categorical_vals_mapping,
#         col2idx,
#         op2idx={"NA": 0, "lt": 1, "eq": 2, "gt": 3, "lte": 4, "gte": 5, "like": 6},
#         mlop2idx={
#             "NA": 0,
#             "mat_mul": 1,
#             "mat_vector_add": 2,
#             "relu": 3,
#             "softmax": 4,
#             "argmax": 5,
#             "batch_norm": 6,
"torchdnn": 7, + }, + ): + self.column_min_max_vals = column_min_max_vals + self.categorical_vals_mapping = categorical_vals_mapping + self.col2idx = col2idx + self.op2idx = op2idx + + idx2col = {} + for k, v in col2idx.items(): + idx2col[v] = k + self.idx2col = idx2col + self.idx2op = {v: k for k, v in op2idx.items()} + self.mlop2idx = mlop2idx + self.idx2mlop = {v: k for k, v in mlop2idx.items()} + self.op_complexity_min_max = 1, 100000 * 2048 + + # store the minmium value and maximum value of rows and cols + self.table_rows_min_max = 10, 2500000 + self.table_cols_min_max = 1, 500 + + self.type2idx = {} + self.idx2type = {} + self.join2idx = {} + self.idx2join = {} + + self.table2idx = {"NA": 0} + self.idx2table = {0: "NA"} + + def set_column_normalizer(self, column_min_max_vals, categorical_vals_mapping): + self.column_min_max_vals = column_min_max_vals + self.categorical_vals_mapping = categorical_vals_mapping + + def normalize_table_stats(self, num_rows, num_cols): + num_rows_normalized = (num_rows - self.table_rows_min_max[0]) / ( + self.table_rows_min_max[1] - self.table_rows_min_max[0] + ) + num_cols_normalized = (num_cols - self.table_cols_min_max[0]) / ( + self.table_cols_min_max[1] - self.table_cols_min_max[0] + ) + if num_rows_normalized > 1: + logging.warning( + f"Table rows {num_rows} is greater than max value {self.table_rows_min_max[1]}" + ) + if num_cols_normalized > 1: + logging.warning( + f"Table cols {num_cols} is greater than max value {self.table_cols_min_max[1]}" + ) + return num_rows_normalized, num_cols_normalized + + def encode_ml_ops(self, ml_ops, ml_op_dims, ml_nested_ops): + # # version 1 + # most_complicate_op = "NA" + # least_complicate_op = "NA" + # op_complexity_min = self.op_complexity_min_max[1] + # op_complexity_max = self.op_complexity_min_max[0] + # num_ops = len(ml_ops) + # if len(ml_ops) == 0: + # return 0, self.mlop2idx["NA"], self.mlop2idx["NA"], 0.0, 0.0, + # for idx, op in enumerate(ml_ops): + # op_complexity = 
np.prod(ml_op_dims[idx]) + # if op_complexity > op_complexity_max: + # op_complexity_max = op_complexity + # most_complicate_op = self.mlop2idx[format_ml_op_name(op)] + # if op_complexity < op_complexity_min: + # op_complexity_min = op_complexity + # least_complicate_op = self.mlop2idx[format_ml_op_name(op)] + + # # normalize op_complexity + # op_complexity_min = (op_complexity_min - self.op_complexity_min_max[0]) / (self.op_complexity_min_max[1] - self.op_complexity_min_max[0]) + # op_complexity_max = (op_complexity_max - self.op_complexity_min_max[0]) / (self.op_complexity_min_max[1] - self.op_complexity_min_max[0]) + + # return num_ops, least_complicate_op, most_complicate_op, op_complexity_min, op_complexity_max + + # version 2 + most_complicate_op = "NA" + least_complicate_op = "NA" + op_complexity_min = self.op_complexity_min_max[1] + op_complexity_max = self.op_complexity_min_max[0] + num_ops = len(ml_ops) + list_op_complexity = np.zeros(50) # use fixed lens + if len(ml_ops) == 0: + return 0, self.mlop2idx["NA"], self.mlop2idx["NA"], list_op_complexity + for idx, op in enumerate(ml_ops): + op_complexity = 0 + if "torchdnn" in op: + # compute the complexity of the nested kernels if it is a torchdnn operation + op_complexity = compute_torchdnn_computation_complexity( + ml_nested_op_dims=ml_op_dims[idx], ml_nested_ops=ml_nested_ops[idx] + ) + else: + op_complexity = np.prod(ml_op_dims[idx]) + if op_complexity == 0: + # infer the complexity of the operation from last op + # use the last dimension to estimate the complexity + if idx > 0: + op_complexity = ml_op_dims[idx - 1][-1] + + if op_complexity > op_complexity_max: + most_complicate_op = format_ml_op_name(op) + if op_complexity < op_complexity_min: + least_complicate_op = format_ml_op_name(op) + list_op_complexity[idx] = op_complexity + + # normalize op_complexity + list_op_complexity = (list_op_complexity - self.op_complexity_min_max[0]) / ( + self.op_complexity_min_max[1] - self.op_complexity_min_max[0] + 
) + + most_complicate_op = self.mlop2idx[most_complicate_op] + least_complicate_op = self.mlop2idx[least_complicate_op] + + return num_ops, least_complicate_op, most_complicate_op, list_op_complexity + + def normalize_val(self, column, val, log=False): + # if column is categorical + if column in self.categorical_vals_mapping: + if val in self.categorical_vals_mapping[column]: + return self.categorical_vals_mapping[column][val] + else: + logging.warning(f"[normalize_val] Value {val} not found in mapping for column {column}") + return 0 + # if column is numerical + else: + mini, maxi = self.column_min_max_vals[column] + + val_norm = 0.0 + if maxi > mini: + val_norm = (float(val) - mini) / (maxi - mini) + else: + raise ValueError( + f"[normalize_val] Min value {mini} is greater than max value {maxi} for column {column}" + ) + return val_norm + + def encode_filters(self, filters=[], alias=None): + ## filters: list of dict + + # print(filt, alias) + if len(filters) == 0: + return { + "colId": [self.col2idx["NA"]], + "opId": [self.op2idx["NA"]], + "val": [0.0], + } + res = {"colId": [], "opId": [], "val": []} + for filt in filters: + filt = "".join(c for c in filt if c not in "()") + fs = filt.split(" AND ") + for f in fs: + # print(filters) + # col, op, val = f.split(" ") + # improved version + col, op, val = parse_predicate(f) + if alias is not None: + column = alias + "." + col + else: + column = col + # column = alias + '.' 
+ col + # print(f) + if column not in self.col2idx: + # create a new label for it + self.col2idx[column] = len(self.col2idx) + self.idx2col[self.col2idx[column]] = column + # logging.warning( + # f"[encode_filters] Column {column} not found in encoder.col2idx" + # ) + # return { + # "colId": [self.col2idx["NA"]], + # "opId": [self.op2idx["NA"]], + # "val": [0.0], + # } + res["colId"].append(self.col2idx[column]) + res["opId"].append(self.op2idx[op]) + res["val"].append(self.normalize_val(column, val)) + return res + + def encode_join(self, join): + if join not in self.join2idx: + self.join2idx[join] = len(self.join2idx) + self.idx2join[self.join2idx[join]] = join + return self.join2idx[join] + + def encode_table(self, table): + if table not in self.table2idx: + self.table2idx[table] = len(self.table2idx) + self.idx2table[self.table2idx[table]] = table + return self.table2idx[table] + + def encode_type(self, nodeType): + if nodeType not in self.type2idx: + self.type2idx[nodeType] = len(self.type2idx) + self.idx2type[self.type2idx[nodeType]] = nodeType + return self.type2idx[nodeType] + + +class TreeNode: + def __init__( + self, + nodeType, + typeId, + filt, + card, + join, + join_str, + filterDict, + ml_model_embeds=None, + ): + self.nodeType = nodeType + self.typeId = typeId + self.filter = filt + + self.table = "NA" + self.table_id = 0 + self.query_id = None ## so that sample bitmap can recognise + + self.join = join + self.join_str = join_str + self.card = card #'Actual Rows' + self.children = [] + self.rounds = 0 + self.agg_keys = [] + + self.filterDict = filterDict + + # only be set at TableScan node + self.num_rows = 0 + self.num_cols = 0 + + # only be set when ML ops are invoked + self.ml_model_embeds = ml_model_embeds + # self.ml_ops = [] + # self.ml_op_dims = [] + # self.ml_nested_ops = [] + + self.parent = None + + self.feature = None + + def addChild(self, treeNode): + self.children.append(treeNode) + + def __str__(self): + # TODO: add model computation 
    def __repr__(self):
        # Delegate to __str__ so containers of nodes print readably.
        return self.__str__()

    @staticmethod
    def print_nested(node, indent=0):
        """Pretty-print the plan tree, indenting two dashes per depth level."""
        print(
            "--" * indent
            + "{} with table: {}, filter: {}, {}, {} children".format(
                node.nodeType,
                node.table,
                node.filter,
                node.join_str,
                len(node.children),
            )
        )
        for k in node.children:
            TreeNode.print_nested(k, indent + 1)


class WeisfeilerLehmanQueryEncoder:
    """Assigns Weisfeiler-Lehman labels to query-plan trees so that
    structurally similar queries can be paired for contrastive training."""

    def __init__(self, range_percent=0.1, num_iterations=2, model_embed_dim=192, embed_sim_threshold=0.8):
        # Maintain groups of labels keyed by operator type; each value is a
        # set of unique node representations for that node type.
        # NOTE(review): requires `from collections import defaultdict`, which is
        # not visible among this module's imports — confirm it is imported.
        self.label_groups = defaultdict(set)
        self.range_percent = range_percent
        self.num_iterations = num_iterations
        self.embed_sim_threshold = embed_sim_threshold
        self.unique_labels = set()  # To track unique labels at final iteration
        # Inner-product index over L2-normalized embeddings = cosine similarity.
        # NOTE(review): dimension is hard-coded to 192 and the model_embed_dim
        # parameter is ignored — confirm whether it should be used here.
        self.model_embedding_index = faiss.index_factory(
            192, "Flat", faiss.METRIC_INNER_PRODUCT
        )
        self.model_embedding_idx = 0
the label + table_id = node.table_id + label = f"{node_type}_{table_id}" + elif node_type == "FilterNode": + # For Filter nodes, we use the filter condition as the label + filter_col_id = node.filterDict['colId'][0] + filter_op_id = node.filterDict['opId'][0] + filter_op_val = node.filterDict['val'][0] + label = f"{node_type}_{filter_col_id}_{filter_op_id}_{filter_op_val}" + elif node_type == "ProjectNode": + # For Project nodes, we use the projection columns as the label + label = node_type + elif node_type == "HashJoinNode" or node_type == "NestedLoopJoinNode": + # For Join nodes, we use the join condition as the label + label = f"{node_type}_{node.join}" + elif node_type == "AggregationNode": + # For Aggregation nodes, we use the aggregation columns as the label + agg_cols = "_".join([str(col) for col in node.agg_keys]) + label = f"{node_type}_{agg_cols}" + else: + label = node_type + + node_groups.add(label) + + return label + + def hash_label(label): + """Deterministically hash a string label into a fixed string.""" + return hashlib.md5(label.encode()).hexdigest() + + def get_wl_subtree_features(self, root): + """ + Extract WL subtree features from a model tree. 
+ """ + label_dict = {} # node -> current label + label_history = {} # node -> list of labels at each iteration + node_list = [] + + # Step 1: Traverse tree to initialize labels + def dfs(node): + label = self.inital_node_label(node) + # label = str(node.ml_op_type) # Initial version, using only the type + label_dict[node] = label + label_history[node] = [label] + node_list.append(node) + for child in node.children: + dfs(child) + dfs(root) + + # Step 2: WL iterations + for i in range(self.num_iterations): + new_labels = {} + for node in node_list: + child_labels = sorted([label_dict[child] for child in node.children]) + neighbor_str = label_dict[node] + "_" + "_".join(child_labels) + # new_label = hash_label(neighbor_str) + new_label = neighbor_str + new_labels[node] = new_label + label_history[node].append(new_label) + label_dict = new_labels + + # Step 3: Count all labels across iterations and add labels to unique labels set + feature_counter = defaultdict(int) + for node in node_list: + for lbl in label_history[node]: + feature_counter[lbl] += 1 + self.unique_labels.update(label_history[node]) + + return feature_counter + + def assign_init_label_for_dataset(self, dataset): + """ + Encode all trees in the dataset and return a list of feature dictionaries. 
+ + Args: + dataset : ModelComputationGraphDataset + """ + for i in range(len(dataset)): + root_node = dataset.rootNodes[i] + wl_feature = self.get_wl_subtree_features(root_node) + dataset.wl_features.append(wl_feature) + + def obtain_wl_feature_for_dataset(self, dataset): + self.assign_init_label_for_dataset(dataset) + wl_sorted_labels = sorted(self.unique_labels) + dataset.wl_feature_vectors = np.zeros((len(dataset), len(wl_sorted_labels)), dtype=np.float32) + wl_labels2idx = {label: idx for idx, label in enumerate(wl_sorted_labels)} + + for i in tqdm(range(len(dataset))): + wl_feature = dataset.wl_features[i] + for label, count in wl_feature.items(): + if label in wl_labels2idx: + dataset.wl_feature_vectors[i, wl_labels2idx[label]] = count + + def construct_similar_dissimilar_pairs_for_dataset(self, dataset, sim_threshold=0.8, dissim_threshold=0.2): + faiss_index = faiss.index_factory( + dataset.wl_feature_vectors.shape[1], "Flat", faiss.METRIC_INNER_PRODUCT + ) + data_to_add = dataset.wl_feature_vectors.copy() + faiss.normalize_L2(data_to_add) + faiss_index.add(data_to_add) + + query_vectors = dataset.wl_feature_vectors[:].copy() + faiss.normalize_L2(query_vectors) + D, I = faiss_index.search(query_vectors, len(dataset)) + for i in range(len(dataset)): + max_sim = D[i][1] + min_sim = D[i][-1] + max_sim_idx = I[i][1] + min_sim_idx = I[i][-1] + if max_sim > sim_threshold: + dataset.similar_query_idx[i] = max_sim_idx + else: + dataset.similar_query_idx[i] = i + if min_sim < dissim_threshold: + dataset.dissimilar_query_idx[i] = min_sim_idx + else: + if i > 0: + dataset.dissimilar_query_idx[i] = dataset.similar_query_idx[i - 1] + else: + dataset.dissimilar_query_idx[i] = random.randint(0, len(dataset) - 1) + warnings.warn(f"[WL-Encoder] No dissimilar query found for model {i}, minimum similarity: {min_sim}") \ No newline at end of file diff --git a/velox/optimizer/python/query2Vec/dataset.py b/velox/optimizer/python/query2Vec/dataset.py new file mode 100644 index 
def get_numerical_min_max_mapping(df_stat):
    """Build {column -> (min, max)} from the first/last histogram bin edges."""
    return {
        row["column"]: (row["bins"][0], row["bins"][-1])
        for _, row in df_stat.iterrows()
        if row["type"] == "Numerical"
    }


def get_categorical_mapping(df_stat):
    """Build {column -> {value -> code}} for every categorical column.

    BUG FIX: the inner ``enumerate`` reused (shadowed) the outer ``idx`` row
    index; codes are now assigned with a dedicated variable. Note that only
    empty strings are dropped from the bins (a ``None`` bin survives as the
    string "None") — the old comment claimed otherwise.
    """
    categorical_mapping = {}
    for _, row in df_stat.iterrows():
        if row["type"] != "Categorical":
            continue
        bins = [str(x) for x in row["bins"] if x != ""]
        categorical_mapping[row["column"]] = {
            val: code for code, val in enumerate(bins)
        }
    return categorical_mapping


def calculate_height(adj_list, tree_size):
    """Assign each node its height: leaves get 0, a parent becomes ready (and
    gets the next level) only once all of its children are evaluated."""
    if tree_size == 1:
        return np.array([0])

    adj_list = np.array(adj_list)
    node_ids = np.arange(tree_size, dtype=int)
    node_order = np.zeros(tree_size, dtype=int)
    uneval_nodes = np.ones(tree_size, dtype=bool)

    parent_nodes = adj_list[:, 0]
    child_nodes = adj_list[:, 1]

    n = 0
    while uneval_nodes.any():
        # A parent is "unready" while any of its children are unevaluated.
        uneval_mask = uneval_nodes[child_nodes]
        unready_parents = parent_nodes[uneval_mask]
        node2eval = uneval_nodes & ~np.isin(node_ids, unready_parents)
        node_order[node2eval] = n
        uneval_nodes[node2eval] = False
        n += 1
    return node_order


def node2dict(treeNode):
    """Convert a TreeNode tree into the tensor dict consumed by pre_collate."""
    adj_list, num_child, features = topo_sort(treeNode)
    heights = calculate_height(adj_list, len(features))

    return {
        "features": torch.FloatTensor(np.array(features)),
        "heights": torch.LongTensor(heights),
        "adjacency_list": torch.LongTensor(np.array(adj_list)),
    }


def topo_sort(root_node):
    """BFS the tree; return (parent->child edges, child counts, features)
    with node ids assigned in breadth-first order (root = 0)."""
    adj_list = []  # from parent to children
    num_child = []
    features = []

    to_visit = deque()
    to_visit.append((0, root_node))
    next_id = 1
    while to_visit:
        idx, node = to_visit.popleft()
        features.append(node.feature)
        num_child.append(len(node.children))
        for child in node.children:
            to_visit.append((next_id, child))
            adj_list.append((idx, next_id))
            next_id += 1

    return adj_list, num_child, features


def pad_2d_unsqueeze(x, padlen):
    """Right-pad a (len, dim) tensor to (padlen, dim) and add a batch dim.

    NOTE(review): the padding value is 1 (``new_zeros + 1``); the original
    author questioned this in a comment — kept as-is for compatibility.
    """
    xlen, xdim = x.size()
    if xlen < padlen:
        new_x = x.new_zeros([padlen, xdim], dtype=x.dtype) + 1
        new_x[:xlen, :] = x
        x = new_x
    return x.unsqueeze(0)


def pad_rel_pos_unsqueeze(x, padlen):
    """Shift relative positions by +1 (0 becomes pad) and pad to a square."""
    x = x + 1
    xlen = x.size(0)
    if xlen < padlen:
        new_x = x.new_zeros([padlen, padlen], dtype=x.dtype)
        new_x[:xlen, :xlen] = x
        x = new_x
    return x.unsqueeze(0)


def pad_attn_bias_unsqueeze(x, padlen):
    """Pad an attention-bias matrix: padded key columns get -inf, while padded
    query rows may still attend to real keys (set back to 0)."""
    xlen = x.size(0)
    if xlen < padlen:
        new_x = x.new_zeros([padlen, padlen], dtype=x.dtype).fill_(float("-inf"))
        new_x[:xlen, :xlen] = x
        new_x[xlen:, :xlen] = 0
        x = new_x
    return x.unsqueeze(0)


def floyd_warshall_rewrite(adjacency_matrix):
    """All-pairs shortest paths on a 0/1 adjacency matrix; missing edges are
    seeded with 60, an effective 'unreachable' sentinel for these tree sizes."""
    (nrows, ncols) = adjacency_matrix.shape
    assert nrows == ncols
    M = adjacency_matrix.copy().astype("long")
    for i in range(nrows):
        for j in range(ncols):
            if i == j:
                M[i][j] = 0
            elif M[i][j] == 0:
                M[i][j] = 60

    for k in range(nrows):
        for i in range(nrows):
            for j in range(nrows):
                M[i][j] = min(M[i][j], M[i][k] + M[k][j])
    return M
def pad_1d_unsqueeze(x, padlen):
    """Shift ids by +1 (0 becomes the pad id) and right-pad to ``padlen``."""
    x = x + 1  # pad id = 0
    xlen = x.size(0)
    if xlen < padlen:
        padded = x.new_zeros([padlen], dtype=x.dtype)
        padded[:xlen] = x
        x = padded
    return x.unsqueeze(0)


def pre_collate(the_dict, max_node=50, rel_pos_max=20):
    """Pad one tree's features/adjacency into fixed-size attention inputs.

    Returns a dict of batched tensors: node features ``x``, the attention
    bias mask, pairwise shortest-path distances ``rel_pos``, and node heights.
    """
    x = pad_2d_unsqueeze(the_dict["features"], max_node)
    N = len(the_dict["features"])
    attn_bias = torch.zeros([N + 1, N + 1], dtype=torch.float)

    edge_index = the_dict["adjacency_list"].t()
    if len(edge_index) == 0:
        # Single-node tree: trivial 1x1 distance matrix.
        shortest_path_result = np.array([[0]])
        adj = torch.tensor([[0]]).bool()
    else:
        adj = torch.zeros([N, N], dtype=torch.bool)
        adj[edge_index[0, :], edge_index[1, :]] = True
        shortest_path_result = floyd_warshall_rewrite(adj.numpy())

    rel_pos = torch.from_numpy(shortest_path_result).long()

    # Nodes farther apart than rel_pos_max cannot attend to each other.
    attn_bias[1:, 1:][rel_pos >= rel_pos_max] = float("-inf")

    attn_bias = pad_attn_bias_unsqueeze(attn_bias, max_node + 1)
    rel_pos = pad_rel_pos_unsqueeze(rel_pos, max_node)
    heights = pad_1d_unsqueeze(the_dict["heights"], max_node)

    return {"x": x, "attn_bias": attn_bias, "rel_pos": rel_pos, "heights": heights}


def read_and_process_histograms(path):
    """Load a '|'-separated histogram dump into a DataFrame, parsing the
    stringified ``freqs``/``bins`` columns into Python lists.

    BUG FIX: list cells are now stored with ``DataFrame.at`` — assigning a
    list into a single cell via ``.loc`` raises "Must have equal len keys and
    value" on current pandas.
    """
    df_stat_columns = ["table", "column", "table_column", "type", "freqs", "bins"]
    df_stat = pd.read_csv(path, sep="|", header=None, names=df_stat_columns)
    for idx, row in df_stat.iterrows():
        df_stat.at[idx, "freqs"] = ast.literal_eval(row["freqs"])
        bins = ast.literal_eval(row["bins"])
        if row["type"] == "Numerical":
            bins = [float(x) for x in bins]
        df_stat.at[idx, "bins"] = bins
    return df_stat


def remove_type_attribute(obj):
    """Recursively strip "type"/"outputType" keys and null "functionName"
    entries from a (possibly nested) plan JSON object, in place."""
    if isinstance(obj, dict):
        # list() snapshots the keys so deleting during iteration is safe.
        for key in list(obj.keys()):
            if key == "type" or key == "outputType":
                del obj[key]
            elif key == "functionName" and obj[key] is None:
                del obj[key]
            else:
                remove_type_attribute(obj[key])
    elif isinstance(obj, list):
        for item in obj:
            remove_type_attribute(item)
    def js_node2dict(self, idx, plan):
        """Convert one parsed plan JSON into the padded tensor dict used by the
        model; also records the root TreeNode for later WL feature extraction."""
        try:
            # load the stats for each query
            # self.encoder.set_column_normalizer(
            #     self.encoder_min_max_map[idx], self.encoder_cate_map[idx]
            # )
            treeNode = self.traverse_query_plan(plan, idx, self.encoder)
            self.rootNodes.append(treeNode)
            _dict = node2dict(treeNode)
            collated_dict = pre_collate(_dict)
        except Exception as e:
            # Surface which query failed before re-raising for debugging.
            print("Error in js_node2dict idx: ", idx)
            raise e

        # self.treeNodes.clear()
        # del self.treeNodes[:]

        return collated_dict

    def __len__(self):
        # Number of queries in the dataset (fixed at construction time).
        return self.length

    def __getitem__(self, idx):
        # Returns (pre-collated tensor dict, (normalized cost label, card label)).
        return self.collated_dicts[idx], (self.cost_labels[idx], self.card_labels[idx])
len(model_embeds) == 1 + return model_embeds[0] + else: + return np.zeros(192) + + def traverse_query_plan(self, plan, idx, encoder): + # traverse plan + nodeType = plan["name"] + nodeTypeId = encoder.encode_type(nodeType) + card = None + filters, alias = format_filter(plan) + join = None + joinType = encoder.encode_join(format_join(plan)) + filters_encoded = encoder.encode_filters(filters, alias) + if not filters_encoded["colId"]: + raise ValueError( + "failed filters_encoded: ", idx, filters_encoded, filters, alias + ) + # TODO: Incorporate the Model2Vec + # model_embeds = np.random.random(192) + + ml_model_embeds = self.get_node_ml_embed(plan) + # ml_ops, ml_op_dims, ml_nested_ops = format_ml_ops(plan) + root = TreeNode( + nodeType, + nodeTypeId, + filters, + card, + joinType, + join, + filters_encoded, + ml_model_embeds=ml_model_embeds, + ) + if root.nodeType == "AggregationNode": + list_agg_keys = [] + for groupKey in plan.get("groupingKeys", []): + list_agg_keys.append(groupKey["fieldName"]) + sorted(list_agg_keys) + root.agg_keys = list_agg_keys + self.treeNodes.append(root) + if "tableName" in plan: + root.table = plan["tableName"] + root.table_id = encoder.encode_table(plan["tableName"]) + root.num_rows, root.num_cols = plan["tableStats"] + + # print("table: ", root.table, ", table_id: ", root.table_id) + # root.ml_model_embeds = model_embeds + # if ml_ops: + # root.ml_ops = ml_ops + # root.ml_op_dims = ml_op_dims + # root.ml_nested_ops = ml_nested_ops + root.query_id = idx # need to change + root.feature = node2feature( + root, encoder, self.query_stats, None + ) # last is table_sample + + if "sources" in plan: + for subplan in plan["sources"]: + subplan["parent"] = plan + node = self.traverse_query_plan(subplan, idx, encoder) + node.parent = root + root.addChild(node) + return root + + +def node2feature(node, encoder, hist_df, table_sample): + num_filter = len(node.filterDict["colId"]) + # number of filters is less than 3, pad with zeros + pad = 
np.zeros((3, 3 - num_filter)) + filts = np.array(list(node.filterDict.values())) # cols, ops, vals + ## 3x3 -> 9, get back with reshape 3,3 + filts = np.concatenate((filts, pad), axis=1).flatten() + mask = np.zeros(3) + mask[:num_filter] = 1 + type_join = np.array([node.typeId, node.join]) + # use the history histrogram file and the query filter to get the histogram + # print("node.filterDict: ", node.filterDict) + hists = filterDict2Hist(hist_df, node.filterDict, encoder) + + # encode ml ops + # version 1 + # num_ops, least_complicate_op, most_complicate_op, op_complixity_min, op_complixity_max = encoder.encode_ml_ops(node.ml_ops, node.ml_op_dims) + # ml_features = np.array([num_ops, least_complicate_op, most_complicate_op, list_op_complixity]) + + # version 2 + # num_ops, least_complicate_op, most_complicate_op, list_op_complixity = ( + # encoder.encode_ml_ops(node.ml_ops, node.ml_op_dims, node.ml_nested_ops) + # ) + # ml_features = np.concatenate( + # ([num_ops, least_complicate_op, most_complicate_op], list_op_complixity) + # ) + # print("most_complicate_op: ", most_complicate_op, " op_complixity_max: ", op_complixity_max, " least_complicate_op: ", least_complicate_op, " op_complixity_min: ", op_complixity_min) + + ml_features = node.ml_model_embeds + # table stats + num_rows, num_cols = encoder.normalize_table_stats(node.num_rows, node.num_cols) + # print("num_rows: ", num_rows, " num_cols: ", num_cols, " node.num_rows: ", node.num_rows, " node.num_cols: ", node.num_cols) + table_stat_features = np.array([num_rows, num_cols]) + + # Retrive the sample from table_sample + # The 1000 bits will only be set on the TableScan node + # table, bitmap, 1 + 1000 bits + table = np.array([node.table_id]) + # TODO + # sample = np.zeros(1000) + # if node.table_id == 0: + # sample = np.zeros(1000) + # else: + # # node.table which is the tableName or Relation Name in query tree + # sample = table_sample[node.query_id][node.table] + + # return 
class Prediction(nn.Module):
    """MLP prediction head squashing a feature vector to a (0, 1) scalar.

    Optionally applies a contracted middle block (``mid_layers``) with a
    residual connection (``res_con``) before the final sigmoid output.
    """

    def __init__(
        self, in_feature=69, hid_units=256, contract=1, mid_layers=True, res_con=True
    ):
        super(Prediction, self).__init__()
        self.mid_layers = mid_layers
        self.res_con = res_con

        self.out_mlp1 = nn.Linear(in_feature, hid_units)
        self.mid_mlp1 = nn.Linear(hid_units, hid_units // contract)
        self.mid_mlp2 = nn.Linear(hid_units // contract, hid_units)
        self.out_mlp2 = nn.Linear(hid_units, 1)

    def forward(self, features):
        """Map (batch, in_feature) -> (batch, 1) sigmoid scores."""
        hidden = F.relu(self.out_mlp1(features))
        if self.mid_layers:
            bottleneck = F.relu(self.mid_mlp2(F.relu(self.mid_mlp1(hidden))))
            hidden = hidden + bottleneck if self.res_con else bottleneck
        return torch.sigmoid(self.out_mlp2(hidden))
    # input: B by (1 type + 1 join + 9 filters + 3 mask + model embed + hists + table block)
    def forward(self, feature):
        """Split the flat per-node feature vector produced by node2feature and
        embed each component, returning the concatenated node embedding.

        NOTE(review): with use_hist=False the concatenation has one fewer
        embed_size chunk, yet self.project is sized identically in both
        branches of __init__ — confirm the no-hist path is actually exercised.
        """
        if self.use_sample:
            # Layout with the 1000-bit table-sample bitmap appended.
            (
                typeId,
                joinId,
                filtersId,
                filtersMask,
                modelEmbed,
                hists,
                table_sample,
            ) = torch.split(
                feature,
                (1, 1, 9, 3, self.model_embed_size, self.bin_number * 3, 1001 + 2),
                dim=-1,
            )
        else:
            # Layout without samples: table id + 2 normalized table stats.
            (
                typeId,
                joinId,
                filtersId,
                filtersMask,
                modelEmbed,
                hists,
                table_sample,
            ) = torch.split(
                feature,
                (1, 1, 9, 3, self.model_embed_size, self.bin_number * 3, 1 + 2),
                dim=-1,
            )

        typeEmb = self.getType(typeId)  # embed
        joinEmb = self.getJoin(joinId)  # embed
        filterEmbed = self.getFilter(filtersId, filtersMask)
        modelLinearEmbed = self.getModelEmbed(modelEmbed)  # embed

        histEmb = self.getHist(hists, filtersMask)
        tableEmb = self.getTable(table_sample)

        if self.use_hist:
            final = torch.cat(
                (
                    typeEmb,
                    filterEmbed,
                    joinEmb,
                    modelLinearEmbed,
                    tableEmb,
                    histEmb,
                ),
                dim=1,
            )
        else:
            final = torch.cat(
                (
                    typeEmb,
                    filterEmbed,
                    joinEmb,
                    modelLinearEmbed,
                    tableEmb,
                ),
                dim=1,
            )
        final = F.leaky_relu(self.project(final))

        return final

    def getModelEmbed(self, modelEmbed):
        # Project the Model2Vec embedding down to embed_size.
        emb = self.linearModelEmbed(modelEmbed.float())
        return emb.squeeze(1)

    def getType(self, typeId):
        # Node-type id -> learned embedding.
        emb = self.typeEmbed(typeId.long())

        return emb.squeeze(1)
= torch.split(table_sample, (1, 2), dim=-1) + emb = self.tableEmbed(table.long()).squeeze(1) + emb += self.linearSample(sample) + return emb + + def getJoin(self, joinId): + emb = self.joinEmbed(joinId.long()) + + return emb.squeeze(1) + + def getHist(self, hists, filtersMask): + # batch * 50 * 3 + histExpand = hists.view(-1, self.bin_number, 3).transpose(1, 2) + + emb = self.linearHist(histExpand) + emb[~filtersMask.bool()] = 0.0 # mask out space holder + + ## avg by # of filters + num_filters = torch.sum(filtersMask, dim=1) + total = torch.sum(emb, dim=1) + avg = total / num_filters.view(-1, 1) + + return avg + + def getFilter(self, filtersId, filtersMask): + ## get Filters, then apply mask + filterExpand = filtersId.view(-1, 3, 3).transpose(1, 2) + colsId = filterExpand[:, :, 0].long() + opsId = filterExpand[:, :, 1].long() + vals = filterExpand[:, :, 2].unsqueeze(-1) # b by 3 by 1 + + # b by 3 by embed_dim + + col = self.columnEmbed(colsId) + op = self.opEmbed(opsId) + + concat = torch.cat((col, op, vals), dim=-1) + concat = F.leaky_relu(self.linearFilter(concat)) + concat = F.leaky_relu(self.linearFilter2(concat)) + + ## apply mask + concat[~filtersMask.bool()] = 0.0 + + ## avg by # of filters + num_filters = torch.sum(filtersMask, dim=1) + total = torch.sum(concat, dim=1) + avg = total / num_filters.view(-1, 1) + + return avg + + +# def get_output_size(self): +# size = self.embed_size * 5 + self.embed_size // 8 + 1 +# return size + + +class QueryFormer(nn.Module): + def __init__( + self, + emb_size=32, + model_embed_size=192, + ffn_dim=32, + head_size=8, + dropout=0.1, + attention_dropout_rate=0.1, + n_layers=8, + use_sample=True, + use_hist=True, + bin_number=50, + pred_hid=256, + ): + + super(QueryFormer, self).__init__() + if use_hist: + hidden_dim = emb_size * 6 + emb_size // 8 + 1 + else: + hidden_dim = emb_size * 5 + emb_size // 8 + 1 + self.hidden_dim = hidden_dim + self.head_size = head_size + self.use_sample = use_sample + self.use_hist = use_hist + 
self.model_embed_size = model_embed_size + + self.rel_pos_encoder = nn.Embedding(64, head_size, padding_idx=0) + + self.height_encoder = nn.Embedding(64, hidden_dim, padding_idx=0) + + self.input_dropout = nn.Dropout(dropout) + encoders = [ + EncoderLayer( + hidden_dim, ffn_dim, dropout, attention_dropout_rate, head_size + ) + for _ in range(n_layers) + ] + self.layers = nn.ModuleList(encoders) + + self.final_ln = nn.LayerNorm(hidden_dim) + + self.super_token = nn.Embedding(1, hidden_dim) + self.super_token_virtual_distance = nn.Embedding(1, head_size) + + self.embbed_layer = FeatureEmbed( + embed_size=emb_size, + model_embed_size=model_embed_size, + use_sample=use_sample, + use_hist=use_hist, + bin_number=bin_number, + ) + + self.pred = Prediction(hidden_dim, pred_hid) + + # if multi-task + self.pred2 = Prediction(hidden_dim, pred_hid) + + def forward(self, batched_data): + # attention bias, rel pos, x + attn_bias, rel_pos, x = ( + batched_data.attn_bias, + batched_data.rel_pos, + batched_data.x, + ) + # heights encoding + heights = batched_data.heights + + n_batch, n_node = x.size()[:2] + tree_attn_bias = attn_bias.clone() + tree_attn_bias = tree_attn_bias.unsqueeze(1).repeat(1, self.head_size, 1, 1) + + # rel pos + rel_pos_bias = self.rel_pos_encoder(rel_pos).permute( + 0, 3, 1, 2 + ) # [n_batch, n_node, n_node, n_head] -> [n_batch, n_head, n_node, n_node] + tree_attn_bias[:, :, 1:, 1:] = tree_attn_bias[:, :, 1:, 1:] + rel_pos_bias + + # reset rel pos here + t = self.super_token_virtual_distance.weight.view(1, self.head_size, 1) + tree_attn_bias[:, :, 1:, 0] = tree_attn_bias[:, :, 1:, 0] + t + tree_attn_bias[:, :, 0, :] = tree_attn_bias[:, :, 0, :] + t + + # version 1 + # x_view = x.view(-1, 1172) + # version 2 + if self.use_sample: + x_view = x.view(-1, 1220) + else: + x_view = x.view(-1, 359) + node_feature = self.embbed_layer(x_view).view(n_batch, -1, self.hidden_dim) + + # -1 is number of dummy + + node_feature = node_feature + self.height_encoder(heights) + 
super_token_feature = self.super_token.weight.unsqueeze(0).repeat(n_batch, 1, 1) + super_node_feature = torch.cat([super_token_feature, node_feature], dim=1) + + # transfomrer encoder + output = self.input_dropout(super_node_feature) + for enc_layer in self.layers: + output = enc_layer(output, tree_attn_bias) + output = self.final_ln(output) + + return output[:, 0, :] + # return self.pred(output[:, 0, :]), self.pred2(output[:, 0, :]) + + def get_pred1(self, embeddings): + return self.pred(embeddings) + + def get_pred2(self, embeddings): + return self.pred2(embeddings) + + def get_embeddings(self, dataset, device): + batch, _ = collator(list(zip(*[dataset[i] for i in range(len(dataset))]))) + with torch.no_grad(): + batch = batch.to(device) + embeddings = self.forward(batch) + return embeddings + + +class FeedForwardNetwork(nn.Module): + def __init__(self, hidden_size, ffn_size, dropout_rate): + super(FeedForwardNetwork, self).__init__() + + self.layer1 = nn.Linear(hidden_size, ffn_size) + self.gelu = nn.GELU() + self.layer2 = nn.Linear(ffn_size, hidden_size) + + def forward(self, x): + x = self.layer1(x) + x = self.gelu(x) + x = self.layer2(x) + return x + + +class MultiHeadAttention(nn.Module): + def __init__(self, hidden_size, attention_dropout_rate, head_size): + super(MultiHeadAttention, self).__init__() + + self.head_size = head_size + + self.att_size = att_size = hidden_size // head_size + self.scale = att_size**-0.5 + + self.linear_q = nn.Linear(hidden_size, head_size * att_size) + self.linear_k = nn.Linear(hidden_size, head_size * att_size) + self.linear_v = nn.Linear(hidden_size, head_size * att_size) + self.att_dropout = nn.Dropout(attention_dropout_rate) + + self.output_layer = nn.Linear(head_size * att_size, hidden_size) + + def forward(self, q, k, v, attn_bias=None): + orig_q_size = q.size() + + d_k = self.att_size + d_v = self.att_size + batch_size = q.size(0) + + # head_i = Attention(Q(W^Q)_i, K(W^K)_i, V(W^V)_i) + q = 
self.linear_q(q).view(batch_size, -1, self.head_size, d_k) + k = self.linear_k(k).view(batch_size, -1, self.head_size, d_k) + v = self.linear_v(v).view(batch_size, -1, self.head_size, d_v) + + q = q.transpose(1, 2) # [b, h, q_len, d_k] + v = v.transpose(1, 2) # [b, h, v_len, d_v] + k = k.transpose(1, 2).transpose(2, 3) # [b, h, d_k, k_len] + + # Scaled Dot-Product Attention. + # Attention(Q, K, V) = softmax((QK^T)/sqrt(d_k))V + q = q * self.scale + x = torch.matmul(q, k) # [b, h, q_len, k_len] + if attn_bias is not None: + x = x + attn_bias + + x = torch.softmax(x, dim=3) + x = self.att_dropout(x) + x = x.matmul(v) # [b, h, q_len, attn] + + x = x.transpose(1, 2).contiguous() # [b, q_len, h, attn] + x = x.view(batch_size, -1, self.head_size * d_v) + + x = self.output_layer(x) + + assert x.size() == orig_q_size + return x + + +class EncoderLayer(nn.Module): + def __init__( + self, hidden_size, ffn_size, dropout_rate, attention_dropout_rate, head_size + ): + super(EncoderLayer, self).__init__() + + self.self_attention_norm = nn.LayerNorm(hidden_size) + self.self_attention = MultiHeadAttention( + hidden_size, attention_dropout_rate, head_size + ) + self.self_attention_dropout = nn.Dropout(dropout_rate) + + self.ffn_norm = nn.LayerNorm(hidden_size) + self.ffn = FeedForwardNetwork(hidden_size, ffn_size, dropout_rate) + self.ffn_dropout = nn.Dropout(dropout_rate) + + def forward(self, x, attn_bias=None): + y = self.self_attention_norm(x) + y = self.self_attention(y, y, y, attn_bias) + y = self.self_attention_dropout(y) + x = x + y + + y = self.ffn_norm(x) + y = self.ffn(y) + y = self.ffn_dropout(y) + x = x + y + return x diff --git a/velox/optimizer/python/query2Vec/trainer.py b/velox/optimizer/python/query2Vec/trainer.py new file mode 100644 index 000000000..13db94449 --- /dev/null +++ b/velox/optimizer/python/query2Vec/trainer.py @@ -0,0 +1,396 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" 
+import matplotlib.pyplot as plt +import numpy as np +import pandas as pd +from .dataset import PlanTreeDataset +from .database_util import collator, get_job_table_sample +import os +import time +import torch +from scipy.stats import pearsonr + + +def chunks(l, n): + """Yield successive n-sized chunks from l.""" + for i in range(0, len(l), n): + yield l[i : i + n] + + +def print_qerror(preds_unnorm, labels_unnorm, prints=False): + qerror = [] + for i in range(len(preds_unnorm)): + if preds_unnorm[i] > float(labels_unnorm[i]): + qerror.append(preds_unnorm[i] / float(labels_unnorm[i])) + else: + qerror.append(float(labels_unnorm[i]) / float(preds_unnorm[i])) + + e_50, e_90 = np.median(qerror), np.percentile(qerror, 90) + e_mean = np.mean(qerror) + + if prints: + print("[Q-ERROR] Median: {}".format(e_50)) + print("[Q-ERROR] Mean: {}".format(e_mean)) + + res = { + "q_median": e_50, + "q_90": e_90, + "q_mean": e_mean, + } + + return res + + +def get_corr(ps, ls): # unnormalised + ps = np.array(ps) + ls = np.array(ls) + corr, _ = pearsonr(np.log(ps), np.log(ls)) + + return corr + + +# def eval_workload(workload, methods): + +# get_table_sample = methods['get_sample'] + +# workload_file_name = './data/imdb/workloads/' + workload +# table_sample = get_table_sample(workload_file_name) +# plan_df = pd.read_csv('./data/imdb/{}_plan.csv'.format(workload)) +# workload_csv = pd.read_csv('./data/imdb/workloads/{}.csv'.format(workload),sep='#',header=None) +# workload_csv.columns = ['table','join','predicate','cardinality'] +# ds = PlanTreeDataset(plan_df, workload_csv, \ +# methods['encoding'], methods['hist_file'], methods['cost_norm'], \ +# methods['cost_norm'], 'cost', table_sample) + +# eval_score = evaluate_query2vec(methods['model'], ds, methods['bs'], methods['cost_norm'], methods['device'],True) +# return eval_score, ds + + +def evaluate_query2vec(model, ds, bs, norm, device, prints=False): + model.eval() + cost_predss = np.empty(0) + + with torch.no_grad(): + for i in 
range(0, len(ds), bs): + batch, batch_labels = collator( + list(zip(*[ds[j] for j in range(i, min(i + bs, len(ds)))])) + ) + + batch = batch.to(device) + + embed = model(batch) + cost_preds = model.get_pred1(embed) + cost_preds = cost_preds.squeeze() + + cost_predss = np.append(cost_predss, cost_preds.cpu().detach().numpy()) + scores = print_qerror(norm.unnormalize_labels(cost_predss), ds.costs, prints) + corr = get_corr(norm.unnormalize_labels(cost_predss), ds.costs) + if prints: + print("Corr: ", corr) + return scores, corr + + +def train_query2vec( + model, + train_ds, + val_ds, + crit, + cost_norm, + args, + optimizer=None, + scheduler=None, + log_best=False, + best_metric="q_mean" +): + + to_pred, bs, device, epochs, clip_size = ( + args.to_predict, + args.bs, + args.device, + args.epochs, + args.clip_size, + ) + lr = args.lr + + if not optimizer: + optimizer = torch.optim.Adam(model.parameters(), lr=lr) + if not scheduler: + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 20, 0.7) + + t0 = time.time() + + rng = np.random.default_rng() + + best_prev = 999999 + best_model_path = None + + for epoch in range(epochs): + losses = 0 + cost_predss = np.empty(0) + + model.train() + + train_idxs = rng.permutation(len(train_ds)) + + cost_labelss = np.array(train_ds.costs)[train_idxs] + + for idxs in chunks(train_idxs, bs): + optimizer.zero_grad() + + batch, batch_labels = collator(list(zip(*[train_ds[j] for j in idxs]))) + + l, r = zip(*(batch_labels)) + + batch_cost_label = torch.FloatTensor(l).to(device) + batch = batch.to(device) + + embed = model(batch) + cost_preds = model.get_pred1(embed) + cost_preds = cost_preds.squeeze() + + loss = crit(cost_preds, batch_cost_label) + + loss.backward() + + torch.nn.utils.clip_grad_norm_(model.parameters(), clip_size) + + optimizer.step() + # SQ: added the following 3 lines to fix the out of memory issue + del batch + del batch_labels + torch.cuda.empty_cache() + + losses += loss.item() + cost_predss = 
np.append(cost_predss, cost_preds.detach().cpu().numpy()) + + if epoch > 10: + test_scores, corrs = evaluate_query2vec( + model, val_ds, bs, cost_norm, device, False + ) + test_scores["corr"] = corrs + if (best_metric != "corr" and test_scores[best_metric] < best_prev) or ( + best_metric == "corr" and test_scores[best_metric] > best_prev + ): + if log_best: + best_model_path = logging( + args, + epoch, + test_scores, + filename="log.txt", + save_model=True, + model=model, + ) + best_prev = test_scores[best_metric] + + if epoch % 5 == 0: + print( + "Epoch: {} Avg Loss: {}, Time: {}".format( + epoch, losses / len(train_ds), time.time() - t0 + ) + ) + # train_scores = print_qerror(cost_norm.unnormalize_labels(cost_predss),cost_labelss, True) + test_scores, corrs = evaluate_query2vec( + model, val_ds, bs, cost_norm, device, True + ) + + scheduler.step() + + return model, best_model_path + + +import torch.nn.functional as F + + +def cosine_contrastive_loss(anchor, positive, negative, margin=0.2): + # Normalize to unit vectors + anchor = F.normalize(anchor, dim=1) + positive = F.normalize(positive, dim=1) + negative = F.normalize(negative, dim=1) + + pos_sim = F.cosine_similarity(anchor, positive) + neg_sim = F.cosine_similarity(anchor, negative) + + loss = F.relu(margin + neg_sim - pos_sim).mean() + return loss + + +def train_query2vec_with_contrastive( + model, + train_ds, + val_ds, + crit, + cost_norm, + args, + optimizer=None, + scheduler=None, + cost_loss_factor=1.0, + contrastive_loss_factor=1.0, + log_best=False, + best_metric="q_mean" +): + + to_pred, bs, device, epochs, clip_size = ( + args.to_predict, + args.bs, + args.device, + args.epochs, + args.clip_size, + ) + lr = args.lr + + if not optimizer: + optimizer = torch.optim.Adam(model.parameters(), lr=lr) + if not scheduler: + scheduler = torch.optim.lr_scheduler.StepLR(optimizer, 20, 0.7) + + t0 = time.time() + + rng = np.random.default_rng() + + best_prev = 999999 + best_model_path = None + + for epoch in 
range(epochs): + losses = 0 + mse_losses = 0 + contrastive_losses = 0 + cost_predss = np.empty(0) + + model.train() + + train_idxs = rng.permutation(len(train_ds)) + + cost_labels = np.array(train_ds.costs)[train_idxs] + + for idxs in chunks(train_idxs, bs): + optimizer.zero_grad() + + batch, batch_labels = collator(list(zip(*[train_ds[j] for j in idxs]))) + + l, r = zip(*(batch_labels)) + + batch_cost_label = torch.FloatTensor(l).to(device) + batch = batch.to(device) + + model_embeds = model(batch) + cost_preds = model.get_pred1(model_embeds) + cost_preds = cost_preds.squeeze() + + similar_models_batch, similar_model_labels = collator( + list(zip(*[train_ds[train_ds.similar_query_idx[j]] for j in idxs])) + ) + disimilar_models_batch, disimilar_model_labels = collator( + list(zip(*[train_ds[train_ds.dissimilar_query_idx[j]] for j in idxs])) + ) + similar_models_batch = similar_models_batch.to(device) + disimilar_models_batch = disimilar_models_batch.to(device) + similar_models_embeds = model(similar_models_batch) + disimilar_models_embeds = model(disimilar_models_batch) + + # similar_model_cost_preds = model.get_pred1(similar_models_embeds) + # similar_model_cost_preds = similar_model_cost_preds.squeeze() + # l, r = zip(*(similar_model_labels)) + # batch_similar_model_labels = torch.FloatTensor(l).to(device) + # disimilar_model_cost_preds = model.get_pred1(disimilar_models_embeds) + # disimilar_model_cost_preds = disimilar_model_cost_preds.squeeze() + # l, r = zip(*(disimilar_model_labels)) + # batch_disimilar_model_labels = torch.FloatTensor(l).to(device) + + # mse_loss = crit(cost_preds, batch_cost_label) + mse_loss = crit(cost_preds, batch_cost_label) + # +crit(similar_model_cost_preds, batch_similar_model_labels) + # +crit(disimilar_model_cost_preds, batch_disimilar_model_labels) + + contrastive_loss = cosine_contrastive_loss( + model_embeds, similar_models_embeds, disimilar_models_embeds + ) + loss = ( + cost_loss_factor * mse_loss + contrastive_loss * 
contrastive_loss_factor + ) # Adjust the weight of the contrastive loss as needed + + loss.backward() + + torch.nn.utils.clip_grad_norm_(model.parameters(), clip_size) + + optimizer.step() + # SQ: added the following 3 lines to fix the out of memory issue + # del batch + # del batch_labels + torch.cuda.empty_cache() + + losses += loss.item() + mse_losses += mse_loss.item() + contrastive_losses += contrastive_loss.item() + + cost_predss = np.append(cost_predss, cost_preds.detach().cpu().numpy()) + # break + + # break + if epoch > 10: + test_scores, corrs = evaluate_query2vec( + model, val_ds, bs, cost_norm, device, False + ) + test_scores["corr"] = corrs + if (best_metric != "corr" and test_scores[best_metric] < best_prev) or ( + best_metric == "corr" and test_scores[best_metric] > best_prev + ): + if log_best: + best_model_path = logging( + args, + epoch, + test_scores, + filename="log.txt", + save_model=True, + model=model, + ) + best_prev = test_scores[best_metric] + + if epoch % 5 == 0: + print( + "Epoch: {} Avg Loss: {}, MSE Loss: {}, Contrastive Loss: {}, Time: {}".format( + epoch, + losses / len(train_ds), + mse_losses / len(train_ds), + contrastive_losses / len(train_ds), + time.time() - t0, + ) + ) + # train_scores = print_qerror(cost_norm.unnormalize_labels(cost_predss),cost_labelss, True) + test_scores, corrs = evaluate_query2vec( + model, val_ds, bs, cost_norm, device, True + ) + + scheduler.step() + + return model, best_model_path + + +def logging(args, epoch, qscores, filename=None, save_model=False, model=None): + arg_keys = [attr for attr in dir(args) if not attr.startswith("__")] + arg_vals = [getattr(args, attr) for attr in arg_keys] + + res = dict(zip(arg_keys, arg_vals)) + model_checkpoint = str(hash(tuple(arg_vals))) + ".pt" + + res["epoch"] = epoch + res["model"] = model_checkpoint + + res = {**res, **qscores} + + filename = args.newpath + filename + model_checkpoint = args.newpath + model_checkpoint + + if filename is not None: + if 
os.path.isfile(filename): + df = pd.read_csv(filename) + res_df = pd.DataFrame([res]) + df = pd.concat([df, res_df], ignore_index=True) + df.to_csv(filename, index=False) + else: + df = pd.DataFrame(res, index=[0]) + df.to_csv(filename, index=False) + if save_model: + torch.save({"model": model.state_dict(), "args": args}, model_checkpoint) + + return res["model"] diff --git a/velox/optimizer/python/query2Vec/util.py b/velox/optimizer/python/query2Vec/util.py new file mode 100644 index 000000000..c95b7b1b5 --- /dev/null +++ b/velox/optimizer/python/query2Vec/util.py @@ -0,0 +1,51 @@ +""" +CREDIT: The following code is adapted from the QueryFormer work: https://github.com/zhaoyue-ntu/QueryFormer +""" +import numpy as np +import torch + + +class Normalizer: + def __init__(self, mini=None, maxi=None): + self.mini = mini + self.maxi = maxi + + def normalize_labels(self, labels, reset_min_max=False): + ## added 0.001 for numerical stability + labels = np.array(labels) + 0.001 + if self.mini is None or reset_min_max: + self.mini = labels.min() + print("min log(label): {}".format(self.mini)) + if self.maxi is None or reset_min_max: + self.maxi = labels.max() + print("max log(label): {}".format(self.maxi)) + labels_norm = (labels - self.mini) / (self.maxi - self.mini) + # Threshold labels <-- but why... 
+ labels_norm = np.minimum(labels_norm, 1) + labels_norm = np.maximum(labels_norm, 0.001) + + return labels_norm + + def unnormalize_labels(self, labels_norm): + labels_norm = np.array(labels_norm, dtype=np.float32) + labels = (labels_norm * (self.maxi - self.mini)) + self.mini + return np.array(labels - 0.001) + + +def seed_everything(): + torch.manual_seed(0) + import random + + random.seed(0) + np.random.seed(0) + torch.backends.cudnn.benchmark = False + + +def normalize_data(val, column_name, column_min_max_vals): + min_val = column_min_max_vals[column_name][0] + max_val = column_min_max_vals[column_name][1] + val = float(val) + val_norm = 0.0 + if max_val > min_val: + val_norm = (val - min_val) / (max_val - min_val) + return np.array(val_norm, dtype=np.float32)