diff --git a/python-test-samples/durable-functions-integration/README.md b/python-test-samples/durable-functions-integration/README.md new file mode 100644 index 00000000..2679cfea --- /dev/null +++ b/python-test-samples/durable-functions-integration/README.md @@ -0,0 +1,187 @@ +# Durable Functions Integration Testing + +This sample demonstrates how to write integration tests for AWS Durable Functions using the AWS Durable Execution SDK for Python. Durable Functions allow you to write stateful, long-running workflows as code that can survive failures and restarts. + +## Overview + +This pattern shows how to test durable functions that are deployed to AWS. The tests use the `DurableFunctionCloudTestRunner` to invoke deployed durable functions and verify their behavior, including: + +- Successful workflow execution +- Error handling and failure scenarios +- Performance and timing validation +- State persistence across retries + +## What are Durable Functions? + +Durable Functions enable you to write stateful workflows in code. They provide: + +- **Automatic state persistence**: Function state is automatically saved and restored +- **Fault tolerance**: Workflows can survive failures and continue from where they left off +- **Long-running operations**: Support for workflows that take minutes, hours, or days +- **Built-in retry logic**: Automatic retries with exponential backoff +- **Activity functions**: Break complex workflows into smaller, testable units + +## System Under Test + +The example implements an order processing workflow with the following steps: + +1. **Validate Order**: Checks if the order is valid +2. **Process Payment**: Simulates payment processing +3. **Wait**: 10-second delay to simulate external confirmation +4. **Confirm Order**: Sends order confirmation +5. **Persist to DynamoDB**: Saves the completed order to DynamoDB with TTL + +The workflow uses durable execution to ensure that even if the Lambda function times out or fails, the workflow can resume from the last completed step. The final step persists the order data to DynamoDB for audit and retrieval purposes. + +```mermaid +flowchart TD + Start([Lambda Invoked]) --> Input[Receive Order ID] + Input --> Step1[Step 1: Validate Order] + Step1 --> |orderId, status: validated| Step2[Step 2: Process Payment] + Step2 --> |orderId, status: paid, amount: 99.99| Wait[Wait 10 seconds] + Wait --> Step3[Step 3: Confirm Order] + Step3 --> |orderId, status: confirmed| Step4[Step 4: Persist to DynamoDB] + Step4 --> |Save with TTL| DB[(DynamoDB Table)] + Step4 --> Return[Return Complete Result] + Return --> End([Workflow Complete]) + + style Start fill:#e1f5e1 + style End fill:#e1f5e1 + style Step1 fill:#fff4e6 + style Step2 fill:#fff4e6 + style Step3 fill:#fff4e6 + style Step4 fill:#fff4e6 + style Wait fill:#e3f2fd + style DB fill:#f3e5f5 +``` + +## Prerequisites + +- SAM CLI - [Install the SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) +- Python 3.13 or later - [Install Python 3](https://www.python.org/downloads/) +- AWS Durable Execution SDK for Python - Installed via requirements.txt + +## Project Structure + +``` +durable-functions-integration/ +├── src/ +│ └── order_processor/ # Main durable function +│ ├── app.py # Workflow orchestration +│ └── requirements.txt # Function dependencies +├── tests/ +│ ├── integration/ +│ │ └── test_order_processor.py # Integration tests +│ └── requirements.txt # Test dependencies +├── template.yaml # SAM template +├── pytest.ini # Pytest configuration +└── README.md # This file +``` + +## Build and Deploy + +### Install dependencies + +1. Navigate to the project directory: +```bash +cd python-test-samples/durable-functions-integration +``` + +2. Create a virtual environment (optional but recommended): +```bash +python3 -m venv .venv +source .venv/bin/activate # On Windows: .venv/Scripts/activate +pip install -r tests/requirements.txt +``` + +### Build the project +```bash +sam build +``` + +### Deploy to AWS + +First deployment (guided): +```bash +sam deploy +``` + +## Run Integration Tests + +The integration tests run against the deployed durable function in AWS. + +Run the integration tests using pytest: + +```bash +python3 -m pytest tests/integration/test_order_processor.py -v -s +``` + +Or run all tests in the integration directory: + +```bash +pytest tests/integration/ -v -s +``` + +**Flags:** +- `-v` : Verbose output showing each test +- `-s` : Show print statements and logging output + +### Test Coverage + +The integration tests cover: + +1. **Successful Workflow Execution** (`test_cloud_order_processor_success`) + - Verifies the complete workflow executes successfully + - Validates all steps complete in the correct order + - Checks the final output structure + - Confirms persistence result is included in response + +2. **Error Handling** (`test_cloud_order_processor_with_invalid_input`) + - Tests workflow behavior with invalid input + - Verifies proper error propagation + - Ensures graceful failure handling + +3. **Performance Testing** (`test_cloud_performance`) + - Measures real-world execution time + - Validates timing expectations (10-second wait + overhead) + - Ensures workflows complete within acceptable timeframes + +4. **DynamoDB Persistence** (`test_dynamodb_persistence`) + - Verifies orders are saved to DynamoDB + - Validates all order fields are persisted correctly + - Confirms TTL and metadata are set properly + - Checks data type conversions (Decimal for amounts) + + +## Architecture + +The sample includes: + +- **Lambda Function**: Durable function with 5 workflow steps +- **DynamoDB Table**: Stores completed orders with 30-day TTL +- **IAM Policies**: Durable execution policy + DynamoDB CRUD permissions + +## Cleanup + +Remove local build artifacts: +```bash +rm -rf .aws-sam +rm -rf .venv +rm -rf .pytest_cache +rm -rf samconfig.toml +``` + +Delete the CloudFormation stack (this will also delete the DynamoDB table): +```bash +sam delete --stack-name {stack_name} +``` + +## Additional Resources + +- [AWS Durable Execution Documentation](https://docs.aws.amazon.com/lambda/latest/dg/durable-execution.html) +- [AWS Durable Execution SDK for Python](https://github.com/awslabs/aws-durable-execution-sdk-python) +- [Serverless Testing Best Practices](../../Serverless-Testing-Principles.md) + +## License + +This sample is licensed under the MIT-0 License. See the LICENSE file. diff --git a/python-test-samples/durable-functions-integration/pytest.ini b/python-test-samples/durable-functions-integration/pytest.ini new file mode 100644 index 00000000..64c4cd58 --- /dev/null +++ b/python-test-samples/durable-functions-integration/pytest.ini @@ -0,0 +1,12 @@ +[pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* +markers = + integration: Integration tests that require AWS resources + slow: Tests that take longer to execute +addopts = + -v + --tb=short + --strict-markers diff --git a/python-test-samples/durable-functions-integration/src/order_processor/app.py b/python-test-samples/durable-functions-integration/src/order_processor/app.py new file mode 100644 index 00000000..16c54f6f --- /dev/null +++ b/python-test-samples/durable-functions-integration/src/order_processor/app.py @@ -0,0 +1,130 @@ +# Copyright (c) 2025 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: MIT-0 +import os +import json +from datetime import datetime +from decimal import Decimal +import boto3 +from aws_durable_execution_sdk_python import ( + DurableContext, + durable_execution, + durable_step, +) +from aws_durable_execution_sdk_python.config import Duration + +# Initialize DynamoDB client +dynamodb = boto3.resource('dynamodb') +table_name = os.environ.get('ORDERS_TABLE_NAME', 'Orders') +table = dynamodb.Table(table_name) + + +@durable_step +def validate_order(step_context, order_id): + """Validate the order data""" + step_context.logger.info(f"Validating order {order_id}") + return {"orderId": order_id, "status": "validated"} + + +@durable_step +def process_payment(step_context, order_id): + """Process payment for the order""" + step_context.logger.info(f"Processing payment for order {order_id}") + return {"orderId": order_id, "status": "paid", "amount": 99.99} + + +@durable_step +def confirm_order(step_context, order_id): + """Confirm the order""" + step_context.logger.info(f"Confirming order {order_id}") + return {"orderId": order_id, "status": "confirmed"} + + +@durable_step +def persist_order_to_dynamodb(step_context, order_data): + """ + Persist the completed order to DynamoDB. + + This durable step ensures the order is saved exactly once, + even if the function is interrupted and replayed. + """ + order_id = order_data['orderId'] + step_context.logger.info(f"Persisting order {order_id} to DynamoDB") + + try: + # Convert float to Decimal for DynamoDB + item = { + 'orderId': order_id, + 'status': order_data['status'], + 'steps': json.dumps(order_data['steps']), # Store steps as JSON string + 'completedAt': order_data.get('completedAt', datetime.utcnow().isoformat()), + 'createdAt': datetime.utcnow().isoformat(), + 'ttl': int(datetime.utcnow().timestamp()) + (30 * 24 * 60 * 60) # 30 days TTL + } + + # Add amount if present (convert to Decimal) + for step in order_data.get('steps', []): + if 'amount' in step: + item['amount'] = Decimal(str(step['amount'])) + break + + # Put item in DynamoDB + response = table.put_item(Item=item) + + step_context.logger.info(f"Successfully persisted order {order_id} to DynamoDB") + + return { + "orderId": order_id, + "status": "persisted", + "tableName": table_name, + "timestamp": item['createdAt'] + } + + except Exception as e: + step_context.logger.error(f"Failed to persist order {order_id}: {str(e)}") + raise + +@durable_execution +def lambda_handler(event, context: DurableContext): + """ + Main Lambda handler for the order processing durable workflow. + + Workflow steps: + 1. Validate order + 2. Process payment + 3. Wait 10 seconds (simulates external confirmation) + 4. Confirm order + 5. Persist order to DynamoDB + """ + order_id = event['orderId'] + + # Step 1: Validate order + validation_result = context.step(validate_order(order_id)) + + # Step 2: Process payment + payment_result = context.step(process_payment(order_id)) + + # Wait for 10 seconds to simulate external confirmation + context.wait(Duration.from_seconds(10)) + + # Step 3: Confirm order + confirmation_result = context.step(confirm_order(order_id)) + + # Build order data + order_data = { + "orderId": order_id, + "status": "completed", + "steps": [validation_result, payment_result, confirmation_result], + "completedAt": datetime.utcnow().isoformat() + } + + # Step 4: Persist order to DynamoDB + persistence_result = context.step(persist_order_to_dynamodb(order_data)) + + # Return final result including persistence confirmation + return { + "orderId": order_id, + "status": "completed", + "steps": [validation_result, payment_result, confirmation_result], + "persistence": persistence_result, + "completedAt": order_data["completedAt"] + } diff --git a/python-test-samples/durable-functions-integration/src/order_processor/requirements.txt b/python-test-samples/durable-functions-integration/src/order_processor/requirements.txt new file mode 100644 index 00000000..c4ded195 --- /dev/null +++ b/python-test-samples/durable-functions-integration/src/order_processor/requirements.txt @@ -0,0 +1,3 @@ +aws-durable-execution-sdk-python>=0.1.0 +aws-lambda-powertools>=2.31.0 +boto3>=1.34.0 diff --git a/python-test-samples/durable-functions-integration/template.yaml b/python-test-samples/durable-functions-integration/template.yaml new file mode 100644 index 00000000..f0f66bad --- /dev/null +++ b/python-test-samples/durable-functions-integration/template.yaml @@ -0,0 +1,67 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: Order Processor Durable Function for Cloud Testing + +Globals: + Function: + Runtime: python3.13 + Timeout: 60 + MemorySize: 256 + +Resources: + # DynamoDB table for storing completed orders + OrdersTable: + Type: AWS::DynamoDB::Table + Properties: + TableName: 'durable-functions-integration-Orders' + BillingMode: PAY_PER_REQUEST + AttributeDefinitions: + - AttributeName: orderId + AttributeType: S + KeySchema: + - AttributeName: orderId + KeyType: HASH + TimeToLiveSpecification: + AttributeName: ttl + Enabled: true + Tags: + - Key: Purpose + Value: DurableFunctionTesting + + OrderProcessorFunction: + Type: AWS::Serverless::Function + Properties: + FunctionName: order-processor-cloud-test + Handler: app.lambda_handler + CodeUri: src/order_processor/ + AutoPublishAlias: dev + DurableConfig: + ExecutionTimeout: 300 + RetentionPeriodInDays: 5 + Environment: + Variables: + ORDERS_TABLE_NAME: !Ref OrdersTable + Policies: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicDurableExecutionRolePolicy + - DynamoDBCrudPolicy: + TableName: !Ref OrdersTable + +Outputs: + OrderProcessorFunctionArn: + Description: ARN of the Order Processor function + Value: !GetAtt OrderProcessorFunction.Arn + + OrderProcessorFunctionName: + Description: Name of the Order Processor function + Value: !Ref OrderProcessorFunction + + OrdersTableName: + Description: Name of the DynamoDB Orders table + Value: !Ref OrdersTable + Export: + Name: !Sub ${AWS::StackName}-OrdersTableName + + OrdersTableArn: + Description: ARN of the DynamoDB Orders table + Value: !GetAtt OrdersTable.Arn + diff --git a/python-test-samples/durable-functions-integration/tests/integration/test_order_processor.py b/python-test-samples/durable-functions-integration/tests/integration/test_order_processor.py new file mode 100644 index 00000000..74e364f2 --- /dev/null +++ b/python-test-samples/durable-functions-integration/tests/integration/test_order_processor.py @@ -0,0 +1,167 @@ +# Copyright (c) 2025 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. +# SPDX-License-Identifier: MIT-0 +import boto3 +import json +import os +import time +from decimal import Decimal +from aws_durable_execution_sdk_python_testing import DurableFunctionCloudTestRunner +from aws_durable_execution_sdk_python.execution import InvocationStatus + +region = "us-east-1" + +# Initialize DynamoDB client +dynamodb = boto3.resource('dynamodb', region_name=region) + + +def test_cloud_order_processor_success(): + """Test the deployed order processor function in AWS""" + + # Create a cloud test runner pointing to the deployed function + runner = DurableFunctionCloudTestRunner( + function_name="order-processor-cloud-test:dev", + region=region + ) + + # Invoke the function in AWS + result = runner.run( + input={"orderId": "cloud-integration-test-001"}, + timeout=30 + ) + + # Verify execution succeeded + assert result.status is InvocationStatus.SUCCEEDED + + # Parse and verify the result + execution_result = result.result + if isinstance(execution_result, str): + execution_result = json.loads(execution_result) + + # Verify the workflow completed correctly + assert execution_result["orderId"] == "cloud-integration-test-001" + assert execution_result["status"] == "completed" + assert len(execution_result["steps"]) == 3 + + # Verify each step + assert execution_result["steps"][0]["status"] == "validated" + assert execution_result["steps"][1]["status"] == "paid" + assert execution_result["steps"][1]["amount"] == 99.99 + assert execution_result["steps"][2]["status"] == "confirmed" + + # Verify persistence result + assert "persistence" in execution_result + assert execution_result["persistence"]["status"] == "persisted" + assert execution_result["persistence"]["orderId"] == "cloud-integration-test-001" + + print("✅ Cloud integration test passed!") + + + +def test_cloud_order_processor_with_invalid_input(): + """Test error handling in the cloud""" + + runner = DurableFunctionCloudTestRunner( + function_name="order-processor-cloud-test:dev", + region=region + ) + + # Test with missing orderId + result = runner.run( + input={}, + timeout=30 + ) + + # Verify the execution failed as expected + assert result.status is InvocationStatus.FAILED + print("✅ Error handling test passed!") + + +def test_cloud_performance(): + """Measure real-world execution time""" + runner = DurableFunctionCloudTestRunner( + function_name="order-processor-cloud-test:dev", + region=region + ) + + start_time = time.time() + + result = runner.run( + input={"orderId": "performance-test"}, + timeout=30 + ) + + execution_time = time.time() - start_time + + # Verify execution succeeded + assert result.status is InvocationStatus.SUCCEEDED + + # Verify execution time is reasonable (should be ~10 seconds + overhead) + assert execution_time >= 10, "Execution completed too quickly" + assert execution_time <= 15, "Execution took too long" + + print(f"✅ Performance test passed! Execution time: {execution_time:.2f}s") + + +def test_dynamodb_persistence(): + """Test that orders are persisted to DynamoDB""" + + # Get the table name + table_name = 'durable-functions-integration-Orders' + table = dynamodb.Table(table_name) + + # Create unique order ID for this test + order_id = f"dynamodb-test-{int(time.time())}" + + # Run the workflow + runner = DurableFunctionCloudTestRunner( + function_name="order-processor-cloud-test:dev", + region=region + ) + + result = runner.run( + input={"orderId": order_id}, + timeout=30 + ) + + # Verify execution succeeded + assert result.status is InvocationStatus.SUCCEEDED + + # Wait a moment for DynamoDB to be consistent + time.sleep(2) + + # Query DynamoDB to verify the order was persisted + try: + response = table.get_item(Key={'orderId': order_id}) + + # Verify item exists + assert 'Item' in response, f"Order {order_id} not found in DynamoDB" + + item = response['Item'] + + # Verify order data + assert item['orderId'] == order_id + assert item['status'] == 'completed' + assert 'steps' in item + assert 'completedAt' in item + assert 'createdAt' in item + assert 'ttl' in item + + # Verify amount was stored correctly + assert 'amount' in item + assert item['amount'] == Decimal('99.99') + + # Parse and verify steps + steps = json.loads(item['steps']) + assert len(steps) == 3 + assert steps[0]['status'] == 'validated' + assert steps[1]['status'] == 'paid' + assert steps[2]['status'] == 'confirmed' + + print(f"✅ DynamoDB persistence test passed! Order {order_id} found in table {table_name}") + + except Exception as e: + print(f"❌ Failed to verify DynamoDB persistence: {e}") + raise + + + diff --git a/python-test-samples/durable-functions-integration/tests/requirements.txt b/python-test-samples/durable-functions-integration/tests/requirements.txt new file mode 100644 index 00000000..6d8e9262 --- /dev/null +++ b/python-test-samples/durable-functions-integration/tests/requirements.txt @@ -0,0 +1,4 @@ +aws-durable-execution-sdk-python +aws-durable-execution-sdk-python-testing +pytest +