1- """This "graph" simply exposes an endpoint for a user to upload docs to be indexed."""
2-
import asyncio
import gc
import logging
import os
import secrets
import xml.etree.ElementTree as ET
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional

import requests
from langchain_community.document_loaders import WebBaseLoader
from langchain_core.documents import Document
from langchain_core.runnables import RunnableConfig
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langgraph.graph import END, START, StateGraph
from pinecone import Index

from index_graph.configuration import IndexConfiguration
from index_graph.state import IndexState, InputState
from shared import retrieval
from shared.utils import load_pinecone_index
1720
# Configure file-based logging for indexing errors and status messages.
# NOTE(review): basicConfig here configures the process-wide ROOT logger as an
# import-time side effect; any other module importing this one inherits it.
# Confirm this is intended vs. a module-scoped logging.getLogger(__name__).
LOG_PATH = Path("indexing_errors.log")
logging.basicConfig(
    filename=LOG_PATH,
    filemode="a",  # append across runs so history is preserved
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)
1829
def check_index_config(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
    """Validate the caller's API key and the configured retriever provider.

    Args:
        state: Current graph state (unused here; required by the node signature).
        config: Runnable configuration carrying the ``IndexConfiguration``.

    Returns:
        An empty dict (no state updates) when validation passes.

    Raises:
        ValueError: If the API key is missing or invalid, or the retriever
            provider is not Pinecone.
    """
    configuration = IndexConfiguration.from_runnable_config(config)

    if not configuration.api_key:
        raise ValueError("API key is required for document indexing.")

    expected_key = os.getenv("INDEX_API_KEY")
    # Constant-time comparison so the auth check does not leak key contents
    # through timing; also explicitly reject the case where the server-side
    # key is unset (previously compared against None implicitly).
    if not expected_key or not secrets.compare_digest(configuration.api_key, expected_key):
        raise ValueError("Authentication failed: Invalid API key provided.")

    if configuration.retriever_provider != "pinecone":
        raise ValueError("Only Pinecone is currently supported for document indexing due to specific ID prefix requirements.")

    return {}
3344
# XML namespace prefix used by the sitemap protocol.
_SITEMAP_NS = "{http://www.sitemaps.org/schemas/sitemap/0.9}"


def _parse_sitemap_urls(sitemap_xml: str) -> List[str]:
    """Extract the <loc> URL from every <url> entry of a sitemap XML document."""
    root = ET.fromstring(sitemap_xml)
    urls: List[str] = []
    for entry in root.findall(f"{_SITEMAP_NS}url"):
        loc = entry.find(f"{_SITEMAP_NS}loc")
        # Skip malformed entries rather than crashing on a missing <loc>.
        if loc is not None and loc.text:
            urls.append(loc.text)
    return urls


async def get_sitemap_urls(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
    """Fetch all page URLs from the sitemap referenced by ``state.url_site_map``.

    Args:
        state: Graph state holding the sitemap URL.
        config: Runnable configuration (unused here; required by the node signature).

    Returns:
        A state update mapping ``urls_to_index`` to the extracted URL list.

    Raises:
        requests.HTTPError: If the sitemap request returns an error status.
        xml.etree.ElementTree.ParseError: If the response is not valid XML.
    """
    url = state.url_site_map

    headers = {
        "Accept": "application/xml",
        "User-Agent": "Mozilla/5.0 (compatible; LangChainBot/1.0)",
    }
    # Bound the request and fail fast on HTTP errors instead of silently
    # parsing an error page as a sitemap.
    response = requests.get(url, headers=headers, timeout=30)
    response.raise_for_status()

    urls_to_index = _parse_sitemap_urls(response.text)

    print(f"Found {len(urls_to_index)} URLs to index.")
    return {"urls_to_index": urls_to_index}
6465
async def index_docs(
    state: IndexState, *, config: Optional[RunnableConfig] = None
) -> dict[str, str]:
    """Asynchronously index documents in the given state using the configured retriever.

    This function takes the documents from the state, ensures they have a user ID,
    adds them to the retriever's index, and then signals for the documents to be
    deleted from the state.

    If docs are not provided in the state, they will be loaded
    from the configuration.docs_file JSON file.

    NOTE(review): this definition is shadowed by a later ``index_docs`` in this
    module (the last ``def`` with the same name wins at import time) — confirm
    whether this earlier version should be removed.

    Args:
        state (IndexState): The current state containing documents and retriever.
        config (Optional[RunnableConfig]): Configuration for the indexing process.
    """
    # Process all URLs in parallel.
    # NOTE(review): unbounded concurrency — one task per sitemap URL with no
    # limit; a large sitemap could exhaust sockets/memory. Confirm a cap.
    chunk_tasks = [index_url(url, config) for url in state.urls_to_index]
    await asyncio.gather(*chunk_tasks)

    return {}
86-
87-
async def index_url(url: str, config: IndexConfiguration) -> List[Document]:
    """Index a single web page: load it, split it into chunks, and add the
    chunks to the retriever's vector store.

    NOTE(review): this definition is shadowed by a later ``index_url`` in this
    module — confirm whether this earlier version should be removed.

    Args:
        url: Web page to fetch and index.
        config: Indexing configuration passed to the retriever factory.

    Returns:
        The list of split ``Document`` chunks that were indexed.
    """
    loader = WebBaseLoader(
        web_paths=(url,),
    )
    docs = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
    docs = text_splitter.split_documents(docs)

    with retrieval.make_retriever(config) as retriever:

        await retriever.vectorstore.aadd_texts(
            texts=[doc.page_content for doc in docs],
            metadatas=[doc.metadata for doc in docs],
            # The URL prefixes every generated chunk ID so a page's chunks can
            # later be listed and deleted by prefix.
            id_prefix=url,
        )

    return docs
async def index_docs(state: IndexState, *, config: Optional[RunnableConfig] = None) -> dict[str, str]:
    """Index documents from every URL in ``state.urls_to_index`` in batches.

    URLs are processed in batches of 100; within a batch all pages are indexed
    concurrently. Per-URL failures are logged and appended to
    ``failed_urls.txt`` without aborting the run.

    Args:
        state: Graph state holding the list of URLs to index.
        config: Runnable configuration forwarded to the retriever.

    Returns:
        String counts of successfully indexed and failed URLs.

    Raises:
        ValueError: If the PINECONE_INDEX_NAME environment variable is unset.
    """
    # Load the Pinecone index once and share it across all URL tasks.
    index_name = os.getenv("PINECONE_INDEX_NAME")
    if not index_name:
        raise ValueError("PINECONE_INDEX_NAME environment variable must be set.")
    index = load_pinecone_index(index_name)

    success_count = 0
    fail_count = 0

    async def safe_index_url(url: str) -> None:
        # Shield the batch from one bad URL. The failure accounting relies on
        # index_url raising once its retries are exhausted — confirm it does
        # not swallow the final error, or fail_count will always be 0.
        nonlocal success_count, fail_count
        try:
            await index_url(url, config=config, index=index)
            success_count += 1
        except Exception as e:
            logging.error(f"Failed indexing {url}: {e}")
            with open("failed_urls.txt", "a") as f:
                f.write(f"{url}\n")
            fail_count += 1

    # Process URLs in batches of 100.
    batch_size = 100
    total = len(state.urls_to_index)
    for i in range(0, total, batch_size):
        current_batch = state.urls_to_index[i:i + batch_size]
        print(f"🔄 Processing batch {i // batch_size + 1} / {(total + batch_size - 1) // batch_size}")
        # safe_index_url already catches every exception, so a plain gather is
        # sufficient; return_exceptions=True here was redundant.
        await asyncio.gather(*(safe_index_url(url) for url in current_batch))
        # Reclaim loader/splitter memory once per batch instead of after every
        # single URL — a full collection per page was needlessly expensive.
        gc.collect()

    print(f"Indexed: {success_count} | Failed: {fail_count}")
    return {
        "success_count": str(success_count),
        "fail_count": str(fail_count),
    }
107102
108103
109- # Define the graph
async def index_url(url: str, config: IndexConfiguration, index: Index, retry: int = 1) -> List[Document]:
    """Delete any previously indexed chunks for ``url`` and re-index its content.

    Args:
        url: Web page to fetch, split, and index.
        config: Indexing configuration forwarded to the retriever factory.
        index: Pinecone index used to list/delete stale chunk IDs by prefix.
        retry: Remaining retry attempts before the error propagates.

    Returns:
        The list of ``Document`` chunks that were indexed.

    Raises:
        Exception: Re-raises the last error once all retries are exhausted so
            callers (e.g. ``index_docs``) can account for the failure — the
            previous version returned ``[]`` here, which made failures
            indistinguishable from success.
    """
    try:
        logging.info(f"Indexing: {url}")
        loader = WebBaseLoader(web_paths=(url,))
        docs = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        docs = text_splitter.split_documents(docs)

        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated.
        now_str = datetime.now(timezone.utc).isoformat()
        for doc in docs:
            doc.metadata["source_url"] = url
            doc.metadata["last_indexed_at"] = now_str

        texts = [doc.page_content for doc in docs]
        metadatas = [doc.metadata for doc in docs]
        # Chunk IDs share the URL as prefix so stale chunks can be found below.
        chunk_ids = [f"{url}--chunk{i}" for i in range(len(texts))]

        print(f"Checking for existing chunks at prefix: {url}")
        # Pinecone's Index.list() is a generator that yields *pages* (lists) of
        # IDs; flatten defensively so delete() always receives plain ID strings
        # rather than a list of lists.
        existing_ids: List[str] = []
        for page in index.list(prefix=url):
            if isinstance(page, list):
                existing_ids.extend(page)
            else:
                existing_ids.append(page)

        if existing_ids:
            index.delete(ids=existing_ids)
            # Log after the delete succeeds (previously logged before deleting).
            print(f"Deleted old chunks ({len(existing_ids)}) for {url}")
        else:
            print(f"No existing chunks found for {url}")

        async with retrieval.make_retriever(config) as (_, vectorstore):
            if hasattr(vectorstore, "aadd_texts"):
                await vectorstore.aadd_texts(
                    texts=texts,
                    metadatas=metadatas,
                    ids=chunk_ids,
                )
            else:
                # Fallback for stores without aadd_texts: carry IDs in metadata.
                for i, doc in enumerate(docs):
                    doc.metadata["id"] = chunk_ids[i]
                await vectorstore.aadd_documents(docs)

        logging.info(f"Successfully indexed {url}")
        return docs

    except Exception as e:
        if retry > 0:
            logging.warning(f"⚠️ Retry {url} after error: {e}")
            await asyncio.sleep(1)
            return await index_url(url, config, index, retry=retry - 1)
        logging.error(f"Final failure for {url}: {e}")
        # Propagate so the caller's failure accounting works.
        raise
155+
# Define the graph structure: validate config -> fetch sitemap -> index docs.
builder = StateGraph(IndexState, input=InputState, config_schema=IndexConfiguration)
builder.add_node(check_index_config)
builder.add_node(index_docs)
# NOTE(review): the registration of the get_sitemap_urls node and the
# START -> "check_index_config" edge are not visible in this view (elided diff
# hunk); confirm they exist, otherwise the edges below reference an
# unregistered node.
builder.add_edge("check_index_config", "get_sitemap_urls")
builder.add_edge("get_sitemap_urls", "index_docs")
builder.add_edge("index_docs", END)

# Compile into a graph object that can be invoked and deployed.
graph = builder.compile()
graph.name = "IndexGraph"