Files
lmcp/lmcp.lua
T
test0r 55ead8041f v1.1.0/#11: progress + cancellation notifications
ctx augmentation:
- ctx.progress(p, total?, message?) emits notifications/progress on
  the session's notify_q. No-op when the original request omitted
  _meta.progressToken (per spec: only emit when client opted in).
  Type-checks numeric args; passes progressToken through unchanged
  (spec allows number OR string keys).
- ctx.cancelled() returns true once the client has sent a
  notifications/cancelled for this request's id.

handle_request:
- New side-effect in the id==nil branch: notifications/cancelled
  scans the module-level _ctx_by_co for an in-flight ctx whose
  request_id matches; flips self._cancelled_ids[rid_str] only when
  found. Unknown rids drop silently (no map growth).
- Pre-handler short-circuit: if cancel arrived before dispatch
  reached tools/call, skip the handler entirely.

Cross-module ctx lookup:
- Module-level weak _ctx_by_co table in lmcp.lua keyed by
  coroutine. lmcp.current_ctx() returns the ctx of the running
  coroutine. server.lua's run() lazy-requires lmcp and uses it
  to opt into auto-cancellation without depending on lmcp internals.

server.lua:run():
- After each sleep_ms cycle, check ctx.cancelled(); exit poll loop
  with cancelled=true if set.
- Poll interval capped at 500ms when a ctx is present so worst-case
  cancel latency stays ≤500ms (vs. 2s default growth).
- Returns "(cancelled)" sentinel; handler propagates normally.

_finalise_dispatch:
- Single cleanup site for both _cancelled_ids and _ctx_by_co (per
  Phase 5 review).
- When was_cancelled: emit JSON-RPC -32800 "Request cancelled"
  (deviation from Phase 4 plan; documented).

Phase 4 deviation explained: plan was silent TCP close (per spec
"SHOULD NOT respond"). Empirically: os.execute's fork+exec
inherits the parent's TCP socket FD into the spawned shell, so
sock:close() doesn't actually deliver FIN until the subshell exits
(i.e. the long-running command completes anyway). Verified
luasocket close() works on bare sockets (curl exits with RST in
511ms). The fix would be FD_CLOEXEC on accepted sockets, which
luasocket doesn't expose — needs a C shim or luaposix. Deferred.
Captured in memory project_fd_inheritance_in_run.

Practical UX with the deviation: client receives a structured
-32800 error within ~420ms of POSTing the cancel notification.

Measurements (Phase 7):
  cancel timing (3 runs, sleep 10 with cancel at 0.4s):
    run 1: t=0.42s code=-32800
    run 2: t=0.42s code=-32800
    run 3: t=0.42s code=-32800
  progress: 3/3 events arrived on SSE; spec-shaped payload
  concurrent fast+slow (#20 regression): unchanged (fast 0.01s)
  all previously-closed issues regression-test green

Zero handler source-code changes. Existing tools (shell, fetch,
web_search, hub remote_*) get cancellation for free via run().

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-17 19:29:00 +00:00

1703 lines
68 KiB
Lua

-- lmcp.lua — Lightweight MCP server in pure Lua
-- Zero external dependencies (uses built-in socket or luasocket)
-- SPDX-License-Identifier: MIT
local json = require('json')
local lmcp = {}
lmcp.__index = lmcp
-- Module-level coroutine→ctx registry (issue #11). Weak keys so
-- coroutines that die without explicit cleanup get GC'd out.
-- Each ctx table carries a `server` back-reference, so any code with
-- a coroutine handle can find both ctx and its owning lmcp instance.
local _ctx_by_co = setmetatable({}, { __mode = "k" })
-- server.lua and any other library code can call lmcp.current_ctx() to
-- access the ctx of the currently-running dispatch coroutine. Returns
-- nil outside coroutine context. Used by server.lua:run() to do
-- transparent auto-cancellation of long-running shell-out polls.
function lmcp.current_ctx()
local co = coroutine.running()
if co == nil then return nil end
return _ctx_by_co[co]
end
-- Read auth token from config file if present
local function read_conf(path)
local conf = {}
local f = io.open(path, 'r')
if not f then return conf end
for line in f:lines() do
local k, v = line:match('^%s*(%S+)%s*=%s*(.-)%s*$')
if k and not k:match('^#') then conf[k] = v end
end
f:close()
return conf
end
-- Protocol constants
local MCP_VERSION = "2025-06-18"
local JSONRPC = "2.0"
function lmcp.new(name, opts)
opts = opts or {}
local self = setmetatable({}, lmcp)
self.name = name or "lmcp"
self.version = opts.version or "0.1.0"
self.host = opts.host or "0.0.0.0"
self.port = opts.port or 8080
self.tools = {}
-- Resources primitive (MCP 2025-06-18 §Server/Resources). Storage is
-- always present; capability is advertised iff `opts.resources` is
-- truthy OR at least one resource/template has been registered by
-- initialize time. The opt-in covers servers that register resources
-- after :run() — strict clients cache the capability set from
-- initialize and won't call resources/list otherwise.
self.resources = {}
self.resource_templates = {}
self._force_resources_cap = opts.resources and true or false
-- Prompts primitive (MCP 2025-06-18 §Server/Prompts). Same capability
-- discipline as resources: advertised iff at least one is registered
-- OR opts.prompts forces it (strict clients cache the capability set
-- from initialize and won't call prompts/list otherwise).
self.prompts = {}
self._force_prompts_cap = opts.prompts and true or false
-- Completion (MCP issue #7). Keyed by "ref_type:ref_id:arg_name".
-- ref_type ∈ {"ref/prompt", "ref/resource"}, ref_id is the prompt
-- name or resource-template uriTemplate, arg_name is the parameter
-- whose value the client wants completions for.
self.completions = {}
self._force_completions_cap = opts.completions and true or false
-- Logging (MCP issue #8). RFC-5424 severity levels in ascending order.
-- Client sets minimum level via logging/setLevel; messages below are
-- dropped. Default level "warning" until the client picks one. Capability
-- is opt-in via opts.logging (servers that want a structured log channel
-- must declare it at construction; we don't presume).
self._log_level = "warning"
self._force_logging_cap = opts.logging and true or false
-- Client capabilities captured at initialize time (MCP issue #9).
-- Used to guard server-initiated requests (sampling, roots) — we don't
-- issue them unless the client claimed support during handshake.
self._client_caps = {}
self._client_info = {}
-- Roots cache (MCP issue #10), keyed by session_id. Populated when the
-- server calls `:roots(session_id, ...)`; invalidated when the client
-- sends notifications/roots/list_changed.
self._roots_cache = {}
-- Pending handler coroutines (issue #20 — concurrent dispatch).
-- Each entry: { co, conn, wake_at, finalise }. The scheduler tick
-- resumes any whose wake_at has passed and runs `finalise` on the
-- coroutine's return value to build the deferred response.
self._pending_handlers = {}
-- Cancellation flags (issue #11). Keyed by stringified JSON-RPC
-- request id. Only ever holds in-flight ids — see the
-- notifications/cancelled handler in handle_request which checks
-- for in-flight before inserting. Cleared by _finalise_dispatch.
self._cancelled_ids = {}
-- Notification queue: drained by Streamable HTTP transport (issue #16).
-- Today delivery is a no-op; we still enqueue so the emission code
-- path is exercised. Capped + deduped to keep the queue useful.
self._notify_queue = {}
self._notify_cap = 100
self._session_id = nil
-- Auth: explicit opt > conf file > LMCP_TOKEN env > nil (no auth)
if opts.auth_token then
self._auth_token = opts.auth_token
elseif opts.conf then
local conf = read_conf(opts.conf)
self._auth_token = conf['.godparticle']
else
local env_token = os.getenv("LMCP_TOKEN")
if env_token and env_token ~= "" then
self._auth_token = env_token
end
end
return self
end
-- Cursor pagination helper for list methods (MCP issue #12). Cursor is
-- an opaque base64 string per spec; we use it to encode the next offset.
-- Page size default 50 covers every plausible lmcp deployment today;
-- larger registered sets are still handled correctly.
local _PAGE_SIZE = 50
local function paginate(items, cursor)
local n = #items
-- Decode incoming cursor. Malformed → start from 0.
local offset = 0
if type(cursor) == "string" and cursor ~= "" then
local mime_ok, mime = pcall(require, "mime")
if mime_ok then
local decoded = mime.unb64(cursor) or ""
local parsed = tonumber(decoded)
if parsed and parsed >= 0 and parsed <= n then
offset = math.floor(parsed)
end
end
end
local page = {}
local stop = math.min(offset + _PAGE_SIZE, n)
for i = offset + 1, stop do page[#page + 1] = items[i] end
local next_cursor
if stop < n then
local mime_ok, mime = pcall(require, "mime")
if mime_ok then
next_cursor = (mime.b64(tostring(stop)) or ""):gsub("[\r\n]", "")
end
end
return page, next_cursor
end
-- Register a tool.
-- opts (optional, 5th arg):
-- annotations = { title?, readOnlyHint?, destructiveHint?,
-- idempotentHint?, openWorldHint? }
-- outputSchema = <JSON Schema> -- shape of structuredContent (issue #13)
-- Handler signature: function(args, ctx) where ctx = { _meta = … } from
-- the request. ctx is optional — existing 1-arg handlers keep working.
-- Handler return shapes:
-- string → single text content block (no structured)
-- { type = "...", ... } → typed content block (image/etc.); no structured
-- table without `type` → JSON-encoded into text content AND mirrored as
-- structuredContent (issue #13; spec-strict clients get first-class
-- structured access)
function lmcp:tool(name, description, params_schema, handler, opts)
self.tools[name] = {
name = name,
description = description,
inputSchema = params_schema or { type = "object", properties = {} },
handler = handler,
annotations = opts and opts.annotations or nil,
outputSchema = opts and opts.outputSchema or nil,
}
return self
end
-- Register a resource (exact URI). opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args is always a table (empty for
-- literal resources, populated with template captures for templates).
function lmcp:resource(uri, opts, handler)
opts = opts or {}
if type(uri) ~= "string" or uri == "" then
error("resource: uri required")
end
if type(handler) ~= "function" then
error("resource: handler required")
end
self.resources[uri] = {
uri = uri,
name = opts.name or uri,
description = opts.description,
mimeType = opts.mimeType,
handler = handler,
}
self:notify_resources_changed()
return self
end
-- Register a resource template (RFC 6570 subset). Each {name} captures
-- one greedy segment. opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args[name] = captured_string.
-- Limitation: adjacent captures ({a}{b}) bind ambiguously; register
-- separate resources if you need precision.
function lmcp:resource_template(uriTemplate, opts, handler)
opts = opts or {}
if type(uriTemplate) ~= "string" or uriTemplate == "" then
error("resource_template: uriTemplate required")
end
if type(handler) ~= "function" then
error("resource_template: handler required")
end
-- Compile template → Lua pattern + arg-name list.
local arg_names = {}
-- Escape every Lua-pattern magic char EXCEPT {} (handled separately).
local escaped = uriTemplate:gsub("([%%%(%)%.%+%-%*%?%[%]%^%$])", "%%%1")
local pattern = escaped:gsub("{([%w_]+)}", function(name)
arg_names[#arg_names + 1] = name
return "(.+)"
end)
pattern = "^" .. pattern .. "$"
self.resource_templates[#self.resource_templates + 1] = {
uriTemplate = uriTemplate,
name = opts.name or uriTemplate,
description = opts.description,
mimeType = opts.mimeType,
pattern = pattern,
arg_names = arg_names,
handler = handler,
}
self:notify_resources_changed()
return self
end
-- Internal: enqueue a parameterless list_changed notification, with tail
-- dedup (consecutive notifications of the same kind collapse — they carry
-- no state, so N → 1 "go refetch"). Cap is a backstop, not the policy.
-- params omitted on purpose (json.lua empty-table → [] gotcha, would be
-- malformed JSON-RPC; spec allows omitting params for parameterless).
local function _enqueue_list_changed(self, method)
local tail = self._notify_queue[#self._notify_queue]
if tail and tail.method == method then return end
if #self._notify_queue >= self._notify_cap then
table.remove(self._notify_queue, 1)
end
self._notify_queue[#self._notify_queue + 1] = {
jsonrpc = JSONRPC,
method = method,
}
end
function lmcp:notify_resources_changed()
_enqueue_list_changed(self, "notifications/resources/list_changed")
end
function lmcp:notify_prompts_changed()
_enqueue_list_changed(self, "notifications/prompts/list_changed")
end
-- Resolve a URI to (resource_or_template_entry, args). Literal match wins
-- over templates; templates tried in registration order.
local function _resolve_resource(self, uri)
local lit = self.resources[uri]
if lit then return lit, {} end
for _, t in ipairs(self.resource_templates) do
local captures = { string.match(uri, t.pattern) }
if captures[1] then
local args = {}
for i, name in ipairs(t.arg_names) do
args[name] = captures[i]
end
return t, args
end
end
return nil
end
-- Run a resource handler under pcall, normalise the return into a single
-- contents item. Returns (item_table, nil) on success, (nil, err_msg)
-- on failure.
local function _read_resource(entry, args, uri)
local ok, result = pcall(entry.handler, args)
if not ok then
return nil, "resource handler error: " .. tostring(result)
end
if result == nil then
return nil, "resource handler returned no content"
end
if type(result) == "string" then
return { uri = uri, mimeType = entry.mimeType or "text/plain", text = result }
end
if type(result) == "table" then
local mt = result.mimeType or entry.mimeType
if result.text ~= nil then
return { uri = uri, mimeType = mt or "text/plain", text = result.text }
end
if result.blob ~= nil then
return { uri = uri, mimeType = mt or "application/octet-stream",
blob = result.blob }
end
if result.blob_bytes ~= nil then
local mime_ok, mime = pcall(require, "mime")
if not mime_ok then
return nil, "mime module unavailable; pre-encode blob and return { blob = … }"
end
local b64 = mime.b64(result.blob_bytes) or ""
b64 = b64:gsub("[\r\n]", "") -- some luasocket builds line-wrap
return { uri = uri, mimeType = mt or "application/octet-stream", blob = b64 }
end
end
return nil, "resource handler returned unsupported shape"
end
-- Register a prompt (MCP issue #6). opts: { description?, arguments? }
-- where arguments is a list of { name, description?, required? }.
-- Handler signature: function(args) where args[name] = supplied string.
-- Return either:
-- string → single user text message
-- { description?, messages = {...} } → full custom shape (passthrough)
function lmcp:prompt(name, opts, handler)
opts = opts or {}
if type(name) ~= "string" or name == "" then
error("prompt: name required")
end
if type(handler) ~= "function" then
error("prompt: handler required")
end
self.prompts[name] = {
name = name,
description = opts.description,
arguments = opts.arguments, -- list of { name, description?, required? }
handler = handler,
}
self:notify_prompts_changed()
return self
end
-- Run a prompt handler under pcall, normalise the return to the spec
-- shape { description?, messages = [{ role, content = { type, text } }] }.
-- Returns (table, nil) on success, (nil, err_msg) on failure.
local function _get_prompt(entry, args)
local ok, result = pcall(entry.handler, args or {})
if not ok then
return nil, "prompt handler error: " .. tostring(result)
end
if result == nil then
return nil, "prompt handler returned no content"
end
if type(result) == "string" then
return {
description = entry.description,
messages = {{
role = "user",
content = { type = "text", text = result },
}},
}
end
if type(result) == "table" and type(result.messages) == "table" then
return result
end
return nil, "prompt handler returned unsupported shape (expected string or { messages = … })"
end
-- Register a completion handler for a prompt/resource-template argument
-- (MCP issue #7). ref_type ∈ {"ref/prompt", "ref/resource"}; ref_id is
-- the prompt name or the resource-template uriTemplate. fn signature:
-- fn(value, ctx) → list of candidate strings (server filters / sorts as
-- it likes; spec allows up to 100). ctx mirrors the spec context object
-- (currently { arguments = {...} } of previously-completed sibling args).
function lmcp:complete(ref_type, ref_id, arg_name, fn)
if ref_type ~= "ref/prompt" and ref_type ~= "ref/resource" then
error("complete: ref_type must be 'ref/prompt' or 'ref/resource'")
end
if type(ref_id) ~= "string" or ref_id == "" then
error("complete: ref_id required")
end
if type(arg_name) ~= "string" or arg_name == "" then
error("complete: arg_name required")
end
if type(fn) ~= "function" then
error("complete: fn required")
end
self.completions[ref_type .. ":" .. ref_id .. ":" .. arg_name] = fn
return self
end
-- Log severity ordering (RFC 5424). Lower index = more severe.
local LOG_LEVELS = {
emergency = 1, alert = 2, critical = 3, error = 4,
warning = 5, notice = 6, info = 7, debug = 8,
}
-- Emit a structured log record. Below the client-set level → drop.
-- Today's delivery channel is stderr; once issue #16 lands the
-- bidirectional transport, this also enqueues notifications/message
-- for the client. data is free-form (string, table, etc.).
function lmcp:log(level, logger, data)
local lvl = LOG_LEVELS[level]
local thr = LOG_LEVELS[self._log_level] or LOG_LEVELS.warning
if not lvl or lvl > thr then return end -- below threshold; drop
-- stderr fallback: human-readable. The structured form goes on the
-- notifications queue for the future Streamable-HTTP delivery path
-- (issue #16). Cap + drop-oldest like list_changed.
io.stderr:write(string.format("lmcp[%s/%s]: %s\n",
level, tostring(logger or "-"),
type(data) == "string" and data or json.encode(data)))
if #self._notify_queue >= self._notify_cap then
table.remove(self._notify_queue, 1)
end
self._notify_queue[#self._notify_queue + 1] = {
jsonrpc = JSONRPC,
method = "notifications/message",
params = { level = level, logger = logger, data = data },
}
end
-- JSON-RPC response helpers
local function jsonrpc_result(id, result)
return json.encode({ jsonrpc = JSONRPC, id = id, result = result })
end
local function jsonrpc_error(id, code, message)
return json.encode({
jsonrpc = JSONRPC,
id = id,
error = { code = code, message = message },
})
end
-- Handle a single JSON-RPC request
function lmcp:handle_request(req)
local method = req.method
local id = req.id -- nil for notifications
-- JSON-RPC 2.0: notifications (no id) MUST NOT receive a response.
-- Some notifications carry server-side side effects (cache invalidation,
-- progress signals); handle those before the early return. Anything
-- not recognised silently drops — clients expect no response either way.
if id == nil then
if method == "notifications/roots/list_changed" then
-- Invalidate cached roots for the session that sent this.
if req._session_id then self._roots_cache[req._session_id] = nil end
elseif method == "notifications/cancelled" then
-- Issue #11 — flip cancel flag for the named request id,
-- but ONLY if the request is actually in-flight. Cancels
-- for unknown/already-completed ids drop silently (per Phase
-- 5 review fix #2 — prevents unbounded map growth).
local rid = (req.params or {}).requestId
if rid ~= nil then
local rid_str = tostring(rid)
local in_flight = false
-- Scan _ctx_by_co for a matching live request.
for _, c in pairs(_ctx_by_co) do
if c.request_id ~= nil
and tostring(c.request_id) == rid_str then
in_flight = true; break
end
end
if in_flight then
self._cancelled_ids[rid_str] = true
end
end
end
-- (Other client→server notifications drop silently.)
return nil
end
if method == "initialize" then
self._session_id = self._session_id or tostring(os.time())
-- Capture client capabilities (MCP issue #9 — sampling needs to
-- know if the client supports it before issuing a request).
local p = req.params or {}
self._client_caps = p.capabilities or {}
self._client_info = p.clientInfo or {}
local caps = { tools = { listChanged = false } }
if self._force_resources_cap
or next(self.resources)
or self.resource_templates[1] then
caps.resources = { listChanged = true, subscribe = false }
end
if self._force_prompts_cap or next(self.prompts) then
caps.prompts = { listChanged = true }
end
if self._force_completions_cap or next(self.completions) then
-- Spec uses an empty object as the "supported" marker.
-- json.empty_object → {} (not [] from the empty-table gotcha).
caps.completions = json.empty_object
end
if self._force_logging_cap then
caps.logging = json.empty_object
end
return jsonrpc_result(id, {
protocolVersion = MCP_VERSION,
capabilities = caps,
serverInfo = {
name = self.name,
version = self.version,
},
})
elseif method == "ping" then
return jsonrpc_result(id, json.empty_object)
elseif method == "tools/list" then
local tool_list = {}
for _, t in pairs(self.tools) do
local entry = {
name = t.name,
description = t.description,
inputSchema = t.inputSchema,
}
-- Emit annotations / outputSchema only when registered. Empty
-- Lua tables would JSON-encode as [] (see
-- project_json_empty_table_gotcha memory) and break
-- spec-strict clients.
if t.annotations then entry.annotations = t.annotations end
if t.outputSchema then entry.outputSchema = t.outputSchema end
tool_list[#tool_list + 1] = entry
end
local page, next_cursor = paginate(tool_list, (req.params or {}).cursor)
local result = { tools = page }
if next_cursor then result.nextCursor = next_cursor end
return jsonrpc_result(id, result)
elseif method == "tools/call" then
local params = req.params or {}
local tool_name = params.name
local arguments = params.arguments or {}
local tool = self.tools[tool_name]
if not tool then
return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name))
end
-- ctx exposes the request's _meta (issue #13), the session_id
-- (issue #9 — handlers can call self:sample(ctx.session_id, …)),
-- progress() and cancelled() (issue #11), and a `server` back-ref
-- (so lmcp.current_ctx() can find the right server instance
-- without a singleton). Handlers that don't declare a second
-- parameter ignore it (Lua call discards extras).
local rid_str = tostring(id)
local ptoken = (params._meta or {}).progressToken -- nil if absent
local ctx
ctx = {
_meta = params._meta,
request_id = id,
session_id = req._session_id,
server = self,
-- progress(p, total?, message?): emits notifications/progress
-- on session's notify_q. No-op if client didn't supply a
-- progressToken. Type-checks; rejects non-numeric progress.
progress = function(p, total, message)
if ptoken == nil then return false end
if type(p) ~= "number" then return false end
if total ~= nil and type(total) ~= "number" then return false end
local sess = self._sessions[req._session_id]
if not sess then return false end
local np = { progressToken = ptoken, progress = p }
if total ~= nil then np.total = total end
if message ~= nil then np.message = tostring(message) end
sess.notify_q[#sess.notify_q + 1] = {
jsonrpc = JSONRPC, method = "notifications/progress",
params = np,
}
return true
end,
-- cancelled(): true if a notifications/cancelled for this
-- request id has been received.
cancelled = function()
return self._cancelled_ids[rid_str] == true
end,
}
-- Register on the currently-running coroutine so lmcp.current_ctx()
-- (and thus server.lua:run()'s auto-cancel) can find this ctx.
-- Pure-Lua handlers also get this registration; harmless.
local co = coroutine.running()
if co ~= nil then _ctx_by_co[co] = ctx end
-- Pre-handler cancellation short-circuit (Phase 5 review fix #9).
-- If a notifications/cancelled landed for this id before dispatch
-- reached here, skip the handler entirely. _finalise_dispatch
-- will see `not result` and suppress the response.
if self._cancelled_ids[rid_str] then
return nil
end
local ok, result = pcall(tool.handler, arguments, ctx)
if ok then
local resp = { isError = false }
local meta_out
if type(result) == "string" then
resp.content = {{ type = "text", text = result }}
elseif type(result) == "table" and result.type then
-- Typed content block (e.g. image). No structured emission.
resp.content = { result }
elseif type(result) == "table" then
-- Issue #13: extract response _meta before mirroring as
-- structuredContent, so server metadata doesn't leak into
-- the structured payload.
meta_out = result._meta
local clean = result
if meta_out ~= nil then
clean = {}
for k, v in pairs(result) do
if k ~= "_meta" then clean[k] = v end
end
end
resp.content = {{ type = "text", text = json.encode(clean) }}
resp.structuredContent = clean
else
resp.content = {{ type = "text", text = tostring(result) }}
end
if meta_out ~= nil then resp._meta = meta_out end
return jsonrpc_result(id, resp)
else
return jsonrpc_result(id, {
content = {{ type = "text", text = "Error: " .. tostring(result) }},
isError = true,
})
end
elseif method == "resources/list" then
local out = {}
for _, r in pairs(self.resources) do
out[#out + 1] = {
uri = r.uri,
name = r.name,
description = r.description,
mimeType = r.mimeType,
}
end
local page, next_cursor = paginate(out, (req.params or {}).cursor)
local result = { resources = page }
if next_cursor then result.nextCursor = next_cursor end
return jsonrpc_result(id, result)
elseif method == "resources/templates/list" then
local out = {}
for _, t in ipairs(self.resource_templates) do
out[#out + 1] = {
uriTemplate = t.uriTemplate,
name = t.name,
description = t.description,
mimeType = t.mimeType,
}
end
local page, next_cursor = paginate(out, (req.params or {}).cursor)
local result = { resourceTemplates = page }
if next_cursor then result.nextCursor = next_cursor end
return jsonrpc_result(id, result)
elseif method == "resources/read" then
local params = req.params or {}
local uri = params.uri
if type(uri) ~= "string" or uri == "" then
return jsonrpc_error(id, -32602, "uri required (string)")
end
local entry, args = _resolve_resource(self, uri)
if not entry then
-- -32002 is the MCP-conventional "Resource not found" code;
-- distinct from -32602 "Invalid params" so retry/UX logic
-- can tell a malformed URI from a missing one.
return jsonrpc_error(id, -32002, "Resource not found: " .. uri)
end
local item, err = _read_resource(entry, args, uri)
if not item then
return jsonrpc_error(id, -32603, err)
end
return jsonrpc_result(id, { contents = { item } })
elseif method == "prompts/list" then
local out = {}
for _, p in pairs(self.prompts) do
local entry = { name = p.name, description = p.description }
if p.arguments then entry.arguments = p.arguments end
out[#out + 1] = entry
end
local page, next_cursor = paginate(out, (req.params or {}).cursor)
local result = { prompts = page }
if next_cursor then result.nextCursor = next_cursor end
return jsonrpc_result(id, result)
elseif method == "prompts/get" then
local params = req.params or {}
local name = params.name
if type(name) ~= "string" or name == "" then
return jsonrpc_error(id, -32602, "name required (string)")
end
local entry = self.prompts[name]
if not entry then
return jsonrpc_error(id, -32002, "Prompt not found: " .. name)
end
local result, err = _get_prompt(entry, params.arguments)
if not result then
return jsonrpc_error(id, -32603, err)
end
return jsonrpc_result(id, result)
elseif method == "logging/setLevel" then
local lvl = (req.params or {}).level
if type(lvl) ~= "string" or not LOG_LEVELS[lvl] then
return jsonrpc_error(id, -32602,
"level must be one of: debug, info, notice, warning, error, critical, alert, emergency")
end
self._log_level = lvl
return jsonrpc_result(id, json.empty_object)
elseif method == "completion/complete" then
local params = req.params or {}
local ref = params.ref or {}
local arg = params.argument or {}
local ref_id = ref.name or ref.uri or ref.uriTemplate or ""
if type(ref.type) ~= "string" or ref_id == ""
or type(arg.name) ~= "string" then
return jsonrpc_error(id, -32602,
"ref.type, ref.name/uri/uriTemplate, and argument.name required")
end
local fn = self.completions[ref.type .. ":" .. ref_id .. ":" .. arg.name]
if not fn then
-- No completer registered → return empty values (spec-allowed;
-- clients typically render no suggestions and let the user type).
return jsonrpc_result(id, {
completion = { values = {}, hasMore = false },
})
end
local value = arg.value or ""
local ok, values = pcall(fn, value, params.context or {})
if not ok then
return jsonrpc_error(id, -32603,
"completion handler error: " .. tostring(values))
end
if type(values) ~= "table" then
return jsonrpc_error(id, -32603,
"completion handler must return a table of strings")
end
-- Spec cap: at most 100 values per response. If more, truncate
-- and set hasMore=true so the client knows there's more.
local total = #values
local has_more = false
if total > 100 then
local out = {}
for i = 1, 100 do out[i] = values[i] end
values = out
has_more = true
end
return jsonrpc_result(id, {
completion = { values = values, total = total, hasMore = has_more },
})
else
return jsonrpc_error(id, -32601, "Method not found: " .. tostring(method))
end
end
-- ---- HTTP Server (raw sockets) ----
local function parse_http_request(client)
-- Read request line
local line, err = client:receive('*l')
if not line then return nil, err end
local method, path, version = line:match('^(%S+)%s+(%S+)%s+(%S+)')
if not method then return nil, 'bad request line' end
-- Read headers
local headers = {}
while true do
line, err = client:receive('*l')
if not line or line == '' then break end
local k, v = line:match('^(%S+):%s*(.*)')
if k then headers[k:lower()] = v end
end
-- Read body
local body = ''
local content_length = tonumber(headers['content-length'] or 0)
if content_length > 0 then
body, err = client:receive(content_length)
if not body then return nil, err end
end
return {
method = method,
path = path,
version = version,
headers = headers,
body = body,
}
end
local function send_response(client, status, headers, body)
local parts = { string.format('HTTP/1.1 %s', status) }
headers['Content-Length'] = tostring(#body)
headers['Connection'] = 'close'
for k, v in pairs(headers) do
parts[#parts + 1] = k .. ': ' .. v
end
parts[#parts + 1] = ''
parts[#parts + 1] = body
client:send(table.concat(parts, '\r\n'))
end
local function send_sse_event(client, data)
client:send('event: message\r\ndata: ' .. data .. '\r\n\r\n')
end
-- ---- Streamable HTTP transport (MCP issue #16) ----
--
-- select()-based single-thread event loop. All sockets non-blocking.
-- Per-connection FSM: reading_head → reading_body → dispatching → writing | sse_open.
--
-- Session model: each session has a Mcp-Session-Id; at most one open
-- SSE stream (the GET /mcp connection). Server-initiated requests
-- (sampling, roots — issues #9/#10) ride on the SSE stream and await
-- client responses via subsequent POSTs.
--
-- Queue routing:
-- self._notify_queue (global): list_changed, log messages → fans out
-- to ALL open sse_conn (broadcast).
-- sess.notify_q (per-session): server-initiated requests → only that
-- session's sse_conn.
--
-- write_buf discipline: append-only via `..`; consume via :sub(offset+1)
-- after partial-send. NEVER reorder or rewrite past bytes.
local READ_BUF_CAP = 64 * 1024 -- 64 KiB for header section
local BODY_CAP = 8 * 1024 * 1024 -- 8 MiB for request body
local WRITE_BUF_CAP = 1 * 1024 * 1024 -- 1 MiB per-conn write buffer
local HEARTBEAT_SEC = 30
local SESSION_IDLE_SEC = 60
local SELECT_TIMEOUT = 0.1
local function _new_session_id()
return string.format("%d-%09d", os.time(), math.random(0, 999999999))
end
local function _http_status_line(status)
return "HTTP/1.1 " .. status .. "\r\n"
end
local function _http_header_block(headers)
local parts = {}
for k, v in pairs(headers) do
parts[#parts + 1] = k .. ": " .. v
end
parts[#parts + 1] = "" -- blank line
parts[#parts + 1] = "" -- trailing CRLF
return table.concat(parts, "\r\n")
end
local function _queue_write(conn, s)
-- Append-only. If cap exceeded, evict the connection.
if #conn.write_buf + #s > WRITE_BUF_CAP then
conn.state = "closing"
return false
end
conn.write_buf = conn.write_buf .. s
return true
end
local function _build_http_response(status, headers, body, session_id)
headers = headers or {}
headers["Content-Length"] = tostring(#body)
headers["Connection"] = "close"
if session_id then headers["Mcp-Session-Id"] = session_id end
return _http_status_line(status) .. _http_header_block(headers) .. body
end
local function _build_sse_headers(session_id)
local h = {
["Content-Type"] = "text/event-stream",
["Cache-Control"] = "no-cache",
["Connection"] = "keep-alive",
["Access-Control-Allow-Origin"] = "*",
}
if session_id then h["Mcp-Session-Id"] = session_id end
return _http_status_line("200 OK") .. _http_header_block(h)
end
-- Format a JSON-RPC payload as one SSE message event.
local function _sse_event(payload_str)
return "event: message\r\ndata: " .. payload_str .. "\r\n\r\n"
end
-- Format a server-initiated JSON-RPC request from the notification queue
-- entry table { jsonrpc, id?, method, params? }.
local function _encode_notify(entry)
return json.encode(entry)
end
-- ---- Per-connection FSM helpers ----
local function _parse_request_head(conn)
-- Look for \r\n\r\n. If found, parse request line + headers.
local sep = conn.buf:find("\r\n\r\n", 1, true)
if not sep then return false end -- not complete yet
local head = conn.buf:sub(1, sep - 1)
conn.buf = conn.buf:sub(sep + 4) -- preserve any body bytes already buffered
local lines = {}
for line in head:gmatch("[^\r\n]+") do lines[#lines + 1] = line end
if #lines == 0 then return nil, "empty head" end
local method, path, version = lines[1]:match("^(%S+)%s+(%S+)%s+(%S+)")
if not method then return nil, "bad request line" end
conn.method, conn.path, conn.version = method, path, version
local headers = {}
for i = 2, #lines do
local k, v = lines[i]:match("^(%S+):%s*(.*)")
if k then headers[k:lower()] = v end
end
conn.headers = headers
conn.body_remain = tonumber(headers["content-length"] or 0) or 0
if conn.body_remain > BODY_CAP then
return nil, "body too large"
end
return true
end
local function _check_auth(self, conn)
if not self._auth_token then return true end
if conn.method == "OPTIONS" then return true end
local auth = conn.headers["authorization"] or ""
local token = auth:match("^Bearer%s+(.+)$")
return token == self._auth_token
end
-- ---- Session lookup / create ----
-- Resolve session by id. Returns the session table on success, or
-- (nil, "unknown") if `sid` is non-nil but no such session exists
-- (spec: 400/404 — caller decides). With nil `sid` (sessionless POST,
-- backwards compat), auto-issues a fresh session.
local function _resolve_session(self, sid)
if sid then
local sess = self._sessions[sid]
if not sess then return nil, "unknown" end
sess.last_activity = os.time()
return sess
end
local new_id = _new_session_id()
self._sessions[new_id] = {
id = new_id,
sse_conn = nil,
pending = {}, -- req_id → on_response
notify_q = {}, -- per-session, server-initiated requests
created = os.time(),
last_activity = os.time(),
}
return self._sessions[new_id]
end
-- For `initialize` specifically, always mint a new session id regardless
-- of any client-provided header (the spec lets the server choose).
local function _create_session(self)
local new_id = _new_session_id()
self._sessions[new_id] = {
id = new_id,
sse_conn = nil,
pending = {},
notify_q = {},
created = os.time(),
last_activity = os.time(),
}
return self._sessions[new_id]
end
-- Scan all sessions for a pending server-initiated request matching id.
-- Returns (session, callback) or nil.
local function _find_pending(self, req_id)
for _, sess in pairs(self._sessions) do
local cb = sess.pending[req_id]
if cb then return sess, cb end
end
return nil
end
-- ---- Dispatch a fully-parsed POST body ----
-- Forward declarations: used by _dispatch_post, defined below.
local _drive_handler_co
local _finalise_dispatch
local function _dispatch_post(self, conn)
local body = conn.body
if body == "" then
return _build_http_response("400 Bad Request",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
jsonrpc_error(nil, -32700, "Empty body"), nil)
end
local ok, rpc_req = pcall(json.decode, body)
if not ok then
return _build_http_response("400 Bad Request",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
jsonrpc_error(nil, -32700, "Parse error"), nil)
end
-- Server-initiated response routing: if id matches a pending
-- server-initiated request in ANY session, this POST is a response.
if rpc_req.id and (rpc_req.result ~= nil or rpc_req.error ~= nil)
and not rpc_req.method then
local sess, cb = _find_pending(self, rpc_req.id)
if sess then
sess.pending[rpc_req.id] = nil
pcall(cb, rpc_req)
return _build_http_response("202 Accepted",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
"", sess.id)
end
end
-- Session resolution (deferred from header-parse time so we can detect
-- `initialize`). Rules:
-- - `initialize`: always mint a fresh session, ignoring any client sid
-- - other methods, sid absent: auto-issue (backwards compat)
-- - other methods, sid known: use it
-- - other methods, sid unknown: 404
local sess
if rpc_req.method == "initialize" then
sess = _create_session(self)
else
local s, serr = _resolve_session(self, conn.requested_sid)
if not s then
return _build_http_response("404 Not Found",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Session not found: " .. tostring(conn.requested_sid), nil)
end
sess = s
end
conn.session_id = sess.id
-- Stash session id on the request so handle_request → tools/call can
-- expose it to handler ctx (issue #9 — sampling needs to know which
-- session to push the request onto).
rpc_req._session_id = sess.id
-- Stash the JSON-RPC id on the conn so _finalise_dispatch can clear
-- the cancellation flag for this request after building the response
-- (issue #11). Notifications have nil id; that's fine — the
-- nil-guard in _finalise_dispatch keeps tostring(nil) out of the
-- cancel map.
conn.dispatch_id = rpc_req.id
-- Concurrent handler dispatch (issue #20). Wrap the dispatch call in
-- a coroutine so any tool handler that goes through server.lua:run()
-- (which yields when polling its sentinel file) can return control to
-- the event loop while it waits. Other connections continue making
-- progress.
--
-- The coroutine resumes itself synchronously the first time. If it
-- completes without yielding (pure-Lua handlers, ping, etc.) the
-- response is built inline as before. If it yields, we park it in
-- self._pending_handlers and return nil — the conn enters
-- dispatching_async, the scheduler tick resumes when wake_at passes.
local co = coroutine.create(function()
return self:handle_request(rpc_req)
end)
return _drive_handler_co(self, conn, co)
end
-- Resume a handler coroutine until it completes or yields. On completion,
-- build the deferred HTTP response (preserving the Accept-aware shape).
-- On yield, register in self._pending_handlers and return nil — the conn
-- is parked in dispatching_async until the scheduler resumes it.
_drive_handler_co = function(self, conn, co)
local rok, ryield = coroutine.resume(co)
if coroutine.status(co) == "dead" then
return _finalise_dispatch(self, conn, rok, ryield, co)
end
-- Suspended. Parse the yield payload.
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
self._pending_handlers[#self._pending_handlers + 1] = {
co = co, conn = conn, wake_at = wake_at,
}
conn.state = "dispatching_async"
return nil -- no write_buf change; conn parks
end
-- Build the HTTP response for a completed dispatch. `rok` is the coroutine.resume
-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when
-- rok=true; an error message when rok=false). Used by both the sync path
-- (_dispatch_post tail) and the async resume path (_scheduler_tick).
-- Also: clears cancellation flag and ctx-by-co registry entry for this
-- request (issue #11 — single cleanup site per Phase 5 review fix #7).
_finalise_dispatch = function(self, conn, rok, result, co)
local session_id = conn.session_id
-- Cleanup (always): drop the coroutine's ctx entry and any
-- cancellation flag for this request id.
if co ~= nil then _ctx_by_co[co] = nil end
local rid = conn.dispatch_id
local was_cancelled = false
if rid ~= nil then
local rid_str = tostring(rid)
if self._cancelled_ids[rid_str] then
was_cancelled = true
self._cancelled_ids[rid_str] = nil
end
end
-- Issue #11: cancelled requests get a -32800 JSON-RPC error response.
-- The MCP spec wording is "SHOULD NOT respond" (not MUST NOT). A silent
-- TCP-close would be cleaner but the spawned shell subprocess in
-- server.lua:run() inherits the socket FD via fork(), so the kernel
-- keeps the connection alive until that shell exits (i.e. the
-- underlying long-running command completes anyway). The error
-- response gives the client a structured signal and exits curl
-- immediately, which is the practical UX they want. JSON-RPC 2.0
-- code -32800 is the convention for "Request cancelled."
if was_cancelled then
return _build_http_response("200 OK",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
jsonrpc_error(rid, -32800, "Request cancelled"),
session_id)
end
if not rok then
-- Internal dispatch error — surface as a JSON-RPC error response.
return _build_http_response("500 Internal Server Error",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
jsonrpc_error(nil, -32603, "Internal error: " .. tostring(result)),
session_id)
end
if not result then
-- Notification → 202 Accepted, no body.
return _build_http_response("202 Accepted",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
"", session_id)
end
-- Accept-aware response shape (re-checked at finalise time; survives
-- parking because conn.headers is captured by the closure scope).
local accept = conn.headers["accept"] or ""
if accept:find("text/event%-stream") then
local hdrs = _build_sse_headers(session_id)
return hdrs .. _sse_event(result)
end
return _build_http_response("200 OK",
{ ["Content-Type"] = "application/json",
["Access-Control-Allow-Origin"] = "*" },
result, session_id)
end
local function _dispatch_options(conn)
local acrh = conn.headers["access-control-request-headers"]
return _build_http_response("204 No Content", {
["Access-Control-Allow-Origin"] = "*",
["Access-Control-Allow-Methods"] = "GET, POST, DELETE, OPTIONS",
-- '*' does NOT cover Authorization per CORS spec; list explicitly.
["Access-Control-Allow-Headers"] = acrh and (acrh .. ", Authorization")
or "Content-Type, Accept, Authorization, Mcp-Session-Id, Mcp-Protocol-Version",
["Access-Control-Max-Age"] = "86400",
}, "", conn.session_id)
end
local function _dispatch_delete(self, conn)
if not conn.session_id or not self._sessions[conn.session_id] then
return _build_http_response("404 Not Found",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Session not found", nil)
end
local sess = self._sessions[conn.session_id]
if sess.sse_conn then
sess.sse_conn.state = "closing"
sess.sse_conn = nil
end
self._sessions[conn.session_id] = nil
return _build_http_response("204 No Content",
{ ["Access-Control-Allow-Origin"] = "*" }, "", nil)
end
-- ---- Main loop helpers ----
local function _conn_read(self, conn)
local chunk, err, partial = conn.sock:receive(8192)
local data = chunk or partial or ""
if data ~= "" then
if #conn.buf + #data > READ_BUF_CAP and conn.state == "reading_head" then
-- Header section too large.
conn.write_buf = _build_http_response("431 Request Header Fields Too Large",
{ ["Content-Type"] = "text/plain" }, "Headers too large", nil)
conn.state = "writing"
return
end
conn.buf = conn.buf .. data
end
if err == "closed" then
conn.state = "closing"
return
end
-- Advance FSM.
if conn.state == "reading_head" then
local ok, perr = _parse_request_head(conn)
if perr then
conn.write_buf = _build_http_response("400 Bad Request",
{ ["Content-Type"] = "text/plain" }, tostring(perr), nil)
conn.state = "writing"
return
end
if not ok then return end -- still waiting for full head
-- Headers parsed: auth check, session resolve.
if not _check_auth(self, conn) then
conn.write_buf = _build_http_response("401 Unauthorized",
{ ["Content-Type"] = "application/json",
["WWW-Authenticate"] = "Bearer" },
'{"error":"unauthorized"}', nil)
conn.state = "writing"
return
end
-- Session resolution happens at dispatch time (after body is read),
-- because `initialize` always mints a fresh session and we need to
-- know the method to distinguish 404 (unknown id, non-initialize)
-- from "auto-issue on initialize". Just stash the requested id.
conn.requested_sid = conn.headers["mcp-session-id"]
if conn.body_remain > 0 then
-- Any body bytes already in conn.buf land here.
conn.body = conn.buf:sub(1, conn.body_remain)
conn.buf = conn.buf:sub(#conn.body + 1)
conn.body_remain = conn.body_remain - #conn.body
conn.state = (conn.body_remain == 0) and "dispatching" or "reading_body"
else
conn.state = "dispatching"
end
end
if conn.state == "reading_body" then
if #conn.buf > 0 then
local take = math.min(conn.body_remain, #conn.buf)
conn.body = conn.body .. conn.buf:sub(1, take)
conn.buf = conn.buf:sub(take + 1)
conn.body_remain = conn.body_remain - take
end
if conn.body_remain == 0 then conn.state = "dispatching" end
end
if conn.state == "dispatching" then
local path = conn.path or ""
if not path:match("^/mcp") then
conn.write_buf = _build_http_response("404 Not Found",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Not Found", nil)
conn.state = "writing"
return
end
if conn.method == "OPTIONS" then
conn.write_buf = _dispatch_options(conn)
conn.state = "writing"
elseif conn.method == "DELETE" then
-- Resolve session: no auto-issue for DELETE; unknown sid → 404.
if not conn.requested_sid then
conn.write_buf = _build_http_response("400 Bad Request",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Mcp-Session-Id required for DELETE", nil)
conn.state = "writing"
return
end
conn.session_id = conn.requested_sid
conn.write_buf = _dispatch_delete(self, conn)
conn.state = "writing"
elseif conn.method == "GET" then
-- Resolve session (auto-issue if missing, 404 if unknown).
local sess, serr = _resolve_session(self, conn.requested_sid)
if not sess then
conn.write_buf = _build_http_response("404 Not Found",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Session not found: " .. tostring(conn.requested_sid), nil)
conn.state = "writing"
return
end
conn.session_id = sess.id
-- Persistent SSE stream. Enforce one per session.
if sess.sse_conn and sess.sse_conn ~= conn then
conn.write_buf = _build_http_response("409 Conflict",
{ ["Content-Type"] = "text/plain",
["Access-Control-Allow-Origin"] = "*" },
"Session already has an open SSE stream", nil)
conn.state = "writing"
return
end
sess.sse_conn = conn
local hdrs = _build_sse_headers(conn.session_id)
-- For backwards compat with the old SDK probe shape, emit a one-shot
-- 'endpoint' event so clients can self-discover; modern clients
-- ignore it and just await message events.
local endpoint_data = json.encode({
endpoint = "/mcp", sessionId = conn.session_id,
})
conn.write_buf = hdrs ..
"event: endpoint\r\ndata: " .. endpoint_data .. "\r\n\r\n"
conn.state = "sse_open"
conn.last_heart = os.time()
elseif conn.method == "POST" then
-- _dispatch_post may return nil (issue #20) if the handler
-- coroutine yielded. In that case it set conn.state =
-- "dispatching_async" itself and parked the coroutine.
local resp = _dispatch_post(self, conn)
if resp then
conn.write_buf = resp
-- _finalise_dispatch sets conn.state = "closing" for
-- cancelled requests (issue #11); only override if not.
if conn.state ~= "closing" then
conn.state = "writing"
end
end
-- else: conn already parked; scheduler tick will finalise.
else
conn.write_buf = _build_http_response("405 Method Not Allowed",
{ ["Content-Type"] = "text/plain",
["Allow"] = "GET, POST, DELETE, OPTIONS" },
"Method Not Allowed", nil)
conn.state = "writing"
end
end
end
local function _conn_write(conn)
if conn.write_buf == "" then
if conn.state == "writing" then conn.state = "closing" end
return
end
local sent, err, sent_partial = conn.sock:send(conn.write_buf)
if err == "closed" then
conn.state = "closing"
return
end
-- luasocket: on success returns last_byte_index_sent; on partial/timeout
-- returns (nil, "timeout"|"closed", last_byte_index_sent_so_far). The
-- second-return numeric is an absolute index into the original string.
local idx = sent or sent_partial or 0
if idx > 0 then
conn.write_buf = conn.write_buf:sub(idx + 1)
end
if conn.write_buf == "" and conn.state == "writing" then
conn.state = "closing"
end
end
local function _drain_notifications(self)
-- Global broadcast queue: fan out to every open sse_conn.
while #self._notify_queue > 0 do
local entry = table.remove(self._notify_queue, 1)
local payload = _sse_event(_encode_notify(entry))
for _, sess in pairs(self._sessions) do
if sess.sse_conn and sess.sse_conn.state == "sse_open" then
_queue_write(sess.sse_conn, payload)
end
end
end
-- Per-session queues: route to that session only.
for _, sess in pairs(self._sessions) do
while #sess.notify_q > 0 do
if not (sess.sse_conn and sess.sse_conn.state == "sse_open") then
break -- no live SSE; leave queued (or expire policy could drop)
end
local entry = table.remove(sess.notify_q, 1)
_queue_write(sess.sse_conn, _sse_event(_encode_notify(entry)))
end
end
end
local function _heartbeat_tick(self)
local now = os.time()
-- Heartbeats on open SSE conns.
for _, sess in pairs(self._sessions) do
local conn = sess.sse_conn
if conn and conn.state == "sse_open"
and now - conn.last_heart >= HEARTBEAT_SEC then
_queue_write(conn, ": heartbeat\r\n\r\n")
conn.last_heart = now
end
end
-- Idle session expiry: no SSE + no activity for SESSION_IDLE_SEC.
for sid, sess in pairs(self._sessions) do
if sess.sse_conn == nil and now - sess.last_activity > SESSION_IDLE_SEC then
self._sessions[sid] = nil
end
end
end
-- Issue #20 — scheduler tick. Resume any parked dispatch coroutine whose
-- wake_at has passed. On completion, build the deferred response and
-- queue it for write. If the connection died while the handler was
-- parked, drop the coroutine.
--
-- gettime() is wall-clock (luasocket uses gettimeofday) — NOT monotonic.
-- A large NTP step backwards could delay resumes; forwards could bunch
-- them. Acceptable for the deployment fleet (chrony slews); revisit if
-- a use case appears that needs CLOCK_MONOTONIC.
local function _scheduler_tick(self)
if not self._pending_handlers[1] then return end
local socket = require("socket")
local now = socket.gettime()
local i = 1
while i <= #self._pending_handlers do
local p = self._pending_handlers[i]
if p.conn.state == "closing" then
-- Connection died mid-handler; drop the coroutine entirely
-- and free its ctx entry (issue #11 cleanup discipline).
_ctx_by_co[p.co] = nil
if p.conn.dispatch_id ~= nil then
self._cancelled_ids[tostring(p.conn.dispatch_id)] = nil
end
table.remove(self._pending_handlers, i)
elseif now >= p.wake_at then
-- Time to resume. Remove from pending BEFORE resume so a
-- re-yielding handler re-adds itself cleanly via _drive_handler_co.
table.remove(self._pending_handlers, i)
local rok, ryield = coroutine.resume(p.co)
if coroutine.status(p.co) == "dead" then
local resp = _finalise_dispatch(self, p.conn, rok, ryield, p.co)
p.conn.write_buf = (p.conn.write_buf or "") .. resp
-- _finalise_dispatch may set conn.state = "closing" for
-- cancelled requests; only transition to writing if it
-- didn't already pick the closing path.
if p.conn.state ~= "closing" then
p.conn.state = "writing"
end
else
-- Yielded again — re-park.
local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
self._pending_handlers[#self._pending_handlers + 1] = {
co = p.co, conn = p.conn, wake_at = wake_at,
}
end
else
i = i + 1
end
end
end
-- Returns the earliest pending wake_at as an offset from now, or nil if
-- no handlers are parked. Used to tighten the select() timeout so the
-- scheduler wakes on the right beat.
local function _next_pending_delay(self)
if not self._pending_handlers[1] then return nil end
local socket = require("socket")
local now = socket.gettime()
local earliest = math.huge
for _, p in ipairs(self._pending_handlers) do
if p.wake_at < earliest then earliest = p.wake_at end
end
local d = earliest - now
if d < 0 then return 0 end
return d
end
-- ---- Public: server-initiated request (for sampling/roots/etc.) ----
-- Enqueues a JSON-RPC request on the session's SSE stream. The callback
-- fires when the client POSTs back the response (matched by id).
function lmcp:server_request(session_id, method, params, on_response)
local sess = self._sessions[session_id]
if not sess or not sess.sse_conn then
return false, "no live SSE stream for session " .. tostring(session_id)
end
self._server_req_id = (self._server_req_id or 0) + 1
local id = "srv-" .. self._server_req_id
sess.pending[id] = on_response
local msg = { jsonrpc = JSONRPC, id = id, method = method }
-- Omit params if nil OR an empty Lua table (would JSON-encode as []
-- per project_json_empty_table_gotcha memory). Real params with at
-- least one key encode correctly as an object.
if params ~= nil and (type(params) ~= "table" or next(params)) then
msg.params = params
end
sess.notify_q[#sess.notify_q + 1] = msg
return true, id
end
-- Sampling (MCP issue #9): ask the client's LLM to generate text. Returns
-- (true, request_id) if dispatched, (false, err) otherwise. `on_response`
-- is called with the client's JSON-RPC response shape:
-- { result = { role, content = { type = "text", text = "..." }, model, stopReason? } }
-- or { error = { code, message } }.
--
-- Today this is fire-and-forget — tool handlers cannot block waiting for
-- the response in the single-threaded event loop (see follow-up #20). A
-- tool may kick off sampling and return immediately; the callback fires
-- when the client posts the response back.
--
-- opts shape (matches MCP spec):
-- messages = { { role = "user"|"assistant", content = {type, text} }, ... }
-- modelPreferences? = { hints?, intelligencePriority?, ... }
-- systemPrompt? = string
-- includeContext? = "none"|"thisServer"|"allServers"
-- temperature? = number
-- maxTokens = integer (required)
-- stopSequences? = list of strings
function lmcp:sample(session_id, opts, on_response)
if not (self._client_caps.sampling) then
return false, "client did not advertise sampling capability"
end
if type(opts) ~= "table" or type(opts.messages) ~= "table"
or type(opts.maxTokens) ~= "number" then
return false, "sample: opts.messages (table) and opts.maxTokens (number) required"
end
return self:server_request(session_id, "sampling/createMessage", opts, on_response)
end
-- Roots (MCP issue #10): ask the client which filesystem/URL roots are
-- in scope for this session. Async like sample(); on_fetched(roots_list,
-- err) fires when the client responds. Result is also cached on
-- self._roots_cache[session_id] for later sync lookups.
--
-- Client→server `notifications/roots/list_changed` invalidates the cache;
-- next call to :roots() re-fetches.
function lmcp:roots(session_id, on_fetched)
if not (self._client_caps.roots) then
return false, "client did not advertise roots capability"
end
return self:server_request(session_id, "roots/list", {}, function(resp)
if resp.error then
if on_fetched then on_fetched(nil, resp.error.message or "rpc error") end
return
end
local list = resp.result and resp.result.roots or {}
self._roots_cache[session_id] = {
roots = list, fetched = os.time(),
}
if on_fetched then on_fetched(list, nil) end
end)
end
-- Synchronous lookup of the cached roots. Returns the list (possibly
-- empty) if previously fetched, or nil if no :roots() call has completed
-- for this session yet.
function lmcp:roots_cached(session_id)
local entry = self._roots_cache[session_id]
return entry and entry.roots or nil
end
-- Synchronous helper: is `path` (a file:// URI or absolute filesystem
-- path) within any cached root for this session? Returns:
-- true → matches at least one root
-- false → cache is populated but path is outside all roots
-- nil → no roots cached yet; caller should :roots() first
function lmcp:path_in_roots(session_id, path)
local roots = self:roots_cached(session_id)
if not roots then return nil end
-- Normalise: treat file:// URIs and bare paths uniformly.
local norm = path:gsub("^file://", "")
for _, r in ipairs(roots) do
local root_path = (r.uri or ""):gsub("^file://", "")
if root_path ~= "" and norm:sub(1, #root_path) == root_path then
return true
end
end
return false
end
function lmcp:run()
local socket = require("socket")
local server_sock = assert(socket.bind(self.host, self.port))
server_sock:settimeout(0)
self._conns = {}
self._sessions = self._sessions or {}
local addr, port = server_sock:getsockname()
io.stderr:write(string.format("lmcp: %s v%s listening on %s:%d/mcp\n",
self.name, self.version, addr, port))
while true do
-- Build select watch lists.
local reads, writes = { server_sock }, {}
for sock, conn in pairs(self._conns) do
if conn.state == "reading_head" or conn.state == "reading_body"
or conn.state == "sse_open" then
reads[#reads + 1] = sock
end
if conn.write_buf ~= "" then
writes[#writes + 1] = sock
end
end
-- Tighten select timeout if a parked handler is due sooner.
-- Otherwise a 100ms tick adds 100ms latency to short shell-tool runs.
local select_timeout = SELECT_TIMEOUT
local next_pend = _next_pending_delay(self)
if next_pend and next_pend < select_timeout then
select_timeout = next_pend
end
local ready_r, ready_w = socket.select(reads, writes, select_timeout)
for _, sock in ipairs(ready_r or {}) do
if sock == server_sock then
local new_sock, aerr = server_sock:accept()
if new_sock then
new_sock:settimeout(0)
self._conns[new_sock] = {
sock = new_sock, state = "reading_head",
buf = "", body = "", headers = {},
method = nil, path = nil,
body_remain = 0, write_buf = "",
session_id = nil, last_heart = os.time(),
}
elseif aerr and aerr ~= "timeout" then
io.stderr:write("lmcp: accept error: " .. tostring(aerr) .. "\n")
end
else
local conn = self._conns[sock]
if conn then
local ok, rerr = pcall(_conn_read, self, conn)
if not ok then
io.stderr:write("lmcp: read error: " .. tostring(rerr) .. "\n")
conn.state = "closing"
end
end
end
end
for _, sock in ipairs(ready_w or {}) do
local conn = self._conns[sock]
if conn then
local ok, werr = pcall(_conn_write, conn)
if not ok then
io.stderr:write("lmcp: write error: " .. tostring(werr) .. "\n")
conn.state = "closing"
end
end
end
-- Per-tick maintenance.
_drain_notifications(self)
_heartbeat_tick(self)
_scheduler_tick(self) -- issue #20: resume due dispatch coroutines
-- After draining, attempt immediate writes on conns whose write_buf
-- just got bytes (so list_changed / heartbeat / async-completed
-- responses appear within one tick).
for sock, conn in pairs(self._conns) do
if conn.write_buf ~= "" and conn.state ~= "closing" then
pcall(_conn_write, conn)
end
end
-- Sweep closing conns.
for sock, conn in pairs(self._conns) do
if conn.state == "closing" then
-- Detach from session if it was the sse_conn.
if conn.session_id then
local sess = self._sessions[conn.session_id]
if sess and sess.sse_conn == conn then
sess.sse_conn = nil
sess.last_activity = os.time()
end
end
pcall(sock.close, sock)
self._conns[sock] = nil
end
end
end
end
-- ---- stdio transport (MCP issue #15) ----
-- Line-delimited JSON-RPC: one message per line on stdin, one response
-- line per request on stdout, diagnostics on stderr. EOF closes cleanly.
-- Does NOT require luasocket — handle_request is transport-agnostic.
-- Bearer auth bypassed: stdio means the parent process is the trust
-- boundary.
function lmcp:run_stdio()
-- Default stdout buffering on a pipe is "full" — a response would
-- sit in the buffer until it fills, deadlocking the MCP client.
-- Set "no" once + per-write flush belt-and-braces.
io.stdout:setvbuf("no")
io.stderr:write(string.format(
"lmcp: %s v%s serving stdio\n", self.name, self.version))
for line in io.stdin:lines() do
if line ~= "" then
-- pcall the whole body so a transient error (malformed JSON,
-- handler bug, exotic pipe state) doesn't crash the loop.
local body_ok, body_err = pcall(function()
local parse_ok, req = pcall(json.decode, line)
local response
if not parse_ok then
response = jsonrpc_error(nil, -32700, "Parse error")
else
response = self:handle_request(req)
end
if type(response) == "string" then
io.stdout:write(response, "\n")
io.stdout:flush()
elseif response ~= nil then
io.stderr:write(
"lmcp: handler returned non-string ("
.. type(response) .. "); dropped\n")
end
end)
if not body_ok then
io.stderr:write("lmcp: stdio loop error: "
.. tostring(body_err) .. "\n")
end
end
end
end
return lmcp