Change Streams 열기
이 페이지의 내용
개요
이 가이드에서는 change stream을 사용하여 데이터베이스의 실시간 변경 사항을 모니터링하는 방법을 배울 수 있습니다. change stream은 애플리케이션이 단일 collection, 데이터베이스 또는 배포에서 데이터 변경 사항을 구독할 수 있도록 하는 MongoDB Server 기능입니다.
애그리게이션 연산자 세트를 지정하여 애플리케이션이 수신하는 데이터를 필터링하고 변환할 수 있습니다. MongoDB deployment v6.0 이상에 연결할 때 변경 전후의 문서 데이터를 포함하도록 이벤트를 구성할 수도 있습니다.
다음 섹션에서 change stream을 열고 구성하는 방법을 알아보세요.
변경 스트림 열기
change stream을 열어 특정 유형의 데이터 변경 사항을 구독하고 애플리케이션에서 변경 이벤트를 생성할 수 있습니다.
관찰할 범위 선택
change stream을 열려면 watch()
MongoCollection
MongoDatabase
instance, 또는 인스턴스에서 메서드를 MongoClient
호출합니다.
중요
독립형 MongoDB 배포는 변경 스트림을 지원하지 않는데, 이는 이 기능에 복제본 세트 oplog가 필요하기 때문입니다. oplog 에 대해 자세히 알아보려면 복제본 세트 oplog MongoDB Server 매뉴얼 페이지를 참조하세요.
watch()
메서드를 호출하는 객체 에 따라 변경 스트림 수신하는 이벤트 범위가 결정됩니다.
MongoCollection.watch()
컬렉션 모니터링합니다.MongoDatabase.watch()
데이터베이스 의 모든 컬렉션을 모니터링합니다.MongoClient.watch()
연결된 MongoDB deployment 의 모든 변경 사항을 모니터링합니다.
이벤트 필터링
watch()
메서드는 다음과 같이 변경 이벤트 출력을 필터하다 하고 변환하는 데 사용할 수 있는 단계 목록으로 구성된 선택적 집계 파이프라인 첫 번째 매개변수로 사용합니다.
List<Bson> pipeline = List.of( Aggregates.match( Filters.in("operationType", List.of("insert", "update"))), Aggregates.match( Filters.lt("fullDocument.runtime", 15))); ChangeStreamIterable<Document> changeStream = database.watch(pipeline);
참고
업데이트 작업 변경 이벤트의 경우 변경 스트림은 기본적으로 업데이트된 문서 전체가 아니라 수정된 필드만 반환합니다. 다음과 같이 값이 FullDocument.UPDATE_LOOKUP
인 ChangeStreamIterable
객체의 fullDocument()
멤버 메서드를 호출하여 문서의 최신 버전도 반환하도록 변경 스트림을 구성할 수 있습니다.
ChangeStreamIterable<Document> changeStream = database.watch() .fullDocument(FullDocument.UPDATE_LOOKUP);
출력 관리
watch()
메서드는 결과에 액세스 , 구성하고, 탐색할 수 있는 여러 메서드를 제공하는 인터페이스인 ChangeStreamIterable
의 인스턴스 반환합니다. 또한 ChangeStreamIterable
는 핵심 Java 인터페이스 Iterable
를 구현하는 상위 인터페이스인 MongoIterable
의 메서드를 상속합니다.
ChangeStreamIterable
에서 forEach()
를 호출하여 이벤트가 발생할 때 처리하거나, 결과를 탐색하는 데 사용할 수 있는 MongoChangeStreamCursor
인스턴스를 반환하는 iterator()
메서드를 사용할 수 있습니다.
MongoChangeStreamCursor
인스턴스 에서 다음 메서드를 호출할 수 있습니다.
hasNext()
: 더 많은 결과가 있는지 확인합니다.next()
: 컬렉션 의 다음 문서 반환합니다.tryNext()
: 변경 스트림 에서 사용 가능한 다음 요소를 즉시 반환하거나null
중요
커서 반복으로 인해 현재 스레드 차단
forEach()
또는 iterator()
메서드를 사용하여 커서 반복하면 해당 변경 스트림 이벤트를 수신하는 동안 현재 스레드가 차단됩니다. 프로그램에서 요청을 처리 하거나 사용자 입력에 응답하는 등 다른 로직을 계속 실행해야 하는 경우 별도의 스레드에서 변경 스트림 생성하고 수신하는 것이 좋습니다.
다른 쿼리에서 반환되는 MongoCursor
와 달리 변경 스트림 과 연결된 MongoChangeStreamCursor
는 변경 이벤트 도착할 때까지 기다렸다가 next()
에서 결과를 반환합니다. 따라서 변경 스트림의 MongoChangeStreamCursor
를 사용하여 next()
를 호출하면 java.util.NoSuchElementException
가 발생하지 않습니다.
변경 스트림에서 반환된 문서를 처리하기 위한 옵션을 구성하려면 watch()
에서 반환한 ChangeStreamIterable
객체의 멤버 메서드를 사용합니다. 사용 가능한 메서드에 대한 자세한 내용은 이 예시 하단에 있는 ChangeStreamIterable
API 문서 링크를 참조하세요.
예시
이 예제에서는 myColl
collection에서 change stream을 열고 change stream 이벤트가 발생할 때 출력하는 방법을 보여 줍니다.
드라이버는 ChangeStreamIterable
유형의 변수에 change stream 이벤트를 저장합니다. 다음 예제에서는 드라이버가 ChangeStreamIterable
객체를 Document
유형으로 채워야 한다고 지정합니다. 결과적으로 드라이버는 개별 change stream 이벤트를 ChangeStreamDocument
객체로 저장합니다.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch(); changeStream.forEach(event -> System.out.println("Received a change: " + event));
collection에 대한 삽입 작업은 다음과 같은 출력을 생성합니다.
Received a change: ChangeStreamDocument{ operationType=insert, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
보기 예시: 전체 파일
참고
설정 예시
이 예시 연결 URI를 사용하여 MongoDB 인스턴스 에 연결합니다. MongoDB 인스턴스 에 연결하는 방법에 대해 자세히 학습 MongoClient 만들기 가이드 참조하세요. 이 예시 movies
sample_mflix
Atlas 샘플 데이터 세트에 포함된 데이터베이스 의 컬렉션 도 사용합니다. Atlas 시작하기가이드에 따라 MongoDB Atlas 의 무료 계층 에서 데이터베이스 에 로드할 수 있습니다.
이 예시 watch 메서드를 사용하여 변경 스트림 여는 방법을 보여 줍니다. Watch.java
파일 파이프라인 인수로 사용하여 watch()
메서드를 호출하여 "insert"
및 "update"
이벤트만 필터하다 합니다. WatchCompanion.java
파일 문서 삽입, 업데이트 및 삭제합니다.
다음 예제를 사용하려면 이 순서대로 파일을 실행 .
Watch.java
파일을 실행합니다.WatchCompanion.java
파일을 실행합니다.
참고
Watch.java
파일 WatchCompanion.java
파일 실행 때까지 계속 실행 됩니다.
Watch.java
:
/** * This file demonstrates how to open a change stream by using the Java driver. * It connects to a MongoDB deployment, accesses the "sample_mflix" database, and listens * to change events in the "movies" collection. The code uses a change stream with a pipeline * to only filter for "insert" and "update" events. */ package org.example; import java.util.Arrays; import java.util.List; import org.bson.Document; import org.bson.conversions.Bson; import com.mongodb.client.ChangeStreamIterable; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; import com.mongodb.client.model.changestream.FullDocument; import com.mongodb.client.model.Aggregates; public class Watch { public static void main( String[] args ) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); // Creates instructions to match insert and update operations List<Bson> pipeline = Arrays.asList( Aggregates.match( Filters.in("operationType", Arrays.asList("insert", "update")))); // Creates a change stream that receives change events for the specified operations ChangeStreamIterable<Document> changeStream = database.watch(pipeline) .fullDocument(FullDocument.UPDATE_LOOKUP); final int[] numberOfEvents = {0}; // Prints a message each time the change stream receives a change event, until it receives two events changeStream.forEach(event -> { System.out.println("Received a change to the collection: " + event); if (++numberOfEvents[0] >= 2) { System.exit(0); } }); } } }
WatchCompanion.java
:
// Performs CRUD operations to generate change events when run with the Watch application package org.example; import org.bson.Document; import com.mongodb.MongoException; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.result.InsertOneResult; import com.mongodb.client.model.Updates; import com.mongodb.client.result.UpdateResult; import com.mongodb.client.result.DeleteResult; public class WatchCompanion { public static void main(String[] args) { // Replace the uri string with your MongoDB deployment's connection string String uri = "<connection string uri>"; try (MongoClient mongoClient = MongoClients.create(uri)) { MongoDatabase database = mongoClient.getDatabase("sample_mflix"); MongoCollection<Document> collection = database.getCollection("movies"); try { // Inserts a sample document into the "movies" collection and print its ID InsertOneResult insertResult = collection.insertOne(new Document("test", "sample movie document")); System.out.println("Inserted document id: " + insertResult.getInsertedId()); // Updates the sample document and prints the number of modified documents UpdateResult updateResult = collection.updateOne(new Document("test", "sample movie document"), Updates.set("field2", "sample movie document update")); System.out.println("Updated " + updateResult.getModifiedCount() + " document."); // Deletes the sample document and prints the number of deleted documents DeleteResult deleteResult = collection.deleteOne(new Document("field2", "sample movie document update")); System.out.println("Deleted " + deleteResult.getDeletedCount() + " document."); // Prints a message if any exceptions occur during the operations } catch (MongoException me) { System.err.println("Unable to insert, update, or replace due to an error: " + me); } } } }
전체 파일 예시 출력
앞의 애플리케이션은 다음과 같은 출력을 생성합니다.
Watch.java
집계 파이프라인 delete
작업을 필터링하므로 insert
및 update
작업만 캡처합니다.
Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='insert'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=null, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} } Received a change to the collection: ChangeStreamDocument{ operationType=OperationType{value='update'}, resumeToken={"_data": "825E..."}, namespace=sample_mflix.movies, destinationNamespace=null, fullDocument=Document{{_id=5ec3..., test=sample movie document, field2=sample movie document update}}, documentKey={"_id": {"$oid": "5ec3..."}}, clusterTime=Timestamp{...}, updateDescription=UpdateDescription{removedFields=[], updatedFields={"field2": "sample movie document update"}}, txnNumber=null, lsid=null, wallTime=BsonDateTime{value=1657...} }
WatchCompanion
완료된 작업의 요약을 인쇄합니다.
Inserted document id: BsonObjectId{value=5ec3...} Updated 1 document. Deleted 1 document.
watch()
메서드에 대해 자세히 알아보려면 다음 API 문서를 참조하세요.
change stream에 애그리게이션 연산자 적용
집계 파이프라인을 watch()
메서드에 매개변수로 전달하여 change stream이 수신할 이벤트를 지정할 수 있습니다.
사용 중인 MongoDB Server 버전이 지원하는 애그리게이션 연산자를 알아보려면 변경 스트림 출력 수정을 참조하세요.
예시
다음 코드 예제에서는 집계 파이프라인을 적용하여 삽입 및 업데이트 작업에 대해서만 이벤트를 수신하도록 change stream을 구성하는 방법을 보여 줍니다.
MongoCollection<Document> collection = database.getCollection("myColl"); List<Bson> pipeline = Arrays.asList( Aggregates.match(Filters.in("operationType", Arrays.asList("insert", "update")))); ChangeStreamIterable<Document> changeStream = collection.watch(pipeline); changeStream.forEach(event -> System.out.println("Received a change to the collection: " + event));
collection에 대한 업데이트 작업은 다음과 같은 출력을 생성합니다.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, ... }
대규모 change stream 이벤트 분할
MongoDB 7.0부터는 $changeStreamSplitLargeEvent
애그리게이션 단계를 사용하여 16MB를 초과하는 이벤트를 더 작은 조각으로 분할할 수 있습니다.
꼭 필요한 경우에만 $changeStreamSplitLargeEvent
를 사용합니다. 예를 들어, 애플리케이션에 전체 문서 사전 또는 사후 이미지가 필요하고 16MB를 초과하는 이벤트를 생성하는 경우 $changeStreamSplitLargeEvent
를 사용합니다.
$changeStreamSplitLargeEvent 단계는 프래그먼트를 순차적으로 반환합니다. change stream 커서를 사용하여 프래그먼트에 액세스할 수 있습니다. 각 프래그먼트에는 다음 필드를 포함하는 SplitEvent
객체가 포함되어 있습니다.
필드 | 설명 |
---|---|
| 에서 시작하는 프래그먼트의 인덱스 |
| 분할 이벤트를 구성하는 총 프래그먼트 수 |
다음 예에서는 $changeStreamSplitLargeEvent
애그리게이션 단계를 사용하여 대규모 이벤트를 split하여 change stream을 수정합니다.
ChangeStreamIterable<Document> changeStream = collection.watch( List.of(Document.parse("{ $changeStreamSplitLargeEvent: {} }")));
참고
집계 파이프라인에는 $changeStreamSplitLargeEvent
단계가 하나만 있을 수 있으며 이 단계가 파이프라인의 마지막 단계여야 합니다.
다음 예와 같이 change stream 커서에서 getSplitEvent()
메서드를 호출하여 SplitEvent
에 액세스할 수 있습니다.
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = changeStream.cursor(); SplitEvent event = cursor.tryNext().getSplitEvent();
$changeStreamSplitLargeEvent
집계 단계에 대한 자세한 내용은 $changeStreamSplitLargeEvent 서버 설명서를 참조하세요.
사전 이미지 및 사후 이미지 포함하기
다음 데이터를 포함하거나 생략하도록 변경 이벤트를 구성할 수 있습니다.
사전 이미지 작업 전의 문서의 버전을 나타내는 문서(있는 경우)
사후 이미지 작업 후의 문서의 버전을 나타내는 문서(있는 경우)
중요
배포에서 MongoDB v6.0 이상을 사용하는 경우에만 collection에서 사전 및 사후 이미지를 활성화할 수 있습니다.
사전 이미지 또는 사후 이미지가 포함된 변경 스트림 이벤트를 수신하려면 다음 작업을 수행해야 합니다.
MongoDB deployment에서 collection에 대한 사전 이미지 및 사후 이미지를 활성화합니다.
팁
배포에서 사전 및 사후 이미지를 활성화하는 방법에 대한 자세한 내용은 서버 설명서의 Change Streams with Document Pre- and Post-Images(이미지 사전 및 사후 문서화의 Change Streams)를 참조하세요.
사전 이미지 및 사후 이미지가 활성화된 컬렉션을 생성하도록 드라이버에 지시하는 방법을 알아보려면 사전 이미지 및 사후 이미지가 활성화 된 컬렉션 생성 섹션을 참조하세요.
사전 이미지와 사후 이미지 중 하나 또는 둘 다를 검색하도록 change stream을 구성합니다.
팁
변경 이벤트에 사전 이미지를 기록하도록 변경 스트림을 구성하려면 사전 이미지 구성 예시를 참조하세요.
변경 이벤트에 사후 이미지를 기록하도록 변경 스트림을 구성하려면 사후 이미지 구성 예시를 참조하세요.
사전 이미지 및 사후 이미지를 활성화하여 collection 만들기
드라이버를 사용하여 사전 이미지 및 사후 이미지 옵션이 활성화된 collection을 생성하려면 ChangeStreamPreAndPostImagesOptions
createCollection()
다음 예와 같이 인스턴스를 지정하고 메서드를 호출합니다.
CreateCollectionOptions collectionOptions = new CreateCollectionOptions(); collectionOptions.changeStreamPreAndPostImagesOptions(new ChangeStreamPreAndPostImagesOptions(true)); database.createCollection("myColl", collectionOptions);
MongoDB Shell에서 collMod
명령을 실행하여 기존 컬렉션의 사전 이미지 및 사후 이미지 옵션을 변경할 수 있습니다. 이 작업을 수행하는 방법을 알아보려면 MongoDB Server 매뉴얼의 collMod 항목을 참조하세요.
경고
collection에서 사전 이미지 또는 사후 이미지를 활성화한 경우 collMod
으)로 이러한 설정을 수정하면 해당 collection의 기존 change stream이 실패할 수 있습니다.
사전 이미지 구성 예시
다음 코드 예시에서는 사전 myColl
이미지를 포함하고 이벤트를 출력하도록 collection에서 change stream을 구성하는 방법을 보여 줍니다.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocumentBeforeChange(FullDocumentBeforeChange.REQUIRED); changeStream.forEach(event -> System.out.println("Received a change: " + event));
앞의 예에서는 FullDocumentBeforeChange.REQUIRED
옵션을 사용하도록 change stream을 구성합니다. 이 옵션은 교체, 업데이트 및 삭제 이벤트에 대해 사전 이미지를 요구하도록 change stream을 구성합니다. 사전 이미지를 사용할 수 없는 경우 드라이버에서 오류가 발생합니다.
문서의 amount
필드 값을 150
에서 2000
로 업데이트한다고 가정해 보겠습니다. 이 변경 이벤트는 다음과 같은 출력을 생성합니다.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=null, fullDocumentBeforeChange=Document{{_id=..., amount=150, ...}}, ... }
옵션 목록은 FullDocumentBeforeChange 를 참조하세요. API 문서.
사후 이미지 구성 예시
다음 코드 예시에서는 사전 myColl
이미지를 포함하고 이벤트를 출력하도록 collection에서 change stream을 구성하는 방법을 보여 줍니다.
MongoCollection<Document> collection = database.getCollection("myColl"); ChangeStreamIterable<Document> changeStream = collection.watch() .fullDocument(FullDocument.WHEN_AVAILABLE); changeStream.forEach(event -> System.out.println("Received a change: " + event));
앞의 예에서는 FullDocument.WHEN_AVAILABLE
옵션을 사용하도록 change stream을 구성합니다. 이 옵션은 사용 가능한 경우 교체 및 업데이트 이벤트에 대해 수정된 문서의 사후 이미지를 반환하도록 change stream을 구성합니다.
문서의 color
필드 값을 "purple"
에서 "pink"
로 업데이트한다고 가정해 보겠습니다. 변경 이벤트는 다음과 같은 출력을 생성합니다.
Received a change: ChangeStreamDocument{ operationType=update, resumeToken={"_data": "..."}, namespace=myDb.myColl, destinationNamespace=null, fullDocument=Document{{_id=..., color=purple, ...}}, updatedFields={"color": purple}, ... }
옵션 목록은 FullDocument 를 참조하세요. API 문서.
추가 정보
API 문서
변경 스트림을 관리 데 사용되는 메서드 및 클래스에 대한 자세한 내용은 다음 API 설명서를 참조하세요.