Review notes (commentary only; this text precedes the first "diff --git"
header and is not part of any hunk):
  * One diff line per physical line has been restored. Indentation and the
    position of blank context lines are RECONSTRUCTED (4-space indent
    assumed) -- confirm against the working tree, or apply with
    `git apply --ignore-whitespace`.
  * Tool-call delta merging in broker.lua is hardened: id / function.name
    are taken only from non-empty strings, function.arguments only from
    strings, and `function` must be a table. Hunk 3 grows from 53 to 62
    new-side lines; hunk 4's new-side start moves from 146 to 155.
  * The post-image blob hash on the broker.lua "index" line predates the
    hardening change and no longer describes the resulting file.

diff --git a/broker.lua b/broker.lua
index 663b34c..0e2602d 100644
--- a/broker.lua
+++ b/broker.lua
@@ -3,7 +3,13 @@
 -- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder
 -- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the
 -- one streaming path covers both incremental and sync callers.
--- See docs/PHASE0.md §6 and docs/PHASE1.md §3.
+-- Phase 2: optional opts.tools array passed through to the request body
+-- (omitted entirely when nil/empty per §12 risk row 1). The chat_stream
+-- on_delta callback widens to (kind, payload) where kind is "text" or
+-- "tool_call"; tool_call deltas are accumulated by `index` (default 0 if
+-- absent per C2) and emitted as complete records on finish_reason "tool_calls".
+-- broker.lua does NOT depend on mcp.lua — the caller assembles opts.tools
+-- and passes it in. See docs/PHASE0.md §6, PHASE1.md §3, PHASE2.md §3 / §5.
 
 local curl = require("ffi.curl")
 local json = require("dkjson")
@@ -21,30 +27,49 @@ local function build_headers(model_cfg)
     return h
 end
 
-local function build_request(model_cfg, messages, stream)
+local function build_request(model_cfg, messages, stream, tools)
     if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
         return nil, "broker: model_cfg.endpoint and .model are required"
     end
     local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
-    local body = json.encode({
+    local req = {
         model = model_cfg.model,
         messages = messages,
         stream = stream and true or false,
         temperature = model_cfg.temperature or 0.2,
-    })
-    return url, body, build_headers(model_cfg), (model_cfg.timeout_ms or 60000)
+    }
+    -- Per PHASE2.md §12 risk row "Empty tools array": some servers reject
+    -- "tools": []. Only set the field when the list has entries.
+    if tools and #tools > 0 then req.tools = tools end
+    return url, json.encode(req), build_headers(model_cfg),
+        (model_cfg.timeout_ms or 60000)
 end
 
--- Streaming /v1/chat/completions. on_delta(content_string) is called once per
--- received delta. Returns:
+-- Streaming /v1/chat/completions.
+-- Signature widens vs Phase 1: opts is optional and may carry .tools.
+-- on_delta is called as on_delta(kind, payload):
+--   on_delta("text", content_string) - per text chunk
+--   on_delta("tool_call", { id, name, arguments }) - once per completed
+--     tool call (on finish_reason "tool_calls").
+--     arguments is the raw JSON-string accumulated across
+--     deltas; caller does the dkjson.decode.
+-- Returns:
 --   true stream ended cleanly
 --   nil, errmsg transport / API failure
-function M.chat_stream(model_cfg, messages, on_delta)
-    local url, body, headers, timeout_ms = build_request(model_cfg, messages, true)
+function M.chat_stream(model_cfg, messages, on_delta, opts)
+    opts = opts or {}
+    local url, body, headers, timeout_ms =
+        build_request(model_cfg, messages, true, opts.tools)
     if not url then return nil, body end -- url slot carries err on bad cfg
 
     local done = false
     local api_err
+    -- Tool-call accumulator keyed by index. Each slot is filled across
+    -- many deltas: id+name come on the opener, arguments arrives as
+    -- character-fragment JSON-string chunks (PHASE2-baseline.md §4).
+    local tc_by_index = {}
+    local tc_index_order = {} -- preserve emission order
+    local index_absent_warned = false
 
     local function on_event(data)
         if done then return end
@@ -62,9 +87,62 @@ function M.chat_stream(model_cfg, messages, on_delta)
         end
         local choice = doc.choices and doc.choices[1]
         local delta = choice and choice.delta
+
+        -- Text path (unchanged from Phase 1 semantics; kind widened).
         local content = delta and delta.content
         if type(content) == "string" and #content > 0 then
-            on_delta(content)
+            on_delta("text", content)
+        end
+
+        -- Tool-call accumulation (Phase 2).
+        local tcs = delta and delta.tool_calls
+        if type(tcs) == "table" then
+            for _, tc in ipairs(tcs) do
+                local idx = tc.index
+                if idx == nil then
+                    idx = 0
+                    if not index_absent_warned then
+                        index_absent_warned = true
+                        -- One-shot debug status per stream; printed to
+                        -- stderr so it doesn't interleave with renderer
+                        -- stdout output.
+                        io.stderr:write(
+                            "[aish] broker: tool_calls[].index absent; assuming 0\n")
+                    end
+                end
+                local slot = tc_by_index[idx]
+                if not slot then
+                    slot = { id = nil, name = nil, arguments = "" }
+                    tc_by_index[idx] = slot
+                    tc_index_order[#tc_index_order + 1] = idx
+                end
+                -- Only non-empty strings may set id/name: "" is truthy in Lua,
+                -- so a blank value on a continuation delta would otherwise
+                -- clobber the value captured from the opener.
+                if type(tc.id) == "string" and tc.id ~= "" then
+                    slot.id = tc.id
+                end
+                local fn = tc["function"]
+                if type(fn) == "table" then
+                    if type(fn.name) == "string" and fn.name ~= "" then
+                        slot.name = fn.name
+                    end
+                    -- A non-string fragment would either raise or be coerced
+                    -- by `..`; accept string fragments only.
+                    if type(fn.arguments) == "string" then
+                        slot.arguments = slot.arguments .. fn.arguments
+                    end
+                end
+            end
+        end
+
+        -- On finish_reason "tool_calls", emit all accumulated calls.
+        if choice and choice.finish_reason == "tool_calls" then
+            for _, idx in ipairs(tc_index_order) do
+                on_delta("tool_call", tc_by_index[idx])
+            end
+            tc_by_index = {}
+            tc_index_order = {}
         end
     end
 
@@ -77,13 +155,16 @@ end
 -- Send a /v1/chat/completions request and return the full assistant text.
 -- Thin buffering wrapper over M.chat_stream — same path as the streaming
 -- consumer, so the broker keeps one HTTP shape (stream:true always).
+-- M.chat's external contract is unchanged in Phase 2 (C1 in review):
+-- it returns the text string and silently ignores any tool_call kinds
+-- the model might emit (no caller of M.chat passes opts.tools).
 -- Returns:
 --   assistant_content_string on success
 --   nil, errmsg on transport / decode / API failure
 function M.chat(model_cfg, messages)
     local parts = {}
-    local ok, err = M.chat_stream(model_cfg, messages, function(d)
-        parts[#parts + 1] = d
+    local ok, err = M.chat_stream(model_cfg, messages, function(kind, payload)
+        if kind == "text" then parts[#parts + 1] = payload end
     end)
     if not ok then return nil, err end
     return table.concat(parts)
diff --git a/repl.lua b/repl.lua
index bed89bf..c312080 100644
--- a/repl.lua
+++ b/repl.lua
@@ -114,9 +114,15 @@ function M.run(config)
 
         local parts = {}
         local ok, err = broker.chat_stream(active_cfg, ctx:to_messages(),
-            function(delta)
-                parts[#parts + 1] = delta
-                renderer.assistant_delta(delta)
+            function(kind, payload)
+                -- Phase 2: callback shape widened to (kind, payload).
+                -- tool_call kinds are handled by the sub-loop landing in
+                -- commit #6; this commit ships only the text path so Phase
+                -- 1 streaming stays functional between #5 and #6.
+                if kind == "text" then
+                    parts[#parts + 1] = payload
+                    renderer.assistant_delta(payload)
+                end
             end)
         renderer.assistant_flush()
 