-- lmcp.lua — Lightweight MCP server in pure Lua
-- Zero external dependencies (uses built-in socket or luasocket)
-- SPDX-License-Identifier: MIT

local json = require('json')

local lmcp = {}
lmcp.__index = lmcp

-- Read a simple `key = value` config file into a table.
-- A missing/unreadable file yields an empty table (never an error).
-- Lines whose key starts with '#' are treated as comments and skipped;
-- surrounding whitespace is trimmed from both key and value.
local function read_conf(path)
    local conf = {}
    local f = io.open(path, 'r')
    if not f then return conf end
    for line in f:lines() do
        local k, v = line:match('^%s*(%S+)%s*=%s*(.-)%s*$')
        if k and not k:match('^#') then conf[k] = v end
    end
    f:close()
    return conf
end

-- Protocol constants
local MCP_VERSION = "2025-06-18"
local JSONRPC = "2.0"

-- Construct a server instance.
--   name  server name reported in serverInfo (default "lmcp")
--   opts  optional table: version, host, port, resources, prompts,
--         completions, logging, auth_token, conf
-- Returns the new instance (metatable = lmcp).
function lmcp.new(name, opts)
    opts = opts or {}
    local self = setmetatable({}, lmcp)
    self.name = name or "lmcp"
    self.version = opts.version or "0.1.0"
    self.host = opts.host or "0.0.0.0"
    self.port = opts.port or 8080
    self.tools = {}

    -- Resources primitive (MCP 2025-06-18 §Server/Resources). Storage is
    -- always present; capability is advertised iff `opts.resources` is
    -- truthy OR at least one resource/template has been registered by
    -- initialize time. The opt-in covers servers that register resources
    -- after :run() — strict clients cache the capability set from
    -- initialize and won't call resources/list otherwise.
    self.resources = {}
    self.resource_templates = {}
    self._force_resources_cap = opts.resources and true or false

    -- Prompts primitive (MCP 2025-06-18 §Server/Prompts). Same capability
    -- discipline as resources: advertised iff at least one is registered
    -- OR opts.prompts forces it (strict clients cache the capability set
    -- from initialize and won't call prompts/list otherwise).
    self.prompts = {}
    self._force_prompts_cap = opts.prompts and true or false

    -- Completion (MCP issue #7). Keyed by "ref_type:ref_id:arg_name".
    -- ref_type ∈ {"ref/prompt", "ref/resource"}, ref_id is the prompt
    -- name or resource-template uriTemplate, arg_name is the parameter
    -- whose value the client wants completions for.
    self.completions = {}
    self._force_completions_cap = opts.completions and true or false

    -- Logging (MCP issue #8). RFC-5424 severity levels in ascending order.
    -- Client sets minimum level via logging/setLevel; messages below are
    -- dropped. Default level "warning" until the client picks one. Capability
    -- is opt-in via opts.logging (servers that want a structured log channel
    -- must declare it at construction; we don't presume).
    self._log_level = "warning"
    self._force_logging_cap = opts.logging and true or false

    -- Client capabilities captured at initialize time (MCP issue #9).
    -- Used to guard server-initiated requests (sampling, roots) — we don't
    -- issue them unless the client claimed support during handshake.
    self._client_caps = {}
    self._client_info = {}

    -- Roots cache (MCP issue #10), keyed by session_id. Populated when the
    -- server calls `:roots(session_id, ...)`; invalidated when the client
    -- sends notifications/roots/list_changed.
    self._roots_cache = {}

    -- Notification queue: drained by Streamable HTTP transport (issue #16).
    -- Today delivery is a no-op; we still enqueue so the emission code
    -- path is exercised. Capped + deduped to keep the queue useful.
    self._notify_queue = {}
    self._notify_cap = 100
    self._session_id = nil

    -- Auth: explicit opt > conf file > LMCP_TOKEN env > nil (no auth).
    -- A conf file that is missing or carries no token falls through to the
    -- environment, so naming a conf path can never silently disable auth
    -- that LMCP_TOKEN would otherwise have enabled.
    if opts.auth_token then
        self._auth_token = opts.auth_token
    else
        local token
        if opts.conf then
            token = read_conf(opts.conf)['.godparticle']
        end
        if token == nil then
            local env_token = os.getenv("LMCP_TOKEN")
            if env_token and env_token ~= "" then token = env_token end
        end
        self._auth_token = token
    end
    return self
end

-- Base64 alphabet (RFC 4648 §4) used by the cursor codec below.
local _B64_ALPHABET =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

-- Encode a byte string as padded base64. Pure Lua (arithmetic only, no
-- bit library) so it behaves the same on Lua 5.1 through 5.4 and LuaJIT.
local function _b64_encode(s)
    local out = {}
    for i = 1, #s, 3 do
        local a, b, c = s:byte(i, i + 2)
        local n = a * 65536 + (b or 0) * 256 + (c or 0)
        local i1 = math.floor(n / 262144) % 64
        local i2 = math.floor(n / 4096) % 64
        local i3 = math.floor(n / 64) % 64
        local i4 = n % 64
        out[#out + 1] = _B64_ALPHABET:sub(i1 + 1, i1 + 1)
            .. _B64_ALPHABET:sub(i2 + 1, i2 + 1)
            .. (b and _B64_ALPHABET:sub(i3 + 1, i3 + 1) or "=")
            .. (c and _B64_ALPHABET:sub(i4 + 1, i4 + 1) or "=")
    end
    return table.concat(out)
end

-- Decode base64 text. Characters outside the alphabet (padding, line
-- breaks, garbage) are dropped first, so malformed input decodes to some
-- harmless string instead of raising.
local function _b64_decode(s)
    local clean = s:gsub("[^A-Za-z0-9%+/]", "")
    local out = {}
    for i = 1, #clean, 4 do
        local n, count = 0, 0
        for j = i, i + 3 do
            local ch = clean:sub(j, j)
            local idx = (ch ~= "") and _B64_ALPHABET:find(ch, 1, true) or nil
            if idx then
                n = n * 64 + (idx - 1)
                count = count + 1
            else
                n = n * 64
            end
        end
        -- 2 sextets carry 1 byte, 3 carry 2 bytes, 4 carry 3 bytes.
        if count >= 2 then out[#out + 1] = string.char(math.floor(n / 65536) % 256) end
        if count >= 3 then out[#out + 1] = string.char(math.floor(n / 256) % 256) end
        if count == 4 then out[#out + 1] = string.char(n % 256) end
    end
    return table.concat(out)
end

-- Cursor pagination helper for list methods (MCP issue #12). Cursor is
-- an opaque base64 string per spec; we use it to encode the next offset.
-- Page size default 50 covers every plausible lmcp deployment today;
-- larger registered sets are still handled correctly.
local _PAGE_SIZE = 50

-- Slice `items` (a sequence) into one page.
--   cursor  nil/"" for the first page, otherwise a value previously
--           returned as next_cursor. Malformed or out-of-range cursors
--           restart from offset 0.
-- Returns (page, next_cursor); next_cursor is nil on the last page.
-- The cursor codec is pure Lua, so pagination no longer depends on
-- luasocket's `mime` module being installed (previously, without it, lists
-- were truncated at one page with no cursor).
local function paginate(items, cursor)
    local n = #items

    local offset = 0
    if type(cursor) == "string" and cursor ~= "" then
        -- Only plain decimal digits are accepted; anything else is treated
        -- as malformed rather than fed through tonumber's hex/exponent rules.
        local digits = _b64_decode(cursor):match("^%d+$")
        local parsed = digits and tonumber(digits)
        if parsed and parsed <= n then offset = parsed end
    end

    local page = {}
    local stop = math.min(offset + _PAGE_SIZE, n)
    for i = offset + 1, stop do
        page[#page + 1] = items[i]
    end

    local next_cursor
    if stop < n then
        next_cursor = _b64_encode(tostring(stop))
    end
    return page, next_cursor
end

-- Register a tool.
-- opts (optional, 5th arg):
--   annotations  = { title?, readOnlyHint?, destructiveHint?,
--                    idempotentHint?, openWorldHint? }
--   outputSchema = schema table describing the shape of structuredContent
--                  (issue #13); passed through verbatim to tools/list
-- Handler signature: function(args, ctx) where ctx = { _meta = … } from
-- the request. ctx is optional — existing 1-arg handlers keep working.
-- Handler return shapes:
--   string                → single text content block (no structured)
--   { type = "...", ... } → typed content block (image/etc.); no structured
--   table without `type`  → JSON-encoded into text content AND mirrored as
--                           structuredContent (issue #13; spec-strict clients
--                           get first-class structured access)
-- Returns self so registrations can be chained.
function lmcp:tool(name, description, params_schema, handler, opts)
    self.tools[name] = {
        name = name,
        description = description,
        inputSchema = params_schema or { type = "object", properties = {} },
        handler = handler,
        annotations = opts and opts.annotations or nil,
        outputSchema = opts and opts.outputSchema or nil,
    }
    return self
end

-- Register a resource (exact URI). opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args is always a table (empty for
-- literal resources, populated with template captures for templates).
function lmcp:resource(uri, opts, handler)
    -- Validate the two mandatory arguments up front; opts is optional.
    if type(uri) ~= "string" or uri == "" then
        error("resource: uri required")
    end
    if type(handler) ~= "function" then
        error("resource: handler required")
    end
    local meta = opts or {}

    -- Keyed by URI, so re-registering the same URI replaces the entry.
    local entry = {
        uri = uri,
        name = meta.name or uri,
        description = meta.description,
        mimeType = meta.mimeType,
        handler = handler,
    }
    self.resources[uri] = entry

    self:notify_resources_changed()
    return self
end

-- Register a resource template (RFC 6570 subset). Each {name} captures
-- one greedy segment. opts: { name, description?, mimeType? }.
-- Handler signature: function(args) — args[name] = captured_string.
-- Limitation: adjacent captures ({a}{b}) bind ambiguously; register
-- separate resources if you need precision.
-- Returns self so registrations can be chained.
function lmcp:resource_template(uriTemplate, opts, handler)
    if type(uriTemplate) ~= "string" or uriTemplate == "" then
        error("resource_template: uriTemplate required")
    end
    if type(handler) ~= "function" then
        error("resource_template: handler required")
    end
    local meta = opts or {}

    -- Step 1: neutralise every Lua-pattern magic character except the
    -- braces, which are not magic and are consumed in step 2.
    local quoted = uriTemplate:gsub("([%%%(%)%.%+%-%*%?%[%]%^%$])", "%%%1")

    -- Step 2: each {name} becomes a capture group; names are remembered in
    -- order so captures can later be mapped back to argument names.
    local names = {}
    local function to_capture(placeholder)
        names[#names + 1] = placeholder
        return "(.+)"
    end
    local body = quoted:gsub("{([%w_]+)}", to_capture)

    -- Templates live in a list (not a map): lookup tries them in
    -- registration order.
    local templates = self.resource_templates
    templates[#templates + 1] = {
        uriTemplate = uriTemplate,
        name = meta.name or uriTemplate,
        description = meta.description,
        mimeType = meta.mimeType,
        pattern = "^" .. body .. "$",
        arg_names = names,
        handler = handler,
    }

    self:notify_resources_changed()
    return self
end

-- Internal: enqueue a parameterless list_changed notification, with tail
-- dedup (consecutive notifications of the same kind collapse — they carry
-- no state, so N → 1 "go refetch"). Cap is a backstop, not the policy.
-- params omitted on purpose (json.lua empty-table → [] gotcha, would be
-- malformed JSON-RPC; spec allows omitting params for parameterless).
local function _enqueue_list_changed(self, method)
    local queue = self._notify_queue

    -- Tail dedup: the newest queued entry already says the same thing.
    local newest = queue[#queue]
    if newest and newest.method == method then
        return
    end

    -- Backstop cap: drop the oldest entry to make room.
    if #queue >= self._notify_cap then
        table.remove(queue, 1)
    end

    queue[#queue + 1] = { jsonrpc = JSONRPC, method = method }
end

-- Queue a notifications/resources/list_changed message.
function lmcp:notify_resources_changed()
    _enqueue_list_changed(self, "notifications/resources/list_changed")
end

-- Queue a notifications/prompts/list_changed message.
function lmcp:notify_prompts_changed()
    _enqueue_list_changed(self, "notifications/prompts/list_changed")
end

-- Resolve a URI to (resource_or_template_entry, args). Literal match wins
-- over templates; templates tried in registration order. Returns nil when
-- nothing matches.
local function _resolve_resource(self, uri)
    local exact = self.resources[uri]
    if exact then
        return exact, {}
    end

    local templates = self.resource_templates
    for idx = 1, #templates do
        local tpl = templates[idx]
        local found = { string.match(uri, tpl.pattern) }
        if found[1] then
            -- Pair each capture with the placeholder name at the same position.
            local args = {}
            local names = tpl.arg_names
            for pos = 1, #names do
                args[names[pos]] = found[pos]
            end
            return tpl, args
        end
    end

    return nil
end

-- Run a resource handler under pcall, normalise the return into a single
-- contents item. Returns (item_table, nil) on success, (nil, err_msg)
-- on failure.
-- Accepted handler results:
--   string                 → text item
--   { text = … }           → text item
--   { blob = … }           → blob item (already base64)
--   { blob_bytes = … }     → blob item, base64-encoded here via `mime`
-- A `mimeType` on the result overrides the one registered on the entry.
local function _read_resource(entry, args, uri)
    local ok, result = pcall(entry.handler, args)
    if not ok then
        return nil, "resource handler error: " .. tostring(result)
    end
    if result == nil then
        return nil, "resource handler returned no content"
    end

    local kind = type(result)
    if kind == "string" then
        return { uri = uri, mimeType = entry.mimeType or "text/plain", text = result }
    end
    if kind ~= "table" then
        return nil, "resource handler returned unsupported shape"
    end

    local declared = result.mimeType or entry.mimeType

    if result.text ~= nil then
        return { uri = uri, mimeType = declared or "text/plain", text = result.text }
    end

    if result.blob ~= nil then
        return {
            uri = uri,
            mimeType = declared or "application/octet-stream",
            blob = result.blob,
        }
    end

    if result.blob_bytes ~= nil then
        local have_mime, mime = pcall(require, "mime")
        if not have_mime then
            return nil, "mime module unavailable; pre-encode blob and return { blob = … }"
        end
        -- some luasocket builds line-wrap, so strip CR/LF from the output
        local encoded = (mime.b64(result.blob_bytes) or ""):gsub("[\r\n]", "")
        return {
            uri = uri,
            mimeType = declared or "application/octet-stream",
            blob = encoded,
        }
    end

    return nil, "resource handler returned unsupported shape"
end

-- Register a prompt (MCP issue #6). opts: { description?, arguments? }
-- where arguments is a list of { name, description?, required? }.
-- Handler signature: function(args) where args[name] = supplied string.
-- Return either:
--   string                              → single user text message
--   { description?, messages = {...} }  → full custom shape (passthrough)
-- Returns self so registrations can be chained.
function lmcp:prompt(name, opts, handler)
    if type(name) ~= "string" or name == "" then
        error("prompt: name required")
    end
    if type(handler) ~= "function" then
        error("prompt: handler required")
    end
    local meta = opts or {}

    local entry = {
        name = name,
        description = meta.description,
        arguments = meta.arguments, -- list of { name, description?, required? }
        handler = handler,
    }
    self.prompts[name] = entry

    self:notify_prompts_changed()
    return self
end

-- Run a prompt handler under pcall, normalise the return to the spec
-- shape { description?, messages = [{ role, content = { type, text } }] }.
-- Returns (table, nil) on success, (nil, err_msg) on failure.
local function _get_prompt(entry, args)
    local ok, result = pcall(entry.handler, args or {})
    if not ok then
        return nil, "prompt handler error: " .. tostring(result)
    end
    if result == nil then
        return nil, "prompt handler returned no content"
    end

    local kind = type(result)
    if kind == "string" then
        -- Bare string: wrap it as one user-role text message and attach the
        -- description the prompt was registered with.
        local message = {
            role = "user",
            content = { type = "text", text = result },
        }
        return { description = entry.description, messages = { message } }
    elseif kind == "table" and type(result.messages) == "table" then
        -- Already in full shape: hand it back untouched.
        return result
    end

    return nil, "prompt handler returned unsupported shape (expected string or { messages = … })"
end

-- Register a completion handler for a prompt/resource-template argument
-- (MCP issue #7). ref_type ∈ {"ref/prompt", "ref/resource"}; ref_id is
-- the prompt name or the resource-template uriTemplate. fn signature:
-- fn(value, ctx) → list of candidate strings (server filters / sorts as
-- it likes; spec allows up to 100). ctx mirrors the spec context object
-- (currently { arguments = {...} } of previously-completed sibling args).
-- Raises on invalid arguments; returns self for chaining.
function lmcp:complete(ref_type, ref_id, arg_name, fn)
    local known_ref = (ref_type == "ref/prompt") or (ref_type == "ref/resource")
    if not known_ref then
        error("complete: ref_type must be 'ref/prompt' or 'ref/resource'")
    end
    if type(ref_id) ~= "string" or ref_id == "" then
        error("complete: ref_id required")
    end
    if type(arg_name) ~= "string" or arg_name == "" then
        error("complete: arg_name required")
    end
    if type(fn) ~= "function" then
        error("complete: fn required")
    end

    -- Registry key is the three parts joined by ':'.
    local key = table.concat({ ref_type, ref_id, arg_name }, ":")
    self.completions[key] = fn
    return self
end

-- Log severity ordering (RFC 5424). Lower index = more severe.
-- Built from an ordered list so each name's rank is its position.
local LOG_LEVELS = {}
for rank, level_name in ipairs({
    "emergency", "alert", "critical", "error",
    "warning", "notice", "info", "debug",
}) do
    LOG_LEVELS[level_name] = rank
end

-- Emit a structured log record. Below the client-set level → drop.
-- Today's delivery channel is stderr; once issue #16 lands the
-- bidirectional transport, this also enqueues notifications/message
-- for the client. data is free-form (string, table, etc.).
function lmcp:log(level, logger, data)
    local lvl = LOG_LEVELS[level]
    local thr = LOG_LEVELS[self._log_level] or LOG_LEVELS.warning
    if not lvl or lvl > thr then return end -- unknown level or below threshold; drop

    -- stderr fallback: human-readable. The structured form goes on the
    -- notifications queue for the future Streamable-HTTP delivery path
    -- (issue #16). Cap + drop-oldest like list_changed.
    local text
    if type(data) == "string" then
        text = data
    else
        -- Logging must never raise: fall back to tostring() when the
        -- payload cannot be JSON-encoded.
        local ok, encoded = pcall(json.encode, data)
        if ok and type(encoded) == "string" then
            text = encoded
        else
            text = tostring(data)
        end
    end
    io.stderr:write(string.format("lmcp[%s/%s]: %s\n",
        level, tostring(logger or "-"), text))

    if #self._notify_queue >= self._notify_cap then
        table.remove(self._notify_queue, 1)
    end
    self._notify_queue[#self._notify_queue + 1] = {
        jsonrpc = JSONRPC,
        method = "notifications/message",
        params = { level = level, logger = logger, data = data },
    }
end

-- JSON-RPC response helpers

-- Encode a successful JSON-RPC response envelope.
local function jsonrpc_result(id, result)
    return json.encode({ jsonrpc = JSONRPC, id = id, result = result })
end

-- Encode a JSON-RPC error response envelope.
local function jsonrpc_error(id, code, message)
    return json.encode({
        jsonrpc = JSONRPC,
        id = id,
        error = { code = code, message = message },
    })
end

-- Sort a list of tables in place by tostring(entry[field]) and return it.
-- Registries are hash tables whose pairs() order is unspecified; sorting
-- gives list methods a stable order so pagination cursors stay meaningful
-- from one request to the next.
local function _sort_by_field(list, field)
    table.sort(list, function(a, b)
        return tostring(a[field]) < tostring(b[field])
    end)
    return list
end

-- Handle a single JSON-RPC request.
--   req  decoded JSON-RPC message (table). The transport may stash
--        `_session_id` on it.
-- Returns the encoded JSON response string, or nil for notifications
-- (messages without an id), which must not be answered.
-- Malformed input (non-table message, non-table params) is answered with
-- a JSON-RPC error instead of raising.
function lmcp:handle_request(req)
    if type(req) ~= "table" then
        return jsonrpc_error(nil, -32600, "Invalid Request")
    end

    local method = req.method
    local id = req.id -- nil for notifications

    -- JSON-RPC 2.0: notifications (no id) MUST NOT receive a response.
    -- Some notifications carry server-side side effects (cache invalidation,
    -- progress signals); handle those before the early return. Anything
    -- not recognised silently drops — clients expect no response either way.
    if id == nil then
        if method == "notifications/roots/list_changed" then
            -- Invalidate cached roots for the session that sent this.
            if req._session_id then
                self._roots_cache[req._session_id] = nil
            end
        end
        -- (Other client→server notifications: cancelled, message — no
        -- action today; add side-effects here as needed.)
        return nil
    end

    -- Every branch below indexes params; reject non-table values once here
    -- rather than raising "attempt to index" deep inside a branch.
    local params = req.params
    if params == nil then
        params = {}
    elseif type(params) ~= "table" then
        return jsonrpc_error(id, -32602, "params must be an object")
    end

    if method == "initialize" then
        self._session_id = self._session_id or tostring(os.time())
        -- Capture client capabilities (MCP issue #9 — sampling needs to
        -- know if the client supports it before issuing a request).
        self._client_caps = params.capabilities or {}
        self._client_info = params.clientInfo or {}

        local caps = { tools = { listChanged = false } }
        if self._force_resources_cap or next(self.resources) or self.resource_templates[1] then
            caps.resources = { listChanged = true, subscribe = false }
        end
        if self._force_prompts_cap or next(self.prompts) then
            caps.prompts = { listChanged = true }
        end
        if self._force_completions_cap or next(self.completions) then
            -- Spec uses an empty object as the "supported" marker.
            -- json.empty_object → {} (not [] from the empty-table gotcha).
            caps.completions = json.empty_object
        end
        if self._force_logging_cap then
            caps.logging = json.empty_object
        end
        return jsonrpc_result(id, {
            protocolVersion = MCP_VERSION,
            capabilities = caps,
            serverInfo = {
                name = self.name,
                version = self.version,
            },
        })

    elseif method == "ping" then
        return jsonrpc_result(id, json.empty_object)

    elseif method == "tools/list" then
        local tool_list = {}
        for _, t in pairs(self.tools) do
            local entry = {
                name = t.name,
                description = t.description,
                inputSchema = t.inputSchema,
            }
            -- Emit annotations / outputSchema only when registered. Empty
            -- Lua tables would JSON-encode as [] and break spec-strict
            -- clients.
            if t.annotations then entry.annotations = t.annotations end
            if t.outputSchema then entry.outputSchema = t.outputSchema end
            tool_list[#tool_list + 1] = entry
        end
        _sort_by_field(tool_list, "name")
        local page, next_cursor = paginate(tool_list, params.cursor)
        local result = { tools = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "tools/call" then
        local tool_name = params.name
        local arguments = params.arguments or {}
        local tool = self.tools[tool_name]
        if not tool then
            return jsonrpc_error(id, -32601, "Tool not found: " .. tostring(tool_name))
        end
        -- ctx exposes the request's _meta (issue #13) and the session_id
        -- (issue #9 — so handlers can call self:sample(ctx.session_id, …)).
        -- Handlers that don't declare a second parameter ignore it (Lua
        -- call discards extras).
        local ctx = {
            _meta = params._meta,
            request_id = id,
            session_id = req._session_id,
        }
        local ok, result = pcall(tool.handler, arguments, ctx)
        if ok then
            local resp = { isError = false }
            local meta_out
            if type(result) == "string" then
                resp.content = {{ type = "text", text = result }}
            elseif type(result) == "table" and result.type then
                -- Typed content block (e.g. image). No structured emission.
                resp.content = { result }
            elseif type(result) == "table" then
                -- Issue #13: extract response _meta before mirroring as
                -- structuredContent, so server metadata doesn't leak into
                -- the structured payload.
                meta_out = result._meta
                local clean = result
                if meta_out ~= nil then
                    clean = {}
                    for k, v in pairs(result) do
                        if k ~= "_meta" then clean[k] = v end
                    end
                end
                resp.content = {{ type = "text", text = json.encode(clean) }}
                resp.structuredContent = clean
            else
                resp.content = {{ type = "text", text = tostring(result) }}
            end
            if meta_out ~= nil then resp._meta = meta_out end
            return jsonrpc_result(id, resp)
        else
            -- Handler failures are tool-level errors (isError = true), not
            -- JSON-RPC protocol errors.
            return jsonrpc_result(id, {
                content = {{ type = "text", text = "Error: " .. tostring(result) }},
                isError = true,
            })
        end

    elseif method == "resources/list" then
        local out = {}
        for _, r in pairs(self.resources) do
            out[#out + 1] = {
                uri = r.uri,
                name = r.name,
                description = r.description,
                mimeType = r.mimeType,
            }
        end
        _sort_by_field(out, "uri")
        local page, next_cursor = paginate(out, params.cursor)
        local result = { resources = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "resources/templates/list" then
        -- Templates are stored as a list, so registration order is already
        -- stable; no sort needed.
        local out = {}
        for _, t in ipairs(self.resource_templates) do
            out[#out + 1] = {
                uriTemplate = t.uriTemplate,
                name = t.name,
                description = t.description,
                mimeType = t.mimeType,
            }
        end
        local page, next_cursor = paginate(out, params.cursor)
        local result = { resourceTemplates = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "resources/read" then
        local uri = params.uri
        if type(uri) ~= "string" or uri == "" then
            return jsonrpc_error(id, -32602, "uri required (string)")
        end
        local entry, args = _resolve_resource(self, uri)
        if not entry then
            -- -32002 is the MCP-conventional "Resource not found" code;
            -- distinct from -32602 "Invalid params" so retry/UX logic
            -- can tell a malformed URI from a missing one.
            return jsonrpc_error(id, -32002, "Resource not found: " .. uri)
        end
        local item, err = _read_resource(entry, args, uri)
        if not item then
            return jsonrpc_error(id, -32603, err)
        end
        return jsonrpc_result(id, { contents = { item } })

    elseif method == "prompts/list" then
        local out = {}
        for _, p in pairs(self.prompts) do
            local entry = { name = p.name, description = p.description }
            if p.arguments then entry.arguments = p.arguments end
            out[#out + 1] = entry
        end
        _sort_by_field(out, "name")
        local page, next_cursor = paginate(out, params.cursor)
        local result = { prompts = page }
        if next_cursor then result.nextCursor = next_cursor end
        return jsonrpc_result(id, result)

    elseif method == "prompts/get" then
        local name = params.name
        if type(name) ~= "string" or name == "" then
            return jsonrpc_error(id, -32602, "name required (string)")
        end
        local entry = self.prompts[name]
        if not entry then
            return jsonrpc_error(id, -32002, "Prompt not found: " .. name)
        end
        local result, err = _get_prompt(entry, params.arguments)
        if not result then
            return jsonrpc_error(id, -32603, err)
        end
        return jsonrpc_result(id, result)

    elseif method == "logging/setLevel" then
        local lvl = params.level
        if type(lvl) ~= "string" or not LOG_LEVELS[lvl] then
            return jsonrpc_error(id, -32602,
                "level must be one of: debug, info, notice, warning, error, critical, alert, emergency")
        end
        self._log_level = lvl
        return jsonrpc_result(id, json.empty_object)

    elseif method == "completion/complete" then
        local ref = params.ref
        if type(ref) ~= "table" then ref = {} end
        local arg = params.argument
        if type(arg) ~= "table" then arg = {} end
        local ref_id = ref.name or ref.uri or ref.uriTemplate or ""
        -- ref_id is concatenated into the registry key below, so it must be
        -- a string (a table here would raise on `..`).
        if type(ref.type) ~= "string" or type(ref_id) ~= "string"
            or ref_id == "" or type(arg.name) ~= "string" then
            return jsonrpc_error(id, -32602,
                "ref.type, ref.name/uri/uriTemplate, and argument.name required")
        end
        local fn = self.completions[ref.type .. ":" .. ref_id .. ":" .. arg.name]
        if not fn then
            -- No completer registered → return empty values (spec-allowed;
            -- clients typically render no suggestions and let the user type).
            return jsonrpc_result(id, {
                completion = { values = {}, hasMore = false },
            })
        end
        local value = arg.value or ""
        local ok, values = pcall(fn, value, params.context or {})
        if not ok then
            return jsonrpc_error(id, -32603, "completion handler error: " .. tostring(values))
        end
        if type(values) ~= "table" then
            return jsonrpc_error(id, -32603, "completion handler must return a table of strings")
        end
        -- Spec cap: at most 100 values per response. If more, truncate
        -- and set hasMore=true so the client knows there's more.
        local total = #values
        local has_more = false
        if total > 100 then
            local out = {}
            for i = 1, 100 do out[i] = values[i] end
            values = out
            has_more = true
        end
        return jsonrpc_result(id, {
            completion = { values = values, total = total, hasMore = has_more },
        })

    else
        return jsonrpc_error(id, -32601, "Method not found: " .. tostring(method))
    end
end

-- ---- HTTP Server (raw sockets) ----

-- Read one HTTP request from a socket-like `client` using line-mode and
-- counted receives. Returns a table { method, path, version, headers, body }
-- (header names lower-cased), or (nil, err) on failure.
local function parse_http_request(client)
    -- Read request line
    local line, err = client:receive('*l')
    if not line then return nil, err end

    local method, path, version = line:match('^(%S+)%s+(%S+)%s+(%S+)')
    if not method then return nil, 'bad request line' end

    -- Read headers
    local headers = {}
    while true do
        line, err = client:receive('*l')
        if not line or line == '' then break end
        local k, v = line:match('^(%S+):%s*(.*)')
        if k then headers[k:lower()] = v end
    end

    -- Read body. Content-Length must be plain decimal digits; anything else
    -- is reported as an error instead of raising on a nil comparison.
    local body = ''
    local content_length = 0
    local raw_length = headers['content-length']
    if raw_length ~= nil then
        local digits = raw_length:match('^%s*(%d+)%s*$')
        if not digits then return nil, 'bad content-length' end
        content_length = tonumber(digits)
    end
    if content_length > 0 then
        body, err = client:receive(content_length)
        if not body then return nil, err end
    end

    return {
        method = method,
        path = path,
        version = version,
        headers = headers,
        body = body,
    }
end

-- Write a complete HTTP/1.1 response to `client`. Content-Length and
-- Connection: close are always set. The caller's `headers` table is copied,
-- not mutated, and may be nil.
local function send_response(client, status, headers, body)
    body = body or ''
    local out_headers = {}
    for k, v in pairs(headers or {}) do out_headers[k] = v end
    out_headers['Content-Length'] = tostring(#body)
    out_headers['Connection'] = 'close'

    local parts = { string.format('HTTP/1.1 %s', status) }
    for k, v in pairs(out_headers) do
        parts[#parts + 1] = k .. ': ' .. v
    end
    parts[#parts + 1] = ''
    parts[#parts + 1] = body
    client:send(table.concat(parts, '\r\n'))
end

-- Write one SSE `message` event carrying `data` to `client`.
local function send_sse_event(client, data)
    client:send('event: message\r\ndata: ' .. data .. '\r\n\r\n')
end

-- ---- Streamable HTTP transport (MCP issue #16) ----
--
-- select()-based single-thread event loop. All sockets non-blocking.
-- Per-connection FSM: reading_head → reading_body → dispatching → writing | sse_open.
--
-- Session model: each session has a Mcp-Session-Id; at most one open
-- SSE stream (the GET /mcp connection). Server-initiated requests
-- (sampling, roots — issues #9/#10) ride on the SSE stream and await
-- client responses via subsequent POSTs.
--
-- Queue routing:
--   self._notify_queue (global): list_changed, log messages → fans out
--     to ALL open sse_conn (broadcast).
--   sess.notify_q (per-session): server-initiated requests → only that
--     session's sse_conn.
--
-- write_buf discipline: append-only via `..`; consume via :sub(offset+1)
-- after partial-send. NEVER reorder or rewrite past bytes.

local READ_BUF_CAP = 64 * 1024          -- 64 KiB for header section
local BODY_CAP = 8 * 1024 * 1024        -- 8 MiB for request body
local WRITE_BUF_CAP = 1 * 1024 * 1024   -- 1 MiB per-conn write buffer
local HEARTBEAT_SEC = 30
local SESSION_IDLE_SEC = 60
local SELECT_TIMEOUT = 0.1

-- Mint a session id. Prefers 16 bytes from /dev/urandom rendered as 32 hex
-- characters, so ids are not guessable. Where that device is unavailable
-- the original "<time>-<random>" form is used; that form relies on
-- math.random, whose seeding is not visible in this part of the file.
local function _new_session_id()
    local f = io.open("/dev/urandom", "rb")
    if f then
        local bytes = f:read(16)
        f:close()
        if bytes and #bytes == 16 then
            -- Parentheses drop gsub's second return value (the match count).
            return (bytes:gsub(".", function(c)
                return string.format("%02x", c:byte())
            end))
        end
    end
    return string.format("%d-%09d", os.time(), math.random(0, 999999999))
end

-- "HTTP/1.1 <status>\r\n"
local function _http_status_line(status)
    return "HTTP/1.1 " .. status .. "\r\n"
end

-- Serialise a header table as "K: V\r\n" lines followed by the blank line
-- that ends the header section. Header order follows pairs() and is
-- therefore unspecified.
local function _http_header_block(headers)
    local parts = {}
    for k, v in pairs(headers) do
        parts[#parts + 1] = k .. ": " .. v
    end
    parts[#parts + 1] = "" -- blank line
    parts[#parts + 1] = "" -- trailing CRLF
    return table.concat(parts, "\r\n")
end

-- Append `s` to the connection's write buffer. Returns true on success;
-- if the per-connection cap would be exceeded, marks the connection
-- "closing" and returns false without appending.
local function _queue_write(conn, s)
    -- Append-only. If cap exceeded, evict the connection.
    if #conn.write_buf + #s > WRITE_BUF_CAP then
        conn.state = "closing"
        return false
    end
    conn.write_buf = conn.write_buf .. s
    return true
end

-- Build a complete one-shot HTTP response string. Content-Length and
-- Connection: close are always set; Mcp-Session-Id is added when
-- `session_id` is given. Note: writes into the `headers` table it is given.
local function _build_http_response(status, headers, body, session_id)
    headers = headers or {}
    headers["Content-Length"] = tostring(#body)
    headers["Connection"] = "close"
    if session_id then headers["Mcp-Session-Id"] = session_id end
    return _http_status_line(status) .. _http_header_block(headers) .. body
end

-- Build the status line + headers that open an SSE stream (no body).
local function _build_sse_headers(session_id)
    local h = {
        ["Content-Type"] = "text/event-stream",
        ["Cache-Control"] = "no-cache",
        ["Connection"] = "keep-alive",
        ["Access-Control-Allow-Origin"] = "*",
    }
    if session_id then h["Mcp-Session-Id"] = session_id end
    return _http_status_line("200 OK") .. _http_header_block(h)
end

-- Format a JSON-RPC payload as one SSE message event.
local function _sse_event(payload_str)
    return "event: message\r\ndata: " .. payload_str .. "\r\n\r\n"
end

-- Format a server-initiated JSON-RPC request from the notification queue
-- entry table { jsonrpc, id?, method, params? }.
local function _encode_notify(entry)
    return json.encode(entry)
end

-- ---- Per-connection FSM helpers ----

-- Try to parse the request head out of conn.buf.
-- Returns:
--   false        head not complete yet (no blank line seen)
--   true         parsed; conn.method/path/version/headers/body_remain set
--                and conn.buf left holding any body bytes already read
--   nil, errmsg  malformed request
local function _parse_request_head(conn)
    -- Look for \r\n\r\n. If found, parse request line + headers.
    local sep = conn.buf:find("\r\n\r\n", 1, true)
    if not sep then return false end -- not complete yet

    local head = conn.buf:sub(1, sep - 1)
    conn.buf = conn.buf:sub(sep + 4) -- preserve any body bytes already buffered

    local lines = {}
    for line in head:gmatch("[^\r\n]+") do
        lines[#lines + 1] = line
    end
    if #lines == 0 then return nil, "empty head" end

    local method, path, version = lines[1]:match("^(%S+)%s+(%S+)%s+(%S+)")
    if not method then return nil, "bad request line" end
    conn.method, conn.path, conn.version = method, path, version

    local headers = {}
    for i = 2, #lines do
        local k, v = lines[i]:match("^(%S+):%s*(.*)")
        if k then headers[k:lower()] = v end
    end
    conn.headers = headers

    -- Content-Length must be a non-negative decimal integer. Fractional,
    -- negative or hex values would desynchronise the body state machine
    -- (body_remain would never reach exactly 0), so reject them here.
    local raw_length = headers["content-length"]
    if raw_length == nil then
        conn.body_remain = 0
    else
        local digits = raw_length:match("^%s*(%d+)%s*$")
        if not digits then return nil, "bad content-length" end
        conn.body_remain = tonumber(digits)
    end
    if conn.body_remain > BODY_CAP then
        return nil, "body too large"
    end
    return true
end

-- Return true when the request may proceed: no token is configured, the
-- method is OPTIONS (CORS preflight), or the Authorization header carries
-- the configured bearer token. The scheme name is matched
-- case-insensitively, as HTTP auth schemes are.
local function _check_auth(self, conn)
    if not self._auth_token then return true end
    if conn.method == "OPTIONS" then return true end
    local auth = conn.headers["authorization"] or ""
    local scheme, token = auth:match("^(%a+)%s+(.+)$")
    if not scheme or scheme:lower() ~= "bearer" then return false end
    return token == self._auth_token
end

-- ---- Session lookup / create ----

-- Resolve session by id. Returns the session table on success, or
-- (nil, "unknown") if `sid` is non-nil but no such session exists
-- (spec: 400/404 — caller decides). With nil `sid` (sessionless POST,
-- backwards compat), auto-issues a fresh session.
local function _resolve_session(self, sid)
    if sid then
        local sess = self._sessions[sid]
        if not sess then return nil, "unknown" end
        sess.last_activity = os.time()
        return sess
    end
    local new_id = _new_session_id()
    self._sessions[new_id] = {
        id = new_id,
        sse_conn = nil,
        pending = {},   -- req_id → on_response
        notify_q = {},  -- per-session, server-initiated requests
        created = os.time(),
        last_activity = os.time(),
    }
    return self._sessions[new_id]
end

-- For `initialize` specifically, always mint a new session id regardless
-- of any client-provided header (the spec lets the server choose).
-- Mint a new session record, register it in self._sessions under a fresh
-- id obtained from _new_session_id(), and return the record.
local function _create_session(self)
    local new_id = _new_session_id()
    self._sessions[new_id] = {
        id = new_id,
        sse_conn = nil,
        pending = {},
        notify_q = {},
        created = os.time(),
        last_activity = os.time(),
    }
    return self._sessions[new_id]
end

-- Scan all sessions for a pending server-initiated request matching id.
-- Returns (session, callback) or nil.
local function _find_pending(self, req_id)
    for _, sess in pairs(self._sessions) do
        local cb = sess.pending[req_id]
        if cb then return sess, cb end
    end
    return nil
end

-- Remove a session and every per-session record keyed by its id. Without
-- the roots-cache cleanup, each ended session would leave its cached roots
-- behind for the lifetime of the process.
local function _forget_session(self, sid)
    self._sessions[sid] = nil
    if self._roots_cache then
        self._roots_cache[sid] = nil
    end
end

-- Placeholder callback stored when server_request() is given no
-- on_response, so the client's reply is still matched and consumed.
local function _noop_response() end

-- ---- Dispatch a fully-parsed POST body ----
-- Returns the complete HTTP response (string) to write back on `conn`.
local function _dispatch_post(self, conn)
    local body = conn.body
    if body == "" then
        return _build_http_response("400 Bad Request", {
            ["Content-Type"] = "application/json",
            ["Access-Control-Allow-Origin"] = "*"
        }, jsonrpc_error(nil, -32700, "Empty body"), nil)
    end

    local ok, rpc_req = pcall(json.decode, body)
    if not ok then
        return _build_http_response("400 Bad Request", {
            ["Content-Type"] = "application/json",
            ["Access-Control-Allow-Origin"] = "*"
        }, jsonrpc_error(nil, -32700, "Parse error"), nil)
    end

    -- Valid JSON that is not an object/array (e.g. `42`, `"x"`, `null`)
    -- cannot be a JSON-RPC message. Reject it here: indexing it below would
    -- raise, and the caller would drop the connection without any reply.
    if type(rpc_req) ~= "table" then
        return _build_http_response("400 Bad Request", {
            ["Content-Type"] = "application/json",
            ["Access-Control-Allow-Origin"] = "*"
        }, jsonrpc_error(nil, -32600, "Invalid Request"), nil)
    end

    -- Server-initiated response routing: if id matches a pending
    -- server-initiated request in ANY session, this POST is a response.
    if rpc_req.id and (rpc_req.result ~= nil or rpc_req.error ~= nil) and not rpc_req.method then
        local sess, cb = _find_pending(self, rpc_req.id)
        if sess then
            sess.pending[rpc_req.id] = nil
            -- A failing callback must not take the connection down.
            pcall(cb, rpc_req)
            return _build_http_response("202 Accepted", {
                ["Content-Type"] = "application/json",
                ["Access-Control-Allow-Origin"] = "*"
            }, "", sess.id)
        end
    end

    -- Session resolution (deferred from header-parse time so we can detect
    -- `initialize`). Rules:
    --   - `initialize`: always mint a fresh session, ignoring any client sid
    --   - other methods, sid absent: auto-issue (backwards compat)
    --   - other methods, sid known: use it
    --   - other methods, sid unknown: 404
    local sess
    if rpc_req.method == "initialize" then
        sess = _create_session(self)
    else
        local s = _resolve_session(self, conn.requested_sid)
        if not s then
            return _build_http_response("404 Not Found", {
                ["Content-Type"] = "text/plain",
                ["Access-Control-Allow-Origin"] = "*"
            }, "Session not found: " .. tostring(conn.requested_sid), nil)
        end
        sess = s
    end
    conn.session_id = sess.id

    -- Stash session id on the request so handle_request → tools/call can
    -- expose it to handler ctx (issue #9 — sampling needs to know which
    -- session to push the request onto).
    rpc_req._session_id = sess.id

    -- Normal client request / notification. Dispatch via handle_request.
    local response = self:handle_request(rpc_req)
    if not response then
        -- Notification → 202 Accepted, no body.
        return _build_http_response("202 Accepted", {
            ["Content-Type"] = "application/json",
            ["Access-Control-Allow-Origin"] = "*"
        }, "", conn.session_id)
    end

    -- If client accepts SSE, respond as a single-event SSE stream.
    -- Otherwise plain JSON body.
    local accept = conn.headers["accept"] or ""
    if accept:find("text/event%-stream") then
        local hdrs = _build_sse_headers(conn.session_id)
        return hdrs .. _sse_event(response)
    end
    return _build_http_response("200 OK", {
        ["Content-Type"] = "application/json",
        ["Access-Control-Allow-Origin"] = "*"
    }, response, conn.session_id)
end

-- Build the CORS preflight reply. When the client sent
-- Access-Control-Request-Headers, that list is echoed back with
-- Authorization appended; otherwise a fixed default list is used.
local function _dispatch_options(conn)
    local acrh = conn.headers["access-control-request-headers"]
    return _build_http_response("204 No Content", {
        ["Access-Control-Allow-Origin"] = "*",
        ["Access-Control-Allow-Methods"] = "GET, POST, DELETE, OPTIONS",
        -- '*' does NOT cover Authorization per CORS spec; list explicitly.
        ["Access-Control-Allow-Headers"] = acrh and (acrh .. ", Authorization")
            or "Content-Type, Accept, Authorization, Mcp-Session-Id, Mcp-Protocol-Version",
        ["Access-Control-Max-Age"] = "86400",
    }, "", conn.session_id)
end

-- Terminate the session named by conn.session_id: mark its SSE connection
-- (if any) for closing, then drop the session and its per-session caches.
-- Returns 404 when the id is missing or unknown, 204 on success.
local function _dispatch_delete(self, conn)
    if not conn.session_id or not self._sessions[conn.session_id] then
        return _build_http_response("404 Not Found", {
            ["Content-Type"] = "text/plain",
            ["Access-Control-Allow-Origin"] = "*"
        }, "Session not found", nil)
    end
    local sess = self._sessions[conn.session_id]
    if sess.sse_conn then
        sess.sse_conn.state = "closing"
        sess.sse_conn = nil
    end
    _forget_session(self, conn.session_id)
    return _build_http_response("204 No Content", {
        ["Access-Control-Allow-Origin"] = "*"
    }, "", nil)
end

-- ---- Main loop helpers ----

-- Pull whatever bytes are available on conn.sock and advance the
-- connection state machine as far as possible:
--   reading_head → (reading_body) → dispatching → writing | sse_open
-- Error replies are placed in conn.write_buf with state "writing".
local function _conn_read(self, conn)
    local chunk, err, partial = conn.sock:receive(8192)
    local data = chunk or partial or ""
    if data ~= "" then
        if #conn.buf + #data > READ_BUF_CAP and conn.state == "reading_head" then
            -- Header section too large.
            conn.write_buf = _build_http_response("431 Request Header Fields Too Large", {
                ["Content-Type"] = "text/plain"
            }, "Headers too large", nil)
            conn.state = "writing"
            return
        end
        conn.buf = conn.buf .. data
    end
    if err == "closed" then
        conn.state = "closing"
        return
    end

    -- Advance FSM.
    if conn.state == "reading_head" then
        local ok, perr = _parse_request_head(conn)
        if perr then
            conn.write_buf = _build_http_response("400 Bad Request", {
                ["Content-Type"] = "text/plain"
            }, tostring(perr), nil)
            conn.state = "writing"
            return
        end
        if not ok then return end -- still waiting for full head

        -- Headers parsed: auth check, session resolve.
        if not _check_auth(self, conn) then
            conn.write_buf = _build_http_response("401 Unauthorized", {
                ["Content-Type"] = "application/json",
                ["WWW-Authenticate"] = "Bearer"
            }, '{"error":"unauthorized"}', nil)
            conn.state = "writing"
            return
        end

        -- Session resolution happens at dispatch time (after body is read),
        -- because `initialize` always mints a fresh session and we need to
        -- know the method to distinguish 404 (unknown id, non-initialize)
        -- from "auto-issue on initialize". Just stash the requested id.
        conn.requested_sid = conn.headers["mcp-session-id"]

        if conn.body_remain > 0 then
            -- Any body bytes already in conn.buf land here.
            conn.body = conn.buf:sub(1, conn.body_remain)
            conn.buf = conn.buf:sub(#conn.body + 1)
            conn.body_remain = conn.body_remain - #conn.body
            conn.state = (conn.body_remain == 0) and "dispatching" or "reading_body"
        else
            conn.state = "dispatching"
        end
    end

    if conn.state == "reading_body" then
        if #conn.buf > 0 then
            -- Never take more than the body still owes us.
            local take = math.min(conn.body_remain, #conn.buf)
            conn.body = conn.body .. conn.buf:sub(1, take)
            conn.buf = conn.buf:sub(take + 1)
            conn.body_remain = conn.body_remain - take
        end
        if conn.body_remain == 0 then
            conn.state = "dispatching"
        end
    end

    if conn.state == "dispatching" then
        local path = conn.path or ""
        if not path:match("^/mcp") then
            conn.write_buf = _build_http_response("404 Not Found", {
                ["Content-Type"] = "text/plain",
                ["Access-Control-Allow-Origin"] = "*"
            }, "Not Found", nil)
            conn.state = "writing"
            return
        end

        if conn.method == "OPTIONS" then
            conn.write_buf = _dispatch_options(conn)
            conn.state = "writing"
        elseif conn.method == "DELETE" then
            -- Resolve session: no auto-issue for DELETE; unknown sid → 404.
            if not conn.requested_sid then
                conn.write_buf = _build_http_response("400 Bad Request", {
                    ["Content-Type"] = "text/plain",
                    ["Access-Control-Allow-Origin"] = "*"
                }, "Mcp-Session-Id required for DELETE", nil)
                conn.state = "writing"
                return
            end
            conn.session_id = conn.requested_sid
            conn.write_buf = _dispatch_delete(self, conn)
            conn.state = "writing"
        elseif conn.method == "GET" then
            -- Resolve session (auto-issue if missing, 404 if unknown).
            local sess = _resolve_session(self, conn.requested_sid)
            if not sess then
                conn.write_buf = _build_http_response("404 Not Found", {
                    ["Content-Type"] = "text/plain",
                    ["Access-Control-Allow-Origin"] = "*"
                }, "Session not found: " .. tostring(conn.requested_sid), nil)
                conn.state = "writing"
                return
            end
            conn.session_id = sess.id

            -- Persistent SSE stream. Enforce one per session.
            if sess.sse_conn and sess.sse_conn ~= conn then
                conn.write_buf = _build_http_response("409 Conflict", {
                    ["Content-Type"] = "text/plain",
                    ["Access-Control-Allow-Origin"] = "*"
                }, "Session already has an open SSE stream", nil)
                conn.state = "writing"
                return
            end
            sess.sse_conn = conn
            local hdrs = _build_sse_headers(conn.session_id)
            -- For backwards compat with the old SDK probe shape, emit a one-shot
            -- 'endpoint' event so clients can self-discover; modern clients
            -- ignore it and just await message events.
            local endpoint_data = json.encode({
                endpoint = "/mcp",
                sessionId = conn.session_id,
            })
            conn.write_buf = hdrs .. "event: endpoint\r\ndata: " .. endpoint_data .. "\r\n\r\n"
            conn.state = "sse_open"
            conn.last_heart = os.time()
        elseif conn.method == "POST" then
            conn.write_buf = _dispatch_post(self, conn)
            conn.state = "writing"
        else
            conn.write_buf = _build_http_response("405 Method Not Allowed", {
                ["Content-Type"] = "text/plain",
                ["Allow"] = "GET, POST, DELETE, OPTIONS"
            }, "Method Not Allowed", nil)
            conn.state = "writing"
        end
    end
end

-- Flush as much of conn.write_buf as the socket accepts. A connection in
-- state "writing" moves to "closing" once its buffer is empty; other
-- states (e.g. "sse_open") are left untouched.
local function _conn_write(conn)
    if conn.write_buf == "" then
        if conn.state == "writing" then
            conn.state = "closing"
        end
        return
    end
    local sent, err, sent_partial = conn.sock:send(conn.write_buf)
    if err == "closed" then
        conn.state = "closing"
        return
    end
    -- luasocket: on success returns last_byte_index_sent; on partial/timeout
    -- returns (nil, "timeout"|"closed", last_byte_index_sent_so_far). The
    -- second-return numeric is an absolute index into the original string.
    local idx = sent or sent_partial or 0
    if idx > 0 then
        conn.write_buf = conn.write_buf:sub(idx + 1)
    end
    if conn.write_buf == "" and conn.state == "writing" then
        conn.state = "closing"
    end
end

-- Move queued notifications onto SSE connections: the global queue is
-- broadcast to every open stream, per-session queues go to their own
-- stream only.
local function _drain_notifications(self)
    -- Global broadcast queue: fan out to every open sse_conn.
    while #self._notify_queue > 0 do
        local entry = table.remove(self._notify_queue, 1)
        local payload = _sse_event(_encode_notify(entry))
        for _, sess in pairs(self._sessions) do
            if sess.sse_conn and sess.sse_conn.state == "sse_open" then
                _queue_write(sess.sse_conn, payload)
            end
        end
    end
    -- Per-session queues: route to that session only.
    for _, sess in pairs(self._sessions) do
        while #sess.notify_q > 0 do
            if not (sess.sse_conn and sess.sse_conn.state == "sse_open") then
                break -- no live SSE; leave queued (or expire policy could drop)
            end
            local entry = table.remove(sess.notify_q, 1)
            _queue_write(sess.sse_conn, _sse_event(_encode_notify(entry)))
        end
    end
end

-- Periodic maintenance: emit SSE heartbeats and expire idle sessions.
local function _heartbeat_tick(self)
    local now = os.time()
    -- Heartbeats on open SSE conns.
    for _, sess in pairs(self._sessions) do
        local conn = sess.sse_conn
        if conn and conn.state == "sse_open" and now - conn.last_heart >= HEARTBEAT_SEC then
            _queue_write(conn, ": heartbeat\r\n\r\n")
            conn.last_heart = now
        end
    end
    -- Idle session expiry: no SSE + no activity for SESSION_IDLE_SEC.
    -- Clearing existing keys while traversing with pairs() is permitted.
    for sid, sess in pairs(self._sessions) do
        if sess.sse_conn == nil and now - sess.last_activity > SESSION_IDLE_SEC then
            _forget_session(self, sid)
        end
    end
end

-- ---- Public: server-initiated request (for sampling/roots/etc.) ----
-- Enqueues a JSON-RPC request on the session's SSE stream. The callback
-- fires when the client POSTs back the response (matched by id).
-- Returns (true, request_id) when queued, or (false, err) when the session
-- is unknown or has no SSE stream attached.
function lmcp:server_request(session_id, method, params, on_response)
    local sess = self._sessions[session_id]
    if not sess or not sess.sse_conn then
        return false, "no live SSE stream for session " .. tostring(session_id)
    end
    self._server_req_id = (self._server_req_id or 0) + 1
    local id = "srv-" .. self._server_req_id
    -- Always register something: with a nil callback the reply would not be
    -- recognised as a response and would be dispatched as a client request.
    sess.pending[id] = on_response or _noop_response
    local msg = { jsonrpc = JSONRPC, id = id, method = method }
    -- Omit params if nil OR an empty Lua table (an empty table would
    -- JSON-encode as [] rather than {}). Real params with at least one key
    -- encode correctly as an object.
    if params ~= nil and (type(params) ~= "table" or next(params)) then
        msg.params = params
    end
    sess.notify_q[#sess.notify_q + 1] = msg
    return true, id
end

-- Sampling (MCP issue #9): ask the client's LLM to generate text. Returns
-- (true, request_id) if dispatched, (false, err) otherwise. `on_response`
-- is called with the client's JSON-RPC response shape:
--   { result = { role, content = { type = "text", text = "..." }, model, stopReason? } }
--   or { error = { code, message } }.
--
-- Today this is fire-and-forget — tool handlers cannot block waiting for
-- the response in the single-threaded event loop (see follow-up #20).
-- A tool may kick off sampling and return immediately; the callback fires
-- when the client posts the response back.
--
-- opts shape (matches MCP spec):
--   messages          = { { role = "user"|"assistant", content = {type, text} }, ... }
--   modelPreferences? = { hints?, intelligencePriority?, ... }
--   systemPrompt?     = string
--   includeContext?   = "none"|"thisServer"|"allServers"
--   temperature?      = number
--   maxTokens         = integer (required)
--   stopSequences?    = list of strings
function lmcp:sample(session_id, opts, on_response)
    if not self._client_caps.sampling then
        return false, "client did not advertise sampling capability"
    end

    -- Short-circuits left to right, so opts is only indexed once it is
    -- known to be a table.
    local well_formed = type(opts) == "table"
        and type(opts.messages) == "table"
        and type(opts.maxTokens) == "number"
    if not well_formed then
        return false, "sample: opts.messages (table) and opts.maxTokens (number) required"
    end

    return self:server_request(session_id, "sampling/createMessage", opts, on_response)
end

-- Roots (MCP issue #10): ask the client which filesystem/URL roots are
-- in scope for this session. Asynchronous, like sample(): the optional
-- on_fetched(roots_list, err) runs once the client has replied. A
-- successful reply is also stored in self._roots_cache[session_id] so it
-- can be read back synchronously later.
--
-- Client→server `notifications/roots/list_changed` invalidates the cache;
-- next call to :roots() re-fetches.
function lmcp:roots(session_id, on_fetched)
    if not self._client_caps.roots then
        return false, "client did not advertise roots capability"
    end

    -- Invoked with the client's JSON-RPC reply.
    local function on_reply(reply)
        if reply.error then
            if on_fetched then
                on_fetched(nil, reply.error.message or "rpc error")
            end
            return
        end

        local fetched_roots = reply.result and reply.result.roots or {}
        self._roots_cache[session_id] = {
            roots = fetched_roots,
            fetched = os.time(),
        }
        if on_fetched then
            on_fetched(fetched_roots, nil)
        end
    end

    return self:server_request(session_id, "roots/list", {}, on_reply)
end

-- Synchronous lookup of the cached roots. Returns the list (possibly
-- empty) if previously fetched, or nil if no :roots() call has completed
-- for this session yet.
function lmcp:roots_cached(session_id)
    local entry = self._roots_cache[session_id]
    -- `entry.roots` is always a table when present, so and/or is safe here.
    return entry and entry.roots or nil
end

-- True when `candidate` is `root` itself or lies beneath it. A bare string
-- prefix test is not sufficient: root "/srv/app" must not be treated as
-- containing "/srv/app-secrets/key". The character following the root
-- prefix therefore has to be a path separator (unless the root already
-- ends in one).
local function _path_within_root(root, candidate)
    if candidate:sub(1, #root) ~= root then
        return false
    end
    if #candidate == #root then
        return true
    end
    local tail = root:sub(-1)
    if tail == "/" or tail == "\\" then
        return true
    end
    local boundary = candidate:sub(#root + 1, #root + 1)
    return boundary == "/" or boundary == "\\"
end

-- Synchronous helper: is `path` (a file:// URI or absolute filesystem
-- path) within any cached root for this session? Returns:
--   true  → path equals, or is nested under, at least one root
--   false → cache is populated but path is outside all roots
--   nil   → no roots cached yet; caller should :roots() first
-- NOTE(review): the comparison is purely textual — "." / ".." segments and
-- percent-encoding are not normalised; callers handling untrusted paths
-- should canonicalise first.
function lmcp:path_in_roots(session_id, path)
    local roots = self:roots_cached(session_id)
    if not roots then return nil end
    -- Normalise: treat file:// URIs and bare paths uniformly. The parens
    -- discard gsub's second return value (the substitution count).
    local norm = (path:gsub("^file://", ""))
    for _, r in ipairs(roots) do
        local root_path = ((r.uri or ""):gsub("^file://", ""))
        if root_path ~= "" and _path_within_root(root_path, norm) then
            return true
        end
    end
    return false
end

-- Run the HTTP/SSE transport: bind self.host:self.port, then loop forever
-- multiplexing all connections with socket.select. Never returns; raises
-- (via assert) if the listening socket cannot be bound.
function lmcp:run()
    local socket = require("socket")
    local server_sock = assert(socket.bind(self.host, self.port))
    server_sock:settimeout(0)

    self._conns = {}
    self._sessions = self._sessions or {}

    local addr, port = server_sock:getsockname()
    io.stderr:write(string.format("lmcp: %s v%s listening on %s:%d/mcp\n",
        self.name, self.version, addr, port))

    while true do
        -- Build select watch lists.
        local reads, writes = { server_sock }, {}
        for sock, conn in pairs(self._conns) do
            if conn.state == "reading_head" or conn.state == "reading_body"
                or conn.state == "sse_open" then
                reads[#reads + 1] = sock
            end
            if conn.write_buf ~= "" then
                writes[#writes + 1] = sock
            end
        end

        local ready_r, ready_w = socket.select(reads, writes, SELECT_TIMEOUT)

        for _, sock in ipairs(ready_r or {}) do
            if sock == server_sock then
                local new_sock, aerr = server_sock:accept()
                if new_sock then
                    new_sock:settimeout(0)
                    self._conns[new_sock] = {
                        sock = new_sock,
                        state = "reading_head",
                        buf = "",
                        body = "",
                        headers = {},
                        method = nil,
                        path = nil,
                        body_remain = 0,
                        write_buf = "",
                        session_id = nil,
                        last_heart = os.time(),
                    }
                elseif aerr and aerr ~= "timeout" then
                    io.stderr:write("lmcp: accept error: " .. tostring(aerr) .. "\n")
                end
            else
                local conn = self._conns[sock]
                if conn then
                    -- A handler error must only cost this one connection.
                    local ok, rerr = pcall(_conn_read, self, conn)
                    if not ok then
                        io.stderr:write("lmcp: read error: " .. tostring(rerr) .. "\n")
                        conn.state = "closing"
                    end
                end
            end
        end

        for _, sock in ipairs(ready_w or {}) do
            local conn = self._conns[sock]
            if conn then
                local ok, werr = pcall(_conn_write, conn)
                if not ok then
                    io.stderr:write("lmcp: write error: " .. tostring(werr) .. "\n")
                    conn.state = "closing"
                end
            end
        end

        -- Per-tick maintenance.
        _drain_notifications(self)
        _heartbeat_tick(self)

        -- After draining, attempt immediate writes on conns whose write_buf
        -- just got bytes (so list_changed / heartbeat appears within one tick).
        for _, conn in pairs(self._conns) do
            if conn.write_buf ~= "" and conn.state ~= "closing" then
                pcall(_conn_write, conn)
            end
        end

        -- Sweep closing conns. Removing existing keys during pairs() is allowed.
        for sock, conn in pairs(self._conns) do
            if conn.state == "closing" then
                -- Detach from session if it was the sse_conn.
                if conn.session_id then
                    local sess = self._sessions[conn.session_id]
                    if sess and sess.sse_conn == conn then
                        sess.sse_conn = nil
                        sess.last_activity = os.time()
                    end
                end
                pcall(sock.close, sock)
                self._conns[sock] = nil
            end
        end
    end
end

-- ---- stdio transport (MCP issue #15) ----
-- Line-delimited JSON-RPC: one message per line on stdin, one response
-- line per request on stdout, diagnostics on stderr. EOF closes cleanly.
-- Does NOT require luasocket — handle_request is transport-agnostic.
-- Bearer auth bypassed: stdio means the parent process is the trust
-- boundary.
function lmcp:run_stdio()
    -- Default stdout buffering on a pipe is "full" — a response would
    -- sit in the buffer until it fills, deadlocking the MCP client.
    -- Set "no" once + per-write flush belt-and-braces.
    io.stdout:setvbuf("no")
    io.stderr:write(string.format(
        "lmcp: %s v%s serving stdio\n", self.name, self.version))

    for line in io.stdin:lines() do
        if line ~= "" then
            -- pcall the whole body so a transient error (malformed JSON,
            -- handler bug, exotic pipe state) doesn't crash the loop.
            local body_ok, body_err = pcall(function()
                local parse_ok, req = pcall(json.decode, line)
                local response
                if not parse_ok then
                    response = jsonrpc_error(nil, -32700, "Parse error")
                else
                    response = self:handle_request(req)
                end
                if type(response) == "string" then
                    io.stdout:write(response, "\n")
                    io.stdout:flush()
                elseif response ~= nil then
                    -- nil means "notification, nothing to send"; any other
                    -- non-string is a handler bug and is reported, not sent.
                    io.stderr:write(
                        "lmcp: handler returned non-string (" .. type(response) .. "); dropped\n")
                end
            end)
            if not body_ok then
                io.stderr:write("lmcp: stdio loop error: " .. tostring(body_err) .. "\n")
            end
        end
    end
end

return lmcp