Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/intelstream/adapters/strategies/rss_discovery.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import re
from urllib.parse import urljoin, urlparse

Expand Down Expand Up @@ -142,8 +143,8 @@ async def check_path(path: str) -> str | None:
pass
return None

for path in RSS_PATHS:
result = await check_path(path)
results = await asyncio.gather(*(check_path(path) for path in RSS_PATHS))
for result in results:
if result:
return result

Expand Down
141 changes: 8 additions & 133 deletions src/intelstream/adapters/substack.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
from typing import Any

import feedparser
import httpx
import structlog

from intelstream.adapters.base import BaseAdapter, ContentData
from intelstream.utils.feed_utils import parse_feed_date

logger = structlog.get_logger()
from intelstream.adapters.base import ContentData
from intelstream.adapters.rss import RSSAdapter


class SubstackAdapter(BaseAdapter):
class SubstackAdapter(RSSAdapter):
def __init__(self, http_client: httpx.AsyncClient | None = None) -> None:
self._client = http_client
super().__init__(http_client=http_client)

@property
def source_type(self) -> str:
Expand All @@ -30,128 +24,9 @@ async def fetch_latest(
self,
identifier: str,
feed_url: str | None = None,
skip_content: bool = False, # noqa: ARG002
skip_content: bool = False,
) -> list[ContentData]:
url = feed_url or await self.get_feed_url(identifier)

logger.debug("Fetching Substack feed", identifier=identifier, url=url)

try:
if self._client:
response = await self._client.get(url, follow_redirects=True)
response.raise_for_status()
content = response.text
else:
async with httpx.AsyncClient() as client:
response = await client.get(url, follow_redirects=True)
response.raise_for_status()
content = response.text

feed = feedparser.parse(content)

if feed.bozo and not feed.entries:
logger.warning(
"Failed to parse Substack feed",
identifier=identifier,
error=str(feed.bozo_exception),
)
return []

items: list[ContentData] = []
for entry in feed.entries:
try:
item = self._parse_entry(entry, feed)
items.append(item)
except Exception as e:
logger.warning(
"Failed to parse feed entry",
identifier=identifier,
entry_id=getattr(entry, "id", "unknown"),
error=str(e),
)
continue

logger.info("Fetched Substack content", identifier=identifier, count=len(items))
return items

except httpx.HTTPStatusError as e:
logger.error(
"HTTP error fetching Substack feed",
identifier=identifier,
status_code=e.response.status_code,
)
raise
except httpx.RequestError as e:
logger.error(
"Request error fetching Substack feed",
identifier=identifier,
error=str(e),
)
raise

def _parse_entry(
self, entry: feedparser.FeedParserDict, feed: feedparser.FeedParserDict
) -> ContentData:
external_id: str = str(entry.get("id") or entry.get("link") or "")
title: str = str(entry.get("title", "Untitled"))
original_url: str = str(entry.get("link", ""))

author: str = str(entry.get("author", ""))
if not author:
feed_data: Any = feed.feed
if feed_data:
author = str(feed_data.get("title", "Unknown Author"))
else:
author = "Unknown Author"

published_at = parse_feed_date(entry)
raw_content = self._extract_content(entry)
thumbnail_url = self._extract_thumbnail(entry)

return ContentData(
external_id=external_id,
title=title,
original_url=original_url,
author=author,
published_at=published_at,
raw_content=raw_content,
thumbnail_url=thumbnail_url,
resolved_feed_url = feed_url or await self.get_feed_url(identifier)
return await super().fetch_latest(
identifier, feed_url=resolved_feed_url, skip_content=skip_content
)

def _extract_content(self, entry: feedparser.FeedParserDict) -> str | None:
if entry.get("content"):
for content in entry.content:
if content.get("type") in ("text/html", "text/plain"):
value = content.get("value")
return str(value) if value else None

if entry.get("summary"):
return str(entry.summary)

if entry.get("description"):
return str(entry.description)

return None

def _extract_thumbnail(self, entry: feedparser.FeedParserDict) -> str | None:
if entry.get("media_content"):
for media in entry.media_content:
if media.get("medium") == "image" or str(media.get("type", "")).startswith(
"image/"
):
url = media.get("url")
return str(url) if url else None

if entry.get("media_thumbnail"):
for thumb in entry.media_thumbnail:
url = thumb.get("url")
if url:
return str(url)

if entry.get("enclosures"):
for enclosure in entry.enclosures:
if str(enclosure.get("type", "")).startswith("image/"):
url = enclosure.get("href") or enclosure.get("url")
return str(url) if url else None

return None
7 changes: 0 additions & 7 deletions src/intelstream/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,6 @@ class Settings(BaseSettings):
description="Model to use for interactive /summarize command",
)

discord_max_message_length: int = Field(
default=2000,
ge=500,
le=2000,
description="Maximum Discord message length (Discord limit is 2000)",
)

http_timeout_seconds: float = Field(
default=30.0,
ge=5.0,
Expand Down
9 changes: 9 additions & 0 deletions src/intelstream/database/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,15 @@ async def content_item_exists(self, external_id: str) -> bool:
)
return result.scalar_one()

async def content_items_exist(self, external_ids: list[str]) -> set[str]:
if not external_ids:
return set()
async with self.session() as session:
result = await session.execute(
select(ContentItem.external_id).where(ContentItem.external_id.in_(external_ids))
)
return {row[0] for row in result.all()}

async def get_unposted_content_items(self, limit: int = 10) -> list[ContentItem]:
async with self.session() as session:
result = await session.execute(
Expand Down
5 changes: 1 addition & 4 deletions src/intelstream/discord/cogs/content_posting.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,7 @@ async def cog_load(self) -> None:
)
await self._pipeline.initialize()

self._poster = ContentPoster(
self.bot,
max_message_length=self.bot.settings.discord_max_message_length,
)
self._poster = ContentPoster(self.bot)
self._initialized = True

self._base_interval = self.bot.settings.content_poll_interval_minutes
Expand Down
15 changes: 15 additions & 0 deletions src/intelstream/discord/cogs/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ def __init__(self, bot: "IntelStreamBot") -> None:
self.bot = bot
self._github_service: GitHubService | None = None

async def _repo_autocomplete(
self, interaction: discord.Interaction, current: str
) -> list[app_commands.Choice[str]]:
repos = await self.bot.repository.get_all_github_repos(active_only=False)
guild_id = str(interaction.guild_id) if interaction.guild_id else None
if guild_id:
repos = [r for r in repos if r.guild_id == guild_id]
matches = [r for r in repos if current.lower() in f"{r.owner}/{r.repo}".lower()]
return [
app_commands.Choice(name=f"{r.owner}/{r.repo}", value=f"{r.owner}/{r.repo}")
for r in matches[:25]
]

def _get_github_service(self) -> GitHubService | None:
if not self.bot.settings.github_token:
return None
Expand Down Expand Up @@ -228,6 +241,7 @@ async def github_list(
@github_group.command(name="remove", description="Stop monitoring a GitHub repository")
@app_commands.default_permissions(manage_guild=True)
@app_commands.describe(repo="Repository name (owner/repo format)")
@app_commands.autocomplete(repo=_repo_autocomplete)
async def github_remove(
self,
interaction: discord.Interaction,
Expand Down Expand Up @@ -273,6 +287,7 @@ async def github_remove(
@github_group.command(name="toggle", description="Enable or disable GitHub repo monitoring")
@app_commands.default_permissions(manage_guild=True)
@app_commands.describe(repo="Repository name (owner/repo format)")
@app_commands.autocomplete(repo=_repo_autocomplete)
async def github_toggle(
self,
interaction: discord.Interaction,
Expand Down
13 changes: 13 additions & 0 deletions src/intelstream/discord/cogs/source_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,16 @@ def __init__(self, bot: "IntelStreamBot") -> None:
self.bot = bot
self._anthropic_client: anthropic.AsyncAnthropic | None = None

async def _source_name_autocomplete(
self, interaction: discord.Interaction, current: str
) -> list[app_commands.Choice[str]]:
sources = await self.bot.repository.get_all_sources(active_only=False)
guild_id = str(interaction.guild_id) if interaction.guild_id else None
if guild_id:
sources = [s for s in sources if s.guild_id == guild_id]
matches = [s for s in sources if current.lower() in s.name.lower()]
return [app_commands.Choice(name=s.name, value=s.name) for s in matches[:25]]

def _get_anthropic_client(self) -> anthropic.AsyncAnthropic:
if self._anthropic_client is None:
self._anthropic_client = anthropic.AsyncAnthropic(
Expand Down Expand Up @@ -367,6 +377,7 @@ async def source_list(self, interaction: discord.Interaction) -> None:
@source_group.command(name="remove", description="Remove a content source")
@app_commands.default_permissions(manage_guild=True)
@app_commands.describe(name="Name of the source to remove")
@app_commands.autocomplete(name=_source_name_autocomplete)
async def source_remove(
self,
interaction: discord.Interaction,
Expand Down Expand Up @@ -412,6 +423,7 @@ async def source_remove(

@source_group.command(name="info", description="Show detailed info about a source")
@app_commands.describe(name="Name of the source to inspect")
@app_commands.autocomplete(name=_source_name_autocomplete)
async def source_info(
self,
interaction: discord.Interaction,
Expand Down Expand Up @@ -462,6 +474,7 @@ async def source_info(
@source_group.command(name="toggle", description="Enable or disable a content source")
@app_commands.default_permissions(manage_guild=True)
@app_commands.describe(name="Name of the source to toggle")
@app_commands.autocomplete(name=_source_name_autocomplete)
async def source_toggle(
self,
interaction: discord.Interaction,
Expand Down
Loading
Loading