Building a Production Embedding Pipeline with MongoDB Atlas and Voyage AI

Generating embeddings is the easy part. Keeping them in sync as your data changes — at scale, without downtime — is where the real engineering lives. Here's how to build the full pipeline on MongoDB Atlas.

Durval Pereira
11 min

The pipeline problem

If you've read about vector search, you know the pitch: convert your data into embeddings, index them, and suddenly your search understands meaning instead of matching keywords. The concept is simple. The production implementation is not.

The gap between a working demo and a production pipeline comes down to three questions most tutorials skip:

  1. How do you generate embeddings for an existing collection with millions of documents? You can't send them all at once.
  2. How do you keep embeddings in sync when documents change? Stale embeddings are worse than no embeddings — they return confidently wrong results.
  3. How do you handle rate limits, failures, and retries without losing data?

This article walks through a complete, production-grade embedding pipeline built on MongoDB Atlas with Voyage AI — from initial batch generation to automated real-time sync via Atlas Triggers.

Choosing an embedding model

Voyage AI offers a series of embedding models optimized for different trade-offs. MongoDB Atlas integrates with Voyage AI natively, which means the billing flows through your Atlas account and there's no separate vendor relationship to manage.

The series 4 models are the current generation:

| Model | Context Length | Default Dimensions | Best For |
|---|---|---|---|
| voyage-4-large | 32,000 tokens | 1024 | Highest retrieval quality, multilingual |
| voyage-4 | 32,000 tokens | 1024 | Balance of quality and throughput |
| voyage-4-lite | 32,000 tokens | 1024 | Lowest latency and cost |

All series 4 models support flexible output dimensions — 256, 512, 1024, or 2048 — and embeddings generated by any series 4 model are compatible with each other. This means you can start with voyage-4-lite during development and switch to voyage-4-large in production without re-indexing.

There are also domain-specific models worth considering:

  • voyage-code-3 for code and technical documentation
  • voyage-finance-2 for financial documents
  • voyage-law-2 for legal text

For most general-purpose use cases, voyage-4-large delivers the best retrieval quality; voyage-4 is the pragmatic default when cost and throughput also matter.

Step 1: Batch embedding generation

The first step is generating embeddings for your existing documents. This is a one-time operation (with ongoing sync handled separately), and it needs to be efficient — you don't want to make one API call per document when you have hundreds of thousands of records.

import voyageai
from pymongo import MongoClient, UpdateOne

MONGO_URI = "mongodb+srv://<user>:<password>@<cluster-url>/"
VOYAGE_API_KEY = "<your-voyage-api-key>"
DATABASE = "myapp"
COLLECTION = "articles"

MODEL = "voyage-4-large"
DIMENSIONS = 1024
BATCH_SIZE = 128

client = MongoClient(MONGO_URI)
collection = client[DATABASE][COLLECTION]
vo = voyageai.Client(api_key=VOYAGE_API_KEY)

# Only fetch documents that actually have text to embed. For very large
# collections, page through the cursor instead of materializing one list.
docs = list(collection.find(
    {"content": {"$exists": True, "$ne": ""}},
    {"_id": 1, "content": 1}
))
texts = [doc["content"] for doc in docs]
print(f"Retrieved {len(docs)} documents for embedding generation.")

The key detail is batching. Voyage AI supports batch requests, and processing 128 documents at a time is dramatically more efficient than one-at-a-time calls:

all_embeddings = []
for i in range(0, len(texts), BATCH_SIZE):
    batch = texts[i : i + BATCH_SIZE]
    result = vo.embed(
        batch,
        model=MODEL,
        input_type="document",
        output_dimension=DIMENSIONS
    )
    all_embeddings.extend(result.embeddings)
    print(f"  Processed {min(i + BATCH_SIZE, len(texts))}/{len(texts)} documents...")

print(f"Generated {len(all_embeddings)} embeddings of dimension {len(all_embeddings[0])}.")

Notice input_type="document". This is critical. Voyage AI optimizes embeddings differently depending on whether the input is a stored document or a search query. Internally, the model prepends context like "Represent the document for retrieval:" to the input. Always use "document" for stored data and "query" for search queries — omitting this parameter degrades retrieval quality.

Writing embeddings back to MongoDB

Use bulk writes to store embeddings efficiently:

bulk_ops = [
    UpdateOne(
        {"_id": doc["_id"]},
        {"$set": {"embedding": emb}}
    )
    for doc, emb in zip(docs, all_embeddings)
]

result = collection.bulk_write(bulk_ops)
print(f"Updated {result.modified_count} documents with embeddings.")

A bulk_write with UpdateOne operations is orders of magnitude faster than individual update_one calls. For a collection with 500,000 documents, the difference is minutes versus hours.
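For collections too large to build one giant operations list comfortably, the same bulk write can be issued in chunks with ordered=False, so a single failed update doesn't abort the rest of a batch. A minimal sketch — the chunked helper is my own, not part of PyMongo:

```python
from itertools import islice

def chunked(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

# 5 operations in chunks of 2 -> sub-batches of sizes [2, 2, 1]
sizes = [len(c) for c in chunked(range(5), 2)]
```

Each chunk would then go through `collection.bulk_write(chunk, ordered=False)`; unordered writes let MongoDB apply the remaining updates even if one document in the chunk errors.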

Step 2: Creating the vector search index

With embeddings stored, you need an Atlas Vector Search index to query them. Create it from the Atlas UI or via the Atlas CLI:

{
  "fields": [
    {
      "type": "vector",
      "path": "embedding",
      "numDimensions": 1024,
      "similarity": "cosine"
    }
  ]
}

A few things to get right:

  • numDimensions must match exactly. If you generated 1024-dimensional embeddings, the index must specify 1024. A mismatch will cause queries to fail silently or return garbage results.
  • cosine is the default choice for Voyage AI embeddings. Other options are euclidean and dotProduct, but cosine is recommended for most text embedding use cases.
  • The index must reach Active status before $vectorSearch queries will work. This can take a few minutes for large collections.
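If you prefer code over the Atlas UI, recent PyMongo versions can also create the index programmatically. A hedged sketch: the helper below just builds the same definition shown above, and the commented-out call assumes PyMongo 4.7+ against a live Atlas cluster:

```python
def vector_index_definition(path="embedding", dims=1024, similarity="cosine"):
    """Build the Atlas Vector Search index definition used in this article."""
    return {
        "fields": [
            {
                "type": "vector",
                "path": path,
                "numDimensions": dims,  # must match the embedding dimension exactly
                "similarity": similarity,
            }
        ]
    }

definition = vector_index_definition()

# With a live Atlas connection (PyMongo 4.7+), the index could be created as:
# from pymongo.operations import SearchIndexModel
# collection.create_search_index(
#     SearchIndexModel(definition=definition, name="vector_index",
#                      type="vectorSearch")
# )
```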

Running a search query

query = "How does MongoDB handle distributed transactions?"

query_embedding = vo.embed(
    [query],
    model=MODEL,
    input_type="query",
    output_dimension=DIMENSIONS
).embeddings[0]

results = collection.aggregate([
    {
        "$vectorSearch": {
            "index": "vector_index",
            "path": "embedding",
            "queryVector": query_embedding,
            "numCandidates": 150,
            "limit": 5
        }
    },
    {
        "$project": {
            "content": 1,
            "score": {"$meta": "vectorSearchScore"},
            "_id": 0
        }
    }
])

for doc in results:
    print(f"Score: {doc['score']:.4f} | {doc['content'][:100]}...")

The numCandidates parameter controls the precision-performance trade-off: it is the number of approximate neighbors considered before the top limit results are returned. Higher values (150-200) give more accurate results at the cost of slightly higher latency. For most applications, 100-150 is the sweet spot.
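To keep that guidance in one place, the query can be wrapped in a small pipeline builder. The build_search_pipeline helper and its 20x-the-limit default are my own convention, not an Atlas API:

```python
def build_search_pipeline(query_vector, limit=5, num_candidates=None,
                          index="vector_index", path="embedding"):
    """Build the $vectorSearch aggregation pipeline used in this article."""
    if num_candidates is None:
        # Roughly 20x the result limit, floored at 100 (the article's sweet spot).
        num_candidates = max(limit * 20, 100)
    return [
        {
            "$vectorSearch": {
                "index": index,
                "path": path,
                "queryVector": query_vector,
                "numCandidates": num_candidates,
                "limit": limit,
            }
        },
        {
            "$project": {
                "content": 1,
                "score": {"$meta": "vectorSearchScore"},
                "_id": 0,
            }
        },
    ]

pipeline = build_search_pipeline([0.1] * 1024, limit=5)
# results = collection.aggregate(pipeline)
```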

Step 3: Keeping embeddings in sync

This is where most tutorials end and most production issues begin. Your data changes — documents are inserted, updated, deleted. If embeddings aren't regenerated when the source text changes, your search silently drifts out of sync.

MongoDB Atlas Triggers solve this with two complementary strategies:

| Strategy | Trigger Type | Best For |
|---|---|---|
| Real-time | Database Trigger (Change Stream) | Individual inserts and updates as they happen |
| Scheduled batch | Scheduled Trigger (Cron) | Bulk reprocessing during off-peak hours |
| Hybrid | Both | Immediate response + periodic catch-up |

Option A: Real-time Database Trigger

A Database Trigger fires on every insert or update to the collection. It detects the change via Change Streams and regenerates the embedding immediately.

Configuration in Atlas:

| Parameter | Value |
|---|---|
| Trigger Type | Database |
| Operation Types | Insert, Update |
| Full Document | Enabled |
The critical detail is avoiding infinite loops. When the trigger updates the embedding field, that update would itself trigger the function again. The solution is a match filter that only fires when the source text field changes:

{
  "updateDescription.updatedFields.content": { "$exists": true }
}

This filter applies to Update operations only — Insert operations always fire. Now the trigger function:

exports = async function(changeEvent) {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const doc = changeEvent.fullDocument;

  if (!doc || !doc.content) {
    console.log("Document missing 'content' field. Skipping.");
    return;
  }

  if (changeEvent.operationType === "update") {
    const updatedFields = changeEvent.updateDescription.updatedFields;
    if (!updatedFields || !updatedFields.hasOwnProperty("content")) {
      console.log("Source field not modified. Skipping.");
      return;
    }
  }

  try {
    const response = await context.http.post({
      url: API_URL,
      headers: {
        "Authorization": [`Bearer ${MODEL_API_KEY}`],
        "Content-Type": ["application/json"]
      },
      body: JSON.stringify({
        input: [doc.content],
        model: MODEL,
        input_type: "document",
        output_dimension: DIMENSIONS
      })
    });

    if (response.statusCode !== 200) {
      console.error(`API returned HTTP ${response.statusCode}`);
      return;
    }

    const parsed = JSON.parse(response.body.text());
    if (!parsed.data || parsed.data.length === 0) {
      console.error("API returned no embeddings.");
      return;
    }

    const collection = context.services
      .get("mongodb-atlas")
      .db(DB_NAME)
      .collection(COLL_NAME);

    await collection.updateOne(
      { _id: doc._id },
      { $set: { embedding: parsed.data[0].embedding } }
    );

    console.log(`Embedding updated for document: ${doc._id}`);
  } catch (error) {
    console.error(`Failed to generate embedding for ${doc._id}: ${error}`);
  }
};
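The guard clauses in that function are easy to get subtly wrong, so they're worth unit-testing in isolation. A minimal Python equivalent of the same skip logic — the should_reembed helper is hypothetical, written to mirror the JavaScript above:

```python
def should_reembed(change_event, source_field="content"):
    """Return True if a change event warrants regenerating the embedding.

    Mirrors the trigger's guards: skip documents missing the source field,
    and skip updates that didn't touch it (such as the trigger's own
    write to the embedding field, which would otherwise loop forever).
    """
    doc = change_event.get("fullDocument")
    if not doc or not doc.get(source_field):
        return False
    if change_event.get("operationType") == "update":
        updated = change_event.get("updateDescription", {}).get("updatedFields", {})
        if source_field not in updated:
            return False
    return True

# An insert with content fires; the trigger's own embedding write does not.
insert_event = {"operationType": "insert",
                "fullDocument": {"_id": 1, "content": "hello"}}
self_update = {"operationType": "update",
               "fullDocument": {"_id": 1, "content": "hello", "embedding": [0.1]},
               "updateDescription": {"updatedFields": {"embedding": [0.1]}}}
```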

Option B: Scheduled batch trigger

For workloads with high write volumes, a real-time trigger per document may not be practical. A Scheduled Trigger runs on a cron schedule — hourly, nightly, every 30 minutes — and reprocesses all documents that need new embeddings.

The approach: flag documents that need reprocessing, then batch-embed them:

exports = async function() {
  const MODEL = "voyage-4-large";
  const DIMENSIONS = 1024;
  const DB_NAME = "myapp";
  const COLL_NAME = "articles";
  const BATCH_SIZE = 128;
  const DOC_LIMIT = 1000;
  const MODEL_API_KEY = context.values.get("voyageModelApiKey");
  const API_URL = "https://ai.mongodb.com/v1/embeddings";

  const collection = context.services
    .get("mongodb-atlas")
    .db(DB_NAME)
    .collection(COLL_NAME);

  const docs = await collection.find({
    $or: [
      { embedding: { $exists: false } },
      { embeddingStale: true }
    ]
  }).limit(DOC_LIMIT).toArray();

  if (docs.length === 0) {
    console.log("No documents need embedding updates.");
    return;
  }

  console.log(`Found ${docs.length} documents to process.`);
  let totalUpdated = 0;

  for (let i = 0; i < docs.length; i += BATCH_SIZE) {
    const batch = docs.slice(i, i + BATCH_SIZE);
    const texts = batch.map(d => d.content).filter(Boolean);
    if (texts.length === 0) continue;

    try {
      const response = await context.http.post({
        url: API_URL,
        headers: {
          "Authorization": [`Bearer ${MODEL_API_KEY}`],
          "Content-Type": ["application/json"]
        },
        body: JSON.stringify({
          input: texts,
          model: MODEL,
          input_type: "document",
          output_dimension: DIMENSIONS
        })
      });

      if (response.statusCode === 429) {
        console.warn(`Rate limit hit at batch ${Math.floor(i / BATCH_SIZE) + 1}. Stopping.`);
        break;
      }

      if (response.statusCode !== 200) {
        console.error(`HTTP ${response.statusCode} at batch ${Math.floor(i / BATCH_SIZE) + 1}`);
        continue;
      }

      const parsed = JSON.parse(response.body.text());
      if (!parsed.data || parsed.data.length === 0) continue;

      const ops = [];
      for (let j = 0; j < batch.length; j++) {
        if (parsed.data[j]) {
          ops.push({
            updateOne: {
              filter: { _id: batch[j]._id },
              update: {
                $set: {
                  embedding: parsed.data[j].embedding,
                  embeddingUpdatedAt: new Date()
                },
                $unset: { embeddingStale: "" }
              }
            }
          });
        }
      }

      if (ops.length > 0) {
        const result = await collection.bulkWrite(ops);
        totalUpdated += result.modifiedCount;
      }
    } catch (error) {
      console.error(`Error in batch ${Math.floor(i / BATCH_SIZE) + 1}: ${error}`);
    }
  }

  console.log(`Done. Updated ${totalUpdated} documents.`);
};
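One detail the trigger leaves implicit: something has to set embeddingStale in the first place. A sketch of how application writes might flag documents whenever the source text changes — the field names match the article's schema, but the content_update helper is hypothetical:

```python
def content_update(new_content):
    """Build an update document that changes the text and flags the
    embedding as stale, so the next scheduled run reprocesses it.

    Usage with a live collection would look like:
        collection.update_one({"_id": doc_id}, content_update(new_text))
    """
    return {"$set": {"content": new_content, "embeddingStale": True}}

update = content_update("Revised article body...")
```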

Common cron schedules:

| Expression | Frequency |
|---|---|
| 0 */1 * * * | Every hour |
| 0 2 * * * | Daily at 2:00 AM |
| */30 * * * * | Every 30 minutes |
| 0 0 * * * | Daily at midnight |

The DOC_LIMIT cap ensures each execution processes a bounded number of documents. If there are more than 1,000 stale documents, the next scheduled run picks up the rest. This prevents any single execution from timing out or exceeding rate limits.

Rate limits and operational concerns

Voyage AI enforces rate limits that you need to design around:

| Model | Tokens Per Minute | Requests Per Minute |
|---|---|---|
| voyage-4-large | 3,000,000 | 2,000 |
| voyage-4 | 8,000,000 | 2,000 |
| voyage-4-lite | 16,000,000 | 2,000 |

These are Tier 1 limits — they double at Tier 2 and triple at Tier 3 as your usage grows. If your trigger hits a 429 (Rate Limit Exceeded), the scheduled batch approach handles it gracefully by stopping and retrying on the next scheduled run. The real-time trigger should log the error and rely on the scheduled trigger as a catch-up mechanism.
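For client-side batch scripts, a 429 is usually better handled with exponential backoff than by giving up. A generic sketch — the with_backoff helper is mine; in the batch script you'd wrap the vo.embed call in it:

```python
import random
import time

def with_backoff(fn, max_retries=5, base_delay=1.0):
    """Call fn(), retrying on exception with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries, surface the error
            # 1s, 2s, 4s, ... plus up to 1s of jitter to avoid thundering herds
            time.sleep(base_delay * (2 ** attempt) + random.random())

# Demo: a flaky call that succeeds on the third attempt.
attempts = {"n": 0}
def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("429 Too Many Requests")
    return "ok"

result = with_backoff(flaky, base_delay=0.01)
```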

A few operational best practices:

  • Always specify input_type. Omitting it works but produces lower-quality embeddings. Use "document" for stored data, "query" for search queries.
  • Monitor embedding staleness. Track the ratio of documents with embeddingStale: true versus total documents. If it grows faster than your scheduled trigger can process, increase frequency or batch size.
  • Use the hybrid strategy. Real-time triggers handle the common case (individual inserts/updates). Scheduled triggers handle the edge cases (bulk imports, missed updates, recovery from failures). Together, they provide both freshness and reliability.
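Measuring that staleness ratio takes only two counts. A sketch — the staleness_ratio wrapper is hypothetical, and the commented queries reuse the article's field names:

```python
def staleness_ratio(stale_count, total_count):
    """Fraction of documents whose embeddings are missing or stale."""
    return 0.0 if total_count == 0 else stale_count / total_count

# With a live collection, the counts would come from:
# stale = collection.count_documents(
#     {"$or": [{"embedding": {"$exists": False}}, {"embeddingStale": True}]})
# total = collection.estimated_document_count()
ratio = staleness_ratio(250, 10_000)
```

If the ratio trends upward between scheduled runs, the trigger isn't keeping pace: raise its frequency, its DOC_LIMIT, or both.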

The complete architecture

Putting it all together, the production pipeline looks like this:

  1. Initial load: Python batch script generates embeddings for all existing documents and writes them back via bulk_write.
  2. Index creation: Atlas Vector Search index on the embedding field with cosine similarity.
  3. Real-time sync: Database Trigger regenerates embeddings on insert/update, filtered to avoid infinite loops.
  4. Scheduled catch-up: Scheduled Trigger reprocesses any documents flagged as stale or missing embeddings.
  5. Query time: $vectorSearch aggregation with input_type="query" for the user's search term.

No external infrastructure. No separate vector database. No sync jobs between systems. The entire pipeline runs on MongoDB Atlas — embeddings, indexes, triggers, and queries — which means one platform to monitor, one set of credentials to manage, and one billing relationship.

That's the part that matters most in production. Not whether the demo works, but whether the system stays correct at 3 AM when a bulk import job runs and 50,000 documents need new embeddings.


This article is a companion to Why Your Search Returns Nothing, which covers the conceptual case for vector search over keyword matching.

Tags: mongodb, vector-search, embeddings, voyage-ai, atlas-triggers