Skip to content

Add Turso/libSQL as state storage backend #792

@llrightll

Description

@llrightll

Summary

Propose adding Turso's libSQL (in-process SQLite-compatible database) as a third StateStorage backend alongside Memory and Supabase/PostgreSQL. It fills a gap: durable like PostgreSQL but zero-infra like Memory.

Motivation

Memory Supabase (PostgreSQL) libSQL (proposed)
Survives restart No Yes Yes
External infra required None PostgreSQL server None (file)
Read latency ~0ms (in-process) 1-5ms (network) Sub-ms (in-process)
Multi-instance No Yes (strong consistency) Yes (eventual, via sync)

For single-instance deployments, self-hosted setups, edge deployments, and local development — Memory loses state on restart, and PostgreSQL requires running a database server. libSQL gives durable, in-process storage with no external dependencies. Just a file.

For multi-instance deployments, Turso's optional cloud sync provides eventual consistency across replicas.

Proposed Configuration

state_storage:
  type: turso
  path: "/data/plano-state.db"

  # Optional — cloud sync for multi-instance
  sync_url: "https://your-db.turso.io"
  auth_token: "${TURSO_AUTH_TOKEN}"

Implementation Details

The implementation follows the same patterns as the existing PostgreSQLConversationStorage in crates/brightstaff/src/state/postgresql.rs.

Struct and initialization

use libsql::{Builder, Connection, Database};

#[derive(Clone)]
pub struct TursoConversationStorage {
    db: Database,
    conn: Connection,
}

impl TursoConversationStorage {
    pub async fn new(path: String, sync_url: Option<String>, auth_token: Option<String>) -> Result<Self, StateStorageError> {
        let db = match (&sync_url, &auth_token) {
            // Phase 2: cloud sync enabled
            (Some(url), Some(token)) => {
                Builder::new_remote_replica(&path, url, token)
                    .build()
                    .await
                    .map_err(|e| StateStorageError::StorageError(format!("Failed to open synced database: {}", e)))?
            }
            // Phase 1: local-only
            _ => {
                Builder::new_local(&path)
                    .build()
                    .await
                    .map_err(|e| StateStorageError::StorageError(format!("Failed to open local database: {}", e)))?
            }
        };

        let conn = db.connect()
            .map_err(|e| StateStorageError::StorageError(format!("Failed to connect: {}", e)))?;

        // Auto-create table (unlike PostgreSQL backend's manual ensure_ready pattern,
        // this is appropriate for a file-based DB where manual DDL is impractical)
        conn.execute(
            "CREATE TABLE IF NOT EXISTS conversation_states (
                response_id TEXT PRIMARY KEY,
                input_items TEXT NOT NULL,
                created_at INTEGER NOT NULL,
                model TEXT NOT NULL,
                provider TEXT NOT NULL
            )", ()
        ).await.map_err(|e| StateStorageError::StorageError(format!("Failed to create table: {}", e)))?;

        // WAL mode for concurrent read/write
        conn.execute("PRAGMA journal_mode=WAL", ()).await.ok();

        info!("Turso conversation state storage initialized at {}", path);
        Ok(Self { db, conn })
    }
}

StateStorage trait implementation

#[async_trait]
impl StateStorage for TursoConversationStorage {
    async fn put(&self, state: OpenAIConversationState) -> Result<(), StateStorageError> {
        let input_items_json = serde_json::to_string(&state.input_items)
            .map_err(|e| StateStorageError::SerializationError(format!("Failed to serialize input_items: {}", e)))?;

        self.conn.execute(
            "INSERT OR REPLACE INTO conversation_states (response_id, input_items, created_at, model, provider)
             VALUES (?1, ?2, ?3, ?4, ?5)",
            libsql::params![state.response_id, input_items_json, state.created_at, state.model, state.provider],
        ).await.map_err(|e| StateStorageError::StorageError(format!("Failed to store state for {}: {}", state.response_id, e)))?;

        // If cloud sync is configured, push changes
        // db.sync() is a no-op for local-only databases
        let _ = self.db.sync().await;

        debug!("Stored conversation state for {}", state.response_id);
        Ok(())
    }

    async fn get(&self, response_id: &str) -> Result<OpenAIConversationState, StateStorageError> {
        let mut rows = self.conn.query(
            "SELECT response_id, input_items, created_at, model, provider
             FROM conversation_states WHERE response_id = ?1",
            libsql::params![response_id],
        ).await.map_err(|e| StateStorageError::StorageError(format!("Failed to query state for {}: {}", response_id, e)))?;

        match rows.next().await.map_err(|e| StateStorageError::StorageError(e.to_string()))? {
            Some(row) => {
                let input_items_str: String = row.get(1)
                    .map_err(|e| StateStorageError::StorageError(e.to_string()))?;
                let input_items = serde_json::from_str(&input_items_str)
                    .map_err(|e| StateStorageError::SerializationError(format!("Failed to deserialize input_items: {}", e)))?;

                Ok(OpenAIConversationState {
                    response_id: row.get(0).map_err(|e| StateStorageError::StorageError(e.to_string()))?,
                    input_items,
                    created_at: row.get(2).map_err(|e| StateStorageError::StorageError(e.to_string()))?,
                    model: row.get(3).map_err(|e| StateStorageError::StorageError(e.to_string()))?,
                    provider: row.get(4).map_err(|e| StateStorageError::StorageError(e.to_string()))?,
                })
            }
            None => Err(StateStorageError::NotFound(response_id.to_string())),
        }
    }

    async fn exists(&self, response_id: &str) -> Result<bool, StateStorageError> {
        let mut rows = self.conn.query(
            "SELECT EXISTS(SELECT 1 FROM conversation_states WHERE response_id = ?1)",
            libsql::params![response_id],
        ).await.map_err(|e| StateStorageError::StorageError(e.to_string()))?;

        match rows.next().await.map_err(|e| StateStorageError::StorageError(e.to_string()))? {
            Some(row) => Ok(row.get::<i64>(0).map_err(|e| StateStorageError::StorageError(e.to_string()))? != 0),
            None => Ok(false),
        }
    }

    async fn delete(&self, response_id: &str) -> Result<(), StateStorageError> {
        let rows_affected = self.conn.execute(
            "DELETE FROM conversation_states WHERE response_id = ?1",
            libsql::params![response_id],
        ).await.map_err(|e| StateStorageError::StorageError(format!("Failed to delete state for {}: {}", response_id, e)))?;

        if rows_affected == 0 {
            return Err(StateStorageError::NotFound(response_id.to_string()));
        }
        debug!("Deleted conversation state for {}", response_id);
        Ok(())
    }
}

Config and enum changes in mod.rs

pub enum StorageBackend {
    Memory,
    Supabase,
    Turso,  // new
}

impl StorageBackend {
    pub fn parse_backend(s: &str) -> Option<Self> {
        match s.to_lowercase().as_str() {
            "memory" => Some(StorageBackend::Memory),
            "supabase" => Some(StorageBackend::Supabase),
            "turso" => Some(StorageBackend::Turso),  // new
            _ => None,
        }
    }
}

Cargo feature flag

[features]
libsql-storage = ["libsql"]

[dependencies]
libsql = { version = "0.9", optional = true }

Schema note

The PostgreSQL backend stores input_items as JSONB. SQLite has no native JSONB type, so this backend stores it as TEXT with serde_json::to_string / serde_json::from_str (functionally identical for this use case — the column is never queried with JSON operators).

Cloud sync lifecycle

When sync_url is configured, the libsql crate creates an embedded replica that:

  • Reads are always served from the local file (sub-millisecond)
  • Writes go to the local file, then db.sync() pushes to Turso Cloud
  • Other replicas see changes after their next sync() call
  • Read-your-writes guarantee: the instance that wrote sees its data immediately
  • sync() on a local-only database is a no-op, so the same code handles both modes

Phased approach

Phase 1 — Local-only: Use Builder::new_local(path). Durable state that survives restarts. No external dependencies. Gated behind libsql-storage feature flag.

Phase 2 — Cloud sync: Use Builder::new_remote_replica(path, url, token). Add sync_interval for periodic background sync. Enables multi-instance state sharing with eventual consistency.

Trade-offs

Advantages over Supabase/PostgreSQL:

  • Zero external infrastructure
  • In-process reads (microsecond-level vs 1-5ms network)
  • Single-file deployment, easy backups (copy the file)
  • Optional cloud sync when multi-instance is needed

Advantages over Memory:

  • Survives restarts
  • Optional cloud backup and cross-instance sync

Limitations:

  • Cloud sync is eventually consistent — not suitable for strict cross-replica consistency (PostgreSQL remains better for that)
  • Adds build time when feature flag is enabled (compiles SQLite engine from C sources; opt-in only)

Use Cases

  • Self-hosted Plano — durable state without running PostgreSQL
  • Local development — state persists across restarts, no Docker compose for a DB
  • Edge/IoT — lightweight, single-file, offline-capable
  • Multi-instance with sync — each replica has local state, syncs to Turso Cloud

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions