A2A 服务器
A2A 服务器使您能够通过标准化的 A2A(Agent-to-Agent)协议公开 AI 代理。它提供了 A2A 协议规范 的完整实现,负责处理客户端请求、执行代理逻辑、管理复杂的任务生命周期,并支持实时流式响应。
依赖项
要在您的项目中配置 A2A 服务器,请将以下依赖项添加到您的 build.gradle.kts 文件中:
dependencies {
// 核心 A2A 服务器库
implementation("ai.koog:a2a-server:$koogVersion")
// HTTP JSON-RPC 传输(最常用)
implementation("ai.koog:a2a-transport-server-jsonrpc-http:$koogVersion")
// Ktor 服务器引擎(选择适合您需求的)
implementation("io.ktor:ktor-server-netty:$ktorVersion")
}概述
A2A 服务器充当 A2A 协议传输层与您的自定义代理逻辑之间的桥梁。它编排整个请求生命周期,同时保持协议合规性并提供健壮的会话管理。
核心组件
A2AServer
实现完整 A2A 协议的主要服务器类。它充当中心协调器,负责:
- 验证 传入请求是否符合协议规范
- 管理 并发会话和任务生命周期
- 编排 传输层、存储层和业务逻辑层之间的通信
- 处理 所有协议操作:消息发送、任务查询、取消和推送通知
A2AServer 接受两个必填形参:
AgentExecutor,它定义了代理的业务逻辑实现AgentCard,它定义了代理能力和元数据
以及一些可选形参,可用于自定义其存储和传输行为。
AgentExecutor
AgentExecutor 接口是您实现代理核心业务逻辑的地方。它充当 A2A 协议与您特定的 AI 代理能力之间的桥梁。要启动代理的执行,您必须实现 execute 方法来定义代理的逻辑。要取消代理,您必须实现 cancel 方法。
class MyAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// 在此处编写代理逻辑
}
override suspend fun cancel(
context: RequestContext<TaskIdParams>,
eventProcessor: SessionEventProcessor,
agentJob: Deferred<Unit>?
) {
// 在此处取消代理,可选
}
}RequestContext 提供有关当前请求的丰富信息,包括当前会话的 contextId 和 taskId、发送的 message 以及请求的 params。
SessionEventProcessor 与客户端通信:
sendMessage(message):发送即时响应(聊天式交互)sendTaskEvent(event):发送任务相关的更新(长时间运行的操作)
// 用于即时响应(例如聊天机器人)
eventProcessor.sendMessage(
Message(
messageId = generateId(),
role = Role.Agent,
parts = listOf(TextPart("Here's your answer!")),
contextId = context.contextId
)
)
// 用于基于任务的操作
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
message = Message(/* 进度更新 */),
timestamp = Clock.System.now()
),
final = false // 更多更新将到来
)
)AgentCard
AgentCard 作为代理的自描述清单。它告知客户端代理的能力、如何与其通信以及具有哪些安全要求。
val agentCard = AgentCard(
// 基本身份
name = "Advanced Recipe Assistant",
description = "AI agent specialized in cooking advice, recipe generation, and meal planning",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通信设置
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 可选:支持多种传输
additionalInterfaces = listOf(
AgentInterface("https://api.example.com/a2a", TransportProtocol.JSONRPC),
),
// 能力声明
capabilities = AgentCapabilities(
streaming = true, // 支持实时响应
pushNotifications = true, // 发送异步通知
stateTransitionHistory = true // 维护任务历史记录
),
// 内容类型支持
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// 定义可用的安全方案
securitySchemes = mapOf(
"bearer" to HTTPAuthSecurityScheme(
scheme = "Bearer",
bearerFormat = "JWT",
description = "JWT token authentication"
),
"api-key" to APIKeySecurityScheme(
`in` = In.Header,
name = "X-API-Key",
description = "API key for service authentication"
)
),
// 指定安全要求(要求的逻辑或)
security = listOf(
mapOf("bearer" to listOf("read", "write")), // 选项 1:具有读/写范围的 JWT
mapOf("api-key" to emptyList()) // 选项 2:API 密钥
),
// 为经过身份验证的用户启用扩展卡
supportsAuthenticatedExtendedCard = true,
// 技能/能力
skills = listOf(
AgentSkill(
id = "recipe-generation",
name = "Recipe Generation",
description = "Generate custom recipes based on ingredients, dietary restrictions, and preferences",
tags = listOf("cooking", "recipes", "nutrition"),
examples = listOf(
"Create a vegan pasta recipe with mushrooms",
"I have chicken, rice, and vegetables. What can I make?"
)
),
AgentSkill(
id = "meal-planning",
name = "Meal Planning",
description = "Plan weekly meals and generate shopping lists",
tags = listOf("meal-planning", "nutrition", "shopping")
)
),
// 可选:品牌信息
iconUrl = "https://example.com/agent-icon.png",
documentationUrl = "https://docs.example.com/recipe-agent",
provider = AgentProvider(
organization = "CookingAI Inc.",
url = "https://cookingai.com"
)
)传输层
A2A 本身支持多种传输协议用于与客户端通信。目前,Koog 提供了基于 HTTP 的 JSON-RPC 服务器传输实现。
HTTP JSON-RPC 传输
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO, // Ktor 引擎 (CIO, Netty, Jetty)
port = 8080, // 服务器端口
path = "/a2a", // API 端点路径
wait = true // 阻塞直到服务器停止
)存储
A2A 服务器采用可插拔存储架构,分离不同类型的数据。所有存储实现都是可选的,并在开发时默认为内存变体。
- TaskStorage:任务生命周期管理 - 存储和管理任务状态、历史记录和构件
- MessageStorage:对话历史记录 - 管理对话上下文中的消息历史记录
- PushNotificationConfigStorage:Webhook 管理 - 管理用于异步通知的 webhook 配置
快速开始
1. 创建 AgentCard
定义代理的能力和元数据。
val agentCard = AgentCard(
name = "IO Assistant",
description = "AI agent specialized in input modification",
version = "2.1.0",
protocolVersion = "0.3.0",
// 通信设置
url = "https://api.example.com/a2a",
preferredTransport = TransportProtocol.JSONRPC,
// 能力声明
capabilities =
AgentCapabilities(
streaming = true, // 支持实时响应
pushNotifications = true, // 发送异步通知
stateTransitionHistory = true // 维护任务历史记录
),
// 内容类型支持
defaultInputModes = listOf("text/plain", "text/markdown", "image/jpeg"),
defaultOutputModes = listOf("text/plain", "text/markdown", "application/json"),
// 技能/能力
skills = listOf(
AgentSkill(
id = "echo",
name = "echo",
description = "Echoes back user messages",
tags = listOf("io"),
)
)
)2. 创建一个 AgentExecutor
执行器管理代理逻辑,处理传入请求并发送响应。
class EchoAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val userMessage = context.params.message
val userText = userMessage.parts
.filterIsInstance<TextPart>()
.joinToString(" ") { it.text }
// 回显用户的消息
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("You said: $userText")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}2. 创建服务器
将代理执行器和代理卡传递给服务器。
val server = A2AServer(
agentExecutor = EchoAgentExecutor(),
agentCard = agentCard
)3. 添加传输层
创建一个传输层并启动服务器。
// HTTP JSON-RPC 传输
val transport = HttpJSONRPCServerTransport(server)
transport.start(
engineFactory = CIO,
port = 8080,
path = "/agent",
wait = true
)代理实现模式
简单响应代理
如果您的代理只需要响应单个消息,您可以将其实现为简单代理。如果代理执行逻辑不复杂且不耗时,也可以使用此模式。
class SimpleAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
val response = Message(
messageId = UUID.randomUUID().toString(),
role = Role.Agent,
parts = listOf(TextPart("Hello from agent!")),
contextId = context.contextId,
taskId = context.taskId
)
eventProcessor.sendMessage(response)
}
}基于任务的代理
如果代理的执行逻辑复杂且需要多个步骤,您可以将其实现为基于任务的代理。如果代理执行逻辑耗时且需要挂起,也可以使用此模式。
class TaskAgentExecutor : AgentExecutor {
override suspend fun execute(
context: RequestContext<MessageSendParams>,
eventProcessor: SessionEventProcessor
) {
// 发送工作状态
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Working,
timestamp = Clock.System.now()
),
final = false
)
)
// 执行工作...
// 发送完成状态
eventProcessor.sendTaskEvent(
TaskStatusUpdateEvent(
contextId = context.contextId,
taskId = context.taskId,
status = TaskStatus(
state = TaskState.Completed,
timestamp = Clock.System.now()
),
final = true
)
)
}
}