Docs 菜单
Docs 主页
/
Spark Connector
/

以批处理模式从 MongoDB 读取

在此页面上

  • Overview
  • 模式推断
  • 筛选器
  • SQL 查询
  • API 文档

要从 MongoDB 读取数据,请对 SparkSession 对象调用read() 方法。此方法会返回一个 DataFrameReader 对象,您可以使用该对象指定批量读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
dataFrame.read.option()

使用 option 方法配置批量读取设置,包括 MongoDB 部署 连接字符串、MongoDB 数据库和集合以及分区程序配置。

有关批处理读取配置选项的列表,请参阅批量读取配置选项指南。

以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts 中读取数据:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

提示

DataFrame 类型

DataFrame 在Java API中不作为类存在。 使用Dataset<Row>引用 DataFrame。

要从 MongoDB 读取数据,请调用 SparkSession 对象上的 read 函数。此函数会返回一个 DataFrameReader 对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
dataFrame.read.option()

使用 option 方法配置批处理读取设置,包括 MongoDB 部署连接字符串、MongoDB 数据库和集合以及分区器配置。

有关批量读取配置选项的列表,请参阅批量读取配置选项指南。

以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts 中读取数据:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

要从 MongoDB 读取数据,请调用 SparkSession 对象上的 read 方法。此方法会返回一个 DataFrameReader 对象,您可以使用该对象指定批处理读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
dataFrame.read.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
dataFrame.read.option()

使用 option 方法配置批处理读取设置,包括 MongoDB 部署连接字符串、MongoDB 数据库和集合以及分区器配置。

有关批量读取配置选项的列表,请参阅批量读取配置选项指南。

以下代码示例演示了如何利用先前的配置设置,从 MongoDB 的 people.contacts 中读取数据:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

提示

DataFrame 类型

一个 DataFrame 由 Row 对象中的一个 Dataset 表示。DataFrame 类型是Dataset[Row] 的别名。

当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。

假设 MongoDB 集合 people.contacts 包含以下文档:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从 people.contacts 加载数据并推断 DataFrame 的模式:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

若要查看推断的模式,则在 Dataset<Row> 对象上使用 printSchema() 方法,如以下示例所示:

dataFrame.printSchema();

要查看 DataFrame 中的数据,则在 DataFrame 对象上使用 show() 方法,如以下示例所示:

dataFrame.show();

当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。

假设 MongoDB 集合 people.contacts 包含以下文档:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从 people.contacts 加载数据并推断 DataFrame 的模式:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

若要查看推断的模式,请在 DataFrame 对象上使用 printSchema() 方法,如以下示例所示:

dataFrame.printSchema()

要查看 DataFrame 中的数据,请对 DataFrame 对象使用 show() 函数,如下例所示:

dataFrame.show()

当您加载没有模式的数据集或 DataFrame 时,Spark 会对记录进行采样以推断集合的模式。

假设 MongoDB 集合 people.contacts 包含以下文档:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

以下操作从 people.contacts 加载数据并推断 DataFrame 的模式:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

若要查看推断的模式,则在 DataFrame 对象上使用 printSchema() 方法,如以下示例所示:

dataFrame.printSchema()

要查看 DataFrame 中的数据,则在 DataFrame 对象上使用 show() 方法,如以下示例所示:

dataFrame.show()

您可以通过指定schemaHint配置选项来指定要在模式推断期间使用的包含已知字段值的模式。 您可以使用以下任何Spark格式指定schemaHint选项:

类型
format
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

以下示例演示如何使用Spark shell以每种格式指定 schemaHint 选项。 该示例指定了一个名为"value"的字符串值字段和一个名为"count"的整数值字段。

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

您还可以使用 PySpark 以简单string DDL 格式或JSON格式指定 schemaHint 选项,如以下示例所示:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。

MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:

  • EqualNullSafe

  • EqualTo

  • 大于

  • GreaterThanOrEqual

  • 登录

  • IsNull

  • 小于

  • LessThanOrEqual

  • not

  • StringContains

  • StringEndsWith

  • StringStartsWith

使用 filter() 从 MongoDB 集合中读取数据子集。

考虑包含以下文档的集合 fruit

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

首先,设置 DataFrame 对象来连接默认的 MongoDB 数据源:

df = spark.read.format("mongodb").load()

以下示例仅包含 qty 字段大于或等于 10 的记录。

df.filter(df['qty'] >= 10).show()

该操作将打印以下输出:

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

在 DataFrame 或数据集中使用过滤器时,底层的 MongoDB Connector 代码会构建聚合管道,以在将 MongoDB 中的数据发送到 Spark 之前对其进行过滤。这通过仅检索和处理所需数据来提高 Spark 性能。

MongoDB Spark Connector 将以下筛选器转变为聚合管道阶段:

  • EqualNullSafe

  • EqualTo

  • 大于

  • GreaterThanOrEqual

  • 登录

  • IsNull

  • 小于

  • LessThanOrEqual

  • not

  • StringContains

  • StringEndsWith

  • StringStartsWith

以下示例筛选并输出年龄低于100的字符:

df.filter(df("age") < 100).show()

该操作输出以下内容:

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。

以下操作注册 characters 表,然后进行查询以查找所有大于或等于 100 的字符:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() 输出以下内容:

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

在对 DataFrame 运行 SQL 查询之前,您需要注册一个临时表。

以下示例注册一个名为 temp 的临时表,然后使用 SQL 查询 type 字段包含字母 e 的记录:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

pyspark Shell 中,该操作将打印以下输出:

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

在数据集上运行 SQL 查询之前,必须为数据集注册临时视图。

以下操作注册 characters 表,然后进行查询以查找所有大于或等于 100 的字符:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档:

后退

批处理模式