Skip to content
This repository was archived by the owner on Sep 12, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ members = [
"local-storage",
"messages",
"myceli",
"testing/testing-scenarios",
"transports"
]

Expand Down Expand Up @@ -57,4 +58,5 @@ tracing-subscriber = { version = "0.3.14", default-features = false, features =
ipfs-unixfs = { path = "ipfs-unixfs" }
local-storage = { path = "local-storage" }
messages = { path = "messages" }
myceli = { path = "myceli" }
transports = { path = "transports" }
43 changes: 41 additions & 2 deletions local-storage/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ impl StorageProvider for SqliteStorageProvider {
WITH RECURSIVE cids(x) AS (
VALUES(?1)
UNION
SELECT block_cid FROM links JOIN cids ON root_cid=x
SELECT block_cid FROM links JOIN cids ON root_cid=x WHERE block_id IS NOT null
)
SELECT x FROM cids;
SELECT cid FROM blocks WHERE cid in cids
",
)?
.query_map([cid], |row| {
Expand Down Expand Up @@ -524,4 +524,43 @@ pub mod tests {
0
);
}

#[test]
pub fn test_verify_get_all_cids() {
    let harness = TestHarness::new();

    // Root block linking to two children, only one of which gets imported.
    // `get_all_dag_cids` must return only CIDs whose blocks are actually
    // present in storage.
    let cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"00"));
    let cid_str = cid.to_string();
    let block_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"11"));
    let child_cid_str = block_cid.to_string();

    // Fix: this previously hashed b"11" again, which silently duplicated
    // `block_cid` instead of producing a distinct, never-imported child.
    let other_child_cid = Cid::new_v1(0x55, cid::multihash::Code::Sha2_256.digest(b"22"));

    let block = StoredBlock {
        cid: cid_str.to_string(),
        data: vec![],
        links: vec![block_cid.to_string(), other_child_cid.to_string()],
        filename: None,
    };

    let child_block = StoredBlock {
        cid: block_cid.to_string(),
        data: b"101293910101".to_vec(),
        links: vec![],
        filename: None,
    };

    harness.provider.import_block(&block).unwrap();

    // Only the root has been imported, so only the root CID is reported.
    let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
    assert_eq!(dag_cids, vec![cid_str.to_string()]);

    harness.provider.import_block(&child_block).unwrap();

    // The imported child now appears; the still-missing `other_child_cid`
    // must not.
    let dag_cids = harness.provider.get_all_dag_cids(&cid_str).unwrap();
    assert_eq!(
        dag_cids,
        vec![child_cid_str.to_string(), cid_str.to_string()]
    );
}
}
97 changes: 97 additions & 0 deletions local-storage/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl Storage {
Ok(blocks)
});
let blocks = blocks?;
info!("FileBuilder found {} blocks in {path:?}", blocks.len());
let mut root_cid: Option<String> = None;

blocks.iter().for_each(|b| {
Expand Down Expand Up @@ -66,6 +67,7 @@ impl Storage {
if blocks.len() == 1 {
if let Some(first) = blocks.first() {
root_cid = Some(first.cid().to_string());
info!("set final root {root_cid:?}");
}
}
if let Some(root_cid) = root_cid {
Expand Down Expand Up @@ -144,6 +146,29 @@ impl Storage {
self.provider
.get_dag_blocks_by_window(cid, offset, window_size)
}

pub fn get_last_dag_cid(&self, cid: &str) -> Result<String> {
let dag_cids = self.get_all_dag_cids(cid)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there should be a TODO to have the get_all* functions return an iterator rather than allocate. This is a good example of why.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could easily have been done inside of sqlite instead of returning all of the CIDs...but this function doesn't end up really getting used, so it can probably get removed.

But yes I agree that a good refactor would be returning iterator from those get_all* functions.

match dag_cids.last() {
Some(cid) => Ok(cid.to_owned()),
None => bail!("No last cid found for dag {cid}"),
}
}

// Given a root CID, a number of CIDs, approximate the window we should be in
// pub fn find_dag_window(&self, root: &str, cid_count: u32, window_size: u32) -> Result<u32> {

// let all_cids = self.get_all_dag_cids(root)?;
// let chunks = all_cids.chunks(window_size as usize);
// let mut window_num = 0;
// for c in chunks {
// if c.contains(&child.to_string()) {
// return Ok(window_num);
// }
// window_num += 1;
// }
// bail!("Failed to find child cid {child} in dag {root}");
// }
}

#[cfg(test)]
Expand Down Expand Up @@ -174,6 +199,46 @@ pub mod tests {
}
}

/// Builds `num_blocks` blocks of random content through `FileBuilder`'s
/// fixed-size chunker and converts each into a `StoredBlock` ready to
/// import in tests.
fn generate_stored_blocks(num_blocks: u16) -> Result<Vec<StoredBlock>> {
    const CHUNK_SIZE: u16 = 20;
    // Random payload sized so the fixed chunker yields `num_blocks` chunks.
    let mut data = vec![1u8; usize::from(CHUNK_SIZE * num_blocks)];
    thread_rng().fill_bytes(&mut data);

    // FileBuilder is async; drive it to completion on a throwaway runtime.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    let blocks = runtime.block_on(async {
        let file: File = FileBuilder::new()
            .content_bytes(data)
            .name("testfile")
            .fixed_chunker(CHUNK_SIZE.into())
            .build()
            .await
            .unwrap();
        let encoded: Vec<_> = file.encode().await.unwrap().try_collect().await.unwrap();
        encoded
    });

    // Flatten each encoded block (cid + data + child links) into the
    // storage-layer representation.
    let stored_blocks = blocks
        .iter()
        .map(|b| StoredBlock {
            cid: b.cid().to_string(),
            data: b.data().to_vec(),
            links: b.links().iter().map(|c| c.to_string()).collect(),
            filename: None,
        })
        .collect::<Vec<StoredBlock>>();

    Ok(stored_blocks)
}

#[test]
pub fn test_import_path_to_storage_single_block() {
let harness = TestHarness::new();
Expand Down Expand Up @@ -320,6 +385,38 @@ pub mod tests {
assert_eq!(blocks.len(), cids.len());
}

#[test]
pub fn test_get_all_dag_cids() {
    let harness = TestHarness::new();

    let mut dag_blocks = generate_stored_blocks(50).unwrap();
    let total_block_count = dag_blocks.len();

    // The root is the last block produced by the builder.
    let root = dag_blocks.pop().unwrap();

    harness.storage.import_block(&root).unwrap();

    // Only the root has been imported so far.
    let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
    assert_eq!(dag_cids.len(), 1);

    // Import nine more blocks (fix: dropped the redundant parentheses around
    // the range literal, which trips rustc's `unused_parens` lint).
    for _ in 1..10 {
        harness
            .storage
            .import_block(&dag_blocks.pop().unwrap())
            .unwrap();
    }

    // Root plus the nine children imported above.
    let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
    assert_eq!(dag_cids.len(), 10);

    // Import the remainder; every block of the DAG should now be reported.
    while let Some(block) = dag_blocks.pop() {
        harness.storage.import_block(&block).unwrap()
    }

    let dag_cids = harness.storage.get_all_dag_cids(&root.cid).unwrap();
    assert_eq!(dag_cids.len(), total_block_count);
}

// TODO: duplicated data is not being handled correctly right now, need to fix this
// #[test]
// pub fn export_from_storage_various_file_sizes_duplicated_data() {
Expand Down
14 changes: 14 additions & 0 deletions messages/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,22 @@ pub enum ApplicationAPI {
},
// Resumes the transmission of all dags which may be paused
ResumeTransmitAllDags,
// Resumes the transmission of a dag from a prior session, given the last received CID
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean, you don't provide the last received CID.

Since UDP can go out-of-order and drop middle parts, I wonder if there might be another model that could be followed. Like if you got block 1,2,3,5,6,7 & 8 but missed 4, do you really want to ask for everything starting with 4 to be retransmitted? Couldn't you just as easily start a request for a new DAG rooted at CID of block # 4 ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm I'm not sure we could start a transmission for a DAG rooted at the CID of block 4, because it wouldn't have the links to resolve the rest of the DAG like the root CID would. But I do think we could use some windowing information to make the first resume request a bit better.

Here is the current behavior for the scenario you laid out:
Let's assume we have a window size of 8 blocks on both sides, so we missed block 4 from that window of 1-8.

  1. The receiver would calculate that it has received 7 CIDs for this dag, so it would send that number in a ResumePriorDagTransmit message to the transmitter.
  2. The transmitter would receive the Resume... message and determine that the window containing blocks 1...8 needs to be resent
  3. The whole window would be transmitted again and then the iterative windowing process would resume

Here is a slightly better way that doesn't involve reworking how we specify missing blocks:
Again assuming a window size of 8 blocks on both sides.

  1. The receiver would call a new function, get_missing_blocks_and_window, which takes a root cid and window size, and returns the last window num it was receiving blocks for, and any missing CIDs from that window (based on how large it should be), which is placed in a ResumePriorDagTransmit message and sent to the transmitter
  2. The transmitter would send the blocks associated with the CIDs in the received Resume... message and set the window num in the session accordingly
  3. After the last partly-received window has been correctly transmitted, then the iterative windowing processing resumes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify: I'm not suggesting a change here and now. Just thinking about these things. As I wrap my head around the project I can't help but noodle a bit.

I'm not sure we could start a transmission for a DAG rooted at the CID of block 4, because it wouldn't have the links to resolve the rest of the DAG like the root CID would.

It would only have its own block and children (grand children of the original root, perhaps) if it has any. Every node in a DAG is a root of a smaller DAG, because 🌴 😄

What I was really getting at is one could have a request that said, "Here's some CIDs I want" and pass in the missing CIDs, and that's logically the same as calling TransmitDag (with your own address as the target) repeatedly - once for every link in the DAG you don't have a block for.

But yeah, if you have a window as a parameter in the request message, then yes you can do the exact equivalent thing. If you can detect a missing block that implies you have a block with a link to it, so you could just as easily 'resume' that block with a window that only covers links you know you don't have.

I don't know if you need the transmitter to initiate the resume stuff, though? myceli could see what blocks it's missing at startup (I imagine when the sat gets powered on it runs main(), yes?) and just start requesting them. Or if that's too much (might accidentally un-GC you), one could have a database record of roots of DAGs that were originally sent, and at startup start requesting links missing only from them (or if the DAG is complete remove it from the table).

// for determining where to restart the transmission
ResumePriorDagTransmit {
cid: String,
num_received_cids: u32,
retries: u8,
},
/// Listens on address for data and writes out files received
Receive {
listen_addr: String,
},
/// Commands a node to request another node at target_addr to resume dag transfer
RequestResumeDagTransfer {
cid: String,
target_addr: String,
},
/// Request Available Blocks
RequestAvailableBlocks,
/// Advertise all available blocks by CID
Expand Down Expand Up @@ -125,6 +137,8 @@ pub enum ApplicationAPI {
AvailableDags {
dags: Vec<DagInfo>,
},
/// Asks IPFS instance to terminate
Terminate,
// TODO: Implement later
// Information about the next pass used for calculating
// data transfer parameters
Expand Down
10 changes: 10 additions & 0 deletions messages/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,14 @@ pub enum DataProtocol {
target_addr: String,
retries: u8,
},
// Resumes the transmission of a dag which isn't currently tracked in sessions
// This accounts for resuming after restarting of transmitter
ResumePriorDagTransmit {
cid: String,
num_received_cids: u32,
target_addr: String,
retries: u8,
},
// Resumes the transmission of a dag which may have run out of retries or
// paused due to connectivity lost
ResumeTransmitDag {
Expand All @@ -66,4 +74,6 @@ pub enum DataProtocol {
cid: String,
blocks: Vec<String>,
},
// Used by listener to terminate shipper on program exit
Terminate,
}
7 changes: 6 additions & 1 deletion myceli/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use anyhow::Result;
use anyhow::{bail, Result};
use local_storage::storage::Storage;
use messages::{ApplicationAPI, DagInfo, DataProtocol, Message};
use std::path::PathBuf;
Expand Down Expand Up @@ -77,6 +77,11 @@ pub fn get_available_dags(storage: Rc<Storage>) -> Result<Message> {
}))
}

/// Looks up the CID of the last stored block in the DAG rooted at `cid`,
/// delegating directly to the storage layer.
pub fn get_last_dag_cid(cid: &str, storage: Rc<Storage>) -> Result<String> {
    storage.get_last_dag_cid(cid)
}

#[cfg(test)]
pub mod tests {
use super::*;
Expand Down
54 changes: 50 additions & 4 deletions myceli/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::path::PathBuf;
use std::rc::Rc;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::spawn;
use std::thread::{spawn, JoinHandle};
use tracing::{debug, error, info};
use transports::Transport;

Expand All @@ -19,6 +19,7 @@ pub struct Listener<T> {
transport: Arc<T>,
connected: Arc<Mutex<bool>>,
radio_address: Option<String>,
shipper_handle: Option<JoinHandle<()>>,
}

impl<T: Transport + Send + 'static> Listener<T> {
Expand All @@ -39,6 +40,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
transport,
connected: Arc::new(Mutex::new(true)),
radio_address,
shipper_handle: None,
})
}

Expand All @@ -55,7 +57,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
let shipper_transport = Arc::clone(&self.transport);
let initial_connected = Arc::clone(&self.connected);
let shipper_radio = self.radio_address.clone();
spawn(move || {
self.shipper_handle = Some(spawn(move || {
let mut shipper = Shipper::new(
&shipper_storage_path,
shipper_receiver,
Expand All @@ -69,7 +71,7 @@ impl<T: Transport + Send + 'static> Listener<T> {
)
.expect("Shipper creation failed");
shipper.receive_msg_loop();
});
}));

loop {
match self.transport.receive() {
Expand All @@ -79,7 +81,14 @@ impl<T: Transport + Send + 'static> Listener<T> {
} else {
sender_addr.to_owned()
};
match self.handle_message(message, &target_addr, shipper_sender.clone()) {
match self.handle_message(message, &sender_addr, shipper_sender.clone()) {
Ok(Some(Message::ApplicationAPI(ApplicationAPI::Terminate))) => {
info!("Received termination command, exiting listener");
if let Some(handle) = self.shipper_handle.take() {
handle.join().unwrap();
}
return Ok(());
}
Ok(Some(resp)) => {
if let Err(e) = self.transmit_response(resp, &target_addr) {
error!("TransmitResponse error: {e}");
Expand Down Expand Up @@ -216,6 +225,43 @@ impl<T: Transport + Send + 'static> Listener<T> {
Message::ApplicationAPI(ApplicationAPI::RequestAvailableDags) => {
Some(handlers::get_available_dags(self.storage.clone())?)
}
Message::ApplicationAPI(ApplicationAPI::Terminate) => {
shipper_sender.send((DataProtocol::Terminate, sender_addr.to_string()))?;
Some(Message::ApplicationAPI(ApplicationAPI::Terminate))
}
Message::ApplicationAPI(ApplicationAPI::RequestResumeDagTransfer {
cid,
target_addr,
}) => {
let all_cids = self.storage.get_all_dag_cids(&cid)?;
println!("found cids for {cid}, {all_cids:?}");
let num_received_cids = self.storage.get_all_dag_cids(&cid)?.len() as u32;
self.transmit_response(
Message::ApplicationAPI(ApplicationAPI::ResumePriorDagTransmit {
cid,
num_received_cids,
retries: 5,
}),
&target_addr,
)?;
None
}
Message::ApplicationAPI(ApplicationAPI::ResumePriorDagTransmit {
cid,
num_received_cids,
retries,
}) => {
shipper_sender.send((
DataProtocol::ResumePriorDagTransmit {
cid,
num_received_cids,
target_addr: sender_addr.to_string(),
retries,
},
sender_addr.to_string(),
))?;
None
}
// Default case for valid messages which don't have handling code implemented yet
message => {
info!("Received message: {:?}", message);
Expand Down
Loading