Menu Docs
Página inicial do Docs
/
Conector do Spark
/

Ler do MongoDB no modo de lote

Nesta página

  • Visão geral
  • Inferência de esquema
  • Filtros
  • Queries SQL
  • Documentação da API

Para ler dados do MongoDB, ligue para o método read() em seu objeto SparkSession. Este método retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
dataFrame.read.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, inclusive a string de conexão da implantação MongoDB , o banco de dados e a coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote , consulte o guiaOpções de configuração de leitura em lote .

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Dica

Tipo de DataFrame

DataFrame não existe como uma classe na API Java . Utilize o Dataset<Row> para referenciar um DataFrame.

Para ler dados do MongoDB, chame a função read no objeto SparkSession. Esta função retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
dataFrame.read.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, incluindo a cadeia de conexão da implantação do MongoDB, o banco de dados e coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote.

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ler dados do MongoDB, chame o método read em seu objeto SparkSession. Este método retorna um objeto DataFrameReader, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.

Você deve especificar as seguintes definições de configuração para ler do MongoDB:

Contexto
Descrição
dataFrame.read.format()
Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB.
dataFrame.read.option()

Use o método option para definir as configurações de leitura em lote, incluindo a cadeia de conexão da implantação do MongoDB, o banco de dados e coleção do MongoDB e a configuração do particionador.

Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote.

O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts no MongoDB:

val dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Dica

Tipo de DataFrame

Um DataFrame é representado por um Dataset de Row objetos. O tipo DataFrame é um alias para Dataset[Row].

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

Dataset<Row> dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load();

Para ver o esquema inferido, use o método printSchema() no objeto Dataset<Row>, conforme o exemplo abaixo:

dataFrame.printSchema();
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use o método show() em seu objeto DataFrame, conforme o exemplo abaixo:

dataFrame.show();
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

dataFrame = spark.read
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver o esquema inferido, use a função printSchema() no objeto DataFrame, conforme mostrado no exemplo a seguir:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use a função show() no objeto DataFrame, conforme mostrado no exemplo a seguir:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.

Suponha que a coleção people.contacts do MongoDB contenha os seguintes documentos:

{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 }
{ "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 }
{ "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 }
{ "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 }
{ "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 }
{ "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 }
{ "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 }
{ "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 }
{ "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 }
{ "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }

A seguinte operação carrega dados do people.contacts e infere o esquema do DataFrame:

val dataFrame = spark.read()
.format("mongodb")
.option("database", "people")
.option("collection", "contacts")
.load()

Para ver o esquema inferido, use o método printSchema() no objeto DataFrame, conforme o exemplo abaixo:

dataFrame.printSchema()
root
|-- _id: struct (nullable = true)
| |-- oid: string (nullable = true)
|-- age: integer (nullable = true)
|-- name: string (nullable = true)

Para ver os dados no DataFrame, use o método show() em seu objeto DataFrame, conforme o exemplo abaixo:

dataFrame.show()
+--------------------+----+-------------+
| _id| age| name|
+--------------------+----+-------------+
|[585024d558bef808...| 50|Bilbo Baggins|
|[585024d558bef808...|1000| Gandalf|
|[585024d558bef808...| 195| Thorin|
|[585024d558bef808...| 178| Balin|
|[585024d558bef808...| 77| Kíli|
|[585024d558bef808...| 169| Dwalin|
|[585024d558bef808...| 167| Óin|
|[585024d558bef808...| 158| Glóin|
|[585024d558bef808...| 82| Fíli|
|[585024d558bef808...|null| Bombur|
+--------------------+----+-------------+

Você pode especificar um esquema contendo valores de campo conhecidos para utilizar durante a inferência de esquema especificando a opção de configuração do schemaHint . Você pode especificar a opção schemaHint em qualquer um dos seguintes formatos do Spark:

Tipo
Formatar
DDL
<field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE>
SQL DDL
STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE>
JSON
{ "type": "struct", "fields": [
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> },
{ "name": "<field name>", "type": "<field type>", "nullable": <true/false> }]}

O exemplo a seguir mostra como especificar a opção schemaHint em cada formato usando o shell do Spark. O exemplo especifica um campo com valor de string denominado "value" e um campo com valor inteiro denominado "count".

import org.apache.spark.sql.types._
val mySchema = StructType(Seq(
StructField("value", StringType),
StructField("count", IntegerType))
// Generate DDL format
mySchema.toDDL
// Generate SQL DDL format
mySchema.sql
// Generate Simple String DDL format
mySchema.simpleString
// Generate JSON format
mySchema.json

Você também pode especificar a opção schemaHint no formato DDL de string simples ou no formato JSON usando o PySpark, conforme mostrado no exemplo a seguir:

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
mySchema = StructType([
StructField('value', StringType(), True),
StructField('count', IntegerType(), True)])
# Generate Simple String DDL format
mySchema.simpleString()
# Generate JSON format
mySchema.json()

Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.

O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

Use filter() para ler um subconjunto de dados de sua coleção do MongoDB.

Considere uma coleção denominada fruit que contenha os seguintes documentos:

{ "_id" : 1, "type" : "apple", "qty" : 5 }
{ "_id" : 2, "type" : "orange", "qty" : 10 }
{ "_id" : 3, "type" : "banana", "qty" : 15 }

Primeiro, configure um objeto DataFrame para se conectar com sua fonte de dados padrão do MongoDB:

df = spark.read.format("mongodb").load()

O exemplo abaixo inclui somente registros nos quais o campo qty é maior ou igual a 10.

df.filter(df['qty'] >= 10).show()

A operação imprime a seguinte saída:

+---+----+------+
|_id| qty| type|
+---+----+------+
|2.0|10.0|orange|
|3.0|15.0|banana|
+---+----+------+

Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.

O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:

  • And

  • EqualNullSafe

  • EqualTo

  • GreaterThan

  • GreaterThanOrEqual

  • In

  • IsNull

  • LessThan

  • LessThanOrEqual

  • Not

  • Or

  • StringContains

  • StringEndsWith

  • StringStartsWith

O exemplo a seguir filtra e gera os caracteres com idades inferiores a 100:

df.filter(df("age") < 100).show()

A operação produz o seguinte:

+--------------------+---+-------------+
| _id|age| name|
+--------------------+---+-------------+
|[5755d7b4566878c9...| 50|Bilbo Baggins|
|[5755d7b4566878c9...| 82| Fíli|
|[5755d7b4566878c9...| 77| Kíli|
+--------------------+---+-------------+

Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.

A operação abaixo registra uma tabela characters e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:

implicitDS.createOrReplaceTempView("characters");
Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100");
centenarians.show();

centenarians.show() gera a saída:

+-------+----+
| name| age|
+-------+----+
|Gandalf|1000|
| Thorin| 195|
| Balin| 178|
| Dwalin| 169|
| Óin| 167|
| Glóin| 158|
+-------+----+

Antes de executar consultas SQL em seu DataFrame, você precisa registrar uma tabela temporária.

O exemplo a seguir registra uma tabela temporária chamada temp e, em seguida, usa o SQL para consultar registros nos quais o campo type contém a letra e:

df.createOrReplaceTempView("temp")
some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'")
some_fruit.show()

No shell pyspark, a operação imprime a seguinte saída:

+------+----+
| type| qty|
+------+----+
| apple| 5.0|
|orange|10.0|
+------+----+

Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.

A operação abaixo registra uma tabela characters e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:

val characters = spark.read.format("mongodb").as[Character]
characters.createOrReplaceTempView("characters")
val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100")
centenarians.show()

Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API:

Voltar

Modo de lote