DataflowJavaSDK Weekly

DataflowJavaSDK の更新を解説します。このブログは Google 社とは一切関係ありません。

Job name uniqueness, unlimit GCS filezise, Avro Source, Sharding BigQuery output

少々間があきましたが、着実に追い付いてきています。

Job name uniqueness

Add logic in SDK to support unique name check during job creation. · GoogleCloudPlatform/DataflowJavaSDK@1147e4c · GitHub

ジョブ名に自動的に日付や時刻を埋め込みがユニークになることを保証するようにしています。また同名のジョブを投入しようとした時のエラーメッセージも親切なものにしています。 Dataflow のジョブ管理まわりは割と適当でしたが、そろそろこのへんも実装の手が回りはじめたようですね。

unlimit GCS filezise

Turning off the 250Gb upload limit by default · GoogleCloudPlatform/DataflowJavaSDK@fc35bb5 · GitHub

GCS へアップロードするファイルサイズに 250GB の制限があったそうなのですがそれをデフォルトで off にしています。 そんな制限あるとは知らなかった。

Avro Source

Introduces a custom source implementation of Avro · GoogleCloudPlatform/DataflowJavaSDK@81e162e · GitHub

入力 Source に io/AvroSource を追加して Avro 形式のファイルからの入力のサポートを追加しています。 Avro というのはオープンソースの data serialization system だそうで。 https://avro.apache.org/

Sharding BigQuery output

Support per-window tables in BigQueryIO · GoogleCloudPlatform/DataflowJavaSDK@ccad188 · GitHub

BigQuery への出力に BigQueryIO.Write.to() メソッドを追加して streaming モードで Window 毎に別のテーブルへの出力ができるように(BigQueryIO.Write.to() メソッドにテーブル名を返す SerializableFunction を渡せる)しています。おお、これは便利そうですね。テーブルも自動で作ってくれるそうです。

PubsubIO in batch and direct modes, Dataflow API v1b3, Option for GCE Network

そろそろ追い付きそうです。

PubsubIO in batch and direct modes

Adds support for PubsubIO in batch and direct modes. · GoogleCloudPlatform/DataflowJavaSDK@58dde1a · GitHub

Streaming モードでのみ利用可能だった PubSubIO が Batch モードと direct モード(ローカルで実行するモードのことかな?)でも利用可能になっています。 io/PubsubIO に実装が追加されていますが、Source としてではなく DoFn の一種として Create Transform のように何も入力を受け取らずに出力を発生させる Transform として挙動する PTransform を追加しているという感じみたいです。バッチモードで PubSubIO をどう Bounded にするか(つまりデータ量を有限にするか)については、maxNumRecords() や maxReadTime() で読み込む件数か読み込む時間を制限することで指定するようです。

Dataflow API v1b3

Update generated Dataflow API from v1beta3 to v1b3 · GoogleCloudPlatform/DataflowJavaSDK@4763291 · GitHub

コミットメッセージによると呼び出す Dataflow API の v1beta3 から v1b3 に更新したとのことです。beta3 と b3 ってどう違うんだ…。 あと sdk/pom.xml の依存する google-api-services-dataflow のバージョンの指定は確かにそういう変更なんですけど、実際のソースの dataflowClient の呼び出しからは V1b3() を削るという内容になっていて、はて。v1beta3 のバージョンでは V1b3() を呼ぶことで v1b3 相当のバージョンが呼べるようになっていたのかな。 beta と b の間にバージョニングのルールが何かあるんでしょうね。

Option for GCE Network

Adds PipelineOption to specify a GCE network for GCE VMs · GoogleCloudPlatform/DataflowJavaSDK@24b24e5 · GitHub

PipelineOption に network オプションが追加され、Dataflow を実行する VM インスタンスを起動する時の GCE の network が選択できるようになっています。 おお、これ地味に嬉しい場面が考えられて、Dataflow とは別に自分が立ててる GCE インスタンス上に外部ネットワークには公開していないサービスがあって、Dataflow からそれを参照したい時に、firewall 設定をして接続を許可しないといけないのですが、Dataflow の worker はサービスが勝手に起動するので接続元 IP アドレスを自動的に適用するにはネットワークの設定が必要になるわけです。このオプションで Dataflow を実行する worker のネットワークが指定できるので、それが可能になるのですね。

Add stableUniqueNames option, Set PTransform name, ConcatReader, Unbounded custom source

まだ master の HEAD には追いついてませんが 6月頭の commit まで進んでいます。

Add stableUniqueNames option

Add option to treat non-stable-unique name as an error · GoogleCloudPlatform/DataflowJavaSDK@12c18e7 · GitHub

pipeline 実行のオプションに stableUniqueNames というのを追加しています。各 Transform の名前(というのがあるのをはじめて認識しましたが)がユニークでないときに警告やエラー扱いにするように指定できるようにしています。 Streaming mode の pipeline の reload の時に名前がユニークである必要があるそうです。

Set PTransform name

Allow overriding transform name in apply · GoogleCloudPlatform/DataflowJavaSDK@24718be · GitHub

で、おそらくその Streaming mode の pipeline の reload に関連して Transform に PCollection.apply などのメソッド呼び出し時に名前を付けることができるようにメソッドのオーバロードを追加しています。

ConcatReader

Adds ConcatReader - a Reader that may encapsulate one or more Dataflo… · GoogleCloudPlatform/DataflowJavaSDK@d9243e2 · GitHub

複数の Reader を連結できる ConcatReader というクラスが追加されています。当然ながら取り出される型が同じじゃないといけないので、GCS と BigQuery からの入力を連結、というのはできませんが、複数のファイルを連結したり、複数の BigQuery のクエリ結果(スキーマは同じじゃないといけないでしょうけど)を繋げて入力とするのがやりやすくなりそうです。

Unbounded custom source

Windmill API changes for unbounded custom sources · GoogleCloudPlatform/DataflowJavaSDK@5aa8abc · GitHub

Unbounded な custom source のために Windmill API を変更、ということなのですが差分みてもよくわかりません。そもそも .proto ファイルってのは Protocol Buffer の定義ファイルですかね? ただ Stream mode での入力のカスタマイズが可能になるのかもしれないということでちょっと気になりますね。

Revert MapFn and FlatMapFn, Credential with gcloud, Use PTransform more than once, Cancel pipeline from CLI

引き続き 5月中旬〜後半頃の commit を読み進めています。 ちなみに今週はあまり更新がない(push されてない)のであと 25 commits ほどで追いつきそうです(ということはこのペースだとまた引きはなされそう)。

Revert MapFn and FlatMapFn

Revert "Add MapFn and FlatMapFn" · GoogleCloudPlatform/DataflowJavaSDK@0f00695 · GitHub

前回 コメントした MapFn と FlatMapFn の追加ですが、実はその後 revert されてしまっていました。oh...

Credential with gcloud

Use application default credentials from gcloud core component. · GoogleCloudPlatform/DataflowJavaSDK@7a479f6 · GitHub

pipeline を実行する時に GCP Project の credential の取得方法から gcloud の指定をする方法が削除されています。これは機能がなくなったというより、Dataflow が gcloud の更新によって gcloud alpha の機能に組み込まれたみたいで、わざわざ指定しなくてもデフォルトの挙動になるようになったということみたいですね。試してないですけど。

Use PTransform more than once

Allows a PTransform to be used more than once · GoogleCloudPlatform/DataflowJavaSDK@4a4c23b · GitHub

pipeline の組み立て時に同じ PTransform を繰り返し別の箇所に使いまわしができるようにしているようです。元々 AppliedTransform というクラスが存在していて、pipeline のツリー構造のノードとしてはこれが使われていたようですが、DataflowPipelineTranslator.getCurrentTransform() でこの AppliedTransform を取得できるようにして、必要な箇所では元の PTransform ではなく AppliedTransform で扱うようにしているようです。Ruby で言うと Module と IClass の関係のようなものですね。

Cancel pipeline from CLI

Print the CLI command for cancelling jobs running in the cloud · GoogleCloudPlatform/DataflowJavaSDK@73d813c · GitHub

pipeline の実行時に gcloud によるジョブのキャンセルの方法をメッセージに出力するようにしています。 おお、コマンドラインからジョブのキャンセルってできたんですね(できるようになった?)。

Runtime.totalMemory() vs. Runtime.maxMemory(), MapFn and FlatMapFn

引き続き5月中旬のコミットを読んでいます。

Runtime.totalMemory() vs. Runtime.maxMemory()

Fix wrongly set buffer size for GoogleCloudStorageWriteChannel. · GoogleCloudPlatform/DataflowJavaSDK@6c99876 · GitHub

これ1行の変更で JVM のメモリに関する値をチェックして GCS への書き込み時のバッファサイズを調整しているところなのですが、1行だけの変更なのにコミットログが非常に詳細で、元のコードの意図の推測と、 -Xms オプションの有無の考慮と、あと -Xmx の設定について(worker では物理メモリの 1/4 が設定されているらしいということがここからわかる)などが書かれていて、非常に丁寧ですね。 JVM の Runtime.totalMemory() (現在の使用量)と Runtime.maxMemory() (最大の割り当て可能メモリ)の違いというのがわかりました。

MapFn and FlatMapFn

Add MapFn and FlatMapFn · GoogleCloudPlatform/DataflowJavaSDK@431b317 · GitHub

ParDo の DoFn が入力の値を単に変換するだけの Map や FlatMap の挙動をする時に実装を簡易にするためのサブクラス MapFn や FlatMapFn を導入して、ParDo にも ParDo.map, Pardo.flatMap メソッドを追加しています。DoFn は processElement() を上書きしますが、こっちは fn に変換する関数を渡すか、apply を再定義するだけで良いようです。 また fn は SerializableFunction という型で定義されています。いいですね SerializableFunction。まあ、クラス定義は jar を共有してるんでしょうけど。

Reload Streaming Pipeline, Window in sideInput, Move to the backend

まだ5月前半のコミットを追随している途中です。

Reload Streaming Pipeline

Add support to the SDK for reloading streaming pipelines. This is no… · GoogleCloudPlatform/DataflowJavaSDK@2229a47 · GitHub

"This is not yet supported by the dataflow service." とのことなのでまだ利用できないみたいですが、DataflowPipelineDebugOptions など起動時オプションとして --reload を受け付けて、Streaming モードの Pipeline のリロードというのを入れようとしているみたいです。 Pipeline を動かしたままで worker の処理を途中から更新できるようにするってことですかね。末端の処理ならいいけど、Merge や GroupByKey の構成変わるのは大変そう。

Window in sideInput

Fix race condition that can cause side inputs to be null. This chang… · GoogleCloudPlatform/DataflowJavaSDK@449d8ee · GitHub

sideInput での Window/Trigger に関して race condition の修正とのこと。

Streaming モードで Windows がある状態での sideInput がどういう扱いになるのか(どのタイミングのものがその Window で取得できるのか、どう同期されるのか)というのは謎のひとつなのですが(試したことがないです)、WorkItem の処理中はキャッシュしておくようにしたというコミットログなので、やっぱりある程度バッファリングされるみたいですね。 あと WorkItem という新しい語彙が出てきました。WorkItem は Dataflow の Streaming モードの元となっている Windmill の用語みたいです。

Move to the backend

Move GroupByKey expansion to the backend. · GoogleCloudPlatform/DataflowJavaSDK@2eff826 · GitHub

タイトルの通りで、WindowFn つきの GroupByKey をよりプリミティブなオペレーションに展開するのをサービス側にもっていって SDK では展開しないようにしているというもので、これ自体はさほど大きな変更ではないのですが、サービス側に遮蔽される部分を増やす方向で SDK だけ読んでもなぁという感じではありますね。

Streaming mode cache improvement, deprecate winthXXX APIs, StreamingDataflowWorker, DoFnWithContext

先月の更新直後に beta リリースがアナウンスされました。それからしばらくコミットが止まっていたのですが、5月中旬からまた大量のコミットが push されはじめたので徐々に追随していきます。

Streaming mode cache improvement

Improve caching in StreamingModeExecutionContext. · GoogleCloudPlatform/DataflowJavaSDK@82e5e9a · GitHub は Stream mode でのパフォーマンスの改善についての変更のようですが、いくつかよくわからない気になるキーワードが出てきます。 DoFn.RequiresKeyedState という interface があって、どうもこいつは bundle を処理中に(worker はまたがない?)実行時情報を保持しておくストレージとして使えるなにかを指しているようなのですが、どうもこれも実体は Dataflow のクラウド側のみにあって、 SDK 側には実装はないみたいです。

deprecate winthXXX APIs

Fix use of setXXX (which mutates an object) versus withXXX (which sho… · GoogleCloudPlatform/DataflowJavaSDK@89bc303 · GitHub は重大な変更というよりは API の deprecated についてで、withXXX() というメソッド(transform にちょっとパラメータを追加したりするのによく使われてます)を addXXX() とか setXXX() と改名して古いメソッド名は deprecated にしています。@Deprecated という directive? をつけているので多分警告が出るんでしょうね。 コメントをみると withXXX はオブジェクトを破壊的に変更しないことが期待されるので名前を変更したっぽいことが書いてあります。

StreamingDataflowWorker

Add a healthz page to the streaming dataflow worker to support health… · GoogleCloudPlatform/DataflowJavaSDK@a09fe13 · GitHub はちょっとした変更で StreamingDataflowWorker.handle で "/healthz" というヘルスチェック用のパスに対応しているだけなのですが、そもそもこの StreamingDataflowWorker というクラスはどうも HTTP サーバとして口をあけていて、処理状況や利用リソースについての情報を返すものみたいです。GCP のコンソール上で Dataflow の各 Transform 毎の処理がどのくらい進んでいるかをみることができるので、これを使って情報を収集しているのかもしれませんね。クラウド上でもこれが動いているのかよくわかりませんが。

DoFnWithContext

Introduce DoFnWithContext, an annotation based version of DoFn. · GoogleCloudPlatform/DataflowJavaSDK@6e75943 · GitHub で DoFn に代わる(まだ experimental な) DoFnWithContext という interface を導入しています。今度の processElement() は引数に ProcessContext の他に BoundedWindow も受け取り、KeyedState へのアクセスも明示的に指定しなくても持っているという、まあつまり DoFn に最初からいろいろ後から必要になったものを使えるようにしたいけど、beta リリース後だから互換性が派手に壊れるのはよくないから別の名前で定義しておこう、ということだと思うので次第にこちらに移行して欲しい、というところなのではないかと思います。