Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$source

項目一覧

  • 定義
  • 構文
  • Apache Kafka ブロック
  • MongoDB コレクションの変更ストリーム
  • MongoDB Database Change Stream
  • MongoDB クラスター全体の変更ストリームソース
  • ドキュメント配列
  • 動作
$source

$sourceステージでは、データをストリーミングするための接続レジストリで接続を指定します。 次の接続タイプがサポートされています。

注意

Atlas サーバーレスインスタンスを$sourceとして使用することはできません。

Apache Kafka からのストリーミング データを操作するには エージェントとして、$source ステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"topic" : ["<source-topic>", ...],
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"partitionIdleTimeout": {
"size": <duration-number>,
"unit": "<duration-unit>"
},
"config": {
"auto_offset_reset": "<start-event>",
"group_id": "<group-id>",
"keyFormat": "<deserialization-type>",
"keyFormatError": "<error-handling>"
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
connectionName
string
必須
データを取り込む接続レジストリ内の接続を識別するラベル。
topic
文字列または複数の文字列の配列
必須
1つ以上のApache Kafka の名前メッセージをストリーミングするトピック。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。
timeField
ドキュメント
任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName
string
任意

$source によってプロジェクションされるタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインの $source ステージは、ドキュメントに割り当てられたタイムスタンプを持つ_tsというフィールドをプロジェクションします。 ストリーミング データのソースでは、各メッセージのタイムスタンプを保存するために_tsという名前のフィールドも使用される場合があります。 これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、 tsFieldNameを使用して_tsという名前のソース提供フィールドの名前を変更します。

partitionIdleTimeout
ドキュメント
任意
証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。
partitionIdleTimeout.size
integer
任意
パーティションのアイドル タイムアウトの期間を指定する数値。
partitionIdleTimeout.unit
string
任意

パーティション アイドル タイムアウトの期間の単位。

unitの値は次のいずれかになります。

  • "ms" (ミリ秒)

  • "second"

  • "minute"

  • "hour"

  • "day"

config
ドキュメント
任意
のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。
config.auto_offset_reset
string
任意

Apache Kafka のど のイベントを指定するか 取り込みを開始するソース トピック。auto_offset_resetは次の値を取ります。

  • endlatest 、またはlargest : 集計が初期化されたときに、トピックの最新のイベントから取り込みを開始します。

  • earliestbeginning 、またはsmallest : トピック内の最も近いイベントから取り込みを開始します。

デフォルトは latest です。

config.group_id
string
任意

ストリーム プロセッサに関連付ける Kafka コンシューマー グループの ID。 省略した場合、Atlas Stream Processing は、ストリーム プロセシング インスタンスを次の形式の自動生成された ID に関連付けます。

asp-${streamProcessorId}-consumer

Atlas Stream Processingはパーティション オフセットを にコミットします チェックポイントがApacheKafka IDされた後、指定されたコンします。オフセットを超えるメッセージがチェックポイントに永続的に記録されると、オフセットをコミットします。 これにより、ストリーム プロセッサのオフセット ラグと進行状況を Kafka プロバイダー コンシューマー グループのメタデータから直接追跡できます。

config.keyFormat
string
任意

Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。

  • "binData"

  • "string"

  • "json"

  • "int"

  • "long"

デフォルトは binData です。

config.keyFormatError
string
任意

Apache Kafka の逆直列化中に発生したエラーの処理方法 キー データ。次のいずれかの値である必要があります。

注意

Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjsonまたはejsonである必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。

Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"coll" : ["<source-coll>",...],
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
connectionName
string
条件付き
データを取り込む接続レジストリ内の接続を識別するラベル。
timeField
ドキュメント
任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName
string
任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

db
string
必須
connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。
coll
文字列または複数の文字列の配列
必須
connectionNameによって指定された Atlas インスタンスでホストされている 1 つ以上の MongoDB コレクションの名前。 これらのコレクションの変更ストリームは、ストリーミング データソースとして機能します。 このフィールドを省略すると、ストリーム プロセッサはMongoDB Database Change Stream を使用します。
config
ドキュメント
任意
のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。
config.startAfter
token
条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime
タイムスタンプ
条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument
string
条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly
ブール値
条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange
string
任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline
ドキュメント
任意
元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"db" : "<source-db>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
connectionName
string
条件付き
データを取り込む接続レジストリ内の接続を識別するラベル。
timeField
ドキュメント
任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName
string
任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

db
string
必須
connectionNameによって指定された Atlas インスタンスでホストされている MongoDB database の名前。 このデータベースの変更ストリームは、ストリーミング データソースとして機能します。
config
ドキュメント
任意
のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。
config.startAfter
token
条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime
タイムスタンプ
条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument
string
条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly
ブール値
条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange
string
任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline
ドキュメント
任意
元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source ステージのプロトタイプ形式は次のようになります。

{
"$source": {
"connectionName": "<registered-connection>",
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"config": {
"startAfter": <start-token> | "startAtOperationTime": <timestamp>,
"fullDocument": "<full-doc-condition>",
"fullDocumentOnly": <boolean>,
"fullDocumentBeforeChange": "<before-change-condition>",
"pipeline": [{
"<aggregation-stage>" : {
<stage-input>,
. . .
},
. . .
}]
},
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
connectionName
string
条件付き
データを取り込む接続レジストリ内の接続を識別するラベル。
timeField
ドキュメント
任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName
string
任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

config
ドキュメント
任意
のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。
config.startAfter
token
条件付き

ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.startAtOperationTime
タイムスタンプ
条件付き

ソースがレポートを開始するoptime 。

config.startAfterまたはconfig.StartAtOperationTimeのいずれか 1 つだけを使用できます。

config.fullDocument
string
条件付き

変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。

  • updateLookup : 更新時の変更のみを返します。

  • required : 完全なドキュメントを返す必要があります。 完全なドキュメントが利用できない場合、 は何も返しません。

  • whenAvailable : 完全なドキュメントが利用可能になるたびに完全なドキュメントを返し、そうでない場合は変更を返します。

fullDocument の値を指定しない場合、デフォルトはupdateLookupになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentOnly
ブール値
条件付き

すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 fullDocumentの内容のみを返すかを制御する設定。 trueに設定されている場合、ソースはfullDocumentの内容のみを返します。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.fullDocumentBeforeChange
string
任意

変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。

  • off : fullDocumentBeforeChangeフィールドを省略します。

  • required : 状態が変更される前に、完全なドキュメントを返す必要があります。 状態が変更される前の完全なドキュメントが利用できない場合、ストリーム プロセッサは失敗します。

  • whenAvailable : 使用可能な場合は常に、変更前の状態で完全なドキュメントを返します。それ以外の場合は、 fullDocumentBeforeChangeフィールドを省略します。

fullDocumentBeforeChangeの値を指定しない場合、デフォルトはoffになります。

データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。

config.pipeline
ドキュメント
任意
元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。

ドキュメントの配列を操作するために、 $sourceステージには次のプロトタイプ形式があります。

{
"$source": {
"timeField": {
$toDate | $dateFromString: <expression>
},
"tsFieldName": "<timestamp>",
"documents" : [{source-doc},...] | <expression>
}
}

$sourceステージは、次のフィールドを持つドキュメントを取得します。

フィールド
タイプ
必要性
説明
timeField
ドキュメント
任意

受信メッセージの権限のあるタイムスタンプを定義するドキュメント。

timeFieldを使用する場合は、次のいずれかとして定義する必要があります。

  • ソース メッセージ フィールドを引数として$toDate

  • ソース メッセージ フィールドを引数として$dateFromString式。

timeFieldを宣言しない場合、Atlas Stream Processing は、ソースによって提供されたメッセージ タイムスタンプからタイムスタンプを作成します。

tsFieldName
string
任意

ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。

Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに _ts というフィールドを内部的に追加します。ストリーミングデータのソースでは、_ts というフィールドを使用して各メッセージのタイムスタンプを格納することもできます。これらのフィールド間の競合を防ぐには、追加の処理が行われる前に、tsFieldName を使用して、ソース提供のフィールド前を _ts に変更します。

documents
配列
条件付き
ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 connectionNameフィールドを使用する場合は、このフィールドを使用しないでください。

$sourceは、表示されるすべてのパイプラインの最初のステージである必要があります。 パイプラインごとに使用できる$sourceステージは 1 つだけです。

ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。

  1. ステージは Apache$source Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTimeに設定されます。

  2. $matchステージでは、dewPoint.value 5.0が 以下のドキュメントを除外し、 がdewPoint.value 5.0より大きいドキュメントを次のステージに渡します。

  3. $mergeステージは、 sample_weatherstreamデータベース内のstreamという名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{ '$match': { 'dewPoint.value': { '$gt': 5 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

結果のsample_weatherstream.streamコレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66ad2edfd4fcac13b1a28ce3'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('165235')
}
},
airTemperature: { quality: '1', value: 27.7 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1' },
tendency: { code: '1', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '1', value: 1021.9 }
},
callLetters: 'CGDS',
dataSource: '4',
dewPoint: { quality: '9', value: 25.7 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-02T19:09:18.071Z'),
liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '8' },
period: { quality: '9', value: 3 }
},
position: { coordinates: [ 153.3, 50.7 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 },
presentWeatherObservationManual: { condition: '53', quality: '1' },
pressure: { quality: '1', value: 1016.3 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 27.6 },
sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 6900 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '05' },
lowCloudGenus: { quality: '9', value: '03' },
lowestCloudBaseHeight: { quality: '9', value: 150 },
lowestCloudCoverage: { quality: '1', value: '05' },
midCloudGenus: { quality: '9', value: '08' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 99999 },
cloudType: { quality: '9', value: '05' },
coverage: { quality: '1', value: '04' }
},
st: 'x+35700-027900',
type: 'SAO',
visibility: {
distance: { quality: '1', value: 4000 },
variability: { quality: '1', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '99', quality: '9' },
waves: { height: 99.9, period: 14, quality: '9' }
},
wind: {
direction: { angle: 280, quality: '9' },
speed: { quality: '1', rate: 30.3 },
type: '9'
}
}

注意

前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。

戻る

集計パイプライン