Multiple writers to a message queue

alainm3 wrote on Friday, December 08, 2017:

In this post about Stream buffers: RTOS Stream & Message Buffers there is an important warning:

“Uniquely among FreeRTOS objects, the stream buffer implementation (so also the message buffer implementation, as message buffers are built on top of stream buffers) assumes there is only one task or interrupt that will write to the buffer (the writer), and only one task or interrupt that will read from the buffer (the reader).”

This will break many of my applications, as I regularly use one message queue to send information from many different sources (some interrupts and some not) to a single task for processing!

Could someone explain it a bit more? And why does this bring new and seemingly impossible restrictions?

rtel wrote on Friday, December 08, 2017:

The queue implementation has not changed. Queues can have multiple
readers and multiple writers. That has always been the case, and I
cannot see a time when that will change. That makes them weightier
‘generic’ objects that can be used in any number of use cases.

Stream buffers and message buffers are completely new features unrelated
to queues. If you use queues in your application, then the introduction
of new features that you are not using is not going to break anything.
Stream buffers and message buffers are much lighter weight objects than
queues in recognition of the fact that in a lot of use cases a lot of
the features of queues are not used. For example, when sending data
from an interrupt service routine to a task there is only one writer and
one reader, so the overhead of managing multiple readers and writers is
not required.
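
To illustrate why the single-writer, single-reader case can be lighter, here is a minimal host-side sketch of a byte ring buffer. It is not the FreeRTOS stream buffer implementation; the names spsc_ring_t, ring_write and ring_read and the RING_SIZE value are hypothetical. The point is that the writer only ever updates head and the reader only ever updates tail, so with one of each there is no shared index to protect.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define RING_SIZE 8u /* hypothetical capacity; one slot always stays empty */

typedef struct {
    uint8_t data[RING_SIZE];
    volatile size_t head; /* modified only by the single writer */
    volatile size_t tail; /* modified only by the single reader */
} spsc_ring_t;

/* Writer side: copy in as many bytes as fit, return the number stored. */
size_t ring_write(spsc_ring_t *r, const uint8_t *src, size_t len)
{
    size_t written = 0;

    assert(r != NULL && src != NULL);
    while (written < len) {
        size_t next = (r->head + 1u) % RING_SIZE;
        if (next == r->tail) {
            break; /* buffer full */
        }
        r->data[r->head] = src[written++];
        r->head = next; /* advance only after the byte has been stored */
    }
    return written;
}

/* Reader side: copy out up to len bytes, return the number fetched. */
size_t ring_read(spsc_ring_t *r, uint8_t *dst, size_t len)
{
    size_t count = 0;

    assert(r != NULL && dst != NULL);
    while (count < len && r->tail != r->head) {
        dst[count++] = r->data[r->tail];
        r->tail = (r->tail + 1u) % RING_SIZE;
    }
    return count;
}
```

With a second writer, two contexts would both update head, and that is where the extra bookkeeping of a queue (or an external lock) becomes necessary. The sketch also ignores blocking and multi-core memory ordering, both of which a real implementation has to handle.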

I’m interested to know why you thought the introduction of stream
buffers would break legacy code, that couldn’t be using them as they are
new in V10. It may indicate that our documentation is not as clear as
it needs to be and that some updates to clear up any confusion might
prevent others coming to the same conclusion as yourself.

richarddamon wrote on Friday, December 08, 2017:

I think the confusion is that people can think of a ‘Queue’ as a ‘Message Queue’, which sounds a bit like a ‘MessageBuffer’, so they think it is a change of behavior. I think the notification was written fairly clearly, but I can understand the possible confusion.

I have a somewhat related question though, I can see the StreamBuffer as a useful way to implement the holding buffer for a serial port. The one issue would be that multiple tasks would want to be able to write to that serial port at various times. The mitigating factor is that, the way I have structured things, any task that wants to write to the serial port first acquires a mutex guarding that serial port, sends its message (maybe using a number of serial port driver calls), and then releases the mutex. So at any one time the StreamBuffer (or currently the Queue) will only be accessed by a single task, and it doesn’t need to worry about handling multiple accesses at once. Looking over the code, I think that would be usable.

thomask wrote on Friday, December 08, 2017:

I have a somewhat related question though, I can see the StreamBuffer as a useful way to implement the holding buffer for a serial port.

I’m a little bit disappointed, that the StreamBuffer API seems to be copying bytes around.

I prefer a zero-copy solution, that can also be used as a target for serial port DMA transfers… here’s an example implementation:

https://github.com/thomask77/drquad32/blob/master/Source/ringbuf.h

My API was a little bit inspired by DirectX IDirectSoundBuffer, which has similar requirements:

https://msdn.microsoft.com/en-us/library/mt708923(v=vs.85).aspx

best regards,
Thomas.

rtel wrote on Friday, December 08, 2017:

Using a mutex means only one writer will access the buffer at a time
which is fine PROVIDED no more than one task blocks on the buffer.
Queues have lists of blocked tasks, whereas stream and message buffers
just have one. An assert will be triggered if a task attempts to block
when another task is already blocked.

rtel wrote on Friday, December 08, 2017:

I’m a little bit disappointed, that the StreamBuffer API seems to be
copying bytes around.

Read the “User Model: Maximum Simplicity, Maximum Flexibility . . .”
section of this page: Queues for task and interrupt message passing in FreeRTOS real time embedded software applications

Although that page discusses queues and not stream or message buffers,
the same principle applies.

Queue/buffer by copy allows you to use either pass by copy (copy the
bytes) or pass by reference (zero copy, by passing a pointer to the
data). If you design to only pass by reference (zero copy) then you can
only do it in the more complex way rather than having the choice (plus
you can’t pass data across memory protection boundaries without creating
exceptions, etc.).
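
As a rough illustration of that point, the following host-side sketch shows a copy-based FIFO used both ways. It is a simplified model, not the FreeRTOS queue code; copy_queue_t, cq_init, cq_send, cq_receive, msg_t and Q_LEN are hypothetical names. The queue always copies item_size bytes, so choosing sizeof(msg_t) gives pass by copy and choosing sizeof(msg_t *) gives pass by reference.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define Q_LEN 4u /* hypothetical number of slots */

typedef struct {
    unsigned char *storage; /* caller-provided, Q_LEN * item_size bytes */
    size_t item_size;       /* bytes copied per send/receive */
    size_t count, head, tail;
} copy_queue_t;

typedef struct {
    int id;
    char text[16];
} msg_t;

void cq_init(copy_queue_t *q, void *storage, size_t item_size)
{
    assert(q != NULL && storage != NULL && item_size > 0);
    q->storage = storage;
    q->item_size = item_size;
    q->count = q->head = q->tail = 0;
}

/* Copy one item into the queue. Returns 1 on success, 0 if full. */
int cq_send(copy_queue_t *q, const void *item)
{
    if (q->count == Q_LEN) {
        return 0;
    }
    memcpy(q->storage + q->head * q->item_size, item, q->item_size);
    q->head = (q->head + 1u) % Q_LEN;
    q->count++;
    return 1;
}

/* Copy the oldest item out of the queue. Returns 1 on success, 0 if empty. */
int cq_receive(copy_queue_t *q, void *out)
{
    if (q->count == 0) {
        return 0;
    }
    memcpy(out, q->storage + q->tail * q->item_size, q->item_size);
    q->tail = (q->tail + 1u) % Q_LEN;
    q->count--;
    return 1;
}
```

Initialised with sizeof(msg_t), the queue holds its own snapshot of each message, so the sender may reuse its variable immediately. Initialised with sizeof(msg_t *), only the pointer is copied, and the sender has to keep the pointed-to data valid and unchanged until the receiver is done with it.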

alainm3 wrote on Sunday, December 10, 2017:

Hi, Richard Damon got it right, I just confused Message Queues with Message Buffers.
I am a regular user of FreeRTOS but not continuously, the last time that I read the manual was a few years ago (there will be more sometime…) so exact terminology gets a bit hazy…
It would be nice to rephrase that page to avoid that confusion.

davidbrown wrote on Monday, December 11, 2017:

To get the kind of stream Richard Damon is looking for with his serial ports, you want a multiple-writer, single-reader stream. That should be possible with a StreamBuffer and a Mutex for writing. Each writer takes the mutex, writes to the stream, then releases the mutex. You can’t get more than one task blocking on the StreamBuffer, because only one task can access it at a time. If the StreamBuffer is full (while waiting for the other side to pass the data on to the serial port), causing the writer to block, then the mutex won’t be released by the current writing task - and thus no other task can try to write to it.

An alternative is to have a “multiplexer” task. This would be the reader for multiple StreamBuffers (or MessageBuffers might make more sense). It would read from one buffer at a time, and pass the data on to the serial port, using whatever priority mechanism you want to determine which buffer to read from next. This mirrors what you would do in hardware to solve the same problem.
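
The multiplexer idea can be sketched on the host as follows. This is only a model under stated assumptions: source_buf_t, source_put, mux_step, NUM_SOURCES and SRC_CAP are hypothetical names, plain arrays stand in for the per-writer buffers, and a fixed priority (lowest index first) stands in for "whatever priority mechanism you want". A real multiplexer task would block on the buffers rather than poll them.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define NUM_SOURCES 3u /* hypothetical: one buffer per writer */
#define SRC_CAP 64u    /* hypothetical per-buffer capacity in bytes */

typedef struct {
    char bytes[SRC_CAP];
    size_t len; /* bytes currently pending */
} source_buf_t;

/* Writer side: each source has exactly one writer. Appends all n bytes
 * or nothing. Returns 1 on success, 0 if there is not enough room. */
int source_put(source_buf_t *s, const char *data, size_t n)
{
    assert(s != NULL && data != NULL);
    if (n > SRC_CAP - s->len) {
        return 0;
    }
    memcpy(s->bytes + s->len, data, n);
    s->len += n;
    return 1;
}

/* Multiplexer step: find the highest-priority non-empty source (index 0
 * first) and move up to out_cap of its bytes into out, which stands in
 * for the serial port. Returns the number of bytes forwarded, or 0 if
 * every source is empty. */
size_t mux_step(source_buf_t src[NUM_SOURCES], char *out, size_t out_cap)
{
    assert(src != NULL && out != NULL);
    for (size_t i = 0; i < NUM_SOURCES; i++) {
        if (src[i].len > 0) {
            size_t n = src[i].len < out_cap ? src[i].len : out_cap;
            memcpy(out, src[i].bytes, n);
            /* keep any bytes that did not fit for the next step */
            memmove(src[i].bytes, src[i].bytes + n, src[i].len - n);
            src[i].len -= n;
            return n;
        }
    }
    return 0;
}
```

Because every buffer still has one writer and one reader (the multiplexer), the single-writer, single-reader assumption holds for each of them, at the cost of one buffer per source.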

marjonz wrote on Tuesday, November 27, 2018:

Has anyone gotten the StreamBuffer implementation working properly? I am running into the problem of losing bytes along the way:

  • I am using a mutex for the writer to the streambuffer because I can have multiple writers from different threads.
  • I breakpoint prior to writing to the streambuffer, checking that the bytes I am attempting to write are correct.
  • Then, I write to the streambuffer and check the number of bytes written (the return value of the write) after the write occurs. The number matches the number of bytes I attempted to write.
  • In the main for loop of the thread, I then check whether there are new bytes available in the streambuffer, and if there are, fetch them.
  • At this point, I get inconsistent results. I sometimes lose 4 bytes at the beginning of the previous byte stream I supposedly wrote.
  • I don’t have a mutex on the reader side, since I only have that thread as the reader of the stream buffer.

Can anyone please confirm if this happens to them? Thanks.

rtel wrote on Wednesday, November 28, 2018:

Do your writers ever block? Unlike queues, stream buffers are designed
for single readers and single writers - so do not maintain a list of
blocked tasks as per the Queue implementation. Therefore if you have
multiple writers that have block times you could get into difficulties.

marjonz wrote on Wednesday, November 28, 2018:

@Richard Barry, I have a single writer, that is protected by a mutex. But the writer can be called from multiple threads, and I don’t think they are blocking. Here is the writer function:

BaseType_t serial_write(uart_channel_t channel, const char * const out_string, uint16_t string_length)
{
    BaseType_t return_value = pdFAIL;
    BaseType_t sem_take = xSemaphoreTake (mutex_list[channel],SERIAL_GENERIC_WAIT);
    if (sem_take == pdPASS)
    { // Try to place the string in the streambuffer
        if (xStreamBufferSpacesAvailable(uart_tx_buffer[channel]) > string_length)
        {
            return_value = send_bytes_to_streambuffer(uart_tx_buffer[channel],
                                                      out_string, string_length);
        }
        xSemaphoreGive(mutex_list[channel]);
    }
    else
    { // mutex not obtained within the wait time; the caller can retry
    }
    return return_value; // pdPASS only if the whole string was placed in the buffer
}

where send_bytes_to_streambuffer() is 

static BaseType_t send_bytes_to_streambuffer(StreamBufferHandle_t streambuffer,
                                                      const void *tx_data,
                                                      size_t tx_data_length_in_bytes)
{
    BaseType_t return_value = pdFAIL;
    size_t bytes_written = xStreamBufferSend(streambuffer,
                                                       tx_data,
                                                       tx_data_length_in_bytes,
                                                       0);
    if (bytes_written >= tx_data_length_in_bytes)
    { // bytes sent to stream buffer
        return_value = pdPASS;
    }
    return return_value;
}

marjonz wrote on Sunday, December 02, 2018:

I think I got it sorted, took out most calls to enter/exit the critical section of the code in other places.