From 299bad05fb903fddd70d32b4958560937ce3aba7 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 4 Nov 2022 01:52:22 +0000 Subject: [PATCH 1/5] Code Changes to Support Criteo_Dense --- .../decisionTree/experiments/config.json | 16 ++++ .../experiments/data_processing.py | 85 ++++++++++++++++++- 2 files changed, 97 insertions(+), 4 deletions(-) diff --git a/model-inference/decisionTree/experiments/config.json b/model-inference/decisionTree/experiments/config.json index 76198d19..008f7729 100644 --- a/model-inference/decisionTree/experiments/config.json +++ b/model-inference/decisionTree/experiments/config.json @@ -174,5 +174,21 @@ "filename": "", "table": "", "header": false + }, + "criteo_dense": { + "num_features": 1000000, + "rows": 8572, + "batch_size": 100000, + "query_size": 100000, + "type": "classification", + "info": "https://www.kaggle.com/c/criteo-display-ad-challenge/", + "query": "SELECT * from criteo_dense", + "create": "", + "train": 0, + "test": 1, + "y_col": "label", + "filename": "criteo_dense", + "table": "criteo_dense", + "header": true } } diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py index 32d39ae3..345acb4d 100644 --- a/model-inference/decisionTree/experiments/data_processing.py +++ b/model-inference/decisionTree/experiments/data_processing.py @@ -29,8 +29,9 @@ def parse_arguments(): 'bosch', 'covtype', 'criteo', - 'tpcxai_fraud'], - help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype','tpcxai_fraud','criteo']") + 'tpcxai_fraud', + 'criteo_dense'], + help="Dataset to be processed. Choose from ['higgs', 'airline_regression', 'airline_classification', 'fraud', 'year', 'epsilon', 'bosch', 'covtype','tpcxai_fraud','criteo', 'criteo_dense']") parser.add_argument("-n", "--nrows", type=int, help="Load nrows of the dataset. Warning: only use in development.") parser.add_argument("-sf","--scalefactor", type=int, help="Relevant only for TPCxAI_Fraud. Takes one of the values in 1, 3, 10 and 30") @@ -244,6 +245,24 @@ def prepare_criteo(dataset_folder): if not (os.path.isfile(train_path) and os.path.isfile(test_path)): os.system(f"tar -Jxf {local_url} -C {dataset_folder}") +def prepare_criteo_dense(dataset_folder): + prepare_criteo(dataset_folder) + read_lines = 5e6 # 6042135 + num_features = 1000000 + test_path = relative2abspath(dataset_folder, "criteo.kaggle2014.svm", "test.txt.svm") + if os.path.isfile(test_path): + test_df_features, test_df_labels = datasets.load_svmlight_file(test_path, n_features=num_features, length=read_lines) + # print(test_df_features.todense(), test_df_labels[..., np.newaxis]) + # print(np.append(test_df_features.todense(), test_df_labels[..., np.newaxis], axis=1).shape) + test_df = pd.DataFrame(np.append(test_df_features.todense(), test_df_labels[..., np.newaxis], axis=1), columns=[f'feature_{idx}' for idx in range(num_features)]+['label']) + print('Test Dataset Shape:',test_df.shape) + # print(test_df.shape) + # test_df = test_df.join(pd.DataFrame(test_df_labels, columns=['label'])) + # print(test_df[['feature_1']].describe()) + return test_df + return None + + def get_connection(pgsqlconfig): return psycopg2.connect( database=pgsqlconfig["dbname"], @@ -276,6 +295,10 @@ def make_query(dataset, datasetconfig, column_names): feature_names = ", ".join([f"{col_name} DECIMAL NOT NULL" for col_name in column_names]) label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL" create_query = f"CREATE TABLE ** ({feature_names}, {label_name})" + elif dataset == "criteo_dense": + feature_names = '''"row" double precision[]''' + label_name = f"{datasetconfig['y_col']} INTEGER NOT NULL" + create_query = f"CREATE TABLE ** ({label_name}, {feature_names})" else: create_query = datasetconfig["create"] train_create_query = create_query.replace( @@ -387,11 +410,11 @@ def create_tables( print('-'*50) df = pd.concat([df,prepare_tpcxai_fraud_transactions(dataset_folder, nrows=partition_size, skip_rows=range(1,partition_size*i))]) print(f'Final Shape of DataFrame: {df.shape}') - elif dataset == 'criteo': prepare_criteo(dataset_folder) exit() - + elif dataset == 'criteo_dense': + df = prepare_criteo_dense(dataset_folder) else: raise ValueError(f"{dataset} not supported") @@ -450,6 +473,59 @@ def create_tables( exit() + # CRITEO_DENSE FOLLOWS DIFFERENT LOADING INSTRUCTIONS AS + # IT HAS MORE THAN 1600 COLUMNS + if dataset == "criteo_dense": + train = pd.DataFrame({'label': []}) + column_names = list(df.columns) + connection = get_connection(pgsqlconfig) + print("FETCHING TRAIN AND TEST QUERY CRITEO_DENSE") + train_query, test_query = make_query( + dataset, datasetconfig, column_names) + + print("DROPPING TRAIN AND TABLE IF THEY EXIST") + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS " + + datasetconfig["table"]+"_train") + connection.commit() + + with connection.cursor() as cursor: + cursor.execute("DROP TABLE IF EXISTS " + + datasetconfig["table"]+"_test") + connection.commit() + + print("CREATING TABLES FOR CRITEO_DENSE") + with connection.cursor() as cursor: + cursor.execute(train_query) + cursor.execute(test_query) + connection.commit() + + print("LOADING DATA FOR CRITEO_DENSE") + # columns = [i for i in range(datasetconfig['num_features'])] + with connection.cursor() as cur: + train.head() + rows = len(train) + for i in range(rows): + cur.execute("INSERT INTO criteo_dense_train(label,row) VALUES(%s, %s)", (int( + train.loc[i, 'label']), list(train.loc[i, column_names]))) + if i % 10000 == 0: + print(i) + + connection.commit() + print("LOADED "+datasetconfig["table"]+"_train"+" to DB") + + df.head() + rows = len(df) + for i in range(rows): + cur.execute("INSERT INTO criteo_dense_test(label,row) VALUES(%s, %s)", (int( + df.loc[i, 'label']), list(df.loc[i, column_names]))) + if i % 10000 == 0: + print(i) + + connection.commit() + print("LOADED "+datasetconfig["table"]+"_test"+" to DB") + exit() + # Split dataset train_size = math.floor(len(df) * datasetconfig["train"]) train = df.head(train_size) @@ -473,6 +549,7 @@ def create_tables( connection = get_connection(pgsqlconfig) print("FETCHING TRAIN AND TEST QUERY") train_query, test_query = make_query(dataset, datasetconfig, column_names) + print(train_query, test_query) print("CREATING TRAIN AND TEST TABLES") create_tables(connection, train_query, test_query, train_csv_path, test_csv_path, dataset) From 2915057d9293d8b4f2a983754e4772741b5a1613 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 4 Nov 2022 02:15:19 +0000 Subject: [PATCH 2/5] Code Change to Support Criteo Dense --- .../decisionTree/experiments/data_processing.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py index 345acb4d..417b9038 100644 --- a/model-inference/decisionTree/experiments/data_processing.py +++ b/model-inference/decisionTree/experiments/data_processing.py @@ -482,38 +482,31 @@ def create_tables( print("FETCHING TRAIN AND TEST QUERY CRITEO_DENSE") train_query, test_query = make_query( dataset, datasetconfig, column_names) - print("DROPPING TRAIN AND TABLE IF THEY EXIST") with connection.cursor() as cursor: cursor.execute("DROP TABLE IF EXISTS " + datasetconfig["table"]+"_train") connection.commit() - with connection.cursor() as cursor: cursor.execute("DROP TABLE IF EXISTS " + datasetconfig["table"]+"_test") connection.commit() - print("CREATING TABLES FOR CRITEO_DENSE") with connection.cursor() as cursor: cursor.execute(train_query) cursor.execute(test_query) connection.commit() - print("LOADING DATA FOR CRITEO_DENSE") - # columns = [i for i in range(datasetconfig['num_features'])] with connection.cursor() as cur: train.head() rows = len(train) for i in range(rows): cur.execute("INSERT INTO criteo_dense_train(label,row) VALUES(%s, %s)", (int( train.loc[i, 'label']), list(train.loc[i, column_names]))) - if i % 10000 == 0: - print(i) - + if i % 100 == 0: + print(f'Written Rows: {i}') connection.commit() print("LOADED "+datasetconfig["table"]+"_train"+" to DB") - df.head() rows = len(df) for i in range(rows): @@ -521,7 +514,6 @@ def create_tables( df.loc[i, 'label']), list(df.loc[i, column_names]))) if i % 10000 == 0: print(i) - connection.commit() print("LOADED "+datasetconfig["table"]+"_test"+" to DB") exit() From 713f6ab00fe262128076753e7ac296c8a2d4f438 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 4 Nov 2022 04:02:51 +0000 Subject: [PATCH 3/5] Criteo Dense Model Testing Code --- model-inference/decisionTree/experiments/data_processing.py | 2 +- model-inference/decisionTree/experiments/model_helper.py | 6 ++++++ model-inference/decisionTree/experiments/test_model.py | 3 ++- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py index 417b9038..589be53a 100644 --- a/model-inference/decisionTree/experiments/data_processing.py +++ b/model-inference/decisionTree/experiments/data_processing.py @@ -512,7 +512,7 @@ def create_tables( for i in range(rows): cur.execute("INSERT INTO criteo_dense_test(label,row) VALUES(%s, %s)", (int( df.loc[i, 'label']), list(df.loc[i, column_names]))) - if i % 10000 == 0: + if i % 50 == 0: print(i) connection.commit() print("LOADED "+datasetconfig["table"]+"_test"+" to DB") diff --git a/model-inference/decisionTree/experiments/model_helper.py b/model-inference/decisionTree/experiments/model_helper.py index e32dbc06..85b637c6 100644 --- a/model-inference/decisionTree/experiments/model_helper.py +++ b/model-inference/decisionTree/experiments/model_helper.py @@ -63,6 +63,12 @@ def fetch_data(dataset, config, suffix, time_consume=None): dataframe.drop('row', axis=1, inplace=True) # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row)) + elif dataset == 'criteo_dense': + unpacked = zip(*list(dataframe['row'].values)) + for i in range(datasetconfig['num_features']): + dataframe[f'feature_{i}'] = next(unpacked) + dataframe.drop('row', axis=1, inplace=True) + # dataframe['row'] = dataframe['row'].apply(lambda row:np.array(row)) end_time = time.time() data_loading_time = calculate_time(start_time, end_time) if time_consume is not None: diff --git a/model-inference/decisionTree/experiments/test_model.py b/model-inference/decisionTree/experiments/test_model.py index 6d7aa777..3117bb06 100644 --- a/model-inference/decisionTree/experiments/test_model.py +++ b/model-inference/decisionTree/experiments/test_model.py @@ -38,7 +38,8 @@ def parse_arguments(config): 'bosch', 'covtype', 'criteo', - 'tpcxai_fraud'], + 'tpcxai_fraud', + 'criteo_dense'], help="Dataset to be tested.") parser.add_argument( "-m", "--model", type=str, From c3ae1ec12521e069ad4804b8509cfa9e9f517987 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 4 Nov 2022 14:47:42 +0000 Subject: [PATCH 4/5] Local Change Storages for Testing --- model-inference/decisionTree/experiments/config.json | 4 ++-- model-inference/decisionTree/experiments/data_processing.py | 2 +- model-inference/decisionTree/experiments/model_helper.py | 6 ++++-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/model-inference/decisionTree/experiments/config.json b/model-inference/decisionTree/experiments/config.json index 008f7729..75a0b244 100644 --- a/model-inference/decisionTree/experiments/config.json +++ b/model-inference/decisionTree/experiments/config.json @@ -1,5 +1,5 @@ { - "num_trees": 1600, + "num_trees": 500, "depth": 8, "pgsqlconfig": { "host": "localhost", @@ -177,7 +177,7 @@ }, "criteo_dense": { "num_features": 1000000, - "rows": 8572, + "rows": 86, "batch_size": 100000, "query_size": 100000, "type": "classification", diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py index 589be53a..719eb30b 100644 --- a/model-inference/decisionTree/experiments/data_processing.py +++ b/model-inference/decisionTree/experiments/data_processing.py @@ -247,7 +247,7 @@ def prepare_criteo(dataset_folder): def prepare_criteo_dense(dataset_folder): prepare_criteo(dataset_folder) - read_lines = 5e6 # 6042135 + read_lines = 5e4 # 5e6 # 6042135 num_features = 1000000 test_path = relative2abspath(dataset_folder, "criteo.kaggle2014.svm", "test.txt.svm") if os.path.isfile(test_path): diff --git a/model-inference/decisionTree/experiments/model_helper.py b/model-inference/decisionTree/experiments/model_helper.py index 85b637c6..b616064a 100644 --- a/model-inference/decisionTree/experiments/model_helper.py +++ b/model-inference/decisionTree/experiments/model_helper.py @@ -49,13 +49,15 @@ def fetch_data(dataset, config, suffix, time_consume=None): pgsqlconfig = config["pgsqlconfig"] datasetconfig = config[dataset] query = datasetconfig["query"]+"_"+suffix + # print(query) dbURL = "postgresql://"+pgsqlconfig["username"]+":"+pgsqlconfig["password"] + \ "@"+pgsqlconfig["host"]+":" + \ pgsqlconfig["port"]+"/"+pgsqlconfig["dbname"] - # print(dbURL) - # print(query) + print(dbURL) + print(query) start_time = time.time() dataframe = cx.read_sql(dbURL, query) + print(dataframe.head()) if dataset == 'epsilon': unpacked = zip(*list(dataframe['row'].values)) for i in range(1, 2001): From 6ec35450d35cc08235045a6e454768fdb9c958b7 Mon Sep 17 00:00:00 2001 From: Venkatesh Gunda Date: Tue, 8 Nov 2022 04:38:20 +0000 Subject: [PATCH 5/5] Add Custom Fill Value for Missing Features --- .../decisionTree/experiments/data_processing.py | 6 +++--- .../decisionTree/experiments/model_helper.py | 13 +++++++++++++ 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/model-inference/decisionTree/experiments/data_processing.py b/model-inference/decisionTree/experiments/data_processing.py index 719eb30b..4d9907ce 100644 --- a/model-inference/decisionTree/experiments/data_processing.py +++ b/model-inference/decisionTree/experiments/data_processing.py @@ -2,7 +2,7 @@ import gc import json import pickle -from model_helper import relative2abspath, dataset_folder +from model_helper import relative2abspath, dataset_folder, todense_fill import numpy as np import pandas as pd from urllib.request import urlretrieve @@ -247,14 +247,14 @@ def prepare_criteo(dataset_folder): def prepare_criteo_dense(dataset_folder): prepare_criteo(dataset_folder) - read_lines = 5e4 # 5e6 # 6042135 + read_lines = 1e4 # 5e4 # 5e6 # 6042135 num_features = 1000000 test_path = relative2abspath(dataset_folder, "criteo.kaggle2014.svm", "test.txt.svm") if os.path.isfile(test_path): test_df_features, test_df_labels = datasets.load_svmlight_file(test_path, n_features=num_features, length=read_lines) # print(test_df_features.todense(), test_df_labels[..., np.newaxis]) # print(np.append(test_df_features.todense(), test_df_labels[..., np.newaxis], axis=1).shape) - test_df = pd.DataFrame(np.append(test_df_features.todense(), test_df_labels[..., np.newaxis], axis=1), columns=[f'feature_{idx}' for idx in range(num_features)]+['label']) + test_df = pd.DataFrame(np.append(todense_fill(test_df_features, fill_value=-1), test_df_labels[..., np.newaxis], axis=1), columns=[f'feature_{idx}' for idx in range(num_features)]+['label']) print('Test Dataset Shape:',test_df.shape) # print(test_df.shape) # test_df = test_df.join(pd.DataFrame(test_df_labels, columns=['label'])) diff --git a/model-inference/decisionTree/experiments/model_helper.py b/model-inference/decisionTree/experiments/model_helper.py index b616064a..79a2a48c 100644 --- a/model-inference/decisionTree/experiments/model_helper.py +++ b/model-inference/decisionTree/experiments/model_helper.py @@ -3,6 +3,7 @@ import os import numpy as np import math +from scipy import sparse as sp from sklearn.metrics import classification_report, mean_squared_error dataset_folder = "dataset/" @@ -12,6 +13,18 @@ def calculate_time(start_time, end_time): diff = (end_time-start_time)*1000 return diff +def todense_fill(csr: sp.csr_matrix, fill_value: float) -> np.ndarray: + """Densify a sparse CSR matrix. Same as csr_matrix.todense() + except it fills missing entries with fill_value instead of 0. + """ + dummy_value = np.nan if not np.isnan(fill_value) else np.inf + dummy_check = np.isnan if np.isnan(dummy_value) else np.isinf + csr = csr.copy().astype(float) + csr.data[csr.data == 0] = dummy_value + out = np.array(csr.todense()).squeeze() + out[out == 0] = fill_value + out[dummy_check(out)] = 0 + return out def load_data_from_pickle(dataset, config, suffix, time_consume): start_time = time.time()