[474935] Increment inflight message count correctly.

Don't duplicate the increment when queueing.

Thanks to Joe McIlvain.

Bug: https://bugs.eclipse.org/bugs/show_bug.cgi?id=474935
pull/211/merge
Roger A. Light 10 years ago
parent e49e398eb3
commit 1254fe93e0

@ -11,6 +11,7 @@ Client library:
- Handle fragmented TLS packets without a delay. Closes #470660. - Handle fragmented TLS packets without a delay. Closes #470660.
- Fix incorrect loop timeout being chosen when using threaded interface and - Fix incorrect loop timeout being chosen when using threaded interface and
keepalive = 0. Closes #471334. keepalive = 0. Closes #471334.
- Increment inflight messages count correctly. Closes #474935.
Clients: Clients:
- Report error string on connection failure rather than error code. - Report error string on connection failure rather than error code.

@ -40,6 +40,7 @@ Gianfranco Costamagna
Hiram van Paassen Hiram van Paassen
Jan-Piet Mens Jan-Piet Mens
Joan Zapata Joan Zapata
Joe McIlvain
Karl Palsson Karl Palsson
Larry Lendo Larry Lendo
Martin Assarsson Martin Assarsson

@ -106,8 +106,18 @@ void mosquitto_message_free(struct mosquitto_message **message)
_mosquitto_free(msg); _mosquitto_free(msg);
} }
void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
/*
* Function: _mosquitto_message_queue
*
* Returns:
* 0 - to indicate an outgoing message can be started
* 1 - to indicate that the outgoing message queue is full (inflight limit has been reached)
*/
int _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir)
{ {
int rc = 0;
/* mosq->*_message_mutex should be locked before entering this function */ /* mosq->*_message_mutex should be locked before entering this function */
assert(mosq); assert(mosq);
assert(message); assert(message);
@ -121,8 +131,12 @@ void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_a
mosq->out_messages = message; mosq->out_messages = message;
} }
mosq->out_messages_last = message; mosq->out_messages_last = message;
if(message->msg.qos > 0 && (mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages)){ if(message->msg.qos > 0){
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){
mosq->inflight_messages++; mosq->inflight_messages++;
}else{
rc = 1;
}
} }
}else{ }else{
mosq->in_queue_len++; mosq->in_queue_len++;
@ -134,6 +148,7 @@ void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_a
} }
mosq->in_messages_last = message; mosq->in_messages_last = message;
} }
return rc;
} }
void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq) void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq)

@ -22,7 +22,7 @@ Contributors:
void _mosquitto_message_cleanup_all(struct mosquitto *mosq); void _mosquitto_message_cleanup_all(struct mosquitto *mosq);
void _mosquitto_message_cleanup(struct mosquitto_message_all **message); void _mosquitto_message_cleanup(struct mosquitto_message_all **message);
int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir); int _mosquitto_message_delete(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir);
void _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir); int _mosquitto_message_queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir);
void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq); void _mosquitto_messages_reconnect_reset(struct mosquitto *mosq);
int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message); int _mosquitto_message_remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_direction dir, struct mosquitto_message_all **message);
void _mosquitto_message_retry_check(struct mosquitto *mosq); void _mosquitto_message_retry_check(struct mosquitto *mosq);

@ -549,6 +549,7 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
{ {
struct mosquitto_message_all *message; struct mosquitto_message_all *message;
uint16_t local_mid; uint16_t local_mid;
int queue_status;
if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL; if(!mosq || !topic || qos<0 || qos>2) return MOSQ_ERR_INVAL;
if(strlen(topic) == 0) return MOSQ_ERR_INVAL; if(strlen(topic) == 0) return MOSQ_ERR_INVAL;
@ -594,8 +595,8 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
message->dup = false; message->dup = false;
pthread_mutex_lock(&mosq->out_message_mutex); pthread_mutex_lock(&mosq->out_message_mutex);
_mosquitto_message_queue(mosq, message, mosq_md_out); queue_status = _mosquitto_message_queue(mosq, message, mosq_md_out);
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ if(queue_status == 0){
if(qos == 1){ if(qos == 1){
message->state = mosq_ms_wait_for_puback; message->state = mosq_ms_wait_for_puback;
}else if(qos == 2){ }else if(qos == 2){

Loading…
Cancel
Save