Invalid pending ack

We are in the process of integrating the coreMQTT library along with the MQTT Agent.
Unfortunately, we are getting an intermittent crash caused by an invalid pointer access while the subscription callback is being serviced.
In our application, we always subscribe to one topic at a time and call
MQTTAgent_Subscribe() with the following arguments:

MQTTSubscribeInfo_t xSubscribeInfo = { 0 };
MQTTAgentSubscribeArgs_t xSubscribeArgs = { 0 };
CommandInfo_t xCommandParams = { 0 };
CommandContext_t xApplicationDefinedContext = { 0 };
MQTTStatus_t xCommandAdded;
uint32_t ulNotificationValue;

xSubscribeInfo.qos = qos;
xSubscribeInfo.pTopicFilter = topic;
xSubscribeInfo.topicFilterLength = ( uint16_t ) strlen( topic );
xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo;
xSubscribeArgs.numSubscriptions = 1;

/* Complete an application defined context associated with this subscribe message.
 * This gets updated in the callback function so the variable must persist until
 * the callback executes. */
xApplicationDefinedContext.xTaskToNotify = xTaskGetCurrentTaskHandle();
xApplicationDefinedContext.pArgs = ( void * ) &xSubscribeArgs;

xCommandParams.blockTimeMs = MAX_COMMAND_SEND_BLOCK_TIME_MS;
xCommandParams.cmdCompleteCallback = prvSubscribeCommandCallback;
xCommandParams.pCmdCompleteCallbackContext = ( void * ) &xApplicationDefinedContext;

xCommandAdded = MQTTAgent_Subscribe( &xGlobalMqttAgentContext,
                                     &xSubscribeArgs,
                                     &xCommandParams );

if( xCommandAdded != MQTTSuccess )
{
    ERR_LOG( "error(%d) while subscribing thru MQTT agent\n", xCommandAdded );
    return xCommandAdded;
}

/* Wait for acks from subscribe messages - this is optional.  If the
 * returned value is zero then the wait timed out. */
xTaskNotifyWait( 0, mqtt_SUB_COMMAND_ACK_BIT, &ulNotificationValue, MS_TO_WAIT_FOR_NOTIFICATION );

When the crash occurs, it is xSubscribeArgs.pSubscribeInfo that appears to have been corrupted. xSubscribeArgs.numSubscriptions is also some huge number.

I understand that the memory behind xSubscribeArgs.pSubscribeInfo = &xSubscribeInfo; comes from the task stack and will therefore no longer be valid if the task (which calls MQTTAgent_Subscribe() and then blocks until the subscription callback is invoked) times out of xTaskNotifyWait().
I double-checked that the timeout is NOT happening, yet sometimes the subscription callback is invoked with an invalid MQTTAgentSubscribeArgs_t *.
Our callback is implemented as below:

static void prvSubscribeCommandCallback( void * pxCommandContext,
                                         MQTTAgentReturnInfo_t * pxReturnInfo )
{
    bool xSubscriptionAdded = false;
    CommandContext_t * pxApplicationDefinedContext = ( CommandContext_t * ) pxCommandContext;
    MQTTAgentSubscribeArgs_t * pxSubscribeArgs = ( MQTTAgentSubscribeArgs_t * ) ( pxApplicationDefinedContext->pArgs );
    /* ... */
}

I was wondering whether the pPendingAcks list in MQTTAgentContext_t is somehow not being managed properly. The size of the pending ack list is defined as 200 in our application, and the total number of subscription topics in our app is far fewer than that, only around 50.

Does anyone know what we might be missing, or whether there is a known bug in the MQTT Agent?

Another question I have is whether blocking with a timeout after calling MQTTAgent_Subscribe() is a reasonable way to synchronize the caller with the MQTT agent task.
What I am not sure about is what the timeout should be. The issue I see is that if the timeout occurs and the subscription callback is invoked afterwards, pxSubscribeArgs will no longer be valid because it lives on the caller’s stack. What timeout value can guarantee that this won’t happen? I am not sure that is possible, since it depends on the broker implementation and there can be badly behaved brokers out there too.

Any insight on the first and the second question would be greatly appreciated.

Sorry, not a full answer yet as I’m on my phone, but I want to make sure you are aware that there are some examples you can use as a reference here and here. There is also a Visual Studio build for the second link. Hope that helps.

Is all the code you posted within the same function, making…

MQTTSubscribeInfo_t xSubscribeInfo = { 0 };

…a stack variable?

If so, then if xTaskNotifyWait() times out (not sure how long MS_TO_WAIT_FOR_NOTIFICATION is), and the function returns, then xSubscribeInfo will no longer be valid when the callback is executed.

Hi @jamesk,

If your task is blocking until the subscribe is completed, I’m not sure why the subscribe arguments struct is getting corrupted, as adding to/retrieving from the pending ack list shouldn’t alter the arguments for any command. Perhaps you can try setting breakpoints when a command is added to the queue, added to the ack list, and retrieved from the ack list to verify that the pCommand->pArgs pointer is the same. Some things I can think of:

  • Every element of the subscribe args needs to stay in scope until prvSubscribeCommandCallback is invoked, including xSubscribeArgs, xSubscribeInfo, and xSubscribeInfo.pTopicFilter. Is it possible something is getting overwritten in another thread, or before the callback is invoked?
  • Could the notification be coming from a previous subscription, and you are just 1 notification out of sync? You could try giving each subscribe command a different bit to set, instead of reusing mqtt_SUB_COMMAND_ACK_BIT for every one.
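
The second point could be sketched roughly as below. This is a self-contained model rather than FreeRTOS code: ack_bit_acquire(), ack_bit_release() and ack_bit_matches() are hypothetical helpers, not part of coreMQTT or the kernel. In a real task, the acquired mask would be the value the command callback passes to xTaskNotify() with eSetBits, and the value the waiting task checks in the result of xTaskNotifyWait().

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: hands out one notification bit per in-flight
 * subscribe, so a late ack from an earlier command cannot be mistaken
 * for the ack of the current one. */
#define ACK_BIT_COUNT    32u

static uint32_t ulBitsInUse = 0u; /* One flag per notification bit currently owned. */

/* Returns a mask with exactly one bit set, or 0 if every bit is taken. */
uint32_t ack_bit_acquire( void )
{
    for( uint32_t i = 0u; i < ACK_BIT_COUNT; i++ )
    {
        uint32_t ulMask = ( uint32_t ) 1u << i;

        if( ( ulBitsInUse & ulMask ) == 0u )
        {
            ulBitsInUse |= ulMask;
            return ulMask;
        }
    }

    return 0u;
}

/* Gives the bit back once the command's callback has run. */
void ack_bit_release( uint32_t ulMask )
{
    ulBitsInUse &= ~ulMask;
}

/* True only if the received notification value carries this command's bit. */
bool ack_bit_matches( uint32_t ulNotificationValue,
                      uint32_t ulExpectedMask )
{
    return ( ulExpectedMask != 0u ) &&
           ( ( ulNotificationValue & ulExpectedMask ) != 0u );
}
```

With this arrangement, a notification that does not carry the expected bit can be recognized as belonging to some other command instead of being taken as the SUBACK for the current one.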

As for your second question, you’re correct that we rely on the broker to send a SUBACK in a timely manner, and if the SUBACK never arrives then the entry will be stuck in the pending ack list. Since the MQTT Agent doesn’t have any timeouts in the library itself, one way you can recover in this situation is:

  1. Call MQTTAgent_Terminate() from the task that initially called MQTTAgent_Subscribe(). When a terminate command is processed, it internally calls MQTTAgent_CancelAll to cancel every command that is still in the queue or in the pending ack list, and invokes their callbacks with an MQTTRecvFailed return code. However, the MQTT connection is still active and not disconnected.
  2. From the agent task (the one calling MQTTAgent_CommandLoop), restart the command loop instead of disconnecting the socket.
  3. Retry the subscription from the subscriber task.
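
As a rough illustration of how those three steps fit together, here is a small model in plain C. Everything in it is an assumption made for the sketch: RecoveryOps_t and its hooks are hypothetical stand-ins for the real operations (MQTTAgent_Subscribe() plus a bounded wait, MQTTAgent_Terminate(), and signalling the agent task to call MQTTAgent_CommandLoop() again), and the fake hooks exist only so the flow can be exercised without a broker.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical hooks standing in for the real operations. */
typedef struct
{
    bool ( * xSubscribeAndWait )( void * pvCtx ); /* Subscribe, then wait for the callback. */
    void ( * vTerminateAgent )( void * pvCtx );   /* Step 1: cancel outstanding commands. */
    void ( * vRestartLoop )( void * pvCtx );      /* Step 2: agent task re-enters its loop. */
    void * pvCtx;
} RecoveryOps_t;

/* Tries the subscription up to uxMaxAttempts times, running the
 * terminate/restart sequence after every failed attempt. */
bool subscribe_with_recovery( const RecoveryOps_t * pxOps,
                              unsigned uxMaxAttempts )
{
    for( unsigned uxAttempt = 0u; uxAttempt < uxMaxAttempts; uxAttempt++ )
    {
        if( pxOps->xSubscribeAndWait( pxOps->pvCtx ) )
        {
            return true;
        }

        pxOps->vTerminateAgent( pxOps->pvCtx );
        pxOps->vRestartLoop( pxOps->pvCtx );
        /* Step 3: the next loop iteration retries the subscription. */
    }

    return false;
}

/* Fake environment: the first lFailuresLeft subscribe attempts fail. */
typedef struct
{
    int lFailuresLeft;
    int lTerminates;
    int lRestarts;
} FakeBroker_t;

bool fake_subscribe( void * pvCtx )
{
    FakeBroker_t * pxFake = ( FakeBroker_t * ) pvCtx;

    if( pxFake->lFailuresLeft > 0 )
    {
        pxFake->lFailuresLeft--;
        return false;
    }

    return true;
}

void fake_terminate( void * pvCtx )
{
    ( ( FakeBroker_t * ) pvCtx )->lTerminates++;
}

void fake_restart( void * pvCtx )
{
    ( ( FakeBroker_t * ) pvCtx )->lRestarts++;
}
```

In a real application the subscriber would still need to wait for the cancelled command’s callback before letting any stack-allocated arguments go out of scope, since the terminate step invokes that callback with a failure code.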

Additionally, you said the pending ack list size was 200 for your application. In case it was not clear, the list only needs to be as large as the maximum number of simultaneous ack-awaiting operations you expect to have. If you only send subscribes one at a time and block on each one, your list could be much smaller (unless you also plan on sending 100+ QoS 1 publishes simultaneously). I’d recommend setting it to a lower value: a list size of 20 instead of 200 would save you about 6 * 180 = 1080 bytes in the MQTTAgentContext_t struct.
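
To make the arithmetic explicit, here is a tiny helper. The function name is hypothetical, and the 6 bytes per entry is simply the figure used in the estimate above; the actual entry size depends on the target’s pointer width and struct padding. The list length itself is, as far as I know, set through the MQTT_AGENT_MAX_OUTSTANDING_ACKS configuration macro.

```c
#include <assert.h>
#include <stddef.h>

/* Bytes saved by shrinking the pending ack list from xOldEntries to
 * xNewEntries, given an assumed size per entry. Returns 0 if the list
 * is not actually getting smaller. */
size_t pending_ack_bytes_saved( size_t xOldEntries,
                                size_t xNewEntries,
                                size_t xEntryBytes )
{
    if( xOldEntries <= xNewEntries )
    {
        return 0u;
    }

    return ( xOldEntries - xNewEntries ) * xEntryBytes;
}
```

For example, pending_ack_bytes_saved( 200, 20, 6 ) gives 1080, matching the estimate.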

Please let us know if you have further questions.

Thanks Muneeb. Your answer clarifies and confirms quite a bit for me.

Yeah, the subscribe arguments struct getting corrupted was a big clue that the stack was being corrupted. It turned out that it was NOT the timeout causing the stack corruption but improper use of xTaskNotifyWait() and xTaskNotify(). The subscriber task was woken up by an unexpected notification (something other than the SUBACK) and returned, unwinding the stack that still held the subscribe arguments.
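
For anyone hitting the same thing, the shape of the fix is roughly as follows. This is a plain C model, not our production code: WaitFn_t is a hypothetical stand-in for a single xTaskNotifyWait() call, and scripted_wait() is a fake that replays a list of notification values so the loop can be run without an RTOS. In a real task the remaining time would more likely be tracked with vTaskSetTimeOutState() and xTaskCheckForTimeOut().

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for one xTaskNotifyWait() call: returns true if a notification
 * arrived within ulTicksToWait, stores its bits in *pulValue and the ticks
 * spent waiting in *pulElapsed. Hypothetical signature. */
typedef bool ( * WaitFn_t )( void * pvCtx,
                             uint32_t ulTicksToWait,
                             uint32_t * pulValue,
                             uint32_t * pulElapsed );

/* Keeps waiting until the expected bit shows up or the total budget is
 * spent. Notifications without the expected bit are ignored rather than
 * being treated as the ack. */
bool wait_for_ack_bit( WaitFn_t xWait,
                       void * pvCtx,
                       uint32_t ulExpectedBit,
                       uint32_t ulTimeoutTicks )
{
    uint32_t ulRemaining = ulTimeoutTicks;

    for( ; ; )
    {
        uint32_t ulValue = 0u;
        uint32_t ulElapsed = 0u;
        bool xNotified = xWait( pvCtx, ulRemaining, &ulValue, &ulElapsed );

        if( xNotified && ( ( ulValue & ulExpectedBit ) != 0u ) )
        {
            return true; /* The ack for this command really arrived. */
        }

        if( !xNotified || ( ulElapsed >= ulRemaining ) )
        {
            return false; /* Budget exhausted without seeing the ack. */
        }

        ulRemaining -= ulElapsed; /* Unrelated wake-up: wait for what is left. */
    }
}

/* Scripted fake: replays notification values, then behaves like a timeout. */
typedef struct
{
    const uint32_t * pulValues;
    size_t xCount;
    size_t xNext;
} Script_t;

bool scripted_wait( void * pvCtx,
                    uint32_t ulTicksToWait,
                    uint32_t * pulValue,
                    uint32_t * pulElapsed )
{
    Script_t * pxScript = ( Script_t * ) pvCtx;

    if( pxScript->xNext >= pxScript->xCount )
    {
        *pulElapsed = ulTicksToWait;
        return false;
    }

    *pulValue = pxScript->pulValues[ pxScript->xNext++ ];
    *pulElapsed = 1u; /* Pretend each notification took one tick to arrive. */
    return true;
}
```

The key point is that the task only leaves the wait (and lets its stack variables go) when the bit it is actually waiting for has been seen, or when it knowingly gives up.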

Regarding the pending ack list size of 200, I appreciate your explanation. While I was debugging step-by-step, I also realized that the list size needs to be big only if simultaneous ack-awaiting operations are expected.
In our case we have a dedicated task responsible for subscribing and publishing on behalf of the app tasks, so ideally we can have a pending ack list of size 1, since we are doing QoS 0.
And also instead of having the subscribe arguments on the stack, we can have static subscribe arguments.
If you could comment on this, I would appreciate it too.

Regarding dealing with a misbehaving broker that may not send a SUBACK in a timely manner, or a case where a broker disconnect occurs while waiting for the SUBACK, I will need to think carefully about your recovery suggestions. It is probably rare that we will run into such a scenario, but it is possible.

And also instead of having the subscribe arguments on the stack, we can have static subscribe arguments.
If you could comment on this, I would appreciate it too.

There isn’t an issue with having the subscribe arguments be constants. Also, if you’re using the example subscription manager in our demo, then the topic filter string needs to be a static string and stay in scope for the lifetime of the subscription, as it isn’t copied into a buffer.
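
A minimal sketch of that arrangement, assuming a single subscriber task with only one subscribe outstanding at a time. The two struct types are simplified stand-ins for MQTTSubscribeInfo_t and MQTTAgentSubscribeArgs_t so the example compiles without the coreMQTT headers, and prepare_static_subscribe() and release_static_subscribe() are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Simplified stand-ins for the coreMQTT types. */
typedef struct
{
    int qos;
    const char * pTopicFilter;
    uint16_t topicFilterLength;
} SubscribeInfoSketch_t;

typedef struct
{
    SubscribeInfoSketch_t * pSubscribeInfo;
    size_t numSubscriptions;
} SubscribeArgsSketch_t;

/* Static storage: stays valid after the calling function returns. */
static SubscribeInfoSketch_t xStaticInfo;
static SubscribeArgsSketch_t xStaticArgs;
static bool xInFlight = false;

/* Fills the static args for one subscription. pcStaticTopic is stored by
 * pointer, not copied, so it must itself have static lifetime (for example
 * a string literal). Returns NULL while a previous subscribe is pending. */
SubscribeArgsSketch_t * prepare_static_subscribe( const char * pcStaticTopic,
                                                  int qos )
{
    if( xInFlight || ( pcStaticTopic == NULL ) )
    {
        return NULL;
    }

    xStaticInfo.qos = qos;
    xStaticInfo.pTopicFilter = pcStaticTopic;
    xStaticInfo.topicFilterLength = ( uint16_t ) strlen( pcStaticTopic );
    xStaticArgs.pSubscribeInfo = &xStaticInfo;
    xStaticArgs.numSubscriptions = 1u;
    xInFlight = true;

    return &xStaticArgs;
}

/* Intended to be called once the command callback has run. */
void release_static_subscribe( void )
{
    xInFlight = false;
}
```

Because the storage is shared, this only works when subscribes are strictly serialized. With more than one in flight, each command would need its own args, for example from a small pool.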