-- broker.lua — llama.cpp HTTP client.
--
-- Phase 0: blocking POST via ffi/curl + vendored dkjson.
-- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder
-- on top. M.chat is a thin buffering wrapper around M.chat_stream, so the
-- single streaming path serves both incremental and synchronous callers.
-- See docs/PHASE0.md §6 and docs/PHASE1.md §3.

local curl = require("ffi.curl")
local json = require("dkjson")

local M = {}

-- Build the HTTP header lines for a request.
-- Always sends a JSON content type. Adds a Bearer Authorization header when
-- model_cfg.key_env names an environment variable holding a non-empty key.
local function build_headers(model_cfg)
  local h = { "Content-Type: application/json" }
  if model_cfg.key_env then
    local key = os.getenv(model_cfg.key_env)
    if key and key ~= "" then
      h[#h + 1] = "Authorization: Bearer " .. key
    end
  end
  return h
end

-- Validate model_cfg and assemble a /v1/chat/completions request.
-- Trailing slashes on model_cfg.endpoint are stripped before the path is
-- appended. temperature defaults to 0.2 and timeout_ms defaults to 60000.
-- Returns:
--   url, json_body, header_list, timeout_ms   on success
--   nil, errmsg                               when .endpoint or .model is missing
local function build_request(model_cfg, messages, stream)
  if not (model_cfg and model_cfg.endpoint and model_cfg.model) then
    return nil, "broker: model_cfg.endpoint and .model are required"
  end
  -- gsub returns (string, count); inside `..` only the string is used.
  local url = model_cfg.endpoint:gsub("/+$", "") .. "/v1/chat/completions"
  local body = json.encode({
    model = model_cfg.model,
    messages = messages,
    stream = stream and true or false,
    temperature = model_cfg.temperature or 0.2,
  })
  return url, body, build_headers(model_cfg), (model_cfg.timeout_ms or 60000)
end

-- Turn the `error` field of an error envelope into a printable string.
-- Prefers error.message. Otherwise falls back to a JSON rendering of the
-- table, so details such as code/type are not reduced to "table: 0x...".
local function describe_api_error(e)
  if type(e) ~= "table" then
    return tostring(e)
  end
  local msg = e.message
  if msg ~= nil and msg ~= "" and type(msg) ~= "table" then
    return tostring(msg)
  end
  -- json.encode may raise on unencodable input; never let that escape here.
  local ok, encoded = pcall(json.encode, e)
  if ok and type(encoded) == "string" then
    return encoded
  end
  return tostring(e)
end

-- Streaming /v1/chat/completions. on_delta(content_string) is called once per
-- received, non-empty delta. Returns:
--   true           stream ended cleanly
--   nil, errmsg    transport / API failure ("transport: ..." or "api: ...")
function M.chat_stream(model_cfg, messages, on_delta)
  local url, body, headers, timeout_ms = build_request(model_cfg, messages, true)
  if not url then
    return nil, body -- on bad cfg the second slot carries the error message
  end

  local done = false
  local api_err

  -- Invoked by curl.post_sse with each event payload (presumably the text of
  -- an SSE `data:` field; confirm against ffi/curl).
  local function on_event(data)
    if done then return end
    if data == "[DONE]" then
      done = true
      return
    end

    local doc = json.decode(data)
    -- Ignore anything that is not a JSON object. Unparseable events decode
    -- to nil, and scalars such as `true` or `42` would raise on the field
    -- accesses below.
    if type(doc) ~= "table" then return end

    -- Some servers emit an SSE-framed error envelope at the start of the
    -- stream; surface it and stop consuming further events.
    if doc.error then
      api_err = describe_api_error(doc.error)
      done = true
      return
    end

    -- Walk choices[1].delta.content, tolerating any level that is missing
    -- or is not a table.
    local choices = doc.choices
    local choice = type(choices) == "table" and choices[1] or nil
    local delta = type(choice) == "table" and choice.delta or nil
    local content = type(delta) == "table" and delta.content or nil
    if type(content) == "string" and content ~= "" then
      on_delta(content)
    end
  end

  local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms)
  -- An API error reported in-stream is more specific than whatever the
  -- transport returned, so it takes precedence.
  if api_err then return nil, "api: " .. api_err end
  if not ok then return nil, "transport: " .. tostring(err) end
  return true
end

-- Send a /v1/chat/completions request and return the full assistant text.
-- Thin buffering wrapper over M.chat_stream. It uses the same path as the
-- streaming consumer, so the broker keeps one HTTP shape (stream:true always).
-- Returns:
--   assistant_content_string    on success (may be "" if no deltas arrived)
--   nil, errmsg                 on transport / decode / API failure
function M.chat(model_cfg, messages)
  local parts = {}
  local ok, err = M.chat_stream(model_cfg, messages, function(d)
    parts[#parts + 1] = d
  end)
  if not ok then return nil, err end
  return table.concat(parts)
end

return M