139 changes: 109 additions & 30 deletions src/intugle/core/semantic_search/crud.py
@@ -20,13 +20,27 @@
 
 
 class SemanticSearchCRUD:
-    def __init__(self, collection_name: str, embeddings: List[Embeddings], batch_size: int = 30):
+    def __init__(self, collection_name: str, embeddings: List[Embeddings], batch_size: Optional[int] = None):
+        if not collection_name or not isinstance(collection_name, str):
+            raise ValueError("collection_name must be a non-empty string")
+        if not embeddings or not isinstance(embeddings, list) or len(embeddings) == 0:
+            raise ValueError("embeddings must be a non-empty list of Embeddings")
+
         self.collection_name = collection_name
         self.embeddings = embeddings
-        self.batch_size = batch_size
+        self.batch_size = batch_size if batch_size is not None else settings.QDRANT_INSERT_BATCH_SIZE
+
+        if self.batch_size <= 0:
+            raise ValueError("batch_size must be a positive integer")
+
+        log.info(f"SemanticSearchCRUD initialized with collection_name='{self.collection_name}', "
+                 f"embeddings_count={len(self.embeddings)}, batch_size={self.batch_size}")
 
     @property
     def vector_store(self):
+        if not settings.QDRANT_URL:
+            raise ValueError("QDRANT_URL setting is required but not configured")
+
+        client_config = {"url": settings.QDRANT_URL, "api_key": settings.QDRANT_API_KEY}
         return VectorStoreService(
             collection_name=self.collection_name,
@@ -109,28 +123,50 @@ def create_content_for_vectorization(self, _: int, row: pd.Series) -> pd.DataFrame:
         return final_consolidated_content[final_consolidated_content.content != ""]
 
     async def vectorize(self, content: pd.DataFrame) -> List[models.PointStruct]:
+        """Vectorize content using configured embeddings."""
+        if content is None or not isinstance(content, pd.DataFrame):
+            raise ValueError("content must be a pandas DataFrame")
+
+        if content.empty:
+            log.warning("Empty content provided for vectorization")
+            return []
+
+        required_columns = ["content", "type", "column_id"]
+        missing_columns = [col for col in required_columns if col not in content.columns]
+        if missing_columns:
+            raise ValueError(f"Content DataFrame is missing required columns: {missing_columns}")
+
         tags_and_columns = content.loc[content.type.isin(["tag", "column_name"])].reset_index(drop=True)
         business_glossary = content.loc[content.type.isin(["glossary"])].reset_index(drop=True)
 
-        tags_and_columns_content = tags_and_columns["content"].tolist()
-        business_glossary_content = business_glossary["content"].tolist()
-        log.info(f"tags_column: {tags_and_columns_content}")
-        log.info(f"business glossary: {business_glossary_content}")
+        tags_and_columns_content = tags_and_columns["content"].tolist() if not tags_and_columns.empty else []
+        business_glossary_content = business_glossary["content"].tolist() if not business_glossary.empty else []
+
+        log.debug(f"Vectorizing {len(tags_and_columns_content)} tag/column items and {len(business_glossary_content)} glossary items")
+
+        if not tags_and_columns_content and not business_glossary_content:
+            log.warning("No content to vectorize")
+            return []
 
         async def run():
-            # Run tags col and glossary concurrenty
+            # Run tags col and glossary concurrently
             try:
                 coroutines = []
                 embedding_map = []
                 for embedding in self.embeddings:
-                    coroutines.append(
-                        embedding.aencode(tags_and_columns_content, embeddings_types=[EmbeddingsType.DENSE])
-                    )
-                    embedding_map.append((embedding, "tags_col"))
-                    coroutines.append(
-                        embedding.aencode(business_glossary_content, embeddings_types=[EmbeddingsType.LATE])
-                    )
-                    embedding_map.append((embedding, "glossary"))
+                    if tags_and_columns_content:
+                        coroutines.append(
+                            embedding.aencode(tags_and_columns_content, embeddings_types=[EmbeddingsType.DENSE])
+                        )
+                        embedding_map.append((embedding, "tags_col"))
+                    if business_glossary_content:
+                        coroutines.append(
+                            embedding.aencode(business_glossary_content, embeddings_types=[EmbeddingsType.LATE])
+                        )
+                        embedding_map.append((embedding, "glossary"))
+
+                if not coroutines:
+                    return {"tags_col": {}, "glossary": {}}
+
                 gathered_results = await asyncio.gather(*coroutines)

@@ -143,17 +179,21 @@ async def run():

                 return results
             except Exception as ex:
-                raise Exception(f"[!] Semantic Search: Couldnot vectorize => {ex}")
+                log.error(f"Vectorization failed: {ex}")
+                raise RuntimeError(f"Semantic Search vectorization failed: {ex}") from ex
 
-        # Run all type of embeddings concurrenlty
+        # Run all types of embeddings concurrently
         results = await run()
 
         points = []
-        point = self.convert_to_qdrant_point(tags_and_columns, results["tags_col"])
-        points.extend(point)
-        point = self.convert_to_qdrant_point(business_glossary, results["glossary"])
-        points.extend(point)
+        if not tags_and_columns.empty and results["tags_col"]:
+            point = self.convert_to_qdrant_point(tags_and_columns, results["tags_col"])
+            points.extend(point)
+        if not business_glossary.empty and results["glossary"]:
+            point = self.convert_to_qdrant_point(business_glossary, results["glossary"])
+            points.extend(point)
 
+        log.debug(f"Generated {len(points)} points from vectorization")
         return points
 
     @staticmethod
@@ -208,19 +248,58 @@ async def clean_collection(self):
             await vdb.client.create_payload_index(
                 collection_name=self.collection_name,
                 field_name="type",
-                field_schema=models.PayloadSchemaType.KEYWORD
+                field_type=models.PayloadSchemaType.KEYWORD
             )
 
     async def initialize(self, column_details: list[dict]):
-        await self.clean_collection()
+        """Initialize the semantic search collection with column details."""
+        if not column_details or not isinstance(column_details, list):
+            raise ValueError("column_details must be a non-empty list of dictionaries")
+
+        log.info(f"Starting initialization with {len(column_details)} column details, batch_size={self.batch_size}")
+
+        try:
+            await self.clean_collection()
+            log.info("Collection cleaned successfully")
+        except Exception as e:
+            log.error(f"Failed to clean collection: {e}")
+            raise RuntimeError(f"Failed to initialize collection: {e}") from e
 
         async with self.vector_store as vdb:
-            column_details = pd.DataFrame(column_details)
+            column_details_df = pd.DataFrame(column_details)
+            total_batches = (len(column_details_df) + self.batch_size - 1) // self.batch_size
+            log.info(f"Processing {len(column_details_df)} columns in {total_batches} batches")
 
-            for batch in batched(column_details, self.batch_size):
-                content = list(itertools.starmap(self.create_content_for_vectorization, batch.iterrows()))
-
-                content = pd.concat(content, axis=0).reset_index(drop=True)
-
-                points = await self.vectorize(content)
-
-                vdb.bulk_insert(points)
+            batch_count = 0
+            for batch in batched(column_details_df, self.batch_size):
+                batch_count += 1
+                try:
+                    log.debug(f"Processing batch {batch_count}/{total_batches}")
+
+                    content = list(itertools.starmap(self.create_content_for_vectorization, batch.iterrows()))
+                    if not content:
+                        log.warning(f"Batch {batch_count} produced no content, skipping")
+                        continue
+
+                    content_df = pd.concat(content, axis=0).reset_index(drop=True)
+                    if content_df.empty:
+                        log.warning(f"Batch {batch_count} content is empty, skipping")
+                        continue
+
+                    log.debug(f"Batch {batch_count}: Created {len(content_df)} content items")
+
+                    points = await self.vectorize(content_df)
+                    if not points:
+                        log.warning(f"Batch {batch_count}: No points generated, skipping")
+                        continue
+
+                    log.debug(f"Batch {batch_count}: Generated {len(points)} points")
+
+                    await vdb.bulk_insert(points)
+                    log.info(f"Batch {batch_count}/{total_batches} completed successfully")
+
+                except Exception as e:
+                    log.error(f"Failed to process batch {batch_count}: {e}")
+                    raise RuntimeError(f"Batch processing failed at batch {batch_count}: {e}") from e
+
+        log.info("Semantic search initialization completed successfully")
1 change: 1 addition & 0 deletions src/intugle/core/settings.py
@@ -115,6 +115,7 @@ def set_project_base(self, project_base: str):
     VECTOR_COLLECTION_NAME: str = os.getcwd().split("/")[-1]
     QDRANT_URL: str = "http://localhost:6333"
     QDRANT_API_KEY: Optional[str] = None
+    QDRANT_INSERT_BATCH_SIZE: int = 10
     TAVILY_API_KEY: Optional[str] = None
     EMBEDDING_MODEL_NAME: str = "openai:ada"
     TOKENIZER_MODEL_NAME: str = "cl100k_base"
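
Since `Settings` exposes env-backed fields (e.g. `QDRANT_URL`, `QDRANT_API_KEY`), the new default can presumably be overridden per environment rather than in code — a sketch, assuming pydantic-style `BaseSettings` env-var loading, which this diff does not itself confirm:

```python
# Assumption: Settings loads environment variables pydantic-BaseSettings-style.
import os

os.environ["QDRANT_INSERT_BATCH_SIZE"] = "50"  # set before Settings is instantiated

# An explicit constructor argument still takes precedence over the setting:
# SemanticSearchCRUD(collection_name="demo", embeddings=[...], batch_size=50)
```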