Passing replies back from tasks

I’m working on implementing “RPC” over MQTT, but I’m having trouble working out how to implement timeouts safely.

Currently I have an MQTT task which reads commands from a queue, but I need to return responses to these commands to the caller. The current way I’m doing this is by pushing items like this to the queue:

typedef struct xQUEUE_ITEM
{
    // The handle of the task that pushed this item to the queue.
    TaskHandle_t xTaskHandle;
    // The response from the task.
    void * pvResponse;
} QueueItem_t;

I then use xTaskNotifyWait (in the caller) and xTaskNotify (in the MQTT task) to tell the caller that the task has written a response.

Here’s a stripped-down example of what I am doing:

static void prvMQTTTask( void * pvArg )
{
    for( ; ; )
    {
        QueueItem_t xItem;
        if( xQueueReceive( xQueue, &( xItem ), portMAX_DELAY ) == pdTRUE )
        {
            // Write the reply into the caller-owned buffer, then wake the caller.
            strcpy( xItem.pvResponse, "This is an example!" );
            xTaskNotify( xItem.xTaskHandle, 0, eSetValueWithOverwrite );
        }
    }
}

static BaseType_t xMQTTSend( void * pvResponse, TickType_t xTicksToWait )
{
    // Clear any pending notifications.
    xTaskNotifyWait( 0, 0, NULL, 0 );

    BaseType_t xResult = pdPASS;
    const QueueItem_t xItem = {
        .xTaskHandle = xTaskGetCurrentTaskHandle(),
        .pvResponse = pvResponse,
    };
    const TickType_t xStartTick = xTaskGetTickCount();
    if( xQueueSendToBack( xQueue, &( xItem ), xTicksToWait ) == errQUEUE_FULL )
    {
        xResult = pdFAIL;
    }

    if( xResult == pdPASS )
    {
        const TickType_t xEnqueueTicks = xTaskGetTickCount() - xStartTick;
        if( xEnqueueTicks > xTicksToWait )
        {
            // Already timed out.
            xResult = pdFAIL;
        }
        else
        {
            // Subtract time spent enqueueing.
            xTicksToWait -= xEnqueueTicks;
        }
    }

    if ( xResult == pdPASS )
    {
        // Wait for acknowledgement.
        if ( xTaskNotifyWait( 0, 0, NULL, xTicksToWait ) == pdFAIL )
        {
            // Timed out.
            xResult = pdFAIL;
        }
    }

    return xResult;
}
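
The elapsed-time bookkeeping in xMQTTSend relies on unsigned tick arithmetic, which stays correct if the tick counter wraps between the two samples. Below is a minimal host-compilable sketch of just that step. prvRemainingTicks and sketchMAX_DELAY are hypothetical names, and the 32-bit TickType_t is an assumption (the real width depends on the port). Unlike the code above, the sketch leaves an infinite timeout untouched instead of shrinking it.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Assumption: 32-bit ticks. The real TickType_t width depends on the port. */
typedef uint32_t TickType_t;

/* Stand-in for portMAX_DELAY, treated here as "no deadline". */
#define sketchMAX_DELAY    ( ( TickType_t ) UINT32_MAX )

/* Hypothetical helper. Returns true and stores the unused part of xBudget in
 * *pxRemaining, or returns false if the budget has already been spent. */
static bool prvRemainingTicks( TickType_t xStart,
                               TickType_t xNow,
                               TickType_t xBudget,
                               TickType_t * pxRemaining )
{
    /* Unsigned subtraction wraps modulo 2^32, so this is still the elapsed
     * time if the counter overflowed (at most once) between the samples. */
    const TickType_t xElapsed = ( TickType_t ) ( xNow - xStart );
    bool xInTime = true;

    assert( pxRemaining != NULL );

    if( xBudget == sketchMAX_DELAY )
    {
        /* An infinite budget stays infinite. */
        *pxRemaining = sketchMAX_DELAY;
    }
    else if( xElapsed > xBudget )
    {
        /* Already timed out while enqueueing. */
        xInTime = false;
    }
    else
    {
        /* Subtract the time spent enqueueing. */
        *pxRemaining = xBudget - xElapsed;
    }

    return xInTime;
}
```

On target, the kernel's vTaskSetTimeOutState() and xTaskCheckForTimeOut() do the same bookkeeping, so a helper like this may not be needed at all.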

The issue I’m having with timeouts is that the MQTT task could write to xItem.pvResponse after the request has already “expired” (i.e. after xTaskNotifyWait has timed out and returned), at which point the buffer it points to may no longer exist.

Has anyone got any ideas on this? I think I need a way to notify the MQTT task that the item was cancelled.

I would send the task a new message with a NULL pointer to tell it not to answer the previous message. You probably want to wait for that to be acknowledged to know it is safe to reuse the buffer.

That does mean your MQTTTask needs to stay responsive to its request Queue while processing requests, so it wouldn’t work with your example as written.
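
As a rough illustration of the cancel-message idea, here is a host-compilable sketch of the worker-side bookkeeping only. It assumes the MQTT task tracks a single outstanding request and that a queue item with a NULL pvResponse means "cancel my previous request". WorkerState_t, prvOnQueueItem and prvOnReply are hypothetical names, and TaskHandle_t is replaced by a stand-in so the code builds without FreeRTOS.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Host stand-in for the FreeRTOS task handle type. */
typedef void * TaskHandle_t;

typedef struct xQUEUE_ITEM
{
    TaskHandle_t xTaskHandle;
    void * pvResponse; /* NULL marks a cancel message in this sketch. */
} QueueItem_t;

/* Simplification: the worker tracks one outstanding request at a time. */
typedef struct
{
    QueueItem_t xPending;
    bool xHasPending;
} WorkerState_t;

/* Hypothetical: called by the MQTT task for each item taken off the queue.
 * Returns true if the item was a cancel message that should be acknowledged. */
static bool prvOnQueueItem( WorkerState_t * pxState,
                            const QueueItem_t * pxItem )
{
    bool xIsCancel;

    assert( ( pxState != NULL ) && ( pxItem != NULL ) );
    xIsCancel = ( pxItem->pvResponse == NULL );

    if( xIsCancel )
    {
        if( pxState->xHasPending &&
            ( pxState->xPending.xTaskHandle == pxItem->xTaskHandle ) )
        {
            /* Forget the buffer so a late reply is never written to it. */
            pxState->xHasPending = false;
        }
    }
    else
    {
        pxState->xPending = *pxItem;
        pxState->xHasPending = true;
    }

    /* On target, a cancel would be acknowledged at this point (for example
     * with xTaskNotify) so the caller knows its buffer is safe to reuse. */
    return xIsCancel;
}

/* Hypothetical: called when the network reply arrives.
 * Returns true if the reply was written to a caller's buffer. */
static bool prvOnReply( WorkerState_t * pxState,
                        const char * pcReply )
{
    bool xWritten = false;

    if( pxState->xHasPending )
    {
        /* As in the original example, this assumes the caller's buffer is
         * large enough for the reply. */
        strcpy( ( char * ) pxState->xPending.pvResponse, pcReply );
        pxState->xHasPending = false;
        xWritten = true;
    }

    return xWritten;
}
```

The caller still has to block until the cancel is acknowledged, which is why this only helps if the MQTT task keeps draining its queue while a request is in flight.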

I would like to share an additional thought regarding this scenario. Instead of the current implementation, we might consider using portMAX_DELAY to block indefinitely in the sending task. However, it’s important to ensure that the MQTT task consistently provides a response, be it a success or a timeout notification. This approach would require maintaining the responsiveness of the MQTT task.

Thank you.

Ideally I’d like for this to work even if the MQTT task is not servicing the queue in a timely manner (e.g. slow network, as we call MQTT_ProcessLoop in that same loop).

In my current implementation I do use portMAX_DELAY when waiting for a notification as I don’t have a proper solution for timeout, but I’m concerned about callers getting hung for longer than the timeout.

I’m wondering if I can do something with a mutex to lock setting/checking a “cancelled” flag and storing the response, as that should provide synchronisation I think?
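
One way the mutex-and-flag idea could look is sketched below. The catch is that the flag and the response buffer have to outlive a caller that has given up, so this sketch moves them off the caller's stack into a heap record that both sides hold a reference to. Everything here is an assumption for illustration: Request_t and the prvRequest* functions are hypothetical, the lock macros are empty host stand-ins for a single FreeRTOS mutex, and calloc/free stand in for pvPortMalloc/vPortFree.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>

/* Host stand-ins. On target these would take and give one shared FreeRTOS
 * mutex (xSemaphoreTake / xSemaphoreGive). */
#define sketchLOCK()      ( ( void ) 0 )
#define sketchUNLOCK()    ( ( void ) 0 )

typedef struct
{
    bool xCancelled;     /* Set by the caller when its wait times out. */
    bool xDone;          /* Set by the MQTT task once the response is stored. */
    unsigned int uxRefs; /* One reference each for caller and MQTT task. */
    char cResponse[ 32 ];
} Request_t;

/* Number of live records, kept only so the sketch can be checked. */
static unsigned int uxLiveRequests = 0U;

static Request_t * prvRequestCreate( void )
{
    Request_t * pxReq = calloc( 1U, sizeof( *pxReq ) );

    if( pxReq != NULL )
    {
        pxReq->uxRefs = 2U;
        uxLiveRequests++;
    }

    return pxReq;
}

/* Drop one reference; whichever side finishes last frees the record. */
static void prvRequestRelease( Request_t * pxReq )
{
    bool xFree;

    sketchLOCK();
    assert( pxReq->uxRefs > 0U );
    pxReq->uxRefs--;
    xFree = ( pxReq->uxRefs == 0U );
    if( xFree )
    {
        uxLiveRequests--;
    }
    sketchUNLOCK();

    if( xFree )
    {
        free( pxReq );
    }
}

/* MQTT task side. Returns true if the caller should be notified. */
static bool prvRequestComplete( Request_t * pxReq, const char * pcText )
{
    bool xNotify = false;

    sketchLOCK();
    if( !pxReq->xCancelled )
    {
        ( void ) snprintf( pxReq->cResponse, sizeof( pxReq->cResponse ), "%s", pcText );
        pxReq->xDone = true;
        xNotify = true;
    }
    sketchUNLOCK();

    prvRequestRelease( pxReq );
    return xNotify;
}

/* Caller side, run once the notification wait returns for any reason.
 * Copies the response out if one was stored, otherwise marks the request
 * cancelled. Returns true if a response was copied. */
static bool prvRequestFinish( Request_t * pxReq, char * pcOut, size_t xOutLen )
{
    bool xGotResponse;

    sketchLOCK();
    xGotResponse = pxReq->xDone;
    if( xGotResponse )
    {
        ( void ) snprintf( pcOut, xOutLen, "%s", pxReq->cResponse );
    }
    else
    {
        pxReq->xCancelled = true;
    }
    sketchUNLOCK();

    prvRequestRelease( pxReq );
    return xGotResponse;
}
```

With this shape the queue item would carry the Request_t pointer instead of a pointer into the caller's stack. A notification can still arrive late, so it is probably safest to treat the notification as a hint and xDone as the source of truth.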

A few options. First, use a very short timeout in MQTT_ProcessLoop, then check the Queue with a zero timeout, and loop.

Second, change your structure so your MQTTTask is just doing the MQTT_ProcessLoop, and have something else modify “global” variables it looks at to indicate the sort of requests that are happening, with a synchronization object (like a Mutex) between the two sides that is only ever held briefly, to keep both sides responsive. This could be a task getting messages from the Queue, or it could be the tasks just calling an API function that does the manipulation.

I tend to hide most of the inter-module abstractions like Queues behind an API.