Introdução ao Atlas Stream Processing
Nesta página
- Pré-requisitos
- Procedimento
- No Atlas, Go para a página Stream Processing do seu projeto.
- Crie uma Instância de Atlas Stream Processing .
- Obtenha a Atlas Stream Processing connection da instância do string.
- Adicione uma conexão MongoDB Atlas ao registro de conexão.
- Verifique se sua fonte de dados de streaming emite mensagens.
- Crie um processador de fluxo persistente.
- Inicie o processador de fluxo.
- Verifique a saída do processador de fluxo.
- Solte o processador de fluxo.
- Próximos passos
Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.
Pré-requisitos
Para concluir este tutorial você precisa:
Um projeto do Atlas
mongosh
versão 2.0 ou superiorUm Atlas user com a função
Project Owner
ouProject Stream Processing Owner
para gerenciar uma Instância de processamento de fluxo e um Registro de conexãoObservação
A função
Project Owner
permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.A função
Project Stream Processing Owner
permite ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.Consulte Funções de projeto para saber mais sobre as diferenças entre as duas funções.
Um utilizador de banco de dados com a função
atlasAdmin
para criar e executar processadores de streamUm cluster do Atlas
Procedimento
No Atlas, váGo para a Stream Processing página do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Stream Processing sob o título Services.
A página Processamento de fluxo é exibida.
Crie uma Instância de Atlas Stream Processing .
Clique em Get Started no canto inferior direito. O Atlas fornece uma breve explicação dos principais componentes do Atlas Stream Processing.
Clique no botão Create instance.
Na página Create a stream processing instance , configure sua instância da seguinte maneira:
Tier:
SP30
Provider:
AWS
Region:
us-east-1
Instance Name:
tutorialInstance
Clique em Create.
Obtenha a Atlas Stream Processing connection da instância do string.
Localize o painel de visão geral da sua instância de Atlas Stream Processing e clique em Connect.
Selecione I have the MongoDB shell installed.
No menu suspenso Select your mongo shell version , selecione a versão mais recente do
mongosh
.Copie a connection string fornecida em Run your connection string in your command line. Você precisará disso em uma etapa posterior.
Clique em Close.
Adicione uma conexão MongoDB Atlas ao registro de conexão.
Essa conexão serve como nosso coletor de dados de streaming.
No painel da instância do Atlas Stream Processing , clique em Configure.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Clique Atlas Database. No campo Connection Name , insira
mongodb1
. Na lista suspensa Atlas Cluster , selecione um Atlas cluster sem quaisquer dados armazenados nele.Clique em Add connection.
Verifique se sua fonte de dados de streaming emite mensagens.
Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de amostra chamada sample_stream_solar
. Essa fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto temporal específico, bem como a potência máxima desse dispositivo.
O documento a seguir é um exemplo representativo.
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 }, _ts: ISODate('2024-08-12T21:41:01.788Z'), _stream_meta: { source: { type: 'generated' } } }
Para verificar se essa fonte emite mensagens, crie um processador de fluxo de forma interativa.
Abra um aplicativo de terminal de sua escolha.
Conecte-se à sua instância do Atlas Stream Processing com
mongosh
.Cole a
mongosh
connection string que você copiou em uma etapa anterior em seu terminal, onde<atlas-stream-processing-url>
é a URL da sua Atlas Stream Processing instância de e<username>
é um usuário com o papelatlasAdmin
.mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> Digite sua senha quando solicitado.
Crie o processador de stream.
Copie o seguinte código no prompt
mongosh
:sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifique se os dados da conexão
sample_stream_solar
são exibidos no console e encerre o processo.Os processadores de stream que você cria com
sp.process()
não persistem depois que você os encerra.
Crie um processador de fluxo persistente.
Usando um pipeline de agregação, você pode transformar cada documento à medida que ele é ingerido. O seguinte pipeline de agregação deriva a temperatura máxima e as potências média, mediana, máxima e mínima de cada dispositivo solar em intervalos de um segundo.
Configure um estágio
$source
.A próxima etapa
$source
ingere dados da fontesample_stream_solar
.let s = { source: { connectionName: "sample_stream_solar" } } Configure um estágio
$group
.O estágio
$group
a seguir organiza todos os dados recebidos de acordo com seusgroup_id
, acumula os valores dos camposobs.temp
eobs.watts
de todos os documentos para cadagroup_id
e, em seguida, obtém os dados desejados.let g = { group: { _id: "$group_id", max_temp: { $avg: "$obs.temp" }, avg_watts: { $min: "$obs.watts" }, median_watts: { $min: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configure um estágio
$tumblingWindow
.Para realizar acumulações como
$group
em dados de streaming, o Atlas Stream Processing usa janelas para vincular o conjunto de dados. O estágio$tumblingWindow
a seguir separa o fluxo em intervalos consecutivos de 10 segundos.Isso significa, por exemplo, que quando o estágio
$group
calcula um valor paramedian_watts
, ele utiliza os valoresobs.watts
para todos os documentos com um determinadogroup_id
ingerido nos 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configurar um estágio $merge .
$merge
permite que você grave seus dados de streaming processados em um banco de dados Atlas.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crie o processador de stream.
Atribua um nome ao seu novo processador de fluxo e declare seu pipeline de agregação listando cada estágio em ordem. O estágio
$group
pertence ao pipeline aninhado do$tumblingWindow
e você não deve incluí-lo na definição do pipeline de processador.sp.createStreamProcessor("solarDemo", [s, t, m])
Isso cria um processador de fluxo chamado solarDemo
que aplica a query definida anteriormente e grava os dados processados na coleção solarColl
do banco de dados solarDb
no cluster ao qual você se conectou. Ele retorna várias medições derivadas de intervalos de 10 segundos de observações de seus dispositivos solares.
Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados em repouso, consulte $merge
.
Inicie o processador de fluxo.
Execute o seguinte comando em mongosh
:
sp.solarDemo.start()
Verifique a saída do processador de fluxo.
Para verificar se o processador está ativo, execute o seguinte comando em mongosh
:
sp.solarDemo.stats()
Este comando relata estatísticas operacionais do processador de fluxo do solarDemo
.
Para verificar se o processador de fluxo está gravando dados no cluster do Atlas:
No Atlas, acesse a página Clusters do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.
Se ainda não estiver exibido, clique em Clusters na barra lateral.
A página Clusters é exibida.
Clique no botão Browse Collections para o seu cluster.
O Data Explorer é exibido.
Veja a coleção
MySolar
.
Como alternativa, você pode exibir uma amostra de documentos processados no terminal usando mongosh
:
sp.solarDemo.sample()
{ _id: 10, max_watts: 136, min_watts: 130, avg_watts: 133, median_watts: 130, max_temp: 7, _stream_meta: { source: { type: 'generated' }, window: { start: ISODate('2024-08-12T22:49:05.000Z'), end: ISODate('2024-08-12T22:49:10.000Z') } } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Solte o processador de fluxo.
Execute o seguinte comando em mongosh
:
sp.solarDemo.drop()
Para confirmar que você descartou avgWatts
, liste todos os seus processadores de stream disponíveis:
sp.listStreamProcessors()
Próximos passos
Saiba como: