Skip to content

Commit fc3a4d6

Browse files
committed
improve index graph
1 parent c6753e6 commit fc3a4d6

6 files changed

Lines changed: 81 additions & 62 deletions

File tree

.env.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,6 @@ PINECONE_INDEX_NAME=...
2525

2626
## Mongo Atlas
2727
MONGODB_URI=... # Full connection string
28+
29+
## Index API key
30+
INDEX_API_KEY=...

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ dependencies = [
2222
"langchain-cohere>=0.2.4",
2323
"bs4>=0.0.2",
2424
"lxml>=5.3.0",
25+
"pinecone>=6.0.1",
2526
]
2627

2728
[project.optional-dependencies]

src/index_graph/configuration.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from __future__ import annotations
44

5-
from dataclasses import dataclass
5+
from dataclasses import dataclass, field
66

77
from shared.configuration import BaseConfiguration
88

@@ -18,4 +18,5 @@ class IndexConfiguration(BaseConfiguration):
1818
1919
This class defines the parameters needed for configuring the indexing and
2020
retrieval processes, including embedding model selection, retriever provider choice, and search parameters.
21-
"""
21+
"""
22+
api_key: str = field(default="", metadata={"description": "The API key for indexing documents."})

src/index_graph/graph.py

Lines changed: 61 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""This "graph" simply exposes an endpoint for a user to upload docs to be indexed."""
22

33
import asyncio
4+
import os
45
from typing import List, Optional
56

67
import requests
@@ -11,33 +12,35 @@
1112
from langgraph.graph import END, START, StateGraph
1213

1314
from index_graph.configuration import IndexConfiguration
14-
from index_graph.state import IndexState
15+
from index_graph.state import IndexState, InputState
1516
from shared import retrieval
1617

1718

18-
async def index_docs(
19-
state: IndexState, *, config: Optional[RunnableConfig] = None
20-
) -> dict[str, str]:
21-
"""Asynchronously index documents in the given state using the configured retriever.
22-
23-
This function takes the documents from the state, ensures they have a user ID,
24-
adds them to the retriever's index, and then signals for the documents to be
25-
deleted from the state.
26-
27-
If docs are not provided in the state, they will be loaded
28-
from the configuration.docs_file JSON file.
19+
def check_index_config(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
20+
"""Check the API key."""
21+
configuration = IndexConfiguration.from_runnable_config(config)
2922

30-
Args:
31-
state (IndexState): The current state containing documents and retriever.
32-
config (Optional[RunnableConfig]): Configuration for the indexing process.r
33-
"""
34-
if not config:
35-
raise ValueError("Configuration required to run index_docs.")
23+
if not configuration.api_key:
24+
raise ValueError("API key is required for document indexing.")
25+
26+
if configuration.api_key != os.getenv("INDEX_API_KEY"):
27+
raise ValueError("Authentication failed: Invalid API key provided.")
28+
29+
if configuration.retriever_provider != "pinecone":
30+
raise ValueError("Only Pinecone is currently supported for document indexing due to specific ID prefix requirements.")
31+
32+
return {}
3633

34+
async def get_sitemap_urls(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
35+
"""Get the URLs from the sitemap."""
3736
url = state.url_site_map
38-
39-
# Load and parse the sitemap
40-
37+
38+
headers = {
39+
"Accept": "application/xml",
40+
"User-Agent": "Mozilla/5.0 (compatible; LangChainBot/1.0)",
41+
}
42+
response = requests.get(url, headers=headers)
43+
sitemap_content = response.text
4144
headers = {
4245
"Accept": "application/xml",
4346
"User-Agent": "Mozilla/5.0 (compatible; LangChainBot/1.0)",
@@ -50,64 +53,67 @@ async def index_docs(
5053

5154
root = ET.fromstring(sitemap_content)
5255
# Extract all URLs, removing frequency and other metadata
53-
urls = [
56+
urls_to_index = [
5457
url.find("{http://www.sitemaps.org/schemas/sitemap/0.9}loc").text
5558
for url in root.findall("{http://www.sitemaps.org/schemas/sitemap/0.9}url")
5659
]
5760

58-
# Process each URL
59-
docs = []
61+
print(f"Found {len(urls_to_index)} URLs to index.")
6062

61-
# Convert synchronous function to async
62-
async def async_get_web_chunks(url):
63-
return await asyncio.get_event_loop().run_in_executor(
64-
None, get_web_chuncks, url
65-
)
63+
return {"urls_to_index": urls_to_index}
6664

67-
# Process all URLs in parallel
68-
chunk_tasks = [async_get_web_chunks(url) for url in urls]
69-
chunks_list = await asyncio.gather(*chunk_tasks)
65+
async def index_docs(
66+
state: IndexState, *, config: Optional[RunnableConfig] = None
67+
) -> dict[str, str]:
68+
"""Asynchronously index documents in the given state using the configured retriever.
7069
71-
# Flatten the list of lists into single docs list
72-
docs = [doc for chunks in chunks_list for doc in chunks]
70+
This function takes the documents from the state, ensures they have a user ID,
71+
adds them to the retriever's index, and then signals for the documents to be
72+
deleted from the state.
7373
74-
print(f"Indexing {len(docs)} documents from {url}.")
74+
If docs are not provided in the state, they will be loaded
75+
from the configuration.docs_file JSON file.
7576
76-
with retrieval.make_retriever(config) as retriever:
77-
batch_size = min(
78-
500, max(1, len(docs))
79-
) # Target 500 docs per batch, but handle smaller doc counts
80-
for i in range(0, len(docs), batch_size):
81-
batch = docs[i : i + batch_size]
82-
await retriever.aadd_documents(batch)
83-
print(
84-
f"Indexed batch {i//batch_size + 1} of {(len(docs) + batch_size - 1)//batch_size}"
85-
)
77+
Args:
78+
state (IndexState): The current state containing documents and retriever.
79+
config (Optional[RunnableConfig]): Configuration for the indexing process.r
80+
"""
81+
# Process all URLs in parallel
82+
chunk_tasks = [index_url(url, config) for url in state.urls_to_index]
83+
await asyncio.gather(*chunk_tasks)
8684

87-
return {"docs": docs}
85+
return {}
8886

8987

90-
def get_web_chuncks(path: str) -> List[Document]:
88+
async def index_url(url: str, config: IndexConfiguration) -> List[Document]:
9189
"""Index a web path."""
9290
loader = WebBaseLoader(
93-
web_paths=(path,),
91+
web_paths=(url,),
9492
)
9593
docs = loader.load()
96-
97-
print(f"Loaded {len(docs)} documents from {path}.")
98-
9994
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
100-
10195
docs = text_splitter.split_documents(docs)
10296

103-
print(f"Split {len(docs)} documents into chunks.")
97+
with retrieval.make_retriever(config) as retriever:
98+
99+
await retriever.vectorstore.aadd_texts(
100+
#namespace= "langgraph" if "langgraph" in url else "langchain",
101+
texts=[doc.page_content for doc in docs],
102+
metadatas=[doc.metadata for doc in docs],
103+
id_prefix=url,
104+
)
105+
104106
return docs
105107

106108

107109
# Define the graph
108-
builder = StateGraph(IndexState, config_schema=IndexConfiguration)
110+
builder = StateGraph(IndexState, input=InputState, config_schema=IndexConfiguration)
111+
builder.add_node(check_index_config)
109112
builder.add_node(index_docs)
110-
builder.add_edge(START, "index_docs")
113+
builder.add_node(get_sitemap_urls)
114+
builder.add_edge(START, "check_index_config")
115+
builder.add_edge("check_index_config", "get_sitemap_urls")
116+
builder.add_edge("get_sitemap_urls", "index_docs")
111117
builder.add_edge("index_docs", END)
112118
# Compile into a graph object that you can invoke and deploy.
113119
graph = builder.compile()

src/index_graph/state.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,26 @@
11
"""State management for the index graph."""
22

3-
from dataclasses import dataclass
3+
from dataclasses import dataclass, field
4+
5+
6+
@dataclass(kw_only=True)
7+
class InputState:
8+
"""The input state for the index graph."""
9+
10+
url_site_map: str
11+
"""The URL to the site map to index."""
412

513

614
# The index state defines the simple IO for the single-node index graph
715
@dataclass(kw_only=True)
8-
class IndexState:
16+
class IndexState(InputState):
917
"""Represents the state for document indexing and retrieval.
1018
1119
This class defines the structure of the index state, which includes
1220
the documents to be indexed and the retriever used for searching
1321
these documents.
1422
"""
1523

24+
urls_to_index: list[str] = field(default_factory=list)
25+
"""The URLs to index."""
1626

17-
url_site_map: str
18-
"""The URL to the site map to index."""

src/simple_rag/graph.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ async def generate(state: GraphState):
4545

4646
# RAG generation
4747
# Prompt
48-
prompt = hub.pull("self-rag")
48+
prompt = hub.pull("langchaindoc/simple-rag")
4949

5050
# LLM
5151
llm = ChatOpenAI(model_name="gpt-4o-mini", temperature=0)

0 commit comments

Comments
 (0)