Send maximum limits for QoS>0.

This needs more work on the broker front to simplify the design.
pull/1203/head
Roger Light 7 years ago
parent 9aec82b0e1
commit 84660e1cbe

@ -69,6 +69,7 @@ static int mosquitto__connect_init(struct mosquitto *mosq, const char *host, int
mosq->keepalive = keepalive; mosq->keepalive = keepalive;
mosq->receive_quota = mosq->receive_maximum; mosq->receive_quota = mosq->receive_maximum;
mosq->send_quota = mosq->send_maximum;
if(mosq->sockpairR != INVALID_SOCKET){ if(mosq->sockpairR != INVALID_SOCKET){
COMPAT_CLOSE(mosq->sockpairR); COMPAT_CLOSE(mosq->sockpairR);

@ -62,6 +62,7 @@ int handle__connack(struct mosquitto *mosq)
} }
mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false); mosquitto_property_read_int16(properties, MQTT_PROP_SERVER_KEEP_ALIVE, &mosq->keepalive, false);
mosquitto_property_read_int16(properties, MQTT_PROP_RECEIVE_MAXIMUM, &mosq->send_maximum, false);
log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, reason_code); log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received CONNACK (%d)", mosq->id, reason_code);
pthread_mutex_lock(&mosq->callback_mutex); pthread_mutex_lock(&mosq->callback_mutex);

@ -26,6 +26,7 @@ Contributors:
#include "messages_mosq.h" #include "messages_mosq.h"
#include "send_mosq.h" #include "send_mosq.h"
#include "time_mosq.h" #include "time_mosq.h"
#include "util_mosq.h"
void message__cleanup(struct mosquitto_message_all **message) void message__cleanup(struct mosquitto_message_all **message)
{ {
@ -123,6 +124,7 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message
/* mosq->*_message_mutex should be locked before entering this function */ /* mosq->*_message_mutex should be locked before entering this function */
assert(mosq); assert(mosq);
assert(message); assert(message);
assert(message->msg.qos != 0);
if(dir == mosq_md_out){ if(dir == mosq_md_out){
mosq->out_queue_len++; mosq->out_queue_len++;
@ -133,12 +135,10 @@ int message__queue(struct mosquitto *mosq, struct mosquitto_message_all *message
mosq->out_messages = message; mosq->out_messages = message;
} }
mosq->out_messages_last = message; mosq->out_messages_last = message;
if(message->msg.qos > 0){ if(mosq->send_quota > 0){
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ mosq->send_quota--;
mosq->inflight_messages++; }else{
}else{ rc = 1;
rc = 1;
}
} }
}else{ }else{
mosq->in_queue_len++; mosq->in_queue_len++;
@ -187,17 +187,15 @@ void message__reconnect_reset(struct mosquitto *mosq)
pthread_mutex_lock(&mosq->out_message_mutex); pthread_mutex_lock(&mosq->out_message_mutex);
mosq->inflight_messages = 0; mosq->send_quota = mosq->send_maximum;
message = mosq->out_messages; message = mosq->out_messages;
mosq->out_queue_len = 0; mosq->out_queue_len = 0;
while(message){ while(message){
mosq->out_queue_len++; mosq->out_queue_len++;
message->timestamp = 0; message->timestamp = 0;
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ if(mosq->send_quota > 0){
if(message->msg.qos > 0){ mosq->send_quota--;
mosq->inflight_messages++;
}
if(message->msg.qos == 1){ if(message->msg.qos == 1){
message->state = mosq_ms_publish_qos1; message->state = mosq_ms_publish_qos1;
}else if(message->msg.qos == 2){ }else if(message->msg.qos == 2){
@ -243,9 +241,7 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
}else if(!mosq->out_messages){ }else if(!mosq->out_messages){
mosq->out_messages_last = NULL; mosq->out_messages_last = NULL;
} }
if(cur->msg.qos > 0){ util__increment_send_quota(mosq);
mosq->inflight_messages--;
}
found = true; found = true;
break; break;
} }
@ -256,9 +252,9 @@ int message__remove(struct mosquitto *mosq, uint16_t mid, enum mosquitto_msg_dir
if(found){ if(found){
cur = mosq->out_messages; cur = mosq->out_messages;
while(cur){ while(cur){
if(mosq->max_inflight_messages == 0 || mosq->inflight_messages < mosq->max_inflight_messages){ if(mosq->send_quota > 0){
if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){ if(cur->msg.qos > 0 && cur->state == mosq_ms_invalid){
mosq->inflight_messages++; mosq->send_quota--;
if(cur->msg.qos == 1){ if(cur->msg.qos == 1){
cur->state = mosq_ms_wait_for_puback; cur->state = mosq_ms_wait_for_puback;
}else if(cur->msg.qos == 2){ }else if(cur->msg.qos == 2){
@ -394,7 +390,7 @@ int mosquitto_max_inflight_messages_set(struct mosquitto *mosq, unsigned int max
{ {
if(!mosq) return MOSQ_ERR_INVAL; if(!mosq) return MOSQ_ERR_INVAL;
mosq->max_inflight_messages = max_inflight_messages; mosq->send_maximum = max_inflight_messages;
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
} }

@ -146,8 +146,8 @@ int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_st
mosq->in_messages_last = NULL; mosq->in_messages_last = NULL;
mosq->out_messages = NULL; mosq->out_messages = NULL;
mosq->out_messages_last = NULL; mosq->out_messages_last = NULL;
mosq->max_inflight_messages = 20;
mosq->receive_maximum = 20; mosq->receive_maximum = 20;
mosq->send_maximum = 20;
mosq->will = NULL; mosq->will = NULL;
mosq->on_connect = NULL; mosq->on_connect = NULL;
mosq->on_publish = NULL; mosq->on_publish = NULL;

@ -100,6 +100,7 @@ enum mosq_opt_t {
MOSQ_OPT_SSL_CTX = 2, MOSQ_OPT_SSL_CTX = 2,
MOSQ_OPT_SSL_CTX_WITH_DEFAULTS = 3, MOSQ_OPT_SSL_CTX_WITH_DEFAULTS = 3,
MOSQ_OPT_RECEIVE_MAXIMUM = 4, MOSQ_OPT_RECEIVE_MAXIMUM = 4,
MOSQ_OPT_SEND_MAXIMUM = 5,
}; };
/* MQTT specification restricts client ids to a maximum of 23 characters */ /* MQTT specification restricts client ids to a maximum of 23 characters */
@ -1375,6 +1376,15 @@ libmosq_EXPORT int mosquitto_opts_set(struct mosquitto *mosq, enum mosq_opt_t op
* will override this option. Using this option is the recommended * will override this option. Using this option is the recommended
* method however. * method however.
* *
* MOSQ_OPT_SEND_MAXIMUM
* Value can be set between 1 and 65535 inclusive, and represents
* the maximum number of outgoing QoS 1 and QoS 2 messages that this
* client will attempt to have "in flight" at once. Defaults to 20.
* This option is not valid for MQTT v3.1 or v3.1.1 clients.
* Note that if the broker being connected to sends a
* MQTT_PROP_RECEIVE_MAXIMUM property that has a lower value than
* this option, then the broker provided value will be used.
*
* MOSQ_OPT_SSL_CTX_WITH_DEFAULTS * MOSQ_OPT_SSL_CTX_WITH_DEFAULTS
* If value is set to a non zero value, then the user specified * If value is set to a non zero value, then the user specified
* SSL_CTX passed in using MOSQ_OPT_SSL_CTX will have the default * SSL_CTX passed in using MOSQ_OPT_SSL_CTX will have the default
@ -1450,6 +1460,9 @@ libmosq_EXPORT int mosquitto_reconnect_delay_set(struct mosquitto *mosq, unsigne
/* /*
* Function: mosquitto_max_inflight_messages_set * Function: mosquitto_max_inflight_messages_set
* *
* This function is deprected. Use the <mosquitto_int_option> function with the
* MOSQ_OPT_SEND_MAXIMUM option instead.
*
* Set the number of QoS 1 and 2 messages that can be "in flight" at one time. * Set the number of QoS 1 and 2 messages that can be "in flight" at one time.
* An in flight message is part way through its delivery flow. Attempts to send * An in flight message is part way through its delivery flow. Attempts to send
* further messages with <mosquitto_publish> will result in the messages being * further messages with <mosquitto_publish> will result in the messages being

@ -278,14 +278,14 @@ struct mosquitto {
bool reconnect_exponential_backoff; bool reconnect_exponential_backoff;
char threaded; char threaded;
struct mosquitto__packet *out_packet_last; struct mosquitto__packet *out_packet_last;
int inflight_messages;
# ifdef WITH_SRV # ifdef WITH_SRV
ares_channel achan; ares_channel achan;
# endif # endif
#endif #endif
int send_quota;
int receive_quota; int receive_quota;
int receive_maximum; uint16_t send_maximum;
int max_inflight_messages; uint16_t receive_maximum;
#ifdef WITH_BROKER #ifdef WITH_BROKER
UT_hash_handle hh_id; UT_hash_handle hh_id;

@ -322,6 +322,13 @@ int mosquitto_int_option(struct mosquitto *mosq, enum mosq_opt_t option, int val
mosq->receive_maximum = value; mosq->receive_maximum = value;
break; break;
case MOSQ_OPT_SEND_MAXIMUM:
if(value < 0 || value > 65535){
return MOSQ_ERR_INVAL;
}
mosq->send_maximum = value;
break;
case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS: case MOSQ_OPT_SSL_CTX_WITH_DEFAULTS:
#if defined(WITH_TLS) && OPENSSL_VERSION_NUMBER >= 0x10100000L #if defined(WITH_TLS) && OPENSSL_VERSION_NUMBER >= 0x10100000L
if(value){ if(value){

@ -261,3 +261,10 @@ void util__increment_receive_quota(struct mosquitto *mosq)
} }
} }
} }
void util__increment_send_quota(struct mosquitto *mosq)
{
if(mosq->send_quota < mosq->send_maximum){
mosq->send_quota++;
}
}

@ -38,4 +38,5 @@ int mosquitto__hex2bin(const char *hex, unsigned char *bin, int bin_max_len);
#endif #endif
void util__increment_receive_quota(struct mosquitto *mosq); void util__increment_receive_quota(struct mosquitto *mosq);
void util__increment_send_quota(struct mosquitto *mosq);
#endif #endif

@ -76,7 +76,8 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
context->last_inflight_msg = NULL; context->last_inflight_msg = NULL;
context->queued_msgs = NULL; context->queued_msgs = NULL;
context->last_queued_msg = NULL; context->last_queued_msg = NULL;
context->max_inflight_messages = db->config->max_inflight_messages; context->receive_maximum = db->config->max_inflight_messages;
context->send_maximum = db->config->max_inflight_messages;
context->msg_bytes = 0; context->msg_bytes = 0;
context->msg_bytes12 = 0; context->msg_bytes12 = 0;
context->msg_count = 0; context->msg_count = 0;

@ -37,14 +37,14 @@ static unsigned long max_queued_bytes = 0;
*/ */
static bool db__ready_for_flight(struct mosquitto *context, int qos) static bool db__ready_for_flight(struct mosquitto *context, int qos)
{ {
if(qos == 0 || (context->max_inflight_messages == 0 && max_inflight_bytes == 0)){ if(qos == 0 || (context->send_maximum == 0 && max_inflight_bytes == 0)){
return true; return true;
} }
bool valid_bytes = context->msg_bytes12 < max_inflight_bytes; bool valid_bytes = context->msg_bytes12 < max_inflight_bytes;
bool valid_count = context->msg_count12 < context->max_inflight_messages; bool valid_count = context->msg_count12 < context->send_maximum;
if(context->max_inflight_messages == 0){ if(context->send_maximum == 0){
return valid_bytes; return valid_bytes;
} }
if(max_inflight_bytes == 0){ if(max_inflight_bytes == 0){
@ -72,7 +72,7 @@ static bool db__ready_for_queue(struct mosquitto *context, int qos)
unsigned long source_bytes = context->msg_bytes12; unsigned long source_bytes = context->msg_bytes12;
int source_count = context->msg_count12; int source_count = context->msg_count12;
unsigned long adjust_bytes = max_inflight_bytes; unsigned long adjust_bytes = max_inflight_bytes;
int adjust_count = context->max_inflight_messages; int adjust_count = context->send_maximum;
/* nothing in flight for offline clients */ /* nothing in flight for offline clients */
if(context->sock == INVALID_SOCKET){ if(context->sock == INVALID_SOCKET){
@ -306,7 +306,7 @@ int db__message_delete(struct mosquitto_db *db, struct mosquitto *context, uint1
tail = tail->next; tail = tail->next;
} }
} }
while (context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ while (context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){
msg_index++; msg_index++;
tail = context->queued_msgs; tail = context->queued_msgs;
tail->timestamp = mosquitto_time(); tail->timestamp = mosquitto_time();
@ -837,7 +837,7 @@ int db__message_release(struct mosquitto_db *db, struct mosquitto *context, uint
} }
} }
while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_index < context->max_inflight_messages)){ while(context->queued_msgs && (context->send_maximum == 0 || msg_index < context->send_maximum)){
msg_index++; msg_index++;
tail = context->queued_msgs; tail = context->queued_msgs;
tail->timestamp = mosquitto_time(); tail->timestamp = mosquitto_time();
@ -988,7 +988,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context)
} }
} }
while(context->queued_msgs && (context->max_inflight_messages == 0 || msg_count < context->max_inflight_messages)){ while(context->queued_msgs && (context->send_maximum == 0 || msg_count < context->send_maximum)){
msg_count++; msg_count++;
tail = context->queued_msgs; tail = context->queued_msgs;
if(tail->direction == mosq_md_out){ if(tail->direction == mosq_md_out){

@ -40,11 +40,8 @@ int property__process_connect(struct mosquitto *context, mosquitto_property *pro
return MOSQ_ERR_PROTOCOL; return MOSQ_ERR_PROTOCOL;
} }
if(p->value.i16 == 65535){ context->send_maximum = p->value.i16;
context->max_inflight_messages = 0; context->send_quota = context->send_maximum;
}else{
context->max_inflight_messages = p->value.i16;
}
} }
p = p->next; p = p->next;
} }

Loading…
Cancel
Save