Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 15 additions & 7 deletions src/adnl/raptorq/src/base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ impl EncodingPacket {
#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
pub struct ObjectTransmissionInformation {
transfer_length: u64, // Limited to u40
symbol_size: u16,
symbol_size: u32,
num_source_blocks: u8,
num_sub_blocks: u16,
symbol_alignment: u8,
Expand All @@ -113,14 +113,14 @@ pub struct ObjectTransmissionInformation {
impl ObjectTransmissionInformation {
pub fn new(
transfer_length: u64,
symbol_size: u16,
symbol_size: u32,
source_blocks: u8,
sub_blocks: u16,
alignment: u8,
) -> ObjectTransmissionInformation {
// See errata (https://www.rfc-editor.org/errata/eid5548)
assert!(transfer_length <= 942574504275);
assert_eq!(symbol_size % alignment as u16, 0);
assert_eq!(symbol_size % alignment as u32, 0);
// See section 4.4.1.2. "These parameters MUST be set so that ceil(ceil(F/T)/Z) <= K'_max."

if (symbol_size != 0) && (source_blocks != 0) {
Expand All @@ -140,20 +140,28 @@ impl ObjectTransmissionInformation {
}
}

#[deprecated(note = "\
RFC 5052 wire format, only encodes symbol_size as u16. \
Not used in TON — TL serialization is used instead\
")]
pub fn deserialize(data: &[u8; 12]) -> ObjectTransmissionInformation {
ObjectTransmissionInformation {
transfer_length: ((data[0] as u64) << 32)
+ ((data[1] as u64) << 24)
+ ((data[2] as u64) << 16)
+ ((data[3] as u64) << 8)
+ (data[4] as u64),
symbol_size: ((data[6] as u16) << 8) + data[7] as u16,
symbol_size: ((data[6] as u32) << 8) + data[7] as u32,
num_source_blocks: data[8],
num_sub_blocks: ((data[9] as u16) << 8) + data[10] as u16,
symbol_alignment: data[11],
}
}

#[deprecated(note = "\
RFC 5052 wire format, only encodes symbol_size as u16. \
Not used in TON — TL serialization is used instead\
")]
pub fn serialize(&self) -> [u8; 12] {
[
((self.transfer_length >> 32) & 0xFF) as u8,
Expand All @@ -175,7 +183,7 @@ impl ObjectTransmissionInformation {
self.transfer_length
}

pub fn symbol_size(&self) -> u16 {
pub fn symbol_size(&self) -> u32 {
self.symbol_size
}

Expand All @@ -196,9 +204,9 @@ impl ObjectTransmissionInformation {
max_packet_size: u16,
decoder_memory_requirement: u64,
) -> ObjectTransmissionInformation {
let alignment = 8;
let alignment: u16 = 8;
assert!(max_packet_size >= alignment);
let symbol_size = max_packet_size - (max_packet_size % alignment);
let symbol_size = (max_packet_size - (max_packet_size % alignment)) as u32;
let sub_symbol_size = 8;

let kt = int_div_ceil(transfer_length, symbol_size as u64);
Expand Down
16 changes: 7 additions & 9 deletions src/adnl/raptorq/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Decoder {
#[cfg_attr(feature = "serde_support", derive(Serialize, Deserialize))]
pub struct SourceBlockDecoder {
source_block_id: u8,
symbol_size: u16,
symbol_size: u32,
num_sub_blocks: u16,
symbol_alignment: u8,
source_block_symbols: u32,
Expand All @@ -137,7 +137,7 @@ impl SourceBlockDecoder {
note = "Use the new2() function instead. In version 2.0, that function will replace this one"
)]
#[cfg(feature = "std")]
pub fn new(source_block_id: u8, symbol_size: u16, block_length: u64) -> SourceBlockDecoder {
pub fn new(source_block_id: u8, symbol_size: u32, block_length: u64) -> SourceBlockDecoder {
let config = ObjectTransmissionInformation::new(0, symbol_size, 0, 1, 1);
SourceBlockDecoder::new2(source_block_id, &config, block_length)
}
Expand Down Expand Up @@ -175,10 +175,8 @@ impl SourceBlockDecoder {
}

fn unpack_sub_blocks(&self, result: &mut [u8], symbol: &Symbol, symbol_index: usize) {
let (tl, ts, nl, ns) = partition(
(self.symbol_size / self.symbol_alignment as u16) as u32,
self.num_sub_blocks,
);
let (tl, ts, nl, ns) =
partition(self.symbol_size / self.symbol_alignment as u32, self.num_sub_blocks);

let mut symbol_offset = 0;
let mut sub_block_offset = 0;
Expand Down Expand Up @@ -283,7 +281,7 @@ impl SourceBlockDecoder {

let mut encoded_indices = vec![];
// See section 5.3.3.4.2. There are S + H zero symbols to start the D vector
let mut d = vec![Symbol::zero(self.symbol_size); s + h];
let mut d = vec![Symbol::zero(self.symbol_size as usize); s + h];
for (i, source) in self.source_symbols.iter().enumerate() {
if let Some(symbol) = source {
encoded_indices.push(i as u32);
Expand All @@ -294,7 +292,7 @@ impl SourceBlockDecoder {
// Append the extended padding symbols
for i in self.source_block_symbols..num_extended_symbols {
encoded_indices.push(i);
d.push(Symbol::zero(self.symbol_size));
d.push(Symbol::zero(self.symbol_size as usize));
}

for repair_packet in self.repair_packets.iter() {
Expand Down Expand Up @@ -328,7 +326,7 @@ impl SourceBlockDecoder {
sys_index: u32,
p1: u32,
) -> Symbol {
let mut rebuilt = Symbol::zero(self.symbol_size);
let mut rebuilt = Symbol::zero(self.symbol_size as usize);
let tuple = intermediate_tuple(source_symbol_id, lt_symbols, sys_index, p1);

for i in enc_indices(tuple, lt_symbols, pi_symbols, p1) {
Expand Down
6 changes: 3 additions & 3 deletions src/adnl/raptorq/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl SourceBlockEncoder {
since = "1.3.0",
note = "Use the new2() function instead. In version 2.0, that function will replace this one"
)]
pub fn new(source_block_id: u8, symbol_size: u16, data: &[u8]) -> SourceBlockEncoder {
pub fn new(source_block_id: u8, symbol_size: u32, data: &[u8]) -> SourceBlockEncoder {
let config = ObjectTransmissionInformation::new(0, symbol_size, 0, 1, 1);
SourceBlockEncoder::new2(source_block_id, &config, data)
}
Expand All @@ -196,7 +196,7 @@ impl SourceBlockEncoder {
if config.sub_blocks() > 1 {
let mut symbols = vec![vec![]; data.len() / config.symbol_size() as usize];
let (tl, ts, nl, ns) = partition(
(config.symbol_size() / config.symbol_alignment() as u16) as u32,
config.symbol_size() / config.symbol_alignment() as u32,
config.sub_blocks(),
);
// Divide the block into sub-blocks and then concatenate the sub-symbols into symbols
Expand Down Expand Up @@ -247,7 +247,7 @@ impl SourceBlockEncoder {
)]
pub fn with_encoding_plan(
source_block_id: u8,
symbol_size: u16,
symbol_size: u32,
data: &[u8],
plan: &SourceBlockEncodingPlan,
) -> SourceBlockEncoder {
Expand Down
5 changes: 1 addition & 4 deletions src/adnl/raptorq/src/octets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,10 +633,7 @@ unsafe fn store_neon(ptr: *mut uint8x16_t, value: uint8x16_t) {
#[cfg(target_arch = "arm")]
use std::arch::arm::*;

// TODO: replace with vst1q_u8 when it's supported
let reinterp = vreinterpretq_u64_u8(value);
*(ptr as *mut u64) = vgetq_lane_u64(reinterp, 0);
*(ptr as *mut u64).add(1) = vgetq_lane_u64(reinterp, 1);
vst1q_u8(ptr as *mut u8, value);
}

#[cfg(all(target_arch = "aarch64", feature = "std"))]
Expand Down
7 changes: 1 addition & 6 deletions src/adnl/src/overlay/broadcast.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,8 +1675,6 @@ pub(crate) struct BroadcastTwostepFecProtocol {
}

impl BroadcastTwostepFecProtocol {
const MAX_PART_SIZE: usize = 65536;

pub(crate) fn for_recv() -> Self {
Self { extra: None, send_ctx: None }
}
Expand All @@ -1690,9 +1688,6 @@ impl BroadcastTwostepFecProtocol {
}
let k = ((neighbours as usize) * 2 - 2) / 3;
let part_size = (data.len() + k - 1) / k;
if part_size >= Self::MAX_PART_SIZE {
fail!("Too big part size {part_size} in {} broadcast", Self::broadcast_type());
}
let ctx = BroadcastTwostepSendContext { neighbours, part_size };
Ok(Self { extra: Some(extra), send_ctx: Some(ctx) })
}
Expand Down Expand Up @@ -1819,7 +1814,7 @@ impl BroadcastProtocol<BroadcastTwostepFec> for BroadcastTwostepFecProtocol {
bcast_id: *bcast_id,
data_hash: sha256_digest(ctx.data.object),
date,
encoder: RaptorqEncoder::with_data(ctx.data.object, Some(send_ctx.part_size as u16)),
encoder: RaptorqEncoder::with_data(ctx.data.object, Some(send_ctx.part_size as u32)),
extra: self.extra.take().unwrap_or_default(),
flags: ctx.flags,
seqno: 0,
Expand Down
17 changes: 8 additions & 9 deletions src/adnl/src/rldp/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,8 @@ impl RaptorqDecoder {
/// Construct with parameters
pub fn with_params(params: FecTypeRaptorQ) -> Result<Self> {
const MAX_SOURCE_SYMBOLS: i32 = 56_403; // K'_max per RFC 6330 §5.1.2
if (params.symbol_size <= 0) || (params.symbol_size > u16::MAX as i32) {
fail!(
"Invalid FEC params: symbol_size must be in 1..={}, got {}",
u16::MAX,
params.symbol_size
);
if params.symbol_size <= 0 {
fail!("Invalid FEC params: symbol_size must be > 0, got {}", params.symbol_size);
}
if params.data_size <= 0 {
fail!("Invalid FEC params: data_size must be > 0, got {}", params.data_size);
Expand All @@ -77,12 +73,15 @@ impl RaptorqDecoder {
} else {
// Two-step broadcast case: symbol_size was set by the sender without alignment
// rounding (alignment=1). Use the same config as the encoder.
// symbol_size can exceed u16::MAX for large blocks with few validators
// (matches C++ behaviour which uses size_t for symbol_size).
//
// With source_blocks=1, raptorq asserts ceil(data_size/symbol_size) <=
// MAX_SOURCE_SYMBOLS_PER_BLOCK (K'_max = 56403, RFC 6330 §5.1.2).
// Validate before calling to prevent a panic on malformed network messages.
let source_symbols = (params.data_size + params.symbol_size - 1) / params.symbol_size;
if source_symbols > MAX_SOURCE_SYMBOLS {
let source_symbols = (params.data_size as i64 + params.symbol_size as i64 - 1)
/ params.symbol_size as i64;
if source_symbols > MAX_SOURCE_SYMBOLS as i64 {
fail!(
"Invalid FEC params: source symbol count {source_symbols} \
exceeds raptorq limit {MAX_SOURCE_SYMBOLS} (data_size={}, symbol_size={})",
Expand All @@ -92,7 +91,7 @@ impl RaptorqDecoder {
}
raptorq::ObjectTransmissionInformation::new(
params.data_size as u64,
params.symbol_size as u16,
params.symbol_size as u32,
1,
1,
1,
Expand Down
6 changes: 3 additions & 3 deletions src/adnl/src/rldp/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub struct RaptorqEncoder {

impl RaptorqEncoder {
/// Construct over data
pub fn with_data(data: &[u8], symbol: Option<u16>) -> Self {
pub fn with_data(data: &[u8], symbol: Option<u32>) -> Self {
let engine =
if let Some(symbol_size) = symbol.filter(|&s| s as usize != Constraints::SYMBOL) {
// symbol_size is set for the two-step FEC broadcast case where data is
Expand All @@ -59,8 +59,8 @@ impl RaptorqEncoder {
// and k is bounded by the neighbour count (typically a few dozen, at most
// ~130), which is far below K'_max = 56403 (RFC 6330 max symbols per block).
//
// sub_blocks=1: the data fits comfortably in memory since part_size < 65536
// and kt <= k << K'_max, so no sub-block splitting is needed.
// sub_blocks=1: the data fits comfortably in memory since
// kt <= k << K'_max, so no sub-block splitting is needed.
let config = raptorq::ObjectTransmissionInformation::new(
data.len() as u64,
symbol_size,
Expand Down
93 changes: 92 additions & 1 deletion src/adnl/tests/test_overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1627,7 +1627,7 @@ async fn test_overlay_semiprivate() -> Result<()> {
fn test_overlay_raptorq() {
use rand::{seq::SliceRandom, SeedableRng};

fn run(symbol: Option<u16>) {
fn run(symbol: Option<u32>) {
let seed: u64 = rand::random();
println!("test_overlay_raptorq seed: {seed}");
let mut rng = rand::rngs::StdRng::seed_from_u64(seed);
Expand Down Expand Up @@ -1778,3 +1778,94 @@ fn test_overlay_raptorq() {
println!("--- symbol=Some(771) (alignment=1) ---");
run(Some(771));
}

/// Test that RaptorQ encode/decode works with symbol_size > 65535 (u16 limit).
/// This matches the C++ behaviour where symbol_size is size_t.
/// Simulates TwostepFec for 800KB data with 10 parties:
/// k = (10*2-2)/3 = 6, part_size = ceil(819200/6) = 136534
#[test]
fn test_raptorq_large_symbol_size() {
use adnl::{RaptorqDecoder, RaptorqEncoder};
use rand::Rng;

const DATA_SIZE: usize = 800 * 1024;

for num_parties in [5u32, 10, 20] {
let k = ((num_parties as usize) * 2 - 2) / 3;
let part_size = (DATA_SIZE + k - 1) / k;

println!(
"--- parties={num_parties}, k={k}, part_size={part_size} (>65535: {}) ---",
part_size > 65535
);

// Generate random data
let mut rng = rand::thread_rng();
let data: Vec<u8> = (0..DATA_SIZE).map(|_| rng.gen()).collect();

// Encode with large symbol_size
let mut encoder = RaptorqEncoder::with_data(&data, Some(part_size as u32));
let params = encoder.params().clone();
println!(
" params: data_size={}, symbol_size={}, symbols_count={}",
params.data_size, params.symbol_size, params.symbols_count
);
assert_eq!(params.data_size, DATA_SIZE as i32);
assert_eq!(params.symbol_size, part_size as i32);

// Collect source + repair symbols
let source_count = params.symbols_count as usize;
let mut packets: Vec<(u32, Vec<u8>)> = Vec::new();
let mut seqno = 0u32;
for _ in 0..(source_count * 3 / 2) {
let chunk = encoder.encode(&mut seqno).unwrap();
packets.push((seqno, chunk));
seqno += 1;
}
assert!(
packets.len() >= source_count,
"Not enough packets generated: {} < {source_count}",
packets.len()
);

// Decode using exactly source_count symbols (minimum required)
let mut decoder = RaptorqDecoder::with_params(params.clone())
.expect("decoder creation must succeed for large symbol_size");

let mut decoded = None;
for (seq, chunk) in packets.iter().take(source_count + 2) {
if let Some(result) = decoder.decode(*seq, chunk) {
decoded = Some(result);
break;
}
}

let result = decoded.expect("decode must succeed");
assert_eq!(result.len(), DATA_SIZE, "decoded size mismatch");
assert_eq!(result, data, "decoded data mismatch");
println!(" OK: encode/decode verified for symbol_size={part_size}");
}
}

/// Verify that the RaptorQ encoder produces valid FEC symbols for large
/// part_size values (>65535) that match the C++ TwostepFec behaviour.
#[test]
fn test_twostep_fec_encoder_large_symbols() {
let data = vec![0x42u8; 800 * 1024];
for neighbours in [5u32, 10, 20] {
let k = ((neighbours as usize) * 2 - 2) / 3;
let part_size = (data.len() + k - 1) / k;

let mut encoder = RaptorqEncoder::with_data(&data, Some(part_size as u32));
let params = encoder.params().clone();
assert_eq!(params.data_size, data.len() as i32);
assert_eq!(params.symbol_size, part_size as i32);

// Generate one symbol per neighbour (like broadcast-twostep.cpp)
let mut seqno = 0u32;
for _ in 0..neighbours {
let chunk = encoder.encode(&mut seqno).unwrap();
assert_eq!(chunk.len(), part_size, "symbol size mismatch");
}
}
}
4 changes: 4 additions & 0 deletions src/node/tests/compat_test/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,7 @@ path = "tests/test_quic_transport.rs"
[[test]]
name = "test_quic_overlay"
path = "tests/test_quic_overlay.rs"

[[test]]
name = "test_raptorq"
path = "tests/test_raptorq.rs"
7 changes: 7 additions & 0 deletions src/node/tests/compat_test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@ BUILD_DIR := $(ROOT_DIR)/build
CPP_BUILD_DIR ?= $(BUILD_DIR)/cpp
RUST_TARGET_DIR := $(BUILD_DIR)/rust

# Resolve CPP_SRC_PATH to absolute to avoid breakage when CMake runs from build dir
CPP_SRC_PATH_ORIG := $(CPP_SRC_PATH)
CPP_SRC_PATH := $(shell cd "$(CPP_SRC_PATH)" 2>/dev/null && pwd)
ifeq ($(CPP_SRC_PATH),)
$(error CPP_SRC_PATH "$(CPP_SRC_PATH_ORIG)" is not a valid directory)
endif

# C++ source files location
CPP_TEST_SRC := $(ROOT_DIR)/cpp_src

Expand Down
Loading