From 55ead8041fbc9ed987b5a564df61ef3fb60b22c2 Mon Sep 17 00:00:00 2001 From: Markus Fritsche Date: Sun, 17 May 2026 19:29:00 +0000 Subject: [PATCH] v1.1.0/#11: progress + cancellation notifications MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ctx augmentation: - ctx.progress(p, total?, message?) emits notifications/progress on the session's notify_q. No-op when the original request omitted _meta.progressToken (per spec: only emit when client opted in). Type-checks numeric args; passes progressToken through unchanged (spec allows number OR string keys). - ctx.cancelled() returns true once the client has sent a notifications/cancelled for this request's id. handle_request: - New side-effect in the id==nil branch: notifications/cancelled scans the module-level _ctx_by_co for an in-flight ctx whose request_id matches; flips self._cancelled_ids[rid_str] only when found. Unknown rids drop silently (no map growth). - Pre-handler short-circuit: if cancel arrived before dispatch reached tools/call, skip the handler entirely. Cross-module ctx lookup: - Module-level weak _ctx_by_co table in lmcp.lua keyed by coroutine. lmcp.current_ctx() returns the ctx of the running coroutine. server.lua's run() lazy-requires lmcp and uses it to opt into auto-cancellation without depending on lmcp internals. server.lua:run(): - After each sleep_ms cycle, check ctx.cancelled(); exit poll loop with cancelled=true if set. - Poll interval capped at 500ms when a ctx is present so worst-case cancel latency stays ≤500ms (vs. 2s default growth). - Returns "(cancelled)" sentinel; handler propagates normally. _finalise_dispatch: - Single cleanup site for both _cancelled_ids and _ctx_by_co (per Phase 5 review). - When was_cancelled: emit JSON-RPC -32800 "Request cancelled" (deviation from Phase 4 plan; documented). Phase 4 deviation explained: plan was silent TCP close (per spec "SHOULD NOT respond"). Empirically: os.execute's fork+exec inherits the parent's TCP socket FD into the spawned shell, so sock:close() doesn't actually deliver FIN until the subshell exits (i.e. the long-running command completes anyway). Verified luasocket close() works on bare sockets (curl exits with RST in 511ms). The fix would be FD_CLOEXEC on accepted sockets, which luasocket doesn't expose — needs a C shim or luaposix. Deferred. Captured in memory project_fd_inheritance_in_run. Practical UX with the deviation: client receives a structured -32800 error within ~420ms of POSTing the cancel notification. Measurements (Phase 7): cancel timing (3 runs, sleep 10 with cancel at 0.4s): run 1: t=0.42s code=-32800 run 2: t=0.42s code=-32800 run 3: t=0.42s code=-32800 progress: 3/3 events arrived on SSE; spec-shaped payload concurrent fast+slow (#20 regression): unchanged (fast 0.01s) all previously-closed issues regression-test green Zero handler source-code changes. Existing tools (shell, fetch, web_search, hub remote_*) get cancellation for free via run(). Co-Authored-By: Claude Opus 4.7 (1M context) --- lmcp.lua | 164 ++++++++++++++++++++++++++++++++++++++++++++++++----- server.lua | 31 ++++++++++ 2 files changed, 181 insertions(+), 14 deletions(-) diff --git a/lmcp.lua b/lmcp.lua index 90a31e5..2a8ac73 100644 --- a/lmcp.lua +++ b/lmcp.lua @@ -7,6 +7,22 @@ local json = require('json') local lmcp = {} lmcp.__index = lmcp +-- Module-level coroutine→ctx registry (issue #11). Weak keys so +-- coroutines that die without explicit cleanup get GC'd out. +-- Each ctx table carries a `server` back-reference, so any code with +-- a coroutine handle can find both ctx and its owning lmcp instance. +local _ctx_by_co = setmetatable({}, { __mode = "k" }) + +-- server.lua and any other library code can call lmcp.current_ctx() to +-- access the ctx of the currently-running dispatch coroutine. Returns +-- nil outside coroutine context. Used by server.lua:run() to do +-- transparent auto-cancellation of long-running shell-out polls. +function lmcp.current_ctx() + local co = coroutine.running() + if co == nil then return nil end + return _ctx_by_co[co] +end + -- Read auth token from config file if present local function read_conf(path) local conf = {} @@ -74,6 +90,11 @@ function lmcp.new(name, opts) -- resumes any whose wake_at has passed and runs `finalise` on the -- coroutine's return value to build the deferred response. self._pending_handlers = {} + -- Cancellation flags (issue #11). Keyed by stringified JSON-RPC + -- request id. Only ever holds in-flight ids — see the + -- notifications/cancelled handler in handle_request which checks + -- for in-flight before inserting. Cleared by _finalise_dispatch. + self._cancelled_ids = {} -- Notification queue: drained by Streamable HTTP transport (issue #16). -- Today delivery is a no-op; we still enqueue so the emission code -- path is exercised. Capped + deduped to keep the queue useful. @@ -418,9 +439,28 @@ function lmcp:handle_request(req) if method == "notifications/roots/list_changed" then -- Invalidate cached roots for the session that sent this. if req._session_id then self._roots_cache[req._session_id] = nil end + elseif method == "notifications/cancelled" then + -- Issue #11 — flip cancel flag for the named request id, + -- but ONLY if the request is actually in-flight. Cancels + -- for unknown/already-completed ids drop silently (per Phase + -- 5 review fix #2 — prevents unbounded map growth). + local rid = (req.params or {}).requestId + if rid ~= nil then + local rid_str = tostring(rid) + local in_flight = false + -- Scan _ctx_by_co for a matching live request. + for _, c in pairs(_ctx_by_co) do + if c.request_id ~= nil + and tostring(c.request_id) == rid_str then + in_flight = true; break + end + end + if in_flight then + self._cancelled_ids[rid_str] = true + end + end end - -- (Other client→server notifications: cancelled, message — no - -- action today; add side-effects here as needed.) + -- (Other client→server notifications drop silently.) return nil end @@ -489,15 +529,59 @@ function lmcp:handle_request(req) if not tool then return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name)) end - -- ctx exposes the request's _meta (issue #13) and the session_id - -- (issue #9 — so handlers can call self:sample(ctx.session_id, …)). - -- Handlers that don't declare a second parameter ignore it (Lua - -- call discards extras). - local ctx = { + -- ctx exposes the request's _meta (issue #13), the session_id + -- (issue #9 — handlers can call self:sample(ctx.session_id, …)), + -- progress() and cancelled() (issue #11), and a `server` back-ref + -- (so lmcp.current_ctx() can find the right server instance + -- without a singleton). Handlers that don't declare a second + -- parameter ignore it (Lua call discards extras). + local rid_str = tostring(id) + local ptoken = (params._meta or {}).progressToken -- nil if absent + local ctx + ctx = { _meta = params._meta, request_id = id, session_id = req._session_id, + server = self, + -- progress(p, total?, message?): emits notifications/progress + -- on session's notify_q. No-op if client didn't supply a + -- progressToken. Type-checks; rejects non-numeric progress. + progress = function(p, total, message) + if ptoken == nil then return false end + if type(p) ~= "number" then return false end + if total ~= nil and type(total) ~= "number" then return false end + local sess = self._sessions[req._session_id] + if not sess then return false end + local np = { progressToken = ptoken, progress = p } + if total ~= nil then np.total = total end + if message ~= nil then np.message = tostring(message) end + sess.notify_q[#sess.notify_q + 1] = { + jsonrpc = JSONRPC, method = "notifications/progress", + params = np, + } + return true + end, + -- cancelled(): true if a notifications/cancelled for this + -- request id has been received. + cancelled = function() + return self._cancelled_ids[rid_str] == true + end, } + + -- Register on the currently-running coroutine so lmcp.current_ctx() + -- (and thus server.lua:run()'s auto-cancel) can find this ctx. + -- Pure-Lua handlers also get this registration; harmless. + local co = coroutine.running() + if co ~= nil then _ctx_by_co[co] = ctx end + + -- Pre-handler cancellation short-circuit (Phase 5 review fix #9). + -- If a notifications/cancelled landed for this id before dispatch + -- reached here, skip the handler entirely. _finalise_dispatch + -- will see `not result` and suppress the response. + if self._cancelled_ids[rid_str] then + return nil + end + local ok, result = pcall(tool.handler, arguments, ctx) if ok then local resp = { isError = false } @@ -951,6 +1035,12 @@ local function _dispatch_post(self, conn) -- expose it to handler ctx (issue #9 — sampling needs to know which -- session to push the request onto). rpc_req._session_id = sess.id + -- Stash the JSON-RPC id on the conn so _finalise_dispatch can clear + -- the cancellation flag for this request after building the response + -- (issue #11). Notifications have nil id; that's fine — the + -- nil-guard in _finalise_dispatch keeps tostring(nil) out of the + -- cancel map. + conn.dispatch_id = rpc_req.id -- Concurrent handler dispatch (issue #20). Wrap the dispatch call in -- a coroutine so any tool handler that goes through server.lua:run() @@ -976,7 +1066,7 @@ end _drive_handler_co = function(self, conn, co) local rok, ryield = coroutine.resume(co) if coroutine.status(co) == "dead" then - return _finalise_dispatch(conn, rok, ryield) + return _finalise_dispatch(self, conn, rok, ryield, co) end -- Suspended. Parse the yield payload. local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0 @@ -991,8 +1081,40 @@ end -- success flag; `result` is the handler/dispatch return (a JSON-RPC string when -- rok=true; an error message when rok=false). Used by both the sync path -- (_dispatch_post tail) and the async resume path (_scheduler_tick). -_finalise_dispatch = function(conn, rok, result) +-- Also: clears cancellation flag and ctx-by-co registry entry for this +-- request (issue #11 — single cleanup site per Phase 5 review fix #7). +_finalise_dispatch = function(self, conn, rok, result, co) local session_id = conn.session_id + + -- Cleanup (always): drop the coroutine's ctx entry and any + -- cancellation flag for this request id. + if co ~= nil then _ctx_by_co[co] = nil end + local rid = conn.dispatch_id + local was_cancelled = false + if rid ~= nil then + local rid_str = tostring(rid) + if self._cancelled_ids[rid_str] then + was_cancelled = true + self._cancelled_ids[rid_str] = nil + end + end + -- Issue #11: cancelled requests get a -32800 JSON-RPC error response. + -- The MCP spec wording is "SHOULD NOT respond" (not MUST NOT). A silent + -- TCP-close would be cleaner but the spawned shell subprocess in + -- server.lua:run() inherits the socket FD via fork(), so the kernel + -- keeps the connection alive until that shell exits (i.e. the + -- underlying long-running command completes anyway). The error + -- response gives the client a structured signal and exits curl + -- immediately, which is the practical UX they want. JSON-RPC 2.0 + -- code -32800 is the convention for "Request cancelled." + if was_cancelled then + return _build_http_response("200 OK", + { ["Content-Type"] = "application/json", + ["Access-Control-Allow-Origin"] = "*" }, + jsonrpc_error(rid, -32800, "Request cancelled"), + session_id) + end + if not rok then -- Internal dispatch error — surface as a JSON-RPC error response. return _build_http_response("500 Internal Server Error", @@ -1180,7 +1302,11 @@ local function _conn_read(self, conn) local resp = _dispatch_post(self, conn) if resp then conn.write_buf = resp - conn.state = "writing" + -- _finalise_dispatch sets conn.state = "closing" for + -- cancelled requests (issue #11); only override if not. + if conn.state ~= "closing" then + conn.state = "writing" + end end -- else: conn already parked; scheduler tick will finalise. else @@ -1274,7 +1400,12 @@ local function _scheduler_tick(self) while i <= #self._pending_handlers do local p = self._pending_handlers[i] if p.conn.state == "closing" then - -- Connection died mid-handler; drop the coroutine entirely. + -- Connection died mid-handler; drop the coroutine entirely + -- and free its ctx entry (issue #11 cleanup discipline). + _ctx_by_co[p.co] = nil + if p.conn.dispatch_id ~= nil then + self._cancelled_ids[tostring(p.conn.dispatch_id)] = nil + end table.remove(self._pending_handlers, i) elseif now >= p.wake_at then -- Time to resume. Remove from pending BEFORE resume so a @@ -1282,9 +1413,14 @@ local function _scheduler_tick(self) table.remove(self._pending_handlers, i) local rok, ryield = coroutine.resume(p.co) if coroutine.status(p.co) == "dead" then - p.conn.write_buf = (p.conn.write_buf or "") - .. _finalise_dispatch(p.conn, rok, ryield) - p.conn.state = "writing" + local resp = _finalise_dispatch(self, p.conn, rok, ryield, p.co) + p.conn.write_buf = (p.conn.write_buf or "") .. resp + -- _finalise_dispatch may set conn.state = "closing" for + -- cancelled requests; only transition to writing if it + -- didn't already pick the closing path. + if p.conn.state ~= "closing" then + p.conn.state = "writing" + end else -- Yielded again — re-park. local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0 diff --git a/server.lua b/server.lua index f4e64ef..531b2e7 100644 --- a/server.lua +++ b/server.lua @@ -44,6 +44,22 @@ local function gettime() return _socket.gettime() end +-- Lazy access to the lmcp module for cross-module ctx lookup (issue #11). +-- server.lua doesn't statically require lmcp (it's an example/runtime +-- server, not the library); but lmcp must already be loaded when we run. +-- Defensive: if the lookup fails for any reason, current_ctx returns nil +-- and run() falls back to non-cancellable behaviour. +local _lmcp_mod = nil +local function current_ctx() + if _lmcp_mod == false then return nil end + if _lmcp_mod == nil then + local ok, mod = pcall(require, "lmcp") + _lmcp_mod = ok and mod or false + if _lmcp_mod == false then return nil end + end + return _lmcp_mod.current_ctx and _lmcp_mod.current_ctx() or nil +end + -- in_coroutine() — true if we're running inside an lmcp dispatch -- coroutine (issue #20). Handles both Lua 5.4 (coroutine.running → -- (co, isMain)) and LuaJIT 5.1 (coroutine.running → nil on main). @@ -111,13 +127,26 @@ local function run(cmd, timeout_sec) -- may delay our resume by more than `interval`, so an accumulator -- diverges from real elapsed. gettime() comparison stays honest in -- both busy-poll and yield-resume modes. + -- + -- Auto-cancellation (issue #11): if a ctx is available on the + -- running coroutine AND it has been cancelled, exit the polling + -- loop early. The interval is capped at 500ms when a ctx is + -- present so worst-case cancel latency is ~0.5s, not ~2s. local started = gettime() + local cancelled = false local function poll_loop() local interval = WINDOWS and 100 or 50 -- ms while gettime() - started < timeout_sec do if file_exists(done_file) then return true end + local ctx = current_ctx() + if ctx and ctx.cancelled and ctx.cancelled() then + cancelled = true + return false + end sleep_ms(interval) if interval < 2000 then interval = math.floor(interval * 1.5) end + -- When cancellable, cap so we can respond to cancel quickly. + if ctx and interval > 500 then interval = 500 end end return false end @@ -140,6 +169,7 @@ local function run(cmd, timeout_sec) remove_silent(done_file) if not completed then + if cancelled then return "(cancelled)" end return output or ("Error: command timed out after " .. timeout_sec .. "s") end return output and output ~= "" and output or "(no output)" @@ -158,6 +188,7 @@ local function run(cmd, timeout_sec) remove_silent(done_file) if not completed then + if cancelled then return "(cancelled)" end return output or ("Error: command timed out after " .. timeout_sec .. "s") end return output and output ~= "" and output or "(no output)"