diff --git a/.gitignore b/.gitignore index c75922a..dd76ec0 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,5 @@ cache .pytest_cache dist *.egg-info/ -scraper_data/ \ No newline at end of file +scraper_data/ +*.txt \ No newline at end of file diff --git a/src/gentrade/news/factory.py b/src/gentrade/news/factory.py index 233c62f..1dfa8eb 100644 --- a/src/gentrade/news/factory.py +++ b/src/gentrade/news/factory.py @@ -14,7 +14,7 @@ from gentrade.scraper.extractor import ArticleContentExtractor -from gentrade.news.meta import NewsProviderBase, NewsDatabase +from gentrade.news.meta import NewsProviderBase, NewsDatabase, NewsFileDatabase from gentrade.news.newsapi import NewsApiProvider from gentrade.news.rss import RssProvider from gentrade.news.finnhub import FinnhubNewsProvider @@ -72,7 +72,6 @@ def create_provider(provider_type: str, **kwargs) -> NewsProviderBase: return provider_class(**kwargs) - class NewsAggregator: """Aggregates news articles from multiple providers and synchronizes them to a database. @@ -92,7 +91,7 @@ def __init__(self, providers: List[NewsProviderBase], db: NewsDatabase): self.db_lock = threading.Lock() def _fetch_thread(self, provider, aggregator, ticker, category, - max_hour_interval, max_count, is_process=True): + max_hour_interval, max_count, is_process=False): if ticker: news = provider.fetch_stock_news( ticker, category, max_hour_interval, max_count @@ -115,7 +114,6 @@ def _fetch_thread(self, provider, aggregator, ticker, category, item.summary = ace.clean_html(item.summary) if is_process: item.content = ace.extract_content(item.url) - logger.info(item.content) with aggregator.db_lock: aggregator.db.add_news(news) @@ -147,6 +145,9 @@ def sync_news( threads = [] for provider in self.providers: + if not provider.is_available: + continue + thread = threading.Thread( target=self._fetch_thread, args=(provider, self, ticker, category, max_hour_interval, max_count) @@ -158,10 +159,11 @@ def sync_news( thread.join() self.db.last_sync = current_time + self.db.save() logger.info("News sync completed.") if __name__ == "__main__": - db = NewsDatabase() + db = NewsFileDatabase("news_db.txt") try: # Initialize providers using the factory @@ -170,7 +172,8 @@ def sync_news( rss_provider = NewsFactory.create_provider("rss") # Create aggregator with selected providers - aggregator = NewsAggregator(providers=[rss_provider], db=db) + aggregator = NewsAggregator( + providers=[rss_provider, newsapi_provider, finnhub_provider], db=db) # Sync market news and stock-specific news aggregator.sync_news(category="business", max_hour_interval=64, max_count=10) @@ -185,16 +188,7 @@ def sync_news( all_news = db.get_all_news() logger.info(f"Total articles in database: {len(all_news)}") - if all_news: - logger.info("Example article:") - logger.info(all_news[0].to_dict()) - - for news_item in all_news: - logger.info("--------------------------------") - print(news_item.headline) - print(news_item.url) - print(news_item.content) - logger.info("--------------------------------") - + for news_item in all_news: + logger.info("[%s...]: %s..." % (str(news_item.id)[:10], news_item.headline[:15])) except ValueError as e: logger.error(f"Error during news aggregation: {e}") diff --git a/src/gentrade/news/finnhub.py b/src/gentrade/news/finnhub.py index beb1d53..be1f8dd 100644 --- a/src/gentrade/news/finnhub.py +++ b/src/gentrade/news/finnhub.py @@ -4,7 +4,7 @@ utilizing the Finnhub.io API to retrieve news articles. It supports both general market news and news specific to individual stock tickers, with filtering by time interval and article count. """ - +import os import time from typing import List from datetime import datetime, timedelta @@ -21,19 +21,23 @@ class FinnhubNewsProvider(NewsProviderBase): intervals and maximum article count. """ - def __init__(self, api_key: str): + def __init__(self, api_key: str = None): """Initialize the FinnhubNewsProvider with the required API key. Args: api_key: API key for authenticating requests to Finnhub.io. """ - self.api_key = api_key + self.api_key = ( api_key or os.getenv("FINNHUB_API_KEY") ) self.base_url = "https://finnhub.io/api/v1" @property - def market(self): + def market(self) -> str: return 'us' + @property + def is_available(self) -> bool: + return self.api_key is not None and len(self.api_key) != 0 + def fetch_latest_market_news( self, category: str = "business", @@ -75,7 +79,7 @@ def fetch_latest_market_news( headline=article.get("headline", ""), id=self.url_to_hash_id(article.get("url", "")), image=article.get("image", ""), - related=article.get("related", ""), + related=article.get("related", []), source=article.get("source", ""), summary=article.get("summary", ""), url=article.get("url", ""), @@ -85,7 +89,7 @@ def fetch_latest_market_news( ) for article in articles ] - return self._filter_news(news_list, max_hour_interval, max_count) + return self.filter_news(news_list, max_hour_interval, max_count) except requests.RequestException as e: logger.debug(f"Error fetching market news from Finnhub: {e}") @@ -134,7 +138,7 @@ def fetch_stock_news( headline=article.get("headline", ""), id=article.get("id", hash(article.get("url", ""))), image=article.get("image", ""), - related=ticker, + related=[ticker,], source=article.get("source", ""), summary=article.get("summary", ""), url=article.get("url", ""), @@ -144,7 +148,7 @@ def fetch_stock_news( ) for article in articles ] - return self._filter_news(news_list, max_hour_interval, max_count) + return self.filter_news(news_list, max_hour_interval, max_count) except requests.RequestException as e: logger.debug(f"Error fetching stock news from Finnhub: {e}") diff --git a/src/gentrade/news/meta.py b/src/gentrade/news/meta.py index dd19ca3..5393e0b 100644 --- a/src/gentrade/news/meta.py +++ b/src/gentrade/news/meta.py @@ -7,7 +7,8 @@ Supports fetching market-wide and stock-specific news, with filtering by time and count. """ - +import os +import json import abc import time import hashlib @@ -29,7 +30,7 @@ class NewsInfo: headline: str id: int image: str - related: str # Related stock ticker(s) or empty string + related: list[str] # Related stock ticker(s) or empty list source: str summary: str url: str @@ -79,7 +80,6 @@ def fetch_article_html(self) -> Optional[str]: logger.debug(f"Failed to fetch HTML for {self.url}: {e}") return None - class NewsProviderBase(metaclass=abc.ABCMeta): """Abstract base class defining the interface for news providers. @@ -87,9 +87,29 @@ class NewsProviderBase(metaclass=abc.ABCMeta): """ @property - def market(self): + def market(self) -> str: + """Get the market identifier this provider is associated with. + + Defaults to 'common' for providers that cover general markets. + Concrete providers may override this to specify a specific market (e.g., 'us', 'cn'). + + Returns: + str: Market identifier string. + """ return 'common' + @property + def is_available(self) -> bool: + """Check if the news provider is currently available/operational. + + Defaults to True. Concrete providers may override this to implement + availability checks (e.g., API status, rate limits, connectivity). + + Returns: + bool: True if provider is available, False otherwise. + """ + return True + @abc.abstractmethod def fetch_latest_market_news( self, @@ -109,7 +129,6 @@ def fetch_latest_market_news( """ raise NotImplementedError - @abc.abstractmethod def fetch_stock_news( self, ticker: str, @@ -128,7 +147,27 @@ def fetch_stock_news( Returns: List of NewsInfo objects related to the specified ticker. """ - raise NotImplementedError + # Fetch 2x max_count general news to allow ticker filtering + general_news = self.fetch_latest_market_news( + category=category, + max_hour_interval=max_hour_interval, + max_count=max_count * 2 + ) + + # Filter articles where ticker is in headline or summary (case-insensitive) + ticker_lower = ticker.lower() + ticker_news = [ + news for news in general_news + if ticker_lower in news.headline.lower() + or ticker_lower in news.summary.lower() + ] + + # Update "related" field to link articles to the target ticker + for news in ticker_news: + news.related.append(ticker) + + # Limit to max_count results + return ticker_news[:max_count] def _timestamp_to_epoch(self, timestamp: str) -> int: """Convert ISO 8601 timestamp to epoch seconds. @@ -146,7 +185,7 @@ def _timestamp_to_epoch(self, timestamp: str) -> int: except ValueError: return int(time.time()) - def _filter_news( + def filter_news( self, news_list: List[NewsInfo], max_hour_interval: int, @@ -184,8 +223,8 @@ class NewsDatabase: def __init__(self): """Initialize an empty database with last sync time set to 0.""" - self.news_dict: Dict[str, NewsInfo] = {} # Key: article URL - self.last_sync: float = 0.0 # Epoch time of last successful sync + self.news_list: List[NewsInfo] = [] + self.last_sync = 0 def add_news(self, news_list: List[NewsInfo]) -> None: """Add news articles to the database, skipping duplicates. @@ -193,10 +232,12 @@ def add_news(self, news_list: List[NewsInfo]) -> None: Args: news_list: List of NewsInfo objects to store. """ + news_hash_cache_list = [item.id for item in self.news_list] for news in news_list: - # Use URL as unique identifier to avoid duplicates - if news.url and news.url not in self.news_dict: - self.news_dict[news.url] = news + if news.id in news_hash_cache_list: + logger.error("news %s already in the cache list" % news.id) + continue + self.news_list.append(news) def get_all_news(self) -> List[NewsInfo]: """Retrieve all stored news articles. @@ -204,7 +245,7 @@ def get_all_news(self) -> List[NewsInfo]: Returns: List of all NewsInfo objects in the database. """ - return list(self.news_dict.values()) + return self.news_list def get_market_news(self, market='us') -> List[NewsInfo]: """Retrieve stored news articles for given market. @@ -217,7 +258,32 @@ def get_market_news(self, market='us') -> List[NewsInfo]: """ assert market in NEWS_MARKET market_news = [] - for item in self.news_dict.values(): + for item in self.news_list: if item.market == market: market_news.append(item) return market_news + + +class NewsFileDatabase(NewsDatabase): + + def __init__(self, filepath): + super().__init__() + self._filepath = filepath + if os.path.exists(self._filepath): + self.load() + + def save(self): + news_dicts = [news.to_dict() for news in self.news_list] + content = { + "last_sync": self.last_sync, + "news_list": news_dicts + } + with open(self._filepath, 'w', encoding='utf-8') as f: + json.dump(content, f, indent=4) # indent for readability + + def load(self): + with open(self._filepath, 'r', encoding='utf-8') as f: + content = json.load(f) # Directly loads JSON content into a Python list/dict + self.last_sync = content['last_sync'] + self.news_list = [NewsInfo(**item_dict) for item_dict in content['news_list']] + logger.info(self.news_list) diff --git a/src/gentrade/news/newsapi.py b/src/gentrade/news/newsapi.py index 53d0a40..a1dc262 100644 --- a/src/gentrade/news/newsapi.py +++ b/src/gentrade/news/newsapi.py @@ -4,7 +4,7 @@ and stock-specific news via the NewsAPI.org API. It supports filtering by time interval, article count, and language, while formatting results into standardized NewsInfo objects. """ - +import os from typing import List from datetime import datetime, timedelta import requests @@ -20,19 +20,23 @@ class NewsApiProvider(NewsProviderBase): and stock-specific news (using ticker symbols). """ - def __init__(self, api_key: str): + def __init__(self, api_key: str = None): """Initialize the NewsApiProvider with a NewsAPI.org API key. Args: api_key: API key for authenticating requests to NewsAPI.org. """ - self.api_key = api_key + self.api_key = ( api_key or os.getenv("NEWSAPI_API_KEY") ) self.base_url = "https://newsapi.org/v2/everything" # Core endpoint for news retrieval @property - def market(self): + def market(self) -> str: return 'us' + @property + def is_available(self) -> bool: + return self.api_key is not None and len(self.api_key) != 0 + def fetch_latest_market_news( self, category: str = "business", @@ -78,7 +82,7 @@ def fetch_latest_market_news( headline=article.get("title", ""), id=self.url_to_hash_id(article.get("url", "")), image=article.get("urlToImage", ""), # Article thumbnail (if available) - related="", # No stock ticker for general market news + related=[], # No stock ticker for general market news source=article.get("source", {}).get("name", ""), # News source name summary=article.get("description", ""), # Short article preview url=article.get("url", ""), # Direct article URL @@ -89,7 +93,7 @@ def fetch_latest_market_news( for article in articles ] - return self._filter_news(news_list, max_hour_interval, max_count) + return self.filter_news(news_list, max_hour_interval, max_count) except requests.RequestException as e: logger.debug(f"Failed to fetch market news from NewsAPI.org: {e}") @@ -138,14 +142,16 @@ def fetch_stock_news( articles = response.json().get("articles", []) # Convert API response to standardized NewsInfo objects - news_list = [ - NewsInfo( + news_list = [] + for article in articles: + assert article.get("url", "") != "" + ni = NewsInfo( category=category, datetime=self._timestamp_to_epoch(article.get("publishedAt", "")), headline=article.get("title", ""), id=hash(article.get("url", "")), image=article.get("urlToImage", ""), - related=ticker, # Associate with target stock ticker + related=[ticker,], # Associate with target stock ticker source=article.get("source", {}).get("name", ""), summary=article.get("description", ""), url=article.get("url", ""), @@ -153,10 +159,8 @@ def fetch_stock_news( provider='newsapi', market='us' ) - for article in articles - ] - - return self._filter_news(news_list, max_hour_interval, max_count) + news_list.append(ni) + return self.filter_news(news_list, max_hour_interval, max_count) except requests.RequestException as e: logger.debug(f"Failed to fetch {ticker} stock news from NewsAPI.org: {e}") diff --git a/src/gentrade/news/rss.py b/src/gentrade/news/rss.py index 5fcbb9d..c4557d6 100644 --- a/src/gentrade/news/rss.py +++ b/src/gentrade/news/rss.py @@ -15,7 +15,6 @@ from gentrade.news.meta import NewsInfo, NewsProviderBase - class RssProvider(NewsProviderBase): """News provider that fetches news from RSS/ATOM feeds. @@ -37,7 +36,15 @@ def __init__(self, feed_url: str = None, market: str = "common"): or os.getenv("RSS_FEED_URL") or "https://plink.anyfeeder.com/chinadaily/caijing" ) - self.market = market + self._market = market + + @property + def market(self) -> str: + return self._market + + @property + def is_available(self) -> bool: + return self.feed_url is not None def fetch_latest_market_news( self, @@ -93,7 +100,7 @@ def fetch_latest_market_news( # Extract image URL (handles missing media_content gracefully) image=entry.get("media_content", [{}])[0].get("url", "") if entry.get("media_content") else "", - related="", # No ticker for general market news + related=[], # No ticker for general market news source=feed.feed.get("title", "Unknown RSS Feed"), # Feed source name summary=entry.get("summary", ""), # Short article preview url=entry.get("link", ""), # Direct article URL @@ -106,7 +113,7 @@ def fetch_latest_market_news( ] # Filter by recency and limit to max_count - return self._filter_news(news_list, max_hour_interval, max_count) + return self.filter_news(news_list, max_hour_interval, max_count) except requests.HTTPError as e: logger.error( @@ -120,48 +127,3 @@ def fetch_latest_market_news( except Exception as e: logger.error(f"Unexpected error parsing RSS feed {self.feed_url}: {str(e)}") return [] - - def fetch_stock_news( - self, - ticker: str, - category: str = "business", - max_hour_interval: int = 24, - max_count: int = 10 - ) -> List[NewsInfo]: - """Fetch stock-specific news by filtering RSS feed results for the target ticker. - - Note: Most RSS feeds don't support native ticker filtering. This method fetches - general market news first, then filters entries where the ticker appears in the - headline or summary (case-insensitive). - - Args: - ticker: Stock ticker symbol (e.g., "AAPL") to filter for. - category: Category label to assign to fetched news (default: "business"). - max_hour_interval: Maximum age (in hours) of articles to include (default: 24). - max_count: Maximum number of articles to return (default: 10). - - Returns: - List of NewsInfo objects with ticker-matching news; empty list if no matches - or feed fetching fails. - """ - # Fetch 2x max_count general news to allow ticker filtering - general_news = self.fetch_latest_market_news( - category=category, - max_hour_interval=max_hour_interval, - max_count=max_count * 2 - ) - - # Filter articles where ticker is in headline or summary (case-insensitive) - ticker_lower = ticker.lower() - ticker_news = [ - news for news in general_news - if ticker_lower in news.headline.lower() - or ticker_lower in news.summary.lower() - ] - - # Update "related" field to link articles to the target ticker - for news in ticker_news: - news.related = ticker - - # Limit to max_count results - return ticker_news[:max_count]