server : implement prompt processing progress report in stream mode (#15827)

* server : implement `return_progress`

* add timings.cache_n

* add progress.time_ms

* add test

* fix test for chat/completions

* readme: add docs on timings

* use ggml_time_us

Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>

---------

Co-authored-by: Georgi Gerganov <ggerganov@gmail.com>
Author:       Xuan-Son Nguyen
Date:         2025-09-06 18:35:04 +07:00
Committed by: GitHub
Parent:       01806e7771
Commit:       61bdfd5298

3 changed files with 152 additions and 18 deletions
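The examples below are not part of the commit; they sketch how the new option is used, with illustrative values. A streaming request opts into progress reporting by setting `return_progress` next to `stream` (the diff reads it via `json_value(data, "return_progress", false)`):

```json
{
  "prompt": "Once upon a time",
  "stream": true,
  "cache_prompt": true,
  "return_progress": true
}
```

While the prompt is being processed, the stream then carries chunks with a `prompt_progress` object (see the sketches further down), and the `timings` object gains a `cache_n` field.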


@@ -110,9 +110,10 @@ static bool server_task_type_need_logits(server_task_type task_type) {
 }
 
 struct slot_params {
-    bool stream        = true;
-    bool cache_prompt  = true;  // remember the prompt to avoid reprocessing all prompt
-    bool return_tokens = false;
+    bool stream          = true;
+    bool cache_prompt    = true;  // remember the prompt to avoid reprocessing all prompt
+    bool return_tokens   = false;
+    bool return_progress = false;
 
     int32_t n_keep = 0; // number of tokens to keep from initial prompt
     int32_t n_discard = 0; // number of tokens after n_keep that may be discarded when shifting context, 0 defaults to half
@@ -307,11 +308,11 @@ struct server_task {
         // enabling this will output extra debug information in the HTTP responses from the server
         params.verbose = params_base.verbosity > 9;
         params.timings_per_token = json_value(data, "timings_per_token", false);
 
         params.stream = json_value(data, "stream", false);
         params.cache_prompt = json_value(data, "cache_prompt", true);
         params.return_tokens = json_value(data, "return_tokens", false);
+        params.return_progress = json_value(data, "return_progress", false);
         params.n_predict = json_value(data, "n_predict", json_value(data, "max_tokens", defaults.n_predict));
         params.n_indent = json_value(data, "n_indent", defaults.n_indent);
         params.n_keep = json_value(data, "n_keep", defaults.n_keep);
@@ -608,6 +609,8 @@ struct server_task {
 };
 
 struct result_timings {
+    int32_t cache_n = -1;
     int32_t prompt_n = -1;
     double prompt_ms;
     double prompt_per_token_ms;
@@ -624,6 +627,8 @@ struct result_timings {
     json to_json() const {
         json base = {
+            {"cache_n", cache_n},
             {"prompt_n", prompt_n},
             {"prompt_ms", prompt_ms},
             {"prompt_per_token_ms", prompt_per_token_ms},
@@ -644,6 +649,22 @@ struct result_timings {
     }
 };
 
+struct result_prompt_progress {
+    int32_t total = 0;
+    int32_t cache = 0;
+    int32_t processed = 0;
+    int64_t time_ms = 0;
+
+    json to_json() const {
+        return json {
+            {"total", total},
+            {"cache", cache},
+            {"processed", processed},
+            {"time_ms", time_ms},
+        };
+    }
+};
+
 struct server_task_result {
     int id = -1;
     int id_slot = -1;
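Not part of the diff: serialized via the `to_json()` above and filled in by `send_partial_response()` further down, a `prompt_progress` object looks like this (values are illustrative). `total` is the full prompt size in tokens, `cache` is how many of those were reused from the slot's cache, `processed` is how many tokens are in the cache so far, and `time_ms` is the time spent on prompt processing:

```json
{
  "total": 1024,
  "cache": 256,
  "processed": 640,
  "time_ms": 87
}
```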
@@ -999,8 +1020,10 @@ struct server_task_result_cmpl_partial : server_task_result {
     int32_t n_prompt_tokens;
     bool post_sampling_probs;
+    bool is_progress = false;
     completion_token_output prob_output;
     result_timings timings;
+    result_prompt_progress progress;
     // OAI-compat fields
     bool verbose = false;
@@ -1045,6 +1068,9 @@ struct server_task_result_cmpl_partial : server_task_result {
         if (timings.prompt_n > 0) {
             res.push_back({"timings", timings.to_json()});
         }
+        if (is_progress) {
+            res.push_back({"prompt_progress", progress.to_json()});
+        }
         if (!prob_output.probs.empty()) {
             res["completion_probabilities"] = completion_token_output::probs_vector_to_json({prob_output}, post_sampling_probs);
         }
@@ -1082,6 +1108,9 @@ struct server_task_result_cmpl_partial : server_task_result {
         if (timings.prompt_n >= 0) {
             res.push_back({"timings", timings.to_json()});
         }
+        if (is_progress) {
+            res.push_back({"prompt_progress", progress.to_json()});
+        }
         return res;
     }
@@ -1109,7 +1138,7 @@ struct server_task_result_cmpl_partial : server_task_result {
             });
         };
 
         // We have to send an initial update to conform to openai behavior
-        if (first) {
+        if (first || is_progress) {
             add_delta({
                 {"role", "assistant"},
                 {"content", nullptr},
@@ -1121,16 +1150,20 @@ struct server_task_result_cmpl_partial : server_task_result {
         }
 
         if (!deltas.empty()) {
-            GGML_ASSERT(deltas[deltas.size() - 1].at("choices").size() >= 1);
+            auto & last_json = deltas[deltas.size() - 1];
+            GGML_ASSERT(last_json.at("choices").size() >= 1);
 
             if (prob_output.probs.size() > 0) {
-                deltas[deltas.size() - 1].at("choices").at(0)["logprobs"] = json {
+                last_json.at("choices").at(0)["logprobs"] = json {
                     {"content", completion_token_output::probs_vector_to_json({prob_output}, post_sampling_probs)},
                 };
             }
 
             if (timings.prompt_n >= 0) {
-                deltas[deltas.size() - 1].push_back({"timings", timings.to_json()});
+                last_json.push_back({"timings", timings.to_json()});
             }
+
+            if (is_progress) {
+                last_json.push_back({"prompt_progress", progress.to_json()});
+            }
         }
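Not part of the diff: on the OAI-compatible chat-completions stream, the progress report rides on a regular chunk. The `if (first || is_progress)` branch above emits a delta with null content, and `prompt_progress` is attached at the top level of the chunk next to `timings`. An abridged sketch with illustrative values (other standard chunk fields omitted):

```json
{
  "object": "chat.completion.chunk",
  "choices": [
    {
      "index": 0,
      "delta": { "role": "assistant", "content": null }
    }
  ],
  "prompt_progress": {
    "total": 1024,
    "cache": 256,
    "processed": 640,
    "time_ms": 87
  }
}
```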
@@ -1404,6 +1437,7 @@ struct server_slot {
     // n_prompt_tokens may not be equal to prompt_tokens.size(), because prompt maybe truncated
     int32_t n_prompt_tokens = 0;
+    int32_t n_prompt_tokens_cache = 0;
     int32_t n_prompt_tokens_processed = 0;
 
     // input prompt tokens
@@ -1456,7 +1490,9 @@ struct server_slot {
     void reset() {
         SLT_DBG(*this, "%s", "\n");
 
-        n_prompt_tokens = 0;
+        n_prompt_tokens       = 0;
+        n_prompt_tokens_cache = 0;
         last_nl_pos = 0;
         generated_text = "";
         has_new_line = false;
@@ -1547,6 +1583,8 @@ struct server_slot {
     result_timings get_timings() const {
         result_timings timings;
+        timings.cache_n = n_prompt_tokens_cache;
         timings.prompt_n = n_prompt_tokens_processed;
         timings.prompt_ms = t_prompt_processing;
         timings.prompt_per_token_ms = t_prompt_processing / n_prompt_tokens_processed;
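Not part of the diff: with `cache_n` filled from `n_prompt_tokens_cache`, the `timings` object reported in responses now distinguishes reused prompt tokens from freshly processed ones. An abridged sketch (illustrative values, remaining timing fields omitted):

```json
{
  "timings": {
    "cache_n": 256,
    "prompt_n": 768,
    "prompt_ms": 143.2,
    "prompt_per_token_ms": 0.19
  }
}
```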
@@ -2520,7 +2558,7 @@ struct server_context {
             slot.add_token(result);
 
             if (slot.params.stream) {
-                send_partial_response(slot, result);
+                send_partial_response(slot, result, false);
             }
         }
@@ -2712,13 +2750,24 @@ struct server_context {
         return true;
     }
 
-    void send_partial_response(server_slot & slot, const completion_token_output & tkn) {
+    void send_partial_response(server_slot & slot, const completion_token_output & tkn, bool is_progress) {
         auto res = std::make_unique<server_task_result_cmpl_partial>();
 
-        res->id = slot.id_task;
-        res->index = slot.index;
-        res->content = tkn.text_to_send;
-        res->tokens = { tkn.tok };
+        res->id = slot.id_task;
+        res->index = slot.index;
 
+        if (is_progress) {
+            res->is_progress = true;
+            res->progress.total = slot.n_prompt_tokens;
+            res->progress.cache = slot.n_prompt_tokens_cache;
+            res->progress.processed = slot.cache_tokens.size();
+            res->progress.time_ms = (ggml_time_us() - slot.t_start_process_prompt) / 1000;
+        } else {
+            res->content = tkn.text_to_send;
+            res->tokens = { tkn.tok };
+            slot.update_chat_msg(res->oaicompat_msg_diffs);
+        }
 
         res->n_decoded = slot.n_decoded;
         res->n_prompt_tokens = slot.n_prompt_tokens;
@@ -2729,8 +2778,6 @@ struct server_context {
         res->oaicompat_model = slot.params.oaicompat_model;
         res->oaicompat_cmpl_id = slot.params.oaicompat_cmpl_id;
 
-        slot.update_chat_msg(res->oaicompat_msg_diffs);
-
         // populate res.probs_output
         if (slot.params.sampling.n_probs > 0) {
             res->prob_output = tkn; // copy the token probs
@@ -3557,6 +3604,7 @@ struct server_context {
                     slot.n_past--;
                 }
 
+                slot.n_prompt_tokens_cache = slot.n_past;
                 slot.n_prompt_tokens_processed = 0;
             }
@@ -3573,7 +3621,8 @@ struct server_context {
                 llama_memory_seq_rm(llama_get_memory(ctx), slot.id, -1, -1);
 
                 // there is no common part left
-                slot.n_past = 0;
+                slot.n_past                = 0;
+                slot.n_prompt_tokens_cache = 0;
             }
 
             SLT_INF(slot, "kv cache rm [%d, end)\n", slot.n_past);
@@ -3767,6 +3816,13 @@ struct server_context {
             n_batch = llama_n_batch(ctx);
 
             for (auto & slot : slots) {
+                // optionally send prompt processing progress
+                if (slot.state == SLOT_STATE_PROCESSING_PROMPT || slot.state == SLOT_STATE_DONE_PROMPT) {
+                    if (slot.params.stream && slot.params.return_progress) {
+                        send_partial_response(slot, {}, true);
+                    }
+                }
+
                 if (slot.i_batch < (int) i || slot.i_batch >= (int) (i + n_tokens)) {
                     continue; // continue loop of slots
                 }
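Not part of the diff: because the progress chunk is sent from the batch loop above, a client sees roughly one `prompt_progress` per decoded batch while the slot is processing (or has just finished) the prompt, with `processed` growing from `cache` toward `total`. An illustrative sequence for an 8192-token prompt, an empty cache, and a 2048-token batch (all numbers made up):

```json
[
  { "total": 8192, "cache": 0, "processed": 2048, "time_ms": 310 },
  { "total": 8192, "cache": 0, "processed": 4096, "time_ms": 615 },
  { "total": 8192, "cache": 0, "processed": 6144, "time_ms": 928 },
  { "total": 8192, "cache": 0, "processed": 8192, "time_ms": 1240 }
]
```

A UI can display `processed / total` as the fraction of the prompt evaluated so far, or `(processed - cache) / (total - cache)` for the work done by this request alone.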