jupyter notebookをWEBAPIで自動実行する

February 03, 2022

以前書いたように、Google Colaboのノートブックをchromeから自動実行させることに成功したのですが、最近またエラーがたびたび出るようになりました。そのたびにchromeを再起動したり、いろいろ面倒だったので、結局Google Colaboの自動実行はあきらめて、ローカルのサーバでjupyter notebookを立ち上げて、定期的にWEBAPI経由でノートブックを実行するようにしました。

まずjupyter notebookサーバの構築です。今回はkubernetesのクラスタ上でjupter notebookを起動しました。以下のyamlをkubectlでapplyするだけです(namespaceはml)。デフォルトだとトークンが毎回変わってしまうので、WEBAPIから呼び出すときに毎回トークンを変更する必要が出てきます。それを避けるため、固定でトークンを指定しています。

---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app: myclaim
  name: myclaim
  namespace: ml
spec:
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 50Gi
  storageClassName: nfs
---
apiVersion: v1
kind: Service
metadata:
  namespace: ml
  name: jupyter2
  labels:
    app: jupyter2
spec:
  ports:
  - port: 8888
    targetPort: 8888
    nodePort: 30888
    name: http
  selector:
    app: jupyter2
  type: NodePort
---
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: ml
  name: jupyter2-deployment
  labels:
    app: jupyter2
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jupyter2
  template:
    metadata:
      labels:
        app: jupyter2
    spec:
      containers:
        - name: jupyter2
          image: jupyter/tensorflow-notebook
          command: ["start.sh"]
          args: ["jupyter","lab","--LabApp.token='mytoken'"]
          ports:
          - containerPort: 8888
          volumeMounts:
          - mountPath: /home/jovyan/work
            name: jupyter2
      volumes:
      - name: jupyter2
        persistentVolumeClaim:
          claimName: myclaim

jupyter notebookが無事立ち上がったら、ブラウザからアクセスして、実行したいノートブック(拡張子がipynbのファイル)をworkの下にアップロードしておきます。

次に、WEBAPIからこのノートブックを実行するスクリプト(python)を作成しました。基本的にはこのサイトをのソースを使わせていただいたのですが、ノートブックの実行が終わってもカーネルが立ち上がりっぱなしになるため何回も実行するとメモリ不足になったり、異常系の処理がなかったので、少し改造してみました。

from datetime import timedelta
import json
import requests
import websocket  # pip install websocket-client
import uuid
import sys

# JupyterサーバのURL
notebook_host = '192.168.0.50:30888'

# 実行するノートブックファイル
file_path = 'work/target.ipynb'

# リクエスト共通のヘッダ
headers = {
    # Jupyter起動時のトークンをセット
    'Authorization': 'token ' + 'mytoken'
}

# 終了時の出力文字列
finished_sentence = '___all_cell_executed___'

def task_exec_notebook():
    base_url = 'http://' + notebook_host
    # ノートブックファイルの取得
    url = base_url + '/api/contents/' + file_path
    response = requests.get(url, headers=headers)
    notebook = json.loads(response.text)

    # ノートブックファイルのコードのみを取得
    codes =  for c in notebook['content']['cells'] if c['cell_type'] == 'code']
    
    codes.append('print("' + finished_sentence + '", end="")')  # 改行しないようにendを空文字で指定

    # カーネルの起動
    url = base_url + '/api/kernels'

    response = requests.post(url, headers=headers)  # getでカーネルのリストを取得できます
    kernel = json.loads(response.text)
    # print(kernel)

    # WebSocketで接続
    url = 'ws://' + notebook_host +'/api/kernels/' + kernel['id'] + '/channels'

    socket = websocket.create_connection(url, header=headers)
    # print(socket.status)  # 101

    # コードを実行
    for code in codes:
        header = {
            'msg_type': 'execute_request',
            'msg_id': uuid.uuid1().hex,
            'session': uuid.uuid1().hex
        }
        
        message = json.dumps({
            'header': header,
            'parent_header': header,
            'metadata': {},
            'content': {
                'code': code,
                'silent': False
            }
        })
        
        # 送信
        socket.send(message)

    # 結果の保持
    outputs = []
    output = ''
    if_success = False
    while True:
        response = json.loads(socket.recv()) 
        # print(response)
        # エラー処理
        if 'msg_type' not in response:
            print("Unexpected response.")
            break
        msg_type = response['msg_type']
        if msg_type == 'error':
            print("Error.")
            break

        if msg_type == 'stream':
            output = response['content']['text']            
            if output == finished_sentence: # 終了判定
                if_success = True
                break
            else:
                outputs.append(output)
    # print(outputs)

    socket.close()
    # カーネル閉じる
    url = base_url + '/api/kernels/' + kernel['id']
    response = requests.delete(url, headers=headers) 
    if not if_success:
        sys.exit(1)

if __name__ == '__main__':
    task_exec_notebook()

ひとまずこれで動いているようです。k8sで動かしてみた感想としては、機械学習はメモリーをかなり消費しますので、無駄にカーネルを起動すると動作が不安定になるため、動作状況には気を配る必要はありそうです。