
Conversation

Contributor

@jcjones jcjones commented Jan 15, 2026

This change plumbs asynchronous stream processing through the Report upload path: bytes arriving via Trillium's methods are decoded without buffering more than a few reports at a time, then processed as before, in whatever order and as fast as the executor can go. The resulting errors are collected and returned at the conclusion.

I've implemented an HTTP/1.1 test of this using raw chunked encoding.

Implements #4149.

@jcjones jcjones added this to the draft-ietf-ppm-dap-16 milestone Jan 15, 2026
@jcjones jcjones self-assigned this Jan 15, 2026
@jcjones jcjones added the aggregator-api Issues relating to the aggregator API (/aggregator_api/) label Jan 15, 2026
@jcjones jcjones marked this pull request as ready for review January 15, 2026 18:41
@jcjones jcjones requested a review from a team as a code owner January 15, 2026 18:41

select! {
// Poll for new reports from the stream (only if not eof)
stream_result = report_stream.next(), if !stream_eof => {
Contributor

Does something bad happen if we call report_stream.next() when it's already yielded None? What I'm getting at is, can we avoid tracking stream_eof?

Contributor Author

We'd need to fuse the stream, but I think I'd still need a status variable to be sure both that the stream has yielded None and that futures.is_empty().

Collaborator

FWIW the private type inside async-stream-impl does implement FusedStream already, but this isn't documented. If it has yielded None already, it will keep doing so, leading to the select! macro wasting time by randomly polling it during some loop iterations.

Contributor Author

Oooh, I think I see!

Contributor Author

I fixed it in c4b0be1



@divergentdave divergentdave removed the aggregator-api Issues relating to the aggregator API (/aggregator_api/) label Jan 16, 2026
@divergentdave
Collaborator

I removed the "aggregator-api" label, as that's for things in the janus_aggregator_api crate. (not the clearest name)

@jcjones
Contributor Author

jcjones commented Jan 16, 2026

Moving this to draft because my changes aren't testing the way I expect with the as-yet-unpushed new tests.

@jcjones jcjones marked this pull request as draft January 16, 2026 23:25
@jcjones jcjones marked this pull request as ready for review January 21, 2026 02:06
}

/// These are all tests of the decode_reports_stream method in http_handlers.rs
mod decode_reports_stream_tests {
Contributor

This module could go in its own file but I don't mind it as-is.

Comment on lines 664 to 668
// Consume bytes up to the point of failure (including metadata and
// any successfully decoded fields). This allows the stream to continue
// processing subsequent reports after a corrupted one.
bytes_consumed = cursor.position() as usize;
yield Err(decode_error);
Collaborator

I don't think we'll be able to continue decoding after encountering a codec error in one report. The reports are only delimited by their own internal sequence of length prefixes, and if decoding fails, the method leaves the cursor at an indeterminate position.

If we want to do better in this regard, we'd have to write a separate routine that peeks at the length prefixes and determines the total length of the report, or returns an error for an early EOF. If we did that before trying to decode a report, that would let us isolate the impact of a report containing an invalid encoding of a field element, for example.

Contributor Author

Since we don't have a length covering the whole Report, I don't think we can peek and figure this out in any way more meaningful than what currently happens, as we don't have bounds on any given element. To peek and figure out, say, the leader_encrypted_input_share length, we first need to determine the metadata.public_extensions and public_share lengths. Whether we do that while filling in the structures (as this change currently does) or beforehand doesn't change the failure scenario: we don't know we have a corrupt report until something doesn't match expectations.

All that to say, I agree with you that continuing from here will only work if the corrupted report is an early EOF that ends the entire report, not an early EOF in the middle of the report, and we cannot tell the difference.

On further consideration, it's best to just fail the stream and ensure that if at all possible we emit this CodecError for the failed report, assuming we could get the metadata out.

Collaborator

Yeah, we'd have to peek six times to get all the variable lengths.

Contributor Author

Digging more, it's not actually possible to ever return a decoding error corresponding to a specific report ID, metadata or not, because all decoding errors are effectively fatal. The only thing we can detect going wrong during a decode is incorrect lengths, which are indistinguishable from the stream still being in-progress. The whole Result<Result<Report, ReportDecodeError>> is, I'm afraid, ineffective.

Contributor Author

In 433c4da I did a partial revert of efa84bb so we're back to the whole-stream-aborts thing. I also adjusted the tests, and particularly changed upload_report_decode_failure (new name) to confirm that if you upload stream[Good Report, Undecodable Gibberish] that you get a stream error and the Good Report gets processed, per spec.

Comment on lines +1563 to +1565
// Create malformed data that fails to decode metadata
// Using an invalid fixed size value (all 0xFF) should cause early failure
let bad_data = vec![0xFF; 100];
Collaborator

I was surprised to see an input this long for an error in the ReportMetadata, but it turns out that the metadata includes a length prefix for the public extensions, and thus the 0xFFs lead to an EOF error.

@jcjones jcjones requested a review from divergentdave January 23, 2026 17:49
@jcjones
Contributor Author

jcjones commented Jan 23, 2026

---- aggregator::upload_tests::upload_report_decode_failure stdout ----

thread 'aggregator::upload_tests::upload_report_decode_failure' (10859) panicked at aggregator/src/aggregator/upload_tests.rs:1101:6:
called `Result::unwrap()` on an `Err` value: Elapsed(())

Timing out on runtime_manager.wait_for_completed_tasks("aggregator", 1). Naturally, this test isn't brittle locally; I'm ... pondering.

@jcjones
Contributor Author

jcjones commented Jan 23, 2026


Oh, this is a race: a future failing to finish processing in time. Ugh.

@jcjones
Contributor Author

jcjones commented Jan 23, 2026


70582cc fixes the race and makes the behavior predictable. I've re-run the test in a loop for the last 20 minutes without issue. I should have caught this before.

The only reason I see not to do this is if there's a prohibition in the protocol spec, but I don't see one.
