以流媒体模式从 MongoDB 读取
Overview
从 MongoDB 数据库读取流时,MongoDB Spark Connector 支持微批处理和连续处理。 微批处理是默认的处理引擎,可实现低至 100 毫秒的端到端延迟,并具有一次性容错保证。连续处理是 Spark 2.3 版中引入的一项实验性功能,可在保证至少一次的情况下实现低至 1 毫秒的端到端延迟。
要学习;了解有关连续处理的详情,请参阅 Spark文档。
注意
连接器从 MongoDB 部署的变更流中读取数据。如需在变更流上生成更改事件,请对数据库上执行更新操作。
如需了解有关变更流的更多信息,请参阅 MongoDB 手册中的“变更流”部分。
要从MongoDB读取数据,请对SparkSession
对象调用 readStream()
方法。 此方法会返回一个DataStreamReader
对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将Trigger.Continuous
参数传递给trigger()
方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger; Dataset<Row> streamingDataset = <local SparkSession>.readStream() .format("mongodb") .load(); DataStreamWriter<Row> dataStreamWriter = streamingDataset.writeStream() .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append"); StreamingQuery query = dataStreamWriter.start();
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅 Java结构化流参考。
要从 MongoDB 读取数据,请对SparkSession
对象调用readStream
函数。 此函数返回一个DataStreamReader
对象,您可以使用该对象指定流式读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将continuous
参数传递给trigger()
方法可启用连续处理。
streamingDataFrame = (<local SparkSession>.readStream .format("mongodb") .load() ) dataStreamWriter = (streamingDataFrame.writeStream .trigger(continuous="1 second") .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") ) query = dataStreamWriter.start()
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅 pyspark 结构化流参考。
要从 MongoDB 读取数据,请对SparkSession
对象调用readStream
方法。 此方法会返回一个DataStreamReader
对象,您可以使用该对象指定流媒体读取操作的格式和其他配置设置。
您必须指定以下配置设置才能从 MongoDB 读取:
设置 | 说明 |
---|---|
readStream.format() | 指定底层输入数据源的格式。使用 mongodb 从 MongoDB 读取数据。 |
readStream.option() | |
readStream.schema() | 指定输入模式。 |
以下代码片段展示了如何使用前面的配置设置来持续处理从 MongoDB 流式传输的数据。 connector将所有新数据附加到现有数据,并每秒将检查点异步写入/tmp/checkpointDir
一次。将Trigger.Continuous
参数传递给trigger()
方法可启用连续处理。
import org.apache.spark.sql.streaming.Trigger val streamingDataFrame = <local SparkSession>.readStream .format("mongodb") .load() val dataStreamWriter = streamingDataFrame.writeStream .trigger(Trigger.Continuous("1 second")) .format("memory") .option("checkpointLocation", "/tmp/checkpointDir") .outputMode("append") val query = dataStreamWriter.start()
注意
在您对流媒体查询调用start()
方法之前,Spark 不会开始流媒体。
有关方法的完整列表,请参阅 Scala 结构化流参考。
例子
以下示例展示了如何将数据从 MongoDB 流式传输到控制台。
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在使用
DataStreamReader
创建的流式Dataset
对象上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define the schema of the source collection StructType readSchema = new StructType() .add("company_symbol", DataTypes.StringType) .add("company_name", DataTypes.StringType) .add("price", DataTypes.DoubleType) .add("tx_time", DataTypes.TimestampType); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("mongodb") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .schema(readSchema) .load() // manipulate your streaming data .writeStream() .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在您使用
DataStreamReader
创建的流式DataFrame
上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
# create a local SparkSession spark = SparkSession.builder \ .appName("readExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define the schema of the source collection readSchema = (StructType() .add('company_symbol', StringType()) .add('company_name', StringType()) .add('price', DoubleType()) .add('tx_time', TimestampType()) ) # define a streaming query dataStreamWriter = (spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option('spark.mongodb.database', <database-name>) .option('spark.mongodb.collection', <collection-name>) .schema(readSchema) .load() # manipulate your streaming data .writeStream .format("console") .trigger(continuous="1 second") .outputMode("append") ) # run the query query = dataStreamWriter.start()
创建一个从 MongoDB 读取数据的
DataStreamReader
对象。通过在使用
DataStreamReader
创建的流式DataFrame
对象上调用writeStream()
方法来创建DataStreamWriter
对象。 使用format()
方法指定格式console
。在
DataStreamWriter
实例上调用start()
方法以开始流。
当新数据插入 MongoDB 时,MongoDB 会根据您指定的outputMode
将数据流式传输到控制台。
重要
避免将大型数据集流式传输到控制台。 流媒体到控制台会占用大量内存,且仅用于测试目的。
// create a local SparkSession val spark = SparkSession.builder .appName("readExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define the schema of the source collection val readSchema = StructType() .add("company_symbol", StringType()) .add("company_name", StringType()) .add("price", DoubleType()) .add("tx_time", TimestampType()) // define a streaming query val dataStreamWriter = spark.readStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .schema(readSchema) .load() // manipulate your streaming data .writeStream .format("console") .trigger(Trigger.Continuous("1 second")) .outputMode("append") // run the query val query = dataStreamWriter.start()
重要
推断变更流的模式
如果将change.stream.publish.full.document.only
选项设置为true
,Spark Connector 将使用扫描文档的模式推断DataFrame
的模式。 如果将该选项设置为false
,则必须指定模式。
有关此设置的更多信息,并查看change stream配置选项的完整列表,请参阅读取配置选项指南。
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: