From 1254fe93e000ee0ed5abbeceecda36f79a144e38 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 17 Aug 2015 15:36:58 +0100 Subject: [PATCH] [474935] Increment inflight message count correctly. Don't duplicate the increment when queueing. Thanks to Joe McIlvain. Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=474935 --- ChangeLog.txt | 1 + THANKS.txt | 1 + lib/messages_mosq.c | 21 ++++++++++++++++++--- lib/messages_mosq.h | 2 +- lib/mosquitto.c | 5 +++-- 5 files changed, 24 insertions(+), 6 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 75afd340..24c2fe5d 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -11,6 +11,7 @@ Client library: - Handle fragmented TLS packets without a delay. Closes #470660. - Fix incorrect loop timeout being chosen when using threaded interface and keepalive = 0. Closes #471334. +- Increment inflight messages count correctly. Closes #474935. Clients: - Report error string on connection failure rather than error code. diff --git a/THANKS.txt b/THANKS.txt index e54316f0..813aafbc 100644 --- a/THANKS.txt +++ b/THANKS.txt @@ -40,6 +40,7 @@ Gianfranco Costamagna Hiram van Paassen Jan-Piet Mens Joan Zapata +Joe McIlvain Karl Palsson Larry Lendo Martin Assarsson diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 24598bbe..478412b8 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -106,8 +106,18 @@ void mosquitto_message_free(struct mosquitto_message **message) _mosquitto_free(msg); } -void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir) + +/* + * Function: _mosquitto_message_queue + * + * Returns: + * 0 - to indicate an outgoing message can be started + * 1 - to indicate that the outgoing message queue is full (inflight limit has been reached) + */ +int _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir) { + int rc = 0; + /* mosq->*_message_mutex should be locked before entering this function */ assert(mosq); assert(message); @@ -121,8 +131,12 @@ void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_a mosq->out_messages = message; } mosq->out_messages_last = message; - if(message->msg.qos > 0 && (mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages)){ - mosq->inflight_messages++; + if(message->msg.qos > 0){ + if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ + mosq->inflight_messages++; + }else{ + rc = 1; + } } }else{ mosq->in_queue_len++; @@ -134,6 +148,7 @@ void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_a } mosq->in_messages_last = message; } + return rc; } void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq) diff --git a/lib/messages_mosq.h b/lib/messages_mosq.h index d38910fe..acae1b46 100644 --- a/lib/messages_mosq.h +++ b/lib/messages_mosq.h @@ -22,7 +22,7 @@ Contributors: void _mosquitto_message_cleanup_all(struct mosquitto *mosq); void _mosquitto_message_cleanup(struct mosquitto_message_all **message); int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir); -void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir); +int _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir); void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq); int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message); void _mosquitto_message_retry_check(struct mosquitto *mosq); diff --git a/lib/mosquitto.c b/lib/mosquitto.c index 1e104967..bd4b4895 100644 --- a/lib/mosquitto.c +++ b/lib/mosquitto.c @@ -549,6 +549,7 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p { struct mosquitto_message_all *message; uint16_t local_mid; + int queue_status; if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL; if(strlen(topic) == 0) return MOSQ_ERR_INVAL; @@ -594,8 +595,8 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p message->dup = false; pthread_mutex_lock(&mosq->out_message_mutex); - _mosquitto_message_queue(mosq, message, mosq_md_out); - if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ + queue_status = _mosquitto_message_queue(mosq, message, mosq_md_out); + if(queue_status == 0){ if(qos == 1){ message->state = mosq_ms_wait_for_puback; }else if(qos == 2){