さて、世界70億人のPythonプログラマが手放せないのが、tqdmというプログレスバーを簡単に表示できるようにしてくれるライブラリです。
しかしながらtqdmは(当然と言えば当然ですが)複数のプロセスから1つのプログレスバーを管理することができるような設計にはなっていません。
たとえば高速化のためにmultiprocessingを用いて並列処理をしながらプログレスバーを出したいとき、tqdmはそのままでは使えないことになります。
例えば、下記のようなコードで単にtqdmのインスタンスを子プロセスに渡して中でupdateをすると…
import tqdm import multiprocessing def worker(pbar): pbar.update() ar = [i for i in range(100)] with tqdm.tqdm(ar) as pbar: jobs = [] for t in ar: job = multiprocessing.Process(target=worker, args=(pbar,)) job.start() jobs.append(job) for job in jobs: job.join()
実行結果はこんな感じ。
% python usage.py 1%|▉ | 1/100 [00:00<00:25, 3.89it/s]
こんなふうに、100%まで行ってくれません。
端的に説明すると、100個forkされた内部カウント0の状態をそれぞれ+1するのが100回走るだけ、だからです。
じゃあ内部カウントが常によろしく増えるために、内部カウントを無視して共有メモリで常に正しくカウントしてればいいよね、っていう単純な発想による汚いハックをしてみました。
tqdmをprocess safeっぽくするdirty hack
tqdmをラップするコード(process_safe_tqdm.py)およびそれを使ってmultiprocessingから並列にカウントアップするコード(usage.py)はこちら。
なおtqdm 4.11.2、python 3.6.0で動作確認しています。
実行結果はこんなかんじ。ちゃんと100%まで行ってくれるし、画面の破綻等もありません。
% python usage.py 100%|███████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [00:00<00:00, 343.01it/s]
解説
基本的にやっていることは最初に述べたとおりで、
「tqdmが内部に持っているカウントを無視し、プロセス間共有変数に基づく真のカウントを別途用意してそれに基づいてプログレスバーを管理する」
という説明に尽きます。
これをするため、updateの中では、真カウントでtqdm内部カウントを強制的に上書きしたあと画面の更新をし、その後真のカウントも更新させるようにしています。
真のカウントにはmultiprocessing.Valueを使います。画面の破綻や真のカウントのズレを防ぐため、multiprocess.Valueの持つロック機能を使って排他的に走るようにしています。
tqdmでは画面更新の頻度を抑えるため適当にupdateの処理を無視している(_tqdm.py#L930など)のですが、一番最後はちゃんと100%に到達させるための処理がcloseに書かれています(_tqdm.py#L1023)。
この中でも内部カウントを呼んでるので、正しいカウントで上書きするようcloseをオーバーライドすることで期待通り動くようにしています。
使える場面
主にmultiprocessingで重たい処理を数十〜数百分割し子プロセスに分担させる場合に使えます。ラッパー導入によるオーバヘッドがあるので1ループあたりの計算粒度を小さくしすぎないことは大事ですが、それよりはforkしすぎることのオーバヘッドに注意していればOKです。
このラッパーはtqdmをそのまま置き換えて使えます(シングルプロセスでも動きます)。特に、updateを呼ばず__iter__()で呼ぶ場合(iterable based)は、tqdmを直に叩いているのと全く同じでオーバーヘッドはありません。updateを叩く場合(manual、マルチプロセス化する場合はこっちになります)でシングルプロセスの場合はオーバヘッドを避けてtqdmを直に使うほうがいいです。
注意
tqdmのバーの右に出てくるイテレーションの速度は、信頼できなくなります。
tqdmが内部で持っている前回更新からの時間については正しく管理していないためです。
joblibと組み合わせて使えるととても便利なのですが、tqdmのインスタンスがpickle化できない(lambdaをもつなど)ため、残念ながら今のところは手書きでゴリゴリとmultiprocessingしなければいけません。
If you just need a progressbar for MapReduce-like logic; a vanilla tqdm SHOULD BE ENOUGH.
下記は僕の作法です、普通のtqdmとas_completed()の弁用で、進捗を把握もできます。
“`python
from os import cpu_count
from concurrent.futures import ProcessPoolExecutor, as_completed
tasks = range(9)
def work(i):
try:
time.sleep(random.uniform(1, 2))
if i == 3: # 人为抛个异常
1 / 0
return i
except Exception as ex: # 在worker进程内部捕获异常,外面会比较好处理
return ex
finally:
print(‘done job-%d’, i)
if __name__ == ‘__main__’:
max_workers = max(1, cpu_count() – 1)
with ProcessPoolExecutor(max_workers=max_workers) as executor:
futures = [executor.submit(work, n) for n in tasks]
# 按照完成顺序打印一遍;同时刷新进度条
# 注意 as_complete() 不要放到 executor.__exit__()的后面
for f in tqdm.tqdm(as_completed(futures), desc=’sleep’, total=len(tasks)):
print(f.result()) # 尝试调用xxx.result()时,会把worker进程内部的异常抛出来;这里已经在内部捕获了,就没事
# 按照提交顺序获取所有结果,此时不动进度条了,可以放到executor外面
print([f.result() for f in futures])
“`