Docs 菜单
Docs 主页
/
MongoDB Ops Manager
/

使用 MongoDB 构建弹性应用程序

在此页面上

  • 安装最新驱动程序
  • 连接字符串(Connection Strings)
  • 可重试写入和读取
  • 写关注和读关注
  • Error Handling
  • 弹性示例应用程序

要编写能够利用 MongoDB 功能并妥善处理副本集选举的应用程序代码,您应该:

  • 安装最新的驱动程序。

  • 使用指定所有主机的连接字符串。

  • 使用可重试写入和可重试读取。

  • 使用对您的应用程序有意义的 majority 写关注和读关注。

  • 处理应用程序中的错误。

首先,从 MongoDB 驱动程序安装适合您的语言的最新驱动程序。驱动程序将查询从应用程序连接并中继到数据库。使用最新的驱动程序可以使用最新的 MongoDB 功能。

然后,在应用程序中,导入依赖项:

如果使用 Maven ,将以下内容添加到pom.xml 依赖项列表中:

<dependencies>
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver-sync</artifactId>
<version>4.0.1</version>
</dependency>
</dependencies>

如果使用 Gradle,则将以下内容添加到 build.gradle 依赖项列表中:

dependencies {
compile 'org.mongodb:mongodb-driver-sync:4.0.1'
}
// Latest 'mongodb' version installed with npm
const MongoClient = require('mongodb').MongoClient;

使用指定部署中所有主机的连接string将应用程序连接到数据库。 如果您的部署执行副本集选举并选举了新的主节点,则指定部署中所有主机的连接string会在没有应用程序逻辑的情况下发现新的主节点。

您可以使用以下任一方法指定部署中的所有主机:

连接string还可以指定选项,特别是retryWriteswriteConcern。

提示

另请参阅:

有关格式化连接字符串的帮助,请参阅使用 MongoDB 驱动程序连接到部署。

使用连接字符串在应用程序中实例化 MongoDB 客户端:

// Create a variable for your connection string
String uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]";
// Instantiate the MongoDB client with the URI
MongoClient client = MongoClients.create(uri);
// Create a variable for your connection string
const uri = "mongodb://[<username>:<password>@]hostname0<:port>[,hostname1:<port1>][,hostname2:<port2>][...][,hostnameN:<portN>]";
// Instantiate the MongoDB client with the URI
const client = new MongoClient(uri, {
useNewUrlParser: true,
useUnifiedTopology: true
});

注意

从 MongoDB 3.6版本开始以及4.2兼容驱动程序,MongoDB 默认会重试一次写入和读取操作。

如果某些写入操作失败,则使用可重试写入重试一次这些操作。

重试写入一次是处理暂时性网络错误和副本集选举(在此类错误中,应用程序暂时无法找到正常主节点)的最佳策略。 如果重试成功,则整个操作成功,并且不会返回错误。 如果操作失败,原因可能是:

  • 持续的网络错误,或

  • 无效命令。

提示

另请参阅:

有关启用可重试写入的更多信息,请参阅启用可重试写入。

当操作失败时,应用程序需要自行处理错误

如果读取操作在 MongoDB 3.6版本中和使用4.2兼容驱动程序中启动失败,则会自动重试一次。 您无需将应用程序配置为重试读取。

可以使用写关注和读关注来调整应用程序的一致性和可用性。更严格的关注意味着数据库操作需要等待更强的数据一致性保证,而宽松的一致性要求则提供更高的可用性。

例子

如果您的应用程序处理货币余额,则一致性极为重要。 您可以使用majority写关注和读关注(read concern)来确保您永远不会读取过时的数据或可能回滚的数据。

或者,如果您的应用程序每秒记录来自数百个传感器的温度数据,您可能不会担心读取的数据是否不包括最新读数。 您可以放宽一致性要求,以更快地访问该数据。

您可以通过连接string URI 设置副本集的 写关注级别 。使用majority写关注确保您的数据成功写入数据库并持久保存。 这是推荐的默认值,对于大多数使用案例来说,这是足够的。

当您使用需要确认的写关注(例如 majority)时,您还可以指定写入达到该确认级别的最大时间限制:

  • 用于所有写入的 wtimeoutMS 连接字符串参数,或

  • 用于单次写入操作的 wtimeout 选项。

是否使用时间限制以及使用的值取决于应用程序上下文。

提示

另请参阅:

有关设置写关注级别的更多信息,请参阅写关注选项。

重要

如果没有指定写入时间限制,并且写关注(write concern)级别为无法实现,则写入操作将永远无法完成。

您可以通过连接字符串 URI 设置副本集的读关注级别。理想的读关注取决于应用程序要求,但默认值对于大多数使用案例来说就足够了。使用默认读关注不需要连接字符串参数。

指定读关注(read concern)可以提高对应用程序从数据库接收的数据的保证。

提示

另请参阅:

有关设置读关注级别的更多信息,请参阅读关注选项。

注意

应用程序使用的写关注和读关注的特定组合会影响操作顺序保证。这称为因果一致性。有关因果一致性保证的更多信息,请参阅因果一致性和读写关注。

重试写入未处理的无效命令、网络中断和网络错误都会返回错误。有关错误详细信息,请参阅驱动程序的 API文档。

例如,如果应用程序尝试插入具有重复_id的文档,您的驱动程序将返回错误,其中包括:

Unable to insert due to an error: com.mongodb.MongoWriteException:
E11000 duplicate key error collection: <db>.<collection> ...
{
"name": : "MongoError",
"message": "E11000 duplicate key error collection on: <db>.<collection> ... ",
...
}

如果没有正确的错误处理,错误可能会阻止应用程序处理请求,直到重新启动为止。

您的应用程序应处理错误,而不会崩溃或产生副作用。 在前面的应用程序插入重复_id的示例中,该应用程序可以按如下方式处理错误:

// Declare a logger instance from java.util.logging.Logger
private static final Logger LOGGER = ...
...
try {
InsertOneResult result = collection.insertOne(new Document()
.append("_id", 1)
.append("body", "I'm a goofball trying to insert a duplicate _id"));
// Everything is OK
LOGGER.info("Inserted document id: " + result.getInsertedId());
// Refer to the API documentation for specific exceptions to catch
} catch (MongoException me) {
// Report the error
LOGGER.severe("Failed due to an error: " + me);
}
...
collection.insertOne({
_id: 1,
body: "I'm a goofball trying to insert a duplicate _id"
})
.then(result => {
response.sendStatus(200) // send "OK" message to the client
},
err => {
response.sendStatus(400); // send "Bad Request" message to the client
});

此示例中的插入操作在第二次调用时会引发“重复键”错误,因为_id字段必须是唯一的。 应用程序捕获错误,通知客户端,然后应用继续运行。 但是,插入操作失败,您可以决定是否向用户显示消息、重试操作或执行其他操作。

您应该始终记录错误。进一步处理错误的常见策略包括:

  • 将错误返回给客户端,并显示错误消息。 当您无法解决错误并且需要通知用户操作无法完成时,这是一个很好的策略。

  • 写入备份数据库。 当您无法解决错误但又不想冒丢失请求数据的风险时,这是一个很好的策略。

  • 单次默认重试之后重试该操作。如果您可以通过编程方式解决错误原因,请重试,这是一个很好的策略。

您必须为应用程序上下文选择最佳策略。

例子

在重复键错误的示例中,您应该记录错误但不要重试操作,因为它永远不会成功。相反,您可以写入回退数据库并稍后查看该数据库的内容,以确保不会丢失任何信息。用户无需执行任何其他操作,数据就会被记录下来,因此您可以选择不向客户端发送错误消息。

当操作永远无法完成并阻止应用程序执行新操作时,返回错误可能是理想行为。 您可以使用maxTimeMS方法对单个操作设置时间限制,如果超过该时间限制,则返回错误供应用程序进行处理。

对每个操作设置的时间限制取决于该操作的上下文。

例子

如果您的应用程序读取并显示 inventory 集合中的简单产品信息,您可以确信这些读取操作只需要一点时间。查询长时间运行表明一直存在网络问题。将该操作的 maxTimeMS 设置为 5000(即 5 秒)意味着,一旦您确信存在网络问题,应用程序就会收到反馈。

以下示例应用程序汇集了构建弹性应用程序的建议。

该应用程序是一个简单的用户记录 API ,它在 http://localhost: 上公开两个端点:3000

方法
端点
说明
GET
/users
users 集合中获取用户名列表。
POST
/users
要求在请求正文中加入 name。将新用户添加到 users 集合。

注意

以下服务器应用程序使用 NanoHTTPD json 在运行之前,您需要将其作为依赖项添加到项目中。

1// File: App.java
2
3import java.util.Map;
4import java.util.logging.Logger;
5
6import org.bson.Document;
7import org.json.JSONArray;
8
9import com.mongodb.MongoException;
10import com.mongodb.client.MongoClient;
11import com.mongodb.client.MongoClients;
12import com.mongodb.client.MongoCollection;
13import com.mongodb.client.MongoDatabase;
14
15import fi.iki.elonen.NanoHTTPD;
16
17public class App extends NanoHTTPD {
18 private static final Logger LOGGER = Logger.getLogger(App.class.getName());
19
20 static int port = 3000;
21 static MongoClient client = null;
22
23 public App() throws Exception {
24 super(port);
25
26 // Replace the uri string with your MongoDB deployment's connection string
27 String uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority";
28 client = MongoClients.create(uri);
29
30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n");
32 }
33
34 public static void main(String[] args) {
35 try {
36 new App();
37 } catch (Exception e) {
38 LOGGER.severe("Couldn't start server:\n" + e);
39 }
40 }
41
42 @Override
43 public Response serve(IHTTPSession session) {
44 StringBuilder msg = new StringBuilder();
45 Map<String, String> params = session.getParms();
46
47 Method reqMethod = session.getMethod();
48 String uri = session.getUri();
49
50 if (Method.GET == reqMethod) {
51 if (uri.equals("/")) {
52 msg.append("Welcome to my API!");
53 } else if (uri.equals("/users")) {
54 msg.append(listUsers(client));
55 } else {
56 msg.append("Unrecognized URI: ").append(uri);
57 }
58 } else if (Method.POST == reqMethod) {
59 try {
60 String name = params.get("name");
61 if (name == null) {
62 throw new Exception("Unable to process POST request: 'name' parameter required");
63 } else {
64 insertUser(client, name);
65 msg.append("User successfully added!");
66 }
67 } catch (Exception e) {
68 msg.append(e);
69 }
70 }
71
72 return newFixedLengthResponse(msg.toString());
73 }
74
75 static String listUsers(MongoClient client) {
76 MongoDatabase database = client.getDatabase("test");
77 MongoCollection<Document> collection = database.getCollection("users");
78
79 final JSONArray jsonResults = new JSONArray();
80 collection.find().forEach((result) -> jsonResults.put(result.toJson()));
81
82 return jsonResults.toString();
83 }
84
85 static String insertUser(MongoClient client, String name) throws MongoException {
86 MongoDatabase database = client.getDatabase("test");
87 MongoCollection<Document> collection = database.getCollection("users");
88
89 collection.insertOne(new Document().append("name", name));
90 return "Successfully inserted user: " + name;
91 }
92}

注意

以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。

1const express = require('express');
2const bodyParser = require('body-parser');
3
4// Use the latest drivers by installing & importing them
5const MongoClient = require('mongodb').MongoClient;
6
7const app = express();
8app.use(bodyParser.json());
9app.use(bodyParser.urlencoded({ extended: true }));
10
11// Use a connection string that lists all hosts
12// with retryable writes & majority write concern
13const uri = "mongodb://<username>:<password>@hostname0:27017,hostname1:27017,hostname2:27017/?retryWrites=true&w=majority";
14
15const client = new MongoClient(uri, {
16 useNewUrlParser: true,
17 useUnifiedTopology: true
18});
19
20// ----- API routes ----- //
21app.get('/', (req, res) => res.send('Welcome to my API!'));
22
23app.get('/users', (req, res) => {
24 const collection = client.db("test").collection("users");
25
26 collection
27 .find({})
28 // In this example, 'maxTimeMS' throws an error after 5 seconds,
29 // alerting the application to a lasting network outage
30 .maxTimeMS(5000)
31 .toArray((err, data) => {
32 if (err) {
33 // Handle errors in your application
34 // In this example, by sending the client a message
35 res.send("The request has timed out. Please check your connection and try again.");
36 }
37 return res.json(data);
38 });
39});
40
41app.post('/users', (req, res) => {
42 const collection = client.db("test").collection("users");
43 collection.insertOne({ name: req.body.name })
44 .then(result => {
45 res.send("User successfully added!");
46 }, err => {
47 // Handle errors in your application
48 // In this example, by sending the client a message
49 res.send("An application error has occurred. Please try again.");
50 })
51});
52// ----- End of API routes ----- //
53
54app.listen(3000, () => {
55 console.log(`Listening on port 3000.`);
56 client.connect(err => {
57 if (err) {
58 console.log("Not connected: ", err);
59 process.exit(0);
60 }
61 console.log('Connected.');
62 });
63});

后退

审核事件