From 318131c2d80e4e7923ff691953cbfd2813e82a98 Mon Sep 17 00:00:00 2001 From: maddy Date: Mon, 11 Aug 2025 15:45:06 -0400 Subject: [PATCH 1/5] Langchain implementation without langgraph MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- autoeda/__init__.py | 0 autoeda/agent.py | 275 ++++++++++++++ autoeda/raw_data_analyzer.py | 631 +++++++++++++++++++++++++++++++ autoeda/schema_parser.py | 695 +++++++++++++++++++++++++++++++++++ 4 files changed, 1601 insertions(+) create mode 100644 autoeda/__init__.py create mode 100644 autoeda/agent.py create mode 100644 autoeda/raw_data_analyzer.py create mode 100755 autoeda/schema_parser.py diff --git a/autoeda/__init__.py b/autoeda/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/autoeda/agent.py b/autoeda/agent.py new file mode 100644 index 000000000..e8455cc24 --- /dev/null +++ b/autoeda/agent.py @@ -0,0 +1,275 @@ +""" +LangChain single-node agent template with OpenAI API integration and .env support. + +This refactors the original LangGraph single-node graph template to use LangChain's Runnable workflow and standard components. + +Import as: + +from autoeda_agent_langchain import AutoEDAAgent +""" + +import dataclasses +import os +from typing import Any, Dict, Optional + +import dotenv +import langchain_core.runnables as lcru +from langchain_core.runnables import RunnableLambda, RunnableSequence +from langchain_core.tools import tool +import openai + +import autoeda.raw_data_analyzer as lsardaan +import autoeda.schema_parser as lsagscpa + +# Load environment variables from .env. 
+dotenv.load_dotenv() +OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") +client = openai.OpenAI(api_key=OPENAI_API_KEY) + +# ############################################################################# +# Configuration +# ############################################################################# + +@dataclasses.dataclass +class Configuration: + """ + Configurable parameters for the agent. + """ + openai_model: str = "gpt-3.5-turbo" + analysis_depth: str = "standard" # "basic", "standard", "deep" + include_schema_metadata: bool = True + +# ############################################################################# +# State +# ############################################################################# + +@dataclasses.dataclass +class State: + """ + Input state for the agent. + """ + file_path: str = "" + raw_data_result: Optional[Dict[str, Any]] = None + schema_file_path: str = "" + schema_content: str = "" + schema_result: Optional[Dict[str, Any]] = None + analysis_output: str = "" + +# ############################################################################# +# Steps as LangChain Runnables +# ############################################################################# + +def call_model(state: State, config: Configuration) -> State: + """ + Process input and returns output using OpenAI API. + """ + model = config.openai_model + + # Check for errors and build appropriate context + error_context = "" + if state.raw_data_result and "error" in state.raw_data_result: + error_context = f"️Data analysis failed: {state.raw_data_result['error']}" + elif state.schema_result and "error" in state.schema_result: + error_context = f"Schema parsing failed: {state.schema_result['error']}" + + if error_context: + prompt = f"""You are an AutoEDA assistant. An error occurred during data analysis: + +{error_context} + +Please provide: +1. Potential causes of this error +2. Troubleshooting steps to resolve the issue +3. Alternative approaches for data analysis +4. 
General recommendations for handling similar data files +""" + else: + # Build consolidated dataset summary + total_rows = state.raw_data_result.get('total_rows', 0) if state.raw_data_result else 0 + total_columns = state.raw_data_result.get('total_columns', 0) if state.raw_data_result else 0 + file_path = state.raw_data_result.get('file_path', 'unknown') if state.raw_data_result else 'unknown' + + # Determine dataset characteristics for targeted analysis + dataset_size = "small" if total_rows < 1000 else "medium" if total_rows < 100000 else "large" + dataset_width = "narrow" if total_columns < 10 else "wide" if total_columns < 50 else "very wide" + + # Build schema insights + schema_insights = "" + if state.schema_result and "error" not in state.schema_result: + required_cols = state.schema_result.get('required_columns', 0) + optional_cols = state.schema_result.get('optional_columns', 0) + schema_type = state.schema_result.get('schema_type', 'unknown') + + schema_insights = f""" +Schema Structure ({schema_type}): +• {total_columns} columns total ({required_cols} required, {optional_cols} optional) +• Column breakdown:""" + + # Group columns by data type for better insights + col_types = {} + nullable_count = 0 + for col in state.schema_result.get("columns", []): + data_type = col['data_type'] + col_types[data_type] = col_types.get(data_type, 0) + 1 + if col.get('nullable', False): + nullable_count += 1 + + for dtype, count in col_types.items(): + schema_insights += f"\n - {dtype}: {count} columns" + + if nullable_count > 0: + schema_insights += f"\n• {nullable_count} columns allow null values" + + # Build targeted recommendations based on dataset characteristics + analysis_focus = "" + if dataset_size == "small": + analysis_focus = "completeness analysis and pattern detection" + elif dataset_size == "large": + analysis_focus = "sampling strategies and performance optimization" + else: + analysis_focus = "statistical profiling and distribution analysis" + + if 
dataset_width == "wide" or dataset_width == "very wide": + analysis_focus += ", feature selection and dimensionality reduction" + + prompt = f"""You are an expert AutoEDA assistant. Analyze this {dataset_size}, {dataset_width} dataset: + +Dataset Overview: +• File: {file_path} +• Size: {total_rows:,} rows × {total_columns} columns +• Focus areas: {analysis_focus} +{schema_insights} + +Based on these characteristics, provide targeted recommendations: + +1. **Data Quality Assessment**: Identify specific validation rules, missing value patterns, and potential anomalies to investigate + +2. **Statistical Analysis Plan**: Recommend appropriate statistical methods and exploratory techniques for this dataset size and structure + +3. **Visualization Strategy**: Suggest specific chart types and visualization approaches that work best for this data profile + +4. **Risk Areas**: Highlight potential data quality issues, biases, or limitations to investigate based on the schema and size + +5. **Next Steps**: Provide a prioritized action plan for the exploratory data analysis process + +Keep recommendations practical and specific to the dataset characteristics.""" + + response = client.chat.completions.create( + model=model, + messages=[{"role": "user", "content": prompt}], + temperature=0.3, # Lower temperature for more consistent, analytical responses + max_tokens=1024, # Increased for more detailed recommendations + ) + + output_text = response.choices[0].message.content + + state.analysis_output = output_text + return state + +def analyze_raw_data(state: State, config: Configuration) -> State: + """ + Analyze raw data file and generate schema. 
+ """ + if not state.file_path: + state.raw_data_result = {"error": "No file path provided"} + return state + + analyzer = lsardaan.RawDataAnalyzer() + result = analyzer.analyze_file(state.file_path) + + if result.error_message: + state.raw_data_result = {"error": result.error_message} + return state + + # Convert result to dict for state + raw_data_result = { + "file_path": result.file_path, + "total_rows": result.total_rows, + "total_columns": result.total_columns, + "suggested_schema": result.suggested_schema, + "analysis_metadata": result.analysis_metadata, + } + state.raw_data_result = raw_data_result + + # Generate schema file + schema_file_path = state.file_path + ".schema.json" + try: + analyzer.save_schema(result, schema_file_path) + state.schema_file_path = schema_file_path + + # Load schema content from the generated file + import json + with open(schema_file_path, "r", encoding="utf-8") as f: + schema_content = f.read() + state.schema_content = schema_content + + except Exception as e: + state.raw_data_result["schema_file_error"] = str(e) + + return state + +def parse_schema(state: State, config: Configuration) -> State: + """ + Parse schema content and extract column information. 
+ """ + if not state.schema_content: + state.schema_result = {"error": "No schema content provided"} + return state + + result = lsagscpa.parse_schema_content(state.schema_content) + + if result.error_message: + state.schema_result = {"error": result.error_message} + return state + + # Convert result to dict for state + schema_result = { + "total_columns": result.total_columns, + "required_columns": result.required_columns, + "optional_columns": result.optional_columns, + "schema_type": result.schema_type, + "columns": [ + { + "name": col.name, + "data_type": col.data_type, + "required": col.required, + "description": col.description, + "nullable": col.nullable, + } + for col in result.columns + ], + } + state.schema_result = schema_result + return state + +# ############################################################################# +# LangChain Agent as RunnableSequence +# ############################################################################# + +class AutoEDAAgent: + """ + AutoEDA Agent implemented using LangChain Runnables. 
+ """ + + def __init__(self, config: Optional[Configuration] = None): + self.config = config or Configuration() + + self.workflow = ( + RunnableSequence() + .add(RunnableLambda(lambda state: analyze_raw_data(state, self.config))) + .add(RunnableLambda(lambda state: parse_schema(state, self.config))) + .add(RunnableLambda(lambda state: call_model(state, self.config))) + ) + + def run(self, state: State) -> State: + return self.workflow.invoke(state) + +# Example usage: +# agent = AutoEDAAgent(Configuration( +# openai_model="gpt-4", +# analysis_depth="deep", +# include_schema_metadata=True +# )) +# initial_state = State(file_path="mydata.csv") # schema_content will be auto-generated from the file +# result_state = agent.run(initial_state) +# print(result_state.analysis_output) \ No newline at end of file diff --git a/autoeda/raw_data_analyzer.py b/autoeda/raw_data_analyzer.py new file mode 100644 index 000000000..ccce1cac2 --- /dev/null +++ b/autoeda/raw_data_analyzer.py @@ -0,0 +1,631 @@ +#!/usr/bin/env python + +""" +Raw Data Analyzer for AutoEDA Agent. + +Analyzes raw data files (CSV, JSON, Parquet, Excel) and generates schema.json files +for use with the schema parser module. + +Import as: + +import raw_data_analyzer as rdanal +""" + +import argparse +import dataclasses +import json +import logging +import pathlib +import warnings +from typing import Any, Dict, List, Optional + +import pandas as pd + +import helpers.hio as hio + +# Configure logging. +logging.basicConfig(level=logging.INFO) +_LOG = logging.getLogger(__name__) + + +# ############################################################################# +# ColumnAnalysis +# ############################################################################# + + +@dataclasses.dataclass +class ColumnAnalysis: + """ + Represent analysis results for a single column. 
+ """ + + name: str + data_type: str + nullable: bool + null_count: int + unique_count: int + sample_values: List[Any] + min_value: Optional[Any] = None + max_value: Optional[Any] = None + mean_value: Optional[float] = None + std_value: Optional[float] = None + pattern: Optional[str] = None + format_hint: Optional[str] = None + + +# ############################################################################# +# DataAnalysisResult +# ############################################################################# + + +@dataclasses.dataclass +class DataAnalysisResult: + """ + Represent the complete data analysis result. + """ + + file_path: str + total_rows: int + total_columns: int + columns: List[ColumnAnalysis] + suggested_schema: Dict[str, Any] + analysis_metadata: Dict[str, Any] + error_message: Optional[str] = None + + +# ############################################################################# +# RawDataAnalyzer +# ############################################################################# + + +class RawDataAnalyzer: + """ + Analyze raw data files and generate schema definitions. + + Supported formats: + - CSV files + - JSON files + - Parquet files + - Excel files + """ + + def __init__( + self, + *, + sample_size: int = 10000, + max_unique_values: int = 100, + datetime_formats: Optional[List[str]] = None, + ) -> None: + """ + Initialize the raw data analyzer. 
+ + :param sample_size: maximum number of rows to sample for + analysis + :param max_unique_values: maximum unique values to consider for + enum types + :param datetime_formats: list of datetime formats to try for + detection + """ + self.sample_size = sample_size + self.max_unique_values = max_unique_values + self.datetime_threshold = 0.8 # 80% success rate for datetime detection + self.datetime_formats = datetime_formats or [ + "%Y-%m-%d", + "%Y-%m-%d %H:%M:%S", + "%Y-%m-%dT%H:%M:%S", + "%Y-%m-%dT%H:%M:%SZ", + "%m/%d/%Y", + "%d/%m/%Y", + "%Y%m%d", + ] + self.supported_formats = [".csv", ".json", ".parquet", ".xlsx", ".xls"] + + def analyze_file(self, file_path: str) -> DataAnalysisResult: + """ + Analyze a data file and generate schema information. + + :param file_path: path to the data file to analyze + :return: analysis results with suggested schema + """ + try: + path = pathlib.Path(file_path) + if not path.exists(): + error_msg = f"Data file not found: {file_path}" + return self._create_error_result(file_path, error_msg) + if not self._is_supported_format(path.suffix): + error_msg = f"Unsupported file format: {path.suffix}" + return self._create_error_result(file_path, error_msg) + # Load data based on file format. + df = self._load_data(file_path) + # Sample data if it's too large. + if len(df) > self.sample_size: + _LOG.info( + "Sampling %d rows from %d total rows", + self.sample_size, + len(df), + ) + df = df.sample(n=self.sample_size, random_state=42) + # Analyze each column. + columns_analysis = self._analyze_columns(df) + # Generate suggested schema. + suggested_schema = self._generate_schema(columns_analysis, df) + # Create analysis metadata. 
+ analysis_metadata = self._create_analysis_metadata(file_path, df) + _LOG.info( + "Successfully analyzed %d columns from %s", + len(columns_analysis), + file_path, + ) + return DataAnalysisResult( + file_path=file_path, + total_rows=len(df), + total_columns=len(df.columns), + columns=columns_analysis, + suggested_schema=suggested_schema, + analysis_metadata=analysis_metadata, + ) + except (IOError, ValueError, pd.errors.ParserError) as e: + _LOG.error("Error analyzing file %s: %s", file_path, e) + error_msg = f"Failed to analyze file: {str(e)}" + return self._create_error_result(file_path, error_msg) + + def save_schema( + self, + analysis_result: DataAnalysisResult, + output_path: str, + *, + include_analysis_metadata: bool = True, + ) -> None: + """ + Save the generated schema to a JSON file. + + :param analysis_result: analysis result containing the schema + :param output_path: path where to save the schema.json file + :param include_analysis_metadata: whether to include analysis + metadata + """ + try: + schema_data = analysis_result.suggested_schema.copy() + + if include_analysis_metadata: + schema_data["_analysis_metadata"] = ( + analysis_result.analysis_metadata + ) + + # Ensure output directory exists. + output_dir = pathlib.Path(output_path).parent + hio.create_dir(str(output_dir), incremental=True) + + # Save schema to file. + with open(output_path, "w", encoding="utf-8") as f: + json.dump(schema_data, f, indent=2, default=str) + + _LOG.info("Schema saved to %s", output_path) + + except Exception as e: + _LOG.error("Error saving schema to %s: %s", output_path, e) + raise + + def _load_data(self, file_path: str) -> pd.DataFrame: + """ + Load data from file based on format. + + :param file_path: path to the data file + :return: loaded DataFrame + """ + path = pathlib.Path(file_path) + file_extension = path.suffix.lower() + if file_extension == ".csv": + # Try different encodings and separators. 
+ for encoding in ["utf-8", "latin-1", "cp1252"]: + for sep in [",", ";", "\t"]: + try: + df = pd.read_csv( + file_path, + encoding=encoding, + sep=sep, + nrows=self.sample_size, + ) + # Good indicator of correct separator. + if len(df.columns) > 1: + _LOG.debug( + "Successfully loaded CSV with encoding=%s, sep='%s'", + encoding, + sep, + ) + return df + except (ValueError, TypeError, pd.errors.ParserError): + continue + # Fallback to default pandas behavior. + return pd.read_csv(file_path, nrows=self.sample_size) + if file_extension == ".json": + return pd.read_json(file_path, lines=True, nrows=self.sample_size) + if file_extension == ".parquet": + return pd.read_parquet(file_path) + if file_extension in [".xlsx", ".xls"]: + return pd.read_excel(file_path, nrows=self.sample_size) + raise ValueError(f"Unsupported file format: {file_extension}") + + def _analyze_columns(self, df: pd.DataFrame) -> List[ColumnAnalysis]: + """ + Analyze each column in the DataFrame. + + :param df: DataFrame to analyze + :return: list of column analysis results + """ + columns_analysis = [] + for col_name in df.columns: + col_data = df[col_name] + analysis = self._analyze_single_column(col_name, col_data) + columns_analysis.append(analysis) + return columns_analysis + + def _analyze_single_column( + self, + col_name: str, + col_data: pd.Series, + ) -> ColumnAnalysis: + """ + Analyze a single column. + + :param col_name: name of the column + :param col_data: column data as pandas Series + :return: column analysis result + """ + # Basic statistics. + null_count = col_data.isnull().sum() + unique_count = col_data.nunique() + nullable = null_count > 0 + + # Sample non-null values. + non_null_data = col_data.dropna() + sample_values = ( + non_null_data.head(5).tolist() if len(non_null_data) > 0 else [] + ) + + # Infer data type. + data_type, format_hint = self._infer_data_type(non_null_data) + + # Calculate statistics for numeric columns. 
+ min_value = None + max_value = None + mean_value = None + std_value = None + + if data_type in ["integer", "float"] and len(non_null_data) > 0: + try: + numeric_data = pd.to_numeric(non_null_data, errors="coerce") + min_value = float(numeric_data.min()) + max_value = float(numeric_data.max()) + mean_value = float(numeric_data.mean()) + std_value = float(numeric_data.std()) + except (ValueError, TypeError): + pass + + # Detect patterns for string columns. + pattern = None + if data_type == "string" and len(non_null_data) > 0: + pattern = self._detect_pattern(non_null_data) + + return ColumnAnalysis( + name=col_name, + data_type=data_type, + nullable=nullable, + null_count=int(null_count), + unique_count=int(unique_count), + sample_values=sample_values, + min_value=min_value, + max_value=max_value, + mean_value=mean_value, + std_value=std_value, + pattern=pattern, + format_hint=format_hint, + ) + + def _infer_data_type(self, data: pd.Series) -> tuple[str, Optional[str]]: + """ + Infer the data type of a column. + + :param data: column data (non-null values) + :return: tuple of (data_type, format_hint) + """ + result_type = "string" + format_hint = None + if len(data) == 0: + return result_type, format_hint + # Check for boolean. + if data.dtype == bool or set(data.astype(str).str.lower()) <= { + "true", + "false", + "t", + "f", + "yes", + "no", + "y", + "n", + "1", + "0", + }: + result_type = "boolean" + # Check for integer. + elif data.dtype in ["int64", "int32", "int16", "int8"]: + result_type = "integer" + # Check for float. + elif data.dtype in ["float64", "float32"]: + result_type = "float" + else: + # Try to convert to numeric. + try: + numeric_data = pd.to_numeric(data, errors="coerce") + if not numeric_data.isnull().all(): + # Check if all values are integers. + if ( + numeric_data.fillna(0) + .apply(lambda x: x.is_integer()) + .all() + ): + result_type = "integer" + else: + result_type = "float" + else: + # Check for datetime. 
+ datetime_type, datetime_format = self._check_datetime(data) + if datetime_type: + result_type = datetime_type + format_hint = datetime_format + except (ValueError, TypeError): + # Check for datetime as fallback. + datetime_type, datetime_format = self._check_datetime(data) + if datetime_type: + result_type = datetime_type + format_hint = datetime_format + return result_type, format_hint + + def _check_datetime( + self, data: pd.Series + ) -> tuple[Optional[str], Optional[str]]: + """ + Check if data represents datetime values with improved detection. + + :param data: column data to check + :return: tuple of (datetime_type, format_hint) + """ + # Skip if data looks purely numeric. + try: + pd.to_numeric(data.head(10), errors="raise") + return None, None + except (ValueError, TypeError): + pass + # Sample a subset for testing. + sample_data = data.head(min(100, len(data))) + # Try specific datetime formats first. + for fmt in self.datetime_formats: + try: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + parsed = pd.to_datetime( + sample_data, format=fmt, errors="coerce" + ) + # Check success rate. + success_rate = 1 - parsed.isnull().sum() / len(sample_data) + if success_rate >= self.datetime_threshold: + # Determine if it's date or datetime. + non_null_parsed = parsed.dropna() + if len(non_null_parsed) > 0: + # Check if all times are midnight (date-only). + if all( + t.time() == non_null_parsed.iloc[0].time() + for t in non_null_parsed.head(10) + ): + return "date", fmt + return "datetime", fmt + except (ValueError, TypeError, AttributeError): + continue + # Try pandas automatic parsing as last resort. 
+ try: + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + parsed = pd.to_datetime( + sample_data, errors="coerce", infer_datetime_format=True + ) + success_rate = 1 - parsed.isnull().sum() / len(sample_data) + if success_rate >= self.datetime_threshold: + return "datetime", "auto" + except (ValueError, TypeError, AttributeError): + pass + return None, None + + def _detect_pattern(self, data: pd.Series) -> Optional[str]: + """ + Detect common patterns in string data. + + :param data: string column data + :return: detected pattern or None + """ + str_data = data.astype(str) + # Check for common patterns. + patterns = { + "email": r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", + "phone": r"^[\+]?[1-9]?[0-9]{7,15}$", + "url": r"^https?://[^\s/$.?#].[^\s]*$", + "uuid": r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$", + "ip_address": r"^(?:[0-9]{1,3}\.){3}[0-9]{1,3}$", + } + for pattern_name, pattern_regex in patterns.items(): + if str_data.str.match(pattern_regex, case=False).mean() > 0.8: + return pattern_name + return None + + def _generate_schema( + self, + columns_analysis: List[ColumnAnalysis], + df: pd.DataFrame, + ) -> Dict[str, Any]: + """ + Generate JSON Schema from column analysis. + + :param columns_analysis: list of analyzed columns + :param df: original DataFrame + :return: JSON Schema dictionary + """ + properties = {} + required_fields = [] + + for col in columns_analysis: + col_schema = { + "type": self._map_to_json_schema_type(col.data_type), + "description": f"Column '{col.name}' with {col.unique_count} unique values", + } + + # Add format hint if available. + if col.format_hint and col.format_hint != "auto": + col_schema["format"] = col.format_hint + + # Add constraints for numeric fields. 
+ if col.data_type in ["integer", "float"]: + if col.min_value is not None: + col_schema["minimum"] = col.min_value + if col.max_value is not None: + col_schema["maximum"] = col.max_value + + # Add enum for low-cardinality categorical fields. + if ( + col.data_type == "string" + and col.unique_count <= self.max_unique_values + and col.unique_count > 1 + ): + unique_values = df[col.name].dropna().unique().tolist() + col_schema["enum"] = unique_values[:50] # Limit enum size. + + # Add pattern if detected. + if col.pattern: + col_schema["pattern"] = col.pattern + + properties[col.name] = col_schema + + # Mark as required if no null values. + if not col.nullable: + required_fields.append(col.name) + + schema = { + "type": "object", + "title": "Generated Schema", + "description": f"Auto-generated schema for data with {len(columns_analysis)} columns", + "properties": properties, + } + + if required_fields: + schema["required"] = required_fields + + return schema + + def _map_to_json_schema_type(self, data_type: str) -> str: + """ + Map internal data types to JSON Schema types. + + :param data_type: internal data type + :return: JSON Schema type + """ + type_mapping = { + "integer": "integer", + "float": "number", + "boolean": "boolean", + "string": "string", + "datetime": "string", + "date": "string", + "object": "object", + "array": "array", + } + return type_mapping.get(data_type, "string") + + def _create_analysis_metadata( + self, + file_path: str, + df: pd.DataFrame, + ) -> Dict[str, Any]: + """ + Create metadata about the analysis process. 
+ + :param file_path: path to analyzed file + :param df: analyzed DataFrame + :return: analysis metadata + """ + return { + "analysis_timestamp": pd.Timestamp.now().isoformat(), + "analyzer_version": "1.0.0", + "source_file": file_path, + "sample_size": min(len(df), self.sample_size), + "total_rows_analyzed": len(df), + "total_columns_analyzed": len(df.columns), + "file_size_bytes": pathlib.Path(file_path).stat().st_size, + "analysis_settings": { + "max_unique_values": self.max_unique_values, + "datetime_formats": self.datetime_formats, + }, + } + + def _is_supported_format(self, file_extension: str) -> bool: + """ + Check if file format is supported. + + :param file_extension: file extension to check + :return: whether the format is supported + """ + return file_extension.lower() in self.supported_formats + + def _create_error_result( + self, + file_path: str, + error_message: str, + ) -> DataAnalysisResult: + """ + Create a DataAnalysisResult with error information. + + :param file_path: path to the file that caused the error + :param error_message: error message to include + :return: DataAnalysisResult object with error details + """ + return DataAnalysisResult( + file_path=file_path, + total_rows=0, + total_columns=0, + columns=[], + suggested_schema={}, + analysis_metadata={}, + error_message=error_message, + ) + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Analyze raw data files and generate schema information." 
+ ) + parser.add_argument("file", type=str, help="Path to the data file to analyze") + args = parser.parse_args() + analyzer = RawDataAnalyzer() + result = analyzer.analyze_file(args.file) + if result.error_message: + _LOG.error("Error: %s", result.error_message) + else: + _LOG.info("File: %s", result.file_path) + _LOG.info("Total rows: %d", result.total_rows) + _LOG.info("Total columns: %d", result.total_columns) + _LOG.info("Columns:") + for col in result.columns: + _LOG.info( + " - %s: %s, nullable=%s", col.name, col.data_type, col.nullable + ) + _LOG.info("Suggested schema: %s", result.suggested_schema) + _LOG.info("Metadata: %s", result.analysis_metadata) + # Save suggested schema to JSON file. + schema_path = args.file + ".schema.json" + try: + with open(schema_path, "w", encoding="utf-8") as f: + json.dump(result.suggested_schema, f, indent=2) + _LOG.info("Saved schema to %s", schema_path) + except (IOError, TypeError, ValueError) as e: + _LOG.error("Failed to save schema to %s: %s", schema_path, e) + + +if __name__ == "__main__": + main() diff --git a/autoeda/schema_parser.py b/autoeda/schema_parser.py new file mode 100755 index 000000000..109885926 --- /dev/null +++ b/autoeda/schema_parser.py @@ -0,0 +1,695 @@ +#!/usr/bin/env python + +""" +Schema Parser Module for AutoEDA Agent. + +A standalone module to parse JSON/YAML schema files and extract column information +for automated exploratory data analysis. + +Import as: + +import schema_parser as schpar +""" + +import argparse +import dataclasses +import json +import logging +import pathlib +from typing import Any, Dict, List, Optional + +import yaml # type: ignore[import-untyped] + +import helpers.hdbg as hdbg +import helpers.hio as hio + +# Configure logging. 
+logging.basicConfig(level=logging.INFO) +_LOG = logging.getLogger(__name__) + + +# ############################################################################# +# ColumnInfo +# ############################################################################# + + +@dataclasses.dataclass +class ColumnInfo: + """ + Represent information about a single column. + """ + + name: str + data_type: str + required: bool = False + description: str = "" + nullable: bool = True + default: Any = None + constraints: Optional[Dict[str, Any]] = None + metadata: Optional[Dict[str, Any]] = None + + def __post_init__(self) -> None: + """ + Initialize default values for optional fields. + """ + if self.constraints is None: + self.constraints = {} + if self.metadata is None: + self.metadata = {} + + +# ############################################################################# +# ParsedSchema +# ############################################################################# + + +@dataclasses.dataclass +class ParsedSchema: + """ + Represent the complete parsed schema result. + """ + + columns: List[ColumnInfo] + raw_schema: Dict[str, Any] + schema_type: str + total_columns: int + required_columns: int + optional_columns: int + error_message: Optional[str] = None + + def __post_init__(self) -> None: + """ + Calculate derived fields from columns. + """ + self.total_columns = len(self.columns) + self.required_columns = sum(1 for col in self.columns if col.required) + self.optional_columns = self.total_columns - self.required_columns + + +# ############################################################################# +# SchemaParser +# ############################################################################# + + +class SchemaParser: + """ + Parse various schema formats and extract column information. 
+ + Supported formats: + - JSON Schema + - Custom column definitions + - YAML schema files + - Generic key-value structures + """ + + def __init__( + self, + *, + include_metadata: bool = True, + output_format: str = "detailed", + ) -> None: + """ + Initialize the schema parser. + + :param include_metadata: whether to include metadata in parsed + results + :param output_format: output format, either "detailed" or + "simple" + """ + hdbg.dassert_in(output_format, ["detailed", "simple"]) + self.include_metadata = include_metadata + self.output_format = output_format + self.supported_formats = ["json", "yaml", "yml"] + + def parse_file(self, file_path: str) -> ParsedSchema: + """ + Parse schema from a file. + + :param file_path: path to the schema file + :return: parsed schema object with results + """ + try: + path = pathlib.Path(file_path) + if not path.exists(): + error_msg = f"Schema file not found: {file_path}" + return self._create_error_result(error_msg) + if not self._is_supported_format(path.suffix): + error_msg = f"Unsupported file format: {path.suffix}" + return self._create_error_result(error_msg) + # Read file content. + content = hio.from_file(str(path)) + return self.parse_content(content, source=str(path)) + except (FileNotFoundError, IOError, OSError, ValueError, TypeError) as e: + _LOG.error("Error parsing file %s: %s", file_path, e) + error_msg = f"Failed to parse file: {str(e)}" + return self._create_error_result(error_msg) + + def parse_content( + self, + content: str, + *, + source: str = "content", + ) -> ParsedSchema: + """ + Parse schema from string content. 
+ + :param content: schema content as string + :param source: source identifier for logging + :return: parsed schema object with results + """ + try: + schema_data = self._parse_schema_content(content) + schema_type = self._detect_schema_type(schema_data) + columns = self._extract_columns(schema_data, schema_type) + + _LOG.info( + "Successfully parsed %d columns from %s", + len(columns), + source, + ) + + return ParsedSchema( + columns=columns, + raw_schema=schema_data, + schema_type=schema_type, + total_columns=len(columns), + required_columns=0, # Will be calculated in __post_init__. + optional_columns=0, # Will be calculated in __post_init__. + ) + + except ( + json.JSONDecodeError, + yaml.YAMLError, + ValueError, + KeyError, + TypeError, + ) as e: + _LOG.error("Error parsing content from %s: %s", source, e) + error_msg = f"Failed to parse content: {str(e)}" + return self._create_error_result(error_msg) + + def to_dict(self, parsed_schema: ParsedSchema) -> Dict[str, Any]: + """ + Convert ParsedSchema to dictionary format. + + :param parsed_schema: parsed schema object to convert + :return: dictionary representation of the schema + """ + return { + "columns": [ + { + "name": col.name, + "data_type": col.data_type, + "required": col.required, + "description": col.description, + "nullable": col.nullable, + "default": col.default, + "constraints": col.constraints, + "metadata": col.metadata if self.include_metadata else {}, + } + for col in parsed_schema.columns + ], + "schema_info": { + "schema_type": parsed_schema.schema_type, + "total_columns": parsed_schema.total_columns, + "required_columns": parsed_schema.required_columns, + "optional_columns": parsed_schema.optional_columns, + }, + "raw_schema": parsed_schema.raw_schema, + "error_message": parsed_schema.error_message, + } + + def to_dataframe_schema(self, parsed_schema: ParsedSchema) -> Dict[str, str]: + """ + Convert to simple column_name: data_type mapping for pandas. 
def _parse_schema_content(self, content: str) -> Dict[str, Any]:
    """
    Decode content as JSON first, falling back to YAML.

    :param content: raw content string to parse
    :return: parsed data as a dictionary
    :raises ValueError: if the text is not a JSON/YAML mapping
    """
    text = content.strip()
    # JSON is attempted first; only a decode failure falls through to YAML.
    try:
        decoded = json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        if not isinstance(decoded, dict):
            raise ValueError("Schema must be a JSON object/dictionary")
        return decoded
    try:
        decoded = yaml.safe_load(text)
    except yaml.YAMLError as exc:
        raise ValueError(f"Invalid JSON/YAML format: {exc}") from exc
    if not isinstance(decoded, dict):
        raise ValueError("Schema must be a YAML object/dictionary")
    return decoded

def _detect_schema_type(self, schema_data: Dict[str, Any]) -> str:
    """
    Detect which schema layout the parsed data uses.

    :param schema_data: parsed schema data
    :return: detected schema type identifier
    """
    keys = schema_data.keys()
    if "properties" in keys and "type" in keys:
        return "json_schema"
    if "columns" in keys:
        return "custom_columns"
    if "fields" in keys:
        return "fields_format"
    # A mapping whose dict values declare "type"/"dataType" looks like
    # a generic property bag.
    has_typed_values = any(
        isinstance(value, dict) and ("type" in value or "dataType" in value)
        for value in schema_data.values()
    )
    if has_typed_values:
        return "generic_properties"
    return "unknown"

def _extract_columns(
    self,
    schema_data: Dict[str, Any],
    schema_type: str,
) -> "List[ColumnInfo]":
    """
    Dispatch column extraction based on the detected schema type.

    :param schema_data: parsed schema data
    :param schema_type: detected schema type
    :return: list of column information objects
    """
    if schema_type == "json_schema":
        return self._extract_from_json_schema(schema_data)
    if schema_type == "custom_columns":
        return self._extract_from_custom_format(schema_data)
    if schema_type == "fields_format":
        return self._extract_from_fields_format(schema_data)
    if schema_type == "generic_properties":
        return self._extract_from_generic_format(schema_data)
    # "unknown" and any unexpected type use the best-effort extractor.
    return self._extract_from_unknown_format(schema_data)
def _extract_from_json_schema(
    self,
    schema_data: Dict[str, Any],
) -> "List[ColumnInfo]":
    """
    Extract columns from a JSON Schema document.

    :param schema_data: JSON schema data
    :return: column descriptors for every entry under "properties"
    """
    constraint_keys = (
        "minimum",
        "maximum",
        "minLength",
        "maxLength",
        "pattern",
        "enum",
    )
    required_fields = set(schema_data.get("required", []))
    extracted = []
    for name, spec in schema_data.get("properties", {}).items():
        # Keep only the constraint keys that are actually present.
        constraints = {k: spec[k] for k in constraint_keys if k in spec}
        metadata = {}
        if self.include_metadata:
            # Everything that is not the type/description or a constraint
            # is preserved as metadata.
            skipped = {"type", "description", *constraint_keys}
            metadata = {k: v for k, v in spec.items() if k not in skipped}
        is_required = name in required_fields
        extracted.append(
            ColumnInfo(
                name=name,
                data_type=self._map_json_schema_type(
                    spec.get("type", "string")
                ),
                required=is_required,
                description=spec.get("description", ""),
                nullable=not is_required,
                default=spec.get("default"),
                constraints=constraints,
                metadata=metadata,
            )
        )
    return extracted
def _extract_from_custom_format(
    self,
    schema_data: Dict[str, Any],
) -> "List[ColumnInfo]":
    """
    Extract columns from the custom {"columns": [...]} format.

    :param schema_data: custom format schema data
    :return: list of column information objects
    """
    reserved = (
        "name",
        "type",
        "required",
        "description",
        "nullable",
        "default",
        "constraints",
    )
    result = []
    for entry in schema_data.get("columns", []):
        # Non-dict entries are silently skipped.
        if not isinstance(entry, dict):
            continue
        metadata = {}
        if self.include_metadata:
            # Any non-reserved key is kept as metadata.
            metadata = {k: v for k, v in entry.items() if k not in reserved}
        result.append(
            ColumnInfo(
                name=entry.get("name", ""),
                data_type=self._normalize_data_type(
                    entry.get("type", "string")
                ),
                required=entry.get("required", False),
                description=entry.get("description", ""),
                nullable=entry.get("nullable", True),
                default=entry.get("default"),
                constraints=entry.get("constraints", {}),
                metadata=metadata,
            )
        )
    return result

def _extract_from_fields_format(
    self,
    schema_data: Dict[str, Any],
) -> "List[ColumnInfo]":
    """
    Extract columns from the {"fields": [...]} format.

    :param schema_data: fields format schema data
    :return: list of column information objects
    """
    reserved = ("name", "type", "optional", "description", "constraints")
    result = []
    for entry in schema_data.get("fields", []):
        # Non-dict entries are silently skipped.
        if not isinstance(entry, dict):
            continue
        metadata = {}
        if self.include_metadata:
            metadata = {k: v for k, v in entry.items() if k not in reserved}
        # "optional" drives both required and nullable.
        is_optional = entry.get("optional", True)
        result.append(
            ColumnInfo(
                name=entry.get("name", ""),
                data_type=self._normalize_data_type(
                    entry.get("type", "string")
                ),
                required=not is_optional,
                description=entry.get("description", ""),
                nullable=is_optional,
                constraints=entry.get("constraints", {}),
                metadata=metadata,
            )
        )
    return result
def _extract_from_generic_format(
    self,
    schema_data: Dict[str, Any],
) -> "List[ColumnInfo]":
    """
    Extract columns from a generic property-bag format.

    :param schema_data: generic format schema data
    :return: list of column information objects
    """
    reserved = ("type", "dataType", "required", "description", "nullable")
    result = []
    for name, spec in schema_data.items():
        # Only dict values that declare a type are treated as columns.
        if not isinstance(spec, dict):
            continue
        if "type" not in spec and "dataType" not in spec:
            continue
        metadata = {}
        if self.include_metadata:
            metadata = {k: v for k, v in spec.items() if k not in reserved}
        result.append(
            ColumnInfo(
                name=name,
                data_type=self._normalize_data_type(
                    spec.get("type", spec.get("dataType", "string"))
                ),
                required=spec.get("required", False),
                description=spec.get("description", ""),
                nullable=spec.get("nullable", True),
                metadata=metadata,
            )
        )
    return result
def _extract_from_unknown_format(
    self,
    schema_data: Dict[str, Any],
) -> "List[ColumnInfo]":
    """
    Best-effort extraction when the schema layout is not recognized.

    :param schema_data: unknown format schema data
    :return: list of column information objects
    """
    # String values are assumed to be bare data-type declarations.
    return [
        ColumnInfo(
            name=key,
            data_type=self._normalize_data_type(value),
            description="Inferred from key-value pair",
        )
        for key, value in schema_data.items()
        if isinstance(value, str)
    ]

def _map_json_schema_type(self, json_type: str) -> str:
    """
    Map a JSON Schema type name to a canonical data type.

    :param json_type: JSON schema type string
    :return: normalized data type
    """
    canonical = {
        "string": "string",
        "integer": "integer",
        "number": "float",
        "boolean": "boolean",
        "array": "array",
        "object": "object",
        "null": "null",
    }
    # Unrecognized types degrade to "string".
    return canonical.get(json_type.lower(), "string")
def _normalize_data_type(self, data_type: str) -> str:
    """
    Normalize assorted data type spellings to canonical names.

    :param data_type: raw data type string
    :return: normalized data type
    """
    if not isinstance(data_type, str):
        # Non-string declarations degrade to "string".
        return "string"
    cleaned = data_type.lower().strip()
    aliases = {
        # String types.
        "str": "string",
        "text": "string",
        "varchar": "string",
        "char": "string",
        "nvarchar": "string",
        # Integer types.
        "int": "integer",
        "int32": "integer",
        "int64": "integer",
        "bigint": "integer",
        "smallint": "integer",
        # Float types.
        "float": "float",
        "float32": "float",
        "float64": "float",
        "double": "float",
        "decimal": "float",
        "numeric": "float",
        "real": "float",
        # Boolean types.
        "bool": "boolean",
        "bit": "boolean",
        # Date/Time types.
        "datetime": "datetime",
        "datetime64": "datetime",
        "timestamp": "datetime",
        "date": "date",
        "time": "time",
        # Other types.
        "json": "object",
        "jsonb": "object",
        "uuid": "string",
        "blob": "binary",
        "binary": "binary",
    }
    # Unmapped names pass through unchanged (lower-cased, stripped).
    return aliases.get(cleaned, cleaned)

def _is_supported_format(self, file_extension: str) -> bool:
    """
    Check whether a file extension is supported.

    :param file_extension: file extension to check (with or without dot)
    :return: whether the format is supported
    """
    normalized = file_extension.lower().lstrip(".")
    return normalized in self.supported_formats

def _create_error_result(self, error_message: str) -> "ParsedSchema":
    """
    Build an empty ParsedSchema carrying an error message.

    :param error_message: error message to include
    :return: ParsedSchema object with error details
    """
    return ParsedSchema(
        columns=[],
        raw_schema={},
        schema_type="error",
        total_columns=0,
        required_columns=0,
        optional_columns=0,
        error_message=error_message,
    )

# #############################################################################
# Convenience functions.
# #############################################################################


def parse_schema_file(
    file_path: str,
    *,
    include_metadata: bool = True,
) -> "ParsedSchema":
    """
    Parse a schema file with a default-configured parser.

    :param file_path: path to schema file
    :param include_metadata: whether to include metadata
    :return: parsed schema object
    """
    return SchemaParser(include_metadata=include_metadata).parse_file(file_path)


def parse_schema_content(
    content: str,
    *,
    include_metadata: bool = True,
) -> "ParsedSchema":
    """
    Parse schema text with a default-configured parser.

    :param content: schema content as string
    :param include_metadata: whether to include metadata
    :return: parsed schema object
    """
    return SchemaParser(include_metadata=include_metadata).parse_content(content)


def main() -> None:
    """
    CLI entry point: parse a schema file and log the results.
    """
    arg_parser = argparse.ArgumentParser(
        description="Parse a schema file and log the results."
    )
    arg_parser.add_argument(
        "schema_file", type=str, help="Path to the schema file (JSON)"
    )
    args = arg_parser.parse_args()
    # Read schema content from file.
    try:
        with open(args.schema_file, "r", encoding="utf-8") as handle:
            schema_text = handle.read()
    except (FileNotFoundError, IOError, OSError) as exc:
        _LOG.error("Failed to read schema file %s: %s", args.schema_file, exc)
        return
    parsed = parse_schema_content(schema_text)
    # Guard clause: report parse failure and stop.
    if parsed.error_message:
        _LOG.error("Error: %s", parsed.error_message)
        return
    _LOG.info("Successfully parsed %d columns", parsed.total_columns)
    _LOG.info("Required columns: %d", parsed.required_columns)
    _LOG.info("Optional columns: %d", parsed.optional_columns)
    _LOG.info("Schema type: %s", parsed.schema_type)
    _LOG.info("Columns:")
    for col in parsed.columns:
        _LOG.info(
            " - %s: %s (required: %s)", col.name, col.data_type, col.required
        )


if __name__ == "__main__":
    main()
maddy Date: Mon, 11 Aug 2025 15:56:04 -0400 Subject: [PATCH 2/5] Ran linter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- autoeda/agent.py | 218 ++++++++++++++++++++++++++++++----------------- 1 file changed, 141 insertions(+), 77 deletions(-) diff --git a/autoeda/agent.py b/autoeda/agent.py index e8455cc24..0fdd337c0 100644 --- a/autoeda/agent.py +++ b/autoeda/agent.py @@ -1,11 +1,13 @@ """ -LangChain single-node agent template with OpenAI API integration and .env support. +LangChain single-node agent template with OpenAI API integration and .env +support. -This refactors the original LangGraph single-node graph template to use LangChain's Runnable workflow and standard components. +This refactors the original LangGraph single-node graph template to use +LangChain's Runnable workflow and standard components. Import as: -from autoeda_agent_langchain import AutoEDAAgent +import autoeda.agent as auagent """ import dataclasses @@ -13,41 +15,45 @@ from typing import Any, Dict, Optional import dotenv -import langchain_core.runnables as lcru -from langchain_core.runnables import RunnableLambda, RunnableSequence -from langchain_core.tools import tool -import openai +import langchain_core.runnables as lcrun # type: ignore +import openai # type: ignore -import autoeda.raw_data_analyzer as lsardaan -import autoeda.schema_parser as lsagscpa +import autoeda.raw_data_analyzer as aradaana +import autoeda.schema_parser as aschpars # Load environment variables from .env. dotenv.load_dotenv() OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") client = openai.OpenAI(api_key=OPENAI_API_KEY) + # ############################################################################# # Configuration # ############################################################################# + @dataclasses.dataclass class Configuration: """ Configurable parameters for the agent. 
""" + openai_model: str = "gpt-3.5-turbo" analysis_depth: str = "standard" # "basic", "standard", "deep" include_schema_metadata: bool = True + # ############################################################################# # State # ############################################################################# + @dataclasses.dataclass class State: """ Input state for the agent. """ + file_path: str = "" raw_data_result: Optional[Dict[str, Any]] = None schema_file_path: str = "" @@ -55,26 +61,57 @@ class State: schema_result: Optional[Dict[str, Any]] = None analysis_output: str = "" + # ############################################################################# # Steps as LangChain Runnables # ############################################################################# + def call_model(state: State, config: Configuration) -> State: """ Process input and returns output using OpenAI API. """ model = config.openai_model - # Check for errors and build appropriate context error_context = "" if state.raw_data_result and "error" in state.raw_data_result: error_context = f"️Data analysis failed: {state.raw_data_result['error']}" elif state.schema_result and "error" in state.schema_result: error_context = f"Schema parsing failed: {state.schema_result['error']}" - if error_context: prompt = f"""You are an AutoEDA assistant. 
An error occurred during data analysis: + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + {error_context} Please provide: @@ -85,40 +122,62 @@ def call_model(state: State, config: Configuration) -> State: """ else: # Build consolidated dataset summary - total_rows = state.raw_data_result.get('total_rows', 0) if state.raw_data_result else 0 - total_columns = state.raw_data_result.get('total_columns', 0) if state.raw_data_result else 0 - file_path = state.raw_data_result.get('file_path', 'unknown') if state.raw_data_result else 'unknown' - + total_rows = ( + state.raw_data_result.get("total_rows", 0) + if state.raw_data_result + else 0 + ) + total_columns = ( + state.raw_data_result.get("total_columns", 0) + if state.raw_data_result + else 0 + ) + file_path = ( + state.raw_data_result.get("file_path", "unknown") + if state.raw_data_result + else "unknown" + ) + # Determine dataset characteristics for targeted analysis - dataset_size = "small" if total_rows < 1000 else "medium" if total_rows < 100000 else "large" - dataset_width = "narrow" if total_columns < 10 else "wide" if total_columns < 50 else "very wide" - + dataset_size = ( + "small" + if total_rows < 1000 + else "medium" if total_rows < 100000 else "large" + ) + dataset_width = ( + "narrow" + if total_columns < 10 + else "wide" if total_columns < 50 else "very wide" + ) + # Build schema insights schema_insights = "" if state.schema_result and "error" not in state.schema_result: - required_cols = state.schema_result.get('required_columns', 0) - optional_cols = state.schema_result.get('optional_columns', 0) - schema_type = state.schema_result.get('schema_type', 'unknown') - + required_cols = state.schema_result.get("required_columns", 0) + optional_cols = state.schema_result.get("optional_columns", 0) + schema_type = state.schema_result.get("schema_type", "unknown") + schema_insights = f""" Schema Structure ({schema_type}): • {total_columns} columns total ({required_cols} required, {optional_cols} 
optional) • Column breakdown:""" - + # Group columns by data type for better insights - col_types = {} + col_types: Dict[str, int] = {} nullable_count = 0 for col in state.schema_result.get("columns", []): - data_type = col['data_type'] + data_type = col["data_type"] col_types[data_type] = col_types.get(data_type, 0) + 1 - if col.get('nullable', False): + if col.get("nullable", False): nullable_count += 1 - + for dtype, count in col_types.items(): schema_insights += f"\n - {dtype}: {count} columns" - + if nullable_count > 0: - schema_insights += f"\n• {nullable_count} columns allow null values" + schema_insights += ( + f"\n• {nullable_count} columns allow null values" + ) # Build targeted recommendations based on dataset characteristics analysis_focus = "" @@ -128,31 +187,31 @@ def call_model(state: State, config: Configuration) -> State: analysis_focus = "sampling strategies and performance optimization" else: analysis_focus = "statistical profiling and distribution analysis" - - if dataset_width == "wide" or dataset_width == "very wide": - analysis_focus += ", feature selection and dimensionality reduction" - - prompt = f"""You are an expert AutoEDA assistant. Analyze this {dataset_size}, {dataset_width} dataset: - -Dataset Overview: -• File: {file_path} -• Size: {total_rows:,} rows × {total_columns} columns -• Focus areas: {analysis_focus} -{schema_insights} - -Based on these characteristics, provide targeted recommendations: - -1. **Data Quality Assessment**: Identify specific validation rules, missing value patterns, and potential anomalies to investigate -2. **Statistical Analysis Plan**: Recommend appropriate statistical methods and exploratory techniques for this dataset size and structure - -3. **Visualization Strategy**: Suggest specific chart types and visualization approaches that work best for this data profile - -4. **Risk Areas**: Highlight potential data quality issues, biases, or limitations to investigate based on the schema and size - -5. 
**Next Steps**: Provide a prioritized action plan for the exploratory data analysis process + if dataset_width in {"wide", "very wide"}: + analysis_focus += ", feature selection and dimensionality reduction" -Keep recommendations practical and specific to the dataset characteristics.""" + prompt = ( + f"You are an expert AutoEDA assistant. " + f"Analyze this {dataset_size}, {dataset_width} dataset:\n\n" + f"Dataset Overview:\n" + f"• File: {file_path}\n" + f"• Size: {total_rows:,} rows × {total_columns} columns\n" + f"• Focus areas: {analysis_focus}\n" + f"{schema_insights}\n\n" + f"Based on these characteristics, provide targeted recommendations:\n\n" + f"1. **Data Quality Assessment**: Identify specific validation rules, " + f"missing value patterns, and potential anomalies to investigate\n\n" + f"2. **Statistical Analysis Plan**: Recommend appropriate statistical " + f"methods and exploratory techniques for this dataset size and structure\n\n" + f"3. **Visualization Strategy**: Suggest specific chart types and " + f"visualization approaches that work best for this data profile\n\n" + f"4. **Risk Areas**: Highlight potential data quality issues, biases, " + f"or limitations to investigate based on the schema and size\n\n" + f"5. **Next Steps**: Provide a prioritized action plan for the " + f"exploratory data analysis process\n\n" + f"Keep recommendations practical and specific to the dataset characteristics." + ) response = client.chat.completions.create( model=model, @@ -166,21 +225,19 @@ def call_model(state: State, config: Configuration) -> State: state.analysis_output = output_text return state -def analyze_raw_data(state: State, config: Configuration) -> State: + +def analyze_raw_data(state: State, _config: Configuration) -> State: """ Analyze raw data file and generate schema. 
""" if not state.file_path: state.raw_data_result = {"error": "No file path provided"} return state - - analyzer = lsardaan.RawDataAnalyzer() + analyzer = aradaana.RawDataAnalyzer() result = analyzer.analyze_file(state.file_path) - if result.error_message: state.raw_data_result = {"error": result.error_message} return state - # Convert result to dict for state raw_data_result = { "file_path": result.file_path, @@ -190,38 +247,31 @@ def analyze_raw_data(state: State, config: Configuration) -> State: "analysis_metadata": result.analysis_metadata, } state.raw_data_result = raw_data_result - # Generate schema file schema_file_path = state.file_path + ".schema.json" try: analyzer.save_schema(result, schema_file_path) state.schema_file_path = schema_file_path - # Load schema content from the generated file - import json with open(schema_file_path, "r", encoding="utf-8") as f: schema_content = f.read() state.schema_content = schema_content - - except Exception as e: + except (OSError, IOError) as e: state.raw_data_result["schema_file_error"] = str(e) - return state -def parse_schema(state: State, config: Configuration) -> State: + +def parse_schema(state: State, _config: Configuration) -> State: """ Parse schema content and extract column information. 
""" if not state.schema_content: state.schema_result = {"error": "No schema content provided"} return state - - result = lsagscpa.parse_schema_content(state.schema_content) - + result = aschpars.parse_schema_content(state.schema_content) if result.error_message: state.schema_result = {"error": result.error_message} return state - # Convert result to dict for state schema_result = { "total_columns": result.total_columns, @@ -242,10 +292,12 @@ def parse_schema(state: State, config: Configuration) -> State: state.schema_result = schema_result return state + # ############################################################################# -# LangChain Agent as RunnableSequence +# AutoEDAAgent # ############################################################################# + class AutoEDAAgent: """ AutoEDA Agent implemented using LangChain Runnables. @@ -253,16 +305,27 @@ class AutoEDAAgent: def __init__(self, config: Optional[Configuration] = None): self.config = config or Configuration() - self.workflow = ( - RunnableSequence() - .add(RunnableLambda(lambda state: analyze_raw_data(state, self.config))) - .add(RunnableLambda(lambda state: parse_schema(state, self.config))) - .add(RunnableLambda(lambda state: call_model(state, self.config))) + lcrun.RunnableSequence() + .add( + lcrun.RunnableLambda( + lambda state: analyze_raw_data(state, self.config) + ) + ) + .add( + lcrun.RunnableLambda( + lambda state: parse_schema(state, self.config) + ) + ) + .add( + lcrun.RunnableLambda(lambda state: call_model(state, self.config)) + ) ) def run(self, state: State) -> State: - return self.workflow.invoke(state) + result = self.workflow.invoke(state) + return result # type: ignore + # Example usage: # agent = AutoEDAAgent(Configuration( @@ -270,6 +333,7 @@ def run(self, state: State) -> State: # analysis_depth="deep", # include_schema_metadata=True # )) -# initial_state = State(file_path="mydata.csv") # schema_content will be auto-generated from the file +# initial_state = 
State(file_path="mydata.csv") +# # schema_content will be auto-generated from the file # result_state = agent.run(initial_state) -# print(result_state.analysis_output) \ No newline at end of file +# print(result_state.analysis_output) From b4989fec2ec60750b1c2baf196a5f0b4b92bde50 Mon Sep 17 00:00:00 2001 From: maddy Date: Mon, 11 Aug 2025 16:03:53 -0400 Subject: [PATCH 3/5] Fixed empty lines MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- autoeda/agent.py | 57 ++++++++++-------------------------------------- 1 file changed, 11 insertions(+), 46 deletions(-) diff --git a/autoeda/agent.py b/autoeda/agent.py index 0fdd337c0..f1f54f369 100644 --- a/autoeda/agent.py +++ b/autoeda/agent.py @@ -5,6 +5,17 @@ This refactors the original LangGraph single-node graph template to use LangChain's Runnable workflow and standard components. +Example usage: +agent = AutoEDAAgent(Configuration( + openai_model="gpt-4", + analysis_depth="deep", + include_schema_metadata=True +)) +initial_state = State(file_path="mydata.csv") +# schema_content will be auto-generated from the file +result_state = agent.run(initial_state) +print(result_state.analysis_output) + Import as: import autoeda.agent as auagent @@ -80,40 +91,7 @@ def call_model(state: State, config: Configuration) -> State: error_context = f"Schema parsing failed: {state.schema_result['error']}" if error_context: prompt = f"""You are an AutoEDA assistant. An error occurred during data analysis: - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - {error_context} - Please provide: 1. Potential causes of this error 2. 
Troubleshooting steps to resolve the issue @@ -156,7 +134,6 @@ def call_model(state: State, config: Configuration) -> State: required_cols = state.schema_result.get("required_columns", 0) optional_cols = state.schema_result.get("optional_columns", 0) schema_type = state.schema_result.get("schema_type", "unknown") - schema_insights = f""" Schema Structure ({schema_type}): • {total_columns} columns total ({required_cols} required, {optional_cols} optional) @@ -325,15 +302,3 @@ def __init__(self, config: Optional[Configuration] = None): def run(self, state: State) -> State: result = self.workflow.invoke(state) return result # type: ignore - - -# Example usage: -# agent = AutoEDAAgent(Configuration( -# openai_model="gpt-4", -# analysis_depth="deep", -# include_schema_metadata=True -# )) -# initial_state = State(file_path="mydata.csv") -# # schema_content will be auto-generated from the file -# result_state = agent.run(initial_state) -# print(result_state.analysis_output) From 6adb0f87ba2b46c13323a17dc28fbdfaefae2d9c Mon Sep 17 00:00:00 2001 From: maddy Date: Mon, 11 Aug 2025 16:07:21 -0400 Subject: [PATCH 4/5] Fix comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- autoeda/agent.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/autoeda/agent.py b/autoeda/agent.py index f1f54f369..72188d562 100644 --- a/autoeda/agent.py +++ b/autoeda/agent.py @@ -99,7 +99,7 @@ def call_model(state: State, config: Configuration) -> State: 4. General recommendations for handling similar data files """ else: - # Build consolidated dataset summary + # Build consolidated dataset summary. 
total_rows = ( state.raw_data_result.get("total_rows", 0) if state.raw_data_result @@ -116,7 +116,7 @@ def call_model(state: State, config: Configuration) -> State: else "unknown" ) - # Determine dataset characteristics for targeted analysis + # Determine dataset characteristics for targeted analysis. dataset_size = ( "small" if total_rows < 1000 @@ -128,7 +128,7 @@ def call_model(state: State, config: Configuration) -> State: else "wide" if total_columns < 50 else "very wide" ) - # Build schema insights + # Build schema insights. schema_insights = "" if state.schema_result and "error" not in state.schema_result: required_cols = state.schema_result.get("required_columns", 0) @@ -139,7 +139,7 @@ def call_model(state: State, config: Configuration) -> State: • {total_columns} columns total ({required_cols} required, {optional_cols} optional) • Column breakdown:""" - # Group columns by data type for better insights + # Group columns by data type for better insights. col_types: Dict[str, int] = {} nullable_count = 0 for col in state.schema_result.get("columns", []): @@ -156,7 +156,7 @@ def call_model(state: State, config: Configuration) -> State: f"\n• {nullable_count} columns allow null values" ) - # Build targeted recommendations based on dataset characteristics + # Build targeted recommendations based on dataset characteristics. analysis_focus = "" if dataset_size == "small": analysis_focus = "completeness analysis and pattern detection" @@ -215,7 +215,7 @@ def analyze_raw_data(state: State, _config: Configuration) -> State: if result.error_message: state.raw_data_result = {"error": result.error_message} return state - # Convert result to dict for state + # Convert result to dict for state. 
raw_data_result = { "file_path": result.file_path, "total_rows": result.total_rows, @@ -249,7 +249,7 @@ def parse_schema(state: State, _config: Configuration) -> State: if result.error_message: state.schema_result = {"error": result.error_message} return state - # Convert result to dict for state + # Convert result to dict for state. schema_result = { "total_columns": result.total_columns, "required_columns": result.required_columns, From d114a689ba7bf1283be71143667ad0a6b0086bf9 Mon Sep 17 00:00:00 2001 From: maddy Date: Mon, 11 Aug 2025 20:27:48 -0400 Subject: [PATCH 5/5] Update MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pre-commit checks: All checks passed ✅ --- autoeda/agent.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/autoeda/agent.py b/autoeda/agent.py index 72188d562..a97e7f1d1 100644 --- a/autoeda/agent.py +++ b/autoeda/agent.py @@ -2,9 +2,6 @@ LangChain single-node agent template with OpenAI API integration and .env support. -This refactors the original LangGraph single-node graph template to use -LangChain's Runnable workflow and standard components. - Example usage: agent = AutoEDAAgent(Configuration( openai_model="gpt-4",