KubernetesでTensorFlowOnSparkを試してみた① minioを使用

January 26, 2020

自宅のKubernetesクラスタにSpark3.0.0-Previewを導入して、Yahoo!のTensorFlowOnSparkを試してみました。結構ハマったので、備忘録としてメモを残しておきます。ちなみにSpark(Apache Spark)は、v2.3.0よりKubernetesNativeサポートしているとのことです(ただ、まだまだ開発中で、制約はあるようです)。

 
①Spark3.0.0-Previewのインストール(イメージビルドとプッシュ)
まずは、Sparkを動かすためのDockerイメージをビルドして、適当なDockerレジストリにプッシュする必要があります。今回は全部マスターノードで作業し、Dockerレジストリはクラスタ内に立てました(registry.myhost.com)。
まずは、sparkのダウンロードページから、3.0.0-preview2(Dec 23 2019)のPre-build for Apache Hadoop 3.2 and laterをダウンロードして、マスターの/opt/spark以下に展開します。
そして下記のコマンドで、spark本体とpythonのジョブ 実行用のイメージ(spark、spark-py)を作成します。後者の方は、TensorFlowOnSparkを入れるため、後ほどカスタマイズします。
cd /opt/spark
sudo bin/docker-image-tool.sh -r registry.myhost.com/myname -t v3.0.0.1 build
sudo bin/docker-image-tool.sh -r registry.myhost.com/myname -t v3.0.0.1 -p kubernetes/dockerfiles/spark/bindings/python/Dockerfile build

docker login -u myname https://registry.myhost.com
sudo bin/docker-image-tool.sh -r registry.myhost.com/myname -t v3.0.0.1 push

②Sparkのタスクを実行するアカウントを作成

kubectlでアカウント(spark)を作成しておきます。mysparkという名前空間を作成してそこでSparkのドライバとエグゼキューターを動作させることにしました。

kubectl create serviceaccount spark -n myspark
kubectl create clusterrolebinding spark-role --clusterrole=edit --serviceaccount=myspark:spark --namespace=myspark

ここまでの作業が完了すればでとりあえずSparkに付属しているサンプルプログラムは動かせます。例えば、下記のようなコマンドで円周率を求めるサンプルプログラムを実行できます(Jarのパスは違ってるかもしれません)。

/opt/spark/bin/spark-submit \
    --master k8s://https://192.168.1.25:6443 \
    --deploy-mode cluster \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=myspark \
    --conf spark.kubernetes.container.image=registry.myhost.com/myname/spark-py:v3.0.0.1 \
    --conf spark.kubernetes.pyspark.pythonVersion=3 \
    --name spark-python \
    local:///opt/spark/examples/src/main/python/pi.py 3

基本、プログラム実行は上記のようにspark-submitを使えば良いようです。結果はdriverのpodのログを確認する必要があります。

③TensorFlowOnSparkをインストールしたイメージを作成する

最後に、spark-pyに対して、TensorFlowOnSparkをインストールしたイメージを作成します。イメージをビルドするために作成したDockerfileは下記の通りです。なお、最後の二行は、Sparkで動作させるアプリからS3互換の分散オブジェクトストレージであるminio上のファイルにアクセスするために追加したものです。

FROM registry.myhost.com/myname/spark-py:v3.0.0.1
USER 0
RUN pip3 install --upgrade pip setuptools
RUN pip3 install numpy 
RUN pip3 install tensorflow
RUN pip3 install tensorflowonspark
ADD ./aws-java-sdk-bundle-1.11.375.jar /opt/spark/jars
ADD ./hadoop-aws-3.2.0.jar /opt/spark/jars

とりあえず、このDockerfileで作成したイメージ(pyspark-test)を使って、tensorflowonsparkのサンプルプログラムを動かすことができました。minioへのファイルの書き出しや、読み込みも問題なく動作しました。ちなみに、今回アプリからminioにアクセスするために使用した上記のjarファイルがなかなか特定できず、かなりハマってしまった(別のバージョンのjarを使うとS3AFileSystemクラスが見つからなかったり、NumberFormatExceptionが出たりした)のですが、このサイトのおかげでようやく解にたどり着きました。このイメージを使って、下記のようにTensorFlowOnSparkのサンプルプログラム(mnistの学習データとテストデータを作成する)を実行してみたところ、minioにデータを作成することができました。

/opt/spark/bin/spark-submit \
    --master k8s://https://192.168.1.25:6443 \
    --deploy-mode cluster \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=myspark \
    --conf spark.kubernetes.container.image=registry.myhost.com/myname/pyspark-test \
    --conf spark.kubernetes.pyspark.pythonVersion=3 \
    --conf spark.hadoop.fs.s3a.access.key=YOUR_MINIO_ACCESSKEY \
    --conf spark.hadoop.fs.s3a.fast.upload=true \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.hadoop.fs.s3a.endpoint=http://minio.minio:9000 \
    --conf spark.hadoop.fs.s3a.secret.key=YOUR_MINIO_SECRETKEY \
    --conf spark.hadoop.validateOutputSpecs=false \
    --name spark-python \
    local:///TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
    --output s3a://common/mnist --num_partitions 5

spark.hadoop.validateOutputSpecs=falseとあるのは、ファイルの上書きを許可するためのもので、これがないと既に出力先フォルダがある場合はエラーになります(参考サイト)。トレーニングについては、下記のように実行すると成功しました。生成されたモデルは永続ボリューム(/pvcにマウント)に保存するようにしています。

/opt/spark/bin/spark-submit \
    --master k8s://https://192.168.1.25:6443 \
    --deploy-mode cluster \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=myspark \
    --conf spark.kubernetes.container.image=registry.myhost.com/myname/pyspark-test \
    --conf spark.kubernetes.pyspark.pythonVersion=3 \
    --conf spark.hadoop.fs.s3a.access.key=YOUR_MINIO_ACCESSKEY \
    --conf spark.hadoop.fs.s3a.fast.upload=true \
    --conf spark.hadoop.fs.s3a.path.style.access=true \
    --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
    --conf spark.hadoop.fs.s3a.endpoint=http://minio.minio:9000 \
    --conf spark.hadoop.fs.s3a.secret.key=YOUR_MINIO_SECRETKEY \
    --conf spark.hadoop.validateOutputSpecs=false \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.volume.mount.path=/pvc \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.volume.mount.readOnly=false \
    --conf spark.kubernetes.executor.volumes.persistentVolumeClaim.volume.options.claimName=spark-disk \
    --name spark-python \
    local:///TensorFlowOnSpark/examples/mnist/keras/mnist_spark.py \
    --export_dir /pvc/mnist_export \
    --images_labels s3a://common/mnist/csv/train

次は、スタンドアローンな環境と、今回の分散環境で実行した場合とでどのくらいパフォーマンスに差が出るのか、評価してみたいと思います。