From 7364963b0084cc518da5e7c50a963c3a47547a59 Mon Sep 17 00:00:00 2001 From: Markus Fritsche Date: Sat, 16 May 2026 22:57:14 +0000 Subject: [PATCH] broker: usage capture + opts widening (Phase 7 commit #1) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation for Phase 7. broker.chat_stream now emits a third on_delta kind ("usage") after the stream completes successfully; broker.chat returns (text, usage). Backward-compatible — existing callers that ignore the new kind / second value continue working via Lua's drop-extra-returns semantics. Changes: - build_request widens (A3 + R3) — `(model_cfg, msgs, stream, opts)`. opts.tools / opts.max_tokens / opts.include_usage / opts.category all live inside opts now. Both internal call sites updated. - opts.include_usage defaults to true for streaming requests; sets `stream_options: { include_usage: true }` in the request body. B1: required for local llama.cpp to emit usage; cloud honors as a no-op (emits anyway). - on_event captures `doc.usage` into a closure-local `final_usage`. N1: the check is INDEPENDENT of the choice/delta branches — local emits usage on choices=[] chunks (choice nil) while cloud emits with non-empty choices + finish_reason. Both shapes funnel here. - After curl.post_sse returns successfully (NOT on transport/api errors), if final_usage is set, emit on_delta("usage", {prompt_tokens, completion_tokens, total_tokens, cost, model, category}). cost is nil for local (R6 preserves the nil vs 0 distinction the accumulator needs). model is model_cfg.model — caller-stable per B4 + R2 so call_broker's fallback retry attributes usage to the fallback's model name without wrapper-side tracking. - M.chat (R1 — BLOCKER fix): on_delta now also captures kind=="usage" alongside "text"; M.chat returns (text, usage). Without this fix 4 of 5 non-streaming categories (summarize / delegate / memory_summarize / probe) would silently report zero usage. Smoke verified against live hossenfelder:8082: - CLOUD chat -> (text, usage); cost=2.9e-05, model=anthropic/... - LOCAL chat -> (text, usage); cost=NIL (correct per R6), model=qwen-coder-7b-snappy-8k - CLOUD stream -> on_delta("usage", {...}) with category="test" echoed; model name caller-stable. Regression: test_safety 87/87, test_router_model 31/31, repl loads. Co-Authored-By: Claude Opus 4.7 (1M context) --- broker.lua | 83 ++++++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 71 insertions(+), 12 deletions(-) diff --git a/broker.lua b/broker.lua index 4e5a47b..a3e9e96 100644 --- a/broker.lua +++ b/broker.lua @@ -27,10 +27,19 @@ local function build_headers(model_cfg) return h end -local function build_request(model_cfg, messages, stream, tools, max_tokens) +-- Phase 7 (A3): build_request widens to take an opts table; previously +-- positional (tools, max_tokens). Both internal call sites (chat_stream +-- and M.chat-via-chat_stream) updated. opts fields: +-- .tools per Phase 2 (omitted from body when nil/empty) +-- .max_tokens per Phase 3 (omitted when nil) +-- .include_usage Phase 7 — default true; sets stream_options.include_usage +-- in the request body (B1: required for local llama.cpp +-- to emit usage; no-op for cloud which emits anyway). +local function build_request(model_cfg, messages, stream, opts) if not (model_cfg and model_cfg.endpoint and model_cfg.model) then return nil, "broker: model_cfg.endpoint and .model are required" end + opts = opts or {} local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions" local req = { model = model_cfg.model, @@ -40,30 +49,46 @@ local function build_request(model_cfg, messages, stream, tools, max_tokens) } -- Per PHASE2.md §12 risk row "Empty tools array": some servers reject -- "tools": []. Only set the field when the list has entries. - if tools and #tools > 0 then req.tools = tools end + if opts.tools and #opts.tools > 0 then req.tools = opts.tools end -- Phase 3 (A2): max_tokens passthrough — used by safety.is_destructive -- to cap YES/NO probes at ~4 tokens. Omitted when nil (Phase 1/2 -- callers unaffected — model defaults still apply). - if max_tokens then req.max_tokens = max_tokens end + if opts.max_tokens then req.max_tokens = opts.max_tokens end + -- Phase 7 (B1): default ON for streaming requests; the flag is + -- required to make local llama.cpp emit usage. Cloud honors it as + -- a no-op (emits usage with or without). Per-call opt-out: + -- opts.include_usage = false. + if stream and opts.include_usage ~= false then + req.stream_options = { include_usage = true } + end return url, json.encode(req), build_headers(model_cfg), (model_cfg.timeout_ms or 60000) end -- Streaming /v1/chat/completions. -- Signature widens vs Phase 1: opts is optional and may carry .tools. +-- Phase 7 adds .include_usage (default true) + .category (echoed into +-- the emitted usage payload for caller-side accumulator tagging). -- on_delta is called as on_delta(kind, payload): -- on_delta("text", content_string) - per text chunk -- on_delta("tool_call", { id, name, arguments }) - once per completed -- tool call (on finish_reason "tool_calls"). --- arguments is the raw JSON-string accumulated across --- deltas; caller does the dkjson.decode. +-- on_delta("usage", { prompt_tokens, completion_tokens, +-- total_tokens, cost, model, category }) +-- - Phase 7: emitted once after the stream +-- completes successfully, IF the provider sent +-- a usage block. Skipped on transport / API +-- errors. model is model_cfg.model (caller- +-- stable per B4 + R2); cost is nil for +-- providers that don't emit it (local llama.cpp); +-- category is opts.category or "main". -- Returns: -- true stream ended cleanly -- nil, errmsg transport / API failure function M.chat_stream(model_cfg, messages, on_delta, opts) opts = opts or {} local url, body, headers, timeout_ms = - build_request(model_cfg, messages, true, opts.tools, opts.max_tokens) + build_request(model_cfg, messages, true, opts) if not url then return nil, body end -- url slot carries err on bad cfg -- Phase 3: opts.timeout_ms overrides the model's default. Used by -- safety.is_destructive's LLM probe to cap YES/NO checks at ~15s even @@ -79,6 +104,11 @@ function M.chat_stream(model_cfg, messages, on_delta, opts) local tc_by_index = {} local tc_index_order = {} -- preserve emission order local index_absent_warned = false + -- Phase 7: usage captured from the final SSE chunk (per B2 either + -- on a non-empty-choices chunk with finish_reason — cloud, or on a + -- choices=[] chunk before [DONE] — local). Emitted as + -- on_delta("usage", ...) AFTER curl.post_sse returns (B5). + local final_usage = nil local function on_event(data) if done then return end @@ -94,6 +124,23 @@ function M.chat_stream(model_cfg, messages, on_delta, opts) done = true return end + -- N1: usage branch is INDEPENDENT of the choice/delta branches. + -- Check unconditionally — local emits usage on choices=[] chunks + -- where `choice` is nil; cloud emits with non-empty choices. + -- R2: payload.model is the caller-stable model_cfg.model (upvar), + -- so call_broker's fallback retry naturally credits the right + -- model — wrapper callers key by payload.model. + if doc.usage then + final_usage = { + prompt_tokens = doc.usage.prompt_tokens or 0, + completion_tokens = doc.usage.completion_tokens or 0, + total_tokens = doc.usage.total_tokens or 0, + cost = doc.usage.cost, -- nil for local (R6 preserves nil) + model = model_cfg.model, -- caller-stable per B4/R2 + category = opts.category or "main", + } + -- Don't emit yet; fired after curl.post_sse returns. + end local choice = doc.choices and doc.choices[1] local delta = choice and choice.delta @@ -149,25 +196,37 @@ function M.chat_stream(model_cfg, messages, on_delta, opts) local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms) if api_err then return nil, "api: " .. api_err end if not ok then return nil, "transport: " .. tostring(err) end + -- Phase 7 (B5): emit captured usage AFTER stream completes, as the + -- last event in stream order. Skipped on transport/api errors (the + -- accumulator stays unchanged for the failed call). + if final_usage then on_delta("usage", final_usage) end return true end -- Send a /v1/chat/completions request and return the full assistant text. -- Thin buffering wrapper over M.chat_stream — same path as the streaming -- consumer, so the broker keeps one HTTP shape (stream:true always). --- M.chat's external contract is unchanged in Phase 2 (C1 in review): --- it returns the text string and silently ignores any tool_call kinds --- the model might emit (no caller of M.chat passes opts.tools). +-- M.chat's external contract widens in Phase 7 (R1): now returns +-- (text, usage). Existing callers that ignore the second value continue +-- to work — Lua silently drops extra return values. Callers that want +-- cost/usage tracking do `local r, u = broker.chat(...)` and route u +-- to ctx:add_usage via the central _record_usage helper. +-- Tool-call kinds are still silently ignored (no caller of M.chat +-- passes opts.tools). -- Returns: --- assistant_content_string on success +-- text, usage on success (usage may be nil if +-- the provider didn't emit one) -- nil, errmsg on transport / decode / API failure function M.chat(model_cfg, messages, opts) local parts = {} + local captured_usage -- R1: required so callers see usage local ok, err = M.chat_stream(model_cfg, messages, function(kind, payload) - if kind == "text" then parts[#parts + 1] = payload end + if kind == "text" then parts[#parts + 1] = payload + elseif kind == "usage" then captured_usage = payload + end end, opts) if not ok then return nil, err end - return table.concat(parts) + return table.concat(parts), captured_usage end return M