Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
a5d6eed
First implementation of off chain data saving to gcp
IAvecilla Jan 22, 2026
eb0d7f2
Correct config parse and push model extra config to the gcp bucket wh…
IAvecilla Jan 22, 2026
8dccc9b
Fix light config to match with new format
IAvecilla Jan 22, 2026
e6e85fa
Fix tests and add hub support
IAvecilla Jan 27, 2026
5df718a
Update 0.2.0 version of contracts
IAvecilla Jan 27, 2026
b8396ae
Merge branch 'gcs-2' into off-chain-coordinator-fields
IAvecilla Jan 27, 2026
767e1df
Fix clippy and format
IAvecilla Jan 27, 2026
c783800
Fix website with new off-chain data
IAvecilla Jan 28, 2026
bbbfa74
Fix web with off-chain data
IAvecilla Jan 28, 2026
8659093
Fetch coordinator metadata in the website
IAvecilla Jan 28, 2026
44694e8
Merge branch 'gcs-2' into off-chain-coordinator-fields
IAvecilla Jan 28, 2026
9cfad6a
Bring back update_client_version to ts
IAvecilla Jan 28, 2026
da40341
Add console logs to debug external config fetch
IAvecilla Jan 28, 2026
663301d
Remove externalconfig from the website
IAvecilla Jan 28, 2026
81bf2ad
Merge branch 'gcs-2' into off-chain-coordinator-fields
IAvecilla Jan 28, 2026
343ff5f
Clean the code and refactor some functions
IAvecilla Jan 29, 2026
0223c62
Merge branch 'gcs-2' into off-chain-coordinator-fields
IAvecilla Jan 29, 2026
15e4a55
Add default to version field
IAvecilla Jan 29, 2026
b77447c
Add flag to skip uploading metadata only for testing purposes
IAvecilla Jan 29, 2026
db15a63
Add hub token to upload external config
IAvecilla Jan 29, 2026
92fe252
Be able to use a local config to override external config and avoid f…
IAvecilla Jan 29, 2026
f962b96
Fix config mount on docker container test
IAvecilla Jan 29, 2026
f5d536e
Rename extra config ExternalModelConfig to ModelExtraData
IAvecilla Jan 30, 2026
33edbfb
Fix flag to skip upload to external storage
IAvecilla Feb 2, 2026
50db892
Merge branch 'gcs-2' into off-chain-coordinator-fields
IAvecilla Feb 2, 2026
8bba81d
Fix argument to get local model extra data
IAvecilla Feb 2, 2026
96aa920
Add P2PDummy alternative for testing purposes
IAvecilla Feb 2, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion architectures/centralized/client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use psyche_centralized_shared::{ClientId, ClientToServerMessage, ServerToClientM
use psyche_client::HubUploadInfo;
use psyche_client::UploadInfo;
use psyche_client::{
Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs, read_identity_secret_key,
Client, ClientTUI, ClientTUIState, ModelExtraData, NC, RunInitConfig, TrainArgs,
read_identity_secret_key,
};
use psyche_coordinator::{Coordinator, HealthChecks, model};
use psyche_metrics::ClientMetrics;
Expand Down Expand Up @@ -110,6 +111,7 @@ pub async fn build_app(
let hub_read_token = std::env::var("HF_TOKEN").ok();
let eval_tasks = p.eval_tasks()?;
let checkpoint_config = p.checkpoint_config()?;
let model_extra_data_override: Option<ModelExtraData> = p.model_extra_data_override()?;
let wandb_info = p.wandb_info(format!(
"{}-{}",
p.run_id.clone(),
Expand Down Expand Up @@ -153,6 +155,7 @@ pub async fn build_app(
max_concurrent_parameter_requests: p.max_concurrent_parameter_requests,
device: p.device,
sidecar_port: p.sidecar_port,
model_extra_data_override,
};
let app = App {
cancel,
Expand Down
129 changes: 61 additions & 68 deletions architectures/centralized/server/src/app.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{Result, anyhow, bail};
use anyhow::{Result, bail};
use async_trait::async_trait;
use psyche_centralized_shared::{ClientId, ClientToServerMessage, ServerToClientMessage};
use psyche_coordinator::model::{self, Checkpoint, LLM, LLMTrainingDataLocation, Model};
use psyche_coordinator::model::{self, Checkpoint, Model};
use psyche_coordinator::{
Client, ClientState, Coordinator, CoordinatorError, HealthChecks, Round, RunState,
SOLANA_MAX_NUM_CLIENTS, TickResult,
Expand Down Expand Up @@ -148,12 +148,18 @@ impl App {
}
}

// Fallback TCP port for the training-data server, used by serde when the
// `port` field is absent from a `DataServerInfo` config (see the
// `#[serde(default = "default_data_server_port")]` attribute on that struct).
// Keeps older config files — written before `port` existed — deserializable.
fn default_data_server_port() -> u16 {
    9088
}

#[derive(Serialize, Deserialize, Debug)]
// Configuration for locally hosting training data (passed via --data-config).
// Replaces the previous scheme where the server address was parsed out of the
// coordinator's `LLMTrainingDataLocation::Server` URL.
pub struct DataServerInfo {
    // Directory containing the tokenized training data files.
    pub dir: PathBuf,
    // Size in bytes of each token in the data files.
    pub token_size: TokenSize,
    // Sequence length (in tokens) of each training sample.
    pub seq_len: usize,
    // Seed for `Shuffle::Seeded` — makes the data ordering deterministic.
    pub shuffle_seed: [u8; 32],
    // Port the TCP data server listens on; defaults to 9088 when the field is
    // missing so pre-existing configs keep working.
    #[serde(default = "default_data_server_port")]
    pub port: u16,
}

impl App {
Expand All @@ -176,72 +182,58 @@ impl App {

debug!("potentially launching data server...");

let training_data_server = match &coordinator.model {
Model::LLM(LLM {
data_location,
checkpoint,
..
}) => {
if let LLMTrainingDataLocation::Server(url) = data_location {
match checkpoint {
Checkpoint::Hub(hub_repo) => {
let repo_id = String::from(&hub_repo.repo_id);
let revision = hub_repo.revision.map(|bytes| (&bytes).into());
if revision.is_some()
|| !tokio::fs::try_exists(PathBuf::from(repo_id.clone()))
.await
.unwrap_or_default()
{
download_model_repo_async(&repo_id, revision, None, None, None, true)
.await?;
}
}
Checkpoint::Ephemeral => {
bail!("Can't start up a run with an Ephemeral checkpoint.")
}
Checkpoint::Dummy(_) => {
// ok!
}
Checkpoint::P2P(_) | Checkpoint::P2PGcs(_) => {
bail!("Can't start up a run with a P2P checkpoint.")
}
Checkpoint::Gcs(gcs_repo) => {
let bucket: String = (&gcs_repo.bucket).into();
let prefix: Option<String> =
gcs_repo.prefix.map(|p| (&p).into());
download_model_from_gcs_async(&bucket, prefix.as_deref()).await?;
}
}

let server_addr: SocketAddr = String::from(url).parse().map_err(|e| {
anyhow!("Failed to parse training data server URL {:?}: {}", url, e)
})?;
let data_server_port = server_addr.port();
let DataServerInfo {
dir,
seq_len,
shuffle_seed,
token_size
} = data_server_config.ok_or_else(|| anyhow!(
"Coordinator state requires we host training data, but no --data-config passed."
))?;

let local_data_provider = LocalDataProvider::new_from_directory(
dir,
token_size,
seq_len,
Shuffle::Seeded(shuffle_seed),
)?;

let (tx, backend) = ChannelCoordinatorBackend::new();
let data_server =
DataProviderTcpServer::start(local_data_provider, backend, data_server_port)
let training_data_server = if let Some(DataServerInfo {
dir,
seq_len,
shuffle_seed,
token_size,
port,
}) = data_server_config
{
// Download model if needed based on checkpoint type
let Model::LLM(llm) = &coordinator.model;
match &llm.checkpoint {
Checkpoint::Hub(hub_repo) => {
let repo_id = String::from(&hub_repo.repo_id);
let revision = hub_repo.revision.map(|bytes| (&bytes).into());
if revision.is_some()
|| !tokio::fs::try_exists(PathBuf::from(repo_id.clone()))
.await
.unwrap_or_default()
{
download_model_repo_async(&repo_id, revision, None, None, None, true)
.await?;
Some((tx, data_server))
} else {
None
}
}
Checkpoint::Ephemeral => {
bail!("Can't start up a run with an Ephemeral checkpoint.")
}
Checkpoint::Dummy(_) => {
// ok!
}
Checkpoint::P2P(_) | Checkpoint::P2PDummy | Checkpoint::P2PGcs(_) => {
bail!("Can't start up a run with a P2P checkpoint.")
}
Checkpoint::Gcs(gcs_repo) => {
let bucket: String = (&gcs_repo.bucket).into();
let prefix: Option<String> = gcs_repo.prefix.map(|p| (&p).into());
download_model_from_gcs_async(&bucket, prefix.as_deref()).await?;
}
}

let local_data_provider = LocalDataProvider::new_from_directory(
dir,
token_size,
seq_len,
Shuffle::Seeded(shuffle_seed),
)?;

let (tx, backend) = ChannelCoordinatorBackend::new();
let data_server =
DataProviderTcpServer::start(local_data_provider, backend, port).await?;
Some((tx, data_server))
} else {
None
};
debug!("data server work done.");

Expand All @@ -253,8 +245,7 @@ impl App {
} else {
(None, None)
};
let (cancel, tx_tui_state) =
maybe_start_render_loop(tabs)?;
let (cancel, tx_tui_state) = maybe_start_render_loop(tabs)?;

let mut tick_interval = interval(Duration::from_millis(500));
tick_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); //important!
Expand Down Expand Up @@ -293,7 +284,9 @@ impl App {
withdraw_on_disconnect,
pause,
})
}.instrument(info_span!("App::new")).await
}
.instrument(info_span!("App::new"))
.await
}

pub async fn run(&mut self) -> Result<()> {
Expand Down
9 changes: 3 additions & 6 deletions architectures/centralized/testing/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use psyche_centralized_testing::{
spawn_clients_with_training_delay,
},
};
use psyche_coordinator::{
RunState,
model::{Checkpoint, HubRepo},
};
use psyche_coordinator::{RunState, model::Checkpoint};
use tracing::info;

#[test_log::test(tokio::test(flavor = "multi_thread"))]
Expand Down Expand Up @@ -639,7 +636,7 @@ async fn client_join_in_training_and_get_model_using_p2p() {

assert_with_retries(
|| server_handle.get_checkpoint(),
std::mem::discriminant(&Checkpoint::P2P(HubRepo::dummy())),
std::mem::discriminant(&Checkpoint::P2PDummy),
)
.await;

Expand Down Expand Up @@ -722,7 +719,7 @@ async fn two_clients_join_in_training_and_get_model_using_p2p() {

assert_with_retries(
|| server_handle.get_checkpoint(),
std::mem::discriminant(&Checkpoint::P2P(HubRepo::dummy())),
std::mem::discriminant(&Checkpoint::P2PDummy),
)
.await;

Expand Down
5 changes: 4 additions & 1 deletion architectures/decentralized/solana-client/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use anchor_client::{
};
use anyhow::{Result, anyhow};
use psyche_client::{
Client, ClientTUI, ClientTUIState, NC, RunInitConfig, TrainArgs, read_identity_secret_key,
Client, ClientTUI, ClientTUIState, ModelExtraData, NC, RunInitConfig, TrainArgs,
read_identity_secret_key,
};
use psyche_coordinator::{ClientState, Coordinator, CoordinatorError, RunState};
use psyche_core::sha256;
Expand Down Expand Up @@ -90,6 +91,7 @@ pub async fn build_app(
let eval_tasks = p.eval_tasks()?;
let hub_read_token = std::env::var("HF_TOKEN").ok();
let checkpoint_config = p.checkpoint_config()?;
let model_extra_data_override: Option<ModelExtraData> = p.model_extra_data_override()?;

let solana_pubkey = wallet_keypair.pubkey();
let wandb_info = p.wandb_info(format!("{}-{solana_pubkey}", p.run_id))?;
Expand Down Expand Up @@ -134,6 +136,7 @@ pub async fn build_app(
max_concurrent_parameter_requests: p.max_concurrent_parameter_requests,
device: p.device,
sidecar_port: p.sidecar_port,
model_extra_data_override,
};
let app = App {
run_id: p.run_id.clone(),
Expand Down
3 changes: 3 additions & 0 deletions architectures/decentralized/solana-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ async fn async_main() -> Result<()> {
Checkpoint::Ephemeral => {
bail!("Can't predownload model with ephemeral checkpoint.")
}
Checkpoint::P2PDummy => {
println!("P2PDummy checkpoint (for testing), nothing to predownload.");
}
Checkpoint::Dummy(hub_repo)
| Checkpoint::Hub(hub_repo)
| Checkpoint::P2P(hub_repo) => {
Expand Down
1 change: 1 addition & 0 deletions architectures/decentralized/solana-coordinator/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -1,19 +1,10 @@
use psyche_coordinator::Round;
use psyche_coordinator::RunState;
use psyche_coordinator::model::Checkpoint;
use psyche_coordinator::model::HttpTrainingDataLocation;
use psyche_coordinator::model::LLMArchitecture;
use psyche_coordinator::model::LLMTrainingDataLocation;
use psyche_coordinator::model::LLMTrainingDataType;
use psyche_coordinator::model::Model;
use psyche_core::CosineLR;
use psyche_core::FixedString;
use psyche_core::FixedVec;
use psyche_core::LearningRateSchedule;
use psyche_core::OptimizerDefinition;
use psyche_core::Shuffle;
use psyche_core::SmallBoolean;
use psyche_core::TokenSize;
use psyche_solana_coordinator::CoordinatorAccount;
use psyche_solana_coordinator::coordinator_account_from_bytes;

Expand Down Expand Up @@ -43,12 +34,11 @@ pub async fn run() {
assert_eq!(coordinator.run_state, RunState::Uninitialized);
assert_eq!(coordinator.run_state_start_unix_timestamp, 0);
assert_eq!(coordinator.pending_pause, SmallBoolean::FALSE);
// Coordinator model
// Coordinator model (only on-chain fields)
match coordinator.model {
Model::LLM(llm) => {
assert_eq!(llm.max_seq_len, 2048);
assert_eq!(llm.cold_start_warmup_steps, 0);
assert_eq!(llm.architecture, LLMArchitecture::HfLlama);
match llm.checkpoint {
Checkpoint::Hub(hub) => {
assert_eq!(
Expand All @@ -59,57 +49,6 @@ pub async fn run() {
},
_ => panic!("Expected Hub checkpoint"),
};
assert_eq!(llm.data_type, LLMTrainingDataType::Pretraining);
match llm.data_location {
LLMTrainingDataLocation::Http(http) => {
match http.location {
HttpTrainingDataLocation::Gcp {
bucket_name,
filter_directory,
} => {
assert_eq!(
bucket_name,
fixed_str("nous-pretraining-public-us")
);
assert_eq!(
filter_directory,
fixed_str("fineweb-edu-tokenized-llama2")
);
},
_ => panic!("Expected Gcp data location"),
};
assert_eq!(http.token_size_in_bytes, TokenSize::TwoBytes);
assert_eq!(http.shuffle, Shuffle::DontShuffle);
},
_ => panic!("Expected Http data location"),
};
match llm.lr_schedule {
LearningRateSchedule::Cosine(learning_rate) => {
assert_eq!(
learning_rate,
CosineLR::new(0.0004, 250, 0.0, 25000, 0.00004)
);
},
_ => panic!("Expected Constant LR schedule"),
};
match llm.optimizer {
OptimizerDefinition::Distro {
clip_grad_norm,
weight_decay,
compression_decay,
compression_topk,
compression_chunk,
quantize_1bit,
} => {
assert_eq!(clip_grad_norm, Some(1.0));
assert_eq!(weight_decay, None);
assert_eq!(compression_decay, 0.999);
assert_eq!(compression_topk, 2);
assert_eq!(compression_chunk, 64);
assert_eq!(quantize_1bit, false);
},
_ => panic!("Expected Distro optimizer"),
}
},
};
// Coordinator config
Expand Down
Loading