diff --git a/hub.lua b/hub.lua index f4566da..d4d3bce 100644 --- a/hub.lua +++ b/hub.lua @@ -25,9 +25,26 @@ local socket = require('socket') -- ---- Backend registry --------------------------------------------------- local CONF_PATH = os.getenv("LMCP_HUB_BACKENDS") or "/opt/herding/etc/hub-backends.conf" -local PROBE_TTL = tonumber(os.getenv("LMCP_HUB_PROBE_TTL") or "30") +local PROBE_TTL_UP = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_UP") or "30") +local PROBE_TTL_DOWN_MIN = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_DOWN_MIN") or "60") +local PROBE_TTL_DOWN_MAX = tonumber(os.getenv("LMCP_HUB_PROBE_TTL_DOWN_MAX") or "900") +local PROBE_BUDGET = tonumber(os.getenv("LMCP_HUB_PROBE_BUDGET") or "3") local LMCP_TIMEOUT = tonumber(os.getenv("LMCP_HUB_LMCP_TIMEOUT") or "6") local SSH_TIMEOUT = tonumber(os.getenv("LMCP_HUB_SSH_TIMEOUT") or "10") +local SSH_HARD_TIMEOUT = tonumber(os.getenv("LMCP_HUB_SSH_HARD_TIMEOUT") or "30") +local LOG_REQUESTS = (os.getenv("LMCP_HUB_LOG") or "1") ~= "0" + +local function logreq(fmt, ...) + if not LOG_REQUESTS then return end + io.stderr:write(string.format("[hub %s] " .. fmt .. "\n", + os.date("%H:%M:%S"), ...)) + io.stderr:flush() +end + +local function monotonic() + local ok, s = pcall(socket.gettime) + return ok and s or os.time() +end local backends = {} -- name -> { name, ssh_host, lmcp_url, token } local status = {} -- name -> { up=bool, via="lmcp"|"ssh"|nil, checked=t, err=... } @@ -160,12 +177,17 @@ local function ssh_run_script(ssh_host, script) local spath, perr = tmpwrite(script) if not spath then return nil, perr, -1 end local outpath = spath .. ".out" + -- Hard-cap wall time with GNU `timeout` — ssh's ConnectTimeout only + -- bounds TCP connect, not the session. Without this, a half-dead sshd + -- (auth stall, remote bash-s hang) locks the hub event loop indefinitely. local cmd = string.format( - "ssh -o ConnectTimeout=%d -o BatchMode=yes -o StrictHostKeyChecking=accept-new %s 'bash -s' < %s > %s 2>&1; echo $? > %s.rc", - SSH_TIMEOUT, shell_quote(ssh_host), shell_quote(spath), + "timeout --kill-after=2 %d ssh -o ConnectTimeout=%d -o ServerAliveInterval=5 -o ServerAliveCountMax=2 -o BatchMode=yes -o StrictHostKeyChecking=accept-new %s 'bash -s' < %s > %s 2>&1; echo $? > %s.rc", + SSH_HARD_TIMEOUT, SSH_TIMEOUT, shell_quote(ssh_host), shell_quote(spath), shell_quote(outpath), shell_quote(spath) ) + local t0 = monotonic() os.execute(cmd) + local dt = monotonic() - t0 local rc_f = io.open(spath .. ".rc", "r") local rc = -1 if rc_f then @@ -178,42 +200,123 @@ local function ssh_run_script(ssh_host, script) local output = out_f and out_f:read("*a") or "" if out_f then out_f:close() end os.remove(spath); os.remove(outpath); os.remove(spath .. ".rc") + -- timeout(1) exits 124 on wall-clock expiry, 137 on SIGKILL. Surface it. + if rc == 124 or rc == 137 then + logreq("ssh HARD-TIMEOUT host=%s after=%.2fs rc=%d", ssh_host, dt, rc) + return nil, string.format("ssh hard-timeout after %ds", SSH_HARD_TIMEOUT), rc + end + logreq("ssh host=%s elapsed=%.2fs rc=%d bytes=%d", ssh_host, dt, rc, #output) return output, nil, rc end -- ---- Health probe ------------------------------------------------------ +-- +-- Design notes: +-- - Probe is lmcp-only. SSH is NOT checked here: it's expensive (3-6s per +-- offline host) and the hub exists specifically to absorb lots of +-- offline hosts. Hosts with lmcp down but ssh up show as DOWN in the +-- health list, but actual remote_* calls still fall through to ssh +-- fallback correctly. +-- - Sticky DOWN cache with exponential backoff: a DOWN host is re-probed +-- at intervals that grow from PROBE_TTL_DOWN_MIN (default 60s) up to +-- PROBE_TTL_DOWN_MAX (default 900s). Prevents a sleeping fleet from +-- burning probe budget every health check. +-- - `remote_list_hosts` uses a parallel curl fan-out for all hosts at +-- once so wall-clock is bounded by PROBE_BUDGET (default 3s), not the +-- sum of per-host timeouts. +local function cache_fresh(s, now) + if not s then return false end + local ttl + if s.up then + ttl = PROBE_TTL_UP + else + ttl = math.min(PROBE_TTL_DOWN_MAX, PROBE_TTL_DOWN_MIN * (2 ^ (s.down_streak or 0))) + end + return (now - s.checked) < ttl +end + +local function apply_probe_result(name, up, err, via, tool_count) + local prev = status[name] + local new_s = { + checked = os.time(), + up = up, + via = up and (via or "lmcp") or nil, + err = err, + tool_count = tool_count, + down_streak = up and 0 or ((prev and prev.down_streak or 0) + 1), + } + status[name] = new_s + return new_s +end + +-- Fallback single-host probe used only when a caller explicitly needs a +-- status refresh outside the parallel path (currently only +-- remote_list_hosts when a single host is looked up). lmcp-only. local function probe(name, force) local b = backends[name] if not b then return { up = false, err = "unknown host" } end - local now = os.time() local s = status[name] - if not force and s and (now - s.checked) < PROBE_TTL then - return s + if not force and cache_fresh(s, os.time()) then return s end + if not b.lmcp_url then + return apply_probe_result(name, false, "no lmcp url", nil, nil) end - local new_s = { checked = now, up = false, via = nil, err = nil } - if b.lmcp_url then - local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/list", {}) - if result and result.tools then - new_s.up = true - new_s.via = "lmcp" - new_s.tool_count = #result.tools - else - new_s.err = "lmcp: " .. tostring(err) + local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/list", {}) + if result and result.tools then + return apply_probe_result(name, true, nil, "lmcp", #result.tools) + end + return apply_probe_result(name, false, "lmcp: " .. tostring(err), nil, nil) +end + +-- Parallel lmcp probe for every backend with an lmcp_url, via a single +-- bash fan-out of curl calls. Total wall clock ≈ PROBE_BUDGET. +local function probe_all_parallel(force) + local now = os.time() + local need = {} + for name, b in pairs(backends) do + if b.lmcp_url and (force or not cache_fresh(status[name], now)) then + need[#need+1] = b end end - if not new_s.up and b.ssh_host then - local _, _, rc = ssh_run_script(b.ssh_host, "exit 0\n") - if rc == 0 then - new_s.up = true - new_s.via = "ssh" - new_s.err = nil - else - new_s.err = (new_s.err or "") .. "; ssh: rc=" .. tostring(rc) + if #need == 0 then return end + + local script_parts = {} + for _, b in ipairs(need) do + local auth = b.token and (" -H 'Authorization: Bearer " .. b.token .. "'") or "" + local url = b.lmcp_url:gsub("'", "'\\''") + script_parts[#script_parts+1] = string.format( + "(curl --max-time %d -s -o /dev/null -w '%s %%{http_code} %%{time_total}\\n' -X POST%s -H 'Content-Type: application/json' -d '{\"jsonrpc\":\"2.0\",\"id\":1,\"method\":\"tools/list\",\"params\":{}}' '%s' || echo '%s ERR 0') &", + PROBE_BUDGET, b.name, auth, url, b.name + ) + end + script_parts[#script_parts+1] = "wait" + + local t0 = monotonic() + local p = io.popen(table.concat(script_parts, "\n")) + local out = p and p:read("*a") or "" + if p then p:close() end + local dt = monotonic() - t0 + + local seen = {} + for line in out:gmatch("[^\n]+") do + local name, code, t = line:match("^(%S+)%s+(%S+)%s+([%d%.]+)") + if name then + seen[name] = true + local is_up = (code == "200") + if is_up then + apply_probe_result(name, true, nil, "lmcp", nil) + else + apply_probe_result(name, false, "lmcp code=" .. code, nil, nil) + end end end - status[name] = new_s - return new_s + -- Hosts with no output (fan-out error): mark DOWN + for _, b in ipairs(need) do + if not seen[b.name] then + apply_probe_result(b.name, false, "probe fan-out missing", nil, nil) + end + end + logreq("probe_all_parallel n=%d elapsed=%.2fs", #need, dt) end -- ---- Call-tool dispatcher ---------------------------------------------- @@ -221,16 +324,20 @@ end -- `allow_ssh`: whether the tool has an SSH fallback path -- `ssh_impl`: function(backend, args) -> output string, or nil + error local function call_remote(tool, args, allow_ssh, ssh_impl) + local t_start = monotonic() local host = args.host if type(host) ~= "string" or host == "" then + logreq("tool=%s ERR missing-host", tool) return "Error: missing `host` parameter" end local b = backends[host] if not b then + logreq("tool=%s host=%s ERR unknown-host", tool, host) return string.format("Error: unknown host %q (registry: %s)", host, CONF_PATH) end local errs = {} + local via = nil -- Try lmcp first if b.lmcp_url then @@ -239,6 +346,8 @@ local function call_remote(tool, args, allow_ssh, ssh_impl) local result, err = jsonrpc_call(b.lmcp_url, b.token, "tools/call", { name = tool, arguments = args_pass }) if result then + via = "lmcp" + logreq("tool=%s host=%s via=lmcp elapsed=%.2fs", tool, host, monotonic() - t_start) -- Propagate backend's content, extract first text block if type(result.content) == "table" and result.content[1] and result.content[1].text then local prefix = result.isError and "[lmcp isError] " or "" @@ -254,12 +363,16 @@ local function call_remote(tool, args, allow_ssh, ssh_impl) -- Fallback to ssh if allow_ssh and ssh_impl and b.ssh_host then local out, serr = ssh_impl(b, args) - if out ~= nil then return "[via ssh fallback]\n" .. out end + if out ~= nil then + logreq("tool=%s host=%s via=ssh elapsed=%.2fs", tool, host, monotonic() - t_start) + return "[via ssh fallback]\n" .. out + end errs[#errs+1] = "ssh: " .. tostring(serr) elseif allow_ssh and not b.ssh_host then errs[#errs+1] = "ssh: no host configured" end + logreq("tool=%s host=%s FAIL elapsed=%.2fs err=%s", tool, host, monotonic() - t_start, table.concat(errs, " | "):sub(1, 200)) return "Error: " .. table.concat(errs, " | ") end @@ -355,7 +468,7 @@ math.randomseed(os.time()) local server = lmcp.new(os.getenv("LMCP_NAME") or "hub-tools", { port = tonumber(os.getenv("LMCP_PORT") or arg[1]) or 8090, - version = "0.5.2", + version = "0.5.3", conf = os.getenv("LMCP_HUB_CONF") or "/opt/herding/etc/lmcp-hub.conf", }) @@ -371,13 +484,14 @@ server:tool("remote_list_hosts", } }, function(a) local force = a and a.force or false + probe_all_parallel(force) local lines = {} local names = {} for n in pairs(backends) do names[#names+1] = n end table.sort(names) for _, n in ipairs(names) do local b = backends[n] - local s = probe(n, force) + local s = status[n] or { up = false, err = "no probe result" } local paths = {} if b.lmcp_url then paths[#paths+1] = "lmcp" end if b.ssh_host then paths[#paths+1] = "ssh" end