Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 29 additions & 15 deletions crates/ingestor/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,29 +189,43 @@ pub async fn complete_upload(
let mut writer = WriteMultipart::new(upload);

let mut total_bytes: usize = 0;
// Stream each part into the multipart writer. If any part fetch fails,
// abort the multipart upload to prevent orphaned uploads on S3/GCS.
for n in 0..num_parts {
let part_key = StorePath::from(format!("raw/{video_id}/part_{n}").as_str());
let data = state
.store
.get(&part_key)
.await
.map_err(|e| IngestorError::Storage {
message: format!("missing part {n}: {e}"),
})?
.bytes()
.await
.map_err(|e| IngestorError::Storage {
message: e.to_string(),
})?;
let data = match state.store.get(&part_key).await {
Ok(result) => match result.bytes().await {
Ok(bytes) => bytes,
Err(e) => {
warn!(video_id, part = n, error = %e, "aborting multipart upload: failed to read part bytes");
writer.abort().await.ok();
return Err(IngestorError::Storage {
message: e.to_string(),
});
}
},
Err(e) => {
warn!(video_id, part = n, error = %e, "aborting multipart upload: missing part");
writer.abort().await.ok();
return Err(IngestorError::Storage {
message: format!("missing part {n}: {e}"),
});
}
};
total_bytes += data.len();
// Feed bytes into the multipart writer without copying; each chunk is
// flushed to storage once the internal buffer reaches its threshold.
writer.put(data);
}

writer.finish().await.map_err(|e| IngestorError::Storage {
message: e.to_string(),
})?;
if let Err(e) = writer.finish().await {
warn!(video_id, error = %e, "aborting multipart upload: finish failed");
// finish() already calls abort internally on failure in recent
// object_store versions, but we log the error for observability.
return Err(IngestorError::Storage {
message: e.to_string(),
});
}

// Clean up individual parts
for n in 0..num_parts {
Expand Down