diff --git a/README.md b/README.md index fbbed4b..e098c0b 100644 --- a/README.md +++ b/README.md @@ -1,42 +1,24 @@ # FreshProxy -A **Flask**-based proxy for [FreshRSS](https://github.com/FreshRSS/FreshRSS) that securely forwards specific API requests, eliminating the need for dynamic query parameters. Configurable via environment variables (or a `.env` file). +A Flask-based proxy for [FreshRSS](https://github.com/FreshRSS/FreshRSS) that securely forwards API requests, eliminating the need to expose raw FreshRSS endpoints directly. Configurable via environment variables (or a `.env` file). ## Overview -**FreshProxy** acts as a dedicated **HTTP proxy** for specific **FreshRSS** endpoints, enhancing security and simplifying request structures. By using dedicated proxy endpoints, you eliminate the need for dynamic query parameters, reducing potential attack vectors and improving clarity. +FreshProxy acts as a dedicated HTTP proxy for your FreshRSS instance, enhancing security and simplifying request structures. By using a single proxy endpoint (`/digest`), you avoid having to expose or directly query each feed or the subscription list from the client. ## Features -- **Dedicated Proxy Endpoints**: - - `/subscriptions` -> `subscription/list`. - - `/feed/<feed_id>` -> `stream/contents/feed/<feed_id>`. -- **CORS** restrictions to only allow certain origins. +- **Single Aggregator Endpoint**: + - `GET /digest`: Returns a globally-sorted list of recent feed items from your FreshRSS instance. + - Optional query parameters: + - `label=<label>`: Filter feeds by label. + - `n=<number>`: Number of items to fetch per feed (defaults to 1). + - `page=<page> & limit=<limit>`: For item-level pagination (defaults: page=1, limit=50). +- **CORS** restrictions, allowing only whitelisted origins. - **Timeout** and error handling for upstream requests. - **Environment-based configuration** (via `.env` or standard env vars). - **Docker Support** for easy deployment. -## Project Structure - -```text -freshproxy/ -├── freshproxy/ -│   ├── __init__.py # Makes 'freshproxy' a package -│   ├── app.py # Application factory & CORS setup -│   ├── config.py # Environment variables, whitelists -│   └── proxy_routes.py # Blueprint with the '/' GET route -├── tests/ -│   ├── test_config.py # Example environment var tests -│   └── test_proxy.py # Proxy route tests (mocking requests) -├── requirements.txt # Dependencies (Flask, requests, etc.) -├── pyproject.toml # Project metadata & optional deps -├── run.py # Dev entry point -├── Dockerfile # Container-based deployment -├── .env.example # Example environment variables (no secrets) -├── .gitignore -└── README.md -``` - ## Installation 1. Clone the repository: @@ -74,7 +56,6 @@ FRESHPROXY_DEBUG=False FRESHPROXY_REQUEST_TIMEOUT=10 ``` - ### Environment Variables - `FRESHRSS_API_TOKEN`: Secret token used to authenticate with your FreshRSS instance.
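For reference, here is a minimal sketch of how a client might call the new `/digest` endpoint described above; the base URL and the `favs` label are placeholder values, not something this change defines:

```python
# Illustrative client call against the /digest aggregator endpoint.
# "https://proxy.example.com" and the "favs" label are placeholders.
import requests

resp = requests.get(
    "https://proxy.example.com/digest",
    params={"label": "favs", "n": 3, "page": 1, "limit": 50},
    timeout=10,
)
resp.raise_for_status()
digest = resp.json()

# The proxy returns a flat, globally sorted item list plus pagination
# metadata: {"items": [...], "page": 1, "limit": 50, "totalItems": ...}
for item in digest["items"]:
    print(item["feedTitle"], "-", item["title"])
```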
diff --git a/freshproxy/proxy_routes.py b/freshproxy/proxy_routes.py index ea4849d..a13e158 100644 --- a/freshproxy/proxy_routes.py +++ b/freshproxy/proxy_routes.py @@ -1,17 +1,90 @@ import logging +import time import re import requests from typing import Union, Tuple from flask import Blueprint, request, jsonify, Response +from concurrent.futures import ThreadPoolExecutor, as_completed from freshproxy.config import AUTH_TOKEN, BASE_URL, ALLOWED_ENDPOINTS, REQUEST_TIMEOUT logger = logging.getLogger(__name__) - proxy_bp = Blueprint("proxy_bp", __name__) +AGGREGATOR_CACHE = {} +CACHE_TTL_SECONDS = 300 + + +def get_cache_key(label, n): + """ + Create a unique cache key for aggregator queries. + """ + return f"digest|{label}|{n}" + + +def set_cache_value(cache_key, value): + """ + Store a (timestamp, data) tuple in the global aggregator cache. + """ + AGGREGATOR_CACHE[cache_key] = (time.time(), value) + + +def get_cache_value(cache_key): + """ + Retrieve the cached value if it's not expired; otherwise return None. + """ + cache_item = AGGREGATOR_CACHE.get(cache_key) + if not cache_item: + return None + cached_time, data = cache_item + if time.time() - cached_time > CACHE_TTL_SECONDS: + # expired + AGGREGATOR_CACHE.pop(cache_key, None) + return None + return data + + +def fetch_feed_posts(feed_id, n=1, retry_attempts=2): + """ + Fetch up to 'n' latest posts for a single feed, with retry logic. + Returns a list of items on success, or a dict with an 'error' key on failure. + """ + feed_endpoint = ALLOWED_ENDPOINTS.get("feed", "stream/contents/feed") + actual_id = feed_id + if actual_id.startswith("feed/"): + actual_id = actual_id[len("feed/") :] + + feed_url = f"{BASE_URL}/{feed_endpoint}/{actual_id}" + headers = {"Authorization": f"GoogleLogin auth={AUTH_TOKEN}"} + params = {"n": n} + + for attempt in range(retry_attempts + 1): + try: + resp = requests.get(feed_url, headers=headers, params=params, timeout=REQUEST_TIMEOUT) + resp.raise_for_status() + + data = resp.json() + items = data.get("items", []) + return items + + except requests.Timeout: + logger.warning(f"Timeout fetching feed_id={feed_id}, attempt={attempt}") + if attempt == retry_attempts: + return {"error": "Timeout after retries"} + except requests.RequestException as e: + logger.warning(f"Request error fetching feed_id={feed_id}, attempt={attempt}: {e}") + if attempt == retry_attempts: + return {"error": str(e)} + except ValueError as e: + logger.warning(f"JSON decode error feed_id={feed_id}, attempt={attempt}: {e}") + if attempt == retry_attempts: + return {"error": f"JSON decode error: {e}"} + + return {"error": "Unknown fetch error"} + + def proxy_request(endpoint: str, params: dict) -> Union[Response, Tuple[Response, int]]: """ Helper function to proxy requests to FreshRSS. @@ -60,49 +133,123 @@ def is_valid_feed_id(feed_id: str) -> bool: Returns: bool: True if valid, False otherwise. """ + if feed_id.startswith("feed/"): + feed_id = feed_id[len("feed/") :] return re.fullmatch(r"\d+", feed_id) is not None -@proxy_bp.route("/subscriptions", methods=["GET"]) -def get_subscriptions() -> Union[Response, Tuple[Response, int]]: - """ - Proxy endpoint for /subscriptions -> FreshRSS subscription/list - - Returns: - Union[Response, Tuple[Response, int]]: JSON response or error message with status code.
- """ - endpoint = ALLOWED_ENDPOINTS.get("subscriptions") - if not endpoint: - logger.error("FreshRSS endpoint for 'subscriptions' not configured.") - return jsonify({"error": "Internal server error"}), 500 - - params = request.args.to_dict() - params.update({"output": "json"}) - - return proxy_request(endpoint, params) - - -@proxy_bp.route("/feed/<feed_id>", methods=["GET"]) -def get_feed_contents(feed_id: str) -> Union[Response, Tuple[Response, int]]: +@proxy_bp.route("/digest", methods=["GET"]) +def get_digest(): """ - Proxy endpoint for /feed/<feed_id> -> FreshRSS stream/contents/feed/<feed_id> - - Args: - feed_id (str): The ID of the feed to retrieve contents for. + Return a sorted list of the latest items across all feeds (optionally filtered by label). - Returns: - Union[Response, Tuple[Response, int]]: JSON response or error message with status code. + Query params: + - label: Filter feeds by this label (optional) + - n: Number of items to fetch per feed (default=1) + - page: 1-based index of which "items page" to return (default=1) + - limit: How many items per page (default=50) """ - if not is_valid_feed_id(feed_id): - logger.warning(f"Invalid feed_id format received: {feed_id}") - return jsonify({"error": "Invalid feed_id format"}), 400 - - base_endpoint = ALLOWED_ENDPOINTS.get("feed") - if not base_endpoint: - logger.error("FreshRSS base endpoint for 'feed' not configured.") - return jsonify({"error": "Internal server error"}), 500 - - endpoint = f"{base_endpoint}/{feed_id}" - params = request.args.to_dict() - - return proxy_request(endpoint, params) + label = request.args.get("label", "") + page = int(request.args.get("page", 1)) + limit = int(request.args.get("limit", 50)) + n = int(request.args.get("n", 1)) + + cache_key = get_cache_key(label, n) + + cached_data = get_cache_value(cache_key) + if cached_data is not None: + logger.info(f"Using cached flattened data for cache_key={cache_key}") + all_items = cached_data + else: + logger.info(f"Cache miss for cache_key={cache_key}. 
Fetching from FreshRSS.") + + # 1) Fetch the subscriptions from FreshRSS + subscriptions_endpoint = ALLOWED_ENDPOINTS.get("subscriptions", "subscription/list") + subscriptions_url = f"{BASE_URL}/{subscriptions_endpoint}" + headers = {"Authorization": f"GoogleLogin auth={AUTH_TOKEN}"} + sub_params = {"output": "json"} # FreshRSS expects 'output=json' for JSON responses + + try: + sub_resp = requests.get( + subscriptions_url, headers=headers, params=sub_params, timeout=REQUEST_TIMEOUT + ) + sub_resp.raise_for_status() + subscriptions_data = sub_resp.json() + except requests.RequestException as e: + logger.error(f"Failed to fetch subscriptions: {e}") + return jsonify({"error": "Failed to fetch subscriptions", "details": str(e)}), 502 + except ValueError as e: + logger.error(f"JSON decode error in subscriptions: {e}") + return ( + jsonify({"error": "Failed to decode JSON (subscriptions)", "details": str(e)}), + 500, + ) + + # 2) Filter the subscriptions by label if specified + all_feeds = subscriptions_data.get("subscriptions", []) + if label: + all_feeds = [ + feed + for feed in all_feeds + if any(cat.get("label") == label for cat in feed.get("categories", [])) + ] + logger.info(f"Found {len(all_feeds)} feeds after label filtering.") + + # 3) Flatten items from each feed into a single list + all_items = [] + + def process_feed(feed): + feed_id = feed.get("id") + if not feed_id: + logger.warning(f"Skipping feed with no id: {feed}") + return [] + + if not is_valid_feed_id(feed_id): + logger.warning(f"Skipping feed with invalid id: {feed_id}") + return [] + + items = fetch_feed_posts(feed_id, n=n, retry_attempts=2) + # If fetch_feed_posts returns a dict with "error", handle gracefully + if isinstance(items, dict) and "error" in items: + logger.warning(f"Error while fetching feed {feed_id}: {items['error']}") + return [] + + if not isinstance(items, list): + logger.warning( + f"Expected list of items, got {type(items)}. 
Skipping feed {feed_id}" + ) + return [] + + for item in items: + item["feedId"] = feed_id + item["feedTitle"] = feed.get("title", "") + item["feedHtmlUrl"] = feed.get("htmlUrl") + item["feedIconUrl"] = feed.get("iconUrl") + return items + + max_workers = max(1, min(10, len(all_feeds))) # limit concurrency, but keep at least one worker + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_feed = {executor.submit(process_feed, f): f for f in all_feeds} + for future in as_completed(future_to_feed): + feed_items = future.result() + all_items.extend(feed_items) + + # 4) Sort the flattened list by `published` descending + all_items.sort(key=lambda x: x.get("published", 0), reverse=True) + + # 5) Store in cache for future requests + set_cache_value(cache_key, all_items) + + # 6) Pagination: slice the all_items list + offset = max(0, (page - 1) * limit) + paginated_items = all_items[offset : offset + limit] + + # 7) Construct the response + response_data = { + "items": paginated_items, + "page": page, + "limit": limit, + "totalItems": len(all_items), + } + + return jsonify(response_data) diff --git a/tests/test_proxy.py b/tests/test_proxy.py index e91a8d3..07f2d35 100644 --- a/tests/test_proxy.py +++ b/tests/test_proxy.py @@ -3,7 +3,7 @@ from unittest.mock import patch, MagicMock from freshproxy.app import create_app -from freshproxy.config import AUTH_TOKEN, REQUEST_TIMEOUT +from freshproxy.proxy_routes import AGGREGATOR_CACHE @@ -18,7 +18,7 @@ def client(monkeypatch): "http://localhost:3000,https://test.com,https://proxy.example.com", ) monkeypatch.setenv("FRESHPROXY_DEBUG", "True") - monkeypatch.setenv("FRESHPROXY_REQUEST_TIMEOUT", 5) + monkeypatch.setenv("FRESHPROXY_REQUEST_TIMEOUT", "5") app = create_app() app.testing = True @@ -26,6 +26,17 @@ def client(monkeypatch): yield client +@pytest.fixture(autouse=True) +def clear_aggregator_cache(): + """ + This fixture runs automatically before each test, ensuring + the aggregator cache is empty so tests don't interfere with each other. + """ + AGGREGATOR_CACHE.clear() + yield + AGGREGATOR_CACHE.clear() + + @pytest.fixture def mock_requests_get(): """ @@ -58,103 +69,307 @@ def test_unsupported_endpoint(client): assert response.status_code == 404 -def test_valid_subscriptions(mock_requests_get, proxy_mock_response, client): +def test_aggregated_digest(client, mock_requests_get): """ - Test the /subscriptions endpoint. + Test the /digest aggregator route, ensuring we get a flat list of items + sorted by 'published' descending, along with pagination metadata. 
""" - mock_requests_get.return_value = proxy_mock_response({"subscriptions": ["Feed1", "Feed2"]}) - - response = client.get("/subscriptions") + subscription_response = MagicMock() + subscription_response.ok = True + subscription_response.json.return_value = { + "subscriptions": [ + { + "id": "feed/1", + "title": "Feed 1", + "htmlUrl": "https://feed1-url", + "iconUrl": "https://icon1-url", + "categories": [{"label": "favs"}], + }, + { + "id": "feed/2", + "title": "Feed 2", + "htmlUrl": "https://feed2-url", + "iconUrl": "https://icon2-url", + "categories": [], + }, + ] + } + + feed1_response = MagicMock() + feed1_response.ok = True + feed1_response.json.return_value = { + "items": [ + { + "title": "Feed1 Post Title", + "published": 1697000000, + "alternate": [{"href": "https://feed1-post-url"}], + } + ] + } + feed2_response = MagicMock() + feed2_response.ok = True + feed2_response.json.return_value = { + "items": [ + { + "title": "Feed2 Post Title", + "published": 1697100000, + "alternate": [{"href": "https://feed2-post-url"}], + } + ] + } + + mock_requests_get.side_effect = [ + subscription_response, + feed1_response, + feed2_response, + ] + + response = client.get("/digest?n=1&page=1&limit=2") assert response.status_code == 200 data = response.get_json() - assert "subscriptions" in data - assert data["subscriptions"] == ["Feed1", "Feed2"] - - mock_requests_get.assert_called_once() - _, kwargs = mock_requests_get.call_args - assert "headers" in kwargs - assert kwargs["headers"] == {"Authorization": f"GoogleLogin auth={AUTH_TOKEN}"} - assert kwargs["params"] == {"output": "json"} - assert kwargs["timeout"] == REQUEST_TIMEOUT + assert "items" in data, "Response must contain 'items' key" + assert len(data["items"]) == 2 + assert data["items"][0]["title"] == "Feed2 Post Title" + assert data["items"][0]["feedId"] == "feed/2" + assert data["items"][1]["title"] == "Feed1 Post Title" + assert data["items"][1]["feedId"] == "feed/1" -def test_valid_feed_contents(mock_requests_get, proxy_mock_response, client): - """ - Test the /feed/ endpoint with query parameters. - """ - mock_requests_get.return_value = proxy_mock_response({"feed": ["Feed1", "Feed2"]}) - - feed_id = "40" - query_param = {"n": "1"} - - response = client.get(f"/feed/{feed_id}", query_string=query_param) - - assert response.status_code == 200 - data = response.get_json() - assert "feed" in data - assert data["feed"] == ["Feed1", "Feed2"] + assert data["page"] == 1 + assert data["limit"] == 2 + assert data["totalItems"] == 2 - mock_requests_get.assert_called_once() - _, kwargs = mock_requests_get.call_args - assert "headers" in kwargs - assert kwargs["params"] == query_param - assert kwargs["timeout"] == REQUEST_TIMEOUT + item0 = data["items"][0] + assert item0["feedTitle"] == "Feed 2" + assert item0["feedHtmlUrl"] == "https://feed2-url" + assert item0["feedIconUrl"] == "https://icon2-url" -def test_timeout_subscriptions(mock_requests_get, client): +def test_aggreated_error_handling(client, mock_requests_get): """ - Test that a timeout in requests.get leads to a 504 response for /subscriptions. + Test aggregator handling a feed fetch error with retries. + Ensures that if one feed fails, we still get items from the other feeds, + or handle partial data gracefully. 
""" - mock_requests_get.side_effect = requests.Timeout() + subscription_response = MagicMock() + subscription_response.ok = True + subscription_response.json.return_value = { + "subscriptions": [ + {"id": "feed/1", "title": "Feed 1"}, + ] + } + + fail_response = MagicMock() + fail_response.raise_for_status.side_effect = requests.RequestException("500 Server Error") + + success_response = MagicMock() + success_response.ok = True + success_response.json.return_value = { + "items": [ + {"title": "Feed1 Post Title", "published": 1697000000}, + ] + } + + mock_requests_get.side_effect = [ + subscription_response, + fail_response, + success_response, + ] + + response = client.get("/digest?n=1") + assert response.status_code == 200 - response = client.get("/subscriptions") - assert response.status_code == 504 - assert "timed out" in response.get_json()["error"] + data = response.get_json() + assert "items" in data + assert len(data["items"]) == 1 + item = data["items"][0] + assert item["title"] == "Feed1 Post Title" + assert item["feedId"] == "feed/1" -def test_json_decode_error_subscriptions(mock_requests_get, client): - """ - Test that a JSON decode error leads to a 500 response for /subscriptions. - """ - mock_response = MagicMock() - mock_response.json.side_effect = ValueError("Bad JSON format") - mock_requests_get.return_value = mock_response + assert data["page"] == 1 + assert data["limit"] == 50 + assert data["totalItems"] == 1 - response = client.get("/subscriptions") - assert response.status_code == 500 - body = response.get_json() - assert "Failed to decode JSON response" in body["error"] + calls = mock_requests_get.call_args_list + assert len(calls) == 3 -def test_subscriptions_accepts_query_params(mock_requests_get, proxy_mock_response, client): +def test_invalid_feed_id(client, mock_requests_get): """ - Test that the /subscriptions endpoint accepts and correctly forwards query parameters. + Test that an invalid feed ID in the subscription list is handled gracefully + by the aggregator, e.g., skipped or logged as a warning without crashing. 
""" - mock_requests_get.return_value = proxy_mock_response({"subscriptions": ["Feed1", "Feed2"]}) - - response = client.get("/subscriptions?output=json") + subscription_response = MagicMock() + subscription_response.ok = True + subscription_response.json.return_value = { + "subscriptions": [ + { + "id": "feed/1", + "title": "Valid Feed", + "htmlUrl": "https://valid-feed-url", + "iconUrl": "https://valid-feed-icon", + }, + { + "id": "feed/abc", # This is the invalid one + "title": "Invalid Feed", + "htmlUrl": "https://invalid-feed-url", + "iconUrl": "https://invalid-feed-icon", + }, + ] + } + + valid_feed_response = MagicMock() + valid_feed_response.ok = True + valid_feed_response.json.return_value = { + "items": [ + { + "title": "Valid Feed Post", + "published": 1697000000, + } + ] + } + + mock_requests_get.side_effect = [ + subscription_response, + valid_feed_response, + ] + + response = client.get("/digest?n=1") assert response.status_code == 200 data = response.get_json() - assert "subscriptions" in data + assert "items" in data + assert len(data["items"]) == 1 - mock_requests_get.assert_called_once() - _, kwargs = mock_requests_get.call_args - assert "headers" in kwargs - assert kwargs["params"] == {"output": "json"} - assert kwargs["timeout"] == REQUEST_TIMEOUT + item = data["items"][0] + assert item["title"] == "Valid Feed Post" + assert item["feedId"] == "feed/1" + assert data["totalItems"] == 1 -def test_invalid_feed_id(mock_requests_get, client): - """ - Test that an invalid feed_id format returns a 400 Bad Request. - """ - invalid_feed_id = "invalid123" + calls = mock_requests_get.call_args_list + assert len(calls) == 2 - response = client.get(f"/feed/{invalid_feed_id}") - assert response.status_code == 400 - body = response.get_json() - assert "Invalid feed_id format" in body["error"] - mock_requests_get.assert_not_called() +def test_pagination_in_digest(client, mock_requests_get): + """ + Test that pagination (page/limit) is properly applied to the global + sorted list of items. 
+ """ + subscription_response = MagicMock() + subscription_response.ok = True + subscription_response.json.return_value = { + "subscriptions": [ + {"id": "feed/1", "title": "Feed 1"}, + {"id": "feed/2", "title": "Feed 2"}, + ] + } + + feed1_response = MagicMock() + feed1_response.ok = True + feed1_response.json.return_value = { + "items": [ + { + "title": "Feed1 Post A", + "published": 1697100002, + }, + { + "title": "Feed1 Post B", + "published": 1697100001, + }, + ] + } + + feed2_response = MagicMock() + feed2_response.ok = True + feed2_response.json.return_value = { + "items": [ + { + "title": "Feed2 Post A", + "published": 1697100004, + }, + { + "title": "Feed2 Post B", + "published": 1697100003, + }, + ] + } + + mock_requests_get.side_effect = [ + subscription_response, + feed1_response, + feed2_response, + ] + + response_page1 = client.get("/digest?n=2&page=1&limit=2") + assert response_page1.status_code == 200 + data_page1 = response_page1.get_json() + assert len(data_page1["items"]) == 2 + assert data_page1["items"][0]["title"] == "Feed2 Post A" + assert data_page1["items"][1]["title"] == "Feed2 Post B" + assert data_page1["totalItems"] == 4 + assert data_page1["page"] == 1 + assert data_page1["limit"] == 2 + + response_page2 = client.get("/digest?n=2&page=2&limit=2") + assert response_page2.status_code == 200 + data_page2 = response_page2.get_json() + assert len(data_page2["items"]) == 2 + assert data_page2["items"][0]["title"] == "Feed1 Post A" + assert data_page2["items"][1]["title"] == "Feed1 Post B" + assert data_page2["page"] == 2 + assert data_page2["limit"] == 2 + assert data_page2["totalItems"] == 4 + + +def test_caching_digest(client, mock_requests_get): + """ + Test that calling /digest with the same label/n parameters multiple times + uses cached data on the second request and does NOT re-fetch feeds from FreshRSS. + """ + subscription_response = MagicMock() + subscription_response.ok = True + subscription_response.json.return_value = { + "subscriptions": [ + {"id": "feed/1", "title": "Feed 1"}, + ] + } + + feed1_response = MagicMock() + feed1_response.ok = True + feed1_response.json.return_value = { + "items": [ + { + "title": "Feed1 Cached Post", + "published": 1697000000, + } + ] + } + + mock_requests_get.side_effect = [ + subscription_response, + feed1_response, + ] + + response = client.get("/digest?n=1") + assert response.status_code == 200 + data = response.get_json() + assert len(data["items"]) == 1 + assert data["items"][0]["title"] == "Feed1 Cached Post" + assert data["totalItems"] == 1 + + calls_after_first = mock_requests_get.call_args_list + assert len(calls_after_first) == 2 + + response_cached = client.get("/digest?n=1") + assert response_cached.status_code == 200 + data_cached = response_cached.get_json() + assert len(data_cached["items"]) == 1 + assert data_cached["items"][0]["title"] == "Feed1 Cached Post" + + calls_after_second = mock_requests_get.call_args_list + assert ( + len(calls_after_second) == 2 + ), "No new requests should be made on the second call if data was cached."
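As an aside on the caching introduced in `freshproxy/proxy_routes.py`: `test_caching_digest` above exercises the cache end to end through the `/digest` route, while the snippet below is a minimal sketch of the cache helpers in isolation. It assumes the FreshProxy environment variables are set so the module imports cleanly; the `favs` label and item data are made up for illustration.

```python
# Sketch of the aggregator cache helpers from freshproxy/proxy_routes.py.
# Assumes FreshProxy's env vars are configured so the module imports cleanly.
from freshproxy.proxy_routes import (
    AGGREGATOR_CACHE,
    CACHE_TTL_SECONDS,
    get_cache_key,
    get_cache_value,
    set_cache_value,
)

key = get_cache_key("favs", 3)                    # -> "digest|favs|3"
set_cache_value(key, [{"title": "cached item"}])  # stores (timestamp, data)
assert get_cache_value(key) == [{"title": "cached item"}]

# Back-date the stored timestamp past CACHE_TTL_SECONDS (300s by default);
# the next read treats the entry as expired, evicts it, and returns None.
stored_at, data = AGGREGATOR_CACHE[key]
AGGREGATOR_CACHE[key] = (stored_at - CACHE_TTL_SECONDS - 1, data)
assert get_cache_value(key) is None
```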