From 9ad8d943bee99d1be86780b272eb0292503c2dba Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 3 Aug 2021 11:18:43 +0100 Subject: [PATCH] Stored messages now always stored in a hash table. --- apps/db_dump/db_dump.c | 19 ++------------ src/database.c | 37 +++++++++------------------ src/mosquitto_broker_internal.h | 10 +------- src/persist_read.c | 44 +++++++++------------------------ src/persist_write.c | 7 ++---- test/unit/persist_read_stubs.c | 2 +- 6 files changed, 28 insertions(+), 91 deletions(-) diff --git a/apps/db_dump/db_dump.c b/apps/db_dump/db_dump.c index 6a587471..46719ada 100644 --- a/apps/db_dump/db_dump.c +++ b/apps/db_dump/db_dump.c @@ -227,7 +227,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length) { struct P_msg_store chunk; struct mosquitto_msg_store *stored = NULL; - struct mosquitto_msg_store_load *load; int64_t message_expiry_interval64; uint32_t message_expiry_interval; int rc = 0; @@ -247,17 +246,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length) return rc; } - load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load)); - if(!load){ - fclose(db_fptr); - mosquitto__free(chunk.source.id); - mosquitto__free(chunk.source.username); - mosquitto__free(chunk.topic); - mosquitto__free(chunk.payload); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } - if(chunk.F.expiry_time > 0){ message_expiry_interval64 = chunk.F.expiry_time - time(NULL); if(message_expiry_interval64 < 0 || message_expiry_interval64 > UINT32_MAX){ @@ -271,7 +259,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length) stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); if(stored == NULL){ - mosquitto__free(load); fclose(db_fptr); mosquitto__free(chunk.source.id); mosquitto__free(chunk.source.username); @@ -297,12 +284,10 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length) if(rc == MOSQ_ERR_SUCCESS){ stored->source_listener = chunk.source.listener; - load->db_id = stored->db_id; - load->store = stored; + stored->db_id = stored->db_id; - HASH_ADD(hh, db.msg_store_load, db_id, sizeof(dbid_t), load); + HASH_ADD(hh, db.msg_store, db_id, sizeof(dbid_t), stored); }else{ - mosquitto__free(load); fclose(db_fptr); return rc; } diff --git a/src/database.c b/src/database.c index 145044c8..c918ebe0 100644 --- a/src/database.c +++ b/src/database.c @@ -250,12 +250,12 @@ int db__close(void) void db__msg_store_add(struct mosquitto_msg_store *store) { - store->next = db.msg_store; - store->prev = NULL; - if(db.msg_store){ - db.msg_store->prev = store; + struct mosquitto_msg_store *found; + + HASH_FIND(hh, db.msg_store, &store->db_id, sizeof(store->db_id), found); + if(found == NULL){ + HASH_ADD(hh, db.msg_store, db_id, sizeof(store->db_id), store); } - db.msg_store = store; } @@ -279,17 +279,8 @@ void db__msg_store_free(struct mosquitto_msg_store *store) void db__msg_store_remove(struct mosquitto_msg_store *store) { - if(store->prev){ - store->prev->next = store->next; - if(store->next){ - store->next->prev = store->prev; - } - }else{ - db.msg_store = store->next; - if(store->next){ - store->next->prev = NULL; - } - } + if(store == NULL) return; + HASH_DELETE(hh, db.msg_store, store); db.msg_store_count--; db.msg_store_bytes -= store->payloadlen; @@ -299,13 +290,10 @@ void db__msg_store_remove(struct mosquitto_msg_store *store) void db__msg_store_clean(void) { - struct mosquitto_msg_store *store, *next;; + struct mosquitto_msg_store *store, *store_tmp; - store = db.msg_store; - while(store){ - next = store->next; + HASH_ITER(hh, db.msg_store, store, store_tmp){ db__msg_store_remove(store); - store = next; } } @@ -326,15 +314,12 @@ void db__msg_store_ref_dec(struct mosquitto_msg_store **store) void db__msg_store_compact(void) { - struct mosquitto_msg_store *store, *next; + struct mosquitto_msg_store *store, *store_tmp; - store = db.msg_store; - while(store){ - next = store->next; + HASH_ITER(hh, db.msg_store, store, store_tmp){ if(store->ref_count < 1){ db__msg_store_remove(store); } - store = next; } } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index c4cda6bd..43bbf5a8 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -386,15 +386,8 @@ struct mosquitto__retainhier { uint16_t topic_len; }; -struct mosquitto_msg_store_load{ - UT_hash_handle hh; - dbid_t db_id; - struct mosquitto_msg_store *store; -}; - struct mosquitto_msg_store{ - struct mosquitto_msg_store *next; - struct mosquitto_msg_store *prev; + UT_hash_handle hh; dbid_t db_id; char *source_id; char *source_username; @@ -483,7 +476,6 @@ struct mosquitto_db{ #endif struct clientid__index_hash *clientid_index_hash; struct mosquitto_msg_store *msg_store; - struct mosquitto_msg_store_load *msg_store_load; time_t now_s; /* Monotonic clock, where possible */ time_t now_real_s; /* Read clock, for measuring session/message expiry */ int next_event_ms; /* for mux timeout */ diff --git a/src/persist_read.c b/src/persist_read.c index 18f7f043..f0590e56 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -112,12 +112,12 @@ int persist__read_string(FILE *db_fptr, char **str) static int persist__client_msg_restore(struct P_client_msg *chunk) { struct mosquitto_client_msg *cmsg; - struct mosquitto_msg_store_load *load; + struct mosquitto_msg_store *msg; struct mosquitto *context; struct mosquitto_msg_data *msg_data; - HASH_FIND(hh, db.msg_store_load, &chunk->F.store_id, sizeof(dbid_t), load); - if(!load){ + HASH_FIND(hh, db.msg_store, &chunk->F.store_id, sizeof(chunk->F.store_id), msg); + if(!msg){ /* Can't find message - probably expired */ return MOSQ_ERR_SUCCESS; } @@ -144,7 +144,7 @@ static int persist__client_msg_restore(struct P_client_msg *chunk) cmsg->dup = chunk->F.retain_dup&0x0F; cmsg->subscription_identifier = chunk->subscription_identifier; - cmsg->store = load->store; + cmsg->store = msg; db__msg_store_ref_inc(cmsg->store); if(cmsg->direction == mosq_md_out){ @@ -247,7 +247,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) { struct P_msg_store chunk; struct mosquitto_msg_store *stored = NULL; - struct mosquitto_msg_store_load *load; int64_t message_expiry_interval64; uint32_t message_expiry_interval; int rc = 0; @@ -272,15 +271,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) } } } - load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load)); - if(!load){ - mosquitto__free(chunk.source.id); - mosquitto__free(chunk.source.username); - mosquitto__free(chunk.topic); - mosquitto__free(chunk.payload); - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; - } if(chunk.F.expiry_time > 0){ message_expiry_interval64 = chunk.F.expiry_time - time(NULL); @@ -290,7 +280,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) mosquitto__free(chunk.source.username); mosquitto__free(chunk.topic); mosquitto__free(chunk.payload); - mosquitto__free(load); return MOSQ_ERR_SUCCESS; }else{ message_expiry_interval = (uint32_t)message_expiry_interval64; @@ -301,7 +290,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); if(stored == NULL){ - mosquitto__free(load); mosquitto__free(chunk.source.id); mosquitto__free(chunk.source.username); mosquitto__free(chunk.topic); @@ -327,21 +315,16 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length) chunk.source.username = NULL; if(rc == MOSQ_ERR_SUCCESS){ - stored->source_listener = chunk.source.listener; - load->db_id = stored->db_id; - load->store = stored; - - HASH_ADD(hh, db.msg_store_load, db_id, sizeof(dbid_t), load); return MOSQ_ERR_SUCCESS; }else{ - mosquitto__free(load); + mosquitto__free(stored); return rc; } } static int persist__retain_chunk_restore(FILE *db_fptr) { - struct mosquitto_msg_store_load *load; + struct mosquitto_msg_store *msg; struct P_retain chunk; int rc; char **split_topics; @@ -358,10 +341,10 @@ static int persist__retain_chunk_restore(FILE *db_fptr) return rc; } - HASH_FIND(hh, db.msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load); - if(load){ - if(sub__topic_tokenise(load->store->topic, &local_topic, &split_topics, NULL)) return 1; - retain__store(load->store->topic, load->store, split_topics); + HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), msg); + if(msg){ + if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return 1; + retain__store(msg->topic, msg, split_topics); mosquitto__free(local_topic); mosquitto__free(split_topics); }else{ @@ -415,7 +398,6 @@ int persist__restore(void) uint32_t chunk, length; size_t rlen; char *err; - struct mosquitto_msg_store_load *load, *load_tmp; struct PF_cfg cfg_chunk; assert(db.config); @@ -424,7 +406,7 @@ int persist__restore(void) return MOSQ_ERR_SUCCESS; } - db.msg_store_load = NULL; + db.msg_store = NULL; fptr = mosquitto__fopen(db.config->persistence_filepath, "rb", false); if(fptr == NULL) return MOSQ_ERR_SUCCESS; @@ -530,10 +512,6 @@ int persist__restore(void) fclose(fptr); - HASH_ITER(hh, db.msg_store_load, load, load_tmp){ - HASH_DELETE(hh, db.msg_store_load, load); - mosquitto__free(load); - } return rc; error: err = strerror(errno); diff --git a/src/persist_write.c b/src/persist_write.c index d879b8b0..3fc86dad 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -86,7 +86,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex static int persist__message_store_save(FILE *db_fptr) { struct P_msg_store chunk; - struct mosquitto_msg_store *stored; + struct mosquitto_msg_store *stored, *stored_tmp; int rc; assert(db_fptr); @@ -94,16 +94,14 @@ static int persist__message_store_save(FILE *db_fptr) memset(&chunk, 0, sizeof(struct P_msg_store)); stored = db.msg_store; - while(stored){ + HASH_ITER(hh, db.msg_store, stored, stored_tmp){ if(stored->ref_count < 1 || stored->topic == NULL){ - stored = stored->next; continue; } if(!strncmp(stored->topic, "$SYS", 4)){ if(stored->ref_count <= 1 && stored->dest_id_count == 0){ /* $SYS messages that are only retained shouldn't be persisted. */ - stored = stored->next; continue; } /* Don't save $SYS messages as retained otherwise they can give @@ -150,7 +148,6 @@ static int persist__message_store_save(FILE *db_fptr) if(rc){ return rc; } - stored = stored->next; } return MOSQ_ERR_SUCCESS; diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index f0cdc883..39c654a5 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -89,7 +89,7 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store stored->db_id = store_id; } - db.msg_store = stored; + HASH_ADD(hh, db.msg_store, db_id, sizeof(stored->db_id), stored); return MOSQ_ERR_SUCCESS; error: