@@ -22,16 +22,17 @@ use ton_block::{
2222#[ path = "tests/test_ext_messages.rs" ]
2323mod tests;
2424
25- const MESSAGE_LIFETIME : u32 = 600 ; // seconds
26- const MESSAGE_MAX_GENERATIONS : u8 = 3 ;
27-
25+ const LIMIT_MEMPOOL_PER_ADDRESS : u32 = 256 ;
2826const MAX_EXTERNAL_MESSAGE_DEPTH : u16 = 512 ;
2927pub const MAX_EXTERNAL_MESSAGE_SIZE : usize = 65535 ;
28+ const MESSAGE_LIFETIME : u32 = 600 ; // seconds
29+ const MESSAGE_MAX_GENERATIONS : u8 = 3 ;
3030
3131pub const EXT_MESSAGES_TRACE_TARGET : & str = "ext_messages" ;
3232
3333#[ derive( Clone ) ]
3434struct MessageKeeper {
35+ addr_key : ( i32 , UInt256 ) ,
3536 message : Arc < Message > ,
3637
3738 // active: bool, 0x1_00_00000000
@@ -41,11 +42,11 @@ struct MessageKeeper {
4142}
4243
4344impl MessageKeeper {
44- fn new ( message : Arc < Message > ) -> Self {
45+ fn new ( message : Arc < Message > , addr_key : ( i32 , UInt256 ) ) -> Self {
4546 let mut atomic_storage = 0 ;
4647 Self :: set_active ( & mut atomic_storage, true ) ;
4748
48- Self { message, atomic_storage : Arc :: new ( AtomicU64 :: new ( atomic_storage) ) }
49+ Self { addr_key , message, atomic_storage : Arc :: new ( AtomicU64 :: new ( atomic_storage) ) }
4950 }
5051
5152 fn message ( & self ) -> & Arc < Message > {
@@ -139,6 +140,8 @@ impl OrderMap {
139140}
140141
141142pub struct MessagesPool {
143+ // per-address message count for rate limiting (key = (workchain_id, account_id))
144+ per_address : Map < ( i32 , UInt256 ) , AtomicU32 > ,
142145 // map by hash of message
143146 messages : Map < UInt256 , MessageKeeper > ,
144147 // map by timestamp, inside map by seqno for hash of message, workchain_id and prefix of dst address
@@ -160,6 +163,7 @@ impl MessagesPool {
160163 metrics:: gauge!( "ton_node_ext_messages_queue_size" ) . set ( 0f64 ) ;
161164
162165 Self {
166+ per_address : Map :: new ( ) ,
163167 messages : Map :: with_hasher ( Default :: default ( ) ) ,
164168 order : Map :: with_hasher ( Default :: default ( ) ) ,
165169 min_timestamp : AtomicU32 :: new ( now) ,
@@ -202,11 +206,35 @@ impl MessagesPool {
202206 }
203207 }
204208
205- log:: debug!( target: EXT_MESSAGES_TRACE_TARGET , "adding external message {:x}" , id) ;
206209 let workchain_id = message. dst_workchain_id ( ) . unwrap_or_default ( ) ;
210+ let account_id = message
211+ . int_dst_account_id ( )
212+ . map_or ( UInt256 :: default ( ) , |s| UInt256 :: from_slice ( & s. get_bytestring ( 0 ) ) ) ;
213+ let addr_key = ( workchain_id, account_id) ;
214+
215+ // Per-address rate limiting
216+ if let Some ( guard) = self . per_address . get ( & addr_key) {
217+ if guard. val ( ) . load ( Ordering :: Relaxed ) >= LIMIT_MEMPOOL_PER_ADDRESS {
218+ fail ! (
219+ "per-address limit ({}) reached for {}:{}" ,
220+ LIMIT_MEMPOOL_PER_ADDRESS ,
221+ workchain_id,
222+ addr_key. 1 . to_hex_string( )
223+ )
224+ }
225+ }
226+
227+ log:: debug!( target: EXT_MESSAGES_TRACE_TARGET , "adding external message {:x}" , id) ;
207228 let prefix =
208229 message. int_dst_account_id ( ) . map_or ( 0 , |slice| slice. get_int ( 64 ) . unwrap_or_default ( ) ) ;
209- self . messages . insert ( id. clone ( ) , MessageKeeper :: new ( message) ) ;
230+ self . messages . insert ( id. clone ( ) , MessageKeeper :: new ( message, addr_key. clone ( ) ) ) ;
231+
232+ // Increment per-address counter
233+ if let Some ( guard) = self . per_address . get ( & addr_key) {
234+ guard. val ( ) . fetch_add ( 1 , Ordering :: Relaxed ) ;
235+ } else {
236+ self . per_address . insert ( addr_key, AtomicU32 :: new ( 1 ) ) ;
237+ }
210238 self . total_messages . fetch_add ( 1 , Ordering :: Relaxed ) ;
211239 #[ cfg( test) ]
212240 self . total_in_order . fetch_add ( 1 , Ordering :: Relaxed ) ;
@@ -253,12 +281,13 @@ impl MessagesPool {
253281 true
254282 }
255283 } ) ;
256- if result . is_some ( ) {
284+ if let Some ( guard ) = result {
257285 log:: debug!(
258286 target: EXT_MESSAGES_TRACE_TARGET ,
259287 "complete_messages: removing external message {:x} with reason {} because can't postpone" ,
260288 id, reason,
261289 ) ;
290+ self . decrement_per_address ( & guard. val ( ) . addr_key ) ;
262291 metrics:: gauge!( "ton_node_ext_messages_queue_size" ) . decrement ( 1f64 ) ;
263292 self . total_messages . fetch_sub ( 1 , Ordering :: Relaxed ) ;
264293 }
@@ -279,6 +308,15 @@ impl MessagesPool {
279308 ) ;
280309 }
281310
311+ fn decrement_per_address ( & self , addr_key : & ( i32 , UInt256 ) ) {
312+ if let Some ( guard) = self . per_address . get ( addr_key) {
313+ let prev = guard. val ( ) . fetch_sub ( 1 , Ordering :: Relaxed ) ;
314+ if prev <= 1 {
315+ self . per_address . remove ( addr_key) ;
316+ }
317+ }
318+ }
319+
282320 fn clear_expired_messages ( & self , timestamp : u32 , finish_time_ms : u64 ) -> bool {
283321 let order = match self . order . get ( & timestamp) {
284322 Some ( guard) => guard. val ( ) . clone ( ) ,
@@ -301,6 +339,7 @@ impl MessagesPool {
301339 target: EXT_MESSAGES_TRACE_TARGET ,
302340 "removing external message {:x} because it is expired" , guard. key( )
303341 ) ;
342+ self . decrement_per_address ( & guard. val ( ) . addr_key ) ;
304343 self . total_messages . fetch_sub ( 1 , Ordering :: Relaxed ) ;
305344 }
306345 #[ cfg( test) ]
@@ -342,8 +381,9 @@ pub struct MessagePoolIter {
342381
343382impl MessagePoolIter {
344383 fn new ( pool : Arc < MessagesPool > , shard : ShardIdent , now : u32 , finish_time_ms : u64 ) -> Self {
345- let timestamp = pool. min_timestamp . load ( Ordering :: Relaxed ) ;
346- Self { pool, shard, now, timestamp, seqno : 0 , finish_time_ms }
384+ // Start from newest messages (now) and iterate backwards to oldest
385+ // (matching C++ behavior where newest messages get priority)
386+ Self { pool, shard, now, timestamp : now, seqno : 0 , finish_time_ms }
347387 }
348388
349389 fn find_in_map (
@@ -371,20 +411,33 @@ impl Iterator for MessagePoolIter {
371411 type Item = ( Arc < Message > , UInt256 ) ;
372412
373413 fn next ( & mut self ) -> Option < Self :: Item > {
374- // iterate timestamp
375414 let now = UnixTime :: now_ms ( ) ;
376- while self . timestamp <= self . now {
415+ let min_timestamp = self . pool . min_timestamp . load ( Ordering :: Relaxed ) ;
416+ // Iterate from newest to oldest (reverse chronological order)
417+ loop {
377418 if self . finish_time_ms < now {
378419 return None ;
379420 }
380- // check if this order map is expired
421+ if self . timestamp < min_timestamp {
422+ return None ;
423+ }
424+ // Check if this timestamp is expired
381425 if self . timestamp + MESSAGE_LIFETIME < self . now {
382- if !self . pool . clear_expired_messages ( self . timestamp , self . finish_time_ms ) {
426+ if self . timestamp == min_timestamp {
427+ if !self . pool . clear_expired_messages ( self . timestamp , self . finish_time_ms ) {
428+ return None ;
429+ }
430+ self . pool . increment_min_timestamp ( self . timestamp ) ;
431+ }
432+ // Skip expired timestamps
433+ if self . timestamp == 0 {
383434 return None ;
384435 }
385- // level was removed or not present try to move bottom margin
386- self . pool . increment_min_timestamp ( self . timestamp ) ;
387- } else if let Some ( order) =
436+ self . timestamp -= 1 ;
437+ self . seqno = 0 ;
438+ continue ;
439+ }
440+ if let Some ( order) =
388441 self . pool . order . get ( & self . timestamp ) . map ( |guard| guard. val ( ) . clone ( ) )
389442 {
390443 while self . seqno < order. seqno . load ( Ordering :: Relaxed ) {
@@ -398,10 +451,12 @@ impl Iterator for MessagePoolIter {
398451 }
399452 }
400453 }
401- self . timestamp += 1 ;
454+ if self . timestamp == 0 {
455+ return None ;
456+ }
457+ self . timestamp -= 1 ;
402458 self . seqno = 0 ;
403459 }
404- None
405460 }
406461}
407462
0 commit comments