用户修复外部问题后输入 resume,会从失败的 action 重新执行,而不是结束整个流程。回滚已从 workflow 中拆出,新增显式命令:chat 模式下为 rollback [IP];CLI 下为 rollback --checkpoint ... [--ip ...] [--stop-first|--no-stop-first]。旧的 confirm approve/reject 只保留为旧 checkpoint 的兼容入口,新流程不再推荐使用。LangGraph workflow 已移除回滚确认 interrupt 节点,失败暂停和续跑改走业务 checkpoint。README、打包 README、run.sh --help、流程图、todo、提示词基线和测试都已同步。
262 lines
11 KiB
Python
262 lines
11 KiB
Python
"""PAM 部署 Agent 的命令行入口。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import json
|
||
import logging
|
||
from dataclasses import asdict
|
||
|
||
from .agent import PamDeployAgent
|
||
from .checkpoint_store import load_agent_state, redact_mapping
|
||
from .interactive import run_interactive_chat
|
||
from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult
|
||
from .llm import build_llm_client
|
||
from .logging_utils import configure_logging, json_for_log
|
||
from .mcp_factory import build_mcp_runner_from_config
|
||
from .params_loader import load_params_file
|
||
|
||
# Module-level logger, named after this module so records can be traced to the CLI.
logger = logging.getLogger(__name__)
|
||
|
||
|
||
def add_llm_args(parser: argparse.ArgumentParser) -> None:
    """Register the LLM configuration options on a sub-command parser.

    Adds ``--llm-base-url``, ``--llm-api-key``, ``--llm-model`` and
    ``--llm-action-analysis-prompt-file``. All of them are optional string
    options that default to ``None`` when omitted.

    Args:
        parser: The (sub-)parser to extend in place.
    """
    llm_flags = (
        "--llm-base-url",
        "--llm-api-key",
        "--llm-model",
        "--llm-action-analysis-prompt-file",
    )
    for flag in llm_flags:
        parser.add_argument(flag)
|
||
|
||
|
||
def add_mcp_args(parser: argparse.ArgumentParser) -> None:
    """Register the MCP configuration option on a sub-command parser.

    Adds the optional ``--mcp-config`` option, which takes the path of an MCP
    client JSON configuration file.

    Args:
        parser: The (sub-)parser to extend in place.
    """
    option_settings = {"help": "MCP client JSON 配置文件路径"}
    parser.add_argument("--mcp-config", **option_settings)
|
||
|
||
|
||
def add_action_analysis_arg(parser: argparse.ArgumentParser) -> None:
    """Register the post-action diagnosis switch on a sub-command parser.

    Adds the boolean ``--analyze-actions`` flag (``False`` unless given).

    Args:
        parser: The (sub-)parser to extend in place.
    """
    parser.add_argument(
        "--analyze-actions",
        action="store_true",
        help="每个 action 后追加 LLM/规则诊断建议",
    )
|
||
|
||
|
||
def require_confirm(args: argparse.Namespace) -> None:
    """Abort unless the command line explicitly carried ``--confirm``.

    Args:
        args: Parsed CLI arguments; a missing ``confirm`` attribute is treated
            the same as ``False``.

    Raises:
        SystemExit: If ``args.confirm`` is absent or falsy.
    """
    confirmed = getattr(args, "confirm", False)
    if confirmed:
        return
    raise SystemExit("Refusing to execute actions without --confirm.")
|
||
|
||
|
||
def print_pause_payload(agent: PamDeployAgent, state) -> None:
    """Print the pending confirmation and checkpoint path of a state, if any.

    Each item is written to stdout as its own pretty-printed JSON object so the
    user can see what to confirm and which checkpoint to continue from.

    Args:
        agent: Agent used to build the confirmation request for ``state``.
        state: State exposing ``pending_confirmation`` and ``checkpoint_path``.
    """

    def emit(key, value) -> None:
        # One JSON object per item, non-ASCII text kept readable.
        print(json.dumps({key: value}, ensure_ascii=False, indent=2))

    if state.pending_confirmation:
        emit("confirmation", agent.build_confirmation_request(state))
    if state.checkpoint_path:
        emit("checkpoint", state.checkpoint_path)
|
||
|
||
|
||
def run_graph_once(agent: PamDeployAgent, state, *, flow: str = "deploy") -> LangGraphRunResult:
    """Run ``state`` through a freshly built LangGraph runtime once.

    Args:
        agent: Agent handed to the runtime.
        state: State passed to the runtime's ``start`` call.
        flow: Flow name forwarded to the runtime (``"deploy"`` by default).

    Returns:
        Whatever the runtime's ``start`` call returns.
    """
    return LangGraphDeploymentRuntime(agent=agent, flow=flow).start(state)  # type: ignore[arg-type]
|
||
|
||
|
||
def print_graph_result(agent: PamDeployAgent, result: LangGraphRunResult) -> None:
    """Print the report, confirmation and pause details of a graph run.

    Output order: the report (the one carried by ``result`` if present,
    otherwise one rendered from the state), then the confirmation payload when
    the run was interrupted with one, then the pause payload of the state.

    Args:
        agent: Agent used to render a report and the pause payload.
        result: Graph run result exposing ``state``, ``report``,
            ``interrupted`` and ``confirmation``.
    """
    state = result.state
    has_state = state is not None

    if result.report:
        print(result.report)
    elif has_state:
        print(agent.render_report(state))

    # The confirmation is only looked at when the run was interrupted.
    confirmation = result.confirmation if result.interrupted else None
    if confirmation:
        print(json.dumps({"confirmation": confirmation}, ensure_ascii=False, indent=2))

    if not has_state:
        return
    print_pause_payload(agent, state)
|
||
|
||
|
||
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with every sub-command and its options."""
    # Single source for the execution strategies accepted by --strategy.
    strategies = ["hybrid_node_mcp", "script_only", "fake"]

    parser = argparse.ArgumentParser(prog="pam-deploy-agent")
    sub = parser.add_subparsers(dest="command", required=True)

    preview = sub.add_parser("preview")
    preview.add_argument("--config", required=True)
    preview.add_argument("--strategy", default="hybrid_node_mcp", choices=strategies)

    analyze = sub.add_parser("analyze")
    analyze.add_argument("--text", required=True)
    analyze.add_argument("--config")
    add_llm_args(analyze)

    chat = sub.add_parser("chat")
    chat.add_argument("--config", required=True)
    chat.add_argument("--strategy", default="fake", choices=strategies)
    chat.add_argument("--target-ip", action="append", default=[])
    chat.add_argument("--checkpoint")
    add_llm_args(chat)
    add_mcp_args(chat)
    add_action_analysis_arg(chat)

    run = sub.add_parser("run-global")
    run.add_argument("--config", required=True)
    run.add_argument("--strategy", default="fake", choices=strategies)
    run.add_argument("--checkpoint")
    run.add_argument("--confirm", action="store_true")
    add_llm_args(run)
    add_mcp_args(run)
    add_action_analysis_arg(run)

    deploy = sub.add_parser("run-deploy")
    deploy.add_argument("--config", required=True)
    deploy.add_argument("--strategy", default="fake", choices=strategies)
    deploy.add_argument("--target-ip", action="append", default=[])
    deploy.add_argument("--checkpoint")
    deploy.add_argument("--confirm", action="store_true")
    add_llm_args(deploy)
    add_mcp_args(deploy)
    add_action_analysis_arg(deploy)

    resume = sub.add_parser("resume")
    resume.add_argument("--checkpoint", required=True)
    resume.add_argument("--confirm", action="store_true")
    add_llm_args(resume)
    add_mcp_args(resume)
    add_action_analysis_arg(resume)

    confirm = sub.add_parser("confirm")
    confirm.add_argument("--checkpoint", required=True)
    confirm.add_argument("--decision", required=True, choices=["approve", "reject"])
    confirm.add_argument("--note", default="")
    confirm.add_argument("--confirm", action="store_true")
    add_llm_args(confirm)
    add_mcp_args(confirm)
    add_action_analysis_arg(confirm)

    rollback = sub.add_parser("rollback")
    rollback.add_argument("--checkpoint", required=True)
    rollback.add_argument("--ip", help="要回滚的 IP;不传时使用当前失败 IP")
    # Both flags write the same dest; reject contradictory input instead of
    # silently letting the last flag win. Omitting both leaves stop_first=None.
    stop_group = rollback.add_mutually_exclusive_group()
    stop_group.add_argument("--stop-first", dest="stop_first", action="store_true", default=None, help="回滚前先停机")
    stop_group.add_argument("--no-stop-first", dest="stop_first", action="store_false", default=None, help="回滚前不先停机")
    rollback.add_argument("--note", default="")
    rollback.add_argument("--confirm", action="store_true")
    add_llm_args(rollback)
    add_mcp_args(rollback)
    add_action_analysis_arg(rollback)

    return parser


def _build_agent(args: argparse.Namespace) -> PamDeployAgent:
    """Create the agent with the LLM client and MCP runner selected by ``args``."""
    llm_client = None
    # preview registers no LLM options, so no client is built for it.
    if args.command != "preview":
        llm_client = build_llm_client(
            base_url=getattr(args, "llm_base_url", None),
            api_key=getattr(args, "llm_api_key", None),
            model=getattr(args, "llm_model", None),
            action_analysis_prompt_path=getattr(args, "llm_action_analysis_prompt_file", None),
        )

    mcp_runner = None
    mcp_config = getattr(args, "mcp_config", None)
    if mcp_config:
        logger.info("开始加载 MCP 配置 path=%s", mcp_config)
        mcp_runner = build_mcp_runner_from_config(mcp_config)
        logger.info("MCP 配置加载完成 path=%s runner=%s", mcp_config, type(mcp_runner).__name__)

    return PamDeployAgent(
        llm_client=llm_client,
        mcp_runner=mcp_runner,
        action_analysis_enabled=bool(getattr(args, "analyze_actions", False)),
    )


def main() -> None:
    """Parse the CLI arguments and dispatch to the selected sub-command.

    ``analyze``, ``chat`` and ``preview`` run without ``--confirm``. Every other
    command (``run-global``, ``resume``, ``confirm``, ``rollback``,
    ``run-deploy``) first goes through ``require_confirm``.

    Raises:
        SystemExit: On argument errors, when ``--confirm`` is missing for an
            executing command, when ``confirm`` is used on a checkpoint with no
            pending confirmation, or when ``rollback`` cannot determine an IP.
    """
    args = _build_parser().parse_args()
    log_path = configure_logging()

    # Mask the API key explicitly: whether json_for_log redacts secrets is not
    # visible from this module, and the key must never land in the log file.
    logged_args = dict(vars(args))
    if logged_args.get("llm_api_key"):
        logged_args["llm_api_key"] = "***"
    logger.info("CLI 启动 command=%s args=%s log_path=%s", args.command, json_for_log(logged_args), log_path)

    config_path = getattr(args, "config", None)
    params = {}
    if config_path:
        params = load_params_file(config_path)
        logger.info("参数文件已加载 command=%s config=%s params=%s", args.command, config_path, json_for_log(params))

    agent = _build_agent(args)

    if args.command == "analyze":
        logger.info("开始执行 analyze text_len=%s", len(args.text))
        result = agent.analyze_request(args.text, params)
        payload = redact_mapping({key: asdict(value) for key, value in result.items()})
        logger.info("analyze 完成 result=%s", json_for_log(payload))
        print(json.dumps(payload, ensure_ascii=False, indent=2))
        return

    if args.command == "chat":
        logger.info("进入 chat 模式 strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip)
        run_interactive_chat(
            agent=agent,
            params=params,
            strategy=args.strategy,
            checkpoint_path=args.checkpoint,
            target_ips=args.target_ip,
        )
        return

    if args.command == "preview":
        logger.info("执行 preview strategy=%s", args.strategy)
        print(agent.preview(params, args.strategy))
        return

    # Everything below executes real actions and needs an explicit --confirm.
    require_confirm(args)

    if args.command == "run-global":
        logger.info("开始 run-global strategy=%s checkpoint=%s", args.strategy, args.checkpoint)
        state = agent.create_state(
            params=params,
            execution_strategy=args.strategy,
            checkpoint_path=args.checkpoint,
        )
        result = run_graph_once(agent, state, flow="global")
        if result.state is not None:
            print(json.dumps({"events": result.state.events}, ensure_ascii=False, indent=2))
            print_pause_payload(agent, result.state)
        return

    if args.command == "resume":
        logger.info("开始 resume checkpoint=%s", args.checkpoint)
        state = load_agent_state(args.checkpoint)
        # Fall back to the CLI path when the stored state carries none.
        state.checkpoint_path = state.checkpoint_path or args.checkpoint
        if state.paused:
            state = agent.resume_state(state)
        result = run_graph_once(agent, state, flow="deploy")
        print_graph_result(agent, result)
        return

    if args.command == "confirm":
        logger.info("开始 confirm checkpoint=%s decision=%s note_len=%s", args.checkpoint, args.decision, len(args.note))
        state = load_agent_state(args.checkpoint)
        state.checkpoint_path = state.checkpoint_path or args.checkpoint
        if not state.pending_confirmation:
            raise SystemExit("当前 checkpoint 没有待确认事项;新流程请使用 resume 重试,或 rollback 显式回滚。")
        state = agent.confirm_pending(state, approved=args.decision == "approve", operator_note=args.note)
        print(agent.render_report(state))
        print_pause_payload(agent, state)
        return

    if args.command == "rollback":
        logger.info("开始 rollback checkpoint=%s ip=%s stop_first=%s note_len=%s", args.checkpoint, args.ip, args.stop_first, len(args.note))
        state = load_agent_state(args.checkpoint)
        state.checkpoint_path = state.checkpoint_path or args.checkpoint
        # An explicit --ip wins; otherwise look one up in the checkpoint state.
        ip = args.ip or _find_current_failed_ip(state)
        if not ip:
            raise SystemExit("未找到当前失败 IP,请传入 --ip。")
        state = agent.rollback_ip(state, ip, stop_first=args.stop_first, operator_note=args.note)
        print(agent.render_report(state))
        print_pause_payload(agent, state)
        return

    # The only remaining sub-command is run-deploy.
    logger.info("开始 run-deploy strategy=%s checkpoint=%s target_ips=%s", args.strategy, args.checkpoint, args.target_ip)
    state = agent.create_state(
        params=params,
        execution_strategy=args.strategy,
        checkpoint_path=args.checkpoint,
        target_ips=args.target_ip,
    )
    result = run_graph_once(agent, state, flow="deploy")
    print_graph_result(agent, result)
|
||
|
||
|
||
def _find_current_failed_ip(state) -> str:
    """Pick an IP from a checkpoint state to use for an explicit rollback.

    The IP recorded in ``state.review_context`` is preferred when it is a key
    of ``state.ip_states``. Otherwise the first entry of ``state.ip_states``
    whose ``"status"`` is ``"FAILED"`` is used.

    Args:
        state: State exposing ``review_context`` (mapping or falsy) and
            ``ip_states`` (mapping of IP to a per-IP mapping).

    Returns:
        The chosen IP, or ``""`` when no candidate exists.
    """
    review_context = state.review_context or {}
    preferred_ip = str(review_context.get("ip", ""))
    if preferred_ip and preferred_ip in state.ip_states:
        return preferred_ip

    failed_ips = (
        candidate
        for candidate, details in state.ip_states.items()
        if details.get("status") == "FAILED"
    )
    return next(failed_ips, "")
|
||
|
||
|
||
# Run the CLI only when this module is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|