Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 17af91a99b | |||
| b29a2716d1 | |||
| 555beb9fd9 |
@@ -25,9 +25,26 @@ local socket = require('socket')
|
||||
-- ---- Backend registry ---------------------------------------------------
|
||||
|
||||
local CONF_PATH = os.getenv("LMCP_HUB_BACKENDS") or "/opt/herding/etc/hub-backends.conf"
|
||||
local PROBE_TTL = tonumber(os.getenv("LMCP_HUB_PROBE_TTL") or "30")
|
||||
local PROBE_TTL_UP = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_UP") or "30")
|
||||
local PROBE_TTL_DOWN_MIN = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_DOWN_MIN") or "60")
|
||||
local PROBE_TTL_DOWN_MAX = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_DOWN_MAX") or "900")
|
||||
local PROBE_BUDGET = tonumber(os.getenv("LMCP_HUB_PROBE_BUDGET") or "3")
|
||||
local LMCP_TIMEOUT = tonumber(os.getenv("LMCP_HUB_LMCP_TIMEOUT") or "6")
|
||||
local SSH_TIMEOUT = tonumber(os.getenv("LMCP_HUB_SSH_TIMEOUT") or "10")
|
||||
local SSH_HARD_TIMEOUT = tonumber(os.getenv("LMCP_HUB_SSH_HARD_TIMEOUT") or "30")
|
||||
local LOG_REQUESTS = (os.getenv("LMCP_HUB_LOG") or "1") ~= "0"
|
||||
|
||||
local function logreq(fmt, ...)
|
||||
if not LOG_REQUESTS then return end
|
||||
io.stderr:write(string.format("[hub %s] " .. fmt .. "\n",
|
||||
os.date("%H:%M:%S"), ...))
|
||||
io.stderr:flush()
|
||||
end
|
||||
|
||||
local function monotonic()
|
||||
local ok, s = pcall(socket.gettime)
|
||||
return ok and s or os.time()
|
||||
end
|
||||
|
||||
local backends = {} -- name -> { name, ssh_host, lmcp_url, token }
|
||||
local status = {} -- name -> { up=bool, via="lmcp"|"ssh"|nil, checked=t, err=... }
|
||||
@@ -160,12 +177,17 @@ local function ssh_run_script(ssh_host, script)
|
||||
local spath, perr = tmpwrite(script)
|
||||
if not spath then return nil, perr, -1 end
|
||||
local outpath = spath .. ".out"
|
||||
-- Hard-cap wall time with GNU `timeout` — ssh's ConnectTimeout only
|
||||
-- bounds TCP connect, not the session. Without this, a half-dead sshd
|
||||
-- (auth stall, remote bash-s hang) locks the hub event loop indefinitely.
|
||||
local cmd = string.format(
|
||||
"ssh -o ConnectTimeout=%d -o BatchMode=yes -o StrictHostKeyChecking=accept-new %s 'bash -s' < %s > %s 2>&1; echo $? > %s.rc",
|
||||
SSH_TIMEOUT, shell_quote(ssh_host), shell_quote(spath),
|
||||
"timeout --kill-after=2 %d ssh -o ConnectTimeout=%d -o ServerAliveInterval=5 -o ServerAliveCountMax=2 -o BatchMode=yes -o StrictHostKeyChecking=accept-new %s 'bash -s' < %s > %s 2>&1; echo $? > %s.rc",
|
||||
SSH_HARD_TIMEOUT, SSH_TIMEOUT, shell_quote(ssh_host), shell_quote(spath),
|
||||
shell_quote(outpath), shell_quote(spath)
|
||||
)
|
||||
local t0 = monotonic()
|
||||
os.execute(cmd)
|
||||
local dt = monotonic() - t0
|
||||
local rc_f = io.open(spath .. ".rc", "r")
|
||||
local rc = -1
|
||||
if rc_f then
|
||||
@@ -178,42 +200,123 @@ local function ssh_run_script(ssh_host, script)
|
||||
local output = out_f and out_f:read("*a") or ""
|
||||
if out_f then out_f:close() end
|
||||
os.remove(spath); os.remove(outpath); os.remove(spath .. ".rc")
|
||||
-- timeout(1) exits 124 on wall-clock expiry, 137 on SIGKILL. Surface it.
|
||||
if rc == 124 or rc == 137 then
|
||||
logreq("ssh HARD-TIMEOUT host=%s after=%.2fs rc=%d", ssh_host, dt, rc)
|
||||
return nil, string.format("ssh hard-timeout after %ds", SSH_HARD_TIMEOUT), rc
|
||||
end
|
||||
logreq("ssh host=%s elapsed=%.2fs rc=%d bytes=%d", ssh_host, dt, rc, #output)
|
||||
return output, nil, rc
|
||||
end
|
||||
|
||||
-- ---- Health probe ------------------------------------------------------
|
||||
--
|
||||
-- Design notes:
|
||||
-- - Probe is lmcp-only. SSH is NOT checked here: it's expensive (3-6s per
|
||||
-- offline host) and the hub exists specifically to absorb lots of
|
||||
-- offline hosts. Hosts with lmcp down but ssh up show as DOWN in the
|
||||
-- health list, but actual remote_* calls still fall through to ssh
|
||||
-- fallback correctly.
|
||||
-- - Sticky DOWN cache with exponential backoff: a DOWN host is re-probed
|
||||
-- at intervals that grow from PROBE_TTL_DOWN_MIN (default 60s) up to
|
||||
-- PROBE_TTL_DOWN_MAX (default 900s). Prevents a sleeping fleet from
|
||||
-- burning probe budget every health check.
|
||||
-- - `remote_list_hosts` uses a parallel curl fan-out for all hosts at
|
||||
-- once so wall-clock is bounded by PROBE_BUDGET (default 3s), not the
|
||||
-- sum of per-host timeouts.
|
||||
|
||||
local function cache_fresh(s, now)
|
||||
if not s then return false end
|
||||
local ttl
|
||||
if s.up then
|
||||
ttl = PROBE_TTL_UP
|
||||
else
|
||||
ttl = math.min(PROBE_TTL_DOWN_MAX, PROBE_TTL_DOWN_MIN * (2 ^ (s.down_streak or 0)))
|
||||
end
|
||||
return (now - s.checked) < ttl
|
||||
end
|
||||
|
||||
local function apply_probe_result(name, up, err, via, tool_count)
|
||||
local prev = status[name]
|
||||
local new_s = {
|
||||
checked = os.time(),
|
||||
up = up,
|
||||
via = up and (via or "lmcp") or nil,
|
||||
err = err,
|
||||
tool_count = tool_count,
|
||||
down_streak = up and 0 or ((prev and prev.down_streak or 0) + 1),
|
||||
}
|
||||
status[name] = new_s
|
||||
return new_s
|
||||
end
|
||||
|
||||
-- Fallback single-host probe used only when a caller explicitly needs a
|
||||
-- status refresh outside the parallel path (currently only
|
||||
-- remote_list_hosts when a single host is looked up). lmcp-only.
|
||||
local function probe(name, force)
|
||||
local b = backends[name]
|
||||
if not b then return { up = false, err = "unknown host" } end
|
||||
local now = os.time()
|
||||
local s = status[name]
|
||||
if not force and s and (now - s.checked) < PROBE_TTL then
|
||||
return s
|
||||
if not force and cache_fresh(s, os.time()) then return s end
|
||||
if not b.lmcp_url then
|
||||
return apply_probe_result(name, false, "no lmcp url", nil, nil)
|
||||
end
|
||||
local new_s = { checked = now, up = false, via = nil, err = nil }
|
||||
if b.lmcp_url then
|
||||
local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/list", {})
|
||||
if result and result.tools then
|
||||
new_s.up = true
|
||||
new_s.via = "lmcp"
|
||||
new_s.tool_count = #result.tools
|
||||
else
|
||||
new_s.err = "lmcp: " .. tostring(err)
|
||||
local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/list", {})
|
||||
if result and result.tools then
|
||||
return apply_probe_result(name, true, nil, "lmcp", #result.tools)
|
||||
end
|
||||
return apply_probe_result(name, false, "lmcp: " .. tostring(err), nil, nil)
|
||||
end
|
||||
|
||||
-- Parallel lmcp probe for every backend with an lmcp_url, via a single
|
||||
-- bash fan-out of curl calls. Total wall clock ≈ PROBE_BUDGET.
|
||||
local function probe_all_parallel(force)
|
||||
local now = os.time()
|
||||
local need = {}
|
||||
for name, b in pairs(backends) do
|
||||
if b.lmcp_url and (force or not cache_fresh(status[name], now)) then
|
||||
need[#need+1] = b
|
||||
end
|
||||
end
|
||||
if not new_s.up and b.ssh_host then
|
||||
local _, _, rc = ssh_run_script(b.ssh_host, "exit 0\n")
|
||||
if rc == 0 then
|
||||
new_s.up = true
|
||||
new_s.via = "ssh"
|
||||
new_s.err = nil
|
||||
else
|
||||
new_s.err = (new_s.err or "") .. "; ssh: rc=" .. tostring(rc)
|
||||
if #need == 0 then return end
|
||||
|
||||
local script_parts = {}
|
||||
for _, b in ipairs(need) do
|
||||
local auth = b.token and (" -H 'Authorization: Bearer " .. b.token .. "'") or ""
|
||||
local url = b.lmcp_url:gsub("'", "'\\''")
|
||||
script_parts[#script_parts+1] = string.format(
|
||||
"(curl --max-time %d -s -o /dev/null -w '%s %%{http_code} %%{time_total}\\n' -X POST%s -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\",\"params\":{}}' '%s' || echo '%s ERR 0') &",
|
||||
PROBE_BUDGET, b.name, auth, url, b.name
|
||||
)
|
||||
end
|
||||
script_parts[#script_parts+1] = "wait"
|
||||
|
||||
local t0 = monotonic()
|
||||
local p = io.popen(table.concat(script_parts, "\n"))
|
||||
local out = p and p:read("*a") or ""
|
||||
if p then p:close() end
|
||||
local dt = monotonic() - t0
|
||||
|
||||
local seen = {}
|
||||
for line in out:gmatch("[^\n]+") do
|
||||
local name, code, t = line:match("^(%S+)%s+(%S+)%s+([%d%.]+)")
|
||||
if name then
|
||||
seen[name] = true
|
||||
local is_up = (code == "200")
|
||||
if is_up then
|
||||
apply_probe_result(name, true, nil, "lmcp", nil)
|
||||
else
|
||||
apply_probe_result(name, false, "lmcp code=" .. code, nil, nil)
|
||||
end
|
||||
end
|
||||
end
|
||||
status[name] = new_s
|
||||
return new_s
|
||||
-- Hosts with no output (fan-out error): mark DOWN
|
||||
for _, b in ipairs(need) do
|
||||
if not seen[b.name] then
|
||||
apply_probe_result(b.name, false, "probe fan-out missing", nil, nil)
|
||||
end
|
||||
end
|
||||
logreq("probe_all_parallel n=%d elapsed=%.2fs", #need, dt)
|
||||
end
|
||||
|
||||
-- ---- Call-tool dispatcher ----------------------------------------------
|
||||
@@ -221,16 +324,20 @@ end
|
||||
-- `allow_ssh`: whether the tool has an SSH fallback path
|
||||
-- `ssh_impl`: function(backend, args) -> output string, or nil + error
|
||||
local function call_remote(tool, args, allow_ssh, ssh_impl)
|
||||
local t_start = monotonic()
|
||||
local host = args.host
|
||||
if type(host) ~= "string" or host == "" then
|
||||
logreq("tool=%s ERR missing-host", tool)
|
||||
return "Error: missing `host` parameter"
|
||||
end
|
||||
local b = backends[host]
|
||||
if not b then
|
||||
logreq("tool=%s host=%s ERR unknown-host", tool, host)
|
||||
return string.format("Error: unknown host %q (registry: %s)", host, CONF_PATH)
|
||||
end
|
||||
|
||||
local errs = {}
|
||||
local via = nil
|
||||
|
||||
-- Try lmcp first
|
||||
if b.lmcp_url then
|
||||
@@ -239,6 +346,8 @@ local function call_remote(tool, args, allow_ssh, ssh_impl)
|
||||
local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/call",
|
||||
{ name = tool, arguments = args_pass })
|
||||
if result then
|
||||
via = "lmcp"
|
||||
logreq("tool=%s host=%s via=lmcp elapsed=%.2fs", tool, host, monotonic() - t_start)
|
||||
-- Propagate backend's content, extract first text block
|
||||
if type(result.content) == "table" and result.content[1] and result.content[1].text then
|
||||
local prefix = result.isError and "[lmcp isError] " or ""
|
||||
@@ -254,12 +363,16 @@ local function call_remote(tool, args, allow_ssh, ssh_impl)
|
||||
-- Fallback to ssh
|
||||
if allow_ssh and ssh_impl and b.ssh_host then
|
||||
local out, serr = ssh_impl(b, args)
|
||||
if out ~= nil then return "[via ssh fallback]\n" .. out end
|
||||
if out ~= nil then
|
||||
logreq("tool=%s host=%s via=ssh elapsed=%.2fs", tool, host, monotonic() - t_start)
|
||||
return "[via ssh fallback]\n" .. out
|
||||
end
|
||||
errs[#errs+1] = "ssh: " .. tostring(serr)
|
||||
elseif allow_ssh and not b.ssh_host then
|
||||
errs[#errs+1] = "ssh: no host configured"
|
||||
end
|
||||
|
||||
logreq("tool=%s host=%s FAIL elapsed=%.2fs err=%s", tool, host, monotonic() - t_start, table.concat(errs, " | "):sub(1, 200))
|
||||
return "Error: " .. table.concat(errs, " | ")
|
||||
end
|
||||
|
||||
@@ -355,7 +468,7 @@ math.randomseed(os.time())
|
||||
|
||||
local server = lmcp.new(os.getenv("LMCP_NAME") or "hub-tools", {
|
||||
port = tonumber(os.getenv("LMCP_PORT") or arg[1]) or 8090,
|
||||
version = "0.5.0",
|
||||
version = "0.5.3",
|
||||
conf = os.getenv("LMCP_HUB_CONF") or "/opt/herding/etc/lmcp-hub.conf",
|
||||
})
|
||||
|
||||
@@ -371,13 +484,14 @@ server:tool("remote_list_hosts",
|
||||
} },
|
||||
function(a)
|
||||
local force = a and a.force or false
|
||||
probe_all_parallel(force)
|
||||
local lines = {}
|
||||
local names = {}
|
||||
for n in pairs(backends) do names[#names+1] = n end
|
||||
table.sort(names)
|
||||
for _, n in ipairs(names) do
|
||||
local b = backends[n]
|
||||
local s = probe(n, force)
|
||||
local s = status[n] or { up = false, err = "no probe result" }
|
||||
local paths = {}
|
||||
if b.lmcp_url then paths[#paths+1] = "lmcp" end
|
||||
if b.ssh_host then paths[#paths+1] = "ssh" end
|
||||
@@ -433,6 +547,17 @@ server:tool("remote_edit_file",
|
||||
function(a) return call_remote("edit_file", a, false, nil) end
|
||||
)
|
||||
|
||||
server:tool("remote_shell_bg",
|
||||
"Launch a detached background command on a fleet host (Linux). Returns PID + log path immediately. Requires backend lmcp v0.5.2+.",
|
||||
{ type = "object", properties = {
|
||||
host = HOST_ARG,
|
||||
command = { type = "string", description = "Shell command" },
|
||||
cwd = { type = "string" },
|
||||
log = { type = "string", description = "Log file path" },
|
||||
}, required = { "host", "command" } },
|
||||
function(a) return call_remote("shell_bg", a, false, nil) end
|
||||
)
|
||||
|
||||
server:tool("remote_list_dir", "List directory entries on a fleet host.",
|
||||
{ type = "object", properties = {
|
||||
host = HOST_ARG,
|
||||
|
||||
+47
-1
@@ -170,6 +170,49 @@ server:tool("shell", "Execute a shell command.", {
|
||||
return run(cmd, a.timeout or 120)
|
||||
end)
|
||||
|
||||
server:tool("shell_bg",
|
||||
"Fire-and-forget shell command (Linux-only). Fully detaches via setsid+nohup+stdio-redirect and returns immediately with PID and log path. Use for daemons that must outlive the lmcp request.",
|
||||
{
|
||||
type = "object",
|
||||
properties = {
|
||||
command = { type = "string", description = "Shell command to launch" },
|
||||
cwd = { type = "string", description = "Working directory" },
|
||||
log = { type = "string", description = "Log file (stdout+stderr). Default: /tmp/lmcp-bg-<ts>-<rand>.log" },
|
||||
},
|
||||
required = { "command" },
|
||||
},
|
||||
function(a)
|
||||
if WINDOWS then
|
||||
return "Error: shell_bg is Linux-only (Windows Start-Process equivalent TBD)"
|
||||
end
|
||||
if type(a.command) ~= "string" or a.command == "" then
|
||||
return "Error: command required"
|
||||
end
|
||||
local log = a.log
|
||||
if not log or log == "" then
|
||||
log = string.format("/tmp/lmcp-bg-%d-%d.log", os.time(), math.random(1000, 9999))
|
||||
end
|
||||
local pid_file = log .. ".pid"
|
||||
local inner = a.command
|
||||
if a.cwd and a.cwd ~= "" then
|
||||
inner = "cd '" .. a.cwd:gsub("'", "'\\''") .. "' && " .. inner
|
||||
end
|
||||
local sq = function(s) return "'" .. s:gsub("'", "'\\''") .. "'" end
|
||||
local full = string.format(
|
||||
"setsid nohup sh -c %s </dev/null >%s 2>&1 & echo $! > %s",
|
||||
sq(inner), sq(log), sq(pid_file)
|
||||
)
|
||||
os.execute(full)
|
||||
local f = io.open(pid_file, 'r')
|
||||
local pid = "?"
|
||||
if f then
|
||||
pid = (f:read('*a') or ""):match("(%d+)") or "?"
|
||||
f:close()
|
||||
os.remove(pid_file)
|
||||
end
|
||||
return string.format("launched pid=%s log=%s", pid, log)
|
||||
end)
|
||||
|
||||
server:tool("read_file", "Read a file.", {
|
||||
type = "object",
|
||||
properties = { path = { type = "string" } },
|
||||
@@ -272,7 +315,10 @@ server:tool("search_files", "Search for files by pattern.", {
|
||||
if WINDOWS then
|
||||
return run('dir /b /s "' .. path .. '\\' .. a.pattern .. '"', 30)
|
||||
else
|
||||
return run("find '" .. path:gsub("'", "'\\''") .. "' -name '" .. a.pattern:gsub("'", "'\\''") .. "' 2>/dev/null", 30)
|
||||
-- -L: follow symlinks on the start path. macOS BSD find otherwise
|
||||
-- silently emits nothing when the start path is itself a symlink
|
||||
-- (common on Homebrew, e.g. /usr/local/share/lua -> Cellar/…/share/lua).
|
||||
return run("find -L '" .. path:gsub("'", "'\\''") .. "' -name '" .. a.pattern:gsub("'", "'\\''") .. "' 2>/dev/null", 30)
|
||||
end
|
||||
end)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user