ffi/curl: SSE streaming via post_sse — incremental data: events
Phase 1 streaming substrate per PHASE1.md §4.
curl.post_sse(url, body, headers, on_event, timeout_ms)
-> true | (nil, errmsg)
Reuses the Phase 0 WRITEFUNCTION hook. Each chunk delivery accumulates
into a per-request buffer; the buffer is drained for complete events
(\n\n-terminated). Each event's `data: ...` field(s) are joined per the
SSE spec and passed to on_event(data_string) synchronously. `:` comment
lines (keepalives) are filtered.
The `[DONE]` sentinel is passed through to on_event as-is (broker.lua
filters it — this module stays HTTP-layer only, no JSON / OpenAI shape
knowledge).
Two robustness items:
- End-of-stream flush: the final event may lack \n\n if the server
closes-on-EOF immediately after the last data: line (some llama.cpp
builds, plain HTTP/1.0 close-on-EOF feeds). Post-perform, any
remaining buffer is parsed as one last event.
- FAILONERROR: a non-2xx response surfaces as a CURLcode error rather
than silently feeding the error body into the SSE parser.
Smoke:
[1] canned events via nc listener: 3 events parsed in order
[2] chunk-split mid-event ("Hel" + sleep + "lo..."): correctly
reassembled across two WRITEFUNCTION deliveries
[3] LIVE against hossenfelder.fritz.box:8082 fast preset with
stream:true: response "pong" assembled from incremental deltas;
4 raw events (role + 1 content + finish_reason + [DONE])
Next: broker.lua chat_stream that decodes the OpenAI delta shape on
top of this and exposes on_delta(content_string) for renderer streaming.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
+103
-2
@@ -1,7 +1,10 @@
|
||||
-- ffi/curl.lua — libcurl easy interface binding.
|
||||
-- Phase 0: blocking POST with header list and response capture into Lua string.
|
||||
-- Phase 1: SSE streaming via streaming WRITEFUNCTION (this same callback hook).
|
||||
-- See docs/PHASE0.md §6.
|
||||
-- Phase 1: M.post_sse for incremental Server-Sent-Events streaming. Reuses the
|
||||
-- same WRITEFUNCTION hook; parses `data: ...\n\n` events out of the chunk
|
||||
-- stream and invokes the caller's on_event(data) per event. JSON decode and
|
||||
-- OpenAI-shape interpretation stay in broker.lua (this module is HTTP-only).
|
||||
-- See docs/PHASE0.md §6 and docs/PHASE1.md §4.
|
||||
|
||||
local ffi = require("ffi")
|
||||
|
||||
@@ -51,6 +54,7 @@ local OPT = {
|
||||
NOSIGNAL = 99,
|
||||
TIMEOUT_MS = 155,
|
||||
USERAGENT = 10018,
|
||||
FAILONERROR = 45,
|
||||
}
|
||||
|
||||
-- Variadic FFI calls demand explicit per-argument types. Pre-cast setopt to
|
||||
@@ -112,4 +116,101 @@ function M.post(url, body, headers, timeout_ms)
|
||||
return nil, err
|
||||
end
|
||||
|
||||
-- POST `body` to `url` with `headers`, streaming Server-Sent-Events back.
|
||||
-- For each complete `data: ...\n\n` event, `on_event(data_string)` is invoked
|
||||
-- synchronously from within the WRITEFUNCTION callback. The caller decides
|
||||
-- what to do with the payload (broker.lua decodes JSON, extracts the OpenAI
|
||||
-- delta.content). `[DONE]` sentinels and `:` comment lines are passed
|
||||
-- through as-is to on_event (broker filters them).
|
||||
-- Returns:
|
||||
-- true stream completed successfully (HTTP 2xx, perform OK)
|
||||
-- nil, errmsg libcurl failure (non-zero CURLcode); FAILONERROR is set
|
||||
-- so non-2xx surfaces as a transport error rather than a
|
||||
-- silent garbage-into-the-parser scenario.
|
||||
function M.post_sse(url, body, headers, on_event, timeout_ms)
|
||||
local handle = C.curl_easy_init()
|
||||
if handle == nil then return nil, "curl_easy_init returned NULL" end
|
||||
|
||||
-- SSE parse state: buffer holds incomplete tail between callback deliveries.
|
||||
local buffer = ""
|
||||
|
||||
local write_cb = ffi.cast(
|
||||
"size_t(*)(char*, size_t, size_t, void*)",
|
||||
function(ptr, size, nmemb, _)
|
||||
local n = tonumber(size) * tonumber(nmemb)
|
||||
buffer = buffer .. ffi.string(ptr, n)
|
||||
|
||||
-- Drain complete events (terminated by \n\n).
|
||||
while true do
|
||||
local b = buffer:find("\n\n", 1, true)
|
||||
if not b then break end
|
||||
local event = buffer:sub(1, b - 1)
|
||||
buffer = buffer:sub(b + 2)
|
||||
|
||||
-- A single event can have multiple field lines; we only need
|
||||
-- `data: ...` (joining multi-line data per the SSE spec).
|
||||
local data_parts = {}
|
||||
for line in (event .. "\n"):gmatch("([^\n]*)\n") do
|
||||
if line:sub(1, 1) == ":" then
|
||||
-- SSE keepalive comment; ignore.
|
||||
elseif line:sub(1, 6) == "data: " then
|
||||
data_parts[#data_parts + 1] = line:sub(7)
|
||||
elseif line:sub(1, 5) == "data:" then
|
||||
-- spec allows `data:` with no space
|
||||
data_parts[#data_parts + 1] = line:sub(6)
|
||||
end
|
||||
end
|
||||
if #data_parts > 0 then
|
||||
on_event(table.concat(data_parts, "\n"))
|
||||
end
|
||||
end
|
||||
|
||||
return n
|
||||
end)
|
||||
|
||||
local slist = nil
|
||||
for _, h in ipairs(headers or {}) do
|
||||
slist = C.curl_slist_append(slist, h)
|
||||
end
|
||||
|
||||
setopt_str (handle, OPT.URL, url)
|
||||
setopt_long(handle, OPT.POST, 1)
|
||||
setopt_str (handle, OPT.POSTFIELDS, body)
|
||||
setopt_ptr (handle, OPT.HTTPHEADER, slist)
|
||||
setopt_ptr (handle, OPT.WRITEFUNCTION, write_cb)
|
||||
setopt_long(handle, OPT.NOSIGNAL, 1)
|
||||
setopt_long(handle, OPT.FAILONERROR, 1)
|
||||
setopt_str (handle, OPT.USERAGENT, "aish/0.0 (luajit-ffi)")
|
||||
if timeout_ms then
|
||||
setopt_long(handle, OPT.TIMEOUT_MS, timeout_ms)
|
||||
end
|
||||
|
||||
local rc = C.curl_easy_perform(handle)
|
||||
local err
|
||||
if rc ~= 0 then err = ffi.string(C.curl_easy_strerror(rc)) end
|
||||
|
||||
-- End-of-stream flush: the final event may lack a trailing \n\n if the
|
||||
-- server closed the connection right after writing the last data: line
|
||||
-- (some llama.cpp builds, and any plain HTTP/1.0 close-on-EOF feed).
|
||||
-- Parse any remaining buffer content as one last event.
|
||||
if rc == 0 and #buffer > 0 then
|
||||
local data_parts = {}
|
||||
for line in (buffer .. "\n"):gmatch("([^\n]*)\n") do
|
||||
if line:sub(1, 6) == "data: " then
|
||||
data_parts[#data_parts + 1] = line:sub(7)
|
||||
elseif line:sub(1, 5) == "data:" then
|
||||
data_parts[#data_parts + 1] = line:sub(6)
|
||||
end
|
||||
end
|
||||
if #data_parts > 0 then on_event(table.concat(data_parts, "\n")) end
|
||||
end
|
||||
|
||||
C.curl_easy_cleanup(handle)
|
||||
if slist ~= nil then C.curl_slist_free_all(slist) end
|
||||
write_cb:free()
|
||||
|
||||
if rc == 0 then return true end
|
||||
return nil, err
|
||||
end
|
||||
|
||||
return M
|
||||
|
||||
Reference in New Issue
Block a user