Python+Peewee ORM+SQLiteで1億レコード最速insertチャレンジ

イワシの大群が特に大規模になったとき、それをサーディンランと呼び、個体数は数千万とも数億とも数十億ともいわれるのだそうです。そのような生物量がそれほど密集したとき酸素濃度は足りるんだろうかと心配です。

さて、データ処理の一環で億オーダーのレコード数(ディスク上で~100GB)をもつSQLiteテーブルを構築しようということになり、データ自体は生CSVがある状態でこれをなるべく短時間でDBに流し込むという雑なチャレンジをしてみたので、雑な記録をまとめておきました。

できるだけPythonで閉じさせたかったため、C++などで書くという選択肢はなし。
またDBサイズがサイズなのでインメモリではなくファイルに吐き出します。
またスキーマ定義をさくっとやりたい・DB構築後の扱いを楽にしたいということで、PythonベースのORM Peeweeを使用することにしています。なおPeeweeについて詳細は触れないですが、(ユーザビリティ的な意味で)とても軽量でありまたPythonicであるので個人的にはとても好きです。selectのwhereの比較演算子を文字列で渡すとかしなくていいのです。

DB関連は全くの素人なので、もっといい方法があったらぜひアドバイスをいただきたいです。

完全なコードをGist上のipynbにまとめています。

環境条件

  • OS: Ubuntu 18.04
  • SQLite: 3.22.0 (Ubuntuパッケージのもの)
  • Python: 3.7.1
  • Peewee ORM: 3.8.1
  • CPU: i7 8700K
  • RAM: 16GB
  • SSD: Crucial m4 512GB (SATAII)

モデル(テーブル)定義

テスト用に、自分のユースケースに似ていてシンプル化したスキーマを用意してみました。

主キーの他に2つのIntegerFieldをもつモデルを定義。 一方はそれらのカラムそれぞれに対して・およびペアでインデクスを張るモデル、もう一方はインデクスを張らないモデル。

import peewee

from peewee import IntegerField
from peewee import Model
from peewee import SqliteDatabase

from peewee import chunked

db = SqliteDatabase(None)

class NoIndexedTable(Model):
    value_1 = IntegerField(unique=False)
    value_2 = IntegerField(unique=False)
    
    class Meta:
        database = db
        
    def __str__(self):
        return "{}-{}".format(self.value_1, self.value_2)
        
class IndexedTable(Model):
    value_1 = IntegerField(index=True, unique=False)
    value_2 = IntegerField(index=True, unique=False)
    
    class Meta:
        database = db
        
        indexes = ((("value_1", "value_2"), True),)
        
    def __str__(self):
        return "{}-{}".format(self.value_1, self.value_2)

DB初期化

DBは最初にこのようにして初期化。ディスク同期などのオーバヘッドを取り除くためそのようなオプションを使用。

import os
os.remove('tmp.db')
db.init('tmp.db', pragmas={'synchronous': 0, 'journal_mode': 'memory'})
db.connect()
db.create_tables([NoIndexedTable, IndexedTable])

実験条件の設定

1億件のinsertをベンチマーク。 SQLiteでは1クエリには最大999変数まで含められ、各モデルの1レコードのinsertに2変数を保存できるため、1クエリ(チャンク)サイズは最大で499である。ここではchunk_sizeとして400を設定。

レコードの各カラムの値としてランダム値を用い、その最大値をmax_valueとする。

N = 1000 * 1000 * 100
chunk_size = 400
max_value = N ** 2

(1) ベースライン:Peewee ORMの通常のインタフェースでのバッチinsert

両方のモデルにレコードをN件追加するコード。

なお多数のレコードをinsertしようとする人がトランザクションを使用しない・バッチinsertをしない、という選択肢はありえないため、それらは比較対象にしない。

値の生成方法として(1a)dictで生成するか、(1b)tupleで生成するか、をtplオプションで指定できる。

def insert_records_peewee_standard_api(table, tpl=False):
    with db.atomic(), tqdm(total=N) as pbar:
        for _ in range(int(math.ceil(N / chunk_size))):
            if tpl:
                table.insert_many([(random.randint(0, max_value), random.randint(0, max_value))
                                   for _ in range(chunk_size)],
                                  fields=[NoIndexedTable.value_1, NoIndexedTable.value_2]).execute()
            else:
                table.insert_many([{'value_1': random.randint(0, max_value),
                                    'value_2': random.randint(0, max_value)}
                                  for _ in range(chunk_size)]).execute()
            pbar.update(chunk_size)
    assert table.select().count() == N
Table Method Wall Time Average speed
NoIndexed (1a) Baseline (dict) 19min 18s 86356 records/s
NoIndexed (1b) Baseline (tuple) 18min 10s 91743 records/s
Indexed (1a) Baseline (dict) 54min 54s 30358 records/s
Indexed (1b) Baseline (tuple) 53min 56s 30902 records/s

(2) インデクスを一旦無効化して再有効化

アクロバティックでかつあまり安全ではない方法。

インデクスの更新をレコード追加のたびに行うのはオーバヘッドが大きいため、一旦インデクスを削除した状態でレコードを追加した上でインデクスを張り直す。まとめてインデクスを再構築したほうが、レコード追加のたびに更新を行うより効率がいいという仮説に基づく方法。

Peeweeの拡張モジュールplayhouseにあるDBマイグレーションAPIを用いて、インデクスの削除および再構築を行う。

※DBをオンラインに保った状態でこれをするのは危険もしくは動かない。

from playhouse.migrate import SqliteMigrator, migrate
migrator = SqliteMigrator(db)

def drop_index_insert_remake_index():
    table_name = IndexedTable._meta.table_name
    with db.atomic():
        migrate(
            migrator.drop_index(table_name, table_name + '_value_1'),
            migrator.drop_index(table_name, table_name + '_value_2'),
            migrator.drop_index(table_name, table_name + '_value_1_value_2'),
        )

    insert_records_peewee_standard_api(IndexedTable, tpl=True)
    
    with db.atomic():
        migrate(
            migrator.add_index(table_name, ('value_1',), False),
            migrator.add_index(table_name, ('value_2',), False),
            migrator.add_index(table_name, ('value_1', 'value_2'), True),
        )
    assert IndexedTable.select().count() == N
Table Method Wall Time Average speed
NoIndexed (1a) Baseline (dict) 19min 18s 86356 records/s
NoIndexed (1b) Baseline (tuple) 18min 10s 91743 records/s
Indexed (1a) Baseline (dict) 54min 54s 30358 records/s
Indexed (1b) Baseline (tuple) 53min 56s 30902 records/s
Indexed (2) Drop index->insert->Reindex 20min 45s 80321 records/s

インデクスのあるテーブルに対して、53分56秒→20分45秒(2.6倍)高速化。

(3) SQLクエリ発行の効率化

前述の方法とは直交するアプローチ。

極端に大量のレコードを一括で挿入する場合、ORMそのもののオーバヘッドが速度の制約となる可能性がある。そこで、ORMの各種の利益を一時的に享受できなくなることを理解した上で、モデルをすっ飛ばして直接DBにinsertする。

(3-1) モデルを経由せずSQLiteへのクエリを直接Peeweeから実行

DBへのコネクション、(テーブルが存在しない場合の)テーブルの用意やトランザクションなどはPeeweeに乗っかれるため、それほど大変ではない。

def insert_records_peewee_execute_sql(table):
    with db.atomic():
        query = 'insert into {}(value_1, value_2) values (?,?)'.format(table._meta.table_name)
        for _ in tqdm(range(N)):
            db.execute_sql(query, (random.randint(0, max_value), random.randint(0, max_value)))
    assert table.select().count() == N
Table Method Wall Time Average speed
NoIndexed (1a) Baseline (dict) 19min 18s 86356 records/s
NoIndexed (1b) Baseline (tuple) 18min 10s 91743 records/s
Indexed (1a) Baseline (dict) 54min 54s 30358 records/s
Indexed (1b) Baseline (tuple) 53min 56s 30902 records/s
Indexed (2) Drop index->insert->Reindex 20min 45s 80321 records/s
NoIndexed (3-1) Raw SQL through ORM 7min 23s 225734 records/s
Indexed (3-1) Raw SQL through ORM 39min 47s 41893 records/s

(1)で示した標準APIで追加する場合では、インデクスなしで18分、インデクスありで約54分であり、その差は約36分であった。 ここで示した直接SQL実行でのインデクスなしとありの差は32分であった。 このおよそ30分強がインデクスの更新で消費されていると言えそう。1要素あたりにならすと18マイクロ秒程度であるが1億件insertでは無視できない。

(3-2) Pythonのsqlite3を直接叩いてinsert

上記の直接SQL発行では1要素ずつクエリを発行していた。これは現在のPeeweeのAPIの制約によるものであるが、SQLiteそれ自体は複数レコードを一度にinsertできる。

なおこのときPeeweeで開いたDBは一旦閉じる必要がある。

import sqlite3

def insert_records_direct_sqlite(table):
    con = sqlite3.connect("tmp.db")
    c = con.cursor()
    query = "insert into {}(value_1, value_2) values (?,?)".format(table._meta.table_name)
    c.executemany(query, [(random.randint(0, max_value), random.randint(0, max_value))
                          for _ in tqdm(range(N))])
    con.commit()
    con.close()
Table Method Wall Time Average speed
NoIndexed (1a) Baseline (dict) 19min 18s 86356 records/s
NoIndexed (1b) Baseline (tuple) 18min 10s 91743 records/s
Indexed (1a) Baseline (dict) 54min 54s 30358 records/s
Indexed (1b) Baseline (tuple) 53min 56s 30902 records/s
Indexed (2) Drop index->insert->Reindex 20min 45s 80321 records/s
NoIndexed (3-1) Raw SQL through ORM 7min 23s 225734 records/s
Indexed (3-1) Raw SQL through ORM 39min 47s 41893 records/s
NoIndexed (3-2) Direct SQLite 4min 2s 413223 records/s
Indexed (3-2) Raw SQL through ORM 53min 23s 31221 records/s

インデクスのないテーブルの場合は、execute_sqlを呼ぶ場合に比べてさらに大幅に高速化(7分23秒→4分2秒、差は約3分強)。

当然、インデクスのあるテーブルの場合は全く恩恵に与れない(ボトルネックはSQL発行ではないため)。

(4) これらのテクニックを組み合わせる

(2)で示したように、インデクスを一旦削除しレコードを追加した上でインデクス再構築することが(安全性と引き換えに)パフォーマンス上有効。

また(3)で示したように、ORMをすっとばしてレコードを追加することがパフォーマンス上有効。

これらを組み合わせて最速insertを目指す。

Peeweeにおける汎用的なindexの削除・再構築コード

index名を手打ちしてPeeweeのマイグレータに投げるのは能率が悪いので、これを自動化する。

from playhouse.migrate import SqliteMigrator, migrate

class drop_and_recreate_index:
    def __init__(self, table):
        self.table = table
        self.table_name = table._meta.table_name
        self.migrator = SqliteMigrator(table._meta.database)
        
        self.indexed_columns = [(name, col.unique)
                                for name, col in self.table._meta.columns.items()
                                if col.index]
        
        # [(('value_1', 'value_2'), True), ...]
        self.multi_column_indexes = self.table._meta.indexes

    def drop_index(self):
        with self.table._meta.database.atomic():
            # Drop column indexes
            for name, _ in self.indexed_columns:
                idx_name = self.table_name + '_' + name
                migrate(migrator.drop_index(self.table_name, idx_name))
                
            # Drop multi-column indexes
            for columns, _ in self.multi_column_indexes:
                idx_name = "_".join([self.table_name] + list(columns))
                migrate(migrator.drop_index(self.table_name, idx_name))

    def recreate_index(self):
        with self.table._meta.database.atomic():
            # Recreate column indexes
            for name, unique in self.indexed_columns:
                migrate(migrator.add_index(self.table_name, (name,), unique))
            
            # Recreate multi-column indexes
            for (columns, unique) in self.multi_column_indexes:
                migrate(migrator.add_index(self.table_name, columns, unique))

    def __enter__(self):
        self.drop_index()
    
    def __exit__(self, exception_type, exception_value, traceback):
        self.recreate_index()

このようなコードで、下記のように使える。

with drop_and_recreate_index(TableName):
    # insert records to DB

なお途中で失敗した場合にインデクスがない状態のままDBが残る危険があることなど実装としては十分安全に詰められていないので実際に使う場合は注意されたい。

(4-1) execute_sql + index drop/rebuildでのレコード挿入

上で定義済みのinsert_records_peewee_execute_sqlを再利用。

def insert_records_peewee_execute_sql_drop_index():
    with drop_and_recreate_index(IndexedTable):
        insert_records_peewee_execute_sql(IndexedTable)
    assert IndexedTable.select().count() == N

(4-2) sqlite3直叩き + index drop/rebuildでのレコード挿入

def insert_records_direct_sqlite_drop_index():
    with drop_and_recreate_index(IndexedTable):
        insert_records_direct_sqlite(IndexedTable)
    assert IndexedTable.select().count() == N
Table Method Wall Time Average speed
NoIndexed (1a) Baseline (dict) 19min 18s 86356 records/s
NoIndexed (1b) Baseline (tuple) 18min 10s 91743 records/s
Indexed (1a) Baseline (dict) 54min 54s 30358 records/s
Indexed (1b) Baseline (tuple) 53min 56s 30902 records/s
Indexed (2) Drop index->insert->Reindex 20min 45s 80321 records/s
NoIndexed (3-1) Raw SQL through ORM 7min 23s 225734 records/s
Indexed (3-1) Raw SQL through ORM 39min 47s 41893 records/s
NoIndexed (3-2) Direct SQLite 4min 2s 413223 records/s
Indexed (3-2) Raw SQL through ORM 53min 23s 31221 records/s
Indexed (4-1) (2)+(3-1) 9min 57s 167504 records/s
Indexed (4-2) (2)+(3-2) 6min 33s 254453 records/s

インデクスをもつモデルでも1億件のinsertを6分33秒で完了(インデクス破棄・再構築の時間を全て含む)、平均して254000レコード毎秒を達成。通常のバッチ+トランザクションでのinsertでは3.1万レコード毎秒が最速であったので、8倍以上の高速化を達成。

インデクスをもたないモデルの場合でも、9.2万レコード毎秒であったものが41万レコード毎秒と4倍以上の高速化。

ORMはもちろん柔軟性や安全性を担保するために必要な各種の仕組みのために細かなオーバヘッドが生じているが、DBが壊れるリスクを承知の上、また挿入する値がアプリケーションロジック上正当であることが確実である場合でありかつ極端に多数のinsertを行う場合については、これらのテクニックが有効。

インデクスがあまり多く張られていない場合は、このような危険をおかす必要は通常ないです。


これで実際の自分の例でも(もう少し複雑なスキーマの)DBに約2億件程度のレコードを正味1時間以下でinsertできるようになりました。自分のユースケースではその時点でSQLiteボトルネックではなくなったので、もう深追いしなくて良さそう(SQLiteのビルドオプション(スレッドセーフ周りとか)でまだ危険を承知で速くする余地はありそうです)。

Peewee ORMを使いますと言いながら、最終的にPeeweeを介さないのが一番速かったですねという元も子もない結果に。同じ作者さんのSweepeaというのもありますが試していません。たぶんC++にしたらさらに速くなる(やらないよ!ぜったいやらないよ!)

2 thoughts on “Python+Peewee ORM+SQLiteで1億レコード最速insertチャレンジ

  1. C++ + python にて C++とのレコード引き渡しについて
    クライアント側の他言語からC++(DLL)を使い WEB側のpythonを経由してsqliteLのレコードのやり取りができないものでしょうか? 

    1. ご説明だけでは何をしたいのか(何が何をどう触りたいのか)が正直よくわかりません。
      記事の主題とかけ離れたopen questionに見えるので、ご自身のなかでまず具体的に何がわかっていないのかまでブレイクダウンしてstackoverflowなどで聞いてみることをおすすめします。
      Windowsは使っていないので私では意味のあるお答えはできなそうです。

コメントを残す

メールアドレスが公開されることはありません。