Docs 菜单
Docs 主页
/
Spark Connector
/

以流媒体模式写入 MongoDB

要将数据写入MongoDB,请对Dataset<Row>对象调用 writeStream()方法。 此方法返回一个DataStreamWriter对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB:

设置
说明
writeStream.format()
指定根本的输出数据源的格式。 使用mongodb写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及检查点目录。

有关写入流配置选项的列表,请参阅流写入配置选项指南。

writeStream.outputMode()
指定如何将流媒体DataFrame 的数据写入流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 Java outputMode 文档。
writeStream.trigger()

指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的DataStreamReader创建的DataStreamWriter对象调用此方法。

要使用连续处理,请将Trigger.Continuous(<time value>)作为参数传递,其中<time value>是您希望Spark Connector异步设置检查点的频率。 如果传递 Trigger 类的任何其他静态方法,或者不调用 writeStream.trigger(),则Spark Connector将改用微处理。

要查看所有支持的处理策略的列表,请参阅Javatrigger 文档。

以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

有关方法的完整列表,请参阅 Java结构化流参考。

要将数据写入MongoDB,请对DataFrame对象调用writeStream函数。 此函数返回一个DataStreamWriter对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB:

设置
说明
writeStream.format()
指定根本的输出数据源的格式。 使用mongodb写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及检查点目录。

有关写入流配置选项的列表,请参阅Streaming Write Configuration Options指南。

writeStream.outputMode()
指定Spark Connector如何将流式处理 DataFrame 写入流媒体流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 pyspark outputMode 文档。
writeStream.trigger()

指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的DataStreamReader创建的DataStreamWriter对象调用此方法。

要使用连续处理,请使用continuous参数向函数传递时间值。 如果传递任何其他命名参数,或者不调用writeStream.trigger() ,则Spark Connector将改用微处理。

要查看所有支持的处理策略的列表,请参阅 pysparktrigger 文档。

以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

有关函数的完整列表,请参阅 pyspark 结构化流参考。

要将数据写入MongoDB,请对DataFrame对象调用write方法。 此方法返回一个DataStreamWriter对象,您可以使用该对象指定流媒体写入操作的格式和其他配置设置。

您必须指定以下配置设置才能写入MongoDB:

设置
说明
writeStream.format()
指定根本的输出数据源的格式。 使用mongodb写入MongoDB。
writeStream.option()

指定流设置,包括MongoDB部署连接string 、 MongoDB数据库和集合以及检查点目录。

有关写入流配置选项的列表,请参阅Streaming Write Configuration Options指南。

writeStream.outputMode()
指定Spark Connector如何将流式处理 DataFrame 写入流媒体流媒体接收器。 要查看所有支持的输出模式的列表,请参阅 Scala outputMode 文档。
writeStream.trigger()

指定Spark Connector将结果写入流媒体接收器的频率。 对您通过配置的DataStreamReader创建的DataStreamWriter对象调用此方法。

要使用连续处理,请将Trigger.Continuous(<time value>)作为参数传递,其中<time value>是您希望Spark Connector异步设置检查点的频率。 如果传递 Trigger 类的任何其他静态方法,或者不调用 writeStream.trigger(),则Spark Connector将改用微处理。

要查看所有支持的处理策略的列表,请参阅Scalatrigger 文档。

以下代码片段展示了如何使用以前的配置设置将数据流传输到MongoDB:

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

有关方法的完整列表,请参阅 Scala 结构化流参考。

以下示例展示了如何将数据从 CSV文件流式传输到 MongoDB:

  1. 创建一个从 CSV 文件中读取数据的DataStreamReader对象。

  2. 要创建DataStreamWriter对象,请在使用DataStreamReader创建的流媒体处理Dataset<Row>上调用writeStream()方法。 使用format()方法将mongodb指定为根本的数据格式。

  3. DataStreamWriter对象调用start()方法以开始流。

当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode将数据添加到MongoDB 。

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. 创建一个从 CSV 文件中读取数据的DataStreamReader对象。

  2. 要创建DataStreamWriter对象,请在使用DataStreamReader创建的流媒体DataFrame上调用writeStream函数。 使用format()函数指定mongodb作为根本的数据格式。

  3. DataStreamWriter实例调用start()函数以开始流。

当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode将数据添加到MongoDB 。

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. 创建一个从 CSV 文件中读取数据的DataStreamReader对象。

  2. 要创建DataStreamWriter对象,请在使用DataStreamReader创建的流媒体处理DataFrame上调用writeStream方法。 使用format()方法将mongodb指定为根本的数据格式。

  3. DataStreamWriter实例上调用start()方法以开始流。

当Connector从逗号分隔值(CSV)文件读取数据时,它会根据您指定的outputMode将数据添加到MongoDB 。

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

要了解有关这些示例中使用的类型的更多信息,请参阅以下 Apache Spark API 文档:

后退

流式读取配置选项