Stored messages now always stored in a hash table.

pull/2438/head
Roger A. Light 4 years ago
parent d6672194db
commit 9ad8d943be

@ -227,7 +227,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
{
struct P_msg_store chunk;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store_load *load;
int64_t message_expiry_interval64;
uint32_t message_expiry_interval;
int rc = 0;
@ -247,17 +246,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
return rc;
}
load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load));
if(!load){
fclose(db_fptr);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
mosquitto__free(chunk.payload);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
if(chunk.F.expiry_time > 0){
message_expiry_interval64 = chunk.F.expiry_time - time(NULL);
if(message_expiry_interval64 < 0 || message_expiry_interval64 > UINT32_MAX){
@ -271,7 +259,6 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL){
mosquitto__free(load);
fclose(db_fptr);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
@ -297,12 +284,10 @@ static int dump__msg_store_chunk_process(FILE *db_fptr, uint32_t length)
if(rc == MOSQ_ERR_SUCCESS){
stored->source_listener = chunk.source.listener;
load->db_id = stored->db_id;
load->store = stored;
stored->db_id = stored->db_id;
HASH_ADD(hh, db.msg_store_load, db_id, sizeof(dbid_t), load);
HASH_ADD(hh, db.msg_store, db_id, sizeof(dbid_t), stored);
}else{
mosquitto__free(load);
fclose(db_fptr);
return rc;
}

@ -250,12 +250,12 @@ int db__close(void)
void db__msg_store_add(struct mosquitto_msg_store *store)
{
store->next = db.msg_store;
store->prev = NULL;
if(db.msg_store){
db.msg_store->prev = store;
struct mosquitto_msg_store *found;
HASH_FIND(hh, db.msg_store, &store->db_id, sizeof(store->db_id), found);
if(found == NULL){
HASH_ADD(hh, db.msg_store, db_id, sizeof(store->db_id), store);
}
db.msg_store = store;
}
@ -279,17 +279,8 @@ void db__msg_store_free(struct mosquitto_msg_store *store)
void db__msg_store_remove(struct mosquitto_msg_store *store)
{
if(store->prev){
store->prev->next = store->next;
if(store->next){
store->next->prev = store->prev;
}
}else{
db.msg_store = store->next;
if(store->next){
store->next->prev = NULL;
}
}
if(store == NULL) return;
HASH_DELETE(hh, db.msg_store, store);
db.msg_store_count--;
db.msg_store_bytes -= store->payloadlen;
@ -299,13 +290,10 @@ void db__msg_store_remove(struct mosquitto_msg_store *store)
void db__msg_store_clean(void)
{
struct mosquitto_msg_store *store, *next;;
struct mosquitto_msg_store *store, *store_tmp;
store = db.msg_store;
while(store){
next = store->next;
HASH_ITER(hh, db.msg_store, store, store_tmp){
db__msg_store_remove(store);
store = next;
}
}
@ -326,15 +314,12 @@ void db__msg_store_ref_dec(struct mosquitto_msg_store **store)
void db__msg_store_compact(void)
{
struct mosquitto_msg_store *store, *next;
struct mosquitto_msg_store *store, *store_tmp;
store = db.msg_store;
while(store){
next = store->next;
HASH_ITER(hh, db.msg_store, store, store_tmp){
if(store->ref_count < 1){
db__msg_store_remove(store);
}
store = next;
}
}

@ -386,15 +386,8 @@ struct mosquitto__retainhier {
uint16_t topic_len;
};
struct mosquitto_msg_store_load{
UT_hash_handle hh;
dbid_t db_id;
struct mosquitto_msg_store *store;
};
struct mosquitto_msg_store{
struct mosquitto_msg_store *next;
struct mosquitto_msg_store *prev;
UT_hash_handle hh;
dbid_t db_id;
char *source_id;
char *source_username;
@ -483,7 +476,6 @@ struct mosquitto_db{
#endif
struct clientid__index_hash *clientid_index_hash;
struct mosquitto_msg_store *msg_store;
struct mosquitto_msg_store_load *msg_store_load;
time_t now_s; /* Monotonic clock, where possible */
time_t now_real_s; /* Read clock, for measuring session/message expiry */
int next_event_ms; /* for mux timeout */

@ -112,12 +112,12 @@ int persist__read_string(FILE *db_fptr, char **str)
static int persist__client_msg_restore(struct P_client_msg *chunk)
{
struct mosquitto_client_msg *cmsg;
struct mosquitto_msg_store_load *load;
struct mosquitto_msg_store *msg;
struct mosquitto *context;
struct mosquitto_msg_data *msg_data;
HASH_FIND(hh, db.msg_store_load, &chunk->F.store_id, sizeof(dbid_t), load);
if(!load){
HASH_FIND(hh, db.msg_store, &chunk->F.store_id, sizeof(chunk->F.store_id), msg);
if(!msg){
/* Can't find message - probably expired */
return MOSQ_ERR_SUCCESS;
}
@ -144,7 +144,7 @@ static int persist__client_msg_restore(struct P_client_msg *chunk)
cmsg->dup = chunk->F.retain_dup&0x0F;
cmsg->subscription_identifier = chunk->subscription_identifier;
cmsg->store = load->store;
cmsg->store = msg;
db__msg_store_ref_inc(cmsg->store);
if(cmsg->direction == mosq_md_out){
@ -247,7 +247,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
{
struct P_msg_store chunk;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store_load *load;
int64_t message_expiry_interval64;
uint32_t message_expiry_interval;
int rc = 0;
@ -272,15 +271,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
}
}
}
load = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store_load));
if(!load){
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
mosquitto__free(chunk.payload);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
if(chunk.F.expiry_time > 0){
message_expiry_interval64 = chunk.F.expiry_time - time(NULL);
@ -290,7 +280,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
mosquitto__free(chunk.payload);
mosquitto__free(load);
return MOSQ_ERR_SUCCESS;
}else{
message_expiry_interval = (uint32_t)message_expiry_interval64;
@ -301,7 +290,6 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL){
mosquitto__free(load);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
@ -327,21 +315,16 @@ static int persist__msg_store_chunk_restore(FILE *db_fptr, uint32_t length)
chunk.source.username = NULL;
if(rc == MOSQ_ERR_SUCCESS){
stored->source_listener = chunk.source.listener;
load->db_id = stored->db_id;
load->store = stored;
HASH_ADD(hh, db.msg_store_load, db_id, sizeof(dbid_t), load);
return MOSQ_ERR_SUCCESS;
}else{
mosquitto__free(load);
mosquitto__free(stored);
return rc;
}
}
static int persist__retain_chunk_restore(FILE *db_fptr)
{
struct mosquitto_msg_store_load *load;
struct mosquitto_msg_store *msg;
struct P_retain chunk;
int rc;
char **split_topics;
@ -358,10 +341,10 @@ static int persist__retain_chunk_restore(FILE *db_fptr)
return rc;
}
HASH_FIND(hh, db.msg_store_load, &chunk.F.store_id, sizeof(dbid_t), load);
if(load){
if(sub__topic_tokenise(load->store->topic, &local_topic, &split_topics, NULL)) return 1;
retain__store(load->store->topic, load->store, split_topics);
HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), msg);
if(msg){
if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return 1;
retain__store(msg->topic, msg, split_topics);
mosquitto__free(local_topic);
mosquitto__free(split_topics);
}else{
@ -415,7 +398,6 @@ int persist__restore(void)
uint32_t chunk, length;
size_t rlen;
char *err;
struct mosquitto_msg_store_load *load, *load_tmp;
struct PF_cfg cfg_chunk;
assert(db.config);
@ -424,7 +406,7 @@ int persist__restore(void)
return MOSQ_ERR_SUCCESS;
}
db.msg_store_load = NULL;
db.msg_store = NULL;
fptr = mosquitto__fopen(db.config->persistence_filepath, "rb", false);
if(fptr == NULL) return MOSQ_ERR_SUCCESS;
@ -530,10 +512,6 @@ int persist__restore(void)
fclose(fptr);
HASH_ITER(hh, db.msg_store_load, load, load_tmp){
HASH_DELETE(hh, db.msg_store_load, load);
mosquitto__free(load);
}
return rc;
error:
err = strerror(errno);

@ -86,7 +86,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex
static int persist__message_store_save(FILE *db_fptr)
{
struct P_msg_store chunk;
struct mosquitto_msg_store *stored;
struct mosquitto_msg_store *stored, *stored_tmp;
int rc;
assert(db_fptr);
@ -94,16 +94,14 @@ static int persist__message_store_save(FILE *db_fptr)
memset(&chunk, 0, sizeof(struct P_msg_store));
stored = db.msg_store;
while(stored){
HASH_ITER(hh, db.msg_store, stored, stored_tmp){
if(stored->ref_count < 1 || stored->topic == NULL){
stored = stored->next;
continue;
}
if(!strncmp(stored->topic, "$SYS", 4)){
if(stored->ref_count <= 1 && stored->dest_id_count == 0){
/* $SYS messages that are only retained shouldn't be persisted. */
stored = stored->next;
continue;
}
/* Don't save $SYS messages as retained otherwise they can give
@ -150,7 +148,6 @@ static int persist__message_store_save(FILE *db_fptr)
if(rc){
return rc;
}
stored = stored->next;
}
return MOSQ_ERR_SUCCESS;

@ -89,7 +89,7 @@ int db__message_store(const struct mosquitto *source, struct mosquitto_msg_store
stored->db_id = store_id;
}
db.msg_store = stored;
HASH_ADD(hh, db.msg_store, db_id, sizeof(stored->db_id), stored);
return MOSQ_ERR_SUCCESS;
error:

Loading…
Cancel
Save