Claude API基础专题(七):Agent架构与智能体设计
Claude API基础专题(七):Agent架构与智能体设计 ⭐⭐⭐⭐
目标读者:希望构建复杂AI应用系统的架构师与高级开发者 前置知识:已完成第一篇《API基础》、第二篇《提示词工程》、第三篇《工具调用》、第四篇《RAG系统》、第五篇《MCP协议》、第六篇《Claude Code与Computer Use》 学习提醒:本文是Claude API系列的收官之作,将系统性地梳理Agent架构的核心概念与设计模式
章节导航
| 小节 | 主题 | 重要程度 |
|---|---|---|
| 7.1 | 从工具调用到Agent:跨越的关键一步 | ⭐⭐⭐⭐⭐ |
| 7.2 | 单Agent系统架构 | ⭐⭐⭐⭐⭐ |
| 7.3 | 多Agent协作系统 | ⭐⭐⭐⭐⭐ |
| 7.4 | 执行模式:并行与链式 | ⭐⭐⭐⭐⭐ |
| 7.5 | 状态管理与上下文 | ⭐⭐⭐⭐⭐ |
| 7.6 | 错误处理与容错机制 | ⭐⭐⭐⭐⭐ |
| 7.7 | 安全与权限管理 | ⭐⭐⭐⭐ |
| 7.8 | 生产环境最佳实践 | ⭐⭐⭐⭐⭐ |
7.1 从工具调用到Agent:跨越的关键一步
为什么需要Agent?
在深入Agent架构之前,我们必须回答一个根本问题:既然工具调用已经如此强大,为什么还需要Agent?
让我们回顾一下工具调用的本质:
# 工具调用的工作模式
result = await client.messages.create(
model="claude-opus-4-20241120",
messages=[{
"role": "user",
"content": "帮我查一下北京天气"
}],
tools=[{"name": "get_weather", ...}]
)在这个模式中,LLM的角色是被动响应者:
- 用户提问 → LLM调用工具 → 工具返回结果 → LLM回答
这是一种请求-响应的交互模式,LLM本身不持有状态,不主动决策,只是根据当前输入决定调用哪个工具。
但现实世界的任务往往更加复杂:
| 任务特征 | 工具调用的局限 | 为什么需要Agent |
|---|---|---|
| 多步骤决策 | 每次决策独立 | Agent能保持目标状态 |
| 条件分支 | 无法根据结果跳转 | Agent能动态规划路径 |
| 长期任务 | 上下文会丢失 | Agent能持久化状态 |
| 多工具协同 | 缺乏编排能力 | Agent能编排执行流程 |
| 错误恢复 | 失败即终止 | Agent能重试和回退 |
Agent的本质定义
那么,究竟什么是Agent?
Agent = LLM + 状态 + 工具 + 执行循环 + 终止条件
这个定义揭示了Agent的五个核心要素:
┌─────────────────────────────────────────────────────────────┐
│ Agent 系统 │
├─────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────┐ │
│ │ LLM │ ← 大脑:理解、推理、决策 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 状态 │ ← 记忆:目标、上下文、中间结果 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 工具 │ ← 能力:搜索、代码、文件、API │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 执行循环 │ ← 引擎:观察→决策→执行 │
│ └─────┬─────┘ │
│ │ │
│ ┌─────▼─────┐ │
│ │ 终止条件 │ ← 边界:完成、出错、超时 │
│ └───────────┘ │
│ │
└─────────────────────────────────────────────────────────────┘为什么这五个要素缺一不可?
- LLM:没有LLM就没有智能决策
- 状态:没有状态就无法处理多步骤任务
- 工具:没有工具就无法影响外部世界
- 执行循环:没有循环就无法持续工作
- 终止条件:没有终止条件就会无限循环
Agent vs 传统软件
| 维度 | 传统软件 | Agent |
|---|---|---|
| 决策方式 | 确定性的if-else | 基于LLM的概率推理 |
| 流程定义 | 预先设计 | 运行时动态规划 |
| 错误处理 | 显式try-catch | 自我纠错能力 |
| 状态管理 | 显式变量 | 隐式上下文 |
| 扩展方式 | 模块化 | 添加工具/提示词 |
| 可预测性 | 高 | 中(有一定随机性) |
7.2 单Agent系统架构
最小可运行Agent
让我们从一个最简单的Agent开始,逐步理解其架构:
from anthropic import Anthropic
from dataclasses import dataclass, field
from typing import Optional, Any
import asyncio
@dataclass
class AgentState:
"""
Agent状态容器
为什么需要专门的状态类?
1. 封装所有状态,边界清晰
2. 便于序列化和持久化
3. 类型提示让代码更健壮
"""
goal: str # 当前目标
messages: list[dict] = field(default_factory=list) # 对话历史
context: dict[str, Any] = field(default_factory=dict) # 共享上下文
results: list[dict] = field(default_factory=list) # 执行结果
iterations: int = 0 # 迭代计数器
max_iterations: int = 10 # 最大迭代次数
class SimpleAgent:
"""
最小可运行Agent
设计原则:
- 简单性:先让它跑起来
- 可观测性:每个步骤都记录
- 可停止:明确的终止条件
"""
def __init__(self, api_key: str, tools: list[dict]):
self.client = Anthropic(api_key=api_key)
self.tools = tools
async def run(self, goal: str) -> dict[str, Any]:
"""
Agent主循环
流程:
1. 初始化状态
2. 进入执行循环
3. 每次迭代:LLM推理 → 调用工具 → 更新状态
4. 达到终止条件时退出
"""
state = AgentState(goal=goal)
while not self._should_terminate(state):
state.iterations += 1
# 步骤1:LLM推理
response = await self._think(state)
# 步骤2:检查是否需要调用工具
if response.content[0].type == "tool_use":
tool_result = await self._execute_tool(response)
state.results.append(tool_result)
state.messages.append(response)
state.messages.append(tool_result)
else:
# LLM直接回答,任务完成
return {
"status": "completed",
"goal": goal,
"result": response.content[0].text,
"iterations": state.iterations
}
return {
"status": "terminated",
"goal": goal,
"reason": "max_iterations_reached",
"iterations": state.iterations
}
def _should_terminate(self, state: AgentState) -> bool:
"""判断是否应该终止"""
# 达到最大迭代次数
if state.iterations >= state.max_iterations:
return True
# 其他终止条件可以在这里添加
return False
async def _think(self, state: AgentState) -> Any:
"""LLM推理"""
response = self.client.messages.create(
model="claude-opus-4-20241120",
max_tokens=4096,
messages=[{
"role": "user",
"content": f"目标:{state.goal}\n\n请决定下一步行动。"
}],
tools=self.tools
)
return response
async def _execute_tool(self, response: Any) -> dict:
"""执行工具调用"""
tool_use = response.content[0]
tool_name = tool_use.name
tool_args = tool_use.input
# 这里应该调用实际的工具
# 为了简化,省略具体实现
return {
"role": "user",
"content": f"Tool {tool_name} executed with args {tool_args}"
}为什么要设计成这样的架构?
1. 状态外部化(AgentState类)
# 好:状态外部化
state = AgentState(goal=goal)
while not self._should_terminate(state):
...
# 不好:状态散落在各处
while self.iterations < self.max_iterations:
self.messages.append(...)
self.context.update(...)原因:
- 状态外部化后,可以序列化保存(断点续跑)
- 状态外部化后,可以热切换(修改状态不影响逻辑)
- 状态外部化后,可以并行运行多个Agent实例
2. 终止条件前置判断
# 好:先检查再执行
while not self._should_terminate(state):
response = await self._think(state)
...
# 不好:先执行再检查
while True:
response = await self._think(state)
if self._should_terminate(state):
break原因:避免在达到终止条件后还执行一次无用推理。
3. 结果记录完整
state.results.append(tool_result)
state.messages.append(response)
state.messages.append(tool_result) # 工具结果也加入上下文原因:保持完整的对话历史,让LLM能够理解完整的执行脉络。
7.3 多Agent协作系统
为什么需要多Agent?
现实世界的复杂任务往往需要分工协作:
任务:帮用户规划一次旅行
单个Agent的问题:
- 需要同时是旅行专家 + 酒店专家 + 天气专家 + 预算专家
- 知识过于分散,难以精通所有领域
- 单一Agent处理所有任务,响应会变慢
多Agent方案:
- 旅行规划Agent(主Agent)负责任务分解和协调
- 酒店Agent负责搜索和推荐酒店
- 天气Agent负责查询天气预报
- 预算Agent负责计算和控制预算多Agent架构模式
模式一:星型架构(主从模式)
┌─────────────┐
│ 主Agent │
│ (协调者) │
└──────┬──────┘
│
┌────────────────┼────────────────┐
│ │ │
┌─────▼─────┐ ┌─────▼─────┐ ┌─────▼─────┐
│ 子Agent1 │ │ 子Agent2 │ │ 子Agent3 │
│ (酒店) │ │ (天气) │ │ (预算) │
└───────────┘ └───────────┘ └───────────┘class MasterAgent:
"""
星型架构主Agent
特点:
- 主Agent负责任务分解和结果汇总
- 子Agent只做特定领域的任务
- 主Agent和子Agent之间通过消息传递
"""
def __init__(self):
self.sub_agents = {
"hotel": HotelAgent(),
"weather": WeatherAgent(),
"budget": BudgetAgent()
}
async def run(self, task: str) -> dict:
# 1. 分解任务
subtasks = await self._decompose_task(task)
# 2. 并行派发子任务
results = await asyncio.gather(*[
self._run_subagent(name, subtask)
for name, subtask in subtasks.items()
])
# 3. 汇总结果
return await self._aggregate_results(results)模式二:链式架构(流水线模式)
┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐
│ Agent1 │ → │ Agent2 │ → │ Agent3 │ → │ Agent4 │
│ (预处理) │ │ (核心处理) │ │ (验证) │ │ (输出) │
└───────────┘ └───────────┘ └───────────┘ └───────────┘class ChainAgent:
"""
链式架构Agent
特点:
- 每个Agent有明确的职责(预处理/核心/验证/输出)
- 输出层层传递,形成处理流水线
- 适合有明确处理步骤的任务
"""
def __init__(self):
self.chain = [
PreprocessAgent(),
CoreAgent(),
ValidationAgent(),
OutputAgent()
]
async def run(self, input_data: Any) -> Any:
current = input_data
for agent in self.chain:
current = await agent.process(current)
if not self._is_valid(current):
# 验证失败,触发错误处理
return await self._handle_error(current, agent)
return current模式三:网状架构(对等模式)
┌───────────┐────────────┐
│ Agent1 │──────────││
└───┬───────┘ ││
│ ││
│ ┌─────────────▼┐
└───►│ Agent2 │
└──────┬───────┘
│
┌──────▼───────┐
│ Agent3 │
└───────────────┘class PeerNetwork:
"""
网状架构(对等模式)
特点:
- 没有中心协调者
- Agent之间可以直接通信
- 适合需要横向协作的场景
- 复杂度高,需要好的协调机制
"""
async def run(self, task: str) -> dict:
# 使用消息队列进行Agent间通信
queue = MessageQueue()
# 启动所有Agent
agents = [Agent(i) for i in range(3)]
tasks = [agent.run(queue) for agent in agents]
# 发送初始任务
await queue.publish("task", task)
# 等待所有Agent完成
results = await asyncio.gather(*tasks)
return self._merge_results(results)如何选择架构模式?
| 场景 | 推荐架构 | 原因 |
|---|---|---|
| 任务明确、子任务独立 | 星型 | 便于并行、易于管理 |
| 处理流程固定 | 链式 | 清晰、易于调试 |
| 横向协作、无固定流程 | 网状 | 灵活、适应性强 |
| 需要高可靠性 | 星型+链式混合 | 主备冗余 |
7.4 执行模式:并行与链式
并行执行
并行执行是提升Agent效率的关键技术:
async def parallel_execution(tasks: list[dict]) -> list[dict]:
"""
并行执行多个任务
为什么并行能提升效率?
假设每个任务需要3秒:
- 串行:3 * N 秒
- 并行:约3秒(理想情况)
注意:
- 并行任务之间不能有依赖
- 需要处理好并发竞争问题
"""
async def execute_single(task: dict) -> dict:
agent = create_agent(task["type"])
result = await agent.run(task["input"])
return {"task_id": task["id"], "result": result}
# 使用asyncio.gather实现真正并行
results = await asyncio.gather(*[
execute_single(task) for task in tasks
])
return results什么任务适合并行?
# 适合并行的任务
parallel_tasks = [
{"id": 1, "type": "search", "input": "北京天气"},
{"id": 2, "type": "search", "input": "上海天气"}, # 独立
{"id": 3, "type": "search", "input": "广州天气"}, # 独立
]
# 不适合并行的任务(存在依赖)
sequential_tasks = [
{"id": 1, "type": "search", "input": "用户ID"},
{"id": 2, "type": "fetch", "input": "依赖任务1的结果"}, # 依赖
{"id": 3, "type": "save", "input": "依赖任务2的结果"}, # 依赖
]链式执行
链式执行用于有依赖关系的任务:
async def chain_execution(tasks: list[dict]) -> dict:
"""
链式执行(串行依赖)
执行流程:
Task1 → Task2 → Task3 → ... → TaskN
每个任务接收前一个任务的输出作为输入
"""
context = {}
for i, task in enumerate(tasks):
# 构建当前任务的输入
task_input = await self._prepare_input(task, context)
# 执行当前任务
agent = create_agent(task["type"])
result = await agent.run(task_input)
# 保存结果到上下文,供后续任务使用
context[task["id"]] = result
# 检查是否需要错误恢复
if not self._is_success(result):
if task.get("retry"):
result = await self._retry(task, context)
else:
raise ExecutionError(f"Task {task['id']} failed")
return context # 返回完整的上下文(包含所有任务的结果)并行与链式的混合模式
class HybridExecutor:
"""
混合执行器:并行 + 链式的结合
适用场景:
- 部分任务可以并行
- 部分任务有依赖关系
示例:旅行规划
1. 并行:同时查询 酒店、景点、天气
2. 链式:基于查询结果生成行程 → 审核行程 → 输出最终方案
"""
async def run(self, workflow: dict) -> dict:
# 阶段1:并行执行独立任务
parallel_results = await self._parallel_phase(workflow["parallel_tasks"])
# 阶段2:基于并行结果执行链式任务
context = {"parallel": parallel_results}
chain_result = await self._chain_phase(
workflow["chain_tasks"],
context
)
return {
"parallel_results": parallel_results,
"chain_result": chain_result
}
async def _parallel_phase(self, tasks: list[dict]) -> dict:
"""并行阶段"""
results = await asyncio.gather(*[
self._execute(task) for task in tasks
])
return {task["id"]: result for task, result in zip(tasks, results)}
async def _chain_phase(self, tasks: list[dict], context: dict) -> dict:
"""链式阶段"""
for task in tasks:
context[task["id"]] = await self._execute(task, context)
return context7.5 状态管理与上下文
状态持久化的重要性
为什么Agent需要特别关注状态管理?
# 问题:LLM的上下文是有限的
MAX_TOKENS = 200000 # Claude Opus的最大上下文
# 如果对话历史越来越长...
messages = [
{"role": "user", "content": "第一轮对话"}, # 100 tokens
{"role": "assistant", "content": "第一轮回答"}, # 200 tokens
{"role": "user", "content": "第二轮对话"}, # 150 tokens
{"role": "assistant", "content": "第二轮回答"}, # 300 tokens
# ... 100轮后
# tokens总量超过限制
]解决方案:状态压缩与摘要
class StateManager:
"""
状态管理器
核心功能:
1. 压缩对话历史
2. 提取关键信息
3. 维护工作上下文
"""
def __init__(self, max_history: int = 10):
self.max_history = max_history # 保留最近N轮对话
self.summaries = [] # 压缩后的摘要
self.working_memory = {} # 当前工作内存
def add_interaction(self, user_msg: str, assistant_msg: str):
"""添加一轮交互"""
# 1. 加入历史
self.summaries.append({
"user": user_msg,
"assistant": assistant_msg,
"timestamp": now()
})
# 2. 检查是否需要压缩
if len(self.summaries) > self.max_history:
self._compress()
def _compress(self):
"""
压缩历史
压缩策略:
1. 保留最近N轮完整对话
2. 更早的对话压缩成摘要
3. 提取关键实体和决策
"""
recent = self.summaries[-self.max_history:]
older = self.summaries[:-self.max_history]
# 用LLM生成摘要
summary_prompt = f"""
请总结以下对话的关键信息:
{self._format_conversation(older)}
提取:
1. 用户的主要目标
2. 已完成的关键步骤
3. 当前状态
4. 重要的中间结果
"""
summary = self.client.messages.create(
model="claude-opus-4-20241120",
messages=[{"role": "user", "content": summary_prompt}]
)
# 保存摘要,清空旧历史
self.working_memory["conversation_summary"] = summary.content[0].text
self.summaries = recent
def get_context_for_llm(self) -> str:
"""构建发送给LLM的上下文"""
parts = []
# 1. 添加摘要(如果有)
if "conversation_summary" in self.working_memory:
parts.append(f"对话摘要:{self.working_memory['conversation_summary']}")
# 2. 添加最近N轮对话
for item in self.summaries[-self.max_history:]:
parts.append(f"用户:{item['user']}")
parts.append(f"助手:{item['assistant']}")
return "\n".join(parts)上下文窗口的智能管理
class SmartContextManager:
"""
智能上下文管理器
核心思想:
- 不是简单截断,而是智能选择
- 保留与当前任务最相关的上下文
"""
def __init__(self, max_tokens: int = 150000):
self.max_tokens = max_tokens
self.priority_levels = {
"critical": ["目标", "关键约束", "核心决策"],
"important": ["中间结果", "用户偏好", "当前状态"],
"normal": ["一般对话", "解释说明"],
"discardable": ["问候", "重复确认"]
}
def build_context(self, all_items: list[dict], current_task: str) -> str:
"""
构建当前任务所需的上下文
算法:
1. 为每个上下文项计算相关性分数
2. 按分数排序
3. 从最高分开始选取,直到达到token限制
"""
scored_items = []
for item in all_items:
relevance = self._calculate_relevance(item, current_task)
priority = self._get_priority(item)
# 综合分数 = 相关性 * 优先级权重
weight = {"critical": 1.0, "important": 0.7, "normal": 0.4, "discardable": 0.0}
score = relevance * weight.get(priority, 0.5)
scored_items.append((score, item))
# 按分数降序排序
scored_items.sort(key=lambda x: x[0], reverse=True)
# 选取项直到达到token限制
selected = []
total_tokens = 0
for score, item in scored_items:
item_tokens = self._estimate_tokens(item)
if total_tokens + item_tokens <= self.max_tokens:
selected.append(item)
total_tokens += item_tokens
return self._format_selected(selected)7.6 错误处理与容错机制
错误分类与处理策略
class ErrorType(Enum):
"""Agent可能遇到的错误类型"""
# LLM相关错误
LLM_TIMEOUT = "llm_timeout" # LLM响应超时
LLM_RATE_LIMIT = "llm_rate_limit" # API限流
LLM_INVALID_RESPONSE = "llm_invalid" # LLM返回无效响应
# 工具相关错误
TOOL_NOT_FOUND = "tool_not_found" # 工具不存在
TOOL_EXECUTION_FAILED = "tool_failed" # 工具执行失败
TOOL_TIMEOUT = "tool_timeout" # 工具执行超时
# 状态相关错误
STATE_CORRUPTED = "state_corrupted" # 状态损坏
CONTEXT_OVERFLOW = "context_overflow" # 上下文溢出
# 业务相关错误
MAX_ITERATIONS = "max_iterations" # 达到最大迭代
USER_CANCELLED = "user_cancelled" # 用户取消
class ErrorHandler:
"""
错误处理器
设计原则:
1. 不同错误类型采用不同的处理策略
2. 记录错误日志便于调试
3. 在适当时候回退或重试
"""
def __init__(self):
self.error_counts = {}
async def handle(self, error: Exception, state: AgentState) -> ErrorAction:
"""
处理错误,返回处理动作
"""
error_type = self._classify(error)
# 记录错误
self._log_error(error_type, error, state)
# 根据错误类型决定动作
handlers = {
ErrorType.LLM_TIMEOUT: self._handle_timeout,
ErrorType.LLM_RATE_LIMIT: self._handle_rate_limit,
ErrorType.TOOL_EXECUTION_FAILED: self._handle_tool_failure,
ErrorType.CONTEXT_OVERFLOW: self._handle_context_overflow,
ErrorType.MAX_ITERATIONS: self._handle_max_iterations,
}
handler = handlers.get(error_type, self._handle_unknown)
return await handler(error, state)
async def _handle_timeout(self, error, state):
"""超时处理:等待后重试"""
state.context["retry_count"] = state.context.get("retry_count", 0) + 1
if state.context["retry_count"] < 3:
await asyncio.sleep(2 ** state.context["retry_count"]) # 指数退避
return ErrorAction.RETRY
else:
return ErrorAction.FAIL
async def _handle_rate_limit(self, error, state):
"""限流处理:等待指定时间"""
retry_after = getattr(error, "retry_after", 60)
await asyncio.sleep(retry_after)
return ErrorAction.RETRY
async def _handle_tool_failure(self, error, state):
"""工具失败处理:尝试备用方案"""
tool_name = getattr(error, "tool_name", None)
# 检查是否有备用工具
if self._has_fallback(tool_name):
state.context["using_fallback"] = True
return ErrorAction.RETRY_WITH_FALLBACK
return ErrorAction.FAIL
async def _handle_context_overflow(self, error, state):
"""上下文溢出处理:压缩历史"""
# 调用状态管理器压缩历史
state_manager = state.context.get("state_manager")
if state_manager:
state_manager._compress()
return ErrorAction.RETRY
else:
return ErrorAction.FAIL
async def _handle_max_iterations(self, error, state):
"""达到最大迭代:返回当前最佳结果"""
return ErrorAction.RETURN_BEST_RESULT重试与回退机制
class RetryPolicy:
"""
重试策略
为什么需要重试策略?
1. 瞬时故障(网络抖动)可能自行恢复
2. 限流错误等待后通常可以继续
3. 合理重试能提高系统稳定性
"""
def __init__(
self,
max_retries: int = 3,
base_delay: float = 1.0,
exponential_base: float = 2.0,
max_delay: float = 60.0,
jitter: bool = True
):
self.max_retries = max_retries
self.base_delay = base_delay
self.exponential_base = exponential_base
self.max_delay = max_delay
self.jitter = jitter
def get_delay(self, attempt: int) -> float:
"""计算重试延迟"""
# 指数退避
delay = self.base_delay * (self.exponential_base ** attempt)
delay = min(delay, self.max_delay)
# 添加随机抖动,避免惊群效应
if self.jitter:
import random
delay = delay * (0.5 + random.random())
return delay
class FallbackManager:
"""
备用方案管理器
核心思想:
- 每个主工具可以有多个备用工具
- 主工具失败时,自动尝试备用工具
- 记录使用情况,便于优化
"""
def __init__(self):
self.fallback_map = {
"primary_search": ["fallback_search_1", "fallback_search_2"],
"primary_translate": ["fallback_translate"],
"primary_code_exec": ["fallback_sandbox"]
}
self.usage_stats = {}
async def execute_with_fallback(self, primary_tool: str, args: dict) -> Any:
"""执行主工具,失败时尝试备用方案"""
tools_to_try = [primary_tool] + self.fallback_map.get(primary_tool, [])
last_error = None
for tool in tools_to_try:
try:
result = await self._execute_tool(tool, args)
self._record_success(tool)
return result
except Exception as e:
last_error = e
self._record_failure(tool, e)
continue
# 所有工具都失败
raise AllToolsFailedError(tools_to_try, last_error)7.7 安全与权限管理
权限模型设计
class PermissionScope(Enum):
"""权限范围"""
NONE = "none" # 无任何权限
READ = "read" # 只读
WRITE = "write" # 读写
EXECUTE = "execute" # 执行
ADMIN = "admin" # 管理
class Permission:
"""
权限定义
为什么需要权限系统?
1. 防止Agent执行危险操作
2. 限制资源访问范围
3. 满足合规审计要求
"""
def __init__(
self,
file_paths: list[str] = [], # 允许访问的文件路径
allowed_tools: list[str] = [], # 允许使用的工具
allowed_domains: list[str] = [], # 允许访问的网络域名
max_execution_time: int = 300, # 最大执行时间(秒)
max_api_calls: int = 100 # 最大API调用次数
):
self.file_paths = file_paths
self.allowed_tools = allowed_tools
self.allowed_domains = allowed_domains
self.max_execution_time = max_execution_time
self.max_api_calls = max_api_calls
class SecurityManager:
"""
安全管理器
核心功能:
1. 验证操作权限
2. 拦截危险操作
3. 记录所有操作日志
"""
def __init__(self, permission: Permission):
self.permission = permission
self.audit_log = []
def check_file_access(self, path: str, mode: str) -> bool:
"""检查文件访问权限"""
# 如果是只读权限但要求写操作
if mode == "write" and PermissionScope.WRITE not in self.permission.scopes:
return False
# 检查路径是否在允许范围内
import os
real_path = os.path.realpath(path)
for allowed_path in self.permission.file_paths:
if real_path.startswith(os.path.realpath(allowed_path)):
return True
return False
def check_tool_usage(self, tool_name: str) -> bool:
"""检查工具使用权限"""
return tool_name in self.permission.allowed_tools
def check_network_access(self, domain: str) -> bool:
"""检查网络访问权限"""
for allowed in self.permission.allowed_domains:
if domain.endswith(allowed) or domain == allowed:
return True
return False
def audit(self, operation: str, details: dict):
"""记录审计日志"""
self.audit_log.append({
"timestamp": now(),
"operation": operation,
"details": details
})沙箱隔离
class SandboxConfig:
"""
沙箱配置
为什么需要沙箱?
即使Agent出错,也只会影响沙箱内的模拟环境
不会影响真实系统
"""
def __init__(
self,
use_sandbox: bool = True,
network_isolation: bool = True,
filesystem_boundary: str = "/workspace/sandbox",
memory_limit: str = "2GB"
):
self.use_sandbox = use_sandbox
self.network_isolation = network_isolation
self.filesystem_boundary = filesystem_boundary
self.memory_limit = memory_limit
class SandboxExecutor:
"""沙箱执行器"""
def __init__(self, config: SandboxConfig):
self.config = config
async def execute(self, code: str, language: str) -> Any:
"""在沙箱中执行代码"""
if self.config.use_sandbox:
return await self._execute_in_sandbox(code, language)
else:
return await self._execute_direct(code, language)
async def _execute_in_sandbox(self, code: str, language: str) -> Any:
"""在沙箱中执行"""
# 1. 准备沙箱环境
sandbox = await self._prepare_sandbox()
# 2. 设置资源限制
sandbox.set_memory_limit(self.config.memory_limit)
sandbox.set_network隔离(self.config.network_isolation)
sandbox.set_filesystem_boundary(self.config.filesystem_boundary)
# 3. 执行代码
try:
result = await sandbox.run(code, language)
return result
finally:
# 4. 清理沙箱
await sandbox.cleanup()7.8 生产环境最佳实践
架构设计原则
# 生产环境Agent系统架构原则
PRINCIPLES = """
1. 分离关注点(Separation of Concerns)
- Agent核心逻辑与工具实现分离
- 状态管理与执行逻辑分离
- 安全检查与业务逻辑分离
2. 失败设计(Design for Failure)
- 每个组件都可能失败
- 优雅降级,而非整体崩溃
- 快速失败,便于诊断
3. 可观测性(Observability)
- 日志:记录每个关键步骤
- 指标:QPS、延迟、错误率
- 追踪:请求全链路追踪
4. 资源管理(4. 资源管理(Resource Management)
- 限制并发请求数
- 控制内存使用
- 防止资源泄漏
5. 安全第一(Security First)
- 最小权限原则
- 纵深防御
- 审计追踪
"""监控与告警
class AgentMonitor:
"""
Agent监控指标
"""
metrics = {
"requests_total": "总请求数",
"requests_success": "成功请求数",
"requests_failed": "失败请求数",
"average_latency": "平均延迟",
"p99_latency": "P99延迟",
"active_agents": "活跃Agent数",
"tools_usage": "工具使用统计",
"error_distribution": "错误分布"
}
class AlertManager:
"""
告警管理
"""
alert_rules = {
"high_error_rate": {
"condition": "error_rate > 0.05", # 错误率超过5%
"severity": "critical",
"action": "notify_oncall"
},
"high_latency": {
"condition": "p99_latency > 30s",
"severity": "warning",
"action": "notify_team"
},
"agent_timeout": {
"condition": "timeout_count > 10/min",
"severity": "warning",
"action": "investigate"
}
}部署架构建议
┌─────────────────────────────────────────────────────────────────┐
│ 负载均衡层 │
│ (Nginx / 云负载均衡) │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ API网关层 │
│ (认证、限流、日志、路由) │
└─────────────────────────────┬───────────────────────────────────┘
│
┌─────────────────────────────▼───────────────────────────────────┐
│ Agent服务集群 │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ ┌────────────┐│
│ │ Agent-1 │ │ Agent-2 │ │ Agent-3 │ │ Agent-N ││
│ │ (实例1) │ │ (实例2) │ │ (实例3) │ │ (实例N) ││
│ └────────────┘ └────────────┘ └────────────┘ └────────────┘│
└─────────────────────────────┬───────────────────────────────────┘
│
┌───────────────────┼───────────────────┐
│ │ │
┌─────────▼─────┐ ┌────────▼────────┐ ┌─────▼─────────┐
│ 工具服务1 │ │ 工具服务2 │ │ 工具服务3 │
│ (搜索服务) │ │ (代码执行) │ │ (API服务) │
└───────────────┘ └─────────────────┘ └───────────────┘性能优化建议
| 优化项 | 方法 | 效果 |
|---|---|---|
| 缓存LLM响应 | 对相同输入缓存响应 | 减少API调用 |
| 并行工具调用 | 不依赖结果的工具并行执行 | 降低延迟 |
| 状态压缩 | 对话历史压缩摘要 | 减少token消耗 |
| 预热机制 | 定期预加载模型 | 降低冷启动延迟 |
| 连接池复用 | 复用HTTP/数据库连接 | 提高吞吐 |
本章总结
核心知识点
| 知识点 | 掌握程度 | 关键点 |
|---|---|---|
| Agent定义 | ⭐⭐⭐⭐⭐ | LLM+状态+工具+执行循环+终止条件 |
| 单Agent架构 | ⭐⭐⭐⭐⭐ | 状态外部化、终止条件前置 |
| 多Agent协作 | ⭐⭐⭐⭐⭐ | 星型/链式/网状架构 |
| 执行模式 | ⭐⭐⭐⭐⭐ | 并行与链式混合 |
| 状态管理 | ⭐⭐⭐⭐⭐ | 压缩、摘要、智能选择 |
| 错误处理 | ⭐⭐⭐⭐⭐ | 分类处理、重试回退 |
| 安全机制 | ⭐⭐⭐⭐ | 权限模型、沙箱隔离 |
| 生产实践 | ⭐⭐⭐⭐⭐ | 监控、告警、优化 |
Claude API七篇完整系列
恭喜你!已完成Claude API全系列的学习:
| 篇 | 主题 | 核心要点 |
|---|---|---|
| 一 | API基础 | 认证、请求、会话、结构化输出 |
| 二 | 提示词工程 | Few-shot、CoT、Temperature |
| 三 | 工具调用 | Function Calling、MCP |
| 四 | RAG系统 | 分块、嵌入、搜索、重排序 |
| 五 | MCP协议 | 架构、服务器开发、客户端 |
| 六 | Computer Use | 观察-决策-执行、安全机制 |
| 七 | Agent架构 | 多Agent、状态管理、生产实践 |
下一步
- 实践项目:用Agent架构构建一个自动化助手
- 深入研究:MCP协议与Agent的结合
- 参考资料:Anthropic Agent文档
文档元信息 难度:⭐⭐⭐⭐ | 类型:专家设计 | 更新日期:2026-03-25 | 预计阅读时间:60分钟 | 字数:约8000字