diff --git a/inferf/CMakeLists.txt b/inferf/CMakeLists.txt new file mode 100644 index 000000000..15abf21d5 --- /dev/null +++ b/inferf/CMakeLists.txt @@ -0,0 +1,80 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.0 FATAL_ERROR) +#set(CMAKE_PREFIX_PATH "$HOME/libtorch; $CONDA_PREFIX") +set(CMAKE_PREFIX_PATH "$HOME/libtorch/share/cmake/Torch") +set(CMAKE_PREFIX_PATH "$CONDA_PREFIX") +# include_directories(SYSTEM ${TORCH_INCLUDE_DIRS}) +# set(TORCH_LIBRARIES "$HOME/libtorch") + +find_package(Torch REQUIRED) +find_package(xgboost REQUIRED) +find_package(cpr REQUIRED) +find_package(jsoncpp REQUIRED) + +# include tokenizer cpp as a sub directory + + + +include_directories(/home/velox/third_party/tokenizers-cpp/include) +include_directories(/home/velox/third_party/tokenizers-cpp/src) + +add_library(tokenizer_cpp STATIC IMPORTED) +add_library(tokenizer_c STATIC IMPORTED) +set_target_properties(tokenizer_cpp PROPERTIES IMPORTED_LOCATION /home/velox/third_party/tokenizers-cpp/example/build/tokenizers/libtokenizers_cpp.a) +set_target_properties(tokenizer_c PROPERTIES IMPORTED_LOCATION /home/velox/third_party/tokenizers-cpp/example/build/tokenizers/libtokenizers_c.a) + + +# TODO: temporary disable it until we can fix the build of dependency +# add_executable(standalone_hf_tokenizer_test tests/StandaloneHFTokenizerTest.cpp) +# target_link_libraries(standalone_hf_tokenizer_test +# # FIXME: for some reason tokenizer cpp needs to be placed before tokenizer c, +# # needs fix to automatically compile it from 3rd library +# tokenizer_cpp +# tokenizer_c +# ) + + +include_directories("/usr/include/hdf5/serial") +include_directories("/home/h5cpp/build/src/h5cpp") + +find_package(h5cpp REQUIRED) +find_package(HDF5 COMPONENTS C CXX HL REQUIRED) + +link_directories( ${HDF5_LIBRARY_DIRS} ) +include_directories( ${HDF5_INCLUDE_DIRS} ) + + +add_executable(factorize_test tests/RewriteFactorized.cpp) +target_link_libraries( + factorize_test + velox_aggregates + velox_type + velox_vector + velox_vector_test_lib + velox_exec + velox_exec_test_lib + velox_tpch_connector + velox_memory + velox_common_base + velox_vector_fuzzer + openblas + ${TORCH_LIBRARIES} + jsoncpp_lib + h5cpp::h5cpp + hdf5_serial + ${HDF5_CXX_LIBRARIES} +) + diff --git a/inferf/python/genetic.py b/inferf/python/genetic.py new file mode 100644 index 000000000..bfd1bd282 --- /dev/null +++ b/inferf/python/genetic.py @@ -0,0 +1,207 @@ +import json +import random +from collections import defaultdict + +# class that define the properties of an input edge +class Edge: + def __init__(self, _id, parent_node, child_node, direction, num_input_features, num_input_rows, num_output_features, num_output_rows): + self._id = _id + self.parent_node = parent_node + self.child_node = child_node + self.direction = direction + self.num_input_features = num_input_features + self.num_input_rows = num_input_rows + self.num_output_features = num_output_features + self.num_output_rows = num_output_rows + + +# for each input edge, returns the height/distance from corresponding leaf edge. returns 1 for leaf edges +def getHeight(edge_list, child_edge_list): + height = {} + def helper(e_id): + if e_id in height: + return height[e_id] + + if e_id not in child_edge_list: + height[e_id] = 1 + return height[e_id] + else: + left, right = child_edge_list[e_id] + height[left] = helper(left) + height[right] = helper(right) + height[e_id] = max(height[left], height[right]) + 1 + return height[e_id] + + edge_heights = [] + for edge in edge_list: + height[edge._id] = helper(edge._id) + edge_heights.append((edge._id, height[edge._id])) + return edge_heights + + +# checks if the labels of various join edges are valid, specially looks for edges where label should be 2 +def validateChromosome(current, map_edge, child_edge_list, edge_heights): + keys = child_edge_list.keys() + for e_id, _ in edge_heights: + if e_id in keys: + left, right = child_edge_list[e_id] + lLeft, lRight = current[map_edge[left]], current[map_edge[right]] + if (lLeft == 1 and lRight == 1) or (lLeft == 1 and lRight == 2) or (lLeft == 2 and lRight == 1) or (lLeft == 2 and lRight == 2): + current[map_edge[e_id]] = 2 + else: + if current[map_edge[e_id]] == 2: + current[map_edge[e_id]] = random.randint(0, 1) + return current + + +# method to initialize n valid chromosomes representing various factorization plans +def initChromosome(n, k, map_edge, child_edge_list, edge_heights): + map_chrm = {} + count = 0 + while count < n: + current = [random.choice([0, 1]) for _ in range(k)] + current_updated = validateChromosome(current, map_edge, child_edge_list, edge_heights) + if current_updated not in map_chrm.values(): + map_chrm[count] = current_updated + count += 1 + return map_chrm + + +# method to exchange subtrees between two parents +def crossoverParents(parent1, parent2, map_edge, child_edge_list, edge_heights): + keys = list(child_edge_list.keys()) + targetKey = keys.pop(random.randrange(len(keys))) + offspring1, offspring2 = parent1.copy(), parent2.copy() + stack, subtree = [targetKey], [] + + while stack: + current = stack.pop() + subtree.append(current) + if current in keys: + stack.extend(childList[current]) + + for e_id in subtree: + offspring1[e_id], offspring2[e_id] = offspring2[e_id], offspring1[e_id] + + return validateChromosome(offspring1, map_edge, child_edge_list, edge_heights), validateChromosome(offspring2, map_edge, child_edge_list, edge_heights) + + +# method to make random change on the chromosomes generated after crossover between two parent chromosomes +def mutateChromosome(chromosome, map_edge, child_edge_list, edge_heights): + mutabel_edges = [i for i, label in enumerate(chromosome) if label in [0, 1]] + if len(mutabel_edges) > 0: + targetEdge = mutabel_edges.pop(random.randrange(len(mutabel_edges))) + chromosome[targetEdge] = 1 - chromosome[targetEdge] + return validateChromosome(chromosome, map_edge, child_edge_list, edge_heights) + + +def getFitness(chromosome): + """Fitness function that returns a random value.""" + return random.random() + + +# method to perform the genetic algorithm +def performGenetic(edges, num_join, factorized_output_features, p=2, max_iter=5, utility_threshold=0.5, k1=9.8167, k2=2.1713): + + labels = {} # stores labels of input join edges + map_edge = {} # maps edge id to a position index in the chromosome representation + node_to_edge = defaultdict(lambda: []) # maps a join node to its two child edges + + # initialize labels, map_edges, and node_to_edge + j = 0 + for e in edges: + map_edge[e._id] = j + j += 1 + labels[e._id] = 0 + temp_list = node_to_edge[e.parent_node] + temp_list.append((e._id, e.direction)) + node_to_edge[e.parent_node] = temp_list + + # child_edge_list holds the left and right child ids for each non-leaf edges + child_edge_list = defaultdict(lambda: ["", ""]) + for e in edges: + child_node = e.child_node + if child_node in node_to_edge: + e1, direction1 = node_to_edge[child_node][0] + e2, direction2 = node_to_edge[child_node][1] + child_edge_list[e._id][direction1] = e1 + child_edge_list[e._id][direction2] = e2 + + edge_heights = getHeight(edge_list, child_edge_list) + sorted_heights = sorted(edge_heights, key=lambda x: x[1], reverse=False) + + map_chrm = initChromosome(num_join*p, len(edges), map_edge, child_edge_list, edge_heights) + num_chrm = len(map_chrm.keys()) + bestChrm, bestFitness = map_chrm[0], float("-inf") + + # perform parent selection, crossover of parents, and mutation max_iter times to generate random chromosomes + for i in range(max_iter): + idx1, idx2 = random.sample(range(0, num_chrm), 2) + parent1, parent2 = map_chrm[idx1], map_chrm[idx2] + parent1, parent2 = crossoverParents(parent1, parent2, map_edge, child_edge_list, edge_heights) + parent1 = mutateChromosome(parent1, map_edge, child_edge_list, edge_heights) + parent2 = mutateChromosome(parent2, map_edge, child_edge_list, edge_heights) + fitness1, fitness2 = getFitness(parent1), getFitness(parent2) + + if fitness1 > bestFitness: + bestFitness = fitness1 + bestChrm = parent1 + if fitness2 > bestFitness: + bestFitness = fitness2 + bestChrm = parent2 + + return bestChrm, map_edge + + + +def getEdgesFromJson(json_data): + num_join = 0 + idx = 0 + edge_list = [] + for json_obj in json_data: + num_join += 1 + join_id = json_obj['ID'] + left = json_obj['Left'] + right = json_obj['Right'] + tuple_left = json_obj['NumTuplesLeft'] + dim_left = json_obj['NumDimLeft'] + tuple_right = json_obj['NumTuplesRight'] + dim_right = json_obj['NumDimRight'] + tuple_output = json_obj['NumTuplesOutput'] + dim_output = json_obj['NumDimOutput'] + + edge = Edge(idx, join_id, left, 0, dim_left, tuple_left, dim_output, tuple_output) + edge_list.append(edge) + idx += 1 + + edge = Edge(idx, join_id, right, 1, dim_right, tuple_right, dim_output, tuple_output) + edge_list.append(edge) + idx += 1 + return edge_list, num_join + + +def getNeuronInputSize(model_path, input_layer): + state_dict = torch.load(model_path) + input_weight = state_dict[f"{input_layer}.weight"] + return input_weight.shape[0] + + + + +if __name__ == "__main__": + + file_path = "plans/4_3.txt" + num_neurons = getNeuronInputSize("plans/dummy.pth", "fc1") + + with open(file_path, "r") as file: + json_string = file.read() + + if json_string.startswith('R"(') and json_string.endswith(')"'): + json_string = json_string[3:-2] + json_data = json.loads(json_string) + + edge_list, num_join = getEdgesFromJson(json_data) + selected_plan, map_edge = performGenetic(edge_list, num_join, factorized_output_features=64, utility_threshold=0.5) + + for edge in edge_list: + print(f"Plan: {edge.child_node} ---> {edge.parent_node} = {selected_plan[map_edge[edge._id]]}") diff --git a/inferf/python/greedy.py b/inferf/python/greedy.py new file mode 100644 index 000000000..7441c7af2 --- /dev/null +++ b/inferf/python/greedy.py @@ -0,0 +1,140 @@ +import heapq +import json +from collections import defaultdict +import torch + + +class Edge: + def __init__(self, _id, parent_node, child_node, direction, num_input_features, num_input_rows, num_output_features, num_output_rows): + self._id = _id + self.parent_node = parent_node + self.child_node = child_node + self.direction = direction + self.num_input_features = num_input_features + self.num_input_rows = num_input_rows + self.num_output_features = num_output_features + self.num_output_rows = num_output_rows + + + +def performGreedy(edges, factorized_output_features, utility_threshold=0.5, k1=9.8167, k2=2.1713): + + def getUtility(num_input_features, num_input_rows, num_output_features, num_output_rows): + #return k1 * (num_output_features - factorized_output_features) + k2 * (num_output_rows*num_output_features - num_input_rows*num_input_features) + return k1 * (num_output_features - factorized_output_features) + k2 * (num_output_rows*num_output_features - num_input_rows*num_output_features) + + + labels = {} + utility = {} + node_to_edge = defaultdict(lambda: []) + gpq = [] + processed = set() + + for e in edges: + labels[e._id] = 0 + temp_list = node_to_edge[e.parent_node] + temp_list.append((e._id, e.direction)) + node_to_edge[e.parent_node] = temp_list + + utility[e._id] = getUtility(e.num_input_features, e.num_input_rows, e.num_output_features, e.num_output_rows) + if utility[e._id] >= utility_threshold: + heapq.heappush(gpq, (-1 * utility[e._id], e._id)) + + + child_edge_list = defaultdict(lambda: ["", ""]) + for e in edges: + child_node = e.child_node + if child_node in node_to_edge: + e1, direction1 = node_to_edge[child_node][0] + e2, direction2 = node_to_edge[child_node][1] + child_edge_list[e._id][direction1] = e1 + child_edge_list[e._id][direction2] = e2 + + def traverseBranch(eLocal): + if eLocal in processed: + return labels[eLocal] + + + if eLocal not in child_edge_list: + if utility[eLocal] >= utility_threshold: + labels[eLocal] = 1 + processed.add(eLocal) + return labels[eLocal] + else: + left, right = child_edge_list[eLocal] + + if left not in processed: + labels[left] = traverseBranch(left) + if right not in processed: + labels[right] = traverseBranch(right) + + if (labels[left] == 1 and labels[right] == 1) or (labels[left] == 1 and labels[right] == 2) or (labels[left] == 2 and labels[right] == 1) or (labels[left] == 2 and labels[right] == 2): + labels[eLocal] = 2 + else: + if utility[eLocal] >= utility_threshold: + labels[eLocal] = 1 + processed.add(eLocal) + return labels[eLocal] + + + while gpq: + max_pair = heapq.heappop(gpq) + uCurrent = max_pair[0] + eCurrent = max_pair[1] + labels[eCurrent] = traverseBranch(eCurrent) + + return labels + + +def getEdgesFromJson(json_data): + idx = 0 + edge_list = [] + for json_obj in json_data: + join_id = json_obj['ID'] + left = json_obj['Left'] + right = json_obj['Right'] + tuple_left = json_obj['NumTuplesLeft'] + dim_left = json_obj['NumDimLeft'] + tuple_right = json_obj['NumTuplesRight'] + dim_right = json_obj['NumDimRight'] + tuple_output = json_obj['NumTuplesOutput'] + dim_output = json_obj['NumDimOutput'] + + edge = Edge(idx, join_id, left, 0, dim_left, tuple_left, dim_output, tuple_output) + edge_list.append(edge) + idx += 1 + + edge = Edge(idx, join_id, right, 1, dim_right, tuple_right, dim_output, tuple_output) + edge_list.append(edge) + idx += 1 + return edge_list + + + +def getNeuronInputSize(model_path, input_layer): + state_dict = torch.load(model_path) + input_weight = state_dict[f"{input_layer}.weight"] + return input_weight.shape[0] + + + + +if __name__ == "__main__": + file_path = "plans/4_3.txt" + num_neurons = getNeuronInputSize("plans/dummy.pth", "fc1") + + with open(file_path, "r") as file: + json_string = file.read() + + if json_string.startswith('R"(') and json_string.endswith(')"'): + json_string = json_string[3:-2] + json_data = json.loads(json_string) + + edge_list = getEdgesFromJson(json_data) + labels = performGreedy(edge_list, factorized_output_features=num_neurons, utility_threshold=0.5) + + print("Number of neurons in the split layer:", str(num_neurons)) + for edge in edge_list: + print(f"Plan: {edge.child_node} ---> {edge.parent_node} = {labels[edge._id]}") + + diff --git a/inferf/python/plans/4_3.txt b/inferf/python/plans/4_3.txt new file mode 100644 index 000000000..261cff1c7 --- /dev/null +++ b/inferf/python/plans/4_3.txt @@ -0,0 +1,5 @@ +R"([ + {"ID":"0","Left":"table_1","Right":"table_2","Pred":"table_1.join_key1 = table_2.join_key2","ProbeKeys":"join_key1","BuildKeys":"join_key2","Projection":["join_key1","f_table_1","join_key2","f_table_2"],"NumTuplesLeft":1000,"NumDimLeft":20,"NumTuplesRight":300,"NumDimRight":39,"NumTuplesOutput":300,"NumDimOutput":59}, + {"ID":"1","Left":"0","Right":"table_3","Pred":"table_2.join_key2 = table_3.join_key3","ProbeKeys":"join_key2","BuildKeys":"join_key3","Projection":["join_key1","f_table_1","join_key2","f_table_2","join_key3","f_table_3"],"NumTuplesLeft":300,"NumDimLeft":59,"NumTuplesRight":12000,"NumDimRight":31,"NumTuplesOutput":12000,"NumDimOutput":90}, + {"ID":"2","Left":"1","Right":"table_4","Pred":"table_3.join_key3 = table_4.join_key4","ProbeKeys":"join_key3","BuildKeys":"join_key4","Projection":["join_key1","f_table_1","join_key2","f_table_2","join_key3","f_table_3","join_key4","f_table_4"],"NumTuplesLeft":12000,"NumDimLeft":90,"NumTuplesRight":120000,"NumDimRight":19,"NumTuplesOutput":120000,"NumDimOutput":109} +])" \ No newline at end of file diff --git a/inferf/python/plans/dummy.pth b/inferf/python/plans/dummy.pth new file mode 100644 index 000000000..a18998400 Binary files /dev/null and b/inferf/python/plans/dummy.pth differ diff --git a/inferf/python/synthesize_data.py b/inferf/python/synthesize_data.py new file mode 100644 index 000000000..5ae394541 --- /dev/null +++ b/inferf/python/synthesize_data.py @@ -0,0 +1,136 @@ +import numpy as np +import pandas as pd +import random +import os +import json + + +def get_join_statistics(join_id, left_table_name, right_table_name, left_table_actual, left_key, right_key, projection, left_rows, left_cols, right_rows, right_cols): + join_info = {} + join_info["ID"] = join_id + join_info["Left"] = left_table_name + join_info["Right"] = right_table_name + join_info["Pred"] = f"{left_table_actual}.{left_key} = {right_table_name}.{right_key}" + join_info["ProbeKeys"] = left_key + join_info["BuildKeys"] = right_key + join_info["Projection"] = projection + join_info["NumTuplesLeft"] = left_rows + join_info["NumDimLeft"] = left_cols + join_info["NumTuplesRight"] = right_rows + join_info["NumDimRight"] = right_cols + join_info["NumTuplesOutput"] = right_rows + join_info["NumDimOutput"] = left_cols + right_cols + return join_info + + +# Function to generate a random table with size controls +def generate_table_optimized(table_idx, join_key, num_rows, num_columns, ratio=None, join_key_values=None): + if join_key_values is None: + join_key_values = np.random.randint(1, num_rows + 1, size=num_rows) + else: + #ratio = 5 # Limit the duplication factor to control row growth + #num_rows = min(num_rows, len(join_key_values) * ratio) + #join_key_values = np.repeat(join_key_values, np.random.randint(1, ratio + 1))[:num_rows] + if ratio < 1: + join_key_values = np.random.choice(join_key_values, size=num_rows, replace=False) + else: + multiply_factor = int(ratio) + join_key_values = np.repeat(join_key_values, multiply_factor)#[:num_rows] + + feature_columns = np.random.rand(num_rows, num_columns) + data = pd.DataFrame(feature_columns, columns=[f"feature_{i}" for i in range(1, num_columns + 1)]) + data.insert(0, join_key, join_key_values) + return data + +# Main function to generate and save the dataset +def generate_synthetic_dataset(data_id="10_2", max_columns=1500, max_single_columns=150, num_tables=10, base_rows=1000, ratios=[0.6, 0.6, 15, 25, 2, 4, 0.8, 3, 10]): + output_dir = "synthetic_dataset_" + data_id + total_columns = 0 + tables = [] + #base_rows = random.randint(5000, 10000) # Number of rows in the first table + + plans_array = [] + join_serial = 0 + prev_join_id = None + left_rows = 0 + left_cols = 0 + projection = [] + + # Generate tables with size controls + for i in range(num_tables): + remaining_columns = max_columns - total_columns - (num_tables - len(tables) - 1) + if remaining_columns <= 0: + break + + num_columns = random.randint(10, min(max_single_columns, remaining_columns)) # Reduce max columns per table + total_columns += num_columns + + table_idx = i + 1 + join_key = "join_key" + str(table_idx) + + if i == 0: + num_rows = base_rows + table = generate_table_optimized(table_idx, join_key, num_rows, num_columns) + left_rows = num_rows + left_cols = num_columns + else: + #k = random.uniform(0.1, 0.5) # Reduce row growth factor + k = ratios[i - 1] + num_rows = int(len(tables[-1]) * k) + #if num_rows > 100000: + # num_rows = 100000 + prev_join_key = "join_key" + str(i) + table = generate_table_optimized(table_idx, join_key, num_rows, num_columns, ratio=k, join_key_values=tables[-1][prev_join_key].values) + + right_table_name = "table_" + str(i + 1) + if join_serial == 0: + left_table_name = "table_" + str(i) + left_table_actual = left_table_name + else: + left_table_name = prev_join_id + left_table_actual = "table_" + str(i) + join_id = str(join_serial) + + if len(projection) == 0: + projection.append(prev_join_key) + projection.append("f_" + left_table_name) + + projection.append(join_key) + projection.append("f_" + right_table_name) + + join_info = get_join_statistics(join_id, left_table_name, right_table_name, left_table_actual, prev_join_key, join_key, projection.copy(), left_rows, left_cols, num_rows, num_columns) + plans_array.append(join_info) + + left_rows = num_rows + left_cols += num_columns + prev_join_id = join_id + join_serial += 1 + + tables.append(table) + + # Save tables to CSV + os.makedirs(output_dir, exist_ok=True) + for idx, table in enumerate(tables, start=1): + table.to_csv(os.path.join(output_dir, f"table_{idx}.csv"), index=False) + print(f"Dataset generated and saved in {output_dir}") + + # Save the JSON array to a file + #with open(f"plans/{data_id}.json", "w") as json_file: + # json.dump(plans_array, json_file, indent=4) + + # Convert each JSON object in the array to a compact one-line format + formatted_objects = [json.dumps(obj, separators=(',', ':')) for obj in plans_array] + # Combine the formatted objects into a single JSON array + formatted_json_array = "[\n\t" + ",\n\t".join(formatted_objects) + "\n]" + # Wrap the formatted array in R"( ... )" + formatted_string = f'R"({formatted_json_array})"' + # Print the result + print(formatted_string) + # Save to a file + with open(f"plans/{data_id}.txt", "w") as file: + file.write(formatted_string) + + +# Run the script +if __name__ == "__main__": + generate_synthetic_dataset() diff --git a/inferf/tests/RewriteFactorized.cpp b/inferf/tests/RewriteFactorized.cpp new file mode 100644 index 000000000..35056e30c --- /dev/null +++ b/inferf/tests/RewriteFactorized.cpp @@ -0,0 +1,1315 @@ +#include +//#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "velox/type/Type.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/ml_functions/DecisionTree.h" +#include "velox/ml_functions/XGBoost.h" +#include "velox/ml_functions/tests/MLTestUtility.h" +#include "velox/parse/TypeResolver.h" +#include "velox/ml_functions/VeloxDecisionTree.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/parquet/RegisterParquetReader.h" +#include "velox/dwio/parquet/RegisterParquetWriter.h" +#include +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/ml_functions/functions.h" +#include "velox/ml_functions/Concat.h" +#include "velox/ml_functions/NNBuilder.h" +#include +#include +#include "velox/ml_functions/VeloxDecisionTree.h" +#include "velox/expression/VectorFunction.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace ml; +using namespace facebook::velox; +using namespace facebook::velox::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::core; + +/* + * The structure to describe the context of a neural network model architecture + */ +struct NNModelContext { + + int inputFeatures; + + int numLayers; + + int hiddenLayerNeurons; + + int outputLayerNeurons; + +}; + +/* + * The structure to describe the context of a decision tree model architecture + */ + +struct DTModelContext { + + int inputFeatures; + + int treeDepth; + +}; + + +/* + * The structure to describe the push down status of a feature + */ +struct FeatureStatus { + int isFeature; + + int isPushed; + + int vectorSize; + + int featureStartPos; + +}; + + +class VectorAddition : public MLFunction { + public: + VectorAddition(int inputDims) { + inputDims_ = inputDims; + } + + void apply( + const SelectivityVector& rows, + std::vector& args, + const TypePtr& type, + exec::EvalCtx& context, + VectorPtr& output) const override { + BaseVector::ensureWritable(rows, type, context.pool(), output); + + BaseVector* left = args[0].get(); + BaseVector* right = args[1].get(); + + exec::LocalDecodedVector leftHolder(context, *left, rows); + auto decodedLeftArray = leftHolder.get(); + auto baseLeftArray = + decodedLeftArray->base()->as()->elements(); + + exec::LocalDecodedVector rightHolder(context, *right, rows); + auto decodedRightArray = rightHolder.get(); + auto baseRightArray = rightHolder->base()->as()->elements(); + + float* input1Values = baseLeftArray->values()->asMutable(); + float* input2Values = baseRightArray->values()->asMutable(); + + int numInput = rows.size(); + + Eigen::Map< + Eigen::Matrix> + input1Matrix(input1Values, numInput, inputDims_); + Eigen::Map< + Eigen::Matrix> + input2Matrix(input2Values, numInput, inputDims_); + + std::vector> results; + + for (int i = 0; i < numInput; i++) { + //Eigen::Matrix vSum = input1Matrix.row(i) + input2Matrix.row(i); + Eigen::VectorXf vSum = input1Matrix.row(i) + input2Matrix.row(i); + std::vector curVec(vSum.data(), vSum.data() + vSum.size()); + //std::vector std_vector(vSum.data(), vSum.data() + vSum.size()); + results.push_back(curVec); + } + + VectorMaker maker{context.pool()}; + output = maker.arrayVector(results, REAL()); + } + + static std::vector> signatures() { + return {exec::FunctionSignatureBuilder() + .argumentType("array(REAL)") + .argumentType("array(REAL)") + .returnType("array(REAL)") + .build()}; + } + + static std::string getName() { + return "vector_addition"; + }; + + float* getTensor() const override { + // TODO: need to implement + return nullptr; + } + + CostEstimate getCost(std::vector inputDims) { + // TODO: need to implement + return CostEstimate(0, inputDims[0], inputDims[1]); + } + + private: + int inputDims_; +}; + + +class RewriteFactorized : HiveConnectorTestBase { + public: + RewriteFactorized() { + // Register Presto scalar functions. + functions::prestosql::registerAllScalarFunctions(); + // Register Presto aggregate functions. + aggregate::prestosql::registerAllAggregateFunctions(); + // Register type resolver with DuckDB SQL parser. + parse::registerTypeResolver(); + // HiveConnectorTestBase::SetUp(); + //parquet::registerParquetReaderFactory(); + + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector(kHiveConnectorId, std::make_shared()); + connector::registerConnector(hiveConnector); + + // SetUp(); + + } + + ~RewriteFactorized() {} + + void SetUp() override { + // TODO: not used for now + // HiveConnectorTestBase::SetUp(); + // parquet::registerParquetReaderFactory(); + } + + void TearDown() override { + HiveConnectorTestBase::TearDown(); + } + + void TestBody() override { + } + + + static void waitForFinishedDrivers(const std::shared_ptr& task) { + + while (!task->isFinished()) { + + usleep(1000); // 0.01 second. + + } + } + + std::shared_ptr executor_{ + std::make_shared( + std::thread::hardware_concurrency())}; + + std::shared_ptr queryCtx_{ + std::make_shared(executor_.get())}; + + std::shared_ptr pool_{memory::MemoryManager::getInstance()->addLeafPool()}; + + VectorMaker maker{pool_.get()}; + + std::unordered_map tableName2RowVector; + std::unordered_map>> operatorParam2Weights; + std::unordered_map> tabel2Columns; + std::unordered_map allFeatureStatus; + std::vector modelOperators; + std::map factorizationPlans; + std::map featureStartPos; + int totalFeatures = 0; + + + + // Function to check if a string represents a valid integer + bool isInteger(const std::string& s) { + + if (s.empty() || (s.size() > 1 && s[0] == '0')) { + + return false; // prevent leading zeros for non-zero integers + + } + + for (char c : s) { + + if (!std::isdigit(c)) { + + return false; + } + } + + return true; + + } + + + + void findFactorizationPlans(std::string filePath) { + // Open the file + std::ifstream inputFile(filePath); + if (!inputFile.is_open()) { + std::cerr << "Failed to open the file: " << filePath << std::endl; + //return 1; + } + + // Map to store key-value pairs + std::map myMap; + + // Read the file line by line + std::string line; + while (std::getline(inputFile, line)) { + // Find the separator " = " to split the key and value + size_t separatorPos = line.find(" = "); + if (separatorPos != std::string::npos) { + // Extract the key and value + std::string key = line.substr(0, separatorPos); + int value = std::stoi(line.substr(separatorPos + 3)); // Convert value to int + + // Store in the map + myMap[key] = value; + } + } + + // Close the file + inputFile.close(); + + factorizationPlans = myMap; + } + + + + std::vector extractOperatorsInReverse(const std::string& input) { + std::vector operators; + std::regex operatorPattern(R"(([a-zA-Z_]+\d+)\()"); // Match patterns like mat_mul1(, mat_add2(, etc. + std::smatch match; + + std::string::const_iterator searchStart(input.cbegin()); + while (std::regex_search(searchStart, input.cend(), match, operatorPattern)) { + operators.push_back(match[1]); // Extract the operator name + searchStart = match.suffix().first; // Move the search start position + } + + // Reverse the order of operators to match actual execution order + std::reverse(operators.begin(), operators.end()); + return operators; + } + + + + std::vector> loadHDF5Array(const std::string& filename, const std::string& datasetName, int doPrint) { + H5::H5File file(filename, H5F_ACC_RDONLY); + H5::DataSet dataset = file.openDataSet(datasetName); + H5::DataSpace dataspace = dataset.getSpace(); + + // Get the number of dimensions + int rank = dataspace.getSimpleExtentNdims(); + // std::cout << "Rank: " << rank << std::endl; + + // Allocate space for the dimensions + std::vector dims(rank); + + // Get the dataset dimensions + dataspace.getSimpleExtentDims(dims.data(), nullptr); + + size_t rows; + size_t cols; + + if (rank == 1) { + rows = dims[0]; + cols = 1; + } + else if (rank == 2) { + rows = dims[0]; + cols = dims[1]; + } else { + throw std::runtime_error("Unsupported rank: " + std::to_string(rank)); + } + + // Read data into a 1D vector + std::vector flatData(rows * cols); + dataset.read(flatData.data(), H5::PredType::NATIVE_FLOAT); + + // Convert to 2D vector + std::vector> result(rows, std::vector(cols)); + for (size_t i = 0; i < rows; ++i) { + for (size_t j = 0; j < cols; ++j) { + result[i][j] = flatData[i * cols + j]; + if (doPrint == 1) + std::cout << result[i][j] << ", "; + } + if (doPrint == 1) + std::cout << std::endl; + } + + // Close the dataset and file + dataset.close(); + file.close(); + + return result; + } + + + + + void findWeights(std::string modelPath) { + // read the parameter weights from file + std::vector> w1 = loadHDF5Array(modelPath, "fc1.weight", 0); + std::vector> b1 = loadHDF5Array(modelPath, "fc1.bias", 0); + std::vector> w2 = loadHDF5Array(modelPath, "fc2.weight", 0); + std::vector> b2 = loadHDF5Array(modelPath, "fc2.bias", 0); + std::vector> w3 = loadHDF5Array(modelPath, "fc3.weight", 0); + std::vector> b3 = loadHDF5Array(modelPath, "fc3.bias", 0); + + // store the weights in map with same name as operator + operatorParam2Weights["mat_mul1"] = w1; + operatorParam2Weights["mat_add1"] = b1; + operatorParam2Weights["mat_mul2"] = w2; + operatorParam2Weights["mat_add2"] = b2; + operatorParam2Weights["mat_mul3"] = w3; + operatorParam2Weights["mat_add3"] = b3; + + std::cout << "Shape of mat_mul1 weight: " << w1.size() << ", " << w1[0].size() << std::endl; + } + + + + std::vector> extractSubweight(const std::vector>& matrix, int start, int n) { + std::vector> result; + + std::cout << "Extracting subweight" << std::endl; + std::cout << start << ", " << n << std::endl; + + // Ensure that the range [start, start + n) is within bounds + //int end = std::min(start + n, matrix.size()); + int end = start + n; + for (int i = start; i < end; ++i) { + result.push_back(matrix[i]); // Copy rows within the range + } + + return result; + } + + + + RowVectorPtr getTableFromCSVFile(VectorMaker & maker, std::string csvFilePath, std::string tableName, std::string joinKey) { + + std::ifstream file(csvFilePath.c_str()); + if (file.fail()) { + + std::cerr << "Error in reading data file:" << csvFilePath << std::endl; + exit(1); + + } + + std::string line; + + std::cout << tableName << std::endl; + + std::vector joinVec; + std::vector> featuresVec; + + // Ignore the first line (header) + if (std::getline(file, line)) { + std::cout << "Ignoring header: " << line << std::endl; + } + + int count = 0; + while (std::getline(file, line)) { + std::istringstream iss(line); + std::string numberStr; + std::vector features; + int colIndex = 0; + + while (std::getline(iss, numberStr, ',')) { + if (numberStr.size() >= 2 && numberStr.front() == '"' && numberStr.back() == '"') { + numberStr = numberStr.substr(1, numberStr.size() - 2); + } + + if (colIndex == 0) { + joinVec.push_back(std::stoi(numberStr)); + } + else { + features.push_back(std::stof(numberStr)); + } + colIndex += 1; + } + featuresVec.push_back(features); + count += 1; + } + file.close(); + + std::string featureCol = "f_" + tableName; + auto joinVector = maker.flatVector(joinVec); + auto featuresVector = maker.arrayVector(featuresVec, REAL()); + auto tableRowVector = maker.rowVector( + {joinKey, featureCol}, {joinVector, featuresVector} + ); + + std::vector cols = {joinKey, featureCol}; + tabel2Columns[tableName] = cols; + totalFeatures += featuresVec[0].size(); + + FeatureStatus fs1; + fs1.isFeature = 0; + allFeatureStatus[joinKey] = fs1; + + FeatureStatus fs2; + fs2.isFeature = 1; + fs2.isPushed = 0; + fs2.vectorSize = featuresVec[0].size(); + allFeatureStatus[featureCol] = fs2; + + return tableRowVector; + } + + + + +int sampleQuery() { + + return 29; + +} + +int sampleModel() { + + return 0; + +} + + +void sampleNNModelArch(int numInputFeatures, NNModelContext & nn) { + + nn.inputFeatures = numInputFeatures; + nn.numLayers = 3; + nn.hiddenLayerNeurons = 16; + nn.outputLayerNeurons = 2; + +} + +void sampleDTModelArch (int numInputFeatures, DTModelContext & dt) { + + dt.inputFeatures = numInputFeatures; + dt.treeDepth = 8; + +} + +bool replace(std::string& str, const std::string& from, const std::string& to) { + size_t start_pos = str.find(from); + if(start_pos == std::string::npos) + return false; + str.replace(start_pos, from.length(), to); + return true; +} + + + +void registerNNFunction(std::string op_name, std::vector> weightMatrice, int dim1, int dim2) { + + if (op_name.find("mat_mul") != std::string::npos) { + auto nnWeightVector = maker.arrayVector(weightMatrice, REAL()); + exec::registerVectorFunction( + op_name, + MatrixMultiply::signatures(), + std::make_unique( + std::move(nnWeightVector->elements()->values()->asMutable()), dim1, dim2) + ); + std::cout << "Registered a mat_mul function of name " << op_name << " with dimension " << dim1 << ", " << dim2 << endl; + } + + else if (op_name.find("mat_add") != std::string::npos) { + auto nnWeightVector = maker.arrayVector(weightMatrice, REAL()); + exec::registerVectorFunction( + op_name, + MatrixVectorAddition::signatures(), + std::make_unique( + std::move(nnWeightVector->elements()->values()->asMutable()), dim1) + ); + std::cout << "Registered a mat_add function of name " << op_name << " with dimension " << dim1 << endl; + } + + else if (op_name.find("relu") != std::string::npos) { + exec::registerVectorFunction( + op_name, Relu::signatures(), std::make_unique(), + {}, + true); + std::cout << "Registered a relu function of name " << op_name << endl; + } + + else if (op_name.find("softmax") != std::string::npos) { + exec::registerVectorFunction( + op_name, Softmax::signatures(), std::make_unique()); + std::cout << "Registered a softmax function of name " << op_name << endl; + } + + else if (op_name.find("vector_addition") != std::string::npos) { + exec::registerVectorFunction( + op_name, + VectorAddition::signatures(), + std::make_unique(dim1) + ); + std::cout << "Registered a vector_addition function of name " << op_name << " with dimension " << dim1 << endl; + } + + +} + + + + + +float * genWeight(int dim1, int dim2) { + + int total_size = dim1 * dim2; + + //generate weight matrix + float * weight = new float[total_size]; + for (int i = 0; i < total_size; i++) { + if (i % 2 == 0) { + weight[i] = 1.0; + } else { + weight[i] = 0.0; + } + } + return weight; +} + +bool addModelInferenceToQueryPlanAfterFactorize(PlanBuilder & planBuilder, std::shared_ptr planNodeIdGenerator) { + + std::string modelProjString = ""; + std::vector> emptyMatrix; + for (int i = modelOperators.size() - 1; i > 0; i--) { + std::string opName = modelOperators[i]; + if (opName.find("mat_mul") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + int dim2 = param[0].size(); + registerNNFunction(opName, param, dim1, dim2); + } + else if (opName.find("mat_add") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + registerNNFunction(opName, param, dim1, -1); + } + else { + registerNNFunction(opName, emptyMatrix, -1, -1); + } + modelProjString += opName + "("; + } + modelProjString += "features"; + + for (int i = modelOperators.size() - 1; i > 0; i--) { + modelProjString += ")"; + } + modelProjString += " AS output"; + + std::cout << "Inference Part: " << modelProjString << endl; + planBuilder.project({modelProjString}); + + return true; + +} + +bool addModelInferenceToQueryPlan(PlanBuilder & planBuilder, std::shared_ptr planNodeIdGenerator) { + + std::string modelProjString = ""; + std::vector> emptyMatrix; + for (int i = modelOperators.size() - 1; i >= 0; i--) { + std::string opName = modelOperators[i]; + if (opName.find("mat_mul") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + int dim2 = param[0].size(); + registerNNFunction(opName, param, dim1, dim2); + } + else if (opName.find("mat_add") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + registerNNFunction(opName, param, dim1, -1); + } + else { + registerNNFunction(opName, emptyMatrix, -1, -1); + } + modelProjString += opName + "("; + } + modelProjString += "features"; + + for (int i = modelOperators.size() - 1; i >= 0; i--) { + modelProjString += ")"; + } + modelProjString += " AS output"; + + std::cout << "Inference Part: " << modelProjString << endl; + planBuilder.project({modelProjString}); + + return true; + +} + + +bool rewriteWithFactorization(PlanBuilder & planBuilder, std::shared_ptr planNodeIdGenerator, std::string joinOrderStr) { + + // Create a JSON reader and root object + Json::CharReaderBuilder readerBuilder; + Json::Value root; // Root will hold the parsed JSON array + std::string errors; + + // Parse the JSON string + std::istringstream stream(joinOrderStr); + if (!Json::parseFromStream(readerBuilder, stream, &root, &errors)) { + std::cerr << "Error parsing JSON: " << errors << "\n"; + return 1; + } + //JSON Reading successful" + + std::unordered_map sources; //with filters and projections pushed down; + + std::string outName; + std::string leftName; + std::string rightName; + + std::string firstOpName = modelOperators[0]; // name of operator of split layer + std::vector> firstWeight = operatorParam2Weights[firstOpName]; // weight of the split layer + int fCurrentTotal = 0; + int numCols = firstWeight.size(); // number of columns in split layer + int numNeurons = firstWeight[0].size(); // number of neurons in split layer + int k = 0; + int addIdx = 0; + + std::vector> emptyMatrix; + bool isAdditionRegistered = false; + + // Iterate through the array + for (const auto& item : root) { + std::cout << "ID: " << item["ID"].asString() << "\n"; + std::cout << "Left: " << item["Left"].asString() << "\n"; + std::cout << "Right: " << item["Right"].asString() << "\n"; + std::cout << "Pred: " << item["Pred"].asString() << "\n"; + std::cout << "ProbeKeys: " << item["ProbeKeys"].asString() << "\n"; + std::cout << "BuildKeys: " << item["BuildKeys"].asString() << "\n"; + + std::string joinId = item["ID"].asString(); + std::string leftTable = item["Left"].asString(); + std::string rightTable = item["Right"].asString(); + std::string probKeys = item["ProbeKeys"].asString(); + std::string buildKeys = item["BuildKeys"].asString(); + int NumDimLeft = item["NumDimLeft"].asInt(); + int NumDimRight = item["NumDimRight"].asInt(); + + std::string leftFactorizationKey = leftTable + "--->" + joinId; + std::string rightFactorizationKey = rightTable + "--->" + joinId; + + std::cout << "left factorization key: " << leftFactorizationKey << std::endl; + std::cout << "right factorization key: " << leftFactorizationKey << std::endl; + + bool isLeftTableNotLeaf = isInteger(leftTable); + + if (isLeftTableNotLeaf == false) { + // left table is leaf + + featureStartPos[leftTable] = fCurrentTotal; + fCurrentTotal += NumDimLeft; + + std::string fName = tabel2Columns[leftTable][1]; + + if (factorizationPlans[leftFactorizationKey] == 1) { + // Doing factorization of left edge + std::vector> subWeight = extractSubweight(firstWeight, featureStartPos[leftTable], NumDimLeft); + std::string newOpName = firstOpName + "_" + std::to_string(k); + registerNNFunction(newOpName, subWeight, NumDimLeft, numNeurons); + std::string fNewName = "factorized_" + std::to_string(k); + std::string fNewNameFull = newOpName + "(" + fName + ") AS " + fNewName; + tabel2Columns[leftTable][1] = fNewName; + k += 1; + + FeatureStatus fs; + fs.isFeature = 1; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + + auto leftPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[leftTable]}) + .project({tabel2Columns[leftTable][0], fNewNameFull}); + + sources[leftTable] = leftPlan; + } + else { + // Not Doing factorization of left edge + FeatureStatus fs = allFeatureStatus[fName]; + fs.featureStartPos = featureStartPos[leftTable]; + allFeatureStatus[fName] = fs; + + auto leftPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[leftTable]}) + .project({tabel2Columns[leftTable]}); + + sources[leftTable] = leftPlan; + } + + } + + bool isRightTableNotLeaf = isInteger(rightTable); + if (isRightTableNotLeaf == false) { + // right table is leaf + + featureStartPos[rightTable] = fCurrentTotal; + fCurrentTotal += NumDimRight; + + std::string fName = tabel2Columns[rightTable][1]; + + if (factorizationPlans[rightFactorizationKey] == 1) { + // Doing factorization of right edge + std::vector> subWeight = extractSubweight(firstWeight, featureStartPos[rightTable], NumDimRight); + std::string newOpName = firstOpName + "_" + std::to_string(k); + registerNNFunction(newOpName, subWeight, NumDimRight, numNeurons); + std::string fNewName = "factorized_" + std::to_string(k); + std::string fNewNameFull = newOpName + "(" + fName + ") AS " + fNewName; + tabel2Columns[rightTable][1] = fNewName; + k += 1; + + FeatureStatus fs; + fs.isFeature = 1; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + + auto rightPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[rightTable]}) + .project({tabel2Columns[rightTable][0], fNewNameFull}); + + sources[rightTable] = rightPlan; + } + else { + // Not Doing factorization of right edge + FeatureStatus fs = allFeatureStatus[fName]; + fs.featureStartPos = featureStartPos[rightTable]; + allFeatureStatus[fName] = fs; + + auto rightPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[rightTable]}) + .project({tabel2Columns[rightTable]}); + + sources[rightTable] = rightPlan; + } + + } + + featureStartPos[joinId] = featureStartPos[leftTable]; + + + PlanBuilder left, right, out; + + if (sources.count(leftTable) > 0) { + + left = sources[leftTable]; + + } + + if (sources.count(rightTable) > 0) { + + right = sources[rightTable]; + + } + + //compose join features + std::vector leftProj; + std::vector joinCols; + int totalPushed = 0; + + if (factorizationPlans[leftFactorizationKey] == 1) { + // Factorizing all left features that were not pushed earlier + int newFactorizedCount = 0; + for (int i = 0; i < tabel2Columns[leftTable].size(); i++) { + std::string fName = tabel2Columns[leftTable][i]; + if (allFeatureStatus.count(fName) <= 0) { + std::cout << "Feature status not found for feature " << fName << std::endl; + continue; + } + FeatureStatus fs = allFeatureStatus[fName]; + if (fs.isFeature == 1 && fs.isPushed == 0) { + + // Feature not pushed earlier, pushing now + std::vector> subWeight = extractSubweight(firstWeight, fs.featureStartPos, fs.vectorSize); + std::string newOpName = firstOpName + "_" + std::to_string(k); + registerNNFunction(newOpName, subWeight, fs.vectorSize, numNeurons); + std::string fNewName = "factorized_" + std::to_string(k); + std::string fNewNameFull = newOpName + "(" + fName + ") AS " + fNewName; + k += 1; + + FeatureStatus fs; + fs.isFeature = true; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + leftProj.push_back(fNewNameFull); + joinCols.push_back(fNewName); + newFactorizedCount += 1; + totalPushed += 1; + } + else { + // Feature not a feature or pushed earlier + leftProj.push_back(fName); + joinCols.push_back(fName); + if (fs.isFeature == 1) { + // Feature pushed earlier + totalPushed += 1; + } + } + } + + if (newFactorizedCount > 0) { + left = left.project({leftProj}); + } + } + else { + // Just adding all left columns without factorization + for (int i = 0; i < tabel2Columns[leftTable].size(); i++) { + std::string fName = tabel2Columns[leftTable][i]; + leftProj.push_back(fName); + joinCols.push_back(fName); + } + } + + + std::vector rightProj; + + if (factorizationPlans[rightFactorizationKey] == 1) { + // Factorizing all right features that were not pushed earlier + int newFactorizedCount = 0; + for (int i = 0; i < tabel2Columns[rightTable].size(); i++) { + std::string fName = tabel2Columns[rightTable][i]; + if (allFeatureStatus.count(fName) <= 0) { + std::cout << "Feature status not found for feature " << fName << std::endl; + continue; + } + FeatureStatus fs = allFeatureStatus[fName]; + if (fs.isFeature == 1 && fs.isPushed == 0) { + + // Feature not pushed earlier, pushing now + std::vector> subWeight = extractSubweight(firstWeight, fs.featureStartPos, fs.vectorSize); + std::string newOpName = firstOpName + "_" + std::to_string(k); + registerNNFunction(newOpName, subWeight, fs.vectorSize, numNeurons); + std::string fNewName = "factorized_" + std::to_string(k); + std::string fNewNameFull = newOpName + "(" + fName + ") AS " + fNewName; + k += 1; + + FeatureStatus fs; + fs.isFeature = true; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + rightProj.push_back(fNewNameFull); + joinCols.push_back(fNewName); + newFactorizedCount += 1; + totalPushed += 1; + } + else { + // Feature not a feature or pushed earlier + rightProj.push_back(fName); + joinCols.push_back(fName); + if (fs.isFeature == 1) { + // Feature pushed earlier + totalPushed += 1; + } + } + } + + if (newFactorizedCount > 0) { + right = right.project({rightProj}); + } + } + else { + // Just adding all right columns without factorization + for (int i = 0; i < tabel2Columns[rightTable].size(); i++) { + std::string fName = tabel2Columns[rightTable][i]; + rightProj.push_back(fName); + joinCols.push_back(fName); + } + } + + tabel2Columns[joinId] = joinCols; + // Writing join + out = left.hashJoin( + {item["ProbeKeys"].asString()}, + {item["BuildKeys"].asString()}, + right.planNode(), + "", + {joinCols} + ); + + if (totalPushed > 1) { + // some features were pushed before join, they need to be aggregated + std::vector joinColsNew; + std::vector joinProj; + + bool isFirstVec = true; + std::string projString = ""; + for (int i = 0; i < joinCols.size(); i++) { + std::string fName = joinCols[i]; + FeatureStatus fs = allFeatureStatus[fName]; + if (fs.isFeature == 0 || fs.isPushed == 0) { + joinColsNew.push_back(fName); + joinProj.push_back(fName); + continue; + } + + if (isFirstVec == true) { + projString = fName; + isFirstVec = false; + continue; + } + + std::string newOpName = "vector_addition"; + std::string opName = newOpName + "(" + projString + ", " + fName + ")"; + projString = opName; + if (isAdditionRegistered == false) { + registerNNFunction(newOpName, emptyMatrix, numNeurons, -1); + isAdditionRegistered = true; + } + } + + std::string fNewName = "added_vec_" + std::to_string(addIdx); + addIdx += 1; + projString += " AS " + fNewName; + joinColsNew.push_back(fNewName); + joinProj.push_back(projString); + + FeatureStatus fs; + fs.isFeature = true; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + + out = out.project({joinProj}); + tabel2Columns[joinId] = joinColsNew; + } + // Finished Writing join + + outName = joinId; + sources[outName] = out; + + } + + planBuilder = sources[outName]; + + std::cout << "After the last join" << std::endl; + std::vector joinProj; + std::vector joinCols; + int newPushedCount = 0; + + // iterate over the final join projection output to compute not pushed feature + for (int i = 0; i < tabel2Columns[outName].size(); i++) { + std::string fName = tabel2Columns[outName][i]; + FeatureStatus fs = allFeatureStatus[fName]; + if (fs.isFeature == 1) { + if (fs.isPushed == 0) { + + // found a feature in the final join output which were not pushed + std::vector> subWeight = extractSubweight(firstWeight, fs.featureStartPos, fs.vectorSize); + std::string newOpName = firstOpName + "_" + std::to_string(k); + std::cout << fName << ", " << fs.featureStartPos << ", " << fs.vectorSize << ", " << subWeight.size() << ", " << subWeight[0].size() << std::endl; + registerNNFunction(newOpName, subWeight, fs.vectorSize, numNeurons); + std::string fNewName = "factorized_" + std::to_string(k); + std::string fNewNameFull = newOpName + "(" + fName + ") AS " + fNewName; + k += 1; + + FeatureStatus fs; + fs.isFeature = true; + fs.isPushed = 1; + fs.vectorSize = numNeurons; + allFeatureStatus[fNewName] = fs; + joinProj.push_back(fNewNameFull); + joinCols.push_back(fNewName); + newPushedCount += 1; + } + else { + joinProj.push_back(fName); + joinCols.push_back(fName); + } + } + } + + if (newPushedCount > 0) { + // final join output projection changed + planBuilder = planBuilder.project({joinProj}); + } + + // perform aggregation of the final join output + std::string projString = joinCols[0]; + for (int i = 1; i < joinCols.size(); i++) { + std::string newOpName = "vector_addition"; + std::string opName = newOpName + "(" + projString + ", " + joinCols[i] + ")"; + projString = opName; + if (isAdditionRegistered == false) { + registerNNFunction(newOpName, emptyMatrix, numNeurons, -1); + isAdditionRegistered = true; + } + } + projString += " AS features"; + + planBuilder = planBuilder.project({projString}); + std::cout << "Plan: " << planBuilder.planNode()->toString(true, true) << std::endl; + return true; + +} + + + +bool writeWithoutFactorization(PlanBuilder & planBuilder, std::shared_ptr planNodeIdGenerator, std::string joinOrderStr) { + + // Create a JSON reader and root object + Json::CharReaderBuilder readerBuilder; + Json::Value root; // Root will hold the parsed JSON array + std::string errors; + + // Parse the JSON string + std::istringstream stream(joinOrderStr); + if (!Json::parseFromStream(readerBuilder, stream, &root, &errors)) { + std::cerr << "Error parsing JSON: " << errors << "\n"; + return 1; + } + + std::unordered_map sources; //with filters and projections pushed down; + + std::string outName; + std::string leftName; + std::string rightName; + std::vector projections; + + std::string firstOpName = modelOperators[0]; + std::vector> firstWeight = operatorParam2Weights[firstOpName]; + int fCurrentTotal = 0; + int numCols = firstWeight.size(); + int numNeurons = firstWeight[0].size(); + int k = 0; + + // Iterate through the array + for (const auto& item : root) { + std::cout << "ID: " << item["ID"].asString() << "\n"; + std::cout << "Left: " << item["Left"].asString() << "\n"; + std::cout << "Right: " << item["Right"].asString() << "\n"; + std::cout << "Pred: " << item["Pred"].asString() << "\n"; + std::cout << "ProbeKeys: " << item["ProbeKeys"].asString() << "\n"; + std::cout << "BuildKeys: " << item["BuildKeys"].asString() << "\n"; + + std::string joinId = item["ID"].asString(); + std::string leftTable = item["Left"].asString(); + std::string rightTable = item["Right"].asString(); + std::string probKeys = item["ProbeKeys"].asString(); + std::string buildKeys = item["BuildKeys"].asString(); + int NumDimLeft = item["NumDimLeft"].asInt(); + int NumDimRight = item["NumDimRight"].asInt(); + + bool isLeftTableNotLeaf = isInteger(leftTable); + if (isLeftTableNotLeaf == false) { + // left table is leaf + fCurrentTotal += NumDimLeft; + + auto leftPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[leftTable]}) + .project({tabel2Columns[leftTable]}); + + sources[leftTable] = leftPlan; + + } + + bool isRightTableNotLeaf = isInteger(rightTable); + if (isRightTableNotLeaf == false) { + // right table is leaf + fCurrentTotal += NumDimRight; + + auto rightPlan = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector[rightTable]}) + .project({tabel2Columns[rightTable]}); + + sources[rightTable] = rightPlan; + + } + + + PlanBuilder left, right, out; + + //retrieve the corresponding PlanBuilder + if (sources.count(leftTable) > 0) { + + left = sources[leftTable]; + + } + + if (sources.count(rightTable) > 0) { + + right = sources[rightTable]; + + } + + //compose join + projections.clear(); + // Access Projection if it exists + if (item.isMember("Projection")) { + for (const auto& proj : item["Projection"]) { + std::cout << proj << std::endl; + projections.push_back(proj.asString()); + } + } + + std::cout << "Writing join" << std::endl; + out = left.hashJoin( + {item["ProbeKeys"].asString()}, + {item["BuildKeys"].asString()}, + right.planNode(), + "", + {projections} + ); + std::cout << "Finished Writing join" << std::endl; + + outName = joinId; + sources[outName] = out; + + } + planBuilder = sources[outName]; + + // After the final join + std::string projString=""; + bool isFirst = true; + for (std::string proj : projections) { + FeatureStatus fs = allFeatureStatus[proj]; + if (fs.isFeature == 1) { + if (isFirst) { + projString += proj; + isFirst = false; + } else { + projString += "," + proj; + } + } + } + projString = "concat(" + projString + ") as features"; + + planBuilder = planBuilder.project({projString}); + std::cout << "Plan: " << planBuilder.planNode()->toString(true, true) << std::endl; + return true; + +} + + + +bool createAndExecuteQuery(std::string queryJsonStr, bool withFactorization) { + PlanBuilder planBuilder{pool_.get()}; + std::shared_ptr planNodeIdGenerator = std::make_shared(); + if (withFactorization) { + // Rewriting with factorization + rewriteWithFactorization(planBuilder, planNodeIdGenerator, queryJsonStr); + + // Step 2. Add model inference to query plan + addModelInferenceToQueryPlanAfterFactorize(planBuilder, planNodeIdGenerator); + } + else { + // Writing without factorization + writeWithoutFactorization(planBuilder, planNodeIdGenerator, queryJsonStr); + + // Step 2. Add model inference to query plan + addModelInferenceToQueryPlan(planBuilder, planNodeIdGenerator); + } + + //std::cout << "Plan: " << planBuilder.planNode()->toString(true, true) << std::endl; + auto myPlan = planBuilder.planNode(); + std::cout << myPlan->toString(true, true) << std::endl; + std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); + auto results = exec::test::AssertQueryBuilder(myPlan).copyResults(pool_.get()); + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); + std::cout << "Time (sec) = " << (std::chrono::duration_cast(end - begin).count()) /1000000.0 << std::endl; + std::cout << "Results Size: " << results->size() << std::endl; + std::cout << "Results:" << results->toString(0, 5) << std::endl; + //std::cout << results->toString(0, results->size()) << std::endl; + return true; + +} + + +}; + +int main(int argc, char** argv) { + + setlocale(LC_TIME, "C"); + + folly::init(&argc, &argv, false); + memory::MemoryManager::initialize({}); + + RewriteFactorized rewriteObj; + + // Load table1 + std::cout << "Reading Table 1 CSV file" << std::endl; + RowVectorPtr table1Vec = rewriteObj.getTableFromCSVFile(rewriteObj.maker, "resources/data/synthetic_dataset_4_3/table_1.csv", "table_1", "join_key1"); + + std::cout << "Reading Table 2 CSV file" << std::endl; + // Load table2 + RowVectorPtr table2Vec = rewriteObj.getTableFromCSVFile(rewriteObj.maker, "resources/data/synthetic_dataset_4_3/table_2.csv", "table_2", "join_key2"); + + // Load table3 + std::cout << "Reading Table 3 CSV file" << std::endl; + RowVectorPtr table3Vec = rewriteObj.getTableFromCSVFile(rewriteObj.maker, "resources/data/synthetic_dataset_4_3/table_3.csv", "table_3", "join_key3"); + + // Load table4 + std::cout << "Reading Table 4 CSV file" << std::endl; + RowVectorPtr table4Vec = rewriteObj.getTableFromCSVFile(rewriteObj.maker, "resources/data/synthetic_dataset_4_3/table_4.csv", "table_4", "join_key4"); + + + rewriteObj.tableName2RowVector["table_1"] = table1Vec; + rewriteObj.tableName2RowVector["table_2"] = table2Vec; + rewriteObj.tableName2RowVector["table_3"] = table3Vec; + rewriteObj.tableName2RowVector["table_4"] = table4Vec; + + + // retrieve the weights and set to map + std::cout << "Reading model parameters" << std::endl; + rewriteObj.findWeights("resources/model/dummy.h5"); + + // retrieve the model operators from model expression IR + std::string modelInput = "softmax3(mat_add3(mat_mul3(relu2(mat_add2(mat_mul2(relu1(mat_add1(mat_mul1(features)))))))))"; + std::cout << "Extracting model operators" << std::endl; + std::vector operators = rewriteObj.extractOperatorsInReverse(modelInput); + rewriteObj.modelOperators = operators; + + std::string jsonString = R"([ + {"ID":"0","Left":"table_1","Right":"table_2","Pred":"table_1.join_key1 = table_2.join_key2","ProbeKeys":"join_key1","BuildKeys":"join_key2","Projection":["join_key1","f_table_1","join_key2","f_table_2"],"NumTuplesLeft":1000,"NumDimLeft":20,"NumTuplesRight":300,"NumDimRight":39,"NumTuplesOutput":300,"NumDimOutput":59}, + {"ID":"1","Left":"0","Right":"table_3","Pred":"table_2.join_key2 = table_3.join_key3","ProbeKeys":"join_key2","BuildKeys":"join_key3","Projection":["join_key1","f_table_1","join_key2","f_table_2","join_key3","f_table_3"],"NumTuplesLeft":300,"NumDimLeft":59,"NumTuplesRight":12000,"NumDimRight":31,"NumTuplesOutput":12000,"NumDimOutput":90} + ])"; + + std::cout << "Reading factorization Plan" << std::endl; + rewriteObj.findFactorizationPlans("resources/plans/factorization_plan4_3.txt"); + + std::cout << "Performing rewriting" << std::endl; + bool ret = rewriteObj.createAndExecuteQuery(jsonString, true); + + return 1; +} + diff --git a/resources/model/job_any_64.h5 b/resources/model/job_any_64.h5 new file mode 100644 index 000000000..f5fc94c52 Binary files /dev/null and b/resources/model/job_any_64.h5 differ diff --git a/velox/ml_functions/CMakeLists.txt b/velox/ml_functions/CMakeLists.txt index 801f56af7..126773280 100644 --- a/velox/ml_functions/CMakeLists.txt +++ b/velox/ml_functions/CMakeLists.txt @@ -149,9 +149,28 @@ target_link_libraries( ml_functions ) +add_executable(job_two_join_test tests/Job2WayJoin.cpp) +target_link_libraries( + job_two_join_test + velox_aggregates + velox_type + velox_vector + velox_vector_test_lib + velox_exec + velox_exec_test_lib + velox_tpch_connector + velox_memory + velox_common_base + velox_vector_fuzzer + openblas + ${TORCH_LIBRARIES} + jsoncpp_lib + h5cpp::h5cpp + hdf5_serial + ${HDF5_CXX_LIBRARIES} +) -#FIXME: model path issue needs to be fixed add_executable(fraud_detection_test tests/FraudDetectionTest.cpp) target_link_libraries( fraud_detection_test diff --git a/velox/ml_functions/tests/Job2WayJoin.cpp b/velox/ml_functions/tests/Job2WayJoin.cpp new file mode 100644 index 000000000..203d7d9c4 --- /dev/null +++ b/velox/ml_functions/tests/Job2WayJoin.cpp @@ -0,0 +1,1445 @@ +#include +//#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "velox/type/Type.h" +#include "velox/exec/tests/utils/HiveConnectorTestBase.h" +#include "velox/functions/prestosql/aggregates/RegisterAggregateFunctions.h" +#include "velox/functions/prestosql/registration/RegistrationFunctions.h" +#include "velox/ml_functions/DecisionTree.h" +#include "velox/ml_functions/XGBoost.h" +#include "velox/ml_functions/tests/MLTestUtility.h" +#include "velox/parse/TypeResolver.h" +#include "velox/ml_functions/VeloxDecisionTree.h" +#include "velox/common/file/FileSystems.h" +#include "velox/dwio/dwrf/reader/DwrfReader.h" +#include "velox/dwio/parquet/RegisterParquetReader.h" +#include "velox/dwio/parquet/RegisterParquetWriter.h" +#include +#include "velox/connectors/hive/HiveConfig.h" +#include "velox/ml_functions/functions.h" +#include "velox/ml_functions/Concat.h" +#include "velox/ml_functions/NNBuilder.h" +#include +#include +#include "velox/ml_functions/VeloxDecisionTree.h" +#include "velox/expression/VectorFunction.h" +#include "velox/vector/ComplexVector.h" +#include "velox/vector/FlatVector.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace ml; +using namespace facebook::velox; +using namespace facebook::velox::test; +using namespace facebook::velox::exec; +using namespace facebook::velox::exec::test; +using namespace facebook::velox::core; + +/* + * The structure to describe the context of a neural network model architecture + */ +struct NNModelContext { + int inputFeatures; + + int numLayers; + + int hiddenLayerNeurons; + + int outputLayerNeurons; +}; + +/* + * The structure to describe the context of a decision tree model architecture + */ + +struct DTModelContext { + int inputFeatures; + + int treeDepth; +}; + + +/* + * The structure to describe the push down status of a feature + */ +struct FeatureStatus { + int isFeature; + + int isPushed; + + int vectorSize; + + int featureStartPos; + +}; + + +class VectorAddition : public MLFunction { + public: + VectorAddition(int inputDims) { + dims.push_back(inputDims); + } + + void apply( + const SelectivityVector& rows, + std::vector& args, + const TypePtr& type, + exec::EvalCtx& context, + VectorPtr& output) const override { + BaseVector::ensureWritable(rows, type, context.pool(), output); + + /*auto input_elements1 = args[0]->as()->elements(); + float* input1Values = input_elements1->values()->asMutable(); + + auto input_elements2 = args[1]->as()->elements(); + float* input2Values = input_elements2->values()->asMutable();*/ + + BaseVector* left = args[0].get(); + BaseVector* right = args[1].get(); + + exec::LocalDecodedVector leftHolder(context, *left, rows); + auto decodedLeftArray = leftHolder.get(); + auto baseLeftArray = decodedLeftArray->base()->as()->elements(); + + exec::LocalDecodedVector rightHolder(context, *right, rows); + auto decodedRightArray = rightHolder.get(); + auto baseRightArray = decodedRightArray->base()->as()->elements(); + + float* input1Values = baseLeftArray->values()->asMutable(); + float* input2Values = baseRightArray->values()->asMutable(); + + int numInput = rows.size(); + + Eigen::Map< + Eigen::Matrix> + input1Matrix(input1Values, numInput, dims[0]); + Eigen::Map< + Eigen::Matrix> + input2Matrix(input2Values, numInput, dims[0]); + + std::vector> results; + + Eigen::Matrix sumMat = input1Matrix + input2Matrix; + for (int i = 0; i < numInput; i++) { + std::vector curVec( + sumMat.row(i).data(), + sumMat.row(i).data() + sumMat.cols()); + results.push_back(curVec); + } + + VectorMaker maker{context.pool()}; + output = maker.arrayVector(results, REAL()); + } + + static std::vector> signatures() { + return {exec::FunctionSignatureBuilder() + .argumentType("array(REAL)") + .argumentType("array(REAL)") + .returnType("array(REAL)") + .build()}; + } + + static std::string getName() { + return "vector_addition"; + }; + + float* getTensor() const override { + // TODO: need to implement + return nullptr; + } + + CostEstimate getCost(std::vector inputDims) { + // TODO: need to implement + return CostEstimate(0, inputDims[0], inputDims[1]); + } + +}; + + + + +class GetFeatureVec : public MLFunction { + public: + + void apply( + const SelectivityVector& rows, + std::vector& args, + const TypePtr& type, + exec::EvalCtx& context, + VectorPtr& output) const override { + BaseVector::ensureWritable(rows, type, context.pool(), output); + + int64_t vecSizeLarge = 0; + if (args.size() == 2) { + // an optional parameter can be passed to enable the GPU for mat_mul + vecSizeLarge = args[1]->as>()->valueAt(0); + } + int vecSize = static_cast(vecSizeLarge); + + std::vector> results; + + for (int i = 0; i < rows.size(); i++) { + std::vector vec; + + for (int j = 0; j < vecSize; j++) { + if (j % 2 == 0) + vec.push_back(1.0); + else + vec.push_back(0.0); + } + results.push_back(vec); + } + + VectorMaker maker{context.pool()}; + output = maker.arrayVector(results, REAL()); + } + + static std::vector> signatures() { + return {exec::FunctionSignatureBuilder() + .argumentType("array(REAL)") + .argumentType("BIGINT") + .returnType("array(REAL)") + .build()}; + } + + static std::string getName() { + return "get_feature_vec"; + }; + + float* getTensor() const override { + // TODO: need to implement + return nullptr; + } + + CostEstimate getCost(std::vector inputDims) { + // TODO: need to implement + return CostEstimate(0, inputDims[0], inputDims[1]); + } + +}; + + + +class Job2WayJoin : HiveConnectorTestBase { + public: + Job2WayJoin() { + // Register Presto scalar functions. + functions::prestosql::registerAllScalarFunctions(); + // Register Presto aggregate functions. + aggregate::prestosql::registerAllAggregateFunctions(); + // Register type resolver with DuckDB SQL parser. + parse::registerTypeResolver(); + // HiveConnectorTestBase::SetUp(); + // parquet::registerParquetReaderFactory(); + + auto hiveConnector = + connector::getConnectorFactory( + connector::hive::HiveConnectorFactory::kHiveConnectorName) + ->newConnector( + kHiveConnectorId, std::make_shared()); + connector::registerConnector(hiveConnector); + + // SetUp(); + } + + ~Job2WayJoin() {} + + void SetUp() override { + // TODO: not used for now + // HiveConnectorTestBase::SetUp(); + // parquet::registerParquetReaderFactory(); + } + + void TearDown() override { + HiveConnectorTestBase::TearDown(); + } + + void TestBody() override {} + + static void waitForFinishedDrivers(const std::shared_ptr& task) { + while (!task->isFinished()) { + usleep(1000); // 0.01 second. + } + } + + std::shared_ptr executor_{ + std::make_shared( + std::thread::hardware_concurrency())}; + + std::shared_ptr queryCtx_{ + std::make_shared(executor_.get())}; + + std::shared_ptr pool_{ + memory::MemoryManager::getInstance()->addLeafPool()}; + + VectorMaker maker{pool_.get()}; + + std::unordered_map tableName2RowVector; + std::unordered_map>> operatorParam2Weights; + std::unordered_map> tabel2Columns; + std::vector modelOperators; + + // Function to check if a string represents a valid integer + bool isInteger(const std::string& s) { + if (s.empty() || (s.size() > 1 && s[0] == '0')) { + return false; // prevent leading zeros for non-zero integers + } + + for (char c : s) { + if (!std::isdigit(c)) { + return false; + } + } + + return true; + } + + + int getStringIndex(const std::vector& strVec, const std::string& target) { + auto it = std::find(strVec.begin(), strVec.end(), target); + + if (it != strVec.end()) { + // Calculate the index + int index = std::distance(strVec.begin(), it); + return index; + } else { + return -1; + } + + } + + + + std::vector extractOperatorsInReverse(const std::string& input) { + std::vector operators; + std::regex operatorPattern(R"(([a-zA-Z_]+\d+)\()"); // Match patterns like mat_mul1(, mat_add2(, etc. + std::smatch match; + + std::string::const_iterator searchStart(input.cbegin()); + while (std::regex_search(searchStart, input.cend(), match, operatorPattern)) { + operators.push_back(match[1]); // Extract the operator name + searchStart = match.suffix().first; // Move the search start position + } + + // Reverse the order of operators to match actual execution order + std::reverse(operators.begin(), operators.end()); + return operators; + } + + + + + std::vector> loadHDF5Array(const std::string& filename, const std::string& datasetName, int doPrint) { + H5::H5File file(filename, H5F_ACC_RDONLY); + H5::DataSet dataset = file.openDataSet(datasetName); + H5::DataSpace dataspace = dataset.getSpace(); + + // Get the number of dimensions + int rank = dataspace.getSimpleExtentNdims(); + // std::cout << "Rank: " << rank << std::endl; + + // Allocate space for the dimensions + std::vector dims(rank); + + // Get the dataset dimensions + dataspace.getSimpleExtentDims(dims.data(), nullptr); + + size_t rows; + size_t cols; + + if (rank == 1) { + rows = dims[0]; + cols = 1; + } + else if (rank == 2) { + rows = dims[0]; + cols = dims[1]; + } else { + throw std::runtime_error("Unsupported rank: " + std::to_string(rank)); + } + + // Read data into a 1D vector + std::vector flatData(rows * cols); + dataset.read(flatData.data(), H5::PredType::NATIVE_FLOAT); + + // Convert to 2D vector + std::vector> result(rows, std::vector(cols)); + for (size_t i = 0; i < rows; ++i) { + for (size_t j = 0; j < cols; ++j) { + result[i][j] = flatData[i * cols + j]; + if (doPrint == 1) + std::cout << result[i][j] << ", "; + } + if (doPrint == 1) + std::cout << std::endl; + } + + // Close the dataset and file + dataset.close(); + file.close(); + + return result; + } + + + + void findWeights(const std::string& modelPath) { + // read the parameter weights from file + std::vector> w1 = loadHDF5Array(modelPath, "fc1.weight", 0); + std::vector> b1 = loadHDF5Array(modelPath, "fc1.bias", 0); + std::vector> w2 = loadHDF5Array(modelPath, "fc2.weight", 0); + std::vector> b2 = loadHDF5Array(modelPath, "fc2.bias", 0); + std::vector> w3 = loadHDF5Array(modelPath, "fc3.weight", 0); + std::vector> b3 = loadHDF5Array(modelPath, "fc3.bias", 0); + + // store the weights in map with same name as operator + operatorParam2Weights["mat_mul1"] = w1; + operatorParam2Weights["mat_add1"] = b1; + operatorParam2Weights["mat_mul2"] = w2; + operatorParam2Weights["mat_add2"] = b2; + operatorParam2Weights["mat_mul3"] = w3; + operatorParam2Weights["mat_add3"] = b3; + + std::cout << "Shape of mat_mul1 weight: " << w1.size() << ", " << w1[0].size() << std::endl; + std::cout << "Shape of mat_add1 weight: " << b1.size() << std::endl; + std::cout << "Shape of mat_mul2 weight: " << w2.size() << ", " << w2[0].size() << std::endl; + std::cout << "Shape of mat_add2 weight: " << b2.size() << std::endl; + std::cout << "Shape of mat_mul3 weight: " << w3.size() << ", " << w3[0].size() << std::endl; + std::cout << "Shape of mat_add3 weight: " << b3.size() << std::endl; + } + + + std::vector> extractSubweight(const std::vector>& matrix, int start, int n) { + std::vector> result; + + std::cout << "Extracting subweight" << std::endl; + std::cout << start << ", " << n << std::endl; + + // Ensure that the range [start, start + n) is within bounds + //int end = std::min(start + n, matrix.size()); + int end = start + n; + for (int i = start; i < end; ++i) { + result.push_back(matrix[i]); // Copy rows within the range + } + + return result; + } + + + RowVectorPtr getTableFromCSVFile( + VectorMaker& maker, + std::string csvFilePath, + std::string tableName, + int k) { + std::ifstream file(csvFilePath.c_str()); + if (file.fail()) { + std::cerr << "Error in reading data file:" << csvFilePath << std::endl; + exit(1); + } + + std::cout << tableName << std::endl; + + std::unordered_map> colName2colHeader; + + std::unordered_map> colName2colType; + + colName2colType["aka_name"] = {0, 0, 1, 1, 1, 1, 1, 1}; + colName2colType["aka_title"] = {0, 0, 1, 1, 0, 0, 1, 0, 0, 0, 1, 1}; + colName2colType["cast_info"] = {0, 0, 0, 0, 1, 0, 0}; + colName2colType["char_name"] = {0, 1, 1, 0, 1, 1, 1}; + colName2colType["comp_cast_type"] = {0, 1}; + colName2colType["company_name"] = {0, 1, 1, 0, 1, 1, 1}; + colName2colType["company_type"] = {0, 1}; + colName2colType["complete_cast"] = {0, 0, 0, 0}; + colName2colType["info_type"] = {0, 1}; + colName2colType["keyword"] = {0, 1, 1}; + colName2colType["kind_type"] = {0, 1}; + colName2colType["link_type"] = {0, 1}; + colName2colType["movie_companies"] = {0, 0, 0, 0, 1}; + colName2colType["movie_info_idx"] = {0, 0, 0, 1, 1}; + colName2colType["movie_keyword"] = {0, 0, 0}; + colName2colType["movie_link"] = {0, 0, 0, 0}; + colName2colType["name"] = {0, 1, 1, 0, 1, 1, 1, 1, 1}; + colName2colType["role_type"] = {0, 1}; + colName2colType["title"] = {0, 1, 1, 0, 0, 0, 1, 0, 0, 0, 1, 1}; + colName2colType["movie_info"] = {0, 0, 0, 1, 1}; + colName2colType["person_info"] = {0, 0, 0, 1, 1}; + + std::vector aka_name_columns = { + "id", + "person_id", + "name", + "imdb_index", + "name_pcode_cf", + "name_pcode_ndf", + "surname_pcode", + "md5sum", + "an_features"}; + + colName2colHeader["aka_name"] = aka_name_columns; + + std::vector aka_title_columns = { + "id", + "movie_id", + "title", + "imdb_index", + "kind_id", + "production_year", + "phonetic_code", + "episode_of_id", + "season_nr", + "episode_nr", + "note", + "md5sum", + "at_features"}; + + colName2colHeader["aka_title"] = aka_title_columns; + + std::vector cast_info_columns = { + "id", + "person_id", + "movie_id", + "person_role_id", + "note", + "nr_order", + "role_id", + "ci_features"}; + + colName2colHeader["cast_info"] = cast_info_columns; + + std::vector char_name_columns = { + "id", + "name", + "imdb_index", + "imdb_id", + "name_pcode_nf", + "surname_pcode", + "md5sum", + "chn_features"}; + + colName2colHeader["char_name"] = char_name_columns; + + std::vector comp_cast_type_columns = { + "id", "kind", "cct_features"}; + + colName2colHeader["comp_cast_type"] = comp_cast_type_columns; + + std::vector company_name_columns = { + "id", + "name", + "country_code", + "imdb_id", + "name_pcode_nf", + "name_pcode_sf", + "md5sum", + "cn_features"}; + + colName2colHeader["company_name"] = company_name_columns; + + std::vector company_type_columns = { + "id", "kind", "ct_features"}; + + colName2colHeader["company_type"] = company_type_columns; + + std::vector complete_cast_columns = { + "id", "movie_id", "subject_id", "status_id", "cc_features"}; + + colName2colHeader["complete_cast"] = complete_cast_columns; + + std::vector info_type_columns = {"id", "info", "it_features"}; + + colName2colHeader["info_type"] = info_type_columns; + + std::vector keyword_columns = { + "id", "keyword", "phonetic_code", "k_features"}; + + colName2colHeader["keyword"] = keyword_columns; + + std::vector kind_type_columns = {"id", "kind", "kt_features"}; + + colName2colHeader["kind_type"] = kind_type_columns; + + std::vector link_type_columns = {"id", "link", "lt_features"}; + + colName2colHeader["link_type"] = link_type_columns; + + std::vector movie_companies_columns = { + "id", + "movie_id", + "company_id", + "company_type_id", + "note", + "mc_features"}; + + colName2colHeader["movie_companies"] = movie_companies_columns; + + std::vector movie_info_idx_columns = { + "id", "movie_id", "info_type_id", "info", "note", "mii_features"}; + + colName2colHeader["movie_info_idx"] = movie_info_idx_columns; + + std::vector movie_keyword_columns = { + "id", "movie_id", "keyword_id", "mk_features"}; + + colName2colHeader["movie_keyword"] = movie_keyword_columns; + + std::vector movie_link_columns = { + "id", "movie_id", "linked_movie_id", "link_type_id", "ml_features"}; + + colName2colHeader["movie_link"] = movie_link_columns; + + std::vector name_columns = { + "id", + "name", + "imdb_index", + "imdb_id", + "gender", + "name_pcode_cf", + "name_pcode_nf", + "surname_pcode", + "md5sum", + "n_features"}; + + colName2colHeader["name"] = name_columns; + + std::vector role_type_columns = {"id", "role", "rt_features"}; + + colName2colHeader["role_type"] = role_type_columns; + + std::vector title_columns = { + "id", + "title", + "imdb_index", + "kind_id", + "production_year", + "imdb_id", + "phonetic_code", + "episode_of_id", + "season_nr", + "episode_nr", + "series_years", + "md5sum", + "t_features"}; + + colName2colHeader["title"] = title_columns; + + std::vector movie_info_columns = { + "id", "movie_id", "info_type_id", "info", "note", "mi_features"}; + + colName2colHeader["movie_info"] = movie_info_columns; + + std::vector person_info_columns = { + "id", "person_id", "info_type_id", "info", "note", "pi_features"}; + + colName2colHeader["person_info"] = person_info_columns; + + std::string line; + + std::vector> intCols; + + std::vector> stringCols; + + std::vector colTypeIndex = colName2colType[tableName]; + + std::vector colIndexInType; + + int colIndex = 0; + + std::string cell; + + int numRows = 0; + + while (std::getline(file, line)) { + // std::cout << line << std::endl; + + // analyze the first line + std::stringstream iss(line); + + bool fragmentFlag = false; + + std::string fragmentedStr; + + colIndex = 0; + + // The JOB tables only have two types of columns: integer and string + while (std::getline(iss, cell, ',')) { + if ((fragmentFlag == false) && (cell.size() == 1) && (cell[0] == '"')) { + fragmentFlag = true; + + fragmentedStr = ","; + + continue; + + } else if ( + (fragmentFlag == true) && (cell.size() == 1) && (cell[0] == '"')) { + fragmentFlag = false; + + cell = fragmentedStr; + + fragmentedStr = ""; + + } else if ( + (fragmentFlag == false) && (cell[0] == '"') && + ((cell[cell.size() - 1] != '"') || + ((cell[cell.size() - 1] == '"') && + (cell[cell.size() - 2] == '\\')))) { + fragmentFlag = true; + + fragmentedStr = cell; + + continue; + + } else if ( + (fragmentFlag == true) && (cell[0] != '"') && + (cell[cell.size() - 1] == '"') && (cell[cell.size() - 2] != '\\')) { + fragmentFlag = false; + + fragmentedStr += cell; + + cell = fragmentedStr; + + fragmentedStr = ""; + + } else if (fragmentFlag == true) { + fragmentedStr += cell; + + continue; + } + + // std::cout << colIndex << ":" << cell << std::endl; + + if (!fragmentFlag) { + if (!colTypeIndex[colIndex]) { + // this is an integer column + + if (numRows == 0) { + if (cell == "") + + intCols.push_back(std::vector{INT_MIN}); + + else + + intCols.push_back(std::vector{stoi(cell)}); + + colIndexInType.push_back(intCols.size() - 1); + + } else { + int vecIndex = colIndexInType[colIndex]; + + if (cell == "") + + intCols[vecIndex].push_back(INT_MIN); + + else + + intCols[vecIndex].push_back(stoi(cell)); + } + + } else { + // this is a string column + + if (numRows == 0) { + stringCols.push_back(std::vector{cell}); + + colIndexInType.push_back(stringCols.size() - 1); + + } else { + int vecIndex = colIndexInType[colIndex]; + + stringCols[vecIndex].push_back(cell); + } + } + + colIndex++; + } + } + + if (colIndex < colTypeIndex.size()) { + // std::cout << "colIndex:" << colIndex << std::endl; + // std::cout << "colTypeIndex.size():"<< colTypeIndex.size() << + // std::endl; + + for (int i = colIndex; i < colTypeIndex.size(); i++) { + if (!colTypeIndex[i]) { + if (numRows == 0) { + intCols.push_back(std::vector{INT_MIN}); + + colIndexInType.push_back(intCols.size() - 1); + + } else { + int vecIndex = colIndexInType[i]; + + intCols[vecIndex].push_back(INT_MIN); + } + + } else { + if (numRows == 0) { + stringCols.push_back(std::vector{""}); + + colIndexInType.push_back(stringCols.size() - 1); + + } else { + int vecIndex = colIndexInType[i]; + + stringCols[vecIndex].push_back(""); + } + } + } + } + + colIndex = colTypeIndex.size(); + + /*if (numRows == 0) { + + for (int i = 0; i < colIndex; i++) { + + std::cout << colTypeIndex[i] << ":" << colIndexInType[i] << + std::endl; + + } + + }*/ + + numRows++; + } + + std::vector vecs; + + std::cout << "Building RowVector for this table with " << colIndex + << " columns and " << numRows << " rows." << std::endl; + + for (int i = 0; i < colIndex; i++) { + int type = colTypeIndex[i]; + + int vecIndex = colIndexInType[i]; + + // std::cout << i << ":" << type << ":" << vecIndex << std::endl; + + if (!type) { + auto vec = maker.flatVector(intCols[vecIndex]); + + vecs.push_back(vec); + + } else { + auto vec = maker.flatVector(stringCols[vecIndex]); + + vecs.push_back(vec); + } + } + + // to create the last column, which is a feature vector of length k + + if (k < 0) + k = 8; + + std::vector> inputVectors; + + for (int i = 0; i < numRows; i++) { + std::vector inputVector; + + for (int j = 0; j < k; j++) { + if (j % 2 == 0) + + inputVector.push_back(1.0); + + else + + inputVector.push_back(0.0); + } + + inputVectors.push_back(inputVector); + } + + auto inputArrayVector = maker.arrayVector(inputVectors, REAL()); + + vecs.push_back(inputArrayVector); + + RowVectorPtr myRowVector = + maker.rowVector(colName2colHeader[tableName], vecs); + + return myRowVector; + } + + int sampleQuery() { + return 29; + } + + int sampleModel() { + return 0; + } + + void sampleNNModelArch(int numInputFeatures, NNModelContext& nn) { + nn.inputFeatures = numInputFeatures; + nn.numLayers = 3; + nn.hiddenLayerNeurons = 16; + nn.outputLayerNeurons = 2; + } + + void sampleDTModelArch(int numInputFeatures, DTModelContext& dt) { + dt.inputFeatures = numInputFeatures; + dt.treeDepth = 8; + } + + bool replace(std::string& str, const std::string& from, const std::string& to) { + size_t start_pos = str.find(from); + if (start_pos == std::string::npos) + return false; + str.replace(start_pos, from.length(), to); + return true; + } + + + void registerNNFunction(const std::string& op_name, const std::vector>& weightMatrice, int dim1, int dim2) { + + if (op_name.find("mat_mul") != std::string::npos) { + auto nnWeightVector = maker.arrayVector(weightMatrice, REAL()); + exec::registerVectorFunction( + op_name, + MatrixMultiply::signatures(), + std::make_unique( + nnWeightVector->elements()->values()->asMutable(), dim1, dim2) + ); + std::cout << "Registered a mat_mul function of name " << op_name << " with dimension " << dim1 << ", " << dim2 << endl; + } + + else if (op_name.find("mat_add") != std::string::npos) { + auto nnWeightVector = maker.arrayVector(weightMatrice, REAL()); + exec::registerVectorFunction( + op_name, + MatrixVectorAddition::signatures(), + std::make_unique( + nnWeightVector->elements()->values()->asMutable(), dim1) + ); + std::cout << "Registered a mat_add function of name " << op_name << " with dimension " << dim1 << endl; + } + + else if (op_name.find("relu") != std::string::npos) { + exec::registerVectorFunction( + op_name, Relu::signatures(), std::make_unique(), + {}, + true); + std::cout << "Registered a relu function of name " << op_name << endl; + } + + else if (op_name.find("softmax") != std::string::npos) { + exec::registerVectorFunction( + op_name, Softmax::signatures(), std::make_unique()); + std::cout << "Registered a softmax function of name " << op_name << endl; + } + + else if (op_name.find("vector_addition") != std::string::npos) { + exec::registerVectorFunction( + op_name, + VectorAddition::signatures(), + std::make_unique(dim1) + ); + std::cout << "Registered a vector_addition function of name " << op_name << " with dimension " << dim1 << endl; + } + + +} + + + + + std::unordered_map getAllTableSources(std::shared_ptr planNodeIdGenerator) { + std::unordered_map + sources; // with filters and projections pushed down; + + auto an_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["aka_name"]}) + .project({"person_id as an_person_id", "name as an_name", "imdb_index as an_imdb_index", "an_features"}); + tabel2Columns["an"] = {"an_person_id", "an_name", "an_imdb_index", "an_features"}; + sources["an"] = an_a; + + auto at_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["aka_title"]}) + .project({"id as at_id", "movie_id as at_movie_id", "title as at_title", "imdb_index as at_imdb_index", "kind_id as at_kind_id", "at_features"}); + tabel2Columns["at"] = {"at_id", "at_movie_id", "at_title", "at_imdb_index", "at_kind_id", "at_features"}; + sources["at"] = at_a; + + auto ci_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["cast_info"]}) + .project( + {"movie_id as ci_movie_id", + "person_id as ci_person_id", + "role_id as ci_role_id", + "person_role_id as ci_person_role_id", + "ci_features"}) + .limit(0, 3624434, false); + tabel2Columns["ci"] = {"ci_movie_id", + "ci_person_id", + "ci_role_id", + "ci_person_role_id", + "ci_features"}; + sources["ci"] = ci_a; + + auto chn_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["char_name"]}) + .project({"id as chn_id", "name as chn_name", "imdb_index as chn_imdb_index", "imdb_id as chn_imdb_id", "chn_features"}); + tabel2Columns["chn"] = {"chn_id", "chn_name", "chn_imdb_index", "chn_imdb_id", "chn_features"}; + sources["chn"] = chn_a; + + auto cc_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["complete_cast"]}) + .project({"id as cc_id", "movie_id as cc_movie_id", "subject_id as cc_subject_id", "status_id as cc_status_id", "cc_features"}); + tabel2Columns["cc"] = {"cc_id", "cc_movie_id", "cc_subject_id", "cc_status_id", "cc_features"}; + sources["cc"] = cc_a; + + auto cct_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["comp_cast_type"]}) + .project({"id as cct_id", "kind as cct_kind", "cct_features"}); + tabel2Columns["cct"] = {"cct_id", "cct_kind", "cct_features"}; + sources["cct"] = cct_a; + + + auto cn_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["company_name"]}) + .project({"id as cn_id", "name as cn_name", "country_code as cn_country_code", "imdb_id as cn_imdb_id", "cn_features"}); + tabel2Columns["cn"] = {"cn_id", "cn_name", "cn_country_code", "cn_imdb_id", "cn_features"}; + sources["cn"] = cn_a; + + auto ct_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["company_type"]}) + .project({"id as ct_id", "kind as ct_kind", "ct_features"}); + tabel2Columns["ct"] = {"ct_id", "ct_kind", "ct_features"}; + sources["ct"] = ct_a; + + auto it_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["info_type"]}) + .project({"id as it_id", "info as it_info", "it_features"}); + tabel2Columns["it"] = {"it_id", "it_info", "it_features"}; + sources["it"] = it_a; + + auto kt_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["kind_type"]}) + .project({"id as kt_id", "kind as kt_kind", "kt_features"}); + tabel2Columns["kt"] = {"kt_id", "kt_kind", "kt_features"}; + sources["kt"] = kt_a; + + auto lt_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["link_type"]}) + .project({"id as lt_id", "link as lt_link", "lt_features"}); + tabel2Columns["lt"] = {"lt_id", "lt_link", "lt_features"}; + sources["lt"] = lt_a; + + auto k_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["keyword"]}) + .project({"id as k_id", "keyword as k_keyword", "k_features"}); + tabel2Columns["k"] = {"k_id", "k_keyword", "k_features"}; + sources["k"] = k_a; + + auto mc_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["movie_companies"]}) + .project({"id as mc_id", "movie_id as mc_movie_id", "company_id as mc_company_id", "company_type_id as mc_company_type_id", "mc_features"}); + tabel2Columns["mc"] = {"mc_id", "mc_movie_id", "mc_company_id", "mc_company_type_id", "mc_features"}; + sources["mc"] = mc_a; + + auto mi_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["movie_info"]}) + .project({"id as mi_id", "movie_id as mi_movie_id", "info_type_id as mi_info_type_id", "info as mi_info", "mi_features"}); + tabel2Columns["mi"] = {"mi_id", "mi_movie_id", "mi_info_type_id", "mi_info", "mi_features"}; + sources["mi"] = mi_a; + + auto mii_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["movie_info_idx"]}) + .project({"id as mii_id", "movie_id as mii_movie_id", "info_type_id as mii_info_type_id", "info as mii_info", "mii_features"}); + tabel2Columns["mii"] = {"mii_id", "mii_movie_id", "mii_info_type_id", "mii_info", "mii_features"}; + sources["mii"] = mii_a; + + auto mk_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["movie_keyword"]}) + .project({"id as mk_id", "movie_id as mk_movie_id", "keyword_id as mk_keyword_id", "mk_features"}); + tabel2Columns["mk"] = {"mk_id", "mk_movie_id", "mk_keyword_id", "mk_features"}; + sources["mk"] = mk_a; + + auto ml_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["movie_link"]}) + .project({"id as ml_id", "movie_id as ml_movie_id", "linked_movie_id as ml_linked_movie_id", "link_type_id as ml_link_type_id", "ml_features"}); + tabel2Columns["ml"] = {"ml_id", "ml_movie_id", "ml_linked_movie_id", "ml_link_type_id", "ml_features"}; + sources["ml"] = ml_a; + + auto n_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["name"]}) + .project({"id as n_id", "name as n_name", "imdb_index as n_imdb_index", "imdb_id as n_imdb_id", "n_features"}); + tabel2Columns["n"] = {"n_id", "n_name", "n_imdb_index", "n_imdb_id", "n_features"}; + sources["n"] = n_a; + + auto pi_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["person_info"]}) + .project( + {"id as pi_id", + "person_id as pi_person_id", + "info_type_id as pi_info_type_id", + "pi_features"}); + tabel2Columns["pi"] = {"pi_id", "pi_person_id", "pi_info_type_id", "pi_features"}; + sources["pi"] = pi_a; + + auto rt_a = PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["role_type"]}) + .project({"id as rt_id", "role as rt_role", "rt_features"}); + tabel2Columns["rt"] = {"rt_id", "rt_role", "rt_features"}; + sources["rt"] = rt_a; + + auto t_a = + PlanBuilder(planNodeIdGenerator, pool_.get()) + .values({tableName2RowVector["title"]}) + .project({"id as t_id", "title as t_title", "imdb_index as t_imdb_index", "kind_id as t_kind_id", "imdb_id as t_imdb_id", "t_features"}); + tabel2Columns["t"] = {"t_id", "t_title", "t_imdb_index", "t_kind_id", "t_imdb_id", "t_features"}; + sources["t"] = t_a; + + return sources; + } + + + +bool addModelInferenceToQueryPlanAfterFactorize(PlanBuilder & planBuilder, std::shared_ptr planNodeIdGenerator, const std::string& colFeature) { + + std::string modelProjString = ""; + std::vector> emptyMatrix; + for (int i = modelOperators.size() - 1; i > 0; i--) { + std::string opName = modelOperators[i]; + if (opName.find("mat_mul") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + int dim2 = param[0].size(); + registerNNFunction(opName, param, dim1, dim2); + } + else if (opName.find("mat_add") != std::string::npos) { + std::vector> param = operatorParam2Weights[opName]; + int dim1 = param.size(); + registerNNFunction(opName, param, dim1, -1); + } + else { + registerNNFunction(opName, emptyMatrix, -1, -1); + } + modelProjString += opName + "("; + } + modelProjString += colFeature; + + for (int i = modelOperators.size() - 1; i > 0; i--) { + modelProjString += ")"; + } + modelProjString += " AS output"; + + std::cout << "Inference Part: " << modelProjString << endl; + planBuilder.project({modelProjString}); + + return true; +} + + + +/* +param pushDown: 0 -> no push, 1 -> all push, 2 -> left push, 3 -> right push +*/ +bool createAndExecuteQuery(std::string leftTable, std::string rightTable, std::string probKey, std::string buildKey, int dimLeft, int dimRight, int pushDown) { + std::string firstOpName = modelOperators[0]; // name of operator of split layer + std::vector> firstWeight = operatorParam2Weights[firstOpName]; // weight of the split layer + int numCols = firstWeight.size(); // number of columns in split layer + int numNeurons = firstWeight[0].size(); // number of neurons in split layer + + //registering vector addition operator for later use as aggregation + std::vector> emptyMatrix; + registerNNFunction("vector_addition", emptyMatrix, numNeurons, -1); + + PlanBuilder planBuilder{pool_.get()}; + std::shared_ptr planNodeIdGenerator = std::make_shared(); + + // Fetch plan builders for each plan + std::unordered_map sources = getAllTableSources(planNodeIdGenerator); + + // fetch plan builders corresponding to left and right table + PlanBuilder leftPlan = sources[leftTable]; + PlanBuilder rightPlan = sources[rightTable]; + + // form the left and right feature columns + std::string lFeatureName = leftTable + "_features"; + std::string rFeatureName = rightTable + "_features"; + + std::vector projections; + projections.push_back(probKey); + projections.push_back(buildKey); + + std::string fNewName; //new name after applying operator on feature + std::string fNewNameFull; // fNewName with its full projection details + + // checking if push left feature + if (pushDown == 1 || pushDown == 2) { + std::vector> subWeight = extractSubweight(firstWeight, 0, dimLeft); + std::string newOpName = firstOpName + "_left"; + registerNNFunction(newOpName, subWeight, dimLeft, numNeurons); + fNewName = "factorized_left"; + fNewNameFull = newOpName + "(get_feature_vec(" + lFeatureName + ", " + std::to_string(dimLeft) + ")) AS " + fNewName; + } + else { + fNewName = "mapped_left"; + fNewNameFull = "get_feature_vec(" + lFeatureName + ", " + std::to_string(dimLeft) + ") AS " + fNewName; + } + projections.push_back(fNewName); + leftPlan= leftPlan.project({probKey, fNewNameFull}); + + // checking if push right feature + if (pushDown == 1 || pushDown == 3) { + std::vector> subWeight = extractSubweight(firstWeight, dimLeft, dimRight); + std::string newOpName = firstOpName + "_right"; + registerNNFunction(newOpName, subWeight, dimRight, numNeurons); + fNewName = "factorized_right"; + fNewNameFull = newOpName + "(get_feature_vec(" + rFeatureName + ", " + std::to_string(dimRight) + ")) AS " + fNewName; + } + else { + fNewName = "mapped_right"; + fNewNameFull = "get_feature_vec(" + rFeatureName + ", " + std::to_string(dimRight) + ") AS " + fNewName; + } + projections.push_back(fNewName); + rightPlan= rightPlan.project({buildKey, fNewNameFull}); + + // Perform the join + PlanBuilder out = leftPlan.hashJoin( + {probKey}, + {buildKey}, + rightPlan.planNode(), + "", + {projections} + ); + + // Apply operators after join + std::string projString; + + if (pushDown == 0) { + // no features pushed, so concatenate them and apply first operator in the model + registerNNFunction(firstOpName, firstWeight, dimLeft + dimRight, numNeurons); + projString = firstOpName + "(concat(mapped_left, mapped_right)) AS features"; + } + else if (pushDown == 1) { + // all features pushed, so just perform aggregation + projString = "vector_addition(factorized_left, factorized_right) AS features"; + } + else if (pushDown == 2) { + // only left features pushed, so apply first operator on right feature and perform aggregation + std::vector> subWeight = extractSubweight(firstWeight, dimLeft, dimRight); + std::string newOpName = firstOpName + "_right"; + registerNNFunction(newOpName, subWeight, dimRight, numNeurons); + std::string fNewNameFull = newOpName + "(mapped_right)"; + + projString = "vector_addition(factorized_left, " + fNewNameFull + ") AS features"; + } + + else { + // only right features pushed, so apply first operator on left feature and perform aggregation + std::vector> subWeight = extractSubweight(firstWeight, 0, dimLeft); + std::string newOpName = firstOpName + "_left"; + registerNNFunction(newOpName, subWeight, dimLeft, numNeurons); + std::string fNewNameFull = newOpName + "(mapped_left)"; + + projString = "vector_addition(" + fNewNameFull + ", factorized_right) AS features"; + } + + planBuilder = out.project({projString}); + addModelInferenceToQueryPlanAfterFactorize(planBuilder, planNodeIdGenerator, "features"); + + auto myPlan = planBuilder.planNode(); + std::cout << myPlan->toString(true, true) << std::endl; + std::chrono::steady_clock::time_point begin = std::chrono::steady_clock::now(); + auto results = exec::test::AssertQueryBuilder(myPlan).copyResults(pool_.get()); + std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now(); + std::cout << "Time (sec) = " << (std::chrono::duration_cast(end - begin).count()) /1000000.0 << std::endl; + std::cout << "Results Size: " << results->size() << std::endl; + std::cout << "Results:" << results->toString(0, 5) << std::endl; + return true; + } + +}; + + +int main(int argc, char** argv) { + setlocale(LC_TIME, "C"); + malloc_trim(0); + + folly::init(&argc, &argv, false); + memory::MemoryManager::initialize({}); + + Job2WayJoin bench; + std::cout + << "[WARNING] the data path is hardcoded and needs to be modified accordingly." + << std::endl; + + // Load Aka Name table + RowVectorPtr akaNameVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/aka_name.csv", "aka_name", 8); + + // Load Aka Title table + RowVectorPtr akaTitleVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/aka_title.csv", "aka_title", 8); + + // Load Cast Info table + RowVectorPtr castInfoVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/cast_info.csv", "cast_info", 8); + + // Load Char Name table + RowVectorPtr charNameVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/char_name.csv", "char_name", 8); + + // Load Comp Cast Type table + RowVectorPtr compCastTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/comp_cast_type.csv", "comp_cast_type", 8); + + // Load Company Name table + RowVectorPtr companyNameVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/company_name.csv", "company_name", 8); + + // Load Company Type table + RowVectorPtr companyTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/company_type.csv", "company_type", 8); + + // Load Complete Cast table + RowVectorPtr completeCastVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/complete_cast.csv", "complete_cast", 8); + + // Load Info Type table + RowVectorPtr infoTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/info_type.csv", "info_type", 8); + + // Load Keyword table + RowVectorPtr keywordVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/keyword.csv", "keyword", 8); + + // Load Kind Type table + RowVectorPtr kindTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/kind_type.csv", "kind_type", 8); + + // Load Link Type table + RowVectorPtr linkTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/link_type.csv", "link_type", 8); + + // Load Movie Companies table + RowVectorPtr movieCompaniesVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/movie_companies.csv", "movie_companies", 8); + + // Load Movie Info Index table + RowVectorPtr movieInfoIdxVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/movie_info_idx.csv", "movie_info_idx", 8); + + // Load Movie Keyword table + RowVectorPtr movieKeywordVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/movie_keyword.csv", "movie_keyword", 8); + + // Load Movie Link table + RowVectorPtr movieLinkVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/movie_link.csv", "movie_link", 8); + + // Load Name table + RowVectorPtr nameVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/name.csv", "name", 8); + + // Load Role Type table + RowVectorPtr roleTypeVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/role_type.csv", "role_type", 8); + + // Load Title table + RowVectorPtr titleVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/title.csv", "title", 8); + + // Load Movie Info table + RowVectorPtr movieInfoVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb/movie_info.csv", "movie_info", 8); + + // Load Person Info table + RowVectorPtr personInfoVec = bench.getTableFromCSVFile( + bench.maker, "resources/data/imdb//person_info.csv", "person_info", 8); + + bench.tableName2RowVector["aka_name"] = akaNameVec; + + bench.tableName2RowVector["aka_title"] = akaTitleVec; + + bench.tableName2RowVector["cast_info"] = castInfoVec; + + bench.tableName2RowVector["char_name"] = charNameVec; + + bench.tableName2RowVector["comp_cast_type"] = compCastTypeVec; + + bench.tableName2RowVector["company_name"] = companyNameVec; + + bench.tableName2RowVector["company_type"] = companyTypeVec; + + bench.tableName2RowVector["complete_cast"] = completeCastVec; + + bench.tableName2RowVector["info_type"] = infoTypeVec; + + bench.tableName2RowVector["keyword"] = keywordVec; + + bench.tableName2RowVector["kind_type"] = kindTypeVec; + + bench.tableName2RowVector["link_type"] = linkTypeVec; + + bench.tableName2RowVector["movie_companies"] = movieCompaniesVec; + + bench.tableName2RowVector["movie_info_idx"] = movieInfoIdxVec; + + bench.tableName2RowVector["movie_keyword"] = movieKeywordVec; + + bench.tableName2RowVector["movie_link"] = movieLinkVec; + + bench.tableName2RowVector["name"] = nameVec; + + bench.tableName2RowVector["role_type"] = roleTypeVec; + + bench.tableName2RowVector["title"] = titleVec; + + bench.tableName2RowVector["movie_info"] = movieInfoVec; + + bench.tableName2RowVector["person_info"] = personInfoVec; + + + exec::registerVectorFunction( + "get_feature_vec", + GetFeatureVec::signatures(), + std::make_unique()); + std::cout << "Completed registering function for get_feature_vec" << std::endl; + + // retrieve the weights and set to map + std::cout << "Reading model parameters" << std::endl; + bench.findWeights("resources/model/job_any_64.h5"); + + // retrieve the model operators from model expression IR + std::string modelInput = "softmax3(mat_add3(mat_mul3(relu2(mat_add2(mat_mul2(relu1(mat_add1(mat_mul1(features)))))))))"; + std::cout << "Extracting model operators" << std::endl; + std::vector operators = bench.extractOperatorsInReverse(modelInput); + bench.modelOperators = operators; + + std::cout << "Performing Join" << std::endl; + bool ret = bench.createAndExecuteQuery("ci", "rt", "ci_role_id", "rt_id", 50, 50, 1); + + return ret; +} +