Sending variable sized buffers between tasks

Evening,

I’m considering a couple of approaches to addressing my use case, and have read as many forum posts as possible, as well as the relevant sections in the FreeRTOS books and API guides. Before posting some example code, I’ll outline the use case below:

  • I have a class, let’s call it classA, which has an interrupt handler and a data processing task. When an interrupt occurs, data is packaged (very quickly) and sent to the data processing task by way of a queue. The size of the packaged interrupt data is known and constant, so the use of a queue here is relatively straightforward.

  • The data processing task produces an additional package of data, which is also known and constant, which it sends to a second class, let’s call it classB, also by way of a queue. Up to this point, everything works exactly as expected.

  • The intent is for classB to eventually send the received data to my TCP class for transmission. I’ve got this process working perfectly when I read and send each queue item as it arrives, but this isn’t particularly efficient: each queue item (30 bytes) is far smaller than the associated TCP overhead, so sending each one as a standalone payload is hard to justify.

  • Rather than classB reading the queue immediately and sending each queue item via TCP individually, I’d like to instead introduce a 10ms delay, and read the accumulated queue into a cache/buffer. From there, I can send the cache/buffer via TCP as a more substantial payload.

  • As mentioned above, I’ve read through the theory and options on variable sized queues. I’ve also looked at stream buffers and message buffers, but as I intend to use this approach for additional classes (i.e. many writers, single reader), stream/message buffers aren’t well suited.

  • I’ve created a struct, which will hold a few small pieces of metadata, as well as a pointer to the actual data to be transmitted via TCP. This allows for a variable payload to be ‘sent’ (i.e. via pointer) by way of a queue.

My general approach is as follows:

myIncludeFile.h (defining the queue struct):

typedef enum
{
    option1 = 1,
    option2,
    option3
} optionEnum_t;

typedef struct
{
    optionEnum_t optionEnum;
    uint16_t size;
    void *structData;
} queuePackage_t;

main.c (creating the queue):

myQueue = xQueueCreate(10, sizeof(queuePackage_t));

classB.c (reading from the classA queue and packaging myQueue):

static void sendData(void *pvParameters)
{
    queuePackage_t queuePackage;

    (void) pvParameters;

    for (;;) {

        vTaskDelay(pdMS_TO_TICKS(10));

        //Create a dynamic buffer, sized according to the number
        //of items in the classA queue multiplied by the known, fixed size of each item
        unsigned char dataBuffer[uxQueueMessagesWaiting(classAQueue) * 30];

        //Receive the classA queue data into dataBuffer
        BaseType_t status = xQueueReceive(classAQueue, dataBuffer, portMAX_DELAY);

        if (status == pdPASS) {

            //Package the data
            queuePackage.optionEnum = option1;
            queuePackage.size = sizeof(dataBuffer);
            queuePackage.structData = (void *) &dataBuffer;

            //Send the package to the TCP class for transmission
            BaseType_t sendStatus = xQueueSend(myQueue, &queuePackage, portMAX_DELAY);

        }
    }
}

The above approach shows some promise: the data I receive on the remote machine via TCP contains actual/valid data, but it also contains a lot of garbage. I’m really not sure about the dynamic dataBuffer approach, or how deallocation of the dynamic buffer is taking place. I suspect the garbage I’m seeing is due either to incorrectly sized queue data or to memory mismanagement.

Any guidance or input would be greatly appreciated! I’m using FreeRTOS 10.3.1 with heap_5 memory management.

Morning :wink:

Reading through your explanation I would imagine that stream buffers might be of interest to you. You can set trigger levels to unblock when you have sufficient data in the buffer to warrant sending the buffered data into your TCP stack.

As for the garbage you’re receiving, that is most likely because you’re only receiving and sending one message: you’re only filling the first “slot” of your bigger buffer before sending.

If you really want to dynamically allocate buffer space for the maximum number of messages you receive from classAQueue, you will need to call pvPortMalloc().

But using queues will always result in one element being received/sent at a time.
Use either stream buffers or message buffers for this use case.

Morning Jean, thanks for your input.

I agree, the stream buffer approach does appear to make sense on paper, but where I’d like to get to eventually, is having a single queue which has data sent into it from a number of different tasks. The documentation regarding stream (and message) buffers makes it pretty clear that they are designed around the ‘one writer, one reader’ premise.

If I can make use of mutexes or critical sections, etc. to work around this I’d be more than open to it, I’ve just come across similar questions on the forum and the common feedback appears to be “pass a struct to the queue with a pointer to the data”.

The restriction on stream/message buffers is that only one thing can be waiting on each end of them AT A TIME, so if you mutex-protect that access, it mostly works. Just mostly, because task priority determines who gets the mutex, not the message: if a lower-priority task gets there first and takes the mutex, and a higher-priority task then comes to wait, the higher-priority task will be blocked on the mutex and the lower-priority task gets the message. That’s not normally an issue.

The problem with your code above is that the buffer doesn’t survive long enough: it lives on the stack of the sending task, so it gets reused iteration after iteration. Allocate that buffer with pvPortMalloc() (and vPortFree() it when done) and you shouldn’t have that problem.

Thanks for that Richard, that all makes sense, particularly the buffer lifetime issue. I’m not entirely sure how to go about implementing the pvPortMalloc() approach though. Do I pvPortMalloc() dataBuffer on each loop in classB, and then vPortFree() from within classA? Or do I pvPortMalloc() dataBuffer at the start of the sendData() function in classA and let the allocation persist throughout the loop iterations?

I’ve used the second approach elsewhere in my application, but that was with a fixed/known allocation size. As the amount of data received into dataBuffer will vary on each loop, I need to be able to size the allocation dynamically.

Amended code to reflect my understanding of the second approach:

static void sendData(void *pvParameters)
{
    queuePackage_t queuePackage;

    unsigned char *dataBuffer;
    dataBuffer = (unsigned char *) pvPortMalloc(SOME_GENERIC_SIZE??);

    (void) pvParameters;

    for (;;) {

        vTaskDelay(pdMS_TO_TICKS(10));
        //Capture the actual size of data being received
        uint16_t dataSize = uxQueueMessagesWaiting(classAQueue) * 30;

        //Resize the memory allocation to suit the actual dataSize??

        //Receive the classA queue data into dataBuffer
        BaseType_t status = xQueueReceive(classAQueue, dataBuffer, portMAX_DELAY);

        if (status == pdPASS) {

            //Package the data
            queuePackage.optionEnum = 1;
            queuePackage.size = dataSize;
            queuePackage.structData = (void *) dataBuffer;

            //Send the package to the TCP class for transmission
            BaseType_t sendStatus = xQueueSend(myQueue, &queuePackage, portMAX_DELAY);

        }
    }
}

Is the above on the right track? If so, how do I manage memory sizing on each iteration?

Well, the first obvious issue is that your program is only calling xQueueReceive() once, so you haven’t received all the data you want to send.

The call to pvPortMalloc() should come after the call that computes dataSize, so that you get a fresh, correctly sized buffer for each batch. As written, you still allocate only a single buffer and reuse it every time.

Thanks again, I see the error of my ways now. I had mistakenly assumed that xQueueReceive() reads all the items in the queue, but I obviously need to call it once per item.

The pvPortMalloc call after dataSize makes sense as well. Does the vPortFree() call then take place in classA? I.e. pvPortMalloc() in classB to allocate the memory, pass the data by pointer to classA via the queue, classA reads and processes the data by pointer, then when it’s done it calls vPortFree()?

Yes, the receiver of the message needs to free it when it is done.

As a side note: depending on your heap implementation, it might be better to allocate a fixed maximum-size buffer instead of varying sizes, to avoid heap fragmentation, or to manage a dedicated buffer pool yourself.