Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/

managed processadores de fluxo

Nesta página

  • Pré-requisitos
  • Considerações
  • Crie um processador de stream interativamente
  • Conecte-se à sua instância de processamento de fluxo.
  • Defina um pipeline.
  • Crie um processador de fluxo.
  • Crie um processador de fluxo
  • Conecte-se à sua instância de processamento de fluxo.
  • Defina um pipeline.
  • (Opcional) Defina umaDLQ .
  • Crie um processador de fluxo.
  • Inicie um processador de fluxo
  • Interromper um processador de fluxo
  • Descartar um processador de fluxo
  • Liste os processadores de fluxo disponíveis
  • Amostra de um processador de fluxo
  • Ver estatísticas de um processador de fluxo

Um processador de stream do Atlas Stream Processing aplica a lógica de umaggregation pipeline de stream nomeado exclusivamente aos seus dados de streaming. O Atlas Stream Processing salva cada definição do processador de stream no armazenamento persistente para que ela possa ser reutilizada. Você só pode usar um determinado processador de stream na instância de processamento de fluxo em que sua definição está armazenada. O Atlas Stream Processing aceita até 4 processadores de stream por trabalhador. Para processadores adicionais que excedam esse limite, o Atlas Stream Processing aloca um novo recurso.

Para criar e managed um processador de stream, você deve ter:

Muitos comandos do processador de fluxo exigem que você especifique o nome do processador de fluxo relevante na invocação do método. A sintaxe descrita nas seções a seguir assume nomes estritamente alfanuméricos. Se o nome do processador de stream incluir caracteres não alfanuméricos, como hífens (-) ou pontos finais (.), você deverá colocar o nome entre colchetes ([]) e aspas duplas ("") no invocação de método, como em sp.["special-name-stream"].stats().

Você pode criar um processador de stream interativamente com o método sp.process() . Os processadores de fluxo que você cria interativamente exibem o seguinte comportamento:

  • Gravar documentos de saída e dead letter queue (DLQ) no shell

  • Começam a ser executados imediatamente após a criação

  • Execute por 10 minutos ou até que o usuário pare

  • Não persista depois de parar

Os processadores de fluxo que você cria interativamente destinam-se à criação de protótipos. Para criar um processador de fluxo persistente, consulte Criar um processador de fluxo.

sp.process() tem a seguinte sintaxe:

sp.process(<pipeline>)
Campo
Tipo
necessidade
Descrição
pipeline
array
Obrigatório
Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming.
1

Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh.

Exemplo

O seguinte comando conecta a uma instância de Atlas Stream Processing como um usuário denominado streamOwner utilizando autenticação SCRAM-SHA-256:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Forneça sua senha de usuário quando solicitado.

2

No prompt mongosh , atribua uma array contendo os estágios de aggregation que você deseja aplicar a uma variável denominada pipeline.

O exemplo a seguir usa o tópico stuff na conexão myKafka no registro de conexão como o $source, corresponde a registros em que o campo temperature tem um valor de 46 e emite as mensagens processadas para o output tópico da conexão mySink no registro de conexão:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

O seguinte comando cria um processador de fluxo que aplica a lógica definida no pipeline.

sp.process(pipeline)

Para criar um novo processador de fluxo com mongosh, use o método sp.createStreamProcessor() . Tem a seguinte sintaxe:

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argumento
Tipo
necessidade
Descrição
name
string
Obrigatório
Nome lógico para o processador de stream. Deve ser exclusivo dentro da instância de processamento de fluxo. Este nome deve conter apenas caracteres alfanuméricos.
pipeline
array
Obrigatório
Transmitir o pipeline de agregação que você deseja aplicar aos seus dados de streaming.
options
objeto
Opcional
objeto que define várias configurações opcionais para o processador de fluxo.
options.dlq
objeto
Condicional
objeto que atribui uma fila de mensagens não entregues (DLQ) para sua instância de Atlas Stream Processing. Este campo é necessário se você definir o campo options .
options.dlq.connectionName
string
Condicional
Etiqueta legível por humanos que identifica uma conexão em seu registro de conexões. Esta conexão deve fazer referência a um cluster do Atlas. Este campo é necessário se você definir o campo options.dlq .
options.dlq.db
string
Condicional
Nome de um reconhecimento de data center Atlas no cluster especificado em options.dlq.connectionName. Este campo é necessário se você definir o campo options.dlq .
options.dlq.coll
string
Condicional
Nome de uma collection no reconhecimento de data center especificado no options.dlq.db. Este campo é necessário se você definir o campo options.dlq .
1

Use a connection string associada à sua instância de Atlas Stream Processing para se conectar usando mongosh.

Exemplo

O seguinte comando conecta a uma instância de Atlas Stream Processing como um usuário denominado streamOwner utilizando autenticação SCRAM-SHA-256:

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

Forneça sua senha de usuário quando solicitado.

2

No prompt mongosh , atribua uma array contendo os estágios de aggregation que você deseja aplicar a uma variável denominada pipeline.

O exemplo a seguir usa o tópico stuff na conexão myKafka no registro de conexão como o $source, corresponde a registros em que o campo temperature tem um valor de 46 e emite as mensagens processadas para o output tópico da conexão mySink no registro de conexão:

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

No prompt mongosh , atribua um objeto contendo as seguintes propriedades do seu DLQ:

  • connectionName

  • Nome do Banco de Dados

  • Nome da Coleção

O exemplo a seguir define um DLQ na conexão cluster01 , na collection de reconhecimento de data center metadata.dlq .

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

O comando a seguir cria um processador de fluxo chamado proc01 que aplica a lógica definida em pipeline. Os documentos que lançam erros no processamento são gravados no DLQ definido em deadLetter.

sp.createStreamProcessor("proc01", pipeline, deadLetter)

Para iniciar um processador de fluxo existente com mongosh, use o método sp.<streamprocessor>.start() . <streamprocessor> deve ser o nome de um processador de fluxo definido para a instância de processamento de fluxo atual.

Por exemplo, para iniciar um processador de fluxo denominado proc01, execute o seguinte comando:

sp.proc01.start()

Este método retorna:

  • true se o processador de stream existir e não estiver em execução no momento.

  • false se você tentar iniciar um processador de fluxo que não existe ou existe e está em execução no momento.

Para interromper um processador de fluxo existente com mongosh, use o método sp.<streamprocessor>.stop() . <streamprocessor> deve ser o nome de um processador de fluxo atualmente em execução definido para a instância de processamento de fluxo atual.

Por exemplo, para parar um processador de stream denominado proc01, execute o seguinte comando:

sp.proc01.stop()

Este método retorna:

  • true se o processador de stream existir e estiver em execução.

  • false se o processador de stream não existir ou se o processador de stream não estiver em execução no momento.

Para excluir um processador de fluxo existente com mongosh, use o método sp.<streamprocessor>.drop() . <streamprocessor> deve ser o nome de um processador de fluxo definido para a instância de processamento de fluxo atual.

Por exemplo, para eliminar um processador de fluxo denominado proc01, execute o seguinte comando:

sp.proc01.drop()

Este método retorna:

  • true se o processador de fluxo existir.

  • false se o processador de fluxo não existir.

Quando você descarta um processador de fluxo, todos os recursos que o Atlas Stream Processing provisionou para ele são destruídos, junto com todo o estado salvo.

Para listar todos os processadores de fluxo disponíveis na instância de processamento de fluxo atual com mongosh, utilize o método sp.listStreamProcessors() . Ele retorna uma lista de documentos que contêm o nome, hora de início, estado atual e pipeline associados a cada processador de stream. Tem a seguinte sintaxe:

sp.listStreamProcessors(<filter>)

<filter> é um documento que especifica por quais campos filtrar a lista.

Exemplo

O exemplo a seguir mostra um valor de retorno para uma solicitação não filtrada:

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

Se você executar o comando novamente na mesma instância do Atlas Stream Processing, filtrando para um "state" de "running", verá a seguinte saída:

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

Para retornar uma array de resultados de amostra de um processador de stream existente para STDOUT com mongosh, use o método sp.<streamprocessor>.sample() . <streamprocessor> deve ser o nome de um processador de fluxo atualmente em execução definido para a instância de processamento de fluxo atual. Por exemplo, as seguintes amostras de comando de um processador de stream denominado proc01.

sp.proc01.sample()

Esse comando é executado continuamente até que você o cancele usando CTRL-C ou até que as amostras retornadas atinjam cumulativamente 40 MB de tamanho. O processador de fluxo relata documentos inválidos na amostra em um documento _dlqMessage do seguinte formato:

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>'
}
}

Você pode usar essas mensagens para diagnosticar problemas de limpeza de dados sem definir uma coleção de fila de letras mortas .

Para retornar um documento resumindo o status atual de um processador de fluxo existente com mongosh, use o método sp.<streamprocessor>.stats() . streamprocessor deve ser o nome de um processador de fluxo atualmente em execução definido para a instância de processamento de fluxo atual. Tem a seguinte sintaxe:

sp.<streamprocessor>.stats({options: {<options>}})

Onde options é um documento opcional com os seguintes campos:

Campo
Tipo
Descrição
scale
inteiro
Unidade a ser usada para o tamanho dos itens na saída. Por padrão, o Atlas Stream Processing exibe o tamanho do item em bytes. Para exibir em KB, especifique um scale de 1024 .
verbose
booleano
Sinalizador que especifica o nível de verbosidade do documento de saída. Se definido como true, o documento de saída contém um subdocumento que relata as estatísticas de cada operador individual em seu pipeline. O padrão é false.

O documento de saída tem os seguintes campos:

Campo
Tipo
Descrição
ns
string
O namespace no qual o processador de stream está definido.
stats
objeto
Um documento que descreve o estado operacional do processador de fluxo.
stats.name
string
O nome do processador de fluxo.
stats.status
string

O status do processador de fluxo. Este campo pode ter os seguintes valores:

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
inteiro
A escala na qual o campo de tamanho é exibido. Se definido como 1, os tamanhos são exibidos em bytes. Se definido como 1024 , os tamanhos são exibidos em kilobytes.
stats.inputMessageCount
inteiro
O número de documentos publicados no stream. Um documento é considerado "publicado" no fluxo quando passa pelo estágio $source , não quando passa por todo o pipeline.
stats.inputMessageSize
inteiro
O número de bytes ou kilobytes publicados no stream. Os bytes são considerados "publicados" no fluxo quando passam pelo estágio $source , não quando ele passa por todo o pipeline.
stats.outputMessageCount
inteiro
O número de documentos processados pelo fluxo. Um documento é considerado "processado" pelo fluxo quando passa por todo o pipeline.
stats.outputMessageSize
inteiro
O número de bytes ou kilobytes processados pelo stream. Os bytes são considerados "processados" pelo fluxo quando passam por todo o pipeline.
stats.dlqMessageCount
inteiro
O número de documento enviados para a fila de mensagens não entregues (DLQ).
stats.dlqMessageSize
inteiro
O número de bytes ou kilobytes enviados para a fila de mensagens não entregues (DLQ).
stats.stateSize
inteiro
O número de bytes usados pelo Windows para armazenar o estado do processador.
stats.watermark
inteiro
O carimbo de data/hora da marca d'Água atual.
stats.operatorStats
array

As estatísticas de cada operador no pipeline do processador. O Atlas Stream Processing retorna este campo somente se você passar a opção verbose .

stats.operatorStats fornece versões por operador de muitos campos principais do stats :

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

Além disso, o stats.operatorStats inclui os seguintes campos únicos:

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
inteiro
O uso máximo de memória do operador em bytes ou kilobytes.
stats.operatorStats.executionTime
inteiro
O tempo total de execução do operador em segundos.
stats.kafkaPartitions
array
Informações offset para um Apache Kafka partiçõeskafkaPartitions do corretor. se aplica somente a conexões que usam um Apache Kafka fonte.
stats.kafkaPartitions.partition
inteiro
O Apache Kafka número da partição do tópico.
stats.kafkaPartitions.currentOffset
inteiro
O deslocamento em que o processador de fluxo está ativado para a partição especificada. Esse valor é igual ao offset anterior que o processador de stream processou mais 1.
stats.kafkaPartitions.checkpointOffset
inteiro
O deslocamento que o processador de fluxo confirmou pela última vez com o Apache Kafka corretor e o ponto de verificação da partição especificada. Todas as mensagens através deste deslocamento são registradas no último ponto de verificação.

Por exemplo, o seguinte mostra o status de um processador de Atlas Stream Processing denominado proc01 em uma instância de Atlas Stream Processing denominada inst01 com tamanhos de item exibidos em KB:

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}
← Configurar uma instância