chore(loki.source.firehose): Port component to the shared server abstraction#6003
chore(loki.source.firehose): Port component to the shared server abstraction#6003
Conversation
|
@codex review |
| defer reader.Close() | ||
|
|
||
| var b bytes.Buffer | ||
| if _, err := io.Copy(&b, reader); err != nil { |
There was a problem hiding this comment.
Semgrep identified an issue in your code:
Detected a possible denial-of-service via a zip bomb attack. By limiting the max bytes read, you can mitigate this attack. io.CopyN() can specify a size.
To resolve this comment:
🔧 No guidance has been designated for this issue. Fix according to your organization's approved methods.
💬 Ignore this finding
Reply with Semgrep commands to ignore this finding.
/fp <comment>for false positive/ar <comment>for acceptable risk/other <comment>for all other reasons
Alternatively, triage in Semgrep AppSec Platform to ignore the finding created by potential-dos-via-decompression-bomb.
We're currently testing semgrep's diff-aware PR comment feature on a subset of our repos-- if you run into issues or find this spammy, please reach out to @danny.cooper in slack and give feedback.
For backwards compatability with gosec, its best to use polyglot suppression comments of the following format for false positives:
// #nosec <gosec rule ID> nosemgrep: <semgrep rule ID>
You can view more details about this finding in the Semgrep AppSec Platform.
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 71e5358d97
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
|
@codex review |
|
Codex Review: Didn't find any major issues. More of your lovely PRs please. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
There was a problem hiding this comment.
Pull request overview
Ports loki.source.awsfirehose to the shared loki/source.Server abstraction, aligning it with the newer HTTP-source lifecycle/forwarding behavior and enabling route-level custom HTTP responses.
Changes:
- Added
LogsResponseWritersupport tosource.ServersoLogsRoutes can customize success/error responses. - Replaced the legacy Firehose handler with a
LogsRouteimplementation using the shared server abstraction and batch forwarding. - Updated/expanded tests and added Firehose request fixtures under
testdata/.
Reviewed changes
Copilot reviewed 11 out of 17 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| internal/component/loki/source/server.go | Adds LogsResponseWriter and integrates it into log route handling (including shutdown/cancel paths). |
| internal/component/loki/source/server_test.go | Adds coverage for custom response writer behavior. |
| internal/component/loki/source/aws_firehose/component.go | Switches component to source.Server + batch receiver consumption; restart/update logic updated accordingly. |
| internal/component/loki/source/aws_firehose/routes.go | Introduces Firehose LogsRoute + JSON response writer and parsing/labeling logic. |
| internal/component/loki/source/aws_firehose/routes_test.go | Migrates tests to exercise the new route implementation (including gzip cases) and metrics assertions. |
| internal/component/loki/source/aws_firehose/model.go | Adds request/response models and record decoding helpers (base64 + gzip detection/deflation). |
| internal/component/loki/source/aws_firehose/metrics.go | Replaces legacy metrics with a new metrics wrapper compatible with the new route/component structure. |
| internal/component/loki/source/aws_firehose/testdata/direct_put.json | Adds fixture for direct put requests. |
| internal/component/loki/source/aws_firehose/testdata/direct_put_with_non_json_message.json | Adds fixture for direct put requests with non-JSON decoded payload. |
| internal/component/loki/source/aws_firehose/testdata/cw_logs_with_only_data_messages.json | Adds fixture for CloudWatch Logs data-only payloads. |
| internal/component/loki/source/aws_firehose/testdata/cw_logs_with_only_control_messages.json | Adds fixture for CloudWatch Logs control-only payloads. |
| internal/component/loki/source/aws_firehose/testdata/cw_logs_mixed.json | Adds fixture for mixed CloudWatch Logs records. |
| internal/component/loki/source/aws_firehose/testdata/cw_logs_control_and_bad_records.json | Adds fixture for mixed control + invalid gzipped record scenarios. |
| internal/component/loki/source/aws_firehose/internal/model.go | Removes legacy internal request/record model types. |
| internal/component/loki/source/aws_firehose/internal/metrics.go | Removes legacy internal metrics implementation. |
| internal/component/loki/source/aws_firehose/internal/handler.go | Removes legacy HTTP handler implementation in favor of LogsRoute. |
| internal/component/loki/source/aws_firehose/internal/errors.go | Removes legacy error-with-reason helpers (reintroduced elsewhere as needed). |
| commonLabels := labels.NewBuilder(labels.EmptyLabels()) | ||
| commonLabels.Set("__aws_firehose_request_id", req.Header.Get("X-Amz-Firehose-Request-Id")) |
There was a problem hiding this comment.
The parsed Firehose requestId from the JSON body (firehoseReq.RequestID) is currently ignored, while labels and the response use the X-Amz-Firehose-Request-Id header instead. This changes behavior from the previous handler (which responded with the body requestId) and can produce an empty/mismatched requestId if the header is absent or differs. Consider using firehoseReq.RequestID as the canonical value (and/or copying it into the header after decoding so WriteResponse can reliably include it).
| commonLabels := labels.NewBuilder(labels.EmptyLabels()) | |
| commonLabels.Set("__aws_firehose_request_id", req.Header.Get("X-Amz-Firehose-Request-Id")) | |
| requestID := firehoseReq.RequestID | |
| if requestID == "" { | |
| requestID = req.Header.Get("X-Amz-Firehose-Request-Id") | |
| } | |
| if requestID != "" { | |
| req.Header.Set("X-Amz-Firehose-Request-Id", requestID) | |
| } | |
| commonLabels := labels.NewBuilder(labels.EmptyLabels()) | |
| commonLabels.Set("__aws_firehose_request_id", requestID) |
| var err error | ||
| bodyReader, err = gzip.NewReader(req.Body) | ||
| if err != nil { | ||
| r.metrics.IncRequestError("pre_read") | ||
| return nil, http.StatusBadRequest, err | ||
| } |
There was a problem hiding this comment.
When Content-Encoding is gzip, gzip.NewReader(req.Body) is created but never closed. gzip.Reader.Close() returns the underlying flate reader to a pool and helps avoid per-request allocations; add a defer to close it after creation (before closing req.Body).
| var err error | |
| bodyReader, err = gzip.NewReader(req.Body) | |
| if err != nil { | |
| r.metrics.IncRequestError("pre_read") | |
| return nil, http.StatusBadRequest, err | |
| } | |
| gzReader, err := gzip.NewReader(req.Body) | |
| if err != nil { | |
| r.metrics.IncRequestError("pre_read") | |
| return nil, http.StatusBadRequest, err | |
| } | |
| defer gzReader.Close() | |
| bodyReader = gzReader |
Pull Request Details
Port
loki.source.awsfirehoseto the sharedloki/source.Serverroute abstraction and align it with the newer HTTP source pattern used by the other Loki source components.This component how have the same life cycle handling where we forward a batch of entries and handle request cancellations and server shutdown like we do in other components.
Some notable changes:
Issue(s) fixed by this Pull Request
Part of: #5803
Notes to the Reviewer
LogsResponseWriterfor custom response write logic.PR Checklist