クイック スタート
このチュートリアルは、 PyMongoArrow の操作の紹介を目的としています。チュートリアルでは、読者は PyMongo とMongoDBの基本的な概念に理解していることを前提としています。
前提条件
PyMongoArrow ディストリビューションがインストールされていることを確認します。 Python shell では、次の例は例外を発生させずに実行されます。
import pymongoarrow as pma
このチュートリアルでは、MongoDB インスタンスがデフォルトのホストとポートで実行されていることも前提としています。 MongoDB をダウンロードしてインストールしたら、次のコード例に示すように MongoDB を起動できます。
$ mongod
PyMongo の拡張
pymongoarrow.monkey
モジュールは、PyMongo をその場でパッチし、PyMongoArrow 機能をCollection
インスタンスに直接追加するためのインターフェースを提供します。
from pymongoarrow.monkey import patch_all patch_all()
monkey.patch_all()
メソッドを実行すると、 Collection
クラスの新しいインスタンスに PyMongoArrow API など、 pymongoarrow.api.find_pandas_all()
メソッドが含まれます。
注意
また、PyMongoArrow API は、 pymongoarrow.api
モジュールからインポートして使用することもできます。 その場合は、API メソッドを呼び出すときに、操作が実行されるCollection
のインスタンスを最初の引数として渡す必要があります。
テスト データ
次のコードでは PyMongo を使用してサンプル データをクラスターに追加します。
from datetime import datetime from pymongo import MongoClient client = MongoClient() client.db.data.insert_many([ {'_id': 1, 'amount': 21, 'last_updated': datetime(2020, 12, 10, 1, 3, 1), 'account': {'name': 'Customer1', 'account_number': 1}, 'txns': ['A']}, {'_id': 2, 'amount': 16, 'last_updated': datetime(2020, 7, 23, 6, 7, 11), 'account': {'name': 'Customer2', 'account_number': 2}, 'txns': ['A', 'B']}, {'_id': 3, 'amount': 3, 'last_updated': datetime(2021, 3, 10, 18, 43, 9), 'account': {'name': 'Customer3', 'account_number': 3}, 'txns': ['A', 'B', 'C']}, {'_id': 4, 'amount': 0, 'last_updated': datetime(2021, 2, 25, 3, 50, 31), 'account': {'name': 'Customer4', 'account_number': 4}, 'txns': ['A', 'B', 'C', 'D']}])
スキーマの定義
PyMongoArrow は、クエリ結果セットを表形式にマーシャリングするためにデータ スキーマに依存します。 このスキーマを指定しない場合、PyMongoArrow は のデータから 1 つのスキーマを推論します。 次の例に示すように、 Schema
オブジェクトを作成し、フィールド名を type 指定子にマッピングすることで、スキーマを定義できます。
from pymongoarrow.api import Schema schema = Schema({'_id': int, 'amount': float, 'last_updated': datetime})
MongoDB は、ネストされたデータを表すために埋め込みドキュメントを使用します。 PyMongoArrow は、次のドキュメントに対するファーストクラスのサポートを提供します。
schema = Schema({'_id': int, 'amount': float, 'account': { 'name': str, 'account_number': int}})
PyMongoArrow は、リストとネストされたリストもサポートしています。
from pyarrow import list_, string schema = Schema({'txns': list_(string())}) polars_df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
Tip
PyMongoArrow には、サポートされている BSON 型ごとに複数の許容される型識別子が含まれています。 これらのデータ型とそれに関連する型識別子の完全なリストについては、「データ型 」を参照してください。
検索操作
次のコード例は、 amount
フィールドの値がゼロ以外のすべてのレコードをpandas.DataFrame
オブジェクトとしてロードする方法を示しています。
df = client.db.data.find_pandas_all({'amount': {'$gt': 0}}, schema=schema)
また、 pyarrow.Table
インスタンスと同じ結果セットを読み込むこともできます。
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}}, schema=schema)
または、 polars.DataFrame
インスタンスの場合には、次のようにします。
df = client.db.data.find_polars_all({'amount': {'$gt': 0}}, schema=schema)
または NumPy arrays
オブジェクトとして次のようにします。
ndarrays = client.db.data.find_numpy_all({'amount': {'$gt': 0}}, schema=schema)
NumPy を使用する場合、返される値は、キーがフィールド名で値が対応するnumpy.ndarray
インスタンスである辞書です。
注意
前述のすべての例では、次の例に示すように スキーマを省略できます。
arrow_table = client.db.data.find_arrow_all({'amount': {'$gt': 0}})
スキーマを省略すると、PyMongoArrow は最初のバッチに含まれるデータに基づいてスキーマを自動的に適用しようとします。
集計操作
集計操作の実行は検索操作の実行と似ていますが、実行するには一連の操作が必要です。
以下は、すべての_id
値がグループ化され、それらのamount
値が合計された新しいデータフレームを出力するaggregate_pandas_all()
メソッドの簡単な例です。
df = client.db.data.aggregate_pandas_all([{'$group': {'_id': None, 'total_amount': { '$sum': '$amount' }}}])
埋め込みドキュメントに対して集計操作を実行することもできます。 次の例では、ネストされたtxn
フィールドの値を展開し、各値の数をカウントし、降順でソートされた NumPy ndarray
オブジェクトのリストとして結果を返します。
pipeline = [{'$unwind': '$txns'}, {'$group': {'_id': '$txns', 'count': {'$sum': 1}}}, {'$sort': {"count": -1}}] ndarrays = client.db.data.aggregate_numpy_all(pipeline)
Tip
集計パイプラインの詳細については、 MongoDB Server のドキュメント を参照してください。
MongoDB への書き込み
write()
メソッドを使用して、次のタイプのオブジェクトを MongoDB に書込み (write) できます。
Arrow
Table
Pandas
DataFrame
NumPy
ndarray
北極と南極
DataFrame
from pymongoarrow.api import write from pymongo import MongoClient coll = MongoClient().db.my_collection write(coll, df) write(coll, arrow_table) write(coll, ndarrays)
注意
NumPy 配列はdict[str, ndarray]
として指定されます。
Ignore Empty Values
write()
メソッドはオプションでブール値の exclude_none
パラメータを受け入れます。このパラメータを True
に設定すると、ドライバーはデータベースに空の値を書込みません。このパラメータを False
に設定するか空白のままにすると、ドライバーは空のフィールドごとに None
を書き込みます。
次の例のコードでは、Arrow Table
をMongoDBに 2 回書込みます。'b'
フィールドの値の 1 つが None
に設定されています。
write()
メソッドの最初の呼び出しでは exclude_none
パラメータが省略されているため、デフォルトは False
になります。None
を含む Table
内のすべての値は、データベースに書き込まれます。write()
メソッドへの 2 回目の呼び出しでは、exclude_none
が True
に設定されるため、'b'
フィールドの空の値は無視されます。
data_a = [1, 2, 3] data_b = [1, None, 3] data = Table.from_pydict( { "a": data_a, "b": data_b, }, ) coll.drop() write(coll, data) col_data = list(coll.find({})) coll.drop() write(coll, data, exclude_none=True) col_data_exclude_none = list(coll.find({})) print(col_data) print(col_data_exclude_none)
{'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2, 'b': None} {'_id': ObjectId('...'), 'a': 3, 'b': 3} {'_id': ObjectId('...'), 'a': 1, 'b': 1} {'_id': ObjectId('...'), 'a': 2} {'_id': ObjectId('...'), 'a': 3, 'b': 3}
他の形式への書き込み
結果セットを読み込んだ後、パッケージがサポートする任意の形式に結果セットを書込むことができます。
たとえば、変数arrow_table
が参照するテーブルをexample.parquet
という名前の Compass ファイルに書き込むには、次のコードを実行します。
import pyarrow.parquet as pq pq.write_table(arrow_table, 'example.parquet')
Pandoraは、CSV や FD などのさまざまな形式へのDataFrame
インスタンスの書き込みもサポートしています。 変数df
が参照するデータフレームをout.csv
という名前の CSV ファイルに書き込むには、次のコードを実行します。
df.to_csv('out.csv', index=False)
Tars API は、前述の 2 つの例が組み合わされています。
import polars as pl df = pl.DataFrame({"foo": [1, 2, 3, 4, 5]}) df.write_parquet('example.parquet')
注意
ネストされたデータは、parquet の読み取りおよび書込み操作でサポートされていますが、CSV の読み取りおよび書込み操作の Arrow または Pandora ではサポートされていません。