Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 7 additions & 10 deletions forester-utils/src/address_staging_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl AddressStagingTree {
low_element_next_values: &[[u8; 32]],
low_element_indices: &[u64],
low_element_next_indices: &[u64],
low_element_proofs: &[Vec<[u8; 32]>],
low_element_proofs: &[[[u8; 32]; HEIGHT]],
leaves_hashchain: [u8; 32],
zkp_batch_size: usize,
epoch: u64,
Expand All @@ -145,15 +145,12 @@ impl AddressStagingTree {
let inputs = get_batch_address_append_circuit_inputs::<HEIGHT>(
next_index,
old_root,
low_element_values.to_vec(),
low_element_next_values.to_vec(),
low_element_indices.iter().map(|v| *v as usize).collect(),
low_element_next_indices
.iter()
.map(|v| *v as usize)
.collect(),
low_element_proofs.to_vec(),
addresses.to_vec(),
low_element_values,
low_element_next_values,
low_element_indices,
low_element_next_indices,
low_element_proofs,
addresses,
&mut self.sparse_tree,
leaves_hashchain,
zkp_batch_size,
Expand Down
98 changes: 82 additions & 16 deletions forester/src/processor/v2/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use light_client::{
indexer::{AddressQueueData, Indexer, QueueElementsV2Options, StateQueueData},
rpc::Rpc,
};
use light_hasher::hash_chain::create_hash_chain_from_slice;

use crate::processor::v2::{common::clamp_to_u16, BatchContext};

Expand All @@ -22,6 +23,17 @@ pub(crate) fn lock_recover<'a, T>(mutex: &'a Mutex<T>, name: &'static str) -> Mu
}
}

#[derive(Debug, Clone)]
pub struct AddressBatchSnapshot<const HEIGHT: usize> {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
pub low_element_proofs: Vec<[[u8; 32]; HEIGHT]>,
pub leaves_hashchain: [u8; 32],
}

pub async fn fetch_zkp_batch_size<R: Rpc>(context: &BatchContext<R>) -> crate::Result<u64> {
let rpc = context.rpc_pool.get_connection().await?;
let mut account = rpc
Expand Down Expand Up @@ -474,20 +486,79 @@ impl StreamingAddressQueue {
}
}

pub fn get_batch_data(&self, start: usize, end: usize) -> Option<BatchDataSlice> {
pub fn get_batch_snapshot<const HEIGHT: usize>(
&self,
start: usize,
end: usize,
hashchain_idx: usize,
) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
Comment on lines +489 to +494
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Bind the hash-chain cursor to the requested batch.

get_batch_snapshot still trusts two independent caller-controlled cursors: start..end and hashchain_idx. That lets a caller build a non-batch-sized snapshot or accidentally pair batch N's addresses/proofs with batch M's hash chain, which only fails later during proof generation/verification. This helper already knows self.zkp_batch_size, so it should enforce one aligned batch here and derive or validate the hash-chain index locally.

🩹 Suggested guardrails
     pub fn get_batch_snapshot<const HEIGHT: usize>(
         &self,
         start: usize,
         end: usize,
         hashchain_idx: usize,
     ) -> crate::Result<Option<AddressBatchSnapshot<HEIGHT>>> {
+        if start >= end {
+            return Ok(None);
+        }
+
+        let batch_len = end - start;
+        if batch_len != self.zkp_batch_size || start % self.zkp_batch_size != 0 {
+            return Err(anyhow!(
+                "invalid address batch range {}..{} for zkp_batch_size {}",
+                start,
+                end,
+                self.zkp_batch_size
+            ));
+        }
+
+        let expected_hashchain_idx = start / self.zkp_batch_size;
+        if hashchain_idx != expected_hashchain_idx {
+            return Err(anyhow!(
+                "hashchain_idx {} does not match batch range {}..{} (expected {})",
+                hashchain_idx,
+                start,
+                end,
+                expected_hashchain_idx
+            ));
+        }
+
         let available = self.wait_for_batch(end);
-        if available < end || start >= end {
+        if available < end {
             return Ok(None);
         }

Based on learnings: ZKP batch sizes for address trees must be 10 or 250, and tree updates must require zero-knowledge proofs with public inputs: old root, new root, hash chain, and next_index.

Also applies to: 524-541

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@forester/src/processor/v2/helpers.rs` around lines 489 - 494,
get_batch_snapshot currently accepts independent caller-controlled cursors
(start..end and hashchain_idx); enforce batch alignment by computing the batch
index from start using self.zkp_batch_size (e.g., batch_idx = start /
self.zkp_batch_size), require end == start + self.zkp_batch_size, and derive the
hash-chain index from that batch index instead of trusting the caller-provided
hashchain_idx (or validate that the passed hashchain_idx equals the derived
value and return an error if not). Update get_batch_snapshot to validate these
invariants and use the derived hash-chain index when loading the hash chain;
apply the same change to the analogous helper at the other location referenced
in the comment.

let available = self.wait_for_batch(end);
if start >= available {
return None;
if available < end || start >= end {
return Ok(None);
}
let actual_end = end.min(available);
let actual_end = end;
let data = lock_recover(&self.data, "streaming_address_queue.data");
Some(BatchDataSlice {
addresses: data.addresses[start..actual_end].to_vec(),

for (name, len) in [
("addresses", data.addresses.len()),
("low_element_values", data.low_element_values.len()),
(
"low_element_next_values",
data.low_element_next_values.len(),
),
("low_element_indices", data.low_element_indices.len()),
(
"low_element_next_indices",
data.low_element_next_indices.len(),
),
] {
if len < actual_end {
return Err(anyhow!(
"incomplete batch data: {} len {} < required end {}",
name,
len,
actual_end
));
}
}

let addresses = data.addresses[start..actual_end].to_vec();
if addresses.is_empty() {
return Err(anyhow!("Empty batch at start={}", start));
}

let leaves_hashchain = match data.leaves_hash_chains.get(hashchain_idx).copied() {
Some(hashchain) => hashchain,
None => {
tracing::debug!(
"Missing leaves_hash_chain for batch {} (available: {}), deriving from addresses",
hashchain_idx,
data.leaves_hash_chains.len()
);
create_hash_chain_from_slice(&addresses).map_err(|error| {
anyhow!(
"Failed to derive leaves_hash_chain for batch {} from {} addresses: {}",
hashchain_idx,
addresses.len(),
error
)
})?
}
};

Ok(Some(AddressBatchSnapshot {
low_element_values: data.low_element_values[start..actual_end].to_vec(),
low_element_next_values: data.low_element_next_values[start..actual_end].to_vec(),
low_element_indices: data.low_element_indices[start..actual_end].to_vec(),
low_element_next_indices: data.low_element_next_indices[start..actual_end].to_vec(),
})
low_element_proofs: data
.reconstruct_proofs::<HEIGHT>(start..actual_end)
.map_err(|error| {
anyhow!("incomplete batch data: failed to reconstruct proofs: {error}")
})?,
addresses,
leaves_hashchain,
}))
}

pub fn into_data(self) -> AddressQueueData {
Expand Down Expand Up @@ -522,6 +593,10 @@ impl StreamingAddressQueue {
lock_recover(&self.data, "streaming_address_queue.data").start_index
}

pub fn tree_next_insertion_index(&self) -> u64 {
lock_recover(&self.data, "streaming_address_queue.data").tree_next_insertion_index
}

pub fn subtrees(&self) -> Vec<[u8; 32]> {
lock_recover(&self.data, "streaming_address_queue.data")
.subtrees
Expand Down Expand Up @@ -553,15 +628,6 @@ impl StreamingAddressQueue {
}
}

#[derive(Debug, Clone)]
pub struct BatchDataSlice {
pub addresses: Vec<[u8; 32]>,
pub low_element_values: Vec<[u8; 32]>,
pub low_element_next_values: Vec<[u8; 32]>,
pub low_element_indices: Vec<u64>,
pub low_element_next_indices: Vec<u64>,
}

pub async fn fetch_streaming_address_batches<R: Rpc + 'static>(
context: &BatchContext<R>,
total_elements: u64,
Expand Down
6 changes: 3 additions & 3 deletions forester/src/processor/v2/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
}

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down Expand Up @@ -532,7 +532,7 @@ where
((queue_size / self.zkp_batch_size) as usize).min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down Expand Up @@ -561,7 +561,7 @@ where
let max_batches = max_batches.min(self.context.max_batches_per_tree);

if self.worker_pool.is_none() {
let job_tx = spawn_proof_workers(&self.context.prover_config);
let job_tx = spawn_proof_workers(&self.context.prover_config)?;
self.worker_pool = Some(WorkerPool { job_tx });
}

Expand Down
20 changes: 11 additions & 9 deletions forester/src/processor/v2/proof_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,27 +132,27 @@ struct ProofClients {
}

impl ProofClients {
fn new(config: &ProverConfig) -> Self {
Self {
fn new(config: &ProverConfig) -> crate::Result<Self> {
Ok(Self {
append_client: ProofClient::with_config(
config.append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
nullify_client: ProofClient::with_config(
config.update_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
)?,
address_append_client: ProofClient::with_config(
config.address_append_url.clone(),
config.polling_interval,
config.max_wait_time,
config.api_key.clone(),
),
}
)?,
})
}

fn get_client(&self, input: &ProofInput) -> &ProofClient {
Expand All @@ -164,11 +164,13 @@ impl ProofClients {
}
}

pub fn spawn_proof_workers(config: &ProverConfig) -> async_channel::Sender<ProofJob> {
pub fn spawn_proof_workers(
config: &ProverConfig,
) -> crate::Result<async_channel::Sender<ProofJob>> {
let (job_tx, job_rx) = async_channel::bounded::<ProofJob>(256);
let clients = Arc::new(ProofClients::new(config));
let clients = Arc::new(ProofClients::new(config)?);
tokio::spawn(async move { run_proof_pipeline(job_rx, clients).await });
job_tx
Ok(job_tx)
}

async fn run_proof_pipeline(
Expand Down
Loading
Loading