Menu Docs
Página inicial do Docs
/ / /
Scala

Acessar dados de um observável

Nesta página

  • Visão geral
  • Como processar um observável
  • Dados de amostra
  • Usar chamadas de resposta para processar resultados
  • Acesse os resultados da operação de leitura
  • Acesse os resultados da operação de gravação
  • Usar funções lambda para processar resultados
  • Exemplo
  • Use futuros para recuperar todos os resultados
  • Exemplo
  • Documentação da API

Neste guia, você aprenderá como acessar os resultados das operações do MongoDB a partir de uma instância Observable.

Um Observable representa um fluxo de dados emitidos por uma operação ao longo do tempo. Para acessar esses dados, você pode criar uma instância do Observer que assina o Observable.

Observação

O driver Scala é baseado no driver MongoDB Java Reactive Streams. A classe Observable estende a classe Publisher do Java Reactive Streams e inclui métodos de conveniência adicionais para ajudar a processar os resultados.

Para executar uma operação MongoDB e processar seus dados, você deve solicitar os resultados da operação de um Observable. O driver fornece a interface Observable para operações que retornam qualquer número de resultados. As operações que não produzem resultados ou que produzem um resultado, como o método findOne(), retornam um SingleObservable[T]. A parametrização [T] corresponde ao tipo de dados que o SingleObservable manipula.

As operações que podem produzir um número ilimitado de resultados retornam uma instância do tipo Observable[T]. Algumas operações retornam tipos específicos de Observable que fornecem métodos adicionais para filtrar e processar resultados antes de assiná-los. A lista a seguir descreve alguns tipos que permitem encadear métodos específicos de operação ao Observable:

  • FindObservable[T]: retornado pelo método find()

  • DistinctObservable[T]: retornado pelo método distinct()

  • AggregateObservable[T]: retornado pelo método aggregate()

Você pode solicitar os resultados da operação ligando para o método subscribe() no Observable da operação. Passe uma instância da classe Observer como parâmetro para o método subscribe(). Este Observer recebe os resultados da operação do Observable. Em seguida, você pode usar os métodos fornecidos pela classe Observer para imprimir resultados, gerenciar erros e executar processamento de dados adicional.

Para saber mais sobre o processamento de resultados, consulte a seguinte documentação da API:

  • Observável

  • inscrição

  • Observer

Os exemplos neste guia usam a restaurants collection no sample_restaurants banco de dados dos conjuntos de dados de amostra do Atlas . Para acessar essa collection a partir do seu aplicação Scala, crie um MongoClient que se conecte a um Atlas cluster e atribua os seguintes valores às suas variáveis database collection e:

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

Para saber como criar um cluster MongoDB Atlas gratuito e carregar os conjuntos de dados de amostra, consulte o guia Iniciar com Atlas .

Após assinar um Observable[T], você pode utilizar os seguintes métodos de chamada de resposta fornecidos pela classe Observer para acessar os resultados da operação ou gerenciar erros:

  • onNext(result: TResult): chamado quando o Observer recebe novos resultados. Você pode definir a lógica para processar resultados substituindo esse método.

  • onError(e: Throwable): chamado quando a operação gera um erro e impede que o Observer receba mais dados do Observable. Você pode definir a lógica de tratamento de erros substituindo esse método.

  • onComplete(): Chamado quando Observer tiver consumido todos os resultados de Observable. Você pode realizar qualquer processamento de dados final substituindo este método.

As seções seguintes mostram como personalizar estes métodos para processar resultados de operação de leitura e escrita de um Observable.

Para acessar dados recuperados por uma operação de leitura, crie um Observable[T] para armazenar os resultados da operação. Em seguida, assine o observável e substitua os métodos de chamada de resposta da classe Observer para processar os resultados.

Este exemplo faz uma query da collection restaurants para documentos nos quais o valor cuisine é "Czech". Para recuperar e processar resultados, o exemplo atribui um Observable[Document] à operação e executa as seguintes ações:

  • Chama o método subscribe() para assinar o Observable e passa um Observer como parâmetro

  • Substitui o método onNext() para imprimir cada documento recuperado, que são instâncias Document

  • Substitui o método onError() para imprimir quaisquer erros

  • Substitui os métodos onComplete() para imprimir uma mensagem após todos os resultados de Observable serem processados

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

Para acessar dados recuperados por uma operação de gravação, crie um Observable[T] para armazenar os resultados da operação. Em seguida, assine o observável e substitua os métodos de chamada de resposta da classe Observer para processar os resultados.

Este exemplo insere documentos na collection restaurants em que o valor cuisine é "Nepalese". Para recuperar e processar resultados, o exemplo atribui um Observable[InsertManyResult] à operação e executa as seguintes ações:

  • Chama o método subscribe() para assinar o Observable e passa um Observer como parâmetro

  • Substitui o método onNext() para imprimir o resultado da operação de inserção, retornado como InsertManyResult

  • Substitui o método onError() para imprimir quaisquer erros

  • Substitui os métodos onComplete() para imprimir uma mensagem após todos os resultados de Observable serem processados

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

Em vez de substituir explicitamente as funções de chamada de resposta da classe Observer, você pode utilizar funções lambda para processar de forma concisa os resultados da operação. Estas funções permitem a você utilizar a notação de seta => para personalizar a implementação de onNext(), onError() e onComplete().

Dica

Para saber mais sobre as funções lambda, também conhecidas como funções anônimas, consulte o verbete Função anônima na Wikipedia.

Este exemplo faz uma query da collection restaurants para cada valor distinto do campo borough. O código assina o Observable retornado pelo método distinct() e, em seguida, usa funções lambda para imprimir resultados e lidar com erros:

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

Você pode assinar um Observable implicitamente e agregar seus resultados chamando o método toFuture(). Quando você chama toFuture() em um Observable, o driver executa as seguintes ações:

  • Assina o Observable

  • Coleta os itens emitidos pelo Observable e os armazena em uma instância do Future

Em seguida, você pode iterar pelo Future para recuperar os resultados da operação.

Importante

Se Observable o seu contiver um grande número de documentos, chamar o toFuture() método consumirá significativa memória. Se você espera um conjunto de resultados grande, considere usar funções de chamada de resposta ou lambda para acessar os resultados.

Este exemplo faz uma query da collection restaurants para documentos nos quais o valor do campo name é "The Halal Guys". Para acessar os resultados da operação, o código converte Observable em Future, espera até que Future colete cada resultado e itera pelos resultados:

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

Para saber mais sobre qualquer um dos métodos ou tipos discutidos neste guia, consulte a seguinte documentação da API:

Voltar

aggregation de dados