Introdução ao Atlas Stream Processing
Nesta página
- Pré-requisitos
- Procedimento
- No Atlas, Go para a página Stream Processing do seu projeto.
- Crie uma Instância de Atlas Stream Processing .
- Obtenha a Atlas Stream Processing connection da instância do string.
- Adicione uma conexão MongoDB Atlas ao registro de conexão.
- Verifique se sua fonte de dados de streaming emite mensagens.
- Crie um processador de fluxo persistente.
- Inicie o processador de fluxo.
- Verifique a saída do processador de fluxo.
- Solte o processador de fluxo.
- Próximos passos
Este tutorial orienta você pelas etapas de configuração do Atlas Stream Processing e execução do seu primeiro processador de stream.
Pré-requisitos
Para concluir este tutorial você precisa:
Um projeto do Atlas
mongosh
versão 2.0 ou superiorUm usuário do Atlas com o papel
Project Owner
ouProject Stream Processing Owner
para gerenciar uma Instância de Processamento de Fluxo e Registro de ConexãoObservação
A função
Project Owner
permite que você crie sistemas de banco de dados, gerencie o acesso ao projeto e as configurações do projeto, gerencie entradas da lista de acesso IP e muito mais.A função
Project Stream Processing Owner
habilita ações do Atlas Stream Processing, como visualizar, criar, excluir e editar instâncias de processamento de fluxo e visualizar, adicionar, modificar e excluir conexões no registro de conexões.Consulte Roles do projeto para saber mais sobre as diferenças entre as duas roles.
Um utilizador de banco de dados com a função
atlasAdmin
para criar e executar processadores de streamUm cluster do Atlas
Procedimento
No Atlas, vá Stream Processing para a página do seu projeto.
Se ainda não tiver sido exibido, selecione a organização que contém seu projeto no menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione seu projeto no menu Projects na barra de navegação.
Na barra lateral, clique em Stream Processing sob o título Services .
A página Processamento de Stream é exibida.
Crie uma Instância de Atlas Stream Processing .
Clique em Get Started no canto inferior direito. O Atlas fornece uma breve explicação dos principais componentes do Atlas Stream Processing.
Clique no botão Create instance.
Na página Create a stream processing instance , configure sua instância da seguinte maneira:
Tier:
SP30
Provider:
AWS
Region:
us-east-1
Instance Name:
tutorialInstance
Clique em Create.
Obtenha a Atlas Stream Processing connection da instância do string.
Localize o painel de visão geral da sua instância de Atlas Stream Processing e clique em Connect.
Selecione I have the MongoDB shell installed.
No menu suspenso Select your mongo shell version , selecione a versão mais recente do
mongosh
.Copie a connection string fornecida em Run your connection string in your command line. Você precisará disso em uma etapa posterior.
Clique em Close.
Adicione uma conexão MongoDB Atlas ao registro de conexão.
Essa conexão serve como nosso coletor de dados de streaming.
No painel da instância do Atlas Stream Processing , clique em Configure.
Na aba Connection Registry , clique em + Add Connection no canto superior direito.
Clique Atlas Database. No campo Connection Name , insira
mongodb1
. Na lista suspensa Atlas Cluster , selecione um Atlas cluster sem quaisquer dados armazenados nele.Clique em Add connection.
Verifique se sua fonte de dados de streaming emite mensagens.
Sua instância de processamento de fluxo vem pré-configurada com uma conexão com uma fonte de dados de exemplo chamada sample_stream_solar
. Esta fonte gera um fluxo de relatórios de vários dispositivos de energia solar. Cada relatório descreve a potência e a temperatura observadas de um único dispositivo solar em um ponto específico no tempo, bem como a potência máxima desse dispositivo.
O documento a seguir é um exemplo representativo.
{ device_id: 'device_8', group_id: 7, timestamp: '2024-08-12T21:41:01.788+00:00', max_watts: 450, event_type: 0, obs: { watts: 252, temp: 17 }, _ts: ISODate('2024-08-12T21:41:01.788Z'), _stream_meta: { source: { type: 'generated' } } }
Para verificar se essa fonte emite mensagens, crie um processador de stream interativamente.
Abra um aplicativo de terminal de sua escolha.
Conecte-se à sua instância do Atlas Stream Processing com
mongosh
.Cole a
mongosh
connection string que você copiou em uma etapa anterior em seu terminal, onde<atlas-stream-processing-url>
é a URL da sua Atlas Stream Processing instância de e<username>
é um usuário com o papelatlasAdmin
.mongosh "mongodb://<atlas-stream-processing-url>/" --tls --authenticationDatabase admin --username <username> Digite sua senha quando solicitado.
Crie o processador de stream.
Copie o seguinte código no prompt
mongosh
:sp.process([{"$source": { "connectionName": "sample_stream_solar" }}]) Verifique se os dados da conexão
sample_stream_solar
são exibidos no console e encerre o processo.Os processadores de stream que você cria com
sp.process()
não persistem depois que você os encerra.
Crie um processador de fluxo persistente.
Usando uma aggregation pipeline, você pode transformar cada documento à medida que ele é ingerido. O pipeline de agregação a seguir deriva a temperatura máxima e as potências média, mediana, máxima e mínima de cada dispositivo solar em intervalos de um segundo.
Configure um estágio
$source
.O estágio
$source
a seguir ingere dados da fontesample_stream_solar
e define o valor do campo de tempo de processamento do Atlas Stream igual ao valor do campotimestamp
da fonte.let s = { source: { connectionName: "sample_stream_solar", timeField: { $dateFromString: { dateString: '$timestamp' } } } } Configure um estágio
$group
.O estágio
$group
a seguir organiza todos os dados recebidos de acordo com seugroup_id
, acumula os valores dos camposobs.temp
eobs.watts
de todos os documentos para cadagroup_id
e deriva os dados desejados.let g = { group: { _id: "$group_id", max_temp: { $avg: "$obs.temp" }, avg_watts: { $min: "$obs.watts" }, median_watts: { $min: "$obs.watts" }, max_watts: { $max: "$obs.watts" }, min_watts: { $min: "$obs.watts" } } } Configure um estágio
$tumblingWindow
.Para realizar acumulações como
$group
em dados de streaming, o Atlas Stream Processing usa janelas para limitar o conjunto de dados. O estágio$tumblingWindow
a seguir separa o stream em intervalos consecutivos 10segundos.Isso significa, por exemplo, que quando o estágio
$group
calcula um valor paramedian_watts
, ele assume os valoresobs.watts
para todos os documentos com um determinadogroup_id
ingerido nos 10 segundos anteriores.let t = { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, pipeline: [g] } } Configure um estágio $merge .
$merge
permite que você grave seus dados de streaming processados em um banco de dados Atlas.let m = { merge: { into: { connectionName: "mongodb1", db: "solarDb", coll: "solarColl" } } } Crie o processador de stream.
Atribua um nome ao seu novo processador de stream e declare seu pipeline de agregação listando cada estágio em ordem. O estágio
$group
pertence ao pipeline aninhado do$tumblingWindow
e você não deve incluí-lo na definição de pipeline do processador.sp.createStreamProcessor("solarDemo", [s, t, m])
Isso cria um processador de solarDemo
fluxo chamado que aplica a query definida anteriormente e grava os dados processados na solarColl
coleção do solarDb
banco de dados no cluster ao qual você se conectou. Ele retorna várias medições derivadas de 10intervalos de segundos de observações de seus dispositivos especiais.
Para saber mais sobre como o Atlas Stream Processing grava em bancos de dados at-rest, consulte $merge
.
Inicie o processador de fluxo.
Execute o seguinte comando em mongosh
:
sp.solarDemo.start()
Verifique a saída do processador de fluxo.
Para verificar se o processador está ativo, execute o seguinte comando em mongosh
:
sp.solarDemo.stats()
Este comando relata estatísticas operacionais do processador de fluxo do solarDemo
.
Para verificar se o processador de stream está gravando dados em seu Atlas cluster:
No Atlas, acesse a página Clusters do seu projeto.
Se ainda não estiver exibido, selecione a organização que contém o projeto desejado no Menu Organizations na barra de navegação.
Se ainda não estiver exibido, selecione o projeto desejado no menu Projects na barra de navegação.
Se a página Clusters ainda não estiver exibida, clique em Database na barra lateral.
A página Clusters é exibida.
Clique no botão Browse Collections para o seu cluster.
O Explorador de Dados é exibido.
Ver a coleção
MySolar
.
Como alternativa, você pode exibir uma amostragem de documentos processados no terminal usando mongosh
:
sp.solarDemo.sample()
{ _id: 10, max_watts: 136, min_watts: 130, avg_watts: 133, median_watts: 130, max_temp: 7, _stream_meta: { source: { type: 'generated' }, window: { start: ISODate('2024-08-12T22:49:05.000Z'), end: ISODate('2024-08-12T22:49:10.000Z') } } }
Observação
O anterior é um exemplo representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.
Solte o processador de fluxo.
Execute o seguinte comando em mongosh
:
sp.avgWatts.drop()
Para confirmar que você descartou avgWatts
, liste todos os seus processadores de stream disponíveis:
sp.listStreamProcessors()
Próximos passos
Saiba como: