Observables
Nesta página
O driver Scala é um driver assíncrono e não bloqueante. Ao implementar o modelo Observable
, eventos assíncronos se tornam operações simples e compostas que estão livres da complexidade das chamadas de resposta aninhadas.
Para operações assíncronas, existem três interfaces:
Observação
O driver é baseado no driver MongoDB Reactive Streams e é uma implementação da especificação de reactive streams. Observable
é uma implementação de Publisher
e Observer
é uma implementação de Subscriber
.
Aplicam-se as seguintes convenções de nomenclatura de classe:
Observable
: implementação personalizada de umPublisher
Observer
: implementação personalizada de umSubscriber
Subscription
Observável
O Observable
é uma implementação do Publisher
estendida e, em geral, representa uma operação MongoDB que emite seus resultados para o Observer
baseado em uma solicitação do Subscription
para o Observable
.
Importante
Observable
pode ser considerado uma função parcial. Como nas funções parciais, nada acontece até que elas sejam chamadas. Um Observable
pode ser inscrito várias vezes, e cada inscrição pode causar novos efeitos colaterais, como fazer query no MongoDB ou inserir dados.
Observável única
A única observável feature é uma Publisher
implementação que retorna somente um único item. Ele pode ser usado da mesma forma que um Observable
comum.
inscrição
Um Subscription
representa um ciclo de vida individual de um Observer
assinando um Observable
. Um Subscription
a um Observable
só pode ser usado por um único Observer
. O objetivo de um Subscription
é controlar a demanda e permitir o cancelamento da assinatura do Observable
.
Observer
Um Observer
fornece o mecanismo para receber notificações baseadas em push do Observable
. A demanda por esses eventos é sinalizada por seu Subscription
.
Após a inscrição de um Observable[TResult]
, o Observer
será passado para o Subscription
embora o método onSubscribe(subscription:
Subscription)
. A demanda por resultados é sinalizada por meio do Subscription
e quaisquer resultados são passados para o método onNext(result:
TResult)
. Se houver um erro por qualquer motivo, o método onError(e:
Throwable)
será chamado e mais nenhum evento será passado para Observer
. Como alternativa, quando Observer
tiver consumido todos os resultados de Observable
, o método onComplete()
será chamado.
Contrapressão
No exemplo a seguir, o Subscription
é usado para controlar a demanda ao iterar um Observable
. A implementação padrão do Observer
solicita automaticamente todos os dados. Abaixo, substituímos o método personalizado onSubscribe()
para que possamos gerenciar a iteração orientada pela demanda do Observable
:
collection.find().subscribe(new Observer[Document](){ var batchSize: Long = 10 var seen: Long = 0 var subscription: Option[Subscription] = None override def onSubscribe(subscription: Subscription): Unit = { this.subscription = Some(subscription) subscription.request(batchSize) } override def onNext(result: Document): Unit = { println(document.toJson()) seen += 1 if (seen == batchSize) { seen = 0 subscription.get.request(batchSize) } } override def onError(e: Throwable): Unit = println(s"Error: $e") override def onComplete(): Unit = println("Completed") })
Auxiliares observáveis
O pacote org.mongodb.scala
fornece interação aprimorada com tipos Publisher
. A funcionalidade estendida inclui assinatura simples por meio de funções anônimas:
// Subscribe with custom onNext: collection.find().subscribe((doc: Document) => println(doc.toJson())) // Subscribe with custom onNext and onError collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e")) // Subscribe with custom onNext, onError and onComplete collection.find().subscribe((doc: Document) => println(doc.toJson()), (e: Throwable) => println(s"There was an error: $e"), () => println("Completed!"))
O pacote org.mongodb.scala
inclui uma classe implícita que também fornece os seguintes operadores monádicas para simplificar o encadeamento e o trabalho com instâncias Publisher
ou Observable
:
GenerateHtmlObservable().andThen({ case Success(html: String) => renderHtml(html) case Failure(t) => renderHttp500 })
A seguinte lista descreve os operadores Monádica disponíveis:
andThen
: permite a cadeia de instâncias doObservable
.collect
: coleta todos os resultados em uma sequência.fallbackTo
: permite voltar para umaObservable
alternativa se houver uma falha.filter
: filtra os resultados deObservable
.flatMap
: Cria um novoObservable
aplicando uma função a cada resultado doObservable
.foldLeft
: cria um novoObservable
que contém o único resultado da função de acumulador aplicada.foreach
: Aplica uma função aplicada a cada resultado emitido.head
: retorna a head doObservable
em umFuture
.map
: Cria um novoObservable
aplicando uma função para cada resultado emitido doObservable
.observeOn
: cria um novoObservable
que utiliza umExecutionContext
específico para operações futuras.recover
: cria um novoObservable
que lidará com qualquer lançamento correspondente que esseObservable
possa conter, atribuindo a ele um valor de outroObservable
.recoverWith
: Cria um novoObservable
que lidará com qualquer lançamento correspondente que esseObservable
possa conter.toFuture
: coleta os resultados deObservable
e os converte emFuture
.transform
: cria um novoObservable
aplicando a funçãoresultFunction
a cada resultado emitido.withFilter
: fornece suporte para compreensão de instâncias doObservable
.zip
: compacta os valores deste e de outroObservable
e cria um novoObservable
contendo a tupla de seus resultados.
Veja o Boxedpublisher Documentação da API para saber mais sobre cada operador.
Observável única
Como SingleObservable[T]
retorna um único item, o método toFuture()
retorna Future[T]
da mesma forma que o método head. Há também um conversor implícito que converte um Publisher
em um SingleObservable
.