Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

​​Reinventando a pesquisa multimodal com MongoDB e Anyscale

KK
MS
Kamil Kaczmarek, Marwan Sarieddine20 min read • Published Sep 18, 2024 • Updated Sep 18, 2024
IAFastAPIPesquisa vetorialPythonAtlas
APLICATIVO COMPLETO
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty

Resumo

Neste guia, percorreremos uma solução abrangente para melhorar um sistema de pesquisa legado em dados multimodais usando Anyscale e MongoDB. Mostraremos como construir:
Um pipeline de indexação de dados multimodal e dimensionável que executa tarefas complexas como:
  • Inferência em lote de modelo multimodal.
  • Geração de incorporação de vetores.
  • Inserindo dados em um índice de pesquisa.
  • Atualizações Delta do índice de pesquisa.
} Um backend de pesquisa híbrida de alto desempenho que:
  • Modelos de incorporação auto-hospedados.
  • Combina a correspondência de texto léxico com a consulta de pesquisa semântica.
🖥️ Uma interface de usuário simples para interagir com o backend de pesquisa.
Estaremos utilizando:
MongoDB cloud como repositório central de dados para:
  • Habilitando a pesquisa vetorial em vários campos e dimensões por meio MongoDB Atlas Vector Search.
  • Sendo uma escolha onipresente no espaço por sua flexibilidade e escalabilidade.
  • Suporte ao armazenamento de dados multimodais, como imagens, texto e dados estruturados.
Plataforma Anyscale como plataforma de computação de AI para:
  • Executando trabalhos de computação de inferência em lote de alto desempenho.
  • Habilitando implantações altamente disponíveis e escaláveis.
  • Dimensionando e utilizando de forma ideal os recursos de computação custosos.
Este guia foi extraído do artigo da Anyscale: Reinventing Multi-Modal Search with Anyscale and MongoDB.
Para obter um tutorial básico sobre a integração do MongoDB com o Anyscale, leia: Construindo aplicativos de AI e RAG com o MongoDB, o Anyscale e o PyMongo

O problema com sistemas de busca antigos

As empresas que lidam com um grande volume de dados multimodais geralmente exigem um sistema de pesquisa robusto e de alto desempenho. No entanto, os sistemas de pesquisa tradicionais têm certas limitações que precisam ser abordadas:

Suporte inadequado para dados não estruturados

Os sistemas de pesquisa legados normalmente oferecem apenas correspondência lexical para dados de texto, enquanto dados não estruturados de outras formas, como imagens, permanecem não pesquisáveis.

Dependência da qualidade e relevância dos dados

Se determinados campos de metadados estiverem ausentes ou forem de baixa qualidade, o sistema de pesquisa não poderá utilizá-los eficazmente. Em pequenos conjuntos de dados, esses problemas de metadados podem ser facilmente corrigidos, mas, dada a escala dos conjuntos de dados corporativos, a curadoria e a melhoria manuais geralmente não são uma opção.
Para superar essas limitações, faremos uso de modelos generativos e de incorporação para enriquecer e codificar os dados, permitindo uma experiência de pesquisa mais sofisticada.

Caso de uso: melhorar as buscas para uma plataforma de e-commerce

Em nosso exemplo, abordaremos o seguinte caso de uso: uma plataforma de e-commerce que possui um grande catálogo de produtos e gostaria de melhorar sua relevância e experiência de pesquisa.
O conjunto de dados que usaremos é o conjunto de dados Myntra, que contém imagens e metadados de produtos da Myntra, uma empresa indiana de comércio eletrônico de moda. O objetivo é melhorar os recursos de pesquisa da plataforma implementando um sistema de busca multimodal escalável que possa lidar com dados de texto e imagem.
O sistema de pesquisa legado permite apenas:
  • Pesquisa lexical em relação ao nome do produto.
  • Correspondência no preço e classificação do produto.
Por exemplo, realizar uma pesquisa por "green dress " retornará itens cujo nome de produto corresponde ao termo "green " ou "dress " (usamos MongoDB Atlas, que converte o texto em letras minúsculas e o divide com base nos limites das palavras para criar tokens separados para correspondência — os resultados da pesquisa são classificados usando uma pontuação BM25 .)
Abaixo está uma captura de tela mostrando os resultados retornados da consulta “green dress”. Observe como os nomes dos produtos mostrados contêm o token "green " ou "dress " (principalmente "green " na captura de tela mostrada).
Pesquisa legada
Devido às limitações do sistema de pesquisa legado, os resultados correspondentes contêm itens que não são relevantes para a consulta e a intenção dos usuários, como "Bio Green Apple Shampoo." A precisão da pesquisa lexical existente é limitada pela qualidade do nome do produto fornecido e ainda sofrerá com a lembrança insatisfatória de itens que não contêm informações relevantes no nome do produto.

Solução: Melhorando a pesquisa com Anyscale e MongoDB

Em um alto nível, nossa solução consistirá em usar o Anyscale para executar:
  • Modelos de linguagem grande (LLMs) multimodais e gerar descrições de produtos a partir da imagem e do nome do produto fornecidos.
  • Modelos de linguagem grande e campos de metadados de produto gerados que são úteis para pesquisa.
  • Incorporar modelos para codificar os nomes dos produtos e as descrições geradas e indexar os vetores numéricos resultantes em um mecanismo de busca vetorial como o MongoDB Atlas Vector Search.
Abaixo está uma captura de tela mostrando os resultados retornados da consulta "green dress" depois de melhorar nosso mecanismo de pesquisa. Observe como os produtos mostrados são mais relevantes para a consulta e a intenção dos usuários, graças aos recursos de pesquisa semântica do novo sistema. O novo sistema utiliza imagens para agregar valor ao significado semântica do nome do produto, melhorando a capacidade de pesquisa. Por exemplo, a modelo pode buscar itens relevantes que não contêm o símbolo “green” no nome do produto, mas que são, na verdade, vestidos verdes.
Vestido multimodal
Além disso, à esquerda, podemos ver filtros de metadados gerados por IA que podem ser usados para refinar ainda mais os resultados da pesquisa. Para a mesma query “green dress,”, podemos filtrar ainda mais os resultados por campos de metadados, como "category, ", "season, " e "color ", para corresponder estritamente a filtros como "color=green, "category=dress”; e época="summer or spring. "
Multimodal com filtro

Uma discussão sobre abordagens alternativas para pesquisar dados multimodais

Observe que uma abordagem alternativa para codificar imagens é fazer uso de modelos de incorporação multimodais como o CLIP. Para saber mais sobre essa abordagem, consulte a postagem do blog da Anyscale sobre pesquisa intermodal para comércio eletrônico. Essa abordagem pode ser menos intensiva computacionalmente do que usar um modelo generativo como o LLaVA, mas não permite condicionar o significado semântico gerado da imagem com o nome do produto. Por exemplo, uma foto de um modelo vestindo muitas peças de roupa ou contra um fundo de outros itens pode ter seu sinal semântico dissipado em todos os itens da foto, tornando-a muito menos útil (veja a imagem de "Girls Embellished Net Maxi Dress" na captura de tela acima com o modelo infantil falando ao telefone como exemplo).

Arquitetura no nível do sistema

Dividimos nosso sistema em um estágio de indexação de dados offline e um estágio de pesquisa online. O estágio de indexação de dados precisa ser executado periodicamente sempre que novos dados são adicionados ao sistema, enquanto o estágio de pesquisa é um serviço on-line que lida com solicitações de pesquisa.

Estágio de indexação de dados

O estágio de indexação é responsável por processar, incorporar e atualizar textos e imagens em um repositório de dados central – neste caso, o MongoDB. Os principais componentes do estágio de indexação são:
  • Enriquecimento de metadados: Utiliza modelos multimodais de linguagem ampla para enriquecer as descrições de produtos e classificadores LLM para gerar campos de metadados.
  • Geração de inserção: utiliza modelos de inserção para gerar inserções dos nomes e descrições do produto.
  • ingestão de dados: executa atualizações emmassa e insere em uma collection MongoDB que oferece suporte a pesquisa vetorial usando o MongoDB Atlas Vector Search.
Aqui está um diagrama detalhado do pipeline: Pipeline de upload em qualquer escala

$search estágio

O estágio de pesquisa é responsável por combinar a correspondência de texto legado com recursos avançados de pesquisa semântica. Os principais componentes do estágio de pesquisa são:
  • Frontend: fornece uma interface do usuário baseada em Gradio para interagir com o backend de pesquisa.
  • Backend: implementa o backend de pesquisa híbrida.
Abaixo está um diagrama de sequência do estágio de pesquisa, mostrando o fluxo de solicitação de pesquisa, que envolve principalmente as seguintes etapas:
  1. Envie uma solicitação de pesquisa do frontend.
  2. Processe a solicitação na implantação do ingresso.
  3. Encaminhe a query de texto da solicitação para o modelo de incorporação para gerar incorporações.
  4. Execute uma pesquisa vetorial no banco de dados MongoDB.
  5. Retorne os resultados da pesquisa para a implantação do ingresso para criar uma resposta.
  6. Retorne a resposta para o frontend.
Diagrama de sequência Anyscale

Aprofundamento: passo a passo detalhado do código

Depois de analisarmos a arquitetura de alto nível, passaremos agora pela implementação dos principais componentes da nossa solução. Se você quiser aprender a codificar isso sozinho, continue lendo...

Pré-requisitos do MongoDB Atlas

Implemente seu cluster do Atlas
Se você não criou um Atlas cluster, siga nosso guia.
Obtenha a cadeia de conexão do MongoDB e permita o acesso a partir de endereços IP relevantes. Para este tutorial, você usará a lista de acesso à rede0.0.0.0/0 .
Para implementações de produção, os usuários podem gerenciar VPCs e redes privadas na plataforma Anyscale e conectá-las com segurança por meio de emparelhamento de VPC ou endpoints privados de nuvem ao Atlas.
Crie uma collection, índice vetorial Atlas e um índice de texto completo
Usando o Data Explorer ou uma conexão Compass, crie a seguinte coleção para hospedar o contexto do aplicativo e o mecanismo de pesquisa. Use o nome do banco de dados de sua escolha (por exemplo. myntra) e nome da coleção de sua escolha (por exemplo. myntra-items-offline). Depois que a coleção for criada, vá para a aba Atlas Search (ou para a aba Índice no Compass usando o botão de alternância Atlas search) e crie o seguinte índice vetorial, essa configuração deve ser consistente com os valores de backend do aplicativo:
nome : vector_search_index
1{
2 "fields": [
3 {
4 "numDimensions": 1024,
5 "similarity": "cosine",
6 "type": "vector",
7 "path": "description_embedding"
8 },
9 {
10 "numDimensions": 1024,
11 "similarity": "cosine",
12 "type": "vector",
13 "path": "name_embedding"
14 },
15 {
16 "type": "filter",
17 "path": "category"
18 },
19 {
20 "type": "filter",
21 "path": "season"
22 },
23 {
24 "type": "filter",
25 "path": "color"
26 },
27 {
28 "type": "filter",
29 "path": "rating"
30 },
31 {
32 "type": "filter",
33 "path": "price"
34 }
35 ]
36}
Além disso, configure o índice de Full Text Search do Atlas:
nome : lexical_text_search_index
1{
2 "mappings": {
3 "dynamic": false,
4 "fields": {
5 "name": {
6 "type": "string",
7 "analyzer": "lucene.standard"
8 }
9 }
10 }
11}
Observe que, com clusters de "nível pago", você pode utilizar o driver para criar os índices, conforme mostrado no repositório do GitHub aqui.

Indexação de dados pipelines de dados multimodais em escala

Começamos detalhando como implementar pipelines de dados multimodais em grande escala. Os pipelines de dados são projetados para lidar com dados de texto e imagem executando instâncias de modelo de linguagem grande multimodal.

Enriquecimento de metadados: geração de descrições de produtos a partir de imagens

Usamos o Ray Data para implementar nossos pipelines de dados para execução em escala. O Ray Data é uma biblioteca que fornece uma API de alto nível para a criação de pipelines de dados dimensionáveis que podem ser executados usando computação heterogênea. O Ray Data foi desenvolvido com base no Ray, uma estrutura de computação distribuída que nos permite dimensionar facilmente nosso código Python em um cluster de máquinas.
Abaixo está um diagrama do pipeline de dados para gerar descrições de produtos a partir de imagens: Processamento Anyscale
As principais etapas mostradas no diagrama são:
  1. Leia os dados usando o PyArrow em CPUs e processe-os.
  2. Estime a distribuição de token de entrada/saída usando tokenizadores em CPUs.
  3. Execute a inferência do modelo LLaVA para gerar descrições de produtos usando vLLM em GPUs.
Observe que os resultados intermediários são armazenados em um armazenamento distribuído na memória, chamado de Object Store no diagrama acima.

Ler e processar dados

A API da Ray Data adota execução lenta, o que significa que as operações de processamento de dados só são executadas quando os dados são necessários. Começamos especificando como construir um conjunto de dados Ray Data usando um dos conectores IO. Em seguida, aplicamos transformações ao objeto Dataset usando as operações de mapa e filtro que podem ser aplicadas em paralelo nos dados, seja em linha ou em lotes.
Aqui está a implementação do pipeline Ray Data para leitura e processamento dos dados:
1ds = ray.data.read_csv(path, ...)
2ds = (
3ds.map_batches(download_images, concurrency=num_download_image_workers,
4num_cpus=4)
5.filter(lambda x: bool(x["img"]))
6.map(LargestCenterSquare(size=336))
7.map(gen_description_prompt)
8.materialize()
9)
O código acima irá:
  1. Ler os dados do armazenamento do data lake.
  2. Baixe as imagens em paralelo usando a função download_images.
  3. Filtre as imagens inválidas/vazias.
  4. Corte as imagens para o maior quadrado central usando o recurso de chamada LargestCenterSquare.
  5. Gere o prompt de descrição para o modelo utilizando a função gen_description_prompt.
  6. Materialize o conjunto de dados, que aciona a execução do pipeline e o armazenamento dos resultados na memória.
Usamos o llava-hf/llava-v1.Modelo 6-mistral-7b-hf para gerar descrições de produtos com base na imagem e no nome do produto.
Esta é a função que usaremos para gerar prompts para o modelo:
1def gen_description_prompt(row: dict[str, Any]) -> dict[str, Any]:
2 title = row["name"]
3 row["description_prompt"] = "<image>" * 1176 + (
4 f"\nUSER: Generate an ecommerce product description given the image
5 and this title: {title}."
6 "Make sure to include information about the color of the product in
7 the description."
8 "\nASSISTANT:"
9 )
10 return row
map e filter usam uma função que opera em uma única linha do conjunto de dados, portanto, por que gen_description_prompt espera uma linha. Por outro lado, map_batches usa uma função que opera em um lote de linhas - ou seja, download_images esperará uma entrada em lote.
Para a implementação completa do pipeline de dados, consulte nosso repositório GitHub.

Estime a distribuição de tokens de entrada/saída para o modelo LLaVA

Em seguida, passamos a calcular a distribuição de token de entrada/saída para o modelo LLaVA. Isso é necessário para otimizar o uso do vLLM, nosso mecanismo de inferência escolhido. Os valores padrão de distribuição de tokens de entrada/saída assumidos pelo vLLM deixam muito desempenho de lado.
O vLLM é uma biblioteca para geração de alta taxa de transferência de modelos LLM, aproveitando várias otimizações de desempenho, principalmente:
  • Gerenciamento eficiente da chave de atenção e da memória de valor com PagedAttention.
  • Execução rápida do modelo com gráfico CUDA/HIP.
  • Quantization: GPTQ, AWQ, SqueezeLLM, FP8 KV Cache.
  • Kernels CUDA otimizados.
Abaixo está uma programação do processo de geração de uma solicitação com PagedAttention feita na postagem do blog vLLM. Esse processo de geração permite que o vLLM aloque de forma ideal a memória da CPU ao armazenar o cache KV de um modelo baseado em um transformação, tornando possível processar mais sequências em paralelo (ou seja, liberando gargalos de taxa de transferência limitada à memória). Animação de processamento
Portanto, para maximizar a capacidade de cache do KV, é melhor especificar o número máximo de tokens que cada sequência consumirá. Isso informará ao vLLM sobre o número máximo de blocos que cada sequência consumirá — e, portanto, o número máximo de sequências que podem ser processadas em paralelo.
Para fazer isso, calculamos o máximo de tokens de entrada executando um LlaVAMiscalTokenizer no campo descrição_prompt usando o código abaixo:
1max_input_tokens = (
2ds.map_batches(
3LlaVAMistralTokenizer,
4fn_kwargs={
5"input": "description_prompt",
6"output": "description_prompt_tokens",
7},
8concurrency=num_llava_tokenizer_workers,
9num_cpus=1,
10)
11.select_columns(["description_prompt_tokens"])
12.map(compute_num_tokens, fn_kwargs={"col": "description_prompt_tokens"})
13.max(on="num_tokens")
14)
Definimos o número máximo de tokens de saída para 256 já que não queremos descrições de produtos muito longas. Em seguida, calculamos o comprimento máximo do modelo como a soma dos tokens máximos de entrada e saída:
1max_output_tokens = 256
2max_model_length = max_input_tokens + max_output_tokens
Agora que calculamos os máximos de token de entrada/saída, podemos executar o modelo LLaVA.

Inferência do modelo LLaVA

Aqui está o código Ray Data para executar o modelo LLaVA:
1ds = ds.map_batches(
2LlaVAMistral,
3fn_constructor_kwargs={
4"max_model_len": max_model_length,
5"max_tokens": max_output_tokens,
6"max_num_seqs": 400,
7},
8fn_kwargs={"col": "description_prompt"},
9batch_size=llava_model_batch_size,
10num_gpus=1,
11concurrency=num_llava_model_workers,
12accelerator_type=llava_model_accelerator_type,
13)
Fazemos uso de map_batches sempre que podemos nos beneficiar de operações vetorizadas nos dados. Este é o caso da inferência do modelo LLaVA, onde podemos processar múltiplas sequências em paralelo na GPU. Consideramos que um tamanho de lote de 80 é ideal, dadas as restrições de memória de uma CPU A10 e os parâmetros especificados do mecanismo.
Uma classe LlaVAMiscal é definida como:
1class LlaVAMistral:
2 def __init__(self, max_model_len: int, ...):
3 self.llm = LLM(...)
4
5 def __call__(self, batch: dict[str, np.ndarray], col: str) -> dict[str, np.ndarray]:
6 prompts = batch[col]
7 images = batch["img"]
8 responses = self.llm.generate(
9 [
10 {
11 "prompt": prompt,
12 "multi_modal_data": ImagePixelData(image),
13 }
14 for prompt, image in zip(prompts, images)
15 ],
16 sampling_params=self.sampling_params,
17 )
18 batch["description"] = [resp.outputs[0].text for resp in responses] # type: ignore
19 return batch
Este é um exemplo de uma transformação com estado no Ray Data, em que um estado caro, como o carregamento de um modelo LLM, pode ser feito no construtor e, em seguida, o modelo pode ser usado para gerar respostas no método __call__. O que isso faz é gerar um processo de trabalho de longa duração com o modelo carregado na memória e, em seguida, o método__call__ é chamado em cada lote de dados enviado ao processo de trabalho.

Enriquecimento de metadados: Geração de classificações de produtos a partir de descrições

Aqui está um diagrama do pipeline de dados para gerar classificações de produtos a partir de descrições: Processamento paralelo
As principais etapas do pipeline são:
  1. Construa prompts e tokenize-os para o modelo Miscal em CPUs.
  2. Estime a distribuição do token de entrada/saída para o modelo Mistral usando CPUs.
  3. Execute a inferência do modelo Mistral para gerar classificações de produtos usando vLLM em GPUs.

Construa prompts e tokenize para o modelo Miscal

Começamos construindo prompts para os classificadores e tokenizando-os para o modelo Mistral. Aqui está o código para nossa especificação de classificadores:
1classifiers: dict[str, Any] = {
2 "category": {
3 "classes": ["Tops", "Bottoms", "Dresses", "Footwear", "Accessories"],
4 "prompt_template": (
5 "Given the title of this product: {title} and "
6 "the description: {description}, what category does it belong to?"
7 "Chose from the following categories: {classes_str}. "
8 "Return the category that best fits the product. Only return the"
9 "category name and nothing else."
10 ),
11 "prompt_constructor": construct_prompt_classifier,
12 },
13 "season": {
14 "classes": ["Summer", "Winter", "Spring", "Fall"],
15 "prompt_template": ...,
16 "prompt_constructor": construct_prompt_classifier,
17 },
18 "color": {
19 ...
20 }
21}
Continuamos a construir prompts e tokenizá-los para cada classificador usando Ray Data e uma implementação de tokenizador de modelo Mistral.
1for classifier, classifier_spec in classifiers.items():
2 ds = (
3 ds.map(
4 classifier_spec["prompt_constructor"],
5 fn_kwargs={
6 "prompt_template": classifier_spec["prompt_template"],
7 "classes": classifier_spec["classes"],
8 "col": classifier,
9 },
10 )
11 .map_batches(
12 MistralTokenizer,
13 fn_kwargs={
14 "input": f"{classifier}_prompt",
15 "output": f"{classifier}_prompt_tokens",
16 },
17 concurrency=num_mistral_tokenizer_workers_per_classifier,
18 num_cpus=1,
19 )
20 .materialize()
21 )

Estimando a distribuição de tokens de entrada/saída para o modelo Miscal

Semelhante ao modelo LLaVA, estimamos a distribuição de tokens de entrada/saída para o modelo Mistral:
1for classifier, classifier_spec in classifiers.items():
2 max_input_tokens = (
3 ds.select_columns([f"{classifier}_prompt_tokens"])
4 .map(compute_num_tokens, fn_kwargs={"col": f"{classifier}_prompt_tokens"})
5 .max(on="num_tokens")
6 )
7 max_output_tokens = classifier_spec["max_output_tokens"]
8 max_model_length = max_input_tokens + max_output_tokens
9 classifier_spec["max_model_length"] = max_model_length

Inferência do modelo Mistral

Por fim, executamos a inferência do modelo Mistral para gerar classificações de produtos mapeando lotes para a transformação com estado do MistralVLLM, conforme visto no código abaixo:
1for classifier, classifier_spec in classifiers.items():
2 ds = (
3 ds.map_batches(
4 MistralvLLM,
5 ...,
6 batch_size=80,
7 num_gpus=num_mistral_workers_per_classifier,
8 concurrency=1,
9 accelerator_type=NVIDIA_TESLA_A10G,
10 )
11 .map(
12 MistralDeTokenizer,
13 fn_kwargs={"key": f"{classifier}_response"},
14 concurrency=num_detokenizers_per_classifier,
15 num_cpus=1,
16 )
17 .map(
18 clean_response,
19 fn_kwargs={
20 "classes": classifier_spec["classes"],
21 "response_col": f"{classifier}_response",
22 },
23 )
24 )
Observe que, diferentemente do modelo LLaVA, optamos por desacoplar o processo de destokenização e a limpeza de resposta. Fizemos isso para mostrar que podemos escalar de forma independente as etapas de processamento dentro do pipeline. Por fim, a capacidade de desacoplar volumes de trabalho complexos e de computação intensiva ajudará a resolver os gargalos de desempenho. Isso é possível, dada a facilidade com que podemos dimensionar automaticamente um cluster heterogêneo de trabalhadores com o Anyscale. O Anyscale escalará os nós automaticamente para cima ou para baixo com tempos de inicialização otimizados para dimensionar elasticamente o cluster com nós de CPU e CPU.
Para visualizar a implementação completa do pipeline de enriquecimento de metadados, consulte nosso repositório GitHub.

Geração de embeddings e ingestão de dados

Em seguida, geramos embeddings para os nomes e as descrições dos produtos usando um modelo de embedding.
Abaixo está um diagrama do pipeline de dados para gerar embeddings:
As principais etapas do pipeline são:
  1. Execute o modelo de incorporação para gerar incorporações.
  2. Faça a ingestão dos dados no MongoDB.

Geração de incorporações com Ray Data

Aqui está o código do Ray Data para gerar incorporações:
1ds = ds.map_batches(
2 EmbedderSentenceTransformer,
3 fn_kwargs={"cols": ["name", "description"]},
4 batch_size=80,
5 num_gpus=1,
6 concurrency=num_embedder_workers,
7 accelerator_type=NVIDIA_TESLA_A10G,
8)
Quando uma classe EmbedderSentenceTransformer é definida como:
1class EmbedderSentenceTransformer:
2 def __init__(self, model: str = "thenlper/gte-large"):
3 self.model = SentenceTransformer(model, device="cuda")
4
5 def __call__(self, batch: dict[str, np.ndarray], cols: list[str]) -> dict[str, np.ndarray]:
6 for col in cols:
7 batch[f"{col}_embedding"] = self.model.encode( # type: ignore
8 batch[col].tolist(), batch_size=len(batch[col])
9 )
10 return batch

Ingestão de dados em escala com Data e PyMongo

Finalmente, continuamos ingerindo os dados processados no MongoDB usando o PyMongo. Aqui está o código do Raio Data para ingestão dos dados. Observe que optamos por usar o MongoBulkInsert ou MongoBulkUpdate, dependendo se estamos realizando a primeira execução ou uma atualização no banco de dados. Certificamo-nos de definir a simultaneidade para um valor razoável, o que evita uma tempestade de conexões em nosso cluster do MongoDB. O número de conexões que o banco de dados pode manipular dependerá do tamanho do cluster escolhido.
1mongo_bulk_op: Type[MongoBulkInsert] | Type[MongoBulkUpdate]
2if mode == "first_run":
3 mongo_bulk_op = MongoBulkInsert
4elif mode == "update":
5 mongo_bulk_op = MongoBulkUpdate
6
7(
8 ds.map_batches(update_record)
9 .map_batches(
10 mongo_bulk_op,
11 fn_constructor_kwargs={
12 "db": db_name,
13 "collection": collection_name,
14 },
15 batch_size=db_update_batch_size,
16 concurrency=num_db_workers,
17 num_cpus=0.1,
18 batch_format="pandas",
19 )
20 .materialize()
21)
As classes MongoBulkUpdate e MongoBulkInsert fazem uso da biblioteca PyMongo para executar operações em massa. Abaixo está um exemplo de implementação da classe MongoBulkUpdate:
1class MongoBulkUpdate:
2 def __init__(self, db: str, collection: str) -> None:
3 client = MongoClient(os.environ["DB_CONNECTION_STRING"])
4 self.collection = client[db][collection]
5
6 def __call__(self, batch_df: pd.DataFrame) -> dict[str, np.ndarray]:
7 docs = batch_df.to_dict(orient="records")
8 bulk_ops = [
9 UpdateOne(filter={"_id": doc["_id"]}, update={"$set": doc}, upsert=True)
10 for doc in docs
11 ]
12 self.collection.bulk_write(bulk_ops)
13 return {}
Para visualizar a implementação completa do pipeline de indexação de dados, consulte nosso repositório GitHub.

Execução de pipeline usando Anyscale Jobs

Desenvolvemos e testamos nosso pipeline de dados em um espaço detrabalho Anyscale para usar a experiência do VSCode IDE em execução em um cluster de computação elástica. Agora que construímos o pipeline, estamos prontos para escaloná-lo. Para fazer isso, usamos Anyscale Jobs, que é a melhor maneira de executar volumes de trabalho em lote na produção.
Podemos enviar facilmente um trabalho em Anyscale de nosso espaço de trabalho usando o terminal VSCode. Tudo o que precisamos é de um arquivo de configuração YAML, onde especificamos:
  • Nome: o nome do trabalho do Anyscale que estamos lançando.
  • Ponto de entrada: queremos executar o script cli.py que executa nosso pipeline.
  • Diretório de trabalho: este é o diretório que contém os arquivos necessários para executar o ponto de entrada.
  • Requisitos: dependências adicionais a serem instaladas ao configurar o ambiente de execução do trabalho.
  • Variáveis de ambiente: token de acesso Hugging Face e cadeias de conexão de banco de dados.
  • Configuração de computação: o tipo e o número de nós a serem habilitados para o escalonamento automáticodo cluster.
Fornecemos nosso arquivo job.yaml de configuração do trabalho abaixo:
1name: enrich-data-and-upsert
2entrypoint: python cli.py ...
3working_dir: .
4requirements: requirements.txt
5env_vars:
6 DB_CONNECTION_STRING: <your mongodb connection string>
7 HF_TOKEN: <your huggingface token>
8compute_config:
9 cloud: "Anyscale Cloud"
10 head_node:
11 instance_type: m5.8xlarge
12 worker_nodes:
13 - instance_type: m5.8xlarge
14 min_nodes: 0
15 max_nodes: 10
16 - instance_type: g5.xlarge
17 min_nodes: 0
18 max_nodes: 40
Para enviar o trabalho no terminal, use o seguinte comando:
1anyscale job submit -f job.yaml
Essa abordagem nos permite executar nosso pipeline em um cluster gerenciado que contém apenas as métricas e os logs do nosso trabalho. Além disso, como estamos executando esse trabalho no Anyscale, o Anyscale nos notificará automaticamente sobre quaisquer falhas e tentará novamente automaticamente se ocorrer uma falha.
Sempre que novos dados forem disponibilizados ou que forem feitas alterações nos dados existentes, desejamos executar um trabalho do Anyscale que primeiro gere novas descrições de produtos, metadados e incorporações e, em seguida, execute atualizações em massa em nossa coleção do MongoDB. Isso é feito executando o mesmo comandoanyscale job submit -f job.yaml, mas com um arquivo job.yaml atualizado em que os argumentos do ponto de entrada apontam para os novos dados e especificam explicitamente a execução no modo "update" ".
Uma coisa a observar é que, em um ambiente de produção, isso geralmente é obtido integrando uma ferramenta de orquestração preferida com o Anyscale, por meio de integrações nativas ou usando programaticamente o SDK do Anyscale.

Construindo e distribuindo o aplicativo de pesquisa híbrida

O aplicativo de pesquisa é composto por vários componentes que funcionam juntos para fornecer uma experiência de pesquisa híbrida. Cada componente é uma implantação do Ray Serve com escalonamento automático que pode ser dimensionada de forma independente para atender às demandas do sistema .
Abaixo está um diagrama do backend do aplicativo de pesquisa com o qual o usuário interagirá por meio do frontend:
Em um nível alto, o backend de pesquisa consiste em:
  • Implementação de entrada: Recebe solicitações de pesquisa do frontend e as encaminha para a implementação de backend apropriada.
  • Implantação “Query Legacy”: manipula a pesquisa de texto léxico legado no MongoDB database.
  • Sistema do “Query with AI Enabled: Lida com a realização de pesquisa híbrida no MongoDB database.
  • Implantação do modelo de incorporação: gera incorporações para a consulta de pesquisa.
Esta é uma implementação de amostra do backend de pesquisa que mostra como compor recursos de pesquisa legados e novos por meio de uma camada de lógica de negócios. Ao implementar a lógica comercial personalizada na implementação de entrada, você pode controlar quais usuários são expostos a quais recursos de pesquisa. Por exemplo, considere a possibilidade de expor os recursos de pesquisa habilitados para IA apenas a um subconjunto de usuários ou apenas para determinadas consultas.

Definindo a implantação do Ingress

O Ray Serve se integra aoFastAPI para fornecer uma maneira simples e escalável de criar APIs. Abaixo está como definimos nossa implantação de entrada. Observe que decoramos a classe com o decorador @deployment para indicar que é uma implantação doRay Serve. Também decoramos a classe com o decorador @ingress para indicar que é a implantação de entrada.
1fastapi = FastAPI()
2
3@deployment
4@ingress(fastapi)
5class QueryApplication:
6 def __init__(self, query_legacy: QueryLegacySearch, query_ai_enabled: QueryAIEnabledSearch):
7 self.query_legacy = query_legacy
8 self.query_ai_enabled = query_ai_enabled
9
10 @fastapi.get("/legacy")
11 async def query_legacy_search(
12 self,
13 text_search: str,
14 min_price: int,
15 max_price: int,
16 min_rating: float,
17 num_results: int,
18 ):
19 return await self.query_legacy.run.remote(...)
20
21 @fastapi.get("/ai_enabled")
22 async def query_ai_enabled_search(
23 self,
24 text_search: str,
25 min_price: int,
26 max_price: int,
27 min_rating: float,
28 categories: list[str],
29 colors: list[str],
30 seasons: list[str],
31 num_results: int,
32 embedding_column: str,
33 search_type: list[str],
34 ):
35 logger = logging.getLogger("ray.serve")
36 logger.setLevel(logging.DEBUG)
37 logger.debug(f"Running query_ai_enabled_search with {locals()=}")
38 return await self.query_ai_enabled.run.remote(...)
Definimos dois pontos de extremidade para a implantação de entrada: um para executar a pesquisa herdada (/legacy) e outro para executar a pesquisa habilitada para IA (/ai_enabled). Os pontos de extremidade são definidos como funções assíncronas que usam os parâmetros necessários para a consulta de pesquisa e retornam os resultados da pesquisa.
Para visualizar a implementação completa da implantação de entrada, consulte nosso repositório GitHub.

Implantação “Query with AI Enabled”

O sistema "Query with AI Enabled " é responsável por realizar a pesquisa híbrida no MongoDB database completo. O tipo de pesquisa é parametrizado como pesquisa lexical, pesquisa vetorial ou ambos (pesquisa híbrida).
Veja como os itens acima são controlados no frontend:
Além disso, é oferecida a opção de usar a descrição do produto gerada ou o nome do produto como o campo de incorporação.
Vamos dar uma olhada em como podemos implementar a implantação “Query with AI Enabled” usando o Ray Serve:
1@deployment
2class QueryAIEnabledSearch:
3 def __init__(
4 self,
5 embedding_model: DeploymentHandle,
6 database_name: str,
7 collection_name: str,
8 ) -> None:
9 self.client = Async
10
11IOMotorClient(os.environ["DB_CONNECTION_STRING"])
12 self.embedding_model = embedding_model
13 self.database_name = database_name
14 self.collection_name = collection_name
15
16 async def run(
17 self,
18 text_search: str,
19 min_price: int,
20 max_price: int,
21 min_rating: float,
22 categories: list[str],
23 colors: list[str],
24 seasons: list[str],
25 n: int,
26 search_type: set[str],
27 vector_search_index_name: str = "vector_search_index",
28 vector_search_path: str = "description_embedding",
29 text_search_index_name: str = "lexical_text_search_index",
30 vector_penalty: int = 1,
31 full_text_penalty: int = 10,
32 ):
33 db = self.client[self.database_name]
34 collection = db[self.collection_name]
35 pipeline = []
36 if text_search.strip():
37 if "vector" in search_type:
38 embedding = await self.embedding_model.compute_embedding.remote(text_search)
39 is_hybrid = search_type == {"vector", "lexical"}
40 if is_hybrid:
41 pipeline.extend(hybrid_search(...))
42 elif search_type == {"vector"}:
43 pipeline.extend(vector_search(...))
44 elif search_type == {"lexical"}:
45 pipeline.extend(lexical_search(...))
46 pipeline.extend(match_on_metadata(...))
47 else:
48 pipeline = match_on_metadata(...)
49
50 records = collection.aggregate(pipeline)
51 records = [record async for record in records]
52 results = [(record["img"], record["name"]) for record in records]
53 return results
No código acima, veja como implementamos a busca vetorial:
1def vector_search(
2 vector_search_index_name: str,
3 vector_search_path: str,
4 embedding: list[float],
5 n: int,
6 min_price: int,
7 max_price: int,
8 min_rating: float,
9 categories: list[str],
10 colors: list[str],
11 seasons: list[str],
12 cosine_score_threshold: float = 0.92,
13) -> list[dict]:
14 return [
15 {
16 "$vectorSearch": {
17 "index": vector_search_index_name,
18 "path": vector_search_path,
19 "queryVector": embedding.tolist(),
20 "numCandidates": 100,
21 "limit": n,
22 "filter": {
23 "price": {"$gte": min_price, "$lte": max_price},
24 "rating": {"$gte": min_rating},
25 "category": {"$in": categories},
26 "color": {"$in": colors},
27 "season": {"$in": seasons},
28 },
29 },
30 },
31 {
32 "$project": {
33 "img": 1,
34 "name": 1,
35 "score": {"$meta": "vectorSearchScore"},
36 }
37 },
38 {"$match": {"score": {"$gte": cosine_score_threshold}}},
39 ]
Observe que usamos o estágio de agregação$vectorSearch para realizar a pesquisa de vetores no MongoDB database. O estágio recebe os seguintes parâmetros:
  • index: o nome do índice de pesquisa vetorial
  • pathO caminho para o campo vetorial no documento
  • queryVector: O vetor de incorporação da query de pesquisa
  • numCandidates: o número de candidatos a considerar para a pesquisa
  • limitO número de resultados a serem retornados
  • filter: Pré-filtros para aplicar aos resultados da pesquisa
Em seguida, adicionamos um estágio$project para projetar os campos nos quais estamos interessados e um estágio$match para filtrar os resultados com base na pontuação de similaridade do cosseno.
No código acima, também implementamos a funçãolexical_search para realizar pesquisa lexical no MongoDB database:
1def lexical_search(text_search: str) -> list[dict]:
2 return [
3 {
4 "$search": {
5 "index": "lexical_text_search_index",
6 "text": {
7 "query": text_search,
8 "path": "name",
9 },
10 }
11 }
12 ]
O estágio de agregação$search é usado para realizar a pesquisa lexical no MongoDB database. O estágio recebe os seguintes parâmetros:
  • index: o nome do índice de pesquisa de texto
  • text: A query de pesquisa de texto e o caminho para o campo de texto no documento
Observe que, diferentemente da pesquisa de vetores, os filtros de metadados são aplicados após o estágio de pesquisa ao construir um pipeline para pesquisa lexical.
No código acima, também implementamos a funçãohybrid_searchpara executar a pesquisa híbrida no MongoDB database. Aqui está um diagrama de como a função de pesquisa híbrida funciona:
E aqui está como implementamos a pesquisa híbrida:
1def hybrid_search(
2 collection_name: str,
3 ...
4) -> list[dict]:
5 # 1. Perform vector search
6 vector_search_stages = vector_search(...)
7 convert_vector_rank_to_score_stages = convert_rank_to_score(
8 score_name="vs_score", score_penalty=vector_penalty
9 )
10
11 # 2. Perform lexical search
12 lexical_search_stages = lexical_search(text_search=text_search, text_search_index_name=text_search_index_name)
13 post_filter_stages = match_on_metadata(...)
14 convert_text_rank_to_score_stages = convert_rank_to_score(
15 score_name="fts_score", score_penalty=full_text_penalty
16 )
17
18 # 3. Rerank by combined score
19 rerank_stages = rerank_by_combined_score(
20 vs_score_name="vs_score", fts_score_name="fts_score", n=n
21 )
22
23 # 4. Put it all together
24 return [
25 *vector_search_stages,
26 *convert_vector_rank_to_score_stages,
27 {
28 "$unionWith": {
29 "coll": collection_name,
30 "pipeline": [
31 *lexical_search_stages,
32 *post_filter_stages,
33 *convert_text_rank_to_score_stages,
34 ],
35 }
36 },
37 *rerank_stages,
38 ]

Implantação do modelo de incorporação

O Embedding Model Deployment é responsável por gerar embeddings para a consulta de pesquisa. Abaixo está um exemplo de como definir uma implantação de modelo de incorporação usando o Ray Serve:
1@deployment
2class EmbeddingModel:
3 def __init__(self, model: str = "thenlper/gte-large") -> None:
4 self.model = SentenceTransformer(model)
5
6 async def compute_embedding(self, text: str) -> list[float]:
7 loop = asyncio.get_event_loop()
8 return await loop.run_in_executor(None, lambda: self.model.encode(text))
Observe que, dependendo do nosso tráfego, podemos especificar:
  • Uma configuração de autoscaling como parte da especificação de implantação para reduzir o modelo de incorporação para zero. Isso é útil no caso de nosso tráfego esperado ser esporádica.O dimensionamento para zero é facilmente disponibilizado usandoo Anyscale Services.
  • Um tipo de recurso de GPU, caso desejemos acelerar nosso modelo de incorporação para processar lotes de textos recebidos. Precisaríamos agrupar dinamicamente o compute_embedding método usando a funcionalidade dinâmica de lote de solicitações do Ray Serve . Isso é útil para otimizar a taxa de transferência de tráfego de alto volume .
Para ver a implementação completa do aplicativo, que inclui o frontend do Gradio e a pesquisa legada, consulte nosso repositório do Github.

Pesquise a implantação de aplicativos usando o Anyscale Services

Com o Anyscale Services, podemos implantar aplicativos altamente disponíveis usando opções de implantação prontas para produção, ativando implementações de canary versionadas.
Podemos continuar implantando um serviço Anyscale em nosso espaço de trabalho usando o terminal VSCode. Tudo o que precisamos é de um arquivo de configuração YAML, onde especificamos:
  • Nome: o nome do serviço Anyscale que estamos implantando. Se esse for um serviço existente, a implantação será implementada gradualmente por meio de um canário.
  • Aplicativos: O caminho de importação para os aplicativos front-end e back-end.
  • Requisitos: Dependências adicionais a serem instaladas ao configurar o ambiente de tempo de execução do trabalho.
  • Sinalizadores: um sinalizador que podemos definir para desabilitar a autenticação em nosso serviço para expor nosso aplicativo ao público.
Fornecemos nosso arquivo app.yaml de configuração de serviço abaixo:
1name: mongo-multi-modal-search-v2
2applications:
3 - name: frontend
4 route_prefix: /
5 import_path: frontend:app
6 - name: backend
7 route_prefix: /backend
8 import_path: backend:app
9query_auth_token_enabled: false
10requirements: requirements.txt
Para implantar o serviço no terminal, use o seguinte comando:
1anyscale service deploy -f app.yaml
Essa abordagem nos permite implantar nosso serviço em um cluster gerenciado no Anyscale. As tarefas de serviço implantadas têm acesso às principais métricas de desempenho, como latência, taxa de transferência e medidas de erro, bem como a registros de serviços.

Saiba mais

Neste guia, apresentamos uma solução de referência para melhorar um sistema de pesquisa para dados multimodais usando Anyscale e MongoDB.
Além disso:
  • Se sua equipe está investindo pesadamente no desenvolvimento de aplicativos de pesquisa, entre emcontato conosco para saber mais sobre como o Anyscale e o MongoDB podem ajudá-lo a escalar e produzir sua pesquisa multimodal.
  • Para começar rapidamente a implantar um aplicativo semelhante, siga o guia passo a passo em nosso repositório do GitHub.
Participe dos fóruns do MongoDB e configure sua conta do Atlas com Anyscale!
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Iniciar a conversa

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Desbloqueando a pesquisa semântica: crie um mecanismo de pesquisa de filmes baseado em Java com o Atlas Vector Search e o Spring Boot


Sep 18, 2024 | 10 min read
Tutorial

Crie aplicativos inteligentes com o Atlas Vector Search e o Google Vertex AI


Sep 18, 2024 | 4 min read
Tutorial

Descubra seu Airbnb ideal: implementando um Spring Boot e Atlas Search com o driver Kotlin Sync


Oct 02, 2024 | 8 min read
Tutorial

Aprenda a criar soluções de pesquisa de varejo aprimoradas por IA com MongoDB e Databricks


Sep 18, 2024 | 14 min read
Sumário
  • Resumo