From 85a294922edae45fae0972b6915d05b124e11abf Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 20 Nov 2014 21:13:21 +0000 Subject: [PATCH] Protect stored messages from removal when queing. --- src/database.c | 8 +++++--- src/mosquitto_broker.h | 2 +- src/persist.c | 2 +- src/read_handle.c | 4 ++-- src/subs.c | 13 +++++++++++-- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/src/database.c b/src/database.c index c7a5504b..5cfdf094 100644 --- a/src/database.c +++ b/src/database.c @@ -197,7 +197,9 @@ static void _message_remove(struct mosquitto_db *db, struct mosquitto *context, return; } - mosquitto__db_msg_store_deref(db, &(*msg)->store); + if((*msg)->store){ + mosquitto__db_msg_store_deref(db, &(*msg)->store); + } if(last){ last->next = (*msg)->next; if(!last->next){ @@ -488,7 +490,7 @@ int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *cont } if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; - return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, stored); + return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &stored); } int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) @@ -754,7 +756,7 @@ int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context, * denied/dropped and is being processed so the client doesn't * keep resending it. That means we don't send it to other * clients. */ - if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){ + if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &tail->store)){ _message_remove(db, context, &tail, last); deleted = true; }else{ diff --git a/src/mosquitto_broker.h b/src/mosquitto_broker.h index 8cd022be..683c64dc 100644 --- a/src/mosquitto_broker.h +++ b/src/mosquitto_broker.h @@ -390,7 +390,7 @@ int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosqui int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); -int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); +int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); diff --git a/src/persist.c b/src/persist.c index e8f7fe19..30f78137 100644 --- a/src/persist.c +++ b/src/persist.c @@ -665,7 +665,7 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr) store_id = i64temp; HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); if(load){ - mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, load->store); + mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store); }else{ _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); return MOSQ_ERR_INVAL; diff --git a/src/read_handle.c b/src/read_handle.c index a156641d..ea316029 100644 --- a/src/read_handle.c +++ b/src/read_handle.c @@ -230,10 +230,10 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context) } switch(qos){ case 0: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; + if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; break; case 1: - if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; + if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1; if(_mosquitto_send_puback(context, mid)) rc = 1; break; case 2: diff --git a/src/subs.c b/src/subs.c index b7350c1d..4aaff864 100644 --- a/src/subs.c +++ b/src/subs.c @@ -510,7 +510,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c return rc; } -int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) +int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored) { int rc = 0; struct _mosquitto_subhier *subhier; @@ -521,6 +521,12 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons if(_sub_topic_tokenise(topic, &tokens)) return 1; + /* Protect this message until we have sent it to all + clients - this is required because websockets client calls + mqtt3_db_message_write(), which could remove the message if ref_count==0. + */ + (*stored)->ref_count++; + subhier = db->subs.children; while(subhier){ if(!strcmp(subhier->topic, tokens->topic)){ @@ -530,12 +536,15 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons */ _sub_add(db, NULL, 0, subhier, tokens); } - _sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true); + _sub_search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true); } subhier = subhier->next; } _sub_topic_tokens_free(tokens); + /* Remove our reference and free if needed. */ + mosquitto__db_msg_store_deref(db, stored); + return rc; }