dark e572a26e6f pam_deploy_graph/agent.py:progress action 未完成不标记 completed,超时暂停在当前 action,支持断点继续。
llm 提示词和规则:新增 progress_complete 判断字段。
deploy.sh / deploy.ps1:poll-* action 入口改为单次查询。
interactive.py:chat 会播报进度更新。
config.txt.example / README / packaging 文档 / Skill 文档:同步进度查询参数和新 workflow 语义。
测试补充了进度重复查询、超时暂停、chat 进度播报。
2026-06-04 16:28:18 +08:00

215 lines
7.5 KiB
Python

"""统一维护面向 LLM 和 runtime 的 PAM action tool schema。"""
from __future__ import annotations
from pam_deploy_graph.action_router import build_action_backends
from pam_deploy_graph.constants import GLOBAL_ACTION_SEQUENCE, IP_ACTION_SEQUENCE
from pam_deploy_graph.models import AgentExecutionMode, ExecutionStrategy
from pam_deploy_graph.models import ActionToolSpec, SkillPolicy
# Registry of every PAM action exposed as a tool, keyed by the hyphenated
# action name. The key is taken from each spec's own ``action`` field so the
# two can never drift apart; insertion order follows the tuple below.
ACTION_TOOL_SPECS: dict[str, ActionToolSpec] = {
    spec.action: spec
    for spec in (
        # ---- scope="global" actions ----
        ActionToolSpec(
            name="get_token",
            action="get-token",
            scope="global",
            description="获取 PAM HOME OAuth token。",
            risk_level="low",
            required_param_fields=("HOME_BASE_URL", "CLIENT_ID", "CLIENT_SECRET"),
            preferred_backend="script",
        ),
        ActionToolSpec(
            name="create_version",
            action="create-version",
            scope="global",
            description="创建版本记录。",
            risk_level="medium",
            preferred_backend="script",
        ),
        ActionToolSpec(
            name="upload_package",
            action="upload-package",
            scope="global",
            description="上传软件包并返回 HASH_CODE。",
            risk_level="high",
            preferred_backend="script",
        ),
        ActionToolSpec(
            name="publish_version",
            action="publish-version",
            scope="global",
            description="发布版本,需要已有 HASH_CODE。",
            risk_level="high",
            requires_confirmation=True,
            required_runtime_fields=("hash_code",),
            preferred_backend="script",
        ),
        ActionToolSpec(
            name="get_node_url",
            action="get-node-url",
            scope="global",
            description="获取目标 PAM NODE 地址。",
            risk_level="low",
            preferred_backend="script",
        ),
        # The remaining specs declare no preferred_backend.
        ActionToolSpec(
            name="get_online_ips",
            action="get-online-ips",
            scope="global",
            description="获取当前在线工作站 IP 列表。",
            risk_level="low",
        ),
        ActionToolSpec(
            name="create_download_task",
            action="create-download-task",
            scope="global",
            description="创建云下载任务。",
            risk_level="high",
        ),
        ActionToolSpec(
            name="poll_download_progress",
            action="poll-download-progress",
            scope="global",
            description="单次查询云下载任务进度;是否继续查询由 Agent workflow 和 LLM 审核决定。",
            risk_level="medium",
        ),
        # ---- scope="ip" actions (one workstation at a time) ----
        ActionToolSpec(
            name="upgrade_ip",
            action="upgrade-ip",
            scope="ip",
            description="对单个工作站创建升级任务。",
            risk_level="high",
            requires_confirmation=True,
        ),
        ActionToolSpec(
            name="poll_upgrade_progress",
            action="poll-upgrade-progress",
            scope="ip",
            description="单次查询单个工作站升级进度;是否继续查询由 Agent workflow 和 LLM 审核决定。",
            risk_level="medium",
        ),
        ActionToolSpec(
            name="start_ip",
            action="start-ip",
            scope="ip",
            description="启动单个工作站应用。",
            risk_level="high",
            requires_confirmation=True,
        ),
        ActionToolSpec(
            name="stop_ip",
            action="stop-ip",
            scope="ip",
            description="停止单个工作站应用。",
            risk_level="high",
            requires_confirmation=True,
        ),
        ActionToolSpec(
            name="verify_ip",
            action="verify-ip",
            scope="ip",
            description="校验单个工作站版本和健康状态。",
            risk_level="medium",
        ),
        ActionToolSpec(
            name="download_log",
            action="download-log",
            scope="ip",
            description="下载单个工作站日志。",
            risk_level="low",
        ),
        ActionToolSpec(
            name="rollback_ip",
            action="rollback-ip",
            scope="ip",
            description="对单个工作站执行回滚。",
            risk_level="high",
            requires_confirmation=True,
        ),
    )
}
def ordered_actions_for_skill(policy: SkillPolicy) -> list[str]:
    """Return the default action order for a skill.

    The result is the skill's global actions followed by its per-IP actions.
    For each half, the policy's own sequence is used when it is non-empty;
    otherwise the module-wide default (``GLOBAL_ACTION_SEQUENCE`` /
    ``IP_ACTION_SEQUENCE``) is used.

    Args:
        policy: Skill policy providing ``action_sequence`` and
            ``ip_action_sequence``.

    Returns:
        A new list; mutating it does not affect the policy or the defaults.
    """
    ordering = list(policy.action_sequence or GLOBAL_ACTION_SEQUENCE)
    ordering.extend(policy.ip_action_sequence or IP_ACTION_SEQUENCE)
    return ordering
# Prerequisites for each action, listed as the full chain (not just the direct
# parent) so that a single lookup yields everything that must run first.
ACTION_DEPENDENCIES: dict[str, tuple[str, ...]] = {
    "create-version": ("get-token",),
    "upload-package": ("get-token", "create-version"),
    "publish-version": ("get-token", "create-version", "upload-package"),
    "get-node-url": ("get-token",),
    "get-online-ips": ("get-token", "get-node-url"),
    "create-download-task": ("get-token", "get-node-url", "get-online-ips"),
    "poll-download-progress": (
        "get-token",
        "get-node-url",
        "get-online-ips",
        "create-download-task",
    ),
    # Every action in the default per-IP sequence depends on the whole
    # default global sequence.
    **dict.fromkeys(IP_ACTION_SEQUENCE, tuple(GLOBAL_ACTION_SEQUENCE)),
}
def allowed_tool_specs(policy: SkillPolicy) -> list[ActionToolSpec]:
    """Return the tool specs a skill may use, in the skill's action order.

    An action is kept when it is listed in ``policy.allowed_actions``, is not
    listed in ``policy.forbidden_actions``, and has an entry in
    ``ACTION_TOOL_SPECS``. Actions without a registered spec are skipped
    silently.

    Args:
        policy: Skill policy supplying the ordering and the allow/forbid lists.

    Returns:
        The matching specs, ordered as in ``ordered_actions_for_skill(policy)``.
    """
    candidates = (
        ACTION_TOOL_SPECS.get(name)
        for name in ordered_actions_for_skill(policy)
        if name in policy.allowed_actions and name not in policy.forbidden_actions
    )
    # Drop permitted actions that have no registered spec.
    return [spec for spec in candidates if spec is not None]
def tool_summaries(policy: SkillPolicy, strategy: ExecutionStrategy) -> list[dict[str, str]]:
    """Build the restricted tool summaries that are handed to the LLM.

    One summary is produced per spec returned by ``allowed_tool_specs(policy)``.
    The ``backend`` field is the route that ``build_action_backends(strategy)``
    gives for the action; when there is none, it falls back to the spec's
    ``preferred_backend``, or to an empty string if that is unset.

    Args:
        policy: Skill policy that decides which tools are visible.
        strategy: Execution strategy used to resolve each action's backend.

    Returns:
        A list of dicts with the keys ``name``, ``action``, ``scope``,
        ``description``, ``risk_level``, ``backend`` and
        ``requires_confirmation`` (the last rendered as ``"true"``/``"false"``).
    """
    backend_by_action = build_action_backends(strategy)
    return [
        {
            "name": tool.name,
            "action": tool.action,
            "scope": tool.scope,
            "description": tool.description,
            "risk_level": tool.risk_level,
            "backend": backend_by_action.get(tool.action, tool.preferred_backend or ""),
            "requires_confirmation": "true" if tool.requires_confirmation else "false",
        }
        for tool in allowed_tool_specs(policy)
    ]
def normalize_planned_actions(
    planned_actions: list[str],
    *,
    policy: SkillPolicy,
    mode: AgentExecutionMode,
) -> list[str]:
    """Normalize planned actions against the skill's limits and dependencies.

    An action is "permitted" when it is in ``policy.allowed_actions`` and not
    in ``policy.forbidden_actions``.

    * With an empty plan, every permitted action in the skill's default order
      is returned.
    * Otherwise the permitted planned actions are kept, the permitted entries
      of each one's ``ACTION_DEPENDENCIES`` are added, and the result is
      re-sorted into the skill's order: global actions first, per-IP actions
      second.

    Args:
        planned_actions: Action names proposed by the planner; may be empty.
        policy: Skill policy supplying the ordering and the allow/forbid lists.
        mode: Accepted for interface compatibility; not read by this function.

    Returns:
        The de-duplicated, dependency-expanded and ordered action names.
    """
    permitted = set(policy.allowed_actions) - set(policy.forbidden_actions)
    sequence = ordered_actions_for_skill(policy)

    if not planned_actions:
        return [name for name in sequence if name in permitted]

    # Only membership matters from here on: the final order comes from
    # ``sequence``, so the selection can be collected in a set.
    selected: set[str] = set()
    for name in planned_actions:
        if name not in permitted:
            continue
        selected.update(
            dependency
            for dependency in ACTION_DEPENDENCIES.get(name, ())
            if dependency in permitted
        )
        selected.add(name)

    # NOTE(review): an action that is in the skill's order but in neither
    # GLOBAL_ACTION_SEQUENCE nor IP_ACTION_SEQUENCE is omitted here, unlike in
    # the empty-plan branch above -- confirm this is intended.
    global_part = [
        name for name in sequence if name in GLOBAL_ACTION_SEQUENCE and name in selected
    ]
    ip_part = [
        name for name in sequence if name in IP_ACTION_SEQUENCE and name in selected
    ]
    return global_part + ip_part