diff --git a/lib/connect.c b/lib/connect.c index 615a381c..bcece56c 100644 --- a/lib/connect.c +++ b/lib/connect.c @@ -267,6 +267,7 @@ void do_client_disconnect(struct mosquitto *mosq, int reason_code, const mosquit mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; + mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 9a8fd66c..bfc04199 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -224,7 +224,6 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st void mosquitto__destroy(struct mosquitto *mosq) { - struct mosquitto__packet *packet; if(!mosq) return; #ifdef WITH_THREADING @@ -295,22 +294,7 @@ void mosquitto__destroy(struct mosquitto *mosq) mosquitto_property_free_all(&mosq->connect_properties); - /* Out packet cleanup */ - if(mosq->out_packet && !mosq->current_out_packet){ - mosq->current_out_packet = mosq->out_packet; - mosq->out_packet = mosq->out_packet->next; - } - while(mosq->current_out_packet){ - packet = mosq->current_out_packet; - /* Free data and reset values */ - mosq->current_out_packet = mosq->out_packet; - if(mosq->out_packet){ - mosq->out_packet = mosq->out_packet->next; - } - - packet__cleanup(packet); - mosquitto__free(packet); - } + packet__cleanup_all(mosq); packet__cleanup(&mosq->in_packet); if(mosq->sockpairR != INVALID_SOCKET){ diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index be0fda70..d0df571b 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -233,8 +233,9 @@ struct mosquitto { struct mosquitto_message_all *will; struct mosquitto__alias *aliases; struct will_delay_list *will_delay_entry; - uint32_t maximum_packet_size; + int out_packet_len; int alias_count; + uint32_t maximum_packet_size; uint32_t will_delay_interval; time_t will_delay_time; #ifdef WITH_TLS diff --git a/lib/packet_mosq.c b/lib/packet_mosq.c index 3b45bca8..11595459 100644 --- a/lib/packet_mosq.c +++ b/lib/packet_mosq.c @@ -123,6 +123,7 @@ void packet__cleanup_all(struct mosquitto *mosq) packet__cleanup(packet); mosquitto__free(packet); } + mosq->out_packet_len = 0; packet__cleanup(&mosq->in_packet); @@ -150,6 +151,7 @@ int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet) mosq->out_packet = packet; } mosq->out_packet_last = packet; + mosq->out_packet_len++; pthread_mutex_unlock(&mosq->out_packet_mutex); #ifdef WITH_BROKER # ifdef WITH_WEBSOCKETS @@ -213,6 +215,7 @@ int packet__write(struct mosquitto *mosq) if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet; mosq->out_packet = mosq->out_packet->next; + mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } @@ -294,6 +297,7 @@ int packet__write(struct mosquitto *mosq) mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; + mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } diff --git a/lib/send_publish.c b/lib/send_publish.c index bcbc7795..00aecdcf 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -63,6 +63,20 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 } #ifdef WITH_BROKER + if(qos == 0){ + /* This is a crude, incorrect limit on the number of QoS 0 PUBLISHes. + * We limit QoS 1 and 2 *messages* to max_inflight_messages+max_queued_messages. + * We don't create QoS 0 *messages* though, only *packets*. So it is + * tricky to add a correct limit on QoS 0 PUBLISHes. + * This check will drop any further outgoing QoS PUBLISHes if the queue + * of packets to be sent hits the max_queued_messages limit. It won't + * be exactly correct, but does set an upper limit on queued QoS 0 + * packets. + */ + if(mosq->out_packet_len >= db.config->max_queued_messages){ + return MOSQ_ERR_SUCCESS; + } + } if(mosq->listener && mosq->listener->mount_point){ len = strlen(mosq->listener->mount_point); if(len < strlen(topic)){ diff --git a/src/bridge.c b/src/bridge.c index 199a156d..4fccc59f 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -609,6 +609,7 @@ void bridge__packet_cleanup(struct mosquitto *context) } context->out_packet = NULL; context->out_packet_last = NULL; + context->out_packet_len = 0; packet__cleanup(&(context->in_packet)); } diff --git a/src/conf.c b/src/conf.c index 221a5ba2..6271cbc8 100644 --- a/src/conf.c +++ b/src/conf.c @@ -52,9 +52,6 @@ struct config_recurse { int log_dest_set; unsigned int log_type; int log_type_set; - unsigned long max_inflight_bytes; - unsigned long max_queued_bytes; - int max_queued_messages; }; #if defined(WIN32) || defined(__CYGWIN__) @@ -191,6 +188,9 @@ static void config__init_reload(struct mosquitto__config *config) config->max_keepalive = 65535; config->max_packet_size = 0; config->max_inflight_messages = 20; + config->max_queued_messages = 1000; + config->max_inflight_bytes = 0; + config->max_queued_bytes = 0; config->persistence = false; mosquitto__free(config->persistence_location); config->persistence_location = NULL; @@ -607,9 +607,6 @@ int config__read(struct mosquitto__config *config, bool reload) cr.log_dest_set = 0; cr.log_type = MOSQ_LOG_NONE; cr.log_type_set = 0; - cr.max_inflight_bytes = 0; - cr.max_queued_bytes = 0; - cr.max_queued_messages = 1000; if(!db.config_file) return 0; @@ -676,8 +673,6 @@ int config__read(struct mosquitto__config *config, bool reload) config->user = mosquitto__strdup("mosquitto"); } - db__limits_set(cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes); - #ifdef WITH_BRIDGE for(i=0; ibridge_count; i++){ if(!config->bridges[i].name){ @@ -1629,12 +1624,9 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct } cur_listener->maximum_qos = (uint8_t)tmp_int; }else if(!strcmp(token, "max_inflight_bytes")){ - token = strtok_r(NULL, " ", &saveptr); - if(token){ - cr->max_inflight_bytes = (unsigned long)atol(token); - }else{ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration."); - } + if(conf__parse_int(&token, "max_inflight_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; + if(tmp_int < 0) tmp_int = 0; + config->max_inflight_bytes = (size_t)tmp_int; }else if(!strcmp(token, "max_inflight_messages")){ if(conf__parse_int(&token, "max_inflight_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; if(tmp_int < 0 || tmp_int == UINT16_MAX){ @@ -1659,20 +1651,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct } config->max_packet_size = (uint32_t)tmp_int; }else if(!strcmp(token, "max_queued_bytes")){ - token = strtok_r(NULL, " ", &saveptr); - if(token){ - cr->max_queued_bytes = (unsigned long)atol(token); /* 63 bits is ok right? */ - }else{ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration."); - } + if(conf__parse_int(&token, "max_queued_bytes", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; + if(tmp_int < 0) tmp_int = 0; + config->max_queued_bytes = (size_t)tmp_int; }else if(!strcmp(token, "max_queued_messages")){ - token = strtok_r(NULL, " ", &saveptr); - if(token){ - cr->max_queued_messages = atoi(token); - if(cr->max_queued_messages < 0) cr->max_queued_messages = 0; - }else{ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_messages value in configuration."); - } + if(conf__parse_int(&token, "max_queued_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; + if(tmp_int < 0) tmp_int = 0; + config->max_queued_messages = tmp_int; }else if(!strcmp(token, "memory_limit")){ ssize_t lim; if(conf__parse_ssize_t(&token, "memory_limit", &lim, saveptr)) return MOSQ_ERR_INVAL; diff --git a/src/context.c b/src/context.c index 7e99ac30..f98e4bfa 100644 --- a/src/context.c +++ b/src/context.c @@ -65,6 +65,7 @@ struct mosquitto *context__init(mosq_sock_t sock) context->in_packet.payload = NULL; packet__cleanup(&context->in_packet); context->out_packet = NULL; + context->out_packet_len = 0; context->current_out_packet = NULL; context->address = NULL; @@ -155,6 +156,7 @@ void context__cleanup(struct mosquitto *context, bool force_free) context->out_packet = context->out_packet->next; mosquitto__free(packet); } + context->out_packet_len = 0; #if defined(WITH_BROKER) && defined(__GLIBC__) && defined(WITH_ADNS) if(context->adns){ gai_cancel(context->adns); diff --git a/src/database.c b/src/database.c index 3ac253b2..af24755f 100644 --- a/src/database.c +++ b/src/database.c @@ -27,10 +27,6 @@ Contributors: #include "time_mosq.h" #include "util_mosq.h" -static unsigned long max_inflight_bytes = 0; -static int max_queued = 100; -static unsigned long max_queued_bytes = 0; - /** * Is this context ready to take more in flight messages right now? * @param context the client context of interest @@ -42,17 +38,17 @@ bool db__ready_for_flight(struct mosquitto_msg_data *msgs, int qos) bool valid_bytes; bool valid_count; - if(qos == 0 || (msgs->inflight_maximum == 0 && max_inflight_bytes == 0)){ + if(qos == 0 || (msgs->inflight_maximum == 0 && db.config->max_inflight_bytes == 0)){ return true; } - valid_bytes = msgs->msg_bytes12 < max_inflight_bytes; + valid_bytes = msgs->msg_bytes12 < db.config->max_inflight_bytes; valid_count = msgs->inflight_quota > 0; if(msgs->inflight_maximum == 0){ return valid_bytes; } - if(max_inflight_bytes == 0){ + if(db.config->max_inflight_bytes == 0){ return valid_count; } @@ -73,11 +69,11 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms int source_count; int adjust_count; unsigned long source_bytes; - unsigned long adjust_bytes = max_inflight_bytes; + unsigned long adjust_bytes = db.config->max_inflight_bytes; bool valid_bytes; bool valid_count; - if(max_queued == 0 && max_queued_bytes == 0){ + if(db.config->max_queued_messages == 0 && db.config->max_queued_bytes == 0){ return true; } @@ -96,13 +92,13 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms adjust_count = 0; } - valid_bytes = source_bytes - adjust_bytes < max_queued_bytes; - valid_count = source_count - adjust_count < max_queued; + valid_bytes = source_bytes - adjust_bytes < db.config->max_queued_bytes; + valid_count = source_count - adjust_count < db.config->max_queued_messages; - if(max_queued_bytes == 0){ + if(db.config->max_queued_bytes == 0){ return valid_count; } - if(max_queued == 0){ + if(db.config->max_queued_messages == 0){ return valid_bytes; } @@ -1204,12 +1200,3 @@ int db__message_write_queued_out(struct mosquitto *context) } return MOSQ_ERR_SUCCESS; } - - -void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes) -{ - max_inflight_bytes = inflight_bytes; - max_queued = queued; - max_queued_bytes = queued_bytes; -} - diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 44d6e66b..4d02d94b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -347,10 +347,13 @@ struct mosquitto__config { char *log_timestamp_format; char *log_file; FILE *log_fptr; - uint16_t max_inflight_messages; - uint16_t max_keepalive; + size_t max_inflight_bytes; + size_t max_queued_bytes; + int max_queued_messages; uint32_t max_packet_size; uint32_t message_size_limit; + uint16_t max_inflight_messages; + uint16_t max_keepalive; bool persistence; char *persistence_location; char *persistence_file; @@ -708,7 +711,6 @@ int db__close(void); int persist__backup(bool shutdown); int persist__restore(void); #endif -void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes); /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos); diff --git a/src/websockets.c b/src/websockets.c index f39214c4..18712833 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -278,6 +278,7 @@ static int callback_mqtt(struct libwebsocket_context *context, if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet; mosq->out_packet = mosq->out_packet->next; + mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; } @@ -338,6 +339,7 @@ static int callback_mqtt(struct libwebsocket_context *context, mosq->current_out_packet = mosq->out_packet; if(mosq->out_packet){ mosq->out_packet = mosq->out_packet->next; + mosq->out_packet_len--; if(!mosq->out_packet){ mosq->out_packet_last = NULL; }