diff --git a/.gitignore b/.gitignore index d640d5b8..a395a05e 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,8 @@ man/mosquitto_pub.1 man/mosquitto_sub.1 man/mqtt.7 +out/ + src/db_dump/mosquitto_db_dump src/mosquitto src/mosquitto_passwd diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 41d6a896..666a80c0 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -195,6 +195,8 @@ struct mosquitto { struct mosquitto__alias *aliases; uint32_t maximum_packet_size; int alias_count; + uint32_t will_delay_interval; + time_t will_delay_time; #ifdef WITH_TLS SSL *ssl; SSL_CTX *ssl_ctx; diff --git a/src/Makefile b/src/Makefile index bf7775ef..3776b73d 100644 --- a/src/Makefile +++ b/src/Makefile @@ -60,6 +60,7 @@ OBJS= mosquitto.o \ util_mosq.o \ util_topic.o \ websockets.o \ + will_delay.o \ will_mosq.o mosquitto : ${OBJS} @@ -221,6 +222,9 @@ utf8_mosq.o : ../lib/utf8_mosq.c websockets.o : websockets.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ +will_delay.o : will_delay.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ + will_mosq.o : ../lib/will_mosq.c ../lib/will_mosq.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/context.c b/src/context.c index bb8ee605..7f999c1a 100644 --- a/src/context.c +++ b/src/context.c @@ -218,6 +218,11 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt) { if(ctxt->state != mosq_cs_disconnecting && ctxt->will){ + if(ctxt->will_delay_interval > 0){ + will_delay__add(ctxt); + return; + } + if(mosquitto_acl_check(db, ctxt, ctxt->will->msg.topic, ctxt->will->msg.payloadlen, diff --git a/src/deps/utlist.h b/src/deps/utlist.h new file mode 100644 index 00000000..5bb1ac9b --- /dev/null +++ b/src/deps/utlist.h @@ -0,0 +1,1073 @@ +/* +Copyright (c) 2007-2018, Troy D. Hanson http://troydhanson.github.com/uthash/ +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED +TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A +PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER +OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +*/ + +#ifndef UTLIST_H +#define UTLIST_H + +#define UTLIST_VERSION 2.1.0 + +#include + +/* + * This file contains macros to manipulate singly and doubly-linked lists. + * + * 1. LL_ macros: singly-linked lists. + * 2. DL_ macros: doubly-linked lists. + * 3. CDL_ macros: circular doubly-linked lists. + * + * To use singly-linked lists, your structure must have a "next" pointer. + * To use doubly-linked lists, your structure must "prev" and "next" pointers. + * Either way, the pointer to the head of the list must be initialized to NULL. + * + * ----------------.EXAMPLE ------------------------- + * struct item { + * int id; + * struct item *prev, *next; + * } + * + * struct item *list = NULL: + * + * int main() { + * struct item *item; + * ... allocate and populate item ... + * DL_APPEND(list, item); + * } + * -------------------------------------------------- + * + * For doubly-linked lists, the append and delete macros are O(1) + * For singly-linked lists, append and delete are O(n) but prepend is O(1) + * The sort macro is O(n log(n)) for all types of single/double/circular lists. + */ + +/* These macros use decltype or the earlier __typeof GNU extension. + As decltype is only available in newer compilers (VS2010 or gcc 4.3+ + when compiling c++ source) this code uses whatever method is needed + or, for VS2008 where neither is available, uses casting workarounds. */ +#if !defined(LDECLTYPE) && !defined(NO_DECLTYPE) +#if defined(_MSC_VER) /* MS compiler */ +#if _MSC_VER >= 1600 && defined(__cplusplus) /* VS2010 or newer in C++ mode */ +#define LDECLTYPE(x) decltype(x) +#else /* VS2008 or older (or VS2010 in C mode) */ +#define NO_DECLTYPE +#endif +#elif defined(__BORLANDC__) || defined(__ICCARM__) || defined(__LCC__) || defined(__WATCOMC__) +#define NO_DECLTYPE +#else /* GNU, Sun and other compilers */ +#define LDECLTYPE(x) __typeof(x) +#endif +#endif + +/* for VS2008 we use some workarounds to get around the lack of decltype, + * namely, we always reassign our tmp variable to the list head if we need + * to dereference its prev/next pointers, and save/restore the real head.*/ +#ifdef NO_DECLTYPE +#define IF_NO_DECLTYPE(x) x +#define LDECLTYPE(x) char* +#define UTLIST_SV(elt,list) _tmp = (char*)(list); {char **_alias = (char**)&(list); *_alias = (elt); } +#define UTLIST_NEXT(elt,list,next) ((char*)((list)->next)) +#define UTLIST_NEXTASGN(elt,list,to,next) { char **_alias = (char**)&((list)->next); *_alias=(char*)(to); } +/* #define UTLIST_PREV(elt,list,prev) ((char*)((list)->prev)) */ +#define UTLIST_PREVASGN(elt,list,to,prev) { char **_alias = (char**)&((list)->prev); *_alias=(char*)(to); } +#define UTLIST_RS(list) { char **_alias = (char**)&(list); *_alias=_tmp; } +#define UTLIST_CASTASGN(a,b) { char **_alias = (char**)&(a); *_alias=(char*)(b); } +#else +#define IF_NO_DECLTYPE(x) +#define UTLIST_SV(elt,list) +#define UTLIST_NEXT(elt,list,next) ((elt)->next) +#define UTLIST_NEXTASGN(elt,list,to,next) ((elt)->next)=(to) +/* #define UTLIST_PREV(elt,list,prev) ((elt)->prev) */ +#define UTLIST_PREVASGN(elt,list,to,prev) ((elt)->prev)=(to) +#define UTLIST_RS(list) +#define UTLIST_CASTASGN(a,b) (a)=(b) +#endif + +/****************************************************************************** + * The sort macro is an adaptation of Simon Tatham's O(n log(n)) mergesort * + * Unwieldy variable names used here to avoid shadowing passed-in variables. * + *****************************************************************************/ +#define LL_SORT(list, cmp) \ + LL_SORT2(list, cmp, next) + +#define LL_SORT2(list, cmp, next) \ +do { \ + LDECLTYPE(list) _ls_p; \ + LDECLTYPE(list) _ls_q; \ + LDECLTYPE(list) _ls_e; \ + LDECLTYPE(list) _ls_tail; \ + IF_NO_DECLTYPE(LDECLTYPE(list) _tmp;) \ + int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \ + if (list) { \ + _ls_insize = 1; \ + _ls_looping = 1; \ + while (_ls_looping) { \ + UTLIST_CASTASGN(_ls_p,list); \ + (list) = NULL; \ + _ls_tail = NULL; \ + _ls_nmerges = 0; \ + while (_ls_p) { \ + _ls_nmerges++; \ + _ls_q = _ls_p; \ + _ls_psize = 0; \ + for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \ + _ls_psize++; \ + UTLIST_SV(_ls_q,list); _ls_q = UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); \ + if (!_ls_q) break; \ + } \ + _ls_qsize = _ls_insize; \ + while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) { \ + if (_ls_psize == 0) { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + } else if (_ls_qsize == 0 || !_ls_q) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + } else if (cmp(_ls_p,_ls_q) <= 0) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + } else { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + } \ + if (_ls_tail) { \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,_ls_e,next); UTLIST_RS(list); \ + } else { \ + UTLIST_CASTASGN(list,_ls_e); \ + } \ + _ls_tail = _ls_e; \ + } \ + _ls_p = _ls_q; \ + } \ + if (_ls_tail) { \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,NULL,next); UTLIST_RS(list); \ + } \ + if (_ls_nmerges <= 1) { \ + _ls_looping=0; \ + } \ + _ls_insize *= 2; \ + } \ + } \ +} while (0) + + +#define DL_SORT(list, cmp) \ + DL_SORT2(list, cmp, prev, next) + +#define DL_SORT2(list, cmp, prev, next) \ +do { \ + LDECLTYPE(list) _ls_p; \ + LDECLTYPE(list) _ls_q; \ + LDECLTYPE(list) _ls_e; \ + LDECLTYPE(list) _ls_tail; \ + IF_NO_DECLTYPE(LDECLTYPE(list) _tmp;) \ + int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \ + if (list) { \ + _ls_insize = 1; \ + _ls_looping = 1; \ + while (_ls_looping) { \ + UTLIST_CASTASGN(_ls_p,list); \ + (list) = NULL; \ + _ls_tail = NULL; \ + _ls_nmerges = 0; \ + while (_ls_p) { \ + _ls_nmerges++; \ + _ls_q = _ls_p; \ + _ls_psize = 0; \ + for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \ + _ls_psize++; \ + UTLIST_SV(_ls_q,list); _ls_q = UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); \ + if (!_ls_q) break; \ + } \ + _ls_qsize = _ls_insize; \ + while ((_ls_psize > 0) || ((_ls_qsize > 0) && _ls_q)) { \ + if (_ls_psize == 0) { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + } else if ((_ls_qsize == 0) || (!_ls_q)) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + } else if (cmp(_ls_p,_ls_q) <= 0) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + } else { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + } \ + if (_ls_tail) { \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,_ls_e,next); UTLIST_RS(list); \ + } else { \ + UTLIST_CASTASGN(list,_ls_e); \ + } \ + UTLIST_SV(_ls_e,list); UTLIST_PREVASGN(_ls_e,list,_ls_tail,prev); UTLIST_RS(list); \ + _ls_tail = _ls_e; \ + } \ + _ls_p = _ls_q; \ + } \ + UTLIST_CASTASGN((list)->prev, _ls_tail); \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,NULL,next); UTLIST_RS(list); \ + if (_ls_nmerges <= 1) { \ + _ls_looping=0; \ + } \ + _ls_insize *= 2; \ + } \ + } \ +} while (0) + +#define CDL_SORT(list, cmp) \ + CDL_SORT2(list, cmp, prev, next) + +#define CDL_SORT2(list, cmp, prev, next) \ +do { \ + LDECLTYPE(list) _ls_p; \ + LDECLTYPE(list) _ls_q; \ + LDECLTYPE(list) _ls_e; \ + LDECLTYPE(list) _ls_tail; \ + LDECLTYPE(list) _ls_oldhead; \ + LDECLTYPE(list) _tmp; \ + int _ls_insize, _ls_nmerges, _ls_psize, _ls_qsize, _ls_i, _ls_looping; \ + if (list) { \ + _ls_insize = 1; \ + _ls_looping = 1; \ + while (_ls_looping) { \ + UTLIST_CASTASGN(_ls_p,list); \ + UTLIST_CASTASGN(_ls_oldhead,list); \ + (list) = NULL; \ + _ls_tail = NULL; \ + _ls_nmerges = 0; \ + while (_ls_p) { \ + _ls_nmerges++; \ + _ls_q = _ls_p; \ + _ls_psize = 0; \ + for (_ls_i = 0; _ls_i < _ls_insize; _ls_i++) { \ + _ls_psize++; \ + UTLIST_SV(_ls_q,list); \ + if (UTLIST_NEXT(_ls_q,list,next) == _ls_oldhead) { \ + _ls_q = NULL; \ + } else { \ + _ls_q = UTLIST_NEXT(_ls_q,list,next); \ + } \ + UTLIST_RS(list); \ + if (!_ls_q) break; \ + } \ + _ls_qsize = _ls_insize; \ + while (_ls_psize > 0 || (_ls_qsize > 0 && _ls_q)) { \ + if (_ls_psize == 0) { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + if (_ls_q == _ls_oldhead) { _ls_q = NULL; } \ + } else if (_ls_qsize == 0 || !_ls_q) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + if (_ls_p == _ls_oldhead) { _ls_p = NULL; } \ + } else if (cmp(_ls_p,_ls_q) <= 0) { \ + _ls_e = _ls_p; UTLIST_SV(_ls_p,list); _ls_p = \ + UTLIST_NEXT(_ls_p,list,next); UTLIST_RS(list); _ls_psize--; \ + if (_ls_p == _ls_oldhead) { _ls_p = NULL; } \ + } else { \ + _ls_e = _ls_q; UTLIST_SV(_ls_q,list); _ls_q = \ + UTLIST_NEXT(_ls_q,list,next); UTLIST_RS(list); _ls_qsize--; \ + if (_ls_q == _ls_oldhead) { _ls_q = NULL; } \ + } \ + if (_ls_tail) { \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,_ls_e,next); UTLIST_RS(list); \ + } else { \ + UTLIST_CASTASGN(list,_ls_e); \ + } \ + UTLIST_SV(_ls_e,list); UTLIST_PREVASGN(_ls_e,list,_ls_tail,prev); UTLIST_RS(list); \ + _ls_tail = _ls_e; \ + } \ + _ls_p = _ls_q; \ + } \ + UTLIST_CASTASGN((list)->prev,_ls_tail); \ + UTLIST_CASTASGN(_tmp,list); \ + UTLIST_SV(_ls_tail,list); UTLIST_NEXTASGN(_ls_tail,list,_tmp,next); UTLIST_RS(list); \ + if (_ls_nmerges <= 1) { \ + _ls_looping=0; \ + } \ + _ls_insize *= 2; \ + } \ + } \ +} while (0) + +/****************************************************************************** + * singly linked list macros (non-circular) * + *****************************************************************************/ +#define LL_PREPEND(head,add) \ + LL_PREPEND2(head,add,next) + +#define LL_PREPEND2(head,add,next) \ +do { \ + (add)->next = (head); \ + (head) = (add); \ +} while (0) + +#define LL_CONCAT(head1,head2) \ + LL_CONCAT2(head1,head2,next) + +#define LL_CONCAT2(head1,head2,next) \ +do { \ + LDECLTYPE(head1) _tmp; \ + if (head1) { \ + _tmp = (head1); \ + while (_tmp->next) { _tmp = _tmp->next; } \ + _tmp->next=(head2); \ + } else { \ + (head1)=(head2); \ + } \ +} while (0) + +#define LL_APPEND(head,add) \ + LL_APPEND2(head,add,next) + +#define LL_APPEND2(head,add,next) \ +do { \ + LDECLTYPE(head) _tmp; \ + (add)->next=NULL; \ + if (head) { \ + _tmp = (head); \ + while (_tmp->next) { _tmp = _tmp->next; } \ + _tmp->next=(add); \ + } else { \ + (head)=(add); \ + } \ +} while (0) + +#define LL_INSERT_INORDER(head,add,cmp) \ + LL_INSERT_INORDER2(head,add,cmp,next) + +#define LL_INSERT_INORDER2(head,add,cmp,next) \ +do { \ + LDECLTYPE(head) _tmp; \ + if (head) { \ + LL_LOWER_BOUND2(head, _tmp, add, cmp, next); \ + LL_APPEND_ELEM2(head, _tmp, add, next); \ + } else { \ + (head) = (add); \ + (head)->next = NULL; \ + } \ +} while (0) + +#define LL_LOWER_BOUND(head,elt,like,cmp) \ + LL_LOWER_BOUND2(head,elt,like,cmp,next) + +#define LL_LOWER_BOUND2(head,elt,like,cmp,next) \ + do { \ + if ((head) == NULL || (cmp(head, like)) >= 0) { \ + (elt) = NULL; \ + } else { \ + for ((elt) = (head); (elt)->next != NULL; (elt) = (elt)->next) { \ + if (cmp((elt)->next, like) >= 0) { \ + break; \ + } \ + } \ + } \ + } while (0) + +#define LL_DELETE(head,del) \ + LL_DELETE2(head,del,next) + +#define LL_DELETE2(head,del,next) \ +do { \ + LDECLTYPE(head) _tmp; \ + if ((head) == (del)) { \ + (head)=(head)->next; \ + } else { \ + _tmp = (head); \ + while (_tmp->next && (_tmp->next != (del))) { \ + _tmp = _tmp->next; \ + } \ + if (_tmp->next) { \ + _tmp->next = (del)->next; \ + } \ + } \ +} while (0) + +#define LL_COUNT(head,el,counter) \ + LL_COUNT2(head,el,counter,next) \ + +#define LL_COUNT2(head,el,counter,next) \ +do { \ + (counter) = 0; \ + LL_FOREACH2(head,el,next) { ++(counter); } \ +} while (0) + +#define LL_FOREACH(head,el) \ + LL_FOREACH2(head,el,next) + +#define LL_FOREACH2(head,el,next) \ + for ((el) = (head); el; (el) = (el)->next) + +#define LL_FOREACH_SAFE(head,el,tmp) \ + LL_FOREACH_SAFE2(head,el,tmp,next) + +#define LL_FOREACH_SAFE2(head,el,tmp,next) \ + for ((el) = (head); (el) && ((tmp) = (el)->next, 1); (el) = (tmp)) + +#define LL_SEARCH_SCALAR(head,out,field,val) \ + LL_SEARCH_SCALAR2(head,out,field,val,next) + +#define LL_SEARCH_SCALAR2(head,out,field,val,next) \ +do { \ + LL_FOREACH2(head,out,next) { \ + if ((out)->field == (val)) break; \ + } \ +} while (0) + +#define LL_SEARCH(head,out,elt,cmp) \ + LL_SEARCH2(head,out,elt,cmp,next) + +#define LL_SEARCH2(head,out,elt,cmp,next) \ +do { \ + LL_FOREACH2(head,out,next) { \ + if ((cmp(out,elt))==0) break; \ + } \ +} while (0) + +#define LL_REPLACE_ELEM2(head, el, add, next) \ +do { \ + LDECLTYPE(head) _tmp; \ + assert((head) != NULL); \ + assert((el) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el)->next; \ + if ((head) == (el)) { \ + (head) = (add); \ + } else { \ + _tmp = (head); \ + while (_tmp->next && (_tmp->next != (el))) { \ + _tmp = _tmp->next; \ + } \ + if (_tmp->next) { \ + _tmp->next = (add); \ + } \ + } \ +} while (0) + +#define LL_REPLACE_ELEM(head, el, add) \ + LL_REPLACE_ELEM2(head, el, add, next) + +#define LL_PREPEND_ELEM2(head, el, add, next) \ +do { \ + if (el) { \ + LDECLTYPE(head) _tmp; \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el); \ + if ((head) == (el)) { \ + (head) = (add); \ + } else { \ + _tmp = (head); \ + while (_tmp->next && (_tmp->next != (el))) { \ + _tmp = _tmp->next; \ + } \ + if (_tmp->next) { \ + _tmp->next = (add); \ + } \ + } \ + } else { \ + LL_APPEND2(head, add, next); \ + } \ +} while (0) \ + +#define LL_PREPEND_ELEM(head, el, add) \ + LL_PREPEND_ELEM2(head, el, add, next) + +#define LL_APPEND_ELEM2(head, el, add, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el)->next; \ + (el)->next = (add); \ + } else { \ + LL_PREPEND2(head, add, next); \ + } \ +} while (0) \ + +#define LL_APPEND_ELEM(head, el, add) \ + LL_APPEND_ELEM2(head, el, add, next) + +#ifdef NO_DECLTYPE +/* Here are VS2008 / NO_DECLTYPE replacements for a few functions */ + +#undef LL_CONCAT2 +#define LL_CONCAT2(head1,head2,next) \ +do { \ + char *_tmp; \ + if (head1) { \ + _tmp = (char*)(head1); \ + while ((head1)->next) { (head1) = (head1)->next; } \ + (head1)->next = (head2); \ + UTLIST_RS(head1); \ + } else { \ + (head1)=(head2); \ + } \ +} while (0) + +#undef LL_APPEND2 +#define LL_APPEND2(head,add,next) \ +do { \ + if (head) { \ + (add)->next = head; /* use add->next as a temp variable */ \ + while ((add)->next->next) { (add)->next = (add)->next->next; } \ + (add)->next->next=(add); \ + } else { \ + (head)=(add); \ + } \ + (add)->next=NULL; \ +} while (0) + +#undef LL_INSERT_INORDER2 +#define LL_INSERT_INORDER2(head,add,cmp,next) \ +do { \ + if ((head) == NULL || (cmp(head, add)) >= 0) { \ + (add)->next = (head); \ + (head) = (add); \ + } else { \ + char *_tmp = (char*)(head); \ + while ((head)->next != NULL && (cmp((head)->next, add)) < 0) { \ + (head) = (head)->next; \ + } \ + (add)->next = (head)->next; \ + (head)->next = (add); \ + UTLIST_RS(head); \ + } \ +} while (0) + +#undef LL_DELETE2 +#define LL_DELETE2(head,del,next) \ +do { \ + if ((head) == (del)) { \ + (head)=(head)->next; \ + } else { \ + char *_tmp = (char*)(head); \ + while ((head)->next && ((head)->next != (del))) { \ + (head) = (head)->next; \ + } \ + if ((head)->next) { \ + (head)->next = ((del)->next); \ + } \ + UTLIST_RS(head); \ + } \ +} while (0) + +#undef LL_REPLACE_ELEM2 +#define LL_REPLACE_ELEM2(head, el, add, next) \ +do { \ + assert((head) != NULL); \ + assert((el) != NULL); \ + assert((add) != NULL); \ + if ((head) == (el)) { \ + (head) = (add); \ + } else { \ + (add)->next = head; \ + while ((add)->next->next && ((add)->next->next != (el))) { \ + (add)->next = (add)->next->next; \ + } \ + if ((add)->next->next) { \ + (add)->next->next = (add); \ + } \ + } \ + (add)->next = (el)->next; \ +} while (0) + +#undef LL_PREPEND_ELEM2 +#define LL_PREPEND_ELEM2(head, el, add, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + if ((head) == (el)) { \ + (head) = (add); \ + } else { \ + (add)->next = (head); \ + while ((add)->next->next && ((add)->next->next != (el))) { \ + (add)->next = (add)->next->next; \ + } \ + if ((add)->next->next) { \ + (add)->next->next = (add); \ + } \ + } \ + (add)->next = (el); \ + } else { \ + LL_APPEND2(head, add, next); \ + } \ +} while (0) \ + +#endif /* NO_DECLTYPE */ + +/****************************************************************************** + * doubly linked list macros (non-circular) * + *****************************************************************************/ +#define DL_PREPEND(head,add) \ + DL_PREPEND2(head,add,prev,next) + +#define DL_PREPEND2(head,add,prev,next) \ +do { \ + (add)->next = (head); \ + if (head) { \ + (add)->prev = (head)->prev; \ + (head)->prev = (add); \ + } else { \ + (add)->prev = (add); \ + } \ + (head) = (add); \ +} while (0) + +#define DL_APPEND(head,add) \ + DL_APPEND2(head,add,prev,next) + +#define DL_APPEND2(head,add,prev,next) \ +do { \ + if (head) { \ + (add)->prev = (head)->prev; \ + (head)->prev->next = (add); \ + (head)->prev = (add); \ + (add)->next = NULL; \ + } else { \ + (head)=(add); \ + (head)->prev = (head); \ + (head)->next = NULL; \ + } \ +} while (0) + +#define DL_INSERT_INORDER(head,add,cmp) \ + DL_INSERT_INORDER2(head,add,cmp,prev,next) + +#define DL_INSERT_INORDER2(head,add,cmp,prev,next) \ +do { \ + LDECLTYPE(head) _tmp; \ + if (head) { \ + DL_LOWER_BOUND2(head, _tmp, add, cmp, next); \ + DL_APPEND_ELEM2(head, _tmp, add, prev, next); \ + } else { \ + (head) = (add); \ + (head)->prev = (head); \ + (head)->next = NULL; \ + } \ +} while (0) + +#define DL_LOWER_BOUND(head,elt,like,cmp) \ + DL_LOWER_BOUND2(head,elt,like,cmp,next) + +#define DL_LOWER_BOUND2(head,elt,like,cmp,next) \ +do { \ + if ((head) == NULL || (cmp(head, like)) >= 0) { \ + (elt) = NULL; \ + } else { \ + for ((elt) = (head); (elt)->next != NULL; (elt) = (elt)->next) { \ + if ((cmp((elt)->next, like)) >= 0) { \ + break; \ + } \ + } \ + } \ +} while (0) + +#define DL_CONCAT(head1,head2) \ + DL_CONCAT2(head1,head2,prev,next) + +#define DL_CONCAT2(head1,head2,prev,next) \ +do { \ + LDECLTYPE(head1) _tmp; \ + if (head2) { \ + if (head1) { \ + UTLIST_CASTASGN(_tmp, (head2)->prev); \ + (head2)->prev = (head1)->prev; \ + (head1)->prev->next = (head2); \ + UTLIST_CASTASGN((head1)->prev, _tmp); \ + } else { \ + (head1)=(head2); \ + } \ + } \ +} while (0) + +#define DL_DELETE(head,del) \ + DL_DELETE2(head,del,prev,next) + +#define DL_DELETE2(head,del,prev,next) \ +do { \ + assert((head) != NULL); \ + assert((del)->prev != NULL); \ + if ((del)->prev == (del)) { \ + (head)=NULL; \ + } else if ((del)==(head)) { \ + (del)->next->prev = (del)->prev; \ + (head) = (del)->next; \ + } else { \ + (del)->prev->next = (del)->next; \ + if ((del)->next) { \ + (del)->next->prev = (del)->prev; \ + } else { \ + (head)->prev = (del)->prev; \ + } \ + } \ +} while (0) + +#define DL_COUNT(head,el,counter) \ + DL_COUNT2(head,el,counter,next) \ + +#define DL_COUNT2(head,el,counter,next) \ +do { \ + (counter) = 0; \ + DL_FOREACH2(head,el,next) { ++(counter); } \ +} while (0) + +#define DL_FOREACH(head,el) \ + DL_FOREACH2(head,el,next) + +#define DL_FOREACH2(head,el,next) \ + for ((el) = (head); el; (el) = (el)->next) + +/* this version is safe for deleting the elements during iteration */ +#define DL_FOREACH_SAFE(head,el,tmp) \ + DL_FOREACH_SAFE2(head,el,tmp,next) + +#define DL_FOREACH_SAFE2(head,el,tmp,next) \ + for ((el) = (head); (el) && ((tmp) = (el)->next, 1); (el) = (tmp)) + +/* these are identical to their singly-linked list counterparts */ +#define DL_SEARCH_SCALAR LL_SEARCH_SCALAR +#define DL_SEARCH LL_SEARCH +#define DL_SEARCH_SCALAR2 LL_SEARCH_SCALAR2 +#define DL_SEARCH2 LL_SEARCH2 + +#define DL_REPLACE_ELEM2(head, el, add, prev, next) \ +do { \ + assert((head) != NULL); \ + assert((el) != NULL); \ + assert((add) != NULL); \ + if ((head) == (el)) { \ + (head) = (add); \ + (add)->next = (el)->next; \ + if ((el)->next == NULL) { \ + (add)->prev = (add); \ + } else { \ + (add)->prev = (el)->prev; \ + (add)->next->prev = (add); \ + } \ + } else { \ + (add)->next = (el)->next; \ + (add)->prev = (el)->prev; \ + (add)->prev->next = (add); \ + if ((el)->next == NULL) { \ + (head)->prev = (add); \ + } else { \ + (add)->next->prev = (add); \ + } \ + } \ +} while (0) + +#define DL_REPLACE_ELEM(head, el, add) \ + DL_REPLACE_ELEM2(head, el, add, prev, next) + +#define DL_PREPEND_ELEM2(head, el, add, prev, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el); \ + (add)->prev = (el)->prev; \ + (el)->prev = (add); \ + if ((head) == (el)) { \ + (head) = (add); \ + } else { \ + (add)->prev->next = (add); \ + } \ + } else { \ + DL_APPEND2(head, add, prev, next); \ + } \ +} while (0) \ + +#define DL_PREPEND_ELEM(head, el, add) \ + DL_PREPEND_ELEM2(head, el, add, prev, next) + +#define DL_APPEND_ELEM2(head, el, add, prev, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el)->next; \ + (add)->prev = (el); \ + (el)->next = (add); \ + if ((add)->next) { \ + (add)->next->prev = (add); \ + } else { \ + (head)->prev = (add); \ + } \ + } else { \ + DL_PREPEND2(head, add, prev, next); \ + } \ +} while (0) \ + +#define DL_APPEND_ELEM(head, el, add) \ + DL_APPEND_ELEM2(head, el, add, prev, next) + +#ifdef NO_DECLTYPE +/* Here are VS2008 / NO_DECLTYPE replacements for a few functions */ + +#undef DL_INSERT_INORDER2 +#define DL_INSERT_INORDER2(head,add,cmp,prev,next) \ +do { \ + if ((head) == NULL) { \ + (add)->prev = (add); \ + (add)->next = NULL; \ + (head) = (add); \ + } else if ((cmp(head, add)) >= 0) { \ + (add)->prev = (head)->prev; \ + (add)->next = (head); \ + (head)->prev = (add); \ + (head) = (add); \ + } else { \ + char *_tmp = (char*)(head); \ + while ((head)->next && (cmp((head)->next, add)) < 0) { \ + (head) = (head)->next; \ + } \ + (add)->prev = (head); \ + (add)->next = (head)->next; \ + (head)->next = (add); \ + UTLIST_RS(head); \ + if ((add)->next) { \ + (add)->next->prev = (add); \ + } else { \ + (head)->prev = (add); \ + } \ + } \ +} while (0) +#endif /* NO_DECLTYPE */ + +/****************************************************************************** + * circular doubly linked list macros * + *****************************************************************************/ +#define CDL_APPEND(head,add) \ + CDL_APPEND2(head,add,prev,next) + +#define CDL_APPEND2(head,add,prev,next) \ +do { \ + if (head) { \ + (add)->prev = (head)->prev; \ + (add)->next = (head); \ + (head)->prev = (add); \ + (add)->prev->next = (add); \ + } else { \ + (add)->prev = (add); \ + (add)->next = (add); \ + (head) = (add); \ + } \ +} while (0) + +#define CDL_PREPEND(head,add) \ + CDL_PREPEND2(head,add,prev,next) + +#define CDL_PREPEND2(head,add,prev,next) \ +do { \ + if (head) { \ + (add)->prev = (head)->prev; \ + (add)->next = (head); \ + (head)->prev = (add); \ + (add)->prev->next = (add); \ + } else { \ + (add)->prev = (add); \ + (add)->next = (add); \ + } \ + (head) = (add); \ +} while (0) + +#define CDL_INSERT_INORDER(head,add,cmp) \ + CDL_INSERT_INORDER2(head,add,cmp,prev,next) + +#define CDL_INSERT_INORDER2(head,add,cmp,prev,next) \ +do { \ + LDECLTYPE(head) _tmp; \ + if (head) { \ + CDL_LOWER_BOUND2(head, _tmp, add, cmp, next); \ + CDL_APPEND_ELEM2(head, _tmp, add, prev, next); \ + } else { \ + (head) = (add); \ + (head)->next = (head); \ + (head)->prev = (head); \ + } \ +} while (0) + +#define CDL_LOWER_BOUND(head,elt,like,cmp) \ + CDL_LOWER_BOUND2(head,elt,like,cmp,next) + +#define CDL_LOWER_BOUND2(head,elt,like,cmp,next) \ +do { \ + if ((head) == NULL || (cmp(head, like)) >= 0) { \ + (elt) = NULL; \ + } else { \ + for ((elt) = (head); (elt)->next != (head); (elt) = (elt)->next) { \ + if ((cmp((elt)->next, like)) >= 0) { \ + break; \ + } \ + } \ + } \ +} while (0) + +#define CDL_DELETE(head,del) \ + CDL_DELETE2(head,del,prev,next) + +#define CDL_DELETE2(head,del,prev,next) \ +do { \ + if (((head)==(del)) && ((head)->next == (head))) { \ + (head) = NULL; \ + } else { \ + (del)->next->prev = (del)->prev; \ + (del)->prev->next = (del)->next; \ + if ((del) == (head)) (head)=(del)->next; \ + } \ +} while (0) + +#define CDL_COUNT(head,el,counter) \ + CDL_COUNT2(head,el,counter,next) \ + +#define CDL_COUNT2(head, el, counter,next) \ +do { \ + (counter) = 0; \ + CDL_FOREACH2(head,el,next) { ++(counter); } \ +} while (0) + +#define CDL_FOREACH(head,el) \ + CDL_FOREACH2(head,el,next) + +#define CDL_FOREACH2(head,el,next) \ + for ((el)=(head);el;(el)=(((el)->next==(head)) ? NULL : (el)->next)) + +#define CDL_FOREACH_SAFE(head,el,tmp1,tmp2) \ + CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next) + +#define CDL_FOREACH_SAFE2(head,el,tmp1,tmp2,prev,next) \ + for ((el) = (head), (tmp1) = (head) ? (head)->prev : NULL; \ + (el) && ((tmp2) = (el)->next, 1); \ + (el) = ((el) == (tmp1) ? NULL : (tmp2))) + +#define CDL_SEARCH_SCALAR(head,out,field,val) \ + CDL_SEARCH_SCALAR2(head,out,field,val,next) + +#define CDL_SEARCH_SCALAR2(head,out,field,val,next) \ +do { \ + CDL_FOREACH2(head,out,next) { \ + if ((out)->field == (val)) break; \ + } \ +} while (0) + +#define CDL_SEARCH(head,out,elt,cmp) \ + CDL_SEARCH2(head,out,elt,cmp,next) + +#define CDL_SEARCH2(head,out,elt,cmp,next) \ +do { \ + CDL_FOREACH2(head,out,next) { \ + if ((cmp(out,elt))==0) break; \ + } \ +} while (0) + +#define CDL_REPLACE_ELEM2(head, el, add, prev, next) \ +do { \ + assert((head) != NULL); \ + assert((el) != NULL); \ + assert((add) != NULL); \ + if ((el)->next == (el)) { \ + (add)->next = (add); \ + (add)->prev = (add); \ + (head) = (add); \ + } else { \ + (add)->next = (el)->next; \ + (add)->prev = (el)->prev; \ + (add)->next->prev = (add); \ + (add)->prev->next = (add); \ + if ((head) == (el)) { \ + (head) = (add); \ + } \ + } \ +} while (0) + +#define CDL_REPLACE_ELEM(head, el, add) \ + CDL_REPLACE_ELEM2(head, el, add, prev, next) + +#define CDL_PREPEND_ELEM2(head, el, add, prev, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el); \ + (add)->prev = (el)->prev; \ + (el)->prev = (add); \ + (add)->prev->next = (add); \ + if ((head) == (el)) { \ + (head) = (add); \ + } \ + } else { \ + CDL_APPEND2(head, add, prev, next); \ + } \ +} while (0) + +#define CDL_PREPEND_ELEM(head, el, add) \ + CDL_PREPEND_ELEM2(head, el, add, prev, next) + +#define CDL_APPEND_ELEM2(head, el, add, prev, next) \ +do { \ + if (el) { \ + assert((head) != NULL); \ + assert((add) != NULL); \ + (add)->next = (el)->next; \ + (add)->prev = (el); \ + (el)->next = (add); \ + (add)->next->prev = (add); \ + } else { \ + CDL_PREPEND2(head, add, prev, next); \ + } \ +} while (0) + +#define CDL_APPEND_ELEM(head, el, add) \ + CDL_APPEND_ELEM2(head, el, add, prev, next) + +#ifdef NO_DECLTYPE +/* Here are VS2008 / NO_DECLTYPE replacements for a few functions */ + +#undef CDL_INSERT_INORDER2 +#define CDL_INSERT_INORDER2(head,add,cmp,prev,next) \ +do { \ + if ((head) == NULL) { \ + (add)->prev = (add); \ + (add)->next = (add); \ + (head) = (add); \ + } else if ((cmp(head, add)) >= 0) { \ + (add)->prev = (head)->prev; \ + (add)->next = (head); \ + (add)->prev->next = (add); \ + (head)->prev = (add); \ + (head) = (add); \ + } else { \ + char *_tmp = (char*)(head); \ + while ((char*)(head)->next != _tmp && (cmp((head)->next, add)) < 0) { \ + (head) = (head)->next; \ + } \ + (add)->prev = (head); \ + (add)->next = (head)->next; \ + (add)->next->prev = (add); \ + (head)->next = (add); \ + UTLIST_RS(head); \ + } \ +} while (0) +#endif /* NO_DECLTYPE */ + +#endif /* UTLIST_H */ diff --git a/src/handle_connect.c b/src/handle_connect.c index d47e0011..208ce67f 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -165,6 +165,7 @@ static int will__read(struct mosquitto *context, struct mosquitto_message_all ** struct mosquitto_message_all *will_struct = NULL; char *will_topic_mount = NULL; uint16_t payloadlen; + mosquitto_property *properties = NULL; will_struct = mosquitto__calloc(1, sizeof(struct mosquitto_message_all)); if(!will_struct){ @@ -172,11 +173,12 @@ static int will__read(struct mosquitto *context, struct mosquitto_message_all ** goto error_cleanup; } if(context->protocol == PROTOCOL_VERSION_v5){ - rc = property__read_all(CMD_WILL, &context->in_packet, &will_struct->properties); + rc = property__read_all(CMD_WILL, &context->in_packet, &properties); if(rc) goto error_cleanup; - mosquitto_property_read_int32(will_struct->properties, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &will_struct->expiry_interval, false); - mosquitto_property_free_all(&will_struct->properties); /* FIXME - TEMPORARY UNTIL PROPERTIES PROCESSED */ + rc = property__process_will(context, will_struct, properties); + mosquitto_property_free_all(&properties); + if(rc) goto error_cleanup; } rc = packet__read_string(&context->in_packet, &will_struct->msg.topic, &slen); if(rc) goto error_cleanup; @@ -228,6 +230,7 @@ error_cleanup: if(will_struct){ mosquitto__free(will_struct->msg.topic); mosquitto__free(will_struct->msg.payload); + mosquitto_property_free_all(&will_struct->properties); mosquitto__free(will_struct); } return rc; diff --git a/src/handle_publish.c b/src/handle_publish.c index fe536589..366c6521 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -295,6 +295,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) topic = stored->topic; dup = 1; mosquitto_property_free_all(&msg_properties); + UHPA_FREE(payload, payloadlen); } switch(qos){ diff --git a/src/loop.c b/src/loop.c index 4678dada..6deb4b23 100644 --- a/src/loop.c +++ b/src/loop.c @@ -555,6 +555,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } } #endif + will_delay__check(db, time(NULL)); #ifdef WITH_PERSISTENCE if(db->config->persistence && db->config->autosave_interval){ if(db->config->autosave_on_changes){ @@ -673,11 +674,14 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) #else if(context->clean_start){ #endif - context__add_to_disused(db, context); - if(context->id){ - context__remove_from_by_id(db, context); - mosquitto__free(context->id); - context->id = NULL; + if(context->will_delay_interval == 0){ + /* This will be done later, after the will is published */ + context__add_to_disused(db, context); + if(context->id){ + context__remove_from_by_id(db, context); + mosquitto__free(context->id); + context->id = NULL; + } } } context->state = mosq_cs_disconnected; diff --git a/src/mosquitto.c b/src/mosquitto.c index 768d5c10..85a4d8fe 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -384,9 +384,13 @@ int main(int argc, char *argv[]) } #endif + /* FIXME - this isn't quite right, all wills with will delay zero should be + * sent now, but those with positive will delay should be persisted and + * restored, pending the client reconnecting in time. */ HASH_ITER(hh_id, int_db.contexts_by_id, ctxt, ctxt_tmp){ context__send_will(&int_db, ctxt); } + will_delay__send_all(&int_db); #ifdef WITH_PERSISTENCE if(config.persistence){ diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 7e09933e..22d84c1b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -628,6 +628,7 @@ void bridge__packet_cleanup(struct mosquitto *context); * Property related functions * ============================================================ */ int property__process_connect(struct mosquitto *context, mosquitto_property *props); +int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property *props); int property__process_disconnect(struct mosquitto *context, mosquitto_property *props); /* ============================================================ @@ -674,5 +675,12 @@ struct libwebsocket_context *mosq_websockets_init(struct mosquitto__listener *li #endif void do_disconnect(struct mosquitto_db *db, struct mosquitto *context); +/* ============================================================ + * Will delay + * ============================================================ */ +int will_delay__add(struct mosquitto *context); +void will_delay__check(struct mosquitto_db *db, time_t now); +void will_delay__send_all(struct mosquitto_db *db); + #endif diff --git a/src/property_broker.c b/src/property_broker.c index 654c802b..3747df44 100644 --- a/src/property_broker.c +++ b/src/property_broker.c @@ -54,6 +54,62 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro return MOSQ_ERR_SUCCESS; } + +int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property *props) +{ + mosquitto_property *p, *p_prev; + mosquitto_property *msg_properties, *msg_properties_last; + + p = props; + p_prev = NULL; + msg_properties = NULL; + msg_properties_last = NULL; + while(p){ + switch(p->identifier){ + case MQTT_PROP_CONTENT_TYPE: + case MQTT_PROP_CORRELATION_DATA: + case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: + case MQTT_PROP_RESPONSE_TOPIC: + if(msg_properties){ + msg_properties_last->next = p; + msg_properties_last = p; + }else{ + msg_properties = p; + msg_properties_last = p; + } + if(p_prev){ + p_prev->next = p->next; + p = p_prev->next; + }else{ + props = p->next; + p = props; + } + msg_properties_last->next = NULL; + break; + + case MQTT_PROP_WILL_DELAY_INTERVAL: + context->will_delay_interval = p->value.i32; + p_prev = p; + p = p->next; + break; + + case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL: + msg->expiry_interval = p->value.i32; + p_prev = p; + p = p->next; + break; + + default: + return MOSQ_ERR_PROTOCOL; + break; + } + } + + msg->properties = msg_properties; + return MOSQ_ERR_SUCCESS; +} + + /* Process the incoming properties, we should be able to assume that only valid * properties for DISCONNECT are present here. */ int property__process_disconnect(struct mosquitto *context, mosquitto_property *props) diff --git a/src/will_delay.c b/src/will_delay.c new file mode 100644 index 00000000..b3ff8670 --- /dev/null +++ b/src/will_delay.c @@ -0,0 +1,94 @@ +/* +Copyright (c) 2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "time_mosq.h" + +struct will_delay_list { + struct mosquitto *context; + struct will_delay_list *prev; + struct will_delay_list *next; +}; + +static struct will_delay_list *delay_list = NULL; +static time_t last_check = 0; + + +static int will_delay__cmp(struct will_delay_list *i1, struct will_delay_list *i2) +{ + return i1->context->will_delay_interval - i2->context->will_delay_interval; +} + + +int will_delay__add(struct mosquitto *context) +{ + struct will_delay_list *item; + + item = mosquitto__calloc(1, sizeof(struct will_delay_list)); + if(!item) return MOSQ_ERR_NOMEM; + + item->context = context; + item->context->will_delay_time = time(NULL) + item->context->will_delay_interval; + + DL_INSERT_INORDER(delay_list, item, will_delay__cmp); + + return MOSQ_ERR_SUCCESS; +} + + +/* Call on broker shutdown only */ +void will_delay__send_all(struct mosquitto_db *db) +{ + struct will_delay_list *item, *tmp; + + DL_FOREACH_SAFE(delay_list, item, tmp){ + DL_DELETE(delay_list, item); + item->context->will_delay_interval = 0; + context__send_will(db, item->context); + mosquitto__free(item); + } + +} + +void will_delay__check(struct mosquitto_db *db, time_t now) +{ + struct will_delay_list *item, *tmp; + + if(now <= last_check) return; + + last_check = now; + + DL_FOREACH_SAFE(delay_list, item, tmp){ + if(item->context->will_delay_time < now){ + DL_DELETE(delay_list, item); + item->context->will_delay_interval = 0; + context__send_will(db, item->context); + context__add_to_disused(db, item->context); + mosquitto__free(item); + }else{ + return; + } + } + +} + diff --git a/test/broker/07-will-delay-reconnect.py b/test/broker/07-will-delay-reconnect.py new file mode 100755 index 00000000..c5d5ac2e --- /dev/null +++ b/test/broker/07-will-delay-reconnect.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python + +# Test whether a client with a will delay handles correctly on the client reconnecting +# First connection is durable, second is clean session, and without a will, so the will should not be received. +# MQTT 5 + +from mosq_test_helper import * + + +def do_test(): + rc = 1 + keepalive = 60 + + mid = 1 + connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5) + connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3) + connect2a_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=False) + connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + connect2b_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, clean_session=True) + connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5) + suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5) + + pingreq_packet = mosq_test.gen_pingreq() + pingresp_packet = mosq_test.gen_pingresp() + + port = mosq_test.get_port() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + + try: + sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port) + mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback") + + sock2 = mosq_test.do_client_connect(connect2a_packet, connack2a_packet, timeout=30, port=port) + sock2.close() + + time.sleep(1) + sock2 = mosq_test.do_client_connect(connect2b_packet, connack2b_packet, timeout=30, port=port) + time.sleep(3) + + # The client2 has reconnected within the original will delay interval, which has now + # passed, but it should have been deleted anyway. Disconnect and see + # whether we get the old will. We should not. + sock2.close() + + mosq_test.do_send_receive(sock1, pingreq_packet, pingresp_packet, "pingresp") + rc = 0 + + sock1.close() + sock2.close() + finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + exit(rc) + + +do_test() diff --git a/test/broker/07-will-delay-recover.py b/test/broker/07-will-delay-recover.py new file mode 100755 index 00000000..54322e19 --- /dev/null +++ b/test/broker/07-will-delay-recover.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +# Test whether a client with a will delay recovers on the client reconnecting +# MQTT 5 + +from mosq_test_helper import * + + +def do_test(clean_session): + rc = 1 + keepalive = 60 + + mid = 1 + connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5) + connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3) + connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=clean_session) + connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + if clean_session == True: + connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + else: + connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1) + + subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5) + suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5) + + pingreq_packet = mosq_test.gen_pingreq() + pingresp_packet = mosq_test.gen_pingresp() + + port = mosq_test.get_port() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + + try: + sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port) + mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback") + + sock2 = mosq_test.do_client_connect(connect2_packet, connack2a_packet, timeout=30, port=port) + sock2.close() + + time.sleep(1) + sock2 = mosq_test.do_client_connect(connect2_packet, connack2b_packet, timeout=30, port=port) + time.sleep(3) + + # The client2 has reconnected within the will delay interval, which has now + # passed. We should not have received the will at this point. + mosq_test.do_send_receive(sock1, pingreq_packet, pingresp_packet, "pingresp") + rc = 0 + + sock1.close() + sock2.close() + finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + exit(rc) + + +do_test(clean_session=True) +do_test(clean_session=False) diff --git a/test/broker/07-will-delay.py b/test/broker/07-will-delay.py new file mode 100755 index 00000000..997d70c3 --- /dev/null +++ b/test/broker/07-will-delay.py @@ -0,0 +1,51 @@ +#!/usr/bin/env python + +# Test whether a client will is transmitted with a delay correctly. +# MQTT 5 + +from mosq_test_helper import * + +def do_test(clean_session): + rc = 1 + keepalive = 60 + + mid = 1 + connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5) + connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3) + connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_qos=2, will_properties=props, clean_session=clean_session) + connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + + subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5) + suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5) + + publish_packet = mosq_test.gen_publish("will/test", qos=0, payload="will delay", proto_ver=5) + + port = mosq_test.get_port() + broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + + try: + sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port) + mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback") + + sock2 = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=30, port=port) + sock2.close() + + t_start = time.time() + if mosq_test.expect_packet(sock1, "publish", publish_packet): + t_finish = time.time() + if t_finish - t_start > 2 and t_finish - t_start < 5: + rc = 0 + + sock1.close() + finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + exit(rc) + +do_test(clean_session=True) +do_test(clean_session=False) diff --git a/test/broker/Makefile b/test/broker/Makefile index 440c38f2..b78969b8 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -128,6 +128,9 @@ endif ./07-will-null-topic.py ./07-will-invalid-utf8.py ./07-will-no-flag.py + ./07-will-delay.py + ./07-will-delay-recover.py + ./07-will-delay-reconnect.py 08 : ifeq ($(WITH_TLS),yes) diff --git a/test/broker/test.py b/test/broker/test.py index f93d9c31..607a0fad 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -104,6 +104,9 @@ tests = [ (1, './07-will-null-topic.py'), (1, './07-will-invalid-utf8.py'), (1, './07-will-no-flag.py'), + (1, './07-will-delay.py'), + (1, './07-will-delay-recover.py'), + (1, './07-will-delay-reconnect.py'), (2, './08-ssl-connect-no-auth.py'), (2, './08-ssl-connect-no-auth-wrong-ca.py'), diff --git a/test/mosq_test.py b/test/mosq_test.py index bb5e9147..6e9bb7b4 100644 --- a/test/mosq_test.py +++ b/test/mosq_test.py @@ -26,7 +26,7 @@ def start_broker(filename, cmd=None, port=0, use_conf=False, expect_fail=False): port = 1888 if os.environ.get('MOSQ_USE_VALGRIND') is not None: - cmd = ['valgrind', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+filename+'.vglog'] + cmd + cmd = ['valgrind', '-q', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+filename+'.vglog'] + cmd delay = 1 #print(port)