managed processadores de fluxo
Nesta página
- Pré-requisitos
- Considerações
- Crie um processador de stream interativamente
- Conecte-se à sua instância de processamento de fluxo.
- Defina um pipeline.
- Crie um processador de fluxo.
- Crie um processador de fluxo
- Inicie um processador de fluxo
- Interromper um processador de fluxo
- Modificar um processador de stream
- Limitações
- Para modificar um processador de fluxo:
- Descartar um processador de fluxo
- Liste os processadores de fluxo disponíveis
- Amostra de um processador de fluxo
- Ver estatísticas de um processador de fluxo
Um processador de fluxo Atlas Stream Processing aplica a lógica de umpipeline de agregação de fluxo com nome exclusivo aos seus dados de streaming. O Atlas Stream Processing salva cada definição do processador de fluxo no armazenamento persistente para que elas possam ser reutilizadas. Você só pode usar um determinado processador de fluxo na instância de processamento de fluxo em que sua definição está armazenada. O Atlas Stream Processing suporta até 4 processadores de fluxo por trabalhador. Para processadores adicionais que excedem esse limite, o Atlas Stream Processing aloca um novo recurso.
Pré-requisitos
Para criar e managed um processador de stream, você deve ter:
mongosh
versão 2.0 ou superiorUm utilizador de banco de dados com a função
atlasAdmin
para criar e executar processadores de streamUm cluster do Atlas
Considerações
Muitos comandos do processador de fluxo exigem que você especifique o nome do processador de fluxo relevante na invocação do método. A sintaxe descrita nas seções a seguir assume nomes estritamente alfanuméricos. Se o nome do processador de stream incluir caracteres não alfanuméricos, como hífens (-
) ou pontos finais (.
), você deverá colocar o nome entre colchetes ([]
) e aspas duplas (""
) no invocação de método, como em sp.["special-name-stream"].stats()
.
Crie um processador de stream interativamente
Você pode criar um processador de fluxo interativamente com o método sp.process()
. Os processadores de fluxo que você cria interativamente exibem o seguinte comportamento:
Gravar documentos de saída e dead letter queue (DLQ) no shell
Começam a ser executados imediatamente após a criação
Execute por 10 minutos ou até que o usuário os pare
Não persista depois de parar
Os processadores de fluxo que você cria interativamente são destinados à prototipagem. Para criar um processador de fluxo persistente, consulte Criar um processador de fluxo.
sp.process()
tem a seguinte sintaxe:
sp.process(<pipeline>)
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
| array | Obrigatório | Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming. |
Conecte-se à sua instância de processamento de fluxo.
Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh
.
Exemplo
O seguinte comando conecta-se a uma instância de processamento de stream como um usuário chamado streamOwner
usando a autenticação x.059:
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Forneça sua senha de usuário quando solicitado.
Defina um pipeline.
No prompt mongosh
, atribua uma array contendo as fases de agregação que você quer aplicar a uma variável chamada pipeline
.
O exemplo a seguir usa o tópico stuff
na conexão myKafka
no registro de conexão como o $source
, corresponde a registros em que o campo temperature
tem um valor de 46
e emite as mensagens processadas para o output
tópico da conexão mySink
no registro de conexão:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
Crie um processador de fluxo
Para criar um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para criar um processador de fluxo.
Para criar um novo processador de fluxo com mongosh
, use o método sp.createStreamProcessor()
. Tem a seguinte sintaxe:
sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument | Tipo | necessidade | Descrição |
---|---|---|---|
| string | Obrigatório | Nome lógico do processador de fluxo. Deve ser exclusivo na instância de processamento de fluxo. Esse nome deve conter somente caracteres alfanuméricos. |
| array | Obrigatório | Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming. |
| objeto | Opcional | objeto que define várias configurações opcionais para o processador de fluxo. |
| objeto | Condicional | objeto que atribui uma fila de mensagens não entregues (DLQ) para sua instância de Atlas Stream Processing. Este campo é necessário se você definir o campo |
| string | Condicional | Etiqueta legível por humanos que identifica uma conexão em seu registro de conexões. Esta conexão deve fazer referência a um cluster do Atlas. Este campo é necessário se você definir o campo |
| string | Condicional | Nome de um reconhecimento de data center Atlas no cluster especificado em |
| string | Condicional | Nome de uma collection no reconhecimento de data center especificado no |
Conecte-se à sua instância de processamento de fluxo.
Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh
.
Exemplo
O seguinte comando conecta-se a uma instância de processamento de stream como um usuário chamado streamOwner
usando a autenticação x.059.
mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\ --tls --authenticationDatabase admin --username streamOwner
Forneça sua senha de usuário quando solicitado.
Defina um pipeline.
No prompt mongosh
, atribua uma array contendo as fases de agregação que você quer aplicar a uma variável chamada pipeline
.
O exemplo a seguir usa o tópico stuff
na conexão myKafka
no registro de conexão como o $source
, corresponde a registros em que o campo temperature
tem um valor de 46
e emite as mensagens processadas para o output
tópico da conexão mySink
no registro de conexão:
pipeline = [ {$source: {"connectionName": "myKafka", "topic": "stuff"}}, {$match: { temperature: 46 }}, { "$emit": { "connectionName": "mySink", "topic" : "output", } } ]
(Opcional) Defina um DLQ.
No prompt mongosh
, atribua um objeto contendo as seguintes propriedades do seu DLQ:
connectionName
Nome do Banco de Dados
Nome da Coleção
O exemplo a seguir define um DLQ na conexão cluster01
, na collection de reconhecimento de data center metadata.dlq
.
deadLetter = { dlq: { connectionName: "cluster01", db: "metadata", coll: "dlq" } }
Inicie um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped
há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para iniciar um processador de fluxo:
A API Atlas Administration fornece um endpoint para iniciar um processador de fluxo.
Para iniciar um processador de fluxo existente com mongosh
, use o método sp.<streamprocessor>.start()
. <streamprocessor>
deve ser o nome de um processador de fluxo definido para a instância de processamento de fluxo atual.
Por exemplo, para iniciar um processador de fluxo denominado proc01
, execute o seguinte comando:
sp.proc01.start()
Este método retorna:
true
se o processador de fluxo existir e não estiver em execução no momento.false
se você tentar iniciar um processador de fluxo que não existe ou existe e está em execução no momento.
Interromper um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped
há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para interromper um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para interromper um processador de fluxo.
Para interromper um processador de fluxo existente com mongosh
, use o método sp.<streamprocessor>.stop()
. <streamprocessor>
deve ser o nome de um processador de fluxo em execução no momento definido para a instância de processamento de fluxo atual.
Por exemplo, para parar um processador de stream denominado proc01
, execute o seguinte comando:
sp.proc01.stop()
Este método retorna:
true
se o processador de stream existir e estiver em execução.false
se o processador de fluxo não existir ou se o processador de fluxo não estiver em execução no momento.
Modificar um processador de stream
Você pode modificar os seguintes elementos de um processador de stream existente:
Para modificar um processador de fluxo, faça o seguinte:
Aplique sua atualização ao processador de fluxo.
Por padrão, os processadores modificados restauram a partir do último checkpoint. Como alternativa, você pode definir resumeFromCheckpoint=false
; nesse caso, o processador retém apenas estatísticas resumidas. Quando você modifica um processador com janelas abertas, as janelas são totalmente recalculadas no pipeline atualizado.
Limitações
Quando a configuração padrão resumeFromCheckpoint=true
está habilitada, as seguintes limitações se aplicam:
Você não pode modificar o estágio
$source
.Você não pode modificar o intervalo da sua janela.
Não é possível remover uma janela.
Você só pode modificar um pipeline com uma janela se essa janela tiver um estágio
$group
ou$sort
em seu pipeline interno.Você não pode alterar um tipo de janela existente. Por exemplo, você não pode alterar de
$tumblingWindow
para$hoppingWindow
ou vice-versa.Os processadores com janelas podem reprocessar alguns dados como resultado do recalculo das janelas.
Para modificar um processador de fluxo:
Exige mongosh
v2.3.4+.
Use o comando sp.<streamprocessor>.modify()
para modificar um processador de fluxo existente. <streamprocessor>
deve ser o nome de um processador de fluxo parado definido para a instância de processamento de fluxo atual.
Por exemplo, para modificar um processador de fluxo denominado proc01
, execute o seguinte comando:
sp.proc1.modify(<pipeline>, { resumeFromCheckpoint: bool, // optional name: string, // optional dlq: string, // optional }})
Adicionar um estágio a um pipeline existente
sp.createStreamProcessor("foo", [ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ]) sp.foo.start();
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ]); sp.foo.start();
Modificar a origem de entrada de um processador de stream
sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test", config: { startAtOperationTime: new Date(now.getTime() - 5 * 60 * 1000) } }}, {$match: { operationType: "insert" }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout2" } }} ], {resumeFromCheckpoint: false});
Remover uma fila de letras mortas de um processador de streams
sp.foo.stop(); sp.foo.modify({dlq: {}}) sp.foo.start();
Modificar um processador de fluxo com uma janela
sp.foo.stop(); sp.foo.modify([ {$source: { connectionName: "StreamsAtlasConnection", db: "test", coll: "test" }}, {$replaceRoot: {newRoot: "$fullDocument"}}, {$match: {cost: {$gt: 500}}}, {$tumblingWindow: { interval: {unit: "day", size: 1}, pipeline: [ {$group: {_id: "$customerId", sum: {$sum: "$cost"}, avg: {$avg: "$cost"}}} ] }}, {$merge: { into: { connectionName: "StreamsAtlasConnection", db: "testout", coll: "testout" } }} ], {resumeFromCheckpoint: false}); sp.foo.start();
A API de administração do Atlas fornece um endpoint para modificar um processador de stream.
Descartar um processador de fluxo
Para descartar um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para excluir um processador de fluxo.
Para excluir um processador de fluxo existente com mongosh
, use o método sp.<streamprocessor>.drop()
. <streamprocessor>
deve ser o nome de um processador de fluxo definido para a instância de processamento de fluxo atual.
Por exemplo, para eliminar um processador de fluxo denominado proc01
, execute o seguinte comando:
sp.proc01.drop()
Este método retorna:
true
se o processador de fluxo existir.false
se o processador de fluxo não existir.
Quando você descarta um processador de fluxo, todos os recursos que o Atlas Stream Processing provisionou para ele são destruídos, junto com todo o estado salvo.
Liste os processadores de fluxo disponíveis
Para listar todos os processadores de fluxo disponíveis:
A API de Administração do Atlas fornece um endpoint para listar todos os processadores de fluxo disponíveis.
Para listar todos os processadores de fluxo disponíveis na instância de processamento de fluxo atual com mongosh
, use o método sp.listStreamProcessors()
. Ele retorna uma lista de documentos contendo o nome, a hora de início, o estado atual e o respectivo pipeline de cada processador de fluxo. Tem a seguinte sintaxe:
sp.listStreamProcessors(<filter>)
<filter>
é um documento que especifica por quais campos filtrar a lista.
Exemplo
O exemplo a seguir mostra um valor de retorno para uma solicitação não filtrada:
sp.listStreamProcessors()
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }, 28 { 29 id: '0218', 30 name: "proc02", 31 last_modified: ISODate("2023-03-21T20:17:33.601Z"), 32 state: "STOPPED", 33 error_msg: '', 34 pipeline: [ 35 { 36 $source: { 37 connectionName: "myKafka", 38 topic: "things" 39 } 40 }, 41 { 42 $match: { 43 temperature: 41 44 } 45 }, 46 { 47 $emit: { 48 connectionName: "mySink", 49 topic: "results", 50 } 51 } 52 ], 53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z") 54 }
Se você executar o comando novamente na mesma instância do Atlas Stream Processing, filtrando para um "state"
de "running"
, verá a seguinte saída:
sp.listStreamProcessors({"state": "running"})
1 { 2 id: '0135', 3 name: "proc01", 4 last_modified: ISODate("2023-03-20T20:15:54.601Z"), 5 state: "RUNNING", 6 error_msg: '', 7 pipeline: [ 8 { 9 $source: { 10 connectionName: "myKafka", 11 topic: "stuff" 12 } 13 }, 14 { 15 $match: { 16 temperature: 46 17 } 18 }, 19 { 20 $emit: { 21 connectionName: "mySink", 22 topic: "output", 23 } 24 } 25 ], 26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z") 27 }
Amostra de um processador de fluxo
Para retornar um array de resultados obtidos como amostra de um processador de fluxo existente para STDOUT
com mongosh
, use o método sp.<streamprocessor>.sample()
. <streamprocessor>
deve ser o nome de um processador de fluxo em execução atualmente definido para a instância de processamento de fluxo atual. Por exemplo, os exemplos de comando a seguir são de um processador de fluxo chamado proc01
.
sp.proc01.sample()
Esse comando é executado continuamente até que você o cancele usando CTRL-C
ou até que as amostras retornadas atinjam cumulativamente 40 MB de tamanho. O processador de fluxo relata documentos inválidos na amostra em um documento _dlqMessage
do seguinte formato:
{ _dlqMessage: { _stream_meta: { source: { type: "<type>" } }, errInfo: { reason: "<reasonForError>" }, doc: { _id: ObjectId('<group-id>'), ... }, processorName: '<procName>', instanceName: '<instanceName>', dlqTime: ISODate('2024-09-19T20:04:34.263+00:00') } }
Você pode usar essas mensagens para diagnosticar problemas de higiene de dados sem definir uma coleção de filas de letras mortas.
Ver estatísticas de um processador de fluxo
Observação
O Atlas Stream Processing descarta o estado interno dos processadores de stream que estão stopped
há 45 dias ou mais. Quando você inicia esse processador, ele opera e relata estatísticas idênticas à execução inicial.
Para visualizar as estatísticas de um processador de fluxo:
A API de Administração do Atlas fornece um endpoint para visualizar as estatísticas de um processador de fluxo.
Para retornar um documento resumindo o status atual de um processador de fluxo existente com mongosh
, use o método sp.<streamprocessor>.stats()
. streamprocessor
deve ser o nome de um processador de fluxo em execução no momento definido para a instância de processamento de fluxo atual. Tem a seguinte sintaxe:
sp.<streamprocessor>.stats({options: {<options>}})
Onde options
é um documento opcional com os seguintes campos:
Campo | Tipo | Descrição |
---|---|---|
| inteiro | Unidade a ser usada para o tamanho dos itens na saída. Por padrão, o Atlas Stream Processing exibe o tamanho do item em bytes. Para exibir em KB, especifique um |
| booleano | Sinalizador que especifica o nível de verbosidade do documento de saída. Se definido como |
O documento de saída tem os seguintes campos:
Campo | Tipo | Descrição |
---|---|---|
| string | O namespace no qual o processador de stream está definido. |
| objeto | Um documento que descreve o estado operacional do processador de fluxo. |
| string | O nome do processador de fluxo. |
| string | O status do processador de fluxo. Este campo pode ter os seguintes valores:
|
| inteiro | A escala na qual o campo de tamanho é exibido. Se definido como |
| inteiro | O número de documentos publicados no stream. Um documento é considerado "publicado" no fluxo quando passa pelo estágio |
| inteiro | O número de bytes ou kilobytes publicados no stream. Os bytes são considerados "publicados" no fluxo quando passam pelo estágio |
| inteiro | O número de documentos processados pelo fluxo. Um documento é considerado "processado" pelo fluxo quando passa por todo o pipeline. |
| inteiro | O número de bytes ou kilobytes processados pelo stream. Os bytes são considerados "processados" pelo fluxo quando passam por todo o pipeline. |
| inteiro | O número de documento enviados para a fila de mensagens não entregues (DLQ). |
| inteiro | O número de bytes ou kilobytes enviados para a fila de mensagens não entregues (DLQ). |
| inteiro | A diferença, em segundos, entre o tempo do evento representado pelo token de retomada do fluxo de alterações mais recente e o evento mais recente no oplog. |
| token | O token de retomada do fluxo de alterações mais recente. Só se aplica a processadores de fluxo com uma fonte do fluxo de alterações. |
| inteiro | O número de bytes usados pelo Windows para armazenar o estado do processador. |
| inteiro | O carimbo de data/hora da marca d'Água atual. |
| array | As estatísticas de cada operador no pipeline do processador. O Atlas Stream Processing retorna este campo somente se você passar a opção
|
| inteiro | O uso máximo de memória do operador em bytes ou kilobytes. |
| inteiro | O tempo total de execução do operador em segundos. |
| data | A hora de início da janela aberta mínima. Este valor é opcional. |
| data | A hora de início da janela aberta máxima. Este valor é opcional. |
| array | Informações de compensação das partições de um broker Apache Kafka. |
| inteiro | O número da partição do tópico do Apache Kafka. |
| inteiro | O deslocamento em que o processador de fluxo está ativado para a partição especificada. Esse valor é igual ao offset anterior que o processador de stream processou mais |
| inteiro | O deslocamento que o processador de fluxo confirmou pela última vez com o Apache Kafka corretor e o ponto de verificação da partição especificada. Todas as mensagens através deste deslocamento são registradas no último ponto de verificação. |
| booleano | O sinalizador que indica se a partição está ociosa. Este valor é padronizado como |
Por exemplo, o seguinte mostra o status de um processador de Atlas Stream Processing denominado proc01
em uma instância de Atlas Stream Processing denominada inst01
com tamanhos de item exibidos em KB:
sp.proc01.stats(1024) { ok: 1, ns: 'inst01', stats: { name: 'proc01', status: 'running', scaleFactor: Long("1"), inputMessageCount: Long("706028"), inputMessageSize: 958685236, outputMessageCount: Long("46322"), outputMessageSize: 85666332, dlqMessageCount: Long("0"), dlqMessageSize: Long("0"), stateSize: Long("2747968"), watermark: ISODate("2023-12-14T14:35:32.417Z"), ok: 1 }, }