Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions background/crawl.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# Will call the appropriate method based on the source name and return a list of PolicyDetails objects

from background.sources.kb import get_kb_details
from db import SourceName
from background.sources.sitemap import get_sitemap_policies
from db import Source, SourceName, SourceType
from logger import setup_logger

from sources.apm import get_apm_links, get_apm_url
Expand All @@ -15,10 +16,27 @@
logger = setup_logger()


def get_source_policy_list(source_name: str) -> list[PolicyDetails] | None:
def get_source_policy_list(source: Source) -> list[PolicyDetails] | None:
    """
    Get the list of policies to index for the given source
    """
    source_type = source.type

    if source_type == SourceType.CUSTOM:
        return get_custom_policies(source)

    if source_type == SourceType.SITEMAP:
        return get_sitemap_policies(source.url)

    if source_type == SourceType.RECURSIVE:
        return []  # TODO: implement recursive crawling

    # unrecognized enum value: log it and signal failure with None
    logger.error(f"Unknown source type {source.type}")
    return None


def get_custom_policies(source: Source) -> list[PolicyDetails] | None:
"""
These are custom sources that need special handling
"""
source_name = source.name

if source_name == SourceName.UCOP.value:
return get_ucop_policies()
elif source_name == SourceName.UCDAPM.value:
Expand Down
8 changes: 8 additions & 0 deletions background/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,17 @@ class SourceName(Enum):
UCDKB = "UCDKB"


class SourceType(Enum):
    """Crawl strategy for a policy source; determines how its policy URLs are discovered."""

    CUSTOM = "CUSTOM"  # Custom source (ex: ucop, ellucid)
    RECURSIVE = "RECURSIVE"  # Given a base site, index everything under that path
    SITEMAP = "SITEMAP"  # Given a sitemap.xml URL, parse all the pages in it
    SINGLE = "SINGLE"  # Given a URL, index only the given page (unsupported)


class Source(Document):
name = StringField(required=True)
url = StringField(required=True)
type = EnumField(SourceType, required=True)
last_updated = DateTimeField(required=True)
last_failed = DateTimeField(required=False)
refresh_frequency = EnumField(RefreshFrequency, required=True)
Expand Down
95 changes: 90 additions & 5 deletions background/extract.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import re
from azure.core.credentials import AzureKeyCredential
from azure.ai.documentintelligence import DocumentIntelligenceClient
from azure.ai.documentintelligence.models import AnalyzeResult, AnalyzeDocumentRequest
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from pypdf import PdfReader

from background.logger import setup_logger
from models.policy_details import PolicyDetails

load_dotenv()

Expand Down Expand Up @@ -44,13 +47,82 @@ def extract_text_from_unreadable_doc(doc_url: str) -> str | None:
return None


def extract_text_from_pdf(input_path: str, original_doc_url: str) -> str:
def extract_text_from_policy_file(input_path: str, policy: PolicyDetails) -> str:
    """
    Pick the right text extractor for a downloaded policy file based on its
    file extension: PDFs go through the PDF extractor, everything else is
    treated as a text/HTML file.
    """
    _, extension = os.path.splitext(input_path)

    if extension.lower() == ".pdf":
        return extract_text_from_pdf(input_path, policy)

    return extract_text_from_text_file(input_path, policy)

Comment on lines +50 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ensure Proper Handling of Unsupported File Types

The function currently handles only PDF and text files. Consider adding a default case to handle unsupported file types gracefully.

-    else:
+    elif ext == ".txt":
         return extract_text_from_text_file(input_path, policy)
+    else:
+        logger.error(f"Unsupported file type: {ext}")
+        return ""

Committable suggestion was skipped due to low confidence.


def extract_text_from_text_file(input_path: str, policy: "PolicyDetails") -> str:
    """
    Extract text from a text file. Sounds simple.
    But we do want to check if it looks like HTML, and if so we should attempt
    to extract metadata too (like title and keywords), storing it on `policy`
    and returning only the page's main text.

    Returns the raw file contents for non-HTML files, and "" if the file
    cannot be read at all.
    """

    try:
        # Read as UTF-8 and substitute undecodable bytes rather than failing
        # the whole policy over a single bad character; downloaded documents
        # are not guaranteed to match the platform default encoding.
        with open(input_path, "r", encoding="utf-8", errors="replace") as file:
            file_contents = file.read()
    except Exception as e:
        logger.error(f"Error extracting text from {policy.url}: {e}")
        return ""

    # NOTE: the lxml parser wraps *any* input in <html>/<body> tags, so a
    # soup.find("html") check on the parsed tree would always succeed. Inspect
    # the raw contents instead to decide whether this is actually a webpage.
    if "<html" not in file_contents.lower():
        return file_contents

    soup = BeautifulSoup(file_contents, "lxml")

    # Extract title (og:title preferred, falling back to the <title> tag)
    title = None
    og_title = soup.find("meta", property="og:title")
    if og_title and og_title.get("content"):
        title = og_title["content"]
    else:
        title_tag = soup.find("title")
        if title_tag:
            title = title_tag.string

    # Extract keywords from the <meta name="keywords"> tag, if present
    keywords = []
    meta_keywords = soup.find("meta", attrs={"name": "keywords"})
    if meta_keywords and meta_keywords.get("content"):
        keywords_content = meta_keywords["content"]
        keywords = [keyword.strip() for keyword in keywords_content.split(",")]

    # Extract content from <main> (preferred) or fall back to the whole <body>
    content = ""
    main_content = soup.find("main")
    if main_content:
        content = main_content.get_text(separator="\n").strip()
    else:
        body_content = soup.find("body")
        if body_content:
            content = body_content.get_text(separator="\n").strip()

    # modify the policy if we have a title or keywords
    if title:
        policy.title = title
    if keywords:
        policy.keywords = keywords

    # return the content
    return content
Comment on lines +62 to +116
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improve Error Handling for File Reading

The function should handle specific exceptions related to file reading, such as FileNotFoundError and IOError, to provide more precise error messages.

-    except Exception as e:
+    except (FileNotFoundError, IOError) as e:
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def extract_text_from_text_file(input_path: str, policy: PolicyDetails) -> str:
"""
Extract text from a text file. Sounds simple.
But we do want to check if it looks like HTML, and if so we should attempt to extract metadata too (like title)
"""
try:
with open(input_path, "r") as file:
file_contents = file.read()
except Exception as e:
logger.error(f"Error extracting text from {policy.url}: {e}")
return ""
# Create a BeautifulSoup object
soup = BeautifulSoup(file_contents, "lxml")
# If the document does not have an 'html' tag, it is not a webpage
if not soup.find("html"):
return file_contents
# Extract title (og:title or title tag)
title = None
og_title = soup.find("meta", property="og:title")
if og_title and og_title.get("content"):
title = og_title["content"]
else:
title_tag = soup.find("title")
if title_tag:
title = title_tag.string
# Extract keywords
keywords = []
meta_keywords = soup.find("meta", attrs={"name": "keywords"})
if meta_keywords and meta_keywords.get("content"):
keywords_content = meta_keywords["content"]
keywords = [keyword.strip() for keyword in keywords_content.split(",")]
# Extract content from <main> or <body>
content = ""
main_content = soup.find("main")
if main_content:
content = main_content.get_text(separator="\n").strip()
else:
body_content = soup.find("body")
if body_content:
content = body_content.get_text(separator="\n").strip()
# modify the policy if we have a title or keywords
if title:
policy.title = title
if keywords:
policy.keywords = keywords
# return the content
return content
try:
with open(input_path, "r") as file:
file_contents = file.read()
except (FileNotFoundError, IOError) as e:
logger.error(f"Error extracting text from {policy.url}: {e}")
return ""



def extract_text_from_pdf(input_path: str, policy: PolicyDetails) -> str:
"""
Extract text from a PDF file. If the text is empty, then
we might have a scanned PDF -- try to extract text using OCR.
"""
try:
logger.info(f"Extracting text from {original_doc_url}")
logger.info(f"Extracting text from {policy.url}")
with open(input_path, "rb") as file:
pdf = PdfReader(file)
text = ""
Expand All @@ -61,9 +133,22 @@ def extract_text_from_pdf(input_path: str, original_doc_url: str) -> str:

# if text is empty, then we might have a scanned pdf -- try to extract text using OCR
if not text:
logger.info(f"Extracting text using OCR from {original_doc_url}")
text = extract_text_from_unreadable_doc(original_doc_url)
logger.info(f"Extracting text using OCR from {policy.url}")
text = extract_text_from_unreadable_doc(policy.url)

return text
except Exception as e:
logger.error(f"Error extracting text from {original_doc_url}: {e}")
logger.error(f"Error extracting text from {policy.url}: {e}")


def cleanup_extracted_text(text: str) -> str:
    """
    Clean up the extracted text by collapsing runs of whitespace within each
    line to a single space and removing redundant newlines (blank lines and
    more than one newline in a row).
    """
    # Normalize whitespace line-by-line: a whole-text `" ".join(text.split())`
    # would collapse the newlines themselves into spaces and flatten the
    # entire document onto one line.
    normalized = (" ".join(line.split()) for line in text.splitlines())

    # Drop lines that are now empty, which also collapses runs of newlines
    return "\n".join(line for line in normalized if line)
69 changes: 42 additions & 27 deletions background/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@
import uuid

import requests
from background.extract import extract_text_from_pdf
from background.extract import (
cleanup_extracted_text,
extract_text_from_policy_file,
)
from db import IndexedDocument, Source
from logger import log_memory_usage, setup_logger
from store import vectorize_text
Expand Down Expand Up @@ -89,8 +92,11 @@ def wait_before_next_request():
time.sleep(random.uniform(1, 3)) # Sleep for 1 to 3 seconds


# download the document and return a path to the downloaded file
def download_pdf(url: str, dir: str) -> str:
def download_policy(url: str, dir: str) -> str:
"""
Download a policy from the given URL and save it to the specified directory.
Will determine if file is PDF or text file and add the appropriate extension.
"""
headers = {"User-Agent": user_agent}
response = request_with_retry(
url, headers=headers, allow_redirects=True, timeout=60
Expand All @@ -102,13 +108,22 @@ def download_pdf(url: str, dir: str) -> str:

response.raise_for_status()

unique_filename = f"{uuid.uuid4()}.pdf"
pdf_path = os.path.join(dir, unique_filename)
file_type = "txt" # default to text

# check if the response is a PDF
if "Content-Type" in response.headers:
content_type = response.headers["Content-Type"]
if "application/pdf" in content_type:
file_type = "pdf"

unique_filename = f"{uuid.uuid4()}.{file_type}"

file_path = os.path.join(dir, unique_filename)

with open(pdf_path, "wb") as file:
with open(file_path, "wb") as file:
file.write(response.content)

return pdf_path
return file_path


def calculate_file_hash(file_path: str) -> str:
Expand All @@ -123,19 +138,17 @@ def get_document_by_url(url: str) -> IndexedDocument:
return IndexedDocument.objects(url=url).first()


def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestResult:
def ingest_policies(source: Source, policies: List[PolicyDetails]) -> IngestResult:
start_time = datetime.now(timezone.utc)
num_docs_indexed = 0
num_new_docs = 0

with tempfile.TemporaryDirectory() as temp_dir:
for policy in policies:
logger.info(f"Processing document {policy.url}")
logger.info(f"Processing policy {policy.url}")
log_memory_usage(logger)

# TODO: for now it's all PDF, but we'll need to handle other file types

# download the document
# download the policy at the given url
# calculate the file hash
# check if it exists in the database
# extract the text
Expand All @@ -147,41 +160,45 @@ def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestRes
logger.warning(f"Policy is None, skipping")
continue

local_pdf_path = download_pdf(policy.url, temp_dir)
local_policy_path = download_policy(policy.url, temp_dir)

if not local_pdf_path:
if not local_policy_path:
logger.error(f"Failed to download pdf at {policy.url}. ")
continue

pdf_hash = calculate_file_hash(local_pdf_path)
policy_file_hash = calculate_file_hash(local_policy_path)

document = get_document_by_url(policy.url)

# if the document exists and hasn't changed, skip
if document and document.metadata.get("hash") == pdf_hash:
if document and document.metadata.get("hash") == policy_file_hash:
logger.info(f"Document {policy.url} has not changed, skipping")
# if we skip a document, let's wait a bit to avoid rate limiting
wait_before_next_request()
continue

extracted_text = extract_text_from_pdf(local_pdf_path, policy.url)
extracted_text = extract_text_from_policy_file(local_policy_path, policy)

if not extracted_text:
logger.warning(f"No text extracted from {local_pdf_path}")
logger.warning(f"No text extracted from {local_policy_path}")
continue

extracted_text = cleanup_extracted_text(extracted_text)

# add some metadata
vectorized_document = policy.to_vectorized_document(extracted_text)
vectorized_document.metadata.hash = pdf_hash
vectorized_document.metadata.hash = policy_file_hash
vectorized_document.metadata.content_length = len(extracted_text)
vectorized_document.metadata.scope = source.name

# if we haven't seen this document before, increment the count
num_new_docs += 1 if not document else 0
num_docs_indexed += 1 # record the indexing either way

result = vectorize_text(vectorized_document)

update_document(
source,
num_docs_indexed,
num_new_docs,
policy,
document,
vectorized_document,
Expand All @@ -206,19 +223,15 @@ def ingest_documents(source: Source, policies: List[PolicyDetails]) -> IngestRes

def update_document(
source: Source,
num_docs_indexed: int,
num_new_docs: int,
policy: PolicyDetails,
document: IndexedDocument,
vectorized_document: VectorDocument,
result: dict,
):
if result:
logger.info(f"Successfully indexed document {policy.url}")
num_docs_indexed += 1
if not document:
# new doc we have never seen, create it
num_new_docs += 1
document = IndexedDocument(
url=policy.url,
metadata=vectorized_document.metadata.to_dict(),
Expand Down Expand Up @@ -272,12 +285,14 @@ def ingest_kb_documents(
vectorized_document.metadata.content_length = len(text)
vectorized_document.metadata.scope = source.name

# if we haven't seen this document before, increment the count
num_new_docs += 1 if not document else 0
num_docs_indexed += 1 # record the indexing either way

result = vectorize_text(vectorized_document)

update_document(
source,
num_docs_indexed,
num_new_docs,
policy,
document,
vectorized_document,
Comment on lines +288 to 298
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Enhance logging and error handling.

Improve logging for better traceability and add error handling for potential issues during the ingestion process.

def ingest_kb_documents(
    source: Source, policy_details_with_text: List[Tuple[PolicyDetails, str]]
) -> IngestResult:
    # KB is a special case, we already have the content
    # eventually it'd be nice to either scrape the site or get API access instead
    start_time = datetime.now(timezone.utc)
    num_docs_indexed = 0
    num_new_docs = 0

    for policy, text in policy_details_with_text:
        if not policy:
            logger.warning(f"Policy is None, skipping")
            continue

        logger.info(f"Processing document {policy.url}")
        log_memory_usage(logger)

        try:
            hash = hashlib.sha256(text.encode()).hexdigest()
            document = get_document_by_url(policy.url)

            if document and document.metadata.get("hash") == hash:
                logger.info(f"Document {policy.url} has not changed, skipping")
                continue

            if not text:
                logger.warning(f"No text extracted from {policy.url}")
                continue

            vectorized_document = policy.to_vectorized_document(text)
            vectorized_document.metadata.hash = hash
            vectorized_document.metadata.content_length = len(text)
            vectorized_document.metadata.scope = source.name

            num_new_docs += 1 if not document else 0
            num_docs_indexed += 1

            result = vectorize_text(vectorized_document)
            update_document(source, policy, document, vectorized_document, result)

        except Exception as e:
            logger.error(f"Error processing document {policy.url}: {e}")

    logger.info(f"Indexed {num_docs_indexed} documents from source {source.name}")

    end_time = datetime.now(timezone.utc)

    ## TODO: somewhere remove old documents that are no longer in the source

    return IngestResult(
        num_docs_indexed=num_docs_indexed,
        num_new_docs=num_new_docs,
        source_id=source._id,
        start_time=start_time,
        end_time=end_time,
        duration=(end_time - start_time).total_seconds(),
    )
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# if we haven't seen this document before, increment the count
num_new_docs += 1 if not document else 0
num_docs_indexed += 1 # record the indexing either way
result = vectorize_text(vectorized_document)
update_document(
source,
num_docs_indexed,
num_new_docs,
policy,
document,
vectorized_document,
# if we haven't seen this document before, increment the count
num_new_docs += 1 if not document else 0
num_docs_indexed += 1 # record the indexing either way
result = vectorize_text(vectorized_document)
update_document(
source,
policy,
document,
vectorized_document,
result
)
except Exception as e:
logger.error(f"Error processing document {policy.url}: {e}")
logger.info(f"Indexed {num_docs_indexed} documents from source {source.name}")
end_time = datetime.now(timezone.utc)
## TODO: somewhere remove old documents that are no longer in the source
return IngestResult(
num_docs_indexed=num_docs_indexed,
num_new_docs=num_new_docs,
source_id=source._id,
start_time=start_time,
end_time=end_time,
duration=(end_time - start_time).total_seconds(),
)

Expand Down
Loading