パート14: Deferred が無かったら

はじめに

このパートでは Deferred クラスの違う一面を学んでいきましょう。 議論を盛り上げるため、詩に関連したサービスにサーバをもうひとつ追加します。 外部の同じサーバから詩を取得したがっている内部的なクライアントがたくさんあると仮定してください。 しかし、この外部のサーバは遅く、インターネット越しの膨大な要求によってすでに過負荷になっています。 クライアントをさらに増やして可愛そうなサーバの問題を増長させたくはありません。

その代わりにキャッシュプロキシサーバを作ってみましょう。 クライアントがプロキシに接続すると、プロキシは外部のサーバから詩を取得するか以前に取得したキャッシュコピーを返します。 すべてのクライアントをプロキシに向けると外部サーバへの負荷を抑えられます。 図30にこの設定を示します。

_images/p14_proxy1.png

図30:キャッシュするプロキシサーバ

クライアントが詩を取得するためにプロキシに接続すると何が起こるかを考えてみてください。 キャッシュが空の場合にはプロキシは (非同期に) 外部サーバからの応答を待たなくてはなりません。詩を送り返すのはその後です。 幸いにして、私たちは遅延オブジェクトを返す非同期関数を使ってこの状況を処理する方法を知っています。 とはいえ、キャッシュに詩がすでに存在していると、プロキシは即座にそれを送り返せます。待つ必要がありません。 このため、プロキシが詩を取得する内部機構は時として非同期であり、同期にもなりえるのです。

それでは、非同期でしかない関数しかなかったらどうすれば良いのでしょうか? Twisted はふたつのオプションを提供しており、どちらも Deferred クラスでこれまで使ってこなかった機能に依存しています。 呼び出し側に戻す「前」に遅延オブジェクトを発火させてしまうことです。

遅延オブジェクトを二回は発火できませんが、発火された後でもコールバックとエラー用コールバックを追加できますので、これは機能します。 こうすると、単純に遅延オブジェクトはコールバックチェーンを続けていきます。 ひとつ大事なこととしては、発火済みの遅延オブジェクトは新しいコールバック (もしくはエラー用コールバックであり、遅延オブジェクトの状態に依ります) をすぐさま発火させるかもしれない、ということです。 つまり、コールバックを追加した瞬間にです。

発火済みの遅延オブジェクトを表す図31について考えてみましょう。

_images/p14_deferred-13.png

図31:発火された遅延オブジェクト

この時点でもうひとつのコールバックとエラー用コールバックのペアを付け足していたとすれば、遅延オブジェクトはすぐさまコールバックを発火させるでしょう。この様子を図32に示します。

_images/p14_deferred-14.png

図32:新しいコールバックを持つ同じ遅延オブジェクト

ひとつ前のコールバックは成功しますので、このコールバック (エラー用コールバックではありません) は発火されます。 もしも失敗したら (Exception を送り出すか Failure を返した場合) 、新しいエラー用コールバックが呼び出されるでしょう。

twisted-deferred/defer-11.py にある例でこの新機能を確認できます。 スクリプトを読んで実行してみてください。遅延オブジェクトを発火させてからコールバックを付け足したときに、それがどのように振舞うかを理解できるでしょう。 最初の例では、新しいコールバックそれぞれが即座に呼び出されていた方法に注意しましょう。(print が出力する順番で分かります)

スクリプトのふたつ目の例は、コールバックをすぐには発火しないことで、遅延オブジェクトを pause() できる方法を教えてくれます。 コールバックを発火する準備が整ったら unpause() を呼び出します。 これこそが Deferred のコールバックが自分ではない遅延オブジェクトを返したときに、自分自身を一時停止させる方法なのです。 素晴らしいですね!

プロキシ 1.0

それでは twisted-server-1/poetry-proxy.py にある詩のプロキシの最初のバージョンを見てみましょう。 プロキシはクライアントとしてもサーバとしても動作しますので、プロトコルクラスとファクトリクラスのふたつのペアを持ちます。 ひとつは詩を提供するためであり、もうひとつは外部サーバから詩を取得するためです。 クライアントのペアのためのコードを見ておく必要はないでしょう。ひとつ前の詩のクライアントと一緒だからです。

とはいえサーバのペアを見る前に、 ProxyService に目を通しておきましょう。 これは詩を取得するためのサーバ側のプロトコルです。

class ProxyService(object):

    poem = None # the cached poem

    def __init__(self, host, port):
        self.host = host
        self.port = port

    def get_poem(self):
        if self.poem is not None:
            print 'Using cached poem.'
            return self.poem

        print 'Fetching poem from server.'
        factory = PoetryClientFactory()
        factory.deferred.addCallback(self.set_poem)
        from twisted.internet import reactor
        reactor.connectTCP(self.host, self.port, factory)
        return factory.deferred

    def set_poem(self, poem):
        self.poem = poem
        return poem

ここで大事なメソッドは get_poem です。 すでにキャッシュに詩があれば、その詩を返すだけです。 キャッシュになければ外部サーバへの接続を初期化し、詩が返されたら発火する遅延オブジェクトを返します。 このため get_poem はたまにしか非同期でない関数といえます。

このような関数をどうやって扱いましょうか? サーバサイドにある protocol/factory のペアを見てみましょう。

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        d = maybeDeferred(self.factory.service.get_poem)
        d.addCallback(self.transport.write)
        d.addBoth(lambda r: self.transport.loseConnection())

class PoetryProxyFactory(ServerFactory):

    protocol = PoetryProxyProtocol

    def __init__(self, service):
        self.service = service

ファクトリはそのままですね。 プロトコルインスタンスが get_poem メソッドを呼び出せるようにプロキシサービスの参照を保存しておくだけです。 プロトコルが肝心の部分です。 プロトコルは get_poem を直接呼び出す代わりに、 twisted.internet.defer モジュールにある maybeDeferred という名前のラッパー関数を使います。

maybeDeferred 関数は、異なる関数への参照とそれに渡すオプション引数 (この例では使いません) を受け取ります。 maybeDeferred は実際にその関数を呼び出し、次のいずれかの振る舞いをします。

  • 関数が遅延オブジェクトを返したら、 maybeDeferred はその遅延オブジェクトを返します。

  • 関数が失敗を返したら、 maybeDeferred はその失敗で発火された (errback で) 新しい遅延オブジェクトを返します。

  • 関数が通常の値を返したら、 maybeDeferred はその値で発火された遅延オブジェクトを返します。

  • 関数が例外を送出したら、 maybeDeferred は例外をラップした Failure で発火された (errback で) 遅延オブジェクトを返します。

言い換えると、あなたが渡した関数が決して遅延オブジェクトを返さないものであったとしても、 maybeDeferred の戻り値は遅延オブジェクトであることが保証されています。 これによって安全に (例外によって失敗したものでも) 同期関数を呼び出すことができ、遅延オブジェクトを返す非同期関数のように扱うことができます。

  • ノート 1:

    それでも、わすかな違いがあるでしょう。同期関数が返す遅延オブジェクトは発火済みですので、いかなるコールバックもエラー用コールバックもすぐに実行されます。reactor ループの将来的な繰り返しには入りません。

  • ノート 2:

    あと知恵になりますが、いつも遅延オブジェクトを返す関数を “maybeDeferred” と名付けたことは最善ではありません。しかし、このまま進みます。

プロトコルが遅延オブジェクトを使える状態になると、詩をクライアントに送って接続を閉じるようなコールバックをいくつか追加できます。 私たちの詩の最初のプロキシにぴったりですね!

プロキシを実行する

プロキシを試してみるには、次のようにして詩のサーバを起動しておきます。

python twisted-server-1/fastpoetry.py --port 10001 poetry/fascination.txt

そしてプロキシサーバを起動させます。

python twisted-server-1/poetry-proxy.py --port 10000 10001

これで10001番ポートの詩のサーバを10000番ポートでプロキシしてくれるようになります。 ということで、クライアントにはプロキシを指定します。

python twisted-client-4/get-poetry.py 10000

詩の変換に関知しないクライアントも使えます。 クライアントのウィンドウには詩が表示され、プロキシのウィンドウにはサーバから詩を取得していることを表すテキストを確認できるでしょう。 もう一度クライアントを動かせてみると、プロキシはキャッシュにある詩を使っていることを確認できます。もちろん、クライアントは前回と同じ詩を表示します。

プロキシ 2.0

以前も言及したように、同じスキームを実現するもうひとつの方法があります。 これは Poetry Proxy 2.0 として twisted-server-2/poetry-proxy.py にあります。 遅延オブジェクトを戻り値として返す前に発火させることができますので、詩がキャッシュに見つかった場合は、プロキシサービスに発火済みの遅延オブジェクトを返させるのです。 プロキシサービスの get_poem メソッドの新バージョンは次のようになります。

def get_poem(self):
    if self.poem is not None:
        print 'Using cached poem.'
        # return an already-fired deferred
        return succeed(self.poem)

    print 'Fetching poem from server.'
    factory = PoetryClientFactory()
    factory.deferred.addCallback(self.set_poem)
    from twisted.internet import reactor
    reactor.connectTCP(self.host, self.port, factory)
    return factory.deferred

defer.succeed 関数は、 与えられた結果から発火済み遅延オブジェクトを生成する簡単な方法です。 関数の実装を読んでみると、新しい遅延オブジェクトを生成してそれに .callback を実行しているだけであることが分かるでしょう。 なお、失敗として発火済みの遅延オブジェクトが欲しい場合には defer.fail を使えます。

このバージョンでは get_poem はいつも遅延オブジェクトを返しますので、もはや プロトコルクラスmaybeDeferred を使う必要がありません。 (先ほど学んだように、それを使い続けても動きます)

class PoetryProxyProtocol(Protocol):

    def connectionMade(self):
        d = self.factory.service.get_poem()
        d.addCallback(self.transport.write)
        d.addBoth(lambda r: self.transport.loseConnection())

ふたつの変更を除けば、プロキシのセカンドバージョンはファーストにそっくりです。 そして、元のバージョンと同じ方法で実行できます。

まとめ

このパートでは遅延オブジェクトが返される前にどのようにして発火されることがあるかを学びました。 これによって、同期 (もしくはたまに同期) コードで使うことができます。 実現方法はふたつあります。

  • 遅延オブジェクトを返したり普通の値を返したりする (もしくは例外を投げる) 関数を扱うために maybeDeferred を使います。

  • defer.succeeddefer.fail を使って自分自身の遅延オブジェクトを予め発火させることができます。これにより、半同期関数は値に関係なくいつも遅延オブジェクトを返します。

どちらを選択するかは我々次第です。 ひとつ目の方法は私たちの関数がいつも非同期であるとは限らないことを強調し、もうひとつの方法はクライアントのコードを簡潔にします。 おそらく、どちらかを選ぶのに決定的となる主張もないでしょう。

発火後でもコールバックとエラー用コールバックを遅延オブジェクトに追加できますので、両方のテクニックを使えます。 これは”パート9: Deferred 再入門”と twisted-deferred/defer-unhandled.py の例で発見した興味深いことを説明してくれます。 遅延オブジェクトにおける「処理されない例外」 (最後のコールバックかエラー用コールバックのどちらかが失敗したときに発生します) はガベージコレクトされるまでレポートされない、ということを学びました (つまり、ユーザーコードからは参照されていません) 。 しかし今ではその理由が分かります。 そのようなエラーを処理するコールバックのペアを遅延オブジェクトにいつでも追加できますので、 Twisted が「そのエラーは処理されなかった」と言うまで、遅延オブジェクトに対する最後の参照が無くなってしまうことはありません。

Deferred クラス (twisted.internet パッケージにあります) の探求に多くの時間を使ってきましたが、実はインターネットとは関係ないことに気付いたかもしれません。 コールバックを管理するための抽象化なのです。そこでは何が行われているのでしょうか? それこそ Twisted が形作られてきた歴史なのです。 考えられる最善の言葉 (World Ultimate Frisbee League でプレイするために何百万ドルも払わされるような場合ですが) を挙げるなら、 defer モジュールは twisted.python にあるべきでしょう。 もちろん、そのような世界では超越した力が働く犯罪と戦うのに忙しすぎて、この入門文書を読めないかもしれません。 まぁ、それも人生でしょう。 (that’s life)

さあ、これで遅延オブジェクトについては万全ですか? ようやく全ての機能を理解できたのでしょうか? 少し前まではこれが基本的なケースでした。 しかし、Twisted の開発者たちは新しい機能を追加してきました。 Twisted 10 のリリースから、 Deferred クラスは全く新しいことができます。 これは後々のパートで紹介するとして、遅延オブジェクトからちょっと離れて、”パート15: テストされた詩”ではテスト方法も含めた Twisted の他の側面にも目を向けてみましょう。

おすすめの練習問題

  1. .errback() を使って予め失敗した遅延オブジェクトを表現するように twisted-deferred/defer-11.py の例を修正してみましょう。 defer.fail のドキュメントと実装を読んでみてください。

  2. 二時間以上前にキャッシュされた詩は破棄するように、プロキシを修正してみましょう。その詩への次のリクエストはサーバからの再リクエストになります。

  3. プロキシはサーバと二回以上やり取りするのを避けるようにすべきです。 しかし、キャッシュに詩が存在しないときに複数のクライアントが同時にリクエストを送ると、プロキシは複数のリクエストを生成することになるでしょう。 スローサーバを使えば簡単に確認できます。

    リクエストをひとつしか生成しないようにプロキシサービスを修正しましょう。 今のところ、サービスはふたつの状態しか持ちません。 詩がキャッシュにあるかないかだけです。 リクエストを生成したものの完了していないことを表す、三つ目の状態を認識させる必要があるでしょう。 三つ目の状態で get_poem メソッドが呼ばれたら、 “waiters” リストに遅延オブジェクトを追加します。 新しい遅延オブジェクトは get_poem メソッドの結果になるでしょう。 最終的に詩が返されると、待っていた全ての遅延オブジェクトにその詩を渡して発火させて、キャッシュした状態に遷移します。 逆に詩の取得に失敗した場合は、待っていた全ての遅延オブジェクトの .errback() メソッドを発火させ、キャッシュが存在しない状態に遷移します。

  4. プロキシサービスに変換プロキシを追加してみましょう。 このサービスは元の変換サービスのように動作すべきです。 しかし、変換処理自体には外部サーバを使います。