From b91e78318d9505d60286a0eade531475d8de0a47 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 18 Feb 2020 17:08:49 +0000 Subject: [PATCH] Don't always iterate over entire inflight messages list If we have e.g. max_inflight_messages set to 1000, and currently have 999 messages inflight, then when we send a new message to a client we have to iterate over the whole list to get to the newest message. This change means that we start of the back of the list to find the newest items, which reduces overhead. --- src/Makefile | 2 +- src/database.c | 212 ++++++++++++++++++++------------ src/handle_connack.c | 5 +- src/handle_connect.c | 5 +- src/handle_subscribe.c | 4 +- src/loop.c | 2 +- src/mosquitto_broker_internal.h | 3 +- src/websockets.c | 3 +- 8 files changed, 147 insertions(+), 89 deletions(-) diff --git a/src/Makefile b/src/Makefile index 207c8c9f..393b1dc5 100644 --- a/src/Makefile +++ b/src/Makefile @@ -3,7 +3,7 @@ include ../config.mk .PHONY: all install uninstall clean reallyclean ifeq ($(WITH_TLS),yes) -all : mosquitto +all : mosquitto mosquitto_passwd else all : mosquitto endif diff --git a/src/database.c b/src/database.c index 361e605f..e868ecd8 100644 --- a/src/database.c +++ b/src/database.c @@ -319,6 +319,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte } msg_index--; db__message_remove(db, &context->msgs_out, tail); + break; } } @@ -343,7 +344,7 @@ int db__message_delete_outgoing(struct mosquitto_db *db, struct mosquitto *conte db__message_dequeue_first(context, &context->msgs_out); } - return MOSQ_ERR_SUCCESS; + return db__message_write_inflight_out_latest(db, context); } int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update) @@ -524,7 +525,7 @@ int db__message_insert(struct mosquitto_db *db, struct mosquitto *context, uint1 } if(dir == mosq_md_out && update && context->current_out_packet == NULL){ - rc = db__message_write_inflight_out(db, context); + rc = db__message_write_inflight_out_latest(db, context); if(rc) return rc; rc = db__message_write_queued_out(db, context); if(rc) return rc; @@ -984,9 +985,8 @@ int db__message_write_inflight_in(struct mosquitto_db *db, struct mosquitto *con } -int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context) +static int db__message_write_inflight_out_single(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg *msg) { - struct mosquitto_client_msg *tail, *tmp; mosquitto_property *cmsg_props = NULL, *store_props = NULL; int rc; uint16_t mid; @@ -999,91 +999,145 @@ int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *co time_t now = 0; uint32_t expiry_interval; - if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){ - return MOSQ_ERR_SUCCESS; + expiry_interval = 0; + if(msg->store->message_expiry_time){ + if(now == 0){ + now = time(NULL); + } + if(now > msg->store->message_expiry_time){ + /* Message is expired, must not send. */ + db__message_remove(db, &context->msgs_out, msg); + return MOSQ_ERR_SUCCESS; + }else{ + expiry_interval = msg->store->message_expiry_time - now; + } } + mid = msg->mid; + retries = msg->dup; + retain = msg->retain; + topic = msg->store->topic; + qos = msg->qos; + payloadlen = msg->store->payloadlen; + payload = UHPA_ACCESS_PAYLOAD(msg->store); + cmsg_props = msg->properties; + store_props = msg->store->properties; + + switch(msg->state){ + case mosq_ms_publish_qos0: + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); + if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ + db__message_remove(db, &context->msgs_out, msg); + }else{ + return rc; + } + break; - DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){ - expiry_interval = 0; - if(tail->store->message_expiry_time){ - if(now == 0){ - now = time(NULL); + case mosq_ms_publish_qos1: + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); + if(rc == MOSQ_ERR_SUCCESS){ + msg->timestamp = mosquitto_time(); + msg->dup = 1; /* Any retry attempts are a duplicate. */ + msg->state = mosq_ms_wait_for_puback; + }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ + db__message_remove(db, &context->msgs_out, msg); + }else{ + return rc; } - if(now > tail->store->message_expiry_time){ - /* Message is expired, must not send. */ - db__message_remove(db, &context->msgs_out, tail); - continue; + break; + + case mosq_ms_publish_qos2: + rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); + if(rc == MOSQ_ERR_SUCCESS){ + msg->timestamp = mosquitto_time(); + msg->dup = 1; /* Any retry attempts are a duplicate. */ + msg->state = mosq_ms_wait_for_pubrec; + }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ + db__message_remove(db, &context->msgs_out, msg); }else{ - expiry_interval = tail->store->message_expiry_time - now; + return rc; } - }else{ - expiry_interval = 0; - } - mid = tail->mid; - retries = tail->dup; - retain = tail->retain; - topic = tail->store->topic; - qos = tail->qos; - payloadlen = tail->store->payloadlen; - payload = UHPA_ACCESS_PAYLOAD(tail->store); - cmsg_props = tail->properties; - store_props = tail->store->properties; + break; - switch(tail->state){ - case mosq_ms_publish_qos0: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(rc == MOSQ_ERR_SUCCESS || rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(db, &context->msgs_out, tail); - }else{ - return rc; - } - break; + case mosq_ms_resend_pubrel: + rc = send__pubrel(context, mid, NULL); + if(!rc){ + msg->state = mosq_ms_wait_for_pubcomp; + }else{ + return rc; + } + break; - case mosq_ms_publish_qos1: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(rc == MOSQ_ERR_SUCCESS){ - tail->timestamp = mosquitto_time(); - tail->dup = 1; /* Any retry attempts are a duplicate. */ - tail->state = mosq_ms_wait_for_puback; - }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(db, &context->msgs_out, tail); - }else{ - return rc; - } - break; + case mosq_ms_invalid: + case mosq_ms_send_pubrec: + case mosq_ms_resend_pubcomp: + case mosq_ms_wait_for_puback: + case mosq_ms_wait_for_pubrec: + case mosq_ms_wait_for_pubrel: + case mosq_ms_wait_for_pubcomp: + case mosq_ms_queued: + break; + } + return MOSQ_ERR_SUCCESS; +} - case mosq_ms_publish_qos2: - rc = send__publish(context, mid, topic, payloadlen, payload, qos, retain, retries, cmsg_props, store_props, expiry_interval); - if(rc == MOSQ_ERR_SUCCESS){ - tail->timestamp = mosquitto_time(); - tail->dup = 1; /* Any retry attempts are a duplicate. */ - tail->state = mosq_ms_wait_for_pubrec; - }else if(rc == MOSQ_ERR_OVERSIZE_PACKET){ - db__message_remove(db, &context->msgs_out, tail); - }else{ - return rc; - } - break; - case mosq_ms_resend_pubrel: - rc = send__pubrel(context, mid, NULL); - if(!rc){ - tail->state = mosq_ms_wait_for_pubcomp; - }else{ - return rc; - } - break; +int db__message_write_inflight_out_all(struct mosquitto_db *db, struct mosquitto *context) +{ + struct mosquitto_client_msg *tail, *tmp; + int rc; - case mosq_ms_invalid: - case mosq_ms_send_pubrec: - case mosq_ms_resend_pubcomp: - case mosq_ms_wait_for_puback: - case mosq_ms_wait_for_pubrec: - case mosq_ms_wait_for_pubrel: - case mosq_ms_wait_for_pubcomp: - case mosq_ms_queued: - break; - } + if(context->state != mosq_cs_active || context->sock == INVALID_SOCKET){ + return MOSQ_ERR_SUCCESS; + } + + DL_FOREACH_SAFE(context->msgs_out.inflight, tail, tmp){ + rc = db__message_write_inflight_out_single(db, context, tail); + if(rc) return rc; + } + return MOSQ_ERR_SUCCESS; +} + + +int db__message_write_inflight_out_latest(struct mosquitto_db *db, struct mosquitto *context) +{ + struct mosquitto_client_msg *tail, *next; + int rc; + + if(context->state != mosq_cs_active + || context->sock == INVALID_SOCKET + || context->msgs_out.inflight == NULL){ + + return MOSQ_ERR_SUCCESS; + } + + if(context->msgs_out.inflight->prev == context->msgs_out.inflight){ + /* Only one message */ + return db__message_write_inflight_out_single(db, context, context->msgs_out.inflight); + } + + /* Start at the end of the list and work backwards looking for the first + * message in a non-publish state */ + tail = context->msgs_out.inflight->prev; + while(tail != context->msgs_out.inflight && + (tail->state == mosq_ms_publish_qos0 + || tail->state == mosq_ms_publish_qos1 + || tail->state == mosq_ms_publish_qos2)){ + + tail = tail->prev; + } + + /* Tail is now either the head of the list, if that message is waiting for + * publish, or the oldest message not waiting for a publish. In the latter + * case, any pending publishes should be next after this message. */ + if(tail != context->msgs_out.inflight){ + tail = tail->next; + } + + while(tail){ + next = tail->next; + rc = db__message_write_inflight_out_single(db, context, tail); + if(rc) return rc; + tail = next; } return MOSQ_ERR_SUCCESS; } diff --git a/src/handle_connack.c b/src/handle_connack.c index 8b6dc83c..ec487bba 100644 --- a/src/handle_connack.c +++ b/src/handle_connack.c @@ -67,7 +67,10 @@ int handle__connack(struct mosquitto_db *db, struct mosquitto *context) } #endif mosquitto__set_state(context, mosq_cs_active); - return MOSQ_ERR_SUCCESS; + rc = db__message_write_queued_out(db, context); + if(rc) return rc; + rc = db__message_write_inflight_out_all(db, context); + return rc; }else{ if(context->protocol == mosq_p_mqtt5){ switch(reason_code){ diff --git a/src/handle_connect.c b/src/handle_connect.c index bec9eb06..a25e5122 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -272,11 +272,10 @@ int connect__on_authorised(struct mosquitto_db *db, struct mosquitto *context, v mosquitto__set_state(context, mosq_cs_active); rc = send__connack(db, context, connect_ack, CONNACK_ACCEPTED, connack_props); mosquitto_property_free_all(&connack_props); + rc = db__message_write_queued_out(db, context); if(rc) return rc; - rc = db__message_write_inflight_out(db, context); + rc = db__message_write_inflight_out_all(db, context); if(rc) return rc; - rc = db__message_write_queued_out(db, context); - return rc; error: free(auth_data_out); mosquitto_property_free_all(&connack_props); diff --git a/src/handle_subscribe.c b/src/handle_subscribe.c index 302ae6c6..b6e7bb2d 100644 --- a/src/handle_subscribe.c +++ b/src/handle_subscribe.c @@ -221,10 +221,10 @@ int handle__subscribe(struct mosquitto_db *db, struct mosquitto *context) #endif if(context->current_out_packet == NULL){ - rc = db__message_write_inflight_out(db, context); - if(rc) return rc; rc = db__message_write_queued_out(db, context); if(rc) return rc; + rc = db__message_write_inflight_out_latest(db, context); + if(rc) return rc; } return rc; diff --git a/src/loop.c b/src/loop.c index 4a192da5..c18aa716 100644 --- a/src/loop.c +++ b/src/loop.c @@ -129,7 +129,7 @@ static int single_publish(struct mosquitto_db *db, struct mosquitto *context, st }else{ mid = 0; } - return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties); + return db__message_insert(db, context, mid, mosq_md_out, msg->qos, 0, stored, msg->properties, true); } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 6a2d0fa9..c06fb264 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -671,7 +671,8 @@ void db__msg_store_free(struct mosquitto_msg_store *store); int db__message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); void sys_tree__init(struct mosquitto_db *db); void sys_tree__update(struct mosquitto_db *db, int interval, time_t start_time); -int db__message_write_inflight_out(struct mosquitto_db *db, struct mosquitto *context); +int db__message_write_inflight_out_all(struct mosquitto_db *db, struct mosquitto *context); +int db__message_write_inflight_out_latest(struct mosquitto_db *db, struct mosquitto *context); int db__message_write_queued_out(struct mosquitto_db *db, struct mosquitto *context); int db__message_write_queued_in(struct mosquitto_db *db, struct mosquitto *context); diff --git a/src/websockets.c b/src/websockets.c index 6b0a5c6f..35423213 100644 --- a/src/websockets.c +++ b/src/websockets.c @@ -274,7 +274,8 @@ static int callback_mqtt(struct libwebsocket_context *context, return -1; } - db__message_write_inflight_out(db, mosq); + db__message_write_queued_out(db, mosq); + db__message_write_inflight_out_latest(db, mosq); if(mosq->out_packet && !mosq->current_out_packet){ mosq->current_out_packet = mosq->out_packet;