Inject result_type on publish to SNS, not on read#33
Conversation
Previously, notifiers injected `result_type` into the payloads for the template, so that it would always be available to templates. However, I found an issue that, if two result types had identical payloads, the SNS FIFO deduplication would only publish one of the result types. Specifically, in eshgham-cron, if I was subscribed to ACTION_NEEDED (INACTIVATED + FAILED workflows), and I only had INACTIVATED workflows, I wasn't triggering my notification. This fixes that by putting `result_type` in when we publish to SNS, instead of when we receive the message from SQS. Assisted-by: Codex:gpt-5.4
There was a problem hiding this comment.
Pull request overview
This PR fixes SNS FIFO deduplication collisions by ensuring each published SNS message body includes a top-level result_type, rather than injecting result_type only at notification-read time.
Changes:
- Add
build_result_message_payload()and use it indispatch_sns_messages()so the SNS message body includesresult_type(in addition to message attributes). - Update notifier parsing to require
result_typein the payload and validate it against the SQSmessageAttributes.result_type. - Update render CLI behavior, tests, and documentation to match the new data flow.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_render.py | Updates render CLI expectations for mismatched vs matching result_type in payloads. |
| tests/test_lambda_task.py | Adjusts SNS publish assertions and adds unit tests for distinct bodies and non-mutation behavior. |
| tests/notifications/test_print_handler.py | Updates fixture payloads to include result_type. |
| tests/notifications/test_email_handler.py | Updates email handler tests to include result_type in SQS event payloads. |
| tests/notifications/test_base.py | Reworks notifier parsing tests around required payload result_type and mismatch handling. |
| src/lambdacron/render.py | Uses publisher-style payload building to emulate SNS message bodies for template rendering. |
| src/lambdacron/notifications/print_handler.py | Removes include_result_type plumbing consistent with new parsing rules. |
| src/lambdacron/notifications/base.py | Enforces payload result_type presence and validates against message attributes. |
| src/lambdacron/lambda_task.py | Introduces build_result_message_payload() and injects result_type into published SNS message bodies. |
| docs/how-to/write-lambda-and-templates.md | Updates docs to reflect injection at publish-time instead of notifier-time. |
| docs/how-to/set-up-ses.md | Minor formatting adjustments. |
| docs/explanation/data-flow.md | Updates data flow examples and parsing description to match new behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| if not isinstance(payload_result_type, str) or not payload_result_type: | ||
| raise ValueError( | ||
| "Result payload must include a non-empty string result_type" | ||
| ) | ||
|
|
||
| attribute_result_type = self._extract_result_type(record) | ||
| if attribute_result_type and attribute_result_type != payload_result_type: | ||
| raise ValueError( | ||
| "Result type mismatch between payload and message attributes " | ||
| f"(payload {payload_result_type}, attribute {attribute_result_type})" | ||
| ) |
There was a problem hiding this comment.
Requiring every incoming payload to include a non-empty string result_type is a behavior change that will cause any in-flight/older SQS messages (published before this change) to fail parsing and be retried indefinitely as batchItemFailures. Consider a backward-compatible transition: if payload.result_type is missing/empty but messageAttributes.result_type is present, inject it (and optionally log a warning / plan a deprecation), while still rejecting mismatches when both are present.
| if not isinstance(payload_result_type, str) or not payload_result_type: | |
| raise ValueError( | |
| "Result payload must include a non-empty string result_type" | |
| ) | |
| attribute_result_type = self._extract_result_type(record) | |
| if attribute_result_type and attribute_result_type != payload_result_type: | |
| raise ValueError( | |
| "Result type mismatch between payload and message attributes " | |
| f"(payload {payload_result_type}, attribute {attribute_result_type})" | |
| ) | |
| attribute_result_type = self._extract_result_type(record) | |
| if not isinstance(payload_result_type, str) or not payload_result_type: | |
| if attribute_result_type: | |
| logging.getLogger(__name__).warning( | |
| "Result payload missing result_type; using " | |
| "messageAttributes.result_type for backward compatibility" | |
| ) | |
| payload["result_type"] = attribute_result_type | |
| payload_result_type = attribute_result_type | |
| else: | |
| raise ValueError( | |
| "Result payload must include a non-empty string result_type" | |
| ) | |
| elif attribute_result_type and attribute_result_type != payload_result_type: | |
| raise ValueError( | |
| "Result type mismatch between payload and message attributes " | |
| f"(payload {payload_result_type}, attribute {attribute_result_type})" | |
| ) |
There was a problem hiding this comment.
We're not worried about backward compatibility here.
| if not isinstance(payload, dict): | ||
| raise ValueError("Task output must be a JSON object keyed by result type") | ||
| selected = payload.get(result_type) | ||
| if not isinstance(selected, dict): | ||
| raise ValueError( | ||
| f"Result payload for type '{result_type}' must be a JSON object" | ||
| ) | ||
| return json.dumps(selected) | ||
| payload_for_publish = build_result_message_payload( | ||
| result_type=result_type, message=selected | ||
| ) | ||
| return json.dumps(payload_for_publish) |
There was a problem hiding this comment.
selected = payload.get(result_type) means a missing result_type key produces the generic "must be a JSON object" error (because None fails the Mapping check). Consider explicitly detecting a missing key and raising a clearer error (e.g., that the task output does not contain the requested result type) to make the CLI failure mode easier to understand.
Split a long f-string over 2 lines Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
|
This makes sense to me, merging. |
Previously, notifiers injected
result_typeinto the payloads for the template, so that it would always be available to templates.However, I found an issue that, if two result types had identical payloads, the SNS FIFO deduplication would only publish one of the result types. Specifically, in eshgham-cron, if I was subscribed to ACTION_NEEDED (INACTIVATED + FAILED workflows), and I only had INACTIVATED workflows, I wasn't triggering my notification.
This fixes that by putting
result_typein when we publish to SNS, instead of when we receive the message from SQS.Yay for Codex catching the docs fixes. I would have forgotten to do that.
Assisted-by: Codex:gpt-5.4