Docs Menu
Docs Home
/ / /
Ruby MongoDB Driver
/

변경 스트림

이 페이지의 내용

  • collection의 변경 사항 보기
  • 데이터베이스의 변경 사항 모니터링
  • cluster의 변경 사항 보기
  • change stream 닫기
  • 변경 스트림 재개하기

MongoDB Server 버전 3.6부터 집계 프레임워크에서 새로운 $changeStream 파이프라인 단계가 지원됩니다. 집계 파이프라인에서 이 단계를 먼저 지정하면 사용자가 특정 collection의 모든 변경 사항에 대해 알림을 보내도록 요청할 수 있습니다. MongoDB 4.0부터 change stream은 collection 외에도 데이터베이스와 cluster에서 지원됩니다.

Ruby 드라이버는 이 새로운 파이프라인 단계를 사용하여 특정 collection, 데이터베이스 또는 cluster의 변경 사항에 대한 알림을 받을 수 있는 API를 제공합니다. 파이프라인 연산자와 애그리게이션 프레임워크를 사용하여 change stream을 직접 생성할 수 있지만, 시간 초과, 네트워크 오류, 다음을 나타내는 서버 오류가 있는 경우 드라이버가 change stream을 한 번 재개하므로 아래에 설명된 드라이버 API를 사용하는 것이 좋습니다. 페일오버가 진행 중이거나 다른 유형의 재개 가능한 오류가 발생한 경우.

change stream에는 서버에서 "majority" 읽기 고려 (read concern)가 필요하거나 읽기 고려 (read concern)가 필요하지 않습니다.

여기 에설명된 문제로 인해 변경 스트림이 JRuby에서 제대로 작동하지 않습니다. . 즉, JRuby는 백그라운드 녹색 스레드의 열거자에서 #next 를 적극적으로 평가하므로 변경 스트림에서 #next 을 호출하면 백그라운드의 루프에서 getMores가 호출됩니다.

collection change stream은 collection에서 #watch 메서드를 호출하여 생성됩니다.

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
collection = client[:test]
stream = collection.watch
collection.insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

알림이 제공되면 받을 수도 있습니다.

stream = collection.watch
enum = stream.to_enum
while doc = enum.next
process(doc)
end

next 메서드는 변경 사항이 제공될 때까지 cluster를 차단하고 폴링합니다. 블로킹 없이 변경 스트림을 반복하려면 try_next 메서드를 사용합니다. 이 메서드는 서버의 변경 사항을 최대 max_await_time_ms 밀리초까지 기다리며, 변경 사항이 수신되지 않으면 nil을 반환합니다. 재개할 수 없는 오류가 있는 경우 nexttry_next 모두 예외를 발생시킵니다. collection의 change stream을 무기한 읽는 예제는 아래 재개하기 섹션을 참조하세요.

change stream은 프레임워크 파이프라인 연산자 형식의 필터를 사용할 수 있습니다.

stream = collection.watch([{'$match' => { 'operationType' => {'$in' => ['insert', 'replace'] } } },
{'$match' => { 'fullDocument.n' => { '$gte' => 1 } } }
])
enum = stream.to_enum
while doc = enum.next
process(doc)
end

change stream은 데이터베이스 내의 모든 collection에 대한 변경 사항과 데이터베이스 삭제와 같은 데이터베이스 전체 이벤트를 알립니다.

데이터베이스 change stream은 객체에서 #watch 메서드를 호출하여 생성됩니다.

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
database = client.database
stream = database.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

cluster change stream은 collection, cluster 내의 모든 데이터베이스의 변경 사항 및 cluster 전체 이벤트를 알립니다.

cluster change stream은 클라이언트 객체(클러스터 객체가 아님)에서 #watch 메서드를 호출하여 생성됩니다.

client = Mongo::Client.new([ '127.0.0.1:27017' ], :database => 'test')
stream = client.watch
client[:test].insert_one(a: 1)
doc = stream.to_enum.next
process(doc)

#close 메서드를 호출하여 변경 스트림을 닫을 수 있습니다.

stream.close

change stream은 초기 애그리게이션과 다음 batch 수신을 위한 getMore 요청이라는 두 가지 유형의 작업으로 구성됩니다.

드라이버는 네트워크 오류가 발생하고 서버가 상태가 변경되었음을 나타내는 오류를 반환하는 경우(예: 더 이상 프라이머리가 아닌 경우) 각 getMore 작업을 자동으로 재시도합니다. 드라이버는 초기 애그리게이션을 다시 시도하지 않습니다.

실질적인 텀 이는 예를 들면 다음과 같습니다.

  • 호출이 cluster에 읽기 설정 (read preference)을 collection.watch 충족할 만큼 가용 노드가 충분하지 않으면 실패합니다."majority"

  • collection.watch 가 성공적으로 반환된 후 cluster가 이후에 election을 경험하거나 노드를 잃은 후 충분히 빠르게 복구되는 경우, next 또는 each 메서드를 통한 change stream 읽기는 애플리케이션에 투명하게 계속 진행됩니다.

변경 사항을 손실하거나 두 번 이상 변경 사항을 처리하지 않고 무기한 안정적으로 변경 사항을 감시하려면 애플리케이션은 change stream에 대한 다시 시작 토큰을 추적하고 드라이버의 자동 다시 시작도 실패하게 하는 확장된 오류 조건이 발생할 경우 change stream을 다시 시작해야 합니다. . 다음 코드 스니펫은 모든 MongoDB 또는 네트워크 오류에 대해 change stream을 무기한 반복하고, resume_token change stream 메서드를 사용하여 재개 토큰을 검색하고, :resume_after 옵션을 사용하여 change stream을 다시 시작하는 예를 보여줍니다.

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
while doc = enum.next
process(doc)
token = stream.resume_token
end
rescue Mongo::Error
sleep 1
end
end

위의 반복은 enum.next 호출에서 차단되며 이 코드를 실행하는 Ruby 프로세스가 종료된 이벤트 처리 재개를 허용하지 않습니다. 드라이버는 change stream에 변경 사항이 없는 경우 무기한 차단하는 대신 (약간 대기 후) nil 을(를) 반환하는 try_next 메서드도 제공합니다. try_next 메서드를 사용하면 특정 요청이 변경 사항을 반환하지 않는 경우에도 각 getMore 요청 후에 재개 토큰을 유지할 수 있으므로 재개 토큰이 oplog의 상단에 유지되며 애플리케이션은 다음을 수행할 수 있습니다. 변경 사항을 처리하는 프로세스가 종료되더라도 유지합니다:

token = nil
loop do
begin
stream = collection.watch([], resume_after: token)
enum = stream.to_enum
doc = enum.try_next
if doc
process(doc)
end
token = stream.resume_token
# Persist +token+ to support resuming processing upon process restart
rescue Mongo::Error
sleep 1
end
end

호출에서 document를 반환하지 않더라도 모든 try_next 호출 후에 change stream에서 재개 토큰을 검색해야 합니다.

재개 토큰은 _id 각 change stream 문서의 필드에도 제공됩니다. _id 필드를 읽는 것은 애플리케이션에서 밖으로 프로젝션될 수 있고 getMore 가 문서를 반환하지 않는 경우 _id 필드만 사용하면 재개 토큰을 진행하지 않기 때문에 권장되지 않습니다.

돌아가기

GridFS.