Construyendo Tu Primer Servidor MCP: Guia de Produccion para Ingenieros Backend
MCP es el protocolo que conecta agentes de IA con el mundo real, pero la mayoria de tutoriales se quedan en hello world. Asi es como construir servidores MCP que sobrevivan produccion — con auth, manejo de errores, observabilidad, y los patrones que me hubiera gustado tener cuando empece.
Todos los tutoriales de servidores MCP terminan en el mismo lugar: una funcion Python decorada que suma dos numeros. Felicidades, tu agente de IA sabe hacer aritmetica. Ahora despliega eso a produccion donde usuarios reales envian inputs malformados, tus APIs downstream se caen a las 3 AM, y alguien eventualmente intenta hacer prompt injection para llegar a tu base de datos.
El Model Context Protocol se ha convertido en la forma estandar en que los agentes de IA interactuan con el mundo exterior. Anthropic lo lanzo en noviembre de 2024. Para marzo de 2025, OpenAI lo adopto. Google DeepMind siguio en abril. Microsoft lo integro en Azure y Microsoft 365. El registro oficial ahora lista miles de servidores, y la encuesta del Pragmatic Engineer a 46 ingenieros encontro que construir y mantener servidores MCP se esta convirtiendo en parte rutinaria del trabajo de ingenieria de software.
El protocolo gano. La pregunta ya no es si construir servidores MCP, sino como construir los que no se rompan cuando la realidad los golpee.
He estado construyendo servidores MCP para herramientas internas durante los ultimos meses, y la experiencia se parece mucho a lo que he visto con microservicios a lo largo de los anos. Las mismas preocupaciones de produccion aplican: autenticacion, manejo de errores, validacion de inputs, observabilidad, testing y degradacion elegante. Si leiste mi post sobre patrones de sistemas distribuidos, reconoceras la mentalidad. Los servidores MCP son simplemente otro servicio en tu arquitectura, y merecen el mismo rigor.
Que Es Realmente MCP
MCP es un protocolo JSON-RPC 2.0 que define como los clientes de IA (Claude, ChatGPT, Cursor, tu agente custom) se comunican con servidores que exponen tools, resources y prompts. Piensa en ello como una capa de API estandarizada entre un LLM y tu infraestructura.
La arquitectura tiene tres componentes:
- Client: La aplicacion de IA que necesita llamar herramientas externas (Claude Desktop, un plugin de IDE, tu framework de agentes)
- Server: Tu codigo que expone capacidades -- tools que el agente puede invocar, resources que puede leer, prompts que puede usar
- Transport: Como se comunican cliente y servidor --
stdiopara procesos locales, Streamable HTTP para despliegues remotos
La idea clave es que los servidores MCP no son APIs web para humanos. Son APIs para agentes de IA. El agente lee las descripciones de tus tools, decide cuando llamarlos, e interpreta los resultados. Esto significa que tus descripciones de tools, mensajes de error y estructuras de respuesta necesitan estar optimizadas para consumo del LLM, no para consumo humano.
El Servidor Basico: Lo Que Ensenan los Tutoriales
Asi se ve un servidor MCP minimo usando el SDK oficial de Python con FastMCP:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("deploy-tools")
@mcp.tool()
def get_deploy_status(service_name: str, environment: str) -> dict:
"""Get the current deployment status for a service.
Args:
service_name: The name of the service (e.g., 'api-gateway', 'auth-service')
environment: The target environment ('staging' or 'production')
Returns:
A dict with keys: service, environment, status, version, deployed_at
"""
# In reality, this calls your deployment API
status = deployment_api.get_status(service_name, environment)
return {
"service": service_name,
"environment": environment,
"status": status.state,
"version": status.version,
"deployed_at": status.timestamp.isoformat(),
}
if __name__ == "__main__":
mcp.run(transport="stdio")FastMCP maneja el protocolo JSON-RPC, genera JSON Schema a partir de tus anotaciones de tipos, y gestiona el ciclo de vida del servidor. Esta es la parte que cubren todos los tutoriales, y funciona. El problema es todo lo que dejan fuera.
Produccion 1: Validacion de Inputs
Una auditoria de 2025 de Invariant Labs encontro que el 43% de los servidores MCP tempranos contenian vulnerabilidades de inyeccion de comandos. La razon es directa: los desarrolladores pasan inputs del agente directamente a comandos shell, queries de base de datos u operaciones del filesystem sin validacion.
Los servidores MCP reciben input de agentes de IA, y los agentes de IA reciben input de usuarios. Toda la superficie de ataque de prompt injection aplica aqui. La guia OWASP para desarrollo seguro de MCP es explicita: nunca confies en parametros proporcionados por el agente.
import re
from enum import Enum
from pydantic import BaseModel, field_validator
class Environment(str, Enum):
STAGING = "staging"
PRODUCTION = "production"
class DeployStatusRequest(BaseModel):
service_name: str
environment: Environment
@field_validator("service_name")
@classmethod
def validate_service_name(cls, v: str) -> str:
if not re.match(r"^[a-z][a-z0-9\-]{1,62}[a-z0-9]$", v):
raise ValueError(
"service_name must be lowercase alphanumeric with hyphens, "
"3-64 characters"
)
return v
@mcp.tool()
def get_deploy_status(service_name: str, environment: str) -> dict:
"""Get the current deployment status for a service.
Args:
service_name: Lowercase alphanumeric with hyphens (e.g., 'api-gateway')
environment: Must be 'staging' or 'production'
"""
req = DeployStatusRequest(
service_name=service_name,
environment=environment,
)
status = deployment_api.get_status(req.service_name, req.environment.value)
return {
"service": req.service_name,
"environment": req.environment.value,
"status": status.state,
"version": status.version,
"deployed_at": status.timestamp.isoformat(),
}Usa modelos Pydantic para cada input de tool. Restringe los enums. Valida patrones. Nunca construyas comandos shell o SQL a partir de input crudo del agente. Es la misma disciplina que aplicas a cualquier API publica, pero importa mas aqui porque el caller es un LLM que puede ser manipulado a traves de prompt injection.
Produccion 2: Manejo de Errores y Degradacion Elegante
Cuando una tool falla, el agente necesita entender que salio mal y si debe reintentar. Un traceback crudo de Python es inutil para un LLM. Las respuestas de error estructuradas no son opcionales.
import logging
from mcp.server.fastmcp import FastMCP, Context
logger = logging.getLogger(__name__)
class ToolError(Exception):
def __init__(self, message: str, retryable: bool = False):
self.message = message
self.retryable = retryable
super().__init__(message)
@mcp.tool()
async def get_deploy_status(
service_name: str,
environment: str,
ctx: Context,
) -> dict:
"""Get the current deployment status for a service."""
try:
req = DeployStatusRequest(
service_name=service_name,
environment=environment,
)
except ValueError as e:
return {
"error": str(e),
"retryable": False,
"hint": "Check service_name format and environment values",
}
try:
status = await deployment_api.get_status(
req.service_name, req.environment.value
)
except deployment_api.ServiceNotFound:
return {
"error": f"Service '{req.service_name}' not found",
"retryable": False,
"hint": "Use list_services tool to see available services",
}
except deployment_api.APITimeout:
logger.warning(
"Deployment API timeout for %s/%s",
req.service_name,
req.environment.value,
)
await ctx.report_progress(0, 1, "Deployment API is slow, retrying...")
return {
"error": "Deployment API timed out",
"retryable": True,
"hint": "The deployment API is experiencing delays. Try again.",
}
except Exception:
logger.exception("Unexpected error in get_deploy_status")
return {
"error": "Internal server error",
"retryable": False,
"hint": "Contact the platform team if this persists",
}
return {
"service": req.service_name,
"environment": req.environment.value,
"status": status.state,
"version": status.version,
"deployed_at": status.timestamp.isoformat(),
}Tres principios. Primero, retorna objetos de error estructurados con campos error, retryable y hint. El agente puede usar retryable para decidir si reintenta, y hint para guiar su siguiente accion. Segundo, usa el objeto Context para reportar progreso en operaciones largas -- el cliente puede mostrar esto al usuario. Tercero, loguea todo del lado del servidor pero nunca filtres detalles internos (stack traces, strings de conexion, hostnames internos) al agente.
Esto refleja los patrones de circuit breaker y retry de sistemas distribuidos. Si tu API downstream esta caida, el agente debe saber que puede reintentar. Si el input es invalido, reintentar no tiene sentido. Haz esta distincion explicita.
Produccion 3: Autenticacion y Autorizacion
Desde la actualizacion de la especificacion de marzo 2025, OAuth 2.1 es obligatorio para transportes MCP basados en HTTP. Para transportes stdio ejecutandose localmente, el proceso host maneja la seguridad. Pero para cualquier despliegue remoto -- que es lo que significa produccion -- necesitas auth real.
El enfoque practico para la mayoria de equipos es verificar bearer tokens en middleware en lugar de implementar un servidor OAuth completo desde cero:
import os
import jwt
from functools import wraps
from datetime import datetime, timezone
JWT_SECRET = os.environ["MCP_JWT_SECRET"]
ALLOWED_SCOPES = {"deploy:read", "deploy:write", "services:list"}
def verify_token(token: str) -> dict:
"""Verify JWT and return claims. Raises on invalid tokens."""
try:
payload = jwt.decode(
token,
JWT_SECRET,
algorithms=["HS256"],
options={"require": ["exp", "sub", "scopes"]},
)
except jwt.ExpiredSignatureError:
raise ToolError("Token expired", retryable=False)
except jwt.InvalidTokenError as e:
raise ToolError(f"Invalid token: {e}", retryable=False)
return payload
def require_scope(scope: str):
"""Decorator to enforce scope-based authorization on tools."""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
ctx = kwargs.get("ctx")
if ctx is None:
raise ToolError("Missing context", retryable=False)
token = ctx.request_context.get("auth_token", "")
claims = verify_token(token)
user_scopes = set(claims.get("scopes", []))
if scope not in user_scopes:
raise ToolError(
f"Insufficient permissions. Required: {scope}",
retryable=False,
)
kwargs["_claims"] = claims
return await func(*args, **kwargs)
return wrapper
return decorator
@mcp.tool()
@require_scope("deploy:read")
async def get_deploy_status(
service_name: str,
environment: str,
ctx: Context,
_claims: dict = None,
) -> dict:
"""Get deployment status. Requires deploy:read scope."""
# _claims contains the verified JWT payload
logger.info(
"deploy_status_check user=%s service=%s env=%s",
_claims["sub"],
service_name,
environment,
)
# ... rest of implementationDefine permisos de forma granular. Un agente de monitoreo deberia tener deploy:read pero no deploy:write. Un agente de respuesta a incidentes podria necesitar deploy:write para rollbacks pero no services:delete. El OWASP MCP Top 10 lista configuraciones por defecto demasiado permisivas como una vulnerabilidad principal. El principio de minimo privilegio no es opcional.
Produccion 4: Observabilidad
Cuando un agente llama a tu servidor MCP y obtiene un resultado inesperado, necesitas rastrear toda la peticion. Quien la llamo, que parametros envio, que paso downstream, cuanto tardo y que se retorno. Sin esto, debuggear el comportamiento del agente es adivinar.
import time
import uuid
import logging
import structlog
logger = structlog.get_logger()
def with_observability(func):
"""Wrap tool calls with structured logging and timing."""
@wraps(func)
async def wrapper(*args, **kwargs):
request_id = str(uuid.uuid4())[:8]
tool_name = func.__name__
start = time.monotonic()
log = logger.bind(
request_id=request_id,
tool=tool_name,
params={
k: v for k, v in kwargs.items()
if k not in ("ctx", "_claims")
},
)
log.info("tool_invoked")
try:
result = await func(*args, **kwargs)
elapsed = time.monotonic() - start
log.info(
"tool_completed",
duration_ms=round(elapsed * 1000, 2),
has_error="error" in result if isinstance(result, dict) else False,
)
return result
except Exception as e:
elapsed = time.monotonic() - start
log.error(
"tool_failed",
duration_ms=round(elapsed * 1000, 2),
error=str(e),
)
raise
return wrapper
@mcp.tool()
@with_observability
async def get_deploy_status(service_name: str, environment: str, ctx: Context) -> dict:
"""Get the current deployment status for a service."""
# ... implementationFastMCP 3.0 (lanzado en enero de 2026) agrego instrumentacion OpenTelemetry como feature integrado, lo que te da distributed tracing de serie. Si estas corriendo multiples servidores MCP detras de un gateway, los traces de OTel te permiten seguir una peticion desde el agente, a traves del gateway, hasta cada servidor. Usalo.
Incluye limites de rate y hints de latencia en tus respuestas para que los agentes puedan presupuestar sus llamadas. Si tu tool es costosa, dilo en los metadatos de la respuesta. Un framework de agentes bien implementado usara esto para evitar bombardear endpoints lentos.
Produccion 5: Testing
No puedes lanzar un servidor MCP solo con testing manual. La buena noticia es que FastMCP hace que el testing programatico sea directo con su test client integrado:
import pytest
from mcp.server.fastmcp import FastMCP
@pytest.fixture
def mcp_server():
server = FastMCP("test-deploy-tools")
@server.tool()
async def get_deploy_status(service_name: str, environment: str) -> dict:
if service_name == "nonexistent":
return {"error": "Service not found", "retryable": False}
return {
"service": service_name,
"environment": environment,
"status": "running",
"version": "1.2.3",
}
return server
@pytest.mark.anyio
async def test_valid_deploy_status(mcp_server):
async with mcp_server.test_client() as client:
result = await client.call_tool(
"get_deploy_status",
{"service_name": "api-gateway", "environment": "staging"},
)
assert result[0].text
data = json.loads(result[0].text)
assert data["status"] == "running"
assert data["service"] == "api-gateway"
@pytest.mark.anyio
async def test_invalid_service_name(mcp_server):
async with mcp_server.test_client() as client:
result = await client.call_tool(
"get_deploy_status",
{"service_name": "nonexistent", "environment": "staging"},
)
data = json.loads(result[0].text)
assert "error" in data
assert data["retryable"] is False
@pytest.mark.anyio
async def test_tool_listing(mcp_server):
async with mcp_server.test_client() as client:
tools = await client.list_tools()
tool_names = [t.name for t in tools]
assert "get_deploy_status" in tool_namesTestea en tres niveles. Tests unitarios para tu logica de validacion y funciones de negocio. Tests de integracion usando el test client de MCP para verificar el path completo de invocacion de tools. Y testing exploratorio manual con MCP Inspector para verificar que las descripciones de tools sean lo suficientemente claras para que los agentes las usen correctamente. Si un agente usa mal tu tool consistentemente, el problema generalmente esta en tu descripcion del tool, no en el agente.
Produccion 6: Rate Limiting y Gestion de Recursos
Los servidores MCP que envuelven APIs externas heredan los rate limits de esas APIs. Un agente entusiasta puede agotar tu cuota de API en minutos si no pones limites:
import time
from collections import defaultdict
class RateLimiter:
def __init__(self, max_calls: int, window_seconds: int):
self.max_calls = max_calls
self.window = window_seconds
self.calls: dict[str, list[float]] = defaultdict(list)
def check(self, key: str) -> bool:
now = time.monotonic()
window_start = now - self.window
self.calls[key] = [t for t in self.calls[key] if t > window_start]
if len(self.calls[key]) >= self.max_calls:
return False
self.calls[key].append(now)
return True
rate_limiter = RateLimiter(max_calls=30, window_seconds=60)
@mcp.tool()
async def get_deploy_status(service_name: str, environment: str, ctx: Context) -> dict:
"""Get deployment status. Rate limited to 30 calls per minute."""
caller = ctx.request_context.get("client_id", "anonymous")
if not rate_limiter.check(caller):
return {
"error": "Rate limit exceeded. Max 30 calls per minute.",
"retryable": True,
"hint": "Wait before retrying. Consider batching requests.",
"retry_after_seconds": 60,
}
# ... rest of implementationIncluye informacion de rate limit en las descripciones de tus tools para que el agente conozca las restricciones antes de encontrarselas. Retorna retry_after_seconds para que el agente pueda hacer backoff de forma inteligente. Es el mismo patron que usarias en cualquier API publica, y es aun mas importante aqui porque los agentes no tienen la intuicion para auto-limitarse.
Despliegue: Transport y Arquitectura
Para desarrollo local y configuraciones de un solo usuario, el transport stdio es la opcion mas simple. El cliente MCP lanza tu servidor como un subproceso. Sin networking, sin complejidad de auth.
Para despliegues de produccion multi-usuario, usa Streamable HTTP. El transport SSE anterior fue deprecado en la especificacion de junio 2025 a favor de Streamable HTTP, que soporta comunicacion bidireccional, escalado horizontal y resultados incrementales:
from mcp.server.fastmcp import FastMCP
mcp = FastMCP(
"deploy-tools",
host="0.0.0.0",
port=8080,
)
# Register all your tools, resources, prompts...
if __name__ == "__main__":
mcp.run(transport="streamable-http")Para produccion, pon tu servidor MCP detras de un reverse proxy (nginx, Caddy) que maneje TLS termination y gestion de certificados. El servidor en si corre HTTP plano internamente; el proxy agrega la capa de cifrado. Es el mismo patron que usas para cualquier servicio backend.
Si estas corriendo multiples servidores MCP, considera un MCP gateway que maneje routing, auth y observabilidad de forma centralizada. WorkOS y varios proyectos open source estan construyendo capas de gateway para exactamente este patron. Refleja el patron de API gateway de microservicios -- preocupaciones transversales centralizadas con logica de negocio descentralizada.
Lo Que He Aprendido Hasta Ahora
Construir servidores MCP para produccion ha reforzado patrones que ya conocia de ingenieria backend, con algunas adiciones especificas de MCP:
-
Trata los servidores MCP como microservicios. Necesitan la misma disciplina de auth, validacion, manejo de errores, logging y testing que cualquier servicio en tu arquitectura. El hecho de que el caller sea un agente de IA no reduce estos requisitos -- los incrementa.
-
Las descripciones de tools son tu documentacion de API. Los agentes deciden si llamar y como usar tus tools basandose en las descripciones que escribes. Descripciones vagas llevan a mal uso. Incluye restricciones de parametros, formatos de valores de retorno y condiciones de error directamente en el docstring.
-
Valida agresivamente. El OWASP MCP Top 10 existe por una razon. Los inputs de agentes vienen de usuarios, y los usuarios pueden ser adversarios. Modelos Pydantic, restricciones de enum y patrones regex son tu primera linea de defensa.
-
Retorna errores estructurados. Un dict de error con campos
error,retryableyhintle da al agente la informacion que necesita para recuperarse o fallar de forma elegante. Un traceback de Python no le da nada util. -
Invierte en observabilidad temprano. Cuando un agente se comporta mal, necesitas ver exactamente que envio y que retorno tu servidor. Logging estructurado con request IDs y timing es el minimo. Tracing con OpenTelemetry es la meta.
-
Empieza con
stdio, despliega con Streamable HTTP. Desarrolla y testea localmente con transport stdio. Cuando necesites acceso remoto, soporte multi-usuario o escalado horizontal, cambia a Streamable HTTP detras de un reverse proxy con TLS.
El roadmap de MCP 2026 prioriza exactamente las preocupaciones enterprise que he descrito aqui: audit trails, auth integrada con SSO, comportamiento de gateway y portabilidad de configuracion. El protocolo esta madurando rapido. Los servidores construidos sobre el necesitan madurar al mismo ritmo.
MCP no es tecnologia complicada. Es una forma estandarizada de exponer tu infraestructura existente a agentes de IA. Los desafios de produccion son los mismos que los ingenieros backend hemos estado resolviendo por decadas. Aplica lo que ya sabes.