[mbedbot] 無線LANでの音声送信の検討(続)

July 17, 2017

引き続き、ESP-WROOM-02による音声送信の話題です。

一旦フラッシュに取り込んだ音声データを保存しておいて、後から取りに行く方法だとどうしても遅延が生じたり、ストリーミングができない、という欠点があるので、別の方法で送信できないか検討してみました。

検討してみた方法は、ストリーミングをサーバに送り込む方法として、MQTTというIoT向けのプロトコルを使ってみる、というものです。インターフェイスの2017/7月号で概要が載っていたのでさらっと読んでみたのですが、基本はブローカーが一手にクライアントからのサブスクライブやパブリッシュの要求に応じてデータのやり取りをする、といったもののようです。基本、TCPコネクションをはり続けるので接続時の3ウェイハンドシェイクによるオーバヘッドがなく、HTTPのような巨大なヘッダもないので効率が良い、とのことです。

ESP8266に対応するArduino Clientのライブラリがこのサイトにありましたので利用させてもらいました(zipをダウンロードしてArduinoでスケッチー>ライブラリをインクルードー>.zipをインストール)。ブローカーはMACにインストールしたnoderedにこちらで紹介されていたMQTTのブローカーノード(mosca)を追加して使うことにしました。noderedは簡単に拡張できるので便利ですね。ちなみに、publishするだけなら標準で付いてくるノードが使えるようです。

MQTTでサンプリングレート8,000Hz,量子化ビット数を8bitで音声を取り込みながらpublishする、ということをやってみたところ、とりあえず連続で長時間送れました。ただし、処理が間に合ってないようで、サンプリングの取りこぼしが結構な頻度で起こってしまうようです。

以下が音声を送るソースです(動作無保証です)。こちらのサイトを参考にさせていただきました。cmdというトピック名でstartという文字列をpublishすると音声配信スタート、stopで停止します。pcmというトピック名でsubscribeすれば音声を取り込めるはずです。

#include <ESP8266WiFi.h>
extern "C" {
#include "user_interface.h"                 // ESP8266用の拡張IFライブラリ
}
#include <PubSubClient.h>
#include <Ticker.h>

#define LED 13              // IO 13(5番ピン) にLEDを接続する
#define BUFBIT 8
#define BUFSIZE (1<<BUFBIT)
#define BUFMASK (BUFSIZE-1)
#define SAMPLING_FREQ (8000)

enum RecState
{
  STS_IDLE, // 待機中
  STS_PREPARING_CONNECTING, // サーバ接続準備中
  STS_CONNECTING, // サーバ接続待ち
  STS_PREPARING_RECORDING, // 録音準備中
  STS_RECORDING, // 録音中
  STS_WAITING_FLASH, // 録音中(送信完了待ち)
  STS_STOPPING_RECORDING, // 録音終了中
};

// Update these with values suitable for your network.
 
const char* ssid = "ssid";
const char* password = "passwd";
const char* mqtt_server = "ipaddress";
volatile int state = STS_IDLE;

WiFiClient espClient;
PubSubClient client(espClient);
 
char mTopic[]="pcm";

Ticker bufferWriter;
int8_t buffer[2][BUFSIZE];
volatile unsigned long bofst = 0;
volatile bool f_flash = false;
volatile int flash_block = 0;
volatile unsigned long dropped = 0;

///////////

void flush_buffer() {
  if (!f_flash) return;
  bool res = client.publish(mTopic, (uint8_t*)buffer[flash_block], BUFSIZE);
  if(!res) {
    Serial.println("Publish Failed!¥n");
    delay(100);
  }
  f_flash = false;
}

ICACHE_RAM_ATTR void get_audio() {
  if (state!=STS_RECORDING) { // check if flash is not done yet
    dropped++;
    return;
  }
  buffer[(bofst >> BUFBIT) & 1][bofst & BUFMASK] = (int8_t)((system_adc_read() - 512) / 4);
  bofst++;
  if ((bofst & BUFMASK) == 0) { // overflow
    if(bofst >= BUFSIZE * 2) {
      bofst = 0;
    }
    if(!f_flash) {
      flash_block = 1 - ((bofst >> BUFBIT) & 1);
      f_flash = true;
    }
    else {
      state = STS_WAITING_FLASH;
    }
  }
}

void start_sampling(unsigned int frequency) {
  if(state != STS_PREPARING_RECORDING) return;
  Serial.println("Audio sampling started.");
  digitalWrite(LED,HIGH); // ポートをHレベル(3.3V)に設定(点灯)
  f_flash = false;
  //bufferWriter.attach(0.02, flush_buffer);
  
  bofst = 0;
  timer1_disable();
  timer1_isr_init();
  timer1_attachInterrupt(get_audio);
  timer1_enable(TIM_DIV1, TIM_EDGE, TIM_LOOP);
  timer1_write((clockCyclesPerMicrosecond() * 1000000) / frequency);
  state = STS_RECORDING;
}

void stop_sampling() {
  char cbuf[32];
  if(state != STS_STOPPING_RECORDING) return;
  Serial.println("Audio sampling stoped.");
  sprintf(cbuf, "sample dropped %d times", dropped);
  Serial.println(cbuf);
  timer1_disable();
  timer1_detachInterrupt();
  //bufferWriter.detach();
  if(client.connected()) {
    flush_buffer(); // フラッシュ待ちデータを送信
    client.publish(mTopic, (uint8_t*)buffer[(bofst >> BUFBIT) & 1], bofst & BUFMASK); // 最後のデータ
  }
  digitalWrite(LED,LOW);  // ポートをLレベル(0V)に設定(消灯)
  state = STS_IDLE;
}

///////////

 
void setup() {
  pinMode(LED, OUTPUT);     // Initialize the BUILTIN_LED pin as an output
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, 1883);
  client.setCallback(callback);
  bofst = 0;
  f_flash = false;
  state = STS_PREPARING_CONNECTING;
}
 
void setup_wifi() {
 
  delay(10);
  // We start by connecting to a WiFi network
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);
 
  WiFi.begin(ssid, password);
 
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
 
  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}
 
void callback(char* topic, byte* payload, unsigned int length) {
  Serial.print("Message arrived [");
  Serial.print(topic);
  Serial.print("] ");
  Serial.print((char *)payload);
  if(strncmp((char *)payload, "start", length) == 0 && state == STS_IDLE){
    state = STS_PREPARING_RECORDING;
  }
  else if(strncmp((char *)payload, "stop", length) == 0 && (state == STS_WAITING_FLASH || state == STS_RECORDING)) {
    state = STS_STOPPING_RECORDING;
  }
}
 
void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Attempt to connect
    if (client.connect("ESP8266Client")) {
      Serial.println("connected");
      // Once connected, publish an announcement...
      //client.publish("outTopic", "hello world");
      // ... and resubscribe
      client.subscribe("cmd");
      state = STS_IDLE;
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}
 
void loop() {
  switch(state) {
    case STS_PREPARING_CONNECTING:
      state = STS_CONNECTING;
      reconnect();
      break;
    case STS_CONNECTING:
      break;
    case STS_PREPARING_RECORDING:
      if(!client.connected()) {
        state=STS_PREPARING_CONNECTING;
      }
      else {
        start_sampling(SAMPLING_FREQ);
      }
      break;
    case STS_RECORDING:
      if(!client.connected()) {
        state=STS_STOPPING_RECORDING;
      }
      else {
        flush_buffer();
        client.loop();
      }
      break;
    case STS_IDLE:
      if(!client.connected()) {
        state=STS_PREPARING_CONNECTING;
      }
      else {
        client.loop();
      }
      break;
    case STS_WAITING_FLASH:
      if(!client.connected()) {
        state=STS_STOPPING_RECORDING;
      }
      else {
        if(!f_flash){
          flash_block = 1 - ((bofst >> BUFBIT) & 1);
          f_flash = true;
          state = STS_RECORDING;  
        }
        client.loop();
      }
      break;
    case STS_STOPPING_RECORDING:
      stop_sampling();
      break;
  }
  yield();
}

※MQTTクライアントライブラリのPubSubClient.hのMQTT_MAX_PACKET_SIZEを512(1回のpublishで送るデータ+ヘッダを上回る数値を指定)、MQTT_MAX_TRANSFER_SIZE(1回のwriteで送るデータの上限)を80に調整してます(この対応をしないと送信されなかったりリセットがかかってしまうため)。