Index Anything, Search Everything: Scalable Vector Search with Replicate AI, MongoDB, and Hookdeck
In this tutorial, we'll build a Flask application that allows users to index and search anything on the internet with a publicly accessible URL. That's right! Ask the app to index an MP3 or WAV file, an HTML or text file, or a MOV or MP4 file, and it will use the power of Replicate AI to create a textual representation of that file, storing the results in MongoDB Atlas. As long as an LLM can analyze the resource and create a textual representation, it can be indexed. Then, all those indexed files, no matter the originating file type, can be searched with text using MongoDB Atlas.
We'll use Hookdeck, an event gateway that provides infrastructure and tooling to build and manage event-driven applications by seamlessly handling webhooks and API requests. In this tutorial, Hookdeck ensures reliable, scalable communication between the Flask application and Replicate, an LLM API platform, by acting as a serverless queue for rate-limiting API calls and guaranteeing webhook delivery.
We'll begin by setting up the required services and getting the Flask application up and running. Then, we'll follow the journey of data through key components and code within the app, covering submitting the indexing request, analyzing the content type, generating a textual representation, and generating and storing a vector embedding. The content is ultimately made available for search within a vector search index.
Scalability is often overhyped, but it remains an important aspect of building robust applications. One of the benefits of using serverless and cloud-hosted providers is the ability to offload work to specialized services. Also required in any scalable architecture is a way of ensuring services aren't overloaded and your application is fault-tolerant. In this tutorial, we leverage several such services to handle different aspects of our application.
First, let's take a look at the services:
- Replicate: Provides open-source machine learning models accessible via an API
- MongoDB Atlas: An integrated suite of data services centered around a cloud database designed to accelerate and simplify how you build with data
- Hookdeck: An event gateway that provides engineering teams with infrastructure and tooling to build and manage event-driven applications
Next, let's see how the services are used.
- MongoDB Atlas: MongoDB Atlas provides database storage and vector search capabilities, ensuring our data is stored efficiently and queried quickly.
- Index All The Things: This is the Flask application.
- Hookdeck: Hookdeck acts as a serverless queue that a) ensures Replicate API requests do not exceed rate limits and can be retried, and b) ingests, delivers, and retries webhooks from Replicate to ensure reliable event processing. Note: We'll also use the Hookdeck CLI to receive webhooks in our local development environment.
- Replicate: Replicate handles AI inference, produces text and embeddings, and allows us to offload the computationally intensive tasks of running machine learning models. We use different LLMs to analyze different content types.
By utilizing these cloud-based services, we can focus on building the core functionality of our application while ensuring it remains scalable and efficient. Webhooks, in particular, enable scalability through asynchronous AI workflows: high-compute work is offloaded to third-party services, and the application simply receives a callback via a webhook when the work is complete.
Before you begin, ensure you have the following:
- A free Hookdeck account
- A free MongoDB Atlas account
- A free Replicate account
- Poetry for package management
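If you don't yet have Poetry, it's commonly installed with pipx (an assumption about your setup; see the Poetry docs for the recommended method on your platform):

```shell
pipx install poetry
```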
Let's begin by getting the application running and seeing it in action.
Begin by getting the application codebase.
```shell
git clone https://github.com/hookdeck/index-all-the-things.git
```
Activate a virtual environment with Poetry:
```shell
poetry shell
```
And install the app dependencies:
```shell
poetry install
```
The application needs credentials for the services it interacts with.
Copy the example `.env-example` file to a new `.env` file:

```shell
cp .env-example .env
```
Update the values within `.env` as follows:

- `SECRET_KEY`: A secret key that will be used for securely signing the session cookie. See the `SECRET_KEY` Flask docs for more information.
- `MONGODB_CONNECTION_URI`: Populate with a MongoDB Atlas connection string with a Read and write to any database role. See the Get Connection String docs.
- `HOOKDECK_PROJECT_API_KEY`: Get an API Key from the Project -> Settings -> Secrets section of the Hookdeck dashboard.
- `HOOKDECK_WEBHOOK_SECRET`: Get a Signing Secret from the Project -> Settings -> Secrets section of the Hookdeck dashboard.
- `REPLICATE_WEBHOOKS_SECRET`: Go to the Webhooks section of the Replicate dashboard and click the Show signing key button.
- `HOOKDECK_REPLICATE_API_QUEUE_API_KEY`, `HOOKDECK_REPLICATE_API_QUEUE_URL`, `AUDIO_WEBHOOK_URL`, and `EMBEDDINGS_WEBHOOK_URL` will be automatically populated in the next step.
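For reference, a populated `.env` might look roughly like this (a hedged sketch; the canonical variable set is in the repository's `.env-example`, which also includes the `REPLICATE_API_TOKEN` referenced by the scripts below):

```shell
# Generate with: python -c "import secrets; print(secrets.token_hex())"
SECRET_KEY=<random-secret>
MONGODB_CONNECTION_URI=mongodb+srv://<user>:<password>@<cluster>.mongodb.net/
HOOKDECK_PROJECT_API_KEY=<from the Hookdeck dashboard>
HOOKDECK_WEBHOOK_SECRET=<from the Hookdeck dashboard>
REPLICATE_API_TOKEN=<from the Replicate dashboard>
REPLICATE_WEBHOOKS_SECRET=<from the Replicate dashboard>

# Populated automatically by create-hookdeck-connections.py
HOOKDECK_REPLICATE_API_QUEUE_API_KEY=
HOOKDECK_REPLICATE_API_QUEUE_URL=
AUDIO_WEBHOOK_URL=
EMBEDDINGS_WEBHOOK_URL=
```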
Hookdeck connections are used to route inbound HTTP requests received by a Hookdeck source to a Hookdeck destination.
The `create-hookdeck-connections.py` script automatically creates Hookdeck connections that:

- Route requests made to Hookdeck URLs to the locally running application via the Hookdeck CLI. Here, Hookdeck is used as an inbound queue.
- Route requests made to a Hookdeck URL to the Replicate API. Here, Hookdeck is used as an outbound queue.

The script also updates the `.env` file with the source URLs that handle the webhooks. Let's go through the details of the script.

First, ensure you have the necessary imports and define the authentication and content type headers for the Hookdeck API request:
```python
import httpx
import re
import hashlib
import os

from config import Config

headers = {
    "Authorization": f"Bearer {Config.HOOKDECK_PROJECT_API_KEY}",
    "Content-Type": "application/json",
}
```
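The `config.py` module isn't listed in this tutorial. Based on how `Config` is used throughout, a minimal sketch might look like the following; the use of python-dotenv, the exact attribute set, and the queue header name are all assumptions, so consult the repository for the real implementation:

```python
# config.py (hypothetical sketch; see the repository for the actual module)
import os

from dotenv import load_dotenv

load_dotenv()  # read the .env file into the process environment


class Config:
    SECRET_KEY = os.environ.get("SECRET_KEY")
    MONGODB_CONNECTION_URI = os.environ.get("MONGODB_CONNECTION_URI")
    HOOKDECK_PROJECT_API_KEY = os.environ.get("HOOKDECK_PROJECT_API_KEY")
    HOOKDECK_WEBHOOK_SECRET = os.environ.get("HOOKDECK_WEBHOOK_SECRET")
    REPLICATE_API_TOKEN = os.environ.get("REPLICATE_API_TOKEN")
    REPLICATE_WEBHOOKS_SECRET = os.environ.get("REPLICATE_WEBHOOKS_SECRET")
    HOOKDECK_REPLICATE_API_QUEUE_API_KEY = os.environ.get("HOOKDECK_REPLICATE_API_QUEUE_API_KEY")
    HOOKDECK_REPLICATE_API_QUEUE_URL = os.environ.get("HOOKDECK_REPLICATE_API_QUEUE_URL")
    AUDIO_WEBHOOK_URL = os.environ.get("AUDIO_WEBHOOK_URL")
    EMBEDDINGS_WEBHOOK_URL = os.environ.get("EMBEDDINGS_WEBHOOK_URL")

    # Hypothetical header name for the API queue source verification
    HOOKDECK_QUEUE_API_KEY_HEADER_NAME = "x-iaat-queue-api-key"
    HOOKDECK_QUEUE_AUTH_HEADERS = {
        HOOKDECK_QUEUE_API_KEY_HEADER_NAME: HOOKDECK_REPLICATE_API_QUEUE_API_KEY
    }
    REPLICATE_API_AUTH_HEADERS = {"Authorization": f"Bearer {REPLICATE_API_TOKEN}"}
```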
Next, define a function that creates a connection via the Hookdeck API:
```python
def create_connection(payload):
    response = httpx.request(
        "PUT",
        "https://api.hookdeck.com/latest/connections",
        headers=headers,
        json=payload,
    )
    data = response.json()

    if response.status_code != 200:
        raise Exception(f"Failed to create connection: {data}")

    return data
```
This function makes a `PUT` request to the Hookdeck API with the upsert connection payload and handles the response. If the response status is not `200` (OK), an exception is raised. The function returns the parsed JSON response.

The first connection to be created is one for the Replicate API outbound queue:
```python
replicate_api_queue_api_key = hashlib.sha256(os.urandom(32)).hexdigest()
replicate_api_queue = {
    "name": "replicate-api-queue",
    "source": {
        "name": "replicate-api-queue",
        "verification": {
            "type": "API_KEY",
            "configs": {
                "header_key": Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME,
                "api_key": replicate_api_queue_api_key,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "strategy": "exponential",
            "count": 5,
            "interval": 30000,
            "response_status_codes": ["429", "500"],
        }
    ],
    "destination": {
        "name": "replicate-api",
        "url": "https://api.replicate.com/v1/",
        "auth_method": {
            "type": "BEARER_TOKEN",
            "config": {
                "token": Config.REPLICATE_API_TOKEN,
            },
        },
    },
}

replicate_api_connection = create_connection(replicate_api_queue)
```
The Connection has a `name`, a `source`, and a `destination`. The `source` also has a `name` and a `verification`. The `verification` instructs Hookdeck how to authenticate requests. Since the connection is acting as an API queue, we're using the `API_KEY` type with the `header_key` set to the value defined in `Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME` and the `api_key` value set to the generated hash stored in `replicate_api_queue_api_key`.
The `rules` define the request retry strategy to use when interacting with the Replicate API. In this case, we're stating that requests should be retried up to five times at an interval of `30000` milliseconds, applying an `exponential` back-off strategy. We're also using the `response_status_codes` option to inform Hookdeck to only retry on `429` and `500` HTTP responses. See the Hookdeck Retry docs for more information on retries and the Hookdeck Rules docs for details on the other types of rules that are available.
The `url` on the destination is the base URL for the Replicate API. Hookdeck uses path forwarding by default, so any path appended to the Hookdeck source URL will also be appended to the destination URL. For example, a request to a Hookdeck source with URL `https://hkdk.events/{id}/predictions` will result in a request to a connected destination of `https://api.replicate.com/v1/predictions`, where the destination has a base URL of `https://api.replicate.com/v1/`. Hookdeck acts very much like a proxy in this scenario.
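To make the path forwarding concrete, here's a hypothetical sketch of queuing a Replicate prediction through the Hookdeck source (the source ID, header name, and body are placeholders; the real header name comes from `Config.HOOKDECK_QUEUE_API_KEY_HEADER_NAME`):

```shell
# Hypothetical request; Hookdeck verifies the API key, queues the request,
# then forwards the /predictions path to https://api.replicate.com/v1/predictions
# with the Replicate bearer token attached.
curl -X POST "https://hkdk.events/<source-id>/predictions" \
  -H "Content-Type: application/json" \
  -H "<queue-api-key-header>: <HOOKDECK_REPLICATE_API_QUEUE_API_KEY>" \
  -d '{"version": "<model-version>", "input": {"text": "hello"}}'
```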
The `auth_method` on the destination is of type `BEARER_TOKEN` with a `config.token` set to the value of the `REPLICATE_API_TOKEN` environment variable. This allows Hookdeck to make authenticated API calls to Replicate.

Now, create a connection for the Replicate Audio webhooks to handle audio analysis callbacks:
```python
replicate_audio = {
    "name": "replicate-audio",
    "source": {
        "name": "replicate-audio",
        "verification": {
            "type": "REPLICATE",
            "configs": {
                "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "count": 5,
            "interval": 30000,
            "strategy": "exponential",
            "response_status_codes": ["!404"],
        }
    ],
    "destination": {
        "name": "cli-replicate-audio",
        "cli_path": "/webhooks/audio",
    },
}

replicate_audio_connection = create_connection(replicate_audio)
```
The Replicate Audio webhook callback connection uses a `verification` of type `REPLICATE` with a `configs.webhook_secret_key` value set from the `REPLICATE_WEBHOOKS_SECRET` value we stored in the `.env` file. This enables and instructs Hookdeck to verify that the webhook has come from Replicate.
The `rules` for this inbound connection are similar to those of the outbound connection and define a delivery retry strategy to follow if any requests to our application's webhook endpoint fail. The only difference is that the `response_status_codes` informs Hookdeck not to retry if it receives a `200` or `404` response.
The `destination` has a `name` and a `cli_path`, informing Hookdeck that the destination is the Hookdeck CLI and that the path the request should be forwarded to is `/webhooks/audio`.

Next, create a connection for Replicate Embeddings webhook callbacks:
```python
replicate_embedding = {
    "name": "replicate-embedding",
    "source": {
        "name": "replicate-embedding",
        "verification": {
            "type": "REPLICATE",
            "configs": {
                "webhook_secret_key": Config.REPLICATE_WEBHOOKS_SECRET,
            },
        },
    },
    "rules": [
        {
            "type": "retry",
            "count": 5,
            "interval": 30000,
            "strategy": "exponential",
            "response_status_codes": ["!200", "!404"],
        }
    ],
    "destination": {
        "name": "cli-replicate-embedding",
        "cli_path": "/webhooks/embedding",
    },
}

replicate_embedding_connection = create_connection(replicate_embedding)
```
Finally, update the `.env` file with some of the generated values:

```python
# Update .env
with open(".env", "r") as file:
    env_content = file.read()

replicate_api_connection_url = replicate_api_connection["source"]["url"]
audio_webhook_url = replicate_audio_connection["source"]["url"]
embedding_webhook_url = replicate_embedding_connection["source"]["url"]

# Replace the .env URLs in the .env content
env_content = re.sub(
    r"HOOKDECK_REPLICATE_API_QUEUE_API_KEY=.*",
    f"HOOKDECK_REPLICATE_API_QUEUE_API_KEY={replicate_api_queue_api_key}",
    env_content,
)
env_content = re.sub(
    r"HOOKDECK_REPLICATE_API_QUEUE_URL=.*",
    f"HOOKDECK_REPLICATE_API_QUEUE_URL={replicate_api_connection_url}",
    env_content,
)
env_content = re.sub(
    r"AUDIO_WEBHOOK_URL=.*", f"AUDIO_WEBHOOK_URL={audio_webhook_url}", env_content
)
env_content = re.sub(
    r"EMBEDDINGS_WEBHOOK_URL=.*",
    f"EMBEDDINGS_WEBHOOK_URL={embedding_webhook_url}",
    env_content,
)

with open(".env", "w") as file:
    file.write(env_content)

print("Connections created successfully!")
```
This code reads the current `.env` content, replaces the lines containing the existing environment variable placeholders using regular expressions, and writes the updated content back to the `.env` file. This ensures that the environment variables, such as the webhook URLs, are up to date.

Run the script:
```shell
poetry run python create-hookdeck-connections.py
```
Check your `.env` file to ensure all values are populated.

Also, navigate to the Connections section of the Hookdeck dashboard and check the visual representation of your connections.
To search a MongoDB database efficiently, you need indexes. For MongoDB vector search, you must create an Atlas Vector Search index. The `create-indexes.py` script automates the creation and updating of the search indexes in MongoDB using the `pymongo` library.

First, ensure you have the necessary imports and initialize the database connection:
```python
from allthethings.mongo import Database
from pymongo.operations import SearchIndexModel

database = Database()
collection = database.get_collection()
```
`Database` is defined in `allthethings/mongo.py` and provides utility access to the `assets` collection in the `iaat` database, with these string values defined in `config.py`.
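The tutorial doesn't list `allthethings/mongo.py`, but based on that description, a minimal sketch might look like this (the database and collection names actually come from `config.py`; treating them as literals here is a simplification):

```python
# allthethings/mongo.py (hypothetical sketch)
from pymongo import MongoClient

from config import Config


class Database:
    def __init__(self):
        self.client = MongoClient(Config.MONGODB_CONNECTION_URI)

    def get_collection(self):
        # "iaat" database and "assets" collection, per the tutorial's description
        return self.client["iaat"]["assets"]
```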
Next, ensure that the required collection exists within the database so that the indexes can be created:
```python
if collection.name not in collection.database.list_collection_names():
    print("Creating empty collection so indexes can be created.")
    collection.database.create_collection(collection.name)
```
With the collection created, define a function to create or update search indexes:
```python
def create_or_update_search_index(index_name, index_definition, index_type):
    indexes = list(collection.list_search_indexes(index_name))

    if len(indexes) == 0:
        print(f'Creating search index: "{index_name}"')
        index_model = SearchIndexModel(
            definition=index_definition,
            name=index_name,
            type=index_type,
        )
        collection.create_search_index(model=index_model)

    else:
        print(f'Search index "{index_name}" already exists. Updating.')
        collection.update_search_index(name=index_name, definition=index_definition)
```
This function checks if an index with the given `index_name` already exists. If it does not exist, a new search index is created using the provided definition and type. If it does exist, the existing index is updated with the new definition.

Now, create a vector search index for embeddings:
```python
vector_result = create_or_update_search_index(
    "vector_index",
    {
        "fields": [
            {
                "type": "vector",
                "path": "embedding",
                "numDimensions": 768,
                "similarity": "euclidean",
            }
        ]
    },
    "vectorSearch",
)
```
This creates or updates a vector search index named `vector_index` for the `embedding` field.

Finally, create a search index for the `url` field, as this is used to determine if a URL has already been indexed:

```python
create_or_update_search_index(
    "url_index",
    {
        "mappings": {
            "fields": {
                "url": {
                    "type": "string",
                },
            },
        }
    },
    "search",
)

print("Indexes created successfully!")
```
Run the script:
```shell
poetry run python create-indexes.py
```
Go to the Atlas Search section within the MongoDB Atlas dashboard and check the search indexes have been created.
In one terminal window, run the Flask application:
```shell
poetry run python -m flask --app app --debug run
```
In a second terminal window, create a local tunnel using the Hookdeck CLI:
```shell
hookdeck listen 5000 '*'
```
This command listens to all Hookdeck sources connected to a CLI destination, routing webhooks to the application running locally on port 5000.
When you run the command, you will see output similar to the following:
```
Listening for events on Sources that have Connections with CLI Destinations

Dashboard
👉 Inspect and replay events: https://dashboard.hookdeck.com?team_id=tm_{id}

Sources
🔌 replicate-embedding URL: https://hkdk.events/{id}
🔌 replicate-audio URL: https://hkdk.events/{id}

Connections
replicate-embedding -> replicate-embedding forwarding to /webhooks/embedding
replicate-audio -> replicate-audio forwarding to /webhooks/audio

Ready! (^C to quit)
```
Open `localhost:5000` in your web browser to ensure the Flask app is running.

With the app running, it's time to submit an asset for indexing.
Click Bruce (mp3) under the Examples header to populate the in-app search bar with a URL and click Submit.
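If you'd rather exercise the endpoint from the command line, a request along these lines would trigger the same flow (a hypothetical example; substitute any publicly accessible MP3 URL):

```shell
curl -X POST http://localhost:5000/process \
  --data-urlencode "url=https://example.com/bruce.mp3"
```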
Submitting the form sends the URL to a `/process` endpoint as a `POST` request. Let's walk through what that code does.
request. Let's walk through what that code does.First, define the
/process
route in app.py
:1 2 def process(): 3 url = request.form["url"] 4 5 parsed_url = urlparse(url) 6 if not all([parsed_url.scheme, parsed_url.netloc]): 7 flash("Invalid URL") 8 return redirect(url_for("index"))
This route handles the `POST` request to the `/process` endpoint and retrieves the URL from the form data submitted by the user. It validates the URL and, if the URL is invalid, redirects to the index page with an error message.

Next, check if the URL already exists in the database:
```python
database = Database()
collection = database.get_collection()

exists = collection.find_one({"url": url})

if exists is not None:
    flash("URL has already been indexed")
    return redirect(url_for("index"))
```
If the URL is already indexed, flash a message to the user and redirect them to the index page.
Ensure the resource exists:
```python
req = urllib.request.Request(url, method="HEAD")
fetch = urllib.request.urlopen(req)

if fetch.status != 200:
    flash("URL is not reachable")
    return redirect(url_for("index"))
```
This code sends a `HEAD` request to the URL to avoid downloading the entire file. If the URL is not reachable (the status code is not 200), flash a message to the user and redirect them to the index page.

Retrieve the content type and length from the response headers:
```python
content_length = fetch.headers["Content-Length"]
content_type = fetch.headers["Content-Type"]
```
This code extracts the content length and content type from the response headers.
Retrieve the appropriate asset processor based on the content type:
```python
processor = get_asset_processor(content_type)

if processor is None:
    flash('Unsupported content type "' + content_type + '"')
    return redirect(url_for("index"))
```
If no processor is found for the content type, flash a message to the user and redirect them to the index page.
The `get_asset_processor` function, defined in `allthethings/processors.py`, returns a processor used to analyze the contents of an asset based on the `content_type`:

```python
def get_asset_processor(
    content_type,
):
    if "audio/" in content_type:
        return AudioProcessor()
    elif "video/" in content_type:
        return None
    elif "image/" in content_type:
        return None
    else:
        return None
```
In this case, the file is an MP3 and the `content_type` is `audio/mpeg`, so an `AudioProcessor` instance is returned.

Insert the URL, along with its content type and length, into the database with a status of `SUBMITTED`:

```python
asset = collection.insert_one(
    {
        "url": url,
        "content_type": content_type,
        "content_length": content_length,
        "status": "SUBMITTED",
    }
)
```
Process the URL using the asset processor, an `AudioProcessor`, and obtain the prediction results:

```python
try:
    response = processor.process(asset.inserted_id, url)
except Exception as e:
    app.logger.error("Error processing asset: %s", e)
    collection.update_one(
        filter={"url": url},
        update={
            "$set": {
                "status": "PROCESSING_ERROR",
                "error": str(e),
            }
        },
    )
    flash("Error processing asset")
    return redirect(url_for("index"))
```
Let's look at the `AudioProcessor` from `allthethings/processors.py` in more detail to understand what this does:

```python
import httpx
from config import Config

...

class AudioProcessor:
    def process(self, id, url):
        input = {
            "audio": url,
            "model": "large-v3",
            "language": "auto",
            "translate": False,
            "temperature": 0,
            "transcription": "plain text",
            "suppress_tokens": "-1",
            "logprob_threshold": -1,
            "no_speech_threshold": 0.6,
            "condition_on_previous_text": True,
            "compression_ratio_threshold": 2.4,
            "temperature_increment_on_fallback": 0.2,
        }

        payload = {
            "version": "cdd97b257f93cb89dede1c7584e3f3dfc969571b357dbcee08e793740bedd854",
            "input": input,
            "webhook": f"{Config.AUDIO_WEBHOOK_URL}/{id}",
            "webhook_events_filter": ["completed"],
        }

        response = httpx.request(
            "POST",
            f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
            headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
            json=payload,
        )

        return response.json()
```
The `process` method processes the audio URL by creating a prediction request, passing the `payload` as the JSON body. The `payload` includes a `webhook` property, which consists of the `Config.AUDIO_WEBHOOK_URL` with an appended path (`/{id}`) that indicates which asset the callback is for. The `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed.

The `payload.version` instructs Replicate to use the OpenAI Whisper model for audio-to-text. The `input` includes details such as that the language should be auto-detected and the transcription should be in `plain text`.

Since we're using Hookdeck as an outbound API queue, the request uses the `Config.HOOKDECK_REPLICATE_API_QUEUE_URL` with the API path `/predictions` suffix. The appropriate auth headers are also used, from `Config.HOOKDECK_QUEUE_AUTH_HEADERS`.
Back in `app.py`, update the database with the processing status and pending prediction details:

```python
collection.update_one(
    filter={"url": url},
    update={
        "$set": {
            "status": "PROCESSING",
            "processor_response": response,
        }
    },
)
```
The `processor_response` value is stored for debugging purposes, as it contains a Hookdeck request ID that can be useful.

Flash a success message to the user and redirect them to the index page:
```python
flash(
    message="Processing: " + url + " with content type: " + content_type,
    category="success",
)

return redirect(url_for("index"))
```
At this point, the Flask application has offloaded all the work to Replicate, and, from a data journey perspective, we're waiting for the prediction completed webhook.
Once Replicate completes the prediction, it makes a webhook callback to Hookdeck. Hookdeck instantly ingests the webhook, verifies the event came from Replicate, and pushes the data onto a queue for processing and delivery. Based on the current Hookdeck connection setup, the webhook event is delivered to the CLI and then to the `/webhooks/audio/<id>` endpoint of the Flask application. Let's look at the code that handles the `/webhooks/audio/<id>` request.

Here's the `/webhooks/audio/<id>` route definition in `app.py`:

```python
@app.route("/webhooks/audio/<id>", methods=["POST"])
def webhook_audio(id):
    if not verify_webhook(request):
        app.logger.error("Webhook signature verification failed")
        return jsonify({"error": "Webhook signature verification failed"}), 401

    payload = request.json
    app.logger.info("Audio payload received for id %s", id)
    app.logger.debug(payload)
```
This route handles `POST` requests to the `/webhooks/audio/<id>` endpoint. The `id` path parameter identifies the asset in the MongoDB database that the audio callback is for, and the JSON payload contains the prediction results from the Replicate webhook callback.
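The handler that follows reads `payload["output"]["transcription"]`, `payload["error"]`, and `payload["id"]`, so the relevant parts of a completed Whisper prediction payload look roughly like this (an abridged, hedged sketch; Replicate sends additional fields):

```python
# Abridged sketch of a Replicate "completed" webhook payload for Whisper
payload = {
    "id": "<prediction-id>",
    "status": "succeeded",
    "output": {
        "transcription": "Well, hello there...",  # plain-text transcript
        # ...other Whisper output fields
    },
    "error": None,  # populated if the prediction failed
}
```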
Before handling the webhook, we check that it came from Hookdeck via a `verify_webhook` function. If the verification fails, a `401` response is returned. Here's the code to verify the webhook:

```python
def verify_webhook(request):
    if Config.HOOKDECK_WEBHOOK_SECRET is None:
        app.logger.error("No HOOKDECK_WEBHOOK_SECRET found.")
        return False

    hmac_header = request.headers.get("x-hookdeck-signature")

    hash = base64.b64encode(
        hmac.new(
            Config.HOOKDECK_WEBHOOK_SECRET.encode(), request.data, hashlib.sha256
        ).digest()
    ).decode()

    verified = hash == hmac_header
    app.logger.debug("Webhook signature verification: %s", verified)
    return verified
```
This reads the Hookdeck webhook secret stored in the `HOOKDECK_WEBHOOK_SECRET` environment variable, generates a hash of the inbound webhook data using the secret, and compares it with the hash that was sent in the `x-hookdeck-signature` header. If they match, the webhook is verified.

Next, the processing status is determined based on the presence of an error in the payload:
```python
database = Database()
collection = database.get_collection()

status = (
    "PROCESSING_ERROR" if "error" in payload and payload["error"] else "PROCESSED"
)
```
If an error is present, the status is set to `PROCESSING_ERROR`. Otherwise, it is set to `PROCESSED`.

The database is updated with the transcription results and the processing status:
```python
result = collection.find_one_and_update(
    filter={"_id": ObjectId(id)},
    update={
        "$set": {
            "status": status,
            "text": payload["output"]["transcription"],
            "replicate_response": payload,
        }
    },
    return_document=True,
)
```
This finds the document in the database with the matching `id` and updates it with the new status, the transcription `text`, and the entire Replicate response payload.

Next, we check to ensure the document was found:
```python
if result is None:
    app.logger.error(
        "No document found for id %s to add audio transcript", payload["id"]
    )
    return jsonify({"error": "No document found to add audio transcript"}), 404
```
If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned. The `404` response informs Hookdeck that although the request did not succeed, it should not be retried.

With the audio converted to text and stored, the data journey moves to generating embeddings via Replicate:
```python
app.logger.info("Transcription updated")
app.logger.debug(result)

request_embeddings(id)

return "OK"
```
Next, the `request_embeddings` function is called to generate embeddings for the processed audio, and the endpoint returns an `OK` response to inform Hookdeck that the webhook has been successfully processed.

The `request_embeddings` function triggers the generation of embeddings for the textual representation of an indexed asset:

```python
def request_embeddings(id):
    app.logger.info("Requesting embeddings for %s", id)

    database = Database()
    collection = database.get_collection()

    asset = collection.find_one({"_id": id})

    if asset is None:
        raise RuntimeError("Asset not found")

    if asset["status"] != "PROCESSED":
        raise RuntimeError("Asset has not been processed")
```
If the asset with the passed `id` is not found, or the status of the asset is not `PROCESSED` (which would indicate that a textual representation has been created), a `RuntimeError` is raised.

Next, the embeddings are generated for the processed asset using the `AsyncEmbeddingsGenerator`:

```python
generator = AsyncEmbeddingsGenerator()

try:
    response = generator.generate(id, asset["text"])
except Exception as e:
    app.logger.error("Error generating embeddings for %s: %s", id, e)
    raise
```
This initializes the `AsyncEmbeddingsGenerator` and calls the `generate` function on the instance, passing the ID of the asset being indexed and the textual representation.

The `AsyncEmbeddingsGenerator` definition in `allthethings/generators.py` follows a similar pattern to the previously used processor:

```python
import httpx
from config import Config


class AsyncEmbeddingsGenerator:
    def generate(self, id, text):
        payload = {
            "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
            "input": {"text": text},
            "webhook": f"{Config.EMBEDDINGS_WEBHOOK_URL}/{id}",
            "webhook_events_filter": ["completed"],
        }

        response = httpx.request(
            "POST",
            f"{Config.HOOKDECK_REPLICATE_API_QUEUE_URL}/predictions",
            headers=Config.HOOKDECK_QUEUE_AUTH_HEADERS,
            json=payload,
        )

        return response.json()
```
The `generate` method receives the asset `id` and the `text` that embeddings are to be generated for. A request `payload` is created containing a `version` that identifies the replicate/all-mpnet-base-v2 model used to generate embeddings, with the `text` for the embedding passed within an `input` parameter.

The `webhook` property is set to `Config.EMBEDDINGS_WEBHOOK_URL` with an appended path (`/{id}`) that indicates which asset the callback is for. As before, the `webhook_events_filter=["completed"]` filter informs Replicate to only send a webhook when the prediction is completed.

Since this is an asynchronous call, Hookdeck is again used to queue the Replicate API request via a call to the `HOOKDECK_REPLICATE_API_QUEUE_URL` endpoint with the `/predictions` path.

The method returns the Hookdeck response.
Back in `app.py`, update the database with the status and embedding request ID:

```python
collection.update_one(
    filter={"_id": ObjectId(id)},
    update={
        "$set": {
            "status": "GENERATING_EMBEDDINGS",
            "generator_response": response,
        }
    },
)
```
This updates the document in the database with the new status, `GENERATING_EMBEDDINGS`, and the Hookdeck queue response.

The request to asynchronously generate the embeddings has been triggered, and the work offloaded to Replicate. When the result is ready, a webhook will be triggered with the result.
Once Replicate has generated the embedding, a webhook callback is made to the `/webhooks/embedding/<id>` route in our Flask application. This route receives the webhook payload, verifies it came from Hookdeck, updates the database with the embedding results, and sets the appropriate status.

Here's the route definition:
```python
@app.route("/webhooks/embedding/<id>", methods=["POST"])
def webhook_embedding(id):
    if not verify_webhook(request):
        app.logger.error("Webhook signature verification failed")
        return jsonify({"error": "Webhook signature verification failed"}), 401

    payload = request.json
    app.logger.info("Embedding payload received for id %s", id)
    app.logger.debug(payload)
```
This route handles `POST` requests to the `/webhooks/embedding/<id>` endpoint and is passed the `id` path parameter. It verifies the request came from Hookdeck and, if so, retrieves the JSON payload from the request. Otherwise, it returns a `401` response.

Next, it checks for errors:
```python
status = (
    "EMBEDDINGS_ERROR" if "error" in payload and payload["error"] else "SEARCHABLE"
)
```
If an error is present, the status is set to `EMBEDDINGS_ERROR`. Otherwise, it is set to `SEARCHABLE`.
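The extraction below relies on the shape of the model's output: a list of embedding objects. Abridged and hedged, the payload looks roughly like this:

```python
# Abridged sketch of a Replicate "completed" webhook payload for all-mpnet-base-v2
payload = {
    "id": "<prediction-id>",
    "status": "succeeded",
    "output": [
        {"embedding": [0.0123, -0.0456, ...]},  # 768-dimensional vector
    ],
    "error": None,
}
```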
Next, the vector embedding is extracted from the payload, and the database is updated with the embedding details and the new status:
```python
embedding = payload["output"][0]["embedding"]

database = Database()
collection = database.get_collection()

result = collection.update_one(
    filter={"_id": ObjectId(id)},
    update={
        "$set": {
            "status": status,
            "embedding": embedding,
            "replicate_embeddings_response": payload,
        }
    },
)
```
This finds the document in the database with the matching `id` and updates it with the new status, the embedding, and the entire payload.

Check if the document was found and updated:
```python
if result.matched_count == 0:
    app.logger.error(
        "No document found for id %s to update embedding", payload["id"]
    )
    return jsonify({"error": "No document found to update embedding"}), 404

return "OK"
```
If no document is found for the given `id`, an error is logged, and a JSON response with an error message is returned with a `404` status. If the update succeeded, an `OK` is returned to inform Hookdeck that the webhook has been processed.

With the vector embedding stored in the `embedding` property, the asset is now searchable with MongoDB due to the previously defined vector search index.
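At this point, a fully indexed document in the collection looks roughly like the following (a hedged sketch assembled from the fields written at each stage of the journey; the values are illustrative):

```python
# Illustrative shape of a fully indexed asset document
{
    "_id": ObjectId("..."),
    "url": "https://example.com/bruce.mp3",  # hypothetical URL
    "content_type": "audio/mpeg",
    "content_length": "5634741",             # from the Content-Length header
    "status": "SEARCHABLE",
    "text": "Well, hello there...",          # textual representation
    "embedding": [0.0123, -0.0456, ...],     # 768-dimensional vector
    "processor_response": {...},             # Hookdeck queue response (debug)
    "replicate_response": {...},             # transcription webhook payload
    "generator_response": {...},             # Hookdeck queue response (debug)
    "replicate_embeddings_response": {...},  # embedding webhook payload
}
```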
Search is user-driven: the user enters a search term and submits a form. The search query is handled and processed, and the results are returned and displayed. Ideally, this is a real-time experience, so these operations are performed synchronously.
Let's walk through each of those steps.
The user navigates to the `/search` endpoint in their web browser, enters a search term, and submits the form, making a `GET` request to the `/search` endpoint:

```python
@app.route("/search")
def search():
    query = request.args.get("query")
    if query is None:
        return render_template("search.html", results=[])

    app.logger.info("Query submitted")
    app.logger.debug(query)

    results = query_vector_search(query)

    results = format_results(results)

    app.logger.debug("Formatted search results: %s", results)

    return render_template("search.html", results=results, query=query)
```
The `search` function in the Flask application handles `GET` requests to the `/search` endpoint. It retrieves the search `query` submitted by the user via `request.args.get`. If there is no query, the `search` template is rendered. Otherwise, a vector search is performed using the `query_vector_search` function, the results are formatted by passing them to the `format_results` function, and the formatted results are rendered using the `search.html` template.

The `query_vector_search` function generates embeddings for the query provided by the user, performs a vector search, and retrieves matching documents from the MongoDB collection:

```python
def query_vector_search(q):
    generator = SyncEmbeddingsGenerator()

    try:
        generator_response = generator.generate(q)
        app.logger.debug(generator_response)
    except Exception as e:
        app.logger.error("Error generating embeddings: %s", e)
        return None

    if generator_response["status"] != "completed":
        app.logger.debug("Embeddings generation timed out")
        return None

    query_embedding = generator_response["output"][0]["embedding"]
```
The function takes the query, `q`, and uses the `SyncEmbeddingsGenerator` to generate the embedding for the search query by calling its `generate` function and passing the query. If the embedding creation fails, `None` is returned.

The `SyncEmbeddingsGenerator` is used to synchronously generate embeddings for the search query. This operation is synchronous because the request is user-driven and requires a direct response. `SyncEmbeddingsGenerator` is defined in `allthethings/generators.py`:

```python
class SyncEmbeddingsGenerator:
    def generate(self, text):
        payload = {
            "version": "b6b7585c9640cd7a9572c6e129c9549d79c9c31f0d3fdce7baac7c67ca38f305",
            "input": {"text": text},
        }

        response = httpx.request(
            "POST",
            "https://api.replicate.com/v1/predictions",
            headers={**Config.REPLICATE_API_AUTH_HEADERS, "Prefer": "wait"},
            json=payload,
            timeout=60,
        )

        return response.json()
```
The `generate` function receives the `text` to generate an embedding from. A synchronous request is made directly to the Replicate HTTP API, passing the same replicate/all-mpnet-base-v2 model `version` used in the asynchronous embedding request. The `"Prefer": "wait"` header and `timeout` values are set to enable long-running synchronous HTTP requests. Also, the Replicate API token is included in the headers via `Config.REPLICATE_API_AUTH_HEADERS`.

The response JSON is returned to the calling function.

Back in `query_vector_search`, the embedding result is used to construct the vector search query:

```python
...

query_embedding = generator_response["output"][0]["embedding"]

vs_query = {
    "index": "vector_index",
    "path": "embedding",
    "queryVector": query_embedding,
    "numCandidates": 100,
    "limit": 10,
}

new_search_query = {"$vectorSearch": vs_query}

app.logger.info("Vector search query created")
app.logger.debug(new_search_query)
```
`vs_query` represents the vector search to be performed. It identifies the `index` to be queried as `vector_index`; the `path` to the property the query runs against, `embedding`; and the text query in embedding format (`"queryVector": query_embedding`). See the MongoDB Vector Search docs for more information, including the purpose of the `numCandidates` and `limit` properties.

Next, the function defines the projection to specify which fields to include in the search results:
```python
project = {
    "$project": {
        "score": {"$meta": "vectorSearchScore"},
        "_id": 0,
        "url": 1,
        "content_type": 1,
        "content_length": 1,
        "text": 1,
    }
}
```
The projection includes the vector search score, URL, content type, content length, and text. For more information on the score, see the Atlas Vector Search Score docs.
The function then performs the aggregation query using the constructed vector search query and projection:
```python
database = Database()
collection = database.get_collection()

app.logger.info("Vector search query without post filter")
res = list(collection.aggregate([new_search_query, project]))

app.logger.info("Vector search query run")
app.logger.debug(res)
return res
```
Overall, the `query_vector_search` function generates embeddings for the query provided by the user, performs a vector search, and retrieves matching documents from the MongoDB database.

Next, within the `search` function in `app.py`, the results are formatted for rendering:

```python
results = format_results(results)
```
And within `format_results`, also defined in `app.py`:

```python
def format_results(results):
    formatted_results = []
    for _idx, index in enumerate(results):
        parse_result = urlparse(index["url"])
        parsed_url = {
            "netloc": parse_result.netloc,
            "path": parse_result.path,
            "params": parse_result.params,
            "query": parse_result.query,
            "fragment": parse_result.fragment,
            "hostname": parse_result.hostname,
            "last_part": parse_result.path.rstrip("/").split("/")[-1],
        }
        index["parsed_url"] = parsed_url
        formatted_results.append(index)

    return formatted_results
```
The `format_results` function iterates over the vector search results and returns an array with each element containing the result along with a `parsed_url` property with information about the indexed asset.

Finally, back at the end of the `/search` route, the formatted results are displayed:

```python
def search():
    ...

    results = format_results(results)

    return render_template("search.html", results=results, query=query)
```
This renders the `search.html` template, passing the formatted results and the original query to the template for display.

In this tutorial, we walked through the components of a Flask application that can index and run a text search on any asset with a public URL. Data is stored, and vector search takes place, via MongoDB. AI inference and embedding generation are performed by Replicate. Hookdeck is used as a serverless queue between the Flask application and Replicate to manage API request rate-limiting to Replicate, and to verify, queue, and guarantee the delivery of asynchronous webhook callbacks from Replicate back to the Flask application.
If you haven't already done so, you can grab the Index All The Things code on GitHub. There are a number of feature issues to add support for additional content types, so feel free to get involved.