DataflowJavaSDK Weekly

DataflowJavaSDK の更新を解説します。このブログは Google 社とは一切関係ありません。

contrib、Join Library、Thread Num、Combine.BinaryCombineFn、FileBasedSource<T>

今回は初回ということもあり3月上旬からの変更点のうち目についたものを取り挙げます。

contribディレクトリ導入

Setting up 'contrib' directory and rules. · 8a3e55d にて contrib ディレクトリが作成され、community からの contribution によるライブラリなどはこの下に配置するようにしたようです。SDK本体とはメンテナンスの主体が異なり、品質の保証をGoogleがするわけではないということを明確しつつ、コミュニティからのコードによる貢献を可能にするための措置でしょう。contrib/README.md をよむと、各サブディレクトリは独自のリポジトリを持って git の submodule 機能を使って contrib の下に取り込むことも可能とのこと。そしてさっそく

Move join code to contrib module join-library. Adding pom.xml and README... · ecc44e1 で join-library が contrib の下に追加されています(上記のコミットは別のブランチで一旦本体のSDK内に追加されたものを移動する内容です)。

Join Library

Generic implementation of inner and outer left/right join. · f8765aa にて

Join 機能を提供するユーティリティクラスJoinを追加しています(上記の contrib の記事にでてきたものです)。これは2つのKey-Value型(KV<K,V>)のPCollectionを受け取り、KV型を値としたKV型にして返すものです。

つまり PCollection<<K,V1>> と PCollection<KV<K,V2>> から共通の K の値をまとめた PCollection<KV<K,KV<V1,V2>>> を返すユーティリティメソッドを提供しています。InnerJoin と OuterJoin は一方にしか存在しないキーの要素が含まれるか否かです。

この Join Library は初の contrib に追加されたライブラリですが、作者の M.Runesson は Google 社員のようです。

Thread Num

DataflowWorkerHarness はおそらくマネージドサービスの時にGCEのVMインスタンス上でDataflowの処理を走らせるための管理用のクラスではないかと思います。

Change DataflowWorkerHarness to repeatedly request work. Switches to hav... · e89f47f で起動する Thread 数をプロセッサのコア数-1からコア数と同じ数まで起動するように変更されています。また無名クラスとして定義していたコールバックメソッド用のクラスをWorkerThreadというインナークラスとして定義し、Thread内で繰り返しタスクを取得して実行するようにしています。

Combine.BinaryCombineFn

transforms/Combine.java に BinaryCombineFn が追加され、Iterable<V> で複数の値の集合を値として持つ PCollection の要素を2項演算子で集計した結果を返す PTransfer を容易に定義できるようにしています。

 実際に Max, Min, Sum などのプリミティブな PTransfer がこれを用いて書き直されています。

FileBasedSource<T>

Introduces FileBasedSource<T>, a custom source that implements functiona... · b9f4384

 でファイルベースの入力Sourceを定義するために利用できるベースクラスとして FileBasedSource<T> を追加しています。Cloud Dataflow では入出力先のカスタムドライバを定義して利用することができる(ただし FAQ によると現在のアルファ版SDKではカスタム入力ソースの開発はサポートされていないとある)ので、その時に使えるようにしているものだと思います。ファイルの shard への分割などの機能を提供し、サブクラスでは具体的なファイルからのデータの読み込み方法の記述のみで利用できるようにしているものだと思います。