Docs Menu

$meta

$meta 式は、ドキュメントのすべてのストリーミング メタデータを含むオブジェクトを返します。このデータは、ストリーム全体、または以下の Atlas Stream Processing の集計ステージのいずれかに対して公開できます。

$meta 式には次のプロトタイプ形式があります。

{ "$meta": <string> }
{
source: {
type: string,
ts: date,
source.topic: string
source.partition: int
source.offset: int
source.key: string|int|long|double|object|binData
source.headers: array[obj]
},
window: {
start: date,
end: date
},
https: {
url: string
method: string
httpStatusCode: int
responseTimeMs: int
}
}

$meta 式は、メタデータのソースの完全修飾ドット構文パスに対応する単一の文字列入力を受け取ります。このパスのルートは "stream" である必要があります。次のパスをクエリできます。

パス
タイプ
説明

stream

オブジェクト

$source ステージおよびすべてのウィンドウ ステージ、またはパイプラインに構成された $https ステージのすべてのメタデータ。

stream.source

オブジェクト

$source ステージのすべてのメタデータ。

stream.source.type

string

ソースとして使用される接続のタイプ。

stream.source.ts

ISODate

取り込み時点でのレコードの日時。

stream.source.topic

string

ストリームがレコードを取り込む Kafka トピック。Kafka ソースからのみ利用可能です。

stream.source.partition

integer

ストリームがレコードを取り込む Kafka トピックのパーティション。Kafka ソースからのみ利用可能です。

stream.source.offset

integer

Kafka ソース パーティション内のメッセージ順序とキューの位置をオフセット追跡する。Kafka ソースからのみ利用可能です。

stream.source.key

string|int|long|double|object|binData

パーティショニングと負荷分散のために Kafka メッセージに割り当てられたキー。Kafka ソースからのみ利用可能です。

stream.source.headers

配列

Kafkaメッセージのメタデータを記述するキーと値のペアのセット。

stream.window

オブジェクト

$hoppingWindow または $tumblingWindow ステージのすべてのメタデータ。このオブジェクトは、ドキュメントが $hoppingWindow ステージまたは $tumblingWindow ステージに入ったときにのみ設定されます。

stream.window.start

ISODate

現在のウィンドウの開始時刻。

stream.window.end

ISODate

現在のウィンドウの終了時刻。

stream.https

オブジェクト

$https ステージのすべてのメタデータ。このオブジェクトは、$https ステージがドキュメントを出力する場合にのみ設定されます。

stream.https.url

string

$https ステージがデータを取得する URL。

stream.https.method

string

URL に送信された HTTPS リクエスト メソッド。

stream.https.httpStatusCode

整数

URL に送信されたリクエストの HTTP 応答コード。

stream.https.responseTimeMs

整数

URL からの応答を受信するのにかかった時間(ミリ秒)。

Atlas Stream Processing$meta 式は、既存のMongoDB$meta 集計式のすべての機能を提供します。ただし、標準のMongoDB集計クエリでは、$meta の Atlas Stream Processing バージョンに固有の機能を使用することはできません。

次の例では、データが取り込まれた Kafka ソース トピックの配列を使用してストリームの出力を強化します。

{
$source: {
connectionName: “kafka”,
topic: [“t1”, “t2”, “t3”]
}
},
{
$emit: {
connectionName: “kafka”,
topic: {
$concat: [
{
$meta: “stream.source.topic”
},
“out"
]
}
}
}

次の例では、各ウィンドウの開始時刻を報告するストリームにフィールドを追加します。

{
$source: {
connectionName: "kafka",
topic: "t1"
}
},
{
$hoppingWindow: . . .
},
{
$addFields: {
start: { $meta: "stream.window.start" }
}
}