diff --git a/background/crawl.py b/background/crawl.py index 0a6a493..60ccfd7 100644 --- a/background/crawl.py +++ b/background/crawl.py @@ -2,7 +2,8 @@ # Will call the appropriate method based on the source name and return a list of PolicyDetails objects from background.sources.kb import get_kb_details -from db import SourceName +from background.sources.sitemap import get_sitemap_policies +from db import Source, SourceName, SourceType from logger import setup_logger from sources.apm import get_apm_links, get_apm_url @@ -15,10 +16,27 @@ logger = setup_logger() -def get_source_policy_list(source_name: str) -> list[PolicyDetails] | None: +def get_source_policy_list(source: Source) -> list[PolicyDetails] | None: """ Get the list of policies to index for the given source """ + if source.type == SourceType.SITEMAP: + return get_sitemap_policies(source.url) + elif source.type == SourceType.RECURSIVE: + return [] # TODO: implement recursive crawling + elif source.type == SourceType.CUSTOM: + return get_custom_policies(source) + else: + logger.error(f"Unknown source type {source.type}") + return None + + +def get_custom_policies(source: Source) -> list[PolicyDetails] | None: + """ + These are custom sources that need special handling + """ + source_name = source.name + if source_name == SourceName.UCOP.value: return get_ucop_policies() elif source_name == SourceName.UCDAPM.value: diff --git a/background/db.py b/background/db.py index d307f02..0204d17 100644 --- a/background/db.py +++ b/background/db.py @@ -39,9 +39,17 @@ class SourceName(Enum): UCDKB = "UCDKB" +class SourceType(Enum): + CUSTOM = "CUSTOM" # Custom source (ex: ucop, ellucid) + RECURSIVE = "RECURSIVE" # Given a base site, index everything under that path + SITEMAP = "SITEMAP" # Given a sitemap.xml URL, parse all the pages in it + SINGLE = "SINGLE" # Given a URL, index only the given page (unsupported) + + class Source(Document): name = StringField(required=True) url = StringField(required=True) + type = 
EnumField(SourceType, required=True) last_updated = DateTimeField(required=True) last_failed = DateTimeField(required=False) refresh_frequency = EnumField(RefreshFrequency, required=True) diff --git a/background/extract.py b/background/extract.py index a60be03..b3649a8 100644 --- a/background/extract.py +++ b/background/extract.py @@ -1,11 +1,14 @@ import os +import re from azure.core.credentials import AzureKeyCredential from azure.ai.documentintelligence import DocumentIntelligenceClient from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeDocumentRequest +from bs4 import BeautifulSoup from dotenv import load_dotenv from pypdf import PdfReader from background.logger import setup_logger +from models.policy_details import PolicyDetails load_dotenv() @@ -44,13 +47,82 @@ def extract_text_from_unreadable_doc(doc_url: str) -> str | None: return None -def extract_text_from_pdf(input_path: str, original_doc_url: str) -> str: +def extract_text_from_policy_file(input_path: str, policy: PolicyDetails) -> str: + """ + Determine which extractor to use based on the file extension. + """ + ext = os.path.splitext(input_path)[1].lower() + + if ext == ".pdf": + return extract_text_from_pdf(input_path, policy) + else: + return extract_text_from_text_file(input_path, policy) + + +def extract_text_from_text_file(input_path: str, policy: PolicyDetails) -> str: + """ + Extract text from a text file. Sounds simple. 
+ But we do want to check if it looks like HTML, and if so we should attempt to extract metadata too (like title) + """ + + try: + with open(input_path, "r") as file: + file_contents = file.read() + except Exception as e: + logger.error(f"Error extracting text from {policy.url}: {e}") + return "" + + # Create a BeautifulSoup object + soup = BeautifulSoup(file_contents, "lxml") + + # If the document does not have an 'html' tag, it is not a webpage + if not soup.find("html"): + return file_contents + + # Extract title (og:title or title tag) + title = None + og_title = soup.find("meta", property="og:title") + if og_title and og_title.get("content"): + title = og_title["content"] + else: + title_tag = soup.find("title") + if title_tag: + title = title_tag.string + + # Extract keywords + keywords = [] + meta_keywords = soup.find("meta", attrs={"name": "keywords"}) + if meta_keywords and meta_keywords.get("content"): + keywords_content = meta_keywords["content"] + keywords = [keyword.strip() for keyword in keywords_content.split(",")] + + # Extract content from
or + content = "" + main_content = soup.find("main") + if main_content: + content = main_content.get_text(separator="\n").strip() + else: + body_content = soup.find("body") + if body_content: + content = body_content.get_text(separator="\n").strip() + + # modify the policy if we have a title or keywords + if title: + policy.title = title + if keywords: + policy.keywords = keywords + + # return the content + return content + + +def extract_text_from_pdf(input_path: str, policy: PolicyDetails) -> str: """ Extract text from a PDF file. If the text is empty, then we might have a scanned PDF -- try to extract text using OCR. """ try: - logger.info(f"Extracting text from {original_doc_url}") + logger.info(f"Extracting text from {policy.url}") with open(input_path, "rb") as file: pdf = PdfReader(file) text = "" @@ -61,9 +133,22 @@ def extract_text_from_pdf(input_path: str, original_doc_url: str) -> str: # if text is empty, then we might have a scanned pdf -- try to extract text using OCR if not text: - logger.info(f"Extracting text using OCR from {original_doc_url}") - text = extract_text_from_unreadable_doc(original_doc_url) + logger.info(f"Extracting text using OCR from {policy.url}") + text = extract_text_from_unreadable_doc(policy.url) return text except Exception as e: - logger.error(f"Error extracting text from {original_doc_url}: {e}") + logger.error(f"Error extracting text from {policy.url}: {e}") + + +def cleanup_extracted_text(text: str) -> str: + """ + Clean up the extracted text by removing extra whitespace and redundant newlines (more than one in a row). 
+ """ + # Normalize whitespace by replacing multiple spaces with a single space + text = " ".join(text.split()) + + # Replace multiple newlines with a single newline + text = re.sub(r"\n+", "\n", text) + + return text diff --git a/background/ingest.py b/background/ingest.py index b6918b9..cd96c24 100644 --- a/background/ingest.py +++ b/background/ingest.py @@ -19,7 +19,10 @@ import uuid import requests -from background.extract import extract_text_from_pdf +from background.extract import ( + cleanup_extracted_text, + extract_text_from_policy_file, +) from db import IndexedDocument, Source from logger import log_memory_usage, setup_logger from store import vectorize_text @@ -89,8 +92,11 @@ def wait_before_next_request(): time.sleep(random.uniform(1, 3)) # Sleep for 1 to 3 seconds -# download the document and return a path to the downloaded file -def download_pdf(url: str, dir: str) -> str: +def download_policy(url: str, dir: str) -> str: + """ + Download a policy from the given URL and save it to the specified directory. + Will determine if file is PDF or text file and add the appropriate extension. 
+ """ headers = {"User-Agent": user_agent} response = request_with_retry( url, headers=headers, allow_redirects=True, timeout=60 @@ -102,13 +108,22 @@ def download_pdf(url: str, dir: str) -> str: response.raise_for_status() - unique_filename = f"{uuid.uuid4()}.pdf" - pdf_path = os.path.join(dir, unique_filename) + file_type = "txt" # default to text + + # check if the response is a PDF + if "Content-Type" in response.headers: + content_type = response.headers["Content-Type"] + if "application/pdf" in content_type: + file_type = "pdf" + + unique_filename = f"{uuid.uuid4()}.{file_type}" + + file_path = os.path.join(dir, unique_filename) - with open(pdf_path, "wb") as file: + with open(file_path, "wb") as file: file.write(response.content) - return pdf_path + return file_path def calculate_file_hash(file_path: str) -> str: @@ -123,19 +138,17 @@ def get_document_by_url(url: str) -> IndexedDocument: return IndexedDocument.objects(url=url).first() -def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestResult: +def ingest_policies(source: Source, policies: List[PolicyDetails]) -> IngestResult: start_time = datetime.now(timezone.utc) num_docs_indexed = 0 num_new_docs = 0 with tempfile.TemporaryDirectory() as temp_dir: for policy in policies: - logger.info(f"Processing document {policy.url}") + logger.info(f"Processing policy {policy.url}") log_memory_usage(logger) - # TODO: for now it's all PDF, but we'll need to handle other file types - - # download the document + # download the policy at the given url # calculate the file hash # check if it exists in the database # extract the text @@ -147,41 +160,45 @@ def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestRes logger.warning(f"Policy is None, skipping") continue - local_pdf_path = download_pdf(policy.url, temp_dir) + local_policy_path = download_policy(policy.url, temp_dir) - if not local_pdf_path: + if not local_policy_path: logger.error(f"Failed to download pdf at 
{policy.url}. ") continue - pdf_hash = calculate_file_hash(local_pdf_path) + policy_file_hash = calculate_file_hash(local_policy_path) document = get_document_by_url(policy.url) # if the document exists and hasn't changed, skip - if document and document.metadata.get("hash") == pdf_hash: + if document and document.metadata.get("hash") == policy_file_hash: logger.info(f"Document {policy.url} has not changed, skipping") # if we skip a document, let's wait a bit to avoid rate limiting wait_before_next_request() continue - extracted_text = extract_text_from_pdf(local_pdf_path, policy.url) + extracted_text = extract_text_from_policy_file(local_policy_path, policy) if not extracted_text: - logger.warning(f"No text extracted from {local_pdf_path}") + logger.warning(f"No text extracted from {local_policy_path}") continue + extracted_text = cleanup_extracted_text(extracted_text) + # add some metadata vectorized_document = policy.to_vectorized_document(extracted_text) - vectorized_document.metadata.hash = pdf_hash + vectorized_document.metadata.hash = policy_file_hash vectorized_document.metadata.content_length = len(extracted_text) vectorized_document.metadata.scope = source.name + # if we haven't seen this document before, increment the count + num_new_docs += 1 if not document else 0 + num_docs_indexed += 1 # record the indexing either way + result = vectorize_text(vectorized_document) update_document( source, - num_docs_indexed, - num_new_docs, policy, document, vectorized_document, @@ -206,8 +223,6 @@ def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestRes def update_document( source: Source, - num_docs_indexed: int, - num_new_docs: int, policy: PolicyDetails, document: IndexedDocument, vectorized_document: VectorDocument, @@ -215,10 +230,8 @@ def update_document( ): if result: logger.info(f"Successfully indexed document {policy.url}") - num_docs_indexed += 1 if not document: # new doc we have never seen, create it - num_new_docs += 1 document = 
IndexedDocument( url=policy.url, metadata=vectorized_document.metadata.to_dict(), @@ -272,12 +285,14 @@ vectorized_document.metadata.content_length = len(text) vectorized_document.metadata.scope = source.name + # if we haven't seen this document before, increment the count + num_new_docs += 1 if not document else 0 + num_docs_indexed += 1 # record the indexing either way + result = vectorize_text(vectorized_document) update_document( source, - num_docs_indexed, - num_new_docs, policy, document, vectorized_document, diff --git a/background/sources/sitemap.py b/background/sources/sitemap.py new file mode 100644 index 0000000..8779bb9 --- /dev/null +++ b/background/sources/sitemap.py @@ -0,0 +1,76 @@ +from datetime import datetime +import logging +from typing import List +from urllib.parse import urljoin, urlparse +from dotenv import load_dotenv + +from bs4 import BeautifulSoup + +from background.ingest import request_with_retry +from background.models.policy_details import PolicyDetails + +load_dotenv() # This loads the environment variables from .env + +logger = logging.getLogger(__name__) + +user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" + + +def get_sitemap_policies(sitemap_url) -> List[PolicyDetails]: + """ + Given a sitemap.xml URL, read the urls and return a list of PolicyDetails objects + """ + headers = {"User-Agent": user_agent} + response = request_with_retry( + sitemap_url, headers=headers, allow_redirects=True, timeout=60 + ) + + if response is None: + logger.error(f"Failed to fetch sitemap {sitemap_url}") + return [] + + response.raise_for_status() + + policy_details_list: List[PolicyDetails] = [] + + soup = BeautifulSoup(response.text, features="xml") + + url_tags = soup.find_all("url") + + for url_tag in url_tags: + loc_tag = url_tag.find("loc") + lastmod_tag = url_tag.find("lastmod") + + if loc_tag: + url = _ensure_absolute_url(sitemap_url,
loc_tag.text) + + if lastmod_tag and lastmod_tag.text: + try: + lastmod_date = datetime.fromisoformat(lastmod_tag.text) + except ValueError: + lastmod_date = None + else: + lastmod_date = None + + policy_details = PolicyDetails( + url=url, effective_date=lastmod_date, issuance_date=lastmod_date + ) + policy_details_list.append(policy_details) + + if len(policy_details_list) == 0 and len(soup.find_all("urlset")) == 0: + # the given url doesn't look like a sitemap + logger.error( + f"No URLs found in sitemap {sitemap_url}. Check URL and ensure it's a valid sitemap.xml" + ) + return [] + + if len(policy_details_list) == 0: + raise ValueError(f"No URLs found in sitemap {sitemap_url}") + + return policy_details_list + + +def _ensure_absolute_url(source_url: str, maybe_relative_url: str) -> str: + if not urlparse(maybe_relative_url).netloc: + return urljoin(source_url, maybe_relative_url) + return maybe_relative_url diff --git a/background/update.py b/background/update.py index e6af5fd..086c3b8 100644 --- a/background/update.py +++ b/background/update.py @@ -4,7 +4,7 @@ import traceback from dotenv import load_dotenv -from ingest import ingest_documents, ingest_kb_documents +from ingest import ingest_policies, ingest_kb_documents from crawl import get_source_policy_list from db import ( IndexAttempt, @@ -14,6 +14,7 @@ Source, SourceName, SourceStatus, + SourceType, ) from mongoengine.queryset.visitor import Q @@ -45,14 +46,14 @@ def index_documents(source: Source) -> None: attempt.save() - ## TODO: each source should return a list of PolicyDetails objects from their respective functions - ## then common code to loop through each, save to db, download files, convert to text, vectorize and save to db - ## want to check if the policy already exists in the db, if so, update the metadata and text, if not, create a new one. 
use hash to check if file has changed + ## each source returns a list of PolicyDetails objects from their respective functions + ## then common code loops through each, save to db, download files, convert to text, vectorize and save to db + ## check if the policy already exists in the db, if so, update the metadata and text, if not, create a new one. use hash to check if file has changed ## then when all are done, update the source last_updated field and update the attempt with the final counts - ## OPTIONAL: eventually, we could add a check to see if the policy has been removed from the source, and if so, remove it from the db + ## TODO: eventually, we could add a check to see if the policy has been removed from the source, and if so, remove it from the db try: - policy_details = get_source_policy_list(source.name) + policy_details = get_source_policy_list(source) if policy_details is None: logger.error(f"Source {source.name} not recognized") @@ -79,10 +80,11 @@ def index_documents(source: Source) -> None: # loop through each policy, download files, convert to text, vectorize and save to db if source.name == SourceName.UCDKB.value: - # KB is a special case, we have the data in a JSON file + # KB is a special case, we have the data in a local JSON file + ## TODO: move to remote storage of JSON file ingest_result = ingest_kb_documents(source, policy_details) else: - ingest_result = ingest_documents(source, policy_details) + ingest_result = ingest_policies(source, policy_details) logger.info(f"Indexing source {source.name} successful.") @@ -125,6 +127,9 @@ def index_documents(source: Source) -> None: ) source.status = SourceStatus.FAILED + attempt.save() + source.save() + def cleanup_old_attempts(): """Set to failed any index_attempts that are INPROGRESS and started more than 1 day ago""" @@ -192,6 +197,7 @@ def tmp_reset_db(): source = Source( name=SourceName.UCDAPM.value, url="https://policy.ucop.edu/", + type=SourceType.CUSTOM, 
refresh_frequency=RefreshFrequency.DAILY, last_updated=datetime.now(timezone.utc) - timedelta(days=30), status=SourceStatus.ACTIVE, @@ -210,6 +216,7 @@ def ensure_default_source(): source = Source( name=SourceName.UCDAPM.value, url="https://policy.ucop.edu/", + type=SourceType.CUSTOM, refresh_frequency=RefreshFrequency.DAILY, last_updated=datetime.now(timezone.utc) - timedelta(days=30), status=SourceStatus.ACTIVE, diff --git a/requirements.txt b/requirements.txt index deca927..41cd49d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,7 @@ langchain-core==0.1.49 langchain-elasticsearch==0.1.3 langchain-openai==0.1.5 langchain-text-splitters==0.0.1 +lxml==5.2.2 mongoengine==0.28.2 openai==1.25.1 pymongo==4.7.3