Refactor base_msg structs.

pull/2735/head
Roger A. Light 3 years ago
parent 99df070a2b
commit 100fd31530

@ -267,13 +267,13 @@ static int dump__base_msg_chunk_process(FILE *db_fptr, uint32_t length)
mosquitto__free(chunk.payload);
return MOSQ_ERR_NOMEM;
}
stored->source_mid = chunk.F.source_mid;
stored->topic = chunk.topic;
stored->qos = chunk.F.qos;
stored->retain = chunk.F.retain;
stored->payloadlen = chunk.F.payloadlen;
stored->payload = chunk.payload;
stored->properties = chunk.properties;
stored->msg.source_mid = chunk.F.source_mid;
stored->msg.topic = chunk.topic;
stored->msg.qos = chunk.F.qos;
stored->msg.retain = chunk.F.retain;
stored->msg.payloadlen = chunk.F.payloadlen;
stored->msg.payload = chunk.payload;
stored->msg.properties = chunk.properties;
rc = db__message_store(&chunk.source, stored, message_expiry_interval,
chunk.F.store_id, mosq_mo_client);
@ -285,9 +285,9 @@ static int dump__base_msg_chunk_process(FILE *db_fptr, uint32_t length)
if(rc == MOSQ_ERR_SUCCESS){
stored->source_listener = chunk.source.listener;
stored->db_id = chunk.F.store_id;
stored->msg.store_id = chunk.F.store_id;
HASH_ADD(hh, db.msg_store, db_id, sizeof(dbid_t), stored);
HASH_ADD(hh, db.msg_store, msg.store_id, sizeof(dbid_t), stored);
}else{
fclose(db_fptr);
return rc;

@ -300,21 +300,14 @@ struct mosquitto_evt_persist_client_msg {
};
/* Data for the MOSQ_EVT_PERSIST_BASE_MSG_ADD/_DELETE/_LOAD event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_base_msg {
void *future;
struct mosquitto_base_msg {
uint64_t store_id;
int64_t expiry_time;
const char *topic;
const void *payload;
const char *source_id;
const char *source_username;
char *plugin_topic;
void *plugin_payload;
const mosquitto_property *properties;
mosquitto_property *plugin_properties;
char *topic;
void *payload;
char *source_id;
char *source_username;
mosquitto_property *properties;
uint32_t payloadlen;
uint16_t source_mid;
uint16_t source_port;
@ -325,6 +318,16 @@ struct mosquitto_evt_persist_base_msg {
};
/* Data for the MOSQ_EVT_PERSIST_BASE_MSG_ADD/_DELETE/_LOAD event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
struct mosquitto_evt_persist_base_msg {
void *future;
struct mosquitto_base_msg msg;
void *future2[8];
};
/* Data for the MOSQ_EVT_PERSIST_RETAIN_MSG_SET/_DELETE event */
/* NOTE: The persistence interface is currently marked as unstable, which means
* it may change in a future minor release. */
@ -1068,7 +1071,7 @@ mosq_EXPORT int mosquitto_persist_client_msg_clear(struct mosquitto_evt_persist_
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_NOMEM - on out of memory
*/
mosq_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg);
mosq_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg);
/* Function: mosquitto_persist_base_msg_delete

@ -147,31 +147,31 @@ int persist_sqlite__base_msg_add_cb(int event, void *event_data, void *userdata)
UNUSED(event);
rc = 0;
rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 1, (int64_t)ed->store_id);
rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 2, ed->expiry_time);
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 3, ed->topic, (int)strlen(ed->topic), SQLITE_STATIC);
if(ed->payload){
rc += sqlite3_bind_blob(ms->base_msg_add_stmt, 4, ed->payload, (int)ed->payloadlen, SQLITE_STATIC);
rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 1, (int64_t)ed->msg.store_id);
rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 2, ed->msg.expiry_time);
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 3, ed->msg.topic, (int)strlen(ed->msg.topic), SQLITE_STATIC);
if(ed->msg.payload){
rc += sqlite3_bind_blob(ms->base_msg_add_stmt, 4, ed->msg.payload, (int)ed->msg.payloadlen, SQLITE_STATIC);
}else{
rc += sqlite3_bind_null(ms->base_msg_add_stmt, 4);
}
if(ed->source_id){
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 5, ed->source_id, (int)strlen(ed->source_id), SQLITE_STATIC);
if(ed->msg.source_id){
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 5, ed->msg.source_id, (int)strlen(ed->msg.source_id), SQLITE_STATIC);
}else{
rc += sqlite3_bind_null(ms->base_msg_add_stmt, 5);
}
if(ed->source_username){
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 6, ed->source_username, (int)strlen(ed->source_username), SQLITE_STATIC);
if(ed->msg.source_username){
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 6, ed->msg.source_username, (int)strlen(ed->msg.source_username), SQLITE_STATIC);
}else{
rc += sqlite3_bind_null(ms->base_msg_add_stmt, 6);
}
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 7, (int)ed->payloadlen);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 8, ed->source_mid);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 9, ed->source_port);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 10, ed->qos);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 11, ed->retain);
if(ed->properties){
str = properties_to_json(ed->properties);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 7, (int)ed->msg.payloadlen);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 8, ed->msg.source_mid);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 9, ed->msg.source_port);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 10, ed->msg.qos);
rc += sqlite3_bind_int(ms->base_msg_add_stmt, 11, ed->msg.retain);
if(ed->msg.properties){
str = properties_to_json(ed->msg.properties);
}
if(str){
rc += sqlite3_bind_text(ms->base_msg_add_stmt, 12, str, (int)strlen(str), SQLITE_STATIC);
@ -202,7 +202,7 @@ int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userda
UNUSED(event);
if(sqlite3_bind_int64(ms->base_msg_remove_stmt, 1, (int64_t)ed->store_id) == SQLITE_OK){
if(sqlite3_bind_int64(ms->base_msg_remove_stmt, 1, (int64_t)ed->msg.store_id) == SQLITE_OK){
ms->event_count++;
rc = sqlite3_step(ms->base_msg_remove_stmt);
if(rc == SQLITE_DONE){
@ -219,24 +219,24 @@ int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userda
int persist_sqlite__base_msg_load_cb(int event, void *event_data, void *userdata)
{
struct mosquitto_evt_persist_base_msg *msg = event_data;
struct mosquitto_evt_persist_base_msg *ed = event_data;
struct mosquitto_sqlite *ms = userdata;
UNUSED(event);
if(sqlite3_bind_int64(ms->base_msg_load_stmt, 1, (int64_t)msg->store_id) == SQLITE_OK){
if(sqlite3_bind_int64(ms->base_msg_load_stmt, 1, (int64_t)ed->msg.store_id) == SQLITE_OK){
if(sqlite3_step(ms->base_msg_load_stmt) == SQLITE_ROW){
msg->expiry_time = (time_t)sqlite3_column_int64(ms->base_msg_load_stmt, 1);
msg->topic = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 2);
msg->payload = (void *)sqlite3_column_blob(ms->base_msg_load_stmt, 3);
msg->source_id = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 4);
msg->source_username = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 5);
msg->payloadlen = (uint32_t)sqlite3_column_int(ms->base_msg_load_stmt, 6);
msg->source_mid = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 7);
msg->source_port = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 8);
msg->qos = (uint8_t)sqlite3_column_int(ms->base_msg_load_stmt, 9);
msg->retain = sqlite3_column_int(ms->base_msg_load_stmt, 10);
mosquitto_persist_base_msg_add(msg);
ed->msg.expiry_time = (time_t)sqlite3_column_int64(ms->base_msg_load_stmt, 1);
ed->msg.topic = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 2);
ed->msg.payload = (void *)sqlite3_column_blob(ms->base_msg_load_stmt, 3);
ed->msg.source_id = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 4);
ed->msg.source_username = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 5);
ed->msg.payloadlen = (uint32_t)sqlite3_column_int(ms->base_msg_load_stmt, 6);
ed->msg.source_mid = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 7);
ed->msg.source_port = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 8);
ed->msg.qos = (uint8_t)sqlite3_column_int(ms->base_msg_load_stmt, 9);
ed->msg.retain = sqlite3_column_int(ms->base_msg_load_stmt, 10);
mosquitto_persist_base_msg_add(&ed->msg);
}
}
sqlite3_finalize(ms->base_msg_load_stmt);

@ -275,7 +275,7 @@ static int subscription_restore(struct mosquitto_sqlite *ms)
static int base_msg_restore(struct mosquitto_sqlite *ms)
{
sqlite3_stmt *stmt;
struct mosquitto_evt_persist_base_msg msg;
struct mosquitto_base_msg msg;
int rc;
long count = 0, failed = 0;
const char *str;
@ -297,32 +297,32 @@ static int base_msg_restore(struct mosquitto_sqlite *ms)
msg.expiry_time = (time_t)sqlite3_column_int64(stmt, 1);
str = (const char *)sqlite3_column_text(stmt, 2);
if(str){
msg.plugin_topic = strdup(str);
if(!msg.plugin_topic){
msg.topic = strdup(str);
if(!msg.topic){
failed++;
continue;
}
}
msg.source_id = (const char *)sqlite3_column_text(stmt, 4);
msg.source_username = (const char *)sqlite3_column_text(stmt, 5);
msg.source_id = (char *)sqlite3_column_text(stmt, 4);
msg.source_username = (char *)sqlite3_column_text(stmt, 5);
payload = (const void *)sqlite3_column_blob(stmt, 3);
msg.payloadlen = (uint32_t)sqlite3_column_int(stmt, 6);
if(payload && msg.payloadlen){
msg.plugin_payload = malloc(msg.payloadlen+1);
if(!msg.plugin_payload){
free(msg.plugin_topic);
msg.payload = malloc(msg.payloadlen+1);
if(!msg.payload){
free(msg.topic);
failed++;
continue;
}
memcpy(msg.plugin_payload, payload, msg.payloadlen);
((uint8_t *)msg.plugin_payload)[msg.payloadlen] = 0;
memcpy(msg.payload, payload, msg.payloadlen);
((uint8_t *)msg.payload)[msg.payloadlen] = 0;
}
msg.source_mid = (uint16_t)sqlite3_column_int(stmt, 7);
msg.source_port = (uint16_t)sqlite3_column_int(stmt, 8);
msg.qos = (uint8_t)sqlite3_column_int(stmt, 9);
msg.retain = sqlite3_column_int(stmt, 10);
msg.plugin_properties = json_to_properties((const char *)sqlite3_column_text(stmt, 11));
msg.properties = json_to_properties((const char *)sqlite3_column_text(stmt, 11));
rc = mosquitto_persist_base_msg_add(&msg);
if(rc == MOSQ_ERR_SUCCESS){
@ -333,7 +333,7 @@ static int base_msg_restore(struct mosquitto_sqlite *ms)
}
sqlite3_finalize(stmt);
mosquitto_log_printf(MOSQ_LOG_INFO, "sqlite: Restored %ld messages (%ld failed)", count, failed);
mosquitto_log_printf(MOSQ_LOG_INFO, "sqlite: Restored %ld base messages (%ld failed)", count, failed);
return MOSQ_ERR_SUCCESS;
}

@ -40,22 +40,22 @@ int control__process(struct mosquitto *context, struct mosquitto__base_msg *base
/* Check global plugins and non-per-listener settings first */
opts = &db.config->security_options;
HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->topic, strlen(base_msg->topic), cb_found);
HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->msg.topic, strlen(base_msg->msg.topic), cb_found);
/* If not found, check for per-listener plugins. */
if(cb_found == NULL && db.config->per_listener_settings){
opts = context->listener->security_options;
HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->topic, strlen(base_msg->topic), cb_found);
HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->msg.topic, strlen(base_msg->msg.topic), cb_found);
}
if(cb_found){
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = base_msg->topic;
event_data.payload = base_msg->payload;
event_data.payloadlen = base_msg->payloadlen;
event_data.qos = base_msg->qos;
event_data.retain = base_msg->retain;
event_data.properties = base_msg->properties;
event_data.topic = base_msg->msg.topic;
event_data.payload = base_msg->msg.payload;
event_data.payloadlen = base_msg->msg.payloadlen;
event_data.qos = base_msg->msg.qos;
event_data.retain = base_msg->msg.retain;
event_data.properties = base_msg->msg.properties;
event_data.reason_code = MQTT_RC_SUCCESS;
event_data.reason_string = NULL;
@ -68,11 +68,11 @@ int control__process(struct mosquitto *context, struct mosquitto__base_msg *base
SAFE_FREE(event_data.reason_string);
}
if(base_msg->qos == 1){
rc2 = send__puback(context, base_msg->source_mid, MQTT_RC_SUCCESS, properties);
if(base_msg->msg.qos == 1){
rc2 = send__puback(context, base_msg->msg.source_mid, MQTT_RC_SUCCESS, properties);
if(rc2) rc = rc2;
}else if(base_msg->qos == 2){
rc2 = send__pubrec(context, base_msg->source_mid, MQTT_RC_SUCCESS, properties);
}else if(base_msg->msg.qos == 2){
rc2 = send__pubrec(context, base_msg->msg.source_mid, MQTT_RC_SUCCESS, properties);
if(rc2) rc = rc2;
}
mosquitto_property_free_all(&properties);

@ -141,20 +141,20 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms
void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->inflight_count++;
msg_data->inflight_bytes += msg->base_msg->payloadlen;
msg_data->inflight_bytes += msg->base_msg->msg.payloadlen;
if(msg->qos != 0){
msg_data->inflight_count12++;
msg_data->inflight_bytes12 += msg->base_msg->payloadlen;
msg_data->inflight_bytes12 += msg->base_msg->msg.payloadlen;
}
}
static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->inflight_count--;
msg_data->inflight_bytes -= msg->base_msg->payloadlen;
msg_data->inflight_bytes -= msg->base_msg->msg.payloadlen;
if(msg->qos != 0){
msg_data->inflight_count12--;
msg_data->inflight_bytes12 -= msg->base_msg->payloadlen;
msg_data->inflight_bytes12 -= msg->base_msg->msg.payloadlen;
}
}
@ -162,20 +162,20 @@ static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_da
void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->queued_count++;
msg_data->queued_bytes += msg->base_msg->payloadlen;
msg_data->queued_bytes += msg->base_msg->msg.payloadlen;
if(msg->qos != 0){
msg_data->queued_count12++;
msg_data->queued_bytes12 += msg->base_msg->payloadlen;
msg_data->queued_bytes12 += msg->base_msg->msg.payloadlen;
}
}
static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto_client_msg *msg)
{
msg_data->queued_count--;
msg_data->queued_bytes -= msg->base_msg->payloadlen;
msg_data->queued_bytes -= msg->base_msg->msg.payloadlen;
if(msg->qos != 0){
msg_data->queued_count12--;
msg_data->queued_bytes12 -= msg->base_msg->payloadlen;
msg_data->queued_bytes12 -= msg->base_msg->msg.payloadlen;
}
}
@ -249,9 +249,9 @@ int db__msg_store_add(struct mosquitto__base_msg *base_msg)
{
struct mosquitto__base_msg *found;
HASH_FIND(hh, db.msg_store, &base_msg->db_id, sizeof(base_msg->db_id), found);
HASH_FIND(hh, db.msg_store, &base_msg->msg.store_id, sizeof(base_msg->msg.store_id), found);
if(found == NULL){
HASH_ADD(hh, db.msg_store, db_id, sizeof(base_msg->db_id), base_msg);
HASH_ADD_KEYPTR(hh, db.msg_store, &base_msg->msg.store_id, sizeof(base_msg->msg.store_id), base_msg);
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_ALREADY_EXISTS;
@ -263,17 +263,17 @@ void db__msg_store_free(struct mosquitto__base_msg *base_msg)
{
int i;
mosquitto__FREE(base_msg->source_id);
mosquitto__FREE(base_msg->source_username);
mosquitto__FREE(base_msg->msg.source_id);
mosquitto__FREE(base_msg->msg.source_username);
if(base_msg->dest_ids){
for(i=0; i<base_msg->dest_id_count; i++){
mosquitto__FREE(base_msg->dest_ids[i]);
}
mosquitto__FREE(base_msg->dest_ids);
}
mosquitto__FREE(base_msg->topic);
mosquitto_property_free_all(&base_msg->properties);
mosquitto__FREE(base_msg->payload);
mosquitto__FREE(base_msg->msg.topic);
mosquitto_property_free_all(&base_msg->msg.properties);
mosquitto__FREE(base_msg->msg.payload);
mosquitto__FREE(base_msg);
}
@ -282,7 +282,7 @@ void db__msg_store_remove(struct mosquitto__base_msg *base_msg, bool notify)
if(base_msg == NULL) return;
HASH_DELETE(hh, db.msg_store, base_msg);
db.msg_store_count--;
db.msg_store_bytes -= base_msg->payloadlen;
db.msg_store_bytes -= base_msg->msg.payloadlen;
if(notify == true){
plugin_persist__handle_base_msg_delete(base_msg);
}
@ -455,9 +455,9 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
msg_data = &context->msgs_in;
if(db__ready_for_flight(context, mosq_md_in, base_msg->qos)){
if(db__ready_for_flight(context, mosq_md_in, base_msg->msg.qos)){
state = mosq_ms_wait_for_pubrel;
}else if(base_msg->qos != 0 && db__ready_for_queue(context, base_msg->qos, msg_data)){
}else if(base_msg->msg.qos != 0 && db__ready_for_queue(context, base_msg->msg.qos, msg_data)){
state = mosq_ms_queued;
rc = 2;
}else{
@ -493,16 +493,16 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
}
msg->base_msg = base_msg;
db__msg_store_ref_inc(msg->base_msg);
msg->mid = base_msg->source_mid;
msg->mid = base_msg->msg.source_mid;
msg->direction = mosq_md_in;
msg->state = state;
msg->dup = false;
if(base_msg->qos > context->max_qos){
if(base_msg->msg.qos > context->max_qos){
msg->qos = context->max_qos;
}else{
msg->qos = base_msg->qos;
msg->qos = base_msg->msg.qos;
}
msg->retain = base_msg->retain;
msg->retain = base_msg->msg.retain;
msg->subscription_identifier = 0;
if(state == mosq_ms_queued){
@ -518,7 +518,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str
plugin_persist__handle_client_msg_add(context, msg);
}
if(msg->base_msg->qos > 0){
if(msg->base_msg->msg.qos > 0){
util__decrement_receive_quota(context);
}
return rc;
@ -802,28 +802,28 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_
base_msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg));
if(base_msg == NULL) return MOSQ_ERR_NOMEM;
base_msg->topic = mosquitto__strdup(topic);
if(base_msg->topic == NULL){
base_msg->msg.topic = mosquitto__strdup(topic);
if(base_msg->msg.topic == NULL){
db__msg_store_free(base_msg);
return MOSQ_ERR_INVAL;
}
base_msg->qos = qos;
base_msg->msg.qos = qos;
if(db.config->retain_available == false){
base_msg->retain = 0;
base_msg->msg.retain = 0;
}else{
base_msg->retain = retain;
base_msg->msg.retain = retain;
}
base_msg->payloadlen = payloadlen;
base_msg->payload = mosquitto__malloc(base_msg->payloadlen+1);
if(base_msg->payload == NULL){
base_msg->msg.payloadlen = payloadlen;
base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1);
if(base_msg->msg.payload == NULL){
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)base_msg->payload)[base_msg->payloadlen] = 0;
memcpy(base_msg->payload, payload, base_msg->payloadlen);
((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0;
memcpy(base_msg->msg.payload, payload, base_msg->msg.payloadlen);
if(context && context->id){
source_id = context->id;
@ -831,7 +831,7 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_
source_id = "";
}
if(properties){
base_msg->properties = *properties;
base_msg->msg.properties = *properties;
*properties = NULL;
}
@ -842,7 +842,7 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_
}
if(db__message_store(context, base_msg, message_expiry_interval, 0, origin)) return 1;
return sub__messages_queue(source_id, base_msg->topic, base_msg->qos, base_msg->retain, &base_msg);
return sub__messages_queue(source_id, base_msg->msg.topic, base_msg->msg.qos, base_msg->msg.retain, &base_msg);
}
@ -917,19 +917,19 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
assert(base_msg);
if(source && source->id){
base_msg->source_id = mosquitto__strdup(source->id);
base_msg->msg.source_id = mosquitto__strdup(source->id);
}else{
base_msg->source_id = mosquitto__strdup("");
base_msg->msg.source_id = mosquitto__strdup("");
}
if(!base_msg->source_id){
if(!base_msg->msg.source_id){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
if(source && source->username){
base_msg->source_username = mosquitto__strdup(source->username);
if(!base_msg->source_username){
base_msg->msg.source_username = mosquitto__strdup(source->username);
if(!base_msg->msg.source_username){
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
@ -939,20 +939,20 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
}
base_msg->origin = origin;
if(message_expiry_interval > 0){
base_msg->message_expiry_time = db.now_real_s + message_expiry_interval;
base_msg->msg.expiry_time = db.now_real_s + message_expiry_interval;
}else{
base_msg->message_expiry_time = 0;
base_msg->msg.expiry_time = 0;
}
base_msg->dest_ids = NULL;
base_msg->dest_id_count = 0;
db.msg_store_count++;
db.msg_store_bytes += base_msg->payloadlen;
db.msg_store_bytes += base_msg->msg.payloadlen;
if(!base_msg_id){
base_msg->db_id = db__new_msg_id();
base_msg->msg.store_id = db__new_msg_id();
}else{
base_msg->db_id = base_msg_id;
base_msg->msg.store_id = base_msg_id;
}
rc = db__msg_store_add(base_msg);
@ -972,14 +972,14 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu
*base_msg = NULL;
DL_FOREACH(context->msgs_in.inflight, tail){
if(tail->base_msg->source_mid == mid){
if(tail->base_msg->msg.source_mid == mid){
*base_msg = tail->base_msg;
return MOSQ_ERR_SUCCESS;
}
}
DL_FOREACH(context->msgs_in.queued, tail){
if(tail->base_msg->source_mid == mid){
if(tail->base_msg->msg.source_mid == mid){
*base_msg = tail->base_msg;
return MOSQ_ERR_SUCCESS;
}
@ -1134,7 +1134,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid)
DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
if(tail->mid == mid) {
if(tail->base_msg->qos != 2){
if(tail->base_msg->msg.qos != 2){
return MOSQ_ERR_PROTOCOL;
}
db__message_remove_inflight(context, &context->msgs_in, tail);
@ -1159,12 +1159,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid)
DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp){
if(tail->mid == mid){
if(tail->base_msg->qos != 2){
if(tail->base_msg->msg.qos != 2){
return MOSQ_ERR_PROTOCOL;
}
topic = tail->base_msg->topic;
topic = tail->base_msg->msg.topic;
retain = tail->retain;
source_id = tail->base_msg->source_id;
source_id = tail->base_msg->msg.source_id;
/* topic==NULL should be a QoS 2 message that was
* denied/dropped and is being processed so the client doesn't
@ -1210,7 +1210,7 @@ void db__expire_all_messages(struct mosquitto *context)
struct mosquitto_client_msg *msg, *tmp;
DL_FOREACH_SAFE(context->msgs_out.inflight, msg, tmp){
if(msg->base_msg->message_expiry_time && db.now_real_s > msg->base_msg->message_expiry_time){
if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){
if(msg->qos > 0){
util__increment_send_quota(context);
}
@ -1218,12 +1218,12 @@ void db__expire_all_messages(struct mosquitto *context)
}
}
DL_FOREACH_SAFE(context->msgs_out.queued, msg, tmp){
if(msg->base_msg->message_expiry_time && db.now_real_s > msg->base_msg->message_expiry_time){
if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){
db__message_remove_queued(context, &context->msgs_out, msg);
}
}
DL_FOREACH_SAFE(context->msgs_in.inflight, msg, tmp){
if(msg->base_msg->message_expiry_time && db.now_real_s > msg->base_msg->message_expiry_time){
if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){
if(msg->qos > 0){
util__increment_receive_quota(context);
}
@ -1231,7 +1231,7 @@ void db__expire_all_messages(struct mosquitto *context)
}
}
DL_FOREACH_SAFE(context->msgs_in.queued, msg, tmp){
if(msg->base_msg->message_expiry_time && db.now_real_s > msg->base_msg->message_expiry_time){
if(msg->base_msg->msg.expiry_time && db.now_real_s > msg->base_msg->msg.expiry_time){
db__message_remove_queued(context, &context->msgs_in, msg);
}
}
@ -1253,8 +1253,8 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
uint32_t subscription_id;
expiry_interval = 0;
if(msg->base_msg->message_expiry_time){
if(db.now_real_s > msg->base_msg->message_expiry_time){
if(msg->base_msg->msg.expiry_time){
if(db.now_real_s > msg->base_msg->msg.expiry_time){
/* Message is expired, must not send. */
if(msg->direction == mosq_md_out && msg->qos > 0){
util__increment_send_quota(context);
@ -1262,18 +1262,18 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru
db__message_remove_inflight(context, &context->msgs_out, msg);
return MOSQ_ERR_SUCCESS;
}else{
expiry_interval = (uint32_t)(msg->base_msg->message_expiry_time - db.now_real_s);
expiry_interval = (uint32_t)(msg->base_msg->msg.expiry_time - db.now_real_s);
}
}
mid = msg->mid;
retries = msg->dup;
retain = msg->retain;
topic = msg->base_msg->topic;
topic = msg->base_msg->msg.topic;
qos = (uint8_t)msg->qos;
payloadlen = msg->base_msg->payloadlen;
payload = msg->base_msg->payload;
payloadlen = msg->base_msg->msg.payloadlen;
payload = msg->base_msg->msg.payload;
subscription_id = msg->subscription_identifier;
base_msg_props = msg->base_msg->properties;
base_msg_props = msg->base_msg->msg.properties;
switch(msg->state){
case mosq_ms_publish_qos0:

@ -94,9 +94,9 @@ static void connection_check_acl(struct mosquitto *context, struct mosquitto_cli
}else{
access = MOSQ_ACL_WRITE;
}
if(mosquitto_acl_check(context, msg_tail->base_msg->topic,
msg_tail->base_msg->payloadlen, msg_tail->base_msg->payload,
msg_tail->base_msg->qos, msg_tail->base_msg->retain, access) != MOSQ_ERR_SUCCESS){
if(mosquitto_acl_check(context, msg_tail->base_msg->msg.topic,
msg_tail->base_msg->msg.payloadlen, msg_tail->base_msg->msg.payload,
msg_tail->base_msg->msg.qos, msg_tail->base_msg->msg.retain, access) != MOSQ_ERR_SUCCESS){
DL_DELETE((*head), msg_tail);
db__msg_store_ref_dec(&msg_tail->base_msg);

@ -41,7 +41,7 @@ int handle__publish(struct mosquitto *context)
int rc2;
uint8_t header = context->in_packet.command;
int res = 0;
struct mosquitto__base_msg *msg, *stored = NULL;
struct mosquitto__base_msg *base_msg, *stored = NULL;
size_t len;
uint16_t slen;
char *topic_mount;
@ -59,73 +59,73 @@ int handle__publish(struct mosquitto *context)
context->stats.messages_received++;
msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg));
if(msg == NULL){
base_msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg));
if(base_msg == NULL){
return MOSQ_ERR_NOMEM;
}
dup = (header & 0x08)>>3;
msg->qos = (header & 0x06)>>1;
if(dup == 1 && msg->qos == 0){
base_msg->msg.qos = (header & 0x06)>>1;
if(dup == 1 && base_msg->msg.qos == 0){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid PUBLISH (QoS=0 and DUP=1) from %s, disconnecting.", context->id);
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(msg->qos == 3){
if(base_msg->msg.qos == 3){
log__printf(NULL, MOSQ_LOG_INFO,
"Invalid QoS in PUBLISH from %s, disconnecting.", context->id);
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(msg->qos > context->max_qos){
if(base_msg->msg.qos > context->max_qos){
log__printf(NULL, MOSQ_LOG_INFO,
"Too high QoS in PUBLISH from %s, disconnecting.", context->id);
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_QOS_NOT_SUPPORTED;
}
msg->retain = (header & 0x01);
base_msg->msg.retain = (header & 0x01);
if(msg->retain && db.config->retain_available == false){
db__msg_store_free(msg);
if(base_msg->msg.retain && db.config->retain_available == false){
db__msg_store_free(base_msg);
return MOSQ_ERR_RETAIN_NOT_SUPPORTED;
}
if(packet__read_string(&context->in_packet, &msg->topic, &slen)){
db__msg_store_free(msg);
if(packet__read_string(&context->in_packet, &base_msg->msg.topic, &slen)){
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(!slen && context->protocol != mosq_p_mqtt5){
/* Invalid publish topic, disconnect client. */
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(msg->qos > 0){
if(base_msg->msg.qos > 0){
if(packet__read_uint16(&context->in_packet, &mid)){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
if(mid == 0){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_PROTOCOL;
}
/* It is important to have a separate copy of mid, because msg may be
* freed before we want to send a PUBACK/PUBREC. */
msg->source_mid = mid;
base_msg->msg.source_mid = mid;
}
/* Handle properties */
if(context->protocol == mosq_p_mqtt5){
rc = property__read_all(CMD_PUBLISH, &context->in_packet, &properties);
if(rc){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return rc;
}
p = properties;
p_prev = NULL;
msg->properties = NULL;
base_msg->msg.properties = NULL;
msg_properties_last = NULL;
while(p){
switch(p->identifier){
@ -134,11 +134,11 @@ int handle__publish(struct mosquitto *context)
case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR:
case MQTT_PROP_RESPONSE_TOPIC:
case MQTT_PROP_USER_PROPERTY:
if(msg->properties){
if(base_msg->msg.properties){
msg_properties_last->next = p;
msg_properties_last = p;
}else{
msg->properties = p;
base_msg->msg.properties = p;
msg_properties_last = p;
}
if(p_prev){
@ -166,7 +166,7 @@ int handle__publish(struct mosquitto *context)
case MQTT_PROP_SUBSCRIPTION_IDENTIFIER:
if(p->value.varint == 0){
mosquitto_property_free_all(&properties);
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_PROTOCOL;
}
p_prev = p;
@ -182,94 +182,94 @@ int handle__publish(struct mosquitto *context)
mosquitto_property_free_all(&properties);
if(topic_alias == 0 || (context->listener && topic_alias > context->listener->max_topic_alias)){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_TOPIC_ALIAS_INVALID;
}else if(topic_alias > 0){
if(msg->topic){
rc = alias__add_r2l(context, msg->topic, (uint16_t)topic_alias);
if(base_msg->msg.topic){
rc = alias__add_r2l(context, base_msg->msg.topic, (uint16_t)topic_alias);
if(rc){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return rc;
}
}else{
rc = alias__find_by_alias(context, ALIAS_DIR_R2L, (uint16_t)topic_alias, &msg->topic);
rc = alias__find_by_alias(context, ALIAS_DIR_R2L, (uint16_t)topic_alias, &base_msg->msg.topic);
if(rc){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_PROTOCOL;
}
}
}
#ifdef WITH_BRIDGE
rc = bridge__remap_topic_in(context, &msg->topic);
rc = bridge__remap_topic_in(context, &base_msg->msg.topic);
if(rc){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return rc;
}
#endif
if(mosquitto_pub_topic_check(msg->topic) != MOSQ_ERR_SUCCESS){
if(mosquitto_pub_topic_check(base_msg->msg.topic) != MOSQ_ERR_SUCCESS){
/* Invalid publish topic, just swallow it. */
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
msg->payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(msg->payloadlen);
base_msg->msg.payloadlen = context->in_packet.remaining_length - context->in_packet.pos;
G_PUB_BYTES_RECEIVED_INC(base_msg->msg.payloadlen);
if(context->listener && context->listener->mount_point){
len = strlen(context->listener->mount_point) + strlen(msg->topic) + 1;
len = strlen(context->listener->mount_point) + strlen(base_msg->msg.topic) + 1;
topic_mount = mosquitto__malloc(len+1);
if(!topic_mount){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, msg->topic);
snprintf(topic_mount, len, "%s%s", context->listener->mount_point, base_msg->msg.topic);
topic_mount[len] = '\0';
mosquitto__FREE(msg->topic);
msg->topic = topic_mount;
mosquitto__FREE(base_msg->msg.topic);
base_msg->msg.topic = topic_mount;
}
if(msg->payloadlen){
if(db.config->message_size_limit && msg->payloadlen > db.config->message_size_limit){
log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
if(base_msg->msg.payloadlen){
if(db.config->message_size_limit && base_msg->msg.payloadlen > db.config->message_size_limit){
log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, (long)base_msg->msg.payloadlen);
reason_code = MQTT_RC_PACKET_TOO_LARGE;
goto process_bad_message;
}
msg->payload = mosquitto__malloc(msg->payloadlen+1);
if(msg->payload == NULL){
db__msg_store_free(msg);
base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1);
if(base_msg->msg.payload == NULL){
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)msg->payload)[msg->payloadlen] = 0;
((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0;
if(packet__read_bytes(&context->in_packet, msg->payload, msg->payloadlen)){
db__msg_store_free(msg);
if(packet__read_bytes(&context->in_packet, base_msg->msg.payload, base_msg->msg.payloadlen)){
db__msg_store_free(base_msg);
return MOSQ_ERR_MALFORMED_PACKET;
}
}
/* Check for topic access */
rc = mosquitto_acl_check(context, msg->topic, msg->payloadlen, msg->payload, msg->qos, msg->retain, MOSQ_ACL_WRITE);
rc = mosquitto_acl_check(context, base_msg->msg.topic, base_msg->msg.payloadlen, base_msg->msg.payload, base_msg->msg.qos, base_msg->msg.retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic,
(long)msg->payloadlen);
context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic,
(long)base_msg->msg.payloadlen);
reason_code = MQTT_RC_NOT_AUTHORIZED;
goto process_bad_message;
}else if(rc != MOSQ_ERR_SUCCESS){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return rc;
}
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic, (long)msg->payloadlen);
log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, (long)base_msg->msg.payloadlen);
if(!strncmp(msg->topic, "$CONTROL/", 9)){
if(!strncmp(base_msg->msg.topic, "$CONTROL/", 9)){
#ifdef WITH_CONTROL
rc = control__process(context, msg);
db__msg_store_free(msg);
rc = control__process(context, base_msg);
db__msg_store_free(base_msg);
return rc;
#else
reason_code = MQTT_RC_IMPLEMENTATION_SPECIFIC;
@ -278,12 +278,12 @@ int handle__publish(struct mosquitto *context)
}
{
rc = plugin__handle_message(context, msg);
rc = plugin__handle_message(context, base_msg);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
context->id, dup, msg->qos, msg->retain, msg->source_mid, msg->topic,
(long)msg->payloadlen);
context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic,
(long)base_msg->msg.payloadlen);
reason_code = MQTT_RC_NOT_AUTHORIZED;
goto process_bad_message;
@ -294,61 +294,61 @@ int handle__publish(struct mosquitto *context)
reason_code = MQTT_RC_QUOTA_EXCEEDED;
goto process_bad_message;
}else if(rc != MOSQ_ERR_SUCCESS){
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return rc;
}
}
if(msg->qos > 0){
db__message_store_find(context, msg->source_mid, &stored);
if(base_msg->msg.qos > 0){
db__message_store_find(context, base_msg->msg.source_mid, &stored);
}
if(stored && msg->source_mid != 0 &&
(stored->qos != msg->qos
|| stored->payloadlen != msg->payloadlen
|| strcmp(stored->topic, msg->topic)
|| memcmp(stored->payload, msg->payload, msg->payloadlen) )){
if(stored && base_msg->msg.source_mid != 0 &&
(stored->msg.qos != base_msg->msg.qos
|| stored->msg.payloadlen != base_msg->msg.payloadlen
|| strcmp(stored->msg.topic, base_msg->msg.topic)
|| memcmp(stored->msg.payload, base_msg->msg.payload, base_msg->msg.payloadlen) )){
log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", msg->source_mid, context->id);
db__message_remove_incoming(context, msg->source_mid);
log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", base_msg->msg.source_mid, context->id);
db__message_remove_incoming(context, base_msg->msg.source_mid);
stored = NULL;
}
if(!stored){
if(msg->qos > 0 && context->msgs_in.inflight_quota == 0){
if(base_msg->msg.qos > 0 && context->msgs_in.inflight_quota == 0){
/* Client isn't allowed any more incoming messages, so fail early */
db__msg_store_free(msg);
db__msg_store_free(base_msg);
return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED;
}
if(msg->qos == 0
|| db__ready_for_flight(context, mosq_md_in, msg->qos)
|| db__ready_for_queue(context, msg->qos, &context->msgs_in)){
if(base_msg->msg.qos == 0
|| db__ready_for_flight(context, mosq_md_in, base_msg->msg.qos)
|| db__ready_for_queue(context, base_msg->msg.qos, &context->msgs_in)){
dup = 0;
rc = db__message_store(context, msg, message_expiry_interval, 0, mosq_mo_client);
rc = db__message_store(context, base_msg, message_expiry_interval, 0, mosq_mo_client);
if(rc) return rc;
}else{
/* Client isn't allowed any more incoming messages, so fail early */
reason_code = MQTT_RC_QUOTA_EXCEEDED;
goto process_bad_message;
}
stored = msg;
msg = NULL;
stored = base_msg;
base_msg = NULL;
}else{
db__msg_store_free(msg);
msg = NULL;
db__msg_store_free(base_msg);
base_msg = NULL;
dup = 1;
}
switch(stored->qos){
switch(stored->msg.qos){
case 0:
rc2 = sub__messages_queue(context->id, stored->topic, stored->qos, stored->retain, &stored);
rc2 = sub__messages_queue(context->id, stored->msg.topic, stored->msg.qos, stored->msg.retain, &stored);
if(rc2 > 0) rc = 1;
break;
case 1:
util__decrement_receive_quota(context);
rc2 = sub__messages_queue(context->id, stored->topic, stored->qos, stored->retain, &stored);
rc2 = sub__messages_queue(context->id, stored->msg.topic, stored->msg.qos, stored->msg.retain, &stored);
/* stored may now be free, so don't refer to it */
if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){
if(send__puback(context, mid, 0, NULL)) rc = 1;
@ -368,7 +368,7 @@ int handle__publish(struct mosquitto *context)
* due to queue. This isn't an error so don't disconnect them. */
/* FIXME - this is no longer necessary due to failing early above */
if(!res){
if(send__pubrec(context, stored->source_mid, 0, NULL)) rc = 1;
if(send__pubrec(context, stored->msg.source_mid, 0, NULL)) rc = 1;
}else if(res == 1){
rc = 1;
}
@ -379,20 +379,19 @@ int handle__publish(struct mosquitto *context)
return rc;
process_bad_message:
rc = 1;
if(msg){
switch(msg->qos){
if(base_msg){
switch(base_msg->msg.qos){
case 0:
rc = MOSQ_ERR_SUCCESS;
break;
case 1:
rc = send__puback(context, msg->source_mid, reason_code, NULL);
rc = send__puback(context, base_msg->msg.source_mid, reason_code, NULL);
break;
case 2:
rc = send__pubrec(context, msg->source_mid, reason_code, NULL);
rc = send__pubrec(context, base_msg->msg.source_mid, reason_code, NULL);
break;
}
db__msg_store_free(msg);
db__msg_store_free(base_msg);
}
return rc;
}

@ -65,7 +65,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l)
static struct lws_sorted_usec_list sul;
#endif
static int single_publish(struct mosquitto *context, struct mosquitto__message_v5 *msg, uint32_t message_expiry)
static int single_publish(struct mosquitto *context, struct mosquitto__message_v5 *pub_msg, uint32_t message_expiry)
{
struct mosquitto__base_msg *base_msg;
uint16_t mid;
@ -73,32 +73,32 @@ static int single_publish(struct mosquitto *context, struct mosquitto__message_v
base_msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg));
if(base_msg == NULL) return MOSQ_ERR_NOMEM;
base_msg->topic = msg->topic;
msg->topic = NULL;
base_msg->retain = 0;
base_msg->payloadlen = (uint32_t)msg->payloadlen;
base_msg->payload = mosquitto__malloc(base_msg->payloadlen+1);
if(base_msg->payload == NULL){
base_msg->msg.topic = pub_msg->topic;
pub_msg->topic = NULL;
base_msg->msg.retain = 0;
base_msg->msg.payloadlen = (uint32_t)pub_msg->payloadlen;
base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1);
if(base_msg->msg.payload == NULL){
db__msg_store_free(base_msg);
return MOSQ_ERR_NOMEM;
}
/* Ensure payload is always zero terminated, this is the reason for the extra byte above */
((uint8_t *)base_msg->payload)[base_msg->payloadlen] = 0;
memcpy(base_msg->payload, msg->payload, base_msg->payloadlen);
((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0;
memcpy(base_msg->msg.payload, pub_msg->payload, base_msg->msg.payloadlen);
if(msg->properties){
base_msg->properties = msg->properties;
msg->properties = NULL;
if(pub_msg->properties){
base_msg->msg.properties = pub_msg->properties;
pub_msg->properties = NULL;
}
if(db__message_store(context, base_msg, message_expiry, 0, mosq_mo_broker)) return 1;
if(msg->qos){
if(pub_msg->qos){
mid = mosquitto__mid_generate(context);
}else{
mid = 0;
}
return db__message_insert_outgoing(context, 0, mid, (uint8_t)msg->qos, 0, base_msg, 0, true, true);
return db__message_insert_outgoing(context, 0, mid, (uint8_t)pub_msg->qos, 0, base_msg, 0, true, true);
}

@ -410,22 +410,12 @@ struct mosquitto__retainhier {
struct mosquitto__base_msg{
UT_hash_handle hh;
dbid_t db_id;
char *source_id;
char *source_username;
struct mosquitto_base_msg msg;
struct mosquitto__listener *source_listener;
char **dest_ids;
int dest_id_count;
int ref_count;
char* topic;
mosquitto_property *properties;
void *payload;
time_t message_expiry_time;
uint32_t payloadlen;
enum mosquitto_msg_origin origin;
uint16_t source_mid;
uint8_t qos;
bool retain;
bool stored;
};

@ -296,13 +296,13 @@ static int persist__base_msg_chunk_restore(FILE *db_fptr, uint32_t length)
goto cleanup;
}
base_msg->source_mid = chunk.F.source_mid;
base_msg->topic = chunk.topic;
base_msg->qos = chunk.F.qos;
base_msg->payloadlen = chunk.F.payloadlen;
base_msg->retain = chunk.F.retain;
base_msg->properties = chunk.properties;
base_msg->payload = chunk.payload;
base_msg->msg.source_mid = chunk.F.source_mid;
base_msg->msg.topic = chunk.topic;
base_msg->msg.qos = chunk.F.qos;
base_msg->msg.payloadlen = chunk.F.payloadlen;
base_msg->msg.retain = chunk.F.retain;
base_msg->msg.properties = chunk.properties;
base_msg->msg.payload = chunk.payload;
base_msg->source_listener = chunk.source.listener;
rc = db__message_store(&chunk.source, base_msg, message_expiry_interval,
@ -327,7 +327,7 @@ cleanup:
static int persist__retain_chunk_restore(FILE *db_fptr)
{
struct mosquitto__base_msg *msg;
struct mosquitto__base_msg *base_msg;
struct P_retain chunk;
int rc;
char **split_topics;
@ -344,10 +344,10 @@ static int persist__retain_chunk_restore(FILE *db_fptr)
return rc;
}
HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), msg);
if(msg){
if(sub__topic_tokenise(msg->topic, &local_topic, &split_topics, NULL)) return 1;
retain__store(msg->topic, msg, split_topics, true);
HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), base_msg);
if(base_msg){
if(sub__topic_tokenise(base_msg->msg.topic, &local_topic, &split_topics, NULL)) return 1;
retain__store(base_msg->msg.topic, base_msg, split_topics, true);
mosquitto__FREE(local_topic);
mosquitto__FREE(split_topics);
retained_count++;

@ -51,7 +51,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex
cmsg = queue;
while(cmsg){
if(!strncmp(cmsg->base_msg->topic, "$SYS", 4)
if(!strncmp(cmsg->base_msg->msg.topic, "$SYS", 4)
&& cmsg->base_msg->ref_count <= 1
&& cmsg->base_msg->dest_id_count == 0){
@ -61,7 +61,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex
continue;
}
chunk.F.store_id = cmsg->base_msg->db_id;
chunk.F.store_id = cmsg->base_msg->msg.store_id;
chunk.F.mid = cmsg->mid;
chunk.F.id_len = (uint16_t)strlen(context->id);
chunk.F.qos = cmsg->qos;
@ -95,11 +95,11 @@ static int persist__message_store_save(FILE *db_fptr)
base_msg = db.msg_store;
HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){
if(base_msg->ref_count < 1 || base_msg->topic == NULL){
if(base_msg->ref_count < 1 || base_msg->msg.topic == NULL){
continue;
}
if(!strncmp(base_msg->topic, "$SYS", 4)){
if(!strncmp(base_msg->msg.topic, "$SYS", 4)){
if(base_msg->ref_count <= 1 && base_msg->dest_id_count == 0){
/* $SYS messages that are only retained shouldn't be persisted. */
continue;
@ -110,39 +110,39 @@ static int persist__message_store_save(FILE *db_fptr)
* queue. */
chunk.F.retain = 0;
}else{
chunk.F.retain = (uint8_t)base_msg->retain;
chunk.F.retain = (uint8_t)base_msg->msg.retain;
}
chunk.F.store_id = base_msg->db_id;
chunk.F.expiry_time = base_msg->message_expiry_time;
chunk.F.payloadlen = base_msg->payloadlen;
chunk.F.source_mid = base_msg->source_mid;
if(base_msg->source_id){
chunk.F.source_id_len = (uint16_t)strlen(base_msg->source_id);
chunk.source.id = base_msg->source_id;
chunk.F.store_id = base_msg->msg.store_id;
chunk.F.expiry_time = base_msg->msg.expiry_time;
chunk.F.payloadlen = base_msg->msg.payloadlen;
chunk.F.source_mid = base_msg->msg.source_mid;
if(base_msg->msg.source_id){
chunk.F.source_id_len = (uint16_t)strlen(base_msg->msg.source_id);
chunk.source.id = base_msg->msg.source_id;
}else{
chunk.F.source_id_len = 0;
chunk.source.id = NULL;
}
if(base_msg->source_username){
chunk.F.source_username_len = (uint16_t)strlen(base_msg->source_username);
chunk.source.username = base_msg->source_username;
if(base_msg->msg.source_username){
chunk.F.source_username_len = (uint16_t)strlen(base_msg->msg.source_username);
chunk.source.username = base_msg->msg.source_username;
}else{
chunk.F.source_username_len = 0;
chunk.source.username = NULL;
}
chunk.F.topic_len = (uint16_t)strlen(base_msg->topic);
chunk.topic = base_msg->topic;
chunk.F.topic_len = (uint16_t)strlen(base_msg->msg.topic);
chunk.topic = base_msg->msg.topic;
if(base_msg->source_listener){
chunk.F.source_port = base_msg->source_listener->port;
}else{
chunk.F.source_port = 0;
}
chunk.F.qos = base_msg->qos;
chunk.payload = base_msg->payload;
chunk.properties = base_msg->properties;
chunk.F.qos = base_msg->msg.qos;
chunk.payload = base_msg->msg.payload;
chunk.properties = base_msg->msg.properties;
rc = persist__chunk_message_store_write_v6(db_fptr, &chunk);
if(rc){
@ -277,9 +277,9 @@ static int persist__retain_save(FILE *db_fptr, struct mosquitto__retainhier *nod
memset(&retain_chunk, 0, sizeof(struct P_retain));
if(node->retained && strncmp(node->retained->topic, "$SYS", 4)){
if(node->retained && strncmp(node->retained->msg.topic, "$SYS", 4)){
/* Don't save $SYS messages. */
retain_chunk.F.store_id = node->retained->db_id;
retain_chunk.F.store_id = node->retained->msg.store_id;
rc = persist__chunk_retain_write_v6(db_fptr, &retain_chunk);
if(rc){
return rc;

@ -31,12 +31,12 @@ static int plugin__handle_message_single(struct mosquitto__security_options *opt
memset(&event_data, 0, sizeof(event_data));
event_data.client = context;
event_data.topic = stored->topic;
event_data.payloadlen = stored->payloadlen;
event_data.payload = stored->payload;
event_data.qos = stored->qos;
event_data.retain = stored->retain;
event_data.properties = stored->properties;
event_data.topic = stored->msg.topic;
event_data.payloadlen = stored->msg.payloadlen;
event_data.payload = stored->msg.payload;
event_data.qos = stored->msg.qos;
event_data.retain = stored->msg.retain;
event_data.properties = stored->msg.properties;
DL_FOREACH(opts->plugin_callbacks.message, cb_base){
rc = cb_base->cb(MOSQ_EVT_MESSAGE, &event_data, cb_base->userdata);
@ -45,14 +45,14 @@ static int plugin__handle_message_single(struct mosquitto__security_options *opt
}
}
stored->topic = event_data.topic;
if(stored->payload != event_data.payload){
mosquitto__FREE(stored->payload);
stored->payload = event_data.payload;
stored->payloadlen = event_data.payloadlen;
stored->msg.topic = event_data.topic;
if(stored->msg.payload != event_data.payload){
mosquitto__FREE(stored->msg.payload);
stored->msg.payload = event_data.payload;
stored->msg.payloadlen = event_data.payloadlen;
}
stored->retain = event_data.retain;
stored->properties = event_data.properties;
stored->msg.retain = event_data.retain;
stored->msg.properties = event_data.properties;
return rc;
}

@ -200,7 +200,7 @@ void plugin_persist__handle_client_msg_add(struct mosquitto *context, const stru
event_data.client_id = context->id;
event_data.cmsg_id = cmsg->cmsg_id;
event_data.store_id = cmsg->base_msg->db_id;
event_data.store_id = cmsg->base_msg->msg.store_id;
event_data.mid = cmsg->mid;
event_data.qos = cmsg->qos;
event_data.retain = cmsg->retain;
@ -235,7 +235,7 @@ void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const s
event_data.mid = cmsg->mid;
event_data.state = (uint8_t)cmsg->state;
event_data.qos = cmsg->qos;
event_data.store_id = cmsg->base_msg->db_id;
event_data.store_id = cmsg->base_msg->msg.store_id;
event_data.direction = (uint8_t)cmsg->direction;
DL_FOREACH(opts->plugin_callbacks.persist_client_msg_delete, cb_base){
@ -263,7 +263,7 @@ void plugin_persist__handle_client_msg_update(struct mosquitto *context, const s
event_data.client_id = context->id;
event_data.cmsg_id = cmsg->cmsg_id;
event_data.mid = cmsg->mid;
event_data.store_id = cmsg->base_msg->db_id;
event_data.store_id = cmsg->base_msg->msg.store_id;
event_data.state = (uint8_t)cmsg->state;
event_data.dup = cmsg->dup;
event_data.direction = (uint8_t)cmsg->direction;
@ -275,62 +275,62 @@ void plugin_persist__handle_client_msg_update(struct mosquitto *context, const s
}
void plugin_persist__handle_base_msg_add(struct mosquitto__base_msg *msg)
void plugin_persist__handle_base_msg_add(struct mosquitto__base_msg *base_msg)
{
struct mosquitto_evt_persist_base_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(msg->stored || db.shutdown) return;
if(base_msg->stored || db.shutdown) return;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
event_data.expiry_time = msg->message_expiry_time;
event_data.topic = msg->topic;
event_data.payload = msg->payload;
event_data.source_id = msg->source_id;
event_data.source_username = msg->source_username;
event_data.properties = msg->properties;
event_data.payloadlen = msg->payloadlen;
event_data.source_mid = msg->source_mid;
if(msg->source_listener){
event_data.source_port = msg->source_listener->port;
event_data.msg.store_id = base_msg->msg.store_id;
event_data.msg.expiry_time = base_msg->msg.expiry_time;
event_data.msg.topic = base_msg->msg.topic;
event_data.msg.payload = base_msg->msg.payload;
event_data.msg.source_id = base_msg->msg.source_id;
event_data.msg.source_username = base_msg->msg.source_username;
event_data.msg.properties = base_msg->msg.properties;
event_data.msg.payloadlen = base_msg->msg.payloadlen;
event_data.msg.source_mid = base_msg->msg.source_mid;
if(base_msg->source_listener){
event_data.msg.source_port = base_msg->source_listener->port;
}else{
event_data.source_port = 0;
event_data.msg.source_port = 0;
}
event_data.qos = msg->qos;
event_data.retain = msg->retain;
event_data.msg.qos = base_msg->msg.qos;
event_data.msg.retain = base_msg->msg.retain;
DL_FOREACH(opts->plugin_callbacks.persist_base_msg_add, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_BASE_MSG_ADD, &event_data, cb_base->userdata);
}
msg->stored = true;
base_msg->stored = true;
}
void plugin_persist__handle_base_msg_delete(struct mosquitto__base_msg *msg)
void plugin_persist__handle_base_msg_delete(struct mosquitto__base_msg *base_msg)
{
struct mosquitto_evt_persist_base_msg event_data;
struct mosquitto__callback *cb_base;
struct mosquitto__security_options *opts;
if(msg->stored == false || db.shutdown) return;
if(base_msg->stored == false || db.shutdown) return;
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
event_data.msg.store_id = base_msg->msg.store_id;
DL_FOREACH(opts->plugin_callbacks.persist_base_msg_delete, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_BASE_MSG_DELETE, &event_data, cb_base->userdata);
}
msg->stored = false;
base_msg->stored = false;
}
void plugin_persist__handle_retain_msg_set(struct mosquitto__base_msg *msg)
void plugin_persist__handle_retain_msg_set(struct mosquitto__base_msg *base_msg)
{
struct mosquitto_evt_persist_retain_msg event_data;
struct mosquitto__callback *cb_base;
@ -341,8 +341,8 @@ void plugin_persist__handle_retain_msg_set(struct mosquitto__base_msg *msg)
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.store_id = msg->db_id;
event_data.topic = msg->topic;
event_data.store_id = base_msg->msg.store_id;
event_data.topic = base_msg->msg.topic;
DL_FOREACH(opts->plugin_callbacks.persist_retain_msg_set, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_RETAIN_MSG_SET, &event_data, cb_base->userdata);
@ -350,7 +350,7 @@ void plugin_persist__handle_retain_msg_set(struct mosquitto__base_msg *msg)
}
void plugin_persist__handle_retain_msg_delete(struct mosquitto__base_msg *msg)
void plugin_persist__handle_retain_msg_delete(struct mosquitto__base_msg *base_msg)
{
struct mosquitto_evt_persist_retain_msg event_data;
struct mosquitto__callback *cb_base;
@ -361,7 +361,7 @@ void plugin_persist__handle_retain_msg_delete(struct mosquitto__base_msg *msg)
opts = &db.config->security_options;
memset(&event_data, 0, sizeof(event_data));
event_data.topic = msg->topic;
event_data.topic = base_msg->msg.topic;
DL_FOREACH(opts->plugin_callbacks.persist_retain_msg_delete, cb_base){
cb_base->cb(MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE, &event_data, cb_base->userdata);

@ -745,7 +745,7 @@ BROKER_EXPORT int mosquitto_subscription_delete(const char *client_id, const cha
}
BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_base_msg *msg)
BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_add)
{
struct mosquitto context;
struct mosquitto__base_msg *base_msg;
@ -758,15 +758,15 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_ba
/* db__message_store only takes a copy of .id and .username, so it is reasonably safe
* to cast the const char * to char * */
context.id = (char *)msg->source_id;
context.username = (char *)msg->source_username;
context.id = (char *)msg_add->source_id;
context.username = (char *)msg_add->source_username;
if(msg->expiry_time == 0){
if(msg_add->expiry_time == 0){
message_expiry_interval = 0;
}else if(msg->expiry_time <= db.now_real_s){
}else if(msg_add->expiry_time <= db.now_real_s){
message_expiry_interval = 1;
}else{
message_expiry_interval_tt = msg->expiry_time - db.now_real_s;
message_expiry_interval_tt = msg_add->expiry_time - db.now_real_s;
if(message_expiry_interval_tt > UINT32_MAX){
message_expiry_interval = UINT32_MAX;
}else{
@ -778,21 +778,22 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_ba
if(base_msg == NULL){
goto error;
}
base_msg->payloadlen = msg->payloadlen;
base_msg->source_mid = msg->source_mid;
base_msg->qos = msg->qos;
base_msg->retain = msg->retain;
base_msg->msg.store_id = msg_add->store_id;
base_msg->msg.payloadlen = msg_add->payloadlen;
base_msg->msg.source_mid = msg_add->source_mid;
base_msg->msg.qos = msg_add->qos;
base_msg->msg.retain = msg_add->retain;
base_msg->payload = msg->plugin_payload;
msg->plugin_payload = NULL;
base_msg->topic = msg->plugin_topic;
msg->plugin_topic = NULL;
base_msg->properties = msg->plugin_properties;
msg->plugin_properties = NULL;
base_msg->msg.payload = msg_add->payload;
msg_add->payload = NULL;
base_msg->msg.topic = msg_add->topic;
msg_add->topic = NULL;
base_msg->msg.properties = msg_add->properties;
msg_add->properties = NULL;
if(msg->source_port){
if(msg_add->source_port){
for(i=0; i<db.config->listener_count; i++){
if(db.config->listeners[i].port == msg->source_port){
if(db.config->listeners[i].port == msg_add->source_port){
base_msg->source_listener = &db.config->listeners[i];
break;
}
@ -800,13 +801,13 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_evt_persist_ba
}
base_msg->stored = true;
rc = db__message_store(&context, base_msg, message_expiry_interval, msg->store_id, mosq_mo_broker);
rc = db__message_store(&context, base_msg, message_expiry_interval, base_msg->msg.store_id, mosq_mo_broker);
return rc;
error:
mosquitto_property_free_all(&msg->plugin_properties);
mosquitto_free(msg->plugin_topic);
mosquitto_free(msg->plugin_payload);
mosquitto_property_free_all(&msg_add->properties);
mosquitto_free(msg_add->topic);
mosquitto_free(msg_add->payload);
mosquitto_free(base_msg);
return MOSQ_ERR_NOMEM;

@ -165,7 +165,7 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char
#endif
if(retainhier->retained){
if(persist && retainhier->retained->topic[0] != '$' && base_msg->payloadlen == 0){
if(persist && retainhier->retained->msg.topic[0] != '$' && base_msg->msg.payloadlen == 0){
/* Only delete if another retained message isn't replacing this one */
plugin_persist__handle_retain_msg_delete(retainhier->retained);
}
@ -173,15 +173,15 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char
#ifdef WITH_SYS_TREE
db.retained_count--;
#endif
if(base_msg->payloadlen == 0){
if(base_msg->msg.payloadlen == 0){
retainhier->retained = NULL;
retain__clean_empty_hierarchy(retainhier);
}
}
if(base_msg->payloadlen){
if(base_msg->msg.payloadlen){
retainhier->retained = base_msg;
db__msg_store_ref_inc(retainhier->retained);
if(persist && retainhier->retained->topic[0] != '$'){
if(persist && retainhier->retained->msg.topic[0] != '$'){
plugin_persist__handle_base_msg_add(retainhier->retained);
plugin_persist__handle_retain_msg_set(retainhier->retained);
}
@ -201,7 +201,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
uint16_t mid;
struct mosquitto__base_msg *retained;
if(branch->retained->message_expiry_time > 0 && db.now_real_s >= branch->retained->message_expiry_time){
if(branch->retained->msg.expiry_time > 0 && db.now_real_s >= branch->retained->msg.expiry_time){
plugin_persist__handle_retain_msg_delete(branch->retained);
db__msg_store_ref_dec(&branch->retained);
branch->retained = NULL;
@ -213,8 +213,8 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
retained = branch->retained;
rc = mosquitto_acl_check(context, retained->topic, retained->payloadlen, retained->payload,
retained->qos, retained->retain, MOSQ_ACL_READ);
rc = mosquitto_acl_check(context, retained->msg.topic, retained->msg.payloadlen, retained->msg.payload,
retained->msg.qos, retained->msg.retain, MOSQ_ACL_READ);
if(rc == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
}else if(rc != MOSQ_ERR_SUCCESS){
@ -222,19 +222,19 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
}
/* Check for original source access */
if(db.config->check_retain_source && retained->origin != mosq_mo_broker && retained->source_id){
if(db.config->check_retain_source && retained->origin != mosq_mo_broker && retained->msg.source_id){
struct mosquitto retain_ctxt;
memset(&retain_ctxt, 0, sizeof(struct mosquitto));
retain_ctxt.id = retained->source_id;
retain_ctxt.username = retained->source_username;
retain_ctxt.id = retained->msg.source_id;
retain_ctxt.username = retained->msg.source_username;
retain_ctxt.listener = retained->source_listener;
rc = acl__find_acls(&retain_ctxt);
if(rc) return rc;
rc = mosquitto_acl_check(&retain_ctxt, retained->topic, retained->payloadlen, retained->payload,
retained->qos, retained->retain, MOSQ_ACL_WRITE);
rc = mosquitto_acl_check(&retain_ctxt, retained->msg.topic, retained->msg.payloadlen, retained->msg.payload,
retained->msg.qos, retained->msg.retain, MOSQ_ACL_WRITE);
if(rc == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
}else if(rc != MOSQ_ERR_SUCCESS){
@ -245,7 +245,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt
if (db.config->upgrade_outgoing_qos){
qos = sub_qos;
} else {
qos = retained->qos;
qos = retained->msg.qos;
if(qos > sub_qos) qos = sub_qos;
}
if(qos > 0){

@ -68,7 +68,7 @@ static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_
int rc2;
/* Check for ACL topic access. */
rc2 = mosquitto_acl_check(leaf->context, topic, stored->payloadlen, stored->payload, stored->qos, stored->retain, MOSQ_ACL_READ);
rc2 = mosquitto_acl_check(leaf->context, topic, stored->msg.payloadlen, stored->msg.payload, stored->msg.qos, stored->msg.retain, MOSQ_ACL_READ);
if(rc2 == MOSQ_ERR_ACL_DENIED){
return MOSQ_ERR_SUCCESS;
}else if(rc2 == MOSQ_ERR_SUCCESS){

@ -30,17 +30,17 @@ void db__msg_store_free(struct mosquitto__base_msg *store)
{
int i;
mosquitto__free(store->source_id);
mosquitto__free(store->source_username);
mosquitto__free(store->msg.source_id);
mosquitto__free(store->msg.source_username);
if(store->dest_ids){
for(i=0; i<store->dest_id_count; i++){
mosquitto__free(store->dest_ids[i]);
}
mosquitto__free(store->dest_ids);
}
mosquitto__free(store->topic);
mosquitto_property_free_all(&store->properties);
mosquitto__free(store->payload);
mosquitto__free(store->msg.topic);
mosquitto_property_free_all(&store->msg.properties);
mosquitto__free(store->msg.payload);
mosquitto__free(store);
}
@ -51,18 +51,18 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
UNUSED(origin);
if(source && source->id){
stored->source_id = mosquitto__strdup(source->id);
stored->msg.source_id = mosquitto__strdup(source->id);
}else{
stored->source_id = mosquitto__strdup("");
stored->msg.source_id = mosquitto__strdup("");
}
if(!stored->source_id){
if(!stored->msg.source_id){
rc = MOSQ_ERR_NOMEM;
goto error;
}
if(source && source->username){
stored->source_username = mosquitto__strdup(source->username);
if(!stored->source_username){
stored->msg.source_username = mosquitto__strdup(source->username);
if(!stored->msg.source_username){
rc = MOSQ_ERR_NOMEM;
goto error;
}
@ -71,23 +71,23 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg
stored->source_listener = source->listener;
}
if(message_expiry_interval > 0){
stored->message_expiry_time = time(NULL) + message_expiry_interval;
stored->msg.expiry_time = time(NULL) + message_expiry_interval;
}else{
stored->message_expiry_time = 0;
stored->msg.expiry_time = 0;
}
stored->dest_ids = NULL;
stored->dest_id_count = 0;
db.msg_store_count++;
db.msg_store_bytes += stored->payloadlen;
db.msg_store_bytes += stored->msg.payloadlen;
if(!store_id){
stored->db_id = ++db.last_db_id;
stored->msg.store_id = ++db.last_db_id;
}else{
stored->db_id = store_id;
stored->msg.store_id = store_id;
}
HASH_ADD(hh, db.msg_store, db_id, sizeof(stored->db_id), stored);
HASH_ADD(hh, db.msg_store, msg.store_id, sizeof(stored->msg.store_id), stored);
return MOSQ_ERR_SUCCESS;
error:

@ -213,18 +213,18 @@ static void TEST_v3_message_store(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->qos, 2);
CU_ASSERT_EQUAL(db.msg_store->retain, 1);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->topic);
if(db.msg_store->topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic);
if(db.msg_store->msg.topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
}
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
}
}
@ -286,17 +286,17 @@ static void TEST_v3_client_message(void)
CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg);
if(context->msgs_out.inflight->base_msg){
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->retain, 1);
CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg->topic);
if(context->msgs_out.inflight->base_msg->topic){
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->topic, "topic");
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1);
CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg->msg.topic);
if(context->msgs_out.inflight->base_msg->msg.topic){
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic");
}
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->payloadlen, 7);
if(context->msgs_out.inflight->base_msg->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->payload, "payload", 7);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7);
if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7);
}
}
CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73);
@ -331,18 +331,18 @@ static void TEST_v3_retain(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 0x54);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->qos, 2);
CU_ASSERT_EQUAL(db.msg_store->retain, 1);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->topic);
if(db.msg_store->topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0x54);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic);
if(db.msg_store->msg.topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
}
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
}
CU_ASSERT_PTR_NOT_NULL(db.retains);
@ -413,18 +413,18 @@ static void TEST_v4_message_store(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 0xFEDCBA9876543210);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 0x88);
CU_ASSERT_EQUAL(db.msg_store->qos, 1);
CU_ASSERT_EQUAL(db.msg_store->retain, 0);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->topic);
if(db.msg_store->topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0xFEDCBA9876543210);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 0x88);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 1);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 0);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic);
if(db.msg_store->msg.topic){
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
}
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
}
}
@ -509,17 +509,17 @@ static void TEST_v6_message_store(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->qos, 2);
CU_ASSERT_EQUAL(db.msg_store->retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
CU_ASSERT_PTR_NULL(db.msg_store->properties);
CU_ASSERT_PTR_NULL(db.msg_store->msg.properties);
}
}
@ -550,20 +550,20 @@ static void TEST_v6_message_store_props(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->qos, 2);
CU_ASSERT_EQUAL(db.msg_store->retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
CU_ASSERT_PTR_NOT_NULL(db.msg_store->properties);
if(db.msg_store->properties){
CU_ASSERT_EQUAL(db.msg_store->properties->identifier, 1);
CU_ASSERT_EQUAL(db.msg_store->properties->value.i8, 1);
CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.properties);
if(db.msg_store->msg.properties){
CU_ASSERT_EQUAL(db.msg_store->msg.properties->identifier, 1);
CU_ASSERT_EQUAL(db.msg_store->msg.properties->value.i8, 1);
}
CU_ASSERT_PTR_NOT_NULL(db.msg_store->source_listener);
}
@ -664,14 +664,14 @@ static void TEST_v6_client_message(void)
CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg);
if(context->msgs_out.inflight->base_msg){
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->retain, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->topic, "topic");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->payloadlen, 7);
if(context->msgs_out.inflight->base_msg->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->payload, "payload", 7);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7);
if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7);
}
}
CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73);
@ -713,14 +713,14 @@ static void TEST_v6_client_message_props(void)
CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg);
if(context->msgs_out.inflight->base_msg){
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->retain, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->topic, "topic");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->payloadlen, 7);
if(context->msgs_out.inflight->base_msg->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->payload, "payload", 7);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2);
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1);
CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic");
CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7);
if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7);
}
}
CU_ASSERT_EQUAL(context->msgs_out.inflight->mid, 0x73);
@ -755,15 +755,15 @@ static void TEST_v6_retain(void)
CU_ASSERT_EQUAL(db.msg_store_bytes, 7);
CU_ASSERT_PTR_NOT_NULL(db.msg_store);
if(db.msg_store){
CU_ASSERT_EQUAL(db.msg_store->db_id, 0x54);
CU_ASSERT_STRING_EQUAL(db.msg_store->source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->qos, 2);
CU_ASSERT_EQUAL(db.msg_store->retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->payloadlen, 7);
if(db.msg_store->payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->payload, "payload", 7);
CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0x54);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id");
CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2);
CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1);
CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic");
CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7);
if(db.msg_store->msg.payloadlen == 7){
CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7);
}
}
CU_ASSERT_PTR_NOT_NULL(db.retains);

Loading…
Cancel
Save