Will delay tests and implementation.

pull/1203/head
Roger A. Light 7 years ago
parent b0c60fb6e1
commit c506c8335b

2
.gitignore vendored

@ -37,6 +37,8 @@ man/mosquitto_pub.1
man/mosquitto_sub.1
man/mqtt.7
out/
src/db_dump/mosquitto_db_dump
src/mosquitto
src/mosquitto_passwd

@ -195,6 +195,8 @@ struct mosquitto {
struct mosquitto__alias *aliases;
uint32_t maximum_packet_size;
int alias_count;
uint32_t will_delay_interval;
time_t will_delay_time;
#ifdef WITH_TLS
SSL *ssl;
SSL_CTX *ssl_ctx;

@ -60,6 +60,7 @@ OBJS= mosquitto.o \
util_mosq.o \
util_topic.o \
websockets.o \
will_delay.o \
will_mosq.o
mosquitto : ${OBJS}
@ -221,6 +222,9 @@ utf8_mosq.o : ../lib/utf8_mosq.c
websockets.o : websockets.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
will_delay.o : will_delay.c mosquitto_broker_internal.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@
will_mosq.o : ../lib/will_mosq.c ../lib/will_mosq.h
${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@

@ -218,6 +218,11 @@ void context__cleanup(struct mosquitto_db *db, struct mosquitto *context, bool d
void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt)
{
if(ctxt->state != mosq_cs_disconnecting && ctxt->will){
if(ctxt->will_delay_interval > 0){
will_delay__add(ctxt);
return;
}
if(mosquitto_acl_check(db, ctxt,
ctxt->will->msg.topic,
ctxt->will->msg.payloadlen,

File diff suppressed because it is too large Load Diff

@ -165,6 +165,7 @@ static int will__read(struct mosquitto *context, struct mosquitto_message_all **
struct mosquitto_message_all *will_struct = NULL;
char *will_topic_mount = NULL;
uint16_t payloadlen;
mosquitto_property *properties = NULL;
will_struct = mosquitto__calloc(1, sizeof(struct mosquitto_message_all));
if(!will_struct){
@ -172,11 +173,12 @@ static int will__read(struct mosquitto *context, struct mosquitto_message_all **
goto error_cleanup;
}
if(context->protocol == PROTOCOL_VERSION_v5){
rc = property__read_all(CMD_WILL, &context->in_packet, &will_struct->properties);
rc = property__read_all(CMD_WILL, &context->in_packet, &properties);
if(rc) goto error_cleanup;
mosquitto_property_read_int32(will_struct->properties, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &will_struct->expiry_interval, false);
mosquitto_property_free_all(&will_struct->properties); /* FIXME - TEMPORARY UNTIL PROPERTIES PROCESSED */
rc = property__process_will(context, will_struct, properties);
mosquitto_property_free_all(&properties);
if(rc) goto error_cleanup;
}
rc = packet__read_string(&context->in_packet, &will_struct->msg.topic, &slen);
if(rc) goto error_cleanup;
@ -228,6 +230,7 @@ error_cleanup:
if(will_struct){
mosquitto__free(will_struct->msg.topic);
mosquitto__free(will_struct->msg.payload);
mosquitto_property_free_all(&will_struct->properties);
mosquitto__free(will_struct);
}
return rc;

@ -295,6 +295,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
topic = stored->topic;
dup = 1;
mosquitto_property_free_all(&msg_properties);
UHPA_FREE(payload, payloadlen);
}
switch(qos){

@ -555,6 +555,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li
}
}
#endif
will_delay__check(db, time(NULL));
#ifdef WITH_PERSISTENCE
if(db->config->persistence && db->config->autosave_interval){
if(db->config->autosave_on_changes){
@ -673,11 +674,14 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context)
#else
if(context->clean_start){
#endif
context__add_to_disused(db, context);
if(context->id){
context__remove_from_by_id(db, context);
mosquitto__free(context->id);
context->id = NULL;
if(context->will_delay_interval == 0){
/* This will be done later, after the will is published */
context__add_to_disused(db, context);
if(context->id){
context__remove_from_by_id(db, context);
mosquitto__free(context->id);
context->id = NULL;
}
}
}
context->state = mosq_cs_disconnected;

@ -384,9 +384,13 @@ int main(int argc, char *argv[])
}
#endif
/* FIXME - this isn't quite right, all wills with will delay zero should be
* sent now, but those with positive will delay should be persisted and
* restored, pending the client reconnecting in time. */
HASH_ITER(hh_id, int_db.contexts_by_id, ctxt, ctxt_tmp){
context__send_will(&int_db, ctxt);
}
will_delay__send_all(&int_db);
#ifdef WITH_PERSISTENCE
if(config.persistence){

@ -628,6 +628,7 @@ void bridge__packet_cleanup(struct mosquitto *context);
* Property related functions
* ============================================================ */
int property__process_connect(struct mosquitto *context, mosquitto_property *props);
int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property *props);
int property__process_disconnect(struct mosquitto *context, mosquitto_property *props);
/* ============================================================
@ -674,5 +675,12 @@ struct libwebsocket_context *mosq_websockets_init(struct mosquitto__listener *li
#endif
void do_disconnect(struct mosquitto_db *db, struct mosquitto *context);
/* ============================================================
* Will delay
* ============================================================ */
int will_delay__add(struct mosquitto *context);
void will_delay__check(struct mosquitto_db *db, time_t now);
void will_delay__send_all(struct mosquitto_db *db);
#endif

@ -54,6 +54,62 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro
return MOSQ_ERR_SUCCESS;
}
int property__process_will(struct mosquitto *context, struct mosquitto_message_all *msg, mosquitto_property *props)
{
mosquitto_property *p, *p_prev;
mosquitto_property *msg_properties, *msg_properties_last;
p = props;
p_prev = NULL;
msg_properties = NULL;
msg_properties_last = NULL;
while(p){
switch(p->identifier){
case MQTT_PROP_CONTENT_TYPE:
case MQTT_PROP_CORRELATION_DATA:
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
case MQTT_PROP_RESPONSE_TOPIC:
if(msg_properties){
msg_properties_last->next = p;
msg_properties_last = p;
}else{
msg_properties = p;
msg_properties_last = p;
}
if(p_prev){
p_prev->next = p->next;
p = p_prev->next;
}else{
props = p->next;
p = props;
}
msg_properties_last->next = NULL;
break;
case MQTT_PROP_WILL_DELAY_INTERVAL:
context->will_delay_interval = p->value.i32;
p_prev = p;
p = p->next;
break;
case MQTT_PROP_MESSAGE_EXPIRY_INTERVAL:
msg->expiry_interval = p->value.i32;
p_prev = p;
p = p->next;
break;
default:
return MOSQ_ERR_PROTOCOL;
break;
}
}
msg->properties = msg_properties;
return MOSQ_ERR_SUCCESS;
}
/* Process the incoming properties, we should be able to assume that only valid
* properties for DISCONNECT are present here. */
int property__process_disconnect(struct mosquitto *context, mosquitto_property *props)

@ -0,0 +1,94 @@
/*
Copyright (c) 2019 Roger Light <roger@atchoo.org>
All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License v1.0
and Eclipse Distribution License v1.0 which accompany this distribution.
The Eclipse Public License is available at
http://www.eclipse.org/legal/epl-v10.html
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.
Contributors:
Roger Light - initial implementation and documentation.
*/
#include "config.h"
#include <math.h>
#include <stdio.h>
#include <utlist.h>
#include "mosquitto_broker_internal.h"
#include "memory_mosq.h"
#include "time_mosq.h"
struct will_delay_list {
struct mosquitto *context;
struct will_delay_list *prev;
struct will_delay_list *next;
};
static struct will_delay_list *delay_list = NULL;
static time_t last_check = 0;
static int will_delay__cmp(struct will_delay_list *i1, struct will_delay_list *i2)
{
return i1->context->will_delay_interval - i2->context->will_delay_interval;
}
int will_delay__add(struct mosquitto *context)
{
struct will_delay_list *item;
item = mosquitto__calloc(1, sizeof(struct will_delay_list));
if(!item) return MOSQ_ERR_NOMEM;
item->context = context;
item->context->will_delay_time = time(NULL) + item->context->will_delay_interval;
DL_INSERT_INORDER(delay_list, item, will_delay__cmp);
return MOSQ_ERR_SUCCESS;
}
/* Call on broker shutdown only */
void will_delay__send_all(struct mosquitto_db *db)
{
struct will_delay_list *item, *tmp;
DL_FOREACH_SAFE(delay_list, item, tmp){
DL_DELETE(delay_list, item);
item->context->will_delay_interval = 0;
context__send_will(db, item->context);
mosquitto__free(item);
}
}
void will_delay__check(struct mosquitto_db *db, time_t now)
{
struct will_delay_list *item, *tmp;
if(now <= last_check) return;
last_check = now;
DL_FOREACH_SAFE(delay_list, item, tmp){
if(item->context->will_delay_time < now){
DL_DELETE(delay_list, item);
item->context->will_delay_interval = 0;
context__send_will(db, item->context);
context__add_to_disused(db, item->context);
mosquitto__free(item);
}else{
return;
}
}
}

@ -0,0 +1,64 @@
#!/usr/bin/env python
# Test whether a client with a will delay handles correctly on the client reconnecting
# First connection is durable, second is clean session, and without a will, so the will should not be received.
# MQTT 5
from mosq_test_helper import *
def do_test():
rc = 1
keepalive = 60
mid = 1
connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3)
connect2a_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=False)
connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
connect2b_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, clean_session=True)
connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port)
mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback")
sock2 = mosq_test.do_client_connect(connect2a_packet, connack2a_packet, timeout=30, port=port)
sock2.close()
time.sleep(1)
sock2 = mosq_test.do_client_connect(connect2b_packet, connack2b_packet, timeout=30, port=port)
time.sleep(3)
# The client2 has reconnected within the original will delay interval, which has now
# passed, but it should have been deleted anyway. Disconnect and see
# whether we get the old will. We should not.
sock2.close()
mosq_test.do_send_receive(sock1, pingreq_packet, pingresp_packet, "pingresp")
rc = 0
sock1.close()
sock2.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)
do_test()

@ -0,0 +1,62 @@
#!/usr/bin/env python
# Test whether a client with a will delay recovers on the client reconnecting
# MQTT 5
from mosq_test_helper import *
def do_test(clean_session):
rc = 1
keepalive = 60
mid = 1
connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3)
connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=clean_session)
connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
if clean_session == True:
connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
else:
connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1)
subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
pingreq_packet = mosq_test.gen_pingreq()
pingresp_packet = mosq_test.gen_pingresp()
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port)
mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback")
sock2 = mosq_test.do_client_connect(connect2_packet, connack2a_packet, timeout=30, port=port)
sock2.close()
time.sleep(1)
sock2 = mosq_test.do_client_connect(connect2_packet, connack2b_packet, timeout=30, port=port)
time.sleep(3)
# The client2 has reconnected within the will delay interval, which has now
# passed. We should not have received the will at this point.
mosq_test.do_send_receive(sock1, pingreq_packet, pingresp_packet, "pingresp")
rc = 0
sock1.close()
sock2.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)
do_test(clean_session=True)
do_test(clean_session=False)

@ -0,0 +1,51 @@
#!/usr/bin/env python
# Test whether a client will is transmitted with a delay correctly.
# MQTT 5
from mosq_test_helper import *
def do_test(clean_session):
rc = 1
keepalive = 60
mid = 1
connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5)
connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3)
connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_qos=2, will_properties=props, clean_session=clean_session)
connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
subscribe_packet = mosq_test.gen_subscribe(mid, "will/test", 0, proto_ver=5)
suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
publish_packet = mosq_test.gen_publish("will/test", qos=0, payload="will delay", proto_ver=5)
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
try:
sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=30, port=port)
mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet, "suback")
sock2 = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=30, port=port)
sock2.close()
t_start = time.time()
if mosq_test.expect_packet(sock1, "publish", publish_packet):
t_finish = time.time()
if t_finish - t_start > 2 and t_finish - t_start < 5:
rc = 0
sock1.close()
finally:
broker.terminate()
broker.wait()
(stdo, stde) = broker.communicate()
if rc:
print(stde)
exit(rc)
do_test(clean_session=True)
do_test(clean_session=False)

@ -128,6 +128,9 @@ endif
./07-will-null-topic.py
./07-will-invalid-utf8.py
./07-will-no-flag.py
./07-will-delay.py
./07-will-delay-recover.py
./07-will-delay-reconnect.py
08 :
ifeq ($(WITH_TLS),yes)

@ -104,6 +104,9 @@ tests = [
(1, './07-will-null-topic.py'),
(1, './07-will-invalid-utf8.py'),
(1, './07-will-no-flag.py'),
(1, './07-will-delay.py'),
(1, './07-will-delay-recover.py'),
(1, './07-will-delay-reconnect.py'),
(2, './08-ssl-connect-no-auth.py'),
(2, './08-ssl-connect-no-auth-wrong-ca.py'),

@ -26,7 +26,7 @@ def start_broker(filename, cmd=None, port=0, use_conf=False, expect_fail=False):
port = 1888
if os.environ.get('MOSQ_USE_VALGRIND') is not None:
cmd = ['valgrind', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+filename+'.vglog'] + cmd
cmd = ['valgrind', '-q', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+filename+'.vglog'] + cmd
delay = 1
#print(port)

Loading…
Cancel
Save