Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 72 additions & 38 deletions proposal_processor/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,35 @@
from google.cloud import aiplatform
import smtplib, time, os
import logging
import grpc
from absl import logging as absl_logging

# Set environment variables
# NOTE(review): this assignment runs after `import grpc` above; if gRPC reads
# GRPC_ENABLE_FORK_SUPPORT when it is imported, the setting may need to move
# ahead of that import to take effect — TODO confirm.
os.environ['GRPC_ENABLE_FORK_SUPPORT'] = '0'

# Initialize logging before gRPC
# Sets absl's verbosity threshold to INFO.
absl_logging.set_verbosity(absl_logging.INFO)

# Initialize base logging
# Configures the root logger at INFO with a timestamp/name/level/message format.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger used by the functions and methods in this file.
logger = logging.getLogger(__name__)

# Initialize gRPC
def init_grpc(target='localhost:50051', timeout=10):
    """Probe a gRPC endpoint once, never raising if it is unavailable.

    Opens an insecure channel to ``target`` and waits up to ``timeout``
    seconds for it to become ready. The probe is best-effort: a timeout is
    logged at debug level, any other failure is logged as a non-critical
    warning, and the channel is always closed afterwards so no connection
    resources are left behind.

    Args:
        target: ``host:port`` address of the gRPC endpoint to probe.
        timeout: Maximum number of seconds to wait for the channel to be ready.

    Returns:
        None.
    """
    channel = None
    try:
        channel = grpc.insecure_channel(target)
        grpc.channel_ready_future(channel).result(timeout=timeout)
    except grpc.FutureTimeoutError:
        # Deliberately non-fatal: record the timeout instead of silently
        # discarding it, so the condition is still discoverable in debug logs.
        logger.debug(
            "gRPC channel to %s not ready after %s seconds", target, timeout
        )
    except Exception as e:
        logger.warning(f"gRPC initialization warning (non-critical): {str(e)}")
    finally:
        # Release the channel explicitly rather than waiting for garbage
        # collection to tear it down.
        if channel is not None:
            channel.close()

# Call initialization
init_grpc()

class ProposalProcessor:
def __init__(
self,
Expand All @@ -33,11 +53,13 @@ def __init__(
gcp_project_id: Optional[str] = None,
gcp_location: Optional[str] = None,
credentials_path: Optional[str] = None,
email_config: Optional[TypedDict] = None
email_config: Optional[TypedDict] = None,
debug: bool = False
):
logger.info("Initializing ProposalProcessor with provider: %s", llm_provider)
self.wait_between_api_calls = 1
self.wait_between_api_sections= 60
self.wait_between_api_sections = 60
self.debug = debug

# Initialize LLM based on provider
if llm_provider == "openai":
Expand Down Expand Up @@ -82,11 +104,26 @@ def __init__(

self.retriever = (self.vectorstore).as_retriever(search_kwargs={"k": 1})

def export_debug_section(self, section: str, content: object, output_dir: str = "debug_output"):
    """Export section content to a text file for debugging.

    The file is written to ``<output_dir>/<section>.txt``, where the section
    name is lower-cased and spaces are replaced by underscores. An existing
    file with the same name is overwritten.

    Args:
        section: Label used to build the output filename.
        content: Either a list of items or any other object. For a list, each
            item's ``page_content`` attribute is written (falling back to
            ``str(item)`` when the attribute is absent), one item per line.
            Anything else is written as ``str(content)``.
        output_dir: Directory that receives the file; created if missing.

    Returns:
        None.
    """
    # Render the text before touching the filesystem so that a bad item
    # cannot leave a truncated, empty debug file behind.
    if isinstance(content, list):
        text = "\n".join(
            str(getattr(item, "page_content", item)) for item in content
        )
    else:
        text = str(content)

    os.makedirs(output_dir, exist_ok=True)
    filename = f"{section.lower().replace(' ', '_')}.txt"
    filepath = os.path.join(output_dir, filename)

    with open(filepath, 'w', encoding='utf-8') as f:
        f.write(text)
    logger.info("Exported debug file: %s", filepath)

def retrieve_opportunity_docs(self, state: TypedDict) -> TypedDict:
    """Fetch opportunity documents and store them on the workflow state.

    Queries ``self.retriever`` with a fixed opportunity search phrase, writes
    the hits to a debug file when ``self.debug`` is set, and records them
    under ``state["opportunity_docs"]``.

    Args:
        state: Mutable workflow state; updated in place.

    Returns:
        The same ``state`` object, now holding the retrieved documents.
    """
    logger.info("Retrieving opportunity documents")

    search_phrase = "opportunity requirements scope objectives criteria"
    matches = self.retriever.get_relevant_documents(search_phrase)

    if self.debug:
        self.export_debug_section("opportunity_docs", matches)

    logger.info("Retrieved %d opportunity documents", len(matches))
    state["opportunity_docs"] = matches
    return state
Expand All @@ -97,6 +134,8 @@ def retrieve_corporate_docs(self, state: TypedDict) -> TypedDict:
docs = self.retriever.get_relevant_documents(
"company overview history mission values"
)
if self.debug:
self.export_debug_section("corporate_docs", docs)
logger.info("Retrieved %d corporate documents", len(docs))
state["corporate_docs"] = docs
return state
Expand All @@ -107,6 +146,8 @@ def retrieve_staff_docs(self, state: TypedDict) -> TypedDict:
docs = self.retriever.get_relevant_documents(
"staff profiles expertise qualifications experience"
)
if self.debug:
self.export_debug_section("staff_docs", docs)
logger.info("Retrieved %d staff documents", len(docs))
state["staff_docs"] = docs
return state
Expand All @@ -117,6 +158,8 @@ def retrieve_capabilities_docs(self, state: TypedDict) -> TypedDict:
opportunity_text = "\n".join([doc.page_content for doc in state["opportunity_docs"]])
query = f"capabilities and competencies relevant to: {opportunity_text}"
docs = self.retriever.get_relevant_documents(query)
if self.debug:
self.export_debug_section("capabilities_docs", docs)
logger.info("Retrieved %d capabilities documents", len(docs))
state["capabilities_docs"] = docs
return state
Expand All @@ -127,33 +170,20 @@ def retrieve_experience_docs(self, state: TypedDict) -> TypedDict:
opportunity_text = "\n".join([doc.page_content for doc in state["opportunity_docs"]])
query = f"past projects and experience relevant to: {opportunity_text}"
docs = self.retriever.get_relevant_documents(query)
if self.debug:
self.export_debug_section("experience_docs", docs)
logger.info("Retrieved %d experience documents", len(docs))
state["experience_docs"] = docs
return state

def export_debug_section(section: str, content: any, output_dir: str = "debug_output"):
"""Export section content to a text file for debugging.

Writes to ``<output_dir>/<section>.txt`` (section lower-cased, spaces
replaced by underscores). A list is written as the ``page_content`` of each
item joined by newlines; anything else is written as ``str(content)``.
"""
# NOTE(review): this signature has no `self`, while the calls visible in this
# view use `self.export_debug_section(...)` and an earlier definition of the
# same name does take `self`. This looks like the superseded side of the diff;
# confirm it is removed rather than kept alongside the other definition.
os.makedirs(output_dir, exist_ok=True)
filename = f"{section.lower().replace(' ', '_')}.txt"
filepath = os.path.join(output_dir, filename)

# Opening in 'w' mode overwrites any previous export for the same section.
with open(filepath, 'w', encoding='utf-8') as f:
if isinstance(content, list):
f.write("\n".join([doc.page_content for doc in content]))
else:
f.write(str(content))
logger.info(f"Exported debug file: {filepath}")

def generate_section(self, state: TypedDict, section: str, docs_key: str) -> str:
logger.info("Generating section: %s", section)
templates = {
"corporate_overview": "Write a comprehensive corporate overview based on: {documents}"
#,"staff_profile": "Create detailed staff profiles highlighting relevant expertise based on: {documents}",
#"unique_valuation_propositions": "Identify and explain why Northramp is best for this opportunity based on: {documents}",
#"capabilities": "Describe capabilities relevant to the opportunity requirements based on: {documents}",
#"experience": "Detail relevant corporate experience and past projects based on: {documents}",
#"past_performance": "Summarize past performance and achievements based on: {documents}",
#"responses": "Provide specific responses to opportunity questions and requirements based on: {documents}"
"corporate_overview": "Write a comprehensive corporate overview based on: {documents}",
"staff_profile": "Create detailed staff profiles highlighting relevant expertise based on: {documents}",
"capabilities": "Describe capabilities relevant to the opportunity requirements based on: {documents}",
"experience": "Detail relevant corporate experience and past projects based on: {documents}",
"responses": "Provide specific responses to opportunity questions and requirements based on: {documents}"
}

prompt = PromptTemplate(
Expand All @@ -163,23 +193,28 @@ def generate_section(self, state: TypedDict, section: str, docs_key: str) -> str
docs_text = "\n".join([doc.page_content for doc in state[docs_key]])
chain = prompt | self.llm
response = chain.invoke({"documents": docs_text})
logger.info(response.content[:10])
return response.content
if self.debug:
self.export_debug_section(f"generated_{section}", response.content)
logger.info(response.content[:100])
return response

def build_document(self, state: TypedDict) -> TypedDict:
logger.info("Building document")
sections = {
"corporate_overview": "corporate_docs"
#,"staff_profile": "staff_docs",
#"capabilities": "capabilities_docs",
#"experience": "experience_docs",
#"responses": "opportunity_docs"
"corporate_overview": "corporate_docs",
"staff_profile": "staff_docs",
"capabilities": "capabilities_docs",
"experience": "experience_docs",
"responses": "opportunity_docs"
}

content = {}
for section, docs_key in sections.items():
logger.info("Generating section: %s", section)
content[section] = self.generate_section(state, section, docs_key)
response = self.generate_section(state, section, docs_key)
content[section] = response.content # Extract content from AIMessage
if self.debug:
self.export_debug_section(f"final_{section}", content[section])
time.sleep(self.wait_between_api_sections)

pdf_path = "proposal_response.pdf"
Expand Down Expand Up @@ -250,19 +285,18 @@ def build_graph(self) -> StateGraph:

workflow.add_node("opportunity", self.retrieve_opportunity_docs)
workflow.add_node("corporate", self.retrieve_corporate_docs)
# workflow.add_node("staff", self.retrieve_staff_docs)
# workflow.add_node("capabilities", self.retrieve_capabilities_docs)
# workflow.add_node("experience", self.retrieve_experience_docs)
workflow.add_node("staff", self.retrieve_staff_docs)
workflow.add_node("capabilities", self.retrieve_capabilities_docs)
workflow.add_node("experience", self.retrieve_experience_docs)
workflow.add_node("build", self.build_document)
workflow.add_node("email", self.send_email)

workflow.add_edge(START, "opportunity")
workflow.add_edge("opportunity", "corporate")
workflow.add_edge("corporate", "build")
# workflow.add_edge("corporate", "staff")
# workflow.add_edge("staff", "capabilities")
# workflow.add_edge("capabilities", "experience")
# workflow.add_edge("experience", "build")
workflow.add_edge("corporate", "staff")
workflow.add_edge("staff", "capabilities")
workflow.add_edge("capabilities", "experience")
workflow.add_edge("experience", "build")
workflow.add_edge("build", "email")
workflow.add_edge("email", END)

Expand Down