Skip to content

Events en

RAprogramm edited this page Jan 7, 2026 · 2 revisions

Lifecycle Events

Generate domain events for entity lifecycle changes. Events enable audit logging, event sourcing, and integration with message queues.

Quick Start

#[derive(Entity)]
#[entity(table = "orders", events)]
pub struct Order {
    #[id]
    pub id: Uuid,

    #[field(create, response)]
    pub customer_id: Uuid,

    #[field(create, update, response)]
    pub status: String,

    #[field(create, response)]
    pub total_cents: i64,

    #[field(response)]
    #[auto]
    pub created_at: DateTime<Utc>,
}

Generated Code

The events attribute generates an event enum:

/// Generated by entity-derive
#[derive(Debug, Clone)]
pub enum OrderEvent {
    /// Entity was created.
    Created(Order),

    /// Entity was updated.
    Updated {
        id: Uuid,
        changes: UpdateOrderRequest,
    },

    /// Entity was deleted.
    Deleted(Uuid),
}

Usage Examples

Basic Event Publishing

use async_trait::async_trait;

#[async_trait]
pub trait EventBus: Send + Sync {
    async fn publish<E: Send + Sync>(&self, event: E);
}

async fn create_order(
    repo: &impl OrderRepository,
    bus: &impl EventBus,
    dto: CreateOrderRequest,
) -> Result<Order, sqlx::Error> {
    let order = repo.create(dto).await?;

    // Publish event after successful creation
    bus.publish(OrderEvent::Created(order.clone())).await;

    Ok(order)
}

async fn update_order(
    repo: &impl OrderRepository,
    bus: &impl EventBus,
    id: Uuid,
    dto: UpdateOrderRequest,
) -> Result<Order, sqlx::Error> {
    let order = repo.update(id, dto.clone()).await?;

    bus.publish(OrderEvent::Updated { id, changes: dto }).await;

    Ok(order)
}

async fn delete_order(
    repo: &impl OrderRepository,
    bus: &impl EventBus,
    id: Uuid,
) -> Result<bool, sqlx::Error> {
    let deleted = repo.delete(id).await?;

    if deleted {
        bus.publish(OrderEvent::Deleted(id)).await;
    }

    Ok(deleted)
}

Audit Logging

struct AuditLogger {
    pool: PgPool,
}

#[async_trait]
impl EventHandler<OrderEvent> for AuditLogger {
    async fn handle(&self, event: OrderEvent) {
        let (action, entity_id, details) = match &event {
            OrderEvent::Created(order) => (
                "created",
                order.id,
                serde_json::to_string(order).unwrap(),
            ),
            OrderEvent::Updated { id, changes } => (
                "updated",
                *id,
                serde_json::to_string(changes).unwrap(),
            ),
            OrderEvent::Deleted(id) => (
                "deleted",
                *id,
                String::new(),
            ),
        };

        sqlx::query(
            "INSERT INTO audit_log (entity_type, entity_id, action, details, created_at)
             VALUES ('order', $1, $2, $3, NOW())"
        )
        .bind(entity_id)
        .bind(action)
        .bind(details)
        .execute(&self.pool)
        .await
        .ok();
    }
}

Message Queue Integration

use rdkafka::producer::FutureProducer;

struct KafkaEventBus {
    producer: FutureProducer,
    topic: String,
}

#[async_trait]
impl EventBus for KafkaEventBus {
    async fn publish<E: Serialize + Send + Sync>(&self, event: E) {
        let payload = serde_json::to_vec(&event).unwrap();

        self.producer
            .send(
                FutureRecord::to(&self.topic)
                    .payload(&payload)
                    .key(&Uuid::new_v4().to_string()),
                Duration::from_secs(5),
            )
            .await
            .ok();
    }
}

Event Sourcing Pattern

struct OrderAggregate {
    events: Vec<OrderEvent>,
    current_state: Option<Order>,
}

impl OrderAggregate {
    fn apply(&mut self, event: OrderEvent) {
        match &event {
            OrderEvent::Created(order) => {
                self.current_state = Some(order.clone());
            }
            OrderEvent::Updated { changes, .. } => {
                if let Some(ref mut order) = self.current_state {
                    if let Some(status) = &changes.status {
                        order.status = status.clone();
                    }
                }
            }
            OrderEvent::Deleted(_) => {
                self.current_state = None;
            }
        }
        self.events.push(event);
    }

    fn replay(events: Vec<OrderEvent>) -> Self {
        let mut aggregate = Self {
            events: Vec::new(),
            current_state: None,
        };
        for event in events {
            aggregate.apply(event);
        }
        aggregate
    }
}

With Soft Delete

When soft_delete is enabled, additional events are generated:

#[derive(Entity)]
#[entity(table = "documents", events, soft_delete)]
pub struct Document {
    #[id]
    pub id: Uuid,

    #[field(create, response)]
    pub title: String,

    #[field(skip)]
    pub deleted_at: Option<DateTime<Utc>>,
}

Generated:

pub enum DocumentEvent {
    Created(Document),
    Updated { id: Uuid, changes: UpdateDocumentRequest },
    Deleted(Uuid),       // Soft delete
    Restored(Uuid),      // Restored from soft delete
    HardDeleted(Uuid),   // Permanent delete
}

Best Practices

  1. Publish after commit — Only publish events after the database transaction succeeds
  2. Idempotent handlers — Event handlers should be idempotent for at-least-once delivery
  3. Include context — Consider adding metadata (user_id, timestamp, correlation_id)
  4. Async processing — Use background workers for heavy event processing
  5. Dead letter queue — Handle failed events gracefully

Combining with Hooks

Events and hooks work well together:

#[derive(Entity)]
#[entity(table = "orders", events, hooks)]
pub struct Order { /* ... */ }

struct OrderService {
    repo: PgPool,
    bus: EventBus,
}

#[async_trait]
impl OrderHooks for OrderService {
    type Error = AppError;

    async fn after_create(&self, entity: &Order) -> Result<(), Self::Error> {
        // Publish event in hook
        self.bus.publish(OrderEvent::Created(entity.clone())).await;
        Ok(())
    }

    async fn after_update(&self, entity: &Order) -> Result<(), Self::Error> {
        // Events can be published here too
        Ok(())
    }

    async fn after_delete(&self, id: &Uuid) -> Result<(), Self::Error> {
        self.bus.publish(OrderEvent::Deleted(*id)).await;
        Ok(())
    }
}

See Also

Clone this wiki locally