KubernetesでTensorFlowOnSparkを試してみた② HDFSを使用

February 15, 2020

前回はminioにmnistの学習データをおいて、TensorFlowOnSparkのサンプルソース(mnist_spark.py)を実行してみました。このページによると、このソースはInputModeにSPARKモードを設定してcsvを読み込んでいるのですが、このページによると、TENSORFLOWモードの方がTensorFlowのnativeなAPIを使うので、効率的に処理できるそうです。サンプルソースの中にこのモードを使用したもの(mnist_ts_ds.py)がありましたので、試してみました。

mnist_ts_ds.pyでは、mnist_data_setup.pyで生成するデータ(TFRecordsというTensorFlow専用のフォーマット)を読み込みますが、残念ながらこのデータをminioに作成することができませんでした(理由は不明)。そのため、データの置き場所をHDFSに変更することにしました。HDFSのインストールはこのページを参考にして、下記のようにhelmコマンドで行いました(インストール先はいつものように自前のkubernetesクラスタです)。

git clone https://github.com/apache-spark-on-k8s/kubernetes-HDFS.git
cd kubernetes-HDFS
helm dependency build charts/hdfs-k8s
helm install --name hdfs —-namespace myhdfs charts/hdfs-k8s

前回使用したsparkの環境にはHadoop関係のファイルも含まれていましたが、TensorFlowのソースからHDFSを使用する関係上、今回はHadoopの環境一式を別途イメージに含めることにしました。spark-3.0.0-preview2-bin-without-hadoop.tgzを/optの下に展開して、このページからhadoop-2.7.7.tar.gzをダウンロードしてsparkのフォルダの下に展開しました(それぞれのルートフォルダ名を/opt/spark、/opt/spark/hadoopにリネーム)。sparkのイメージは下記のようにDockerfileを変更してから前回と同じ手順でビルドします。

ENV SPARK_HOME /opt/spark

# 下記の2行を追加
ENV HADOOP_HOME /opt/hadoop
COPY ./hadoop $HADOOP_HOME

WORKDIR /opt/spark/work-dir
RUN chmod g+w /opt/spark/work-dir

次は、TensorFlowOnSparkを動かすためのイメージの作成です。最初にpip3でtensorflow関係をインストールした後、TensorFlowからHDFSを利用するためのjarファイル(tensorflow-hadoop-1.15.0.jar)を追加したり、共有ライブラリのパスを追加しています(こちらのページを参考)。TensorFlowOnSparkのソースも、丸ごとコピーしています(サンプルソースだけを入れればよかったのですが、横着してます・・)。

FROM registry.myhost.com/myname/spark-py:v3.0.0.20
USER 0
RUN pip3 install --upgrade pip setuptools
RUN pip3 install numpy 
RUN pip3 install tensorflow
RUN pip3 install tensorflowonspark
RUN pip3 install tensorflow_datasets

# to use hdfs from tensorflow
RUN echo "/opt/hadoop/lib/native" > /etc/ld.so.conf.d/hadoop-lib.conf && echo "/usr/local/openjdk-8/jre/lib/amd64/server" > /etc/ld.so.conf.d/jvm.conf && ldconfig -v
ADD ./tensorflow-hadoop-1.15.0.jar /opt/spark/jars

# copy src(for example)
ADD ./TensorFlowOnSpark /TensorFlowOnSpark

# copy hdfs config
ADD ./hdfs-site.xml /opt/hadoop/etc/hadoop
ADD ./core-site.xml /opt/hadoop/etc/hadoop
ENV HADOOP_CONF_DIR /opt/hadoop/etc/hadoop
ADD ./entrypoint.sh /opt

# create user
RUN mkdir /home/myname && addgroup --gid 1000 myname && \
    adduser -u 1000 --disabled-password \
    --gecos ""  --home /home/myname \
    --ingroup myname --no-create-home myname && \
    chown myname:myname /home/myname

# Specify the User that the actual main process will run as
ARG spark_uid=myname
USER ${spark_uid}
WORKDIR /home/myname

その後に追加しているhdfs-site.xml、core-site.xmlはHDFSの設定ファイルです。これらは、次のコマンドでkubernetesに設定された内容をダンプしたものを、それぞれコピーしたものになります。

kubectl describe configmap hdfs-config

entrypoint.shには、「/opt/spark/kubernetes/dockerfiles/spark/entrypoint.sh」のファイルに次のように一行だけ追加してます。これは、TensorFlowが利用する共有ライブラリのlibhdfs.soからjarファイルや設定ファイルを検索する際にCLASSPATHを設定しておかないとエラーになるためです。こちらのページを参考にしました。

if ! [ -z ${HADOOP_CONF_DIR+x} ]; then
  SPARK_CLASSPATH="$HADOOP_CONF_DIR:$SPARK_CLASSPATH";
fi

# 下記の一行を追加
export CLASSPATH=$($HADOOP_HOME/bin/hadoop classpath --glob):${HADOOP_CONF_DIR}

case "$1" in

最後にユーザーを追加して、sparkの実行ユーザーにしていますが、これはHDFSへのアクセスでパーミッションエラーが出るのを防ぐためです。

このDockerfileでイメージをビルドし、レジストリにプッシュすればようやくmnistのサンプルソースコードの実行です。まずは学習データの作成です。

/opt/spark/bin/spark-submit \
    --master k8s://https://192.168.1.25:6443 \
    --deploy-mode cluster \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=myspark \
    --conf spark.kubernetes.container.image=registry.myhost.com/myname/pyspark-test \
    --conf spark.kubernetes.pyspark.pythonVersion=3 \
    --conf spark.hadoop.validateOutputSpecs=false \
    --name spark-python \
    local:///TensorFlowOnSpark/examples/mnist/mnist_data_setup.py \
    --output hdfs://hdfs-k8s/test5/mnist --num_partitions 5

これは特に問題なく、実行終了しました。下記のコマンドでデータが作成されていること(test、trainフォルダが存在する)を確認できました。

kubectl exec $(kubectl get pod -l app=hdfs-client,release=hdfs -o name | cut -d/ -f2)  -- hadoop fs -ls /test5/mnist/tfr

続いて、次のコマンドで学習させてみます。

/opt/spark/bin/spark-submit \
    --master k8s://https://192.168.1.25:6443 \
    --deploy-mode cluster \
    --conf spark.executor.instances=5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.namespace=myspark \
    --conf spark.kubernetes.container.image=registry.myhost.com/myname/pyspark-test \
    --conf spark.kubernetes.pyspark.pythonVersion=3 \
    --conf spark.hadoop.validateOutputSpecs=false \
    --name spark-python \
    local:///TensorFlowOnSpark/examples/mnist/keras/mnist_tf_ds.py \
    --export_dir hdfs://hdfs-k8s/test5/mnist/export \
    --images_labels "hdfs://hdfs-k8s/test5/mnist/tfr/train/part-*" \
    --data_format tfos

これで、成功すればOKなんですが、残念ながら以下のエラーが出てしまいました。

Tensorflow TFRecord: Can't parse serialized example

どうやら、サンプルソースにバグがあるっぽいです。さらっとソースを見たところ、どうやら学習データを保存しているところでlabelの次元が読み込むところと違っているようでした。保存は1つの値なのですが、読み込みは10個の値を読んでいます。恐らく、Labelをone-hot表現に変換するのを忘れたのかな、と思い、下記のようなパッチを当ててみました。

diff --git a/examples/mnist/mnist_data_setup.py b/examples/mnist/mnist_data_setup.py
index 4539875..a8f3b82 100644
--- a/examples/mnist/mnist_data_setup.py
+++ b/examples/mnist/mnist_data_setup.py
@@ -14,6 +14,7 @@ if __name__ == "__main__":
   from pyspark.conf import SparkConf
   import tensorflow as tf
   import tensorflow_datasets as tfds
+  import numpy as np
 
   parser = argparse.ArgumentParser()
   parser.add_argument("--num_partitions", help="Number of output partitions", type=int, default=10)
@@ -48,7 +49,7 @@ if __name__ == "__main__":
     ex = tf.train.Example(
       features=tf.train.Features(
         feature={
-          'label': tf.train.Feature(int64_list=tf.train.Int64List(value=[example['label'].astype("int64")])),
+          'label': tf.train.Feature(int64_list=tf.train.Int64List(value=np.identity(10, dtype=np.int64)[example['label']] )),
           'image': tf.train.Feature(int64_list=tf.train.Int64List(value=example['image'].reshape(784).astype("int64")))
         }
       )

すると、一応このエラーは解消したのですが、また新たな例外が発生してしまいました。

  File "/usr/local/lib/python3.7/dist-packages/tensorflow_core/python/eager/execute.py", line 67, in quick_execute
    six.raise_from(core._status_to_exception(e.code, message), None)
  File "<string>", line 3, in raise_from
tensorflow.python.framework.errors_impl.CancelledError:  RPC Request was cancelled
	 [[node CollectiveReduce (defined at usr/local/lib/python3.7/dist-packages/tensorflowonspark/compat.py:15) ]] [Op:__inference_collective_all_reduce_3180]

Function call stack:
collective_all_reduce

このエラーをググってみたものの、情報が少なく、手がかりが掴めていません。ちなみに、mnist_spark.pyの方はデータの置き場所をHDFSに変更しても、問題なく動作しましたので、HDFSの問題ではなさそうです。と言うことは、TensorFlow、あるいはTensorFlowOnSpark側の問題かもしれませんね・・。