Skip to content

Commit 7e07542

Browse files
authored
Merge pull request #240 from rararulab/issue-229-gateway-ingestor
feat(gateway): add ingestor proxy route (#229)
2 parents 214b920 + ebb78b0 commit 7e07542

5 files changed

Lines changed: 50 additions & 12 deletions

File tree

crates/app/src/commands/gateway.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub(crate) async fn run(json: bool, dry_run: bool) -> ExitCode {
2222

2323
let app_state = stream_gateway::state::AppState::builder()
2424
.meta_upstream(cfg.meta_upstream)
25+
.ingestor_upstream(cfg.ingestor_upstream)
2526
.http_client(reqwest::Client::new())
2627
.rate_limiter(rate_limiter)
2728
.max_body_size(cfg.max_body_size)

crates/gateway/src/config.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ pub struct Config {
1313
/// Upstream URL for the meta service.
1414
#[serde(default = "defaults::meta_upstream")]
1515
pub meta_upstream: String,
16+
/// Upstream URL for the ingestor service.
17+
#[serde(default = "defaults::ingestor_upstream")]
18+
pub ingestor_upstream: String,
1619
/// Maximum requests allowed per rate-limit window.
1720
#[serde(default = "defaults::rate_limit_max")]
1821
pub rate_limit_max: u64,
@@ -33,6 +36,7 @@ impl Default for Config {
3336
Self {
3437
port: defaults::port(),
3538
meta_upstream: defaults::meta_upstream(),
39+
ingestor_upstream: defaults::ingestor_upstream(),
3640
rate_limit_max: defaults::rate_limit_max(),
3741
rate_limit_window_secs: defaults::rate_limit_window_secs(),
3842
max_body_size: defaults::max_body_size(),
@@ -45,6 +49,8 @@ mod defaults {
4549

4650
pub(super) fn meta_upstream() -> String { "http://127.0.0.1:8080".to_string() }
4751

52+
pub(super) fn ingestor_upstream() -> String { "http://127.0.0.1:8081".to_string() }
53+
4854
pub(super) const fn rate_limit_max() -> u64 { 100 }
4955

5056
pub(super) const fn rate_limit_window_secs() -> u64 { 60 }
@@ -62,13 +68,15 @@ mod tests {
6268
let json = serde_json::json!({
6369
"port": 4000,
6470
"meta_upstream": "http://meta:9090",
71+
"ingestor_upstream": "http://ingestor:9091",
6572
"rate_limit_max": 200,
6673
"rate_limit_window_secs": 30,
6774
"max_body_size": 1_048_576
6875
});
6976
let cfg: Config = serde_json::from_value(json).unwrap();
7077
assert_eq!(cfg.port, 4000);
7178
assert_eq!(cfg.meta_upstream, "http://meta:9090");
79+
assert_eq!(cfg.ingestor_upstream, "http://ingestor:9091");
7280
assert_eq!(cfg.rate_limit_max, 200);
7381
assert_eq!(cfg.rate_limit_window_secs, 30);
7482
assert_eq!(cfg.max_body_size, 1_048_576);
@@ -80,6 +88,7 @@ mod tests {
8088
let cfg: Config = serde_json::from_value(json).unwrap();
8189
assert_eq!(cfg.port, 3000);
8290
assert_eq!(cfg.meta_upstream, "http://127.0.0.1:8080");
91+
assert_eq!(cfg.ingestor_upstream, "http://127.0.0.1:8081");
8392
assert_eq!(cfg.rate_limit_max, 100);
8493
assert_eq!(cfg.rate_limit_window_secs, 60);
8594
assert_eq!(cfg.max_body_size, 2 * 1024 * 1024 * 1024);

crates/gateway/src/proxy.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,18 +42,20 @@ impl IntoResponse for ProxyError {
4242
}
4343
}
4444

45-
/// Forward an incoming request to the given `upstream_base` URL.
45+
/// Forward an incoming request to the given upstream URL, stripping the
46+
/// gateway prefix from the path.
4647
///
47-
/// Strips the gateway prefix so that `/api/meta/videos/v_1` becomes
48-
/// `{upstream_base}/videos/v_1` on the upstream service.
49-
pub async fn proxy_to_meta(
50-
State(state): State<AppState>,
48+
/// `prefix` is the gateway path prefix (e.g. `/api/meta`) to strip before
49+
/// forwarding to `upstream_base`.
50+
async fn forward(
51+
state: &AppState,
52+
prefix: &str,
53+
upstream_base: &str,
5154
req: Request<Body>,
5255
) -> Result<Response, ProxyError> {
5356
let path = req.uri().path();
5457

55-
// Strip the `/api/meta` prefix
56-
let upstream_path = path.strip_prefix("/api/meta").unwrap_or(path);
58+
let upstream_path = path.strip_prefix(prefix).unwrap_or(path);
5759
let upstream_path = if upstream_path.is_empty() {
5860
"/"
5961
} else {
@@ -66,7 +68,7 @@ pub async fn proxy_to_meta(
6668
.map(|q| format!("?{q}"))
6769
.unwrap_or_default();
6870

69-
let upstream_url = format!("{}{upstream_path}{query}", state.meta_upstream);
71+
let upstream_url = format!("{upstream_base}{upstream_path}{query}");
7072

7173
let max_body = state.max_body_size;
7274
let method = req.method().clone();
@@ -116,6 +118,27 @@ pub async fn proxy_to_meta(
116118
.expect("failed to build response"))
117119
}
118120

121+
/// Proxy handler for the meta service (`/api/meta/*`).
122+
///
123+
/// Strips the `/api/meta` prefix and forwards to the configured meta upstream.
124+
pub async fn proxy_to_meta(
125+
State(state): State<AppState>,
126+
req: Request<Body>,
127+
) -> Result<Response, ProxyError> {
128+
forward(&state, "/api/meta", &state.meta_upstream.clone(), req).await
129+
}
130+
131+
/// Proxy handler for the ingestor service (`/api/ingest/*`).
132+
///
133+
/// Strips the `/api/ingest` prefix and forwards to the configured ingestor
134+
/// upstream.
135+
pub async fn proxy_to_ingestor(
136+
State(state): State<AppState>,
137+
req: Request<Body>,
138+
) -> Result<Response, ProxyError> {
139+
forward(&state, "/api/ingest", &state.ingestor_upstream.clone(), req).await
140+
}
141+
119142
#[cfg(test)]
120143
mod tests {
121144
use std::sync::Arc;
@@ -150,6 +173,7 @@ mod tests {
150173
async fn rejects_oversized_request_body() {
151174
let state = AppState::builder()
152175
.meta_upstream("http://127.0.0.1:9999".to_string())
176+
.ingestor_upstream("http://127.0.0.1:9998".to_string())
153177
.http_client(reqwest::Client::new())
154178
.rate_limiter(Arc::new(RateLimiter::new(100, 60)))
155179
.max_body_size(16) // 16-byte limit

crates/gateway/src/routes.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub fn router(state: AppState) -> Router {
2525
// are never blocked by auth or rate limiting.
2626
let protected = Router::new()
2727
.route("/api/meta/{*rest}", any(proxy::proxy_to_meta))
28+
.route("/api/ingest/{*rest}", any(proxy::proxy_to_ingestor))
2829
.layer(rate_limit_layer(Arc::clone(&state.rate_limiter)))
2930
.layer(auth_layer());
3031

@@ -48,6 +49,7 @@ mod tests {
4849
fn test_state() -> AppState {
4950
AppState::builder()
5051
.meta_upstream("http://127.0.0.1:9999".to_string())
52+
.ingestor_upstream("http://127.0.0.1:9998".to_string())
5153
.http_client(reqwest::Client::new())
5254
.rate_limiter(Arc::new(RateLimiter::new(100, 60)))
5355
.max_body_size(2 * 1024 * 1024 * 1024)

crates/gateway/src/state.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ use moka::future::Cache as MokaCache;
1515
#[derive(Debug, Clone, Builder)]
1616
pub struct AppState {
1717
/// Upstream URL for the `MetaService`.
18-
pub meta_upstream: String,
18+
pub meta_upstream: String,
19+
/// Upstream URL for the `IngestorService`.
20+
pub ingestor_upstream: String,
1921
/// Shared HTTP client for proxying requests.
20-
pub http_client: reqwest::Client,
22+
pub http_client: reqwest::Client,
2123
/// Rate limiter state shared across requests.
22-
pub rate_limiter: Arc<RateLimiter>,
24+
pub rate_limiter: Arc<RateLimiter>,
2325
/// Maximum allowed request body size in bytes.
24-
pub max_body_size: usize,
26+
pub max_body_size: usize,
2527
}
2628

2729
/// Per-user rate limiter backed by moka cache with TTL-based expiry.

0 commit comments

Comments
 (0)