Location via proxy:   [ UP ]  
[Report a bug]   [Manage cookies]                

CUBE SUGAR CONTAINER

技術系のこと書きます。

Python: ソケットプログラミングのアーキテクチャパターン

今回はソケットプログラミングについて。 ソケットというのは Unix 系のシステムでネットワークを扱うとしたら、ほぼ必ずといっていいほど使われているもの。 ホスト間の通信やホスト内での IPC など、ネットワークを抽象化したインターフェースになっている。

そんな幅広く使われているソケットだけど、取り扱うときには色々なアーキテクチャパターンが考えられる。 また、比較的低レイヤーな部分なので、効率的に扱うためにはシステムコールなどの、割りと OS レベルに近い知識も必要になってくる。 ここらへんの話は、体系的に語られているドキュメントが少ないし、あっても鈍器のような本だったりする。 そこで、今回はそれらについてざっくりと見ていくことにした。

尚、今回はプログラミング言語として Python を使うけど、何もこれは特定の言語に限った話ではない。 どんな言語を使うにしても、あるいは表面上は抽象化されたインターフェースで隠蔽されていても、内部的にはソケットが使われている。 例えば Java サーブレットや Ruby on Rails で Web アプリケーションを書くにしても、それが動くサーバの通信部分はソケットで書かれていることだろう。

動作確認に使った環境は次の通り。

$ sw_vers
ProductName:    Mac OS X
ProductVersion: 10.12.4
BuildVersion:   16E195
$ python --version
Python 3.6.1

もくじ

ブロッキングとノンブロッキングについて

まず、ソケットを扱うには大きく分けて「ブロッキングで使うか・ノンブロッキングで使うか」を選ぶことになる。 その中でも基本となる使い方はブロッキングで、こちらの方が逐次的なプログラミングモデルとなりやすいので理解も早い。 ではノンブロッキングにはどんなメリットがあるかというと、こちらは通信相手が増えたときのパフォーマンス面 (スケーラビリティ) で優れている。

このエントリでは、ソケットの扱い方をブロッキング・ノンブロッキングと分けた上で、それぞれにどんなアーキテクチャパターンが考えられるか見ていく。 しかし、その前にまずは事前知識としてソケットにおけるブロッキング・ノンブロッキングという概念自体の説明から入ろう。

まず、ソケットというオブジェクトに対してはデータの読み込みや書き込みを指示できる。 読み込まれるデータは通信相手から送られてきたもので、書き込まれたデータは通信相手に送り届けられる。 しかし、それらのデータを読み書きする指示は即座に完了するわけではない。 具体的には、ソケットには読み書きができる状態とできない状態があるためだ。 読み書きができないというのは、わんこそばで例えると口の中でもぐもぐしている最中で、次のそばを口に入れられない状態を指す。

では、もしも読み込みや書き込みができない状態にあるソケットに対して、その指示を出したらどう振る舞うのだろうか。 ブロッキング・ノンブロッキングの違いというのは、正にこの「どう振る舞うか」の違いを指す。 ブロッキングというのは、読み書きができる状態になるまで、じっとそのまま待つことを意味している。 それに対して、ノンブロッキングは読み書きができない状態にあるときエラーを出してすぐに処理を終了する。

これで、ソケットのブロッキング・ノンブロッキングの違いについて説明できた。

ソケットをブロッキングで扱う場合

さて、前フリが長くなったけど、ここからは具体的なアーキテクチャパターンを見ていくことにしよう。 初めは、基本的な使い方であるソケットをブロッキングで扱う場合から。

今回、サンプルコードとして題材にするのはエコーサーバにした。 エコーサーバというのは、クライアントから送られてきたデータを、そのままオウム返しでクライアントに送り返すサーバのことをいう。

実装については IPv4 のループバックアドレスを使って TCP:37564 ポートでクライアントからの接続を待ち受けるようにした。 ループバックアドレスとは何か、みたいな TCP/IP 的な概念についての説明は省く。 これは、今回の主題として扱うアーキテクチャパターンという範疇からは、ちょっと外れるため。

あと、クライアントサイドについても自分で書いても良いんだけど、今回はありものを使うことにした。 ここでは netcat というツールを使うことにしよう。 netcatHomebrew を使ってインストールする。

$ brew install netcat

Homebrew が入っていないときは入れる感じで。

シングルスレッド

まずは、ソケットがブロッキングで、それをシングルスレッドで扱う場合から考えてみよう。 これが、最もシンプルなパターンといえるはず。

早速だけどサンプルコードを以下に示す。 それぞれの処理の内容はコメントで補足している。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    # IPv4/TCP のソケットを用意する
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 'Address already in use' の回避策 (必須ではない)
    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    # 待ち受けるアドレスとポートを指定する
    # もし任意のアドレスで Listen したいときは '' を使う
    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    # クライアントをいくつまでキューイングするか
    serversocket.listen(128)

    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            # クライアントソケットから指定したバッファバイト数だけデータを受け取る
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except OSError:
                break

            # 受信したデータの長さが 0 ならクライアントからの切断を表す
            if len(message) == 0:
                break

            # 受信したデータをそのまま送り返す (エコー)
            sent_message = message
            while True:
                # 送信できたバイト数が返ってくる
                sent_len = clientsocket.send(sent_message)
                # 全て送れたら完了
                if sent_len == len(sent_message):
                    break
                # 送れなかった分をもう一度送る
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        # 後始末
        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

サーバにおけるソケットプログラミングの基本的な流れは次の通り。

  • ソケットを作る (socket)
  • 待ち受けるアドレスとポートを指定する (bind)
  • 接続キューの長さを指定して接続を待ち受ける (listen)
  • 接続してきたクライアントからソケットを取得する (accept)
  • 取得したクライアントのソケットに対して読み書きする (send/recv)

このパターンでは、上記の一連の処理を一つのスレッドでこなしていく。

それではサンプルコードを実行してみよう。 これで、エコーサーバが起動する。 とはいえ、クライアントが接続しない限り特に何も表示されることはない。

$ python singlethread.py

続いて、別のターミナルを開いて netcat を実行しよう。 次のようにすると、先ほど起動したエコーサーバに接続できる。

$ nc localhost 37564

すると、エコーサーバを起動したターミナルに、クライアントからの接続を表す表示が出るはず。

$ python singlethread.py
New client: 127.0.0.1:63917

さらに netcat のターミナルで文字列を入力して Enter すると、同じ内容がまた表示される。 これは、送信した内容がエコーサーバからオウム返しで返ってきたことを意味する。

$ nc localhost 37564
hogehoge
hogehoge

エコーサーバのターミナルを見ると、送受信した内容が表示されている。

$ python singlethread.py
New client: 127.0.0.1:63917
Recv: b'hogehoge\n'
Send: b'hogehoge\n'

netcat は Ctrl キーと C キーを一緒に押すことで終了できる。 これでサーバとの接続も切断される。

$ nc localhost 37564
hogehoge
hogehoge
^C

サーバの方にもクライアントとの接続が切れた旨が表示された。

$ python singlethread.py
New client: 127.0.0.1:63917
Recv: b'hogehoge\n'
Send: b'hogehoge\n'
Recv: b''
Bye-Bye: 127.0.0.1:63917

ここまで見た限り、このパターンで何の問題も無いように見える。 しかし、クライアントを二つにすると問題点が分かってくる。

サーバを一旦終了して、もう一度起動し直そう。 ちなみにサーバについても Ctrl-C で終了できる。

$ python singlethread.py

そして、改めて別のターミナルから netcat で接続する。

$ nc localhost 37564

クライアントが一つなら、サーバは接続を正常に受け付ける。

$ python singlethread.py
New client: 127.0.0.1:49746

では、さらにもう一つターミナルを開いて netcat で接続してみると、どうだろうか?

$ nc localhost 37564

今度は、サーバ側に接続を受け付けたメッセージが表示されない。

$ python singlethread.py
New client: 127.0.0.1:49746

そう、ソケットをブロッキングかつシングルスレッドで扱う場合、二つ以上のクライアントを同時に上手くさばくことができない。 なぜなら、唯一のスレッドは最初のクライアントからデータを読み書きする仕事に従事しているからだ。

先ほどのサンプルコードでいえば以下、クライアントからの新たなデータの到来を待ち続けて (ブロックして) いることだろう。

message = clientsocket.recv(1024)

唯一のスレッドが一つのクライアントにかかりきりなので、以下の別のクライアントからの接続を受け付ける処理は実行されない。 クライアントからの接続は、ソケットの接続キューに積まれたまま放置プレイを食らう。

clientsocket, (client_address, client_port) = serversocket.accept()

今かかりきりになっている相手との通信が終わるまで、別のクライアントは受け付けることができないというわけ。

マルチスレッド

ソケットをブロッキングで扱うとき、シングルスレッドでは二つ以上のクライアントを上手くさばけないことが分かった。 そこで、次はクライアントを処理するスレッドを複数用意してマルチスレッドにしてみよう。

先ほどの内容に手を加えて、マルチスレッドにしたサンプルコードは次の通り。 先ほどと同じ処理についてはコメントを省いて、新たに追加したり変更したところにコメントを書いている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import threading


def client_handler(clientsocket, client_address, client_port):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        try:
            message = clientsocket.recv(1024)
            print('Recv: {0} from {1}:{2}'.format(message,
                                                  client_address,
                                                  client_port))
        except OSError:
            break

        if len(message) == 0:
            break

        sent_message = message
        while True:
            sent_len = clientsocket.send(sent_message)
            if sent_len == len(sent_message):
                break
            sent_message = sent_message[sent_len:]
        print('Send: {0} to {1}:{2}'.format(message,
                                            client_address,
                                            client_port))

    clientsocket.close()
    print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        # 接続してきたクライアントを処理するスレッドを用意する
        client_thread = threading.Thread(target=client_handler,
                                         args=(clientsocket,
                                               client_address,
                                               client_port))
        # 親 (メイン) スレッドが死んだら子も道連れにする
        client_thread.daemon = True
        # スレッドを起動する
        client_thread.start()


if __name__ == '__main__':
    main()

先ほどとの違いは、クライアントとの接続に対してスレッドが一対一で生成されるところだ。 プログラムが起動された直後に生成されるメインスレッドは、クライアントからの接続を受け付ける仕事だけに専念している。 実際に受け付けたクライアントとの接続の処理は、新たに生成した子スレッドに任せるわけだ。

では、上記サンプルコードの動作を確認してみよう。 まずはエコーサーバを起動する。

$ python multithread.py

そして、二つのターミナルからエコーサーバに接続してみよう。

$ nc localhost 37564

すると、今度は二つのクライアントから接続を受け付けた旨が表示された。

$ python multithread.py
New client: 127.0.0.1:51027
New client: 127.0.0.1:51028

それぞれのクライアントのターミナルで文字列を入力すると、ちゃんとエコーバックされるし上手く動いている。

$ python multithread.py
New client: 127.0.0.1:51027
New client: 127.0.0.1:51028
Recv: b'hogehoge\n' from 127.0.0.1:51027
Send: b'hogehoge\n' to 127.0.0.1:51027
Recv: b'hogehoge\n' from 127.0.0.1:51028
Send: b'hogehoge\n' to 127.0.0.1:51028

マルチスレッド (スレッドプール)

先ほどの例では、クライアントを処理する部分をマルチスレッド化することで、二つ以上のクライアントを同時にさばけるようになった。 しかし、実は先ほどのやり方ではクライアントの接続数がどんどん増えていくと問題になってくることがある。 それは、メモリの使用量とコンテキストスイッチにかかるコストの増加だ。

スレッドというのは、新たに作ろうとするとそれ用のコンテキストを必要とする。 この、コンテキストというのは各スレッドの状態を保持しておくために必要なメモリに他ならない。 スレッドあたりのコンテキストのサイズは状態や実装に依存するので、これくらいとはなかなか言いづらいものがある。 とはいえ、一つ一つが小さくてもクライアントの接続数が増えれば決してばかにできないサイズになってくる。

また、コンテキストスイッチというのは、CPU が処理しているスレッドを OS が途中で切り替える作業のことをいう。 まず、CPU というのは同時に処理できるスレッドの数が、あらかじめ製品ごとに決まっている。 例えば、今売られている Intel や AMD の x86-64 アーキテクチャの CPU を例に挙げてみよう。 この場合は、物理コアあたり 1 または 2 スレッドである場合が多い。 つまり、同時に処理できるスレッドには機械的な上限がある。 ちなみに、物理コアあたり同時 2 スレッドの製品については、OS からは論理コアが 2 つあるように扱われる。

にも関わらず、実のところ私たちは普段からそれよりも多くのスレッドを同時に起動して扱っている。 なぜそんなことができるかというと、CPU が実行するスレッドを、OS が途中で別のスレッドに入れ替えているためだ。 この入れ替えは、ごく短時間で行われているので、見かけ上はたくさんのスレッドが同時に実行できているかのように見える。

しかしながら、この入れ替え作業には短時間ながらもちろん時間がかかる。 そして、CPU で同時に処理できるスレッドの数に対して、OS が扱うスレッドの数が増えてくると、その頻度も上がる。 これによって、切り替え作業に要する時間が増えて、だんだんと CPU が非効率な使われ方をしてしまうことがある。

先ほどのサンプルコードでは、まさに上記の二つが問題となる。 なぜなら、生成するスレッドの数に上限を設けていないからだ。 上限がないと、クライアントの数に応じてどんどんスレッドが増え続ける。 結果として、メモリを消費すると共に CPU が非効率な使われ方をしてしまう。

スレッドが多すぎるとまずいという問題点が分かったところで、次はスレッドを生成する数に上限を設けてみよう。 具体的には、あらかじめスレッドを既定数だけ生成して、それらに仕事を割り振る形にする。 この手法は、一般にスレッドプールと呼ばれている。 スレッドプールの中にいる各スレッドは、ワーカースレッドと呼ばれる。

次のサンプルコードはスレッドプールを使った実装になっている。 生成されたワーカーがサーバソケットからの接続を奪い合う形になる。 これなら、あらかじめ決まった数を超えるスレッドは生成されないので、前述したような問題は発生しない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import threading


def worker_thread(serversocket):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカスレッド同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # サーバソケットを渡してワーカースレッドを起動する
    NUMBER_OF_THREADS = 10
    for _ in range(NUMBER_OF_THREADS):
        thread = threading.Thread(target=worker_thread, args=(serversocket, ))
        thread.daemon = True
        thread.start()

    while True:
        # メインスレッドは遊ばせておく (ハンドラを処理させても構わない)
        time.sleep(1)


if __name__ == '__main__':
    main()

ただし、上記にも注意点がある。 それは、あらかじめプールしたスレッド数を超えてクライアントをさばくことができない、という点だ。 プール数を超えた接続があったときは、他のクライアントとの接続が切れるまで、ソケットは処理されないままキューに積まれてしまう。

実行結果については、先ほどと変わらないので省略する。

ちなみに、蛇足だけど Mac OS X に関してはプロセスごとに生成できるスレッド数があらかじめ制限されているようだ。 例えば、次のようなサンプルコードを用意して、たくさんのスレッドを起動してみる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import threading
import time


def loop():
    """各スレッドは特に何もしない"""
    while True:
        time.sleep(1)


def main():
    # ネイティブスレッドをたくさん起動してみる
    for _ in range(10000):
        t = threading.Thread(target=loop)
        t.daemon = True
        t.start()
        # 動作中のスレッド数を出力する
        print(threading.active_count())


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると 2049 個目のスレッドを起動するところで例外になった。

$ python toomanythreads.py
...(省略)...
2046
2047
2048
Traceback (most recent call last):
  File "toomanythreads.py", line 25, in <module>
    main()
  File "toomanythreads.py", line 19, in main
    t.start()
  File "/Users/amedama/.pyenv/versions/3.6.1/lib/python3.6/threading.py", line 846, in start
    _start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread

このリミットは、どうやら次のカーネルパラメータでかかっているらしい。

$ sysctl -n kern.num_taskthreads
2048

Mac OS X においては、スレッドの生成数に上限を設けないと、メモリの枯渇などを待つことなくサーバが突然死することになる。

マルチプロセス (プロセスプール)

先ほどの例では、スレッドプールを使うことで同時に処理できるクライアントの数を増やしつつ、リソースの消費を抑えることができた。 しかしながら、実はここまでの例では、パフォーマンスを求める上で、まだ使い切れていないリソースが残っている。 それは、複数の CPU コアだ。

実は Python の標準的な処理系である CPython には、とある制限が存在している。 それは、一つのプロセスで同時に実行できるスレッドの数が一つだけ、というもの。 一般的に、これはグローバルインタプリタロック (Global Interpreter Lock, GIL) と呼ばれている。 この制限は、Python/C API で書かれた拡張モジュールを Python から扱いやすくするために存在する。

この GIL がある処理系では、CPU に複数の論理コアがあったとしても、同時に使われるのが一つだけに制限されてしまう。 つまり、先ほどの例では、マルチスレッドにしても実際に使われている CPU 論理コアは同時に一つだけだった。 ようするに、複数のスレッドを OS が一つの CPU 論理コアの上で切り替え (コンテキストスイッチ) ながら動作する。

ちなみに、コンピュータの処理には、大きく分けて入出力 (I/O) が主体になるものと計算 (CPU) が主体になるものがある。 CPU が主体となるのは、例えば科学計算のようなもの。 それに対して、今回の例であるエコーサーバのようなプログラムは、CPU の処理がほとんどない。 処理時間のほとんどを I/O の待ちに使っていることから、入出力が主体のプログラムといえる。

つまり、今回取り扱うエコーサーバは CPU の処理能力がボトルネックになりにくい。 ようするに、あえて CPU の能力を最大限引き出すようなコードにする必然性は薄い。 しかしながら、アーキテクチャパターンの紹介という意味では重要だと思う。 なので、その方法についても記述しておこう。

その方法というのは、具体的にはプログラムを複数のプロセスで動かす。 前述した通り GIL はプロセスあたりの同時実行スレッド数を一つに制限するというものだった。 なので、プロセスを複数立ち上げてしまえば、同時実行スレッド数をプログラム全体で見たときに増やすことができる。

次のサンプルコードでは、スレッドの代わりにプロセスを複数起動 (マルチプロセス) している。 Python でマルチプロセスを扱う方法としては、例えば標準ライブラリの multiprocessing モジュールがある。 起動するプロセスの数は CPU の論理コア数と同じにした。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import multiprocessing


def worker_process(serversocket):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカープロセス同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # プロセス数は CPU のコア数前後に合わせると良い
    NUMBER_OF_PROCESS = multiprocessing.cpu_count()
    # サーバソケットを渡してワーカープロセスを起動する
    for _ in range(NUMBER_OF_PROCESS):
        process = multiprocessing.Process(target=worker_process,
                                          args=(serversocket, ))
        # デーモンプロセスにする (親プロセスが死んだら子も道連れに死ぬ)
        process.daemon = True
        # プロセスを起動する
        process.start()

    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

マルチプロセスを使うときの注意点についても見ていこう。 これは、マルチスレッドの場合とほとんど変わらない。 つまり、プロセスを作るにもコンテキストが必要であり、コンテキストスイッチが起こるということだ。 そのため、同時に起動するプロセス数は制限してやる必要がある。 しかも、必要なリソースの量はスレッドに比べるとずっと多い。 そのため、一般的には起動するプロセス数は CPU の論理コアの数前後が良いとされている。

また、マルチプロセス固有の問題としては、プロセス間での値の共有が挙げられる。 マルチスレッドであれば、同一プロセス内でメモリ空間を共有していた。 なので、例えばグローバル変数の値をスレッド間で情報を共有する手段にもできた。 それに対し、マルチプロセスではプロセス同士でメモリ空間は共有していない。 そのため、別の何らかの IPC を使って情報をやり取りしなければいけない。

尚、繰り返しになるけどマルチプロセスにする必要があるのは、あくまで GIL があることに由来している。 もし、これがない処理系やプログラミング言語を使うなら、単にマルチスレッドにするだけで大丈夫。 ちゃんと CPU のコアを使い切ってくれるはず。

マルチプロセス・マルチスレッド

先ほどの例では、プロセスを複数立ち上げることで CPU の能力を使い切れるようにした。 ただし、マルチプロセスではあるものの、それぞれのプロセスでは一つのスレッドしか動かしていなかった。 そこで、次は各プロセスの中をマルチスレッドにしてみよう。 これなら、マルチプロセスかつマルチスレッドになって CPU と I/O の両方を上手く使い切れるはず。

次のサンプルコードでは、各ワーカープロセスの中でさらにスレッドプールを動かしている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import socket
import multiprocessing
import threading


def worker_thread(serversocket):
    """クライアントとの接続を処理するハンドラ (スレッド)"""
    while True:
        # クライアントからの接続を待ち受ける (接続されるまでブロックする)
        # ワーカースレッド同士でクライアントからの接続を奪い合う
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {0} from {1}:{2}'.format(message,
                                                      client_address,
                                                      client_port))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {0} to {1}:{2}'.format(message,
                                                client_address,
                                                client_port))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def worker_process(serversocket):
    """クライアントとの接続を受け付けるハンドラ (プロセス)"""

    # サーバソケットを渡してワーカースレッドを起動する
    NUMBER_OF_THREADS = 10
    for _ in range(NUMBER_OF_THREADS):
        thread = threading.Thread(target=worker_thread, args=(serversocket, ))
        thread.start()
        # ここではワーカーをデーモンスレッドにする必要はない (死ぬときはプロセスごと逝くので)

    while True:
        # ワーカープロセスのメインスレッドは遊ばせておく
        time.sleep(1)


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    NUMBER_OF_PROCESSES = multiprocessing.cpu_count()
    for _ in range(NUMBER_OF_PROCESSES):
        process = multiprocessing.Process(target=worker_process,
                                          args=(serversocket, ))
        process.daemon = True
        process.start()

    while True:
        time.sleep(1)


if __name__ == '__main__':
    main()

実行結果については、これまで変わらないので省略する。

ひとまず、ソケットをブロッキングで扱う場合のアーキテクチャパターンについては、これでおわり。

ソケットをノンブロッキングで扱う場合

続いては、ソケットをノンブロッキングで扱う場合について見ていこう。 前述した通り、ソケットをノンブロッキングで扱うと、読み書きなどを指示してもブロックが起きない。 その代わり、もし読み書きの準備ができていないときはその旨がエラーで返ってくる。

とりあえずノンブロッキングにしてみよう

最初に、ノンブロッキングなソケットをブロッキングっぽく扱ったときの挙動を確認しておこう。 具体的に、どんなことが起こるのだろうか?

次のサンプルコードは、最初に示したシングルスレッドのサーバに一行だけ手を加えている。 それは、サーバソケットを setblocking() メソッドでノンブロッキングモードにしているところだ。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # ソケットをノンブロッキングモードにする
    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                sent_len = clientsocket.send(sent_message)
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

上記を実行してみよう。 すると、すぐに例外が出て終了してしまう。

$ python nonblocking.py
Traceback (most recent call last):
  File "nonblocking.py", line 48, in <module>
    main()
  File "nonblocking.py", line 22, in main
    clientsocket, (client_address, client_port) = serversocket.accept()
  File "/Users/amedama/.pyenv/versions/3.6.1/lib/python3.6/socket.py", line 205, in accept
    fd, addr = self._accept()
BlockingIOError: [Errno 35] Resource temporarily unavailable

上記の BlockingIOError という例外は、まだ準備が整っていないにも関わらず指示が出されたときに上がる。 今回の場合だと、クライアントからの接続が到着していないのに accept() メソッドを呼び出している。 ブロッキングモードのソケットなら、そのまま到着するまで待ってくれていた。 それに対し、ノンブロッキングモードでは呼び出した時点で到着していないなら即座に例外となってしまう。 正に、これがブロッキングとノンブロッキングの挙動の違い。

準備が整うまで待つ (ビジーループ)

先ほどの例で分かるように、ソケットをノンブロッキングで使うとブロッキングとは使い勝手が異なっている。 具体的には、ソケットの準備が整うのを勝手に待ってくれるわけではないので、自分で意図的に待たなければいけない。

では、どのようにすれば待つことができるだろうか。 一つのやり方としては、エラーが出なくなるまで定期的に実行してみる方法が考えられる。 この、何度も自分から試しに行くやり方はポーリングと呼ばれる。 その中でも、それぞれの試行間隔を全く空けないものはビジーループという。

次のサンプルコードではノンブロッキングなソケットをビジーループで待ちながら処理している。 ただし、あらかじめ言っておくと、このやり方は間違っている。 ソケットをノンブロッキングで扱うとき、こんなソースコードは書いちゃいけない。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        try:
            clientsocket, (client_address, client_port) = serversocket.accept()
        except (BlockingIOError, socket.error):
            # まだソケットの準備が整っていない
            continue

        print('New client: {0}:{1}'.format(client_address, client_port))

        while True:
            try:
                message = clientsocket.recv(1024)
                print('Recv: {}'.format(message))
            except (BlockingIOError,  socket.error):
                # まだソケットの準備が整っていない
                continue
            except OSError:
                break

            if len(message) == 0:
                break

            sent_message = message
            while True:
                try:
                    sent_len = clientsocket.send(sent_message)
                except (BlockingIOError,  socket.error):
                    # まだソケットの準備が整っていない
                    continue
                if sent_len == len(sent_message):
                    break
                sent_message = sent_message[sent_len:]
            print('Send: {}'.format(message))

        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


if __name__ == '__main__':
    main()

上記のサンプルコードは一応動作するものの、複数のクライアントを処理することができない。 それに、ビジーループを使っているとプロセスの CPU 使用率が 100% になってしまう。 繰り返しになるけど、ソケットをノンブロッキングで扱うとき、こんなソースコードは書いちゃだめ。

準備が整うまで待つ (イベントループ)

ビジーループでは色々と難しいことが分かったところで、次は実用的に待つ方法を見ていこう。 これには、イベントループや I/O 多重化と呼ばれる手法というかシステムコールを用いる。 システムコールというのは OS のカーネルに備わっている API のことだ。 ユーザランドのプログラムは、このシステムコールを呼び出すことで OS の機能が利用できる。

システムコールの中には、ソケットの状態を監視して、変更されたときにそれを通知してくれるものがある。 より正確には、監視できるものはファイルやソケットに汎用的に割り当てられるファイルディスクリプタだ。

イベントループにはいくつかの種類があるものの、ここでは古典的な select(2) を使うやり方を見ていく。 次のサンプルコードは、エコーサーバを select(2) システムコールで実装したもの。 ただし、先に断っておくと、これは実装している機能の割にコード量が多いし、逐次的でないから読みにくいと思う。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import socket
import select


# 読み取りが可能になるまで待っているソケットと、可能になったときに呼び出されるハンドラ・引数の対応を持つ
read_waiters = {}
# 書き込みが可能になるまで待っているソケットと、可能になったときに呼び出されるハンドラ・引数の対応を持つ
write_waiters = {}
# 接続してきたクライアントとの接続情報を格納する
connections = {}


def accept_handler(serversocket):
    """サーバソケットが読み取り可能になったとき呼ばれるハンドラ"""
    # 準備ができているので、すぐに accept() できる
    clientsocket, (client_address, client_port) = serversocket.accept()

    # クライアントソケットもノンブロックモードにする
    clientsocket.setblocking(False)

    # 接続してきたクライアントの情報を出力する
    # ただし、厳密に言えば print() もブロッキング I/O なので避けるべき
    print('New client: {0}:{1}'.format(client_address, client_port))

    # ひとまずクライアントの一覧に入れておく
    connections[clientsocket.fileno()] = (clientsocket,
                                          client_address,
                                          client_port)

    # 次はクライアントのソケットが読み取り可能になるまで待つ
    read_waiters[clientsocket.fileno()] = (recv_handler,
                                           (clientsocket.fileno(), ))

    # 次のクライアントからの接続を待ち受ける
    read_waiters[serversocket.fileno()] = (accept_handler, (serversocket, ))


def recv_handler(fileno):
    """クライアントソケットが読み取り可能になったとき呼ばれるハンドラ"""
    def terminate():
        """クライアントとの接続が切れたときの後始末"""
        # クライアント一覧から取り除く
        del connections[clientsocket.fileno()]
        # ソケットを閉じる
        clientsocket.close()
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))

    # クライアントとの接続情報を取り出す
    clientsocket, client_address, client_port = connections[fileno]

    try:
        # 準備ができているので、すぐに recv() できる
        message = clientsocket.recv(1024)
    except OSError:
        terminate()
        return

    if len(message) == 0:
        terminate()
        return

    print('Recv: {0} to {1}:{2}'.format(message,
                                        client_address,
                                        client_port))

    # 次はクライアントのソケットが書き込み可能になるまで待つ
    write_waiters[fileno] = (send_handler, (fileno, message))


def send_handler(fileno, message):
    """クライアントソケットが書き込み可能になったとき呼ばれるハンドラ"""
    # クライアントとの接続情報を取り出す
    clientsocket, client_address, client_port = connections[fileno]

    # 準備ができているので、すぐに send() できる
    sent_len = clientsocket.send(message)
    print('Send: {0} to {1}:{2}'.format(message[:sent_len],
                                        client_address,
                                        client_port))

    if sent_len == len(message):
        # 全て送ることができたら、次はまたソケットが読み取れるようになるのを待つ
        read_waiters[clientsocket.fileno()] = (recv_handler,
                                               (clientsocket.fileno(), ))
    else:
        # 送り残している内容があったら、再度ソケットが書き込み可能になるまで待つ
        write_waiters[fileno] = (send_handler,
                                 (fileno, message[sent_len:]))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # ソケットをノンブロックモードにする
    serversocket.setblocking(False)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    # クライアントからの接続がくるまで待つ
    read_waiters[serversocket.fileno()] = (accept_handler, (serversocket, ))

    while True:
        # ソケットが読み取り・書き込み可能になるまで待つ
        rlist, wlist, _ = select.select(read_waiters.keys(),
                                        write_waiters.keys(),
                                        [],
                                        60)

        # 読み取り可能になったソケット (のファイル番号) の一覧
        for r_fileno in rlist:
            # 読み取り可能になったときに呼んでほしいハンドラを取り出す
            handler, args = read_waiters.pop(r_fileno)
            # ハンドラを実行する
            handler(*args)

        # 書き込み可能になったソケット (のファイル番号の一覧)
        for w_fileno in wlist:
            # 書き込み可能になったときに呼んでほしいハンドラを取り出す
            handler, args = write_waiters.pop(w_fileno)
            # ハンドラを実行する
            handler(*args)


if __name__ == '__main__':
    main()

Python では select(2) システムコールの薄いラッパとして select モジュールが使える。 このモジュールが提供する select() 関数には、ファイルディスクリプタの入ったリストを渡す。

ファイルディスクリプタというのは、名前だけ聞くと難しそうだけどただの整数に過ぎない。 これは、各ソケットやファイルなどを使うときに OS が割り当てた一意な整数を指している。 ようするに 10 とか 20 とかいう数字が、何らかのソケットやファイルなどを表す。 ソケットに割り当てられたファイルディスクリプタは fileno() メソッドで得られる。

select() 関数には、読み込みや書き込みの準備ができたら通知してほしいファイルディスクリプタを渡す。 そして select() 関数を呼び出すと、そこでブロックした後に、準備ができたファイルディスクリプタが返される。 返ってきたファイルディスクリプタは、既に読み書きができるようになっているので指示を出しても例外にはならない。

先ほどのサンプルコードでは、そのようにして準備ができたものに対して読み書きをしている。 ビジーループと比べると CPU を使い切ることもなく、複数のクライアントを処理できる。 また、大きなポイントとしてはシングルスレッドにも関わらず、複数のクライアントを処理できているところだ。 これはソケットをブロッキングで使っていたときとの大きな違いだろう。

ちなみに、今回使った select(2) システムコールにはパフォーマンス上の問題が知られている。 そのため、実用的な用途で使われることはそこまで多くない。 代わりに、BSD 系なら kqueue(2)、Linux であれば epoll(2) が用いられる。 ただし、select(2) なら大抵のプラットフォームで使えるので、それらに比べると移植性が高いというメリットはある。

また「ソケットやファイルなど」と前述した通り、実はブロッキング・ノンブロッキングという概念はソケットに限った話ではない。 ファイルやデバイスについてもノンブロッキングで扱うことはできる。 そして、これはノンブロッキングなソケットプログラミングをする上で重要な意味を持ってくる。 詳細は後述するものの、これはノンブロッキングとブロッキングを同じスレッドで混ぜて使うと問題が発生する、というもの。

尚、前述した通り先ほどのサンプルコードはシングルプロセス・シングルスレッドで動作している。 そのため、複数の CPU コアを使い切ることはできない。 使い切れるようにするときは、マルチプロセスにする必要がある。 もちろん、これは GIL の制約のためにプロセスを複数立ち上げる必要があるに過ぎない。 別の処理系やプログラミング言語であれば、単にマルチスレッドにするだけで良い。 いずれの場合でも、それぞれのスレッドごとにイベントループを用意する。

ノンブロッキング I/O をラップした API やライブラリを使う

先ほどの例ではイベントループのシステムコールを使ってノンブロッキングなソケットを処理してみた。 とはいえ、実際にシステムコールを直接使ってソケットプログラミングする機会は、あまりないと思う。 なぜなら、先ほどのサンプルコードを見て分かる通り、それらの API はそのままでは扱いにくい上にコード量も増えてしまうため。

実際には、イベントループをラップしたライブラリを使ってプログラミングすることになると思う。 どんなライブラリがあるかはプログラミング言語ごとに異なる。 例えば C 言語なら libev が有名だと思うし Python なら Twisted などがある。 また、Python に関しては 3.4 から標準ライブラリに asyncio というモジュールが追加された。 次は、この asyncio を使ってみることにしよう。

Python の asyncio には色んなレイヤーの API が用意されている。 それこそ、先ほどのシステムコールを直接使うのと大差ないようなコードも書ける。 しかし、それだとライブラリを使う意味がないので、もうちょっと抽象度の高いものを使ってみた。 次のサンプルコードでは asyncio を使ってエコーサーバを実装している。 コードを見て分かる通り、先ほどと比べるとだいぶコード量が減って読みやすくなっている。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        """クライアントからの接続があったときに呼ばれるイベントハンドラ"""
        # 接続をインスタンス変数として保存する
        self.transport = transport

        # 接続元の情報を出力する
        client_address, client_port = self.transport.get_extra_info('peername')
        print('New client: {0}:{1}'.format(client_address, client_port))

    def data_received(self, data):
        """クライアントからデータを受信したときに呼ばれるイベントハンドラ"""
        # 受信した内容を出力する
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Recv: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

        # 受信したのと同じ内容を返信する
        self.transport.write(data)
        print('Send: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

    def connection_lost(self, exc):
        """クライアントとの接続が切れたときに呼ばれるイベントハンドラ"""
        # 接続が切れたら後始末をする
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))
        self.transport.close()


def main():
    host = 'localhost'
    port = 37564

    # イベントループを用意する
    ev_loop = asyncio.get_event_loop()

    # 指定したアドレスとポートでサーバを作る
    factory = ev_loop.create_server(EchoServer, host, port)
    # サーバを起動する
    server = ev_loop.run_until_complete(factory)

    try:
        # イベントループを起動する
        ev_loop.run_forever()
    finally:
        # 後始末
        server.close()
        ev_loop.run_until_complete(server.wait_closed())
        ev_loop.close()


if __name__ == '__main__':
    main()

注目すべきは、もはやソースコードの中に socket モジュールが登場していないところ。 それらは Protocol や Transport といった抽象的なオブジェクトに取って代わられている。

では、本当に内部でイベントループのシステムコールが使われているのかを調べてみよう。 まずは上記のサンプルコードを実行して、エコーサーバを起動する。

$ python asyncioserver.py

続いて、別のターミナルを開いたら上記エコーサーバが動いているプロセスの ID を調べる。

$ ps auxww | grep [a]syncioserver
amedama        31018   0.0  0.2  2430616  17344 s000  S+    7:58PM   0:00.16 python asyncioserver.py

そして、プロセスで発行されるシステムコールをトレースする dtruss コマンドを仕掛ける。

$ sudo dtruss -p 31018

準備ができたらクライアントを接続する。

$ nc localhost 37564

すると、次のように kevent(2) システムコールが発行されていることが分かる。 kevent(2) システムコールは kqueue(2) と共に用いるイベントループのためのシステムコール。

$ sudo dtruss -p 31018
SYSCALL(args)            = return
...
kevent(0x3, 0x0, 0x0)            = 0 0
getsockname(0xA, 0x7FFF50F55B00, 0x7FFF50F55AFC)                 = 0 0
setsockopt(0xA, 0x6, 0x1)                = 0 0
kevent(0x3, 0x0, 0x0)            = 0 0
write(0x1, "New client: 127.0.0.1:51822\n\0", 0x1C)              = 28 0
kevent(0x3, 0x10F8FB6F0, 0x1)            = 0 0
kevent(0x3, 0x0, 0x0)            = 0 0

どうやら、ちゃんと内部がノンブロッキングな世界になっていることが確認できた。 しかも、プラットフォームに応じたパフォーマンスに優れるイベントループをちゃんと使ってくれている。

ノンブロッキングとブロッキングは混ぜるな危険

ちなみに、ノンブロッキングなソケットプログラミングをする上では重要なポイントが一つある。 それは、ノンブロッキングなソケットを扱うスレッドで、ブロッキングな操作をしてはいけない、という点。 もちろん、前述した通りブロッキング・ノンブロッキングという概念はソケットに限った話ではない。 つまり、言い換えるとノンブロッキングな I/O とブロッキングな I/O は同じスレッドで混ぜてはいけない。

二つ前のセクションで登場した select システムコールを使ったサンプルコードを思い出してほしい。 あのサンプルコードでは、シングルスレッドで複数のクライアントをさばいていた。 では、もしその一つしかないスレッドが何処かでブロックしたら、何が起こるだろうか? これは、そのスレッドでさばいている全ての処理が、そこで停止してしまうことを意味する。 これは、ノンブロッキングな I/O を扱う上で登場する代表的な問題の一つ。

どのようなことが起こるかを実際に確かめてみよう。 次のサンプルコードでは、データを受信した際に time.sleep() 関数を使っている。 これには、実行したスレッドを指定した時間だけブロックさせる効果がある。 正に、ノンブロッキングなスレッドへのブロッキングな操作の混入といえる。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import asyncio
import time


class EchoServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport

        client_address, client_port = self.transport.get_extra_info('peername')
        print('New client: {0}:{1}'.format(client_address, client_port))

    def data_received(self, data):
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Recv: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

        # 何らかの処理で、イベントループのスレッドがブロックしてしまった!
        print('Go to sleep...')
        time.sleep(20)

        self.transport.write(data)
        print('Send: {0} to {1}:{2}'.format(data,
                                            client_address,
                                            client_port))

    def connection_lost(self, exc):
        client_address, client_port = self.transport.get_extra_info('peername')
        print('Bye-Bye: {0}:{1}'.format(client_address, client_port))
        self.transport.close()


def main():
    host = 'localhost'
    port = 37564

    ev_loop = asyncio.get_event_loop()

    factory = ev_loop.create_server(EchoServer, host, port)
    server = ev_loop.run_until_complete(factory)

    try:
        ev_loop.run_forever()
    finally:
        server.close()
        ev_loop.run_until_complete(server.wait_closed())
        ev_loop.close()


if __name__ == '__main__':
    main()

上記のサンプルコードを実行してエコーサーバを起動しよう。

$ python asyncblock.py

続いて別のターミナルから nc コマンドでサーバに接続したら適当な文字列を入力する。

$ nc localhost 37564
hogehoge

これでイベントループを回しているスレッドはブロックを起こした。

$ python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...

すかさず別のターミナルから nc でクライアントを追加してみよう。

$ nc localhost 37564

すると、今度はサーバに新しいクライアントが追加された旨は表示されない。 エコーサーバ全体の処理が、一箇所で停止してしまっているからだ。

$ python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...

もうしばらく待つと、スレッドのブロックが解除されて新しいクライアントの接続が受理される。

python asyncblock.py
New client: 127.0.0.1:51883
Recv: b'hogehoge\n' to 127.0.0.1:51883
Go to sleep...
Send: b'hogehoge\n' to 127.0.0.1:51883
New client: 127.0.0.1:51884

このように、ノンブロッキング I/O を扱うスレッドにブロッキング I/O のコードが混入すると、全てがそこで停止してしまう。

そして、真にこの問題が恐ろしいのは、混入に気づきにくい点かもしれない。 先ほどのサンプルコードは極端な例なので、使ってみるだけでも明確に変化を知覚できた。 しかしながら、実際にはブロッキング I/O の処理は人間にとって一瞬なので気づくことは難しいかもしれない。 にも関わらず、そのタイミングで一連の処理が全て停止していることに間違いはない。 結果として、パフォーマンスの低下をもたらす。

また、世の中のほとんどのライブラリはブロッキング I/O を使って実装されている。 例えば、外部の WebAPI を叩こうとそのまま requests でも使おうものなら、それだけでアウトだ。 それに、HTTP のような分かりやすい I/O 以外にもキューのような基本的な部品であっても操作をブロックしたりする。

つまり、新たに何かを使おうとしたら、それにブロックする操作が混入していないかをあらかじめ調べる必要がある。 さらに、ブロックする操作が含まれると分かったら、それをブロックしないようにする方法を模索しなきゃいけない。 以上のように、イベントループを中心に据えた非同期なフレームワークというのは、一般的な認識よりもずっと扱いが難しいと思う。

ブロッキング I/O が混入する問題へのアプローチについて

ノンブロッキング I/O を扱うスレッドにブロッキング I/O が混ざり込む問題に対するアプローチはいくつかある。 もちろん、混入しないように人間が頑張ってコードを見張る、というのは最も基本的なやり方の一つ。

それ以外には、プログラミング言語のレベルでブロッキング I/O を排除してしまうという選択肢もある。 これは例えば JavaScript (Node.js) が採用している。 Golang も、ネットワーク部分に関してはノンブロッキング I/O しか用意していないらしい。 初めからブロッキング I/O の操作が存在していないなら、そもそも混入することはない。

それ以外には、モンキーパッチを当てるというアプローチもある。 つまり、ブロッキング I/O を使うコードを、全てノンブロッキング I/O を使うように書き換えてしまう。 Python であれば、例えば EventletGevent といったサードパーティー製ライブラリがこれにあたる。

試しに Eventlet を使った例を見てみよう。 まずは Python のパッケージマネージャである pip を使って Eventlet をインストールしておく。

$ pip install eventlet

それでは Eventlet の魔法をお見せしよう。 次のサンプルコードは、最初に示したマルチスレッドの例に、たった二行だけコードを追加している。 その冒頭に追加した二行こそ、まさにモンキーパッチを当てるためのコードになっている。 たったこれだけで、ブロッキングだった世界がノンブロッキングな世界に書き換わってしまう。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 標準ライブラリにモンキーパッチを当てる
# ブロッキング I/O を使った操作が裏側で全てノンブロッキング I/O を使うように書き換えられる
import eventlet
eventlet.monkey_patch()

import socket
import threading


def client_handler(clientsocket, client_address, client_port):
    """クライアントとの接続を処理するハンドラ"""
    while True:
        try:
            message = clientsocket.recv(1024)
            print('Recv: {0} from {1}:{2}'.format(message,
                                                  client_address,
                                                  client_port))
        except OSError:
            break

        if len(message) == 0:
            break

        sent_message = message
        while True:
            sent_len = clientsocket.send(sent_message)
            if sent_len == len(sent_message):
                break
            sent_message = sent_message[sent_len:]
        print('Send: {0} to {1}:{2}'.format(message,
                                            client_address,
                                            client_port))

    clientsocket.close()
    print('Bye-Bye: {0}:{1}'.format(client_address, client_port))


def main():
    serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)

    host = 'localhost'
    port = 37564
    serversocket.bind((host, port))

    serversocket.listen(128)

    while True:
        clientsocket, (client_address, client_port) = serversocket.accept()
        print('New client: {0}:{1}'.format(client_address, client_port))

        client_thread = threading.Thread(target=client_handler,
                                         args=(clientsocket,
                                               client_address,
                                               client_port))
        client_thread.daemon = True
        client_thread.start()


if __name__ == '__main__':
    main()

本当にイベントループが使われているのか確かめてみることにしよう。 まずは、上記のサンプルコードを実行する。

$ python eventletserver.py

続いて別のターミナルを開いて、上記で実行しているプロセス ID を調べる。

$ ps auxww | grep [e]ventletserver
amedama         7796   0.0  0.1  2426888  19488 s000  S+    8:44PM   0:00.17 python eventletserver.p

dtruss コマンドでプロセス内で発行されるシステムコールをトレースする。

$ sudo dtruss -p 7796

クライアントからサーバに接続してみよう。

$ nc localhost 37564

すると、次のように dtruss の実行結果に kevent システムコールが登場している。 本当に、モンキーパッチを当てるだけでノンブロッキング I/O を使うようになった。

$ sudo dtruss -p 7796
SYSCALL(args)            = return
kevent(0x4, 0x101256710, 0x1)            = 0 0
accept(0x3, 0x7FFF5F8A7750, 0x7FFF5F8A774C)              = 7 0
ioctl(0x7, 0x20006601, 0x0)              = 0 0
ioctl(0x7, 0x8004667E, 0x7FFF5F8A7A04)           = 0 0
ioctl(0x7, 0x8004667E, 0x7FFF5F8A74E4)           = 0 0
write(0x1, "New client: 127.0.0.1:54132\n\0", 0x1C)              = 28 0
recvfrom(0x7, 0x7FE86782BC20, 0x400)             = -1 Err#35
kevent(0x4, 0x101256710, 0x1)            = 0 0
kevent(0x4, 0x7FE866D11020, 0x0)                 = 0 0
accept(0x3, 0x7FFF5F8A7750, 0x7FFF5F8A774C)              = -1 Err#35
kevent(0x4, 0x101256710, 0x1)            = 0 0

注目すべきは、逐次的なプログラミングモデルを保ったまま、それが実現できているところだろう。 asyncio の例でも、データの読み書きなどは逐次的に書くことができたものの、基本はイベントドリブンだった。 しかし Eventlet のコードでは、完全にブロッキング I/O を使っているときと同じように書くことができている。

これが一体どのようにして実現されているかというと、主にグリーンスレッドの寄与が大きい。 Eventlet では、カーネルで実装されたネイティブスレッドの代わりにユーザランドで実装されたグリーンスレッドを用いる。 グリーンスレッドには、実装によってコルーチン、軽量プロセス、協調スレッドなど色々な呼び方がある。

カーネルで実装されたネイティブスレッドとの大きな違いは、コンテキストスイッチのタイミングがプログラムで制御できるところにある。 ネイティブスレッドのコンテキストスイッチはカーネルのスケジューラ次第なので、基本的にプログラムからは制御できない。 それに対し、グリーンスレッドでは実行中のスレッドが自発的に処理を手放さない限りコンテキストスイッチが起こらない。 つまり、I/O などの外的な要因がない限りグリーンスレッドは決定論的に動作することを意味している。

Eventlet では、モンキーパッチを使うと既存のスレッドやソケットがインターフェースはそのままに書き換えられる。 そして、本来ならブロックするコードに処理が到達したタイミングでコンテキストスイッチが起こるように変化する。 コンテキストスイッチする先は、読み書きの準備が整った I/O を処理しているグリーンスレッドだ。 これは、先ほどシステムコールをトレースした通り、イベントループを使って判断している。 そして、コンテキストスイッチした元のグリーンスレッドは、イベントループを使って処理中の I/O が読み書きができるようになるまで待たされる。 ちなみに Golang はプログラミング言語のレベルで上記を実現していて、それは goroutine と呼ばれている。

このアーキテクチャでは、逐次的なプログラミングモデルを保ったままノンブロッキング I/O を使った恩恵が受けられる。 また、グリーンスレッドは一般的にネイティブスレッドよりもコンテキストに必要なメモリのサイズが小さい。 つまり、同時に多くのクライアントをさばきやすい。

ただし、Eventlet のようなモンキーパッチを使ったアプローチには抵抗がある人も多いかもしれない。 実際のところ Eventlet にはクセが全くないとは言えないし、よく分からずに使うのはやめた方が良いと思う。 ただし、名誉のために言っておくと Eventlet は OpenStack のような巨大なプロジェクトでも使われている実績のあるライブラリだ。

ちなみに、モンキーパッチでは一つだけブロッキング I/O の混入を防げないところがある。 それは Python/C API を使って書かれた拡張モジュールだ。 コンパイル済みの拡張モジュールに対しては、個別に対応しない限り自動でモンキーパッチが効くことはない。 これは、典型的には Python/C API で書かれたデータベースドライバで問題になることが多い。

まとめ

今回はソケットプログラミングにおいて、どういったアーキテクチャが考えられるかについて見てきた。 まず、ソケットは大きく分けてブロッキングで使うかノンブロッキングで使うかという選択肢がある。

ブロッキングは、逐次的なプログラミングモデルで扱いやすいことから理解もしやすい。 ただし、複数のクライアントをさばくにはマルチスレッドやマルチプロセスにする必要がある。 それらは必要なコンテキストのサイズやスイッチのコストも大きいことから、スケーラビリティの面で問題となりやすい。

それに対し、ノンブロッキングはイベントドリブンなプログラミングモデルとなりやすいことから理解が難しい。 しかしながら、イベントループを使うことでシングルスレッドでも複数のクライアントを効率的にさばける。

また、ブロッキング・ノンブロッキングというのはソケットに限った概念ではない。 ファイルなども同じようにノンブロッキングで扱うことができる。

ノンブロッキングで I/O を扱うときの注意点としては、同じスレッドをブロックさせてはいけない、というところ。 言い換えると、イベントループを回しているスレッドにブロッキングな I/O のコードを混入させてはいけない。 もし混入するとパフォーマンス低下をもたらす。

ブロッキング I/O が混入する問題に対するアプローチは、言語や処理系、ライブラリによっていくつかある。 例えば JavaScript (Node.js) では、プログラミング言語自体にブロッキング I/O を扱う API がない。 それ以外だと、スクリプト言語ならモンキーパッチで動的に実装を書き換えてしまうというものもある。

参考文献

詳解UNIXプログラミング 第3版

詳解UNIXプログラミング 第3版