Docs 菜单
Docs 主页
/ / /
PyMongoArrow

快速入门

在此页面上

  • 先决条件
  • 扩展 PyMongo
  • 测试数据
  • 定义模式
  • 查找操作
  • 聚合操作
  • 写入 MongoDB
  • Ignore Empty Values
  • 写入其他格式

本教程旨在介绍 PyMongoArrow 的使用。本教程假定读者熟悉基本的 PyMongo 和MongoDB概念。

确保您已安装PyMongoArrow 发行版 。 在Python shell中,运行以下命令应不会引发异常:

>>> import pymongoarrow as pma

本教程还假设 MongoDB 实例正在默认主机和端口上运行。 下载并安装MongoDB 后,您可以启动它,如以下代码示例所示:

$ mongod

pymongoarrow.monkey模块提供了一个接口,用于就地修补 PyMongo,并将 PyMongoArrow 功能直接添加到Collection实例:

from pymongoarrow.monkey import patch_all
patch_all()

运行monkey.patch_all()方法后, Collection类的新实例将包含 PyMongoArrow API,例如pymongoarrow.api.find_pandas_all()方法。

注意

您还可以通过从pymongoarrow.api模块导入任何 PyMongoArrow API 来使用它们。 如果这样做,则在调用 API 方法时,必须将要对其运行操作的Collection实例作为第一个参数传递。

以下代码使用 PyMongo 将样本数据添加到集群:

from datetime import datetime
from pymongo import MongoClient
client = MongoClient()
client.db.data.insert_many([
{'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']},
{'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']},
{'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']},
{'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])

PyMongoArrow 依赖数据模式将查询结果集编组为表格形式。 如果您不提供此模式,PyMongoArrow 会从数据中推断出一个模式。 您可以通过创建Schema对象并将字段名称映射到类型说明符来定义模式,如以下示例所示:

from pymongoarrow.api import Schema
schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})

MongoDB 使用嵌入式文档来表示嵌套数据。 PyMongoArrow 为这些文档提供一流的支持:

schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})

PyMongoArrow 还支持列表和嵌套列表:

from pyarrow import list_, string
schema = Schema({'txns': list_(string())})
polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

提示

PyMongoArrow 为每种支持的 BSON 类型包含多个允许的类型标识符。 有关这些数据类型及其关联的类型标识符的完整列表,请参阅数据类型。

以下代码示例展示了如何将amount字段具有非零值的所有记录作为pandas.DataFrame对象加载:

df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)

您还可以加载与pyarrow.Table实例相同的结果集:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)

或者作为polars.DataFrame实例:

df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)

或者作为 NumPy arrays对象:

ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)

使用 NumPy 时,返回值是字典,其中键是字段名称,值是相应的numpy.ndarray实例。

注意

在前面的所有示例中,您都可以省略模式,如下例所示:

arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})

如果省略模式,PyMongoArrow 会尝试根据第一批处理中包含的数据自动应用模式。

运行聚合操作与运行查找操作类似,但需要执行一系列操作。

以下是输出一个新数据帧的aggregate_pandas_all()方法的简单示例,其中所有_id值都分组在一起,并对它们的amount值求和:

df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])

您还可以对嵌入式文档运行聚合操作。 以下示例展开嵌套txn字段中的值,计算每个值的数量,然后以 NumPy ndarray对象列表的形式返回结果,并按降序排序:

pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}]
ndarrays = client.db.data.aggregate_numpy_all(pipeline)

提示

有关聚合管道的更多信息,请参阅MongoDB Server 文档。

您可以使用write()方法将以下类型的对象写入 MongoDB:

  • 箭头 Table

  • Pandas DataFrame

  • NumPy ndarray

  • 极地 DataFrame

from pymongoarrow.api import write
from pymongo import MongoClient
coll = MongoClient().db.my_collection
write(coll, df)
write(coll, arrow_table)
write(coll, ndarrays)

注意

NumPy 数组指定为dict[str, ndarray]

write() 方法可以选择接受布尔值 exclude_none 参数。如果将此参数设立为 True,则驾驶员不会写入空值写入数据库。如果将此参数设立为 False 或留空,则驾驶员将为每个空字段写入 None

以下示例中的代码将箭头 Table 写入MongoDB两次。'b'字段中的一个值设立为 None

第一次调用 write() 方法时省略了 exclude_none 参数,因此默认值为 FalseTable 中的所有值(包括 None)都将写入数据库。第二次调用 write() 方法会将 exclude_none 设为 True,因此会忽略 'b'字段中的空值。

data_a = [1, 2, 3]
data_b = [1, None, 3]
data = Table.from_pydict(
{
"a": data_a,
"b": data_b,
},
)
coll.drop()
write(coll, data)
col_data = list(coll.find({}))
coll.drop()
write(coll, data, exclude_none=True)
col_data_exclude_none = list(coll.find({}))
print(col_data)
print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2, 'b': None}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}
{'_id': ObjectId('...'), 'a': 1, 'b': 1}
{'_id': ObjectId('...'), 'a': 2}
{'_id': ObjectId('...'), 'a': 3, 'b': 3}

加载结果集后,您可以将其写入包支持的任何格式。

例如,要将变量arrow_table引用的表写入名为example.parquet的 Parquet 文件,请运行以下代码:

import pyarrow.parquet as pq
pq.write_table(arrow_table, 'example.parquet')

Pandas 还支持将DataFrame实例写入各种格式,包括 CSV 和 HDF。 要将变量df引用的数据框写入名为out.csv的 CSV 文件,请运行以下代码:

df.to_csv('out.csv', index=False)

Polars API 是前面两个示例的混合:

import polars as pl
df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]})
df.write_parquet('example.parquet')

注意

Parquet 读写操作支持嵌套数据,但 Arrow 或 Pandas 不能很好地支持 CSV 读写操作。

后退

安装和升级