diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index c6c03d1b..9b0120cc 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -460,9 +460,13 @@ int packet__read(struct mosquitto *mosq) * This is an arbitrary limit, but with some consideration. * If a client can't send 1000 bytes in a second it * probably shouldn't be using a 1 second keep alive. */ +#ifdef WITH_BROKER + keepalive__update(mosq); +#else pthread_mutex_lock(&mosq->msgtime_mutex); mosq->last_msg_in = mosquitto_time(); pthread_mutex_unlock(&mosq->msgtime_mutex); +#endif } return MOSQ_ERR_SUCCESS; }else{ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 65125308..6459cf4a 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -23,6 +23,7 @@ set (MOSQ_SRCS handle_subscribe.c ../lib/handle_unsuback.c handle_unsubscribe.c + keepalive.c lib_load.h logging.c loop.c diff --git a/src/Makefile b/src/Makefile index 393b1dc5..862b8b8c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -30,6 +30,7 @@ OBJS= mosquitto.o \ handle_subscribe.o \ handle_unsuback.o \ handle_unsubscribe.o \ + keepalive.o \ logging.o \ loop.o \ memory_mosq.o \ @@ -148,6 +149,9 @@ handle_unsuback.o : ../lib/handle_unsuback.c ../lib/read_handle.h handle_unsubscribe.o : handle_unsubscribe.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ +keepalive.o : keepalive.c mosquitto_broker_internal.h + ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ + logging.o : logging.c mosquitto_broker_internal.h ${CROSS_COMPILE}${CC} $(BROKER_CPPFLAGS) $(BROKER_CFLAGS) -c $< -o $@ diff --git a/src/context.c b/src/context.c index f07010a8..9b265633 100644 --- a/src/context.c +++ b/src/context.c @@ -215,6 +215,7 @@ void context__disconnect(struct mosquitto_db *db, struct mosquitto *context) }else{ session_expiry__add(db, context); } + keepalive__remove(context); mosquitto__set_state(context, mosq_cs_disconnected); } diff --git a/src/handle_connect.c b/src/handle_connect.c index a25e5122..c414bdec 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -269,6 +269,8 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v } free(auth_data_out); + keepalive__add(context); + mosquitto__set_state(context, mosq_cs_active); rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); diff --git a/src/keepalive.c b/src/keepalive.c new file mode 100644 index 00000000..2a5810ee --- /dev/null +++ b/src/keepalive.c @@ -0,0 +1,72 @@ +/* +Copyright (c) 2009-2020 Roger Light + +All rights reserved. This program and the accompanying materials +are made available under the terms of the Eclipse Public License v1.0 +and Eclipse Distribution License v1.0 which accompany this distribution. + +The Eclipse Public License is available at + http://www.eclipse.org/legal/epl-v10.html +and the Eclipse Distribution License is available at + http://www.eclipse.org/org/documents/edl-v10.php. + +Contributors: + Roger Light - initial implementation and documentation. +*/ + +#include "config.h" +#include +#include "mosquitto_broker_internal.h" + + +static time_t last_keepalive_check = 0; + +/* FIXME - this is the prototype for the future tree/trie based keepalive check implementation. */ + +int keepalive__add(struct mosquitto *context) +{ + return MOSQ_ERR_SUCCESS; +} + + +void keepalive__check(struct mosquitto_db *db, time_t now) +{ + struct mosquitto *context, *ctxt_tmp; + + if(last_keepalive_check != now){ + last_keepalive_check = now; + + /* FIXME - this needs replacing with something more efficient */ + HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ + if(context->sock != INVALID_SOCKET){ + /* Local bridges never time out in this fashion. */ + if(!(context->keepalive) + || context->bridge + || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ + + }else{ + /* Client has exceeded keepalive*1.5 */ + do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); + } + } + } + } +} + + +int keepalive__remove(struct mosquitto *context) +{ + return MOSQ_ERR_SUCCESS; +} + + +void keepalive__remove_all(void) +{ +} + + +int keepalive__update(struct mosquitto *context) +{ + context->last_msg_in = mosquitto_time(); + return MOSQ_ERR_SUCCESS; +} diff --git a/src/loop.c b/src/loop.c index 1c32a0bc..60eda9d2 100644 --- a/src/loop.c +++ b/src/loop.c @@ -125,32 +125,6 @@ void queue_plugin_msgs(struct mosquitto_db *db) } -void loop__keepalive_check(struct mosquitto_db *db, time_t now) -{ - struct mosquitto *context, *ctxt_tmp; - static time_t last_keepalive_check = 0; - - if(last_keepalive_check != now){ - last_keepalive_check = now; - - /* FIXME - this needs replacing with something more efficient */ - HASH_ITER(hh_sock, db->contexts_by_sock, context, ctxt_tmp){ - if(context->sock != INVALID_SOCKET){ - /* Local bridges never time out in this fashion. */ - if(!(context->keepalive) - || context->bridge - || now - context->last_msg_in <= (time_t)(context->keepalive)*3/2){ - - }else{ - /* Client has exceeded keepalive*1.5 */ - do_disconnect(db, context, MOSQ_ERR_KEEPALIVE); - } - } - } - } -} - - int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count) { #ifdef WITH_SYS_TREE @@ -188,7 +162,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, mosq_sock_t *listensock, int li #endif now = mosquitto_time(); - loop__keepalive_check(db, now); + keepalive__check(db, now); #ifdef WITH_BRIDGE bridge_check(db); diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 0ec608f4..e3d7892f 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -747,6 +747,15 @@ int mux__wait(void); int mux__handle(struct mosquitto_db *db, mosq_sock_t *listensock, int listensock_count); int mux__cleanup(struct mosquitto_db *db); +/* ============================================================ + * Property related functions + * ============================================================ */ +int keepalive__add(struct mosquitto *context); +void keepalive__check(struct mosquitto_db *db, time_t now); +int keepalive__remove(struct mosquitto *context); +void keepalive__remove_all(void); +int keepalive__update(struct mosquitto *context); + /* ============================================================ * Property related functions * ============================================================ */ diff --git a/src/websockets.c b/src/websockets.c index 44b583ac..d3fb5bdd 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -426,7 +426,7 @@ static int callback_mqtt(struct libwebsocket_context *context, /* Free data and reset values */ packet__cleanup(&mosq->in_packet); - mosq->last_msg_in = mosquitto_time(); + keepalive__update(context); if(rc && (mosq->out_packet || mosq->current_out_packet)) { if(mosq->state != mosq_cs_disconnecting){