MongoDB によるレジリエントなアプリケーションのビルド
項目一覧
MongoDB の機能を活用して、レプリカセットの選挙をグレースフルに処理するアプリケーション コードを作成するには、次の手順を実行する必要があります。
最新のドライバーをインストールします。
すべてのホストを指定する接続文字列を使用します。
再試行可能な書込み と 再試行可能な読み取り を使用します。
アプリケーションに適した
majority
の書込み保証と読み取り保証を使用します。アプリケーション内のエラーを処理します。
最新のドライバーのインストール
まず、 MongoDB ドライバーからご使用言語の最新ドライバーをインストールします。 ドライバーはアプリケーションからデータベースにクエリを接続して中継します。 最新のドライバーを使用すると、最新の MongoDB 機能が有効になります。
次に、アプリケーションに依存関係をインポートします。
Maven を使用している場合 に、以下をpom.xml
依存関係リストに追加します。
<dependencies> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver-sync</artifactId> <version>4.0.1</version> </dependency> </dependencies>
Gradle を使用している場合は、次のものを build.gradle
依存関係リストに追加します。
dependencies { compile 'org.mongodb:mongodb-driver-sync:4.0.1' }
// Latest 'mongodb' version installed with npm const MongoClient = require('mongodb').MongoClient;
接続文字列
配置内のすべてのホストを指定する接続文字列を使用して、アプリケーションをデータベースに接続します。 配置でレプリカセットの選挙が実行され、新しいプライマリが選択された場合、配置内のすべてのホストを指定する接続文字列によって、アプリケーション ロジックなしで新しいプライマリが検出されます。
次のいずれかを使用して、配置内のすべてのホストを指定できます。
接続文字列では、オプション( retryWritesとwriteConcern など)を指定することもできます。
接続文字列を使用して、アプリケーション内で MongoDB クライアントをインスタンス化します。
// Create a variable for your connection string String uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI MongoClient client = MongoClients.create(uri);
// Create a variable for your connection string const uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
再試行可能な書込みと読み取り
注意
MongoDB バージョン3.6および4.2互換ドライバーを使用すると、MongoDB はデフォルトで書込みと読み取りの両方を 1 回再試行します。
再試行可能な書き込み
再試行可能な書込みを使用して、特定の書込み操作が失敗した場合に 1 回再試行します。
書き込みを 1 回だけ 再試行する ことは、アプリケーションが正常な プライマリ ノード を一時的に見つけられない場合に、一時的なネットワークエラーやレプリカセットの選挙を処理するための最善の戦略です。 再試行が成功すると、操作全体が成功し、エラーは返されません。 操作が失敗した場合は、次の理由が原因で失敗した可能性があります。
永続的なネットワークエラー、または
無効なコマンド。
操作が失敗した場合、アプリケーションは自分自身を処理する必要があります。
再試行可能な読み取り
MongoDB バージョン3.6および4.2互換ドライバーの使用に失敗した場合、読み取り操作は 1 回自動的に再試行されます。 読み取りを再試行するようにアプリケーションを構成する必要はありません。
書込み保証 (write concern) と読み取り保証 (read concern)
書込み保証 (write concern) と読み取り保証 (read concern) を使用して、アプリケーションの整合性と可用性を調整できます。 懸念が深い場合は、データベース操作により強力なデータ整合性保証が必要になりますが、整合性要件を緩和すると可用性が向上します。
例
アプリケーションが金銭の残高を処理する場合、整合性は非常に重要です。 majority
の書込み保証と読み取り保証 を使用して、古いデータやロールバックされる可能性のあるデータからの読み取りを行わないようにできます。
あるいは、アプリケーションが 1 秒ごとに数百のセンサーからの温度データを記録する場合、最新の読み取り値を含まないデータを読み取る場合は問題が発生しない場合があります。 整合性要件を緩和すると、そのデータへのアクセスが速くなります。
書込み保証 (write concern)
接続string URI を使用してレプリカセットの 書込み保証レベル を設定できます。majority
書込み保証(write concern)を使用して、データがデータベースに正常に書き込まれ、永続化されるようにします。 これは推奨のデフォルトであり、ほとんどのユースケースで十分です。
majority
など、確認応答が必要な書込み保証を使用する場合は、そのレベルの確認応答を実現するための書込みの最大時間制限を指定することもできます。
すべての書き込み (write) のwtimeoutMS接続文字列パラメータ、または
単一の書込み (write) 操作のwtimeoutオプション。
時間制限を使用するかどうかと、使用する値は、アプリケーションのコンテキストによって異なります。
重要
書込みに時間制限を指定せず、書込み保証 (write concern) レベルが達成できない場合、書込み (write) 操作は完了しません。
読み取り保証(read concern)
接続文字列 URI を使用して、レプリカセットの 読み取り保証レベル を設定できます。理想的な読み取り保証はアプリケーションの要件によって異なりますが、ほとんどのユースケースではデフォルトで十分です。 デフォルトの読み取り保証を使用する場合、接続文字列パラメーターは必要ありません。
読み取り保証 (read concern) を指定すると、アプリケーションがデータベースから受信するデータの保証が強化されます。
注意
アプリケーションが使用する書込み保証 (write concern) と読み取り保証 (read concern) の特定の組み合わせは、操作順序の保証に影響します。 これは 因果整合性 と呼ばれます。 因果整合性の保証の詳細については、「因果整合性、読み取り保証、書込み保証 」を参照してください。
Error Handling
再試行可能な書き込みで処理されていないコマンド、ネットワークの停止、ネットワークエラーはエラーを返します。 エラーの詳細については、ドライバーのAPIドキュメントを参照してください。
たとえば、アプリケーションが重複する_id
を含むドキュメントを挿入しようとすると、ドライバーは次のようなエラーを返します。
Unable to insert due to an error: com.mongodb.MongoWriteException: E11000 duplicate key error collection: <db>.<collection> ...
{ "name": : "MongoError", "message": "E11000 duplicate key error collection on: <db>.<collection> ... ", ... }
適切なエラー処理を行わないと、エラーによってアプリケーションが再起動されるまでリクエストの処理がブロックされる可能性があります。
アプリケーションは、クラッシュしたり副作用のないエラーを処理する必要があります。 重複する_id
を挿入するアプリケーションの以前の例では、そのアプリケーションは次のようにエラーを処理できます。
// Declare a logger instance from java.util.logging.Logger private static final Logger LOGGER = ... ... try { InsertOneResult result = collection.insertOne(new Document() .append("_id", 1) .append("body", "I'm a goofball trying to insert a duplicate _id")); // Everything is OK LOGGER.info("Inserted document id: " + result.getInsertedId()); // Refer to the API documentation for specific exceptions to catch } catch (MongoException me) { // Report the error LOGGER.severe("Failed due to an error: " + me); }
... collection.insertOne({ _id: 1, body: "I'm a goofball trying to insert a duplicate _id" }) .then(result => { response.sendStatus(200) // send "OK" message to the client }, err => { response.sendStatus(400); // send "Bad Request" message to the client });
この例の挿入操作では、 _id
フィールドが一意である必要があるため、2 回目に呼び出されるときに「重複キー」エラーがスローされます。 アプリケーションはエラーをキャッチし、クライアントに通知され、アプリは実行を続行します。 しかし、挿入操作は失敗します。ユーザーに メッセージを表示するか、操作を再試行するか、または別の操作を実行するかは、ユーザーが決定する必要があります。
エラーは常にログに記録する必要があります。 これ以上の処理エラーを発生させる一般的な戦略は次のとおりです。
エラーを、エラー メッセージとともにクライアントに返します。 これは、エラーを解決できず、アクションが完了できないことをユーザーに通知する必要がある場合に適した戦略です。
バックアップ データベースに書き込みます。 これは、エラーを解決できないが、リクエスト データが失われるリスクを避けたい場合に適した戦略です。
操作を1 回のデフォルトの 再試行 を超えて再試行します。 これは、エラーの原因をプログラムで解決して再試行する場合に適した戦略です。
アプリケーションのコンテキストに最適な戦略を選択する必要があります。
例
重複キー エラーの例では、エラーをログに記録する必要がありますが、操作は成功しないため、再試行しないでください。 代わりに、フォールバック データベースに書き込みを行い、後でそのデータベースの内容を確認して、情報が失われることを確認することができます。 ユーザーは他に何もする必要がなく、データが記録されるため、クライアントにエラーメッセージを送信しないことを選択できます。
ネットワークエラーの計画
操作が完了せず、アプリケーションが新しい操作を実行する際にブロックされる場合は、エラーを返すことが推奨されます。 maxTimeMSメソッドを使用して、個々の操作に時間制限を設定でき、その時間制限を超えた場合にアプリケーションが処理するエラーを返します。
各操作に設定する時間制限は、その操作のコンテキストによって異なります。
例
アプリケーションがinventory
コレクションから簡単な製品情報を読み取って表示する場合、これらの読み取り操作にかかる時間は 1 時間のみであることがかなり確実です。 クエリの実行時間が異常に長い場合は、ネットワークの問題が永続していることを示す適切なインジケーターです。 この操作でmaxTimeMS
を 5000、つまり 5 秒に設定すると、アプリケーションはネットワークの問題があることを確認するとすぐにフィードバックを受け取ることを意味します。
回復力のあるサンプルアプリケーション
次のサンプルアプリケーションでは、回復力のあるアプリケーションを構築するための推奨事項をまとめています。
このアプリケーションは、 http://localhost:3000 で 2 つのエンドポイントを公開するシンプルなユーザー レコード API です。
方式 | エンドポイント | 説明 |
---|---|---|
GET | /users | users コレクションからユーザー名のリストを取得します。 |
POST | /users | リクエスト本文に name が必要です。 新しいユーザーをusers コレクションに追加します。 |
注意
次のサーバー アプリケーションは NaHTTPD を使用します および 実行前に、プロジェクトに 依存関係JSON として追加する必要があります。
1 // File: App.java 2 3 import java.util.Map; 4 import java.util.logging.Logger; 5 6 import org.bson.Document; 7 import org.json.JSONArray; 8 9 import com.mongodb.MongoException; 10 import com.mongodb.client.MongoClient; 11 import com.mongodb.client.MongoClients; 12 import com.mongodb.client.MongoCollection; 13 import com.mongodb.client.MongoDatabase; 14 15 import fi.iki.elonen.NanoHTTPD; 16 17 public class App extends NanoHTTPD { 18 private static final Logger LOGGER = Logger.getLogger(App.class.getName()); 19 20 static int port = 3000; 21 static MongoClient client = null; 22 23 public App() throws Exception { 24 super(port); 25 26 // Replace the uri string with your MongoDB deployment's connection string 27 String uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 28 client = MongoClients.create(uri); 29 30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); 31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n"); 32 } 33 34 public static void main(String[] args) { 35 try { 36 new App(); 37 } catch (Exception e) { 38 LOGGER.severe("Couldn't start server:\n" + e); 39 } 40 } 41 42 43 public Response serve(IHTTPSession session) { 44 StringBuilder msg = new StringBuilder(); 45 Map<String, String> params = session.getParms(); 46 47 Method reqMethod = session.getMethod(); 48 String uri = session.getUri(); 49 50 if (Method.GET == reqMethod) { 51 if (uri.equals("/")) { 52 msg.append("Welcome to my API!"); 53 } else if (uri.equals("/users")) { 54 msg.append(listUsers(client)); 55 } else { 56 msg.append("Unrecognized URI: ").append(uri); 57 } 58 } else if (Method.POST == reqMethod) { 59 try { 60 String name = params.get("name"); 61 if (name == null) { 62 throw new Exception("Unable to process POST request: 'name' parameter required"); 63 } else { 64 insertUser(client, name); 65 msg.append("User successfully added!"); 66 } 67 } catch (Exception e) { 68 msg.append(e); 69 } 70 } 71 72 return newFixedLengthResponse(msg.toString()); 73 } 74 75 static String listUsers(MongoClient client) { 76 MongoDatabase database = client.getDatabase("test"); 77 MongoCollection<Document> collection = database.getCollection("users"); 78 79 final JSONArray jsonResults = new JSONArray(); 80 collection.find().forEach((result) -> jsonResults.put(result.toJson())); 81 82 return jsonResults.toString(); 83 } 84 85 static String insertUser(MongoClient client, String name) throws MongoException { 86 MongoDatabase database = client.getDatabase("test"); 87 MongoCollection<Document> collection = database.getCollection("users"); 88 89 collection.insertOne(new Document().append("name", name)); 90 return "Successfully inserted user: " + name; 91 } 92 }
注意
次のサーバー アプリケーションは Express を使用します は、実行前にプロジェクトに依存関係として追加する必要があります。
1 const express = require('express'); 2 const bodyParser = require('body-parser'); 3 4 // Use the latest drivers by installing & importing them 5 const MongoClient = require('mongodb').MongoClient; 6 7 const app = express(); 8 app.use(bodyParser.json()); 9 app.use(bodyParser.urlencoded({ extended: true })); 10 11 // Use a connection string that lists all hosts 12 // with retryable writes & majority write concern 13 const uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 14 15 const client = new MongoClient(uri, { 16 useNewUrlParser: true, 17 useUnifiedTopology: true 18 }); 19 20 // ----- API routes ----- // 21 app.get('/', (req, res) => res.send('Welcome to my API!')); 22 23 app.get('/users', (req, res) => { 24 const collection = client.db("test").collection("users"); 25 26 collection 27 .find({}) 28 // In this example, 'maxTimeMS' throws an error after 5 seconds, 29 // alerting the application to a lasting network outage 30 .maxTimeMS(5000) 31 .toArray((err, data) => { 32 if (err) { 33 // Handle errors in your application 34 // In this example, by sending the client a message 35 res.send("The request has timed out. Please check your connection and try again."); 36 } 37 return res.json(data); 38 }); 39 }); 40 41 app.post('/users', (req, res) => { 42 const collection = client.db("test").collection("users"); 43 collection.insertOne({ name: req.body.name }) 44 .then(result => { 45 res.send("User successfully added!"); 46 }, err => { 47 // Handle errors in your application 48 // In this example, by sending the client a message 49 res.send("An application error has occurred. Please try again."); 50 }) 51 }); 52 // ----- End of API routes ----- // 53 54 app.listen(3000, () => { 55 console.log(`Listening on port 3000.`); 56 client.connect(err => { 57 if (err) { 58 console.log("Not connected: ", err); 59 process.exit(0); 60 } 61 console.log('Connected.'); 62 }); 63 });