변화를 주시하세요
변경 스트림 열기
다음 객체에서 watch()
메서드를 사용하여 MongoDB의 변경 사항을 확인할 수 있습니다.
각 객체에 대해 watch()
메서드는 변경 스트림을 열어 변경 이벤트 문서가 발생하면 이를 전송합니다.
watch()
메서드는 선택적으로 첫 번째 매개변수로 집계 단계 배열로 구성된 집계 파이프라인을 취합니다. 집계 단계에서는 변경 이벤트를 필터링하고 변환합니다.
다음 스니펫에서 $match
단계는 runtime
값이 15 미만인 모든 변경 이벤트 문서를 일치 결과로 표시하고, 나머지는 모두 필터링합니다.
const pipeline = [ { $match: { runtime: { $lt: 15 } } } ]; const changeStream = myColl.watch(pipeline);
watch()
메서드는 options
객체를 두 번째 매개 변수로 허용합니다. 이 객체로 구성할 수 있는 설정에 관한 자세한 내용은 이 섹션 끝에 있는 링크를 참조하세요.
watch()
메서드는 ChangeStream의 인스턴스를 반환합니다. 변경 스트림을 반복하거나 이벤트를 수신 대기하여 변경 스트림에서 이벤트를 읽을 수 있습니다.
변경 스트림에서 이벤트를 읽는 방식에 해당하는 탭을 선택합니다.
버전 4.12부터, ChangeStream
객체는 비동기 이터러블입니다. 이 변경 사항으로 for-await
루프를 사용하여 열린 변경 스트림에서 이벤트를 검색할 수 있습니다.
for await (const change of changeStream) { console.log("Received change: ", change); }
ChangeStream
객체 에서 다음과 같은 메서드를 호출할 수 있습니다.
hasNext()
스트림 에 남아 있는 문서 확인next()
스트림 의 다음 문서 를 요청 합니다.close()
ChangeStream을 닫습니다.
on()
메서드를 호출하여 ChangeStream
객체에 리스너 함수를 연결할 수 있습니다. 이 메서드는 Javascript EventEmitter
클래스에서 상속됩니다. 아래와 같이 문자열 "change"
를 첫 번째 매개변수로 전달하고 리스너 함수를 두 번째 매개변수로 전달합니다.
changeStream.on("change", (changeEvent) => { /* your listener function */ });
리스너 함수는 change
이벤트 가 방출될 때 트리거됩니다. 변경 이벤트 문서 가 수신되면 리스너에서 로직을 지정하여 이를 프로세스 할 수 있습니다.
pause()
를 호출하여 이벤트 방출을 중지하거나 resume()
을 호출하여 이벤트 방출을 계속하여 변경 스트림을 제어할 수 있습니다.
변경 이벤트 처리 ChangeStream
를 중지하려면 close() 인스턴스 에 대한 메서드입니다. 이렇게 하면 변경 스트림 이 닫히고 리소스가 확보됩니다.
changeStream.close();
경고
EventEmitter
모드와 Iterator
모드에서 ChangeStream
를 동시에 사용하는 것은 드라이버에서 지원하지 않으므로 오류가 발생합니다. 이는 드라이버가 어떤 소비자가 먼저 문서를 받을지 보장하지 못하는 정의되지 않은 행동을 방지하기 위한 것입니다.
예시
반복
다음 예제에서는 insertDB
데이터베이스의 haikus
컬렉션에서 변경 스트림을 열고 변경 이벤트 발생 시 이를 출력합니다.
참고
이 예시를 사용하여 MongoDB 인스턴스에 연결하고 샘플 데이터가 포함된 데이터베이스와 상호 작용할 수 있습니다. MongoDB 인스턴스에 연결하고 샘플 데이터 세트를 로드하는 방법에 대해 자세히 알아보려면 사용 예제 가이드를 참조하세요.
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
1 // Watch for changes in a collection by using a change stream 2 import { MongoClient } from "mongodb"; 3 4 // Replace the uri string with your MongoDB deployment's connection string. 5 const uri = "<connection string uri>"; 6 7 const client = new MongoClient(uri); 8 9 // Declare a variable to hold the change stream 10 let changeStream; 11 12 // Define an asynchronous function to manage the change stream 13 async function run() { 14 try { 15 const database = client.db("insertDB"); 16 const haikus = database.collection("haikus"); 17 18 // Open a Change Stream on the "haikus" collection 19 changeStream = haikus.watch(); 20 21 // Print change events as they occur 22 for await (const change of changeStream) { 23 console.log("Received change:\n", change); 24 } 25 // Close the change stream when done 26 await changeStream.close(); 27 28 } finally { 29 // Close the MongoDB client connection 30 await client.close(); 31 } 32 } 33 run().catch(console.dir);
참고
동일한 코드 스니펫
위의 JavaScript 및 TypeScript 코드 스니펫은 동일합니다. 이 사용 사례와 관련된 드라이버의 TypeScript 특정 기능은 없습니다.
이 코드를 실행한 후 삽입 또는 삭제 작업을 수행하는 등 haikus
컬렉션을 변경하면 터미널에 변경 이벤트 문서가 출력되는 것을 볼 수 있습니다.
예를 들어 컬렉션에 문서를 삽입하면 코드는 다음 출력을 인쇄합니다.
Received change: { _id: { _data: '...' }, operationType: 'insert', clusterTime: new Timestamp({ t: 1675800603, i: 31 }), fullDocument: { _id: new ObjectId("..."), ... }, ns: { db: 'insertDB', coll: 'haikus' }, documentKey: { _id: new ObjectId("...") } }
참고
업데이트에서 전체 문서 받기
업데이트 작업에 대한 정보가 포함된 변경 이벤트는 기본적으로 전체 업데이트된 문서가 아닌 수정된 필드만 반환합니다. 다음과 같이 옵션 객체의 fullDocument
필드를 "updateLookup"
으로 설정하여 문서의 가장 최신 버전도 반환하도록 변경 스트림을 구성할 수 있습니다.
const options = { fullDocument: "updateLookup" }; // This could be any pipeline. const pipeline = []; const changeStream = myColl.watch(pipeline, options);
리스너 기능
다음 예시에서는 haikus
컬렉션의 insertDB
데이터베이스에 있는 변경 스트림을 엽니다. 컬렉션에서 발생하는 변경 이벤트를 수신하고 인쇄하는 리스너 함수를 만들어 보겠습니다.
먼저 컬렉션에서 변경 스트림을 연 다음 on()
메서드를 사용하여 변경 스트림에 수신 대상을 정의합니다. 수신 대상을 설정한 후 컬렉션을 변경하여 변경 이벤트를 생성합니다.
컬렉션에서 변경 이벤트를 생성하기 위해 insertOne()
메서드를 사용하여 새 문서를 추가해 보겠습니다. 수신 함수가 등록되기 전에 insertOne()
가 실행될 수 있으므로 simulateAsyncPause
로 정의된 타이머를 사용하여 삽입을 실행하기 전에 1초를 기다립니다.
또한 문서 삽입 후 simulateAsyncPause
를 사용합니다. 이렇게 하면 리스너 함수가 변경 이벤트를 수신하고 리스너가 실행을 완료한 후 close()
메서드를 사용하여 ChangeStream
인스턴스를 닫기 전에 충분한 시간을 확보할 수 있습니다.
참고
타이머를 포함해야 하는 이유
이 예시에서 사용된 타이머는 데모용으로만 사용됩니다. 리스너를 등록하고 종료하기 전에 리스너가 변경 이벤트를 프로세스할 시간이 충분한지 확인합니다.
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
1 /* Change stream listener */ 2 3 import { MongoClient } from "mongodb"; 4 5 // Replace the uri string with your MongoDB deployment's connection string 6 const uri = "<connection string uri>"; 7 8 const client = new MongoClient(uri); 9 10 const simulateAsyncPause = () => 11 new Promise(resolve => { 12 setTimeout(() => resolve(), 1000); 13 }); 14 15 let changeStream; 16 async function run() { 17 try { 18 const database = client.db("insertDB"); 19 const haikus = database.collection("haikus"); 20 21 // Open a Change Stream on the "haikus" collection 22 changeStream = haikus.watch(); 23 24 // Set up a change stream listener when change events are emitted 25 changeStream.on("change", next => { 26 // Print any change event 27 console.log("received a change to the collection: \t", next); 28 }); 29 30 // Pause before inserting a document 31 await simulateAsyncPause(); 32 33 // Insert a new document into the collection 34 await myColl.insertOne({ 35 title: "Record of a Shriveled Datum", 36 content: "No bytes, no problem. Just insert a document, in MongoDB", 37 }); 38 39 // Pause before closing the change stream 40 await simulateAsyncPause(); 41 42 // Close the change stream and print a message to the console when it is closed 43 await changeStream.close(); 44 console.log("closed the change stream"); 45 } finally { 46 // Close the database connection on completion or error 47 await client.close(); 48 } 49 } 50 run().catch(console.dir);
참고
동일한 코드 스니펫
위의 JavaScript 및 TypeScript 코드 스니펫은 동일합니다. 이 사용 사례와 관련된 드라이버의 TypeScript 특정 기능은 없습니다.
이 페이지에 언급된 클래스 및 메서드에 대한 더 많은 자료를 보려면 다음 리소스를 방문하세요.