Task switching: instant generator task with some dynamic consumer tasks

There is a generator task, task_gen, that fills a shared buffer buf with data. The buffer is filled character by character, so at any given moment buf may be in an incomplete state.

For simplicity, buf holds 9 characters (plus a trailing \0) and is filled with 9 equal letters. The buffer is in a complete state when all characters (except the trailing \0) are the same. All other states are incomplete:

AAA AAA AAA \0    // Complete state
BAA AAA AAA \0    // Incomplete state
BBA AAA AAA \0    // Incomplete state
BBB AAA AAA \0    // Incomplete state
...
BBB BBB BBA \0    // Incomplete state
BBB BBB BBB \0    // Complete state
...
ZZZ ZZZ ZZZ \0    // Complete state
AZZ ZZZ ZZZ \0    // Incomplete state
...
AAA AAA AAZ \0    // Incomplete state
(loop)
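To make the definition of a complete state concrete, here is a minimal sketch of a check for it. The helper name buf_is_complete is hypothetical and is not part of the code in this post. It only illustrates the definition: calling it from task_rcv while task_gen is still writing would be a race, so it does not replace proper synchronization.

```cpp
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

// Hypothetical helper: returns true when the first len bytes of data are
// all equal, which is the "complete state" described above.
// len must not count the trailing '\0'.
bool buf_is_complete(const uint8_t *data, size_t len)
{
  assert(data != nullptr);
  for (size_t i = 1; i < len; i++)
  {
    if (data[i] != data[0]) return false;  // mixed letters: incomplete
  }
  return true;
}
```

With the buffer from this post it would be called as buf_is_complete(buf, sizeof(buf) - 1).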
Code of `task_gen`
uint8_t buf[10]; // shared buffer

/// @brief Periodically fills global "buf" with equal characters, one by one.
/// "buf" is considered to be correct when all characters are equal
/// @param pvParams must be NULL
void task_gen(void *pvParams)
{
  uint8_t start_val = 'A',  end_val = 'Z', val = start_val;
  for(;;)
  {
    vTaskDelay(pdMS_TO_TICKS(esp_random() % 1000));
    Serial.printf("%s Generating new line\n", pcTaskGetName(NULL));
    for(int i = 0; i < (sizeof(buf) - 1); i++)
    {
      buf[i] = val;
      vTaskDelay(pdMS_TO_TICKS(esp_random() % 50));
    }
    // !!! THIS PLACE is where "buffer complete".
    Serial.printf("Line generated:  %s\n", buf);
    
    val++;
    if (val > end_val) val = start_val;
  }
}

There are two identical receiver tasks, task_rcv1 and task_rcv2, both implemented by the function task_rcv. task_rcv sleeps for some time; when it wakes, it must read an exact number of consecutive buffers, each in a complete state, and then loop.

Code for `task_rcv`
SemaphoreHandle_t gen_rcv_mtx;

/// @brief periodically receives few _consecutive_ values from "buf"
/// @param pvParams pointer to int with number of consecutive correct buffers must be read.
void task_rcv(void *pvParams)
{
  int count = *((int*)pvParams);
  Serial.printf("task with count %d started\n", count);
  for(;;)
  {
    vTaskDelay(pdMS_TO_TICKS(esp_random() % 10 * 500));

    if (xSemaphoreTake(gen_rcv_mtx, portMAX_DELAY) == pdTRUE)
    {
      for (int i = 1; i <= count; i++)
      {
        // !!! Here wait for buffer complete state and suspend task_gen
        Serial.printf("%s  %d/%d : %s\n", pcTaskGetName(NULL), count, i, buf);
        vTaskDelay(pdMS_TO_TICKS(esp_random() % 100));
        // !!! Here wake task_gen
      }
      xSemaphoreGive(gen_rcv_mtx);
    }
  }
}


The question is: how should I implement this logic, where task_rcv waits until buf is complete and suspends task_gen while it processes buf?


Current output:
Line generated:  JJJJJJJJJ
task_gen Generating new line
Line generated:  KKKKKKKKK
task_gen Generating new line
task_rcv1  5/1 : LLLKKKKKK
task_rcv1  5/2 : LLLLKKKKK
task_rcv1  5/3 : LLLLLKKKK
task_rcv1  5/4 : LLLLLKKKK
task_rcv1  5/5 : LLLLLLLLK
Line generated:  LLLLLLLLL
task_gen Generating new line
task_rcv2  3/1 : LLLLLLLLL
task_rcv2  3/2 : MMLLLLLLL
task_rcv2  3/3 : MMMMLLLLL
Line generated:  MMMMMMMMM
Wanted output:
Line generated:  KKKKKKKKK
task_rcv1  5/1 : KKKKKKKKK
task_gen Generating new line
Line generated:  LLLLLLLLL
task_rcv1  5/2 : LLLLLLLLL
task_gen Generating new line
Line generated:  MMMMMMMMM
task_rcv1  5/3 : MMMMMMMMM
task_gen Generating new line
Line generated:  NNNNNNNNN
task_rcv1  5/4 : NNNNNNNNN
task_gen Generating new line
Line generated:  OOOOOOOOO
task_rcv1  5/5 : OOOOOOOOO
task_gen Generating new line
Line generated:  PPPPPPPPP
task_gen Generating new line
Line generated:  QQQQQQQQQ
task_rcv2  3/1 : QQQQQQQQQ
task_gen Generating new line
Line generated:  RRRRRRRRR
task_rcv2  3/2 : RRRRRRRRR
task_gen Generating new line
Line generated:  SSSSSSSSS
task_rcv2  3/3 : SSSSSSSSS
task_gen Generating new line
Line generated:  TTTTTTTTT
task_gen Generating new line
Line generated:  UUUUUUUUU


Rest of the code:
TaskHandle_t task_gen_handler, task_rcv1, task_rcv2;
int count_task1 = 5;
int count_task2 = 3;  

void setup() {
  Serial.begin(115200);
  gen_rcv_mtx = xSemaphoreCreateMutex();
  // buf has 10 bytes: 9 letters plus the terminator in the last byte.
  memset(buf, 'Z', sizeof(buf) - 1);
  buf[sizeof(buf) - 1] = '\0';

  xTaskCreate(task_gen, "task_gen",  4096, NULL, tskIDLE_PRIORITY + 1, &task_gen_handler);
  xTaskCreate(task_rcv, "task_rcv1", 4096, &count_task1, tskIDLE_PRIORITY + 1, &task_rcv1);
  xTaskCreate(task_rcv, "task_rcv2", 4096, &count_task2, tskIDLE_PRIORITY + 1, &task_rcv2);

  Serial.println("Started");
  vTaskDelay(pdMS_TO_TICKS(1000));
}

void loop() {
  vTaskDelete(NULL);
}

I would have the “generator” task internally buffer the data until a “complete buffer” is generated, and then send that as a single composite whole.

If the buffers are fixed size, and somewhat small, I would just send them through a queue whose item size is the size of one complete buffer.

If the buffers are highly variable in size, I might use a message_buffer.

If the buffers are large, I might manage the buffer storage myself, and send pointers to the buffers via a queue.

That way the receivers only get “complete” buffers when they wake up. Otherwise, since you have multiple receivers, you are going to need some sort of mutex-like interlock on the reading. Then a task could get the right to read, read from the queue until it gets a “complete” buffer, and then give up the right to read so the other task could get in.
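The queue idea can be modelled on a desktop machine roughly as follows. This is only a sketch of the pattern, not FreeRTOS code: Mailbox, Line and run_queue_demo are hypothetical names, and std::thread with a mutex and a condition variable stands in for the tasks and for a queue of length 1 whose item size is one whole buffer. On the ESP32 the send and receive steps would presumably map to xQueueSend and xQueueReceive with portMAX_DELAY.

```cpp
#include <array>
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using Line = std::array<char, 10>;  // one complete buffer, copied by value

// One-slot mailbox standing in for a queue of length 1.
// send() blocks while the slot is full, receive() blocks while it is empty.
class Mailbox
{
public:
  void send(const Line &item)
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return !full_; });
    slot_ = item;  // copy the whole line, as a by-value queue would
    full_ = true;
    cv_.notify_all();
  }
  Line receive()
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return full_; });
    full_ = false;
    cv_.notify_all();
    return slot_;
  }
private:
  std::mutex m_;
  std::condition_variable cv_;
  Line slot_{};
  bool full_ = false;
};

// Runs one generator and one receiver; returns every line the receiver got.
std::vector<std::string> run_queue_demo(int lines)
{
  assert(lines >= 0);
  Mailbox box;
  std::vector<std::string> seen;

  std::thread generator([&] {
    char val = 'A';
    for (int n = 0; n < lines; n++)
    {
      Line local{};  // private and zero-filled: receivers never see it
      for (int i = 0; i < 9; i++) local[i] = val;
      box.send(local);  // published only once it is complete
      val = (val == 'Z') ? 'A' : static_cast<char>(val + 1);
    }
  });

  std::thread receiver([&] {
    for (int n = 0; n < lines; n++)
    {
      Line got = box.receive();
      seen.emplace_back(got.data());
    }
  });

  generator.join();
  receiver.join();
  return seen;
}
```

Note that in this model the generator blocks in send() while the previous line has not been taken. If the generator should keep running and drop lines nobody reads, the sending side would need a different policy.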

If, as it seems from your code, the generator won’t try to do more with the buffer until after the reader is done, it could just signal a semaphore that the buffer is full, and the receiver waits on the semaphore. At that point, there really isn’t much need for two receivers, as one can handle it (unless there is a longer action afterwards that doesn’t need the buffer).


So I modified this by adding a 1-element queue that holds a pointer to buf:
task_gen uses xSemaphoreGetMutexHolder on the gen_rcv_mtx mutex to check whether some task is waiting for a message in the queue.
task_gen blocks until the queue is empty; task_rcv peeks at the queue, processes the data, and resets the queue (it has 1 element, so no data is lost); then task_gen sends the next pointer to the queue.

Could you please tell me whether this looks OK?

Code of `task_gen`
void task_gen(void *pvParams)
{
  uint8_t start_val = 'A',  end_val = 'Z', val = start_val;
  for(;;)
  {
    vTaskDelay(pdMS_TO_TICKS(esp_random() % 1000));
    for(int i = 0; i < (sizeof(buf) - 1); i++)
    {
      buf[i] = val;
      vTaskDelay(pdMS_TO_TICKS(esp_random() % 50));
    }
    // THIS PLACE is where "buf" is correct.
    Serial.printf("Line generated:  %s\n", buf);

    // ADDED BEGIN
    if (xSemaphoreGetMutexHolder(gen_rcv_mtx) != NULL)
    {
      void *buf_p = &buf;
      xQueueSend(gen_output_queue, &buf_p, portMAX_DELAY);
    }
    // ADDED END

    val++;
    if (val > end_val) val = start_val;
  }
}
Code of `task_rcv`
void task_rcv(void *pvParams)
{
  void *buf_rcv = NULL;
  int count = *((int*)pvParams);
  Serial.printf("task with count %d started\n", count);
  for(;;)
  {

    vTaskDelay(pdMS_TO_TICKS(esp_random() % 10 * 500));
    if (xSemaphoreTake(gen_rcv_mtx, portMAX_DELAY) == pdTRUE)
    {
      for (int i = 1; i <= count; i++)
      {
        // REPLACED LOOP BEGIN
        xQueuePeek(gen_output_queue, &(buf_rcv), portMAX_DELAY);
        Serial.printf("%s  %d/%d : %s\n", pcTaskGetName(NULL), count, i, buf_rcv);
        vTaskDelay(pdMS_TO_TICKS(esp_random() % 100));
        xQueueReset(gen_output_queue);
        // REPLACED LOOP END
      }
      xSemaphoreGive(gen_rcv_mtx);
    }
  }
}

I’m afraid of a situation where a task switch happens between task_rcv::xQueueReset and task_rcv::xSemaphoreGive: task_gen::xQueueSend would then post two items, and task_gen would stay blocked in xQueueSend until some task clears the queue. How can I avoid this?

You seem to be inventing complexity (perhaps because you haven’t actually sat down to define the exact problem you are trying to solve) and ignoring some conditions that can occur.

First, since you have only a single buffer, and the writer can’t start the next write until the reader is done, you need something to block the writer until the buffer is ready. The simple solution is to have a semaphore (that starts ‘ready’) that the writer takes before generating the next block of data, and the readers give again when they are done.

If you want the writer to be able to start building its buffer before the reader is done, then you need separate buffers, either by copying the buffer into a queue, or having multiple buffers that the writer uses and some way for it to know which to use.

The writer can then just give a semaphore to tell the readers there is new data available, and one of them will take it, process it, then give the other semaphore to let the writer continue.
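The two-semaphore handshake can be modelled on a desktop machine roughly like this. It is a sketch under stated assumptions, not FreeRTOS code: BinarySem, buf_free, buf_full and run_handshake are hypothetical names, and std::thread stands in for the tasks. In FreeRTOS, acquire() and release() would presumably correspond to xSemaphoreTake(sem, portMAX_DELAY) and xSemaphoreGive(sem) on two binary semaphores.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Minimal binary semaphore standing in for a FreeRTOS binary semaphore.
class BinarySem
{
public:
  explicit BinarySem(bool available) : available_(available) {}
  void acquire()  // blocks until the semaphore is available, then takes it
  {
    std::unique_lock<std::mutex> lock(m_);
    cv_.wait(lock, [this] { return available_; });
    available_ = false;
  }
  void release()  // makes the semaphore available and wakes one waiter
  {
    {
      std::lock_guard<std::mutex> lock(m_);
      available_ = true;
    }
    cv_.notify_one();
  }
private:
  std::mutex m_;
  std::condition_variable cv_;
  bool available_;
};

// Runs one writer and one reader over a single shared buffer and returns
// every line the reader saw.
std::vector<std::string> run_handshake(int lines)
{
  assert(lines >= 0);
  char buf[10] = "ZZZZZZZZZ";
  BinarySem buf_free(true);   // starts 'ready': the writer may fill buf
  BinarySem buf_full(false);  // given by the writer once buf is complete
  std::vector<std::string> seen;

  std::thread writer([&] {
    char val = 'A';
    for (int n = 0; n < lines; n++)
    {
      buf_free.acquire();  // wait until the reader is done with buf
      for (int i = 0; i < 9; i++)
      {
        buf[i] = val;  // buf is incomplete inside this loop
        std::this_thread::yield();
      }
      buf_full.release();  // buf is complete: wake a reader
      val = (val == 'Z') ? 'A' : static_cast<char>(val + 1);
    }
  });

  std::thread reader([&] {
    for (int n = 0; n < lines; n++)
    {
      buf_full.acquire();      // blocks until buf is complete
      seen.emplace_back(buf);  // safe: the writer is waiting on buf_free
      buf_free.release();      // let the writer build the next line
    }
  });

  writer.join();
  reader.join();
  return seen;
}
```

With several readers, each would wait on buf_full in the same way, and whichever takes it processes that line. The writer never touches buf between giving buf_full and taking buf_free, so a reader can never observe an incomplete state.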

Not sure what your gen_rcv_mtx is for, and why the writer cares about it. It seems to be there just to make a reader process N messages in a row, which shouldn’t concern the writer.

The writer testing it just says that it might throw away data. If the idea was to allow it to discard data, then perhaps what you need to do is have the writer try to take the mutex, and if it succeeds, reset the queue and give the mutex back, and if it fails post the next data.