$validate

Definition

The $validate stage checks that streaming documents conform to a schema of expected ranges, values, or data types.

Syntax

The $validate stage takes a document with the following fields:
Field | Type | Necessity | Description
---|---|---|---
`validator` | document | Required | Expression document used to validate incoming messages against a user-defined schema. You can define the validation expression with any query operator except a small set of unsupported operators.
`validationAction` | string | Optional | Specifies the action to take when a message violates the user-defined schema. You can specify one of the following values: `discard` or `dlq`.
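As a sketch, a $validate stage document combining both fields might look like the following; the schema contents here are illustrative placeholders, not part of the syntax reference:

```javascript
// Illustrative $validate stage document. "deviceId" and its constraint
// are assumptions for the sake of the example.
{
  $validate: {
    // Required: expression document describing the expected message shape.
    validator: {
      $jsonSchema: {
        required: [ "deviceId" ],
        properties: { deviceId: { bsonType: "string" } }
      }
    },
    // Optional: route failing messages to the dead letter queue
    // instead of discarding them.
    validationAction: "dlq"
  }
}
```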
Behavior

You can use $validate at any point in a pipeline after the $source stage and before the $emit or $merge stage.
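For instance, a minimal pipeline might place $validate directly between the source and the sink. In this sketch, the connection names ("kafkaConn", "atlasConn"), the topic, the database and collection names, and the schema are all illustrative assumptions:

```javascript
// Minimal sketch of stage ordering: $validate after $source, before $merge.
let pipeline = [
  { $source: { connectionName: "kafkaConn", topic: "sensors" } },
  {
    $validate: {
      validator: { $jsonSchema: { required: [ "reading" ] } },
      validationAction: "discard"   // silently drop invalid messages
    }
  },
  {
    $merge: {
      into: { connectionName: "atlasConn", db: "iot", coll: "readings" }
    }
  }
];

sp.createStreamProcessor("validatedReadings", pipeline);
```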
If you specify the discard or dlq option for the validationAction field, Atlas Stream Processing logs messages that fail validation in the following format:

```
{
  "t": <datetime>,
  "s": "<severity-level>",
  "c": "streams-<job-name>",
  "ctx": "<processed-pipeline>",
  "msg": "<message-body>",
  "attrs": { <result-of-logAttributes-evaluation> },
  "tags": <array-of-strings>,
  "truncated": { <truncation-description> },
  "size": <size-of-entry>
}
```
The following table describes the log entry fields:

Field | Type | Description
---|---|---
`attrs` | document | Contains the result of the `logAttributes` evaluation for the validation failure.
`c` | string | Name of the specific Atlas Stream Processing job in which the failure occurred.
`ctx` | string | Name of the streaming data pipeline being processed.
`msg` | string | Body of the message that failed validation.
Atlas Stream Processing supports only JSON Schema Draft 4 or earlier.
Validator Example

The following document shows an example validator expression that uses $and to perform a logical AND operation:
```
{
  $validate: {
    validator: {
      $and: [
        {
          $expr: {
            $ne: [ "$Racer_Name", "Pace Car" ]
          }
        },
        {
          $jsonSchema: {
            required: [ "Racer_Num", "Racer_Name", "lap", "Corner_Num", "timestamp" ],
            properties: {
              Racer_Num: {
                bsonType: "int",
                description: "'Racer_Num' is the integer number of the race car and is required"
              },
              Racer_Name: {
                bsonType: "string",
                description: "'Racer_Name' must be a string and is required"
              },
              lap: {
                bsonType: "int",
                minimum: 1,
                description: "'lap' must be an int and is required"
              },
              Corner_Num: {
                bsonType: "int",
                minimum: 1,
                maximum: 4,
                description: "'Corner_Num' must be an int between 1 and 4 and is required"
              },
              timestamp: {
                bsonType: "string",
                pattern: "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}\\.\\d{6}$",
                description: "'timestamp' must be a string matching the ISO date pattern and is required"
              }
            }
          }
        }
      ]
    },
    validationAction: "dlq"
  }
}
```
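The timestamp constraint in the schema above is a plain regular expression, so you can sanity-check candidate values outside the pipeline. The following stand-alone JavaScript sketch (the helper function name is ours, not part of any MongoDB API) applies the same pattern:

```javascript
// The regex is copied from the 'timestamp' property of the validator above:
// a date-time with exactly six fractional-second digits and no zone suffix.
const timestampPattern = /^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6}$/;

// Hypothetical helper for checking a candidate value against the pattern.
function isValidTimestamp(value) {
  return typeof value === "string" && timestampPattern.test(value);
}

console.log(isValidTimestamp("2024-08-06T16:44:50.322123")); // true
console.log(isValidTimestamp("2024-08-06T16:44:50.322Z"));   // false: three fractional digits and a zone suffix
```

Messages whose timestamp fails this pattern would be routed according to validationAction, here to the DLQ.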
Example

Consider a streaming data source that generates detailed weather reports from different locations. The following example aggregation pipeline includes a $validate stage to ensure that documents conform to the schema of the Sample Weather Dataset. The aggregation has four stages:

- The $source stage establishes a connection with the Apache Kafka broker collecting these reports in a topic named my_weatherdata, passing each record to the subsequent aggregation stages as it is ingested.
- The $validate stage checks whether documents have array values for the position.coordinates and sections fields, passing documents that do to the rest of the pipeline and routing documents that don't to the DLQ.
- The $match stage excludes documents whose wind.speed.rate value is greater than or equal to 30 and passes along documents whose wind.speed.rate value is less than 30.
- The $merge stage writes the output to an Atlas collection named stream in the sample_weatherstream database. If no such database or collection exists, Atlas creates them.
```
{
  '$source': {
    connectionName: 'sample_weatherdata',
    topic: 'my_weatherdata',
    tsFieldName: 'ingestionTime'
  }
},
{
  '$validate': {
    validator: {
      '$jsonSchema': {
        properties: {
          position: [Object],
          sections: [Object]
        }
      }
    },
    validationAction: 'dlq'
  }
},
{
  '$match': { 'wind.speed.rate': { '$lt': 30 } }
},
{
  '$merge': {
    into: {
      connectionName: 'weatherStreamOutput',
      db: 'sample_weatherstream',
      coll: 'stream'
    }
  }
}
```
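mongosh truncates nested documents as [Object] in the listing above, so the schema details are hidden. Based on the description of the $validate stage in this example, the hidden schema presumably constrains both fields to arrays, along these lines (a reconstruction, not the verbatim processor definition):

```javascript
// Plausible expansion of the truncated [Object] placeholders:
// both fields must hold array values to pass validation.
{
  '$validate': {
    validator: {
      '$jsonSchema': {
        properties: {
          position: {
            bsonType: "object",
            properties: { coordinates: { bsonType: "array" } }
          },
          sections: { bsonType: "array" }
        }
      }
    },
    validationAction: 'dlq'
  }
}
```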
To view the documents in the resulting sample_weatherstream.stream collection, connect to your Atlas cluster with mongosh and run the following command:

Note

The following is a representative example. Streaming data is not static, and each user sees distinct documents.
```
db.getSiblingDB("sample_weatherstream").stream.find()
```
```
{ _id: ObjectId('66b25302fe8bbac5f39dbdba'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('168843') } }, airTemperature: { quality: '9', value: 3.5 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 10.9 }, tendency: { code: '3', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '1', value: 1015.9 }, stationPressure: { quality: '9', value: 1022.5 } }, callLetters: 'JIVX', dataSource: '4', dewPoint: { quality: '9', value: 20.5 }, elevation: 9999, extremeAirTemperature: { code: 'N', period: 99.9, quantity: '9', value: -30.4 }, ingestionTime: ISODate('2024-08-06T16:44:50.322Z'), liquidPrecipitation: { condition: '9', depth: 7000, period: 12, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '7' }, period: { quality: '1', value: 3 } }, position: { coordinates: [ 120.7, -98.2 ], type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 999 }, presentWeatherObservationManual: { condition: '90', quality: '1' }, pressure: { quality: '1', value: 1028.2 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 11.1 }, sections: [ 'UG1', 'MA1', 'GA3', 'KA1', 'UA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 390 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '06' }, lowCloudGenus: { quality: '9', value: '07' }, lowestCloudBaseHeight: { quality: '1', value: 800 }, lowestCloudCoverage: { quality: '9', value: '06' }, midCloudGenus: { quality: '9', value: '07' }, totalCoverage: { opaque: '99', quality: '1', value: '99' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 1200 }, cloudType: { quality: '9', value: '04' }, coverage: { quality: '1', value: '09' } }, st: 'x+36700+144300', type: 'FM-13', visibility: { distance: { quality: '9', value: 9000 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 9.5, period: 4, quality: '9' } }, wind: { direction: { angle: 140, quality: '1' }, speed: { quality: '2', rate: 15.9 }, type: 'N' } }
```
Observe that all documents in this collection have the expected array-type values for position.coordinates and sections. To view documents that failed validation, for a dead letter queue named dlq, run the following command:
```
db.getSiblingDB("sample_weatherstream").dlq.find()
```
```
{ _id: ObjectId('66b254d3a045fb1406047394'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('168949'), key: Binary.createFromBase64('', 0), headers: [] } }, errInfo: { reason: 'Input document found to be invalid in $validate stage' }, doc: { airTemperature: { quality: '9', value: 7.6 }, atmosphericPressureChange: { quantity24Hours: { quality: '9', value: 99.9 }, quantity3Hours: { quality: '1', value: 0.3 }, tendency: { code: '8', quality: '1' } }, atmosphericPressureObservation: { altimeterSetting: { quality: '9', value: 1015.9 }, stationPressure: { quality: '1', value: 1017 } }, callLetters: 'WRGL', dataSource: '4', dewPoint: { quality: '9', value: 25.3 }, elevation: 9999, extremeAirTemperature: { code: 'M', period: 99.9, quantity: '1', value: -30.9 }, liquidPrecipitation: { condition: '9', period: 99, quality: '9' }, pastWeatherObservationManual: { atmosphericCondition: { quality: '1', value: '2' }, period: { quality: '1', value: 6 } }, position: { coordinates: -100.2, type: 'Point' }, precipitationEstimatedObservation: { discrepancy: '5', estimatedWaterDepth: 17 }, presentWeatherObservationManual: { condition: '08', quality: '1' }, pressure: { quality: '9', value: 1001 }, qualityControlProcess: 'V020', seaSurfaceTemperature: { quality: '9', value: 10.4 }, sections: [ 'GA2', 'GA1', 'KA1', 'AA1', 'OA1' ], skyCondition: { cavok: 'N', ceilingHeight: { determination: 'C', quality: '1', value: 240 } }, skyConditionObservation: { highCloudGenus: { quality: '1', value: '02' }, lowCloudGenus: { quality: '9', value: '02' }, lowestCloudBaseHeight: { quality: '1', value: 150 }, lowestCloudCoverage: { quality: '1', value: '03' }, midCloudGenus: { quality: '1', value: '06' }, totalCoverage: { opaque: '99', quality: '1', value: '06' } }, skyCoverLayer: { baseHeight: { quality: '9', value: 450 }, cloudType: { quality: '9', value: '03' }, coverage: { quality: '1', value: '07' } }, st: 'x+20500-074300', type: 'SAO', visibility: { distance: { quality: '9', value: 3800 }, variability: { quality: '9', value: 'N' } }, waveMeasurement: { method: 'I', seaState: { code: '00', quality: '9' }, waves: { height: 37.5, period: 7, quality: '9' } }, wind: { direction: { angle: 230, quality: '1' }, speed: { quality: '1', rate: 46.3 }, type: 'N' }, ingestionTime: ISODate('2024-08-06T16:52:35.287Z'), _stream_meta: { source: { type: 'kafka', topic: 'my_weatherdata', partition: 0, offset: Long('168949') } } }, processorName: 'sampleWeather' }
```
Note that every document in the dead letter queue has invalid values for position.coordinates, sections, or both.