Docs Menu
Docs Home
/ / /
Scala
/

集計フレームワーク

項目一覧

  • 前提条件
  • MongoDB 配置への接続
  • 集計の実行
  • 集計式の使用
  • 集計の説明

集計パイプラインは、データ処理パイプラインの概念をモデル化したデータ集計のフレームワークです。

集計の詳細については、サーバー マニュアルの集計パイプラインを参照してください。

このガイドのコード例を実行するには、次のコンポーネントを設定する必要があります。

  • Atest.restaurants コレクション populated with documents from therestaurants.json ファイル in the documentation assets Github.

  • 次のインポート ステートメントは次のとおりです。

import org.mongodb.scala._
import org.mongodb.scala.model.Aggregates._
import org.mongodb.scala.model.Accumulators._
import org.mongodb.scala.model.Filters._
import org.mongodb.scala.model.Projections._

注意

このガイドでは、 クイック スタート プライマリで説明されているObservable暗黙を使用します。

まず、MongoDB 配置に接続し、 インスタンスとMongoDatabase MongoCollectionインスタンスを 宣言して定義します。

次のコードは、ポート27017localhostで実行されているスタンドアロンの MongoDB 配置に接続します。 次に、 testデータベースを参照するためのdatabase変数と、 restaurantsコレクションを参照するためのcollection変数を定義します。

val mongoClient: MongoClient = MongoClient()
val database: MongoDatabase = mongoClient.getDatabase("test")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

MongoDB 配置への接続の詳細については、「 MongoDB への接続」チュートリアルを参照してください。

集計を実行するには、集計ステージのリストをMongoCollection.aggregate()メソッドに渡します。 このドライバーは、集計ステージのビルダを含むAggregatesヘルパー クラスを提供します。

この例では、集計パイプラインは次のタスクを実行しています。

  • $matchステージを使用して、 categories配列フィールドに要素"Bakery"を含むドキュメントをフィルタリングします。 この例では、 Aggregates.filter()を使用して$matchステージを構築しています。

  • $groupステージを使用して、一致するドキュメントをstarsフィールドでグループ化し、 starsの個別の値ごとにドキュメントの数を累積します。 この例では、 Aggregates.group()を使用して$groupステージを構築し、 Accumulators.sum()を使用してアキュムレータ式を構築します。 $groupステージ内で使用するアキュムレータ式の場合、ドライバーはAccumulatorsヘルパー クラスを提供します。

collection.aggregate(Seq(
Aggregates.filter(Filters.equal("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1))
)).printResults()

$groupアキュムレータ式の場合、ドライバーはAccumulatorsヘルパー クラスを提供します。 その他の集計式は、 Documentクラスを使用して式を手動で構築します。

次の例では、集計パイプラインは$projectステージを使用して、 nameフィールドと、値がcategories配列の最初の要素である計算フィールドfirstCategoryのみを返します。 この例では、 Aggregates.project()とさまざまなProjectionsクラスのメソッドを使用して$projectステージを構築します。

collection.aggregate(
Seq(
Aggregates.project(
Projections.fields(
Projections.excludeId(),
Projections.include("name"),
Projections.computed(
"firstCategory",
Document("$arrayElemAt"-> Seq("$categories", 0))
)
)
)
)
).printResults()

集計パイプラインを$explainするには、 AggregatePublisher.explain()メソッドを呼び出します。

collection.aggregate(
Seq(Aggregates.filter(Filters.eq("categories", "Bakery")),
Aggregates.group("$stars", Accumulators.sum("count", 1)))
).explain().printResults()

戻る

一括書き込み操作