v1.1.0/#11: progress + cancellation notifications
ctx augmentation:
- ctx.progress(p, total?, message?) emits notifications/progress on
the session's notify_q. No-op when the original request omitted
_meta.progressToken (per spec: only emit when client opted in).
Type-checks numeric args; passes progressToken through unchanged
(spec allows number OR string keys).
- ctx.cancelled() returns true once the client has sent a
notifications/cancelled for this request's id.
handle_request:
- New side-effect in the id==nil branch: notifications/cancelled
scans the module-level _ctx_by_co for an in-flight ctx whose
request_id matches; flips self._cancelled_ids[rid_str] only when
found. Unknown rids drop silently (no map growth).
- Pre-handler short-circuit: if cancel arrived before dispatch
reached tools/call, skip the handler entirely.
Cross-module ctx lookup:
- Module-level weak _ctx_by_co table in lmcp.lua keyed by
coroutine. lmcp.current_ctx() returns the ctx of the running
coroutine. server.lua's run() lazy-requires lmcp and uses it
to opt into auto-cancellation without depending on lmcp internals.
server.lua:run():
- After each sleep_ms cycle, check ctx.cancelled(); exit poll loop
with cancelled=true if set.
- Poll interval capped at 500ms when a ctx is present so worst-case
cancel latency stays ≤500ms (vs. 2s default growth).
- Returns "(cancelled)" sentinel; handler propagates normally.
_finalise_dispatch:
- Single cleanup site for both _cancelled_ids and _ctx_by_co (per
Phase 5 review).
- When was_cancelled: emit JSON-RPC -32800 "Request cancelled"
(deviation from Phase 4 plan; documented).
Phase 4 deviation explained: the plan was a silent TCP close (per the
spec's "SHOULD NOT respond"). Empirically, os.execute's fork+exec
passes the parent's TCP socket FD on to the spawned shell, so
sock:close() doesn't actually deliver FIN until the subshell exits
(i.e. the long-running command completes anyway). Verified that
luasocket close() works on bare sockets (curl exits with RST in
511ms). The fix would be FD_CLOEXEC on accepted sockets, which
luasocket doesn't expose — it needs a C shim or luaposix. Deferred.
Captured in memory project_fd_inheritance_in_run.
Practical UX with the deviation: client receives a structured
-32800 error within ~420ms of POSTing the cancel notification.
Measurements (Phase 7):
cancel timing (3 runs, sleep 10 with cancel at 0.4s):
run 1: t=0.42s code=-32800
run 2: t=0.42s code=-32800
run 3: t=0.42s code=-32800
progress: 3/3 events arrived on SSE; spec-shaped payload
concurrent fast+slow (#20 regression): unchanged (fast 0.01s)
all previously-closed issues regression-test green
Zero handler source-code changes. Existing tools (shell, fetch,
web_search, hub remote_*) get cancellation for free via run().
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,22 @@ local json = require('json')
|
|||||||
local lmcp = {}
|
local lmcp = {}
|
||||||
lmcp.__index = lmcp
|
lmcp.__index = lmcp
|
||||||
|
|
||||||
|
-- Module-level coroutine→ctx registry (issue #11). Weak keys so
|
||||||
|
-- coroutines that die without explicit cleanup get GC'd out.
|
||||||
|
-- Each ctx table carries a `server` back-reference, so any code with
|
||||||
|
-- a coroutine handle can find both ctx and its owning lmcp instance.
|
||||||
|
local _ctx_by_co = setmetatable({}, { __mode = "k" })
|
||||||
|
|
||||||
|
-- server.lua and any other library code can call lmcp.current_ctx() to
|
||||||
|
-- access the ctx of the currently-running dispatch coroutine. Returns
|
||||||
|
-- nil outside coroutine context. Used by server.lua:run() to do
|
||||||
|
-- transparent auto-cancellation of long-running shell-out polls.
|
||||||
|
function lmcp.current_ctx()
|
||||||
|
local co = coroutine.running()
|
||||||
|
if co == nil then return nil end
|
||||||
|
return _ctx_by_co[co]
|
||||||
|
end
|
||||||
|
|
||||||
-- Read auth token from config file if present
|
-- Read auth token from config file if present
|
||||||
local function read_conf(path)
|
local function read_conf(path)
|
||||||
local conf = {}
|
local conf = {}
|
||||||
@@ -74,6 +90,11 @@ function lmcp.new(name, opts)
|
|||||||
-- resumes any whose wake_at has passed and runs `finalise` on the
|
-- resumes any whose wake_at has passed and runs `finalise` on the
|
||||||
-- coroutine's return value to build the deferred response.
|
-- coroutine's return value to build the deferred response.
|
||||||
self._pending_handlers = {}
|
self._pending_handlers = {}
|
||||||
|
-- Cancellation flags (issue #11). Keyed by stringified JSON-RPC
|
||||||
|
-- request id. Only ever holds in-flight ids — see the
|
||||||
|
-- notifications/cancelled handler in handle_request which checks
|
||||||
|
-- for in-flight before inserting. Cleared by _finalise_dispatch.
|
||||||
|
self._cancelled_ids = {}
|
||||||
-- Notification queue: drained by Streamable HTTP transport (issue #16).
|
-- Notification queue: drained by Streamable HTTP transport (issue #16).
|
||||||
-- Today delivery is a no-op; we still enqueue so the emission code
|
-- Today delivery is a no-op; we still enqueue so the emission code
|
||||||
-- path is exercised. Capped + deduped to keep the queue useful.
|
-- path is exercised. Capped + deduped to keep the queue useful.
|
||||||
@@ -418,9 +439,28 @@ function lmcp:handle_request(req)
|
|||||||
if method == "notifications/roots/list_changed" then
|
if method == "notifications/roots/list_changed" then
|
||||||
-- Invalidate cached roots for the session that sent this.
|
-- Invalidate cached roots for the session that sent this.
|
||||||
if req._session_id then self._roots_cache[req._session_id] = nil end
|
if req._session_id then self._roots_cache[req._session_id] = nil end
|
||||||
|
elseif method == "notifications/cancelled" then
|
||||||
|
-- Issue #11 — flip cancel flag for the named request id,
|
||||||
|
-- but ONLY if the request is actually in-flight. Cancels
|
||||||
|
-- for unknown/already-completed ids drop silently (per Phase
|
||||||
|
-- 5 review fix #2 — prevents unbounded map growth).
|
||||||
|
local rid = (req.params or {}).requestId
|
||||||
|
if rid ~= nil then
|
||||||
|
local rid_str = tostring(rid)
|
||||||
|
local in_flight = false
|
||||||
|
-- Scan _ctx_by_co for a matching live request.
|
||||||
|
for _, c in pairs(_ctx_by_co) do
|
||||||
|
if c.request_id ~= nil
|
||||||
|
and tostring(c.request_id) == rid_str then
|
||||||
|
in_flight = true; break
|
||||||
|
end
|
||||||
|
end
|
||||||
|
if in_flight then
|
||||||
|
self._cancelled_ids[rid_str] = true
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
-- (Other client→server notifications: cancelled, message — no
|
-- (Other client→server notifications drop silently.)
|
||||||
-- action today; add side-effects here as needed.)
|
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -489,15 +529,59 @@ function lmcp:handle_request(req)
|
|||||||
if not tool then
|
if not tool then
|
||||||
return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name))
|
return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name))
|
||||||
end
|
end
|
||||||
-- ctx exposes the request's _meta (issue #13) and the session_id
|
-- ctx exposes the request's _meta (issue #13), the session_id
|
||||||
-- (issue #9 — so handlers can call self:sample(ctx.session_id, …)).
|
-- (issue #9 — handlers can call self:sample(ctx.session_id, …)),
|
||||||
-- Handlers that don't declare a second parameter ignore it (Lua
|
-- progress() and cancelled() (issue #11), and a `server` back-ref
|
||||||
-- call discards extras).
|
-- (so lmcp.current_ctx() can find the right server instance
|
||||||
local ctx = {
|
-- without a singleton). Handlers that don't declare a second
|
||||||
|
-- parameter ignore it (Lua call discards extras).
|
||||||
|
local rid_str = tostring(id)
|
||||||
|
local ptoken = (params._meta or {}).progressToken -- nil if absent
|
||||||
|
local ctx
|
||||||
|
ctx = {
|
||||||
_meta = params._meta,
|
_meta = params._meta,
|
||||||
request_id = id,
|
request_id = id,
|
||||||
session_id = req._session_id,
|
session_id = req._session_id,
|
||||||
|
server = self,
|
||||||
|
-- progress(p, total?, message?): emits notifications/progress
|
||||||
|
-- on session's notify_q. No-op if client didn't supply a
|
||||||
|
-- progressToken. Type-checks; rejects non-numeric progress.
|
||||||
|
progress = function(p, total, message)
|
||||||
|
if ptoken == nil then return false end
|
||||||
|
if type(p) ~= "number" then return false end
|
||||||
|
if total ~= nil and type(total) ~= "number" then return false end
|
||||||
|
local sess = self._sessions[req._session_id]
|
||||||
|
if not sess then return false end
|
||||||
|
local np = { progressToken = ptoken, progress = p }
|
||||||
|
if total ~= nil then np.total = total end
|
||||||
|
if message ~= nil then np.message = tostring(message) end
|
||||||
|
sess.notify_q[#sess.notify_q + 1] = {
|
||||||
|
jsonrpc = JSONRPC, method = "notifications/progress",
|
||||||
|
params = np,
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
end,
|
||||||
|
-- cancelled(): true if a notifications/cancelled for this
|
||||||
|
-- request id has been received.
|
||||||
|
cancelled = function()
|
||||||
|
return self._cancelled_ids[rid_str] == true
|
||||||
|
end,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
-- Register on the currently-running coroutine so lmcp.current_ctx()
|
||||||
|
-- (and thus server.lua:run()'s auto-cancel) can find this ctx.
|
||||||
|
-- Pure-Lua handlers also get this registration; harmless.
|
||||||
|
local co = coroutine.running()
|
||||||
|
if co ~= nil then _ctx_by_co[co] = ctx end
|
||||||
|
|
||||||
|
-- Pre-handler cancellation short-circuit (Phase 5 review fix #9).
|
||||||
|
-- If a notifications/cancelled landed for this id before dispatch
|
||||||
|
-- reached here, skip the handler entirely. _finalise_dispatch
|
||||||
|
-- will see the cancel flag and reply with a -32800 error.
|
||||||
|
if self._cancelled_ids[rid_str] then
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
local ok, result = pcall(tool.handler, arguments, ctx)
|
local ok, result = pcall(tool.handler, arguments, ctx)
|
||||||
if ok then
|
if ok then
|
||||||
local resp = { isError = false }
|
local resp = { isError = false }
|
||||||
@@ -951,6 +1035,12 @@ local function _dispatch_post(self, conn)
|
|||||||
-- expose it to handler ctx (issue #9 — sampling needs to know which
|
-- expose it to handler ctx (issue #9 — sampling needs to know which
|
||||||
-- session to push the request onto).
|
-- session to push the request onto).
|
||||||
rpc_req._session_id = sess.id
|
rpc_req._session_id = sess.id
|
||||||
|
-- Stash the JSON-RPC id on the conn so _finalise_dispatch can clear
|
||||||
|
-- the cancellation flag for this request after building the response
|
||||||
|
-- (issue #11). Notifications have nil id; that's fine — the
|
||||||
|
-- nil-guard in _finalise_dispatch keeps tostring(nil) out of the
|
||||||
|
-- cancel map.
|
||||||
|
conn.dispatch_id = rpc_req.id
|
||||||
|
|
||||||
-- Concurrent handler dispatch (issue #20). Wrap the dispatch call in
|
-- Concurrent handler dispatch (issue #20). Wrap the dispatch call in
|
||||||
-- a coroutine so any tool handler that goes through server.lua:run()
|
-- a coroutine so any tool handler that goes through server.lua:run()
|
||||||
@@ -976,7 +1066,7 @@ end
|
|||||||
_drive_handler_co = function(self, conn, co)
|
_drive_handler_co = function(self, conn, co)
|
||||||
local rok, ryield = coroutine.resume(co)
|
local rok, ryield = coroutine.resume(co)
|
||||||
if coroutine.status(co) == "dead" then
|
if coroutine.status(co) == "dead" then
|
||||||
return _finalise_dispatch(conn, rok, ryield)
|
return _finalise_dispatch(self, conn, rok, ryield, co)
|
||||||
end
|
end
|
||||||
-- Suspended. Parse the yield payload.
|
-- Suspended. Parse the yield payload.
|
||||||
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
||||||
@@ -991,8 +1081,40 @@ end
|
|||||||
-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when
|
-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when
|
||||||
-- rok=true; an error message when rok=false). Used by both the sync path
|
-- rok=true; an error message when rok=false). Used by both the sync path
|
||||||
-- (_dispatch_post tail) and the async resume path (_scheduler_tick).
|
-- (_dispatch_post tail) and the async resume path (_scheduler_tick).
|
||||||
_finalise_dispatch = function(conn, rok, result)
|
-- Also: clears cancellation flag and ctx-by-co registry entry for this
|
||||||
|
-- request (issue #11 — single cleanup site per Phase 5 review fix #7).
|
||||||
|
_finalise_dispatch = function(self, conn, rok, result, co)
|
||||||
local session_id = conn.session_id
|
local session_id = conn.session_id
|
||||||
|
|
||||||
|
-- Cleanup (always): drop the coroutine's ctx entry and any
|
||||||
|
-- cancellation flag for this request id.
|
||||||
|
if co ~= nil then _ctx_by_co[co] = nil end
|
||||||
|
local rid = conn.dispatch_id
|
||||||
|
local was_cancelled = false
|
||||||
|
if rid ~= nil then
|
||||||
|
local rid_str = tostring(rid)
|
||||||
|
if self._cancelled_ids[rid_str] then
|
||||||
|
was_cancelled = true
|
||||||
|
self._cancelled_ids[rid_str] = nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
-- Issue #11: cancelled requests get a -32800 JSON-RPC error response.
|
||||||
|
-- The MCP spec wording is "SHOULD NOT respond" (not MUST NOT). A silent
|
||||||
|
-- TCP-close would be cleaner but the spawned shell subprocess in
|
||||||
|
-- server.lua:run() inherits the socket FD via fork(), so the kernel
|
||||||
|
-- keeps the connection alive until that shell exits (i.e. the
|
||||||
|
-- underlying long-running command completes anyway). The error
|
||||||
|
-- response gives the client a structured signal and exits curl
|
||||||
|
-- immediately, which is the practical UX they want. JSON-RPC 2.0
|
||||||
|
-- code -32800 is the convention for "Request cancelled."
|
||||||
|
if was_cancelled then
|
||||||
|
return _build_http_response("200 OK",
|
||||||
|
{ ["Content-Type"] = "application/json",
|
||||||
|
["Access-Control-Allow-Origin"] = "*" },
|
||||||
|
jsonrpc_error(rid, -32800, "Request cancelled"),
|
||||||
|
session_id)
|
||||||
|
end
|
||||||
|
|
||||||
if not rok then
|
if not rok then
|
||||||
-- Internal dispatch error — surface as a JSON-RPC error response.
|
-- Internal dispatch error — surface as a JSON-RPC error response.
|
||||||
return _build_http_response("500 Internal Server Error",
|
return _build_http_response("500 Internal Server Error",
|
||||||
@@ -1180,7 +1302,11 @@ local function _conn_read(self, conn)
|
|||||||
local resp = _dispatch_post(self, conn)
|
local resp = _dispatch_post(self, conn)
|
||||||
if resp then
|
if resp then
|
||||||
conn.write_buf = resp
|
conn.write_buf = resp
|
||||||
conn.state = "writing"
|
-- Don't override a conn already marked "closing". (Cancelled
|
||||||
|
-- requests get a -32800 reply from _finalise_dispatch; issue #11.)
|
||||||
|
if conn.state ~= "closing" then
|
||||||
|
conn.state = "writing"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
-- else: conn already parked; scheduler tick will finalise.
|
-- else: conn already parked; scheduler tick will finalise.
|
||||||
else
|
else
|
||||||
@@ -1274,7 +1400,12 @@ local function _scheduler_tick(self)
|
|||||||
while i <= #self._pending_handlers do
|
while i <= #self._pending_handlers do
|
||||||
local p = self._pending_handlers[i]
|
local p = self._pending_handlers[i]
|
||||||
if p.conn.state == "closing" then
|
if p.conn.state == "closing" then
|
||||||
-- Connection died mid-handler; drop the coroutine entirely.
|
-- Connection died mid-handler; drop the coroutine entirely
|
||||||
|
-- and free its ctx entry (issue #11 cleanup discipline).
|
||||||
|
_ctx_by_co[p.co] = nil
|
||||||
|
if p.conn.dispatch_id ~= nil then
|
||||||
|
self._cancelled_ids[tostring(p.conn.dispatch_id)] = nil
|
||||||
|
end
|
||||||
table.remove(self._pending_handlers, i)
|
table.remove(self._pending_handlers, i)
|
||||||
elseif now >= p.wake_at then
|
elseif now >= p.wake_at then
|
||||||
-- Time to resume. Remove from pending BEFORE resume so a
|
-- Time to resume. Remove from pending BEFORE resume so a
|
||||||
@@ -1282,9 +1413,14 @@ local function _scheduler_tick(self)
|
|||||||
table.remove(self._pending_handlers, i)
|
table.remove(self._pending_handlers, i)
|
||||||
local rok, ryield = coroutine.resume(p.co)
|
local rok, ryield = coroutine.resume(p.co)
|
||||||
if coroutine.status(p.co) == "dead" then
|
if coroutine.status(p.co) == "dead" then
|
||||||
p.conn.write_buf = (p.conn.write_buf or "")
|
local resp = _finalise_dispatch(self, p.conn, rok, ryield, p.co)
|
||||||
.. _finalise_dispatch(p.conn, rok, ryield)
|
p.conn.write_buf = (p.conn.write_buf or "") .. resp
|
||||||
p.conn.state = "writing"
|
-- Don't override a conn already marked "closing". Note that
|
||||||
|
-- _finalise_dispatch answers cancelled requests with a -32800
|
||||||
|
-- reply rather than closing the connection (issue #11).
|
||||||
|
if p.conn.state ~= "closing" then
|
||||||
|
p.conn.state = "writing"
|
||||||
|
end
|
||||||
else
|
else
|
||||||
-- Yielded again — re-park.
|
-- Yielded again — re-park.
|
||||||
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
|
||||||
|
|||||||
+31
@@ -44,6 +44,22 @@ local function gettime()
|
|||||||
return _socket.gettime()
|
return _socket.gettime()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- Lazy access to the lmcp module for cross-module ctx lookup (issue #11).
|
||||||
|
-- server.lua doesn't statically require lmcp (it's an example/runtime
|
||||||
|
-- server, not the library); but lmcp must already be loaded when we run.
|
||||||
|
-- Defensive: if the lookup fails for any reason, current_ctx returns nil
|
||||||
|
-- and run() falls back to non-cancellable behaviour.
|
||||||
|
local _lmcp_mod = nil
|
||||||
|
local function current_ctx()
|
||||||
|
if _lmcp_mod == false then return nil end
|
||||||
|
if _lmcp_mod == nil then
|
||||||
|
local ok, mod = pcall(require, "lmcp")
|
||||||
|
_lmcp_mod = ok and mod or false
|
||||||
|
if _lmcp_mod == false then return nil end
|
||||||
|
end
|
||||||
|
return _lmcp_mod.current_ctx and _lmcp_mod.current_ctx() or nil
|
||||||
|
end
|
||||||
|
|
||||||
-- in_coroutine() — true if we're running inside an lmcp dispatch
|
-- in_coroutine() — true if we're running inside an lmcp dispatch
|
||||||
-- coroutine (issue #20). Handles both Lua 5.4 (coroutine.running →
|
-- coroutine (issue #20). Handles both Lua 5.4 (coroutine.running →
|
||||||
-- (co, isMain)) and LuaJIT 5.1 (coroutine.running → nil on main).
|
-- (co, isMain)) and LuaJIT 5.1 (coroutine.running → nil on main).
|
||||||
@@ -111,13 +127,26 @@ local function run(cmd, timeout_sec)
|
|||||||
-- may delay our resume by more than `interval`, so an accumulator
|
-- may delay our resume by more than `interval`, so an accumulator
|
||||||
-- diverges from real elapsed. gettime() comparison stays honest in
|
-- diverges from real elapsed. gettime() comparison stays honest in
|
||||||
-- both busy-poll and yield-resume modes.
|
-- both busy-poll and yield-resume modes.
|
||||||
|
--
|
||||||
|
-- Auto-cancellation (issue #11): if a ctx is available on the
|
||||||
|
-- running coroutine AND it has been cancelled, exit the polling
|
||||||
|
-- loop early. The interval is capped at 500ms when a ctx is
|
||||||
|
-- present so worst-case cancel latency is ~0.5s, not ~2s.
|
||||||
local started = gettime()
|
local started = gettime()
|
||||||
|
local cancelled = false
|
||||||
local function poll_loop()
|
local function poll_loop()
|
||||||
local interval = WINDOWS and 100 or 50 -- ms
|
local interval = WINDOWS and 100 or 50 -- ms
|
||||||
while gettime() - started < timeout_sec do
|
while gettime() - started < timeout_sec do
|
||||||
if file_exists(done_file) then return true end
|
if file_exists(done_file) then return true end
|
||||||
|
local ctx = current_ctx()
|
||||||
|
if ctx and ctx.cancelled and ctx.cancelled() then
|
||||||
|
cancelled = true
|
||||||
|
return false
|
||||||
|
end
|
||||||
sleep_ms(interval)
|
sleep_ms(interval)
|
||||||
if interval < 2000 then interval = math.floor(interval * 1.5) end
|
if interval < 2000 then interval = math.floor(interval * 1.5) end
|
||||||
|
-- When cancellable, cap so we can respond to cancel quickly.
|
||||||
|
if ctx and interval > 500 then interval = 500 end
|
||||||
end
|
end
|
||||||
return false
|
return false
|
||||||
end
|
end
|
||||||
@@ -140,6 +169,7 @@ local function run(cmd, timeout_sec)
|
|||||||
remove_silent(done_file)
|
remove_silent(done_file)
|
||||||
|
|
||||||
if not completed then
|
if not completed then
|
||||||
|
if cancelled then return "(cancelled)" end
|
||||||
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
||||||
end
|
end
|
||||||
return output and output ~= "" and output or "(no output)"
|
return output and output ~= "" and output or "(no output)"
|
||||||
@@ -158,6 +188,7 @@ local function run(cmd, timeout_sec)
|
|||||||
remove_silent(done_file)
|
remove_silent(done_file)
|
||||||
|
|
||||||
if not completed then
|
if not completed then
|
||||||
|
if cancelled then return "(cancelled)" end
|
||||||
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
return output or ("Error: command timed out after " .. timeout_sec .. "s")
|
||||||
end
|
end
|
||||||
return output and output ~= "" and output or "(no output)"
|
return output and output ~= "" and output or "(no output)"
|
||||||
|
|||||||
Reference in New Issue
Block a user