Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/

Introdução ao Atlas Stream Processing

Nesta página

  • Pré-requisitos
  • Procedimento
  • No Atlas, Go para a página Stream Processing do seu projeto.
  • Crie uma Instância de Atlas Stream Processing .
  • Obtenha a Atlas Stream Processing connection da instância do string.
  • Adicione uma conexão MongoDB Atlas ao registro de conexão.
  • Verifique se sua fonte de dados de streaming emite mensagens.
  • Crie um processador de fluxo persistente.
  • Inicie o processador de fluxo.
  • Verifique a saída do processador de fluxo.
  • Solte o processador de fluxo.
  • Próximos passos

Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.

Para concluir este tutorial você precisa:

  • Um projeto do Atlas

  • mongosh versão 2.0 ou superior

  • Um usuário do Atlas com o papel Project Owner ou Project Stream Processing Owner para gerenciar uma Instância de Processamento de Fluxo e Registro de Conexão

    Observação

    A função Project Owner permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.

    A função Project Stream Processing Owner habilita ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.

    Consulte Roles do projeto para saber mais sobre as diferenças entre as duas roles.

  • Um utilizador de banco de dados com a função atlasAdmin para criar e executar processadores de stream

  • Um cluster do Atlas

1
  1. Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.

  2. Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.

  3. Na barra lateral, clique em Stream Processing sob o título Services .

    A página Processamento de Stream é exibida.

2
  1. Clique em Get Started no canto inferior direito. O Atlas fornece uma breve explicação dos principais componentes do Atlas Stream Processing.

  2. Clique no botão Create instance.

  3. Na página Create a stream processing instance , configure sua instância da seguinte maneira:

    • Tier: SP30

    • Provider: AWS

    • Region: us-east-1

    • Instance Name: tutorialInstance

  4. Clique em Create.

3
  1. Localize o painel de visão geral da sua instância de Atlas Stream Processing e clique em Connect.

  2. Selecione I have the MongoDB shell installed.

  3. No menu suspenso Select your mongo shell version , selecione a versão mais recente do mongosh.

  4. Copie a connection string fornecida em Run your connection string in your command line. Você precisará disso em uma etapa posterior.

  5. Clique em Close.

4

Essa conexão serve como nosso coletor de dados de streaming.

  1. No painel da instância do Atlas Stream Processing , clique em Configure.

  2. Na aba Connection Registry , clique em + Add Connection no canto superior direito.

  3. Clique Atlas Database. No campo Connection Name , insira mongodb1. Na lista suspensa Atlas Cluster , selecione um Atlas cluster sem quaisquer dados armazenados nele.

  4. Clique em Add connection.

5

Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de exemplo chamada sample_stream_solar. Esta fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto específico no tempo, bem como a potência máxima desse dispositivo.

O documento a seguir é um exemplo representativo.

{
device_id: 'device_8',
group_id: 7,
timestamp: '2024-08-12T21:41:01.788+00:00',
max_watts: 450,
event_type: 0,
obs: {
watts: 252,
temp: 17
},
_ts: ISODate('2024-08-12T21:41:01.788Z'),
_stream_meta: {
source: {
type: 'generated'
}
}
}

Para verificar se essa fonte emite mensagens, crie um processador de stream interativamente.

  1. Abra um aplicativo de terminal de sua escolha.

  2. Conecte-se à sua instância do Atlas Stream Processing com mongosh.

    Cole a mongosh connection string que você copiou em uma etapa anterior em seu terminal, onde <atlas-stream-processing-url> é a URL da sua Atlas Stream Processing instância de e <username> é um usuário com o papel atlasAdmin .

    mongosh "mongodb://<atlas-stream-processing-url>/"
    --tls --authenticationDatabase admin --username <username>

    Digite sua senha quando solicitado.

  3. Crie o processador de stream.

    Copie o seguinte código no prompt mongosh :

    sp.process([{"$source": {
    "connectionName": "sample_stream_solar"
    }}])

    Verifique se os dados da conexão sample_stream_solar são exibidos no console e encerre o processo.

    Os processadores de stream que você cria com sp.process() não persistem depois que você os encerra.

6

Usando uma aggregation pipeline, você pode transformar cada documento à medida que ele é ingerido. O pipeline de agregação a seguir deriva a temperatura máxima e as potências média, mediana, máxima e mínima de cada dispositivo solar em intervalos de um segundo.

  1. Configure um estágio $source .

    O estágio $source a seguir ingere dados da fonte sample_stream_solar e define o valor do campo de tempo de processamento do Atlas Stream igual ao valor do campo timestamp da fonte.

    let s = {
    $source: {
    connectionName: "sample_stream_solar",
    timeField: {
    $dateFromString: {
    dateString: '$timestamp'
    }
    }
    }
    }
  2. Configure um estágio $group .

    O estágio $group a seguir organiza todos os dados recebidos de acordo com seu group_id, acumula os valores dos campos obs.temp e obs.watts de todos os documentos para cada group_id e deriva os dados desejados.

    let g = {
    $group: {
    _id: "$group_id",
    max_temp: {
    $avg: "$obs.temp"
    },
    avg_watts: {
    $min: "$obs.watts"
    },
    median_watts: {
    $min: "$obs.watts"
    },
    max_watts: {
    $max: "$obs.watts"
    },
    min_watts: {
    $min: "$obs.watts"
    }
    }
    }
  3. Configure um estágio $tumblingWindow .

    Para realizar acumulações como $group em dados de streaming, o Atlas Stream Processing usa janelas para limitar o conjunto de dados. O estágio $tumblingWindow a seguir separa o stream em intervalos consecutivos 10segundos.

    Isso significa, por exemplo, que quando o estágio $group calcula um valor para median_watts, ele assume os valores obs.watts para todos os documentos com um determinado group_id ingerido nos 10 segundos anteriores.

    let t = {
    $tumblingWindow: {
    interval: {
    size: NumberInt(10),
    unit: "second"
    },
    pipeline: [g]
    }
    }
  4. Configure um estágio $merge .

    $merge permite que você grave seus dados de streaming processados em um banco de dados Atlas.

    let m = {
    $merge: {
    into: {
    connectionName: "mongodb1",
    db: "solarDb",
    coll: "solarColl"
    }
    }
    }
  5. Crie o processador de stream.

    Atribua um nome ao seu novo processador de stream e declare seu pipeline de agregação listando cada estágio em ordem. O estágio $group pertence ao pipeline aninhado do $tumblingWindow e você não deve incluí-lo na definição de pipeline do processador.

    sp.createStreamProcessor("solarDemo", [s, t, m])

Isso cria um processador de solarDemo fluxo chamado que aplica a query definida anteriormente e grava os dados processados na solarColl coleção do solarDb banco de dados no cluster ao qual você se conectou. Ele retorna várias medições derivadas de 10intervalos de segundos de observações de seus dispositivos especiais.

Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte $merge.

7

Execute o seguinte comando em mongosh:

sp.solarDemo.start()
8

Para verificar se o processador está ativo, execute o seguinte comando em mongosh:

sp.solarDemo.stats()

Este comando relata estatísticas operacionais do processador de fluxo do solarDemo .

Para verificar se o processador de stream está gravando dados em seu Atlas cluster:

  1. No Atlas, acesse a página Clusters do seu projeto.

    1. Se ainda não estiver exibido, selecione a organização que contém o projeto desejado no Menu Organizations na barra de navegação.

    2. Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.

    3. Se a página Clusters ainda não estiver exibida, clique em Database na barra lateral.

      A página Clusters é exibida.

  2. Clique no botão Browse Collections para o seu cluster.

    O Explorador de Dados é exibido.

  3. Ver a coleção MySolar .

Como alternativa, você pode exibir uma amostragem de documentos processados no terminal usando mongosh:

sp.solarDemo.sample()
{
_id: 10,
max_watts: 136,
min_watts: 130,
avg_watts: 133,
median_watts: 130,
max_temp: 7,
_stream_meta: {
source: {
type: 'generated'
},
window: {
start: ISODate('2024-08-12T22:49:05.000Z'),
end: ISODate('2024-08-12T22:49:10.000Z')
}
}
}

Observação

O anterior é um exemplo representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

9

Execute o seguinte comando em mongosh:

sp.avgWatts.drop()

Para confirmar que você descartou avgWatts, liste todos os seus processadores de stream disponíveis:

sp.listStreamProcessors()

Saiba como:

Voltar

Visão geral