Blocking on multiple queues?

n1vg wrote on Tuesday, March 15, 2005:

I’ve got a task that needs to wait for input on multiple queues.  It’s not worth the extra RAM overhead to have multiple tasks, so I either set up a loop that checks each queue without blocking, and pauses for my maximum allowable latency time at the end, or I block for a fraction of that time at each queue.

Input to these queues is infrequent, but it needs to be processed in a reasonable amount of time.  The approach above works, but it does a lot more context switching that it would if it could just block on multiple queues simultaneously.

Is there an easy way to do this that I’m missing?



rtel wrote on Tuesday, March 15, 2005:


How about:

+ Use the same queue for all data.
+ Any task/ISR that wants to post to the queue posts a structure that lets the receiver know from where the data originated (or how to interpret the data).
+ Block on the queue.

For a simplistic example (not necessarily optimal), if you want to receive data from COM0 and COM1, then rather than have two queues - one for each COM port - have both COM ports post to the same queue as follows:

#define COM0_DATA 0
#define COM1_DATA 1

typedef struct
____portCHAR cDataID;
____portCHAR cData; // Could be a pointer to a more complex data item or buffer instead.
} xQueueItem;

void vRxTask( void *pvParameters )
xQueueItem RxData;

________cQueueReceive( xQueue, ( void * )&RxData, portWAIT_FOREVER );

________//RxData contains the data AND the source of the data.
________switch( RxData.cDataID )
______________case COM0_DATA : ProcessCom0Data(RxData.cData); break;
______________case COM1_DATA : ProcessCom1Data(RxData.cData); break;

[error checking/return values, etc not shown]

n1vg wrote on Tuesday, March 15, 2005:

Might be doable, but I’ll have to rethink my architecture.  In this particular example, I’ve got queues serviced by a USART ISR, and queues serviced by a USB CDC task.

The separate modules call a function to register with the handler task.  Registered ports have a data structure with pointers to their TX and RX queues, plus other state information.

My main concern with a shared queue would be overflow - if a port is slow or hung (might be a network socket, for example), the queue would fill with data for the stalled port and the others would block.

Do other RTOSes have mechanisms to deal with this, or is it just a matter of proper architecture in the user program?

rtel wrote on Tuesday, March 15, 2005:

Blocking on multiple queues itself is vary rare (feel free to correct me anybody - this is just my experience).  I cannot think of a method of implementing it that would not take a lot of resource [both processor and RAM], but this might be an interesting challenge for the future.  I have an idea about a data structure that could sit between tasks and blocking lists  dont know how this will pan out though.

There may be other methods - event flags allow a single event to wake multiple tasks - you just want to do it in reverse.  Berkley sockets have the select() call also.

Another, probably less elegant solution, would be to have an additional queue that was used to unblock the receiving task.  A task that wanted to send data would post data to the usual queue first, then send a message on the new queue to wake the task and tell it which queue to poll to get the posted data.  This is probably worse than my first solution, but may fit in with your architecture better.

mikehibbett wrote on Wednesday, March 16, 2005:

I’m using threadx, and that doesn’t allow blocking on multiple queues. There is a useful feature of ‘event flags’ - 32 bit arrays in which you can block on one or more event flags being set.

We recently had exactly that problem with our design, and solved it by having a source id in the queued message. Seems to work fine.


nobody wrote on Monday, April 17, 2006:

This function xDoubleQueueWait allows you to block on two queues, or more if you wish. (I think two should be enough!)
To use it, cut & paste the patch in queue.c
Richardbarry, please give your blessing.


/*----- START xDoubleQueueWait ------*/

signed portBASE_TYPE xDoubleQueueWait(xQueueHandle pxQueue1, xQueueHandle pxQueue2, portTickType xTicksToWait )
signed portBASE_TYPE xReturn = 0;

xListItem    LocalEventListItem1;
xListItem    LocalEventListItem2;
xList        LocalEventList;
void*     pxOwner;

    /* This function allows waiting for two queues until one of them unblock,
    for example one for socket/message queue, one for serial i/o.
    it returns 0 (none of them / timeout), 1 (queue1) or 2 (queue2) that can
    be used in a switch block.
    I create a local event list to which the task is placed on so that the
    EventListItem in TCB points to this local event list. (using vTaskPlaceOnEventList)
    For each queue, I insert a locally allocated event-list-item with owner set to
    this task’s TCB.
    Then when an event arrives to either queue, the event will resume this task
    by removing it from the local event list.
    Afterward we can then safely remove the two event-list-items from the two queues.
    This is made possible because xTaskRemoveFromEventList contains the following two lines:
        pxUnblockedTCB = ( tskTCB * ) listGET_OWNER_OF_HEAD_ENTRY( pxEventList );
        vListRemove( &( pxUnblockedTCB->xEventListItem ) );
    So it removes the item from pxUnblockedTCB->EventListItem rather than
    from the pxEventList->pxHead->pxNext.
    Normally the two should point to the same thing.
    However (fortunately for this modification), the pxUnblockedTCB->xEventListItem is
    contained in our local event-list (actually dummy event list), and not in the
    event list that causes the Task to be removed.
    This allows the task to subscribe to more than one event list by
    allocating list items locally on stack
    RichardBarry, please don’t change this nice loophole :slight_smile:

    /*Make sure other tasks do not access the queue. */

    /* Make sure interrupts do not access the queue. */
    prvLockQueue( pxQueue1 );
    prvLockQueue( pxQueue2 );
    if(prvIsQueueEmpty( pxQueue1 ) && prvIsQueueEmpty( pxQueue2 ))
        /* There are no messages in both queues, do we want to block or just
        leave with nothing? */           
        if( xTicksToWait > ( portTickType ) 0 )
            vTaskPlaceOnEventList( &LocalEventList , xTicksToWait );
            pxOwner = listGET_OWNER_OF_HEAD_ENTRY( ( &LocalEventList ) );
            listSET_LIST_ITEM_OWNER( &LocalEventListItem1, pxOwner );
            listSET_LIST_ITEM_OWNER( &LocalEventListItem2, pxOwner );
            vListInsert(  &( pxQueue2->xTasksWaitingToReceive ), & LocalEventListItem1  );
            vListInsert(  &( pxQueue2->xTasksWaitingToReceive ), & LocalEventListItem2  );
                prvUnlockQueue( pxQueue1 );
                prvUnlockQueue( pxQueue2 );
                if( !xTaskResumeAll() )

                prvLockQueue( pxQueue1);
                prvLockQueue( pxQueue2);

            vListRemove(  &LocalEventListItem1 );
            vListRemove(  &LocalEventListItem2 );
            xReturn = 0;
            if(prvIsQueueEmpty( pxQueue1 ) != 0)
                xReturn = 1;
            if(prvIsQueueEmpty( pxQueue2 ) != 0)
                xReturn = 2;
            xReturn = 0;
        if(prvIsQueueEmpty( pxQueue1 ) != 0)
            xReturn = 1;
        if(prvIsQueueEmpty( pxQueue2 ) != 0)
            xReturn = 2;

    /* We no longer require exclusive access to the queue. */

    /* !! Here we must use binary OR, not logical OR since both queue must
    be unlocked */
    if( prvUnlockQueue( pxQueue1 ) | prvUnlockQueue( pxQueue2 ) )
        if( !xTaskResumeAll() )
    return xReturn;

imajeff wrote on Tuesday, April 18, 2006:

I’ve never thought it important either to block one task on multiple queues, but I do wonder about being able to add multiple listeners to one queue.

Basically, it is easy on the other end… multiple tasks can write to the same queue. I just sometimes want the ISR to only have to send an event to one queue, but have that effectively wake multiple tasks sequentially.

nobody wrote on Tuesday, April 18, 2006:

Might be costly time wise, but if you want an ISR to wake all the tasks on a queue you can call xQueueSendFromISR() repeatedly passing in false as the xTaskPreviouslyWoken parameter until the function also returns false.

nobody wrote on Thursday, April 20, 2006:

Case application for one task waiting for multiple queues:
Comm protocol task, with one queue from serial ISR and other queue from Application (for posting requests).
I am interested to know how others address this design issue without resorting to this double-queue wait.
Serial port data comes one byte at a time so it can’t be mixed up with app request. The other solution is polling the two queues which is either CPU intensive or slow (depending on polling delay.) It’s not healthy either for low power application (battery operated), since we can’t put the system to low power while waiting.
Another way I can imagine is two tasks waiting for each queue, but synchronizing the two process would be a nightmare.

Any suggestions?

For multiple tasks waiting for one queue I think it’s already handled by FreeRTOS, am I mistaken?
Although I don’t see much interest in doing so.