v1.1.0/#20: concurrent handler dispatch
Replaces the synchronous tools/call path with a coroutine-wrapped
dispatch. The select()-based event loop from v1.0.0-rc1 already
multiplexes I/O; this change extends the same single-thread
cooperative scheduling to tool handler execution.
How:
- server.lua:sleep_ms detects coroutine context and yields with
{ wake_at = gettime() + ms/1000 } instead of blocking. Falls back
to today's busy-blocking sleep when on the main thread (stdio
dispatch, init code).
- server.lua:run() now uses gettime() deltas for timeout accounting
(Phase 5 review fix — the prior interval-accumulator diverged
from wall-clock when scheduler delayed resumes).
- lmcp.lua wraps the handle_request call inside _dispatch_post in a
coroutine. Synchronous completion (no yield) takes the inline-
response path; if the handler yields, the coroutine parks in
self._pending_handlers and the conn enters dispatching_async.
- New _scheduler_tick services pending coroutines whose wake_at has
passed; on completion calls the shared _finalise_dispatch helper
to build the deferred HTTP response (Accept-aware: SSE or JSON).
- select() timeout tightens to the next pending wake_at so short
yields don't pay the full 100ms tick.
Measurement (Phase 7):
before: fast ping during slow shell sleep 3 = 4.28s
after: fast ping during slow shell sleep 3 = 0.01s (~400×)
3 parallel slow shells: 3.77s total wall (was ~9s).
Zero handler source-code changes. Every existing tool that goes
through run() (shell, shell_bg, fetch, web_search, list_dir,
search_files, systeminfo, hub remote_*) gets concurrency for free.
Pure-Lua handlers (ping, read_file, write_file, edit_file) continue
to complete inline. stdio transport stays serialised by design
(single-client per stdio process).
Known limits documented in memory project_handler_coroutines:
- socket.gettime() is wall-clock not monotonic; large NTP steps may
bunch resumes. Acceptable on chrony-slewed fleet.
- Cancellation (#11) is now tractable since the scheduler can flip a
flag between resumes — implementation pending.
- Server-initiated request await (sampling/roots from inside a
handler) still requires a future yield-on-pending helper.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -69,6 +69,11 @@ function lmcp.new(name, opts)
|
|||||||
-- server calls `:roots(session_id, ...)`; invalidated when the client
|
-- server calls `:roots(session_id, ...)`; invalidated when the client
|
||||||
-- sends notifications/roots/list_changed.
|
-- sends notifications/roots/list_changed.
|
||||||
self._roots_cache = {}
|
self._roots_cache = {}
|
||||||
|
-- Pending handler coroutines (issue #20 — concurrent dispatch).
|
||||||
|
-- Each entry: { co, conn, wake_at, finalise }. The scheduler tick
|
||||||
|
-- resumes any whose wake_at has passed and runs `finalise` on the
|
||||||
|
-- coroutine's return value to build the deferred response.
|
||||||
|
self._pending_handlers = {}
|
||||||
-- Notification queue: drained by Streamable HTTP transport (issue #16).
|
-- Notification queue: drained by Streamable HTTP transport (issue #16).
|
||||||
-- Today delivery is a no-op; we still enqueue so the emission code
|
-- Today delivery is a no-op; we still enqueue so the emission code
|
||||||
-- path is exercised. Capped + deduped to keep the queue useful.
|
-- path is exercised. Capped + deduped to keep the queue useful.
|
||||||
@@ -887,6 +892,10 @@ end
|
|||||||
|
|
||||||
-- ---- Dispatch a fully-parsed POST body ----
|
-- ---- Dispatch a fully-parsed POST body ----
|
||||||
|
|
||||||
|
-- Forward declarations: used by _dispatch_post, defined below.
|
||||||
|
local _drive_handler_co
|
||||||
|
local _finalise_dispatch
|
||||||
|
|
||||||
local function _dispatch_post(self, conn)
|
local function _dispatch_post(self, conn)
|
||||||
local body = conn.body
|
local body = conn.body
|
||||||
if body == "" then
|
if body == "" then
|
||||||
@@ -943,27 +952,73 @@ local function _dispatch_post(self, conn)
|
|||||||
-- session to push the request onto).
|
-- session to push the request onto).
|
||||||
rpc_req._session_id = sess.id
|
rpc_req._session_id = sess.id
|
||||||
|
|
||||||
-- Normal client request / notification. Dispatch via handle_request.
|
-- Concurrent handler dispatch (issue #20). Wrap the dispatch call in
|
||||||
local response = self:handle_request(rpc_req)
|
-- a coroutine so any tool handler that goes through server.lua:run()
|
||||||
if not response then
|
-- (which yields when polling its sentinel file) can return control to
|
||||||
|
-- the event loop while it waits. Other connections continue making
|
||||||
|
-- progress.
|
||||||
|
--
|
||||||
|
-- The coroutine resumes itself synchronously the first time. If it
|
||||||
|
-- completes without yielding (pure-Lua handlers, ping, etc.) the
|
||||||
|
-- response is built inline as before. If it yields, we park it in
|
||||||
|
-- self._pending_handlers and return nil — the conn enters
|
||||||
|
-- dispatching_async, the scheduler tick resumes when wake_at passes.
|
||||||
|
local co = coroutine.create(function()
|
||||||
|
return self:handle_request(rpc_req)
|
||||||
|
end)
|
||||||
|
return _drive_handler_co(self, conn, co)
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Resume a handler coroutine until it completes or yields. On completion,
|
||||||
|
-- build the deferred HTTP response (preserving the Accept-aware shape).
|
||||||
|
-- On yield, register in self._pending_handlers and return nil — the conn
|
||||||
|
-- is parked in dispatching_async until the scheduler resumes it.
|
||||||
|
_drive_handler_co = function(self, conn, co)
|
||||||
|
local rok, ryield = coroutine.resume(co)
|
||||||
|
if coroutine.status(co) == "dead" then
|
||||||
|
return _finalise_dispatch(conn, rok, ryield)
|
||||||
|
end
|
||||||
|
-- Suspended. Parse the yield payload.
|
||||||
|
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
||||||
|
self._pending_handlers[#self._pending_handlers + 1] = {
|
||||||
|
co = co, conn = conn, wake_at = wake_at,
|
||||||
|
}
|
||||||
|
conn.state = "dispatching_async"
|
||||||
|
return nil -- no write_buf change; conn parks
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Build the HTTP response for a completed dispatch. `rok` is the coroutine.resume
|
||||||
|
-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when
|
||||||
|
-- rok=true; an error message when rok=false). Used by both the sync path
|
||||||
|
-- (_dispatch_post tail) and the async resume path (_scheduler_tick).
|
||||||
|
_finalise_dispatch = function(conn, rok, result)
|
||||||
|
local session_id = conn.session_id
|
||||||
|
if not rok then
|
||||||
|
-- Internal dispatch error — surface as a JSON-RPC error response.
|
||||||
|
return _build_http_response("500 Internal Server Error",
|
||||||
|
{ ["Content-Type"] = "application/json",
|
||||||
|
["Access-Control-Allow-Origin"] = "*" },
|
||||||
|
jsonrpc_error(nil, -32603, "Internal error: " .. tostring(result)),
|
||||||
|
session_id)
|
||||||
|
end
|
||||||
|
if not result then
|
||||||
-- Notification → 202 Accepted, no body.
|
-- Notification → 202 Accepted, no body.
|
||||||
return _build_http_response("202 Accepted",
|
return _build_http_response("202 Accepted",
|
||||||
{ ["Content-Type"] = "application/json",
|
{ ["Content-Type"] = "application/json",
|
||||||
["Access-Control-Allow-Origin"] = "*" },
|
["Access-Control-Allow-Origin"] = "*" },
|
||||||
"", conn.session_id)
|
"", session_id)
|
||||||
end
|
end
|
||||||
|
-- Accept-aware response shape (re-checked at finalise time; survives
|
||||||
-- If client accepts SSE, respond as a single-event SSE stream.
|
-- parking because conn.headers is captured by the closure scope).
|
||||||
-- Otherwise plain JSON body.
|
|
||||||
local accept = conn.headers["accept"] or ""
|
local accept = conn.headers["accept"] or ""
|
||||||
if accept:find("text/event%-stream") then
|
if accept:find("text/event%-stream") then
|
||||||
local hdrs = _build_sse_headers(conn.session_id)
|
local hdrs = _build_sse_headers(session_id)
|
||||||
return hdrs .. _sse_event(response)
|
return hdrs .. _sse_event(result)
|
||||||
end
|
end
|
||||||
return _build_http_response("200 OK",
|
return _build_http_response("200 OK",
|
||||||
{ ["Content-Type"] = "application/json",
|
{ ["Content-Type"] = "application/json",
|
||||||
["Access-Control-Allow-Origin"] = "*" },
|
["Access-Control-Allow-Origin"] = "*" },
|
||||||
response, conn.session_id)
|
result, session_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
local function _dispatch_options(conn)
|
local function _dispatch_options(conn)
|
||||||
@@ -1119,8 +1174,15 @@ local function _conn_read(self, conn)
|
|||||||
conn.state = "sse_open"
|
conn.state = "sse_open"
|
||||||
conn.last_heart = os.time()
|
conn.last_heart = os.time()
|
||||||
elseif conn.method == "POST" then
|
elseif conn.method == "POST" then
|
||||||
conn.write_buf = _dispatch_post(self, conn)
|
-- _dispatch_post may return nil (issue #20) if the handler
|
||||||
conn.state = "writing"
|
-- coroutine yielded. In that case it set conn.state =
|
||||||
|
-- "dispatching_async" itself and parked the coroutine.
|
||||||
|
local resp = _dispatch_post(self, conn)
|
||||||
|
if resp then
|
||||||
|
conn.write_buf = resp
|
||||||
|
conn.state = "writing"
|
||||||
|
end
|
||||||
|
-- else: conn already parked; scheduler tick will finalise.
|
||||||
else
|
else
|
||||||
conn.write_buf = _build_http_response("405 Method Not Allowed",
|
conn.write_buf = _build_http_response("405 Method Not Allowed",
|
||||||
{ ["Content-Type"] = "text/plain",
|
{ ["Content-Type"] = "text/plain",
|
||||||
@@ -1195,6 +1257,63 @@ local function _heartbeat_tick(self)
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Issue #20 — scheduler tick. Resume any parked dispatch coroutine whose
|
||||||
|
-- wake_at has passed. On completion, build the deferred response and
|
||||||
|
-- queue it for write. If the connection died while the handler was
|
||||||
|
-- parked, drop the coroutine.
|
||||||
|
--
|
||||||
|
-- gettime() is wall-clock (luasocket uses gettimeofday) — NOT monotonic.
|
||||||
|
-- A large NTP step backwards could delay resumes; forwards could bunch
|
||||||
|
-- them. Acceptable for the deployment fleet (chrony slews); revisit if
|
||||||
|
-- a use case appears that needs CLOCK_MONOTONIC.
|
||||||
|
local function _scheduler_tick(self)
|
||||||
|
if not self._pending_handlers[1] then return end
|
||||||
|
local socket = require("socket")
|
||||||
|
local now = socket.gettime()
|
||||||
|
local i = 1
|
||||||
|
while i <= #self._pending_handlers do
|
||||||
|
local p = self._pending_handlers[i]
|
||||||
|
if p.conn.state == "closing" then
|
||||||
|
-- Connection died mid-handler; drop the coroutine entirely.
|
||||||
|
table.remove(self._pending_handlers, i)
|
||||||
|
elseif now >= p.wake_at then
|
||||||
|
-- Time to resume. Remove from pending BEFORE resume so a
|
||||||
|
-- re-yielding handler re-adds itself cleanly via _drive_handler_co.
|
||||||
|
table.remove(self._pending_handlers, i)
|
||||||
|
local rok, ryield = coroutine.resume(p.co)
|
||||||
|
if coroutine.status(p.co) == "dead" then
|
||||||
|
p.conn.write_buf = (p.conn.write_buf or "")
|
||||||
|
.. _finalise_dispatch(p.conn, rok, ryield)
|
||||||
|
p.conn.state = "writing"
|
||||||
|
else
|
||||||
|
-- Yielded again — re-park.
|
||||||
|
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
||||||
|
self._pending_handlers[#self._pending_handlers + 1] = {
|
||||||
|
co = p.co, conn = p.conn, wake_at = wake_at,
|
||||||
|
}
|
||||||
|
end
|
||||||
|
else
|
||||||
|
i = i + 1
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
-- Returns the earliest pending wake_at as an offset from now, or nil if
|
||||||
|
-- no handlers are parked. Used to tighten the select() timeout so the
|
||||||
|
-- scheduler wakes on the right beat.
|
||||||
|
local function _next_pending_delay(self)
|
||||||
|
if not self._pending_handlers[1] then return nil end
|
||||||
|
local socket = require("socket")
|
||||||
|
local now = socket.gettime()
|
||||||
|
local earliest = math.huge
|
||||||
|
for _, p in ipairs(self._pending_handlers) do
|
||||||
|
if p.wake_at < earliest then earliest = p.wake_at end
|
||||||
|
end
|
||||||
|
local d = earliest - now
|
||||||
|
if d < 0 then return 0 end
|
||||||
|
return d
|
||||||
|
end
|
||||||
|
|
||||||
-- ---- Public: server-initiated request (for sampling/roots/etc.) ----
|
-- ---- Public: server-initiated request (for sampling/roots/etc.) ----
|
||||||
-- Enqueues a JSON-RPC request on the session's SSE stream. The callback
|
-- Enqueues a JSON-RPC request on the session's SSE stream. The callback
|
||||||
-- fires when the client POSTs back the response (matched by id).
|
-- fires when the client POSTs back the response (matched by id).
|
||||||
@@ -1322,7 +1441,14 @@ function lmcp:run()
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
local ready_r, ready_w = socket.select(reads, writes, SELECT_TIMEOUT)
|
-- Tighten select timeout if a parked handler is due sooner.
|
||||||
|
-- Otherwise a 100ms tick adds 100ms latency to short shell-tool runs.
|
||||||
|
local select_timeout = SELECT_TIMEOUT
|
||||||
|
local next_pend = _next_pending_delay(self)
|
||||||
|
if next_pend and next_pend < select_timeout then
|
||||||
|
select_timeout = next_pend
|
||||||
|
end
|
||||||
|
local ready_r, ready_w = socket.select(reads, writes, select_timeout)
|
||||||
|
|
||||||
for _, sock in ipairs(ready_r or {}) do
|
for _, sock in ipairs(ready_r or {}) do
|
||||||
if sock == server_sock then
|
if sock == server_sock then
|
||||||
@@ -1365,9 +1491,11 @@ function lmcp:run()
|
|||||||
-- Per-tick maintenance.
|
-- Per-tick maintenance.
|
||||||
_drain_notifications(self)
|
_drain_notifications(self)
|
||||||
_heartbeat_tick(self)
|
_heartbeat_tick(self)
|
||||||
|
_scheduler_tick(self) -- issue #20: resume due dispatch coroutines
|
||||||
|
|
||||||
-- After draining, attempt immediate writes on conns whose write_buf
|
-- After draining, attempt immediate writes on conns whose write_buf
|
||||||
-- just got bytes (so list_changed / heartbeat appears within one tick).
|
-- just got bytes (so list_changed / heartbeat / async-completed
|
||||||
|
-- responses appear within one tick).
|
||||||
for sock, conn in pairs(self._conns) do
|
for sock, conn in pairs(self._conns) do
|
||||||
if conn.write_buf ~= "" and conn.state ~= "closing" then
|
if conn.write_buf ~= "" and conn.state ~= "closing" then
|
||||||
pcall(_conn_write, conn)
|
pcall(_conn_write, conn)
|
||||||
|
|||||||
+48
-21
@@ -35,7 +35,35 @@ local function tmpname()
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Lazy-required luasocket — only needed in the coroutine path for
|
||||||
|
-- gettime(). Avoids forcing luasocket as a hard dep at server.lua
|
||||||
|
-- load time (callers like example_server already require it via lmcp).
|
||||||
|
local _socket = nil
|
||||||
|
local function gettime()
|
||||||
|
if not _socket then _socket = require("socket") end
|
||||||
|
return _socket.gettime()
|
||||||
|
end
|
||||||
|
|
||||||
|
-- in_coroutine() — true if we're running inside an lmcp dispatch
|
||||||
|
-- coroutine (issue #20). Handles both Lua 5.4 (coroutine.running →
|
||||||
|
-- (co, isMain)) and LuaJIT 5.1 (coroutine.running → nil on main).
|
||||||
|
local function in_coroutine()
|
||||||
|
local co, is_main = coroutine.running()
|
||||||
|
if co == nil then return false end -- 5.1 / LuaJIT main
|
||||||
|
if is_main then return false end -- 5.4 main thread
|
||||||
|
return true
|
||||||
|
end
|
||||||
|
|
||||||
local function sleep_ms(ms)
|
local function sleep_ms(ms)
|
||||||
|
-- Coroutine-aware: yield with a wake deadline instead of busy-blocking.
|
||||||
|
-- The lmcp event loop services I/O for other connections while this
|
||||||
|
-- coroutine sleeps, then resumes it once the deadline elapses.
|
||||||
|
-- (Issue #20: gives concurrent tool dispatch without changing handler
|
||||||
|
-- source code — tools that go through run() get it for free.)
|
||||||
|
if in_coroutine() then
|
||||||
|
coroutine.yield({ wake_at = gettime() + (ms / 1000) })
|
||||||
|
return
|
||||||
|
end
|
||||||
if WINDOWS then
|
if WINDOWS then
|
||||||
-- ping loopback: ~1s per -n count. For sub-second, use busy-wait.
|
-- ping loopback: ~1s per -n count. For sub-second, use busy-wait.
|
||||||
if ms < 500 then
|
if ms < 500 then
|
||||||
@@ -78,6 +106,22 @@ local function run(cmd, timeout_sec)
|
|||||||
local out_file = base .. ".out"
|
local out_file = base .. ".out"
|
||||||
local done_file = base .. ".done"
|
local done_file = base .. ".done"
|
||||||
|
|
||||||
|
-- Wall-clock deadline rather than an accumulated interval-counter:
|
||||||
|
-- when we're inside a dispatch coroutine (issue #20), the scheduler
|
||||||
|
-- may delay our resume by more than `interval`, so an accumulator
|
||||||
|
-- diverges from real elapsed. gettime() comparison stays honest in
|
||||||
|
-- both busy-poll and yield-resume modes.
|
||||||
|
local started = gettime()
|
||||||
|
local function poll_loop()
|
||||||
|
local interval = WINDOWS and 100 or 50 -- ms
|
||||||
|
while gettime() - started < timeout_sec do
|
||||||
|
if file_exists(done_file) then return true end
|
||||||
|
sleep_ms(interval)
|
||||||
|
if interval < 2000 then interval = math.floor(interval * 1.5) end
|
||||||
|
end
|
||||||
|
return false
|
||||||
|
end
|
||||||
|
|
||||||
if WINDOWS then
|
if WINDOWS then
|
||||||
-- Write a batch wrapper that runs the command and signals completion
|
-- Write a batch wrapper that runs the command and signals completion
|
||||||
local bat_file = base .. ".bat"
|
local bat_file = base .. ".bat"
|
||||||
@@ -89,22 +133,13 @@ local function run(cmd, timeout_sec)
|
|||||||
bf:close()
|
bf:close()
|
||||||
os.execute('start /B cmd /C "' .. bat_file .. '"')
|
os.execute('start /B cmd /C "' .. bat_file .. '"')
|
||||||
|
|
||||||
-- Poll for sentinel
|
local completed = poll_loop()
|
||||||
local elapsed = 0
|
|
||||||
local interval = 100 -- ms
|
|
||||||
while elapsed < timeout_sec * 1000 do
|
|
||||||
if file_exists(done_file) then break end
|
|
||||||
sleep_ms(interval)
|
|
||||||
elapsed = elapsed + interval
|
|
||||||
if interval < 2000 then interval = math.floor(interval * 1.5) end
|
|
||||||
end
|
|
||||||
|
|
||||||
local output = read_file(out_file)
|
local output = read_file(out_file)
|
||||||
remove_silent(bat_file)
|
remove_silent(bat_file)
|
||||||
remove_silent(out_file)
|
remove_silent(out_file)
|
||||||
remove_silent(done_file)
|
remove_silent(done_file)
|
||||||
|
|
||||||
if elapsed >= timeout_sec * 1000 then
|
if not completed then
|
||||||
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
||||||
end
|
end
|
||||||
return output and output ~= "" and output or "(no output)"
|
return output and output ~= "" and output or "(no output)"
|
||||||
@@ -117,20 +152,12 @@ local function run(cmd, timeout_sec)
|
|||||||
)
|
)
|
||||||
os.execute("sh -c '" .. sh_cmd:gsub("'", "'\\''") .. "' &")
|
os.execute("sh -c '" .. sh_cmd:gsub("'", "'\\''") .. "' &")
|
||||||
|
|
||||||
local elapsed = 0
|
local completed = poll_loop()
|
||||||
local interval = 50 -- ms
|
|
||||||
while elapsed < timeout_sec * 1000 do
|
|
||||||
if file_exists(done_file) then break end
|
|
||||||
sleep_ms(interval)
|
|
||||||
elapsed = elapsed + interval
|
|
||||||
if interval < 2000 then interval = math.floor(interval * 1.5) end
|
|
||||||
end
|
|
||||||
|
|
||||||
local output = read_file(out_file)
|
local output = read_file(out_file)
|
||||||
remove_silent(out_file)
|
remove_silent(out_file)
|
||||||
remove_silent(done_file)
|
remove_silent(done_file)
|
||||||
|
|
||||||
if elapsed >= timeout_sec * 1000 then
|
if not completed then
|
||||||
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
||||||
end
|
end
|
||||||
return output and output ~= "" and output or "(no output)"
|
return output and output ~= "" and output or "(no output)"
|
||||||
|
|||||||
Reference in New Issue
Block a user