Construindo um Pipeline de Embeddings em Produção com MongoDB Atlas e Voyage AI
Gerar embeddings é a parte fácil. Mantê-los sincronizados à medida que seus dados mudam — em escala, sem tempo de inatividade — é onde a verdadeira engenharia reside. Veja como construir o pipeline completo no MongoDB Atlas.
O problema do pipeline
Se você já leu sobre vector search, conhece a proposta: converta seus dados em embeddings, indexe-os e, de repente, sua busca entende o significado em vez de apenas corresponder a palavras-chave. O conceito é simples. A implementação em produção, não.
A lacuna entre uma demo funcional e um pipeline de produção se resume a três perguntas que a maioria dos tutoriais ignora:
- Como você gera embeddings para uma coleção existente com milhões de documentos? Você não pode enviá-los todos de uma vez.
- Como você mantém os embeddings sincronizados quando os documentos mudam? Embeddings desatualizados são piores do que não ter embeddings — eles retornam resultados confiantemente errados.
- Como você lida com rate limits, falhas e retentativas sem perder dados?
Este artigo detalha um pipeline de embeddings completo, de nível de produção, construído no MongoDB Atlas com Voyage AI — desde a geração inicial em batch até a sincronização automatizada em tempo real via Atlas Triggers.
Escolhendo um modelo de embedding
A Voyage AI oferece uma série de modelos de embedding otimizados para diferentes trade-offs. O MongoDB Atlas se integra nativamente com a Voyage AI, o que significa que a cobrança é feita através da sua conta Atlas e não há um relacionamento separado com o fornecedor para gerenciar.
Os modelos da série 4 são a geração atual:
| Model | Context Length | Default Dimensions | Best For |
|---|---|---|---|
voyage-4-large | 32,000 tokens | 1024 | Maior qualidade de recuperação, multilíngue |
voyage-4 | 32,000 tokens | 1024 | Equilíbrio entre qualidade e throughput |
voyage-4-lite | 32,000 tokens | 1024 | Menor latência e custo |
Todos os modelos da série 4 suportam dimensões de saída flexíveis — 256, 512, 1024 ou 2048 — e os embeddings gerados por qualquer modelo da série 4 são compatíveis entre si. Isso significa que você pode começar com voyage-4-lite durante o desenvolvimento e mudar para voyage-4-large em produção sem reindexar.
Existem também modelos específicos de domínio que valem a pena considerar:
voyage-code-3para código e documentação técnicavoyage-finance-2para documentos financeirosvoyage-law-2para texto jurídico
Para a maioria dos casos de uso de propósito geral, voyage-4-large oferece a melhor qualidade de recuperação, e voyage-4 é o padrão pragmático.
Passo 1: Geração de embeddings em batch
O primeiro passo é gerar embeddings para seus documentos existentes. Esta é uma operação única (com a sincronização contínua sendo tratada separadamente), e precisa ser eficiente — você não quer fazer uma chamada de API por documento quando tem centenas de milhares de registros.
import voyageai
from pymongo import MongoClient, UpdateOne
MONGO_URI = "mongodb+srv://<user>:<password>@<cluster-url>/"
VOYAGE_API_KEY = "<your-voyage-api-key>"
DATABASE = "myapp"
COLLECTION = "articles"
MODEL = "voyage-4-large"
DIMENSIONS = 1024
BATCH_SIZE = 128
client = MongoClient(MONGO_URI)
collection = client[DATABASE][COLLECTION]
vo = voyageai.Client(api_key=VOYAGE_API_KEY)
docs = list(collection.find({}, {"_id": 1, "content": 1}))
texts = [doc["content"] for doc in docs]
print(f"Retrieved {len(docs)} documents for embedding generation.")
O detalhe chave é o batching. A Voyage AI suporta requisições em batch, e processar 128 documentos por vez é dramaticamente mais eficiente do que chamadas um-a-um:
all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
batch = texts[i : i + BATCH_SIZE]
result = vo.embed(
batch,
model=MODEL,
input_type="document",
output_dimension=DIMENSIONS
)
all_embeddings.extend(result.embeddings)
print(f" Processed {min(i + BATCH_SIZE, len(texts))}/{len(texts)} documents...")
print(f"Generated {len(all_embeddings)} embeddings of dimension {len(all_embeddings[0])}.")
Observe input_type="document". Isso é crítico. A Voyage AI otimiza os embeddings de forma diferente dependendo se a entrada é um documento armazenado ou uma query de busca. Internamente, o modelo adiciona contexto como "Represent the document for retrieval:" à entrada. Sempre use "document" para dados armazenados e "query" para queries de busca — omitir este parâmetro degrada a qualidade da recuperação.
Escrevendo embeddings de volta para o MongoDB
Use bulk writes para armazenar embeddings de forma eficiente:
bulk_ops = [
UpdateOne(
{"_id": doc["_id"]},
{"$set": {"embedding": emb}}
)
for doc, emb in zip(docs, all_embeddings)
]
result = collection.bulk_write(bulk_ops)
print(f"Updated {result.modified_count} documents with embeddings.")
Um bulk_write com operações UpdateOne é ordens de magnitude mais rápido do que chamadas individuais update_one. Para uma coleção com 500.000 documentos, a diferença é de minutos versus horas.
Passo 2: Criando o índice de vector search
Com os embeddings armazenados, você precisa de um Atlas Vector Search index para consultá-los. Crie-o a partir da UI do Atlas ou via Atlas CLI:
{
"fields": [
{
"type": "vector",
"path": "embedding",
"numDimensions": 1024,
"similarity": "cosine"
}
]
}
Alguns pontos importantes a acertar:
numDimensionsdeve corresponder exatamente. Se você gerou embeddings de 1024 dimensões, o índice deve especificar 1024. Uma incompatibilidade fará com que as queries falhem silenciosamente ou retornem resultados incorretos.cosineé a escolha padrão para embeddings da Voyage AI. Outras opções sãoeuclideanedotProduct, mas cosine é recomendado para a maioria dos casos de uso de text embedding.- O índice deve atingir o status
Activeantes que as queries$vectorSearchfuncionem. Isso pode levar alguns minutos para coleções grandes.
Executando uma query de busca
query = "How does MongoDB handle distributed transactions?"
query_embedding = vo.embed(
[query],
model=MODEL,
input_type="query",
output_dimension=DIMENSIONS
).embeddings[0]
results = collection.aggregate([
{
"$vectorSearch": {
"index": "vector_index",
"path": "embedding",
"queryVector": query_embedding,
"numCandidates": 150,
"limit": 5
}
},
{
"$project": {
"content": 1,
"score": {"$meta": "vectorSearchScore"},
"_id": 0
}
}
])
for doc in results:
print(f"Score: {doc['score']:.4f} | {doc['content'][:100]}...")
O parâmetro numCandidates controla o trade-off entre precisão e desempenho. Valores mais altos (150-200) fornecem resultados mais precisos ao custo de uma latência ligeiramente maior. Para a maioria das aplicações, 100-150 é o ponto ideal.
Passo 3: Mantendo os embeddings sincronizados
É aqui que a maioria dos tutoriais termina e a maioria dos problemas de produção começa. Seus dados mudam — documentos são inseridos, atualizados, excluídos. Se os embeddings não forem regenerados quando o texto fonte muda, sua busca silenciosamente sai de sincronia.
Os MongoDB Atlas Triggers resolvem isso com duas estratégias complementares:
| Strategy | Trigger Type | Best For |
|---|---|---|
| Real-time | Database Trigger (Change Stream) | Inserções e atualizações individuais à medida que acontecem |
| Scheduled batch | Scheduled Trigger (Cron) | Reprocessamento em massa durante horas de menor movimento |
| Hybrid | Both | Resposta imediata + atualização periódica |
Opção A: Real-time Database Trigger
Um Database Trigger é acionado em cada inserção ou atualização na coleção. Ele detecta a mudança via Change Streams e regenera o embedding imediatamente.
Configuração no Atlas:
| Parameter | Value |
|---|---|
| Trigger Type | Database |
| Operation Types | Insert, Update |
| Full Document | Habilitado |
O detalhe crítico é evitar loops infinitos. Quando o trigger atualiza o campo embedding, essa atualização por si só acionaria a função novamente. A solução é um filtro de correspondência que só é acionado quando o campo de texto fonte muda:
{
"updateDescription.updatedFields.content": { "$exists": true }
}
Este filtro se aplica apenas a operações de Update — operações de Insert sempre são acionadas. Agora, a função do trigger:
exports = async function(changeEvent) {
const MODEL = "voyage-4-large";
const DIMENSIONS = 1024;
const DB_NAME = "myapp";
const COLL_NAME = "articles";
const MODEL_API_KEY = context.values.get("voyageModelApiKey");
const API_URL = "https://ai.mongodb.com/v1/embeddings";
const doc = changeEvent.fullDocument;
if (!doc || !doc.content) {
console.log("Document missing 'content' field. Skipping.");
return;
}
if (changeEvent.operationType === "update") {
const updatedFields = changeEvent.updateDescription.updatedFields;
if (!updatedFields || !updatedFields.hasOwnProperty("content")) {
console.log("Source field not modified. Skipping.");
return;
}
}
try {
const response = await context.http.post({
url: API_URL,
headers: {
"Authorization": [`Bearer ${MODEL_API_KEY}`],
"Content-Type": ["application/json"]
},
body: JSON.stringify({
input: [doc.content],
model: MODEL,
input_type: "document",
output_dimension: DIMENSIONS
})
});
if (response.statusCode !== 200) {
console.error(`API returned HTTP ${response.statusCode}`);
return;
}
const parsed = JSON.parse(response.body.text());
if (!parsed.data || parsed.data.length === 0) {
console.error("API returned no embeddings.");
return;
}
const collection = context.services
.get("mongodb-atlas")
.db(DB_NAME)
.collection(COLL_NAME);
await collection.updateOne(
{ _id: doc._id },
{ $set: { embedding: parsed.data[0].embedding } }
);
console.log(`Embedding updated for document: ${doc._id}`);
} catch (error) {
console.error(`Failed to generate embedding for ${doc._id}: ${error}`);
}
};
Opção B: Scheduled batch trigger
Para cargas de trabalho com alto volume de escrita, um trigger em tempo real por documento pode não ser prático. Um Scheduled Trigger é executado em um cron schedule — a cada hora, diariamente à noite, a cada 30 minutos — e reprocessa todos os documentos que precisam de novos embeddings.
A abordagem: sinalizar documentos que precisam de reprocessamento e, em seguida, incorporá-los em batch:
exports = async function() {
const MODEL = "voyage-4-large";
const DIMENSIONS = 1024;
const DB_NAME = "myapp";
const COLL_NAME = "articles";
const BATCH_SIZE = 128;
const DOC_LIMIT = 1000;
const MODEL_API_KEY = context.values.get("voyageModelApiKey");
const API_URL = "https://ai.mongodb.com/v1/embeddings";
const collection = context.services
.get("mongodb-atlas")
.db(DB_NAME)
.collection(COLL_NAME);
const docs = await collection.find({
$or: [
{ embedding: { $exists: false } },
{ embeddingStale: true }
]
}).limit(DOC_LIMIT).toArray();
if (docs.length === 0) {
console.log("No documents need embedding updates.");
return;
}
console.log(`Found ${docs.length} documents to process.`);
let totalUpdated = 0;
for (let i = 0; i < docs.length; i += BATCH_SIZE) {
const batch = docs.slice(i, i + BATCH_SIZE);
const texts = batch.map(d => d.content).filter(Boolean);
if (texts.length === 0) continue;
try {
const response = await context.http.post({
url: API_URL,
headers: {
"Authorization": [`Bearer ${MODEL_API_KEY}`],
"Content-Type": ["application/json"]
},
body: JSON.stringify({
input: texts,
model: MODEL,
input_type: "document",
output_dimension: DIMENSIONS
})
});
if (response.statusCode === 429) {
console.warn(`Rate limit hit at batch ${Math.floor(i / BATCH_SIZE) + 1}. Stopping.`);
break;
}
if (response.statusCode !== 200) {
console.error(`HTTP ${response.statusCode} at batch ${Math.floor(i / BATCH_SIZE) + 1}`);
continue;
}
const parsed = JSON.parse(response.body.text());
if (!parsed.data || parsed.data.length === 0) continue;
const ops = [];
for (let j = 0; j < batch.length; j++) {
if (parsed.data[j]) {
ops.push({
updateOne: {
filter: { _id: batch[j]._id },
update: {
$set: {
embedding: parsed.data[j].embedding,
embeddingUpdatedAt: new Date()
},
$unset: { embeddingStale: "" }
}
}
});
}
}
if (ops.length > 0) {
const result = await collection.bulkWrite(ops);
totalUpdated += result.modifiedCount;
}
} catch (error) {
console.error(`Error in batch ${Math.floor(i / BATCH_SIZE) + 1}: ${error}`);
}
}
console.log(`Done. Updated ${totalUpdated} documents.`);
};
Cron schedules comuns:
| Expression | Frequency |
|---|---|
0 */1 * * * | A cada hora |
0 2 * * * | Diariamente às 2:00 AM |
*/30 * * * * | A cada 30 minutos |
0 0 * * * | Diariamente à meia-noite |
O limite DOC_LIMIT garante que cada execução processe um número limitado de documentos. Se houver mais de 1.000 documentos desatualizados, a próxima execução agendada processará o restante. Isso evita que uma única execução exceda o tempo limite ou os rate limits.
Rate limits e preocupações operacionais
A Voyage AI impõe rate limits que você precisa considerar no design:
| Model | Tokens Per Minute | Requests Per Minute |
|---|---|---|
voyage-4-large | 3,000,000 | 2,000 |
voyage-4 | 8,000,000 | 2,000 |
voyage-4-lite | 16,000,000 | 2,000 |
Estes são os limites do Tier 1 — eles dobram no Tier 2 e triplicam no Tier 3 à medida que seu uso cresce. Se seu trigger atingir um 429 (Rate Limit Exceeded), a abordagem de batch agendado lida com isso de forma elegante, parando e tentando novamente na próxima execução agendada. O trigger em tempo real deve registrar o erro e depender do trigger agendado como um mecanismo de recuperação.
Algumas boas práticas operacionais:
- Sempre especifique
input_type. Omiti-lo funciona, mas produz embeddings de menor qualidade. Use"document"para dados armazenados,"query"para queries de busca. - Monitore a desatualização dos embeddings. Acompanhe a proporção de documentos com
embeddingStale: trueversus o total de documentos. Se crescer mais rápido do que seu trigger agendado pode processar, aumente a frequência ou o tamanho do batch. - Use a estratégia híbrida. Triggers em tempo real lidam com o caso comum (inserções/atualizações individuais). Triggers agendados lidam com os casos de borda (importações em massa, atualizações perdidas, recuperação de falhas). Juntos, eles fornecem tanto frescor quanto confiabilidade.
A arquitetura completa
Juntando tudo, o pipeline de produção se parece com isto:
- Carga inicial: Script Python batch gera embeddings para todos os documentos existentes e os escreve de volta via
bulk_write. - Criação do índice: Atlas Vector Search index no campo
embeddingcom similaridade de cosseno. - Sincronização em tempo real: Database Trigger regenera embeddings na inserção/atualização, filtrado para evitar loops infinitos.
- Atualização agendada: Scheduled Trigger reprocessa quaisquer documentos sinalizados como desatualizados ou com embeddings ausentes.
- Tempo de query: Agregação
$vectorSearchcominput_type="query"para o termo de busca do usuário.
Nenhuma infraestrutura externa. Nenhum banco de dados vetorial separado. Nenhum job de sincronização entre sistemas. Todo o pipeline é executado no MongoDB Atlas — embeddings, índices, triggers e queries — o que significa uma única plataforma para monitorar, um único conjunto de credenciais para gerenciar e um único relacionamento de faturamento.
Essa é a parte que mais importa em produção. Não se a demo funciona, mas se o sistema permanece correto às 3 da manhã quando um job de importação em massa é executado e 50.000 documentos precisam de novos embeddings.
Este artigo é um complemento a Por Que Sua Busca Não Retorna Nada, que aborda o caso conceitual para vector search em vez de correspondência de palavras-chave.
Artigos Relacionados

Por Que Sua Busca Não Retorna Nada — E Como o MongoDB Vector Search Resolve Isso
A busca por palavras-chave só encontra o que está literalmente lá. Quando usuários buscam por 'laptop bag' e seus documentos dizem 'notebook carrying case', regex não vai ajudar. A busca vetorial entende o significado — e o MongoDB Atlas a suporta nativamente.

A Mentalidade da Engenharia de IA: O Que Muda ao Construir com LLMs
A engenharia de IA não é apenas engenharia de software com um modelo acoplado. Os ciclos de feedback, modos de falha e sinais de qualidade são fundamentalmente diferentes. Veja como pensar sobre isso.

Padrões de Banco de Dados que Você Deve Conhecer Antes de Escolher Seu Próximo Banco de Dados
A escolha entre Postgres e MongoDB não é sobre qual é 'melhor'. É sobre entender os padrões de acesso, requisitos de consistência e restrições operacionais do seu sistema.