diff --git a/src/database.c b/src/database.c index 66965a00..476c3dd1 100644 --- a/src/database.c +++ b/src/database.c @@ -604,7 +604,6 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); if(stored == NULL) return MOSQ_ERR_NOMEM; - stored->ref_count = 1; stored->topic = mosquitto__strdup(topic); if(stored->topic == NULL){ diff --git a/src/handle_publish.c b/src/handle_publish.c index c56cbe8f..bd45aab9 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -49,6 +49,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) uint32_t message_expiry_interval = 0; int topic_alias = -1; uint8_t reason_code = 0; + uint16_t mid = 0; if(context->state != mosq_cs_active){ return MOSQ_ERR_PROTOCOL; @@ -58,7 +59,6 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) if(msg == NULL){ return MOSQ_ERR_NOMEM; } - msg->ref_count = 1; dup = (header & 0x08)>>3; msg->qos = (header & 0x06)>>1; @@ -95,14 +95,17 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) } if(msg->qos > 0){ - if(packet__read_uint16(&context->in_packet, &msg->source_mid)){ + if(packet__read_uint16(&context->in_packet, &mid)){ db__msg_store_free(msg); return 1; } - if(msg->source_mid == 0){ + if(mid == 0){ db__msg_store_free(msg); return MOSQ_ERR_PROTOCOL; } + /* It is important to have a separate copy of mid, because msg may be + * freed before we want to send a PUBACK/PUBREC. */ + msg->source_mid = mid; } /* Handle properties */ @@ -270,10 +273,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) case 1: util__decrement_receive_quota(context); rc2 = sub__messages_queue(db, context->id, stored->topic, stored->qos, stored->retain, &stored); + /* stored may now be free, so don't refer to it */ if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){ - if(send__puback(context, stored->source_mid, 0, NULL)) rc = 1; + if(send__puback(context, mid, 0, NULL)) rc = 1; }else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){ - if(send__puback(context, stored->source_mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1; + if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1; }else{ rc = rc2; } diff --git a/src/persist_read.c b/src/persist_read.c index 553c3c78..c270086a 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -301,7 +301,6 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); return MOSQ_ERR_NOMEM; } - stored->ref_count = 1; stored->source_mid = chunk.F.source_mid; stored->topic = chunk.topic;