- 将 CLI/chat 部署执行切换为 action 级 LangGraph runtime - 接入 LangGraph interrupt/checkpointer 处理人工确认与恢复 - 保留业务 checkpoint JSON 用于跨进程断点续跑 - 增加 MCP HTTP/SSE server_url 配置支持 - 增加 MCP 独立 OAuth token 鉴权,复用 HOME 的 client_credentials 方式 - 支持从 MCP server list_tools 自动发现 tools,action_tools 仅作为可选覆盖 - 更新 MCP 配置示例、README、打包说明和整体流程图 - 补充 MCP 配置、鉴权和 tool 自动发现测试
593 lines
23 KiB
Python
593 lines
23 KiB
Python
"""PAM 部署 Agent 的常驻式交互 CLI 会话。"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import time
|
||
import json
|
||
import shlex
|
||
import builtins
|
||
from dataclasses import asdict
|
||
from pathlib import Path
|
||
from typing import Any, Callable
|
||
|
||
from .agent import PamDeployAgent
|
||
from .checkpoint_store import load_agent_state, redact_mapping
|
||
from .langgraph_runtime import LangGraphDeploymentRuntime, LangGraphRunResult
|
||
from .llm import build_llm_client
|
||
from .llm.rule_based import RuleBasedLlmClient
|
||
from .mcp_factory import build_mcp_runner_from_config
|
||
from .models import AgentState, ExecutionStrategy
|
||
|
||
# Prompt callable used by the session: receives the prompt text and returns
# the line the user entered (same shape as the builtin ``input``).
InputFunc = Callable[[str], str]

# Output callable used by the session: receives one message string and
# returns nothing (same shape as a single-argument ``print``).
OutputFunc = Callable[[str], None]
|
||
|
||
COMMAND_HELP = """可用命令:
|
||
help 显示帮助
|
||
preview 查看当前参数和执行策略
|
||
analyze <需求> 只做理解和计划,不执行
|
||
params 脱敏展示当前会话参数
|
||
events [数量] 查看最近 action 事件,默认 10 条
|
||
set KEY=VALUE 修改当前会话参数
|
||
llm config KEY=VALUE 配置真实 LLM,支持 base_url/api_key/model
|
||
llm fallback 切回本地规则 fallback
|
||
llm action-analysis on|off 开关 action 后诊断
|
||
mcp config <路径> 加载 MCP client JSON 配置
|
||
run 创建部署任务并执行
|
||
status 查看当前运行状态
|
||
approve 确认待处理回滚
|
||
reject [原因] 拒绝待处理回滚
|
||
resume 从当前 checkpoint 续跑
|
||
list checkpoints 列出 checkpoint 目录下的 JSON 文件
|
||
load checkpoint <路径> 加载指定 checkpoint
|
||
checkpoint 显示 checkpoint 路径
|
||
exit 退出
|
||
|
||
也可以直接输入自然语言需求,Agent 会先分析并更新会话参数;执行仍需输入 run。
|
||
"""
|
||
|
||
|
||
class InteractiveCliSession:
    """Stateful REPL session for the PAM deploy agent.

    The session owns the working parameters, the execution strategy, the
    target IP list, the current ``AgentState`` and the active LangGraph
    runtime, and maps every input line onto a command handler.
    """

    def __init__(
        self,
        *,
        agent: PamDeployAgent,
        params: dict[str, Any],
        strategy: ExecutionStrategy = "hybrid_node_mcp",
        checkpoint_path: str | None = None,
        target_ips: list[str] | None = None,
        input_func: InputFunc = input,
        output_func: OutputFunc = print,
    ) -> None:
        """Store the session context and wrap the input/output callables.

        Args:
            agent: Agent used for analysis, state creation and execution.
            params: Initial parameters; copied so the caller's dict is not mutated.
            strategy: Initial execution strategy.
            checkpoint_path: Checkpoint file; a timestamped default is used when falsy.
            target_ips: Optional initial target IPs (copied).
            input_func: Prompt callable, passed through ``_build_prompt_input``.
            output_func: Output callable, passed through ``_build_output_func``.
        """
        self.agent = agent
        self.params = dict(params)
        self.strategy = strategy
        self.checkpoint_path = checkpoint_path or _default_checkpoint_path()
        self.target_ips = list(target_ips or [])
        self.input = _build_prompt_input(input_func)
        self.output = _build_output_func(output_func)
        self.state: AgentState | None = None
        self.last_analysis: dict[str, Any] | None = None
        self.llm_config: dict[str, str] = {}
        self.mcp_config_path: str = ""
        self.graph_runtime: LangGraphDeploymentRuntime | None = None

    def run(self) -> None:
        """Run the REPL loop until the user exits or the input stream ends."""
        self.output("PAM 部署 Agent 交互式会话")
        self.output("输入 help 查看命令,输入 exit 退出。")
        self._load_existing_checkpoint_if_any()
        while True:
            try:
                line = self.input("pam-deploy-agent> ")
            except EOFError:
                self.output("bye")
                return
            if not self.handle_line(line):
                return

    def handle_line(self, line: str) -> bool:
        """Dispatch one input line to its handler.

        Anything that is not a recognised command is treated as a natural
        language request and sent to ``_analyze``.

        Returns:
            ``False`` when the session should end, ``True`` otherwise.
        """
        text = line.strip()
        if not text:
            return True

        command, _, rest = text.partition(" ")
        normalized = command.lower()

        if normalized in ("exit", "quit", "q"):
            self.output("bye")
            return False
        if normalized in ("help", "?"):
            self.output(COMMAND_HELP.rstrip())
            return True
        if normalized == "preview":
            self.output(self.agent.preview(self.params, self.strategy))
            return True
        if normalized == "params":
            self._show_params()
            return True
        if normalized == "events":
            self._show_events(rest.strip())
            return True
        if normalized == "analyze":
            self._analyze(rest.strip())
            return True
        if normalized == "set":
            self._set_param(rest.strip())
            return True
        if normalized == "llm":
            self._configure_llm(rest.strip())
            return True
        if normalized == "mcp":
            self._configure_mcp(rest.strip())
            return True
        if normalized in ("run", "deploy", "execute"):
            self._run_deploy()
            return True
        if normalized == "resume":
            self._resume()
            return True
        if normalized == "status":
            self._status()
            return True
        if normalized == "approve":
            self._confirm(approved=True, note=rest.strip())
            return True
        if normalized == "reject":
            self._confirm(approved=False, note=rest.strip())
            return True
        if normalized == "checkpoint":
            self.output(f"checkpoint: {self.checkpoint_path}")
            return True
        if normalized == "list" and rest.strip().lower() == "checkpoints":
            self._list_checkpoints()
            return True
        if normalized == "load" and rest.strip().lower().startswith("checkpoint"):
            self._load_checkpoint(rest.strip()[len("checkpoint") :].strip())
            return True

        self._analyze(text)
        return True

    def _analyze(self, text: str) -> None:
        """Analyze a natural-language request and update params, strategy and target IPs."""
        if not text:
            self.output("请输入要分析的自然语言需求,例如:analyze 请用 MCP 预演部署 HET。")
            return

        result = self.agent.analyze_request(text, self.params)
        self.last_analysis = result
        param_result = result["params"]
        intent_result = result["intent"]
        plan = result["plan"]
        self.params = dict(param_result.extracted_params)
        self.strategy = _choose_strategy(intent_result.strategy_preference, self.strategy)

        user_ips = param_result.extracted_control.get("user_specified_ips")
        if isinstance(user_ips, list):
            self.target_ips = [str(item) for item in user_ips]

        # Redact before anything derived from the analysis payload is printed.
        safe_payload = redact_mapping({key: asdict(value) for key, value in result.items()})
        self.output("已生成结构化理解:")
        self.output(f"- intent: {intent_result.intent}")
        self.output(f"- strategy: {self.strategy}")
        self.output(f"- summary: {plan.summary}")
        if param_result.missing_required_params:
            self.output("- missing: " + ", ".join(param_result.missing_required_params))
        if self.target_ips:
            self.output("- target_ips: " + ", ".join(self.target_ips))
        self.output("执行请输 run;查看完整 JSON 可用一次性 analyze 命令。")
        self.output(_format_redacted_params(safe_payload["params"]["extracted_params"]))

    def _set_param(self, assignment: str) -> None:
        """Handle ``set KEY=VALUE``: store the stripped value under the stripped key."""
        if "=" not in assignment:
            self.output("格式:set KEY=VALUE")
            return
        key, value = assignment.split("=", 1)
        key = key.strip()
        if not key:
            self.output("参数名不能为空。")
            return
        self.params[key] = value.strip()
        self.output(f"已设置 {key}")

    def _show_params(self) -> None:
        """Print the current session parameters after redaction."""
        self.output(_format_redacted_params(redact_mapping(self.params)))

    def _show_events(self, count_text: str) -> None:
        """Print the most recent events as redacted JSON (default 10, minimum 1)."""
        if self.state is None or not self.state.events:
            self.output("当前没有事件。")
            return
        try:
            count = int(count_text) if count_text else 10
        except ValueError:
            self.output("格式:events [数量]")
            return
        events = self.state.events[-max(count, 1) :]
        self.output(json.dumps(redact_mapping(events), ensure_ascii=False, indent=2, default=str))

    def _configure_llm(self, text: str) -> None:
        """Handle the ``llm`` sub-commands: ``config``, ``fallback`` and ``action-analysis``."""
        if not text:
            self.output("格式:llm config base_url=... api_key=... model=... | llm fallback | llm action-analysis on|off")
            return
        try:
            parts = shlex.split(text)
        except ValueError as exc:
            # shlex raises on unbalanced quotes; report it instead of
            # letting a typo terminate the whole interactive session.
            self.output(f"llm 命令解析失败: {exc}")
            return
        if not parts:
            self.output("未知 llm 命令。")
            return
        if parts[0] == "fallback":
            self.agent.llm_client = RuleBasedLlmClient()
            self.llm_config = {}
            self.output("已切回本地规则 LLM fallback。")
            return
        if parts[0] == "action-analysis":
            if len(parts) < 2 or parts[1] not in ("on", "off"):
                self.output("格式:llm action-analysis on|off")
                return
            self.agent.action_analysis_enabled = parts[1] == "on"
            self.output(f"action 后诊断已{'开启' if self.agent.action_analysis_enabled else '关闭'}。")
            return
        if parts[0] != "config":
            self.output("未知 llm 命令。")
            return
        # Settings accumulate across calls so the user can supply
        # base_url / api_key / model one command at a time.
        updates = _parse_key_values(parts[1:])
        self.llm_config.update(updates)
        try:
            self.agent.llm_client = build_llm_client(
                base_url=self.llm_config.get("base_url"),
                api_key=self.llm_config.get("api_key"),
                model=self.llm_config.get("model"),
            )
        except Exception as exc:
            self.output(f"LLM 配置失败: {exc}")
            return
        safe = {**self.llm_config}
        if safe.get("api_key"):
            # Never echo the key back to the terminal.
            safe["api_key"] = "***"
        self.output("LLM 配置已加载: " + json.dumps(safe, ensure_ascii=False))

    def _configure_mcp(self, text: str) -> None:
        """Handle ``mcp config <path>``: build a runner and install it on the agent and its router."""
        command, _, path = text.partition(" ")
        if command != "config" or not path.strip():
            self.output("格式:mcp config <mcp_client.json>")
            return
        path = path.strip().strip('"')
        try:
            runner = build_mcp_runner_from_config(path)
        except Exception as exc:
            self.output(f"MCP 配置失败: {exc}")
            return
        self.agent.mcp_runner = runner
        self.agent.router.mcp_runner = runner
        self.mcp_config_path = path
        self.output(f"MCP 配置已加载: {path}")

    def _list_checkpoints(self) -> None:
        """List up to 20 JSON files next to the current checkpoint, newest first."""
        checkpoint_dir = Path(self.checkpoint_path).parent
        if not checkpoint_dir.exists():
            self.output(f"checkpoint 目录不存在: {checkpoint_dir}")
            return
        files = sorted(checkpoint_dir.glob("*.json"), key=lambda item: item.stat().st_mtime, reverse=True)
        if not files:
            self.output(f"checkpoint 目录没有 JSON 文件: {checkpoint_dir}")
            return
        lines = ["checkpoint 列表:"]
        for file in files[:20]:
            lines.append(f"- {file}")
        self.output("\n".join(lines))

    def _read_checkpoint(self, checkpoint: Path) -> AgentState | None:
        """Load a checkpoint file, reporting failures instead of raising.

        Returns:
            The loaded state, or ``None`` when loading failed (a message has
            already been written to the output in that case).
        """
        try:
            return load_agent_state(checkpoint)
        except Exception as exc:
            # REPL boundary: a corrupt or unreadable file must not end the
            # session, mirroring how LLM/MCP configuration errors are handled.
            self.output(f"checkpoint 加载失败: {checkpoint}: {exc}")
            return None

    def _restore_state(self, checkpoint: Path) -> bool:
        """Load ``checkpoint`` into ``self.state``, back-filling its checkpoint path.

        Returns:
            ``True`` when the state was loaded, ``False`` otherwise.
        """
        state = self._read_checkpoint(checkpoint)
        if state is None:
            return False
        state.checkpoint_path = state.checkpoint_path or str(checkpoint)
        self.state = state
        return True

    def _load_checkpoint(self, path_text: str) -> None:
        """Handle ``load checkpoint <path>`` and sync the session to the loaded state."""
        # Accept quoted paths, consistent with ``mcp config``.
        path_text = path_text.strip().strip('"')
        if not path_text:
            self.output("格式:load checkpoint <路径>")
            return
        checkpoint = Path(path_text)
        if not checkpoint.exists():
            self.output(f"checkpoint 不存在: {checkpoint}")
            return
        state = self._read_checkpoint(checkpoint)
        if state is None:
            # Keep the previous session untouched when the file cannot be loaded.
            return
        self.state = state
        self.state.checkpoint_path = str(checkpoint)
        self.checkpoint_path = str(checkpoint)
        self.params = dict(self.state.params)
        self.strategy = self.state.execution_strategy
        self.target_ips = list(self.state.target_ips)
        # A runtime built for the previous state does not apply to the loaded one.
        self.graph_runtime = None
        self.output(f"已加载 checkpoint: {checkpoint}")
        if self.state.pending_confirmation:
            self._print_confirmation()

    def _run_deploy(self) -> None:
        """Create a fresh state and execute it after the user confirms params, scope and execution."""
        if self.state and self.state.pending_confirmation:
            # A pending confirmation must be resolved before starting a new run.
            self._print_confirmation()
            return

        if not self._confirm_params_and_scope():
            self.output("已取消执行。")
            return

        if not self._ask_yes_no("即将执行真实 action;确认执行请输入 yes: "):
            self.output("已取消执行。")
            return

        self.state = self.agent.create_state(
            params=self.params,
            execution_strategy=self.strategy,
            checkpoint_path=self.checkpoint_path,
            target_ips=self.target_ips,
        )
        self.graph_runtime = None
        self._execute_current_state()

    def _confirm_params_and_scope(self) -> bool:
        """Ask the user to confirm the redacted parameters and then the target IP scope."""
        self.output(_format_redacted_params(redact_mapping(self.params)))
        if not self._ask_yes_no("确认以上参数请输入 yes: "):
            return False
        if self.target_ips:
            self.output("目标 IP: " + ", ".join(self.target_ips))
        else:
            self.output("目标 IP: 未指定,将在 get-online-ips 后使用全部在线 IP。")
        return self._ask_yes_no("确认目标范围请输入 yes: ")

    def _resume(self) -> None:
        """Continue the deploy flow from the in-memory state or the checkpoint file."""
        if self.state is None:
            checkpoint = Path(self.checkpoint_path)
            if not checkpoint.exists():
                self.output("当前没有可续跑的 checkpoint。")
                return
            if not self._restore_state(checkpoint):
                return
        if self.graph_runtime and self.graph_runtime.waiting_confirmation:
            # The graph is paused on a confirmation; approve/reject is required first.
            self._print_confirmation()
            return
        self._execute_current_state()

    def _execute_current_state(self) -> None:
        """Execute the current state via LangGraph, falling back to the agent's local flow."""
        if self.state is None:
            self.output("当前没有运行状态。")
            return
        try:
            if self.graph_runtime is None or not self.graph_runtime.waiting_confirmation:
                self.graph_runtime = LangGraphDeploymentRuntime(agent=self.agent)
            result = self.graph_runtime.start(self.state)
        except RuntimeError as exc:
            self.output(f"LangGraph 确认运行器不可用,降级为本地执行: {exc}")
            self.graph_runtime = None
            self.state = self.agent.run_deploy_flow(self.state)
            self._print_state_report_and_checkpoint()
            return
        self._apply_graph_result(result)

    def _apply_graph_result(self, result: LangGraphRunResult) -> None:
        """Sync a LangGraph run result into the session and print report, confirmation and checkpoint."""
        if result.state is not None:
            self.state = result.state
        if self.state is None:
            self.output("当前没有运行状态。")
            return
        self.output(result.report or self.agent.render_report(self.state))
        if result.interrupted and result.confirmation:
            self._print_confirmation_request(result.confirmation)
        elif self.state.pending_confirmation:
            self._print_confirmation()
        self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}")

    def _print_state_report_and_checkpoint(self) -> None:
        """Print report, pending confirmation and checkpoint path for the local execution path."""
        if self.state is None:
            return
        self.output(self.agent.render_report(self.state))
        if self.state.pending_confirmation:
            self._print_confirmation()
        self.output(f"checkpoint: {self.state.checkpoint_path or self.checkpoint_path}")

    def _status(self) -> None:
        """Print the current report, or the checkpoint path when no state exists yet."""
        if self.state is None:
            self.output("当前还没有运行状态。")
            self.output(f"checkpoint: {self.checkpoint_path}")
            return
        self.output(self.agent.render_report(self.state))
        if self.state.pending_confirmation:
            self._print_confirmation()

    def _confirm(self, *, approved: bool, note: str = "") -> None:
        """Handle ``approve`` / ``reject`` for the pending confirmation.

        Uses the paused LangGraph runtime when one is waiting; otherwise, or
        when resuming it fails with ``RuntimeError``, confirms through the agent.
        """
        if self.state is None:
            checkpoint = Path(self.checkpoint_path)
            if not checkpoint.exists():
                self.output("当前没有待确认任务。")
                return
            if not self._restore_state(checkpoint):
                return
        if not self.state.pending_confirmation:
            self.output("当前没有待确认任务。")
            return

        if self.graph_runtime and self.graph_runtime.waiting_confirmation:
            try:
                result = self.graph_runtime.resume(approved=approved, note=note)
            except RuntimeError as exc:
                # Fall through to the local confirmation below.
                self.output(f"LangGraph 确认恢复失败,降级为本地确认: {exc}")
            else:
                self._apply_graph_result(result)
                return

        self.state = self.agent.confirm_pending(self.state, approved=approved, operator_note=note)
        self.output(self.agent.render_report(self.state))
        if self.state.pending_confirmation:
            self._print_confirmation()

    def _print_confirmation(self) -> None:
        """Print the confirmation request built by the agent for the current state, if any."""
        if self.state is None:
            return
        request = self.agent.build_confirmation_request(self.state)
        if not request:
            return
        self._print_confirmation_request(request)

    def _print_confirmation_request(self, request: dict[str, Any]) -> None:
        """Print the given confirmation request; optional fields are shown only when truthy."""
        self.output("需要人工确认:")
        self.output(f"- type: {request.get('type')}")
        if request.get("ip"):
            self.output(f"- ip: {request['ip']}")
        if request.get("failed_stage"):
            self.output(f"- failed_stage: {request['failed_stage']}")
        if request.get("failure_reason"):
            self.output(f"- reason: {request['failure_reason']}")
        self.output("输入 approve 执行回滚,或 reject [原因] 拒绝回滚。")

    def _ask_yes_no(self, prompt: str) -> bool:
        """Read one answer; only ``yes`` / ``y`` (case-insensitive) confirm, EOF counts as no."""
        try:
            answer = self.input(prompt).strip().lower()
        except EOFError:
            return False
        return answer in ("yes", "y")

    def _load_existing_checkpoint_if_any(self) -> None:
        """Load the session checkpoint at startup when the file already exists."""
        checkpoint = Path(self.checkpoint_path)
        if not checkpoint.exists():
            return
        if not self._restore_state(checkpoint):
            # The failure was already reported; start with an empty session.
            return
        self.output(f"已加载 checkpoint: {checkpoint}")
        if self.state.pending_confirmation:
            self._print_confirmation()
|
||
|
||
|
||
def run_interactive_chat(
    *,
    agent: PamDeployAgent,
    params: dict[str, Any],
    strategy: ExecutionStrategy,
    checkpoint_path: str | None = None,
    target_ips: list[str] | None = None,
    input_func: InputFunc = input,
    output_func: OutputFunc = print,
) -> InteractiveCliSession:
    """Build an interactive CLI session, run it to completion and return it.

    All arguments are forwarded unchanged to ``InteractiveCliSession``. The
    session object is returned after its REPL loop ends so callers (for
    example tests) can inspect it.
    """
    session_kwargs = {
        "agent": agent,
        "params": params,
        "strategy": strategy,
        "checkpoint_path": checkpoint_path,
        "target_ips": target_ips,
        "input_func": input_func,
        "output_func": output_func,
    }
    chat = InteractiveCliSession(**session_kwargs)
    chat.run()
    return chat
|
||
|
||
|
||
def _default_checkpoint_path() -> str:
|
||
"""生成默认 chat checkpoint 路径。"""
|
||
return str(Path("runtime") / "checkpoints" / f"chat_{time.strftime('%Y%m%d_%H%M%S')}.json")
|
||
|
||
|
||
def _choose_strategy(preference: str, default: ExecutionStrategy) -> ExecutionStrategy:
|
||
"""根据 LLM 偏好更新执行策略,非法值保留默认策略。"""
|
||
if preference in ("hybrid_node_mcp", "script_only", "fake"):
|
||
return preference # type: ignore[return-value]
|
||
return default
|
||
|
||
|
||
def _format_redacted_params(params: dict[str, Any]) -> str:
|
||
"""把脱敏后的参数字典格式化为多行文本。"""
|
||
lines = ["当前参数:"]
|
||
for key in sorted(params):
|
||
lines.append(f"- {key}: {params[key]}")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def _parse_key_values(parts: list[str]) -> dict[str, str]:
|
||
"""解析 KEY=VALUE 参数列表。"""
|
||
values: dict[str, str] = {}
|
||
for part in parts:
|
||
if "=" not in part:
|
||
continue
|
||
key, value = part.split("=", 1)
|
||
if key:
|
||
values[key] = value
|
||
return values
|
||
|
||
|
||
def _build_prompt_input(input_func: InputFunc) -> InputFunc:
|
||
"""如果安装了 prompt_toolkit,则启用历史记录和命令补全。"""
|
||
if input_func is not builtins.input:
|
||
return input_func
|
||
try:
|
||
from prompt_toolkit import PromptSession
|
||
from prompt_toolkit.completion import WordCompleter
|
||
from prompt_toolkit.history import FileHistory
|
||
except ImportError:
|
||
return input_func
|
||
|
||
commands = [
|
||
"help",
|
||
"preview",
|
||
"analyze",
|
||
"params",
|
||
"events",
|
||
"set",
|
||
"llm config",
|
||
"llm fallback",
|
||
"llm action-analysis on",
|
||
"llm action-analysis off",
|
||
"mcp config",
|
||
"run",
|
||
"status",
|
||
"approve",
|
||
"reject",
|
||
"resume",
|
||
"list checkpoints",
|
||
"load checkpoint",
|
||
"checkpoint",
|
||
"exit",
|
||
]
|
||
session = PromptSession(
|
||
history=FileHistory(str(Path("runtime") / "chat_history.txt")),
|
||
completer=WordCompleter(commands, ignore_case=True, sentence=True),
|
||
)
|
||
return session.prompt
|
||
|
||
|
||
def _build_output_func(output_func: OutputFunc) -> OutputFunc:
|
||
"""如果安装了 rich,则使用 rich 输出;否则保持原输出函数。"""
|
||
if output_func is not builtins.print:
|
||
return output_func
|
||
try:
|
||
from rich.console import Console
|
||
from rich.markdown import Markdown
|
||
except ImportError:
|
||
return output_func
|
||
console = Console()
|
||
|
||
def rich_print(value: str) -> None:
|
||
text = str(value)
|
||
stripped = text.lstrip()
|
||
if stripped.startswith("{") or stripped.startswith("["):
|
||
try:
|
||
console.print_json(text)
|
||
return
|
||
except Exception:
|
||
pass
|
||
if text.startswith("## ") or "\n| ---" in text:
|
||
console.print(Markdown(text))
|
||
return
|
||
console.print(text)
|
||
|
||
return rich_print
|