Ler do MongoDB no modo de lote
Visão geral
Para ler dados do MongoDB, ligue para o método read()
em seu objeto SparkSession
. Este método retorna um objeto DataFrameReader
, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
dataFrame.read.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
dataFrame.read.option() | Use o método Para obter uma lista de opções de configuração de leitura em lote , consulte o guiaOpções de configuração de leitura em lote . |
O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts
no MongoDB:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Dica
Tipo de DataFrame
DataFrame
não existe como uma classe na API Java . Utilize o Dataset<Row>
para referenciar um DataFrame.
Para ler dados do MongoDB, chame a função read
no objeto SparkSession
. Esta função retorna um objeto DataFrameReader
, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
dataFrame.read.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
dataFrame.read.option() | Use o método Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote. |
O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts
no MongoDB:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ler dados do MongoDB, chame o método read
em seu objeto SparkSession
. Este método retorna um objeto DataFrameReader
, que você pode utilizar para especificar o formato e outras definições de configuração para sua operação de leitura em lote.
Você deve especificar as seguintes definições de configuração para ler do MongoDB:
Contexto | Descrição |
---|---|
dataFrame.read.format() | Especifica o formato da fonte de dados de entrada subjacente. Use mongodb para ler a partir do MongoDB. |
dataFrame.read.option() | Use o método Para obter uma lista de opções de configuração de leitura em lote, consulte o guia Opções de configuração de leitura em lote. |
O seguinte exemplo de código mostra como utilizar as definições de configuração anteriores para ler dados do people.contacts
no MongoDB:
val dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Dica
Tipo de DataFrame
Um DataFrame é representado por um Dataset
de Row
objetos. O tipo DataFrame
é um alias para Dataset[Row]
.
Inferência de esquema
Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.
Suponha que a coleção people.contacts
do MongoDB contenha os seguintes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
A seguinte operação carrega dados do people.contacts
e infere o esquema do DataFrame:
Dataset<Row> dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load();
Para ver o esquema inferido, use o método printSchema()
no objeto Dataset<Row>
, conforme o exemplo abaixo:
dataFrame.printSchema();
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver os dados no DataFrame, use o método show()
em seu objeto DataFrame
, conforme o exemplo abaixo:
dataFrame.show();
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.
Suponha que a coleção people.contacts
do MongoDB contenha os seguintes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
A seguinte operação carrega dados do people.contacts
e infere o esquema do DataFrame:
dataFrame = spark.read .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver o esquema inferido, use a função printSchema()
no objeto DataFrame
, conforme mostrado no exemplo a seguir:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver os dados no DataFrame, use a função show()
no objeto DataFrame
, conforme mostrado no exemplo a seguir:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
Ao carregar um conjunto de dados ou DataFrame sem um esquema, o Spark faz uma amostra dos registros para inferir o esquema da coleção.
Suponha que a coleção people.contacts
do MongoDB contenha os seguintes documentos:
{ "_id" : ObjectId("585024d558bef808ed84fc3e"), "name" : "Bilbo Baggins", "age" : 50 } { "_id" : ObjectId("585024d558bef808ed84fc3f"), "name" : "Gandalf", "age" : 1000 } { "_id" : ObjectId("585024d558bef808ed84fc40"), "name" : "Thorin", "age" : 195 } { "_id" : ObjectId("585024d558bef808ed84fc41"), "name" : "Balin", "age" : 178 } { "_id" : ObjectId("585024d558bef808ed84fc42"), "name" : "Kíli", "age" : 77 } { "_id" : ObjectId("585024d558bef808ed84fc43"), "name" : "Dwalin", "age" : 169 } { "_id" : ObjectId("585024d558bef808ed84fc44"), "name" : "Óin", "age" : 167 } { "_id" : ObjectId("585024d558bef808ed84fc45"), "name" : "Glóin", "age" : 158 } { "_id" : ObjectId("585024d558bef808ed84fc46"), "name" : "Fíli", "age" : 82 } { "_id" : ObjectId("585024d558bef808ed84fc47"), "name" : "Bombur" }
A seguinte operação carrega dados do people.contacts
e infere o esquema do DataFrame:
val dataFrame = spark.read() .format("mongodb") .option("database", "people") .option("collection", "contacts") .load()
Para ver o esquema inferido, use o método printSchema()
no objeto DataFrame
, conforme o exemplo abaixo:
dataFrame.printSchema()
root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- age: integer (nullable = true) |-- name: string (nullable = true)
Para ver os dados no DataFrame, use o método show()
em seu objeto DataFrame
, conforme o exemplo abaixo:
dataFrame.show()
+--------------------+----+-------------+ | _id| age| name| +--------------------+----+-------------+ |[585024d558bef808...| 50|Bilbo Baggins| |[585024d558bef808...|1000| Gandalf| |[585024d558bef808...| 195| Thorin| |[585024d558bef808...| 178| Balin| |[585024d558bef808...| 77| Kíli| |[585024d558bef808...| 169| Dwalin| |[585024d558bef808...| 167| Óin| |[585024d558bef808...| 158| Glóin| |[585024d558bef808...| 82| Fíli| |[585024d558bef808...|null| Bombur| +--------------------+----+-------------+
Especificar campos conhecidos com dicas de esquema
Você pode especificar um esquema contendo valores de campo conhecidos para utilizar durante a inferência de esquema especificando a opção de configuração do schemaHint
. Você pode especificar a opção schemaHint
em qualquer um dos seguintes formatos do Spark:
Tipo | Formatar | |||
---|---|---|---|---|
DDL | <field one name> <FIELD ONE TYPE>, <field two name> <FIELD TWO TYPE> | |||
SQL DDL | STRUCT<<field one name>: <FIELD ONE TYPE>, <field two name>: <FIELD TWO TYPE> | |||
JSON |
|
O exemplo a seguir mostra como especificar a opção schemaHint
em cada formato usando o shell do Spark. O exemplo especifica um campo com valor de string denominado "value"
e um campo com valor inteiro denominado "count"
.
import org.apache.spark.sql.types._ val mySchema = StructType(Seq( StructField("value", StringType), StructField("count", IntegerType)) // Generate DDL format mySchema.toDDL // Generate SQL DDL format mySchema.sql // Generate Simple String DDL format mySchema.simpleString // Generate JSON format mySchema.json
Você também pode especificar a opção schemaHint
no formato DDL de string simples ou no formato JSON usando o PySpark, conforme mostrado no exemplo a seguir:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType mySchema = StructType([ StructField('value', StringType(), True), StructField('count', IntegerType(), True)]) # Generate Simple String DDL format mySchema.simpleString() # Generate JSON format mySchema.json()
Filtros
Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.
O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
Use filter()
para ler um subconjunto de dados de sua coleção do MongoDB.
Considere uma coleção denominada fruit
que contenha os seguintes documentos:
{ "_id" : 1, "type" : "apple", "qty" : 5 } { "_id" : 2, "type" : "orange", "qty" : 10 } { "_id" : 3, "type" : "banana", "qty" : 15 }
Primeiro, configure um objeto DataFrame
para se conectar com sua fonte de dados padrão do MongoDB:
df = spark.read.format("mongodb").load()
O exemplo abaixo inclui somente registros nos quais o campo qty
é maior ou igual a 10
.
df.filter(df['qty'] >= 10).show()
A operação imprime a seguinte saída:
+---+----+------+ |_id| qty| type| +---+----+------+ |2.0|10.0|orange| |3.0|15.0|banana| +---+----+------+
Ao usar filtros com DataFrames ou conjuntos de dados, o código do Conector MongoDB subjacente constrói um pipeline de agregação para filtrar os dados no MongoDB antes de enviá-los ao Spark. Isso melhora o desempenho do Spark ao recuperar e processar somente os dados necessários.
O Conector Spark do MongoDB transforma os seguintes filtros em fases do pipeline de agregação:
And
EqualNullSafe
EqualTo
GreaterThan
GreaterThanOrEqual
In
IsNull
LessThan
LessThanOrEqual
Not
Or
StringContains
StringEndsWith
StringStartsWith
O exemplo a seguir filtra e gera os caracteres com idades inferiores a 100:
df.filter(df("age") < 100).show()
A operação produz o seguinte:
+--------------------+---+-------------+ | _id|age| name| +--------------------+---+-------------+ |[5755d7b4566878c9...| 50|Bilbo Baggins| |[5755d7b4566878c9...| 82| Fíli| |[5755d7b4566878c9...| 77| Kíli| +--------------------+---+-------------+
Queries SQL
Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.
A operação abaixo registra uma tabela characters
e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:
implicitDS.createOrReplaceTempView("characters"); Dataset<Row> centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100"); centenarians.show();
centenarians.show()
gera a saída:
+-------+----+ | name| age| +-------+----+ |Gandalf|1000| | Thorin| 195| | Balin| 178| | Dwalin| 169| | Óin| 167| | Glóin| 158| +-------+----+
Antes de executar consultas SQL em seu DataFrame, você precisa registrar uma tabela temporária.
O exemplo a seguir registra uma tabela temporária chamada temp
e, em seguida, usa o SQL para consultar registros nos quais o campo type
contém a letra e
:
df.createOrReplaceTempView("temp") some_fruit = spark.sql("SELECT type, qty FROM temp WHERE type LIKE '%e%'") some_fruit.show()
No shell pyspark
, a operação imprime a seguinte saída:
+------+----+ | type| qty| +------+----+ | apple| 5.0| |orange|10.0| +------+----+
Antes de executar consultas SQL em seu conjunto de dados, é necessário registrar uma visualização temporária para o conjunto de dados.
A operação abaixo registra uma tabela characters
e, em seguida, a consulta para localizar todos os caracteres com 100 ou mais:
val characters = spark.read.format("mongodb").as[Character] characters.createOrReplaceTempView("characters") val centenarians = spark.sql("SELECT name, age FROM characters WHERE age >= 100") centenarians.show()
Documentação da API
Para saber mais sobre os tipos usados nestes exemplos, consulte a seguinte documentação do Apache Spark API: