Observable에서 데이터 액세스
이 페이지의 내용
개요
이 가이드 에서는 Observable
인스턴스 에서 MongoDB 작업 결과 액세스 방법을 학습 수 있습니다.
Observable
는 시간이 지남에 따라 작업에서 방출하는 데이터 스트림 나타냅니다. 이 데이터 액세스 하려면 Observable
를 구독하는 Observer
인스턴스 생성하면 됩니다.
참고
스칼라 운전자 MongoDB Java Reactive Streams 운전자 기반으로 합니다. Observable
클래스는 Java Reactive Streams의 Publisher
클래스를 확장하며 결과를 프로세스 도움이 되는 추가 편의 메서드를 포함하고 있습니다.
Observable을 처리하는 방법
MongoDB 작업을 실행 하고 해당 데이터를 프로세스 하려면 Observable
에 작업 결과를 요청 해야 합니다. 운전자 원하는 수의 결과를 반환하는 작업에 대해 Observable
인터페이스를 제공합니다. findOne()
메서드와 같이 결과를 생성하지 않거나 하나의 결과를 생성하는 작업은 SingleObservable[T]
을 반환합니다. [T]
매개변수화는 SingleObservable
가 처리하는 데이터 유형 에 해당합니다.
제한 없는 수의 결과를 생성할 수 있는 작업은 Observable[T]
유형의 인스턴스 반환합니다. 일부 작업은 결과를 구독 전에 결과를 필터하다 하고 프로세스 추가 메서드를 제공하는 특정 Observable
유형을 반환합니다. 다음 목록에서는 작업별 메서드를 Observable
에 연결할 수 있는 몇 가지 유형에 대해 설명합니다.
FindObservable[T]
:find()
메서드에서 반환DistinctObservable[T]
:distinct()
메서드에서 반환AggregateObservable[T]
:aggregate()
메서드에서 반환
작업의 Observable
에서 subscribe()
메서드를 호출하여 작업 결과를 요청 수 있습니다. Observer
클래스의 인스턴스 subscribe()
메서드에 매개변수로 전달합니다. 이 Observer
는 Observable
에서 작업 결과를 수신합니다. 그런 다음 Observer
클래스에서 제공하는 메서드를 사용하여 결과를 인쇄하고, 오류를 처리하다 , 추가 데이터 처리 수행할 수 있습니다.
결과 처리 에 대해 자세히 학습 다음 API 문서를 참조하세요.
샘플 데이터
이 가이드 의 예제에서는 restaurants
Atlas 샘플 데이터 sample_restaurants
세트의 데이터베이스 에 있는 컬렉션 사용합니다. 스칼라 애플리케이션 에서 이 컬렉션 액세스 하려면 MongoClient
Atlas cluster 에 연결하는 를 만들고 및 변수에 다음 값을 할당합니다.database
collection
val database: MongoDatabase = mongoClient.getDatabase("sample_restaurants") val collection: MongoCollection[Document] = database.getCollection("restaurants")
무료 MongoDB Atlas cluster 를 생성하고 샘플 데이터 세트를 로드하는 방법을 학습 보려면 Atlas 시작하기 가이드 를 참조하세요.
콜백을 사용하여 결과 처리
Observable[T]
을(를) 구독 후에는 Observer
클래스에서 제공하는 다음 콜백 메서드를 사용하여 작업 결과에 액세스 하거나 오류를 처리하다 .
onNext(result: TResult)
:Observer
이 새 결과를 수신할 때 호출됩니다. 이 메서드를 재정의하여 결과를 처리 위한 로직을 정의할 수 있습니다.onError(e: Throwable)
: 작업에서 오류를 생성하고Observer
가Observable
에서 더 많은 데이터를 수신하는 것을 방지할 때 호출됩니다. 이 메서드를 재정의하여 오류 처리 로직을 정의할 수 있습니다.onComplete()
:Observer
가Observable
의 모든 결과를 소비할 때 호출됩니다. 이 메서드를 재정의하여 최종 데이터 처리 수행할 수 있습니다.
다음 섹션에서는 이러한 메서드를 사용자 지정하여 Observable
에서 읽기 및 쓰기 (write) 작업 결과를 프로세스 방법을 보여 줍니다.
읽기 작업 결과 액세스
읽기 작업으로 검색한 데이터 액세스 하려면 Observable[T]
를 만들어 작업 결과를 저장 합니다. 그런 다음 관찰 가능 항목을 구독 하고 Observer
클래스 콜백 메서드를 재정의하여 결과를 프로세스 .
이 예시 restaurants
컬렉션 에서 cuisine
값이 "Czech"
인 문서를 쿼리합니다. 결과를 조회 하고 프로세스 위해 이 예시 작업에 Observable[Document]
를 할당하고 다음 작업을 수행합니다.
subscribe()
메서드를 호출하여Observable
을(를) 구독 하고Observer
을(를) 매개 변수로 전달합니다.onNext()
메서드를 재정의하여Document
인스턴스인 조회된 각 문서 인쇄합니다.onError()
메서드를 재정의하여 오류를 출력합니다.Observable
의 모든 결과가 처리된 후 메시지를 인쇄하도록onComplete()
메서드를 재정의합니다.
val filter = equal("cuisine", "Czech") val findObservable: Observable[Document] = collection.find(filter) findObservable.subscribe(new Observer[Document] { override def onNext(result: Document): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
Iterable((_id, ...), ..., (name,BsonString{value='Koliba Restaurant'}), (restaurant_id,BsonString{value='40812870'})) Iterable((_id, ...), ..., (name,BsonString{value='Bohemian Beer Garden'}), (restaurant_id,BsonString{value='41485121'})) Iterable((_id,...), ..., (name,BsonString{value='Hospoda'}), (restaurant_id,BsonString{value='41569184'})) Iterable((_id,...), ..., (name,BsonString{value='Olde Prague Tavern'}), (restaurant_id,BsonString{value='41711983'})) Processed all results
쓰기 작업 결과 액세스
쓰기 (write) 작업으로 검색한 데이터 액세스 하려면 Observable[T]
를 만들어 작업 결과를 저장 합니다. 그런 다음 관찰 가능 항목을 구독 하고 Observer
클래스 콜백 메서드를 재정의하여 결과를 프로세스 .
이 예시 cuisine
값이 "Nepalese"
인 restaurants
컬렉션 에 문서를 삽입합니다. 결과를 조회 하고 프로세스 위해 이 예시 작업에 Observable[InsertManyResult]
를 할당하고 다음 작업을 수행합니다.
subscribe()
메서드를 호출하여Observable
을(를) 구독 하고Observer
을(를) 매개 변수로 전달합니다.onNext()
메서드를 재정의하여 삽입 작업의 결과를 출력하며,InsertManyResult
로 반환됩니다.onError()
메서드를 재정의하여 오류를 출력합니다.Observable
의 모든 결과가 처리된 후 메시지를 인쇄하도록onComplete()
메서드를 재정의합니다.
val docs: Seq[Document] = Seq( Document("name" -> "Cafe Himalaya", "cuisine" -> "Nepalese"), Document("name" -> "Taste From Everest", "cuisine" -> "Nepalese") ) val insertObservable: Observable[InsertManyResult] = collection.insertMany(docs) insertObservable.subscribe(new Observer[InsertManyResult] { override def onNext(result: InsertManyResult): Unit = println(result) override def onError(e: Throwable): Unit = println("Failed: " + e.getMessage) override def onComplete(): Unit = println("Processed all results") })
AcknowledgedInsertManyResult{insertedIds={0=BsonObjectId{value=...}, 1=BsonObjectId{value=...}}} Processed all results
Lambda 함수를 사용하여 결과 처리
Observer
클래스의 콜백 함수를 명시적으로 재정의하는 대신 Lambda 함수를 사용하여 작업 결과를 간결하게 프로세스 할 수 있습니다. 이러한 함수를 사용하면 =>
화살표 표기법을 사용하여 onNext()
, onError()
및 onComplete()
의 구현 사용자 지정할 수 있습니다.
팁
익명 함수라고도 하는 Lambda 함수에 대해 자세히 학습 익명 함수 위키백과 항목을 참조하세요.
예시
이 예시 borough
필드 의 각 고유 값에 대해 restaurants
컬렉션 쿼리합니다. 이 코드는 distinct()
메서드에서 반환된 Observable
를 구독한 다음 Lambda 함수를 사용하여 결과를 출력하고 오류를 처리하다 .
collection.distinct("borough") .subscribe((value: String) => println(value), (e: Throwable) => println(s"Failed: $e"), () => println("Processed all results"))
Bronx Brooklyn Manhattan Missing Queens Staten Island Processed all results
퓨처(Futures)를 사용하여 모든 결과 조회
toFuture()
메서드를 호출하여 Observable
를 암시적으로 구독 하고 결과를 집계할 수 있습니다. Observable
에서 toFuture()
를 호출하면 운전자 다음 작업을 수행합니다.
다음을 구독합니다.
Observable
Observable
에서 방출한 항목을 수집하여Future
인스턴스 에 저장합니다.
그런 다음 Future
를 반복하여 작업 결과를 조회 할 수 있습니다.
중요
예시
이 예시 restaurants
컬렉션 에서 name
필드 의 값이 "The Halal Guys"
인 문서를 쿼리합니다. 작업 결과 액세스 위해 코드는 Observable
를 Future
로 변환하고 Future
가 각 결과를 수집할 때까지 기다렸다가 결과를 반복합니다.
val observable = collection.find(equal("name", "The Halal Guys")) val results = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS)) results.foreach(println)
Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50012258'})) Iterable((_id,...), ..., (name,BsonString{value='The Halal Guys'}), (restaurant_id,BsonString{value='50017823'}))
API 문서
이 가이드에서 사용되는 메서드 또는 유형에 대해 자세히 알아보려면 다음 API 설명서를 참조하세요.