Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ DATABASE_URL=sqlite:/data/meta.db
# [optional] Backoff base duration in seconds (default: 2).
# BACKOFF_BASE_SECS=2

# [optional] Upload inactivity timeout in seconds before marking stale uploads as failed (default: 600).
# UPLOAD_STALE_TIMEOUT_SECS=600

# [optional] Interval in seconds between stale upload sweep checks (default: 30).
# UPLOAD_STALE_CHECK_INTERVAL_SECS=30

# ──────────────────────────────────────────────
# Streamer
# ──────────────────────────────────────────────
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

pub mod share_repo;
pub mod task_repo;
pub mod upload_session_repo;
pub mod video_repo;

use snafu::{ResultExt, Snafu};
Expand Down
141 changes: 141 additions & 0 deletions crates/core/src/db/upload_session_repo.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
//! Repository for tracking in-progress upload sessions.

use chrono::{DateTime, Utc};
use snafu::ResultExt;
use sqlx::SqlitePool;

use super::{QuerySnafu, Result};

/// Insert or update upload session activity for a video.
///
/// If the session already exists, only `last_activity_at` is updated.
pub async fn touch(pool: &SqlitePool, video_id: &str, at: DateTime<Utc>) -> Result<()> {
let ts = at.to_rfc3339();
sqlx::query(
"INSERT INTO upload_sessions (video_id, created_at, last_activity_at)
VALUES (?, ?, ?)
ON CONFLICT(video_id) DO UPDATE SET last_activity_at = excluded.last_activity_at",
)
.bind(video_id)
.bind(&ts)
.bind(&ts)
.execute(pool)
.await
.context(QuerySnafu)?;
Ok(())
}

/// Remove the upload session for a video.
pub async fn delete(pool: &SqlitePool, video_id: &str) -> Result<()> {
sqlx::query("DELETE FROM upload_sessions WHERE video_id = ?")
.bind(video_id)
.execute(pool)
.await
.context(QuerySnafu)?;
Ok(())
}

/// Return stale upload session video IDs whose activity is older than `cutoff`.
pub async fn list_stale(pool: &SqlitePool, cutoff: DateTime<Utc>) -> Result<Vec<String>> {
let rows: Vec<(String,)> = sqlx::query_as(
"SELECT video_id
FROM upload_sessions
WHERE last_activity_at <= ?
ORDER BY last_activity_at ASC",
)
.bind(cutoff.to_rfc3339())
.fetch_all(pool)
.await
.context(QuerySnafu)?;

Ok(rows.into_iter().map(|(video_id,)| video_id).collect())
}

#[cfg(test)]
mod tests {
use chrono::Duration;

use super::*;
use crate::db::run_migrations;

async fn in_memory_db() -> SqlitePool {
let db = SqlitePool::connect("sqlite::memory:")
.await
.expect("in-memory db");
run_migrations(&db).await.expect("migrations");
db
}

async fn seed_video(db: &SqlitePool, video_id: &str) {
sqlx::query(
"INSERT INTO videos (video_id, owner, filename, size_bytes, content_type, status, \
created_at, ready_at)
VALUES (?, 'alice', 'demo.mp4', 1024, 'video/mp4', 'uploading', ?, NULL)",
)
.bind(video_id)
.bind(Utc::now().to_rfc3339())
.execute(db)
.await
.expect("seed video");
}

#[tokio::test]
async fn touch_creates_or_updates_session() {
let db = in_memory_db().await;
seed_video(&db, "v_1").await;

let first = Utc::now() - Duration::minutes(10);
touch(&db, "v_1", first).await.expect("touch insert");

let second = Utc::now();
touch(&db, "v_1", second).await.expect("touch update");

let row: (String, String, String) = sqlx::query_as(
"SELECT video_id, created_at, last_activity_at FROM upload_sessions WHERE video_id = ?",
)
.bind("v_1")
.fetch_one(&db)
.await
.expect("session row exists");

assert_eq!(row.0, "v_1");
assert_eq!(row.1, first.to_rfc3339());
assert_eq!(row.2, second.to_rfc3339());
}

#[tokio::test]
async fn list_stale_returns_only_expired_sessions() {
let db = in_memory_db().await;
seed_video(&db, "old").await;
seed_video(&db, "new").await;

touch(&db, "old", Utc::now() - Duration::minutes(20))
.await
.expect("old touch");
touch(&db, "new", Utc::now() - Duration::minutes(2))
.await
.expect("new touch");

let stale = list_stale(&db, Utc::now() - Duration::minutes(5))
.await
.expect("list stale");
assert_eq!(stale, vec!["old".to_string()]);
}

#[tokio::test]
async fn delete_removes_session() {
let db = in_memory_db().await;
seed_video(&db, "v_2").await;
touch(&db, "v_2", Utc::now()).await.expect("touch");

delete(&db, "v_2").await.expect("delete session");

let count: (i64,) =
sqlx::query_as("SELECT COUNT(*) FROM upload_sessions WHERE video_id = ?")
.bind("v_2")
.fetch_one(&db)
.await
.expect("count row");
assert_eq!(count.0, 0);
}
}
58 changes: 31 additions & 27 deletions crates/ingestor/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ pub struct Config {
/// Delay in seconds before retrying after a consumer receive error.
#[serde(default = "defaults::consumer_retry_delay_secs")]
pub consumer_retry_delay_secs: u64,
/// Upload session inactivity timeout in seconds.
#[serde(default = "defaults::upload_stale_timeout_secs")]
pub upload_stale_timeout_secs: u64,
/// Interval in seconds for stale upload sweep checks.
#[serde(default = "defaults::upload_stale_check_interval_secs")]
pub upload_stale_check_interval_secs: u64,
}

impl Config {
Expand All @@ -43,6 +49,16 @@ impl Config {
pub const fn consumer_retry_delay(&self) -> Duration {
Duration::from_secs(self.consumer_retry_delay_secs)
}

/// Build a [`Duration`] from the stale upload inactivity timeout.
pub const fn upload_stale_timeout(&self) -> Duration {
Duration::from_secs(self.upload_stale_timeout_secs)
}

/// Build a [`Duration`] from the stale upload sweep interval.
pub const fn upload_stale_check_interval(&self) -> Duration {
Duration::from_secs(self.upload_stale_check_interval_secs)
}
}

impl Default for Config {
Expand All @@ -56,6 +72,8 @@ impl Default for Config {
max_attempts: defaults::max_attempts(),
backoff_base_secs: defaults::backoff_base_secs(),
consumer_retry_delay_secs: defaults::consumer_retry_delay_secs(),
upload_stale_timeout_secs: defaults::upload_stale_timeout_secs(),
upload_stale_check_interval_secs: defaults::upload_stale_check_interval_secs(),
}
}
}
Expand All @@ -79,6 +97,12 @@ mod defaults {

/// 1 second.
pub(super) const fn consumer_retry_delay_secs() -> u64 { 1 }

/// 10 minutes.
pub(super) const fn upload_stale_timeout_secs() -> u64 { 600 }

/// 30 seconds.
pub(super) const fn upload_stale_check_interval_secs() -> u64 { 30 }
}

#[cfg(test)]
Expand All @@ -95,7 +119,9 @@ mod tests {
"chunk_size": 1_048_576,
"max_attempts": 5,
"backoff_base_secs": 10,
"consumer_retry_delay_secs": 3
"consumer_retry_delay_secs": 3,
"upload_stale_timeout_secs": 900,
"upload_stale_check_interval_secs": 15
});
let cfg: Config = serde_json::from_value(json).unwrap();
assert_eq!(cfg.port, 9081);
Expand All @@ -106,6 +132,8 @@ mod tests {
assert_eq!(cfg.max_attempts, 5);
assert_eq!(cfg.backoff_base_secs, 10);
assert_eq!(cfg.consumer_retry_delay_secs, 3);
assert_eq!(cfg.upload_stale_timeout_secs, 900);
assert_eq!(cfg.upload_stale_check_interval_secs, 15);
}

#[test]
Expand All @@ -121,31 +149,7 @@ mod tests {
assert_eq!(cfg.max_attempts, 3);
assert_eq!(cfg.backoff_base_secs, 2);
assert_eq!(cfg.consumer_retry_delay_secs, 1);
}

#[test]
fn backoff_base_returns_duration() {
stream_core::paths::init_data_dir(None);
let cfg = Config::default();
assert_eq!(cfg.backoff_base(), std::time::Duration::from_secs(2));
}

#[test]
fn consumer_retry_delay_returns_duration() {
stream_core::paths::init_data_dir(None);
let cfg = Config::default();
assert_eq!(
cfg.consumer_retry_delay(),
std::time::Duration::from_secs(1)
);
}

#[test]
fn round_trips_through_serde() {
stream_core::paths::init_data_dir(None);
let cfg = Config::default();
let json = serde_json::to_string(&cfg).unwrap();
let cfg2: Config = serde_json::from_str(&json).unwrap();
assert_eq!(cfg, cfg2);
assert_eq!(cfg.upload_stale_timeout_secs, 600);
assert_eq!(cfg.upload_stale_check_interval_secs, 30);
}
}
11 changes: 10 additions & 1 deletion crates/ingestor/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use object_store::{ObjectStoreExt, PutPayload, WriteMultipart, path::Path as Sto
use snafu::ResultExt;
use sqlx::SqlitePool;
use stream_core::{
db::{task_repo, video_repo},
db::{task_repo, upload_session_repo, video_repo},
domain::{ConnectionToken, ProcessingTask, TaskStatus, TokenAction, VideoStatus},
validate::MAX_FILE_SIZE,
};
Expand Down Expand Up @@ -73,6 +73,9 @@ pub async fn init_upload(
video_repo::update_status(&state.db, &video_id, VideoStatus::Uploading)
.await
.context(DatabaseSnafu)?;
upload_session_repo::touch(&state.db, &video_id, Utc::now())
.await
.context(DatabaseSnafu)?;

// Generate presigned URL for each part
let exp = Utc::now().timestamp().cast_unsigned() + 3600;
Expand Down Expand Up @@ -135,6 +138,9 @@ pub async fn upload_part(
.map_err(|e| IngestorError::Storage {
message: e.to_string(),
})?;
upload_session_repo::touch(&state.db, &path.id, Utc::now())
.await
.context(DatabaseSnafu)?;

info!(
video_id = path.id,
Expand Down Expand Up @@ -218,6 +224,9 @@ pub async fn complete_upload(
let part_key = StorePath::from(format!("raw/{video_id}/part_{n}").as_str());
let _ = state.store.delete(&part_key).await;
}
upload_session_repo::delete(&state.db, &video_id)
.await
.context(DatabaseSnafu)?;

info!(
video_id,
Expand Down
Loading