From ac91144495ad054fc6536f0fedc9422c1d6e4152 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Wed, 6 Mar 2019 14:47:16 +0000 Subject: [PATCH] Session expiry interval support - not working for file persistence. --- lib/mosquitto_internal.h | 8 ++ src/CMakeLists.txt | 3 +- src/Makefile | 4 + src/context.c | 29 ++++- src/handle_connect.c | 20 +++- src/loop.c | 24 +--- src/mosquitto_broker_internal.h | 8 ++ src/session_expiry.c | 103 ++++++++++++++++++ .../02-subpub-qos1-message-expiry-will.py | 3 +- test/broker/02-subpub-qos1-message-expiry.py | 3 +- test/broker/05-session-expiry-v5.py | 87 +++++++++++++++ test/broker/07-will-delay-recover.py | 3 +- test/broker/Makefile | 1 + test/broker/test.py | 1 + 14 files changed, 264 insertions(+), 33 deletions(-) create mode 100644 src/session_expiry.c create mode 100755 test/broker/05-session-expiry-v5.py diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index da9d8422..74d75ec8 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -139,6 +139,12 @@ struct mosquitto__alias{ uint16_t alias; }; +struct session_expiry_list { + struct mosquitto *context; + struct session_expiry_list *prev; + struct session_expiry_list *next; +}; + struct mosquitto__packet{ uint8_t *payload; struct mosquitto__packet *next; @@ -233,6 +239,7 @@ struct mosquitto { #endif bool clean_start; uint32_t session_expiry_interval; + time_t session_expiry_time; #ifdef WITH_BROKER bool removed_from_by_id; /* True if removed from by_id hash */ bool is_dropping; @@ -314,6 +321,7 @@ struct mosquitto { UT_hash_handle hh_id; UT_hash_handle hh_sock; struct mosquitto *for_free_next; + struct session_expiry_list *expiry_list_item; #endif #ifdef WITH_EPOLL uint32_t events; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index af92255a..0e5dd3c1 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -36,7 +36,6 @@ set (MOSQ_SRCS ../lib/property_mosq.c ../lib/property_mosq.h read_handle.c ../lib/read_handle.h - subs.c security.c security_default.c ../lib/send_mosq.c ../lib/send_mosq.h send_connack.c @@ -48,6 +47,8 @@ set (MOSQ_SRCS ../lib/send_subscribe.c send_unsuback.c ../lib/send_unsubscribe.c + session_expiry.c + subs.c sys_tree.c sys_tree.h ../lib/time_mosq.c ../lib/tls_mosq.c diff --git a/src/Makefile b/src/Makefile index 3776b73d..9ff3696e 100644 --- a/src/Makefile +++ b/src/Makefile @@ -51,6 +51,7 @@ OBJS= mosquitto.o \ send_unsuback.o \ send_unsubscribe.o \ service.o \ + session_expiry.o \ signals.o \ subs.o \ sys_tree.o \ @@ -195,6 +196,9 @@ send_unsubscribe.o : ../lib/send_unsubscribe.c ../lib/send_mosq.h service.o : service.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ +session_expiry.o : session_expiry.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ + signals.o : signals.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/context.c b/src/context.c index 4a1b0eb8..2511a872 100644 --- a/src/context.c +++ b/src/context.c @@ -249,12 +249,33 @@ void context__send_will(struct mosquitto_db *db, struct mosquitto *ctxt) } -void context__disconnect(struct mosquitto_db *db, struct mosquitto *ctxt) +void context__disconnect(struct mosquitto_db *db, struct mosquitto *context) { - context__send_will(db, ctxt); + if(context->session_expiry_interval == 0){ + context__send_will(db, context); - ctxt->disconnect_t = time(NULL); - net__socket_close(db, ctxt); + context->disconnect_t = time(NULL); + net__socket_close(db, context); + +#ifdef WITH_BRIDGE + if(!context->bridge) +#endif + { + + if(context->will_delay_interval == 0){ + /* This will be done later, after the will is published */ + context__add_to_disused(db, context); + if(context->id){ + context__remove_from_by_id(db, context); + mosquitto__free(context->id); + context->id = NULL; + } + } + } + }else{ + session_expiry__add(context); + } + context->state = mosq_cs_disconnected; } void context__add_to_disused(struct mosquitto_db *db, struct mosquitto *context) diff --git a/src/handle_connect.c b/src/handle_connect.c index 4a27eb78..47b3d242 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -313,6 +313,13 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context) } clean_start = (connect_flags & 0x02) >> 1; + /* session_expiry_interval will be overriden if the properties are read later */ + if(clean_start == false && protocol_version != PROTOCOL_VERSION_v5){ + /* v3* has clean_start == false mean the session never expires */ + context->session_expiry_interval = UINT32_MAX; + }else{ + context->session_expiry_interval = 0; + } will = connect_flags & 0x04; will_qos = (connect_flags & 0x18) >> 3; if(will_qos == 3){ @@ -630,15 +637,13 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context) } } - if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt5){ - if(clean_start == 0){ + context->clean_start = clean_start; + + if(context->clean_start == false && found_context->session_expiry_interval > 0){ + if(context->protocol == mosq_p_mqtt311 || context->protocol == mosq_p_mqtt5){ connect_ack |= 0x01; } - } - - context->clean_start = clean_start; - if(context->clean_start == false && found_context->clean_start == false){ if(found_context->inflight_msgs || found_context->queued_msgs){ context->inflight_msgs = found_context->inflight_msgs; context->queued_msgs = found_context->queued_msgs; @@ -665,7 +670,10 @@ int handle__connect(struct mosquitto_db *db, struct mosquitto *context) } } + session_expiry__remove(found_context); + found_context->clean_start = true; + found_context->session_expiry_interval = 0; found_context->state = mosq_cs_duplicate; do_disconnect(db, found_context); } diff --git a/src/loop.c b/src/loop.c index 6deb4b23..69aacc95 100644 --- a/src/loop.c +++ b/src/loop.c @@ -472,7 +472,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li now_time = time(NULL); if(db->config->persistent_client_expiration > 0 && now_time > expiration_check_time){ HASH_ITER(hh_id, db->contexts_by_id, context, ctxt_tmp){ - if(context->sock == INVALID_SOCKET && context->clean_start == 0){ + if(context->sock == INVALID_SOCKET && context->session_expiry_interval > 0){ /* This is a persistent client, check to see if the * last time it connected was longer than * persistent_client_expiration seconds ago. If so, @@ -486,7 +486,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } log__printf(NULL, MOSQ_LOG_NOTICE, "Expiring persistent client %s due to timeout.", id); G_CLIENTS_EXPIRED_INC(); - context->clean_start = true; + context->session_expiry_interval = 0; context->state = mosq_cs_expiring; do_disconnect(db, context); } @@ -555,7 +555,9 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li } } #endif - will_delay__check(db, time(NULL)); + now = time(NULL); + session_expiry__check(db, now); + will_delay__check(db, now); #ifdef WITH_PERSISTENCE if(db->config->persistence && db->config->autosave_interval){ if(db->config->autosave_on_changes){ @@ -669,22 +671,6 @@ void do_disconnect(struct mosquitto_db *db, struct mosquitto *context) } #endif context__disconnect(db, context); -#ifdef WITH_BRIDGE - if(context->clean_start && !context->bridge){ -#else - if(context->clean_start){ -#endif - if(context->will_delay_interval == 0){ - /* This will be done later, after the will is published */ - context__add_to_disused(db, context); - if(context->id){ - context__remove_from_by_id(db, context); - mosquitto__free(context->id); - context->id = NULL; - } - } - } - context->state = mosq_cs_disconnected; } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 5351fd12..7d327931 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -657,6 +657,14 @@ int mosquitto_acl_check_default(struct mosquitto_db *db, struct mosquitto *conte int mosquitto_unpwd_check_default(struct mosquitto_db *db, struct mosquitto *context, const char *username, const char *password); int mosquitto_psk_key_get_default(struct mosquitto_db *db, struct mosquitto *context, const char *hint, const char *identity, char *key, int max_key_len); +/* ============================================================ + * Session expiry + * ============================================================ */ +int session_expiry__add(struct mosquitto *context); +void session_expiry__remove(struct mosquitto *context); +void session_expiry__check(struct mosquitto_db *db, time_t now); +void session_expiry__send_all(struct mosquitto_db *db); + /* ============================================================ * Window service and signal related functions * ============================================================ */ diff --git a/src/session_expiry.c b/src/session_expiry.c new file mode 100644 index 00000000..01d3905e --- /dev/null +++ b/src/session_expiry.c @@ -0,0 +1,103 @@ +/* +Copyright (c) 2019 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" + +#include +#include +#include + +#include "mosquitto_broker_internal.h" +#include "memory_mosq.h" +#include "time_mosq.h" + +static struct session_expiry_list *expiry_list = NULL; +static time_t last_check = 0; + + +static int session_expiry__cmp(struct session_expiry_list *i1, struct session_expiry_list *i2) +{ + return i1->context->session_expiry_interval - i2->context->session_expiry_interval; +} + + +int session_expiry__add(struct mosquitto *context) +{ + struct session_expiry_list *item; + + item = mosquitto__calloc(1, sizeof(struct session_expiry_list)); + if(!item) return MOSQ_ERR_NOMEM; + + item->context = context; + item->context->session_expiry_time = time(NULL) + item->context->session_expiry_interval; + context->expiry_list_item = item; + + DL_INSERT_INORDER(expiry_list, item, session_expiry__cmp); + + return MOSQ_ERR_SUCCESS; +} + + +void session_expiry__remove(struct mosquitto *context) +{ + if(context->expiry_list_item){ + DL_DELETE(expiry_list, context->expiry_list_item); + mosquitto__free(context->expiry_list_item); + context->expiry_list_item = NULL; + } +} + + +/* Call on broker shutdown only */ +void session_expiry__remove_all(struct mosquitto_db *db) +{ + struct session_expiry_list *item, *tmp; + struct mosquitto *context; + + DL_FOREACH_SAFE(expiry_list, item, tmp){ + context = item->context; + session_expiry__remove(context); + context->session_expiry_interval = 0; + context->will_delay_interval = 0; + context__send_will(db, context); + } + +} + +void session_expiry__check(struct mosquitto_db *db, time_t now) +{ + struct session_expiry_list *item, *tmp; + struct mosquitto *context; + + if(now <= last_check) return; + + last_check = now; + + DL_FOREACH_SAFE(expiry_list, item, tmp){ + if(item->context->session_expiry_time < now){ + context = item->context; + session_expiry__remove(context); + + context->session_expiry_interval = 0; + context__send_will(db, context); + context__add_to_disused(db, context); + }else{ + return; + } + } + +} + diff --git a/test/broker/02-subpub-qos1-message-expiry-will.py b/test/broker/02-subpub-qos1-message-expiry-will.py index b8b139b2..c2e728c5 100755 --- a/test/broker/02-subpub-qos1-message-expiry-will.py +++ b/test/broker/02-subpub-qos1-message-expiry-will.py @@ -14,7 +14,8 @@ from mosq_test_helper import * rc = 1 mid = 53 keepalive = 60 -connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False) +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60) +connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False, properties=props) connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1) diff --git a/test/broker/02-subpub-qos1-message-expiry.py b/test/broker/02-subpub-qos1-message-expiry.py index 3ba16773..b1bbac95 100755 --- a/test/broker/02-subpub-qos1-message-expiry.py +++ b/test/broker/02-subpub-qos1-message-expiry.py @@ -14,7 +14,8 @@ from mosq_test_helper import * rc = 1 mid = 53 keepalive = 60 -connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False) +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 60) +connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5, clean_session=False, properties=props) connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) connack2_packet = mosq_test.gen_connack(rc=0, proto_ver=5, flags=1) diff --git a/test/broker/05-session-expiry-v5.py b/test/broker/05-session-expiry-v5.py new file mode 100755 index 00000000..ebf5585b --- /dev/null +++ b/test/broker/05-session-expiry-v5.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python + +# MQTT v5. Test whether session expiry interval works correctly. + +from mosq_test_helper import * + +rc = 1 +keepalive = 60 + +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 1) +connect_packet = mosq_test.gen_connect("clean-qos2-test", keepalive=keepalive, clean_session=False, proto_ver=5, properties=props) +connack1_packet = mosq_test.gen_connack(flags=0, rc=0, proto_ver=5) + +connack2_packet = mosq_test.gen_connack(flags=1, rc=0, proto_ver=5) + +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 3) +disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, properties=props) + +props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 0) +disconnect2_packet = mosq_test.gen_disconnect(proto_ver=5, properties=props) + +port = mosq_test.get_port() +broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port) + +try: + # First connect, clean start is false, we expect a normal connack + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, port=port, connack_error="connack 1") + # Forceful disconnect + sock.close() + + # Immediate second connect, clean start is false, we expect a connack with + # previous state + sock = mosq_test.do_client_connect(connect_packet, connack2_packet, port=port, connack_error="connack 2") + sock.close() + + # Session should expire in one second, so sleep longer + time.sleep(2) + + # Third connect, clean start is false, session should have expired so we + # expect a normal connack + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, port=port, connack_error="connack 3") + # Send DISCONNECT with new session expiry, then close + sock.send(disconnect_packet) + sock.close() + + # Immediate reconnect, clean start is false, we expect a connack with + # previous state + sock = mosq_test.do_client_connect(connect_packet, connack2_packet, port=port, connack_error="connack 4") + # Send DISCONNECT with new session expiry, then close + sock.send(disconnect_packet) + sock.close() + + # Session should expire in three seconds if it has been updated, sleep for + # 2 to check it is updated from 1 second. + time.sleep(2) + + # Immediate reconnect, clean start is false, we expect a connack with + # previous state + sock = mosq_test.do_client_connect(connect_packet, connack2_packet, port=port, connack_error="connack 5") + # Send DISCONNECT with new session expiry, then close + sock.send(disconnect_packet) + sock.close() + + # Session should expire in three seconds, so sleep longer + time.sleep(4) + # Third connect, clean start is false, session should have expired so we + # expect a normal connack + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, port=port, connack_error="connack 6") + # Send DISCONNECT with 0 session expiry, then close + sock.send(disconnect2_packet) + sock.close() + + # Immediate reconnect, session should have been removed. + sock = mosq_test.do_client_connect(connect_packet, connack1_packet, port=port, connack_error="connack 7") + sock.close() + rc = 0 + + sock.close() +finally: + broker.terminate() + broker.wait() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/07-will-delay-recover.py b/test/broker/07-will-delay-recover.py index 54322e19..dcf0fbda 100755 --- a/test/broker/07-will-delay-recover.py +++ b/test/broker/07-will-delay-recover.py @@ -14,8 +14,9 @@ def do_test(clean_session): connect1_packet = mosq_test.gen_connect("will-qos0-test", keepalive=keepalive, proto_ver=5) connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5) + connect_props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 30) props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_WILL_DELAY_INTERVAL, 3) - connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=clean_session) + connect2_packet = mosq_test.gen_connect("will-helper", keepalive=keepalive, proto_ver=5, will_topic="will/test", will_payload="will delay", will_properties=props, clean_session=clean_session, properties=connect_props) connack2a_packet = mosq_test.gen_connack(rc=0, proto_ver=5) if clean_session == True: connack2b_packet = mosq_test.gen_connack(rc=0, proto_ver=5) diff --git a/test/broker/Makefile b/test/broker/Makefile index 6f378508..6d2feec4 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -112,6 +112,7 @@ endif 05 : ./05-clean-session-qos1.py + ./05-session-expiry-v5.py 06 : ./06-bridge-b2br-disconnect-qos1.py diff --git a/test/broker/test.py b/test/broker/test.py index 93583ee7..b509a989 100755 --- a/test/broker/test.py +++ b/test/broker/test.py @@ -90,6 +90,7 @@ tests = [ (2, './04-retain-check-source-persist-diff-port.py'), (1, './05-clean-session-qos1.py'), + (1, './05-session-expiry-v5.py'), (2, './06-bridge-b2br-disconnect-qos1.py'), (2, './06-bridge-b2br-disconnect-qos2.py'),