ESP-IDFを使ってみる

FreeRTOSの新機能 Ring Buffers


FreeRTOSのV10にはxMessageBufferとxStreamBufferの機能が有りますが、
ESP-IDFが使用しているFreeRTOSはまだV8なので、xMessageBufferとxStreamBufferの機能が有りませ ん。
(ESP-IDFのV4.3でFreeRTOSがV10になりました)

可変長のデータをタスク間で受け渡しする機能として、ESP-IDFではRing Buffersの機能が追加されています。
Ring Buffersについては、追加機能としてこ ちらに詳しく紹介されています。
最初の方だけGoogleで翻訳してみました。

アイテムのサイズが可変である状況では、リングバッファはFreeRTOSキューよりもメモリ効率の良い代替手段です。
リングバッファの容量は、格納できるアイテムの数ではなく、アイテムの格納に使用されるメモリの量で測定されます。
項目はコピーによってリングバッファに送信されますが、効率上の理由から、項目は参照によって取得されます。
結果として、それらがリングバッファから完全に削除されるためには、検索されたすべての項目も返されなければなりません。
リングバッファは、次の3つのタイプ

分割なし
バッファは、アイテムが連続したメモリに格納されることを保証し、いかなる状況下でもアイテムを分割しようとしません。
項目が連続したメモリを占有する必要がある場合は、分割なしバッファを使用します。
送信時にバッファの空きが十分にないときは、バッファの先頭にデータが格納される。


分割許可
それをすることがアイテムが格納されるのを許すならば、バッファはラップするときアイテムが分割されるのを許すでしょう。
許可分割バッファは、非分割バッファよりもメモリ効率がよくなりますが、取得時に2つの部分に分けて項目を返すことができます。
送信時にバッファの空きが十分にないときは、データを2つに分割して、バッファの末尾とバッファの先頭にデータが格納される。


バイトバッファ
データを個別の項目として保存しないでください。
すべてのデータは一連のバイト、および任意の数のバイトとして格納され、毎回送信または取得されます。
個別の項目を維持する必要がない場合はバイトバッファを使用してください



分割無しと分割許可の違いは、バッファの末尾に到達して、折り返し(Wrap)するときの動作が違います。
以下のコードでその違いを確認する事ができます。
なお、ドキュメントにはMutexによる排他制御のことが書いてなかったので、一応Mutexを使って排他制御をしていますが、
もしかしたら排他制御は不要かもしれません。
/* The example of ESP-IDF
 *
 * This sample code is in the public domain.
 */
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/ringbuf.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "esp_log.h"

#define No_Split
//#define Allow_Split

TaskHandle_t xTask1 = NULL, xTask2 = NULL;
SemaphoreHandle_t xSemaphore;
RingbufHandle_t xRingbuffer;

void task1(void *pvParameter)
{
    TickType_t nowTick;
    UBaseType_t res;
    char tx_item[64];
    size_t freeSize;
    UBaseType_t prio = uxTaskPriorityGet(NULL);
    nowTick = xTaskGetTickCount();
    printf("[%s:%d] Start Priority=%d\n",pcTaskGetTaskName(NULL),nowTick,prio);

    //Send an item
    xSemaphoreTake(xSemaphore, portMAX_DELAY);
    xRingbufferPrintInfo(xRingbuffer);
    memset(tx_item,'A',sizeof(tx_item));
    res =  xRingbufferSend(xRingbuffer, tx_item, 32, pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }
    nowTick = xTaskGetTickCount();
    freeSize = xRingbufferGetCurFreeSize(xRingbuffer);
    printf("[%s:%d] freeSize=%d\n",pcTaskGetTaskName(NULL),nowTick,freeSize);
    xSemaphoreGive(xSemaphore);
    vTaskDelay(1);

    xSemaphoreTake(xSemaphore, portMAX_DELAY);
    xRingbufferPrintInfo(xRingbuffer);
    memset(tx_item,'B',sizeof(tx_item));
    res =  xRingbufferSend(xRingbuffer, tx_item, 16, pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }
    nowTick = xTaskGetTickCount();
    freeSize = xRingbufferGetCurFreeSize(xRingbuffer);
    printf("[%s:%d] freeSize=%d\n",pcTaskGetTaskName(NULL),nowTick,freeSize);
    xSemaphoreGive(xSemaphore);
    vTaskDelay(1);

    xSemaphoreTake(xSemaphore, portMAX_DELAY);
    xRingbufferPrintInfo(xRingbuffer);
    memset(tx_item,'C',sizeof(tx_item));
    res =  xRingbufferSend(xRingbuffer, tx_item, 40, pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }
    nowTick = xTaskGetTickCount();
    freeSize = xRingbufferGetCurFreeSize(xRingbuffer);
    printf("[%s:%d] freeSize=%d\n",pcTaskGetTaskName(NULL),nowTick,freeSize);
    xSemaphoreGive(xSemaphore);
    vTaskDelay(1);

    xSemaphoreTake(xSemaphore, portMAX_DELAY);
    xRingbufferPrintInfo(xRingbuffer);
    memset(tx_item,'D',sizeof(tx_item));
    res =  xRingbufferSend(xRingbuffer, tx_item, 28, pdMS_TO_TICKS(1000));
    if (res != pdTRUE) {
        printf("Failed to send item\n");
    }
    nowTick = xTaskGetTickCount();
    freeSize = xRingbufferGetCurFreeSize(xRingbuffer);
    printf("[%s:%d] freeSize=%d\n",pcTaskGetTaskName(NULL),nowTick,freeSize);
    xSemaphoreGive(xSemaphore);
    vTaskDelay(1);

    while(1) {
      vTaskDelay(100);
    }
}


void task2(void *pvParameter)
{
    TickType_t nowTick;
    UBaseType_t prio = uxTaskPriorityGet(NULL);
    nowTick = xTaskGetTickCount();
    printf("[%s:%d] Start Priority=%d\n",pcTaskGetTaskName(NULL),nowTick,prio);


#ifdef No_Split
    size_t item_size;
    while (1) {
      xSemaphoreTake(xSemaphore, portMAX_DELAY);
      //Receive an item from no-split ring buffer
      char *item = (char *)xRingbufferReceive(xRingbuffer, &item_size, pdMS_TO_TICKS(1000));

      //Check received item
      if (item != NULL) {
        //Print item
        nowTick = xTaskGetTickCount();
        printf("[%s:%d] item_size=%d:",pcTaskGetTaskName(0),nowTick,item_size);
        for (int i = 0; i < item_size; i++) {
            printf("%c", item[i]);
        }
        printf("\n");
        //Return Item
        vRingbufferReturnItem(xRingbuffer, (void *)item);
#if 0
      } else {
        //Failed to receive item
        printf("Failed to receive item\n");
#endif
      }
      xSemaphoreGive(xSemaphore);
//      nowTick = xTaskGetTickCount();
//      printf("[%s:%d] Give\n",pcTaskGetTaskName(NULL),nowTick);
    }
#endif

#ifdef Allow_Split
    //Receive an item from allow-split ring buffer
    size_t item_size1, item_size2;
    char *item1, *item2;
    while (1) {
      xSemaphoreTake(xSemaphore, portMAX_DELAY);
      //Receive an item from allow-split ring buffer
      BaseType_t ret = xRingbufferReceiveSplit(xRingbuffer, (void **)&item1, (void **)&item2, &item_size1, &item_size2, pdMS_TO_TICKS(1000));

    //Check received item
      if (ret == pdTRUE && item1 != NULL) {
        nowTick = xTaskGetTickCount();
        printf("[%s:%d] item_size1=%d:",pcTaskGetTaskName(0),nowTick,item_size1);
        for (int i = 0; i < item_size1; i++) {
            printf("%c", item1[i]);
        }
        printf("\n");
        vRingbufferReturnItem(xRingbuffer, (void *)item1);
        //Check if item was split
        if (item2 != NULL) {
            printf("[%s:%d] item_size2=%d:",pcTaskGetTaskName(0),nowTick,item_size2);
            for (int i = 0; i < item_size2; i++) {
                printf("%c", item2[i]);
            }
            printf("\n");
            vRingbufferReturnItem(xRingbuffer, (void *)item2);
        }
#if 0
      } else {
        //Failed to receive item
        printf("Failed to receive item\n");
#endif
      }
      xSemaphoreGive(xSemaphore);
    }
#endif

}


void app_main()
{
    TickType_t nowTick;
    vTaskDelay(2000/portTICK_PERIOD_MS);
    UBaseType_t prio = uxTaskPriorityGet(NULL);
    nowTick = xTaskGetTickCount();
    printf("[%s:%d] Start Priority=%d\n",pcTaskGetTaskName(NULL),nowTick,prio);
    printf("[%s:%d] portTICK_PERIOD_MS=%d\n",pcTaskGetTaskName(NULL),nowTick,portTICK_PERIOD_MS);

    /* Create Mutex */
    xSemaphore = xSemaphoreCreateMutex();

    /* Create Ring Buffer */
#ifdef No_Split
    xRingbuffer = xRingbufferCreate(128, RINGBUF_TYPE_NOSPLIT);
#endif
#ifdef Allow_Split
    xRingbuffer = xRingbufferCreate(128, RINGBUF_TYPE_ALLOWSPLIT);
#endif

    /* Check everything was created. */
    configASSERT( xSemaphore );
    configASSERT( xRingbuffer );

    /* Create task */
    xTaskCreatePinnedToCore(task1, "Task1", 4096, NULL, 1, &xTask1, tskNO_AFFINITY);
    xTaskCreatePinnedToCore(task2, "Task2", 4096, NULL, 1, &xTask2, tskNO_AFFINITY);
}



最初に分割なし(No-Split)の時の動きです。
1回の書き込みで8バイトのヘッダー領域を消費します。
128バイトのバッファに対して、32バイト、16バイト、40バイトのデータを書き込むと、(8+32)+(8+16)+(8+40)=112 バイトのバッファを消費します。
バッファの残りが16バイトの状態で28バイトを書き込むと、末尾の16バイトは無効となり、先頭に回り込んで格納されます。
[main:200] Start Priority=1
[main:200] portTICK_PERIOD_MS=10
[Task2:200] Start Priority=1
[Task1:200] Start Priority=1
Rb size:128     free: 128       rptr: 0 freeptr: 0      wptr: 0
[Task1:200] freeSize=56
[Task2:200] item_size=32:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
Rb size:128     free: 128       rptr: 40        freeptr: 40     wptr: 40
[Task1:301] freeSize=56
[Task2:301] item_size=16:BBBBBBBBBBBBBBBB
Rb size:128     free: 128       rptr: 64        freeptr: 64     wptr: 64
[Task1:401] freeSize=56
[Task2:401] item_size=40:CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
Rb size:128     free: 128       rptr: 112       freeptr: 112    wptr: 112
[Task1:501] freeSize=56
[Task2:501] item_size=28:DDDDDDDDDDDDDDDDDDDDDDDDDDDD

xRingbufferGetCurFreeSize()の戻り値がおかしいので、Espressif Systemsに確認し、以下の回答をもらいました。

【Espressif Systemsからの回答】
xRingbufferGetCurFreeSize()は、アイテム/データが現在リングバッファに送信されている場合に取り得る最大サイズを 返します。
分割不可バッファに送信されるアイテムの最大サイズは、(pxRingbuffer-> xSize / 2) -  8に制限されています。
したがって、上記のログでは、128バイトの空の分割不可バッファは最大カレントサイズになります。
アイテムサイズは(128/2)-8 = 56です。



次に分割許可(Allow-Split)の時の動きです。
こちらも、1回の書き込みで8バイトのヘッダー領域を消費します。
128バイトのバッファに対して、32バイト、16バイト、40バイトのデータを書き込むと、(8+32)+(8+16)+(8+40)=112 バイトのバッファを消費します。
バッファの残りが16バイトの状態で28バイトを書き込むと、末尾の16バイトに一部が格納され、残りは先頭に回り込んで格納されます。
以下の例では[D]の文字が2回に分けて読まれています。
[main:200] Start Priority=1
[main:200] portTICK_PERIOD_MS=10
[Task2:200] Start Priority=1
[Task1:200] Start Priority=1
Rb size:128     free: 128       rptr: 0 freeptr: 0      wptr: 0
[Task1:200] freeSize=72
[Task2:200] item_size1=32:AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
Rb size:128     free: 128       rptr: 40        freeptr: 40     wptr: 40
[Task1:301] freeSize=88
[Task2:301] item_size1=16:BBBBBBBBBBBBBBBB
Rb size:128     free: 128       rptr: 64        freeptr: 64     wptr: 64
[Task1:401] freeSize=64
[Task2:401] item_size1=40:CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
Rb size:128     free: 128       rptr: 112       freeptr: 112    wptr: 112
[Task1:501] freeSize=76
[Task2:501] item_size1=8:DDDDDDDD
[Task2:501] item_size2=20:DDDDDDDDDDDDDDDDDDDD



RingBufferのサイズの上限は160KByteです。
こ ちらにその理由が書かれています。
ESP32-WROVERの様にPSRAM(疑似SRAM)を持つモデルでは、RingBufferを4MByteまで拡張することができます。
WROVER自体は8MByteのPSRAMを持っていますが、RingBufferとして使えるのは4Mまでです。
理由はやはりこ ちらに書かれています。

4MのRingBufferを使うためには、xRingbufferCreateStatic()を使って領域を確保する必要が有ります。
分割許可(Allow-Split)のRingBufferは使うことができません。
以下がそのコードです。
RingBufferのハンドルさえアサインできれば、RingBufferの使い方は変わりません。
/* The example of ESP-IDF
 *
 * This sample code is in the public domain.
 */
#include <string.h>
#include "freertos/FreeRTOS.h"
#include "freertos/ringbuf.h"
#include "freertos/task.h"
#include "freertos/semphr.h"
#include "esp_heap_caps.h"
#include "esp_log.h"

//#define DYNAMIC
//#define STATIC
#define SPIRAM

#define DYNAMIC_BUFFER_SIZE             256
#define STATIC_BUFFER_SIZE              256
#define SPIRAM_BUFFER_SIZE              4000000          //32-bit aligned size

RingbufHandle_t xRingbuffer;


void app_main()
{
        /* Create Ring Buffer */
#ifdef DYNAMIC
        xRingbuffer = xRingbufferCreate(DYNAMIC_BUFFER_SIZE, RINGBUF_TYPE_NOSPLIT);
#endif
#ifdef STATIC
        //Allocate memory for ring buffer data structure
        StaticRingbuffer_t *buffer_struct = (StaticRingbuffer_t *)malloc(sizeof(StaticRingbuffer_t));
        //Allocate memory for ring buffer's data storage area
        uint8_t *buffer_storage = (uint8_t *)malloc(sizeof(uint8_t)*STATIC_BUFFER_SIZE);
        //Set your buffer type
        RingbufferType_t buffer_type = RINGBUF_TYPE_NOSPLIT;
        xRingbuffer = xRingbufferCreateStatic(STATIC_BUFFER_SIZE, buffer_type, buffer_storage, buffer_struct);
#endif
#ifdef SPIRAM
        //Allocate ring buffer data structure and storage area into external RAM
        StaticRingbuffer_t *buffer_struct = (StaticRingbuffer_t *)heap_caps_malloc(sizeof(StaticRingbuffer_t), MALLOC_CAP_SPIRAM);
        //Allocate memory for ring buffer's data storage area
        uint8_t *buffer_storage = (uint8_t *)heap_caps_malloc(sizeof(uint8_t)*SPIRAM_BUFFER_SIZE, MALLOC_CAP_SPIRAM);
        //Set your buffer type
        RingbufferType_t buffer_type = RINGBUF_TYPE_NOSPLIT;
        xRingbuffer = xRingbufferCreateStatic(SPIRAM_BUFFER_SIZE, buffer_type, buffer_storage, buffer_struct);
#endif

        /* Check everything was created. */
        configASSERT( xRingbuffer );

}

コンパイルするためにはいくつかCONFIG変数を設定する必要が有ります。
まず以下の画面で以下の項目を有効にします。


次に以下の画面でSPI RAMを有効にします。


最後にこの画面でSPI RAMの詳細を設定します。


いちいちメニューで設定するのは面倒なので、以下の内容でsdkconfig.defaultsを作っておけば、make defconfigで設定が反映されます。
#
# ESP32-specific
#
CONFIG_ESP32_SPIRAM_SUPPORT=y
#
# SPI RAM config
#
CONFIG_SPIRAM_BOOT_INIT=y
#CONFIG_SPIRAM_IGNORE_NOTFOUND=n
#CONFIG_SPIRAM_USE_MEMMAP=n
CONFIG_SPIRAM_USE_CAPS_ALLOC=y
#CONFIG_SPIRAM_USE_MALLOC=n
CONFIG_SPIRAM_TYPE_AUTO=y
#CONFIG_SPIRAM_TYPE_ESPPSRAM32=n
#CONFIG_SPIRAM_TYPE_ESPPSRAM64=n
CONFIG_SPIRAM_SIZE=-1
CONFIG_SPIRAM_SPEED_40M=n
#CONFIG_SPIRAM_SPEED_80M=y
CONFIG_SPIRAM_MEMTEST=y
CONFIG_SPIRAM_CACHE_WORKAROUND=y
CONFIG_SPIRAM_BANKSWITCH_ENABLE=y
CONFIG_SPIRAM_BANKSWITCH_RESERVE=8
#CONFIG_SPIRAM_ALLOW_BSS_SEG_EXTERNAL_MEMORY=n
#
# freeRTOS config
#
CONFIG_FREERTOS_SUPPORT_STATIC_ALLOCATION=y

ESP32-WROVERに書き込むと、ちゃんと4MのRingBufferを認識します。
但し、この設定のままPSRAM無しのESP32に書き込むとコアダンプします。
つまり、WROVERのときと、そうでないときで、sdkconfigを切り替える必要が有ります。




ESP32-WROVERの様にPSRAM(疑似SRAM)を持つモデルでは、PSRAMを使ってRingBufferを4MByteまで拡張す ることができますが、
PSRAM上に確保したRingBufferは書き込みのスピードがかなり遅いことが分かりました。

100KのRingBufferを確保して、64バイトのデータを100,000回書き込むのに要する時間を計測してみました。
RAM上にRingBufferを確保した場合、990mSecで終わりますが、PSRAM上にRingBufferを確保した場合、 2240mSec掛かりました。

画像データや音声データなどをRingBuffer経由でやり取りする場合は、パフォーマンスにかなり影響します。



さらにPSRAMを使う場合、一部のGPIOが使えない副作用が有ります。
なぜかPSRAMと一緒にGPIO16を使うとHALTするので調べてみたら、こ ちらに以下のような記載が有りました。

To connect the ESP-PSRAM32 chip to ESP32D0W*, connect the following signals:
PSRAM /CE (pin 1) > ESP32 GPIO 16
PSRAM SO (pin 2) > flash DO
PSRAM SIO[2] (pin 3) > flash WP
PSRAM SI (pin 5) > flash DI
PSRAM SCLK (pin 6) > ESP32 GPIO 17
PSRAM SIO[3] (pin 7) > flash HOLD
PSRAM Vcc (pin 8) > ESP32 VCC_SDIO

GPIO16とGPIO17はPSRAMの制御に使われます。
分かるまでメチャクチャはまりました。
PSRAMを使わないときは、GPIO16もGPIO17も普通に使えます。



こ ちらには以下の様に書かれています。

Ring buffers are a more memory efficient alternative to FreeRTOS queues in situations where the size of items is variable.

アイテムのサイズが可変である状況では、リングバッファはFreeRTOSキューよりもメモリ効率の良い代替手段です。

それでは固定長アイテムの場合、QueueとRingBufferはどちらが高速なのか調べてみました。

最初にQueueのパフォーマンスです。
uxQueueLength=100、uxItemSize=100(100バイトのデータを100件まで格納可能)のQueueを作成し、
100バイト固定長のデータを100,000回書き込んでみま した。
I (305) QueueRecv: Start
I (305) QueueSend: Start
I (305) MAIN: total_size(MALLOC_CAP_8BIT):321384
I (325) MAIN: total_size(MALLOC_CAP_32BIT):413484
I (325) MAIN: free_size(MALLOC_CAP_8BIT):190868
I (335) MAIN: free_size(MALLOC_CAP_32BIT):282932
I (705) QueueSend: elapsed time[ms]:390
I (705) QueueRecv: elapsed time[ms]:390 counter=100000

次にRingBufferのパフォーマンスです。
100000バイトの分割無しバッファを作成し、100バイト固定長のデータを100,000回書き込んでみま した。
I (307) RingRecv: Start
I (307) RingSend: Start
I (307) MAIN: total_size(MALLOC_CAP_8BIT):321384
I (327) MAIN: total_size(MALLOC_CAP_32BIT):410744
I (327) MAIN: free_size(MALLOC_CAP_8BIT):190700
I (337) MAIN: free_size(MALLOC_CAP_32BIT):280024
I (1767) RingSend: elapsed time[ms]:1440
I (1767) RingRecv: elapsed time[ms]:1440 counter=100000

メモリ消費量はRingBufferの方が少しだけ小さいですが、RingBufferの方が3倍以上遅い結果となりました。
固定長のデータをやり取りする場合は、Queueを使った方がいいです。

続く....