Ir al contenido

Construyendo un Pipeline de Embeddings en Producción con MongoDB Atlas y Voyage AI

Generar embeddings es la parte fácil. Mantenerlos sincronizados a medida que tus datos cambian — a escala, sin tiempo de inactividad — es donde reside la verdadera ingeniería. Aquí te explicamos cómo construir el pipeline completo en MongoDB Atlas.

Durval Pereira
Durval Pereira
11 min

El problema del pipeline

Si has leído sobre vector search, conoces la propuesta: convierte tus datos en embeddings, indexa y, de repente, tu búsqueda entiende el significado en lugar de solo coincidir con palabras clave. El concepto es simple. La implementación en producción no lo es.

La brecha entre una demo funcional y un pipeline de producción se reduce a tres preguntas que la mayoría de los tutoriales omiten:

  1. ¿Cómo generas embeddings para una colección existente con millones de documentos? No puedes enviarlos todos a la vez.
  2. ¿Cómo mantienes los embeddings sincronizados cuando los documentos cambian? Los embeddings obsoletos son peores que no tener embeddings — devuelven resultados erróneos con confianza.
  3. ¿Cómo manejas los rate limits, fallos y reintentos sin perder datos?

Este artículo detalla un pipeline de embeddings completo, de grado de producción, construido en MongoDB Atlas con Voyage AI — desde la generación inicial por lotes hasta la sincronización automatizada en tiempo real a través de Atlas Triggers.

Eligiendo un modelo de embedding

Voyage AI ofrece una serie de modelos de embedding optimizados para diferentes compensaciones. MongoDB Atlas se integra de forma nativa con Voyage AI, lo que significa que la facturación se gestiona a través de tu cuenta de Atlas y no hay una relación de proveedor separada que administrar.

Los modelos de la serie 4 son la generación actual:

ModeloLongitud de ContextoDimensiones PredeterminadasMejor Para
voyage-4-large32,000 tokens1024Mayor calidad de recuperación, multilingüe
voyage-432,000 tokens1024Equilibrio entre calidad y rendimiento
voyage-4-lite32,000 tokens1024Menor latencia y costo

Todos los modelos de la serie 4 admiten dimensiones de salida flexibles — 256, 512, 1024 o 2048 — y los embeddings generados por cualquier modelo de la serie 4 son compatibles entre sí. Esto significa que puedes empezar con voyage-4-lite durante el desarrollo y cambiar a voyage-4-large en producción sin necesidad de reindexar.

También hay modelos específicos de dominio que vale la pena considerar:

  • voyage-code-3 para código y documentación técnica
  • voyage-finance-2 para documentos financieros
  • voyage-law-2 para texto legal

Para la mayoría de los casos de uso general, voyage-4-large ofrece la mejor calidad de recuperación, y voyage-4 es el valor predeterminado pragmático.

Paso 1: Generación de embeddings por lotes

El primer paso es generar embeddings para tus documentos existentes. Esta es una operación única (con la sincronización continua gestionada por separado), y debe ser eficiente — no querrás hacer una llamada API por documento cuando tienes cientos de miles de registros.

import voyageai
from pymongo import MongoClient, UpdateOne

MONGO_URI = "mongodb+srv://<user>:<password>@<cluster-url>/"
VOYAGE_API_KEY = "<your-voyage-api-key>"
DATABASE = "myapp"
COLLECTION = "articles"

MODEL = "voyage-4-large"
DIMENSIONS = 1024
BATCH_SIZE = 128

client = MongoClient(MONGO_URI)
collection = client[DATABASE][COLLECTION]
vo = voyageai.Client(api_key=VOYAGE_API_KEY)

docs = list(collection.find({}, {"_id": 1, "content": 1}))
texts = [doc["content"] for doc in docs]
print(f"Retrieved {len(docs)} documents for embedding generation.")

El detalle clave es el procesamiento por lotes. Voyage AI admite solicitudes por lotes, y procesar 128 documentos a la vez es drásticamente más eficiente que las llamadas individuales:

all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
    batch = texts[i : i + BATCH_SIZE]
    result = vo.embed(
        batch,
        model=MODEL,
        input_type="document",
        output_dimension=DIMENSIONS
    )
    all_embeddings.extend(result.embeddings)
    print(f"  Processed {min(i + BATCH_SIZE, len(texts))}/{len(texts)} documents...")

print(f"Generated {len(all_embeddings)} embeddings of dimension {len(all_embeddings[0])}.")

Observa input_type="document". Esto es crítico. Voyage AI optimiza los embeddings de manera diferente según si la entrada es un documento almacenado o una consulta de búsqueda. Internamente, el modelo antepone contexto como "Represent the document for retrieval:" a la entrada. Utiliza siempre "document" para datos almacenados y "query" para consultas de búsqueda — omitir este parámetro degrada la calidad de la recuperación.

Escribiendo embeddings de vuelta a MongoDB

Utiliza escrituras masivas (bulk writes) para almacenar embeddings de manera eficiente:

bulk_ops = [
    UpdateOne(
        {"_id": doc["_id"]},
        {"$set": {"embedding": emb}}
    )
    for doc, emb in zip(docs, all_embeddings)
]

result = collection.bulk_write(bulk_ops)
print(f"Updated {result.modified_count} documents with embeddings.")

Una operación bulk_write con operaciones UpdateOne es órdenes de magnitud más rápida que las llamadas individuales a update_one. Para una colección con 500,000 documentos, la diferencia es de minutos frente a horas.

Con los embeddings almacenados, necesitas un índice de Atlas Vector Search para consultarlos. Créalo desde la UI de Atlas o a través de la CLI de Atlas:

{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1024,
      "similarity": "cosine"
    }
  ]
}

Algunas cosas a tener en cuenta:

  • numDimensions debe coincidir exactamente. Si generaste embeddings de 1024 dimensiones, el índice debe especificar 1024. Una discrepancia hará que las consultas fallen silenciosamente o devuelvan resultados basura.
  • cosine es la elección predeterminada para los embeddings de Voyage AI. Otras opciones son euclidean y dotProduct, pero cosine se recomienda para la mayoría de los casos de uso de embeddings de texto.
  • El índice debe alcanzar el estado Active antes de que las consultas $vectorSearch funcionen. Esto puede tardar unos minutos para colecciones grandes.

Ejecutando una consulta de búsqueda

query = "How does MongoDB handle distributed transactions?"

query_embedding = vo.embed(
    [query],
    model=MODEL,
    input_type="query",
    output_dimension=DIMENSIONS
).embeddings[0]

results = collection.aggregate([
    {
        "$vectorSearch": {
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5
        }
    },
    {
        "$project": {
            "content": 1,
            "score": {"$meta": "vectorSearchScore"},
            "_id": 0
        }
    }
])

for doc in results:
    print(f"Score: {doc['score']:.4f} | {doc['content'][:100]}...")

El parámetro numCandidates controla el equilibrio entre precisión y rendimiento. Valores más altos (150-200) ofrecen resultados más precisos a costa de una latencia ligeramente mayor. Para la mayoría de las aplicaciones, 100-150 es el punto óptimo.

Paso 3: Manteniendo los embeddings sincronizados

Aquí es donde la mayoría de los tutoriales terminan y la mayoría de los problemas de producción comienzan. Tus datos cambian — se insertan, actualizan, eliminan documentos. Si los embeddings no se regeneran cuando el texto fuente cambia, tu búsqueda se desincroniza silenciosamente.

Los Atlas Triggers de MongoDB resuelven esto con dos estrategias complementarias:

EstrategiaTipo de TriggerMejor Para
Tiempo realDatabase Trigger (Change Stream)Inserciones y actualizaciones individuales a medida que ocurren
Lote programadoScheduled Trigger (Cron)Reprocesamiento masivo durante horas de menor actividad
HíbridoAmbosRespuesta inmediata + puesta al día periódica

Opción A: Database Trigger en tiempo real

Un Database Trigger se activa en cada inserción o actualización de la colección. Detecta el cambio a través de Change Streams y regenera el embedding inmediatamente.

Configuración en Atlas:

ParámetroValor
Tipo de TriggerDatabase
Tipos de OperaciónInsert, Update
Documento CompletoHabilitado

El detalle crítico es evitar bucles infinitos. Cuando el trigger actualiza el campo embedding, esa actualización a su vez activaría la función de nuevo. La solución es un filtro de coincidencia que solo se activa cuando cambia el campo de texto fuente:

{
  "updateDescription.updatedFields.content": { "$exists": true }
}

Este filtro se aplica solo a las operaciones de Update — las operaciones de Insert siempre se activan. Ahora la función del trigger:

exports = async function(changeEvent) {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const doc = changeEvent.fullDocument;

  if (!doc || !doc.content) {
    console.log("Document missing 'content' field. Skipping.");
    return;
  }

  if (changeEvent.operationType === "update") {
    const updatedFields = changeEvent.updateDescription.updatedFields;
    if (!updatedFields || !updatedFields.hasOwnProperty("content")) {
      console.log("Source field not modified. Skipping.");
      return;
    }
  }

  try {
    const response = await context.http.post({
      url: API_URL,
      headers: {
        "Authorization": [`Bearer ${MODEL_API_KEY}`],
        "Content-Type": ["application/json"]
      },
      body: JSON.stringify({
        input: [doc.content],
        model: MODEL,
        input_type: "document",
        output_dimension: DIMENSIONS
      })
    });

    if (response.statusCode !== 200) {
      console.error(`API returned HTTP ${response.statusCode}`);
      return;
    }

    const parsed = JSON.parse(response.body.text());
    if (!parsed.data || parsed.data.length === 0) {
      console.error("API returned no embeddings.");
      return;
    }

    const collection = context.services
      .get("mongodb-atlas")
      .db(DB_NAME)
      .collection(COLL_NAME);

    await collection.updateOne(
      { _id: doc._id },
      { $set: { embedding: parsed.data[0].embedding } }
    );

    console.log(`Embedding updated for document: ${doc._id}`);
  } catch (error) {
    console.error(`Failed to generate embedding for ${doc._id}: ${error}`);
  }
};

Opción B: Scheduled batch trigger

Para cargas de trabajo con altos volúmenes de escritura, un trigger en tiempo real por documento puede no ser práctico. Un Scheduled Trigger se ejecuta en un cron schedule — cada hora, cada noche, cada 30 minutos — y reprocesa todos los documentos que necesitan nuevos embeddings.

El enfoque: marcar los documentos que necesitan reprocesamiento y luego embeberlos por lotes:

exports = async function() {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const BATCH_SIZE = 128;
  const DOC_LIMIT = 1000;
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const collection = context.services
    .get("mongodb-atlas")
    .db(DB_NAME)
    .collection(COLL_NAME);

  const docs = await collection.find({
    $or: [
      { embedding: { $exists: false } },
      { embeddingStale: true }
    ]
  }).limit(DOC_LIMIT).toArray();

  if (docs.length === 0) {
    console.log("No documents need embedding updates.");
    return;
  }

  console.log(`Found ${docs.length} documents to process.`);
  let totalUpdated = 0;

  for (let i = 0; i < docs.length; i += BATCH_SIZE) {
    const batch = docs.slice(i, i + BATCH_SIZE);
    const texts = batch.map(d => d.content).filter(Boolean);
    if (texts.length === 0) continue;

    try {
      const response = await context.http.post({
        url: API_URL,
        headers: {
          "Authorization": [`Bearer ${MODEL_API_KEY}`],
          "Content-Type": ["application/json"]
        },
        body: JSON.stringify({
          input: texts,
          model: MODEL,
          input_type: "document",
          output_dimension: DIMENSIONS
        })
      });

      if (response.statusCode === 429) {
        console.warn(`Rate limit hit at batch ${Math.floor(i / BATCH_SIZE) + 1}. Stopping.`);
        break;
      }

      if (response.statusCode !== 200) {
        console.error(`HTTP ${response.statusCode} at batch ${Math.floor(i / BATCH_SIZE) + 1}`);
        continue;
      }

      const parsed = JSON.parse(response.body.text());
      if (!parsed.data || parsed.data.length === 0) continue;

      const ops = [];
      for (let j = 0; j < batch.length; j++) {
        if (parsed.data[j]) {
          ops.push({
            updateOne: {
              filter: { _id: batch[j]._id },
              update: {
                $set: {
                  embedding: parsed.data[j].embedding,
                  embeddingUpdatedAt: new Date()
                },
                $unset: { embeddingStale: "" }
              }
            }
          });
        }
      }

      if (ops.length > 0) {
        const result = await collection.bulkWrite(ops);
        totalUpdated += result.modifiedCount;
      }
    } catch (error) {
      console.error(`Error in batch ${Math.floor(i / BATCH_SIZE) + 1}: ${error}`);
    }
  }

  console.log(`Done. Updated ${totalUpdated} documents.`);
};

Cron schedules comunes:

ExpresiónFrecuencia
0 */1 * * *Cada hora
0 2 * * *Diariamente a las 2:00 AM
*/30 * * * *Cada 30 minutos
0 0 * * *Diariamente a medianoche

El límite DOC_LIMIT asegura que cada ejecución procese un número acotado de documentos. Si hay más de 1,000 documentos obsoletos, la siguiente ejecución programada recogerá el resto. Esto evita que una sola ejecución exceda el tiempo de espera o los rate limits.

Rate limits y consideraciones operativas

Voyage AI impone rate limits que debes considerar en tu diseño:

ModeloTokens por MinutoSolicitudes por Minuto
voyage-4-large3,000,0002,000
voyage-48,000,0002,000
voyage-4-lite16,000,0002,000

Estos son límites de Nivel 1 — se duplican en el Nivel 2 y se triplican en el Nivel 3 a medida que tu uso crece. Si tu trigger alcanza un 429 (Rate Limit Exceeded), el enfoque de lote programado lo maneja con elegancia deteniéndose y reintentando en la siguiente ejecución programada. El trigger en tiempo real debe registrar el error y depender del trigger programado como mecanismo de puesta al día.

Algunas de las mejores prácticas operativas:

  • Especifica siempre input_type. Omitirlo funciona, pero produce embeddings de menor calidad. Usa "document" para datos almacenados, "query" para consultas de búsqueda.
  • Monitoriza la obsolescencia de los embeddings. Haz un seguimiento de la proporción de documentos con embeddingStale: true frente al total de documentos. Si crece más rápido de lo que tu trigger programado puede procesar, aumenta la frecuencia o el tamaño del lote.
  • Usa la estrategia híbrida. Los triggers en tiempo real manejan el caso común (inserciones/actualizaciones individuales). Los triggers programados manejan los casos excepcionales (importaciones masivas, actualizaciones perdidas, recuperación de fallos). Juntos, proporcionan tanto frescura como fiabilidad.

La arquitectura completa

Juntándolo todo, el pipeline de producción se ve así:

  1. Carga inicial: Un script de Python por lotes genera embeddings para todos los documentos existentes y los escribe de vuelta a través de bulk_write.
  2. Creación del índice: Índice de Atlas Vector Search en el campo embedding con similitud de coseno.
  3. Sincronización en tiempo real: Un Database Trigger regenera los embeddings en inserciones/actualizaciones, filtrado para evitar bucles infinitos.
  4. Puesta al día programada: Un Scheduled Trigger reprocesa cualquier documento marcado como obsoleto o con embeddings faltantes.
  5. Tiempo de consulta: Agregación $vectorSearch con input_type="query" para el término de búsqueda del usuario.

Sin infraestructura externa. Sin una base de datos vectorial separada. Sin trabajos de sincronización entre sistemas. Todo el pipeline se ejecuta en MongoDB Atlas — embeddings, índices, triggers y consultas — lo que significa una única plataforma para monitorizar, un único conjunto de credenciales para gestionar y una única relación de facturación.

Esa es la parte que más importa en producción. No si la demo funciona, sino si el sistema se mantiene correcto a las 3 AM cuando se ejecuta un trabajo de importación masiva y 50,000 documentos necesitan nuevos embeddings.


Este artículo es un complemento de Por qué tu búsqueda no devuelve nada, que cubre el caso conceptual de vector search sobre la coincidencia de palabras clave.

Etiquetasmongodbvector-searchembeddingsvoyage-aiatlas-triggers