Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ dependencies = [
"structlog>=24.4.0",
"defusedxml>=0.7.1",
"trafilatura>=1.12.0",
"numpy>=1.26.0",
"voyageai>=0.3.0",
]

[project.optional-dependencies]
Expand Down Expand Up @@ -91,6 +93,7 @@ module = [
"googleapiclient.*",
"youtube_transcript_api.*",
"trafilatura.*",
"voyageai.*",
]
ignore_missing_imports = true

Expand Down
6 changes: 6 additions & 0 deletions src/intelstream/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

if TYPE_CHECKING:
from intelstream.database.models import Source
from intelstream.services.search import SearchService

logger = structlog.get_logger(__name__)

Expand Down Expand Up @@ -109,6 +110,7 @@ def __init__(self, settings: Settings, repository: Repository) -> None:

self.settings = settings
self.repository = repository
self.search_service: SearchService | None = None
self.start_time: datetime | None = None
self._owner: discord.User | None = None

Expand Down Expand Up @@ -153,6 +155,10 @@ async def setup_hook(self) -> None:
await self.add_cog(GitHubCommands(self))
await self.add_cog(GitHubPolling(self))

from intelstream.discord.cogs.search import SearchCog

await self.add_cog(SearchCog(self))

guild = discord.Object(id=self.settings.discord_guild_id)
self.tree.copy_global_to(guild=guild)
await self.tree.sync(guild=guild)
Expand Down
23 changes: 23 additions & 0 deletions src/intelstream/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,29 @@ class Settings(BaseSettings):
default=None, description="twitterapi.io API key (optional)"
)

voyage_api_key: str | None = Field(
default=None, description="Voyage AI API key for semantic search embeddings"
)

search_embedding_model: str = Field(
default="voyage-3.5-lite",
description="Embedding model name for semantic search",
)

search_similarity_threshold: float = Field(
default=0.3,
ge=0.0,
le=1.0,
description="Minimum cosine similarity to include in search results",
)

search_max_results: int = Field(
default=5,
ge=1,
le=20,
description="Maximum number of search results to return",
)

github_token: str | None = Field(
default=None, description="GitHub Personal Access Token (optional)"
)
Expand Down
4 changes: 2 additions & 2 deletions src/intelstream/database/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from intelstream.database.models import Base, ContentItem, DiscordConfig, Source
from intelstream.database.models import Base, ContentEmbedding, ContentItem, DiscordConfig, Source
from intelstream.database.repository import Repository

__all__ = ["Base", "ContentItem", "DiscordConfig", "Repository", "Source"]
__all__ = ["Base", "ContentEmbedding", "ContentItem", "DiscordConfig", "Repository", "Source"]
24 changes: 24 additions & 0 deletions src/intelstream/database/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,35 @@ class ContentItem(Base):
created_at: Mapped[datetime] = mapped_column(DateTime, default=lambda: datetime.now(UTC))

source: Mapped["Source"] = relationship("Source", back_populates="content_items")
embedding: Mapped["ContentEmbedding | None"] = relationship(
"ContentEmbedding",
back_populates="content_item",
cascade="all, delete-orphan",
uselist=False,
)

def __repr__(self) -> str:
return f"<ContentItem(title={self.title!r}, source_id={self.source_id!r})>"


class ContentEmbedding(Base):
    """One-to-one embedding record for a ContentItem, with the vector stored as JSON text."""

    __tablename__ = "content_embeddings"

    # Client-generated UUID primary key (matches the String(36) UUID pattern of ContentItem).
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=lambda: str(uuid4()))
    # unique=True enforces at most one embedding row per content item,
    # mirroring the uselist=False relationship on ContentItem.embedding.
    content_item_id: Mapped[str] = mapped_column(
        String(36), ForeignKey("content_items.id"), nullable=False, unique=True
    )
    # Embedding vector serialized as a JSON string (shape/dimension not enforced here).
    embedding_json: Mapped[str] = mapped_column(Text, nullable=False)
    # Name of the embedding model that produced the vector.
    model_name: Mapped[str] = mapped_column(String(100), nullable=False)
    # Hash of the embedded text; presumably used to detect stale embeddings -- verify against callers.
    text_hash: Mapped[str] = mapped_column(String(64), nullable=False)
    # Timezone-aware creation timestamp, assigned client-side.
    created_at: Mapped[datetime] = mapped_column(DateTime, default=lambda: datetime.now(UTC))

    # Back-reference to the owning item (pairs with ContentItem.embedding).
    content_item: Mapped["ContentItem"] = relationship("ContentItem", back_populates="embedding")

    def __repr__(self) -> str:
        return f"<ContentEmbedding(content_item_id={self.content_item_id!r}, model={self.model_name!r})>"


class DiscordConfig(Base):
__tablename__ = "discord_config"

Expand Down
84 changes: 83 additions & 1 deletion src/intelstream/database/repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from datetime import UTC, datetime, timedelta

import structlog
from sqlalchemy import exists, func, select, text
from sqlalchemy import delete, exists, func, select, text
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.ext.asyncio import (
AsyncConnection,
Expand All @@ -18,6 +18,7 @@
)
from intelstream.database.models import (
Base,
ContentEmbedding,
ContentItem,
DiscordConfig,
ExtractionCache,
Expand Down Expand Up @@ -890,3 +891,84 @@ async def set_github_repo_active(self, repo_id: str, is_active: bool) -> bool:
await session.commit()
return True
return False

async def add_content_embedding(
self, content_item_id: str, embedding_json: str, model_name: str, text_hash: str
) -> ContentEmbedding:
async with self.session() as session:
result = await session.execute(
select(ContentEmbedding).where(ContentEmbedding.content_item_id == content_item_id)
)
existing = result.scalar_one_or_none()
if existing:
existing.embedding_json = embedding_json
existing.model_name = model_name
existing.text_hash = text_hash
else:
existing = ContentEmbedding(
content_item_id=content_item_id,
embedding_json=embedding_json,
model_name=model_name,
text_hash=text_hash,
)
session.add(existing)
await session.commit()
await session.refresh(existing)
return existing

async def get_all_embeddings(self) -> list[ContentEmbedding]:
async with self.session() as session:
result = await session.execute(select(ContentEmbedding))
return list(result.scalars().all())

async def get_items_without_embeddings(self, limit: int = 100) -> list[ContentItem]:
async with self.session() as session:
subquery = select(ContentEmbedding.content_item_id)
result = await session.execute(
select(ContentItem)
.where(ContentItem.id.notin_(subquery))
.where(ContentItem.summary.isnot(None))
.order_by(ContentItem.created_at.desc())
.limit(limit)
)
return list(result.scalars().all())

async def get_embeddings_with_items(
self,
guild_id: str | None = None,
source_type: str | None = None,
since: datetime | None = None,
) -> list[tuple[ContentEmbedding, ContentItem, Source]]:
async with self.session() as session:
query = (
select(ContentEmbedding, ContentItem, Source)
.join(ContentItem, ContentEmbedding.content_item_id == ContentItem.id)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocking] SourceType(source_type) here will raise ValueError if the user passes an invalid source type string that doesn't map to a SourceType enum value. This would cause an unhandled exception that bubbles up through the search path.

The search cog uses @app_commands.choices which constrains the Discord UI, but a malformed API request could still pass an invalid string. The exception would be caught by the cog's broad except Exception handler, which would show a generic error message -- acceptable but not ideal for UX.

Consider wrapping in a try/except in either the repository method or the _get_candidates caller:

try:
    query = query.where(Source.type == SourceType(source_type))
except ValueError:
    return []  # invalid source_type = no results

.join(Source, ContentItem.source_id == Source.id)
)
if guild_id:
query = query.where(Source.guild_id == guild_id)
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] The get_embeddings_with_items return type uses a list comprehension [(row[0], row[1], row[2]) for row in result.all()] to unpack tuples. This is fine functionally, but since SQLAlchemy's result.all() already returns Row objects that behave like tuples, you could simplify to:

return [tuple(row) for row in result.all()]

or even return list(result.tuples().all()).

Minor style point only -- the current code is clear enough. Not blocking.

if source_type:
try:
st = SourceType(source_type)
except ValueError:
return []
query = query.where(Source.type == st)
if since:
query = query.where(ContentItem.published_at >= since)
result = await session.execute(query)
return [(row[0], row[1], row[2]) for row in result.all()]

async def get_latest_embedding(self) -> ContentEmbedding | None:
async with self.session() as session:
result = await session.execute(
select(ContentEmbedding).order_by(ContentEmbedding.created_at.desc()).limit(1)
)
return result.scalar_one_or_none()

async def clear_all_embeddings(self) -> int:
async with self.session() as session:
result = await session.execute(select(func.count()).select_from(ContentEmbedding))
count = result.scalar_one()
await session.execute(delete(ContentEmbedding))
await session.commit()
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Blocking - Bug] result.rowcount after a delete() statement on SQLite with aiosqlite can be unreliable (it may return -1 or None in some SQLAlchemy async configurations). The original design doc avoids this by doing a count query first, which is what was implemented -- good.

Wait, re-reading: the code DOES do a count first (line ~958-959 in the actual file), then executes the delete. That's correct and matches the design. The count variable from the separate SELECT count(*) is returned, not result.rowcount. This is correct. Disregard.

return count
13 changes: 13 additions & 0 deletions src/intelstream/discord/cogs/content_posting.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,23 @@ async def cog_load(self) -> None:
max_input_length=self.bot.settings.summary_max_input_length,
)

search_service = None
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Good] Clean wiring. The conditional import and creation of SearchService only when voyage_api_key is configured matches the existing pattern for optional features. Setting self.bot.search_service here before passing to the pipeline is correct and ensures the cog registered later (SearchCog) can access it.

One minor note: the search_service local variable is set to None or the service, then assigned to both self.bot.search_service and passed to ContentPipeline. If the pipeline is later closed and recreated (unlikely but possible), the bot.search_service reference would still point to the original instance. This is fine for the current architecture.

if self.bot.settings.voyage_api_key:
from intelstream.services.search import SearchService, VoyageEmbeddingProvider

provider = VoyageEmbeddingProvider(
self.bot.settings.voyage_api_key,
model=self.bot.settings.search_embedding_model,
)
search_service = SearchService(self.bot.repository, provider)

self.bot.search_service = search_service

self._pipeline = ContentPipeline(
settings=self.bot.settings,
repository=self.bot.repository,
summarizer=summarizer,
search_service=search_service,
)
await self._pipeline.initialize()

Expand Down
115 changes: 115 additions & 0 deletions src/intelstream/discord/cogs/search.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
from __future__ import annotations

from typing import TYPE_CHECKING

import discord
import structlog
from discord import app_commands
from discord.ext import commands

from intelstream.database.models import SourceType
from intelstream.services.content_poster import SOURCE_TYPE_LABELS

if TYPE_CHECKING:
from intelstream.bot import IntelStreamBot

logger = structlog.get_logger()


class SearchCog(commands.Cog):
    """Slash command for semantic search over ingested content.

    The /search command is registered unconditionally; availability is
    checked at invocation time via ``bot.search_service`` so the error
    message is accurate even if configuration changes after load.
    """

    def __init__(self, bot: IntelStreamBot) -> None:
        self.bot = bot

    async def cog_load(self) -> None:
        # Purely informational: the per-invocation check in search() is
        # what actually gates the command when Voyage is not configured.
        if not self.bot.settings.voyage_api_key:
            logger.info("Voyage API key not configured, search disabled")
            return

    @app_commands.command(name="search", description="Search across all content")
    @app_commands.describe(
        query="What to search for (3-200 characters)",
        days="Limit to last N days (optional)",
        source_type="Filter by source type (optional)",
    )
    @app_commands.choices(
        source_type=[
            app_commands.Choice(name=label, value=st.value)
            for st, label in SOURCE_TYPE_LABELS.items()
        ]
    )
    @app_commands.checks.cooldown(5, 60.0)
    async def search(
        self,
        interaction: discord.Interaction,
        query: str,
        days: int | None = None,
        source_type: str | None = None,
    ) -> None:
        """Run a semantic search and reply with an ephemeral embed of results."""
        if not self.bot.search_service:
            await interaction.response.send_message(
                "Semantic search is not configured.", ephemeral=True
            )
            return

        if len(query) < 3 or len(query) > 200:
            await interaction.response.send_message(
                "Query must be between 3 and 200 characters.", ephemeral=True
            )
            return

        # Reject days=0 (would filter to "published this instant") and
        # negative values (a future cutoff), both of which silently return
        # nothing for confusing reasons.
        if days is not None and days < 1:
            await interaction.response.send_message(
                "Days must be a positive number.", ephemeral=True
            )
            return

        await interaction.response.defer(ephemeral=True)

        guild_id = str(interaction.guild_id) if interaction.guild_id else None

        try:
            results = await self.bot.search_service.search(
                query=query,
                guild_id=guild_id,
                source_type=source_type,
                days=days,
                limit=self.bot.settings.search_max_results,
                threshold=self.bot.settings.search_similarity_threshold,
            )
        except Exception:
            logger.exception("Search failed", query=query)
            await interaction.followup.send(
                "An error occurred while searching. Please try again.", ephemeral=True
            )
            return

        if not results:
            await interaction.followup.send("No relevant results found.", ephemeral=True)
            return

        # Title stays under Discord's 256-char embed-title limit:
        # max 200-char query + 11 chars of framing.
        embed = discord.Embed(
            title=f'Search: "{query}"',
            color=0x3498DB,
            description=f"{len(results)} results found",
        )
        for i, result in enumerate(results, 1):
            source_label = SOURCE_TYPE_LABELS.get(SourceType(result.source_type), "Unknown")
            date_str = result.published_at.strftime("%b %d")
            # Field name "N. " + 250-char title fits the 256-char limit.
            # Field value is capped at Discord's 1024-char limit in case of
            # a very long original_url (may truncate the markdown link, but
            # avoids the API rejecting the whole embed).
            value = f"[Link]({result.original_url}) | {source_label} | {date_str} | Score: {result.score:.2f}"
            embed.add_field(
                name=f"{i}. {result.title[:250]}",
                value=value[:1024],
                inline=False,
            )
        await interaction.followup.send(embed=embed, ephemeral=True)

    @search.error
    async def search_error(
        self, interaction: discord.Interaction, error: app_commands.AppCommandError
    ) -> None:
        """Turn cooldown errors into a friendly ephemeral message; re-raise the rest."""
        if isinstance(error, app_commands.CommandOnCooldown):
            await interaction.response.send_message(
                f"Search is on cooldown. Try again in {error.retry_after:.0f}s.",
                ephemeral=True,
            )
        else:
            raise error
11 changes: 11 additions & 0 deletions src/intelstream/services/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@
import json
import time
from datetime import UTC, datetime, timedelta
from typing import TYPE_CHECKING

import anthropic
import httpx
import structlog

if TYPE_CHECKING:
from intelstream.services.search import SearchService

from intelstream.adapters.arxiv import ArxivAdapter
from intelstream.adapters.base import BaseAdapter, ContentData
from intelstream.adapters.rss import RSSAdapter
Expand All @@ -29,10 +33,12 @@ def __init__(
settings: Settings,
repository: Repository,
summarizer: SummarizationService | None = None,
search_service: "SearchService | None" = None,
) -> None:
self._settings = settings
self._repository = repository
self._summarizer = summarizer
self._search_service = search_service
self._http_client: httpx.AsyncClient | None = None
self._adapters: dict[SourceType, BaseAdapter] = {}

Expand Down Expand Up @@ -388,4 +394,9 @@ async def _handle_first_posting_backfill(self, items: list[ContentItem]) -> None
async def run_cycle(self) -> tuple[int, int]:
new_items = await self.fetch_all_sources()
summarized = await self.summarize_pending()
if self._search_service:
try:
await self._search_service.embed_pending()
except Exception:
logger.warning("Embedding step failed", exc_info=True)
return new_items, summarized
Loading
Loading