How to wait for Stream Buffer space?

Is there a way to wait for a StreamBuffer to have a certain amount of space available?

Currently, I’m using a FreeRTOS Queue for an internal packet queue (packets from multiple sources are placed into this queue on receipt, then processed by a single thread). This isn’t very space-efficient, since most of our internal packets are small and very few are MTU-sized. The format is a Fixed-Length-Header (timestamp, Source-Thread-Indicator, …, payload size) followed by a Variable-Length-Payload.

I’ve switched to a MessageBuffer, and that works nicely - small packets are now faster, but MessageBuffer requires constructing a contiguous buffer before sending it. If I do my own framing using a StreamBuffer, I can skip building a contiguous buffer, but I can run into a situation where a writer thread writes a header, then blocks on the payload and exceeds its timeout.

Is there a way to wait for a StreamBuffer to have a certain amount of space available? I think what I’d like writers to do is this:

  1. Wait for StreamBuffer to have space for sizeof(FixedLengthHeader)+VariableLengthPayloadLength
  2. xStreamBufferSend(sb, FixedLengthHeader, sizeof(FixedLengthHeader), 0); // This will not block, as there is space in the queue.
  3. xStreamBufferSend(sb, VariableLengthPayload, VariableLengthPayloadLength, 0); // This will not block, as there is space in the queue.

(edited from original post) I tried something like this, but once the queue is unable to fit a whole new packet, it delays the writer an entire OS tick, which is excessive for this application:

/* Poll (in whole-tick steps) until the stream buffer has room for the full packet. */
size_t wait_sz = sizeof(FixedLengthHeader) + VariableLengthPayloadLength;
while( wait_sz > SB_LEN - xStreamBufferBytesAvailable(sb) ) {
    vTaskDelay(1); /* worst case this costs a whole tick period */
}

There was a related, interesting thread a few days ago:

Is this what you’re looking for?

Similar, but that’s on the reader-side.

What I’d like to do is block a writer until a specified amount of space is available, or a timeout is reached.

Oops … mixed up space with data. My bad.
In case your multiple sources are multiple tasks, it might be a problem that stream/message buffers are single-writer mechanisms :thinking:

In case your multiple sources are multiple tasks, it might be a problem that stream/message buffers are single-writer mechanisms :thinking:

I use a mutex to serialize packet writes to my MessageBuffer (or future StreamBuffer). It does complicate the timeout handling.
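
Roughly like this - a sketch only, where xQueuePacket, xPacketMutex and xMsgBuf are placeholder names and the split of the timeout between the mutex and the send is simplified:

#include <stdint.h>
#include "FreeRTOS.h"
#include "semphr.h"
#include "message_buffer.h"

static SemaphoreHandle_t xPacketMutex;    /* created elsewhere with xSemaphoreCreateMutex() */
static MessageBufferHandle_t xMsgBuf;     /* created elsewhere with xMessageBufferCreate() */

/* Serialize writers with a mutex, then spend whatever time is left on the send. */
BaseType_t xQueuePacket( const uint8_t *pucPacket, size_t xPacketLen, TickType_t xTicksToWait )
{
    TimeOut_t xTimeOut;
    BaseType_t xSent = pdFALSE;

    vTaskSetTimeOutState( &xTimeOut );

    if( xSemaphoreTake( xPacketMutex, xTicksToWait ) == pdTRUE )
    {
        /* Reduce xTicksToWait by the time already spent waiting for the mutex. */
        ( void ) xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait );

        if( xMessageBufferSend( xMsgBuf, pucPacket, xPacketLen, xTicksToWait ) == xPacketLen )
        {
            xSent = pdTRUE;
        }

        xSemaphoreGive( xPacketMutex );
    }

    return xSent;
}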

Since a StreamBuffer can only have a single writer and a single reader, the writer getting blocked waiting for space isn’t a problem. If you really don’t want to send a message unless there is space, you could first check with xStreamBufferSpacesAvailable() before starting the send (and perhaps wait a tick to try again). You could also wait a tick or until a custom callback set with xStreamBufferCreateWithCallback tells you the receiver has read a buffer…

It’s a different problem - one of my writers is parsing sensor data over a fast UART and submitting packets to the queue. It cannot block for very long without the UART getting out of sync, so it needs a way to specify a short timeout - it’s better for this application to drop the whole sensor packet than lose sync on the UART. The sensor will send a new packet quickly anyways.

If you really don’t want to send a message unless there is space, you could first check with xStreamBufferSpacesAvailable() before starting the send (and perhaps wait a tick to try again).

This seems like a cleaner version of the workaround I tried using xStreamBufferBytesAvailable() - but I don’t want to block writers waiting for the next OS tick. In many cases, the reader thread will have fully drained the queue by then.

If you can use ANY timeout, then you could use this system; if you can’t allow yourself to wait for even one tick, then you can’t use a “timeout”.

Also, I always have my UARTs receiving in an ISR and putting the data into a buffer of some form. That gives you a bit of buffering to avoid the problem.
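
Something along these lines - just a sketch, with the handle, the ISR name and the byte-at-a-time read standing in for whatever your UART driver actually does:

#include <stdint.h>
#include "FreeRTOS.h"
#include "stream_buffer.h"

static StreamBufferHandle_t xUartRxBuffer;   /* created with xStreamBufferCreate() at init */

/* UART receive interrupt: push received bytes into a stream buffer so the
 * parsing task never has to keep up with the wire in real time. */
void UART_RX_IRQHandler( void )
{
    BaseType_t xHigherPriorityTaskWoken = pdFALSE;
    uint8_t ucByte = 0;   /* read the UART data register here instead */

    ( void ) xStreamBufferSendFromISR( xUartRxBuffer, &ucByte, sizeof( ucByte ), &xHigherPriorityTaskWoken );

    portYIELD_FROM_ISR( xHigherPriorityTaskWoken );
}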

If you can’t afford a 1 tick timeout, then your transmitting side just skips sending a packet if there isn’t room for the message.

That is why I suggested using xStreamBufferCreateWithCallback, then the writing side could wait on a signal (maybe with a 1 tick timeout) but as soon as the reader reads a buffer, it signals the writing side to wake up. This could be done with a semaphore, or the callback might just use a task notification that the writing side waits on (with the 1 tick timeout).
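
Roughly like this on the creation side - a sketch that assumes a single writer task, where prvRxComplete and xWriterTask are placeholder names, and configUSE_SB_COMPLETED_CALLBACK must be set to 1 in FreeRTOSConfig.h:

#include "FreeRTOS.h"
#include "task.h"
#include "stream_buffer.h"

static TaskHandle_t xWriterTask;   /* handle of the single writer task */

/* Receive-completed callback: wake the writer whenever the reader frees space. */
static void prvRxComplete( StreamBufferHandle_t xSB,
                           BaseType_t xIsInsideISR,
                           BaseType_t *pxHigherPriorityTaskWoken )
{
    ( void ) xSB;

    if( xWriterTask != NULL )
    {
        if( xIsInsideISR != pdFALSE )
        {
            vTaskNotifyGiveFromISR( xWriterTask, pxHigherPriorityTaskWoken );
        }
        else
        {
            xTaskNotifyGive( xWriterTask );
        }
    }
}

StreamBufferHandle_t xCreatePacketBuffer( size_t xSizeBytes )
{
    /* No send-completed callback needed, only receive-completed. */
    return xStreamBufferCreateWithCallback( xSizeBytes, 1, NULL, prvRxComplete );
}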

We are too :slight_smile: . There are other tradeoffs balancing queue memory with ring-buffer memory.

It’s not that I can’t afford a 1 tick timeout sometimes, it’s that I really don’t want to accept the worst-case RTOS tick latency for all writers every time the queue isn’t prepared to accept a full packet.

I do think this would work, and meet my goals. It took me a bit to figure out how to communicate the wakeup level back and forth. For a while I was thinking about a counting semaphore for the buffer level, but there’s no multi-increment/multi-decrement.

That said, it looks like xStreamBufferSend() works internally this way already. If we ever wanted FreeRTOS to have this functionality directly for StreamBuffers, we could probably refactor xStreamBufferWaitForSpace() out of the top 2/3 of xStreamBufferSend().
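
Something with a prototype roughly like this (purely hypothetical - no such function exists in FreeRTOS today):

/* Hypothetical API: block until at least xSpaceRequired bytes are free in the
 * stream buffer, or xTicksToWait expires. Not part of FreeRTOS. */
BaseType_t xStreamBufferWaitForSpace( StreamBufferHandle_t xStreamBuffer,
                                      size_t xSpaceRequired,
                                      TickType_t xTicksToWait );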

Also, nice to meet you, another Richard - thank you for your help with this.

I think you haven’t looked at the ability to create a StreamBuffer with custom internal callbacks. If you have your writer wait for a notification when the buffer doesn’t have room, and have the receive-complete callback send a notification, then the writer will wake as soon as there is more room in the buffer and avoid waiting for the next tick. You don’t really need to communicate the wakeup level: when you get the notification, you just test the space again and either send if there is room, skip the transmission if too much time has elapsed, or wait for another message to be removed.
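
A sketch of what that writer loop could look like - xSendPacket and the stand-in FixedLengthHeader struct are placeholders, and it assumes the receive-completed callback from the earlier sketch notifies this task:

#include <stdint.h>
#include "FreeRTOS.h"
#include "task.h"
#include "stream_buffer.h"

typedef struct
{
    uint32_t ulTimestamp;      /* stand-in for the real fixed-length header fields */
    uint8_t  ucSourceThread;
    uint16_t usPayloadLen;
} FixedLengthHeader;

/* Wait up to xTicksToWait for enough free space, then write header + payload
 * back-to-back. Returns pdFALSE if it gave up (packet dropped). */
BaseType_t xSendPacket( StreamBufferHandle_t xSB,
                        const FixedLengthHeader *pxHdr,
                        const uint8_t *pucPayload,
                        size_t xPayloadLen,
                        TickType_t xTicksToWait )
{
    const size_t xNeeded = sizeof( FixedLengthHeader ) + xPayloadLen;
    TimeOut_t xTimeOut;

    vTaskSetTimeOutState( &xTimeOut );

    while( xStreamBufferSpacesAvailable( xSB ) < xNeeded )
    {
        if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) != pdFALSE )
        {
            return pdFALSE;   /* too much time has elapsed - skip this packet */
        }

        /* Sleep until the receive-completed callback notifies this task, or the
         * remaining timeout expires, then test the space again. */
        ( void ) ulTaskNotifyTake( pdTRUE, xTicksToWait );
    }

    /* Enough room for the whole packet - neither call below should block. */
    ( void ) xStreamBufferSend( xSB, pxHdr, sizeof( FixedLengthHeader ), 0 );
    ( void ) xStreamBufferSend( xSB, pucPayload, xPayloadLen, 0 );

    return pdTRUE;
}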

Thinking about your actual problem, perhaps a less expensive alternative is to pre-allocate a single data buffer of each type, with a flag. When a sensor gets new data, it checks whether its buffer is clear, and if so puts the new data in the buffer, sets the flag to show it busy, then sends a notification to the receiving task. If the buffer is still busy, it just discards the data (or does whatever else needs to be done with it - you do seem to want to allow for data to be lost). The receiving task starts by waiting for a notification, and when it gets one, checks the flags in an order that meets your preferences for data priority; for each buffer that is marked busy, it processes that data and then clears the flag, and after checking all the flags, it goes back to waiting for a notification.

This way there are ZERO bytes spent on communication buffers, just the data buffer that the transmitter was going to have anyway.
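
A rough sketch of that pattern - the slot count, sizes and names are purely illustrative, and on a single-core port the volatile flag is enough (on other targets you would add barriers):

#include <stdint.h>
#include <string.h>
#include "FreeRTOS.h"
#include "task.h"

typedef struct
{
    volatile BaseType_t xBusy;   /* set by the producer, cleared by the consumer */
    size_t xLen;
    uint8_t ucData[ 64 ];        /* sized for that sensor's largest packet */
} SensorSlot_t;

static SensorSlot_t xSlots[ 2 ];     /* one pre-allocated slot per source */
static TaskHandle_t xReceiverTask;   /* set when the receiving task is created */

/* Producer side: copy the data in if the slot is free, otherwise drop it. */
void vSubmitSample( size_t xSlot, const uint8_t *pucData, size_t xLen )
{
    SensorSlot_t *pxSlot = &xSlots[ xSlot ];

    if( pxSlot->xBusy == pdFALSE )
    {
        memcpy( pxSlot->ucData, pucData, xLen );
        pxSlot->xLen = xLen;
        pxSlot->xBusy = pdTRUE;             /* publish only after the data is in place */
        xTaskNotifyGive( xReceiverTask );
    }
    /* else: slot still being processed - the sample is intentionally dropped */
}

/* Consumer side: wait for a notification, then drain every busy slot. */
void vReceiverTask( void *pvParameters )
{
    ( void ) pvParameters;

    for( ;; )
    {
        ulTaskNotifyTake( pdTRUE, portMAX_DELAY );

        for( size_t x = 0; x < sizeof( xSlots ) / sizeof( xSlots[ 0 ] ); x++ )
        {
            if( xSlots[ x ].xBusy != pdFALSE )
            {
                /* process xSlots[ x ].ucData / xSlots[ x ].xLen here ... */
                xSlots[ x ].xBusy = pdFALSE;   /* hand the slot back to its producer */
            }
        }
    }
}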

Maybe. I worry it’s going to trigger a context switch every time there is more room in the buffer, including times when there is not enough room in the buffer for the full transfer. I’ll have to try it and find out.

Yes, it will trigger a context switch every time a full message is delivered (if the writer has higher priority than the reader). The point is that if you are often running at that level, then you likely want the reader higher than the writers as that is your bottleneck, and my other method will also improve your efficiency.

Adding the size needed would help with the efficiency, but you need to use care to make sure that you avoid races.
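
For example - sketch only, where xNeededSpace and xWaitForSpace are made-up names, and the re-check after publishing the requirement is what closes the obvious race:

#include "FreeRTOS.h"
#include "task.h"
#include "stream_buffer.h"

static TaskHandle_t xWriterTask;               /* the single writer task, as before */
static volatile size_t xNeededSpace = 0;       /* how much room the writer is waiting for */

/* Receive-completed callback: only wake the writer once enough space exists. */
static void prvRxComplete( StreamBufferHandle_t xSB,
                           BaseType_t xIsInsideISR,
                           BaseType_t *pxHigherPriorityTaskWoken )
{
    size_t xWanted = xNeededSpace;

    if( ( xWanted != 0 ) && ( xStreamBufferSpacesAvailable( xSB ) >= xWanted ) )
    {
        if( xIsInsideISR != pdFALSE )
        {
            vTaskNotifyGiveFromISR( xWriterTask, pxHigherPriorityTaskWoken );
        }
        else
        {
            xTaskNotifyGive( xWriterTask );
        }
    }
}

/* Writer side: publish the requirement, then RE-CHECK the space before blocking,
 * so a read that completed in between cannot leave us waiting for a lost wakeup. */
BaseType_t xWaitForSpace( StreamBufferHandle_t xSB, size_t xNeeded, TickType_t xTicksToWait )
{
    TimeOut_t xTimeOut;
    BaseType_t xHaveSpace = pdTRUE;

    vTaskSetTimeOutState( &xTimeOut );
    xNeededSpace = xNeeded;                    /* tell the callback what we need */

    while( xStreamBufferSpacesAvailable( xSB ) < xNeeded )
    {
        if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) != pdFALSE )
        {
            xHaveSpace = pdFALSE;              /* timed out - caller drops the packet */
            break;
        }
        ( void ) ulTaskNotifyTake( pdTRUE, xTicksToWait );
    }

    xNeededSpace = 0;
    return xHaveSpace;
}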