MqttAgent: how to cancell publish command with qos2 after timeout

How do I send a message with qos2 but with a confirmation timeout?
I am sending large messages, the memory for which is allocated dynamically, and if no callback is called after xx seconds, then I need to cancel the sending process. The description of the MQTTAgent_Publish command must follow the Note recommendation. The context passed to the callback through pCmdContext member of pCommandInfo parameter MUST remain in scope at least until the callback has been executed by the agent task.

After a timeout for publication, I release the allocated memory, the memory for the message, and as a result, the agent’s operation becomes incorrect.

xCommandAdded = MQTTAgent_Publish(&xGlobalMqttAgentContext,pmqtt_message,pxCommandParams );
....
 awaiting notification from the callback

ack_result =  xTaskNotifyWait(0,0,NULL,pdMS_TO_TICKS( mqttMS_TO_WAIT_FOR_NOTIFICATION ));
.....
 i need to cancell publish command in this place
vPortFree()....



Hey @pistoletov1974,
One way that you can abort the sending process after a particular timeout is by configuring the MQTT_SEND_TIMEOUT_MS macro. If the send process of any packet (not just publish) is not complete within MQTT_SEND_TIMEOUT_MS time then the send API will return with MQTTSendFailed and the completion callback will be called.

You can then simply add the logic to free the memory in the completion callback by checking the return status. Refer to the official coreMQTT documentation for send timeout to know more.

Let us know if this works for you.

thanks for the answer. Сan this be used when working through Core MQTT Agent?

Yes. You can set the value for that macro in the core_mqtt_config.h file. This config file is used by the coreMQTT library for all the configuration macros. You can take the coreMQTT-Agent Demos as a reference. You just need to add the following in this file:

 #define MQTT_SEND_TIMEOUT_MS    ( ... insert time in ms here ... )

as I understand it, this parameter does not affect the callback of a command sent through an MQTT agent.
The callback is made only after confirmation from the broker, i.e. after the command is fully executed

Blockquote

Command Completion

Commands do not have any timeout associated with them. The only way for a task to be aware of a command’s completion is through the invocation of an optional MQTTAgentCommandCallback_t completion callback. The completion callback will be invoked with an optional MQTTAgentCommandContext_t, which is the incomplete type struct MQTTAgentCommandContext. This type must be defined by the application, and should contain information that would be useful in distinguishing commands.

Yes that is correct. But let me explain how this would work in case of a timeout.

If you have configured the MQTT_SEND_TIMEOUT_MS macro to say 5000 i.e. 5s and your packet is so big that it takes 10s to send then the command function will not be able to send the packet completely and will return after 5s with a return code of MQTTSendFailed. This will mark the end of this command. Only if it were MQTTSuccess then we would have to wait for the ACK before we can say the command is complete.

As a result of your command completion, your callback will be called which would tell you that the packet was not sent (as the return code was MQTTSendFailed) and you can free the memory as required.

Take a look at the MQTT_CommandLoop code here for more details.

I should also mention here that the functionality that you are trying to get (timeout on sending a publish) is something that the library does not support directly. That is the reason there is no documentation for this as well. What I have explained above is just a work around to achieve this functionality.

Let me know if more clarification is needed.

This is part of the demo code

static void prvMQTTAgentTask( void * pvParameters )
...
        xMQTTStatus = MQTTAgent_CommandLoop( &xGlobalMqttAgentContext );
//        xGlobalMqttAgentContext.mqttContext.connectStatus = xMQTTStatus;
//        broker_isConnected = FALSE;
        LogDebug(("MQTT com loop: %d",xMQTTStatus));
        /* Success is returned for disconnect or termination. The socket should
         * be disconnected. */
        if( xMQTTStatus == MQTTSuccess )
        {
            /* MQTT Disconnect. Disconnect the socket. */
            xNetworkResult = prvSocketDisconnect( &xNetworkContext );
        }
.....the code for reconnecting to the broker...


As I understand it, if the connection with the broker is lost, we exit the MQTTAgent_CommandLoop, and the commands and callbacks are not executed. As I understand it, if the connection with the broker is lost, we exit the loop, and the commands and callbacks are not executed. When the connection with the broker is restored, the message is successfully published, and a callback is received indicating the successful operation.

Ah right. Thanks for pointing this out. The problem with the suggested work around is that it will cause the command loop to return with a MQTTSendFailed return code even if the connection with the broker is still up. There is no way to distinguish between the MQTTSendFailed returned due to packet not being sent because of the timeout and the MQTTSendFailed returned due to a broken connection with the broker. So the safer thing to do here is to close the socket and reconnect when the command loop returns an error.

As far as the call to the callback is concerned, it will be called even if the command returns MQTTSendFailed see this part of the code.

...
operationStatus = commandFunction( pMqttAgentContext, 
                                   pCommandArgs, 
                                   &commandOutParams );

    if( ( operationStatus == MQTTSuccess ) && ... )
    {
        ...
    }

    if( ( pCommand != NULL ) && ( ackAdded != true ) )
    {
        /* The command is complete, call the callback. */
        concludeCommand( pMqttAgentContext, pCommand, operationStatus, NULL );
    }
...

This means that you can still free the memory in the callback.

I agree this is not a very clean way of dealing with the problem you are trying to solve. The library does not give support for it and this is just a work around.

1 Like

As @DakshitBabbar mentioned, there is no clean way of handling timeout. One potential workaround may be - when you decide to timeout a publish message, do not release the resources right way and only do them when you get a callback:

MQTTCallback()
{
    if( publish message is timed out )
    {
         /* Release resources. */
    }
    else
    {
         /* Handle message. */
    }
}

@aggarg @DakshitBabbar

How I did the code research: disconnected the ethernet cable (the mqttAgentCommandloop exited) And then the tcp socket reconnect code was executed. Since the loop was stopped, the callback was not called after 20sec.

    #define MQTT_SEND_TIMEOUT_MS    ( 20000U )

But as soon as the connection was restored, all the accumulated messages were published, and the callbacks were called with a successful completion and memory cleanup. This is not the best option, as json messages take up a significant amount of memory. I would like to have a behavior where if a message is not published within xx seconds, I store a few integer values in non-volatile memory and then attempt to publish again after 24 hours.
It might be a good idea to add a timeout parameter to the command context. Because if i clear the memory after a while, it can cause problems with sending the next commands

//wait callback execution
ack_result =  xTaskNotifyWait(0,0,NULL,pdMS_TO_TICKS( mqttMS_TO_WAIT_FOR_NOTIFICATION ));
....
vPortFree(....)

That sounds promising. Please open an issue on the repository describing your use case in detail. And if you decide to implement the solution, we’d welcome a pull request!