Shared Wifi resources and Network connection. Complex Question about using Semaphore

I would like the following help:

I have a complex hardware that works with ESP32 S3 16MB of Ram and ESP IDF 5

The function of this hardware is to connect to Wifi and ping different IP devices.

This ping command is not done multiple times. I mean that I don’t call the ping function several times. I wait for one ping to finish and start another.

I did it this way, because I understand that it was safer.

However, on this same hardware I have two MQTT connections, one connection sends information every 1 minute and another connection listens to a topic that can receive data every moment.

My question is is this connection to the wifi hardware a system restriction? Do I have to apply a Semaphoro or event group to each thing?

Currently MQTT and Ping run at the same time. But I don’t know if it’s correct.

What do you think ?

I’m going to paste a part of the code here so you can understand a little of the context and also a photo of the product.


void v_Pingador(void *parameters) {
    while(1){
        for (int i = 0; i < MAX_IP_COUNT; i++) {
            if (Device[i].Servico != NULL && strcmp(Device[i].Servico, "ping") == 0) {
                if(f_Ipv4Valid(Device[i].ip_addresses)){
                    xEventGroupWaitBits(xEventGroupWifi, WIFI_BIT_0, pdFALSE, pdFALSE, portMAX_DELAY);
                    xSemaphoreTake(SemaphorePing, portMAX_DELAY);
                    f_Ping((void *)&Device[i]);
                }
            }
        }
    }
}

void f_CoreSubscribe(){
    for (int i = 0; i < MAX_IP_COUNT; i++) {
        if (Device[i].Servico != NULL && strcmp(Device[i].Servico, "mqtt") == 0) {
            if(Device[i].topico != NULL){
                if(f_ValidaTopico(Device[i].topico)){
                    f_subscribe(Device[i].topico);
                }
            }
        }
    }
}

void v_Display_0(void *pvParameters) {
    int8_t Alarme_Device;
    int32_t TimeOut_Count[MAX_IP_COUNT];
    unsigned long received_item;
    char ms_str[22];
    char ms_str2[26];
    while(1) {
        char * receivedMqtt;
        for (int i=0; i<MAX_IP_COUNT;i++){
            xEventGroupWaitBits(EventG_Display, BIT_0, pdFALSE, pdFALSE, portMAX_DELAY);
            if (Device[i].xQueue!=NULL){
                if (Device[i].Servico != NULL && strcmp(Device[i].Servico, "ping") == 0){
                        if (xQueueReceive(Device[i].xQueue, &received_item, 0)) {
                            if (received_item != UINT32_MAX){
                                snprintf(ms_str, sizeof(ms_str), "Tempo - %ldms", received_item);
                                f_Mostra(ms_str, Device[i].ip_addresses, Device[i].Device_Name, Clear_OFF, i);
                                TimeOut_Count[i] = 0;
                            }else{
                                snprintf(ms_str2, sizeof(ms_str2), "%s - Outs:%lu", Device[i].Device_Name, TimeOut_Count[i]+1);
                                f_Mostra("TIME OUT!!", Device[i].ip_addresses, ms_str2, Clear_ON, i);
                                TimeOut_Count[i]++;
                            }
                            EventBits_t bits = xEventGroupGetBits(EventG_Alarme);
                            if(TimeOut_Count[i]>Alarme_NumAtivarStatus && !(bits & BIT_0)){
                                xEventGroupSetBits(EventG_Alarme, BIT_0);
                                f_Led(LED_VERDE, Led_Desliga);
                                urlFinal = f_urlFinal(AlarmeWebHookURL_val, AlarmeWebHookText_val, Device[i].Device_Name, Device[i].ip_addresses);
                                Alarme_Device = i;
                                AlarmeSonoroSuprimir = true;
                            }else if(i == Alarme_Device && TimeOut_Count[i] == 0 && (bits & BIT_0)){
                                xEventGroupClearBits(EventG_Alarme, BIT_0);
                                AlarmeSonoroSuprimir = false;
                                f_Led(LED_VERDE, Led_Liga);
                                ESP_LOGW(TAG, "Led Verde...ligado");
                            }
                        }
                } else if (Device[i].Servico != NULL && strcmp(Device[i].Servico, "mqtt") == 0){
                    if (xQueueReceive(Device[i].xQueue, &receivedMqtt, 0)) {
                        f_Mostra(receivedMqtt, Device[i].topico,Device[i].Device_Name, Clear_OFF, i);
                    }
                    
                }
            }
            else{
                if (Texto_Display_Vazio != NULL && (strlen(Texto_Display_Vazio)>3)) {
                    f_Mostra("", "", Texto_Display_Vazio, Clear_OFF, i);
                }
            }
            //ESP_LOGW(TAG, "Device[%i]", i);
        }
        vPortYield();
    }
}

and

The ping code:


void on_ping_success(esp_ping_handle_t hdl, void *args) {
        uint32_t elapsed_time_ms;
        esp_err_t ret;
        ip_addr_t target_addr;
        device_t *Device_X = (device_t *)args;
        ret = esp_ping_get_profile(hdl, ESP_PING_PROF_TIMEGAP, &elapsed_time_ms, sizeof(uint32_t));
        if (ret != ESP_OK) {ESP_LOGE("TAG", "Erro ao obter o tempo decorrido: %d", ret);return;}
        ret = esp_ping_get_profile(hdl, ESP_PING_PROF_IPADDR, &target_addr, sizeof(ip_addr_t));
        if (ret != ESP_OK) {ESP_LOGE("TAG", "Erro ao obter o endereço IP alvo: %d", ret);return;}
        xQueueSend(Device_X->xQueue, &elapsed_time_ms, 100);
}

void on_ping_timeout(esp_ping_handle_t hdl, void *args) {
    esp_err_t ret;
    ip_addr_t target_addr;
    device_t *Device_X = (device_t *)args;
    ret = esp_ping_get_profile(hdl, ESP_PING_PROF_IPADDR, &target_addr, sizeof(ip_addr_t));
    if (ret != ESP_OK) {ESP_LOGE("TAG", "Erro ao obter o endereço IP alvo: %d", ret);return;}
    uint32_t Max =  UINT32_MAX;
    xQueueSend(Device_X->xQueue, &Max, 100);
}

void on_ping_end(esp_ping_handle_t hdl, void *args) {
        esp_err_t ret;
        ret = esp_ping_delete_session(hdl);
        if (ret != ESP_OK) {printf("Erro ao encerrar a sessão de ping: %d\n", ret);return;}
        ip_addr_t target_addr;
        ret = esp_ping_get_profile(hdl, ESP_PING_PROF_IPADDR, &target_addr, sizeof(ip_addr_t));
        xSemaphoreGive(SemaphorePing);
}

void f_Ping(void *parameters) {
        device_t *Device_X = (device_t *)parameters;
        if (f_PingValida(Device_X)==false) {xSemaphoreGive(SemaphorePing);return;}
        ip_addr_t ip;
        ip.u_addr.ip4.addr = esp_ip4addr_aton(Device_X->ip_addresses);
        ip.type = IPADDR_TYPE_V4;
        esp_ping_config_t ping_config = {
            .target_addr = ip, // Endereço IP do host a ser pingado
            .count = Device_X->count, // Número de pacotes a serem enviados
            .interval_ms = Device_X->interval_ms, // Intervalo entre os envios de pacotes (em milissegundos)
            .data_size = Device_X->data_size, // Tamanho dos dados do pacote ICMP
            .timeout_ms = Device_X->timeout_ms, // Tempo limite para esperar por uma resposta (em milissegundos)
            .tos = Device_X->tos, // Tipo de Serviço (TOS) para o pacote IP
            .ttl = Device_X->ttl, // Tempo de Vida (TTL) do pacote IP
            //.interface = NULL, // Interface de rede a ser usada para o ping (NULL para usar a interface padrão)
            .task_stack_size = 3000,
            .task_prio = 5
        };
        esp_ping_callbacks_t ping_callbacks = {.on_ping_success = on_ping_success,.on_ping_timeout = on_ping_timeout,.on_ping_end = on_ping_end,.cb_args = (void *)Device_X};
        esp_ping_handle_t ping_handle;
        esp_err_t ret = esp_ping_new_session(&ping_config, &ping_callbacks, &ping_handle);
        if (ret != ESP_OK) {printf("Erro ao criar a sessão de ping: %d\n", ret);ESP_LOGE(TAG, "IP[%s]", Device_X->ip_addresses);xSemaphoreGive(SemaphorePing);return;}
        ret = esp_ping_start(ping_handle);
        if (ret != ESP_OK) {printf("Erro ao iniciar o ping: %d\n", ret);ESP_LOGW(TAG, "IP[%s]", Device_X->ip_addresses);xSemaphoreGive(SemaphorePing);return;}
}

The MQTT Code:


//-----------------------------------------------------------------------------------------------------------------------
//---MQTT 1--------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------
        static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data){
            ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id);
            esp_mqtt_event_handle_t event = event_data;
            esp_mqtt_client_handle_t client = event->client;
            switch ((esp_mqtt_event_id_t)event_id) {
            case MQTT_EVENT_CONNECTED:
                ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
                f_CoreSubscribe();
                break;
            case MQTT_EVENT_DISCONNECTED:
                ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
                break;
            case MQTT_EVENT_DATA:
                char *topic = strndup(event->topic, event->topic_len);
                char *data = strndup(event->data, event->data_len);
                if (topic && data) {f_SendDataMqtt(topic, data);} else {ESP_LOGE(TAG, "Falha ao alocar memória para tópico ou payload");}
                free(topic);free(data);
                break;
            case MQTT_EVENT_ERROR:
                ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
                if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
                    log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
                    log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
                    log_error_if_nonzero("captured as transport's socket errno",  event->error_handle->esp_transport_sock_errno);
                    ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
                }
                break;
            default:
                ESP_LOGI(TAG, "Other event id:%d", event->event_id);
                break;
            }
        }
//-----------------------------------------------------------------------------------------------------------------------
//---MQTT 2--------------------------------------------------------------------------------------------------------------
//-----------------------------------------------------------------------------------------------------------------------
        static void mqtt_event_handler_out(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data){
            ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%" PRIi32 "", base, event_id);
            esp_mqtt_event_handle_t event = event_data;
            esp_mqtt_client_handle_t client = event->client;
            switch ((esp_mqtt_event_id_t)event_id) {
            case MQTT_EVENT_CONNECTED:
                ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED");
                esp_mqtt_client_subscribe(client, TOPICO_OTA, 0);
                esp_mqtt_client_subscribe(client, TOPICO_URL, 0);
                esp_mqtt_client_subscribe(client, f_topico(TOPICO_LICENCA), 0);
                f_mqttPublica(f_topico(zMQTT_sufixo), f_prepJsonConfig(mqtt_ON), cMqttOut);
                f_mqttPublica(f_topico(zMQTT_sufixo_Config), f_prepJsonConfig(mqtt_Config), cMqttOut);
                break;
            case MQTT_EVENT_DISCONNECTED:
                ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED");
                break;
            case MQTT_EVENT_DATA:
                ESP_LOGI(TAG, "MQTT_EVENT_DATA");
                printf("TOPIC=%.*s\r\n", event->topic_len, event->topic);
                printf("DATA=%.*s\r\n", event->data_len, event->data);
                //Testa qual tópico foi recebido e começa a fazer as coisas conforme o tópico
                if (strncmp(event->topic, TOPICO_OTA, event->topic_len) == 0) {
                    f_otaMQTT(event->topic, event->topic_len, event->data, event->data_len);
                }
                if (strncmp(event->topic, TOPICO_URL, event->topic_len) == 0) {
                    ESP_LOGW(TAG, "TOPICO URL");//Desenvolver ainda...
                }
                if (strncmp(event->topic, f_topico(TOPICO_LICENCA), event->topic_len) == 0) {
                    cJSON *token = cJSON_GetObjectItem(cJSON_Parse((char *)event->data), "token");
                    if (!cJSON_IsString(token)){ESP_LOGE(TAG, "Erro na leitura do json");}
                    f_nvsSet(zTokenTemp, token->valuestring, false);
                    xTaskCreatePinnedToCore(v_InsertLicenca, "v_InsertLicenca", 8000, NULL, tskIDLE_PRIORITY, NULL, tskNO_AFFINITY);
                }
                break;
            case MQTT_EVENT_ERROR:
                ESP_LOGI(TAG, "MQTT_EVENT_ERROR");
                if (event->error_handle->error_type == MQTT_ERROR_TYPE_TCP_TRANSPORT) {
                    log_error_if_nonzero("reported from esp-tls", event->error_handle->esp_tls_last_esp_err);
                    log_error_if_nonzero("reported from tls stack", event->error_handle->esp_tls_stack_err);
                    log_error_if_nonzero("captured as transport's socket errno",  event->error_handle->esp_transport_sock_errno);
                    ESP_LOGI(TAG, "Last errno string (%s)", strerror(event->error_handle->esp_transport_sock_errno));
                }
                break;
            default:
                ESP_LOGI(TAG, "Other event id:%d", event->event_id);
                break;
            }
        }


void f_mqttPublica(const char * topico, const char * payload, esp_mqtt_client_handle_t handle){
     EventBits_t bits = xEventGroupWaitBits(xEventGroupWifi, WIFI_BIT_0, pdFALSE, pdFALSE, 200);
     if((bits & WIFI_BIT_0)){
          int len = strlen(payload);
          //xSemaphoreTake(SemaphorePing, portMAX_DELAY);
          int resp = esp_mqtt_client_publish(handle, topico,payload,len,0,0);
          if (resp==0){
             ESP_LOGI(TAG, "JSON publicado via MQTT. Com sucesso");
          }else if(resp<0){
             ESP_LOGE(TAG, "JSON publicado via MQTT. Erro na publicação");
          }
          //xSemaphoreGive(SemaphorePing);
     }
}

extern const uint8_t ca_cert_pem_start[] asm("_binary_ca_cert_pem_start");
extern const uint8_t ca_cert_pem_end[] asm("_binary_ca_cert_pem_end");

void f_setupMQTT(void){
    Dados_Mqtt_t DadosMqtt, DadosMqttout;
    if(f_DadosMqtt(&DadosMqtt) == ESP_OK){
                ESP_LOGW(TAG, "MQTT");
                //ESP_LOGW(TAG, "cert:%s", DadosMqtt.mqtt_cert);
                //ESP_LOGW(TAG, "cert:%i", DadosMqtt.mqtt_cert_len);
                esp_mqtt_client_config_t mqtt_cfg = {
                .broker.address.uri = DadosMqtt.mqtt_server,
                .broker.address.port = DadosMqtt.mqtt_port,
                .broker.verification.certificate = (const char *)ca_cert_pem_start,
                .broker.verification.certificate_len = (ca_cert_pem_end - ca_cert_pem_start),
                //.broker.verification.certificate = DadosMqtt.mqtt_cert,
                //.broker.verification.certificate_len = DadosMqtt.mqtt_cert_len,
                .credentials.username = DadosMqtt.mqtt_user,
                .credentials.authentication.password = DadosMqtt.mqtt_pass,
                .credentials.client_id = "enervision",
                .session.last_will.msg = f_prepJsonConfig(mqtt_OFF),
                .session.last_will.msg_len = strlen(f_prepJsonConfig(mqtt_OFF)),
                .session.last_will.topic = f_topico(zMQTT_sufixo),
                .session.last_will.retain = true,
                .session.last_will.qos = 1,
                .session.keepalive = 2        
            };

                //Fazer isso no futuro - v.2.0  //Não conectou ... Busca o backup por OTA ?
                //Antes de buscar a conexão via backup, conferir conexões, ping, verificar se o dns 
            EventBits_t bits = xEventGroupWaitBits(xEventGroupWifi, WIFI_BIT_0, pdFALSE, pdFALSE, pdMS_TO_TICKS(40000));
            if((bits & WIFI_BIT_0)){
                client = esp_mqtt_client_init(&mqtt_cfg);
                esp_mqtt_client_register_event(client, ESP_EVENT_ANY_ID, mqtt_event_handler, NULL);
                esp_mqtt_client_start(client);
            }
    }
    
    if(f_DadosMqttout(&DadosMqttout) == ESP_OK){
            ESP_LOGW(TAG, "MQTT OUT");
            esp_mqtt_client_config_t mqtt_cfg_out = {
                .broker.address.uri = DadosMqttout.mqtt_server,
                .broker.address.port = DadosMqttout.mqtt_port,
                .broker.verification.certificate = (const char *)ca_cert_pem_start,
                .broker.verification.certificate_len = (ca_cert_pem_end - ca_cert_pem_start),
                .credentials.username = DadosMqttout.mqtt_user,
                .credentials.authentication.password = DadosMqttout.mqtt_pass,
                .credentials.client_id = f_serialNumber(),
                .session.last_will.msg = f_prepJsonConfig(mqtt_OFF),
                .session.last_will.msg_len = strlen(f_prepJsonConfig(mqtt_OFF)),
                .session.last_will.topic = f_topico(zMQTT_sufixo),
                .session.last_will.retain = true,
                .session.last_will.qos = 1,
                .session.keepalive = 2        
            };
            EventBits_t bits = xEventGroupWaitBits(xEventGroupWifi, WIFI_BIT_0, pdFALSE, pdFALSE, pdMS_TO_TICKS(40000));
            if((bits & WIFI_BIT_0)){
                cMqttOut = esp_mqtt_client_init(&mqtt_cfg_out);
                esp_mqtt_client_register_event(cMqttOut, ESP_EVENT_ANY_ID, mqtt_event_handler_out, NULL);
                esp_mqtt_client_start(cMqttOut);
            }
    }
}

My Setup Screen:

My product:


Which system restriction are you talking about? You seem to be using MQTT library from Espressif and therefore, you should reach out to them for questions about it.

I do not think ICMP ping and MQTT should interfare with each other but please confirm with Espressif.

One other point to think about - do you really need 2 connections? Can you not use one connection to both publish and subscribe?

I am immensely grateful for the help provided.

When I refer to a constraint, I’m asking if it’s a good practice to use these features with Semaphoros or Event Group.

An IO operation for example, I know it is good practice to use semaphoros, you shouldn’t have several different tasks use the same IO’s at the same time, without semaphoros. For example.

That’s why my question: Is it a good practice to also apply this concept in separate tasks that use Wifi connectivity?

Or does the fact that the internet connection is asynchronous make it unnecessary? What is the best practice in this case?

Does the same question apply above?

In my case, in particular, they are different servers, with different purposes, which is why I need two connections.

which network stack do you use? Some already enforce implicit thread safety, others require explicit guarding, some do not allow sharing of sockets between tasks (which I believe is not applicable in your case, just mentioning it for completeness’ sake), and so on. It is best to consult with the stack manufacturer’s documentation.

Also, you are using the term “semaforo” where most likely (from your description) a mutex would be your preferred choice. Very common fallacy.

As @RAc mentioned, you need to check with Espressif if their TCP and MQTT libraries can be used from multiple tasks.

Makes sense.

I’m going to make a post like this on the expressif forum.

Still, I would like your opinion regarding my code. What do you think ?

Is it correct from a freertos perspective?

As @RAc mentioned, if you are trying to protect a resource from simultaneous access, you need mutex and not semaphore. Other than that, from your description, the application looks correct.