An elegant, Ecto-inspired DSL for working with DynamoDB in Elixir
Dynamo provides a structured, type-safe way to interact with Amazon DynamoDB while maintaining the flexibility that makes DynamoDB powerful. Define schemas, encode/decode data, and perform operations with a clean, familiar syntax.
- Installation
- Why Dynamo?
- Quick Start
- Key Concepts
- Usage Guide
- Configuration
- Advanced Usage
- Contributing
- License
Add dynamo to your list of dependencies in mix.exs:
def deps do
[
{:dynamo, github: "bmalum/dynamo"}
]
endNote: This package is not yet available on Hex. It will be published once it reaches a stable version.
DynamoDB is a powerful, flexible NoSQL database, but its schema-free nature can lead to inconsistencies in your data model. Dynamo bridges this gap by providing:
- Type Safety: Define schemas that enforce data consistency
- Familiar Syntax: Ecto-inspired DSL that feels natural to Elixir developers
- Simplified Operations: Clean abstractions for common DynamoDB operations
- Flexible Configuration: Multiple levels of configuration to suit your needs
- Performance Optimizations: Built-in support for batch operations and parallel scans
Define a schema:
defmodule MyApp.User do
use Dynamo.Schema
item do
table_name "users"
field :id, partition_key: true
field :email, sort_key: true
field :name
field :role, default: "user"
field :active, default: true
end
endPerform operations:
# Create a user
user = %MyApp.User{id: "user-123", email: "john@example.com", name: "John Doe"}
{:ok, saved_user} = MyApp.User.put_item(user)
# Retrieve a user
{:ok, retrieved_user} = MyApp.User.get_item(%MyApp.User{id: "user-123", email: "john@example.com"})
# List users
{:ok, users} = MyApp.User.list_items(%MyApp.User{id: "user-123"})Dynamo uses a schema-based approach to define the structure of your DynamoDB items. This provides:
- Consistent Structure: Ensure all items follow the same structure
- Default Values: Specify default values for fields
- Key Generation: Automatically generate partition and sort keys
- Type Conversion: Automatic conversion between Elixir types and DynamoDB types
Dynamo automatically handles the generation of composite keys based on your schema definition:
- Partition Keys: Define which fields make up the partition key
- Sort Keys: Define which fields make up the sort key
- Composite Keys: Combine multiple fields into a single key with configurable separators
Dynamo provides three levels of configuration:
- Application Configuration: Global defaults in your
config.exs - Process Configuration: Override settings for specific processes
- Schema Configuration: Schema-specific settings
A schema defines the structure of your DynamoDB items:
defmodule MyApp.Product do
use Dynamo.Schema, key_separator: "_"
item do
table_name "products"
field :category_id, partition_key: true
field :product_id, sort_key: true
field :name
field :price
field :stock, default: 0
field :active, default: true
end
endpartition_key: true- Marks the field as part of the partition keysort_key: true- Marks the field as part of the sort keydefault: value- Sets a default value for the field
You can also define keys separately from fields:
defmodule MyApp.Order do
use Dynamo.Schema
item do
table_name "orders"
field :customer_id
field :order_id
field :status, default: "pending"
field :total
partition_key [:customer_id]
sort_key [:order_id]
end
end# Create a struct
product = %MyApp.Product{
category_id: "electronics",
product_id: "prod-123",
name: "Smartphone",
price: 599.99
}
# Save to DynamoDB
{:ok, saved_product} = MyApp.Product.put_item(product)# Get by primary key
{:ok, product} = MyApp.Product.get_item(%MyApp.Product{
category_id: "electronics",
product_id: "prod-123"
})Dynamo handles the conversion between Elixir types and DynamoDB types:
# Encode a struct to DynamoDB format
dynamo_item = Dynamo.Encoder.encode_root(product)
# Decode a DynamoDB item to a map
decoded_map = Dynamo.Decoder.decode(dynamo_item)
# Decode a DynamoDB item to a struct
decoded_product = Dynamo.Decoder.decode(dynamo_item, as: MyApp.Product)# List all products in a category
{:ok, products} = MyApp.Product.list_items(%MyApp.Product{category_id: "electronics"})# Query with sort key conditions
{:ok, products} = MyApp.Product.list_items(
%MyApp.Product{category_id: "electronics"},
[
sort_key: "prod-",
sk_operator: :begins_with,
scan_index_forward: false # Descending order
]
)
# Query with filter expressions
{:ok, products} = MyApp.Product.list_items(
%MyApp.Product{category_id: "electronics"},
[
filter_expression: "price > :min_price",
expression_attribute_values: %{
":min_price" => %{"N" => "500"}
}
]
)# First page
{:ok, page_1} = MyApp.Product.list_items(
%MyApp.Product{category_id: "electronics"},
[limit: 10]
)
# Next page
{:ok, page_2} = MyApp.Product.list_items(
%MyApp.Product{category_id: "electronics"},
[
limit: 10,
exclusive_start_key: page_1.last_evaluated_key
]
)products = [
%MyApp.Product{category_id: "electronics", product_id: "prod-123", name: "Smartphone", price: 599.99},
%MyApp.Product{category_id: "electronics", product_id: "prod-124", name: "Laptop", price: 1299.99},
%MyApp.Product{category_id: "electronics", product_id: "prod-125", name: "Tablet", price: 399.99}
]
{:ok, result} = Dynamo.Table.batch_write_item(products)For large tables, parallel scan can significantly improve performance:
{:ok, all_products} = Dynamo.Table.parallel_scan(
MyApp.Product,
segments: 8,
filter_expression: "category_id = :category",
expression_attribute_values: %{
":category" => %{"S" => "electronics"}
}
)Dynamo provides comprehensive support for Global Secondary Indexes, allowing you to define them in your schema and query them with automatic key resolution.
defmodule MyApp.User do
use Dynamo.Schema
item do
table_name "users"
field :id, partition_key: true
field :tenant
field :email
field :name
field :status, default: "active"
field :created_at, sort_key: true
# GSI with partition key only
global_secondary_index "EmailIndex", partition_key: :email
# GSI with partition and sort keys
global_secondary_index "TenantIndex",
partition_key: :tenant,
sort_key: :created_at
# GSI with custom projection
global_secondary_index "TenantStatusIndex",
partition_key: :tenant,
sort_key: :status,
projection: :include,
projected_attributes: [:id, :email, :name]
end
end:partition_key- Field name for GSI partition key (required):sort_key- Field name for GSI sort key (optional):projection- Projection type (:all,:keys_only,:include) (default::all):projected_attributes- List of attributes to project when projection is:include
GSI queries use the same list_items/2 function with the index_name option:
# Query by email (partition-only GSI)
{:ok, users} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex"
)
# Query by tenant with date range
{:ok, recent_users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme", created_at: "2023-01-01"},
index_name: "TenantIndex",
sk_operator: :gte
)
# Query with exact match on both keys
{:ok, active_users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme", status: "active"},
index_name: "TenantStatusIndex"
)
# Query with sort key operators
{:ok, users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme", created_at: "2023-01-01"},
index_name: "TenantIndex",
sk_operator: :between,
sk_end: "2023-12-31"
)GSI queries support all the same features as table queries:
# With filter expressions
{:ok, filtered_users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme"},
index_name: "TenantIndex",
filter_expression: "status = :status AND #name <> :excluded_name",
expression_attribute_names: %{"#name" => "name"},
expression_attribute_values: %{
":status" => %{"S" => "active"},
":excluded_name" => %{"S" => "admin"}
}
)
# With pagination
{:ok, page_1} = MyApp.User.list_items(
%MyApp.User{tenant: "acme"},
index_name: "TenantIndex",
limit: 10
)
# With projection expressions
{:ok, users} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex",
projection_expression: "id, #name, email",
expression_attribute_names: %{"#name" => "name"}
)Dynamo provides comprehensive error handling for GSI queries:
case MyApp.User.list_items(%MyApp.User{email: nil}, index_name: "EmailIndex") do
{:ok, users} ->
# Handle success
{:error, %Dynamo.Error{type: :validation_error, message: message}} ->
# Common validation errors:
# - "GSI 'EmailIndex' requires field 'email' to be populated"
# - "GSI 'NonExistentIndex' not found. Available indexes: EmailIndex, TenantIndex"
# - "GSI 'TenantIndex' sort operation requires field 'created_at' to be populated"
# - "Consistent reads are not supported for Global Secondary Index queries"
IO.puts("Validation error: #{message}")
{:error, error} ->
# Handle other errors (AWS errors, network issues, etc.)
IO.puts("Error: #{error.message}")
endLimitations:
- GSI queries do not support consistent reads (eventually consistent only)
- GSI partition key field must be populated in the struct
- GSI sort key field must be populated when using sort key operations
Best Practices:
- Use descriptive GSI names that indicate their purpose
- Consider projection types carefully to balance query performance and storage costs
- Use
:keys_onlyprojection for count queries or when you only need key attributes - Use
:includeprojection with specific attributes when you need a subset of data - Design GSI partition keys to distribute data evenly across partitions
# Retrieve only specific attributes
{:ok, products} = MyApp.Product.list_items(
%MyApp.Product{category_id: "electronics"},
[
projection_expression: "product_id, name, price"
]
)Dynamo provides a flexible configuration system with three levels:
In your config.exs:
config :dynamo,
partition_key_name: "pk",
sort_key_name: "sk",
key_separator: "#",
prefix_sort_key: false,
table_has_sort_key: trueFor runtime configuration:
# Set configuration for the current process
Dynamo.Config.put_process_config(key_separator: "-")
# Clear process configuration
Dynamo.Config.clear_process_config()Per-schema configuration:
defmodule MyApp.User do
use Dynamo.Schema,
key_separator: "_",
prefix_sort_key: true
# schema definition...
end| Option | Description | Default |
|---|---|---|
partition_key_name |
Name of the partition key in DynamoDB | "pk" |
sort_key_name |
Name of the sort key in DynamoDB | "sk" |
key_separator |
Separator for composite keys | "#" |
prefix_sort_key |
Whether to include field name as prefix in sort key | false |
table_has_sort_key |
Whether the table has a sort key | true |
Dynamo provides several mix tasks to help you work with DynamoDB tables:
# Create a table with default configuration (pk/sk keys)
mix dynamo.create_table users
# Create a table with custom keys
mix dynamo.create_table products --partition-key category_id --sort-key product_id
# Create a table with only a partition key (no sort key)
mix dynamo.create_table simple_counter --partition-key counter_id --no-sort-key
# Create a table with provisioned capacity
mix dynamo.create_table high_traffic --billing-mode PROVISIONED --read-capacity 50 --write-capacity 25
# Use with local DynamoDB
mix dynamo.create_table local_test --endpoint http://localhost:8000# List all tables
mix dynamo.list_tables
# Filter tables by name
mix dynamo.list_tables --name-contains user
# List tables in a specific region
mix dynamo.list_tables --region eu-west-1# Delete a table (will prompt for confirmation)
mix dynamo.delete_table old_users
# Force delete without confirmation
mix dynamo.delete_table old_users --force# Generate a schema from an existing table
mix dynamo.generate_schema users
# Generate a schema with a specific module name
mix dynamo.generate_schema users --module MyApp.User
# Generate a schema with a custom output path
mix dynamo.generate_schema users --output lib/schemas/user.exDynamo supports DynamoDB transactions, allowing you to perform multiple operations atomically:
# Transfer money between accounts atomically
Dynamo.Transaction.transact([
# Check that source account has sufficient funds
{:check, %Account{id: "account-123"},
"balance >= :amount",
%{":amount" => %{"N" => "100.00"}}},
# Decrease source account balance
{:update, %Account{id: "account-123"},
%{balance: {:decrement, 100.00}}},
# Increase destination account balance
{:update, %Account{id: "account-456"},
%{balance: {:increment, 100.00}}}
])Transaction operations include:
:put- Create or replace an item:update- Update an existing item:delete- Delete an item:check- Verify a condition without modifying data
Special update operators:
{:increment, amount}- Add a value to a number{:decrement, amount}- Subtract a value from a number{:append, list}- Append elements to a list{:prepend, list}- Prepend elements to a list{:if_not_exists, default}- Set a value only if it doesn't exist
Dynamo includes standardized error handling that converts DynamoDB errors into meaningful Elixir errors:
case Dynamo.Table.get_item(%User{id: "user-123"}) do
{:ok, user} ->
# Handle success
IO.puts("Found user: #{user.name}")
{:error, %Dynamo.Error{type: :resource_not_found}} ->
# Handle specific error type
IO.puts("User not found")
{:error, %Dynamo.Error{} = error} ->
# Handle general errors
IO.puts("Error: #{error.message}")
endCommon error types:
:resource_not_found- The requested resource doesn't exist:provisioned_throughput_exceeded- Rate limits exceeded:conditional_check_failed- Condition expression evaluated to false:validation_error- Parameter validation failed:access_denied- Insufficient permissions:transaction_conflict- Transaction conflicts with another operation
Error: "GSI 'EmailIndex' not found. Available indexes: TenantIndex, StatusIndex"
Cause: The specified GSI name doesn't exist in the schema.
Solution:
# Check available GSIs
MyApp.User.global_secondary_indexes()
# => [%{name: "TenantIndex", ...}, %{name: "StatusIndex", ...}]
# Use correct GSI name
{:ok, users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme"},
index_name: "TenantIndex" # Correct name
)Error: "GSI 'EmailIndex' requires field 'email' to be populated"
Cause: The GSI partition key field is nil or missing in the struct.
Solution:
# Incorrect - email field is nil
user = %MyApp.User{email: nil}
{:error, _} = MyApp.User.list_items(user, index_name: "EmailIndex")
# Correct - populate the GSI partition key field
user = %MyApp.User{email: "user@example.com"}
{:ok, users} = MyApp.User.list_items(user, index_name: "EmailIndex")Error: "GSI 'TenantIndex' sort operation requires field 'created_at' to be populated"
Cause: Using sort key operators without populating the GSI sort key field.
Solution:
# Incorrect - created_at field is nil but using sort operator
user = %MyApp.User{tenant: "acme", created_at: nil}
{:error, _} = MyApp.User.list_items(user,
index_name: "TenantIndex",
sk_operator: :gte
)
# Correct - populate the GSI sort key field
user = %MyApp.User{tenant: "acme", created_at: "2023-01-01"}
{:ok, users} = MyApp.User.list_items(user,
index_name: "TenantIndex",
sk_operator: :gte
)Error: "Consistent reads are not supported for Global Secondary Index queries"
Cause: Attempting to use consistent_read: true with a GSI query.
Solution:
# Incorrect - GSIs don't support consistent reads
{:error, _} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex",
consistent_read: true # Not supported for GSIs
)
# Correct - remove consistent_read option for GSI queries
{:ok, users} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex"
)Error: "GSI 'EmailIndex' does not have a sort key but sort operation was requested"
Cause: Using sort key operators on a GSI that only has a partition key.
Solution:
# Check GSI configuration
gsi_config = MyApp.User.global_secondary_indexes()
|> Enum.find(&(&1.name == "EmailIndex"))
# => %{name: "EmailIndex", partition_key: :email, sort_key: nil, ...}
# Incorrect - EmailIndex has no sort key
{:error, _} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex",
sk_operator: :begins_with # Not supported for partition-only GSI
)
# Correct - use partition-only query
{:ok, users} = MyApp.User.list_items(
%MyApp.User{email: "user@example.com"},
index_name: "EmailIndex"
)# List all GSIs for a schema
MyApp.User.global_secondary_indexes()
# Find specific GSI configuration
{:ok, gsi_config} = Dynamo.Schema.get_gsi_config(%MyApp.User{}, "TenantIndex")
IO.inspect(gsi_config)
# => %{name: "TenantIndex", partition_key: :tenant, sort_key: :created_at, ...}user = %MyApp.User{tenant: "acme", created_at: "2023-01-01"}
# Check if GSI partition key is populated
case Dynamo.Schema.validate_gsi_config(user, "TenantIndex") do
{:ok, gsi_config} ->
IO.puts("GSI validation passed")
{:error, error} ->
IO.puts("GSI validation failed: #{error.message}")
enduser = %MyApp.User{tenant: "acme", created_at: "2023-01-01"}
{:ok, gsi_config} = Dynamo.Schema.get_gsi_config(user, "TenantIndex")
# Generate GSI keys to see what would be used in the query
gsi_pk = Dynamo.Schema.generate_gsi_partition_key(user, gsi_config)
gsi_sk = Dynamo.Schema.generate_gsi_sort_key(user, gsi_config)
IO.puts("GSI Partition Key: #{gsi_pk}") # => "user#acme"
IO.puts("GSI Sort Key: #{gsi_sk}") # => "2023-01-01"# Efficient - Query with both partition and sort key
{:ok, users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme", status: "active"},
index_name: "TenantStatusIndex"
)
# Less efficient - Query with only partition key on composite GSI
{:ok, users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme"},
index_name: "TenantStatusIndex"
)# Use specific projections when you don't need all attributes
{:ok, users} = MyApp.User.list_items(
%MyApp.User{tenant: "acme"},
index_name: "TenantIndex",
projection_expression: "id, email, #name",
expression_attribute_names: %{"#name" => "name"}
)When using Dynamo in LiveBook, you may encounter issues with on-the-fly compiled modules. This is because LiveBook compiles modules in a way that can interfere with protocol implementations like Dynamo.Encodable.
To work around this issue, you need to override the before_write/1 function in each schema module and manually handle the encoding process:
defmodule MyApp.Product do
use Dynamo.Schema
item do
table_name "products"
field :category_id, partition_key: true
field :product_id, sort_key: true
field :name
field :price
end
def before_write(arg) do
arg
|> IO.inspect() # Optional, useful for debugging
|> Dynamo.Schema.generate_and_add_partition_key()
|> Dynamo.Schema.generate_and_add_sort_key()
|> Dynamo.Encodable.MyApp.Product.encode([])
|> Map.get("M")
end
endThis approach ensures that your schema modules work correctly in LiveBook by:
- Generating and adding partition and sort keys
- Explicitly calling the encode function for your specific module
- Extracting the "M" (map) field from the encoded result
For a complete example of using Dynamo with LiveBook, see the DynamoDB Bulk Insert Example in the repository.
You can override the before_write function to customize how keys are generated:
defmodule MyApp.TimeSeries do
use Dynamo.Schema
item do
table_name "time_series"
field :device_id, partition_key: true
field :timestamp, sort_key: true
field :value
end
def before_write(item) do
# Add current timestamp if not provided
item = if is_nil(item.timestamp) do
%{item | timestamp: DateTime.utc_now() |> DateTime.to_iso8601()}
else
item
end
# Call the default implementation
item
|> Dynamo.Schema.generate_and_add_partition_key()
|> Dynamo.Schema.generate_and_add_sort_key()
|> Dynamo.Encoder.encode_root()
end
endYou can implement the Dynamo.Encodable and Dynamo.Decodable protocols for custom types:
defimpl Dynamo.Encodable, for: MyApp.CustomType do
def encode(value, _options) do
# Convert your custom type to a DynamoDB-compatible format
%{"S" => to_string(value)}
end
end
defimpl Dynamo.Decodable, for: MyApp.CustomType do
def decode(value) do
# Convert from DynamoDB format back to your custom type
MyApp.CustomType.from_string(value)
end
endContributions are welcome! Please feel free to submit a Pull Request.
- Fork the repository
- Create your feature branch (
git checkout -b feature/amazing-feature) - Commit your changes (
git commit -m 'Add some amazing feature') - Push to the branch (
git push origin feature/amazing-feature) - Open a Pull Request
This project is licensed under the MIT License - see the LICENSE file for details.
Dynamo provides built-in logging functionality to help debug DynamoDB operations. You can enable logging to see all queries sent to DynamoDB in a formatted output.
To enable DynamoDB query logging, use the Dynamo.Logger module:
# Enable logging
Dynamo.Logger.enable()
# Perform operations - queries will be logged to stdout
{:ok, user} = MyApp.User.get_item(%MyApp.User{id: "user-123", email: "john@example.com"})
# Disable logging
Dynamo.Logger.disable()When logging is enabled, each DynamoDB operation will be logged in JSON format with the following structure:
{
"timestamp": "2023-06-15T10:30:45.123456Z",
"operation": "GetItem",
"table": "users",
"payload": {
"TableName": "users",
"Key": {
"pk": {"S": "user#user-123"},
"sk": {"S": "user#john@example.com"}
}
},
"response": {
"Item": {
"pk": {"S": "user#user-123"},
"sk": {"S": "user#john@example.com"},
"id": {"S": "user-123"},
"email": {"S": "john@example.com"},
"name": {"S": "John Doe"}
}
}
}You can check if logging is currently enabled:
# Check if logging is enabled
if Dynamo.Logger.enabled?() do
IO.puts("DynamoDB logging is enabled")
else
IO.puts("DynamoDB logging is disabled")
endLogging is particularly useful for:
- Debugging complex query issues
- Understanding the exact payloads sent to DynamoDB
- Verifying key generation is working correctly
- Performance analysis of DynamoDB operations
- Troubleshooting in development environments
Note that logging should typically be disabled in production environments to avoid performance overhead and excessive log output.