Docs Menu
Docs Home
/ / /
Scala

Observable에서 데이터 액세스

이 페이지의 내용

  • 개요
  • Observable을 처리하는 방법
  • 샘플 데이터
  • 콜백을 사용하여 결과 처리
  • 읽기 작업 결과 액세스
  • 쓰기 작업 결과 액세스
  • Lambda 함수를 사용하여 결과 처리
  • 예시
  • 퓨처(Futures)를 사용하여 모든 결과 조회
  • 예시
  • API 문서

이 가이드 에서는 Observable 인스턴스 에서 MongoDB 작업 결과 액세스 방법을 학습 수 있습니다.

Observable 는 시간이 지남에 따라 작업에서 방출하는 데이터 스트림 나타냅니다. 이 데이터 액세스 하려면 Observable를 구독하는 Observer 인스턴스 생성하면 됩니다.

참고

스칼라 운전자 MongoDB Java Reactive Streams 운전자 기반으로 합니다. Observable 클래스는 Java Reactive Streams의 Publisher 클래스를 확장하며 결과를 프로세스 도움이 되는 추가 편의 메서드를 포함하고 있습니다.

MongoDB 작업을 실행 하고 해당 데이터를 프로세스 하려면 Observable에 작업 결과를 요청 해야 합니다. 운전자 원하는 수의 결과를 반환하는 작업에 대해 Observable 인터페이스를 제공합니다. findOne() 메서드와 같이 결과를 생성하지 않거나 하나의 결과를 생성하는 작업은 SingleObservable[T]을 반환합니다. [T] 매개변수화는 SingleObservable 가 처리하는 데이터 유형 에 해당합니다.

제한 없는 수의 결과를 생성할 수 있는 작업은 Observable[T] 유형의 인스턴스 반환합니다. 일부 작업은 결과를 구독 전에 결과를 필터하다 하고 프로세스 추가 메서드를 제공하는 특정 Observable 유형을 반환합니다. 다음 목록에서는 작업별 메서드를 Observable에 연결할 수 있는 몇 가지 유형에 대해 설명합니다.

  • FindObservable[T]: find() 메서드에서 반환

  • DistinctObservable[T]: distinct() 메서드에서 반환

  • AggregateObservable[T]: aggregate() 메서드에서 반환

작업의 Observable에서 subscribe() 메서드를 호출하여 작업 결과를 요청 수 있습니다. Observer 클래스의 인스턴스 subscribe() 메서드에 매개변수로 전달합니다. 이 ObserverObservable에서 작업 결과를 수신합니다. 그런 다음 Observer 클래스에서 제공하는 메서드를 사용하여 결과를 인쇄하고, 오류를 처리하다 , 추가 데이터 처리 수행할 수 있습니다.

결과 처리 에 대해 자세히 학습 다음 API 문서를 참조하세요.

  • 관찰 가능

  • 서브스크립션

  • 관찰자

이 가이드 의 예제에서는 restaurants Atlas 샘플 데이터 sample_restaurants 세트의 데이터베이스 에 있는 컬렉션 사용합니다. 스칼라 애플리케이션 에서 이 컬렉션 액세스 하려면 MongoClient Atlas cluster 에 연결하는 를 만들고 및 변수에 다음 값을 할당합니다.database collection

val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants")
val collection: MongoCollection[Document] = database.getCollection("restaurants")

무료 MongoDB Atlas cluster 를 생성하고 샘플 데이터 세트를 로드하는 방법을 학습 보려면 Atlas 시작하기 가이드 를 참조하세요.

Observable[T]을(를) 구독 후에는 Observer 클래스에서 제공하는 다음 콜백 메서드를 사용하여 작업 결과에 액세스 하거나 오류를 처리하다 .

  • onNext(result: TResult): Observer 이 새 결과를 수신할 때 호출됩니다. 이 메서드를 재정의하여 결과를 처리 위한 로직을 정의할 수 있습니다.

  • onError(e: Throwable): 작업에서 오류를 생성하고 ObserverObservable에서 더 많은 데이터를 수신하는 것을 방지할 때 호출됩니다. 이 메서드를 재정의하여 오류 처리 로직을 정의할 수 있습니다.

  • onComplete(): ObserverObservable의 모든 결과를 소비할 때 호출됩니다. 이 메서드를 재정의하여 최종 데이터 처리 수행할 수 있습니다.

다음 섹션에서는 이러한 메서드를 사용자 지정하여 Observable에서 읽기 및 쓰기 (write) 작업 결과를 프로세스 방법을 보여 줍니다.

읽기 작업으로 검색한 데이터 액세스 하려면 Observable[T] 를 만들어 작업 결과를 저장 합니다. 그런 다음 관찰 가능 항목을 구독 하고 Observer 클래스 콜백 메서드를 재정의하여 결과를 프로세스 .

이 예시 restaurants 컬렉션 에서 cuisine 값이 "Czech"인 문서를 쿼리합니다. 결과를 조회 하고 프로세스 위해 이 예시 작업에 Observable[Document] 를 할당하고 다음 작업을 수행합니다.

  • subscribe() 메서드를 호출하여 Observable 을(를) 구독 하고 Observer 을(를) 매개 변수로 전달합니다.

  • onNext() 메서드를 재정의하여 Document 인스턴스인 조회된 각 문서 인쇄합니다.

  • onError() 메서드를 재정의하여 오류를 출력합니다.

  • Observable 의 모든 결과가 처리된 후 메시지를 인쇄하도록 onComplete() 메서드를 재정의합니다.

val filter = equal("cuisine", "Czech")
val findObservable: Observable[Document] = collection.find(filter)
findObservable.subscribe(new Observer[Document] {
override def onNext(result: Document): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}),
(restaurant_id,BsonString{value='40812870'}))
Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}),
(restaurant_id,BsonString{value='41485121'}))
Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}),
(restaurant_id,BsonString{value='41569184'}))
Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}),
(restaurant_id,BsonString{value='41711983'}))
Processed all results

쓰기 (write) 작업으로 검색한 데이터 액세스 하려면 Observable[T] 를 만들어 작업 결과를 저장 합니다. 그런 다음 관찰 가능 항목을 구독 하고 Observer 클래스 콜백 메서드를 재정의하여 결과를 프로세스 .

이 예시 cuisine 값이 "Nepalese"restaurants 컬렉션 에 문서를 삽입합니다. 결과를 조회 하고 프로세스 위해 이 예시 작업에 Observable[InsertManyResult] 를 할당하고 다음 작업을 수행합니다.

  • subscribe() 메서드를 호출하여 Observable 을(를) 구독 하고 Observer 을(를) 매개 변수로 전달합니다.

  • onNext() 메서드를 재정의하여 삽입 작업의 결과를 출력하며, InsertManyResult로 반환됩니다.

  • onError() 메서드를 재정의하여 오류를 출력합니다.

  • Observable 의 모든 결과가 처리된 후 메시지를 인쇄하도록 onComplete() 메서드를 재정의합니다.

val docs: Seq[Document] = Seq(
Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"),
Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese")
)
val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs)
insertObservable.subscribe(new Observer[InsertManyResult] {
override def onNext(result: InsertManyResult): Unit = println(result)
override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage)
override def onComplete(): Unit = println("Processed all results")
})
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...},
1=BsonObjectId{value=...}}}
Processed all results

Observer 클래스의 콜백 함수를 명시적으로 재정의하는 대신 Lambda 함수를 사용하여 작업 결과를 간결하게 프로세스 할 수 있습니다. 이러한 함수를 사용하면 => 화살표 표기법을 사용하여 onNext(), onError()onComplete()의 구현 사용자 지정할 수 있습니다.

익명 함수라고도 하는 Lambda 함수에 대해 자세히 학습 익명 함수 위키백과 항목을 참조하세요.

이 예시 borough 필드 의 각 고유 값에 대해 restaurants 컬렉션 쿼리합니다. 이 코드는 distinct() 메서드에서 반환된 Observable 를 구독한 다음 Lambda 함수를 사용하여 결과를 출력하고 오류를 처리하다 .

collection.distinct("borough")
.subscribe((value: String) => println(value),
(e: Throwable) => println(s"Failed: $e"),
() => println("Processed all results"))
Bronx
Brooklyn
Manhattan
Missing
Queens
Staten Island
Processed all results

toFuture() 메서드를 호출하여 Observable 를 암시적으로 구독 하고 결과를 집계할 수 있습니다. Observable에서 toFuture() 를 호출하면 운전자 다음 작업을 수행합니다.

  • 다음을 구독합니다. Observable

  • Observable 에서 방출한 항목을 수집하여 Future 인스턴스 에 저장합니다.

그런 다음 Future 를 반복하여 작업 결과를 조회 할 수 있습니다.

중요

Observable 에 많은 수의 문서가 포함된 toFuture() 경우 메서드를 호출하면 상당한 메모리가 소모됩니다. 큰 결과 설정하다 예상되는 경우 콜백 또는 Lambda 함수를 사용하여 결과에 액세스 좋습니다.

이 예시 restaurants 컬렉션 에서 name 필드 의 값이 "The Halal Guys"인 문서를 쿼리합니다. 작업 결과 액세스 위해 코드는 ObservableFuture로 변환하고 Future 가 각 결과를 수집할 때까지 기다렸다가 결과를 반복합니다.

val observable = collection.find(equal("name", "The Halal Guys"))
val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50012258'}))
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}),
(restaurant_id,BsonString{value='50017823'}))

이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.

돌아가기

데이터 집계