Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$emit

項目一覧

  • 定義
  • 構文
  • Apache Kafka ブロック
  • Atlas 時系列コレクション
  • 動作

$emitステージは、メッセージを出力する接続レジストリで接続を指定します。 接続は Apache Kafka のいずれかである必要があります。 プロバイダーまたは 時系列コレクション。

処理されたデータを Apache Kafka に書き込む$emit プロバイダーは、次のプロトタイプ形式を持つ パイプライン ステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"topic" : "<target-topic>" | <expression>,
"config": {
"headers": "<expression>",
"key": "<key-string>" | { key-document },
"keyFormat": "<deserialization-type>",
"outputFormat": "<json-format>"
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

topic

string |式

必須

Apache Kafka の名前 メッセージを送信するトピック。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.acks

整数

任意

操作を成功させるためにApache$emit Kafkaクラスターから必要な確認応答の数。

デフォルト値は all です。 Atlas Stream Processing は次の値をサポートしています。

  • -1

  • 0

  • 1

  • all

config.compression_type

string

任意

プロデューサーによって生成されたすべてのデータの圧縮タイプ。デフォルトは なし(圧縮なし)です。有効な値は以下のとおりです。

  • none

  • gzip

  • snappy

  • lz4

  • zstd

圧縮はデータの完全なバッチに使用されるため、バッチ処理の有効性は圧縮比率に影響します。バッチする数を増やすと、圧縮がより優れたものになります。

config.headers

任意

出力メッセージに追加する ヘッダー 。 式は、オブジェクトまたは配列のいずれかに評価される必要があります。

式がオブジェクトと評価される場合、Atlas Stream Processing は、そのオブジェクト内の各キーと値のペアからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。

式が配列と評価される場合は、キーと値のペア オブジェクトの配列の形式になる必要があります。 例:

[
{k: "name1", v: ...},
{k: "name2", v: ...},
{k: "name3", v: ...}
]

Atlas Stream Processing は、 配列内の各オブジェクトからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。

Atlas Stream Processing は、次の型のヘッダー値をサポートしています。

  • binData

  • string

  • object

  • int

  • long

  • double

  • null

config.key

オブジェクト | string

任意

Apache Kafka として評価される 式 メッセージ キー。

config.keyを指定する場合は、 config.keyFormatを指定する必要があります。

config.keyFormat

string

条件付き

Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトはbinDataです。 config.keyを指定する場合は、 config.keyFormatを指定する必要があります。 ドキュメントのconfig.keyが指定されたデータ型に正常に逆シリアル化されない場合、Atlas Stream Processing はそれをデッド レター キューに送信します。

config.outputFormat

string

任意

Apache Kafka にメッセージを送信するときに使用する JSON 形式 。次のいずれかの値である必要があります。

  • "relaxedJson"

  • "canonicalJson"

デフォルトは "relaxedJson" です。

処理されたデータを Atlas 時系列コレクションに書き込むには、次のプロトタイプ形式で$emitパイプライン ステージを使用します。

{
"$emit": {
"connectionName": "<registered-connection>",
"db" : "<target-db>",
"coll" : "<target-coll>",
"timeseries" : {
<options>
}
}
}

$emitステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続の名前(接続レジストリに表示されます)。

db

string

必須

ターゲット 時系列コレクションを含む Atlas database の名前。

coll

string

必須

書き込み先の Atlas 時系列コレクションの名前。

timeseries

ドキュメント

必須

コレクションの時系列フィールドを定義するドキュメント。

注意

時系列コレクション内のドキュメントの最大サイズは4 MB です。 詳細については「時系列コレクションの制限 」を参照してください。

$emit は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$emitステージは 1 つだけです。

ストリーム プロセッサごとに 1 つの Atlas 時系列コレクションにのみ書込み (write) ができます。 存在しないコレクションを指定した場合、Atlas は指定した時系列フィールドでコレクションを作成します。 既存のデータベースを指定する必要があります。

ストリーム プロセッサが別のターゲット Apache Kafka に書き込むようにするために、 フィールドの値として 動的式topic を使用できます。 メッセージごとにトピックを作成します。式は string として評価される必要があります。

次の形式のメッセージを生成するトランザクション イベントのストリームがあります。

{
"customer": "Very Important Industries",
"customerStatus": "VIP",
"tenantId": 1,
"transactionType": "subscription"
}
{
"customer": "N. E. Buddy",
"customerStatus": "employee",
"tenantId": 5,
"transactionType": "requisition"
}
{
"customer": "Khan Traktor",
"customerStatus": "contractor",
"tenantId": 11,
"transactionType": "billableHours"
}

これらをそれぞれ個別の Apache Kafka に並べ替えるには、 トピックには、次の$emit ステージを記述できます。

$emit: {
connectionName: "kafka1",
topic: "$customerStatus"
}

この$emitステージ:

  • Very Important IndustriesメッセージをVIPという名前のトピックに書き込みます。

  • N. E. Buddyメッセージをemployeeという名前のトピックに書き込みます。

  • Khan Traktorメッセージをcontractorという名前のトピックに書き込みます。

動的式の詳細については、「式演算子 」を参照してください。

まだ存在しないトピックを指定した場合、 Apache Kafka は、それを対象とする最初のメッセージを受信したときにトピックを自動的に作成します。

動的式でトピックを指定したが、Atlas Stream Processing が特定のメッセージの式を評価できない場合、Atlas Stream Processing はそのメッセージをデッド レター キューに送信し、後続のメッセージを処理します。 デッドレターキューが設定されていない場合、Atlas Stream Processing はメッセージを完全にスキップし、後続のメッセージを処理します。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、 ingestionTimeに設定されます。

  2. $match ステージでは、airTemperature.value30.0 以上であるドキュメントを除外し、airTemperature.value30.0 未満のドキュメントを次のステージに渡します。

  3. $emit ステージは、weatherStreamOutput Kafkaブローカー接続を介してstream というトピックに出力を書き込みます。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'airTemperature.value': { '$lt': 30 } } },
{
'$emit': {
connectionName: 'weatherStreamOutput',
topic: 'stream'
}
}

stream トピックのドキュメントは以下の形式をとります:

{
"st":"x+34700+119500",
"position": {
"type": "Point",
"coordinates": [122.8,116.1]
},
"elevation": 9999,
"callLetters": "6ZCM",
"qualityControlProcess": "V020",
"dataSource": "4",
"type": "SAO",
"airTemperature": {
"value": 6.7,
"quality": "9"
},
"dewPoint": {
"value": 14.1,
"quality": "1"
},
"pressure": {
"value": 1022.2,
"quality": "1"
},
"wind": {
"direction": {
"angle": 200,
"quality": "9"
},
"type": "C",
"speed": {
"rate": 35,
"quality": "1"
}
},
"visibility": {
"distance": {
"value": 700,
"quality": "1"
},
"variability": {
"value": "N",
"quality": "1"
}
},
"skyCondition": {
"ceilingHeight": {
"value": 1800,
"quality": "9",
"determination": "9"
},
"cavok": "N"
},
"sections": ["AA1","AG1","UG1","SA1","MW1"],
"precipitationEstimatedObservation": {
"discrepancy": "0",
"estimatedWaterDepth": 999
},
"atmosphericPressureChange": {
"tendency": {
"code": "4",
"quality": "1"
},
"quantity3Hours": {
"value": 3.8,
"quality": "1"
},
"quantity24Hours": {
"value": 99.9,
"quality": "9"
}
},
"seaSurfaceTemperature": {
"value": 9.7,
"quality": "9"
},
"waveMeasurement": {
"method": "M",
"waves": {
"period": 8,
"height": 3,
"quality": "9"
},
"seaState": {
"code": "00",
"quality": "9"
}
},
"pastWeatherObservationManual": {
"atmosphericCondition": {
"value": "6",
"quality": "1"
},
"period": {
"value": 3,
"quality": "1"
}
},
"skyConditionObservation": {
"totalCoverage": {
"value": "02",
"opaque": "99",
"quality": "9"
},
"lowestCloudCoverage": {
"value": "00",
"quality": "9"
},
"lowCloudGenus": {
"value": "00",
"quality": "1"
},
"lowestCloudBaseHeight":{
"value": 1750,
"quality": "1"
},
"midCloudGenus": {
"value": "99",
"quality": "1"
},
"highCloudGenus": {
"value": "00",
"quality": "1"
}
},
"presentWeatherObservationManual": {
"condition": "52",
"quality": "1"
},
"atmosphericPressureObservation": {
"altimeterSetting": {
"value": 1015.9,
"quality": "9"
},
"stationPressure": {
"value": 1026,
"quality": "1"
}
},
"skyCoverLayer": {
"coverage": {
"value": "08",
"quality": "1"
},
"baseHeight": {
"value": 2700,
"quality": "9"
},
"cloudType": {
"value": "99",
"quality": "9"
}
},
"liquidPrecipitation": {
"period": 12,
"depth": 20,
"condition": "9",
"quality": "9"
},
"extremeAirTemperature": {
"period": 99.9,
"code": "N",
"value": -30.4,
"quantity": "1"
},
"ingestionTime":{
"$date":"2024-09-26T17:34:41.843Z"
},
"_stream_meta":{
"source":{
"type": "kafka",
"topic": "my_weatherdata",
"partition": 0,
"offset": 4285
}
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

$tumblingWindow