Docs 菜单

Atlas for Resiliency 指导

MongoDB Atlas是一款高性能数据库,无论基础架构中断、系统维护等情况如何,都能保持正常运行时间。使用此页面上的指导来规划设置,以最大限度地提高应用程序和数据库的弹性。

Atlas集群由至少具有三个节点的副本集组成,您可以将节点点数增加到所需的任意奇数节点。Atlas首先将应用程序中的数据写入主节点 (primary node in the replica set)节点,然后Atlas将该数据以增量方式复制并存储到集群内的所有从节点(secondary node from replica set)上。要控制数据存储的持久性,您可以调整应用程序代码的写关注(write concern),仅在一定数量的从节点提交写入后才完成写入。要学习;了解更多信息,请参阅配置读关注和写关注。

默认下, Atlas会将集群节点分布在您所选云提供商的可用区域之一的可用区域内。示例,如果您的集群部署到云提供商地区us-east,则Atlas默认会将节点部署到 us-east-aus-east-bus-east-c

要学习;了解有关跨区域高可用性和节点分布的更多信息,请参阅Atlas高可用性指南。

Atlas集群必须由奇数个节点组成,因为节点池必须选举一个主节点 (primary node in the replica set)节点,应用程序可以直接向该主节点写入和读取该主节点。由偶数个节点组成的集群可能会导致死锁,从而阻止选举主节点 (primary node in the replica set)节点。

如果主节点因基础设施服务中断、维护窗口或其他原因而不可用,Atlas 集群会通过将现有的从节点提升为主节点的角色来进行自我修复,以保持数据库的可用性。要了解更多关于此过程的信息,请参阅 MongoDB Atlas 如何提供高可用性?

在计划维护期间,Atlas 通过以滚动方式一次对一个节点应用更新来保持正常运行时间。在此过程中,Atlas 会在必要时选举新的主节点,就像在任何其他计划外的主节点服务中断期间一样。

当您配置维护窗口时,请选择与应用程序流量最低的时间相对应的时间。

Atlas提供内置工具来监控集群性能、查询性能等。此外, Atlas可轻松与第三方服务集成。

通过主动监控集群,您可以获得有关查询和部署性能的宝贵见解。要学习;了解有关Atlas监控的更多信息,请参阅监控集群以及监控和警报。

您可以模拟需要灾难恢复工作流程的各种场景,以评估您对这些事件的准备程度。具体来说,使用 Atlas,您可以测试主节点故障转移模拟区域性服务中断。我们强烈建议您在将应用程序部署到生产环境之前运行这些测试。

您可以通过启用终止保护来防止意外删除Atlas集群。在利用 Terraform 等 IaC 工具时,启用终止保护尤其重要,以确保重新部署时不会预配新的基础架构。要删除已启用终止保护的集群,必须先禁用终止保护。默认下, Atlas会禁用所有集群的终止保护。

Atlas云备份使用部署集群的云服务提供商的原生快照功能促进云备份存储。示例,如果您在Amazon Web Services上部署集群,则可以选择使用按可配置时间间隔在Amazon Web Services S3 中拍摄的快照备份集群的数据。

要了解有关数据库备份和快照检索的更多信息,请参阅备份集群

有关 Atlas 备份的建议,请参阅 Atlas 备份指导。

要提高集群的弹性,请将集群升级到MongoDB 8.0。MongoDB 8.0 引入了以下与韧性相关的性能改进和新功能:

我们建议您尽可能使用基于应用程序编程语言的最新驾驶员版本构建的连接方法。虽然Atlas提供的默认连接字符串是一个很好的起点,但您可能希望根据特定应用程序和部署架构的上下文对其性能进行调整。

示例,您可能希望为提供登录功能的微服务设立一个简短的 maxTimeMS,而如果应用程序代码是针对以下对象的长时间运行的分析作业请求,则您可能希望将 maxTimeMS设立为大得多的值:集群。

在企业级应用程序部署中,调整连接池设置尤为重要。

打开数据库客户端连接是维护客户端连接池所涉及的资源最密集的过程之一,客户端连接池促进应用程序访问权限Atlas 集群。

因此,值得考虑的是,您希望在特定应用程序的上下文中如何以及何时展开打开客户端连接的进程。

示例,如果您要扩展Atlas 集群以满足用户需求,请考虑应用程序始终需要的最小连接池大小,这样当应用程序池扩展时,打开新客户端连接所带来的额外网络和计算负载就不会受到影响。不要削弱应用程序对增加数据库操作的时间敏感需求。

如果 minPoolSizemaxPoolSize 值相似,则大多数数据库客户端连接会在应用程序初创企业时打开。示例,如果 minPoolSize设立为 10maxPoolSize设立为 12,则应用程序初创企业时会打开 10 个客户端连接,然后在应用程序运行时期间只能再打开 2 个连接。但是,如果您的 minPoolSize设立为 10,并且 maxPoolSize设立为 100,则在应用程序运行时期间,最多可以根据需要打开 90 个额外连接。

与打开新的客户端连接相关的额外网络开销。因此,请考虑是否愿意在应用程序初创企业时产生该网络费用,还是在应用程序运行期间按需动态产生网络成本,这可能会影响最终用户的操作延迟和感知性能(如果存在请求突然激增,需要立即打开大量额外连接。

应用程序的架构是这一考虑因素的核心。示例,如果您将应用程序部署为微服务,请考虑哪些服务应直接调用Atlas ,作为控制连接池动态扩展和收缩的一种方法。或者,如果您的应用程序部署利用的是单线程资源(例如Amazon Web Services Lambda ),则您的应用程序将只能打开和使用一个客户端连接,因此您的 minPoolSizemaxPoolSize 都应设立为 1

应用程序特定于工作负载的查询几乎总是会有所不同,具体取决于它们在 Atlas 中执行所需的时间以及应用程序可以等待响应的时间。

您可以在 Atlas 中全局设置查询超时行为,也可以在查询级别定义该行为。

Atlas支持可重试读取可重试写入操作。启用后, Atlas会重试写入操作一次,以防止间歇性网络中断。

Atlas 集群最终会在所有节点上复制所有数据。不过,您可以配置在报告读取或写入操作成功之前必须在其中复制数据的节点数。您可以在 Atlas 中全局定义读关注写关注,也可以在连接字符串中定义客户端级别的读关注和写关注。Atlas 的默认写关注为 majority,这意味着必须在集群中一半以上的节点之间复制数据,Atlas 才会报告成功。相反,Atlas 的默认读关注级别为 local,这意味着在查询时,Atlas 只从集群中的一个节点检索数据

允许您水平扩展集群。 使用MongoDB,您可以分片某些集合进行分片,同时允许同一集群中的其他集合保持未分片。 创建新数据库时,默认选择分片集群数据量最少的分片作为该数据库的主分片分片。 默认,该数据库的所有未分片集合都位于该主分片分片中。 随着工作负载的增长,这可能会导致主分片分片的流量增加,尤其是当工作负载增长集中在主分片分片上的未分片集合时。

为了更好地分配此工作负载, MongoDB 8.0允许您使用moveCollection命令将未分片集合从主分片分片移动到其他分片。 这允许您将活动、繁忙的集合放置到预期资源使用量较少的分片上。 这样,您就可以:

  • 优化更大、更复杂的工作负载的性能。

  • 实现更好的资源利用率。

  • 在各分片之间更均匀地分配日期。

我们建议在以下情况下隔离您的集合:

  • 如果您的主分片分片由于存在多个高吞吐量未分片集合而经历大量工作负载。

  • 您预计未分片的集合会在未来增长,这可能会成为其他集合的瓶颈。

  • 您正在运行每个集群一个集合的部署设计,并且希望根据优先级或工作负载隔离这些客户。

  • 由于分片上存在大量未分片集合,因此分片上的数据量超过比例。

要学习;了解如何使用mongosh移动未分片的集合,请参阅移动集合。

有关 Atlas 灾难恢复最佳实践的建议,请参阅 Atlas 灾难恢复指导高可用性和恢复的推荐配置

该示例应用程序汇集了以下建议,以确保针对网络中断和故障转移事件的恢复能力:

  • 使用 Atlas 提供的连接字符串,具有可重试写入、多数写关注和默认读关注。

  • 使用maxTimeMS方法指定 optime 限制。 有关如何设置maxTimeMS的说明,请参阅特定的驱动程序文档。

  • 处理重复键和超时的错误。

该应用程序是一个 HTTP API,允许客户端创建或列出用户记录。它公开一个接受 GET 和 POST 请求的端点 http://localhost:3000:

方法
端点
说明

GET

/users

users 集合中获取用户名列表。

POST

/users

要求在请求正文中加入 name。将新用户添加到 users 集合。

注意

以下服务器应用程序使用 NanoHTTPDJSON 在运行之前,您需要将其作为依赖项添加到项目中。

1// File: App.java
2
3import java.util.Map;
4import java.util.logging.Logger;
5
6import org.bson.Document;
7import org.json.JSONArray;
8
9import com.mongodb.MongoException;
10import com.mongodb.client.MongoClient;
11import com.mongodb.client.MongoClients;
12import com.mongodb.client.MongoCollection;
13import com.mongodb.client.MongoDatabase;
14
15import fi.iki.elonen.NanoHTTPD;
16
17public class App extends NanoHTTPD {
18 private static final Logger LOGGER = Logger.getLogger(App.class.getName());
19
20 static int port = 3000;
21 static MongoClient client = null;
22
23 public App() throws Exception {
24 super(port);
25
26 // Replace the uri string with your MongoDB deployment's connection string
27 String uri = "<atlas-connection-string>";
28 client = MongoClients.create(uri);
29
30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false);
31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n");
32 }
33
34 public static void main(String[] args) {
35 try {
36 new App();
37 } catch (Exception e) {
38 LOGGER.severe("Couldn't start server:\n" + e);
39 }
40 }
41
42 @Override
43 public Response serve(IHTTPSession session) {
44 StringBuilder msg = new StringBuilder();
45 Map<String, String> params = session.getParms();
46
47 Method reqMethod = session.getMethod();
48 String uri = session.getUri();
49
50 if (Method.GET == reqMethod) {
51 if (uri.equals("/")) {
52 msg.append("Welcome to my API!");
53 } else if (uri.equals("/users")) {
54 msg.append(listUsers(client));
55 } else {
56 msg.append("Unrecognized URI: ").append(uri);
57 }
58 } else if (Method.POST == reqMethod) {
59 try {
60 String name = params.get("name");
61 if (name == null) {
62 throw new Exception("Unable to process POST request: 'name' parameter required");
63 } else {
64 insertUser(client, name);
65 msg.append("User successfully added!");
66 }
67 } catch (Exception e) {
68 msg.append(e);
69 }
70 }
71
72 return newFixedLengthResponse(msg.toString());
73 }
74
75 static String listUsers(MongoClient client) {
76 MongoDatabase database = client.getDatabase("test");
77 MongoCollection<Document> collection = database.getCollection("users");
78
79 final JSONArray jsonResults = new JSONArray();
80 collection.find().forEach((result) -> jsonResults.put(result.toJson()));
81
82 return jsonResults.toString();
83 }
84
85 static String insertUser(MongoClient client, String name) throws MongoException {
86 MongoDatabase database = client.getDatabase("test");
87 MongoCollection<Document> collection = database.getCollection("users");
88
89 collection.insertOne(new Document().append("name", name));
90 return "Successfully inserted user: " + name;
91 }
92}

注意

以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。

1const express = require('express');
2const bodyParser = require('body-parser');
3
4// Use the latest drivers by installing & importing them
5const MongoClient = require('mongodb').MongoClient;
6
7const app = express();
8app.use(bodyParser.json());
9app.use(bodyParser.urlencoded({ extended: true }));
10
11const uri = "mongodb+srv://<db_username>:<db_password>@cluster0-111xx.mongodb.net/test?retryWrites=true&w=majority";
12
13const client = new MongoClient(uri, {
14 useNewUrlParser: true,
15 useUnifiedTopology: true
16});
17
18// ----- API routes ----- //
19app.get('/', (req, res) => res.send('Welcome to my API!'));
20
21app.get('/users', (req, res) => {
22 const collection = client.db("test").collection("users");
23
24 collection
25 .find({})
26 .maxTimeMS(5000)
27 .toArray((err, data) => {
28 if (err) {
29 res.send("The request has timed out. Please check your connection and try again.");
30 }
31 return res.json(data);
32 });
33});
34
35app.post('/users', (req, res) => {
36 const collection = client.db("test").collection("users");
37 collection.insertOne({ name: req.body.name })
38 .then(result => {
39 res.send("User successfully added!");
40 }, err => {
41 res.send("An application error has occurred. Please try again.");
42 })
43});
44// ----- End of API routes ----- //
45
46app.listen(3000, () => {
47 console.log(`Listening on port 3000.`);
48 client.connect(err => {
49 if (err) {
50 console.log("Not connected: ", err);
51 process.exit(0);
52 }
53 console.log('Connected.');
54 });
55});

注意

以下 Web应用程序使用 FastAPI 。要创建新应用程序,请使用 FastAPI示例文件 结构。

1# File: main.py
2
3from fastapi import FastAPI, Body, Request, Response, HTTPException, status
4from fastapi.encoders import jsonable_encoder
5
6from typing import List
7from models import User
8
9import pymongo
10from pymongo import MongoClient
11from pymongo import errors
12
13# Replace the uri string with your |service| connection string
14uri = "<atlas-connection-string>"
15db = "test"
16
17app = FastAPI()
18
19@app.on_event("startup")
20def startup_db_client():
21 app.mongodb_client = MongoClient(uri)
22 app.database = app.mongodb_client[db]
23
24@app.on_event("shutdown")
25def shutdown_db_client():
26 app.mongodb_client.close()
27
28##### API ROUTES #####
29@app.get("/users", response_description="List all users", response_model=List[User])
30def list_users(request: Request):
31 try:
32 users = list(request.app.database["users"].find().max_time_ms(5000))
33 return users
34 except pymongo.errors.ExecutionTimeout:
35 raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="The request has timed out. Please check your connection and try again.")
36
37@app.post("/users", response_description="Create a new user", status_code=status.HTTP_201_CREATED)
38def new_user(request: Request, user: User = Body(...)):
39 user = jsonable_encoder(user)
40 try:
41 new_user = request.app.database["users"].insert_one(user)
42 return {"message":"User successfully added!"}
43 except pymongo.errors.DuplicateKeyError:
44 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not create user due to existing '_id' value in the collection. Try again with a different '_id' value.")