Reinventando a pesquisa multimodal com MongoDB e Anyscale
KK
MS
Kamil Kaczmarek, Marwan Sarieddine20 min read • Published Sep 18, 2024 • Updated Sep 18, 2024
APLICATIVO COMPLETO
Avalie esse Tutorial
Neste guia, percorreremos uma solução abrangente para melhorar um sistema de pesquisa legado em dados multimodais usando Anyscale e MongoDB. Mostraremos como construir:
Um pipeline de indexação de dados multimodal e dimensionável que executa tarefas complexas como:
- Inferência em lote de modelo multimodal.
- Geração de incorporação de vetores.
- Inserindo dados em um índice de pesquisa.
- Atualizações Delta do índice de pesquisa.
} Um backend de pesquisa híbrida de alto desempenho que:
- Modelos de incorporação auto-hospedados.
- Combina a correspondência de texto léxico com a consulta de pesquisa semântica.
🖥️ Uma interface de usuário simples para interagir com o backend de pesquisa.
Estaremos utilizando:
MongoDB cloud como repositório central de dados para:
- Sendo uma escolha onipresente no espaço por sua flexibilidade e escalabilidade.
- Suporte ao armazenamento de dados multimodais, como imagens, texto e dados estruturados.
Plataforma Anyscale como plataforma de computação de AI para:
- Executando trabalhos de computação de inferência em lote de alto desempenho.
- Habilitando implantações altamente disponíveis e escaláveis.
- Dimensionando e utilizando de forma ideal os recursos de computação custosos.
Este guia foi extraído do artigo da Anyscale: Reinventing Multi-Modal Search with Anyscale and MongoDB.
Para obter um tutorial básico sobre a integração do MongoDB com o Anyscale, leia: Construindo aplicativos de AI e RAG com o MongoDB, o Anyscale e o PyMongo
As empresas que lidam com um grande volume de dados multimodais geralmente exigem um sistema de pesquisa robusto e de alto desempenho. No entanto, os sistemas de pesquisa tradicionais têm certas limitações que precisam ser abordadas:
Os sistemas de pesquisa legados normalmente oferecem apenas correspondência lexical para dados de texto, enquanto dados não estruturados de outras formas, como imagens, permanecem não pesquisáveis.
Se determinados campos de metadados estiverem ausentes ou forem de baixa qualidade, o sistema de pesquisa não poderá utilizá-los eficazmente. Em pequenos conjuntos de dados, esses problemas de metadados podem ser facilmente corrigidos, mas, dada a escala dos conjuntos de dados corporativos, a curadoria e a melhoria manuais geralmente não são uma opção.
Para superar essas limitações, faremos uso de modelos generativos e de incorporação para enriquecer e codificar os dados, permitindo uma experiência de pesquisa mais sofisticada.
Em nosso exemplo, abordaremos o seguinte caso de uso: uma plataforma de e-commerce que possui um grande catálogo de produtos e gostaria de melhorar sua relevância e experiência de pesquisa.
O conjunto de dados que usaremos é o conjunto de dados Myntra, que contém imagens e metadados de produtos da Myntra, uma empresa indiana de comércio eletrônico de moda. O objetivo é melhorar os recursos de pesquisa da plataforma implementando um sistema de busca multimodal escalável que possa lidar com dados de texto e imagem.
O sistema de pesquisa legado permite apenas:
- Pesquisa lexical em relação ao nome do produto.
- Correspondência no preço e classificação do produto.
Por exemplo, realizar uma pesquisa por "green dress " retornará itens cujo nome de produto corresponde ao termo "green " ou "dress " (usamos MongoDB Atlas, que converte o texto em letras minúsculas e o divide com base nos limites das palavras para criar tokens separados para correspondência — os resultados da pesquisa são classificados usando uma pontuação BM25 .)
Abaixo está uma captura de tela mostrando os resultados retornados da consulta “green dress”. Observe como os nomes dos produtos mostrados contêm o token "green " ou "dress " (principalmente "green " na captura de tela mostrada).
Devido às limitações do sistema de pesquisa legado, os resultados correspondentes contêm itens que não são relevantes para a consulta e a intenção dos usuários, como "Bio Green Apple Shampoo." A precisão da pesquisa lexical existente é limitada pela qualidade do nome do produto fornecido e ainda sofrerá com a lembrança insatisfatória de itens que não contêm informações relevantes no nome do produto.
Em um alto nível, nossa solução consistirá em usar o Anyscale para executar:
- Modelos de linguagem grande (LLMs) multimodais e gerar descrições de produtos a partir da imagem e do nome do produto fornecidos.
- Modelos de linguagem grande e campos de metadados de produto gerados que são úteis para pesquisa.
- Incorporar modelos para codificar os nomes dos produtos e as descrições geradas e indexar os vetores numéricos resultantes em um mecanismo de busca vetorial como o MongoDB Atlas Vector Search.
Abaixo está uma captura de tela mostrando os resultados retornados da consulta "green dress" depois de melhorar nosso mecanismo de pesquisa. Observe como os produtos mostrados são mais relevantes para a consulta e a intenção dos usuários, graças aos recursos de pesquisa semântica do novo sistema. O novo sistema utiliza imagens para agregar valor ao significado semântica do nome do produto, melhorando a capacidade de pesquisa. Por exemplo, a modelo pode buscar itens relevantes que não contêm o símbolo “green” no nome do produto, mas que são, na verdade, vestidos verdes.
Além disso, à esquerda, podemos ver filtros de metadados gerados por IA que podem ser usados para refinar ainda mais os resultados da pesquisa. Para a mesma query “green dress,”, podemos filtrar ainda mais os resultados por campos de metadados, como "category, ", "season, " e "color ", para corresponder estritamente a filtros como "color=green, "category=dress”; e época="summer or spring. "
Observe que uma abordagem alternativa para codificar imagens é fazer uso de modelos de incorporação multimodais como o CLIP. Para saber mais sobre essa abordagem, consulte a postagem do blog da Anyscale sobre pesquisa intermodal para comércio eletrônico. Essa abordagem pode ser menos intensiva computacionalmente do que usar um modelo generativo como o LLaVA, mas não permite condicionar o significado semântico gerado da imagem com o nome do produto. Por exemplo, uma foto de um modelo vestindo muitas peças de roupa ou contra um fundo de outros itens pode ter seu sinal semântico dissipado em todos os itens da foto, tornando-a muito menos útil (veja a imagem de "Girls Embellished Net Maxi Dress" na captura de tela acima com o modelo infantil falando ao telefone como exemplo).
Dividimos nosso sistema em um estágio de indexação de dados offline e um estágio de pesquisa online. O estágio de indexação de dados precisa ser executado periodicamente sempre que novos dados são adicionados ao sistema, enquanto o estágio de pesquisa é um serviço on-line que lida com solicitações de pesquisa.
O estágio de indexação é responsável por processar, incorporar e atualizar textos e imagens em um repositório de dados central – neste caso, o MongoDB. Os principais componentes do estágio de indexação são:
- Enriquecimento de metadados: Utiliza modelos multimodais de linguagem ampla para enriquecer as descrições de produtos e classificadores LLM para gerar campos de metadados.
- Geração de inserção: utiliza modelos de inserção para gerar inserções dos nomes e descrições do produto.
- ingestão de dados: executa atualizações emmassa e insere em uma collection MongoDB que oferece suporte a pesquisa vetorial usando o MongoDB Atlas Vector Search.
Aqui está um diagrama detalhado do pipeline:
O estágio de pesquisa é responsável por combinar a correspondência de texto legado com recursos avançados de pesquisa semântica. Os principais componentes do estágio de pesquisa são:
- Frontend: fornece uma interface do usuário baseada em Gradio para interagir com o backend de pesquisa.
- Backend: implementa o backend de pesquisa híbrida.
Abaixo está um diagrama de sequência do estágio de pesquisa, mostrando o fluxo de solicitação de pesquisa, que envolve principalmente as seguintes etapas:
- Envie uma solicitação de pesquisa do frontend.
- Processe a solicitação na implantação do ingresso.
- Encaminhe a query de texto da solicitação para o modelo de incorporação para gerar incorporações.
- Execute uma pesquisa vetorial no banco de dados MongoDB.
- Retorne os resultados da pesquisa para a implantação do ingresso para criar uma resposta.
- Retorne a resposta para o frontend.
Depois de analisarmos a arquitetura de alto nível, passaremos agora pela implementação dos principais componentes da nossa solução. Se você quiser aprender a codificar isso sozinho, continue lendo...
Implemente seu cluster do Atlas
Obtenha a cadeia de conexão do MongoDB e permita o acesso a partir de endereços IP relevantes. Para este tutorial, você usará a lista de acesso à rede
0.0.0.0/0
.Para implementações de produção, os usuários podem gerenciar VPCs e redes privadas na plataforma Anyscale e conectá-las com segurança por meio de emparelhamento de VPC ou endpoints privados de nuvem ao Atlas.
Crie uma collection, índice vetorial Atlas e um índice de texto completo
Usando o Data Explorer ou uma conexão Compass, crie a seguinte coleção para hospedar o contexto do aplicativo e o mecanismo de pesquisa. Use o nome do banco de dados de sua escolha (por exemplo.
myntra
) e nome da coleção de sua escolha (por exemplo. myntra-items-offline
). Depois que a coleção for criada, vá para a aba Atlas Search (ou para a aba Índice no Compass usando o botão de alternância Atlas search) e crie o seguinte índice vetorial, essa configuração deve ser consistente com os valores de backend do aplicativo:nome : vector_search_index
1 { 2 "fields": [ 3 { 4 "numDimensions": 1024, 5 "similarity": "cosine", 6 "type": "vector", 7 "path": "description_embedding" 8 }, 9 { 10 "numDimensions": 1024, 11 "similarity": "cosine", 12 "type": "vector", 13 "path": "name_embedding" 14 }, 15 { 16 "type": "filter", 17 "path": "category" 18 }, 19 { 20 "type": "filter", 21 "path": "season" 22 }, 23 { 24 "type": "filter", 25 "path": "color" 26 }, 27 { 28 "type": "filter", 29 "path": "rating" 30 }, 31 { 32 "type": "filter", 33 "path": "price" 34 } 35 ] 36 }
Além disso, configure o índice de Full Text Search do Atlas:
nome : lexical_text_search_index
1 { 2 "mappings": { 3 "dynamic": false, 4 "fields": { 5 "name": { 6 "type": "string", 7 "analyzer": "lucene.standard" 8 } 9 } 10 } 11 }
Observe que, com clusters de "nível pago", você pode utilizar o driver para criar os índices, conforme mostrado no repositório do GitHub aqui.
Começamos detalhando como implementar pipelines de dados multimodais em grande escala. Os pipelines de dados são projetados para lidar com dados de texto e imagem executando instâncias de modelo de linguagem grande multimodal.
Usamos o Ray Data para implementar nossos pipelines de dados para execução em escala. O Ray Data é uma biblioteca que fornece uma API de alto nível para a criação de pipelines de dados dimensionáveis que podem ser executados usando computação heterogênea. O Ray Data foi desenvolvido com base no Ray, uma estrutura de computação distribuída que nos permite dimensionar facilmente nosso código Python em um cluster de máquinas.
Abaixo está um diagrama do pipeline de dados para gerar descrições de produtos a partir de imagens:
As principais etapas mostradas no diagrama são:
- Estime a distribuição de token de entrada/saída usando tokenizadores em CPUs.
Observe que os resultados intermediários são armazenados em um armazenamento distribuído na memória, chamado de Object Store no diagrama acima.
A API da Ray Data adota execução lenta, o que significa que as operações de processamento de dados só são executadas quando os dados são necessários. Começamos especificando como construir um conjunto de dados Ray Data usando um dos conectores IO. Em seguida, aplicamos transformações ao objeto Dataset usando as operações de mapa e filtro que podem ser aplicadas em paralelo nos dados, seja em linha ou em lotes.
Aqui está a implementação do pipeline Ray Data para leitura e processamento dos dados:
1 ds = ray.data.read_csv(path, ...) 2 ds = ( 3 ds.map_batches(download_images, concurrency=num_download_image_workers, 4 num_cpus=4) 5 .filter(lambda x: bool(x["img"])) 6 .map(LargestCenterSquare(size=336)) 7 .map(gen_description_prompt) 8 .materialize() 9 )
O código acima irá:
- Ler os dados do armazenamento do data lake.
- Baixe as imagens em paralelo usando a função download_images.
- Filtre as imagens inválidas/vazias.
- Corte as imagens para o maior quadrado central usando o recurso de chamada LargestCenterSquare.
- Gere o prompt de descrição para o modelo utilizando a função gen_description_prompt.
- Materialize o conjunto de dados, que aciona a execução do pipeline e o armazenamento dos resultados na memória.
Usamos o llava-hf/llava-v1.Modelo 6-mistral-7b-hf para gerar descrições de produtos com base na imagem e no nome do produto.
Esta é a função que usaremos para gerar prompts para o modelo:
1 def gen_description_prompt(row: dict[str, Any]) -> dict[str, Any]: 2 title = row["name"] 3 row["description_prompt"] = "<image>" * 1176 + ( 4 f"\nUSER: Generate an ecommerce product description given the image 5 and this title: {title}." 6 "Make sure to include information about the color of the product in 7 the description." 8 "\nASSISTANT:" 9 ) 10 return row
map e filter usam uma função que opera em uma única linha do conjunto de dados, portanto, por que gen_description_prompt espera uma linha. Por outro lado, map_batches usa uma função que opera em um lote de linhas - ou seja, download_images esperará uma entrada em lote.
Em seguida, passamos a calcular a distribuição de token de entrada/saída para o modelo LLaVA. Isso é necessário para otimizar o uso do vLLM, nosso mecanismo de inferência escolhido. Os valores padrão de distribuição de tokens de entrada/saída assumidos pelo vLLM deixam muito desempenho de lado.
O vLLM é uma biblioteca para geração de alta taxa de transferência de modelos LLM, aproveitando várias otimizações de desempenho, principalmente:
- Gerenciamento eficiente da chave de atenção e da memória de valor com PagedAttention.
- Execução rápida do modelo com gráfico CUDA/HIP.
- Quantization: GPTQ, AWQ, SqueezeLLM, FP8 KV Cache.
- Kernels CUDA otimizados.
Abaixo está uma programação do processo de geração de uma solicitação com PagedAttention feita na postagem do blog vLLM. Esse processo de geração permite que o vLLM aloque de forma ideal a memória da CPU ao armazenar o cache KV de um modelo baseado em um transformação, tornando possível processar mais sequências em paralelo (ou seja, liberando gargalos de taxa de transferência limitada à memória).
Portanto, para maximizar a capacidade de cache do KV, é melhor especificar o número máximo de tokens que cada sequência consumirá. Isso informará ao vLLM sobre o número máximo de blocos que cada sequência consumirá — e, portanto, o número máximo de sequências que podem ser processadas em paralelo.
Para fazer isso, calculamos o máximo de tokens de entrada executando um LlaVAMiscalTokenizer no campo descrição_prompt usando o código abaixo:
1 max_input_tokens = ( 2 ds.map_batches( 3 LlaVAMistralTokenizer, 4 fn_kwargs={ 5 "input": "description_prompt", 6 "output": "description_prompt_tokens", 7 }, 8 concurrency=num_llava_tokenizer_workers, 9 num_cpus=1, 10 ) 11 .select_columns(["description_prompt_tokens"]) 12 .map(compute_num_tokens, fn_kwargs={"col": "description_prompt_tokens"}) 13 .max(on="num_tokens") 14 )
Definimos o número máximo de tokens de saída para 256 já que não queremos descrições de produtos muito longas. Em seguida, calculamos o comprimento máximo do modelo como a soma dos tokens máximos de entrada e saída:
1 max_output_tokens = 256 2 max_model_length = max_input_tokens + max_output_tokens
Agora que calculamos os máximos de token de entrada/saída, podemos executar o modelo LLaVA.
Aqui está o código Ray Data para executar o modelo LLaVA:
1 ds = ds.map_batches( 2 LlaVAMistral, 3 fn_constructor_kwargs={ 4 "max_model_len": max_model_length, 5 "max_tokens": max_output_tokens, 6 "max_num_seqs": 400, 7 }, 8 fn_kwargs={"col": "description_prompt"}, 9 batch_size=llava_model_batch_size, 10 num_gpus=1, 11 concurrency=num_llava_model_workers, 12 accelerator_type=llava_model_accelerator_type, 13 )
Fazemos uso de map_batches sempre que podemos nos beneficiar de operações vetorizadas nos dados. Este é o caso da inferência do modelo LLaVA, onde podemos processar múltiplas sequências em paralelo na GPU. Consideramos que um tamanho de lote de 80 é ideal, dadas as restrições de memória de uma CPU A10 e os parâmetros especificados do mecanismo.
Uma classe LlaVAMiscal é definida como:
1 class LlaVAMistral: 2 def __init__(self, max_model_len: int, ...): 3 self.llm = LLM(...) 4 5 def __call__(self, batch: dict[str, np.ndarray], col: str) -> dict[str, np.ndarray]: 6 prompts = batch[col] 7 images = batch["img"] 8 responses = self.llm.generate( 9 [ 10 { 11 "prompt": prompt, 12 "multi_modal_data": ImagePixelData(image), 13 } 14 for prompt, image in zip(prompts, images) 15 ], 16 sampling_params=self.sampling_params, 17 ) 18 batch["description"] = [resp.outputs[0].text for resp in responses] # type: ignore 19 return batch
Este é um exemplo de uma transformação com estado no Ray Data, em que um estado caro, como o carregamento de um modelo LLM, pode ser feito no construtor e, em seguida, o modelo pode ser usado para gerar respostas no método
__call__
. O que isso faz é gerar um processo de trabalho de longa duração com o modelo carregado na memória e, em seguida, o método__call__
é chamado em cada lote de dados enviado ao processo de trabalho.Aqui está um diagrama do pipeline de dados para gerar classificações de produtos a partir de descrições:
As principais etapas do pipeline são:
- Construa prompts e tokenize-os para o modelo Miscal em CPUs.
- Estime a distribuição do token de entrada/saída para o modelo Mistral usando CPUs.
- Execute a inferência do modelo Mistral para gerar classificações de produtos usando vLLM em GPUs.
Começamos construindo prompts para os classificadores e tokenizando-os para o modelo Mistral. Aqui está o código para nossa especificação de classificadores:
1 classifiers: dict[str, Any] = { 2 "category": { 3 "classes": ["Tops", "Bottoms", "Dresses", "Footwear", "Accessories"], 4 "prompt_template": ( 5 "Given the title of this product: {title} and " 6 "the description: {description}, what category does it belong to?" 7 "Chose from the following categories: {classes_str}. " 8 "Return the category that best fits the product. Only return the" 9 "category name and nothing else." 10 ), 11 "prompt_constructor": construct_prompt_classifier, 12 }, 13 "season": { 14 "classes": ["Summer", "Winter", "Spring", "Fall"], 15 "prompt_template": ..., 16 "prompt_constructor": construct_prompt_classifier, 17 }, 18 "color": { 19 ... 20 } 21 }
Continuamos a construir prompts e tokenizá-los para cada classificador usando Ray Data e uma implementação de tokenizador de modelo Mistral.
1 for classifier, classifier_spec in classifiers.items(): 2 ds = ( 3 ds.map( 4 classifier_spec["prompt_constructor"], 5 fn_kwargs={ 6 "prompt_template": classifier_spec["prompt_template"], 7 "classes": classifier_spec["classes"], 8 "col": classifier, 9 }, 10 ) 11 .map_batches( 12 MistralTokenizer, 13 fn_kwargs={ 14 "input": f"{classifier}_prompt", 15 "output": f"{classifier}_prompt_tokens", 16 }, 17 concurrency=num_mistral_tokenizer_workers_per_classifier, 18 num_cpus=1, 19 ) 20 .materialize() 21 )
Semelhante ao modelo LLaVA, estimamos a distribuição de tokens de entrada/saída para o modelo Mistral:
1 for classifier, classifier_spec in classifiers.items(): 2 max_input_tokens = ( 3 ds.select_columns([f"{classifier}_prompt_tokens"]) 4 .map(compute_num_tokens, fn_kwargs={"col": f"{classifier}_prompt_tokens"}) 5 .max(on="num_tokens") 6 ) 7 max_output_tokens = classifier_spec["max_output_tokens"] 8 max_model_length = max_input_tokens + max_output_tokens 9 classifier_spec["max_model_length"] = max_model_length
Por fim, executamos a inferência do modelo Mistral para gerar classificações de produtos mapeando lotes para a transformação com estado do MistralVLLM, conforme visto no código abaixo:
1 for classifier, classifier_spec in classifiers.items(): 2 ds = ( 3 ds.map_batches( 4 MistralvLLM, 5 ..., 6 batch_size=80, 7 num_gpus=num_mistral_workers_per_classifier, 8 concurrency=1, 9 accelerator_type=NVIDIA_TESLA_A10G, 10 ) 11 .map( 12 MistralDeTokenizer, 13 fn_kwargs={"key": f"{classifier}_response"}, 14 concurrency=num_detokenizers_per_classifier, 15 num_cpus=1, 16 ) 17 .map( 18 clean_response, 19 fn_kwargs={ 20 "classes": classifier_spec["classes"], 21 "response_col": f"{classifier}_response", 22 }, 23 ) 24 )
Observe que, diferentemente do modelo LLaVA, optamos por desacoplar o processo de destokenização e a limpeza de resposta. Fizemos isso para mostrar que podemos escalar de forma independente as etapas de processamento dentro do pipeline. Por fim, a capacidade de desacoplar volumes de trabalho complexos e de computação intensiva ajudará a resolver os gargalos de desempenho. Isso é possível, dada a facilidade com que podemos dimensionar automaticamente um cluster heterogêneo de trabalhadores com o Anyscale. O Anyscale escalará os nós automaticamente para cima ou para baixo com tempos de inicialização otimizados para dimensionar elasticamente o cluster com nós de CPU e CPU.
Para visualizar a implementação completa do pipeline de enriquecimento de metadados, consulte nosso repositório GitHub.
Em seguida, geramos embeddings para os nomes e as descrições dos produtos usando um modelo de embedding.
Abaixo está um diagrama do pipeline de dados para gerar embeddings:
As principais etapas do pipeline são:
- Execute o modelo de incorporação para gerar incorporações.
- Faça a ingestão dos dados no MongoDB.
Aqui está o código do Ray Data para gerar incorporações:
1 ds = ds.map_batches( 2 EmbedderSentenceTransformer, 3 fn_kwargs={"cols": ["name", "description"]}, 4 batch_size=80, 5 num_gpus=1, 6 concurrency=num_embedder_workers, 7 accelerator_type=NVIDIA_TESLA_A10G, 8 )
Quando uma classe EmbedderSentenceTransformer é definida como:
1 class EmbedderSentenceTransformer: 2 def __init__(self, model: str = "thenlper/gte-large"): 3 self.model = SentenceTransformer(model, device="cuda") 4 5 def __call__(self, batch: dict[str, np.ndarray], cols: list[str]) -> dict[str, np.ndarray]: 6 for col in cols: 7 batch[f"{col}_embedding"] = self.model.encode( # type: ignore 8 batch[col].tolist(), batch_size=len(batch[col]) 9 ) 10 return batch
Finalmente, continuamos ingerindo os dados processados no MongoDB usando o PyMongo. Aqui está o código do Raio Data para ingestão dos dados. Observe que optamos por usar o MongoBulkInsert ou MongoBulkUpdate, dependendo se estamos realizando a primeira execução ou uma atualização no banco de dados. Certificamo-nos de definir a simultaneidade para um valor razoável, o que evita uma tempestade de conexões em nosso cluster do MongoDB. O número de conexões que o banco de dados pode manipular dependerá do tamanho do cluster escolhido.
1 mongo_bulk_op: Type[MongoBulkInsert] | Type[MongoBulkUpdate] 2 if mode == "first_run": 3 mongo_bulk_op = MongoBulkInsert 4 elif mode == "update": 5 mongo_bulk_op = MongoBulkUpdate 6 7 ( 8 ds.map_batches(update_record) 9 .map_batches( 10 mongo_bulk_op, 11 fn_constructor_kwargs={ 12 "db": db_name, 13 "collection": collection_name, 14 }, 15 batch_size=db_update_batch_size, 16 concurrency=num_db_workers, 17 num_cpus=0.1, 18 batch_format="pandas", 19 ) 20 .materialize() 21 )
As classes MongoBulkUpdate e MongoBulkInsert fazem uso da biblioteca PyMongo para executar operações em massa. Abaixo está um exemplo de implementação da classe MongoBulkUpdate:
1 class MongoBulkUpdate: 2 def __init__(self, db: str, collection: str) -> None: 3 client = MongoClient(os.environ["DB_CONNECTION_STRING"]) 4 self.collection = client[db][collection] 5 6 def __call__(self, batch_df: pd.DataFrame) -> dict[str, np.ndarray]: 7 docs = batch_df.to_dict(orient="records") 8 bulk_ops = [ 9 UpdateOne(filter={"_id": doc["_id"]}, update={"$set": doc}, upsert=True) 10 for doc in docs 11 ] 12 self.collection.bulk_write(bulk_ops) 13 return {}
Para visualizar a implementação completa do pipeline de indexação de dados, consulte nosso repositório GitHub.
Desenvolvemos e testamos nosso pipeline de dados em um espaço detrabalho Anyscale para usar a experiência do VSCode IDE em execução em um cluster de computação elástica. Agora que construímos o pipeline, estamos prontos para escaloná-lo. Para fazer isso, usamos Anyscale Jobs, que é a melhor maneira de executar volumes de trabalho em lote na produção.
Podemos enviar facilmente um trabalho em Anyscale de nosso espaço de trabalho usando o terminal VSCode. Tudo o que precisamos é de um arquivo de configuração YAML, onde especificamos:
- Nome: o nome do trabalho do Anyscale que estamos lançando.
- Ponto de entrada: queremos executar o script cli.py que executa nosso pipeline.
- Diretório de trabalho: este é o diretório que contém os arquivos necessários para executar o ponto de entrada.
- Requisitos: dependências adicionais a serem instaladas ao configurar o ambiente de execução do trabalho.
- Variáveis de ambiente: token de acesso Hugging Face e cadeias de conexão de banco de dados.
- Configuração de computação: o tipo e o número de nós a serem habilitados para o escalonamento automáticodo cluster.
Fornecemos nosso arquivo job.yaml de configuração do trabalho abaixo:
1 name: enrich-data-and-upsert 2 entrypoint: python cli.py ... 3 working_dir: . 4 requirements: requirements.txt 5 env_vars: 6 DB_CONNECTION_STRING: <your mongodb connection string> 7 HF_TOKEN: <your huggingface token> 8 compute_config: 9 cloud: "Anyscale Cloud" 10 head_node: 11 instance_type: m5.8xlarge 12 worker_nodes: 13 - instance_type: m5.8xlarge 14 min_nodes: 0 15 max_nodes: 10 16 - instance_type: g5.xlarge 17 min_nodes: 0 18 max_nodes: 40
Para enviar o trabalho no terminal, use o seguinte comando:
1 anyscale job submit -f job.yaml
Essa abordagem nos permite executar nosso pipeline em um cluster gerenciado que contém apenas as métricas e os logs do nosso trabalho. Além disso, como estamos executando esse trabalho no Anyscale, o Anyscale nos notificará automaticamente sobre quaisquer falhas e tentará novamente automaticamente se ocorrer uma falha.
Sempre que novos dados forem disponibilizados ou que forem feitas alterações nos dados existentes, desejamos executar um trabalho do Anyscale que primeiro gere novas descrições de produtos, metadados e incorporações e, em seguida, execute atualizações em massa em nossa coleção do MongoDB. Isso é feito executando o mesmo comando
anyscale job submit -f job.yaml
, mas com um arquivo job.yaml atualizado em que os argumentos do ponto de entrada apontam para os novos dados e especificam explicitamente a execução no modo "update" ".Uma coisa a observar é que, em um ambiente de produção, isso geralmente é obtido integrando uma ferramenta de orquestração preferida com o Anyscale, por meio de integrações nativas ou usando programaticamente o SDK do Anyscale.
O aplicativo de pesquisa é composto por vários componentes que funcionam juntos para fornecer uma experiência de pesquisa híbrida. Cada componente é uma implantação do Ray Serve com escalonamento automático que pode ser dimensionada de forma independente para atender às demandas do sistema .
Abaixo está um diagrama do backend do aplicativo de pesquisa com o qual o usuário interagirá por meio do frontend:
Em um nível alto, o backend de pesquisa consiste em:
- Implementação de entrada: Recebe solicitações de pesquisa do frontend e as encaminha para a implementação de backend apropriada.
- Implantação “Query Legacy”: manipula a pesquisa de texto léxico legado no MongoDB database.
- Sistema do “Query with AI Enabled: Lida com a realização de pesquisa híbrida no MongoDB database.
- Implantação do modelo de incorporação: gera incorporações para a consulta de pesquisa.
Esta é uma implementação de amostra do backend de pesquisa que mostra como compor recursos de pesquisa legados e novos por meio de uma camada de lógica de negócios. Ao implementar a lógica comercial personalizada na implementação de entrada, você pode controlar quais usuários são expostos a quais recursos de pesquisa. Por exemplo, considere a possibilidade de expor os recursos de pesquisa habilitados para IA apenas a um subconjunto de usuários ou apenas para determinadas consultas.
O Ray Serve se integra aoFastAPI para fornecer uma maneira simples e escalável de criar APIs. Abaixo está como definimos nossa implantação de entrada. Observe que decoramos a classe com o decorador @deployment para indicar que é uma implantação doRay Serve. Também decoramos a classe com o decorador @ingress para indicar que é a implantação de entrada.
1 fastapi = FastAPI() 2 3 4 5 class QueryApplication: 6 def __init__(self, query_legacy: QueryLegacySearch, query_ai_enabled: QueryAIEnabledSearch): 7 self.query_legacy = query_legacy 8 self.query_ai_enabled = query_ai_enabled 9 10 11 async def query_legacy_search( 12 self, 13 text_search: str, 14 min_price: int, 15 max_price: int, 16 min_rating: float, 17 num_results: int, 18 ): 19 return await self.query_legacy.run.remote(...) 20 21 22 async def query_ai_enabled_search( 23 self, 24 text_search: str, 25 min_price: int, 26 max_price: int, 27 min_rating: float, 28 categories: list[str], 29 colors: list[str], 30 seasons: list[str], 31 num_results: int, 32 embedding_column: str, 33 search_type: list[str], 34 ): 35 logger = logging.getLogger("ray.serve") 36 logger.setLevel(logging.DEBUG) 37 logger.debug(f"Running query_ai_enabled_search with {locals()=}") 38 return await self.query_ai_enabled.run.remote(...)
Definimos dois pontos de extremidade para a implantação de entrada: um para executar a pesquisa herdada (
/legacy
) e outro para executar a pesquisa habilitada para IA (/ai_enabled
). Os pontos de extremidade são definidos como funções assíncronas que usam os parâmetros necessários para a consulta de pesquisa e retornam os resultados da pesquisa.Para visualizar a implementação completa da implantação de entrada, consulte nosso repositório GitHub.
O sistema "Query with AI Enabled " é responsável por realizar a pesquisa híbrida no MongoDB database completo. O tipo de pesquisa é parametrizado como pesquisa lexical, pesquisa vetorial ou ambos (pesquisa híbrida).
Veja como os itens acima são controlados no frontend:
Além disso, é oferecida a opção de usar a descrição do produto gerada ou o nome do produto como o campo de incorporação.
Vamos dar uma olhada em como podemos implementar a implantação “Query with AI Enabled” usando o Ray Serve:
1 2 class QueryAIEnabledSearch: 3 def __init__( 4 self, 5 embedding_model: DeploymentHandle, 6 database_name: str, 7 collection_name: str, 8 ) -> None: 9 self.client = Async 10 11 IOMotorClient(os.environ["DB_CONNECTION_STRING"]) 12 self.embedding_model = embedding_model 13 self.database_name = database_name 14 self.collection_name = collection_name 15 16 async def run( 17 self, 18 text_search: str, 19 min_price: int, 20 max_price: int, 21 min_rating: float, 22 categories: list[str], 23 colors: list[str], 24 seasons: list[str], 25 n: int, 26 search_type: set[str], 27 vector_search_index_name: str = "vector_search_index", 28 vector_search_path: str = "description_embedding", 29 text_search_index_name: str = "lexical_text_search_index", 30 vector_penalty: int = 1, 31 full_text_penalty: int = 10, 32 ): 33 db = self.client[self.database_name] 34 collection = db[self.collection_name] 35 pipeline = [] 36 if text_search.strip(): 37 if "vector" in search_type: 38 embedding = await self.embedding_model.compute_embedding.remote(text_search) 39 is_hybrid = search_type == {"vector", "lexical"} 40 if is_hybrid: 41 pipeline.extend(hybrid_search(...)) 42 elif search_type == {"vector"}: 43 pipeline.extend(vector_search(...)) 44 elif search_type == {"lexical"}: 45 pipeline.extend(lexical_search(...)) 46 pipeline.extend(match_on_metadata(...)) 47 else: 48 pipeline = match_on_metadata(...) 49 50 records = collection.aggregate(pipeline) 51 records = [record async for record in records] 52 results = [(record["img"], record["name"]) for record in records] 53 return results
No código acima, veja como implementamos a busca vetorial:
1 def vector_search( 2 vector_search_index_name: str, 3 vector_search_path: str, 4 embedding: list[float], 5 n: int, 6 min_price: int, 7 max_price: int, 8 min_rating: float, 9 categories: list[str], 10 colors: list[str], 11 seasons: list[str], 12 cosine_score_threshold: float = 0.92, 13 ) -> list[dict]: 14 return [ 15 { 16 "$vectorSearch": { 17 "index": vector_search_index_name, 18 "path": vector_search_path, 19 "queryVector": embedding.tolist(), 20 "numCandidates": 100, 21 "limit": n, 22 "filter": { 23 "price": {"$gte": min_price, "$lte": max_price}, 24 "rating": {"$gte": min_rating}, 25 "category": {"$in": categories}, 26 "color": {"$in": colors}, 27 "season": {"$in": seasons}, 28 }, 29 }, 30 }, 31 { 32 "$project": { 33 "img": 1, 34 "name": 1, 35 "score": {"$meta": "vectorSearchScore"}, 36 } 37 }, 38 {"$match": {"score": {"$gte": cosine_score_threshold}}}, 39 ]
Observe que usamos o estágio de agregação
$vectorSearch
para realizar a pesquisa de vetores no MongoDB database. O estágio recebe os seguintes parâmetros:index
: o nome do índice de pesquisa vetorialpath
O caminho para o campo vetorial no documentoqueryVector
: O vetor de incorporação da query de pesquisanumCandidates
: o número de candidatos a considerar para a pesquisalimit
O número de resultados a serem retornadosfilter
: Pré-filtros para aplicar aos resultados da pesquisa
Em seguida, adicionamos um estágio
$project
para projetar os campos nos quais estamos interessados e um estágio$match
para filtrar os resultados com base na pontuação de similaridade do cosseno.No código acima, também implementamos a função
lexical_search
para realizar pesquisa lexical no MongoDB database:1 def lexical_search(text_search: str) -> list[dict]: 2 return [ 3 { 4 "$search": { 5 "index": "lexical_text_search_index", 6 "text": { 7 "query": text_search, 8 "path": "name", 9 }, 10 } 11 } 12 ]
O estágio de agregação
$search
é usado para realizar a pesquisa lexical no MongoDB database. O estágio recebe os seguintes parâmetros:index
: o nome do índice de pesquisa de textotext
: A query de pesquisa de texto e o caminho para o campo de texto no documento
Observe que, diferentemente da pesquisa de vetores, os filtros de metadados são aplicados após o estágio de pesquisa ao construir um pipeline para pesquisa lexical.
No código acima, também implementamos a função
hybrid_search
para executar a pesquisa híbrida no MongoDB database. Aqui está um diagrama de como a função de pesquisa híbrida funciona:E aqui está como implementamos a pesquisa híbrida:
1 def hybrid_search( 2 collection_name: str, 3 ... 4 ) -> list[dict]: 5 # 1. Perform vector search 6 vector_search_stages = vector_search(...) 7 convert_vector_rank_to_score_stages = convert_rank_to_score( 8 score_name="vs_score", score_penalty=vector_penalty 9 ) 10 11 # 2. Perform lexical search 12 lexical_search_stages = lexical_search(text_search=text_search, text_search_index_name=text_search_index_name) 13 post_filter_stages = match_on_metadata(...) 14 convert_text_rank_to_score_stages = convert_rank_to_score( 15 score_name="fts_score", score_penalty=full_text_penalty 16 ) 17 18 # 3. Rerank by combined score 19 rerank_stages = rerank_by_combined_score( 20 vs_score_name="vs_score", fts_score_name="fts_score", n=n 21 ) 22 23 # 4. Put it all together 24 return [ 25 *vector_search_stages, 26 *convert_vector_rank_to_score_stages, 27 { 28 "$unionWith": { 29 "coll": collection_name, 30 "pipeline": [ 31 *lexical_search_stages, 32 *post_filter_stages, 33 *convert_text_rank_to_score_stages, 34 ], 35 } 36 }, 37 *rerank_stages, 38 ]
O Embedding Model Deployment é responsável por gerar embeddings para a consulta de pesquisa. Abaixo está um exemplo de como definir uma implantação de modelo de incorporação usando o Ray Serve:
1 2 class EmbeddingModel: 3 def __init__(self, model: str = "thenlper/gte-large") -> None: 4 self.model = SentenceTransformer(model) 5 6 async def compute_embedding(self, text: str) -> list[float]: 7 loop = asyncio.get_event_loop() 8 return await loop.run_in_executor(None, lambda: self.model.encode(text))
Observe que, dependendo do nosso tráfego, podemos especificar:
- Uma configuração de autoscaling como parte da especificação de implantação para reduzir o modelo de incorporação para zero. Isso é útil no caso de nosso tráfego esperado ser esporádica.O dimensionamento para zero é facilmente disponibilizado usandoo Anyscale Services.
- Um tipo de recurso de GPU, caso desejemos acelerar nosso modelo de incorporação para processar lotes de textos recebidos. Precisaríamos agrupar dinamicamente o
compute_embedding
método usando a funcionalidade dinâmica de lote de solicitações do Ray Serve . Isso é útil para otimizar a taxa de transferência de tráfego de alto volume .
Para ver a implementação completa do aplicativo, que inclui o frontend do Gradio e a pesquisa legada, consulte nosso repositório do Github.
Com o Anyscale Services, podemos implantar aplicativos altamente disponíveis usando opções de implantação prontas para produção, ativando implementações de canary versionadas.
Podemos continuar implantando um serviço Anyscale em nosso espaço de trabalho usando o terminal VSCode. Tudo o que precisamos é de um arquivo de configuração YAML, onde especificamos:
- Nome: o nome do serviço Anyscale que estamos implantando. Se esse for um serviço existente, a implantação será implementada gradualmente por meio de um canário.
- Aplicativos: O caminho de importação para os aplicativos front-end e back-end.
- Requisitos: Dependências adicionais a serem instaladas ao configurar o ambiente de tempo de execução do trabalho.
- Sinalizadores: um sinalizador que podemos definir para desabilitar a autenticação em nosso serviço para expor nosso aplicativo ao público.
Fornecemos nosso arquivo app.yaml de configuração de serviço abaixo:
1 name: mongo-multi-modal-search-v2 2 applications: 3 - name: frontend 4 route_prefix: / 5 import_path: frontend:app 6 - name: backend 7 route_prefix: /backend 8 import_path: backend:app 9 query_auth_token_enabled: false 10 requirements: requirements.txt
Para implantar o serviço no terminal, use o seguinte comando:
1 anyscale service deploy -f app.yaml
Essa abordagem nos permite implantar nosso serviço em um cluster gerenciado no Anyscale. As tarefas de serviço implantadas têm acesso às principais métricas de desempenho, como latência, taxa de transferência e medidas de erro, bem como a registros de serviços.
Neste guia, apresentamos uma solução de referência para melhorar um sistema de pesquisa para dados multimodais usando Anyscale e MongoDB.
Além disso:
- Se sua equipe está investindo pesadamente no desenvolvimento de aplicativos de pesquisa, entre emcontato conosco para saber mais sobre como o Anyscale e o MongoDB podem ajudá-lo a escalar e produzir sua pesquisa multimodal.
- Para começar rapidamente a implantar um aplicativo semelhante, siga o guia passo a passo em nosso repositório do GitHub.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Relacionado
Tutorial
Desbloqueando a pesquisa semântica: crie um mecanismo de pesquisa de filmes baseado em Java com o Atlas Vector Search e o Spring Boot
Sep 18, 2024 | 10 min read
Tutorial
Crie aplicativos inteligentes com o Atlas Vector Search e o Google Vertex AI
Sep 18, 2024 | 4 min read