Docs Menu
Docs Home
/
MongoDB Atlas
/

ストリーム プロセッサの管理

項目一覧

  • 前提条件
  • Considerations
  • ストリーム プロセッサを対話的に作成する
  • Atlas Stream Processing インスタンスに接続します。
  • パイプラインを定義します。
  • ストリーム プロセッサを作成します。
  • ストリーム プロセッサを作成する
  • ストリーム プロセッサを起動する
  • ストリーム プロセッサの停止
  • ストリーム プロセッサの削除
  • 使用可能なストリーム プロセッサを一覧表示する
  • ストリーム プロセッサからのサンプル
  • ストリーム プロセッサの統計情報を表示する

Atlas Stream Processing ストリーム プロセッサは、一意の名前を持つストリーム集計パイプラインのロジックをストリーミング データに適用します。 Atlas Stream Processing は、各ストリーム プロセッサの定義を永続ストレージに保存し、再利用できるようにします。 特定のストリーム プロセッサは、その定義が保存されているストリーム プロセシング インスタンス内でのみ使用できます。 Atlas Stream Processing は、ワーカーごとに最大4ストリーム プロセッサをサポートします。 この制限を超える追加のプロセッサには、Atlas Stream Processing は新しいリソースを割り当てます。

ストリーム プロセッサを作成および管理するには、次のものが必要です。

多くのストリーム プロセッサ コマンドでは、メソッド呼び出しで関連するストリーム プロセッサの名前を指定する必要があります。 次のセクションで説明される構文では、厳密に英数字の名前を使用することを前提としています。 ストリーム プロセッサの名前にハイフン(-)や完全停止(.)など、英数字以外の文字が含まれている場合は、名前を角括弧([])とdouble引用符("")で囲む必要があります。メソッド呼び出し(sp.["special-name-stream"].stats() と同様)。

sp.process()メソッドを使用して、ストリーム プロセッサを対話的に作成できます。 対話的に作成したストリーム プロセッサは、次の動作を示します。

  • 出力とデッドレター キューのドキュメントを shell に書込む

  • 作成後すぐに実行を開始

  • 10分間、またはユーザーがそれらを停止するまで実行します

  • 停止した後に永続化しない

対話的に作成するストリーム プロセッサは、プロトタイプ作成を目的としています。 永続的なストリーム プロセッサを作成するには、「 ストリーム プロセッサの作成 」を参照してください。

sp.process() の構文は次のとおりです。

sp.process(<pipeline>)
フィールド
タイプ
必要性
説明
pipeline
配列
必須
データのストリーミング配信に適用するストリーム集約パイプライン
1

Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

次のコマンドは、SCRAM-SHA-256 認証を使用して、 streamOwnerという名前のユーザーとして Atlas Stream Processing インスタンスに接続します。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

ユーザーのパスワードの入力を求められたら、入力します。

2

mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。

次の例では、接続レジストリ内のmyKafka接続のstuffトピックを$sourceとして使用し、 temperatureフィールドの値が46であるレコードを照合し、処理されたメッセージをoutputに出力します。接続レジストリにあるmySink接続のトピック。

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

次のコマンドは、 pipelineで定義されたロジックを適用するストリーム プロセッサを作成します。

sp.process(pipeline)

ストリーム プロセッサを作成するには、次の手順に従います。

Atlas管理APIは、ストリームプロセッサを作成するためのエンドポイントを提供します。

1 つのストリーム プロセッサを作成

mongoshを使用して新しいストリーム プロセッサを作成するには、 sp.createStreamProcessor()メソッドを使用します。 構文は次のとおりです。

sp.createStreamProcessor(<name>, <pipeline>, <options>)
Argument
タイプ
必要性
説明
name
string
必須
ストリーム プロセッサの論理名。 これは、ストリーム プロセシング インスタンス内で一意である必要があります。 この名前には、英数字のみを含める必要があります。
pipeline
配列
必須
データのストリーミング配信に適用するストリーム集約パイプライン
options
オブジェクト
任意
ストリーム プロセッサのさまざまなオプション設定を定義するオブジェクト。
options.dlq
オブジェクト
条件付き
ストリーム プロセシング インスタンスにデッド レター キューを割り当てるオブジェクト。 このフィールドは、 optionsフィールドを定義する場合に必要です。
options.dlq.connectionName
string
条件付き
接続レジストリ内の接続を識別する、人間が判読可能なラベル。 この接続は Atlas クラスターを参照する必要があります。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。
options.dlq.db
string
条件付き
options.dlq.connectionNameで指定されたクラスター上の Atlas データベースの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。
options.dlq.coll
string
条件付き
options.dlq.dbで指定されるデータベース内のコレクションの名前。 このフィールドは、 options.dlqフィールドを定義する場合に必要です。
1

Atlas Stream Processing インスタンスに関連付けられた接続文字列を使用して接続し、 mongosh を使用して接続します。

次のコマンドは、SCRAM-SHA-256 認証を使用して、 streamOwnerという名前のユーザーとして Atlas Stream Processing インスタンスに接続します。

mongosh "mongodb://atlas-stream-78xq9gi1v4b4288x06a73e9f-zi30g.virginia-usa.a.query.mongodb-qa.net/?authSource=%24external&authMechanism=MONGODB-X509" \\
--tls --authenticationDatabase admin --username streamOwner

ユーザーのパスワードの入力を求められたら、入力します。

2

mongoshプロンプトで、 pipelineという名前の変数に適用する集計ステージを含む配列を割り当てます。

次の例では、接続レジストリ内のmyKafka接続のstuffトピックを$sourceとして使用し、 temperatureフィールドの値が46であるレコードを照合し、処理されたメッセージをoutputに出力します。接続レジストリにあるmySink接続のトピック。

pipeline = [
{$source: {"connectionName": "myKafka", "topic": "stuff"}},
{$match: { temperature: 46 }},
{
"$emit": {
"connectionName": "mySink",
"topic" : "output",
}
}
]
3

mongoshプロンプトで、DLQ の次のプロパティを含むオブジェクトを割り当てます。

  • connectionName

  • databaseName

  • コレクション名

次の例では、 metadata.dlqデータベース コレクション内のcluster01接続を介したDLQ を定義しています。

deadLetter = {
dlq: {
connectionName: "cluster01",
db: "metadata",
coll: "dlq"
}
}
4

次のコマンドは、 pipelineで定義されたロジックを適用するproc01という名前のストリーム プロセッサを作成します。 処理でエラーをスローするドキュメントは、 deadLetterで定義されたDLQ に書き込まれます。

sp.createStreamProcessor("proc01", pipeline, deadLetter)

ストリームプロセッサを起動するには、次のようにします。

Atlas管理APIは、ストリームプロセッサを起動するためのエンドポイントを提供します。

1 つのストリーム プロセッサを起動

mongoshを使用して既存のストリーム プロセッサを起動するには、 sp.<streamprocessor>.start()メソッドを使用します。 <streamprocessor>は、現在のストリーム プロセシング インスタンスに定義されているストリーム プロセッサの名前である必要があります。

たとえば、 proc01という名前のストリーム プロセッサを起動するには、次のコマンドを実行します。

sp.proc01.start()

このメソッドは以下を返します。

  • true ストリーム プロセッサが存在し、現在実行中でない場合。

  • false 存在しない、または存在し、現在実行中のストリーム プロセッサを起動しようとした場合。

ストリームプロセッサを停止するには:

Atlas Administration APIは、ストリーム プロセッサを停止するためのエンドポイントを提供します。

1 つのストリーム プロセッサを停止

mongoshで既存のストリーム プロセッサを停止するには、 sp.<streamprocessor>.stop()メソッドを使用します。 <streamprocessor>は、現在のストリーム プロセシング インスタンスに定義されている現在実行中のストリーム プロセッサの名前である必要があります。

たとえば、 proc01という名前のストリーム プロセッサを停止するには、次のコマンドを実行します。

sp.proc01.stop()

このメソッドは以下を返します。

  • true ストリーム プロセッサが存在し、現在実行中の場合。

  • false ストリーム プロセッサが存在しない場合、またはストリーム プロセッサが現在実行中でない場合は になります。

ストリーム プロセッサを削除するには:

Atlas管理APIは、ストリームプロセッサを削除するためのエンドポイントを提供します。

1 つのストリーム プロセッサを削除

mongoshを使用して既存のストリーム プロセッサを削除するには、 sp.<streamprocessor>.drop()メソッドを使用します。 <streamprocessor>は、現在のストリーム プロセシング インスタンスに定義されているストリーム プロセッサの名前である必要があります。

たとえば、 proc01という名前のストリーム プロセッサを削除するには、次のコマンドを実行します。

sp.proc01.drop()

このメソッドは以下を返します。

  • true ストリーム プロセッサが存在する場合。

  • false ストリーム プロセッサが存在しない場合。

ストリーム プロセッサを削除すると、Atlas Stream Processing がそのストリーム用にプロビジョニングしたすべてのリソースと、保存されたすべての状態が破棄されます。

使用可能なすべてのストリームプロセッサを一覧表示するには:

Atlas Administration APIには、利用可能なすべてのストリーム プロセッサを一覧表示するためのエンドポイントが用意されています。

1 つのストリーム プロセッサを削除

mongoshを使用して現在のストリーム プロセシング インスタンスで使用可能なすべてのストリーム プロセッサを一覧表示するには、 sp.listStreamProcessors()メソッドを使用します。 各ストリーム プロセッサに関連付けられている名前、開始時間、現在の状態、パイプラインを含むドキュメントのリストが返されます。 構文は次のとおりです。

sp.listStreamProcessors(<filter>)

<filter> は、リストをフィルタリングするフィールドを指定するドキュメントです。

次の例では、フィルタリングされていないリクエストの戻り値を示しています。

sp.listStreamProcessors()
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27},
28{
29 id: '0218',
30 name: "proc02",
31 last_modified: ISODate("2023-03-21T20:17:33.601Z"),
32 state: "STOPPED",
33 error_msg: '',
34 pipeline: [
35 {
36 $source: {
37 connectionName: "myKafka",
38 topic: "things"
39 }
40 },
41 {
42 $match: {
43 temperature: 41
44 }
45 },
46 {
47 $emit: {
48 connectionName: "mySink",
49 topic: "results",
50 }
51 }
52 ],
53 lastStateChange: ISODate("2023-03-21T20:18:26.139Z")
54}

同じストリーム プロセシング インスタンスで コマンドを再度実行し、 "running""state"をフィルタリングすると、次の出力が表示されます。

sp.listStreamProcessors({"state": "running"})
1{
2 id: '0135',
3 name: "proc01",
4 last_modified: ISODate("2023-03-20T20:15:54.601Z"),
5 state: "RUNNING",
6 error_msg: '',
7 pipeline: [
8 {
9 $source: {
10 connectionName: "myKafka",
11 topic: "stuff"
12 }
13 },
14 {
15 $match: {
16 temperature: 46
17 }
18 },
19 {
20 $emit: {
21 connectionName: "mySink",
22 topic: "output",
23 }
24 }
25 ],
26 lastStateChange: ISODate("2023-03-20T20:15:59.442Z")
27}

既存のストリーム プロセッサからサンプル結果の配列をmongoshを使用してSTDOUTに返すには、 sp.<streamprocessor>.sample()メソッドを使用します。 <streamprocessor>は、現在のストリーム プロセシング インスタンスに定義されている現在実行中のストリーム プロセッサの名前である必要があります。 たとえば、次のコマンドは、 proc01という名前のストリーム プロセッサからサンプリングしています。

sp.proc01.sample()

このコマンドは、 CTRL-Cを使用してキャンセルするまで、または返されたサンプルのサイズが累積40 MB になるまで継続的に実行されます。 ストリーム プロセッサは、サンプル内の無効なドキュメントを次の形式の_dlqMessageドキュメントで報告します。

{
_dlqMessage: {
_stream_meta: {
source: {
type: "<type>"
}
},
errInfo: {
reason: "<reasonForError>"
},
doc: {
_id: ObjectId('<group-id>'),
...
},
processorName: '<procName>',
instanceName: '<instanceName>',
dlqTime: ISODate('2024-09-19T20:04:34.263+00:00')
}
}

これらのメッセージを使用して、デッド レター キューコレクションを定義せずにデータのクリーンアップの問題を診断できます。

ストリームプロセッサの統計を表示するには:

Atlas Administration APIは、ストリーム プロセッサの統計を表示するためのエンドポイントを提供します。

1 つのストリーム プロセッサを取得

mongoshを使用して既存のストリーム プロセッサの現在のステータスを要約するドキュメントを返すには、 sp.<streamprocessor>.stats()メソッドを使用します。 streamprocessorは、現在のストリーム プロセシング インスタンスに定義されている現在実行中のストリーム プロセッサの名前である必要があります。 構文は次のとおりです。

sp.<streamprocessor>.stats({options: {<options>}})

ここで、 optionsは次のフィールドを持つ任意のドキュメントです。

フィールド
タイプ
説明
scale
integer
出力内の項目のサイズに使用する単位。 デフォルトでは、Atlas Stream Processing はアイテムのサイズをバイト単位で表示します。 KB 単位で表示するには、 1024scaleを指定します。
verbose
ブール値
出力ドキュメントの冗長レベルを指定するフラグ。 trueに設定されている場合、出力ドキュメントには、パイプライン内の各演算子の統計を報告するサブドキュメントが含まれます。 デフォルトはfalseです。

出力ドキュメントには、次のフィールドがあります。

フィールド
タイプ
説明
ns
string
ストリーム プロセッサが定義されている名前空間。
stats
オブジェクト
ストリーム プロセッサの動作状態を説明するドキュメント。
stats.name
string
ストリーム プロセッサの名前。
stats.status
string

ストリーム プロセッサの状態。 このフィールドには、次の値を指定できます。

  • starting

  • running

  • error

  • stopping

stats.scaleFactor
integer
サイズ フィールドが表示される単位。 1に設定されている場合、サイズはバイト単位で表示されます。 1024に設定されている場合、サイズはキロバイト単位で表示されます。
stats.inputMessageCount
integer
ストリームに公開されたドキュメントの数。 ドキュメントは、パイプライン全体を通過する場合ではなく、 $sourceステージを通過するとストリームに「公開」されたと見なされます。
stats.inputMessageSize
integer
ストリームに公開されたバイト数またはキロバイト数。 バイトは、パイプライン全体を通過する場合ではなく、 $sourceステージを通過するとストリームに「公開」と見なされます。
stats.outputMessageCount
integer
ストリームによって処理されたドキュメントの数。 ドキュメントはストリームによってパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。
stats.outputMessageSize
integer
ストリームによって処理されたバイト数またはキロバイト数。 バイトはパイプライン全体を通過すると、ストリームによって「処理済み」と見なされます。
stats.dlqMessageCount
integer
stats.dlqMessageSize
integer
stats.changeStreamTimeDifferenceSecs
integer
最新の変更ストリーム再開トークンによって表されるイベント時間とoplog 内の最新のイベントとの差(秒単位)。
stats.changeStreamState
token
最新の変更ストリーム再開トークン。 変更ストリーム ソースを持つストリーム プロセッサにのみ適用されます。
stats.stateSize
integer
Windowsがプロセッサの状態を保存するために使用するバイト数。
stats.watermark
integer
現在の浮動小数のタイムスタンプ。
stats.operatorStats
配列

プロセッサ パイプライン内の各演算子の統計情報。 Atlas Stream Processing では、 verboseオプションで渡された場合にのみこのフィールドが返されます。

stats.operatorStats は、多数のコアstatsフィールドの演算子ごとのバージョンを提供します。

  • stats.operatorStats.name

  • stats.operatorStats.inputMessageCount

  • stats.operatorStats.inputMessageSize

  • stats.operatorStats.outputMessageCount

  • stats.operatorStats.outputMessageSize

  • stats.operatorStats.dlqMessageCount

  • stats.operatorStats.dlqMessageSize

  • stats.operatorStats.stateSize

さらに、 stats.operatorStatsには次の一意のフィールドが含まれています。

  • stats.operatorStats.maxMemoryUsage

  • stats.operatorStats.executionTime

stats.operatorStats.maxMemoryUsage
integer
演算子の最大メモリ使用量(バイトまたはキロバイト単位)。
stats.operatorStats.executionTime
integer
演算子の合計実行時間(秒単位)。
stats.kafkaPartitions
配列
Apache Kafka のオフセット情報 プロバイダーのパーティション。はkafkaPartitions Apache Kafka を使用する接続にのみ適用されます ソース。
stats.kafkaPartitions.partition
integer
Apache Kafka トピックのパーティション番号。
stats.kafkaPartitions.currentOffset
integer
指定されたパーティションに対するストリーム プロセッサのオフセット。 この値は、ストリーム プロセッサが処理した以前のオフセットに1を加えた値に等しくなります。
stats.kafkaPartitions.checkpointOffset
integer
ストリーム プロセッサが Apache Kafka に最後にコミットしたオフセット 指定されたパーティションのプロバイダーとチェックポイント。このオフセットを超えるすべてのメッセージは最後のチェックポイントに記録されます。

たとえば、以下は、 inst01という名前のストリーム プロセシング インスタンス上のproc01という名前のストリーム プロセッサのステータスを示しており、アイテムサイズは KB 単位で表示されています。

sp.proc01.stats(1024)
{
ok: 1,
ns: 'inst01',
stats: {
name: 'proc01',
status: 'running',
scaleFactor: Long("1"),
inputMessageCount: Long("706028"),
inputMessageSize: 958685236,
outputMessageCount: Long("46322"),
outputMessageSize: 85666332,
dlqMessageCount: Long("0"),
dlqMessageSize: Long("0"),
stateSize: Long("2747968"),
watermark: ISODate("2023-12-14T14:35:32.417Z"),
ok: 1
},
}

戻る

接続の管理