$meta
定義
$meta
式は、ドキュメントのすべてのストリーミング メタデータを含むオブジェクトを返します。このデータは、ストリーム全体、または以下の Atlas Stream Processing の集計ステージのいずれかに対して公開できます。
$meta
式には次のプロトタイプ形式があります。
{ "$meta": <string> }
{ source: { type: string, ts: date, source.topic: string source.partition: int source.offset: int source.key: string|int|long|double|object|binData source.headers: array[obj] }, window: { start: date, end: date }, https: { url: string method: string httpStatusCode: int responseTimeMs: int } }
構文
$meta
式は、メタデータのソースの完全修飾ドット構文パスに対応する単一の文字列入力を受け取ります。このパスのルートは "stream"
である必要があります。次のパスをクエリできます。
パス | タイプ | 説明 |
---|---|---|
| オブジェクト | |
| オブジェクト |
|
| string | ソースとして使用される接続のタイプ。 |
| ISODate | 取り込み時点でのレコードの日時。 |
| string | ストリームがレコードを取り込む Kafka トピック。Kafka ソースからのみ利用可能です。 |
| integer | ストリームがレコードを取り込む Kafka トピックのパーティション。Kafka ソースからのみ利用可能です。 |
| integer | Kafka ソース パーティション内のメッセージ順序とキューの位置をオフセット追跡する。Kafka ソースからのみ利用可能です。 |
| string|int|long|double|object|binData | パーティショニングと負荷分散のために Kafka メッセージに割り当てられたキー。Kafka ソースからのみ利用可能です。 |
| 配列 | Kafkaメッセージのメタデータを記述するキーと値のペアのセット。 |
| オブジェクト |
|
| ISODate | 現在のウィンドウの開始時刻。 |
| ISODate | 現在のウィンドウの終了時刻。 |
| オブジェクト | |
| string |
|
| string | URL に送信された HTTPS リクエスト メソッド。 |
| 整数 | URL に送信されたリクエストの HTTP 応答コード。 |
| 整数 | URL からの応答を受信するのにかかった時間(ミリ秒)。 |
動作
Atlas Stream Processing$meta
式は、既存のMongoDB$meta
集計式のすべての機能を提供します。ただし、標準のMongoDB集計クエリでは、$meta
の Atlas Stream Processing バージョンに固有の機能を使用することはできません。
例
次の例では、データが取り込まれた Kafka ソース トピックの配列を使用してストリームの出力を強化します。
{ $source: { connectionName: “kafka”, topic: [“t1”, “t2”, “t3”] } }, { $emit: { connectionName: “kafka”, topic: { $concat: [ { $meta: “stream.source.topic” }, “out" ] } } }
次の例では、各ウィンドウの開始時刻を報告するストリームにフィールドを追加します。
{ $source: { connectionName: "kafka", topic: "t1" } }, { $hoppingWindow: . . . }, { $addFields: { start: { $meta: "stream.window.start" } } }