Subscribe example without TLS

Hi, I am looking for an example that subscribes to a topic without TLS (i.e. plaintext). In the SDK I see a plaintext sample that subscribes and publishes to the same topic to get the subscription working. Can anyone please point me to an example that subscribes to a topic and waits until it receives a published message from the broker?

Please elaborate on how your request is different from the example you reference. The example publishes and subscribes to the same topic, so traffic goes in both directions. You could alternatively subscribe to a different topic and then wait for data, but something else would then have to publish to that topic before the device received anything. You could publish to the topic from any MQTT client - including the AWS IoT test client accessible from the AWS IoT console - but you can’t make a plaintext connection to AWS IoT.

Hi @rtel, I am trying to connect to a Mosquitto broker using the plaintext example. For scalability, our application needs both TLS and non-TLS connections.

I have written a sample program to understand the Linux AWS IoT SDK APIs and have attached it below. I have a few questions; can you please help me understand the following?

Q1: In MQTT_Init I pass my own MQTTGetCurrentTimeFunc_t function, which always returns 0, and I defined the macros below in core_mqtt_config.h as described in the documentation. I also pass 0 as timeoutMs to MQTT_Connect, as the documentation suggests. I now see the error shown below:

#define MQTT_RECV_POLLING_TIMEOUT_MS 0
#define MQTT_MAX_CONNACK_RECEIVE_RETRY_COUNT 10
#define MQTT_SEND_TIMEOUT_MS 0

Error:

[INFO] [DEMO] [mqtt_demo_plaintext.c:99] Creating an MQTT connection to 192.155.70.63.
[ERROR] [MQTT] [core_mqtt.c:815] sendMessageVector: Unable to send packet: Timed out.
[ERROR] [MQTT] [core_mqtt.c:2689] MQTT connection failed with status = MQTTSendFailed.
[ERROR] [DEMO] [mqtt_demo_plaintext.c:134] Connection with MQTT broker failed with status MQTTSendFailed.

When I comment out “#define MQTT_SEND_TIMEOUT_MS 0”, the connection is successful.

Q2: After subscribing I call MQTT_ReceiveLoop, because I am using my own dummy MQTTGetCurrentTimeFunc_t function. I see the error below while the connection is idle.

[INFO] [DEMO] [mqtt_demo_plaintext.c:99] Creating an MQTT connection to 192.155.70.63.
[INFO] [MQTT] [core_mqtt.c:2680] MQTT connection established with the broker.
[INFO] [DEMO] [mqtt_demo_plaintext.c:139] Subscribing to the MQTT topic listener/#.
[INFO] [DEMO] [mqtt_demo_plaintext.c:176] Success
[ERROR] [Transport_Plaintext_Sockets] [plaintext_posix.c:58] A transport error occurred: Resource temporarily unavailable.
[ERROR] [MQTT] [core_mqtt.c:1722] Call to receiveSingleIteration failed. Status=MQTTRecvFailed
[ERROR] [DEMO] [mqtt_demo_plaintext.c:171] Receive Loop error
[ERROR] [Transport_Plaintext_Sockets] [plaintext_posix.c:58] A transport error occurred: Resource temporarily unavailable.
[ERROR] [MQTT] [core_mqtt.c:1722] Call to receiveSingleIteration failed. Status=MQTTRecvFailed
[ERROR] [DEMO] [mqtt_demo_plaintext.c:171] Receive Loop error
[ERROR] [Transport_Plaintext_Sockets] [plaintext_posix.c:58] A transport error occurred: Resource temporarily unavailable.
[ERROR] [MQTT] [core_mqtt.c:1722] Call to receiveSingleIteration failed. Status=MQTTRecvFailed
[ERROR] [DEMO] [mqtt_demo_plaintext.c:171] Receive Loop error
/* Standard includes. */
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

#include "demo_config.h"
#include "core_mqtt.h"
#include "plaintext_posix.h"
#include "backoff_algorithm.h"
#include "clock.h"

#define BROKER_ENDPOINT "*******************"
#define CLIENT_IDENTIFIER "Karthik_test1"
#define BROKER_PORT 1883
#define MQTT_USERNAME "*******"
#define MQTT_PASSWORD "***********"
#define MQTT_USERNAME_LENGTH                 ( ( uint16_t ) ( sizeof( MQTT_USERNAME ) - 1 ) )
#define MQTT_PASSWORD_LENGTH                 ( ( uint16_t ) ( sizeof( MQTT_PASSWORD ) - 1 ) )
#define CLIENT_IDENTIFIER_LENGTH                 ( ( uint16_t ) ( sizeof( CLIENT_IDENTIFIER ) - 1 ) )
#define BROKER_ENDPOINT_LENGTH                   ( ( uint16_t ) ( sizeof( BROKER_ENDPOINT ) - 1 ) )

#define MQTT_EXAMPLE_TOPIC                  "listener/#"
#define MQTT_EXAMPLE_TOPIC_LENGTH           ( ( uint16_t ) ( sizeof( MQTT_EXAMPLE_TOPIC ) - 1 ) )


#define TRANSPORT_SEND_RECV_TIMEOUT_MS      ( 1000 )
#define MQTT_KEEP_ALIVE_INTERVAL_SECONDS    ( 60U )

/* Definition of the network context type handed to the transport functions.
 * main() points pParams at a PlaintextParams_t that lives on its stack. */
struct NetworkContext
{
    PlaintextParams_t * pParams; /* Parameters of the plaintext transport. */
};

/* Size in bytes of the buffer handed to MQTT_Init through MQTTFixedBuffer_t. */
#define NETWORK_BUFFER_SIZE    ( 1024U )
/* Backing storage for the MQTT library's network buffer. */
static uint8_t buffer[ NETWORK_BUFFER_SIZE ];
/* Single-entry subscription list shared by MQTT_Subscribe and MQTT_Unsubscribe. */
static MQTTSubscribeInfo_t pGlobalSubscriptionList[ 1 ];
/* Packet identifiers obtained from MQTT_GetPacketId for the SUBSCRIBE and
 * UNSUBSCRIBE requests respectively. */
static uint16_t globalSubscribePacketIdentifier = 0U;
static uint16_t globalUnsubscribePacketIdentifier = 0U;

/**
 * @brief Callback registered with MQTT_Init; logs every incoming PUBLISH.
 *
 * Packets of any other type are ignored.
 *
 * @param pMqttContext       MQTT context the packet arrived on (unused).
 * @param pPacketInfo        Header information of the incoming packet.
 * @param pDeserializedInfo  Deserialized packet data; for a PUBLISH its
 *                           pPublishInfo member must be non-NULL.
 */
static void eventCallback( MQTTContext_t * pMqttContext,
                           MQTTPacketInfo_t * pPacketInfo,
                           MQTTDeserializedInfo_t * pDeserializedInfo )
{
    ( void ) pMqttContext;

    /* The low nibble of the first header byte carries the PUBLISH flags,
     * so only the high nibble identifies the packet type. */
    if( ( pPacketInfo->type & 0xF0U ) == MQTT_PACKET_TYPE_PUBLISH )
    {
        assert( pDeserializedInfo->pPublishInfo != NULL );

        /* "%.*s" takes an int precision and "%u" an unsigned int; the casts
         * make the variadic arguments match their specifiers explicitly.
         * Topic and payload are length-delimited, not NUL-terminated. */
        LogInfo( ( "Incoming Publish Topic Name: %.*s matches subscribed topic.\n"
                   "Incoming Publish message Packet Id is %u.\n"
                   "Incoming Publish Message : %.*s.\n\n",
                   ( int ) pDeserializedInfo->pPublishInfo->topicNameLength,
                   pDeserializedInfo->pPublishInfo->pTopicName,
                   ( unsigned int ) pDeserializedInfo->packetIdentifier,
                   ( int ) pDeserializedInfo->pPublishInfo->payloadLength,
                   ( const char * ) pDeserializedInfo->pPublishInfo->pPayload ) );
    }
}


/**
 * @brief Millisecond time source handed to MQTT_Init.
 *
 * Returning a constant here would give the MQTT library no way to measure
 * elapsed time for its send/receive timeouts, so a real clock is read.
 * A monotonic clock is used when the platform headers expose one; otherwise
 * the C11 calendar clock is used as a fallback.
 *
 * @return Current time in milliseconds, truncated to 32 bits (the value
 *         wraps around roughly every 49.7 days).
 */
uint32_t Clock_cust_GetTimeMs( void )
{
    struct timespec now = { 0 };

#if defined( CLOCK_MONOTONIC )
    ( void ) clock_gettime( CLOCK_MONOTONIC, &now );
#else
    ( void ) timespec_get( &now, TIME_UTC );
#endif

    /* Compute in 64 bits so the seconds-to-milliseconds multiplication cannot
     * overflow before the intentional truncation to 32 bits. */
    return ( uint32_t ) ( ( ( uint64_t ) now.tv_sec * 1000U ) +
                          ( ( uint64_t ) now.tv_nsec / 1000000U ) );
}

int main()
{
    int returnStatus = EXIT_SUCCESS;
    SocketStatus_t socketStatus = SOCKETS_SUCCESS;
    MQTTContext_t mqttContext = { 0 };
    NetworkContext_t networkContext = { 0 };
    PlaintextParams_t plaintextParams = { 0 };
    TransportInterface_t transport;
    MQTTFixedBuffer_t networkBuffer;
    MQTTStatus_t mqttStatus;
    MQTTConnectInfo_t connectInfo;
    bool sessionPresent;

    networkContext.pParams = &plaintextParams;

    struct timespec tp;
    ( void ) clock_gettime( CLOCK_REALTIME, &tp );
    srand( tp.tv_nsec );

    ServerInfo_t serverInfo;
    serverInfo.pHostName = BROKER_ENDPOINT;
    serverInfo.hostNameLength = BROKER_ENDPOINT_LENGTH;
    serverInfo.port = BROKER_PORT;

    socketStatus = Plaintext_Connect( &networkContext,
                                    &serverInfo,
                                    TRANSPORT_SEND_RECV_TIMEOUT_MS,
                                    TRANSPORT_SEND_RECV_TIMEOUT_MS );
    if( socketStatus == SOCKETS_SUCCESS )
    {
        LogInfo( ( "Creating an MQTT connection to %.*s.",
            BROKER_ENDPOINT_LENGTH,
            BROKER_ENDPOINT ) );
        transport.pNetworkContext = &networkContext;
        transport.send = Plaintext_Send;
        transport.recv = Plaintext_Recv;
        transport.writev = NULL;
        
        networkBuffer.pBuffer = buffer;
        networkBuffer.size = NETWORK_BUFFER_SIZE;
        mqttStatus = MQTT_Init( &mqttContext,
                                &transport,
                                Clock_cust_GetTimeMs,
                                eventCallback,
                                &networkBuffer );

        if( mqttStatus != MQTTSuccess )
        {
            LogError( ( "MQTT_Init failed: Status = %s.", MQTT_Status_strerror( mqttStatus ) ) );
            Plaintext_Disconnect(&networkContext);
            return 1;
        }

        connectInfo.cleanSession = true;
        connectInfo.pClientIdentifier = CLIENT_IDENTIFIER;
        connectInfo.clientIdentifierLength = CLIENT_IDENTIFIER_LENGTH;
        connectInfo.keepAliveSeconds = MQTT_KEEP_ALIVE_INTERVAL_SECONDS;
        connectInfo.pUserName = MQTT_USERNAME;
        connectInfo.userNameLength = MQTT_USERNAME_LENGTH;
        connectInfo.pPassword = MQTT_PASSWORD;
        connectInfo.passwordLength = MQTT_PASSWORD_LENGTH;

        mqttStatus = MQTT_Connect( &mqttContext, &connectInfo, NULL, 10, &sessionPresent );
        if( mqttStatus != MQTTSuccess )
        {
            LogError( ( "Connection with MQTT broker failed with status %s.",
                        MQTT_Status_strerror( mqttStatus ) ) );
            return 1;
        }

        LogInfo( ( "Subscribing to the MQTT topic %.*s.",
            MQTT_EXAMPLE_TOPIC_LENGTH,
            MQTT_EXAMPLE_TOPIC ) );
        ( void ) memset( ( void * ) pGlobalSubscriptionList, 0x00, sizeof( pGlobalSubscriptionList ) );
        pGlobalSubscriptionList[ 0 ].qos = MQTTQoS0;
        pGlobalSubscriptionList[ 0 ].pTopicFilter = MQTT_EXAMPLE_TOPIC;
        pGlobalSubscriptionList[ 0 ].topicFilterLength = MQTT_EXAMPLE_TOPIC_LENGTH;
        globalSubscribePacketIdentifier = MQTT_GetPacketId( &mqttContext );

        mqttStatus = MQTT_Subscribe( &mqttContext,
                                pGlobalSubscriptionList,
                                sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
                                globalSubscribePacketIdentifier );
        if( mqttStatus != MQTTSuccess ) {
            LogError( ( "Failed to send SUBSCRIBE packet to broker with error = %s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
            mqttStatus = MQTT_Disconnect( &mqttContext );
            if( mqttStatus != MQTTSuccess ) {
                LogError( ( "Sending MQTT DISCONNECT failed with status=%s.",
                    MQTT_Status_strerror( mqttStatus ) ) );
                return 1;
            }
            Plaintext_Disconnect(&networkContext);
            return 1;
        }


        while( true )
        {
            mqttStatus = MQTT_ReceiveLoop( &mqttContext );
            if( mqttStatus != MQTTSuccess && mqttStatus != MQTTNeedMoreBytes )
            {
                LogError(("Receive Loop error"));
                //break;
            }
            else
            {
                LogInfo(("Success"));
            }
        }

        LogInfo( ( "UNSUBSCRIBE sent for topic %.*s to broker.\n\n",
                   MQTT_EXAMPLE_TOPIC_LENGTH,
                   MQTT_EXAMPLE_TOPIC ) );
        

        globalUnsubscribePacketIdentifier = MQTT_GetPacketId( &mqttContext );
        mqttStatus = MQTT_Unsubscribe( &mqttContext,
                                pGlobalSubscriptionList,
                                sizeof( pGlobalSubscriptionList ) / sizeof( MQTTSubscribeInfo_t ),
                                globalUnsubscribePacketIdentifier );
        if( mqttStatus != MQTTSuccess )
        {
            LogError( ( "Failed to send UNSUBSCRIBE packet to broker with error = %s.",
                        MQTT_Status_strerror( mqttStatus ) ) );
        }

        mqttStatus = MQTT_Disconnect( &mqttContext );
        if( mqttStatus != MQTTSuccess ) {
            LogError( ( "Sending MQTT DISCONNECT failed with status=%s.",
                MQTT_Status_strerror( mqttStatus ) ) );
        }
        Plaintext_Disconnect(&networkContext);

    } else {
        LogError(("Socket connection failed"));
        return 1;
    }

    return 0;
}


Ignore those error messages for a moment and try to publish a message and see if your device receives it.

@aggarg, if I ignore the error below, publishing from the broker works. So how can I make the call block until a message is received from the broker, and how should MQTTGetCurrentTimeFunc_t be used as a dummy function?

[ERROR] [Transport_Plaintext_Sockets] [plaintext_posix.c:58] A transport error occurred: Resource temporarily unavailable.
[ERROR] [MQTT] [core_mqtt.c:1722] Call to receiveSingleIteration failed. Status=MQTTRecvFailed