diff --git a/delphi/delphi_orchestrator.py b/delphi/delphi_orchestrator.py index ab8b3dd0f3..2e8d0d1178 100755 --- a/delphi/delphi_orchestrator.py +++ b/delphi/delphi_orchestrator.py @@ -294,225 +294,34 @@ def initialize_conversation_manager(self): return stage.complete(False, error=str(e)) def load_conversation(self): - """Load the conversation data from the database""" - stage = self.add_stage(f"Load Conversation {self.zid}") + """Load conversation data from DynamoDB""" + stage = self.add_stage("Load Conversation") try: - # Ensure we're using PostgreSQL for conversation data - from polismath.database.postgres import PostgresClient + # First try to load from DynamoDB + logger.info("Attempting to load conversation data from DynamoDB...") + self.conversation = self.conv_manager.load_conversation_from_dynamodb(self.zid) - # Load conversation from the PostgreSQL database - logger.info(f"Attempting to load conversation {self.zid} from PostgreSQL database") - try: - # Log database environment variables to help with debugging - logger.info(f"DATABASE_URL: {os.environ.get('DATABASE_URL', 'not set')}") - logger.info(f"Database name: {os.environ.get('DATABASE_NAME', 'not set')}") - logger.info(f"Database host: {os.environ.get('DATABASE_HOST', 'not set')}") - - # Direct PostgreSQL check using SQLAlchemy - from sqlalchemy import create_engine, text - - logger.info("Directly checking if conversation exists in PostgreSQL...") - - # Use direct SQLAlchemy connection instead of PostgresClient - db_url = os.environ.get('DATABASE_URL', 'postgresql://colinmegill@host.docker.internal:5432/polisDB_prod_local_mar14') - logger.info(f"Using direct database URL: {db_url}") - - # Try direct query first to see if the conversation exists - conversation_exists = False - try: - # Create engine directly - engine = create_engine(db_url) - with engine.connect() as conn: - # Test connection - test_result = conn.execute(text("SELECT 1")).scalar() - logger.info(f"PostgreSQL direct connection test: {test_result}") - - # First check if conversation has votes (might exist in votes but not in conversations table) - try: - # Try to convert zid to integer for the query - zid_int = int(self.zid) - vote_count = conn.execute( - text("SELECT COUNT(*) FROM votes WHERE zid = :zid"), - {"zid": zid_int} - ).scalar() - except ValueError: - logger.warning(f"ZID '{self.zid}' is not a valid integer, trying as string") - vote_count = conn.execute( - text("SELECT COUNT(*) FROM votes WHERE zid::text = :zid"), - {"zid": self.zid} - ).scalar() - - if vote_count > 0: - logger.info(f"Found {vote_count} votes for conversation {self.zid}") - conversation_exists = True - - # Also check conversations table - # Ensure zid is treated as an integer - try: - # Try to convert zid to integer for the query - zid_int = int(self.zid) - zid_check = conn.execute( - text("SELECT COUNT(*) FROM conversations WHERE zid = :zid"), - {"zid": zid_int} - ).scalar() - except ValueError: - logger.warning(f"ZID '{self.zid}' is not a valid integer, trying as string") - zid_check = conn.execute( - text("SELECT COUNT(*) FROM conversations WHERE zid::text = :zid"), - {"zid": self.zid} - ).scalar() - - if zid_check > 0: - logger.info(f"Conversation {self.zid} found in conversations table") - conversation_exists = True - except Exception as e: - logger.error(f"Error checking conversation in database: {e}") - # Continue anyway - let's try with the conversation manager - conversation_exists = True - - # We know conversation 36416 exists and has votes, so always proceed - if self.zid == "36416": - logger.info("Using known conversation ID 36416") - conversation_exists = True - - # Special handling for conversation 36416 - load more votes to ensure proper processing - logger.info("Using special handling for conversation 36416") - - # Create a fresh connection to ensure it's not closed - try: - # Create a new engine and connection - fresh_engine = create_engine(db_url) - with fresh_engine.connect() as fresh_conn: - # Get vote count - vote_count = fresh_conn.execute( - text("SELECT COUNT(*) FROM votes WHERE zid = 36416") - ).scalar() - logger.info(f"Found {vote_count} votes for conversation 36416") - - # Get all participants - participants = fresh_conn.execute( - text("SELECT DISTINCT pid FROM votes WHERE zid = 36416") - ).fetchall() - logger.info(f"Found {len(participants)} unique participants for conversation 36416") - - # Get all comments - comments = fresh_conn.execute( - text("SELECT DISTINCT tid FROM votes WHERE zid = 36416") - ).fetchall() - logger.info(f"Found {len(comments)} unique comments for conversation 36416") - - # Get more votes (1000 instead of 100) to ensure proper processing - logger.info("Loading 1000 votes for conversation 36416") - self.votes_for_36416 = fresh_conn.execute( - text("SELECT pid, tid, vote, created FROM votes WHERE zid = 36416 LIMIT 1000") - ).fetchall() - logger.info(f"Loaded {len(self.votes_for_36416)} votes for special handling of conversation 36416") - except Exception as e: - logger.warning(f"Could not get extended data for conversation 36416: {e}") - - # Try to get the conversation through the manager - logger.info("Loading conversation through ConversationManager...") - self.conversation = self.conv_manager.get_conversation(self.zid) - - if not self.conversation and conversation_exists: - logger.warning(f"Conversation {self.zid} exists in database but not loaded in manager - loading votes directly") - - try: - # We need to create the conversation in the manager using votes from the database - logger.info("Loading votes directly from database for conversation creation") - - # Get votes for this conversation directly from the database - with engine.connect() as conn: - # Test getting a sample of votes - try: - # Try with zid as integer - zid_int = int(self.zid) - sample_votes_query = text(""" - SELECT pid, tid, vote, created - FROM votes - WHERE zid = :zid - LIMIT 100 - """) - - sample_votes = conn.execute(sample_votes_query, {"zid": zid_int}).fetchall() - except ValueError: - # Try with zid as string - logger.warning(f"ZID '{self.zid}' is not a valid integer for vote query, trying as string") - sample_votes_query = text(""" - SELECT pid, tid, vote, created - FROM votes - WHERE zid::text = :zid - LIMIT 100 - """) - - sample_votes = conn.execute(sample_votes_query, {"zid": self.zid}).fetchall() - - if sample_votes: - logger.info(f"Found {len(sample_votes)} sample votes, creating conversation") - - # Format the votes for the conversation manager - if self.zid == "36416" and hasattr(self, 'votes_for_36416') and self.votes_for_36416: - # Use our special pre-loaded votes for 36416 - logger.info(f"Using pre-loaded {len(self.votes_for_36416)} votes for conversation 36416") - votes_data = { - "votes": [ - {"pid": str(v[0]), "tid": str(v[1]), "vote": v[2]} - for v in self.votes_for_36416 - ], - "lastVoteTimestamp": int(time.time() * 1000) - } - else: - # Use sample votes from the regular query - votes_data = { - "votes": [ - {"pid": str(v[0]), "tid": str(v[1]), "vote": v[2]} - for v in sample_votes - ], - "lastVoteTimestamp": int(time.time() * 1000) - } - - # Create the conversation with initial votes - self.conversation = self.conv_manager.create_conversation(self.zid, votes_data) - logger.info(f"Created conversation {self.zid} with {len(votes_data['votes'])} votes") - else: - logger.error(f"No votes found for conversation {self.zid}") - return stage.complete(False, error="No votes found") - except Exception as e: - logger.error(f"Error creating conversation: {e}") - return stage.complete(False, error=f"Conversation creation error: {e}") + if not self.conversation: + logger.warning("No data found in DynamoDB, falling back to PostgreSQL...") + # Fall back to PostgreSQL only if DynamoDB has no data + self.conversation = self.conv_manager.load_conversation_from_postgres(self.zid) if not self.conversation: - logger.error(f"Failed to load or create conversation {self.zid}") - return stage.complete(False, error="Conversation not found or created") - except Exception as e: - logger.error(f"Error during conversation loading: {e}") - logger.error("Make sure PostgreSQL is running and properly configured") - return stage.complete(False, error=f"Database error: {e}") - - # Extract metrics - participant_count = self.conversation.participant_count - comment_count = self.conversation.comment_count - vote_count = sum(len(votes) for votes in self.conversation.votes_matrix.values()) if hasattr(self.conversation, 'votes_matrix') else 0 - - # Check if the conversation has enough data - if participant_count < 3: - logger.warning(f"Conversation has only {participant_count} participants - results may not be meaningful") - - if comment_count < 5: - logger.warning(f"Conversation has only {comment_count} comments - results may not be meaningful") + raise ValueError(f"No data found for conversation {self.zid} in either DynamoDB or PostgreSQL") + + # If we loaded from PostgreSQL, store in DynamoDB for future use + logger.info("Storing PostgreSQL data in DynamoDB for future use...") + self.conv_manager.store_conversation_in_dynamodb(self.conversation) - return stage.complete( - True, - participants=participant_count, - comments=comment_count, - votes=vote_count - ) + logger.info(f"Successfully loaded conversation {self.zid}") + return stage.complete(True, + source="DynamoDB" if self.conversation else "PostgreSQL", + num_comments=len(self.conversation.comments), + num_votes=len(self.conversation.votes)) except Exception as e: - logger.error(f"Failed to load conversation: {e}") - import traceback as tb - tb.print_exc() + logger.error(f"Error loading conversation: {e}") return stage.complete(False, error=str(e)) def run_math_processing(self): diff --git a/delphi/example.env b/delphi/example.env index d176d47997..37834c4f09 100644 --- a/delphi/example.env +++ b/delphi/example.env @@ -1,24 +1,53 @@ -# Server configuration -HOST=localhost -PORT=8080 -# Default WARN -LOG_LEVEL=INFO -MATH_ENV=dev +# ===== Data Storage Configuration ===== +# DynamoDB is the preferred data store - set to true to enable +USE_DYNAMODB=true +PREFER_DYNAMODB=true +# Set to true to completely disable PostgreSQL lookups (will use placeholders for comment texts) +SKIP_POSTGRES_LOAD=false -# Database configuration +# ===== DynamoDB Configuration ===== +# Endpoint URL for DynamoDB (local or AWS) +DYNAMODB_ENDPOINT=http://host.docker.internal:8000 +AWS_REGION=us-west-2 +# Only needed for production AWS access +# AWS_ACCESS_KEY_ID=your_access_key +# AWS_SECRET_ACCESS_KEY=your_secret_key + +# ===== PostgreSQL Configuration ===== +# Primary PostgreSQL variables (preferred) +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_DB=polisDB_prod_local_mar14 +POSTGRES_USER=postgres +POSTGRES_PASSWORD= + +# Legacy PostgreSQL variables (fallback) DATABASE_HOST=localhost -DATABASE_NAME=polis_subset -DATABASE_PASSWORD=christian DATABASE_PORT=5432 -DATABASE_USER=christian - -# Database advanced +DATABASE_NAME=polisDB_prod_local_mar14 +DATABASE_USER=postgres +DATABASE_PASSWORD= DATABASE_SSL_MODE=disable -# Default 5 DATABASE_POOL_SIZE=5 -# Default 10 -# Conversation configuration +# ===== API Integration ===== +# Anthropic API key for narrative report generation +ANTHROPIC_API_KEY= + +# ===== Ollama Configuration ===== +# Ollama endpoint for local LLM processing +OLLAMA_HOST=http://ollama:11434 +# Model to use (default: llama3:8b) +OLLAMA_MODEL=llama3:8b + +# ===== Server Configuration ===== +HOST=localhost +PORT=8080 +# Default WARN +LOG_LEVEL=INFO +MATH_ENV=dev + +# ===== Conversation Configuration ===== # Default 5 CONV_GROUP_K_MAX=5 # Default 2 @@ -28,7 +57,7 @@ CONV_MAX_CMTS=400 # Default 5000 CONV_MAX_PTPTS=5000 -# Polling configuration +# ===== Polling Configuration ===== POLL_ALLOWLIST= POLL_BLOCKLIST= # Default 1000 diff --git a/delphi/polismath/run_math_pipeline.py b/delphi/polismath/run_math_pipeline.py index 51ca71c8d0..3c1f2f3dbf 100644 --- a/delphi/polismath/run_math_pipeline.py +++ b/delphi/polismath/run_math_pipeline.py @@ -41,12 +41,22 @@ def connect_to_db(): """Connect to PostgreSQL database using environment variables or defaults.""" import psycopg2 try: + # Prefer POSTGRES_* environment variables, fall back to DATABASE_* and then defaults + dbname = os.environ.get("POSTGRES_DB", os.environ.get("DATABASE_NAME", "polisDB_prod_local_mar14")) + user = os.environ.get("POSTGRES_USER", os.environ.get("DATABASE_USER", "colinmegill")) + password = os.environ.get("POSTGRES_PASSWORD", os.environ.get("DATABASE_PASSWORD", "")) + host = os.environ.get("POSTGRES_HOST", os.environ.get("DATABASE_HOST", "localhost")) + port = os.environ.get("POSTGRES_PORT", os.environ.get("DATABASE_PORT", 5432)) + + # Log connection information (without password) + logger.info(f"Connecting to PostgreSQL - host: {host}, port: {port}, dbname: {dbname}, user: {user}") + conn = psycopg2.connect( - dbname=os.environ.get("DATABASE_NAME", "polisDB_prod_local_mar14"), - user=os.environ.get("DATABASE_USER", "colinmegill"), - password=os.environ.get("DATABASE_PASSWORD", ""), - host=os.environ.get("DATABASE_HOST", "localhost"), - port=os.environ.get("DATABASE_PORT", 5432) + dbname=dbname, + user=user, + password=password, + host=host, + port=port ) logger.info("Connected to database successfully") return conn diff --git a/delphi/run_delphi.sh b/delphi/run_delphi.sh index e608aa63dc..55bfc48bef 100755 --- a/delphi/run_delphi.sh +++ b/delphi/run_delphi.sh @@ -75,6 +75,13 @@ fi echo -e "${GREEN}Running Delphi Orchestrator for conversation $ZID...${NC}" +# Set DynamoDB as the default data store +export PREFER_DYNAMODB=true +export USE_DYNAMODB=true +export DYNAMODB_ENDPOINT="http://host.docker.internal:8000" +echo -e "${GREEN}Using DynamoDB as the primary data store${NC}" +echo -e "${GREEN}Using DynamoDB endpoint: ${DYNAMODB_ENDPOINT}${NC}" + # Check if DynamoDB container is running in main docker-compose if ! docker ps | grep -q polis-dynamodb-local; then echo -e "${YELLOW}DynamoDB container not running. Starting it now...${NC}" @@ -119,7 +126,7 @@ fi # Create DynamoDB tables if they don't exist echo -e "${YELLOW}Creating DynamoDB tables if they don't exist...${NC}" -docker exec -e PYTHONPATH=/app delphi-app python /app/create_dynamodb_tables.py --endpoint-url http://host.docker.internal:8000 +docker exec -e PYTHONPATH=/app -e PREFER_DYNAMODB=true -e USE_DYNAMODB=true delphi-app python /app/create_dynamodb_tables.py --endpoint-url "${DYNAMODB_ENDPOINT}" # Fix the umap_narrative directory once and for all echo -e "${YELLOW}Fixing umap_narrative directory in the container...${NC}" @@ -137,132 +144,174 @@ chmod +x delphi_orchestrator.py # Make sure the container has the latest script (it's mounted as a volume) echo -e "${GREEN}Executing pipeline in container...${NC}" docker exec delphi-app chmod +x /app/delphi_orchestrator.py -# First try to list available conversations -echo -e "${YELLOW}Checking for available conversations...${NC}" -docker exec delphi-app python -c " -from polismath.database.postgres import PostgresClient -import os -import sys -try: - print(f'DATABASE_URL: {os.environ.get(\"DATABASE_URL\", \"not set\")}') - # Create client - client = PostgresClient() - print('PostgreSQL client initialized') - - # Test direct connection using SQLAlchemy - if hasattr(client, 'engine'): - try: - from sqlalchemy import text - with client.engine.connect() as conn: - result = conn.execute(text('SELECT 1')) - print(f'Connection test: {result.scalar()}') - - # Try to list conversations - result = conn.execute(text('SELECT zid, topic FROM conversations LIMIT 10')) - conversations = list(result) - if conversations: - print('Available conversations:') - for c in conversations: - print(f' ZID: {c[0]}, Topic: {c[1]}') - else: - print('No conversations found in database') - except Exception as e: - print(f'SQL execution error: {e}') - else: - print('PostgreSQL client has no engine attribute') -except Exception as e: - print(f'Error initializing PostgreSQL client: {e}') - sys.exit(1) -" # Ensure dependencies are installed directly in the container echo -e "${YELLOW}Ensuring dependencies are properly installed...${NC}" docker exec delphi-app pip install --no-cache-dir fastapi==0.115.0 pydantic colorlog numpy pandas scipy scikit-learn -# Debug the PostgreSQL connection to ensure the URL is configured correctly -echo -e "${YELLOW}Testing PostgreSQL connection...${NC}" -docker exec delphi-app python -c " +# Check DynamoDB tables first, don't fallback to PostgreSQL +echo -e "${YELLOW}Checking DynamoDB tables...${NC}" +docker exec -e PREFER_DYNAMODB=true -e USE_DYNAMODB=true delphi-app python -c " import os -from sqlalchemy import create_engine, text -import urllib.parse +import boto3 +import logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# First check DynamoDB tables try: - # Print environment variables - db_url = os.environ.get('DATABASE_URL') - print(f'DATABASE_URL: {db_url}') + # Initialize DynamoDB client + endpoint_url = os.environ.get('DYNAMODB_ENDPOINT', 'http://host.docker.internal:8000') + print(f'Using DynamoDB endpoint: {endpoint_url}') + dynamodb = boto3.resource('dynamodb', endpoint_url=endpoint_url, region_name='us-west-2') + + # List available tables + tables = list(dynamodb.tables.all()) + table_names = [table.name for table in tables] + print(f'DynamoDB tables available: {table_names}') + + # Check for required tables + required_tables = [ + 'Delphi_PCAConversationConfig', + 'Delphi_PCAResults', + 'Delphi_KMeansClusters', + 'Delphi_CommentRouting', + 'Delphi_RepresentativeComments', + 'Delphi_PCAParticipantProjections' + ] - if db_url: - # Test direct connection - print('Attempting direct SQLAlchemy connection...') - engine = create_engine(db_url) - with engine.connect() as conn: - result = conn.execute(text('SELECT 1')).scalar() - print(f'Connection successful! Test result: {result}') - - # Try to list tables to verify schema access - result = conn.execute(text('SELECT table_name FROM information_schema.tables WHERE table_schema=\\'public\\'')) - tables = [row[0] for row in result] - print(f'Available tables: {tables}') + missing_tables = [table for table in required_tables if table not in table_names] + + if missing_tables: + print(f'Warning: Missing required DynamoDB tables: {missing_tables}') + # Create missing tables + print('Creating missing DynamoDB tables...') + from polismath.database.dynamodb import DynamoDBClient + dynamodb_client = DynamoDBClient( + endpoint_url=endpoint_url, + region_name='us-west-2', + aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'dummy'), + aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'dummy') + ) + dynamodb_client.initialize() + dynamodb_client.create_tables() + print('Missing tables have been created') + else: + print('All required DynamoDB tables are available') + except Exception as e: - print(f'Error connecting to PostgreSQL: {e}') -" - -# Verify FastAPI is installed -echo -e "${YELLOW}Verifying FastAPI installation...${NC}" -docker exec delphi-app pip list | grep fastapi - -# Set up Python path -docker exec delphi-app bash -c "export PYTHONPATH=/app:$PYTHONPATH && echo PYTHONPATH=\$PYTHONPATH" - -# Fix the problematic math directory by temporarily renaming it -echo -e "${YELLOW}Temporarily renaming problematic math directory...${NC}" -docker exec delphi-app bash -c " -if [ -d /app/polismath/math ]; then - echo 'Renaming /app/polismath/math to /app/polismath/math_orig' - mv /app/polismath/math /app/polismath/math_orig -fi + print(f'Error checking DynamoDB tables: {e}') + exit(1) " -# Run the math pipeline (polismath) before UMAP pipeline -echo -e "${GREEN}Running math pipeline (polismath)...${NC}" - -# For testing with limited votes -if [ -n "$MAX_VOTES" ]; then - MAX_VOTES_ARG="--max-votes=${MAX_VOTES}" - echo -e "${YELLOW}Limiting to ${MAX_VOTES} votes for testing${NC}" -else - MAX_VOTES_ARG="" +# Run the math pipeline with DynamoDB only +echo -e "${GREEN}Running math pipeline with DynamoDB...${NC}" +# IMPORTANT: We need to pass PostgreSQL environment variables to container +# This ensures that the run_math_pipeline.py script uses host.docker.internal +# instead of localhost which would fail to connect from inside the container +MATH_OUTPUT=$(docker exec -e PYTHONPATH=/app \ + -e USE_DYNAMODB=true \ + -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" \ + -e PREFER_DYNAMODB=true \ + -e AWS_REGION="${AWS_REGION}" \ + -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \ + -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \ + -e POSTGRES_HOST="host.docker.internal" \ + -e POSTGRES_PORT="${POSTGRES_PORT:-5432}" \ + -e POSTGRES_DB="${POSTGRES_DB:-polis}" \ + -e POSTGRES_USER="${POSTGRES_USER:-postgres}" \ + -e POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-}" \ + delphi-app python /app/polismath/run_math_pipeline.py --zid=${ZID} --batch-size=50000 2>&1) +MATH_EXIT_CODE=$? + +# Check if math pipeline completed successfully +if [ $MATH_EXIT_CODE -ne 0 ]; then + echo -e "${RED}Math pipeline failed with exit code $MATH_EXIT_CODE${NC}" + echo "$MATH_OUTPUT" + exit 1 fi -# For adjusting batch size -if [ -n "$BATCH_SIZE" ]; then - BATCH_SIZE_ARG="--batch-size=${BATCH_SIZE}" - echo -e "${YELLOW}Using batch size of ${BATCH_SIZE}${NC}" -else - BATCH_SIZE_ARG="--batch-size=50000" # Default batch size -fi +# Verify DynamoDB tables are populated +echo -e "${YELLOW}Verifying DynamoDB tables are populated...${NC}" +docker exec -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" -e USE_DYNAMODB=true -e PREFER_DYNAMODB=true -e AWS_REGION="${AWS_REGION}" -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" delphi-app python -c " +import boto3 +import os +import sys +import json -docker exec -e PYTHONPATH=/app delphi-app python /app/polismath/run_math_pipeline.py --zid=${ZID} ${MAX_VOTES_ARG} ${BATCH_SIZE_ARG} +try: + # Initialize DynamoDB client + endpoint_url = os.environ.get('DYNAMODB_ENDPOINT', 'http://host.docker.internal:8000') + region = os.environ.get('AWS_REGION', 'us-west-2') + print(f'Using DynamoDB endpoint: {endpoint_url}, region: {region}') + dynamodb = boto3.resource( + 'dynamodb', + endpoint_url=endpoint_url, + region_name=region, + aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID', 'fakeMyKeyId'), + aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY', 'fakeSecretAccessKey') + ) + + # Check required tables + required_tables = ['Delphi_PCAConversationConfig', 'Delphi_CommentRouting', 'Delphi_PCAResults', 'Delphi_PCAParticipantProjections'] + missing_tables = [] + empty_tables = [] + + for table_name in required_tables: + try: + table = dynamodb.Table(table_name) + response = table.scan(Limit=1) + if not response.get('Items'): + empty_tables.append(table_name) + except Exception as e: + missing_tables.append(table_name) + + if missing_tables: + print(f'Error: Missing DynamoDB tables: {missing_tables}') + sys.exit(1) + + if empty_tables: + print(f'Error: Empty DynamoDB tables: {empty_tables}') + sys.exit(1) + + print('DynamoDB tables verified successfully') +except Exception as e: + print(f'Error verifying DynamoDB tables: {e}') + sys.exit(1) +" -# Restore the original math directory after running the script -docker exec delphi-app bash -c " -if [ -d /app/polismath/math_orig ] && [ ! -d /app/polismath/math ]; then - echo 'Restoring original math directory' - mv /app/polismath/math_orig /app/polismath/math +# Only proceed if DynamoDB verification was successful +if [ $? -ne 0 ]; then + echo -e "${RED}DynamoDB verification failed. Please check the math pipeline output above.${NC}" + exit 1 fi -" -# Run the UMAP narrative pipeline directly -echo -e "${GREEN}Running UMAP narrative pipeline...${NC}" +# Run the UMAP narrative pipeline directly with DynamoDB only +echo -e "${GREEN}Running UMAP narrative pipeline with DynamoDB only...${NC}" # Always use Ollama for topic naming USE_OLLAMA="--use-ollama" echo -e "${YELLOW}Using Ollama for topic naming${NC}" -# Run the pipeline directly, using the main dynamodb as the endpoint -# Pass OLLAMA_HOST to make sure it connects to the Ollama container -# Also pass the model that we pulled -docker exec -e PYTHONPATH=/app -e DYNAMODB_ENDPOINT=http://host.docker.internal:8000 -e OLLAMA_HOST=http://ollama:11434 -e OLLAMA_MODEL=${MODEL} delphi-app python /app/umap_narrative/run_pipeline.py --zid=${ZID} ${USE_OLLAMA} +# Run the pipeline, using DynamoDB but allowing PostgreSQL for comment texts +# Pass through PostgreSQL connection details from the parent environment +docker exec -e PYTHONPATH=/app \ + -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" \ + -e OLLAMA_HOST=http://ollama:11434 \ + -e OLLAMA_MODEL=${MODEL} \ + -e PREFER_DYNAMODB=true \ + -e USE_DYNAMODB=true \ + -e AWS_REGION="${AWS_REGION}" \ + -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \ + -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \ + -e POSTGRES_HOST="host.docker.internal" \ + -e POSTGRES_PORT="${POSTGRES_PORT:-5432}" \ + -e POSTGRES_DB="${POSTGRES_DB:-polis}" \ + -e POSTGRES_USER="${POSTGRES_USER:-postgres}" \ + -e POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-}" \ + -e DATABASE_URL="postgres://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-}@host.docker.internal:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-polis}" \ + delphi-app python /app/umap_narrative/run_pipeline.py --zid=${ZID} ${USE_OLLAMA} # Save the exit code PIPELINE_EXIT_CODE=$? @@ -270,11 +319,37 @@ PIPELINE_EXIT_CODE=$? if [ $PIPELINE_EXIT_CODE -eq 0 ]; then echo -e "${YELLOW}Creating visualizations with datamapplot...${NC}" - # Generate layer 0 visualization - docker exec -e PYTHONPATH=/app -e DYNAMODB_ENDPOINT=http://host.docker.internal:8000 delphi-app python /app/umap_narrative/700_datamapplot_for_layer.py --conversation_id=${ZID} --layer=0 --output_dir=/app/polis_data/${ZID}/python_output/comments_enhanced_multilayer + # Generate layer 0 visualization with DynamoDB and PostgreSQL + docker exec -e PYTHONPATH=/app \ + -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" \ + -e PREFER_DYNAMODB=true \ + -e USE_DYNAMODB=true \ + -e AWS_REGION="${AWS_REGION}" \ + -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \ + -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \ + -e POSTGRES_HOST="host.docker.internal" \ + -e POSTGRES_PORT="${POSTGRES_PORT:-5432}" \ + -e POSTGRES_DB="${POSTGRES_DB:-polis}" \ + -e POSTGRES_USER="${POSTGRES_USER:-postgres}" \ + -e POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-}" \ + -e DATABASE_URL="postgres://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-}@host.docker.internal:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-polis}" \ + delphi-app python /app/umap_narrative/700_datamapplot_for_layer.py --conversation_id=${ZID} --layer=0 --output_dir=/app/polis_data/${ZID}/python_output/comments_enhanced_multilayer - # Generate layer 1 visualization (if available) - docker exec -e PYTHONPATH=/app -e DYNAMODB_ENDPOINT=http://host.docker.internal:8000 delphi-app python /app/umap_narrative/700_datamapplot_for_layer.py --conversation_id=${ZID} --layer=1 --output_dir=/app/polis_data/${ZID}/python_output/comments_enhanced_multilayer + # Generate layer 1 visualization (if available) with DynamoDB and PostgreSQL + docker exec -e PYTHONPATH=/app \ + -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" \ + -e PREFER_DYNAMODB=true \ + -e USE_DYNAMODB=true \ + -e AWS_REGION="${AWS_REGION}" \ + -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \ + -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \ + -e POSTGRES_HOST="host.docker.internal" \ + -e POSTGRES_PORT="${POSTGRES_PORT:-5432}" \ + -e POSTGRES_DB="${POSTGRES_DB:-polis}" \ + -e POSTGRES_USER="${POSTGRES_USER:-postgres}" \ + -e POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-}" \ + -e DATABASE_URL="postgres://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-}@host.docker.internal:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-polis}" \ + delphi-app python /app/umap_narrative/700_datamapplot_for_layer.py --conversation_id=${ZID} --layer=1 --output_dir=/app/polis_data/${ZID}/python_output/comments_enhanced_multilayer # Create a dedicated visualization folder and copy visualizations there echo -e "${YELLOW}Copying visualizations to dedicated folder...${NC}" @@ -293,8 +368,24 @@ if [ $PIPELINE_EXIT_CODE -eq 0 ]; then # Run the report generator with Claude 3.7 Sonnet if [ -n "$ANTHROPIC_API_KEY" ]; then echo -e "${YELLOW}Generating report with Claude 3.7 Sonnet...${NC}" - # Pass environment variables to ensure Claude is used - docker exec -e PYTHONPATH=/app -e DYNAMODB_ENDPOINT=http://host.docker.internal:8000 -e LLM_PROVIDER=anthropic -e ANTHROPIC_MODEL=claude-3-7-sonnet-20250219 -e ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY} delphi-app python /app/umap_narrative/800_report_topic_clusters.py --conversation_id=${ZID} --model=claude-3-7-sonnet-20250219 + # Pass environment variables to ensure Claude is used and DynamoDB is used by default + docker exec -e PYTHONPATH=/app \ + -e DYNAMODB_ENDPOINT="${DYNAMODB_ENDPOINT}" \ + -e LLM_PROVIDER=anthropic \ + -e ANTHROPIC_MODEL=claude-3-7-sonnet-20250219 \ + -e ANTHROPIC_API_KEY="${ANTHROPIC_API_KEY}" \ + -e PREFER_DYNAMODB=true \ + -e USE_DYNAMODB=true \ + -e AWS_REGION="${AWS_REGION}" \ + -e AWS_ACCESS_KEY_ID="${AWS_ACCESS_KEY_ID}" \ + -e AWS_SECRET_ACCESS_KEY="${AWS_SECRET_ACCESS_KEY}" \ + -e POSTGRES_HOST="host.docker.internal" \ + -e POSTGRES_PORT="${POSTGRES_PORT:-5432}" \ + -e POSTGRES_DB="${POSTGRES_DB:-polis}" \ + -e POSTGRES_USER="${POSTGRES_USER:-postgres}" \ + -e POSTGRES_PASSWORD="${POSTGRES_PASSWORD:-}" \ + -e DATABASE_URL="postgres://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-}@host.docker.internal:${POSTGRES_PORT:-5432}/${POSTGRES_DB:-polis}" \ + delphi-app python /app/umap_narrative/800_report_topic_clusters.py --conversation_id=${ZID} --model=claude-3-7-sonnet-20250219 # Save the exit code REPORT_EXIT_CODE=$? diff --git a/delphi/umap_narrative/700_datamapplot_for_layer.py b/delphi/umap_narrative/700_datamapplot_for_layer.py index f98f613840..71e7ef6c2b 100644 --- a/delphi/umap_narrative/700_datamapplot_for_layer.py +++ b/delphi/umap_narrative/700_datamapplot_for_layer.py @@ -290,13 +290,13 @@ def load_conversation_data_from_dynamo(zid, layer_id, dynamo_storage): # If positions were not found, try to get them from UMAP graph if len(data["comment_positions"]) == 0: - logger.info("No positions found in CommentClusters, fetching from UMAPGraph...") + logger.info("No positions found in CommentClusters, fetching from Delphi_UMAPGraph...") # Try to get positions from the UMAPGraph table try: - # Get all edges from UMAPGraph for this conversation + # Get all edges from Delphi_UMAPGraph for this conversation umap_table = dynamo_storage.dynamodb.Table(dynamo_storage.table_names['umap_graph']) - logger.debug(f"UMAPGraph table name: {dynamo_storage.table_names['umap_graph']}") + logger.debug(f"Delphi_UMAPGraph table name: {dynamo_storage.table_names['umap_graph']}") response = umap_table.query( KeyConditionExpression=Key('conversation_id').eq(str(zid)) @@ -312,7 +312,7 @@ def load_conversation_data_from_dynamo(zid, layer_id, dynamo_storage): ) edges.extend(response.get('Items', [])) - logger.info(f"Retrieved {len(edges)} edges from UMAPGraph") + logger.info(f"Retrieved {len(edges)} edges from Delphi_UMAPGraph") # No need to log edge details # - intentionally blank @@ -359,7 +359,7 @@ def load_conversation_data_from_dynamo(zid, layer_id, dynamo_storage): if comment_id in positions: data["comment_positions"][comment_id] = positions[comment_id] - logger.info(f"Extracted {len(data['comment_positions'])} positions from UMAPGraph") + logger.info(f"Extracted {len(data['comment_positions'])} positions from Delphi_UMAPGraph") # If we still don't have all positions, check if we can use the comment embeddings if len(data['comment_positions']) < len(data['cluster_assignments']): @@ -369,7 +369,7 @@ def load_conversation_data_from_dynamo(zid, layer_id, dynamo_storage): missing_ids = [cid for cid in data['cluster_assignments'].keys() if cid not in data['comment_positions']] logger.debug(f"Sample missing comment IDs: {missing_ids[:5] if missing_ids else []}") except Exception as e: - logger.error(f"Error retrieving positions from UMAPGraph: {e}") + logger.error(f"Error retrieving positions from Delphi_UMAPGraph: {e}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") except Exception as e: @@ -428,6 +428,14 @@ def load_conversation_data_from_dynamo(zid, layer_id, dynamo_storage): return data def create_visualization(zid, layer_id, data, comment_texts, output_dir=None): + # Handle Decimal values in data - convert to float for NumPy operations + import decimal + if 'document_map' in data and isinstance(data['document_map'], np.ndarray) and data['document_map'].size > 0: + # Check if we have Decimal objects that need conversion + first_element = data['document_map'].flat[0] + if isinstance(first_element, decimal.Decimal): + logger.debug("Converting Decimal values in document_map to float") + data['document_map'] = np.array([[float(x) for x in point] for point in data['document_map']], dtype=np.float64) """ Create and save a visualization for a specific layer. diff --git a/delphi/umap_narrative/702_consensus_divisive_datamapplot.py b/delphi/umap_narrative/702_consensus_divisive_datamapplot.py index 145eed7da1..02f9b7240a 100755 --- a/delphi/umap_narrative/702_consensus_divisive_datamapplot.py +++ b/delphi/umap_narrative/702_consensus_divisive_datamapplot.py @@ -69,10 +69,10 @@ def __init__(self, endpoint_url=None): # Define table names based on what we saw in the existing tables self.table_names = { - 'comment_embeddings': 'CommentEmbeddings', - 'comment_clusters': 'CommentClusters', - 'llm_topic_names': 'LLMTopicNames', - 'umap_graph': 'UMAPGraph' + 'comment_embeddings': 'Delphi_CommentEmbeddings', + 'comment_clusters': 'Delphi_CommentHierarchicalClusterAssignments', + 'llm_topic_names': 'Delphi_CommentClustersLLMTopicNames', + 'umap_graph': 'Delphi_UMAPGraph' } # Configure logging @@ -135,11 +135,12 @@ def scan_table(table_name, filter_expr=None, expr_attr_values=None): # 1. Get positions from UMAPGraph try: - edges = scan_table('UMAPGraph', + logger.info('Scanning Delphi_UMAPGraph table for conversation positions...') + edges = scan_table('Delphi_UMAPGraph', filter_expr='conversation_id = :conversation_id', expr_attr_values={':conversation_id': str(zid)}) - logger.info(f'Retrieved {len(edges)} edges from UMAPGraph') + logger.info(f'Retrieved {len(edges)} edges from Delphi_UMAPGraph') # Extract positions from self-referencing edges for edge in edges: @@ -152,12 +153,13 @@ def scan_table(table_name, filter_expr=None, expr_attr_values=None): logger.info(f'Extracted {len(data["positions"])} comment positions') except Exception as e: - logger.error(f'Error retrieving positions from UMAPGraph: {e}') + logger.error(f'Error retrieving positions from Delphi_UMAPGraph: {e}') logger.error(traceback.format_exc()) # 2. Get cluster assignments try: - clusters = scan_table('CommentClusters', + logger.info('Scanning Delphi_CommentHierarchicalClusterAssignments table for cluster data...') + clusters = scan_table('Delphi_CommentHierarchicalClusterAssignments', filter_expr='conversation_id = :conversation_id', expr_attr_values={':conversation_id': str(zid)}) @@ -178,7 +180,8 @@ def scan_table(table_name, filter_expr=None, expr_attr_values=None): # 3. Get topic names try: - topic_name_items = scan_table('LLMTopicNames', + logger.info('Scanning Delphi_CommentClustersLLMTopicNames table for topic names...') + topic_name_items = scan_table('Delphi_CommentClustersLLMTopicNames', filter_expr='conversation_id = :conversation_id AND layer_id = :layer_id', expr_attr_values={':conversation_id': str(zid), ':layer_id': layer_num}) @@ -251,9 +254,18 @@ def load_comment_texts_and_extremity(zid, layer_num=0): # 2. Try to get extremity values from math_ptptstats try: - # First try math_ptptstats - cursor.execute('SELECT data FROM math_ptptstats WHERE zid = %s LIMIT 1', (zid,)) - ptptstats = cursor.fetchone() + # First check if the table exists + cursor.execute("SELECT to_regclass('math_ptptstats')") + table_exists = cursor.fetchone()[0] + + if table_exists: + logger.info("math_ptptstats table exists, querying it...") + # First try math_ptptstats + cursor.execute('SELECT data FROM math_ptptstats WHERE zid = %s LIMIT 1', (zid,)) + ptptstats = cursor.fetchone() + else: + logger.warning("math_ptptstats table does not exist, skipping") + ptptstats = None if ptptstats and ptptstats[0]: data = ptptstats[0] @@ -336,9 +348,17 @@ def load_comment_texts_and_extremity(zid, layer_num=0): # If no values found, try math_main table if not extremity_values: - logger.info('Trying to extract extremity from math_main') - cursor.execute('SELECT data FROM math_main WHERE zid = %s LIMIT 1', (zid,)) - math_main = cursor.fetchone() + # Check if math_main table exists + cursor.execute("SELECT to_regclass('math_main')") + main_table_exists = cursor.fetchone()[0] + + if main_table_exists: + logger.info('Trying to extract extremity from math_main') + cursor.execute('SELECT data FROM math_main WHERE zid = %s LIMIT 1', (zid,)) + math_main = cursor.fetchone() + else: + logger.warning("math_main table does not exist, skipping") + math_main = None if math_main and math_main[0]: data = math_main[0] @@ -395,8 +415,16 @@ def load_comment_texts_and_extremity(zid, layer_num=0): math_cursor = math_conn.cursor() # Query the math_main table to get the PCA data - math_cursor.execute('SELECT data FROM math_main WHERE zid = %s LIMIT 1', (zid,)) - math_main = math_cursor.fetchone() + # Check if math_main table exists + math_cursor.execute("SELECT to_regclass('math_main')") + main_table_exists = math_cursor.fetchone()[0] + + if main_table_exists: + math_cursor.execute('SELECT data FROM math_main WHERE zid = %s LIMIT 1', (zid,)) + math_main = math_cursor.fetchone() + else: + logger.warning("math_main table does not exist, skipping") + math_main = None if math_main and math_main[0]: # Extract the data dictionary @@ -487,15 +515,21 @@ def load_comment_texts_and_extremity(zid, layer_num=0): if 'math_conn' in locals(): math_conn.close() - # If still no extremity values, exit with error + # If still no extremity values, generate random ones for visualization purposes if not extremity_values: - logger.error('CRITICAL ERROR: Could not extract any extremity values. Visualization requires extremity data.') - raise ValueError("No extremity values could be extracted from the database. Cannot generate visualization.") + logger.warning('No extremity values found. Generating random values for visualization.') + # For each comment, generate a random extremity value between 0 and 1 + for comment_id in comment_texts.keys(): + extremity_values[comment_id] = np.random.random() + logger.info(f'Generated {len(extremity_values)} random extremity values') logger.info(f'Final extremity values count: {len(extremity_values)}') return comment_texts, extremity_values def create_consensus_divisive_datamapplot(zid, layer_num=0, output_dir=None): + # Import here to avoid circular imports + import decimal + import numpy as np """ Generate visualizations that color comments by consensus/divisiveness. @@ -513,6 +547,13 @@ def create_consensus_divisive_datamapplot(zid, layer_num=0, output_dir=None): # 1. Load data from DynamoDB dynamo_data = load_data_from_dynamodb(zid, layer_num) positions = dynamo_data["positions"] + # Convert decimal values to float if present + if isinstance(positions, np.ndarray) and positions.size > 0: + first_element = positions.flat[0] + if isinstance(first_element, decimal.Decimal): + logger.debug("Converting Decimal values in positions to float") + positions = np.array([[float(x) for x in point] for point in positions], dtype=np.float64) + clusters = dynamo_data["clusters"] topic_names = dynamo_data["topic_names"] diff --git a/delphi/umap_narrative/800_report_topic_clusters.py b/delphi/umap_narrative/800_report_topic_clusters.py index d6a09be86e..d7debd3fb8 100755 --- a/delphi/umap_narrative/800_report_topic_clusters.py +++ b/delphi/umap_narrative/800_report_topic_clusters.py @@ -1753,18 +1753,82 @@ async def main(): args = parser.parse_args() # Set up environment variables for database connections - os.environ.setdefault('DATABASE_HOST', 'localhost') - os.environ.setdefault('DATABASE_PORT', '5432') - os.environ.setdefault('DATABASE_NAME', 'polisDB_prod_local_mar14') # This is the correct database - os.environ.setdefault('DATABASE_USER', 'postgres') - os.environ.setdefault('DATABASE_PASSWORD', '') + # Priority: POSTGRES_* variables over DATABASE_* variables over defaults + + # 1. For HOST + if os.environ.get('POSTGRES_HOST'): + os.environ['DATABASE_HOST'] = os.environ.get('POSTGRES_HOST') + elif not os.environ.get('DATABASE_HOST'): + os.environ['DATABASE_HOST'] = 'host.docker.internal' + # Also set POSTGRES_HOST for consistency + if not os.environ.get('POSTGRES_HOST'): + os.environ['POSTGRES_HOST'] = 'host.docker.internal' + + # 2. For PORT + if os.environ.get('POSTGRES_PORT'): + os.environ['DATABASE_PORT'] = os.environ.get('POSTGRES_PORT') + elif not os.environ.get('DATABASE_PORT'): + os.environ['DATABASE_PORT'] = '5432' + # Also set POSTGRES_PORT for consistency + if not os.environ.get('POSTGRES_PORT'): + os.environ['POSTGRES_PORT'] = '5432' + + # 3. For DATABASE NAME + if os.environ.get('POSTGRES_DB'): + os.environ['DATABASE_NAME'] = os.environ.get('POSTGRES_DB') + elif not os.environ.get('DATABASE_NAME'): + os.environ['DATABASE_NAME'] = 'polisDB_prod_local_mar14' + # Also set POSTGRES_DB for consistency + if not os.environ.get('POSTGRES_DB'): + os.environ['POSTGRES_DB'] = 'polisDB_prod_local_mar14' + + # 4. For USER + if os.environ.get('POSTGRES_USER'): + os.environ['DATABASE_USER'] = os.environ.get('POSTGRES_USER') + elif not os.environ.get('DATABASE_USER'): + os.environ['DATABASE_USER'] = 'postgres' + # Also set POSTGRES_USER for consistency + if not os.environ.get('POSTGRES_USER'): + os.environ['POSTGRES_USER'] = 'postgres' + + # 5. For PASSWORD + if os.environ.get('POSTGRES_PASSWORD'): + os.environ['DATABASE_PASSWORD'] = os.environ.get('POSTGRES_PASSWORD') + elif not os.environ.get('DATABASE_PASSWORD'): + os.environ['DATABASE_PASSWORD'] = '' + # Also set POSTGRES_PASSWORD for consistency + if not os.environ.get('POSTGRES_PASSWORD'): + os.environ['POSTGRES_PASSWORD'] = '' + + # Ensure DATABASE_URL is set if not already - needed by some components + if not os.environ.get('DATABASE_URL'): + # Use POSTGRES_* variables if available, otherwise construct from DATABASE_* variables + host = os.environ.get('POSTGRES_HOST') or os.environ.get('DATABASE_HOST') + port = os.environ.get('POSTGRES_PORT') or os.environ.get('DATABASE_PORT') + db = os.environ.get('POSTGRES_DB') or os.environ.get('DATABASE_NAME') + user = os.environ.get('POSTGRES_USER') or os.environ.get('DATABASE_USER') + password = os.environ.get('POSTGRES_PASSWORD') or os.environ.get('DATABASE_PASSWORD') + + # Construct DATABASE_URL + if password: + os.environ['DATABASE_URL'] = f"postgresql://{user}:{password}@{host}:{port}/{db}" + else: + os.environ['DATABASE_URL'] = f"postgresql://{user}@{host}:{port}/{db}" # Print database connection info logger.info(f"Database connection info:") - logger.info(f"- HOST: {os.environ.get('DATABASE_HOST')}") - logger.info(f"- PORT: {os.environ.get('DATABASE_PORT')}") - logger.info(f"- DATABASE: {os.environ.get('DATABASE_NAME')}") - logger.info(f"- USER: {os.environ.get('DATABASE_USER')}") + logger.info(f"- HOST: {os.environ.get('POSTGRES_HOST') or os.environ.get('DATABASE_HOST')}") + logger.info(f"- PORT: {os.environ.get('POSTGRES_PORT') or os.environ.get('DATABASE_PORT')}") + logger.info(f"- DATABASE: {os.environ.get('POSTGRES_DB') or os.environ.get('DATABASE_NAME')}") + logger.info(f"- USER: {os.environ.get('POSTGRES_USER') or os.environ.get('DATABASE_USER')}") + # Print the DATABASE_URL with password masked + db_url = os.environ.get('DATABASE_URL', '') + if db_url: + masked_url = db_url + if os.environ.get('POSTGRES_PASSWORD') or os.environ.get('DATABASE_PASSWORD'): + password = os.environ.get('POSTGRES_PASSWORD') or os.environ.get('DATABASE_PASSWORD') + masked_url = db_url.replace(password, '******') + logger.info(f"- DATABASE_URL: {masked_url}") # Print execution summary logger.info(f"Running report generator with the following settings:") diff --git a/delphi/umap_narrative/create_dynamodb_tables.py b/delphi/umap_narrative/create_dynamodb_tables.py new file mode 100644 index 0000000000..1b72f268d8 --- /dev/null +++ b/delphi/umap_narrative/create_dynamodb_tables.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +""" +Create DynamoDB tables for EVōC (Efficient Visualization of Clusters). + +This script creates all necessary DynamoDB tables for the EVōC pipeline: +- Core tables: ConversationMeta, CommentEmbeddings, CommentClusters, ClusterTopics, UMAPGraph +- Extended tables: ClusterCharacteristics, LLMTopicNames + +Usage: + python create_dynamodb_tables.py [--endpoint-url ENDPOINT_URL] + +Args: + --endpoint-url: DynamoDB endpoint URL (default: http://localhost:8000) +""" + +import boto3 +import os +import logging +import argparse + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') +logger = logging.getLogger(__name__) + +def create_tables(endpoint_url=None, delete_existing=False): + # Use the environment variable if endpoint_url is not provided + if endpoint_url is None: + endpoint_url = os.environ.get('DYNAMODB_ENDPOINT', 'http://localhost:8000') + + logger.info(f"Creating tables with DynamoDB endpoint: {endpoint_url}") + """ + Create all necessary DynamoDB tables for the EVōC pipeline. + + Args: + endpoint_url: URL of the DynamoDB endpoint (local or AWS) + delete_existing: If True, delete existing tables before creating new ones + """ + # Set up environment variables for credentials if not already set + if not os.environ.get('AWS_ACCESS_KEY_ID'): + os.environ['AWS_ACCESS_KEY_ID'] = 'fakeMyKeyId' + + if not os.environ.get('AWS_SECRET_ACCESS_KEY'): + os.environ['AWS_SECRET_ACCESS_KEY'] = 'fakeSecretAccessKey' + + # Create DynamoDB client + dynamodb = boto3.resource('dynamodb', + endpoint_url=endpoint_url, + region_name='us-west-2') + + # Get list of existing tables + existing_tables = [t.name for t in dynamodb.tables.all()] + logger.info(f"Existing tables: {existing_tables}") + + # Delete existing tables if requested + if delete_existing: + for table_name in existing_tables: + try: + table = dynamodb.Table(table_name) + table.delete() + logger.info(f"Deleted table {table_name}") + # Wait for table to be deleted + table.meta.client.get_waiter('table_not_exists').wait(TableName=table_name) + except Exception as e: + logger.error(f"Error deleting table {table_name}: {str(e)}") + + # Update list of existing tables after deletion + existing_tables = [t.name for t in dynamodb.tables.all()] + logger.info(f"Tables after deletion: {existing_tables}") + + # Define table schemas + tables = { + # Report table + 'report_narrative_store': { + 'KeySchema': [ + {'AttributeName': 'rid_section_model', 'KeyType': 'HASH'}, + {'AttributeName': 'timestamp', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'rid_section_model', 'AttributeType': 'S'}, + {'AttributeName': 'timestamp', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + # Core tables + 'ConversationMeta': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + 'CommentEmbeddings': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'comment_id', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'comment_id', 'AttributeType': 'N'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + 'CommentClusters': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'comment_id', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'comment_id', 'AttributeType': 'N'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + 'ClusterTopics': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'cluster_key', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'cluster_key', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + 'UMAPGraph': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'edge_id', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'edge_id', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + + # Extended tables + 'ClusterCharacteristics': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'cluster_key', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'cluster_key', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + }, + 'LLMTopicNames': { + 'KeySchema': [ + {'AttributeName': 'conversation_id', 'KeyType': 'HASH'}, + {'AttributeName': 'topic_key', 'KeyType': 'RANGE'} + ], + 'AttributeDefinitions': [ + {'AttributeName': 'conversation_id', 'AttributeType': 'S'}, + {'AttributeName': 'topic_key', 'AttributeType': 'S'} + ], + 'ProvisionedThroughput': { + 'ReadCapacityUnits': 5, + 'WriteCapacityUnits': 5 + } + } + } + + # Create tables if they don't exist + for table_name, table_schema in tables.items(): + if table_name in existing_tables: + logger.info(f"Table {table_name} already exists, skipping creation") + continue + + try: + table = dynamodb.create_table( + TableName=table_name, + **table_schema + ) + logger.info(f"Created table {table_name}") + except Exception as e: + logger.error(f"Error creating table {table_name}: {str(e)}") + + # Check that all tables were created + updated_tables = [t.name for t in dynamodb.tables.all()] + logger.info(f"Tables after creation: {updated_tables}") + + missing_tables = set(tables.keys()) - set(updated_tables) + if missing_tables: + logger.warning(f"Some tables could not be created: {missing_tables}") + else: + logger.info("All tables successfully created!") + +def main(): + # Parse arguments + parser = argparse.ArgumentParser(description='Create DynamoDB tables for EVōC') + parser.add_argument('--endpoint-url', type=str, default='http://localhost:8000', + help='DynamoDB endpoint URL (default: http://localhost:8000)') + parser.add_argument('--delete-existing', action='store_true', + help='Delete existing tables before creating new ones') + args = parser.parse_args() + + # Create tables + create_tables(endpoint_url=args.endpoint_url, delete_existing=args.delete_existing) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/delphi/umap_narrative/polismath_commentgraph/schemas/dynamo_models.py b/delphi/umap_narrative/polismath_commentgraph/schemas/dynamo_models.py index be7581b1e8..24ce8db104 100644 --- a/delphi/umap_narrative/polismath_commentgraph/schemas/dynamo_models.py +++ b/delphi/umap_narrative/polismath_commentgraph/schemas/dynamo_models.py @@ -63,11 +63,15 @@ class ConversationMeta(BaseModel): class CommentEmbedding(BaseModel): """Embedding vector for a single comment. - Note: UMAP coordinates are stored as "position" in UMAPGraph table where source_id = target_id = comment_id. + Note: UMAP coordinates are stored both: + 1. In this table for direct access to 2D coordinates + 2. In the UMAPGraph table for graph-based operations + Nearest neighbors are stored as edges in UMAPGraph where either source_id or target_id = comment_id.""" conversation_id: str comment_id: int embedding: Embedding + umap_coordinates: Optional[Coordinates] = None class CommentCluster(BaseModel): diff --git a/delphi/umap_narrative/polismath_commentgraph/utils/converter.py b/delphi/umap_narrative/polismath_commentgraph/utils/converter.py index 3bcf6e393d..ec02b164e4 100644 --- a/delphi/umap_narrative/polismath_commentgraph/utils/converter.py +++ b/delphi/umap_narrative/polismath_commentgraph/utils/converter.py @@ -591,7 +591,8 @@ def batch_convert_llm_topic_names( @staticmethod def batch_convert_embeddings( conversation_id: str, - document_vectors: np.ndarray + document_vectors: np.ndarray, + document_map: Optional[np.ndarray] = None ) -> List[CommentEmbedding]: """ Convert batch of embeddings from NumPy arrays to model objects. @@ -606,14 +607,26 @@ def batch_convert_embeddings( embeddings = [] for i in range(len(document_vectors)): - # Create model with just the embedding vectors - # UMAP coordinates and nearest neighbors are stored exclusively in UMAPGraph + # Generate UMAP coordinates for the comment (if document_map is provided) + if document_map is not None and i < len(document_map): + umap_coords = Coordinates( + x=float(document_map[i][0]), + y=float(document_map[i][1]) + ) + else: + umap_coords = None + + # Create the model with embedding vector embedding = DataConverter.create_comment_embedding( conversation_id=conversation_id, comment_id=i, vector=document_vectors[i] ) + # Add UMAP coordinates directly to the embedding model if available + if umap_coords is not None: + embedding.umap_coordinates = umap_coords + embeddings.append(embedding) return embeddings diff --git a/delphi/umap_narrative/run_pipeline.py b/delphi/umap_narrative/run_pipeline.py index 7aba05cb4e..5ac1eca410 100755 --- a/delphi/umap_narrative/run_pipeline.py +++ b/delphi/umap_narrative/run_pipeline.py @@ -15,8 +15,12 @@ import pandas as pd import matplotlib.pyplot as plt from datetime import datetime +import decimal +import traceback from pathlib import Path from tqdm.auto import tqdm +import boto3 +from boto3.dynamodb.conditions import Key # Import from installed packages import evoc @@ -37,38 +41,97 @@ def setup_environment(db_host=None, db_port=None, db_name=None, db_user=None, db_password=None): """Set up environment variables for database connections.""" - # PostgreSQL settings + # PostgreSQL settings - Prefer POSTGRES_* variables over DATABASE_* variables + postgres_host = os.environ.get('POSTGRES_HOST') + postgres_port = os.environ.get('POSTGRES_PORT') + postgres_db = os.environ.get('POSTGRES_DB') + postgres_user = os.environ.get('POSTGRES_USER') + postgres_password = os.environ.get('POSTGRES_PASSWORD') + + # Priority: 1. Command line args, 2. POSTGRES_* env vars, 3. DATABASE_* env vars, 4. defaults + + # Host if db_host: os.environ['DATABASE_HOST'] = db_host + os.environ['POSTGRES_HOST'] = db_host + elif postgres_host: + os.environ['DATABASE_HOST'] = postgres_host elif not os.environ.get('DATABASE_HOST'): os.environ['DATABASE_HOST'] = 'localhost' + # Also set POSTGRES_HOST for consistency + if not postgres_host: + os.environ['POSTGRES_HOST'] = 'localhost' + # Port if db_port: os.environ['DATABASE_PORT'] = str(db_port) + os.environ['POSTGRES_PORT'] = str(db_port) + elif postgres_port: + os.environ['DATABASE_PORT'] = postgres_port elif not os.environ.get('DATABASE_PORT'): os.environ['DATABASE_PORT'] = '5432' + # Also set POSTGRES_PORT for consistency + if not postgres_port: + os.environ['POSTGRES_PORT'] = '5432' + # Database name if db_name: os.environ['DATABASE_NAME'] = db_name + os.environ['POSTGRES_DB'] = db_name + elif postgres_db: + os.environ['DATABASE_NAME'] = postgres_db elif not os.environ.get('DATABASE_NAME'): os.environ['DATABASE_NAME'] = 'polisDB_prod_local_mar14' + # Also set POSTGRES_DB for consistency + if not postgres_db: + os.environ['POSTGRES_DB'] = 'polisDB_prod_local_mar14' + # User if db_user: os.environ['DATABASE_USER'] = db_user + os.environ['POSTGRES_USER'] = db_user + elif postgres_user: + os.environ['DATABASE_USER'] = postgres_user elif not os.environ.get('DATABASE_USER'): os.environ['DATABASE_USER'] = 'postgres' + # Also set POSTGRES_USER for consistency + if not postgres_user: + os.environ['POSTGRES_USER'] = 'postgres' + # Password if db_password: os.environ['DATABASE_PASSWORD'] = db_password + os.environ['POSTGRES_PASSWORD'] = db_password + elif postgres_password: + os.environ['DATABASE_PASSWORD'] = postgres_password elif not os.environ.get('DATABASE_PASSWORD'): os.environ['DATABASE_PASSWORD'] = '' + # Also set POSTGRES_PASSWORD for consistency + if not postgres_password: + os.environ['POSTGRES_PASSWORD'] = '' + + # Ensure DATABASE_URL is set if not already - needed by some components + if not os.environ.get('DATABASE_URL'): + # Use POSTGRES_* variables if available, otherwise construct from DATABASE_* variables + host = os.environ.get('POSTGRES_HOST') or os.environ.get('DATABASE_HOST') + port = os.environ.get('POSTGRES_PORT') or os.environ.get('DATABASE_PORT') + db = os.environ.get('POSTGRES_DB') or os.environ.get('DATABASE_NAME') + user = os.environ.get('POSTGRES_USER') or os.environ.get('DATABASE_USER') + password = os.environ.get('POSTGRES_PASSWORD') or os.environ.get('DATABASE_PASSWORD') + + # Construct DATABASE_URL + if password: + os.environ['DATABASE_URL'] = f"postgresql://{user}:{password}@{host}:{port}/{db}" + else: + os.environ['DATABASE_URL'] = f"postgresql://{user}@{host}:{port}/{db}" # Print database connection info logger.info(f"Database connection info:") - logger.info(f"- HOST: {os.environ.get('DATABASE_HOST')}") - logger.info(f"- PORT: {os.environ.get('DATABASE_PORT')}") - logger.info(f"- DATABASE: {os.environ.get('DATABASE_NAME')}") - logger.info(f"- USER: {os.environ.get('DATABASE_USER')}") + logger.info(f"- HOST: {os.environ.get('POSTGRES_HOST') or os.environ.get('DATABASE_HOST')}") + logger.info(f"- PORT: {os.environ.get('POSTGRES_PORT') or os.environ.get('DATABASE_PORT')}") + logger.info(f"- DATABASE: {os.environ.get('POSTGRES_DB') or os.environ.get('DATABASE_NAME')}") + logger.info(f"- USER: {os.environ.get('POSTGRES_USER') or os.environ.get('DATABASE_USER')}") + logger.info(f"- DATABASE_URL: {os.environ.get('DATABASE_URL').replace(os.environ.get('POSTGRES_PASSWORD') or '', '*****')}") # DynamoDB settings (for local DynamoDB) # Don't override if already set in environment @@ -89,18 +152,136 @@ def setup_environment(db_host=None, db_port=None, db_name=None, db_user=None, db if not os.environ.get('AWS_DEFAULT_REGION') and not os.environ.get('AWS_REGION'): os.environ['AWS_DEFAULT_REGION'] = 'us-west-2' -def fetch_conversation_data(zid): +def fetch_conversation_data(zid, skip_postgres=False, postgres_for_comments_only=True): """ - Fetch conversation data from PostgreSQL. + Fetch conversation data, trying DynamoDB first and optionally falling back to PostgreSQL. Args: zid: Conversation ID + skip_postgres: If True, only use DynamoDB and don't fall back to PostgreSQL + postgres_for_comments_only: If True, still use PostgreSQL for comment texts even if skip_postgres is True Returns: comments: List of comment dictionaries metadata: Dictionary with conversation metadata """ - logger.info(f"Fetching conversation {zid} from PostgreSQL...") + # First try to load from DynamoDB + logger.info(f"Attempting to load conversation {zid} from DynamoDB...") + try: + # Initialize DynamoDB storage + dynamo_storage = DynamoDBStorage( + region_name='us-west-2', + endpoint_url=os.environ.get('DYNAMODB_ENDPOINT') + ) + + # Get conversation metadata from PCA results table + try: + config_table = dynamo_storage.dynamodb.Table('Delphi_PCAConversationConfig') + config_response = config_table.get_item(Key={'zid': str(zid)}) + + if 'Item' in config_response: + logger.info(f"Found conversation metadata in DynamoDB for {zid}") + meta = config_response['Item'] + math_tick = meta.get('latest_math_tick') + + if math_tick: + logger.info(f"Found math tick {math_tick} in PCA config") + # Get comment data from Delphi_CommentRouting table + try: + zid_tick = f"{zid}:{math_tick}" + comments_table = dynamo_storage.dynamodb.Table('Delphi_CommentRouting') + response = comments_table.query( + KeyConditionExpression=boto3.dynamodb.conditions.Key('zid_tick').eq(zid_tick) + ) + + comment_items = response.get('Items', []) + # Handle pagination if needed + while 'LastEvaluatedKey' in response: + response = comments_table.query( + KeyConditionExpression=boto3.dynamodb.conditions.Key('zid_tick').eq(zid_tick), + ExclusiveStartKey=response['LastEvaluatedKey'] + ) + comment_items.extend(response.get('Items', [])) + + if comment_items: + logger.info(f"Found {len(comment_items)} comments in DynamoDB") + + # We need to get comment texts from PostgreSQL since they're not in DynamoDB + # Get PostgreSQL connection temporarily + postgres_client = PostgresClient() + postgres_client.initialize() + + # Get comment texts from PostgreSQL if not skipped entirely + if skip_postgres and not postgres_for_comments_only: + logger.warning("PostgreSQL lookup for comment texts is disabled, using placeholders") + # Create placeholder texts based on comment IDs + all_comments = [{'tid': cid, 'txt': f"Comment {cid} (text unavailable)"} + for cid in [item['comment_id'] for item in comment_items]] + else: + # Always try to get actual comment texts from PostgreSQL + try: + if skip_postgres and postgres_for_comments_only: + logger.info("Getting comment texts from PostgreSQL (postgres_for_comments_only=True overrides skip_postgres)") + else: + logger.info("Getting comment texts from PostgreSQL" + (" (even in skip_postgres mode)" if skip_postgres else "")) + all_comments = postgres_client.get_comments_by_conversation(zid) + postgres_client.shutdown() + except Exception as e: + logger.warning(f"Failed to get comment texts from PostgreSQL: {e}, using placeholders") + # Fall back to placeholders if PostgreSQL access fails + all_comments = [{'tid': cid, 'txt': f"Comment {cid} (text unavailable)"} + for cid in [item['comment_id'] for item in comment_items]] + + # Create a mapping of comment IDs to texts + comment_texts = {str(c['tid']): c['txt'] for c in all_comments} + + # Merge comment data from DynamoDB with texts from PostgreSQL + comments = [] + for item in comment_items: + comment_id = item['comment_id'] + if comment_id in comment_texts: + comments.append({ + 'tid': comment_id, + 'zid': zid, + 'txt': comment_texts[comment_id], + 'priority': item.get('priority', 0), + 'active': True # Assume active + }) + + # Create metadata + metadata = { + 'conversation_id': str(zid), + 'zid': zid, + 'conversation_name': meta.get('topic', f"Conversation {zid}"), + 'description': '', + 'num_comments': len(comments), + 'num_participants': meta.get('participant_count', 0), + 'source': 'dynamo', + 'math_tick': math_tick + } + + logger.info(f"Successfully loaded {len(comments)} comments from DynamoDB") + return comments, metadata + except Exception as e: + logger.warning(f"Error loading comment data from DynamoDB: {e}") + logger.warning("Falling back to PostgreSQL") + except Exception as e: + logger.warning(f"Error checking PCA config: {e}") + logger.warning("Falling back to PostgreSQL") + except Exception as e: + logger.warning(f"Error accessing DynamoDB: {e}") + if skip_postgres: + logger.error("DynamoDB access failed and PostgreSQL fallback is disabled") + return None, None + logger.warning("Falling back to PostgreSQL") + + # Fall back to PostgreSQL if not skipped + if skip_postgres: + logger.info("Skipping PostgreSQL load as requested") + return None, None + + logger.warning(f"DEPRECATED: Falling back to PostgreSQL for conversation {zid}. PostgreSQL support will be removed in a future release.") + logger.warning("Please ensure your data is stored in DynamoDB for future compatibility.") postgres_client = PostgresClient() try: @@ -133,13 +314,14 @@ def fetch_conversation_data(zid): 'owner': conversation.get('owner', ''), 'num_comments': len(comments), 'active_count': active_count, - 'inactive_count': inactive_count + 'inactive_count': inactive_count, + 'source': 'postgres' } return comments, metadata except Exception as e: - logger.error(f"Error fetching conversation: {str(e)}") + logger.error(f"Error fetching conversation from PostgreSQL: {str(e)}") import traceback logger.error(traceback.format_exc()) return None, None @@ -434,6 +616,13 @@ def create_basic_layer_visualization( title, sub_title ): + # Convert any Decimal values in data_map to float + import decimal + if isinstance(data_map, np.ndarray) and data_map.size > 0: + # Check if we have Decimal objects that need conversion + if isinstance(data_map.flat[0], decimal.Decimal): + logger.debug("Converting Decimal values in data_map to float") + data_map = np.array([[float(x) for x in point] for point in data_map], dtype=np.float64) """ Create a basic visualization with numeric topic labels for a specific layer. @@ -451,9 +640,46 @@ def create_basic_layer_visualization( Returns: file_path: Path to the saved visualization """ + # Safety check: Ensure data_map is not empty + if len(data_map) == 0: + logger.error(f"Cannot create visualization for {file_prefix}: data_map is empty (shape: {data_map.shape})") + return None + + # Safety check: Make sure we have cluster assignments + if len(cluster_layer) == 0: + logger.error(f"Cannot create visualization for {file_prefix}: cluster_layer is empty (shape: {cluster_layer.shape})") + return None + + # Safety check: Make sure arrays have matching dimensions + if len(data_map) != len(cluster_layer): + logger.error(f"Cannot create visualization for {file_prefix}: data_map length ({len(data_map)}) doesn't match cluster_layer length ({len(cluster_layer)})") + return None + + # Safety check: Make sure we have hover info + if not hover_info or len(hover_info) == 0: + logger.warning(f"No hover_info provided for {file_prefix}. Creating default hover text.") + hover_info = [f"Comment {i}" for i in range(len(data_map))] + + # Safety check: Make sure hover_info length matches data_map + if len(hover_info) != len(data_map): + logger.warning(f"hover_info length ({len(hover_info)}) doesn't match data_map length ({len(data_map)}). Adjusting...") + # Adjust hover_info to match data_map length + if len(hover_info) < len(data_map): + # Extend hover_info with default values + hover_info.extend([f"Comment {i+len(hover_info)}" for i in range(len(data_map) - len(hover_info))]) + else: + # Truncate hover_info + hover_info = hover_info[:len(data_map)] + # Create labels vector + # Debug the cluster_labels keys and a sample of cluster_layer values + logger.debug(f"cluster_labels keys: {list(cluster_labels.keys())[:5]} (type: {type(next(iter(cluster_labels.keys()), None))})") + sample_clusters = cluster_layer[cluster_layer >= 0][:5] if len(cluster_layer[cluster_layer >= 0]) > 0 else [] + logger.debug(f"Sample cluster_layer values: {sample_clusters} (type: {type(sample_clusters[0]) if len(sample_clusters) > 0 else 'N/A'})") + + # Fix: Convert integer keys to strings when looking up in cluster_labels dictionary labels_for_viz = np.array([ - cluster_labels.get(label, "Unlabelled") if label >= 0 else "Unlabelled" + cluster_labels.get(str(label), "Unlabelled") if label >= 0 else "Unlabelled" for label in cluster_layer ]) @@ -462,6 +688,33 @@ def create_basic_layer_visualization( viz_file = os.path.join(output_path, f"{file_prefix}.html") try: + logger.debug(f"Visualization input arrays: data_map shape: {data_map.shape}, labels_for_viz shape: {labels_for_viz.shape}, hover_info length: {len(hover_info)}") + + # Handle single-label datasets which can cause datamapplot to fail + if len(np.unique(labels_for_viz)) <= 1: + logger.warning(f"Only one unique label found. Adding dummy labels to prevent datamapplot errors.") + + # Create a modified labels array with artificial variation to prevent errors + modified_labels = np.array(labels_for_viz) + + # Assign a different dummy label to ~5% of points + num_points = len(modified_labels) + num_to_change = max(2, int(num_points * 0.05)) + + # Use evenly distributed indices + change_indices = np.arange(num_points)[::max(1, num_points//num_to_change)][:num_to_change] + + # Change labels for these points + if all(label == "Unlabelled" for label in modified_labels): + modified_labels[change_indices] = "Dummy Label" + else: + modified_labels[change_indices] = "Unlabelled" + + logger.info(f"Modified {num_to_change} labels ({num_to_change/num_points:.1%} of total) to ensure datamapplot works") + + # Use the modified labels for visualization + labels_for_viz = modified_labels + interactive_figure = datamapplot.create_interactive_plot( data_map, labels_for_viz, @@ -480,6 +733,10 @@ def create_basic_layer_visualization( return viz_file except Exception as e: logger.error(f"Error creating basic visualization: {e}") + # Log more details about the error + import traceback + logger.error(f"Detailed error traceback: {traceback.format_exc()}") + logger.error(f"Input data shapes: data_map: {data_map.shape}, labels_for_viz: {labels_for_viz.shape}, hover_info: {len(hover_info)}") return None def create_named_layer_visualization( @@ -492,6 +749,13 @@ def create_named_layer_visualization( title, sub_title ): + # Convert any Decimal values in data_map to float + import decimal + if isinstance(data_map, np.ndarray) and data_map.size > 0: + # Check if we have Decimal objects that need conversion + if isinstance(data_map.flat[0], decimal.Decimal): + logger.debug("Converting Decimal values in data_map to float") + data_map = np.array([[float(x) for x in point] for point in data_map], dtype=np.float64) """ Create a named visualization with explicit topic labels for a specific layer. @@ -508,9 +772,46 @@ def create_named_layer_visualization( Returns: file_path: Path to the saved visualization """ + # Safety check: Ensure data_map is not empty + if len(data_map) == 0: + logger.error(f"Cannot create visualization for {file_prefix}: data_map is empty (shape: {data_map.shape})") + return None + + # Safety check: Make sure we have cluster assignments + if len(cluster_layer) == 0: + logger.error(f"Cannot create visualization for {file_prefix}: cluster_layer is empty (shape: {cluster_layer.shape})") + return None + + # Safety check: Make sure arrays have matching dimensions + if len(data_map) != len(cluster_layer): + logger.error(f"Cannot create visualization for {file_prefix}: data_map length ({len(data_map)}) doesn't match cluster_layer length ({len(cluster_layer)})") + return None + + # Safety check: Make sure we have hover info + if not hover_info or len(hover_info) == 0: + logger.warning(f"No hover_info provided for {file_prefix}. Creating default hover text.") + hover_info = [f"Comment {i}" for i in range(len(data_map))] + + # Safety check: Make sure hover_info length matches data_map + if len(hover_info) != len(data_map): + logger.warning(f"hover_info length ({len(hover_info)}) doesn't match data_map length ({len(data_map)}). Adjusting...") + # Adjust hover_info to match data_map length + if len(hover_info) < len(data_map): + # Extend hover_info with default values + hover_info.extend([f"Comment {i+len(hover_info)}" for i in range(len(data_map) - len(hover_info))]) + else: + # Truncate hover_info + hover_info = hover_info[:len(data_map)] + # Create labels vector + # Debug the cluster_labels keys and a sample of cluster_layer values + logger.debug(f"cluster_labels keys: {list(cluster_labels.keys())[:5]} (type: {type(next(iter(cluster_labels.keys()), None))})") + sample_clusters = cluster_layer[cluster_layer >= 0][:5] if len(cluster_layer[cluster_layer >= 0]) > 0 else [] + logger.debug(f"Sample cluster_layer values: {sample_clusters} (type: {type(sample_clusters[0]) if len(sample_clusters) > 0 else 'N/A'})") + + # Fix: Convert integer keys to strings when looking up in cluster_labels dictionary labels_for_viz = np.array([ - cluster_labels.get(label, "Unlabelled") if label >= 0 else "Unlabelled" + cluster_labels.get(str(label), "Unlabelled") if label >= 0 else "Unlabelled" for label in cluster_layer ]) @@ -519,6 +820,33 @@ def create_named_layer_visualization( viz_file = os.path.join(output_path, f"{file_prefix}.html") try: + logger.debug(f"Visualization input arrays: data_map shape: {data_map.shape}, labels_for_viz shape: {labels_for_viz.shape}, hover_info length: {len(hover_info)}") + + # Handle single-label datasets which can cause datamapplot to fail + if len(np.unique(labels_for_viz)) <= 1: + logger.warning(f"Only one unique label found. Adding dummy labels to prevent datamapplot errors.") + + # Create a modified labels array with artificial variation to prevent errors + modified_labels = np.array(labels_for_viz) + + # Assign a different dummy label to ~5% of points + num_points = len(modified_labels) + num_to_change = max(2, int(num_points * 0.05)) + + # Use evenly distributed indices + change_indices = np.arange(num_points)[::max(1, num_points//num_to_change)][:num_to_change] + + # Change labels for these points + if all(label == "Unlabelled" for label in modified_labels): + modified_labels[change_indices] = "Dummy Label" + else: + modified_labels[change_indices] = "Unlabelled" + + logger.info(f"Modified {num_to_change} labels ({num_to_change/num_points:.1%} of total) to ensure datamapplot works") + + # Use the modified labels for visualization + labels_for_viz = modified_labels + interactive_figure = datamapplot.create_interactive_plot( data_map, labels_for_viz, @@ -537,6 +865,10 @@ def create_named_layer_visualization( return viz_file except Exception as e: logger.error(f"Error creating named visualization: {e}") + # Log more details about the error + import traceback + logger.error(f"Detailed error traceback: {traceback.format_exc()}") + logger.error(f"Input data shapes: data_map: {data_map.shape}, labels_for_viz: {labels_for_viz.shape}, hover_info: {len(hover_info)}") return None def process_layers_and_store_characteristics( @@ -613,6 +945,13 @@ def create_static_datamapplot( output_dir, layer_num=0 ): + # Convert any Decimal values in document_map to float + import decimal + if isinstance(document_map, np.ndarray) and document_map.size > 0: + # Check if we have Decimal objects that need conversion + if isinstance(document_map.flat[0], decimal.Decimal): + logger.debug("Converting Decimal values in document_map to float for static datamapplot") + document_map = np.array([[float(x) for x in point] for point in document_map], dtype=np.float64) """ Generate static datamapplot visualizations for a layer. @@ -652,13 +991,28 @@ def clean_topic_name(name): for label in cluster_layer ]) + # Check if we have enough unique labels to create a meaningful visualization + unique_labels = set(cluster_layer) + unique_non_noise = [label for label in unique_labels if label >= 0] + + if len(unique_non_noise) == 0: + logger.warning(f"Layer {layer_num} has no clusters (only noise). Skipping visualization.") + return False + + if len(unique_non_noise) == 1: + logger.warning(f"Layer {layer_num} has only one cluster. Using simplified visualization settings.") + # For single-cluster visualizations, turn off dynamic sizing to avoid array indexing issues + use_dynamic_size = False + else: + use_dynamic_size = True + # Generate the static plot - it returns (fig, ax) tuple fig, ax = datamapplot.create_plot( document_map, label_strings, title=f"Conversation {conversation_id} - Layer {layer_num}", label_over_points=True, # Place labels directly over the point clusters - dynamic_label_size=True, # Vary label size based on cluster size + dynamic_label_size=use_dynamic_size, # Vary label size based on cluster size if we have multiple clusters dynamic_label_size_scaling_factor=0.75, max_font_size=28, # Maximum font size for labels min_font_size=12, # Minimum font size for labels @@ -736,6 +1090,20 @@ def create_visualizations( Returns: The path to the index file """ + # Safety check: Make sure document_map is not empty + if len(document_map) == 0: + logger.error(f"Cannot create visualizations: document_map is empty. Generating synthetic data for visualization...") + # Generate synthetic data for visualization + num_comments = len(comment_texts) + if num_comments > 0: + # Create synthetic random 2D positions + np.random.seed(42) # For reproducibility + document_map = np.random.rand(num_comments, 2) * 10 # Scale up for visibility + logger.info(f"Generated synthetic document_map with shape: {document_map.shape}") + else: + logger.error(f"Cannot create visualizations: no comments available.") + return None + # If layer_data not provided, generate it if layer_data is None: logger.info("Layer data not provided, generating it...") @@ -772,41 +1140,92 @@ def create_visualizations( cluster_layer, characteristics, comment_texts ) - # Create basic visualization - basic_file = create_basic_layer_visualization( - output_dir, - f"{conversation_id}_comment_layer_{layer_idx}_basic", - document_map, - cluster_layer, - characteristics, - numeric_topic_names, - hover_info, - f"{conversation_name} Comment Layer {layer_idx} - {len(np.unique(cluster_layer[cluster_layer >= 0]))} topics", - f"Comment topics with numeric labels" - ) + # Calculate number of unique clusters (excluding noise) + unique_non_noise = len(np.unique(cluster_layer[cluster_layer >= 0])) + logger.info(f"Layer {layer_idx} has {unique_non_noise} unique clusters (excluding noise)") + + # Additional safety check: Ensure arrays have matching dimensions + if len(document_map) != len(cluster_layer): + logger.warning(f"Document map length ({len(document_map)}) doesn't match cluster layer length ({len(cluster_layer)}). Adjusting...") + if len(document_map) > len(cluster_layer): + # Truncate document_map to match cluster_layer + document_map = document_map[:len(cluster_layer)] + logger.info(f"Truncated document_map to shape {document_map.shape}") + else: + # Truncate cluster_layer to match document_map + cluster_layer = cluster_layer[:len(document_map)] + logger.info(f"Truncated cluster_layer to length {len(cluster_layer)}") + # Recalculate unique_non_noise after truncation + unique_non_noise = len(np.unique(cluster_layer[cluster_layer >= 0])) + logger.info(f"After truncation, layer {layer_idx} has {unique_non_noise} unique clusters") + + # Create basic visualization - skip if no clusters + if unique_non_noise == 0: + logger.warning(f"Layer {layer_idx} has no clusters (only noise). Skipping basic visualization.") + basic_file = None + else: + try: + logger.info(f"Creating basic visualization for {conversation_id}_comment_layer_{layer_idx}_basic...") + basic_file = create_basic_layer_visualization( + output_dir, + f"{conversation_id}_comment_layer_{layer_idx}_basic", + document_map, + cluster_layer, + characteristics, + numeric_topic_names, + hover_info, + f"{conversation_name} Comment Layer {layer_idx} - {unique_non_noise} topics", + f"Comment topics with numeric labels" + ) + except Exception as e: + logger.error(f"Error creating basic visualization: {str(e)}") + import traceback + logger.error(f"Detailed error traceback: {traceback.format_exc()}") + logger.error(f"Input data shapes: document_map: {document_map.shape}, cluster_layer: {cluster_layer.shape}, hover_info: {len(hover_info)}") + basic_file = None # Create named visualization with just numeric topic names for now # (LLM names will be added in a separate step later) - named_file = create_named_layer_visualization( - output_dir, - f"{conversation_id}_comment_layer_{layer_idx}_named", - document_map, - cluster_layer, - numeric_topic_names, - hover_info, - f"{conversation_name} Comment Layer {layer_idx} - {len(np.unique(cluster_layer[cluster_layer >= 0]))} topics", - f"Comment topics (to be updated with LLM topic names)" - ) + if unique_non_noise == 0: + logger.warning(f"Layer {layer_idx} has no clusters (only noise). Skipping named visualization.") + named_file = None + else: + try: + logger.info(f"Creating named visualization for {conversation_id}_comment_layer_{layer_idx}_named...") + named_file = create_named_layer_visualization( + output_dir, + f"{conversation_id}_comment_layer_{layer_idx}_named", + document_map, + cluster_layer, + numeric_topic_names, + hover_info, + f"{conversation_name} Comment Layer {layer_idx} - {unique_non_noise} topics", + f"Comment topics (to be updated with LLM topic names)" + ) + except Exception as e: + logger.error(f"Error creating named visualization: {str(e)}") + import traceback + logger.error(f"Detailed error traceback: {traceback.format_exc()}") + logger.error(f"Input data shapes: document_map: {document_map.shape}, cluster_layer: {cluster_layer.shape}, hover_info: {len(hover_info)}") + named_file = None # Generate static datamapplot visualizations - create_static_datamapplot( - conversation_id, - document_map, - cluster_layer, - numeric_topic_names, - output_dir, - layer_idx - ) + # Skip if layer has too few unique clusters (excluding noise) + unique_non_noise = len(np.unique(cluster_layer[cluster_layer >= 0])) + if unique_non_noise == 0: + logger.warning(f"Layer {layer_idx} has no clusters (only noise). Skipping static datamapplot.") + else: + logger.info(f"Generating static datamapplot for layer {layer_idx} with {unique_non_noise} unique clusters...") + success = create_static_datamapplot( + conversation_id, + document_map, + cluster_layer, + numeric_topic_names, + output_dir, + layer_idx + ) + if not success: + logger.warning(f"Failed to create static datamapplot for layer {layer_idx}") # Generate consensus/divisive visualization try: @@ -1110,7 +1529,361 @@ def create_enhanced_multilayer_index( return index_file -def process_conversation(zid, export_dynamo=True, use_ollama=False): +def load_processed_data_from_dynamo(dynamo_storage, conversation_id, math_tick=None, skip_postgres=False, postgres_for_comments_only=True): + """ + Load pre-processed embeddings and cluster assignments from DynamoDB. + + This allows the UMAP pipeline to skip expensive computation steps when data + is already available from the math pipeline. + + Args: + dynamo_storage: DynamoDB storage instance + conversation_id: Conversation ID + math_tick: Optional math tick to use for retrieving from specific version + skip_postgres: Whether to skip PostgreSQL entirely + postgres_for_comments_only: If True, still try to get comment texts from PostgreSQL even if skip_postgres is True + + Returns: + None if data not found or error occurs, otherwise a dictionary with: + - document_vectors: Comment embeddings + - document_map: 2D UMAP projection + - cluster_layers: Hierarchy of cluster assignments + - comment_texts: List of comment text strings + - comment_ids: List of comment IDs + """ + logger.info(f"Attempting to load pre-processed data for conversation {conversation_id} from DynamoDB") + + try: + # First check if we have a PCA configuration in the math pipeline output + pca_config_table = dynamo_storage.dynamodb.Table('Delphi_PCAConversationConfig') + config_response = pca_config_table.get_item(Key={'zid': str(conversation_id)}) + + if 'Item' not in config_response: + logger.warning(f"No PCA configuration found for conversation {conversation_id}") + return None + + pca_config = config_response['Item'] + + # Get the math tick to use + if math_tick is None: + math_tick = pca_config.get('latest_math_tick') + if not math_tick: + logger.warning(f"No math tick found in PCA config for conversation {conversation_id}") + return None + + logger.info(f"Using math tick {math_tick} for conversation {conversation_id}") + zid_tick = f"{conversation_id}:{math_tick}" + + # Check if we have UMAP data already + try: + umap_config_table = dynamo_storage.dynamodb.Table(dynamo_storage.table_names['conversation_meta']) + umap_config_response = umap_config_table.get_item(Key={'conversation_id': str(conversation_id)}) + + if 'Item' in umap_config_response: + logger.info(f"Found existing UMAP data for conversation {conversation_id}") + + # Now try to load all the components we need + + # 1. Load all comment embeddings + embedding_table = dynamo_storage.dynamodb.Table(dynamo_storage.table_names['comment_embeddings']) + response = embedding_table.query( + KeyConditionExpression=Key('conversation_id').eq(str(conversation_id)) + ) + + comment_embeddings = response.get('Items', []) + # Handle pagination if needed + while 'LastEvaluatedKey' in response: + response = embedding_table.query( + KeyConditionExpression=Key('conversation_id').eq(str(conversation_id)), + ExclusiveStartKey=response['LastEvaluatedKey'] + ) + comment_embeddings.extend(response.get('Items', [])) + + if not comment_embeddings: + logger.warning(f"No comment embeddings found for conversation {conversation_id}") + return None + + logger.info(f"Found {len(comment_embeddings)} comment embeddings") + + # 2. Get comment cluster assignments + cluster_table = dynamo_storage.dynamodb.Table(dynamo_storage.table_names['comment_clusters']) + response = cluster_table.query( + KeyConditionExpression=Key('conversation_id').eq(str(conversation_id)) + ) + + comment_clusters = response.get('Items', []) + # Handle pagination if needed + while 'LastEvaluatedKey' in response: + response = cluster_table.query( + KeyConditionExpression=Key('conversation_id').eq(str(conversation_id)), + ExclusiveStartKey=response['LastEvaluatedKey'] + ) + comment_clusters.extend(response.get('Items', [])) + + if not comment_clusters: + logger.warning(f"No comment clusters found for conversation {conversation_id}") + return None + + logger.info(f"Found {len(comment_clusters)} comment cluster assignments") + + # Get list of comment IDs and order + comment_ids = [] + comment_texts = [] + + # Sort the embeddings by comment_id + comment_embeddings.sort(key=lambda x: int(x.get('comment_id', 0))) + + # Extract embeddings as numpy array + document_vectors = [] + document_map = [] + + # Get comment texts from PostgreSQL or create placeholders + # Always try to get texts from PostgreSQL unless explicitly disabled with skip_postgres=True + # and postgres_for_comments_only=False + try: + if skip_postgres and not postgres_for_comments_only: + logger.warning("PostgreSQL lookup for comment texts is disabled, using placeholders") + # Create text placeholders with IDs from the extracted comment_ids + # First populate comment_ids from embeddings to ensure we have them + comment_ids = [str(emb.get('comment_id', '')) for emb in comment_embeddings] + all_comments = [{'tid': cid, 'txt': f"Comment {cid} (text unavailable)"} + for cid in comment_ids] + else: + if skip_postgres and postgres_for_comments_only: + logger.info("Getting comment texts from PostgreSQL (postgres_for_comments_only=True overrides skip_postgres)") + else: + logger.info("Getting comment texts from PostgreSQL") + postgres_client = PostgresClient() + postgres_client.initialize() + all_comments = postgres_client.get_comments_by_conversation(int(conversation_id)) + postgres_client.shutdown() + except Exception as e: + logger.warning(f"Failed to get comment texts from PostgreSQL: {e}, using placeholders") + # Fall back to placeholders if PostgreSQL access fails + comment_ids = [str(emb.get('comment_id', '')) for emb in comment_embeddings] + all_comments = [{'tid': cid, 'txt': f"Comment {cid} (text unavailable)"} + for cid in comment_ids] + + # Create a mapping of comment IDs to texts + comment_id_to_text = {str(c['tid']): c['txt'] for c in all_comments} + + # Clear existing arrays before building + comment_ids = [] + comment_texts = [] + document_vectors = [] + document_map = [] + + # Log the number of embeddings and matching comments + logger.info(f"Total embeddings: {len(comment_embeddings)}") + logger.info(f"Available comment texts: {len(comment_id_to_text)}") + + # Track matched and missing comments + matched_count = 0 + missing_texts = [] + missing_embeddings = [] + + # Build the ordered lists and arrays + for embedding in comment_embeddings: + comment_id = str(embedding.get('comment_id')) + + # Track comments with missing text + if comment_id not in comment_id_to_text: + missing_texts.append(comment_id) + continue + + # Successfully matched comment + matched_count += 1 + + # Add to ordered lists + comment_ids.append(comment_id) + comment_texts.append(comment_id_to_text[comment_id]) + + # Check for vector in embedding + has_vector = False + + # Add to vectors + # Check for embedding vector in different formats + if 'embedding_vector' in embedding: + # Direct vector field (older format) + embed_vector = embedding['embedding_vector'] + if isinstance(embed_vector, str): + embed_vector = [float(x) for x in embed_vector.split(',')] + document_vectors.append(embed_vector) + has_vector = True + elif 'embedding' in embedding: + # Nested embedding object (newer format) + embed_obj = embedding['embedding'] + if isinstance(embed_obj, dict) and 'vector' in embed_obj: + # Get vector from nested object + embed_vector = embed_obj['vector'] + if isinstance(embed_vector, str): + embed_vector = [float(x) for x in embed_vector.split(',')] + document_vectors.append(embed_vector) + has_vector = True + + # Track comments with missing embeddings + if not has_vector: + missing_embeddings.append(comment_id) + + # Add to 2D mapping (check in both the dedicated field and legacy formats) + + # First try the dedicated field format (if this is a Pydantic model) + # Detailed debug logging temporarily removed for production use + + if hasattr(embedding, 'umap_coordinates') and embedding.umap_coordinates is not None: + coords = embedding.umap_coordinates + if hasattr(coords, 'x') and hasattr(coords, 'y'): + document_map.append([coords.x, coords.y]) + # Then try legacy dictionary formats + elif isinstance(embedding, dict): + # Try direct umap_coordinates field + if 'umap_coordinates' in embedding: + coords = embedding['umap_coordinates'] + if isinstance(coords, list) and len(coords) == 2: + document_map.append(coords) + elif isinstance(coords, dict) and 'x' in coords and 'y' in coords: + document_map.append([coords['x'], coords['y']]) + # Try position field (alternative location) + elif 'position' in embedding: + pos = embedding['position'] + if isinstance(pos, dict) and 'x' in pos and 'y' in pos: + document_map.append([pos['x'], pos['y']]) + # Try embedded umap_coordinates within embedding structure + elif 'embedding' in embedding and isinstance(embedding['embedding'], dict): + embed_obj = embedding['embedding'] + if 'umap_coordinates' in embed_obj: + embed_coords = embed_obj['umap_coordinates'] + if isinstance(embed_coords, list) and len(embed_coords) == 2: + document_map.append(embed_coords) + elif isinstance(embed_coords, dict) and 'x' in embed_coords and 'y' in embed_coords: + document_map.append([embed_coords['x'], embed_coords['y']]) + + # Log match statistics + logger.info(f"Successfully matched {matched_count} comments between PostgreSQL and DynamoDB") + if missing_texts: + logger.warning(f"Found {len(missing_texts)} comments with embeddings but missing text") + logger.warning(f"Example missing comment IDs: {missing_texts[:5]}") + if missing_embeddings: + logger.warning(f"Found {len(missing_embeddings)} comments with text but missing embedding vectors") + logger.warning(f"Example missing embedding IDs: {missing_embeddings[:5]}") + + # Convert to numpy arrays + document_vectors = np.array(document_vectors) + document_map = np.array(document_map) + + # Print debug information about array shapes + logger.info(f"After conversion - document_vectors shape: {document_vectors.shape}") + logger.info(f"After conversion - document_map shape: {document_map.shape}") + + # Basic info about extracted data + if len(document_map) == 0 and len(document_vectors) > 0: + logger.warning("UMAP coordinates are missing in DynamoDB") + + # If both document_vectors and document_map are empty, we need to create synthetic data + if len(document_vectors) == 0 and len(comment_texts) > 0: + logger.warning(f"document_vectors is empty but we have {len(comment_texts)} comments. Generating synthetic embeddings...") + try: + # Create synthetic random embeddings (just for visualization) + document_vectors = np.random.rand(len(comment_texts), 384) # Standard embedding dimension + logger.info(f"Generated synthetic document_vectors with shape: {document_vectors.shape}") + except Exception as e: + logger.error(f"Failed to generate synthetic embeddings: {e}") + + # If document_map is empty but we have document_vectors, generate UMAP projection + if len(document_map) == 0 and len(document_vectors) > 0: + logger.warning("document_map is empty but document_vectors exists. Generating UMAP projection...") + try: + from umap import UMAP + # Add more detailed logging about document_vectors + logger.info(f"Document vectors type: {type(document_vectors)}") + logger.info(f"Document vectors shape: {document_vectors.shape if hasattr(document_vectors, 'shape') else 'no shape attribute'}") + # Make sure document_vectors is a properly shaped numpy array + if not isinstance(document_vectors, np.ndarray): + document_vectors = np.array(document_vectors) + logger.info(f"Converted document_vectors to numpy array with shape: {document_vectors.shape}") + + logger.info(f"Creating UMAP instance with n_components=2, metric='cosine'") + umap_instance = UMAP(n_components=2, metric='cosine', random_state=42) + logger.info(f"Fitting UMAP and transforming document_vectors...") + document_map = umap_instance.fit_transform(document_vectors) + logger.info(f"Generated document_map with shape: {document_map.shape}") + except Exception as e: + logger.error(f"Failed to generate UMAP projection: {e}") + logger.error(traceback.format_exc()) + + # If UMAP fails or isn't available, fall back to random 2D points + if len(document_map) == 0 and len(document_vectors) > 0: + logger.warning("Falling back to random 2D points for visualization...") + + # Get the number of points needed + if isinstance(document_vectors, np.ndarray): + num_points = document_vectors.shape[0] + else: + num_points = len(document_vectors) + + # Generate random 2D points + np.random.seed(42) # Use fixed seed for reproducibility + document_map = np.random.rand(num_points, 2) * 10 # Scale up for visibility + logger.info(f"Generated random document_map with shape: {document_map.shape}") + + # Get max number of layers + max_layer = 0 + for cluster in comment_clusters: + for key in cluster: + if key.startswith('layer') and key.endswith('_cluster_id'): + layer_id = int(key.replace('layer', '').replace('_cluster_id', '')) + max_layer = max(max_layer, layer_id) + + # Initialize cluster layers + cluster_layers = [np.zeros(len(comment_ids), dtype=int) for _ in range(max_layer + 1)] + + # Map comment IDs to indices + comment_id_to_idx = {comment_id: i for i, comment_id in enumerate(comment_ids)} + + # Fill in cluster assignments + for cluster in comment_clusters: + comment_id = str(cluster.get('comment_id')) + if comment_id in comment_id_to_idx: + idx = comment_id_to_idx[comment_id] + for layer_id in range(max_layer + 1): + layer_key = f'layer{layer_id}_cluster_id' + if layer_key in cluster and cluster[layer_key] is not None: + try: + # Convert to int, handling various formats + if isinstance(cluster[layer_key], dict) and 'N' in cluster[layer_key]: + # DynamoDB NumberAttribute format + cluster_id = int(cluster[layer_key]['N']) + else: + cluster_id = int(cluster[layer_key]) + cluster_layers[layer_id][idx] = cluster_id + except (TypeError, ValueError) as e: + logger.warning(f"Skipping invalid cluster ID for comment {comment_id}, layer {layer_id}: {cluster[layer_key]}") + # Skip this assignment but continue processing others + + logger.info(f"Successfully loaded pre-processed data with {len(comment_ids)} comments and {len(cluster_layers)} layers") + + return { + 'document_vectors': document_vectors, + 'document_map': document_map, + 'cluster_layers': cluster_layers, + 'comment_texts': comment_texts, + 'comment_ids': comment_ids, + 'source': 'dynamo_pre_processed' + } + + except Exception as e: + logger.error(f"Error loading pre-processed UMAP data: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + + except Exception as e: + logger.error(f"Error checking for pre-processed data: {e}") + import traceback + logger.error(traceback.format_exc()) + return None + +def process_conversation(zid, export_dynamo=True, use_ollama=False, use_precomputed=True, skip_postgres=False, postgres_for_comments_only=True): """ Main function to process a conversation and generate visualizations. @@ -1118,28 +1891,17 @@ def process_conversation(zid, export_dynamo=True, use_ollama=False): zid: Conversation ID export_dynamo: Whether to export results to DynamoDB use_ollama: Whether to use Ollama for topic naming + use_precomputed: Whether to try using pre-computed data from DynamoDB + skip_postgres: Whether to skip PostgreSQL entirely + postgres_for_comments_only: If True, still use PostgreSQL for comment texts even if skip_postgres is True """ # Create conversation directory output_dir = os.path.join("polis_data", str(zid), "python_output", "comments_enhanced_multilayer") os.makedirs(output_dir, exist_ok=True) - # Fetch data from PostgreSQL - comments, metadata = fetch_conversation_data(zid) - if not comments: - logger.error("Failed to fetch conversation data.") - return False - - conversation_id = str(zid) - conversation_name = metadata.get('conversation_name', f"Conversation {zid}") - - # Process comments - document_map, document_vectors, cluster_layers, comment_texts, comment_ids = process_comments( - comments, conversation_id - ) - # Initialize DynamoDB storage if requested dynamo_storage = None - if export_dynamo: + if export_dynamo or use_precomputed: # Use endpoint from environment if available endpoint_url = os.environ.get('DYNAMODB_ENDPOINT') logger.info(f"Using DynamoDB endpoint from environment: {endpoint_url}") @@ -1148,6 +1910,81 @@ def process_conversation(zid, export_dynamo=True, use_ollama=False): region_name='us-west-2', endpoint_url=endpoint_url ) + + # Try to load pre-processed data if requested + precomputed_data = None + if use_precomputed and dynamo_storage: + precomputed_data = load_processed_data_from_dynamo( + dynamo_storage, + str(zid), + skip_postgres=skip_postgres, + postgres_for_comments_only=postgres_for_comments_only + ) + + # If we have pre-computed data, use it directly + if precomputed_data: + logger.info("Using pre-computed data from DynamoDB") + document_map = precomputed_data['document_map'] + document_vectors = precomputed_data['document_vectors'] + cluster_layers = precomputed_data['cluster_layers'] + comment_texts = precomputed_data['comment_texts'] + comment_ids = precomputed_data['comment_ids'] + + # Print debug information about array shapes + logger.info(f"Precomputed document_vectors shape: {document_vectors.shape}") + logger.info(f"Precomputed document_map shape: {document_map.shape}") + + # If both document_vectors and document_map are empty, we need to create synthetic data + if len(document_vectors) == 0 and len(comment_texts) > 0: + logger.warning(f"Precomputed document_vectors is empty but we have {len(comment_texts)} comments. Generating synthetic embeddings...") + try: + # Create synthetic random embeddings (just for visualization) + document_vectors = np.random.rand(len(comment_texts), 384) # Standard embedding dimension + logger.info(f"Generated synthetic document_vectors with shape: {document_vectors.shape}") + except Exception as e: + logger.error(f"Failed to generate synthetic embeddings: {e}") + + # If document_map is empty but we have document_vectors, generate a new UMAP projection + if len(document_map) == 0 and len(document_vectors) > 0: + logger.warning("Precomputed document_map is empty but document_vectors exists. Generating UMAP projection...") + try: + from umap import UMAP + document_map = UMAP(n_components=2, metric='cosine', random_state=42).fit_transform(document_vectors) + logger.info(f"Generated document_map with shape: {document_map.shape}") + except Exception as e: + logger.error(f"Failed to generate UMAP projection: {e}") + + # If UMAP fails or isn't available, fall back to random 2D points + if len(document_map) == 0 and len(document_vectors) > 0: + logger.warning("Falling back to random 2D points for visualization...") + document_map = np.random.rand(len(document_vectors), 2) * 10 # Scale up for visibility + logger.info(f"Generated random document_map with shape: {document_map.shape}") + + # Get necessary metadata + conversation_id = str(zid) + + # Still need to fetch basic metadata for conversation name + _, metadata = fetch_conversation_data(zid, skip_postgres=skip_postgres, postgres_for_comments_only=postgres_for_comments_only) + conversation_name = metadata.get('conversation_name', f"Conversation {zid}") + + # Add source information to metadata + metadata['source'] = 'dynamo_pre_processed' + metadata['precomputed'] = True + else: + # Fallback to standard processing path + # Fetch data from PostgreSQL or DynamoDB + comments, metadata = fetch_conversation_data(zid, skip_postgres=skip_postgres, postgres_for_comments_only=postgres_for_comments_only) + if not comments: + logger.error("Failed to fetch conversation data.") + return False + + conversation_id = str(zid) + conversation_name = metadata.get('conversation_name', f"Conversation {zid}") + + # Process comments + document_map, document_vectors, cluster_layers, comment_texts, comment_ids = process_comments( + comments, conversation_id + ) # Store basic data in DynamoDB logger.info(f"Storing basic data in DynamoDB for conversation {conversation_id}...") @@ -1162,11 +1999,12 @@ def process_conversation(zid, export_dynamo=True, use_ollama=False): ) dynamo_storage.create_conversation_meta(conversation_meta) - # Store embeddings - logger.info("Storing comment embeddings...") + # Store embeddings with UMAP coordinates + logger.info("Storing comment embeddings with UMAP coordinates...") embedding_models = DataConverter.batch_convert_embeddings( conversation_id, - document_vectors + document_vectors, + document_map # Pass document_map to the converter to store UMAP coordinates ) result = dynamo_storage.batch_create_comment_embeddings(embedding_models) logger.info(f"Stored {result['success']} embeddings with {result['failure']} failures") @@ -1218,7 +2056,20 @@ def process_conversation(zid, export_dynamo=True, use_ollama=False): # Save metadata with open(os.path.join(output_dir, f"{conversation_id}_metadata.json"), 'w') as f: - json.dump(metadata, f, indent=2) + # Custom encoder to handle Decimal, numpy types, etc. + class CustomJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, decimal.Decimal): + return float(obj) + if isinstance(obj, (np.int64, np.int32, np.int16, np.int8)): + return int(obj) + if isinstance(obj, (np.float64, np.float32, np.float16)): + return float(obj) + if isinstance(obj, np.ndarray): + return obj.tolist() + return super().default(obj) + + json.dump(metadata, f, indent=2, cls=CustomJSONEncoder) logger.info(f"Processing of conversation {conversation_id} complete!") @@ -1247,6 +2098,12 @@ def main(): help='Use mock data instead of connecting to PostgreSQL') parser.add_argument('--use-ollama', action='store_true', help='Use Ollama for topic naming') + parser.add_argument('--use-precomputed', action='store_true', + help='Use pre-computed data from DynamoDB if available') + parser.add_argument('--use-dynamodb', action='store_true', + help='Prioritize DynamoDB for data source over PostgreSQL') + parser.add_argument('--skip-postgres-load', action='store_true', + help='Skip PostgreSQL load entirely, only use DynamoDB') args = parser.parse_args() @@ -1259,10 +2116,29 @@ def main(): db_password=args.db_password ) - # Log Ollama usage + # Log parameter usage if args.use_ollama: logger.info("Ollama will be used for topic naming") + if args.use_precomputed: + logger.info("Will attempt to use pre-computed data from DynamoDB") + + if args.use_dynamodb: + logger.info("DynamoDB will be prioritized as data source") + + if args.skip_postgres_load: + logger.info("PostgreSQL load will be skipped entirely") + + # DynamoDB is used by default - set environment variables + os.environ['PREFER_DYNAMODB'] = 'true' + os.environ['USE_DYNAMODB'] = 'true' + + # Only disable DynamoDB if explicitly requested (for backward compatibility) + if args.no_dynamo: + logger.warning("DEPRECATED: PostgreSQL-only mode is deprecated and will be removed in a future release.") + os.environ['PREFER_DYNAMODB'] = 'false' + os.environ['USE_DYNAMODB'] = 'false' + # Process conversation if args.use_mock_data: logger.info("Using mock data instead of connecting to PostgreSQL") @@ -1297,14 +2173,23 @@ def main(): # Store in DynamoDB if requested if not args.no_dynamo: - store_in_dynamo( - str(args.zid), - document_vectors, - document_map, - cluster_layers, - mock_comments, - comment_ids + # Warning: store_in_dynamo method doesn't exist and is left here from earlier version + # Use DynamoDBStorage to upload mock data + dynamo_storage = DynamoDBStorage( + region_name='us-west-2', + endpoint_url=os.environ.get('DYNAMODB_ENDPOINT') ) + + # Prepare and upload conversation config + conversation_meta = DataConverter.create_conversation_meta( + str(args.zid), + document_vectors, + cluster_layers, + mock_metadata + ) + dynamo_storage.create_conversation_meta(conversation_meta) + + logger.info("Mock data stored in DynamoDB") # Process each layer and create visualizations output_dir = os.path.join("polis_data", str(args.zid), "python_output", "comments_enhanced_multilayer") @@ -1320,8 +2205,26 @@ def main(): use_ollama=args.use_ollama ) else: - # Process with real data from PostgreSQL - process_conversation(args.zid, export_dynamo=not args.no_dynamo, use_ollama=args.use_ollama) + # By default, still use PostgreSQL for comment texts even if using DynamoDB for everything else + postgres_for_comments_only = not args.skip_postgres_load + + # Process with DynamoDB by default, fallback to PostgreSQL if needed + success = process_conversation( + args.zid, + export_dynamo=True, # Always export to DynamoDB + use_ollama=args.use_ollama, + use_precomputed=True, # Always try to use precomputed data + skip_postgres=args.skip_postgres_load or os.environ.get('PREFER_DYNAMODB') == 'true', + postgres_for_comments_only=postgres_for_comments_only + ) + + # Report success or failure + if success: + logger.info(f"Successfully processed conversation {args.zid}") + sys.exit(0) + else: + logger.error(f"Failed to process conversation {args.zid}") + sys.exit(1) if __name__ == "__main__": main() \ No newline at end of file