From 641158aed9f164ec1f5caab6c44b9f9c47b51fd3 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Wed, 29 Jun 2016 14:04:07 +0000 Subject: [PATCH 1/8] config: garbage line number if file not found If the file is not found, showing 0 rather than a garbage number is mildly better. Signed-off-by: Karl Palsson --- src/conf.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/conf.c b/src/conf.c index 38c3a5fd..2b3f2ecd 100644 --- a/src/conf.c +++ b/src/conf.c @@ -475,7 +475,7 @@ int config__read(struct mosquitto__config *config, bool reload) { int rc = MOSQ_ERR_SUCCESS; struct config_recurse cr; - int lineno; + int lineno = 0; int len; #ifdef WITH_BRIDGE int i; From 9d00eab64ca6b2ed8491a9b601ad0c19a063afa7 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Thu, 23 Jun 2016 17:33:19 +0000 Subject: [PATCH 2/8] tests: support generating longer publish packets Remaining length for publish packets wasn't being handled completely. Signed-off-by: Karl Palsson --- test/mosq_test.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/mosq_test.py b/test/mosq_test.py index 1887e7bb..c985a494 100644 --- a/test/mosq_test.py +++ b/test/mosq_test.py @@ -306,7 +306,7 @@ def gen_connack(resv=0, rc=0): def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0): rl = 2+len(topic) - pack_format = "!BBH"+str(len(topic))+"s" + pack_format = "H"+str(len(topic))+"s" if qos > 0: rl = rl + 2 pack_format = pack_format + "H" @@ -317,6 +317,7 @@ def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0): payload = "" pack_format = pack_format + "0s" + rlpacked = pack_remaining_length(rl) cmd = 48 | (qos<<1) if retain: cmd = cmd + 1 @@ -324,9 +325,9 @@ def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0): cmd = cmd + 8 if qos > 0: - return struct.pack(pack_format, cmd, rl, len(topic), topic, mid, payload) + return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, mid, payload) else: - return struct.pack(pack_format, cmd, rl, len(topic), topic, payload) + return struct.pack("!B" + str(len(rlpacked))+"s" + pack_format, cmd, rlpacked, len(topic), topic, payload) def gen_puback(mid): return struct.pack('!BBH', 64, 2, mid) From 30c96f4a1cde80d333c137f82437a8208c759465 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Tue, 21 Jun 2016 13:42:59 +0000 Subject: [PATCH 3/8] broker: track stored messages in bytes as well as count Instead of simply tracking the count of stored messages, keep track of the total byte size of stored messages. While only informational at this point, it provides the basis for byte based limits in the future. Signed-off-by: Karl Palsson --- ChangeLog.txt | 2 ++ man/mosquitto.8.xml | 25 +++++++++++++++++-------- src/database.c | 2 ++ src/mosquitto_broker_internal.h | 1 + src/sys_tree.c | 8 ++++++++ 5 files changed, 30 insertions(+), 8 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index deac8e9e..467fefe1 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -29,6 +29,8 @@ Broker: - mosquitto_db_dump tool can now output some stats on clients. - perform utf-8 validation on incoming will, subscription and unsubscription topics. +- new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored) +- new $SYS/broker/store/messages/bytes Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. diff --git a/man/mosquitto.8.xml b/man/mosquitto.8.xml index 4b6fd651..7ff6f4f4 100644 --- a/man/mosquitto.8.xml +++ b/man/mosquitto.8.xml @@ -291,14 +291,6 @@ The total number of messages of any type sent since the broker started. - - - - The number of messages currently held in the message - store. This includes retained messages and messages - queued for durable clients. - - @@ -328,6 +320,23 @@ The total number of retained messages active on the broker. + + + (deprecated) + + The number of messages currently held in the message + store. This includes retained messages and messages + queued for durable clients. + + + + + + The number of bytes currently held by message payloads + in the message store. This includes retained messages + and messages queued for durable clients. + + diff --git a/src/database.c b/src/database.c index 6e71df26..afd3b731 100644 --- a/src/database.c +++ b/src/database.c @@ -125,6 +125,7 @@ void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *s } } db->msg_store_count--; + db->msg_store_bytes -= store->payloadlen; mosquitto__free(store->source_id); if(store->dest_ids){ @@ -556,6 +557,7 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour temp->dest_ids = NULL; temp->dest_id_count = 0; db->msg_store_count++; + db->msg_store_bytes += payloadlen; (*stored) = temp; if(!store_id){ diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 22f88155..d6a840b1 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -358,6 +358,7 @@ struct mosquitto_db{ #ifdef WITH_BRIDGE int bridge_count; #endif + unsigned long msg_store_bytes; int msg_store_count; struct mosquitto__config *config; int persistence_changes; diff --git a/src/sys_tree.c b/src/sys_tree.c index f8a7355f..8360c2ef 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -149,6 +149,7 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) char buf[BUFLEN]; static int msg_store_count = -1; + static unsigned long msg_store_bytes = -1; static unsigned long msgs_received = -1; static unsigned long msgs_sent = -1; static unsigned long publish_dropped = -1; @@ -271,6 +272,13 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) msg_store_count = db->msg_store_count; snprintf(buf, BUFLEN, "%d", msg_store_count); db__messages_easy_queue(db, NULL, "$SYS/broker/messages/stored", SYS_TREE_QOS, strlen(buf), buf, 1); + db__messages_easy_queue(db, NULL, "$SYS/broker/store/messages/count", SYS_TREE_QOS, strlen(buf), buf, 1); + } + + if (db->msg_store_bytes != msg_store_bytes){ + msg_store_bytes = db->msg_store_bytes; + snprintf(buf, BUFLEN, "%lu", msg_store_bytes); + db__messages_easy_queue(db, NULL, "$SYS/broker/store/messages/bytes", SYS_TREE_QOS, strlen(buf), buf, 1); } if(db->subscription_count != subscription_count){ From 8a48fd13f16f5fd710b9542ff301102e7f876802 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Wed, 29 Jun 2016 14:06:15 +0000 Subject: [PATCH 4/8] broker: publish initial load averages This publishes initial 0 figures for all load averages, as is done for all the existing counter values in the SYS tree. This makes the behaviour of certain variables use for diagnostics (stored count for instance) more predictable, instead of changing due to the creation of load topics as soon as the load became non-zero. Signed-off-by: Karl Palsson --- src/sys_tree.c | 71 +++++++++++++++++++++++++++++--------------------- 1 file changed, 41 insertions(+), 30 deletions(-) diff --git a/src/sys_tree.c b/src/sys_tree.c index 8360c2ef..7f523297 100644 --- a/src/sys_tree.c +++ b/src/sys_tree.c @@ -123,14 +123,20 @@ static void sys_tree__update_memory(struct mosquitto_db *db, char *buf) } #endif -static void calc_load(struct mosquitto_db *db, char *buf, const char *topic, double exponent, double interval, double *current) +static void calc_load(struct mosquitto_db *db, char *buf, const char *topic, bool initial, double exponent, double interval, double *current) { double new_value; - new_value = interval + exponent*((*current) - interval); - if(fabs(new_value - (*current)) >= 0.01){ + if (initial) { + new_value = *current; snprintf(buf, BUFLEN, "%.2f", new_value); db__messages_easy_queue(db, NULL, topic, SYS_TREE_QOS, strlen(buf), buf, 1); + } else { + new_value = interval + exponent*((*current) - interval); + if(fabs(new_value - (*current)) >= 0.01){ + snprintf(buf, BUFLEN, "%.2f", new_value); + db__messages_easy_queue(db, NULL, topic, SYS_TREE_QOS, strlen(buf), buf, 1); + } } (*current) = new_value; } @@ -210,6 +216,11 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) db__messages_easy_queue(db, NULL, "$SYS/broker/uptime", SYS_TREE_QOS, strlen(buf), buf, 1); sys_tree__update_clients(db, buf); + bool initial_publish = false; + if(last_update == 0){ + initial_publish = true; + last_update = 1; + } if(last_update > 0){ i_mult = 60.0/(double)(now-last_update); @@ -231,41 +242,41 @@ void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time) /* 1 minute load */ exponent = exp(-1.0*(now-last_update)/60.0); - calc_load(db, buf, "$SYS/broker/load/messages/received/1min", exponent, msgs_received_interval, &msgs_received_load1); - calc_load(db, buf, "$SYS/broker/load/messages/sent/1min", exponent, msgs_sent_interval, &msgs_sent_load1); - calc_load(db, buf, "$SYS/broker/load/publish/dropped/1min", exponent, publish_dropped_interval, &publish_dropped_load1); - calc_load(db, buf, "$SYS/broker/load/publish/received/1min", exponent, publish_received_interval, &publish_received_load1); - calc_load(db, buf, "$SYS/broker/load/publish/sent/1min", exponent, publish_sent_interval, &publish_sent_load1); - calc_load(db, buf, "$SYS/broker/load/bytes/received/1min", exponent, bytes_received_interval, &bytes_received_load1); - calc_load(db, buf, "$SYS/broker/load/bytes/sent/1min", exponent, bytes_sent_interval, &bytes_sent_load1); - calc_load(db, buf, "$SYS/broker/load/sockets/1min", exponent, socket_interval, &socket_load1); - calc_load(db, buf, "$SYS/broker/load/connections/1min", exponent, connection_interval, &connection_load1); + calc_load(db, buf, "$SYS/broker/load/messages/received/1min", initial_publish, exponent, msgs_received_interval, &msgs_received_load1); + calc_load(db, buf, "$SYS/broker/load/messages/sent/1min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load1); + calc_load(db, buf, "$SYS/broker/load/publish/dropped/1min", initial_publish, exponent, publish_dropped_interval, &publish_dropped_load1); + calc_load(db, buf, "$SYS/broker/load/publish/received/1min", initial_publish, exponent, publish_received_interval, &publish_received_load1); + calc_load(db, buf, "$SYS/broker/load/publish/sent/1min", initial_publish, exponent, publish_sent_interval, &publish_sent_load1); + calc_load(db, buf, "$SYS/broker/load/bytes/received/1min", initial_publish, exponent, bytes_received_interval, &bytes_received_load1); + calc_load(db, buf, "$SYS/broker/load/bytes/sent/1min", initial_publish, exponent, bytes_sent_interval, &bytes_sent_load1); + calc_load(db, buf, "$SYS/broker/load/sockets/1min", initial_publish, exponent, socket_interval, &socket_load1); + calc_load(db, buf, "$SYS/broker/load/connections/1min", initial_publish, exponent, connection_interval, &connection_load1); /* 5 minute load */ exponent = exp(-1.0*(now-last_update)/300.0); - calc_load(db, buf, "$SYS/broker/load/messages/received/5min", exponent, msgs_received_interval, &msgs_received_load5); - calc_load(db, buf, "$SYS/broker/load/messages/sent/5min", exponent, msgs_sent_interval, &msgs_sent_load5); - calc_load(db, buf, "$SYS/broker/load/publish/dropped/5min", exponent, publish_dropped_interval, &publish_dropped_load5); - calc_load(db, buf, "$SYS/broker/load/publish/received/5min", exponent, publish_received_interval, &publish_received_load5); - calc_load(db, buf, "$SYS/broker/load/publish/sent/5min", exponent, publish_sent_interval, &publish_sent_load5); - calc_load(db, buf, "$SYS/broker/load/bytes/received/5min", exponent, bytes_received_interval, &bytes_received_load5); - calc_load(db, buf, "$SYS/broker/load/bytes/sent/5min", exponent, bytes_sent_interval, &bytes_sent_load5); - calc_load(db, buf, "$SYS/broker/load/sockets/5min", exponent, socket_interval, &socket_load5); - calc_load(db, buf, "$SYS/broker/load/connections/5min", exponent, connection_interval, &connection_load5); + calc_load(db, buf, "$SYS/broker/load/messages/received/5min", initial_publish, exponent, msgs_received_interval, &msgs_received_load5); + calc_load(db, buf, "$SYS/broker/load/messages/sent/5min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load5); + calc_load(db, buf, "$SYS/broker/load/publish/dropped/5min", initial_publish, exponent, publish_dropped_interval, &publish_dropped_load5); + calc_load(db, buf, "$SYS/broker/load/publish/received/5min", initial_publish, exponent, publish_received_interval, &publish_received_load5); + calc_load(db, buf, "$SYS/broker/load/publish/sent/5min", initial_publish, exponent, publish_sent_interval, &publish_sent_load5); + calc_load(db, buf, "$SYS/broker/load/bytes/received/5min", initial_publish, exponent, bytes_received_interval, &bytes_received_load5); + calc_load(db, buf, "$SYS/broker/load/bytes/sent/5min", initial_publish, exponent, bytes_sent_interval, &bytes_sent_load5); + calc_load(db, buf, "$SYS/broker/load/sockets/5min", initial_publish, exponent, socket_interval, &socket_load5); + calc_load(db, buf, "$SYS/broker/load/connections/5min", initial_publish, exponent, connection_interval, &connection_load5); /* 15 minute load */ exponent = exp(-1.0*(now-last_update)/900.0); - calc_load(db, buf, "$SYS/broker/load/messages/received/15min", exponent, msgs_received_interval, &msgs_received_load15); - calc_load(db, buf, "$SYS/broker/load/messages/sent/15min", exponent, msgs_sent_interval, &msgs_sent_load15); - calc_load(db, buf, "$SYS/broker/load/publish/dropped/15min", exponent, publish_dropped_interval, &publish_dropped_load15); - calc_load(db, buf, "$SYS/broker/load/publish/received/15min", exponent, publish_received_interval, &publish_received_load15); - calc_load(db, buf, "$SYS/broker/load/publish/sent/15min", exponent, publish_sent_interval, &publish_sent_load15); - calc_load(db, buf, "$SYS/broker/load/bytes/received/15min", exponent, bytes_received_interval, &bytes_received_load15); - calc_load(db, buf, "$SYS/broker/load/bytes/sent/15min", exponent, bytes_sent_interval, &bytes_sent_load15); - calc_load(db, buf, "$SYS/broker/load/sockets/15min", exponent, socket_interval, &socket_load15); - calc_load(db, buf, "$SYS/broker/load/connections/15min", exponent, connection_interval, &connection_load15); + calc_load(db, buf, "$SYS/broker/load/messages/received/15min", initial_publish, exponent, msgs_received_interval, &msgs_received_load15); + calc_load(db, buf, "$SYS/broker/load/messages/sent/15min", initial_publish, exponent, msgs_sent_interval, &msgs_sent_load15); + calc_load(db, buf, "$SYS/broker/load/publish/dropped/15min", initial_publish, exponent, publish_dropped_interval, &publish_dropped_load15); + calc_load(db, buf, "$SYS/broker/load/publish/received/15min", initial_publish, exponent, publish_received_interval, &publish_received_load15); + calc_load(db, buf, "$SYS/broker/load/publish/sent/15min", initial_publish, exponent, publish_sent_interval, &publish_sent_load15); + calc_load(db, buf, "$SYS/broker/load/bytes/received/15min", initial_publish, exponent, bytes_received_interval, &bytes_received_load15); + calc_load(db, buf, "$SYS/broker/load/bytes/sent/15min", initial_publish, exponent, bytes_sent_interval, &bytes_sent_load15); + calc_load(db, buf, "$SYS/broker/load/sockets/15min", initial_publish, exponent, socket_interval, &socket_load15); + calc_load(db, buf, "$SYS/broker/load/connections/15min", initial_publish, exponent, connection_interval, &connection_load15); } if(db->msg_store_count != msg_store_count){ From 8268e6da087c4b9ab1ae4bcb69a853365a44e94f Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Mon, 20 Jun 2016 17:27:04 +0000 Subject: [PATCH 5/8] database: drop unnecessary local variable No need to maintain count and context->msg_count separately. Signed-off-by: Karl Palsson --- src/database.c | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/database.c b/src/database.c index afd3b731..d6d0626a 100644 --- a/src/database.c +++ b/src/database.c @@ -604,7 +604,6 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte { struct mosquitto_client_msg *msg; struct mosquitto_client_msg *prev = NULL; - int count; msg = context->inflight_msgs; context->msg_count = 0; @@ -653,17 +652,15 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte * will be sent out of order. */ if(context->queued_msgs){ - count = context->msg_count; msg = context->queued_msgs; while(msg){ context->last_queued_msg = msg; - count++; context->msg_count++; if(msg->qos > 0){ context->msg_count12++; } - if (max_inflight == 0 || count <= max_inflight){ + if (max_inflight == 0 || context->msg_count <= max_inflight){ switch(msg->qos){ case 0: msg->state = mosq_ms_publish_qos0; From 642e141c23e4e24b65616dfb8c310a264976296f Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Tue, 21 Jun 2016 11:00:44 +0000 Subject: [PATCH 6/8] conf: max_queued_messages: clarify per client limit Signed-off-by: Karl Palsson --- man/mosquitto.conf.5.xml | 6 +++--- mosquitto.conf | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index 8d8bad72..c2caf11c 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -374,9 +374,9 @@ count - The maximum number of QoS 1 or 2 messages to hold in - the queue above those messages that are currently in - flight. Defaults to 100. Set to 0 for no maximum (not + The maximum number of QoS 1 or 2 messages to hold in the + queue (per client) above those messages that are currently + in flight. Defaults to 100. Set to 0 for no maximum (not recommended). See also the option. Reloaded on reload signal. diff --git a/mosquitto.conf b/mosquitto.conf index d06a7ccb..bbe24750 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -46,7 +46,7 @@ # and 2 messages. #max_inflight_messages 20 -# The maximum number of QoS 1 and 2 messages to hold in a queue +# The maximum number of QoS 1 and 2 messages to hold in a queue per client # above those that are currently in-flight. Defaults to 100. Set # to 0 for no maximum (not recommended). # See also queue_qos0_messages. From c6aac741c2c294b5e2b469ee8c05c1aad5c80c59 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Tue, 21 Jun 2016 14:47:41 +0000 Subject: [PATCH 7/8] broker: support byte based queueing Limiting queued message depth purely based on message count is hard to control for memory constrained devices. The size of messages can vary wildly, from a few bytes, to a few kilobytes. Support a new max_queued_bytes option, and drop packets when the first limit is reached. Option defaults to 0 (disabled) by default. Support also a max_inflight_bytes variable, with similar behaviour. Fixes (partof) https://github.com/eclipse/mosquitto/issues/100 This pulls up some helper routines for calculating whether to allow inflight or queuing, resolving some inconsistences in connection resumption. Signed-off-by: Karl Palsson --- ChangeLog.txt | 2 + lib/mosquitto_internal.h | 2 + man/mosquitto.conf.5.xml | 27 ++- mosquitto.conf | 15 +- src/conf.c | 20 ++- src/context.c | 2 + src/database.c | 97 +++++++++-- src/mosquitto_broker_internal.h | 2 +- test/broker/03-publish-qos1-queued-bytes.conf | 4 + test/broker/03-publish-qos1-queued-bytes.py | 161 ++++++++++++++++++ test/broker/Makefile | 1 + travis-install.sh | 2 + 12 files changed, 320 insertions(+), 15 deletions(-) create mode 100644 test/broker/03-publish-qos1-queued-bytes.conf create mode 100755 test/broker/03-publish-qos1-queued-bytes.py diff --git a/ChangeLog.txt b/ChangeLog.txt index 467fefe1..8b6b483f 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -31,6 +31,8 @@ Broker: topics. - new $SYS/broker/store/messages/count (deprecates $SYS/broker/messages/stored) - new $SYS/broker/store/messages/bytes +- max_queued_bytes feature to limit queues by real size rather than + than just message count. Closes Eclipse #452919 or Github #100 Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. diff --git a/lib/mosquitto_internal.h b/lib/mosquitto_internal.h index f6cb3827..04e7cc47 100644 --- a/lib/mosquitto_internal.h +++ b/lib/mosquitto_internal.h @@ -205,6 +205,8 @@ struct mosquitto { struct mosquitto_client_msg *last_inflight_msg; struct mosquitto_client_msg *queued_msgs; struct mosquitto_client_msg *last_queued_msg; + unsigned long msg_bytes; + unsigned long msg_bytes12; int msg_count; int msg_count12; struct mosquitto__acl_user *acl_list; diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml index c2caf11c..2a4f6189 100644 --- a/man/mosquitto.conf.5.xml +++ b/man/mosquitto.conf.5.xml @@ -358,6 +358,16 @@ Reloaded on reload signal. + + count + + QoS 1 and 2 messages will be allowed in flight until this byte + limit is reached. Defaults to 0. (No limit) + See also the option. + + Reloaded on reload signal. + + count @@ -371,6 +381,19 @@ Reloaded on reload signal. + + count + + QoS 1 and 2 messages above those currently in-flight will be + queued (per client) until this limit is exceeded. + Defaults to 0. (No maximum) See also the + option. + If both max_queued_messages and max_queued_bytes are specified, + packets will be queued until the first limit is reached. + + Reloaded on reload signal. + + count @@ -378,7 +401,9 @@ queue (per client) above those messages that are currently in flight. Defaults to 100. Set to 0 for no maximum (not recommended). See also the - option. + and + options. + Reloaded on reload signal. diff --git a/mosquitto.conf b/mosquitto.conf index bbe24750..085a6a16 100644 --- a/mosquitto.conf +++ b/mosquitto.conf @@ -46,15 +46,28 @@ # and 2 messages. #max_inflight_messages 20 +# QoS 1 and 2 messages will be allowed inflight per client until this limit +# is exceeded. Defaults to 0. (No maximum) +# See also max_inflight_messages +#max_inflight_bytes 0 + # The maximum number of QoS 1 and 2 messages to hold in a queue per client # above those that are currently in-flight. Defaults to 100. Set # to 0 for no maximum (not recommended). # See also queue_qos0_messages. +# See also max_queued_bytes. #max_queued_messages 100 +# QoS 1 and 2 messages above those currently in-flight will be queued per +# client until this limit is exceeded. Defaults to 0. (No maximum) +# See also max_queued_messages. +# If both max_queued_messages and max_queued_bytes are specified, packets will +# be queued until the first limit is reached. +#max_queued_bytes 0 + # Set to true to queue messages with QoS 0 when a persistent client is # disconnected. These messages are included in the limit imposed by -# max_queued_messages. +# max_queued_messages and max_queued_bytes # Defaults to false. # This is a non-standard option for the MQTT v3.1 spec but is allowed in # v3.1.1. diff --git a/src/conf.c b/src/conf.c index 2b3f2ecd..fd6e015c 100644 --- a/src/conf.c +++ b/src/conf.c @@ -50,7 +50,9 @@ struct config_recurse { int log_dest_set; int log_type; int log_type_set; + unsigned long max_inflight_bytes; int max_inflight_messages; + unsigned long max_queued_bytes; int max_queued_messages; }; @@ -485,7 +487,9 @@ int config__read(struct mosquitto__config *config, bool reload) cr.log_dest_set = 0; cr.log_type = MOSQ_LOG_NONE; cr.log_type_set = 0; + cr.max_inflight_bytes = 0; cr.max_inflight_messages = 20; + cr.max_queued_bytes = 0; cr.max_queued_messages = 100; if(!config->config_file) return 0; @@ -525,7 +529,7 @@ int config__read(struct mosquitto__config *config, bool reload) config->user = "mosquitto"; } - db__limits_set(cr.max_inflight_messages, cr.max_queued_messages); + db__limits_set(cr.max_inflight_messages, cr.max_inflight_bytes, cr.max_queued_messages, cr.max_queued_bytes); #ifdef WITH_BRIDGE for(i=0; ibridge_count; i++){ @@ -1292,6 +1296,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_connections value in configuration."); } + }else if(!strcmp(token, "max_inflight_bytes")){ + token = strtok_r(NULL, " ", &saveptr); + if(token){ + cr->max_inflight_bytes = atol(token); + }else{ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_bytes value in configuration."); + } }else if(!strcmp(token, "max_inflight_messages")){ token = strtok_r(NULL, " ", &saveptr); if(token){ @@ -1300,6 +1311,13 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, const }else{ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_inflight_messages value in configuration."); } + }else if(!strcmp(token, "max_queued_bytes")){ + token = strtok_r(NULL, " ", &saveptr); + if(token){ + cr->max_queued_bytes = atol(token); /* 63 bits is ok right? */ + }else{ + log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty max_queued_bytes value in configuration."); + } }else if(!strcmp(token, "max_queued_messages")){ token = strtok_r(NULL, " ", &saveptr); if(token){ diff --git a/src/context.c b/src/context.c index ab101606..a09a788c 100644 --- a/src/context.c +++ b/src/context.c @@ -74,6 +74,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) context->last_inflight_msg = NULL; context->queued_msgs = NULL; context->last_queued_msg = NULL; + context->msg_bytes = 0; + context->msg_bytes12 = 0; context->msg_count = 0; context->msg_count12 = 0; #ifdef WITH_TLS diff --git a/src/database.c b/src/database.c index d6d0626a..d7f9792e 100644 --- a/src/database.c +++ b/src/database.c @@ -26,7 +26,68 @@ Contributors: #include "time_mosq.h" static int max_inflight = 20; +static unsigned long max_inflight_bytes = 0; static int max_queued = 100; +static unsigned long max_queued_bytes = 0; + +/** + * Is this context ready to take more in flight messages right now? + * @param context the client context of interest + * @param qos qos for the packet of interest + * @return true if more in flight are allowed. + */ +static bool db__ready_for_flight(struct mosquitto *context, int qos) +{ + if(qos == 0 || (max_inflight == 0 && max_inflight_bytes == 0)){ + return true; + } + + bool valid_bytes = context->msg_bytes12 < max_inflight_bytes; + bool valid_count = context->msg_count12 < max_inflight; + + if(max_inflight == 0){ + return valid_bytes; + } + if(max_inflight_bytes == 0){ + return valid_count; + } + + return valid_bytes && valid_count; +} + + +/** + * For a given client context, are more messages allowed to be queued? + * @param context client of interest + * @return true if queuing is allowed, false if should be dropped + */ +static bool db__ready_for_queue(struct mosquitto *context) +{ + if(max_queued == 0 && max_queued_bytes == 0){ + return true; + } + + unsigned long adjust_bytes = max_inflight_bytes; + int adjust_count = max_inflight; + /* nothing in flight for offline clients */ + if(context->sock == INVALID_SOCKET){ + adjust_bytes = 0; + adjust_count = 0; + } + + bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes; + bool valid_count = context->msg_count12 - adjust_count < max_queued; + + if(max_queued_bytes == 0){ + return valid_count; + } + if(max_queued == 0){ + return valid_bytes; + } + + return valid_bytes && valid_count; +} + int db__open(struct mosquitto__config *config, struct mosquitto_db *db) { @@ -169,6 +230,12 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex } if((*msg)->store){ + context->msg_count--; + context->msg_bytes -= (*msg)->store->payloadlen; + if((*msg)->qos > 0){ + context->msg_count12--; + context->msg_bytes12 -= (*msg)->store->payloadlen; + } db__msg_store_deref(db, &(*msg)->store); } if(last){ @@ -182,10 +249,6 @@ static void db__message_remove(struct mosquitto_db *db, struct mosquitto *contex context->last_inflight_msg = NULL; } } - context->msg_count--; - if((*msg)->qos > 0){ - context->msg_count12--; - } mosquitto__free(*msg); if(last){ *msg = last->next; @@ -305,7 +368,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 } if(context->sock != INVALID_SOCKET){ - if(qos == 0 || max_inflight == 0 || context->msg_count12 < max_inflight){ + if(db__ready_for_flight(context, qos)){ if(dir == mosq_md_out){ switch(qos){ case 0: @@ -325,7 +388,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 return 1; } } - }else if(max_queued == 0 || context->msg_count12-max_inflight < max_queued){ + }else if(db__ready_for_queue(context)){ state = mosq_ms_queued; rc = 2; }else{ @@ -340,7 +403,9 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 return 2; } }else{ - if(max_queued > 0 && context->msg_count12 >= max_queued){ + if (db__ready_for_queue(context, qos)){ + state = mosq_ms_queued; + }else{ G_MSGS_DROPPED_INC(); if(context->is_dropping == false){ context->is_dropping = true; @@ -349,8 +414,6 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 context->id); } return 2; - }else{ - state = mosq_ms_queued; } } assert(state != mosq_ms_invalid); @@ -389,8 +452,10 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 *last_msg = msg; } context->msg_count++; + context->msg_bytes += msg->store->payloadlen; if(qos > 0){ context->msg_count12++; + context->msg_bytes12 += msg->store->payloadlen; } if(db->config->allow_duplicate_messages == false && dir == mosq_md_out && retain == false){ @@ -474,6 +539,8 @@ int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context) } context->queued_msgs = NULL; context->last_queued_msg = NULL; + context->msg_bytes = 0; + context->msg_bytes12 = 0; context->msg_count = 0; context->msg_count12 = 0; @@ -606,14 +673,18 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte struct mosquitto_client_msg *prev = NULL; msg = context->inflight_msgs; + context->msg_bytes = 0; + context->msg_bytes12 = 0; context->msg_count = 0; context->msg_count12 = 0; while(msg){ context->last_inflight_msg = msg; context->msg_count++; + context->msg_bytes += msg->store->payloadlen; if(msg->qos > 0){ context->msg_count12++; + context->msg_bytes12 += msg->store->payloadlen; } if(msg->direction == mosq_md_out){ @@ -657,10 +728,12 @@ int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *conte context->last_queued_msg = msg; context->msg_count++; + context->msg_bytes += msg->store->payloadlen; if(msg->qos > 0){ context->msg_count12++; + context->msg_bytes12 += msg->store->payloadlen; } - if (max_inflight == 0 || context->msg_count <= max_inflight){ + if (db__ready_for_flight(context, msg->qos)) { switch(msg->qos){ case 0: msg->state = mosq_ms_publish_qos0; @@ -895,10 +968,12 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_SUCCESS; } -void db__limits_set(int inflight, int queued) +void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes) { max_inflight = inflight; + max_inflight_bytes = inflight_bytes; max_queued = queued; + max_queued_bytes = queued_bytes; } void db__vacuum(void) diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index d6a840b1..f9a0636b 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -513,7 +513,7 @@ int db__close(struct mosquitto_db *db); int persist__backup(struct mosquitto_db *db, bool shutdown); int persist__restore(struct mosquitto_db *db); #endif -void db__limits_set(int inflight, int queued); +void db__limits_set(int inflight, unsigned long inflight_bytes, int queued, unsigned long queued_bytes); /* Return the number of in-flight messages in count. */ int db__message_count(int *count); int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); diff --git a/test/broker/03-publish-qos1-queued-bytes.conf b/test/broker/03-publish-qos1-queued-bytes.conf new file mode 100644 index 00000000..6f853103 --- /dev/null +++ b/test/broker/03-publish-qos1-queued-bytes.conf @@ -0,0 +1,4 @@ +sys_interval 1 +max_queued_messages 0 +max_queued_bytes 400 +port 1888 diff --git a/test/broker/03-publish-qos1-queued-bytes.py b/test/broker/03-publish-qos1-queued-bytes.py new file mode 100755 index 00000000..cf4c58dc --- /dev/null +++ b/test/broker/03-publish-qos1-queued-bytes.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python + +# Test whether a PUBLISH to a topic with an offline subscriber results in a queued message +import Queue +import random +import string +import subprocess +import socket +import threading +import time + +import paho.mqtt.client +import paho.mqtt.publish + + +import inspect, os, sys +# From http://stackoverflow.com/questions/279237/python-import-a-module-from-a-folder +cmd_subfolder = os.path.realpath(os.path.abspath(os.path.join(os.path.split(inspect.getfile( inspect.currentframe() ))[0],".."))) +if cmd_subfolder not in sys.path: + sys.path.insert(0, cmd_subfolder) + +import mosq_test + +rc = 1 + +def registerOfflineSubscriber(): + """Just a durable client to trigger queuing""" + client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False) + client.connect("localhost", port=1888) + client.subscribe("test/publish/queueing/#", 1) + client.loop() + client.disconnect() + + +broker = mosq_test.start_broker(filename=os.path.basename(__file__)) + +class BrokerMonitor(threading.Thread): + def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None): + threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose) + self.rq, self.cq = args + self.stored = -1 + self.stored_bytes = -1 + self.dropped = -1 + + def store_count(self, client, userdata, message): + self.stored = int(message.payload) + + def store_bytes(self, client, userdata, message): + self.stored_bytes = int(message.payload) + + def publish_dropped(self, client, userdata, message): + self.dropped = int(message.payload) + + def run(self): + client = paho.mqtt.client.Client("broker-monitor") + client.connect("localhost", port=1888) + client.message_callback_add("$SYS/broker/store/messages/count", self.store_count) + client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes) + client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped) + client.subscribe("$SYS/broker/store/messages/#") + client.subscribe("$SYS/broker/publish/messages/dropped") + + while True: + expect_drops = cq.get() + self.cq.task_done() + if expect_drops == "quit": + break + first = time.time() + while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0): + client.loop(timeout=0.5) + if time.time() - 10 > first: + print("ABORT TIMEOUT") + break + + if expect_drops: + self.rq.put((self.stored, self.stored_bytes, self.dropped)) + else: + self.rq.put((self.stored, self.stored_bytes, 0)) + self.stored = -1 + self.stored_bytes = -1 + self.dropped = -1 + + client.disconnect() + +rq = Queue.Queue() +cq = Queue.Queue() +brokerMonitor = BrokerMonitor(args=(rq,cq)) + +class StoreCounts(): + def __init__(self): + self.stored = 0 + self.bstored = 0 + self.drops = 0 + self.diff_stored = 0 + self.diff_bstored = 0 + self.diff_drops = 0 + + def update(self, tup): + self.diff_stored = tup[0] - self.stored + self.stored = tup[0] + self.diff_bstored = tup[1] - self.bstored + self.bstored = tup[1] + self.diff_drops = tup[2] - self.drops + self.drops = tup[2] + + def __repr__(self): + return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops) + +try: + registerOfflineSubscriber() + time.sleep(2.5) # Wait for first proper dump of stats + brokerMonitor.start() + counts = StoreCounts() + cq.put(True) # Expect a dropped count (of 0, initial) + counts.update(rq.get()) # Initial start + print("rq.get (INITIAL) gave us: ", counts) + rq.task_done() + + # publish 10 short messages, should be no drops + print("publishing 10 short") + cq.put(False) # expect no updated drop count + msgs_short10 = [("test/publish/queueing/%d" % x, + ''.join(random.choice(string.hexdigits) for _ in range(10)), + 1, False) for x in range(1, 10 + 1)] + paho.mqtt.publish.multiple(msgs_short10, port=1888) + counts.update(rq.get()) # Initial start + print("rq.get (short) gave us: ", counts) + rq.task_done() + if counts.diff_stored != 10 or counts.diff_bstored < 100: + raise ValueError + if counts.diff_drops != 0: + raise ValueError + + # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400 + print("publishing 10 medium") + cq.put(True) # expect a drop count + msgs_medium10 = [("test/publish/queueing/%d" % x, + ''.join(random.choice(string.hexdigits) for _ in range(40)), + 1, False) for x in range(1, 10 + 1)] + paho.mqtt.publish.multiple(msgs_medium10, port=1888) + counts.update(rq.get()) # Initial start + print("rq.get (medium) gave us: ", counts) + rq.task_done() + if counts.diff_stored != 8 or counts.diff_bstored < 320: + raise ValueError + if counts.diff_drops != 2: + raise ValueError + rc = 0 + +finally: + cq.put("quit") + brokerMonitor.join() + rq.join() + cq.join() + broker.terminate() + (stdo, stde) = broker.communicate() + if rc: + print(stde) + +exit(rc) + diff --git a/test/broker/Makefile b/test/broker/Makefile index a84950e9..3206d473 100644 --- a/test/broker/Makefile +++ b/test/broker/Makefile @@ -48,6 +48,7 @@ endif ./03-publish-c2b-disconnect-qos2.py ./03-publish-b2c-disconnect-qos2.py ./03-pattern-matching.py + ./03-publish-qos1-queued-bytes.py 04 : ./04-retain-qos0.py diff --git a/travis-install.sh b/travis-install.sh index edc8c173..ba034832 100755 --- a/travis-install.sh +++ b/travis-install.sh @@ -10,3 +10,5 @@ if [ "$TRAVIS_OS_NAME" == "osx" ]; then brew update brew install c-ares openssl libwebsockets fi + +sudo pip install paho-mqtt From 2dec0ed895dd314714137742c26b6dfb6be9e907 Mon Sep 17 00:00:00 2001 From: Karl Palsson Date: Fri, 1 Jul 2016 13:00:01 +0000 Subject: [PATCH 8/8] broker: fix queue_qos0_messages behaviour Prior, offline qos0 clients had an unlimited queue depth when queue_qos0_messages was true. Signed-off-by: Karl Palsson --- ChangeLog.txt | 1 + src/database.c | 19 +++++++++++++++---- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 8b6b483f..d0cf71f5 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -33,6 +33,7 @@ Broker: - new $SYS/broker/store/messages/bytes - max_queued_bytes feature to limit queues by real size rather than than just message count. Closes Eclipse #452919 or Github #100 +- queue_qos0_messages was not observing max_queued_** limits Client library: - Outgoing messages with QoS>1 are no longer retried after a timeout period. diff --git a/src/database.c b/src/database.c index d7f9792e..8d046c05 100644 --- a/src/database.c +++ b/src/database.c @@ -58,25 +58,36 @@ static bool db__ready_for_flight(struct mosquitto *context, int qos) /** * For a given client context, are more messages allowed to be queued? + * It is assumed that inflight checks and queue_qos0 checks have already + * been made. * @param context client of interest + * @param qos destination qos for the packet of interest * @return true if queuing is allowed, false if should be dropped */ -static bool db__ready_for_queue(struct mosquitto *context) +static bool db__ready_for_queue(struct mosquitto *context, int qos) { if(max_queued == 0 && max_queued_bytes == 0){ return true; } + unsigned long source_bytes = context->msg_bytes12; + int source_count = context->msg_count12; unsigned long adjust_bytes = max_inflight_bytes; int adjust_count = max_inflight; + /* nothing in flight for offline clients */ if(context->sock == INVALID_SOCKET){ adjust_bytes = 0; adjust_count = 0; } - bool valid_bytes = context->msg_bytes12 - adjust_bytes < max_queued_bytes; - bool valid_count = context->msg_count12 - adjust_count < max_queued; + if(qos == 0){ + source_bytes = context->msg_bytes; + source_count = context->msg_count; + } + + bool valid_bytes = source_bytes - adjust_bytes < max_queued_bytes; + bool valid_count = source_count - adjust_count < max_queued; if(max_queued_bytes == 0){ return valid_count; @@ -388,7 +399,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 return 1; } } - }else if(db__ready_for_queue(context)){ + }else if(db__ready_for_queue(context, qos)){ state = mosq_ms_queued; rc = 2; }else{