パート18: Deferreds En Masse

はじめに

ひとつ前の章で、ジェネレータを使って一連の非同期なコールバックを構成する新しい方法を学びました。 遅延オブジェクトを使うことを含め、非同期操作を繋げる方法としてはふたつのテクニックを利用可能になりましたね。

けれどもときどき、非同期操作をまとめて並列に実行させたくなりますよね。 Twisted はシングルスレッドですので、本当に並行して動かすことはできませんが、大事な点は、できる限り素早くタスクグループを実行させるために非同期な入出力を使いたいことです。 たとえば詩のクライアントでは、同時に複数のサーバから詩をダウンロードします。サーバを順々に回ったりはしません。 結局のところ、これこそが詩を取得するために Twisted を使う全てなのです。

そして結果として、すべての詩のクライアントはこの問題を解かなくてはなりませんでした。

あなたが開始したすべての非同期操作が完了したときを、どうよって知るのでしょうか?

ここまでは、結果をリストに詰め込み (クライアント 7.0 にある results 一覧のように)、 その大きさを確認することで問題を解決してきました。 成功した結果と同様に、失敗した場合も収集しておくことに気を遣わなくてはなりません。 さもなくば、ひとつの失敗がプログラムを永久に実行させてしまうことに繋がります。すべきことが残っていると考えてしまうのです。

ご期待のように、Twisted はこの問題を解決するために利用できる抽象化を持ち合わせています。 今日はこれを見ていくことにしましょう。

DeferredList

DeferredList クラスは遅延オブジェクトのリストをひとつの遅延オブジェクトとして扱えるようにしてくれます。 これにより、たくさんの非同期操作を開始し、それらすべてが終了したときにだけ知らせてもらえます (それらが成功したか失敗したかに関わらず)。 いくつかの例を見ていきましょう。

deferred-list/deferred-list-1.py には次のコードがあります。

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Empty List.'
d = defer.DeferredList([])
print 'Adding Callback.'
d.addCallback(got_results)

実行させてみると、次の出力を得られます。

Empty List.
Adding Callback.
We got: []

いくつか注意しておきましょう。

  • DeferredList は Python の list から生成されます。ここではリストは空です。しかし、全ての要素が Deferred オブジェクトでなければならないことがすぐに分かります。

  • DeferredList はそれ自身が遅延オブジェクト (Deferred から継承されたもの) となります。 通常の遅延オブジェクトと同様に、コールバックとエラー用コールバックを追加できるのです。

  • 上述の例では、コールバックは追加されるとすぐに発火されました。 DeferredList も同じく即座に発火しました。 これについては後で議論していきます。

  • 遅延オブジェクトのリストの結果はそのままリスト (空) でした。

deferred-list/deferred-list-2.py を見てみましょう。

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'One Deferred.'
d1 = defer.Deferred()
d = defer.DeferredList([d1])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')

ひとつの遅延オブジェクトを含む、要素がひとつだけのリストから DeferredList を作りました。 次のような出力が見られますね。

One Deferred.
Adding Callback.
Firing d1.
We got: [(True, 'd1 result')]

さらに注意しておきましょう。

  • 今回は DeferredList はリスト内で遅延オブジェクトを発火させるまでコールバックを発火しませんでした。

  • 結果は依然としてリストですが、ひとつの要素があります。

  • この要素は、ふたつ目の値がリスト内の遅延オブジェクトの結果を表すタプルです。

リストにふたつの遅延オブジェクトを入れてみましょう。 (deferred-list/deferred-list-3.py)

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2.'
d2.callback('d2 result')

出力は次のようになります。

Two Deferreds.
Adding Callback.
Firing d1.
Firing d2.
We got: [(True, 'd1 result'), (True, 'd2 result')]

ここでは、少なくとも私たちが使ってきた方法では、 DeferredList の結果が、コンストラクタに渡した遅延オブジェクトのリストと同じ個数の要素を持つリストであることがとてもはっきりしています。 そして、結果のリストの要素は元の遅延オブジェクトの結果を含んでいます。少なくとも遅延オブジェクトが成功した場合は。 これは、 DeferredList 自体は元のリストにあった全ての遅延オブジェクトが発火し終わるまで発火しないことを意味します。 見方を変えると、空のリストから生成された DeferredList は、待つべき遅延オブジェクトがありませんので、即座に発火します。

最終的なリストにおける結果の順番はどうなるでしょうか? deferred-list/deferred-list-4.py について考えましょう。

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

print 'Two Deferreds.'
d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2])
print 'Adding Callback.'
d.addCallback(got_results)
print 'Firing d2.'
d2.callback('d2 result')
print 'Firing d1.'
d1.callback('d1 result')

d2 を最初に発火させ、それから d1 を発火させました。 それでも、 d1d2 が元の順番を保って遅延オブジェクトのリストが構成されています。 出力は次の通りです。

Two Deferreds.
Adding Callback.
Firing d2.
Firing d1.
We got: [(True, 'd1 result'), (True, 'd2 result')]

出力のリストは元々の遅延オブジェクトのリストと同じ順番で結果を持っています。それぞれの遅延オブジェクトが発火された順番ではありません。 これはとても素晴らしいことです。 生成された操作とそれぞれの結果を簡単に関連付けることができますからね (たとえば、どの詩がどのサーバから届いたか、など)。

よし、それでは、リスト内のひとつ以上の遅延オブジェクトが失敗したら何が起きるでしょうか? また、 True である値はどうなるでしょうか? deferred-list/deferred-list-5.py の例で試してみましょう。

from twisted.internet import defer

def got_results(res):
    print 'We got:', res

d1 = defer.Deferred()
d2 = defer.Deferred()
d = defer.DeferredList([d1, d2], consumeErrors=True)
d.addCallback(got_results)
print 'Firing d1.'
d1.callback('d1 result')
print 'Firing d2 with errback.'
d2.errback(Exception('d2 failure'))

d1 は通常の結果になるように発火させ、 d2 はエラーにしています。 ここでは consumerErrors オプションは無視しておいてください。後で見ていきます。 出力は次のようになります。

Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, <twisted.python.failure.Failure <type 'exceptions.Exception'>>)]

d2 に対応するタプルの値は、ふたつ目のスロットは Failure で、ひとつ目のスロットは False です。 この時点では、 DeferredList がどのように動作するかは非常に明快でしょう (とはいえ、以下の「議論」にも目を通してくださいね)。

  • DeferredList は遅延オブジェクトのリストから構築されます。

  • DeferredList は、その結果が与えられたリストと同じ長さのリストである、それ自身が遅延オブジェクトでもあります。

  • DeferredList は元のリストに含まれる全ての遅延オブジェクトが発火した後で、発火します。

  • 結果のリストの個別の要素は、元のリストと同じ位置の遅延オブジェクトに対応します。 遅延オブジェクトが成功した場合の要素は (True, result) であり、失敗した場合は (False, failure) になります。

  • DeferredList は決して失敗しません。何があろうとも個々の遅延オブジェクトの結果はリストに収集されるからです (しかし繰り返しになりますが、以下の「議論」にも目を通してくださいね)。

DeferredList に渡した consumeErrors オプションについて考えましょう。 オプションを渡さないで同じコードを実行させると (deferred-list/deferred-list-6.py)、以下の出力を得られます。

Firing d1.
Firing d2 with errback.
We got: [(True, 'd1 result'), (False, <twisted.python.failure.Failure <type 'exceptions.Exception'>>)]
Unhandled error in Deferred:
Traceback (most recent call last):
Failure: exceptions.Exception: d2 failure

もし思い出せるなら、”Unhandled error in Deferred” というメッセージは、遅延オブジェクトがゴミ回収され (原文: garbage collected) 遅延オブジェクト内の最後のコールバックが失敗したときに生成されます。 このメッセージは、プログラムにある全ての潜在的な非同期の失敗を捕まえていなかった、ということを教えてくれます。 ではここでの例だと、その失敗はどこからやってきたのでしょうか? DeferredList からでないことは明らかです。成功しますから。 ということで、 d2 からに違いありません。

DeferredList は監視しているそれぞれの遅延オブジェクトがいつ発火するかを知る必要があります。 DeferredList は他と変わらない方法でこれを実行します。それぞれの遅延オブジェクトにコールバックとエラー用コールバックを付与します。 デフォルトでは、コールバック (エラー用コールバックも) は最終的なリストに結果を詰め込んだ後で、元の結果 (もしくは失敗) を返します。 エラー用コールバックから元の失敗を返すと次のエラー用コールバックを発動させることになりますので、 d2 は発火後にも失敗した状態のままなのです。

しかし DeferredListconsumeErrors=True を渡すと、個々の遅延オブジェクトに対して DeferredList が追加したエラー用コールバックは、 失敗の代わりに None を返します。 したがって、エラーを消費し (“consuming”)、警告メッセージを取り除きます。 また、 deferred-list/deferred-list-7.py にあるように、 d2 に独自のエラー用コールバックを付け足すことでエラーを処理することもできます。

クライアント 8.0

バージョン 8.0 の Get Poetry Now! クライアントは、全ての詩が完了 (あるいは失敗) したときを判別するために DeferredList を使います。 新しいクライアントは twisted-client-8/get-poetry.py にあります。 繰り返しになりますが、変更箇所は poetry_main にしかありません。 重要な変更部分を見ていきましょう。

...
ds = []

for (host, port) in addresses:
    d = get_transformed_poem(host, port)
    d.addCallbacks(got_poem)
    ds.append(d)

dlist = defer.DeferredList(ds, consumeErrors=True)
dlist.addCallback(lambda res : reactor.stop())

client 7.0 の同じセクションと比較したくなるかもしれませんね。

クライアント 8.0 では poem_done コールバックも results リストも必要ありません。 その代わりに get_transformed_poem から受け取ったそれぞれの遅延オブジェクトをリスト (ds) に入れて、 DeferredList を生成します。 DeferredList は全ての詩に対する処理が完了するまで発火しませんので、reactor を停止させるために DeferredList にコールバックを追加します。 今回は DeferredList の結果を使っていません。 すべてが完了したことを知りたいだけなのです。 これだけです!

議論

図37に、 DeferredList がどのように動作するかを示します。

_images/p18_deferred-list.png

図37:DeferredList の結果

非常にシンプルですね。 DeferredList にはここで触れていないオプションがいくつかあり、それによって上述の振る舞いを変えることができます。 これは文末の「おすすめの練習問題」であなたが追求していってください。

次のパート (“パート19: 欲しいと思っていても考えを改めると”) では Deferred クラスのもうひとつの機能をおさえておきましょう。 これは Twisted 10.1.0、つまり比較的最近になって導入された機能です。

おすすめの練習問題

  • DeferredListソースコード を読んでみてください。

  • コンストラクタの fireOnOneCallbackfireOnOneErrback オプション引数を試してみられるように deferred-list の例を修正してください。 どちらか片方 (もしくは両方) を使うようなシナリオに合わせてみましょう。

  • DeferredList のリストを使って DeferredList を作成できますか? 可能な場合は、その結果はどのように見えるでしょうか?

  • 全ての詩をダウンロードし終えるまで何も出力しないようにクライアント 8.0 を修正してください。 今回は DeferredList からの結果を使うことになるでしょう。

  • DeferredDict の意味を定義して、それを実装してください。