Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
7 changes: 5 additions & 2 deletions src/executor/src/ordinary_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl TransactionExecutor for OrdinaryTransactionExecutor {
log::debug!(target: "executor", "Account is frozen, hash = {:x}", hash);
}
let mut acc_balance = account.balance().cloned().unwrap_or_default();
let mut original_acc_balance = acc_balance.clone();
let is_special = self.config.is_special_account(is_masterchain, &account_id)?;
let account_address = MsgAddressInt::with_params(wc_id, account_id.clone())?;

Expand Down Expand Up @@ -228,8 +229,10 @@ impl TransactionExecutor for OrdinaryTransactionExecutor {

log::debug!(target: "executor",
"storage_phase: {}", if description.storage_ph.is_some() {"present"} else {"none"});
let mut original_acc_balance = account.balance().cloned().unwrap_or_default();
original_acc_balance.sub(tr.total_fees())?;
if !original_acc_balance.sub(tr.total_fees())? {
original_acc_balance.coins = Default::default();
debug_assert!(tr.total_fees().other.is_empty());
}

if !description.credit_first && !is_ext_msg {
description.credit_ph = match self.credit_phase(&msg_balance, &mut acc_balance) {
Expand Down
2 changes: 1 addition & 1 deletion src/executor/src/tests/common/cross_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) fn cross_check(
// assert!(extra.config.global_version() >= ton_block::SUPPORTED_VERSION, "global_version {} must be >= {}",
// config.global_version(), ton_block::SUPPORTED_VERSION);
#[cfg(windows)]
let lib_name = "../../ton/build/crypto/Release/vm_run_shared.dll";
let lib_name = "../../../ton/build/crypto/vm_run_shared.dll";
#[cfg(target_os = "linux")]
let lib_name = "../../ton-node-cpp/build/crypto/libvm_run_shared.so";
#[cfg(target_os = "macos")]
Expand Down
33 changes: 25 additions & 8 deletions src/executor/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub static RECEIVER_ACCOUNT: AccountId = AccountId::with_uint256([0x22; 128]);
pub static THIRD_ACCOUNT: AccountId = AccountId::with_uint256([0x33; 128]);
pub static BLOCKCHAIN_CONFIG: LazyLock<BlockchainConfig> = LazyLock::new(default_config);
pub static SIMPLE_MC_STATE: LazyLock<Cell> =
LazyLock::new(|| mc_state_proof_cell_with_config(BLOCKCHAIN_CONFIG.raw_config().clone()));
LazyLock::new(|| mc_state_proof_cell_with_config(BLOCKCHAIN_CONFIG.raw_config().clone(), None));

pub fn mc_state_cell_with_config(config: ConfigParams) -> ShardStateUnsplit {
let mc_seqno = 1234567;
Expand All @@ -78,8 +78,11 @@ pub fn make_proof_cell(p: &impl Serializable) -> Cell {
proof.serialize().unwrap()
}

pub fn mc_state_proof_cell_with_config(config: ConfigParams) -> Cell {
let mc_state = mc_state_cell_with_config(config);
pub fn mc_state_proof_cell_with_config(config: ConfigParams, libs: Option<Cell>) -> Cell {
let mut mc_state = mc_state_cell_with_config(config);
if let Some(libs) = libs {
*mc_state.libraries_mut() = ton_block::Libraries::with_hashmap(Some(libs));
}
make_proof_cell(&mc_state)
}

Expand Down Expand Up @@ -142,7 +145,7 @@ pub fn execute_params(last_tr_lt: u64) -> ExecuteParams {
Emulator,
}
let debug = DebugType::None;
let _ = cross_check::DisableCrossCheck::new();
// let _ = cross_check::DisableCrossCheck::new();
let (verbosity, pattern, trace_callback) = match debug {
DebugType::None => (4, None, None),
DebugType::Simple => (2048 + 4, Some("{m}"), None),
Expand Down Expand Up @@ -978,13 +981,27 @@ pub fn read_config(cfg: &str) -> Result<ConfigParams> {
}

pub fn replay_transaction_by_files(acc: &str, acc_after: &str, tr: &str, cfg: &str) {
replay_transaction_with_prevs(acc, acc_after, tr, cfg, "")
replay_transaction_full(acc, acc_after, tr, cfg, "", "")
}

pub fn replay_transaction_with_prevs(acc: &str, acc_after: &str, tr: &str, cfg: &str, prev: &str) {
pub fn replay_transaction_full(
acc: &str,
acc_after: &str,
tr: &str,
cfg: &str,
prev: &str,
libs: &str,
) {
let config = read_config(cfg).unwrap();
assert!(config.valid_config_data(false, None).unwrap());
let mc_state_proof = mc_state_proof_cell_with_config(config);
let libs = if libs.is_empty() {
None
} else if let Ok(data) = std::fs::read(libs) {
Some(read_single_root_boc(data).unwrap())
} else {
None
};
let mc_state_proof = mc_state_proof_cell_with_config(config, libs);
replay_transaction(None, acc, acc_after, tr, prev, mc_state_proof)
}

Expand Down Expand Up @@ -1015,7 +1032,7 @@ pub fn try_replay_transaction(
config: BlockchainConfig,
params: &ExecuteParams,
) -> Result<Transaction> {
let mc_state_proof = mc_state_proof_cell_with_config(config.raw_config().clone());
let mc_state_proof = mc_state_proof_cell_with_config(config.raw_config().clone(), None);
let msg_cell = message.map(|msg| msg.serialize().unwrap());
execute_with_params(mc_state_proof, msg_cell, account, params)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,18 @@ fn test_fwd_fee_payment_in_smc() {
)
}

#[test]
fn test_raw_reserve_with_flag4() {
replay_transaction_full(
"real_boc/raw_reserve_with_flag4_account_old.boc",
"real_boc/raw_reserve_with_flag4_account_new.boc",
"real_boc/raw_reserve_with_flag4_transaction.boc",
"real_boc/config12.boc",
"",
"real_boc/raw_reserve_with_flag4_libs.boc",
)
}

#[ignore = "test for replay transaction by files"]
#[test]
fn test_replay_transaction_by_files() {
Expand Down Expand Up @@ -632,6 +644,8 @@ fn test_bad_single() {
fn test_bad_trans() {
let json = "../../emulator/emulator_test.json";
let prefix = "real_boc/bad_".to_string();
let libs = std::path::PathBuf::from(json).parent().unwrap().join("libs.boc");
let libs = libs.to_string_lossy();
let json = std::fs::read_to_string(json).unwrap();
let json: serde_json::Map<String, serde_json::Value> = serde_json::from_str(&json).unwrap();
let acc = json["shard_account_boc"].as_str().unwrap();
Expand All @@ -642,11 +656,12 @@ fn test_bad_trans() {
shard_acc.account_cell().write_to_file(prefix.clone() + "account_new.boc");
std::fs::write(prefix.clone() + "transaction.boc", base64_decode(tr).unwrap()).unwrap();

replay_transaction_with_prevs(
replay_transaction_full(
acc,
&(prefix + "account_new.boc"),
tr,
"real_boc/config12.boc",
prev,
&libs,
);
}
93 changes: 74 additions & 19 deletions src/node/src/ext_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,17 @@ use ton_block::{
#[path = "tests/test_ext_messages.rs"]
mod tests;

const MESSAGE_LIFETIME: u32 = 600; // seconds
const MESSAGE_MAX_GENERATIONS: u8 = 3;

const LIMIT_MEMPOOL_PER_ADDRESS: u32 = 256;
const MAX_EXTERNAL_MESSAGE_DEPTH: u16 = 512;
pub const MAX_EXTERNAL_MESSAGE_SIZE: usize = 65535;
const MESSAGE_LIFETIME: u32 = 600; // seconds
const MESSAGE_MAX_GENERATIONS: u8 = 3;

pub const EXT_MESSAGES_TRACE_TARGET: &str = "ext_messages";

#[derive(Clone)]
struct MessageKeeper {
addr_key: (i32, UInt256),
message: Arc<Message>,

// active: bool, 0x1_00_00000000
Expand All @@ -41,11 +42,11 @@ struct MessageKeeper {
}

impl MessageKeeper {
fn new(message: Arc<Message>) -> Self {
fn new(message: Arc<Message>, addr_key: (i32, UInt256)) -> Self {
let mut atomic_storage = 0;
Self::set_active(&mut atomic_storage, true);

Self { message, atomic_storage: Arc::new(AtomicU64::new(atomic_storage)) }
Self { addr_key, message, atomic_storage: Arc::new(AtomicU64::new(atomic_storage)) }
}

fn message(&self) -> &Arc<Message> {
Expand Down Expand Up @@ -139,6 +140,8 @@ impl OrderMap {
}

pub struct MessagesPool {
// per-address message count for rate limiting (key = (workchain_id, account_id))
per_address: Map<(i32, UInt256), AtomicU32>,
// map by hash of message
messages: Map<UInt256, MessageKeeper>,
// map by timestamp, inside map by seqno for hash of message, workchain_id and prefix of dst address
Expand All @@ -160,6 +163,7 @@ impl MessagesPool {
metrics::gauge!("ton_node_ext_messages_queue_size").set(0f64);

Self {
per_address: Map::new(),
messages: Map::with_hasher(Default::default()),
order: Map::with_hasher(Default::default()),
min_timestamp: AtomicU32::new(now),
Expand Down Expand Up @@ -202,11 +206,35 @@ impl MessagesPool {
}
}

log::debug!(target: EXT_MESSAGES_TRACE_TARGET, "adding external message {:x}", id);
let workchain_id = message.dst_workchain_id().unwrap_or_default();
let account_id = message
.int_dst_account_id()
.map_or(UInt256::default(), |s| UInt256::from_slice(&s.get_bytestring(0)));
let addr_key = (workchain_id, account_id);

// Per-address rate limiting
if let Some(guard) = self.per_address.get(&addr_key) {
if guard.val().load(Ordering::Relaxed) >= LIMIT_MEMPOOL_PER_ADDRESS {
fail!(
"per-address limit ({}) reached for {}:{}",
LIMIT_MEMPOOL_PER_ADDRESS,
workchain_id,
addr_key.1.to_hex_string()
)
}
}

log::debug!(target: EXT_MESSAGES_TRACE_TARGET, "adding external message {:x}", id);
let prefix =
message.int_dst_account_id().map_or(0, |slice| slice.get_int(64).unwrap_or_default());
self.messages.insert(id.clone(), MessageKeeper::new(message));
self.messages.insert(id.clone(), MessageKeeper::new(message, addr_key.clone()));

// Increment per-address counter
if let Some(guard) = self.per_address.get(&addr_key) {
guard.val().fetch_add(1, Ordering::Relaxed);
} else {
self.per_address.insert(addr_key, AtomicU32::new(1));
}
self.total_messages.fetch_add(1, Ordering::Relaxed);
#[cfg(test)]
self.total_in_order.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -253,12 +281,13 @@ impl MessagesPool {
true
}
});
if result.is_some() {
if let Some(guard) = result {
log::debug!(
target: EXT_MESSAGES_TRACE_TARGET,
"complete_messages: removing external message {:x} with reason {} because can't postpone",
id, reason,
);
self.decrement_per_address(&guard.val().addr_key);
metrics::gauge!("ton_node_ext_messages_queue_size").decrement(1f64);
self.total_messages.fetch_sub(1, Ordering::Relaxed);
}
Expand All @@ -279,6 +308,15 @@ impl MessagesPool {
);
}

fn decrement_per_address(&self, addr_key: &(i32, UInt256)) {
if let Some(guard) = self.per_address.get(addr_key) {
let prev = guard.val().fetch_sub(1, Ordering::Relaxed);
if prev <= 1 {
self.per_address.remove(addr_key);
}
}
}

fn clear_expired_messages(&self, timestamp: u32, finish_time_ms: u64) -> bool {
let order = match self.order.get(&timestamp) {
Some(guard) => guard.val().clone(),
Expand All @@ -301,6 +339,7 @@ impl MessagesPool {
target: EXT_MESSAGES_TRACE_TARGET,
"removing external message {:x} because it is expired", guard.key()
);
self.decrement_per_address(&guard.val().addr_key);
self.total_messages.fetch_sub(1, Ordering::Relaxed);
}
#[cfg(test)]
Expand Down Expand Up @@ -342,8 +381,9 @@ pub struct MessagePoolIter {

impl MessagePoolIter {
fn new(pool: Arc<MessagesPool>, shard: ShardIdent, now: u32, finish_time_ms: u64) -> Self {
let timestamp = pool.min_timestamp.load(Ordering::Relaxed);
Self { pool, shard, now, timestamp, seqno: 0, finish_time_ms }
// Start from newest messages (now) and iterate backwards to oldest
// (matching C++ behavior where newest messages get priority)
Self { pool, shard, now, timestamp: now, seqno: 0, finish_time_ms }
}

fn find_in_map(
Expand Down Expand Up @@ -371,20 +411,33 @@ impl Iterator for MessagePoolIter {
type Item = (Arc<Message>, UInt256);

fn next(&mut self) -> Option<Self::Item> {
// iterate timestamp
let now = UnixTime::now_ms();
while self.timestamp <= self.now {
let min_timestamp = self.pool.min_timestamp.load(Ordering::Relaxed);
// Iterate from newest to oldest (reverse chronological order)
loop {
if self.finish_time_ms < now {
return None;
}
// check if this order map is expired
if self.timestamp < min_timestamp {
return None;
}
// Check if this timestamp is expired
if self.timestamp + MESSAGE_LIFETIME < self.now {
if !self.pool.clear_expired_messages(self.timestamp, self.finish_time_ms) {
if self.timestamp == min_timestamp {
if !self.pool.clear_expired_messages(self.timestamp, self.finish_time_ms) {
return None;
}
self.pool.increment_min_timestamp(self.timestamp);
}
// Skip expired timestamps
if self.timestamp == 0 {
return None;
}
// level was removed or not present try to move bottom margin
self.pool.increment_min_timestamp(self.timestamp);
} else if let Some(order) =
self.timestamp -= 1;
self.seqno = 0;
continue;
}
if let Some(order) =
self.pool.order.get(&self.timestamp).map(|guard| guard.val().clone())
{
while self.seqno < order.seqno.load(Ordering::Relaxed) {
Expand All @@ -398,10 +451,12 @@ impl Iterator for MessagePoolIter {
}
}
}
self.timestamp += 1;
if self.timestamp == 0 {
return None;
}
self.timestamp -= 1;
self.seqno = 0;
}
None
}
}

Expand Down
20 changes: 15 additions & 5 deletions src/node/src/tests/test_ext_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ fn test_create_ext_message() {
#[test]
fn test_message_keeper() {
let m = Message::with_ext_in_header(ExternalInboundMessageHeader::default());
let mk = MessageKeeper::new(Arc::new(m));
let mk = MessageKeeper::new(Arc::new(m), Default::default());

assert!(mk.check_active(10000));

Expand Down Expand Up @@ -133,7 +133,7 @@ fn test_message_keeper() {
#[test]
fn test_message_keeper_multithread() {
let m = Message::with_ext_in_header(ExternalInboundMessageHeader::default());
let mk = Arc::new(MessageKeeper::new(Arc::new(m)));
let mk = Arc::new(MessageKeeper::new(Arc::new(m), Default::default()));

let mut hs = vec![];
for _ in 0..50 {
Expand Down Expand Up @@ -173,11 +173,15 @@ fn test_message_keeper_multithread() {
}

fn create_external_message(dst_shard: u8, salt: Vec<u8>) -> Arc<Message> {
create_external_message_to([dst_shard; 32], salt)
}

fn create_external_message_to(dst_account: [u8; 32], salt: Vec<u8>) -> Arc<Message> {
let mut hdr = ExternalInboundMessageHeader::default();
let length_in_bits = salt.len() * 8;
let address = SliceData::from_raw(salt, length_in_bits);
hdr.src = MsgAddressExt::with_extern(address).unwrap();
hdr.dst = MsgAddressInt::with_standart(None, 0, [dst_shard; 32].into()).unwrap();
hdr.dst = MsgAddressInt::with_standart(None, 0, dst_account.into()).unwrap();
hdr.import_fee = 10u64.into();
Arc::new(Message::with_ext_in_header(hdr))
}
Expand Down Expand Up @@ -352,7 +356,10 @@ fn test_external_messages_big_load() {
let queue_seconds = min(MESSAGE_LIFETIME, 100);
for i in 0..queue_seconds {
for j in 0..rate_per_second {
let m = create_external_message(0, (i * rate_per_second + j).to_be_bytes().to_vec());
let idx = i * rate_per_second + j;
let mut dst = [0u8; 32];
dst[..4].copy_from_slice(&idx.to_be_bytes());
let m = create_external_message_to(dst, idx.to_be_bytes().to_vec());
let id = m.hash().unwrap();
mp.new_message(&id, m, now + i).unwrap();
}
Expand All @@ -369,6 +376,9 @@ fn test_external_messages_big_load() {
(count, n)
};
println!("count = {}, time = {:?}", count, n);
assert_eq!(0, count);
// With newest-first iteration, non-expired messages are found quickly.
// The pool contains messages from ~502 to ~601 seconds ago;
// those within MESSAGE_LIFETIME (600s) are valid and returned.
assert!(count <= 100);
assert!((n as u64) < limit * 3);
}
Loading
Loading