Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions forester-utils/src/address_staging_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl AddressStagingTree {
low_element_next_values: &[[u8; 32]],
low_element_indices: &[u64],
low_element_next_indices: &[u64],
low_element_proofs: &[Vec<[u8; 32]>],
low_element_proofs: &[[[u8; 32]; HEIGHT]],
leaves_hashchain: [u8; 32],
zkp_batch_size: usize,
epoch: u64,
Expand All @@ -145,15 +145,12 @@ impl AddressStagingTree {
let inputs = get_batch_address_append_circuit_inputs::<HEIGHT>(
next_index,
old_root,
low_element_values.to_vec(),
low_element_next_values.to_vec(),
low_element_indices.iter().map(|v| *v as usize).collect(),
low_element_next_indices
.iter()
.map(|v| *v as usize)
.collect(),
low_element_proofs.to_vec(),
addresses.to_vec(),
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
&mut self.sparse_tree,
leaves_hashchain,
zkp_batch_size,
Expand Down
90 changes: 74 additions & 16 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
indexer::{AddressQueueData, Indexer, QueueElementsV2Options, StateQueueData},
rpc::Rpc,
};
use light_hasher::hash_chain::create_hash_chain_from_slice;

use crate::processor::v2::{common::clamp_to_u16, BatchContext};

Expand All @@ -22,6 +23,17 @@
}
}

#[derive(Debug, Clone)]
pub struct AddressBatchSnapshot<const HEIGHT: usize> {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
pub low_element_proofs: Vec<[[u8; 32]; HEIGHT]>,
pub leaves_hashchain: [u8; 32],
}

pub async fn fetch_zkp_batch_size<R: Rpc>(context: &BatchContext<R>) -> crate::Result<u64> {
let rpc = context.rpc_pool.get_connection().await?;
let mut account = rpc
Expand Down Expand Up @@ -474,20 +486,71 @@
}
}

pub fn get_batch_data(&self, start: usize, end: usize) -> Option<BatchDataSlice> {
pub fn get_batch_snapshot<const HEIGHT: usize>(
&self,
start: usize,
end: usize,
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
let available = self.wait_for_batch(end);
if start >= available {
return None;
if available < end || start >= end {
return Ok(None);
}
let actual_end = end.min(available);
let actual_end = end;
let data = lock_recover(&self.data, "streaming_address_queue.data");
Some(BatchDataSlice {
addresses: data.addresses[start..actual_end].to_vec(),

for (name, len) in [

Check warning on line 502 in forester/src/processor/v2/helpers.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/helpers.rs

Check warning on line 502 in forester/src/processor/v2/helpers.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/helpers.rs
("addresses", data.addresses.len()),
("low_element_values", data.low_element_values.len()),
("low_element_next_values", data.low_element_next_values.len()),
("low_element_indices", data.low_element_indices.len()),
("low_element_next_indices", data.low_element_next_indices.len()),
] {
if len < actual_end {
return Err(anyhow!(
"incomplete batch data: {} len {} < required end {}",
name,
len,
actual_end
));
}
}

let addresses = data.addresses[start..actual_end].to_vec();
if addresses.is_empty() {
return Err(anyhow!("Empty batch at start={}", start));
}

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
None => {
tracing::debug!(
"Missing leaves_hash_chain for batch {} (available: {}), deriving from addresses",
hashchain_idx,
data.leaves_hash_chains.len()
);
create_hash_chain_from_slice(&addresses).map_err(|error| {
anyhow!(
"Failed to derive leaves_hash_chain for batch {} from {} addresses: {}",
hashchain_idx,
addresses.len(),
error
)
})?
}
};

Ok(Some(AddressBatchSnapshot {
low_element_values: data.low_element_values[start..actual_end].to_vec(),
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),

Check warning on line 545 in forester/src/processor/v2/helpers.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/helpers.rs

Check warning on line 545 in forester/src/processor/v2/helpers.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/helpers.rs
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
})
low_element_proofs: data.reconstruct_proofs::<HEIGHT>(start..actual_end).map_err(
|error| anyhow!("incomplete batch data: failed to reconstruct proofs: {error}"),
)?,
addresses,
leaves_hashchain,
}))
Comment on lines +489 to +553
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Only return complete, internally consistent address batches.

Lines 495-534 can shrink the requested range to available and then slice the companion vectors unchecked. If background fetching ends early or the indexer returns mismatched vector lengths, this either panics or feeds process_batch a truncated batch with a derived hashchain. Short or inconsistent batches should stay "not ready" until the full fixed-size batch is present.

Suggested guard
-        let actual_end = end.min(available);
+        if available < end {
+            return Ok(None);
+        }
+        let actual_end = end;
         let data = lock_recover(&self.data, "streaming_address_queue.data");

-        let addresses = data.addresses[start..actual_end].to_vec();
+        let addresses = data
+            .addresses
+            .get(start..actual_end)
+            .ok_or_else(|| anyhow!("addresses shorter than requested batch"))?
+            .to_vec();
+        // Apply the same checked slicing to every parallel vector before
+        // constructing `AddressBatchSnapshot`.
Based on learnings: ZKP batch sizes for address trees must be 10 or 250 in `src/initialize_address_tree.rs`. Only these circuit sizes are supported for address trees.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/helpers.rs` around lines 489 - 534, The
get_batch_snapshot function must only return fully available, internally
consistent fixed-size batches: do not shrink the requested range to available;
instead, if end > self.wait_for_batch(end) return Ok(None). Also verify that all
companion vectors (data.addresses, data.low_element_values,
data.low_element_next_values, data.low_element_indices,
data.low_element_next_indices, and data.reconstruct_proofs range) have at least
end elements and that their slices [start..end] produce the same length as
addresses before constructing AddressBatchSnapshot; if any length mismatch or
addresses.is_empty() then return Ok(None) (not Err or a truncated snapshot).
Keep the existing fallback for missing leaves_hash_chains but only use it when
the full batch is present.

}

pub fn into_data(self) -> AddressQueueData {
Expand Down Expand Up @@ -522,6 +585,10 @@
lock_recover(&self.data, "streaming_address_queue.data").start_index
}

pub fn tree_next_insertion_index(&self) -> u64 {
lock_recover(&self.data, "streaming_address_queue.data").tree_next_insertion_index
}

pub fn subtrees(&self) -> Vec<[u8; 32]> {
lock_recover(&self.data, "streaming_address_queue.data")
.subtrees
Expand Down Expand Up @@ -553,15 +620,6 @@
}
}

#[derive(Debug, Clone)]
pub struct BatchDataSlice {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
}

pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
context: &BatchContext<R>,
total_elements: u64,
Expand Down
6 changes: 3 additions & 3 deletions forester/src/processor/v2/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
}

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}
Comment on lines 134 to 137
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Extract worker-pool bootstrap into a single helper.

The same fallible initialization block now exists in three paths. A small ensure_worker_pool() would keep future startup logging/retry behavior consistent and avoid one path drifting the next time worker bootstrap changes.

Also applies to: 534-537, 563-566

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/processor.rs` around lines 134 - 137, Create a
single helper method (e.g., ensure_worker_pool) on the processor that
encapsulates the fallible initialization currently duplicated: check
self.worker_pool.is_none(), call
spawn_proof_workers(&self.context.prover_config)?, and set self.worker_pool =
Some(WorkerPool { job_tx }); then replace the three duplicated blocks with calls
to ensure_worker_pool(). Reference the existing symbols spawn_proof_workers,
WorkerPool, self.worker_pool, and self.context.prover_config so the helper
performs the same behavior and returns the same error propagation as the
original code.


Expand Down Expand Up @@ -532,7 +532,7 @@ where
((queue_size / self.zkp_batch_size) as usize).min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down Expand Up @@ -561,7 +561,7 @@ where
let max_batches = max_batches.min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down
18 changes: 9 additions & 9 deletions forester/src/processor/v2/proof_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,27 @@
}

impl ProofClients {
fn new(config: &ProverConfig) -> Self {
Self {
fn new(config: &ProverConfig) -> crate::Result<Self> {
Ok(Self {
append_client: ProofClient::with_config(
config.append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
nullify_client: ProofClient::with_config(
config.update_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
address_append_client: ProofClient::with_config(
config.address_append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
}
)?,
})
}

fn get_client(&self, input: &ProofInput) -> &ProofClient {
Expand All @@ -161,14 +161,14 @@
ProofInput::Nullify(_) => &self.nullify_client,
ProofInput::AddressAppend(_) => &self.address_append_client,
}
}

Check warning on line 164 in forester/src/processor/v2/proof_worker.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/proof_worker.rs

Check warning on line 164 in forester/src/processor/v2/proof_worker.rs

View workflow job for this annotation

GitHub Actions / lint

Diff in /home/runner/work/light-protocol/light-protocol/forester/src/processor/v2/proof_worker.rs
}

pub fn spawn_proof_workers(config: &ProverConfig) -> async_channel::Sender<ProofJob> {
pub fn spawn_proof_workers(config: &ProverConfig) -> crate::Result<async_channel::Sender<ProofJob>> {
let (job_tx, job_rx) = async_channel::bounded::<ProofJob>(256);
let clients = Arc::new(ProofClients::new(config));
let clients = Arc::new(ProofClients::new(config)?);
tokio::spawn(async move { run_proof_pipeline(job_rx, clients).await });
job_tx
Ok(job_tx)
}

async fn run_proof_pipeline(
Expand Down
Loading
Loading