Menu Docs
Página inicial do Docs
/
MongoDB Atlas
/ /

$merge

Nesta página

  • Definição
  • Sintaxe
  • Comportamento
  • Exemplos

O estágio $merge especifica uma conexão noRegistro de conexão para escrever mensagens. A conexão deve ser uma conexão Atlas.

Um estágio de pipeline do $merge tem a seguinte forma de protótipo:

{
"$merge": {
"into": {
"connectionName": "<registered-atlas-connection>",
"db": "<registered-database-name>" | <expression>,
"coll": "<atlas-collection-name>" | <expression>
},
"on": "<identifier field>" | [ "<identifier field1>", ...],
"let": {
<var_1>: <expression>,
<var_2>: <expression>,
,
<var_n>: <expression>
},
"whenMatched": "replace | keepExisting | merge",
"whenNotMatched": "insert | discard"
}
}

A versão de $merge do Atlas Stream Processing usa a maioria dos mesmos campos que a versão do Atlas Data Federation. No entanto, como o Atlas Stream Processing só suporta mesclagem em uma conexão Atlas, a sintaxe de into é simplificada. Para mais informações, consulte esta descrição dos campos do Atlas Data Federation $merge .

$merge deve ser o último estágio de qualquer pipeline em que apareça. Você pode usar apenas um estágio $merge por pipeline.

O campo on tem requisitos especiais para $merge contra collections fragmentadas. Para saber mais, consulte Sintaxe $merge.

Você pode utilizar uma expressão dinâmica como o valor dos seguintes campos:

  • into.db

  • into.coll

Isso permite que seu processador de fluxo grave mensagens em diferentes collection de destino do Atlas, mensagem por mensagem.

Exemplo

Você tem um fluxo de eventos de transação que gera mensagens da seguinte forma:

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

Para classificar cada um deles em um reconhecimento de data center e collection Atlas distintos, você pode escrever o seguinte estágio $merge :

$merge: {
into: {
connectionName: "db1",
db: "$customerStatus",
coll: "$transactionType"
}

Este estágio $merge :

  • Escreve a mensagem Very Important Industries para uma collection Atlas denominada VIP.subscription.

  • Escreve a mensagem N. E. Buddy para uma collection Atlas denominada employee.requisition.

  • Escreve a mensagem Khan Traktor para uma collection Atlas denominada contractor.billableHours.

Você só pode usar expressões dinâmicas que avaliam para strings. Para obter mais informações sobre expressões dinâmicas, consulte operadores de expressão.

Se você especificar um banco de dados ou uma collection com uma expressão dinâmica, mas o Atlas Stream Processing não puder avaliar a expressão para uma determinada mensagem, o Atlas Stream Processing enviará essa mensagem para a dead letter queue (DLQ) se estiver configurado e processará as mensagens subsequentes. Se não houver nenhuma fila de letras mortas configurada, o Atlas Stream Processing ignorará a mensagem completamente e processará as mensagens subsequentes.

Uma fonte de dados de streaming gera relatórios meteorológicos detalhados a partir de vários locais, em conformidade com o esquema do Conjunto de dados meteorológicos de amostra. A seguinte aggregation tem três estágios:

  1. O estágio estabelece uma $source conexão com o Apache Kafka o corretor coleta esses relatórios em um my_weatherdata tópico chamado , expondo cada registro à medida que ele é ingerido para os estágios de agregação subsequentes. Esse estágio também substitui o nome do campo de carimbo de data/hora que ele projeta, definindo-o ingestionTime como .

  2. O estágio $match exclui documentos que tenham um dewPoint.value menor ou igual a 5.0 e passa os documentos com dewPoint.value maior que 5.0 para o próximo estágio.

  3. O estágio $merge escreve o resultado em uma collection do Atlas chamada stream no banco de dados sample_weatherstream . Se não existir tal banco de dados ou coleção, o Atlas os criará.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

Para visualizar os documentos na coleção sample_weatherstream.stream resultante, conecte-se ao cluster do Atlas e execute o seguinte comando:

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

Observação

O anterior é um exemplo representativo. Os dados de streaming não são estáticos e cada usuário vê documentos distintos.

Voltar

$emit