Criar um pipeline de dados para o fluxo de alterações do MongoDB usando a assinatura Pub/Sub do BigQuery
Venkatesh Shanbhag, Stanimira Vlaeva5 min read • Published Oct 28, 2022 • Updated Apr 02, 2024
Avalie esse Tutorial
No dia 1de outubro de 2022, o MongoDB e o Google anunciaram um conjunto de modelos de fluxo de dados de código aberto para mover dados entre o MongoDB e o BigQuery e para executar análises no BigQuery usando BQML e trazer de volta as inferências para o MongoDB. Três modelos foram introduzidos como parte dessa versão, incluindo o modelo CDC (change data capture) do MongoDB para o BigQuery.
Este modelo exige que os usuários executem o fluxo de alterações no MongoDB, que monitorará inserções e atualizações na coleção. Essas alterações serão registradas e enviadas para um tópico do Pub/Sub. O modelo de CDC criará um trabalho para ler os dados do tópico e obter as alterações, aplicar a transformação e gravar as alterações no BigQuery. As transformações variarão de acordo com a entrada do usuário durante a execução do trabalho do Dataflow.
Outra opção é usar um recurso nativo do Pub/Sub para configurar um pipeline de dados entre o MongoDB cluster e o BigQuery. A assinatura do BigQuery do Pub/Sub grava mensagens em uma tabela do BigQuery existente à medida que são recebidas. Sem o tipo de assinatura do BigQuery, você precisa de uma assinatura pull ou push e de um assinante (como o Dataflow) que leia as mensagens e as grave no BigQuery.
Este artigo explica como configurar a assinatura do BigQuery para processar dados lidos de um fluxo de alterações do MongoDB. Como pré-requisito, você precisará de um MongoDB Atlas cluster.
Para configurar um cluster gratuito, você pode se cadastrar no MongoDB no Google Cloud Marketplace ou na página de registro. Siga as etapas na documentação do MongoDB para definir o usuário do banco de dados e as configurações de rede do cluster.
No Google Cloud, criaremos um tópico do Pub/Sub, um conjunto de dados do BigQuery e uma tabela antes de criar a assinatura do BigQuery.
Em seguida, adicione uma nova tabela ao seu conjunto de dados. Defina-o com um nome de sua escolha e o seguinte esquema:
Nome do campo | Tipo |
---|---|
id | STRING |
source_data | STRING |
Timestamp | STRING |
Em seguida, vamos configurar um esquema e um tópico do Pub/Sub para ingestão das mensagens do nosso fluxo de alterações do MongoDB. Então, criaremos uma assinatura para gravar as mensagens recebidas na tabela do BigQuery que acabamos de criar.
Para esta seção, usaremos a API do Google Cloud Pub/Sub. Antes de continuar, verifique se você habilitou a API para seu projeto.
Forneça um identificador apropriado, como "mdb-to-bq-schema,", ao seu esquema. Em seguida, selecione "Avro " para o tipo. Por fim, adicione a seguinte definição para corresponder aos campos da sua tabela do BigQuery:
1 { 2 "type" : "record", 3 "name" : "Avro", 4 "fields" : [ 5 { 6 "name" : "id", 7 "type" : "string" 8 }, 9 { 10 "name" : "source_data", 11 "type" : "string" 12 }, 13 { 14 "name" : "Timestamp", 15 "type" : "string" 16 } 17 ] 18 }
Na barra lateral, navegue até "Topics " e clique em "Create a topic" (Criar um tópico).
Dê um identificador ao seu tópico, como "MongoDBCDC.. Habilite o campo "Use a schema" (Usar um esquema) e selecione o esquema que você acabou de criar. Deixe o restante dos parâmetros como padrão e clique em Create Topic (Criar tópico).
Dentro do tópico, clique em Create new subscription (Criar nova assinatura). Configure sua assinatura da seguinte maneira:
- Forneça um ID de assinatura - por exemplo, "mdb-cdc. "
- Defina o tipo de entrega como Write to BigQuery.
- Selecione seu conjunto de dados do BigQuery no menu suspenso.
- Forneça o nome da tabela criada no conjunto de dados do BigQuery.
- Habilite o campo Use topic schema (Usar esquema de tópico).
Você precisa ter uma função
bigquery.dataEditor
na sua conta de serviço para criar uma assinatura do Pub/Sub BigQuery. Para conceder acesso usando a ferramenta de linha de comandobq
, execute o seguinte comando:1 bq add-iam-policy-binding \ 2 --member="serviceAccount:service<project-number>@gcp-sa-pubsub.iam.gserviceaccount.com" \ 3 --role=roles/bigquery.dataEditor \ 4 -t "<dataset>.<table>"
Mantenha os outros campos como padrão e clique em Create subscription (Criar assinatura).
Por fim, vamos configurar um fluxo de alterações que escute novos documento inseridos em nosso MongoDB cluster.
Usaremos Node.js, mas você pode adaptar o código para uma linguagem de programação de sua preferência. Confira a documentação do Google Cloud para ver mais exemplos do Pub/Sub usando diversas linguagens. O código-fonte deste exemplo está disponível no repositório GitHub especializado.
Primeiro, configure um novo projeto Node.js e instale as seguintes dependências.
1 npm install mongodb @google-cloud/pubsub avro-js
Em seguida, adicione um esquema Avro, correspondente ao que criamos no Google Cloud Pub/Sub:
. /document-message.avsc
1 { 2 "type": "record", 3 "name": "DocumentMessage", 4 "fields": [ 5 { 6 "name": "id", 7 "type": "string" 8 }, 9 { 10 "name": "source_data", 11 "type": "string" 12 }, 13 { 14 "name": "Timestamp", 15 "type": "string" 16 } 17 ] 18 }
Em seguida, crie um novo módulo JavaScript —
index.mjs
. Comece importando as bibliotecas necessárias e configurando a string do MongoDB e o nome do tópico do Pub/Sub. Se ainda não tiver um MongoDB cluster, você poderá criar um gratuitamente no MongoDB Atlas../index.mjs
1 import { MongoClient } from 'mongodb'; 2 import { PubSub } from '@google-cloud/pubsub'; 3 import avro from 'avro-js'; 4 import fs from 'fs'; 5 6 const MONGODB_URI = '<mongodb-connection-string>'; 7 const PUB_SUB_TOPIC = 'projects/<project-name>/topics/<topic-name>';
Depois disso, podemos nos conectar à nossa instância do MongoDB e configurar um ouvinte de evento de fluxo de alterações. Usando um pipeline de agregação, observaremos apenas eventos “insert” na coleção especificada. Também definiremos um tempo limite de 60segundos antes de fechar o fluxo de alterações.
./index.mjs
1 let mongodbClient; 2 try { 3 mongodbClient = new MongoClient(MONGODB_URI); 4 await monitorCollectionForInserts(mongodbClient, 'my-database', 'my-collection'); 5 } finally { 6 mongodbClient.close(); 7 } 8 9 async function monitorCollectionForInserts(client, databaseName, collectionName, timeInMs) { 10 const collection = client.db(databaseName).collection(collectionName); 11 // An aggregation pipeline that matches on new documents in the collection. 12 const pipeline = [ { $match: { operationType: 'insert' } } ]; 13 const changeStream = collection.watch(pipeline); 14 15 changeStream.on('change', event => { 16 const document = event.fullDocument; 17 publishDocumentAsMessage(document, PUB_SUB_TOPIC); 18 }); 19 20 await closeChangeStream(timeInMs, changeStream); 21 } 22 23 function closeChangeStream(timeInMs = 60000, changeStream) { 24 return new Promise((resolve) => { 25 setTimeout(() => { 26 console.log('Closing the change stream'); 27 changeStream.close(); 28 resolve(); 29 }, timeInMs) 30 }) 31 };
Finalmente, definiremos a função
publishDocumentAsMessage()
que irá:- Transforme cada documento do MongoDB recebido por meio do fluxo de alterações.
- Converta-o para o buffer de dados seguindo o esquema Avro.
- Publique-o no tópico Pub/Sub no Google Cloud.
1 async function publishDocumentAsMessage(document, topicName) { 2 const pubSubClient = new PubSub(); 3 const topic = pubSubClient.topic(topicName); 4 5 const definition = fs.readFileSync('./document-message.avsc').toString(); 6 const type = avro.parse(definition); 7 8 const message = { 9 id: document?._id?.toString(), 10 source_data: JSON.stringify(document), 11 Timestamp: new Date().toISOString(), 12 }; 13 14 const dataBuffer = Buffer.from(type.toString(message)); 15 try { 16 const messageId = await topic.publishMessage({ data: dataBuffer }); 17 console.log(`Avro record ${messageId} published.`); 18 } catch(error) { 19 console.error(error); 20 } 21 }
Execute o arquivo para iniciar o ouvinte do fluxo de alterações:
1 node ./index.mjs
Insira um novo documento na sua coleção do MongoDB para vê-lo passar pelo pipeline de dados e aparecer em sua tabela do BigQuery!
Existem várias maneiras de carregar os dados do fluxo de alterações do MongoDB para o BigQuery e mostramos como usar a assinatura do BigQuery no Pub/Sub. Os fluxos de alterações do MongoDB são monitorados, registrados e, posteriormente, gravados em um tópico do Pub/Sub usando bibliotecas Java.
Os dados então são gravados no BigQuery usando a assinatura do BigQuery. O tipo de dados da tabela do BigQuery é definido usando o esquema Pub/Sub. Assim, os dados do fluxo de alterações podem ser registrados e gravados no BigQuery usando a funcionalidade de assinatura do BigQuery do Pub/Sub.
- Configure seu primeiro MongoDB cluster usando o Google Marketplace.
- Como publicar uma mensagem em um tópico com esquema.