Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                
408
401

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Python 3.12で増えた並列処理と、これまでの並列処理の挙動を比べてみる

Last updated at Posted at 2024-04-30

この記事ですることを3行で

  • Pythonの標準ライブラリでできる並列実行を、あらためて総当たりで速度比較しよう
  • ウォーターフォールチャートで、それぞれの並列処理の処理時間の特徴を可視化しよう
  • boto3の実行をモデルケースにして、どの並列処理が一番早いのかを調べよう

この記事の結論を先に

  • Python 3.12から本格的に使えるようになったサブインタープリターは、CPUで実行する処理について言えば、従来のサブプロセスよりも高速
     
  • boto3の実行は、サブインタープリターよりも署名付きURLの非同期実行のほうが速い
    → S3からの10ファイルの取得であれば、実行時間を90%削減できます
    → Bedrockの3回実行であれば、実行時間を60%削減できます

今回使ったソースコードはこちらに置いています。
お手持ちの環境で再実行できるようにしていますので、気になる方はぜひ。

どうしてこの記事を書くのか

2020年の10月に、「Faster Cpython」の計画が立ち上がりました。

The overall aim is to speed up CPython by a factor of (approximately) five.
(最終的な狙いは、CPythonを5倍高速にすることです)

3.10以降のPythonではこの成果が取り入れられています。3.12ではこれまで使い物にならなかった並列化処理(サブインタープリター)も使えるようになりました。次のバージョンである3.13では、GILを撤廃するモードも追加されます。

これまでのPythonの並列化の常識は変わりつつあるので、あらためて調べておきたく思いました。

この記事で扱う実行環境

  • Python 3.12.1

※比較のために、一部でPython 3.8.3を使います

この記事で扱う並列化の対象処理

  • CPUを使った単純な計算(フィボナッチ数の計算)
  • Sleep処理
  • S3からのファイル取得
  • LLM(Bedrock Claude Haiku)の実行

同時実行数は10、LLMの同時実行数は3にします。

この記事で扱う実行方法

この記事では、以下の7通りの処理を比較します
※処理する対象の関数名はprocess、非同期はasync_processとしています
※7通りのいずれも、標準のpythonで使える実行方法です
※並列と並行を含みますが、この記事では並列で表記します。

1. for文でそのまま書く(グラフでの表記: SINGLE)

for文でそのまま書く
for _ in range(10):
    process() # 実行する処理

2. asyncioで非同期実行する(グラフでの表記: ASYNC)

asyncioで非同期実行する
import asyncio

async def execute():
    task_list = [
        asyncio.create_task(async_process())
        for _ in range(10)
    ]
    for task in task_list:
        await task

def start():
    # スレッドの実行
    asyncio.run(execute())

3. multiprocessingでプロセスを分割する(グラフでの表記: SUBPROCESS)

multiprocessing.Processでプロセスを分割する
import multiprocessing

proc_list = []
for _ in range(10):
    proc = multiprocessing.Process(target=process)
    proc.start()
    proc_list.append(proc)
for p in proc_list:
    p.join()

4. threading.Threadでスレッドを立てる(グラフでの表記: THREADS)

threadingでスレッドを立てる
import threading

threads = []
for _ in range(10):
    thread = threading.Thread(target=process)
    thread.start()
    threads.append(thread)
for t in threads:
    t.join()

5. SubInterpreterを直接実行する(グラフでの表記: SUBINTERPRETER)

SubInterpreterを直接実行する
import _xxsubinterpreters as interpreters  # type: ignore
from inspect import getsource
from textwrap import dedent

# 関数を文字列に変換、インデントを除去する
process_str = dedent(getsource(process))
for _ in range(10):
    intp_id = interpreters.create()
    interpreters.run_string(intp_id, process_str)
    interpreters.destroy(intp_id)

6. SubInterpreterをthreading内で実行する(グラフでの表記: SUBINTERPRETER_THREADS)

SubInterpreterをthreading内で実行する
import _xxsubinterpreters as interpreters  # type: ignore
from inspect import getsource
from textwrap import dedent

def call_subinterpreter():
    # 関数を文字列に変換、インデントを除去する
    process_str = dedent(getsource(process))
    intp_id = interpreters.create()
    interpreters.run_string(intp_id, process_str)
    interpreters.destroy(intp_id)

# threadingを使って実行する
threads = []
for _ in range(10):
    thread = threading.Thread(target=call_subinterpreter)
    thread.start()
    threads.append(thread)
for t in threads:
    t.join()

7. SubInterpreterをsubprocess内で実行する(グラフでの表記: SUBINTERPRETER_SUBPROC)

SubInterpreterをsubprocess内で実行する
import _xxsubinterpreters as interpreters  # type: ignore
from inspect import getsource
from textwrap import dedent

def call_subinterpreter():
    # 関数を文字列に変換、インデントを除去する
    process_str = dedent(getsource(process))
    intp_id = interpreters.create()
    interpreters.run_string(intp_id, process_str)
    interpreters.destroy(intp_id)

# subprocessを使って実行する
proc_list = []
for _ in range(10):
    proc = multiprocessing.Process(target=call_subinterpreter)
    proc.start()
    proc_list.append(proc)
for p in proc_list:
    p.join()

比較していく

CPUで実行する計算処理を比較する

簡単なフィボナッチ数の計算をします。関数の再帰を繰り返して、CPUだけで処理をする関数です。

フィボナッチ数の計算
def fib_r(seq):
    if seq <= 1:
        return seq
    return fib_r(seq - 1) + fib_r(seq - 2)

res = fib_r(process_count)

この計算を、先に挙げた7通りの並列実行で比較します。

Python 3.8の場合

Python 3.8で実行すると、以下のようになりました。

fib-short-38.png

for文をそのまま書いた場合(SINGLE)でかかる時間を1としたときに、それぞれの処理がどれだけ時間がかかるのかをグラフにしています。

Pythonの3.8では、サブプロセス以外では実行時間の短縮が見られません。
サブインタープリターをサブプロセス上で実行させたケースも数字は良いのですが、3.8の時点では、致命的な不具合(例:関数の引数として渡した文字列の中身が壊れる)があるので、実用性はありません。

Python 3.12の場合

全く同じ処理をPython 3.12で実行して比較してみます。

fib-short-312.png

最も実行時間の短い処理は、サブインタープリターのスレッド実行になりました。
他の処理の半分以下の時間で済んでいます。

サブプロセスの処理が遅いのは、Pythonの3.12にとって、渡した処理が短すぎたせいです。もう少し長い時間(90秒ほど)かかる処理を渡して再実行します。

fib-long-312.png

処理の時間が長くなると、サブプロセスも悪くない数字になりました。
サブインタープリターは処理時間に関わらず、半分以下の実行時間で安定しています。

詳しく見る

それぞれの実行時間をウォーターフォールチャートで見ていきます。
おそらくブラウザの検証モードで馴染みがあるチャートだと思います。

この図です。処理が移り変わっていくときに、それぞれの処理の開始時間と終了時間を棒グラフで表したものです。今回はmatplotlibのBoxplotを変形して作っています。

Pythonでウォーターフォールチャートを書く方法
import matplotlib.pyplot as plt

# ウォーターフォールチャートのデータを用意する(開始秒、終了秒のタプル)
span_data = [
    (0.0, 1.0),
    (1.0, 2.0),
    (2.0, 3.0),
]

# ボックスプロットを描画する
plt.boxplot(
    # 2と3の間が箱、それ以外はひげになるため、箱だけを書く
    # データは若いものが上になるように逆順にする
    x=[[s[0], s[0], s[1], s[1]] for s in span_data.__reversed__()],
    labels=[f"{i}" for i in range(len(span_data), 0, -1)],
    # デザインを設定する
    vert=False,  # 横向きにする
    patch_artist=True,  # 色を設定
    boxprops=dict(  # 箱の色を設定
        facecolor="#0972d3",
        color="#0972d3",
    ),
    medianprops=dict(color="black", linewidth=0),  # 箱の上の線は消す
)

# 描画する
plt.show()

for文を直接回す(Single)

for文を回したときのウォーターフォールチャートは下のようになります。
処理が終わると次の処理に移る、それを10回繰り返すため、実行時間は階段状に並びます。

single-local-30.png

非同期実行する(Async)

Asyncの場合、awaitのない処理は通常のfor文と同じように処理されます。
ごくわずかですが、前の処理が完了する前に次の処理が始まります。

async-local-30.png

サブインタープリターを直接実行する(Subinterpreter)

サブインタープリターを直接実行した場合も、通常のfor文と同じように処理されます。
処理の先頭でわずかに準備の時間がかかるため、その分だけ遅くなっています。

subinterpreter-local-30.png

サブプロセスを実行する(Subprocess)

サブプロセスは、実行前の準備に大きく時間がかかります。
処理そのものは完全な並列ですから、グラフは階段状ではなく縦に並びます。

subprocess-local-30.png

処理が短すぎると実行前の準備時間の分をペイできないのですが、十分に時間のかかる処理を渡してやると、下のようなきれいな並列処理になります。

subprocess-local-38.png

スレッド実行する(Threads)

スレッド実行は、サブプロセスと違って実行前の準備時間がありません。
ただ、スレッドが他のスレッドを止めてしまうため、まばらな待ち時間が発生します。

threads-local-30.png

スレッド実行に時間のかかる処理を渡すと、処理が切り替わるわけではなく、全体が引きずられて遅延します。サブプロセスであれば45秒で終わる処理なのですが、どのスレッドも2倍の時間がかかっています。

threads-local-38.png

どうしてこんな挙動をするのかというと、PythonのスレッドにはGILと呼ばれるロックがあって、同時に2つ以上のスレッドを処理しないようになっているためです。

GILについて、詳しくはこちらをご参考ください。

サブインタープリターでスレッド実行する

やや古いPythonでは、マルチプロセスだけはGILの制限にかからないので、並列実行の速度的なメリットを受けるにはマルチプロセスで実装する必要がある、という事情がありました。

3.12から、本格的にサブインタープリターが使えるようになったことで、この事情が変わりました。
サブインタープリターには以下の特徴があります。

  • スレッド上で実行することが可能
  • GILの制限を受けない

サブプロセスとの違いは、ウォーターフォールチャートで見ると分かりやすいです。

subinterpreter_threads-local-30.png

スレッドで実行するため、サブプロセスにあった準備時間がありません。
GILの遅延がないため、サブプロセス並みの実行時間で処理が完了します。

時間のかかる処理を渡してみます。

subinterpreter_threads-local-38.png

とても綺麗な並列実行です。
まさに、スレッドとサブプロセスのいいとこどりの動きをしていることが分かります。

サブインタープリターをサブプロセス実行する

一応、サブインタープリターをサブプロセス上で実行することもできます。
※実装が難しくなるだけで、何もメリットはありません。

subinterpreter_subproc-local-30.png

サブプロセスのデメリット(準備に時間がかかる)を受けていて、処理時間そのものもサブプロセス並みです。ウォーターフォールチャートで見ると、サブプロセスの上でサブインタープリターを実行しない理由が分かりやすいと思います。

データの件数を増やしても、動きはサブプロセスと同じです。

subinterpreter_subproc-local-38.png

Sleep処理を比較する

スリープ処理をする場合の時間を比較します
※サブインタープリター内ではSleepできないので、それを除いた処理を比較します。

スリープ処理
sleep(1.0)

Asyncでは使う関数が変わります。

AsyncでのSleep処理
await asyncio.sleep(1.0)

1秒待機する処理を10件実行しました。
かかった時間は以下の通りです。

sleep-process.png

CPUを利用する並列処理であれば、サブプロセスは強力でした。サブプロセスに事前準備で遅くなるデメリットがあっても、それを補うだけの速度がありました。

Sleepでは事情が変わります。非同期実行のAsyncとスレッド実行は、CPUを使わない処理であれば、どちらもきれいに10分の1の時間で完了します。

S3のファイル取得を比較する

S3のファイル取得処理を比較します。
※サブインタープリター内ではboto3は動かないので、ひとまず除いた処理を比較します。

S3のファイル取得処理
bucket = boto3.resource("s3").Bucket(BUCKET_NAME)
content = bucket.Object(FILES[index]).get()["Body"].read()

ちなみに、boto3の公式ドキュメントにある通り、boto3のリソースとセッションはスレッドセーフではないため、並列処理で使いまわすことができません。各スレッドで作るようにします。

Similar to Resource objects, Session objects are not thread safe and should not be shared across threads and processes. It’s recommended to create a new Session object for each thread or process:(Resourceオブジェクトと同様に、Sessionオブジェクトはスレッドセーフではないため、スレッドやプロセス間で共有すべきではありません。Sessionスレッドまたはプロセスごとに新しいオブジェクトを作成することをお勧めします。)

S3にある10件の画像ファイルを並列で取得した結果がこちらです。

s3-get-object-boto3.png

boto3はawaitできないため、Asyncの非同期実行は遅くなります。
スレッド実行は30%ほど実行時間を短縮できています。

ですが、ちょっと物足りない数字です。マルチスレッドに対応していないboto3を無理やり動かすのではなくて、どうにかしてサブインタープリターでboto3を実行して、もっと大きく実行時間を削減したい…

しばらく考えて、ふと思いつきました。

これ、boto3から署名付きURLを取れば、Asyncで非同期実行も可能で、サブインタープリター内で実行できるんじゃね?

テイク2:署名付きURLでS3からファイルを取る

署名付きURLを並列実行の外側で発行します。

並列実行の外側で署名を取得します
s3 = boto3.client("s3")
my_config = Config(region_name="ap-northeast-1", signature_version="s3v4")
s3 = boto3.client("s3", config=my_config)
presigned_url = s3.generate_presigned_url(
    ClientMethod="get_object",
    Params={
        "Bucket": BUCKET_NAME,
        "Key": FILES[index],
    },
    ExpiresIn=3600,
)

発行した署名付きURLを使って、並列処理の中でファイルを取得します。

Awaitなしで実行する
import urllib3

response = urllib3.PoolManager().request(method="get", url=presigned_url)
res = response.data

Asyncについては、aiohttpを使って実行します。
※URL encoded=Trueを指定しないとboto3の署名が無効にされるので、encoded=Trueを指定してリクエストを投げます。

Awaitありで実行する
import aiohttp
from aiohttp.client import URL

async with aiohttp.ClientSession() as session:
    async with session.request(
        method="get",
        url=URL(presigned_url, encoded=True),
    ) as response:
        res = await response.read()

先ほどと同じように、署名付きURLを挟んで、10件の画像ファイルをS3から取得します。

s3-get-object-aiohttp.png

直列実行で署名付きURLを取得、そのままファイルを取得する場合と比べて、およそ70%ほど実行時間が短縮されています。S3からのファイル取得をサブインタープリター化することはできました。

では、boto3で直接取得する場合に比べると、どのくらい実行時間は短縮されているのでしょうか。比較してみます。

s3-process-overview.png

だいたい90%ほど実行時間が短縮できています。

Bedrockの実行を並列化する

S3の署名付きURLの発行は簡単ですが、ほかのAPIでも同じようなことができます。
Bedrock のClaude 3 Haikuの実行を並列化してみます。

以下のようなクラスを作って、Boto3のリクエストをhttpクライアントで実行できるようにします。

Boto3LowLevelClient.py
import boto3
from botocore.auth import SigV4Auth
from botocore.credentials import create_credential_resolver
from botocore.awsrequest import prepare_request_dict, create_request_object
from botocore.model import OperationModel


class Boto3LowLevelClient:
    _service_name: str
    _credential_scope: str
    _boto3_session = None
    _client = None

    def __init__(
        self,
        service_name: str,
        credential_scope: str = None,
        region_name: str = None,
        profile_name: str = None,
    ) -> None:
        # サービス名を確保する
        # 権限スコープは基本的にサービス名に同じ、異なるなら個別に設定する
        self._service_name = service_name
        if credential_scope is None:
            self._credential_scope = service_name
        else:
            self._credential_scope = credential_scope
        # boto3のセッションを確保する
        self._boto3_session = boto3.Session(
            region_name=region_name, profile_name=profile_name
        )
        # boto3のクライアントを作成する
        self._client = self._boto3_session.client(service_name)

    def create_request_parameter(self, method_name: str, method_parameter: dict):
        """
        boto3への要求情報を作成する
        """

        # クライアントから関数の定義情報を参照する
        operation_model: OperationModel = self._client._service_model.operation_model(
            method_name
        )
        request_dict = self._client._serializer.serialize_to_request(
            method_parameter, operation_model
        )

        # 今の環境に設定されている認証情報を取得する
        resolver = create_credential_resolver(self._boto3_session._session)
        credentials = resolver.load_credentials()
        # リージョン名を取得する
        region_name = self._boto3_session.region_name
        # エンドポイントのurlを取得する
        endpoint_url = f"https://{self._service_name}.{region_name}.amazonaws.com"

        # 要求情報にコンテキストとエンドポイントを書き込む
        prepare_request_dict(
            request_dict,
            endpoint_url,
            {
                "client_region": region_name,
                "client_config": self._client.meta.config,
                "has_streaming_input": operation_model.has_streaming_input,
                "auth_type": operation_model.auth_type,
            },
            user_agent=self._client._user_agent_creator.to_string(),
        )

        # リクエストに対して署名する
        request = create_request_object(request_dict)
        SigV4Auth(
            credentials=credentials,
            service_name=self._credential_scope,
            region_name=region_name,
        ).add_auth(request)

        # ヘッダを辞書型に詰め直す
        headers = {}
        for kv in request.headers.__dict__["_headers"]:
            headers[kv[0]] = kv[1]

        return {
            "method": request.method,
            "url": request.url,
            "body": request.body.decode("utf-8"),
            "headers": headers,
        }

実行するときはboto3と同じです。

実行例: STSから認証情報を取るなら
import urllib3
import json

# 署名をboto3から抜き取る
sts = Boto3LowLevelClient("sts")
# stsのget_caller_identityを作成する
get_caller_identity = sts.create_request_parameter("GetCallerIdentity", {})

# urllib3やaiohttpで実行する
# ここからはboto3ではないので、並列化やサブインタープリター内での実行ができる
res = urllib3.PoolManager().request(**get_caller_identity)
print(res.data.decode("utf-8"))
実行例: BedrockのHaikuを実行するなら
import urllib3
import json

# Bedrockは署名スコープがbedrockになるので、そこだけ指定する
bedrock_runtime = Boto3LowLevelClient(
    "bedrock-runtime", credential_scope="bedrock", region_name="us-east-1"
)
# BedrockRuntimeのinvoke_modelを作成する
invoke_model = bedrock_runtime.create_request_parameter(
    "InvokeModel",
    {
        "body": json.dumps(
            {
                "anthropic_version": "bedrock-2023-05-31",
                "max_tokens": 100,
                "messages": [
                    {
                        "role": "user",
                        "content": "こんにちは、Claude",
                    }
                ],
            }
        ),
        "modelId": "anthropic.claude-3-haiku-20240307-v1:0",
        "accept": "application/json",
        "contentType": "application/json",
    },
)

# urllib3やaiohttpで実行する
# ここからはboto3ではないので、並列化やサブインタープリター内での実行ができる
res = urllib3.PoolManager().request(**invoke_model)
print(res.data.decode("utf-8"))

aiohttpで実行するときは下のように書きます

urllib3ではなくaiohttpで実行するなら
import aiohttp
from aiohttp.client import URL

async with aiohttp.ClientSession() as session:
    async with session.request(
        method=invoke_model["method"],
        url=URL(invoke_model["url"], encoded=True), # encoded=Trueを指定
        data=invoke_model["body"], # 引数の変数名が違うので、dataにする
        headers=invoke_model["headers"],
    ) as response:
        res = await response.text()

この方法で、boto3のclient経由で実行できるメソッドならどれでも並列化できます
詳しいソースはこちらです

署名を並列処理の外側で作成して、3回のBedrockへのリクエストを実行しました。

bedrock-aiohttp.png

非同期のAsyncやスレッド実行で、3分の1に近い実行時間まで削ることができています。Bedrockは実行時間の揺れが大きいので、ややばらついた部分はありますが、挙動はS3の並列実行と同じだと考えて良さそうです。

詳しく見る

for文でBedrockを3回実行する

for文で実行したときのウォーターフォールチャートです。
Haikuは1秒ほどでレスポンスを返しています。

single-bedrock.png

Asyncを使って、非同期で実行する

aiohttpを使って、非同期でリクエストを投げたときの挙動は以下の通りです

async-bedrock.png

わずかに実行開始までのあいだのラグがあるのですが、Haikuを完全な並列で実行しています。joinではなくraceの形で実行していれば、1.6秒ほどでLLMの結果を受け取ることができそうです。

スレッド実行を使って、非同期でBedrockを実行する

スレッド実行ではawaitの必要はないので、aiohttpではなく、urllib3を使って実行します。
スレッド実行のウォーターフォールチャートは以下のようになります。

threads-bedrock.png

LLMに3回のリクエストを投げて結果を受け取るまでの時間は2秒を切っています。

サブインタープリターをスレッド実行する

サブインタープリターのスレッド実行もurllib3で実行します。
ウォーターフォールチャートは以下のようになります。

subinterpreter_threads-bedrock.png

こちらもスレッドに近い結果です。

LLMではSQL文やJSONの作成で意図に沿わない形式の結果を返すことがあるのですが、並列で同時に複数回のリクエストを送って、使える結果を選ぶようにすれば、リトライのもたつきを感じることなく実装することができます。

まとめ

あらためて、Pythonの3.12の並列処理を総あたりで調べてみました。

  • Python 3.12から本格的に使えるようになったサブインタープリターは、CPUで実行する計算処理について言えば、従来のサブプロセスよりも高速
  • boto3の実行は、新しいサブインタープリターに頼るよりも署名付きURLを噛ませるほうが早い

あらためて、今回使ったソースコードはこちらに置いています。
お手持ちの環境で再実行できるようにしています。

408
401
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
408
401

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?