OpenAICompatibleLlmClient 新增 chat_stream(),使用 OpenAI-compatible /chat/completions 的 stream: true。 chat 普通对话现在优先流式分段输出;流式不可用或服务端不返回 SSE 时,会提示并自动 fallback 到非流式 chat()。 普通 chat 和 log analyze 都会过滤 think 内容,并且日志只记录过滤后的摘要。 更新了 chat/log 分析提示词,明确禁止输出 think/内部思考。 同步 README、打包 README、run.sh --help。 补充了过滤器、OpenAI 流式、CLI fallback、日志分析过滤等测试。
423 lines
19 KiB
Python
423 lines
19 KiB
Python
"""LLM 结构化输出的确定性规则 fallback。
|
||
|
||
该类不是对真实模型的替代,只用于本地开发和测试时提供稳定输出。
|
||
真实 LLM client 需要实现相同方法。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
from collections.abc import Iterable
|
||
import logging
|
||
import re
|
||
from dataclasses import asdict
|
||
from typing import Any
|
||
|
||
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, REQUIRED_PARAMS, SENSITIVE_KEYS
|
||
from pam_deploy_graph.logging_utils import json_for_log, redact_for_log
|
||
from pam_deploy_graph.models import (
|
||
ActionResult,
|
||
ExecutionStrategy,
|
||
LlmActionAnalysis,
|
||
LlmDeployPlan,
|
||
LlmIntentResult,
|
||
LlmParamResult,
|
||
LlmSingleActionProposal,
|
||
)
|
||
|
||
from .text_filter import strip_thinking_text
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Maps every accepted spelling of a parameter name (including the canonical
# upper-case name itself) to its canonical upper-case key. Lookups are
# case-sensitive: only the spellings listed here are recognised.
KEY_ALIASES = {
    alias: canonical
    for canonical, aliases in (
        ("HOME_BASE_URL", ("home_base_url", "HOME_BASE_URL")),
        ("CLIENT_ID", ("client_id", "CLIENT_ID")),
        ("CLIENT_SECRET", ("client_secret", "CLIENT_SECRET")),
        ("AIRPORT_CODE", ("airportCode", "AIRPORT_CODE")),
        ("APP_NAME", ("applicationName", "APP_NAME")),
        ("MODULE_NAME", ("moduleName", "MODULE_NAME")),
        ("VERSION_NUMBER", ("versionNumber", "VERSION_NUMBER")),
        (
            "PARENT_VERSION_NUMBER",
            ("parentVersionNumber", "PARENT_VERSION_NUMBER", "parent_version_number"),
        ),
        ("ZIP_FILE_PATH", ("zipFilePath", "ZIP_FILE_PATH")),
        ("ACTION_TYPE", ("actionType", "ACTION_TYPE")),
        ("TIMEOUT", ("timeOut", "TIMEOUT")),
        ("LOG_NAME", ("logName", "LOG_NAME")),
    )
    for alias in aliases
}
|
||
|
||
|
||
class RuleBasedLlmClient:
    """Lightweight rule-based stand-in for an LLM client.

    Every method derives its answer from keyword and regular-expression
    heuristics only, so identical input always produces identical output.
    """

    def chat(self, text: str, context: dict[str, Any] | None = None) -> str:
        """Return a canned reply for a free-form chat message.

        Args:
            text: The user's message; it is only checked for help-style keywords.
            context: Optional conversation context. It is logged but does not
                influence the reply.

        Returns:
            A usage hint when the message asks for help, otherwise a notice that
            no real LLM is configured. The reply is passed through
            ``strip_thinking_text`` before being returned.
        """
        logger.info(
            "规则 LLM 普通对话 text=%s context=%s",
            redact_for_log(text, max_text_len=800),
            json_for_log(context or {}),
        )
        lowered = text.lower()
        if any(word in lowered for word in ("help", "帮助", "怎么用", "命令")):
            return strip_thinking_text(
                "当前是本地规则 LLM fallback。可用 `analyze <需求>` 分析部署需求,`run` 执行完整 workflow,"
                "`action propose <需求>` 解析单个 action,`action run ...` 确认后执行单个 action,"
                "`log analyze <路径>` 分析日志尾部。"
            )
        return strip_thinking_text(
            "当前未配置真实 LLM,已使用本地规则 fallback。普通闲聊只能给出有限说明;"
            "如需自然语言问答、日志深度分析或更准确的 action 解析,请配置真实 LLM。"
        )

    def chat_stream(self, text: str, context: dict[str, Any] | None = None) -> Iterable[str]:
        """Streaming-compatible variant of :meth:`chat`.

        Yields:
            Exactly one chunk: the complete reply produced by :meth:`chat`.
        """
        yield self.chat(text, context=context)

    def analyze_log(self, log_text: str, question: str | None = None, source_path: str = "") -> str:
        """Summarise a log excerpt using keyword matching.

        Args:
            log_text: The log text to scan, line by line.
            question: Optional user question; echoed in the summary when given.
            source_path: Where the log came from; shown as ``-`` when empty.

        Returns:
            A multi-line summary listing up to the last five lines that contain
            an error-like keyword (each redacted and truncated), or a note that
            no such keyword was found.
        """
        logger.info(
            "规则 LLM 日志分析 source=%s question=%s text_len=%s",
            source_path,
            redact_for_log(question or "", max_text_len=300),
            len(log_text),
        )
        lines = log_text.splitlines()
        problem_lines = [
            line
            for line in lines
            if re.search(r"error|exception|fail|traceback|timeout|refused|denied|失败|异常|错误|超时", line, flags=re.IGNORECASE)
        ]
        summary = [
            f"日志来源: {source_path or '-'}",
            f"已分析尾部 {len(lines)} 行。",
        ]
        if question:
            summary.append(f"关注问题: {question}")
        if problem_lines:
            summary.append(f"发现 {len(problem_lines)} 行疑似异常,最近几条:")
            summary.extend(f"- {redact_for_log(line, max_text_len=240)}" for line in problem_lines[-5:])
            summary.append("建议:优先检查以上异常附近的接口返回、网络连通性、认证信息和目标服务状态。")
        else:
            summary.append("未在日志尾部发现明显 ERROR/Exception/fail/timeout 关键字。")
            summary.append("建议:如问题仍存在,请扩大 `--tail` 或提供更具体的问题描述。")
        return strip_thinking_text("\n".join(summary))

    def propose_action(
        self,
        text: str,
        allowed_actions: list[str],
        params: dict[str, Any],
        state_summary: dict[str, Any] | None = None,
    ) -> LlmSingleActionProposal:
        """Build a single-action proposal when the text names an action explicitly.

        Args:
            text: The user's request.
            allowed_actions: Candidate action names. The first one (in list
                order) that occurs in ``text`` case-insensitively is selected.
            params: Not read by this implementation; kept as part of the
                method signature.
            state_summary: Optional state snapshot; only written to the log.

        Returns:
            A proposal whose ``action`` is empty when no candidate was found.
            ``ip`` is the first dotted-quad found in ``text`` (or empty),
            ``kwargs`` are the ``KEY=VALUE`` pairs from ``text`` with sensitive
            keys removed, and ``requires_confirmation`` is always ``True``.
        """
        logger.info(
            "规则 LLM 单 action 解析开始 text=%s allowed=%s state=%s",
            redact_for_log(text, max_text_len=800),
            allowed_actions,
            json_for_log(state_summary or {}),
        )
        action = ""
        lowered = text.lower()
        for candidate in allowed_actions:
            if candidate.lower() in lowered:
                action = candidate
                break
        ip_match = re.search(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text)
        kwargs = _safe_action_kwargs(self._extract_key_values(text))
        # Anything outside this list (including "no action recognised") is rated medium.
        risk = "high" if action in ("publish-version", "create-download-task", "upgrade-ip", "start-ip", "stop-ip", "rollback-ip") else "medium"
        proposal = LlmSingleActionProposal(
            action=action,
            ip=ip_match.group(0) if ip_match else "",
            kwargs=kwargs,
            reason="规则 fallback 仅在输入中出现明确 action 名时生成建议。" if action else "未识别到明确 action 名。",
            risk_level=risk,  # type: ignore[arg-type]
            requires_confirmation=True,
        )
        logger.info("规则 LLM 单 action 解析完成 proposal=%s", json_for_log(asdict(proposal)))
        return proposal

    def understand_request(self, text: str) -> LlmIntentResult:
        """Classify the user's intent and execution preference from keywords.

        The intent checks run in a fixed order (usage, preview, node/IP query,
        rollback) and the first match wins; ``deploy`` is the default.

        Args:
            text: The user's request.

        Returns:
            The detected intent, mode and strategy preference, the reasons that
            were recorded, and a confidence of 0.6 for the default ``deploy``
            intent or 0.72 for any other intent.
        """
        logger.info("规则 LLM 意图识别开始 text=%s", redact_for_log(text, max_text_len=800))
        lowered = text.lower()
        reasons: list[str] = []
        intent = "deploy"

        if any(word in lowered for word in ("用法", "怎么用", "生成脚本", "给我脚本", "usage")):
            intent = "show_usage"
            reasons.append("用户在询问脚本用法或脚本生成")
        elif any(word in lowered for word in ("预演", "计划", "不执行", "不要动环境", "dry-run", "preview")):
            intent = "preview"
            reasons.append("用户要求只预演或不触碰环境")
        elif any(word in lowered for word in ("在线ip", "在线 ip", "查询ip", "查询 ip", "node", "工作站")):
            intent = "query_node_ips"
            reasons.append("用户要求查询 Node 或在线工作站")
        elif any(word in lowered for word in ("回滚", "rollback")):
            intent = "rollback"
            reasons.append("用户要求回滚")
        else:
            reasons.append("默认识别为部署请求")

        mode_preference = "未指定"
        strategy_preference = "未指定"
        if any(word in lowered for word in ("mcp", "在线执行", "直接在线")):
            mode_preference = "MCP"
            strategy_preference = "hybrid_node_mcp"
            reasons.append("用户倾向 MCP;PAM_HOME 仍需脚本 action")
        # Checked second on purpose of ordering: when both MCP and script
        # keywords are present, the script preference overwrites the MCP one.
        if any(word in lowered for word in ("脚本", "离线", "script", "shell", "powershell")):
            mode_preference = "API脚本"
            strategy_preference = "script_only"
            reasons.append("用户倾向脚本或离线执行")
        if intent == "preview":
            # A preview without an explicit preference defaults to the hybrid strategy.
            strategy_preference = strategy_preference if strategy_preference != "未指定" else "hybrid_node_mcp"

        result = LlmIntentResult(
            intent=intent,  # type: ignore[arg-type]
            mode_preference=mode_preference,  # type: ignore[arg-type]
            strategy_preference=strategy_preference,  # type: ignore[arg-type]
            confidence=0.72 if intent != "deploy" else 0.6,
            reasons=reasons,
        )
        logger.info("规则 LLM 意图识别完成 result=%s", json_for_log(asdict(result)))
        return result

    def extract_params(self, text: str, base_params: dict[str, Any] | None = None) -> LlmParamResult:
        """Extract deployment parameters from ``KEY=VALUE`` pairs, Chinese phrases and IPs.

        Args:
            text: The user's request.
            base_params: Optional starting values. They are copied, then
                overridden by ``KEY=VALUE`` pairs, which are in turn overridden
                by values found through the Chinese phrase patterns.

        Returns:
            The merged parameters, a control dict holding ``user_specified_ips``
            when dotted-quad addresses appear in ``text``, the required
            parameters that are still missing or empty, and which of
            ``CLIENT_SECRET`` / ``CLIENT_ID`` are present.
        """
        logger.info(
            "规则 LLM 参数抽取开始 text=%s base_params=%s",
            redact_for_log(text, max_text_len=800),
            json_for_log(base_params or {}),
        )
        params = dict(base_params or {})
        params.update(self._extract_key_values(text))
        params.update(self._extract_chinese_patterns(text))

        control: dict[str, Any] = {}
        ips = re.findall(r"\b(?:\d{1,3}\.){3}\d{1,3}\b", text)
        if ips:
            control["user_specified_ips"] = ips

        missing = [key for key in REQUIRED_PARAMS if not params.get(key)]
        sensitive = [key for key in ("CLIENT_SECRET", "CLIENT_ID") if params.get(key)]
        result = LlmParamResult(
            extracted_params=params,
            extracted_control=control,
            missing_required_params=missing,
            sensitive_fields_present=sensitive,
        )
        logger.info("规则 LLM 参数抽取完成 result=%s", json_for_log(asdict(result)))
        return result

    def generate_plan(
        self,
        *,
        params: dict[str, Any],
        intent: str,
        strategy: ExecutionStrategy,
    ) -> LlmDeployPlan:
        """Produce a deterministic deployment plan with fixed risk notes.

        Args:
            params: Deployment parameters; the airport code, app name, module
                name and version number are shown in the summary (``-`` when absent).
            intent: The detected intent. Confirmation is required for
                ``deploy``, ``query_node_ips`` and ``rollback``.
            strategy: The execution strategy; it selects the strategy wording
                and adds one extra risk note for ``hybrid_node_mcp``.

        Returns:
            A plan whose ``planned_actions`` is a copy of
            ``GLOBAL_ACTION_SEQUENCE``.
        """
        logger.info("规则 LLM 计划生成开始 intent=%s strategy=%s params=%s", intent, strategy, json_for_log(params))
        if strategy == "hybrid_node_mcp":
            strategy_text = "PAM_HOME 使用脚本 action,PAM_NODE 使用 MCP"
        elif strategy == "script_only":
            strategy_text = "全部 action 使用脚本 action"
        else:
            strategy_text = "全部 action 使用 fake runner"

        summary = (
            f"计划处理 {params.get('AIRPORT_CODE', '-')}/"
            f"{params.get('APP_NAME', '-')}/"
            f"{params.get('MODULE_NAME', '-')}/"
            f"{params.get('VERSION_NUMBER', '-')},执行策略为 {strategy_text}。"
        )
        risk_notes = [
            "真实部署前必须确认参数。",
            "发布版本、创建下载任务、升级和回滚属于高风险动作。",
            "回滚只能在用户确认后执行。",
        ]
        if strategy == "hybrid_node_mcp":
            risk_notes.append("PAM_HOME 当前没有 MCP 能力,HOME 阶段仍会调用脚本 action。")

        result = LlmDeployPlan(
            summary=summary,
            risk_notes=risk_notes,
            planned_actions=list(GLOBAL_ACTION_SEQUENCE),
            requires_confirmation=intent in ("deploy", "query_node_ips", "rollback"),
            execution_strategy=strategy,
        )
        logger.info("规则 LLM 计划生成完成 result=%s", json_for_log(asdict(result)))
        return result

    def analyze_action_result(
        self,
        *,
        action: str,
        result: ActionResult,
    ) -> LlmActionAnalysis:
        """Review an action result with hard-coded rules.

        The checks run in a fixed order and later checks overwrite the fields
        set by earlier ones: generic failure, ``verify-ip`` health result,
        ``rollback-ip`` failure, progress polling, and finally a pending
        confirmation marker.

        Args:
            action: Name of the action that produced ``result``.
            result: The action result; ``ok``, ``values`` and ``error_summary``
                drive the analysis, the remaining fields are only logged.

        Returns:
            The analysis, including whether the flow should continue and, for
            the two progress-polling actions, whether progress is complete
            (``None`` for every other action).
        """
        logger.info(
            "规则 LLM action 审核开始 action=%s result=%s",
            action,
            json_for_log(
                {
                    "backend": result.backend,
                    "ok": result.ok,
                    "exit_code": result.exit_code,
                    "tool_name": result.tool_name,
                    "values": result.values,
                    "error_summary": result.error_summary,
                },
                max_text_len=1000,
            ),
        )
        notes: list[str] = []
        has_anomaly = not result.ok
        severity = "info"
        possible_reason = ""
        suggested_action = "继续观察。"
        requires_confirmation = False
        should_continue = True
        progress_complete: bool | None = None

        if not result.ok:
            severity = "medium"
            possible_reason = result.error_summary or "action 返回失败状态。"
            suggested_action = "查看 action 诊断日志、参数、网络和目标服务状态。"
            notes.append("硬规则检测到 action 执行失败。")
            should_continue = False

        if action == "verify-ip":
            success = result.values.get("SUCCESS")
            # A missing SUCCESS field is not treated as a failed health check.
            if success is not None and str(success).lower() not in ("true", "1", "yes"):
                has_anomaly = True
                severity = "high"
                # Coerced to str so the reason is always text, like the
                # PENDING_AGENT_CONFIRMATION reason below.
                possible_reason = str(result.values.get("MESSAGE") or "") or "工作站健康检查未通过。"
                suggested_action = "先下载日志并人工确认是否执行回滚。"
                requires_confirmation = True
                notes.append("verify-ip SUCCESS 非成功值。")
                should_continue = False

        if action == "rollback-ip" and not result.ok:
            severity = "high"
            suggested_action = "保持待确认状态,人工排查回滚失败原因后重试或转人工处理。"
            requires_confirmation = True
            notes.append("rollback-ip 失败需要人工处理。")
            should_continue = False

        if action in ("poll-download-progress", "poll-upgrade-progress"):
            progress_complete, progress_has_anomaly, progress_reason, progress_note = _analyze_progress_values(action, result.values)
            if progress_note:
                notes.append(progress_note)
            if progress_has_anomaly:
                has_anomaly = True
                severity = "high"
                possible_reason = progress_reason or possible_reason or "进度接口返回失败状态。"
                suggested_action = "停止后续 action,检查下载/推送任务状态、PAM_HOME/PAM_NODE 日志和接口返回。"
                should_continue = False
            elif progress_complete:
                # NOTE(review): this also replaces the failure advice when the
                # action itself failed but its fields report completion, while
                # should_continue stays False - confirm that is intended.
                suggested_action = "进度已完成,可以继续下一个 action。"
            elif result.ok:
                severity = severity if has_anomaly else "info"
                suggested_action = "进度未完成,继续查询进度。"

        if result.values.get("PENDING_AGENT_CONFIRMATION"):
            has_anomaly = True
            severity = "high"
            possible_reason = str(result.values["PENDING_AGENT_CONFIRMATION"])
            suggested_action = "暂停自动流程,等待人工确认。"
            requires_confirmation = True
            notes.append("action 返回待人工确认标记。")
            should_continue = False

        analysis = LlmActionAnalysis(
            action=action,
            has_anomaly=has_anomaly,
            severity=severity,  # type: ignore[arg-type]
            possible_reason=possible_reason,
            suggested_action=suggested_action,
            requires_confirmation=requires_confirmation,
            should_continue=should_continue,
            progress_complete=progress_complete,
            notes=notes,
        )
        logger.info("规则 LLM action 审核完成 analysis=%s", json_for_log(asdict(analysis)))
        return analysis

    def _extract_key_values(self, text: str) -> dict[str, str]:
        """Extract ``KEY=VALUE`` parameters whose key is a known alias.

        Values end at the first whitespace, comma or semicolon. Keys that are
        not in ``KEY_ALIASES`` are ignored; when a canonical key appears more
        than once, the last occurrence wins.
        """
        params: dict[str, str] = {}
        for match in re.finditer(r"([A-Za-z_][A-Za-z0-9_]*)\s*=\s*([^\s,;]+)", text):
            raw_key, value = match.groups()
            key = KEY_ALIASES.get(raw_key)
            if key:
                params[key] = value.strip()
        return params

    def _extract_chinese_patterns(self, text: str) -> dict[str, str]:
        """Extract deployment parameters from common Chinese phrasings.

        Each parameter has one pattern of the form "<label> [colon] <value>";
        the first match in ``text`` is used and parameters without a match are
        omitted from the result.
        """
        patterns = {
            "AIRPORT_CODE": r"(?:机场|三字码)\s*[::]?\s*([A-Z]{3})",
            "APP_NAME": r"(?:应用|应用名)\s*[::]?\s*([A-Za-z0-9_.-]+)",
            "MODULE_NAME": r"(?:模块|模块名)\s*[::]?\s*([A-Za-z0-9_.-]+)",
            # The lookbehinds keep the "版本" inside the parent-version labels
            # (继承版本 / 父版本 / 规则版本) from being read as the target
            # version; those phrases belong to PARENT_VERSION_NUMBER only.
            "VERSION_NUMBER": r"(?<!继承)(?<!父)(?<!规则)(?:版本|版本号)\s*[::]?\s*([A-Za-z0-9_.-]+)",
            "PARENT_VERSION_NUMBER": r"(?:继承版本|父版本|规则版本|继承哪个版本的规则)\s*[::]?\s*([A-Za-z0-9_.-]+)",
            "ZIP_FILE_PATH": r"(?:包|软件包|zip)\s*[::]?\s*([A-Za-z]:[\\/][^\s,;]+|/[^\s,;]+)",
        }
        params: dict[str, str] = {}
        for key, pattern in patterns.items():
            match = re.search(pattern, text)
            if match:
                params[key] = match.group(1)
        return params
|
||
|
||
|
||
def _analyze_progress_values(action: str, values: dict[str, Any]) -> tuple[bool, bool, str, str]:
    """Interpret the progress fields returned by a progress-polling action.

    Args:
        action: Name of the polled action; the ``FINISH`` field is only
            honoured for ``poll-upgrade-progress``.
        values: Fields returned by the action. All of them are compared after
            being trimmed and lower-cased.

    Returns:
        A ``(complete, has_anomaly, reason, note)`` tuple. ``reason`` is an
        empty string when no anomaly was detected and is always a ``str``.
        ``note`` is the one-line field summary from ``_progress_note``.
    """
    step = _lower_value(values.get("STEP"))
    status = _lower_value(values.get("STATUS"))
    msg = _lower_value(values.get("MSG"))
    message = _lower_value(values.get("MESSAGE"))
    success = _lower_value(values.get("SUCCESS"))
    finish = _lower_value(values.get("FINISH"))
    code = _lower_value(values.get("CODE"))
    rate = _lower_value(values.get("RATE_OF_PROGRESS"))

    # Any one of these signals marks the progress as complete.
    complete = (
        step == "done"
        or status in ("completed", "complete", "done", "success", "succeeded")
        or success in ("true", "1", "yes")
        or (action == "poll-upgrade-progress" and finish in ("true", "1", "yes"))
        or (msg == "success" and rate == "100" and (not code or code == "0"))
    )
    note = _progress_note(values)

    # A present, non-zero CODE is an anomaly regardless of the other fields.
    if code and code != "0":
        return complete, True, f"进度接口返回非 0 CODE: {code}", note

    combined = " ".join(item for item in (step, status, msg, message) if item)
    if re.search(r"fail|error", combined, flags=re.IGNORECASE):
        raw_reason = values.get("MESSAGE") or values.get("MSG") or values.get("STEP") or "进度接口返回失败状态"
        # The raw field may be of any type; coerce it so the declared ``str``
        # element of the return tuple holds for every input.
        return complete, True, str(raw_reason), note
    return complete, False, "", note
|
||
|
||
|
||
def _progress_note(values: dict[str, Any]) -> str:
    """Format the core progress fields as a single note line.

    Fields that are absent, ``None`` or an empty string are skipped; the rest
    are rendered as ``KEY=value`` in a fixed key order. A fallback sentence is
    returned when no field is left to show.
    """
    reported_keys = ("RATE_OF_PROGRESS", "STEP", "MSG", "STATUS", "SUCCESS", "CODE", "FINISH", "MESSAGE")
    fragments = [
        f"{name}={values.get(name)}"
        for name in reported_keys
        if values.get(name) not in (None, "")
    ]
    if not fragments:
        return "进度接口未返回明确进度字段。"
    return "当前进度: " + ", ".join(fragments)
|
||
|
||
|
||
def _lower_value(value: Any) -> str:
    """Return ``value`` as a trimmed, lower-cased string (``""`` for ``None``)."""
    if value is None:
        return ""
    return str(value).strip().lower()
|
||
|
||
|
||
def _safe_action_kwargs(values: dict[str, str]) -> dict[str, str]:
    """Return a copy of ``values`` without the keys listed in ``SENSITIVE_KEYS``.

    Used for the extra arguments of a single-action proposal so that sensitive
    fields are not carried into the execution suggestion.
    """
    safe: dict[str, str] = {}
    for name, raw in values.items():
        if name in SENSITIVE_KEYS:
            continue
        safe[name] = raw
    return safe
|