Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Join us at AWS re:Invent 2024! Learn how to use MongoDB for AI use cases.
Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
Conectoreschevron-right

Ative seu MongoDB e BigQuery usando procedimentos armazenados do BigQuery Spark

ZW
Venkatesh Shanbhag, Zi Wang, Maruti C5 min read • Published Aug 12, 2024 • Updated Aug 12, 2024
SparkPythonConectores
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Para capacitar as empresas que se esforçam para transformar seus dados em insights, o BigQuery surgiu como uma solução de armazenamento de dados avançada e escalável baseada na nuvem oferecida pelo Google Cloud Platform (GCP). Sua abordagem baseada na nuvem permite o gerenciamento e a manipulação eficientes de dados, fazendo do BigQuery um divisor de palavras para empresas que buscam insights avançados de dados. Em especial, um dos recursos de destaque do BigQuery é sua integração perfeita com o processamento de dados baseado em Spark, que permite que os usuários aprimorem ainda mais suas queries. Agora, aproveitando as APIs do BigQuery, os usuários podem criar e executar procedimentos armazenados do Spark, que são módulos de código reutilizáveis que podem encapsular lógica comercial complexa e transformações de dados. Essa funcionalidade ajuda engenheiros de dados, Cientistas de Dados e analistas de Dados a aproveitar os recursos avançados do BigQuery e os robustos recursos de processamento de dados do Spark.
O MongoDB, uma plataforma de dados para desenvolvedores, é uma escolha popular para armazenar e gerenciar dados operacionais por sua escalabilidade, desempenho, esquema flexível e recursos em tempo real (change streams e aggregation). Ao combinar os recursos do BigQuery com a versatilidade do Apache Spark e a flexibilidade do MongoDB, você pode desbloquear um poderoso pipeline de processamento de dados.
O Apache Spark é uma poderosa estrutura de computação distribuída de código aberto que se destaca pelo processamento de grandes quantidades de dados de forma rápida e eficiente. Ele suporta uma ampla variedade de formatos de dados, incluindo dados estruturados, semiestruturados e não estruturados, sendo a opção ideal para integrar dados de várias fontes, como o MongoDB.
Os procedimentos armazenados do BigQuery Spark são rotinas executadas dentro do ambiente do BigQuery. Esses procedimentos podem executar várias tarefas, como manipulação de dados, cálculos complexos e até mesmo integração de dados externos. Elas fornecem uma maneira de modularizar e reutilizar código, facilitando a manutenção e a otimização dos fluxos de trabalho de processamento de dados. Os procedimentos armazenados do Spark usam o mecanismoSpark sem servidor, que habilita o Spark sem servidor e com dimensionamento automático. No entanto, você não precisa habilitar as APIs do Dataproc nem ser cobrado pelo Dataproc ao aproveitar esse novo recurso.
Vamos explorar como estender o processamento de dados do BigQuery para o Apache Spark e integrar o MongoDB ao BigQuery para facilitar de forma eficaz a movimentação de dados entre as duas plataformas.

Conectando-os

Movimentação de dados entre MongoDB e BigQuery usando o procedimento armazenado do Spark em execução no Spark sem servidor
Este tutorial orienta você na criação de um procedimento PySpark usando o editor do BigQuery.
Antes de começarmos com a configuração do procedimento armazenado do Spark, você precisa fazer oupload do arquivoJAR do Spark Connector para o GCP para se conectar e ler a partir do MongoDB Atlas. Copie e salve o URI gsutil para o arquivo JAR que será utilizado nas próximas etapas.
Copie o caminho para o URI gsutil
  1. Como pré-requisito para concluir o tutorial, você precisa configurar um cluster MongoDB Atlas com dados de amostra carregados nele.
  2. Navegue até a página do BigQuery no console do Google Cloud.
  3. Crie um conjunto de dados do BigQuery com o nome spark_run.
  4. Você digitará o código PySpark diretamente no editor de queries. Para criar um procedimento armazenado no PySpark, clique em Criar procedimento no PySparke selecione Criar procedimento no PySpark.
Criando procedimento PySPark a partir do explorador BigQuery
  1. Para definir opções, clique em Mais > Opções do PySparke faça o seguinte:
    1. Especifique o local onde você deseja executar o código PySpark.
    2. No campoConexão, clique em Criar novaconexão e insira os valores abaixo para cada campo.
      1. Tipo de conexão > Apache Spark
      2. ID de conexão > Nomeie-o como mongodb-to-bigquery por causa deste tutorial.
      3. Deixe a outra opção vazia e clique em Criar.
    3. Na seção Invocação de procedimento armazenado, selecione Definir um conjunto de dados para invocação de procedimento armazenado para spark_run.
    4. Clique nas opções avançadas. Copie o nome de URI gsutil copiado no início da configuração e cole os arquivos JAR no pré-requisito. Deixe as outras opções vazias. Pressione Enter e clique em Salvar.
  2. Abra uma nova guia e Go o BigQuery. Navegue até Conexões externas > Encontre a conexão mongodb-to-bigquery > Copie o ID da conta de serviço. Conceda acesso de administrador de armazenamento do BigQuery, acessador de segredo do Secret Manager e administrador de objeto de armazenamento a esta conta de serviço do IAM.
Conexões externas criadas usando procedimentos armazenados do Spark
  1. (Opcional) Adicione seu nome de usuário e senha no Google Cloud Secret Manager, ou você pode codificá-lo na própria string URI MongoDB.
  2. Copie o script Python abaixo no editor de procedimento PySpark e clique em EXECUTAR. O trecho leva cerca de dois a três minutos para ser concluído. O script abaixo criará uma nova tabela no conjunto de dados spark_run com o nome sample_mflix_comments.
1from pyspark.sql import SparkSession
2from google.cloud import secretmanager
3
4def access_secret_version(secret_id, project_id):
5 client = secretmanager.SecretManagerServiceClient()
6 name = f"projects/{project_id}/secrets/{secret_id}/versions/1"
7 response = client.access_secret_version(request={"name": name})
8 payload = response.payload.data.decode("UTF-8")
9 return payload
10# Update project_number, username_secret_id and password_secret_id, comment them out if you did not create the secrets earlier
11
12project_id = "<Your project number, 12 digit number>"
13username_secret_id = "<Your username secret id>"
14password_secret_id = "<Your password secret id>"
15
16username = access_secret_version(username_secret_id, project_id)
17password = access_secret_version(password_secret_id, project_id)
18
19 # Update the mongodb_uri directly if with your username and password if you did not create a secret from Step 7, update the hostname with your hostname
20mongodb_uri = "mongodb+srv://"+username+":"+password+"@<hostname>/sample_mflix.comments"
21
22my_spark = SparkSession \
23 .builder \
24 .appName("myApp") \
25 .config("spark.mongodb.read.connection.uri", mongodb_uri) \
26 .config("spark.mongodb.write.connection.uri", mongodb_uri) \
27 .getOrCreate()
28
29
30dataFrame = my_spark.read.format("mongodb").option("database", "sample_mflix").option("collection", "comments").load()
31
32dataFrame.show()
33
34# Saving the data to BigQuery
35dataFrame.write.format("bigquery") \
36 .option("writeMethod", "direct") \
37 .save("spark_run.sample_mflix_comments")
Procedimento armazenado no Google BigQuery Explorer
  1. Navegue até o conjunto de dados spark_run para validar se os dados são carregados do MongoDB Atlas para o BigQuery na tabela com o nome sample_mflix_comments.
  2. Agora que os dados estão no BigQuery, aproveite o BQML para executar alguma IA generativa nos novos dados do MongoDB no BigQuery.
    1. Crie uma conexão com o nome gentext-conn, usando o console ou a linha de comando bq com o tipo de conexão CLOUD_RESOURCE.
1!bq mk \
2 --connection \
3 --location=US \
4 --project_id=<GCP Project id> \
5 --connection_type=CLOUD_RESOURCE gentext-conn
11. Para conceder permissões de IAM para acessar a Vertex AI a partir do BigQuery, navegue até Conexões externas > Encontrar a conexão gettext-conn > Copie o ID da conta de serviço. Conceda ao usuário do Vertex AI acesso a esta conta de serviço do IAM. 12. Crie um modelo utilizando o comando CREATE Model .
1CREATE OR REPLACE MODEL `gcp-pov.spark_run.llm_model`
2REMOTE WITH CONNECTION `us.gentext-conn`
3OPTIONS (ENDPOINT = 'gemini-pro');
13. Execute o comando SQL na tabela BigQuery. Essa query permite que o usuário extraia o nome do host do e-mail, aproveitando o modelo Gemini Pro.Asaídaresultante inclui os atributos de resposta e segurança.
1SELECT prompt,ml_generate_text_result
2FROM
3ML.GENERATE_TEXT( MODEL `gcp-pov.spark_run.llm_model`,
4 (
5 SELECT CONCAT('Extract the host name from the email: ', email) AS prompt,
6 * FROM `gcp-pov.spark_run.sample_mflix_comments`
7 LIMIT 5),
8 STRUCT(
9 0.9 AS temperature,
10 100 AS max_output_tokens
11 )
12 );
14. Aqui está a saída de amostra que mostra o prompt e a resposta. O parâmetro prompt fornece o texto para o modelo analisar. O design de solicitação pode afetar fortemente as respostas retornadas pelo LLM.
Saída após a execução do procedimento armazenado do Spark

Benefícios da integração

  1. Essa integração permite que você use MongoDB para OLTP e BigQuery para OLAP, fornecendo uma solução completa de gerenciamento de dados.
  2. Depois que os dados são transformados e copiados para o BigQuery, o BigQuery ML permite criar e executar modelos de machine learning (ML) usando queries do GoogleSQL.
  3. O BigQuery ML também permite acessar LLMs e APIs de AI em nuvem para executar tarefas de inteligência artificial (AI), como geração de texto e tradução automática.

Conclusão

Ao combinar o poder do BigQuery, dos procedimentos armazenados do Spark e do MongoDB, você pode criar uma pipeline de processamento de dados robusta e escalável que aproveita os pontos fortes de cada tecnologia. O BigQuery fornece um data warehouse confiável e escalável para armazenar e analisar dados estruturados, enquanto o Spark permite processar e transformar dados de várias fontes, incluindo dados semiestruturados e não estruturados do MongoDB. Os procedimentos armazenados do Spark permitem encapsular e reutilizar essa lógica, facilitando a manutenção e a otimização de seus fluxos de trabalho de processamento de dados.

Leitura adicional

  1. Comece a usar o MongoDB Atlas no Google Cloud.
  2. Trabalhe com procedimentos armazenados para Apache Spark.
  3. Criar modelos de machine learning no BigQuery ML.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.
Iniciar a conversa

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Como começar a usar o MongoDB Atlas Stream Processing e o provedor HashiCorp Terraform MongoDB Atlas


May 20, 2024 | 5 min read
Artigo

Saiba como aproveitar os dados do MongoDB dentro do Kafka com novos tutoriais!


Sep 17, 2024 | 1 min read
Tutorial

Vá para o MongoDB usando conectores Kafka - Guia final do agente


Sep 17, 2024 | 7 min read
Artigo

Medindo o desempenho do MongoDB Kafka Connector


May 09, 2022 | 6 min read
Sumário
  • Conectando-os