Stream buffer doesn't seem to handle receiving bytes before call to xStreamBufferReceive()

I’m using the stream buffer to do serial I/O. The device I’m talking to is a command/response type device, so typically we’ll transmit a message, then call xStreamBufferReceive to receive the response. Our stream buffer is setup to receive before we transmit so we can catch any data that comes during transmission, which can happen if our device detects an error in our command.

In stream_buffer.c, xStreamBufferReceive():

        if( xBytesAvailable <= xBytesToStoreMessageLength )
        {
            /* Wait for data to be available. */
            traceBLOCKING_ON_STREAM_BUFFER_RECEIVE( xStreamBuffer );
            ( void ) xTaskNotifyWait( ( uint32_t ) 0, ( uint32_t ) 0, NULL, xTicksToWait );
            pxStreamBuffer->xTaskWaitingToReceive = NULL;

            /* Recheck the data available after blocking. */
            xBytesAvailable = prvBytesInBuffer( pxStreamBuffer );
        }

This if checks if xBytesAvailable, the received number of bytes, is less than xbytesToStoreMessageLength. This looks wrong. as we want to receive the number of bytes specified we specified by setting xStreamBuffer->xTriggerLevelBytes.

If we haven’t received any bytes before we call this function, it works because xBytesAvailable is 0, and 0 is <= 0 (xBytesToStoreMessageLength is 0 since we’re using the stream buffer directly, not via message buffers). However, when we received bytes during transmission, xBytesAvailable is > 0, so this condition is false, and xStreamBufferReceive() returns right away before the complete response is received.
Am I understanding the logic correctly, or am I missing something, or using stream buffers incorrectly? Thanks.

I think this is the intended behavior. That value you pass is the MAXIMUM number of character to receive, not the “expected number.

Look at the xStreamBufferSetTriggerLevel function to set the minimum number of characters that need to be received before the function returns without the full delay.

Hello @richard-damon. Thank you for responding. I’m not sure what value you’re referring to when you say “value you pass”.

size_t xStreamBufferReceive( StreamBufferHandle_t xStreamBuffer,
                             void * pvRxData,
                             size_t xBufferLengthBytes,
                             TickType_t xTicksToWait )

I imagine you’re referring to xBufferLengthBytes, which is our buffer length, which is quite large in our case (512). I do call xStreamBufferSetTriggerLevel before starting the transaction. In this case, we set it to 6 as we expect 6 bytes in response. In my modification to get the behaviour I expect, I change the if to:

if( xBytesAvailable <= xStreamBuffer->xTriggerLevelBytes)

xBytesToStoreMessageLength is set by xStreamBufferSetTriggerLevel ( ). So even though I call this and set it to 6, xStreamBufferReceive() returns right away if there’s 1 byte in the stream buffer.

I don’t know if this will clarify anything, but here’s a scope trace on the serial lines.


We’re transmitting a packet (yellow trace), and start receiving a response before we’re done (bluish trace). xStreamBufferReceive() is called right after transmission is complete, so in this example there’s 2 bytes in the buffer. Since I expected a 6-byte response, I would expect xStreamBufferReceive() to wait xTicksToWait before returning, since it’s waiting to have 6 bytes. That’s how I understand the API documentation for stream buffers (as a new user, I’m not allowed to post a link). Instead, it return right away with whatever is in the buffer.

This isn’t a problem in the example trace shown, but there are other commands where we have received 1 of 2 bytes when xStreamBufferReceive( ) is called (again, expecting a 6-byte response but the device is throwing an error code and responds before transmission is complete), so in that case, we’re not getting a complete message.

Looking closer, first, the comment about xBytesToStoreMessageLength isn’t applicable, since that will always be 0 for a normal StreamBuffer, it gets set for MessageBuffers which reuse the code (look at the top of the function).

As to later in the code that reads from the buffer, I would agree that the concept of the trigger level would seem to say that it should wait for that many bytes, but the code (and the wording) says it only waits for that many bytes if there are no bytes in the buffer when the reading starts.

My suggestion for what you are doing is that the outgoing message should be just STARTED by the task, and then the later bytes be sent by the servicing ISR, and then it immediately starts the read, so the buffer WILL be empty to start, and then you will wait till at least the trigger level bytes.

That, or the reading code can see if it looks like a complete message, and if not, do a second (or more) read for the remaining time and the remaining characters.

That’s a plausible work-around, but I don’t know why the the stream buffer needs to be empty to enforce the trigger level. But, the API does seem to indicate this.

The amount of data that must be in the stream buffer before a task that is waiting for data is removed from the blocked state is called the stream buffer’s Trigger Level. For example:

  • If a task is blocked on a read of an empty stream buffer that has a trigger level of 1 then the task will be unblocked when a single byte is written to the buffer or the task’s block time expires.*

  • If a task is blocked on a read of an empty stream buffer that has a trigger level of 10 then the task will not be unblocked until the stream buffer contains at least 10 bytes or the task’s block time expires.*

If a reading task’s block time expires before the trigger level is reached then the task will still receive however many bytes are actually available.

So maybe it is as intended, but it seems strange.
Thanks again, @richard-damon.

This implies, and seems to be what is programmed. that the Trigger Level only applies IF the task “waits for data”, and it only does that if there is no data available.

A major reason for the trigger level is to minimize the overhead of switching to the task when there isn’t much data to process. There is no overhead in NOT blocking if you have data, so the design seems to give the data right away. The buffers aren’t really optimized for receiving “packets” of messages, where you know the length you are expecting to get up front, but more for processing data streams that you process as they come.

If I was doing a lot of work with that sort of message, I would be using a wrapper around the StreamBuffer that fetches and sees if it got a full message (presumably you know the format so can tell if you had a complete message) and if not get some more into the buffer.

We know how many bytes to receive normally, but if an error is detected we’ll get a different length.

We did use a wrapper, which calls xStreamBufferReceive() to get responses byte-by-byte. If we get the expected response length, or it exceeds xTicksToWait, we deem the response complete.

Thanks again, @richard-damon. I appreciate the help.

Since in your case there won’t be a message following that you need to be careful of not accidentally pulling the front off, you don’t need to go character by character but can ask for the full message, and if short (and not an error message) ask for more until you time out or get the full message.

Asking for the full message is a good idea. I’ll deem the message complete when either I get all the expected bytes or we reach the timeout.

I don’t want to parse the message contents at this level, if I can help it. I may lose a few ms waiting for the timeout when I could know I have a complete message, though smaller than I expect, but that’s’ a small price to pay to keep application level details at a higher level.

The question comes how easy is it to detect a “full” message. If the protocol includes a unique end of message character (like carriage return) or a length code, then low level detection of end of message works. If it doesn’t then, yes, you want to get to timeout (or max length) and then pass to higher levels.