diff --git a/lmcp.lua b/lmcp.lua index 8dc7075..90a31e5 100644 --- a/lmcp.lua +++ b/lmcp.lua @@ -69,6 +69,11 @@ function lmcp.new(name, opts) -- server calls `:roots(session_id, ...)`; invalidated when the client -- sends notifications/roots/list_changed. self._roots_cache = {} + -- Pending handler coroutines (issue #20 — concurrent dispatch). + -- Each entry: { co, conn, wake_at, finalise }. The scheduler tick + -- resumes any whose wake_at has passed and runs `finalise` on the + -- coroutine's return value to build the deferred response. + self._pending_handlers = {} -- Notification queue: drained by Streamable HTTP transport (issue #16). -- Today delivery is a no-op; we still enqueue so the emission code -- path is exercised. Capped + deduped to keep the queue useful. @@ -887,6 +892,10 @@ end -- ---- Dispatch a fully-parsed POST body ---- +-- Forward declarations: used by _dispatch_post, defined below. +local _drive_handler_co +local _finalise_dispatch + local function _dispatch_post(self, conn) local body = conn.body if body == "" then @@ -943,27 +952,73 @@ local function _dispatch_post(self, conn) -- session to push the request onto). rpc_req._session_id = sess.id - -- Normal client request / notification. Dispatch via handle_request. - local response = self:handle_request(rpc_req) - if not response then + -- Concurrent handler dispatch (issue #20). Wrap the dispatch call in + -- a coroutine so any tool handler that goes through server.lua:run() + -- (which yields when polling its sentinel file) can return control to + -- the event loop while it waits. Other connections continue making + -- progress. + -- + -- The coroutine resumes itself synchronously the first time. If it + -- completes without yielding (pure-Lua handlers, ping, etc.) the + -- response is built inline as before. If it yields, we park it in + -- self._pending_handlers and return nil — the conn enters + -- dispatching_async, the scheduler tick resumes when wake_at passes. + local co = coroutine.create(function() + return self:handle_request(rpc_req) + end) + return _drive_handler_co(self, conn, co) +end + +-- Resume a handler coroutine until it completes or yields. On completion, +-- build the deferred HTTP response (preserving the Accept-aware shape). +-- On yield, register in self._pending_handlers and return nil — the conn +-- is parked in dispatching_async until the scheduler resumes it. +_drive_handler_co = function(self, conn, co) + local rok, ryield = coroutine.resume(co) + if coroutine.status(co) == "dead" then + return _finalise_dispatch(conn, rok, ryield) + end + -- Suspended. Parse the yield payload. + local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0 + self._pending_handlers[#self._pending_handlers + 1] = { + co = co, conn = conn, wake_at = wake_at, + } + conn.state = "dispatching_async" + return nil -- no write_buf change; conn parks +end + +-- Build the HTTP response for a completed dispatch. `rok` is the coroutine.resume +-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when +-- rok=true; an error message when rok=false). Used by both the sync path +-- (_dispatch_post tail) and the async resume path (_scheduler_tick). +_finalise_dispatch = function(conn, rok, result) + local session_id = conn.session_id + if not rok then + -- Internal dispatch error — surface as a JSON-RPC error response. + return _build_http_response("500 Internal Server Error", + { ["Content-Type"] = "application/json", + ["Access-Control-Allow-Origin"] = "*" }, + jsonrpc_error(nil, -32603, "Internal error: " .. tostring(result)), + session_id) + end + if not result then -- Notification → 202 Accepted, no body. return _build_http_response("202 Accepted", { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" }, - "", conn.session_id) + "", session_id) end - - -- If client accepts SSE, respond as a single-event SSE stream. - -- Otherwise plain JSON body. + -- Accept-aware response shape (re-checked at finalise time; survives + -- parking because conn.headers is captured by the closure scope). local accept = conn.headers["accept"] or "" if accept:find("text/event%-stream") then - local hdrs = _build_sse_headers(conn.session_id) - return hdrs .. _sse_event(response) + local hdrs = _build_sse_headers(session_id) + return hdrs .. _sse_event(result) end return _build_http_response("200 OK", { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" }, - response, conn.session_id) + result, session_id) end local function _dispatch_options(conn) @@ -1119,8 +1174,15 @@ local function _conn_read(self, conn) conn.state = "sse_open" conn.last_heart = os.time() elseif conn.method == "POST" then - conn.write_buf = _dispatch_post(self, conn) - conn.state = "writing" + -- _dispatch_post may return nil (issue #20) if the handler + -- coroutine yielded. In that case it set conn.state = + -- "dispatching_async" itself and parked the coroutine. + local resp = _dispatch_post(self, conn) + if resp then + conn.write_buf = resp + conn.state = "writing" + end + -- else: conn already parked; scheduler tick will finalise. else conn.write_buf = _build_http_response("405 Method Not Allowed", { ["Content-Type"] = "text/plain", @@ -1195,6 +1257,63 @@ local function _heartbeat_tick(self) end end +-- Issue #20 — scheduler tick. Resume any parked dispatch coroutine whose +-- wake_at has passed. On completion, build the deferred response and +-- queue it for write. If the connection died while the handler was +-- parked, drop the coroutine. +-- +-- gettime() is wall-clock (luasocket uses gettimeofday) — NOT monotonic. +-- A large NTP step backwards could delay resumes; forwards could bunch +-- them. Acceptable for the deployment fleet (chrony slews); revisit if +-- a use case appears that needs CLOCK_MONOTONIC. +local function _scheduler_tick(self) + if not self._pending_handlers[1] then return end + local socket = require("socket") + local now = socket.gettime() + local i = 1 + while i <= #self._pending_handlers do + local p = self._pending_handlers[i] + if p.conn.state == "closing" then + -- Connection died mid-handler; drop the coroutine entirely. + table.remove(self._pending_handlers, i) + elseif now >= p.wake_at then + -- Time to resume. Remove from pending BEFORE resume so a + -- re-yielding handler re-adds itself cleanly via _drive_handler_co. + table.remove(self._pending_handlers, i) + local rok, ryield = coroutine.resume(p.co) + if coroutine.status(p.co) == "dead" then + p.conn.write_buf = (p.conn.write_buf or "") + .. _finalise_dispatch(p.conn, rok, ryield) + p.conn.state = "writing" + else + -- Yielded again — re-park. + local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0 + self._pending_handlers[#self._pending_handlers + 1] = { + co = p.co, conn = p.conn, wake_at = wake_at, + } + end + else + i = i + 1 + end + end +end + +-- Returns the earliest pending wake_at as an offset from now, or nil if +-- no handlers are parked. Used to tighten the select() timeout so the +-- scheduler wakes on the right beat. +local function _next_pending_delay(self) + if not self._pending_handlers[1] then return nil end + local socket = require("socket") + local now = socket.gettime() + local earliest = math.huge + for _, p in ipairs(self._pending_handlers) do + if p.wake_at < earliest then earliest = p.wake_at end + end + local d = earliest - now + if d < 0 then return 0 end + return d +end + -- ---- Public: server-initiated request (for sampling/roots/etc.) ---- -- Enqueues a JSON-RPC request on the session's SSE stream. The callback -- fires when the client POSTs back the response (matched by id). @@ -1322,7 +1441,14 @@ function lmcp:run() end end - local ready_r, ready_w = socket.select(reads, writes, SELECT_TIMEOUT) + -- Tighten select timeout if a parked handler is due sooner. + -- Otherwise a 100ms tick adds 100ms latency to short shell-tool runs. + local select_timeout = SELECT_TIMEOUT + local next_pend = _next_pending_delay(self) + if next_pend and next_pend < select_timeout then + select_timeout = next_pend + end + local ready_r, ready_w = socket.select(reads, writes, select_timeout) for _, sock in ipairs(ready_r or {}) do if sock == server_sock then @@ -1365,9 +1491,11 @@ function lmcp:run() -- Per-tick maintenance. _drain_notifications(self) _heartbeat_tick(self) + _scheduler_tick(self) -- issue #20: resume due dispatch coroutines -- After draining, attempt immediate writes on conns whose write_buf - -- just got bytes (so list_changed / heartbeat appears within one tick). + -- just got bytes (so list_changed / heartbeat / async-completed + -- responses appear within one tick). for sock, conn in pairs(self._conns) do if conn.write_buf ~= "" and conn.state ~= "closing" then pcall(_conn_write, conn) diff --git a/server.lua b/server.lua index 05fa311..f4e64ef 100644 --- a/server.lua +++ b/server.lua @@ -35,7 +35,35 @@ local function tmpname() end end +-- Lazy-required luasocket — only needed in the coroutine path for +-- gettime(). Avoids forcing luasocket as a hard dep at server.lua +-- load time (callers like example_server already require it via lmcp). +local _socket = nil +local function gettime() + if not _socket then _socket = require("socket") end + return _socket.gettime() +end + +-- in_coroutine() — true if we're running inside an lmcp dispatch +-- coroutine (issue #20). Handles both Lua 5.4 (coroutine.running → +-- (co, isMain)) and LuaJIT 5.1 (coroutine.running → nil on main). +local function in_coroutine() + local co, is_main = coroutine.running() + if co == nil then return false end -- 5.1 / LuaJIT main + if is_main then return false end -- 5.4 main thread + return true +end + local function sleep_ms(ms) + -- Coroutine-aware: yield with a wake deadline instead of busy-blocking. + -- The lmcp event loop services I/O for other connections while this + -- coroutine sleeps, then resumes it once the deadline elapses. + -- (Issue #20: gives concurrent tool dispatch without changing handler + -- source code — tools that go through run() get it for free.) + if in_coroutine() then + coroutine.yield({ wake_at = gettime() + (ms / 1000) }) + return + end if WINDOWS then -- ping loopback: ~1s per -n count. For sub-second, use busy-wait. if ms < 500 then @@ -78,6 +106,22 @@ local function run(cmd, timeout_sec) local out_file = base .. ".out" local done_file = base .. ".done" + -- Wall-clock deadline rather than an accumulated interval-counter: + -- when we're inside a dispatch coroutine (issue #20), the scheduler + -- may delay our resume by more than `interval`, so an accumulator + -- diverges from real elapsed. gettime() comparison stays honest in + -- both busy-poll and yield-resume modes. + local started = gettime() + local function poll_loop() + local interval = WINDOWS and 100 or 50 -- ms + while gettime() - started < timeout_sec do + if file_exists(done_file) then return true end + sleep_ms(interval) + if interval < 2000 then interval = math.floor(interval * 1.5) end + end + return false + end + if WINDOWS then -- Write a batch wrapper that runs the command and signals completion local bat_file = base .. ".bat" @@ -89,22 +133,13 @@ local function run(cmd, timeout_sec) bf:close() os.execute('start /B cmd /C "' .. bat_file .. '"') - -- Poll for sentinel - local elapsed = 0 - local interval = 100 -- ms - while elapsed < timeout_sec * 1000 do - if file_exists(done_file) then break end - sleep_ms(interval) - elapsed = elapsed + interval - if interval < 2000 then interval = math.floor(interval * 1.5) end - end - + local completed = poll_loop() local output = read_file(out_file) remove_silent(bat_file) remove_silent(out_file) remove_silent(done_file) - if elapsed >= timeout_sec * 1000 then + if not completed then return output or ("Error: command timed out after " .. timeout_sec .. "s") end return output and output ~= "" and output or "(no output)" @@ -117,20 +152,12 @@ local function run(cmd, timeout_sec) ) os.execute("sh -c '" .. sh_cmd:gsub("'", "'\\''") .. "' &") - local elapsed = 0 - local interval = 50 -- ms - while elapsed < timeout_sec * 1000 do - if file_exists(done_file) then break end - sleep_ms(interval) - elapsed = elapsed + interval - if interval < 2000 then interval = math.floor(interval * 1.5) end - end - + local completed = poll_loop() local output = read_file(out_file) remove_silent(out_file) remove_silent(done_file) - if elapsed >= timeout_sec * 1000 then + if not completed then return output or ("Error: command timed out after " .. timeout_sec .. "s") end return output and output ~= "" and output or "(no output)"