diff --git a/INSTALL_DEPENDENCIES.md b/INSTALL_DEPENDENCIES.md
index 38dc3174d..d52991d3b 100644
--- a/INSTALL_DEPENDENCIES.md
+++ b/INSTALL_DEPENDENCIES.md
@@ -240,7 +240,16 @@ PostgresML can be installed from source code or using `apt-get` in Ubuntu.
 
 https://postgresml.org/docs/open-source/pgml/developers/self-hosting/building-from-source
 
+https://web.archive.org/web/20241210195416/https://postgresml.org/docs/open-source/pgml/developers/self-hosting/building-from-source
+
 ```bash
+sudo apt-get install build-essential clang cmake pkg-config libssl-dev binutils lld
+
+git clone https://github.com/postgresml/postgresml && \
+  cd postgresml && \
+  git checkout 13e98ec90b370ac4570288031555ab1cc0eb4512 && \
+  git submodule update --init --recursive && \
+  cd pgml-extension
 cargo install cargo-pgrx --version 0.12.9
 cargo pgrx install
 ```
diff --git a/README.md b/README.md
index 92b21ca09..19e416e15 100644
--- a/README.md
+++ b/README.md
@@ -146,7 +146,7 @@ To run other workloads on CactusDB, please refer to the [**this file**](./velox/
 
 ## Development Guide
 
-Please check [this file](/DEVELOP_GUIDE.md) to see the supported ML kernels and how to implement the pipeline within CactusDB. Other functions/APIs can be checked through [**this link**](https://asu-cactus.github.io/CactusDB_Doc/annotated.html)
+Please check [this file](/DEVELOP_GUIDE.md) to see the supported ML kernels and how to implement the pipeline within CactusDB. Other functions/APIs can be checked through [**this link**](https://sigmod2026-a.github.io/cactusdb.github.io/)
 
 ## FAQ
diff --git a/db-ml/baseline/load_data_to_db.py b/db-ml/baseline/load_data_to_db.py
index 3e752d4f2..ac52dd5b9 100644
--- a/db-ml/baseline/load_data_to_db.py
+++ b/db-ml/baseline/load_data_to_db.py
@@ -135,7 +135,7 @@ def load_movielens_recommendation_to_datastore():
     print("[INFO] load movielens dataset to hadoop success!")
 
 
-def load_tpcxai_final_to_datastore():
+def load_tpcxai_sf1_final_to_datastore():
     # table needs to be loaded into the database
     # order, lineitem, product, financial_account, financial_transactions
     conn_string = utils.get_postgres_connection_config()
@@ -186,6 +186,56 @@ def load_tpcxai_final_to_datastore():
     print("[INFO] load movielens dataset to hadoop success!")
 
 
+def load_tpcxai_sf10_final_to_datastore():
+    # table needs to be loaded into the database
+    # order, lineitem, product, financial_account, financial_transactions
+    conn_string = utils.get_postgres_connection_config()
+    db = create_engine(conn_string)
+    conn = db.connect()
+
+    data_dir = "/home/velox/resources/data/tpcxai_sf10/final"
+    parquet_files_to_load = [
+        # "order",
+        # "lineitem",
+        # "product",
+        # "financial_account",
+        # "financial_transactions",
+        # "store_dept",
+        # "product_rating",
+        "customer",
+        "order_returns",
+        "review",
+    ]
+
+    conn_params = utils.get_connectorx_configuration()
+
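+    # Load the training and serving split of each listed table into Postgres
+    # as tpcxai_<table>_<split>; store_dept only gets a serving split.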
"{}_{}".format(file, dataset)) + utils.load_csv_to_hdfs( + "/home/velox/resources/data/tpcxai_sf10/final/{}/{}.parquet".format( + dataset, file + ), + path_in_hdfs, + overwrite=True, + ) + + print("[INFO] load movielens dataset to hadoop success!") def load_movielens_to_postgres(): conn_string = utils.get_postgres_connection_config() @@ -432,15 +482,18 @@ def main(): # load_movielens_to_postgres() # load_ffnn_data_to_postgres() load_movielens_recommendation_to_datastore() - load_tpcxai_final_to_datastore() + load_tpcxai_sf1_final_to_datastore() + load_tpcxai_sf10_final_to_datastore() elif dataset == "movielens": load_movielens_to_postgres() elif dataset == "ffnn": load_ffnn_data_to_postgres() elif dataset == "movielens_recommendation": load_movielens_recommendation_to_datastore() - elif dataset == "tpcxai": - load_tpcxai_final_to_datastore() + elif dataset == "tpcxai-sf1": + load_tpcxai_sf1_final_to_datastore() + elif dataset == "tpcxai-sf10": + load_tpcxai_sf10_final_to_datastore() if __name__ == "__main__": diff --git a/db-ml/baseline/pipeline.py b/db-ml/baseline/pipeline.py index 025098b12..e7735b8de 100644 --- a/db-ml/baseline/pipeline.py +++ b/db-ml/baseline/pipeline.py @@ -4290,7 +4290,7 @@ def model_inference_impl(self, data): num_product = len(self.bi) # for i in tqdm(range(5)): - for i in tqdm(range(num_input)): + for i in tqdm(range(int(num_input))): try: user_id = data[i, 0] product_id = data[i, 1] diff --git a/resources/split_tpcxai_sf10_database.py b/resources/split_tpcxai_sf10_database.py new file mode 100644 index 000000000..1c5f42906 --- /dev/null +++ b/resources/split_tpcxai_sf10_database.py @@ -0,0 +1,61 @@ +import resources_utils as ru +import pyarrow.parquet as pq +import pandas as pd +import numpy as np +import math +import os +import resources_utils + + +def create_tpcxai_dataset(data_dir): + NUM_SPLIT = 4 + df_dict = {} + for root, dirs, files in os.walk(data_dir): + for file in files: + if not file.endswith(".parquet"): + print("[INFO] Skipping file: ", file) + continue + dataset_name = file.split(".")[0] + df = pq.read_table(os.path.join(root, file)).to_pandas() + + # Convert timestamp columns to varchar + for col in df.select_dtypes(include=["datetime64[ns]"]).columns: + df[col] = df[col].dt.strftime("%Y-%m-%d %H:%M:%S") + + df_dict[dataset_name] = df + batch_size = math.ceil(len(df) / NUM_SPLIT) + target_dir_path = os.path.join( + root.replace("data", "./data/parquet"), dataset_name + ) + ru.remove_all_in_directory(target_dir_path) + print("[INFO] Writing parquet files to: ", target_dir_path) + ru.write_parquet(df, batch_size, target_dir_path) + num_rows, num_cols = df.shape + with open( + os.path.join( + root.replace("data", "./data/parquet"), dataset_name + "_stats.txt" + ), + "w", + ) as f: + f.write(f"{num_rows}\n") + f.write(f"{num_cols}\n") + + +create_tpcxai_dataset("data/tpcxai_sf10/final/training") +create_tpcxai_dataset("data/tpcxai_sf10/final/serving") + +column_sparsity_map = {} +column_sparsity_map = resources_utils.count_sparsity_over_data( + "data/tpcxai_sf10/final/training", column_sparsity_map +) +with open("./data/parquet/tpcxai_sf10/final/training/sparsity.txt", "w") as f: + for col, sparsity in column_sparsity_map.items(): + f.write(f"{col} {sparsity}\n") + +column_sparsity_map = {} +column_sparsity_map = resources_utils.count_sparsity_over_data( + "data/tpcxai_sf10/final/serving", column_sparsity_map +) +with open("./data/parquet/tpcxai_sf10/final/serving/sparsity.txt", "w") as f: + for col, sparsity in column_sparsity_map.items(): + 
f.write(f"{col} {sparsity}\n") diff --git a/velox/optimizer/Helper.h b/velox/optimizer/Helper.h index 8d05b9725..0a3f5d2c9 100644 --- a/velox/optimizer/Helper.h +++ b/velox/optimizer/Helper.h @@ -148,7 +148,7 @@ create_blocks(int row, int col, float* values, int block_size) { ? col - current_col : block_size; // Adjust block size for the last block - // Create a new block of size row x current_block_size + // Create a new block of size row x current_block_size std::vector block(row * current_block_size); // Fill the block with values @@ -771,6 +771,25 @@ std::string reformatComparisonExprWOCast(const std::string& exprStr) { } } +std::string reformatNonTargetExprs(const std::string& input) { + std::string output = input; + + // 1. Replace ROW["x"] with x + output = std::regex_replace(output, std::regex("ROW\\[\"(\\w+)\"\\]"), "$1"); + + // 2. Replace lambda ROW -> with x -> + output = std::regex_replace(output, std::regex("lambda\\s+ROW<([a-zA-Z_][a-zA-Z0-9_]*):[a-zA-Z]+>\\s*->"), "$1 ->"); + + // 3. Replace cast x as real -> CAST(x AS REAL) + output = std::regex_replace(output, std::regex("cast\\s+(\\w+)\\s+as\\s+(\\w+)", std::regex_constants::icase), "CAST($1 AS $2)"); + + // 4. Remove quotes around simple literals (but not for strings with spaces or punctuation) + output = std::regex_replace(output, std::regex("\"([A-Za-z0-9_]+)\""), "$1"); + + return output; +} + + /** * @brief Function to reformat the comparison expression to a standard format, * the input following the format: diff --git a/velox/optimizer/tests/BenchmarkUtils.h b/velox/optimizer/tests/BenchmarkUtils.h index 1739aebe2..d48b25d7e 100644 --- a/velox/optimizer/tests/BenchmarkUtils.h +++ b/velox/optimizer/tests/BenchmarkUtils.h @@ -1585,6 +1585,9 @@ PlanBuilder setupMovielensDBQuery( PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(movieTagDataRowType, {}, "") .capturePlanNodeId(readMovieTagDataPlanNodeId) + .project( + {"mt_movie_id", + "relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score"}) .hashJoin( {"mt_movie_id"}, {"m_movie_id"}, @@ -1624,6 +1627,17 @@ PlanBuilder setupMovielensDBQuery( "m_popularity", "m_vote_average", "m_vote_count"}) + .project( + {"u_user_id", + "u_age", + "u_gender", + "u_occupation", + "m_movie_id", + "mt_relevance_score", + "mt_movie_id", + "m_popularity", + "m_vote_average", + "m_vote_count"}) .project({ "u_user_id", "u_age", @@ -1632,7 +1646,9 @@ PlanBuilder setupMovielensDBQuery( "transform(array_constructor(if (u_gender = 'M', 1, 0)), x->Cast(x AS real)) as u_gender", "m_movie_id", "mt_movie_id", - "relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score", + "m_vote_average", + "m_vote_count", + "mt_relevance_score", "llm_ffnn_minmax_scaler(transform(array_constructor(m_popularity, m_vote_average, m_vote_count), x-> CAST(X as REAL))) AS m_trending_features", "llm_ffnn_interest_scaler(transform(array_constructor(u_age, u_occupation), x-> CAST(X as REAL))) AS u_interest_features", }) @@ -1951,6 +1967,10 @@ PlanBuilder setupMovielensDBQuery( PlanBuilder(planNodeIdGenerator, pool_.get()) .tableScan(movieTagDataRowType, {}, "") .capturePlanNodeId(readMovieTagDataPlanNodeId) + .project({ + "mt_movie_id", + "relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score", + }) .hashJoin( {"mt_movie_id"}, {"m_movie_id"}, @@ -1981,7 +2001,7 @@ PlanBuilder setupMovielensDBQuery( "m_vote_count"}) .project({ 
"m_movie_id", - "relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score", + "mt_relevance_score", "mt_movie_id", "m_popularity", "m_vote_average",