Stream Buffer vs. Message Buffer

Hello! I read through all documentation that I could find but still, the key difference between stream buffers and message buffers is unclear to me.

In the message buffer documentation I found the following sentence to contain the only meaningfull difference to the stream buffer documentation:

Unlike when using a stream buffer, a 10 byte message can only be read out as a 10 byte message, not as individual bytes.

But what does this mean? Does this mean that if I write this message of size 10 into the message buffer by means of xMessageBufferSend(), and then attempt to read letā€™s say 5 bytes using xMessageBufferReceive(), it will return some kind of error telling me that the message I attempted to read is longer so I cannot read it like this?

Or is the only difference that I cannot set a trigger level, nor see how many bytes are still available in the buffer, compared to when using stream buffers? But then whatā€™s the benefit of message buffers?

Please help me understand,
Best Regards!

Data is put into a Stream Buffer as a series of bytes and removed as a similar series. The ā€œbeginningā€ or ā€œEndā€ of a particular ā€œMessageā€ that is added to the buffer isnā€™t kept track of.

When you read out, you tell the buffer the maximum amount you can take, and the minimum you want to take (the trigger level), and if that much isnā€™t there, to wait a bit for more.

A Message Buffer puts the message into the buffer as a unified whole. You can only get one message off the buffer at a time, and only a whole message at a time.

You can think of Message Buffer as a Queue with a variable size object, where you can put/get one object per call (but the Buffer might be able to hold a number of messages).

A Stream Buffer is a Queue with a fixed 1 byte message, but you can put/get a series of them with each call.

3 Likes

Oh thanks! Reading your reply and re-checking the manual entry for xMessageBufferReceive(...) I noticed this bit:

> size_t xMessageBufferReceive( MessageBufferHandle_t xMessageBuffer, 
>                               void *pvRxData, 
>                               size_t xBufferLengthBytes, 
>                               TickType_t xTicksToWait );

xBufferLengthBytes
The length of the buffer pointed to by the pvRxData parameter. This sets
the maximum length of the message that can be received.
If xBufferLengthBytes is too small to hold the next message then the message will be left in the message buffer and 0 will be returned.

Somehow I overlooked the part that says that if you request too few bytes - less then the next message in the buffer, you get back nothing. Which makes sens if you treat messages as a whole thing.

Just to double check, is it safe to protect the message buffer access with two mutexes (one for reading and a different one for writing) to make it thread safe?
This way, reading and writing operations can potentially interrupt each other (even by different tasks). Although, writing/reading operations must always be finished, before a different task can perform the same operation. (I hope this makes sens as I try to explain it).

Best Regards,

Since stream/message buffers are provided by FreeRTOS the corresponding API is thread safe, of course. There is no need for additional (redundant) mutex protection.

I think you might be mistaken here. Amongst every documentation about stream & message buffers that I found, this note stood out:

IMPORTANT NOTE: Uniquely among FreeRTOS objects, the stream buffer implementation (so also the message buffer implementation, as message buffers are built on top of stream buffers) assumes there is only one task or interrupt that will write to the buffer (the writer), and only one task or interrupt that will read from the buffer (the reader). It is safe for the writer and reader to be different tasks or interrupts, but, unlike other FreeRTOS objects, it is not safe to have multiple different writers or multiple different readers. If there are to be multiple different writers then the application writer must place each call to a writing API function (such as xStreamBufferSend()) inside a critical section and use a send block time of 0. Likewise, if there are to be multiple different readers then the application writer must place each call to a reading API function (such as xStreamBufferReceive()) inside a critical section and use a receive block time of 0.

The question is, if I can use different mutexes for read and write access.
And which mutex I must pend if just using xMessageBufferIsEmpty(..), xMessageBufferIsFull(..) or xStreamBufferSpacesAvailable(..).
The mutex that protects reading or writing - or both?

Ah ā€¦ ok, you want to access them from multiple different tasks. Missed that.

No problem. Do you know the answere if it is safe to do so with different mutexes for read and write, and which I shall use to secure the mentioned accesses to the message buffer?

I also would have to dig into the source to verify if an ā€˜optimizedā€™ r/w locking scheme is possible at all.
But Iā€™d prefer to stick to the documented locking mechanism also in case the internal implementation changes over time.

The documentation says that you must not let two sources try to write to the Buffer at the same time or two sinks read at the same time. If all the Sources or Sinks are tasks, then a Mutex could be used to protect the Buffer, and that would make sure that you never get the prohibited simultaneous access. If an ISR might be a possible source/sink, a Mutex wonā€™t work, then you need the critical section method.

The source side and the sink side are independent for this, and each can use which ever method works for it (if needed).

Note, using a Mutex for protection can cause a form of Priority Error, as a Higher priority task that reaches the mutex after a Lower Priority task already has it is stuck in line behind it, and doesnā€™t use it priority, unlike how a Queue would work, which gives the Queue to the highest priority task waiting. This is precisesly the sort of case described in the Documentation that buffers do not support.

The one advantage of the ā€œCritical Sectionā€ method is it sort of gets around this, because once in the critical section, a Higher Priority task canā€™t try to get to the Buffer, due to the critical section, but comes at the cost that you donā€™t block if the action canā€™t proceed, so you need to do something else to wait.

1 Like

@richard-damon Thanks for the clarification.
I will check further, but I think that the priority error you mentioned is acceptable for my application.


Sorry to bother you with some more.
While running my test code I noticed something strange:

  • I created a message buffer with the size 20
  • immediately afterwards I called the function xMessageBufferSpacesAvailable(..) which returned 19 bytes

I am sure that I did not write a message of size 1 before calling xMessageBufferSpacesAvailable(..).

It appears as if xMessageBufferSpacesAvailable(..) subtracts 1 to the size of available bytes, which indeed happens, according to the source code:

size_t xStreamBufferSpacesAvailable( StreamBufferHandle_t xStreamBuffer )
{
const StreamBuffer_t * const pxStreamBuffer = xStreamBuffer;
size_t xSpace;

	configASSERT( pxStreamBuffer );

	xSpace = pxStreamBuffer->xLength + pxStreamBuffer->xTail;
	xSpace -= pxStreamBuffer->xHead;
	xSpace -= ( size_t ) 1;                //<- It happens right here!

	if( xSpace >= pxStreamBuffer->xLength )
	{
		xSpace -= pxStreamBuffer->xLength;
	}
	else
	{
		mtCOVERAGE_TEST_MARKER();
	}

	return xSpace;
}

I could not find a hint that explains this behaviour in the manual.
Do you know the reason why itā€™s there?

Thanks for your time.

I think the lost of 1 byte is to allow the ability to distinguish between a Full and Empty Buffer, which without that reduction would both be represented by the read and write pointers pointing at the same spot. Since it would cost a byte to add a flag to distinguish, and adding the flag cost code, losing 1 byte out of the space is the cheapest solution.

As to the ā€œPriority Errorā€, yes, in most cases this isnā€™t important. In fact, I would consider it unusual for multiple tasks to be reading from a stream buffer at the same time (The lack of ā€œMessageā€ boundries makes it awkward) and for writing, I would normally expect a good design isnā€™t apt to ā€œfillā€ a StreamBuffer for long. I bring it up, because in some cases it IS important.

Sharing is actually seems a lot more common with MessageBuffers due to there ā€œcompleteā€ message promises.

@richard-damon Okay I understand the reasoning behind this. However I think that this is a bit of a poor solution as it can raise a lot of confusion during debugging, when seemingly bytes go missing. At least I would expect to read about that in the manual or the code documentationā€¦

Anyway, thanks again for your time.

Best Regards,

As I remember, the internal size is stored as 1 bigger than the size supplied, as is the buffer allocated, so if you create with 20, the internal size should say 21, and the SpacesAvail should then say 20, the original size given.

@richard-damon You are right, but apparently this size adjustment is only applied when using xStreamBufferGenericCreate(..). When using the version that takes a statically allocated buffer xStreamBufferGenericCreateStatic(..), the size adjustment is missing.

StreamBufferHandle_t xStreamBufferGenericCreate(...)
{
        ...

		/* A stream buffer requires a StreamBuffer_t structure and a buffer.
		Both are allocated in a single call to pvPortMalloc().  The
		StreamBuffer_t structure is placed at the start of the allocated memory
		and the buffer follows immediately after.  The requested size is
		incremented so the free space is returned as the user would expect -
		this is a quirk of the implementation that means otherwise the free
		space would be reported as one byte smaller than would be logically
		expected. */
		xBufferSizeBytes++;
		pucAllocatedMemory = ( uint8_t * ) pvPortMalloc( xBufferSizeBytes + sizeof( StreamBuffer_t ) ); /*lint !e9079 malloc() only returns void*. */

		if( pucAllocatedMemory != NULL )
		{
			prvInitialiseNewStreamBuffer( ( StreamBuffer_t * ) pucAllocatedMemory, /* Structure at the start of the allocated memory. */ /*lint !e9087 Safe cast as allocated memory is aligned. */ /*lint !e826 Area is not too small and alignment is guaranteed provided malloc() behaves as expected and returns aligned buffer. */
										   pucAllocatedMemory + sizeof( StreamBuffer_t ),  /* Storage area follows. */ /*lint !e9016 Indexing past structure valid for uint8_t pointer, also storage area has no alignment requirement. */
										   xBufferSizeBytes,
										   xTriggerLevelBytes,
										   ucFlags );

StreamBufferHandle_t xStreamBufferGenericCreateStatic(...)
{
        ...

		/* In case the stream buffer is going to be used as a message buffer
		(that is, it will hold discrete messages with a little meta data that
		says how big the next message is) check the buffer will be large enough
		to hold at least one message. */
		configASSERT( xBufferSizeBytes > sbBYTES_TO_STORE_MESSAGE_LENGTH );

		#if( configASSERT_DEFINED == 1 )
		{
			/* Sanity check that the size of the structure used to declare a
			variable of type StaticStreamBuffer_t equals the size of the real
			message buffer structure. */
			volatile size_t xSize = sizeof( StaticStreamBuffer_t );
			configASSERT( xSize == sizeof( StreamBuffer_t ) );
		} /*lint !e529 xSize is referenced is configASSERT() is defined. */
		#endif /* configASSERT_DEFINED */

		if( ( pucStreamBufferStorageArea != NULL ) && ( pxStaticStreamBuffer != NULL ) )
		{
			prvInitialiseNewStreamBuffer( pxStreamBuffer,
										  pucStreamBufferStorageArea,
										  xBufferSizeBytes,
										  xTriggerLevelBytes,
										  ucFlags );

On first thought this makes sense, as FreeRTOS cannot decide by itself which size to actually allocate if the buffer is ā€œalready thereā€. Although, probably it could just store the bigger buffer size even though the actual size isnā€™t bigger as it knows that it will never access this extra byte anyways.

Maybe this would be considered bad programming style to ā€œfakeā€ a buffer size that is not there.
However, I guess it also could be considered bad style that the free bytes count changes, depending if the static version of creation is used or notā€¦

Best Regards,

If FreeRTOS doesnā€™t increment the count, that would be a bug. The documentation says the supplied buffer needs to be size+1 in bytes long, and it WILL access that byte (assuming the size is incremented). There will always be at least one not used byte in the buffer, but it will be between the write pointer and the read pointer, and not necessarily at the end of the buffer.

1 Like

@richard-damon Okay, yeahā€¦ I guess that this might really be a bug then.

  • The code does not show signs of this size increasement, Iā€™ve checked the newest FreeRTOS commit on GitHub inside trunk
  • The behaviour matches the expectation - if you try to read the size of free bytes right after initialization it returns ā€˜original sizeā€™ -1 byte, which demonstrates that the size variable has not been increased

Iā€™d be great though if somebody could confirm this behaviour.
How do I proceed? Should I report this bug somewhere?

I would go to the Github repository and file a bug report.

Can you describe what are you thinking the bug is? The following is from this page -

pucStreamBufferStorageArea
    Must point to a uint8_t array that is at least xBufferSizeBytes + 1 big. This is the array to which streams are copied when they are written to the stream buffer.

That means is that if you want to create a stream buffer of size 1000, you need to supply the buffer pucStreamBufferStorageArea of size 1001. When you query the size of the buffer, you will get 1000.

Which of the above behavior you do not see?

His claim is that if you call xStreamBufferCreateStatic, with a size of 1000, it will call xStreamBufferGenericCreateStatic with a size 0f 1000, never adjust that size and call prvInitialiseNewStreamBuffer with a size of 1000, so the stream buffer will only use 1000 bytes of that input buffer, and report 999 bytes available.

But, if you call xStreamBufferCreate with a size of 1000, it will call xStreamBufferGenericCreate with a size of 1000, and that will increment the buffer size by 1 before calling pvPortMalloc to get 1001 bytes and then call prvInitialiseNewStreamBuffer with a size of 1001.

This is true but it is the expected and documented behavior. When we allocate the buffer in case of xStreamBufferCreate, we allocate one extra byte. When the user supplies the buffer in case of xStreamBufferCreateStatic, we ask them to supply the buffer of one byte extra length. Here is what the documentation says -

pucStreamBufferStorageArea
    Must point to a uint8_t array that is at least xBufferSizeBytes + 1 big. This is the array to which streams are copied when they are written to the stream buffer.

Here is the example code on the same page:

/* Used to dimension the array used to hold the streams. The available
 * space will actually be one less than this, so 999. */
#define STORAGE_SIZE_BYTES 1000

/* Defines the memory that will actually hold the streams within the
 * stream buffer. */
static uint8_t ucStreamBufferStorage[ STORAGE_SIZE_BYTES ];