diff --git a/broker.lua b/broker.lua index a79a60b..663b34c 100644 --- a/broker.lua +++ b/broker.lua @@ -1,7 +1,9 @@ -- broker.lua — llama.cpp HTTP client. --- Phase 0: blocking POST via ffi/curl + vendored dkjson; the response is read --- to completion and returned as a Lua string. SSE streaming wired in Phase 1. --- See docs/PHASE0.md §6. +-- Phase 0: blocking POST via ffi/curl + vendored dkjson. +-- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder +-- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the +-- one streaming path covers both incremental and sync callers. +-- See docs/PHASE0.md §6 and docs/PHASE1.md §3. local curl = require("ffi.curl") local json = require("dkjson") @@ -19,51 +21,72 @@ local function build_headers(model_cfg) return h end --- Send a /v1/chat/completions request. --- model_cfg : entry from config.models — { endpoint, model, temperature, --- [key_env], [timeout_ms] } --- messages : ordered list of { role = ..., content = ... }, system --- prompt already prepended (context:to_messages handles that). +local function build_request(model_cfg, messages, stream) + if not (model_cfg and model_cfg.endpoint and model_cfg.model) then + return nil, "broker: model_cfg.endpoint and .model are required" + end + local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions" + local body = json.encode({ + model = model_cfg.model, + messages = messages, + stream = stream and true or false, + temperature = model_cfg.temperature or 0.2, + }) + return url, body, build_headers(model_cfg), (model_cfg.timeout_ms or 60000) +end + +-- Streaming /v1/chat/completions. on_delta(content_string) is called once per +-- delta whose content is a non-empty string. 
Returns: +-- true stream ended cleanly +-- nil, errmsg transport / API failure +function M.chat_stream(model_cfg, messages, on_delta) + local url, body, headers, timeout_ms = build_request(model_cfg, messages, true) + if not url then return nil, body end -- on bad cfg url is nil and body holds the errmsg + + local done = false + local api_err + + local function on_event(data) + if done then return end + if data == "[DONE]" then done = true; return end + local doc = json.decode(data) + if not doc then return end -- ignore unparseable events + -- Some servers emit an SSE-framed error envelope at the start of the + -- stream — surface it. + if doc.error then + local m = (type(doc.error) == "table" and doc.error.message) + or tostring(doc.error) + api_err = m + done = true + return + end + local choice = doc.choices and doc.choices[1] + local delta = choice and choice.delta + local content = delta and delta.content + if type(content) == "string" and #content > 0 then + on_delta(content) + end + end + + local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms) + if api_err then return nil, "api: " .. api_err end + if not ok then return nil, "transport: " .. tostring(err) end + return true +end + +-- Send a /v1/chat/completions request and return the full assistant text. +-- Thin buffering wrapper over M.chat_stream — same path as the streaming +-- consumer, so the broker keeps one HTTP shape (stream:true always). -- Returns: -- assistant_content_string on success -- nil, errmsg on transport / decode / API failure function M.chat(model_cfg, messages) - if not (model_cfg and model_cfg.endpoint and model_cfg.model) then - return nil, "broker.chat: model_cfg.endpoint and .model are required" - end - - local url = model_cfg.endpoint:gsub("/+$", "") .. 
"/v1/chat/completions" - local body = json.encode({ - model = model_cfg.model, - messages = messages, - stream = false, - temperature = model_cfg.temperature or 0.2, - }) - local headers = build_headers(model_cfg) - local timeout_ms = model_cfg.timeout_ms or 60000 - - local resp, err = curl.post(url, body, headers, timeout_ms) - if not resp then return nil, "transport: " .. tostring(err) end - - local doc, _, derr = json.decode(resp) - if not doc then - return nil, "decode: " .. tostring(derr) .. " (raw: " .. resp:sub(1, 200) .. ")" - end - - -- OpenAI-style error envelope: { error: { message, type, ... } } - if doc.error then - local m = (type(doc.error) == "table" and doc.error.message) or tostring(doc.error) - return nil, "api: " .. m - end - - local choice = doc.choices and doc.choices[1] - local msg = choice and choice.message - local content = msg and msg.content - if type(content) ~= "string" then - return nil, "broker.chat: no choices[1].message.content in response" - end - - return content + local parts = {} + local ok, err = M.chat_stream(model_cfg, messages, function(d) + parts[#parts + 1] = d + end) + if not ok then return nil, err end + return table.concat(parts) end return M