Pular para o conteúdo

Construindo um Pipeline de Embeddings em Produção com MongoDB Atlas e Voyage AI

Gerar embeddings é a parte fácil. Mantê-los sincronizados à medida que seus dados mudam — em escala, sem tempo de inatividade — é onde a verdadeira engenharia reside. Veja como construir o pipeline completo no MongoDB Atlas.

Durval Pereira
Durval Pereira
11 min

O problema do pipeline

Se você já leu sobre vector search, conhece a proposta: converta seus dados em embeddings, indexe-os e, de repente, sua busca entende o significado em vez de apenas corresponder a palavras-chave. O conceito é simples. A implementação em produção, não.

A lacuna entre uma demo funcional e um pipeline de produção se resume a três perguntas que a maioria dos tutoriais ignora:

  1. Como você gera embeddings para uma coleção existente com milhões de documentos? Você não pode enviá-los todos de uma vez.
  2. Como você mantém os embeddings sincronizados quando os documentos mudam? Embeddings desatualizados são piores do que não ter embeddings — eles retornam resultados confiantemente errados.
  3. Como você lida com rate limits, falhas e retentativas sem perder dados?

Este artigo detalha um pipeline de embeddings completo, de nível de produção, construído no MongoDB Atlas com Voyage AI — desde a geração inicial em batch até a sincronização automatizada em tempo real via Atlas Triggers.

Escolhendo um modelo de embedding

A Voyage AI oferece uma série de modelos de embedding otimizados para diferentes trade-offs. O MongoDB Atlas se integra nativamente com a Voyage AI, o que significa que a cobrança é feita através da sua conta Atlas e não há um relacionamento separado com o fornecedor para gerenciar.

Os modelos da série 4 são a geração atual:

ModelContext LengthDefault DimensionsBest For
voyage-4-large32,000 tokens1024Maior qualidade de recuperação, multilíngue
voyage-432,000 tokens1024Equilíbrio entre qualidade e throughput
voyage-4-lite32,000 tokens1024Menor latência e custo

Todos os modelos da série 4 suportam dimensões de saída flexíveis — 256, 512, 1024 ou 2048 — e os embeddings gerados por qualquer modelo da série 4 são compatíveis entre si. Isso significa que você pode começar com voyage-4-lite durante o desenvolvimento e mudar para voyage-4-large em produção sem reindexar.

Existem também modelos específicos de domínio que valem a pena considerar:

  • voyage-code-3 para código e documentação técnica
  • voyage-finance-2 para documentos financeiros
  • voyage-law-2 para texto jurídico

Para a maioria dos casos de uso de propósito geral, voyage-4-large oferece a melhor qualidade de recuperação, e voyage-4 é o padrão pragmático.

Passo 1: Geração de embeddings em batch

O primeiro passo é gerar embeddings para seus documentos existentes. Esta é uma operação única (com a sincronização contínua sendo tratada separadamente), e precisa ser eficiente — você não quer fazer uma chamada de API por documento quando tem centenas de milhares de registros.

import voyageai
from pymongo import MongoClient, UpdateOne

MONGO_URI = "mongodb+srv://<user>:<password>@<cluster-url>/"
VOYAGE_API_KEY = "<your-voyage-api-key>"
DATABASE = "myapp"
COLLECTION = "articles"

MODEL = "voyage-4-large"
DIMENSIONS = 1024
BATCH_SIZE = 128

client = MongoClient(MONGO_URI)
collection = client[DATABASE][COLLECTION]
vo = voyageai.Client(api_key=VOYAGE_API_KEY)

docs = list(collection.find({}, {"_id": 1, "content": 1}))
texts = [doc["content"] for doc in docs]
print(f"Retrieved {len(docs)} documents for embedding generation.")

O detalhe chave é o batching. A Voyage AI suporta requisições em batch, e processar 128 documentos por vez é dramaticamente mais eficiente do que chamadas um-a-um:

all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
    batch = texts[i : i + BATCH_SIZE]
    result = vo.embed(
        batch,
        model=MODEL,
        input_type="document",
        output_dimension=DIMENSIONS
    )
    all_embeddings.extend(result.embeddings)
    print(f"  Processed {min(i + BATCH_SIZE, len(texts))}/{len(texts)} documents...")

print(f"Generated {len(all_embeddings)} embeddings of dimension {len(all_embeddings[0])}.")

Observe input_type="document". Isso é crítico. A Voyage AI otimiza os embeddings de forma diferente dependendo se a entrada é um documento armazenado ou uma query de busca. Internamente, o modelo adiciona contexto como "Represent the document for retrieval:" à entrada. Sempre use "document" para dados armazenados e "query" para queries de busca — omitir este parâmetro degrada a qualidade da recuperação.

Escrevendo embeddings de volta para o MongoDB

Use bulk writes para armazenar embeddings de forma eficiente:

bulk_ops = [
    UpdateOne(
        {"_id": doc["_id"]},
        {"$set": {"embedding": emb}}
    )
    for doc, emb in zip(docs, all_embeddings)
]

result = collection.bulk_write(bulk_ops)
print(f"Updated {result.modified_count} documents with embeddings.")

Um bulk_write com operações UpdateOne é ordens de magnitude mais rápido do que chamadas individuais update_one. Para uma coleção com 500.000 documentos, a diferença é de minutos versus horas.

Com os embeddings armazenados, você precisa de um Atlas Vector Search index para consultá-los. Crie-o a partir da UI do Atlas ou via Atlas CLI:

{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1024,
      "similarity": "cosine"
    }
  ]
}

Alguns pontos importantes a acertar:

  • numDimensions deve corresponder exatamente. Se você gerou embeddings de 1024 dimensões, o índice deve especificar 1024. Uma incompatibilidade fará com que as queries falhem silenciosamente ou retornem resultados incorretos.
  • cosine é a escolha padrão para embeddings da Voyage AI. Outras opções são euclidean e dotProduct, mas cosine é recomendado para a maioria dos casos de uso de text embedding.
  • O índice deve atingir o status Active antes que as queries $vectorSearch funcionem. Isso pode levar alguns minutos para coleções grandes.

Executando uma query de busca

query = "How does MongoDB handle distributed transactions?"

query_embedding = vo.embed(
    [query],
    model=MODEL,
    input_type="query",
    output_dimension=DIMENSIONS
).embeddings[0]

results = collection.aggregate([
    {
        "$vectorSearch": {
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5
        }
    },
    {
        "$project": {
            "content": 1,
            "score": {"$meta": "vectorSearchScore"},
            "_id": 0
        }
    }
])

for doc in results:
    print(f"Score: {doc['score']:.4f} | {doc['content'][:100]}...")

O parâmetro numCandidates controla o trade-off entre precisão e desempenho. Valores mais altos (150-200) fornecem resultados mais precisos ao custo de uma latência ligeiramente maior. Para a maioria das aplicações, 100-150 é o ponto ideal.

Passo 3: Mantendo os embeddings sincronizados

É aqui que a maioria dos tutoriais termina e a maioria dos problemas de produção começa. Seus dados mudam — documentos são inseridos, atualizados, excluídos. Se os embeddings não forem regenerados quando o texto fonte muda, sua busca silenciosamente sai de sincronia.

Os MongoDB Atlas Triggers resolvem isso com duas estratégias complementares:

StrategyTrigger TypeBest For
Real-timeDatabase Trigger (Change Stream)Inserções e atualizações individuais à medida que acontecem
Scheduled batchScheduled Trigger (Cron)Reprocessamento em massa durante horas de menor movimento
HybridBothResposta imediata + atualização periódica

Opção A: Real-time Database Trigger

Um Database Trigger é acionado em cada inserção ou atualização na coleção. Ele detecta a mudança via Change Streams e regenera o embedding imediatamente.

Configuração no Atlas:

ParameterValue
Trigger TypeDatabase
Operation TypesInsert, Update
Full DocumentHabilitado

O detalhe crítico é evitar loops infinitos. Quando o trigger atualiza o campo embedding, essa atualização por si só acionaria a função novamente. A solução é um filtro de correspondência que só é acionado quando o campo de texto fonte muda:

{
  "updateDescription.updatedFields.content": { "$exists": true }
}

Este filtro se aplica apenas a operações de Update — operações de Insert sempre são acionadas. Agora, a função do trigger:

exports = async function(changeEvent) {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const doc = changeEvent.fullDocument;

  if (!doc || !doc.content) {
    console.log("Document missing 'content' field. Skipping.");
    return;
  }

  if (changeEvent.operationType === "update") {
    const updatedFields = changeEvent.updateDescription.updatedFields;
    if (!updatedFields || !updatedFields.hasOwnProperty("content")) {
      console.log("Source field not modified. Skipping.");
      return;
    }
  }

  try {
    const response = await context.http.post({
      url: API_URL,
      headers: {
        "Authorization": [`Bearer ${MODEL_API_KEY}`],
        "Content-Type": ["application/json"]
      },
      body: JSON.stringify({
        input: [doc.content],
        model: MODEL,
        input_type: "document",
        output_dimension: DIMENSIONS
      })
    });

    if (response.statusCode !== 200) {
      console.error(`API returned HTTP ${response.statusCode}`);
      return;
    }

    const parsed = JSON.parse(response.body.text());
    if (!parsed.data || parsed.data.length === 0) {
      console.error("API returned no embeddings.");
      return;
    }

    const collection = context.services
      .get("mongodb-atlas")
      .db(DB_NAME)
      .collection(COLL_NAME);

    await collection.updateOne(
      { _id: doc._id },
      { $set: { embedding: parsed.data[0].embedding } }
    );

    console.log(`Embedding updated for document: ${doc._id}`);
  } catch (error) {
    console.error(`Failed to generate embedding for ${doc._id}: ${error}`);
  }
};

Opção B: Scheduled batch trigger

Para cargas de trabalho com alto volume de escrita, um trigger em tempo real por documento pode não ser prático. Um Scheduled Trigger é executado em um cron schedule — a cada hora, diariamente à noite, a cada 30 minutos — e reprocessa todos os documentos que precisam de novos embeddings.

A abordagem: sinalizar documentos que precisam de reprocessamento e, em seguida, incorporá-los em batch:

exports = async function() {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const BATCH_SIZE = 128;
  const DOC_LIMIT = 1000;
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const collection = context.services
    .get("mongodb-atlas")
    .db(DB_NAME)
    .collection(COLL_NAME);

  const docs = await collection.find({
    $or: [
      { embedding: { $exists: false } },
      { embeddingStale: true }
    ]
  }).limit(DOC_LIMIT).toArray();

  if (docs.length === 0) {
    console.log("No documents need embedding updates.");
    return;
  }

  console.log(`Found ${docs.length} documents to process.`);
  let totalUpdated = 0;

  for (let i = 0; i < docs.length; i += BATCH_SIZE) {
    const batch = docs.slice(i, i + BATCH_SIZE);
    const texts = batch.map(d => d.content).filter(Boolean);
    if (texts.length === 0) continue;

    try {
      const response = await context.http.post({
        url: API_URL,
        headers: {
          "Authorization": [`Bearer ${MODEL_API_KEY}`],
          "Content-Type": ["application/json"]
        },
        body: JSON.stringify({
          input: texts,
          model: MODEL,
          input_type: "document",
          output_dimension: DIMENSIONS
        })
      });

      if (response.statusCode === 429) {
        console.warn(`Rate limit hit at batch ${Math.floor(i / BATCH_SIZE) + 1}. Stopping.`);
        break;
      }

      if (response.statusCode !== 200) {
        console.error(`HTTP ${response.statusCode} at batch ${Math.floor(i / BATCH_SIZE) + 1}`);
        continue;
      }

      const parsed = JSON.parse(response.body.text());
      if (!parsed.data || parsed.data.length === 0) continue;

      const ops = [];
      for (let j = 0; j < batch.length; j++) {
        if (parsed.data[j]) {
          ops.push({
            updateOne: {
              filter: { _id: batch[j]._id },
              update: {
                $set: {
                  embedding: parsed.data[j].embedding,
                  embeddingUpdatedAt: new Date()
                },
                $unset: { embeddingStale: "" }
              }
            }
          });
        }
      }

      if (ops.length > 0) {
        const result = await collection.bulkWrite(ops);
        totalUpdated += result.modifiedCount;
      }
    } catch (error) {
      console.error(`Error in batch ${Math.floor(i / BATCH_SIZE) + 1}: ${error}`);
    }
  }

  console.log(`Done. Updated ${totalUpdated} documents.`);
};

Cron schedules comuns:

ExpressionFrequency
0 */1 * * *A cada hora
0 2 * * *Diariamente às 2:00 AM
*/30 * * * *A cada 30 minutos
0 0 * * *Diariamente à meia-noite

O limite DOC_LIMIT garante que cada execução processe um número limitado de documentos. Se houver mais de 1.000 documentos desatualizados, a próxima execução agendada processará o restante. Isso evita que uma única execução exceda o tempo limite ou os rate limits.

Rate limits e preocupações operacionais

A Voyage AI impõe rate limits que você precisa considerar no design:

ModelTokens Per MinuteRequests Per Minute
voyage-4-large3,000,0002,000
voyage-48,000,0002,000
voyage-4-lite16,000,0002,000

Estes são os limites do Tier 1 — eles dobram no Tier 2 e triplicam no Tier 3 à medida que seu uso cresce. Se seu trigger atingir um 429 (Rate Limit Exceeded), a abordagem de batch agendado lida com isso de forma elegante, parando e tentando novamente na próxima execução agendada. O trigger em tempo real deve registrar o erro e depender do trigger agendado como um mecanismo de recuperação.

Algumas boas práticas operacionais:

  • Sempre especifique input_type. Omiti-lo funciona, mas produz embeddings de menor qualidade. Use "document" para dados armazenados, "query" para queries de busca.
  • Monitore a desatualização dos embeddings. Acompanhe a proporção de documentos com embeddingStale: true versus o total de documentos. Se crescer mais rápido do que seu trigger agendado pode processar, aumente a frequência ou o tamanho do batch.
  • Use a estratégia híbrida. Triggers em tempo real lidam com o caso comum (inserções/atualizações individuais). Triggers agendados lidam com os casos de borda (importações em massa, atualizações perdidas, recuperação de falhas). Juntos, eles fornecem tanto frescor quanto confiabilidade.

A arquitetura completa

Juntando tudo, o pipeline de produção se parece com isto:

  1. Carga inicial: Script Python batch gera embeddings para todos os documentos existentes e os escreve de volta via bulk_write.
  2. Criação do índice: Atlas Vector Search index no campo embedding com similaridade de cosseno.
  3. Sincronização em tempo real: Database Trigger regenera embeddings na inserção/atualização, filtrado para evitar loops infinitos.
  4. Atualização agendada: Scheduled Trigger reprocessa quaisquer documentos sinalizados como desatualizados ou com embeddings ausentes.
  5. Tempo de query: Agregação $vectorSearch com input_type="query" para o termo de busca do usuário.

Nenhuma infraestrutura externa. Nenhum banco de dados vetorial separado. Nenhum job de sincronização entre sistemas. Todo o pipeline é executado no MongoDB Atlas — embeddings, índices, triggers e queries — o que significa uma única plataforma para monitorar, um único conjunto de credenciais para gerenciar e um único relacionamento de faturamento.

Essa é a parte que mais importa em produção. Não se a demo funciona, mas se o sistema permanece correto às 3 da manhã quando um job de importação em massa é executado e 50.000 documentos precisam de novos embeddings.


Este artigo é um complemento a Por Que Sua Busca Não Retorna Nada, que aborda o caso conceitual para vector search em vez de correspondência de palavras-chave.

Tagsmongodbvector-searchembeddingsvoyage-aiatlas-triggers