Don't always iterate over entire inflight messages list

If we have e.g. max_inflight_messages set to 1000, and currently have 999 messages inflight, then when we send a new message to a client we have to iterate over the whole list to get to the newest message. This change means that we start of the back of the list to find the newest items, which reduces overhead.
pull/1522/merge
Roger A. Light 6 years ago
parent 58aa41c813
commit b91e78318d

@ -3,7 +3,7 @@ include ../config.mk
.PHONY: all install uninstall clean reallyclean
ifeq ($(WITH_TLS),yes)
all : mosquitto
all : mosquitto mosquitto_passwd
else
all : mosquitto
endif

@ -319,6 +319,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte
}
msg_index--;
db__message_remove(db, &context->msgs_out, tail);
break;
}
}
@ -343,7 +344,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte
db__message_dequeue_first(context, &context->msgs_out);
}
return MOSQ_ERR_SUCCESS;
return db__message_write_inflight_out_latest(db, context);
}
int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update)
@ -524,7 +525,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1
}
if(dir == mosq_md_out && update && context->current_out_packet == NULL){
rc = db__message_write_inflight_out(db, context);
rc = db__message_write_inflight_out_latest(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
@ -984,9 +985,8 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con
}
int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context)
static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg *msg)
{
struct mosquitto_client_msg *tail, *tmp;
mosquitto_property *cmsg_props = NULL, *store_props = NULL;
int rc;
uint16_t mid;
@ -999,91 +999,145 @@ int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *co
time_t now = 0;
uint32_t expiry_interval;
if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){
return MOSQ_ERR_SUCCESS;
expiry_interval = 0;
if(msg->store->message_expiry_time){
if(now == 0){
now = time(NULL);
}
if(now > msg->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, &context->msgs_out, msg);
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = msg->store->message_expiry_time - now;
}
}
mid = msg->mid;
retries = msg->dup;
retain = msg->retain;
topic = msg->store->topic;
qos = msg->qos;
payloadlen = msg->store->payloadlen;
payload = UHPA_ACCESS_PAYLOAD(msg->store);
cmsg_props = msg->properties;
store_props = msg->store->properties;
switch(msg->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, msg);
}else{
return rc;
}
break;
DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
expiry_interval = 0;
if(tail->store->message_expiry_time){
if(now == 0){
now = time(NULL);
case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
msg->timestamp = mosquitto_time();
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_puback;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, msg);
}else{
return rc;
}
if(now > tail->store->message_expiry_time){
/* Message is expired, must not send. */
db__message_remove(db, &context->msgs_out, tail);
continue;
break;
case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
msg->timestamp = mosquitto_time();
msg->dup = 1; /* Any retry attempts are a duplicate. */
msg->state = mosq_ms_wait_for_pubrec;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, msg);
}else{
expiry_interval = tail->store->message_expiry_time - now;
return rc;
}
}else{
expiry_interval = 0;
}
mid = tail->mid;
retries = tail->dup;
retain = tail->retain;
topic = tail->store->topic;
qos = tail->qos;
payloadlen = tail->store->payloadlen;
payload = UHPA_ACCESS_PAYLOAD(tail->store);
cmsg_props = tail->properties;
store_props = tail->store->properties;
break;
switch(tail->state){
case mosq_ms_publish_qos0:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, tail);
}else{
return rc;
}
break;
case mosq_ms_resend_pubrel:
rc = send__pubrel(context, mid, NULL);
if(!rc){
msg->state = mosq_ms_wait_for_pubcomp;
}else{
return rc;
}
break;
case mosq_ms_publish_qos1:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
tail->state = mosq_ms_wait_for_puback;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, tail);
}else{
return rc;
}
break;
case mosq_ms_invalid:
case mosq_ms_send_pubrec:
case mosq_ms_resend_pubcomp:
case mosq_ms_wait_for_puback:
case mosq_ms_wait_for_pubrec:
case mosq_ms_wait_for_pubrel:
case mosq_ms_wait_for_pubcomp:
case mosq_ms_queued:
break;
}
return MOSQ_ERR_SUCCESS;
}
case mosq_ms_publish_qos2:
rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval);
if(rc == MOSQ_ERR_SUCCESS){
tail->timestamp = mosquitto_time();
tail->dup = 1; /* Any retry attempts are a duplicate. */
tail->state = mosq_ms_wait_for_pubrec;
}else if(rc == MOSQ_ERR_OVERSIZE_PACKET){
db__message_remove(db, &context->msgs_out, tail);
}else{
return rc;
}
break;
case mosq_ms_resend_pubrel:
rc = send__pubrel(context, mid, NULL);
if(!rc){
tail->state = mosq_ms_wait_for_pubcomp;
}else{
return rc;
}
break;
int db__message_write_inflight_out_all(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *tmp;
int rc;
case mosq_ms_invalid:
case mosq_ms_send_pubrec:
case mosq_ms_resend_pubcomp:
case mosq_ms_wait_for_puback:
case mosq_ms_wait_for_pubrec:
case mosq_ms_wait_for_pubrel:
case mosq_ms_wait_for_pubcomp:
case mosq_ms_queued:
break;
}
if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){
return MOSQ_ERR_SUCCESS;
}
DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){
rc = db__message_write_inflight_out_single(db, context, tail);
if(rc) return rc;
}
return MOSQ_ERR_SUCCESS;
}
int db__message_write_inflight_out_latest(struct mosquitto_db *db, struct mosquitto *context)
{
struct mosquitto_client_msg *tail, *next;
int rc;
if(context->state != mosq_cs_active
|| context->sock == INVALID_SOCKET
|| context->msgs_out.inflight == NULL){
return MOSQ_ERR_SUCCESS;
}
if(context->msgs_out.inflight->prev == context->msgs_out.inflight){
/* Only one message */
return db__message_write_inflight_out_single(db, context, context->msgs_out.inflight);
}
/* Start at the end of the list and work backwards looking for the first
* message in a non-publish state */
tail = context->msgs_out.inflight->prev;
while(tail != context->msgs_out.inflight &&
(tail->state == mosq_ms_publish_qos0
|| tail->state == mosq_ms_publish_qos1
|| tail->state == mosq_ms_publish_qos2)){
tail = tail->prev;
}
/* Tail is now either the head of the list, if that message is waiting for
* publish, or the oldest message not waiting for a publish. In the latter
* case, any pending publishes should be next after this message. */
if(tail != context->msgs_out.inflight){
tail = tail->next;
}
while(tail){
next = tail->next;
rc = db__message_write_inflight_out_single(db, context, tail);
if(rc) return rc;
tail = next;
}
return MOSQ_ERR_SUCCESS;
}

@ -67,7 +67,10 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context)
}
#endif
mosquitto__set_state(context, mosq_cs_active);
return MOSQ_ERR_SUCCESS;
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
rc = db__message_write_inflight_out_all(db, context);
return rc;
}else{
if(context->protocol == mosq_p_mqtt5){
switch(reason_code){

@ -272,11 +272,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v
mosquitto__set_state(context, mosq_cs_active);
rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props);
mosquitto_property_free_all(&connack_props);
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
rc = db__message_write_inflight_out(db, context);
rc = db__message_write_inflight_out_all(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
return rc;
error:
free(auth_data_out);
mosquitto_property_free_all(&connack_props);

@ -221,10 +221,10 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context)
#endif
if(context->current_out_packet == NULL){
rc = db__message_write_inflight_out(db, context);
if(rc) return rc;
rc = db__message_write_queued_out(db, context);
if(rc) return rc;
rc = db__message_write_inflight_out_latest(db, context);
if(rc) return rc;
}
return rc;

@ -129,7 +129,7 @@ static int single_publish(struct mosquitto_db *db, struct mosquitto *context, st
}else{
mid = 0;
}
return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties);
return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties, true);
}

@ -671,7 +671,8 @@ void db__msg_store_free(struct mosquitto_msg_store *store);
int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context);
void sys_tree__init(struct mosquitto_db *db);
void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time);
int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_inflight_out_all(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_inflight_out_latest(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context);
int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context);

@ -274,7 +274,8 @@ static int callback_mqtt(struct libwebsocket_context *context,
return -1;
}
db__message_write_inflight_out(db, mosq);
db__message_write_queued_out(db, mosq);
db__message_write_inflight_out_latest(db, mosq);
if(mosq->out_packet && !mosq->current_out_packet){
mosq->current_out_packet = mosq->out_packet;

Loading…
Cancel
Save