$emit
定義
$emit
ステージは、メッセージを出力する接続レジストリで接続を指定します。 接続は Apache Kafka のいずれかである必要があります。 プロバイダーまたは 時系列コレクション。
構文
Apache Kafka ブロック
処理されたデータを Apache Kafka に書き込む$emit
プロバイダーは、次のプロトタイプ形式を持つ パイプライン ステージを使用します。
{ "$emit": { "connectionName": "<registered-connection>", "topic" : "<target-topic>" | <expression>, "config": { "headers": "<expression>", "key": "<key-string>" | { key-document }, "keyFormat": "<deserialization-type>", "outputFormat": "<json-format>" } } }
$emit
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 | |||||
---|---|---|---|---|---|---|---|---|
| string | 必須 | データを取り込む接続の名前(接続レジストリに表示されます)。 | |||||
| string |式 | 必須 | Apache Kafka の名前 メッセージを送信するトピック。 | |||||
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 | |||||
| 整数 | 任意 | ||||||
| string | 任意 | プロデューサーによって生成されたすべてのデータの圧縮タイプ。デフォルトは なし(圧縮なし)です。有効な値は以下のとおりです。
圧縮はデータの完全なバッチに使用されるため、バッチ処理の有効性は圧縮比率に影響します。バッチする数を増やすと、圧縮がより優れたものになります。 | |||||
| 式 | 任意 | 出力メッセージに追加する ヘッダー 。 式は、オブジェクトまたは配列のいずれかに評価される必要があります。 式がオブジェクトと評価される場合、Atlas Stream Processing は、そのオブジェクト内の各キーと値のペアからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。 式が配列と評価される場合は、キーと値のペア オブジェクトの配列の形式になる必要があります。 例:
Atlas Stream Processing は、 配列内の各オブジェクトからヘッダーを構築します。キーはヘッダー名、値はヘッダー値です。 Atlas Stream Processing は、次の型のヘッダー値をサポートしています。
| |||||
| オブジェクト | string | 任意 | Apache Kafka として評価される 式 メッセージ キー。
| |||||
| string | 条件付き | Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。
デフォルトは | |||||
| string | 任意 | Apache Kafka にメッセージを送信するときに使用する JSON 形式 。次のいずれかの値である必要があります。
デフォルトは |
Atlas 時系列コレクション
処理されたデータを Atlas 時系列コレクションに書き込むには、次のプロトタイプ形式で$emit
パイプライン ステージを使用します。
{ "$emit": { "connectionName": "<registered-connection>", "db" : "<target-db>", "coll" : "<target-coll>", "timeseries" : { <options> } } }
$emit
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 必須 | データを取り込む接続の名前(接続レジストリに表示されます)。 |
| string | 必須 | ターゲット 時系列コレクションを含む Atlas database の名前。 |
| string | 必須 | 書き込み先の Atlas 時系列コレクションの名前。 |
| ドキュメント | 必須 | コレクションの時系列フィールドを定義するドキュメント。 |
注意
時系列コレクション内のドキュメントの最大サイズは4 MB です。 詳細については「時系列コレクションの制限 」を参照してください。
動作
$emit
は、表示されるすべてのパイプラインの 最後のステージ である必要があります。 パイプラインごとに使用できる$emit
ステージは 1 つだけです。
ストリーム プロセッサごとに 1 つの Atlas 時系列コレクションにのみ書込み (write) ができます。 存在しないコレクションを指定した場合、Atlas は指定した時系列フィールドでコレクションを作成します。 既存のデータベースを指定する必要があります。
ストリーム プロセッサが別のターゲット Apache Kafka に書き込むようにするために、 フィールドの値として 動的式topic
を使用できます。 メッセージごとにトピックを作成します。式は string として評価される必要があります。
例
次の形式のメッセージを生成するトランザクション イベントのストリームがあります。
{ "customer": "Very Important Industries", "customerStatus": "VIP", "tenantId": 1, "transactionType": "subscription" } { "customer": "N. E. Buddy", "customerStatus": "employee", "tenantId": 5, "transactionType": "requisition" } { "customer": "Khan Traktor", "customerStatus": "contractor", "tenantId": 11, "transactionType": "billableHours" }
これらをそれぞれ個別の Apache Kafka に並べ替えるには、 トピックには、次の$emit
ステージを記述できます。
$emit: { connectionName: "kafka1", topic: "$customerStatus" }
この$emit
ステージ:
Very Important Industries
メッセージをVIP
という名前のトピックに書き込みます。N. E. Buddy
メッセージをemployee
という名前のトピックに書き込みます。Khan Traktor
メッセージをcontractor
という名前のトピックに書き込みます。
動的式の詳細については、「式演算子 」を参照してください。
まだ存在しないトピックを指定した場合、 Apache Kafka は、それを対象とする最初のメッセージを受信したときにトピックを自動的に作成します。
動的式でトピックを指定したが、Atlas Stream Processing が特定のメッセージの式を評価できない場合、Atlas Stream Processing はそのメッセージをデッド レター キューに送信し、後続のメッセージを処理します。 デッドレターキューが設定されていない場合、Atlas Stream Processing はメッセージを完全にスキップし、後続のメッセージを処理します。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、ingestionTime
に設定されます。$match
ステージでは、airTemperature.value
が30.0
以上であるドキュメントを除外し、airTemperature.value
が30.0
未満のドキュメントを次のステージに渡します。$emit
ステージは、weatherStreamOutput
Kafkaブローカー接続を介してstream
というトピックに出力を書き込みます。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'airTemperature.value': { '$lt': 30 } } }, { '$emit': { connectionName: 'weatherStreamOutput', topic: 'stream' } }
stream
トピックのドキュメントは以下の形式をとります:
{ "st":"x+34700+119500", "position": { "type": "Point", "coordinates": [122.8,116.1] }, "elevation": 9999, "callLetters": "6ZCM", "qualityControlProcess": "V020", "dataSource": "4", "type": "SAO", "airTemperature": { "value": 6.7, "quality": "9" }, "dewPoint": { "value": 14.1, "quality": "1" }, "pressure": { "value": 1022.2, "quality": "1" }, "wind": { "direction": { "angle": 200, "quality": "9" }, "type": "C", "speed": { "rate": 35, "quality": "1" } }, "visibility": { "distance": { "value": 700, "quality": "1" }, "variability": { "value": "N", "quality": "1" } }, "skyCondition": { "ceilingHeight": { "value": 1800, "quality": "9", "determination": "9" }, "cavok": "N" }, "sections": ["AA1","AG1","UG1","SA1","MW1"], "precipitationEstimatedObservation": { "discrepancy": "0", "estimatedWaterDepth": 999 }, "atmosphericPressureChange": { "tendency": { "code": "4", "quality": "1" }, "quantity3Hours": { "value": 3.8, "quality": "1" }, "quantity24Hours": { "value": 99.9, "quality": "9" } }, "seaSurfaceTemperature": { "value": 9.7, "quality": "9" }, "waveMeasurement": { "method": "M", "waves": { "period": 8, "height": 3, "quality": "9" }, "seaState": { "code": "00", "quality": "9" } }, "pastWeatherObservationManual": { "atmosphericCondition": { "value": "6", "quality": "1" }, "period": { "value": 3, "quality": "1" } }, "skyConditionObservation": { "totalCoverage": { "value": "02", "opaque": "99", "quality": "9" }, "lowestCloudCoverage": { "value": "00", "quality": "9" }, "lowCloudGenus": { "value": "00", "quality": "1" }, "lowestCloudBaseHeight":{ "value": 1750, "quality": "1" }, "midCloudGenus": { "value": "99", "quality": "1" }, "highCloudGenus": { "value": "00", "quality": "1" } }, "presentWeatherObservationManual": { "condition": "52", "quality": "1" }, "atmosphericPressureObservation": { "altimeterSetting": { "value": 1015.9, "quality": "9" }, "stationPressure": { "value": 1026, "quality": "1" } }, "skyCoverLayer": { "coverage": { "value": "08", "quality": "1" }, "baseHeight": { "value": 2700, "quality": "9" }, "cloudType": { "value": "99", "quality": "9" } }, "liquidPrecipitation": { "period": 12, "depth": 20, "condition": "9", "quality": "9" }, "extremeAirTemperature": { "period": 99.9, "code": "N", "value": -30.4, "quantity": "1" }, "ingestionTime":{ "$date":"2024-09-26T17:34:41.843Z" }, "_stream_meta":{ "source":{ "type": "kafka", "topic": "my_weatherdata", "partition": 0, "offset": 4285 } } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。