Explore o novo chatbot do Developer Center! O MongoDB AI chatbot pode ser acessado na parte superior da sua navegação para responder a todas as suas perguntas sobre o MongoDB .

Desenvolvedor do MongoDB
Central de desenvolvedor do MongoDBchevron-right
Produtoschevron-right
MongoDBchevron-right

Criar um pipeline de dados para o fluxo de alterações do MongoDB usando a assinatura Pub/Sub do BigQuery

Venkatesh Shanbhag, Stanimira Vlaeva5 min read • Published Oct 28, 2022 • Updated Apr 02, 2024
Google cloudNode.jsIAMongoDBFluxos de alteraçõesJavaScript
Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
No dia 1de outubro de 2022, o MongoDB e o Google anunciaram um conjunto de modelos de fluxo de dados de código aberto para mover dados entre o MongoDB e o BigQuery e para executar análises no BigQuery usando BQML e trazer de volta as inferências para o MongoDB. Três modelos foram introduzidos como parte dessa versão, incluindo o modelo CDC (change data capture) do MongoDB para o BigQuery.
Fluxo de alterações do MongoDB
Este modelo exige que os usuários executem o fluxo de alterações no MongoDB, que monitorará inserções e atualizações na coleção. Essas alterações serão registradas e enviadas para um tópico do Pub/Sub. O modelo de CDC criará um trabalho para ler os dados do tópico e obter as alterações, aplicar a transformação e gravar as alterações no BigQuery. As transformações variarão de acordo com a entrada do usuário durante a execução do trabalho do Dataflow.
Outra opção é usar um recurso nativo do Pub/Sub para configurar um pipeline de dados entre o MongoDB cluster e o BigQuery. A assinatura do BigQuery do Pub/Sub grava mensagens em uma tabela do BigQuery existente à medida que são recebidas. Sem o tipo de assinatura do BigQuery, você precisa de uma assinatura pull ou push e de um assinante (como o Dataflow) que leia as mensagens e as grave no BigQuery.
Este artigo explica como configurar a assinatura do BigQuery para processar dados lidos de um fluxo de alterações do MongoDB. Como pré-requisito, você precisará de um MongoDB Atlas cluster.
Para configurar um cluster gratuito, você pode se cadastrar no MongoDB no Google Cloud Marketplace ou na página de registro. Siga as etapas na documentação do MongoDB para definir o usuário do banco de dados e as configurações de rede do cluster.
No Google Cloud, criaremos um tópico do Pub/Sub, um conjunto de dados do BigQuery e uma tabela antes de criar a assinatura do BigQuery.

Criar um conjunto de dados do BigQuery

Começaremos criando um novo conjunto de dados para o BigQuery no console do Google Cloud.
Em seguida, adicione uma nova tabela ao seu conjunto de dados. Defina-o com um nome de sua escolha e o seguinte esquema:
Nome do campoTipo
idSTRING
source_dataSTRING
TimestampSTRING
Criar um novo conjunto de dados e tabela no BigQuery

Configurar o Google Cloud Pub/Sub

Em seguida, vamos configurar um esquema e um tópico do Pub/Sub para ingestão das mensagens do nosso fluxo de alterações do MongoDB. Então, criaremos uma assinatura para gravar as mensagens recebidas na tabela do BigQuery que acabamos de criar.
Para esta seção, usaremos a API do Google Cloud Pub/Sub. Antes de continuar, verifique se você habilitou a API para seu projeto.

Definir um esquema do Pub/Sub

Na IU do Cloud Pub/Sub, navegue até Create Schema (Criar esquema).
Forneça um identificador apropriado, como "mdb-to-bq-schema,", ao seu esquema. Em seguida, selecione "Avro " para o tipo. Por fim, adicione a seguinte definição para corresponder aos campos da sua tabela do BigQuery:
1{
2 "type" : "record",
3 "name" : "Avro",
4 "fields" : [
5 {
6 "name" : "id",
7 "type" : "string"
8 },
9 {
10 "name" : "source_data",
11 "type" : "string"
12 },
13 {
14 "name" : "Timestamp",
15 "type" : "string"
16 }
17 ]
18}
Criar um esquema do Cloud Pub/Sub

Criar um tópico do Pub/Sub

Na barra lateral, navegue até "Topics " e clique em "Create a topic" (Criar um tópico).
Dê um identificador ao seu tópico, como "MongoDBCDC.. Habilite o campo "Use a schema" (Usar um esquema) e selecione o esquema que você acabou de criar. Deixe o restante dos parâmetros como padrão e clique em Create Topic (Criar tópico).
Criar um tópico do Cloud Pub/Sub

Inscrever-se no tópico e gravar no BigQuery

Dentro do tópico, clique em Create new subscription (Criar nova assinatura). Configure sua assinatura da seguinte maneira:
  • Forneça um ID de assinatura - por exemplo, "mdb-cdc. "
  • Defina o tipo de entrega como Write to BigQuery.
  • Selecione seu conjunto de dados do BigQuery no menu suspenso.
  • Forneça o nome da tabela criada no conjunto de dados do BigQuery.
  • Habilite o campo Use topic schema (Usar esquema de tópico).
Você precisa ter uma função bigquery.dataEditor na sua conta de serviço para criar uma assinatura do Pub/Sub BigQuery. Para conceder acesso usando a ferramenta de linha de comandobq, execute o seguinte comando:
1bq add-iam-policy-binding \
2 --member="serviceAccount:service<project-number>@gcp-sa-pubsub.iam.gserviceaccount.com" \
3 --role=roles/bigquery.dataEditor \
4 -t "<dataset>.<table>"
Mantenha os outros campos como padrão e clique em Create subscription (Criar assinatura).
Criar uma assinatura do Cloud Pub/Sub

Configurar um fluxo de alterações em um MongoDB cluster

Por fim, vamos configurar um fluxo de alterações que escute novos documento inseridos em nosso MongoDB cluster.
Usaremos Node.js, mas você pode adaptar o código para uma linguagem de programação de sua preferência. Confira a documentação do Google Cloud para ver mais exemplos do Pub/Sub usando diversas linguagens. O código-fonte deste exemplo está disponível no repositório GitHub especializado.
Primeiro, configure um novo projeto Node.js e instale as seguintes dependências.
1npm install mongodb @google-cloud/pubsub avro-js
Em seguida, adicione um esquema Avro, correspondente ao que criamos no Google Cloud Pub/Sub:
. /document-message.avsc
1{
2 "type": "record",
3 "name": "DocumentMessage",
4 "fields": [
5 {
6 "name": "id",
7 "type": "string"
8 },
9 {
10 "name": "source_data",
11 "type": "string"
12 },
13 {
14 "name": "Timestamp",
15 "type": "string"
16 }
17 ]
18}
Em seguida, crie um novo módulo JavaScript — index.mjs. Comece importando as bibliotecas necessárias e configurando a string do MongoDB e o nome do tópico do Pub/Sub. Se ainda não tiver um MongoDB cluster, você poderá criar um gratuitamente no MongoDB Atlas.
./index.mjs
1import { MongoClient } from 'mongodb';
2import { PubSub } from '@google-cloud/pubsub';
3import avro from 'avro-js';
4import fs from 'fs';
5
6const MONGODB_URI = '<mongodb-connection-string>';
7const PUB_SUB_TOPIC = 'projects/<project-name>/topics/<topic-name>';
Depois disso, podemos nos conectar à nossa instância do MongoDB e configurar um ouvinte de evento de fluxo de alterações. Usando um pipeline de agregação, observaremos apenas eventos “insert” na coleção especificada. Também definiremos um tempo limite de 60segundos antes de fechar o fluxo de alterações.
./index.mjs
1let mongodbClient;
2try {
3 mongodbClient = new MongoClient(MONGODB_URI);
4 await monitorCollectionForInserts(mongodbClient, 'my-database', 'my-collection');
5} finally {
6 mongodbClient.close();
7}
8
9async function monitorCollectionForInserts(client, databaseName, collectionName, timeInMs) {
10 const collection = client.db(databaseName).collection(collectionName);
11 // An aggregation pipeline that matches on new documents in the collection.
12 const pipeline = [ { $match: { operationType: 'insert' } } ];
13 const changeStream = collection.watch(pipeline);
14
15 changeStream.on('change', event => {
16 const document = event.fullDocument;
17 publishDocumentAsMessage(document, PUB_SUB_TOPIC);
18 });
19
20 await closeChangeStream(timeInMs, changeStream);
21}
22
23function closeChangeStream(timeInMs = 60000, changeStream) {
24 return new Promise((resolve) => {
25 setTimeout(() => {
26 console.log('Closing the change stream');
27 changeStream.close();
28 resolve();
29 }, timeInMs)
30 })
31};
Finalmente, definiremos a função publishDocumentAsMessage() que irá:
  1. Transforme cada documento do MongoDB recebido por meio do fluxo de alterações.
  2. Converta-o para o buffer de dados seguindo o esquema Avro.
  3. Publique-o no tópico Pub/Sub no Google Cloud.
1async function publishDocumentAsMessage(document, topicName) {
2 const pubSubClient = new PubSub();
3 const topic = pubSubClient.topic(topicName);
4
5 const definition = fs.readFileSync('./document-message.avsc').toString();
6 const type = avro.parse(definition);
7
8 const message = {
9 id: document?._id?.toString(),
10 source_data: JSON.stringify(document),
11 Timestamp: new Date().toISOString(),
12 };
13
14 const dataBuffer = Buffer.from(type.toString(message));
15 try {
16 const messageId = await topic.publishMessage({ data: dataBuffer });
17 console.log(`Avro record ${messageId} published.`);
18 } catch(error) {
19 console.error(error);
20 }
21}
Execute o arquivo para iniciar o ouvinte do fluxo de alterações:
1node ./index.mjs
Insira um novo documento na sua coleção do MongoDB para vê-lo passar pelo pipeline de dados e aparecer em sua tabela do BigQuery!

Resumo

Existem várias maneiras de carregar os dados do fluxo de alterações do MongoDB para o BigQuery e mostramos como usar a assinatura do BigQuery no Pub/Sub. Os fluxos de alterações do MongoDB são monitorados, registrados e, posteriormente, gravados em um tópico do Pub/Sub usando bibliotecas Java.
Os dados então são gravados no BigQuery usando a assinatura do BigQuery. O tipo de dados da tabela do BigQuery é definido usando o esquema Pub/Sub. Assim, os dados do fluxo de alterações podem ser registrados e gravados no BigQuery usando a funcionalidade de assinatura do BigQuery do Pub/Sub.

Leitura adicional

  1. Um pipeline de dados para MongoDB Atlas e BigQuery usando Dataflow.
  2. Configure seu primeiro MongoDB cluster usando o Google Marketplace.
  3. Execute análises usando o BigQuery com o BigQuery ML.
  4. Como publicar uma mensagem em um tópico com esquema.

Ícone do FacebookÍcone do Twitterícone do linkedin
Avalie esse Tutorial
star-empty
star-empty
star-empty
star-empty
star-empty
Relacionado
Tutorial

Como implantar um aplicativo Spark com MongoDB no Fly.io


Dec 02, 2024 | 5 min read
Tutorial

Análise de moeda com coleções de séries temporais #2 — Cálculo da média móvel simples e da média móvel exponencial


May 16, 2022 | 7 min read
Tutorial

Use o MongoDB como o armazenamento de dados para seu CMS Strapi Headless


Sep 23, 2022 | 8 min read
Início rápido

Armazene dados confidenciais com a criptografia em nível de campo do lado do cliente do Python & MongoDB


Sep 23, 2022 | 11 min read
Sumário