def _run_git(args, cwd):
    """Run ``git`` with *args* inside *cwd* and return its decoded stdout.

    The working directory is handed to the child process through ``cwd``
    rather than ``os.chdir`` so the interpreter's own working directory is
    never modified.

    Raises:
        subprocess.CalledProcessError: git exited with a non-zero status.
        OSError: git could not be started (not installed, or *cwd* invalid).
    """
    return subprocess.check_output(["git", *args], cwd=cwd, text=True)


def _git_paths(args, cwd):
    """Return the non-empty lines printed by ``git <args>`` (one path per line)."""
    return [path for path in _run_git(args, cwd).split("\n") if path]


def get_changed_files(directory):
    """
    Return the files with local changes in the git work tree at *directory*.

    Unstaged, staged and untracked (not ignored) files are collected, in that
    order, with duplicates removed. Paths are relative to *directory*.

    Args:
        directory: Path of a directory inside a git work tree.

    Returns:
        A list of path strings. The list is empty when nothing changed, when
        *directory* is not a git repository, or when git could not be run;
        errors are printed, never raised.
    """
    try:
        print(f"Executing git commands in {directory}")

        # Check if this is a git repository
        try:
            _run_git(["rev-parse", "--is-inside-work-tree"], directory)
        except subprocess.CalledProcessError:
            print(f"Error: {directory} is not a git repository")
            return []

        try:
            # --relative makes `git diff` print paths relative to `directory`,
            # matching what `git ls-files --others` prints. Without it a scan
            # of a sub-directory mixes root-relative and cwd-relative paths.
            unstaged = _git_paths(["diff", "--relative", "--name-only"], directory)
            staged = _git_paths(
                ["diff", "--staged", "--relative", "--name-only"], directory
            )
            untracked = _git_paths(
                ["ls-files", "--others", "--exclude-standard"], directory
            )
        except subprocess.CalledProcessError as e:
            print(f"Error executing git command: {e}")
            print("Detected 0 changed files")
            return []

        print(
            f"Unstaged: {len(unstaged)}, Staged: {len(staged)}, "
            f"Untracked: {len(untracked)}"
        )

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # result is deterministic (a set would shuffle it between runs).
        changed_files = list(dict.fromkeys(unstaged + staged + untracked))
        print(f"Detected {len(changed_files)} changed files")
        return changed_files
    except Exception as e:
        # Top-level boundary: callers expect a list, never an exception.
        print(f"Unexpected error getting changed files: {e}")
        import traceback
        traceback.print_exc()
        return []


def _tracked_diff_section(directory, file):
    """Return the colored unstaged + staged diff of *file* as text chunks.

    Both diffs are included so staged hunks are not lost when the same file
    also has unstaged edits. Returns ``[]`` when git reports no difference.
    """
    chunks = []
    for label, extra_args in (("unstaged", []), ("staged", ["--staged"])):
        try:
            diff = _run_git(["diff", *extra_args, "--", file], directory)
        except subprocess.CalledProcessError as e:
            print(f"Error getting {label} diff for {file}: {e}")
            continue
        if diff.strip():
            print(f"Found {label} changes for {file}")
            chunks.extend(color_diff_line(line) + "\n" for line in diff.splitlines())
    if chunks:
        chunks.insert(0, f"\nFile: {color_text(file, '34')}\n")
    return chunks


def _new_file_section(directory, file):
    """Render the untracked *file* as an all-additions diff (``[]`` if unreadable)."""
    try:
        with open(os.path.join(directory, file), "r") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading untracked file {file}: {e}")
        return []

    # Format as a diff for a new file
    chunks = [
        f"\nFile: {color_text(file, '34')} (New File)\n",
        f"diff --git a/{file} b/{file}\n",
        "new file mode 100644\n",
        "--- /dev/null\n",
        f"+++ b/{file}\n",
    ]
    # Add each line with a + to indicate addition
    chunks.extend(color_diff_line("+" + line) + "\n" for line in content.splitlines())
    return chunks


def _full_content_section(directory, file):
    """Return the raw content of *file* as a last-resort section (``[]`` if unreadable)."""
    try:
        with open(os.path.join(directory, file), "r") as f:
            content = f.read()
    except (OSError, UnicodeDecodeError) as e:
        print(f"Error reading file {file}: {e}")
        return []
    chunks = [f"\nFile: {color_text(file, '34')} (Full content - no diff available)\n"]
    chunks.extend(line + "\n" for line in content.splitlines())
    return chunks


def get_line_changes(directory, changed_files):
    """
    Returns a string containing colored line changes from the changed files.

    For every file the first source that yields text is used:
      1. its unstaged and staged ``git diff`` output,
      2. for untracked files, the whole content rendered as a new-file diff,
      3. otherwise the raw file content.

    Args:
        directory: Directory inside the git work tree; git runs there and
            file paths are resolved against it.
        changed_files: Paths relative to *directory*.

    Returns:
        The concatenated sections, or ``""`` when nothing could be collected.
        Errors are printed, never raised.
    """
    chunks = []
    try:
        print(f"Getting line changes in {directory}")

        # The untracked list does not depend on the file being processed, so
        # ask git once and use a set for O(1) membership tests.
        untracked = set()
        if changed_files:
            try:
                untracked = set(
                    _git_paths(["ls-files", "--others", "--exclude-standard"], directory)
                )
            except subprocess.CalledProcessError as e:
                print(f"Error checking untracked files: {e}")

        for file in changed_files:
            print(f"Processing file: {file}")

            section = _tracked_diff_section(directory, file)

            if not section and file in untracked:
                print(f"{file} is an untracked file, including full content")
                section = _new_file_section(directory, file)

            # If still no changes found, this is unexpected
            if not section:
                print(
                    f"Warning: No changes found for {file} despite it being "
                    "in the changed files list"
                )
                section = _full_content_section(directory, file)

            chunks.extend(section)
    except Exception as e:
        # Top-level boundary: keep whatever was collected so far.
        print(f"Unexpected error in get_line_changes: {e}")
        import traceback
        traceback.print_exc()

    line_changes = "".join(chunks)
    if not line_changes.strip():
        print("Warning: No line changes were detected for any files")

    return line_changes
Don't overly focus on one file, and instead provide the top security concerns based on what you think the entire application is doing."}, - {"role": "user", "content": application_summary} - ] + model=model, + messages=[ + {"role": "system", "content": "You are an application security expert."}, + {"role": "user", "content": "Please review the following code for security vulnerabilities: " + application_summary} + ], + max_tokens=1000, + temperature=0.7, ) - message = response.choices[0].message.content + message = response.choices[0].message.content.strip() return message except Exception as e: return f"Error occurred: {e}" - + def full_health_scan(application_summary, model): """ This function sends a code snippet to OpenAI's API to check for optimizations. @@ -124,13 +230,15 @@ def full_health_scan(application_summary, model): else: try: response = client.chat.completions.create( - model=model, # Choose the appropriate engine - messages=[ - {"role": "system", "content": "You are a world class 10x developer who gives kind suggestions for remediating code smells and optimizing for big O complexity. You will receive the full code for an application. Your task is to review the code for optimizations and improvements, calling out the major bottlenecks. 
def _list_files_with_line_counts(directory):
    """Return one ``"<path> (<n> lines)"`` entry per file below *directory*.

    Files that cannot be read as text are still listed, with the error text
    in place of the line count.
    """
    ignored_dirs = {".git", ".hg", ".svn", "__pycache__"}
    entries = []
    for root, dirs, files in os.walk(directory):
        # Prune in place so os.walk never descends into VCS metadata or
        # bytecode caches; their contents would only bloat the prompt.
        dirs[:] = [d for d in dirs if d not in ignored_dirs]
        for file in files:
            file_path = os.path.join(root, file)
            try:
                with open(file_path, 'r') as f:
                    # Count lazily instead of materialising every line.
                    line_count = sum(1 for _ in f)
            except (OSError, UnicodeDecodeError) as e:
                entries.append(f"{file_path} (error reading file: {str(e)})")
            else:
                entries.append(f"{file_path} ({line_count} lines)")
    return entries


async def full_agent_scan(directory, model, health=False):
    """
    Runs the agent-based full scan over every file below *directory*.

    A listing of all files (path plus line count) is sent to the
    ``full_context_file_parser`` agent, which is given the code-gatherer,
    security and health agents as tools together with ``gather_full_code``.

    Args:
        directory: Root directory to list.
        model: Not used by this function; kept for signature parity with the
            other scan entry points.
        health: Not used by this function; kept for signature parity.

    Returns:
        Whatever ``Runner.run`` returns on success, or a red error string
        (via ``color_text``) when the agent run raises.
    """
    application_summary = "\n".join(_list_files_with_line_counts(directory))

    prompt = "Here are all of the files in this application: " + application_summary
    try:
        print("Sending to context agent...")
        security_tool = workers.security_agent.as_tool(
            tool_name="security_agent",
            tool_description="Specialist in evaluating code for security issues."
        )
        health_tool = workers.health_agent.as_tool(
            tool_name="health_agent",
            tool_description="Specialist in evaluating code for health issues."
        )
        full_context_code_gatherer = workers.full_context_agent_code.as_tool(
            tool_name="full_context_agent_code",
            tool_description="Specialist in evaluating code for security and health issues."
        )
        full_context_with_tools = workers.full_context_file_parser.clone(
            tools=[
                full_context_code_gatherer,
                security_tool,
                health_tool,
                workers.gather_full_code,
            ]
        )
        result = await Runner.run(full_context_with_tools, prompt)

        print("Received response from full context agent")

        return result
    except Exception as e:
        # Top-level boundary for the CLI: report and return a printable error.
        print(f"Error in context agent: {e}")
        import traceback
        traceback.print_exc()
        return color_text(f"Error during analysis: {str(e)}", "31")
Focus the most on the code that is being changed, which starts with Detailed Line Changes, instead of Changed Files."}, - {"role": "user", "content": application_summary} - ] + model=model, + messages=[ + {"role": "system", "content": "You are an application security expert."}, + {"role": "user", "content": "Please review the following code changes for security vulnerabilities: " + application_summary} + ], + max_tokens=1000, + temperature=0.7, ) - message = response.choices[0].message.content + message = response.choices[0].message.content.strip() return message except Exception as e: return f"Error occurred: {e}" - + def partial_health_scan(application_summary, model): """ This function sends a code snippet to OpenAI's API to check for code optimizations. @@ -204,16 +355,16 @@ def partial_health_scan(application_summary, model): return f"Error occurred: {e}" else: try: - print("Waiting for response from AI...") - # Send the request response = client.chat.completions.create( - model=model, # Choose the appropriate engine - messages=[ - {"role": "system", "content": "You are a world class 10x developer who gives kind suggestions for remediating code smells and optimizing for big O complexity. You will receive changed code as part of a pull request, followed by the rest of the file. Your task is to review the changed code for optimizations and improvements, calling out any potential slowdowns. Pay attention to if the code is getting added or removed indicated by the + or - at the beginning of the line. 
async def partial_agent_scan(directory, model, health=False):
    """
    Runs the agent-based scan over the files changed locally in *directory*.

    The changed files and their line changes are printed, then sent to the
    context agent, which is given the security and health agents as tools
    together with ``analyze_code_context``.

    Args:
        directory: Directory inside the git work tree to inspect.
        model: Not used by this function; kept for signature parity with the
            other scan entry points.
        health: Not used by this function; kept for signature parity.

    Returns:
        Whatever ``Runner.run`` returns on success; otherwise a red message
        (via ``color_text``) saying there is nothing to scan or describing
        the error.
    """
    import re

    # Retrieve names of changed files
    changed_files = get_changed_files(directory)
    if not changed_files:  # covers both None and an empty list
        print("Debug: get_changed_files returned:", changed_files)
        return color_text("You haven't made any changes to test.", "31")

    # Print names of changed files in blue
    print(color_text("Changed Files:", "34"))
    for file_path in changed_files:
        print(color_text(file_path, "34"))

    # Retrieve and print changed lines of code
    line_changes = get_line_changes(directory, changed_files)
    if not line_changes.strip():
        return color_text("No changed lines to scan.", "31")  # Red text for errors
    print(color_text("\nChanged Code for Analysis:\n", "32") + line_changes)  # Don't double-color the lines

    # NOTE(review): color_text / color_diff_line are defined elsewhere; the
    # "31"/"32"/"34" codes passed to them suggest ANSI SGR escape sequences.
    # Those are terminal-only noise for the model, so they are stripped from
    # the prompt. The substitution is a no-op if no such sequences exist.
    plain_changes = re.sub(r"\x1b\[[0-9;]*m", "", line_changes)

    # Prepare the summary for scanning
    changes_summary = (
        "Detailed Line Changes:\n" + plain_changes
        + "\n\nChanged Files:\n" + "\n".join(changed_files)
    )
    print("Starting partial scan...")

    prompt = "Please analyze these code changes: \n\n" + changes_summary

    try:
        print("Sending to context agent...")
        security_tool = workers.security_agent.as_tool(
            tool_name="security_agent",
            tool_description="Specialist in evaluating code for security issues."
        )
        health_tool = workers.health_agent.as_tool(
            tool_name="health_agent",
            tool_description="Specialist in evaluating code for health issues."
        )
        context_with_tools = workers.context_agent.clone(
            tools=[security_tool, health_tool, workers.analyze_code_context]
        )
        result = await Runner.run(context_with_tools, prompt)
        print("Received response from context agent")

        return result
    except Exception as e:
        # Top-level boundary for the CLI: report and return a printable error.
        print(f"Error in context agent: {e}")
        import traceback
        traceback.print_exc()
        return color_text(f"Error during analysis: {str(e)}", "31")
@@ -337,7 +537,8 @@ def main(): mode = sys.argv[1] # Set the default model based on the mode - default_model = 'gpt-4-1106-preview' if mode == 'full' else 'gpt-3.5-turbo' + default_model = 'gpt-4o' if mode == 'full' else 'gpt-4o' + print("Running in mode:", mode, "with model:", default_model) # Set up argparse for the --model argument with the conditional default parser = argparse.ArgumentParser(add_help=False) @@ -353,6 +554,19 @@ def main(): directory = remaining_argv[0] print(full_scan(directory, model=args.model, health=args.health)) + elif mode == 'full-agentic': + if len(remaining_argv) < 1: + print("Usage for full scan: latio full-agentic ") + sys.exit(1) + directory = remaining_argv[0] + try: + result = asyncio.run(full_agent_scan(directory, model=args.model, health=args.health)) + print(result) + except Exception as e: + print(f"Error during partial scan: {e}") + import traceback + traceback.print_exc() + elif mode == 'github': if len(remaining_argv) < 2: print("Usage for partial scan: latio partial ") @@ -362,6 +576,20 @@ def main(): github_token = os.environ.get('GITHUB_TOKEN') print(github_scan(repo_name, pr_number, github_token, model=args.model, health=args.health)) + elif mode == 'partial-agentic': + if len(remaining_argv) < 1: + print("Usage for full scan: latio partial ") + sys.exit(1) + directory = remaining_argv[0] + # Use asyncio.run to execute the async function + try: + result = asyncio.run(partial_agent_scan(directory, model=args.model, health=args.health)) + print(result) + except Exception as e: + print(f"Error during partial scan: {e}") + import traceback + traceback.print_exc() + elif mode == 'partial': if len(remaining_argv) < 1: print("Usage for full scan: latio partial ") @@ -380,4 +608,8 @@ def main(): else: print("Invalid mode. 
"""Agent and tool definitions used by the agentic scan modes."""
from agents import Agent, function_tool, Runner
# NOTE(review): draw_graph and handoff_filters are not referenced in this
# module; confirm the visualization extension's optional dependencies are
# installed wherever this module is imported.
from agents.extensions.visualization import draw_graph
from agents.extensions import handoff_filters
from agents.extensions.handoff_prompt import RECOMMENDED_PROMPT_PREFIX
import subprocess
import os
from typing import List, Dict, Set

# Directory names never descended into while collecting markdown context.
_SKIPPED_DIR_NAMES = frozenset({".git", ".hg", ".svn", "__pycache__", "node_modules"})


def _workspace_root() -> str:
    """Return the absolute path three directory levels above this file.

    NOTE(review): this is derived from the module's own location, not from
    the directory being scanned - confirm that is the intended root when the
    tool is run against another repository.
    """
    return os.path.abspath(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))


def _collect_markdown(workspace_root: str) -> str:
    """Concatenate every ``.md`` file below *workspace_root* (best effort)."""
    documents = []
    try:
        for root, dirs, files in os.walk(workspace_root):
            # Prune in place so VCS metadata and dependency trees are skipped.
            dirs[:] = [d for d in dirs if d not in _SKIPPED_DIR_NAMES]
            for file in files:
                if not file.endswith(".md"):
                    continue
                file_path = os.path.join(root, file)
                try:
                    with open(file_path, 'r') as f:
                        documents.append(f.read() + "\n")
                except Exception as e:
                    print(f"Warning: Error reading markdown file {file_path}: {str(e)}")
    except Exception as e:
        print(f"Warning: Error walking directory: {str(e)}")
    return "".join(documents)


@function_tool
async def analyze_code_context(function_changes: List[str], changed_files: List[str]) -> str:
    """
    Takes in a list of files and line changes and returns any relevant file details and application context.

    Args:
        function_changes: Descriptions of the changed functions.
        changed_files: Paths of the changed files, relative to the workspace root.
    """
    # Get the file contents
    print("Changed files:", changed_files)
    file_contents = {}

    workspace_root = _workspace_root()
    print("Workspace root:", workspace_root)

    for file in changed_files:
        # Construct absolute path for the file
        file_path = os.path.join(workspace_root, file)
        try:
            print(f"Attempting to read file: {file_path}")
            with open(file_path, 'r') as f:
                file_contents[file] = f.read()
        except FileNotFoundError:
            print(f"Warning: File {file_path} not found")
        except Exception as e:
            print(f"Warning: Error reading file {file_path}: {str(e)}")

    # Get the codebase info by searching the codebase for any .md files
    codebase_info = _collect_markdown(workspace_root)

    # No trailing comma after the call: it would bind a 1-tuple, not an Agent.
    app_context_agent = Agent(
        name="App Context Agent",
    )
    context_info_prompt = (
        "You are a developer with a deep understanding of the codebase and the latest best practices. "
        "You will receive information about a codebase, changed functions, and file details. "
        "Your job is to summarize the application context, including the overall purpose of the application, "
        "the overall architecture, and the overall codebase. "
        "Here is some information about the codebase and what it's doing: " + str(codebase_info)
        + "\n Here is the file contents: " + str(file_contents)
        + "\n Here is the function changes: " + str(function_changes)
    )
    # Runner.run is a coroutine: it must be awaited, otherwise the tool would
    # hand an un-run coroutine object back instead of the agent's answer.
    app_context = await Runner.run(app_context_agent, context_info_prompt)
    return str(app_context.final_output)


@function_tool
def gather_full_code(changed_files: List[str]) -> dict[str, str]:
    """
    Takes in a list of file paths and returns the full content of each readable file.
    Each line in the returned file contents will be prefixed with its line number.

    Args:
        changed_files: Paths of the files to read, relative to the workspace root.
    """
    # Get the file contents
    print("Analyzing files:", changed_files)
    file_contents = {}

    workspace_root = _workspace_root()
    print("Workspace root:", workspace_root)

    for file in changed_files:
        # Construct absolute path for the file
        file_path = os.path.join(workspace_root, file)
        try:
            with open(file_path, 'r') as f:
                print(f"Reading file: {file_path}")
                # Prefix every line with its 1-based line number.
                file_contents[file] = ''.join(
                    f"{number}: {line}" for number, line in enumerate(f, start=1)
                )
        except FileNotFoundError:
            print(f"Warning: File {file_path} not found")
        except Exception as e:
            print(f"Warning: Error reading file {file_path}: {str(e)}")

    return file_contents


# The prompts below are f-strings where they embed RECOMMENDED_PROMPT_PREFIX,
# so the constant's text is substituted instead of the literal placeholder,
# and every sentence ends with an explicit space so adjacent literals do not
# run together.
security_agent = Agent(
    name="Security Agent",
    handoff_description="Specialist in evaluating code for security issues.",
    instructions=(
        f"{RECOMMENDED_PROMPT_PREFIX}\n"
        "You are a super friendly security expert with a deep understanding of the codebase and the latest security best practices.\n"
        "You will be given a list of files and code snippets to evaluate for security issues, as well as additional context about the codebase.\n"
        "Give the user a short summary of the security issues you found, the files they were found in, the lines of code that are affected, and some fix guidance with an example specific to the user's code.\n"
    ),
)

health_agent = Agent(
    name="Health Agent",
    handoff_description="Specialist in evaluating code for health issues.",
    instructions=(
        "You are a 10x developer with a deep understanding of the codebase and the latest health best practices. "
        "You will be given a list of files and code snippets to evaluate for health issues, as well as additional context about the codebase. "
        "Give the user a short summary of the health issues you found, the files they were found in, the lines of code that are affected, and some fix guidance with an example."
    ),
)

context_agent = Agent(
    name="Context Agent",
    handoff_description="Specialist in evaluating code for security and health issues.",
    instructions=(
        "You are a coding expert with a deep understanding of the codebase and the latest security and health best practices. "
        "You will be given a list of files and lines of code that have been changed in a pull request. You will first find all relevant code and files related to the changes. "
        "The analyze_code_context function takes in a list of function changes based on the line changes you're seeing, as well as their file paths, and returns a summary of the relevant code and files. "
        "This will be a lot of information to process, so condense this information for the security and health agents: what the application is generally doing, what the files are doing in the context of the application, and what the function changes are doing in the context of the files. "
        "Then, based on the relevant code you find, you will hand off to the security agent or the health agent. "
        # The original sentence stopped after "code changes"; completed here.
        "It is essential that the original code changes are included in the handoff. "
        "If there are potential security issues to investigate, handoff to the security agent. "
        "If there are potential health issues to investigate, handoff to the health agent. "
        "If there are no issues, return a message to the user that the pull request is good to go."
    ),
    handoffs=[security_agent, health_agent],
    tools=[analyze_code_context],
)

full_context_agent_code = Agent(
    name="Full Context Agent Code Gatherer",
    handoff_description="Specialist in evaluating code for security and health issues.",
    instructions=(
        f"{RECOMMENDED_PROMPT_PREFIX}\n"
        "You are a coding expert with a deep understanding of the codebase and the latest security and health best practices.\n"
        # This agent's only tool is gather_full_code, so the prompt names it.
        "You will be given a list of files for analysis. You will first fetch all of the code for these files using the gather_full_code function.\n"
        "This will be a lot of information to process, so condense this information for the security and health agents: what the application is generally doing, what the files are doing in the context of the application, and the specific lines of code that are most relevant for analysis.\n"
        "If there are potential security issues to investigate, handoff to the security agent with the most relevant code.\n"
        "If there are potential health issues to investigate, handoff to the health agent with the most relevant code.\n"
        "If there are no issues, return a message to the user that the code has no issues.\n"
    ),
    handoffs=[security_agent, health_agent],
    tools=[gather_full_code],
)

full_context_file_parser = Agent(
    name="Full Context Agent File Parser",
    handoff_description="Specialist in evaluating code for security and health issues.",
    instructions=(
        "You are a coding expert with a deep understanding of the codebase and the latest security and health best practices. "
        "You are going to receive a list of files, return only the ones that seem the most relevant for security or health analysis. "
        "Then, you will make sure to drop any files that seem they will be larger than your context window. "
        "You will then hand off the relevant files to the full context agent code gatherer to analyze the code."
    ),
    handoffs=[full_context_agent_code],
)