# Building a Production Embedding Pipeline with MongoDB Atlas and Voyage AI
Generating embeddings is the easy part. Keeping them in sync as your data changes — at scale, without downtime — is where the real engineering lives. Here's how to build the full pipeline on MongoDB Atlas.
## The pipeline problem
If you've read about vector search, you know the pitch: convert your data into embeddings, index them, and suddenly your search understands meaning instead of matching keywords. The concept is simple. The production implementation is not.
The gap between a working demo and a production pipeline comes down to three questions most tutorials skip:
- How do you generate embeddings for an existing collection with millions of documents? You can't send them all at once.
- How do you keep embeddings in sync when documents change? Stale embeddings are worse than no embeddings — they return confidently wrong results.
- How do you handle rate limits, failures, and retries without losing data?
This article walks through a complete, production-grade embedding pipeline built on MongoDB Atlas with Voyage AI — from initial batch generation to automated real-time sync via Atlas Triggers.
## Choosing an embedding model
Voyage AI offers a series of embedding models optimized for different trade-offs. MongoDB Atlas integrates with Voyage AI natively, which means the billing flows through your Atlas account and there's no separate vendor relationship to manage.
The series 4 models are the current generation:
| Model | Context Length | Default Dimensions | Best For |
|---|---|---|---|
| `voyage-4-large` | 32,000 tokens | 1024 | Highest retrieval quality, multilingual |
| `voyage-4` | 32,000 tokens | 1024 | Balance of quality and throughput |
| `voyage-4-lite` | 32,000 tokens | 1024 | Lowest latency and cost |
All series 4 models support flexible output dimensions — 256, 512, 1024, or 2048 — and embeddings generated by any series 4 model are compatible with each other. This means you can start with voyage-4-lite during development and switch to voyage-4-large in production without re-indexing.
There are also domain-specific models worth considering:
- `voyage-code-3` for code and technical documentation
- `voyage-finance-2` for financial documents
- `voyage-law-2` for legal text
For most general-purpose use cases, voyage-4-large provides the best retrieval quality, and voyage-4 is the pragmatic default.
## Step 1: Batch embedding generation
The first step is generating embeddings for your existing documents. This is a one-time operation (with ongoing sync handled separately), and it needs to be efficient — you don't want to make one API call per document when you have hundreds of thousands of records.
```python
import voyageai
from pymongo import MongoClient, UpdateOne

MONGO_URI = "mongodb+srv://<user>:<password>@<cluster-url>/"
VOYAGE_API_KEY = "<your-voyage-api-key>"
DATABASE = "myapp"
COLLECTION = "articles"
MODEL = "voyage-4-large"
DIMENSIONS = 1024
BATCH_SIZE = 128

client = MongoClient(MONGO_URI)
collection = client[DATABASE][COLLECTION]
vo = voyageai.Client(api_key=VOYAGE_API_KEY)

# Fetch only the fields we need; for very large collections,
# iterate the cursor instead of materializing the full list.
docs = list(collection.find({}, {"_id": 1, "content": 1}))
texts = [doc["content"] for doc in docs]
print(f"Retrieved {len(docs)} documents for embedding generation.")
```
The key detail is batching. Voyage AI supports batch requests, and processing 128 documents at a time is dramatically more efficient than one-at-a-time calls:
```python
all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
    batch = texts[i : i + BATCH_SIZE]
    result = vo.embed(
        batch,
        model=MODEL,
        input_type="document",
        output_dimension=DIMENSIONS,
    )
    all_embeddings.extend(result.embeddings)
    print(f"  Processed {min(i + BATCH_SIZE, len(texts))}/{len(texts)} documents...")

print(f"Generated {len(all_embeddings)} embeddings of dimension {len(all_embeddings[0])}.")
```
Notice input_type="document". This is critical. Voyage AI optimizes embeddings differently depending on whether the input is a stored document or a search query. Internally, the model prepends context like "Represent the document for retrieval:" to the input. Always use "document" for stored data and "query" for search queries — omitting this parameter degrades retrieval quality.
### Writing embeddings back to MongoDB
Use bulk writes to store embeddings efficiently:
```python
bulk_ops = [
    UpdateOne(
        {"_id": doc["_id"]},
        {"$set": {"embedding": emb}},
    )
    for doc, emb in zip(docs, all_embeddings)
]

result = collection.bulk_write(bulk_ops)
print(f"Updated {result.modified_count} documents with embeddings.")
```
A bulk_write with UpdateOne operations is orders of magnitude faster than individual update_one calls. For a collection with 500,000 documents, the difference is minutes versus hours.
## Step 2: Creating the vector search index
With embeddings stored, you need an Atlas Vector Search index to query them. Create it from the Atlas UI or via the Atlas CLI:
```json
{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1024,
      "similarity": "cosine"
    }
  ]
}
```
A few things to get right:
- `numDimensions` must match exactly. If you generated 1024-dimensional embeddings, the index must specify 1024. A mismatch will cause queries to fail silently or return garbage results.
- `cosine` is the default choice for Voyage AI embeddings. Other options are `euclidean` and `dotProduct`, but cosine is recommended for most text embedding use cases.
- The index must reach `Active` status before `$vectorSearch` queries will work. This can take a few minutes for large collections.
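Because a dimension mismatch fails silently, it's worth sanity-checking stored embeddings against the index definition before creating it. A minimal sketch — the helper name is my own, not part of any Atlas API:

```python
def find_dimension_mismatches(docs, expected_dims):
    """Return the _ids of documents whose stored embedding length
    does not match the dimension declared in the index definition."""
    return [
        doc["_id"]
        for doc in docs
        if len(doc.get("embedding", [])) != expected_dims
    ]

# Spot-check a sample of documents before creating the index.
sample = [
    {"_id": 1, "embedding": [0.0] * 1024},
    {"_id": 2, "embedding": [0.0] * 512},   # wrong dimension
    {"_id": 3},                             # embedding missing entirely
]
print(find_dimension_mismatches(sample, 1024))  # [2, 3]
```

Run this against a sample from `collection.find({}, {"embedding": 1})` before the index build; an empty list means the `numDimensions` value in the definition is safe.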
### Running a search query
```python
query = "How does MongoDB handle distributed transactions?"

query_embedding = vo.embed(
    [query],
    model=MODEL,
    input_type="query",
    output_dimension=DIMENSIONS,
).embeddings[0]

results = collection.aggregate([
    {
        "$vectorSearch": {
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5,
        }
    },
    {
        "$project": {
            "content": 1,
            "score": {"$meta": "vectorSearchScore"},
            "_id": 0,
        }
    },
])

for doc in results:
    print(f"Score: {doc['score']:.4f} | {doc['content'][:100]}...")
```
The numCandidates parameter controls the precision-performance trade-off. Higher values (150-200) give more accurate results at the cost of slightly higher latency. For most applications, 100-150 is the sweet spot.
## Step 3: Keeping embeddings in sync
This is where most tutorials end and most production issues begin. Your data changes — documents are inserted, updated, deleted. If embeddings aren't regenerated when the source text changes, your search silently drifts out of sync.
MongoDB Atlas Triggers solve this with two complementary strategies:
| Strategy | Trigger Type | Best For |
|---|---|---|
| Real-time | Database Trigger (Change Stream) | Individual inserts and updates as they happen |
| Scheduled batch | Scheduled Trigger (Cron) | Bulk reprocessing during off-peak hours |
| Hybrid | Both | Immediate response + periodic catch-up |
### Option A: Real-time Database Trigger
A Database Trigger fires on every insert or update to the collection. It detects the change via Change Streams and regenerates the embedding immediately.
Configuration in Atlas:
| Parameter | Value |
|---|---|
| Trigger Type | Database |
| Operation Types | Insert, Update |
| Full Document | Enabled |
The critical detail is avoiding infinite loops. When the trigger updates the embedding field, that update would itself trigger the function again. The solution is a match filter that only fires when the source text field changes:
```json
{
  "updateDescription.updatedFields.content": { "$exists": true }
}
```
This filter applies to Update operations only — Insert operations always fire. Now the trigger function:
```javascript
exports = async function(changeEvent) {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";

  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const doc = changeEvent.fullDocument;
  if (!doc || !doc.content) {
    console.log("Document missing 'content' field. Skipping.");
    return;
  }

  // Defense in depth: the trigger's match filter should already exclude
  // updates that don't touch the source field, but re-check here anyway.
  if (changeEvent.operationType === "update") {
    const updatedFields = changeEvent.updateDescription.updatedFields;
    if (!updatedFields || !updatedFields.hasOwnProperty("content")) {
      console.log("Source field not modified. Skipping.");
      return;
    }
  }

  try {
    const response = await context.http.post({
      url: API_URL,
      headers: {
        "Authorization": [`Bearer ${MODEL_API_KEY}`],
        "Content-Type": ["application/json"]
      },
      body: JSON.stringify({
        input: [doc.content],
        model: MODEL,
        input_type: "document",
        output_dimension: DIMENSIONS
      })
    });

    if (response.statusCode !== 200) {
      console.error(`API returned HTTP ${response.statusCode}`);
      return;
    }

    const parsed = JSON.parse(response.body.text());
    if (!parsed.data || parsed.data.length === 0) {
      console.error("API returned no embeddings.");
      return;
    }

    const collection = context.services
      .get("mongodb-atlas")
      .db(DB_NAME)
      .collection(COLL_NAME);

    await collection.updateOne(
      { _id: doc._id },
      { $set: { embedding: parsed.data[0].embedding } }
    );
    console.log(`Embedding updated for document: ${doc._id}`);
  } catch (error) {
    console.error(`Failed to generate embedding for ${doc._id}: ${error}`);
  }
};
```
### Option B: Scheduled batch trigger
For workloads with high write volumes, a real-time trigger per document may not be practical. A Scheduled Trigger runs on a cron schedule — hourly, nightly, every 30 minutes — and reprocesses all documents that need new embeddings.
The approach: flag documents that need reprocessing, then batch-embed them:
```javascript
exports = async function() {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const BATCH_SIZE = 128;
  const DOC_LIMIT = 1000;

  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const collection = context.services
    .get("mongodb-atlas")
    .db(DB_NAME)
    .collection(COLL_NAME);

  const docs = await collection.find({
    $or: [
      { embedding: { $exists: false } },
      { embeddingStale: true }
    ]
  }).limit(DOC_LIMIT).toArray();

  if (docs.length === 0) {
    console.log("No documents need embedding updates.");
    return;
  }
  console.log(`Found ${docs.length} documents to process.`);

  let totalUpdated = 0;
  for (let i = 0; i < docs.length; i += BATCH_SIZE) {
    // Drop documents without source text before embedding, so the
    // returned embeddings stay index-aligned with the batch.
    const batch = docs.slice(i, i + BATCH_SIZE).filter(d => d.content);
    if (batch.length === 0) continue;
    const texts = batch.map(d => d.content);

    try {
      const response = await context.http.post({
        url: API_URL,
        headers: {
          "Authorization": [`Bearer ${MODEL_API_KEY}`],
          "Content-Type": ["application/json"]
        },
        body: JSON.stringify({
          input: texts,
          model: MODEL,
          input_type: "document",
          output_dimension: DIMENSIONS
        })
      });

      if (response.statusCode === 429) {
        console.warn(`Rate limit hit at batch ${Math.floor(i / BATCH_SIZE) + 1}. Stopping.`);
        break;
      }
      if (response.statusCode !== 200) {
        console.error(`HTTP ${response.statusCode} at batch ${Math.floor(i / BATCH_SIZE) + 1}`);
        continue;
      }

      const parsed = JSON.parse(response.body.text());
      if (!parsed.data || parsed.data.length === 0) continue;

      const ops = [];
      for (let j = 0; j < batch.length; j++) {
        if (parsed.data[j]) {
          ops.push({
            updateOne: {
              filter: { _id: batch[j]._id },
              update: {
                $set: {
                  embedding: parsed.data[j].embedding,
                  embeddingUpdatedAt: new Date()
                },
                $unset: { embeddingStale: "" }
              }
            }
          });
        }
      }

      if (ops.length > 0) {
        const result = await collection.bulkWrite(ops);
        totalUpdated += result.modifiedCount;
      }
    } catch (error) {
      console.error(`Error in batch ${Math.floor(i / BATCH_SIZE) + 1}: ${error}`);
    }
  }
  console.log(`Done. Updated ${totalUpdated} documents.`);
};
```
Common cron schedules:
| Expression | Frequency |
|---|---|
| `0 */1 * * *` | Every hour |
| `0 2 * * *` | Daily at 2:00 AM |
| `*/30 * * * *` | Every 30 minutes |
| `0 0 * * *` | Daily at midnight |
The DOC_LIMIT cap ensures each execution processes a bounded number of documents. If there are more than 1,000 stale documents, the next scheduled run picks up the rest. This prevents any single execution from timing out or exceeding rate limits.
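One piece is implied but never shown: something has to set the `embeddingStale` flag the trigger queries for — typically the application code path that edits `content`. A minimal sketch of the update document (the field names match the trigger above; the helper itself is hypothetical):

```python
def stale_content_update(new_content):
    """Build the update document that replaces the source text and
    flags the document for pickup by the scheduled embedding trigger."""
    return {
        "$set": {
            "content": new_content,
            "embeddingStale": True,
        }
    }

# Applied with pymongo whenever application code edits an article:
# collection.update_one({"_id": doc_id}, stale_content_update(new_text))
```

Setting the flag in the same update that changes the text keeps the two writes atomic, so a crash between "edit" and "flag" can't leave a silently stale embedding.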
## Rate limits and operational concerns
Voyage AI enforces rate limits that you need to design around:
| Model | Tokens Per Minute | Requests Per Minute |
|---|---|---|
| `voyage-4-large` | 3,000,000 | 2,000 |
| `voyage-4` | 8,000,000 | 2,000 |
| `voyage-4-lite` | 16,000,000 | 2,000 |
These are Tier 1 limits — they double at Tier 2 and triple at Tier 3 as your usage grows. If your trigger hits a 429 (Rate Limit Exceeded), the scheduled batch approach handles it gracefully by stopping and retrying on the next scheduled run. The real-time trigger should log the error and rely on the scheduled trigger as a catch-up mechanism.
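The Python batch script can handle the same 429s with exponential backoff. A sketch under my own naming — in real code, catch the voyageai client's specific rate-limit exception rather than bare `Exception`:

```python
import time

def with_backoff(call, max_attempts=5, base_delay=1.0):
    """Retry `call` with exponential backoff (1s, 2s, 4s, ... between
    attempts), re-raising the error once attempts are exhausted."""
    for attempt in range(max_attempts):
        try:
            return call()
        except Exception:  # narrow this to the client's rate-limit error
            if attempt == max_attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Usage in the batch loop:
# result = with_backoff(lambda: vo.embed(batch, model=MODEL,
#                                        input_type="document",
#                                        output_dimension=DIMENSIONS))
```

Wrapping only the `vo.embed` call keeps the MongoDB writes outside the retry path, so a transient embedding failure never duplicates a bulk write.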
A few operational best practices:
- Always specify
input_type. Omitting it works but produces lower-quality embeddings. Use"document"for stored data,"query"for search queries. - Monitor embedding staleness. Track the ratio of documents with
embeddingStale: trueversus total documents. If it grows faster than your scheduled trigger can process, increase frequency or batch size. - Use the hybrid strategy. Real-time triggers handle the common case (individual inserts/updates). Scheduled triggers handle the edge cases (bulk imports, missed updates, recovery from failures). Together, they provide both freshness and reliability.
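The staleness metric is just a pair of counts; a sketch (the helper name is my own):

```python
def staleness_ratio(stale_count, total_count):
    """Fraction of documents awaiting re-embedding; returns 0.0 for an
    empty collection so dashboards never divide by zero."""
    return stale_count / total_count if total_count else 0.0

# With pymongo:
# stale = collection.count_documents({"embeddingStale": True})
# total = collection.estimated_document_count()
# print(f"Stale ratio: {staleness_ratio(stale, total):.1%}")
```

Alerting when this ratio trends upward across scheduled runs is the cheapest early warning that the trigger cadence no longer keeps up with write volume.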
## The complete architecture
Putting it all together, the production pipeline looks like this:
- Initial load: Python batch script generates embeddings for all existing documents and writes them back via `bulk_write`.
- Index creation: Atlas Vector Search index on the `embedding` field with cosine similarity.
- Real-time sync: Database Trigger regenerates embeddings on insert/update, filtered to avoid infinite loops.
- Scheduled catch-up: Scheduled Trigger reprocesses any documents flagged as stale or missing embeddings.
- Query time: `$vectorSearch` aggregation with `input_type="query"` for the user's search term.
No external infrastructure. No separate vector database. No sync jobs between systems. The entire pipeline runs on MongoDB Atlas — embeddings, indexes, triggers, and queries — which means one platform to monitor, one set of credentials to manage, and one billing relationship.
That's the part that matters most in production. Not whether the demo works, but whether the system stays correct at 3 AM when a bulk import job runs and 50,000 documents need new embeddings.
This article is a companion to Why Your Search Returns Nothing, which covers the conceptual case for vector search over keyword matching.