diff --git a/lib/actions.c b/lib/actions.c index 0470bfb5..a5bf673f 100644 --- a/lib/actions.c +++ b/lib/actions.c @@ -38,7 +38,6 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in { struct mosquitto_message_all *message; uint16_t local_mid; - int queue_status; const mosquitto_property *p; const mosquitto_property *outgoing_properties = NULL; mosquitto_property local_property; @@ -140,20 +139,10 @@ int mosquitto_publish_v5(struct mosquitto *mosq, int *mid, const char *topic, in message->dup = false; pthread_mutex_lock(&mosq->msgs_out.mutex); - queue_status = message__queue(mosq, message, mosq_md_out); - if(queue_status == 0){ - if(qos == 1){ - message->state = mosq_ms_wait_for_puback; - }else if(qos == 2){ - message->state = mosq_ms_wait_for_pubrec; - } - pthread_mutex_unlock(&mosq->msgs_out.mutex); - return send__publish(mosq, message->msg.mid, message->msg.topic, message->msg.payloadlen, message->msg.payload, message->msg.qos, message->msg.retain, message->dup, outgoing_properties, NULL, 0); - }else{ - message->state = mosq_ms_invalid; - pthread_mutex_unlock(&mosq->msgs_out.mutex); - return MOSQ_ERR_SUCCESS; - } + message->state = mosq_ms_invalid; + message__queue(mosq, message, mosq_md_out); + pthread_mutex_unlock(&mosq->msgs_out.mutex); + return MOSQ_ERR_SUCCESS; } } diff --git a/lib/handle_pubackcomp.c b/lib/handle_pubackcomp.c index 323cfb7a..67921494 100644 --- a/lib/handle_pubackcomp.c +++ b/lib/handle_pubackcomp.c @@ -54,7 +54,9 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) return MOSQ_ERR_PROTOCOL; } + pthread_mutex_lock(&mosq->msgs_out.mutex); util__increment_send_quota(mosq); + pthread_mutex_unlock(&mosq->msgs_out.mutex); rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc) return rc; @@ -106,7 +108,9 @@ int handle__pubackcomp(struct mosquitto *mosq, const char *type) pthread_mutex_unlock(&mosq->callback_mutex); mosquitto_property_free_all(&properties); } + pthread_mutex_lock(&mosq->msgs_out.mutex); message__release_to_inflight(mosq, mosq_md_out); + pthread_mutex_unlock(&mosq->msgs_out.mutex); return MOSQ_ERR_SUCCESS; #endif diff --git a/lib/handle_publish.c b/lib/handle_publish.c index b7d92ed5..8c93e3ce 100644 --- a/lib/handle_publish.c +++ b/lib/handle_publish.c @@ -74,7 +74,6 @@ int handle__publish(struct mosquitto *mosq) return MOSQ_ERR_PROTOCOL; } } - util__decrement_receive_quota(mosq); rc = packet__read_uint16(&mosq->in_packet, &mid); if(rc){ @@ -133,6 +132,7 @@ int handle__publish(struct mosquitto *mosq) mosquitto_property_free_all(&properties); return MOSQ_ERR_SUCCESS; case 1: + util__decrement_receive_quota(mosq); rc = send__puback(mosq, message->msg.mid, 0); pthread_mutex_lock(&mosq->callback_mutex); if(mosq->on_message){ @@ -150,6 +150,7 @@ int handle__publish(struct mosquitto *mosq) mosquitto_property_free_all(&properties); return rc; case 2: + util__decrement_receive_quota(mosq); rc = send__pubrec(mosq, message->msg.mid, 0); pthread_mutex_lock(&mosq->msgs_in.mutex); message->state = mosq_ms_wait_for_pubrel; diff --git a/lib/handle_pubrec.c b/lib/handle_pubrec.c index d06f2e5c..81f1b496 100644 --- a/lib/handle_pubrec.c +++ b/lib/handle_pubrec.c @@ -77,7 +77,7 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received PUBREC (Mid: %d)", mosq->id, mid); - if(reason_code < 0x80){ + if(reason_code < 0x80 || mosq->protocol != mosq_p_mqtt5){ rc = message__out_update(mosq, mid, mosq_ms_wait_for_pubcomp, 2); }else{ if(!message__delete(mosq, mid, mosq_md_out, 2)){ @@ -91,7 +91,9 @@ int handle__pubrec(struct mosquitto_db *db, struct mosquitto *mosq) pthread_mutex_unlock(&mosq->callback_mutex); } util__increment_send_quota(mosq); + pthread_mutex_lock(&mosq->msgs_out.mutex); message__release_to_inflight(mosq, mosq_md_out); + pthread_mutex_unlock(&mosq->msgs_out.mutex); return MOSQ_ERR_SUCCESS; } #endif diff --git a/lib/messages_mosq.c b/lib/messages_mosq.c index 28472e26..bf77f5e6 100644 --- a/lib/messages_mosq.c +++ b/lib/messages_mosq.c @@ -118,29 +118,20 @@ void mosquitto_message_free_contents(struct mosquitto_message *message) int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message, enum mosquitto_msg_direction dir) { - struct mosquitto_msg_data *msg_data; - int rc = 0; - /* mosq->*_message_mutex should be locked before entering this function */ assert(mosq); assert(message); assert(message->msg.qos != 0); if(dir == mosq_md_out){ - msg_data = &mosq->msgs_out; - if(mosq->msgs_out.inflight_quota == 0){ - rc = 1; - } - util__decrement_send_quota(mosq); + DL_APPEND(mosq->msgs_out.inflight, message); + mosq->msgs_out.queue_len++; }else{ - msg_data = &mosq->msgs_in; - util__decrement_receive_quota(mosq); + DL_APPEND(mosq->msgs_in.inflight, message); + mosq->msgs_in.queue_len++; } - msg_data->queue_len++; - DL_APPEND(msg_data->inflight, message); - - return rc; + return message__release_to_inflight(mosq, dir); } void message__reconnect_reset(struct mosquitto *mosq) @@ -149,6 +140,7 @@ void message__reconnect_reset(struct mosquitto *mosq) assert(mosq); pthread_mutex_lock(&mosq->msgs_in.mutex); + mosq->msgs_in.inflight_quota = mosq->msgs_in.inflight_maximum; mosq->msgs_in.queue_len = 0; DL_FOREACH_SAFE(mosq->msgs_in.inflight, message, tmp){ mosq->msgs_in.queue_len++; @@ -159,6 +151,7 @@ void message__reconnect_reset(struct mosquitto *mosq) }else{ /* Message state can be preserved here because it should match * whatever the client has got. */ + util__decrement_receive_quota(mosq); } } pthread_mutex_unlock(&mosq->msgs_in.mutex); @@ -193,11 +186,11 @@ void message__reconnect_reset(struct mosquitto *mosq) int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_direction dir) { + /* mosq->*_message_mutex should be locked before entering this function */ struct mosquitto_message_all *cur, *tmp; int rc = MOSQ_ERR_SUCCESS; if(dir == mosq_md_out){ - pthread_mutex_lock(&mosq->msgs_out.mutex); DL_FOREACH_SAFE(mosq->msgs_out.inflight, cur, tmp){ if(mosq->msgs_out.inflight_quota > 0){ if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){ @@ -208,17 +201,14 @@ int message__release_to_inflight(struct mosquitto *mosq, enum mosquitto_msg_dire } rc = send__publish(mosq, cur->msg.mid, cur->msg.topic, cur->msg.payloadlen, cur->msg.payload, cur->msg.qos, cur->msg.retain, cur->dup, NULL, NULL, 0); if(rc){ - pthread_mutex_unlock(&mosq->msgs_out.mutex); return rc; } util__decrement_send_quota(mosq); } }else{ - pthread_mutex_unlock(&mosq->msgs_out.mutex); return MOSQ_ERR_SUCCESS; } } - pthread_mutex_unlock(&mosq->msgs_out.mutex); } return rc; diff --git a/lib/send_mosq.c b/lib/send_mosq.c index e0a10241..3e60f51b 100644 --- a/lib/send_mosq.c +++ b/lib/send_mosq.c @@ -97,7 +97,7 @@ int send__pubrec(struct mosquitto *mosq, uint16_t mid, uint8_t reason_code) #else if(mosq) log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s sending PUBREC (m%d, rc%d)", mosq->id, mid, reason_code); #endif - if(reason_code >= 0x80){ + if(reason_code >= 0x80 && mosq->protocol == mosq_p_mqtt5){ util__increment_receive_quota(mosq); } /* We don't use Reason String or User Property yet. */ diff --git a/src/database.c b/src/database.c index f2b682ab..fcb3302c 100644 --- a/src/database.c +++ b/src/database.c @@ -286,9 +286,6 @@ void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_d msg = msg_data->queued; DL_DELETE(msg_data->queued, msg); DL_APPEND(msg_data->inflight, msg); - if(msg_data->inflight_quota > 0){ - msg_data->inflight_quota--; - } } diff --git a/src/handle_publish.c b/src/handle_publish.c index 16bfd786..6d607fcc 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -319,6 +319,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) if(rc2 > 0) rc = 1; break; case 1: + util__decrement_receive_quota(context); rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored); if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){ if(send__puback(context, mid, 0)) rc = 1; @@ -329,7 +330,8 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) } break; case 2: - if(!dup){ + if(dup == 0){ + util__decrement_receive_quota(context); res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL); }else{ res = 0; @@ -344,10 +346,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) break; } - if(rc == MOSQ_ERR_SUCCESS && qos > 0){ - util__decrement_receive_quota(context); - } - return rc; process_bad_message: mosquitto__free(topic); diff --git a/test/lib/cpp/Makefile b/test/lib/cpp/Makefile index 5761d9d4..42db17c3 100644 --- a/test/lib/cpp/Makefile +++ b/test/lib/cpp/Makefile @@ -1,6 +1,6 @@ .PHONY: all test 01 02 03 04 08 09 clean reallyclean -CFLAGS=-I../../../lib -I../../../lib/cpp -DDEBUG -Werror +CFLAGS=-I../../../lib -I../../../lib/cpp -DDEBUG LIBS=../../../lib/libmosquitto.so.1 ../../../lib/cpp/libmosquittopp.so.1 all : 01 02 03 04 08 09