An async HTTP/1.1 and HTTP/2 framework in Rust, built on top of tokio. Designed to be lightweight, secure, and production-ready.
- Trie-based routing (static, dynamic, multi-method)
- Middleware (global and per route group)
- Native TLS via rustls (HTTPS with PEM certificates)
- HTTP/2 via h2 (automatic ALPN negotiation over TLS)
- Request body streaming via `mpsc::channel`
- Automatic gzip/brotli compression
- Native CORS with builder pattern and fail-fast validation (RFC compliance)
- Granular body size limits (global and per route group)
- Pluggable rate limiting via the `RateLimiter` trait (in-memory or distributed backends like Redis)
- Static file serving
- Cookies (parsing and Set-Cookie builder)
- Graceful shutdown
- Configurable timeouts (read and idle)
- Concurrent connection limits
- Automatic security headers (`X-Content-Type-Options: nosniff`) with configurable CSP, X-Frame-Options, and more
- Automatic request ID (`X-Request-ID`)
A minimal server:
use rpress::{Rpress, RpressCors, RpressRoutes, RequestPayload, ResponsePayload};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let cors = RpressCors::new()
.set_origins(vec!["*"])
.set_methods(vec!["GET", "POST", "PUT", "DELETE"])
.set_headers(vec!["Content-Type", "Authorization"]);
let mut app = Rpress::new(Some(cors));
let mut routes = RpressRoutes::new();
routes.add(":get/hello", |_req: RequestPayload| async move {
ResponsePayload::text("Hello, Rpress!")
});
app.add_route_group(routes);
app.listen("0.0.0.0:3000").await?;
Ok(())
}
Routes use the format `:method/path`. Dynamic segments are prefixed with `:`.
let mut routes = RpressRoutes::new();
routes.add(":get/api/users", |_req: RequestPayload| async move {
ResponsePayload::json(&serde_json::json!({"users": []})).unwrap()
});
routes.add(":get/api/users/:id", |req: RequestPayload| async move {
let id = req.get_param("id").unwrap_or("0");
ResponsePayload::text(format!("User ID: {}", id))
});
routes.add(":get/api/resource", |_req: RequestPayload| async move {
ResponsePayload::text("GET resource")
});
routes.add(":post/api/resource", |_req: RequestPayload| async move {
ResponsePayload::text("POST resource").with_status(StatusCode::Created)
});
routes.add(":delete/api/resource/:id", |req: RequestPayload| async move {
let id = req.get_param("id").unwrap_or("?");
ResponsePayload::text(format!("Deleted {}", id))
});
Supported methods: GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS.
Applied to all routes:
app.use_middleware(|req, next| async move {
let uri = req.uri().to_string();
let method = req.method().to_string();
tracing::info!("--> {} {}", method, uri);
let start = std::time::Instant::now();
let result = next(req).await;
tracing::info!("<-- {} {} ({:?})", method, uri, start.elapsed());
result
});
Applied to a single route group:
let mut routes = RpressRoutes::new();
routes.use_middleware(|req, next| async move {
if req.header("authorization").is_none() {
return Err(RpressError {
status: StatusCode::Unauthorized,
message: "Token required".to_string(),
});
}
next(req).await
});
routes.add(":get/admin/dashboard", |_req: RequestPayload| async move {
ResponsePayload::text("Admin area")
});
Middleware often needs to pass extracted data (e.g. JWT claims) to downstream handlers. Use set_extension / get_extension on RequestPayload:
let mut public = RpressRoutes::new();
public.add(":post/login", |_req: RequestPayload| async move {
ResponsePayload::json(&serde_json::json!({"token": "eyJ..."})).unwrap()
});
let mut protected = RpressRoutes::new();
protected.use_middleware(|mut req, next| async move {
let token = req.header("authorization")
.and_then(|h| h.strip_prefix("Bearer "))
.ok_or(RpressError {
status: StatusCode::Unauthorized,
message: "Missing token".into(),
})?;
// Validate JWT and extract claims...
let user_id = "42"; // from token
let tenant = "acme"; // from token
req.set_extension("user_id", user_id);
req.set_extension("tenant_id", tenant);
next(req).await
});
protected.add(":get/me", |req: RequestPayload| async move {
let user_id = req.get_extension("user_id").unwrap_or("?");
let tenant = req.get_extension("tenant_id").unwrap_or("?");
ResponsePayload::text(format!("user={} tenant={}", user_id, tenant))
});
app.add_route_group(public);
app.add_route_group(protected);
Extensions are plain `HashMap<String, String>` key-value pairs — lightweight and zero-cost when unused. Later middleware in the chain can overwrite values set by earlier middleware.
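Because extensions are just a string-to-string map, the overwrite behavior follows ordinary `HashMap::insert` semantics. A std-only sketch (no Rpress types involved):

```rust
use std::collections::HashMap;

fn main() {
    // Extensions behave like a plain string-to-string map.
    let mut extensions: HashMap<String, String> = HashMap::new();

    // An early middleware sets a value...
    extensions.insert("tenant_id".to_string(), "acme".to_string());

    // ...and a later middleware in the chain overwrites it.
    extensions.insert("tenant_id".to_string(), "globex".to_string());

    assert_eq!(extensions.get("tenant_id").map(String::as_str), Some("globex"));
    println!("tenant_id={}", extensions["tenant_id"]);
}
```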
Rpress automatically creates structured tracing spans for every request. This makes the framework compatible with distributed tracing backends like Jaeger, Datadog, Grafana Tempo, and Zipkin out of the box.
Every incoming request is wrapped in an http.request span with these fields:
| Field | Description |
|---|---|
| `http.method` | HTTP method (GET, POST, etc.) |
| `http.route` | Request URI path |
| `http.request_id` | Unique UUID v4 (same as `X-Request-ID` header) |
| `http.status_code` | Response status code (recorded after handler completes) |
| `http.latency_ms` | Total processing time in milliseconds |
Each connection also gets a parent span:
| Span | Fields | Description |
|---|---|---|
| `http.connection` | `peer.addr` | Per-connection span (HTTP/1.1 and TLS) |
| `h2.stream` | — | Per-stream span for HTTP/2 multiplexed streams |
The hierarchy looks like this:
http.connection (peer.addr=192.168.1.10)
└── http.request (method=GET, route=/users/1, request_id=abc-123, status_code=200, latency_ms=3)
└── app.request (your middleware span)
└── tracing::info!("...") ← inherits full context
Any tracing::info!, tracing::warn!, or tracing::error! emitted inside a middleware or handler automatically inherits the parent span context — no manual propagation needed.
The framework span already exists when your middleware runs. Create a child span to add application-specific fields:
app.use_middleware(|req, next| async move {
let uri = req.uri().to_string();
let method = req.method().to_string();
let span = tracing::info_span!(
"app.request",
app.route = %uri,
app.method = %method,
app.user_id = tracing::field::Empty,
);
// Don't hold a `span.enter()` guard across `.await` points; attach the
// span to the future with `tracing::Instrument` instead:
use tracing::Instrument;
let result = async {
tracing::info!("processing request");
let res = next(req).await;
// After authentication, record the user:
// tracing::Span::current().record("app.user_id", &"user-123");
res
}
.instrument(span)
.await;
result
});
Rpress uses the standard tracing crate. To export spans to a distributed tracing backend, configure tracing-subscriber with an OpenTelemetry layer in your main():
// Cargo.toml:
// tracing-subscriber = { version = "0.3", features = ["env-filter"] }
// opentelemetry = "0.27"
// opentelemetry_sdk = { version = "0.27", features = ["rt-tokio"] }
// opentelemetry-otlp = "0.27"
// tracing-opentelemetry = "0.28"
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
fn init_tracing() {
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint("http://localhost:4317")
.build()
.unwrap();
let provider = opentelemetry_sdk::trace::TracerProvider::builder()
.with_batch_exporter(exporter, opentelemetry_sdk::runtime::Tokio)
.build();
let tracer = provider.tracer("rpress-app");
let telemetry = tracing_opentelemetry::layer().with_tracer(tracer);
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(telemetry)
.init();
}
With this setup, every http.request span (and its children) is automatically exported as a trace to your backend. The http.request_id field matches the X-Request-ID response header, making it easy to correlate logs with traces.
routes.add(":post/api/data", |req: RequestPayload| async move {
// URI and method
let uri = req.uri();
let method = req.method();
// Headers (keys are lowercase)
let content_type = req.header("content-type").unwrap_or("unknown");
let auth = req.header("authorization");
// Route parameters
let id = req.get_param("id");
// Query string — GET /search?q=rust&page=1
let query = req.get_query("q").unwrap_or("");
let page = req.get_query("page").unwrap_or("1");
// Cookies
let cookies = req.cookies();
let session = cookies.get("session_id");
// Extensions (set by middleware, e.g. auth claims)
let user_id = req.get_extension("user_id");
let role = req.get_extension("role");
// Body as string
let body_text = req.body_str().unwrap_or("invalid utf8");
// Body as JSON
let data: serde_json::Value = req.body_json().unwrap();
ResponsePayload::text("ok")
});For large uploads, Rpress can stream the body in chunks via a channel instead of accumulating everything in memory. The threshold is configurable:
app.set_stream_threshold(64 * 1024); // stream bodies > 64KB
`collect_body()` collects the entire body into a `Vec<u8>`. It works for both small bodies (already buffered) and streamed ones:
routes.add(":post/upload", |mut req: RequestPayload| async move {
let body = req.collect_body().await;
ResponsePayload::text(format!("Received {} bytes", body.len()))
});
`body_stream()` yields the body chunk by chunk, for processing data on demand without accumulating everything in memory:
routes.add(":post/stream", |mut req: RequestPayload| async move {
let mut total = 0usize;
if let Some(mut rx) = req.body_stream() {
while let Some(chunk) = rx.recv().await {
total += chunk.len();
}
}
ResponsePayload::text(format!("Processed {} bytes in chunks", total))
});
// Plain text
ResponsePayload::text("Hello world")
// HTML
ResponsePayload::html("<h1>Welcome</h1>")
// JSON
ResponsePayload::json(&serde_json::json!({"status": "ok"})).unwrap()
// Bytes with custom content-type
ResponsePayload::bytes(vec![0x89, 0x50, 0x4E, 0x47], "image/png")
// Empty (204 No Content)
ResponsePayload::empty()
// Redirect
ResponsePayload::redirect("/new-location", StatusCode::Found)
Builder methods can be chained:
ResponsePayload::text("data")
.with_status(StatusCode::Created)
.with_content_type("application/xml")
.with_header("X-Custom", "value")
use rpress::CookieBuilder;
let cookie = CookieBuilder::new("token", "abc123")
.path("/")
.max_age(3600)
.same_site("Strict")
.http_only(true)
.secure(true)
.domain("example.com");
ResponsePayload::text("logged in")
.set_cookie(&cookie)
Multiple Set-Cookie headers are supported — each `.set_cookie()` call adds a separate header.
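For reference, each cookie is serialized as its own `Set-Cookie` header; cookies are never folded into one comma-joined header. A hypothetical helper (not Rpress code) showing the wire format the builder attributes above produce:

```rust
// Illustrative only: formats a Set-Cookie header value from a name, a value,
// and a list of already-rendered attributes.
fn set_cookie_value(name: &str, value: &str, attrs: &[&str]) -> String {
    let mut s = format!("{}={}", name, value);
    for a in attrs {
        s.push_str("; ");
        s.push_str(a);
    }
    s
}

fn main() {
    // Two cookies -> two separate Set-Cookie headers on the wire.
    let session = set_cookie_value(
        "token",
        "abc123",
        &["Path=/", "Max-Age=3600", "SameSite=Strict", "HttpOnly", "Secure", "Domain=example.com"],
    );
    let theme = set_cookie_value("theme", "dark", &["Path=/"]);
    assert_eq!(theme, "theme=dark; Path=/");
    println!("Set-Cookie: {}", session);
    println!("Set-Cookie: {}", theme);
}
```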
Native configuration via builder pattern:
let cors = RpressCors::new()
.set_origins(vec!["https://app.example.com", "https://admin.example.com"])
.set_methods(vec!["GET", "POST", "PUT", "DELETE"])
.set_headers(vec!["Content-Type", "Authorization", "X-Custom-Header"])
.set_expose_headers(vec!["X-Request-ID"])
.set_max_age(3600)
.set_credentials(true);
let mut app = Rpress::new(Some(cors));
Without CORS:
let mut app = Rpress::new(None);
Automatic headers: `Access-Control-Allow-Origin`, `Access-Control-Allow-Methods`, `Access-Control-Allow-Headers`, `Vary: Origin`. Preflight OPTIONS requests are handled automatically.
Rpress enforces RFC-compliant CORS at startup. Using wildcard origin "*" with set_credentials(true) will panic immediately, preventing the application from starting with an insecure configuration that browsers would silently reject:
// This will panic at startup:
let cors = RpressCors::new()
.set_origins(vec!["*"])
.set_credentials(true);
let app = Rpress::new(Some(cors)); // panics!
// Use explicit origins instead:
let cors = RpressCors::new()
.set_origins(vec!["https://app.example.com"])
.set_credentials(true);
let app = Rpress::new(Some(cors)); // ok
Gzip and Brotli with automatic negotiation via `Accept-Encoding`:
app.enable_compression(true);
Behavior:
- Brotli is preferred when `Accept-Encoding: br` is present
- Gzip is used when `Accept-Encoding: gzip` is present
- Bodies smaller than 256 bytes are not compressed
- Already compressed types (image/*, video/*, audio/*, zip, gzip) are skipped
- SVG is compressed normally
- `Content-Encoding` and `Vary: Accept-Encoding` are added automatically
- Compression runs inside `tokio::task::spawn_blocking` — CPU-bound work (Brotli/Gzip encoding) never blocks the async event loop, even under high concurrency
Limit requests per IP with a sliding window counter:
app.set_rate_limit(100, 60); // 100 requests per 60 seconds
When the limit is exceeded, the server returns 429 Too Many Requests.
By default, set_rate_limit uses an in-memory backend (InMemoryRateLimiter) suitable for single-instance deployments. Expired entries are automatically cleaned up when the store exceeds 10,000 records.
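The sliding-window idea can be pictured with a small std-only counter: keep one timestamp per request, evict timestamps older than the window, and admit the request only if fewer than the maximum remain. This is an illustration of the algorithm, not Rpress's internal code:

```rust
use std::collections::VecDeque;

// One counter per client key (e.g. per IP).
struct SlidingWindow {
    hits: VecDeque<u64>, // request timestamps, in seconds
    max: usize,
    window_secs: u64,
}

impl SlidingWindow {
    fn new(max: usize, window_secs: u64) -> Self {
        Self { hits: VecDeque::new(), max, window_secs }
    }

    fn allow(&mut self, now_secs: u64) -> bool {
        // Evict timestamps that have left the window.
        while self.hits.front().map_or(false, |&t| now_secs - t >= self.window_secs) {
            self.hits.pop_front();
        }
        if self.hits.len() < self.max {
            self.hits.push_back(now_secs);
            true
        } else {
            false // caller would respond 429 Too Many Requests
        }
    }
}

fn main() {
    let mut limiter = SlidingWindow::new(2, 60); // 2 requests per 60s
    assert!(limiter.allow(0));
    assert!(limiter.allow(10));
    assert!(!limiter.allow(20)); // third request inside the window: rejected
    assert!(limiter.allow(61));  // first hit has expired: allowed again
    println!("ok");
}
```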
For multi-instance environments (e.g. Kubernetes), inject a custom backend that implements the RateLimiter trait:
use rpress::RateLimiter;
use std::future::Future;
use std::pin::Pin;
struct RedisRateLimiter { /* redis client */ }
impl RateLimiter for RedisRateLimiter {
fn check(
&self,
key: &str,
max_requests: u32,
window_secs: u64,
) -> Pin<Box<dyn Future<Output = bool> + Send + '_>> {
let key = key.to_string();
Box::pin(async move {
// Query Redis INCR + EXPIRE and return whether under limit
true
})
}
}
let mut app = Rpress::new(None);
app.set_rate_limiter(RedisRateLimiter { /* ... */ });
app.set_rate_limit(100, 60);
`set_rate_limiter` can be called before or after `set_rate_limit`; either way it replaces the default in-memory limiter. The framework does not ship a Redis implementation; it only provides the trait and the in-memory default.
By default, Rpress rejects request bodies larger than 10 MB with 413 Payload Too Large.
app.set_max_body_size(5 * 1024 * 1024); // 5 MB for all routes
Individual route groups can override the global limit. This allows a file upload group to accept large bodies while keeping the rest of the API tightly restricted:
let mut api_routes = RpressRoutes::new();
api_routes.set_max_body_size(8 * 1024); // 8 KB for API routes
api_routes.add(":post/login", |req: RequestPayload| async move {
ResponsePayload::text("ok")
});
let mut upload_routes = RpressRoutes::new();
upload_routes.set_max_body_size(50 * 1024 * 1024); // 50 MB for uploads
upload_routes.add(":post/upload", |mut req: RequestPayload| async move {
let body = req.collect_body().await;
ResponsePayload::text(format!("Received {} bytes", body.len()))
});
app.set_max_body_size(1024 * 1024); // 1 MB global default
app.add_route_group(api_routes);
app.add_route_group(upload_routes);
When a route group has its own limit, that limit takes precedence over the global one, even if the group limit is larger. The global limit acts as the baseline for routes without a specific override.
app.serve_static("/assets", "./public");
app.serve_static("/uploads", "/var/data/uploads");
- Content-Type is detected by file extension
- Path traversal is prevented with `canonicalize()` — both the base directory and the requested path are resolved and compared before any read is performed
- File reads use `tokio::fs::read` and path resolution uses `tokio::fs::canonicalize` — no blocking syscalls on the event loop
- Supports: HTML, CSS, JS, JSON, images (PNG, JPG, GIF, SVG, WebP, ICO), fonts (WOFF, WOFF2, TTF), PDF, XML, videos (MP4, WebM)
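The canonicalize-and-compare check described above can be sketched with std alone (illustrative; this is not Rpress's internal code, and `safe_resolve` is a hypothetical helper name):

```rust
use std::path::{Path, PathBuf};

// Returns Some(resolved) only if `requested` stays inside `base` after
// resolving symlinks and `..` components.
fn safe_resolve(base: &Path, requested: &str) -> Option<PathBuf> {
    let base = base.canonicalize().ok()?;
    let candidate = base.join(requested.trim_start_matches('/'));
    let resolved = candidate.canonicalize().ok()?;
    // The resolved path must still be a descendant of the base directory.
    if resolved.starts_with(&base) { Some(resolved) } else { None }
}

fn main() {
    let tmp = std::env::temp_dir().join("rpress_static_demo");
    std::fs::create_dir_all(&tmp).unwrap();
    std::fs::write(tmp.join("style.css"), "body{}").unwrap();

    // A normal asset request resolves inside the base directory.
    assert!(safe_resolve(&tmp, "style.css").is_some());
    // A traversal attempt escapes the base directory and is rejected.
    assert!(safe_resolve(&tmp, "../../etc/passwd").is_none());
    println!("ok");
}
```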
Rpress supports native TLS via rustls. Use listen_tls instead of listen to serve over HTTPS:
use rpress::{Rpress, RpressTlsConfig, RpressRoutes, RequestPayload, ResponsePayload};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut app = Rpress::new(None);
let mut routes = RpressRoutes::new();
routes.add(":get/hello", |_req: RequestPayload| async {
ResponsePayload::text("Hello, HTTPS!")
});
app.add_route_group(routes);
let tls = RpressTlsConfig::from_pem("cert.pem", "key.pem")?;
app.listen_tls("0.0.0.0:443", tls).await
}
| Method | Description |
|---|---|
| `from_pem(cert_path, key_path)` | Loads a PEM certificate chain and private key from files |
| `from_config(rustls::ServerConfig)` | Uses an existing `rustls::ServerConfig` for full control |
Both methods automatically configure ALPN to support HTTP/2 (h2) and HTTP/1.1.
The listen() method continues to work for plaintext HTTP. You can use either one depending on your environment:
// Development — plaintext
app.listen("0.0.0.0:3000").await?;
// Production — TLS
let tls = RpressTlsConfig::from_pem("cert.pem", "key.pem")?;
app.listen_tls("0.0.0.0:443", tls).await?;
HTTP/2 is supported automatically over TLS connections. When a client negotiates the h2 protocol via ALPN during the TLS handshake, Rpress routes the connection through its HTTP/2 handler.
- All routes, middleware, CORS, and response features work identically over HTTP/2
- No code changes required — the same `RpressRoutes` and handlers serve both protocols
- HTTP/2 multiplexing is fully supported (concurrent streams on a single connection)
- Plaintext connections (`listen()`) always use HTTP/1.1
// This handler serves both HTTP/1.1 and HTTP/2 clients transparently
routes.add(":get/api/data", |_req: RequestPayload| async {
ResponsePayload::json(&serde_json::json!({"protocol": "auto"})).unwrap()
});
use std::time::Duration;
use rpress::{Rpress, RpressTlsConfig};
let mut app = Rpress::new(Some(cors));
// Read buffer capacity (default: 40KB)
app.set_buffer_capacity(1024 * 1024);
// Read timeout per request (default: 30s)
app.set_read_timeout(Duration::from_secs(30));
// Idle timeout between keep-alive requests (default: 60s)
app.set_idle_timeout(Duration::from_secs(120));
// Maximum concurrent connections (default: 1024)
app.set_max_connections(2048);
// Global max body size (default: 10MB)
app.set_max_body_size(5 * 1024 * 1024);
// Rate limiting (in-memory by default)
app.set_rate_limit(100, 60);
// Or inject a custom backend:
// app.set_rate_limiter(my_redis_limiter);
// Body streaming threshold (default: 64KB)
app.set_stream_threshold(64 * 1024);
// Gzip/brotli compression (default: disabled)
app.enable_compression(true);
// Static files
app.serve_static("/assets", "./public");
// Routes and middleware
app.use_middleware(|req, next| async move { next(req).await });
app.add_route_group(routes);
// Start the server (choose one)
app.listen("0.0.0.0:3000").await?; // HTTP
// or
let tls = RpressTlsConfig::from_pem("cert.pem", "key.pem")?;
app.listen_tls("0.0.0.0:443", tls).await?; // HTTPS + HTTP/2
// With a ready callback (like Express's app.listen(port, callback))
app.listen_with("0.0.0.0:3000", || async {
println!("Server running on port 3000");
}).await?;
Organize handlers in structs with Arc:
use std::sync::Arc;
use rpress::{handler, RpressRoutes, RequestPayload, ResponsePayload, RpressError, StatusCode};
pub struct UserController;
impl UserController {
pub fn new() -> Arc<Self> {
Arc::new(Self)
}
async fn get_user(&self, req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let id = req.get_param("id").ok_or_else(|| RpressError {
status: StatusCode::BadRequest,
message: "Missing id".to_string(),
})?;
Ok(ResponsePayload::json(&serde_json::json!({
"id": id,
"name": "Guilherme"
}))?)
}
async fn create_user(&self, mut req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let body = req.collect_body().await;
let data: serde_json::Value = serde_json::from_slice(&body)?;
Ok(ResponsePayload::json(&serde_json::json!({
"created": true,
"name": data["name"]
}))?.with_status(StatusCode::Created))
}
}
pub fn get_user_routes() -> RpressRoutes {
let controller = UserController::new();
let mut routes = RpressRoutes::new();
routes.add(":get/users/:id", handler!(controller, get_user));
routes.add(":post/users", handler!(controller, create_user));
routes
}
Shared state — database pools, config, caches, service clients — is passed into route groups as function parameters and stored inside controllers wrapped in Arc.
main()
└── Arc::new(MyPool::new()) — created once
├── .clone() → get_user_routes(db)
│ └── UserController { db }
│ └── self.db.query(…).await
└── .clone() → get_order_routes(db)
└── OrderController { db }
// db.rs — your database pool (e.g. sqlx::PgPool or a mock)
pub struct DbPool { /* connection pool */ }
impl DbPool {
pub async fn find_user(&self, id: u32) -> Option<User> { /* … */ }
pub async fn create_user(&self, name: String, email: String) -> User { /* … */ }
}
// routes/user.rs
use std::sync::Arc;
use rpress::{handler, RpressRoutes, RequestPayload, ResponsePayload, RpressError, StatusCode};
use crate::db::DbPool;
pub struct UserController {
db: Arc<DbPool>, // shared, cloning Arc is O(1)
}
impl UserController {
pub fn new(db: Arc<DbPool>) -> Arc<Self> {
Arc::new(Self { db })
}
async fn get_user(&self, req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let id: u32 = req.get_param("id")
.and_then(|v| v.parse().ok())
.ok_or(RpressError { status: StatusCode::BadRequest, message: "bad id".into() })?;
let user = self.db.find_user(id).await
.ok_or(RpressError { status: StatusCode::NotFound, message: "not found".into() })?;
Ok(ResponsePayload::json(&user)?)
}
async fn create_user(&self, mut req: RequestPayload) -> Result<ResponsePayload, RpressError> {
let body = req.collect_body().await;
let data: serde_json::Value = serde_json::from_slice(&body)?;
let user = self.db.create_user(
data["name"].as_str().unwrap_or("").to_string(),
data["email"].as_str().unwrap_or("").to_string(),
).await;
Ok(ResponsePayload::json(&user)?.with_status(StatusCode::Created))
}
}
// The pool is injected here — route groups are plain functions.
pub fn get_user_routes(db: Arc<DbPool>) -> RpressRoutes {
let controller = UserController::new(db);
let mut routes = RpressRoutes::new();
routes.add(":get/users/:id", handler!(controller, get_user));
routes.add(":post/users", handler!(controller, create_user));
routes
}
// main.rs — create the pool once, share it via Arc::clone
use std::sync::Arc;
use rpress::Rpress;
use crate::db::DbPool;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// With sqlx: let db = Arc::new(PgPool::connect(&database_url).await?);
let db = Arc::new(DbPool::new());
let mut app = Rpress::new(None);
// Each route group gets a cheap Arc clone — no data is copied.
app.add_route_group(get_user_routes(db.clone()));
app.add_route_group(get_order_routes(db.clone()));
app.listen("0.0.0.0:3000").await?;
Ok(())
}
Pass additional state the same way — just add more parameters:
pub fn get_auth_routes(
db: Arc<DbPool>,
cache: Arc<RedisClient>,
cfg: Arc<AppConfig>,
) -> RpressRoutes {
let controller = AuthController::new(db, cache, cfg);
// …
}
// main.rs
let db = Arc::new(DbPool::new());
let cache = Arc::new(RedisClient::connect("redis://localhost")?);
let cfg = Arc::new(AppConfig::from_env());
app.add_route_group(get_auth_routes(db.clone(), cache.clone(), cfg.clone()));
Any type that is `Send + Sync + 'static` can be wrapped in Arc and shared this way, including `tokio::sync::RwLock` and `tokio::sync::Mutex` for mutable shared state.
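As a minimal illustration of the Arc-plus-lock pattern for mutable shared state, here is a std-only sketch (in async handlers you would typically prefer `tokio::sync::RwLock` so waiting does not block the runtime):

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use std::thread;

fn main() {
    // One shared cache, created once and cloned cheaply per "route group".
    let cache: Arc<RwLock<HashMap<String, u32>>> = Arc::new(RwLock::new(HashMap::new()));

    let handles: Vec<_> = (0..4)
        .map(|i| {
            let cache = Arc::clone(&cache); // O(1) refcount bump, no data copied
            thread::spawn(move || {
                // Writers take the lock exclusively.
                cache.write().unwrap().insert(format!("key{}", i), i);
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }

    // Many readers can hold the lock concurrently.
    assert_eq!(cache.read().unwrap().len(), 4);
    println!("entries: {}", cache.read().unwrap().len());
}
```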
Implement RpressErrorExt to return errors with custom status codes:
use rpress::{RpressErrorExt, StatusCode};
struct NotFoundError {
resource: String,
}
impl RpressErrorExt for NotFoundError {
fn into_rpress_error(self) -> (StatusCode, String) {
(StatusCode::NotFound, format!("{} not found", self.resource))
}
}
routes.add(":get/items/:id", |req: RequestPayload| async move {
let id = req.get_param("id").unwrap_or("0");
if id == "0" {
return Err(NotFoundError { resource: "Item".into() });
}
Ok(ResponsePayload::text(format!("Item {}", id)))
});
Handlers can return:
- `ResponsePayload` (implicit 200)
- `Result<ResponsePayload, RpressError>`
- `Result<ResponsePayload, E>` where `E: RpressErrorExt`
- Any `E: RpressErrorExt` directly (error without Result)
- `()` (202 Accepted with no body)
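The conversion behind the `E: RpressErrorExt` forms can be pictured with a self-contained analogue of the trait. The names mirror the docs, but the surrounding plumbing (`ErrorExt`, `render_error`) is hypothetical:

```rust
// A stand-in for RpressErrorExt: any error that can name its status code.
trait ErrorExt {
    fn into_status_and_message(self) -> (u16, String);
}

struct NotFoundError {
    resource: String,
}

impl ErrorExt for NotFoundError {
    fn into_status_and_message(self) -> (u16, String) {
        (404, format!("{} not found", self.resource))
    }
}

// What a framework conceptually does with a handler's Err value:
fn render_error<E: ErrorExt>(err: E) -> String {
    let (status, msg) = err.into_status_and_message();
    format!("HTTP {}: {}", status, msg)
}

fn main() {
    let rendered = render_error(NotFoundError { resource: "Item".into() });
    assert_eq!(rendered, "HTTP 404: Item not found");
    println!("{}", rendered);
}
```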
These headers are sent automatically on every response:
| Header | Value |
|---|---|
| `X-Content-Type-Options` | `nosniff` |
| `X-Request-ID` | Unique UUID v4 per request |
| `Server` | `Rpress/1.0` |
| `Connection` | `keep-alive` |
Use RpressSecurityHeaders to opt-in to additional security headers such as
Content-Security-Policy, X-Frame-Options, X-XSS-Protection, and any custom
header. These are injected into every response unless the handler already set
the same header via with_header().
use rpress::{Rpress, RpressSecurityHeaders};
let mut app = Rpress::new(None);
app.set_security_headers(
RpressSecurityHeaders::new()
.content_security_policy("default-src 'self'; script-src 'self'")
.x_frame_options("DENY")
.x_xss_protection("1; mode=block")
.custom("Permissions-Policy", "camera=(), microphone=()")
.custom("Referrer-Policy", "strict-origin-when-cross-origin"),
);
If a handler needs a different policy for a specific route, it can override it by setting the header directly:
ResponsePayload::html(page)
.with_header("Content-Security-Policy", "default-src 'self'; script-src 'self' 'unsafe-inline'")
The handler-set value takes priority and the global default is skipped for that header.
The server responds to SIGINT (Ctrl+C):
- Stops accepting new connections
- Waits for active connections to finish
- Shuts down cleanly
| Resource | Limit |
|---|---|
| Request line | 8 KB |
| Headers (size) | 8 KB |
| Headers (count) | 100 |
| Body (Content-Length) | Configurable per route group (default 10 MB) |
| Individual chunk | 1 MB |
| Connection buffer | Configurable (default 40 KB) |
Rpress includes a built-in Socket.IO server compatible with socket.io-client v4+
(Engine.IO v4, Socket.IO protocol v5). It supports HTTP long-polling and WebSocket
transports, namespaces, rooms, event-based messaging, acknowledgements, and broadcasting.
use rpress::{Rpress, RpressIo};
use std::sync::Arc;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let io = RpressIo::new();
io.on_connection(|socket| async move {
println!("Connected: {}", socket.id());
socket.on("message", |socket, data| async move {
// Broadcast to all other sockets
socket.broadcast().emit("message", &data[0]).await;
None
}).await;
socket.on_disconnect(|socket| async move {
println!("Disconnected: {}", socket.id());
}).await;
});
let mut app = Rpress::new(None);
app.attach_socketio(io);
app.listen("0.0.0.0:3000").await
}
let io = RpressIo::new();
// Default namespace "/"
io.on_connection(|socket| async move { /* ... */ });
// Custom namespace "/admin"
io.of("/admin").on_connection(|socket| async move {
println!("Admin connected: {}", socket.id());
});
socket.on("join_room", |socket, data| async move {
if let Some(room) = data.first().and_then(|v| v.as_str()) {
socket.join(room).await;
socket.to(room).emit("user_joined", &socket.id()).await;
}
None
}).await;
socket.on("greet", |_socket, data| async move {
let name = data.first().and_then(|v| v.as_str()).unwrap_or("world");
Some(serde_json::json!(format!("Hello, {}!", name)))
}).await;
On the client side (JavaScript):
socket.emit("greet", "Rpress", (response) => {
console.log(response); // "Hello, Rpress!"
});
import { io } from "socket.io-client";
const socket = io("http://localhost:3000");
socket.on("connect", () => {
console.log("Connected:", socket.id);
});
socket.emit("message", "Hello from client");
socket.on("message", (data) => {
console.log("Received:", data);
});
For server-to-server communication, use the rpress-client crate to connect from Rust:
[dependencies]
rpress-client = "0.1"
tokio = { version = "1", features = ["full"] }
serde_json = "1"
use rpress_client::SocketIoClient;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Connect to default namespace "/"
let client = SocketIoClient::connect("http://localhost:3000").await?;
// Listen for events
client.on("chat message", |data| async move {
println!("Received: {:?}", data);
}).await;
// Emit events
client.emit("chat message", &serde_json::json!("Hello from Rust!")).await?;
// Emit with acknowledgement
let ack = client.emit_with_ack("greet", &serde_json::json!("Rpress")).await?;
println!("Ack: {:?}", ack);
// Connect to a custom namespace
let admin = SocketIoClient::connect_to("http://localhost:3000", "/admin").await?;
admin.emit("status", &serde_json::json!("online")).await?;
// Disconnect
client.disconnect().await?;
admin.disconnect().await?;
Ok(())
}
Protect Socket.IO connections by registering an authentication handler. The handler
receives the auth payload from the client's CONNECT packet and must return
Ok(claims) to allow the connection or Err(message) to reject it with a
CONNECT_ERROR. The returned claims are accessible on the socket via socket.auth().
Server (Rust):
use rpress::{Rpress, RpressIo};
use std::sync::Arc;
let io = RpressIo::new();
// Register auth handler for the default namespace
io.use_auth(|auth| async move {
let token = auth.get("token").and_then(|v| v.as_str())
.ok_or_else(|| "Missing token".to_string())?;
// Validate the token (e.g. JWT verification)
if token == "valid-secret" {
Ok(serde_json::json!({"user_id": "123", "role": "admin"}))
} else {
Err("Unauthorized".to_string())
}
});
io.on_connection(|socket| async move {
let user_id = socket.auth().get("user_id").and_then(|v| v.as_str());
println!("Authenticated user: {}", user_id.unwrap_or("unknown"));
socket.on("admin_action", |socket, data| async move {
if socket.auth().get("role").and_then(|v| v.as_str()) == Some("admin") {
socket.broadcast().emit("notification", &data[0]).await;
}
None
}).await;
});
// Per-namespace auth is also supported:
io.of("/admin").use_auth(|auth| async move {
let token = auth.get("token").and_then(|v| v.as_str())
.ok_or_else(|| "Missing token".to_string())?;
// Stricter validation for /admin namespace...
Ok(serde_json::json!({"admin": true}))
});
Client (JavaScript):
import { io } from "socket.io-client";
const socket = io("http://localhost:3000", {
auth: { token: "valid-secret" }
});
socket.on("connect", () => {
console.log("Authenticated and connected:", socket.id);
});
socket.on("connect_error", (err) => {
console.error("Auth failed:", err.message);
});
Client (Rust — rpress-client):
use rpress_client::SocketIoClient;
// Connect with authentication
let client = SocketIoClient::connect_with_auth(
"http://localhost:3000",
serde_json::json!({"token": "valid-secret"}),
).await?;
// Connect to a specific namespace with auth
let admin = SocketIoClient::connect_to_with_auth(
"http://localhost:3000",
"/admin",
serde_json::json!({"token": "admin-secret"}),
).await?;
Without an auth handler configured, connections are accepted without validation (backward compatible).
By default, Rpress uses an in-memory adapter for room management and broadcasting. This works perfectly for a single server instance, but when running multiple replicas behind a load balancer (e.g. Kubernetes), broadcasts on one Pod won't reach sockets connected to another Pod.
Enable the redis feature to use Redis Pub/Sub for cross-instance broadcasting:
[dependencies]
rpress = { version = "0.5", features = ["redis"] }
Server setup:
use rpress::{Rpress, RpressIo};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let io = RpressIo::with_redis("redis://127.0.0.1:6379").await?;
io.on_connection(|socket| async move {
println!("Connected: {}", socket.id());
socket.on("message", |socket, data| async move {
// This broadcast reaches ALL connected clients,
// even those on different server instances
socket.broadcast().emit("message", &data[0]).await;
None
}).await;
});
let mut app = Rpress::new(None);
app.attach_socketio(io);
app.listen("0.0.0.0:3000").await
}
You can also use a custom adapter with set_adapter:
use rpress::{RpressIo, RedisAdapter};
let adapter = RedisAdapter::new("redis://my-redis-cluster:6379").await?;
let mut io = RpressIo::new();
io.set_adapter(adapter);
Deploying with Multiple Replicas (Kubernetes / Load Balancers)
Engine.IO starts connections via HTTP long-polling before upgrading to WebSocket.
In a multi-replica deployment, successive polling requests from the same client may
be routed to different Pods by the load balancer, causing "Session ID unknown" errors.
There are two solutions:
Option A: WebSocket-only mode (recommended)
Force all clients to connect directly via WebSocket, bypassing long-polling entirely. This eliminates the sticky session requirement because WebSocket is a single persistent connection that stays on the same Pod.
Server:
use rpress::{Rpress, RpressIo, EioConfig};
let config = EioConfig {
websocket_only: true,
..EioConfig::default()
};
let io = RpressIo::with_config(config);
Client (JavaScript):
const socket = io("https://api.example.com", {
transports: ["websocket"], // skip long-polling
});
Client (Rust — rpress-client):
// rpress-client connects via WebSocket by default — no changes needed
let client = SocketIoClient::connect("http://localhost:3000").await?;
When websocket_only is enabled, the server rejects any long-polling request with
a clear error message instructing the client to use WebSocket transport.
Option B: Sticky sessions
If you need long-polling support (e.g. for clients behind restrictive proxies that block WebSocket), configure your load balancer with session affinity so that all requests from the same client reach the same Pod.
Nginx example:
upstream rpress_backend {
ip_hash; # sticky sessions by client IP
server pod1:3000;
server pod2:3000;
}
server {
location /socket.io/ {
proxy_pass http://rpress_backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
Kubernetes Ingress (nginx-ingress) example:
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
annotations:
nginx.ingress.kubernetes.io/affinity: "cookie"
nginx.ingress.kubernetes.io/session-cookie-name: "RPRESS_AFFINITY"
nginx.ingress.kubernetes.io/session-cookie-expires: "172800"
nginx.ingress.kubernetes.io/proxy-read-timeout: "3600"
nginx.ingress.kubernetes.io/proxy-send-timeout: "3600"
spec:
rules:
- host: api.example.com
http:
paths:
- path: /socket.io/
pathType: Prefix
backend:
service:
name: rpress-service
port:
number: 3000
The Redis adapter handles only cross-instance broadcast synchronization. Room membership, socket state, and Engine.IO sessions remain local to each instance, which is why sticky sessions (or WebSocket-only mode) are required.
Load tested on a single machine with oha (HTTP) and Artillery (Socket.IO). All tests run against a release build of the benchmark server included in bench/.
| Scenario | Requests | Concurrency | Req/sec | p50 | p99 | Success |
|---|---|---|---|---|---|---|
| Warmup | 1,000 | 50 | 85,422 | 0.37ms | 1.79ms | 100% |
| Max Throughput | 50,000 | 500 | 144,126 | 2.91ms | 10.91ms | 100% |
| JSON Serialization | 20,000 | 200 | 30,528 | 6.13ms | 15.03ms | 100% |
| POST Echo (1KB body) | 20,000 | 200 | 28,886 | 6.51ms | 16.48ms | 100% |
| Large Body + Compression | 10,000 | 100 | 1,840 | 53.30ms | 108.01ms | 100% |
| Static File (32KB CSS) | 10,000 | 100 | 10,295 | 9.01ms | 22.52ms | 100% |
| Extreme Concurrency | 10,000 | 1,000 | 94,558 | 6.19ms | 35.60ms | 100% |
| Sustained Load (60s) | 1,854,463 | 200 | 30,906 | 6.10ms | 15.69ms | 100% |
| Metric | Value |
|---|---|
| Virtual Users | 520 created, 520 completed, 0 failed |
| Scenarios | Ping-Pong (60%), Room Join (30%), Broadcast Storm (10%) |
| Total Emits | 3,567 |
| Peak Emit Rate | 135/sec |
| Session Length (median) | 2.0s |
| Total Test Time | 1 min 10s |
| Test | Result | Detail |
|---|---|---|
| Connection Limit (5,000 vs 4,096 max) | PASS | 4,883 successful, excess gracefully rejected |
| Slowloris (20 slow connections) | PASS | Server remained responsive |
| Oversized Body (15MB vs 10MB limit) | PASS | Returned 413 Payload Too Large |
| Post-Stress Health Check | PASS | 10/10 checks passed |
All scenarios are configurable via environment variables. See bench/README.md for details on running your own benchmarks.
MIT