パート13: Deferred と行こう¶
はじめに¶
“パート10: 変換された詩”で実装した詩のクライアント 5.1 を思い出してください。
あのクライアントでは、詩の変換エンジンの呼び出しを含むコールバックチェーン (図27:ふたつのプロトコルを持つ変換サーバ) を管理するために Deferred
を使いました。
client 5.1 では、エンジンはクライアント自身のコード内に同期関数として実装されていました。
“パート12: 詩の変換サーバー”で書いたようなネットワーク越しの詩の変換サービスを使う新しいクライアントを実装したいと思います。
しかし、ちょっと待ってください。
変換サービスにはネットワーク越しにアクセスするため、非同期入出力を使う必要があるでしょう。
そして、変換リクエスト用の API は非同期になるでしょう。
言い換えると、新しいクライアントでは try_to_cummingsify
コールバックは Deferred
を返すようになります。
遅延オブジェクトのチェーンの途中にあるコールバックが他の遅延オブジェクトを返したらどうなるでしょうか? 一番最初の遅延オブジェクトを「アウター」、二番目のものを「インナー」と呼ぶことにします。 アウターにおける N 番目のコールバックがインナーを返すとします。 コールバックは「自分は非同期だ。結果はまだない。」と主張します。 チェーンの過程では、アウターは次のコールバックもしくはエラー用コールバックに結果を渡して呼び出す必要がありますので、アウターはインナーが発火されるまで待たなくてはなりません。 もちろん、アウターが何かをブロックしてはなりませんので、その代わりに、コールバックチェーンの実行を一時停止して、reactor (またはアウターを発火させた何か) に制御を戻します。
このとき、アウターが再開するタイミングをどのようにして知るのでしょうか? 簡単です。コールバックとエラー用コールバックのペアをインナーに追加するのです。 インナーが発火されるとき、アウターは自身のチェーンを一時停止させます。 もしもインナーが成功したら (つまり、アウターが付け足したコールバックを呼び出したら)、アウターは N+1 のコールバックにその結果を渡します。 逆にインナーが失敗したら (アウターが付け足したエラー用コールバックを呼び出したら)、failure (失敗を表すオブジェクト) をエラー用コールバックに渡します。
なかなか複雑ですね。図28で考えを図にしてみましょう。
この図では、アウターは 4 層のコールバックとエラー用コールバックのペアを持ちます。 アウターが発火すると、最初のコールバックは遅延オブジェクト (インナー) を返します。 この時点で、アウターはチェーンの発火を停止し、制御を reactor に戻すでしょう (コールバックとエラー用コールバックのペアをインナーに追加した後に)。 しばらくするとインナーが発火し、アウターは自身のコールバックチェーンの処理を再開します。 アウターはインナー自体を発火させていないことに注意してください。 アウターはインナーの結果がいつになったら有効になるか、もしくはどのような結果になるかを知りえないので、そのようなことは不可能でしょう。 むしろ、アウターはインナーが発火されるのを単純に (非同期に) 待ちます。
図28において、インナーへ接続されているコールバックの線が、緑や赤ではなく、黒であることをよく見てください。 コールバックが成功したか失敗したかが、インナーが発火されるまで分からないからです。 アウターは次のコールバックを呼び出すかエラー用コールバックを呼び出すかを、自分自身のチェーンにおいてしか決められません。
図29では、図28と同じアウターとインナーの遅延オブジェクトが発火していく流れを reactor の観点から図示します。
これはおそらく Deferred
クラスの最も複雑な機能ですので、身につけるまでに時間がかかっても気にしないでください。
twisted-deferred/defer-10.py
にあるコード例を使ったもうひとつの方法で説明しましょう。
この例はふたつのアウターを生成します。ひとつは普通のコールバックを持ち、もうひとつはインナーを返すコールバックを持ちます。
コードと出力をじっくり見ると、インナーが返されると二つ目のアウターがチェーンを止めて、インナーが発火されると再び開始されることが分かるでしょう。
クライアント 6.0¶
入れ子になった遅延オブジェクトという新しい知識を使って、パート12でやったようなネットワーク越しの変換サービスを使う詩のクライアントを実装し直してみましょう。 twisted-client-6/get-poetry.py にコードがあります。 詩のプロトコルとファクトリは前回のバージョンと変わりません。 しかし、今は変換リクエストを生成するためのプロトコルとファクトリがあります。 変換クライアントの プロトコル は次のようになります。
class TransformClientProtocol(NetstringReceiver):
def connectionMade(self):
self.sendRequest(self.factory.xform_name, self.factory.poem)
def sendRequest(self, xform_name, poem):
self.sendString(xform_name + '.' + poem)
def stringReceived(self, s):
self.transport.loseConnection()
self.poemReceived(s)
def poemReceived(self, poem):
self.factory.handlePoem(poem)
基底クラスとして NetstringReceiver を使うと、この実装は非常に簡単になります。 接続が確立されるとすぐにサーバに変換リクエストを送信し、ファクトリから変換名と詩を取得します。 詩を受け取ると、処理するためにファクトリへ渡します。 ファクトリ のコードは次のようになります。
class TransformClientFactory(ClientFactory):
protocol = TransformClientProtocol
def __init__(self, xform_name, poem):
self.xform_name = xform_name
self.poem = poem
self.deferred = defer.Deferred()
def handlePoem(self, poem):
d, self.deferred = self.deferred, None
d.callback(poem)
def clientConnectionLost(self, _, reason):
if self.deferred is not None:
d, self.deferred = self.deferred, None
d.errback(reason)
clientConnectionFailed = clientConnectionLost
このファクトリはクライアント向けに設計されていて、プロトコルが使う変換名と詩を保存することによって、単一の変換リクエストを処理します。
ファクトリは変換リクエストの結果を表す遅延オブジェクトを生成します。
ファクトリがふたつのエラーの場合を扱う方法に注意してください。
接続に失敗した場合と、詩を受け取る前に接続が閉じられてしまった場合です。
詩を受け取っても clientConnectionLost
メソッドは呼び出されます。
しかし、この場合は handlePoem
メソッドのおかげで self.deferred
は None
になります。
ファクトリクラスは自分自身も発火させる遅延オブジェクトを生成します。 これは Twisted プログラミングにおいて守るべき良い決まりごとです。注目しておきましょう。
一般に、遅延オブジェクトを生成するオブジェクトは、その遅延オブジェクトを発火させることに責任を負うべきです。
「あなたが作ったらあなたが発火させてください」という決まりごとは、ある遅延オブジェクトが一回しか発火されないことを保証してくれて、Twisted プログラムの制御フローを簡単にしてくれます。
変換ファクトリに加えて、特定の変換サーバへの TCP 接続を隠蔽してくれる Proxy クラスもあります。
class TransformProxy(object):
"""
I proxy requests to a transformation service.
"""
def __init__(self, host, port):
self.host = host
self.port = port
def xform(self, xform_name, poem):
factory = TransformClientFactory(xform_name, poem)
from twisted.internet import reactor
reactor.connectTCP(self.host, self.port, factory)
return factory.deferred
このクラスは、他のコードでも変換リクエストに使えるような、単一の xform()
インターフェイスを表します。
このため、ホスト名とポート番号に煩わされることなく変換の要求を出して、遅延オブジェクトを受け取ることができます。
try_to_cummingsify コールバックを除いて、プログラムの残りの部分は変わりません。
def try_to_cummingsify(poem):
d = proxy.xform('cummingsify', poem)
def fail(err):
print >>sys.stderr, 'Cummingsify failed!'
return poem
return d.addErrback(fail)
このコールバックは遅延オブジェクトを返します。しかし、Proxy インスタンスを生成することを除けば main
関数の残りの部分を変更する必要はありません。
try_to_cummingsify
は遅延オブジェクトのチェーン (get_poetry
で返される遅延オブジェクトです) の一部ですので、
すでに非同期に使われていて、何も変更する必要がありません。
d.addErrback(fail)
の結果を返していることに気をつけてください。
これはちょっとした構文糖衣です。
addCallback
と addErrback
メソッドは元の遅延オブジェクトを返します。
次のように記述した方が良さそうですね。
d.addErrback(fail)
return d
最初のバージョンと同じことですが、ちょっと短くなりましたね。
クライアントをテストしてみる¶
新しいクライアントは以前のものたちとはやや異なる構文を持ちます。 10001番ポートで変換サービスを起動していて、ふたつの詩のサーバが10002番ポートと10003番ポートで動いているならば、次のように実行できます。
python twisted-client-6/get-poetry.py 10001 10002 10003
ふたつの詩をダウンロードして両方ともを変換するためには、変換サーバを次のように起動しましょう。
python twisted-server-1/tranformedpoetry.py --port 10001
詩のサーバはこんな感じです。
python twisted-server-1/fastpoetry.py --port 10002 poetry/fascination.txt
python twisted-server-1/fastpoetry.py --port 10003 poetry/science.txt
これで詩のクライアントを上記のように実行できます。 変換サーバをクラッシュさせてみて、同じコマンドでクライアントを再度実行してみましょう。
まとめましょう¶
このパートでは、遅延オブジェクトがコールバックチェーンの途中で透過的に他の遅延オブジェクトを扱う方法を学びました。 このため、私たちは詳しいことを気にせず安全に非同期のコールバックを「アウター」に付け足すことができます。 私たちの関数の多くが非同期になりますので、とてもお手軽です。
これで遅延オブジェクトに関しては全てを理解したのでしょうか? とんでもない! 話しておくべき重要な機能がもうひとつあります。とはいえ、これは”パート14: Deferred が無かったら”にとっておきましょう。
おすすめの練習問題¶
名前によって特定の種類の変換をお願いできるようにクライアントを修正しましょう。
変換サーバのアドレスがオプション引数になるようにクライアントを修正しましょう。引数がなければ変換ステップを飛ばします。
PoetryClientFactory が現在は「自分で作ったら自分で開始させて」という遅延オブジェクトの決まりごとを破っています。これを改めるように
get_poetry
とPoetryClientFactory
をリファクタリングしてください。実際にお見せしていませんが、エラー用コールバックが遅延オブジェクトを返すと対称的になります。 このことを確認するために twisted-deferred/defer-10.py の例を修正してみてください。
Deferred の実装において、コールバックもしくはエラー用コールバックが自分とは異なる遅延オブジェクトを返す場合を扱っている箇所を見つけてください。