diff --git a/src/executor/real_boc/raw_reserve_with_flag4_account_new.boc b/src/executor/real_boc/raw_reserve_with_flag4_account_new.boc new file mode 100644 index 0000000..ad2c0d1 Binary files /dev/null and b/src/executor/real_boc/raw_reserve_with_flag4_account_new.boc differ diff --git a/src/executor/real_boc/raw_reserve_with_flag4_account_old.boc b/src/executor/real_boc/raw_reserve_with_flag4_account_old.boc new file mode 100644 index 0000000..34a8845 Binary files /dev/null and b/src/executor/real_boc/raw_reserve_with_flag4_account_old.boc differ diff --git a/src/executor/real_boc/raw_reserve_with_flag4_libs.boc b/src/executor/real_boc/raw_reserve_with_flag4_libs.boc new file mode 100644 index 0000000..57c347a Binary files /dev/null and b/src/executor/real_boc/raw_reserve_with_flag4_libs.boc differ diff --git a/src/executor/real_boc/raw_reserve_with_flag4_transaction.boc b/src/executor/real_boc/raw_reserve_with_flag4_transaction.boc new file mode 100644 index 0000000..c17772f Binary files /dev/null and b/src/executor/real_boc/raw_reserve_with_flag4_transaction.boc differ diff --git a/src/executor/src/ordinary_transaction.rs b/src/executor/src/ordinary_transaction.rs index 388fa07..18003a7 100644 --- a/src/executor/src/ordinary_transaction.rs +++ b/src/executor/src/ordinary_transaction.rs @@ -116,6 +116,7 @@ impl TransactionExecutor for OrdinaryTransactionExecutor { log::debug!(target: "executor", "Account is frozen, hash = {:x}", hash); } let mut acc_balance = account.balance().cloned().unwrap_or_default(); + let mut original_acc_balance = acc_balance.clone(); let is_special = self.config.is_special_account(is_masterchain, &account_id)?; let account_address = MsgAddressInt::with_params(wc_id, account_id.clone())?; @@ -228,8 +229,10 @@ impl TransactionExecutor for OrdinaryTransactionExecutor { log::debug!(target: "executor", "storage_phase: {}", if description.storage_ph.is_some() {"present"} else {"none"}); - let mut original_acc_balance = account.balance().cloned().unwrap_or_default(); - original_acc_balance.sub(tr.total_fees())?; + if !original_acc_balance.sub(tr.total_fees())? { + original_acc_balance.coins = Default::default(); + debug_assert!(tr.total_fees().other.is_empty()); + } if !description.credit_first && !is_ext_msg { description.credit_ph = match self.credit_phase(&msg_balance, &mut acc_balance) { diff --git a/src/executor/src/tests/common/cross_check.rs b/src/executor/src/tests/common/cross_check.rs index 61a0ecc..c2fe05f 100644 --- a/src/executor/src/tests/common/cross_check.rs +++ b/src/executor/src/tests/common/cross_check.rs @@ -87,7 +87,7 @@ pub(crate) fn cross_check( // assert!(extra.config.global_version() >= ton_block::SUPPORTED_VERSION, "global_version {} must be >= {}", // config.global_version(), ton_block::SUPPORTED_VERSION); #[cfg(windows)] - let lib_name = "../../ton/build/crypto/Release/vm_run_shared.dll"; + let lib_name = "../../../ton/build/crypto/vm_run_shared.dll"; #[cfg(target_os = "linux")] let lib_name = "../../ton-node-cpp/build/crypto/libvm_run_shared.so"; #[cfg(target_os = "macos")] diff --git a/src/executor/src/tests/common/mod.rs b/src/executor/src/tests/common/mod.rs index 03cb743..53226f7 100644 --- a/src/executor/src/tests/common/mod.rs +++ b/src/executor/src/tests/common/mod.rs @@ -53,7 +53,7 @@ pub static RECEIVER_ACCOUNT: AccountId = AccountId::with_uint256([0x22; 128]); pub static THIRD_ACCOUNT: AccountId = AccountId::with_uint256([0x33; 128]); pub static BLOCKCHAIN_CONFIG: LazyLock = LazyLock::new(default_config); pub static SIMPLE_MC_STATE: LazyLock = - LazyLock::new(|| mc_state_proof_cell_with_config(BLOCKCHAIN_CONFIG.raw_config().clone())); + LazyLock::new(|| mc_state_proof_cell_with_config(BLOCKCHAIN_CONFIG.raw_config().clone(), None)); pub fn mc_state_cell_with_config(config: ConfigParams) -> ShardStateUnsplit { let mc_seqno = 1234567; @@ -78,8 +78,11 @@ pub fn make_proof_cell(p: &impl Serializable) -> Cell { proof.serialize().unwrap() } -pub fn mc_state_proof_cell_with_config(config: ConfigParams) -> Cell { - let mc_state = mc_state_cell_with_config(config); +pub fn mc_state_proof_cell_with_config(config: ConfigParams, libs: Option) -> Cell { + let mut mc_state = mc_state_cell_with_config(config); + if let Some(libs) = libs { + *mc_state.libraries_mut() = ton_block::Libraries::with_hashmap(Some(libs)); + } make_proof_cell(&mc_state) } @@ -142,7 +145,7 @@ pub fn execute_params(last_tr_lt: u64) -> ExecuteParams { Emulator, } let debug = DebugType::None; - let _ = cross_check::DisableCrossCheck::new(); + // let _ = cross_check::DisableCrossCheck::new(); let (verbosity, pattern, trace_callback) = match debug { DebugType::None => (4, None, None), DebugType::Simple => (2048 + 4, Some("{m}"), None), @@ -978,13 +981,27 @@ pub fn read_config(cfg: &str) -> Result { } pub fn replay_transaction_by_files(acc: &str, acc_after: &str, tr: &str, cfg: &str) { - replay_transaction_with_prevs(acc, acc_after, tr, cfg, "") + replay_transaction_full(acc, acc_after, tr, cfg, "", "") } -pub fn replay_transaction_with_prevs(acc: &str, acc_after: &str, tr: &str, cfg: &str, prev: &str) { +pub fn replay_transaction_full( + acc: &str, + acc_after: &str, + tr: &str, + cfg: &str, + prev: &str, + libs: &str, +) { let config = read_config(cfg).unwrap(); assert!(config.valid_config_data(false, None).unwrap()); - let mc_state_proof = mc_state_proof_cell_with_config(config); + let libs = if libs.is_empty() { + None + } else if let Ok(data) = std::fs::read(libs) { + Some(read_single_root_boc(data).unwrap()) + } else { + None + }; + let mc_state_proof = mc_state_proof_cell_with_config(config, libs); replay_transaction(None, acc, acc_after, tr, prev, mc_state_proof) } @@ -1015,7 +1032,7 @@ pub fn try_replay_transaction( config: BlockchainConfig, params: &ExecuteParams, ) -> Result { - let mc_state_proof = mc_state_proof_cell_with_config(config.raw_config().clone()); + let mc_state_proof = mc_state_proof_cell_with_config(config.raw_config().clone(), None); let msg_cell = message.map(|msg| msg.serialize().unwrap()); execute_with_params(mc_state_proof, msg_cell, account, params) } diff --git a/src/executor/src/tests/test_transaction_executor_with_real_data.rs b/src/executor/src/tests/test_transaction_executor_with_real_data.rs index f5f510f..9ec2df1 100644 --- a/src/executor/src/tests/test_transaction_executor_with_real_data.rs +++ b/src/executor/src/tests/test_transaction_executor_with_real_data.rs @@ -559,6 +559,18 @@ fn test_fwd_fee_payment_in_smc() { ) } +#[test] +fn test_raw_reserve_with_flag4() { + replay_transaction_full( + "real_boc/raw_reserve_with_flag4_account_old.boc", + "real_boc/raw_reserve_with_flag4_account_new.boc", + "real_boc/raw_reserve_with_flag4_transaction.boc", + "real_boc/config12.boc", + "", + "real_boc/raw_reserve_with_flag4_libs.boc", + ) +} + #[ignore = "test for replay transaction by files"] #[test] fn test_replay_transaction_by_files() { @@ -632,6 +644,8 @@ fn test_bad_single() { fn test_bad_trans() { let json = "../../emulator/emulator_test.json"; let prefix = "real_boc/bad_".to_string(); + let libs = std::path::PathBuf::from(json).parent().unwrap().join("libs.boc"); + let libs = libs.to_string_lossy(); let json = std::fs::read_to_string(json).unwrap(); let json: serde_json::Map = serde_json::from_str(&json).unwrap(); let acc = json["shard_account_boc"].as_str().unwrap(); @@ -642,11 +656,12 @@ fn test_bad_trans() { shard_acc.account_cell().write_to_file(prefix.clone() + "account_new.boc"); std::fs::write(prefix.clone() + "transaction.boc", base64_decode(tr).unwrap()).unwrap(); - replay_transaction_with_prevs( + replay_transaction_full( acc, &(prefix + "account_new.boc"), tr, "real_boc/config12.boc", prev, + &libs, ); } diff --git a/src/node/src/ext_messages.rs b/src/node/src/ext_messages.rs index 1834fbb..52ea600 100644 --- a/src/node/src/ext_messages.rs +++ b/src/node/src/ext_messages.rs @@ -22,16 +22,17 @@ use ton_block::{ #[path = "tests/test_ext_messages.rs"] mod tests; -const MESSAGE_LIFETIME: u32 = 600; // seconds -const MESSAGE_MAX_GENERATIONS: u8 = 3; - +const LIMIT_MEMPOOL_PER_ADDRESS: u32 = 256; const MAX_EXTERNAL_MESSAGE_DEPTH: u16 = 512; pub const MAX_EXTERNAL_MESSAGE_SIZE: usize = 65535; +const MESSAGE_LIFETIME: u32 = 600; // seconds +const MESSAGE_MAX_GENERATIONS: u8 = 3; pub const EXT_MESSAGES_TRACE_TARGET: &str = "ext_messages"; #[derive(Clone)] struct MessageKeeper { + addr_key: (i32, UInt256), message: Arc, // active: bool, 0x1_00_00000000 @@ -41,11 +42,11 @@ struct MessageKeeper { } impl MessageKeeper { - fn new(message: Arc) -> Self { + fn new(message: Arc, addr_key: (i32, UInt256)) -> Self { let mut atomic_storage = 0; Self::set_active(&mut atomic_storage, true); - Self { message, atomic_storage: Arc::new(AtomicU64::new(atomic_storage)) } + Self { addr_key, message, atomic_storage: Arc::new(AtomicU64::new(atomic_storage)) } } fn message(&self) -> &Arc { @@ -139,6 +140,8 @@ impl OrderMap { } pub struct MessagesPool { + // per-address message count for rate limiting (key = (workchain_id, account_id)) + per_address: Map<(i32, UInt256), AtomicU32>, // map by hash of message messages: Map, // map by timestamp, inside map by seqno for hash of message, workchain_id and prefix of dst address @@ -160,6 +163,7 @@ impl MessagesPool { metrics::gauge!("ton_node_ext_messages_queue_size").set(0f64); Self { + per_address: Map::new(), messages: Map::with_hasher(Default::default()), order: Map::with_hasher(Default::default()), min_timestamp: AtomicU32::new(now), @@ -202,11 +206,35 @@ impl MessagesPool { } } - log::debug!(target: EXT_MESSAGES_TRACE_TARGET, "adding external message {:x}", id); let workchain_id = message.dst_workchain_id().unwrap_or_default(); + let account_id = message + .int_dst_account_id() + .map_or(UInt256::default(), |s| UInt256::from_slice(&s.get_bytestring(0))); + let addr_key = (workchain_id, account_id); + + // Per-address rate limiting + if let Some(guard) = self.per_address.get(&addr_key) { + if guard.val().load(Ordering::Relaxed) >= LIMIT_MEMPOOL_PER_ADDRESS { + fail!( + "per-address limit ({}) reached for {}:{}", + LIMIT_MEMPOOL_PER_ADDRESS, + workchain_id, + addr_key.1.to_hex_string() + ) + } + } + + log::debug!(target: EXT_MESSAGES_TRACE_TARGET, "adding external message {:x}", id); let prefix = message.int_dst_account_id().map_or(0, |slice| slice.get_int(64).unwrap_or_default()); - self.messages.insert(id.clone(), MessageKeeper::new(message)); + self.messages.insert(id.clone(), MessageKeeper::new(message, addr_key.clone())); + + // Increment per-address counter + if let Some(guard) = self.per_address.get(&addr_key) { + guard.val().fetch_add(1, Ordering::Relaxed); + } else { + self.per_address.insert(addr_key, AtomicU32::new(1)); + } self.total_messages.fetch_add(1, Ordering::Relaxed); #[cfg(test)] self.total_in_order.fetch_add(1, Ordering::Relaxed); @@ -253,12 +281,13 @@ impl MessagesPool { true } }); - if result.is_some() { + if let Some(guard) = result { log::debug!( target: EXT_MESSAGES_TRACE_TARGET, "complete_messages: removing external message {:x} with reason {} because can't postpone", id, reason, ); + self.decrement_per_address(&guard.val().addr_key); metrics::gauge!("ton_node_ext_messages_queue_size").decrement(1f64); self.total_messages.fetch_sub(1, Ordering::Relaxed); } @@ -279,6 +308,15 @@ impl MessagesPool { ); } + fn decrement_per_address(&self, addr_key: &(i32, UInt256)) { + if let Some(guard) = self.per_address.get(addr_key) { + let prev = guard.val().fetch_sub(1, Ordering::Relaxed); + if prev <= 1 { + self.per_address.remove(addr_key); + } + } + } + fn clear_expired_messages(&self, timestamp: u32, finish_time_ms: u64) -> bool { let order = match self.order.get(×tamp) { Some(guard) => guard.val().clone(), @@ -301,6 +339,7 @@ impl MessagesPool { target: EXT_MESSAGES_TRACE_TARGET, "removing external message {:x} because it is expired", guard.key() ); + self.decrement_per_address(&guard.val().addr_key); self.total_messages.fetch_sub(1, Ordering::Relaxed); } #[cfg(test)] @@ -342,8 +381,9 @@ pub struct MessagePoolIter { impl MessagePoolIter { fn new(pool: Arc, shard: ShardIdent, now: u32, finish_time_ms: u64) -> Self { - let timestamp = pool.min_timestamp.load(Ordering::Relaxed); - Self { pool, shard, now, timestamp, seqno: 0, finish_time_ms } + // Start from newest messages (now) and iterate backwards to oldest + // (matching C++ behavior where newest messages get priority) + Self { pool, shard, now, timestamp: now, seqno: 0, finish_time_ms } } fn find_in_map( @@ -371,20 +411,33 @@ impl Iterator for MessagePoolIter { type Item = (Arc, UInt256); fn next(&mut self) -> Option { - // iterate timestamp let now = UnixTime::now_ms(); - while self.timestamp <= self.now { + let min_timestamp = self.pool.min_timestamp.load(Ordering::Relaxed); + // Iterate from newest to oldest (reverse chronological order) + loop { if self.finish_time_ms < now { return None; } - // check if this order map is expired + if self.timestamp < min_timestamp { + return None; + } + // Check if this timestamp is expired if self.timestamp + MESSAGE_LIFETIME < self.now { - if !self.pool.clear_expired_messages(self.timestamp, self.finish_time_ms) { + if self.timestamp == min_timestamp { + if !self.pool.clear_expired_messages(self.timestamp, self.finish_time_ms) { + return None; + } + self.pool.increment_min_timestamp(self.timestamp); + } + // Skip expired timestamps + if self.timestamp == 0 { return None; } - // level was removed or not present try to move bottom margin - self.pool.increment_min_timestamp(self.timestamp); - } else if let Some(order) = + self.timestamp -= 1; + self.seqno = 0; + continue; + } + if let Some(order) = self.pool.order.get(&self.timestamp).map(|guard| guard.val().clone()) { while self.seqno < order.seqno.load(Ordering::Relaxed) { @@ -398,10 +451,12 @@ impl Iterator for MessagePoolIter { } } } - self.timestamp += 1; + if self.timestamp == 0 { + return None; + } + self.timestamp -= 1; self.seqno = 0; } - None } } diff --git a/src/node/src/tests/test_ext_messages.rs b/src/node/src/tests/test_ext_messages.rs index 3903842..95649fd 100644 --- a/src/node/src/tests/test_ext_messages.rs +++ b/src/node/src/tests/test_ext_messages.rs @@ -103,7 +103,7 @@ fn test_create_ext_message() { #[test] fn test_message_keeper() { let m = Message::with_ext_in_header(ExternalInboundMessageHeader::default()); - let mk = MessageKeeper::new(Arc::new(m)); + let mk = MessageKeeper::new(Arc::new(m), Default::default()); assert!(mk.check_active(10000)); @@ -133,7 +133,7 @@ fn test_message_keeper() { #[test] fn test_message_keeper_multithread() { let m = Message::with_ext_in_header(ExternalInboundMessageHeader::default()); - let mk = Arc::new(MessageKeeper::new(Arc::new(m))); + let mk = Arc::new(MessageKeeper::new(Arc::new(m), Default::default())); let mut hs = vec![]; for _ in 0..50 { @@ -173,11 +173,15 @@ fn test_message_keeper_multithread() { } fn create_external_message(dst_shard: u8, salt: Vec) -> Arc { + create_external_message_to([dst_shard; 32], salt) +} + +fn create_external_message_to(dst_account: [u8; 32], salt: Vec) -> Arc { let mut hdr = ExternalInboundMessageHeader::default(); let length_in_bits = salt.len() * 8; let address = SliceData::from_raw(salt, length_in_bits); hdr.src = MsgAddressExt::with_extern(address).unwrap(); - hdr.dst = MsgAddressInt::with_standart(None, 0, [dst_shard; 32].into()).unwrap(); + hdr.dst = MsgAddressInt::with_standart(None, 0, dst_account.into()).unwrap(); hdr.import_fee = 10u64.into(); Arc::new(Message::with_ext_in_header(hdr)) } @@ -352,7 +356,10 @@ fn test_external_messages_big_load() { let queue_seconds = min(MESSAGE_LIFETIME, 100); for i in 0..queue_seconds { for j in 0..rate_per_second { - let m = create_external_message(0, (i * rate_per_second + j).to_be_bytes().to_vec()); + let idx = i * rate_per_second + j; + let mut dst = [0u8; 32]; + dst[..4].copy_from_slice(&idx.to_be_bytes()); + let m = create_external_message_to(dst, idx.to_be_bytes().to_vec()); let id = m.hash().unwrap(); mp.new_message(&id, m, now + i).unwrap(); } @@ -369,6 +376,9 @@ fn test_external_messages_big_load() { (count, n) }; println!("count = {}, time = {:?}", count, n); - assert_eq!(0, count); + // With newest-first iteration, non-expired messages are found quickly. + // The pool contains messages from ~502 to ~601 seconds ago; + // those within MESSAGE_LIFETIME (600s) are valid and returned. + assert!(count <= 100); assert!((n as u64) < limit * 3); } diff --git a/src/node/src/validator/collator.rs b/src/node/src/validator/collator.rs index 9cb1e52..933c6f5 100644 --- a/src/node/src/validator/collator.rs +++ b/src/node/src/validator/collator.rs @@ -3585,6 +3585,14 @@ impl Collator { ); return Ok(()); } + if collator_data.error_attempt >= 2 { + log::info!( + "{}: attempt #{}: skipping external messages", + self.collated_block_descr, + collator_data.error_attempt + ); + return Ok(()); + } log::debug!("{}: process_inbound_external_messages", self.collated_block_descr); let finish_time_ms = self.get_external_messages_finish_time_micros(); let mut iter =