$merge
Nesta página
Definição
A fase $merge especifica uma conexão no registro de conexão para gravar mensagens. A conexão deve ser uma conexão do Atlas.
Um estágio de pipeline do $merge
tem a seguinte forma de protótipo:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
Sintaxe
A versão Atlas Stream Processing do $merge usa a maioria dos mesmos campos que a versão Atlas Data Federation. No entanto, como o Atlas Stream Processing só suporta a fusão em uma conexão Atlas, a sintaxe de into
é simplificada. Para obter mais informações, consulte esta descrição dos campos do Atlas Data Federation $merge
.
Comportamento
$merge
deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge
por pipeline.
O campo on
tem requisitos especiais para $merge
em relação a coleções fragmentadas. Para saber mais, consulte Sintaxe $merge.
Você pode usar uma expressão dinâmica como o valor dos seguintes campos:
into.db
into.coll
Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge
:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
Este estágio $merge
:
Escreve a mensagem
Very Important Industries
para uma collection Atlas denominadaVIP.subscription
.Escreve a mensagem
N. E. Buddy
para uma collection Atlas denominadaemployee.requisition
.Escreve a mensagem
Khan Traktor
para uma collection Atlas denominadacontractor.billableHours
.
Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um banco de dados ou coleção com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a fila de mensagens não entregues, se configurada, e processará as mensagens subsequentes. Se não houver nenhuma fila de mensagens não entregues configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados de vários locais, em conformidade com o esquema do conjunto de dados meteorológicos de amostra. A seguinte agregação tem três estágios:
O estágio estabelece
$source
uma conexão com o broker do Apache Kafka que coleta esses relatórios em um tópico chamadomy_weatherdata
, expondo cada registro à medida que ele é ingerido aos estágios de agregação posteriores. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o comoingestionTime
.O estágio
$match
exclui documentos que têm umdewPoint.value
menor ou igual a5.0
e passa os documentos comdewPoint.value
maior que5.0
para o próximo estágio.O estágio
$merge
grava a saída na coleção do Atlas chamadastream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream
resultante, conecte-se ao cluster Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O exemplo anterior é representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.