Docs Menu
Docs Home
/
MongoDB Atlas
/ / /

$merge

項目一覧

  • 必要な権限
  • Considerations
  • 構文
  • フィールド
  • オプション
  • 重複したドキュメント ID の解決
  • Atlas Data Federation で重複 ID を解決
  • 重複 ID の修正
  • を使用したデータのマージ $merge
  • バックグラウンドで$mergeを実行

$merge は、 集計パイプラインの結果を Atlas クラスター上の一時コレクションに書込みます。 次に、Atlas Data Federation は Atlas クラスターでローカルに$mergeを実行し、チャンク内のデータをターゲット コレクションにマージします$merge操作中に障害が発生した場合、少なくとも部分的なデータがターゲット コレクションに書き込まれるようにします。

Atlas Data Federation では、 $mergeは次のことが可能です。

  • サポートされているフェデレーティッドデータベースインスタンス ストアからデータを書き込みます。

  • 同じ Atlas プロジェクト内の同じまたは異なる Atlas クラスター、データベース、またはコレクションに書き込みます。

Atlas クラスターへの書込みを許可するために、Atlas Data Federation は必須のintoフィールドに代替構文を導入します。 Atlas Data Federation は、 $mergeで説明されている他のすべてのフィールドをサポートしています。

詳しくは、 $mergeパイプライン ステージを参照してください。

$mergeを使用して Atlas クラスター上のコレクションに書き込むには、次の特権を持つデータベースユーザーである必要があります。

集計が失敗した場合、Atlas Data Federation は、エラーが発生する前に$mergeが完了した書き込みをロールバックしません。

{
"$merge": {
"into": {
"atlas": {
"projectId": "<atlas-project-ID>",
"clusterName": "<atlas-cluster-name>",
"db": "<atlas-database-name>",
"coll": "<atlas-collection-name>"
}
},
"on": "<identifier field>"|[ "<identifier field1>", ...],
"let": { <variables> },
"whenMatched": "replace|keepExisting|merge|fail|pipeline",
"whenNotMatched": "insert|discard|fail"
}
}

このセクションでは、Atlas Data Federation がintoフィールドに提供する代替構文について説明します。

フィールド
タイプ
説明
必要性
atlas
オブジェクト
集約パイプラインからドキュメントを書き込むロケーション。
必須
clusterName
string
Atlas クラスターの名前。
必須
coll
string
Atlas クラスター上のコレクションの名前。
必須
db
string
コレクションを含む Atlas クラスター上のデータベースの名前。
必須
projectId
string
Atlas クラスターを含むプロジェクトの一意の識別子。 これは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID です。 省略した場合、デフォルトはフェデレーティッドデータベースインスタンスを含むプロジェクトの ID。
任意

他のフィールド、 onletwhenMatchedwhenNotMatchedの詳細については、 $mergeの MongoDB サーバーのドキュメントを参照してください。

注意

複数のフィールドでonを使用するには、``on`` 識別子フィールドに複合一意のインデックスを作成する必要があります。

オプション
タイプ
説明
必要性
background
ブール値

バックグラウンドで集計操作を実行するためのフラグ。 省略した場合、デフォルトはfalseになります。 trueに設定すると、Atlas Data Federation はバックグラウンドでクエリを実行します。

{ "background" : true }

現在実行中のクエリがフェデレーティッドデータベースインスタンスへの接続を完了または切断するのを待たずに、他の新しいクエリを送信すると同時に、クエリがバックグラウンドで実行され続ける場合は、このオプションを使用します。

任意

アーカイブまたはデータ ストアから Atlas クラスターにドキュメントを書き込む場合、ドキュメントに重複する_idフィールドが含まれることがあります。 このセクションでは、Atlas Data Federation が重複を解決する方法と、集計パイプラインの重複を解決するための推奨事項を含めます。

重複を解決するために、Atlas Data Federation は以下を実行します。

  1. 重複が検出されるまで、ドキュメントを受け取った順序で Atlas コレクションXにドキュメントを書込みます。

  2. 重複する_idフィールドと後続のすべてのドキュメントを新しい Atlas コレクションYに書込みます。

  3. 指定された$mergeステージを実行して、コレクションYをコレクションXにマージします。

  4. 結果のドキュメントを、指定された Atlas クラスター上のターゲット コレクションに書き込みます。

注意

Atlas Data Federation は、 _idフィールドの重複値のみを解決します。 一意なインデックスを持つ他のフィールドの重複値は解決されません。

重複する_idフィールドを修正するには、次の操作を実行します。

  1. Atlas Data Federation が結果ドキュメントを処理する順序を指定するには、パイプラインに$sortステージを含めます。

  2. $mergeステージに移行するドキュメントの順序に基づいて、whenMatched whenNotMatched$mergeステージの オプションと オプションの値を慎重に選択します。

    次の例は、 whenMatchedオプションがkeepExistingまたはreplaceに設定されている場合に、 $mergeステージ中に Atlas Data Federation が重複を解決する方法を示しています。 これらの例では、次のドキュメントを使用します。

    {
    "_id" : 1,
    "state" : "FL"
    },
    {
    "_id" : 1,
    "state" : "NJ"
    },
    {
    "_id" : 2,
    "state" : "TX"
    }

    上記でリストされているドキュメントに対して次のパイプラインを実行するとします。

    db.s3coll.aggregate([
    {
    "$sort": {
    "_id": 1,
    "state": 1,
    }
    },
    {
    "$merge": {
    "into": {
    "atlas": {
    "clusterName": "clustername",
    "db": "clusterdb",
    "coll": "clustercoll"
    }
    },
    "on": "_id",
    "whenMatched": "keepExisting",
    "whenNotMatched": "insert"
    }
    }
    ])

    Atlas Data Federation は、次のデータをXYという名前の 2 つのコレクションに書き込みます。

    {
    "_id" : 1,
    "state" : "FL"
    }
    {
    "_id" : 1,
    "state" : "NJ"
    },
    {
    "_id" : 2,
    "state" : "TX"
    }

    次に、Atlas Data Federation はコレクション のドキュメントをコレクションY Xにマージします。パイプラインのwhenMatched: keepExistingオプションの場合、Atlas Data Federation はコレクションX_id: 1を含む既存のドキュメントを保持します。 そのため、重複を含むパイプラインの結果には次のドキュメントが含まれます。

    {
    "_id" : 1,
    "state" : "FL"
    },
    {
    "_id" : 2,
    "state" : "TX"
    }

    次に、Atlas Data Federation はこれらのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。

    上記でリストされているドキュメントに対して次のパイプラインを実行するとします。

    db.s3coll.aggregate([
    {
    "$sort": {
    "_id": 1,
    "state": 1,
    }
    },
    {
    "$merge": {
    "into": {
    "atlas": {
    "clusterName": "clustername",
    "db": "clusterdb",
    "coll": "clustercoll"
    }
    },
    "on": "_id",
    "whenMatched": "replace",
    "whenNotMatched": "insert"
    }
    }
    ])

    Atlas Data Federation は、次のデータをXYという名前の 2 つのコレクションに書き込みます。

    {
    "_id" : 1,
    "state" : "FL"
    }
    {
    "_id" : 1,
    "state" : "NJ"
    },
    {
    "_id" : 2,
    "state" : "TX"
    }

    Atlas Data Federation は、コレクション のドキュメントをコレクションY Xにマージします。パイプラインのwhenMatched: replaceオプションでは、Atlas Data Federation はコレクションX内の_id: 1を持つドキュメントを、コレクションY内の_id: 1を持つドキュメントに置き換えます。 そのため、重複を含むパイプラインの結果には次のドキュメントが含まれます。

    {
    "_id" : 1,
    "state" : "NJ"
    },
    {
    "_id" : 2,
    "state" : "TX"
    }

    次に、Atlas Data Federation はこれらのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。

  3. whenNotMatched: discardオプションは使用しないでください。

    この例では、次のドキュメントを使用して、 whenNotMatchedオプションがdiscardに設定されている場合に Atlas Data Federation が重複を解決する方法を示しています。

    {
    "_id" : 1,
    "state" : "AZ"
    },
    {
    "_id" : 1,
    "state" : "CA"
    },
    {
    "_id" : 2,
    "state" : "NJ"
    },
    {
    "_id" : 3,
    "state" : "NY"
    },
    {
    "_id" : 4,
    "state" : "TX"
    }

    上記でリストされているドキュメントに対して次のパイプラインを実行するとします。

    db.archivecoll.aggregate([
    {
    "$sort": {
    "_id": 1,
    "state": 1,
    }
    },
    {
    "$merge": {
    "into": {
    "atlas": {
    "clusterName": "clustername",
    "db": "clusterdb",
    "coll": "clustercoll"
    }
    },
    "on": "_id",
    "whenMatched": "replace",
    "whenNotMatched": "discard"
    }
    }
    ])

    Atlas Data Federation は、次のデータをXYという名前の 2 つのコレクションに書き込みます。

    {
    "_id" : 1,
    "state" : "AZ" // gets replaced
    }
    {
    "_id" : 1,
    "state" : "CA"
    }
    {
    "_id" : 2,
    "state" : "NJ" // gets discarded
    }
    {
    "_id" : 3,
    "state" : "NY" // gets discarded
    }
    {
    "_id" : 4,
    "state" : "TX" // gets discarded
    }

    Atlas Data Federation は、コレクション のドキュメントをコレクションY Xにマージします。パイプラインのwhenMatched: replaceオプションでは、Atlas Data Federation はコレクションX内の_id: 1を持つドキュメントを、コレクションY内の_id: 1を持つドキュメントに置き換えます。 パイプラインのwhenNotMatched: discardオプションでは、Atlas Data Federation は、コレクションX内のドキュメントと一致しないコレクションY内のドキュメントを破棄します。 したがって、重複を含むパイプラインの結果には、次のドキュメントのみが含まれます。

    {
    "_id" : 1,
    "state" : "CA"
    }

    次に、Atlas Data Federation はこのドキュメントを、指定された Atlas クラスター上のターゲット コレクションにマージします。

次の$merge構文の例では、 myTestClusterという名前の Atlas クラスター上のsampleDB.mySampleDataコレクションに結果を書込みます。 この例では、プロジェクト ID を指定していません。 $mergeステージでは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID が使用されます。

1db.mySampleData.aggregate(
2 [
3 {
4 "$merge": {
5 "into": {
6 "atlas": {
7 "clusterName": "myTestCluster",
8 "db": "sampleDB",
9 "coll": "mySampleData"
10 }
11 },
12 ...
13 }
14 }
15 ]
16)

次の$merge構文は、バックグラウンドでmyTestClusterという名前の Atlas クラスター上のsampleDB.mySampleDataコレクションに結果を書込みます。 この例では、プロジェクト ID を指定していません。 $mergeステージでは、フェデレーティッドデータベースインスタンスを含むプロジェクトの ID が使用されます。

1db.mySampleData.aggregate(
2 [
3 {
4 "$merge": {
5 "into": {
6 "atlas": {
7 "clusterName": "myTestCluster",
8 "db": "sampleDB",
9 "coll": "mySampleData"
10 }
11 },
12 ...
13 }
14 }
15 ],
16 { "background" : true }
17)

戻る

$lookup