broker: opts.tools passthrough + streaming tool_call accumulator

Phase 2 commit #5 per docs/PHASE2.md §12. Streaming broker grows
tool-call support without taking a dependency on mcp.lua (caller
supplies the tools array — B5 from review).

chat_stream signature widens to (cfg, msgs, on_delta, opts):
  opts.tools  - optional array, passed to the request body as the
                OpenAI-shape tools field. OMITTED entirely when nil or
                empty (#tools == 0) — some servers reject "tools": [].

on_delta callback shape widens to (kind, payload):
  kind = "text",      payload = string         (Phase 1 path; unchanged
                                                semantics, signature
                                                changes from (delta) to
                                                ("text", delta))
  kind = "tool_call", payload = {id, name, arguments}
                                                emitted ONCE per call on
                                                finish_reason "tool_calls"
                                                after the streaming
                                                accumulator pulls
                                                fragmented JSON-string
                                                arguments together.

Accumulator behavior:
  - Keyed by delta.tool_calls[i].index.
  - If index is absent on a delta (some llama.cpp builds omit it on
    single-call streams; C2 in review), default to 0 with a one-shot
    stderr debug status per stream.
  - id and name captured from the opening delta of each slot.
  - function.arguments concatenated across all deltas as the raw
    JSON-string; caller (repl.lua / future Phase 2 commit #6) does
    dkjson.decode.
  - On finish_reason "tool_calls" the accumulator emits all collected
    calls in index order and resets.

M.chat external contract unchanged (C1): wrapper now uses the new
(kind, payload) shape internally but exposes the same text-string
return. No caller of M.chat passes opts.tools so tool_call kinds are
silently dropped.

repl.lua minimal companion edit: ask_ai's chat_stream callback updated
to the new shape. Text path unchanged; tool_call kinds are no-op
placeholders until commit #6 lands the sub-loop. Keeps Phase 1 streaming
functional between #5 and #6.

Smoke-tested against hossenfelder/8082 (post-#23 fix):
  - text-only: ok=true, kind="text" deltas received
  - with opts.tools: model emitted one tool_call,
    accumulator collected id + name=get_weather + args={"city":"Paris"}
    correctly across fragmented deltas
  - opts.tools={}: server accepted (field omitted as required)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-12 14:20:32 +00:00
parent c736d0e129
commit efdc7281c7
2 changed files with 93 additions and 15 deletions
+84 -12
View File
@@ -3,7 +3,13 @@
-- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder -- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder
-- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the -- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the
-- one streaming path covers both incremental and sync callers. -- one streaming path covers both incremental and sync callers.
-- See docs/PHASE0.md §6 and docs/PHASE1.md §3. -- Phase 2: optional opts.tools array passed through to the request body
-- (omitted entirely when nil/empty per §12 risk row 1). The chat_stream
-- on_delta callback widens to (kind, payload) where kind is "text" or
-- "tool_call"; tool_call deltas are accumulated by `index` (default 0 if
-- absent per C2) and emitted as complete records on finish_reason "tool_calls".
-- broker.lua does NOT depend on mcp.lua — the caller assembles opts.tools
-- and passes it in. See docs/PHASE0.md §6, PHASE1.md §3, PHASE2.md §3 / §5.
local curl = require("ffi.curl") local curl = require("ffi.curl")
local json = require("dkjson") local json = require("dkjson")
@@ -21,30 +27,49 @@ local function build_headers(model_cfg)
return h return h
end end
local function build_request(model_cfg, messages, stream) local function build_request(model_cfg, messages, stream, tools)
if not (model_cfg and model_cfg.endpoint and model_cfg.model) then if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
return nil, "broker: model_cfg.endpoint and .model are required" return nil, "broker: model_cfg.endpoint and .model are required"
end end
local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions" local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
local body = json.encode({ local req = {
model = model_cfg.model, model = model_cfg.model,
messages = messages, messages = messages,
stream = stream and true or false, stream = stream and true or false,
temperature = model_cfg.temperature or 0.2, temperature = model_cfg.temperature or 0.2,
}) }
return url, body, build_headers(model_cfg), (model_cfg.timeout_ms or 60000) -- Per PHASE2.md §12 risk row "Empty tools array": some servers reject
-- "tools": []. Only set the field when the list has entries.
if tools and #tools > 0 then req.tools = tools end
return url, json.encode(req), build_headers(model_cfg),
(model_cfg.timeout_ms or 60000)
end end
-- Streaming /v1/chat/completions. on_delta(content_string) is called once per -- Streaming /v1/chat/completions.
-- received delta. Returns: -- Signature widens vs Phase 1: opts is optional and may carry .tools.
-- on_delta is called as on_delta(kind, payload):
-- on_delta("text", content_string) - per text chunk
-- on_delta("tool_call", { id, name, arguments }) - once per completed
-- tool call (on finish_reason "tool_calls").
-- arguments is the raw JSON-string accumulated across
-- deltas; caller does the dkjson.decode.
-- Returns:
-- true stream ended cleanly -- true stream ended cleanly
-- nil, errmsg transport / API failure -- nil, errmsg transport / API failure
function M.chat_stream(model_cfg, messages, on_delta) function M.chat_stream(model_cfg, messages, on_delta, opts)
local url, body, headers, timeout_ms = build_request(model_cfg, messages, true) opts = opts or {}
local url, body, headers, timeout_ms =
build_request(model_cfg, messages, true, opts.tools)
if not url then return nil, body end -- url slot carries err on bad cfg if not url then return nil, body end -- url slot carries err on bad cfg
local done = false local done = false
local api_err local api_err
-- Tool-call accumulator keyed by index. Each slot is filled across
-- many deltas: id+name come on the opener, arguments arrives as
-- character-fragment JSON-string chunks (PHASE2-baseline.md §4).
local tc_by_index = {}
local tc_index_order = {} -- preserve emission order
local index_absent_warned = false
local function on_event(data) local function on_event(data)
if done then return end if done then return end
@@ -62,9 +87,53 @@ function M.chat_stream(model_cfg, messages, on_delta)
end end
local choice = doc.choices and doc.choices[1] local choice = doc.choices and doc.choices[1]
local delta = choice and choice.delta local delta = choice and choice.delta
-- Text path (unchanged from Phase 1 semantics; kind widened).
local content = delta and delta.content local content = delta and delta.content
if type(content) == "string" and #content > 0 then if type(content) == "string" and #content > 0 then
on_delta(content) on_delta("text", content)
end
-- Tool-call accumulation (Phase 2).
local tcs = delta and delta.tool_calls
if type(tcs) == "table" then
for _, tc in ipairs(tcs) do
local idx = tc.index
if idx == nil then
idx = 0
if not index_absent_warned then
index_absent_warned = true
-- One-shot debug status per stream; printed to
-- stderr so it doesn't interleave with renderer
-- stdout output.
io.stderr:write(
"[aish] broker: tool_calls[].index absent; assuming 0\n")
end
end
local slot = tc_by_index[idx]
if not slot then
slot = { id = nil, name = nil, arguments = "" }
tc_by_index[idx] = slot
tc_index_order[#tc_index_order + 1] = idx
end
if tc.id then slot.id = tc.id end
if tc["function"] then
local fn = tc["function"]
if fn.name then slot.name = fn.name end
if fn.arguments then
slot.arguments = slot.arguments .. fn.arguments
end
end
end
end
-- On finish_reason "tool_calls", emit all accumulated calls.
if choice and choice.finish_reason == "tool_calls" then
for _, idx in ipairs(tc_index_order) do
on_delta("tool_call", tc_by_index[idx])
end
tc_by_index = {}
tc_index_order = {}
end end
end end
@@ -77,13 +146,16 @@ end
-- Send a /v1/chat/completions request and return the full assistant text. -- Send a /v1/chat/completions request and return the full assistant text.
-- Thin buffering wrapper over M.chat_stream — same path as the streaming -- Thin buffering wrapper over M.chat_stream — same path as the streaming
-- consumer, so the broker keeps one HTTP shape (stream:true always). -- consumer, so the broker keeps one HTTP shape (stream:true always).
-- M.chat's external contract is unchanged in Phase 2 (C1 in review):
-- it returns the text string and silently ignores any tool_call kinds
-- the model might emit (no caller of M.chat passes opts.tools).
-- Returns: -- Returns:
-- assistant_content_string on success -- assistant_content_string on success
-- nil, errmsg on transport / decode / API failure -- nil, errmsg on transport / decode / API failure
function M.chat(model_cfg, messages) function M.chat(model_cfg, messages)
local parts = {} local parts = {}
local ok, err = M.chat_stream(model_cfg, messages, function(d) local ok, err = M.chat_stream(model_cfg, messages, function(kind, payload)
parts[#parts + 1] = d if kind == "text" then parts[#parts + 1] = payload end
end) end)
if not ok then return nil, err end if not ok then return nil, err end
return table.concat(parts) return table.concat(parts)
+9 -3
View File
@@ -114,9 +114,15 @@ function M.run(config)
local parts = {} local parts = {}
local ok, err = broker.chat_stream(active_cfg, ctx:to_messages(), local ok, err = broker.chat_stream(active_cfg, ctx:to_messages(),
function(delta) function(kind, payload)
parts[#parts + 1] = delta -- Phase 2: callback shape widened to (kind, payload).
renderer.assistant_delta(delta) -- tool_call kinds are handled by the sub-loop landing in
-- commit #6; this commit ships only the text path so Phase
-- 1 streaming stays functional between #5 and #6.
if kind == "text" then
parts[#parts + 1] = payload
renderer.assistant_delta(payload)
end
end) end)
renderer.assistant_flush() renderer.assistant_flush()