Docs Menu
Docs Home
/
MongoDB Atlas
/ /

$validate

이 페이지의 내용

  • 정의
  • 구문
  • 행동
  • 유효성 검사기 예제
  • 예시

$validate 단계에서는 스트리밍 문서가 예상 범위, 값 또는 데이터 유형의 스키마를 준수하는지 확인합니다.

$validate

$validate 파이프라인 단계의 프로토타입 형식은 다음과 같습니다.

{
"$validate": {
"validator": { <filter> },
"validationAction" : "discard" | "dlq"
}
}

$validate 단계에서는 다음 필드가 있는 문서를 사용합니다.

필드
유형
필요성
설명
validator
문서
필수 사항

사용자 정의 스키마에 대해 수신 메시지의 유효성을 검사하는 데 사용되는 표현식 문서입니다. 다음 쿼리 연산자 를 제외한 모든 연산자를 사용하여 유효성 검사 표현식을 정의할 수 있습니다.

  • $near

  • $nearSphere

  • $text

  • $where

validationAction
문자열
옵션

메시지가 사용자 정의 스키마를 위반할 때 취할 조치를 지정합니다. 다음 값 중 하나를 지정할 수 있습니다.

  • discard: 메시지를 삭제합니다. validationAction 에 값을 지정하지 않으면 이것이 기본 동작입니다.

  • dlq: 스트림 프로세서 구성에 정의된 collection에 위반 사항을 기록하고 트랜잭션 보장 없이 최선형 삭제를 수행합니다.

파이프라인의 어느 점에서나 $source 단계 이후, $emit 또는 $merge 단계 이전에 $validate 를 사용할 수 있습니다.

validationAction 필드에 discard 또는 dlq 옵션을 지정하는 경우 Atlas Stream Processing은 유효성 검사에 실패한 메시지를 다음 형식으로 기록합니다.

{
"t": <datetime>,
"s": "<severity-level>",
"c": "streams-<job-name>",
"ctx": "<processed-pipeline>",
"msg": "<message-body>",
"attrs": {
<result-of-logAttributes-evaluation>
},
"tags": <array-of-strings>,
"truncated": {
<truncation-description>
},
"size": <size-of-entry>
}

다음 표에서는 로그 항목 필드에 대해 설명합니다.

필드
유형
설명
attrs
문서
$validate 정의의 logAttributes 필드를 평가한 결과가 포함된 문서입니다. 결과는 필드 목록입니다.
c
문자열
실패가 발생한 특정 Atlas Stream Processing 작업의 이름입니다.
ctx
문자열
처리 중인 스트리밍 데이터 파이프라인의 이름입니다.
msg
문자열
유효성 검사에 실패한 메시지 본문입니다.

Atlas Stream Processing 은 Draft JSON schema 만 4 지원합니다. 또는 그 이전 버전.

다음 문서에서는 $and를 사용하여 논리적 AND 연산을 수행하는 유효성 검사기 표현식의 예를 보여 줍니다.

{
$validate: {
validator: {
$and: [{
$expr: {
$ne: [
"$Racer_Name",
"Pace Car"
]
}
},
{
$jsonSchema: {
required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
properties: {
Racer_Num: {
bsonType: "int",
description: "'Racer_Num' is the integer number of the race car and is required"
},
Racer_Name: {
bsonType: "string",
description: "'Racer_Name' must be a string and is required"
},
lap: {
bsonType: "int",
minimum: 1,
description: "'lap' must be a int and is required"
},
Corner_Num: {
bsonType: "int",
minimum: 1,
maximum: 4,
description: "'Corner_Num' must be a int between 1 and 4 and is required"
},
timestamp: {
bsonType: "string",
pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
description: "'timestamp' must be a string matching iso date pattern and is required"
}
}
}
}]
}, validationAction : "dlq"
}
}

다양한 위치에서 상세한 날씨 보고서를 생성하는 스트리밍 데이터 소스를 고려해 보세요. 다음 예시 집계 파이프라인에서, 문서가 샘플 날씨 데이터 세트의 스키마에 부합하도록 $validate 단계를 포함합니다. 이 집계에는 4단계가 있습니다.

  1. $source 단계는 Apache Kafka 브로커와 연결을 설정하여 my_weatherdata라는 이름의 주제에서 이러한 보고서를 수집하므로, 각 기록이 수집될 때 후속 집계 단계에 전달됩니다.

  2. $validate 단계에서는 문서에 position.coordinatessections 필드에 대한 배열 값이 있는지 확인하여, 배열 값이 있는 문서는 파이프라인의 나머지 부분에 전달하고 배열 값이 없는 문서는 DLQ에 전달합니다.

  3. $match 단계는 wind.speed.rate 값이 30 이상인 문서를 제외하고, wind.speed.rate 값이 30 미만인 문서를 전달합니다.

  4. $merge 단계는 sample_weatherstream 데이터베이스의 stream라는 Atlas 컬렉션에 출력을 기록합니다. 해당 데이터베이스나 컬렉션이 존재하지 않으면 Atlas가 이를 생성합니다.

{
'$source': {
connectionName: 'sample_weatherdata',
topic: 'my_weatherdata',
tsFieldName: 'ingestionTime'
}
},
{
'$validate': {
validator: {
'$jsonSchema': { properties: { position: [Object], sections: [Object] } }
},
validationAction: 'dlq'
}
},
{ '$match': { 'wind.speed.rate': { '$lt': 30 } } },
{
'$merge': {
into: {
connectionName: 'weatherStreamOutput',
db: 'sample_weatherstream',
coll: 'stream'
}
}
}

결과 sample_weatherstream.sample 컬렉션에 있는 문서를 보려면 mongosh를 사용하여 Atlas 클러스터에 연결한 후 다음 명령을 실행하세요.

참고

다음은 대표적인 예시입니다. 스트리밍 데이터는 정적이지 않으며 각 사용자는 서로 다른 문서를 봅니다.

db.getSiblingDB("sample_weatherstream").stream.find()
{
_id: ObjectId('66b25302fe8bbac5f39dbdba'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('168843')
}
},
airTemperature: { quality: '9', value: 3.5 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 10.9 },
tendency: { code: '3', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '1', value: 1015.9 },
stationPressure: { quality: '9', value: 1022.5 }
},
callLetters: 'JIVX',
dataSource: '4',
dewPoint: { quality: '9', value: 20.5 },
elevation: 9999,
extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 },
ingestionTime: ISODate('2024-08-06T16:44:50.322Z'),
liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '7' },
period: { quality: '1', value: 3 }
},
position: { coordinates: [ 120.7, -98.2 ], type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 },
presentWeatherObservationManual: { condition: '90', quality: '1' },
pressure: { quality: '1', value: 1028.2 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 11.1 },
sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 390 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '06' },
lowCloudGenus: { quality: '9', value: '07' },
lowestCloudBaseHeight: { quality: '1', value: 800 },
lowestCloudCoverage: { quality: '9', value: '06' },
midCloudGenus: { quality: '9', value: '07' },
totalCoverage: { opaque: '99', quality: '1', value: '99' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 1200 },
cloudType: { quality: '9', value: '04' },
coverage: { quality: '1', value: '09' }
},
st: 'x+36700+144300',
type: 'FM-13',
visibility: {
distance: { quality: '9', value: 9000 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 9.5, period: 4, quality: '9' }
},
wind: {
direction: { angle: 140, quality: '1' },
speed: { quality: '2', rate: 15.9 },
type: 'N'
}
}

이 컬렉션의 모든 문서가 position.coordinatessections에 대해 예상되는 array 타입의 값을 가지고 있음을 확인합니다. dlq라는 이름의 데드 레터 큐에서 유효성 검증에 실패한 문서를 보려면, 다음 명령어를 실행하세요.

db.getSiblingDB("sample_weatherstream").dlq.find()
{
_id: ObjectId('66b254d3a045fb1406047394'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('168949'),
key: Binary.createFromBase64('', 0),
headers: []
}
},
errInfo: { reason: 'Input document found to be invalid in $validate stage' },
doc: {
airTemperature: { quality: '9', value: 7.6 },
atmosphericPressureChange: {
quantity24Hours: { quality: '9', value: 99.9 },
quantity3Hours: { quality: '1', value: 0.3 },
tendency: { code: '8', quality: '1' }
},
atmosphericPressureObservation: {
altimeterSetting: { quality: '9', value: 1015.9 },
stationPressure: { quality: '1', value: 1017 }
},
callLetters: 'WRGL',
dataSource: '4',
dewPoint: { quality: '9', value: 25.3 },
elevation: 9999,
extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 },
liquidPrecipitation: { condition: '9', period: 99, quality: '9' },
pastWeatherObservationManual: {
atmosphericCondition: { quality: '1', value: '2' },
period: { quality: '1', value: 6 }
},
position: { coordinates: -100.2, type: 'Point' },
precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 },
presentWeatherObservationManual: { condition: '08', quality: '1' },
pressure: { quality: '9', value: 1001 },
qualityControlProcess: 'V020',
seaSurfaceTemperature: { quality: '9', value: 10.4 },
sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ],
skyCondition: {
cavok: 'N',
ceilingHeight: { determination: 'C', quality: '1', value: 240 }
},
skyConditionObservation: {
highCloudGenus: { quality: '1', value: '02' },
lowCloudGenus: { quality: '9', value: '02' },
lowestCloudBaseHeight: { quality: '1', value: 150 },
lowestCloudCoverage: { quality: '1', value: '03' },
midCloudGenus: { quality: '1', value: '06' },
totalCoverage: { opaque: '99', quality: '1', value: '06' }
},
skyCoverLayer: {
baseHeight: { quality: '9', value: 450 },
cloudType: { quality: '9', value: '03' },
coverage: { quality: '1', value: '07' }
},
st: 'x+20500-074300',
type: 'SAO',
visibility: {
distance: { quality: '9', value: 3800 },
variability: { quality: '9', value: 'N' }
},
waveMeasurement: {
method: 'I',
seaState: { code: '00', quality: '9' },
waves: { height: 37.5, period: 7, quality: '9' }
},
wind: {
direction: { angle: 230, quality: '1' },
speed: { quality: '1', rate: 46.3 },
type: 'N'
},
ingestionTime: ISODate('2024-08-06T16:52:35.287Z'),
_stream_meta: {
source: {
type: 'kafka',
topic: 'my_weatherdata',
partition: 0,
offset: Long('168949')
}
}
},
processorName: 'sampleWeather'
}

데드 레터 큐에 있는 모든 문서가 position.coordinates, sections 또는 둘 다에 대해 유효하지 않은 값을 가지고 있는지 확인합니다.

돌아가기

$source