Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Index Anything, Search Everything: Scalable Vector Search with Replicate AI, MongoDB, and Hookdeck

Phil Leggetter20 min read • Published Dec 16, 2024 • Updated Dec 16, 2024
Atlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
In this tutorial, we'll build a Flask application allowing users to index and search anything on the internet with a publicly accessible URL. That's right! Ask the app to index an MP3 or WAV file, an HTML or text file, or a MOV or MP4 file, and it will use the power of Replicate AI to create a textual representation of that file, and the app stores the results in MongoDB Atlas. As long as an LLM can analyze the resource and create a textual representation, it can be indexed. Then, all those indexed files, no matter the originating file type, can be searched with text using MongoDB Atlas.
We'll use Hookdeck, an event gateway that provides infrastructure and tooling to build and manage event-driven applications by seamlessly handling webhooks and API requests. In this tutorial, Hookdeck ensures reliable, scalable communication between the Flask application and Replicate, an LLM API platform, by acting as a serverless queue for rate-limiting API calls and guaranteeing webhook delivery.
We'll begin by setting up the required services and getting the Flask application up and running. Then, we'll follow the journey of data through key components and code within the app, covering submitting the indexing request, analyzing the content type, generating a textual representation, and generating and storing a vector embedding. The content is ultimately made available for search within a vector search index.

Visão geral da arquitetura

Scalability is often overhyped, but it remains an important aspect of building robust applications. One of the benefits of using serverless and cloud-hosted providers is the ability to offload work to specialized services. Also required in any scalable architecture is a way of ensuring services aren't overloaded and your application is fault-tolerant. In this tutorial, we leverage several such services to handle different aspects of our application.
First, let's take a look at the services:
  • Replicate: Provides open-source machine learning models accessible via an API
  • MongoDB Atlas: An integrated suite of data services centered around a cloud database designed to accelerate and simplify how you build with data
  • Hookdeck: An event gateway that provides engineering teams with infrastructure and tooling to build and manage event-driven applications
Next, let's see how the services are used.
All The Things Architecture
  • MongoDB Atlas: MongoDB Atlas provides database storage and vector search capabilities, ensuring our data is stored efficiently and queried quickly.
  • Index All The Things: This is the Flask application.
  • Hookdeck: Hookdeck acts as a serverless queue for a) ensuring Replicate API requests do not exceed rate limits and can be retried and b) ingestion, delivery, and retrying webhooks from Replicate to ensure reliable ingestion of events. Note: We'll also use the Hookdeck CLI to receive webhooks in our local development environment.
  • Replicate: Replicate handles AI inference, produces text and embeddings, and allows us to offload the computationally intensive tasks of running machine learning models. We use different LLMs to analyze different content types.
By utilizing these cloud-based services, we can focus on building the core functionality of our application while ensuring it remains scalable and efficient. Webhooks, in particular, allow for scalability by enabling asynchronous AI workflows, offloading those high compute usage scenarios to the third-party services, and just receiving callbacks via a webhook when work is completed.

Pré-requisitos

Before you begin, ensure you have the following:

Get the app up and running

Let's begin by getting the application running and seeing it in action.

Get the code

Begin by getting the application codebase.
1git clone https://github.com/hookdeck/index-all-the-things.git
Activate a virtual environment with Poetry:
1poetry shell
And install the app dependencies:
1poetry install

Configure the app

The application needs credentials for the services it interacts with.
Copy the example .env-example file to a new .env file:
1cp .env-example .env
Update the values within .env as follows:
  • SECRET_KEY: A secret key that will be used for securely signing the session cookie. See the SECRET_KEY Flask docs for more information.
  • MONGODB_CONNECTION_URI: Populate with a MongoDB Atlas connection string with a Read and write to any database role. See the Get Connection String docs.
  • HOOKDECK_PROJECT_API_KEY: Get an API Key from the Project -> Settings -> Secrets section of the Hookdeck dashboard.
  • HOOKDECK_WEBHOOK_SECRET: Get a Signing Secret from the Project -> Settings -> Secrets section of the Hookdeck dashboard.
  • REPLICATE_API_TOKEN: Create an API token in the Replicate dashboard.
  • REPLICATE_WEBHOOKS_SECRET: Go to the Webhooks section of the Replicate dashboard and click the Show signing key button.
  • HOOKDECK_REPLICATE_API_QUEUE_API_KEY, HOOKDECK_REPLICATE_API_QUEUE_URL, AUDIO_WEBHOOK_URL and EMBEDDINGS_WEBHOOK_URL will be automatically populated in the next step.

Create Hookdeck connections

Hookdeck connections are used to route inbound HTTP requests received by a Hookdeck source to a Hookdeck destination.
The create-hookdeck-connections.py script automatically creates the following Hookdeck connections that:
  1. Route requests made to Hookdeck URLs to the locally running application via the Hookdeck CLI. Here, Hookdeck is used as an inbound queue.
  2. Route requests made to a Hookdeck URL to the Replicate API. Hookdeck is used as an outbound queue, in this situation.
The script also updates the .env file with the source URLs that handle the webhooks. Let's go through the details of the script.
First, ensure you have the necessary imports and define the authentication and content type headers for the Hookdeck API request:
1import httpx
2import re
3import hashlib
4import os
5
6from config import Config
7
8headers = {
9 "Authorization": f"Bearer {Config.HOOKDECK_PROJECT_API_KEY}",
10 "Content-Type": "application/json",
11}
Next, define a function to create a connection to the Hookdeck API:
1def create_connection(payload):
2 response = httpx.request(
3 "PUT",
4 "https://api.hookdeck.com/latest/connections",
5 headers=headers,
6 json=payload,
7 )
8 data = response.json()
9
10 if response.status_code != 200:
11 raise Exception(f"Failed to create connection: {data}")
12
13 return data
This function makes a PUT request to the Hookdeck API with the upsert connection payload and handles the response. If the response status is not 200 (OK), an exception is raised. The function returns the parsed JSON response.
The first connection to be created is one for the Replicate API outbound queue:
1replicate_api_queue_api_key = hashlib.sha256(os.urandom(32)).hexdigest()
2replicate_api_queue = {
3 "name": "replicate-api-queue",
4 "source": {
5 "name": "replicate-api-queue",
6 "verification": {
7 "type": "API_KEY",
8 "configs": {
9 "header_key": Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME,
10 "api_key": replicate_api_queue_api_key,
11 },
12 },
13 },
14 "rules": [
15 {
16 "type": "retry",
17 "strategy": "exponential",
18 "count": 5,
19 "interval": 30000,
20 "response_status_codes": ["429", "500"],
21 }
22 ],
23 "destination": {
24 "name": "replicate-api",
25 "url": "https://api.replicate.com/v1/",
26 "auth_method": {
27 "type": "BEARER_TOKEN",
28 "config": {
29 "token": Config.REPLICATE_API_TOKEN,
30 },
31 },
32 },
33}
34
35replicate_api_connection = create_connection(replicate_api_queue)
The Connection has a name, a source, and a destination. The source also has a name and a verification. The verification instructs Hookdeck how to authenticate requests. Since the connection is acting as an API queue, we're using the API_KEY type with the header_key set to the value defined in Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME and the api_key value set to the generated hash stored in replicate_api_queue_api_key.
The rules define the request retry strategy to use when interacting with the Replicate API. In this case, we're stating that we should retry up to five times, using an interval of 30000 milliseconds, but apply an exponential back-off retry strategy. Also, we're using the response_status_codes option to inform Hookdeck to only retry on 429 and 500 HTTP responses. See the Hookdeck Retry docs for more information on retries and the Hookdeck Rules docs for details on other types of rules that are available.
The url on the destination is the base URL for the Replicate API. Hookdeck uses path forwarding by default, so any path appended to the Hookdeck source URL will also be appended to the destination URL. For example, a request to a Hookdeck source with URL https://hkdk.events/{id}/predictions will result in a request to a connected destination of https://api.replicate.com/v1/predictions where the destination has a base URL of https://api.replicate.com/v1/. Hookdeck acts very much like a proxy in this scenario.
The auth_method on the destination is of type BEARER_TOKEN with a config.token set to the value of the REPLICATE_API_TOKEN environment variable. This allows Hookdeck to make authenticated API calls to Replicate.
Now, create a connection for the Replicate Audio webhooks to handle audio analysis callbacks:
1replicate_audio = {
2 "name": "replicate-audio",
3 "source": {
4 "name": "replicate-audio",
5 "verification": {
6 "type": "REPLICATE",
7 "configs": {
8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
9 },
10 },
11 },
12 "rules": [
13 {
14 "type": "retry",
15 "count": 5,
16 "interval": 30000,
17 "strategy": "exponential",
18 "response_status_codes": ["!404"],
19 }
20 ],
21 "destination": {
22 "name": "cli-replicate-audio",
23 "cli_path": "/webhooks/audio",
24 },
25}
26
27replicate_audio_connection = create_connection(replicate_audio)
The Replicate Audio webhook callback connection uses a verification of type REPLICATE with a configs.webhook_secret_key value set from the REPLICATE_WEBHOOKS_SECRET value we stored in the .env file. This enables and instructs Hookdeck to verify that the webhook has come from Replicate.
The rules for this inbound connection are similar to the outbound connection and define a delivery retry strategy to follow if any requests to our application's webhook endpoint fail. The only difference is the response_status_codes informs Hookdeck not to retry if it receives a 200 or 404 response.
The destination has a name and a cli_path informing Hookdeck that the destination is the Hookdeck CLI and the path the request should be forwarded to is /webhooks/audio.
Next, create a connection for Replicate Embeddings webhook callbacks:
1replicate_embedding = {
2 "name": "replicate-embedding",
3 "source": {
4 "name": "replicate-embedding",
5 "verification": {
6 "type": "REPLICATE",
7 "configs": {
8 "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
9 },
10 },
11 },
12 "rules": [
13 {
14 "type": "retry",
15 "count": 5,
16 "interval": 30000,
17 "strategy": "exponential",
18 "response_status_codes": ["!200", "!404"],
19 }
20 ],
21 "destination": {
22 "name": "cli-replicate-embedding",
23 "cli_path": "/webhooks/embedding",
24 },
25}
26
27replicate_embedding_connection = create_connection(replicate_embedding)
Finally, update the .env file with some of the generated values:
1# Update .env
2with open(".env", "r") as file:
3 env_content = file.read()
4
5replicate_api_connection_url = replicate_api_connection["source"]["url"]
6audio_webhook_url = replicate_audio_connection["source"]["url"]
7embedding_webhook_url = replicate_embedding_connection["source"]["url"]
8
9# Replace the .env URLs in the .env content
10env_content = re.sub(
11 r"HOOKDECK_REPLICATE_API_QUEUE_API_KEY=.*",
12 f"HOOKDECK_REPLICATE_API_QUEUE_API_KEY={replicate_api_queue_api_key}",
13 env_content,
14)
15env_content = re.sub(
16 r"HOOKDECK_REPLICATE_API_QUEUE_URL=.*",
17 f"HOOKDECK_REPLICATE_API_QUEUE_URL={replicate_api_connection_url}",
18 env_content,
19)
20env_content = re.sub(
21 r"AUDIO_WEBHOOK_URL=.*", f"AUDIO_WEBHOOK_URL={audio_webhook_url}", env_content
22)
23env_content = re.sub(
24 r"EMBEDDINGS_WEBHOOK_URL=.*",
25 f"EMBEDDINGS_WEBHOOK_URL={embedding_webhook_url}",
26 env_content,
27)
28
29with open(".env", "w") as file:
30 file.write(env_content)
31
32print("Connections created successfully!")
This code reads the current .env content, replaces the lines with existing environmental variable placeholders using regular expressions, and writes the updated content back to the .env file. This ensures that the environment variables, such as the webhook URLs, are up-to-date.
Run the script:
1poetry run python create-hookdeck-connections.py
Check your .env file to ensure all values are populated.
Also, navigate to the Connections section of the Hookdeck dashboard and check the visual representation of your connections.
Hookdeck connection in the Hookdeck dashboard

Create MongoDB Atlas indexes

To search a MongoDB database efficiently, you need indexes. For MongoDB vector search, you must create an Atlas Vector Search index. The create-indexes.py script automates the creation and updating of the search indexes in MongoDB using the pymongo library.
First, ensure you have the necessary imports and initialize the database connection:
1from allthethings.mongo import Database
2from pymongo.operations import SearchIndexModel
3
4database = Database()
5collection = database.get_collection()
Database is defined in allthethings/mongo.py and provides utility access to the assets collection in the iaat database, with these string values defined in config.py.
Next, ensure that the required collection exists within the database so that the indexes can be created:
1if collection.name not in collection.database.list_collection_names():
2 print("Creating empty collection so indexes can be created.")
3 collection.database.create_collection(collection.name)
With the collection created, define a function to create or update search indexes:
1def create_or_update_search_index(index_name, index_definition, index_type):
2 indexes = list(collection.list_search_indexes(index_name))
3
4 if len(indexes) == 0:
5 print(f'Creating search index: "{index_name}"')
6 index_model = SearchIndexModel(
7 definition=index_definition,
8 name=index_name,
9 type=index_type,
10 )
11 collection.create_search_index(model=index_model)
12
13 else:
14 print(f'Search index "{index_name}" already exists. Updating.')
15 collection.update_search_index(name=index_name, definition=index_definition)
This function checks if an index with the given index_name already exists. It creates a new search index using the provided definition and type if it does not exist. If it exists, it updates the existing index with the new definition.
Now, create a vector search index for embeddings:
1vector_result = create_or_update_search_index(
2 "vector_index",
3 {
4 "fields": [
5 {
6 "type": "vector",
7 "path": "embedding",
8 "numDimensions": 768,
9 "similarity": "euclidean",
10 }
11 ]
12 },
13 "vectorSearch",
14)
This creates or updates a vector search index named "vector_index" for the embedding field.
Finally, create a search index for the url field, as this is used to determine if a URL has already been indexed:
1create_or_update_search_index(
2 "url_index",
3 {
4 "mappings": {
5 "fields": {
6 "url": {
7 "type": "string",
8 },
9 },
10 }
11 },
12 "search",
13)
14
15print("Indexes created successfully!")
Run the script:
1poetry run python create-indexes.py
Go to the Atlas Search section within the MongoDB Atlas dashboard and check the search indexes have been created.
MongoDB Atlas dashboard Atlas Search indexes

Check the app is working

In one terminal window, run the Flask application:
1poetry run python -m flask --app app --debug run
In a second terminal window, create a localtunnel using the Hookdeck CLI:
1hookdeck listen 5000 '*'
This command listens to all Hookdeck sources connected to a CLI destination, routing webhooks to the application running locally on port 5000.
When you run the command, you will see output similar to the following:
1Listening for events on Sources that have Connections with CLI Destinations
2
3Dashboard
4👉 Inspect and replay events: https://dashboard.hookdeck.com?team_id=tm_{id}
5
6Sources
7🔌 replicate-embedding URL: https://hkdk.events/{id}
8🔌 replicate-audio URL: https://hkdk.events/{id}
9
10Connections
11replicate-embedding -> replicate-embedding forwarding to /webhooks/embedding
12replicate-audio -> replicate-audio forwarding to /webhooks/audio
13
14> Ready! (^C to quit)
Open localhost:5000 in your web browser to ensure the Flask app is running.
Index All the The Things app

Submit content for analysis and indexing

With the app running, it's time to submit an asset for indexing.
Click Bruce (mp3) under the Examples header to populate the in-app search bar with a URL and click Submit.
URL submitted for indexing
Submitting the form sends the URL to a /process endpoint as a POST request. Let's walk through what that code does.
First, define the /process route in app.py:
1@app.route("/process", methods=["POST"])
2def process():
3 url = request.form["url"]
4
5 parsed_url = urlparse(url)
6 if not all([parsed_url.scheme, parsed_url.netloc]):
7 flash("Invalid URL")
8 return redirect(url_for("index"))
This route handles the POST request to the /process endpoint and retrieves the URL from the form data submitted by the user. It validates the URL and redirects to the index page with an error message if it's not.
Next, check if the URL already exists in the database:
1 database = Database()
2 collection = database.get_collection()
3
4 exists = collection.find_one({"url": url})
5
6 if exists is not None:
7 flash("URL has already been indexed")
8 return redirect(url_for("index"))
If the URL is already indexed, flash a message to the user and redirect them to the index page.
Ensure the resource exists:
1 req = urllib.request.Request(url, method="HEAD")
2 fetch = urllib.request.urlopen(req)
3
4 if fetch.status != 200:
5 flash("URL is not reachable")
6 return redirect(url_for("index"))
This code sends a HEAD request to the URL to avoid downloading the entire file. If the URL is not reachable (status code is not 200), flash a message to the user and redirect them to the index page.
Retrieve the content type and length from the response headers:
1 content_length = fetch.headers["Content-Length"]
2 content_type = fetch.headers["Content-Type"]
This code extracts the content length and content type from the response headers.
Retrieve the appropriate asset processor based on the content type:
1 processor = get_asset_processor(content_type)
2
3 if processor is None:
4 flash('Unsupported content type "' + content_type + '"')
5 return redirect(url_for("index"))
If no processor is found for the content type, flash a message to the user and redirect them to the index page.
The get_asset_processor function, defined in allthethings/processors.py, returns a processor used to analyze the contents of an asset based on the content_type.
1def get_asset_processor(
2 content_type,
3):
4 if "audio/" in content_type:
5 return AudioProcessor()
6 elif "video/" in content_type:
7 return None
8 elif "image/" in content_type:
9 return None
10 else:
11 return None
In this case, the file is an MP3, and the content_type is audio/mpeg, so return an AudioProcessor instance.
Insert the URL, along with its content type and length, into the database with a status of SUBMITTED:
1 asset = collection.insert_one(
2 {
3 "url": url,
4 "content_type": content_type,
5 "content_length": content_length,
6 "status": "SUBMITTED",
7 }
8 )
Process the URL using the asset processor, an AudioProcessor, and obtain the prediction results:
1 try:
2 response = processor.process(asset.inserted_id, url)
3 except Exception as e:
4 app.logger.error("Error processing asset: %s", e)
5 collection.update_one(
6 filter={"url": url},
7 update={
8 "$set": {
9 "status": "PROCESSING_ERROR",
10 "error": str(e),
11 }
12 },
13 )
14 flash("Error processing asset")
15 return redirect(url_for("index"))
Let's look at the AudioProcessor from allthethings/processors.py in more detail to understand what this does:
1import httpx
2from config import Config
3
4...
5
6class AudioProcessor:
7 def process(self, id, url):
8 input = {
9 "audio": url,
10 "model": "large-v3",
11 "language": "auto",
12 "translate": False,
13 "temperature": 0,
14 "transcription": "plain text",
15 "suppress_tokens": "-1",
16 "logprob_threshold": -1,
17 "no_speech_threshold": 0.6,
18 "condition_on_previous_text": True,
19 "compression_ratio_threshold": 2.4,
20 "temperature_increment_on_fallback": 0.2,
21 }
22
23 payload = {
24 "version": "cdd97b257f93cb89dede1c7584e3f3dfc969571b357dbcee08e793740bedd854",
25 "input": input,
26 "webhook": f"{Config.AUDIO_WEBHOOK_URL}/{id}",
27 "webhook_events_filter": ["completed"],
28 }
29
30 response = httpx.request(
31 "POST",
32 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
33 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
34 json=payload,
35 )
36
37 return response.json()
process method processes the audio URL by creating a prediction request passing the payload as the JSON body.
payload includes webhooks, which consists of the Config.AUDIO_WEBHOOK_URL with an appended path (/{id}) that indicates which asset the callback is for. The use of the webhook_events_filter=["completed"] filter informs Replicate to only send a webhook when the prediction is completed.
The payload.version instructs Replicate to use the OpenAI Whisper model for audio to text. The input includes details such as the language should be auto-detected and the transcription should be in plain text.
Since we're using Hookdeck as an outbound API queue, the request uses the Config.HOOKDECK_REPLICATE_API_QUEUE_URL with the API path /predications suffix. The appropriate auth headers are also used from Config.HOOKDECK_QUEUE_AUTH_HEADERS.
Back in app.py, update the database with the processing status and pending prediction details:
1 collection.update_one(
2 filter={"url": url},
3 update={
4 "$set": {
5 "status": "PROCESSING",
6 "processor_response": response,
7 }
8 },
9 )
The processor_response value is stored for debug purposes as it contains a Hookdeck request ID that can be useful.
Flash a success message to the user and redirect them to the index page:
1 flash(
2 message="Processing: " + url + " with content type: " + content_type,
3 category="success",
4 )
5
6 return redirect(url_for("index"))
At this point, the Flask application has offloaded all the work to Replicate, and, from a data journey perspective, we're waiting for the predication completed webhook.

Handle audio to text prediction completion webhook

Once Replicate completes the predication, it makes a webhook callback to Hookdeck. Hookdeck instantly ingests the webhook, verifies the event came from Replicate, and pushes the data onto a queue for processing and delivery. Based on the current Hookdeck connection setup, the webhook event is delivered to the CLI and then to the /webhooks/audio/<id> endpoint of the Flask application. Let's look at the code that handles the /webhooks/audio/<id> request.
Here's the /webhooks/audio/<id> route definition in app.py:
1@app.route("/webhooks/audio/<id>", methods=["POST"])
2def webhook_audio(id):
3 if not verify_webhook(request):
4 app.logger.error("Webhook signature verification failed")
5 return jsonify({"error": "Webhook signature verification failed"}), 401
6
7 payload = request.json
8 app.logger.info("Audio payload received for id %s", id)
9 app.logger.debug(payload)
This route handles POST requests to the /webhooks/audio/<id> endpoint. The id path parameter represents the asset in the MongoDB database that the audio callback is for. The JSON payload from the webhook callback from Replicate.
Before handling the webhook, we check that the webhook came from Hookdeck via a verify_webhook function. If the verification fails a 401 response is returned. Here's the code to verify the webhook:
1def verify_webhook(request):
2 if Config.HOOKDECK_WEBHOOK_SECRET is None:
3 app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.")
4 return False
5
6 hmac_header = request.headers.get("x-hookdeck-signature")
7
8 hash = base64.b64encode(
9 hmac.new(
10 Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
11 ).digest()
12 ).decode()
13
14 verified = hash == hmac_header
15 app.logger.debug("Webhook signature verification: %s", verified)
16 return verified
This reads the Hookdeck webhook secret stored in the HOOKDECK_WEBHOOK_SECRET environment variable, generates a hash using the secret from the inbound webhook data, and compares it with the hash that was sent in the x-hookdeck-signature header. If they match, the webhook is verified.
Next, the processing status is determined based on the presence of an error in the payload:
1 database = Database()
2 collection = database.get_collection()
3
4 status = (
5 "PROCESSING_ERROR" if "error" in payload and payload["error"] else "PROCESSED"
6 )
If an error is present, the status is set to PROCESSING_ERROR. Otherwise, it is set to PROCESSED.
The database is updated with the transcription results and the processing status:
1 result = collection.find_one_and_update(
2 filter={"_id": ObjectId(id)},
3 update={
4 "$set": {
5 "status": status,
6 "text": payload["output"]["transcription"],
7 "replicate_response": payload,
8 }
9 },
10 return_document=True,
11 )
This finds the document in the database with the matching id and updates it with the new status, transcription text, and the entire Replicate response payload.
Next, we check to ensure the document was found:
1 if result is None:
2 app.logger.error(
3 "No document found for id %s to add audio transcript", payload["id"]
4 )
5 return jsonify({"error": "No document found to add audio transcript"}), 404
If no document is found for the given id, an error is logged, and a JSON response with an error message is returned. The 404 response will inform Hookdeck that although the request did not succeed, the request should not be retried.
With the audio converted to text and stored, the data journey moves to generating embeddings via Replicate:
1 app.logger.info("Transcription updated")
2 app.logger.debug(result)
3
4 request_embeddings(id)
5
6 return "OK"
Next, the request_embeddings function is called to generate embeddings for the processed audio. The endpoint returns an OK response to inform Hookdeck the webhook has been successfully processed.

Generate embedding

The request_embeddings function triggers the generation of embeddings for the textual representation of an indexed asset:
1def request_embeddings(id):
2 app.logger.info("Requesting embeddings for %s", id)
3
4 database = Database()
5 collection = database.get_collection()
6
7 asset = collection.find_one({"_id": id})
8
9 if asset is None:
10 raise RuntimeError("Asset not found")
11
12 if asset["status"] != "PROCESSED":
13 raise RuntimeError("Asset has not been processed")
If this asset with the passed id is not found or the status of the asset is not PROCESSED, which indicates that a textual representation has been created, a RuntimeError is raised.

Trigger embedding generation with webhook callback

Next, the embeddings are generated for the processed asset using the AsyncEmbeddingsGenerator:
1 generator = AsyncEmbeddingsGenerator()
2
3 try:
4 response = generator.generate(id, asset["text"])
5 except Exception as e:
6 app.logger.error("Error generating embeddings for %s: %s", id, e)
7 raise
This initializes the AsyncEmbeddingsGenerator and calls the generate function on the instance, passing the ID of the asset being indexed and the textual representation.
The AsyncEmbeddingsGenerator definition in allthethings/generators.py follows a similar pattern to the previously used processor:
1import httpx
2from config import Config
3
4
5class AsyncEmbeddingsGenerator:
6 def generate(self, id, text):
7 payload = {
8 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
9 "input": {"text": text},
10 "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}",
11 "webhook_events_filter": ["completed"],
12 }
13
14 response = httpx.request(
15 "POST",
16 f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
17 headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
18 json=payload,
19 )
20
21 return response.json()
The generate method receives the asset id and the text that embeddings are to be generated for.
A request payload is created containing a version that identifies that the replicate/all-mpnet-base-v2 model is used to generate embeddings, and that the text for the embedding is passed within an input parameter.
The webhook property is set to Config.EMBEDDINGS_WEBHOOK_URL with an appended path (/{id}) that indicates which asset the callback is for. As before, the use of the webhook_events_filter=["completed"] filter informs Replicate to only send a webhook when the prediction is completed.
Since this is an asynchronous call, Hookdeck is again used to queue the Replicate API request via a call to the HOOKDECK_REPLICATE_API_QUEUE_URL endpoint with the /predications path.
The method returns the Hookdeck response.
Back in app.py, update the database with the status and embedding request ID:
1 collection.update_one(
2 filter={"_id": ObjectId(id)},
3 update={
4 "$set": {
5 "status": "GENERATING_EMBEDDINGS",
6 "generator_response": response,
7 }
8 },
9 )
Update the document in the database with the new status GENERATING_EMBEDDINGS and the Hookdeck queue response.
The request to asynchronously generate the embeddings has been triggered, and the work offloaded to Replicate. When the result is read, a webhook will be triggered with the result.

Handle embedding generation webhook callback

Once Replicate has generated the embedding, a webhook callback is made to the /webhooks/embedding/<id> route in our Flask application. This route receives the webhook payload, verifies it came from Hookdeck, updates the database with the embedding results, and sets the appropriate status.
Here's the route definition:
1@app.route("/webhooks/audio/<id>", methods=["POST"])
2def webhook_audio(id):
3 if not verify_webhook(request):
4 app.logger.error("Webhook signature verification failed")
5 return jsonify({"error": "Webhook signature verification failed"}), 401
6
7 payload = request.json
8 app.logger.info("Audio payload received for id %s", id)
9 app.logger.debug(payload)
This route handles POST requests to the /webhooks/embedding/<id> endpoint and is passed the id path parameter. It verifies the request came from Hookdeck and, if so, retrieves the JSON payload from the request. Otherwise, it returns a 401 response.
Next, it checks for errors:
1 status = (
2 "EMBEDDINGS_ERROR" if "error" in payload and payload["error"] else "SEARCHABLE"
3 )
If an error is present, the status is set to EMBEDDINGS_ERROR. Otherwise, it is set to SEARCHABLE.
Next, the vector embedding is extracted from the payload and the database is updated with the embedding details and the new status:
1 embedding = payload["output"][0]["embedding"]
2
3 database = Database()
4 collection = database.get_collection()
5
6 result = collection.update_one(
7 filter={"_id": ObjectId(id)},
8 update={
9 "$set": {
10 "status": status,
11 "embedding": embedding,
12 "replicate_embeddings_response": payload,
13 }
14 },
15 )
This finds the document in the database with the matching id and updates it with the new status, embedding, and the entire payload.
Check if the document was found and updated:
1 if result.matched_count == 0:
2 app.logger.error(
3 "No document found for id %s to update embedding", payload["id"]
4 )
5 return jsonify({"error": "No document found to update embedding"}), 404
6
7 return "OK"
If no document is found for the given id, an error is logged, and a JSON response with an error message is returned with a 404 status. If the update was a success, return an OK to inform Hookdeck that the webhook has been processed.
With the vector embedding stored in the embedding property, it's now searchable with MongoDB due to the previously defined vector search index.
Search is user-driven. The user enters a search term and submits a form. That search query is handled and processed, and the result is returned and displayed. Ideally, this is a real-time experience, so operations are performed synchronously.
Let's walk through each of those steps.

Handle search submission

The user navigates to the /search endpoint in their web browser, enters a search term, and submits the form, making a GET request to the /search endpoint:
1@app.route("/search", methods=["GET"])
2def search():
3 query = request.args.get("query")
4 if query is None:
5 return render_template("search.html", results=[])
6
7 app.logger.info("Query submitted")
8 app.logger.debug(query)
9
10 results = query_vector_search(query)
11
12 results = format_results(results)
13
14 app.logger.debug("Formatted search results", results)
15
16 return render_template("search.html", results=results, query=query)
The search function in the Flask application handles GET requests to the /search endpoint. It retrieves the search query from the request.args.get submitted by the user. If there is no query, the search template is rendered. Otherwise, a vector search is performed using the query_vector_search function. The result is then formatted by passing the results to the format_results function. The formatted results are then rendered using the search.html template.

Generating search query embeddings

The query_vector_search function generates embeddings for the query, performs a vector search using the query provided by the user, and retrieves matching documents from the MongoDB collection.
1def query_vector_search(q):
2 generator = SyncEmbeddingsGenerator()
3
4 try:
5 generator_response = generator.generate(q)
6 app.logger.debug(generator_response)
7 except Exception as e:
8 app.logger.error("Error generating embeddings: %s", e)
9 return None
10
11 if generator_response["status"] != "completed":
12 app.logger.debug("Embeddings generation timed out")
13 return None
14
15 query_embedding = generator_response["output"][0]["embedding"]
The function takes the query, q, and uses the SyncEmbeddingsGenerator to generate the embedding for the search query by calling its generate function and passing the query. If the embedding creation fails for various reasons, None is returned.
The SyncEmbeddingsGenerated is used to synchronously generate embeddings for the search query. This operation is synchronous because the request is user-driven and requires a direct response. SyncEmbeddingsGenerated is defined in allthethings/generators.py:
1class SyncEmbeddingsGenerator:
2 def generate(self, text):
3 payload = {
4 "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
5 "input": {"text": text},
6 }
7
8 response = httpx.request(
9 "POST",
10 "https://api.replicate.com/v1/predictions",
11 headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"},
12 json=payload,
13 timeout=60,
14 )
15
16 return response.json()
The generate function receives the text to generate an embedding from. A synchronous request is made directly to the Replicate HTTP API, passing the same replicate/all-mpnet-base-v2 model version used in the asynchronous embedding request. The "Prefer": "Wait" header and timeout values are set to enable long-running synchronous HTTP requests. Also, the Replicate API token is included in the headers via Config.REPLICATE_API_AUTH_HEADERS.
The response JSON is returned to the calling function.

Create Vector Search query

Back in query_vector_search, the embedding result is used to construct the vector search query.
1 ...
2
3 query_embedding = generate_response[0]["embedding"]
4
5 vs_query = {
6 "index": "vector_index",
7 "path": "embedding",
8 "queryVector": query_embedding,
9 "numCandidates": 100,
10 "limit": 10,
11 }
12
13 new_search_query = {"$vectorSearch": vs_query}
14
15 app.logger.info("Vector search query created")
16 app.logger.debug(new_search_query)
vs_query represents the vector search to be performed. It identifies the index to be queried as vector_index; the path to the property, embedding, the query is on; and the result of the text query in embedding format ("queryVector": query_embedding). See the MongoDB Vector Search docs for more information, including the purpose of the numCandidates and limit properties.

Retrieve Vector Search results

Next, the function defines the projection to specify which fields to include in the search results.
1 project = {
2 "$project": {
3 "score": {"$meta": "vectorSearchScore"},
4 "_id": 0,
5 "url": 1,
6 "content_type": 1,
7 "content_length": 1,
8 "text": 1,
9 }
10 }
The projection includes the vector search score, URL, content type, content length, and text. For more information on the score, see the Atlas Vector Search Score docs.
The function then performs the aggregation query using the constructed vector search query and projection:
1 database = Database()
2 collection = database.get_collection()
3
4 app.logger.info("Vector search query without post filter")
5 res = list(collection.aggregate([new_search_query, project]))
6
7 app.logger.info("Vector search query run")
8 app.logger.debug(res)
9 return res
Overall, the query_vector_search function performs a vector search using the query provided by the user, generates embeddings for the query, and retrieves matching documents from the MongoDB database.

Format and display the Vector Search results

Next, within search_post in app.py, the results are formatted for rendering:
1 results = format_results(results)
And within format_results, also defined in app.py:
1def format_results(results):
2 formatted_results = []
3 for _idx, index in enumerate(results):
4 parse_result = urlparse(index["url"])
5 parsed_url = {
6 "netloc": parse_result.netloc,
7 "path": parse_result.path,
8 "params": parse_result.params,
9 "query": parse_result.query,
10 "fragment": parse_result.fragment,
11 "hostname": parse_result.hostname,
12 "last_part": parse_result.path.rstrip("/").split("/")[-1],
13 }
14 index["parsed_url"] = parsed_url
15 formatted_results.append(index)
16
17 return formatted_results
The format_results function iterates over the vector search result and returns an array with each element containing the result along with a parsed_url property with information about the indexed asset.
Finally, back in the POST /search route, the results are displayed:
1@app.route("/search", methods=["POST"])
2def search_post():
3 ...
4
5 results = format_results(results)
6
7 return render_template("search.html", results=results, query=query)
This renders the search.html template, passing the formatted results and the original query to the template for display.
Search results

Conclusão

In this tutorial, we walked through the components used in a Flask application that can index and run a text search on any asset with a public URL. Data is stored, and vector search takes place via MongoDB. AI inference and embedding generation is performed by Replicate. Hookdeck is used as a serverless queue between the Flask application and Replicate to manage API request rate-limiting to Replicate, and to verify, queue, and guarantee the delivery of asynchronous webhook callbacks from Replicate back to the Flask application.
If you haven't already done so, you can grab the Index All The Things code on GitHub. There are a number of feature issues to add support for additional content types, so feel free to get involved.
Finally, if you've any questions or ideas, please share them either via a GitHub issue on the repo or by messaging me on X or Bluesky.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Iniciar a conversa

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Artigo

Mantendo seus custos baixos com as instâncias sem servidor do MongoDB Atlas


Oct 01, 2024 | 3 min read
Tutorial

Aplicativo agente Sentiment Chef com Google Cloud e MongoDB Atlas


Jun 24, 2024 | 16 min read
Tutorial

Parte #1: Crie seu próprio Vector Search com o MongoDB Atlas e o Amazon SageMaker


Sep 18, 2024 | 4 min read
Tutorial

Como usar os vetores quantizados do Cohere para criar aplicativos de AI econômicos com o MongoDB


Oct 03, 2024 | 23 min read
Sumário