$merge
Nesta página
Definição
O estágio $merge especifica uma conexão noRegistro de conexão para escrever mensagens. A conexão deve ser uma conexão Atlas.
Um estágio de pipeline do $merge
tem a seguinte forma de protótipo:
{ "$merge": { "into": { "connectionName": "<registered-atlas-connection>", "db": "<registered-database-name>" | <expression>, "coll": "<atlas-collection-name>" | <expression> }, "on": "<identifier field>" | [ "<identifier field1>", ...], "let": { <var_1>: <expression>, <var_2>: <expression>, …, <var_n>: <expression> }, "whenMatched": "replace | keepExisting | merge", "whenNotMatched": "insert | discard" } }
Sintaxe
A versão de $merge do Atlas Stream Processing usa a maioria dos mesmos campos que a versão do Atlas Data Federation. No entanto, como o Atlas Stream Processing só suporta mesclagem em uma conexão Atlas, a sintaxe de into
é simplificada. Para mais informações, consulte esta descrição dos campos do Atlas Data Federation $merge
.
Comportamento
$merge
deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge
por pipeline.
O campo on
tem requisitos especiais para $merge
contra collections fragmentadas. Para saber mais, consulte Sintaxe $merge.
Você pode utilizar uma expressão dinâmica como o valor dos seguintes campos:
into.db
into.coll
Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.
Exemplo
Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge
:
$merge: { into: { connectionName: "db1", db: "$customerStatus", coll: "$transactionType" }
Este estágio $merge
:
Escreve a mensagem
Very Important Industries
para uma collection Atlas denominadaVIP.subscription
.Escreve a mensagem
N. E. Buddy
para uma collection Atlas denominadaemployee.requisition
.Escreve a mensagem
Khan Traktor
para uma collection Atlas denominadacontractor.billableHours
.
Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.
Se você especificar um banco de dados ou uma collection com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a dead letter queue (DLQ) se estiver configurado e processará as mensagens subsequentes. Se não houver nenhuma fila de letras mortas configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.
Exemplos
Uma fonte de dados de streaming gera relatórios meteorológicos detalhados a partir de vários locais, em conformidade com o esquema do Conjunto de dados meteorológicos de amostra. A seguinte aggregation tem três estágios:
O estágio estabelece uma
$source
conexão com o Apache Kafka o corretor coleta esses relatórios em ummy_weatherdata
tópico chamado , expondo cada registro à medida que ele é ingerido para os estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-oingestionTime
como .O estágio
$match
exclui documentos que tenham umdewPoint.value
menor ou igual a5.0
e passa os documentos comdewPoint.value
maior que5.0
para o próximo estágio.O estágio
$merge
escreve o resultado em uma collection do Atlas chamadastream
no banco de dadossample_weatherstream
. Se não existir tal banco de dados ou coleção, o Atlas os criará.
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
Para visualizar os documentos na coleção sample_weatherstream.stream
resultante, conecte-se ao cluster do Atlas e execute o seguinte comando:
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
Observação
O anterior é um exemplo representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.