Protect stored messages from removal when queing.

pull/211/merge
Roger A. Light 11 years ago
parent 5b6f6976c7
commit 85a294922e

@ -197,7 +197,9 @@ static void _message_remove(struct mosquitto_db *db, struct mosquitto *context,
return; return;
} }
mosquitto__db_msg_store_deref(db, &(*msg)->store); if((*msg)->store){
mosquitto__db_msg_store_deref(db, &(*msg)->store);
}
if(last){ if(last){
last->next = (*msg)->next; last->next = (*msg)->next;
if(!last->next){ if(!last->next){
@ -488,7 +490,7 @@ int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *cont
} }
if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1; if(mqtt3_db_message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, stored); return mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &stored);
} }
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id) int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
@ -754,7 +756,7 @@ int mqtt3_db_message_release(struct mosquitto_db *db, struct mosquitto *context,
* denied/dropped and is being processed so the client doesn't * denied/dropped and is being processed so the client doesn't
* keep resending it. That means we don't send it to other * keep resending it. That means we don't send it to other
* clients. */ * clients. */
if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, tail->store)){ if(!topic || !mqtt3_db_messages_queue(db, source_id, topic, qos, retain, &tail->store)){
_message_remove(db, context, &tail, last); _message_remove(db, context, &tail, last);
deleted = true; deleted = true;
}else{ }else{

@ -390,7 +390,7 @@ int mqtt3_db_message_update(struct mosquitto *context, uint16_t mid, enum mosqui
int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_message_write(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context); int mqtt3_db_messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain); int mqtt3_db_messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain);
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored); int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored);
int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id); int mqtt3_db_message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id);
int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); int mqtt3_db_message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); void mosquitto__db_msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);

@ -665,7 +665,7 @@ static int _db_retain_chunk_restore(struct mosquitto_db *db, FILE *db_fptr)
store_id = i64temp; store_id = i64temp;
HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load); HASH_FIND(hh, db->msg_store_load, &store_id, sizeof(dbid_t), load);
if(load){ if(load){
mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, load->store); mqtt3_db_messages_queue(db, NULL, load->store->topic, load->store->qos, load->store->retain, &load->store);
}else{ }else{
_mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message."); _mosquitto_log_printf(NULL, MOSQ_LOG_ERR, "Error: Corrupt database whilst restoring a retained message.");
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;

@ -230,10 +230,10 @@ int mqtt3_handle_publish(struct mosquitto_db *db, struct mosquitto *context)
} }
switch(qos){ switch(qos){
case 0: case 0:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
break; break;
case 1: case 1:
if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, stored)) rc = 1; if(mqtt3_db_messages_queue(db, context->id, topic, qos, retain, &stored)) rc = 1;
if(_mosquitto_send_puback(context, mid)) rc = 1; if(_mosquitto_send_puback(context, mid)) rc = 1;
break; break;
case 2: case 2:

@ -510,7 +510,7 @@ int mqtt3_sub_remove(struct mosquitto_db *db, struct mosquitto *context, const c
return rc; return rc;
} }
int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store *stored) int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, const char *topic, int qos, int retain, struct mosquitto_msg_store **stored)
{ {
int rc = 0; int rc = 0;
struct _mosquitto_subhier *subhier; struct _mosquitto_subhier *subhier;
@ -521,6 +521,12 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
if(_sub_topic_tokenise(topic, &tokens)) return 1; if(_sub_topic_tokenise(topic, &tokens)) return 1;
/* Protect this message until we have sent it to all
clients - this is required because websockets client calls
mqtt3_db_message_write(), which could remove the message if ref_count==0.
*/
(*stored)->ref_count++;
subhier = db->subs.children; subhier = db->subs.children;
while(subhier){ while(subhier){
if(!strcmp(subhier->topic, tokens->topic)){ if(!strcmp(subhier->topic, tokens->topic)){
@ -530,12 +536,15 @@ int mqtt3_db_messages_queue(struct mosquitto_db *db, const char *source_id, cons
*/ */
_sub_add(db, NULL, 0, subhier, tokens); _sub_add(db, NULL, 0, subhier, tokens);
} }
_sub_search(db, subhier, tokens, source_id, topic, qos, retain, stored, true); _sub_search(db, subhier, tokens, source_id, topic, qos, retain, *stored, true);
} }
subhier = subhier->next; subhier = subhier->next;
} }
_sub_topic_tokens_free(tokens); _sub_topic_tokens_free(tokens);
/* Remove our reference and free if needed. */
mosquitto__db_msg_store_deref(db, stored);
return rc; return rc;
} }

Loading…
Cancel
Save