Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions docs/explanation/data-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ Let's say the lambda runs a few tasks, which either pass (status `OK`) or fail (

## Phase 2: What Gets Published to SNS

We publish one SNS message per status type, so in this case two messages: one for `OK` and one for `ERROR`. Each message includes the relevant portion of the `_perform_task` return value (JSON-encoded) as the `Message` payload. The `result_type` is included as a `MessageAttribute` to allow for filtering by the SQS subscriptions.
We publish one SNS message per status type, so in this case two messages: one for `OK` and one for `ERROR`. Each message includes the relevant portion of the `_perform_task` return value, with a top-level `result_type` added before publication. The same `result_type` is also included as a `MessageAttribute` to allow for filtering by the SQS subscriptions.

For `OK`, the publish call payload shape is:

```json
{
"TopicArn": "arn:aws:sns:us-east-1:123456789012:lambdacron-results.fifo",
"Message": "{\"tasks\": [{\"taskid\": 1, \"name\": \"Foo\"}, {\"taskid\": 2, \"name\": \"Bar\"}]}",
"Message": "{\"tasks\": [{\"taskid\": 1, \"name\": \"Foo\"}, {\"taskid\": 2, \"name\": \"Bar\"}], \"result_type\": \"OK\"}",
"Subject": "Notification for OK",
"MessageAttributes": {
"result_type": {
Expand All @@ -47,7 +47,7 @@ For `OK`, the publish call payload shape is:

For `ERROR`, the shape is identical except:

- `Message` is `"{\"tasks\": [{\"taskid\": 3, \"name\": \"Baz\"}]}"`
- `Message` is `"{\"tasks\": [{\"taskid\": 3, \"name\": \"Baz\"}], \"result_type\": \"ERROR\"}"`
- `Subject` is `"Notification for ERROR"`
- `MessageAttributes.result_type.StringValue` is `"ERROR"`

Expand All @@ -63,7 +63,7 @@ The SQS event record (as seen by notifier Lambda) looks like:
{
"messageId": "msg-ok-1",
"eventSource": "aws:sqs",
"body": "{\"tasks\": [{\"taskid\": 1, \"name\": \"Foo\"}, {\"taskid\": 2, \"name\": \"Bar\"}]}",
"body": "{\"tasks\": [{\"taskid\": 1, \"name\": \"Foo\"}, {\"taskid\": 2, \"name\": \"Bar\"}], \"result_type\": \"OK\"}",
"messageAttributes": {
"result_type": {
"stringValue": "OK",
Expand All @@ -77,8 +77,7 @@ The SQS event record (as seen by notifier Lambda) looks like:

## Phase 4: Notifier Parse Behavior with This Shape

The notifier parser converts the JSON string back into an object, and, if `result_type` is missing from the payload, injects it from the SQS `messageAttributes`. In this case, the payload doesn't include `result_type`, so it is injected.

The notifier parser converts the JSON string back into an object and validates that the payload `result_type` matches the SQS `messageAttributes.result_type` value when that attribute is present.
The result, which is fed to the notifier's templates, is:

```json
Expand Down
2 changes: 2 additions & 0 deletions docs/how-to/set-up-ses.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ Set up Amazon SES once per AWS account and region where your notifier Lambda run

* Choose the AWS account/region where email will be sent (SES setup is regional).
* Choose a sender identity:

* Email identity for one sender address.
* Domain identity if you want to send from multiple addresses in one domain.

* Make sure you can edit DNS records if you choose a domain identity.

SES requires verified identities for senders and (in sandbox) recipients. That means you must verify the email address or domain you want to send from, and if in sandbox, also verify any recipient addresses. Sandbox mode limits how much you can send, and you'll probably want to request production access if you want to send to more than one recipient.
Expand Down
5 changes: 3 additions & 2 deletions docs/how-to/write-lambda-and-templates.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Based on `examples/basic/lambda/lambda_module.py`:

* Key (`example`) is the `result_type`.
* Value (`{"message": "Hello World"}`) is the payload rendered by templates.
* The payload automatically has the key `result_type` injected by the notification handler, so you can access it in templates as `{{ result_type }}`.
* LambdaCron adds `result_type` to the published message body when it sends the payload to SNS, so you can access it in templates as `{{ result_type }}`.

## 2. Create templates that use the payload fields

Expand Down Expand Up @@ -90,7 +90,8 @@ At runtime:

Important detail:

* `result_type` is injected into the render payload by the notification handler when available in message attributes.
* `result_type` is injected into the message body by the publisher before it reaches the notifier.
* The SNS message attribute still carries the same `result_type`. It is used for filter policies and validation.
* That is why templates like `{{ result_type }}` work even when your `_perform_task` payload does not explicitly include a `result_type` field.

## 4. Checklist Before Deploying
Expand Down
43 changes: 42 additions & 1 deletion src/lambdacron/lambda_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,46 @@ def load_sns_message_group_id(env_var: str = "SNS_MESSAGE_GROUP_ID") -> str:
return os.environ.get(env_var, "lambdacron")


def build_result_message_payload(*, result_type: str, message: Any) -> dict[str, Any]:
"""
Build the JSON object published to SNS for a single result type.

Parameters
----------
result_type : str
Result type key from the task output mapping.
message : Any
JSON-serializable payload associated with the result type.

Returns
-------
dict[str, Any]
Payload object with a top-level ``result_type`` field.

Raises
------
ValueError
If the payload is not a JSON object or if it contains a conflicting
``result_type`` value.
"""
if not isinstance(message, Mapping):
raise ValueError(
f"Result payload for type '{result_type}' must be a JSON object"
)

payload = dict(message)
existing_result_type = payload.get("result_type")
if existing_result_type is None:
payload["result_type"] = result_type
else:
if existing_result_type != result_type:
raise ValueError(
f"Result payload for type '{result_type}' has conflicting "
f"result_type '{existing_result_type}'"
)
return payload


def dispatch_sns_messages(
*,
result: Mapping[str, Any],
Expand All @@ -180,9 +220,10 @@ def dispatch_sns_messages(
Logger used to emit structured publish logs.
"""
for result_type, message in result.items():
payload = build_result_message_payload(result_type=result_type, message=message)
sns_client.publish(
TopicArn=sns_topic_arn,
Message=json.dumps(message),
Message=json.dumps(payload),
Subject=f"Notification for {result_type}",
MessageAttributes={
"result_type": {
Expand Down
20 changes: 12 additions & 8 deletions src/lambdacron/notifications/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class RenderedTemplateNotificationHandler(ABC):
Providers keyed by template name for rendering.
expected_queue_arn : str, optional
Queue ARN to validate incoming SQS records.
include_result_type : bool, optional
Whether to include the SNS message attribute ``result_type`` in the payload.
logger : logging.Logger, optional
Logger used for structured logging.
jinja_env : jinja2.Environment, optional
Expand All @@ -107,13 +105,11 @@ def __init__(
template_providers: Mapping[str, TemplateProvider],
*,
expected_queue_arn: Optional[str] = None,
include_result_type: bool = True,
logger: Optional[logging.Logger] = None,
jinja_env: Optional[Environment] = None,
) -> None:
self.template_providers = dict(template_providers)
self.expected_queue_arn = expected_queue_arn
self.include_result_type = include_result_type
self.logger = logger or logging.getLogger(self.__class__.__name__)
self.jinja_env = jinja_env or Environment(undefined=StrictUndefined)

Expand Down Expand Up @@ -213,10 +209,18 @@ def _parse_result(self, record: Mapping[str, Any]) -> Mapping[str, Any]:
raise ValueError("SNS message must be valid JSON") from exc
if not isinstance(payload, dict):
raise ValueError("Result payload must be a JSON object")
if self.include_result_type:
result_type = self._extract_result_type(record)
if result_type and "result_type" not in payload:
payload["result_type"] = result_type
payload_result_type = payload.get("result_type")
if not isinstance(payload_result_type, str) or not payload_result_type:
raise ValueError(
"Result payload must include a non-empty string result_type"
)

attribute_result_type = self._extract_result_type(record)
if attribute_result_type and attribute_result_type != payload_result_type:
raise ValueError(
"Result type mismatch between payload and message attributes "
f"(payload {payload_result_type}, attribute {attribute_result_type})"
)
Comment on lines +213 to +223
Copy link

Copilot AI Mar 31, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Requiring every incoming payload to include a non-empty string result_type is a behavior change that will cause any in-flight/older SQS messages (published before this change) to fail parsing and be retried indefinitely as batchItemFailures. Consider a backward-compatible transition: if payload.result_type is missing/empty but messageAttributes.result_type is present, inject it (and optionally log a warning / plan a deprecation), while still rejecting mismatches when both are present.

Suggested change
if not isinstance(payload_result_type, str) or not payload_result_type:
raise ValueError(
"Result payload must include a non-empty string result_type"
)
attribute_result_type = self._extract_result_type(record)
if attribute_result_type and attribute_result_type != payload_result_type:
raise ValueError(
"Result type mismatch between payload and message attributes "
f"(payload {payload_result_type}, attribute {attribute_result_type})"
)
attribute_result_type = self._extract_result_type(record)
if not isinstance(payload_result_type, str) or not payload_result_type:
if attribute_result_type:
logging.getLogger(__name__).warning(
"Result payload missing result_type; using "
"messageAttributes.result_type for backward compatibility"
)
payload["result_type"] = attribute_result_type
payload_result_type = attribute_result_type
else:
raise ValueError(
"Result payload must include a non-empty string result_type"
)
elif attribute_result_type and attribute_result_type != payload_result_type:
raise ValueError(
"Result type mismatch between payload and message attributes "
f"(payload {payload_result_type}, attribute {attribute_result_type})"
)

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not worried about backward compatibility here.

return payload

def _render_template(self, template: str, result: Mapping[str, Any]) -> str:
Expand Down
2 changes: 0 additions & 2 deletions src/lambdacron/notifications/print_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,11 @@ def __init__(
*,
template_provider: TemplateProvider,
expected_queue_arn: str | None = None,
include_result_type: bool = True,
logger: Any | None = None,
) -> None:
super().__init__(
template_providers={"body": template_provider},
expected_queue_arn=expected_queue_arn,
include_result_type=include_result_type,
logger=logger,
)

Expand Down
14 changes: 9 additions & 5 deletions src/lambdacron/render.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

from jinja2 import TemplateError

from lambdacron.lambda_task import build_result_message_payload
from lambdacron.notifications.base import (
FileTemplateProvider,
RenderedTemplateNotificationHandler,
Expand Down Expand Up @@ -46,8 +47,7 @@ def build_parser() -> argparse.ArgumentParser:
class RenderNotificationHandler(RenderedTemplateNotificationHandler):
def __init__(self, *, template_path: Path, stream: TextIO | None = None) -> None:
super().__init__(
template_providers={"body": FileTemplateProvider(template_path)},
include_result_type=True,
template_providers={"body": FileTemplateProvider(template_path)}
)
self.stream = stream or sys.stdout

Expand Down Expand Up @@ -95,11 +95,15 @@ def extract_result_payload(payload_json: str, *, result_type: str) -> str:
if not isinstance(payload, dict):
raise ValueError("Task output must be a JSON object keyed by result type")
selected = payload.get(result_type)
if not isinstance(selected, dict):
if not isinstance(selected, Mapping):
raise ValueError(
f"Result payload for type '{result_type}' must be a JSON object"
f"Result payload for type '{result_type}' must be a JSON object, "
f"got {type(selected).__name__}"
)
return json.dumps(selected)
payload_for_publish = build_result_message_payload(
result_type=result_type, message=selected
)
return json.dumps(payload_for_publish)


def main(argv: list[str] | None = None) -> int:
Expand Down
69 changes: 44 additions & 25 deletions tests/notifications/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ def test_file_template_provider_raises_for_missing_file(tmp_path):
def test_notification_handler_parses_sqs_json_body(monkeypatch):
monkeypatch.setenv("TEMPLATE", "Status {{ status }}")
handler = CapturingHandler(template_providers={"body": EnvVarTemplateProvider()})
event = build_sqs_event(json.dumps({"status": "ok"}))
event = build_sqs_event(json.dumps({"status": "ok", "result_type": "success"}))

response = handler.lambda_handler(event, context=None)

assert handler.calls == [
{
"result": {"status": "ok"},
"result": {"status": "ok", "result_type": "success"},
"rendered": {"body": "Status ok"},
"record": event["Records"][0],
}
Expand All @@ -88,7 +88,9 @@ def test_notification_handler_parses_sqs_json_body(monkeypatch):
def test_notification_handler_parses_sns_envelope(monkeypatch):
monkeypatch.setenv("TEMPLATE", "Result {{ status }}")
handler = CapturingHandler(template_providers={"body": EnvVarTemplateProvider()})
sns_body = json.dumps({"Message": json.dumps({"status": "good"})})
sns_body = json.dumps(
{"Message": json.dumps({"status": "good", "result_type": "success"})}
)
event = build_sqs_event(sns_body)

response = handler.lambda_handler(event, context=None)
Expand Down Expand Up @@ -141,8 +143,14 @@ def test_notification_handler_logs_invocation(monkeypatch, caplog):
)
event = {
"Records": [
{"body": json.dumps({"name": "Ada"}), "eventSource": "aws:sqs"},
{"body": json.dumps({"name": "Grace"}), "eventSource": "aws:sqs"},
{
"body": json.dumps({"name": "Ada", "result_type": "success"}),
"eventSource": "aws:sqs",
},
{
"body": json.dumps({"name": "Grace", "result_type": "success"}),
"eventSource": "aws:sqs",
},
]
}

Expand Down Expand Up @@ -204,30 +212,41 @@ def test_parse_result_rejects_non_object_payload(monkeypatch):
assert response == {"batchItemFailures": [{"itemIdentifier": "msg-123"}]}


@pytest.mark.parametrize("include_result_type", [True, False])
@pytest.mark.parametrize("payload_has_result_type", [True, False])
def test_notification_handler_result_type_injection(
monkeypatch, include_result_type, payload_has_result_type
):
def test_notification_handler_payload_result_type_passes_through(monkeypatch):
monkeypatch.setenv("TEMPLATE", "Result {{ result_type }}")
handler = CapturingHandler(template_providers={"body": EnvVarTemplateProvider()})
payload = {"status": "ok", "result_type": "payload"}
event = build_sqs_event(
json.dumps(payload),
message_attributes={"result_type": {"stringValue": "payload"}},
)

handler.lambda_handler(event, context=None)

assert handler.calls[0]["result"] == payload


def test_notification_handler_requires_payload_result_type(monkeypatch):
monkeypatch.setenv("TEMPLATE", "Result {{ result_type | default('none') }}")
handler = CapturingHandler(
template_providers={"body": EnvVarTemplateProvider()},
include_result_type=include_result_type,
handler = CapturingHandler(template_providers={"body": EnvVarTemplateProvider()})
event = build_sqs_event(
json.dumps({"status": "ok"}),
message_attributes={"result_type": {"stringValue": "attribute"}},
)
payload = {"status": "ok"}
if payload_has_result_type:
payload["result_type"] = "payload"

response = handler.lambda_handler(event, context=None)

assert response == {"batchItemFailures": [{"itemIdentifier": "msg-123"}]}


def test_notification_handler_rejects_result_type_mismatch(monkeypatch):
monkeypatch.setenv("TEMPLATE", "Result {{ result_type }}")
handler = CapturingHandler(template_providers={"body": EnvVarTemplateProvider()})
event = build_sqs_event(
json.dumps(payload),
json.dumps({"status": "ok", "result_type": "payload"}),
message_attributes={"result_type": {"stringValue": "attribute"}},
)

handler.lambda_handler(event, context=None)
response = handler.lambda_handler(event, context=None)

result = handler.calls[0]["result"]
if payload_has_result_type:
assert result["result_type"] == "payload"
elif include_result_type:
assert result["result_type"] == "attribute"
else:
assert "result_type" not in result
assert response == {"batchItemFailures": [{"itemIdentifier": "msg-123"}]}
8 changes: 5 additions & 3 deletions tests/notifications/test_email_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def test_email_handler_sends_rendered_templates(monkeypatch):
recipients=["alice@example.com", "bob@example.com"],
ses_client=ses_client,
)
event = build_sqs_event({"name": "Ada"})
event = build_sqs_event({"name": "Ada", "result_type": "success"})

handler.lambda_handler(event, context=None)

Expand Down Expand Up @@ -71,7 +71,7 @@ def test_email_handler_includes_optional_fields(monkeypatch):
config_set="alerts",
reply_to=["reply@example.com"],
)
event = build_sqs_event({"name": "Grace"})
event = build_sqs_event({"name": "Grace", "result_type": "success"})

handler.lambda_handler(event, context=None)

Expand Down Expand Up @@ -102,7 +102,9 @@ def send_email(self, **kwargs):
recipients=["ops@example.com"],
ses_client=ErrorSesClient(),
)
event = build_sqs_event({"name": "Ada"}, message_id="msg-err")
event = build_sqs_event(
{"name": "Ada", "result_type": "success"}, message_id="msg-err"
)

response = handler.lambda_handler(event, context=None)

Expand Down
7 changes: 6 additions & 1 deletion tests/notifications/test_print_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ def test_print_handler_prints_rendered_template(monkeypatch, capsys):
monkeypatch.setenv("TEMPLATE", "Hello {{ name }}")
handler = PrintNotificationHandler(template_provider=EnvVarTemplateProvider())
event = {
"Records": [{"body": json.dumps({"name": "Ada"}), "eventSource": "aws:sqs"}]
"Records": [
{
"body": json.dumps({"name": "Ada", "result_type": "success"}),
"eventSource": "aws:sqs",
}
]
}

handler.lambda_handler(event, context=None)
Expand Down
Loading