broker: usage capture + opts widening (Phase 7 commit #1)
Foundation for Phase 7. broker.chat_stream now emits a third
on_delta kind ("usage") after the stream completes successfully;
broker.chat returns (text, usage). Backward-compatible — existing
callers that ignore the new kind / second value continue working
via Lua's drop-extra-returns semantics.
Changes:
- build_request widens (A3 + R3) — `(model_cfg, msgs, stream, opts)`.
opts.tools / opts.max_tokens / opts.include_usage / opts.category
all live inside opts now. Both internal call sites updated.
- opts.include_usage defaults to true for streaming requests; sets
`stream_options: { include_usage: true }` in the request body.
B1: required for local llama.cpp to emit usage; cloud honors as
a no-op (emits anyway).
- on_event captures `doc.usage` into a closure-local `final_usage`.
N1: the check is INDEPENDENT of the choice/delta branches — local
emits usage on choices=[] chunks (choice nil) while cloud emits
with non-empty choices + finish_reason. Both shapes funnel here.
- After curl.post_sse returns successfully (NOT on transport/api
errors), if final_usage is set, emit on_delta("usage", {prompt_tokens,
completion_tokens, total_tokens, cost, model, category}). cost is
nil for local (R6 preserves the nil vs 0 distinction the
accumulator needs). model is model_cfg.model — caller-stable per
B4 + R2 so call_broker's fallback retry attributes usage to the
fallback's model name without wrapper-side tracking.
- M.chat (R1 — BLOCKER fix): on_delta now also captures kind=="usage"
alongside "text"; M.chat returns (text, usage). Without this fix
4 of 5 non-streaming categories (summarize / delegate /
memory_summarize / probe) would silently report zero usage.
Smoke verified against live hossenfelder:8082:
- CLOUD chat -> (text, usage); cost=2.9e-05, model=anthropic/...
- LOCAL chat -> (text, usage); cost=NIL (correct per R6),
model=qwen-coder-7b-snappy-8k
- CLOUD stream -> on_delta("usage", {...}) with category="test"
echoed; model name caller-stable.
Regression: test_safety 87/87, test_router_model 31/31, repl loads.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+71
-12
@@ -27,10 +27,19 @@ local function build_headers(model_cfg)
|
|||||||
return h
|
return h
|
||||||
end
|
end
|
||||||
|
|
||||||
local function build_request(model_cfg, messages, stream, tools, max_tokens)
|
-- Phase 7 (A3): build_request widens to take an opts table; previously
|
||||||
|
-- positional (tools, max_tokens). Both internal call sites (chat_stream
|
||||||
|
-- and M.chat-via-chat_stream) updated. opts fields:
|
||||||
|
-- .tools per Phase 2 (omitted from body when nil/empty)
|
||||||
|
-- .max_tokens per Phase 3 (omitted when nil)
|
||||||
|
-- .include_usage Phase 7 — default true; sets stream_options.include_usage
|
||||||
|
-- in the request body (B1: required for local llama.cpp
|
||||||
|
-- to emit usage; no-op for cloud which emits anyway).
|
||||||
|
local function build_request(model_cfg, messages, stream, opts)
|
||||||
if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
|
if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
|
||||||
return nil, "broker: model_cfg.endpoint and .model are required"
|
return nil, "broker: model_cfg.endpoint and .model are required"
|
||||||
end
|
end
|
||||||
|
opts = opts or {}
|
||||||
local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
|
local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
|
||||||
local req = {
|
local req = {
|
||||||
model = model_cfg.model,
|
model = model_cfg.model,
|
||||||
@@ -40,30 +49,46 @@ local function build_request(model_cfg, messages, stream, tools, max_tokens)
|
|||||||
}
|
}
|
||||||
-- Per PHASE2.md §12 risk row "Empty tools array": some servers reject
|
-- Per PHASE2.md §12 risk row "Empty tools array": some servers reject
|
||||||
-- "tools": []. Only set the field when the list has entries.
|
-- "tools": []. Only set the field when the list has entries.
|
||||||
if tools and #tools > 0 then req.tools = tools end
|
if opts.tools and #opts.tools > 0 then req.tools = opts.tools end
|
||||||
-- Phase 3 (A2): max_tokens passthrough — used by safety.is_destructive
|
-- Phase 3 (A2): max_tokens passthrough — used by safety.is_destructive
|
||||||
-- to cap YES/NO probes at ~4 tokens. Omitted when nil (Phase 1/2
|
-- to cap YES/NO probes at ~4 tokens. Omitted when nil (Phase 1/2
|
||||||
-- callers unaffected — model defaults still apply).
|
-- callers unaffected — model defaults still apply).
|
||||||
if max_tokens then req.max_tokens = max_tokens end
|
if opts.max_tokens then req.max_tokens = opts.max_tokens end
|
||||||
|
-- Phase 7 (B1): default ON for streaming requests; the flag is
|
||||||
|
-- required to make local llama.cpp emit usage. Cloud honors it as
|
||||||
|
-- a no-op (emits usage with or without). Per-call opt-out:
|
||||||
|
-- opts.include_usage = false.
|
||||||
|
if stream and opts.include_usage ~= false then
|
||||||
|
req.stream_options = { include_usage = true }
|
||||||
|
end
|
||||||
return url, json.encode(req), build_headers(model_cfg),
|
return url, json.encode(req), build_headers(model_cfg),
|
||||||
(model_cfg.timeout_ms or 60000)
|
(model_cfg.timeout_ms or 60000)
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Streaming /v1/chat/completions.
|
-- Streaming /v1/chat/completions.
|
||||||
-- Signature widens vs Phase 1: opts is optional and may carry .tools.
|
-- Signature widens vs Phase 1: opts is optional and may carry .tools.
|
||||||
|
-- Phase 7 adds .include_usage (default true) + .category (echoed into
|
||||||
|
-- the emitted usage payload for caller-side accumulator tagging).
|
||||||
-- on_delta is called as on_delta(kind, payload):
|
-- on_delta is called as on_delta(kind, payload):
|
||||||
-- on_delta("text", content_string) - per text chunk
|
-- on_delta("text", content_string) - per text chunk
|
||||||
-- on_delta("tool_call", { id, name, arguments }) - once per completed
|
-- on_delta("tool_call", { id, name, arguments }) - once per completed
|
||||||
-- tool call (on finish_reason "tool_calls").
|
-- tool call (on finish_reason "tool_calls").
|
||||||
-- arguments is the raw JSON-string accumulated across
|
-- on_delta("usage", { prompt_tokens, completion_tokens,
|
||||||
-- deltas; caller does the dkjson.decode.
|
-- total_tokens, cost, model, category })
|
||||||
|
-- - Phase 7: emitted once after the stream
|
||||||
|
-- completes successfully, IF the provider sent
|
||||||
|
-- a usage block. Skipped on transport / API
|
||||||
|
-- errors. model is model_cfg.model (caller-
|
||||||
|
-- stable per B4 + R2); cost is nil for
|
||||||
|
-- providers that don't emit it (local llama.cpp);
|
||||||
|
-- category is opts.category or "main".
|
||||||
-- Returns:
|
-- Returns:
|
||||||
-- true stream ended cleanly
|
-- true stream ended cleanly
|
||||||
-- nil, errmsg transport / API failure
|
-- nil, errmsg transport / API failure
|
||||||
function M.chat_stream(model_cfg, messages, on_delta, opts)
|
function M.chat_stream(model_cfg, messages, on_delta, opts)
|
||||||
opts = opts or {}
|
opts = opts or {}
|
||||||
local url, body, headers, timeout_ms =
|
local url, body, headers, timeout_ms =
|
||||||
build_request(model_cfg, messages, true, opts.tools, opts.max_tokens)
|
build_request(model_cfg, messages, true, opts)
|
||||||
if not url then return nil, body end -- url slot carries err on bad cfg
|
if not url then return nil, body end -- url slot carries err on bad cfg
|
||||||
-- Phase 3: opts.timeout_ms overrides the model's default. Used by
|
-- Phase 3: opts.timeout_ms overrides the model's default. Used by
|
||||||
-- safety.is_destructive's LLM probe to cap YES/NO checks at ~15s even
|
-- safety.is_destructive's LLM probe to cap YES/NO checks at ~15s even
|
||||||
@@ -79,6 +104,11 @@ function M.chat_stream(model_cfg, messages, on_delta, opts)
|
|||||||
local tc_by_index = {}
|
local tc_by_index = {}
|
||||||
local tc_index_order = {} -- preserve emission order
|
local tc_index_order = {} -- preserve emission order
|
||||||
local index_absent_warned = false
|
local index_absent_warned = false
|
||||||
|
-- Phase 7: usage captured from the final SSE chunk (per B2 either
|
||||||
|
-- on a non-empty-choices chunk with finish_reason — cloud, or on a
|
||||||
|
-- choices=[] chunk before [DONE] — local). Emitted as
|
||||||
|
-- on_delta("usage", ...) AFTER curl.post_sse returns (B5).
|
||||||
|
local final_usage = nil
|
||||||
|
|
||||||
local function on_event(data)
|
local function on_event(data)
|
||||||
if done then return end
|
if done then return end
|
||||||
@@ -94,6 +124,23 @@ function M.chat_stream(model_cfg, messages, on_delta, opts)
|
|||||||
done = true
|
done = true
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
-- N1: usage branch is INDEPENDENT of the choice/delta branches.
|
||||||
|
-- Check unconditionally — local emits usage on choices=[] chunks
|
||||||
|
-- where `choice` is nil; cloud emits with non-empty choices.
|
||||||
|
-- R2: payload.model is the caller-stable model_cfg.model (upvar),
|
||||||
|
-- so call_broker's fallback retry naturally credits the right
|
||||||
|
-- model — wrapper callers key by payload.model.
|
||||||
|
if doc.usage then
|
||||||
|
final_usage = {
|
||||||
|
prompt_tokens = doc.usage.prompt_tokens or 0,
|
||||||
|
completion_tokens = doc.usage.completion_tokens or 0,
|
||||||
|
total_tokens = doc.usage.total_tokens or 0,
|
||||||
|
cost = doc.usage.cost, -- nil for local (R6 preserves nil)
|
||||||
|
model = model_cfg.model, -- caller-stable per B4/R2
|
||||||
|
category = opts.category or "main",
|
||||||
|
}
|
||||||
|
-- Don't emit yet; fired after curl.post_sse returns.
|
||||||
|
end
|
||||||
local choice = doc.choices and doc.choices[1]
|
local choice = doc.choices and doc.choices[1]
|
||||||
local delta = choice and choice.delta
|
local delta = choice and choice.delta
|
||||||
|
|
||||||
@@ -149,25 +196,37 @@ function M.chat_stream(model_cfg, messages, on_delta, opts)
|
|||||||
local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms)
|
local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms)
|
||||||
if api_err then return nil, "api: " .. api_err end
|
if api_err then return nil, "api: " .. api_err end
|
||||||
if not ok then return nil, "transport: " .. tostring(err) end
|
if not ok then return nil, "transport: " .. tostring(err) end
|
||||||
|
-- Phase 7 (B5): emit captured usage AFTER stream completes, as the
|
||||||
|
-- last event in stream order. Skipped on transport/api errors (the
|
||||||
|
-- accumulator stays unchanged for the failed call).
|
||||||
|
if final_usage then on_delta("usage", final_usage) end
|
||||||
return true
|
return true
|
||||||
end
|
end
|
||||||
|
|
||||||
-- Send a /v1/chat/completions request and return the full assistant text.
|
-- Send a /v1/chat/completions request and return the full assistant text.
|
||||||
-- Thin buffering wrapper over M.chat_stream — same path as the streaming
|
-- Thin buffering wrapper over M.chat_stream — same path as the streaming
|
||||||
-- consumer, so the broker keeps one HTTP shape (stream:true always).
|
-- consumer, so the broker keeps one HTTP shape (stream:true always).
|
||||||
-- M.chat's external contract is unchanged in Phase 2 (C1 in review):
|
-- M.chat's external contract widens in Phase 7 (R1): now returns
|
||||||
-- it returns the text string and silently ignores any tool_call kinds
|
-- (text, usage). Existing callers that ignore the second value continue
|
||||||
-- the model might emit (no caller of M.chat passes opts.tools).
|
-- to work — Lua silently drops extra return values. Callers that want
|
||||||
|
-- cost/usage tracking do `local r, u = broker.chat(...)` and route u
|
||||||
|
-- to ctx:add_usage via the central _record_usage helper.
|
||||||
|
-- Tool-call kinds are still silently ignored (no caller of M.chat
|
||||||
|
-- passes opts.tools).
|
||||||
-- Returns:
|
-- Returns:
|
||||||
-- assistant_content_string on success
|
-- text, usage on success (usage may be nil if
|
||||||
|
-- the provider didn't emit one)
|
||||||
-- nil, errmsg on transport / decode / API failure
|
-- nil, errmsg on transport / decode / API failure
|
||||||
function M.chat(model_cfg, messages, opts)
|
function M.chat(model_cfg, messages, opts)
|
||||||
local parts = {}
|
local parts = {}
|
||||||
|
local captured_usage -- R1: required so callers see usage
|
||||||
local ok, err = M.chat_stream(model_cfg, messages, function(kind, payload)
|
local ok, err = M.chat_stream(model_cfg, messages, function(kind, payload)
|
||||||
if kind == "text" then parts[#parts + 1] = payload end
|
if kind == "text" then parts[#parts + 1] = payload
|
||||||
|
elseif kind == "usage" then captured_usage = payload
|
||||||
|
end
|
||||||
end, opts)
|
end, opts)
|
||||||
if not ok then return nil, err end
|
if not ok then return nil, err end
|
||||||
return table.concat(parts)
|
return table.concat(parts), captured_usage
|
||||||
end
|
end
|
||||||
|
|
||||||
return M
|
return M
|
||||||
|
|||||||
Reference in New Issue
Block a user