Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 309 additions & 0 deletions main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,309 @@
### functions would go in this file. Edit as you see fit. ####
#pandas data analysis tool
import pandas as pd
#system function
import os
#read current path and needed fixed folder structure
from pathlib import Path
#3D interactive plot
import plotly.express as px
#static plot
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN
from sklearn.cluster import AgglomerativeClustering
from scipy.cluster.hierarchy import dendrogram, linkage

def load_exnode_data(folder_path_arg=None):  # Renamed parameter to avoid conflict
    """Load node coordinates and intensities from all .exnode files in an 'inputs' folder.

    Each file is expected to contain repeated blocks of the form::

        Node: <id>
        <x>
        <y>
        <z>
        <avg_intensity>

    Nodes that do not yield exactly 4 numeric values are skipped with a warning.

    Args:
        folder_path_arg (str | Path | None): When None, data is read from
            ``<parent of cwd>/inputs``; otherwise from
            ``<parent of this path>/inputs`` (kept for backward compatibility).

    Returns:
        pd.DataFrame: One row per complete node with columns
        'file_source', 'node_id', 'x', 'y', 'z', 'avg_intensity'.
        Empty DataFrame when the folder is missing or contains no usable data.
    """
    if folder_path_arg is None:
        project_root = Path(os.getcwd())
        effective_folder_path = project_root.parent / "inputs"
    else:
        effective_folder_path = Path(folder_path_arg).parent / "inputs"

    print(f"Using data folder path: {effective_folder_path}")

    # Guard: os.listdir would raise FileNotFoundError on a missing folder.
    if not effective_folder_path.is_dir():
        print(f"Error: Folder {effective_folder_path} does not exist.")
        return pd.DataFrame()

    # Get a list of all .exnode files in the folder
    file_lists = [f for f in os.listdir(effective_folder_path) if f.endswith('.exnode')]

    if not file_lists:
        print(f"No .exnode files found in {effective_folder_path}")
        return pd.DataFrame()  # Return an empty DataFrame if no files are found
    print(f"Found {len(file_lists)} .exnode files: {file_lists}")

    all_data_points = []
    for file_name in file_lists:
        file_path_full = os.path.join(effective_folder_path, file_name)
        print(f"\nProcessing file: {file_name}")
        points = _parse_exnode_file(file_path_full, file_name)
        all_data_points.extend(points)
        print(f"Finished processing {file_name}. Added {len(points)} data points.")

    # Create DataFrame from all collected data points
    df = pd.DataFrame(all_data_points)

    # Display some info
    print("\n--- Combined DataFrame Info ---")
    if not df.empty:
        print("First 5 data points:")
        print(df.head())
        print("last 5 data points:")
        print(df.tail())
        print(f"\nTotal data points read from all files: {len(df)}")
    else:
        print("No data points were successfully read.")

    return df


def _parse_exnode_file(file_path_full, file_name):
    """Parse one .exnode file; return a list of row dicts (one per complete node)."""
    points = []
    current_node_id = None
    current_node_values = []  # x, y, z, avg_intensity collected sequentially

    def _flush():
        # Store the node collected so far iff it has exactly the 4 expected values.
        # Shared by the mid-file and end-of-file paths (was duplicated before).
        if current_node_id is None:
            return
        if len(current_node_values) == 4:
            points.append({
                'file_source': file_name,
                'node_id': current_node_id,
                'x': current_node_values[0],
                'y': current_node_values[1],
                'z': current_node_values[2],
                'avg_intensity': current_node_values[3],
            })
        else:
            print(f"Warning in {file_name}: Node {current_node_id} has "
                  f"{len(current_node_values)} values, expected 4. Skipping this node's data.")

    try:
        with open(file_path_full, 'r') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if line.startswith('Node:'):
                    _flush()  # store the previous node before starting a new one
                    current_node_id = int(line.split(':')[1].strip())
                    current_node_values = []  # reset for the new node
                elif current_node_id is not None and line != '':
                    # Inside a node's data block: each non-empty line is one float.
                    try:
                        current_node_values.append(float(line))
                    except ValueError:
                        print(f"Warning in {file_name}: Non-numeric or unexpected data "
                              f"'{line}' found for Node {current_node_id} at line {line_num}. Skipping value.")
    except OSError as e:
        # File vanished between listdir and open, or is unreadable — best effort.
        print(f"Error: File {file_path_full} not found. Skipping. ({e})")
        return points

    _flush()  # don't forget the last node in the file
    return points

def plot_plotly_3d_scatter(df, x_col='x', y_col='y', z_col='z', color_col='avg_intensity', title='Interactive 3D Visualization of Data Points by Intensity (Plotly)'):
    """Render an interactive Plotly 3D scatter plot of the given DataFrame.

    Args:
        df (pd.DataFrame): Data to plot; nothing is drawn when it is empty.
        x_col (str): Column plotted on the x-axis. Defaults to 'x'.
        y_col (str): Column plotted on the y-axis. Defaults to 'y'.
        z_col (str): Column plotted on the z-axis. Defaults to 'z'.
        color_col (str): Column used to colour the markers. Defaults to 'avg_intensity'.
        title (str): Figure title.
    """
    if df.empty:
        print("DataFrame is empty. No data to visualize interactively.")
        return

    figure = px.scatter_3d(df, x=x_col, y=y_col, z=z_col, color=color_col, title=title)
    figure.show()

def plot_mpl(df, x_col='x', y_col='y', z_col='z', color_col='avg_intensity', title='3D Visualization of Data Points by Intensity (Matplotlib)'):
    """Render a static Matplotlib 3D scatter plot of the given DataFrame.

    Args:
        df (pd.DataFrame): Data to plot; nothing is drawn when it is empty.
        x_col (str): Column plotted on the x-axis. Defaults to 'x'.
        y_col (str): Column plotted on the y-axis. Defaults to 'y'.
        z_col (str): Column plotted on the z-axis. Defaults to 'z'.
        color_col (str): Column used to colour the points. Defaults to 'avg_intensity'.
        title (str): Figure title.
    """
    # Guard clause instead of wrapping the whole body in an if.
    if df.empty:
        print("DataFrame is empty. No data to visualize.")
        return

    figure = plt.figure(figsize=(10, 8))
    axes = figure.add_subplot(111, projection='3d')

    points = axes.scatter(df[x_col], df[y_col], df[z_col],
                          c=df[color_col], cmap='viridis', s=5)

    axes.set_xlabel('X Coordinate')
    axes.set_ylabel('Y Coordinate')
    axes.set_zlabel('Z Coordinate')
    axes.set_title(title)

    # Colour bar keyed to the intensity values
    colorbar = figure.colorbar(points, ax=axes, pad=0.1)
    colorbar.set_label('Average Intensity')

    plt.show()

def perform_and_plot_all_kmeans_clusters(dataframe, n_clusters=5):
    """Run three K-means variants and show an interactive plot for each.

    Variants (each adds a label column to *dataframe* in place):
      1. 'intensity_cluster_label' - avg_intensity only (raw values)
      2. 'xyz_cluster_label'       - x, y, z coordinates only (raw values)
      3. 'xyzi_cluster_label'      - x, y, z and avg_intensity, standardised

    Args:
        dataframe (pd.DataFrame): Data containing 'x', 'y', 'z', 'avg_intensity'.
        n_clusters (int): Number of clusters for every K-means run.
    """
    print(f"Performing K-means clustering with {n_clusters} clusters...")

    # 1. Average intensity only.
    model = KMeans(n_clusters=n_clusters, random_state=42, n_init='auto')
    dataframe['intensity_cluster_label'] = model.fit_predict(dataframe[['avg_intensity']])
    print("Intensity K-means clustering completed.")
    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col='intensity_cluster_label',
                           title=f'K-means Clustering ({n_clusters} Clusters - Avg Intensity Only)')

    # 2. Spatial coordinates only.
    model = KMeans(n_clusters=n_clusters, random_state=42, n_init='auto')
    dataframe['xyz_cluster_label'] = model.fit_predict(dataframe[['x', 'y', 'z']])
    print("Location (X,Y,Z) K-means clustering completed.")
    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col='xyz_cluster_label',
                           title=f'K-means Clustering ({n_clusters} Clusters - X, Y, Z Coordinates Only)')

    # 3. All four features, standardised so no single feature dominates the distance.
    model = KMeans(n_clusters=n_clusters, random_state=42, n_init='auto')
    scaled = StandardScaler().fit_transform(dataframe[['x', 'y', 'z', 'avg_intensity']])
    dataframe['xyzi_cluster_label'] = model.fit_predict(scaled)
    print("Combined (X,Y,Z,Intensity) K-means clustering completed.")
    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col='xyzi_cluster_label',
                           title=f'K-means Clustering ({n_clusters} Clusters - X, Y, Z, and Avg Intensity)')

    print("All K-means clustering and plotting processes finished.")

def perform_and_plot_all_dbscan_clusters(dataframe, eps_val=0.5, min_samples_val=8):
    """Run DBSCAN on scaled (x, y, z, avg_intensity) features and plot the labels.

    Adds a 'dbscan_xyzi_cluster_label' column to *dataframe* in place; DBSCAN
    marks noise points with label -1.

    Args:
        dataframe (pd.DataFrame): Data containing 'x', 'y', 'z', 'avg_intensity'.
        eps_val (float): Neighbourhood radius (measured on standardised features).
        min_samples_val (int): Minimum neighbours for a point to be a core point.
    """
    print(f"Performing DBSCAN clustering with eps={eps_val} and min_samples={min_samples_val}...")

    # Standardise so each of the four features contributes equally to distances.
    feature_matrix = StandardScaler().fit_transform(
        dataframe[['x', 'y', 'z', 'avg_intensity']])
    dataframe['dbscan_xyzi_cluster_label'] = DBSCAN(
        eps=eps_val, min_samples=min_samples_val).fit_predict(feature_matrix)
    print("DBSCAN (X,Y,Z,Intensity) clustering completed.")

    labels = dataframe['dbscan_xyzi_cluster_label']
    cluster_count = len(set(labels)) - (1 if -1 in labels.values else 0)
    noise_count = (labels == -1).sum()
    print(f"Number of clusters (Combined): {cluster_count}, Noise points: {noise_count}")
    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col='dbscan_xyzi_cluster_label',
                           title=f'DBSCAN Clustering (eps={eps_val}, min_samples={min_samples_val} - X, Y, Z, and Avg Intensity Combined)')

    print("All DBSCAN clustering and plotting processes finished.")

def perform_and_plot_all_agglomerative_clusters(dataframe, n_clusters=5, linkage='ward'):
    """Run Agglomerative Clustering on scaled (x, y, z, avg_intensity) and plot it.

    Adds an 'agglomerative_cluster_label_<n_clusters>' column to *dataframe*
    in place.

    Args:
        dataframe (pd.DataFrame): Data containing 'x', 'y', 'z', 'avg_intensity'.
        n_clusters (int): Number of clusters to form.
        linkage (str): Linkage criterion: 'ward', 'complete', 'average', 'single'.
            NOTE(review): this parameter shadows scipy's `linkage` function
            imported at module level; harmless here because that function is
            not called in this block, but renaming would break callers.
    """
    print(f"Performing Agglomerative Clustering with {n_clusters} clusters and linkage='{linkage}'...")

    # Standardise the four features before computing merge distances.
    scaled = StandardScaler().fit_transform(dataframe[['x', 'y', 'z', 'avg_intensity']])

    label_column = f'agglomerative_cluster_label_{n_clusters}'
    model = AgglomerativeClustering(n_clusters=n_clusters, linkage=linkage)
    dataframe[label_column] = model.fit_predict(scaled)

    print("Agglomerative Clustering applied using 'x', 'y', 'z', and 'avg_intensity' features.")
    print("Number of clusters found:", len(set(dataframe[label_column])))

    # Visualise the resulting labels in 3D.
    plot_plotly_3d_scatter(dataframe, x_col='x', y_col='y', z_col='z',
                           color_col=label_column,
                           title=f'Agglomerative Clustering ({n_clusters} Clusters, Linkage: {linkage} - X, Y, Z, and Avg Intensity)')

    print("Agglomerative clustering and plotting process finished.")

def plot_agglomerative_dendrogram(dataframe, linkage_method='ward', p_val=30):
    """Plot a truncated hierarchical-clustering dendrogram for the combined features.

    Builds a SciPy linkage matrix from the standardised 'x', 'y', 'z',
    'avg_intensity' columns and draws only the last *p_val* merges.

    Args:
        dataframe (pd.DataFrame): Data containing 'x', 'y', 'z', 'avg_intensity'.
        linkage_method (str): Linkage criterion passed to scipy's `linkage`
            (e.g. 'ward', 'complete', 'average', 'single').
        p_val (int): Number of last merged clusters to show when truncating.
    """
    # Bug fix: this message previously interpolated `{linkage}` — which resolves
    # to scipy's module-level `linkage` *function* and printed its repr — instead
    # of the `linkage_method` parameter.
    print(f"\nGenerating dendrogram for Combined (X,Y,Z,Intensity) Agglomerative Clustering with linkage='{linkage_method}'...")

    features_xyzi = dataframe[['x', 'y', 'z', 'avg_intensity']]
    scaler_xyzi = StandardScaler()
    scaled_features_xyzi = scaler_xyzi.fit_transform(features_xyzi)

    linkage_matrix = linkage(scaled_features_xyzi, method=linkage_method)

    plt.figure(figsize=(15, 7))
    plt.title(f'Hierarchical Clustering Dendrogram (Linkage: {linkage_method} - X, Y, Z, Avg Intensity)')
    plt.xlabel('Sample Index or Cluster Size')
    plt.ylabel('Distance')
    dendrogram(
        linkage_matrix,
        leaf_rotation=90.,      # rotates the x axis labels
        leaf_font_size=8.,      # font size for the x axis labels
        truncate_mode='lastp',  # show only the last p merged clusters
        p=p_val,                # number of merges kept after truncation
        show_leaf_counts=True
    )
    plt.show()
    print("Dendrogram generated.")