agent_deply/pam_deploy_graph/logging_utils.py
2026-06-05 10:41:24 +08:00

151 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Agent 运行日志配置和脱敏工具。"""
from __future__ import annotations

import json
import logging
import os
import re
from dataclasses import asdict, fields, is_dataclass
from logging.handlers import TimedRotatingFileHandler
from pathlib import Path
from typing import Any

from .constants import SENSITIVE_KEYS
# Default log location (relative to the working directory) and default
# number of rotated files to keep.
DEFAULT_LOG_FILE = Path("logs") / "pam_deploy_agent.log"
DEFAULT_LOG_RETENTION_DAYS = 14

# Environment variables read by configure_logging() when the matching
# argument is not supplied.
LOG_FILE_ENV = "PAM_AGENT_LOG_FILE"
LOG_LEVEL_ENV = "PAM_AGENT_LOG_LEVEL"
LOG_RETENTION_DAYS_ENV = "PAM_AGENT_LOG_RETENTION_DAYS"

# Attribute name stamped on handlers installed by configure_logging(); its
# value is the resolved log path, which is how repeated calls find them.
_HANDLER_MARKER = "_pam_deploy_agent_handler"

# A mapping key is masked when its lower-cased, dash-normalized form contains
# any of these fragments.
_SENSITIVE_NAME_PARTS = ("secret", "token", "authorization", "api_key", "apikey", "password")

# KEY=VALUE / KEY: VALUE fragments inside free text.
#   group 1 = key, group 2 = separator, group 3 = value.
# The key may be followed by a closing quote so that JSON / dict-repr text
# such as `"password": "x"` or `{'token': 'x'}` is matched too. A quoted value
# is consumed up to its closing quote (so spaces and commas inside it are
# covered), plus any trailing run the unquoted form would also have consumed;
# an unterminated quote falls back to the unquoted form.
_ASSIGNMENT_PATTERN = re.compile(
    r"(?i)\b(client_secret|mcp_client_secret|api_key|pam_llm_api_key|token|access_token|authorization|password)\b"
    r"[\"']?\s*([:=])\s*(\"[^\"\n]*\"[^\s,;]*|'[^'\n]*'[^\s,;]*|[^\s,;]+)"
)

# `Authorization: <scheme> <credentials>` must be masked as a whole. Without
# this, _ASSIGNMENT_PATTERN would treat the scheme word as the value and leave
# the credentials in clear text. The name is kept for existing references even
# though common non-Bearer schemes are now covered as well.
_AUTH_BEARER_ASSIGNMENT_PATTERN = re.compile(
    r"(?i)\b(authorization)\b\s*([:=])\s*(?:basic|bearer|digest|negotiate|ntlm|token)\s+[^\s,;]+"
)

# Stand-alone `Bearer <token>` occurrences; group 1 keeps the `Bearer ` prefix.
_BEARER_PATTERN = re.compile(r"(?i)(bearer\s+)[A-Za-z0-9._~+\-/=]+")
def configure_logging(
    log_file: str | Path | None = None,
    level: str | int | None = None,
    retention_days: int | str | None = None,
) -> Path:
    """Configure the daily-rotating file log of the ``pam_deploy_graph`` logger.

    Each setting comes from the argument when given, otherwise from its
    environment variable (``LOG_FILE_ENV``, ``LOG_LEVEL_ENV``,
    ``LOG_RETENTION_DAYS_ENV``), otherwise from the built-in default. A falsy
    ``log_file`` or ``level`` (``""``, ``0``) counts as "not given"; only
    ``retention_days=None`` does.

    The log file's parent directory is created if missing. On every call the
    package logger's level is set and propagation to ancestor loggers is
    switched off. Calling again for the same resolved path does not add a
    second handler: the existing handler's level and ``backupCount`` are
    updated in place.

    Args:
        log_file: Target log file path.
        level: Logging level name or numeric level.
        retention_days: Number of rotated files to keep.

    Returns:
        The log file path that is in effect (not made absolute).
    """
    target = Path(log_file or os.getenv(LOG_FILE_ENV) or DEFAULT_LOG_FILE)
    target.parent.mkdir(parents=True, exist_ok=True)

    resolved_level = _resolve_level(level or os.getenv(LOG_LEVEL_ENV) or "INFO")
    if retention_days is None:
        retention_source = os.getenv(LOG_RETENTION_DAYS_ENV)
    else:
        retention_source = retention_days
    keep_days = _resolve_retention_days(retention_source)

    pkg_logger = logging.getLogger("pam_deploy_graph")
    pkg_logger.setLevel(resolved_level)
    pkg_logger.propagate = False

    # Handlers installed by this function carry the resolved path as a marker.
    identity = str(target.resolve())
    existing = next(
        (
            candidate
            for candidate in list(pkg_logger.handlers)
            if getattr(candidate, _HANDLER_MARKER, "") == identity
        ),
        None,
    )
    if existing is not None:
        if isinstance(existing, TimedRotatingFileHandler):
            # Already configured for this file: refresh the settings and stop.
            existing.setLevel(resolved_level)
            existing.backupCount = keep_days
            return target
        # Same marker but an unexpected handler type: drop it and rebuild below.
        pkg_logger.removeHandler(existing)
        existing.close()

    file_handler = TimedRotatingFileHandler(
        target,
        when="midnight",
        interval=1,
        backupCount=keep_days,
        encoding="utf-8",
    )
    setattr(file_handler, _HANDLER_MARKER, identity)
    file_handler.setLevel(resolved_level)
    line_format = logging.Formatter(
        fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    file_handler.setFormatter(line_format)
    pkg_logger.addHandler(file_handler)

    pkg_logger.info(
        "日志已初始化 path=%s level=%s rotation=daily retention_days=%s",
        target,
        logging.getLevelName(resolved_level),
        keep_days,
    )
    return target
def redact_for_log(value: Any, *, max_text_len: int = 1200) -> Any:
    """Return a redacted, length-bounded copy of ``value`` that is safe to log.

    Conversion rules, applied recursively:

    * dataclass instance -> handled as a mapping of its field names to values;
    * ``dict`` -> new ``dict`` with ``str`` keys; the value under a sensitive
      key (see ``_is_sensitive_key``) is replaced by ``"***"`` without being
      inspected;
    * ``list`` / ``tuple`` / ``set`` -> ``list`` of converted items;
    * ``str`` -> secrets inside the text are masked, then the text is
      truncated to ``max_text_len``;
    * ``None`` / ``bool`` / ``int`` / ``float`` -> returned unchanged;
    * anything else -> ``str(value)``, masked and truncated like a string.

    Args:
        value: Arbitrary object to prepare for logging.
        max_text_len: Maximum length of any single string in the result
            before the truncation marker is appended.

    Returns:
        A structure built only from ``dict``, ``list``, ``str``, ``None``,
        ``bool``, ``int`` and ``float``.
    """
    if is_dataclass(value) and not isinstance(value, type):
        # Read the fields directly rather than calling ``asdict``: ``asdict``
        # deep-copies every leaf value, which raises for uncopyable objects
        # (locks, sockets, ...) and needlessly copies large or secret values
        # that are about to be masked or truncated. Nested dataclasses are
        # still expanded by the recursion below.
        shallow = {field.name: getattr(value, field.name) for field in fields(value)}
        return redact_for_log(shallow, max_text_len=max_text_len)
    if isinstance(value, dict):
        redacted: dict[str, Any] = {}
        for key, item in value.items():
            text_key = str(key)
            if _is_sensitive_key(text_key):
                redacted[text_key] = "***"
            else:
                redacted[text_key] = redact_for_log(item, max_text_len=max_text_len)
        return redacted
    if isinstance(value, (list, tuple, set)):
        return [redact_for_log(item, max_text_len=max_text_len) for item in value]
    if isinstance(value, str):
        return _truncate(_redact_string(value), max_text_len)
    if value is None or isinstance(value, (bool, int, float)):
        return value
    # Unknown object: log its string form, still masked and bounded.
    return _truncate(_redact_string(str(value)), max_text_len)
def json_for_log(value: Any, *, max_text_len: int = 1200) -> str:
    """Redact ``value`` and serialize it as a single-line JSON string.

    The object is first passed through ``redact_for_log`` with the same
    ``max_text_len``. Non-ASCII characters are written as-is, keys are sorted,
    and anything JSON cannot encode natively is rendered with ``str``.
    """
    return json.dumps(
        redact_for_log(value, max_text_len=max_text_len),
        ensure_ascii=False,
        default=str,
        sort_keys=True,
    )
def _resolve_level(value: str | int) -> int:
    """Translate a level name or number into a numeric logging level.

    Integers are returned unchanged. Strings are stripped of surrounding
    whitespace first (environment values often carry stray spaces or a
    newline); a purely decimal string such as ``"10"`` is taken as the numeric
    level, and any other string is looked up case-insensitively among the
    ``logging`` module's level constants.

    Args:
        value: Level name (``"debug"``, ``"WARNING"``), decimal string, or int.

    Returns:
        The numeric level, or ``logging.INFO`` when the value is not
        recognized.
    """
    if isinstance(value, int):
        return value
    text = str(value).strip()
    if text.isdecimal():
        return int(text)
    resolved = getattr(logging, text.upper(), logging.INFO)
    # Guard against module attributes that are not levels (e.g. BASIC_FORMAT).
    return resolved if isinstance(resolved, int) else logging.INFO
def _resolve_retention_days(value: int | str | None) -> int:
    """Turn a retention setting into a non-negative number of days.

    ``None``, the empty string and anything that does not parse as an integer
    (after stripping whitespace) yield ``DEFAULT_LOG_RETENTION_DAYS``.
    Negative numbers are raised to ``0``.
    """
    if value is None or value == "":
        return DEFAULT_LOG_RETENTION_DAYS
    cleaned = str(value).strip()
    try:
        parsed = int(cleaned)
    except (TypeError, ValueError):
        return DEFAULT_LOG_RETENTION_DAYS
    return parsed if parsed > 0 else 0
def _is_sensitive_key(key: str) -> bool:
    """Tell whether the value stored under ``key`` must be masked.

    A key is sensitive when it is listed verbatim in ``SENSITIVE_KEYS``, or
    when its lower-cased form (with ``-`` treated as ``_``) contains one of
    the fragments in ``_SENSITIVE_NAME_PARTS``.
    """
    if key in SENSITIVE_KEYS:
        return True
    canonical = key.lower().replace("-", "_")
    for fragment in _SENSITIVE_NAME_PARTS:
        if fragment in canonical:
            return True
    return False
def _truncate(value: str, limit: int) -> str:
    """Cut ``value`` down to at most ``limit`` characters plus a marker.

    Args:
        value: Text to bound.
        limit: Maximum number of characters to keep. Negative limits are
            treated as ``0``.

    Returns:
        ``value`` unchanged when it already fits, otherwise its first
        ``limit`` characters followed by the truncation marker.
    """
    # A negative limit would make the slice count from the end of the string
    # and keep almost all of it, which is the opposite of what is wanted.
    limit = max(limit, 0)
    if len(value) <= limit:
        return value
    return value[:limit] + "...[已截断]"
def _redact_string(value: str) -> str:
    """Mask secrets embedded in free text.

    Three passes run in order: ``Authorization`` assignments that carry a
    scheme, generic ``KEY=VALUE`` / ``KEY: VALUE`` assignments, and finally
    any remaining stand-alone ``Bearer <token>``. Assignments keep their key
    and separator; the bearer pass keeps the ``Bearer `` prefix. The secret
    part is replaced by ``***``.
    """

    def _keep_key_and_separator(match):
        # Groups 1 and 2 are the key and the ':' / '=' separator.
        return f"{match.group(1)}{match.group(2)}***"

    masked = value
    for pattern in (_AUTH_BEARER_ASSIGNMENT_PATTERN, _ASSIGNMENT_PATTERN):
        masked = pattern.sub(_keep_key_and_separator, masked)
    return _BEARER_PATTERN.sub(lambda match: f"{match.group(1)}***", masked)