Google A2A Protocol
A guide to Google's Agent-to-Agent (A2A) protocol for standardized communication between AI agents. It covers the protocol specification, Agent Cards, task management, streaming, and authentication.
Protocol Overview
A2A defines an open standard that lets AI agents discover each other, communicate, and collaborate regardless of their framework or vendor. It is built on JSON-RPC 2.0 over HTTP.
A2A communication flow
┌──────────────┐                              ┌──────────────┐
│   Agent A    │                              │   Agent B    │
│   (client)   │                              │   (server)   │
│              │                              │              │
│ 1. Discover  │  GET /.well-known/agent.json │              │
│ ─────────────┼─────────────────────────────▶│              │
│              │◀─────────────────────────────┤ Agent Card   │
│              │            200 OK            │              │
│              │                              │              │
│ 2. Send Task │      POST / (JSON-RPC)       │              │
│ ─────────────┼─────────────────────────────▶│              │
│              │     method: "tasks/send"     │ 3. Process   │
│              │                              │ ───────────  │
│              │                              │ Execute task │
│              │◀─────────────────────────────┤              │
│ 4. Result    │      JSON-RPC Response       │              │
│              │    (Task with artifacts)     │              │
└──────────────┘                              └──────────────┘

| Concept | Description |
|---|---|
| Agent Card | JSON document that describes the agent's capabilities, its skills, and how to authenticate |
| Task | Unit of work sent between agents, with a defined lifecycle |
| Message | Communication within a task, made up of one or more Parts |
| Part | Atomic piece of content: TextPart, FilePart, or DataPart |
| Artifact | Output produced by the agent when it completes a task |
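To make the relationship between Message and Part concrete, here is a minimal sketch that models them as Python dataclasses and serializes them to the wire shape used in this guide's request examples. The class and field names (`mime_type`, `data_b64`, `to_dict`) are illustrative assumptions, not the SDK's types, and the `"data"` key used for DataPart is also an assumption.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class TextPart:
    text: str

    def to_dict(self) -> dict[str, Any]:
        return {"type": "text", "text": self.text}


@dataclass
class FilePart:
    name: str
    mime_type: str
    data_b64: str  # base64-encoded file content

    def to_dict(self) -> dict[str, Any]:
        return {
            "type": "file",
            "file": {
                "name": self.name,
                "mimeType": self.mime_type,
                "bytes": self.data_b64,
            },
        }


@dataclass
class DataPart:
    data: dict[str, Any]

    def to_dict(self) -> dict[str, Any]:
        # Assumption: structured data travels under a "data" key.
        return {"type": "data", "data": self.data}


@dataclass
class Message:
    role: str
    parts: list = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        # A message is a role plus an ordered list of serialized parts.
        return {"role": self.role, "parts": [p.to_dict() for p in self.parts]}
```

A message built as `Message("user", [TextPart("Review this code")]).to_dict()` can be placed directly under `params.message` of a `tasks/send` request.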
Agent Card
The Agent Card is a JSON document served at /.well-known/agent.json that describes the agent's capabilities. It is the protocol's discovery mechanism.
{
  "name": "CodeReview Agent",
  "description": "Agent that reviews code, suggests improvements, and detects bugs",
  "url": "https://agent.example.com/",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  },
  "skills": [
    {
      "id": "code-review",
      "name": "Code Review",
      "description": "Analyzes source code and generates suggestions",
      "tags": ["python", "javascript", "review"],
      "examples": [
        "Review this pull request for security bugs",
        "Suggest performance improvements for this code"
      ]
    },
    {
      "id": "bug-detection",
      "name": "Bug Detection",
      "description": "Detects potential bugs in the code",
      "tags": ["bugs", "testing"]
    }
  ],
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "OAuth2 token required"
  }
}

Task Management
Tasks have a defined lifecycle with explicit states. They are sent via JSON-RPC and can be handled synchronously or asynchronously.
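The states this guide uses (submitted, working, input-required, completed, failed) can be modeled as a small state machine. The sketch below is one possible way to do that; the names `LifecycleState`, `ALLOWED`, `can_transition`, and `is_terminal` are hypothetical, and the transition table is an assumption, since the guide names the states but does not spell out every edge.

```python
from enum import Enum


class LifecycleState(str, Enum):
    SUBMITTED = "submitted"
    WORKING = "working"
    INPUT_REQUIRED = "input-required"
    COMPLETED = "completed"
    FAILED = "failed"


# Assumed transition table: which states may follow each state.
ALLOWED = {
    LifecycleState.SUBMITTED: {LifecycleState.WORKING},
    LifecycleState.WORKING: {
        LifecycleState.COMPLETED,
        LifecycleState.FAILED,
        LifecycleState.INPUT_REQUIRED,
    },
    # Assumption: once the client supplies the missing input, work resumes.
    LifecycleState.INPUT_REQUIRED: {LifecycleState.WORKING},
    LifecycleState.COMPLETED: set(),
    LifecycleState.FAILED: set(),
}


def can_transition(current: LifecycleState, new: LifecycleState) -> bool:
    """Return True if moving from `current` to `new` is in the table."""
    return new in ALLOWED[current]


def is_terminal(state: LifecycleState) -> bool:
    """A state is terminal when no further transitions are allowed."""
    return not ALLOWED[state]
```

A server could call `can_transition` before updating a task's status to reject updates that arrive out of order, and a client could poll until `is_terminal` returns true.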
Task lifecycle
┌───────────┐     ┌──────────┐     ┌───────────┐
│ submitted │────▶│ working  │────▶│ completed │
└───────────┘     └────┬─────┘     └───────────┘
                       │
                  ┌────▼──────┐
                  │  failed   │
                  └────┬──────┘
                       │
                  ┌────▼────────────┐
                  │ input-required  │ (needs more information from the client)
                  └─────────────────┘

Sending a task (client)
import httpx

# ── Send a task to the A2A agent ──
request = {
    "jsonrpc": "2.0",
    "id": "req-001",
    "method": "tasks/send",
    "params": {
        "id": "task-123",
        "message": {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "text": "Review this Python code for security bugs"
                },
                {
                    "type": "file",
                    "file": {
                        "name": "auth.py",
                        "mimeType": "text/x-python",
                        "bytes": "base64-encoded-content..."
                    }
                }
            ]
        }
    }
}

response = httpx.post("https://agent.example.com/", json=request)
response.raise_for_status()  # Surface HTTP-level errors before parsing
result = response.json()

Server Implementation
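This section implements an A2A server with Google's official SDK. The server registers a handler for the protocol's methods. The import paths and class names are reproduced as this guide gives them; verify them against the SDK version you install.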
from a2a.server import A2AServer, TaskHandler
from a2a.types import (
    AgentCard, Skill, Task, TaskState,
    Message, TextPart, Artifact,
)

# ── Agent Card ──
card = AgentCard(
    name="CodeReview Agent",
    description="Reviews code and detects bugs",
    url="https://agent.example.com/",
    version="1.0.0",
    skills=[
        Skill(
            id="code-review",
            name="Code Review",
            description="Analyzes source code",
        )
    ],
)

# ── Task Handler ──
class ReviewHandler(TaskHandler):
    async def on_task_send(self, task: Task) -> Task:
        # Extract the code from the latest message.
        # extract_code and run_review are application-specific helpers not shown here.
        user_message = task.messages[-1]
        code = self.extract_code(user_message)
        # Run the review with an LLM
        review = await self.run_review(code)
        # Return the result as an artifact
        task.status = TaskState.COMPLETED
        task.artifacts = [
            Artifact(
                name="review-result",
                parts=[TextPart(text=review)],
            )
        ]
        return task

# ── Start the server ──
server = A2AServer(agent_card=card, handler=ReviewHandler())
server.run(host="0.0.0.0", port=8080)

Streaming
For long-running tasks, A2A supports Server-Sent Events (SSE), which let the client receive updates in real time.
# Client: receive streaming updates via SSE
import json

import httpx

request = {
    "jsonrpc": "2.0",
    "id": "req-002",
    "method": "tasks/sendSubscribe",  # SSE endpoint
    "params": {
        "id": "task-456",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": "Analyze this entire repo"}]
        }
    }
}

# Event stream
with httpx.stream("POST", "https://agent.example.com/", json=request) as r:
    for line in r.iter_lines():
        # Each SSE event carries its JSON payload on a "data:" line
        if line.startswith("data:"):
            event = json.loads(line[5:])
            print(f"Status: {event['result']['status']}")
            # submitted → working → completed

Multi-agent orchestration
An orchestrator agent can discover other agents through their Agent Cards, send subtasks to specialized agents (code review, testing, deployment), and combine the results. A2A standardizes this communication.
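As a rough sketch of that fan-out pattern, the code below sends one `tasks/send` request per specialized agent concurrently and collects the responses by skill. Everything here is illustrative: `orchestrate`, `build_task_request`, and the `SendFn` transport signature are hypothetical names, and `echo_send` is a stand-in transport so the example runs without a network. In practice the transport would POST the request to the agent's URL.

```python
import asyncio
from typing import Awaitable, Callable

# Hypothetical transport signature: (agent_url, json_rpc_request) -> response
SendFn = Callable[[str, dict], Awaitable[dict]]


def build_task_request(task_id: str, text: str) -> dict:
    """Build a tasks/send JSON-RPC request in the shape used in this guide."""
    return {
        "jsonrpc": "2.0",
        "id": f"req-{task_id}",
        "method": "tasks/send",
        "params": {
            "id": task_id,
            "message": {"role": "user", "parts": [{"type": "text", "text": text}]},
        },
    }


async def orchestrate(subtasks: dict[str, tuple[str, str]], send: SendFn) -> dict[str, dict]:
    """Send one subtask per agent concurrently; return responses keyed by skill.

    `subtasks` maps a skill name to (agent_url, prompt).
    """
    async def run_one(skill: str, url: str, prompt: str) -> tuple[str, dict]:
        request = build_task_request(f"{skill}-task", prompt)
        try:
            return skill, await send(url, request)
        except Exception as exc:
            # One failing agent should not discard the other results.
            return skill, {"error": str(exc)}

    pairs = await asyncio.gather(
        *(run_one(skill, url, prompt) for skill, (url, prompt) in subtasks.items())
    )
    return dict(pairs)


async def echo_send(url: str, request: dict) -> dict:
    """Stand-in transport that echoes a completed result (no network)."""
    return {
        "jsonrpc": "2.0",
        "id": request["id"],
        "result": {"status": "completed", "agent": url},
    }
```

Calling `asyncio.run(orchestrate({"code-review": ("https://review.example.com/", "Review the diff")}, echo_send))` returns a dictionary keyed by skill. Injecting the transport keeps the orchestration logic independent of how authentication and retries are handled.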
Security & Discovery
In production environments, A2A agents need authentication, dynamic discovery, and encrypted communication. Without these measures, any service could send malicious tasks to your agents or intercept sensitive data in transit.
Secure A2A communication flow
┌──────────────┐                                  ┌──────────────┐
│   Agent A    │                                  │   Agent B    │
│   (client)   │                                  │   (server)   │
│              │                                  │              │
│ 1. Discover  │  DNS SRV: _a2a._tcp.example.com  │              │
│ ─────────────┼─────────────────────────────────▶│              │
│              │◀─────────────────────────────────┤ Agent Card   │
│              │                                  │              │
│ 2. Get Token │        POST /oauth2/token        │              │
│ ─────────────┼──▶ Identity Provider ──▶ JWT     │              │
│              │                                  │              │
│ 3. Send Task │        POST / (JSON-RPC)         │              │
│ ─────────────┼─────────────────────────────────▶│              │
│              │   Authorization: Bearer <JWT>    │ 4. Validate  │
│              │         mTLS client cert         │    JWT +     │
│              │                                  │    cert      │
│              │◀─────────────────────────────────┤              │
│ 5. Result    │   Encrypted response (TLS 1.3)   │              │
└──────────────┘                                  └──────────────┘

OAuth2 authentication for A2A agents
The most robust scheme for A2A is OAuth2 with the client credentials flow: each agent obtains a JWT from the Identity Provider and sends it with every request. The server agent validates the token before processing the task.
# a2a_client/auth.py
import time
from dataclasses import dataclass, field

import httpx


@dataclass
class OAuth2AgentAuth:
    """OAuth2 authentication for A2A clients."""
    token_url: str                          # Identity Provider token endpoint
    client_id: str                          # Client agent ID
    client_secret: str = field(repr=False)  # Client agent secret (kept out of repr)
    scopes: list[str] = field(default_factory=lambda: ["a2a:tasks:send"])
    _token: str | None = field(default=None, repr=False)
    _expires_at: float = field(default=0.0, repr=False)

    async def get_token(self) -> str:
        """Return a valid JWT, refreshing it if it has expired."""
        # Refresh 30 seconds early so the token does not expire in flight
        if self._token and time.time() < self._expires_at - 30:
            return self._token
        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes),
                },
            )
            response.raise_for_status()
            data = response.json()
        self._token = data["access_token"]
        # Fall back to one hour if the provider omits expires_in
        self._expires_at = time.time() + data.get("expires_in", 3600)
        return self._token

    async def auth_headers(self) -> dict:
        """Authentication headers for A2A requests."""
        token = await self.get_token()
        return {"Authorization": f"Bearer {token}"}

Agent discovery via DNS and a registry
Instead of hardcoding agent URLs, use DNS SRV records for automatic discovery, or a centralized registry for more complex environments.
# a2a_client/discovery.py
import dns.asyncresolver
import dns.resolver
import httpx
from dataclasses import dataclass


@dataclass
class AgentDiscovery:
    """A2A agent discovery via DNS SRV and a registry."""

    # ── Discovery via DNS SRV ──
    @staticmethod
    async def discover_via_dns(service_name: str, domain: str) -> list[dict]:
        """Discover A2A agents via DNS SRV records.

        Example DNS record:
        _a2a._tcp.code-review.agents.example.com. SRV 10 0 8080 agent1.example.com.
        """
        agents = []
        try:
            answers = await dns.asyncresolver.resolve(
                f"_a2a._tcp.{service_name}.{domain}", "SRV"
            )
        except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer):
            return agents  # No agents registered for this service

        async with httpx.AsyncClient() as client:
            for rdata in answers:
                # SRV targets are absolute names; drop the trailing dot
                host = str(rdata.target).rstrip(".")
                agent_url = f"https://{host}:{rdata.port}"
                # Fetch the Agent Card to validate capabilities
                try:
                    card_resp = await client.get(
                        f"{agent_url}/.well-known/agent.json",
                        timeout=5.0,
                    )
                except httpx.HTTPError:
                    continue  # Skip agents that cannot be reached
                if card_resp.status_code == 200:
                    agents.append({
                        "url": agent_url,
                        "priority": rdata.priority,
                        "weight": rdata.weight,
                        "card": card_resp.json(),
                    })
        # Lower SRV priority values are preferred
        return sorted(agents, key=lambda a: a["priority"])

    # ── Discovery via a centralized registry ──
    @staticmethod
    async def discover_via_registry(
        registry_url: str,
        capability: str,
        auth_headers: dict | None = None,
    ) -> list[dict]:
        """Discover agents in a centralized registry.

        The registry is an HTTP service that indexes Agent Cards.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{registry_url}/agents",
                params={"capability": capability, "status": "healthy"},
                headers=auth_headers or {},
                timeout=10.0,
            )
            response.raise_for_status()
            return response.json()["agents"]


# ── Usage ──
async def find_code_review_agent():
    # Option 1: DNS
    agents = await AgentDiscovery.discover_via_dns(
        "code-review", "agents.example.com"
    )
    if not agents:
        # Option 2: fall back to the registry
        agents = await AgentDiscovery.discover_via_registry(
            "https://registry.agents.example.com",
            capability="code-review",
        )
    return agents[0] if agents else None

Rate limiting between agents
To keep a client agent from overwhelming a server agent, implement rate limiting on both sides: on the client (to be a good citizen) and on the server (to protect itself).
# a2a_server/rate_limiter.py
import time
from dataclasses import dataclass, field

from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse


@dataclass
class TokenBucketLimiter:
    """Per-client-agent rate limiter using a token bucket."""
    max_tokens: int = 10       # Maximum burst size, in requests
    refill_rate: float = 2.0   # Tokens added per second
    buckets: dict = field(default_factory=dict)

    def allow_request(self, client_id: str) -> bool:
        """Check whether the client may send a request."""
        now = time.monotonic()
        # New clients start with a full bucket sized by max_tokens
        bucket = self.buckets.setdefault(
            client_id, {"tokens": float(self.max_tokens), "last_refill": now}
        )
        # Refill tokens based on elapsed time, capped at max_tokens
        elapsed = now - bucket["last_refill"]
        bucket["tokens"] = min(
            self.max_tokens,
            bucket["tokens"] + elapsed * self.refill_rate,
        )
        bucket["last_refill"] = now
        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False


# ── Rate limiting middleware for the A2A server ──
limiter = TokenBucketLimiter(max_tokens=20, refill_rate=5.0)


class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Identify the client by the X-Agent-ID header, falling back to its IP.
        # The header is client-supplied; in production, prefer the subject of
        # the validated JWT.
        client_id = request.headers.get("X-Agent-ID", request.client.host)
        if not limiter.allow_request(client_id):
            return JSONResponse(
                status_code=429,
                content={
                    "jsonrpc": "2.0",
                    "id": None,
                    "error": {
                        "code": -32000,
                        "message": "Rate limit exceeded. Retry after 1 second.",
                    },
                },
                headers={"Retry-After": "1"},
            )
        return await call_next(request)

Authenticated and secure A2A client
A complete example of an A2A client that combines OAuth2 authentication, automatic discovery, and client-side handling of rate-limit responses.
# a2a_client/secure_client.py
import asyncio

import httpx

from a2a_client.auth import OAuth2AgentAuth
from a2a_client.discovery import AgentDiscovery


class SecureA2AClient:
    """A2A client with authentication, discovery, and retry."""

    def __init__(
        self,
        auth: OAuth2AgentAuth,
        registry_url: str = "https://registry.agents.example.com",
        max_retries: int = 3,
        verify_ssl: str | bool = True,  # Path to a CA bundle, or True
    ):
        self.auth = auth
        self.registry_url = registry_url
        self.max_retries = max_retries
        self.verify_ssl = verify_ssl

    async def send_task(
        self,
        capability: str,
        task_id: str,
        message_text: str,
    ) -> dict:
        """Discover an agent, authenticate, and send a task."""
        # 1. Discover an agent with the required capability
        auth_headers = await self.auth.auth_headers()
        agents = await AgentDiscovery.discover_via_registry(
            self.registry_url, capability, auth_headers
        )
        if not agents:
            raise RuntimeError(f"No agents found for: {capability}")
        agent_url = agents[0]["url"]

        # 2. Build the A2A request
        request = {
            "jsonrpc": "2.0",
            "id": f"req-{task_id}",
            "method": "tasks/send",
            "params": {
                "id": task_id,
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message_text}],
                },
            },
        }

        # 3. Send with retry and authentication
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(verify=self.verify_ssl) as client:
                    response = await client.post(
                        agent_url,
                        json=request,
                        headers=auth_headers,
                        timeout=30.0,
                    )
                if response.status_code == 401:
                    # Token expired: refresh and retry
                    self.auth._token = None
                    auth_headers = await self.auth.auth_headers()
                    continue
                if response.status_code == 429:
                    # Rate limited: wait and retry.
                    # Assumes Retry-After is a delay in seconds, not an HTTP date.
                    retry_after = int(response.headers.get("Retry-After", "2"))
                    await asyncio.sleep(retry_after)
                    continue
                response.raise_for_status()
                return response.json()
            except httpx.ConnectError:
                if attempt < self.max_retries - 1:
                    # Exponential backoff: 1s, 2s, 4s, ...
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise
        raise RuntimeError(f"Failed after {self.max_retries} attempts")


# ── Full usage ──
async def main():
    auth = OAuth2AgentAuth(
        token_url="https://auth.example.com/oauth2/token",
        client_id="orchestrator-agent",
        client_secret="secret-from-vault",
        scopes=["a2a:tasks:send", "a2a:tasks:read"],
    )
    client = SecureA2AClient(
        auth=auth,
        registry_url="https://registry.agents.example.com",
        verify_ssl="/etc/ssl/certs/ca-bundle.crt",
    )
    result = await client.send_task(
        capability="code-review",
        task_id="task-789",
        message_text="Review this authentication module for SQL injection vulnerabilities",
    )
    print(f"Task result: {result}")
asyncio.run(main())

| Security layer | Mechanism | Protects against |
|---|---|---|
| Authentication | OAuth2 client credentials + JWT | Unauthorized agents sending tasks |
| Encryption in transit | mTLS (TLS 1.3 with mutual certificates) | Data interception, man-in-the-middle |
| Discovery | DNS SRV + Agent Card validation | Fake agents or incorrect endpoints |
| Rate limiting | Token bucket per client agent | Denial of service, overload |
| Authorization | OAuth2 scopes per agent skill | Access to capabilities that are not permitted |
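The authorization row can be illustrated with a small check that maps each skill to a required scope. This is a sketch under stated assumptions: the scope names in `SKILL_SCOPES` are hypothetical, the claims are assumed to come from a JWT whose signature and expiry have already been validated elsewhere, and scopes are assumed to arrive as a space-delimited `scope` claim (some identity providers use a different claim or an array).

```python
# Hypothetical mapping from skill id to the OAuth2 scope that unlocks it.
SKILL_SCOPES = {
    "code-review": "a2a:skills:code-review",
    "bug-detection": "a2a:skills:bug-detection",
}


def granted_scopes(claims: dict) -> set[str]:
    """Extract scopes from already-validated JWT claims.

    Assumes a space-delimited "scope" string claim.
    """
    return set(str(claims.get("scope", "")).split())


def is_authorized(claims: dict, skill_id: str) -> bool:
    """Return True if the token grants the scope required by the skill."""
    required = SKILL_SCOPES.get(skill_id)
    # Unknown skills are denied by default.
    return required is not None and required in granted_scopes(claims)
```

A server would run this check after validating the token and before dispatching the task, returning a JSON-RPC error when it fails. Denying unknown skills by default avoids accidentally exposing a newly added skill that has no scope assigned yet.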
mTLS in production
For agent-to-agent communication in enterprise environments, mTLS is essential. Each agent has its own client certificate issued by an internal CA. On Kubernetes, use cert-manager to automate certificate issuance and rotation. Without mTLS, any service on the network could impersonate a legitimate agent.