MQTTAgent publish is slow compared to FreeRTOS MQTT

Hi rtel,

Case 1:
I tried passing the MQTTAgent connection context to the coreMQTT publish API. In this case I can see an improvement in speed, but huge data loss is observed.

Case 2:
A direct coreMQTT connection (without the Agent), publishing the same way: not much increase in speed, and data loss is still observed.

  1. Please help me speed up the MQTT Agent.
  2. Otherwise, please help me avoid the data loss in Case 1 (QoS 1 also didn’t help).

Regards,
Suresh

I think QoS will always make your transfer slower as it adds data to the link, but data loss should never happen on a TCP socket even without QoS 1. In fact, if data loss happens on an MQTT connection, the connection would break and need to be torn down, as the MQTT protocol cannot handle data loss.

I think you should work out why you are losing data, because that is definitely not normal; there is no case where you should ever see data loss during normal operation.

If you can provide Wireshark logs to us, that’d be great. I would also like to understand what you mean by data loss since, as Cobus pointed out, without a break in the connection there should not be any data loss over an MQTT connection (which uses underlying TCP).

EDIT: Additionally, would you mind checking whether configASSERT is defined in your copy of FreeRTOSConfig.h?

Hi Cobusve:

Thanks for your response.

When I run the above-mentioned Case 1 (MQTT Agent connection with the coreMQTT publish API) in debug mode
[I understand that cross-publishing (MQTT Agent connection to the coreMQTT publish API) may not be supported],
I do not observe any connection loss, as you mentioned, but I cannot see my data in the cloud. Please find the debug log traces below.

[WIFI_MQTT] [ERROR] No ack found for packet id 12.
[WIFI_MQTT] [ERROR] No operation found matching packet id 12.
[WIFI_MQTT] [ERROR] No ack found for packet id 16.
[WIFI_MQTT] [ERROR] No operation found matching packet id 16.

Some more observations below.

Case A: MQTT-Agent publish takes ~4 seconds to publish the complete data.

Case B: coreMQTT publish takes ~8 seconds to publish the complete data.

Case C: FreeRTOS MQTT V2.1.1 publish takes ~2 seconds to publish the complete data.

I am looking for help with Case A, to improve the publish time for the MQTT Agent (at least to make it equivalent to Case C).

Hi kanherea
Thanks for your response.

Currently I can't provide the Wireshark logs. configASSERT is defined in FreeRTOSConfig.h.

Regards,
Suresh

Hi,

I can see a large difference with coreMQTT Agent v1.0.0 and AWS IoT Over-the-air Update v3.0.0 in terms of publishing time and update timing.

When I compare these timings with the old FreeRTOS MQTT V2.1.1 and FreeRTOS OTA V1.1.1, I see better results with the old SDK only.

Is this expected behaviour? I would hope the latest SDK performs much faster than the old SDK. Can anyone help me work out how to increase the speed of both agents?

Regards,
Suresh

Could you share the code you are using to compare, or a test case, so that we can try to reproduce this issue? coreMQTT-Agent uses coreMQTT’s publish under the hood, so the Agent being faster than coreMQTT alone likely points to an issue with the setup.

Hi archigup,

Thanks for your response.

After tuning the coreMQTT network buffer size (1024 to 8000 bytes) and the MQTT process loop timeout (500 to 0 ms), coreMQTT now performs the same as the MQTT Agent (both publish in ~4 seconds).
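
For reference, the two changes described correspond to build-time constants along the lines of the sketch below. The macro names are illustrative placeholders, not taken from this project; the real names depend on the demo and release being used.

/* Illustrative names only -- check your own demo/config headers for the real macros. */

/* Shared network buffer passed to coreMQTT; raised from 1024 to 8000 bytes. */
#define NETWORK_BUFFER_SIZE        ( 8000U )

/* Timeout passed to MQTT_ProcessLoop(); reduced from 500 ms to 0 so the loop
 * returns as soon as there is nothing left to read. */
#define PROCESS_LOOP_TIMEOUT_MS    ( 0U )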

But, as I mentioned above, the publish time of both (MQTT Agent and coreMQTT) is still slow compared to FreeRTOS OTA V1.1.1.

I have used the example code from mqtt_demo_helpers.c and used the same APIs to publish the data.

The APIs used, FYI:

  • EstablishMqttSession()

  • PublishToTopic()

Regards,
Suresh

Hi,

Can anyone help me resolve this issue?

Is there any test evidence that the latest SDK (coreMQTT Agent v1.0.0) can perform faster than the old SDK (FreeRTOS MQTT V2.1.1)?

My battery usage is about 20% higher when I use the latest SDK. If there is any chance to optimize this, please let me know and help me resolve the issue.

Please find below the debug logs for the coreMQTT Agent connection and a coreMQTT Agent publish, including millisecond timestamps.

Establishing CoreMQTTAgent Connection

35109 [WIFI_MQTT] [INFO] Creating a TLS connection.
37238 [WIFI_MQTT] [DEBUG] Creating command queue.
37238 [WIFI_MQTT] [INFO] adMQTTAgentInit Success 
37239 [WIFI_MQTT] [DEBUG] Encoded size for length 21 is 1 bytes.
37239 [WIFI_MQTT] [DEBUG] CONNECT packet remaining length=21 and packet size=23.
37239 [WIFI_MQTT] [DEBUG] CONNECT packet size is 23 and remaining length is 21.
37239 [WIFI_MQTT] [DEBUG] Encoded size for length 21 is 1 bytes.
37239 [WIFI_MQTT] [DEBUG] Length of serialized CONNECT packet is 23.
37240 [WIFI_MQTT] [DEBUG] BytesSent=23, BytesRemaining=0
37240 [WIFI_MQTT] [DEBUG] Successfully sent packet at time 37240.
37240 [WIFI_MQTT] [DEBUG] Sent 23 bytes of CONNECT packet.
37435 [WIFI_MQTT] [DEBUG] No data was received from the transport.
37540 [WIFI_MQTT] [DEBUG] Encoded size for length 2 is 1 bytes.
37541 [WIFI_MQTT] [DEBUG] BytesReceived=2, BytesRemaining=0, TotalBytesReceived=2.
37541 [WIFI_MQTT] [INFO] Packet received. ReceivedBytes=2.
37541 [WIFI_MQTT] [INFO] CONNACK session present bit not set.
37541 [WIFI_MQTT] [INFO] Connection accepted.
37542 [WIFI_MQTT] [INFO] Received MQTT CONNACK successfully from broker.
37542 [WIFI_MQTT] [INFO] MQTT connection established with the broker.
37542 [WIFI_MQTT] [INFO] [LOG] MQTT-Agent Connection success 

37542 [WIFI_MQTT] [INFO] [LOG] Success creating MQTT connection

CoreMQTTAgent Publish

53908 [WIFI_MQTT] [INFO] Sent PUBLISH packet to broker.. 

54107 [MQTT Agent] [DEBUG] No data was received from the transport.
54107 [MQTT Agent] [INFO] Publishing message...

54108 [MQTT Agent] [DEBUG] Encoded size for length 15127 is 2 bytes.
54108 [MQTT Agent] [DEBUG] Encoded size for length 15127 is 2 bytes.
54108 [MQTT Agent] [DEBUG] PUBLISH packet remaining length=15127 and packet size=15130.
54108 [MQTT Agent] [DEBUG] PUBLISH packet size is 15130 and remaining length is 15127.
54109 [MQTT Agent] [DEBUG] Encoded size for length 15127 is 2 bytes.
54109 [MQTT Agent] [DEBUG] Serialized PUBLISH header size is 86.
54109 [MQTT Agent] [DEBUG] BytesSent=86, BytesRemaining=0
54109 [MQTT Agent] [DEBUG] Successfully sent packet at time 54109.
54110 [MQTT Agent] [DEBUG] Sent 86 bytes of PUBLISH header.
54117 [MQTT Agent] [DEBUG] BytesSent=15044, BytesRemaining=0
54117 [MQTT Agent] [DEBUG] Successfully sent packet at time 54117.
54117 [MQTT Agent] [DEBUG] Sent 15044 bytes of PUBLISH payload.
54117 [MQTT Agent] [INFO] MQTTAgentCmdCompleteCallback return status [0]
54118 [MQTT Agent] [DEBUG] Returned Command Context 1 to pool
54118 [WIFI_MQTT] [INFO] Sent PUBLISH packet to broker ...



Regards,
Suresh

Hi, using coreMQTT-Agent is slower than using coreMQTT directly. While coreMQTT is not thread-safe, its APIs can be protected by a mutex. The new release of coreMQTT, v2.0.0, also provides hooks which can be configured with mutexes to make the API thread-safe (see coreMQTT/core_mqtt.c at dafa7c89e6d36802640201bd1cf56c460d5881bc · FreeRTOS/coreMQTT · GitHub).

Removing coreMQTT-Agent and directly using coreMQTT might solve the speed problems.
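
As a rough illustration of the mutex approach (a sketch only: the wrapper name is hypothetical, it assumes a single MQTTContext_t shared between tasks, and the same mutex would also need to guard MQTT_ProcessLoop() and any other coreMQTT calls):

#include "FreeRTOS.h"
#include "semphr.h"
#include "core_mqtt.h"

/* Created once at start-up with xSemaphoreCreateMutex(). */
static SemaphoreHandle_t xCoreMqttMutex;

/* Hypothetical wrapper: serialises access to a shared MQTTContext_t so that
 * several tasks can publish through coreMQTT directly, without the agent. */
static MQTTStatus_t prvThreadSafePublish( MQTTContext_t * pxMqttContext,
                                          MQTTPublishInfo_t * pxPublishInfo )
{
    MQTTStatus_t xStatus;

    xSemaphoreTake( xCoreMqttMutex, portMAX_DELAY );
    {
        xStatus = MQTT_Publish( pxMqttContext,
                                pxPublishInfo,
                                MQTT_GetPacketId( pxMqttContext ) );
    }
    xSemaphoreGive( xCoreMqttMutex );

    return xStatus;
}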

Ahh, sorry, I missed that you had also tried coreMQTT.
Just to confirm: the logs are from calling the functions in demos/common/mqtt_demo_helpers/mqtt_demo_helpers.c, in the 202107 release? That file uses coreMQTT; is the coreMQTT-Agent code used for testing the one from demos/coreMQTT_Agent/simple_sub_pub_demo.c? And was the file you used for the 202002 measurement demos/mqtt/iot_demo_mqtt.c?

@Suresh, is it possible for you to share a sample project that we can use to reproduce the problem?

I too have noticed that using the agent is quite slow. Notably, I have been measuring a publish cycle time of 2.9 seconds at QoS 0 on an ESP32. I did some measuring, and the actual time spent calling the coreMQTT publish is about 2 ms.

So somewhere the agent is blocked for that time. Reducing “MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME” from the default of 1000 ms to 100 ms has minimal impact.

Notably, changing to QoS 1 only increases the cycle time by 200 ms or so.

Hello @txf,

Notably I have been measuring a publish cycle time of 2.9 seconds at QoS 0 on an ESP32

Ouch! That is too much! When we say that the agent is slow, it is not slow by that much.
Would you mind telling us how many threads are trying to access the agent and what their relative priorities are? Also, what is the relative priority of the agent itself? Can you try using just one thread, just as an experiment?

Reducing the “MQTT_AGENT_MAX_EVENT_QUEUE_WAIT_TIME” from the default of 1000 ms to 100 ms has minimal impact.

I expect this not to help, as it would only reduce the time that a thread waits for queue space to become available; it doesn’t mean that the thread will get to publish a message any quicker.

If the MQTT Agent has a lower priority than any of the threads using it, then while those threads run, the agent cannot. What this means is that the agent has to wait for the producer thread to stop (perhaps by blocking on a queue). Once the producer thread stops, the agent can do its work. That is why it might be taking as long as ~3 seconds!
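
As a quick sanity check of that, the agent task should be created at a priority at or above every task that submits commands to it, along the lines of the sketch below (the task functions, stack sizes and names are placeholders, not from any particular project):

/* Placeholders only: the point is the relative priorities. The agent runs
 * above its producers, so queued commands are processed immediately. */
xTaskCreate( prvMQTTAgentTask,     "MQTTAgent", 4096, NULL,
             tskIDLE_PRIORITY + 4, NULL );
xTaskCreate( prvSensorPublishTask, "SensorPub", 4096, NULL,
             tskIDLE_PRIORITY + 1, NULL );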

Let me know what you think.

Thanks,
Aniruddha

Currently, I have 3 threads:

Sensor Data Measure and Publish [priority 1] (this only publishes once per loop, which is where I measured the delay).

Config Shadow Thread [priority 2] (this thread mostly uses shadow API subscriptions and only publishes infrequently).

OTA Thread [priority 1] (I am using the OTA library; it spends most of its time listening for new jobs. The OTA Agent itself is at priority 4).

The MQTT Agent is at priority 4. Along with the OTA Agent, these are the highest “user” priority tasks.

I just did a test with just the sensor publish task.

Edit: No change, I get exactly the same publish cycle time.

(I will note that before the edit I had made the mistake of using the same MQTT client ID on two devices, and I was getting lower publish cycle times while the agent was repeatedly connecting and disconnecting.)

Out of curiosity, in an ideal case, what would the cycle time be for just one publish process at QoS 0?

Surely, if one is waiting for the callback to indicate publish success, it should be called as soon as the publish is complete?

Hello!

That is a tricky question, as it depends on a lot of variables, including but not limited to your processor speed, the TCP stack you use, the speed of your internet connection, etc.
That said, a small “Hello world” QoS 0 publish sent by a task, when there are no other tasks except the agent, should not take more than a few milliseconds at most. This is only a very high-level estimate, though.
That is why I said the 2.9 seconds you are observing is a pretty big deal!

I think the configuration must have something to do with this. Otherwise, for a small publish, the flow would be as follows:
The publishing task posts to the agent queue → the MQTT agent task, being higher priority, wakes up and reads from the queue → it sends the message over the internet → it sees that the publish is QoS 0, calls the user callback to say the publish succeeded, and blocks on the queue again → the publisher wakes up and continues with its work.

'|' represents running/active task
'\' represents blocked/preempted task 

Publisher           Agent
   |                  \
   |                  \
   |                  \
   | -----Publish---->\  <Posting to the queue wakes up the agent>
   \                  |
   \                  |
   \                  | ------ Send data over internet --->
   \                  |
   \ <--Call callback-| <Tell task that the QoS0 publish was successful>
   \                  |
   \                  | <Nothing to do. Block on the queue>
   \                  | <The Publisher can run again as the agent
   |                  \  is blocked now. >
   |                  \
   |                  \
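
For reference, a minimal sketch of what the publishing side of that flow can look like, based on the pattern used in the coreMQTT-Agent demos. The MQTTAgentCommandContext layout and the function names below are illustrative, not from any particular project:

#include <string.h>
#include "FreeRTOS.h"
#include "task.h"
#include "core_mqtt_agent.h"

/* The agent treats this type as opaque; the application defines it.
 * This particular layout is just an example. */
struct MQTTAgentCommandContext
{
    TaskHandle_t xTaskToNotify;
    MQTTStatus_t xReturnStatus;
};

/* Runs in the agent task once the QoS0 PUBLISH has been sent. */
static void prvPublishCompleteCallback( MQTTAgentCommandContext_t * pxCommandContext,
                                        MQTTAgentReturnInfo_t * pxReturnInfo )
{
    pxCommandContext->xReturnStatus = pxReturnInfo->returnCode;
    xTaskNotify( pxCommandContext->xTaskToNotify, 1UL, eSetValueWithOverwrite );
}

/* Called from the publishing task; blocks until the agent runs the callback. */
static MQTTStatus_t prvPublishAndWait( MQTTAgentContext_t * pxAgentContext,
                                       const char * pcTopic,
                                       const void * pvPayload,
                                       size_t xPayloadLength )
{
    MQTTPublishInfo_t xPublishInfo = { 0 };
    MQTTAgentCommandInfo_t xCommandInfo = { 0 };
    MQTTAgentCommandContext_t xCommandContext = { 0 };
    uint32_t ulNotifiedValue = 0;
    MQTTStatus_t xStatus;

    xPublishInfo.qos = MQTTQoS0;
    xPublishInfo.pTopicName = pcTopic;
    xPublishInfo.topicNameLength = ( uint16_t ) strlen( pcTopic );
    xPublishInfo.pPayload = pvPayload;
    xPublishInfo.payloadLength = xPayloadLength;

    xCommandContext.xTaskToNotify = xTaskGetCurrentTaskHandle();

    xCommandInfo.cmdCompleteCallback = prvPublishCompleteCallback;
    xCommandInfo.pCmdCompleteCallbackContext = &xCommandContext;
    xCommandInfo.blockTimeMs = 500U; /* max wait for space in the agent command queue */

    xStatus = MQTTAgent_Publish( pxAgentContext, &xPublishInfo, &xCommandInfo );

    if( xStatus == MQTTSuccess )
    {
        /* Wait for prvPublishCompleteCallback to run in the agent task. */
        xTaskNotifyWait( 0UL, 0xFFFFFFFFUL, &ulNotifiedValue, portMAX_DELAY );
        xStatus = xCommandContext.xReturnStatus;
    }

    return xStatus;
}

With a QoS 0 publish and the agent running at a higher priority than the caller, the notification should arrive within a few milliseconds of the MQTTAgent_Publish() call, which is the behaviour the diagram above describes.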

Now coming to your previous post:

That is very curious. Can you put a breakpoint in the code and check whether the agent queue is empty or full? Also, maybe you can step through the code to see what happens while it appears to be waiting for 2.9 seconds? I think it might be a configuration error.

Let me know if that answers your question.

Thanks,
Aniruddha

I figured out what was causing the delay. The Espressif releases of these libraries use blocking (synchronous) TLS calls.

So in the ports folder of the MQTT component, where the network transport interface functions are defined (network_transport.c), there is this:

    esp_tls_cfg_t xEspTlsConfig = {
        .cacert_buf = (const unsigned char*) ( pxNetworkContext->pcServerRootCAPem ),
        .cacert_bytes = strlen( pxNetworkContext->pcServerRootCAPem ) + 1,
        .clientcert_buf = (const unsigned char*) ( pxNetworkContext->pcClientCertPem ),
        .clientcert_bytes = strlen( pxNetworkContext->pcClientCertPem ) + 1,
        .skip_common_name = pxNetworkContext->disableSni,
        .alpn_protos = pxNetworkContext->pAlpnProtos,
#if CONFIG_CORE_MQTT_USE_SECURE_ELEMENT
        .use_secure_element = true,
#elif CONFIG_CORE_MQTT_USE_DS_PERIPHERAL
        .ds_data = pxNetworkContext->ds_data,
#else
        .use_secure_element = false,
        .ds_data = NULL,
        .clientkey_buf = ( const unsigned char* )( pxNetworkContext->pcClientKeyPem ),
        .clientkey_bytes = strlen( pxNetworkContext->pcClientKeyPem ) + 1,
#endif
        .timeout_ms = 3000, /* esp_tls_conn_read() blocks for up to this long when no data is available */
    };

So when MQTT_ProcessLoop() is called, it goes through this transport receive function:

int32_t espTlsTransportRecv(NetworkContext_t* pxNetworkContext,
    void* pvData, size_t uxDataLen)
{
    if (pvData == NULL || uxDataLen == 0)
    {
        return -1;
    }
    int32_t lBytesRead = 0;
    if(pxNetworkContext != NULL && pxNetworkContext->pxTls != NULL)
    {
        xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
        lBytesRead = esp_tls_conn_read(pxNetworkContext->pxTls, pvData, uxDataLen);
        xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
    }
    else
    {
        return -1; /* pxNetworkContext or pxTls uninitialised */
    }
    if (lBytesRead == ESP_TLS_ERR_SSL_WANT_WRITE  || lBytesRead == ESP_TLS_ERR_SSL_WANT_READ) {
        return 0;
    }
    if (lBytesRead < 0) {
        return lBytesRead;
    }
    if (lBytesRead == 0) {
        /* Connection closed */
        return -1;
    }
    return lBytesRead;
}

esp_tls_conn_read() will block for up to 3 seconds because of .timeout_ms = 3000.

The latest LTS release lowered it down to 1000.

Is there a significant problem in lowering it even more?

@txf For optimal performance, it would be best to set your esp_tls connection to non-blocking mode and use select() to wait for an incoming message of any size rather than the default behavior of esp_tls_conn_read which attempts to fill the entire buffer up until the timeout.

When initializing your esp_tls connection, be set to set the non_block field to true.

You can get the current socket handle using the esp_tls_get_conn_sockfd() function call and from there, do something like the following:

int32_t espTlsTransportRecv(NetworkContext_t* pxNetworkContext,
                            void* pvData, size_t uxDataLen)
{
    int32_t lBytesRead = -1;

    if( ( pvData != NULL ) &&
        ( uxDataLen > 0 ) &&
        ( pxNetworkContext != NULL ) &&
        ( pxNetworkContext->pxTls != NULL ) )
    {
        int lSockFd = -1;

        // TODO: Is a semaphore really necessary here?
        xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
        {
            lSockFd = esp_tls_get_conn_sockfd(pxNetworkContext->pxTls);
        }
        xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);

        if( lSockFd >= 0 )
        {
            int lResult = -1;
            fd_set xReadSet;
            fd_set xErrorSet;
            FD_ZERO(&xReadSet);
            FD_ZERO(&xErrorSet);
            FD_SET(lSockFd, &xReadSet);
            FD_SET(lSockFd, &xErrorSet);

            // TODO: set a timeout for the select call
            lResult = select(lSockFd + 1, &xReadSet, NULL, &xErrorSet, NULL );

            if( ( lResult > 0 ) &&
                FD_ISSET( lSockFd, &xReadSet ) )
            {
                xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
                {
                    lBytesRead = ( int32_t ) esp_tls_conn_read(pxNetworkContext->pxTls, pvData, uxDataLen);
                }
                xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
            }
            else if( ( lResult == -1 ) &&
                     FD_ISSET( lSockFd, &xErrorSet ) )
            {
                /* Socket Error */
                lBytesRead = -1;
            }
            else
            {
                lBytesRead = 0;
            }
        }
    }
    if( ( lBytesRead == ESP_TLS_ERR_SSL_WANT_WRITE ) || 
        ( lBytesRead == ESP_TLS_ERR_SSL_WANT_READ) ) {
        lBytesRead = 0;
    }
    return lBytesRead;
}

Be sure to use the latest version of coreMQTT for best results. Previous versions had a bug which prevented proper handling of messages that arrive across multiple calls to the transport receive function.

@PaulB-AWS Thanks.

Unfortunately, using this method, I can’t even establish a connection to the broker.

I had to fix up some of the functions and variables:

int32_t espTlsTransportRecv(NetworkContext_t* pxNetworkContext,
                            void* pvData, size_t uxDataLen)
{
    int32_t lBytesRead = -1;

    if( ( pvData != NULL ) &&
        ( uxDataLen > 0 ) &&
        ( pxNetworkContext != NULL ) &&
        ( pxNetworkContext->pxTls != NULL ) )
    {
        int lSockFd = -1;

        // TODO: Is a semaphore really necessary here?
        xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
        {
            /*lSockFd =*/ esp_tls_get_conn_sockfd(pxNetworkContext->pxTls,&lSockFd);
        }
        xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);

        if( lSockFd >= 0 )
        {
            int lResult = -1;
            fd_set xReadSet;
            fd_set xErrorSet;
            FD_ZERO(&xReadSet);
            FD_ZERO(&xErrorSet);
            FD_SET(lSockFd, &xReadSet);
            FD_SET(lSockFd, &xErrorSet);
            struct timeval xTimeout = { .tv_sec = 1, .tv_usec = 0 }; /* 1 second select timeout */
            lResult = select(lSockFd + 1, &xReadSet, NULL, &xErrorSet, &xTimeout);

            if( ( lResult > 0 ) &&
                FD_ISSET( lSockFd, &xReadSet ) )
            {
                xSemaphoreTake(pxNetworkContext->xTlsContextSemaphore, portMAX_DELAY);
                {
                    lBytesRead = ( int32_t ) esp_tls_conn_read(pxNetworkContext->pxTls, pvData, uxDataLen);
                }
                xSemaphoreGive(pxNetworkContext->xTlsContextSemaphore);
            }
            else if( ( lResult == -1 ) &&
                     FD_ISSET( lSockFd, &xErrorSet ) )
            {
                /* Socket Error */
                lBytesRead = -1;
            }
            else
            {
                lBytesRead = 0;
            }
        }
    }
    if( ( lBytesRead == ESP_TLS_ERR_SSL_WANT_WRITE ) || 
        ( lBytesRead == ESP_TLS_ERR_SSL_WANT_READ) ) {
        lBytesRead = 0;
    }
    return lBytesRead;
}

Just bumping the thread with some additional info, in case somebody stumbles upon it in a search.

The non-blocking solution provided above by @PaulB-AWS does not appear to be necessary.

All that is required is to set the .non_block flag in the TLS config struct and increase the timeout to something large like 10000 ms (which is necessary to prevent excessive timeouts during connection).
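
In other words, relative to the esp_tls_cfg_t shown earlier in the thread, something along these lines (only the two relevant fields shown; the certificate fields stay as they were):

esp_tls_cfg_t xEspTlsConfig = {
    /* ... certificate / key fields as in the earlier snippet ... */
    .non_block  = true,     /* non-blocking socket: esp_tls_conn_read() returns instead of waiting */
    .timeout_ms = 10000,    /* generous timeout so the TLS connection/handshake can complete */
};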

However, a word of warning: if you use the OTA library, you cannot make multiple block requests, as those will constantly bring the connection down. It works with one block request at a time.

Presumably, if your message rate is similar to that of OTA, non_block will have issues with that too.
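
On the OTA point: the AWS IoT OTA library's ota_config.h exposes otaconfigMAX_NUM_BLOCKS_REQUEST, and leaving it at 1 matches the "one block request at a time" behaviour described above. A sketch of that setting, assuming the standard config name:

/* ota_config.h -- limit the OTA agent to a single outstanding block request,
 * which is the behaviour reported to work with the non-blocking transport. */
#define otaconfigMAX_NUM_BLOCKS_REQUEST    1U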