$source
項目一覧
定義
$source
ステージでは、データをストリーミングするための接続レジストリで接続を指定します。 次の接続タイプがサポートされています。
Apache Kafka エージェント
MongoDB コレクションの変更ストリーム
MongoDB database 変更ストリーム
ドキュメント配列
注意
Atlas サーバーレスインスタンスを$source
として使用することはできません。
構文
Apache Kafka ブロック
Apache Kafka からのストリーミング データを操作するには エージェントとして、$source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "topic" : ["<source-topic>", ...], "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "partitionIdleTimeout": { "size": <duration-number>, "unit": "<duration-unit>" }, "config": { "auto_offset_reset": "<start-event>", "group_id": "<group-id>", "keyFormat": "<deserialization-type>", "keyFormatError": "<error-handling>" }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 | |
---|---|---|---|---|
| string | 必須 | データを取り込む接続レジストリ内の接続を識別するラベル。 | |
| 文字列または複数の文字列の配列 | 必須 | メッセージをストリーミング配信する 1 つ以上の Apache Kafka トピックの名前。複数のトピックからのメッセージをストリーミングする場合は、配列で指定します。 | |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
| |
| string | 任意 | $source によってプロジェクションされるタイムスタンプ フィールドの名前を上書きする名前。 Atlas Stream Processing パイプラインの $source ステージは、ドキュメントに割り当てられたタイムスタンプを持つ | |
| ドキュメント | 任意 | 証明機関の計算で無視される前に、パーティションがアイドル状態になることを許可する時間を指定するドキュメント。 | |
| integer | 任意 | パーティションのアイドル タイムアウトの期間を指定する数値。 | |
| string | 任意 | パーティション アイドル タイムアウトの期間の単位。
| |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 | |
| string | 任意 | Apache Kafka のど のイベントを指定するか 取り込みを開始するソース トピック。
デフォルトは | |
| string | 任意 | ストリーム プロセッサに関連付ける Kafka コンシューマー グループの ID。 省略した場合、Atlas Stream Processing は、ストリーム プロセシング インスタンスを次の形式の自動生成された ID に関連付けます。
Atlas Stream Processingはパーティション オフセットを にコミットします チェックポイントがApacheKafka IDされた後、指定されたコンします。オフセットを超えるメッセージがチェックポイントに永続的に記録されると、オフセットをコミットします。 これにより、ストリーム プロセッサのオフセット ラグと進行状況を Kafka プロバイダー コンシューマー グループのメタデータから直接追跡できます。 | |
| string | 任意 | Apache Kafka を逆直列化するために使用されるデータ型 キー データ。次のいずれかの値である必要があります。
デフォルトは | |
| string | 任意 | Apache Kafka の逆直列化中に発生したエラーの処理方法 キー データ。次のいずれかの値である必要があります。
|
注意
Atlas Stream Processing では、ソース データ ストリーム内のドキュメントが有効なjson
またはejson
である必要があります。 Atlas Stream Processing は、この要件を満たさないドキュメントをデッド レター キューに設定します(デッド レター キューを設定している場合)。
MongoDB コレクションの変更ストリーム
Atlas コレクションの変更ストリームからのストリーミング データを操作する場合、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "coll" : ["<source-coll>",...], "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 任意 | ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。 Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに |
| string | 必須 |
|
| 文字列または複数の文字列の配列 | 必須 | によって指定された Atlasインスタンスでホストされている 1 つ以上のMongoDBコレクションの名前。これらのコレクションの変更ストリームは、ストリーミングデータソースとして機能します。このフィールドを省略すると、ストリーム |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
|
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
コレクションの変更ストリームでこのフィールドを使用するには、そのコレクションで変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。 |
MongoDB Database Change Stream
Atlas データベース変更ストリームからのストリーミング データを操作する場合、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "db" : "<source-db>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 任意 | ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。 Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに |
| string | 必須 |
|
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
|
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。 |
MongoDB クラスター全体の変更ストリームソース
Atlas クラスター変更ストリーム全体からのストリーミングデータを操作するには、$source
ステージのプロトタイプ形式は次のようになります。
{ "$source": { "connectionName": "<registered-connection>", "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "config": { "startAfter": <start-token> | "startAtOperationTime": <timestamp>, "fullDocument": "<full-doc-condition>", "fullDocumentOnly": <boolean>, "fullDocumentBeforeChange": "<before-change-condition>", "pipeline": [{ "<aggregation-stage>" : { <stage-input>, . . . }, . . . }] }, } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| string | 条件付き | データを取り込む接続レジストリ内の接続を識別するラベル。 |
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 任意 | ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。 Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに |
| ドキュメント | 任意 | のさまざまなデフォルト値を上書きするフィールドを含むドキュメント。 |
| token | 条件付き | ソースがレポートを開始する変更イベント。 これは再開トークンの形式をとります。
|
| タイムスタンプ | 条件付き | ソースがレポートを開始するoptime 。
|
| string | 条件付き | 変更ストリーム ソースが完全なドキュメントを返すか、更新が発生したときにのみ変更を返すかを制御する設定。 次のいずれかである必要があります。
fullDocument の値を指定しない場合、デフォルトは データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ブール値 | 条件付き | すべてのメタデータを含む変更イベント ドキュメント全体を返すか、 データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| string | 任意 | 変更ストリーム ソースに、出力に元の「変更前」状態の完全なドキュメントを含めるかどうかを指定します。 次のいずれかである必要があります。
データベース変更ストリームでこのフィールドを使用するには、そのデータベース内のすべてのコレクションに対して変更ストリームの事前イメージと事後イメージを有効にする必要があります。 |
| ドキュメント | 任意 | 元で変更ストリーム出力をフィルタリングするための集計パイプラインを指定します。 このパイプラインは、 change-stream-modify- output で説明されているパラメーターに準拠する必要があります。 |
ドキュメント配列
ドキュメントの配列を操作するために、 $source
ステージには次のプロトタイプ形式があります。
{ "$source": { "timeField": { $toDate | $dateFromString: <expression> }, "tsFieldName": "<timestamp>", "documents" : [{source-doc},...] | <expression> } }
$source
ステージは、次のフィールドを持つドキュメントを取得します。
フィールド | タイプ | 必要性 | 説明 |
---|---|---|---|
| ドキュメント | 任意 | 受信メッセージの権限のあるタイムスタンプを定義するドキュメント。
|
| string | 任意 | ソースによって宣言されたデフォルトのタイムスタンプ フィールドの名前を上書きする名前。 Atlas Stream Processing パイプラインは、タイムスタンプ情報を格納するために、受信メッセージに |
| 配列 | 条件付き | ストリーミング データソースとして使用するドキュメントの配列。 このフィールドの値は、オブジェクトの配列、またはオブジェクトの配列として評価される 式 のいずれかになります。 |
動作
$source
は、表示されるすべてのパイプラインの最初のステージである必要があります。 パイプラインごとに使用できる$source
ステージは 1 つだけです。
例
ストリーミング データ ソースは、気象用サンプル データセットのスキーマに準拠して、さまざまな場所から詳細な気象レポートを生成します。 次の集計には 3 つのステージがあります。
ステージは Apache
$source
Kafka との接続を確立します エージェントがこれらのレポートをmy_weatherdata
という名前のトピックで収集し、各レコードを後続の集計ステージに取り込まれるときに公開します。このステージではまた、プロジェクションを実行するタイムスタンプ フィールドの名前が上書きされ、フィールドはingestionTime
に設定されます。$match
ステージでは、dewPoint.value
5.0
が 以下のドキュメントを除外し、 がdewPoint.value
5.0
より大きいドキュメントを次のステージに渡します。$merge
ステージは、sample_weatherstream
データベース内のstream
という名前の Atlas コレクションに出力を書き込みます。 そのようなデータベースやコレクションが存在しない場合は、Atlas がそれらを作成します。
{ '$source': { connectionName: 'sample_weatherdata', topic: 'my_weatherdata', tsFieldName: 'ingestionTime' } }, { '$match': { 'dewPoint.value': { '$gt': 5 } } }, { '$merge': { into: { connectionName: 'weatherStreamOutput', db: 'sample_weatherstream', coll: 'stream' } } }
結果のsample_weatherstream.stream
コレクション内のドキュメントを表示するには、Atlas クラスターに接続して次のコマンドを実行します。
db.getSiblingDB("sample_weatherstream").stream.find()
{ _id: ObjectId('66ad2edfd4fcac13b1a28ce3'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('165235') } }, airTemperature: { quality: '1', value: 27.7 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1' }, tendency: { code: '1', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '1', value: 1021.9 } }, callLetters: 'CGDS', dataSource: '4', dewPoint: { quality: '9', value: 25.7 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-02T19:09:18.071Z'), liquidPrecipitation: { condition: '9', depth: 160, period: 24, quality: '2' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '8' }, period: { quality: '9', value: 3 } }, position: { coordinates: [ 153.3, 50.7 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '4', estimatedWaterDepth: 4 }, presentWeatherObservationManual: { condition: '53', quality: '1' }, pressure: { quality: '1', value: 1016.3 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 27.6 }, sections: [ 'AA2', 'SA1', 'MW1', 'AG1', 'GF1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 6900 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '05' }, lowCloudGenus: { quality: '9', value: '03' }, lowestCloudBaseHeight: { quality: '9', value: 150 }, lowestCloudCoverage: { quality: '1', value: '05' }, midCloudGenus: { quality: '9', value: '08' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 99999 }, cloudType: { quality: '9', value: '05' }, coverage: { quality: '1', value: '04' } }, st: 'x+35700-027900', type: 'SAO', visibility: { distance: { quality: '1', value: 4000 }, variability: { quality: '1', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '99', quality: '9' }, waves: { height: 99.9, period: 14, quality: '9' } }, wind: { direction: { angle: 280, quality: '9' }, speed: { quality: '1', rate: 30.3 }, type: '9' } }
注意
前述の例はその一般的な例です。 ストリーミング データは静的ではなく、各ユーザーに異なるドキュメントが表示されます。