Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 6 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "iota-gas-station"
version = "0.4.0"
version = "0.5.0"
edition = "2021"
authors = ["Mysten Labs <build@mystenlabs.com>", "IOTA Stiftung"]
license = "Apache-2.0"
Expand All @@ -14,7 +14,11 @@ iota-metrics = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", pa
iota-config = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", package = "iota-config" }
iota-json-rpc-types = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", package = "iota-json-rpc-types" }
iota-sdk = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", package = "iota-sdk" }
iota-sdk-types = { package = "iota-sdk-types", git = "https://github.com/iotaledger/iota-rust-sdk.git", rev = "05608b7e4a5b96d85f84e1970a517f6f174beb8b", features = ["hash", "serde", "schemars"] }
iota-sdk-types = { package = "iota-sdk-types", git = "https://github.com/iotaledger/iota-rust-sdk.git", rev = "05608b7e4a5b96d85f84e1970a517f6f174beb8b", features = [
"hash",
"serde",
"schemars",
] }
iota-types = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", package = "iota-types" }
telemetry-subscribers = { git = "https://github.com/iotaledger/iota", tag = "v1.15.0", package = "telemetry-subscribers" }

Expand Down
10 changes: 9 additions & 1 deletion src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Command {
let rescan_trigger_receiver = rescan_config.create_receiver();

let cold_params_changes = cold_params
.check_if_changed(&storage, &format!("{namespace_prefix}:cold_params"))
.check_if_changed(&storage, &get_cold_params_key(&namespace_prefix))
.await
.expect("failed to check cold params changes");

Expand Down Expand Up @@ -149,6 +149,10 @@ impl Command {
} else {
None
};
cold_params
.save_to_storage(&storage, &get_cold_params_key(&namespace_prefix))
.await
.expect("failed to save cold params to storage");
let core_metrics = GasStationCoreMetrics::new(&prometheus_registry);
let stats_storage = connect_stats_storage(&gas_station_config, &namespace_prefix).await;
let stats_tracker = StatsTracker::new(Arc::new(stats_storage));
Expand Down Expand Up @@ -186,3 +190,7 @@ impl Command {
server.handle.await.unwrap();
}
}

fn get_cold_params_key(namespace_prefix: &str) -> String {
format!("{namespace_prefix}:cold_params")
}
21 changes: 9 additions & 12 deletions src/config/cold_params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,29 @@ impl ColdParams {
.await
.context("unable to get cold params from storage")?;

// If there are no cold params, it means this is the first run.
// In this case, there are no changes, so we only need to save the current cold params.
if maybe_cold_params.is_none() {
storage
.set_data(
key,
serde_json::to_vec(&self)
.context("unable to serialize cold params and save to storage")?,
)
.await?;
return Ok(vec![]);
}

let old_cold_params = serde_json::from_slice(&maybe_cold_params.unwrap()).context(
format!("unable to deserialize cold params. The entry with the key {key} is not a valid cold params structure",
))?;

let changes = self.changes_details(&old_cold_params);
Ok(changes)
}

pub async fn save_to_storage(
&self,
storage: &Arc<dyn Storage>,
key: &str,
) -> anyhow::Result<()> {
storage
.set_data(
key,
serde_json::to_vec(&self)
.context("unable to serialize cold params and save to storage")?,
)
.await?;
Ok(changes)
Ok(())
}
}

Expand Down
9 changes: 5 additions & 4 deletions src/gas_station_initializer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,11 +208,14 @@ impl GasStationInitializer {
force_full_rescan: bool,
ignore_locks: bool,
) -> Self {
let sponsor_address = signer.get_address();
Self::perform_consistency_check(&iota_client, &storage, sponsor_address).await;

if force_full_rescan {
if storage
.acquire_maintenance_lock(MAX_MAINTENANCE_DURATION_SEC)
.await
.unwrap()
.expect("Failed to acquire maintenance lock for full rescan")
{
info!("Acquired maintenance lock for full rescan");
} else {
Expand Down Expand Up @@ -365,8 +368,6 @@ impl GasStationInitializer {
}
}

Self::perform_consistency_check(&iota_client, storage, sponsor_address).await;

let start = Instant::now();
let balance_threshold = if matches!(mode, RunMode::Init) {
info!("The pool has never been initialized. Initializing it for the first time");
Expand Down Expand Up @@ -468,7 +469,7 @@ impl GasStationInitializer {
storage: &Arc<dyn Storage>,
sponsor_address: IotaAddress,
) {
info!("Performing quick consistency check before rescan");
info!("Performing quick consistency check");
let registry_coin_count = match storage.get_available_coin_count().await {
Ok(count) => count as u64,
Err(e) => {
Expand Down
5 changes: 5 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ pub trait Storage: SetGetStorage + Sync + Send {

#[async_trait::async_trait]
pub trait SetGetStorage: Sync + Send {
/// Set data in the storage.
/// Key is expected to be absolute (already includes namespace if needed)
async fn set_data(&self, key: &str, value: Vec<u8>) -> anyhow::Result<()>;

/// Get data from the storage.
/// Key is expected to be absolute (already includes namespace if needed)
async fn get_data(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>>;
}

Expand Down
20 changes: 17 additions & 3 deletions src/storage/redis/lua_scripts/clean_up_coin_registry.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@
-- SPDX-License-Identifier: Apache-2.0

-- This script is used to clean up all data associated with a sponsor's coin registry.
-- It deletes all keys with the namespace prefix.
-- It deletes all keys with the namespace prefix, EXCEPT for lock keys which must be preserved.
-- The first argument is the namespace.

local namespace = ARGV[1]
local pattern = namespace .. ':*'

-- Lock keys that should NOT be deleted during cleanup
local t_maintenance_lock = namespace .. ':maintenance_lock'
local t_init_lock = namespace .. ':init_lock'

local cursor = "0"
local deleted_count = 0

Expand All @@ -17,8 +21,18 @@ repeat
local keys = result[2]

if #keys > 0 then
redis.call('DEL', unpack(keys))
deleted_count = deleted_count + #keys
-- Filter out lock keys that should be preserved
local keys_to_delete = {}
for i, key in ipairs(keys) do
if key ~= t_maintenance_lock and key ~= t_init_lock then
table.insert(keys_to_delete, key)
end
end

if #keys_to_delete > 0 then
redis.call('DEL', unpack(keys_to_delete))
deleted_count = deleted_count + #keys_to_delete
end
end
until cursor == "0"

Expand Down
97 changes: 86 additions & 11 deletions src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,12 @@ fn generate_namespace(namespace_prefix: &str) -> String {
impl SetGetStorage for RedisStorage {
async fn set_data(&self, key: &str, value: Vec<u8>) -> anyhow::Result<()> {
let mut conn = self.conn_manager.clone();
let full_key = format!("{}:{}", self.namespace, key);
conn.set::<_, _, ()>(full_key, value).await?;
conn.set::<_, _, ()>(key, value).await?;
Ok(())
}

async fn get_data(&self, key: &str) -> anyhow::Result<Option<Vec<u8>>> {
let mut conn = self.conn_manager.clone();
let full_key = format!("{}:{}", self.namespace, key);
let value: Option<Vec<u8>> = conn.get(full_key).await?;
let value: Option<Vec<u8>> = conn.get(key).await?;
Ok(value)
}
}
Expand Down Expand Up @@ -419,11 +416,12 @@ mod tests {
let test_struct = TestStruct {
value: "test_value".to_string(),
};
let key = format!("{}:test_key", storage.namespace);
storage
.set_data("test_key", serde_json::to_vec(&test_struct).unwrap())
.set_data(&key, serde_json::to_vec(&test_struct).unwrap())
.await
.unwrap();
let value: Option<Vec<u8>> = storage.get_data("test_key").await.unwrap();
let value: Option<Vec<u8>> = storage.get_data(&key).await.unwrap();
let value: TestStruct = serde_json::from_slice(&value.unwrap()).unwrap();
assert_eq!(value, test_struct);
}
Expand Down Expand Up @@ -513,12 +511,13 @@ mod tests {
let total_balance = storage.get_available_coin_total_balance().await;
assert_eq!(total_balance, 300);

// Set some data
// Set some data with absolute key
let test_key = format!("{}:test_key", storage.namespace);
storage
.set_data("test_key", b"test_value".to_vec())
.set_data(&test_key, b"test_value".to_vec())
.await
.unwrap();
let value = storage.get_data("test_key").await.unwrap();
let value = storage.get_data(&test_key).await.unwrap();
assert!(value.is_some());

// Clean up the coin registry
Expand All @@ -530,10 +529,86 @@ mod tests {
assert_eq!(total_balance, 0);

// Verify the test_key is also cleaned up
let value = storage.get_data("test_key").await.unwrap();
let value = storage.get_data(&test_key).await.unwrap();
assert!(value.is_none());
}

#[tokio::test]
async fn test_clean_up_preserves_locks() {
let storage = setup_storage().await;

// Add some coins and data
storage
.add_new_coins(vec![
GasCoin {
balance: 100,
object_ref: random_object_ref(),
},
GasCoin {
balance: 200,
object_ref: random_object_ref(),
},
])
.await
.unwrap();
let test_key = format!("{}:test_key", storage.namespace);
storage
.set_data(&test_key, b"test_value".to_vec())
.await
.unwrap();

// Acquire maintenance lock BEFORE cleanup
let lock_acquired = storage
.acquire_maintenance_lock(300) // 300 seconds
.await
.unwrap();
assert!(lock_acquired, "Should acquire maintenance lock");

// Verify we're in maintenance mode
assert!(
storage.is_maintenance_mode().await.unwrap(),
"Should be in maintenance mode"
);

// Clean up the coin registry (this should NOT delete the maintenance lock)
storage.clean_up_coin_registry().await.unwrap();

// Verify maintenance lock is STILL held after cleanup
assert!(
storage.is_maintenance_mode().await.unwrap(),
"Maintenance lock should be preserved after cleanup"
);

// Verify we cannot acquire the lock again (it's still held)
let lock_acquired_again = storage.acquire_maintenance_lock(300).await.unwrap();
assert!(
!lock_acquired_again,
"Should not be able to acquire lock again while it's held"
);

// Verify other data was cleaned up
let (coin_count, total_balance) = storage.init_coin_stats_at_startup().await.unwrap();
assert_eq!(coin_count, 0, "Coins should be cleaned up");
assert_eq!(total_balance, 0, "Balance should be zero");

let value = storage.get_data(&test_key).await.unwrap();
assert!(value.is_none(), "Test data should be cleaned up");

// Release the lock
storage.release_maintenance_lock().await.unwrap();
assert!(
!storage.is_maintenance_mode().await.unwrap(),
"Should not be in maintenance mode after release"
);

// Now we should be able to acquire it again
let lock_acquired_final = storage.acquire_maintenance_lock(300).await.unwrap();
assert!(
lock_acquired_final,
"Should be able to acquire lock after release"
);
}

async fn setup_storage() -> RedisStorage {
let sponsor = IotaAddress::ZERO;
let namespace_prefix = "test";
Expand Down