diff --git a/src/database.c b/src/database.c index 9ea1287f..66965a00 100644 --- a/src/database.c +++ b/src/database.c @@ -596,28 +596,35 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, { struct mosquitto_msg_store *stored; char *source_id; - char *topic_heap; - mosquitto__payload_uhpa payload_uhpa; - mosquitto_property *local_properties = NULL; enum mosquitto_msg_origin origin; assert(db); - payload_uhpa.ptr = NULL; - if(!topic) return MOSQ_ERR_INVAL; - topic_heap = mosquitto__strdup(topic); - if(!topic_heap) return MOSQ_ERR_INVAL; + stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); + if(stored == NULL) return MOSQ_ERR_NOMEM; + stored->ref_count = 1; + + stored->topic = mosquitto__strdup(topic); + if(stored->topic == NULL){ + db__msg_store_free(stored); + return MOSQ_ERR_INVAL; + } + + stored->qos = qos; if(db->config->retain_available == false){ - retain = 0; + stored->retain = 0; + }else{ + stored->retain = retain; } - if(UHPA_ALLOC(payload_uhpa, payloadlen) == 0){ - mosquitto__free(topic_heap); + stored->payloadlen = payloadlen; + if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){ + db__msg_store_free(stored); return MOSQ_ERR_NOMEM; } - memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen); + memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), payload, stored->payloadlen); if(context && context->id){ source_id = context->id; @@ -625,7 +632,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, source_id = ""; } if(properties){ - local_properties = *properties; + stored->properties = *properties; *properties = NULL; } @@ -634,98 +641,61 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, }else{ origin = mosq_mo_broker; } - if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1; + if(db__message_store(db, context, stored, message_expiry_interval, 0, origin)) return 1; - return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored); + return sub__messages_queue(db, source_id, stored->topic, stored->qos, stored->retain, &stored); } /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */ -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin) +//int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) { - struct mosquitto_msg_store *temp = NULL; - int rc = MOSQ_ERR_SUCCESS; - assert(db); assert(stored); - temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); - if(!temp){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - rc = MOSQ_ERR_NOMEM; - goto error; - } - - temp->topic = NULL; - temp->payload.ptr = NULL; - - temp->ref_count = 0; if(source && source->id){ - temp->source_id = mosquitto__strdup(source->id); + stored->source_id = mosquitto__strdup(source->id); }else{ - temp->source_id = mosquitto__strdup(""); + stored->source_id = mosquitto__strdup(""); } - if(!temp->source_id){ + if(!stored->source_id){ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - rc = MOSQ_ERR_NOMEM; - goto error; + db__msg_store_free(stored); + return MOSQ_ERR_NOMEM; } if(source && source->username){ - temp->source_username = mosquitto__strdup(source->username); - if(!temp->source_username){ - rc = MOSQ_ERR_NOMEM; - goto error; + stored->source_username = mosquitto__strdup(source->username); + if(!stored->source_username){ + db__msg_store_free(stored); + return MOSQ_ERR_NOMEM; } } if(source){ - temp->source_listener = source->listener; - } - temp->source_mid = source_mid; - temp->mid = 0; - temp->qos = qos; - temp->retain = retain; - temp->topic = topic; - topic = NULL; - temp->payloadlen = payloadlen; - temp->properties = properties; - temp->origin = origin; - if(payloadlen){ - UHPA_MOVE(temp->payload, *payload, payloadlen); - }else{ - temp->payload.ptr = NULL; + stored->source_listener = source->listener; } + stored->mid = 0; + stored->origin = origin; if(message_expiry_interval > 0){ - temp->message_expiry_time = time(NULL) + message_expiry_interval; + stored->message_expiry_time = time(NULL) + message_expiry_interval; }else{ - temp->message_expiry_time = 0; + stored->message_expiry_time = 0; } - temp->dest_ids = NULL; - temp->dest_id_count = 0; + stored->dest_ids = NULL; + stored->dest_id_count = 0; db->msg_store_count++; - db->msg_store_bytes += payloadlen; - (*stored) = temp; + db->msg_store_bytes += stored->payloadlen; if(!store_id){ - temp->db_id = ++db->last_db_id; + stored->db_id = ++db->last_db_id; }else{ - temp->db_id = store_id; + stored->db_id = store_id; } - db__msg_store_add(db, temp); + db__msg_store_add(db, stored); return MOSQ_ERR_SUCCESS; -error: - mosquitto__free(topic); - if(temp){ - mosquitto__free(temp->source_id); - mosquitto__free(temp->source_username); - mosquitto__free(temp->topic); - mosquitto__free(temp); - } - mosquitto_property_free_all(&properties); - UHPA_FREE(*payload, payloadlen); - return rc; } int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored) diff --git a/src/handle_publish.c b/src/handle_publish.c index 274bab5f..c56cbe8f 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -34,22 +34,18 @@ Contributors: int handle__publish(struct mosquitto_db *db, struct mosquitto *context) { - char *topic; - mosquitto__payload_uhpa payload; - uint32_t payloadlen; - uint8_t dup, qos, retain; - uint16_t mid = 0; + uint8_t dup; int rc = 0; int rc2; uint8_t header = context->in_packet.command; int res = 0; - struct mosquitto_msg_store *stored = NULL; + struct mosquitto_msg_store *msg, *stored = NULL; int len; int slen; char *topic_mount; mosquitto_property *properties = NULL; mosquitto_property *p, *p_prev; - mosquitto_property *msg_properties = NULL, *msg_properties_last; + mosquitto_property *msg_properties_last; uint32_t message_expiry_interval = 0; int topic_alias = -1; uint8_t reason_code = 0; @@ -58,43 +54,53 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) return MOSQ_ERR_PROTOCOL; } - payload.ptr = NULL; + msg = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); + if(msg == NULL){ + return MOSQ_ERR_NOMEM; + } + msg->ref_count = 1; dup = (header & 0x08)>>3; - qos = (header & 0x06)>>1; - if(qos == 3){ + msg->qos = (header & 0x06)>>1; + if(msg->qos == 3){ log__printf(NULL, MOSQ_LOG_INFO, "Invalid QoS in PUBLISH from %s, disconnecting.", context->id); + db__msg_store_free(msg); return 1; } - if(qos > context->maximum_qos){ + if(msg->qos > context->maximum_qos){ log__printf(NULL, MOSQ_LOG_INFO, "Too high QoS in PUBLISH from %s, disconnecting.", context->id); + db__msg_store_free(msg); return 1; } - retain = (header & 0x01); + msg->retain = (header & 0x01); - if(retain && db->config->retain_available == false){ + if(msg->retain && db->config->retain_available == false){ if(context->protocol == mosq_p_mqtt5){ send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL); } + db__msg_store_free(msg); return 1; } - if(packet__read_string(&context->in_packet, &topic, &slen)) return 1; + if(packet__read_string(&context->in_packet, &msg->topic, &slen)){ + db__msg_store_free(msg); + return 1; + } if(!slen && context->protocol != mosq_p_mqtt5){ /* Invalid publish topic, disconnect client. */ - mosquitto__free(topic); + db__msg_store_free(msg); return 1; } - if(qos > 0){ - if(packet__read_uint16(&context->in_packet, &mid)){ - mosquitto__free(topic); + if(msg->qos > 0){ + if(packet__read_uint16(&context->in_packet, &msg->source_mid)){ + db__msg_store_free(msg); return 1; } - if(mid == 0){ - mosquitto__free(topic); + if(msg->source_mid == 0){ + db__msg_store_free(msg); return MOSQ_ERR_PROTOCOL; } } @@ -102,11 +108,14 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) /* Handle properties */ if(context->protocol == mosq_p_mqtt5){ rc = property__read_all(CMD_PUBLISH, &context->in_packet, &properties); - if(rc) return rc; + if(rc){ + db__msg_store_free(msg); + return rc; + } p = properties; p_prev = NULL; - msg_properties = NULL; + msg->properties = NULL; msg_properties_last = NULL; while(p){ switch(p->identifier){ @@ -115,11 +124,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: case MQTT_PROP_RESPONSE_TOPIC: case MQTT_PROP_USER_PROPERTY: - if(msg_properties){ + if(msg->properties){ msg_properties_last->next = p; msg_properties_last = p; }else{ - msg_properties = p; + msg->properties = p; msg_properties_last = p; } if(p_prev){ @@ -158,132 +167,127 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) mosquitto_property_free_all(&properties); if(topic_alias == 0 || (context->listener && topic_alias > context->listener->max_topic_alias)){ - mosquitto__free(topic); + db__msg_store_free(msg); send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); return MOSQ_ERR_PROTOCOL; }else if(topic_alias > 0){ - if(topic){ - rc = alias__add(context, topic, topic_alias); + if(msg->topic){ + rc = alias__add(context, msg->topic, topic_alias); if(rc){ - mosquitto__free(topic); + db__msg_store_free(msg); return rc; } }else{ - rc = alias__find(context, &topic, topic_alias); + rc = alias__find(context, &msg->topic, topic_alias); if(rc){ send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL); - mosquitto__free(topic); + db__msg_store_free(msg); return rc; } } } #ifdef WITH_BRIDGE - rc = bridge__remap_topic_in(context, &topic); - if(rc) return rc; + rc = bridge__remap_topic_in(context, &msg->topic); + if(rc){ + db__msg_store_free(msg); + return rc; + } #endif - if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){ + if(mosquitto_pub_topic_check(msg->topic) != MOSQ_ERR_SUCCESS){ /* Invalid publish topic, just swallow it. */ - mosquitto__free(topic); + db__msg_store_free(msg); return 1; } - payloadlen = context->in_packet.remaining_length - context->in_packet.pos; - G_PUB_BYTES_RECEIVED_INC(payloadlen); + msg->payloadlen = context->in_packet.remaining_length - context->in_packet.pos; + G_PUB_BYTES_RECEIVED_INC(msg->payloadlen); if(context->listener && context->listener->mount_point){ - len = strlen(context->listener->mount_point) + strlen(topic) + 1; + len = strlen(context->listener->mount_point) + strlen(msg->topic) + 1; topic_mount = mosquitto__malloc(len+1); if(!topic_mount){ - mosquitto__free(topic); - mosquitto_property_free_all(&msg_properties); + db__msg_store_free(msg); return MOSQ_ERR_NOMEM; } - snprintf(topic_mount, len, "%s%s", context->listener->mount_point, topic); + snprintf(topic_mount, len, "%s%s", context->listener->mount_point, msg->topic); topic_mount[len] = '\0'; - mosquitto__free(topic); - topic = topic_mount; + mosquitto__free(msg->topic); + msg->topic = topic_mount; } - if(payloadlen){ - if(db->config->message_size_limit && payloadlen > db->config->message_size_limit){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen); + if(msg->payloadlen){ + if(db->config->message_size_limit && msg->payloadlen > db->config->message_size_limit){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen); reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC; goto process_bad_message; } - if(UHPA_ALLOC(payload, payloadlen) == 0){ - mosquitto__free(topic); - mosquitto_property_free_all(&msg_properties); + if(UHPA_ALLOC(msg->payload, msg->payloadlen) == 0){ + db__msg_store_free(msg); return MOSQ_ERR_NOMEM; } - if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(payload, payloadlen), payloadlen)){ - mosquitto__free(topic); - UHPA_FREE(payload, payloadlen); - mosquitto_property_free_all(&msg_properties); - return 1; + if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->payloadlen)){ + db__msg_store_free(msg); + return MOSQ_ERR_UNKNOWN; } } /* Check for topic access */ - rc = mosquitto_acl_check(db, context, topic, payloadlen, UHPA_ACCESS(payload, payloadlen), qos, retain, MOSQ_ACL_WRITE); + rc = mosquitto_acl_check(db, context, msg->topic, msg->payloadlen, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->qos, msg->retain, MOSQ_ACL_WRITE); if(rc == MOSQ_ERR_ACL_DENIED){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen); + log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen); reason_code = MQTT_RC_NOT_AUTHORIZED; goto process_bad_message; }else if(rc != MOSQ_ERR_SUCCESS){ - mosquitto__free(topic); - UHPA_FREE(payload, payloadlen); - mosquitto_property_free_all(&msg_properties); + db__msg_store_free(msg); return rc; } - log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen); - if(qos > 0){ - db__message_store_find(context, mid, &stored); + log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen); + if(msg->qos > 0){ + db__message_store_find(context, msg->source_mid, &stored); } if(!stored){ dup = 0; - if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0, mosq_mo_client)){ - mosquitto_property_free_all(&msg_properties); + if(db__message_store(db, context, msg, message_expiry_interval, 0, mosq_mo_client)){ return 1; } - msg_properties = NULL; /* Now belongs to db__message_store() */ + stored = msg; + msg = NULL; }else{ - mosquitto__free(topic); - topic = stored->topic; + db__msg_store_free(msg); + msg = NULL; dup = 1; - mosquitto_property_free_all(&msg_properties); - UHPA_FREE(payload, payloadlen); } - switch(qos){ + switch(stored->qos){ case 0: - rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored); + rc2 = sub__messages_queue(db, context->id, stored->topic, stored->qos, stored->retain, &stored); if(rc2 > 0) rc = 1; break; case 1: util__decrement_receive_quota(context); - rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored); + rc2 = sub__messages_queue(db, context->id, stored->topic, stored->qos, stored->retain, &stored); if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){ - if(send__puback(context, mid, 0, NULL)) rc = 1; + if(send__puback(context, stored->source_mid, 0, NULL)) rc = 1; }else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){ - if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1; + if(send__puback(context, stored->source_mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1; }else{ rc = rc2; } break; case 2: if(dup == 0){ - res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL); + res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL); }else{ res = 0; } /* db__message_insert() returns 2 to indicate dropped message * due to queue. This isn't an error so don't disconnect them. */ if(!res){ - if(send__pubrec(context, mid, 0, NULL)) rc = 1; + if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1; }else if(res == 1){ rc = 1; } @@ -292,20 +296,25 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) return rc; process_bad_message: - mosquitto__free(topic); - UHPA_FREE(payload, payloadlen); - switch(qos){ - case 0: - return MOSQ_ERR_SUCCESS; - case 1: - return send__puback(context, mid, reason_code, NULL); - case 2: - if(context->protocol == mosq_p_mqtt5){ - return send__pubrec(context, mid, reason_code, NULL); - }else{ - return send__pubrec(context, mid, 0, NULL); - } + rc = 1; + if(msg){ + switch(msg->qos){ + case 0: + rc = MOSQ_ERR_SUCCESS; + break; + case 1: + rc = send__puback(context, msg->source_mid, reason_code, NULL); + break; + case 2: + if(context->protocol == mosq_p_mqtt5){ + rc = send__pubrec(context, msg->source_mid, reason_code, NULL); + }else{ + rc = send__pubrec(context, msg->source_mid, 0, NULL); + } + break; + } + db__msg_store_free(msg); } - return 1; + return rc; } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index f9d8aeca..f1fb4ebc 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -658,7 +658,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin); +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store); diff --git a/src/persist_read.c b/src/persist_read.c index 53ffdbad..553c3c78 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -291,10 +291,28 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp message_expiry_interval = 0; } - rc = db__message_store(db, &chunk.source, chunk.F.source_mid, - chunk.topic, chunk.F.qos, chunk.F.payloadlen, - &chunk.payload, chunk.F.retain, &stored, message_expiry_interval, - chunk.properties, chunk.F.store_id, mosq_mo_client); + stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); + if(stored == NULL){ + fclose(db_fptr); + mosquitto__free(chunk.source.id); + mosquitto__free(chunk.source.username); + mosquitto__free(chunk.topic); + UHPA_FREE(chunk.payload, chunk.F.payloadlen); + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; + } + stored->ref_count = 1; + + stored->source_mid = chunk.F.source_mid; + stored->topic = chunk.topic; + stored->qos = chunk.F.qos; + stored->payloadlen = chunk.F.payloadlen; + stored->retain = chunk.F.retain; + stored->properties = chunk.properties; + UHPA_MOVE(stored->payload, chunk.payload, stored->payloadlen); + + rc = db__message_store(db, &chunk.source, stored, message_expiry_interval, + chunk.F.store_id, mosq_mo_client); mosquitto__free(chunk.source.id); mosquitto__free(chunk.source.username); diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 5471f034..ae61f5ce 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -27,16 +27,27 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) return m; } -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) +void db__msg_store_free(struct mosquitto_msg_store *store) { - struct mosquitto_msg_store *temp = NULL; - int rc = MOSQ_ERR_SUCCESS; + int i; + + mosquitto__free(store->source_id); + mosquitto__free(store->source_username); + if(store->dest_ids){ + for(i=0; idest_id_count; i++){ + mosquitto__free(store->dest_ids[i]); + } + mosquitto__free(store->dest_ids); + } + mosquitto__free(store->topic); + mosquitto_property_free_all(&store->properties); + UHPA_FREE_PAYLOAD(store); + mosquitto__free(store); +} - temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store)); - if(!temp){ - rc = MOSQ_ERR_NOMEM; - goto error; - } +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *temp, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin) +{ + int rc = MOSQ_ERR_SUCCESS; if(source && source->id){ temp->source_id = mosquitto__strdup(source->id); @@ -58,19 +69,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u if(source){ temp->source_listener = source->listener; } - temp->source_mid = source_mid; temp->mid = 0; - temp->qos = qos; - temp->retain = retain; - temp->topic = topic; - topic = NULL; - temp->payloadlen = payloadlen; - temp->properties = properties; - if(payloadlen){ - UHPA_MOVE(temp->payload, *payload, payloadlen); - }else{ - temp->payload.ptr = NULL; - } if(message_expiry_interval > 0){ temp->message_expiry_time = time(NULL) + message_expiry_interval; }else{ @@ -80,7 +79,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u temp->dest_ids = NULL; temp->dest_id_count = 0; db->msg_store_count++; - db->msg_store_bytes += payloadlen; + db->msg_store_bytes += temp->payloadlen; (*stored) = temp; if(!store_id){ @@ -93,14 +92,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u return MOSQ_ERR_SUCCESS; error: - mosquitto__free(topic); - if(temp){ - mosquitto__free(temp->source_id); - mosquitto__free(temp->source_username); - mosquitto__free(temp->topic); - mosquitto__free(temp); - } - UHPA_FREE(*payload, payloadlen); + db__msg_store_free(temp); return rc; }