Menu Docs
Página inicial do Docs
/
Conector do Spark
/

Gravar no MongoDB no modo de transmissão

Para escrever dados no MongoDB, chame o método writeStream() em seu objeto Dataset<Row> . Este método retorna um objeto DataStreamWriter , que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.

Você deve especificar as seguintes definições de configuração para gravar no MongoDB:

Contexto
Descrição
writeStream.format()
Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB.
writeStream.option()

Especifica as configurações de stream, incluindo astring de conexão do MongoDB deployment, o banco de banco de dados e a collection do MongoDB e o diretório de checkpoint .

Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de gravação de streaming .

writeStream.outputMode()
Especifica como os dados de um DataFrame de streaming são gravados em um coletor de streaming. Para visualizar uma lista de todos os modos de saída suportados, consulte a documentação Java outputMode.
writeStream.trigger()

Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto DataStreamWriter que você cria a partir do DataStreamReader que você configura.

Para usar o processamento contínuo, passe Trigger.Continuous(<time value>) como um argumento, em que <time value> é com que frequência você deseja que o Spark Connector faça o checkpoint de forma assíncrona. Se você passar qualquer outro método estático da classe Trigger ou se não chamar writeStream.trigger(), o Spark Connector usará o processamento de microlote.

Para visualizar uma lista de todas as políticas de processamento suportadas, consulte a documentação do Java trigger.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:

<streaming DataFrame>.writeStream()
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append");

Para obter uma lista completa dos métodos, consulte a referência Streaming estruturado Java .

Para escrever dados no MongoDB, chame a função writeStream no seu objeto DataFrame . Essa função retorna um objeto DataStreamWriter , que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.

Você deve especificar as seguintes definições de configuração para gravar no MongoDB:

Contexto
Descrição
writeStream.format()
Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB.
writeStream.option()

Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e o diretório de checkpoint de verificação.

Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de fluxo de gravação .

writeStream.outputMode()
Especifica como o Spark Connector grava um DataFrame de streaming em um coletor de streaming. Para exibir uma lista de todos os modos de saída compatíveis, consulte a documentação do pyspark outputMode.
writeStream.trigger()

Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto DataStreamWriter que você cria a partir do DataStreamReader que você configura.

Para utilizar o processamento contínuo, passe à função um valor de tempo utilizando o parâmetro continuous . Se você passar qualquer outro parâmetro nomeado ou se não chamar writeStream.trigger(), o Spark Connector usará o processamento em microlote.

Para ver uma lista de todas as políticas de processamento compatíveis, consulte a documentação do trigger pyspark.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:

<streaming DataFrame>.writeStream \
.format("mongodb") \
.option("spark.mongodb.connection.uri", <mongodb-connection-string>) \
.option("spark.mongodb.database", <database-name>) \
.option("spark.mongodb.collection", <collection-name>) \
.outputMode("append")

Para obter uma lista completa de funções, consulte a referência de transmissão estruturada do pyspark.

Para escrever dados no MongoDB, chame o método write em seu objeto DataFrame . Este método retorna um objeto DataStreamWriter , que você pode usar para especificar o formato e outras definições de configuração para sua operação de gravação de streaming.

Você deve especificar as seguintes definições de configuração para gravar no MongoDB:

Contexto
Descrição
writeStream.format()
Especifica o formato da fonte de dados de saída subjacente. Use mongodb para escrever no MongoDB.
writeStream.option()

Especifica as configurações de fluxo, incluindo a string de conexão de implantação do MongoDB , o banco de banco de dados e a coleção do MongoDB e o diretório de checkpoint de verificação.

Para obter uma lista de opções de configuração de fluxo de gravação, consulte o guia Opções de configuração de fluxo de gravação .

writeStream.outputMode()
Especifica como o Spark Connector grava um DataFrame de streaming em um coletor de streaming. Para visualizar uma lista de todos os modos de saída suportados, consulte a documentação do Scala outputMode.
writeStream.trigger()

Especifica com que frequência o Spark Connector grava resultados no coletor de streaming. Chame esse método no objeto DataStreamWriter que você cria a partir do DataStreamReader que você configura.

Para usar o processamento contínuo, passe Trigger.Continuous(<time value>) como um argumento, em que <time value> é com que frequência você deseja que o Spark Connector faça o checkpoint de forma assíncrona. Se você passar qualquer outro método estático da classe Trigger ou se não chamar writeStream.trigger(), o Spark Connector usará o processamento de microlote.

Para visualizar uma lista de todas as políticas de processamento suportadas, consulte a documentação do trigger Scala.

O seguinte trecho de código mostra como usar as definições de configuração anteriores para transmitir dados para o MongoDB:

<streaming DataFrame>.writeStream
.format("mongodb")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")

Para obter uma lista completa dos métodos, consulte a referência de transmissão estruturada do Scala.

O exemplo a seguir mostra como transmitir dados de um arquivoCSV do para MongoDB:

  1. Crie um objeto DataStreamReader que leia a partir do arquivo CSV.

  2. Para criar um objeto DataStreamWriter , chame o método writeStream() na transmissão Dataset<Row> que você criou com o DataStreamReader. Utilize o método format() para especificar mongodb como o formato de dados subjacente.

  3. Ligue para o método start() no objeto DataStreamWriter para iniciar o stream.

Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode que você especificar.

// create a local SparkSession
SparkSession spark = SparkSession.builder()
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate();
// define a streaming query
DataStreamWriter<Row> dataStreamWriter = spark.readStream()
.format("csv")
.option("header", "true")
.schema("<csv-schema>")
.load("<csv-file-name>")
// manipulate your streaming data
.writeStream()
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", "<mongodb-connection-string>")
.option("spark.mongodb.database", "<database-name>")
.option("spark.mongodb.collection", "<collection-name>")
.outputMode("append");
// run the query
StreamingQuery query = dataStreamWriter.start();
  1. Crie um objeto DataStreamReader que leia a partir do arquivo CSV.

  2. Para criar um objeto DataStreamWriter , chame a função writeStream no streaming DataFrame que você criou com o DataStreamReader. Utilize a função format() para especificar mongodb como o formato de dados subjacente.

  3. Ligue para a função start() na instância DataStreamWriter para iniciar o fluxo.

Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode que você especificar.

# create a local SparkSession
spark = SparkSession.builder \
.appName("writeExample") \
.master("spark://spark-master:<port>") \
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>") \
.getOrCreate()
# define a streaming query
dataStreamWriter = (spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
# manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/pyspark/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
)
# run the query
query = dataStreamWriter.start()
  1. Crie um objeto DataStreamReader que leia a partir do arquivo CSV.

  2. Para criar um objeto DataStreamWriter , chame o método writeStream na transmissão DataFrame que você criou com o DataStreamReader. Utilize o método format() para especificar mongodb como o formato de dados subjacente.

  3. Chame o método start() na instância DataStreamWriter para iniciar o stream.

Conforme o conector lê dados do arquivo CSV, ele adiciona esses dados ao MongoDB de acordo com o outputMode que você especificar.

// create a local SparkSession
val spark = SparkSession.builder
.appName("writeExample")
.master("spark://spark-master:<port>")
.config("spark.jars", "<mongo-spark-connector-JAR-file-name>")
.getOrCreate()
// define a streaming query
val dataStreamWriter = spark.readStream
.format("csv")
.option("header", "true")
.schema(<csv-schema>)
.load(<csv-file-name>)
// manipulate your streaming data
.writeStream
.format("mongodb")
.option("checkpointLocation", "/tmp/")
.option("forceDeleteTempCheckpointLocation", "true")
.option("spark.mongodb.connection.uri", <mongodb-connection-string>)
.option("spark.mongodb.database", <database-name>)
.option("spark.mongodb.collection", <collection-name>)
.outputMode("append")
// run the query
val query = dataStreamWriter.start()

Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API:

Voltar

Configuração