Docs Menu
Docs Home
/ /
Atlas Architecture Center
/ / /

Building Continuously Updating RAG Applications

Use MongoDB Atlas native stream processing and vector search to continuously update, store, and search embeddings through a unified interface.

Use cases: Gen AI

Industries: Finance, Healthcare, Retail

Products: Atlas, Atlas Vector Search, Atlas Stream Processing

Partners: Confluent, AWS

Providing AI models with up-to-date data is essential to delivering a differentiated experience. Retrieval-augmented generation (RAG) systems enable organizations to ground large language models (LLMs) and other foundational models in the truth of their proprietary data. However, maintaining the freshness of the underlying data introduces a layer of complexity. To ensure models provide accurate answers, it is essential to continuously update the vector embeddings that form the core of RAG systems to represent the latest information available.

Furthermore, the choice of embedding model impacts the quality of AI outputs due to how different models are optimized for varying purposes and data types. For example, an embedding model trained on a particular language will create more contextually appropriate embeddings for that language than a general-purpose model trained across many languages.

By leveraging MongoDB Atlas' native Stream Processing and Vector Search capabilities, this solution addresses this issue of continuously updating and routing vector embeddings in a RAG system. With this solution, developers can continuously update, store, and search embeddings within a single interface.

This solution is relevant to many industries and use cases, including:

  • Financial services: Financial documents, legal policies, and contracts often use multiple languages and differ based on country regulations. Empowering loan officers with an AI-powered interface using relevant and fresh data for expediting loan creation can optimize banking workflows.

  • Healthcare and Insurance: From constantly updating patient records to AI-powered underwriting of insurance policies, it’s important that any RAG system that optimizes these processes has access to the latest information.

  • Retail: Personalizing retail experiences for customers is critical. However, consider the many languages that shoppers might use and product descriptions that have to match. Routing up-to-date, contextual data to the most accurate embedding model can improve these experiences.

  • MongoDB Atlas Cluster: Enables the flexible storage of various data types including text, associated metadata, and corresponding vector embeddings in documents. Atlas's vector index capability directly supports efficient semantic search queries within the database, which can be leveraged through the MongoDB Aggregation Framework.

  • Atlas Stream Processing: Subscribes to the event streams generated by MongoDB, filters relevant information, transforms events, and emits them to the corresponding Kafka topic. It also subscribes to the Kafka cluster to process updates and propagate changes back to the database.

  • Confluent Kafka Cluster: Receives document updates and new documents from producers and makes them available for further processing by Atlas Stream Processing.

  • Metadata Service:

    • Embedding Generator: Python script that subscribes to the Kafka input topics. For each message received, it generates an embedding using a specialized machine learning model.

    • Tags Extractor: Python script that analyzes incoming data to identify relevant structured metadata to enrich the document for indexing, search, or analysis.

Scalable vector updates reference architecture with MongoDB
click to enlarge

Figure 1. Scalable vector updates reference architecture with MongoDB

In the demo solution, the data model is a collection of documents that encapsulate all relevant information about a song. This approach leverages the flexibility of the document data model to store diverse data types alongside their embeddings, allowing for easy and fast retrieval.

The sample data has two datasets available for import: archive_lyrics_small1 and archive_lyrics_small2. The documents in these datasets have the following structure:

{
"title": "Hurricane",
"artist": "Bob Dylan",
"year": 1976,
"lyrics": "...",
"language": "en",
"genre": "rock",
"duration": 61,
"lyrics_embeddings_en": [...],
"tags": ["man", "story", "night"] // only in archive_lyrics_small1
}

The relevant data fields are:

  • lyrics_embeddings_en/lyrics_embeddings_es: Language-specific lyrics embedding vector

  • tags: In the archive_lyrics_small1 dataset, list of most common words in the lyrics

The GitHub repository contains detailed instructions on how to build the solution to update your embeddings asynchronously and at scale, leveraging MongoDB Atlas.

1

Clone the repository, set up a virtual environment, and install necessary dependencies.

2

Follow the instructions in the Confluent documentation https://docs.confluent.io/cloud/current/clusters/create-cluster.html#create-ak-clusters to create a Kafka Cluster.

Copy your bootstrap URL.

Kafka cluster settings
click to enlarge

Figure 2. Kafka cluster settings

Create an API key to connect to your cluster.

API key settings
click to enlarge

Figure 3. API key settings

Configure the topics SpanishInputTopic, EnglishInputTopic, and OutputTopic in Confluent.

Topic settings
click to enlarge

Figure 4. Topic settings

3

Configure a new connection in Atlas Stream Processing to connect the instance with the Kafka Cluster.

Use the Confluent bootstrap URL in the connection Registry.

Stream Processing connection registry settings
click to enlarge

Figure 5. Stream Processing connection registry settings

Connect the Atlas Stream Processing Instance to the Atlas cluster.

Stream Processing to Atlas settings
click to enlarge

Figure 6. Stream Processing to Atlas settings

4

To configure the pipelines and connections in the Stream Processing Instance, you can connect to the cluster using the Mongo Shell (mongosh).

When clicking on the Connect button in the Stream Processing Instance, the Atlas UI provides instructions on connecting to the instance.

Connect to Stream Processing
click to enlarge

Figure 7. Connect to Stream Processing

5

You can follow the steps to configure Atlas Stream Processing in the README file in the GitHub repo. There you will learn how to create the pipelines to subscribe to changes in MongoDB, emit to each language-specific topic, and merge the events containing the processed data with the embeddings received from the Kafka cluster into MongoDB using a MongoDB aggregation stage.

6

Next, you will create language-specific vector indexes in Atlas Search.

Visit the Atlas Vector Search Quick Start guide and start building smarter searches.

The definition for the Atlas Vector Search Index for Spanish is as follows:

{
"fields": [
{
"type": "vector",
"path": "lyrics_embeddings_es",
"numDimensions": 768,
"similarity": "cosine"
}
]
}

The definition for the Atlas Vector Search Index for English is as follows:

{
"fields": [
{
"type": "vector",
"path": "lyrics_embeddings_en",
"numDimensions": 384,
"similarity": "cosine"
}
]
}
7

The metadata service is a Python script that will subscribe to the input topics, create the tags and embeddings for the corresponding language according to the information received in the event, and write the event to the output topic.

8

We created a script in Python to help you interactively run semantic queries. You can find the script in the repository under the client folder.

  • Maintain embedding relevancy: Regularly update data embeddings to ensure your semantic searches remain accurate, especially if your documents change frequently.

  • Optimize language-model pairing: To maximize semantic search accuracy, ensure your large language model (LLM) closely aligns with the language of your data to significantly enhance the relevance and precision of your search results.

  • Embrace flexible embeddings: MongoDB's flexible data model eliminates the need for rigid schema definitions. This flexibility allows you to store embeddings directly alongside your data, regardless of their length or the model used to generate them.

  • Choose the right similarity function: The effectiveness of your semantic searches depends on the chosen similarity function. Tailor your selection to your specific use case.

  • Asynchronous embedding generation: Generating embeddings can be computationally expensive. Consider running this task asynchronously to avoid impacting your application's performance. Leverage the cloud's elasticity by horizontally scaling the functions responsible for embedding generation to handle bursts in workload.

To learn more about the products and technologies in this example, see the associated links below.

  • Confluent Cloud

  • AWS EC2

David Sanchez, MongoDB

Back

Optimizing Generative AI Applications with Fireworks AI and MongoDB for Peak Performance

On this page