2abd5da3a6
Phase 3 commit #2 per docs/PHASE3.md §12. Adds the LLM-probe gate on top of commit #1's static patterns. Together they form is_destructive. broker.lua extension: - opts.max_tokens (A2) — passed through to the request body. Phase 3 probes cap at 4 tokens for YES/NO replies. - opts.timeout_ms — overrides model_cfg.timeout_ms per-call. Probe uses 15000ms cap regardless of the model's normal timeout (the user's deep model has 1800000ms for long generations; the probe must stay snappy). - M.chat now accepts an opts table (same shape as chat_stream's). Backwards compatible — existing callers passing (cfg, msgs) unaffected. safety.lua additions: - llm_probe(cfg, system, cmd): single broker.chat call returning "YES"/"NO"/"YES_FAILSAFE"/"YES_UNPARSEABLE" — fail-safe defaults. - llm_second_opinion(cmd, cfg): two-probe protocol per R-B2. Probe 1: "Is this destructive?" — YES → flag. Probe 2 (only if probe 1 said NO): "Is this safe?" inverted question — NO → flag (disagreement = HALT). Both verdicts clear (probe 1 answers NO and probe 2 answers YES, i.e. "not destructive" after inversion) → safe. - Session-scoped cache _llm_cache keyed by normalized command (lowercased + whitespace-collapsed). Mitigates Q23 latency for repeated commands within a Norris run. - Model-selection precedence: cfg.safety.llm_model (explicit) → cfg.models.deep (independent local class) → cfg.models[default]. Fail-safe YES if none configured. - is_destructive(cmd, cfg): runs static patterns first (always), then LLM if cfg present + not explicitly opted-out. cfg=nil yields static-only mode (handy for tests).
End-to-end verified against hossenfelder using qwen-coder-7b-32k as the deep probe (qwen3-30b-a3b-instruct in repo's config.lua isn't currently loaded on the local backend): cat /etc/hostname → hit=false (LLM: NO, NO inverted = safe) rm /tmp/x.log → hit=true (LLM flagged; static missed because no -r/-f flags) cp /etc/passwd /tmp/passwd.bak → hit=false (safe copy) cache: second probe on same cmd → 0s wall time static-only (cfg=nil): rm -rf /tmp/x → static hit, no LLM call opt-out (llm_second_opinion=false): cp x y → hit=false, no probe Test corpus (test_safety.lua, 87 cases) still all pass — cfg=nil preserves the static-only behavior. Note: production config.lua currently has `deep = qwen3-30b-a3b-instruct` which isn't loaded on the proxy backend right now; Norris users will hit the fail-safe (everything flagged destructive) until either the deep model is brought up OR cfg.safety.llm_model = "cloud" is set to route the probe through anthropic/claude-haiku-4.5. Update the config or model deployment for production use — covered by Phase 3 verify test case. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
174 lines
7.4 KiB
Lua
174 lines
7.4 KiB
Lua
-- broker.lua — llama.cpp HTTP client.
-- Phase 0: blocking POST via ffi/curl + vendored dkjson.
-- Phase 1: streaming POST via ffi/curl.post_sse with an OpenAI-shape decoder
-- on top. M.chat becomes a thin buffering wrapper around M.chat_stream so the
-- one streaming path covers both incremental and sync callers.
-- Phase 2: optional opts.tools array passed through to the request body
-- (omitted entirely when nil/empty per PHASE2.md §12 risk row 1). The
-- chat_stream on_delta callback widens to (kind, payload) where kind is "text"
-- or "tool_call"; tool_call deltas are accumulated by `index` (default 0 if
-- absent per C2) and emitted as complete records on finish_reason "tool_calls".
-- Phase 3: optional opts.max_tokens (passed through to the request body,
-- omitted when nil) and opts.timeout_ms (per-call override of
-- model_cfg.timeout_ms). M.chat accepts the same opts table as chat_stream.
-- broker.lua does NOT depend on mcp.lua — the caller assembles opts.tools
-- and passes it in. See docs/PHASE0.md §6, PHASE1.md §3, PHASE2.md §3 / §5.
|
|
|
|
local curl = require("ffi.curl")
|
|
local json = require("dkjson")
|
|
|
|
local M = {}
|
|
|
|
-- Assemble the HTTP header lines for one request to `model_cfg`.
-- A JSON content type is always sent. When model_cfg.key_env names an
-- environment variable with a non-empty value, that value is sent as a
-- bearer token; an unset or empty variable silently yields no auth header.
-- Returns an array of "Name: value" strings.
local function build_headers(model_cfg)
  local headers = { "Content-Type: application/json" }
  local env_name = model_cfg.key_env
  local token = env_name and os.getenv(env_name)
  if token and token ~= "" then
    headers[#headers + 1] = "Authorization: Bearer " .. token
  end
  return headers
end
|
|
|
|
-- Build everything needed for one POST to /v1/chat/completions.
--
-- Parameters:
--   model_cfg   table; .endpoint and .model are required. Optional fields
--               read here: .temperature (default 0.2), .timeout_ms
--               (default 60000); .key_env is read by build_headers.
--   messages    chat message list, embedded verbatim in the JSON body
--   stream      truthy -> "stream": true, otherwise "stream": false
--   tools       optional list; only sent when it has at least one entry,
--               because some servers reject "tools": [] (PHASE2.md §12)
--   max_tokens  optional response cap (Phase 3, A2), used for the short
--               YES/NO safety probes; omitted from the body when nil so
--               model defaults still apply for other callers
-- Returns:
--   url, json_body, header_list, timeout_ms   on success
--   nil, errmsg                               when endpoint/model missing
local function build_request(model_cfg, messages, stream, tools, max_tokens)
  local valid = model_cfg and model_cfg.endpoint and model_cfg.model
  if not valid then
    return nil, "broker: model_cfg.endpoint and .model are required"
  end

  -- Strip trailing slashes so the path join never yields "//v1".
  local base = model_cfg.endpoint:gsub("/+$", "")

  local payload = {
    model = model_cfg.model,
    messages = messages,
    stream = not not stream,
    temperature = model_cfg.temperature or 0.2,
  }
  if tools and #tools > 0 then
    payload.tools = tools
  end
  if max_tokens then
    payload.max_tokens = max_tokens
  end

  local timeout = model_cfg.timeout_ms or 60000
  return base .. "/v1/chat/completions", json.encode(payload),
         build_headers(model_cfg), timeout
end
|
|
|
|
-- Render the `error` field of an SSE error envelope as a readable string.
-- OpenAI-shaped servers send { message = "..." }. For any other table shape,
-- fall back to its JSON encoding instead of Lua's opaque "table: 0x..."
-- address. Non-string messages are coerced with tostring so the caller's
-- "api: " .. msg concatenation cannot raise.
local function format_api_error(e)
  if type(e) == "table" then
    if e.message then return tostring(e.message) end
    local ok, encoded = pcall(json.encode, e)
    if ok and type(encoded) == "string" then return encoded end
  end
  return tostring(e)
end

-- Streaming /v1/chat/completions.
--
-- opts is optional; fields read here:
--   opts.tools       tool definitions passed through to the request body
--                    (omitted when nil/empty)
--   opts.max_tokens  response token cap (Phase 3); omitted when nil
--   opts.timeout_ms  per-call override of model_cfg.timeout_ms (Phase 3).
--                    safety.is_destructive's LLM probe uses it to cap YES/NO
--                    checks (~15s) even when the model's normal timeout is
--                    much higher.
--
-- on_delta is called as on_delta(kind, payload):
--   on_delta("text", content_string)              - per non-empty text chunk
--   on_delta("tool_call", { id, name, arguments }) - once per completed tool
--                    call, in first-seen index order, when a chunk carries
--                    finish_reason "tool_calls". arguments is the raw
--                    JSON string accumulated across deltas; the caller does
--                    the dkjson.decode.
-- NOTE(review): tool-call fragments still pending when the stream ends
-- without finish_reason "tool_calls" are never emitted.
--
-- Events that are not valid JSON objects are ignored.
--
-- Returns:
--   true            stream ended cleanly
--   nil, errmsg     "api: ..." for an SSE error envelope, "transport: ..."
--                   for a curl failure, or build_request's message when
--                   model_cfg lacks endpoint/model
function M.chat_stream(model_cfg, messages, on_delta, opts)
  opts = opts or {}
  local url, body, headers, timeout_ms =
    build_request(model_cfg, messages, true, opts.tools, opts.max_tokens)
  -- On bad cfg build_request returns nil, errmsg: the message lands in `body`.
  if not url then return nil, body end
  if opts.timeout_ms then timeout_ms = opts.timeout_ms end

  local done = false
  local api_err
  -- Tool-call accumulator keyed by index. Each slot is filled across
  -- many deltas: id+name come on the opener, arguments arrives as
  -- character-fragment JSON-string chunks (PHASE2-baseline.md §4).
  local tc_by_index = {}
  local tc_index_order = {} -- preserve emission order
  local index_absent_warned = false

  -- Merge one streamed tool_calls[] fragment into its accumulator slot.
  local function accumulate_tool_call(tc)
    local idx = tc.index
    if idx == nil then
      idx = 0
      if not index_absent_warned then
        index_absent_warned = true
        -- One-shot debug status per stream; printed to stderr so it
        -- doesn't interleave with renderer stdout output.
        io.stderr:write(
          "[aish] broker: tool_calls[].index absent; assuming 0\n")
      end
    end
    local slot = tc_by_index[idx]
    if not slot then
      slot = { id = nil, name = nil, arguments = "" }
      tc_by_index[idx] = slot
      tc_index_order[#tc_index_order + 1] = idx
    end
    if tc.id then slot.id = tc.id end
    local fn = tc["function"]
    if type(fn) == "table" then
      if fn.name then slot.name = fn.name end
      if fn.arguments then
        slot.arguments = slot.arguments .. fn.arguments
      end
    end
  end

  local function on_event(data)
    if done then return end
    if data == "[DONE]" then done = true; return end

    local doc = json.decode(data)
    -- Ignore unparseable events AND valid JSON that is not an object
    -- (e.g. a bare number or boolean): indexing those below would raise
    -- inside the transport callback.
    if type(doc) ~= "table" then return end

    -- Some servers emit an SSE-framed error envelope at the start of the
    -- stream — surface it.
    if doc.error then
      api_err = format_api_error(doc.error)
      done = true
      return
    end

    local choices = doc.choices
    local choice = type(choices) == "table" and choices[1] or nil
    if type(choice) ~= "table" then choice = nil end
    local delta = choice and choice.delta
    if type(delta) ~= "table" then delta = nil end

    -- Text path (unchanged from Phase 1 semantics; kind widened).
    local content = delta and delta.content
    if type(content) == "string" and #content > 0 then
      on_delta("text", content)
    end

    -- Tool-call accumulation (Phase 2).
    local tcs = delta and delta.tool_calls
    if type(tcs) == "table" then
      for _, tc in ipairs(tcs) do
        if type(tc) == "table" then accumulate_tool_call(tc) end
      end
    end

    -- On finish_reason "tool_calls", emit all accumulated calls.
    if choice and choice.finish_reason == "tool_calls" then
      for _, idx in ipairs(tc_index_order) do
        on_delta("tool_call", tc_by_index[idx])
      end
      tc_by_index = {}
      tc_index_order = {}
    end
  end

  local ok, err = curl.post_sse(url, body, headers, on_event, timeout_ms)
  if api_err then return nil, "api: " .. api_err end
  if not ok then return nil, "transport: " .. tostring(err) end
  return true
end
|
|
|
|
-- Non-streaming convenience call: send a /v1/chat/completions request and
-- return the assistant's complete text.
--
-- Built on M.chat_stream (the request always goes out with stream:true), so
-- both entry points share one HTTP shape. Only "text" deltas are collected;
-- "tool_call" deltas are discarded, so passing opts.tools here yields no
-- tool-call information.
--
-- Parameters:
--   model_cfg  model table, forwarded unchanged to M.chat_stream
--   messages   chat message list, forwarded unchanged
--   opts       optional; same shape as M.chat_stream's opts (tools,
--              max_tokens, timeout_ms). Callers passing only
--              (model_cfg, messages) keep the pre-Phase-3 behavior.
-- Returns:
--   assistant_content_string   on success ("" when no text arrived)
--   nil, errmsg                on transport / API failure
function M.chat(model_cfg, messages, opts)
  local chunks = {}
  local function collect(kind, payload)
    if kind ~= "text" then return end
    chunks[#chunks + 1] = payload
  end

  local ok, err = M.chat_stream(model_cfg, messages, collect, opts)
  if ok then
    return table.concat(chunks)
  end
  return nil, err
end
|
|
|
|
-- Public API: M.chat_stream and M.chat. build_headers / build_request are
-- file-local helpers and are not exported.
return M
|