How to insert multiple items in a Queue?

fojtik wrote on Friday, May 28, 2010:

Is there any effective way, how to insert  many messages into a Queue?

I have found only a way to call “xQueueSendToBack” repeatedly. But this causes some overhead, because implementation of
“xQueueSendToBack” is quite complex.
    On the other side the Queue is emptied from IRQ char by char and it is convenient to have message with size 1 Byte.


/** Stream send buffer. Function sends buffer via stream defined.
@param PStream pointer to a stream handle.
@param pbuffbegin begin of buffer to send.
@param bufflen length of buffer to send.
@return 0 / -err. */
INT32 SendBlock(TStream *PStream, const UINT8 *pbuffbegin, UINT16 bufflen)
  if(PStream==NULL) return -1;
  if(PStream->Tx_QueueHandle==NULL) return -2;
    if(xQueueSendToBack(PStream->Tx_QueueHandle,pbuffbegin++,clib_STREAM_OSqueueTxTicksToWait) != pdPASS )
          /// @TODO: unsuccessfull sending of whole buff - do an action


richard_damon wrote on Friday, May 28, 2010:

I don’t know of a faster way, if you really are going to use a queue and don’t want to write your own xQueueSendToBackMultiple function. As Richard has pointed out, the demos do this because it is easy, and also somewhat because it is a bit inefficient so it stresses the system a bit better.

If you want to improve the transfer speed, you can probably do better with mutex to acquire access to a plain circular buffer and maybe a semaphore to handle waiting if the buffer fills. (And if only 1 task will be writing to the buffer the mutex can be eliminated).

A second method is to actually send pointers to buffers of data to the queue, and the interrupt pops the pointer off and remembers it, drawing out of the buffer until empty, then returning the buffer back to the program and getting the next buffer from the queue.

I will point out that your implementation has a few weaknesses,  If multiple tasks send to the same queue, their outputs can get intermixed (might not be a problem in your given case, but is in general, you may want to wrap it with a mutex), and that the queue needs to be as long as the longest message that might ever be sent (or you will fill the buffer and block till timeout and then error return without calling your ActivateTransfer function), and without the mutex, it needs to be longer to handle rare case where one task is sending a message, and just before putting in the last character it gets swapped out, and the next task sends more data, and so on (it can be hard to figure the real exact maximum that might get cued up).

fojtik wrote on Friday, May 28, 2010:

The big advantage of RTOS Queue is that somebody solved putting messages from task and receiving messages within IRQ.

My particular queue usage serves for logging messages to a serial line.
Locking of queue would damage real time process. If there is no space in a queue, then it is better to thrash message.

xQueueSendToBackMultiple would help. But I am afraid that I will need to rewrite nearly whole RTOS core functionality.

Yes, messages could get intermixed, but it is still better than nothing. I am using around 512 bytes of queue and I hope that it would be sufficient enough for minimally 5 messages. xQueueSendToBackMultiple would solve even this. When the queue
has enough free space, it would copy everything into a FIFO.

rtel wrote on Friday, May 28, 2010:

As richard_damon says, placing characters in a simple circular buffer, then using a semaphore to indicate when a complete message has arrived is a very common technique.  Circular buffer implementation can be very fast, and provided the read pointer is only modified from a task, and the write pointer is only modified from the interrupt, then there is normally no need for any locking.

Another common technique is to use a dma to move data around without  cpu overhead.  Then you can use a single dma interrupt to give a semaphore to unblock a task that is waiting for the data.


richard_damon wrote on Saturday, May 29, 2010:

I think you may find less impact than you think.  If you make the mutex aquire have a timeout of 1, then if a lower priority task was suspended in the middle of sending a message, it will be priority elevated, finish and return back. If the buffer fills it will time out and you just trash your message. If you can’t tolerate the 1 tick possible delay, you can either use a 0 tick delay and lose the occasional message because the lower priority task had the queue, or you could replace the mutex with a critical section. Especially if you are using the simple circular buffer, this should be a very short critical section (of course, if the buffer fills, you need to handle this and leave the critical section.)

The concept of queuing up  “message buffers” also works very well for this sort of thing, messages by their nature are kept together. It does add complexity, but does improve efficiency by a fair amount.

fojtik wrote on Saturday, May 29, 2010:

I posted this message to probe whether “xQueueSendToBackMultiple” is planned.

Thank you for hints. It is not a good ides to cache buffers in my particular case, because they are internally used inside Clib and it is not good idea to rewrite whole IAR Clib - see below.

DMA is a “gun shooting of a sparrow” (just Czech proverb). It will surely work, but I am afraid that 128kbit/s UART line cannot emit more than 12000 interrupts per second. For my particular architecture it is less than 3000 ints/s - 4 byte HW FIFO on UART.
When I write my own FIFO, DMA could point into it. But I am afraid that overhead would be pretty big. It would be necessary to check even DMA transfer position, just because the transfer is slow and DMA area could occupy significant amount of buffer.

DMA is a good for ethernet or SSI, where transfer rates are from 10Mbit/s to 100Mbit/s.

/* If the __write implementation uses internal buffering, uncomment
* the following line to ensure that we are called with “buffer” as 0
* (i.e. flush) when the application terminates. */
size_t __write(int handle, const unsigned char *buffer, size_t size)
  size_t nChars;

  if (buffer == 0)
  { /*
     * This means that we should flush internal buffers.  Since we
     * don’t we just return.  (Remember, “handle” == -1 means that all
     * handles should be flushed.) */
    return 0;

  /* This template only writes to “standard out” and “standard err”,
   * for all other file handles it returns failure. */
  if (handle != _LLIO_STDOUT && handle != _LLIO_STDERR)
    return _LLIO_ERROR;

  nChars = 0;
  for (/* Empty */; size != 0; -size)
    if(xQueueSendToBack(LOG2STREAM->Tx_QueueHandle,&(*buffer++),clib_STREAM_OSqueueTxTicksToWait) != pdPASS ) 
      return _LLIO_ERROR;

  return nChars;