broker: chat_stream over post_sse; chat is now a buffering wrapper

Phase 1 streaming consumer per PHASE1.md §3.

  broker.chat_stream(model_cfg, messages, on_delta) -> true | (nil, err)
  broker.chat(model_cfg, messages)                  -> content | (nil, err)
                                                       (now a thin buffer over
                                                        chat_stream)

The HTTP shape unifies on stream:true. on_event from ffi/curl.post_sse
decodes each event's JSON, extracts choices[1].delta.content, and calls
on_delta(content) for non-empty string deltas. The `[DONE]` sentinel is
filtered. SSE-framed error envelopes ({"error":{"message":...}} arriving
as data:) surface as "api: ..." errors.

build_request is factored out so chat_stream and (future) any
non-streaming consumer share URL/body/header construction.

Live verification against hossenfelder fast preset:
  - chat_stream("Count one to five..."): 9 incremental deltas streamed
    token-by-token, assembled to "1 2 3 4 5"
  - chat("Reply with exactly: pong"): "pong" returned via buffer

Error envelope path is correct by inspection but not exercised live —
hossenfelder passes through bogus model names rather than rejecting.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-05-10 19:16:07 +00:00
parent 2e36381576
commit e46a5c385d
+67 -44
View File
@@ -1,7 +1,9 @@
-- broker.lua — llama.cpp HTTP client. -- broker.lua — llama.cpp HTTP client.
-- Phase 0: blocking POST via ffi/curl + vendored dkjson; the response is read -- Phase 0: blocking POST via ffi/curl + vendored dkjson.
-- to completion and returned as a Lua string. SSE streaming wired in Phase 1. -- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder
-- See docs/PHASE0.md §6. -- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the
-- one streaming path covers both incremental and sync callers.
-- See docs/PHASE0.md §6 and docs/PHASE1.md §3.
local curl = require("ffi.curl") local curl = require("ffi.curl")
local json = require("dkjson") local json = require("dkjson")
@@ -19,51 +21,72 @@ local function build_headers(model_cfg)
return h return h
end end
-- Send a /v1/chat/completions request. local function build_request(model_cfg, messages, stream)
-- model_cfg : entry from config.models — { endpoint, model, temperature, if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
-- [key_env], [timeout_ms] } return nil, "broker: model_cfg.endpoint and .model are required"
-- messages : ordered list of { role = ..., content = ... }, system end
-- prompt already prepended (context:to_messages handles that). local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
local body = json.encode({
model = model_cfg.model,
messages = messages,
stream = stream and true or false,
temperature = model_cfg.temperature or 0.2,
})
return url, body, build_headers(model_cfg), (model_cfg.timeout_ms or 60000)
end
-- Streaming /v1/chat/completions. on_delta(content_string) is called once per
-- received delta. Returns:
-- true stream ended cleanly
-- nil, errmsg transport / API failure
function M.chat_stream(model_cfg, messages, on_delta)
local url, body, headers, timeout_ms = build_request(model_cfg, messages, true)
if not url then return nil, body end -- url slot carries err on bad cfg
local done = false
local api_err
local function on_event(data)
if done then return end
if data == "[DONE]" then done = true; return end
local doc = json.decode(data)
if not doc then return end -- ignore unparseable events
-- Some servers emit an SSE-framed error envelope at the start of the
-- stream — surface it.
if doc.error then
local m = (type(doc.error) == "table" and doc.error.message)
or tostring(doc.error)
api_err = m
done = true
return
end
local choice = doc.choices and doc.choices[1]
local delta = choice and choice.delta
local content = delta and delta.content
if type(content) == "string" and #content > 0 then
on_delta(content)
end
end
local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms)
if api_err then return nil, "api: " .. api_err end
if not ok then return nil, "transport: " .. tostring(err) end
return true
end
-- Send a /v1/chat/completions request and return the full assistant text.
-- Thin buffering wrapper over M.chat_stream — same path as the streaming
-- consumer, so the broker keeps one HTTP shape (stream:true always).
-- Returns: -- Returns:
-- assistant_content_string on success -- assistant_content_string on success
-- nil, errmsg on transport / decode / API failure -- nil, errmsg on transport / decode / API failure
function M.chat(model_cfg, messages) function M.chat(model_cfg, messages)
if not (model_cfg and model_cfg.endpoint and model_cfg.model) then local parts = {}
return nil, "broker.chat: model_cfg.endpoint and .model are required" local ok, err = M.chat_stream(model_cfg, messages, function(d)
end parts[#parts + 1] = d
end)
local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions" if not ok then return nil, err end
local body = json.encode({ return table.concat(parts)
model = model_cfg.model,
messages = messages,
stream = false,
temperature = model_cfg.temperature or 0.2,
})
local headers = build_headers(model_cfg)
local timeout_ms = model_cfg.timeout_ms or 60000
local resp, err = curl.post(url, body, headers, timeout_ms)
if not resp then return nil, "transport: " .. tostring(err) end
local doc, _, derr = json.decode(resp)
if not doc then
return nil, "decode: " .. tostring(derr) .. " (raw: " .. resp:sub(1, 200) .. ")"
end
-- OpenAI-style error envelope: { error: { message, type, ... } }
if doc.error then
local m = (type(doc.error) == "table" and doc.error.message) or tostring(doc.error)
return nil, "api: " .. m
end
local choice = doc.choices and doc.choices[1]
local msg = choice and choice.message
local content = msg and msg.content
if type(content) ~= "string" then
return nil, "broker.chat: no choices[1].message.content in response"
end
return content
end end
return M return M