快速入门
本教程旨在介绍 PyMongoArrow 的使用。本教程假定读者熟悉基本的 PyMongo 和MongoDB概念。
先决条件
确保您已安装PyMongoArrow 发行版 。 在Python shell中,运行以下命令应不会引发异常:
import pymongoarrow as pma
本教程还假设 MongoDB 实例正在默认主机和端口上运行。 下载并安装MongoDB 后,您可以启动它,如以下代码示例所示:
$ mongod
扩展 PyMongo
pymongoarrow.monkey
模块提供了一个接口,用于就地修补 PyMongo,并将 PyMongoArrow 功能直接添加到Collection
实例:
from pymongoarrow.monkey import patch_all patch_all()
运行monkey.patch_all()
方法后, Collection
类的新实例将包含 PyMongoArrow API,例如pymongoarrow.api.find_pandas_all()
方法。
注意
您还可以通过从pymongoarrow.api
模块导入任何 PyMongoArrow API 来使用它们。 如果这样做,则在调用 API 方法时,必须将要对其运行操作的Collection
实例作为第一个参数传递。
测试数据
以下代码使用 PyMongo 将样本数据添加到集群:
from datetime import datetime from pymongo import MongoClient client = MongoClient() client.db.data.insert_many([ {'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']}, {'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']}, {'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']}, {'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])
定义模式
PyMongoArrow 依赖数据模式将查询结果集编组为表格形式。 如果您不提供此模式,PyMongoArrow 会从数据中推断出一个模式。 您可以通过创建Schema
对象并将字段名称映射到类型说明符来定义模式,如以下示例所示:
from pymongoarrow.api import Schema schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})
MongoDB 使用嵌入式文档来表示嵌套数据。 PyMongoArrow 为这些文档提供一流的支持:
schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})
PyMongoArrow 还支持列表和嵌套列表:
from pyarrow import list_, string schema = Schema({'txns': list_(string())}) polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
提示
PyMongoArrow 为每种支持的 BSON 类型包含多个允许的类型标识符。 有关这些数据类型及其关联的类型标识符的完整列表,请参阅数据类型。
查找操作
以下代码示例展示了如何将amount
字段具有非零值的所有记录作为pandas.DataFrame
对象加载:
df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)
您还可以加载与pyarrow.Table
实例相同的结果集:
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)
或者作为polars.DataFrame
实例:
df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
或者作为 NumPy arrays
对象:
ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)
使用 NumPy 时,返回值是字典,其中键是字段名称,值是相应的numpy.ndarray
实例。
注意
在前面的所有示例中,您都可以省略模式,如下例所示:
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})
如果省略模式,PyMongoArrow 会尝试根据第一批处理中包含的数据自动应用模式。
聚合操作
运行聚合操作与运行查找操作类似,但需要执行一系列操作。
以下是输出一个新数据帧的aggregate_pandas_all()
方法的简单示例,其中所有_id
值都分组在一起,并对它们的amount
值求和:
df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])
您还可以对嵌入式文档运行聚合操作。 以下示例展开嵌套txn
字段中的值,计算每个值的数量,然后以 NumPy ndarray
对象列表的形式返回结果,并按降序排序:
pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}] ndarrays = client.db.data.aggregate_numpy_all(pipeline)
提示
有关聚合管道的更多信息,请参阅MongoDB Server 文档。
写入 MongoDB
您可以使用write()
方法将以下类型的对象写入 MongoDB:
箭头
Table
Pandas
DataFrame
NumPy
ndarray
极地
DataFrame
from pymongoarrow.api import write from pymongo import MongoClient coll = MongoClient().db.my_collection write(coll, df) write(coll, arrow_table) write(coll, ndarrays)
注意
NumPy 数组指定为dict[str, ndarray]
。
Ignore Empty Values
write()
方法可以选择接受布尔值 exclude_none
参数。如果将此参数设立为 True
,则驾驶员不会写入空值写入数据库。如果将此参数设立为 False
或留空,则驾驶员将为每个空字段写入 None
。
以下示例中的代码将箭头 Table
写入MongoDB两次。'b'
字段中的一个值设立为 None
。
第一次调用 write()
方法时省略了 exclude_none
参数,因此默认值为 False
。Table
中的所有值(包括 None
)都将写入数据库。第二次调用 write()
方法会将 exclude_none
设为 True
,因此会忽略 'b'
字段中的空值。
data_a = [1, 2, 3] data_b = [1, None, 3] data = Table.from_pydict( { "a": data_a, "b": data_b, }, ) coll.drop() write(coll, data) col_data = list(coll.find({})) coll.drop() write(coll, data, exclude_none=True) col_data_exclude_none = list(coll.find({})) print(col_data) print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2, 'b': None} {'_id': ObjectId('...'), 'a': 3, 'b': 3} {'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2} {'_id': ObjectId('...'), 'a': 3, 'b': 3}
写入其他格式
加载结果集后,您可以将其写入包支持的任何格式。
例如,要将变量arrow_table
引用的表写入名为example.parquet
的 Parquet 文件,请运行以下代码:
import pyarrow.parquet as pq pq.write_table(arrow_table, 'example.parquet')
Pandas 还支持将DataFrame
实例写入各种格式,包括 CSV 和 HDF。 要将变量df
引用的数据框写入名为out.csv
的 CSV 文件,请运行以下代码:
df.to_csv('out.csv', index=False)
Polars API 是前面两个示例的混合:
import polars as pl df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]}) df.write_parquet('example.parquet')
注意
Parquet 读写操作支持嵌套数据,但 Arrow 或 Pandas 不能很好地支持 CSV 读写操作。