これは、なにをしたくて書いたもの?
Pythonで、マルチスレッドに関する標準ライブラリーを知っておきたいなと思いまして。
ちなみにスレッド自体は過去にも扱っています。
Pythonのスレッドは、ネイティブスレッドなのか? - CLOVER🍀
PythonのTCPServer/HTTPServerをマルチスレッドで使う - CLOVER🍀
今回はロックやセマフォといったスレッド間に関する標準ライブラリーを見ていきます。
threadingライブラリー
threadingライブラリーのページはこちら。
threading --- スレッドベースの並列処理 — Python 3.10.15 ドキュメント
こちらには以下のAPIやクラスが含まれています。
- スレッドローカルデータ … スレッドごとに固有の値を設定する
- Lock … ロック、アンロックが可能で、特定のスレッドがロックを獲得している時に他のスレッドがロックを獲得しようとすると、先に獲得されたロックがアンロックされるまで待機する
- RLock … 再入可能ロック(Reentrant Lock)。Lockと異なり同じスレッドが再帰的にロックを獲得可能
- Condition … ロックに関連付けられたうえで、
wait
/notify
(notify_all
)でスレッドの待機/起動を操作できるオブジェクト
- Semaphore … いわゆるセマフォで、ある範囲に対して同時に実行できるスレッド数を制限する仕組み
- Event … あるスレッドがイベントを発信し、他のスレッドはイベントの発信を待つというスレッド間通信を行う仕組み
- Timer … 一定時間後にスレッドを実行する仕組み
- Barrier … 複数のスレッドの待ち合わせを行う仕組み
なお、Lock
、RLock
、Condition
、Semaphore
はコンテキストマネージャーとして使えます。
with 文でのロック・条件変数・セマフォの使い方
具体的にはロックの獲得をacquire
で行い、解放にrelease
を使うものはコンテキストマネージャーとして使えるようになっていて、
with
ブロックに入る時にacquire
が呼び出されwith
ブロックを抜ける時にrelease
が呼び出されます。
ところで、PythonにおけるスレッドはGILがあるので1プロセス内で同時に実行できるスレッドはひとつだけです。Pythonでマルチスレッドが
有効なのはIOバウンドな処理を並列して実行したい時ですね。
CPython 実装の詳細: CPython は Global Interpreter Lock のため、ある時点で Python コードを実行できるスレッドは1つに限られます (ただし、いくつかのパフォーマンスが強く求められるライブラリはこの制限を克服しています)。アプリケーションにマルチコアマシンの計算能力をより良く利用させたい場合は、 multiprocessing モジュールや concurrent.futures.ProcessPoolExecutor の利用をお勧めします。 ただし、I/Oバウンドなタスクを並行して複数走らせたい場合においては、 マルチスレッドは正しい選択肢です。
ちなみにスレッド自体を直接扱うのではなく、concurrent.futures
のThreadPoolExecutor
を使うのがよいと思います。
concurrent.futures -- 並列タスク実行 — Python 3.10.15 ドキュメント
今回はこのあたりを試してみたいと思います。
環境
今回の環境はこちら。
$ python3 --version
Python 3.10.12
$ pip3 --version
pip 22.0.2 from /usr/lib/python3/dist-packages/pip (python 3.10)
準備
確認はpytestで行いたいと思います。型チェックにmypyも入れておきます。
$ pip3 install pytest mypy
インストールされたライブラリーの一覧。
$ pip3 list
Package Version
----------------- -------
exceptiongroup 1.2.2
iniconfig 2.0.0
mypy 1.13.0
mypy-extensions 1.0.0
packaging 24.2
pip 22.0.2
pluggy 1.5.0
pytest 8.3.3
setuptools 59.6.0
tomli 2.1.0
typing_extensions 4.12.2
動作確認はpytestを使ったテストコードで行いますが、雛形はこちらです。
tests/test_threading.py
from concurrent.futures import ThreadPoolExecutor
import datetime
import threading
import time
def log(message: str) -> None:
print(f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {threading.current_thread().name} - {message}")
またテスト中に標準出力への書き出しを行うので、pytestは--capture=no
オプションを指定して実行します。
$ pytest --capture=no
では、試していってみましょう。
スレッドローカルデータ
最初はスレッドローカルデータから。
スレッドローカルデータ
スレッドローカルデータは、そのスレッド固有のデータを持たせる仕組みです。あたかも単一スレッド前提のような使い方をするコードで
複数スレッドで実行しても、それぞれのデータが独立して扱えるので便利です。
サンプルコード。
def test_thread_local_data() -> None:
results = {}
localdata = threading.local()
def thread1() -> None:
time.sleep(3)
localdata.mydata = "Hello from thread1"
time.sleep(2)
log(f"thread1 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread1"
results[threading.current_thread().name] = "done"
def thread2() -> None:
time.sleep(2)
localdata.mydata = "Hello from thread2"
time.sleep(3)
log(f"thread2 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread2"
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(thread1))
futures.append(executor.submit(thread2))
[f.result() for f in futures]
assert len(results) == 2
スレッドローカルデータは、threading.local
で取得したオブジェクトで表現されます。
localdata = threading.local()
スレッドローカルデータは辞書のように扱えます。
同じオブジェクトに各スレッドが同じ属性に書き込んでいますが、それぞれのスレッドが設定した値がしっかり残っています。
def thread1() -> None:
time.sleep(3)
localdata.mydata = "Hello from thread1"
time.sleep(2)
log(f"thread1 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread1"
results[threading.current_thread().name] = "done"
def thread2() -> None:
time.sleep(2)
localdata.mydata = "Hello from thread2"
time.sleep(3)
log(f"thread2 data = {localdata.mydata}")
assert localdata.mydata == "Hello from thread2"
results[threading.current_thread().name] = "done"
標準出力の結果。
[2024-11-16 20:32:29] ThreadPoolExecutor-0_0 - thread1 data = Hello from thread1
[2024-11-16 20:32:29] ThreadPoolExecutor-0_1 - thread2 data = Hello from thread2
このように、スレッドごとに固有の値を管理できる仕組みです。
ちなみにスレッドローカルデータはlocal
を継承することで独自のスレッドローカルデータを作れたりするのですが、ドキュメントにほとんど
説明がありません。APIの説明もないですね。
詳しくはソースコードを見ること、だそうです。
詳細と例題については、 _threading_local モジュールのドキュメンテーション文字列を参照してください。
https://github.com/python/cpython/blob/v3.10.12/Lib/_threading_local.py
Lock、RLock
次はLock
とRLock
です。
まずはLock
から。
def test_lock() -> None:
lock = threading.Lock()
results = {}
def with_lock() -> None:
log("try lock")
lock.acquire()
try:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
finally:
lock.release()
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
Lock
を作成して
lock = threading.Lock()
Lock#acquire
でロックを獲得できます。ロックを獲得できるスレッドはひとつだけで、他のスレッドがLock#acquire
を呼び出した場合は
Lock#release
でロックが解放されるまで待たされることになります。
def with_lock() -> None:
log("try lock")
lock.acquire()
try:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
finally:
lock.release()
なのでfinally
で確実にロックを解放する必要があります。
標準出力に書き出された結果を見ると、最初にロックを取得したスレッドがロックを開放するまで2つ目のスレッドが待たされているのが
確認できます。
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:41:09] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:41:09] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:41:11] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:41:11] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:41:13] ThreadPoolExecutor-0_1 - end
なお、Lock
はコンテキストマネージャーに対応しているのでwith
を使ってシンプルに書くことができます。
with lock:
log("start")
time.sleep(2)
log("end")
results[threading.current_thread().name] = "done"
こちらの方がLock#release
の呼び出し忘れなどがなくてよいでしょう。以降はwith
でロックを扱います。
なお、Lock
を使ったロックの場合、ロックを獲得したスレッドであってもロック解放前にLock#acquire
を呼び出した場合はロックを取得できず
待たされることになります。
つまり、以下のようなコードを書いてしまうとロックを取得したスレッドが止まってしまいます。
def test_lock_reentrant() -> None:
lock = threading.Lock()
results = {}
def with_lock() -> None:
log("try lock")
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
実行した場合は、標準出力がここで停止します。
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - try lock
[2024-11-16 20:44:08] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:44:08] ThreadPoolExecutor-0_1 - try lock
[2024-11-16 20:44:10] ThreadPoolExecutor-0_0 - reentrant lock
つまり、Lock
は最入可能ではありません。
最入可能なロックが必要な場合はRLock
を使います。
先程のコードをRLock
を使って書き直したものがこちらです。
def test_rlock() -> None:
lock = threading.RLock()
results = {}
def with_lock() -> None:
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_lock))
futures.append(executor.submit(with_lock))
[f.result() for f in futures]
assert len(results) == 2
Lock
でインスタンスを作成していたところをRLock
にするだけで、あとはLock
と使い方は同じですね。
lock = threading.RLock()
ただしRLock
は最入可能なので、先ほどはLock
で動作しなかったひとつのスレッドが同じロックインスタンスに対して2回acquire
を
呼び出すようなコードであっても
def with_lock() -> None:
with lock:
log("start")
time.sleep(2)
log("reentrant lock")
with lock:
log("do something")
log("release reentrant lock")
log("end")
results[threading.current_thread().name] = "done"
このように動くようになります。
[2024-11-16 20:50:49] ThreadPoolExecutor-0_0 - start
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - do something
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - release reentrant lock
[2024-11-16 20:50:51] ThreadPoolExecutor-0_0 - end
[2024-11-16 20:50:51] ThreadPoolExecutor-0_1 - start
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - do something
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - release reentrant lock
[2024-11-16 20:50:53] ThreadPoolExecutor-0_1 - end
Condition
Conditionはロックに関連付けられるオボジェクトで、スレッドを待機させたり起こしたりできます。
サンプルはこちら。
def test_condition() -> None:
condition = threading.Condition()
results = {}
def wait_task() -> None:
with condition:
log("waiting...")
condition.wait()
log("wakeup")
results[threading.current_thread().name] = "done"
def notify_task() -> None:
with condition:
log("notify")
condition.notify_all()
log("done")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(wait_task))
futures.append(executor.submit(wait_task))
time.sleep(3)
futures.append(executor.submit(notify_task))
[f.result() for f in futures]
assert len(results) == 3
Condition
はコンストラクターでインスタンスを取得しますが、引数を指定しない場合は内部的にRLock
のインスタンスを作成します。
condition = threading.Condition()
引数を指定する場合は、Lock
またはRLock
のインスタンスを渡す必要があります。
Condition#wait
でスレッドを待機させます。Condition
に対する操作は、ロックを獲得したうえで行う必要があります。
def wait_task() -> None:
with condition:
log("waiting...")
condition.wait()
log("wakeup")
results[threading.current_thread().name] = "done"
そしてCondition#nofity
またはCondition#notify_all
で待機しているスレッドを起こすことができます。
def notify_task() -> None:
with condition:
log("notify")
condition.notify_all()
log("done")
results[threading.current_thread().name] = "done"
Condition#nofity
ではひとつまたは指定した数のスレッドを、Condition#notify_all
では待機しているスレッドすべてを起こすことができます。
標準出力の結果はこちら。
[2024-11-16 20:59:00] ThreadPoolExecutor-0_0 - waiting...
[2024-11-16 20:59:00] ThreadPoolExecutor-0_1 - waiting...
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - notify
[2024-11-16 20:59:03] ThreadPoolExecutor-0_2 - done
[2024-11-16 20:59:03] ThreadPoolExecutor-0_0 - wakeup
[2024-11-16 20:59:03] ThreadPoolExecutor-0_1 - wakeup
ちなみに、Condition#wait_for
という引数に指定した関数の戻り値がTrue
になるとスレッドが起きるようにするAPIもあるようです。
Semaphore
Semaphore
は、ある範囲を同時に実行できるスレッドの数を制限する仕組みです。
Semaphore
サンプルコードはこちら。
def test_semaphore() -> None:
semaphore = threading.Semaphore(2)
results = {}
def with_semaphore() -> None:
log("acquire semaphore")
with semaphore:
log("enter semaphore")
time.sleep(2)
log("leave semaphore")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
futures.append(executor.submit(with_semaphore))
[f.result() for f in futures]
assert len(results) == 4
Semaphore
は、コンストラクターに同時に実行できるスレッド数を指定してインスタンスを生成します。ここでは2を指定しています。
semaphore = threading.Semaphore(2)
あとはLock
やRLock
のようにロックしたい範囲を指定して使います。
def with_semaphore() -> None:
log("acquire semaphore")
with semaphore:
log("enter semaphore")
time.sleep(2)
log("leave semaphore")
results[threading.current_thread().name] = "done"
今回は4つのスレッドを実行しているのですが、最初に入った2つのスレッドのどちらかが抜けるまでは3つ目、4つ目のスレッドはロックを
獲得できず待機しています。
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_0 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_1 - enter semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_2 - acquire semaphore
[2024-11-16 21:02:53] ThreadPoolExecutor-0_3 - acquire semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_0 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_2 - enter semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_1 - leave semaphore
[2024-11-16 21:02:55] ThreadPoolExecutor-0_3 - enter semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_3 - leave semaphore
[2024-11-16 21:02:57] ThreadPoolExecutor-0_2 - leave semaphore
Event
Event
は、Event
というオブジェクトを仲介してあるスレッドがイベントを発信した時に、Event
からの通知を待っているスレッドを起動する
しくみです。
Event
サンプルコードはこちら。
def test_event() -> None:
event = threading.Event()
results = {}
def wait_event() -> None:
log("wait...")
event.wait()
log("wake up")
results[threading.current_thread().name] = "done"
def set_event() -> None:
log("before set event")
time.sleep(2)
event.set()
log("after set event")
results[threading.current_thread().name] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(wait_event))
futures.append(executor.submit(wait_event))
time.sleep(2)
futures.append(executor.submit(set_event))
[f.result() for f in futures]
assert len(results) == 3
Event
の作成。
event = threading.Event()
待機するスレッドは、Event#wait
で通知を待ちます。
def wait_event() -> None:
log("wait...")
event.wait()
log("wake up")
results[threading.current_thread().name] = "done"
そしてイベントを送るスレッドは、Event#set
で待機しているスレッドをEvent#wait
から抜けさせることができます。
def set_event() -> None:
log("before set event")
time.sleep(2)
event.set()
log("after set event")
results[threading.current_thread().name] = "done"
実行結果。
[2024-11-16 21:08:51] ThreadPoolExecutor-0_0 - wait...
[2024-11-16 21:08:51] ThreadPoolExecutor-0_1 - wait...
[2024-11-16 21:08:53] ThreadPoolExecutor-0_2 - before set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_2 - after set event
[2024-11-16 21:08:55] ThreadPoolExecutor-0_0 - wake up
[2024-11-16 21:08:55] ThreadPoolExecutor-0_1 - wake up
2つのスレッドがEvent#set
を待っていることがわかります。
Barrier
Barrier
を使うと、複数のスレッドの待ち合わせができるようになります。
Barrier
サンプルコードはこちら。
def test_barrier() -> None:
barrier = threading.Barrier(3)
results = {}
def thread1() -> None:
log("thread1 waiting 3sec...")
time.sleep(3)
barrier.wait()
log("thread1 wakeup")
results["thread1"] = "done"
def thread2() -> None:
log("thread2 waiting 2sec...")
time.sleep(2)
barrier.wait()
log("thread2 wakeup")
results["thread2"] = "done"
def thread3() -> None:
log("thread3 waiting 5sec...")
time.sleep(5)
barrier.wait()
log("thread3 wakeup")
results["thread3"] = "done"
with ThreadPoolExecutor() as executor:
futures = []
futures.append(executor.submit(thread1))
futures.append(executor.submit(thread2))
futures.append(executor.submit(thread3))
[f.result() for f in futures]
assert results["thread1"] == "done"
assert results["thread2"] == "done"
assert results["thread3"] == "done"
Barrier
は、コンストラクターに待ち合わせるスレッドの数を指定してインスタンスを生成します。
barrier = threading.Barrier(3)
あとはBarrier#wait
を呼び出すとそこで待機し、コンストラクターに指定した数のスレッドがBarrier#wait
の呼び出しに到達すると
動き始めます。
def thread1() -> None:
log("thread1 waiting 3sec...")
time.sleep(3)
barrier.wait()
log("thread1 wakeup")
results["thread1"] = "done"
def thread2() -> None:
log("thread2 waiting 2sec...")
time.sleep(2)
barrier.wait()
log("thread2 wakeup")
results["thread2"] = "done"
def thread3() -> None:
log("thread3 waiting 5sec...")
time.sleep(5)
barrier.wait()
log("thread3 wakeup")
results["thread3"] = "done"
つまり、こういう動作結果になります。
[2024-11-16 21:13:53] ThreadPoolExecutor-0_0 - thread1 waiting 3sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_1 - thread2 waiting 2sec...
[2024-11-16 21:13:53] ThreadPoolExecutor-0_2 - thread3 waiting 5sec...
[2024-11-16 21:13:58] ThreadPoolExecutor-0_2 - thread3 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_1 - thread2 wakeup
[2024-11-16 21:13:58] ThreadPoolExecutor-0_0 - thread1 wakeup
最後のスレッドがBarrier#wait
を呼び出すまで他のスレッドが待機し、最後のスレッドがBarrier#wait
を呼び出したところで待機していた
スレッドすべてが一気に動き出します。
Timer
最後はTimer
です。これは、指定した時間の後にタスクを実行する仕組みですね。
Timer
サンプルコードはこちら。スレッドの待ち合わせにはBarrier
を使いました。
def test_timer() -> None:
results = {}
barrier = threading.Barrier(2)
def task() -> None:
log("execute task")
results[threading.current_thread().name] = "done"
barrier.wait()
log("register task")
timer = threading.Timer(3, task)
timer.start()
barrier.wait()
assert len(results) == 1
Timer
は、コンストラクターにタスクを起動するまでの秒数と起動するタスクを関数として指定します。
timer = threading.Timer(3, task)
実行結果はこちら。3秒後にタスクが実行されています。
[2024-11-16 21:18:39] MainThread - register task
[2024-11-16 21:18:42] Thread-1 - execute task
こんなところでしょうか。
おわりに
Pythonでスレッドに関する標準ライブラリーをいろいろ試してみました。
使う頻度はそう多くないと思いますが、マルチスレッドを扱う時には押さえておいた方がよさそうなものばかりなので覚えておきましょう。