Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$source

項目一覧

  • 定義
  • 構文
  • Apache Kafka ブロック
  • MongoDB コレクションの変更ストリーム
  • MongoDB Database Change Stream
  • MongoDB クラスター全体の変更ストリームソース
  • ドキュメント配列
  • 動作
$source

$sourceステージでは、データをストリーミングするための接続レジストリで接続を指定します。 次の接続タイプがサポートされています。

注意

Atlas サーバーレスインスタンスを$sourceとして使用することはできません。

Apache Kafka からのストリーミング データを操作するには エージェントとして、$source ステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

必須

データを取り込む接続レジストリ内の接続を識別するラベル。

topic

文字列または複数の文字列の配列

必須

メッセージをストリーミング配信する 1 つ以上の Apache Kafka トピックの名前。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName

string

任意

$source によってプロジェクションされるタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインの $source ステージは、ドキュメントに割り当てられたタイムスタンプを持つ_tsというフィールドをプロジェクションします。 ストリーミング データのソースでは、各メッセージのタイムスタンプを保存するために_tsという名前のフィールドも使用される場合があります。 これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、 tsFieldNameを使用して_tsという名前のソース提供フィールドの名前を変更します。

partitionIdleTimeout

ドキュメント

任意

証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。

partitionIdleTimeout.size

integer

任意

パーティションのアイドル タイムアウトの期間を指定する数値。

partitionIdleTimeout.unit

string

任意

パーティション アイドル タイムアウトの期間の単位。

unitの値は次のいずれかになります。

  • "ms" (ミリ秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.auto_offset_reset

string

任意

Apache Kafka のど のイベントを指定するか 取り込みを開始するソース トピック。auto_offset_resetは次の値を取ります。

  • endlatest 、またはlargest : 集計が初期化されたときに、トピックの最新のイベントから取り込みを開始します。

  • earliestbeginning 、またはsmallest : トピック内の最も近いイベントから取り込みを開始します。

デフォルトは latest です。

config.group_id

string

任意

ストリーム プロセッサに関連付ける Kafka コンシューマー グループの ID。 省略した場合、Atlas Stream Processing は、ストリーム プロセシング インスタンスを次の形式の自動生成された ID に関連付けます。

asp-${streamProcessorId}-consumer

Atlas Stream Processingはパーティション オフセットを にコミットします チェックポイントがApacheKafka IDされた後、指定されたコンします。オフセットを超えるメッセージがチェックポイントに永続的に記録されると、オフセットをコミットします。 これにより、ストリーム プロセッサのオフセット ラグと進行状況を Kafka プロバイダー コンシューマー グループのメタデータから直接追跡できます。

config.keyFormat

string

任意

Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトは binData です。

config.keyFormatError

string

任意

Apache Kafka の逆直列化中に発生したエラーの処理方法 キー データ。次のいずれかの値である必要があります。

注意

Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjsonまたはejsonである必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。

Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName

string

任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

coll

文字列または複数の文字列の配列

必須

によって指定された Atlasインスタンスでホストされている 1 つ以上のMongoDBコレクションの名前。これらのコレクションの変更ストリームは、ストリーミングデータソースとして機能します。このフィールドを省略すると、ストリームconnectionName プロセッサはMongoDB Database Change Stream を使用します。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName

string

任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

db

string

必須

connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source ステージのプロトタイプ形式は次のようになります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

connectionName

string

条件付き

データを取り込む接続レジストリ内の接続を識別するラベル。

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName

string

任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

config

ドキュメント

任意

のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。

config.startAfter

token

条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime

タイムスタンプ

条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument

string

条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly

ブール値

条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange

string

任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline

ドキュメント

任意

元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

ドキュメントの配列を操作するために、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明

timeField

ドキュメント

任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName

string

任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

documents

配列

条件付き

ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 connectionNameフィールドを使用する場合は、このフィールドを使用しないでください。

$sourceは、表示されるすべてのパイプラインの最初のステージである必要があります。 パイプラインごとに使用できる$sourceステージは 1 つだけです。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTimeに設定されます。

  2. $matchステージでは、dewPoint.value 5.0が 以下のドキュメントを除外し、 がdewPoint.value 5.0より大きいドキュメントを次のステージに渡します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

集計パイプライン