Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/forge_repo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ forge_fs.workspace = true
forge_template.workspace = true
forge_json_repair.workspace = true
anyhow.workspace = true
backon.workspace = true
futures.workspace = true
async-trait.workspace = true
serde.workspace = true
Expand Down
108 changes: 77 additions & 31 deletions crates/forge_repo/src/database/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::PathBuf;
use std::time::Duration;

use anyhow::Result;
use backon::{BlockingRetryable, ExponentialBuilder};
use diesel::prelude::*;
use diesel::r2d2::{ConnectionManager, CustomizeConnection, Pool, PooledConnection};
use diesel::sqlite::SqliteConnection;
Expand All @@ -20,6 +21,7 @@ pub struct PoolConfig {
pub min_idle: Option<u32>,
pub connection_timeout: Duration,
pub idle_timeout: Option<Duration>,
pub max_retries: usize,
pub database_path: PathBuf,
}

Expand All @@ -28,15 +30,17 @@ impl PoolConfig {
Self {
max_size: 5,
min_idle: Some(1),
connection_timeout: Duration::from_secs(30),
connection_timeout: Duration::from_secs(5),
idle_timeout: Some(Duration::from_secs(600)), // 10 minutes
max_retries: 5,
database_path,
}
}
}

pub struct DatabasePool {
pool: DbPool,
max_retries: usize,
}

impl DatabasePool {
Expand All @@ -61,14 +65,65 @@ impl DatabasePool {
.run_pending_migrations(MIGRATIONS)
.map_err(|e| anyhow::anyhow!("Failed to run database migrations: {e}"))?;

Ok(Self { pool })
Ok(Self { pool, max_retries: 5 })
}

pub fn get_connection(&self) -> Result<PooledSqliteConnection> {
self.pool.get().map_err(|e| {
warn!(error = %e, "Failed to get connection from pool");
anyhow::anyhow!("Failed to get connection from pool: {e}")
})
Self::retry_with_backoff(
self.max_retries,
"Failed to get connection from pool, retrying",
|| {
self.pool
.get()
.map_err(|e| anyhow::anyhow!("Failed to get connection from pool: {e}"))
},
)
}

/// Retries a blocking database pool operation with exponential backoff.
fn retry_with_backoff<T>(
max_retries: usize,
message: &'static str,
operation: impl FnMut() -> Result<T>,
) -> Result<T> {
operation
.retry(
ExponentialBuilder::default()
.with_min_delay(Duration::from_secs(1))
.with_max_times(max_retries)
.with_jitter(),
)
.sleep(std::thread::sleep)
.notify(|err, dur| {
warn!(
error = %err,
retry_after_ms = dur.as_millis() as u64,
"{}",
message
);
})
.call()
}
}
// Configure SQLite for better concurrency ref: https://docs.diesel.rs/master/diesel/sqlite/struct.SqliteConnection.html#concurrency
#[derive(Debug)]
struct SqliteCustomizer;

impl CustomizeConnection<SqliteConnection, diesel::r2d2::Error> for SqliteCustomizer {
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
diesel::sql_query("PRAGMA busy_timeout = 30000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA journal_mode = WAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA synchronous = NORMAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA wal_autocheckpoint = 1000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
Ok(())
}
}

Expand All @@ -83,36 +138,27 @@ impl TryFrom<PoolConfig> for DatabasePool {
std::fs::create_dir_all(parent)?;
}

// Retry pool creation with exponential backoff to handle transient
// failures such as another process holding an exclusive lock on the
// SQLite database file.
DatabasePool::retry_with_backoff(
config.max_retries,
"Failed to create database pool, retrying",
|| Self::build_pool(&config),
)
}
}

impl DatabasePool {
/// Builds the connection pool and runs migrations.
fn build_pool(config: &PoolConfig) -> Result<Self> {
let database_url = config.database_path.to_string_lossy().to_string();
let manager = ConnectionManager::<SqliteConnection>::new(&database_url);

// Configure SQLite for better concurrency ref: https://docs.diesel.rs/master/diesel/sqlite/struct.SqliteConnection.html#concurrency
#[derive(Debug)]
struct SqliteCustomizer;
impl CustomizeConnection<SqliteConnection, diesel::r2d2::Error> for SqliteCustomizer {
fn on_acquire(&self, conn: &mut SqliteConnection) -> Result<(), diesel::r2d2::Error> {
diesel::sql_query("PRAGMA busy_timeout = 30000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA journal_mode = WAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA synchronous = NORMAL;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
diesel::sql_query("PRAGMA wal_autocheckpoint = 1000;")
.execute(conn)
.map_err(diesel::r2d2::Error::QueryError)?;
Ok(())
}
}

let customizer = SqliteCustomizer;

let mut builder = Pool::builder()
.max_size(config.max_size)
.connection_timeout(config.connection_timeout)
.connection_customizer(Box::new(customizer));
.connection_customizer(Box::new(SqliteCustomizer));

if let Some(min_idle) = config.min_idle {
builder = builder.min_idle(Some(min_idle));
Expand All @@ -138,6 +184,6 @@ impl TryFrom<PoolConfig> for DatabasePool {
})?;

debug!(database_path = %config.database_path.display(), "created connection pool");
Ok(Self { pool })
Ok(Self { pool, max_retries: config.max_retries })
}
}
Loading