63 commits
a1b9a04
Refactor MPI for heterogenous cluster support.
AutonomicPerfectionist Sep 24, 2023
1f3febc
Add documentation for ggml-mpi functions
AutonomicPerfectionist Sep 25, 2023
d70f26c
Add code comments in MPI
AutonomicPerfectionist Sep 25, 2023
4bd95ae
Remove mtest (#3177)
AutonomicPerfectionist Sep 25, 2023
f691b61
Revert accidental removal of ggml_mpi_backend_init
AutonomicPerfectionist Sep 25, 2023
d7dbb6b
Disable warmup under MPI
AutonomicPerfectionist Sep 25, 2023
1ff69c4
Update MPI example to follow main changes
AutonomicPerfectionist Sep 25, 2023
33185ed
Remove fprintf logs from mpi main
AutonomicPerfectionist Sep 25, 2023
f67fcbc
Remove unrelated sections from mpi readme
AutonomicPerfectionist Sep 25, 2023
907f807
Replace vector with C-style array and length in llama_split_layers_we…
AutonomicPerfectionist Sep 28, 2023
6b1c471
Fix minor rebase errors
AutonomicPerfectionist Oct 24, 2023
afc2cc4
Fix MPI compilation errors
AutonomicPerfectionist Oct 25, 2023
efd73fe
Synchronize batch sequence info, fixing MPI for llama_decode()
AutonomicPerfectionist Oct 29, 2023
3fa2527
Update MPI code to new KV seq rm and bos/eos model APIs
AutonomicPerfectionist Oct 30, 2023
33b88d6
Fix some mpi mem leaks, add mpi-layer-split to help when using mpi
AutonomicPerfectionist Oct 31, 2023
da37edc
Fix missing layer_inp_i names
AutonomicPerfectionist Nov 1, 2023
51f3f8f
Allow per-node threads to be set in command-line args, add mpi suppor…
AutonomicPerfectionist Nov 1, 2023
4cf1c76
Support running speculation with two processes
AutonomicPerfectionist Nov 8, 2023
8ccaf96
Support setting layer splits per comm/model
AutonomicPerfectionist Nov 9, 2023
5f21688
Fix incorrect layer split parsing
AutonomicPerfectionist Nov 9, 2023
2ddf0fe
Split orig comm to only contain root nodes of the two subnets
AutonomicPerfectionist Nov 9, 2023
2166a12
Fix main layer split and fix speculative prompt tokenization
AutonomicPerfectionist Nov 9, 2023
fbc3d4d
Fix kv desync
AutonomicPerfectionist Nov 9, 2023
4dc25d3
Propagate exit to worker nodes
AutonomicPerfectionist Nov 9, 2023
ba31377
Add async decoding
AutonomicPerfectionist Nov 12, 2023
1b6f75d
Fix draft nodes accidentally running target
AutonomicPerfectionist Nov 13, 2023
71c6947
Re-enable async tensor send
AutonomicPerfectionist Nov 13, 2023
d73f944
Begin work on decoupling tgt and dft pipelines
AutonomicPerfectionist Nov 14, 2023
4aa9b6c
Only sync required token data
AutonomicPerfectionist Nov 14, 2023
9b67f73
Working additional run w/ reset
AutonomicPerfectionist Nov 14, 2023
802ab55
Fix hang due to early return
AutonomicPerfectionist Nov 15, 2023
b2b4033
Run pipeline in parallel
AutonomicPerfectionist Nov 15, 2023
a1a9f05
Fix memory leak, remove unneeded fields
AutonomicPerfectionist Nov 15, 2023
86a932d
Clean up output a bit
AutonomicPerfectionist Nov 15, 2023
76c8dab
Switch tensor send and pipeline sync to async
AutonomicPerfectionist Nov 15, 2023
e47fd5c
Re-enable wait recv
AutonomicPerfectionist Nov 15, 2023
3a58fef
Don't store send requests, immediately free them
AutonomicPerfectionist Nov 15, 2023
8c44ee6
Switch pipeline sync back to synced send
AutonomicPerfectionist Nov 15, 2023
c4b8362
Move tensor transmissions to tag 7 and re-enable async pipeline sync
AutonomicPerfectionist Nov 15, 2023
d5b7512
Switch isend to buffered send
AutonomicPerfectionist Nov 16, 2023
a0272a1
Add assertions to prevent buffer overflow
AutonomicPerfectionist Nov 16, 2023
7fb2630
Add additional logging
AutonomicPerfectionist Nov 16, 2023
cd10f89
Correct async tgt, but break drafts
AutonomicPerfectionist Nov 16, 2023
cbe6e2c
Mostly working async
AutonomicPerfectionist Nov 17, 2023
6933af6
Another partially working version
AutonomicPerfectionist Nov 17, 2023
b005ee1
Non-async working
AutonomicPerfectionist Nov 19, 2023
1ac4484
Rearchitect MPI so head is first
AutonomicPerfectionist Nov 20, 2023
a9685bb
Fix segfault and working async w/ no np
AutonomicPerfectionist Nov 24, 2023
7081a7a
Fix np >= 2 by using sequence offsets
AutonomicPerfectionist Nov 28, 2023
67838da
Fix draft model KV cache synchronization w/ double buffering
AutonomicPerfectionist Dec 8, 2023
615e666
Add cancellation and multiple simultaneous speculative seqs
AutonomicPerfectionist Jan 4, 2024
9f65428
Mostly fix cache sync issues and simul spec runs
AutonomicPerfectionist Jan 8, 2024
73b92c7
Cancel after sampling
AutonomicPerfectionist Jan 8, 2024
e797f1a
Force at least 2 spec runs
AutonomicPerfectionist Jan 8, 2024
7674bde
Take secondary draft sequences into account for cancellation
AutonomicPerfectionist Jan 8, 2024
9606d38
Add p_recovery and move dump_kv_cache_view_seqs out of common
AutonomicPerfectionist Jan 15, 2024
c6ac680
Enforce message ordering with transactions, abort GGML compute if can…
AutonomicPerfectionist Jan 15, 2024
a76859e
Kinda fix no shutdown issue, add more tag definitions
AutonomicPerfectionist Jan 15, 2024
6830313
Refactor speculation for better readability, check for cancellations …
AutonomicPerfectionist Jan 15, 2024
cfe3120
Fix main, kinda
AutonomicPerfectionist Jan 15, 2024
d3baaf7
Both main and speculative mostly working, add latencies to speculativ…
AutonomicPerfectionist Jan 19, 2024
d23b996
Add latencies to main
AutonomicPerfectionist Jan 21, 2024
d6a70a9
Add p_decay
AutonomicPerfectionist Feb 3, 2024
155 changes: 95 additions & 60 deletions common/common.cpp
@@ -154,18 +154,37 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
invalid_param = true;
break;
}
params.n_threads = std::stoi(argv[i]);
if (params.n_threads <= 0) {
params.n_threads = std::thread::hardware_concurrency();
std::string arg_next = argv[i];

// split string by , and /
const std::regex regex{R"([,/]+)"};
std::sregex_token_iterator it{arg_next.begin(), arg_next.end(), regex, -1};
std::vector<std::string> split_arg{it, {}};
params.n_threads.resize(split_arg.size());
for (size_t i = 0; i < split_arg.size(); ++i) {
params.n_threads[i] = std::stoi(split_arg[i]);
if (params.n_threads[i] <= 0) {
params.n_threads[i] = std::thread::hardware_concurrency();
}
}

} else if (arg == "-tb" || arg == "--threads-batch") {
if (++i >= argc) {
invalid_param = true;
break;
}
params.n_threads_batch = std::stoi(argv[i]);
if (params.n_threads_batch <= 0) {
params.n_threads_batch = std::thread::hardware_concurrency();
std::string arg_next = argv[i];

// split string by , and /
const std::regex regex{R"([,/]+)"};
std::sregex_token_iterator it{arg_next.begin(), arg_next.end(), regex, -1};
std::vector<std::string> split_arg{it, {}};
params.n_threads_batch.resize(split_arg.size());
for (size_t i = 0; i < split_arg.size(); ++i) {
params.n_threads_batch[i] = std::stoi(split_arg[i]);
if (params.n_threads_batch[i] <= 0) {
params.n_threads_batch[i] = std::thread::hardware_concurrency();
}
}
} else if (arg == "-p" || arg == "--prompt") {
if (++i >= argc) {
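The `-t`/`-tb` branches above now parse a list of per-node thread counts instead of a single integer. Below is a minimal standalone sketch of that splitting step, assuming an illustrative helper name that is not part of the patch: "8,4" or "8/4" both yield {8, 4}, and non-positive entries fall back to the hardware thread count.

```cpp
// Sketch only: mirrors the -t/-tb parsing above; parse_thread_list is an illustrative name.
#include <cstdint>
#include <regex>
#include <string>
#include <thread>
#include <vector>

static std::vector<int32_t> parse_thread_list(const std::string & arg) {
    // Split on ',' and '/', e.g. "8,4" or "8/4" -> {"8", "4"}.
    const std::regex sep{R"([,/]+)"};
    std::sregex_token_iterator it{arg.begin(), arg.end(), sep, -1};
    std::vector<std::string> parts{it, {}};

    std::vector<int32_t> threads(parts.size());
    for (size_t i = 0; i < parts.size(); ++i) {
        threads[i] = std::stoi(parts[i]);
        if (threads[i] <= 0) {
            // Non-positive entries fall back to the hardware thread count.
            threads[i] = std::thread::hardware_concurrency();
        }
    }
    return threads;
}
```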
@@ -429,6 +448,18 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
break;
}
params.p_split = std::stof(argv[i]);
} else if (arg == "--p-recovery" || arg == "-pr") {
if (++i >= argc) {
invalid_param = true;
break;
}
params.p_recovery = std::stof(argv[i]);
} else if (arg == "--p-decay" || arg == "-pd") {
if (++i >= argc) {
invalid_param = true;
break;
}
params.p_decay = std::stof(argv[i]);
} else if (arg == "-m" || arg == "--model") {
if (++i >= argc) {
invalid_param = true;
@@ -540,6 +571,30 @@ bool gpt_params_parse_ex(int argc, char ** argv, gpt_params & params) {
#else
fprintf(stderr, "warning: llama.cpp was compiled without cuBLAS. It is not possible to set a main GPU.\n");
#endif
} else if (arg == "--mpi-layer-split") {
if (++i >= argc) {
invalid_param = true;
break;
}
std::string arg_next = argv[i];

// split string by , and /
const std::regex regex{R"([\/]+)"};
const std::regex inner_regex{R"([,]+)"};

std::sregex_token_iterator it{arg_next.begin(), arg_next.end(), regex, -1};
std::vector<std::string> split_arg{it, {}};
params.mpi_layer_split.resize(split_arg.size());
for (size_t i = 0; i < split_arg.size(); ++i) {
std::sregex_token_iterator it_inner{split_arg[i].begin(), split_arg[i].end(), inner_regex, -1};
std::vector<std::string> split_arg_inner{it_inner, {}};
params.mpi_layer_split[i].resize(split_arg_inner.size());
for (size_t j = 0; j < split_arg_inner.size(); ++j) {
params.mpi_layer_split[i][j] = std::stof(split_arg_inner[j]);
}
}


} else if (arg == "--tensor-split" || arg == "-ts") {
if (++i >= argc) {
invalid_param = true;
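The `--mpi-layer-split` branch applies the same regex-split pattern twice: `/` separates one group per comm/model (per the commit history) and `,` separates the per-node fractions within a group. A hedged standalone sketch with an illustrative helper name: "0.6,0.4/0.3,0.7" parses to {{0.6f, 0.4f}, {0.3f, 0.7f}}.

```cpp
// Sketch only: mirrors the --mpi-layer-split parsing above; parse_layer_split is an illustrative name.
#include <regex>
#include <string>
#include <vector>

static std::vector<std::vector<float>> parse_layer_split(const std::string & arg) {
    const std::regex outer{R"([\/]+)"}; // '/' separates groups (one per comm/model)
    const std::regex inner{R"([,]+)"};  // ',' separates per-node fractions within a group

    std::sregex_token_iterator it{arg.begin(), arg.end(), outer, -1};
    std::vector<std::string> groups{it, {}};

    std::vector<std::vector<float>> split(groups.size());
    for (size_t i = 0; i < groups.size(); ++i) {
        std::sregex_token_iterator it_inner{groups[i].begin(), groups[i].end(), inner, -1};
        std::vector<std::string> fields{it_inner, {}};
        split[i].resize(fields.size());
        for (size_t j = 0; j < fields.size(); ++j) {
            split[i][j] = std::stof(fields[j]);
        }
    }
    return split;
}
```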
@@ -742,7 +797,7 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) {
printf(" (can be specified more than once for multiple prompts).\n");
printf(" --color colorise output to distinguish prompt and user input from generations\n");
printf(" -s SEED, --seed SEED RNG seed (default: -1, use random seed for < 0)\n");
printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.n_threads);
printf(" -t N, --threads N number of threads to use during generation (default: %d)\n", params.n_threads[0]);
printf(" -tb N, --threads-batch N\n");
printf(" number of threads to use during batch and prompt processing (default: same as --threads)\n");
printf(" -p PROMPT, --prompt PROMPT\n");
@@ -811,6 +866,8 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) {
printf(" -ns N, --sequences N number of sequences to decode (default: %d)\n", params.n_sequences);
printf(" -pa N, --p-accept N speculative decoding accept probability (default: %.1f)\n", (double)params.p_accept);
printf(" -ps N, --p-split N speculative decoding split probability (default: %.1f)\n", (double)params.p_split);
printf(" -pr N, --p-recovery N PipeInfer probability recovery (default: %.1f)\n", (double)params.p_recovery);
printf(" -pd N, --p-decay N PipeInfer probability decay (default: %.1f)\n", (double)params.p_decay);
printf(" -cb, --cont-batching enable continuous batching (a.k.a dynamic batching) (default: disabled)\n");
printf(" --mmproj MMPROJ_FILE path to a multimodal projector file for LLaVA. see examples/llava/README.md\n");
printf(" --image IMAGE_FILE path to an image file. use with multimodal models\n");
@@ -836,6 +893,9 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) {
printf(" use " GGML_CUBLAS_NAME " instead of custom mul_mat_q " GGML_CUDA_NAME " kernels.\n");
printf(" Not recommended since this is both slower and uses more VRAM.\n");
#endif // GGML_USE_CUBLAS
#endif
#ifdef GGML_USE_MPI
printf(" --mpi-layer-split N percentiles to split the layers by across nodes\n");
#endif
printf(" --verbose-prompt print prompt before generation\n");
printf(" -dkvc, --dump-kv-cache\n");
@@ -859,9 +919,9 @@ void gpt_print_usage(int /*argc*/, char ** argv, const gpt_params & params) {
std::string get_system_info(const gpt_params & params) {
std::ostringstream os;

os << "system_info: n_threads = " << params.n_threads;
if (params.n_threads_batch != -1) {
os << " (n_threads_batch = " << params.n_threads_batch << ")";
os << "system_info: n_threads = " << params.n_threads[0];
if (params.n_threads_batch[0] != -1) {
os << " (n_threads_batch = " << params.n_threads_batch[0] << ")";
}
os << " / " << std::thread::hardware_concurrency() << " | " << llama_print_system_info();

@@ -909,8 +969,8 @@ struct llama_context_params llama_context_params_from_gpt_params(const gpt_param

cparams.n_ctx = params.n_ctx;
cparams.n_batch = params.n_batch;
cparams.n_threads = params.n_threads;
cparams.n_threads_batch = params.n_threads_batch == -1 ? params.n_threads : params.n_threads_batch;
cparams.n_threads = params.n_threads[0];
cparams.n_threads_batch = params.n_threads_batch[0] == -1 ? params.n_threads[0] : params.n_threads_batch[0];
cparams.mul_mat_q = params.mul_mat_q;
cparams.seed = params.seed;
cparams.f16_kv = params.memory_f16;
@@ -944,12 +1004,15 @@ void llama_batch_add(
for (size_t i = 0; i < seq_ids.size(); ++i) {
batch.seq_id[batch.n_tokens][i] = seq_ids[i];
}
batch.logits [batch.n_tokens] = logits;
if (batch.logits) {
batch.logits[batch.n_tokens] = logits;
}

batch.n_tokens++;
}

std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_params(gpt_params & params) {
int32_t n_threads = params.n_threads[0];
auto mparams = llama_model_params_from_gpt_params(params);

llama_model * model = llama_load_model_from_file(params.model.c_str(), mparams);
@@ -967,6 +1030,16 @@ std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_par
return std::make_tuple(nullptr, nullptr);
}

#ifdef GGML_USE_MPI
int node_id = llama_node_id(lctx);
n_threads = (node_id >= params.n_threads.size()) ? get_num_physical_cores() : params.n_threads[node_id];
int32_t n_threads_batch = (node_id >= params.n_threads_batch.size()) ? -1 : params.n_threads_batch[node_id];

params.n_threads[0] = n_threads; // So we can treat index 0 as what our n_threads is elsewhere
params.n_threads_batch[0] = n_threads_batch;
llama_set_n_threads(lctx, n_threads, (n_threads_batch > 0) ? n_threads_batch : get_num_physical_cores());
#endif

for (unsigned int i = 0; i < params.lora_adapter.size(); ++i) {
const std::string& lora_adapter = std::get<0>(params.lora_adapter[i]);
float lora_scale = std::get<1>(params.lora_adapter[i]);
@@ -976,7 +1049,7 @@ std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_par
((i > 0) || params.lora_base.empty())
? NULL
: params.lora_base.c_str(),
params.n_threads);
n_threads);
if (err != 0) {
fprintf(stderr, "%s: error: failed to apply lora adapter\n", __func__);
llama_free(lctx);
@@ -992,9 +1065,16 @@ std::tuple<struct llama_model *, struct llama_context *> llama_init_from_gpt_par
{
LOG("warming up the model with an empty run\n");

#ifndef GGML_USE_MPI
// When using MPI, llama_decode() enters into an infinite loop
// on non-head nodes. Thus, we only want to warmup the model here
// if we aren't using MPI.
// FIXME have a way to terminate the infinite loop so we can warmup the model
// in MPI mode
std::vector<llama_token> tmp = { llama_token_bos(model), llama_token_eos(model), };
llama_decode(lctx, llama_batch_get_one(tmp.data(), std::min(tmp.size(), (size_t) params.n_batch), 0, 0));
llama_kv_cache_clear(lctx);
#endif
llama_reset_timings(lctx);
}

@@ -1384,7 +1464,7 @@ void dump_non_result_info_yaml(FILE * stream, const gpt_params & params, const l
dump_vector_float_yaml(stream, "tensor_split", tensor_split_vector);

fprintf(stream, "tfs: %f # default: 1.0\n", sparams.tfs_z);
fprintf(stream, "threads: %d # default: %d\n", params.n_threads, std::thread::hardware_concurrency());
fprintf(stream, "threads: %d # default: %d\n", params.n_threads[0], std::thread::hardware_concurrency());
fprintf(stream, "top_k: %d # default: 40\n", sparams.top_k);
fprintf(stream, "top_p: %f # default: 0.95\n", sparams.top_p);
fprintf(stream, "min_p: %f # default: 0.0\n", sparams.min_p);
@@ -1419,49 +1499,4 @@ void dump_kv_cache_view(const llama_kv_cache_view & view, int row_size) {
printf("\n=== Done dumping\n");
}

void dump_kv_cache_view_seqs(const llama_kv_cache_view & view, int row_size) {
static const char slot_chars[] = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";

printf("=== Dumping KV cache. total cells %d, max sequences per cell %d, populated cells %d, total tokens in cache %d, largest empty slot=%d @ %d\n",
view.n_cells, view.n_max_seq, view.used_cells, view.token_count, view.max_contiguous, view.max_contiguous_idx);

std::unordered_map<llama_seq_id, size_t> seqs;
llama_kv_cache_view_cell * c_curr = view.cells;
llama_seq_id * cs_curr = view.cells_sequences;

for (int i = 0; i < view.n_cells; i++, c_curr++, cs_curr += view.n_max_seq) {
for (int j = 0; j < view.n_max_seq; j++) {
if (cs_curr[j] < 0) { continue; }
if (seqs.find(cs_curr[j]) == seqs.end()) {
if (seqs.size() + 1 >= sizeof(slot_chars)) { break; }
seqs[cs_curr[j]] = seqs.size();
}
}
if (seqs.size() + 1 >= sizeof(slot_chars)) { break; }
}

printf("=== Sequence legend: ");
for (const auto & it : seqs) {
printf("%zu=%d, ", it.second, it.first);
}
printf("'+'=other sequence ids");

c_curr = view.cells;
cs_curr = view.cells_sequences;
for (int i = 0; i < view.n_cells; i++, c_curr++, cs_curr += view.n_max_seq) {
if (i % row_size == 0) {
printf("\n%5d: ", i);
}
for (int j = 0; j < view.n_max_seq; j++) {
if (cs_curr[j] >= 0) {
const auto & it = seqs.find(cs_curr[j]);
putchar(it != seqs.end() ? int(slot_chars[it->second]) : '+');
} else {
putchar('.');
}
}
putchar(' ');
}

printf("\n=== Done dumping\n");
}
10 changes: 6 additions & 4 deletions common/common.h
@@ -45,8 +45,8 @@ int32_t get_num_physical_cores();
struct gpt_params {
uint32_t seed = -1; // RNG seed

int32_t n_threads = get_num_physical_cores();
int32_t n_threads_batch = -1; // number of threads to use for batch processing (-1 = use n_threads)
std::vector<int32_t> n_threads = {get_num_physical_cores()};
std::vector<int32_t> n_threads_batch = {-1}; // number of threads to use for batch processing (-1 = use n_threads)
int32_t n_predict = -1; // new tokens to predict
int32_t n_ctx = 512; // context size
int32_t n_batch = 512; // batch size for prompt processing (must be >=32 to use BLAS)
@@ -57,9 +57,12 @@ struct gpt_params {
int32_t n_sequences = 1; // number of sequences to decode
float p_accept = 0.5f; // speculative decoding accept probability
float p_split = 0.1f; // speculative decoding split probability
float p_recovery = 0.0f; // Cumulative probability that p_accept and p_split are increased by per-iteration.
float p_decay = 0.0f; // Cumulative probability that p_accept and p_split are decreased by per-iteration when drafting stops due to p_accept.
int32_t n_gpu_layers = -1; // number of layers to store in VRAM (-1 - use default)
int32_t n_gpu_layers_draft = -1; // number of layers to store in VRAM for the draft model (-1 - use default)
int32_t main_gpu = 0; // the GPU that is used for scratch and small tensors
std::vector<std::vector<float>> mpi_layer_split = {{1.0}}; // list of percentages of the total number of layers
float tensor_split[LLAMA_MAX_DEVICES] = {0}; // how split tensors should be distributed across GPUs
int32_t n_beams = 0; // if non-zero then use beam search of given width.
float rope_freq_base = 0.0f; // RoPE base frequency
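The hunk above turns `n_threads`/`n_threads_batch` into per-node vectors and adds `mpi_layer_split`, `p_recovery`, and `p_decay`. A hedged illustration of populating these fields directly for a two-node MPI run; the values are assumptions for the example, not defaults taken from the patch.

```cpp
// Illustration only: assumed values for a hypothetical two-node MPI run.
#include "common.h"

static gpt_params make_example_params() {
    gpt_params params;
    params.n_threads       = {8, 4};         // node 0 uses 8 threads, node 1 uses 4
    params.n_threads_batch = {-1, -1};       // -1 falls back to n_threads on each node
    params.mpi_layer_split = {{0.6f, 0.4f}}; // one group: 60% of layers on node 0, 40% on node 1
    params.p_recovery      = 0.05f;          // per-iteration increase applied to p_accept/p_split
    params.p_decay         = 0.02f;          // per-iteration decrease when drafting stops on p_accept
    return params;
}
```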
@@ -227,5 +230,4 @@ void dump_non_result_info_yaml(
// Dump the KV cache view with the number of sequences per cell.
void dump_kv_cache_view(const llama_kv_cache_view & view, int row_size = 80);

// Dump the KV cache view showing individual sequences in each cell (long output).
void dump_kv_cache_view_seqs(const llama_kv_cache_view & view, int row_size = 40);
