How to Implement Databricks Workflows and Atlas Vector Search for Enhanced Ecommerce Search Accuracy

Vittal Pai, Ashwin Gangadhar, Francesco Baldissera • 6 min read • Published Sep 18, 2024 • Updated Sep 18, 2024
Node.js • Atlas • Vector Search • Python
In the vast realm of ecommerce, customers' ability to quickly and accurately search through an extensive range of products is paramount. Atlas Vector Search is emerging as a game changer in this space, offering a refined approach to search that goes beyond mere keyword matching. Let's delve into its implementation using MongoDB Atlas, Atlas Vector Search, and Databricks.

Prerequisites

In a previous tutorial, Learn to Build AI-Enhanced Retail Search Solutions with MongoDB and Databricks, we showed how integrating MongoDB and Databricks provides a comprehensive solution for the retail industry, combining real-time data processing, workflow orchestration, machine learning, custom data functions, and advanced search capabilities to optimize product catalog management and enhance customer interactions.
In this tutorial, we’ll build the vector search solution on top of the codebase from the previous tutorial. Please check out the GitHub repository for the full solution.
The diagram below represents the Databricks workflow for indexing data from the atp (available to promise), images, prd_desc (product discount), prd_score (product score), and price collections. These collections are also part of the previously mentioned tutorial, so please refer back if you need to access them.
Within the MongoDB Atlas platform, we can use change streams and the MongoDB Connector for Spark to move data from the collections into a new collection called Catalog. From there, we will use a text transformer to create the Catalog Final Collection. This will enable us to create a corpus of indexed and vector embedded data that will be used later as the search dictionary. We’ll call this collection catalog_final_myn. This will be shown further along after we embed the product names.
The catalog final collection will include the available to promise status for each product, its images, the product discount, product relevance score, and price, along with the vectorized or embedded product name that we’ll point our vector search engine at.
The image below shows what the Databricks workflow looks like. It consists of two jobs, each implemented in its own notebook. We’ll go over both notebooks below.
Overview of a Databricks job, including two pipelines to ingest data from MongoDB collections, transform that data, and vectorize it using a text transformer model

Indexing and merging several collections into one catalog

The first step is to ingest data from the previously mentioned collections with Spark Structured Streaming’s spark.readStream, using the MongoDB Connector for Spark as the source. The code below is part of the notebook we’ll set as a job using Databricks Workflows. You can learn more about Databricks notebooks by following their tutorial.
atp = spark.readStream.format("mongodb").\
    option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', "search").\
    option('spark.mongodb.collection', "atp_status_myn").\
    option('spark.mongodb.change.stream.publish.full.document.only','true').\
    option('spark.mongodb.aggregation.pipeline',[]).\
    option("forceDeleteTempCheckpointLocation", "true").load()

atp = atp.drop("_id")

atp.writeStream.format("mongodb").\
    option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', "search").\
    option('spark.mongodb.collection', "catalog_myn").\
    option('spark.mongodb.operationType', "update").\
    option('spark.mongodb.upsertDocument', True).\
    option('spark.mongodb.idFieldList', "id").\
    option("forceDeleteTempCheckpointLocation", "true").\
    option("checkpointLocation", "/tmp/retail-atp-myn4/_checkpoint/").\
    outputMode("append").\
    start()
This part of the notebook reads data changes from the atp_status_myn collection in the search database, drops the _id field, and then writes (or updates) the processed data to the catalog_myn collection in the same database.
Notice how it’s reading from the atp_status_myn collection, which already contains the one-hot encoding (a boolean flag indicating whether the product is available) from the previous tutorial. This way, we make sure that we only embed the data from the products that are available in our stock.
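For illustration only, a document in atp_status_myn might look roughly like the following Python dict; the id field name and value are assumptions for this sketch, and only the atp flag matters here:

# Illustrative document shape only -- not the exact schema from the previous tutorial
sample_atp_status_doc = {
    "id": "product-123",   # product identifier (assumed field name)
    "atp": 1               # one-hot availability flag: 1 = in stock, 0 = out of stock
}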
Please refer to the full notebook in our GitHub repository if you want to learn more about all the data ingestion and transformations conducted during this stage.

Encoding text as vectors and building the final catalog collection

Using a combination of Python libraries and PySpark operations, we’ll process data from the Catalog MongoDB collection, transform and vectorize it, and write the transformed data back to the Final Catalog collection. On top of this, we’ll build our application’s search business logic.
We start by using the %pip magic command, which is supported in Databricks notebooks as well as Jupyter and IPython environments. The necessary packages are:
  • pymongo: A Python driver for MongoDB.
  • tqdm: A library to display progress bars.
  • sentence-transformers: A library for state-of-the-art sentence, text, and image embeddings.
First, let’s use pip to install these packages in our Databricks notebook:
%pip install pymongo tqdm sentence-transformers
We continue the notebook with the following code:
from sentence_transformers import SentenceTransformer

model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
Here we load a pre-trained model from the sentence-transformers library. This model will be used to convert text into embeddings or vectors.
The next step is to bring in the data from the catalog collection in the search database in MongoDB Atlas. This is a continuation of the same notebook:
catalog_status = spark.readStream.format("mongodb").\
    option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', "search").\
    option('spark.mongodb.collection', "catalog_myn").\
    option('spark.mongodb.change.stream.publish.full.document.only','true').\
    option('spark.mongodb.aggregation.pipeline',[]).\
    option("forceDeleteTempCheckpointLocation", "true").load()
With this code, we set up a structured streaming read from the catalog_myn collection in the search database of MongoDB. The resulting data is stored in the catalog_status DataFrame in Spark. The read operation is configured to fetch the full document from MongoDB's change stream and does not apply any aggregation.
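The transformation code below applies a get_vec function to embed the product titles. Its definition isn’t shown in this excerpt, so here is a minimal sketch of how such a UDF could wrap the sentence-transformers model loaded earlier; the exact signature and return type are assumptions rather than code from the repository:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType

# Minimal sketch (assumption): wrap the model loaded above in a Spark UDF
# so it can be applied to a DataFrame column. all-MiniLM-L6-v2 produces
# 384-dimensional embeddings, matching the index definition later in this tutorial.
@F.udf(returnType=ArrayType(FloatType()))
def get_vec(text):
    return model.encode(text).tolist()

In practice, a pandas UDF that encodes titles in batches would likely be more efficient, but the idea is the same.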
The notebook code continues with:
# Calculate a new column called discountedPrice using pyspark.sql.functions (imported as F)
catalog_status = catalog_status.withColumn("discountedPrice", F.col("price") * F.col("pred_price"))

# One-hot encoding of the atp (availability) column
catalog_status = catalog_status.withColumn("atp", (F.col("atp").cast("boolean") & F.lit(1).cast("boolean")).cast("integer"))

# Embed the product titles with the get_vec UDF
catalog_status = catalog_status.withColumn("vec", get_vec("title"))

# Drop the _id column and write to the final catalog collection with checkpointing
catalog_status = catalog_status.drop("_id")
catalog_status.writeStream.format("mongodb").\
    option('spark.mongodb.connection.uri', MONGO_CONN).\
    option('spark.mongodb.database', "search").\
    option('spark.mongodb.collection', "catalog_final_myn").\
    option('spark.mongodb.operationType', "update").\
    option('spark.mongodb.idFieldList', "id").\
    option("forceDeleteTempCheckpointLocation", "true").\
    option("checkpointLocation", "/tmp/retail-atp-myn5/_checkpoint/").\
    outputMode("append").\
    start()
With this last part of the code, we calculate a new column called discountedPrice as the product of the price and pred_price (predicted price) columns. Then, we perform one-hot encoding on the atp status column, vectorize the title of the product, and merge everything back into the final catalog collection.
Now that we have our catalog collection with its proper embeddings, it’s time for us to build the Vector Search Index using MongoDB Atlas Search.

Configuring the Atlas Vector Search index

Here we’ll define how data should be stored and indexed for efficient searching. To configure the index, you can insert the snippet in MongoDB Atlas by browsing to your cluster splash page and clicking on the “Search” tab:
Overview of the Search configuration panel for MongoDB Atlas collections
Next, click “Create Index” and make sure you select “JSON Editor”:
Overview of the JSON Editor functionality in the Search Configuration panel for MongoDB Atlas
Paste the JSON snippet below into the JSON Editor. Make sure you select the correct database and collection! In our case, the collection name is catalog_final_myn. Please refer to the full code in the repository to see what the complete index looks like and how to bring it together with the rest of the parameters for the AI-enhanced search tutorial.
{
  "mappings": {
    "fields": {
      "vec": [
        {
          "dimensions": 384,
          "similarity": "cosine",
          "type": "knnVector"
        }
      ]
    }
  }
}
In the code above, the vec field is of type knnVector, designed for vector search. It indicates that each vector has 384 dimensions and uses cosine similarity to determine vector closeness. This is crucial for semantic search, where the goal is to find results that are contextually or semantically related.
These indexing parameters speed up retrieval, which matters especially for high-dimensional vector data: raw vectors can consume a significant amount of storage, and similarity calculations are computationally expensive.
Instead of comparing a query vector with every vector in the dataset, the index lets the system compare against only a subset, saving computational resources.
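To illustrate how the application can use this index at query time, here is a minimal sketch (not code from the repository) that embeds a user query with the same sentence-transformers model and runs a k-nearest-neighbor search against the vec field using pymongo and the Atlas Search knnBeta operator. The index name, connection string placeholder, and projected fields are assumptions:

from pymongo import MongoClient
from sentence_transformers import SentenceTransformer

client = MongoClient("<your-atlas-connection-string>")  # placeholder connection string
collection = client["search"]["catalog_final_myn"]

# Embed the query with the same model used to embed the product titles
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
query_vector = model.encode("tan bags").tolist()

pipeline = [
    {
        "$search": {
            "index": "default",              # assumed index name
            "knnBeta": {
                "vector": query_vector,      # 384-dimensional query embedding
                "path": "vec",
                "k": 10
            }
        }
    },
    {"$project": {"title": 1, "price": 1, "discountedPrice": 1,
                  "score": {"$meta": "searchScore"}}}
]

results = list(collection.aggregate(pipeline))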

A quick example of improved search results

Browse over to our LEAFYY ecommerce website and search for the keywords “tan bags.” You’ll get these results:
Search engine results page after running a simple query on an Ecommerce storefront powered with MongoDB Atlas Search
At first, you’ll get results that match the tokenized keywords “tan” and “bags”: any product whose catalog document contains either or both of those keywords.
However, not all the results are bags or of tan color. You can see shoes, wallets, a dress, and a pair of pants. This could frustrate customers and prompt them to leave the site.
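Under the hood, this keyword experience corresponds roughly to a standard Atlas Search text query on the product title. A sketch of such a query is below; the index name and searchable field are assumptions, and the full index from the previous tutorial would need the corresponding text mappings:

# Reuses the collection handle from the earlier pymongo sketch
keyword_pipeline = [
    {
        "$search": {
            "index": "default",      # assumed index name
            "text": {
                "query": "tan bags",
                "path": "title"      # assumed searchable field
            }
        }
    },
    {"$limit": 10}
]
keyword_results = list(collection.aggregate(keyword_pipeline))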
Now, enable vector search by clicking the checkbox to the left of the magnifying glass icon in the search bar, and re-run the query “tan bags”. The results are shown in the image below:
Search engine results page after running a vector search query on an ecommerce storefront powered with MongoDB Atlas Search
As you can see from the screenshot, the results became more relevant for a consumer. Our search engine identifies similar products by understanding that “beige” is a color similar to “tan”, and therefore showcases these products as alternatives.

Conclusion

By working with MongoDB Atlas and Databricks, we can create real-time data transformation pipelines. We achieve this by leveraging the MongoDB Connector for Spark to prepare our operational data for vectorization, and store it back into our MongoDB Atlas collections. This approach allows us to develop the search logic for our Ecommerce app with minimal operational overhead.
On top of that, Atlas Vector Search provides a robust solution for implementing advanced search features, making it easy to deliver a great search user experience for your customers. By understanding and integrating these tools, developers can create search experiences that are fast, relevant, and user-friendly.
Make sure to review the full code in our GitHub repository. Contact us to get a deeper understanding of how to build advanced search solutions for your Ecommerce business.
