以流媒体模式写入 MongoDB
要将数据写入MongoDB,请对Dataset<Row>
对象调用 writeStream()
方法。 此方法返回一个DataStreamWriter
对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB:
设置 | 说明 |
---|---|
writeStream.format() | 指定根本的输出数据源的格式。 使用 mongodb 写入MongoDB。 |
writeStream.option() | |
writeStream.outputMode() | 指定如何将流媒体DataFrame 的数据写入流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 Java outputMode 文档。 |
writeStream.trigger() | 指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的 要使用连续处理,请将 要查看所有支持的处理策略的列表,请参阅Javatrigger 文档。 |
以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:
<streaming DataFrame>.writeStream() .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append");
有关方法的完整列表,请参阅 Java结构化流参考。
要将数据写入MongoDB,请对DataFrame
对象调用writeStream
函数。 此函数返回一个DataStreamWriter
对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB:
设置 | 说明 |
---|---|
writeStream.format() | 指定根本的输出数据源的格式。 使用 mongodb 写入MongoDB。 |
writeStream.option() | 指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及检查点目录。 有关写入流配置选项的列表,请参阅Streaming Write Configuration Options指南。 |
writeStream.outputMode() | 指定Spark Connector如何将流式处理 DataFrame 写入流媒体流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 pyspark outputMode 文档。 |
writeStream.trigger() | 指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的 要使用连续处理,请使用 要查看所有支持的处理策略的列表,请参阅 pysparktrigger 文档。 |
以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:
<streaming DataFrame>.writeStream \ .format("mongodb") \ .option("spark.mongodb.connection.uri", <mongodb-connection-string>) \ .option("spark.mongodb.database", <database-name>) \ .option("spark.mongodb.collection", <collection-name>) \ .outputMode("append")
有关函数的完整列表,请参阅 pyspark 结构化流参考。
要将数据写入MongoDB,请对DataFrame
对象调用write
方法。 此方法返回一个DataStreamWriter
对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。
您必须指定以下配置设置才能写入MongoDB:
设置 | 说明 |
---|---|
writeStream.format() | 指定根本的输出数据源的格式。 使用 mongodb 写入MongoDB。 |
writeStream.option() | 指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及检查点目录。 有关写入流配置选项的列表,请参阅Streaming Write Configuration Options指南。 |
writeStream.outputMode() | 指定Spark Connector如何将流式处理 DataFrame 写入流媒体流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 Scala outputMode 文档。 |
writeStream.trigger() | 指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的 要使用连续处理,请将 要查看所有支持的处理策略的列表,请参阅Scalatrigger 文档。 |
以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:
<streaming DataFrame>.writeStream .format("mongodb") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append")
有关方法的完整列表,请参阅 Scala 结构化流参考。
例子
以下示例展示了如何将数据从 CSV文件流式传输到 MongoDB:
创建一个从 CSV 文件中读取数据的
DataStreamReader
对象。要创建
DataStreamWriter
对象,请在使用DataStreamReader
创建的流媒体处理Dataset<Row>
上调用writeStream()
方法。 使用format()
方法将mongodb
指定为根本的数据格式。对
DataStreamWriter
对象调用start()
方法以开始流。
当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode
将数据添加到MongoDB 。
// create a local SparkSession SparkSession spark = SparkSession.builder() .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate(); // define a streaming query DataStreamWriter<Row> dataStreamWriter = spark.readStream() .format("csv") .option("header", "true") .schema("<csv-schema>") .load("<csv-file-name>") // manipulate your streaming data .writeStream() .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", "<mongodb-connection-string>") .option("spark.mongodb.database", "<database-name>") .option("spark.mongodb.collection", "<collection-name>") .outputMode("append"); // run the query StreamingQuery query = dataStreamWriter.start();
创建一个从 CSV 文件中读取数据的
DataStreamReader
对象。要创建
DataStreamWriter
对象,请在使用DataStreamReader
创建的流媒体DataFrame
上调用writeStream
函数。 使用format()
函数指定mongodb
作为根本的数据格式。对
DataStreamWriter
实例调用start()
函数以开始流。
当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode
将数据添加到MongoDB 。
# create a local SparkSession spark = SparkSession.builder \ .appName("writeExample") \ .master("spark://spark-master:<port>") \ .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \ .getOrCreate() # define a streaming query dataStreamWriter = (spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) # manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/pyspark/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") ) # run the query query = dataStreamWriter.start()
创建一个从 CSV 文件中读取数据的
DataStreamReader
对象。要创建
DataStreamWriter
对象,请在使用DataStreamReader
创建的流媒体处理DataFrame
上调用writeStream
方法。 使用format()
方法将mongodb
指定为根本的数据格式。在
DataStreamWriter
实例上调用start()
方法以开始流。
当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode
将数据添加到MongoDB 。
// create a local SparkSession val spark = SparkSession.builder .appName("writeExample") .master("spark://spark-master:<port>") .config("spark.jars", "<mongo-spark-connector-JAR-file-name>") .getOrCreate() // define a streaming query val dataStreamWriter = spark.readStream .format("csv") .option("header", "true") .schema(<csv-schema>) .load(<csv-file-name>) // manipulate your streaming data .writeStream .format("mongodb") .option("checkpointLocation", "/tmp/") .option("forceDeleteTempCheckpointLocation", "true") .option("spark.mongodb.connection.uri", <mongodb-connection-string>) .option("spark.mongodb.database", <database-name>) .option("spark.mongodb.collection", <collection-name>) .outputMode("append") // run the query val query = dataStreamWriter.start()
API 文档
要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档: