Docs 菜单
Docs 主页
/
MongoDB Manual
/ /

读取隔离性、一致性和新近度

在此页面上

  • 隔离性保证
  • 单调写入
  • 实时顺序
  • 因果一致性(Causal Consistency)

根据读关注,客户端可在写入操作 持久化之前看到写入结果:

对于多文档事务中的操作,当事务提交时,该事务中进行的所有数据更改都将保存并在事务外部可见。换言之,一个事务不会在回滚其他事务的同时提交某些更改。

在事务进行提交前,在事务中所做的数据更改在事务外不可见。

不过,当事务写入多个分片时,并非所有外部读取操作都需等待已提交事务的结果在各个分片上可见。例如,如果事务已提交并且写入 1 在分片 A 上可见,但写入 2 在分片 B 上尚不可见,则读关注 "local" 处的外部读取可以在不看到写入 2 的情况下读取写入 1 的结果。

未提交读取是默认隔离级别,适用于 mongod 独立运行实例以及副本集和分片集群。

写入操作对于单个文档来说是原子的;即,如果写入正在更新文档中的多个字段,则读取操作将永远不会看到仅更新了某些字段的文档。然而,尽管客户端可能看不到部分更新的文档,但未提交读取意味着并发读取操作在更改持久性之前仍可能看到更新的文档。

使用独立的 mongod 实例时,可以对单个文档的一组读取和写入操作执行序列化。使用副本集时,只有在没有回滚的情况下才能对单个文档的一组读取和写入操作执行序列化。

当单个写操作(例如 db.collection.updateMany())修改了多份文档,则每份文档的修改都是原子性的,但整个操作不是原子性的。

在执行多文档写入操作时,无论是通过单次写入操作还是多次写入操作,其他操作都可能会交错进行。

对于需要对多个文档(在单个或多个集合中)原子性读取和写入的情况,MongoDB 支持分布式事务,包括副本集和分片集群上的事务。

有关详细信息,请参阅事务

重要

在大多数情况下,与单文档写入操作相比,分布式事务会产生更高的性能成本,并且分布式事务的可用性不应取代有效的模式设计。在许多情况下,非规范化数据模型(嵌入式文档和数组)仍然是数据和使用案例的最佳选择。换言之,对于许多场景,适当的数据建模将最大限度地减少对分布式事务的需求。

有关其他事务使用注意事项(如运行时间限制和 oplog 大小限制),另请参阅生产注意事项

在不隔离多文档写操作的情况下,MongoDB 表现出以下行为:

  1. 非时间点读操作。假设读操作在时间 t 1 开始并开始读取文档。写操作会在稍后某个时间 t 2 提交对其中一份文档的更新。读取者可能会看到该文档的更新版本,因此看不到数据的时间点快照。

  2. 不可序列化的操作。假设读操作在时间 t 1 读取文档 d 1 并且写操作在随后的时间 t 3 更新d 1。该操作引入读写依赖关系,使得如果要进行序列化操作,则读取操作必须先于写入操作。此外,假设写入操作在时间 t 2 更新文档 d 2,并且读取操作在随后的时间 t 4 读取d 2。该操作引入写读依赖关系,这将要求在可序列化计划中读取操作在写入操作之后进行。这种依赖关系循环,使序列化性变得无法实现。

  3. 读取可能会遗漏在读取操作过程中更新的匹配文档。

在某些情况下,MongoDB 游标可以多次返回同一文档。当游标返回文档时,其他操作可能会与查询交错进行。如果其中一个操作改变了查询所用索引的索引字段,那么游标可能会多次返回同一文档。

某些情况下,使用唯一索引的查询可能会返回重复值。如果使用唯一索引的游标与共享相同唯一值的文档的删除和插入交错,则该游标可能会从不同的文档两次返回相同的唯一值。

考虑使用读取隔离。要了解更多信息,请参阅读关注 "snapshot"

默认情况下,MongoDB 为独立的 mongod 实例和副本集提供单调写入保证。

对于单调写入和分片集群,请参阅 因果一致性

对于主线程的读取和写入操作,发出具有 "linearizable" 读关注的读取操作和具有 "majority" 写关注的写入操作使多个线程能够对单个文档执行读写,就像单个线程实时执行这些操作一样;换言之,这些读写的相应计划表被认为是可线性化的。

提示

另请参阅:

如果操作在逻辑上依赖于先前的操作,则这些操作之间存在因果关系。 例如,根据指定条件删除所有文档的写入操作与验证删除操作的后续读取操作之间存在因果关系。

通过因果一致的会话,MongoDB 按照其因果关系的顺序执行因果操作,并且客户端观察到与因果关系一致的结果。

为了提供因果一致性,MongoDB 在客户端会话中启用因果一致性。因果一致会话表示具有 "majority" 读关注的读操作和具有 "majority" 写关注的写操作的关联序列具有因果关系,该因果关系通过其顺序反映出来。 应用程序必须确保在客户端会话中一次只能有一个线程执行这些操作。

对于因果相关的操作:

  1. 客户端会启动客户端会话。

    重要

    客户端会话仅保证以下方面的因果一致性:

    • 具有 "majority" 的读取操作;即返回数据已得到大多数副本集成员的确认,并且具有持久性。

    • 具有 "majority" 写关注的写入操作;即请求确认该操作已应用于副本集的大多数有投票权成员的写入操作。

    有关因果一致性和各种读写关注的更多信息,请参阅因果一致性和读写关注

  2. 当客户端发出一系列具有 "majority" 读关注的读取和写入操作(具有 "majority" 写关注)时,客户端会在每个操作中包含会话信息。

  3. 对于与会话关联的具有 "majority" 的每个读取操作以及具有 "majority" 写关注的写入操作,即使操作错误,MongoDB 也会返回操作时间和集群时间。客户端会话会跟踪操作时间和集群时间。

    注意

    MongoDB 不返回未确认 (w: 0) 写操作的操作时间和集群时间。未确认的写并不意味着任何因果关系。

    尽管 MongoDB 在客户端会话中返回读取操作和已确认写入操作的操作时间和集群时间,但只有具有 "majority" 读关注的读取操作和具有 "majority" 写关注的写入操作才能保证因果一致性。有关详细信息,请参阅因果一致性和读写关注。

  4. 相关客户端会话会追踪这两个时间字段。

    注意

    不同会话之间的操作可以具有因果一致性。 MongoDB驱动程序和mongosh提供了提前客户端会话的 optime 和集群时间的方法。 因此,客户端可以提前一个客户端会话的集群时间和 optime,以与另一个客户端会话的操作保持一致。

下表列出了因果一致性会话为具有 "majority" 读关注的读取操作和具有 "majority" 写关注的写入操作提供的因果一致性保证。

保证
说明

读取写入操作

读取操作反映了它们之前的写入操作的结果。

单调读取

读取操作不会返回与特定数据状态(在某一先前读取操作之前出现的状态)相对应的结果。

例如,如果在会话中:

  • 写入 1 先于写入 2

  • 读取 1 先于读取 2,并且

  • 读取操作1返回结果,反映写入操作2

然后读取 2 无法返回写入 1 的结果。

单调写入

必须在其他写入之前并且会在其他写入之前执行的写入操作。

例如,如果在会话中写入 1 必须先于写入 2,则写入 2 时的数据状态必须反映写入 1 后数据的状态。其他写入可以在写入 1 和写入 2 之间交错进行,但写入 2 不能在写入 1 之前发生。

读取后写入

读操作之后必须发生的写操作在这些读操作之后执行。换言之,写入时的数据状态必须包含先前读操作的数据状态。

这些保证适用于 MongoDB 部署的所有节点。如果在因果一致的会话中,您发出带 "majority" 写关注的写操作,接着发出从从节点读取的带 "majority" 读关注的读操作(即读取偏好 secondary),则读操作将反映写操作后数据库的状态。

因果一致会话内的操作并不与会话外的操作隔离。如果并发写入操作在会话的写入和读取操作之间交错进行,则会话的读取操作可能会返回反映会话写入操作之后发生的写入操作的结果。

提示

应用程序必须确保在客户端会话中一次只有一个线程执行这些操作。

客户端需要针对 MongoDB 3.6 或更高版本更新 MongoDB 驱动程序:

Java 3.6 +

Python 3.6+

C 1.9+

Go 1.8+

C# 2.5 +

节点3.0 +

Ruby 2.5+

Rust 2.1+

Swift 1.2 +

Perl 2.0 +

PHPC 1.4 +

Scala 2.2+

C++ 3.6.6 +

重要

因果一致的会话只能保证具有 "majority" 读关注的读取和具有 "majority" 写关注的写入的因果一致性。

考虑维护各种项目的当前和历史数据的集合 items。只有历史数据具有非空值的 end 日期。如果某个项目的 sku 值发生更改,则需要使用 end 日期更新具有旧 sku 值的文档,然后插入具有当前 sku 值的新文档。客户端可以使用因果一致会话,确保更新发生在插入之前。


➤ 使用右上角的 Select your language(选择语言)下拉菜单设置本示例的语言。


/* Use a causally-consistent session to run some operations. */
wc = mongoc_write_concern_new ();
mongoc_write_concern_set_wmajority (wc, 1000);
mongoc_collection_set_write_concern (coll, wc);
rc = mongoc_read_concern_new ();
mongoc_read_concern_set_level (rc, MONGOC_READ_CONCERN_LEVEL_MAJORITY);
mongoc_collection_set_read_concern (coll, rc);
session_opts = mongoc_session_opts_new ();
mongoc_session_opts_set_causal_consistency (session_opts, true);
session1 = mongoc_client_start_session (client, session_opts, &error);
if (!session1) {
fprintf (stderr, "couldn't start session: %s\n", error.message);
goto cleanup;
}
/* Run an update_one with our causally-consistent session. */
update_opts = bson_new ();
res = mongoc_client_session_append (session1, update_opts, &error);
if (!res) {
fprintf (stderr, "couldn't add session to opts: %s\n", error.message);
goto cleanup;
}
query = BCON_NEW ("sku", "111");
update = BCON_NEW ("$set", "{", "end",
BCON_DATE_TIME (bson_get_monotonic_time ()), "}");
res = mongoc_collection_update_one (coll,
query,
update,
update_opts,
NULL, /* reply */
&error);
if (!res) {
fprintf (stderr, "update failed: %s\n", error.message);
goto cleanup;
}
/* Run an insert with our causally-consistent session */
insert_opts = bson_new ();
res = mongoc_client_session_append (session1, insert_opts, &error);
if (!res) {
fprintf (stderr, "couldn't add session to opts: %s\n", error.message);
goto cleanup;
}
insert = BCON_NEW ("sku", "nuts-111", "name", "Pecans",
"start", BCON_DATE_TIME (bson_get_monotonic_time ()));
res = mongoc_collection_insert_one (coll, insert, insert_opts, NULL, &error);
if (!res) {
fprintf (stderr, "insert failed: %s\n", error.message);
goto cleanup;
}
using (var session1 = client.StartSession(new ClientSessionOptions { CausalConsistency = true }))
{
var currentDate = DateTime.UtcNow.Date;
var items = client.GetDatabase(
"test",
new MongoDatabaseSettings
{
ReadConcern = ReadConcern.Majority,
WriteConcern = new WriteConcern(
WriteConcern.WMode.Majority,
TimeSpan.FromMilliseconds(1000))
})
.GetCollection<BsonDocument>("items");
items.UpdateOne(session1,
Builders<BsonDocument>.Filter.And(
Builders<BsonDocument>.Filter.Eq("sku", "111"),
Builders<BsonDocument>.Filter.Eq("end", BsonNull.Value)),
Builders<BsonDocument>.Update.Set("end", currentDate));
items.InsertOne(session1, new BsonDocument
{
{"sku", "nuts-111"},
{"name", "Pecans"},
{"start", currentDate}
});
}
// Example 1: Use a causally consistent session to ensure that the update occurs before the insert.
ClientSession session1 = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build());
Date currentDate = new Date();
MongoCollection<Document> items = client.getDatabase("test")
.withReadConcern(ReadConcern.MAJORITY)
.withWriteConcern(WriteConcern.MAJORITY.withWTimeout(1000, TimeUnit.MILLISECONDS))
.getCollection("test");
items.updateOne(session1, eq("sku", "111"), set("end", currentDate));
Document document = new Document("sku", "nuts-111")
.append("name", "Pecans")
.append("start", currentDate);
items.insertOne(session1, document);
async with await client.start_session(causal_consistency=True) as s1:
current_date = datetime.datetime.today()
items = client.get_database(
"test",
read_concern=ReadConcern("majority"),
write_concern=WriteConcern("majority", wtimeout=1000),
).items
await items.update_one(
{"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1
)
await items.insert_one(
{"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1
)
my $s1 = $conn->start_session({ causalConsistency => 1 });
$items = $conn->get_database(
"test", {
read_concern => { level => 'majority' },
write_concern => { w => 'majority', wtimeout => 10000 },
}
)->get_collection("items");
$items->update_one(
{
sku => 111,
end => undef
},
{
'$set' => { end => $current_date}
},
{
session => $s1
}
);
$items->insert_one(
{
sku => "nuts-111",
name => "Pecans",
start => $current_date
},
{
session => $s1
}
);
$items = $client->selectDatabase(
'test',
[
'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::MAJORITY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->items;
$s1 = $client->startSession(
['causalConsistency' => true],
);
$currentDate = new \MongoDB\BSON\UTCDateTime();
$items->updateOne(
['sku' => '111', 'end' => ['$exists' => false]],
['$set' => ['end' => $currentDate]],
['session' => $s1],
);
$items->insertOne(
['sku' => '111-nuts', 'name' => 'Pecans', 'start' => $currentDate],
['session' => $s1],
);
with client.start_session(causal_consistency=True) as s1:
current_date = datetime.datetime.today()
items = client.get_database(
"test",
read_concern=ReadConcern("majority"),
write_concern=WriteConcern("majority", wtimeout=1000),
).items
items.update_one(
{"sku": "111", "end": None}, {"$set": {"end": current_date}}, session=s1
)
items.insert_one(
{"sku": "nuts-111", "name": "Pecans", "start": current_date}, session=s1
)
let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true))
let currentDate = Date()
var dbOptions = MongoDatabaseOptions(
readConcern: .majority,
writeConcern: try .majority(wtimeoutMS: 1000)
)
let items = client1.db("test", options: dbOptions).collection("items")
let result1 = items.updateOne(
filter: ["sku": "111", "end": .null],
update: ["$set": ["end": .datetime(currentDate)]],
session: s1
).flatMap { _ in
items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1)
}
let s1 = client1.startSession(options: ClientSessionOptions(causalConsistency: true))
let currentDate = Date()
var dbOptions = MongoDatabaseOptions(
readConcern: .majority,
writeConcern: try .majority(wtimeoutMS: 1000)
)
let items = client1.db("test", options: dbOptions).collection("items")
try items.updateOne(
filter: ["sku": "111", "end": .null],
update: ["$set": ["end": .datetime(currentDate)]],
session: s1
)
try items.insertOne(["sku": "nuts-111", "name": "Pecans", "start": .datetime(currentDate)], session: s1)

如果其他客户端需要读取所有当前的 sku 值,您可以将集群时间和操作时间提前到其他会话的时间,以确保该客户端与其他会话因果一致并在两次写入后读取:

/* Make a new session, session2, and make it causally-consistent
* with session1, so that session2 will read session1's writes. */
session2 = mongoc_client_start_session (client, session_opts, &error);
if (!session2) {
fprintf (stderr, "couldn't start session: %s\n", error.message);
goto cleanup;
}
/* Set the cluster time for session2 to session1's cluster time */
cluster_time = mongoc_client_session_get_cluster_time (session1);
mongoc_client_session_advance_cluster_time (session2, cluster_time);
/* Set the operation time for session2 to session2's operation time */
mongoc_client_session_get_operation_time (session1, &timestamp, &increment);
mongoc_client_session_advance_operation_time (session2,
timestamp,
increment);
/* Run a find on session2, which should now find all writes done
* inside of session1 */
find_opts = bson_new ();
res = mongoc_client_session_append (session2, find_opts, &error);
if (!res) {
fprintf (stderr, "couldn't add session to opts: %s\n", error.message);
goto cleanup;
}
find_query = BCON_NEW ("end", BCON_NULL);
read_prefs = mongoc_read_prefs_new (MONGOC_READ_SECONDARY);
cursor = mongoc_collection_find_with_opts (coll,
query,
find_opts,
read_prefs);
while (mongoc_cursor_next (cursor, &result)) {
json = bson_as_relaxed_extended_json (result, NULL);
fprintf (stdout, "Document: %s\n", json);
bson_free (json);
}
if (mongoc_cursor_error (cursor, &error)) {
fprintf (stderr, "cursor failure: %s\n", error.message);
goto cleanup;
}
using (var session2 = client.StartSession(new ClientSessionOptions { CausalConsistency = true }))
{
session2.AdvanceClusterTime(session1.ClusterTime);
session2.AdvanceOperationTime(session1.OperationTime);
var items = client.GetDatabase(
"test",
new MongoDatabaseSettings
{
ReadPreference = ReadPreference.Secondary,
ReadConcern = ReadConcern.Majority,
WriteConcern = new WriteConcern(WriteConcern.WMode.Majority, TimeSpan.FromMilliseconds(1000))
})
.GetCollection<BsonDocument>("items");
var filter = Builders<BsonDocument>.Filter.Eq("end", BsonNull.Value);
foreach (var item in items.Find(session2, filter).ToEnumerable())
{
// process item
}
}
// Example 2: Advance the cluster time and the operation time to that of the other session to ensure that
// this client is causally consistent with the other session and read after the two writes.
ClientSession session2 = client.startSession(ClientSessionOptions.builder().causallyConsistent(true).build());
session2.advanceClusterTime(session1.getClusterTime());
session2.advanceOperationTime(session1.getOperationTime());
items = client.getDatabase("test")
.withReadPreference(ReadPreference.secondary())
.withReadConcern(ReadConcern.MAJORITY)
.withWriteConcern(WriteConcern.MAJORITY.withWTimeout(1000, TimeUnit.MILLISECONDS))
.getCollection("items");
for (Document item: items.find(session2, eq("end", BsonNull.VALUE))) {
System.out.println(item);
}
async with await client.start_session(causal_consistency=True) as s2:
s2.advance_cluster_time(s1.cluster_time)
s2.advance_operation_time(s1.operation_time)
items = client.get_database(
"test",
read_preference=ReadPreference.SECONDARY,
read_concern=ReadConcern("majority"),
write_concern=WriteConcern("majority", wtimeout=1000),
).items
async for item in items.find({"end": None}, session=s2):
print(item)
my $s2 = $conn->start_session({ causalConsistency => 1 });
$s2->advance_cluster_time( $s1->cluster_time );
$s2->advance_operation_time( $s1->operation_time );
$items = $conn->get_database(
"test", {
read_preference => 'secondary',
read_concern => { level => 'majority' },
write_concern => { w => 'majority', wtimeout => 10000 },
}
)->get_collection("items");
$cursor = $items->find( { end => undef }, { session => $s2 } );
for my $item ( $cursor->all ) {
say join(" ", %$item);
}
$s2 = $client->startSession(
['causalConsistency' => true],
);
$s2->advanceClusterTime($s1->getClusterTime());
$s2->advanceOperationTime($s1->getOperationTime());
$items = $client->selectDatabase(
'test',
[
'readPreference' => new \MongoDB\Driver\ReadPreference(\MongoDB\Driver\ReadPreference::SECONDARY),
'readConcern' => new \MongoDB\Driver\ReadConcern(\MongoDB\Driver\ReadConcern::MAJORITY),
'writeConcern' => new \MongoDB\Driver\WriteConcern(\MongoDB\Driver\WriteConcern::MAJORITY, 1000),
],
)->items;
$result = $items->find(
['end' => ['$exists' => false]],
['session' => $s2],
);
foreach ($result as $item) {
var_dump($item);
}
with client.start_session(causal_consistency=True) as s2:
s2.advance_cluster_time(s1.cluster_time)
s2.advance_operation_time(s1.operation_time)
items = client.get_database(
"test",
read_preference=ReadPreference.SECONDARY,
read_concern=ReadConcern("majority"),
write_concern=WriteConcern("majority", wtimeout=1000),
).items
for item in items.find({"end": None}, session=s2):
print(item)
let options = ClientSessionOptions(causalConsistency: true)
let result2: EventLoopFuture<Void> = client2.withSession(options: options) { s2 in
// The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above.
s2.advanceClusterTime(to: s1.clusterTime!)
s2.advanceOperationTime(to: s1.operationTime!)
dbOptions.readPreference = .secondary
let items2 = client2.db("test", options: dbOptions).collection("items")
return items2.find(["end": .null], session: s2).flatMap { cursor in
cursor.forEach { item in
print(item)
}
}
}
try client2.withSession(options: ClientSessionOptions(causalConsistency: true)) { s2 in
// The cluster and operation times are guaranteed to be non-nil since we already used s1 for operations above.
s2.advanceClusterTime(to: s1.clusterTime!)
s2.advanceOperationTime(to: s1.operationTime!)
dbOptions.readPreference = .secondary
let items2 = client2.db("test", options: dbOptions).collection("items")
for item in try items2.find(["end": .null], session: s2) {
print(item)
}
}

以下构建内存结构的操作在因果关系上不一致:

操作
注意

$collStats,带 latencyStats 选项。

如果操作与因果一致的客户端会话关联,则返回错误。

如果操作与因果一致的客户端会话关联,则返回错误。

如果操作与因果一致的客户端会话关联,则返回错误。

如果操作与因果一致的客户端会话关联,则返回错误。

后退

句点