emMQTT Publisher-Subscriber 2Tasks (Sample)

From SEGGER Knowledge Base
IP_MQTT_CLIENT_PublisherSubscriber_2Tasks_Sample.c
Requires modifications: No (optional)
Download: IP_MQTT_CLIENT_PublisherSubscriber_2Tasks_Sample.c

This sample sends and receives messages over a single connection to a broker. You can subscribe to one or more topics and also publish messages. All QoS levels are supported. The application uses two tasks: one initializes the client, connects to the broker, and handles received messages; the other periodically publishes a message.

The client connects to the free public broker broker.emqx.io and subscribes to one topic. Messages for the subscribed topic should be delivered with QoS level 1. The client also publishes messages to the broker.

Code

/*********************************************************************
*                   (c) SEGGER Microcontroller GmbH                  *
*                        The Embedded Experts                        *
*                           www.segger.com                           *
**********************************************************************

-------------------------- END-OF-HEADER -----------------------------

Purpose : Sample program for embOS & emNet demonstrating MQTT.
          This sample application sends and receives messages over a
          single connection to a broker. You can subscribe to one or
          more topics and also publish messages. All QoS levels are
          supported.

          The client connects to the free broker broker.emqx.io
          and sends a subscribe message for one topic.
          The subscribed topic should be delivered with QoS level 1.
          Furthermore, the client publishes messages (sends messages
          to the broker).

          The application uses two tasks. One to manage initialization
          of the client, connect to the broker and handle the
          reception of the messages and a second task that periodically
          sends a message.

Additional information:
  Preparations:
    Works out-of-the-box.

    Change the define USE_MQTT_5 to 0 if you want to test MQTT 3.1.1.
    Change the defines TOPIC_SUBSCRIBE_QOS and TOPIC_PUBLISH_QOS to
    one of the following defines to set a specific
    "quality of service" level for subscribing and publishing:
      IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS0:  QOS = 0, Fire and forget    (At most once)
      IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS1:  QOS = 1, Simple acknowledge (At least once)
      IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS2:  QOS = 2, Full handshake     (Exactly once)

    To keep the setup for this application as simple as
    possible, the publisher task sends messages to the
    topic that the subscriber task subscribes to at the
    broker.

    In a real-life setup the subscriber will most likely not be
    interested in data that it previously published itself.
    Therefore, we recommend subscribing to another topic.

  Expected behavior:
    The publisher task sends messages to the broker.
    The subscriber task receives messages from the broker.
    The MQTT client handles all QoS related messages.

  Sample output:
    ...
    APP: Connected to 34.243.217.54, port 1883.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_RECEIVE_MAXIMUM" received
    _HandleProperties: IN: Maximum number of concurrent QoS 1 and QoS 2: 7 (Server reports 7)
    APP: Received Property IP_MQTT_PROP_TYPE_RECEIVE_MAXIMUM for CONNACK with PacketID 0.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_MAXIMUM_PACKET_SIZE" received
    _HandleProperties: IN: Maximum packet size: 1048576
    APP: Received Property IP_MQTT_PROP_TYPE_MAXIMUM_PACKET_SIZE for CONNACK with PacketID 0.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_RETAIN_AVAILABLE" received
    _HandleProperties: IN: Server retain available 1
    APP: Received Property IP_MQTT_PROP_TYPE_RETAIN_AVAILABLE for CONNACK with PacketID 0.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_SHARED_SUBSCRIPTION_AVAILABLE" received
    APP: Received Property IP_MQTT_PROP_TYPE_SHARED_SUBSCRIPTION_AVAILABLE for CONNACK with PacketID 0.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_SUBSCRIPTION_IDENTIFIER_AVAILABLE" received
    APP: Received Property IP_MQTT_PROP_TYPE_SUBSCRIPTION_IDENTIFIER_AVAILABLE for CONNACK with PacketI
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_TOPIC_ALIAS_MAXIMUM" received
    APP: Broker reports Topic Alias Maximum of 65535.
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_WILDCARD_SUBSCRIPTION_AVAILABLE" received
    APP: Received Property IP_MQTT_PROP_TYPE_WILDCARD_SUBSCRIPTION_AVAILABLE for CONNACK with PacketID
    _HandleProperties: IN: Property "IP_MQTT_PROP_TYPE_ASSIGNED_CLIENT_IDENTIFIER" received
    _HandleProperties: IN: New ID assigned by the server fivhJg0CXi2Xclvk
    APP: Received Property IP_MQTT_PROP_TYPE_ASSIGNED_CLIENT_IDENTIFIER for CONNACK with PacketID 0.
    MQTT: Session established.
    MQTT: SUBSCRIBE message (Id: 17280) sent.
    IP_MQTT_CLIENT_Exec: SUBACK received.
    MQTT: SUBACK (Id: 17280) received.
    APP: Message (Type: SUBACK, Id: 17280) received. Reason Code: IP_MQTT_REASON_SUCCESS.
    APP: ----
    IP_MQTT_CLIENT_Exec: PUBLISH received.
    MQTT: PUBLISH (QoS: 0 | Ret: 1 | Dup: 0) received.
    APP: IN: Message with (Id: 0 | QoS: 0 | Retain: 1 | Duplicate: 0) received for topic "emMQTT"
    APP: IN:   Property "IP_MQTT_PROP_TYPE_TOPIC_ALIAS" received
    APP: IN:   Topic Alias: "1"
    APP: IN:   Payload: "Client '' has disconnected ungracefully"
    APP: ----
    MQTT: PUBLISH (Id: 22299) sent.
    APP: OUT: Message (Id: 22299) sent. ("Target Test from 'fivhJg0CXi2Xclvk'")
    APP: ----
    IP_MQTT_CLIENT_Exec: PUBACK received.
    MQTT: PUBACK (Id: 22299) received.
    APP: OUT: Message with Id: 22299 has been freed. Reason Code received: IP_MQTT_REASON_SUCCESS.
    APP: ----
    IP_MQTT_CLIENT_Exec: PUBLISH received.
    MQTT: PUBLISH (QoS: 1 | Ret: 0 | Dup: 0) received.
    APP: IN: Message with (Id: 1 | QoS: 1 | Retain: 0 | Duplicate: 0) received for topic ""
    APP: IN:   Property "IP_MQTT_PROP_TYPE_USER_PROPERTY" received
    APP: IN:     Key:   "emMQTT Publish UserProp Name"
    APP: IN:     Value: "emMQTT Publish UserProp Value"
    APP: IN:   Property "IP_MQTT_PROP_TYPE_USER_PROPERTY" received
    APP: IN:     Key:   "Dynamic user property key 1"
    APP: IN:     Value: "Dynamic user property value 1"
    APP: IN:   Property "IP_MQTT_PROP_TYPE_MESSAGE_EXPIRY_INTERVAL" received
    APP: IN:   Message expire interval set to: "60" seconds
    APP: IN:   Property "IP_MQTT_PROP_TYPE_TOPIC_ALIAS" received
    APP: IN:   Topic Alias: "1"
    APP: IN:   Payload: "Target Test from 'fivhJg0CXi2Xclvk'"
    APP: ----
    MQTT: PUBACK (Id: 1) sent.
    APP: IN: Message with Id: 1 can be processed. Reason Code sent: IP_MQTT_REASON_SUCCESS.
    APP: ----

*/

#include "RTOS.h"
#include "BSP.h"
#include "IP.h"
#include "IP_MQTT_CLIENT.h"
#include "SEGGER.h"
#include "SEGGER_IP.h"
#include "SEGGER_UTIL.h"

/*********************************************************************
*
*       Configuration
*
**********************************************************************
*/

#define USE_RX_TASK                0  // 0: Packets are read in ISR, 1: Packets are read in a task of its own.

#ifndef   APP_USE_SSL
  #define APP_USE_SSL             (0) // If enabled, creates SSL sockets as well.
#endif

#if (APP_USE_SSL != 0)
  #warning "When using Mosquitto it's necessary to install the mosquitto certificate to verify the server connection."
  //
  // In order to establish an SSL connection to test.mosquitto.org, the certificate authority file needs to be added
  // to SSL_X_Config.c. This can be done by running PrintCert (in the emSSL folder under /Windows/SSL/PrintCert.exe)
  // on the certificate file that can be downloaded from test.mosquitto.org. The output then has to be copied into
  // SSL_X_TrustedCerts.c and added as a new root certificate in SSL_X_Config.c.
  // Please refer to chapter 3.7.3 in UM15001_emSSL for more information.
  //
  #include "SSL.h"
#endif

#ifndef USE_MQTT_5
  //
  // This sample is able to communicate using MQTT 5 and MQTT 3.1.1.
  // The default is to use MQTT 5, if you want to use MQTT 3.1.1 set the following define to 0.
  // IP_MQTT_CLIENT_SUPPORT_V5 must be enabled to use MQTT 5.
  //
  #if IP_MQTT_CLIENT_SUPPORT_V5
    #define USE_MQTT_5 1
  #else
    #define USE_MQTT_5 0
  #endif
#endif

//
// Functions in this sample which are not part of the MQTT add-on.
// If you are not using emNet you can change these macros.
//
#ifndef GET_TIME_32
  #define GET_TIME_32 IP_OS_GetTime32()
#endif

#ifndef CHECK_TIME_EXPIRED
  #define CHECK_TIME_EXPIRED(x) IP_IsExpired(x)
#endif

#define NUM_PROPERTIES_CONNECT    3
#define NUM_PROPERTIES_PUBLISH    4
#define NUM_PROPERTIES_LAST_WILL  1
//
// Broker settings
//
#define MQTT_BROKER_ADDR                 "broker.emqx.io"  // Alternate test broker: test.mosquitto.org
#if (APP_USE_SSL != 0)
  #define MQTT_BROKER_PORT               8883
#else
  #define MQTT_BROKER_PORT               1883
#endif

//
// Client settings
//
#define MQTT_CLIENT_NAME                 "" // An empty client name will allow the server to automatically choose a unique client ID.
                                            // This will be assigned during the connect.
#define MQTT_CLIENT_BUFFER_SIZE          256
#define MQTT_CLIENT_MESSAGE_BUFFER_SIZE  512
#define RCV_TIMEOUT                      5000
#define MQTT_CLIENT_MEMORY_POOL_SIZE     1024                               // Pool size should be at least 512 bytes.

//
// Application settings
//
#define TOPIC_FILTER01_TO_SUBSCRIBE      "emMQTT"                           // Topic to subscribe.
#define TOPIC_SUBSCRIBE_QOS              IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS2  // Quality of service level to use when subscribing
#define TOPIC_PUBLISH_QOS                IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS1  // Quality of service level to use when publishing
#define PUBLISH_FREQUENCY                5000                               // Send a publish message each x milliseconds.

//
// Set keep-alive timeout to 60 seconds.
// For "test.mosquitto.org" this must not be 0, otherwise the server will refuse the connection.
//
#define PING_FREQUENCY                   60                                 // Configure MQTT ping to x seconds.
//
// Timeout for the case where the server is not responding.
//
#define NO_REPLY_TIMEOUT                 (30 * 1000)

//
// Publish related definitions
//
#define TOPIC_FILTER_TO_PUBLISH          "emMQTT"                           // Topic to publish
#define TOPIC_ALIAS                      55                                 // Topic alias ID

//
// Task stack sizes that might not fit for some interfaces (multiples of sizeof(int)).
//
#ifndef       APP_MAIN_STACK_SIZE
  #if (APP_USE_SSL != 0)
    #define   APP_MAIN_STACK_SIZE        (4096)
  #else
    #define   APP_MAIN_STACK_SIZE        (TASK_STACK_SIZE_IP_TASK + 256)
  #endif
#endif

#ifndef       PUBLISH_TASK_STACK_SIZE
  #if (APP_USE_SSL != 0)
    #define   PUBLISH_TASK_STACK_SIZE    (2304)
  #else
    #define   PUBLISH_TASK_STACK_SIZE    (TASK_STACK_SIZE_IP_TASK + 256)
  #endif
#endif

//
// Task priorities.
//
enum {
   TASK_PRIO_IP_TASK = 150  // Priority should be higher than all IP application tasks.
#if USE_RX_TASK
  ,TASK_PRIO_IP_RX_TASK     // Must be the highest priority of all IP related tasks.
#endif
};

/*********************************************************************
*
*       Static data
*
**********************************************************************
*/

static IP_HOOK_ON_STATE_CHANGE _StateChangeHook;
static int                     _IFaceId;

//
// Task stacks and Task-Control-Blocks.
//
static OS_STACKPTR int _IPStack[TASK_STACK_SIZE_IP_TASK/sizeof(int)];       // Stack of the IP_Task.
static OS_TASK         _IPTCB;                                              // Task-Control-Block of the IP_Task.

static OS_STACKPTR int _MQTTPubStack[PUBLISH_TASK_STACK_SIZE/sizeof(int)];  // Stack of the MQTT Publisher task.
static OS_TASK         _PublishTCB;                                         // Task-Control-Block of the MQTT Publisher task.

static OS_STACKPTR int APP_MainStack[APP_MAIN_STACK_SIZE / sizeof(int)];    // Stack of the starting point of this sample.
static OS_TASK         APP_MainTCB;                                         // Task-Control-Block of the main task of this sample.

#if USE_RX_TASK
static OS_STACKPTR int _IPRxStack[TASK_STACK_SIZE_IP_RX_TASK/sizeof(int)];  // Stack of the IP_RxTask.
static OS_TASK         _IPRxTCB;                                            // Task-Control-Block of the IP_RxTask.
#endif

static char            _acBuffer[MQTT_CLIENT_BUFFER_SIZE];                  // Memory block used by the MQTT client.
static char            _aPayloadIN[MQTT_CLIENT_MESSAGE_BUFFER_SIZE];        // Memory used to store the received payload for printf().
static char            _aPayloadOUT[MQTT_CLIENT_MESSAGE_BUFFER_SIZE];       // Memory used to store the payload for snprintf().


static IP_MQTT_CLIENT_CONTEXT _MQTTClient;
static U32             _aMQTTPool[MQTT_CLIENT_MEMORY_POOL_SIZE / sizeof(int)]; // Memory pool for the MQTT client. Required for the maintenance structures

static int             _BrokerSupportsAlias;

//
// MQTT API locking resources.
//
static OS_RSEMA _RSema;

#if (APP_USE_SSL != 0)
//
// SSL
//
static SSL_SESSION _SSLSession;
#endif

#if USE_MQTT_5
static const IP_MQTT_PROPERTY     * apPropertiesConnect[NUM_PROPERTIES_CONNECT];
static const IP_MQTT_PROPERTY     * apPropertiesPublish[NUM_PROPERTIES_PUBLISH];
static const IP_MQTT_PROPERTY     * apPropertiesLastWill[NUM_PROPERTIES_LAST_WILL];

//
// Static connect properties
//
static const IP_MQTT_PROPERTY _PropUserConnect = {
  .PropType = IP_MQTT_PROP_TYPE_USER_PROPERTY,
  .PropData = {
    .Data_StrPair = {
      .Len1 = 28,
      .Len2 = 29,
      .pData1 = "emMQTT Connect UserProp Name",
      .pData2 = "emMQTT Connect UserProp Value"
    }
  }
};

static const IP_MQTT_PROPERTY _PropReceiveMax = {
  .PropType = IP_MQTT_PROP_TYPE_RECEIVE_MAXIMUM,
  .PropData = {
    .Data_U8 = 0x07
  }
};

static const IP_MQTT_PROPERTY _PropAliasMax = {
  .PropType = IP_MQTT_PROP_TYPE_TOPIC_ALIAS_MAXIMUM,
  .PropData = {
    .Data_U16 = 0x01
  }
};

//
// Static publish properties (See _PUBLISH_and_PING_Task() for a dynamic usage sample)
//
static const IP_MQTT_PROPERTY _PropUserPublish = {
  .PropType = IP_MQTT_PROP_TYPE_USER_PROPERTY,
  .PropData = {
    .Data_StrPair = {
      .Len1 = 28,
      .Len2 = 29,
      .pData1 = "emMQTT Publish UserProp Name",
      .pData2 = "emMQTT Publish UserProp Value"
    }
  }
};

static const IP_MQTT_PROPERTY _PropExpiry = {
  .PropType = IP_MQTT_PROP_TYPE_MESSAGE_EXPIRY_INTERVAL,
  .PropData = {
    .Data_U32 = 60 // in seconds
  }
};

static const IP_MQTT_PROPERTY _PropAlias = {
  .PropType = IP_MQTT_PROP_TYPE_TOPIC_ALIAS,
  .PropData = {
    .Data_U16 = TOPIC_ALIAS
  }
};

//
// Static Last Will properties
//
static const IP_MQTT_PROPERTY _PropWillDelay = {
  .PropType = IP_MQTT_PROP_TYPE_WILL_DELAY_INTERVAL,
  .PropData = {
    .Data_U32 = 30 // in seconds
  }
};
#endif

//
// Packet types.
//
static const char* MQTT_PACKET_Types[] = {
  "Unknown",
  "CONNECT",
  "CONNACK",
  "PUBLISH",
  "PUBACK",
  "PUBREC",
  "PUBREL",
  "PUBCOMP",
  "SUBSCRIBE",
  "SUBACK",
  "UNSUBSCRIBE",
  "UNSUBACK",
  "PINGREQ",
  "PINGRESP",
  "DISCONNECT"
};

/*********************************************************************
*
*       Prototypes
*
**********************************************************************
*/

#ifdef __cplusplus
extern "C" {     /* Make sure we have C-declarations in C++ programs */
#endif
void MainTask(void);
#ifdef __cplusplus
}
#endif

#if (APP_USE_SSL != 0)
static const SSL_TRANSPORT_API _IP_Transport;
#endif

/*********************************************************************
*
*       Local functions
*
**********************************************************************
*/

/*********************************************************************
*
*       _OnStateChange()
*
*  Function description
*    Callback that will be notified once the state of an interface
*    changes.
*
*  Parameters
*    IFaceId   : Zero-based interface index.
*    AdminState: Is this interface enabled ?
*    HWState   : Is this interface physically ready ?
*/
static void _OnStateChange(unsigned IFaceId, U8 AdminState, U8 HWState) {
  //
  // Check if this is a disconnect from the peer or a link down.
  // In this case call IP_Disconnect() to get into a known state.
  //
  if (((AdminState == IP_ADMIN_STATE_DOWN) && (HWState == 1)) ||  // Typical for dial-up connection e.g. PPP when closed from peer. Link up but app. closed.
      ((AdminState == IP_ADMIN_STATE_UP)   && (HWState == 0))) {  // Typical for any Ethernet connection e.g. PPPoE. App. opened but link down.
    IP_Disconnect(IFaceId);                                       // Disconnect the interface to a clean state.
  }
}

/*********************************************************************
*
*       _Connect()
*
*  Function description
*    Creates a socket and opens a TCP connection to the MQTT broker.
*
*  Return value
*    != NULL: O.K.
*    == NULL: Error.
*/
static IP_MQTT_CLIENT_SOCKET _Connect(const char* sBrokerAddr, unsigned BrokerPort) {
  struct sockaddr_in      sin;
  SEGGER_PARSE_IP_STATUS  Status;
  SEGGER_PARSE_IP_TYPE    Type;
  U32                     Ip;
  long                    hSock;
  int                     SoError;
  int                     r;
  U32                     Timeout;
  U8                      acBuff[16];

  Status = SEGGER_ParseIP(sBrokerAddr, (unsigned char*)&acBuff, sizeof(acBuff), &Type);
  if (Status == SEGGER_PARSE_IP_STATUS_OK) {
    IP_MQTT_CLIENT_MEMCPY(&Ip, &acBuff[0], sizeof(Ip));
    Ip = htonl(Ip);
  } else {
    //
    // Convert host into IP address.
    //
    r = IP_ResolveHost(sBrokerAddr, &Ip, 10000);  // Resolve host name.
    if (r != 0) {
      IP_Logf_Application("Could not resolve host addr. for \"%s\"", sBrokerAddr);
      return NULL;
    }
  }
  //
  // Create socket and connect to the MQTT broker.
  //
  hSock = socket(AF_INET, SOCK_STREAM, 0);
  if(hSock == -1) {
    IP_MQTT_CLIENT_APP_WARN(("APP: Could not create socket!"));
    return NULL;
  }
  //
  // Set receive timeout.
  //
  Timeout = RCV_TIMEOUT;
  setsockopt(hSock, SOL_SOCKET, SO_RCVTIMEO, (char*)&Timeout, sizeof(Timeout));
  //
  // Connect.
  //
  IP_MQTT_CLIENT_MEMSET(&sin, 0, sizeof(sin));
  sin.sin_family      = AF_INET;
  sin.sin_port        = htons((U16)BrokerPort);
  sin.sin_addr.s_addr = Ip;
  r = connect(hSock, (struct sockaddr*)&sin, sizeof(sin));
  if(r == SOCKET_ERROR) {
    getsockopt(hSock, SOL_SOCKET, SO_ERROR, &SoError, sizeof(SoError));
    closesocket(hSock);
    IP_MQTT_CLIENT_APP_LOG(("APP: \nSocket error: %s", IP_Err2Str(SoError)));
    return NULL;
  }
  IP_MQTT_CLIENT_APP_LOG(("APP: Connected to %y, port %d.", Ip, BrokerPort));
  return (IP_MQTT_CLIENT_SOCKET)hSock;
}

/*********************************************************************
*
*       _Disconnect()
*
*  Function description
*    Closes a socket.
*/
static void _Disconnect(void* pSocket) {
  _BrokerSupportsAlias = 0;
  if (pSocket) {
    IP_MQTT_CLIENT_APP_LOG(("_Disconnect: closing socket %d", (long)pSocket));
    closesocket((long)pSocket);
  }
}

/*********************************************************************
*
*       _Recv()
*
*  Function description
*    Receives data via socket interface.
*
*  Return value
*    >   0: O.K., number of bytes received.
*    ==  0: Connection has been gracefully closed by the broker.
*    == -1: Error.
*/
static int _Recv(void* pSocket, char* pBuffer, int NumBytes) {
  return recv((long)pSocket, pBuffer, NumBytes, 0);
}

/*********************************************************************
*
*       _Send()
*
*  Function description
*    Sends data via socket interface.
*
*  Return value
*    >   0: O.K., number of bytes sent.
*    ==  0: Connection has been gracefully closed by the broker.
*    == -1: Error.
*/
static int _Send(void* pSocket, const char* pBuffer, int NumBytes) {
  return send((long)pSocket, pBuffer, NumBytes, 0);
}

#if (APP_USE_SSL != 0)

/*********************************************************************
*
*       _MQTT_SSL_Connect()
*/
static IP_MQTT_CLIENT_SOCKET _MQTT_SSL_Connect(const char* sBrokerAddr, unsigned BrokerPort) {
  struct sockaddr_in      sin;
  SEGGER_PARSE_IP_STATUS  Status;
  SEGGER_PARSE_IP_TYPE    Type;
  U32                     Ip;
  long                    hSock;
  int                     SoError;
  int                     r;
  U32                     Timeout;
  U8                      acBuff[16];

  Status = SEGGER_ParseIP(sBrokerAddr, (unsigned char*)&acBuff, sizeof(acBuff), &Type);
  if (Status == SEGGER_PARSE_IP_STATUS_OK) {
    IP_MQTT_CLIENT_MEMCPY(&Ip, &acBuff[0], sizeof(Ip));
    Ip = htonl(Ip);
  } else {
    //
    // Convert host into IP address.
    //
    r = IP_ResolveHost(sBrokerAddr, &Ip, 10000);  // Resolve host name.
    if (r != 0) {
      IP_Logf_Application("Could not resolve host addr. for \"%s\"", sBrokerAddr);
      return NULL;
    }
  }
  //
  // Create socket and connect to the MQTT broker.
  //
  hSock = socket(AF_INET, SOCK_STREAM, 0);
  if(hSock == -1) {
    IP_MQTT_CLIENT_APP_WARN(("APP: Could not create socket!"));
    return NULL;
  }
  //
  // Set receive timeout.
  //
  Timeout = RCV_TIMEOUT;
  setsockopt(hSock, SOL_SOCKET, SO_RCVTIMEO, (char*)&Timeout, sizeof(Timeout));
  //
  // Connect.
  //
  IP_MQTT_CLIENT_MEMSET(&sin, 0, sizeof(sin));
  sin.sin_family      = AF_INET;
  sin.sin_port        = htons((U16)BrokerPort);
  sin.sin_addr.s_addr = Ip;
  r = connect(hSock, (struct sockaddr*)&sin, sizeof(sin));
  if(r == SOCKET_ERROR) {
    getsockopt(hSock, SOL_SOCKET, SO_ERROR, &SoError, sizeof(SoError));
    closesocket(hSock);
    IP_MQTT_CLIENT_APP_LOG(("APP: \nSocket error: %s", IP_Err2Str(SoError)));
    return NULL;
  }
  //
  // Activate Security
  //
  SSL_SESSION_Prepare(&_SSLSession, hSock, &_IP_Transport);
  //
  // Connect.
  //
  r = SSL_SESSION_Connect(&_SSLSession, sBrokerAddr);
  if(r < 0) {
    IP_MQTT_CLIENT_LOG(("APP: SSL upgrade error : %d", r));
    closesocket(hSock);
    return NULL;
  }
  //
  if (_SSLSession.State == SSL_CONNECTED) {
    IP_MQTT_CLIENT_LOG(("APP: Secured using %s.", SSL_SUITE_GetIANASuiteName(SSL_SUITE_GetID(SSL_SESSION_GetSuite(&_SSLSession)))));
  }
  //
  IP_MQTT_CLIENT_LOG(("APP: Connected to %y, port %d.", Ip, BrokerPort));
  return (IP_MQTT_CLIENT_SOCKET)&_SSLSession;

}

/*********************************************************************
*
*       _MQTT_SSL_Disconnect()
*/
static void _MQTT_SSL_Disconnect(void* pSocket) {
  if (pSocket != NULL) {
    IP_MQTT_CLIENT_APP_LOG(("_Disconnect: closing socket %d", (long)pSocket));
    SSL_SESSION_Disconnect(pSocket);
  }
}

/*********************************************************************
*
*       _MQTT_SSL_Recv()
*
*  Function description
*    MQTT client transport API wrapper for SSL recv()
*/
static int _MQTT_SSL_Recv(void* pSocket, char* pBuffer, int NumBytes) {
  SSL_SESSION*        pSession;
  int                 r;

  pSession = (SSL_SESSION*)pSocket;
  do {
    r = SSL_SESSION_Receive(pSession, pBuffer, NumBytes);
  } while (r == 0);  // Receiving 0 bytes means something different on a plain socket.
  //
  // Translate EOF into "connection closed",
  // the MQTT client expects the same return values as with recv().
  //
  if (r == SSL_ERROR_EOF) {
    r = 0;
  }
  return r;
}

/*********************************************************************
*
*       _SSL_Recv()
*
*  Function description
*    SSL transport API wrapper for recv()
*/
static int _SSL_Recv(int Socket, char *pData, int Len, int Flags) {
  return recv(Socket, pData, Len, Flags);
}

/*********************************************************************
*
*       _MQTT_SSL_Send()
*
*  Function description
*    MQTT client transport API wrapper for SSL send()
*/
static int _MQTT_SSL_Send(void* pSocket, const char* pBuffer, int NumBytes) {
  SSL_SESSION*        pSession;
  int                 r;

  pSession = (SSL_SESSION*)pSocket;
  r = SSL_SESSION_Send(pSession, pBuffer, NumBytes);
  return r;
}

/*********************************************************************
*
*       _SSL_Send()
*
*  Function description
*    SSL transport API wrapper for send()
*/
static int _SSL_Send(int Socket, const char *pData, int Len, int Flags) {
  return send(Socket, pData, Len, Flags);
}

/*********************************************************************
*
*       MQTT SSL transport layer
*
*  Description
*    MQTT client transport API for SSL connections.
*/
static const IP_MQTT_CLIENT_TRANSPORT_API _IP_Api_SSL = {
  _MQTT_SSL_Connect,
  _MQTT_SSL_Disconnect,
  _MQTT_SSL_Recv,
  _MQTT_SSL_Send,
};

/*********************************************************************
*
*       SSL transport layer
*
*  Description
*    SSL transport API.
*/
static const SSL_TRANSPORT_API _IP_Transport = {
  _SSL_Send,
  _SSL_Recv,
  // GetTime is set to NULL because otherwise the certificate would be reported as expired.
  NULL, //IP_OS_GetTime32,
  NULL
};

#endif

/*********************************************************************
*
*       IP_MQTT_CLIENT_TRANSPORT_API
*
*  Description
*    IP stack related function table
*/
static const IP_MQTT_CLIENT_TRANSPORT_API _IP_Api = {
  _Connect,
  _Disconnect,
  _Recv,
  _Send
};

/*********************************************************************
*
*       _GenRandom()
*
*  Function description
*    Generates a random number.
*/
static U16 _GenRandom(void) {
  U32 TimeStamp;

  //
  // Return a random value, which can be used for packet Id, ...
  // We just use the time in this sample because we do not have a different random source.
  //
  TimeStamp = OS_GetTime32();
  return (U16)TimeStamp;
}

/*********************************************************************
*
*       _Alloc()
*
*  Function description
*    Wrapper for Alloc(). (emNet: IP_AllocEx())
*/
static void* _Alloc(U32 NumBytesReq) {
  return IP_AllocEx(_aMQTTPool, NumBytesReq);
}

/*********************************************************************
*
*       _Free()
*
*  Function description
*    Wrapper for Free(). (emNet: IP_Free())
*/
static void _Free(void *p) {
  IP_Free(p);
}

/*********************************************************************
*
*       _Lock()
*
*  Function description
*    Acquires the lock to ensure the MQTT API is thread safe.
*/
static void _Lock(void) {
  OS_Use(&_RSema);
}

/*********************************************************************
*
*       _Unlock()
*
*  Function description
*    Releases a previously acquired lock that keeps the MQTT API thread safe.
*/
static void _Unlock(void) {
  OS_Unuse(&_RSema);
}

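/*********************************************************************
*
*       _SkipProperty()
*
*  Function description
*    Skips the value of a property that is not decoded by this sample.
*
*  Parameters
*    PropType: Property identifier that has already been read.
*    pp      : [IN/OUT] Pointer to the first byte of the property value.
*    pBytes  : [IN/OUT] Number of property bytes left to parse.
*/
static void _SkipProperty(U8 PropType, char **pp, int *pBytes) {
  U32         skipLen;
  U16         Len;

  switch (PropType) {
    //
    // 1-byte values
    //
    case IP_MQTT_PROP_TYPE_PAYLOAD_FORMAT_INDICATOR:
    case IP_MQTT_PROP_TYPE_MAXIMUM_QOS:
    case IP_MQTT_PROP_TYPE_RETAIN_AVAILABLE:
      skipLen = 1;
      break;
    //
    // Variable byte integer (1 to 4 bytes, bit 7 set: another byte follows)
    //
    case IP_MQTT_PROP_TYPE_SUBSCRIPTION_ID:
      skipLen = 1;
      while ((skipLen < 4) && ((int)skipLen < *pBytes) && (((U8)(*pp)[skipLen - 1] & 0x80u) != 0u)) {
        skipLen++;
      }
      break;
    //
    // 2-byte values
    //
    case IP_MQTT_PROP_TYPE_TOPIC_ALIAS_MAXIMUM:
      skipLen = 2;
      break;
    //
    // 4-byte values
    //
    case IP_MQTT_PROP_TYPE_MESSAGE_EXPIRY_INTERVAL:
    case IP_MQTT_PROP_TYPE_SESSION_EXPIRY_INTERVAL:
    case IP_MQTT_PROP_TYPE_WILL_DELAY_INTERVAL:
      skipLen = 4;
      break;
    //
    // Two-byte length + string or binary data
    //
    case IP_MQTT_PROP_TYPE_CORRELATION_DATA:
    case IP_MQTT_PROP_TYPE_CONTENT_TYPE:
    case IP_MQTT_PROP_TYPE_RESPONSE_TOPIC:
      skipLen = 2;
      if (*pBytes >= 2) {
        Len      = SEGGER_RdU16BE((U8*)*pp);
        skipLen += Len;
      }
      break;
    //
    // Unknown property, skip everything
    //
    default:
      skipLen = (*pBytes > 0) ? (U32)*pBytes : 0u;
      break;
  }
  //
  // Never skip past the end of the property block, otherwise the
  // caller's byte counter would become negative.
  //
  if (*pBytes < 0) {
    *pBytes = 0;
  }
  if (skipLen > (U32)*pBytes) {
    skipLen = (U32)*pBytes;
  }
  *pp     += skipLen;
  *pBytes -= (int)skipLen;
}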

/*********************************************************************
*
*       _RecvMessageEx()
*
*  Function description
*    Process received PUBLISH messages.
*
*  Return value
*    >  0: O.K.
*    == 0: Connection has been gracefully closed by the broker.
*    <  0: Error
*/
static int _RecvMessageEx (void* pContext, void* pHandle, int NumBytesRem, U8 * pReasonCode) {
  IP_MQTT_CLIENT_CONTEXT* pMQTTClient;
  IP_MQTT_CLIENT_MESSAGE* pPublish;
  int                     NumBytes;
  int                     r;
  int                     NumBytesTopic;
  int                     NumBytesPayload;
  int                     NumBytesReceived;
  int                     NumBytesProperties;
  char*                   pTopic;
  char*                   pPayload;
  char*                   pProperties;
#if USE_MQTT_5
  U8                      PropType;
  U16                     Data16;
  U32                     Data32;
#endif

  pPublish            = (IP_MQTT_CLIENT_MESSAGE*)pHandle;
  pMQTTClient         = (IP_MQTT_CLIENT_CONTEXT*)pContext;
  pTopic              = NULL;
  pPayload            = NULL;
  pProperties         = NULL;
  NumBytesTopic       = 0;
  NumBytesReceived    = 0;
  NumBytesPayload     = 0;
  NumBytesProperties  = 0;
  //
  // Check if we can store the complete MQTT message in the application buffer.
  //
  if (NumBytesRem > MQTT_CLIENT_MESSAGE_BUFFER_SIZE) {
    NumBytes = MQTT_CLIENT_MESSAGE_BUFFER_SIZE;   // Get as much data as fits into the buffer.
  } else {
    NumBytes = NumBytesRem;                       // Get the complete message.
  }
  //
  // Read and process the MQTT message
  //
  r = IP_MQTT_CLIENT_Recv(pMQTTClient, _aPayloadIN, NumBytes);
  if (r > 0) {
    NumBytesRem      -= r;
    NumBytesReceived += r;
    //
    // Get the information in MQTT message header.
    //
    IP_MQTT_CLIENT_ParsePublishEx(pMQTTClient, pPublish, _aPayloadIN, NumBytes, &pTopic, &NumBytesTopic, &pPayload, &NumBytesPayload, &pProperties, &NumBytesProperties);
    //
    // Process the data
    // To visualize the data transmission, we output the message if it fits into the supplied buffer.
    //
    // Attention: The topic is not zero-terminated (strings in MQTT are generally not zero terminated because they always have a length header).
    //
    IP_MQTT_CLIENT_APP_LOG(("APP: IN: Message with (Id: %d | QoS: %d | Retain: %d | Duplicate: %d) received for topic \"%.*s\"", pPublish->PacketId, pPublish->QoS, pPublish->Retain, pPublish->Duplicate, NumBytesTopic, pTopic));
    if ((r < MQTT_CLIENT_MESSAGE_BUFFER_SIZE) && (NumBytesPayload < MQTT_CLIENT_MESSAGE_BUFFER_SIZE)) {  // If possible finalize the string to output it.
#if USE_MQTT_5
      if ((pProperties != NULL) && (NumBytesProperties > 0)) {
        do {
          PropType = *pProperties;
          pProperties++; // First data byte.
          NumBytesProperties--;
          IP_MQTT_CLIENT_APP_LOG(("APP: IN:   Property \"%s\" received", IP_MQTT_Property2String(PropType)));
          //
          // PUBLISH properties only.
          //
          switch (PropType) {
            case IP_MQTT_PROP_TYPE_USER_PROPERTY:
              //
              // A User property is a key-value UTF-8 string pair with the format
              // [U16 Len1][String1][U16 Len2][String2]
              // The strings are normally _not_ zero-terminated.
              //
              Data16 = SEGGER_RdU16BE((const U8*)pProperties); // Len1
              pProperties += 2;
              if (Data16 > 0) {
                IP_MQTT_CLIENT_APP_LOG(("APP: IN:     Key:   \"%.*s\" ", Data16, pProperties));
              }
              pProperties += Data16;
              NumBytesProperties -= 2 + Data16;
              Data16 = SEGGER_RdU16BE((const U8*)pProperties); // Len2
              pProperties += 2;
              if (Data16 > 0) {
                IP_MQTT_CLIENT_APP_LOG(("APP: IN:     Value: \"%.*s\" ", Data16, pProperties));
              }
              pProperties += Data16;
              NumBytesProperties -= 2 + Data16;
              break;
            case IP_MQTT_PROP_TYPE_MESSAGE_EXPIRY_INTERVAL:
              //
              // The Expiry Interval property is a 4 byte integer containing the expiry time in seconds.
              //
              Data32 = SEGGER_RdU32BE((const U8*)pProperties);
              NumBytesProperties -= 4;
              pProperties += 4;
              IP_MQTT_CLIENT_APP_LOG(("APP: IN:   Message expiry interval set to: \"%d\" seconds ", Data32));
              break;
            case IP_MQTT_PROP_TYPE_TOPIC_ALIAS:
              //
              // The Topic Alias property is a 2 byte integer containing the negotiated topic alias ID.
              // The application can create a mapping table here in case multiple topics are used with different aliases.
              //
              Data16 = SEGGER_RdU16BE((const U8*)pProperties);
              NumBytesProperties -= 2;
              pProperties += 2;
              IP_MQTT_CLIENT_APP_LOG(("APP: IN:   Topic Alias: \"%d\" ", Data16));
              break;
            case IP_MQTT_PROP_TYPE_PAYLOAD_FORMAT_INDICATOR:
            case IP_MQTT_PROP_TYPE_CONTENT_TYPE:
            case IP_MQTT_PROP_TYPE_RESPONSE_TOPIC:
            case IP_MQTT_PROP_TYPE_CORRELATION_DATA:
            case IP_MQTT_PROP_TYPE_SUBSCRIPTION_ID:
            case IP_MQTT_PROP_TYPE_RETAIN_AVAILABLE:
            default:
              //
              // Property not decoded by this sample application.
              // Skip its value and continue parsing.
              //
              _SkipProperty(PropType, &pProperties, &NumBytesProperties);
              break;
          }
        } while (NumBytesProperties != 0);
      }
#endif
      if (pPayload != NULL) {
        *(pPayload + NumBytesPayload) = '\0';
        IP_MQTT_CLIENT_APP_LOG(("APP: IN:   Payload: \"%s\"", pPayload));
      } else {
        IP_MQTT_CLIENT_APP_LOG(("APP: IN:   No payload."));
      }
    } else {
      IP_MQTT_CLIENT_APP_LOG(("APP: IN:   Payload: Too large to output."));
    }
    //
    // If the message is larger than the available buffer, read and discard the remainder to clean the buffer.
    //
    while (NumBytesRem > 0) {
      if (NumBytesRem < NumBytes) {
        NumBytes = NumBytesRem;
      }
      r = IP_MQTT_CLIENT_Recv(pMQTTClient, _aPayloadIN, NumBytes);
      if (r < 0) {
        NumBytesReceived = r; // Receive failed. Connection closed ?
        break;
      }
      NumBytesRem      -= r;
      NumBytesReceived += r;
    }
  }
  //
  // Set a Reason Code to send back to the server.
  // This is only relevant if QoS of the received message is 1 or 2.
  //
  if (pPublish->QoS > 0) {
    *pReasonCode = IP_MQTT_REASON_SUCCESS;
    //*pReasonCode = IP_MQTT_REASON_IMPL_SPECIFIC_ERR; // Error code for testing.
  }
  IP_MQTT_CLIENT_APP_LOG(("APP: ----"));
  OS_Delay(10);
  return NumBytesReceived;
}

/*********************************************************************
*
*       _MessageHandledEx()
*
*  Function description
*    Called if all QoS related messages are processed and
*      - Sent messages were freed or can be freed.
*      - Received messages can be processed by the application.
*
*  Note
*   The application has to store a message with QoS > 0 until the related acknowledgement has been received or sent.
*   Target sends PUBLISH with QoS 0 to broker -> Message can be directly discarded after message has been sent.
*   Target sends PUBLISH with QoS 1 to broker -> Message can be discarded after PUBACK has been received from the broker.
*   Target sends PUBLISH with QoS 2 to broker -> Message can be discarded after PUBCOMP has been received from the broker.
*   Broker sends PUBLISH with QoS 0 to target -> Message can be directly processed.
*   Broker sends PUBLISH with QoS 1 to target -> Message can be processed after PUBACK has been sent to the broker.
*   Broker sends PUBLISH with QoS 2 to target -> Message can be processed after PUBREL has been received and PUBCOMP has been sent to the broker.
*
*  Return value
*    == 1: O.K.
*    != 1: Error
*/
static int _MessageHandledEx(void* pContext, U8 Type, U16 PacketId, U8 ReasonCode) {
  (void)pContext;
#if USE_MQTT_5 == 0
  (void)ReasonCode;
#endif
  //
  switch (Type) {
    //
    // Tx related packet types
    //
    case IP_MQTT_CLIENT_TYPES_PUBACK:  // PUBACK received. QoS level 1 successfully done. -> Message was freed.
    case IP_MQTT_CLIENT_TYPES_PUBCOMP: // PUBCOMP received. QoS level 2 successfully done. -> Message was freed.
#if USE_MQTT_5
      IP_MQTT_CLIENT_APP_LOG(("APP: OUT: Message with Id: %d has been freed. Reason Code received: %s.", PacketId, IP_MQTT_ReasonCode2String(ReasonCode)));
#else
      IP_MQTT_CLIENT_APP_LOG(("APP: OUT: Message with Id: %d has been freed.", PacketId));
#endif
      break;
    //
    // Rx related packet types
    //
    case IP_MQTT_CLIENT_TYPES_PUBLISH: // PUBACK sent. -> Received packet can be processed.
    case IP_MQTT_CLIENT_TYPES_PUBREL:  // PUBREC sent. PUBREL received. PUBCOMP sent. -> Received packet can be processed.
#if USE_MQTT_5
      IP_MQTT_CLIENT_APP_LOG(("APP: IN: Message with Id: %d can be processed. Reason Code sent: %s.", PacketId, IP_MQTT_ReasonCode2String(ReasonCode)));
#else
      IP_MQTT_CLIENT_APP_LOG(("APP: IN: Message with Id: %d can be processed. ", PacketId));
#endif
      break;
    case IP_MQTT_CLIENT_TYPES_PINGRESP:
      IP_MQTT_CLIENT_APP_LOG(("APP: IN: Received PING Response from server."));
      break;
    //
    // Subscription packet types
    //
    case IP_MQTT_CLIENT_TYPES_SUBACK:
    case IP_MQTT_CLIENT_TYPES_UNSUBACK: // UNSUBACK received. The subscribe structure can be freed.
    default:
#if USE_MQTT_5
      IP_MQTT_CLIENT_APP_LOG(("APP: Message (Type: %s, Id: %d) received. Reason Code: %s.", MQTT_PACKET_Types[Type], PacketId, IP_MQTT_ReasonCode2String(ReasonCode)));
#else
      IP_MQTT_CLIENT_APP_LOG(("APP: Message (Type: %s, Id: %d) received.", MQTT_PACKET_Types[Type], PacketId));
#endif
      break;
  }
  IP_MQTT_CLIENT_APP_LOG(("APP: ----"));
  return 1;
}

/*********************************************************************
*
*       _OnProperty()
*
*  Function description
*    Called when a non-PUBLISH (CONNACK, PUBACK, PUBREC, PUBCOMP, SUBACK, UNSUBACK, DISCONNECT, AUTH) message is received with a Property.
*/
#if USE_MQTT_5
static void _OnProperty (void* pMQTTClient, U16 PacketId, U8 PacketType, IP_MQTT_PROPERTY * pProp) {
  int parsedFlag;
  (void)pMQTTClient;

  parsedFlag = 0;
  //
  // Additional property parsing can be added here.
  //
  if (PacketType == IP_MQTT_CLIENT_TYPES_CONNACK) { // CONNACK
    if (pProp->PropType == IP_MQTT_PROP_TYPE_TOPIC_ALIAS_MAXIMUM) {
      parsedFlag = 1;
      if (pProp->PropData.Data_U16 > 0) {
        _BrokerSupportsAlias = 1;
      } else {
        _BrokerSupportsAlias = 0;
      }
      IP_MQTT_CLIENT_APP_LOG(("APP: Broker reports Topic Alias Maximum of %d.", pProp->PropData.Data_U16));
    }
  }
  //
  // Unparsed properties produce a debug print.
  //
  if (parsedFlag == 0) {
    IP_MQTT_CLIENT_APP_LOG(("APP: Received Property %s for %s with PacketID %d.", IP_MQTT_Property2String(pProp->PropType), MQTT_PACKET_Types[PacketType], PacketId));
  }
  return;
}
#endif

/*********************************************************************
*
*       _HandleError()
*
*  Function description
*    Called in case of an error.
*    Can be used to analyze/handle a connection problem.
*
*  Return value
*    > 0: O.K.
*    < 0: Error
*/
static int _HandleError (void* pContext) {
  IP_MQTT_CLIENT_CONTEXT* pMQTTClient;
  int Error;

  Error       = 1;
  pMQTTClient = (IP_MQTT_CLIENT_CONTEXT*)pContext;
  getsockopt((long)pMQTTClient->Connection.Socket, SOL_SOCKET, SO_ERROR, &Error, sizeof(Error));
  if (Error < 0) {
    IP_MQTT_CLIENT_APP_LOG(("APP: Socket error %d. Disconnect MQTT client.", Error));
    IP_MQTT_CLIENT_Disconnect(pMQTTClient);
  }
  return Error;
}

/*********************************************************************
*
*       _HandleDisconnect()
*
*  Function description
*    Called in case of a disconnect from the server.
*    Only available if MQTT 5 is used.
*    With MQTT 3.1.1 there is no option for the server to send a disconnect.
*/
#if USE_MQTT_5
static void _HandleDisconnect (void* pContext, U8 ReasonCode) {
  IP_MQTT_CLIENT_CONTEXT* pMQTTClient;

  pMQTTClient = (IP_MQTT_CLIENT_CONTEXT*)pContext;

  IP_MQTT_CLIENT_APP_LOG(("APP: _HandleDisconnect: %d %s", ReasonCode, IP_MQTT_ReasonCode2String(ReasonCode)));
  IP_MQTT_CLIENT_Disconnect(pMQTTClient);
  return;
}
#endif

/*********************************************************************
*
*       _OnCheckTimeout()
*
*  Function description
*    This function will be called by the emMQTT module for messages with
*    the CheckUserTimeout flag set to 1. It should check based on the
*    UserTimeout, which the application should set previously, whether
*    a message should be timed out.
*
*  Return value
*    1: Message has timed out. The emMQTT module will call the _Free callback.
*    0: Message has not timed out yet.
*/
static int _OnCheckTimeout(void* pMQTTClient, IP_MQTT_CLIENT_MESSAGE* pMessage) {
  (void)pMQTTClient;

  if (CHECK_TIME_EXPIRED(pMessage->UserTimeout)) {
    return 1;
  } else {
    return 0;
  }
}

/*********************************************************************
*
*       IP_MQTT_CLIENT_APP_API
*
*  Description
*    Application related function table
*/
static const IP_MQTT_CLIENT_APP_API _APP_Api = {
  _GenRandom,       // (*pfGenRandom)
  _Alloc,           // (*pfAlloc)
  _Free,            // (*pfFree)
  _Lock,            // (*pfLock)
  _Unlock,          // (*pfUnlock)
  NULL,             // (*pfRecvMessage)
  NULL,             // (*pfOnMessageConfirm)    // Used with MQTT v3.1.1 only
  _HandleError,     // (*pfHandleError)
#if USE_MQTT_5
  _HandleDisconnect,// (*pfHandleDisconnect)    // Used with MQTT v5 only
#else
  NULL,
#endif
  _MessageHandledEx,// (*pfOnMessageConfirmEx)  // Used with MQTT v5 or 3.1.1 if "pfOnMessageConfirm" is not set
  _RecvMessageEx,   // (*pfRecvMessageEx)
#if USE_MQTT_5
  _OnProperty,      // (*pfOnProperty)          // Used with MQTT v5 only
#else
  NULL,
#endif
  _OnCheckTimeout
};

/*********************************************************************
*
*       _PUBLISH_and_PING_Task()
*
*  Function description
*    Sends publish and PINGREQ messages.
*/
static void _PUBLISH_and_PING_Task(void) {
  IP_MQTT_CLIENT_MESSAGE* pPublish;
  int                     r;
  U32                     TimeExpirePublish;
  U32                     TimeExpirePing;
  int                     FirstPublishFlag;   // Used in both MQTT 3.1.1 and MQTT 5 builds.
#if USE_MQTT_5
  IP_MQTT_PROPERTY        DynamicUserProp;
  char                    acKey[40];
  char                    acValue[45];
  unsigned                Counter;

  Counter = 0;
#endif

  TimeExpirePublish = 0;
  TimeExpirePing    = 0;
  FirstPublishFlag  = 1;

  do {
    r = IP_MQTT_CLIENT_IsClientConnected(&_MQTTClient);
    if (r > 0) {
      IP_MQTT_CLIENT_CheckMessageTimeouts(&_MQTTClient);
      //
      // Initially set timeouts. This is done after the connect check because
      // the KeepAlive timeout value can change depending on the CONNACK from the server.
      // The server's KeepAlive timeout takes precedence.
      // KeepAlive time is in seconds.
      // As per MQTT 3.1.1 or 5 spec the server allows for one and a half times the
      // KeepAlive time period before disconnecting the client.
      //
      if (TimeExpirePublish == 0 && TimeExpirePing == 0) {
        TimeExpirePublish = GET_TIME_32 + PUBLISH_FREQUENCY;
        TimeExpirePing    = GET_TIME_32 + (_MQTTClient.KeepAlive * 1000);
      }
      //
      // Send KeepAlive (PINGREQ) if configured.
      //
      if (_MQTTClient.KeepAlive != 0 && CHECK_TIME_EXPIRED(TimeExpirePing)) {
        TimeExpirePing = GET_TIME_32 + (_MQTTClient.KeepAlive * 1000); // Set new expire time.
        IP_MQTT_CLIENT_Timer(); // Send PINGREQ.
        IP_MQTT_CLIENT_APP_LOG(("APP: PINGREQ sent."));
      }
      //
      // Build and send the PUBLISH message.
      //
      if (CHECK_TIME_EXPIRED(TimeExpirePublish)) {
        TimeExpirePublish = GET_TIME_32 + PUBLISH_FREQUENCY; // Set new expire time.
        //
        // Allocate message structure.
        //
        pPublish = (IP_MQTT_CLIENT_MESSAGE*)_Alloc(sizeof(IP_MQTT_CLIENT_MESSAGE));
        if (pPublish != NULL) {
          //
          // Initialize the MQTT message, which should be published.
          //
          IP_MQTT_CLIENT_MEMSET(pPublish, 0, sizeof(IP_MQTT_CLIENT_MESSAGE));
          if ((_BrokerSupportsAlias != 1) || (FirstPublishFlag == 1)) {
            pPublish->sTopic  = TOPIC_FILTER_TO_PUBLISH;
          } else {
            pPublish->sTopic  = "";
          }
          IP_MQTT_CLIENT_SNPRINTF(_aPayloadOUT, sizeof(_aPayloadOUT), "Target Test from '%s'", _MQTTClient.sId);
          pPublish->pData   = _aPayloadOUT;
          pPublish->DataLen = IP_MQTT_CLIENT_STRLEN(_aPayloadOUT);
          pPublish->QoS     = TOPIC_PUBLISH_QOS;
          pPublish->CheckUserTimeout = 1;
          pPublish->UserTimeout = GET_TIME_32 + NO_REPLY_TIMEOUT;
#if USE_MQTT_5
          //
          // Add static const publish properties.
          //
          apPropertiesPublish[0] = &_PropUserPublish;
          apPropertiesPublish[1] = &_PropExpiry;
          //
          // Fill and add a dynamic property.
          //
          Counter++;
          DynamicUserProp.PropType = IP_MQTT_PROP_TYPE_USER_PROPERTY;
          IP_MQTT_CLIENT_SNPRINTF(acKey, sizeof(acKey), "Dynamic user property key %d", Counter);
          IP_MQTT_CLIENT_SNPRINTF(acValue, sizeof(acValue), "Dynamic user property value %d", Counter);
          DynamicUserProp.PropData.Data_StrPair.pData1 = acKey;
          DynamicUserProp.PropData.Data_StrPair.Len1   = IP_MQTT_CLIENT_STRLEN(acKey);
          DynamicUserProp.PropData.Data_StrPair.pData2 = acValue;
          DynamicUserProp.PropData.Data_StrPair.Len2   = IP_MQTT_CLIENT_STRLEN(acValue);
          apPropertiesPublish[2] = &DynamicUserProp;
          //
          // Add topic alias if supported by broker
          //
          if (_BrokerSupportsAlias == 1) {
            apPropertiesPublish[3] = &_PropAlias;
            pPublish->NumProperties = SEGGER_COUNTOF(apPropertiesPublish);
          } else {
            pPublish->NumProperties = SEGGER_COUNTOF(apPropertiesPublish) - 1;
          }
          pPublish->paProperties = apPropertiesPublish;
#endif
          //
          // Publish a message.
          //
          r = IP_MQTT_CLIENT_Publish(&_MQTTClient, pPublish);
          if (r >= 0) {                                                     // Message successfully sent.
            if (pPublish->QoS > IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS0) {        // If QoS level > QoS level 0: Output the Message Id to visualize the message flow.
              IP_MQTT_CLIENT_APP_LOG(("APP: OUT: Message (Id: %d) sent. (\"%s\")", pPublish->PacketId, pPublish->pData));
            } else {
              IP_MQTT_CLIENT_APP_LOG(("APP: OUT: Message sent."));
            }
            FirstPublishFlag = 0;
            if (pPublish->QoS == 0) {
              //
              // Free the message structure if no QoS handling is required.
              // If QoS is used, the structure is freed by the MQTT module once it is no longer required.
              //
              _Free(pPublish);
            }
          } else {
            IP_MQTT_CLIENT_APP_LOG(("APP: Could not send message."));
            //
            // If we were unable to send the PUBLISH message (e.g. we just lost our network connection)
            // we have to free the message here, regardless of its QoS level.
            // Alternatively the application can re-use the allocated structure and try again.
            // The structure must not be accessed after this point.
            //
            _Free(pPublish);
          }
        } else {
          IP_MQTT_CLIENT_APP_LOG(("APP: Could not allocate memory to send message."));
        }
        IP_MQTT_CLIENT_APP_LOG(("APP: ----"));
      }
    } else {
      FirstPublishFlag = 1;
    }
    OS_Delay(10);
    IP_MQTT_CLIENT_CheckMessageTimeouts(&_MQTTClient);
  } while (1);
}

/*********************************************************************
*
*       _MQTT_Client()
*
*  Function description
*    MQTT client application.
*    Initializes the MQTT client and creates a task to publish
*    messages.
*
*    The main application loop establishes a connection to a broker,
*    sends a subscribe message and handles incoming packets.
*/
static void _MQTT_Client(void) {
  IP_MQTT_CLIENT_MESSAGE      LastWill;
  IP_MQTT_CLIENT_TOPIC_FILTER TopicFilter[1];
  IP_MQTT_CLIENT_SUBSCRIBE*   pSubscribe;
  IP_MQTT_CONNECT_PARAM       ConnectPara;
  U8                          ReasonCode;
  int                         r;
  IP_fd_set                   ReadFds;

  pSubscribe = NULL;
  IP_AddMemory(_aMQTTPool, sizeof(_aMQTTPool));    // Memory pool used for allocation of maintenance structures.
  //
  // Initialize MQTT client context.
  //
#if (APP_USE_SSL == 0)
  r = IP_MQTT_CLIENT_Init(&_MQTTClient, _acBuffer, MQTT_CLIENT_BUFFER_SIZE, &_IP_Api, &_APP_Api, MQTT_CLIENT_NAME);
#else
  r = IP_MQTT_CLIENT_Init(&_MQTTClient, _acBuffer, MQTT_CLIENT_BUFFER_SIZE, &_IP_Api_SSL, &_APP_Api, MQTT_CLIENT_NAME);
#endif
  if (r == -1) {
    IP_MQTT_CLIENT_APP_LOG(("APP: Init Error."));
    return;
  }

  OS_CREATERSEMA(&_RSema);                         // Create semaphore for locking. Created first, as the Publish task may use the lock.
  OS_CREATETASK(&_PublishTCB , "MQTTSend" , _PUBLISH_and_PING_Task , TASK_PRIO_IP_TASK - 1 , _MQTTPubStack);    // Create the Publish task
  //
  // Main application loop.
  //
  do {
    //
    // Set the Last Will message for our publish topic.
    // This message will be sent to all other clients subscribed to this topic
    // if this client disconnects ungracefully, and the Last Will Timeout has passed.
    //
    IP_MQTT_CLIENT_MEMSET(&LastWill, 0, sizeof(IP_MQTT_CLIENT_MESSAGE));
    LastWill.sTopic  = TOPIC_FILTER_TO_PUBLISH;
    IP_MQTT_CLIENT_SNPRINTF(_aPayloadOUT, sizeof(_aPayloadOUT), "Client '%s' has disconnected ungracefully", _MQTTClient.sId);
    LastWill.pData   = _aPayloadOUT;
    LastWill.DataLen = IP_MQTT_CLIENT_STRLEN(_aPayloadOUT);
    LastWill.QoS     = IP_MQTT_CLIENT_FLAGS_PUBLISH_QOS0;
#if USE_MQTT_5
    apPropertiesLastWill[0] = &_PropWillDelay;
    LastWill.paProperties   = apPropertiesLastWill;
    LastWill.NumProperties  = SEGGER_COUNTOF(apPropertiesLastWill);
#endif
    IP_MQTT_CLIENT_SetLastWill(&_MQTTClient, &LastWill);
    //
    // Connect to the MQTT broker.
    //
    IP_MQTT_CLIENT_MEMSET(&ConnectPara, 0, sizeof(IP_MQTT_CONNECT_PARAM));
    ConnectPara.CleanSession  = 1;
    ConnectPara.Port          = MQTT_BROKER_PORT;
    ConnectPara.sAddr         = MQTT_BROKER_ADDR;
#if USE_MQTT_5
    ConnectPara.Version       = 5;
    apPropertiesConnect[0]    = &_PropUserConnect;
    apPropertiesConnect[1]    = &_PropReceiveMax;
    apPropertiesConnect[2]    = &_PropAliasMax;
    ConnectPara.paProperties  = apPropertiesConnect;
    ConnectPara.NumProperties = SEGGER_COUNTOF(apPropertiesConnect);
#else
    ConnectPara.Version = 4; // MQTT 3.1.1
#endif
    //
    // Configure MQTT keep alive time (for sending PINGREQ messages)
    //
    if (PING_FREQUENCY != 0) {
      r = IP_MQTT_CLIENT_SetKeepAlive(&_MQTTClient, PING_FREQUENCY);
      if (r != 0) {
        IP_MQTT_CLIENT_APP_LOG(("APP: Could not set timeout (currently connected ?)"));
      }
    }
    //
    // Execute connect.
    //
    r = IP_MQTT_CLIENT_ConnectEx(&_MQTTClient, &ConnectPara, &ReasonCode);
    if (r != 0) {
#if USE_MQTT_5
      IP_MQTT_CLIENT_APP_LOG(("APP: MQTT connect error: %d, ReasonCode %s.", r, (ReasonCode == 0) ? "N/A" : IP_MQTT_ReasonCode2String(ReasonCode)));
#else
      IP_MQTT_CLIENT_APP_LOG(("APP: MQTT connect error: %d", r));
#endif
      goto Disconnect;
    }
    //
    // Initialize the topic filter and subscribe structures.
    //
    pSubscribe = (IP_MQTT_CLIENT_SUBSCRIBE*)_Alloc(sizeof(IP_MQTT_CLIENT_SUBSCRIBE));
    if (pSubscribe == NULL) {
      IP_MQTT_CLIENT_APP_WARN(("APP: Could not allocate IP_MQTT_CLIENT_SUBSCRIBE structure."));
      goto Disconnect;
    }
    IP_MQTT_CLIENT_MEMSET(TopicFilter, 0, sizeof(TopicFilter));     // Topic 1
    TopicFilter[0].sTopicFilter = TOPIC_FILTER01_TO_SUBSCRIBE;
    TopicFilter[0].QoS          = TOPIC_SUBSCRIBE_QOS;
    IP_MQTT_CLIENT_MEMSET(pSubscribe, 0, sizeof(IP_MQTT_CLIENT_SUBSCRIBE));             // Subscribe structure.
    pSubscribe->pTopicFilter    = TopicFilter;
    pSubscribe->TopicCnt        = SEGGER_COUNTOF(TopicFilter);
    //
    // Subscribe to topic.
    //
    r = IP_MQTT_CLIENT_Subscribe(&_MQTTClient, pSubscribe);
    if (r < 0) {
      IP_MQTT_CLIENT_APP_LOG(("APP: Subscribe failed. Check topic filter structure."));
    } else {
      //
      // Process incoming messages and send messages
      //
      while (1) {
        r = IP_MQTT_CLIENT_IsClientConnected(&_MQTTClient);
        if (r > 0) {
          IP_FD_ZERO(&ReadFds);                                       // Clear the set
#if (APP_USE_SSL != 0)
          IP_FD_SET(_SSLSession.Socket, &ReadFds);                    // Add socket to the set
#else
          IP_FD_SET((long)_MQTTClient.Connection.Socket, &ReadFds);   // Add socket to the set
#endif
          r = select(&ReadFds, NULL, NULL, 500);                      // Check for activity. Wait 500ms.
          if (r > 0) {
            r = IP_MQTT_CLIENT_Exec(&_MQTTClient);                    // Get messages for subscribed topics, process QoS messages, ...
            if (r == 0) {
              IP_MQTT_CLIENT_APP_LOG(("IP_MQTT_CLIENT_Exec: Connection gracefully closed by peer."));
              break; // Exit loop and try to re-connect.
            } else if (r < 0) {
              IP_MQTT_CLIENT_APP_LOG(("IP_MQTT_CLIENT_Exec: Error: %d.", r));
              break; // Exit loop and try to re-connect.
            }
          }
        } else {
          //
          // If we lost our connection - exit the message processing loop.
          //
          break;
        }
      }
    }
    //
    // Disconnect.
    //
Disconnect:
    IP_MQTT_CLIENT_Disconnect(&_MQTTClient);
    IP_MQTT_CLIENT_APP_LOG(("APP: Disconnect done."));
    if (pSubscribe != NULL) {
      _Free(pSubscribe);
      pSubscribe = NULL;
    }
    OS_Delay(1000);
  } while (1);
}

/*********************************************************************
*
*       APP_MainTask()
*
*  Function description
*    Sample starting point.
*/
static void APP_MainTask(void) {
#if (APP_USE_SSL != 0)
  //
  // Initialize SSL.
  //
  SSL_Init();
#endif
  _MQTT_Client();
}

/*********************************************************************
*
*       MainTask()
*
*  Function description
*    Main task executed by the RTOS to create further resources and
*    run the main application.
*/
void MainTask(void) {
  IP_Init();
  _IFaceId = IP_INFO_GetNumInterfaces() - 1;                                           // Get the last registered interface ID as this is most likely the interface we want to use in this sample.
  OS_SetPriority(OS_GetTaskID(), TASK_PRIO_IP_TASK - 1);                               // For now, this task has highest prio except IP management tasks.
  OS_CREATETASK(&_IPTCB  , "IP_Task"  , IP_Task  , TASK_PRIO_IP_TASK   , _IPStack);    // Start the IP_Task.
#if USE_RX_TASK
  OS_CREATETASK(&_IPRxTCB, "IP_RxTask", IP_RxTask, TASK_PRIO_IP_RX_TASK, _IPRxStack);  // Start the IP_RxTask, optional.
#endif
  IP_AddStateChangeHook(&_StateChangeHook, _OnStateChange);                            // Register hook to be notified on disconnects.
  IP_Connect(_IFaceId);                                                                // Connect the interface if necessary.
  while (IP_IFaceIsReadyEx(_IFaceId) == 0) {
    BSP_ToggleLED(1);
    OS_Delay(50);
  }
  BSP_ClrLED(0);

  OS_CREATETASK(&APP_MainTCB, "APP_MainTask", APP_MainTask, TASK_PRIO_IP_TASK - 1 , APP_MainStack);
  OS_TASK_Terminate(NULL);
}

/*************************** End of file ****************************/
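
The User Property handling in _RecvMessageEx() decodes two length-prefixed strings in the format [U16 Len1][String1][U16 Len2][String2] and relies on the lengths reported in the packet. The following is a minimal, standalone sketch of the same decoding with explicit bounds checks. The names USER_PROP, _RdU16BE(), _ReadString() and _ParseUserProperty() are hypothetical and not part of emMQTT; _RdU16BE() merely stands in for SEGGER_RdU16BE() used by the sample.

```c
#include <assert.h>
#include <stddef.h>

typedef struct {
  const unsigned char* pKey;      // Not zero-terminated.
  unsigned             KeyLen;
  const unsigned char* pValue;    // Not zero-terminated.
  unsigned             ValueLen;
} USER_PROP;                      // Hypothetical type, for illustration only.

// Reads a big-endian 16-bit value (stand-in for SEGGER_RdU16BE()).
static unsigned _RdU16BE(const unsigned char* p) {
  return ((unsigned)p[0] << 8) | (unsigned)p[1];
}

// Reads one [U16 Len][String] item.
// Returns the number of bytes consumed or -1 if the data is truncated.
static int _ReadString(const unsigned char* pData, unsigned NumBytes, const unsigned char** ppStr, unsigned* pLen) {
  unsigned Len;

  if (NumBytes < 2u) {
    return -1;                    // Length header incomplete.
  }
  Len = _RdU16BE(pData);
  if (Len > (NumBytes - 2u)) {
    return -1;                    // String exceeds the remaining data.
  }
  *ppStr = pData + 2;
  *pLen  = Len;
  return (int)(2u + Len);
}

// Decodes [U16 Len1][String1][U16 Len2][String2].
// Returns the number of bytes consumed or -1 if the data is truncated.
static int _ParseUserProperty(const unsigned char* pData, unsigned NumBytes, USER_PROP* pProp) {
  int r1;
  int r2;

  assert((pData != NULL) && (pProp != NULL));
  r1 = _ReadString(pData, NumBytes, &pProp->pKey, &pProp->KeyLen);
  if (r1 < 0) {
    return -1;
  }
  r2 = _ReadString(pData + r1, NumBytes - (unsigned)r1, &pProp->pValue, &pProp->ValueLen);
  if (r2 < 0) {
    return -1;
  }
  return r1 + r2;
}
```

In a receive callback, the returned byte count would be used to advance the property pointer and to reduce the remaining property length. A negative result would end property parsing instead of reading past the buffer.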
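
_OnCheckTimeout() and _PUBLISH_and_PING_Task() compare 32-bit timestamps via CHECK_TIME_EXPIRED(). That macro is defined elsewhere in the sample. The sketch below only illustrates one common way such a check can be written so that it remains correct when the tick counter wraps around. _IsTimeExpired() and _CalcExpireTime() are hypothetical helpers, not emMQTT or embOS functions, and the sample's own macro may be implemented differently.

```c
#include <assert.h>
#include <stdint.h>

// Returns 1 if Now has reached or passed ExpireTime, 0 otherwise.
// The signed difference keeps the result correct across a wrap-around of the
// 32-bit counter, as long as the timeout is shorter than 2^31 ticks.
// Assumes the usual two's complement conversion from uint32_t to int32_t.
static int _IsTimeExpired(uint32_t Now, uint32_t ExpireTime) {
  return ((int32_t)(Now - ExpireTime) >= 0) ? 1 : 0;
}

// Computes an expiry timestamp. Unsigned overflow wraps by definition in C.
static uint32_t _CalcExpireTime(uint32_t Now, uint32_t Timeout) {
  assert(Timeout < 0x80000000u);  // Longer timeouts would look as if they had already expired.
  return Now + Timeout;
}
```

With a helper of this kind, a timestamp that is computed shortly before the counter overflows is still reported as expired only after the intended period has passed.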