sp.createStreamProcessor()
Nesta página
Definição
Novidades na versão 7.0: Cria um Processador deStream na Instância de Processamento de Stream atual.
Compatibilidade
Este método é suportado em Instâncias de Atlas Stream Processing .
Sintaxe
O método sp.createStreamProcessor()
tem a seguinte sintaxe:
sp.createStreamProcessor( <name>, [ <pipeline> ], { <options> } )
Campos de comando
sp.createStreamProcessor()
usa estes campos:
Campo | Tipo | necessidade | Descrição |
---|---|---|---|
| string | Obrigatório | Nome lógico para o processador de fluxo. Isso deve ser exclusivo dentro da instância do Atlas Stream Processing. |
| array | Obrigatório | Transmita o pipeline de agregação que você deseja aplicar aos seus dados de streaming. |
| objeto | Opcional | objeto que define várias configurações opcionais para o processador de fluxo. |
| objeto | Condicional | Objeto que atribui uma dead letter queue (DLQ) para sua instância do Atlas Stream Processing . Este campo é necessário se você definir o campo |
| string | Condicional | Etiqueta que identifica uma conexão em seu registro de conexão. Esta conexão deve fazer referência a um cluster Atlas. Este campo é necessário se você definir o campo |
| string | Condicional | Nome de um reconhecimento de data center Atlas no cluster especificado em |
| string | Condicional | Nome de uma collection no reconhecimento de data center especificado no |
Comportamento
sp.createStreamProcessor()
cria um processador de fluxo persistente e nomeado na instância atual de processamento de fluxo. Você pode inicializar este processador de fluxo com sp.processor.start()
. Se você tentar criar um processador de stream com o mesmo nome de um processador de stream existente, mongosh
retornará um erro.
Controle de acesso
O usuário que executa sp.createStreamProcessor()
deve ter a função atlasAdmin
.
Exemplo
O exemplo a seguir cria um processador de fluxo denominado solarDemo
que ingere dados da conexão sample_stream_solar
. O processador exclui todos os documentos onde o valor do campo device_id
é device_8
, passando o restante para uma janela em cascata com uma duração 10segundos. Cada janela agrupa os documentos que recebe e, em seguida, retorna várias estatísticas úteis de cada grupo. Em seguida, o processador de fluxo mescla esses registros com solar_db.solar_coll
pela conexão mongodb1
.
sp.createStreamProcessor( 'solarDemo', [ { $source: { connectionName: 'sample_stream_solar', timeField: { $dateFromString: { dateString: '$timestamp' } } } }, { $match: { $expr: { $ne: [ "$device_id", "device_8" ] } } }, { $tumblingWindow: { interval: { size: NumberInt(10), unit: "second" }, "pipeline": [ { $group: { "_id": { "device_id": "$device_id" }, "max_temp": { $max: "$obs.temp" }, "max_watts": { $max: "$obs.watts" }, "min_watts": { $min: "$obs.watts" }, "avg_watts": { $avg: "$obs.watts" }, "median_watts": { $median: { input: "$obs.watts", method: "approximate" } } } } ] } }, { $merge: { into: { connectionName: "mongodb1", db: "solar_db", coll: "solar_coll" }, on: ["_id"] } } ] )