Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/

Introdução ao Atlas Stream Processing

Nesta página

  • Pré-requisitos
  • Procedimento
  • No Atlas, Go para a página Stream Processing do seu projeto.
  • Crie uma Instância de Atlas Stream Processing .
  • Obtenha a Atlas Stream Processing connection da instância do string.
  • Adicione uma conexão MongoDB Atlas ao registro de conexão.
  • Verifique se sua fonte de dados de streaming emite mensagens.
  • Crie um processador de fluxo persistente.
  • Inicie o processador de fluxo.
  • Verifique a saída do processador de fluxo.
  • Solte o processador de fluxo.
  • Próximos passos

Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.

Para concluir este tutorial você precisa:

  • Um projeto do Atlas

  • mongosh versão 2.0 ou superior

  • Um Atlas user com a função Project Owner ou Project Stream Processing Owner para gerenciar uma Instância de processamento de fluxo e um Registro de conexão

    Observação

    A função Project Owner permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.

    A função Project Stream Processing Owner permite ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.

    Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.

  • Um utilizador de banco de dados com a função atlasAdmin para criar e executar processadores de stream

  • Um cluster do Atlas

1
  1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

  2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

  3. Na barra lateral, clique em Stream Processing sob o título Services.

    A página Processamento de fluxo é exibida.

2
  1. Clique em Get Started no canto inferior direito. O Atlas fornece uma breve explicação dos principais componentes do Atlas Stream Processing.

  2. Clique no botão Create instance.

  3. Na página Create a stream processing instance , configure sua instância da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Clique em Create.

3
  1. Localize o painel de visão geral da sua instância de Atlas Stream Processing e clique em Connect.

  2. Selecione I have the MongoDB shell installed.

  3. No menu suspenso Select your mongo shell version , selecione a versão mais recente do mongosh.

  4. Copie a connection string fornecida em Run your connection string in your command line. Você precisará disso em uma etapa posterior.

  5. Clique em Close.

4

Essa conexão serve como nosso coletor de dados de streaming.

  1. No painel da instância do Atlas Stream Processing , clique em Configure.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Clique Atlas Database. No campo Connection Name , insira mongodb1. Na lista suspensa Atlas Cluster , selecione um Atlas cluster sem quaisquer dados armazenados nele.

  4. Clique em Add connection.

5

Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de amostra chamada sample_stream_solar. Essa fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto temporal específico, bem como a potência máxima desse dispositivo.

O documento a seguir é um exemplo representativo.

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
},
_ts: ISODate('2024-08-12T21:41:01.788Z'),
_stream_meta: {
source: {
type: 'generated'
}
}
}

Para verificar se essa fonte emite mensagens, crie um processador de fluxo de forma interativa.

  1. Abra um aplicativo de terminal de sua escolha.

  2. Conecte-se à sua instância do Atlas Stream Processing com mongosh.

    Cole a mongosh connection string que você copiou em uma etapa anterior em seu terminal, onde <atlas-stream-processing-url> é a URL da sua Atlas Stream Processing instância de e <username> é um usuário com o papel atlasAdmin .

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    Digite sua senha quando solicitado.

  3. Crie o processador de stream.

    Copie o seguinte código no prompt mongosh :

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verifique se os dados da conexão sample_stream_solar são exibidos no console e encerre o processo.

    Os processadores de stream que você cria com sp.process() não persistem depois que você os encerra.

6

Usando um pipeline de agregação, você pode transformar cada documento à medida que ele é ingerido. O seguinte pipeline de agregação deriva a temperatura máxima e as potências média, mediana, máxima e mínima de cada dispositivo solar em intervalos de um segundo.

  1. Configure um estágio $source.

    O seguinte estágio $source ingere dados da fonte sample_stream_solar.

    let s = {
    $source: {
    connectionName: "sample_stream_solar"
    }
    }
  2. Configure um estágio $group.

    O estágio $group a seguir organiza todos os dados recebidos de acordo com seus group_id, acumula os valores dos campos obs.temp e obs.watts de todos os documentos para cada group_id e, em seguida, obtém os dados desejados.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $avg: "$obs.temp"
    },
    avg_watts: {
    $min: "$obs.watts"
    },
    median_watts: {
    $min: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  3. Configure um estágio $tumblingWindow.

    Para realizar acumulações como $group em dados de streaming, o Atlas Stream Processing usa janelas para vincular o conjunto de dados. O estágio $tumblingWindow a seguir separa o fluxo em intervalos consecutivos de 10 segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para median_watts, ele utiliza os valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  4. Configurar um estágio $merge .

    $merge permite que você grave seus dados de streaming processados em um banco de dados Atlas.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  5. Crie o processador de stream.

    Atribua um nome ao seu novo processador de fluxo e declare seu pipeline de agregação listando cada estágio em ordem. O estágio $group pertence ao pipeline aninhado do $tumblingWindowe você não deve incluí-lo na definição do pipeline de processador.

    sp.createStreamProcessor("solarDemo", [s, t, m])

Isso cria um processador de fluxo chamado solarDemo que aplica a query definida anteriormente e grava os dados processados na coleção solarColl do banco de dados solarDb no cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.

Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados em repouso, consulte $merge.

7

Execute o seguinte comando em mongosh:

sp.solarDemo.start()
8

Para verificar se o processador está ativo, execute o seguinte comando em mongosh:

sp.solarDemo.stats()

Este comando relata estatísticas operacionais do processador de fluxo do solarDemo .

Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:

  1. No Atlas, acesse a página Clusters do seu projeto.

    1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.

    3. Se ainda não estiver exibido, clique em Clusters na barra lateral.

      A página Clusters é exibida.

  2. Clique no botão Browse Collections para o seu cluster.

    O Data Explorer é exibido.

  3. Veja a coleção MySolar.

Como alternativa, você pode exibir uma amostra de documentos processados no terminal usando mongosh:

sp.solarDemo.sample()
{
_id: 10,
max_watts: 136,
min_watts: 130,
avg_watts: 133,
median_watts: 130,
max_temp: 7,
_stream_meta: {
source: {
type: 'generated'
},
window: {
start: ISODate('2024-08-12T22:49:05.000Z'),
end: ISODate('2024-08-12T22:49:10.000Z')
}
}
}

Observação

O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

9

Execute o seguinte comando em mongosh:

sp.solarDemo.drop()

Para confirmar que você descartou avgWatts, liste todos os seus processadores de stream disponíveis:

sp.listStreamProcessors()

Saiba como:

Voltar

Visão geral