#!/usr/bin/env lua
-- lmcp hub — fleet-wide MCP broker.
--
-- One MCP endpoint that fans out to every lmcp-backed host, with an SSH
-- fallback for hosts whose lmcp is temporarily down (or not installed).
-- Exposes a small set of "remote_*" tools that all take a `host` arg
-- naming the target in the backend registry.
--
-- Registry file (default /opt/herding/etc/hub-backends.conf):
--   # name       ssh_host             lmcp_url                             token
--   boltzmann    boltzmann.fritz.box  http://boltzmann.fritz.box:8080/mcp
--   tesla        tesla                http://tesla.fritz.box:8080/mcp      -
--   broglie      -                    http://broglie.fritz.box:8080/mcp    -
--
-- Use `-` for "not applicable". Lines starting with # are comments.
-- Missing `ssh_host`: no ssh fallback available. Missing `lmcp_url`: ssh-only.
--
-- SPDX-License-Identifier: MIT

local dir = arg[0]:match('(.*/)') or './'
package.path = package.path .. ';' .. dir .. '?.lua'

local lmcp = require('lmcp')
local json = require('json')
local socket = require('socket')

-- ---- Backend registry ---------------------------------------------------

-- Read a numeric setting from the environment. Falls back to `default`
-- when the variable is unset, empty, or not parseable as a number, so a
-- typo in the environment cannot leave a nil constant behind.
local function env_number(name, default)
    return tonumber(os.getenv(name) or "") or default
end

local CONF_PATH          = os.getenv("LMCP_HUB_BACKENDS") or "/opt/herding/etc/hub-backends.conf"
local PROBE_TTL_UP       = env_number("LMCP_HUB_PROBE_TTL_UP", 30)
local PROBE_TTL_DOWN_MIN = env_number("LMCP_HUB_PROBE_TTL_DOWN_MIN", 60)
local PROBE_TTL_DOWN_MAX = env_number("LMCP_HUB_PROBE_TTL_DOWN_MAX", 900)
local PROBE_BUDGET       = env_number("LMCP_HUB_PROBE_BUDGET", 3)
local LMCP_TIMEOUT       = env_number("LMCP_HUB_LMCP_TIMEOUT", 6)
-- The ssh timeouts are formatted with %d below, which rejects numbers
-- without an integer representation; round fractional values up.
local SSH_TIMEOUT        = math.ceil(env_number("LMCP_HUB_SSH_TIMEOUT", 10))
local SSH_HARD_TIMEOUT   = math.ceil(env_number("LMCP_HUB_SSH_HARD_TIMEOUT", 30))
local LOG_REQUESTS       = (os.getenv("LMCP_HUB_LOG") or "1") ~= "0"

-- Write one timestamped, printf-style line to stderr.
-- No-op when LMCP_HUB_LOG=0.
local function logreq(fmt, ...)
    if not LOG_REQUESTS then return end
    io.stderr:write(string.format("[hub %s] " .. fmt .. "\n", os.date("%H:%M:%S"), ...))
    io.stderr:flush()
end

-- Timestamp in seconds used for elapsed-time measurements. Uses
-- socket.gettime when the call succeeds, os.time otherwise.
local function monotonic()
    local ok, s = pcall(socket.gettime)
    return ok and s or os.time()
end

local backends = {}  -- name -> { name, ssh_host, lmcp_url, token }
local status = {}    -- name -> { up=bool, via="lmcp"|"ssh"|nil, checked=t, err=... }

-- (Re)load the backend registry from CONF_PATH into `backends`.
-- A missing file is reported on stderr and leaves `backends` untouched.
-- Lines need at least two whitespace-separated fields; "-" means "unset".
local function load_registry()
    local f = io.open(CONF_PATH, "r")
    if not f then
        io.stderr:write("hub: no backend registry at " .. CONF_PATH .. "\n")
        return
    end
    backends = {}
    for line in f:lines() do
        line = line:gsub("^%s+", ""):gsub("%s+$", "")
        if line ~= "" and not line:match("^#") then
            local parts = {}
            for p in line:gmatch("%S+") do parts[#parts+1] = p end
            if #parts >= 2 then
                local name = parts[1]
                -- A missing column is nil, and "-" is mapped to nil as well.
                local ssh_host = (parts[2] ~= "-" and parts[2]) or nil
                local lmcp_url = (parts[3] ~= "-" and parts[3]) or nil
                local token    = (parts[4] ~= "-" and parts[4]) or nil
                backends[name] = {
                    name = name,
                    ssh_host = ssh_host,
                    lmcp_url = lmcp_url,
                    token = token,
                }
            end
        end
    end
    f:close()
end

-- ---- Outbound HTTP client (plain, no TLS) ------------------------------

-- Split "scheme://host[:port][/path]" into its components.
-- Returns { scheme, host, port, path } or nil, "bad url".
-- The port defaults to 443 for https and 80 otherwise; the path to "/".
local function parse_url(url)
    local scheme, host, port, path = url:match("^(%w+)://([^:/]+):?(%d*)(/?.*)$")
    if not scheme then return nil, "bad url" end
    port = tonumber(port) or (scheme == "https" and 443 or 80)
    if path == "" then path = "/" end
    return { scheme = scheme, host = host, port = port, path = path }
end

-- POST `body` (an already-encoded JSON string) to `url` and decode the reply.
--   token   : optional bearer token for the Authorization header
--   timeout : socket timeout in seconds (default 6)
-- Returns the decoded JSON value, or nil plus an error string.
-- NOTE(review): the reply is read until the peer closes and the body is
-- taken verbatim after the header block, so a chunked reply would not be
-- decoded — confirm backends always send a plain body.
local function http_post_json(url, body, token, timeout)
    local u, err = parse_url(url)
    if not u then return nil, err end
    if u.scheme ~= "http" then return nil, "hub only speaks http to backends" end

    local sock, cerr = socket.tcp()
    if not sock then return nil, "socket: " .. tostring(cerr) end
    sock:settimeout(timeout or 6)
    local ok, e = sock:connect(u.host, u.port)
    if not ok then sock:close(); return nil, "connect: " .. tostring(e) end

    local headers = {
        "POST " .. u.path .. " HTTP/1.1",
        "Host: " .. u.host .. ":" .. u.port,
        "Content-Type: application/json",
        "Content-Length: " .. tostring(#body),
        "Connection: close",
    }
    if token then headers[#headers+1] = "Authorization: Bearer " .. token end
    local req = table.concat(headers, "\r\n") .. "\r\n\r\n" .. body

    local _, se = sock:send(req)
    if se then sock:close(); return nil, "send: " .. se end

    -- Read until the peer closes; keep any partial data delivered with the
    -- final (failed) receive.
    local chunks = {}
    while true do
        local data, rerr, partial = sock:receive(4096)
        if data then
            chunks[#chunks+1] = data
        else
            if partial and #partial > 0 then chunks[#chunks+1] = partial end
            if rerr == "timeout" then sock:close(); return nil, "timeout" end
            break
        end
    end
    sock:close()

    local resp = table.concat(chunks)
    local hend = resp:find("\r\n\r\n", 1, true)
    if not hend then return nil, "no body separator" end
    local status_line = resp:sub(1, resp:find("\r\n", 1, true) - 1)
    local status_code = tonumber(status_line:match("HTTP/[%d%.]+%s+(%d+)"))
    local body_str = resp:sub(hend + 4)
    if status_code ~= 200 then
        return nil, "http " .. tostring(status_code) .. ": " .. body_str:sub(1, 200)
    end

    -- Cover both decoder failure styles: raising, and returning nil.
    -- Either way the caller gets an error string, never a bare nil.
    local ok2, parsed = pcall(json.decode, body_str)
    if not ok2 or parsed == nil then
        return nil, "bad json: " .. body_str:sub(1, 200)
    end
    return parsed
end

-- Perform one JSON-RPC 2.0 call against a backend.
-- Returns the `result` member, or nil plus an error string. The error
-- string is always set on failure so callers can log it directly.
local function jsonrpc_call(url, token, method, params)
    local body = json.encode({
        jsonrpc = "2.0",
        id = os.time() * 1000 + math.random(1000, 9999),
        method = method,
        params = params or {},
    })
    local resp, err = http_post_json(url, body, token, LMCP_TIMEOUT)
    if not resp then return nil, err end
    -- A JSON-RPC response must be an object; a scalar would make the field
    -- accesses below raise or silently misbehave.
    if type(resp) ~= "table" then
        return nil, "rpc: malformed response"
    end
    if resp.error then
        local e = resp.error
        local msg = type(e) == "table" and e.message or nil
        return nil, "rpc: " .. tostring(msg or "unknown")
    end
    if resp.result == nil then
        return nil, "rpc: response has no result"
    end
    return resp.result
end

-- ---- SSH exec (shell-escape-free via bash -s on stdin) -----------------

-- Quote `s` as a single POSIX-shell word.
local function shell_quote(s)
    -- Single-quote wrap, replace embedded single quotes with '\''
    local escaped = tostring(s):gsub("'", "'\\''")
    return "'" .. escaped .. "'"
end

-- Write `content` to a fresh temporary file.
-- Returns the file path, or nil plus an error string.
local function tmpwrite(content)
    local path = os.tmpname()
    local f = io.open(path, "w")
    if not f then return nil, "tmpfile create failed" end
    local ok, werr = f:write(content)
    f:close()
    if not ok then
        -- Never hand a truncated script to the remote shell.
        os.remove(path)
        return nil, "tmpfile write failed: " .. tostring(werr)
    end
    return path
end

-- Run `script` on `ssh_host` through `ssh ... 'bash -s'`.
-- Returns output, nil, rc on completion (stdout and stderr are merged; rc
-- is the exit status recorded by the wrapper shell, -1 if unreadable), or
-- nil, err, rc when the temp file could not be written or the hard
-- timeout fired.
local function ssh_run_script(ssh_host, script)
    -- Write the script body locally, then ssh with stdin redirected from it.
    -- bash -s reads the script from stdin. Nothing inside `script` is subject
    -- to another shell round of expansion — this is the escape-free path.
    local spath, perr = tmpwrite(script)
    if not spath then return nil, perr, -1 end
    local outpath = spath .. ".out"
    -- Hard-cap wall time with GNU `timeout` — ssh's ConnectTimeout only
    -- bounds TCP connect, not the session. Without this, a half-dead sshd
    -- (auth stall, remote bash-s hang) locks the hub event loop indefinitely.
    local cmd = string.format(
        "timeout --kill-after=2 %d ssh -o ConnectTimeout=%d -o ServerAliveInterval=5 -o ServerAliveCountMax=2 -o BatchMode=yes -o StrictHostKeyChecking=accept-new %s 'bash -s' < %s > %s 2>&1; echo $? > %s.rc",
        SSH_HARD_TIMEOUT, SSH_TIMEOUT,
        shell_quote(ssh_host), shell_quote(spath), shell_quote(outpath), shell_quote(spath)
    )
    local t0 = monotonic()
    os.execute(cmd)
    local dt = monotonic() - t0

    local rc = -1
    local rc_f = io.open(spath .. ".rc", "r")
    if rc_f then
        local s = rc_f:read("*a") or ""
        s = s:match("^(%S+)") or ""
        rc = tonumber(s) or -1
        rc_f:close()
    end

    local out_f = io.open(outpath, "r")
    local output = out_f and out_f:read("*a") or ""
    if out_f then out_f:close() end
    os.remove(spath); os.remove(outpath); os.remove(spath .. ".rc")

    -- timeout(1) exits 124 on wall-clock expiry, 137 on SIGKILL. Surface it.
    if rc == 124 or rc == 137 then
        logreq("ssh HARD-TIMEOUT host=%s after=%.2fs rc=%d", ssh_host, dt, rc)
        return nil, string.format("ssh hard-timeout after %ds", SSH_HARD_TIMEOUT), rc
    end
    logreq("ssh host=%s elapsed=%.2fs rc=%d bytes=%d", ssh_host, dt, rc, #output)
    return output, nil, rc
end

-- ---- Health probe ------------------------------------------------------
--
-- Design notes:
--   - Probe is lmcp-only. SSH is NOT checked here: it's expensive (3-6s per
--     offline host) and the hub exists specifically to absorb lots of
--     offline hosts. Hosts with lmcp down but ssh up show as DOWN in the
--     health list, but actual remote_* calls still fall through to ssh
--     fallback correctly.
--   - Sticky DOWN cache with exponential backoff: a DOWN host is re-probed
--     at intervals that grow from PROBE_TTL_DOWN_MIN (default 60s) up to
--     PROBE_TTL_DOWN_MAX (default 900s). Prevents a sleeping fleet from
--     burning probe budget every health check.
--   - `remote_list_hosts` uses a parallel curl fan-out for all hosts at
--     once so wall-clock is bounded by PROBE_BUDGET (default 3s), not the
--     sum of per-host timeouts.

-- Decide whether a cached status entry may still be used.
--   s   : status entry (or nil when the host was never probed)
--   now : current time in the same clock as s.checked (os.time)
-- Returns true while the entry is younger than its TTL.
local function cache_fresh(s, now)
    if not s then return false end
    local ttl
    if s.up then
        ttl = PROBE_TTL_UP
    else
        -- down_streak is already 1 after the first failed probe, so the
        -- exponent is streak - 1: the first DOWN interval is exactly
        -- PROBE_TTL_DOWN_MIN and doubles from there up to PROBE_TTL_DOWN_MAX.
        local streak = math.max(1, s.down_streak or 1)
        ttl = math.min(PROBE_TTL_DOWN_MAX, PROBE_TTL_DOWN_MIN * (2 ^ (streak - 1)))
    end
    return (now - s.checked) < ttl
end

-- Record a probe outcome for `name` in `status` and return the new entry.
-- A successful probe resets down_streak to 0; a failed one increments the
-- previous streak (starting from 0 when there is no previous entry).
local function apply_probe_result(name, up, err, via, tool_count)
    local prev = status[name]
    local prev_streak = prev and prev.down_streak or 0
    local new_s = {
        checked = os.time(),
        up = up,
        via = up and (via or "lmcp") or nil,
        err = err,
        tool_count = tool_count,
        down_streak = up and 0 or (prev_streak + 1),
    }
    status[name] = new_s
    return new_s
end

-- Fallback single-host probe used only when a caller explicitly needs a
-- status refresh outside the parallel path (currently only
-- remote_list_hosts when a single host is looked up). lmcp-only.
--   force : bypass the cache and probe unconditionally
-- Returns a status entry; unknown hosts get { up = false, err = ... }
-- without being written to `status`.
local function probe(name, force)
    local b = backends[name]
    if not b then return { up = false, err = "unknown host" } end
    local s = status[name]
    if not force and cache_fresh(s, os.time()) then return s end

    if not b.lmcp_url then
        return apply_probe_result(name, false, "no lmcp url", nil, nil)
    end
    local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/list", {})
    -- Only a table-shaped result with a tools list counts as UP; anything
    -- else (including a scalar result) is reported as an lmcp failure.
    if type(result) == "table" and type(result.tools) == "table" then
        return apply_probe_result(name, true, nil, "lmcp", #result.tools)
    end
    return apply_probe_result(name, false, "lmcp: " .. tostring(err), nil, nil)
end

-- Parallel lmcp probe for every backend with an lmcp_url, via a single
-- shell fan-out of curl calls. Total wall clock ≈ PROBE_BUDGET.
local function probe_all_parallel(force)
    local now = os.time()
    local need = {}
    for name, b in pairs(backends) do
        if b.lmcp_url and (force or not cache_fresh(status[name], now)) then
            need[#need+1] = b
        end
    end
    if #need == 0 then return end

    -- %d rejects numbers without an integer representation; round a
    -- fractional budget up instead of raising.
    local budget = math.ceil(PROBE_BUDGET)

    local script_parts = {}
    local pending = {}  -- names we still expect a result line for
    for _, b in ipairs(need) do
        pending[b.name] = true
        -- Every registry-derived value goes through shell_quote so a quote
        -- character in a token, name or URL cannot break out of the script.
        local auth = b.token
            and (" -H " .. shell_quote("Authorization: Bearer " .. b.token))
            or ""
        -- curl expands %-sequences inside -w, so a literal % in the name
        -- has to be doubled.
        local wname = b.name:gsub("%%", "%%%%")
        script_parts[#script_parts+1] = string.format(
            "(curl --max-time %d -s -o /dev/null -w %s -X POST%s -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\",\"params\":{}}' %s || echo %s) &",
            budget,
            shell_quote(wname .. " %{http_code} %{time_total}\\n"),
            auth,
            shell_quote(b.lmcp_url),
            shell_quote(b.name .. " ERR 0")
        )
    end
    script_parts[#script_parts+1] = "wait"

    local t0 = monotonic()
    local p = io.popen(table.concat(script_parts, "\n"))
    local out = p and p:read("*a") or ""
    if p then p:close() end
    local dt = monotonic() - t0

    for line in out:gmatch("[^\n]+") do
        local name, code = line:match("^(%S+)%s+(%S+)%s+[%d%.]+")
        -- First line per host wins. When a transfer fails curl still prints
        -- its -w line (http_code 000) and the `|| echo` fallback prints a
        -- second one; applying both would bump down_streak twice per round.
        -- Lines for names that were not requested are ignored.
        if name and pending[name] then
            pending[name] = nil
            if code == "200" then
                apply_probe_result(name, true, nil, "lmcp", nil)
            else
                apply_probe_result(name, false, "lmcp code=" .. code, nil, nil)
            end
        end
    end
    -- Hosts with no output (fan-out error): mark DOWN
    for _, b in ipairs(need) do
        if pending[b.name] then
            pending[b.name] = nil
            apply_probe_result(b.name, false, "probe fan-out missing", nil, nil)
        end
    end
    logreq("probe_all_parallel n=%d elapsed=%.2fs", #need, dt)
end

-- ---- Call-tool dispatcher ----------------------------------------------

-- Dispatch one tool call to a backend: lmcp first, then optional ssh.
--   tool      : backend tool name (e.g. "shell", "read_file")
--   args      : tool arguments including `host`; `host` is stripped before
--               the call is forwarded over lmcp
-- `allow_ssh`: whether the tool has an SSH fallback path
-- `ssh_impl`:  function(backend, args) -> output string, or nil + error
-- Always returns a string: the tool output, or "Error: ..." with the
-- combined lmcp/ssh failure reasons.
local function call_remote(tool, args, allow_ssh, ssh_impl)
    local t_start = monotonic()
    -- Tolerate a missing argument table; it is reported as a missing host.
    if type(args) ~= "table" then args = {} end
    local host = args.host
    if type(host) ~= "string" or host == "" then
        logreq("tool=%s ERR missing-host", tool)
        return "Error: missing `host` parameter"
    end
    local b = backends[host]
    if not b then
        logreq("tool=%s host=%s ERR unknown-host", tool, host)
        return string.format("Error: unknown host %q (registry: %s)", host, CONF_PATH)
    end

    local errs = {}

    -- Try lmcp first
    if b.lmcp_url then
        local args_pass = {}
        for k, v in pairs(args) do
            if k ~= "host" then args_pass[k] = v end
        end
        local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/call",
            { name = tool, arguments = args_pass })
        if result then
            logreq("tool=%s host=%s via=lmcp elapsed=%.2fs", tool, host, monotonic() - t_start)
            -- Propagate backend's content, extract first text block
            if type(result) == "table" and type(result.content) == "table"
                and result.content[1] and result.content[1].text then
                local prefix = result.isError and "[lmcp isError] " or ""
                return prefix .. result.content[1].text
            end
            return json.encode(result)
        end
        -- NOTE(review): an lmcp failure reported as "timeout" does not show
        -- that the backend never ran the tool. Falling through to ssh may
        -- then run a non-idempotent command a second time — confirm whether
        -- fallback should be limited to connect-level failures.
        errs[#errs+1] = "lmcp: " .. tostring(err)
    else
        errs[#errs+1] = "lmcp: no url configured"
    end

    -- Fallback to ssh
    if allow_ssh and ssh_impl and b.ssh_host then
        local out, serr = ssh_impl(b, args)
        if out ~= nil then
            logreq("tool=%s host=%s via=ssh elapsed=%.2fs", tool, host, monotonic() - t_start)
            return "[via ssh fallback]\n" .. out
        end
        errs[#errs+1] = "ssh: " .. tostring(serr)
    elseif allow_ssh and not b.ssh_host then
        errs[#errs+1] = "ssh: no host configured"
    end

    local combined = table.concat(errs, " | ")
    logreq("tool=%s host=%s FAIL elapsed=%.2fs err=%s", tool, host,
        monotonic() - t_start, combined:sub(1, 200))
    return "Error: " .. combined
end

-- ---- SSH implementations for each fallback-capable tool ----------------

-- First line of ssh's output, right-trimmed, for connect-failure messages.
local function first_line(out)
    local line = (out:match("^([^\n]+)") or ""):gsub("%s+$", "")
    return line
end

-- ssh fallback for `shell`: run a.command (optionally after cd a.cwd).
-- Returns "[rc=N]\n<output>", or nil + error.
local function ssh_shell(b, a)
    if type(a.command) ~= "string" then return nil, "command required" end
    local cwd_prefix = ""
    if a.cwd and a.cwd ~= "" then
        cwd_prefix = "cd " .. shell_quote(a.cwd) .. " && "
    end
    local script = cwd_prefix .. a.command .. "\n"
    local out, err, rc = ssh_run_script(b.ssh_host, script)
    if err then return nil, err end
    -- ssh exits 255 when it couldn't connect/authenticate. Treat that as
    -- fallback-failed so the caller sees the combined lmcp+ssh errors.
    if rc == 255 then
        return nil, "ssh connect failed: " .. first_line(out)
    end
    return string.format("[rc=%d]\n%s", rc, out)
end

-- ssh fallback for `read_file`: cat the file; non-zero exit is an error.
local function ssh_read_file(b, a)
    if type(a.path) ~= "string" then return nil, "path required" end
    local script = "cat -- " .. shell_quote(a.path) .. "\n"
    local out, err, rc = ssh_run_script(b.ssh_host, script)
    if err then return nil, err end
    if rc ~= 0 then return nil, "cat exit " .. rc .. ": " .. out:sub(1, 200) end
    return out
end

-- Lua stdlib has no base64; minimal encoder used by ssh_write_file.
-- Defined once at module level rather than rebuilt on every call.
local B64_ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"

local function b64enc(data)
    local out = {}
    local pad = (3 - (#data % 3)) % 3
    local padded = data .. ("\0"):rep(pad)
    for i = 1, #padded, 3 do
        local a1, a2, a3 = padded:byte(i, i + 2)
        local n = a1 * 65536 + a2 * 256 + a3
        -- Four 6-bit groups, most significant first.
        for shift = 18, 0, -6 do
            local idx = ((n >> shift) & 63) + 1
            out[#out+1] = B64_ALPHABET:sub(idx, idx)
        end
    end
    local s = table.concat(out)
    -- Characters produced from the zero padding become '='.
    if pad > 0 then s = s:sub(1, -pad - 1) .. ("="):rep(pad) end
    return s
end

-- ssh fallback for `write_file`.
local function ssh_write_file(b, a)
    if type(a.path) ~= "string" or type(a.content) ~= "string" then
        return nil, "path and content required"
    end
    -- Emit a script that decodes content from base64, so no shell
    -- interpretation of the payload is possible.
    local payload = b64enc(a.content)
    local script = string.format(
        "base64 -d > %s <<'EOF_B64_PAYLOAD'\n%s\nEOF_B64_PAYLOAD\n",
        shell_quote(a.path), payload
    )
    local out, err, rc = ssh_run_script(b.ssh_host, script)
    if err then return nil, err end
    if rc ~= 0 then return nil, "write exit " .. rc .. ": " .. out:sub(1, 200) end
    return string.format("Written %d bytes to %s (via ssh)", #a.content, a.path)
end

-- ssh fallback for `list_dir`: one entry per line; path defaults to ".".
local function ssh_list_dir(b, a)
    local path = (a and type(a.path) == "string" and a.path) or "."
    local script = "ls -1 -- " .. shell_quote(path) .. "\n"
    local out, err, rc = ssh_run_script(b.ssh_host, script)
    if err then return nil, err end
    if rc ~= 0 then return nil, "ls exit " .. rc .. ": " .. out:sub(1, 200) end
    return out
end

-- ssh fallback for `search_files`: find by -name, capped at 200 results.
local function ssh_search_files(b, a)
    if type(a.pattern) ~= "string" then return nil, "pattern required" end
    local path = (a.path and a.path ~= "") and a.path or "/"
    local script = string.format(
        "find %s -name %s 2>/dev/null | head -200\n",
        shell_quote(path), shell_quote(a.pattern)
    )
    local out, err, rc = ssh_run_script(b.ssh_host, script)
    if err then return nil, err end
    -- The pipeline's own status is not checked, but 255 is treated as an ssh
    -- connect/auth failure (as in ssh_shell): report it as a failed fallback
    -- instead of returning ssh's error text as search results.
    if rc == 255 then
        return nil, "ssh connect failed: " .. first_line(out)
    end
    return out
end

-- ---- Server setup ------------------------------------------------------

load_registry()
math.randomseed(os.time())

local server = lmcp.new(os.getenv("LMCP_NAME") or "hub-tools", {
    port = tonumber(os.getenv("LMCP_PORT") or arg[1]) or 8090,
    version = "0.5.4",
    conf = os.getenv("LMCP_HUB_CONF") or "/opt/herding/etc/lmcp-hub.conf",
})

-- Shared schema fragment for the `host` argument of every remote_* tool.
local HOST_ARG = {
    type = "string",
    description = "Name of the backend host (see remote_list_hosts)",
}

server:tool("remote_list_hosts",
    "List all registered fleet hosts and their live status (lmcp vs ssh vs down).",
    {
        type = "object",
        properties = {
            force = { type = "boolean", description = "Force re-probe (bypass cache)", default = false },
        }
    },
    function(a)
        local force = a and a.force or false
        probe_all_parallel(force)
        local names = {}
        for n in pairs(backends) do names[#names+1] = n end
        table.sort(names)  -- pairs() order is unspecified; keep output stable
        local lines = {}
        for _, n in ipairs(names) do
            local b = backends[n]
            local s = status[n] or { up = false, err = "no probe result" }
            local paths = {}
            if b.lmcp_url then paths[#paths+1] = "lmcp" end
            if b.ssh_host then paths[#paths+1] = "ssh" end
            lines[#lines+1] = string.format(
                "%-14s %-6s via=%-4s paths=%s %s",
                n,
                s.up and "UP" or "DOWN",
                tostring(s.via or "-"),
                table.concat(paths, ","),
                s.err and ("[" .. s.err .. "]") or ""
            )
        end
        return table.concat(lines, "\n")
    end,
    {
        annotations = {
            title = "List fleet hosts",
            readOnlyHint = true,
            destructiveHint = false,
            idempotentHint = true,
            openWorldHint = true,
        }
    }
)

server:tool("remote_shell",
    "Run a shell command on a fleet host. lmcp-primary with ssh fallback.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            command = { type = "string", description = "Shell command" },
            cwd = { type = "string", description = "Working directory" },
            timeout = { type = "integer", description = "Timeout (seconds)", default = 120 },
        },
        required = { "host", "command" }
    },
    function(a) return call_remote("shell", a, true, ssh_shell) end,
    {
        annotations = {
            title = "Remote shell",
            readOnlyHint = false,
            destructiveHint = true,
            idempotentHint = false,
            openWorldHint = true,
        }
    }
)

server:tool("remote_read_file",
    "Read a file from a fleet host.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            path = { type = "string", description = "File path" },
        },
        required = { "host", "path" }
    },
    function(a) return call_remote("read_file", a, true, ssh_read_file) end,
    {
        annotations = {
            title = "Remote read file",
            readOnlyHint = true,
            destructiveHint = false,
            idempotentHint = true,
            openWorldHint = true,
        }
    }
)

server:tool("remote_write_file",
    "Write content to a file on a fleet host.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            path = { type = "string" },
            content = { type = "string" },
        },
        required = { "host", "path", "content" }
    },
    function(a) return call_remote("write_file", a, true, ssh_write_file) end,
    {
        annotations = {
            title = "Remote write file",
            readOnlyHint = false,
            destructiveHint = true,
            idempotentHint = true,
            openWorldHint = true,
        }
    }
)

server:tool("remote_edit_file",
    "Literal-match edit on a fleet host. **Requires backend lmcp up** — no ssh fallback.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            path = { type = "string" },
            old_string = { type = "string" },
            new_string = { type = "string" },
            replace_all = { type = "boolean", default = false },
        },
        required = { "host", "path", "old_string", "new_string" }
    },
    function(a) return call_remote("edit_file", a, false, nil) end,
    {
        annotations = {
            title = "Remote edit file",
            readOnlyHint = false,
            destructiveHint = true,
            idempotentHint = false,
            openWorldHint = true,
        }
    }
)

server:tool("remote_shell_bg",
    "Launch a detached background command on a fleet host (Linux). Returns PID + log path immediately. Requires backend lmcp v0.5.2+.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            command = { type = "string", description = "Shell command" },
            cwd = { type = "string" },
            log = { type = "string", description = "Log file path" },
        },
        required = { "host", "command" }
    },
    function(a) return call_remote("shell_bg", a, false, nil) end,
    {
        annotations = {
            title = "Remote shell (background)",
            readOnlyHint = false,
            destructiveHint = true,
            idempotentHint = false,
            openWorldHint = true,
        }
    }
)

server:tool("remote_list_dir",
    "List directory entries on a fleet host.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            path = { type = "string", default = "." },
        },
        required = { "host" }
    },
    function(a) return call_remote("list_dir", a, true, ssh_list_dir) end,
    {
        annotations = {
            title = "Remote list directory",
            readOnlyHint = true,
            destructiveHint = false,
            idempotentHint = true,
            openWorldHint = true,
        }
    }
)

server:tool("remote_search_files",
    "find-by-pattern on a fleet host.",
    {
        type = "object",
        properties = {
            host = HOST_ARG,
            pattern = { type = "string" },
            path = { type = "string", default = "/" },
        },
        required = { "host", "pattern" }
    },
    function(a) return call_remote("search_files", a, true, ssh_search_files) end,
    {
        annotations = {
            title = "Remote find files",
            readOnlyHint = true,
            destructiveHint = false,
            idempotentHint = true,
            openWorldHint = true,
        }
    }
)

-- `backends` is keyed by name, so # cannot be used; count via pairs.
local backend_count = 0
for _ in pairs(backends) do backend_count = backend_count + 1 end

io.stderr:write(string.format("lmcp-hub starting on port %d with %d backends from %s\n",
    server.port, backend_count, CONF_PATH))

server:run()