Como criar um sistema RAG avançado com recuperação autoquery
Apoorva Joshi, Maria Khalusova21 min read • Published Sep 12, 2024 • Updated Sep 12, 2024
Avalie esse Tutorial
Imagine que você é um dos desenvolvedores responsáveis por criar um chatbot de produto do Atlas Search para uma plataforma de e-commerce. Você viu toda essa conversa sobre Atlas Search semântica (vetor) e Geração Aumentada de Recuperação (RAG), então criou um chatbot RAG que usa Atlas Search semântica para ajudar os usuários da Atlas Search através do seu catálogo de produtos usando linguagem natural. Alguns dias depois de implantar esse recurso, sua equipe de suporte relata reclamação de usuários que não conseguem encontrar o que procuram. Um usuário relata ter pesquisado "Size 10 Nike running shoes priced under $150 " e rolar por páginas de sapatos informais, fora do orçamento ou que não sejam daNike antes de encontrar o que precisa.
Isso resulta de uma limitação das incorporações, pois elas podem não capturar palavras-chave ou critérios específicos das queries dos usuários. Neste tutorial, examinaremos alguns cenários em que o vetor Atlas Search por si só é inadequado e verá como melhorá-los usando uma técnica chamada recuperação autoconsulta.
Especificamente, neste tutorial, abordaremos o seguinte:
- Extraindo filtros de metadados da linguagem natural
- Combinando filtragem de metadados com o Atlas Search vetorial
- Obtenção de resultados estruturados de LLMs
- Construindo um sistema RAG com recuperação autoconsulta
Metadados são informações que descrevem seus dados. Para produtos em um mercado de e-commerce, os metadados podem incluir ID do produto, marca, categoria, opções de tamanho etc. A extração dos metadados certos durante o processamento de dados brutos para seus aplicativos RAG pode ter várias vantagens downstream.
No MongoDB, os metadados extraídos podem ser usados como pré-filtros para restringir o Atlas Search vetorial com base em critérios que podem não ser capturados com precisão por meio de incorporações, como valores numéricos, datas, categorias, nomes ou identificadores exclusivos, como IDs de produtos. Isso leva à recuperação de resultados mais relevantes da Atlas Search semântica e, consequentemente, a respostas mais precisas do seu aplicativo RAG .
Os usuários normalmente interagem com aplicativos LLM usando linguagem natural. No caso do RAG, a recuperação de documentos relevantes de uma base de conhecimento por meio do Atlas Search vetorial requer apenas a incorporação da query do usuário de linguagem natural. No entanto, digamos que você queira aplicar filtros de metadados ao vetor Atlas Search. Nesse caso, isso requer a etapa adicional de extrair os metadados certos da query do usuário e gerar filtros precisos para usar junto com o Atlas Search vetorial. É aqui que a recuperação autoconsulta entra em jogo.
A recuperação autoconsulta, no contexto de aplicativos LLM, é uma técnica que usa LLMs para gerar queries estruturadas a partir de entrada de linguagem natural e as usa para recuperar informações de uma base de conhecimento. No MongoDB, isso envolve as seguintes etapas:
- Extraindo metadados da linguagem natural
- Indexando os campos de metadados
- Geração de definições de filtro precisas contendo apenas expressões e operadores de correspondência da API de query do MongoDB compatíveis
- Executando a query do Atlas Search vetorial com os pré-filtros
Neste tutorial, construiremos um sistema RAG auto-query para um assistente de compromisso. O objetivo do assistente é ajudar os Investidores a tomar decisões informadas, respondendo a perguntas sobre a capacidade financeira, os rendimentos, etc. para empresas nas quais estão interessados. Os Investidores podem ter dúvidas sobre empresas e anos fiscais específicos, mas é difícil capturar essas informações usando incorporações, especialmente após o chunking. Para superar isso, incluiremos a recuperação autoconsulta no fluxo de trabalho do assistente da seguinte maneira:
Dada uma pergunta de usuário, um LLM primeiro tenta extrair metadados dela de acordo com um esquema de metadados especificado. Se os metadados especificados forem encontrados, um LLM gerará uma definição de filtro que é aplicada como um pré-filtro durante a Atlas Search vetorial. Caso contrário, a query do usuário sozinha é usada para recuperar resultados usando o Atlas Search vetorial. Finalmente, o contexto recuperado e a pergunta do usuário, juntamente com quaisquer outros prompts, são passados para um LLM para gerar uma resposta.
Usaremos 10-K arquivamentos de 30 empresas Fortune 500 — os PDFs brutos estão disponíveis em uma pastadoGoogle Drive. Esses relatórios, exigidos pela Commissão de Valores Mobiliários (SEC) dos EUA, oferecem um detalhamento do desempenho financeiro de uma empresa, incluindo histórico financeiro, declarações, rendimentos por ação e outros pontos de dados cruciais. Para os Investidores, os 10-Ks são ferramentas inestimáveis para tomar decisões informadas. Você pode acessar e baixar facilmente esses relatórios acessando o website de qualquer empresa dos EUA com capital aberto.
Usaremos o Não estruturado para fragmentar, incorporar e extrair metadados de documentos PDF e prepará-los para o RAG. Usaremos o MongoDB como armazenamento de vetores para nosso aplicativo. Por fim, usaremos o LangGraph para orquestrar nosso sistema RAG como um fluxo de trabalho com estado e semelhante a DAG.
A primeira etapa da recuperação por autoconsulta é decidir quais metadados extrair de seus dados brutos. Fazer este exercício antes de inserir seus dados em um banco de dados vetorial garante que todos os metadados necessários sejam capturados nos documentos ingeridos e os campos correspondentes indexados. Não fazer isso com antecedência pode resultar em erros e ineficiências downstream, devido ao LLM não extrair os metadados certos das queries do usuário, usar nomes de campo errados ao gerar os pré-filtros ou fazer query de campos não indexados.
Uma boa maneira de determinar quais metadados extrair é trabalhando de trás para frente a partir de sua melhor suposição dos tipos de queries que os usuários podem fazer ao seu sistema. Para o nosso assistente de financiamento, por exemplo, esperamos que os usuários façam perguntas sobre empresas e anos específicos. Portanto, começaremos com estes metadados. MongoDBO flexível do document model significa que você sempre pode atualizar documentos existentes com novos campos de metadados com base em observações de interações de usuários reais.
Vamos precisar das seguintes bibliotecas para este tutorial:
- langgraph: Pacote Python para criar aplicativos com estado e vários atores com LLMs
- openai: Pacote Python para interagir com APIs OpenAI
- PyMongo: Pacote Python para interagir com bancos de dados e coleções MongoDB
- phrase-transformers: Pacote Python para modelos de linguagem de código aberto
- unestruturado-ingest: Pacote Python para processamento de dados usando Não estruturado
1 !pip install -qU langgraph openai pymongo sentence-transformers "unstructured-ingest[pdf, s3, mongodb, embed-huggingface]"
Neste tutorial, usaremos a API Serverless do Unestruturado para preparar nosso conjunto de dados para RAG. A API Serverless é uma API de nível de produção para processar 25+ diferentes tipos de arquivos não estruturados, incluindo PDFs. Para usar a API, primeiro inscreva-se para uma avaliação gratuita 14dias. Depois de se inscrever, você terá acesso a um painel personalizado no qual encontrará sua chave de API e o URL do ponto de extremidade da API. Defina estes em seu código:
1 UNSTRUCTURED_API_KEY = "your-api-key" 2 UNSTRUCTURED_URL = "your-api-url"
Usaremos o MongoDB Atlas como o armazenamento de vetores para nosso aplicativo RAG. Mas, primeiro, você precisará de uma conta do MongoDB Atlas com um cluster de banco de dados. Depois de fazer isso, você precisará obter a connection string para se conectar ao cluster. Siga estas etapas para configurar:
- Obtenha a stringde conexão para seu cluster de banco de dados.
Depois de obter a connection string, defina-a em seu código. Além disso, defina o nome do banco de dados (
MONGODB_DB_NAME
) e a coleção (MONGODB_COLLECTION
) na qual você deseja ingestão de dados. Por fim, instancie o cliente MongoDB para se conectar ao seu banco de dados:1 # Your MongoDB connection string (uri), and collection/database names 2 MONGODB_URI = "your-mongodb-uri" 3 MONGODB_DB_NAME = "your-db-name" 4 MONGODB_COLLECTION = "your-collection-name" 5 # Instantiate the MongoDB client 6 mongodb_client = MongoClient( 7 MONGODB_URI, appname="devrel.content.selfquery_mongodb_unstructured" 8 )
O não estruturado suporta o carregamento de documentos de mais de 20 fontes diferentes, incluindo Amazon Web Services S3, Azure Storage e Google Cloud Platform Storage. Para este tutorial, baixamos os PDFs brutos em um bucket S3 , mas você também pode baixá-los em um diretório local e usá-los como fonte de dados.
Se você estiver usando um bucket S3 como fonte de dados, também precisará definir o URI do S3 e suas credenciais de autenticação do Amazon Web Services no código. Se estiver usando um diretório local como fonte de dados, pule esta etapa:
1 # Your AWS authentication credentials 2 AWS_KEY = "your-aws-access-key-id" 3 AWS_SECRET = "your-aws-secret-access-key" 4 # S3 URI for the Access Point to the bucket with PDF files 5 AWS_S3_NAME = ""
Vamos também especificar o modelo de incorporação e o LLM a serem usados:
1 # Embedding model to use 2 EMBEDDING_MODEL_NAME = "BAAI/bge-base-en-v1.5" 3 # Completion model to use 4 COMPLETION_MODEL_NAME = "gpt-4o-2024-08-06"
Usaremos o bge-base-en-v1 de código aberto daBAAI .5 modelo de incorporação de Abraçando a face. O modelo produz incorporações com 768 dimensões. Quanto ao LLM, usaremos a versão mais recente do GPT-4o durante o tutorial.
Agora que definimos todos os nossos pré-requisitos, vamos definir um pipeline de processamento de PDF usando a biblioteca de ingestãonão estruturada .
1 from unstructured_ingest.v2.pipeline.pipeline import Pipeline 2 from unstructured_ingest.v2.interfaces import ProcessorConfig 3 from unstructured_ingest.v2.processes.partitioner import PartitionerConfig 4 from unstructured_ingest.v2.processes.chunker import ChunkerConfig 5 from unstructured_ingest.v2.processes.embedder import EmbedderConfig 6 from unstructured_ingest.v2.processes.connectors.fsspec.s3 import ( 7 S3ConnectionConfig, 8 S3DownloaderConfig, 9 S3IndexerConfig, 10 S3AccessConfig, 11 ) 12 from unstructured_ingest.v2.processes.connectors.local import LocalUploaderConfig 13 14 WORK_DIR = "/content/temp" 15 16 Pipeline.from_configs( 17 context=ProcessorConfig( 18 verbose=True, tqdm=True, num_processes=5, work_dir=WORK_DIR 19 ), 20 indexer_config=S3IndexerConfig(remote_url=AWS_S3_NAME), 21 downloader_config=S3DownloaderConfig(), 22 source_connection_config=S3ConnectionConfig( 23 access_config=S3AccessConfig(key=AWS_KEY, secret=AWS_SECRET) 24 ), 25 partitioner_config=PartitionerConfig( 26 partition_by_api=True, 27 api_key=UNSTRUCTURED_API_KEY, 28 partition_endpoint=UNSTRUCTURED_URL, 29 strategy="hi_res", 30 additional_partition_args={ 31 "split_pdf_page": True, 32 "split_pdf_allow_failed": True, 33 "split_pdf_concurrency_level": 15, 34 }, 35 ), 36 chunker_config=ChunkerConfig( 37 chunking_strategy="by_title", 38 chunk_max_characters=1500, 39 chunk_overlap=150, 40 ), 41 embedder_config=EmbedderConfig( 42 embedding_provider="langchain-huggingface", 43 embedding_model_name=EMBEDDING_MODEL_NAME, 44 ), 45 uploader_config=LocalUploaderConfig(output_dir="/content/ingest-outputs"), 46 ).run()
O pipeline acima é construído a partir de várias configurações que definem diferentes aspectos de seu comportamento:
- ProcessorConfig: define os parâmetros gerais do comportamento do pipeline, como registro, localização do cache (este é importante neste exemplo), reprocessamento etc.
- S3IndexerConfig, S3DownloaderConfige S3ConnectionConfig: estas são as configurações do conector de origem Amazon Web Services S3.
- PartitionerConfig: divide os documentos em JSON padronizado contendo elementos de documentos e metadados. A estratégia
hi_res
significa que a API não estruturada empregará OCR, modelos de entendimento de documentos e métodos clássicos de ML para extrair e classificar elementos de documentos. Essa estratégia é recomendada para PDFs complexos que contêm mais do que apenas texto, como tabelas e imagens. Saiba mais sobre estratégias de particionamento. - ChunkerConfig: depois que todos os documentos forem particionados, a próxima etapa é dividi-los em chunks. Os parâmetros nesta configuração controlam o comportamento de chunking. Usamos a estratégia de agrupamento
by_title
para manter as seções do documento juntas até que o tamanho do bloco (chunk_max_characters
) seja atingido. Especificamos umchunk_overlap
para manter a continuação nos limites do bloco. - EmbedderConfig: a etapa de processamento final é incorporar os chunks. Usaremos o modelo
bge-base-en-v1.5
por meio da integraçãolangchain-huggingface
. - LocalUploaderConfig: esta configuração nos permite armazenar os resultados finais localmente no diretório especificado.
Quando o pipeline terminar de executar, teremos os resultados finais em
*.json
arquivos no /content/ingest-outputs
diretório local, bem como resultados em cache para cada uma das etapas do pipeline no WORK_DIR
diretório.Se estiver usando um diretório local como fonte de dados, descomente as importações e configurações do conector local no código e comente as importações e configurações do conector S3 .
Nesta etapa, queremos adicionar o nome da empresa e o ano fiscal como metadados personalizados a cada chunk, para habilitar a pré-filtragem durante o Atlas Search vetorial. Felizmente, os documentos do Formulário10K têm uma página mais ou menos padrão com essas informações, para que possamos usar expressões regulares simples (regex) para extrair os metadados necessários deles.
Vamos primeiro definir uma função para extrair o ano:
1 def get_fiscal_year(elements: dict) -> int: 2 # Regular expression pattern to find the element containing the fiscal year 3 pattern = r"for the (fiscal\s+)?year ended.*?(\d{4})" 4 year = 0 5 for i in range(len(elements)): 6 match = re.search(pattern, elements[i]["text"], re.IGNORECASE) 7 if match: 8 year = match.group(0)[-4:] 9 try: 10 year = int(year) 11 except: 12 year = 0 13 return year
Os documentos Formulário-10K são arquivados para um ano fiscal específico, e há uma frase padrão em cada documento que indica em qual ano o documento foi arquivado, por exemplo, "For the fiscal year ended December 31, 2023. " A função acima usa os elementos de um documento como entrada, usa um regex para encontrar a linha que contém a data de registro e extrai o ano dessa linha. Retornamos o ano como
int
para poder aproveitar os operadores de comparação mais tarde.Vamos definir outra função para extrair o nome da empresa:
1 def get_company_name(elements: dict) -> str: 2 name = "" 3 # In most cases the name of the company is right before/above the following line 4 substring = "(Exact name of registrant as specified" 5 for i in range(len(elements)): 6 if substring.lower() in elements[i]["text"].lower(): 7 pattern = ( 8 r"([A-Z][A-Za-z\s&.,]+?)\s*\(Exact name of registrant as specified" 9 ) 10 match = re.search(pattern, elements[i]["text"], re.IGNORECASE) 11 if match: 12 name = match.group(1).strip() 13 name = name.split("\n\n")[-1] 14 15 if name == "": 16 for i in range(len(elements)): 17 # In some cases, the name of the company is right after/below the following line 18 match = re.search( 19 r"Exact name of registrant as specified in its charter:\n\n(.*?)\n\n", 20 elements[i]["text"], 21 ) 22 if match: 23 name = match.group(1) 24 else: 25 # In other cases, the name follows the "Commission File Number [Number]" line 26 match = re.search( 27 r"Commission File Number.*\n\n(.*?)\n\n", elements[i]["text"] 28 ) 29 if match: 30 name = match.group(1) 31 return name
A extração do nome da empresa é semelhante à forma como extraímos o ano fiscal, exceto pelo fato de que há um pouco mais de variação de onde exatamente o nome da empresa é mencionado no documento, portanto, empregamos várias expressões regex. Na maioria dos casos, você encontrará o nome da empresa logo após a linha, "Exact name of registrant as specified. " No entanto, em alguns casos, o nome da empresa precede esta linha ou segue o "Commission File Number [Number] ".
Agora, vamos percorrer o diretório com os resultados da incorporação e, para cada documento, encontrar o nome e o ano da empresa e adicioná-los como metadados personalizados a todos os elementos do documento:
1 directory = f"{WORK_DIR}/embed" 2 3 for filename in os.listdir(directory): 4 if filename.endswith(".json"): 5 file_path = os.path.join(directory, filename) 6 print(f"Processing file {filename}") 7 try: 8 with open(file_path, "r") as file: 9 data = json.load(file) 10 11 company_name = get_company_name(data) 12 fiscal_year = get_fiscal_year(data) 13 14 # Add custom metadata fields to each entry 15 for entry in data: 16 entry["metadata"]["custom_metadata"] = {} 17 entry["metadata"]["custom_metadata"]["company"] = company_name 18 entry["metadata"]["custom_metadata"]["year"] = fiscal_year 19 20 with open(file_path, "w") as file: 21 json.dump(data, file, indent=2) 22 23 print(f"Successfully updated {file_path} with custom metadata fields.") 24 except json.JSONDecodeError as e: 25 print(f"Error parsing JSON in {file_path}: {e}") 26 except IOError as e: 27 print(f"Error reading from or writing to {file_path}: {e}")
O código acima lê
*.json
arquivos do diretório de trabalho que contém os resultados da etapa de incorporação do pipeline. Cada arquivo é o resultado do processamento de um único documento, então você tem um arquivo JSON para cada documento PDF original. Para cada arquivo, estamos usando as funções definidas anteriormente para extrair a data fiscal e o nome da empresa e adicionar esses valores como campos de metadados personalizados a todos os elementos do documento. Em seguida, escrevemos os resultados de volta no arquivo original.Para gravar os documentos processados finais no MongoDB, precisaremos executar novamente o pipeline que definimos na etapa 4, exceto pelo fato de que agora alteraremos o destino de local para MongoDB.
1 from unstructured_ingest.v2.processes.connectors.mongodb import ( 2 MongoDBConnectionConfig, 3 MongoDBUploadStagerConfig, 4 MongoDBUploaderConfig, 5 MongoDBAccessConfig, 6 ) 7 8 Pipeline.from_configs( 9 context=ProcessorConfig( 10 verbose=True, tqdm=True, num_processes=5, work_dir=WORK_DIR 11 ), 12 indexer_config=S3IndexerConfig(remote_url=AWS_S3_NAME), 13 downloader_config=S3DownloaderConfig(), 14 source_connection_config=S3ConnectionConfig( 15 access_config=S3AccessConfig(key=AWS_KEY, secret=AWS_SECRET) 16 ), 17 partitioner_config=PartitionerConfig( 18 partition_by_api=True, 19 api_key=UNSTRUCTURED_API_KEY, 20 partition_endpoint=UNSTRUCTURED_URL, 21 strategy="hi_res", 22 additional_partition_args={ 23 "split_pdf_page": True, 24 "split_pdf_allow_failed": True, 25 "split_pdf_concurrency_level": 15, 26 }, 27 ), 28 chunker_config=ChunkerConfig( 29 chunking_strategy="by_title", 30 chunk_max_characters=1500, 31 chunk_overlap=150, 32 ), 33 embedder_config=EmbedderConfig( 34 embedding_provider="langchain-huggingface", 35 embedding_model_name=EMBEDDING_MODEL_NAME, 36 ), 37 destination_connection_config=MongoDBConnectionConfig( 38 access_config=MongoDBAccessConfig(uri=MONGODB_URI), 39 collection=MONGODB_COLLECTION, 40 database=MONGODB_DB_NAME, 41 ), 42 stager_config=MongoDBUploadStagerConfig(), 43 uploader_config=MongoDBUploaderConfig(batch_size=100), 44 ).run()
O pipeline acima é o mesmo que tivemos na etapa 4, a única diferença é o local de destino dos resultados do processamento de dados, que agora é a coleção MongoDB especificada na etapa 2. O destino MongoDB é definido usando três configurações:
MongoDBConnectionConfig
, MongoDBUploadStagerConfig
e MongoDBUploaderConfig
. Essas configurações estabelecem a conexão com nosso banco de dados MongoDB Atlas, especificam onde os dados precisam ser carregados e anote quaisquer parâmetros de carregamento adicionais, como batch_size
, para ingestão em massa dos dados no MongoDB.Um exemplo de documento ingerido no MongoDB é o seguinte:
Observe os campos de metadados personalizados
metadata.custom_metadata.company
e metadata.custom_metadata.year
contendo o nome da empresa e o ano em que o documento foi publicado.Como mencionado anteriormente, usaremos o LangGraph para orquestrar nosso sistema RAG. O LangGraph permite construir sistemas LLM como gráficos. Os nós do grafo são funções ou ferramentas para executar tarefas específicas, enquanto as bordas definem rotas entre os nós — elas podem ser fixas, condicionais ou até cíclicas. Cada grafo tem um estado que é uma estrutura de dados compartilhada que todos os nós podem acessar e fazer atualizações. Você pode definir atributos personalizados dentro do estado, dependendo de quais parâmetros deseja acompanhar nos nós do gráfico.
Go em frente e definir o estado do nosso gráfico:
1 class GraphState(TypedDict): 2 """ 3 Represents the state of the graph. 4 5 Attributes: 6 question: User query 7 metadata: Extracted metadata 8 filter: Filter definition 9 documents: List of retrieved documents from vector search 10 memory: Conversational history 11 """ 12 13 question: str 14 metadata: Dict 15 filter: Dict 16 context: List[str] 17 memory: Annotated[list, add_messages]
O código acima:
- Cria uma classe
GraphState
que representa o estado do gráfico para nosso assistente. - Define o esquema de estado usando o modelo
TypedDict
para criar dicionários seguros por tipo que podem ajudar a detectar erros no tempo de compilação em vez de no tempo de execução. - Anota a chave
memory
no estado com a funçãoadd_messages
redução, informando ao LangGraph para anexar novas mensagens à lista existente em vez de substituí-la.
Em seguida, vamos definir os nós do nosso grafo. Os nós contêm a lógica principal do sistema. Elas são essencialmente funções do Python e podem ou não usar um LLM. Cada nó recebe o grafo
state
como entrada, executa alguns cálculos e retorna um estado atualizado.Nosso assistente possui quatro funcionalidades principais:
- Extraia metadados de uma query de linguagem natural.
- Gere uma definição de filtro de API de query MongoDB.
- Recupere documentos do MongoDB usando o semântica Atlas Search.
- Gere uma resposta para a pergunta do usuário.
Vamos definir nós para cada uma das funcionalidades acima.
Para o nosso assistente, extrair metadados em um formato específico é crucial para evitar erros e resultados incorretos downstream. A OpenAI lança recentemente Saídas estruturadas, um recurso que garante que seus modelos sempre gerem respostas aderentes a um esquema especificado. Vamos tentar!
Vamos criar um modelo Pydentic que defina o esquema de metadados:
1 class Metadata(BaseModel): 2 """Metadata to use for pre-filtering.""" 3 4 company: List[str] = Field(description="List of company names") 5 year: List[str] = Field(description="List containing start year and end year")
No código acima, criamos um modelo Pydentic chamado
Metadata
com company
e year
como atributos. Cada atributo é do tipo List[str]
, indicando uma lista de strings.Agora, vamos definir uma função para extrair metadados da pergunta do usuário:
1 def extract_metadata(state: Dict) -> Dict: 2 """ 3 Extract metadata from natural language query. 4 5 Args: 6 state (Dict): The current graph state 7 8 Returns: 9 Dict: New key added to state i.e. metadata containing the metadata extracted from the user query. 10 """ 11 print("---EXTRACTING METADATA---") 12 question = state["question"] 13 system = f"""Extract the specified metadata from the user question: 14 - company: List of company names, eg: Google, Adobe etc. Match the names to companies on this list: {companies} 15 - year: List of [start year, end year]. Guidelines for extracting dates: 16 - If a single date is found, only include that. 17 - For phrases like 'in the past X years/last year', extract the start year by subtracting X from the current year. The current year is {datetime.now().year}. 18 - If more than two dates are found, only include the smallest and the largest year.""" 19 completion = openai_client.beta.chat.completions.parse( 20 model=COMPLETION_MODEL_NAME, 21 messages=[ 22 {"role": "system", "content": system}, 23 {"role": "user", "content": question}, 24 ], 25 response_format=Metadata, 26 ) 27 result = completion.choices[0].message.parsed 28 # If no metadata is extracted return an empty dictionary 29 if len(result.company) == 0 and len(result.year) == 0: 30 return {"metadata": {}} 31 metadata = { 32 "metadata.custom_metadata.company": result.company, 33 "metadata.custom_metadata.year": result.year, 34 } 35 return {"metadata": metadata}
O código acima:
- Obtém a pergunta do usuário a partir do estado do gráfico (
state
). - Define um prompt do sistema instruindo o LLM a extrair nomes de empresas e intervalos de datas da pergunta. Solicitamos especificamente ao LLM que corresponda os nomes das empresas à lista de empresas em nosso conjunto de dados. Caso contrário, o pré-filtro não corresponderá a nenhum registro em nossa base de conhecimento.
- Passa o modelo
Metadata
Pydentic como o argumentoresponse_format
para o métodoparse()
da API de Compleções de bate-papo do OpenAI para obter a resposta do LLM de volta como um objetoMetadata
. - Define o atributo
metadata
do gráficostate
como{}
, se nenhum metadado for extraído pelo LLM. - Define o atributo
metadata
a um dicionário que consiste em chaves correspondentes a campos de metadados em nossos documentos MongoDB — ou seja,metadata.custom_metadata.company
emetadata.custom_metadata.year
— e valores obtidos a partir da resposta do LLM, se forem encontrados metadados.
Em seguida, vamos definir uma função para gerar a definição de filtro:
1 def generate_filter(state: Dict) -> Dict: 2 """ 3 Generate MongoDB Query API filter definition. 4 5 Args: 6 state (Dict): The current graph state 7 8 Returns: 9 Dict: New key added to state i.e. filter. 10 """ 11 print("---GENERATING FILTER DEFINITION---") 12 metadata = state["metadata"] 13 system = """Generate a MongoDB filter definition from the provided fields. Follow the guidelines below: 14 - Respond in JSON with the filter assigned to a `filter` key. 15 - The field `metadata.custom_metadata.company` is a list of companies. 16 - The field `metadata.custom_metadata.year` is a list of one or more years. 17 - If any of the provided fields are empty lists, DO NOT include them in the filter. 18 - If both the metadata fields are empty lists, return an empty dictionary {{}}. 19 - The filter should only contain the fields `metadata.custom_metadata.company` and `metadata.custom_metadata.year` 20 - The filter can only contain the following MongoDB Query API match expressions: 21 - $gt: Greater than 22 - $lt: Lesser than 23 - $gte: Greater than or equal to 24 - $lte: Less than or equal to 25 - $eq: Equal to 26 - $ne: Not equal to 27 - $in: Specified field value equals any value in the specified array 28 - $nin: Specified field value is not present in the specified array 29 - $nor: Logical NOR operation 30 - $and: Logical AND operation 31 - $or: Logical OR operation 32 - If the `metadata.custom_metadata.year` field has multiple dates, create a date range filter using expressions such as $gt, $lt, $lte and $gte 33 - If the `metadata.custom_metadata.company` field contains a single company, use the $eq expression 34 - If the `metadata.custom_metadata.company` field contains multiple companies, use the $in expression 35 - To combine date range and company filters, use the $and operator 36 """ 37 completion = openai_client.chat.completions.create( 38 model=COMPLETION_MODEL_NAME, 39 temperature=0, 40 messages=[ 41 {"role": "system", "content": system}, 42 {"role": "user", "content": f"Fields: {metadata}"}, 43 ], 44 response_format={"type": "json_object"}, 45 ) 46 result = json.loads(completion.choices[0].message.content) 47 return {"filter": result.get("filter", {})}
O código acima:
- Extrai metadados do estado do gráfico.
- Define um prompt do sistema contendo instruções sobre como criar a definição de filtro — expressões de correspondência e operadores suportados, campos a serem usados etc.
- Obtém a saída do LLM especificado como um objeto JSON passando
json_object
como argumentoresponse_format
para a chamada da API de Conclusões de bate-papo, bem como especificando a palavraJSON
no prompt do sistema. - Define o atributo
filter
do estado do gráfico para a definição de filtro gerada.
A próxima etapa é definir um nó de gráfico que recupera documentos do MongoDB usando o Atlas Search semântica. Mas, primeiro, você precisará criar um índice vetorial do Atlas Search:
1 collection = mongodb_client[MONGODB_DB_NAME][MONGODB_COLLECTION] 2 VECTOR_SEARCH_INDEX_NAME = "vector_index" 3 4 model = { 5 "name": VECTOR_SEARCH_INDEX_NAME, 6 "type": "vectorSearch", 7 "definition": { 8 "fields": [ 9 { 10 "type": "vector", 11 "path": "embeddings", 12 "numDimensions": 768, 13 "similarity": "cosine", 14 }, 15 {"type": "filter", "path": "metadata.custom_metadata.company"}, 16 {"type": "filter", "path": "metadata.custom_metadata.year"}, 17 ] 18 }, 19 } 20 collection.create_search_index(model=model)
O código acima:
- Especifica a collection contra a qual executar a Atlas Search semântica.
- Especifica o nome do índice vetorial do Atlas Search.
- Especifica a definição de índice do vetor Atlas Search que contém o caminho para o campo de incorporações nos documentos (
path
), o número de dimensões de incorporação (numDimensions
), a métrica de similaridade para encontrar os vizinhos mais próximos (similarity
) e o caminho para os campos de filtro a serem indexados. - Cria o índice vetorial do Atlas Search.
Agora, vamos definir uma função que executa o Atlas Search vetorial:
1 def vector_search(state: Dict) -> Dict: 2 """ 3 Get relevant information using MongoDB Atlas Vector Search 4 5 Args: 6 state (Dict): The current graph state 7 8 Returns: 9 Dict: New key added to state i.e. documents. 10 """ 11 print("---PERFORMING VECTOR SEARCH---") 12 question = state["question"] 13 filter = state["filter"] 14 # We always want a valid filter object 15 if not filter: 16 filter = {} 17 query_embedding = embedding_model.encode(question).tolist() 18 pipeline = [ 19 { 20 "$vectorSearch": { 21 "index": VECTOR_SEARCH_INDEX_NAME, 22 "path": "embeddings", 23 "queryVector": query_embedding, 24 "numCandidates": 150, 25 "limit": 5, 26 "filter": filter, 27 } 28 }, 29 { 30 "$project": { 31 "_id": 0, 32 "text": 1, 33 "score": {"$meta": "vectorSearchScore"}, 34 } 35 }, 36 ] 37 # Execute the aggregation pipeline 38 results = collection.aggregate(pipeline) 39 # Drop documents with cosine similarity score < 0.8 40 relevant_results = [doc["text"] for doc in results if doc["score"] >= 0.8] 41 context = "\n\n".join([doc for doc in relevant_results]) 42 return {"context": context}
O código acima:
- Lê a pergunta do usuário e a definição de filtro a partir do estado do gráfico.
- Define como um objeto vazio, se nenhum filtro for encontrado. Lembre-se de que pularemos diretamente para o Atlas Search vetorial se nenhum metadado for extraído.
- Incorpora a query do usuário utilizando o modelo
bge-base-en-v1.5
. - Cria e executa um pipeline de agregação para realizar a Pesquisa semântica do Atlas. O estágio
$vectorSearch
recupera documentos usando o Atlas Search do vizinho mais próximo — o camponumCandidates
indica quantos vizinhos mais próximos devem ser considerados, o campolimit
indica quantos documentos devem ser devolvidos e o campofilter
especifica os pré-filtros. O estágio$project
que segue gera somente os campos especificados — ou seja,text
e a pontuação de similaridade do Atlas Search a partir dos documentos retornados. - Filtra documentos com uma pontuação de similaridade de < 0.8 para garantir que somente documentos altamente relevantes sejam retornados.
- Concatena os valores de campo
text
da lista final de documentos em uma string e a define como o atributocontext
do estado do gráfico.
Vamos definir o nó final em nosso grafo para gerar respostas às perguntas.
1 def generate_answer(state: Dict) -> Dict: 2 """ 3 Generate the final answer to the user query 4 5 Args: 6 state (Dict): The current graph state 7 8 Returns: 9 Dict: New key added to state i.e. generation. 10 """ 11 print("---GENERATING THE ANSWER---") 12 question = state["question"] 13 context = state["context"] 14 memory = state["memory"] 15 system = f"Answer the question based only on the following context. If the context is empty or if it doesn't provide enough information to answer the question, say I DON'T KNOW" 16 completion = openai_client.chat.completions.create( 17 model=COMPLETION_MODEL_NAME, 18 temperature=0, 19 messages=[ 20 {"role": "system", "content": system}, 21 { 22 "role": "user", 23 "content": f"Context:\n{context}\n\n{memory}\n\nQuestion:{question}", 24 }, 25 ], 26 ) 27 answer = completion.choices[0].message.content 28 return {"memory": [HumanMessage(content=context), AIMessage(content=answer)]}
O código acima:
- Lê a pergunta do usuário, o contexto recuperado do Atlas Search vetorial e qualquer histórico de conversa a partir do estado do gráfico.
- Define um prompt do sistema para instruir o LLM a responder apenas com base no contexto fornecido. Este é um passo simples para mitigar atordoamentos.
- Passa o prompt do sistema, o contexto recuperado, o histórico de bate-papo e a pergunta do usuário para a API de conclusões de bate-papo do OpenAI para obter uma resposta do LLM especificado.
- Atualiza o atributo de memória do estado do gráfico adicionando o contexto recuperado e a resposta LLM à lista existente de mensagens.
Como mencionado anteriormente, as bordas em um gráfico LangGraph definem interações entre nós. Nosso gráfico principalmente tem bordas fixas entre os nós, exceto uma borda condicional para pular a geração de filtro e Go diretamente para o Atlas Search vetorial com base na extração ou não dos metadados da query do usuário.
Vamos definir a lógica para esta borda condicional:
1 def check_metadata_extracted(state: Dict) -> str: 2 """ 3 Check if any metadata is extracted. 4 5 Args: 6 state (Dict): The current graph state 7 8 Returns: 9 str: Binary decision for next node to call 10 """ 11 12 print("---CHECK FOR METADATA---") 13 metadata = state["metadata"] 14 # If no metadata is extracted, skip the generate filter step 15 if not metadata: 16 print("---DECISION: SKIP TO VECTOR SEARCH---") 17 return "vector_search" 18 # If metadata is extracted, generate filter definition 19 else: 20 print("---DECISION: GENERATE FILTER---") 21 return "generate_filter"
No código acima, extraímos o atributo de metadados do estado do gráfico. Se nenhum metadado for encontrado, ela retornará a string “vector_search.” Caso contrário, ela retornará a string “generate_filter.” Veremos como vincular as saídas desta função de roteamento aos nós na próxima etapa.
Finalmente, vamos construir o grafo conectando os nós usando arestas.
1 from langgraph.graph import END, StateGraph, START 2 from langgraph.checkpoint.memory import MemorySaver 3 from IPython.display import Image, display 4 5 workflow = StateGraph(GraphState) 6 # Adding memory to the graph 7 memory = MemorySaver() 8 9 # Add nodes 10 workflow.add_node("extract_metadata", extract_metadata) 11 workflow.add_node("generate_filter", generate_filter) 12 workflow.add_node("vector_search", vector_search) 13 workflow.add_node("generate_answer", generate_answer) 14 15 # Add edges 16 workflow.add_edge(START, "extract_metadata") 17 workflow.add_conditional_edges( 18 "extract_metadata", 19 check_metadata_extracted, 20 { 21 "vector_search": "vector_search", 22 "generate_filter": "generate_filter", 23 }, 24 ) 25 workflow.add_edge("generate_filter", "vector_search") 26 workflow.add_edge("vector_search", "generate_answer") 27 workflow.add_edge("generate_answer", END) 28 29 # Compile the graph 30 app = workflow.compile(checkpointer=memory)
O código acima:
- Instancia o gráfico usando a classe
StateGraph
e o parametriza com o estado de gráfico definido na etapa 7. - Inicializa um checkpointer na memória utilizando a classe
MemorySaver()
. Os checkpoints persistem no estado do gráfico entre as interações no LangGraph. É assim que nosso sistema RAG persiste o histórico de bate-papo. - Adiciona todos os nós definidos na Etapa 8 no gráfico utilizando o método
add_node
. O primeiro argumento paraadd_node
é o nome do nó e o segundo argumento é a função Python correspondente ao nó. - Adiciona bordas fixas usando o método
add_edge
. Observe as bordas entreSTART
eextract_metadata
egenerate_answer
eEND
.START
eEND
são nós especiais para indicar qual nó chamar primeiro e quais nós não têm ações seguintes. - Adiciona bordas condicionais do nó
extract_metadata
aos nósgenerate_filter
evector_search
com base nas saídas da função de roteamento definida na etapa 9. - Compila o gráfico com o ponto de verificação da memória.
Podemos visualizar nosso gráfico como um diagrama de sereia. Isso ajuda a verificar os nós e as bordas, especialmente para fluxos de trabalho complicados.
1 try: 2 display(Image(app.get_graph().draw_mermaid_png())) 3 except Exception: 4 # This requires some extra dependencies and is optional 5 pass
No código acima, usamos o método
get_graph()
para obter o gráfico compilado e o métododraw_mermaid_png
para gerar um diagrama Mermaid do gráfico como uma imagem PNG.Veja como nosso gráfico se parece:
Como última etapa, vamos definir uma função que recebe a entrada do usuário e transmite as saídas da execução do gráfico de volta para o usuário.
1 def execute_graph(thread_id: str, question: str) -> None: 2 """ 3 Execute the graph and stream its output 4 5 Args: 6 thread_id (str): Conversation thread ID 7 question (str): User question 8 """ 9 # Add question to the question and memory attributes of the graph state 10 inputs = {"question": question, "memory": [HumanMessage(content=question)]} 11 config = {"configurable": {"thread_id": thread_id}} 12 # Stream outputs as they come 13 for output in app.stream(inputs, config): 14 for key, value in output.items(): 15 print(f"Node {key}:") 16 print(value) 17 print("---FINAL ANSWER---") 18 print(value["memory"][-1].content)
O código acima:
- Aceita a pergunta do usuário (
question
) e o ID do tópico (thread_id
) como entrada. - Cria um dicionário
inputs
contendo as atualizações iniciais no estado do gráfico. Em nosso caso, queremos adicionar a pergunta do usuário aos atributosquestion
ememory
do estado do gráfico. - Cria uma configuração de tempo de execução contendo o ID do thread de entrada. IDs de thread são IDs únicos atribuídos a checkpoints de memória no LangGraph. Cada checkpoint tem um estado separado.
- Transmite as saídas do gráfico, especificamente os nomes dos nós e as saídas.
- Imprime a resposta final do sistema, extraída do valor de retorno do último nó no gráfico — ou seja, o nó
generate_answer
.
Um exemplo de execução do sistema tem a seguinte aparência:
1 execute_graph("1", "Sales summary for Walmart for 2023.") 2 3 ---EXTRACTING METADATA--- 4 ---CHECK FOR METADATA--- 5 ---DECISION: GENERATE FILTER--- 6 'Node extract_metadata:' 7 {'metadata': {'metadata.custom_metadata.company': ['WALMART INC.'], 'metadata.custom_metadata.year': ['2023']}} 8 ---GENERATING FILTER DEFINITION--- 9 'Node generate_filter:' 10 {'filter': {'$and': [{'metadata.custom_metadata.company': {'$eq': 'WALMART INC.'}}, {'metadata.custom_metadata.year': {'$eq': 2023}}]}} 11 ---PERFORMING VECTOR SEARCH--- 12 'Node vector_search:' 13 {'context': 'DOCUMENTS INCORPORATED BY REFERENCE...'} 14 ---GENERATING THE ANSWER--- 15 'Node generate_answer:' 16 {'memory': [HumanMessage(content='DOCUMENTS INCORPORATED BY REFERENCE...'), AIMessage(content='In fiscal 2023, Walmart generated total revenues of $611.3 billion, primarily comprised of net sales amounting to $605.9 billion. Walmart International contributed $101.0 billion to the fiscal 2023 consolidated net sales, representing 17% of the total.')]} 17 ---FINAL ANSWER--- 18 In fiscal 2023, Walmart generated total revenues of $611.3 billion, primarily comprised of net sales amounting to $605.9 billion. Walmart International contributed $101.0 billion to the fiscal 2023 consolidated net sales, representing 17% of the total.
Façamos uma pergunta de acompanhamento para garantir que o gráfico persista no histórico de bate-papo:
1 execute_graph("1", "What did I just ask you?") 2 3 ---EXTRACTING METADATA--- 4 ---CHECK FOR METADATA--- 5 ---DECISION: SKIP TO VECTOR SEARCH--- 6 'Node extract_metadata:' 7 {'metadata': {}} 8 ---PERFORMING VECTOR SEARCH--- 9 'Node vector_search:' 10 {'context': ''} 11 ---GENERATING THE ANSWER--- 12 'Node generate_answer:' 13 {'memory': [HumanMessage(content=''), AIMessage(content='You asked for the sales summary for Walmart for 2023.')]} 14 ---FINAL ANSWER--- 15 You asked for the sales summary for Walmart for 2023.
O sistema conseguiu se lembrar do que perguntamos anteriormente.
Aqui Go você ! Construímos com sucesso um sistema RAG com recuperação e memória autoconsultas.
Neste tutorial, aprenderam como construir um sistema RAG avançado que aproveita a recuperação auto-consulta. Essas técnicas são úteis em cenários em que as queries do usuário podem conter palavras-chave importantes que as incorporações podem não capturar. Criamos um assistente de capital que extrai nomes de empresas e intervalos de datas de consultas de usuários, usa-os para gerar definições de filtro do MongoDB e os aplica como pré-filtros durante o Atlas Search vetorial para obter as informações mais precisas para que um LLM responda a perguntas sobre o cenário financeiro desempenho de empresas específicas durante determinados períodos de tempo.
Para obter conteúdo mais avançado, como a criação de sistemas de agentes, confira os seguintes tutoriais em nosso Centro do Desenvolvedor:
Principais comentários nos fóruns
J ack_WoehrJ ack W oehr3 meses atrás
joao_schaab joaoschaab3 meses atrás
Há um exemplo usando a versão JavaScript das estruturas?
oooh ... preciso fazer essa ... brb