Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions INSTALL_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,16 @@ PostgresML can be installed from source code or using `apt-get` in Ubuntu.

https://postgresml.org/docs/open-source/pgml/developers/self-hosting/building-from-source

https://web.archive.org/web/20241210195416/https://postgresml.org/docs/open-source/pgml/developers/self-hosting/building-from-source

```bash
sudo apt-get install build-essential clang cmake pkg-config libssl-dev binutils lld

git clone https://github.com/postgresml/postgresml && \
cd postgresml && \
git checkout 13e98ec90b370ac4570288031555ab1cc0eb4512 && \
git submodule update --init --recursive && \
cd pgml-extension
cargo install cargo-pgrx --version 0.12.9
cargo pgrx install
```
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ To run other workloads on CactusDB, please refer to the [**this file**](./velox/

## Development Guide

Please check [this file](/DEVELOP_GUIDE.md) to see the supported ML kernels and how to implement the pipeline within CactusDB. Other functions/APIs can be checked through [**this link**](https://asu-cactus.github.io/CactusDB_Doc/annotated.html)
Please check [this file](/DEVELOP_GUIDE.md) to see the supported ML kernels and how to implement the pipeline within CactusDB. Other functions/APIs can be checked through [**this link**](https://sigmod2026-a.github.io/cactusdb.github.io/)

## FAQ

Expand Down
61 changes: 57 additions & 4 deletions db-ml/baseline/load_data_to_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ def load_movielens_recommendation_to_datastore():
print("[INFO] load movielens dataset to hadoop success!")


def load_tpcxai_final_to_datastore():
def load_tpcxai_sf1_final_to_datastore():
# table needs to be loaded into the database
# order, lineitem, product, financial_account, financial_transactions
conn_string = utils.get_postgres_connection_config()
Expand Down Expand Up @@ -186,6 +186,56 @@ def load_tpcxai_final_to_datastore():

print("[INFO] load movielens dataset to hadoop success!")

def load_tpcxai_sf10_final_to_datastore():
    """Load the TPCx-AI scale-factor-10 "final" tables into Postgres and HDFS.

    Counterpart of load_tpcxai_sf1_final_to_datastore() for the SF10 data
    directory.  For every enabled table, the training and serving parquet
    files are bulk-loaded into Postgres as ``tpcxai_<table>_<dataset>`` and
    then copied into HDFS under ``/user/velox/data/tpcxai``.

    NOTE(review): the HDFS root is the same one the SF1 loader uses, and the
    copies run with overwrite=True — when both loaders run (dataset == "all"),
    SF10 clobbers the SF1 files. Confirm this is intended.
    """
    conn_string = utils.get_postgres_connection_config()
    db = create_engine(conn_string)
    conn = db.connect()

    data_dir = "/home/velox/resources/data/tpcxai_sf10/final"
    # Full table list; only the uncommented entries are loaded in this pass.
    parquet_files_to_load = [
        # "order",
        # "lineitem",
        # "product",
        # "financial_account",
        # "financial_transactions",
        # "store_dept",
        # "product_rating",
        "customer",
        "order_returns",
        "review",
    ]

    conn_params = utils.get_connectorx_configuration()

    for table in parquet_files_to_load:
        for dataset in ["training", "serving"]:
            # store_dept has no training split.
            if dataset == "training" and table == "store_dept":
                continue
            df_parquet_path = os.path.join(data_dir, dataset, "{}.parquet".format(table))
            table_name = "tpcxai_{}_{}".format(table, dataset)
            utils.load_parquet_to_postgres(df_parquet_path, table_name, conn_params)

    # Fixed: this message previously (incorrectly) referred to movielens.
    print("[INFO] load tpcxai sf10 dataset to postgres success!")

    # Mirror the same tables into HDFS, creating the target dir if needed.
    data_path = "/user/velox/data/tpcxai"
    if not utils.check_hdfs_dir_exist(data_path):
        utils.create_hdfs_dir(data_path)

    for table in parquet_files_to_load:
        for dataset in ["training", "serving"]:
            path_in_hdfs = os.path.join(data_path, "{}_{}".format(table, dataset))
            # Same root as data_dir above; the hard-coded duplicate path was
            # replaced with a join so the two loops cannot drift apart.
            utils.load_csv_to_hdfs(
                os.path.join(data_dir, dataset, "{}.parquet".format(table)),
                path_in_hdfs,
                overwrite=True,
            )

    # Fixed: this message previously (incorrectly) referred to movielens.
    print("[INFO] load tpcxai sf10 dataset to hadoop success!")

def load_movielens_to_postgres():
conn_string = utils.get_postgres_connection_config()
Expand Down Expand Up @@ -432,15 +482,18 @@ def main():
# load_movielens_to_postgres()
# load_ffnn_data_to_postgres()
load_movielens_recommendation_to_datastore()
load_tpcxai_final_to_datastore()
load_tpcxai_sf1_final_to_datastore()
load_tpcxai_sf10_final_to_datastore()
elif dataset == "movielens":
load_movielens_to_postgres()
elif dataset == "ffnn":
load_ffnn_data_to_postgres()
elif dataset == "movielens_recommendation":
load_movielens_recommendation_to_datastore()
elif dataset == "tpcxai":
load_tpcxai_final_to_datastore()
elif dataset == "tpcxai-sf1":
load_tpcxai_sf1_final_to_datastore()
elif dataset == "tpcxai-sf10":
load_tpcxai_sf10_final_to_datastore()


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion db-ml/baseline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4290,7 +4290,7 @@ def model_inference_impl(self, data):
num_product = len(self.bi)

# for i in tqdm(range(5)):
for i in tqdm(range(num_input)):
for i in tqdm(range(int(num_input))):
try:
user_id = data[i, 0]
product_id = data[i, 1]
Expand Down
61 changes: 61 additions & 0 deletions resources/split_tpcxai_sf10_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import resources_utils as ru
import pyarrow.parquet as pq
import pandas as pd
import numpy as np
import math
import os
import resources_utils


def create_tpcxai_dataset(data_dir):
    """Split every parquet table under *data_dir* into NUM_SPLIT chunks.

    For each ``<name>.parquet`` found while walking *data_dir*, timestamp
    columns are stringified, the frame is rewritten as partitioned parquet
    under the mirrored ``./data/parquet`` tree, and a ``<name>_stats.txt``
    file recording row/column counts is written beside it.
    """
    NUM_SPLIT = 4
    df_dict = {}
    for root, _dirs, filenames in os.walk(data_dir):
        for filename in filenames:
            if not filename.endswith(".parquet"):
                print("[INFO] Skipping file: ", filename)
                continue

            dataset_name = filename.split(".")[0]
            frame = pq.read_table(os.path.join(root, filename)).to_pandas()

            # Stringify timestamps so downstream consumers see varchar.
            ts_cols = frame.select_dtypes(include=["datetime64[ns]"]).columns
            for ts_col in ts_cols:
                frame[ts_col] = frame[ts_col].dt.strftime("%Y-%m-%d %H:%M:%S")

            df_dict[dataset_name] = frame

            out_root = root.replace("data", "./data/parquet")
            target_dir_path = os.path.join(out_root, dataset_name)
            ru.remove_all_in_directory(target_dir_path)
            print("[INFO] Writing parquet files to: ", target_dir_path)
            ru.write_parquet(
                frame, math.ceil(len(frame) / NUM_SPLIT), target_dir_path
            )

            num_rows, num_cols = frame.shape
            stats_path = os.path.join(out_root, dataset_name + "_stats.txt")
            with open(stats_path, "w") as f:
                f.write(f"{num_rows}\n")
                f.write(f"{num_cols}\n")


# Partition both splits of the SF10 dataset, then record per-column sparsity
# for each split next to the generated parquet files.
for split in ("training", "serving"):
    create_tpcxai_dataset(f"data/tpcxai_sf10/final/{split}")

for split in ("training", "serving"):
    sparsity_by_column = resources_utils.count_sparsity_over_data(
        f"data/tpcxai_sf10/final/{split}", {}
    )
    with open(f"./data/parquet/tpcxai_sf10/final/{split}/sparsity.txt", "w") as f:
        for col, sparsity in sparsity_by_column.items():
            f.write(f"{col} {sparsity}\n")
21 changes: 20 additions & 1 deletion velox/optimizer/Helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ create_blocks(int row, int col, float* values, int block_size) {
? col - current_col
: block_size; // Adjust block size for the last block

// Create a new block of size row x current_block_size
// Create a new block of size row x current_block_size
std::vector<float> block(row * current_block_size);

// Fill the block with values
Expand Down Expand Up @@ -771,6 +771,25 @@ std::string reformatComparisonExprWOCast(const std::string& exprStr) {
}
}

std::string reformatNonTargetExprs(const std::string& input) {
std::string output = input;

// 1. Replace ROW["x"] with x
output = std::regex_replace(output, std::regex("ROW\\[\"(\\w+)\"\\]"), "$1");

// 2. Replace lambda ROW<x:TYPE> -> with x ->
output = std::regex_replace(output, std::regex("lambda\\s+ROW<([a-zA-Z_][a-zA-Z0-9_]*):[a-zA-Z]+>\\s*->"), "$1 ->");

// 3. Replace cast x as real -> CAST(x AS REAL)
output = std::regex_replace(output, std::regex("cast\\s+(\\w+)\\s+as\\s+(\\w+)", std::regex_constants::icase), "CAST($1 AS $2)");

// 4. Remove quotes around simple literals (but not for strings with spaces or punctuation)
output = std::regex_replace(output, std::regex("\"([A-Za-z0-9_]+)\""), "$1");

return output;
}


/**
* @brief Function to reformat the comparison expression to a standard format,
* the input following the format:
Expand Down
24 changes: 22 additions & 2 deletions velox/optimizer/tests/BenchmarkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,9 @@ PlanBuilder setupMovielensDBQuery(
PlanBuilder(planNodeIdGenerator, pool_.get())
.tableScan(movieTagDataRowType, {}, "")
.capturePlanNodeId(readMovieTagDataPlanNodeId)
.project(
{"mt_movie_id",
"relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score"})
.hashJoin(
{"mt_movie_id"},
{"m_movie_id"},
Expand Down Expand Up @@ -1624,6 +1627,17 @@ PlanBuilder setupMovielensDBQuery(
"m_popularity",
"m_vote_average",
"m_vote_count"})
.project(
{"u_user_id",
"u_age",
"u_gender",
"u_occupation",
"m_movie_id",
"mt_relevance_score",
"mt_movie_id",
"m_popularity",
"m_vote_average",
"m_vote_count"})
.project({
"u_user_id",
"u_age",
Expand All @@ -1632,7 +1646,9 @@ PlanBuilder setupMovielensDBQuery(
"transform(array_constructor(if (u_gender = 'M', 1, 0)), x->Cast(x AS real)) as u_gender",
"m_movie_id",
"mt_movie_id",
"relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score",
"m_vote_average",
"m_vote_count",
"mt_relevance_score",
"llm_ffnn_minmax_scaler(transform(array_constructor(m_popularity, m_vote_average, m_vote_count), x-> CAST(X as REAL))) AS m_trending_features",
"llm_ffnn_interest_scaler(transform(array_constructor(u_age, u_occupation), x-> CAST(X as REAL))) AS u_interest_features",
})
Expand Down Expand Up @@ -1951,6 +1967,10 @@ PlanBuilder setupMovielensDBQuery(
PlanBuilder(planNodeIdGenerator, pool_.get())
.tableScan(movieTagDataRowType, {}, "")
.capturePlanNodeId(readMovieTagDataPlanNodeId)
.project({
"mt_movie_id",
"relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score",
})
.hashJoin(
{"mt_movie_id"},
{"m_movie_id"},
Expand Down Expand Up @@ -1981,7 +2001,7 @@ PlanBuilder setupMovielensDBQuery(
"m_vote_count"})
.project({
"m_movie_id",
"relu(mat_vector_add10_4(mat_mul10_3(relu(mat_vector_add10_2(mat_mul10_1(mt_relevance_score)))))) AS mt_relevance_score",
"mt_relevance_score",
"mt_movie_id",
"m_popularity",
"m_vote_average",
Expand Down