$merge
項目一覧
$merge
は、 集計パイプラインの結果を Atlas クラスター上の一時コレクションに書込みます。 次に、Atlas Data Federation は Atlas クラスターでローカルに$merge
を実行し、チャンク内のデータをターゲット コレクションにマージします。 $merge
操作中に障害が発生した場合、少なくとも部分的なデータがターゲット コレクションに書き込まれるようにします。
Atlas Data Federation では、 $merge
は次のことが可能です。
でサポートされているフェデレーティッドデータベースインスタンス ストアからデータを書き込みます。
同じ Atlas プロジェクト内の同じまたは異なる Atlas クラスター、データベース、またはコレクションに書き込みます。
Atlas クラスターへの書込みを許可するために、Atlas Data Federation は必須のinto
フィールドに代替構文を導入します。 Atlas Data Federation は、 $merge
で説明されている他のすべてのフィールドをサポートしています。
詳しくは、 $merge
パイプライン ステージを参照してください。
必要な権限
$merge
を使用して Atlas クラスター上のコレクションに書き込むには、次の特権を持つデータベースユーザーである必要があります。
Considerations
集計が失敗した場合、Atlas Data Federation は、エラーが発生する前に$merge
が完了した書き込みをロールバックしません。
構文
{ "$merge": { "into": { "atlas": { "projectId": "<atlas-project-ID>", "clusterName": "<atlas-cluster-name>", "db": "<atlas-database-name>", "coll": "<atlas-collection-name>" } }, "on": "<identifier field>"|[ "<identifier field1>", ...], "let": { <variables> }, "whenMatched": "replace|keepExisting|merge|fail|pipeline", "whenNotMatched": "insert|discard|fail" } }
フィールド
このセクションでは、Atlas Data Federation がinto
フィールドに提供する代替構文について説明します。
フィールド | タイプ | 説明 | 必要性 |
---|---|---|---|
| オブジェクト | 集約パイプラインからドキュメントを書き込むロケーション。 | 必須 |
| string | Atlas クラスターの名前。 | 必須 |
| string | Atlas クラスター上のコレクションの名前。 | 必須 |
| string | コレクションを含む Atlas クラスター上のデータベースの名前。 | 必須 |
| string | Atlas クラスターを含むプロジェクトの一意の識別子。 これは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID です。 省略した場合、デフォルトはフェデレーティッドデータベースインスタンスを含むプロジェクトの ID。 | 任意 |
他のフィールド、 on
、 let
、 whenMatched
、 whenNotMatched
の詳細については、 $merge
の MongoDB サーバーのドキュメントを参照してください。
注意
複数のフィールドでon
を使用するには、``on`` 識別子フィールドに複合一意のインデックスを作成する必要があります。
オプション
オプション | タイプ | 説明 | 必要性 | |
---|---|---|---|---|
| ブール値 | バックグラウンドで集計操作を実行するためのフラグ。 省略した場合、デフォルトは
現在実行中のクエリがフェデレーティッドデータベースインスタンスへの接続を完了または切断するのを待たずに、他の新しいクエリを送信すると同時に、クエリがバックグラウンドで実行され続ける場合は、このオプションを使用します。 | 任意 |
重複したドキュメント ID の解決
アーカイブまたはデータ ストアから Atlas クラスターにドキュメントを書き込む場合、ドキュメントに重複する_id
フィールドが含まれることがあります。 このセクションでは、Atlas Data Federation が重複を解決する方法と、集計パイプラインの重複を解決するための推奨事項を含めます。
Atlas Data Federation で重複 ID を解決
重複を解決するために、Atlas Data Federation は以下を実行します。
重複が検出されるまで、ドキュメントを受け取った順序で Atlas コレクション
X
にドキュメントを書込みます。重複する
_id
フィールドと後続のすべてのドキュメントを新しい Atlas コレクションY
に書込みます。指定された
$merge
ステージを実行して、コレクションY
をコレクションX
にマージします。結果のドキュメントを、指定された Atlas クラスター上のターゲット コレクションに書き込みます。
注意
Atlas Data Federation は、 _id
フィールドの重複値のみを解決します。 一意なインデックスを持つ他のフィールドの重複値は解決されません。
重複 ID の修正
重複する_id
フィールドを修正するには、次の操作を実行します。
Atlas Data Federation が結果ドキュメントを処理する順序を指定するには、パイプラインに
$sort
ステージを含めます。$merge
ステージに移行するドキュメントの順序に基づいて、whenMatched
whenNotMatched
$merge
ステージの オプションと オプションの値を慎重に選択します。例
次の例は、
whenMatched
オプションがkeepExisting
またはreplace
に設定されている場合に、$merge
ステージ中に Atlas Data Federation が重複を解決する方法を示しています。 これらの例では、次のドキュメントを使用します。{ "_id" : 1, "state" : "FL" }, { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 上記でリストされているドキュメントに対して次のパイプラインを実行するとします。
db.s3coll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "keepExisting", "whenNotMatched": "insert" } } ]) Atlas Data Federation は、次のデータを
X
とY
という名前の 2 つのコレクションに書き込みます。{ "_id" : 1, "state" : "FL" } { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 次に、Atlas Data Federation はコレクション のドキュメントをコレクション
Y
X
にマージします。パイプラインのwhenMatched: keepExisting
オプションの場合、Atlas Data Federation はコレクションX
に_id: 1
を含む既存のドキュメントを保持します。 そのため、重複を含むパイプラインの結果には次のドキュメントが含まれます。{ "_id" : 1, "state" : "FL" }, { "_id" : 2, "state" : "TX" } 次に、Atlas Data Federation はこれらのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。
上記でリストされているドキュメントに対して次のパイプラインを実行するとします。
db.s3coll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "replace", "whenNotMatched": "insert" } } ]) Atlas Data Federation は、次のデータを
X
とY
という名前の 2 つのコレクションに書き込みます。{ "_id" : 1, "state" : "FL" } { "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } Atlas Data Federation は、コレクション のドキュメントをコレクション
Y
X
にマージします。パイプラインのwhenMatched: replace
オプションでは、Atlas Data Federation はコレクションX
内の_id: 1
を持つドキュメントを、コレクションY
内の_id: 1
を持つドキュメントに置き換えます。 そのため、重複を含むパイプラインの結果には次のドキュメントが含まれます。{ "_id" : 1, "state" : "NJ" }, { "_id" : 2, "state" : "TX" } 次に、Atlas Data Federation はこれらのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。
whenNotMatched: discard
オプションは使用しないでください。例
この例では、次のドキュメントを使用して、
whenNotMatched
オプションがdiscard
に設定されている場合に Atlas Data Federation が重複を解決する方法を示しています。{ "_id" : 1, "state" : "AZ" }, { "_id" : 1, "state" : "CA" }, { "_id" : 2, "state" : "NJ" }, { "_id" : 3, "state" : "NY" }, { "_id" : 4, "state" : "TX" } 上記でリストされているドキュメントに対して次のパイプラインを実行するとします。
db.archivecoll.aggregate([ { "$sort": { "_id": 1, "state": 1, } }, { "$merge": { "into": { "atlas": { "clusterName": "clustername", "db": "clusterdb", "coll": "clustercoll" } }, "on": "_id", "whenMatched": "replace", "whenNotMatched": "discard" } } ]) Atlas Data Federation は、次のデータを
X
とY
という名前の 2 つのコレクションに書き込みます。{ "_id" : 1, "state" : "AZ" // gets replaced } { "_id" : 1, "state" : "CA" } { "_id" : 2, "state" : "NJ" // gets discarded } { "_id" : 3, "state" : "NY" // gets discarded } { "_id" : 4, "state" : "TX" // gets discarded } Atlas Data Federation は、コレクション のドキュメントをコレクション
Y
X
にマージします。パイプラインのwhenMatched: replace
オプションでは、Atlas Data Federation はコレクションX
内の_id: 1
を持つドキュメントを、コレクションY
内の_id: 1
を持つドキュメントに置き換えます。 パイプラインのwhenNotMatched: discard
オプションでは、Atlas Data Federation は、コレクションX
内のドキュメントと一致しないコレクションY
内のドキュメントを破棄します。 したがって、重複を含むパイプラインの結果には、次のドキュメントのみが含まれます。{ "_id" : 1, "state" : "CA" } 次に、Atlas Data Federation はこのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。
例
を使用したデータのマージ $merge
次の$merge
構文の例では、 myTestCluster
という名前の Atlas クラスター上のsampleDB.mySampleData
コレクションに結果を書込みます。 この例では、プロジェクト ID を指定していません。 $merge
ステージでは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID が使用されます。
例
1 db.mySampleData.aggregate( 2 [ 3 { 4 "$merge": { 5 "into": { 6 "atlas": { 7 "clusterName": "myTestCluster", 8 "db": "sampleDB", 9 "coll": "mySampleData" 10 } 11 }, 12 ... 13 } 14 } 15 ] 16 )
$merge
バックグラウンドで を実行
次の$merge
構文は、バックグラウンドでmyTestCluster
という名前の Atlas クラスター上のsampleDB.mySampleData
コレクションに結果を書込みます。 この例では、プロジェクト ID を指定していません。 $merge
ステージでは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID が使用されます。
例
1 db.mySampleData.aggregate( 2 [ 3 { 4 "$merge": { 5 "into": { 6 "atlas": { 7 "clusterName": "myTestCluster", 8 "db": "sampleDB", 9 "coll": "mySampleData" 10 } 11 }, 12 ... 13 } 14 } 15 ], 16 { "background" : true } 17 )