使用 MongoDB 构建弹性应用程序
要编写能够利用 MongoDB 功能并妥善处理副本集选举的应用程序代码,您应该:
安装最新的驱动程序。
使用指定所有主机的连接字符串。
使用可重试写入和可重试读取。
使用对您的应用程序有意义的
majority
写关注和读关注。处理应用程序中的错误。
安装最新驱动程序
首先,从 MongoDB 驱动程序安装适合您的语言的最新驱动程序。驱动程序将查询从应用程序连接并中继到数据库。使用最新的驱动程序可以使用最新的 MongoDB 功能。
然后,在应用程序中,导入依赖项:
// Latest 'mongodb' version installed with npm const MongoClient = require('mongodb').MongoClient;
连接字符串(Connection Strings)
使用指定部署中所有主机的连接string将应用程序连接到数据库。 如果您的部署执行副本集选举并选举了新的主节点,则指定部署中所有主机的连接string会在没有应用程序逻辑的情况下发现新的主节点。
您可以使用以下任一方法指定部署中的所有主机:
连接string还可以指定选项,特别是retryWrites和writeConcern。
使用连接字符串在应用程序中实例化 MongoDB 客户端:
// Create a variable for your connection string String uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI MongoClient client = MongoClients.create(uri);
// Create a variable for your connection string const uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]"; // Instantiate the MongoDB client with the URI const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
可重试写入和读取
注意
从 MongoDB 3.6版本开始以及4.2兼容驱动程序,MongoDB 默认会重试一次写入和读取操作。
可重试写入 (Retryable Writes)
如果某些写入操作失败,则使用可重试写入重试一次这些操作。
重试写入一次是处理暂时性网络错误和副本集选举(在此类错误中,应用程序暂时无法找到正常主节点)的最佳策略。 如果重试成功,则整个操作成功,并且不会返回错误。 如果操作失败,原因可能是:
持续的网络错误,或
无效命令。
当操作失败时,应用程序需要自行处理错误。
可重试读取
如果读取操作在 MongoDB 3.6版本中和使用4.2兼容驱动程序中启动失败,则会自动重试一次。 您无需将应用程序配置为重试读取。
写关注和读关注
可以使用写关注和读关注来调整应用程序的一致性和可用性。更严格的关注意味着数据库操作需要等待更强的数据一致性保证,而宽松的一致性要求则提供更高的可用性。
例子
如果您的应用程序处理货币余额,则一致性极为重要。 您可以使用majority
写关注和读关注(read concern)来确保您永远不会读取过时的数据或可能回滚的数据。
或者,如果您的应用程序每秒记录来自数百个传感器的温度数据,您可能不会担心读取的数据是否不包括最新读数。 您可以放宽一致性要求,以更快地访问该数据。
写关注
您可以通过连接string URI 设置副本集的 写关注级别 。使用majority
写关注确保您的数据成功写入数据库并持久保存。 这是推荐的默认值,对于大多数使用案例来说,这是足够的。
当您使用需要确认的写关注(例如 majority
)时,您还可以指定写入达到该确认级别的最大时间限制:
用于所有写入的 wtimeoutMS 连接字符串参数,或
用于单次写入操作的 wtimeout 选项。
是否使用时间限制以及使用的值取决于应用程序上下文。
重要
如果没有指定写入时间限制,并且写关注(write concern)级别为无法实现,则写入操作将永远无法完成。
读关注 (read concern)
您可以通过连接string URI副本集的 读关注(read concern)级别 。理想的读关注(read concern)取决于应用程序要求,但默认对于大多数使用案例来说就足够了。 使用默认读关注不需要连接string参数。
指定读关注(read concern)可以提高对应用程序从数据库接收的数据的保证。
注意
应用程序使用的写入关注和读关注(read concern)的特定组合会影响操作顺序ACID 一致性保证。 这称为因果一致性。 有关因果一致性ACID 一致性保证的更多信息,请参阅因果一致性和读写关注。
Error Handling
无效的命令、网络服务中断以及未经可重试写入处理的网络错误会返回错误。有关错误详情,请参阅驱动程序的 API 文档。
例如,如果应用程序尝试插入具有重复_id
的文档,您的驱动程序将返回错误,其中包括:
Unable to insert due to an error: com.mongodb.MongoWriteException: E11000 duplicate key error collection: <db>.<collection> ...
{ "name": : "MongoError", "message": "E11000 duplicate key error collection on: <db>.<collection> ... ", ... }
如果没有正确的错误处理,错误可能会阻止应用程序处理请求,直到重新启动为止。
您的应用程序应处理错误,而不会崩溃或产生副作用。 在前面的应用程序插入重复_id
的示例中,该应用程序可以按如下方式处理错误:
// Declare a logger instance from java.util.logging.Logger private static final Logger LOGGER = ... ... try { InsertOneResult result = collection.insertOne(new Document() .append("_id", 1) .append("body", "I'm a goofball trying to insert a duplicate _id")); // Everything is OK LOGGER.info("Inserted document id: " + result.getInsertedId()); // Refer to the API documentation for specific exceptions to catch } catch (MongoException me) { // Report the error LOGGER.severe("Failed due to an error: " + me); }
... collection.insertOne({ _id: 1, body: "I'm a goofball trying to insert a duplicate _id" }) .then(result => { response.sendStatus(200) // send "OK" message to the client }, err => { response.sendStatus(400); // send "Bad Request" message to the client });
此示例中的插入操作在第二次调用时会引发“重复键”错误,因为_id
字段必须是唯一的。 应用程序捕获错误,通知客户端,然后应用继续运行。 但是,插入操作失败,您可以决定是否向用户显示消息、重试操作或执行其他操作。
您应该始终记录错误。进一步处理错误的常见策略包括:
将错误返回给客户端,并显示错误消息。 当您无法解决错误并且需要通知用户操作无法完成时,这是一个很好的策略。
写入备份数据库。 当您无法解决错误但又不想冒丢失请求数据的风险时,这是一个很好的策略。
在单次默认重试之后重试该操作。如果您可以通过编程方式解决错误原因,请重试,这是一个很好的策略。
您必须为应用程序上下文选择最佳策略。
例子
在重复键错误的示例中,您应该记录错误但不要重试操作,因为它永远不会成功。相反,您可以写入回退数据库并稍后查看该数据库的内容,以确保不会丢失任何信息。用户无需执行任何其他操作,数据就会被记录下来,因此您可以选择不向客户端发送错误消息。
规划网络错误
当操作永远无法完成并阻止应用程序执行新操作时,返回错误可能是理想行为。 您可以使用maxTimeMS方法对单个操作设置时间限制,如果超过该时间限制,则返回错误供应用程序进行处理。
对每个操作设置的时间限制取决于该操作的上下文。
例子
如果您的应用程序读取并显示 inventory
集合中的简单产品信息,您可以确信这些读取操作只需要一点时间。查询长时间运行表明一直存在网络问题。将该操作的 maxTimeMS
设置为 5000(即 5 秒)意味着,一旦您确信存在网络问题,应用程序就会收到反馈。
弹性示例应用程序
以下示例应用程序汇集了构建弹性应用程序的建议。
该应用程序是一个简单的用户记录 API ,它在 http://localhost: 上公开两个端点:3000
方法 | 端点 | 说明 |
---|---|---|
GET | /users | 从 users 集合中获取用户名列表。 |
POST | /users | 要求在请求正文中加入 name 。将新用户添加到 users 集合。 |
1 // File: App.java 2 3 import java.util.Map; 4 import java.util.logging.Logger; 5 6 import org.bson.Document; 7 import org.json.JSONArray; 8 9 import com.mongodb.MongoException; 10 import com.mongodb.client.MongoClient; 11 import com.mongodb.client.MongoClients; 12 import com.mongodb.client.MongoCollection; 13 import com.mongodb.client.MongoDatabase; 14 15 import fi.iki.elonen.NanoHTTPD; 16 17 public class App extends NanoHTTPD { 18 private static final Logger LOGGER = Logger.getLogger(App.class.getName()); 19 20 static int port = 3000; 21 static MongoClient client = null; 22 23 public App() throws Exception { 24 super(port); 25 26 // Replace the uri string with your MongoDB deployment's connection string 27 String uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 28 client = MongoClients.create(uri); 29 30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); 31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n"); 32 } 33 34 public static void main(String[] args) { 35 try { 36 new App(); 37 } catch (Exception e) { 38 LOGGER.severe("Couldn't start server:\n" + e); 39 } 40 } 41 42 43 public Response serve(IHTTPSession session) { 44 StringBuilder msg = new StringBuilder(); 45 Map<String, String> params = session.getParms(); 46 47 Method reqMethod = session.getMethod(); 48 String uri = session.getUri(); 49 50 if (Method.GET == reqMethod) { 51 if (uri.equals("/")) { 52 msg.append("Welcome to my API!"); 53 } else if (uri.equals("/users")) { 54 msg.append(listUsers(client)); 55 } else { 56 msg.append("Unrecognized URI: ").append(uri); 57 } 58 } else if (Method.POST == reqMethod) { 59 try { 60 String name = params.get("name"); 61 if (name == null) { 62 throw new Exception("Unable to process POST request: 'name' parameter required"); 63 } else { 64 insertUser(client, name); 65 msg.append("User successfully added!"); 66 } 67 } catch (Exception e) { 68 msg.append(e); 69 } 70 } 71 72 return newFixedLengthResponse(msg.toString()); 73 } 74 75 static String listUsers(MongoClient client) { 76 MongoDatabase database = client.getDatabase("test"); 77 MongoCollection<Document> collection = database.getCollection("users"); 78 79 final JSONArray jsonResults = new JSONArray(); 80 collection.find().forEach((result) -> jsonResults.put(result.toJson())); 81 82 return jsonResults.toString(); 83 } 84 85 static String insertUser(MongoClient client, String name) throws MongoException { 86 MongoDatabase database = client.getDatabase("test"); 87 MongoCollection<Document> collection = database.getCollection("users"); 88 89 collection.insertOne(new Document().append("name", name)); 90 return "Successfully inserted user: " + name; 91 } 92 }
注意
以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。
1 const express = require('express'); 2 const bodyParser = require('body-parser'); 3 4 // Use the latest drivers by installing & importing them 5 const MongoClient = require('mongodb').MongoClient; 6 7 const app = express(); 8 app.use(bodyParser.json()); 9 app.use(bodyParser.urlencoded({ extended: true })); 10 11 // Use a connection string that lists all hosts 12 // with retryable writes & majority write concern 13 const uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority"; 14 15 const client = new MongoClient(uri, { 16 useNewUrlParser: true, 17 useUnifiedTopology: true 18 }); 19 20 // ----- API routes ----- // 21 app.get('/', (req, res) => res.send('Welcome to my API!')); 22 23 app.get('/users', (req, res) => { 24 const collection = client.db("test").collection("users"); 25 26 collection 27 .find({}) 28 // In this example, 'maxTimeMS' throws an error after 5 seconds, 29 // alerting the application to a lasting network outage 30 .maxTimeMS(5000) 31 .toArray((err, data) => { 32 if (err) { 33 // Handle errors in your application 34 // In this example, by sending the client a message 35 res.send("The request has timed out. Please check your connection and try again."); 36 } 37 return res.json(data); 38 }); 39 }); 40 41 app.post('/users', (req, res) => { 42 const collection = client.db("test").collection("users"); 43 collection.insertOne({ name: req.body.name }) 44 .then(result => { 45 res.send("User successfully added!"); 46 }, err => { 47 // Handle errors in your application 48 // In this example, by sending the client a message 49 res.send("An application error has occurred. Please try again."); 50 }) 51 }); 52 // ----- End of API routes ----- // 53 54 app.listen(3000, () => { 55 console.log(`Listening on port 3000.`); 56 client.connect(err => { 57 if (err) { 58 console.log("Not connected: ", err); 59 process.exit(0); 60 } 61 console.log('Connected.'); 62 }); 63 });