How to Implement Databricks Workflows and Atlas Vector Search for Enhanced Ecommerce Search Accuracy
Vittal Pai, Ashwin Gangadhar, Francesco Baldissera6 min read • Published Sep 18, 2024 • Updated Sep 18, 2024
Rate this tutorial
In the vast realm of Ecommerce, customers' ability to quickly and accurately search through an extensive range of products is paramount. Atlas Vector Search is emerging as a turning point in this space, offering a refined approach to search that goes beyond mere keyword matching. Let's delve into its implementation using MongoDB Atlas, Atlas Vector Search, and Databricks.
- GitHub repo for AI-enhanced search and vector search (code is bundled up for clarity)
In a previous tutorial, Learn to Build AI-Enhanced Retail Search Solutions with MongoDB and Databricks, we showcased how the integration of MongoDB and Databricks provides a comprehensive solution for the retail industry by combining real-time data processing, workflow orchestration, machine learning, custom data functions, and advanced search capabilities as a way to optimize product catalog management and enhance customer interactions.
In this tutorial, we are going to be building the Vector Search solution on top of the codebase from the previous tutorial. Please check out the Github repository for the full solution.
The diagram below represents the Databricks workflow for indexing data from the atp (available to promise), images, prd_desc (product discount), prd_score (product score), and price collections. These collections are also part of the previously mentioned tutorial, so please refer back if you need to access them.
Within the MongoDB Atlas platform, we can use change streams and the MongoDB Connector for Spark to move data from the collections into a new collection called Catalog. From there, we will use a text transformer to create the
Catalog Final Collection
. This will enable us to create a corpus of indexed and vector embedded data that will be used later as the search dictionary. We’ll call this collection catalog_final_myn
. This will be shown further along after we embed the product names.The catalog final collection will include the available to promise status for each product, its images, the product discount, product relevance score, and price, along with the vectorized or embedded product name that we’ll point our vector search engine at.
With the image below, we explain what the Databricks workflow looks like. It consists of two jobs that are separated in two notebooks respectively. We’ll go over each of the notebooks below.
The first step is to ingest data from the previously mentioned collections using the spark.readStream method from the MongoDB Connector for Spark. The code below is part of the notebook we’ll set as a job using Databricks Workflows. You can learn more about Databricks notebooks by following their tutorial.
1 atp = spark.readStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "atp_status_myn").\ option('spark.mongodb.change.stream.publish.full.document.only','true').\ option('spark.mongodb.aggregation.pipeline',[]).\ option("forceDeleteTempCheckpointLocation", "true").load() atp = atp.drop("_id") atp.writeStream.format("mongodb").\ option('spark.mongodb.connection.uri', MONGO_CONN).\ option('spark.mongodb.database', "search").\ option('spark.mongodb.collection', "catalog_myn").\ option('spark.mongodb.operationType', "update").\ option('spark.mongodb.upsertDocument', True).\ option('spark.mongodb.idFieldList', "id").\ option("forceDeleteTempCheckpointLocation", "true").\ option("checkpointLocation", "/tmp/retail-atp-myn4/_checkpoint/").\ outputMode("append").\ start()
This part of the notebook reads data changes from the atp_status_myn collection in the search database, drops the _id field, and then writes (or updates) the processed data to the catalog_myn collection in the same database.
Notice how it’s reading from the
atp_status_myn
collection, which already has the one hot encoding (boolean values if the product is available or not) from the previous tutorial. This way, we make sure that we only embed the data from the products that are available in our stock.Please refer to the full notebook in our Github repository if you want to learn more about all the data ingestion and transformations conducted during this stage.
Using a combination of Python libraries and PySpark operations to process data from the Catalog MongoDB collection, we’ll transform it, vectorize it, and write the transformed data back to the Final Catalog collection. On top of this, we’ll build our application search business logic.
We start by using the %pip magic command, which is specific to Jupyter notebooks and IPython environments. The necessary packages are:
- pymongo: A Python driver for MongoDB.
- tqdm: A library to display progress bars.
- sentence-transformers: A library for state-of-the-art sentence, text, and image embeddings.
First, let’s use pip to install these packages in our Databricks notebook:
1 %pip install pymongo tqdm sentence-transformers
We continue the notebook with the following code:
1 model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
Here we load a pre-trained model from the sentence-transformers library. This model will be used to convert text into embeddings or vectors.
The next step is to bring the data from the MongoDB Atlas catalog and search collections. This as a continuation of the same notebook:
1 catalog_status = spark.readStream.format("mongodb").\ 2 option('spark.mongodb.connection.uri', MONGO_CONN).\ 3 option('spark.mongodb.database', "search").\ 4 option('spark.mongodb.collection', "catalog_myn").\ 5 option('spark.mongodb.change.stream.publish.full.document.only','true').\ 6 option('spark.mongodb.aggregation.pipeline',[]).\ 7 option("forceDeleteTempCheckpointLocation", "true").load()
With this code, we set up a structured streaming read from the
catalog_myn
collection in the search
database of MongoDB. The resulting data is stored in the catalog_status
DataFrame in Spark. The read operation is configured to fetch the full document from MongoDB's change stream and does not apply any aggregation.The notebook code continues with:
1 #Calculating new column called discounted price using the F decorator 2 3 catalog_status = catalog_status.withColumn("discountedPrice", F.col("price") * F.col("pred_price")) 4 5 #One hot encoding of the atp_status column 6 7 catalog_status = catalog_status.withColumn("atp", (F.col("atp").cast("boolean") & F.lit(1).cast("boolean")).cast("integer")) 8 9 #Running embeddings of the product titles with the get_vec function 10 11 catalog_status.withColumn("vec", get_vec("title")) 12 13 #Dropping _id column and creating a new final catalog collection with checkpointing 14 15 catalog_status = catalog_status.drop("_id") 16 catalog_status.writeStream.format("mongodb").\ 17 option('spark.mongodb.connection.uri', MONGO_CONN).\ 18 option('spark.mongodb.database', "search").\ 19 option('spark.mongodb.collection', "catalog_final_myn").\ 20 option('spark.mongodb.operationType', "update").\ 21 option('spark.mongodb.idFieldList', "id").\ 22 option("forceDeleteTempCheckpointLocation", "true").\ 23 option("checkpointLocation", "/tmp/retail-atp-myn5/_checkpoint/").\ 24 outputMode("append").\ 25 start()
With this last part of the code, we calculate a new column called discountedPrice as the product of the predicted price. Then, we perform one-hot encoding on the atp status column, vectorize the title of the product, and merge everything back into a final catalog collection.
Now that we have our catalog collection with its proper embeddings, it’s time for us to build the Vector Search Index using MongoDB Atlas Search.
Here we’ll define how data should be stored and indexed for efficient searching. To configure the index, you can insert the snippet in MongoDB Atlas by browsing to your cluster splash page and clicking on the “Search” tab:
Next, you can click over “Create Index.” Make sure you select “JSON Editor”:
Paste the JSON snippet from below into the JSON Editor. Make sure you select the correct database and collection! In our case, the collection name is
catalog_final_myn
. Please refer to the full code in the repository to see how the full index looks and how you can bring it together with the rest of parameters for the AI-enhanced search tutorial.1 { 2 "mappings": { 3 "fields": { 4 "vec": [ 5 { 6 "dimensions": 384, 7 "similarity": "cosine", 8 "type": "knnVector" 9 } 10 ] 11 } 12 } 13 }
In the code above, the vec field is of type knnVector, designed for vector search. It indicates that each vector has 384 dimensions and uses cosine similarity to determine vector closeness. This is crucial for semantic search, where the goal is to find results that are contextually or semantically related.
By implementing these indexing parameters, we speed up retrieval times. Especially with high-dimensional vector data, as raw vectors can consume a significant amount of storage and reduce the computational cost of operations like similarity calculations.
Instead of comparing a query vector with every vector in the dataset, indexing allows the system to compare with a subset, saving computational resources.
Browse over to our LEAFYY Ecommerce website, in which we will perform a search for the keywords
tan bags
. You’ll get these results:As you can see, you’ll first get results that match the specific tokenized keywords “tan” and “bags”. As a result, this will give you any product that contains any or both of those keywords in the product catalog collection documents.
However, not all the results are bags or of tan color. You can see shoes, wallets, a dress, and a pair of pants. This could be frustrating as a customer, prompting them to leave the site.
Now, enable vector search by clicking on the checkbox on the left of the magnifying glass icon in the search bar, and re-run the query “tan bags”. The results you get are in the image below:
As you can see from the screenshot, the results became more relevant for a consumer. Our search engine is able to identify similar products by understanding the context that “beige” is a similar color to “tan”, and therefore showcase these products as alternatives.
By working with MongoDB Atlas and Databricks, we can create real-time data transformation pipelines. We achieve this by leveraging the MongoDB Connector for Spark to prepare our operational data for vectorization, and store it back into our MongoDB Atlas collections. This approach allows us to develop the search logic for our Ecommerce app with minimal operational overhead.
On top of that, Atlas Vector Search provides a robust solution for implementing advanced search features, making it easy to deliver a great search user experience for your customers. By understanding and integrating these tools, developers can create search experiences that are fast, relevant, and user-friendly.
Make sure to review the full code in our GitHub repository. Contact us to get a deeper understanding of how to build advanced search solutions for your Ecommerce business.
Related
Tutorial
How to Query from Multiple MongoDB Databases Using MongoDB Atlas Data Federation
Jan 23, 2024 | 7 min read
Tutorial
Getting Started With MongoDB Atlas Serverless, AWS CDK, and AWS Serverless Computing
Aug 09, 2024 | 18 min read