-- lmcp.lua — Lightweight MCP server in pure Lua
-- Zero external dependencies (uses built-in socket or luasocket)
-- SPDX-License-Identifier: MIT

local json = require('json')

local lmcp = {}
lmcp.__index = lmcp

-- Module-level coroutine→ctx registry (issue #11). Weak keys so
-- coroutines that die without explicit cleanup get GC'd out.
-- Each ctx table carries a `server` back-reference, so any code with
-- a coroutine handle can find both ctx and its owning lmcp instance.
local _ctx_by_co = setmetatable({}, { __mode = "k" })

-- server.lua and any other library code can call lmcp.current_ctx() to
-- access the ctx of the currently-running dispatch coroutine. Returns
-- nil outside coroutine context. Used by server.lua:run() to do
-- transparent auto-cancellation of long-running shell-out polls.
function lmcp.current_ctx()
    local co = coroutine.running()
    if co == nil then return nil end
    return _ctx_by_co[co]
end

-- Read `key = value` pairs from a config file.
-- Returns a table of string keys to string values; an unreadable or
-- missing file yields an empty table. Lines whose key starts with `#`
-- are treated as comments.
local function read_conf(path)
    local conf = {}
    local f = io.open(path, 'r')
    if not f then return conf end
    for line in f:lines() do
        -- The key stops at the FIRST `=`. The previous greedy `%S+` key
        -- swallowed everything up to the last `=` when no spaces were
        -- used, so `token=abc==` parsed as key `token=abc=`, value ``.
        local k, v = line:match('^%s*([^%s=]+)%s*=%s*(.-)%s*$')
        if k and not k:match('^#') then
            conf[k] = v
        end
    end
    f:close()
    return conf
end

-- Protocol constants
local MCP_VERSION = "2025-06-18"
local JSONRPC = "2.0"

-- Construct a server instance.
--   name: server name reported in initialize (default "lmcp")
--   opts: optional table — version, host, port, resources, prompts,
--         completions, logging, auth_token, conf
-- Returns the new instance (metatable = lmcp).
function lmcp.new(name, opts)
    opts = opts or {}
    local self = setmetatable({}, lmcp)
    self.name = name or "lmcp"
    self.version = opts.version or "0.1.0"
    self.host = opts.host or "0.0.0.0"
    self.port = opts.port or 8080
    self.tools = {}
    -- Resources primitive (MCP 2025-06-18 §Server/Resources). Storage is
    -- always present; capability is advertised iff `opts.resources` is
    -- truthy OR at least one resource/template has been registered by
    -- initialize time. The opt-in covers servers that register resources
    -- after :run() — strict clients cache the capability set from
    -- initialize and won't call resources/list otherwise.
    self.resources = {}
    self.resource_templates = {}
    self._force_resources_cap = opts.resources and true or false
    -- Prompts primitive (MCP 2025-06-18 §Server/Prompts). Same capability
    -- discipline as resources: advertised iff at least one is registered
    -- OR opts.prompts forces it (strict clients cache the capability set
    -- from initialize and won't call prompts/list otherwise).
    self.prompts = {}
    self._force_prompts_cap = opts.prompts and true or false
    -- Completion (MCP issue #7). Keyed by "ref_type:ref_id:arg_name".
    -- ref_type ∈ {"ref/prompt", "ref/resource"}, ref_id is the prompt
    -- name or resource-template uriTemplate, arg_name is the parameter
    -- whose value the client wants completions for.
    self.completions = {}
    self._force_completions_cap = opts.completions and true or false
    -- Logging (MCP issue #8). RFC-5424 severity levels in ascending order.
    -- Client sets minimum level via logging/setLevel; messages below are
    -- dropped. Default level "warning" until the client picks one. Capability
    -- is opt-in via opts.logging (servers that want a structured log channel
    -- must declare it at construction; we don't presume).
    self._log_level = "warning"
    self._force_logging_cap = opts.logging and true or false
    -- Client capabilities captured at initialize time (MCP issue #9).
    -- Used to guard server-initiated requests (sampling, roots) — we don't
    -- issue them unless the client claimed support during handshake.
    self._client_caps = {}
    self._client_info = {}
    -- Roots cache (MCP issue #10), keyed by session_id. Populated when the
    -- server calls `:roots(session_id, ...)`; invalidated when the client
    -- sends notifications/roots/list_changed.
    self._roots_cache = {}
    -- Pending handler coroutines (issue #20 — concurrent dispatch).
    -- Each entry: { co, conn, wake_at, finalise }. The scheduler tick
    -- resumes any whose wake_at has passed and runs `finalise` on the
    -- coroutine's return value to build the deferred response.
    self._pending_handlers = {}
    -- Cancellation flags (issue #11). Keyed by stringified JSON-RPC
    -- request id. Only ever holds in-flight ids — see the
    -- notifications/cancelled handler in handle_request which checks
    -- for in-flight before inserting. Cleared by _finalise_dispatch.
    self._cancelled_ids = {}
    -- Notification queue: drained by Streamable HTTP transport (issue #16).
    -- Today delivery is a no-op; we still enqueue so the emission code
    -- path is exercised. Capped + deduped to keep the queue useful.
    self._notify_queue = {}
    self._notify_cap = 100
    self._session_id = nil

    -- Auth: explicit opt > conf file > LMCP_TOKEN env > nil (no auth).
    -- Each source is consulted only when the previous one produced no
    -- value at all, so a conf file that lacks the key now falls through
    -- to LMCP_TOKEN (the old `elseif` chain skipped the env entirely once
    -- opts.conf was set). An EMPTY conf value is kept as-is rather than
    -- falling through, so this change can never turn auth off.
    local token = opts.auth_token or nil
    if token == nil and opts.conf then
        token = read_conf(opts.conf)['.godparticle']
    end
    if token == nil then
        local env_token = os.getenv("LMCP_TOKEN")
        if env_token and env_token ~= "" then
            token = env_token
        end
    end
    self._auth_token = token

    return self
end

-- Cursor pagination helper for list methods (MCP issue #12). Cursor is
-- an opaque string per spec; we use it to encode the next offset.
-- Page size default 50 covers every plausible lmcp deployment today;
-- larger registered sets are still handled correctly.
local _PAGE_SIZE = 50

-- Load luasocket's `mime` module if it is installed; nil otherwise.
local function _load_mime()
    local ok, mime = pcall(require, "mime")
    if ok and type(mime) == "table" then return mime end
    return nil
end

-- Slice `items` (a sequence) into one page.
--   cursor: opaque string from a previous call, or nil/"" for page one.
--           Malformed or out-of-range cursors restart from offset 0.
-- Returns (page, next_cursor); next_cursor is nil on the last page.
-- Cursors are base64 of the decimal offset when `mime` is available.
-- Without `mime` they are the plain decimal offset — previously that
-- case dropped the cursor entirely and silently truncated every list
-- to the first page.
local function paginate(items, cursor)
    local n = #items

    local offset = 0
    if type(cursor) == "string" and cursor ~= "" then
        local mime = _load_mime()
        local decoded = cursor
        if mime then
            decoded = mime.unb64(cursor) or ""
        end
        local parsed = tonumber(decoded)
        if parsed and parsed >= 0 and parsed <= n then
            offset = math.floor(parsed)
        end
    end

    local page = {}
    local stop = math.min(offset + _PAGE_SIZE, n)
    for i = offset + 1, stop do
        page[#page + 1] = items[i]
    end

    local next_cursor
    if stop < n then
        local raw = tostring(stop)
        local mime = _load_mime()
        if mime then
            -- Some luasocket builds line-wrap; strip CR/LF. The outer
            -- parentheses drop gsub's second return value.
            next_cursor = ((mime.b64(raw) or ""):gsub("[\r\n]", ""))
        else
            next_cursor = raw
        end
    end
    return page, next_cursor
end

-- Register a tool.
-- opts (optional, 5th arg):
--   annotations  = { title?, readOnlyHint?, destructiveHint?,
--                    idempotentHint?, openWorldHint? }
--   outputSchema = schema table describing the shape of
--                  structuredContent (issue #13)
-- Handler signature: function(args, ctx) where ctx = { _meta = … } from
-- the request. ctx is optional — existing 1-arg handlers keep working.
-- Handler return shapes:
--   string                → single text content block (no structured)
--   { type = "...", ... } → typed content block (image/etc.); no structured
--   table without `type`  → JSON-encoded into text content AND mirrored as
--                           structuredContent (issue #13; spec-strict
--                           clients get first-class structured access)
-- Returns self so registrations can be chained.
function lmcp:tool(name, description, params_schema, handler, opts)
    -- Normalise empty `properties` → absent. JSON Schema allows
    -- omitting `properties` on a `type: "object"` schema (means "any
    -- object, no constraints"). Without this, an empty Lua properties
    -- table goes through json.lua's is_array → emitted as `[]` →
    -- spec-strict clients (Zod et al.) reject with
    -- `expected: record, received: array`. The same gotcha already
    -- bit `ping` in v1.0.0-rc1 (project_json_empty_table_gotcha
    -- memory). v1.1.1 fix.
    -- Returns a shallow clone without the key, so the caller's table is
    -- never mutated (they may re-use it across registrations).
    local function without_empty_properties(schema)
        if type(schema) ~= "table" then return schema end
        if type(schema.properties) ~= "table" or next(schema.properties) ~= nil then
            return schema
        end
        local clean = {}
        for k, v in pairs(schema) do
            if k ~= "properties" then clean[k] = v end
        end
        return clean
    end

    -- A wholly empty table would itself encode as `[]`; treat it as
    -- "not supplied" so tools/list omits the field.
    local function nil_if_empty(t)
        if type(t) == "table" and next(t) == nil then return nil end
        return t
    end

    local schema = without_empty_properties(params_schema or { type = "object" })

    -- The same `{}` → `[]` hazard applies to the optional fields, which
    -- were previously stored verbatim.
    local annotations = nil_if_empty(opts and opts.annotations or nil)
    local output_schema = nil_if_empty(opts and opts.outputSchema or nil)
    output_schema = without_empty_properties(output_schema)

    self.tools[name] = {
        name = name,
        description = description,
        inputSchema = schema,
        handler = handler,
        annotations = annotations,
        outputSchema = output_schema,
    }
    return self
end

-- Register a resource (exact URI). opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args is always a table (empty for
-- literal resources, populated with template captures for templates).
-- Raises if `uri` is not a non-empty string or `handler` is not a
-- function. Re-registering a URI replaces the previous entry.
-- Returns self so registrations can be chained.
function lmcp:resource(uri, opts, handler)
    opts = opts or {}
    if type(uri) ~= "string" or uri == "" then
        error("resource: uri required")
    end
    if type(handler) ~= "function" then
        error("resource: handler required")
    end
    self.resources[uri] = {
        uri = uri,
        name = opts.name or uri,
        description = opts.description,
        mimeType = opts.mimeType,
        handler = handler,
    }
    self:notify_resources_changed()
    return self
end

-- Register a resource template (RFC 6570 subset). Each {name} is
-- compiled to a greedy `(.+)` capture.
-- opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args[name] = captured_string.
-- Limitation: adjacent captures ({a}{b}) bind ambiguously; register
-- separate resources if you need precision.
-- Raises if `uriTemplate` is not a non-empty string or `handler` is not
-- a function. Returns self so registrations can be chained.
function lmcp:resource_template(uriTemplate, opts, handler)
    opts = opts or {}
    if type(uriTemplate) ~= "string" or uriTemplate == "" then
        error("resource_template: uriTemplate required")
    end
    if type(handler) ~= "function" then
        error("resource_template: handler required")
    end

    -- Compile template → Lua pattern + arg-name list.
    local arg_names = {}
    -- Escape every Lua-pattern magic char EXCEPT {} (handled separately).
    local escaped = uriTemplate:gsub("([%%%(%)%.%+%-%*%?%[%]%^%$])", "%%%1")
    local pattern = escaped:gsub("{([%w_]+)}", function(name)
        arg_names[#arg_names + 1] = name
        return "(.+)"
    end)
    pattern = "^" .. pattern .. "$"

    local entry = {
        uriTemplate = uriTemplate,
        name = opts.name or uriTemplate,
        description = opts.description,
        mimeType = opts.mimeType,
        pattern = pattern,
        arg_names = arg_names,
        handler = handler,
    }

    -- Re-registering an existing uriTemplate replaces it in place (keeping
    -- its position in the match order), like every other registry here.
    -- Appending instead left a dead entry: the list is matched in
    -- registration order so the new handler was never reached, and the
    -- template appeared twice in resources/templates/list.
    local templates = self.resource_templates
    local slot = #templates + 1
    for i = 1, #templates do
        if templates[i].uriTemplate == uriTemplate then
            slot = i
            break
        end
    end
    templates[slot] = entry

    self:notify_resources_changed()
    return self
end

-- Internal: enqueue a parameterless list_changed notification, with tail
-- dedup (consecutive notifications of the same kind collapse — they carry
-- no state, so N → 1 "go refetch").
-- Cap is a backstop, not the policy.
-- params omitted on purpose (json.lua empty-table → [] gotcha, would be
-- malformed JSON-RPC; spec allows omitting params for parameterless).
local function _enqueue_list_changed(self, method)
    local queue = self._notify_queue
    local last = queue[#queue]
    -- Tail dedup: the most recent entry is already this notification.
    if last and last.method == method then
        return
    end
    -- Drop the oldest entry once the cap is reached.
    if #queue >= self._notify_cap then
        table.remove(queue, 1)
    end
    queue[#queue + 1] = { jsonrpc = JSONRPC, method = method }
end

-- Queue a notifications/resources/list_changed for connected clients.
function lmcp:notify_resources_changed()
    _enqueue_list_changed(self, "notifications/resources/list_changed")
end

-- Queue a notifications/prompts/list_changed for connected clients.
function lmcp:notify_prompts_changed()
    _enqueue_list_changed(self, "notifications/prompts/list_changed")
end

-- Map a URI to (entry, args). An exactly-registered resource takes
-- priority and gets an empty args table; otherwise templates are tried
-- in registration order and args holds the named captures. Returns nil
-- when nothing matches.
local function _resolve_resource(self, uri)
    local literal = self.resources[uri]
    if literal then
        return literal, {}
    end
    for _, tmpl in ipairs(self.resource_templates) do
        local caps = { string.match(uri, tmpl.pattern) }
        if caps[1] then
            local names = tmpl.arg_names
            local args = {}
            for idx = 1, #names do
                args[names[idx]] = caps[idx]
            end
            return tmpl, args
        end
    end
    return nil
end

-- Invoke a resource handler under pcall and normalise whatever it
-- returns into one `contents` item. Accepted handler results:
--   string                  → text item
--   { text = ... }          → text item
--   { blob = ... }          → blob item (already base64)
--   { blob_bytes = ... }    → blob item, base64-encoded via `mime`
-- A `mimeType` on the result overrides the one on the entry.
-- Returns the item on success, or (nil, err_msg) on failure.
local function _read_resource(entry, args, uri)
    local ok, result = pcall(entry.handler, args)
    if not ok then
        return nil, "resource handler error: " .. tostring(result)
    end
    if result == nil then
        return nil, "resource handler returned no content"
    end

    local kind = type(result)
    if kind == "string" then
        return {
            uri = uri,
            mimeType = entry.mimeType or "text/plain",
            text = result,
        }
    end
    if kind ~= "table" then
        return nil, "resource handler returned unsupported shape"
    end

    local mime_type = result.mimeType or entry.mimeType
    if result.text ~= nil then
        return {
            uri = uri,
            mimeType = mime_type or "text/plain",
            text = result.text,
        }
    end
    if result.blob ~= nil then
        return {
            uri = uri,
            mimeType = mime_type or "application/octet-stream",
            blob = result.blob,
        }
    end
    if result.blob_bytes == nil then
        return nil, "resource handler returned unsupported shape"
    end

    local have_mime, mime = pcall(require, "mime")
    if not have_mime then
        return nil, "mime module unavailable; pre-encode blob and return { blob = … }"
    end
    local encoded = mime.b64(result.blob_bytes) or ""
    encoded = encoded:gsub("[\r\n]", "")  -- some luasocket builds line-wrap
    return {
        uri = uri,
        mimeType = mime_type or "application/octet-stream",
        blob = encoded,
    }
end

-- Register a prompt (MCP issue #6). opts: { description?, arguments? }
-- where arguments is a list of { name, description?, required? }.
-- Handler signature: function(args) where args[name] = supplied string.
-- Return either:
--   string                             → single user text message
--   { description?, messages = {...} } → full custom shape (passthrough)
-- Raises on a missing name or handler. Returns self for chaining.
function lmcp:prompt(name, opts, handler)
    opts = opts or {}
    local name_ok = type(name) == "string" and name ~= ""
    if not name_ok then
        error("prompt: name required")
    end
    if type(handler) ~= "function" then
        error("prompt: handler required")
    end

    local entry = { name = name, handler = handler }
    entry.description = opts.description
    entry.arguments = opts.arguments  -- list of { name, description?, required? }
    self.prompts[name] = entry

    self:notify_prompts_changed()
    return self
end

-- Run a prompt handler under pcall, normalise the return to the spec
-- shape { description?, messages = [{ role, content = { type, text } }] }.
-- Returns (table, nil) on success, (nil, err_msg) on failure.
local function _get_prompt(entry, args)
    local ok, result = pcall(entry.handler, args or {})
    if not ok then
        return nil, "prompt handler error: " .. tostring(result)
    end
    if result == nil then
        return nil, "prompt handler returned no content"
    end

    local kind = type(result)
    if kind == "table" and type(result.messages) == "table" then
        -- Custom shape: handed back untouched.
        return result
    end
    if kind ~= "string" then
        return nil, "prompt handler returned unsupported shape (expected string or { messages = … })"
    end

    -- Bare string: wrap as a single user text message.
    local message = {
        role = "user",
        content = { type = "text", text = result },
    }
    return { description = entry.description, messages = { message } }
end

-- Register a completion handler for a prompt/resource-template argument
-- (MCP issue #7). ref_type ∈ {"ref/prompt", "ref/resource"}; ref_id is
-- the prompt name or the resource-template uriTemplate. fn signature:
-- fn(value, ctx) → list of candidate strings (server filters / sorts as
-- it likes; spec allows up to 100). ctx mirrors the spec context object
-- (currently { arguments = {...} } of previously-completed sibling args).
-- Raises on any invalid argument. Returns self for chaining.
function lmcp:complete(ref_type, ref_id, arg_name, fn)
    local known_ref = ref_type == "ref/prompt" or ref_type == "ref/resource"
    if not known_ref then
        error("complete: ref_type must be 'ref/prompt' or 'ref/resource'")
    end
    if not (type(ref_id) == "string" and ref_id ~= "") then
        error("complete: ref_id required")
    end
    if not (type(arg_name) == "string" and arg_name ~= "") then
        error("complete: arg_name required")
    end
    if type(fn) ~= "function" then
        error("complete: fn required")
    end

    local key = table.concat({ ref_type, ref_id, arg_name }, ":")
    self.completions[key] = fn
    return self
end

-- Log severity ordering (RFC 5424). Lower index = more severe.
local LOG_LEVELS = {}
for rank, level_name in ipairs({
    "emergency", "alert", "critical", "error",
    "warning", "notice", "info", "debug",
}) do
    LOG_LEVELS[level_name] = rank
end

-- Emit a structured log record. Below the client-set level → drop.
-- Today's delivery channel is stderr; once issue #16 lands the
-- bidirectional transport, this also enqueues notifications/message
-- for the client. data is free-form (string, table, etc.).
function lmcp:log(level, logger, data)
    local lvl = LOG_LEVELS[level]
    local thr = LOG_LEVELS[self._log_level] or LOG_LEVELS.warning
    if not lvl or lvl > thr then return end  -- below threshold; drop

    -- stderr fallback: human-readable. The structured form goes on the
    -- notifications queue for the future Streamable-HTTP delivery path
    -- (issue #16). Cap + drop-oldest like list_changed.
    local text
    if type(data) == "string" then
        text = data
    else
        -- A payload json.encode cannot serialise must not make log()
        -- raise into the caller. Degrade it to its tostring() form for
        -- both stderr and the queued notification.
        local ok, encoded = pcall(json.encode, data)
        if ok then
            text = tostring(encoded)
        else
            text = tostring(data)
            data = text
        end
    end
    io.stderr:write(string.format("lmcp[%s/%s]: %s\n",
        level, tostring(logger or "-"), text))

    if #self._notify_queue >= self._notify_cap then
        table.remove(self._notify_queue, 1)
    end
    self._notify_queue[#self._notify_queue + 1] = {
        jsonrpc = JSONRPC,
        method = "notifications/message",
        params = { level = level, logger = logger, data = data },
    }
end

-- JSON-RPC response helpers

-- Encode a success response envelope for request `id`.
local function jsonrpc_result(id, result)
    return json.encode({ jsonrpc = JSONRPC, id = id, result = result })
end

-- Encode an error response envelope for request `id`.
local function jsonrpc_error(id, code, message)
    return json.encode({
        jsonrpc = JSONRPC,
        id = id,
        error = { code = code, message = message },
    })
end

-- Handle a single JSON-RPC request.
--   req: decoded request table ({ method, id?, params?, _session_id? }).
-- Returns the encoded JSON response string, or nil when no response is
-- due (notifications, or a tools/call already cancelled before dispatch).
function lmcp:handle_request(req)
    local method = req.method
    local id = req.id  -- nil for notifications

    -- JSON-RPC 2.0: notifications (no id) MUST NOT receive a response.
    -- Some notifications carry server-side side effects (cache invalidation,
    -- progress signals); handle those before the early return. Anything
    -- not recognised silently drops — clients expect no response either way.
    if id == nil then
        if method == "notifications/roots/list_changed" then
            -- Invalidate cached roots for the session that sent this.
            if req._session_id then
                self._roots_cache[req._session_id] = nil
            end
        elseif method == "notifications/cancelled" then
            -- Issue #11 — flip cancel flag for the named request id,
            -- but ONLY if the request is actually in-flight. Cancels
            -- for unknown/already-completed ids drop silently (per Phase
            -- 5 review fix #2 — prevents unbounded map growth).
            local rid = (req.params or {}).requestId
            if rid ~= nil then
                local rid_str = tostring(rid)
                local in_flight = false
                -- Scan _ctx_by_co for a matching live request. The
                -- registry is module-level (shared by every lmcp
                -- instance), so only ctxs owned by THIS server count;
                -- otherwise another instance's in-flight id would leave
                -- a flag here that nothing ever clears.
                for _, c in pairs(_ctx_by_co) do
                    if c.server == self
                        and c.request_id ~= nil
                        and tostring(c.request_id) == rid_str then
                        in_flight = true
                        break
                    end
                end
                if in_flight then
                    self._cancelled_ids[rid_str] = true
                end
            end
        end
        -- (Other client→server notifications drop silently.)
        return nil
    end

    if method == "initialize" then
        self._session_id = self._session_id or tostring(os.time())
        -- Capture client capabilities (MCP issue #9 — sampling needs to
        -- know if the client supports it before issuing a request).
        local p = req.params or {}
        self._client_caps = p.capabilities or {}
        self._client_info = p.clientInfo or {}
        local caps = { tools = { listChanged = false } }
        if self._force_resources_cap
            or next(self.resources)
            or self.resource_templates[1] then
            caps.resources = { listChanged = true, subscribe = false }
        end
        if self._force_prompts_cap or next(self.prompts) then
            caps.prompts = { listChanged = true }
        end
        if self._force_completions_cap or next(self.completions) then
            -- Spec uses an empty object as the "supported" marker.
            -- json.empty_object → {} (not [] from the empty-table gotcha).
            caps.completions = json.empty_object
        end
        if self._force_logging_cap then
            caps.logging = json.empty_object
        end
        return jsonrpc_result(id, {
            protocolVersion = MCP_VERSION,
            capabilities = caps,
            serverInfo = {
                name = self.name,
                version = self.version,
            },
        })

    elseif method == "ping" then
        return jsonrpc_result(id, json.empty_object)

    elseif method == "tools/list" then
        local tool_list = {}
        for _, t in pairs(self.tools) do
            local entry = {
                name = t.name,
                description = t.description,
                inputSchema = t.inputSchema,
            }
            -- Emit annotations / outputSchema only when registered. Empty
            -- Lua tables would JSON-encode as [] (see
            -- project_json_empty_table_gotcha memory) and break
            -- spec-strict clients.
            if t.annotations then entry.annotations = t.annotations end
            if t.outputSchema then entry.outputSchema = t.outputSchema end
            tool_list[#tool_list + 1] = entry
        end
        -- pairs() order is unspecified and can change when the table is
        -- modified, so an offset cursor over it could skip or repeat
        -- entries between pages. Sort for a stable order.
        table.sort(tool_list, function(a, b)
            return tostring(a.name) < tostring(b.name)
        end)
        local page, next_cursor = paginate(tool_list, (req.params or {}).cursor)
        local result = { tools = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "tools/call" then
        local params = req.params or {}
        local tool_name = params.name
        local arguments = params.arguments or {}
        local tool = self.tools[tool_name]
        if not tool then
            return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name))
        end
        -- ctx exposes the request's _meta (issue #13), the session_id
        -- (issue #9 — handlers can call self:sample(ctx.session_id, …)),
        -- progress() and cancelled() (issue #11), and a `server` back-ref
        -- (so lmcp.current_ctx() can find the right server instance
        -- without a singleton). Handlers that don't declare a second
        -- parameter ignore it (Lua call discards extras).
        local rid_str = tostring(id)
        local ptoken = (params._meta or {}).progressToken  -- nil if absent
        local ctx = {
            _meta = params._meta,
            request_id = id,
            session_id = req._session_id,
            server = self,
            -- progress(p, total?, message?): emits notifications/progress
            -- on session's notify_q. No-op if client didn't supply a
            -- progressToken. Type-checks; rejects non-numeric progress.
            -- Returns true when queued, false otherwise.
            progress = function(p, total, message)
                if ptoken == nil then return false end
                if type(p) ~= "number" then return false end
                if total ~= nil and type(total) ~= "number" then return false end
                -- self._sessions is not created by lmcp.new; when
                -- handle_request runs before any transport has set it
                -- up, progress is a no-op rather than an index-nil error.
                local sessions = self._sessions
                local sess = sessions and sessions[req._session_id]
                if not sess then return false end
                local np = { progressToken = ptoken, progress = p }
                if total ~= nil then np.total = total end
                if message ~= nil then np.message = tostring(message) end
                sess.notify_q[#sess.notify_q + 1] = {
                    jsonrpc = JSONRPC,
                    method = "notifications/progress",
                    params = np,
                }
                return true
            end,
            -- cancelled(): true if a notifications/cancelled for this
            -- request id has been received.
            cancelled = function()
                return self._cancelled_ids[rid_str] == true
            end,
        }
        -- Register on the currently-running coroutine so lmcp.current_ctx()
        -- (and thus server.lua:run()'s auto-cancel) can find this ctx.
        -- Pure-Lua handlers also get this registration; harmless.
        local co = coroutine.running()
        if co ~= nil then _ctx_by_co[co] = ctx end

        -- Pre-handler cancellation short-circuit (Phase 5 review fix #9).
        -- If a notifications/cancelled landed for this id before dispatch
        -- reached here, skip the handler entirely. _finalise_dispatch
        -- will see `not result` and suppress the response.
        if self._cancelled_ids[rid_str] then
            return nil
        end

        local ok, result = pcall(tool.handler, arguments, ctx)
        if ok then
            local resp = { isError = false }
            local meta_out
            if type(result) == "string" then
                resp.content = {{ type = "text", text = result }}
            elseif type(result) == "table" and result.type then
                -- Typed content block (e.g. image). No structured emission.
                resp.content = { result }
            elseif type(result) == "table" then
                -- Issue #13: extract response _meta before mirroring as
                -- structuredContent, so server metadata doesn't leak into
                -- the structured payload.
                meta_out = result._meta
                local clean = result
                if meta_out ~= nil then
                    clean = {}
                    for k, v in pairs(result) do
                        if k ~= "_meta" then clean[k] = v end
                    end
                end
                resp.content = {{ type = "text", text = json.encode(clean) }}
                resp.structuredContent = clean
            else
                resp.content = {{ type = "text", text = tostring(result) }}
            end
            if meta_out ~= nil then resp._meta = meta_out end
            return jsonrpc_result(id, resp)
        else
            return jsonrpc_result(id, {
                content = {{ type = "text", text = "Error: " .. tostring(result) }},
                isError = true,
            })
        end

    elseif method == "resources/list" then
        local out = {}
        for _, r in pairs(self.resources) do
            out[#out + 1] = {
                uri = r.uri,
                name = r.name,
                description = r.description,
                mimeType = r.mimeType,
            }
        end
        -- Stable order for offset-cursor pagination (pairs() order is
        -- unspecified). URIs are validated as strings at registration.
        table.sort(out, function(a, b) return a.uri < b.uri end)
        local page, next_cursor = paginate(out, (req.params or {}).cursor)
        local result = { resources = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "resources/templates/list" then
        local out = {}
        for _, t in ipairs(self.resource_templates) do
            out[#out + 1] = {
                uriTemplate = t.uriTemplate,
                name = t.name,
                description = t.description,
                mimeType = t.mimeType,
            }
        end
        local page, next_cursor = paginate(out, (req.params or {}).cursor)
        local result = { resourceTemplates = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "resources/read" then
        local params = req.params or {}
        local uri = params.uri
        if type(uri) ~= "string" or uri == "" then
            return jsonrpc_error(id, -32602, "uri required (string)")
        end
        local entry, args = _resolve_resource(self, uri)
        if not entry then
            -- -32002 is the MCP-conventional "Resource not found" code;
            -- distinct from -32602 "Invalid params" so retry/UX logic
            -- can tell a malformed URI from a missing one.
            return jsonrpc_error(id, -32002, "Resource not found: " .. uri)
        end
        local item, err = _read_resource(entry, args, uri)
        if not item then
            return jsonrpc_error(id, -32603, err)
        end
        return jsonrpc_result(id, { contents = { item } })

    elseif method == "prompts/list" then
        local out = {}
        for _, p in pairs(self.prompts) do
            local entry = { name = p.name, description = p.description }
            if p.arguments then entry.arguments = p.arguments end
            out[#out + 1] = entry
        end
        -- Stable order for offset-cursor pagination (pairs() order is
        -- unspecified). Names are validated as strings at registration.
        table.sort(out, function(a, b) return a.name < b.name end)
        local page, next_cursor = paginate(out, (req.params or {}).cursor)
        local result = { prompts = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "prompts/get" then
        local params = req.params or {}
        local name = params.name
        if type(name) ~= "string" or name == "" then
            return jsonrpc_error(id, -32602, "name required (string)")
        end
        local entry = self.prompts[name]
        if not entry then
            return jsonrpc_error(id, -32002, "Prompt not found: " .. name)
        end
        local result, err = _get_prompt(entry, params.arguments)
        if not result then
            return jsonrpc_error(id, -32603, err)
        end
        return jsonrpc_result(id, result)

    elseif method == "logging/setLevel" then
        local lvl = (req.params or {}).level
        if type(lvl) ~= "string" or not LOG_LEVELS[lvl] then
            return jsonrpc_error(id, -32602,
                "level must be one of: debug, info, notice, warning, error, critical, alert, emergency")
        end
        self._log_level = lvl
        return jsonrpc_result(id, json.empty_object)

    elseif method == "completion/complete" then
        local params = req.params or {}
        local ref = params.ref or {}
        local arg = params.argument or {}
        local ref_id = ref.name or ref.uri or ref.uriTemplate or ""
        if type(ref.type) ~= "string" or ref_id == "" or type(arg.name) ~= "string" then
            return jsonrpc_error(id, -32602,
                "ref.type, ref.name/uri/uriTemplate, and argument.name required")
        end
        local fn = self.completions[ref.type .. ":" .. ref_id .. ":" .. arg.name]
        if not fn then
            -- No completer registered → return empty values (spec-allowed;
            -- clients typically render no suggestions and let the user type).
            return jsonrpc_result(id, {
                completion = { values = {}, hasMore = false },
            })
        end
        local value = arg.value or ""
        local ok, values = pcall(fn, value, params.context or {})
        if not ok then
            return jsonrpc_error(id, -32603, "completion handler error: " .. tostring(values))
        end
        if type(values) ~= "table" then
            return jsonrpc_error(id, -32603, "completion handler must return a table of strings")
        end
        -- Spec cap: at most 100 values per response. If more, truncate
        -- and set hasMore=true so the client knows there's more.
        local total = #values
        local has_more = false
        if total > 100 then
            local out = {}
            for i = 1, 100 do out[i] = values[i] end
            values = out
            has_more = true
        end
        return jsonrpc_result(id, {
            completion = { values = values, total = total, hasMore = has_more },
        })

    else
        return jsonrpc_error(id, -32601, "Method not found: " .. tostring(method))
    end
end

-- ---- HTTP Server (raw sockets) ----

-- Read one HTTP request from a blocking socket-like `client` (must
-- provide :receive). Returns { method, path, version, headers, body }
-- with lower-cased header names, or (nil, err) on a read failure or a
-- malformed request line.
local function parse_http_request(client)
    -- Read request line
    local line, err = client:receive('*l')
    if not line then return nil, err end
    local method, path, version = line:match('^(%S+)%s+(%S+)%s+(%S+)')
    if not method then return nil, 'bad request line' end

    -- Read headers
    local headers = {}
    while true do
        line, err = client:receive('*l')
        if not line or line == '' then break end
        local k, v = line:match('^(%S+):%s*(.*)')
        if k then headers[k:lower()] = v end
    end

    -- Read body. A non-numeric Content-Length makes tonumber() return
    -- nil; treat it as 0 (as _parse_request_head does) instead of
    -- raising on `nil > 0`.
    local body = ''
    local content_length = tonumber(headers['content-length'] or 0) or 0
    if content_length > 0 then
        body, err = client:receive(content_length)
        if not body then return nil, err end
    end

    return {
        method = method,
        path = path,
        version = version,
        headers = headers,
        body = body,
    }
end

-- Write a complete HTTP/1.1 response to `client`. Sets Content-Length
-- and `Connection: close` on `headers` (the table is mutated; nil is
-- accepted and treated as empty).
local function send_response(client, status, headers, body)
    headers = headers or {}
    local parts = { string.format('HTTP/1.1 %s', status) }
    headers['Content-Length'] = tostring(#body)
    headers['Connection'] = 'close'
    for k, v in pairs(headers) do
        parts[#parts + 1] = k .. ': ' .. v
    end
    parts[#parts + 1] = ''
    parts[#parts + 1] = body
    client:send(table.concat(parts, '\r\n'))
end

-- Write one SSE `message` event carrying `data` to `client`.
local function send_sse_event(client, data)
    client:send('event: message\r\ndata: ' .. data .. '\r\n\r\n')
end

-- ---- Streamable HTTP transport (MCP issue #16) ----
--
-- select()-based single-thread event loop. All sockets non-blocking.
-- Per-connection FSM: reading_head → reading_body → dispatching → writing | sse_open.
--
-- Session model: each session has a Mcp-Session-Id; at most one open
-- SSE stream (the GET /mcp connection). Server-initiated requests
-- (sampling, roots — issues #9/#10) ride on the SSE stream and await
-- client responses via subsequent POSTs.
--
-- Queue routing:
--   self._notify_queue (global): list_changed, log messages → fans out
--                                to ALL open sse_conn (broadcast).
--   sess.notify_q (per-session): server-initiated requests → only that
--                                session's sse_conn.
--
-- write_buf discipline: append-only via `..`; consume via :sub(offset+1)
-- after partial-send. NEVER reorder or rewrite past bytes.

local READ_BUF_CAP     = 64 * 1024         -- 64 KiB for header section
local BODY_CAP         = 8 * 1024 * 1024   -- 8 MiB for request body
local WRITE_BUF_CAP    = 1 * 1024 * 1024   -- 1 MiB per-conn write buffer
local HEARTBEAT_SEC    = 30
local SESSION_IDLE_SEC = 60
local SELECT_TIMEOUT   = 0.1

-- Mint a session id of the form "<unix time>-<9-digit random>".
-- NOTE(review): built from os.time() and math.random(), so ids are
-- guessable; the MCP transport spec recommends cryptographically secure
-- session ids — consider a stronger source if sessions gate access.
local function _new_session_id()
    return string.format("%d-%09d", os.time(), math.random(0, 999999999))
end

-- Build the HTTP/1.1 status line (CRLF-terminated) for `status`.
local function _http_status_line(status)
    return "HTTP/1.1 " .. status .. "\r\n"
end

-- Serialise a header table as "K: V" lines followed by the blank line
-- that terminates the header section. Header order follows pairs().
local function _http_header_block(headers)
    local parts = {}
    for k, v in pairs(headers) do
        parts[#parts + 1] = k .. ": " .. v
    end
    parts[#parts + 1] = ""  -- blank line
    parts[#parts + 1] = ""  -- trailing CRLF
    return table.concat(parts, "\r\n")
end

-- Append `s` to the connection's write buffer. Returns true on success;
-- if the buffer would exceed WRITE_BUF_CAP, marks the connection
-- "closing" and returns false without appending.
local function _queue_write(conn, s)
    -- Append-only. If cap exceeded, evict the connection.
    if #conn.write_buf + #s > WRITE_BUF_CAP then
        conn.state = "closing"
        return false
    end
    conn.write_buf = conn.write_buf .. s
    return true
end

-- Build a complete one-shot HTTP response string. Sets Content-Length,
-- `Connection: close` and (when given) Mcp-Session-Id on `headers`; the
-- table is mutated, nil is treated as empty.
local function _build_http_response(status, headers, body, session_id)
    headers = headers or {}
    headers["Content-Length"] = tostring(#body)
    headers["Connection"] = "close"
    if session_id then headers["Mcp-Session-Id"] = session_id end
    return _http_status_line(status) .. _http_header_block(headers) .. body
end

-- Build the response head that opens an SSE stream (200 OK,
-- text/event-stream, keep-alive), tagged with the session id if given.
local function _build_sse_headers(session_id)
    local h = {
        ["Content-Type"] = "text/event-stream",
        ["Cache-Control"] = "no-cache",
        ["Connection"] = "keep-alive",
        ["Access-Control-Allow-Origin"] = "*",
    }
    if session_id then h["Mcp-Session-Id"] = session_id end
    return _http_status_line("200 OK") .. _http_header_block(h)
end

-- Format a JSON-RPC payload as one SSE message event.
local function _sse_event(payload_str)
    return "event: message\r\ndata: " .. payload_str .. "\r\n\r\n"
end

-- Format a server-initiated JSON-RPC request from the notification queue
-- entry table { jsonrpc, id?, method, params? }.
local function _encode_notify(entry)
    return json.encode(entry)
end

-- ---- Per-connection FSM helpers ----

-- Try to parse the request head out of conn.buf.
-- Returns false while the head is still incomplete, true once parsed
-- (conn.method/path/version/headers/body_remain are set and conn.buf
-- holds only the bytes after the head), or (nil, err) when malformed.
local function _parse_request_head(conn)
    -- Look for \r\n\r\n. If found, parse request line + headers.
    local sep = conn.buf:find("\r\n\r\n", 1, true)
    if not sep then return false end  -- not complete yet
    local head = conn.buf:sub(1, sep - 1)
    conn.buf = conn.buf:sub(sep + 4)  -- preserve any body bytes already buffered

    local lines = {}
    for line in head:gmatch("[^\r\n]+") do
        lines[#lines + 1] = line
    end
    if #lines == 0 then return nil, "empty head" end

    local method, path, version = lines[1]:match("^(%S+)%s+(%S+)%s+(%S+)")
    if not method then return nil, "bad request line" end
    conn.method, conn.path, conn.version = method, path, version

    local headers = {}
    for i = 2, #lines do
        local k, v = lines[i]:match("^(%S+):%s*(.*)")
        if k then headers[k:lower()] = v end
    end
    conn.headers = headers

    conn.body_remain = tonumber(headers["content-length"] or 0) or 0
    -- A negative length can never be satisfied by reading; reject it on
    -- the same (nil, err) path as an oversized body.
    if conn.body_remain < 0 then
        return nil, "bad content-length"
    end
    if conn.body_remain > BODY_CAP then
        return nil, "body too large"
    end
    return true
end

-- Check the connection's bearer token against the configured one.
-- Always true when no token is configured or for OPTIONS requests.
local function _check_auth(self, conn)
    if not self._auth_token then return true end
    if conn.method == "OPTIONS" then return true end
    local auth = conn.headers["authorization"] or ""
    local token = auth:match("^Bearer%s+(.+)$")
    return token == self._auth_token
end

-- ---- Session lookup / create ----

-- Resolve session by id. Returns the session table on success, or
-- (nil, "unknown") if `sid` is non-nil but no such session exists
-- (spec: 400/404 — caller decides). With nil `sid` (sessionless POST,
-- backwards compat), auto-issues a fresh session.
local function _resolve_session(self, sid)
    if sid then
        local sess = self._sessions[sid]
        if not sess then return nil, "unknown" end
        sess.last_activity = os.time()
        return sess
    end
    local new_id = _new_session_id()
    self._sessions[new_id] = {
        id = new_id,
        sse_conn = nil,
        pending = {},   -- req_id → on_response
        notify_q = {},  -- per-session, server-initiated requests
        created = os.time(),
        last_activity = os.time(),
    }
    return self._sessions[new_id]
end

-- For `initialize` specifically, always mint a new session id regardless
-- of any client-provided header (the spec lets the server choose).
local function _create_session(self)
    -- Register a brand-new session under a freshly generated id and
    -- return its table.
    local new_id = _new_session_id()
    self._sessions[new_id] = {
        id = new_id,
        sse_conn = nil,
        pending = {},
        notify_q = {},
        created = os.time(),
        last_activity = os.time(),
    }
    return self._sessions[new_id]
end

-- Scan all sessions for a pending server-initiated request matching id.
-- Returns (session, callback) or nil.
local function _find_pending(self, req_id)
    for _, sess in pairs(self._sessions) do
        local cb = sess.pending[req_id]
        if cb then return sess, cb end
    end
    return nil
end

-- ---- Dispatch a fully-parsed POST body ----

-- Forward declarations: used by _dispatch_post, defined below.
local _drive_handler_co
local _finalise_dispatch

-- Handle one fully-buffered POST (conn.body holds the JSON-RPC payload).
-- Returns the complete HTTP response string, or whatever _drive_handler_co
-- returns (nil when the handler coroutine yielded and the conn was parked).
local function _dispatch_post(self, conn)
    -- 400 + JSON-RPC error envelope for requests rejected before dispatch.
    local function reject(code, message)
        return _build_http_response("400 Bad Request",
            { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
            jsonrpc_error(nil, code, message), nil)
    end

    local body = conn.body
    if body == "" then
        return reject(-32700, "Empty body")
    end
    local ok, rpc_req = pcall(json.decode, body)
    if not ok then
        return reject(-32700, "Parse error")
    end
    -- Valid JSON that is not an object/array (e.g. `5`, `"x"`, `null`)
    -- cannot be a JSON-RPC message. Without this guard the field accesses
    -- and the `_session_id` assignment below would raise, and the
    -- connection would be dropped without any reply.
    if type(rpc_req) ~= "table" then
        return reject(-32600, "Invalid Request")
    end

    -- Server-initiated response routing: if id matches a pending
    -- server-initiated request in ANY session, this POST is a response.
    if rpc_req.id and (rpc_req.result ~= nil or rpc_req.error ~= nil) and not rpc_req.method then
        local sess, cb = _find_pending(self, rpc_req.id)
        if sess then
            sess.pending[rpc_req.id] = nil
            pcall(cb, rpc_req)
            return _build_http_response("202 Accepted",
                { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
                "", sess.id)
        end
    end

    -- Session resolution (deferred from header-parse time so we can detect
    -- `initialize`). Rules:
    --   - `initialize`: always mint a fresh session, ignoring any client sid
    --   - other methods, sid absent: auto-issue (backwards compat)
    --   - other methods, sid known: use it
    --   - other methods, sid unknown: 404
    local sess
    if rpc_req.method == "initialize" then
        sess = _create_session(self)
    else
        local s = _resolve_session(self, conn.requested_sid)
        if not s then
            return _build_http_response("404 Not Found",
                { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
                "Session not found: " .. tostring(conn.requested_sid), nil)
        end
        sess = s
    end
    conn.session_id = sess.id

    -- Stash session id on the request so handle_request → tools/call can
    -- expose it to handler ctx (issue #9 — sampling needs to know which
    -- session to push the request onto).
    rpc_req._session_id = sess.id

    -- Stash the JSON-RPC id on the conn so _finalise_dispatch can clear
    -- the cancellation flag for this request after building the response
    -- (issue #11). Notifications have nil id; that's fine — the
    -- nil-guard in _finalise_dispatch keeps tostring(nil) out of the
    -- cancel map.
    conn.dispatch_id = rpc_req.id

    -- Concurrent handler dispatch (issue #20). Wrap the dispatch call in
    -- a coroutine so any tool handler that goes through server.lua:run()
    -- (which yields when polling its sentinel file) can return control to
    -- the event loop while it waits. Other connections continue making
    -- progress.
    --
    -- The coroutine resumes itself synchronously the first time. If it
    -- completes without yielding (pure-Lua handlers, ping, etc.) the
    -- response is built inline as before. If it yields, we park it in
    -- self._pending_handlers and return nil — the conn enters
    -- dispatching_async, the scheduler tick resumes when wake_at passes.
    local co = coroutine.create(function()
        return self:handle_request(rpc_req)
    end)
    return _drive_handler_co(self, conn, co)
end

-- Resume a handler coroutine until it completes or yields.
-- On completion, build the deferred HTTP response (preserving the
-- Accept-aware shape).
-- On yield, register in self._pending_handlers and return nil — the conn
-- is parked in dispatching_async until the scheduler resumes it.
_drive_handler_co = function(self, conn, co)
    local rok, ryield = coroutine.resume(co)
    if coroutine.status(co) == "dead" then
        return _finalise_dispatch(self, conn, rok, ryield, co)
    end
    -- Suspended. Parse the yield payload.
    local wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
    self._pending_handlers[#self._pending_handlers + 1] = {
        co = co,
        conn = conn,
        wake_at = wake_at,
    }
    conn.state = "dispatching_async"
    return nil  -- no write_buf change; conn parks
end

-- Build the HTTP response for a completed dispatch. `rok` is the coroutine.resume
-- success flag; `result` is the handler/dispatch return (a JSON-RPC string when
-- rok=true; an error message when rok=false). Used by both the sync path
-- (_dispatch_post tail) and the async resume path (_scheduler_tick).
-- Also: clears cancellation flag and ctx-by-co registry entry for this
-- request (issue #11 — single cleanup site per Phase 5 review fix #7).
-- Always returns a response string; it does not touch conn.state.
_finalise_dispatch = function(self, conn, rok, result, co)
    local session_id = conn.session_id

    -- Cleanup (always): drop the coroutine's ctx entry and any
    -- cancellation flag for this request id.
    if co ~= nil then _ctx_by_co[co] = nil end
    local rid = conn.dispatch_id
    local was_cancelled = false
    if rid ~= nil then
        local rid_str = tostring(rid)
        if self._cancelled_ids[rid_str] then
            was_cancelled = true
            self._cancelled_ids[rid_str] = nil
        end
    end

    -- Issue #11: cancelled requests get a -32800 JSON-RPC error response.
    -- The MCP spec wording is "SHOULD NOT respond" (not MUST NOT). A silent
    -- TCP-close would be cleaner but the spawned shell subprocess in
    -- server.lua:run() inherits the socket FD via fork(), so the kernel
    -- keeps the connection alive until that shell exits (i.e. the
    -- underlying long-running command completes anyway). The error
    -- response gives the client a structured signal and exits curl
    -- immediately, which is the practical UX they want. JSON-RPC 2.0
    -- code -32800 is the convention for "Request cancelled."
    if was_cancelled then
        return _build_http_response("200 OK",
            { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
            jsonrpc_error(rid, -32800, "Request cancelled"), session_id)
    end

    if not rok then
        -- Internal dispatch error — surface as a JSON-RPC error response.
        -- The request id is known here, so echo it back (JSON-RPC requires
        -- the response id to match the request id whenever it could be
        -- determined); it stays nil for notifications.
        return _build_http_response("500 Internal Server Error",
            { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
            jsonrpc_error(rid, -32603, "Internal error: " .. tostring(result)), session_id)
    end
    if not result then
        -- Notification → 202 Accepted, no body.
        return _build_http_response("202 Accepted",
            { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
            "", session_id)
    end

    -- Accept-aware response shape (re-checked at finalise time; conn.headers
    -- is still available on the conn table after parking).
    local accept = conn.headers["accept"] or ""
    if accept:find("text/event%-stream") then
        local hdrs = _build_sse_headers(session_id)
        return hdrs .. _sse_event(result)
    end
    return _build_http_response("200 OK",
        { ["Content-Type"] = "application/json", ["Access-Control-Allow-Origin"] = "*" },
        result, session_id)
end

-- Answer a CORS preflight. If the client named the headers it wants to
-- send, echo them back with Authorization appended; otherwise advertise
-- the fixed default list.
local function _dispatch_options(conn)
    local acrh = conn.headers["access-control-request-headers"]
    return _build_http_response("204 No Content", {
        ["Access-Control-Allow-Origin"] = "*",
        ["Access-Control-Allow-Methods"] = "GET, POST, DELETE, OPTIONS",
        -- '*' does NOT cover Authorization per CORS spec; list explicitly.
        ["Access-Control-Allow-Headers"] = acrh and (acrh .. ", Authorization")
            or "Content-Type, Accept, Authorization, Mcp-Session-Id, Mcp-Protocol-Version",
        ["Access-Control-Max-Age"] = "86400",
    }, "", conn.session_id)
end

-- Terminate the session named by conn.session_id: close its SSE stream (if
-- any), forget the session and its cached roots. 404 when the id is missing
-- or unknown, 204 on success.
local function _dispatch_delete(self, conn)
    if not conn.session_id or not self._sessions[conn.session_id] then
        return _build_http_response("404 Not Found",
            { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
            "Session not found", nil)
    end
    local sess = self._sessions[conn.session_id]
    if sess.sse_conn then
        sess.sse_conn.state = "closing"
        sess.sse_conn = nil
    end
    self._sessions[conn.session_id] = nil
    -- The roots cache is keyed by session id; release the entry with the
    -- session so it does not accumulate for the life of the process.
    if self._roots_cache then self._roots_cache[conn.session_id] = nil end
    return _build_http_response("204 No Content",
        { ["Access-Control-Allow-Origin"] = "*" }, "", nil)
end

-- ---- Main loop helpers ----

-- Read whatever is available on conn.sock and advance the per-connection
-- state machine: reading_head → (reading_body) → dispatching → writing /
-- sse_open / dispatching_async. Responses are placed in conn.write_buf.
local function _conn_read(self, conn)
    local chunk, err, partial = conn.sock:receive(8192)
    local data = chunk or partial or ""
    if data ~= "" then
        if #conn.buf + #data > READ_BUF_CAP and conn.state == "reading_head" then
            -- Header section too large.
            conn.write_buf = _build_http_response("431 Request Header Fields Too Large",
                { ["Content-Type"] = "text/plain" }, "Headers too large", nil)
            conn.state = "writing"
            return
        end
        -- An open SSE stream is only watched for readability so that a
        -- client disconnect is noticed; nothing below consumes conn.buf in
        -- that state, so buffering those bytes would grow without bound.
        if conn.state ~= "sse_open" then
            conn.buf = conn.buf .. data
        end
    end
    if err == "closed" then conn.state = "closing" return end

    -- Advance FSM.
    if conn.state == "reading_head" then
        local ok, perr = _parse_request_head(conn)
        if perr then
            conn.write_buf = _build_http_response("400 Bad Request",
                { ["Content-Type"] = "text/plain" }, tostring(perr), nil)
            conn.state = "writing"
            return
        end
        if not ok then return end  -- still waiting for full head

        -- Headers parsed: auth check, session resolve.
        if not _check_auth(self, conn) then
            conn.write_buf = _build_http_response("401 Unauthorized",
                { ["Content-Type"] = "application/json", ["WWW-Authenticate"] = "Bearer" },
                '{"error":"unauthorized"}', nil)
            conn.state = "writing"
            return
        end
        -- Session resolution happens at dispatch time (after body is read),
        -- because `initialize` always mints a fresh session and we need to
        -- know the method to distinguish 404 (unknown id, non-initialize)
        -- from "auto-issue on initialize". Just stash the requested id.
        conn.requested_sid = conn.headers["mcp-session-id"]

        if conn.body_remain > 0 then
            -- Any body bytes already in conn.buf land here.
            conn.body = conn.buf:sub(1, conn.body_remain)
            conn.buf = conn.buf:sub(#conn.body + 1)
            conn.body_remain = conn.body_remain - #conn.body
            conn.state = (conn.body_remain == 0) and "dispatching" or "reading_body"
        else
            conn.state = "dispatching"
        end
    end

    if conn.state == "reading_body" then
        if #conn.buf > 0 then
            local take = math.min(conn.body_remain, #conn.buf)
            conn.body = conn.body .. conn.buf:sub(1, take)
            conn.buf = conn.buf:sub(take + 1)
            conn.body_remain = conn.body_remain - take
        end
        if conn.body_remain == 0 then conn.state = "dispatching" end
    end

    if conn.state == "dispatching" then
        local path = conn.path or ""
        if not path:match("^/mcp") then
            conn.write_buf = _build_http_response("404 Not Found",
                { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
                "Not Found", nil)
            conn.state = "writing"
            return
        end

        if conn.method == "OPTIONS" then
            conn.write_buf = _dispatch_options(conn)
            conn.state = "writing"
        elseif conn.method == "DELETE" then
            -- Resolve session: no auto-issue for DELETE; unknown sid → 404.
            if not conn.requested_sid then
                conn.write_buf = _build_http_response("400 Bad Request",
                    { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
                    "Mcp-Session-Id required for DELETE", nil)
                conn.state = "writing"
                return
            end
            conn.session_id = conn.requested_sid
            conn.write_buf = _dispatch_delete(self, conn)
            conn.state = "writing"
        elseif conn.method == "GET" then
            -- Resolve session (auto-issue if missing, 404 if unknown).
            local sess = _resolve_session(self, conn.requested_sid)
            if not sess then
                conn.write_buf = _build_http_response("404 Not Found",
                    { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
                    "Session not found: " .. tostring(conn.requested_sid), nil)
                conn.state = "writing"
                return
            end
            conn.session_id = sess.id
            -- Persistent SSE stream. Enforce one per session.
            if sess.sse_conn and sess.sse_conn ~= conn then
                conn.write_buf = _build_http_response("409 Conflict",
                    { ["Content-Type"] = "text/plain", ["Access-Control-Allow-Origin"] = "*" },
                    "Session already has an open SSE stream", nil)
                conn.state = "writing"
                return
            end
            sess.sse_conn = conn
            local hdrs = _build_sse_headers(conn.session_id)
            -- For backwards compat with the old SDK probe shape, emit a one-shot
            -- 'endpoint' event so clients can self-discover; modern clients
            -- ignore it and just await message events.
            local endpoint_data = json.encode({
                endpoint = "/mcp",
                sessionId = conn.session_id,
            })
            conn.write_buf = hdrs .. "event: endpoint\r\ndata: " .. endpoint_data .. "\r\n\r\n"
            conn.state = "sse_open"
            conn.last_heart = os.time()
        elseif conn.method == "POST" then
            -- _dispatch_post may return nil (issue #20) if the handler
            -- coroutine yielded. In that case it set conn.state =
            -- "dispatching_async" itself and parked the coroutine.
            local resp = _dispatch_post(self, conn)
            if resp then
                conn.write_buf = resp
                -- Defensive: never move a conn that was marked "closing"
                -- during dispatch back into "writing".
                if conn.state ~= "closing" then
                    conn.state = "writing"
                end
            end
            -- else: conn already parked; scheduler tick will finalise.
        else
            conn.write_buf = _build_http_response("405 Method Not Allowed",
                { ["Content-Type"] = "text/plain", ["Allow"] = "GET, POST, DELETE, OPTIONS" },
                "Method Not Allowed", nil)
            conn.state = "writing"
        end
    end
end

-- Push as much of conn.write_buf to the socket as it will take. A conn in
-- state "writing" moves to "closing" once its buffer is fully flushed; other
-- states (e.g. sse_open) stay as they are.
local function _conn_write(conn)
    if conn.write_buf == "" then
        if conn.state == "writing" then conn.state = "closing" end
        return
    end
    local sent, err, sent_partial = conn.sock:send(conn.write_buf)
    if err == "closed" then conn.state = "closing" return end
    -- luasocket: on success returns last_byte_index_sent; on partial/timeout
    -- returns (nil, "timeout"|"closed", last_byte_index_sent_so_far). The
    -- second-return numeric is an absolute index into the original string.
    local idx = sent or sent_partial or 0
    if idx > 0 then
        conn.write_buf = conn.write_buf:sub(idx + 1)
    end
    if conn.write_buf == "" and conn.state == "writing" then
        conn.state = "closing"
    end
end

-- Move queued server→client messages onto SSE write buffers.
local function _drain_notifications(self)
    -- Global broadcast queue: fan out to every open sse_conn.
    while #self._notify_queue > 0 do
        local entry = table.remove(self._notify_queue, 1)
        local payload = _sse_event(_encode_notify(entry))
        for _, sess in pairs(self._sessions) do
            if sess.sse_conn and sess.sse_conn.state == "sse_open" then
                _queue_write(sess.sse_conn, payload)
            end
        end
    end
    -- Per-session queues: route to that session only.
    for _, sess in pairs(self._sessions) do
        while #sess.notify_q > 0 do
            if not (sess.sse_conn and sess.sse_conn.state == "sse_open") then
                break  -- no live SSE; leave queued (or expire policy could drop)
            end
            local entry = table.remove(sess.notify_q, 1)
            _queue_write(sess.sse_conn, _sse_event(_encode_notify(entry)))
        end
    end
end

-- Periodic maintenance: SSE keep-alive comments and idle-session expiry.
local function _heartbeat_tick(self)
    local now = os.time()
    -- Heartbeats on open SSE conns.
    for _, sess in pairs(self._sessions) do
        local conn = sess.sse_conn
        if conn and conn.state == "sse_open" and now - conn.last_heart >= HEARTBEAT_SEC then
            _queue_write(conn, ": heartbeat\r\n\r\n")
            conn.last_heart = now
        end
    end
    -- Idle session expiry: no SSE + no activity for SESSION_IDLE_SEC.
    -- (Clearing an existing key during pairs() traversal is permitted.)
    for sid, sess in pairs(self._sessions) do
        if sess.sse_conn == nil and now - sess.last_activity > SESSION_IDLE_SEC then
            self._sessions[sid] = nil
            -- Release the per-session roots cache along with the session.
            if self._roots_cache then self._roots_cache[sid] = nil end
        end
    end
end

-- Issue #20 — scheduler tick. Resume any parked dispatch coroutine whose
-- wake_at has passed. On completion, build the deferred response and
-- queue it for write. If the connection died while the handler was
-- parked, drop the coroutine.
--
-- Each parked handler is resumed AT MOST ONCE per tick. A handler that
-- yields again stays parked until the next tick even if its new wake_at is
-- already due (e.g. a bare yield, which maps to wake_at = 0); otherwise such
-- a handler would be resumed in a tight loop here and starve every other
-- connection until it finished.
--
-- gettime() is wall-clock (luasocket uses gettimeofday) — NOT monotonic.
-- A large NTP step backwards could delay resumes; forwards could bunch
-- them. Acceptable for the deployment fleet (chrony slews); revisit if
-- a use case appears that needs CLOCK_MONOTONIC.
local function _scheduler_tick(self)
    local pending = self._pending_handlers
    if not pending[1] then return end
    local socket = require("socket")
    local now = socket.gettime()

    -- Only entries present at tick start are examined; survivors are
    -- collected in `keep` and written back in place afterwards so the
    -- identity of self._pending_handlers never changes.
    local count = #pending
    local keep = {}
    for i = 1, count do
        local p = pending[i]
        if p.conn.state == "closing" then
            -- Connection died mid-handler; drop the coroutine entirely
            -- and free its ctx entry (issue #11 cleanup discipline).
            _ctx_by_co[p.co] = nil
            if p.conn.dispatch_id ~= nil then
                self._cancelled_ids[tostring(p.conn.dispatch_id)] = nil
            end
        elseif now >= p.wake_at then
            local rok, ryield = coroutine.resume(p.co)
            if coroutine.status(p.co) == "dead" then
                local resp = _finalise_dispatch(self, p.conn, rok, ryield, p.co)
                p.conn.write_buf = (p.conn.write_buf or "") .. resp
                -- Don't resurrect a conn that was marked closing while
                -- the handler ran.
                if p.conn.state ~= "closing" then
                    p.conn.state = "writing"
                end
            else
                -- Yielded again — stay parked with the new deadline.
                p.wake_at = (type(ryield) == "table" and ryield.wake_at) or 0
                keep[#keep + 1] = p
            end
        else
            keep[#keep + 1] = p
        end
    end

    -- Preserve anything that was parked while handlers were running.
    for i = count + 1, #pending do
        keep[#keep + 1] = pending[i]
    end
    for i = #pending, 1, -1 do
        pending[i] = nil
    end
    for i = 1, #keep do
        pending[i] = keep[i]
    end
end

-- Returns the earliest pending wake_at as an offset from now, or nil if
-- no handlers are parked. Used to tighten the select() timeout so the
-- scheduler wakes on the right beat.
local function _next_pending_delay(self)
    if not self._pending_handlers[1] then return nil end
    local socket = require("socket")
    local now = socket.gettime()
    local earliest = math.huge
    for _, p in ipairs(self._pending_handlers) do
        if p.wake_at < earliest then earliest = p.wake_at end
    end
    local d = earliest - now
    if d < 0 then return 0 end
    return d
end

-- ---- Public: server-initiated request (for sampling/roots/etc.) ----

-- Enqueues a JSON-RPC request on the session's SSE stream. The callback
-- fires when the client POSTs back the response (matched by id).
-- Returns (true, request_id) when queued, or (false, errmsg) when the
-- session is unknown or has no SSE stream attached.
function lmcp:server_request(session_id, method, params, on_response)
    local sess = self._sessions[session_id]
    if not sess or not sess.sse_conn then
        return false, "no live SSE stream for session " .. tostring(session_id)
    end
    self._server_req_id = (self._server_req_id or 0) + 1
    local id = "srv-" .. self._server_req_id
    sess.pending[id] = on_response
    local msg = { jsonrpc = JSONRPC, id = id, method = method }
    -- Omit params if nil OR an empty Lua table (would JSON-encode as []
    -- per project_json_empty_table_gotcha memory). Real params with at
    -- least one key encode correctly as an object.
    if params ~= nil and (type(params) ~= "table" or next(params)) then
        msg.params = params
    end
    sess.notify_q[#sess.notify_q + 1] = msg
    return true, id
end

-- Sampling (MCP issue #9): ask the client's LLM to generate text. Returns
-- (true, request_id) if dispatched, (false, err) otherwise. `on_response`
-- is called with the client's JSON-RPC response shape:
--   { result = { role, content = { type = "text", text = "..." }, model, stopReason? } }
--   or { error = { code, message } }.
--
-- Today this is fire-and-forget — tool handlers cannot block waiting for
-- the response in the single-threaded event loop (see follow-up #20). A
-- tool may kick off sampling and return immediately; the callback fires
-- when the client posts the response back.
--
-- opts shape (matches MCP spec):
--   messages          = { { role = "user"|"assistant", content = {type, text} }, ... }
--   modelPreferences? = { hints?, intelligencePriority?, ... }
--   systemPrompt?     = string
--   includeContext?   = "none"|"thisServer"|"allServers"
--   temperature?      = number
--   maxTokens         = integer (required)
--   stopSequences?    = list of strings
function lmcp:sample(session_id, opts, on_response)
    if not (self._client_caps.sampling) then
        return false, "client did not advertise sampling capability"
    end
    if type(opts) ~= "table" or type(opts.messages) ~= "table"
        or type(opts.maxTokens) ~= "number" then
        return false, "sample: opts.messages (table) and opts.maxTokens (number) required"
    end
    return self:server_request(session_id, "sampling/createMessage", opts, on_response)
end

-- Roots (MCP issue #10): ask the client which filesystem/URL roots are
-- in scope for this session. Async like sample(); on_fetched(roots_list,
-- err) fires when the client responds. Result is also cached on
-- self._roots_cache[session_id] for later sync lookups.
--
-- Client→server `notifications/roots/list_changed` invalidates the cache;
-- next call to :roots() re-fetches.
function lmcp:roots(session_id, on_fetched)
    -- Returns (true, request_id) when the roots/list request was queued,
    -- (false, errmsg) otherwise. `on_fetched` is optional.
    if not (self._client_caps.roots) then
        return false, "client did not advertise roots capability"
    end
    return self:server_request(session_id, "roots/list", {}, function(resp)
        if resp.error then
            if on_fetched then on_fetched(nil, resp.error.message or "rpc error") end
            return
        end
        local list = resp.result and resp.result.roots or {}
        self._roots_cache[session_id] = {
            roots = list,
            fetched = os.time(),
        }
        if on_fetched then on_fetched(list, nil) end
    end)
end

-- Synchronous lookup of the cached roots. Returns the list (possibly
-- empty) if previously fetched, or nil if no :roots() call has completed
-- for this session yet.
function lmcp:roots_cached(session_id)
    local entry = self._roots_cache[session_id]
    return entry and entry.roots or nil
end

-- Synchronous helper: is `path` (a file:// URI or absolute filesystem
-- path) within any cached root for this session? Returns:
--   true  → matches at least one root
--   false → cache is populated but path is outside all roots
--   nil   → no roots cached yet; caller should :roots() first
--
-- A root matches only on a path-segment boundary: root "/srv/app" contains
-- "/srv/app" and "/srv/app/x" but NOT "/srv/app-private/x". A bare string
-- prefix test would wrongly accept the latter.
-- NOTE(review): `..` segments and symlinks are not resolved here; callers
-- that accept untrusted paths should canonicalise them first.
function lmcp:path_in_roots(session_id, path)
    local roots = self:roots_cached(session_id)
    if not roots then return nil end
    -- Normalise: treat file:// URIs and bare paths uniformly.
    -- (Extra parentheses drop gsub's second return value, the match count.)
    local norm = (path:gsub("^file://", ""))
    for _, r in ipairs(roots) do
        local root_path = ((r.uri or ""):gsub("^file://", ""))
        local root_len = #root_path
        if root_len > 0 and norm:sub(1, root_len) == root_path then
            local exact = (#norm == root_len)
            local root_ends_with_sep = (root_path:sub(-1) == "/")
            local next_is_sep = (norm:sub(root_len + 1, root_len + 1) == "/")
            if exact or root_ends_with_sep or next_is_sep then
                return true
            end
        end
    end
    return false
end

-- Blocking HTTP event loop: binds host:port, then multiplexes all
-- connections through socket.select(). Never returns normally; raises if
-- the bind fails.
function lmcp:run()
    local socket = require("socket")
    local server_sock = assert(socket.bind(self.host, self.port))
    server_sock:settimeout(0)

    self._conns = {}
    self._sessions = self._sessions or {}

    local addr, port = server_sock:getsockname()
    io.stderr:write(string.format("lmcp: %s v%s listening on %s:%d/mcp\n",
        self.name, self.version, addr, port))

    while true do
        -- Build select watch lists.
        local reads, writes = { server_sock }, {}
        for sock, conn in pairs(self._conns) do
            if conn.state == "reading_head" or conn.state == "reading_body"
                or conn.state == "sse_open" then
                reads[#reads + 1] = sock
            end
            if conn.write_buf ~= "" then
                writes[#writes + 1] = sock
            end
        end

        -- Tighten select timeout if a parked handler is due sooner.
        -- Otherwise a 100ms tick adds 100ms latency to short shell-tool runs.
        local select_timeout = SELECT_TIMEOUT
        local next_pend = _next_pending_delay(self)
        if next_pend and next_pend < select_timeout then
            select_timeout = next_pend
        end
        local ready_r, ready_w = socket.select(reads, writes, select_timeout)

        for _, sock in ipairs(ready_r or {}) do
            if sock == server_sock then
                local new_sock, aerr = server_sock:accept()
                if new_sock then
                    new_sock:settimeout(0)
                    self._conns[new_sock] = {
                        sock = new_sock,
                        state = "reading_head",
                        buf = "",
                        body = "",
                        headers = {},
                        method = nil,
                        path = nil,
                        body_remain = 0,
                        write_buf = "",
                        session_id = nil,
                        last_heart = os.time(),
                    }
                elseif aerr and aerr ~= "timeout" then
                    io.stderr:write("lmcp: accept error: " .. tostring(aerr) .. "\n")
                end
            else
                local conn = self._conns[sock]
                if conn then
                    -- pcall so one misbehaving connection cannot take
                    -- down the whole loop.
                    local ok, rerr = pcall(_conn_read, self, conn)
                    if not ok then
                        io.stderr:write("lmcp: read error: " .. tostring(rerr) .. "\n")
                        conn.state = "closing"
                    end
                end
            end
        end

        for _, sock in ipairs(ready_w or {}) do
            local conn = self._conns[sock]
            if conn then
                local ok, werr = pcall(_conn_write, conn)
                if not ok then
                    io.stderr:write("lmcp: write error: " .. tostring(werr) .. "\n")
                    conn.state = "closing"
                end
            end
        end

        -- Per-tick maintenance.
        _drain_notifications(self)
        _heartbeat_tick(self)
        _scheduler_tick(self)  -- issue #20: resume due dispatch coroutines

        -- After draining, attempt immediate writes on conns whose write_buf
        -- just got bytes (so list_changed / heartbeat / async-completed
        -- responses appear within one tick).
        for _, conn in pairs(self._conns) do
            if conn.write_buf ~= "" and conn.state ~= "closing" then
                pcall(_conn_write, conn)
            end
        end

        -- Sweep closing conns. (Clearing an existing key while iterating
        -- with pairs() is permitted.)
        for sock, conn in pairs(self._conns) do
            if conn.state == "closing" then
                -- Detach from session if it was the sse_conn.
                if conn.session_id then
                    local sess = self._sessions[conn.session_id]
                    if sess and sess.sse_conn == conn then
                        sess.sse_conn = nil
                        sess.last_activity = os.time()
                    end
                end
                pcall(sock.close, sock)
                self._conns[sock] = nil
            end
        end
    end
end

-- ---- stdio transport (MCP issue #15) ----
-- Line-delimited JSON-RPC: one message per line on stdin, one response
-- line per request on stdout, diagnostics on stderr. EOF closes cleanly.
-- Does NOT require luasocket — handle_request is transport-agnostic.
-- Bearer auth bypassed: stdio means the parent process is the trust
-- boundary.
function lmcp:run_stdio()
    -- Default stdout buffering on a pipe is "full" — a response would
    -- sit in the buffer until it fills, deadlocking the MCP client.
    -- Set "no" once + per-write flush belt-and-braces.
    io.stdout:setvbuf("no")
    io.stderr:write(string.format(
        "lmcp: %s v%s serving stdio\n", self.name, self.version))

    for line in io.stdin:lines() do
        if line ~= "" then
            -- pcall the whole body so a transient error (malformed JSON,
            -- handler bug, exotic pipe state) doesn't crash the loop.
            local body_ok, body_err = pcall(function()
                local parse_ok, req = pcall(json.decode, line)
                local response
                if not parse_ok then
                    response = jsonrpc_error(nil, -32700, "Parse error")
                else
                    response = self:handle_request(req)
                end
                if type(response) == "string" then
                    io.stdout:write(response, "\n")
                    io.stdout:flush()
                elseif response ~= nil then
                    io.stderr:write(
                        "lmcp: handler returned non-string (" ..
                        type(response) .. "); dropped\n")
                end
            end)
            if not body_ok then
                io.stderr:write("lmcp: stdio loop error: " .. tostring(body_err) .. "\n")
            end
        end
    end
end

return lmcp