DataflowJavaSDK Weekly

DataflowJavaSDK の更新を解説します。このブログは Google 社とは一切関係ありません。

Beta Release is coming?

先週は大量のコミットがあって大変でしたが、今週は18個くらいで、コメントや Javadoc の更新も多かったのでコード的な変更は細かな不具合修正などでした。

Beta Release is coming?

しかし README.md の変更に重要な内容があって、 Update README.md for Beta · GoogleCloudPlatform/DataflowJavaSDK@843eec8 · GitHub に "The SDK is publicly available as a Beta release" と Alpha から Beta 公開へ記述が変更されていて、Beta リリースが近いのではないかと思われます。

DoFn と Windowing, Trigger API, Orderness, Progress Estimation と Source の並列化, Custom Output, Streaming Side Input

先週はコミットが全く push されていなかったのですが、その間たまっていたのが一気に push されたようで、最後にチェックしたところから 113 個もコミットが追加されていました。 さすがに読み切れないので今回はコミットメッセージを斜め読みして気になるコミットをまとめるだけにします。それでもかなり盛りだくさんになりました。

ものすごくおおざっぱにまとめると

  • 主に streaming mode のための Windowing や Trigger API といった機構の整備
  • Custom Output のサポート

が最近の大きなトピックであったかなという感じです。

DoFn と Windowing

この期間中 bjchambers というアカウントの Googler が精力的に活動していて、主に Windowing に関連した変更やリファクタリングをしています。

Change DoFn.ProcessContext#windows to #window. · GoogleCloudPlatform/DataflowJavaSDK@3c56d2f · GitHub で DoFn.ProcessContext#windows() メソッドが #window() と名称が変わっています。それだけでなく DoFn.RequiresWindowAccess という空の Interface が追加されていて、DoFn の継承クラスで window() メソッドを呼ぶものに目印として implements すべしというものみたいです。これは Windowing を意識した DoFn をサービスの runtime 側で特別扱いするため、もしくはこれがない時は Windowsing がないものとして最適化したスケジューリングをするなどの用途で使われるのではないかなと予想しています。

また Introduce an interface for the DoFns that implement windowing to use. · GoogleCloudPlatform/DataflowJavaSDK@3c4101b · GitHub で DoFn#windowingInternals() という抽象メソッドが追加され、WindowingInternals<I, O> という interface を返すようにしています。これはデータを timestamp を元に Windows に分配する Window を自前で実装することを可能にしているようです。

Trigger API

bjchambers は他にも https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/6bf7e28547a225accf3bc29ceb06c812b4a4bedahttps://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/0d6899f8bcfbf57afb3fcf7949c6db10ba31e998https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/2be164eae342e154234821c2776d54a15dbe04a0https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/d6bfe4a17d42a61e90723cdc2d8f3bf23a39aeb0https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/f9d89832badccfbb488fddb7fc80c36ce4baae72 等々 Trigger API に関連してたくさんのコミットをしています。

Trigger API とはなんでしょうか。 Update the Javadoc for features related to Windowing & Triggers · GoogleCloudPlatform/DataflowJavaSDK@f1b1db8 · GitHub に Windowing と Trigger について Javadoc の更新があるので、これを読んでみたところ、主に streaming mode で各 Windows のデータの処理をいつ開始するかというのを制御するための仕組みのようです。また streaming mode の Source (現在は PubsubIO のみです)から得られるデータについている timestamp にはいくつか strategy があり、データの発生元が設定した timestamp を利用する場合、Dataflow の Windowing による Window の終端時刻が過ぎてからその Window のデータが到着するという可能性があるため、このような遅れてきたデータ(late data)が来た時にいつ処理するか(すぐに処理するとか、バッファリングしておいてまとめて処理するか)というのも制御できるようです。

Orderness

This removes and cleans up PCollection ordererdness. · GoogleCloudPlatform/DataflowJavaSDK@80187b7 · GitHub で PCollection#setOrdered と PCollection#isOrdered を削除しています。つまり PCollection のデータは ParDo で DoFn に渡される順番が保証することができなくなるということで、並列処理する上では一般的にはそうでしょうね。しかし出力をソートしたい時とかはどうするんだろう。

Progress Estimation と Source の並列化

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/e07ed2fb506b6ed4de58c39137ce40d2a9d75708 では Progress Estimation のサポートと Custom Source で入力を実装した時に入力を並列で行えるように分割する方法を追加しています。 Source を継承した BoundedSource というクラスが追加されていて、これを継承して Custom Source を作って #getEstimatedSizeBytes() で全体のサイズを返すことで Progress Estimation に使われるようです。また #splitAtFraction() で Source を分割して並列で読み込みできるようにします。 ByteOffsetBasedSource では実際にこれを実装しています。それをみると引数の double fraction で 0.0 から 1.0 のあいだの数値で「全体のうちこの割合までの入力を担当することにして、残りの担当を表す Source を生成してそれを return する」ということをしているみたいです。

Custom Output

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/2b76a7ef4c1c8c5edf9521654bde1fdc906deb13 で Sink という抽象クラスと Write という transform が追加されていて、Custom Output を実装することができるようにするための準備が進んでいるようです。まだ Dataflow のバックエンドで実際に使えるのかどうかわかりませんが。同時に FileBasedSink と XmlFileBasedSink という抽象クラスも追加されていて実装の参考になります。現状だと TextIO で GCS に出力すると結果は複数のファイルに分割されますが、そのマージを行うタイミングも用意されているみたいなのでこれを使って実装しなおせば結果を1ファイルにまとめることもできそうですね。ただデータの出力は冗長性のために重複する可能性があって、bundle と呼ばれる PCollection の実際のデータを処理するかたまり毎にIDが振ってあるからそれを元に冗長除去してねみたいなことも書かれてあって、ちゃんと実装しようと思ったらなかなか大変そうです。

Streaming Side Input

https://github.com/GoogleCloudPlatform/DataflowJavaSDK/commit/7031109aa475fa4dafbc4def65d058ee402d996c でついに streaming mode での side input が実行可能になったようです。

side inputs per window 再び, 用語の統一, Partial Group By Key with Combine Function

今週も 12 個ほどコミットがありました。そのなかから主要そうなものをピックアップします。

side inputs per window 再び

先週 32d07db でコミットされた side inputs の windowing 対応ですが、その後 90c811a で一旦 revert され、その後 Makes side inputs per window. · GoogleCloudPlatform/DataflowJavaSDK@67e99cc で再度適用されています。 sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java の変更が消されているようですね。 context.sideInput() は startBundle() や finishBundle() では呼び出せないようになって、現在処理中の入力の PCollection の window と同じ window のデータのみ取得できるようになるそうです。

Windows support 再び

Updated TestFileBasedSourceTest to use File.separator instead of "/". · GoogleCloudPlatform/DataflowJavaSDK@4d7ae28 は特に大きな変更ではなくて、テストでファイルの separator を "/" と書いていたのをポータビリティのため File.separator が利用されるように変えていて、ここでも Windows でテストされている感が出ています。

用語の統一 (shard -> bundle, fork -> dynamic split)

Renaming shard => bundle in the Java custom source API. · GoogleCloudPlatform/DataflowJavaSDK@2a8d571Renames fork => dynamic split in the Dataflow Java SDK. · GoogleCloudPlatform/DataflowJavaSDK@7cbc12a はそれぞれ shard を bundle (データを処理単位に分割したもののことだと思います)、fork を dynamic split (データを複数の worker に分散させることのことかなと思います)と用語を整理しています。まだアルファということもあってこのような用語の統一のために API の互換性も壊しても一貫性を保つようにしているようです。

Partial Group By Key with Combine Function

Modify the Dataflow Java SDK to support the combining function in Partia... · GoogleCloudPlatform/DataflowJavaSDK@b426cbd PartialGroupByKeyOperation で Combine も同時に適用できるようにしています。 Partial Group By Key という処理がいまいちよくわからないのですが、グループ分け後の値の集合について Conbine 関数を適用して集計処理を同時に実施できるようにしています。

PCollectionView, Proto2Coder, Windows support, side inputs per window

今週も 21個ほどコミットが push されていました。そのなかから主に backword-incompatible とされている変更をピックアップしてみます。

PCollectionView の型変数削除

Remove unnecessary type parameter on PCollectionView. c919a29

PCollectionView<T,WT> と2つの型変数を持っていたクラスを PCollectionView<T> と未使用になっていた2つめの型変数を削除しています。 自前で PTransform のクラスを書いているような場合は変更が必要だと思います。DoFn を書いているだけ(つまり普通のユーザー)にとっては影響はないでしょう。たぶん。

nested context to the Proto2Coder

Added support for nested context to the Proto2Coder. · GoogleCloudPlatform/DataflowJavaSDK@6cb1963

Proto2Coder でネストしたメッセージ構造の serialize/deserialize に対応しています。

なんてことない変更ですがなぜこれを取りあげたかというと、DataflowJavaSDK での PCoder というしくみについてちょっと調べてみたからです。 Proto2Coder というのがなにものかというと、これは Protocol Buffer のメッセージを encode/decode するためのものみたいです。処理の実体はほとんど com.google.protobuf のパッケージに委譲しています。 Proto2Coder 自体は DataflowJavaSDK 内では利用されていませんが、これはユーザが PCollection.setCoder() や TextIO.withCoder() で指定して使うためのものだと思われます。Dataflow の pipeline 上のデータ PCollection は構造化されたデータ(つまり Integer だったり Double だったり KV<K, V> といったクラスのインスタンスであったり)ですが、入出力(現在のところ withCoder を実装しているのは TextIO のみのようです)の際に byte stream に変換するにあたりそのフォーマットを指定するのが PCoder です。公式のドキュメントにも説明がありました。 Data Encoding - Google Cloud Dataflow

Windows support

Updated the FileBasedSourceTest so that it does not depend on the size oe of the new line character (it was failing on Windows due to this) · GoogleCloudPlatform/DataflowJavaSDK@38b3685 という変更はただのテストの修正なのですが、注目したのは変更理由で、Windows での改行コードのサイズの違いによりエラーになっていたため、改行コードのサイズに依存しないようにしているとあり、Windows 上でもテストがされているんだな、ということがわかります。

side inputs per window

Makes side inputs per window. · GoogleCloudPlatform/DataflowJavaSDK@32d07db にて、sideInput() についての backwords-incompatible な変更が入っています。 sideInput() は ParDo などで複数の PCollection を受け取るための機構で、メインの入力となる PCollection のほかに ParDo.withSideInputs() で PCollection を受け取って、DoFn 内で ProcessContext.sideInput() で取り出すということができますが、これまで GlobalWindowing、つまり全てのデータが1つの window にはいるというバッチ的なモードしかサポートされていなかったのを、sideInput() でも同じ window のデータのみ受け取れるようにしているようです。おそらくこれは FAQ の limitations of streaming mode のひとつである "Side Inputs to ParDo are not supported in streaming mode." を解消するべく導入されたものだと思います。

[追記] この変更は Rollback of a "Makes side inputs per window" change. · GoogleCloudPlatform/DataflowJavaSDK@90c811a で rollback されました。まだ何か問題があったのでしょうか。

contrib、Join Library、Thread Num、Combine.BinaryCombineFn、FileBasedSource<T>

今回は初回ということもあり3月上旬からの変更点のうち目についたものを取り挙げます。

contribディレクトリ導入

Setting up 'contrib' directory and rules. · 8a3e55d にて contrib ディレクトリが作成され、community からの contribution によるライブラリなどはこの下に配置するようにしたようです。SDK本体とはメンテナンスの主体が異なり、品質の保証をGoogleがするわけではないということを明確しつつ、コミュニティからのコードによる貢献を可能にするための措置でしょう。contrib/README.md をよむと、各サブディレクトリは独自のリポジトリを持って git の submodule 機能を使って contrib の下に取り込むことも可能とのこと。そしてさっそく

Move join code to contrib module join-library. Adding pom.xml and README... · ecc44e1 で join-library が contrib の下に追加されています(上記のコミットは別のブランチで一旦本体のSDK内に追加されたものを移動する内容です)。

Join Library

Generic implementation of inner and outer left/right join. · f8765aa にて

Join 機能を提供するユーティリティクラスJoinを追加しています(上記の contrib の記事にでてきたものです)。これは2つのKey-Value型(KV<K,V>)のPCollectionを受け取り、KV型を値としたKV型にして返すものです。

つまり PCollection<<K,V1>> と PCollection<KV<K,V2>> から共通の K の値をまとめた PCollection<KV<K,KV<V1,V2>>> を返すユーティリティメソッドを提供しています。InnerJoin と OuterJoin は一方にしか存在しないキーの要素が含まれるか否かです。

この Join Library は初の contrib に追加されたライブラリですが、作者の M.Runesson は Google 社員のようです。

Thread Num

DataflowWorkerHarness はおそらくマネージドサービスの時にGCEのVMインスタンス上でDataflowの処理を走らせるための管理用のクラスではないかと思います。

Change DataflowWorkerHarness to repeatedly request work. Switches to hav... · e89f47f で起動する Thread 数をプロセッサのコア数-1からコア数と同じ数まで起動するように変更されています。また無名クラスとして定義していたコールバックメソッド用のクラスをWorkerThreadというインナークラスとして定義し、Thread内で繰り返しタスクを取得して実行するようにしています。

Combine.BinaryCombineFn

transforms/Combine.java に BinaryCombineFn が追加され、Iterable<V> で複数の値の集合を値として持つ PCollection の要素を2項演算子で集計した結果を返す PTransfer を容易に定義できるようにしています。

 実際に Max, Min, Sum などのプリミティブな PTransfer がこれを用いて書き直されています。

FileBasedSource<T>

Introduces FileBasedSource<T>, a custom source that implements functiona... · b9f4384

 でファイルベースの入力Sourceを定義するために利用できるベースクラスとして FileBasedSource<T> を追加しています。Cloud Dataflow では入出力先のカスタムドライバを定義して利用することができる(ただし FAQ によると現在のアルファ版SDKではカスタム入力ソースの開発はサポートされていないとある)ので、その時に使えるようにしているものだと思います。ファイルの shard への分割などの機能を提供し、サブクラスでは具体的なファイルからのデータの読み込み方法の記述のみで利用できるようにしているものだと思います。

 

DataflowJavaSDK 更新解説をはじめます

Dataflow とは

Google Cloud Platform のサービスのひとつ Cloud Dataflow のことです。大規模データのバッチ処理、リアルタイムでの集計、分析などをパイプライン形式のプログラミングモデルで記述して分散処理を行うサービスです。現在はアルファ版で申請のあったユーザーにのみ利用可能です。詳細は公式のドキュメント What is Google Cloud Dataflow? を参照してください。

Dataflow Java SDK とは

Cloud Dataflow の主要なコンポーネントには、Google Cloud Platform のマネージドサービスとしての Cloud Dataflow と、Dataflow 上でのデータ処理を記述する SDK があります。

マネージドサービスとしての Cloud Dataflow は現在のところアルファ版で、申請のあったユーザーのみ利用可能です。

SDK は現在のところ Java 版の DataflowJavaSDKオープンソースになっており、GitHub 上で公開されています。この DataflowJavaSDK は誰でも利用可能です。DataflowJavaSDK を用いてプログラミングされたパイプライン処理にはローカルでのテスト実行を行うモードもあり、こちらは許可されたユーザーでなくても Cloud Dataflow のプログラミングモデルを試すことが可能です(ただし Google Cloud Storage を利用するため Google Cloud Platform のユーザ登録は必要)。

DataflowJavaSDK Weekly とは

Cloud Dataflow は現在アルファ版であり活発に開発中です。どんなことができるサービスになるのかはユーザからのフィードバックによっても変化し続けるであろうと思われます。 しかしDataflowJavaSDKの更新はGitHub上のリポジトリに日々コミットがpushされているため、開発状況はSDKの変更を追跡することでも伺い知ることができると思われます。

DataflowJavaSDK WeeklyではDataflowJavaSDKの変更点を観察し変更点をまとめることで現在の開発動向を知ることを目的とした個人の活動です。このブログはGoogle社とは全く関係がありません。