|
|
|
@ -580,6 +580,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
|
|
|
|
|
char *topic_heap;
|
|
|
|
|
mosquitto__payload_uhpa payload_uhpa;
|
|
|
|
|
mosquitto_property *local_properties = NULL;
|
|
|
|
|
enum mosquitto_msg_origin origin;
|
|
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
|
|
|
|
|
@ -608,13 +609,19 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
|
|
|
|
|
local_properties = *properties;
|
|
|
|
|
*properties = NULL;
|
|
|
|
|
}
|
|
|
|
|
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0)) return 1;
|
|
|
|
|
|
|
|
|
|
if(context){
|
|
|
|
|
origin = mosq_mo_client;
|
|
|
|
|
}else{
|
|
|
|
|
origin = mosq_mo_broker;
|
|
|
|
|
}
|
|
|
|
|
if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1;
|
|
|
|
|
|
|
|
|
|
return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */
|
|
|
|
|
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id)
|
|
|
|
|
int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_msg_store *temp = NULL;
|
|
|
|
|
int rc = MOSQ_ERR_SUCCESS;
|
|
|
|
@ -662,6 +669,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u
|
|
|
|
|
topic = NULL;
|
|
|
|
|
temp->payloadlen = payloadlen;
|
|
|
|
|
temp->properties = properties;
|
|
|
|
|
temp->origin = origin;
|
|
|
|
|
if(payloadlen){
|
|
|
|
|
UHPA_MOVE(temp->payload, *payload, payloadlen);
|
|
|
|
|
}else{
|
|
|
|
|