Blocking Queues: Consumer-Receive Succeeds Before Producer-Send To Shared Queue

[Figure: InitialStatesOfTasksAndQueues]

Hi! My initial state is shown in the figure above, and my task implementation is shown in the code below. configUSE_PREEMPTION is 1.

#define TEMP_STR_LEN 9


static portTASK_FUNCTION( vBlockingQueueProducer, pvParameters )
{
    uint16_t usValue = 0;
    xBlockingQueueParameters * pxQueueParameters;
    short sErrorEverOccurred = pdFALSE;
    
    // Sid - 2022.02.23
    char *tasknamestr = pcTaskGetName( NULL );
    char tempstr[TEMP_STR_LEN];

    pxQueueParameters = ( xBlockingQueueParameters * ) pvParameters;

    for( ; ; )
    {
        if( xQueueSend( pxQueueParameters->xQueue, ( void * ) &usValue, pxQueueParameters->xBlockTime ) != pdPASS )
        {
            // Sid - 2022.02.24
            portENTER_CRITICAL();
            sprintf((char *) tempstr, "%s:FTS\n", tasknamestr);         // Sid - FTS = Failed To Send
            uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
            portEXIT_CRITICAL();

            sErrorEverOccurred = pdTRUE;
        }
        else
        {
            // Sid - 2022.02.24
            portENTER_CRITICAL();
            snprintf((char *) tempstr, sizeof(tempstr), "%s:%3d\n", tasknamestr, usValue); // Sid - Print the value that was successfully sent (bounded: usValue can exceed 3 digits)
            uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
            portEXIT_CRITICAL();

            /* We have successfully posted a message, so increment the variable
             * used to check we are still running. */
            if( sErrorEverOccurred == pdFALSE )
            {
                ( *pxQueueParameters->psCheckVariable )++;

                // Sid - 2022.02.24
                portENTER_CRITICAL();
                sprintf((char *) tempstr, "%s:EEF\n", tasknamestr);     // Sid - EEF = Error Ever occurred is False
                uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
                portEXIT_CRITICAL();
            }
            
            /* Increment the variable we are going to post next time round.  The
             * consumer will expect the numbers to follow in numerical order. */
            ++usValue;

            #if configUSE_PREEMPTION == 0
                taskYIELD();
            #endif
        }
    }
}
/*-----------------------------------------------------------*/

static portTASK_FUNCTION( vBlockingQueueConsumer, pvParameters )
{
    uint16_t usData, usExpectedValue = 0;
    xBlockingQueueParameters * pxQueueParameters;
    short sErrorEverOccurred = pdFALSE;

    // Sid - 2022.02.23
    char *tasknamestr = pcTaskGetName( NULL );
    char tempstr[TEMP_STR_LEN];

    pxQueueParameters = ( xBlockingQueueParameters * ) pvParameters;

    for( ; ; )
    {
        if( xQueueReceive( pxQueueParameters->xQueue, &usData, pxQueueParameters->xBlockTime ) == pdPASS )
        {
            // Sid - 2022.02.24
            portENTER_CRITICAL();
            snprintf((char *) tempstr, sizeof(tempstr), "%s:%3d\n", tasknamestr, usData); // Sid - Print the data that was successfully received (bounded: usData can exceed 3 digits)
            uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
            portEXIT_CRITICAL();

            if( usData != usExpectedValue )
            {
                /* Catch-up. */
                usExpectedValue = usData;

                sErrorEverOccurred = pdTRUE;
              
                // Sid - 2022.02.24
                portENTER_CRITICAL();
                sprintf((char *) tempstr, "%s:DNE\n", tasknamestr);         // Sid - DNE = Data is Not Expected value
                uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
                portEXIT_CRITICAL();
              
            }
            else
            {
                /* We have successfully received a message, so increment the
                 * variable used to check we are still running. */
                if( sErrorEverOccurred == pdFALSE )
                {
                    ( *pxQueueParameters->psCheckVariable )++;
                  
                    // Sid - 2022.02.24
                    portENTER_CRITICAL();
                    sprintf((char *) tempstr, "%s:EEF\n", tasknamestr);     // Sid - EEF = Error Ever occurred is False
                    uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
                    portEXIT_CRITICAL();
                }

                /* Increment the value we expect to remove from the queue next time
                 * round. */
                ++usExpectedValue;
            }

            #if configUSE_PREEMPTION == 0
                {
                    if( pxQueueParameters->xBlockTime == 0 )
                    {
                        taskYIELD();
                    }
                }
            #endif
        }
        else
        {
            // Sid - 2022.02.24
            portENTER_CRITICAL();
            sprintf((char *) tempstr, "%s:FTR\n", tasknamestr);     // Sid - FTR = Failed To Receive
            uart_printf((BYTE *) tempstr, TEMP_STR_LEN);
            portEXIT_CRITICAL();
        }
    }
}

The initial part of my output is shown below:

 QP4:  0
 QP4:EEF
 QC1:  0      ..........|   --->   Consumer task
 QC1:EEF      ..........|
 QP4:  1
 QP4:EEF
 QP5:  0
 QP5:EEF
 QC6:  0
 QC6:EEF
 QP2:  0      ..........|   --->   Producer task
 QP2:EEF      ..........|
 QC1:  1
 QC1:EEF
 QC3:  0
 QC3:EEF
 QP4:  2
 QP4:EEF

It looks like:

  1. The Consumer task (QC1) prints before its Producer task (QP2).

  2. The Consumer task’s xQueueReceive() succeeds and 0 is removed from the queue shared by QC1 and QP2. How did this happen? The shared queue should be empty when QC1 accesses it, right?

Hope someone can make sense of this. Thanks in advance!

Show us the code that creates the tasks. Did you ensure that each queue object is shared only by the task pair you expect, or is there just one queue object shared by all tasks? If the latter, I do not see anything unexpected…

This is the code that creates the tasks. uxPriority is 2.

void vStartBlockingQueueTasks( UBaseType_t uxPriority )
{
    xBlockingQueueParameters * pxQueueParameters1, * pxQueueParameters2;
    xBlockingQueueParameters * pxQueueParameters3, * pxQueueParameters4;
    xBlockingQueueParameters * pxQueueParameters5, * pxQueueParameters6;
    const UBaseType_t uxQueueSize1 = 1, uxQueueSize5 = 5;
    const TickType_t xBlockTime = pdMS_TO_TICKS( ( TickType_t ) 1000 );
    const TickType_t xDontBlock = ( TickType_t ) 0;

    /* Create the first two tasks as described at the top of the file. */

    /* First create the structure used to pass parameters to the consumer tasks. */
    pxQueueParameters1 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );

    /* Create the queue used by the first two tasks to pass the incrementing number.
     * Pass a pointer to the queue in the parameter structure. */
    pxQueueParameters1->xQueue = xQueueCreate( uxQueueSize1, ( UBaseType_t ) sizeof( uint16_t ) );

    /* The consumer is created first so gets a block time as described above. */
    pxQueueParameters1->xBlockTime = xBlockTime;

    /* Pass in the variable that this task is going to increment so we can check it
     * is still running. */
    pxQueueParameters1->psCheckVariable = &( sBlockingConsumerCount[ 0 ] );

    /* Create the structure used to pass parameters to the producer task. */
    pxQueueParameters2 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );

    /* Pass the queue to this task also, using the parameter structure. */
    pxQueueParameters2->xQueue = pxQueueParameters1->xQueue;

    /* The producer is not going to block - as soon as it posts the consumer will
     * wake and remove the item so the producer should always have room to post. */
    pxQueueParameters2->xBlockTime = xDontBlock;

    /* Pass in the variable that this task is going to increment so we can check
     * it is still running. */
    pxQueueParameters2->psCheckVariable = &( sBlockingProducerCount[ 0 ] );


    /* Note the producer has a lower priority than the consumer when the tasks are
     * spawned. */
    xTaskCreate( vBlockingQueueConsumer, "QC1", blckqSTACK_SIZE, ( void * ) pxQueueParameters1, uxPriority, NULL );
    xTaskCreate( vBlockingQueueProducer, "QP2", blckqSTACK_SIZE, ( void * ) pxQueueParameters2, tskIDLE_PRIORITY, NULL );



    /* Create the second two tasks as described at the top of the file.   This uses
     * the same mechanism but reverses the task priorities. */

    pxQueueParameters3 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );
    pxQueueParameters3->xQueue = xQueueCreate( uxQueueSize1, ( UBaseType_t ) sizeof( uint16_t ) );
    pxQueueParameters3->xBlockTime = xDontBlock;
    pxQueueParameters3->psCheckVariable = &( sBlockingProducerCount[ 1 ] );

    pxQueueParameters4 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );
    pxQueueParameters4->xQueue = pxQueueParameters3->xQueue;
    pxQueueParameters4->xBlockTime = xBlockTime;
    pxQueueParameters4->psCheckVariable = &( sBlockingConsumerCount[ 1 ] );

    xTaskCreate( vBlockingQueueConsumer, "QC3", blckqSTACK_SIZE, ( void * ) pxQueueParameters3, tskIDLE_PRIORITY, NULL );
    xTaskCreate( vBlockingQueueProducer, "QP4", blckqSTACK_SIZE, ( void * ) pxQueueParameters4, uxPriority, NULL );



    /* Create the last two tasks as described above.  The mechanism is again just
     * the same.  This time both parameter structures are given a block time. */
    pxQueueParameters5 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );
    pxQueueParameters5->xQueue = xQueueCreate( uxQueueSize5, ( UBaseType_t ) sizeof( uint16_t ) );
    pxQueueParameters5->xBlockTime = xBlockTime;
    pxQueueParameters5->psCheckVariable = &( sBlockingProducerCount[ 2 ] );

    pxQueueParameters6 = ( xBlockingQueueParameters * ) pvPortMalloc( sizeof( xBlockingQueueParameters ) );
    pxQueueParameters6->xQueue = pxQueueParameters5->xQueue;
    pxQueueParameters6->xBlockTime = xBlockTime;
    pxQueueParameters6->psCheckVariable = &( sBlockingConsumerCount[ 2 ] );

    xTaskCreate( vBlockingQueueProducer, "QP5", blckqSTACK_SIZE, ( void * ) pxQueueParameters5, tskIDLE_PRIORITY, NULL );
    xTaskCreate( vBlockingQueueConsumer, "QC6", blckqSTACK_SIZE, ( void * ) pxQueueParameters6, tskIDLE_PRIORITY, NULL );
}

QC1 has a higher priority than QP2, so it runs first and blocks in xQueueReceive() on the empty queue. QP2 then runs and sends an item to the queue, which unblocks QC1. Because QC1 is now the highest-priority ready task, control passes from inside QP2’s xQueueSend() to QC1 before QP2 reaches its own print. QC1 prints the received item and blocks on the next receive; only then does QP2 run again and print the item it posted.
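
To make that ordering concrete, here is a minimal sketch in plain C. It is a toy model only and calls no FreeRTOS functions: the names ToyQueue, toy_log, toy_consumer_resume, toy_producer_iteration and toy_run are all hypothetical, and the "context switch" is modelled as an ordinary function call made from inside the send step. It assumes a queue of length 1, a consumer that is already blocked when the producer first runs, and a consumer that always has the higher priority.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Toy model only: none of these names are FreeRTOS APIs. */
typedef struct {
    int item;              /* the single slot (queue length 1)          */
    int count;             /* number of items currently stored, 0 or 1  */
    int consumer_blocked;  /* 1 while the consumer waits in "receive"   */
} ToyQueue;

/* Append one "NAME:value" line to the trace, truncating if it is full. */
static void toy_log(char *trace, size_t cap, const char *name, int value)
{
    size_t used = strlen(trace);
    if (used < cap) {
        snprintf(trace + used, cap - used, "%s:%3d\n", name, value);
    }
}

/* What the high-priority consumer does once it is switched back in. */
static void toy_consumer_resume(ToyQueue *q, char *trace, size_t cap)
{
    assert(q->count == 1);         /* it was woken because an item arrived  */
    q->count = 0;                  /* the receive removes the item          */
    toy_log(trace, cap, "QC1", q->item);
    q->consumer_blocked = 1;       /* loops round, blocks on the empty queue */
}

/* One iteration of the low-priority producer loop. */
static void toy_producer_iteration(ToyQueue *q, int value,
                                   char *trace, size_t cap)
{
    /* "send": copy the item into the queue. */
    assert(q->count == 0);
    q->item = value;
    q->count = 1;

    /* The send made a higher-priority task ready, so that task runs
     * before the send returns to the producer. */
    if (q->consumer_blocked) {
        q->consumer_blocked = 0;
        toy_consumer_resume(q, trace, cap);
    }

    /* Only now does the producer run again and reach its own print. */
    toy_log(trace, cap, "QP2", value);
}

/* Run n producer iterations, starting with the consumer already blocked. */
void toy_run(char *trace, size_t cap, int n)
{
    ToyQueue q = { 0, 0, 1 };
    trace[0] = '\0';
    for (int i = 0; i < n; i++) {
        toy_producer_iteration(&q, i, trace, cap);
    }
}
```

Calling toy_run(buf, sizeof buf, 2) on a 128-byte buffer leaves the lines "QC1:  0", "QP2:  0", "QC1:  1", "QP2:  1" in buf, in that order. The item is always in the queue before the consumer reads it; only the prints are reordered, because the producer's print sits after the point where it gets preempted.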

Thanks Gaurav! I didn’t realise that:

  1. The switch out from QC1 occurred during the blocking receive.

  2. The switch in to QP2 occurred.

  3. The switch out from QP2 occurred after the send.

  4. The switch in to QC1 occurred.

  5. QC1 printed the received value before QP2 printed the sent value.

After adding some prints in the right places, my output looked like:

 QP4:IFS        Infinite For loop Started
 xQS/GS.        xQueueSend / GenericSend        (QP4)
 QP4:  0        Sent value is     0
 QP4:EEF        Error Ever occurred is False
 QC1:IFS
 xQRcv..        xQueueReceive (blocks)          (QC1)
 QP4:IFS
 xQS/GS.        xQueueSend / GenericSend        (QP4)
 QP2:IFS
 xQS/GS.        xQueueSend / GenericSend        (QP2)
 QC1:  0        Received value is 0             (QC1)
 QC1:EEF
 QC1:IFS
 xQRcv..
 QC3:IFS
 xQRcv..
 QP4:  1
 QP4:EEF
 QP4:IFS
 xQS/GS.
 QP5:IFS
 QC6:IFS
 xQRcv..
 QP2:  0        Sent value is     0
 QP2:EEF        Error Ever occurred is False