|
|
|
@ -487,30 +487,41 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context,
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_msg_store *stored;
|
|
|
|
|
char *source_id;
|
|
|
|
|
char *topic_heap;
|
|
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
|
|
|
|
|
|
if(!topic) return MOSQ_ERR_INVAL;
|
|
|
|
|
topic_heap = mosquitto__strdup(topic);
|
|
|
|
|
if(!topic_heap) return MOSQ_ERR_INVAL;
|
|
|
|
|
|
|
|
|
|
if(context && context->id){
|
|
|
|
|
source_id = context->id;
|
|
|
|
|
}else{
|
|
|
|
|
source_id = "";
|
|
|
|
|
}
|
|
|
|
|
if(db__message_store(db, source_id, 0, topic, qos, payloadlen, payload, retain, &stored, 0)) return 1;
|
|
|
|
|
if(db__message_store(db, source_id, 0, topic_heap, qos, payloadlen, payload, retain, &stored, 0)) return 1;
|
|
|
|
|
|
|
|
|
|
return sub__messages_queue(db, source_id, topic, qos, retain, &stored);
|
|
|
|
|
return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
|
|
|
|
|
/* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. */
|
|
|
|
|
int db__message_store(struct mosquitto_db *db, const char *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, struct mosquitto_msg_store **stored, dbid_t store_id)
|
|
|
|
|
{
|
|
|
|
|
struct mosquitto_msg_store *temp;
|
|
|
|
|
struct mosquitto_msg_store *temp = NULL;
|
|
|
|
|
int rc = MOSQ_ERR_SUCCESS;
|
|
|
|
|
|
|
|
|
|
assert(db);
|
|
|
|
|
assert(stored);
|
|
|
|
|
|
|
|
|
|
temp = mosquitto__malloc(sizeof(struct mosquitto_msg_store));
|
|
|
|
|
if(!temp) return MOSQ_ERR_NOMEM;
|
|
|
|
|
if(!temp){
|
|
|
|
|
rc = MOSQ_ERR_NOMEM;
|
|
|
|
|
goto error;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
temp->topic = NULL;
|
|
|
|
|
temp->payload.ptr = NULL;
|
|
|
|
|
|
|
|
|
|
temp->ref_count = 0;
|
|
|
|
|
if(source){
|
|
|
|
@ -519,46 +530,27 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
|
|
|
|
|
temp->source_id = mosquitto__strdup("");
|
|
|
|
|
}
|
|
|
|
|
if(!temp->source_id){
|
|
|
|
|
mosquitto__free(temp);
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
|
rc = MOSQ_ERR_NOMEM;
|
|
|
|
|
goto error;
|
|
|
|
|
}
|
|
|
|
|
temp->source_mid = source_mid;
|
|
|
|
|
temp->mid = 0;
|
|
|
|
|
temp->qos = qos;
|
|
|
|
|
temp->retain = retain;
|
|
|
|
|
if(topic){
|
|
|
|
|
temp->topic = mosquitto__strdup(topic);
|
|
|
|
|
if(!temp->topic){
|
|
|
|
|
mosquitto__free(temp->source_id);
|
|
|
|
|
mosquitto__free(temp);
|
|
|
|
|
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
|
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
|
}
|
|
|
|
|
}else{
|
|
|
|
|
temp->topic = NULL;
|
|
|
|
|
}
|
|
|
|
|
temp->topic = topic;
|
|
|
|
|
topic = NULL;
|
|
|
|
|
temp->payloadlen = payloadlen;
|
|
|
|
|
if(payloadlen){
|
|
|
|
|
UHPA_ALLOC_PAYLOAD(temp);
|
|
|
|
|
if(!UHPA_ACCESS_PAYLOAD(temp)){
|
|
|
|
|
if(temp->source_id) mosquitto__free(temp->source_id);
|
|
|
|
|
if(temp->topic) mosquitto__free(temp->topic);
|
|
|
|
|
mosquitto__free(temp);
|
|
|
|
|
return MOSQ_ERR_NOMEM;
|
|
|
|
|
if(UHPA_ALLOC_PAYLOAD(temp) == 0){
|
|
|
|
|
rc = MOSQ_ERR_NOMEM;
|
|
|
|
|
goto error;
|
|
|
|
|
}
|
|
|
|
|
memcpy(UHPA_ACCESS_PAYLOAD(temp), payload, sizeof(char)*payloadlen);
|
|
|
|
|
}else{
|
|
|
|
|
temp->payload.ptr = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if(!temp->source_id || (payloadlen && !UHPA_ACCESS_PAYLOAD(temp))){
|
|
|
|
|
if(temp->source_id) mosquitto__free(temp->source_id);
|
|
|
|
|
if(temp->topic) mosquitto__free(temp->topic);
|
|
|
|
|
UHPA_FREE_PAYLOAD(temp);
|
|
|
|
|
mosquitto__free(temp);
|
|
|
|
|
return 1;
|
|
|
|
|
}
|
|
|
|
|
temp->dest_ids = NULL;
|
|
|
|
|
temp->dest_id_count = 0;
|
|
|
|
|
db->msg_store_count++;
|
|
|
|
@ -573,6 +565,16 @@ int db__message_store(struct mosquitto_db *db, const char *source, uint16_t sour
|
|
|
|
|
db__msg_store_add(db, temp);
|
|
|
|
|
|
|
|
|
|
return MOSQ_ERR_SUCCESS;
|
|
|
|
|
error:
|
|
|
|
|
if(topic){
|
|
|
|
|
mosquitto__free(topic);
|
|
|
|
|
}
|
|
|
|
|
if(temp){
|
|
|
|
|
if(temp->source_id) mosquitto__free(temp->source_id);
|
|
|
|
|
if(temp->topic) mosquitto__free(temp->topic);
|
|
|
|
|
mosquitto__free(temp);
|
|
|
|
|
}
|
|
|
|
|
return rc;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored)
|
|
|
|
|