diff --git a/ffi/curl.lua b/ffi/curl.lua index f0424f5..682daf8 100644 --- a/ffi/curl.lua +++ b/ffi/curl.lua @@ -1,7 +1,10 @@ -- ffi/curl.lua — libcurl easy interface binding. -- Phase 0: blocking POST with header list and response capture into Lua string. --- Phase 1: SSE streaming via streaming WRITEFUNCTION (this same callback hook). --- See docs/PHASE0.md §6. +-- Phase 1: M.post_sse for incremental Server-Sent-Events streaming. Reuses the +-- same WRITEFUNCTION hook; parses `data: ...\n\n` events out of the chunk +-- stream and invokes the caller's on_event(data) per event. JSON decode and +-- OpenAI-shape interpretation stay in broker.lua (this module is HTTP-only). +-- See docs/PHASE0.md §6 and docs/PHASE1.md §4. local ffi = require("ffi") @@ -51,6 +54,7 @@ local OPT = { NOSIGNAL = 99, TIMEOUT_MS = 155, USERAGENT = 10018, + FAILONERROR = 45, } -- Variadic FFI calls demand explicit per-argument types. Pre-cast setopt to @@ -112,4 +116,101 @@ function M.post(url, body, headers, timeout_ms) return nil, err end +-- POST `body` to `url` with `headers`, streaming Server-Sent-Events back. +-- For each complete `data: ...\n\n` event, `on_event(data_string)` is invoked +-- synchronously from within the WRITEFUNCTION callback. The caller decides +-- what to do with the payload (broker.lua decodes JSON, extracts the OpenAI +-- delta.content). `[DONE]` sentinels and `:` comment lines are passed +-- through as-is to on_event (broker filters them). +-- Returns: +-- true stream completed successfully (HTTP 2xx, perform OK) +-- nil, errmsg libcurl failure (non-zero CURLcode); FAILONERROR is set +-- so non-2xx surfaces as a transport error rather than a +-- silent garbage-into-the-parser scenario. +function M.post_sse(url, body, headers, on_event, timeout_ms) + local handle = C.curl_easy_init() + if handle == nil then return nil, "curl_easy_init returned NULL" end + + -- SSE parse state: buffer holds incomplete tail between callback deliveries. + local buffer = "" + + local write_cb = ffi.cast( + "size_t(*)(char*, size_t, size_t, void*)", + function(ptr, size, nmemb, _) + local n = tonumber(size) * tonumber(nmemb) + buffer = buffer .. ffi.string(ptr, n) + + -- Drain complete events (terminated by \n\n). + while true do + local b = buffer:find("\n\n", 1, true) + if not b then break end + local event = buffer:sub(1, b - 1) + buffer = buffer:sub(b + 2) + + -- A single event can have multiple field lines; we only need + -- `data: ...` (joining multi-line data per the SSE spec). + local data_parts = {} + for line in (event .. "\n"):gmatch("([^\n]*)\n") do + if line:sub(1, 1) == ":" then + -- SSE keepalive comment; ignore. + elseif line:sub(1, 6) == "data: " then + data_parts[#data_parts + 1] = line:sub(7) + elseif line:sub(1, 5) == "data:" then + -- spec allows `data:` with no space + data_parts[#data_parts + 1] = line:sub(6) + end + end + if #data_parts > 0 then + on_event(table.concat(data_parts, "\n")) + end + end + + return n + end) + + local slist = nil + for _, h in ipairs(headers or {}) do + slist = C.curl_slist_append(slist, h) + end + + setopt_str (handle, OPT.URL, url) + setopt_long(handle, OPT.POST, 1) + setopt_str (handle, OPT.POSTFIELDS, body) + setopt_ptr (handle, OPT.HTTPHEADER, slist) + setopt_ptr (handle, OPT.WRITEFUNCTION, write_cb) + setopt_long(handle, OPT.NOSIGNAL, 1) + setopt_long(handle, OPT.FAILONERROR, 1) + setopt_str (handle, OPT.USERAGENT, "aish/0.0 (luajit-ffi)") + if timeout_ms then + setopt_long(handle, OPT.TIMEOUT_MS, timeout_ms) + end + + local rc = C.curl_easy_perform(handle) + local err + if rc ~= 0 then err = ffi.string(C.curl_easy_strerror(rc)) end + + -- End-of-stream flush: the final event may lack a trailing \n\n if the + -- server closed the connection right after writing the last data: line + -- (some llama.cpp builds, and any plain HTTP/1.0 close-on-EOF feed). + -- Parse any remaining buffer content as one last event. + if rc == 0 and #buffer > 0 then + local data_parts = {} + for line in (buffer .. "\n"):gmatch("([^\n]*)\n") do + if line:sub(1, 6) == "data: " then + data_parts[#data_parts + 1] = line:sub(7) + elseif line:sub(1, 5) == "data:" then + data_parts[#data_parts + 1] = line:sub(6) + end + end + if #data_parts > 0 then on_event(table.concat(data_parts, "\n")) end + end + + C.curl_easy_cleanup(handle) + if slist ~= nil then C.curl_slist_free_all(slist) end + write_cb:free() + + if rc == 0 then return true end + return nil, err +end + return M