4 changes: 4 additions & 0 deletions .env.example
@@ -10,6 +10,10 @@ ANTHROPIC_API_KEY=your_anthropic_api_key_here
# YouTube API (optional - for YouTube channel monitoring)
YOUTUBE_API_KEY=your_youtube_api_key_here

# Twitter/X API (optional - for Twitter account monitoring)
# Get a Bearer Token from https://developer.x.com/en/portal/dashboard
# TWITTER_BEARER_TOKEN=your_x_api_bearer_token_here

# Database Configuration
DATABASE_URL=sqlite+aiosqlite:///./data/intelstream.db

18 changes: 10 additions & 8 deletions README.md
@@ -9,7 +9,7 @@ A Discord bot that monitors content sources and posts AI-generated summaries to
- **RSS/Atom feeds** - Support for any standard RSS or Atom feed
- **Arxiv papers** - Monitor research paper categories (cs.AI, cs.LG, cs.CL, etc.)
- **Blogs** - Smart extraction from any blog using cascading discovery strategies (RSS, Sitemap, LLM extraction)
- **Twitter/X accounts** - Monitor Twitter accounts for new tweets via twitterapi.io
- **Twitter/X accounts** - Monitor Twitter accounts for new tweets via official X API v2
- **Web pages** - Monitor any web page URL with automatic content detection
- **GitHub repositories** - Track commits, pull requests, and issues with Discord embeds
- **Manual summarization** - Summarize any URL on-demand with `/summarize`
@@ -23,7 +23,7 @@ A Discord bot that monitors content sources and posts AI-generated summaries to
- Discord Bot Token
- Anthropic API Key (for Claude)
- YouTube API Key (optional, for YouTube monitoring)
- twitterapi.io API Key (optional, for Twitter monitoring)
- X/Twitter API v2 Bearer Token (optional, for Twitter monitoring)

## Setup

@@ -48,8 +48,8 @@ A Discord bot that monitors content sources and posts AI-generated summaries to
# Optional: YouTube monitoring
# YOUTUBE_API_KEY=your_youtube_api_key

# Optional: Twitter monitoring
# TWITTER_API_KEY=your_twitterapi_io_api_key
# Optional: Twitter monitoring (X API v2)
# TWITTER_BEARER_TOKEN=your_x_api_bearer_token
```

4. Run the bot:
@@ -73,7 +73,7 @@ A Discord bot that monitors content sources and posts AI-generated summaries to
| Variable | Default | Description |
|----------|---------|-------------|
| `YOUTUBE_API_KEY` | - | YouTube Data API key (required for YouTube monitoring) |
| `TWITTER_API_KEY` | - | twitterapi.io API key (required for Twitter monitoring) |
| `TWITTER_BEARER_TOKEN` | - | X API v2 Bearer Token (required for Twitter monitoring) |
| `GITHUB_TOKEN` | - | GitHub Personal Access Token (required for GitHub monitoring) |
| `GITHUB_POLL_INTERVAL_MINUTES` | `5` | Polling interval for GitHub repositories (1-60) |
| `DATABASE_URL` | `sqlite+aiosqlite:///./data/intelstream.db` | Database connection string |
@@ -164,7 +164,7 @@ The optional `summarize` parameter controls whether content is summarized by AI.
- `RSS` - Any RSS/Atom feed URL
- `Arxiv` - Arxiv category code (e.g., `cs.AI`, `cs.LG`, `cs.CL`, `cs.CV`, `stat.ML`)
- `Blog` - Any blog URL (uses cascading discovery: RSS, Sitemap, LLM extraction)
- `Twitter` - Twitter/X account URL (requires twitterapi.io API key)
- `Twitter` - Twitter/X account URL (requires X API v2 Bearer Token)
- `Page` - Any web page URL (uses AI to detect content structure)

#### Configuration
@@ -265,7 +265,7 @@ Both full GitHub URLs and `owner/repo` format are supported. The optional `chann

Results are cached to avoid repeated extraction on subsequent polls.

**Twitter**: Monitors Twitter/X accounts for new original tweets using the twitterapi.io API. Retweets are skipped; only original tweets and quote tweets are included. Quoted tweet text is included in the content for richer summarization. When added with `summarize:False`, the bot posts bare tweet URLs (Discord auto-embeds the tweet preview). Requires a twitterapi.io API key (`TWITTER_API_KEY`).
**Twitter**: Monitors Twitter/X accounts for new original tweets using the official X API v2. Retweets and replies are filtered server-side for cost efficiency. Quote tweets are included with the quoted text appended for context. Long tweets (over 280 characters) are fully captured. Media attachments (images, videos) are detected and the first image URL is stored as the thumbnail. When added with `summarize:False`, the bot posts bare tweet URLs (Discord auto-embeds the tweet preview). Requires an X API v2 Bearer Token (`TWITTER_BEARER_TOKEN`).
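The request this implies looks roughly like the following sketch, which mirrors the field constants in the adapter diff further down; the token and `user_id` are placeholders, and this is illustrative rather than the bot's actual code path:

```python
import httpx

# Illustrative X API v2 timeline call, as described above.
user_id = "123456"  # resolved once via GET /2/users/by/username/<handle>, then cached
params = {
    "max_results": "5",
    "exclude": "retweets,replies",  # server-side filtering saves reads
    "tweet.fields": "created_at,author_id,referenced_tweets,attachments,public_metrics,note_tweet",
    "user.fields": "name,username,profile_image_url",
    "media.fields": "url,preview_image_url,type",
    "expansions": "author_id,attachments.media_keys,referenced_tweets.id",
}
headers = {"Authorization": "Bearer YOUR_TOKEN"}
resp = httpx.get(f"https://api.x.com/2/users/{user_id}/tweets", headers=headers, params=params)
resp.raise_for_status()
print(resp.json()["meta"]["result_count"])
```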

**Twitter cost considerations**: The X API v2 uses either a tiered subscription (Basic: $200/month, 15,000 reads) or a pay-per-use credit system. IntelStream fetches 5 tweets per poll and caches user ID lookups in memory to minimize API usage. With the default 15-minute poll interval, 10 Twitter sources consume roughly 28,800 reads/month (10 sources x 4 polls/hour x 24h x 30d). Set `TWITTER_POLL_INTERVAL_MINUTES` to a higher value (e.g., 30 or 60) for even lower consumption.
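For reference, the estimate above works out as follows (a back-of-envelope sketch that assumes each poll request counts as one read):

```python
# Monthly read estimate for Twitter polling.
sources = 10
poll_interval_minutes = 15
polls_per_hour = 60 // poll_interval_minutes   # 4 polls/hour at the default interval
reads_per_month = sources * polls_per_hour * 24 * 30
print(reads_per_month)  # 28800; a 30-minute interval halves this to 14400
```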

**Page**: When you add a Page source, the bot uses Claude to analyze the page structure and automatically determine CSS selectors for extracting posts.
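As a rough illustration of what the detected configuration might look like, the selector names and shapes below are hypothetical, not the bot's actual schema:

```python
from bs4 import BeautifulSoup

# Hypothetical selectors, shaped like what an LLM might return for a blog layout;
# the bot's real extraction code is not shown in this diff.
selectors = {"post": "article.post", "title": "h2 a"}

html = '<article class="post"><h2><a href="/p/1">Hello world</a></h2></article>'
soup = BeautifulSoup(html, "html.parser")
for post in soup.select(selectors["post"]):
    link = post.select_one(selectors["title"])
    print(link.get_text(), link["href"])  # -> Hello world /p/1
```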

@@ -330,7 +332,7 @@ src/intelstream/
│ ├── rss.py # Generic RSS/Atom adapter
│ ├── arxiv.py # Arxiv RSS adapter
│ ├── smart_blog.py # Blog adapter with cascading strategies
│ ├── twitter.py # Twitter/X adapter via twitterapi.io
│ ├── twitter.py # Twitter/X adapter via official X API v2
│ ├── page.py # Web page adapter
│ └── strategies/ # Discovery strategies for Blog adapter
│ ├── rss_discovery.py
2 changes: 2 additions & 0 deletions src/intelstream/adapters/__init__.py
@@ -3,6 +3,7 @@
from intelstream.adapters.page import PageAdapter
from intelstream.adapters.rss import RSSAdapter
from intelstream.adapters.substack import SubstackAdapter
from intelstream.adapters.twitter import TwitterAdapter
from intelstream.adapters.youtube import YouTubeAdapter

__all__ = [
@@ -12,5 +13,6 @@
"PageAdapter",
"RSSAdapter",
"SubstackAdapter",
"TwitterAdapter",
"YouTubeAdapter",
]
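A hedged usage sketch for the newly exported adapter: the constructor takes a bearer token per the twitter.py diff below, but the full `fetch_latest` signature and the `ContentData` fields are only partially visible here, so treat the call shape as an assumption:

```python
import asyncio

from intelstream.adapters import TwitterAdapter

async def main() -> None:
    # In the bot, the token comes from TWITTER_BEARER_TOKEN.
    adapter = TwitterAdapter(bearer_token="YOUR_TOKEN")
    items = await adapter.fetch_latest("some_handle")  # assumed call shape
    for item in items:
        print(item.title, item.url)  # assumed ContentData fields

asyncio.run(main())
```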
223 changes: 171 additions & 52 deletions src/intelstream/adapters/twitter.py
@@ -1,4 +1,5 @@
from datetime import UTC, datetime
from typing import Any

import httpx
import structlog
@@ -7,15 +8,20 @@

logger = structlog.get_logger()

TWITTER_API_BASE = "https://api.twitterapi.io"
TWITTER_DATE_FORMAT = "%a %b %d %H:%M:%S %z %Y"
X_API_BASE = "https://api.x.com/2"
TITLE_MAX_LENGTH = 100

TWEET_FIELDS = "created_at,author_id,referenced_tweets,attachments,public_metrics,note_tweet"
USER_FIELDS = "name,username,profile_image_url"
MEDIA_FIELDS = "url,preview_image_url,type"
EXPANSIONS = "author_id,attachments.media_keys,referenced_tweets.id"


class TwitterAdapter(BaseAdapter):
def __init__(self, api_key: str, http_client: httpx.AsyncClient | None = None) -> None:
self._api_key = api_key
def __init__(self, bearer_token: str, http_client: httpx.AsyncClient | None = None) -> None:
self._bearer_token = bearer_token
self._client = http_client
self._user_id_cache: dict[str, str] = {}

@property
def source_type(self) -> str:
@@ -32,50 +38,59 @@ async def fetch_latest(
) -> list[ContentData]:
logger.debug("Fetching Twitter timeline", identifier=identifier, skip_content=skip_content)

headers = {"X-API-Key": self._api_key}
params: dict[str, str] = {"userName": identifier, "includeReplies": "false"}
user_id = await self._resolve_user_id(identifier)
if not user_id:
return []

if self._client:
response = await self._client.get(
f"{TWITTER_API_BASE}/twitter/user/last_tweets",
headers=headers,
params=params,
)
else:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(
f"{TWITTER_API_BASE}/twitter/user/last_tweets",
headers=headers,
params=params,
)
params: dict[str, str] = {
"max_results": "5",
"exclude": "retweets,replies",
"tweet.fields": TWEET_FIELDS,
"user.fields": USER_FIELDS,
"expansions": EXPANSIONS,
}

response.raise_for_status()
if not skip_content:
params["media.fields"] = MEDIA_FIELDS

response = await self._request(f"{X_API_BASE}/users/{user_id}/tweets", params=params)
data = response.json()

tweets_raw = data.get("tweets", [])
if "errors" in data and "data" not in data:
for error in data["errors"]:
logger.error(
"X API error",
identifier=identifier,
error_title=error.get("title"),
error_detail=error.get("detail"),
)
return []

tweets_raw: list[dict[str, Any]] = data.get("data", [])
includes: dict[str, Any] = data.get("includes", {})
meta: dict[str, Any] = data.get("meta", {})

logger.debug(
"Twitter API response",
"X API response",
identifier=identifier,
status=data.get("status"),
tweet_count=len(tweets_raw),
has_next_page=data.get("has_next_page"),
result_count=meta.get("result_count"),
)

if data.get("status") != "success":
logger.error(
"Twitter API returned error",
identifier=identifier,
message=data.get("message"),
)
return []
users_map = self._build_users_map(includes)
media_map = self._build_media_map(includes)
referenced_tweets_map = self._build_referenced_tweets_map(includes)

items: list[ContentData] = []
for tweet in tweets_raw:
if tweet.get("retweeted_tweet"):
continue

try:
item = self._parse_tweet(tweet, skip_content=skip_content)
item = self._parse_tweet(
tweet,
users_map=users_map,
media_map=media_map,
referenced_tweets_map=referenced_tweets_map,
skip_content=skip_content,
)
items.append(item)
except Exception as e:
logger.warning(
@@ -88,28 +103,100 @@ async def fetch_latest(
logger.info("Fetched Twitter content", identifier=identifier, count=len(items))
return items

def _parse_tweet(self, tweet: dict[str, object], skip_content: bool = False) -> ContentData:
async def _resolve_user_id(self, username: str) -> str | None:
if username in self._user_id_cache:
return self._user_id_cache[username]

response = await self._request(
f"{X_API_BASE}/users/by/username/{username}",
params={"user.fields": "id"},
)

data = response.json()

if "errors" in data and "data" not in data:
for error in data["errors"]:
logger.error(
"X API user lookup error",
username=username,
error_title=error.get("title"),
error_detail=error.get("detail"),
)
return None

user_data: dict[str, Any] | None = data.get("data")
if not user_data:
logger.error("X API user not found", username=username)
return None

user_id = str(user_data["id"])
self._user_id_cache[username] = user_id
return user_id

async def _request(self, url: str, params: dict[str, str] | None = None) -> httpx.Response:
headers = {"Authorization": f"Bearer {self._bearer_token}"}

if self._client:
response = await self._client.get(url, headers=headers, params=params)
else:
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(url, headers=headers, params=params)

response.raise_for_status()
return response

def _build_users_map(self, includes: dict[str, Any]) -> dict[str, dict[str, Any]]:
users: list[dict[str, Any]] = includes.get("users", [])
return {str(u["id"]): u for u in users}

def _build_media_map(self, includes: dict[str, Any]) -> dict[str, dict[str, Any]]:
media: list[dict[str, Any]] = includes.get("media", [])
return {str(m["media_key"]): m for m in media}

def _build_referenced_tweets_map(self, includes: dict[str, Any]) -> dict[str, dict[str, Any]]:
tweets: list[dict[str, Any]] = includes.get("tweets", [])
return {str(t["id"]): t for t in tweets}

def _parse_tweet(
self,
tweet: dict[str, Any],
users_map: dict[str, dict[str, Any]],
media_map: dict[str, dict[str, Any]],
referenced_tweets_map: dict[str, dict[str, Any]],
skip_content: bool = False,
) -> ContentData:
tweet_id = str(tweet["id"])
text = str(tweet.get("text", ""))
url = str(tweet.get("url", f"https://x.com/i/status/{tweet_id}"))
note_tweet = tweet.get("note_tweet")
text = (
str(note_tweet["text"])
if isinstance(note_tweet, dict) and "text" in note_tweet
else str(tweet.get("text", ""))
)
author_id = str(tweet.get("author_id", ""))

author_data = tweet.get("author") or {}
if not isinstance(author_data, dict):
author_data = {}
author_name = str(author_data.get("name") or author_data.get("userName", "Unknown"))
profile_pic = author_data.get("profilePicture")
author_info = users_map.get(author_id, {})
author_name = str(author_info.get("name") or author_info.get("username", "Unknown"))
username = str(author_info.get("username", ""))
profile_pic = author_info.get("profile_image_url")

title = self._make_title(text)
published_at = self._parse_twitter_date(
str(tweet["createdAt"]) if tweet.get("createdAt") else None
url = (
f"https://x.com/{username}/status/{tweet_id}"
if username
else f"https://x.com/i/status/{tweet_id}"
)

title = self._make_title(text)
published_at = self._parse_iso_date(tweet.get("created_at"))

raw_content = None
if not skip_content:
raw_content = text
quoted = tweet.get("quoted_tweet")
if isinstance(quoted, dict) and quoted.get("text"):
raw_content += f"\n\n[Quoted: {quoted['text']}]"

quoted_text = self._get_quoted_tweet_text(tweet, referenced_tweets_map)
if quoted_text:
raw_content += f"\n\n[Quoted: {quoted_text}]"

thumbnail_url = self._get_thumbnail_url(tweet, media_map, profile_pic)

return ContentData(
external_id=tweet_id,
@@ -118,19 +205,51 @@ def _parse_tweet(self, tweet: dict[str, object], skip_content: bool = False) ->
author=author_name,
published_at=published_at,
raw_content=raw_content,
thumbnail_url=str(profile_pic) if profile_pic else None,
thumbnail_url=thumbnail_url,
)

def _get_quoted_tweet_text(
self,
tweet: dict[str, Any],
referenced_tweets_map: dict[str, dict[str, Any]],
) -> str | None:
refs: list[dict[str, Any]] = tweet.get("referenced_tweets", [])
for ref in refs:
if ref.get("type") == "quoted":
ref_id = str(ref["id"])
quoted = referenced_tweets_map.get(ref_id)
if quoted:
return str(quoted.get("text", ""))
return None

def _get_thumbnail_url(
self,
tweet: dict[str, Any],
media_map: dict[str, dict[str, Any]],
profile_pic: str | None,
) -> str | None:
attachments: dict[str, Any] = tweet.get("attachments", {})
media_keys: list[str] = attachments.get("media_keys", [])

for key in media_keys:
media = media_map.get(key, {})
media_url = media.get("url") or media.get("preview_image_url")
if media_url:
return str(media_url)

return str(profile_pic) if profile_pic else None

def _make_title(self, text: str) -> str:
first_line = text.split("\n")[0]
if len(first_line) <= TITLE_MAX_LENGTH:
return first_line
return first_line[: TITLE_MAX_LENGTH - 3] + "..."

def _parse_twitter_date(self, date_str: str | None) -> datetime:
def _parse_iso_date(self, date_str: Any) -> datetime:
if not date_str:
return datetime.now(UTC)
try:
return datetime.strptime(date_str, TWITTER_DATE_FORMAT)
s = str(date_str).replace("Z", "+00:00")
return datetime.fromisoformat(s)
except (ValueError, TypeError):
return datetime.now(UTC)