문서 메뉴
문서 홈
/ / /
Java 동기화
/

변화를 주시하세요

change stream을 열어 collection, 데이터베이스 또는 배포서버에 대한 변경 사항과 같은 MongoDB의 데이터 변경 사항을 추적할 수 있습니다. change stream을 통해 애플리케이션은 데이터의 변경 사항을 감시하고 이에 대응할 수 있습니다.

change stream은 변경 사항 발생 시 변경 이벤트 문서를 반환합니다. 변경 이벤트에는 업데이트 데이터에 대한 정보가 포함되어 있습니다.

다음 코드 예제와 같이 MongoCollection, MongoDatabase 또는 MongoClient 객체에서 watch() 메서드를 호출하여 변경 스트림을 엽니다.

ChangeStreamIterable<Document> changeStream = database.watch();

watch() 메서드는 선택적으로 여러 단계의 배열로 구성된 애그리게이션 파이프라인을 첫 번째 매개변수로 사용하여 다음과 같이 변경 이벤트 출력을 필터링하고 변환합니다:

List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.lt("fullDocument.runtime", 15)));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline);

watch() 메서드는 결과에 액세스, 구성 및 탐색할 수 있는 여러 매서드를 제공하는 클래스인 ChangeStreamIterable의 인스턴스를 반환합니다. 또한 ChangeStreamIterable은(는) 핵심 Java 인터페이스 Iterable을(를) 구현하는 상위 클래스인 MongoIterable의 메서드를 상속합니다.

ChangeStreamIterable에서 forEach()를 호출하여 이벤트가 발생할 때 처리하거나, 결과를 탐색하는 데 사용할 수 있는 MongoCursor 인스턴스를 반환하는 iterator() 메서드를 사용할 수 있습니다.

MongoCursor에서 hasNext()와 같은 메서드를 호출해 추가 결과가 있는지 확인하고 next()를 호출해 collection의 다음 문서를 반환하거나 tryNext()를 호출해 변경 스트림에서 사용 가능한 다음 요소 또는 null를 즉시 반환할 수 있습니다. 다른 쿼리에서 반환되는 MongoCursor와 달리 변경 스트림과 연결된 MongoCursor는 변경 이벤트가 도착할 때까지 기다렸다가 next()에서 결과를 반환합니다. 따라서 변경 스트림의 MongoCursor를 사용해 next()를 호출하면 java.util.NoSuchElementException이 반환되지 않습니다.

change stream에서 반환된 문서를 처리하기 위한 옵션을 구성하려면 watch()에서 반환한 ChangeStreamIterable 객체의 멤버 메서드를 사용합니다. 사용 가능한 메서드에 대한 자세한 내용은 이 예시 하단에 있는 ChangeStreamIterable API 문서 링크를 참조하세요.

변경 내역에서 이벤트를 캡처하려면 아래와 같이 콜백 함수를 사용하여 forEach() 메서드를 호출합니다.

changeStream.forEach(event -> System.out.println("Change observed: " + event));

콜백 함수는 변경 이벤트가 발생할 때 트리거됩니다. 이벤트 문서가 수신되면 이를 처리하기 위해 콜백에서 논리를 지정할 수 있습니다.

중요

forEach()는 현재 스레드를 차단함

forEach() 호출은 해당 변경 스트림이 이벤트를 수신하는 동안 현재 스레드를 차단합니다. 프로그램에서 요청을 처리하거나 사용자 입력에 응답하는 등 다른 로직을 계속 실행해야 하는 경우 별도의 스레드에서 변경 스트림을 생성하고 수신하는 것이 좋습니다.

참고

업데이트 작업 변경 이벤트의 경우 변경 스트림은 기본적으로 업데이트된 문서 전체가 아니라 수정된 필드만 반환합니다. 다음과 같이 값이 FullDocument.UPDATE_LOOKUPChangeStreamIterable 객체의 fullDocument() 멤버 메서드를 호출하여 문서의 최신 버전도 반환하도록 변경 스트림을 구성할 수 있습니다.

ChangeStreamIterable<Document> changeStream = database.watch()
.fullDocument(FullDocument.UPDATE_LOOKUP);

다음 예에서는 change stream을 사용하여 두 개의 개별 애플리케이션으로 변경 사항을 수신하는 방법을 보여줍니다.

  • Watch로 명명된 첫 번째 애플리케이션은 sample_mflix 데이터베이스의 movies 컬렉션에서 변경 스트림을 엽니다. WatchoperationType을 기준으로 변경 사항을 필터링하기 위해 애그리게이션 파이프라인을 사용하여 삽입 및 업데이트 이벤트만 수신합니다(삭제는 생략되어 제외됨). Watch는 콜백을 사용하여 컬렉션에서 발생하는 필터링된 변경 이벤트를 수신하고 출력합니다.

  • WatchCompanion으로 명명된 두 번째 애플리케이션은 sample_mflix 데이터베이스의 movies 컬렉션에 단일 문서를 삽입합니다. 다음으로 WatchCompanion이 새 필드 값으로 문서를 업데이트합니다. 마지막으로 WatchCompanion이 문서를 삭제합니다.

먼저 Watch를 실행하여 컬렉션에서 변경 스트림을 열고 forEach() 메서드를 사용하여 변경 스트림에 콜백을 정의합니다. Watch가 실행되는 동안 WatchCompanion을 실행하여 컬렉션을 변경하고 변경 이벤트를 생성합니다.

참고

이 예제에서는 연결 URI를 사용하여 MongoDB 인스턴스에 연결합니다. MongoDB 인스턴스에 연결하는 방법에 대해 자세히 알아보려면 연결 가이드를 참조하세요.

Watch:

package usage.examples;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.client.ChangeStreamIterable;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.changestream.FullDocument;
public class Watch {
public static void main( String[] args ) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
List<Bson> pipeline = Arrays.asList(
Aggregates.match(
Filters.in("operationType",
Arrays.asList("insert", "update"))));
ChangeStreamIterable<Document> changeStream = database.watch(pipeline)
.fullDocument(FullDocument.UPDATE_LOOKUP);
// variables referenced in a lambda must be final; final array gives us a mutable integer
final int[] numberOfEvents = {0};
changeStream.forEach(event -> {
System.out.println("Received a change to the collection: " + event);
if (++numberOfEvents[0] >= 2) {
System.exit(0);
}
});
}
}
}

WatchCompanion:

package usage.examples;
import java.util.Arrays;
import org.bson.Document;
import org.bson.types.ObjectId;
import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.result.InsertOneResult;
public class WatchCompanion {
public static void main(String[] args) {
// Replace the uri string with your MongoDB deployment's connection string
String uri = "<connection string uri>";
try (MongoClient mongoClient = MongoClients.create(uri)) {
MongoDatabase database = mongoClient.getDatabase("sample_mflix");
MongoCollection<Document> collection = database.getCollection("movies");
try {
InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document"));
System.out.println("Success! Inserted document id: " + insertResult.getInsertedId());
UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update"));
System.out.println("Updated " + updateResult.getModifiedCount() + " document.");
DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update"));
System.out.println("Deleted " + deleteResult.getDeletedCount() + " document.");
} catch (MongoException me) {
System.err.println("Unable to insert, update, or replace due to an error: " + me);
}
}
}
}

앞의 애플리케이션을 순서대로 실행하면 다음과 유사한 Watch 애플리케이션의 출력이 표시됩니다. 애그리게이션 파이프라인이 delete 작업을 필터링하므로 insertupdate 작업만 출력됩니다.

Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='insert'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=null,
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}
Received a change to the collection: ChangeStreamDocument{
operationType=OperationType{value='update'},
resumeToken={"_data": "825E..."},
namespace=sample_mflix.movies,
destinationNamespace=null,
fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}},
documentKey={"_id": {"$oid": "5ec3..."}},
clusterTime=Timestamp{...},
updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}},
txnNumber=null,
lsid=null,
wallTime=BsonDateTime{value=1657...}
}

또한 다음과 유사한 WatchCompanion 애플리케이션의 출력도 표시됩니다.

Success! Inserted document id: BsonObjectId{value=5ec3...}
Updated 1 document.
Deleted 1 document.

레거시 API

레거시 API를 사용하는 경우 FAQ 페이지를 참조하여 코드 예제의 어떤 부분을 변경해야는지 확인하세요.

이 페이지에 언급된 클래스 및 메서드에 대한 추가 정보는 다음 리소스를 참조하세요.

  • Change Streams 서버 수동 입력

  • 변경 이벤트 서버 수동 입력

  • 집계 파이프라인 서버 수동 입력

  • 집계 단계 서버 수동 입력

  • ChangeStreamIterable API 설명서

  • MongoCollection.watch() API 설명서

  • MongoDatabase.watch() API 설명서

  • MongoClient.watch() API 문서

돌아가기

대량 작업 수행