40 #include <ti/drivers/net/wifi/simplelink.h> 41 #include <ti/net/mqtt/mqttclient.h> 42 #include <ti/net/utils/str_mpl.h> 45 #include <ti/net/atcmd/atcmd_defs.h> 46 #include <ti/net/atcmd/atcmd.h> 47 #include <ti/net/atcmd/atcmd_mqtt.h> 48 #include <ti/net/atcmd/atcmd_gen.h> 49 #include <ti/net/atcmd/atcmd_event.h> 56 #define ATCMDMQTT_CREATE_MAX_SECURE_FILES (4) 57 #define ATCMDMQTT_CREATE_MAX_URL_SERVER_NAME (430) 58 #define ATCMDMQTT_CREATE_MAX_CLIENT_ID_LEN (64) 59 #define ATCMDMQTT_CLIENT_RX_TASK_SIZE (4096) 60 #define ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN (0x7FFF) 61 #define ATCMDMQTT_SET_MAX_USERNAME_LEN (0xFFFF) 62 #define ATCMDMQTT_SET_MAX_PASSWORD_LEN (0xFFFF) 63 #define ATCMDMQTT_SUBSCRIBE_MAX_TOPICS (4) 64 #define ATCMDMQTT_MAX_NUM_OF_CLIENTS (1) 70 typedef struct _ATCmdMqtt_Set_t_
77 MQTTClient_Will mqttWill;
78 uint16_t keepAliveTimeout;
82 typedef struct _ATCmdMqtt_CB_t_
84 MQTTClient_Handle clientIndex[ATCMDMQTT_MAX_NUM_OF_CLIENTS];
85 MQTTClient_Params attrib;
91 typedef struct _ATCmdMqtt_Publish_t_
100 typedef struct _ATCmdMqtt_Subscribe_t_
102 uint8_t numberOfTopics;
103 MQTTClient_SubscribeParams *value;
104 }ATCmdMqtt_Subscribe_t;
106 typedef struct _ATCmdMqtt_Unsubscribe_t_
108 uint8_t numberOfTopics;
109 MQTTClient_UnsubscribeParams *value;
110 }ATCmdMqtt_Unsubscribe_t;
112 typedef struct _ATCmdMqtt_Event_t_
115 uint32_t metaDateLen;
124 ATCmdMqtt_CB_t ATCmdMqtt_cb = {{NULL},{0},NULL,ATCMD_DATA_FORMAT_BINARY,0};
129 int32_t ATCmdMqtt_setFree(ATCmdMqtt_Set_t *params);
134 MQTTClient_Handle ATCmdMqtt_clientIndexParse(
char **buff, uint32_t *index,
char delim)
137 if (StrMpl_getVal(buff, index ,delim, STRMPL_FLAG_PARAM_SIZE_32) < 0)
141 if (*index > ATCMDMQTT_MAX_NUM_OF_CLIENTS)
146 return ATCmdMqtt_cb.clientIndex[*index];
152 int32_t ATCmdMqtt_eventCallback(
void *args, int32_t num,
char *buff)
154 ATCmdMqtt_Event_t *
event = (ATCmdMqtt_Event_t *)(args);
155 MQTTClient_RecvMetaDataCB *header;
156 uint32_t outputLen = 0;
158 StrMpl_setStr(ATCmd_eventMqttStr,&buff,ATCMD_DELIM_EVENT);
159 StrMpl_setListStr(ATCmd_mqttEventId,
sizeof(ATCmd_mqttEventId)/
sizeof(StrMpl_List_t), &event->id,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_SIGNED);
160 switch((MQTTClient_EventCB)event->id)
162 case MQTTClient_OPERATION_CB_EVENT:
164 StrMpl_setListStr(ATCmd_mqttEventOperationId,
sizeof(ATCmd_mqttEventOperationId)/
sizeof(StrMpl_List_t), &((MQTTClient_OperationMetaDataCB *)event->metaData)->messageType,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_UNSIGNED);
165 switch (((MQTTClient_OperationMetaDataCB *)event->metaData)->messageType)
167 case MQTTCLIENT_OPERATION_CONNACK:
169 StrMpl_setVal(event->data,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_16|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
173 case MQTTCLIENT_OPERATION_EVT_PUBACK:
174 case MQTTCLIENT_OPERATION_UNSUBACK:
176 *(uint8_t *)(event->data) = 0;
177 StrMpl_setVal(event->data,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
181 case MQTTCLIENT_OPERATION_SUBACK:
183 StrMpl_setArrayVal(event->data,&buff,event->dataLen,ATCMD_DELIM_TRM,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_DEC | STRMPL_FLAG_PARAM_SIZE_8 | STRMPL_FLAG_PARAM_UNSIGNED);
192 case MQTTClient_RECV_CB_EVENT:
194 header = (MQTTClient_RecvMetaDataCB *)(event->metaData);
197 memcpy(buff,header->topic,header->topLen);
198 buff += header->topLen;
199 *buff = ATCMD_DELIM_ARG;
202 StrMpl_setListStr(ATCmd_mqttQos,
sizeof(ATCmd_mqttQos)/
sizeof(StrMpl_List_t), &header->qos,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_UNSIGNED);
204 StrMpl_setVal(&header->retain,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
206 StrMpl_setVal(&header->dup,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
208 StrMpl_setVal(&ATCmdMqtt_cb.dataFormat, &buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8 |STRMPL_FLAG_PARAM_UNSIGNED|STRMPL_FLAG_PARAM_DEC);
210 outputLen =
event->dataLen;
211 if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
214 outputLen = StrMpl_getBase64EncBufSize(event->dataLen);
216 StrMpl_setVal(&outputLen,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_DEC | STRMPL_FLAG_PARAM_SIZE_32 | STRMPL_FLAG_PARAM_SIGNED);
219 if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
222 StrMpl_encodeBase64(event->data, event->dataLen, (uint8_t *)buff, &outputLen);
226 memcpy(buff,event->data,event->dataLen);
230 case MQTTClient_DISCONNECT_CB_EVENT:
233 if (event->metaData != NULL)
235 free(event->metaData);
237 if (event->data != NULL)
249 void ATCmdMqtt_clientCallback(int32_t
id,
void *metaData, uint32_t metaDateLen,
void *data, uint32_t dataLen)
251 ATCmdMqtt_Event_t event;
253 memset(&event,0,
sizeof(ATCmdMqtt_Event_t));
257 event.metaDateLen = metaDateLen;
260 event.metaData = malloc(metaDateLen);
261 if (event.metaData == NULL)
263 ATCmd_errorResult(ATCmd_errorAllocStr,0);
266 memcpy(event.metaData,metaData,metaDateLen);
269 event.dataLen = dataLen;
272 event.data = malloc(dataLen);
273 if (event.data == NULL)
275 ATCmd_errorResult(ATCmd_errorAllocStr,0);
278 memcpy(event.data,data,dataLen);
281 ATCmdEvent_compose(&event,
sizeof(ATCmdMqtt_Event_t), ATCmdMqtt_eventCallback);
288 void *ATCmdMqtt_clientThread(
void * pvParameters)
292 MQTTClient_run((MQTTClient_Handle)pvParameters);
293 if (ATCmdMqtt_cb.delRequest == 1)
295 ATCmdMqtt_cb.delRequest = 0;
307 int32_t ATCmdMqtt_clientFree(
void)
311 if (ATCmdMqtt_cb.attrib.clientId != NULL)
313 free(ATCmdMqtt_cb.attrib.clientId);
315 if (ATCmdMqtt_cb.attrib.connParams != NULL)
317 if (ATCmdMqtt_cb.attrib.connParams->serverAddr != NULL)
319 free((
void *)ATCmdMqtt_cb.attrib.connParams->serverAddr);
321 if (ATCmdMqtt_cb.attrib.connParams->secureFiles != NULL)
323 for (i=0;i < ATCMDMQTT_CREATE_MAX_SECURE_FILES;i++)
325 if (ATCmdMqtt_cb.attrib.connParams->secureFiles[i] != NULL)
327 free(ATCmdMqtt_cb.attrib.connParams->secureFiles[i]);
330 free((
void *)ATCmdMqtt_cb.attrib.connParams->secureFiles);
332 free(ATCmdMqtt_cb.attrib.connParams);
334 if (ATCmdMqtt_cb.set != NULL)
336 ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
339 ATCmdMqtt_cb.clientIndex[0] = NULL;
347 int32_t ATCmdMqtt_createResult(
void *args, int32_t num,
char *buff)
350 StrMpl_setStr(ATCmd_mqttCreateStr,&buff,ATCMD_DELIM_EVENT);
351 StrMpl_setVal(&num,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
359 int32_t ATCmdMqtt_createParse(
char *buff, MQTTClient_Params *params)
365 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->clientId), ATCMD_DELIM_ARG, ATCMDMQTT_CREATE_MAX_CLIENT_ID_LEN ,ATCmd_excludeDelimStr)) < 0)
367 if (ret != STRMPL_ERROR_PARAM_MISSING)
374 if ((ret = StrMpl_getBitmaskListVal(ATCmd_mqttCreateFlags,
sizeof(ATCmd_mqttCreateFlags)/
sizeof(StrMpl_List_t),&buff,¶ms->connParams->netconnFlags,ATCMD_DELIM_ARG,ATCMD_DELIM_BIT, ATCmd_excludeDelimArray,STRMPL_FLAG_PARAM_SIZE_32)) < 0)
379 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->connParams->serverAddr), ATCMD_DELIM_ARG, ATCMDMQTT_CREATE_MAX_URL_SERVER_NAME ,ATCmd_excludeDelimStr)) < 0)
385 if ((ret = StrMpl_getVal(&buff, ¶ms->connParams->port , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
390 if ((params->connParams->netconnFlags & MQTTCLIENT_NETCONN_SEC) != 0)
395 if ((ret = StrMpl_getListVal(ATCmd_sockSocketSecMethod,
sizeof(ATCmd_sockSocketSecMethod)/
sizeof(StrMpl_List_t), &buff, ¶ms->connParams->method, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
397 if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
403 if ((ret = StrMpl_getBitmaskListVal(ATCmd_sockSocketCipher,
sizeof(ATCmd_sockSocketCipher)/
sizeof(StrMpl_List_t), &buff, ¶ms->connParams->cipher, ATCMD_DELIM_ARG,ATCMD_DELIM_BIT, ATCmd_excludeDelimArray,STRMPL_FLAG_PARAM_SIZE_32 )) < 0)
405 if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
410 params->connParams->cipher = SLNETSOCK_SEC_CIPHER_FULL_LIST;
413 params->connParams->nFiles = 4;
416 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->connParams->secureFiles[0]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
418 if (ret != STRMPL_ERROR_PARAM_MISSING)
425 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->connParams->secureFiles[1]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
427 if (ret != STRMPL_ERROR_PARAM_MISSING)
434 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->connParams->secureFiles[2]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
436 if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
443 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->connParams->secureFiles[3]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
445 if (ret != STRMPL_ERROR_PARAM_MISSING)
452 if ((ret = StrMpl_getListVal(ATCmd_mqttCreateMode,
sizeof(ATCmd_mqttCreateMode)/
sizeof(StrMpl_List_t), &buff, ¶ms->mqttMode31, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
457 if ((ret = StrMpl_getVal(&buff, ¶ms->blockingSend , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
462 if ((ret = StrMpl_getVal(&buff, &ATCmdMqtt_cb.dataFormat, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
477 pthread_attr_t pAttrs;
478 struct sched_param priParam;
479 pthread_t rxThread = (pthread_t) NULL;
480 MQTTClient_Handle handle;
483 if (ATCmdMqtt_cb.clientIndex[index] != NULL)
485 ATCmd_errorResult(ATCmd_errorCmdStr,-1);
488 memset(&ATCmdMqtt_cb.attrib, 0x0,
sizeof(MQTTClient_Params));
490 ATCmdMqtt_cb.attrib.connParams = malloc(
sizeof(MQTTClient_ConnParams));
491 if (ATCmdMqtt_cb.attrib.connParams == NULL)
493 ATCmd_errorResult(ATCmd_errorAllocStr,0);
496 memset(ATCmdMqtt_cb.attrib.connParams,0,
sizeof(MQTTClient_ConnParams));
497 ATCmdMqtt_cb.attrib.connParams->secureFiles = malloc(
sizeof(
char *) * ATCMDMQTT_CREATE_MAX_SECURE_FILES);
498 if (ATCmdMqtt_cb.attrib.connParams->secureFiles == NULL)
500 ATCmd_errorResult(ATCmd_errorAllocStr,0);
503 memset((
void *)ATCmdMqtt_cb.attrib.connParams->secureFiles,0,
sizeof(
char *) * ATCMDMQTT_CREATE_MAX_SECURE_FILES);
506 ret = ATCmdMqtt_createParse((
char *)arg,&ATCmdMqtt_cb.attrib);
510 ATCmd_errorResult(ATCmd_errorParseStr,ret);
515 handle = MQTTClient_create(ATCmdMqtt_clientCallback, &ATCmdMqtt_cb.attrib);
518 ATCmdMqtt_clientFree();
519 ATCmd_errorResult(ATCmd_errorCmdStr,-1);
525 pthread_attr_init(&pAttrs);
526 priParam.sched_priority = 2;
527 ret = pthread_attr_setschedparam(&pAttrs, &priParam);
528 ret |= pthread_attr_setstacksize(&pAttrs, ATCMDMQTT_CLIENT_RX_TASK_SIZE);
529 ret |= pthread_attr_setdetachstate(&pAttrs, PTHREAD_CREATE_DETACHED);
530 ret |= pthread_create(&rxThread, &pAttrs, ATCmdMqtt_clientThread, NULL);
533 ATCmdMqtt_clientFree();
537 ATCmdMqtt_cb.clientIndex[index] = handle;
538 ATCmd_commandResult(ATCmdMqtt_createResult,NULL,index);
550 MQTTClient_Handle handle;
552 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_TRM);
555 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
559 ATCmdMqtt_cb.delRequest = 1;
560 ret = MQTTClient_delete(handle);
561 ATCmdMqtt_clientFree();
565 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
581 MQTTClient_Handle handle;
583 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_TRM);
586 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
589 ret = MQTTClient_connect(handle);
593 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
610 MQTTClient_Handle handle;
613 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_TRM);
616 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
620 ret = MQTTClient_disconnect(handle);
624 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
637 int32_t ATCmdMqtt_publishFree(ATCmdMqtt_Publish_t *params)
639 if (params->topic != NULL)
644 if (params->msg != NULL)
656 int32_t ATCmdMqtt_publishParse(
char *buff, ATCmdMqtt_Publish_t *params)
659 uint16_t outputLen = 0;
663 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
668 params->topicLen = strlen(params->topic);
671 if ((ret = StrMpl_getListVal(ATCmd_mqttQos,
sizeof(ATCmd_mqttQos)/
sizeof(StrMpl_List_t), &buff, ¶ms->flags, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
676 if ((ret = StrMpl_getVal(&buff, &retain , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
682 params->flags |= MQTT_PUBLISH_RETAIN;
686 if ((ret = StrMpl_getVal(&buff, ¶ms->msgLen , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
692 if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
695 outputLen = StrMpl_getBase64DecBufSize((uint8_t *)buff,params->msgLen);
696 params->msg = malloc(outputLen);
697 if (params->msg == NULL)
701 if (StrMpl_decodeBase64((uint8_t *)buff, params->msgLen, (uint8_t *)params->msg, (uint32_t *)&outputLen) < 0)
705 params->msgLen = outputLen;
709 params->msg = malloc(params->msgLen);
710 if (params->msg == NULL)
715 memcpy(params->msg, buff, params->msgLen);
728 ATCmdMqtt_Publish_t params;
730 MQTTClient_Handle handle;
733 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_ARG);
736 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
740 memset(¶ms, 0x0,
sizeof(ATCmdMqtt_Publish_t));
743 ret = ATCmdMqtt_publishParse((
char *)arg,¶ms);
747 ATCmd_errorResult(ATCmd_errorParseStr,ret);
748 ATCmdMqtt_publishFree(¶ms);
751 ret = MQTTClient_publish(handle,params.topic,params.topicLen,params.msg,params.msgLen,params.flags);
755 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
762 ATCmdMqtt_publishFree(¶ms);
770 int32_t ATCmdMqtt_subscribeFree(ATCmdMqtt_Subscribe_t *params)
774 if (params->value != NULL)
776 for (i=0;i<params->numberOfTopics;i++)
778 if (params->value[i].topic != NULL)
780 free(params->value[i].topic);
793 int32_t ATCmdMqtt_subscribeParse(
char *buff, ATCmdMqtt_Subscribe_t *params)
797 char delim = ATCMD_DELIM_ARG;
800 if ((ret = StrMpl_getVal(&buff, ¶ms->numberOfTopics, ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
804 if (params->numberOfTopics > ATCMDMQTT_SUBSCRIBE_MAX_TOPICS)
806 return STRMPL_ERROR_WRONG_PARAM;
808 params->value = malloc(params->numberOfTopics *
sizeof(MQTTClient_SubscribeParams));
809 if (params->value == NULL)
813 memset(params->value,0,params->numberOfTopics *
sizeof(MQTTClient_SubscribeParams));
814 for (i=0;i<params->numberOfTopics;i++)
817 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->value[i].topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
823 if ((ret = StrMpl_getListVal(ATCmd_mqttQos,
sizeof(ATCmd_mqttQos)/
sizeof(StrMpl_List_t), &buff, ¶ms->value[i].qos, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
828 if (i == (params->numberOfTopics -1))
830 delim = ATCMD_DELIM_TRM;
833 if ((ret = StrMpl_getVal(&buff, ¶ms->value[i].persistent,delim,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
835 if (ret != STRMPL_ERROR_PARAM_MISSING)
841 params->value[i].callback = NULL;
853 ATCmdMqtt_Subscribe_t params;
855 MQTTClient_Handle handle;
857 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_ARG);
860 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
864 memset(¶ms, 0x0,
sizeof(ATCmdMqtt_Subscribe_t));
867 ret = ATCmdMqtt_subscribeParse((
char *)arg,¶ms);
871 ATCmd_errorResult(ATCmd_errorParseStr,ret);
872 ATCmdMqtt_subscribeFree(¶ms);
875 ret = MQTTClient_subscribe(handle,params.value,params.numberOfTopics);
879 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
886 ATCmdMqtt_subscribeFree(¶ms);
894 int32_t ATCmdMqtt_unsubscribeFree(ATCmdMqtt_Unsubscribe_t *params)
898 if (params->value != NULL)
900 for (i=0;i<params->numberOfTopics;i++)
902 if (params->value[i].topic != NULL)
904 free(params->value[i].topic);
917 int32_t ATCmdMqtt_unsubscribeParse(
char *buff, ATCmdMqtt_Unsubscribe_t *params)
921 char delim = ATCMD_DELIM_ARG;
924 if ((ret = StrMpl_getVal(&buff, ¶ms->numberOfTopics, ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
928 if (params->numberOfTopics > ATCMDMQTT_SUBSCRIBE_MAX_TOPICS)
930 return STRMPL_ERROR_WRONG_PARAM;
933 params->value = malloc(params->numberOfTopics *
sizeof(MQTTClient_UnsubscribeParams));
934 if (params->value == NULL)
938 memset(params->value,0,params->numberOfTopics *
sizeof(MQTTClient_UnsubscribeParams));
939 for (i=0;i<params->numberOfTopics;i++)
942 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->value[i].topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
947 if (i == (params->numberOfTopics -1))
949 delim = ATCMD_DELIM_TRM;
952 if ((ret = StrMpl_getVal(&buff, ¶ms->value[i].persistent,delim,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
954 if (ret != STRMPL_ERROR_PARAM_MISSING)
971 ATCmdMqtt_Unsubscribe_t params;
973 MQTTClient_Handle handle;
975 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_ARG);
978 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
982 memset(¶ms, 0x0,
sizeof(ATCmdMqtt_Unsubscribe_t));
985 ret = ATCmdMqtt_unsubscribeParse((
char *)arg,¶ms);
989 ATCmd_errorResult(ATCmd_errorParseStr,ret);
990 ATCmdMqtt_unsubscribeFree(¶ms);
993 ret = MQTTClient_unsubscribe(handle,params.value,params.numberOfTopics);
997 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
1004 ATCmdMqtt_unsubscribeFree(¶ms);
1012 int32_t ATCmdMqtt_setFree(ATCmdMqtt_Set_t *params)
1016 if (params->usrName != NULL)
1018 free(params->usrName);
1020 if (params->usrPwd != NULL)
1022 free(params->usrPwd);
1024 if (params->mqttWill.willTopic != NULL)
1026 free((
void *)params->mqttWill.willTopic);
1028 if (params->mqttWill.willMsg != NULL)
1030 free((
void *)params->mqttWill.willMsg);
1041 int32_t ATCmdMqtt_setParse(
char *buff, ATCmdMqtt_Set_t *params)
1044 uint32_t outputLen = 0;
1047 if ((ret = StrMpl_getListVal(ATCmd_mqttSetOptions,
sizeof(ATCmd_mqttSetOptions)/
sizeof(StrMpl_List_t), &buff, ¶ms->option, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_16 )) < 0)
1053 switch (params->option)
1055 case MQTTClient_USER_NAME:
1056 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->usrName), ATCMD_DELIM_TRM, ATCMDMQTT_SET_MAX_USERNAME_LEN ,ATCmd_excludeDelimStr)) < 0)
1060 params->value = (
void *)(params->usrName);
1062 case MQTTClient_PASSWORD:
1063 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->usrPwd), ATCMD_DELIM_TRM, ATCMDMQTT_SET_MAX_PASSWORD_LEN ,ATCmd_excludeDelimStr)) < 0)
1067 params->value = (
void *)(params->usrPwd);
1069 case MQTTClient_WILL_PARAM:
1071 if ((ret = StrMpl_getAllocStr(&buff, (
char **)&(params->mqttWill.willTopic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
1076 if ((ret = StrMpl_getListVal(ATCmd_mqttQos,
sizeof(ATCmd_mqttQos)/
sizeof(StrMpl_List_t), &buff, ¶ms->mqttWill.willQos, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1081 if ((ret = StrMpl_getVal(&buff, ¶ms->mqttWill.retain , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1087 if ((ret = StrMpl_getVal(&buff, ¶ms->willMsgLen , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
1093 if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
1096 outputLen = StrMpl_getBase64DecBufSize((uint8_t *)buff,params->willMsgLen);
1097 params->mqttWill.willMsg = malloc(outputLen);
1098 if (params->mqttWill.willMsg == NULL)
1102 if (StrMpl_decodeBase64((uint8_t *)buff, params->willMsgLen, (uint8_t *)params->mqttWill.willMsg, (uint32_t *)&outputLen) < 0)
1109 params->mqttWill.willMsg = malloc(params->willMsgLen);
1110 if (params->mqttWill.willMsg == NULL)
1114 memcpy((
void *)params->mqttWill.willMsg, buff, params->willMsgLen);
1116 params->value = (
void *)&(params->mqttWill);
1118 case MQTTClient_KEEPALIVE_TIME:
1120 if ((ret = StrMpl_getVal(&buff, ¶ms->keepAliveTimeout, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
1124 params->value = (
void *)&(params->keepAliveTimeout);
1126 case MQTTClient_CLEAN_CONNECT:
1128 if ((ret = StrMpl_getVal(&buff, ¶ms->clean, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1132 params->value = (
void *)&(params->clean);
1146 MQTTClient_Handle handle;
1148 handle = ATCmdMqtt_clientIndexParse((
char **)&arg,&index,ATCMD_DELIM_ARG);
1151 ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
1155 ATCmdMqtt_cb.set = malloc(
sizeof(ATCmdMqtt_Set_t));
1156 if (ATCmdMqtt_cb.set == NULL)
1158 ATCmd_errorResult(ATCmd_errorAllocStr,0);
1161 memset(ATCmdMqtt_cb.set, 0x0,
sizeof(ATCmdMqtt_Set_t));
1164 ret = ATCmdMqtt_setParse((
char *)arg, ATCmdMqtt_cb.set);
1168 ATCmd_errorResult(ATCmd_errorParseStr,ret);
1169 ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
1173 ret = MQTTClient_set(handle,ATCmdMqtt_cb.set->option,ATCmdMqtt_cb.set->value,0);
1177 ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
1178 ATCmd_errorResult(ATCmd_errorCmdStr,ret);
int32_t ATCmdMqtt_publishCallback(void *arg)
Mqtt Client Publish callback.
int32_t ATCmdMqtt_unsubscribeCallback(void *arg)
Mqtt Client Unsubscribe callback.
int32_t ATCmdMqtt_setCallback(void *arg)
Mqtt Client Set callback.
int32_t ATCmdMqtt_disconnectCallback(void *arg)
Mqtt Client Disconnect callback.
int32_t ATCmdMqtt_deleteCallback(void *arg)
Mqtt Client delete callback.
int32_t ATCmdMqtt_subscribeCallback(void *arg)
Mqtt Client Subscribe callback.
int32_t ATCmdMqtt_createCallback(void *arg)
Mqtt Client Create callback.
int32_t ATCmdMqtt_connectCallback(void *arg)
Mqtt Client Connect callback.