Docs / Agentic AI / Google A2A

Google A2A Protocol

GuΓ­a del protocolo Agent-to-Agent (A2A) de Google para comunicaciΓ³n estandarizada entre agentes de IA. Cubre la especificaciΓ³n del protocolo, Agent Cards, gestiΓ³n de tareas, streaming y autenticaciΓ³n.

Protocol Overview

A2A define un estΓ‘ndar abierto para que agentes de IA se descubran, comuniquen y colaboren independientemente de su framework o proveedor. Se basa en HTTP/JSON-RPC.

text
Flujo de comunicaciΓ³n A2A

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Agent A     β”‚                              β”‚  Agent B     β”‚
β”‚  (client)    β”‚                              β”‚  (server)    β”‚
β”‚              β”‚                              β”‚              β”‚
β”‚  1. Discover β”‚   GET /.well-known/agent.jsonβ”‚              β”‚
β”‚  ────────────┼─────────────────────────────▢│              β”‚
β”‚              │◀──────────────────────────────  Agent Card  β”‚
β”‚              β”‚            200 OK            β”‚              β”‚
β”‚              β”‚                              β”‚              β”‚
β”‚  2. Send Taskβ”‚   POST / (JSON-RPC)          β”‚              β”‚
β”‚  ────────────┼─────────────────────────────▢│              β”‚
β”‚              β”‚   method: "tasks/send"       β”‚  3. Process  β”‚
β”‚              β”‚                              β”‚  ────────────│
β”‚              β”‚                              β”‚  Execute taskβ”‚
β”‚              │◀──────────────────────────────              β”‚
β”‚  4. Result   β”‚   JSON-RPC Response          β”‚              β”‚
β”‚              β”‚   (Task with artifacts)      β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                              β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
ConceptoDescripciΓ³n
Agent CardDocumento JSON que describe las capacidades del agente, sus skills y cΓ³mo autenticarse
TaskUnidad de trabajo enviada entre agentes con un ciclo de vida definido
MessageComunicaciΓ³n dentro de una task con uno o mΓ‘s Parts
PartContenido atΓ³mico: TextPart, FilePart o DataPart
ArtifactResultado producido por el agente al completar una task

Agent Card

La Agent Card es un documento JSON servido en /.well-known/agent.json que describe las capacidades del agente. Es el mecanismo de descubrimiento del protocolo.

json
{
  "name": "CodeReview Agent",
  "description": "Agente que revisa cΓ³digo, sugiere mejoras y detecta bugs",
  "url": "https://agent.example.com/",
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": false
  },
  "skills": [
    {
      "id": "code-review",
      "name": "Code Review",
      "description": "Analiza cΓ³digo fuente y genera sugerencias",
      "tags": ["python", "javascript", "review"],
      "examples": [
        "Revisa este pull request para bugs de seguridad",
        "Sugiere mejoras de rendimiento en este cΓ³digo"
      ]
    },
    {
      "id": "bug-detection",
      "name": "Bug Detection",
      "description": "Detecta bugs potenciales en el cΓ³digo",
      "tags": ["bugs", "testing"]
    }
  ],
  "authentication": {
    "schemes": ["Bearer"],
    "credentials": "OAuth2 token required"
  }
}

Task Management

Las tareas tienen un ciclo de vida definido con estados claros. Se envΓ­an via JSON-RPC y pueden ser sΓ­ncronas o asΓ­ncronas.

text
Ciclo de vida de una Task

 β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”     β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
 β”‚ submitted │────▢│ working  │────▢│ completed β”‚
 β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”˜     β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                        β”‚
                   β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”
                   β”‚  failed   β”‚
                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                        β”‚
                   β”Œβ”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                   β”‚ input-required  β”‚  (necesita mΓ‘s info del client)
                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

Enviar una task (client)

python
import httpx

# ── Enviar task al agente A2A ──
request = {
    "jsonrpc": "2.0",
    "id": "req-001",
    "method": "tasks/send",
    "params": {
        "id": "task-123",
        "message": {
            "role": "user",
            "parts": [
                {
                    "type": "text",
                    "text": "Revisa este cΓ³digo Python para bugs de seguridad"
                },
                {
                    "type": "file",
                    "file": {
                        "name": "auth.py",
                        "mimeType": "text/x-python",
                        "bytes": "base64-encoded-content..."
                    }
                }
            ]
        }
    }
}

response = httpx.post("https://agent.example.com/", json=request)
result = response.json()

Server Implementation

Implementar un servidor A2A con el SDK oficial de Google. El servidor registra handlers para cada mΓ©todo del protocolo.

python
from a2a.server import A2AServer, TaskHandler
from a2a.types import (
    AgentCard, Skill, Task, TaskState,
    Message, TextPart, Artifact,
)

# ── Agent Card ──
card = AgentCard(
    name="CodeReview Agent",
    description="Revisa cΓ³digo y detecta bugs",
    url="https://agent.example.com/",
    version="1.0.0",
    skills=[
        Skill(
            id="code-review",
            name="Code Review",
            description="Analiza cΓ³digo fuente",
        )
    ],
)

# ── Task Handler ──
class ReviewHandler(TaskHandler):
    async def on_task_send(self, task: Task) -> Task:
        # Extraer cΓ³digo del mensaje
        user_message = task.messages[-1]
        code = self.extract_code(user_message)

        # Ejecutar review con LLM
        review = await self.run_review(code)

        # Devolver resultado como artifact
        task.status = TaskState.COMPLETED
        task.artifacts = [
            Artifact(
                name="review-result",
                parts=[TextPart(text=review)],
            )
        ]
        return task

# ── Iniciar servidor ──
server = A2AServer(agent_card=card, handler=ReviewHandler())
server.run(host="0.0.0.0", port=8080)

Streaming

Para tareas de larga duraciΓ³n, A2A soporta Server-Sent Events (SSE) que permiten al client recibir actualizaciones en tiempo real.

python
# Client: Recibir streaming via SSE
import httpx

request = {
    "jsonrpc": "2.0",
    "id": "req-002",
    "method": "tasks/sendSubscribe",  # SSE endpoint
    "params": {
        "id": "task-456",
        "message": {
            "role": "user",
            "parts": [{"type": "text", "text": "Analiza este repo completo"}]
        }
    }
}

# Stream de eventos
with httpx.stream("POST", "https://agent.example.com/", json=request) as r:
    for line in r.iter_lines():
        if line.startswith("data:"):
            event = json.loads(line[5:])
            print(f"Status: {event['result']['status']}")
            # submitted β†’ working β†’ completed

Multi-agent orchestration

Un agente orquestador puede descubrir otros agentes vΓ­a Agent Cards, enviar sub-tareas a agentes especializados (code review, testing, deployment), y combinar los resultados. A2A estandariza esta comunicaciΓ³n.

Security & Discovery

En entornos de producciΓ³n, los agentes A2A necesitan autenticaciΓ³n, descubrimiento dinΓ‘mico y comunicaciΓ³n cifrada. Sin estas medidas, cualquier servicio podria enviar tareas maliciosas a tus agentes o interceptar datos sensibles en trΓ‘nsito.

text
Flujo seguro de comunicaciΓ³n A2A

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”                                   β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  Agent A     β”‚                                   β”‚  Agent B     β”‚
β”‚  (client)    β”‚                                   β”‚  (server)    β”‚
β”‚              β”‚                                   β”‚              β”‚
β”‚ 1. Discover  β”‚  DNS SRV: _a2a._tcp.example.com   β”‚              β”‚
β”‚ ─────────────┼──────────────────────────────────▢│              β”‚
β”‚              │◀───────────────────────────────────  Agent Card  β”‚
β”‚              β”‚                                   β”‚              β”‚
β”‚ 2. Get Token β”‚  POST /oauth2/token               β”‚              β”‚
β”‚ ─────────────┼──▢ Identity Provider ──▢ JWT      β”‚              β”‚
β”‚              β”‚                                   β”‚              β”‚
β”‚ 3. Send Task β”‚  POST / (JSON-RPC)                β”‚              β”‚
β”‚ ─────────────┼──────────────────────────────────▢│              β”‚
β”‚              β”‚  Authorization: Bearer <JWT>       β”‚ 4. Validate  β”‚
β”‚              β”‚  mTLS client cert                  β”‚    JWT +     β”‚
β”‚              β”‚                                   β”‚    cert      β”‚
β”‚              │◀───────────────────────────────────              β”‚
β”‚ 5. Result    β”‚  Encrypted response (TLS 1.3)     β”‚              β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜                                   β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

AutenticaciΓ³n OAuth2 para agentes A2A

El esquema mas robusto para A2A es OAuth2 con client credentials flow: cada agente obtiene un JWT del Identity Provider y lo envΓ­a en cada request. El agente servidor valida el token antes de procesar la tarea.

python
# a2a_client/auth.py
import httpx
import time
from dataclasses import dataclass, field

@dataclass
class OAuth2AgentAuth:
    """AutenticaciΓ³n OAuth2 para clientes A2A."""
    token_url: str              # URL del Identity Provider
    client_id: str              # ID del agente cliente
    client_secret: str          # Secret del agente cliente
    scopes: list[str] = field(default_factory=lambda: ["a2a:tasks:send"])

    _token: str | None = field(default=None, repr=False)
    _expires_at: float = field(default=0.0, repr=False)

    async def get_token(self) -> str:
        """Obtener JWT vΓ‘lido, renovando si ha expirado."""
        if self._token and time.time() < self._expires_at - 30:
            return self._token

        async with httpx.AsyncClient() as client:
            response = await client.post(
                self.token_url,
                data={
                    "grant_type": "client_credentials",
                    "client_id": self.client_id,
                    "client_secret": self.client_secret,
                    "scope": " ".join(self.scopes),
                },
            )
            response.raise_for_status()
            data = response.json()

        self._token = data["access_token"]
        self._expires_at = time.time() + data.get("expires_in", 3600)
        return self._token

    async def auth_headers(self) -> dict:
        """Headers de autenticaciΓ³n para requests A2A."""
        token = await self.get_token()
        return {"Authorization": f"Bearer {token}"}

Descubrimiento de agentes via DNS y registry

En lugar de hardcodear URLs de agentes, usa DNS SRV records para descubrimiento automΓ‘tico o un registry centralizado para entornos mas complejos.

python
# a2a_client/discovery.py
import dns.asyncresolver
import httpx
from dataclasses import dataclass

@dataclass
class AgentDiscovery:
    """Descubrimiento de agentes A2A via DNS SRV y registry."""

    # ── Descubrimiento via DNS SRV ──
    @staticmethod
    async def discover_via_dns(service_name: str, domain: str) -> list[dict]:
        """Descubrir agentes A2A via DNS SRV records.

        Ejemplo DNS record:
          _a2a._tcp.code-review.agents.example.com. SRV 10 0 8080 agent1.example.com.
        """
        agents = []
        try:
            answers = await dns.asyncresolver.resolve(
                f"_a2a._tcp.{service_name}.{domain}", "SRV"
            )
            for rdata in answers:
                agent_url = f"https://{rdata.target}:{rdata.port}"
                # Obtener Agent Card para validar capacidades
                async with httpx.AsyncClient() as client:
                    card_resp = await client.get(
                        f"{agent_url}/.well-known/agent.json",
                        timeout=5.0,
                    )
                    if card_resp.status_code == 200:
                        agents.append({
                            "url": agent_url,
                            "priority": rdata.priority,
                            "weight": rdata.weight,
                            "card": card_resp.json(),
                        })
        except dns.asyncresolver.NXDOMAIN:
            pass  # No hay agentes registrados para este servicio
        return sorted(agents, key=lambda a: a["priority"])

    # ── Descubrimiento via Registry centralizado ──
    @staticmethod
    async def discover_via_registry(
        registry_url: str,
        capability: str,
        auth_headers: dict | None = None,
    ) -> list[dict]:
        """Descubrir agentes en un registry centralizado.

        El registry es un servicio HTTP que indexa Agent Cards.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{registry_url}/agents",
                params={"capability": capability, "status": "healthy"},
                headers=auth_headers or {},
                timeout=10.0,
            )
            response.raise_for_status()
            return response.json()["agents"]

# ── Uso ──
async def find_code_review_agent():
    # OpciΓ³n 1: DNS
    agents = await AgentDiscovery.discover_via_dns(
        "code-review", "agents.example.com"
    )
    # OpciΓ³n 2: Registry
    agents = await AgentDiscovery.discover_via_registry(
        "https://registry.agents.example.com",
        capability="code-review",
    )
    return agents[0] if agents else None

Rate limiting entre agentes

Para evitar que un agente cliente sature a un agente servidor, implementa rate limiting tanto en el lado cliente (para ser buen ciudadano) como en el servidor (para protegerse).

python
# a2a_server/rate_limiter.py
import time
import asyncio
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class TokenBucketLimiter:
    """Rate limiter por agente cliente usando token bucket."""
    max_tokens: int = 10           # Requests mΓ‘ximos en rΓ‘faga
    refill_rate: float = 2.0       # Tokens por segundo
    buckets: dict = field(default_factory=lambda: defaultdict(lambda: {
        "tokens": 10, "last_refill": time.time()
    }))

    def allow_request(self, client_id: str) -> bool:
        """Verificar si el cliente puede enviar un request."""
        bucket = self.buckets[client_id]
        now = time.time()

        # Rellenar tokens basado en tiempo transcurrido
        elapsed = now - bucket["last_refill"]
        bucket["tokens"] = min(
            self.max_tokens,
            bucket["tokens"] + elapsed * self.refill_rate,
        )
        bucket["last_refill"] = now

        if bucket["tokens"] >= 1:
            bucket["tokens"] -= 1
            return True
        return False

# ── Middleware de rate limiting para el servidor A2A ──
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse

limiter = TokenBucketLimiter(max_tokens=20, refill_rate=5.0)

class RateLimitMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        # Identificar cliente por JWT subject o IP
        client_id = request.headers.get("X-Agent-ID", request.client.host)

        if not limiter.allow_request(client_id):
            return JSONResponse(
                status_code=429,
                content={
                    "jsonrpc": "2.0",
                    "error": {
                        "code": -32000,
                        "message": "Rate limit exceeded. Retry after 1 second.",
                    },
                },
                headers={"Retry-After": "1"},
            )
        return await call_next(request)

Cliente A2A autenticado y seguro

Ejemplo completo de un cliente A2A que integra autenticaciΓ³n OAuth2, descubrimiento automΓ‘tico y rate limiting del lado cliente.

python
# a2a_client/secure_client.py
import httpx
import asyncio
from a2a_client.auth import OAuth2AgentAuth
from a2a_client.discovery import AgentDiscovery

class SecureA2AClient:
    """Cliente A2A con autenticaciΓ³n, descubrimiento y retry."""

    def __init__(
        self,
        auth: OAuth2AgentAuth,
        registry_url: str = "https://registry.agents.example.com",
        max_retries: int = 3,
        verify_ssl: str | bool = True,  # Path a CA bundle o True
    ):
        self.auth = auth
        self.registry_url = registry_url
        self.max_retries = max_retries
        self.verify_ssl = verify_ssl

    async def send_task(
        self,
        capability: str,
        task_id: str,
        message_text: str,
    ) -> dict:
        """Descubrir agente, autenticar y enviar tarea."""
        # 1. Descubrir agente con la capacidad requerida
        auth_headers = await self.auth.auth_headers()
        agents = await AgentDiscovery.discover_via_registry(
            self.registry_url, capability, auth_headers
        )
        if not agents:
            raise RuntimeError(f"No se encontraron agentes para: {capability}")

        agent_url = agents[0]["url"]

        # 2. Construir request A2A
        request = {
            "jsonrpc": "2.0",
            "id": f"req-{task_id}",
            "method": "tasks/send",
            "params": {
                "id": task_id,
                "message": {
                    "role": "user",
                    "parts": [{"type": "text", "text": message_text}],
                },
            },
        }

        # 3. Enviar con retry y autenticaciΓ³n
        for attempt in range(self.max_retries):
            try:
                async with httpx.AsyncClient(verify=self.verify_ssl) as client:
                    response = await client.post(
                        agent_url,
                        json=request,
                        headers=auth_headers,
                        timeout=30.0,
                    )

                    if response.status_code == 401:
                        # Token expirado β€” renovar y reintentar
                        self.auth._token = None
                        auth_headers = await self.auth.auth_headers()
                        continue

                    if response.status_code == 429:
                        # Rate limited β€” esperar y reintentar
                        retry_after = int(response.headers.get("Retry-After", "2"))
                        await asyncio.sleep(retry_after)
                        continue

                    response.raise_for_status()
                    return response.json()

            except httpx.ConnectError:
                if attempt < self.max_retries - 1:
                    await asyncio.sleep(2 ** attempt)
                    continue
                raise

        raise RuntimeError(f"FallΓ³ tras {self.max_retries} intentos")

# ── Uso completo ──
async def main():
    auth = OAuth2AgentAuth(
        token_url="https://auth.example.com/oauth2/token",
        client_id="orchestrator-agent",
        client_secret="secret-from-vault",
        scopes=["a2a:tasks:send", "a2a:tasks:read"],
    )

    client = SecureA2AClient(
        auth=auth,
        registry_url="https://registry.agents.example.com",
        verify_ssl="/etc/ssl/certs/ca-bundle.crt",
    )

    result = await client.send_task(
        capability="code-review",
        task_id="task-789",
        message_text="Revisa este mΓ³dulo de autenticaciΓ³n para vulnerabilidades SQL injection",
    )
    print(f"Task result: {result}")

asyncio.run(main())
Capa de seguridadMecanismoProtege contra
AutenticaciΓ³nOAuth2 client credentials + JWTAgentes no autorizados enviando tareas
Cifrado en trΓ‘nsitomTLS (TLS 1.3 con certificados mutuos)InterceptaciΓ³n de datos, man-in-the-middle
DescubrimientoDNS SRV + Agent Card validationAgentes falsos o endpoints incorrectos
Rate limitingToken bucket por agente clienteDenegaciΓ³n de servicio, saturaciΓ³n
AutorizaciΓ³nOAuth2 scopes por skill del agenteAcceso a capacidades no permitidas

mTLS en producciΓ³n

Para comunicaciΓ³n entre agentes en entornos empresariales, mTLS es imprescindible. Cada agente tiene su propio certificado cliente emitido por una CA interna. En Kubernetes, usa cert-manager para automatizar la emisiΓ³n y rotaciΓ³n de certificados. Sin mTLS, cualquier servicio en la red podrΓ­a hacerse pasar por un agente legΓ­timo.

END OF DOCUMENT

ΒΏNecesitas mΓ‘s? Volver a la LibrerΓ­a →