diff --git a/proposal_processor/processor.py b/proposal_processor/processor.py index 18ca0ab..fc61033 100644 --- a/proposal_processor/processor.py +++ b/proposal_processor/processor.py @@ -14,15 +14,35 @@ from google.cloud import aiplatform import smtplib, time, os import logging +import grpc +from absl import logging as absl_logging +# Set environment variables os.environ['GRPC_ENABLE_FORK_SUPPORT'] = '0' +# Initialize logging before gRPC +absl_logging.set_verbosity(absl_logging.INFO) + +# Initialize base logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) +# Initialize gRPC +def init_grpc(): + try: + channel = grpc.insecure_channel('localhost:50051') + grpc.channel_ready_future(channel).result(timeout=10) + except grpc.FutureTimeoutError: + pass + except Exception as e: + logger.warning(f"gRPC initialization warning (non-critical): {str(e)}") + +# Call initialization +init_grpc() + class ProposalProcessor: def __init__( self, @@ -33,11 +53,13 @@ def __init__( gcp_project_id: Optional[str] = None, gcp_location: Optional[str] = None, credentials_path: Optional[str] = None, - email_config: Optional[TypedDict] = None + email_config: Optional[TypedDict] = None, + debug: bool = False ): logger.info("Initializing ProposalProcessor with provider: %s", llm_provider) self.wait_between_api_calls = 1 - self.wait_between_api_sections= 60 + self.wait_between_api_sections = 60 + self.debug = debug # Initialize LLM based on provider if llm_provider == "openai": @@ -82,11 +104,26 @@ def __init__( self.retriever = (self.vectorstore).as_retriever(search_kwargs={"k": 1}) + def export_debug_section(self, section: str, content: any, output_dir: str = "debug_output"): + """Export section content to a text file for debugging.""" + os.makedirs(output_dir, exist_ok=True) + filename = f"{section.lower().replace(' ', '_')}.txt" + filepath = os.path.join(output_dir, filename) + + with open(filepath, 'w', encoding='utf-8') as f: + if isinstance(content, list): + f.write("\n".join([doc.page_content for doc in content])) + else: + f.write(str(content)) + logger.info(f"Exported debug file: {filepath}") + def retrieve_opportunity_docs(self, state: TypedDict) -> TypedDict: logger.info("Retrieving opportunity documents") docs = self.retriever.get_relevant_documents( "opportunity requirements scope objectives criteria" ) + if self.debug: + self.export_debug_section("opportunity_docs", docs) logger.info("Retrieved %d opportunity documents", len(docs)) state["opportunity_docs"] = docs return state @@ -97,6 +134,8 @@ def retrieve_corporate_docs(self, state: TypedDict) -> TypedDict: docs = self.retriever.get_relevant_documents( "company overview history mission values" ) + if self.debug: + self.export_debug_section("corporate_docs", docs) logger.info("Retrieved %d corporate documents", len(docs)) state["corporate_docs"] = docs return state @@ -107,6 +146,8 @@ def retrieve_staff_docs(self, state: TypedDict) -> TypedDict: docs = self.retriever.get_relevant_documents( "staff profiles expertise qualifications experience" ) + if self.debug: + self.export_debug_section("staff_docs", docs) logger.info("Retrieved %d staff documents", len(docs)) state["staff_docs"] = docs return state @@ -117,6 +158,8 @@ def retrieve_capabilities_docs(self, state: TypedDict) -> TypedDict: opportunity_text = "\n".join([doc.page_content for doc in state["opportunity_docs"]]) query = f"capabilities and competencies relevant to: {opportunity_text}" docs = self.retriever.get_relevant_documents(query) + if self.debug: + self.export_debug_section("capabilities_docs", docs) logger.info("Retrieved %d capabilities documents", len(docs)) state["capabilities_docs"] = docs return state @@ -127,33 +170,20 @@ def retrieve_experience_docs(self, state: TypedDict) -> TypedDict: opportunity_text = "\n".join([doc.page_content for doc in state["opportunity_docs"]]) query = f"past projects and experience relevant to: {opportunity_text}" docs = self.retriever.get_relevant_documents(query) + if self.debug: + self.export_debug_section("experience_docs", docs) logger.info("Retrieved %d experience documents", len(docs)) state["experience_docs"] = docs return state - - def export_debug_section(section: str, content: any, output_dir: str = "debug_output"): - """Export section content to a text file for debugging.""" - os.makedirs(output_dir, exist_ok=True) - filename = f"{section.lower().replace(' ', '_')}.txt" - filepath = os.path.join(output_dir, filename) - - with open(filepath, 'w', encoding='utf-8') as f: - if isinstance(content, list): - f.write("\n".join([doc.page_content for doc in content])) - else: - f.write(str(content)) - logger.info(f"Exported debug file: {filepath}") def generate_section(self, state: TypedDict, section: str, docs_key: str) -> str: logger.info("Generating section: %s", section) templates = { - "corporate_overview": "Write a comprehensive corporate overview based on: {documents}" - #,"staff_profile": "Create detailed staff profiles highlighting relevant expertise based on: {documents}", - #"unique_valuation_propositions": "Identify and explain why Northramp is best for this opportunity based on: {documents}", - #"capabilities": "Describe capabilities relevant to the opportunity requirements based on: {documents}", - #"experience": "Detail relevant corporate experience and past projects based on: {documents}", - #"past_performance": "Summarize past performance and achievements based on: {documents}", - #"responses": "Provide specific responses to opportunity questions and requirements based on: {documents}" + "corporate_overview": "Write a comprehensive corporate overview based on: {documents}", + "staff_profile": "Create detailed staff profiles highlighting relevant expertise based on: {documents}", + "capabilities": "Describe capabilities relevant to the opportunity requirements based on: {documents}", + "experience": "Detail relevant corporate experience and past projects based on: {documents}", + "responses": "Provide specific responses to opportunity questions and requirements based on: {documents}" } prompt = PromptTemplate( @@ -163,23 +193,28 @@ def generate_section(self, state: TypedDict, section: str, docs_key: str) -> str docs_text = "\n".join([doc.page_content for doc in state[docs_key]]) chain = prompt | self.llm response = chain.invoke({"documents": docs_text}) - logger.info(response.content[:10]) - return response.content + if self.debug: + self.export_debug_section(f"generated_{section}", response.content) + logger.info(response.content[:100]) + return response def build_document(self, state: TypedDict) -> TypedDict: logger.info("Building document") sections = { - "corporate_overview": "corporate_docs" - #,"staff_profile": "staff_docs", - #"capabilities": "capabilities_docs", - #"experience": "experience_docs", - #"responses": "opportunity_docs" + "corporate_overview": "corporate_docs", + "staff_profile": "staff_docs", + "capabilities": "capabilities_docs", + "experience": "experience_docs", + "responses": "opportunity_docs" } content = {} for section, docs_key in sections.items(): logger.info("Generating section: %s", section) - content[section] = self.generate_section(state, section, docs_key) + response = self.generate_section(state, section, docs_key) + content[section] = response.content # Extract content from AIMessage + if self.debug: + self.export_debug_section(f"final_{section}", content[section]) time.sleep(self.wait_between_api_sections) pdf_path = "proposal_response.pdf" @@ -250,19 +285,18 @@ def build_graph(self) -> StateGraph: workflow.add_node("opportunity", self.retrieve_opportunity_docs) workflow.add_node("corporate", self.retrieve_corporate_docs) - # workflow.add_node("staff", self.retrieve_staff_docs) - # workflow.add_node("capabilities", self.retrieve_capabilities_docs) - # workflow.add_node("experience", self.retrieve_experience_docs) + workflow.add_node("staff", self.retrieve_staff_docs) + workflow.add_node("capabilities", self.retrieve_capabilities_docs) + workflow.add_node("experience", self.retrieve_experience_docs) workflow.add_node("build", self.build_document) workflow.add_node("email", self.send_email) workflow.add_edge(START, "opportunity") workflow.add_edge("opportunity", "corporate") - workflow.add_edge("corporate", "build") - # workflow.add_edge("corporate", "staff") - # workflow.add_edge("staff", "capabilities") - # workflow.add_edge("capabilities", "experience") - # workflow.add_edge("experience", "build") + workflow.add_edge("corporate", "staff") + workflow.add_edge("staff", "capabilities") + workflow.add_edge("capabilities", "experience") + workflow.add_edge("experience", "build") workflow.add_edge("build", "email") workflow.add_edge("email", END)