Filters and Aggregation
Depending on the dataset, filtering data using MongoDB's aggregation framework may perform more efficiently than the direct use of RDD filters and dataset filters.
The following sections use the myCollection collection in the test database that is configured in the SparkSession:

{ "_id" : 1, "test" : 1 }
{ "_id" : 2, "test" : 2 }
{ "_id" : 3, "test" : 3 }
{ "_id" : 4, "test" : 4 }
{ "_id" : 5, "test" : 5 }
{ "_id" : 6, "test" : 6 }
{ "_id" : 7, "test" : 7 }
{ "_id" : 8, "test" : 8 }
{ "_id" : 9, "test" : 9 }
{ "_id" : 10, "test" : 10 }
Filters
The following example loads the collection into an RDD and filters for all documents where the test field has a value greater than 5:
// Load the collection as an RDD of Documents
val rdd = MongoSpark.load(sc)

// Filter in Spark using a standard RDD filter
val filteredRdd = rdd.filter(doc => doc.getInteger("test") > 5)

println(filteredRdd.count)
println(filteredRdd.first.toJson)
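The same condition can also be expressed as a dataset filter. The following is a minimal sketch that assumes the spark SparkSession configured above; filters on a DataFrame loaded through the connector can typically be pushed down to MongoDB:

import com.mongodb.spark.MongoSpark

// Load the collection as a DataFrame instead of an RDD
val df = MongoSpark.load(spark)

// Filter on the test field; the connector can push this condition
// down to MongoDB rather than transferring every document to Spark
val filteredDf = df.filter("test > 5")
filteredDf.show()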
Aggregation
Pass an aggregation pipeline to a MongoRDD instance to filter data and perform aggregations in MongoDB before passing documents to Spark.
The following example uses an aggregation pipeline to perform the same filter operation as the example above; it matches all documents where the test field has a value greater than 5:
val rdd = MongoSpark.load(sc)

// Run the $match stage inside MongoDB before the documents reach Spark
val aggregatedRdd = rdd.withPipeline(Seq(Document.parse("{ $match: { test : { $gt : 5 } } }")))

println(aggregatedRdd.count)
println(aggregatedRdd.first.toJson)
Any valid aggregation pipeline can be specified in the example above.
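For example, a pipeline can combine a $match stage with other stages. The following is a minimal sketch that reuses the rdd value from the example above; the $project stage is illustrative:

import org.bson.Document

// A pipeline with more than one stage, evaluated inside MongoDB
val multiStageRdd = rdd.withPipeline(Seq(
  Document.parse("{ $match: { test: { $gt: 5 } } }"),
  Document.parse("{ $project: { test: 1, _id: 0 } }")
))

println(multiStageRdd.first.toJson)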
Aggregation pipelines handle empty (null) results, whereas the RDD filter methods do not. If the filter does not match any documents, the operation throws the following exception:
ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 8)
java.lang.NullPointerException
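One way to avoid this with RDD filters is to check whether the filtered RDD is empty before reading from it. The following is a minimal sketch of that guard, reusing the rdd value from the first example; the threshold of 100 is chosen so that no sample documents match:

// Guard against an empty filter result before calling first
val strictFilteredRdd = rdd.filter(doc => doc.getInteger("test") > 100)

if (strictFilteredRdd.isEmpty()) {
  println("No matching documents")
} else {
  println(strictFilteredRdd.first.toJson)
}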