Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/block/src/inbound_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,10 +470,15 @@ impl Augmentation<ImportFees> for InMsg {

fees.value_imported.coins = header.fwd_fee;
}
InMsg::DeferredFinal(_) => {
fees.fees_collected = header.fwd_fee;
InMsg::DeferredFinal(x) => {
let env = x.read_envelope_message()?;
if env.fwd_fee_remaining() != x.fwd_fee() {
fail!("fwd_fee_remaining not equal to fwd_fee")
}
fees.fees_collected = *env.fwd_fee_remaining();

fees.value_imported.coins = header.fwd_fee;
fees.value_imported = header.value.clone();
fees.value_imported.coins.add(env.fwd_fee_remaining())?;
}
InMsg::DeferredTransit(x) => {
let env = x.read_in_envelope_message()?;
Expand Down
2 changes: 2 additions & 0 deletions src/emulator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ impl Emulator {
let config = BlockchainConfig::with_config(self.config_params.clone())
.inspect_err(|err| log::error!("Failed to create BlockchainConfig: {err}"))?;

let dict_hash_min_cells = config.size_limits_config().acc_state_cells_for_storage_dict;
let executor: Box<dyn TransactionExecutor> = if in_msg_cell.is_some() {
Box::new(OrdinaryTransactionExecutor::new(config))
} else {
Expand Down Expand Up @@ -479,6 +480,7 @@ impl Emulator {
let elapsed_time = now.elapsed().as_micros() as i64;
let result = match result {
Ok(mut transaction) => {
account.update_storage_stat(dict_hash_min_cells).unwrap();
transaction.set_prev_trans_lt(shard_acc.last_trans_lt());
transaction.set_prev_trans_hash(shard_acc.last_trans_hash().clone());
let old_hash = shard_acc.account_hash();
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
4 changes: 4 additions & 0 deletions src/executor/src/blockchain_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ impl BlockchainConfig {
})
}

pub fn global_version(&self) -> u32 {
self.global_version
}

/// Get `MsgForwardPrices` for message forward fee calculation
pub fn get_fwd_prices(&self, is_masterchain: bool) -> &MsgForwardPrices {
if is_masterchain {
Expand Down
1 change: 1 addition & 0 deletions src/executor/src/ordinary_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ impl TransactionExecutor for OrdinaryTransactionExecutor {
in_msg: Some(in_msg.clone()),
incoming_value: msg_balance.clone(),
storage_fees_collected,
due_payment: account.due_payment().map_or(0, Coins::as_u128),
config_params,
prev_blocks_info: params.prev_blocks_info.clone(),
..Default::default()
Expand Down
25 changes: 7 additions & 18 deletions src/executor/src/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ pub fn execute_params(last_tr_lt: u64) -> ExecuteParams {
Simple,
Emulator,
}
let debug = DebugType::Emulator;
let debug = DebugType::None;
let _ = cross_check::DisableCrossCheck::new();
let (verbosity, pattern, trace_callback) = match debug {
DebugType::None => (4, None, None),
Expand Down Expand Up @@ -458,6 +458,7 @@ pub fn execute_with_params(
} else {
mc_state_proof
};
let block_version = config.global_version();
let dict_hash_min_cells = config.size_limits_config().acc_state_cells_for_storage_dict;
let executor: Box<dyn TransactionExecutor> = if in_msg_cell.is_none() {
let tt = acc.get_tick_tock().unwrap();
Expand All @@ -474,7 +475,11 @@ pub fn execute_with_params(
let acc_before = acc.clone();
let trans = executor.execute_with_params(in_msg_cell.clone(), acc, params.clone());
if trans.is_ok() {
acc.update_storage_stat(dict_hash_min_cells).unwrap();
if block_version < 11 {
acc.del_storage_stat();
} else {
acc.update_storage_stat(dict_hash_min_cells).unwrap();
}
}
#[cfg(feature = "cross_check")]
cross_check::cross_check(
Expand Down Expand Up @@ -922,22 +927,6 @@ pub fn replay_transaction(
transaction.read_description().unwrap()
);

let block_version = extra.config.global_version();
if block_version < 11 {
account.del_storage_stat();
} else {
account
.update_storage_stat(
mc.read_custom()
.unwrap()
.unwrap()
.config()
.size_limits_config()
.unwrap()
.acc_state_cells_for_storage_dict,
)
.unwrap();
}
// account.write_to_file(acc_after).unwrap();
let new_hash = account.serialize().unwrap().repr_hash();
// let hash_update = ton_block::HashUpdate::with_hashes(old_hash.clone(), new_hash.clone());
Expand Down
63 changes: 59 additions & 4 deletions src/executor/src/tests/test_transaction_executor_with_real_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ use crate::blockchain_config::BlockchainConfig;
use pretty_assertions::assert_eq;
use std::io::{BufRead, BufReader};
use ton_block::{
read_single_root_boc, Account, AccountStorage, Block, Cell, ConfigParams, CurrencyCollection,
Deserializable, Message, MsgAddressInt, Result, Serializable, StateInit, StorageInfo,
Transaction, UnixTime,
base64_decode, read_single_root_boc, Account, AccountStorage, Block, Cell, ConfigParams,
CurrencyCollection, Deserializable, Message, MsgAddressInt, Result, Serializable, ShardAccount,
StateInit, StorageInfo, TrComputePhase, Transaction, UnixTime,
};

mod common;
Expand Down Expand Up @@ -92,7 +92,7 @@ fn many_replay_contract_by_files(
let descr = transaction.read_description()?;
let compute_ph = descr.compute_phase_ref().expect("no compute phase");
match compute_ph {
ton_block::TrComputePhase::Vm(vm) => {
TrComputePhase::Vm(vm) => {
let steps = vm.vm_steps;
let gas = vm.gas_used;
result.push((idx, contents.len(), steps, gas));
Expand Down Expand Up @@ -539,6 +539,26 @@ fn test_size_limits_v12() {
)
}

#[test]
fn test_due_payment_in_smc() {
replay_transaction_by_files(
"real_boc/due_payment_in_smc_account_old.boc",
"real_boc/due_payment_in_smc_account_new.boc",
"real_boc/due_payment_in_smc_transaction.boc",
"real_boc/config12.boc",
)
}

#[test]
fn test_fwd_fee_payment_in_smc() {
replay_transaction_by_files(
"real_boc/fwd_fee_payment_in_smc_account_old.boc",
"real_boc/fwd_fee_payment_in_smc_account_new.boc",
"real_boc/fwd_fee_payment_in_smc_transaction.boc",
"real_boc/config12.boc",
)
}

#[ignore = "test for replay transaction by files"]
#[test]
fn test_replay_transaction_by_files() {
Expand Down Expand Up @@ -595,3 +615,38 @@ fn test_revert_action_phase() {
answer.set_last_tr_time(account.last_tr_time().unwrap_or(0));
assert_eq!(answer, account);
}

#[ignore]
#[test]
fn test_bad_single() {
replay_transaction_by_files(
"real_boc/bad_account_old.boc",
"real_boc/bad_account_new.boc",
"real_boc/bad_transaction.boc",
"real_boc/config12.boc",
)
}

#[ignore]
#[test]
fn test_bad_trans() {
let json = "../../emulator/emulator_test.json";
let prefix = "real_boc/bad_".to_string();
let json = std::fs::read_to_string(json).unwrap();
let json: serde_json::Map<String, serde_json::Value> = serde_json::from_str(&json).unwrap();
let acc = json["shard_account_boc"].as_str().unwrap();
let tr = json["tx_boc"].as_str().unwrap();
let prev = json["prev_blocks_info_boc"].as_str().unwrap();
let shard_acc = ShardAccount::construct_from_base64(acc).unwrap();
shard_acc.account_cell().write_to_file(prefix.clone() + "account_old.boc");
shard_acc.account_cell().write_to_file(prefix.clone() + "account_new.boc");
std::fs::write(prefix.clone() + "transaction.boc", base64_decode(tr).unwrap()).unwrap();

replay_transaction_with_prevs(
acc,
&(prefix + "account_new.boc"),
tr,
"real_boc/config12.boc",
prev,
);
}
1 change: 1 addition & 0 deletions src/executor/src/tick_tock_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl TransactionExecutor for TickTockTransactionExecutor {
balance: acc_balance.clone(),
config_params,
storage_fees_collected,
due_payment: account.due_payment().map_or(0, Coins::as_u128),
prev_blocks_info: params.prev_blocks_info.clone(),
..Default::default()
};
Expand Down
72 changes: 51 additions & 21 deletions src/node/src/validator/collator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -786,6 +786,7 @@ struct ExecutionManager {
config: BlockchainConfig,
prev_blocks_info: PrevBlocksInfo,
engine: Arc<dyn EngineOperations>,
cancel_ext: Arc<AtomicBool>,
}

impl ExecutionManager {
Expand Down Expand Up @@ -824,19 +825,10 @@ impl ExecutionManager {
mc_data.state.shard_state_extra()?.prev_blocks.clone(),
),
engine,
cancel_ext: Arc::new(AtomicBool::new(false)),
})
}

// waits and finalizes all parallel tasks
pub async fn wait_transactions(&mut self, collator_data: &mut CollatorData) -> Result<()> {
log::trace!("{}: wait_transactions", self.collated_block_descr);
while self.wait_tr.count() > 0 {
self.wait_transaction(collator_data).await?;
}
self.min_lt.fetch_max(self.max_lt.load(Ordering::Relaxed), Ordering::Relaxed);
Ok(())
}

// checks if a number of parallel transactilns is not too big, waits and finalizes some if needed.
pub async fn check_parallel_transactions(
&mut self,
Expand Down Expand Up @@ -910,6 +902,7 @@ impl ExecutionManager {
let engine = self.engine.clone();
let dict_hash_min_cells = self.config.size_limits_config().acc_state_cells_for_storage_dict;
let lt_compatible = self.lt_compatible;
let cancel_ext = self.cancel_ext.clone();
let handle = tokio::spawn(async move {
let lt = lt.max(min_lt.load(Ordering::Relaxed));
let full_collated_data = config.has_capability(GlobalCapabilities::CapFullCollatedData);
Expand All @@ -931,10 +924,24 @@ impl ExecutionManager {
collated_block_descr,
shard_acc.account_id()
);
let config = config.clone(); // TODO: use Arc
if cancel_ext.load(Ordering::Relaxed) {
log::debug!(
"{}: account {:x} ext message cancelled by cutoff timeout",
collated_block_descr,
shard_acc.account_id()
);
let transaction_res = Err(error!("cancelled by cutoff timeout"));
wait_tr.respond(Some((new_msg, msg_metadata, transaction_res)));
continue;
}

shard_acc.fetch_max_lt(min_lt.load(Ordering::Relaxed));
let config = config.clone(); // TODO: use Arc

let mut min_lt = min_lt.load(Ordering::Relaxed);
if let AsyncMessage::Deferred(enq) = &*new_msg {
min_lt = min_lt.max(enq.emitted_lt().saturating_add(1));
};
shard_acc.fetch_max_lt(min_lt);
let mut account = shard_acc.account().clone();

let params = ExecuteParams {
Expand Down Expand Up @@ -3295,7 +3302,7 @@ impl Collator {
let account_id = collator_data.config.raw_config().config_addr.clone();
self.create_ticktock_transaction(account_id, tock, prev_data, collator_data, exec_manager)
.await?;
exec_manager.wait_transactions(collator_data).await?;
self.wait_transactions(exec_manager, collator_data, false).await?;
Ok(())
}

Expand Down Expand Up @@ -3373,7 +3380,7 @@ impl Collator {
)
.await?;

exec_manager.wait_transactions(collator_data).await?;
self.wait_transactions(exec_manager, collator_data, false).await?;

Ok(())
}
Expand Down Expand Up @@ -3485,7 +3492,7 @@ impl Collator {
if to_us {
let account_id = enq.dst_account_id().clone();
log::debug!(
"{}: message {:x} sent to execution to account {account_id:x}",
"{}: internal message {:x} sent to execution to account {account_id:x}",
self.collated_block_descr,
key.hash,
);
Expand Down Expand Up @@ -3604,9 +3611,8 @@ impl Collator {
}
let (_, account_id) = header.dst.extract_std_address(true)?;
log::debug!(
"{}: message {:x} sent to execution",
"{}: external message {msg_id:x} sent to execution to account {account_id:x}",
self.collated_block_descr,
msg_id
);
let msg = AsyncMessage::Ext(msg, msg_cell, msg_id);
let initiator_addr =
Expand All @@ -3626,7 +3632,7 @@ impl Collator {
}
self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data).await?;
self.wait_transactions(exec_manager, collator_data, true).await?;
let accepted = mem::take(&mut collator_data.accepted_ext_messages);
let rejected = mem::take(&mut collator_data.rejected_ext_messages);
self.engine.complete_external_messages(rejected, accepted)?;
Expand Down Expand Up @@ -3700,11 +3706,11 @@ impl Collator {
}
Ok(None)
} else {
collator_data.update_last_proc_int_msg((msg.enq.lt(), msg_hash.clone()))?;
log::debug!(
"{}: new message {msg_hash:x} sent to execution",
"{}: new message {msg_hash:x} sent to execution to account {account_id:x}",
self.collated_block_descr,
);
collator_data.update_last_proc_int_msg((msg.enq.lt(), msg_hash.clone()))?;
let msg = if from_dispatch_queue {
AsyncMessage::Deferred(msg.enq)
} else {
Expand Down Expand Up @@ -3749,13 +3755,37 @@ impl Collator {
}
self.check_stop_flag()?;
}
exec_manager.wait_transactions(collator_data).await?;
self.wait_transactions(exec_manager, collator_data, false).await?;
self.check_stop_flag()?;
}

Ok(())
}

// waits and finalizes all parallel tasks
async fn wait_transactions(
&self,
exec_manager: &mut ExecutionManager,
collator_data: &mut CollatorData,
allow_cancel: bool,
) -> Result<()> {
log::trace!("{}: wait_transactions", self.collated_block_descr);
while exec_manager.wait_tr.count() != 0 {
if allow_cancel && self.check_cutoff_timeout() {
log::warn!(
"{}: TIMEOUT is elapsed, cancelling remaining external messages",
self.collated_block_descr
);
exec_manager.cancel_ext.store(true, Ordering::Relaxed);
}
exec_manager.wait_transaction(collator_data).await?;
}
exec_manager
.min_lt
.fetch_max(exec_manager.max_lt.load(Ordering::Relaxed), Ordering::Relaxed);
Ok(())
}

fn update_processed_upto(
&self,
mc_data: &McData,
Expand Down
10 changes: 8 additions & 2 deletions src/node/src/validator/validate_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3307,7 +3307,7 @@ impl ValidateQuery {
};
let (workchain_id, addr) = dst.extract_std_address(true).map_err(|err| {
error!(
"destination of inbound internal message with hash {key:x} \
"destination {dst} of inbound internal message with hash {key:x} \
is an invalid blockchain address {err}"
)
})?;
Expand Down Expand Up @@ -3495,9 +3495,15 @@ impl ValidateQuery {
}

if from_dispatch_queue {
let (_, addr) = src.extract_std_address(true).map_err(|err| {
error!(
"source {src} of deferred inbound message with hash {key:x} \
is an invalid blockchain address {err}"
)
})?;
// Check that the message was removed from DispatchQueue
let Some(dispatched_msg_env_cell) =
base.removed_dispatch_queue_messages(src.address(), created_lt)
base.removed_dispatch_queue_messages(&addr, created_lt)
else {
reject_query!(
"deferred InMsg with src_addr={addr:x} lt={created_lt} was not removed from \
Expand Down
Loading