변화를 주시하세요
변경 스트림을 열어 collection, 데이터베이스 또는 배포서버에 대한 변경 사항과 같은 MongoDB의 데이터 변경 사항을 추적할 수 있습니다. 변경 스트림을 통해 애플리케이션은 데이터의 변경 사항을 감시하고 이에 대응할 수 있습니다.
변경 스트림은 변경 사항 발생 시 변경 이벤트 문서를 반환합니다. 변경 이벤트에는 업데이트 데이터에 대한 정보가 포함되어 있습니다.
다음 코드 예제와 같이 MongoCollection
, MongoDatabase
또는 MongoClient
객체에서 watch()
메서드를 호출하여 변경 스트림을 엽니다.
ChangeStreamIterable<Document> changeStream = database.watch();
watch()
메서드는 선택적으로 여러 단계의 배열로 구성된 집계 파이프라인을 첫 번째 매개변수로 사용하여 다음과 같이 변경 이벤트 출력을 필터링하고 변환합니다:
List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
watch()
메서드는 결과에 액세스, 구성 및 탐색할 수 있는 여러 매서드를 제공하는 클래스인 ChangeStreamIterable
의 인스턴스를 반환합니다. 또한 ChangeStreamIterable
은(는) 핵심 Java 인터페이스 Iterable
을(를) 구현하는 상위 클래스인 MongoIterable
의 메서드를 상속합니다.
ChangeStreamIterable
에서 forEach()
를 호출하여 이벤트가 발생할 때 처리하거나, 결과를 탐색하는 데 사용할 수 있는 MongoCursor
인스턴스를 반환하는 iterator()
메서드를 사용할 수 있습니다.
MongoCursor
에서 hasNext()
와 같은 메서드를 호출해 추가 결과가 있는지 확인하고 next()
를 호출해 collection의 다음 문서를 반환하거나 tryNext()
를 호출해 변경 스트림에서 사용 가능한 다음 요소 또는 null
를 즉시 반환할 수 있습니다. 다른 쿼리에서 반환되는 MongoCursor
와 달리 변경 스트림과 연결된 MongoCursor
는 변경 이벤트가 도착할 때까지 기다렸다가 next()
에서 결과를 반환합니다. 따라서 변경 스트림의 MongoCursor
를 사용해 next()
를 호출하면 java.util.NoSuchElementException
이 반환되지 않습니다.
변경 스트림에서 반환된 문서를 처리하기 위한 옵션을 구성하려면 watch()
에서 반환한 ChangeStreamIterable
객체의 멤버 메서드를 사용합니다. 사용 가능한 메서드에 대한 자세한 내용은 이 예시 하단에 있는 ChangeStreamIterable
API 문서 링크를 참조하세요.
콜백으로 변경 스트림 이벤트를 처리하는 방법
변경 스트림에서 이벤트를 캡처하려면 아래와 같이 콜백 함수를 사용하여 forEach()
메서드를 호출합니다.
changeStream.forEach(event -> System.out.println("Change observed: " + event));
콜백 함수는 변경 이벤트가 발생할 때 트리거됩니다. 이벤트 문서가 수신되면 이를 처리하기 위해 콜백에서 논리를 지정할 수 있습니다.
중요
forEach()는 현재 스레드를 차단함
forEach()
호출은 해당 변경 스트림이 이벤트를 수신하는 동안 현재 스레드를 차단합니다. 프로그램에서 요청을 처리하거나 사용자 입력에 응답하는 등 다른 로직을 계속 실행해야 하는 경우 별도의 스레드에서 변경 스트림을 생성하고 수신하는 것이 좋습니다.
참고
업데이트 작업 변경 이벤트의 경우 변경 스트림은 기본적으로 업데이트된 문서 전체가 아니라 수정된 필드만 반환합니다. 다음과 같이 값이 FullDocument.UPDATE_LOOKUP
인 ChangeStreamIterable
객체의 fullDocument()
멤버 메서드를 호출하여 문서의 최신 버전도 반환하도록 변경 스트림을 구성할 수 있습니다.
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
예시
다음 예에서는 변경 스트림을 사용하여 두 개의 개별 애플리케이션으로 변경 사항을 수신하는 방법을 보여줍니다.
Watch
로 명명된 첫 번째 애플리케이션은sample_mflix
데이터베이스의movies
컬렉션에서 변경 스트림을 엽니다.Watch
는operationType
을 기준으로 변경 사항을 필터링하기 위해 집계 파이프라인을 사용하여 삽입 및 업데이트 이벤트만 수신합니다(삭제는 생략되어 제외됨).Watch
는 콜백을 사용하여 컬렉션에서 발생하는 필터링된 변경 이벤트를 수신하고 출력합니다.WatchCompanion
으로 명명된 두 번째 애플리케이션은sample_mflix
데이터베이스의movies
컬렉션에 단일 문서를 삽입합니다. 다음으로WatchCompanion
이 새 필드 값으로 문서를 업데이트합니다. 마지막으로WatchCompanion
이 문서를 삭제합니다.
먼저 Watch
를 실행하여 컬렉션에서 변경 스트림을 열고 forEach()
메서드를 사용하여 변경 스트림에 콜백을 정의합니다. Watch
가 실행되는 동안 WatchCompanion
을 실행하여 컬렉션을 변경하고 변경 이벤트를 생성합니다.
참고
이 예시 에서는 연결 URI를 사용하여 MongoDB 인스턴스 에 연결합니다. MongoDB 인스턴스 에 연결하는 방법에 학습 보려면 연결 가이드 를 참조하세요.
Watch
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package usage.examples; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion
:
// Performs CRUD operations to generate change events when run with the Watch application package usage.examples; import java.util.Arrays; import org.bson.Document; import org.bson.types.ObjectId; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Success! Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
앞의 애플리케이션을 순서대로 실행하면 다음과 유사한 Watch
애플리케이션의 출력이 표시됩니다. 집계 파이프라인이 delete
작업을 필터링하므로 insert
및 update
작업만 출력됩니다.
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
또한 다음과 유사한 WatchCompanion
애플리케이션의 출력도 표시됩니다.
Success! Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
이 페이지에 언급된 클래스 및 메서드에 대한 추가 정보는 다음 리소스를 참조하세요.
Change Streams 서버 수동 입력
변경 이벤트 서버 수동 입력
집계 파이프라인 서버 수동 입력
집계 단계 서버 수동 입력
ChangeStreamIterable API 설명서
MongoCollection.watch() API 설명서
MongoDatabase.watch() API 설명서
MongoClient.watch() API 문서