DataflowJavaSDK Weekly

DataflowJavaSDK の更新を解説します。このブログは Google 社とは一切関係ありません。

DoFn と Windowing, Trigger API, Orderness, Progress Estimation と Source の並列化, Custom Output, Streaming Side Input

先週はコミットが全く push されていなかったのですが、その間たまっていたのが一気に push されたようで、最後にチェックしたところから 113 個もコミットが追加されていました。 さすがに読み切れないので今回はコミットメッセージを斜め読みして気になるコミットをまとめるだけにします。それでもかなり盛りだくさんになりました。

ものすごくおおざっぱにまとめると

  • 主に streaming mode のための Windowing や Trigger API といった機構の整備
  • Custom Output のサポート

が最近の大きなトピックであったかなという感じです。

DoFn と Windowing

この期間中 bjchambers というアカウントの Googler が精力的に活動していて、主に Windowing に関連した変更やリファクタリングをしています。

Change DoFn.ProcessContext#windows to #window. · GoogleCloudPlatform/DataflowJavaSDK@3c56d2f · GitHub で DoFn.ProcessContext#windows() メソッドが #window() と名称が変わっています。それだけでなく DoFn.RequiresWindowAccess という空の Interface が追加されていて、DoFn の継承クラスで window() メソッドを呼ぶものに目印として implements すべしというものみたいです。これは Windowing を意識した DoFn をサービスの runtime 側で特別扱いするため、もしくはこれがない時は Windowsing がないものとして最適化したスケジューリングをするなどの用途で使われるのではないかなと予想しています。

また Introduce an interface for the DoFns that implement windowing to use. · GoogleCloudPlatform/DataflowJavaSDK@3c4101b · GitHub で DoFn#windowingInternals() という抽象メソッドが追加され、WindowingInternals<I, O> という interface を返すようにしています。これはデータを timestamp を元に Windows に分配する Window を自前で実装することを可能にしているようです。

Trigger API

bjchambers は他にも https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/6bf7e28547a225accf3bc29ceb06c812b4a4bedahttps://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/0d6899f8bcfbf57afb3fcf7949c6db10ba31e998https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/2be164eae342e154234821c2776d54a15dbe04a0https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/d6bfe4a17d42a61e90723cdc2d8f3bf23a39aeb0https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/f9d89832badccfbb488fddb7fc80c36ce4baae72 等々 Trigger API に関連してたくさんのコミットをしています。

Trigger API とはなんでしょうか。 Update the Javadoc for features related to Windowing & Triggers · GoogleCloudPlatform/DataflowJavaSDK@f1b1db8 · GitHub に Windowing と Trigger について Javadoc の更新があるので、これを読んでみたところ、主に streaming mode で各 Windows のデータの処理をいつ開始するかというのを制御するための仕組みのようです。また streaming mode の Source (現在は PubsubIO のみです)から得られるデータについている timestamp にはいくつか strategy があり、データの発生元が設定した timestamp を利用する場合、Dataflow の Windowing による Window の終端時刻が過ぎてからその Window のデータが到着するという可能性があるため、このような遅れてきたデータ(late data)が来た時にいつ処理するか(すぐに処理するとか、バッファリングしておいてまとめて処理するか)というのも制御できるようです。

Orderness

This removes and cleans up PCollection ordererdness. · GoogleCloudPlatform/DataflowJavaSDK@80187b7 · GitHub で PCollection#setOrdered と PCollection#isOrdered を削除しています。つまり PCollection のデータは ParDo で DoFn に渡される順番が保証することができなくなるということで、並列処理する上では一般的にはそうでしょうね。しかし出力をソートしたい時とかはどうするんだろう。

Progress Estimation と Source の並列化

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/e07ed2fb506b6ed4de58c39137ce40d2a9d75708 では Progress Estimation のサポートと Custom Source で入力を実装した時に入力を並列で行えるように分割する方法を追加しています。 Source を継承した BoundedSource というクラスが追加されていて、これを継承して Custom Source を作って #getEstimatedSizeBytes() で全体のサイズを返すことで Progress Estimation に使われるようです。また #splitAtFraction() で Source を分割して並列で読み込みできるようにします。 ByteOffsetBasedSource では実際にこれを実装しています。それをみると引数の double fraction で 0.0 から 1.0 のあいだの数値で「全体のうちこの割合までの入力を担当することにして、残りの担当を表す Source を生成してそれを return する」ということをしているみたいです。

Custom Output

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/2b76a7ef4c1c8c5edf9521654bde1fdc906deb13 で Sink という抽象クラスと Write という transform が追加されていて、Custom Output を実装することができるようにするための準備が進んでいるようです。まだ Dataflow のバックエンドで実際に使えるのかどうかわかりませんが。同時に FileBasedSink と XmlFileBasedSink という抽象クラスも追加されていて実装の参考になります。現状だと TextIO で GCS に出力すると結果は複数のファイルに分割されますが、そのマージを行うタイミングも用意されているみたいなのでこれを使って実装しなおせば結果を1ファイルにまとめることもできそうですね。ただデータの出力は冗長性のために重複する可能性があって、bundle と呼ばれる PCollection の実際のデータを処理するかたまり毎にIDが振ってあるからそれを元に冗長除去してねみたいなことも書かれてあって、ちゃんと実装しようと思ったらなかなか大変そうです。

Streaming Side Input

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/7031109aa475fa4dafbc4def65d058ee402d996c でついに streaming mode での side input が実行可能になったようです。