From ea8537c048640564380d3e1e215231e821ff952d Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Mon, 17 Nov 2014 23:46:02 +0000 Subject: [PATCH] Remove unused messages from store immediately. This removes the need for *store_clean*. --- lib/read_handle.h | 4 +++ lib/read_handle_shared.c | 6 ++++- src/bridge.c | 4 +-- src/conf.c | 7 +---- src/context.c | 2 +- src/database.c | 57 +++++++++++++--------------------------- src/loop.c | 14 ++++------ src/mosquitto.c | 2 +- src/mosquitto_broker.h | 12 ++++----- src/persist.c | 5 +--- src/read_handle.c | 4 +-- src/read_handle_server.c | 2 +- src/subs.c | 5 +++- 13 files changed, 50 insertions(+), 74 deletions(-) diff --git a/lib/read_handle.h b/lib/read_handle.h index 018353f8..96e0467e 100644 --- a/lib/read_handle.h +++ b/lib/read_handle.h @@ -23,7 +23,11 @@ int _mosquitto_packet_handle(struct mosquitto *mosq); int _mosquitto_handle_connack(struct mosquitto *mosq); int _mosquitto_handle_pingreq(struct mosquitto *mosq); int _mosquitto_handle_pingresp(struct mosquitto *mosq); +#ifdef WITH_BROKER +int _mosquitto_handle_pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type); +#else int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type); +#endif int _mosquitto_handle_publish(struct mosquitto *mosq); int _mosquitto_handle_pubrec(struct mosquitto *mosq); int _mosquitto_handle_pubrel(struct mosquitto_db *db, struct mosquitto *mosq); diff --git a/lib/read_handle_shared.c b/lib/read_handle_shared.c index 41e26995..3c560bb8 100644 --- a/lib/read_handle_shared.c +++ b/lib/read_handle_shared.c @@ -54,7 +54,11 @@ int _mosquitto_handle_pingresp(struct mosquitto *mosq) return MOSQ_ERR_SUCCESS; } +#ifdef WITH_BROKER +int _mosquitto_handle_pubackcomp(struct mosquitto_db *db, struct mosquitto *mosq, const char *type) +#else int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type) +#endif { uint16_t mid; int rc; @@ -66,7 +70,7 @@ int _mosquitto_handle_pubackcomp(struct mosquitto *mosq, const char *type) _mosquitto_log_printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d)", type, mosq->id, mid); if(mid){ - rc = mqtt3_db_message_delete(mosq, mid, mosq_md_out); + rc = mqtt3_db_message_delete(db, mosq, mid, mosq_md_out); if(rc) return rc; } #else diff --git a/src/bridge.c b/src/bridge.c index b488c12d..6bca39c5 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -146,10 +146,10 @@ int mqtt3_bridge_connect(struct mosquitto_db *db, struct mosquitto *context) context->ping_t = 0; context->bridge->lazy_reconnect = false; mqtt3_bridge_packet_cleanup(context); - mqtt3_db_message_reconnect_reset(context); + mqtt3_db_message_reconnect_reset(db, context); if(context->clean_session){ - mqtt3_db_messages_delete(context); + mqtt3_db_messages_delete(db, context); } /* Delete all local subscriptions even for clean_session==false. We don't diff --git a/src/conf.c b/src/conf.c index c0943a11..f5fd105e 100644 --- a/src/conf.c +++ b/src/conf.c @@ -155,7 +155,6 @@ static void _config_init_reload(struct mqtt3_config *config) config->psk_file = NULL; config->queue_qos0_messages = false; config->retry_interval = 20; - config->store_clean_interval = 10; config->sys_interval = 10; config->upgrade_outgoing_qos = false; if(config->auth_options){ @@ -1618,11 +1617,7 @@ int _config_read_file_core(struct mqtt3_config *config, bool reload, const char _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available."); #endif }else if(!strcmp(token, "store_clean_interval")){ - if(_conf_parse_int(&token, "store_clean_interval", &config->store_clean_interval, saveptr)) return MOSQ_ERR_INVAL; - if(config->store_clean_interval < 0 || config->store_clean_interval > 65535){ - _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Invalid store_clean_interval value (%d).", config->store_clean_interval); - return MOSQ_ERR_INVAL; - } + _mosquitto_log_printf(NULL, MOSQ_LOG_WARNING, "Warning: store_clean_interval is no longer needed."); }else if(!strcmp(token, "sys_interval")){ if(_conf_parse_int(&token, "sys_interval", &config->sys_interval, saveptr)) return MOSQ_ERR_INVAL; if(config->sys_interval < 0 || config->sys_interval > 65535){ diff --git a/src/context.c b/src/context.c index 5dee9230..b654ace4 100644 --- a/src/context.c +++ b/src/context.c @@ -135,7 +135,7 @@ void mqtt3_context_cleanup(struct mosquitto_db *db, struct mosquitto *context, b _mosquitto_socket_close(db, context); if((do_free || context->clean_session) && db){ mqtt3_subs_clean_session(db, context); - mqtt3_db_messages_delete(context); + mqtt3_db_messages_delete(db, context); } if(context->address){ _mosquitto_free(context->address); diff --git a/src/database.c b/src/database.c index 6bfd74c6..e9b62253 100644 --- a/src/database.c +++ b/src/database.c @@ -121,19 +121,21 @@ static void subhier_clean(struct _mosquitto_subhier *subhier) int mqtt3_db_close(struct mosquitto_db *db) { subhier_clean(db->subs.children); - mqtt3_db_store_clean(db); return MOSQ_ERR_SUCCESS; } -static void _message_remove(struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) +static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, struct mosquitto_client_msg **msg, struct mosquitto_client_msg *last) { if(!context || !msg || !(*msg)){ return; } - /* FIXME - it would be nice to be able to remove the stored message here if ref_count==0 */ (*msg)->store->ref_count--; + if((*msg)->store->ref_count == 0){ + HASH_DELETE(hh, db->msg_store, (*msg)->store); + db->msg_store_count--; + } if(last){ last->next = (*msg)->next; if(!last->next){ @@ -157,7 +159,7 @@ static void _message_remove(struct mosquitto *context, struct mosquitto_client_m } } -int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) +int mqtt3_db_message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir) { struct mosquitto_client_msg *tail, *last = NULL; int msg_index = 0; @@ -190,7 +192,7 @@ int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosqui } if(tail->mid == mid && tail->direction == dir){ msg_index--; - _message_remove(context, &tail, last); + _message_remove(db, context, &tail, last); deleted = true; }else{ last = tail; @@ -362,7 +364,7 @@ int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, #ifdef WITH_WEBSOCKETS if(context->wsi){ - return mqtt3_db_message_write(context); + return mqtt3_db_message_write(db, context); }else{ return rc; } @@ -387,7 +389,7 @@ int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosqui return 1; } -int mqtt3_db_messages_delete(struct mosquitto *context) +int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context) { struct mosquitto_client_msg *tail, *next; @@ -395,8 +397,11 @@ int mqtt3_db_messages_delete(struct mosquitto *context) tail = context->msgs; while(tail){ - /* FIXME - it would be nice to be able to remove the stored message here if rec_count==0 */ tail->store->ref_count--; + if(tail->store->ref_count == 0){ + HASH_DELETE(hh, db->msg_store, tail->store); + db->msg_store_count--; + } next = tail->next; _mosquitto_free(tail); tail = next; @@ -523,7 +528,7 @@ int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct /* Called on reconnect to set outgoing messages to a sensible state and force a * retry, and to set incoming messages to expect an appropriate retry. */ -int mqtt3_db_message_reconnect_reset(struct mosquitto *context) +int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context) { struct mosquitto_client_msg *msg; struct mosquitto_client_msg *prev = NULL; @@ -562,7 +567,7 @@ int mqtt3_db_message_reconnect_reset(struct mosquitto *context) if(msg->qos != 2){ /* Anything store)){ - _message_remove(context, &tail, last); + _message_remove(db, context, &tail, last); deleted = true; }else{ return 1; @@ -712,7 +717,7 @@ int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, } } -int mqtt3_db_message_write(struct mosquitto *context) +int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context) { int rc; struct mosquitto_client_msg *tail, *last = NULL; @@ -752,7 +757,7 @@ int mqtt3_db_message_write(struct mosquitto *context) case mosq_ms_publish_qos0: rc = _mosquitto_send_publish(context, mid, topic, payloadlen, payload, qos, retain, retries); if(!rc){ - _message_remove(context, &tail, last); + _message_remove(db, context, &tail, last); }else{ return rc; } @@ -838,32 +843,6 @@ int mqtt3_db_message_write(struct mosquitto *context) return MOSQ_ERR_SUCCESS; } -void mqtt3_db_store_clean(struct mosquitto_db *db) -{ - /* FIXME - this may not be necessary if checks are made when messages are removed. */ - struct mosquitto_msg_store *msg_store, *msg_tmp; - int i; - assert(db); - - HASH_ITER(hh, db->msg_store, msg_store, msg_tmp){ - if(msg_store->ref_count == 0){ - HASH_DELETE(hh, db->msg_store, msg_store); - - if(msg_store->source_id) _mosquitto_free(msg_store->source_id); - if(msg_store->dest_ids){ - for(i=0; idest_id_count; i++){ - if(msg_store->dest_ids[i]) _mosquitto_free(msg_store->dest_ids[i]); - } - _mosquitto_free(msg_store->dest_ids); - } - if(msg_store->msg.topic) _mosquitto_free(msg_store->msg.topic); - if(msg_store->msg.payload) _mosquitto_free(msg_store->msg.payload); - _mosquitto_free(msg_store); - db->msg_store_count--; - } - } -} - void mqtt3_db_limits_set(int inflight, int queued) { max_inflight = inflight; diff --git a/src/loop.c b/src/loop.c index cb331f42..0ce10b5c 100644 --- a/src/loop.c +++ b/src/loop.c @@ -67,7 +67,6 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock #ifdef WITH_PERSISTENCE time_t last_backup = mosquitto_time(); #endif - time_t last_store_clean = mosquitto_time(); time_t now = 0; time_t now_time; int time_count; @@ -162,7 +161,7 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock || context->bridge || now - context->last_msg_in < (time_t)(context->keepalive)*3/2){ - if(mqtt3_db_message_write(context) == MOSQ_ERR_SUCCESS){ + if(mqtt3_db_message_write(db, context) == MOSQ_ERR_SUCCESS){ pollfds[pollfd_index].fd = context->sock; pollfds[pollfd_index].events = POLLIN; pollfds[pollfd_index].revents = 0; @@ -287,24 +286,21 @@ int mosquitto_main_loop(struct mosquitto_db *db, int *listensock, int listensock if(db->config->persistence && db->config->autosave_interval){ if(db->config->autosave_on_changes){ if(db->persistence_changes > db->config->autosave_interval){ - mqtt3_db_backup(db, false, false); + mqtt3_db_backup(db, false); db->persistence_changes = 0; } }else{ if(last_backup + db->config->autosave_interval < mosquitto_time()){ - mqtt3_db_backup(db, false, false); + mqtt3_db_backup(db, false); last_backup = mosquitto_time(); } } } #endif - if(!db->config->store_clean_interval || last_store_clean + db->config->store_clean_interval < mosquitto_time()){ - mqtt3_db_store_clean(db); - last_store_clean = mosquitto_time(); - } + #ifdef WITH_PERSISTENCE if(flag_db_backup){ - mqtt3_db_backup(db, false, false); + mqtt3_db_backup(db, false); flag_db_backup = false; } #endif diff --git a/src/mosquitto.c b/src/mosquitto.c index 5f8bc21a..75459d0a 100644 --- a/src/mosquitto.c +++ b/src/mosquitto.c @@ -335,7 +335,7 @@ int main(int argc, char *argv[]) #ifdef WITH_PERSISTENCE if(config.persistence){ - mqtt3_db_backup(&int_db, true, true); + mqtt3_db_backup(&int_db, true); } #endif diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 88cf49a2..271d5d2c 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -113,7 +113,6 @@ struct mqtt3_config { char *psk_file; bool queue_qos0_messages; int retry_interval; - int store_clean_interval; int sys_interval; bool upgrade_outgoing_qos; char *user; @@ -360,27 +359,26 @@ int mqtt3_handle_unsubscribe(struct mosquitto_db *db, struct mosquitto *context) int mqtt3_db_open(struct mqtt3_config *config, struct mosquitto_db *db); int mqtt3_db_close(struct mosquitto_db *db); #ifdef WITH_PERSISTENCE -int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown); +int mqtt3_db_backup(struct mosquitto_db *db, bool shutdown); int mqtt3_db_restore(struct mosquitto_db *db); #endif void mqtt3_db_limits_set(int inflight, int queued); /* Return the number of in-flight messages in count. */ int mqtt3_db_message_count(int *count); -int mqtt3_db_message_delete(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); +int mqtt3_db_message_delete(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); int mqtt3_db_message_insert(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, int qos, bool retain, struct mosquitto_msg_store *stored); int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir); int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, enum mosquitto_msg_state state); -int mqtt3_db_message_write(struct mosquitto *context); -int mqtt3_db_messages_delete(struct mosquitto *context); +int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context); +int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); /* Check all messages waiting on a client reply and resend if timeout has been exceeded. */ int mqtt3_db_message_timeout_check(struct mosquitto_db *db, unsigned int timeout); -int mqtt3_db_message_reconnect_reset(struct mosquitto *context); +int mqtt3_db_message_reconnect_reset(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_retain_queue(struct mosquitto_db *db, struct mosquitto *context, const char *sub, int sub_qos); -void mqtt3_db_store_clean(struct mosquitto_db *db); void mqtt3_db_sys_update(struct mosquitto_db *db, int interval, time_t start_time); void mqtt3_db_vacuum(void); diff --git a/src/persist.c b/src/persist.c index cd1f0124..8f6e1255 100644 --- a/src/persist.c +++ b/src/persist.c @@ -322,7 +322,7 @@ static int mqtt3_db_subs_retain_write(struct mosquitto_db *db, FILE *db_fptr) return MOSQ_ERR_SUCCESS; } -int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown) +int mqtt3_db_backup(struct mosquitto_db *db, bool shutdown) { int rc = 0; FILE *db_fptr = NULL; @@ -338,9 +338,6 @@ int mqtt3_db_backup(struct mosquitto_db *db, bool cleanup, bool shutdown) if(!db || !db->config || !db->config->persistence_filepath) return MOSQ_ERR_INVAL; _mosquitto_log_printf(NULL, MOSQ_LOG_INFO, "Saving in-memory database to %s.", db->config->persistence_filepath); - if(cleanup){ - mqtt3_db_store_clean(db); - } len = strlen(db->config->persistence_filepath)+5; outfile = _mosquitto_calloc(len+1, 1); diff --git a/src/read_handle.c b/src/read_handle.c index 2679ffc2..a156641d 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -41,9 +41,9 @@ int mqtt3_packet_handle(struct mosquitto_db *db, struct mosquitto *context) case PINGRESP: return _mosquitto_handle_pingresp(context); case PUBACK: - return _mosquitto_handle_pubackcomp(context, "PUBACK"); + return _mosquitto_handle_pubackcomp(db, context, "PUBACK"); case PUBCOMP: - return _mosquitto_handle_pubackcomp(context, "PUBCOMP"); + return _mosquitto_handle_pubackcomp(db, context, "PUBCOMP"); case PUBLISH: return mqtt3_handle_publish(db, context); case PUBREC: diff --git a/src/read_handle_server.c b/src/read_handle_server.c index bc2867c7..2c0a603b 100644 --- a/src/read_handle_server.c +++ b/src/read_handle_server.c @@ -434,7 +434,7 @@ int mqtt3_handle_connect(struct mosquitto_db *db, struct mosquitto *context) if(found_context->msgs){ context->msgs = found_context->msgs; found_context->msgs = NULL; - mqtt3_db_message_reconnect_reset(context); + mqtt3_db_message_reconnect_reset(db, context); } context->subs = found_context->subs; found_context->subs = NULL; diff --git a/src/subs.c b/src/subs.c index 2855def0..9ce6ac18 100644 --- a/src/subs.c +++ b/src/subs.c @@ -81,7 +81,10 @@ static int _subs_process(struct mosquitto_db *db, struct _mosquitto_subhier *hie #endif if(hier->retained){ hier->retained->ref_count--; - /* FIXME - it would be nice to be able to remove the message from the store at this point if ref_count == 0 */ + if(hier->retained->ref_count == 0){ + HASH_DELETE(hh, db->msg_store, hier->retained); + db->msg_store_count--; + } #ifdef WITH_SYS_TREE db->retained_count--; #endif