Storing Data Packets from 3 different threads in the same queue

Hello all,

I want to ask if its possible to have 3 threads that send out packets, around 1536 bytes (8 bit) to the same queue (or buffer). Is the preferred method to use queues as they are thread safe? I see the mention of using queues for messaging between threads, this does not sound suitable for moving data to a joint buffer in this case.

Thank you for your time.

yes, queues are by definition thread safe. The only concern in your case is that with a payload size of 1536 bytes (these look like network buffers to me - suspicious size) you may need to pass pointers to the buffers around instead of the buffer payload itself, and the access to the buffers themselves then of course needs to be explicitly synchronized between threads.

1 Like

Thank you for your answer, Id like to store the entire data of the pointer into the queue so that it creates a copy of this data and so that it wont be overwritten, however when calling queuesend it seems that it is only storing the pointer so that when I call queuerecieve it sends me back the most recent data, not the data at the point the pointer was enqueued. What is the fix to this?

we would need to see your code to correct it.

1 Like

Sure, I cant share the entire code but I can create a similar example :

TxQueue = xQueueCreate(5,sizeof(uint8_t*));

uint8_t *TxBufferPtr;
uint8_t *TempBufferPtr;

if(xQueueSend(TxQueue,(void*)&TxBufferPtr,(TickType_t)10) != pdPASS){
     if(xQueueReceive(TxQueue,&(TempBufferPtr),(TickType_t)10)==pdPASS){
          *print TempBufferPtr data*

Im basically creating the queue adding a packet and checking if the queue is full, once full I want to receive and print the first item I sent to the queue. However what I am actually getting is the most recent item that I have sent. I want to store the data instead of just the pointer to the data.

well, first of all you have to create the queue with 5 entries of size 1536 instead of pointer size so the queue can store all the data, and then change the xQueueSend and xQueueReceive calls to not put the pointer to the data in the queue but the data itself. This is so basic C code that you should be able to code that out yourself.

Be aware, however, that doing it this way means that a) you lose up of lot of storage (does your target have the spare memory?) and you have a lot of cpu cycle overhead copying the data back and forth; also, if your source is a DMA network ring buffer, you must ensure to follow the access rules ahile copying.

That is exactly why zero-copy is a fairly hot concept in RTOSs.

1 Like

When I’ve needed to so something like what the OP is trying to do, I set up a pool of buffers for each task that is going to post messages to the queue. I create a “return” queue for each of the sending tasks, and have a small header on each buffer that I store the return queue ID/Address in.

During start-up, I populate the header with the return queue ID/address and post all of the buffers for each of the tasks to their own return queue.

During operation, the sending tasks reserve a buffer by reading their return queue (if it’s empty, the task may block or not, depending on how you code it), they fill the buffer, post it to the common send queue. The receiver retrieves the buffer pointer for the next pending message from the send queue, does whatever it’s going to do with it, then posts the buffer back on the return queue specified in the header.

This way, you have a zero-copy transfer, the queue size isn’t huge, and you have a mechanism to put back-pressure on the senders so that some fairness can be implemented when the senders have different priorities. Buffers can be forwarded from task to task, and so long as they eventually get returned to the originator once the data in the buffers no longer needs to be accessed, it works well.

I’ve done this under several bare-metal executives like FreeRTOS, Atomthreads, and others, and it works quite well.

3 Likes

Hi daniel,

Yes this sounds like the best approach, for now Ive implemented the copy approach (not enough time to change stuff now), thanks alot for your input.