使用 MongoDB Atlas 构建弹性应用程序
在此页面上
在构建关键任务应用程序时,为生产中可能发生的意外事件做好准备非常重要。 这包括意外的慢速查询、缺失索引或工作负载量急剧增加。
MongoDB Atlas为您提供开箱即用的功能,帮助您构建弹性应用程序,让您能够主动做好准备,并对情况做出被动响应。 要构建弹性应用程序,我们建议您使用以下集群韧性以及应用程序和客户端最佳实践来配置MongoDB 部署。
集群弹性
要提高集群的弹性,请将集群升级到MongoDB 8.0 。 MongoDB 8.0引入了以下与韧性相关的性能改进和新功能:
操作拒绝过滤器,以被动方式缓解昂贵的查询
集群级超时,可主动防范代价高昂的读取操作
使用moveCollection 命令实现更好的工作负载隔离性
改进内存管理
为了在生产中安全地运行应用程序,请务必确保内存利用率留出空间。 如果节点耗尽可用内存,则可能容易受到 Linux Out of Memory Killer 的攻击 终止mongod
进程。
MongoDB 8.0会自动为所有部署使用升级后的 TCMalloc ,从而减少平均内存碎片随时间的增长。 这种较低的碎片化提高了峰值负载期间的操作稳定性,从而总体提高了内存利用率。
操作拒绝过滤器
如果不及时处理,无意中的资源密集型操作可能会导致生产问题。
MongoDB 8.0允许您使用操作拒绝筛选器来最大限度地减少这些操作的影响。 操作拒绝筛选器允许您将MongoDB配置为拒绝查询运行,直到您重新启用具有该查询结构的查询。
换句话说,一旦识别出慢速查询,您无需等待应用程序团队修复其查询以控制慢速查询的影响。 相反,一旦在查询分析器、实时性能面板或查询日志中发现性能不佳的查询,就可以对该查询结构设立拒绝过滤。 然后, MongoDB会阻止执行该传入查询结构的新实例。 修复查询后,您可以重新启用查询结构。
如果要执行以下操作,则应使用操作拒绝过滤:
在修复过程中,快速遏制慢查询的影响。
在过载期间,通过拒绝不太重要的查询来优先处理更重要的工作负载。
如果集群接近最大资源利用率,请给集群时间进行恢复。
在Atlas用户界面中识别并拒绝慢速查询
要在Atlas用户界面中使用操作拒绝过滤,请执行以下操作:
AtlasGoClusters在Atlas中,Go项目的 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含所需项目的组织。
如果尚未显示,请从导航栏的Projects菜单中选择所需的项目。
如果尚未出现,请单击侧边栏中的 Clusters(集群)。
会显示集群页面。
拒绝特定查询结构的操作。
在db.adminCommand()
函数中使用setQuerySettings
。
有关示例,请参阅使用操作拒绝筛选器阻止慢速查询。
在拒绝或超时后监控查询
随后,您可以在“指标”标签页中监控查询的运行情况:
读取操作的集群级超时
在查询投入生产之前,请务必确保您的开发进程仔细考虑查询的效率。 异常可能总会发生,但主动缓解低效查询有助于防止集群性能问题。
使用MongoDB 8.0 ,您可以通过进入集群的服务器端defaultMaxTimeMS
来保护查询免受未索引操作的影响。 如果操作超过此超时时间, MongoDB会取消该操作,以防止查询运行时间过长并占用资源。 这样您就可以:
将设置超时的责任从单个应用程序团队转移到专注于数据库的团队。
如果查询缺少索引,则尽量减少集合扫描的影响。
对投入生产的昂贵操作进行最后一轮缓解。
如果您有需要不同超时的查询(例如分析查询),则可以通过使用maxTimeMS方法设置操作级超时来覆盖这些查询。
在Atlas Administration API中设置读取操作的默认超时
要通过Atlas Administration API设立defaultMaxTimeMS
参数,请参阅更新一个集群的高级配置选项。
在Atlas用户界面中设置读取操作的默认超时
要在Atlas用户界面中设立defaultMaxTimeMS
参数,请执行以下操作:
导航到您的配置选项。
如果您已有集群,请导航至“编辑集群”页面。
如果要创建新集群,请从Select a version下拉列表中选择MongoDB 8.0 。
单击 Additional Settings(连接)。
向下滚动并单击More Configuration Options 。
将 切换为Default Timeout for Read Operations Yes。
要查看已终止操作的行为,请参阅在拒绝或超时后监控查询。 要学习;了解更多信息,请参阅defaultMaxTimeMS
和设置读取操作的默认超时。
隔离繁忙、未分片集合的影响
分片允许您水平扩展集群。 使用MongoDB,您可以分片某些集合进行分片,同时允许同一集群中的其他集合保持未分片。 创建新数据库时,默认选择分片集群数据量最少的分片作为该数据库的主分片分片。 默认,该数据库的所有未分片集合都位于该主分片分片中。 随着工作负载的增长,这可能会导致主分片分片的流量增加,尤其是当工作负载增长集中在主分片分片上的未分片集合时。
为了更好地分配此工作负载, MongoDB 8.0允许您使用moveCollection
命令将未分片集合从主分片分片移动到其他分片。 这允许您将活动、繁忙的集合放置到预期资源使用量较少的分片上。 这样,您就可以:
优化更大、更复杂的工作负载的性能。
实现更好的资源利用率。
在各分片之间更均匀地分配日期。
我们建议在以下情况下隔离您的集合:
如果您的主分片分片由于存在多个高吞吐量未分片集合而经历大量工作负载。
您预计未分片的集合会在未来增长,这可能会成为其他集合的瓶颈。
您正在运行每个集群一个集合的部署设计,并且希望根据优先级或工作负载隔离这些客户。
由于分片上存在大量未分片集合,因此分片上的数据量超过比例。
应用程序和客户端最佳实践
您可以配置 MongoDB 部署和驱动程序库的功能,以创建能够承受网络中断和故障转移事件的弹性应用程序。要编写能充分利用 MongoDB Atlas 始终在线功能的应用程序代码,应该执行以下任务:
使用对您的应用程序有意义的
majority
写关注(write concern)和读关注(read concern)。处理应用程序中的错误。
安装最新驱动程序
从MongoDB驱动程序安装您语言的最新驱动程序。 驱动程序将查询从应用程序程序连接并中继到数据库。 使用最新的驱动程序可启用最新的MongoDB功能。
然后,在应用程序中,导入依赖项:
如果使用 Maven,请将以下内容添加到您的 pom.xml
依赖项列表中:
<dependencies> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongodb-driver-sync</artifactId> <version>4.0.1</version> </dependency> </dependencies>
如果使用 Gradle,则将以下内容添加到 build.gradle
依赖项列表中:
dependencies { compile 'org.mongodb:mongodb-driver-sync:4.0.1' }
// Latest 'mongodb' version installed with npm const MongoClient = require('mongodb').MongoClient;
# Install the latest 'pymongo' version with pip and # import MongoClient from the package to establish a connection. from pymongo import MongoClient
连接字符串(Connection Strings)
注意
Atlas提供了预配置的连接string 。 有关复制预配置string的步骤,请参阅Atlas 提供的连接字符串。
使用指定 Atlas 集群中所有节点的连接字符串将应用程序连接到数据库。如果您的集群执行副本集选举并选出了新的主节点,则指定集群中所有节点的连接字符串将在没有应用程序逻辑的情况下发现新的主节点。
您可以使用以下任一种方法指定集群中的所有节点:
DNS 种子列表连接格式(推荐使用 Atlas)。
连接string还可以指定选项,特别是retryWrites和writeConcern。
Atlas 可以使用专用端点服务中的负载均衡器为分片集群生成优化的 SRV 连接字符串。当您使用优化的连接字符串时,Atlas 会限制应用程序与分片集群之间每个mongos
的连接数。每个mongos
的受限连接数可提高连接计数高峰期间的性能。
要了解有关为私有端点后的分片集群优化连接字符串的更多信息,请参阅提高私有端点后分片集群的连接性能。
Atlas 提供的连接字符串
如果您从 Atlas 集群界面复制连接字符串,则会为您的集群预先配置连接字符串,使用 DNS 种子列表格式,并包括建议的 retryWrites
和 w
(写关注)选项以实现韧性。
要从 Atlas 复制您的连接字符串 URI:
AtlasGoClusters在Atlas中,Go项目的 页面。
如果尚未显示,请从导航栏上的 Organizations 菜单中选择包含所需项目的组织。
如果尚未显示,请从导航栏的Projects菜单中选择所需的项目。
如果尚未出现,请单击侧边栏中的 Clusters(集群)。
会显示集群页面。
复制连接string URI。
将连接字符串或完整驱动程序示例复制到应用程序代码中。必须提供数据库用户凭证。
注意
本指南通过连接 使用SCRAM 身份验证 string。要学习;了解如何使用 X. 509证书进行身份验证,请参阅X. 509 。
使用连接字符串在应用程序中实例化 MongoDB 客户端:
// Copy the connection string provided by Atlas String uri = <your Atlas connection string>; // Instantiate the MongoDB client with the URI MongoClient client = MongoClients.create(uri);
// Copy the connection string provided by Atlas const uri = <your Atlas connection string>; // Instantiate the MongoDB client with the URI const client = new MongoClient(uri, { useNewUrlParser: true, useUnifiedTopology: true });
# Copy the connection string provided by Atlas uri = <your Atlas connection string> # Pass your connection string URI to the MongoClient constructor client = MongoClient(uri)
可重试写入和读取
注意
从 MongoDB 4.0版本开始以及4.2兼容驱动程序,MongoDB 默认会重试一次写入和读取操作。
可重试写入 (Retryable Writes)
使用可重试写入功能,可在某些写入操作失败时重试一次。 如果您从 复制了连接string Atlas,则其中包括"retryWrites=true"
。如果您提供自己的连接string ,请将 "retryWrites=true"
作为查询参数。
重试一次写入是处理暂时性网络错误以及在副本集选举期间应用程序暂时无法找到健康主节点的最佳策略。如果重试成功,则整个操作成功,并且不会返回任何错误。如果操作失败,原因可能是:
持久的网络错误
无效的命令
当操作失败时,应用程序需要自行处理错误。
可重试读取
如果读取操作在 MongoDB 4.0版本中和使用4.2兼容驱动程序中启动失败,则会自动重试一次。 无需其他配置即可重试读取。
写关注和读关注
可以使用写关注和读关注来调整应用程序的一致性和可用性。更严格的关注意味着数据库操作需要等待更强的数据一致性保证,而宽松的一致性要求则提供更高的可用性。
例子
如果您的应用程序处理货币余额,则一致性极为重要。您可以使用 majority
写关注和读关注来确保您永远不会读取过时的数据或可能回滚的数据。
或者,如果您的应用程序每秒记录来自数百个传感器的温度数据,您可能不会担心自己读取的数据是否包括最新读数。您可以放宽一致性要求,并更快地访问该数据。
写关注
Atlas您可以通过连接string URI 设置 副本集的 写关注级别 。使用majority
写关注确保您的数据成功写入数据库并持久保存。 这是推荐的默认值,对于大多数使用案例来说,这是足够的。 如果您从 复制了连接string Atlas,则其中包括"w=majority"
。
当您使用需要确认的写关注(例如 majority
)时,您还可以指定写入达到该确认级别的最大时间限制:
用于所有写入的 wtimeoutMS 连接字符串参数,或
用于单次写入操作的 wtimeout 选项。
是否使用时间限制以及使用的值取决于应用程序上下文。
重要
如果未指定写入时间限制且写关注的级别无法实现,写入操作则会无限期挂起。
读关注 (read concern)
您可以通过连接字符串 URI 设置 Atlas 副本集的读关注级别。理想的读关注取决于您的应用程序要求,但默认值足以满足大多数使用案例。无需连接字符串参数即可使用默认读关注。
指定读关注可以提高对应用程序从 Atlas 接收数据的保证。
注意
应用程序使用的写关注和读关注的特定组合会对操作顺序保证产生影响。 这称为因果一致性。 有关因果一致性保证的更多信息,请参阅因果一致性和读写关注。
Error Handling
无效的命令、网络服务中断以及未经可重试写入处理的网络错误会返回错误。有关错误详情,请参阅驱动程序的 API 文档。
例如,如果应用程序尝试插入包含数据库集合中已使用的 _id
值的文档,您的驱动程序将返回错误,其中包括:
Unable to insert due to an error: com.mongodb.MongoWriteException: E11000 duplicate key error collection: <db>.<collection> ...
{ "name": : "MongoError", "message": "E11000 duplicate key error collection on: <db>.<collection> ... ", ... }
pymongo.errors.DuplicateKeyError: E11000 duplicate key error collection: <db>.<collection> ...
如果没有正确的错误处理,错误可能会阻止您的应用程序处理请求,直到重新启动为止。
应用程序应处理错误,而不会崩溃或产生副作用。在前面的示例中,应用程序在集合中插入了重复的 _id
,该应用程序可以按如下方式处理错误:
// Declare a logger instance from java.util.logging.Logger private static final Logger LOGGER = ... ... try { InsertOneResult result = collection.insertOne(new Document() .append("_id", 1) .append("body", "I'm a goofball trying to insert a duplicate _id")); // Everything is OK LOGGER.info("Inserted document id: " + result.getInsertedId()); // Refer to the API documentation for specific exceptions to catch } catch (MongoException me) { // Report the error LOGGER.severe("Failed due to an error: " + me); }
... collection.insertOne({ _id: 1, body: "I'm a goofball trying to insert a duplicate _id" }) .then(result => { response.sendStatus(200) // send "OK" message to the client }, err => { response.sendStatus(400); // send "Bad Request" message to the client });
... try: collection.insert_one({ "_id": 1, "body": "I'm a goofball trying to insert a duplicate _id" }) return {"message": "User successfully added!"} except pymongo.errors.DuplicateKeyError as e: print ("The insert operation failed:", e)
此示例中的插入操作在第二次调用时会引发“重复键”错误,因为 _id
字段必须是唯一的。捕获错误,通知客户端,应用继续运行。但是,插入操作失败,您可以决定是否向用户显示消息、重试操作或执行其他操作。
您应该始终记录错误。进一步处理错误的常见策略包括:
将错误返回给客户端,并显示错误消息。当您无法解决错误并且需要通知用户动作无法完成时,这是一个很好的策略。
写入备份数据库。当您无法解决错误但又不想冒丢失请求数据的风险时,这是一个很好的策略。
在单次默认重试之后重试该操作。如果您可以通过编程方式解决错误原因,请重试,这是一个很好的策略。
您必须为应用程序上下文选择最佳策略。
例子
在重复键错误的示例中,您应该记录错误但不要重试操作,因为它永远不会成功。相反,您可以写入回退数据库并稍后查看该数据库的内容,以确保不会丢失任何信息。用户无需执行任何其他操作,数据就会被记录下来,因此您可以选择不向客户端发送错误消息。
规划网络错误
当操作无限期挂起并阻止应用程序执行新操作时,返回错误可能是理想行为。 您可以使用maxTimeMS方法对单个操作设置时间限制,如果超过该时间限制,则返回错误供应用程序进行处理。
对每个操作设置的时间限制取决于该操作的上下文。
例子
如果您的应用程序读取并显示 inventory
集合中的简单产品信息,您可以确信这些读取操作只需要一点时间。查询长时间运行表明一直存在网络问题。将该操作的 maxTimeMS
设置为 5000(即 5 秒)意味着,一旦您确信存在网络问题,应用程序就会收到反馈。
测试故障转移
基于混乱测试,Atlas 将自动执行副本集选举,以进行定期维护和某些配置更改。
要检查您的应用程序是否能够以足够的弹性来支持副本集选举,请通过模拟故障转移事件来测试故障转移过程。
弹性示例应用程序
该示例应用程序汇集了以下建议,以确保针对网络中断和故障转移事件的恢复能力:
使用 Atlas 提供的连接字符串,具有可重试写入、多数写关注和默认读关注。
处理重复键和超时的错误。
该应用程序是一个 HTTP API ,允许客户端创建或列出用户记录。 它公开一个接受 GET 和 POST 请求的端点 http://localhost:3000 :
方法 | 端点 | 说明 |
---|---|---|
|
| 从 |
|
| 要求在请求正文中加入 |
1 // File: App.java 2 3 import java.util.Map; 4 import java.util.logging.Logger; 5 6 import org.bson.Document; 7 import org.json.JSONArray; 8 9 import com.mongodb.MongoException; 10 import com.mongodb.client.MongoClient; 11 import com.mongodb.client.MongoClients; 12 import com.mongodb.client.MongoCollection; 13 import com.mongodb.client.MongoDatabase; 14 15 import fi.iki.elonen.NanoHTTPD; 16 17 public class App extends NanoHTTPD { 18 private static final Logger LOGGER = Logger.getLogger(App.class.getName()); 19 20 static int port = 3000; 21 static MongoClient client = null; 22 23 public App() throws Exception { 24 super(port); 25 26 // Replace the uri string with your MongoDB deployment's connection string 27 String uri = "<atlas-connection-string>"; 28 client = MongoClients.create(uri); 29 30 start(NanoHTTPD.SOCKET_READ_TIMEOUT, false); 31 LOGGER.info("\nStarted the server: http://localhost:" + port + "/ \n"); 32 } 33 34 public static void main(String[] args) { 35 try { 36 new App(); 37 } catch (Exception e) { 38 LOGGER.severe("Couldn't start server:\n" + e); 39 } 40 } 41 42 43 public Response serve(IHTTPSession session) { 44 StringBuilder msg = new StringBuilder(); 45 Map<String, String> params = session.getParms(); 46 47 Method reqMethod = session.getMethod(); 48 String uri = session.getUri(); 49 50 if (Method.GET == reqMethod) { 51 if (uri.equals("/")) { 52 msg.append("Welcome to my API!"); 53 } else if (uri.equals("/users")) { 54 msg.append(listUsers(client)); 55 } else { 56 msg.append("Unrecognized URI: ").append(uri); 57 } 58 } else if (Method.POST == reqMethod) { 59 try { 60 String name = params.get("name"); 61 if (name == null) { 62 throw new Exception("Unable to process POST request: 'name' parameter required"); 63 } else { 64 insertUser(client, name); 65 msg.append("User successfully added!"); 66 } 67 } catch (Exception e) { 68 msg.append(e); 69 } 70 } 71 72 return newFixedLengthResponse(msg.toString()); 73 } 74 75 static String listUsers(MongoClient client) { 76 MongoDatabase database = client.getDatabase("test"); 77 MongoCollection<Document> collection = database.getCollection("users"); 78 79 final JSONArray jsonResults = new JSONArray(); 80 collection.find().forEach((result) -> jsonResults.put(result.toJson())); 81 82 return jsonResults.toString(); 83 } 84 85 static String insertUser(MongoClient client, String name) throws MongoException { 86 MongoDatabase database = client.getDatabase("test"); 87 MongoCollection<Document> collection = database.getCollection("users"); 88 89 collection.insertOne(new Document().append("name", name)); 90 return "Successfully inserted user: " + name; 91 } 92 }
注意
以下服务器应用程序使用 Express,您需要先将其作为依赖项添加到项目中,然后才能运行它。
1 const express = require('express'); 2 const bodyParser = require('body-parser'); 3 4 // Use the latest drivers by installing & importing them 5 const MongoClient = require('mongodb').MongoClient; 6 7 const app = express(); 8 app.use(bodyParser.json()); 9 app.use(bodyParser.urlencoded({ extended: true })); 10 11 const uri = "mongodb+srv://<db_username>:<db_password>@cluster0-111xx.mongodb.net/test?retryWrites=true&w=majority"; 12 13 const client = new MongoClient(uri, { 14 useNewUrlParser: true, 15 useUnifiedTopology: true 16 }); 17 18 // ----- API routes ----- // 19 app.get('/', (req, res) => res.send('Welcome to my API!')); 20 21 app.get('/users', (req, res) => { 22 const collection = client.db("test").collection("users"); 23 24 collection 25 .find({}) 26 .maxTimeMS(5000) 27 .toArray((err, data) => { 28 if (err) { 29 res.send("The request has timed out. Please check your connection and try again."); 30 } 31 return res.json(data); 32 }); 33 }); 34 35 app.post('/users', (req, res) => { 36 const collection = client.db("test").collection("users"); 37 collection.insertOne({ name: req.body.name }) 38 .then(result => { 39 res.send("User successfully added!"); 40 }, err => { 41 res.send("An application error has occurred. Please try again."); 42 }) 43 }); 44 // ----- End of API routes ----- // 45 46 app.listen(3000, () => { 47 console.log(`Listening on port 3000.`); 48 client.connect(err => { 49 if (err) { 50 console.log("Not connected: ", err); 51 process.exit(0); 52 } 53 console.log('Connected.'); 54 }); 55 });
注意
以下 Web应用程序使用 FastAPI 。要创建新应用程序,请使用 FastAPI示例文件 结构。
1 # File: main.py 2 3 from fastapi import FastAPI, Body, Request, Response, HTTPException, status 4 from fastapi.encoders import jsonable_encoder 5 6 from typing import List 7 from models import User 8 9 import pymongo 10 from pymongo import MongoClient 11 from pymongo import errors 12 13 # Replace the uri string with your |service| connection string 14 uri = "<atlas-connection-string>" 15 db = "test" 16 17 app = FastAPI() 18 19 20 def startup_db_client(): 21 app.mongodb_client = MongoClient(uri) 22 app.database = app.mongodb_client[db] 23 24 25 def shutdown_db_client(): 26 app.mongodb_client.close() 27 28 ##### API ROUTES ##### 29 30 def list_users(request: Request): 31 try: 32 users = list(request.app.database["users"].find().max_time_ms(5000)) 33 return users 34 except pymongo.errors.ExecutionTimeout: 35 raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail="The request has timed out. Please check your connection and try again.") 36 37 38 def new_user(request: Request, user: User = Body(...)): 39 user = jsonable_encoder(user) 40 try: 41 new_user = request.app.database["users"].insert_one(user) 42 return {"message":"User successfully added!"} 43 except pymongo.errors.DuplicateKeyError: 44 raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Could not create user due to existing '_id' value in the collection. Try again with a different '_id' value.")