Ative seu MongoDB e BigQuery usando procedimentos armazenados do BigQuery Spark
ZW
Venkatesh Shanbhag, Zi Wang, Maruti C5 min read • Published Aug 12, 2024 • Updated Aug 12, 2024
Avalie esse Tutorial
Para capacitar as empresas que se esforçam para transformar seus dados em insights, o BigQuery surgiu como uma solução de armazenamento de dados avançada e escalável baseada na nuvem oferecida pelo Google Cloud Platform (GCP). Sua abordagem baseada na nuvem permite o gerenciamento e a manipulação eficientes de dados, fazendo do BigQuery um divisor de palavras para empresas que buscam insights avançados de dados. Em especial, um dos recursos de destaque do BigQuery é sua integração perfeita com o processamento de dados baseado em Spark, que permite que os usuários aprimorem ainda mais suas queries. Agora, aproveitando as APIs do BigQuery, os usuários podem criar e executar procedimentos armazenados do Spark, que são módulos de código reutilizáveis que podem encapsular lógica comercial complexa e transformações de dados. Essa funcionalidade ajuda engenheiros de dados, Cientistas de Dados e analistas de Dados a aproveitar os recursos avançados do BigQuery e os robustos recursos de processamento de dados do Spark.
O MongoDB, uma plataforma de dados para desenvolvedores, é uma escolha popular para armazenar e gerenciar dados operacionais por sua escalabilidade, desempenho, esquema flexível e recursos em tempo real (change streams e aggregation). Ao combinar os recursos do BigQuery com a versatilidade do Apache Spark e a flexibilidade do MongoDB, você pode desbloquear um poderoso pipeline de processamento de dados.
O Apache Spark é uma poderosa estrutura de computação distribuída de código aberto que se destaca pelo processamento de grandes quantidades de dados de forma rápida e eficiente. Ele suporta uma ampla variedade de formatos de dados, incluindo dados estruturados, semiestruturados e não estruturados, sendo a opção ideal para integrar dados de várias fontes, como o MongoDB.
Os procedimentos armazenados do BigQuery Spark são rotinas executadas dentro do ambiente do BigQuery. Esses procedimentos podem executar várias tarefas, como manipulação de dados, cálculos complexos e até mesmo integração de dados externos. Elas fornecem uma maneira de modularizar e reutilizar código, facilitando a manutenção e a otimização dos fluxos de trabalho de processamento de dados. Os procedimentos armazenados do Spark usam o mecanismoSpark sem servidor, que habilita o Spark sem servidor e com dimensionamento automático. No entanto, você não precisa habilitar as APIs do Dataproc nem ser cobrado pelo Dataproc ao aproveitar esse novo recurso.
Vamos explorar como estender o processamento de dados do BigQuery para o Apache Spark e integrar o MongoDB ao BigQuery para facilitar de forma eficaz a movimentação de dados entre as duas plataformas.
Este tutorial orienta você na criação de um procedimento PySpark usando o editor do BigQuery.
Antes de começarmos com a configuração do procedimento armazenado do Spark, você precisa fazer oupload do arquivoJAR do Spark Connector para o GCP para se conectar e ler a partir do MongoDB Atlas. Copie e salve o URI gsutil para o arquivo JAR que será utilizado nas próximas etapas.
- Como pré-requisito para concluir o tutorial, você precisa configurar um cluster MongoDB Atlas com dados de amostra carregados nele.
- Você digitará o código PySpark diretamente no editor de queries. Para criar um procedimento armazenado no PySpark, clique em Criar procedimento no PySparke selecione Criar procedimento no PySpark.
- Para definir opções, clique em Mais > Opções do PySparke faça o seguinte:
- Especifique o local onde você deseja executar o código PySpark.
- No campoConexão, clique em Criar novaconexão e insira os valores abaixo para cada campo.
- Tipo de conexão > Apache Spark
- ID de conexão > Nomeie-o como mongodb-to-bigquery por causa deste tutorial.
- Deixe a outra opção vazia e clique em Criar.
- Na seção Invocação de procedimento armazenado, selecione Definir um conjunto de dados para invocação de procedimento armazenado para spark_run.
- Clique nas opções avançadas. Copie o nome de URI gsutil copiado no início da configuração e cole os arquivos JAR no pré-requisito. Deixe as outras opções vazias. Pressione Enter e clique em Salvar.
- Abra uma nova guia e Go o BigQuery. Navegue até Conexões externas > Encontre a conexão mongodb-to-bigquery > Copie o ID da conta de serviço. Conceda acesso de administrador de armazenamento do BigQuery, acessador de segredo do Secret Manager e administrador de objeto de armazenamento a esta conta de serviço do IAM.
- (Opcional) Adicione seu nome de usuário e senha no Google Cloud Secret Manager, ou você pode codificá-lo na própria string URI MongoDB.
- Copie o script Python abaixo no editor de procedimento PySpark e clique em EXECUTAR. O trecho leva cerca de dois a três minutos para ser concluído. O script abaixo criará uma nova tabela no conjunto de dados spark_run com o nome sample_mflix_comments.
1 from pyspark.sql import SparkSession 2 from google.cloud import secretmanager 3 4 def access_secret_version(secret_id, project_id): 5 client = secretmanager.SecretManagerServiceClient() 6 name = f"projects/{project_id}/secrets/{secret_id}/versions/1" 7 response = client.access_secret_version(request={"name": name}) 8 payload = response.payload.data.decode("UTF-8") 9 return payload 10 # Update project_number, username_secret_id and password_secret_id, comment them out if you did not create the secrets earlier 11 12 project_id = "<Your project number, 12 digit number>" 13 username_secret_id = "<Your username secret id>" 14 password_secret_id = "<Your password secret id>" 15 16 username = access_secret_version(username_secret_id, project_id) 17 password = access_secret_version(password_secret_id, project_id) 18 19 # Update the mongodb_uri directly if with your username and password if you did not create a secret from Step 7, update the hostname with your hostname 20 mongodb_uri = "mongodb+srv://"+username+":"+password+"@<hostname>/sample_mflix.comments" 21 22 my_spark = SparkSession \ 23 .builder \ 24 .appName("myApp") \ 25 .config("spark.mongodb.read.connection.uri", mongodb_uri) \ 26 .config("spark.mongodb.write.connection.uri", mongodb_uri) \ 27 .getOrCreate() 28 29 30 dataFrame = my_spark.read.format("mongodb").option("database", "sample_mflix").option("collection", "comments").load() 31 32 dataFrame.show() 33 34 # Saving the data to BigQuery 35 dataFrame.write.format("bigquery") \ 36 .option("writeMethod", "direct") \ 37 .save("spark_run.sample_mflix_comments")
- Navegue até o conjunto de dados spark_run para validar se os dados são carregados do MongoDB Atlas para o BigQuery na tabela com o nome sample_mflix_comments.
- Agora que os dados estão no BigQuery, aproveite o BQML para executar alguma IA generativa nos novos dados do MongoDB no BigQuery.
- Crie uma conexão com o nome gentext-conn, usando o console ou a linha de comando bq com o tipo de conexão CLOUD_RESOURCE.
1 !bq mk \ 2 --connection \ 3 --location=US \ 4 --project_id=<GCP Project id> \ 5 --connection_type=CLOUD_RESOURCE gentext-conn
11. Para conceder permissões de IAM para acessar a Vertex AI a partir do BigQuery, navegue até Conexões externas > Encontrar a conexão gettext-conn > Copie o ID da conta de serviço. Conceda ao usuário do Vertex AI acesso a esta conta de serviço do IAM. 12. Crie um modelo utilizando o comando CREATE Model .
1 CREATE OR REPLACE MODEL `gcp-pov.spark_run.llm_model` 2 REMOTE WITH CONNECTION `us.gentext-conn` 3 OPTIONS (ENDPOINT = 'gemini-pro');
13. Execute o comando SQL na tabela BigQuery. Essa query permite que o usuário extraia o nome do host do e-mail, aproveitando o modelo Gemini Pro.Asaídaresultante inclui os atributos de resposta e segurança.
1 SELECT prompt,ml_generate_text_result 2 FROM 3 ML.GENERATE_TEXT( MODEL `gcp-pov.spark_run.llm_model`, 4 ( 5 SELECT CONCAT('Extract the host name from the email: ', email) AS prompt, 6 * FROM `gcp-pov.spark_run.sample_mflix_comments` 7 LIMIT 5), 8 STRUCT( 9 0.9 AS temperature, 10 100 AS max_output_tokens 11 ) 12 );
14. Aqui está a saída de amostra que mostra o prompt e a resposta. O parâmetro prompt fornece o texto para o modelo analisar. O design de solicitação pode afetar fortemente as respostas retornadas pelo LLM.
- Essa integração permite que você use MongoDB para OLTP e BigQuery para OLAP, fornecendo uma solução completa de gerenciamento de dados.
- Depois que os dados são transformados e copiados para o BigQuery, o BigQuery ML permite criar e executar modelos de machine learning (ML) usando queries do GoogleSQL.
- O BigQuery ML também permite acessar LLMs e APIs de AI em nuvem para executar tarefas de inteligência artificial (AI), como geração de texto e tradução automática.
Ao combinar o poder do BigQuery, dos procedimentos armazenados do Spark e do MongoDB, você pode criar uma pipeline de processamento de dados robusta e escalável que aproveita os pontos fortes de cada tecnologia. O BigQuery fornece um data warehouse confiável e escalável para armazenar e analisar dados estruturados, enquanto o Spark permite processar e transformar dados de várias fontes, incluindo dados semiestruturados e não estruturados do MongoDB. Os procedimentos armazenados do Spark permitem encapsular e reutilizar essa lógica, facilitando a manutenção e a otimização de seus fluxos de trabalho de processamento de dados.
Principais comentários nos fóruns
Ainda não há comentários sobre este artigo.