Simplify db__message_store() interface.

pull/1753/head
Roger A. Light 5 years ago
parent 3e595d557e
commit 6deb417804

@ -596,28 +596,35 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
{
struct mosquitto_msg_store *stored;
char *source_id;
char *topic_heap;
mosquitto__payload_uhpa payload_uhpa;
mosquitto_property *local_properties = NULL;
enum mosquitto_msg_origin origin;
assert(db);
payload_uhpa.ptr = NULL;
if(!topic) return MOSQ_ERR_INVAL;
topic_heap = mosquitto__strdup(topic);
if(!topic_heap) return MOSQ_ERR_INVAL;
stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL) return MOSQ_ERR_NOMEM;
stored->ref_count = 1;
stored->topic = mosquitto__strdup(topic);
if(stored->topic == NULL){
db__msg_store_free(stored);
return MOSQ_ERR_INVAL;
}
stored->qos = qos;
if(db->config->retain_available == false){
retain = 0;
stored->retain = 0;
}else{
stored->retain = retain;
}
if(UHPA_ALLOC(payload_uhpa, payloadlen) == 0){
mosquitto__free(topic_heap);
stored->payloadlen = payloadlen;
if(UHPA_ALLOC(stored->payload, stored->payloadlen) == 0){
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
memcpy(UHPA_ACCESS(payload_uhpa, payloadlen), payload, payloadlen);
memcpy(UHPA_ACCESS(stored->payload, stored->payloadlen), payload, stored->payloadlen);
if(context && context->id){
source_id = context->id;
@ -625,7 +632,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
source_id = "";
}
if(properties){
local_properties = *properties;
stored->properties = *properties;
*properties = NULL;
}
@ -634,98 +641,61 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
}else{
origin = mosq_mo_broker;
}
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;
if(db__message_store(db, context, stored, message_expiry_interval, 0, origin)) return 1;
return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
return sub__messages_queue(db, source_id, stored->topic, stored->qos, stored->retain, &stored);
}
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin)
//int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
assert(db);
assert(stored);
temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(!temp){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
rc = MOSQ_ERR_NOMEM;
goto error;
}
temp->topic = NULL;
temp->payload.ptr = NULL;
temp->ref_count = 0;
if(source && source->id){
temp->source_id = mosquitto__strdup(source->id);
stored->source_id = mosquitto__strdup(source->id);
}else{
temp->source_id = mosquitto__strdup("");
stored->source_id = mosquitto__strdup("");
}
if(!temp->source_id){
if(!stored->source_id){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
rc = MOSQ_ERR_NOMEM;
goto error;
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
if(source && source->username){
temp->source_username = mosquitto__strdup(source->username);
if(!temp->source_username){
rc = MOSQ_ERR_NOMEM;
goto error;
stored->source_username = mosquitto__strdup(source->username);
if(!stored->source_username){
db__msg_store_free(stored);
return MOSQ_ERR_NOMEM;
}
}
if(source){
temp->source_listener = source->listener;
}
temp->source_mid = source_mid;
temp->mid = 0;
temp->qos = qos;
temp->retain = retain;
temp->topic = topic;
topic = NULL;
temp->payloadlen = payloadlen;
temp->properties = properties;
temp->origin = origin;
if(payloadlen){
UHPA_MOVE(temp->payload, *payload, payloadlen);
}else{
temp->payload.ptr = NULL;
stored->source_listener = source->listener;
}
stored->mid = 0;
stored->origin = origin;
if(message_expiry_interval > 0){
temp->message_expiry_time = time(NULL) + message_expiry_interval;
stored->message_expiry_time = time(NULL) + message_expiry_interval;
}else{
temp->message_expiry_time = 0;
stored->message_expiry_time = 0;
}
temp->dest_ids = NULL;
temp->dest_id_count = 0;
stored->dest_ids = NULL;
stored->dest_id_count = 0;
db->msg_store_count++;
db->msg_store_bytes += payloadlen;
(*stored) = temp;
db->msg_store_bytes += stored->payloadlen;
if(!store_id){
temp->db_id = ++db->last_db_id;
stored->db_id = ++db->last_db_id;
}else{
temp->db_id = store_id;
stored->db_id = store_id;
}
db__msg_store_add(db, temp);
db__msg_store_add(db, stored);
return MOSQ_ERR_SUCCESS;
error:
mosquitto__free(topic);
if(temp){
mosquitto__free(temp->source_id);
mosquitto__free(temp->source_username);
mosquitto__free(temp->topic);
mosquitto__free(temp);
}
mosquitto_property_free_all(&properties);
UHPA_FREE(*payload, payloadlen);
return rc;
}
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)

@ -34,22 +34,18 @@ Contributors:
int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
{
char *topic;
mosquitto__payload_uhpa payload;
uint32_t payloadlen;
uint8_t dup, qos, retain;
uint16_t mid = 0;
uint8_t dup;
int rc = 0;
int rc2;
uint8_t header = context->in_packet.command;
int res = 0;
struct mosquitto_msg_store *stored = NULL;
struct mosquitto_msg_store *msg, *stored = NULL;
int len;
int slen;
char *topic_mount;
mosquitto_property *properties = NULL;
mosquitto_property *p, *p_prev;
mosquitto_property *msg_properties = NULL, *msg_properties_last;
mosquitto_property *msg_properties_last;
uint32_t message_expiry_interval = 0;
int topic_alias = -1;
uint8_t reason_code = 0;
@ -58,43 +54,53 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
return MOSQ_ERR_PROTOCOL;
}
payload.ptr = NULL;
msg = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(msg == NULL){
return MOSQ_ERR_NOMEM;
}
msg->ref_count = 1;
dup = (header & 0x08)>>3;
qos = (header & 0x06)>>1;
if(qos == 3){
msg->qos = (header & 0x06)>>1;
if(msg->qos == 3){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in PUBLISH from %s, disconnecting.", context->id);
db__msg_store_free(msg);
return 1;
}
if(qos > context->maximum_qos){
if(msg->qos > context->maximum_qos){
log__printf(NULL, MOSQ_LOG_INFO,
"Too high QoS in PUBLISH from %s, disconnecting.", context->id);
db__msg_store_free(msg);
return 1;
}
retain = (header & 0x01);
msg->retain = (header & 0x01);
if(retain && db->config->retain_available == false){
if(msg->retain && db->config->retain_available == false){
if(context->protocol == mosq_p_mqtt5){
send__disconnect(context, MQTT_RC_RETAIN_NOT_SUPPORTED, NULL);
}
db__msg_store_free(msg);
return 1;
}
if(packet__read_string(&context->in_packet, &topic, &slen)) return 1;
if(packet__read_string(&context->in_packet, &msg->topic, &slen)){
db__msg_store_free(msg);
return 1;
}
if(!slen && context->protocol != mosq_p_mqtt5){
/* Invalid publish topic, disconnect client. */
mosquitto__free(topic);
db__msg_store_free(msg);
return 1;
}
if(qos > 0){
if(packet__read_uint16(&context->in_packet, &mid)){
mosquitto__free(topic);
if(msg->qos > 0){
if(packet__read_uint16(&context->in_packet, &msg->source_mid)){
db__msg_store_free(msg);
return 1;
}
if(mid == 0){
mosquitto__free(topic);
if(msg->source_mid == 0){
db__msg_store_free(msg);
return MOSQ_ERR_PROTOCOL;
}
}
@ -102,11 +108,14 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
/* Handle properties */
if(context->protocol == mosq_p_mqtt5){
rc = property__read_all(CMD_PUBLISH, &context->in_packet, &properties);
if(rc) return rc;
if(rc){
db__msg_store_free(msg);
return rc;
}
p = properties;
p_prev = NULL;
msg_properties = NULL;
msg->properties = NULL;
msg_properties_last = NULL;
while(p){
switch(p->identifier){
@ -115,11 +124,11 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
case MQTT_PROP_RESPONSE_TOPIC:
case MQTT_PROP_USER_PROPERTY:
if(msg_properties){
if(msg->properties){
msg_properties_last->next = p;
msg_properties_last = p;
}else{
msg_properties = p;
msg->properties = p;
msg_properties_last = p;
}
if(p_prev){
@ -158,132 +167,127 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
mosquitto_property_free_all(&properties);
if(topic_alias == 0 || (context->listener && topic_alias > context->listener->max_topic_alias)){
mosquitto__free(topic);
db__msg_store_free(msg);
send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL);
return MOSQ_ERR_PROTOCOL;
}else if(topic_alias > 0){
if(topic){
rc = alias__add(context, topic, topic_alias);
if(msg->topic){
rc = alias__add(context, msg->topic, topic_alias);
if(rc){
mosquitto__free(topic);
db__msg_store_free(msg);
return rc;
}
}else{
rc = alias__find(context, &topic, topic_alias);
rc = alias__find(context, &msg->topic, topic_alias);
if(rc){
send__disconnect(context, MQTT_RC_TOPIC_ALIAS_INVALID, NULL);
mosquitto__free(topic);
db__msg_store_free(msg);
return rc;
}
}
}
#ifdef WITH_BRIDGE
rc = bridge__remap_topic_in(context, &topic);
if(rc) return rc;
rc = bridge__remap_topic_in(context, &msg->topic);
if(rc){
db__msg_store_free(msg);
return rc;
}
#endif
if(mosquitto_pub_topic_check(topic) != MOSQ_ERR_SUCCESS){
if(mosquitto_pub_topic_check(msg->topic) != MOSQ_ERR_SUCCESS){
/* Invalid publish topic, just swallow it. */
mosquitto__free(topic);
db__msg_store_free(msg);
return 1;
}
payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(payloadlen);
msg->payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(msg->payloadlen);
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + strlen(topic) + 1;
len = strlen(context->listener->mount_point) + strlen(msg->topic) + 1;
topic_mount = mosquitto__malloc(len+1);
if(!topic_mount){
mosquitto__free(topic);
mosquitto_property_free_all(&msg_properties);
db__msg_store_free(msg);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, topic);
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, msg->topic);
topic_mount[len] = '\0';
mosquitto__free(topic);
topic = topic_mount;
mosquitto__free(msg->topic);
msg->topic = topic_mount;
}
if(payloadlen){
if(db->config->message_size_limit && payloadlen > db->config->message_size_limit){
log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
if(msg->payloadlen){
if(db->config->message_size_limit && msg->payloadlen > db->config->message_size_limit){
log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC;
goto process_bad_message;
}
if(UHPA_ALLOC(payload, payloadlen) == 0){
mosquitto__free(topic);
mosquitto_property_free_all(&msg_properties);
if(UHPA_ALLOC(msg->payload, msg->payloadlen) == 0){
db__msg_store_free(msg);
return MOSQ_ERR_NOMEM;
}
if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(payload, payloadlen), payloadlen)){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
mosquitto_property_free_all(&msg_properties);
return 1;
if(packet__read_bytes(&context->in_packet, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->payloadlen)){
db__msg_store_free(msg);
return MOSQ_ERR_UNKNOWN;
}
}
/* Check for topic access */
rc = mosquitto_acl_check(db, context, topic, payloadlen, UHPA_ACCESS(payload, payloadlen), qos, retain, MOSQ_ACL_WRITE);
rc = mosquitto_acl_check(db, context, msg->topic, msg->payloadlen, UHPA_ACCESS(msg->payload, msg->payloadlen), msg->qos, msg->retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
reason_code = MQTT_RC_NOT_AUTHORIZED;
goto process_bad_message;
}else if(rc != MOSQ_ERR_SUCCESS){
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
mosquitto_property_free_all(&msg_properties);
db__msg_store_free(msg);
return rc;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, qos, retain, mid, topic, (long)payloadlen);
if(qos > 0){
db__message_store_find(context, mid, &stored);
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
if(msg->qos > 0){
db__message_store_find(context, msg->source_mid, &stored);
}
if(!stored){
dup = 0;
if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0, mosq_mo_client)){
mosquitto_property_free_all(&msg_properties);
if(db__message_store(db, context, msg, message_expiry_interval, 0, mosq_mo_client)){
return 1;
}
msg_properties = NULL; /* Now belongs to db__message_store() */
stored = msg;
msg = NULL;
}else{
mosquitto__free(topic);
topic = stored->topic;
db__msg_store_free(msg);
msg = NULL;
dup = 1;
mosquitto_property_free_all(&msg_properties);
UHPA_FREE(payload, payloadlen);
}
switch(qos){
switch(stored->qos){
case 0:
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
rc2 = sub__messages_queue(db, context->id, stored->topic, stored->qos, stored->retain, &stored);
if(rc2 > 0) rc = 1;
break;
case 1:
util__decrement_receive_quota(context);
rc2 = sub__messages_queue(db, context->id, topic, qos, retain, &stored);
rc2 = sub__messages_queue(db, context->id, stored->topic, stored->qos, stored->retain, &stored);
if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
if(send__puback(context, mid, 0, NULL)) rc = 1;
if(send__puback(context, stored->source_mid, 0, NULL)) rc = 1;
}else if(rc2 == MOSQ_ERR_NO_SUBSCRIBERS){
if(send__puback(context, mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1;
if(send__puback(context, stored->source_mid, MQTT_RC_NO_MATCHING_SUBSCRIBERS, NULL)) rc = 1;
}else{
rc = rc2;
}
break;
case 2:
if(dup == 0){
res = db__message_insert(db, context, mid, mosq_md_in, qos, retain, stored, NULL);
res = db__message_insert(db, context, stored->source_mid, mosq_md_in, stored->qos, stored->retain, stored, NULL);
}else{
res = 0;
}
/* db__message_insert() returns 2 to indicate dropped message
* due to queue. This isn't an error so don't disconnect them. */
if(!res){
if(send__pubrec(context, mid, 0, NULL)) rc = 1;
if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1;
}else if(res == 1){
rc = 1;
}
@ -292,20 +296,25 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context)
return rc;
process_bad_message:
mosquitto__free(topic);
UHPA_FREE(payload, payloadlen);
switch(qos){
case 0:
return MOSQ_ERR_SUCCESS;
case 1:
return send__puback(context, mid, reason_code, NULL);
case 2:
if(context->protocol == mosq_p_mqtt5){
return send__pubrec(context, mid, reason_code, NULL);
}else{
return send__pubrec(context, mid, 0, NULL);
}
rc = 1;
if(msg){
switch(msg->qos){
case 0:
rc = MOSQ_ERR_SUCCESS;
break;
case 1:
rc = send__puback(context, msg->source_mid, reason_code, NULL);
break;
case 2:
if(context->protocol == mosq_p_mqtt5){
rc = send__pubrec(context, msg->source_mid, reason_code, NULL);
}else{
rc = send__pubrec(context, msg->source_mid, 0, NULL);
}
break;
}
db__msg_store_free(msg);
}
return 1;
return rc;
}

@ -658,7 +658,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context);
void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);
int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context);
int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties);
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin);
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin);
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored);
void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store);
void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store);

@ -291,10 +291,28 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp
message_expiry_interval = 0;
}
rc = db__message_store(db, &chunk.source, chunk.F.source_mid,
chunk.topic, chunk.F.qos, chunk.F.payloadlen,
&chunk.payload, chunk.F.retain, &stored, message_expiry_interval,
chunk.properties, chunk.F.store_id, mosq_mo_client);
stored = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(stored == NULL){
fclose(db_fptr);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);
mosquitto__free(chunk.topic);
UHPA_FREE(chunk.payload, chunk.F.payloadlen);
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
stored->ref_count = 1;
stored->source_mid = chunk.F.source_mid;
stored->topic = chunk.topic;
stored->qos = chunk.F.qos;
stored->payloadlen = chunk.F.payloadlen;
stored->retain = chunk.F.retain;
stored->properties = chunk.properties;
UHPA_MOVE(stored->payload, chunk.payload, stored->payloadlen);
rc = db__message_store(db, &chunk.source, stored, message_expiry_interval,
chunk.F.store_id, mosq_mo_client);
mosquitto__free(chunk.source.id);
mosquitto__free(chunk.source.username);

@ -27,16 +27,27 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock)
return m;
}
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
void db__msg_store_free(struct mosquitto_msg_store *store)
{
struct mosquitto_msg_store *temp = NULL;
int rc = MOSQ_ERR_SUCCESS;
int i;
mosquitto__free(store->source_id);
mosquitto__free(store->source_username);
if(store->dest_ids){
for(i=0; i<store->dest_id_count; i++){
mosquitto__free(store->dest_ids[i]);
}
mosquitto__free(store->dest_ids);
}
mosquitto__free(store->topic);
mosquitto_property_free_all(&store->properties);
UHPA_FREE_PAYLOAD(store);
mosquitto__free(store);
}
temp = mosquitto__calloc(1, sizeof(struct mosquitto_msg_store));
if(!temp){
rc = MOSQ_ERR_NOMEM;
goto error;
}
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, struct mosquitto_msg_store *temp, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, dbid_t store_id, enum mosquitto_msg_origin origin)
{
int rc = MOSQ_ERR_SUCCESS;
if(source && source->id){
temp->source_id = mosquitto__strdup(source->id);
@ -58,19 +69,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u
if(source){
temp->source_listener = source->listener;
}
temp->source_mid = source_mid;
temp->mid = 0;
temp->qos = qos;
temp->retain = retain;
temp->topic = topic;
topic = NULL;
temp->payloadlen = payloadlen;
temp->properties = properties;
if(payloadlen){
UHPA_MOVE(temp->payload, *payload, payloadlen);
}else{
temp->payload.ptr = NULL;
}
if(message_expiry_interval > 0){
temp->message_expiry_time = time(NULL) + message_expiry_interval;
}else{
@ -80,7 +79,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u
temp->dest_ids = NULL;
temp->dest_id_count = 0;
db->msg_store_count++;
db->msg_store_bytes += payloadlen;
db->msg_store_bytes += temp->payloadlen;
(*stored) = temp;
if(!store_id){
@ -93,14 +92,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u
return MOSQ_ERR_SUCCESS;
error:
mosquitto__free(topic);
if(temp){
mosquitto__free(temp->source_id);
mosquitto__free(temp->source_username);
mosquitto__free(temp->topic);
mosquitto__free(temp);
}
UHPA_FREE(*payload, payloadlen);
db__msg_store_free(temp);
return rc;
}

Loading…
Cancel
Save