I have an MQTT agent running for WiFi for the ESP32. I notice that that the MQTTAgent_CommandLoop exits when the wifi drops. What is the correct way restarting the MQTT agent with or without clearing the queue? Please Advice.
Ideally, I’d like to hold the MQTT Agent on hold untill the network layer re-establishes its connection with Wi-Fi.
The return code is MQTTRecvFailed rather than MQTTKeepAliveTimeout which I don’t understand why.
static void prvMQTTAgentTask( void * pParam )
{
BaseType_t xResult = pdFAIL;
MQTTStatus_t xMQTTStatus = MQTTSuccess, xConnectStatus = MQTTSuccess;
( void ) pParam;
LogInfo( ( "MQTT agent run" ) );
/* MQTTAgent_CommandLoop() is effectively the agent implementation. It
* will manage the MQTT protocol until such time that an error occurs,
* which could be a disconnect. If an error occurs the MQTT error code
* is returned and the queue left uncleared so there can be an attempt to
* clean up and reconnect however the application writer prefers. */
xMQTTStatus = MQTTAgent_CommandLoop( &xGlobalMqttAgentContext );
if( MQTTKeepAliveTimeout == xMQTTStatus )
{
LogError( ( "MQTTKeepAliveTimeout occured." ) );
}
else
{
LogError( ( "Failure reason : %d", xMQTTStatus ) );
}
}
MQTTRecvFailed indicates that the underlying transport receive function used for the coreMQTT network interface port failed (see coreMQTT: Porting Guide) (usually because the connection to the server died) and MQTTKeepAliveTimeout means that coreMQTT pinged the server for a response (to check if the connection was still up) but never received a response (coreMQTT KeepAlive). See coreMQTT: Enumerated Types for all values that MQTTStatus_t can take and what they mean. Essentially what you need to do after this happens is disconnect the socket used by the MQTTContext stored in xGlobalMqttAgentContext (this frees up resources), reconnect to wifi (if needed), re-establish the TCP connection to the server, then re-establish the MQTT connection to the MQTT broker of the server. If you chose to have a persistent session then you should call MQTTAgent_ResumeSession, and if not/if the MQTT session was dropped by the MQTT broker, you will have to re-subscribe to all topics that your device was subscribed to (example), then call MQTTAgent_CommandLoop again in the same way.
Some pseudo-code:
static void prvMQTTAgentTask( void * pParam )
{
BaseType_t xResult = pdFAIL;
MQTTStatus_t xMQTTStatus = MQTTSuccess, xConnectStatus = MQTTSuccess;
( void ) pParam;
LogInfo( ( "MQTT agent run" ) );
do
{
/* MQTTAgent_CommandLoop() is effectively the agent implementation. It
* will manage the MQTT protocol until such time that an error occurs,
* which could be a disconnect. If an error occurs the MQTT context on
* which the error happened is returned so there can be an attempt to
* clean up and reconnect however the application writer prefers. */
xMQTTStatus = MQTTAgent_CommandLoop( &xGlobalMqttAgentContext );
/* If transport failed then connection needs to be re-established */
if(xMQTTStatus == MQTTRecvFailed || xMQTTStatus == MQTTSendFailed)
{
/* Mostly pseudo code past this point. */
mqttBrokerConnectionStatus = NOT_CONNECTED;
/* Disconnect socket used for underlying transport of MQTT messages */
disconnectSocket(socketUsedInMQTTContext);
/* Continuously attempt to establish MQTT connection with server */
while( mqttBrokerConnectionStatus != CONNECTED )
{
/* Check if you still are connected to WiFi as perhaps just the
* connection to the server died */
wifiConnectionStatus = getWifiStatus();
if(wifiConnectionStatus != CONNECTED)
{
wifiConnectionStatus = connectToWifi();
}
serverTCPConnectionStatus = NOT_CONNECTED;
/* If wifi connection is established,
* attempt to connect to server. */
if(wifiConnectionStatus == CONNECTED)
{
serverTCPConnectionStatus =
connectToServer(serverCreds, socketUsedInMQTTContext);
}
/* If connected to server,
* attempt to establish an MQTT connection. */
if(serverTCPConnectionStatus == CONNECTED)
{
/* This call should also handle persistent session management
* and resubscribing to topics if your session is clean
* See example linked above. */
mqttBrokerConnectionStatus =
establishMQTTConnectionWithServer(MQTTContext);
}
}
}
else if (xMQTTStatus == MQTTSuccess)
{
/* If MQTTAgent_CommandLoop returns MQTTSuccess, this means it was
* commanded to terminate the connection so clean up the underlying
* socket. */
disconnectSocket(socketUsedInMQTTContext);
}
else
{
/* For any other return, just print out an error.
* MQTT_Status_strerr is a function from coreMQTT that takes a status
* and stringifies it. */
LogError(("MQTTAgent_CommandLoop returned: %s",
MQTT_Status_strerr(xMQTTStatus)))
}
} while( xMQTTStatus != MQTTSuccess );
/* MQTT_Success indicates the agent was told to terminate so stop */
/* Clean up task resources */
vTaskDelete(NULL);
}
Other considerations are to take into account that if MQTTAgent_CommandLoop fails and other tasks are trying to perform MQTT operations through the MQTTAgent that are higher or equal priority to the task that calls MQTTAgent_CommandLoop (at least if you have the command loop task wait for reconnection - which you should because otherwise its just going to process and fail commands until connection is re-established) - you might run into the issue that the queue of commands for the command loop becomes full.
A simple solution to that problem is to just have the task that calls MQTTAgent_CommandLoop be at a higher priority than tasks that issue MQTT commands - but you’ll probably not want to do this as MQTTAgent_CommandLoop only blocks when the command queue is empty so you’ll end up having a lot of context switches - a task will issue an MQTT command, this unblocks the agent task as there is now something in its command queue, since agent task is higher priority it will get scheduled, it will process the one command given by the task that unblocked it, since agent task has nor more commands to process it will block again, so on and so forth. Effectively you’d only be servicing one command each time the MQTT agent task runs doing this.
So a better and more universal solution to this issue is to use some sort of flag that gets set once MQTTAgent_Command loop fails that prevents other tasks that from passing commands to MQTTAgent until the connection is re-established - you probably just want all of these tasks to block so your MQTT agent task (or whatever task you end up using to establish reconnection) runs without interruption. To achieve this in a nice way, FreeRTOS has what are called event groups. For information on these see FreeRTOS event bits, event groups and event flags and for the API see FreeRTOS event groups and event bits API functions. Essentially what these allow you to do is have a task check and block on an event bit being set. In your case this event bit could be something that notifies that the connection is good and your tasks that use MQTT-Agent block on that bit being set.