Conversation
```rust
    hub_repo
)
let Model::LLM(LLM { checkpoint, .. }) = &self.coordinator_state.model;
if !self.skip_upload_check {
```
maybe we can invert the check and return early
We have to execute the rest of the code in that function, so I don’t think we can return early there, unless you meant something else?
considered pulling this out into a fn? this seems like something that could be deduplicated between the centralized and decentralized clients
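
For illustration, a minimal sketch of such a shared helper, with hypothetical stand-ins for the real types (the actual `Model`/`LLM`/`Checkpoint` live in the client crates):

```rust
// Hypothetical stand-ins so the sketch compiles on its own.
struct Checkpoint;
struct LLM {
    checkpoint: Checkpoint,
}
enum Model {
    LLM(LLM),
}

/// One helper both the centralized and decentralized clients could call
/// instead of repeating this logic inline: returns the checkpoint to
/// validate for upload, or None when the check is skipped.
fn checkpoint_for_upload(model: &Model, skip_upload_check: bool) -> Option<&Checkpoint> {
    if skip_upload_check {
        return None;
    }
    let Model::LLM(LLM { checkpoint }) = model;
    Some(checkpoint)
}
```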
```rust
if skip_upload {
    info!("Skipping checkpoint save and upload (skip_upload flag is set)");
    checkpoint_completed.store(true, Ordering::SeqCst);
```
you can return Ok(evals) early and you don't need to have an else branch here
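
For illustration, the early-return shape as a self-contained sketch (the surrounding function and its signature are hypothetical):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical wrapper around the diff's logic, just to show the shape:
// the early return keeps the upload path at top level with no else branch.
fn finish_round(
    skip_upload: bool,
    checkpoint_completed: &AtomicBool,
    evals: Vec<String>,
) -> std::io::Result<Vec<String>> {
    if skip_upload {
        println!("Skipping checkpoint save and upload (skip_upload flag is set)");
        checkpoint_completed.store(true, Ordering::SeqCst);
        return Ok(evals); // early return: nothing below runs in the skip case
    }
    // ...save and upload the checkpoint here, at top-level indentation...
    Ok(evals)
}
```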
shared/data-provider/src/errors.rs (outdated)

```rust
//#[error("GCS operation failed: {0}")]
//GcsStorage(#[from] google_cloud_storage::client::Error),
```
should we delete this?
```rust
// Use HF_TOKEN from checkpoint_config for Hub uploads
if let Some(ref hub_token) = config.hub_token {
    Some(UploadInfo::Hub(HubUploadInfo {
        hub_repo: String::new(), // Will be validated when actual checkpoint is received
```
this feels a little strange to me, carrying a dummy hub repo around. any way to get this as some sort of enum with a filled/unfilled variant, or something?
Totally true. Since right now the only way to get the checkpoint is through the Coordinator, we have to update the source once the client connects and receives the updates. I refactored it to use an enum that carries the minimum required info at first, and then switches to the other variant once all the necessary information is available.
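
A minimal sketch of that two-stage shape (names are hypothetical, not the PR's actual types): start with the minimum info, then promote once the coordinator supplies the repo.

```rust
/// Hypothetical two-stage upload info: `Pending` carries only what the
/// client knows before connecting; `Resolved` is everything needed to
/// actually upload.
enum HubUploadState {
    Pending { hub_token: String },
    Resolved { hub_token: String, hub_repo: String },
}

impl HubUploadState {
    /// Switch to the fully-specified variant once the coordinator
    /// supplies the repo, replacing any previously known one.
    fn resolve(self, hub_repo: String) -> Self {
        match self {
            HubUploadState::Pending { hub_token }
            | HubUploadState::Resolved { hub_token, .. } => {
                HubUploadState::Resolved { hub_token, hub_repo }
            }
        }
    }
}
```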
```diff
 let (mut app, allowlist, p2p, state_options) =
-    build_app(cancel, server_addr, tx_tui_state, args)
+    build_app(cancel, server_addr, tx_tui_state, args, false)
```
is_testing arg feels a little strange here, just passing a raw boolean.. but maybe this is fine
Yeah, I don’t like it either, but it was a quick way to avoid checking credentials and permissions for uploading to external storage. I tried another approach to avoid adding new flags everywhere.
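
One common alternative, sketched with hypothetical names: replace the raw bool with a two-variant mode enum so the call site documents itself.

```rust
/// Hypothetical alternative to the raw bool argument.
#[derive(Clone, Copy, PartialEq, Eq)]
enum RunMode {
    /// Validate upload credentials and permissions as usual.
    Normal,
    /// Skip credential and permission checks (tests only).
    Testing,
}

// The call then reads as intent rather than a bare `false`:
// build_app(cancel, server_addr, tx_tui_state, args, RunMode::Normal)
```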
```rust
model::Checkpoint::Hub(HubRepo { repo_id, revision })
| model::Checkpoint::P2P(HubRepo { repo_id, revision }) => {
    Some(UploadInfo::Hub(HubUploadInfo {
        hub_repo: (&repo_id).into(),
```
weird, does the into() not autoref repo_id?
With some refactors I made, I think this part is no longer needed. But yeah, I thought the same when using into(); the compiler was kind of suggesting that approach.
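
For reference, a small self-contained example of the gotcha (the newtype is hypothetical): `.into()` resolves through the blanket `impl<T, U: From<T>> Into<U> for T` on the by-value receiver, so it won't autoref to a `From<&T>` impl.

```rust
/// Hypothetical repo-id newtype with only a by-reference From impl.
struct RepoId(String);

impl From<&RepoId> for String {
    fn from(r: &RepoId) -> String {
        r.0.clone()
    }
}

fn main() {
    let repo_id = RepoId("org/model".to_string());
    // `repo_id.into()` fails here: method lookup matches the blanket
    // impl on the by-value receiver first, which demands
    // `String: From<RepoId>` and never falls back to autoref.
    let s: String = (&repo_id).into(); // explicit borrow selects From<&RepoId>
    println!("{s}");
}
```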
| "storage.objects.delete", | ||
| ]; | ||
|
|
||
| let resource = format!("projects/_/buckets/{}", gcs_info.gcs_bucket); |
this seems like a lot of GCS-specific stuff to have in the main app file. have we considered making a tiny wrapper module in some other crate that makes this api a bit more like the hf one? or even a trait to abstract reading / writing checkpoints?
I moved all the credential-specific logic into a new module, where I defined the UploadInfo and UploadCredentials structs, and we only call validate_credentials from there. The trait approach may be even better, so that's a good suggestion; I'll try it. But at least this should be an improvement.
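
A rough sketch of the trait version (names and signatures are hypothetical): one interface over the Hub and GCS backends, so the main app file never touches provider-specific calls directly.

```rust
use std::path::Path;

/// Hypothetical storage abstraction for checkpoints, implemented once per
/// backend (Hub, GCS, ...), using the async-trait crate.
#[async_trait::async_trait]
trait CheckpointStore {
    type Error: std::error::Error + Send + Sync;

    /// Fail fast if the configured credentials can't write checkpoints.
    async fn validate_credentials(&self) -> Result<(), Self::Error>;

    /// Upload a local checkpoint directory under the given tag/revision.
    async fn upload(&self, local_dir: &Path, tag: &str) -> Result<(), Self::Error>;

    /// Fetch the checkpoint with the given tag into a local directory.
    async fn download(&self, tag: &str, local_dir: &Path) -> Result<(), Self::Error>;
}
```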
```rust
let upload_handle = tokio::task::spawn(async move {
    // ...
    let upload_info = match checkpoint {
```
this kind of duplication of structure makes me wonder if we shouldn't have a separate enum for, like

```rust
enum HostedModel {
    Gcp { whatever }
    Hub { whatever }
}
```

and then a checkpoint would be

```rust
enum Checkpoint {
    Hosted(HostedModel)
    P2P(HostedModel)
}
```

just to remove this weird duplication of p2p / p2pgcs. thoughts?
The only reason I did this is that adding a new enum containing those variants would change the size of the Checkpoint struct, which would in turn change the Coordinator's memory layout. I wanted to avoid a redeployment caused by storage changes. I much prefer the suggested approach, but to avoid dealing with those changes I went in this direction. Does that make sense?
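
For illustration, a rough sketch of why the nesting can shift the stored layout (all types here are hypothetical stand-ins with fixed-size fields, not the real coordinator state): the nested shape needs a second discriminant, so it is not guaranteed to stay byte-compatible with the flat one.

```rust
use std::mem::size_of;

// Hypothetical stand-ins; fixed-size fields as on-chain state would need.
struct HubRepo { repo_id: [u8; 64], revision: [u8; 64] }
struct GcsRepo { bucket: [u8; 64], prefix: [u8; 64] }

// Roughly the flat shape the thread describes: a single discriminant.
enum FlatCheckpoint {
    Hub(HubRepo),
    P2P(HubRepo),
    Gcs(GcsRepo),
    P2PGcs(GcsRepo),
}

// The suggested nested shape: HostedModel brings its own discriminant.
enum HostedModel { Hub(HubRepo), Gcs(GcsRepo) }
enum NestedCheckpoint { Hosted(HostedModel), P2P(HostedModel) }

fn main() {
    // The two sizes can differ, which is the author's layout concern.
    println!("flat:   {} bytes", size_of::<FlatCheckpoint>());
    println!("nested: {} bytes", size_of::<NestedCheckpoint>());
}
```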
```rust
    }
}

fn default_checkpoint_dir() -> PathBuf {
```
should this not write to somewhere in the scratch dir in the container, so all our files end up in the same place?
Yep, good catch. I changed it to check whether we have the scratch directory and to keep all the local checkpoints there.
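
A minimal sketch of that change, assuming the scratch location comes from the `SCRATCH_DIR` env var mentioned in the PR description (the subdirectory and fallback path are assumptions):

```rust
use std::env;
use std::path::PathBuf;

/// Prefer the container's scratch dir when it's configured, so local
/// checkpoints land next to the run's other files.
fn default_checkpoint_dir() -> PathBuf {
    match env::var("SCRATCH_DIR") {
        Ok(scratch) => PathBuf::from(scratch).join("checkpoints"),
        Err(_) => PathBuf::from("checkpoints"), // fallback: relative to cwd
    }
}
```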
```rust
let seed = sha256(&round.random_seed.to_le_bytes());

let checkpointers = max(
    (coordinator.epoch_state.clients.len() / 3).min(SOLANA_MAX_NUM_CHECKPOINTERS),
```
will this be correct if a client has requested to be withdrawn e.g. within warmup? (it might be, just sanity checking)
I think on every state transition we call move_clients_to_exited, which checks whether any clients are unhealthy and moves them from epoch_state.clients to epoch_state.exited_clients, so that should handle it. Worst case, if we miss one, the idea is to select multiple checkpointers at the end of an epoch to increase the chances that at least one of them is healthy and successfully uploads the full model.
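
For illustration, a minimal sketch of the deterministic, seed-driven selection (the ChaCha RNG and the exact cap arithmetic are assumptions; the PR only specifies a seeded shuffle capped at `SOLANA_MAX_NUM_CHECKPOINTERS`):

```rust
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rand_chacha::ChaCha8Rng;

const SOLANA_MAX_NUM_CHECKPOINTERS: usize = 16;

/// Every node that runs this with the same client list and the same
/// 32-byte round seed (e.g. sha256 of the round's random_seed) picks
/// the same checkpointer set.
fn select_checkpointers(clients: &[u64], seed: [u8; 32]) -> Vec<u64> {
    // A third of the clients, capped at the Solana limit and never
    // fewer than one (when any clients exist at all).
    let count = (clients.len() / 3)
        .min(SOLANA_MAX_NUM_CHECKPOINTERS)
        .max(1)
        .min(clients.len());

    let mut rng = ChaCha8Rng::from_seed(seed);
    let mut shuffled = clients.to_vec();
    shuffled.shuffle(&mut rng);
    shuffled.truncate(count);
    shuffled
}
```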
```diff
 Hub(HubRepo),
-P2P(HubRepo),
+Gcs(GcsRepo),
+P2P(HubRepo),
```
doesn't re-ordering here mean that existing coordinators currently set to P2P will suddenly decode as Gcs?
YES! nice catch, I changed it to keep the original order 🙂
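
For illustration, a hand-rolled sketch of why variant order matters here (index-based tagging is an assumption about the coordinator's encoding; borsh-style serializers work this way):

```rust
/// Variants are identified by their position, so old stored tags only
/// stay valid if new variants are appended at the end.
#[derive(Debug, PartialEq)]
enum Checkpoint {
    Hub, // tag 0
    P2P, // tag 1 in the original order
    Gcs, // tag 2, appended at the end
}

fn decode(tag: u8) -> Option<Checkpoint> {
    match tag {
        0 => Some(Checkpoint::Hub),
        1 => Some(Checkpoint::P2P),
        2 => Some(Checkpoint::Gcs),
        _ => None,
    }
}

fn main() {
    // State written before this PR stored P2P as tag 1. If Gcs had been
    // inserted before P2P, tag 1 would silently decode as Gcs.
    assert_eq!(decode(1), Some(Checkpoint::P2P));
}
```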
```rust
    .arg("--env-file")
    .arg(&self.env_file);

if let Some(dir) = &self.scratch_dir {
```
doesn't this assert that you MUST pass google credentials for any run started via run-manager, meaning we can't do e.g. hf checkpointing anymore?
also, i would much rather we find a way to pass the creds via some other mechanism - maybe write them to a temp dir the same way we write the env file, or something? adding another "you must have this file in this dir" thing is janky IMO
Yes! Also, thank you for catching that. I changed it to be optional and now we only mount the credentials into Docker if they exist locally, but this should support Hugging Face the same way as before.
I agree that asking to move the credentials to the scratch dir is kind of annoying. The only alternative I can think of is setting GOOGLE_CREDENTIALS_FILE_PATH in the .env to point to the local credential file, and then having the manager script automatically mount that file into the scratch dir and set the env variable. Does that sound better?

> I agree that asking to move the credentials to the scratch dir is kind of annoying. The only alternative I can think of is setting GOOGLE_CREDENTIALS_FILE_PATH in the .env to point to the local credential file, and then having the manager script automatically mount that file into the scratch dir and set the env variable. Does that sound better?
Yeah, i think this would work better if that's cool :)
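
A minimal sketch of that agreed flow (the in-container path and the use of `GOOGLE_APPLICATION_CREDENTIALS` are assumptions; `GOOGLE_CREDENTIALS_FILE_PATH` is the variable named in the thread):

```rust
use std::path::Path;
use std::process::Command;

/// Mount the Google credentials file into the container only when it
/// actually exists locally, so Hugging Face-only runs keep working.
fn add_gcs_credentials(cmd: &mut Command) {
    if let Ok(creds) = std::env::var("GOOGLE_CREDENTIALS_FILE_PATH") {
        if Path::new(&creds).is_file() {
            cmd.arg("-v")
                .arg(format!("{creds}:/scratch/gcs-credentials.json:ro"))
                .arg("-e")
                .arg("GOOGLE_APPLICATION_CREDENTIALS=/scratch/gcs-credentials.json");
        }
    }
}
```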
Depends on #463
This PR contains several changes to client arguments, dependencies, and the general cooldown state logic. Here's a summary of the changes:

- A subset of clients (up to `SOLANA_MAX_NUM_CHECKPOINTERS`, which is currently 16) is deterministically selected as checkpointers using a seeded shuffle algorithm based on the round's random seed.
- During cooldown, the coordinator waits `cooldown_time` before moving on.
- The `--hub-repo` and `--gcs-bucket` flags were removed from the training arguments. Checkpoint destinations are now derived from the coordinator configuration, and there is a single repo or bucket that contains all the checkpoints for the run.
- Added a new flag, `--skip-checkpoint-upload`, for testing purposes. This avoids uploading checkpoints and saving them locally, and should not be used outside testing environments.
- Updated the `google-cloud-storage` crate to version 1.6.
- Runs now require `SCRATCH_DIR` to be set in order to work correctly when using GCP checkpoints.