From 16e83bfe5d55cdda6c662916756303bf77f564ee Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Sun, 30 Dec 2018 21:17:31 +0000 Subject: [PATCH] Process receive maximum (as max_inflight_messages). --- lib/mosquitto_internal.h | 2 +- src/conf.c | 18 +++++++++--------- src/context.c | 1 + src/database.c | 18 ++++++++---------- src/mosquitto_broker_internal.h | 3 ++- src/property_broker.c | 10 ++++++++++ 6 files changed, 31 insertions(+), 21 deletions(-) diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index 55a537a7..5eff5d06 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -279,11 +279,11 @@ struct mosquitto { char threaded; struct mosquitto__packet *out_packet_last; int inflight_messages; - int max_inflight_messages; # ifdef WITH_SRV ares_channel achan; # endif #endif + int max_inflight_messages; #ifdef WITH_BROKER UT_hash_handle hh_id; diff --git a/src/conf.c b/src/conf.c index b18af7b1..0ae70ee4 100644 --- a/src/conf.c +++ b/src/conf.c @@ -53,7 +53,6 @@ struct config_recurse { int log_type_set; unsigned long max_inflight_bytes; unsigned long max_queued_bytes; - int max_inflight_messages; int max_queued_messages; }; @@ -213,6 +212,7 @@ static void config__init_reload(struct mosquitto_db *db, struct mosquitto__confi #endif config->log_timestamp = true; config->max_keepalive = 65535; + config->max_inflight_messages = 20; config->persistence = false; mosquitto__free(config->persistence_location); config->persistence_location = NULL; @@ -598,7 +598,6 @@ int config__read(struct mosquitto_db *db, struct mosquitto__config *config, bool cr.log_type = MOSQ_LOG_NONE; cr.log_type_set = 0; cr.max_inflight_bytes = 0; - cr.max_inflight_messages = 20; cr.max_queued_bytes = 0; cr.max_queued_messages = 100; @@ -681,7 +680,7 @@ int config__read(struct mosquitto_db *db, struct mosquitto__config *config, bool config->user = "mosquitto"; } - db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes); + db__limits_set(cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes); #ifdef WITH_BRIDGE for(i=0; ibridge_count; i++){ @@ -1511,13 +1510,14 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration."); } }else if(!strcmp(token, "max_inflight_messages")){ - token = strtok_r(NULL, " ", &saveptr); - if(token){ - cr->max_inflight_messages = atoi(token); - if(cr->max_inflight_messages < 0) cr->max_inflight_messages = 0; - }else{ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration."); + if(conf__parse_int(&token, "max_inflight_messages", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; + if(tmp_int < 0 || tmp_int == 65535){ + tmp_int = 0; + }else if(tmp_int > 65535){ + log__printf(NULL, MOSQ_LOG_ERR, "Error: max_inflight_messages must be <= 65535."); + return MOSQ_ERR_INVAL; } + config->max_inflight_messages = tmp_int; }else if(!strcmp(token, "max_keepalive")){ if(conf__parse_int(&token, "max_keepalive", &tmp_int, saveptr)) return MOSQ_ERR_INVAL; if(tmp_int < 10 || tmp_int > 65535){ diff --git a/src/context.c b/src/context.c index c7fa9da3..a93b8e30 100644 --- a/src/context.c +++ b/src/context.c @@ -76,6 +76,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context->last_inflight_msg = NULL; context->queued_msgs = NULL; context->last_queued_msg = NULL; + context->max_inflight_messages = db->config->max_inflight_messages; context->msg_bytes = 0; context->msg_bytes12 = 0; context->msg_count = 0; diff --git a/src/database.c b/src/database.c index 666c58ef..d93d3c08 100644 --- a/src/database.c +++ b/src/database.c @@ -25,7 +25,6 @@ Contributors: #include "sys_tree.h" #include "time_mosq.h" -static int max_inflight = 20; static unsigned long max_inflight_bytes = 0; static int max_queued = 100; static unsigned long max_queued_bytes = 0; @@ -38,14 +37,14 @@ static unsigned long max_queued_bytes = 0; */ static bool db__ready_for_flight(struct mosquitto *context, int qos) { - if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){ + if(qos == 0 || (context->max_inflight_messages == 0 && max_inflight_bytes == 0)){ return true; } bool valid_bytes = context->msg_bytes12 < max_inflight_bytes; - bool valid_count = context->msg_count12 < max_inflight; + bool valid_count = context->msg_count12 < context->max_inflight_messages; - if(max_inflight == 0){ + if(context->max_inflight_messages == 0){ return valid_bytes; } if(max_inflight_bytes == 0){ @@ -73,7 +72,7 @@ static bool db__ready_for_queue(struct mosquitto *context, int qos) unsigned long source_bytes = context->msg_bytes12; int source_count = context->msg_count12; unsigned long adjust_bytes = max_inflight_bytes; - int adjust_count = max_inflight; + int adjust_count = context->max_inflight_messages; /* nothing in flight for offline clients */ if(context->sock == INVALID_SOCKET){ @@ -307,7 +306,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1 tail = tail->next; } } - while (context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){ + while (context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ msg_index++; tail = context->queued_msgs; tail->timestamp = mosquitto_time(); @@ -838,7 +837,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint } } - while(context->queued_msgs && (max_inflight == 0 || msg_index < max_inflight)){ + while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ msg_index++; tail = context->queued_msgs; tail->timestamp = mosquitto_time(); @@ -989,7 +988,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) } } - while(context->queued_msgs && (max_inflight == 0 || msg_count < max_inflight)){ + while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_count < context->max_inflight_messages)){ msg_count++; tail = context->queued_msgs; if(tail->direction == mosq_md_out){ @@ -1022,9 +1021,8 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_SUCCESS; } -void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes) +void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes) { - max_inflight = inflight; max_inflight_bytes = inflight_bytes; max_queued = queued; max_queued_bytes = queued_bytes; diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index cd5eddb8..497167c9 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -260,6 +260,7 @@ struct mosquitto__config { bool log_timestamp; char *log_file; FILE *log_fptr; + uint16_t max_inflight_messages; uint16_t max_keepalive; uint32_t message_size_limit; bool persistence; @@ -552,7 +553,7 @@ int db__close(struct mosquitto_db *db); int persist__backup(struct mosquitto_db *db, bool shutdown); int persist__restore(struct mosquitto_db *db); #endif -void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes); +void db__limits_set(unsigned long inflight_bytes, int queued, unsigned long queued_bytes); /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); diff --git a/src/property_broker.c b/src/property_broker.c index e2f9d466..57d17c9c 100644 --- a/src/property_broker.c +++ b/src/property_broker.c @@ -35,6 +35,16 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro while(p){ if(p->identifier == MQTT_PROP_SESSION_EXPIRY_INTERVAL){ context->session_expiry_interval = p->value.i32; + }else if(p->identifier == MQTT_PROP_RECEIVE_MAXIMUM){ + if(p->value.i16 == 0){ + return MOSQ_ERR_PROTOCOL; + } + + if(p->value.i16 == 65535){ + context->max_inflight_messages = 0; + }else{ + context->max_inflight_messages = p->value.i16; + } } p = p->next; }