Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Junte-se a nós no Amazon Web Services re:Invent 2024! Saiba como usar o MongoDB para casos de uso de AI .
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Atlaschevron-right

Como criar um sistema RAG avançado com recuperação autoquery

Apoorva Joshi, Maria Khalusova21 min read • Published Sep 12, 2024 • Updated Sep 12, 2024
IAPythonAtlas
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Imagine que você é um dos desenvolvedores responsáveis por criar um chatbot de produto do Atlas Search para uma plataforma de e-commerce. Você viu toda essa conversa sobre Atlas Search semântica (vetor) e Geração Aumentada de Recuperação (RAG), então criou um chatbot RAG que usa Atlas Search semântica para ajudar os usuários da Atlas Search através do seu catálogo de produtos usando linguagem natural. Alguns dias depois de implantar esse recurso, sua equipe de suporte relata reclamação de usuários que não conseguem encontrar o que procuram. Um usuário relata ter pesquisado "Size 10 Nike running shoes priced under $150 " e rolar por páginas de sapatos informais, fora do orçamento ou que não sejam daNike antes de encontrar o que precisa.
Isso resulta de uma limitação das incorporações, pois elas podem não capturar palavras-chave ou critérios específicos das queries dos usuários. Neste tutorial, examinaremos alguns cenários em que o vetor Atlas Search por si só é inadequado e verá como melhorá-los usando uma técnica chamada recuperação autoconsulta.
Especificamente, neste tutorial, abordaremos o seguinte:
  • Extraindo filtros de metadados da linguagem natural
  • Combinando filtragem de metadados com o Atlas Search vetorial
  • Obtenção de resultados estruturados de LLMs
  • Construindo um sistema RAG com recuperação autoconsulta

O que são metadados? Por que isso é importante para o RAG?

Metadados são informações que descrevem seus dados. Para produtos em um mercado de e-commerce, os metadados podem incluir ID do produto, marca, categoria, opções de tamanho etc. A extração dos metadados certos durante o processamento de dados brutos para seus aplicativos RAG pode ter várias vantagens downstream.
No MongoDB, os metadados extraídos podem ser usados como pré-filtros para restringir o Atlas Search vetorial com base em critérios que podem não ser capturados com precisão por meio de incorporações, como valores numéricos, datas, categorias, nomes ou identificadores exclusivos, como IDs de produtos. Isso leva à recuperação de resultados mais relevantes da Atlas Search semântica e, consequentemente, a respostas mais precisas do seu aplicativo RAG .

Extraindo filtros de metadados da linguagem natural

Os usuários normalmente interagem com aplicativos LLM usando linguagem natural. No caso do RAG, a recuperação de documentos relevantes de uma base de conhecimento por meio do Atlas Search vetorial requer apenas a incorporação da query do usuário de linguagem natural. No entanto, digamos que você queira aplicar filtros de metadados ao vetor Atlas Search. Nesse caso, isso requer a etapa adicional de extrair os metadados certos da query do usuário e gerar filtros precisos para usar junto com o Atlas Search vetorial. É aqui que a recuperação autoconsulta entra em jogo.
A recuperação autoconsulta, no contexto de aplicativos LLM, é uma técnica que usa LLMs para gerar queries estruturadas a partir de entrada de linguagem natural e as usa para recuperar informações de uma base de conhecimento. No MongoDB, isso envolve as seguintes etapas:
  • Extraindo metadados da linguagem natural
  • Indexando os campos de metadados
  • Geração de definições de filtro precisas contendo apenas expressões e operadores de correspondência da API de query do MongoDB compatíveis
  • Executando a query do Atlas Search vetorial com os pré-filtros

Construindo um sistema RAG com recuperação autoconsulta

Neste tutorial, construiremos um sistema RAG auto-query para um assistente de compromisso. O objetivo do assistente é ajudar os Investidores a tomar decisões informadas, respondendo a perguntas sobre a capacidade financeira, os rendimentos, etc. para empresas nas quais estão interessados. Os Investidores podem ter dúvidas sobre empresas e anos fiscais específicos, mas é difícil capturar essas informações usando incorporações, especialmente após o chunking. Para superar isso, incluiremos a recuperação autoconsulta no fluxo de trabalho do assistente da seguinte maneira:
Arquitetura RAG com auto-query
Dada uma pergunta de usuário, um LLM primeiro tenta extrair metadados dela de acordo com um esquema de metadados especificado. Se os metadados especificados forem encontrados, um LLM gerará uma definição de filtro que é aplicada como um pré-filtro durante a Atlas Search vetorial. Caso contrário, a query do usuário sozinha é usada para recuperar resultados usando o Atlas Search vetorial. Finalmente, o contexto recuperado e a pergunta do usuário, juntamente com quaisquer outros prompts, são passados para um LLM para gerar uma resposta.

Antes de começarmos

Dados

Usaremos 10-K arquivamentos de 30 empresas Fortune 500 — os PDFs brutos estão disponíveis em uma pastadoGoogle Drive. Esses relatórios, exigidos pela Commissão de Valores Mobiliários (SEC) dos EUA, oferecem um detalhamento do desempenho financeiro de uma empresa, incluindo histórico financeiro, declarações, rendimentos por ação e outros pontos de dados cruciais. Para os Investidores, os 10-Ks são ferramentas inestimáveis para tomar decisões informadas. Você pode acessar e baixar facilmente esses relatórios acessando o website de qualquer empresa dos EUA com capital aberto.

Ferramentas

Usaremos o Não estruturado para fragmentar, incorporar e extrair metadados de documentos PDF e prepará-los para o RAG. Usaremos o MongoDB como armazenamento de vetores para nosso aplicativo. Por fim, usaremos o LangGraph para orquestrar nosso sistema RAG como um fluxo de trabalho com estado e semelhante a DAG.

Onde está o código?

O Jupyter Notebook para este tutorial pode ser encontrado no GitHub.

Etapa 1: decidir quais metadados extrair

A primeira etapa da recuperação por autoconsulta é decidir quais metadados extrair de seus dados brutos. Fazer este exercício antes de inserir seus dados em um banco de dados vetorial garante que todos os metadados necessários sejam capturados nos documentos ingeridos e os campos correspondentes indexados. Não fazer isso com antecedência pode resultar em erros e ineficiências downstream, devido ao LLM não extrair os metadados certos das queries do usuário, usar nomes de campo errados ao gerar os pré-filtros ou fazer query de campos não indexados.
Uma boa maneira de determinar quais metadados extrair é trabalhando de trás para frente a partir de sua melhor suposição dos tipos de queries que os usuários podem fazer ao seu sistema. Para o nosso assistente de financiamento, por exemplo, esperamos que os usuários façam perguntas sobre empresas e anos específicos. Portanto, começaremos com estes metadados. MongoDBO flexível do document model significa que você sempre pode atualizar documentos existentes com novos campos de metadados com base em observações de interações de usuários reais.

Etapa 2: instalar as bibliotecas necessárias

Vamos precisar das seguintes bibliotecas para este tutorial:
  • langgraph: Pacote Python para criar aplicativos com estado e vários atores com LLMs
  • openai: Pacote Python para interagir com APIs OpenAI
  • PyMongo: Pacote Python para interagir com bancos de dados e coleções MongoDB
  • phrase-transformers: Pacote Python para modelos de linguagem de código aberto
  • unestruturado-ingest: Pacote Python para processamento de dados usando Não estruturado
1!pip install -qU langgraph openai pymongo sentence-transformers "unstructured-ingest[pdf, s3, mongodb, embed-huggingface]"

Etapa 3: configurar pré-requisitos

Neste tutorial, usaremos a API Serverless do Unestruturado para preparar nosso conjunto de dados para RAG. A API Serverless é uma API de nível de produção para processar 25+ diferentes tipos de arquivos não estruturados, incluindo PDFs. Para usar a API, primeiro inscreva-se para uma avaliação gratuita 14dias. Depois de se inscrever, você terá acesso a um painel personalizado no qual encontrará sua chave de API e o URL do ponto de extremidade da API. Defina estes em seu código:
1UNSTRUCTURED_API_KEY = "your-api-key"
2UNSTRUCTURED_URL = "your-api-url"
Usaremos o MongoDB Atlas como o armazenamento de vetores para nosso aplicativo RAG. Mas, primeiro, você precisará de uma conta do MongoDB Atlas com um cluster de banco de dados. Depois de fazer isso, você precisará obter a connection string para se conectar ao cluster. Siga estas etapas para configurar:
Depois de obter a connection string, defina-a em seu código. Além disso, defina o nome do banco de dados (MONGODB_DB_NAME) e a coleção (MONGODB_COLLECTION) na qual você deseja ingestão de dados. Por fim, instancie o cliente MongoDB para se conectar ao seu banco de dados:
1# Your MongoDB connection string (uri), and collection/database names
2MONGODB_URI = "your-mongodb-uri"
3MONGODB_DB_NAME = "your-db-name"
4MONGODB_COLLECTION = "your-collection-name"
5# Instantiate the MongoDB client
6mongodb_client = MongoClient(
7 MONGODB_URI, appname="devrel.content.selfquery_mongodb_unstructured"
8)
Não se lembre de adicionar o IP da sua máquina host à lista de acesso IP do seu cluster.
O não estruturado suporta o carregamento de documentos de mais de 20 fontes diferentes, incluindo Amazon Web Services S3, Azure Storage e Google Cloud Platform Storage. Para este tutorial, baixamos os PDFs brutos em um bucket S3 , mas você também pode baixá-los em um diretório local e usá-los como fonte de dados.
Se você estiver usando um bucket S3 como fonte de dados, também precisará definir o URI do S3 e suas credenciais de autenticação do Amazon Web Services no código. Se estiver usando um diretório local como fonte de dados, pule esta etapa:
1# Your AWS authentication credentials
2AWS_KEY = "your-aws-access-key-id"
3AWS_SECRET = "your-aws-secret-access-key"
4# S3 URI for the Access Point to the bucket with PDF files
5AWS_S3_NAME = ""
Vamos também especificar o modelo de incorporação e o LLM a serem usados:
1# Embedding model to use
2EMBEDDING_MODEL_NAME = "BAAI/bge-base-en-v1.5"
3# Completion model to use
4COMPLETION_MODEL_NAME = "gpt-4o-2024-08-06"
Usaremos o bge-base-en-v1 de código aberto daBAAI .5 modelo de incorporação de Abraçando a face. O modelo produz incorporações com 768 dimensões. Quanto ao LLM, usaremos a versão mais recente do GPT-4o durante o tutorial.

Etapa 4: particionar, chunk e incorporar arquivos PDF

Agora que definimos todos os nossos pré-requisitos, vamos definir um pipeline de processamento de PDF usando a biblioteca de ingestãonão estruturada .
1from unstructured_ingest.v2.pipeline.pipeline import Pipeline
2from unstructured_ingest.v2.interfaces import ProcessorConfig
3from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
4from unstructured_ingest.v2.processes.chunker import ChunkerConfig
5from unstructured_ingest.v2.processes.embedder import EmbedderConfig
6from unstructured_ingest.v2.processes.connectors.fsspec.s3 import (
7 S3ConnectionConfig,
8 S3DownloaderConfig,
9 S3IndexerConfig,
10 S3AccessConfig,
11)
12from unstructured_ingest.v2.processes.connectors.local import LocalUploaderConfig
13
14WORK_DIR = "/content/temp"
15
16Pipeline.from_configs(
17 context=ProcessorConfig(
18 verbose=True, tqdm=True, num_processes=5, work_dir=WORK_DIR
19 ),
20 indexer_config=S3IndexerConfig(remote_url=AWS_S3_NAME),
21 downloader_config=S3DownloaderConfig(),
22 source_connection_config=S3ConnectionConfig(
23 access_config=S3AccessConfig(key=AWS_KEY, secret=AWS_SECRET)
24 ),
25 partitioner_config=PartitionerConfig(
26 partition_by_api=True,
27 api_key=UNSTRUCTURED_API_KEY,
28 partition_endpoint=UNSTRUCTURED_URL,
29 strategy="hi_res",
30 additional_partition_args={
31 "split_pdf_page": True,
32 "split_pdf_allow_failed": True,
33 "split_pdf_concurrency_level": 15,
34 },
35 ),
36 chunker_config=ChunkerConfig(
37 chunking_strategy="by_title",
38 chunk_max_characters=1500,
39 chunk_overlap=150,
40 ),
41 embedder_config=EmbedderConfig(
42 embedding_provider="langchain-huggingface",
43 embedding_model_name=EMBEDDING_MODEL_NAME,
44 ),
45 uploader_config=LocalUploaderConfig(output_dir="/content/ingest-outputs"),
46).run()
O pipeline acima é construído a partir de várias configurações que definem diferentes aspectos de seu comportamento:
  • ProcessorConfig: define os parâmetros gerais do comportamento do pipeline, como registro, localização do cache (este é importante neste exemplo), reprocessamento etc.
  • S3IndexerConfig, S3DownloaderConfige S3ConnectionConfig: estas são as configurações do conector de origem Amazon Web Services S3.
  • PartitionerConfig: divide os documentos em JSON padronizado contendo elementos de documentos e metadados. A estratégiahi_res significa que a API não estruturada empregará OCR, modelos de entendimento de documentos e métodos clássicos de ML para extrair e classificar elementos de documentos. Essa estratégia é recomendada para PDFs complexos que contêm mais do que apenas texto, como tabelas e imagens. Saiba mais sobre estratégias de particionamento.
  • ChunkerConfig: depois que todos os documentos forem particionados, a próxima etapa é dividi-los em chunks. Os parâmetros nesta configuração controlam o comportamento de chunking. Usamos a estratégia de agrupamentoby_title para manter as seções do documento juntas até que o tamanho do bloco (chunk_max_characters) seja atingido. Especificamos um chunk_overlap para manter a continuação nos limites do bloco.
  • EmbedderConfig: a etapa de processamento final é incorporar os chunks. Usaremos o modelobge-base-en-v1.5 por meio da integraçãolangchain-huggingface .
  • LocalUploaderConfig: esta configuração nos permite armazenar os resultados finais localmente no diretório especificado.
Quando o pipeline terminar de executar, teremos os resultados finais em *.json arquivos no /content/ingest-outputs diretório local, bem como resultados em cache para cada uma das etapas do pipeline no WORK_DIR diretório.
Se estiver usando um diretório local como fonte de dados, descomente as importações e configurações do conector local no código e comente as importações e configurações do conector S3 .

Etapa 5: Adicionar metadados personalizados aos documentos processados

Nesta etapa, queremos adicionar o nome da empresa e o ano fiscal como metadados personalizados a cada chunk, para habilitar a pré-filtragem durante o Atlas Search vetorial. Felizmente, os documentos do Formulário10K têm uma página mais ou menos padrão com essas informações, para que possamos usar expressões regulares simples (regex) para extrair os metadados necessários deles.
Vamos primeiro definir uma função para extrair o ano:
1def get_fiscal_year(elements: dict) -> int:
2 # Regular expression pattern to find the element containing the fiscal year
3 pattern = r"for the (fiscal\s+)?year ended.*?(\d{4})"
4 year = 0
5 for i in range(len(elements)):
6 match = re.search(pattern, elements[i]["text"], re.IGNORECASE)
7 if match:
8 year = match.group(0)[-4:]
9 try:
10 year = int(year)
11 except:
12 year = 0
13 return year
Os documentos Formulário-10K são arquivados para um ano fiscal específico, e há uma frase padrão em cada documento que indica em qual ano o documento foi arquivado, por exemplo, "For the fiscal year ended December 31, 2023. " A função acima usa os elementos de um documento como entrada, usa um regex para encontrar a linha que contém a data de registro e extrai o ano dessa linha. Retornamos o ano como int para poder aproveitar os operadores de comparação mais tarde.
Vamos definir outra função para extrair o nome da empresa:
1def get_company_name(elements: dict) -> str:
2 name = ""
3 # In most cases the name of the company is right before/above the following line
4 substring = "(Exact name of registrant as specified"
5 for i in range(len(elements)):
6 if substring.lower() in elements[i]["text"].lower():
7 pattern = (
8 r"([A-Z][A-Za-z\s&.,]+?)\s*\(Exact name of registrant as specified"
9 )
10 match = re.search(pattern, elements[i]["text"], re.IGNORECASE)
11 if match:
12 name = match.group(1).strip()
13 name = name.split("\n\n")[-1]
14
15 if name == "":
16 for i in range(len(elements)):
17 # In some cases, the name of the company is right after/below the following line
18 match = re.search(
19 r"Exact name of registrant as specified in its charter:\n\n(.*?)\n\n",
20 elements[i]["text"],
21 )
22 if match:
23 name = match.group(1)
24 else:
25 # In other cases, the name follows the "Commission File Number [Number]" line
26 match = re.search(
27 r"Commission File Number.*\n\n(.*?)\n\n", elements[i]["text"]
28 )
29 if match:
30 name = match.group(1)
31 return name
A extração do nome da empresa é semelhante à forma como extraímos o ano fiscal, exceto pelo fato de que há um pouco mais de variação de onde exatamente o nome da empresa é mencionado no documento, portanto, empregamos várias expressões regex. Na maioria dos casos, você encontrará o nome da empresa logo após a linha, "Exact name of registrant as specified. " No entanto, em alguns casos, o nome da empresa precede esta linha ou segue o "Commission File Number [Number] ".
Agora, vamos percorrer o diretório com os resultados da incorporação e, para cada documento, encontrar o nome e o ano da empresa e adicioná-los como metadados personalizados a todos os elementos do documento:
1directory = f"{WORK_DIR}/embed"
2
3for filename in os.listdir(directory):
4 if filename.endswith(".json"):
5 file_path = os.path.join(directory, filename)
6 print(f"Processing file {filename}")
7 try:
8 with open(file_path, "r") as file:
9 data = json.load(file)
10
11 company_name = get_company_name(data)
12 fiscal_year = get_fiscal_year(data)
13
14 # Add custom metadata fields to each entry
15 for entry in data:
16 entry["metadata"]["custom_metadata"] = {}
17 entry["metadata"]["custom_metadata"]["company"] = company_name
18 entry["metadata"]["custom_metadata"]["year"] = fiscal_year
19
20 with open(file_path, "w") as file:
21 json.dump(data, file, indent=2)
22
23 print(f"Successfully updated {file_path} with custom metadata fields.")
24 except json.JSONDecodeError as e:
25 print(f"Error parsing JSON in {file_path}: {e}")
26 except IOError as e:
27 print(f"Error reading from or writing to {file_path}: {e}")
O código acima lê *.json arquivos do diretório de trabalho que contém os resultados da etapa de incorporação do pipeline. Cada arquivo é o resultado do processamento de um único documento, então você tem um arquivo JSON para cada documento PDF original. Para cada arquivo, estamos usando as funções definidas anteriormente para extrair a data fiscal e o nome da empresa e adicionar esses valores como campos de metadados personalizados a todos os elementos do documento. Em seguida, escrevemos os resultados de volta no arquivo original.

Etapa 6: gravar os documentos processados no MongoDB

Para gravar os documentos processados finais no MongoDB, precisaremos executar novamente o pipeline que definimos na etapa 4, exceto pelo fato de que agora alteraremos o destino de local para MongoDB.
1from unstructured_ingest.v2.processes.connectors.mongodb import (
2 MongoDBConnectionConfig,
3 MongoDBUploadStagerConfig,
4 MongoDBUploaderConfig,
5 MongoDBAccessConfig,
6)
7
8Pipeline.from_configs(
9 context=ProcessorConfig(
10 verbose=True, tqdm=True, num_processes=5, work_dir=WORK_DIR
11 ),
12 indexer_config=S3IndexerConfig(remote_url=AWS_S3_NAME),
13 downloader_config=S3DownloaderConfig(),
14 source_connection_config=S3ConnectionConfig(
15 access_config=S3AccessConfig(key=AWS_KEY, secret=AWS_SECRET)
16 ),
17 partitioner_config=PartitionerConfig(
18 partition_by_api=True,
19 api_key=UNSTRUCTURED_API_KEY,
20 partition_endpoint=UNSTRUCTURED_URL,
21 strategy="hi_res",
22 additional_partition_args={
23 "split_pdf_page": True,
24 "split_pdf_allow_failed": True,
25 "split_pdf_concurrency_level": 15,
26 },
27 ),
28 chunker_config=ChunkerConfig(
29 chunking_strategy="by_title",
30 chunk_max_characters=1500,
31 chunk_overlap=150,
32 ),
33 embedder_config=EmbedderConfig(
34 embedding_provider="langchain-huggingface",
35 embedding_model_name=EMBEDDING_MODEL_NAME,
36 ),
37 destination_connection_config=MongoDBConnectionConfig(
38 access_config=MongoDBAccessConfig(uri=MONGODB_URI),
39 collection=MONGODB_COLLECTION,
40 database=MONGODB_DB_NAME,
41 ),
42 stager_config=MongoDBUploadStagerConfig(),
43 uploader_config=MongoDBUploaderConfig(batch_size=100),
44).run()
O pipeline acima é o mesmo que tivemos na etapa 4, a única diferença é o local de destino dos resultados do processamento de dados, que agora é a coleção MongoDB especificada na etapa 2. O destino MongoDB é definido usando três configurações: MongoDBConnectionConfig, MongoDBUploadStagerConfige MongoDBUploaderConfig. Essas configurações estabelecem a conexão com nosso banco de dados MongoDB Atlas, especificam onde os dados precisam ser carregados e anote quaisquer parâmetros de carregamento adicionais, como batch_size, para ingestão em massa dos dados no MongoDB.
Um exemplo de documento ingerido no MongoDB é o seguinte:
Exemplo de documento MongoDB
Observe os campos de metadados personalizados metadata.custom_metadata.company e metadata.custom_metadata.year contendo o nome da empresa e o ano em que o documento foi publicado.

Etapa 7: definir o estado do gráfico

Como mencionado anteriormente, usaremos o LangGraph para orquestrar nosso sistema RAG. O LangGraph permite construir sistemas LLM como gráficos. Os nós do grafo são funções ou ferramentas para executar tarefas específicas, enquanto as bordas definem rotas entre os nós — elas podem ser fixas, condicionais ou até cíclicas. Cada grafo tem um estado que é uma estrutura de dados compartilhada que todos os nós podem acessar e fazer atualizações. Você pode definir atributos personalizados dentro do estado, dependendo de quais parâmetros deseja acompanhar nos nós do gráfico.
Go em frente e definir o estado do nosso gráfico:
1class GraphState(TypedDict):
2 """
3 Represents the state of the graph.
4
5 Attributes:
6 question: User query
7 metadata: Extracted metadata
8 filter: Filter definition
9 documents: List of retrieved documents from vector search
10 memory: Conversational history
11 """
12
13 question: str
14 metadata: Dict
15 filter: Dict
16 context: List[str]
17 memory: Annotated[list, add_messages]
O código acima:
  • Cria uma classeGraphState que representa o estado do gráfico para nosso assistente.
  • Define o esquema de estado usando o modeloTypedDict para criar dicionários seguros por tipo que podem ajudar a detectar erros no tempo de compilação em vez de no tempo de execução.
  • Anota a chavememory no estado com a funçãoadd_messages redução, informando ao LangGraph para anexar novas mensagens à lista existente em vez de substituí-la.

Etapa 8: definir nós do gráfico

Em seguida, vamos definir os nós do nosso grafo. Os nós contêm a lógica principal do sistema. Elas são essencialmente funções do Python e podem ou não usar um LLM. Cada nó recebe o grafo state como entrada, executa alguns cálculos e retorna um estado atualizado.
Nosso assistente possui quatro funcionalidades principais:
  • Extraia metadados de uma query de linguagem natural.
  • Gere uma definição de filtro de API de query MongoDB.
  • Recupere documentos do MongoDB usando o semântica Atlas Search.
  • Gere uma resposta para a pergunta do usuário.
Vamos definir nós para cada uma das funcionalidades acima.

Extração de metadados

Para o nosso assistente, extrair metadados em um formato específico é crucial para evitar erros e resultados incorretos downstream. A OpenAI lança recentemente Saídas estruturadas, um recurso que garante que seus modelos sempre gerem respostas aderentes a um esquema especificado. Vamos tentar!
Vamos criar um modelo Pydentic que defina o esquema de metadados:
1class Metadata(BaseModel):
2 """Metadata to use for pre-filtering."""
3
4 company: List[str] = Field(description="List of company names")
5 year: List[str] = Field(description="List containing start year and end year")
No código acima, criamos um modelo Pydentic chamado Metadata com company e year como atributos. Cada atributo é do tipo List[str], indicando uma lista de strings.
Agora, vamos definir uma função para extrair metadados da pergunta do usuário:
1def extract_metadata(state: Dict) -> Dict:
2 """
3 Extract metadata from natural language query.
4
5 Args:
6 state (Dict): The current graph state
7
8 Returns:
9 Dict: New key added to state i.e. metadata containing the metadata extracted from the user query.
10 """
11 print("---EXTRACTING METADATA---")
12 question = state["question"]
13 system = f"""Extract the specified metadata from the user question:
14 - company: List of company names, eg: Google, Adobe etc. Match the names to companies on this list: {companies}
15 - year: List of [start year, end year]. Guidelines for extracting dates:
16 - If a single date is found, only include that.
17 - For phrases like 'in the past X years/last year', extract the start year by subtracting X from the current year. The current year is {datetime.now().year}.
18 - If more than two dates are found, only include the smallest and the largest year."""
19 completion = openai_client.beta.chat.completions.parse(
20 model=COMPLETION_MODEL_NAME,
21 messages=[
22 {"role": "system", "content": system},
23 {"role": "user", "content": question},
24 ],
25 response_format=Metadata,
26 )
27 result = completion.choices[0].message.parsed
28 # If no metadata is extracted return an empty dictionary
29 if len(result.company) == 0 and len(result.year) == 0:
30 return {"metadata": {}}
31 metadata = {
32 "metadata.custom_metadata.company": result.company,
33 "metadata.custom_metadata.year": result.year,
34 }
35 return {"metadata": metadata}
O código acima:
  • Obtém a pergunta do usuário a partir do estado do gráfico (state).
  • Define um prompt do sistema instruindo o LLM a extrair nomes de empresas e intervalos de datas da pergunta. Solicitamos especificamente ao LLM que corresponda os nomes das empresas à lista de empresas em nosso conjunto de dados. Caso contrário, o pré-filtro não corresponderá a nenhum registro em nossa base de conhecimento.
  • Passa o modeloMetadata Pydentic como o argumento response_formatpara o métodoparse() da API de Compleções de bate-papo do OpenAI para obter a resposta do LLM de volta como um objetoMetadata .
  • Define o atributometadata do gráfico state como {}, se nenhum metadado for extraído pelo LLM.
  • Define o atributometadata a um dicionário que consiste em chaves correspondentes a campos de metadados em nossos documentos MongoDB — ou seja, metadata.custom_metadata.company e metadata.custom_metadata.year — e valores obtidos a partir da resposta do LLM, se forem encontrados metadados.

Geração de filtro

Em seguida, vamos definir uma função para gerar a definição de filtro:
1def generate_filter(state: Dict) -> Dict:
2 """
3 Generate MongoDB Query API filter definition.
4
5 Args:
6 state (Dict): The current graph state
7
8 Returns:
9 Dict: New key added to state i.e. filter.
10 """
11 print("---GENERATING FILTER DEFINITION---")
12 metadata = state["metadata"]
13 system = """Generate a MongoDB filter definition from the provided fields. Follow the guidelines below:
14 - Respond in JSON with the filter assigned to a `filter` key.
15 - The field `metadata.custom_metadata.company` is a list of companies.
16 - The field `metadata.custom_metadata.year` is a list of one or more years.
17 - If any of the provided fields are empty lists, DO NOT include them in the filter.
18 - If both the metadata fields are empty lists, return an empty dictionary {{}}.
19 - The filter should only contain the fields `metadata.custom_metadata.company` and `metadata.custom_metadata.year`
20 - The filter can only contain the following MongoDB Query API match expressions:
21 - $gt: Greater than
22 - $lt: Lesser than
23 - $gte: Greater than or equal to
24 - $lte: Less than or equal to
25 - $eq: Equal to
26 - $ne: Not equal to
27 - $in: Specified field value equals any value in the specified array
28 - $nin: Specified field value is not present in the specified array
29 - $nor: Logical NOR operation
30 - $and: Logical AND operation
31 - $or: Logical OR operation
32 - If the `metadata.custom_metadata.year` field has multiple dates, create a date range filter using expressions such as $gt, $lt, $lte and $gte
33 - If the `metadata.custom_metadata.company` field contains a single company, use the $eq expression
34 - If the `metadata.custom_metadata.company` field contains multiple companies, use the $in expression
35 - To combine date range and company filters, use the $and operator
36 """
37 completion = openai_client.chat.completions.create(
38 model=COMPLETION_MODEL_NAME,
39 temperature=0,
40 messages=[
41 {"role": "system", "content": system},
42 {"role": "user", "content": f"Fields: {metadata}"},
43 ],
44 response_format={"type": "json_object"},
45 )
46 result = json.loads(completion.choices[0].message.content)
47 return {"filter": result.get("filter", {})}
O código acima:
  • Extrai metadados do estado do gráfico.
  • Define um prompt do sistema contendo instruções sobre como criar a definição de filtro — expressões de correspondência e operadores suportados, campos a serem usados etc.
  • Obtém a saída do LLM especificado como um objeto JSON passando json_object como argumentoresponse_format para a chamada da API de Conclusões de bate-papo, bem como especificando a palavra JSON no prompt do sistema.
  • Define o atributofilter do estado do gráfico para a definição de filtro gerada.
A próxima etapa é definir um nó de gráfico que recupera documentos do MongoDB usando o Atlas Search semântica. Mas, primeiro, você precisará criar um índice vetorial do Atlas Search:
1collection = mongodb_client[MONGODB_DB_NAME][MONGODB_COLLECTION]
2VECTOR_SEARCH_INDEX_NAME = "vector_index"
3
4model = {
5 "name": VECTOR_SEARCH_INDEX_NAME,
6 "type": "vectorSearch",
7 "definition": {
8 "fields": [
9 {
10 "type": "vector",
11 "path": "embeddings",
12 "numDimensions": 768,
13 "similarity": "cosine",
14 },
15 {"type": "filter", "path": "metadata.custom_metadata.company"},
16 {"type": "filter", "path": "metadata.custom_metadata.year"},
17 ]
18 },
19}
20collection.create_search_index(model=model)
O código acima:
  • Especifica a collection contra a qual executar a Atlas Search semântica.
  • Especifica o nome do índice vetorial do Atlas Search.
  • Especifica a definição de índice do vetor Atlas Search que contém o caminho para o campo de incorporações nos documentos (path), o número de dimensões de incorporação (numDimensions), a métrica de similaridade para encontrar os vizinhos mais próximos (similarity) e o caminho para os campos de filtro a serem indexados.
  • Cria o índice vetorial do Atlas Search.
Agora, vamos definir uma função que executa o Atlas Search vetorial:
1def vector_search(state: Dict) -> Dict:
2 """
3 Get relevant information using MongoDB Atlas Vector Search
4
5 Args:
6 state (Dict): The current graph state
7
8 Returns:
9 Dict: New key added to state i.e. documents.
10 """
11 print("---PERFORMING VECTOR SEARCH---")
12 question = state["question"]
13 filter = state["filter"]
14 # We always want a valid filter object
15 if not filter:
16 filter = {}
17 query_embedding = embedding_model.encode(question).tolist()
18 pipeline = [
19 {
20 "$vectorSearch": {
21 "index": VECTOR_SEARCH_INDEX_NAME,
22 "path": "embeddings",
23 "queryVector": query_embedding,
24 "numCandidates": 150,
25 "limit": 5,
26 "filter": filter,
27 }
28 },
29 {
30 "$project": {
31 "_id": 0,
32 "text": 1,
33 "score": {"$meta": "vectorSearchScore"},
34 }
35 },
36 ]
37 # Execute the aggregation pipeline
38 results = collection.aggregate(pipeline)
39 # Drop documents with cosine similarity score < 0.8
40 relevant_results = [doc["text"] for doc in results if doc["score"] >= 0.8]
41 context = "\n\n".join([doc for doc in relevant_results])
42 return {"context": context}
O código acima:
  • Lê a pergunta do usuário e a definição de filtro a partir do estado do gráfico.
  • Define como um objeto vazio, se nenhum filtro for encontrado. Lembre-se de que pularemos diretamente para o Atlas Search vetorial se nenhum metadado for extraído.
  • Incorpora a query do usuário utilizando o modelobge-base-en-v1.5.
  • Cria e executa um pipeline de agregação para realizar a Pesquisa semântica do Atlas. O estágio$vectorSearch recupera documentos usando o Atlas Search do vizinho mais próximo — o camponumCandidates indica quantos vizinhos mais próximos devem ser considerados, o campolimit indica quantos documentos devem ser devolvidos e o campofilter especifica os pré-filtros. O estágio$project que segue gera somente os campos especificados — ou seja, text e a pontuação de similaridade do Atlas Search a partir dos documentos retornados.
  • Filtra documentos com uma pontuação de similaridade de < 0.8 para garantir que somente documentos altamente relevantes sejam retornados.
  • Concatena os valores de campotext da lista final de documentos em uma string e a define como o atributocontext do estado do gráfico.

Geração de resposta

Vamos definir o nó final em nosso grafo para gerar respostas às perguntas.
1def generate_answer(state: Dict) -> Dict:
2 """
3 Generate the final answer to the user query
4
5 Args:
6 state (Dict): The current graph state
7
8 Returns:
9 Dict: New key added to state i.e. generation.
10 """
11 print("---GENERATING THE ANSWER---")
12 question = state["question"]
13 context = state["context"]
14 memory = state["memory"]
15 system = f"Answer the question based only on the following context. If the context is empty or if it doesn't provide enough information to answer the question, say I DON'T KNOW"
16 completion = openai_client.chat.completions.create(
17 model=COMPLETION_MODEL_NAME,
18 temperature=0,
19 messages=[
20 {"role": "system", "content": system},
21 {
22 "role": "user",
23 "content": f"Context:\n{context}\n\n{memory}\n\nQuestion:{question}",
24 },
25 ],
26 )
27 answer = completion.choices[0].message.content
28 return {"memory": [HumanMessage(content=context), AIMessage(content=answer)]}
O código acima:
  • Lê a pergunta do usuário, o contexto recuperado do Atlas Search vetorial e qualquer histórico de conversa a partir do estado do gráfico.
  • Define um prompt do sistema para instruir o LLM a responder apenas com base no contexto fornecido. Este é um passo simples para mitigar atordoamentos.
  • Passa o prompt do sistema, o contexto recuperado, o histórico de bate-papo e a pergunta do usuário para a API de conclusões de bate-papo do OpenAI para obter uma resposta do LLM especificado.
  • Atualiza o atributo de memória do estado do gráfico adicionando o contexto recuperado e a resposta LLM à lista existente de mensagens.

Etapa 9: Definir bordas condicionais

Como mencionado anteriormente, as bordas em um gráfico LangGraph definem interações entre nós. Nosso gráfico principalmente tem bordas fixas entre os nós, exceto uma borda condicional para pular a geração de filtro e Go diretamente para o Atlas Search vetorial com base na extração ou não dos metadados da query do usuário.
Vamos definir a lógica para esta borda condicional:
1def check_metadata_extracted(state: Dict) -> str:
2 """
3 Check if any metadata is extracted.
4
5 Args:
6 state (Dict): The current graph state
7
8 Returns:
9 str: Binary decision for next node to call
10 """
11
12 print("---CHECK FOR METADATA---")
13 metadata = state["metadata"]
14 # If no metadata is extracted, skip the generate filter step
15 if not metadata:
16 print("---DECISION: SKIP TO VECTOR SEARCH---")
17 return "vector_search"
18 # If metadata is extracted, generate filter definition
19 else:
20 print("---DECISION: GENERATE FILTER---")
21 return "generate_filter"
No código acima, extraímos o atributo de metadados do estado do gráfico. Se nenhum metadado for encontrado, ela retornará a string “vector_search.” Caso contrário, ela retornará a string “generate_filter.” Veremos como vincular as saídas desta função de roteamento aos nós na próxima etapa.

Etapa 10: construir o gráfico

Finalmente, vamos construir o grafo conectando os nós usando arestas.
1from langgraph.graph import END, StateGraph, START
2from langgraph.checkpoint.memory import MemorySaver
3from IPython.display import Image, display
4
5workflow = StateGraph(GraphState)
6# Adding memory to the graph
7memory = MemorySaver()
8
9# Add nodes
10workflow.add_node("extract_metadata", extract_metadata)
11workflow.add_node("generate_filter", generate_filter)
12workflow.add_node("vector_search", vector_search)
13workflow.add_node("generate_answer", generate_answer)
14
15# Add edges
16workflow.add_edge(START, "extract_metadata")
17workflow.add_conditional_edges(
18 "extract_metadata",
19 check_metadata_extracted,
20 {
21 "vector_search": "vector_search",
22 "generate_filter": "generate_filter",
23 },
24)
25workflow.add_edge("generate_filter", "vector_search")
26workflow.add_edge("vector_search", "generate_answer")
27workflow.add_edge("generate_answer", END)
28
29# Compile the graph
30app = workflow.compile(checkpointer=memory)
O código acima:
  • Instancia o gráfico usando a classeStateGraph e o parametriza com o estado de gráfico definido na etapa 7.
  • Inicializa um checkpointer na memória utilizando a classeMemorySaver(). Os checkpoints persistem no estado do gráfico entre as interações no LangGraph. É assim que nosso sistema RAG persiste o histórico de bate-papo.
  • Adiciona todos os nós definidos na Etapa 8 no gráfico utilizando o métodoadd_node. O primeiro argumento para add_node é o nome do nó e o segundo argumento é a função Python correspondente ao nó.
  • Adiciona bordas fixas usando o métodoadd_edge. Observe as bordas entre STARTe extract_metadatae generate_answer e END. START e END são nós especiais para indicar qual nó chamar primeiro e quais nós não têm ações seguintes.
  • Adiciona bordas condicionais do extract_metadataaos nósgenerate_filter e vector_search com base nas saídas da função de roteamento definida na etapa 9.
  • Compila o gráfico com o ponto de verificação da memória.
Podemos visualizar nosso gráfico como um diagrama de sereia. Isso ajuda a verificar os nós e as bordas, especialmente para fluxos de trabalho complicados.
1try:
2 display(Image(app.get_graph().draw_mermaid_png()))
3except Exception:
4 # This requires some extra dependencies and is optional
5 pass
No código acima, usamos o métodoget_graph() para obter o gráfico compilado e o métododraw_mermaid_png para gerar um diagrama Mermaid do gráfico como uma imagem PNG.
Veja como nosso gráfico se parece:
Diagrama sereia de um grafo LangGraph

Etapa 11: execute o gráfico

Como última etapa, vamos definir uma função que recebe a entrada do usuário e transmite as saídas da execução do gráfico de volta para o usuário.
1def execute_graph(thread_id: str, question: str) -> None:
2 """
3 Execute the graph and stream its output
4
5 Args:
6 thread_id (str): Conversation thread ID
7 question (str): User question
8 """
9 # Add question to the question and memory attributes of the graph state
10 inputs = {"question": question, "memory": [HumanMessage(content=question)]}
11 config = {"configurable": {"thread_id": thread_id}}
12 # Stream outputs as they come
13 for output in app.stream(inputs, config):
14 for key, value in output.items():
15 print(f"Node {key}:")
16 print(value)
17 print("---FINAL ANSWER---")
18 print(value["memory"][-1].content)
O código acima:
  • Aceita a pergunta do usuário (question) e o ID do tópico (thread_id) como entrada.
  • Cria um dicionárioinputs contendo as atualizações iniciais no estado do gráfico. Em nosso caso, queremos adicionar a pergunta do usuário aos atributosquestion e memory do estado do gráfico.
  • Cria uma configuração de tempo de execução contendo o ID do thread de entrada. IDs de thread são IDs únicos atribuídos a checkpoints de memória no LangGraph. Cada checkpoint tem um estado separado.
  • Transmite as saídas do gráfico, especificamente os nomes dos nós e as saídas.
  • Imprime a resposta final do sistema, extraída do valor de retorno do último nó no gráfico — ou seja, o generate_answer.
Um exemplo de execução do sistema tem a seguinte aparência:
1execute_graph("1", "Sales summary for Walmart for 2023.")
2
3---EXTRACTING METADATA---
4---CHECK FOR METADATA---
5---DECISION: GENERATE FILTER---
6'Node extract_metadata:'
7{'metadata': {'metadata.custom_metadata.company': ['WALMART INC.'], 'metadata.custom_metadata.year': ['2023']}}
8---GENERATING FILTER DEFINITION---
9'Node generate_filter:'
10{'filter': {'$and': [{'metadata.custom_metadata.company': {'$eq': 'WALMART INC.'}}, {'metadata.custom_metadata.year': {'$eq': 2023}}]}}
11---PERFORMING VECTOR SEARCH---
12'Node vector_search:'
13{'context': 'DOCUMENTS INCORPORATED BY REFERENCE...'}
14---GENERATING THE ANSWER---
15'Node generate_answer:'
16{'memory': [HumanMessage(content='DOCUMENTS INCORPORATED BY REFERENCE...'), AIMessage(content='In fiscal 2023, Walmart generated total revenues of $611.3 billion, primarily comprised of net sales amounting to $605.9 billion. Walmart International contributed $101.0 billion to the fiscal 2023 consolidated net sales, representing 17% of the total.')]}
17---FINAL ANSWER---
18In fiscal 2023, Walmart generated total revenues of $611.3 billion, primarily comprised of net sales amounting to $605.9 billion. Walmart International contributed $101.0 billion to the fiscal 2023 consolidated net sales, representing 17% of the total.
Façamos uma pergunta de acompanhamento para garantir que o gráfico persista no histórico de bate-papo:
1execute_graph("1", "What did I just ask you?")
2
3---EXTRACTING METADATA---
4---CHECK FOR METADATA---
5---DECISION: SKIP TO VECTOR SEARCH---
6'Node extract_metadata:'
7{'metadata': {}}
8---PERFORMING VECTOR SEARCH---
9'Node vector_search:'
10{'context': ''}
11---GENERATING THE ANSWER---
12'Node generate_answer:'
13{'memory': [HumanMessage(content=''), AIMessage(content='You asked for the sales summary for Walmart for 2023.')]}
14---FINAL ANSWER---
15You asked for the sales summary for Walmart for 2023.
O sistema conseguiu se lembrar do que perguntamos anteriormente.
Aqui Go você ! Construímos com sucesso um sistema RAG com recuperação e memória autoconsultas.

Conclusão

Neste tutorial, aprenderam como construir um sistema RAG avançado que aproveita a recuperação auto-consulta. Essas técnicas são úteis em cenários em que as queries do usuário podem conter palavras-chave importantes que as incorporações podem não capturar. Criamos um assistente de capital que extrai nomes de empresas e intervalos de datas de consultas de usuários, usa-os para gerar definições de filtro do MongoDB e os aplica como pré-filtros durante o Atlas Search vetorial para obter as informações mais precisas para que um LLM responda a perguntas sobre o cenário financeiro desempenho de empresas específicas durante determinados períodos de tempo.
Para obter conteúdo mais avançado, como a criação de sistemas de agentes, confira os seguintes tutoriais em nosso Centro do Desenvolvedor:
Principais comentários nos fóruns
Avatar do Comentarista do Fórum
J ack_WoehrJ ack W oehr3 meses atrás

oooh ... preciso fazer essa ... brb :grin:


Avatar do Comentarista do Fórum
joao_schaab joaoschaab3 meses atrás

Há um exemplo usando a versão JavaScript das estruturas?

Veja mais nos fóruns

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Parte #3: pesquise seus dados semanticamente com o Atlas Vector Search do MongoDB


Sep 18, 2024 | 6 min read
Tutorial

Criar um cluster multinuvem com o MongoDB Atlas


Sep 23, 2022 | 11 min read
Tutorial

Tutorial do MongoDB Atlas Data Federation: consultas federadas e $out para AWS S3


Jan 23, 2024 | 7 min read
Tutorial

Aplicativos MEAN Stack sem servidor com Cloud Run e MongoDB Atlas


Apr 02, 2024 | 8 min read
Sumário