From bc951288306591b9d6d299e1edd1101dc5a7f692 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 14:49:49 +0530 Subject: [PATCH 01/20] Update core.py I made changes to feature importance and visualization generation, and also added TensorFlow support --- explainableai/core.py | 87 ++++++++++++++++++++++++++++++++----------- 1 file changed, 66 insertions(+), 21 deletions(-) diff --git a/explainableai/core.py b/explainableai/core.py index 5a903ea..d80d243 100644 --- a/explainableai/core.py +++ b/explainableai/core.py @@ -1,3 +1,4 @@ +# core.py import colorama from colorama import Fore, Style @@ -12,6 +13,11 @@ from sklearn.impute import SimpleImputer from sklearn.compose import ColumnTransformer from sklearn.pipeline import Pipeline + +# Import TensorFlow +import tensorflow as tf +from tensorflow.keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor + from .visualizations import ( plot_feature_importance, plot_partial_dependence, plot_learning_curve, plot_roc_curve, plot_precision_recall_curve, plot_correlation_heatmap @@ -38,7 +44,8 @@ def __init__(self): self.numerical_columns = None self.gemini_model = initialize_gemini() self.feature_importance = None - self.results = None # Add this line to store analysis results + self.results = None + self.model_type = None # Add this line to store model type def fit(self, models, X, y, feature_names=None): if isinstance(models, dict): @@ -48,7 +55,7 @@ def fit(self, models, X, y, feature_names=None): self.X = X self.y = y self.feature_names = feature_names if feature_names is not None else X.columns.tolist() - self.is_classifier = all(hasattr(model, "predict_proba") for model in self.models.values()) + self._determine_model_type() print(f"{Fore.BLUE}Preprocessing data...{Style.RESET_ALL}") self._preprocess_data() @@ -59,19 +66,52 @@ def fit(self, models, X, y, feature_names=None): # Select the best model based on cv_score best_model_name = 
max(self.model_comparison_results, key=lambda x: self.model_comparison_results[x]['cv_score']) self.model = self.models[best_model_name] - self.model.fit(self.X, self.y) + if self.model_type == 'tensorflow': + self.model.fit(self.X, self.y, epochs=10, batch_size=32, verbose=0) + else: + self.model.fit(self.X, self.y) return self - + + def _determine_model_type(self): + # Determine if the models are TensorFlow or scikit-learn + model_types = set() + for model in self.models.values(): + if isinstance(model, (tf.keras.Model, KerasClassifier, KerasRegressor)): + model_types.add('tensorflow') + else: + model_types.add('sklearn') + if len(model_types) > 1: + raise ValueError("All models should be of the same type (either all TensorFlow or all scikit-learn).") + self.model_type = model_types.pop() + self.is_classifier = all(self._is_classifier_model(model) for model in self.models.values()) + + def _is_classifier_model(self, model): + if self.model_type == 'tensorflow': + # Assume TensorFlow models output probabilities for classifiers + return model.output_shape[-1] > 1 + else: + return hasattr(model, "predict_proba") + def _compare_models(self): - from sklearn.model_selection import cross_val_score results = {} for name, model in self.models.items(): - cv_scores = cross_val_score(model, self.X, self.y, cv=5, scoring='roc_auc' if self.is_classifier else 'r2') - model.fit(self.X, self.y) - test_score = model.score(self.X, self.y) + if self.model_type == 'tensorflow': + # Use Keras wrappers for cross-validation + if self.is_classifier: + model = KerasClassifier(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) + else: + model = KerasRegressor(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) + cv_scores = cross_validate(model, self.X, self.y, is_classifier=self.is_classifier) + test_score = model.score(self.X, self.y) + else: + from sklearn.model_selection import cross_val_score + scoring = 'roc_auc' if self.is_classifier else 'r2' + cv_scores = 
cross_val_score(model, self.X, self.y, cv=5, scoring=scoring) + model.fit(self.X, self.y) + test_score = model.score(self.X, self.y) results[name] = { - 'cv_score': cv_scores.mean(), + 'cv_score': np.mean(cv_scores), 'test_score': test_score } return results @@ -117,7 +157,7 @@ def analyze(self): results = {} print("Evaluating model performance...") - results['model_performance'] = evaluate_model(self.model, self.X, self.y, self.is_classifier) + results['model_performance'] = evaluate_model(self.model, self.X, self.y, self.is_classifier, self.model_type) print("Calculating feature importance...") self.feature_importance = self._calculate_feature_importance() @@ -127,10 +167,10 @@ def analyze(self): self._generate_visualizations(self.feature_importance) print("Calculating SHAP values...") - results['shap_values'] = calculate_shap_values(self.model, self.X, self.feature_names) + results['shap_values'] = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) print("Performing cross-validation...") - mean_score, std_score = cross_validate(self.model, self.X, self.y) + mean_score, std_score = cross_validate(self.model, self.X, self.y, is_classifier=self.is_classifier, model_type=self.model_type) results['cv_scores'] = (mean_score, std_score) print("Model comparison results:") @@ -224,20 +264,25 @@ def explain_prediction(self, input_data): explanation = get_prediction_explanation(self.gemini_model, input_data, prediction[0], probabilities[0], self.feature_importance) return prediction[0], probabilities[0], explanation - def _calculate_feature_importance(self): - perm_importance = permutation_importance(self.model, self.X, self.y, n_repeats=10, random_state=42) - feature_importance = {feature: importance for feature, importance in zip(self.feature_names, perm_importance.importances_mean)} - return dict(sorted(feature_importance.items(), key=lambda item: abs(item[1]), reverse=True)) + def _calculate_feature_importance(self): + if self.model_type == 
'tensorflow': + # For TensorFlow models, use SHAP values as feature importance + shap_values = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) + feature_importance = np.mean(np.abs(shap_values.values), axis=0) + feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, feature_importance)} + else: + perm_importance = permutation_importance(self.model, self.X, self.y, n_repeats=10, random_state=42) + feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, perm_importance.importances_mean)} + return dict(sorted(feature_importance_dict.items(), key=lambda item: abs(item[1]), reverse=True)) def _generate_visualizations(self, feature_importance): plot_feature_importance(feature_importance) - plot_partial_dependence(self.model, self.X, feature_importance, self.feature_names) - plot_learning_curve(self.model, self.X, self.y) + plot_partial_dependence(self.model, self.X, feature_importance, self.feature_names, self.model_type) + plot_learning_curve(self.model, self.X, self.y, self.is_classifier, self.model_type) plot_correlation_heatmap(pd.DataFrame(self.X, columns=self.feature_names)) if self.is_classifier: - plot_roc_curve(self.model, self.X, self.y) - plot_precision_recall_curve(self.model, self.X, self.y) - + plot_roc_curve(self.model, self.X, self.y, self.model_type) + plot_precision_recall_curve(self.model, self.X, self.y, self.model_type) def _print_results(self, results): print("\nModel Performance:") for metric, value in results['model_performance'].items(): From f80f433cc3283890f0e6902bb0e63555814cf42b Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:01:08 +0530 Subject: [PATCH 02/20] Update core.py download scikeras library for using Keras Classifier and keras Regressor --- explainableai/core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git 
a/explainableai/core.py b/explainableai/core.py index d80d243..8088b75 100644 --- a/explainableai/core.py +++ b/explainableai/core.py @@ -16,7 +16,8 @@ # Import TensorFlow import tensorflow as tf -from tensorflow.keras.wrappers.scikit_learn import KerasClassifier, KerasRegressor +!pip install scikeras +from scikeras.wrappers import KerasClassifier, KerasRegressor from .visualizations import ( plot_feature_importance, plot_partial_dependence, plot_learning_curve, From 78805ecb4fc16fda108a7eafebc30587bd498ba3 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:06:44 +0530 Subject: [PATCH 03/20] Update model_interpretability.py SHAP Calculation for TensorFlow Models: Modified calculate_shap_values to use shap.DeepExplainer for TensorFlow models. Handling SHAP Output: Ensured that the output from shap.DeepExplainer is correctly wrapped in a shap.Explanation object for consistency. --- explainableai/model_interpretability.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/explainableai/model_interpretability.py b/explainableai/model_interpretability.py index 321a4c2..ac45fbd 100644 --- a/explainableai/model_interpretability.py +++ b/explainableai/model_interpretability.py @@ -5,11 +5,21 @@ import matplotlib.pyplot as plt import numpy as np - -def calculate_shap_values(model, X): - explainer = shap.Explainer(model, X) - shap_values = explainer(X) - return shap_values +def calculate_shap_values(model, X, feature_names, model_type): + if model_type == 'tensorflow': + # Use DeepExplainer for TensorFlow models + background = X[np.random.choice(X.shape[0], 100, replace=False)] + explainer = shap.DeepExplainer(model, background) + shap_values = explainer.shap_values(X) + # For classifiers, shap_values is a list + if isinstance(shap_values, list): + shap_values = shap_values[0] + return shap.Explanation(values=shap_values, data=X, feature_names=feature_names) + 
else: + # Original SHAP calculation for scikit-learn models + explainer = shap.Explainer(model, X) + shap_values = explainer(X) + return shap_values def plot_shap_summary(shap_values, X): try: @@ -31,6 +41,9 @@ def plot_shap_summary(shap_values, X): print(f"Alternative SHAP visualization also failed: {e2}") print("Skipping SHAP summary plot.") + + + def get_lime_explanation(model, X, instance, feature_names): explainer = lime.lime_tabular.LimeTabularExplainer( X, @@ -65,4 +78,4 @@ def plot_ice_curve(model, X, feature, num_ice_lines=50): plt.title(f'ICE Plot for {feature}') plt.tight_layout() plt.savefig(f'ice_plot_{feature}.png') - plt.close() \ No newline at end of file + plt.close() From bd614b2fee7222cc63e34de4dcb1b12c95720836 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:19:48 +0530 Subject: [PATCH 04/20] Update utils.py Handling Predictions from TensorFlow Models: TensorFlow models return predictions differently. For regression, they output continuous values. For classification, they might output probabilities or logits. We adjust y_pred accordingly, converting probabilities to class labels when necessary. Model Type Parameter: Added model_type parameter to indicate whether the model is a TensorFlow or scikit-learn model. Default is 'sklearn' for backward compatibility. Classification Handling: For binary classification, we threshold probabilities at 0.5. For multi-class classification, we use np.argmax to get class labels. 
--- explainableai/utils.py | 83 +++++++++++++++++++++++++++++++++--------- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/explainableai/utils.py b/explainableai/utils.py index 8cb695a..c2a1dae 100644 --- a/explainableai/utils.py +++ b/explainableai/utils.py @@ -1,29 +1,78 @@ + +import numpy as np +import pandas as pd +import tensorflow as tf from sklearn.metrics import mean_squared_error, r2_score, accuracy_score, f1_score from sklearn.inspection import permutation_importance -import numpy as np +import shap -def explain_model(model, X_train, y_train, X_test, y_test, feature_names): - result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42, n_jobs=-1) - feature_importance = {feature: importance for feature, importance in zip(feature_names, result.importances_mean)} - - # Sort feature importance by absolute value - feature_importance = dict(sorted(feature_importance.items(), key=lambda item: abs(item[1]), reverse=True)) - - return { - "feature_importance": feature_importance, - "model_type": str(type(model)), - } -def calculate_metrics(model, X_test, y_test): - y_pred = model.predict(X_test) +def calculate_metrics(model, X_test, y_test, model_type='sklearn'): + if model_type == 'tensorflow': + y_pred = model.predict(X_test) + # Flatten predictions if necessary + if y_pred.ndim > 1 and y_pred.shape[1] == 1: + y_pred = y_pred.flatten() + else: + y_pred = model.predict(X_test) - if len(np.unique(y_test)) == 2: # Binary classification + if len(np.unique(y_test)) == 2: + # Binary classification + if model_type == 'tensorflow': + y_pred = (y_pred > 0.5).astype(int) return { "accuracy": accuracy_score(y_test, y_pred), "f1_score": f1_score(y_test, y_pred, average='weighted') } - else: # Regression or multi-class classification + elif y_pred.ndim > 1 and y_pred.shape[1] > 1: + # Multi-class classification + if model_type == 'tensorflow': + y_pred = np.argmax(y_pred, axis=1) + return { + "accuracy": accuracy_score(y_test, y_pred), 
+ "f1_score": f1_score(y_test, y_pred, average='weighted') + } + else: + # Regression return { "mse": mean_squared_error(y_test, y_pred), "r2": r2_score(y_test, y_pred) - } \ No newline at end of file + } + +def explain_model(model, X_train, y_train, X_test, y_test, feature_names, model_type='sklearn'): + if model_type == 'tensorflow': + background = X_train[np.random.choice(X_train.shape[0], 100, replace=False)] + explainer = shap.DeepExplainer(model, background) + shap_values = explainer.shap_values(X_test) + if isinstance(shap_values, list): + shap_values = shap_values[0] + feature_importance_values = np.mean(np.abs(shap_values), axis=0) + feature_importance = {feature: importance for feature, importance in zip(feature_names, feature_importance_values)} + else: + result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42) + feature_importance = {feature: importance for feature, importance in zip(feature_names, result.importances_mean)} + + feature_importance = dict(sorted(feature_importance.items(), key=lambda item: abs(item[1]), reverse=True)) + return { + "feature_importance": feature_importance, + "model_type": str(type(model)), + } + +def explain_model(model, X_train, y_train, X_test, y_test, feature_names, model_type='sklearn'): + if model_type == 'tensorflow': + background = X_train[np.random.choice(X_train.shape[0], 100, replace=False)] + explainer = shap.DeepExplainer(model, background) + shap_values = explainer.shap_values(X_test) + if isinstance(shap_values, list): + shap_values = shap_values[0] + feature_importance_values = np.mean(np.abs(shap_values), axis=0) + feature_importance = {feature: importance for feature, importance in zip(feature_names, feature_importance_values)} + else: + result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42) + feature_importance = {feature: importance for feature, importance in zip(feature_names, result.importances_mean)} + + feature_importance = 
dict(sorted(feature_importance.items(), key=lambda item: abs(item[1]), reverse=True)) + return { + "feature_importance": feature_importance, + "model_type": str(type(model)), + } From 95b0ffd15c671a9ee6fec6d59b24bef30be5caac Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:21:17 +0530 Subject: [PATCH 05/20] Update utils.py --- explainableai/utils.py | 19 +------------------ 1 file changed, 1 insertion(+), 18 deletions(-) diff --git a/explainableai/utils.py b/explainableai/utils.py index c2a1dae..0c77bbd 100644 --- a/explainableai/utils.py +++ b/explainableai/utils.py @@ -58,21 +58,4 @@ def explain_model(model, X_train, y_train, X_test, y_test, feature_names, model_ "model_type": str(type(model)), } -def explain_model(model, X_train, y_train, X_test, y_test, feature_names, model_type='sklearn'): - if model_type == 'tensorflow': - background = X_train[np.random.choice(X_train.shape[0], 100, replace=False)] - explainer = shap.DeepExplainer(model, background) - shap_values = explainer.shap_values(X_test) - if isinstance(shap_values, list): - shap_values = shap_values[0] - feature_importance_values = np.mean(np.abs(shap_values), axis=0) - feature_importance = {feature: importance for feature, importance in zip(feature_names, feature_importance_values)} - else: - result = permutation_importance(model, X_test, y_test, n_repeats=10, random_state=42) - feature_importance = {feature: importance for feature, importance in zip(feature_names, result.importances_mean)} - - feature_importance = dict(sorted(feature_importance.items(), key=lambda item: abs(item[1]), reverse=True)) - return { - "feature_importance": feature_importance, - "model_type": str(type(model)), - } + From 4677b11659bbeda812b36783ab52b8aeaa9983d0 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:23:39 +0530 Subject: [PATCH 06/20] Update core.py --- 
explainableai/core.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/explainableai/core.py b/explainableai/core.py index 8088b75..6738c9b 100644 --- a/explainableai/core.py +++ b/explainableai/core.py @@ -265,7 +265,7 @@ def explain_prediction(self, input_data): explanation = get_prediction_explanation(self.gemini_model, input_data, prediction[0], probabilities[0], self.feature_importance) return prediction[0], probabilities[0], explanation - def _calculate_feature_importance(self): + def _calculate_feature_importance(self): if self.model_type == 'tensorflow': # For TensorFlow models, use SHAP values as feature importance shap_values = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) From 9755950f866a8ff9dd9108b4211ae316a51b24d5 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 18:00:52 +0530 Subject: [PATCH 07/20] Update core.py please install scikeras library --- explainableai/core.py | 1 - 1 file changed, 1 deletion(-) diff --git a/explainableai/core.py b/explainableai/core.py index 37bceb3..e54d341 100644 --- a/explainableai/core.py +++ b/explainableai/core.py @@ -16,7 +16,6 @@ # Import TensorFlow import tensorflow as tf -!pip install scikeras from scikeras.wrappers import KerasClassifier, KerasRegressor from .visualizations import ( From 387486d92e51cb01bc840ab690b9186ce1861bc6 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Wed, 2 Oct 2024 18:17:38 +0530 Subject: [PATCH 08/20] Update requirements.txt --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index bb24654..d427f37 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,3 +14,5 @@ scipy pillow xgboost colorama +tensorflow +scikeras From 11ba5d7812f324537088d5a86adeb1f991407568 Mon Sep 17 00:00:00 2001 From: sakeeb hasan 
<100307524+Sakeebhasan123456@users.noreply.github.com> Date: Thu, 3 Oct 2024 19:56:47 +0530 Subject: [PATCH 09/20] Update requirements.txt --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index d427f37..fa79056 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,5 +14,4 @@ scipy pillow xgboost colorama -tensorflow -scikeras + From 83194d60d12d8b16b8eda6ecc281de9729dcc802 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 15:41:07 +0530 Subject: [PATCH 10/20] Update requirements.txt --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index fa79056..538b4b3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -14,4 +14,6 @@ scipy pillow xgboost colorama +scikeras +tensorflow From b68aaf091412a968c307043c180031724daff53d Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:08:54 +0530 Subject: [PATCH 11/20] Update model_interpretability.py --- explainableai/model_interpretability.py | 46 ++++++++++++++----------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/explainableai/model_interpretability.py b/explainableai/model_interpretability.py index 709c939..b0682c5 100644 --- a/explainableai/model_interpretability.py +++ b/explainableai/model_interpretability.py @@ -1,5 +1,10 @@ -# Initialize colorama +# model_interpretability.py + +# Import colorama and its components import colorama +from colorama import Fore, Style + +# Initialize colorama colorama.init(autoreset=True) import pandas as pd @@ -64,10 +69,10 @@ def fit(self, models, X, y, feature_names=None): self.feature_names = feature_names if feature_names is not None else X.columns.tolist() self._determine_model_type() - logger.info(f"{colorama.Fore.BLUE}Preprocessing data...{colorama.Style.RESET_ALL}") + 
logger.info(f"{Fore.BLUE}Preprocessing data...{Style.RESET_ALL}") self._preprocess_data() - logger.info(f"{colorama.Fore.BLUE}Fitting models and analyzing...{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.BLUE}Fitting models and analyzing...{Style.RESET_ALL}") self.model_comparison_results = self._compare_models() # Select the best model based on cv_score @@ -283,12 +288,12 @@ def generate_report(self, filename='xai_report.pdf'): 'llm_explanation': self._generate_llm_explanation } - if input("Do you want all sections in the xai_report? (y/n) ").strip().lower() in ['y', 'yes']: + if input("Do you want all sections in the XAI report? (y/n) ").strip().lower() in ['y', 'yes']: for section_func in sections.values(): section_func(report) else: for section, section_func in sections.items(): - if input(f"Do you want {section} in xai_report? (y/n) ").strip().lower() in ['y', 'yes']: + if input(f"Do you want {section} in the XAI report? (y/n) ").strip().lower() in ['y', 'yes']: section_func(report) report.generate() @@ -501,22 +506,22 @@ def _print_results(self, results): def perform_eda(df): logger.debug("Performing exploratory data analysis...") try: - logger.info(f"{colorama.Fore.CYAN}Exploratory Data Analysis:{colorama.Style.RESET_ALL}") - logger.info(f"{colorama.Fore.GREEN}Dataset shape: {df.shape}{colorama.Style.RESET_ALL}") - logger.info(f"{colorama.Fore.CYAN}Dataset info:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Exploratory Data Analysis:{Style.RESET_ALL}") + logger.info(f"{Fore.GREEN}Dataset shape: {df.shape}{Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Dataset info:{Style.RESET_ALL}") df.info() - logger.info(f"{colorama.Fore.CYAN}Summary statistics:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Summary statistics:{Style.RESET_ALL}") logger.info(df.describe()) - logger.info(f"{colorama.Fore.CYAN}Missing values:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Missing values:{Style.RESET_ALL}") logger.info(df.isnull().sum()) - 
logger.info(f"{colorama.Fore.CYAN}Data types:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Data types:{Style.RESET_ALL}") logger.info(df.dtypes) - logger.info(f"{colorama.Fore.CYAN}Unique values in each column:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Unique values in each column:{Style.RESET_ALL}") for col in df.columns: - logger.info(f"{colorama.Fore.GREEN}{col}: {df[col].nunique()}{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.GREEN}{col}: {df[col].nunique()}{Style.RESET_ALL}") # Additional EDA steps - logger.info(f"{colorama.Fore.CYAN}Correlation matrix:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Correlation matrix:{Style.RESET_ALL}") corr_matrix = df.select_dtypes(include=[np.number]).corr() logger.info(corr_matrix) @@ -528,24 +533,25 @@ def perform_eda(df): if x != y and x < y ] if high_corr_list: - logger.info(f"{colorama.Fore.YELLOW}Highly correlated features:{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.YELLOW}Highly correlated features:{Style.RESET_ALL}") for feat1, feat2 in high_corr_list: - logger.info(f"{colorama.Fore.GREEN}{feat1} - {feat2}: {corr_matrix.loc[feat1, feat2]:.2f}{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.GREEN}{feat1} - {feat2}: {corr_matrix.loc[feat1, feat2]:.2f}{Style.RESET_ALL}") # Identify potential outliers - logger.info(f"{colorama.Fore.CYAN}Potential outliers (values beyond 3 standard deviations):{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Potential outliers (values beyond 3 standard deviations):{Style.RESET_ALL}") numeric_cols = df.select_dtypes(include=[np.number]).columns for col in numeric_cols: mean = df[col].mean() std = df[col].std() outliers = df[(df[col] < mean - 3 * std) | (df[col] > mean + 3 * std)] if not outliers.empty: - logger.info(f"{colorama.Fore.GREEN}{col}: {len(outliers)} potential outliers{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.GREEN}{col}: {len(outliers)} potential outliers{Style.RESET_ALL}") # Class distribution for the target 
variable (assuming last column is target) target_col = df.columns[-1] - logger.info(f"{colorama.Fore.CYAN}Class distribution for target variable '{target_col}':{colorama.Style.RESET_ALL}") + logger.info(f"{Fore.CYAN}Class distribution for target variable '{target_col}':{Style.RESET_ALL}") logger.info(df[target_col].value_counts(normalize=True)) except Exception as e: logger.error(f"Error occurred during exploratory data analysis: {str(e)}") - raise + raise + From 3d39831d206cbc9b1a89061020b38a29f73e7fff Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:11:27 +0530 Subject: [PATCH 12/20] Update core.py --- explainableai/core.py | 419 +++++++++++++++++++++++++++--------------- 1 file changed, 269 insertions(+), 150 deletions(-) diff --git a/explainableai/core.py b/explainableai/core.py index 1d2dcbd..cadb65c 100644 --- a/explainableai/core.py +++ b/explainableai/core.py @@ -1,9 +1,15 @@ +# core.py + +# Import colorama and its components +import colorama +from colorama import Fore, Style + # Initialize colorama colorama.init(autoreset=True) import pandas as pd import numpy as np -from sklearn.model_selection import train_test_split +from sklearn.model_selection import train_test_split, cross_val_score from sklearn.inspection import permutation_importance from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder from sklearn.impute import SimpleImputer @@ -26,7 +32,6 @@ from .model_selection import compare_models from reportlab.platypus import PageBreak import logging -from sklearn.model_selection import cross_val_score logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) @@ -34,6 +39,7 @@ class XAIWrapper: def __init__(self): self.model = None + self.models = {} self.X = None self.y = None self.feature_names = None @@ -48,12 +54,16 @@ def __init__(self): self.model_type = None # To store model type def fit(self, models, X, y, feature_names=None): - 
logger.debug("Fitting the model...") + logger.debug("Starting the fit process...") try: + # Initialize models if isinstance(models, dict): self.models = models + logger.debug("Initialized models from dictionary input.") else: self.models = {'Model': models} + logger.debug("Initialized single model.") + self.X = X self.y = y self.feature_names = feature_names if feature_names is not None else X.columns.tolist() @@ -66,62 +76,87 @@ def fit(self, models, X, y, feature_names=None): self.model_comparison_results = self._compare_models() # Select the best model based on cv_score - best_model_name = max(self.model_comparison_results, key=lambda x: self.model_comparison_results[x]['cv_score']) + best_model_name = max( + self.model_comparison_results, + key=lambda x: self.model_comparison_results[x]['cv_score'] + ) self.model = self.models[best_model_name] - + logger.info(f"Selected best model: {best_model_name} with CV Score: {self.model_comparison_results[best_model_name]['cv_score']:.4f}") + + # Fit the selected model if self.model_type == 'tensorflow': + logger.info("Fitting TensorFlow model...") self.model.fit(self.X, self.y, epochs=10, batch_size=32, verbose=0) else: + logger.info("Fitting scikit-learn model...") self.model.fit(self.X, self.y) - logger.info("Model fitting is complete...") + logger.info("Model fitting is complete.") return self except Exception as e: logger.error(f"An error occurred while fitting the models: {str(e)}") raise def _determine_model_type(self): - # Determine if the models are TensorFlow or scikit-learn - model_types = set() - for model in self.models.values(): - if isinstance(model, (tf.keras.Model, KerasClassifier, KerasRegressor)): - model_types.add('tensorflow') + logger.debug("Determining model type...") + try: + model_types = set() + for model in self.models.values(): + if isinstance(model, (tf.keras.Model, KerasClassifier, KerasRegressor)): + model_types.add('tensorflow') + else: + model_types.add('sklearn') + if len(model_types) > 
1: + raise ValueError("All models should be of the same type (either all TensorFlow or all scikit-learn).") + self.model_type = model_types.pop() + logger.debug(f"Detected model type: {self.model_type}") + + # Determine if models are classifiers + if self.model_type == 'tensorflow': + # Assume TensorFlow models output probabilities for classifiers + self.is_classifier = all( + model.output_shape[-1] > 1 for model in self.models.values() + ) else: - model_types.add('sklearn') - if len(model_types) > 1: - raise ValueError("All models should be of the same type (either all TensorFlow or all scikit-learn).") - self.model_type = model_types.pop() - self.is_classifier = all(self._is_classifier_model(model) for model in self.models.values()) - - def _is_classifier_model(self, model): - if self.model_type == 'tensorflow': - # Assume TensorFlow models output probabilities for classifiers - return model.output_shape[-1] > 1 - else: - return hasattr(model, "predict_proba") + self.is_classifier = all(hasattr(model, "predict_proba") for model in self.models.values()) + logger.debug(f"Is classifier: {self.is_classifier}") + except Exception as e: + logger.error(f"Error determining model type: {str(e)}") + raise def _compare_models(self): - logger.debug("Comparing the models...") + logger.debug("Comparing models...") try: results = {} for name, model in self.models.items(): + logger.debug(f"Evaluating model: {name}") if self.model_type == 'tensorflow': - # Use Keras wrappers for cross-validation + # Wrap TensorFlow models for scikit-learn compatibility if self.is_classifier: wrapped_model = KerasClassifier(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) else: wrapped_model = KerasRegressor(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) - cv_scores = cross_validate(wrapped_model, self.X, self.y, is_classifier=self.is_classifier, model_type=self.model_type) + + cv_scores = cross_validate( + wrapped_model, + self.X, + self.y, + 
is_classifier=self.is_classifier, + model_type=self.model_type + ) test_score = wrapped_model.score(self.X, self.y) else: + # Determine scoring metric scoring = 'roc_auc' if self.is_classifier else 'r2' cv_scores = cross_val_score(model, self.X, self.y, cv=5, scoring=scoring) model.fit(self.X, self.y) test_score = model.score(self.X, self.y) + results[name] = { 'cv_score': np.mean(cv_scores), 'test_score': test_score } + logger.debug(f"Model {name}: CV Score = {results[name]['cv_score']:.4f}, Test Score = {results[name]['test_score']:.4f}") logger.info("Model comparison completed successfully.") return results except Exception as e: @@ -129,131 +164,175 @@ def _compare_models(self): raise def _preprocess_data(self): - # Identify categorical and numerical columns - self.categorical_columns = self.X.select_dtypes(include=['object', 'category']).columns - self.numerical_columns = self.X.select_dtypes(include=['int64', 'float64']).columns - - # Create preprocessing steps - logger.debug("Creating preprocessing steps...") - numeric_transformer = Pipeline(steps=[ - ('imputer', SimpleImputer(strategy='mean')), - ('scaler', StandardScaler()) - ]) - - categorical_transformer = Pipeline(steps=[ - ('imputer', SimpleImputer(strategy='constant', fill_value='missing')), - ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)) - ]) - - self.preprocessor = ColumnTransformer( - transformers=[ - ('num', numeric_transformer, self.numerical_columns), - ('cat', categorical_transformer, self.categorical_columns) + logger.debug("Preprocessing data...") + try: + # Identify categorical and numerical columns + self.categorical_columns = self.X.select_dtypes(include=['object', 'category']).columns + self.numerical_columns = self.X.select_dtypes(include=['int64', 'float64']).columns + logger.debug(f"Categorical columns: {list(self.categorical_columns)}") + logger.debug(f"Numerical columns: {list(self.numerical_columns)}") + + # Create preprocessing pipelines + 
logger.debug("Creating preprocessing pipelines...") + numeric_transformer = Pipeline(steps=[ + ('imputer', SimpleImputer(strategy='mean')), + ('scaler', StandardScaler()) ]) - logger.info("Preprocessing setup completed.") - # Fit and transform the data - logger.debug("Fitting and transforming the data...") - self.X = self.preprocessor.fit_transform(self.X) + categorical_transformer = Pipeline(steps=[ + ('imputer', SimpleImputer(strategy='constant', fill_value='missing')), + ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)) + ]) - # Update feature names after preprocessing - logger.debug("Updating feature names...") - try: - num_feature_names = self.numerical_columns.tolist() - cat_feature_names = [] - if len(self.categorical_columns) > 0: - cat_feature_names = self.preprocessor.named_transformers_['cat'].named_steps['onehot'].get_feature_names_out(self.categorical_columns).tolist() - self.feature_names = num_feature_names + cat_feature_names - - # Encode target variable if it's categorical - if self.is_classifier and pd.api.types.is_categorical_dtype(self.y): - self.label_encoder = LabelEncoder() - self.y = self.label_encoder.fit_transform(self.y) + self.preprocessor = ColumnTransformer( + transformers=[ + ('num', numeric_transformer, self.numerical_columns), + ('cat', categorical_transformer, self.categorical_columns) + ] + ) + logger.info("Preprocessing pipelines created.") + + # Fit and transform the data + logger.debug("Fitting and transforming the data...") + self.X = self.preprocessor.fit_transform(self.X) + logger.info("Data preprocessing completed.") + + # Update feature names after preprocessing + logger.debug("Updating feature names post-preprocessing...") + try: + num_feature_names = self.numerical_columns.tolist() + cat_feature_names = [] + if len(self.categorical_columns) > 0: + cat_feature_names = self.preprocessor.named_transformers_['cat'].named_steps['onehot'].get_feature_names_out(self.categorical_columns).tolist() + 
self.feature_names = num_feature_names + cat_feature_names + logger.debug(f"Updated feature names: {self.feature_names}") + + # Encode target variable if it's categorical + if self.is_classifier and pd.api.types.is_categorical_dtype(self.y): + self.label_encoder = LabelEncoder() + self.y = self.label_encoder.fit_transform(self.y) + logger.debug("Encoded target variable using LabelEncoder.") + except Exception as e: + logger.error(f"Error updating feature names: {str(e)}") + raise except Exception as e: - logger.error(f"An error occurred while updating feature names: {str(e)}") + logger.error(f"Error during data preprocessing: {str(e)}") raise def analyze(self): logger.debug("Starting analysis...") results = {} - - logger.info("Evaluating model performance...") - results['model_performance'] = evaluate_model(self.model, self.X, self.y, self.is_classifier, self.model_type) - - logger.info("Calculating feature importance...") - self.feature_importance = self._calculate_feature_importance() - results['feature_importance'] = self.feature_importance - - logger.info("Generating visualizations...") - self._generate_visualizations(self.feature_importance) - - logger.info("Calculating SHAP values...") - results['shap_values'] = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) - - logger.info("Performing cross-validation...") - mean_score, std_score = cross_validate(self.model, self.X, self.y, is_classifier=self.is_classifier, model_type=self.model_type) - results['cv_scores'] = (mean_score, std_score) - - logger.info("Model comparison results:") - results['model_comparison'] = self.model_comparison_results - - self._print_results(results) - - logger.info("Generating LLM explanation...") - results['llm_explanation'] = get_llm_explanation(self.gemini_model, results) - - self.results = results - return results + try: + # Evaluate model performance + logger.info("Evaluating model performance...") + results['model_performance'] = evaluate_model( + 
self.model, self.X, self.y, self.is_classifier, self.model_type + ) + + # Calculate feature importance + logger.info("Calculating feature importance...") + self.feature_importance = self._calculate_feature_importance() + results['feature_importance'] = self.feature_importance + + # Generate visualizations + logger.info("Generating visualizations...") + self._generate_visualizations(self.feature_importance) + + # Calculate SHAP values + logger.info("Calculating SHAP values...") + results['shap_values'] = calculate_shap_values( + self.model, self.X, self.feature_names, self.model_type + ) + + # Perform cross-validation + logger.info("Performing cross-validation...") + mean_score, std_score = cross_validate( + self.model, self.X, self.y, + is_classifier=self.is_classifier, + model_type=self.model_type + ) + results['cv_scores'] = (mean_score, std_score) + + # Add model comparison results + logger.info("Adding model comparison results...") + results['model_comparison'] = self.model_comparison_results + + # Print results + self._print_results(results) + + # Generate LLM explanation + logger.info("Generating LLM explanation...") + results['llm_explanation'] = get_llm_explanation(self.gemini_model, results) + + self.results = results + logger.debug("Analysis completed successfully.") + return results + except Exception as e: + logger.error(f"An error occurred during analysis: {str(e)}") + raise def generate_report(self, filename='xai_report.pdf'): + logger.debug("Generating report...") if self.results is None: raise ValueError("No analysis results available. 
Please run analyze() first.") - report = ReportGenerator(filename) - report.add_heading("Explainable AI Report") - - sections = { - 'model_comparison': self._generate_model_comparison, - 'model_performance': self._generate_model_performance, - 'feature_importance': self._generate_feature_importance, - 'visualization': self._generate_visualization, - 'llm_explanation': self._generate_llm_explanation - } - - if input("Do you want all sections in the xai_report? (y/n) ").lower() in ['y', 'yes']: - for section_func in sections.values(): - section_func(report) - else: - for section, section_func in sections.items(): - if input(f"Do you want {section} in xai_report? (y/n) ").lower() in ['y', 'yes']: + try: + report = ReportGenerator(filename) + report.add_heading("Explainable AI Report") + + sections = { + 'model_comparison': self._generate_model_comparison, + 'model_performance': self._generate_model_performance, + 'feature_importance': self._generate_feature_importance, + 'visualization': self._generate_visualization, + 'llm_explanation': self._generate_llm_explanation + } + + if input("Do you want all sections in the XAI report? (y/n) ").strip().lower() in ['y', 'yes']: + for section_func in sections.values(): section_func(report) + else: + for section, section_func in sections.items(): + if input(f"Do you want {section} in the XAI report? 
(y/n) ").strip().lower() in ['y', 'yes']: + section_func(report) - report.generate() + report.generate() + logger.info(f"Report generated successfully and saved as '{filename}'.") + except Exception as e: + logger.error(f"An error occurred while generating the report: {str(e)}") + raise def _generate_model_comparison(self, report): + logger.debug("Adding model comparison section to report...") report.add_heading("Model Comparison", level=2) model_comparison_data = [["Model", "CV Score", "Test Score"]] + [ [model, f"{scores['cv_score']:.4f}", f"{scores['test_score']:.4f}"] for model, scores in self.results['model_comparison'].items() ] report.add_table(model_comparison_data) + logger.debug("Model comparison section added.") def _generate_model_performance(self, report): + logger.debug("Adding model performance section to report...") report.add_heading("Model Performance", level=2) for metric, value in self.results['model_performance'].items(): if isinstance(value, (int, float, np.float64)): report.add_paragraph(f"**{metric}:** {value:.4f}") else: report.add_paragraph(f"**{metric}:**\n{value}") + logger.debug("Model performance section added.") def _generate_feature_importance(self, report): + logger.debug("Adding feature importance section to report...") report.add_heading("Feature Importance", level=2) feature_importance_data = [["Feature", "Importance"]] + [ [feature, f"{importance:.4f}"] for feature, importance in self.feature_importance.items() ] report.add_table(feature_importance_data) + logger.debug("Feature importance section added.") def _generate_visualization(self, report): + logger.debug("Adding visualizations section to report...") report.add_heading("Visualizations", level=2) visualization_files = [ 'feature_importance.png', 'partial_dependence.png', @@ -265,77 +344,102 @@ def _generate_visualization(self, report): for image in visualization_files: report.add_image(image) report.content.append(PageBreak()) + logger.debug("Visualizations section 
added.") def _generate_llm_explanation(self, report): + logger.debug("Adding LLM explanation section to report...") report.add_heading("LLM Explanation", level=2) report.add_llm_explanation(self.results['llm_explanation']) + logger.debug("LLM explanation section added.") def predict(self, X): - logger.debug("Making predictions...") + logger.debug("Starting prediction...") try: if self.model is None: raise ValueError("Model has not been fitted. Please run fit() first.") - X = self._preprocess_input(X) + X_preprocessed = self._preprocess_input(X) if self.is_classifier: - prediction = self.model.predict(X) - probabilities = self.model.predict_proba(X) + prediction = self.model.predict(X_preprocessed) + probabilities = self.model.predict_proba(X_preprocessed) if self.label_encoder: prediction = self.label_encoder.inverse_transform(prediction) - logger.info("Prediction completed.") + logger.info("Prediction completed successfully.") return prediction, probabilities else: - prediction = self.model.predict(X) - logger.info("Prediction completed.") + prediction = self.model.predict(X_preprocessed) + logger.info("Prediction completed successfully.") return prediction except Exception as e: logger.error(f"Error during prediction: {str(e)}") raise def _preprocess_input(self, X): - # Ensure X is a DataFrame - logger.debug("Preprocessing input data...") + logger.debug("Preprocessing input data for prediction...") try: if not isinstance(X, pd.DataFrame): X = pd.DataFrame(X, columns=self.feature_names) - + logger.debug("Converted input to DataFrame.") + # Apply the same preprocessing as during training - X = self.preprocessor.transform(X) - logger.info("Input data preprocessing completed.") - - return X + X_preprocessed = self.preprocessor.transform(X) + logger.debug("Input data preprocessed successfully.") + return X_preprocessed except Exception as e: logger.error(f"Error during input preprocessing: {str(e)}") raise def explain_prediction(self, input_data): 
logger.debug("Generating prediction explanation...") - input_df = pd.DataFrame([input_data]) - prediction, probabilities = self.predict(input_df) - explanation = get_prediction_explanation( - self.gemini_model, - input_data, - prediction[0], - probabilities[0], - self.feature_importance - ) - logger.info("Prediction explanation generated.") - return prediction[0], probabilities[0], explanation + try: + input_df = pd.DataFrame([input_data]) + prediction, probabilities = self.predict(input_df) + explanation = get_prediction_explanation( + self.gemini_model, + input_data, + prediction[0], + probabilities[0], + self.feature_importance + ) + logger.info("Prediction explanation generated successfully.") + return prediction[0], probabilities[0], explanation + except Exception as e: + logger.error(f"Error during prediction explanation: {str(e)}") + raise def _calculate_feature_importance(self): logger.debug("Calculating feature importance...") try: if self.model_type == 'tensorflow': - # For TensorFlow models, use SHAP values as feature importance - shap_values = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) + logger.debug("Calculating SHAP values for TensorFlow model...") + shap_values = calculate_shap_values( + self.model, self.X, self.feature_names, self.model_type + ) feature_importance = np.mean(np.abs(shap_values.values), axis=0) - feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, feature_importance)} + feature_importance_dict = { + feature: importance + for feature, importance in zip(self.feature_names, feature_importance) + } + logger.debug("SHAP-based feature importance calculated.") else: - perm_importance = permutation_importance(self.model, self.X, self.y, n_repeats=10, random_state=42) - feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, perm_importance.importances_mean)} - logger.info("Feature importance calculated successfully.") - 
return dict(sorted(feature_importance_dict.items(), key=lambda item: abs(item[1]), reverse=True)) + logger.debug("Calculating permutation importance for scikit-learn model...") + perm_importance = permutation_importance( + self.model, self.X, self.y, n_repeats=10, random_state=42 + ) + feature_importance_dict = { + feature: importance + for feature, importance in zip(self.feature_names, perm_importance.importances_mean) + } + logger.debug("Permutation-based feature importance calculated.") + + # Sort features by absolute importance in descending order + sorted_importance = dict( + sorted(feature_importance_dict.items(), key=lambda item: abs(item[1]), reverse=True) + ) + self.feature_importance = sorted_importance + logger.info("Feature importance calculated and sorted.") + return sorted_importance except Exception as e: logger.error(f"Error calculating feature importance: {str(e)}") raise @@ -344,19 +448,29 @@ def _generate_visualizations(self, feature_importance): logger.debug("Generating visualizations...") try: plot_feature_importance(feature_importance) - plot_partial_dependence(self.model, self.X, feature_importance, self.feature_names, self.model_type) - plot_learning_curve(self.model, self.X, self.y, self.is_classifier, self.model_type) - plot_correlation_heatmap(pd.DataFrame(self.X, columns=self.feature_names)) + plot_partial_dependence( + self.model, self.X, feature_importance, self.feature_names, self.model_type + ) + plot_learning_curve( + self.model, self.X, self.y, self.is_classifier, self.model_type + ) + plot_correlation_heatmap( + pd.DataFrame(self.X, columns=self.feature_names) + ) if self.is_classifier: - plot_roc_curve(self.model, self.X, self.y, self.model_type) - plot_precision_recall_curve(self.model, self.X, self.y, self.model_type) - logger.info("Visualizations generated successfully.") + plot_roc_curve( + self.model, self.X, self.y, self.model_type + ) + plot_precision_recall_curve( + self.model, self.X, self.y, self.model_type + ) + 
logger.info("Visualizations generated and saved successfully.") except Exception as e: logger.error(f"Error generating visualizations: {str(e)}") raise def _print_results(self, results): - logger.debug("Printing results...") + logger.debug("Printing analysis results...") try: logger.info("\nModel Performance:") for metric, value in results['model_performance'].items(): @@ -413,7 +527,11 @@ def perform_eda(df): # Identify highly correlated features high_corr = np.where(np.abs(corr_matrix) > 0.8) - high_corr_list = [(corr_matrix.index[x], corr_matrix.columns[y]) for x, y in zip(*high_corr) if x != y and x < y] + high_corr_list = [ + (corr_matrix.index[x], corr_matrix.columns[y]) + for x, y in zip(*high_corr) + if x != y and x < y + ] if high_corr_list: logger.info(f"{Fore.YELLOW}Highly correlated features:{Style.RESET_ALL}") for feat1, feat2 in high_corr_list: @@ -437,3 +555,4 @@ def perform_eda(df): logger.error(f"Error occurred during exploratory data analysis: {str(e)}") raise + From b07b8555c2f986cfeae98b6e590d99115902cb78 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:12:25 +0530 Subject: [PATCH 13/20] Update utils.py --- explainableai/utils.py | 458 +++-------------------------------------- 1 file changed, 26 insertions(+), 432 deletions(-) diff --git a/explainableai/utils.py b/explainableai/utils.py index cf3d4bb..361ba80 100644 --- a/explainableai/utils.py +++ b/explainableai/utils.py @@ -1,448 +1,42 @@ +# utils.py + +# Import colorama and its components +import colorama +from colorama import Fore, Style + # Initialize colorama colorama.init(autoreset=True) import pandas as pd import numpy as np -from sklearn.model_selection import train_test_split -from sklearn.inspection import permutation_importance -from sklearn.preprocessing import StandardScaler, LabelEncoder, OneHotEncoder -from sklearn.impute import SimpleImputer -from sklearn.compose import ColumnTransformer -from 
sklearn.pipeline import Pipeline - -# Import TensorFlow -import tensorflow as tf -from scikeras.wrappers import KerasClassifier, KerasRegressor - -from .visualizations import ( - plot_feature_importance, plot_partial_dependence, plot_learning_curve, - plot_roc_curve, plot_precision_recall_curve, plot_correlation_heatmap -) -from .model_evaluation import evaluate_model, cross_validate -from .feature_analysis import calculate_shap_values -from .feature_interaction import analyze_feature_interactions -from .llm_explanations import initialize_gemini, get_llm_explanation, get_prediction_explanation -from .report_generator import ReportGenerator -from .model_selection import compare_models -from reportlab.platypus import PageBreak import logging -from sklearn.model_selection import cross_val_score +# Configure logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -class XAIWrapper: - def __init__(self): - self.model = None - self.X = None - self.y = None - self.feature_names = None - self.is_classifier = None - self.preprocessor = None - self.label_encoder = None - self.categorical_columns = None - self.numerical_columns = None - self.gemini_model = initialize_gemini() - self.feature_importance = None - self.results = None - self.model_type = None # To store model type - - def fit(self, models, X, y, feature_names=None): - logger.debug("Fitting the model...") - try: - if isinstance(models, dict): - self.models = models - else: - self.models = {'Model': models} - self.X = X - self.y = y - self.feature_names = feature_names if feature_names is not None else X.columns.tolist() - self._determine_model_type() - - logger.info(f"{Fore.BLUE}Preprocessing data...{Style.RESET_ALL}") - self._preprocess_data() - - logger.info(f"{Fore.BLUE}Fitting models and analyzing...{Style.RESET_ALL}") - self.model_comparison_results = self._compare_models() - - # Select the best model based on cv_score - best_model_name = max(self.model_comparison_results, key=lambda x: 
self.model_comparison_results[x]['cv_score']) - self.model = self.models[best_model_name] - if self.model_type == 'tensorflow': - self.model.fit(self.X, self.y, epochs=10, batch_size=32, verbose=0) - else: - self.model.fit(self.X, self.y) - - logger.info("Model fitting is complete...") - return self - except Exception as e: - logger.error(f"An error occurred while fitting the models: {str(e)}") - raise - - def _determine_model_type(self): - # Determine if the models are TensorFlow or scikit-learn - model_types = set() - for model in self.models.values(): - if isinstance(model, (tf.keras.Model, KerasClassifier, KerasRegressor)): - model_types.add('tensorflow') - else: - model_types.add('sklearn') - if len(model_types) > 1: - raise ValueError("All models should be of the same type (either all TensorFlow or all scikit-learn).") - self.model_type = model_types.pop() - self.is_classifier = all(self._is_classifier_model(model) for model in self.models.values()) - - def _is_classifier_model(self, model): - if self.model_type == 'tensorflow': - # Assume TensorFlow models output probabilities for classifiers - return model.output_shape[-1] > 1 - else: - return hasattr(model, "predict_proba") - - def _compare_models(self): - logger.debug("Comparing the models...") - try: - results = {} - for name, model in self.models.items(): - if self.model_type == 'tensorflow': - # Use Keras wrappers for cross-validation - if self.is_classifier: - wrapped_model = KerasClassifier(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) - else: - wrapped_model = KerasRegressor(build_fn=lambda: model, epochs=10, batch_size=32, verbose=0) - cv_scores = cross_validate(wrapped_model, self.X, self.y, is_classifier=self.is_classifier, model_type=self.model_type) - test_score = wrapped_model.score(self.X, self.y) - else: - scoring = 'roc_auc' if self.is_classifier else 'r2' - cv_scores = cross_val_score(model, self.X, self.y, cv=5, scoring=scoring) - model.fit(self.X, self.y) - test_score = 
model.score(self.X, self.y) - results[name] = { - 'cv_score': np.mean(cv_scores), - 'test_score': test_score - } - logger.info("Model comparison completed successfully.") - return results - except Exception as e: - logger.error(f"An error occurred while comparing models: {str(e)}") - raise - - def _preprocess_data(self): - # Identify categorical and numerical columns - self.categorical_columns = self.X.select_dtypes(include=['object', 'category']).columns - self.numerical_columns = self.X.select_dtypes(include=['int64', 'float64']).columns - - # Create preprocessing steps - logger.debug("Creating preprocessing steps...") - numeric_transformer = Pipeline(steps=[ - ('imputer', SimpleImputer(strategy='mean')), - ('scaler', StandardScaler()) - ]) - - categorical_transformer = Pipeline(steps=[ - ('imputer', SimpleImputer(strategy='constant', fill_value='missing')), - ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)) - ]) - - self.preprocessor = ColumnTransformer( - transformers=[ - ('num', numeric_transformer, self.numerical_columns), - ('cat', categorical_transformer, self.categorical_columns) - ]) - logger.info("Preprocessing setup completed.") - - # Fit and transform the data - logger.debug("Fitting and transforming the data...") - self.X = self.preprocessor.fit_transform(self.X) - - # Update feature names after preprocessing - logger.debug("Updating feature names...") - try: - num_feature_names = self.numerical_columns.tolist() - cat_feature_names = [] - if self.categorical_columns.size > 0: - cat_feature_names = self.preprocessor.named_transformers_['cat'].named_steps['onehot'].get_feature_names_out(self.categorical_columns).tolist() - self.feature_names = num_feature_names + cat_feature_names - - # Encode target variable if it's categorical - if self.is_classifier and pd.api.types.is_categorical_dtype(self.y): - self.label_encoder = LabelEncoder() - self.y = self.label_encoder.fit_transform(self.y) - except Exception as e: - logger.error(f"An 
error occurred while updating feature names: {str(e)}") - raise - - def analyze(self): - logger.debug("Starting analysis...") - results = {} - - try: - logger.info("Evaluating model performance...") - results['model_performance'] = evaluate_model(self.model, self.X, self.y, self.is_classifier, self.model_type) - - logger.info("Calculating feature importance...") - self.feature_importance = self._calculate_feature_importance() - results['feature_importance'] = self.feature_importance - - logger.info("Generating visualizations...") - self._generate_visualizations(self.feature_importance) - - logger.info("Calculating SHAP values...") - results['shap_values'] = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) - - logger.info("Performing cross-validation...") - mean_score, std_score = cross_validate(self.model, self.X, self.y, is_classifier=self.is_classifier, model_type=self.model_type) - results['cv_scores'] = (mean_score, std_score) - - logger.info("Model comparison results:") - results['model_comparison'] = self.model_comparison_results - - self._print_results(results) - - logger.info("Generating LLM explanation...") - results['llm_explanation'] = get_llm_explanation(self.gemini_model, results) - - self.results = results - return results - except Exception as e: - logger.error(f"An error occurred during analysis: {str(e)}") - raise - - def generate_report(self, filename='xai_report.pdf'): - if self.results is None: - raise ValueError("No analysis results available. Please run analyze() first.") - - report = ReportGenerator(filename) - report.add_heading("Explainable AI Report") - - sections = { - 'model_comparison': self._generate_model_comparison, - 'model_performance': self._generate_model_performance, - 'feature_importance': self._generate_feature_importance, - 'visualization': self._generate_visualization, - 'llm_explanation': self._generate_llm_explanation - } - - try: - if input("Do you want all sections in the XAI report? 
(y/n) ").strip().lower() in ['y', 'yes']: - for section_func in sections.values(): - section_func(report) - else: - for section, section_func in sections.items(): - if input(f"Do you want {section} in the XAI report? (y/n) ").strip().lower() in ['y', 'yes']: - section_func(report) - - report.generate() - logger.info(f"Report generated successfully: {filename}") - except Exception as e: - logger.error(f"An error occurred while generating the report: {str(e)}") - raise - - def _generate_model_comparison(self, report): - report.add_heading("Model Comparison", level=2) - model_comparison_data = [["Model", "CV Score", "Test Score"]] + [ - [model, f"{scores['cv_score']:.4f}", f"{scores['test_score']:.4f}"] - for model, scores in self.results['model_comparison'].items() - ] - report.add_table(model_comparison_data) - - def _generate_model_performance(self, report): - report.add_heading("Model Performance", level=2) - for metric, value in self.results['model_performance'].items(): - if isinstance(value, (int, float, np.float64)): - report.add_paragraph(f"**{metric}:** {value:.4f}") - else: - report.add_paragraph(f"**{metric}:**\n{value}") - - def _generate_feature_importance(self, report): - report.add_heading("Feature Importance", level=2) - feature_importance_data = [["Feature", "Importance"]] + [ - [feature, f"{importance:.4f}"] for feature, importance in self.feature_importance.items() - ] - report.add_table(feature_importance_data) - - def _generate_visualization(self, report): - report.add_heading("Visualizations", level=2) - visualization_files = ['feature_importance.png', 'partial_dependence.png', 'learning_curve.png', 'correlation_heatmap.png'] - if self.is_classifier: - visualization_files.extend(['roc_curve.png', 'precision_recall_curve.png']) - - for image in visualization_files: - report.add_image(image) - report.content.append(PageBreak()) - - def _generate_llm_explanation(self, report): - report.add_heading("LLM Explanation", level=2) - 
report.add_llm_explanation(self.results['llm_explanation']) - - def predict(self, X): - logger.debug("Making predictions...") - try: - if self.model is None: - raise ValueError("Model has not been fitted. Please run fit() first.") - - X = self._preprocess_input(X) - - if self.is_classifier: - prediction = self.model.predict(X) - probabilities = self.model.predict_proba(X) - if self.label_encoder: - prediction = self.label_encoder.inverse_transform(prediction) - logger.info("Prediction completed successfully.") - return prediction, probabilities - else: - prediction = self.model.predict(X) - logger.info("Prediction completed successfully.") - return prediction - except Exception as e: - logger.error(f"Error during prediction: {str(e)}") - raise - - def _preprocess_input(self, X): - # Ensure X is a DataFrame - logger.debug("Preprocessing input data...") - try: - if not isinstance(X, pd.DataFrame): - X = pd.DataFrame(X, columns=self.feature_names) - - # Apply the same preprocessing as during training - X = self.preprocessor.transform(X) - logger.info("Input data preprocessed successfully.") - - return X - except Exception as e: - logger.error(f"An error occurred during input preprocessing: {str(e)}") - raise - - def explain_prediction(self, input_data): - logger.debug("Generating explanation for the prediction...") - try: - input_df = pd.DataFrame([input_data]) - prediction, probabilities = self.predict(input_df) - explanation = get_prediction_explanation( - self.gemini_model, - input_data, - prediction[0], - probabilities[0], - self.feature_importance - ) - logger.info("Prediction explanation generated successfully.") - return prediction[0], probabilities[0], explanation - except Exception as e: - logger.error(f"An error occurred while explaining the prediction: {str(e)}") - raise - - def _calculate_feature_importance(self): - logger.debug("Calculating feature importance...") - try: - if self.model_type == 'tensorflow': - # For TensorFlow models, use SHAP values as 
feature importance - shap_values = calculate_shap_values(self.model, self.X, self.feature_names, self.model_type) - feature_importance = np.mean(np.abs(shap_values.values), axis=0) - feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, feature_importance)} - else: - perm_importance = permutation_importance(self.model, self.X, self.y, n_repeats=10, random_state=42) - feature_importance_dict = {feature: importance for feature, importance in zip(self.feature_names, perm_importance.importances_mean)} - logger.info("Feature importance calculated successfully.") - return dict(sorted(feature_importance_dict.items(), key=lambda item: abs(item[1]), reverse=True)) - except Exception as e: - logger.error(f"An error occurred while calculating feature importance: {str(e)}") - raise - - def _generate_visualizations(self, feature_importance): - logger.debug("Generating visualizations...") - try: - plot_feature_importance(feature_importance) - plot_partial_dependence(self.model, self.X, feature_importance, self.feature_names, self.model_type) - plot_learning_curve(self.model, self.X, self.y, self.is_classifier, self.model_type) - plot_correlation_heatmap(pd.DataFrame(self.X, columns=self.feature_names)) - if self.is_classifier: - plot_roc_curve(self.model, self.X, self.y, self.model_type) - plot_precision_recall_curve(self.model, self.X, self.y, self.model_type) - logger.info("Visualizations generated successfully.") - except Exception as e: - logger.error(f"An error occurred while generating visualizations: {str(e)}") - raise - - def _print_results(self, results): - logger.debug("Printing analysis results...") - try: - logger.info("\nModel Performance:") - for metric, value in results['model_performance'].items(): - if isinstance(value, (int, float, np.float64)): - logger.info(f"{metric}: {value:.4f}") - else: - logger.info(f"{metric}:\n{value}") - - logger.info("\nTop 5 Important Features:") - for feature, importance in 
list(results['feature_importance'].items())[:5]: - logger.info(f"{feature}: {importance:.4f}") - - logger.info(f"\nCross-validation Score: {results['cv_scores'][0]:.4f} (+/- {results['cv_scores'][1]:.4f})") - - logger.info("\nVisualizations saved:") - logger.info("- Feature Importance: feature_importance.png") - logger.info("- Partial Dependence: partial_dependence.png") - logger.info("- Learning Curve: learning_curve.png") - logger.info("- Correlation Heatmap: correlation_heatmap.png") - if self.is_classifier: - logger.info("- ROC Curve: roc_curve.png") - logger.info("- Precision-Recall Curve: precision_recall_curve.png") - - if results['shap_values'] is not None: - logger.info("\nSHAP values calculated successfully. See 'shap_summary.png' for visualization.") - else: - logger.info("\nSHAP values calculation failed. Please check the console output for more details.") - except Exception as e: - logger.error(f"An error occurred while printing results: {str(e)}") - raise +# Example utility function using colorama for colored logs +def log_data_processing_step(step_description): + logger.info(f"{Fore.BLUE}{step_description}{Style.RESET_ALL}") - @staticmethod - def perform_eda(df): - logger.debug("Performing exploratory data analysis...") - try: - logger.info(f"{Fore.CYAN}Exploratory Data Analysis:{Style.RESET_ALL}") - logger.info(f"{Fore.GREEN}Dataset shape: {df.shape}{Style.RESET_ALL}") - logger.info(f"{Fore.CYAN}Dataset info:{Style.RESET_ALL}") - df.info() - logger.info(f"{Fore.CYAN}Summary statistics:{Style.RESET_ALL}") - logger.info(df.describe()) - logger.info(f"{Fore.CYAN}Missing values:{Style.RESET_ALL}") - logger.info(df.isnull().sum()) - logger.info(f"{Fore.CYAN}Data types:{Style.RESET_ALL}") - logger.info(df.dtypes) - logger.info(f"{Fore.CYAN}Unique values in each column:{Style.RESET_ALL}") - for col in df.columns: - logger.info(f"{Fore.GREEN}{col}: {df[col].nunique()}{Style.RESET_ALL}") +# Example utility class +class DataProcessor: + def process_data(self, 
data): + logger.info(f"{Fore.YELLOW}Starting data processing...{Style.RESET_ALL}") + # Implement data processing logic here + logger.info(f"{Fore.YELLOW}Data processing completed.{Style.RESET_ALL}") - # Additional EDA steps - logger.info(f"{Fore.CYAN}Correlation matrix:{Style.RESET_ALL}") - corr_matrix = df.select_dtypes(include=[np.number]).corr() - logger.info(corr_matrix) +# Add your actual utility functions and classes below +# Ensure that any function or class using Fore or Style includes the imports above - # Identify highly correlated features - high_corr = np.where(np.abs(corr_matrix) > 0.8) - high_corr_list = [(corr_matrix.index[x], corr_matrix.columns[y]) for x, y in zip(*high_corr) if x != y and x < y] - if high_corr_list: - logger.info(f"{Fore.YELLOW}Highly correlated features:{Style.RESET_ALL}") - for feat1, feat2 in high_corr_list: - logger.info(f"{Fore.GREEN}{feat1} - {feat2}: {corr_matrix.loc[feat1, feat2]:.2f}{Style.RESET_ALL}") +def some_utility_function(): + # Example function using Fore and Style + logger.info(f"{Fore.GREEN}This is a green message.{Style.RESET_ALL}") + # Rest of the function... - # Identify potential outliers - logger.info(f"{Fore.CYAN}Potential outliers (values beyond 3 standard deviations):{Style.RESET_ALL}") - numeric_cols = df.select_dtypes(include=[np.number]).columns - for col in numeric_cols: - mean = df[col].mean() - std = df[col].std() - outliers = df[(df[col] < mean - 3 * std) | (df[col] > mean + 3 * std)] - if not outliers.empty: - logger.info(f"{Fore.GREEN}{col}: {len(outliers)} potential outliers{Style.RESET_ALL}") +class SomeUtilityClass: + def example_method(self): + logger.info(f"{Fore.RED}This is a red message.{Style.RESET_ALL}") + # Rest of the method... 
- # Class distribution for the target variable (assuming last column is target) - target_col = df.columns[-1] - logger.info(f"{Fore.CYAN}Class distribution for target variable '{target_col}':{Style.RESET_ALL}") - logger.info(df[target_col].value_counts(normalize=True)) - except Exception as e: - logger.error(f"An error occurred during exploratory data analysis: {str(e)}") - raise From 6225e51d8cbd61aea521240d907494b3e3e0d533 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:13:01 +0530 Subject: [PATCH 14/20] Update requirements.txt --- requirements.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 538b4b3..a4f8705 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,5 @@ colorama scikeras tensorflow + + From 815acf6b3ae2ad268c53f592045665479547cdac Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:14:11 +0530 Subject: [PATCH 15/20] Update requires.txt --- explainableai.egg-info/requires.txt | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/explainableai.egg-info/requires.txt b/explainableai.egg-info/requires.txt index 04c33a1..538b4b3 100644 --- a/explainableai.egg-info/requires.txt +++ b/explainableai.egg-info/requires.txt @@ -12,3 +12,8 @@ google-generativeai python-dotenv scipy pillow +xgboost +colorama +scikeras +tensorflow + From 48b986c35462325871c0311eb48f49dc37e325d9 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:17:00 +0530 Subject: [PATCH 16/20] Update requires.txt --- explainableai.egg-info/requires.txt | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/explainableai.egg-info/requires.txt b/explainableai.egg-info/requires.txt index 538b4b3..2577288 100644 --- a/explainableai.egg-info/requires.txt +++ b/explainableai.egg-info/requires.txt @@ -12,8 
+12,5 @@ google-generativeai python-dotenv scipy pillow -xgboost -colorama -scikeras -tensorflow + From 58d5558951ddbd01a5255b6f6702bd0fb14fc4f0 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 16:59:11 +0530 Subject: [PATCH 17/20] Update test_utils.py --- tests/test_utils.py | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/tests/test_utils.py b/tests/test_utils.py index 495cbb7..0b55e65 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -1,10 +1,19 @@ +# tests/test_utils.py + +import sys +import os import pytest from sklearn.linear_model import LinearRegression, LogisticRegression from sklearn.datasets import make_classification, make_regression from sklearn.model_selection import train_test_split -from explainableai.utils import explain_model, calculate_metrics from dotenv import load_dotenv -import os + +# Add the project root directory to sys.path +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) + +from explainableai.utils import explain_model, calculate_metrics + +# Load environment variables load_dotenv() def test_explain_model_regression(): @@ -58,4 +67,4 @@ def test_calculate_metrics_classification(): assert "f1_score" in metrics if __name__ == "__main__": - pytest.main() \ No newline at end of file + pytest.main() From 60727a7b49253c7b2fa678aeae9e660cc76aec75 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 17:00:24 +0530 Subject: [PATCH 18/20] Update setup.py --- setup.py | 22 ++++++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index bbc2190..211d3e7 100644 --- a/setup.py +++ b/setup.py @@ -1,6 +1,9 @@ +# setup.py + from setuptools import setup, find_packages import os +# Read the long description from README.md this_directory = os.path.abspath(os.path.dirname(__file__)) with 
open(os.path.join(this_directory, 'README.md'), encoding='utf-8') as f: long_description = f.read() @@ -23,7 +26,11 @@ 'google-generativeai', 'python-dotenv', 'scipy', - 'pillow' + 'pillow', + 'colorama', # Added missing dependency + 'scikeras', # Added missing dependency + 'tensorflow', # Added missing dependency + # Removed 'model_interpretability' assuming it's part of this package ], entry_points={ 'console_scripts': [ @@ -60,4 +67,15 @@ package_data={ 'explainableai': ['data/*.csv', 'templates/*.html'], }, -) \ No newline at end of file + # Optional: Add a test suite + # test_suite='tests', + # Optional: Specify development dependencies + extras_require={ + 'dev': [ + 'pytest', + 'flake8', + 'black', + # Add other development dependencies here + ], + }, +) From 0852a8e1cc5f3cb323b2ab23a0e9207425cd8e30 Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 17:07:31 +0530 Subject: [PATCH 19/20] Update requirements.txt --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index a4f8705..7a005df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ xgboost colorama scikeras tensorflow - +model_interpretability +pytest From b8f2fa767d8b6fd1c59d99fc12988ccdef795e2f Mon Sep 17 00:00:00 2001 From: sakeeb hasan <100307524+Sakeebhasan123456@users.noreply.github.com> Date: Sun, 6 Oct 2024 17:18:08 +0530 Subject: [PATCH 20/20] Update requirements.txt --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 7a005df..35d3691 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,7 +16,7 @@ xgboost colorama scikeras tensorflow -model_interpretability + pytest