SimpleLink CC32xx ATCommands Library
Simplifies the implementation of Internet connectivity
atcmd_mqtt.c
1 /*
2  * Copyright (c) 2016, Texas Instruments Incorporated
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  *
9  * * Redistributions of source code must retain the above copyright
10  * notice, this list of conditions and the following disclaimer.
11  *
12  * * Redistributions in binary form must reproduce the above copyright
13  * notice, this list of conditions and the following disclaimer in the
14  * documentation and/or other materials provided with the distribution.
15  *
16  * * Neither the name of Texas Instruments Incorporated nor the names of
17  * its contributors may be used to endorse or promote products derived
18  * from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
22  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
23  * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
24  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
25  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
26  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
27  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
28  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
29  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
30  * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 //*****************************************************************************
34 // includes
35 //*****************************************************************************
36 #include <stdlib.h>
37 #include <stdio.h>
38 
39 /* TI-DRIVERS Header files */
40 #include <ti/drivers/net/wifi/simplelink.h>
41 #include <ti/net/mqtt/mqttclient.h>
42 #include <ti/net/utils/str_mpl.h>
43 
44 /* AT Header files */
45 #include <ti/net/atcmd/atcmd_defs.h>
46 #include <ti/net/atcmd/atcmd.h>
47 #include <ti/net/atcmd/atcmd_mqtt.h>
48 #include <ti/net/atcmd/atcmd_gen.h>
49 #include <ti/net/atcmd/atcmd_event.h>
50 
51 #include <pthread.h>
52 
53 //*****************************************************************************
54 // defines
55 //*****************************************************************************
56 #define ATCMDMQTT_CREATE_MAX_SECURE_FILES (4)
57 #define ATCMDMQTT_CREATE_MAX_URL_SERVER_NAME (430)
58 #define ATCMDMQTT_CREATE_MAX_CLIENT_ID_LEN (64)
59 #define ATCMDMQTT_CLIENT_RX_TASK_SIZE (4096)
60 #define ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN (0x7FFF)
61 #define ATCMDMQTT_SET_MAX_USERNAME_LEN (0xFFFF)
62 #define ATCMDMQTT_SET_MAX_PASSWORD_LEN (0xFFFF)
63 #define ATCMDMQTT_SUBSCRIBE_MAX_TOPICS (4)
64 #define ATCMDMQTT_MAX_NUM_OF_CLIENTS (1)
65 
66 
67 //*****************************************************************************
68 // typedefs
69 //*****************************************************************************
70 typedef struct _ATCmdMqtt_Set_t_
71 {
72  uint16_t option;
73  void *value;
74  char *usrName;
75  char *usrPwd;
76  uint16_t willMsgLen;
77  MQTTClient_Will mqttWill;
78  uint16_t keepAliveTimeout;
79  uint8_t clean;
80 }ATCmdMqtt_Set_t;
81 
82 typedef struct _ATCmdMqtt_CB_t_
83 {
84  MQTTClient_Handle clientIndex[ATCMDMQTT_MAX_NUM_OF_CLIENTS];
85  MQTTClient_Params attrib;
86  ATCmdMqtt_Set_t *set;
87  uint8_t dataFormat;
88  uint8_t delRequest;
89 } ATCmdMqtt_CB_t;
90 
91 typedef struct _ATCmdMqtt_Publish_t_
92 {
93  char *topic;
94  uint16_t topicLen;
95  uint32_t flags;
96  uint16_t msgLen;
97  char *msg;
98 }ATCmdMqtt_Publish_t;
99 
100 typedef struct _ATCmdMqtt_Subscribe_t_
101 {
102  uint8_t numberOfTopics;
103  MQTTClient_SubscribeParams *value;
104 }ATCmdMqtt_Subscribe_t;
105 
106 typedef struct _ATCmdMqtt_Unsubscribe_t_
107 {
108  uint8_t numberOfTopics;
109  MQTTClient_UnsubscribeParams *value;
110 }ATCmdMqtt_Unsubscribe_t;
111 
112 typedef struct _ATCmdMqtt_Event_t_
113 {
114  int32_t id;
115  uint32_t metaDateLen;
116  void * metaData;
117  uint32_t dataLen;
118  void *data;
119 }ATCmdMqtt_Event_t;
120 
121 //*****************************************************************************
122 // global variables
123 //*****************************************************************************
124 ATCmdMqtt_CB_t ATCmdMqtt_cb = {{NULL},{0},NULL,ATCMD_DATA_FORMAT_BINARY,0};
125 
126 //*****************************************************************************
127 // MQTT AT Commands Routines
128 //*****************************************************************************
129 int32_t ATCmdMqtt_setFree(ATCmdMqtt_Set_t *params);
130 
131 /*
132  Parse client index and get the corresponding handle
133 */
134 MQTTClient_Handle ATCmdMqtt_clientIndexParse(char **buff, uint32_t *index, char delim)
135 {
136  /* Mode */
137  if (StrMpl_getVal(buff, index ,delim, STRMPL_FLAG_PARAM_SIZE_32) < 0)
138  {
139  return NULL;
140  }
141  if (*index > ATCMDMQTT_MAX_NUM_OF_CLIENTS)
142  {
143  return NULL;
144  }
145  /* return client handle */
146  return ATCmdMqtt_cb.clientIndex[*index];
147 }
148 
149 /*
150  MQTT event callback which compose and send the received event to the host
151 */
152 int32_t ATCmdMqtt_eventCallback(void *args, int32_t num, char *buff)
153 {
154  ATCmdMqtt_Event_t *event = (ATCmdMqtt_Event_t *)(args);
155  MQTTClient_RecvMetaDataCB *header;
156  uint32_t outputLen = 0;
157 
158  StrMpl_setStr(ATCmd_eventMqttStr,&buff,ATCMD_DELIM_EVENT);
159  StrMpl_setListStr(ATCmd_mqttEventId, sizeof(ATCmd_mqttEventId)/sizeof(StrMpl_List_t), &event->id,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_SIGNED);
160  switch((MQTTClient_EventCB)event->id)
161  {
162  case MQTTClient_OPERATION_CB_EVENT:
163  {
164  StrMpl_setListStr(ATCmd_mqttEventOperationId, sizeof(ATCmd_mqttEventOperationId)/sizeof(StrMpl_List_t), &((MQTTClient_OperationMetaDataCB *)event->metaData)->messageType,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_UNSIGNED);
165  switch (((MQTTClient_OperationMetaDataCB *)event->metaData)->messageType)
166  {
167  case MQTTCLIENT_OPERATION_CONNACK:
168  {
169  StrMpl_setVal(event->data,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_16|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
170  break;
171  }
172 
173  case MQTTCLIENT_OPERATION_EVT_PUBACK:
174  case MQTTCLIENT_OPERATION_UNSUBACK:
175  {
176  *(uint8_t *)(event->data) = 0;
177  StrMpl_setVal(event->data,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
178  break;
179  }
180 
181  case MQTTCLIENT_OPERATION_SUBACK:
182  {
183  StrMpl_setArrayVal(event->data,&buff,event->dataLen,ATCMD_DELIM_TRM,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_DEC | STRMPL_FLAG_PARAM_SIZE_8 | STRMPL_FLAG_PARAM_UNSIGNED);
184  break;
185  }
186 
187  default:
188  break;
189  }
190  break;
191  }
192  case MQTTClient_RECV_CB_EVENT:
193  {
194  header = (MQTTClient_RecvMetaDataCB *)(event->metaData);
195  /* topic */
196  //topic should be string but it been send from mqtt client without '/0' so we can not read it as string
197  memcpy(buff,header->topic,header->topLen);
198  buff += header->topLen;
199  *buff = ATCMD_DELIM_ARG;
200  buff++;
201  /* qos */
202  StrMpl_setListStr(ATCmd_mqttQos, sizeof(ATCmd_mqttQos)/sizeof(StrMpl_List_t), &header->qos,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_UNSIGNED);
203  /* retain */
204  StrMpl_setVal(&header->retain,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
205  /* duplicate */
206  StrMpl_setVal(&header->dup,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
207  /* format */
208  StrMpl_setVal(&ATCmdMqtt_cb.dataFormat, &buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8 |STRMPL_FLAG_PARAM_UNSIGNED|STRMPL_FLAG_PARAM_DEC);
209  /* data length */
210  outputLen = event->dataLen;
211  if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
212  {
213  /* convert length from binary to base64 */
214  outputLen = StrMpl_getBase64EncBufSize(event->dataLen);
215  }
216  StrMpl_setVal(&outputLen,&buff,ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_DEC | STRMPL_FLAG_PARAM_SIZE_32 | STRMPL_FLAG_PARAM_SIGNED);
217 
218  /* data */
219  if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
220  {
221  /* convert data to base64 */
222  StrMpl_encodeBase64(event->data, event->dataLen, (uint8_t *)buff, &outputLen);
223  }
224  else
225  {
226  memcpy(buff,event->data,event->dataLen);
227  }
228  break;
229  }
230  case MQTTClient_DISCONNECT_CB_EVENT:
231  break;
232  }
233  if (event->metaData != NULL)
234  {
235  free(event->metaData);
236  }
237  if (event->data != NULL)
238  {
239  free(event->data);
240  }
241  free(event);
242 
243  return 0;
244 }
245 
249 void ATCmdMqtt_clientCallback(int32_t id, void *metaData, uint32_t metaDateLen, void *data, uint32_t dataLen)
250 {
251  ATCmdMqtt_Event_t event;
252 
253  memset(&event,0,sizeof(ATCmdMqtt_Event_t));
254  event.id = id;
255 
256  /* meta data */
257  event.metaDateLen = metaDateLen;
258  if (metaDateLen > 0)
259  {
260  event.metaData = malloc(metaDateLen);
261  if (event.metaData == NULL)
262  {
263  ATCmd_errorResult(ATCmd_errorAllocStr,0);
264  return;
265  }
266  memcpy(event.metaData,metaData,metaDateLen);
267  }
268  /* data */
269  event.dataLen = dataLen;
270  if (dataLen > 0)
271  {
272  event.data = malloc(dataLen);
273  if (event.data == NULL)
274  {
275  ATCmd_errorResult(ATCmd_errorAllocStr,0);
276  return;
277  }
278  memcpy(event.data,data,dataLen);
279  }
280  /* send the event */
281  ATCmdEvent_compose(&event, sizeof(ATCmdMqtt_Event_t), ATCmdMqtt_eventCallback);
282 }
283 
284 
285 /*
286  MQTT client thread
287 */
288 void *ATCmdMqtt_clientThread(void * pvParameters)
289 {
290  while (1)
291  {
292  MQTTClient_run((MQTTClient_Handle)pvParameters);
293  if (ATCmdMqtt_cb.delRequest == 1)
294  {
295  ATCmdMqtt_cb.delRequest = 0;
296  break;
297  }
298  }
299  pthread_exit(0);
300 
301  return NULL;
302 }
303 
304 /*
305  Free allocated memory for MQTT client instant
306 */
307 int32_t ATCmdMqtt_clientFree(void)
308 {
309  uint8_t i;
310 
311  if (ATCmdMqtt_cb.attrib.clientId != NULL)
312  {
313  free(ATCmdMqtt_cb.attrib.clientId);
314  }
315  if (ATCmdMqtt_cb.attrib.connParams != NULL)
316  {
317  if (ATCmdMqtt_cb.attrib.connParams->serverAddr != NULL)
318  {
319  free((void *)ATCmdMqtt_cb.attrib.connParams->serverAddr);
320  }
321  if (ATCmdMqtt_cb.attrib.connParams->secureFiles != NULL)
322  {
323  for (i=0;i < ATCMDMQTT_CREATE_MAX_SECURE_FILES;i++)
324  {
325  if (ATCmdMqtt_cb.attrib.connParams->secureFiles[i] != NULL)
326  {
327  free(ATCmdMqtt_cb.attrib.connParams->secureFiles[i]);
328  }
329  }
330  free((void *)ATCmdMqtt_cb.attrib.connParams->secureFiles);
331  }
332  free(ATCmdMqtt_cb.attrib.connParams);
333  }
334  if (ATCmdMqtt_cb.set != NULL)
335  {
336  ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
337  }
338  /* delete client index */
339  ATCmdMqtt_cb.clientIndex[0] = NULL;
340 
341  return 0;
342 }
343 
344 /*
345  Compose and send MQTT create client result.
346 */
347 int32_t ATCmdMqtt_createResult(void *args, int32_t num, char *buff)
348 {
349 
350  StrMpl_setStr(ATCmd_mqttCreateStr,&buff,ATCMD_DELIM_EVENT);
351  StrMpl_setVal(&num,&buff,ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_32|STRMPL_FLAG_PARAM_DEC|STRMPL_FLAG_PARAM_UNSIGNED);
352 
353  return 0;
354 }
355 
356 /*
357  Parse MQTT client create command.
358 */
359 int32_t ATCmdMqtt_createParse(char *buff, MQTTClient_Params *params)
360 {
361  int32_t ret = 0;
362  uint8_t secure = 0;
363 
364  /* client ID */
365  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->clientId), ATCMD_DELIM_ARG, ATCMDMQTT_CREATE_MAX_CLIENT_ID_LEN ,ATCmd_excludeDelimStr)) < 0)
366  {
367  if (ret != STRMPL_ERROR_PARAM_MISSING)
368  {
369  return ret;
370  }
371  }
372 
373  /* flags */
374  if ((ret = StrMpl_getBitmaskListVal(ATCmd_mqttCreateFlags, sizeof(ATCmd_mqttCreateFlags)/sizeof(StrMpl_List_t),&buff,&params->connParams->netconnFlags,ATCMD_DELIM_ARG,ATCMD_DELIM_BIT, ATCmd_excludeDelimArray,STRMPL_FLAG_PARAM_SIZE_32)) < 0)
375  {
376  return ret;
377  }
378  /* server address (ip or url) */
379  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->connParams->serverAddr), ATCMD_DELIM_ARG, ATCMDMQTT_CREATE_MAX_URL_SERVER_NAME ,ATCmd_excludeDelimStr)) < 0)
380  {
381  return ret;
382  }
383 
384  /* port */
385  if ((ret = StrMpl_getVal(&buff, &params->connParams->port , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
386  {
387  return ret;
388  }
389 
390  if ((params->connParams->netconnFlags & MQTTCLIENT_NETCONN_SEC) != 0)
391  {
392  secure = 1;
393  }
394  /* method */
395  if ((ret = StrMpl_getListVal(ATCmd_sockSocketSecMethod, sizeof(ATCmd_sockSocketSecMethod)/sizeof(StrMpl_List_t), &buff, &params->connParams->method, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
396  {
397  if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
398  {
399  return ret;
400  }
401  }
402  /* cipher */
403  if ((ret = StrMpl_getBitmaskListVal(ATCmd_sockSocketCipher, sizeof(ATCmd_sockSocketCipher)/sizeof(StrMpl_List_t), &buff, &params->connParams->cipher, ATCMD_DELIM_ARG,ATCMD_DELIM_BIT, ATCmd_excludeDelimArray,STRMPL_FLAG_PARAM_SIZE_32 )) < 0)
404  {
405  if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
406  {
407  return ret;
408  }
409  /* set to the default value */
410  params->connParams->cipher = SLNETSOCK_SEC_CIPHER_FULL_LIST;
411  }
412  /* set the number of files to fix value */
413  params->connParams->nFiles = 4;
414 
415  /* Private Key File */
416  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->connParams->secureFiles[0]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
417  {
418  if (ret != STRMPL_ERROR_PARAM_MISSING)
419  {
420  return ret;
421  }
422  }
423 
424  /* Certificate File Name */
425  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->connParams->secureFiles[1]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
426  {
427  if (ret != STRMPL_ERROR_PARAM_MISSING)
428  {
429  return ret;
430  }
431  }
432 
433  /* CA File Name */
434  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->connParams->secureFiles[2]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
435  {
436  if (!((ret == STRMPL_ERROR_PARAM_MISSING) && (secure == 0)))
437  {
438  return ret;
439  }
440  }
441 
442  /* DH Key File Name */
443  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->connParams->secureFiles[3]), ATCMD_DELIM_ARG, 0xFF ,ATCmd_excludeDelimStr)) < 0)
444  {
445  if (ret != STRMPL_ERROR_PARAM_MISSING)
446  {
447  return ret;
448  }
449  }
450 
451  /* mqtt mode */
452  if ((ret = StrMpl_getListVal(ATCmd_mqttCreateMode, sizeof(ATCmd_mqttCreateMode)/sizeof(StrMpl_List_t), &buff, &params->mqttMode31, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
453  {
454  return ret;
455  }
456  /* blocking Send */
457  if ((ret = StrMpl_getVal(&buff, &params->blockingSend , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
458  {
459  return ret;
460  }
461  /* data format */
462  if ((ret = StrMpl_getVal(&buff, &ATCmdMqtt_cb.dataFormat, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
463  {
464  return ret;
465  }
466 
467  return ret;
468 }
469 
470 
471 /*
472  MQTT client create callback.
473 */
474 int32_t ATCmdMqtt_createCallback(void *arg)
475 {
476  int32_t ret = 0;
477  pthread_attr_t pAttrs;
478  struct sched_param priParam;
479  pthread_t rxThread = (pthread_t) NULL;
480  MQTTClient_Handle handle;
481  uint8_t index = 0;
482 
483  if (ATCmdMqtt_cb.clientIndex[index] != NULL)
484  {
485  ATCmd_errorResult(ATCmd_errorCmdStr,-1);
486  return -1;
487  }
488  memset(&ATCmdMqtt_cb.attrib, 0x0, sizeof(MQTTClient_Params));
489 
490  ATCmdMqtt_cb.attrib.connParams = malloc(sizeof(MQTTClient_ConnParams));
491  if (ATCmdMqtt_cb.attrib.connParams == NULL)
492  {
493  ATCmd_errorResult(ATCmd_errorAllocStr,0);
494  return -1;
495  }
496  memset(ATCmdMqtt_cb.attrib.connParams,0,sizeof(MQTTClient_ConnParams));
497  ATCmdMqtt_cb.attrib.connParams->secureFiles = malloc(sizeof(char *) * ATCMDMQTT_CREATE_MAX_SECURE_FILES);
498  if (ATCmdMqtt_cb.attrib.connParams->secureFiles == NULL)
499  {
500  ATCmd_errorResult(ATCmd_errorAllocStr,0);
501  return -1;
502  }
503  memset((void *)ATCmdMqtt_cb.attrib.connParams->secureFiles,0,sizeof(char *) * ATCMDMQTT_CREATE_MAX_SECURE_FILES);
504 
505  /* Call the command parser */
506  ret = ATCmdMqtt_createParse((char *)arg,&ATCmdMqtt_cb.attrib);
507 
508  if(ret < 0)
509  {
510  ATCmd_errorResult(ATCmd_errorParseStr,ret);
511  return -1;
512  }
513 
514  /* create mqtt instant */
515  handle = MQTTClient_create(ATCmdMqtt_clientCallback, &ATCmdMqtt_cb.attrib);
516  if(handle == NULL)
517  {
518  ATCmdMqtt_clientFree();
519  ATCmd_errorResult(ATCmd_errorCmdStr,-1);
520  return -1;
521  }
522 
523  /* Open Client Receive Thread start the receive task. Set priority and */
524  /* stack size attributes */
525  pthread_attr_init(&pAttrs);
526  priParam.sched_priority = 2;
527  ret = pthread_attr_setschedparam(&pAttrs, &priParam);
528  ret |= pthread_attr_setstacksize(&pAttrs, ATCMDMQTT_CLIENT_RX_TASK_SIZE);
529  ret |= pthread_attr_setdetachstate(&pAttrs, PTHREAD_CREATE_DETACHED);
530  ret |= pthread_create(&rxThread, &pAttrs, ATCmdMqtt_clientThread, NULL);
531  if (ret != 0)
532  {
533  ATCmdMqtt_clientFree();
534  return -1;
535  }
536  /* insert the handle to list */
537  ATCmdMqtt_cb.clientIndex[index] = handle;
538  ATCmd_commandResult(ATCmdMqtt_createResult,NULL,index);
539  ATCmd_okResult();
540  return 0;
541 }
542 
543 /*
544  MQTT client callback.
545 */
546 int32_t ATCmdMqtt_deleteCallback(void *arg)
547 {
548  int32_t ret = 0;
549  uint32_t index;
550  MQTTClient_Handle handle;
551 
552  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_TRM);
553  if (handle == NULL)
554  {
555  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
556  return -1;
557  }
558 
559  ATCmdMqtt_cb.delRequest = 1;
560  ret = MQTTClient_delete(handle);
561  ATCmdMqtt_clientFree();
562 
563  if (ret < 0)
564  {
565  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
566  }
567  else
568  {
569  ATCmd_okResult();
570  }
571  return ret;
572 }
573 
574 /*
575  MQTT client connect callback.
576 */
577 int32_t ATCmdMqtt_connectCallback(void *arg)
578 {
579  int32_t ret = 0;
580  uint32_t index;
581  MQTTClient_Handle handle;
582 
583  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_TRM);
584  if (handle == NULL)
585  {
586  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
587  return -1;
588  }
589  ret = MQTTClient_connect(handle);
590 
591  if (ret < 0)
592  {
593  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
594  }
595  else
596  {
597  ATCmd_okResult();
598  }
599 
600  return ret;
601 }
602 
603 /*
604  MQTT client disconnect callback.
605 */
607 {
608  int32_t ret = 0;
609  uint32_t index;
610  MQTTClient_Handle handle;
611 
612  /* parse the client index */
613  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_TRM);
614  if (handle == NULL)
615  {
616  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
617  return -1;
618  }
619  /* mqtt client disconnect */
620  ret = MQTTClient_disconnect(handle);
621 
622  if (ret < 0)
623  {
624  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
625  }
626  else
627  {
628  ATCmd_okResult();
629  }
630 
631  return ret;
632 }
633 
634 /*
635  Free allocated memory for MQTT client publish command
636 */
637 int32_t ATCmdMqtt_publishFree(ATCmdMqtt_Publish_t *params)
638 {
639  if (params->topic != NULL)
640  {
641  free(params->topic);
642  }
643 
644  if (params->msg != NULL)
645  {
646  free(params->msg);
647  }
648 
649  return 0;
650 }
651 
652 
653 /*
654  Parse MQTT client publish command.
655 */
656 int32_t ATCmdMqtt_publishParse(char *buff, ATCmdMqtt_Publish_t *params)
657 {
658  int32_t ret = 0;
659  uint16_t outputLen = 0;
660  uint8_t retain;
661 
662  /* topic */
663  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
664  {
665  return ret;
666  }
667  /* topic length */
668  params->topicLen = strlen(params->topic);
669 
670  /* qos */
671  if ((ret = StrMpl_getListVal(ATCmd_mqttQos, sizeof(ATCmd_mqttQos)/sizeof(StrMpl_List_t), &buff, &params->flags, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
672  {
673  return ret;
674  }
675  /* retain */
676  if ((ret = StrMpl_getVal(&buff, &retain , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
677  {
678  return ret;
679  }
680  if (retain != 0)
681  {
682  params->flags |= MQTT_PUBLISH_RETAIN;
683  }
684 
685  /* message length */
686  if ((ret = StrMpl_getVal(&buff, &params->msgLen , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
687  {
688  return ret;
689  }
690 
691  /* message */
692  if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
693  {
694  /* convert length to binary length */
695  outputLen = StrMpl_getBase64DecBufSize((uint8_t *)buff,params->msgLen);
696  params->msg = malloc(outputLen);
697  if (params->msg == NULL)
698  {
699  return -1;
700  }
701  if (StrMpl_decodeBase64((uint8_t *)buff, params->msgLen, (uint8_t *)params->msg, (uint32_t *)&outputLen) < 0)
702  {
703  return -1;
704  }
705  params->msgLen = outputLen;
706  }
707  else
708  {
709  params->msg = malloc(params->msgLen);
710  if (params->msg == NULL)
711  {
712  return -1;
713  }
714 
715  memcpy(params->msg, buff, params->msgLen);
716  }
717 
718  return ret;
719 }
720 
721 
722 /*
723  MQTT client publish command callback.
724 */
725 int32_t ATCmdMqtt_publishCallback(void *arg)
726 {
727  int32_t ret = 0;
728  ATCmdMqtt_Publish_t params;
729  uint32_t index;
730  MQTTClient_Handle handle;
731 
732  /* parse client index */
733  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_ARG);
734  if (handle == NULL)
735  {
736  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
737  return -1;
738  }
739 
740  memset(&params, 0x0, sizeof(ATCmdMqtt_Publish_t));
741 
742  /* Call the command parser */
743  ret = ATCmdMqtt_publishParse((char *)arg,&params);
744 
745  if(ret < 0)
746  {
747  ATCmd_errorResult(ATCmd_errorParseStr,ret);
748  ATCmdMqtt_publishFree(&params);
749  return -1;
750  }
751  ret = MQTTClient_publish(handle,params.topic,params.topicLen,params.msg,params.msgLen,params.flags);
752 
753  if (ret < 0)
754  {
755  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
756  }
757  else
758  {
759  ATCmd_okResult();
760  }
761 
762  ATCmdMqtt_publishFree(&params);
763 
764  return ret;
765 }
766 
767 /*
768  Free allocated memory for MQTT client subscribe command
769 */
770 int32_t ATCmdMqtt_subscribeFree(ATCmdMqtt_Subscribe_t *params)
771 {
772  uint8_t i;
773 
774  if (params->value != NULL)
775  {
776  for (i=0;i<params->numberOfTopics;i++)
777  {
778  if (params->value[i].topic != NULL)
779  {
780  free(params->value[i].topic);
781  }
782  }
783  free(params->value);
784  }
785 
786  return 0;
787 }
788 
789 
790 /*
791  Parse MQTT client subscribe command.
792 */
793 int32_t ATCmdMqtt_subscribeParse(char *buff, ATCmdMqtt_Subscribe_t *params)
794 {
795  int32_t ret = 0;
796  uint8_t i;
797  char delim = ATCMD_DELIM_ARG;
798 
799  /* number of topics */
800  if ((ret = StrMpl_getVal(&buff, &params->numberOfTopics, ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
801  {
802  return ret;
803  }
804  if (params->numberOfTopics > ATCMDMQTT_SUBSCRIBE_MAX_TOPICS)
805  {
806  return STRMPL_ERROR_WRONG_PARAM;
807  }
808  params->value = malloc(params->numberOfTopics * sizeof(MQTTClient_SubscribeParams));
809  if (params->value == NULL)
810  {
811  return -1;
812  }
813  memset(params->value,0,params->numberOfTopics * sizeof(MQTTClient_SubscribeParams));
814  for (i=0;i<params->numberOfTopics;i++)
815  {
816  /* topic */
817  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->value[i].topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
818  {
819  return ret;
820  }
821 
822  /* qos */
823  if ((ret = StrMpl_getListVal(ATCmd_mqttQos, sizeof(ATCmd_mqttQos)/sizeof(StrMpl_List_t), &buff, &params->value[i].qos, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
824  {
825  return ret;
826  }
827 
828  if (i == (params->numberOfTopics -1))
829  {
830  delim = ATCMD_DELIM_TRM;
831  }
832  /* persistent */
833  if ((ret = StrMpl_getVal(&buff, &params->value[i].persistent,delim,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
834  {
835  if (ret != STRMPL_ERROR_PARAM_MISSING)
836  {
837  return ret;
838  }
839  ret = 0;
840  }
841  params->value[i].callback = NULL;
842  }
843  return ret;
844 }
845 
846 
847 /*
848  MQTT client subscribe command callback.
849 */
850 int32_t ATCmdMqtt_subscribeCallback(void *arg)
851 {
852  int32_t ret = 0;
853  ATCmdMqtt_Subscribe_t params;
854  uint32_t index;
855  MQTTClient_Handle handle;
856 
857  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_ARG);
858  if (handle == NULL)
859  {
860  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
861  return -1;
862  }
863 
864  memset(&params, 0x0, sizeof(ATCmdMqtt_Subscribe_t));
865 
866  /* Call the command parser */
867  ret = ATCmdMqtt_subscribeParse((char *)arg,&params);
868 
869  if(ret < 0)
870  {
871  ATCmd_errorResult(ATCmd_errorParseStr,ret);
872  ATCmdMqtt_subscribeFree(&params);
873  return -1;
874  }
875  ret = MQTTClient_subscribe(handle,params.value,params.numberOfTopics);
876 
877  if (ret < 0)
878  {
879  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
880  }
881  else
882  {
883  ATCmd_okResult();
884  }
885 
886  ATCmdMqtt_subscribeFree(&params);
887 
888  return ret;
889 }
890 
891 /*
892  Free allocated memory for MQTT client unsubscribe command
893 */
894 int32_t ATCmdMqtt_unsubscribeFree(ATCmdMqtt_Unsubscribe_t *params)
895 {
896  uint8_t i;
897 
898  if (params->value != NULL)
899  {
900  for (i=0;i<params->numberOfTopics;i++)
901  {
902  if (params->value[i].topic != NULL)
903  {
904  free(params->value[i].topic);
905  }
906  }
907  free(params->value);
908  }
909 
910  return 0;
911 }
912 
913 
914 /*
915  Parse MQTT client unsubscribe command.
916 */
917 int32_t ATCmdMqtt_unsubscribeParse(char *buff, ATCmdMqtt_Unsubscribe_t *params)
918 {
919  int32_t ret = 0;
920  uint8_t i;
921  char delim = ATCMD_DELIM_ARG;
922 
923  /* number of topics */
924  if ((ret = StrMpl_getVal(&buff, &params->numberOfTopics, ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
925  {
926  return ret;
927  }
928  if (params->numberOfTopics > ATCMDMQTT_SUBSCRIBE_MAX_TOPICS)
929  {
930  return STRMPL_ERROR_WRONG_PARAM;
931  }
932 
933  params->value = malloc(params->numberOfTopics * sizeof(MQTTClient_UnsubscribeParams));
934  if (params->value == NULL)
935  {
936  return -1;
937  }
938  memset(params->value,0,params->numberOfTopics * sizeof(MQTTClient_UnsubscribeParams));
939  for (i=0;i<params->numberOfTopics;i++)
940  {
941  /* topic */
942  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->value[i].topic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
943  {
944  return ret;
945  }
946 
947  if (i == (params->numberOfTopics -1))
948  {
949  delim = ATCMD_DELIM_TRM;
950  }
951  /* persistent */
952  if ((ret = StrMpl_getVal(&buff, &params->value[i].persistent,delim,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
953  {
954  if (ret != STRMPL_ERROR_PARAM_MISSING)
955  {
956  return ret;
957  }
958  ret = 0;
959  }
960  }
961  return ret;
962 }
963 
964 
965 /*
966  MQTT client unsubscribe command callback.
967 */
969 {
970  int32_t ret = 0;
971  ATCmdMqtt_Unsubscribe_t params;
972  uint32_t index;
973  MQTTClient_Handle handle;
974 
975  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_ARG);
976  if (handle == NULL)
977  {
978  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
979  return -1;
980  }
981 
982  memset(&params, 0x0, sizeof(ATCmdMqtt_Unsubscribe_t));
983 
984  /* Call the command parser */
985  ret = ATCmdMqtt_unsubscribeParse((char *)arg,&params);
986 
987  if(ret < 0)
988  {
989  ATCmd_errorResult(ATCmd_errorParseStr,ret);
990  ATCmdMqtt_unsubscribeFree(&params);
991  return -1;
992  }
993  ret = MQTTClient_unsubscribe(handle,params.value,params.numberOfTopics);
994 
995  if (ret < 0)
996  {
997  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
998  }
999  else
1000  {
1001  ATCmd_okResult();
1002  }
1003 
1004  ATCmdMqtt_unsubscribeFree(&params);
1005 
1006  return ret;
1007 }
1008 
1009 /*
1010  Free allocated memory for MQTT client set command
1011 */
1012 int32_t ATCmdMqtt_setFree(ATCmdMqtt_Set_t *params)
1013 {
1014  if (params != NULL)
1015  {
1016  if (params->usrName != NULL)
1017  {
1018  free(params->usrName);
1019  }
1020  if (params->usrPwd != NULL)
1021  {
1022  free(params->usrPwd);
1023  }
1024  if (params->mqttWill.willTopic != NULL)
1025  {
1026  free((void *)params->mqttWill.willTopic);
1027  }
1028  if (params->mqttWill.willMsg != NULL)
1029  {
1030  free((void *)params->mqttWill.willMsg);
1031  }
1032 
1033  free(params);
1034  }
1035  return 0;
1036 }
1037 
1038 /*
1039  Parse MQTT client set command.
1040 */
1041 int32_t ATCmdMqtt_setParse(char *buff, ATCmdMqtt_Set_t *params)
1042 {
1043  int32_t ret = 0;
1044  uint32_t outputLen = 0;
1045 
1046  /* Option */
1047  if ((ret = StrMpl_getListVal(ATCmd_mqttSetOptions, sizeof(ATCmd_mqttSetOptions)/sizeof(StrMpl_List_t), &buff, &params->option, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_16 )) < 0)
1048  {
1049  return ret;
1050  }
1051 
1052  /* value */
1053  switch (params->option)
1054  {
1055  case MQTTClient_USER_NAME:
1056  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->usrName), ATCMD_DELIM_TRM, ATCMDMQTT_SET_MAX_USERNAME_LEN ,ATCmd_excludeDelimStr)) < 0)
1057  {
1058  return ret;
1059  }
1060  params->value = (void *)(params->usrName);
1061  break;
1062  case MQTTClient_PASSWORD:
1063  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->usrPwd), ATCMD_DELIM_TRM, ATCMDMQTT_SET_MAX_PASSWORD_LEN ,ATCmd_excludeDelimStr)) < 0)
1064  {
1065  return ret;
1066  }
1067  params->value = (void *)(params->usrPwd);
1068  break;
1069  case MQTTClient_WILL_PARAM:
1070  /* will topic */
1071  if ((ret = StrMpl_getAllocStr(&buff, (char **)&(params->mqttWill.willTopic), ATCMD_DELIM_ARG, ATCMDMQTT_PUBLISH_MAX_TOPIC_LEN ,ATCmd_excludeDelimStr)) < 0)
1072  {
1073  return ret;
1074  }
1075  /* will qos */
1076  if ((ret = StrMpl_getListVal(ATCmd_mqttQos, sizeof(ATCmd_mqttQos)/sizeof(StrMpl_List_t), &buff, &params->mqttWill.willQos, ATCMD_DELIM_ARG, STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1077  {
1078  return ret;
1079  }
1080  /* will retain */
1081  if ((ret = StrMpl_getVal(&buff, &params->mqttWill.retain , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1082  {
1083  return ret;
1084  }
1085 
1086  /* will message length */
1087  if ((ret = StrMpl_getVal(&buff, &params->willMsgLen , ATCMD_DELIM_ARG,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
1088  {
1089  return ret;
1090  }
1091 
1092  /* will message */
1093  if (ATCmdMqtt_cb.dataFormat == ATCMD_DATA_FORMAT_BASE64)
1094  {
1095  /* convert length to binary length */
1096  outputLen = StrMpl_getBase64DecBufSize((uint8_t *)buff,params->willMsgLen);
1097  params->mqttWill.willMsg = malloc(outputLen);
1098  if (params->mqttWill.willMsg == NULL)
1099  {
1100  return -1;
1101  }
1102  if (StrMpl_decodeBase64((uint8_t *)buff, params->willMsgLen, (uint8_t *)params->mqttWill.willMsg, (uint32_t *)&outputLen) < 0)
1103  {
1104  return -1;
1105  }
1106  }
1107  else
1108  {
1109  params->mqttWill.willMsg = malloc(params->willMsgLen);
1110  if (params->mqttWill.willMsg == NULL)
1111  {
1112  return -1;
1113  }
1114  memcpy((void *)params->mqttWill.willMsg, buff, params->willMsgLen);
1115  }
1116  params->value = (void *)&(params->mqttWill);
1117  break;
1118  case MQTTClient_KEEPALIVE_TIME:
1119  /* keepalive time */
1120  if ((ret = StrMpl_getVal(&buff, &params->keepAliveTimeout, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_16)) < 0)
1121  {
1122  return ret;
1123  }
1124  params->value = (void *)&(params->keepAliveTimeout);
1125  break;
1126  case MQTTClient_CLEAN_CONNECT:
1127  /* clean */
1128  if ((ret = StrMpl_getVal(&buff, &params->clean, ATCMD_DELIM_TRM,STRMPL_FLAG_PARAM_SIZE_8)) < 0)
1129  {
1130  return ret;
1131  }
1132  params->value = (void *)&(params->clean);
1133  break;
1134  }
1135  return ret;
1136 }
1137 
1138 
1139 /*
1140  MQTT client set command callback.
1141 */
1142 int32_t ATCmdMqtt_setCallback(void *arg)
1143 {
1144  int32_t ret = 0;
1145  uint32_t index;
1146  MQTTClient_Handle handle;
1147 
1148  handle = ATCmdMqtt_clientIndexParse((char **)&arg,&index,ATCMD_DELIM_ARG);
1149  if (handle == NULL)
1150  {
1151  ATCmd_errorResult(ATCmd_errorParseStr,STRMPL_ERROR_WRONG_PARAM);
1152  return -1;
1153  }
1154 
1155  ATCmdMqtt_cb.set = malloc(sizeof(ATCmdMqtt_Set_t));
1156  if (ATCmdMqtt_cb.set == NULL)
1157  {
1158  ATCmd_errorResult(ATCmd_errorAllocStr,0);
1159  return -1;
1160  }
1161  memset(ATCmdMqtt_cb.set, 0x0, sizeof(ATCmdMqtt_Set_t));
1162 
1163  /* Call the command parser */
1164  ret = ATCmdMqtt_setParse((char *)arg, ATCmdMqtt_cb.set);
1165 
1166  if (ret < 0)
1167  {
1168  ATCmd_errorResult(ATCmd_errorParseStr,ret);
1169  ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
1170  return -1;
1171  }
1172 
1173  ret = MQTTClient_set(handle,ATCmdMqtt_cb.set->option,ATCmdMqtt_cb.set->value,0);
1174 
1175  if (ret < 0)
1176  {
1177  ATCmdMqtt_setFree(ATCmdMqtt_cb.set);
1178  ATCmd_errorResult(ATCmd_errorCmdStr,ret);
1179  }
1180  else
1181  {
1182  ATCmd_okResult();
1183  }
1184  /* allocation shoud be freed in ATCmdMqtt_clientFree */
1185 
1186  return ret;
1187 }
1188 
int32_t ATCmdMqtt_publishCallback(void *arg)
Mqtt Client Publish callback.
Definition: atcmd_mqtt.c:725
int32_t ATCmdMqtt_unsubscribeCallback(void *arg)
Mqtt Client Unsubscribe callback.
Definition: atcmd_mqtt.c:968
int32_t ATCmdMqtt_setCallback(void *arg)
Mqtt Client Set callback.
Definition: atcmd_mqtt.c:1142
int32_t ATCmdMqtt_disconnectCallback(void *arg)
Mqtt Client Disconnect callback.
Definition: atcmd_mqtt.c:606
int32_t ATCmdMqtt_deleteCallback(void *arg)
Mqtt Client delete callback.
Definition: atcmd_mqtt.c:546
int32_t ATCmdMqtt_subscribeCallback(void *arg)
Mqtt Client Subscribe callback.
Definition: atcmd_mqtt.c:850
int32_t ATCmdMqtt_createCallback(void *arg)
Mqtt Client Create callback.
Definition: atcmd_mqtt.c:474
int32_t ATCmdMqtt_connectCallback(void *arg)
Mqtt Client Connect callback.
Definition: atcmd_mqtt.c:577