37 changes: 9 additions & 28 deletions README.md
@@ -1,42 +1,24 @@
# FreshProxy

A **Flask**-based proxy for [FreshRSS](https://github.com/FreshRSS/FreshRSS) that securely forwards specific API requests, eliminating the need for dynamic query parameters. Configurable via environment variables (or a `.env` file).
A Flask-based proxy for [FreshRSS](https://github.com/FreshRSS/FreshRSS) that securely forwards API requests, eliminating the need to expose raw FreshRSS endpoints directly. Configurable via environment variables (or a `.env` file).

## Overview

**FreshProxy** acts as a dedicated **HTTP proxy** for specific **FreshRSS** endpoints, enhancing security and simplifying request structures. By using dedicated proxy endpoints, you eliminate the need for dynamic query parameters, reducing potential attack vectors and improving clarity.
FreshProxy acts as a dedicated HTTP proxy for your FreshRSS instance, enhancing security and simplifying request structures. By using a single proxy endpoint (`/digest`), you avoid having to expose or directly query each feed or subscription list from the client.

## Features

- **Dedicated Proxy Endpoints**:
  - `/subscriptions` -> `subscription/list`.
  - `/feed/<id>` -> `stream/contents/feed/<id>`.
- **CORS** restrictions to only allow certain origins.
- **Single Aggregator Endpoint**:
  - `GET /digest`: Returns a globally-sorted list of recent feed items from your FreshRSS instance (see the example request below).
  - Optional query parameters:
    - `label=<labelName>`: Filter feeds by label.
    - `n=<int>`: Number of items to fetch per feed (defaults to 1).
    - `page=<int>&limit=<int>`: Item-level pagination (defaults: `page=1`, `limit=50`).
- **CORS** restrictions, allowing only whitelisted origins.
- **Timeout** and error handling for upstream requests.
- **Environment-based configuration** (via `.env` or standard env vars).
- **Docker Support** for easy deployment.
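
For illustration, here is a minimal client-side sketch of calling `/digest` with Python's `requests`. The base URL and label below are placeholders; substitute the address and labels of your own FreshProxy deployment:

```python
import requests

# Placeholder base URL; point this at wherever FreshProxy is deployed.
BASE_URL = "http://localhost:8000"

resp = requests.get(
    f"{BASE_URL}/digest",
    params={"label": "Tech", "n": 3, "page": 1, "limit": 20},
    timeout=15,
)
resp.raise_for_status()
digest = resp.json()

print(f"{digest['totalItems']} items total, showing {len(digest['items'])}")
for item in digest["items"]:
    print(item.get("feedTitle"), "-", item.get("title"))
```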

## Project Structure

```text
freshproxy/
├── freshproxy/
│   ├── __init__.py # Makes 'freshproxy' a package
│   ├── app.py # Application factory & CORS setup
│   ├── config.py # Environment variables, whitelists
│   └── proxy_routes.py # Blueprint with the '/' GET route
├── tests/
│   ├── test_config.py # Example environment var tests
│   └── test_proxy.py # Proxy route tests (mocking requests)
├── requirements.txt # Dependencies (Flask, requests, etc.)
├── pyproject.toml # Project metadata & optional deps
├── run.py # Dev entry point
├── Dockerfile # Container-based deployment
├── .env.example # Example environment variables (no secrets)
├── .gitignore
└── README.md
```

## Installation

1. Clone the repository:
@@ -74,7 +56,6 @@ FRESHPROXY_DEBUG=False
FRESHPROXY_REQUEST_TIMEOUT=10
```


### Environment Variables

- `FRESHRSS_API_TOKEN`: Secret token used to authenticate with your FreshRSS instance.
229 changes: 188 additions & 41 deletions freshproxy/proxy_routes.py
@@ -1,17 +1,90 @@
import logging
import time
import re
import requests

from typing import Union, Tuple
from flask import Blueprint, request, jsonify, Response
from concurrent.futures import ThreadPoolExecutor, as_completed

from freshproxy.config import AUTH_TOKEN, BASE_URL, ALLOWED_ENDPOINTS, REQUEST_TIMEOUT

logger = logging.getLogger(__name__)

proxy_bp = Blueprint("proxy_bp", __name__)


AGGREGATOR_CACHE = {}
CACHE_TTL_SECONDS = 300


def get_cache_key(label, n):
"""
Create a unique cache key for aggregator queries.
"""
return f"digest|{label}|{n}"


def set_cache_value(cache_key, value):
"""
Store a (timestamp, data) tuple in the global aggregator cache.
"""
AGGREGATOR_CACHE[cache_key] = (time.time(), value)


def get_cache_value(cache_key):
"""
Retrieve the cached value if it's not expired; otherwise return None.
"""
cache_item = AGGREGATOR_CACHE.get(cache_key)
if not cache_item:
return None
cached_time, data = cache_item
if time.time() - cached_time > CACHE_TTL_SECONDS:
# expired
AGGREGATOR_CACHE.pop(cache_key, None)
return None
return data


def fetch_feed_posts(feed_id, n=1, retry_attempts=2):
"""
Fetch up to 'n' latest posts for a single feed, with retry logic.
Returns a dict with feed info or an error message.
"""
feed_endpoint = ALLOWED_ENDPOINTS.get("feed", "stream/contents/feed")
actual_id = feed_id
if actual_id.startswith("feed/"):
actual_id = actual_id[len("feed/") :]

feed_url = f"{BASE_URL}/{feed_endpoint}/{actual_id}"
headers = {"Authorization": f"GoogleLogin auth={AUTH_TOKEN}"}
params = {"n": n}

for attempt in range(retry_attempts + 1):
try:
resp = requests.get(feed_url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
resp.raise_for_status()

data = resp.json()
items = data.get("items", [])
return items

except requests.Timeout:
logger.warning(f"Timeout fetching feed_id={feed_id}, attempt={attempt}")
if attempt == retry_attempts:
return {"error": "Timeout after retries"}
except requests.RequestException as e:
logger.warning(f"Request error fetching feed_id={feed_id}, attempt={attempt}: {e}")
if attempt == retry_attempts:
return {"error": str(e)}
except ValueError as e:
logger.warning(f"JSON decode error feed_id={feed_id}, attempt={attempt}: {e}")
if attempt == retry_attempts:
return {"error": f"JSON decode error: {e}"}

return {"error": "Unknown fetch error"}


def proxy_request(endpoint: str, params: dict) -> Union[Response, Tuple[Response, int]]:
"""
Helper function to proxy requests to FrehsRSS.
@@ -60,49 +133,123 @@ def is_valid_feed_id(feed_id: str) -> bool:
    Returns:
        bool: True if valid, False otherwise.
    """
    if feed_id.startswith("feed/"):
        feed_id = feed_id[len("feed/") :]
    return re.fullmatch(r"\d+", feed_id) is not None


@proxy_bp.route("/subscriptions", methods=["GET"])
def get_subscriptions() -> Union[Response, Tuple[Response, int]]:
"""
Proxy endpoint for /subscriptions -> FreshRSS subscription/list

Returns:
Union[Response, Tuple[Response, int]]: JSON response or error message with status code.
"""
endpoint = ALLOWED_ENDPOINTS.get("subscriptions")
if not endpoint:
logger.error("FreshRSS endpoint for 'subscriptions' not configured.")
return jsonify({"error": "Internal server error"}), 500

params = request.args.to_dict()
params.update({"output": "json"})

return proxy_request(endpoint, params)


@proxy_bp.route("/feed/<feed_id>", methods=["GET"])
def get_feed_contents(feed_id: str) -> Union[Response, Tuple[Response, int]]:
@proxy_bp.route("/digest", methods=["GET"])
def get_digest():
"""
Proxy endpoint for /feed/<id> -> FreshRSS stream/contents/feed/<id>

Args:
feed_id (str): The ID of the feed to retrieve contents for.
Return a sorted list of the latest items across all feeds (optionally filtered by label).

Returns:
Union[Response, Tuple[Response, int]]: JSON response or error message with status code.
Query params:
- label: Filter feeds by this label (optional)
- n: Number of items to fetch per feed (default=1)
- page: 1-based index of which "items page" to return (default=1)
- limit: How many items per page (default=50)
"""
if not is_valid_feed_id(feed_id):
logger.warning(f"Invalid feed_id format received: {feed_id}")
return jsonify({"error": "Invalid feed_id format"}), 400

base_endpoint = ALLOWED_ENDPOINTS.get("feed")
if not base_endpoint:
logger.error("FreshRSS base endpoint for 'feed' not configured.")
return jsonify({"error": "Internal server error"}), 500

endpoint = f"{base_endpoint}/{feed_id}"
params = request.args.to_dict()

return proxy_request(endpoint, params)
    label = request.args.get("label", "")
    page = int(request.args.get("page", 1))
    limit = int(request.args.get("limit", 50))
    n = int(request.args.get("n", 1))

    cache_key = get_cache_key(label, n)

    cached_data = get_cache_value(cache_key)
    if cached_data is not None:
        logger.info(f"Using cached flattened data for cache_key={cache_key}")
        all_items = cached_data
    else:
        logger.info(f"Cache miss for cache_key={cache_key}. Fetching from FreshRSS.")

        # 1) Fetch the subscriptions from FreshRSS
        subscriptions_endpoint = ALLOWED_ENDPOINTS.get("subscriptions", "subscription/list")
        subscriptions_url = f"{BASE_URL}/{subscriptions_endpoint}"
        headers = {"Authorization": f"GoogleLogin auth={AUTH_TOKEN}"}
        sub_params = {"output": "json"}  # FreshRSS expects 'output=json' for JSON responses

        try:
            sub_resp = requests.get(
                subscriptions_url, headers=headers, params=sub_params, timeout=REQUEST_TIMEOUT
            )
            sub_resp.raise_for_status()
            subscriptions_data = sub_resp.json()
        except requests.RequestException as e:
            logger.error(f"Failed to fetch subscriptions: {e}")
            return jsonify({"error": "Failed to fetch subscriptions", "details": str(e)}), 502
        except ValueError as e:
            logger.error(f"JSON decode error in subscriptions: {e}")
            return (
                jsonify({"error": "Failed to decode JSON (subscriptions)", "details": str(e)}),
                500,
            )

        # 2) Filter the subscriptions by label if specified
        all_feeds = subscriptions_data.get("subscriptions", [])
        if label:
            all_feeds = [
                feed
                for feed in all_feeds
                if any(cat.get("label") == label for cat in feed.get("categories", []))
            ]
        logger.info(f"Found {len(all_feeds)} feeds after label filtering.")

        # 3) Flatten items from each feed into a single list
        all_items = []

        def process_feed(feed):
            feed_id = feed.get("id")
            if not feed_id:
                logger.warning(f"Skipping feed with no id: {feed}")
                return []

            if not is_valid_feed_id(feed_id):
                logger.warning(f"Skipping feed with invalid id: {feed_id}")
                return []

            items = fetch_feed_posts(feed_id, n=n, retry_attempts=2)
            # If fetch_feed_posts returns a dict with "error", handle gracefully
            if isinstance(items, dict) and "error" in items:
                logger.warning(f"Error while fetching feed {feed_id}: {items['error']}")
                return []

            if not isinstance(items, list):
                logger.warning(
                    f"Expected list of items, got {type(items)}. Skipping feed {feed_id}"
                )
                return []

            for item in items:
                item["feedId"] = feed_id
                item["feedTitle"] = feed.get("title", "")
                item["feedHtmlUrl"] = feed.get("htmlUrl")
                item["feedIconUrl"] = feed.get("iconUrl")
            return items

        max_workers = min(10, len(all_feeds)) or 1  # limit concurrency; at least one worker
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_feed = {executor.submit(process_feed, f): f for f in all_feeds}
            for future in as_completed(future_to_feed):
                feed_items = future.result()
                all_items.extend(feed_items)

        # 4) Sort the flattened list by `published` descending
        all_items.sort(key=lambda x: x.get("published", 0), reverse=True)

        # 5) Store in cache for future requests
        set_cache_value(cache_key, all_items)

    # 6) Pagination: slice the all_items list
    offset = max(0, (page - 1) * limit)
    paginated_items = all_items[offset : offset + limit]

    # 7) Construct the response
    response_data = {
        "items": paginated_items,
        "page": page,
        "limit": limit,
        "totalItems": len(all_items),
    }

    return jsonify(response_data)
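
For completeness, here is a minimal, hypothetical client-side sketch of how the `page`/`limit`/`totalItems` contract returned by `/digest` might be consumed; the base URL is a placeholder for your own deployment:

```python
import requests

BASE_URL = "http://localhost:8000"  # placeholder; adjust to your FreshProxy deployment


def iter_digest_items(label=None, n=5, limit=50):
    """Yield digest items page by page until totalItems is exhausted."""
    page = 1
    while True:
        params = {"n": n, "page": page, "limit": limit}
        if label:
            params["label"] = label
        resp = requests.get(f"{BASE_URL}/digest", params=params, timeout=15)
        resp.raise_for_status()
        payload = resp.json()
        yield from payload["items"]
        if page * limit >= payload["totalItems"]:
            break
        page += 1


for item in iter_digest_items(label="Tech", n=3):
    print(item.get("published"), item.get("title"))
```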