Docs 菜单
Docs 主页
/
Spark Connector
/

以流媒体模式从 MongoDB 读取

在此页面上

  • Overview
  • 例子
  • API 文档

从 MongoDB 数据库读取流时,MongoDB Spark Connector 支持微批处理连续处理。 微批处理是默认的处理引擎,可实现低至 100 毫秒的端到端延迟,并具有一次性容错保证。连续处理是 Spark 2.3 版中引入的一项实验性功能,可在保证至少一次的情况下实现低至 1 毫秒的端到端延迟。

要学习;了解有关连续处理的详情,请参阅 Spark文档。

注意

连接器从 MongoDB 部署的变更流中读取数据。如需在变更流上生成更改事件,请对数据库上执行更新操作。

如需了解有关变更流的更多信息,请参阅 MongoDB 手册中的“变更流”部分。

要从MongoDB读取数据,请对SparkSession对象调用 readStream()方法。 此方法会返回一个DataStreamReader对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
readStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅流读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将Trigger.Continuous参数传递给trigger()方法可启用连续处理。

import org.apache.spark.sql.streaming.Trigger;
Dataset<Row> streamingDataset = <local SparkSession>.readStream()
.format("mongodb")
.load();
DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream()
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append");
StreamingQuery query = dataStreamWriter.start();

注意

在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。

有关方法的完整列表,请参阅 Java结构化流参考。

要从 MongoDB 读取数据,请对SparkSession对象调用readStream函数。 此函数返回一个DataStreamReader对象,您可以使用该对象指定流式读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
readStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅流媒体读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将continuous参数传递给trigger()方法可启用连续处理。

streamingDataFrame = (<local SparkSession>.readStream
.format("mongodb")
.load()
)
dataStreamWriter = (streamingDataFrame.writeStream
.trigger(continuous="1 second")
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
)
query = dataStreamWriter.start()

注意

在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。

有关方法的完整列表,请参阅 pyspark 结构化流参考。

要从 MongoDB 读取数据,请对SparkSession对象调用readStream方法。 此方法会返回一个DataStreamReader对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。

您必须指定以下配置设置才能从 MongoDB 读取:

设置
说明
readStream.format()
指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。
readStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及聚合管道阶段。

有关读取流配置选项的列表,请参阅流媒体读取配置选项指南。

readStream.schema()
指定输入模式。

以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir一次。将Trigger.Continuous参数传递给trigger()方法可启用连续处理。

import org.apache.spark.sql.streaming.Trigger
val streamingDataFrame = <local SparkSession>.readStream
.format("mongodb")
.load()
val dataStreamWriter = streamingDataFrame.writeStream
.trigger(Trigger.Continuous("1 second"))
.format("memory")
.option("checkpointLocation", "/tmp/checkpointDir")
.outputMode("append")
val query = dataStreamWriter.start()

注意

在您对流媒体查询调用start()方法之前,Spark 不会开始流媒体。

有关方法的完整列表,请参阅 Scala 结构化流参考。

以下示例展示了如何将数据从 MongoDB 流式传输到控制台。

  1. 创建一个从 MongoDB 读取数据的DataStreamReader对象。

  2. 通过在使用DataStreamReader创建的流式Dataset对象上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console

  3. DataStreamWriter实例上调用start()方法以开始流。

当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。

重要

避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define the schema of the source collection
StructType readSchema = new StructType()
.add("company_symbol", DataTypes.StringType)
.add("company_name", DataTypes.StringType)
.add("price", DataTypes.DoubleType)
.add("tx_time", DataTypes.TimestampType);
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream()
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. 创建一个从 MongoDB 读取数据的DataStreamReader对象。

  2. 通过在您使用DataStreamReader创建的流式DataFrame上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console

  3. DataStreamWriter实例上调用start()方法以开始流。

当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。

重要

避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。

# create a local SparkSession
spark = SparkSession.builder \
.appName("readExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define the schema of the source collection
readSchema = (StructType()
.add('company_symbol', StringType())
.add('company_name', StringType())
.add('price', DoubleType())
.add('tx_time', TimestampType())
)
# define a streaming query
dataStreamWriter = (spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option('spark.mongodb.database', <database-name>)
.option('spark.mongodb.collection', <collection-name>)
.schema(readSchema)
.load()
# manipulate your streaming data
.writeStream
.format("console")
.trigger(continuous="1 second")
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. 创建一个从 MongoDB 读取数据的DataStreamReader对象。

  2. 通过在使用DataStreamReader创建的流式DataFrame对象上调用writeStream()方法来创建DataStreamWriter对象。 使用format()方法指定格式console

  3. DataStreamWriter实例上调用start()方法以开始流。

当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode将数据流式传输到控制台。

重要

避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。

// create a local SparkSession
val spark = SparkSession.builder
.appName("readExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define the schema of the source collection
val readSchema = StructType()
.add("company_symbol", StringType())
.add("company_name", StringType())
.add("price", DoubleType())
.add("tx_time", TimestampType())
// define a streaming query
val dataStreamWriter = spark.readStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.schema(readSchema)
.load()
// manipulate your streaming data
.writeStream
.format("console")
.trigger(Trigger.Continuous("1 second"))
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

重要

推断变更流的模式

如果将change.stream.publish.full.document.only选项设置为true ,Spark Connector 将使用扫描文档的模式推断DataFrame的模式。 如果将该选项设置为false ,则必须指定模式。

有关此设置的更多信息,并查看change stream配置选项的完整列表,请参阅读取配置选项指南。

要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档:

后退

流式处理模式