Acessar dados de um observável
Nesta página
- Visão geral
- Como processar um observável
- Dados de amostra
- Usar chamadas de resposta para processar resultados
- Acesse os resultados da operação de leitura
- Acesse os resultados da operação de gravação
- Usar funções lambda para processar resultados
- Exemplo
- Use futuros para recuperar todos os resultados
- Exemplo
- Documentação da API
Visão geral
Neste guia, você aprenderá como acessar os resultados das operações do MongoDB a partir de uma instância Observable
.
Um Observable
representa um fluxo de dados emitidos por uma operação ao longo do tempo. Para acessar esses dados, você pode criar uma instância do Observer
que assina o Observable
.
Observação
O driver Scala é baseado no driver MongoDB Java Reactive Streams. A classe Observable
estende a classe Publisher
do Java Reactive Streams e inclui métodos de conveniência adicionais para ajudar a processar os resultados.
Como processar um observável
Para executar uma operação MongoDB e processar seus dados, você deve solicitar os resultados da operação de um Observable
. O driver fornece a interface Observable
para operações que retornam qualquer número de resultados. As operações que não produzem resultados ou que produzem um resultado, como o método findOne()
, retornam um SingleObservable[T]
. A parametrização [T]
corresponde ao tipo de dados que o SingleObservable
manipula.
As operações que podem produzir um número ilimitado de resultados retornam uma instância do tipo Observable[T]
. Algumas operações retornam tipos específicos de Observable
que fornecem métodos adicionais para filtrar e processar resultados antes de assiná-los. A lista a seguir descreve alguns tipos que permitem encadear métodos específicos de operação ao Observable
:
FindObservable[T]
: retornado pelo métodofind()
DistinctObservable[T]
: retornado pelo métododistinct()
AggregateObservable[T]
: retornado pelo métodoaggregate()
Você pode solicitar os resultados da operação ligando para o método subscribe()
no Observable
da operação. Passe uma instância da classe Observer
como parâmetro para o método subscribe()
. Este Observer
recebe os resultados da operação do Observable
. Em seguida, você pode usar os métodos fornecidos pela classe Observer
para imprimir resultados, gerenciar erros e executar processamento de dados adicional.
Para saber mais sobre o processamento de resultados, consulte a seguinte documentação da API:
Dados de amostra
Os exemplos neste guia usam a restaurants
collection no sample_restaurants
banco de dados dos conjuntos de dados de amostra do Atlas . Para acessar essa collection a partir do seu aplicação Scala, crie um MongoClient
que se conecte a um Atlas cluster e atribua os seguintes valores às suas variáveis database
collection
e:
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de amostra, consulte o guia Iniciar com Atlas .
Usar chamadas de resposta para processar resultados
Após assinar um Observable[T]
, você pode utilizar os seguintes métodos de chamada de resposta fornecidos pela classe Observer
para acessar os resultados da operação ou gerenciar erros:
onNext(result: TResult)
: chamado quando oObserver
recebe novos resultados. Você pode definir a lógica para processar resultados substituindo esse método.onError(e: Throwable)
: chamado quando a operação gera um erro e impede que oObserver
receba mais dados doObservable
. Você pode definir a lógica de tratamento de erros substituindo esse método.onComplete()
: Chamado quandoObserver
tiver consumido todos os resultados deObservable
. Você pode realizar qualquer processamento de dados final substituindo este método.
As seções seguintes mostram como personalizar estes métodos para processar resultados de operação de leitura e escrita de um Observable
.
Acesse os resultados da operação de leitura
Para acessar dados recuperados por uma operação de leitura, crie um Observable[T]
para armazenar os resultados da operação. Em seguida, assine o observável e substitua os métodos de chamada de resposta da classe Observer
para processar os resultados.
Este exemplo faz uma query da collection restaurants
para documentos nos quais o valor cuisine
é "Czech"
. Para recuperar e processar resultados, o exemplo atribui um Observable[Document]
à operação e executa as seguintes ações:
Chama o método
subscribe()
para assinar oObservable
e passa umObserver
como parâmetroSubstitui o método
onNext()
para imprimir cada documento recuperado, que são instânciasDocument
Substitui o método
onError()
para imprimir quaisquer errosSubstitui os métodos
onComplete()
para imprimir uma mensagem após todos os resultados deObservable
serem processados
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
Acesse os resultados da operação de gravação
Para acessar dados recuperados por uma operação de gravação, crie um Observable[T]
para armazenar os resultados da operação. Em seguida, assine o observável e substitua os métodos de chamada de resposta da classe Observer
para processar os resultados.
Este exemplo insere documentos na collection restaurants
em que o valor cuisine
é "Nepalese"
. Para recuperar e processar resultados, o exemplo atribui um Observable[InsertManyResult]
à operação e executa as seguintes ações:
Chama o método
subscribe()
para assinar oObservable
e passa umObserver
como parâmetroSubstitui o método
onNext()
para imprimir o resultado da operação de inserção, retornado comoInsertManyResult
Substitui o método
onError()
para imprimir quaisquer errosSubstitui os métodos
onComplete()
para imprimir uma mensagem após todos os resultados deObservable
serem processados
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Usar funções lambda para processar resultados
Em vez de substituir explicitamente as funções de chamada de resposta da classe Observer
, você pode utilizar funções lambda para processar de forma concisa os resultados da operação. Estas funções permitem a você utilizar a notação de seta =>
para personalizar a implementação de onNext()
, onError()
e onComplete()
.
Dica
Para saber mais sobre as funções lambda, também conhecidas como funções anônimas, consulte o verbete Função anônima na Wikipedia.
Exemplo
Este exemplo faz uma query da collection restaurants
para cada valor distinto do campo borough
. O código assina o Observable
retornado pelo método distinct()
e, em seguida, usa funções lambda para imprimir resultados e lidar com erros:
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
Use futuros para recuperar todos os resultados
Você pode assinar um Observable
implicitamente e agregar seus resultados chamando o método toFuture()
. Quando você chama toFuture()
em um Observable
, o driver executa as seguintes ações:
Assina o
Observable
Coleta os itens emitidos pelo
Observable
e os armazena em uma instância doFuture
Em seguida, você pode iterar pelo Future
para recuperar os resultados da operação.
Importante
Se Observable
o seu contiver um grande número de documentos, chamar o toFuture()
método consumirá significativa memória. Se você espera um conjunto de resultados grande, considere usar funções de chamada de resposta ou lambda para acessar os resultados.
Exemplo
Este exemplo faz uma query da collection restaurants
para documentos nos quais o valor do campo name
é "The Halal Guys"
. Para acessar os resultados da operação, o código converte Observable
em Future
, espera até que Future
colete cada resultado e itera pelos resultados:
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
Documentação da API
Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API: