+"""This "graph" simply exposes an endpoint for a user to upload docs to be indexed."""
+
 import asyncio
 import os
-import logging
 from typing import List, Optional
-from datetime import datetime
-from pathlib import Path
-import gc
-from pinecone import Index

 import requests
 from langchain_community.document_loaders import WebBaseLoader
 from langchain_core.documents import Document
 from langchain_core.runnables import RunnableConfig
 from langchain_text_splitters import RecursiveCharacterTextSplitter
 from langgraph.graph import END, START, StateGraph
+
 from index_graph.configuration import IndexConfiguration
 from index_graph.state import IndexState, InputState
 from shared import retrieval
-from shared.utils import load_pinecone_index

-# Configure logging for errors and status
-LOG_PATH = Path("indexing_errors.log")
-logging.basicConfig(
-    filename=LOG_PATH,
-    filemode="a",
-    format="%(asctime)s [%(levelname)s] %(message)s",
-    level=logging.INFO,
-)

 def check_index_config(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
-    """Validate API key and supported retriever provider."""
+    """Check the API key."""
     configuration = IndexConfiguration.from_runnable_config(config)

     if not configuration.api_key:
         raise ValueError("API key is required for document indexing.")
-
+
     if configuration.api_key != os.getenv("INDEX_API_KEY"):
         raise ValueError("Authentication failed: Invalid API key provided.")
-
+
     if configuration.retriever_provider != "pinecone":
         raise ValueError("Only Pinecone is currently supported for document indexing due to specific ID prefix requirements.")
-
+
     return {}

 async def get_sitemap_urls(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
-    """Fetch all URLs from a sitemap (XML format)."""
+    """Get the URLs from the sitemap."""
     url = state.url_site_map
-
+
     headers = {
         "Accept": "application/xml",
         "User-Agent": "Mozilla/5.0 (compatible; LangChainBot/1.0)",
     }
     response = requests.get(url, headers=headers)
     sitemap_content = response.text

+    # Extract URLs from sitemap (assuming XML format)
     import xml.etree.ElementTree as ET
+
     root = ET.fromstring(sitemap_content)
+    # Extract all URLs, removing frequency and other metadata
     urls_to_index = [
         url.find("{http://www.sitemaps.org/schemas/sitemap/0.9}loc").text
         for url in root.findall("{http://www.sitemaps.org/schemas/sitemap/0.9}url")
     ]

     print(f"Found {len(urls_to_index)} URLs to index.")
+
     return {"urls_to_index": urls_to_index}

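For reference, the namespace-qualified `findall`/`find` calls above assume a standard sitemap layout; a minimal sketch with hypothetical URLs shows the shape being parsed and that only the `<loc>` values survive:

```python
# Illustrative only: a minimal standards-compliant sitemap and the same parse
# logic used above; the URLs are placeholders.
import xml.etree.ElementTree as ET

SAMPLE_SITEMAP = """<?xml version="1.0" encoding="UTF-8"?>
<urlset xmlns="http://www.sitemaps.org/schemas/sitemap/0.9">
  <url><loc>https://example.com/docs/page-1</loc><changefreq>weekly</changefreq></url>
  <url><loc>https://example.com/docs/page-2</loc></url>
</urlset>"""

root = ET.fromstring(SAMPLE_SITEMAP)
locs = [
    url.find("{http://www.sitemaps.org/schemas/sitemap/0.9}loc").text
    for url in root.findall("{http://www.sitemaps.org/schemas/sitemap/0.9}url")
]
print(locs)  # ['https://example.com/docs/page-1', 'https://example.com/docs/page-2']
```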
-async def index_docs(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
-    """Index documents from all URLs in batches of 100, without concurrency limitation."""
-    # Load Pinecone index once
-    index_name = os.environ["PINECONE_INDEX_NAME"]
-    index = load_pinecone_index(index_name)
-
-    success_count = 0
-    fail_count = 0
-
-    async def safe_index_url(url: str) -> None:
-        nonlocal success_count, fail_count
-        try:
-            await index_url(url, config=config, index=index)
-            success_count += 1
-        except Exception as e:
-            logging.error(f"Failed indexing {url}: {e}")
-            with open("failed_urls.txt", "a") as f:
-                f.write(f"{url}\n")
-            fail_count += 1
-        finally:
-            gc.collect()
-
-    # Process URLs in batches of 100
-    batch_size = 100
-    total = len(state.urls_to_index)
-    for i in range(0, total, batch_size):
-        current_batch = state.urls_to_index[i:i + batch_size]
-        print(f"🔄 Processing batch {i // batch_size + 1} / {(total + batch_size - 1) // batch_size}")
-        tasks = [safe_index_url(url) for url in current_batch]
-        await asyncio.gather(*tasks, return_exceptions=True)
-
-    print(f"Indexed: {success_count} | Failed: {fail_count}")
-    return {
-        "success_count": str(success_count),
-        "fail_count": str(fail_count),
-    }
+async def index_docs(
+    state: IndexState, *, config: Optional[RunnableConfig] = None
+) -> dict[str, str]:
+    """Asynchronously index documents from the URLs in the given state.
+
+    Each URL gathered from the sitemap is loaded, split into chunks, and added
+    to the configured retriever's index. All URLs are processed in parallel.
+
+    Args:
+        state (IndexState): The current state containing the URLs to index.
+        config (Optional[RunnableConfig]): Configuration for the indexing process.
+    """
+    # Process all URLs in parallel
+    chunk_tasks = [index_url(url, config) for url in state.urls_to_index]
+    await asyncio.gather(*chunk_tasks)
+
+    return {}


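Note that this version launches one task per URL with no concurrency cap, whereas the removed implementation above processed URLs in batches of 100. If the unbounded fan-out ever proves too aggressive, a semaphore is one way to bound it; the sketch below is illustrative only and not part of this commit (the helper name and the limit of 20 are made up):

```python
# Sketch only: bound the number of concurrently indexed URLs with a semaphore.
# `index_url` is the coroutine added in this commit; `limit` is an arbitrary example.
import asyncio

async def index_urls_bounded(urls, config, limit: int = 20) -> None:
    semaphore = asyncio.Semaphore(limit)

    async def bounded(url: str) -> None:
        async with semaphore:
            await index_url(url, config)

    await asyncio.gather(*(bounded(url) for url in urls))
```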
-async def index_url(url: str, config: IndexConfiguration, index: Index, retry: int = 1) -> List[Document]:
-    """Delete old chunks and re-index content from a given URL."""
-    try:
-        logging.info(f"Indexing: {url}")
-        loader = WebBaseLoader(web_paths=(url,))
-        docs = loader.load()
-        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
-        docs = text_splitter.split_documents(docs)
-
-        now_str = datetime.utcnow().isoformat()
-        for doc in docs:
-            doc.metadata["source_url"] = url
-            doc.metadata["last_indexed_at"] = now_str
-
-        texts = [doc.page_content for doc in docs]
-        metadatas = [doc.metadata for doc in docs]
-        chunk_ids = [f"{url}--chunk{i}" for i in range(len(texts))]
-
-
-        print(f"Checking for existing chunks at prefix: {url}")
-        existing_urls = list(index.list(prefix=f"{url}"))
-
-        if existing_urls:
-            print(f"Deleted old chunks ({len(existing_urls)}) for {url}")
-            index.delete(ids=existing_urls)
-        else:
-            print(f"No existing chunks found for {url}")
-
-        async with retrieval.make_retriever(config) as (_, vectorstore):
-            if hasattr(vectorstore, "aadd_texts"):
-                await vectorstore.aadd_texts(
-                    texts=texts,
-                    metadatas=metadatas,
-                    ids=chunk_ids
-                )
-            else:
-                for i, doc in enumerate(docs):
-                    doc.metadata["id"] = chunk_ids[i]
-                await vectorstore.aadd_documents(docs)
-
-        logging.info(f"Successfully indexed {url}")
-        return docs
-
-    except Exception as e:
-        if retry > 0:
-            logging.warning(f"⚠️ Retry {url} after error: {e}")
-            await asyncio.sleep(1)
-            return await index_url(url, config, index, retry=retry - 1)
-        else:
-            logging.error(f"Final failure for {url}: {e}")
-            return []
-
-# Define the graph structure
+async def index_url(url: str, config: IndexConfiguration) -> List[Document]:
+    """Index a web path."""
+    loader = WebBaseLoader(
+        web_paths=(url,),
+    )
+    docs = loader.load()
+    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
+    docs = text_splitter.split_documents(docs)
+
+    with retrieval.make_retriever(config) as retriever:
+
+        await retriever.vectorstore.aadd_texts(
+            # namespace="langgraph" if "langgraph" in url else "langchain",
+            texts=[doc.page_content for doc in docs],
+            metadatas=[doc.metadata for doc in docs],
+            id_prefix=url,
+        )
+
+    return docs
+
+
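The `id_prefix=url` passed to `aadd_texts` is what the Pinecone-only check in `check_index_config` alludes to: assuming the retriever wraps a `PineconeVectorStore`, whose `add_texts`/`aadd_texts` accept an `id_prefix`, every chunk ID is prefixed with its source URL, so stale chunks for a page can later be found and removed by prefix, much as the removed implementation above did. A rough sketch of that cleanup with the Pinecone client (the index name comes from `PINECONE_INDEX_NAME` as in the removed code; the API-key variable and example URL are assumptions):

```python
# Sketch only: delete previously indexed chunks for one URL by ID prefix
# before re-indexing it. Mirrors the removed implementation's intent.
import os

from pinecone import Pinecone

pc = Pinecone(api_key=os.environ["PINECONE_API_KEY"])  # assumed env var
index = pc.Index(os.environ["PINECONE_INDEX_NAME"])

url = "https://example.com/docs/page-1"  # placeholder
for id_page in index.list(prefix=url):  # list() yields pages of matching IDs
    index.delete(ids=id_page)
```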
+# Define the graph
 builder = StateGraph(IndexState, input=InputState, config_schema=IndexConfiguration)
 builder.add_node(check_index_config)
 builder.add_node(index_docs)
@@ -162,7 +115,6 @@ async def index_url(url: str, config: IndexConfiguration, index:Index, retry: in
 builder.add_edge("check_index_config", "get_sitemap_urls")
 builder.add_edge("get_sitemap_urls", "index_docs")
 builder.add_edge("index_docs", END)
-
-# Compile the state graph for execution
+# Compile into a graph object that you can invoke and deploy.
 graph = builder.compile()
 graph.name = "IndexGraph"
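For a quick local smoke test of the compiled graph, an invocation might look roughly like the following. The state field `url_site_map` and the `api_key`/`retriever_provider` options come from the state and configuration modules imported above; the sitemap URL is a placeholder, and the `configurable` mapping is the usual LangChain/LangGraph convention assumed here:

```python
# Minimal local invocation sketch; values are placeholders.
import asyncio
import os

result = asyncio.run(
    graph.ainvoke(
        {"url_site_map": "https://example.com/sitemap.xml"},
        config={
            "configurable": {
                "api_key": os.environ["INDEX_API_KEY"],
                "retriever_provider": "pinecone",
            }
        },
    )
)
print(result)
```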