Menu Docs
Página inicial do Docs
/ / /
Scala
/

Observables

Nesta página

  • Observável
  • Observável única
  • inscrição
  • Observer
  • Contrapressão
  • Auxiliares observáveis
  • Observável única

O driver Scala é um driver assíncrono e não bloqueante. Ao implementar o modelo Observable , eventos assíncronos se tornam operações simples e compostas que estão livres da complexidade das chamadas de resposta aninhadas.

Para operações assíncronas, existem três interfaces:

  • Observável

  • inscrição

  • Observer

Observação

O driver é baseado no driver MongoDB Reactive Streams e é uma implementação da especificação de reactive streams. Observable é uma implementação de Publisher e Observer é uma implementação de Subscriber.

Aplicam-se as seguintes convenções de nomenclatura de classe:

  • Observable: implementação personalizada de um Publisher

  • Observer: implementação personalizada de um Subscriber

  • Subscription

O Observable é uma implementação do Publisher estendida e, em geral, representa uma operação MongoDB que emite seus resultados para o Observer baseado em uma solicitação do Subscription para o Observable.

Importante

Observable pode ser considerado uma função parcial. Como nas funções parciais, nada acontece até que elas sejam chamadas. Um Observable pode ser inscrito várias vezes, e cada inscrição pode causar novos efeitos colaterais, como fazer query no MongoDB ou inserir dados.

A única observável feature é uma Publisher implementação que retorna somente um único item. Ele pode ser usado da mesma forma que um Observable comum.

Um Subscription representa um ciclo de vida individual de um Observer assinando um Observable. Um Subscription a um Observable só pode ser usado por um único Observer. O objetivo de um Subscription é controlar a demanda e permitir o cancelamento da assinatura do Observable.

Um Observer fornece o mecanismo para receber notificações baseadas em push do Observable. A demanda por esses eventos é sinalizada por seu Subscription.

Após a inscrição de um Observable[TResult], o Observer será passado para o Subscription embora o método onSubscribe(subscription: Subscription) . A demanda por resultados é sinalizada por meio do Subscription e quaisquer resultados são passados para o método onNext(result: TResult) . Se houver um erro por qualquer motivo, o método onError(e: Throwable) será chamado e mais nenhum evento será passado para Observer. Como alternativa, quando Observer tiver consumido todos os resultados de Observable, o método onComplete() será chamado.

No exemplo a seguir, o Subscription é usado para controlar a demanda ao iterar um Observable. A implementação padrão do Observer solicita automaticamente todos os dados. Abaixo, substituímos o método personalizado onSubscribe() para que possamos gerenciar a iteração orientada pela demanda do Observable:

collection.find().subscribe(new Observer[Document](){
var batchSize: Long = 10
var seen: Long = 0
var subscription: Option[Subscription] = None
override def onSubscribe(subscription: Subscription): Unit = {
this.subscription = Some(subscription)
subscription.request(batchSize)
}
override def onNext(result: Document): Unit = {
println(document.toJson())
seen += 1
if (seen == batchSize) {
seen = 0
subscription.get.request(batchSize)
}
}
override def onError(e: Throwable): Unit = println(s"Error: $e")
override def onComplete(): Unit = println("Completed")
})

O pacote org.mongodb.scala fornece interação aprimorada com tipos Publisher . A funcionalidade estendida inclui inscrição simples por meio de funções anônimas:

// Subscribe with custom onNext:
collection.find().subscribe((doc: Document) => println(doc.toJson()))
// Subscribe with custom onNext and onError
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"))
// Subscribe with custom onNext, onError and onComplete
collection.find().subscribe((doc: Document) => println(doc.toJson()),
(e: Throwable) => println(s"There was an error: $e"),
() => println("Completed!"))

O pacote org.mongodb.scala inclui uma classe implícita que também fornece os seguintes operadores monádicas para simplificar o encadeamento e o trabalho com instâncias Publisher ou Observable :

GenerateHtmlObservable().andThen({
case Success(html: String) => renderHtml(html)
case Failure(t) => renderHttp500
})

A seguinte lista descreve os operadores Monádica disponíveis:

  • andThen: permite a cadeia de instâncias do Observable .

  • collect: coleta todos os resultados em uma sequência.

  • fallbackTo: permite voltar para uma Observable alternativa se houver uma falha.

  • filter: filtra os resultados de Observable.

  • flatMap: Cria um novo Observable aplicando uma função a cada resultado do Observable.

  • foldLeft: cria um novo Observable que contém o único resultado da função de acumulador aplicada.

  • foreach: Aplica uma função aplicada a cada resultado emitido.

  • head: retorna a head do Observable em um Future.

  • map: Cria um novo Observable aplicando uma função para cada resultado emitido do Observable.

  • observeOn: cria um novo Observable que utiliza um ExecutionContext específico para operações futuras.

  • recover: cria um novo Observable que lidará com qualquer lançamento correspondente que esse Observable possa conter, atribuindo a ele um valor de outro Observable.

  • recoverWith: Cria um novo Observable que lidará com qualquer lançamento correspondente que esse Observable possa conter.

  • toFuture: coleta os resultados de Observable e os converte em Future.

  • transform: cria um novo Observable aplicando a função resultFunction a cada resultado emitido.

  • withFilter: fornece suporte para compreensão de instâncias do Observable .

  • zip: compacta os valores deste e de outro Observable e cria um novo Observable contendo a tupla de seus resultados.

Veja o Boxedpublisher Documentação da API para saber mais sobre cada operador.

Como SingleObservable[T] retorna um único item, o método toFuture() retorna Future[T] da mesma forma que o método head. Há também um conversor implícito que converte um Publisher em um SingleObservable.

Voltar

Monitoramento