Datasets and SQL

The Dataset API provides the type safety and functional programming benefits of RDDs along with the relational model and performance optimizations of the DataFrame API. The Java API no longer has a DataFrame class, so use Dataset<Row> wherever you would otherwise reference a DataFrame.

The following app demonstrates how to create a Dataset with an implicit schema, create a Dataset with an explicit schema, and run SQL queries on the dataset.

Consider a collection named characters:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

package com.mongodb.spark_examples;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import com.mongodb.spark.MongoSpark;
import com.mongodb.spark.config.ReadConfig;

public final class DatasetSQLDemo {

  public static void main(final String[] args) throws InterruptedException {
    // The URIs name the database and collection to read from and write to;
    // point the input URI at the collection that holds the characters documents
    SparkSession spark = SparkSession.builder()
      .master("local")
      .appName("MongoSparkConnectorIntro")
      .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.myCollection")
      .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.myCollection")
      .getOrCreate();

    // Create a JavaSparkContext using the SparkSession's SparkContext object
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Load data and infer schema, disregard toDF() name as it returns Dataset
    Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
    implicitDS.printSchema();
    implicitDS.show();

    // Load data with explicit schema
    Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
    explicitDS.printSchema();
    explicitDS.show();

    // Create the temp view and execute the query
    explicitDS.createOrReplaceTempView("characters");
    Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
    centenarians.show();

    // Write the data to the "hundredClub" collection
    MongoSpark.write(centenarians).option("collection", "hundredClub")
      .mode("overwrite").save();

    // Load the data from the "hundredClub" collection
    MongoSpark.load(spark, ReadConfig.create(spark).withOption("collection", "hundredClub"),
      Character.class).show();

    jsc.close();
  }
}

To create a Dataset from MongoDB data, load the data via MongoSpark and call the JavaMongoRDD.toDF() method. Despite toDF() sounding like a DataFrame method, it is part of the Dataset API and returns a Dataset<Row>.

The dataset's schema is inferred whenever data is read from MongoDB and stored in a Dataset<Row> without specifying a schema-defining Java bean. The schema is inferred by sampling documents from the database. To explicitly declare a schema, see Explicitly Declare a Schema.
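
To make the idea of sampling-based inference concrete, the following plain-Java sketch builds a schema as the union of the fields seen in a handful of sample documents. It is a conceptual illustration only, not the connector's implementation: the class SchemaInferenceSketch, the simplified type names, and the rule that conflicting types fall back to "string" are all assumptions made for this example.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Conceptual sketch only; this is not the connector's implementation.
final class SchemaInferenceSketch {

    // Maps a sampled value to a simplified type name (names are illustrative).
    static String typeOf(Object value) {
        if (value instanceof Integer) {
            return "integer";
        }
        if (value instanceof String) {
            return "string";
        }
        return "unknown";
    }

    // Builds the schema as the union of all fields seen in the sample.
    static Map<String, String> inferSchema(List<Map<String, Object>> sample) {
        Map<String, String> schema = new TreeMap<>(); // sorted by field name
        for (Map<String, Object> document : sample) {
            for (Map.Entry<String, Object> field : document.entrySet()) {
                String seen = typeOf(field.getValue());
                // Assumption of this sketch: conflicting types fall back to "string".
                schema.merge(field.getKey(), seen,
                        (known, next) -> known.equals(next) ? known : "string");
            }
        }
        return schema;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> sample = List.of(
                Map.<String, Object>of("name", "Bilbo Baggins", "age", 50),
                Map.<String, Object>of("name", "Gandalf", "age", 1000),
                Map.<String, Object>of("name", "Bombur")); // no age field
        // age stays in the schema even though one sampled document lacks it.
        System.out.println(inferSchema(sample)); // prints {age=integer, name=string}
    }
}
```

In this sketch a field that is missing from some documents still appears in the schema, which is consistent with a document such as Bombur's showing a null age when the data is displayed.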

The following operation loads data from MongoDB then uses the Dataset API to create a Dataset and infer the schema:

Dataset<Row> implicitDS = MongoSpark.load(jsc).toDF();
implicitDS.printSchema();
implicitDS.show();

implicitDS.printSchema() outputs the following schema to the console:

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)

implicitDS.show() outputs the following to the console:

+--------------------+----+-------------+
|                 _id| age|         name|
+--------------------+----+-------------+
|[585024d558bef808...|  50|Bilbo Baggins|
|[585024d558bef808...|1000|      Gandalf|
|[585024d558bef808...| 195|       Thorin|
|[585024d558bef808...| 178|        Balin|
|[585024d558bef808...|  77|         Kíli|
|[585024d558bef808...| 169|       Dwalin|
|[585024d558bef808...| 167|          Óin|
|[585024d558bef808...| 158|        Glóin|
|[585024d558bef808...|  82|         Fíli|
|[585024d558bef808...|null|       Bombur|
+--------------------+----+-------------+

Explicitly Declare a Schema

By default, reading from MongoDB in a SparkSession infers the schema by sampling documents from the collection. You can also use a Java bean to define the schema explicitly, thus removing the extra queries needed for sampling.

Note

If you provide a Java bean for the schema, MongoDB returns only the declared fields. This helps minimize the data sent across the wire.

The following example defines a Character Java bean, which is then used to define the schema for the Dataset:

import java.io.Serializable;

public final class Character implements Serializable {
  private String name;
  private Integer age;

  public String getName() {
    return name;
  }

  public void setName(final String name) {
    this.name = name;
  }

  public Integer getAge() {
    return age;
  }

  public void setAge(final Integer age) {
    this.age = age;
  }
}

The bean class is passed to the toDS(Class<T> beanClass) method to define the schema for the Dataset:

Dataset<Character> explicitDS = MongoSpark.load(jsc).toDS(Character.class);
explicitDS.printSchema();
explicitDS.show();

explicitDS.printSchema() outputs the following:

root
 |-- age: integer (nullable = true)
 |-- name: string (nullable = true)
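
The schema lists only age and name because those are the properties the bean declares through its getter and setter pairs. The following sketch uses standard JavaBeans introspection to list such properties. The class BeanFieldsSketch and the nested CharacterBean are hypothetical stand-ins written for this example, and the assumption that Spark derives a bean schema in a comparable way is not confirmed by this guide.

```java
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class BeanFieldsSketch {

    // Hypothetical stand-in for the Character bean used in this guide.
    public static final class CharacterBean implements Serializable {
        private String name;
        private Integer age;

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public Integer getAge() { return age; }
        public void setAge(Integer age) { this.age = age; }
    }

    // Lists the properties that have both a getter and a setter.
    static List<String> declaredFields(Class<?> beanClass) {
        try {
            List<String> names = new ArrayList<>();
            // Stopping at Object.class leaves out the implicit "class" property.
            for (PropertyDescriptor property
                    : Introspector.getBeanInfo(beanClass, Object.class).getPropertyDescriptors()) {
                if (property.getReadMethod() != null && property.getWriteMethod() != null) {
                    names.add(property.getName());
                }
            }
            Collections.sort(names);
            return names;
        } catch (IntrospectionException e) {
            throw new IllegalStateException("Cannot introspect " + beanClass, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(declaredFields(CharacterBean.class)); // prints [age, name]
    }
}
```

The _id field is absent from the result because the bean declares no property for it, which matches the explicit schema printed above.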

explicitDS.show() outputs the following:

+----+-------------+
| age|         name|
+----+-------------+
|  50|Bilbo Baggins|
|1000|      Gandalf|
| 195|       Thorin|
| 178|        Balin|
|  77|         Kíli|
| 169|       Dwalin|
| 167|          Óin|
| 158|        Glóin|
|  82|         Fíli|
|null|       Bombur|
+----+-------------+

Before running SQL queries on your dataset, you must register a temporary view for the dataset.

The following operation registers a temporary view named characters and then queries it to find all characters that are 100 or older:

explicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() outputs the following:

+-------+----+
|   name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
|  Balin| 178|
| Dwalin| 169|
|    Óin| 167|
|  Glóin| 158|
+-------+----+
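
Bombur does not appear in the result because his age is null, and in SQL a comparison against null never evaluates to true. The following plain-Java sketch mirrors the WHERE age >= 100 filter on a small in-memory subset of the data. The class CentenariansSketch and its Person type are hypothetical names used only for illustration; they are not part of Spark or the connector.

```java
import java.util.List;
import java.util.stream.Collectors;

final class CentenariansSketch {

    // Hypothetical in-memory row; age is nullable, as in the characters view.
    static final class Person {
        final String name;
        final Integer age;

        Person(String name, Integer age) {
            this.name = name;
            this.age = age;
        }
    }

    // Mirrors WHERE age >= 100: a null age never satisfies the comparison.
    static List<String> centenarians(List<Person> people) {
        return people.stream()
                .filter(p -> p.age != null && p.age >= 100)
                .map(p -> p.name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("Bilbo Baggins", 50),
                new Person("Gandalf", 1000),
                new Person("Thorin", 195),
                new Person("Bombur", null)); // document without an age field
        System.out.println(centenarians(people)); // prints [Gandalf, Thorin]
    }
}
```

To include rows with an unknown age in a real query, the SQL would need an explicit condition such as OR age IS NULL.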

The MongoDB Spark Connector can persist a DataFrame (a Dataset<Row> in Java) to a collection in MongoDB.

The following operation saves centenarians into the hundredClub collection in MongoDB:

/* Note: "overwrite" drops the collection before writing;
 * use "append" to add to the collection. */
MongoSpark.write(centenarians).option("collection", "hundredClub")
    .mode("overwrite").save();