Docs Menu
Docs Home
/ / /
Scala

集計によるデータの変換

項目一覧

  • Overview
  • 集計操作と検索操作の比較
  • 制限
  • 集計操作の実行
  • ドキュメントのフィルタリング、グループ化、カウント
  • 集計の説明
  • Atlas 全文検索の実行
  • 詳細情報
  • MongoDB Server マニュアル
  • API ドキュメント

このガイドでは、 Scalaドライバーを使用して集計操作 を実行する方法を学習できます。

集計操作により MongoDB コレクション内のデータが処理され、計算結果が返されます。 クエリ API の一部である MongoDB 集計フレームワークは、データ処理パイプラインの概念をモデル化したものです。 ドキュメントは 1 つ以上の ステージを含むパイプラインに投入され、そこで集計結果に変換されます。

集計操作は自動車工場に似ています。工場内の組立ラインには、ドリルや溶接機のような、特定の作業をするための専用工具を備えた組立ステーションがあります。未加工のパーツが工場に搬入され、組立ラインで完成品に加工、組み立てられます。

集計パイプラインは組み立てライン、集計ステージは組み立てステーション、演算子式は専用ツールです。

次の表は、検索操作が実行できるさまざまなタスクを示し、それらを集計操作と比較しています。集計フレームワークは、データを変換および操作するための拡張機能を提供します。

検索操作
集計操作
Select certain documents to return
Select which fields to return
Sort the results
Limit the results
Count the results
Select certain documents to return
Select which fields to return
Sort the results
Limit the results
Count the results
Rename fields
Compute new fields
Summarize data
Connect and merge data sets

集計操作を実行する際には、次の制限を考慮してください。

  • 返されたドキュメントは、 BSONドキュメントサイズの制限である16メガバイトに違反することはできません。

  • パイプライン ステージには、デフォルトで 100 メガバイトのメモリ制限があります。この制限を超えるには、true の値を allowDiskUse() メソッドに渡し、そのメソッドを aggregate() にチェーンします。

  • $graphLookup 演算子には、 MB100 の厳格なメモリ制限があり、allowDiskUse() メソッドに渡される値を無視します。

注意

サンプル データ

このガイドの例では、 Atlas サンプル データセットsample_restaurantsデータベースのrestaurantsコレクションを使用します。 MongoDB Atlas クラスターを無料で作成して、サンプル データセットをロードする方法については、 「 Atlas を使い始める 」ガイドを参照してください。

集計 を実行するには、パイプラインステージを含むリストを aggregate() メソッドに渡します。 Scalaドライバーは、パイプラインステージを構築するためのヘルパーメソッドを含む Aggregatesクラスを提供します。

パイプラインステージとそれに対応する Aggregatesヘルパーメソッドの詳細については、次のリソースを参照してください。

このコード例では、ニューヨークの各地区のケーキの数のカウントを生成します。そのためには、aggregate() メソッドを呼び出し、集計パイプラインをステージのリストとして渡します。コードでは、次の Aggregatesヘルパーメソッドを使用してこれらのステージを構築します。

  • filter(): $match ステージを構築して、cuisine の値が であるドキュメントをフィルタリングします"Bakery"

  • group(): $group ステージを構築して、一致するドキュメントをborough フィールドでグループ化し、個別の 値ごとにドキュメントの数を蓄積します

val pipeline = Seq(Aggregates.filter(Filters.equal("cuisine", "Bakery")),
Aggregates.group("$borough", Accumulators.sum("count", 1))
)
collection.aggregate(pipeline)
.subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
{"_id": "Brooklyn", "count": 173}
{"_id": "Queens", "count": 204}
{"_id": "Bronx", "count": 71}
{"_id": "Staten Island", "count": 20}
{"_id": "Missing", "count": 2}
{"_id": "Manhattan", "count": 221}

MongoDB が操作を実行する方法に関する情報を表示するには、 MongoDBクエリ プランナーにそれを説明するように指示できます。 MongoDB が操作 を説明すると、実行プランとパフォーマンス統計が返されます。実行プランは、 MongoDB が操作を完了できる潜在的な方法です。 MongoDB に操作を説明するよう指示すると、 MongoDBが実行したプランと拒否された実行プランの両方がデフォルトで返されます。

集計操作explain() aggregate()を説明するには、 メソッドを メソッドに連鎖させます。冗長レベルをexplain() に渡すことで、メソッドが返す情報のタイプと量が変更されます。冗長 の詳細については、 MongoDB Serverマニュアルの「 冗長モード 」を参照してください。

次の例では、 MongoDBExplainVerbosity.EXECUTION_STATS explain()に、前述の フィルター、グループ、ドキュメントの例の集計操作を説明するように指示します。このコードでは、 の冗長値が メソッドに渡されます。これにより、 メソッドは、当選プランの実行を説明する統計を返すように構成されます。

val pipelineToExplain = Seq(Aggregates.filter(Filters.equal("cuisine", "Bakery")),
Aggregates.group("$borough", Accumulators.sum("count", 1))
)
collection.aggregate(pipelineToExplain)
.explain(ExplainVerbosity.EXECUTION_STATS)
.subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
{"explainVersion": "2", "queryPlanner": {"namespace": "sample_restaurants.restaurants",
"indexFilterSet": false, "parsedQuery": {"cuisine": {"$eq": "Bakery"}}, "queryHash": "865F14C3",
"planCacheKey": "0FC225DA", "optimizedPipeline": true, "maxIndexedOrSolutionsReached": false,
"maxIndexedAndSolutionsReached": false, "maxScansToExplodeReached": false, "winningPlan":
{"queryPlan": {"stage": "GROUP", "planNodeId": 3, "inputStage": {"stage": "COLLSCAN",
"planNodeId": 1, "filter": {"cuisine": {"$eq": "Bakery"}}, "direction": "forward"}},
...}

Tip

MongoDB v4.2 以降の Atlas でのみ利用可能

1 つ以上のフィールドの全文検索を指定するには、$searchパイプラインステージを作成します。 Scalaドライバーは、このステージを作成するための Aggregates.search()ヘルパーメソッドを提供します。 search() メソッドには次の引数が必要です。

  • SearchOperator インスタンス: 検索するフィールドとテキストを指定します。

  • SearchOptions インスタンス: 全文検索をカスタマイズするオプションを指定します。使用する Atlas Searchインデックスの名前に index オプションを設定する必要があります。

この例では、次のアクションを実行するためのパイプラインステージを作成しています。

  • nameフィールドで"Salt" という単語を含むテキストを検索

  • 一致するドキュメントの _idname の値のみを予測

val operator = SearchOperator.text(SearchPath.fieldPath("name"), "Salt")
val options = searchOptions().index("<search index name>")
val pipeline = Seq(Aggregates.search(operator, options),
Aggregates.project(Projections.include("name")))
collection.aggregate(pipeline)
.subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
{"_id": {"$oid": "..."}, "name": "Fresh Salt"}
{"_id": {"$oid": "..."}, "name": "Salt & Pepper"}
{"_id": {"$oid": "..."}, "name": "Salt + Charcoal"}
{"_id": {"$oid": "..."}, "name": "A Salt & Battery"}
{"_id": {"$oid": "..."}, "name": "Salt And Fat"}
{"_id": {"$oid": "..."}, "name": "Salt And Pepper Diner"}

重要

上記の例 を実行するには、 フィールド をカバーするrestaurants コレクションに Atlasname Searchインデックスを作成する必要があります。次に、 プレースホルダーをインデックスの名前に置き換えます。"<search index name>" Atlas Search インデックスの詳細については、「 Atlas Search インデックスガイド 」を参照してください。

このガイドで説明されているトピックについて詳しくは、 MongoDB Serverマニュアルの次のページ を参照してください。

このガイドで説明するメソッドとタイプの詳細については、次の API ドキュメントを参照してください。

戻る

クラスター モニタリング