diff --git a/apps/db_dump/db_dump.c b/apps/db_dump/db_dump.c index b5905f55..08871ee2 100644 --- a/apps/db_dump/db_dump.c +++ b/apps/db_dump/db_dump.c @@ -267,14 +267,14 @@ static int dump__base_msg_chunk_process(FILE *db_fptr, uint32_t length) mosquitto__free(chunk.payload); return MOSQ_ERR_NOMEM; } - stored->msg.store_id = chunk.F.store_id; - stored->msg.source_mid = chunk.F.source_mid; - stored->msg.topic = chunk.topic; - stored->msg.qos = chunk.F.qos; - stored->msg.retain = chunk.F.retain; - stored->msg.payloadlen = chunk.F.payloadlen; - stored->msg.payload = chunk.payload; - stored->msg.properties = chunk.properties; + stored->data.store_id = chunk.F.store_id; + stored->data.source_mid = chunk.F.source_mid; + stored->data.topic = chunk.topic; + stored->data.qos = chunk.F.qos; + stored->data.retain = chunk.F.retain; + stored->data.payloadlen = chunk.F.payloadlen; + stored->data.payload = chunk.payload; + stored->data.properties = chunk.properties; rc = db__message_store(&chunk.source, stored, message_expiry_interval, mosq_mo_client); @@ -286,9 +286,9 @@ static int dump__base_msg_chunk_process(FILE *db_fptr, uint32_t length) if(rc == MOSQ_ERR_SUCCESS){ stored->source_listener = chunk.source.listener; - stored->msg.store_id = chunk.F.store_id; + stored->data.store_id = chunk.F.store_id; - HASH_ADD(hh, db.msg_store, msg.store_id, sizeof(dbid_t), stored); + HASH_ADD(hh, db.msg_store, data.store_id, sizeof(dbid_t), stored); }else{ fclose(db_fptr); return rc; diff --git a/include/mosquitto_broker.h b/include/mosquitto_broker.h index 0b2ae3b8..9a73bf31 100644 --- a/include/mosquitto_broker.h +++ b/include/mosquitto_broker.h @@ -279,7 +279,7 @@ struct mosquitto_subscription { * it may change in a future minor release. */ struct mosquitto_evt_persist_subscription { void *future; - struct mosquitto_subscription sub; + struct mosquitto_subscription data; void *future2[8]; }; @@ -332,7 +332,7 @@ struct mosquitto_base_msg { * it may change in a future minor release. */ struct mosquitto_evt_persist_base_msg { void *future; - struct mosquitto_base_msg msg; + struct mosquitto_base_msg data; void *future2[8]; }; diff --git a/plugins/persist-sqlite/base_msgs.c b/plugins/persist-sqlite/base_msgs.c index 6c410a6e..cb59d4c1 100644 --- a/plugins/persist-sqlite/base_msgs.c +++ b/plugins/persist-sqlite/base_msgs.c @@ -147,31 +147,31 @@ int persist_sqlite__base_msg_add_cb(int event, void *event_data, void *userdata) UNUSED(event); rc = 0; - rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 1, (int64_t)ed->msg.store_id); - rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 2, ed->msg.expiry_time); - rc += sqlite3_bind_text(ms->base_msg_add_stmt, 3, ed->msg.topic, (int)strlen(ed->msg.topic), SQLITE_STATIC); - if(ed->msg.payload){ - rc += sqlite3_bind_blob(ms->base_msg_add_stmt, 4, ed->msg.payload, (int)ed->msg.payloadlen, SQLITE_STATIC); + rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 1, (int64_t)ed->data.store_id); + rc += sqlite3_bind_int64(ms->base_msg_add_stmt, 2, ed->data.expiry_time); + rc += sqlite3_bind_text(ms->base_msg_add_stmt, 3, ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC); + if(ed->data.payload){ + rc += sqlite3_bind_blob(ms->base_msg_add_stmt, 4, ed->data.payload, (int)ed->data.payloadlen, SQLITE_STATIC); }else{ rc += sqlite3_bind_null(ms->base_msg_add_stmt, 4); } - if(ed->msg.source_id){ - rc += sqlite3_bind_text(ms->base_msg_add_stmt, 5, ed->msg.source_id, (int)strlen(ed->msg.source_id), SQLITE_STATIC); + if(ed->data.source_id){ + rc += sqlite3_bind_text(ms->base_msg_add_stmt, 5, ed->data.source_id, (int)strlen(ed->data.source_id), SQLITE_STATIC); }else{ rc += sqlite3_bind_null(ms->base_msg_add_stmt, 5); } - if(ed->msg.source_username){ - rc += sqlite3_bind_text(ms->base_msg_add_stmt, 6, ed->msg.source_username, (int)strlen(ed->msg.source_username), SQLITE_STATIC); + if(ed->data.source_username){ + rc += sqlite3_bind_text(ms->base_msg_add_stmt, 6, ed->data.source_username, (int)strlen(ed->data.source_username), SQLITE_STATIC); }else{ rc += sqlite3_bind_null(ms->base_msg_add_stmt, 6); } - rc += sqlite3_bind_int(ms->base_msg_add_stmt, 7, (int)ed->msg.payloadlen); - rc += sqlite3_bind_int(ms->base_msg_add_stmt, 8, ed->msg.source_mid); - rc += sqlite3_bind_int(ms->base_msg_add_stmt, 9, ed->msg.source_port); - rc += sqlite3_bind_int(ms->base_msg_add_stmt, 10, ed->msg.qos); - rc += sqlite3_bind_int(ms->base_msg_add_stmt, 11, ed->msg.retain); - if(ed->msg.properties){ - str = properties_to_json(ed->msg.properties); + rc += sqlite3_bind_int(ms->base_msg_add_stmt, 7, (int)ed->data.payloadlen); + rc += sqlite3_bind_int(ms->base_msg_add_stmt, 8, ed->data.source_mid); + rc += sqlite3_bind_int(ms->base_msg_add_stmt, 9, ed->data.source_port); + rc += sqlite3_bind_int(ms->base_msg_add_stmt, 10, ed->data.qos); + rc += sqlite3_bind_int(ms->base_msg_add_stmt, 11, ed->data.retain); + if(ed->data.properties){ + str = properties_to_json(ed->data.properties); } if(str){ rc += sqlite3_bind_text(ms->base_msg_add_stmt, 12, str, (int)strlen(str), SQLITE_STATIC); @@ -202,7 +202,7 @@ int persist_sqlite__base_msg_remove_cb(int event, void *event_data, void *userda UNUSED(event); - if(sqlite3_bind_int64(ms->base_msg_remove_stmt, 1, (int64_t)ed->msg.store_id) == SQLITE_OK){ + if(sqlite3_bind_int64(ms->base_msg_remove_stmt, 1, (int64_t)ed->data.store_id) == SQLITE_OK){ ms->event_count++; rc = sqlite3_step(ms->base_msg_remove_stmt); if(rc == SQLITE_DONE){ @@ -224,19 +224,19 @@ int persist_sqlite__base_msg_load_cb(int event, void *event_data, void *userdata UNUSED(event); - if(sqlite3_bind_int64(ms->base_msg_load_stmt, 1, (int64_t)ed->msg.store_id) == SQLITE_OK){ + if(sqlite3_bind_int64(ms->base_msg_load_stmt, 1, (int64_t)ed->data.store_id) == SQLITE_OK){ if(sqlite3_step(ms->base_msg_load_stmt) == SQLITE_ROW){ - ed->msg.expiry_time = (time_t)sqlite3_column_int64(ms->base_msg_load_stmt, 1); - ed->msg.topic = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 2); - ed->msg.payload = (void *)sqlite3_column_blob(ms->base_msg_load_stmt, 3); - ed->msg.source_id = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 4); - ed->msg.source_username = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 5); - ed->msg.payloadlen = (uint32_t)sqlite3_column_int(ms->base_msg_load_stmt, 6); - ed->msg.source_mid = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 7); - ed->msg.source_port = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 8); - ed->msg.qos = (uint8_t)sqlite3_column_int(ms->base_msg_load_stmt, 9); - ed->msg.retain = sqlite3_column_int(ms->base_msg_load_stmt, 10); - mosquitto_persist_base_msg_add(&ed->msg); + ed->data.expiry_time = (time_t)sqlite3_column_int64(ms->base_msg_load_stmt, 1); + ed->data.topic = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 2); + ed->data.payload = (void *)sqlite3_column_blob(ms->base_msg_load_stmt, 3); + ed->data.source_id = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 4); + ed->data.source_username = (char *)sqlite3_column_text(ms->base_msg_load_stmt, 5); + ed->data.payloadlen = (uint32_t)sqlite3_column_int(ms->base_msg_load_stmt, 6); + ed->data.source_mid = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 7); + ed->data.source_port = (uint16_t)sqlite3_column_int(ms->base_msg_load_stmt, 8); + ed->data.qos = (uint8_t)sqlite3_column_int(ms->base_msg_load_stmt, 9); + ed->data.retain = sqlite3_column_int(ms->base_msg_load_stmt, 10); + mosquitto_persist_base_msg_add(&ed->data); } } sqlite3_finalize(ms->base_msg_load_stmt); diff --git a/plugins/persist-sqlite/subscriptions.c b/plugins/persist-sqlite/subscriptions.c index d4bf9058..eca71c1e 100644 --- a/plugins/persist-sqlite/subscriptions.c +++ b/plugins/persist-sqlite/subscriptions.c @@ -32,16 +32,16 @@ int persist_sqlite__subscription_add_cb(int event, void *event_data, void *userd UNUSED(event); if(sqlite3_bind_text(ms->subscription_add_stmt, 1, - ed->sub.client_id, (int)strlen(ed->sub.client_id), SQLITE_STATIC) == SQLITE_OK){ + ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_text(ms->subscription_add_stmt, 2, - ed->sub.topic, (int)strlen(ed->sub.topic), SQLITE_STATIC) == SQLITE_OK){ + ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_int(ms->subscription_add_stmt, 3, - ed->sub.options) == SQLITE_OK){ + ed->data.options) == SQLITE_OK){ if(sqlite3_bind_int(ms->subscription_add_stmt, 4, - (int)ed->sub.identifier) == SQLITE_OK){ + (int)ed->data.identifier) == SQLITE_OK){ ms->event_count++; rc = sqlite3_step(ms->subscription_add_stmt); @@ -68,10 +68,10 @@ int persist_sqlite__subscription_remove_cb(int event, void *event_data, void *us UNUSED(event); if(sqlite3_bind_text(ms->subscription_remove_stmt, 1, - ed->sub.client_id, (int)strlen(ed->sub.client_id), SQLITE_STATIC) == SQLITE_OK){ + ed->data.client_id, (int)strlen(ed->data.client_id), SQLITE_STATIC) == SQLITE_OK){ if(sqlite3_bind_text(ms->subscription_remove_stmt, 2, - ed->sub.topic, (int)strlen(ed->sub.topic), SQLITE_STATIC) == SQLITE_OK){ + ed->data.topic, (int)strlen(ed->data.topic), SQLITE_STATIC) == SQLITE_OK){ ms->event_count++; rc = sqlite3_step(ms->subscription_remove_stmt); diff --git a/src/control.c b/src/control.c index 8fcc81e0..14e79dde 100644 --- a/src/control.c +++ b/src/control.c @@ -40,22 +40,22 @@ int control__process(struct mosquitto *context, struct mosquitto__base_msg *base /* Check global plugins and non-per-listener settings first */ opts = &db.config->security_options; - HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->msg.topic, strlen(base_msg->msg.topic), cb_found); + HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->data.topic, strlen(base_msg->data.topic), cb_found); /* If not found, check for per-listener plugins. */ if(cb_found == NULL && db.config->per_listener_settings){ opts = context->listener->security_options; - HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->msg.topic, strlen(base_msg->msg.topic), cb_found); + HASH_FIND(hh, opts->plugin_callbacks.control, base_msg->data.topic, strlen(base_msg->data.topic), cb_found); } if(cb_found){ memset(&event_data, 0, sizeof(event_data)); event_data.client = context; - event_data.topic = base_msg->msg.topic; - event_data.payload = base_msg->msg.payload; - event_data.payloadlen = base_msg->msg.payloadlen; - event_data.qos = base_msg->msg.qos; - event_data.retain = base_msg->msg.retain; - event_data.properties = base_msg->msg.properties; + event_data.topic = base_msg->data.topic; + event_data.payload = base_msg->data.payload; + event_data.payloadlen = base_msg->data.payloadlen; + event_data.qos = base_msg->data.qos; + event_data.retain = base_msg->data.retain; + event_data.properties = base_msg->data.properties; event_data.reason_code = MQTT_RC_SUCCESS; event_data.reason_string = NULL; @@ -68,11 +68,11 @@ int control__process(struct mosquitto *context, struct mosquitto__base_msg *base SAFE_FREE(event_data.reason_string); } - if(base_msg->msg.qos == 1){ - rc2 = send__puback(context, base_msg->msg.source_mid, MQTT_RC_SUCCESS, properties); + if(base_msg->data.qos == 1){ + rc2 = send__puback(context, base_msg->data.source_mid, MQTT_RC_SUCCESS, properties); if(rc2) rc = rc2; - }else if(base_msg->msg.qos == 2){ - rc2 = send__pubrec(context, base_msg->msg.source_mid, MQTT_RC_SUCCESS, properties); + }else if(base_msg->data.qos == 2){ + rc2 = send__pubrec(context, base_msg->data.source_mid, MQTT_RC_SUCCESS, properties); if(rc2) rc = rc2; } mosquitto_property_free_all(&properties); diff --git a/src/database.c b/src/database.c index a260bc51..e7658378 100644 --- a/src/database.c +++ b/src/database.c @@ -141,20 +141,20 @@ bool db__ready_for_queue(struct mosquitto *context, int qos, struct mosquitto_ms void db__msg_add_to_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->inflight_count++; - msg_data->inflight_bytes += client_msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes += client_msg->base_msg->data.payloadlen; if(client_msg->data.qos != 0){ msg_data->inflight_count12++; - msg_data->inflight_bytes12 += client_msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes12 += client_msg->base_msg->data.payloadlen; } } static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->inflight_count--; - msg_data->inflight_bytes -= client_msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes -= client_msg->base_msg->data.payloadlen; if(client_msg->data.qos != 0){ msg_data->inflight_count12--; - msg_data->inflight_bytes12 -= client_msg->base_msg->msg.payloadlen; + msg_data->inflight_bytes12 -= client_msg->base_msg->data.payloadlen; } } @@ -162,20 +162,20 @@ static void db__msg_remove_from_inflight_stats(struct mosquitto_msg_data *msg_da void db__msg_add_to_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->queued_count++; - msg_data->queued_bytes += client_msg->base_msg->msg.payloadlen; + msg_data->queued_bytes += client_msg->base_msg->data.payloadlen; if(client_msg->data.qos != 0){ msg_data->queued_count12++; - msg_data->queued_bytes12 += client_msg->base_msg->msg.payloadlen; + msg_data->queued_bytes12 += client_msg->base_msg->data.payloadlen; } } static void db__msg_remove_from_queued_stats(struct mosquitto_msg_data *msg_data, struct mosquitto__client_msg *client_msg) { msg_data->queued_count--; - msg_data->queued_bytes -= client_msg->base_msg->msg.payloadlen; + msg_data->queued_bytes -= client_msg->base_msg->data.payloadlen; if(client_msg->data.qos != 0){ msg_data->queued_count12--; - msg_data->queued_bytes12 -= client_msg->base_msg->msg.payloadlen; + msg_data->queued_bytes12 -= client_msg->base_msg->data.payloadlen; } } @@ -250,10 +250,10 @@ int db__msg_store_add(struct mosquitto__base_msg *base_msg) struct mosquitto__base_msg *found; unsigned hashv; - HASH_VALUE(&base_msg->msg.store_id, sizeof(base_msg->msg.store_id), hashv); - HASH_FIND_BYHASHVALUE(hh, db.msg_store, &base_msg->msg.store_id, sizeof(base_msg->msg.store_id), hashv, found); + HASH_VALUE(&base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv); + HASH_FIND_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, found); if(found == NULL){ - HASH_ADD_KEYPTR_BYHASHVALUE(hh, db.msg_store, &base_msg->msg.store_id, sizeof(base_msg->msg.store_id), hashv, base_msg); + HASH_ADD_KEYPTR_BYHASHVALUE(hh, db.msg_store, &base_msg->data.store_id, sizeof(base_msg->data.store_id), hashv, base_msg); return MOSQ_ERR_SUCCESS; }else{ return MOSQ_ERR_ALREADY_EXISTS; @@ -265,17 +265,17 @@ void db__msg_store_free(struct mosquitto__base_msg *base_msg) { int i; - mosquitto__FREE(base_msg->msg.source_id); - mosquitto__FREE(base_msg->msg.source_username); + mosquitto__FREE(base_msg->data.source_id); + mosquitto__FREE(base_msg->data.source_username); if(base_msg->dest_ids){ for(i=0; idest_id_count; i++){ mosquitto__FREE(base_msg->dest_ids[i]); } mosquitto__FREE(base_msg->dest_ids); } - mosquitto__FREE(base_msg->msg.topic); - mosquitto_property_free_all(&base_msg->msg.properties); - mosquitto__FREE(base_msg->msg.payload); + mosquitto__FREE(base_msg->data.topic); + mosquitto_property_free_all(&base_msg->data.properties); + mosquitto__FREE(base_msg->data.payload); mosquitto__FREE(base_msg); } @@ -284,7 +284,7 @@ void db__msg_store_remove(struct mosquitto__base_msg *base_msg, bool notify) if(base_msg == NULL) return; HASH_DELETE(hh, db.msg_store, base_msg); db.msg_store_count--; - db.msg_store_bytes -= base_msg->msg.payloadlen; + db.msg_store_bytes -= base_msg->data.payloadlen; if(notify == true){ plugin_persist__handle_base_msg_delete(base_msg); } @@ -457,9 +457,9 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str msg_data = &context->msgs_in; - if(db__ready_for_flight(context, mosq_md_in, base_msg->msg.qos)){ + if(db__ready_for_flight(context, mosq_md_in, base_msg->data.qos)){ state = mosq_ms_wait_for_pubrel; - }else if(base_msg->msg.qos != 0 && db__ready_for_queue(context, base_msg->msg.qos, msg_data)){ + }else if(base_msg->data.qos != 0 && db__ready_for_queue(context, base_msg->data.qos, msg_data)){ state = mosq_ms_queued; rc = 2; }else{ @@ -495,16 +495,16 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str } client_msg->base_msg = base_msg; db__msg_store_ref_inc(client_msg->base_msg); - client_msg->data.mid = base_msg->msg.source_mid; + client_msg->data.mid = base_msg->data.source_mid; client_msg->data.direction = mosq_md_in; client_msg->data.state = state; client_msg->data.dup = false; - if(base_msg->msg.qos > context->max_qos){ + if(base_msg->data.qos > context->max_qos){ client_msg->data.qos = context->max_qos; }else{ - client_msg->data.qos = base_msg->msg.qos; + client_msg->data.qos = base_msg->data.qos; } - client_msg->data.retain = base_msg->msg.retain; + client_msg->data.retain = base_msg->data.retain; client_msg->data.subscription_identifier = 0; if(state == mosq_ms_queued){ @@ -520,7 +520,7 @@ int db__message_insert_incoming(struct mosquitto *context, uint64_t cmsg_id, str plugin_persist__handle_client_msg_add(context, client_msg); } - if(client_msg->base_msg->msg.qos > 0){ + if(client_msg->base_msg->data.qos > 0){ util__decrement_receive_quota(context); } return rc; @@ -804,28 +804,28 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_ base_msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg)); if(base_msg == NULL) return MOSQ_ERR_NOMEM; - base_msg->msg.topic = mosquitto__strdup(topic); - if(base_msg->msg.topic == NULL){ + base_msg->data.topic = mosquitto__strdup(topic); + if(base_msg->data.topic == NULL){ db__msg_store_free(base_msg); return MOSQ_ERR_INVAL; } - base_msg->msg.qos = qos; + base_msg->data.qos = qos; if(db.config->retain_available == false){ - base_msg->msg.retain = 0; + base_msg->data.retain = 0; }else{ - base_msg->msg.retain = retain; + base_msg->data.retain = retain; } - base_msg->msg.payloadlen = payloadlen; - base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1); - if(base_msg->msg.payload == NULL){ + base_msg->data.payloadlen = payloadlen; + base_msg->data.payload = mosquitto__malloc(base_msg->data.payloadlen+1); + if(base_msg->data.payload == NULL){ db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } /* Ensure payload is always zero terminated, this is the reason for the extra byte above */ - ((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0; - memcpy(base_msg->msg.payload, payload, base_msg->msg.payloadlen); + ((uint8_t *)base_msg->data.payload)[base_msg->data.payloadlen] = 0; + memcpy(base_msg->data.payload, payload, base_msg->data.payloadlen); if(context && context->id){ source_id = context->id; @@ -833,7 +833,7 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_ source_id = ""; } if(properties){ - base_msg->msg.properties = *properties; + base_msg->data.properties = *properties; *properties = NULL; } @@ -844,7 +844,7 @@ int db__messages_easy_queue(struct mosquitto *context, const char *topic, uint8_ } if(db__message_store(context, base_msg, message_expiry_interval, origin)) return 1; - return sub__messages_queue(source_id, base_msg->msg.topic, base_msg->msg.qos, base_msg->msg.retain, &base_msg); + return sub__messages_queue(source_id, base_msg->data.topic, base_msg->data.qos, base_msg->data.retain, &base_msg); } @@ -919,19 +919,19 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg assert(base_msg); if(source && source->id){ - base_msg->msg.source_id = mosquitto__strdup(source->id); + base_msg->data.source_id = mosquitto__strdup(source->id); }else{ - base_msg->msg.source_id = mosquitto__strdup(""); + base_msg->data.source_id = mosquitto__strdup(""); } - if(!base_msg->msg.source_id){ + if(!base_msg->data.source_id){ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } if(source && source->username){ - base_msg->msg.source_username = mosquitto__strdup(source->username); - if(!base_msg->msg.source_username){ + base_msg->data.source_username = mosquitto__strdup(source->username); + if(!base_msg->data.source_username){ db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } @@ -941,18 +941,18 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg } base_msg->origin = origin; if(message_expiry_interval > 0){ - base_msg->msg.expiry_time = db.now_real_s + message_expiry_interval; + base_msg->data.expiry_time = db.now_real_s + message_expiry_interval; }else{ - base_msg->msg.expiry_time = 0; + base_msg->data.expiry_time = 0; } base_msg->dest_ids = NULL; base_msg->dest_id_count = 0; db.msg_store_count++; - db.msg_store_bytes += base_msg->msg.payloadlen; + db.msg_store_bytes += base_msg->data.payloadlen; - if(!base_msg->msg.store_id){ - base_msg->msg.store_id = db__new_msg_id(); + if(!base_msg->data.store_id){ + base_msg->data.store_id = db__new_msg_id(); } rc = db__msg_store_add(base_msg); @@ -972,14 +972,14 @@ int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosqu *base_msg = NULL; DL_FOREACH(context->msgs_in.inflight, client_msg){ - if(client_msg->base_msg->msg.source_mid == mid){ + if(client_msg->base_msg->data.source_mid == mid){ *base_msg = client_msg->base_msg; return MOSQ_ERR_SUCCESS; } } DL_FOREACH(context->msgs_in.queued, client_msg){ - if(client_msg->base_msg->msg.source_mid == mid){ + if(client_msg->base_msg->data.source_mid == mid){ *base_msg = client_msg->base_msg; return MOSQ_ERR_SUCCESS; } @@ -1134,7 +1134,7 @@ int db__message_remove_incoming(struct mosquitto* context, uint16_t mid) DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ if(client_msg->data.mid == mid) { - if(client_msg->base_msg->msg.qos != 2){ + if(client_msg->base_msg->data.qos != 2){ return MOSQ_ERR_PROTOCOL; } db__message_remove_inflight(context, &context->msgs_in, client_msg); @@ -1159,12 +1159,12 @@ int db__message_release_incoming(struct mosquitto *context, uint16_t mid) DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ if(client_msg->data.mid == mid){ - if(client_msg->base_msg->msg.qos != 2){ + if(client_msg->base_msg->data.qos != 2){ return MOSQ_ERR_PROTOCOL; } - topic = client_msg->base_msg->msg.topic; + topic = client_msg->base_msg->data.topic; retain = client_msg->data.retain; - source_id = client_msg->base_msg->msg.source_id; + source_id = client_msg->base_msg->data.source_id; /* topic==NULL should be a QoS 2 message that was * denied/dropped and is being processed so the client doesn't @@ -1210,7 +1210,7 @@ void db__expire_all_messages(struct mosquitto *context) struct mosquitto__client_msg *client_msg, *tmp; DL_FOREACH_SAFE(context->msgs_out.inflight, client_msg, tmp){ - if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ if(client_msg->data.qos > 0){ util__increment_send_quota(context); } @@ -1218,12 +1218,12 @@ void db__expire_all_messages(struct mosquitto *context) } } DL_FOREACH_SAFE(context->msgs_out.queued, client_msg, tmp){ - if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ db__message_remove_queued(context, &context->msgs_out, client_msg); } } DL_FOREACH_SAFE(context->msgs_in.inflight, client_msg, tmp){ - if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ if(client_msg->data.qos > 0){ util__increment_receive_quota(context); } @@ -1231,7 +1231,7 @@ void db__expire_all_messages(struct mosquitto *context) } } DL_FOREACH_SAFE(context->msgs_in.queued, client_msg, tmp){ - if(client_msg->base_msg->msg.expiry_time && db.now_real_s > client_msg->base_msg->msg.expiry_time){ + if(client_msg->base_msg->data.expiry_time && db.now_real_s > client_msg->base_msg->data.expiry_time){ db__message_remove_queued(context, &context->msgs_in, client_msg); } } @@ -1256,8 +1256,8 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru base_msg = client_msg->base_msg; expiry_interval = 0; - if(base_msg->msg.expiry_time){ - if(db.now_real_s > base_msg->msg.expiry_time){ + if(base_msg->data.expiry_time){ + if(db.now_real_s > base_msg->data.expiry_time){ /* Message is expired, must not send. */ if(client_msg->data.direction == mosq_md_out && client_msg->data.qos > 0){ util__increment_send_quota(context); @@ -1265,18 +1265,18 @@ static int db__message_write_inflight_out_single(struct mosquitto *context, stru db__message_remove_inflight(context, &context->msgs_out, client_msg); return MOSQ_ERR_SUCCESS; }else{ - expiry_interval = (uint32_t)(base_msg->msg.expiry_time - db.now_real_s); + expiry_interval = (uint32_t)(base_msg->data.expiry_time - db.now_real_s); } } mid = client_msg->data.mid; retries = client_msg->data.dup; retain = client_msg->data.retain; - topic = base_msg->msg.topic; + topic = base_msg->data.topic; qos = (uint8_t)client_msg->data.qos; - payloadlen = base_msg->msg.payloadlen; - payload = base_msg->msg.payload; + payloadlen = base_msg->data.payloadlen; + payload = base_msg->data.payload; subscription_id = client_msg->data.subscription_identifier; - base_msg_props = base_msg->msg.properties; + base_msg_props = base_msg->data.properties; switch(client_msg->data.state){ case mosq_ms_publish_qos0: diff --git a/src/handle_connect.c b/src/handle_connect.c index 5bba3613..4d37408f 100644 --- a/src/handle_connect.c +++ b/src/handle_connect.c @@ -96,9 +96,9 @@ static void connection_check_acl(struct mosquitto *context, struct mosquitto__cl }else{ access = MOSQ_ACL_WRITE; } - if(mosquitto_acl_check(context, base_msg->msg.topic, - base_msg->msg.payloadlen, base_msg->msg.payload, - base_msg->msg.qos, base_msg->msg.retain, access) != MOSQ_ERR_SUCCESS){ + if(mosquitto_acl_check(context, base_msg->data.topic, + base_msg->data.payloadlen, base_msg->data.payload, + base_msg->data.qos, base_msg->data.retain, access) != MOSQ_ERR_SUCCESS){ DL_DELETE((*head), client_msg); db__msg_store_ref_dec(&client_msg->base_msg); diff --git a/src/handle_publish.c b/src/handle_publish.c index e02907a4..6456e4e0 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -65,33 +65,33 @@ int handle__publish(struct mosquitto *context) } dup = (header & 0x08)>>3; - base_msg->msg.qos = (header & 0x06)>>1; - if(dup == 1 && base_msg->msg.qos == 0){ + base_msg->data.qos = (header & 0x06)>>1; + if(dup == 1 && base_msg->data.qos == 0){ log__printf(NULL, MOSQ_LOG_INFO, "Invalid PUBLISH (QoS=0 and DUP=1) from %s, disconnecting.", context->id); db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; } - if(base_msg->msg.qos == 3){ + if(base_msg->data.qos == 3){ log__printf(NULL, MOSQ_LOG_INFO, "Invalid QoS in PUBLISH from %s, disconnecting.", context->id); db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; } - if(base_msg->msg.qos > context->max_qos){ + if(base_msg->data.qos > context->max_qos){ log__printf(NULL, MOSQ_LOG_INFO, "Too high QoS in PUBLISH from %s, disconnecting.", context->id); db__msg_store_free(base_msg); return MOSQ_ERR_QOS_NOT_SUPPORTED; } - base_msg->msg.retain = (header & 0x01); + base_msg->data.retain = (header & 0x01); - if(base_msg->msg.retain && db.config->retain_available == false){ + if(base_msg->data.retain && db.config->retain_available == false){ db__msg_store_free(base_msg); return MOSQ_ERR_RETAIN_NOT_SUPPORTED; } - if(packet__read_string(&context->in_packet, &base_msg->msg.topic, &slen)){ + if(packet__read_string(&context->in_packet, &base_msg->data.topic, &slen)){ db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; } @@ -101,7 +101,7 @@ int handle__publish(struct mosquitto *context) return MOSQ_ERR_MALFORMED_PACKET; } - if(base_msg->msg.qos > 0){ + if(base_msg->data.qos > 0){ if(packet__read_uint16(&context->in_packet, &mid)){ db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; @@ -112,7 +112,7 @@ int handle__publish(struct mosquitto *context) } /* It is important to have a separate copy of mid, because msg may be * freed before we want to send a PUBACK/PUBREC. */ - base_msg->msg.source_mid = mid; + base_msg->data.source_mid = mid; } /* Handle properties */ @@ -125,7 +125,7 @@ int handle__publish(struct mosquitto *context) p = properties; p_prev = NULL; - base_msg->msg.properties = NULL; + base_msg->data.properties = NULL; msg_properties_last = NULL; while(p){ switch(p->identifier){ @@ -134,11 +134,11 @@ int handle__publish(struct mosquitto *context) case MQTT_PROP_PAYLOAD_FORMAT_INDICATOR: case MQTT_PROP_RESPONSE_TOPIC: case MQTT_PROP_USER_PROPERTY: - if(base_msg->msg.properties){ + if(base_msg->data.properties){ msg_properties_last->next = p; msg_properties_last = p; }else{ - base_msg->msg.properties = p; + base_msg->data.properties = p; msg_properties_last = p; } if(p_prev){ @@ -185,14 +185,14 @@ int handle__publish(struct mosquitto *context) db__msg_store_free(base_msg); return MOSQ_ERR_TOPIC_ALIAS_INVALID; }else if(topic_alias > 0){ - if(base_msg->msg.topic){ - rc = alias__add_r2l(context, base_msg->msg.topic, (uint16_t)topic_alias); + if(base_msg->data.topic){ + rc = alias__add_r2l(context, base_msg->data.topic, (uint16_t)topic_alias); if(rc){ db__msg_store_free(base_msg); return rc; } }else{ - rc = alias__find_by_alias(context, ALIAS_DIR_R2L, (uint16_t)topic_alias, &base_msg->msg.topic); + rc = alias__find_by_alias(context, ALIAS_DIR_R2L, (uint16_t)topic_alias, &base_msg->data.topic); if(rc){ db__msg_store_free(base_msg); return MOSQ_ERR_PROTOCOL; @@ -201,62 +201,62 @@ int handle__publish(struct mosquitto *context) } #ifdef WITH_BRIDGE - rc = bridge__remap_topic_in(context, &base_msg->msg.topic); + rc = bridge__remap_topic_in(context, &base_msg->data.topic); if(rc){ db__msg_store_free(base_msg); return rc; } #endif - if(mosquitto_pub_topic_check(base_msg->msg.topic) != MOSQ_ERR_SUCCESS){ + if(mosquitto_pub_topic_check(base_msg->data.topic) != MOSQ_ERR_SUCCESS){ /* Invalid publish topic, just swallow it. */ db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; } - base_msg->msg.payloadlen = context->in_packet.remaining_length - context->in_packet.pos; - G_PUB_BYTES_RECEIVED_INC(base_msg->msg.payloadlen); + base_msg->data.payloadlen = context->in_packet.remaining_length - context->in_packet.pos; + G_PUB_BYTES_RECEIVED_INC(base_msg->data.payloadlen); if(context->listener && context->listener->mount_point){ - len = strlen(context->listener->mount_point) + strlen(base_msg->msg.topic) + 1; + len = strlen(context->listener->mount_point) + strlen(base_msg->data.topic) + 1; topic_mount = mosquitto__malloc(len+1); if(!topic_mount){ db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } - snprintf(topic_mount, len, "%s%s", context->listener->mount_point, base_msg->msg.topic); + snprintf(topic_mount, len, "%s%s", context->listener->mount_point, base_msg->data.topic); topic_mount[len] = '\0'; - mosquitto__FREE(base_msg->msg.topic); - base_msg->msg.topic = topic_mount; + mosquitto__FREE(base_msg->data.topic); + base_msg->data.topic = topic_mount; } - if(base_msg->msg.payloadlen){ - if(db.config->message_size_limit && base_msg->msg.payloadlen > db.config->message_size_limit){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, (long)base_msg->msg.payloadlen); + if(base_msg->data.payloadlen){ + if(db.config->message_size_limit && base_msg->data.payloadlen > db.config->message_size_limit){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Dropped too large PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic, (long)base_msg->data.payloadlen); reason_code = MQTT_RC_PACKET_TOO_LARGE; goto process_bad_message; } - base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1); - if(base_msg->msg.payload == NULL){ + base_msg->data.payload = mosquitto__malloc(base_msg->data.payloadlen+1); + if(base_msg->data.payload == NULL){ db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } /* Ensure payload is always zero terminated, this is the reason for the extra byte above */ - ((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0; + ((uint8_t *)base_msg->data.payload)[base_msg->data.payloadlen] = 0; - if(packet__read_bytes(&context->in_packet, base_msg->msg.payload, base_msg->msg.payloadlen)){ + if(packet__read_bytes(&context->in_packet, base_msg->data.payload, base_msg->data.payloadlen)){ db__msg_store_free(base_msg); return MOSQ_ERR_MALFORMED_PACKET; } } /* Check for topic access */ - rc = mosquitto_acl_check(context, base_msg->msg.topic, base_msg->msg.payloadlen, base_msg->msg.payload, base_msg->msg.qos, base_msg->msg.retain, MOSQ_ACL_WRITE); + rc = mosquitto_acl_check(context, base_msg->data.topic, base_msg->data.payloadlen, base_msg->data.payload, base_msg->data.qos, base_msg->data.retain, MOSQ_ACL_WRITE); if(rc == MOSQ_ERR_ACL_DENIED){ log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", - context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, - (long)base_msg->msg.payloadlen); + context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic, + (long)base_msg->data.payloadlen); reason_code = MQTT_RC_NOT_AUTHORIZED; goto process_bad_message; }else if(rc != MOSQ_ERR_SUCCESS){ @@ -264,9 +264,9 @@ int handle__publish(struct mosquitto *context) return rc; } - log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, (long)base_msg->msg.payloadlen); + log__printf(NULL, MOSQ_LOG_DEBUG, "Received PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic, (long)base_msg->data.payloadlen); - if(!strncmp(base_msg->msg.topic, "$CONTROL/", 9)){ + if(!strncmp(base_msg->data.topic, "$CONTROL/", 9)){ #ifdef WITH_CONTROL rc = control__process(context, base_msg); db__msg_store_free(base_msg); @@ -282,8 +282,8 @@ int handle__publish(struct mosquitto *context) if(rc == MOSQ_ERR_ACL_DENIED){ log__printf(NULL, MOSQ_LOG_DEBUG, "Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))", - context->id, dup, base_msg->msg.qos, base_msg->msg.retain, base_msg->msg.source_mid, base_msg->msg.topic, - (long)base_msg->msg.payloadlen); + context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic, + (long)base_msg->data.payloadlen); reason_code = MQTT_RC_NOT_AUTHORIZED; goto process_bad_message; @@ -299,31 +299,31 @@ int handle__publish(struct mosquitto *context) } } - if(base_msg->msg.qos > 0){ - db__message_store_find(context, base_msg->msg.source_mid, &stored); + if(base_msg->data.qos > 0){ + db__message_store_find(context, base_msg->data.source_mid, &stored); } - if(stored && base_msg->msg.source_mid != 0 && - (stored->msg.qos != base_msg->msg.qos - || stored->msg.payloadlen != base_msg->msg.payloadlen - || strcmp(stored->msg.topic, base_msg->msg.topic) - || memcmp(stored->msg.payload, base_msg->msg.payload, base_msg->msg.payloadlen) )){ + if(stored && base_msg->data.source_mid != 0 && + (stored->data.qos != base_msg->data.qos + || stored->data.payloadlen != base_msg->data.payloadlen + || strcmp(stored->data.topic, base_msg->data.topic) + || memcmp(stored->data.payload, base_msg->data.payload, base_msg->data.payloadlen) )){ - log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", base_msg->msg.source_mid, context->id); - db__message_remove_incoming(context, base_msg->msg.source_mid); + log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", base_msg->data.source_mid, context->id); + db__message_remove_incoming(context, base_msg->data.source_mid); stored = NULL; } if(!stored){ - if(base_msg->msg.qos > 0 && context->msgs_in.inflight_quota == 0){ + if(base_msg->data.qos > 0 && context->msgs_in.inflight_quota == 0){ /* Client isn't allowed any more incoming messages, so fail early */ db__msg_store_free(base_msg); return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED; } - if(base_msg->msg.qos == 0 - || db__ready_for_flight(context, mosq_md_in, base_msg->msg.qos) - || db__ready_for_queue(context, base_msg->msg.qos, &context->msgs_in)){ + if(base_msg->data.qos == 0 + || db__ready_for_flight(context, mosq_md_in, base_msg->data.qos) + || db__ready_for_queue(context, base_msg->data.qos, &context->msgs_in)){ dup = 0; rc = db__message_store(context, base_msg, message_expiry_interval, mosq_mo_client); @@ -341,14 +341,14 @@ int handle__publish(struct mosquitto *context) dup = 1; } - switch(stored->msg.qos){ + switch(stored->data.qos){ case 0: - rc2 = sub__messages_queue(context->id, stored->msg.topic, stored->msg.qos, stored->msg.retain, &stored); + rc2 = sub__messages_queue(context->id, stored->data.topic, stored->data.qos, stored->data.retain, &stored); if(rc2 > 0) rc = 1; break; case 1: util__decrement_receive_quota(context); - rc2 = sub__messages_queue(context->id, stored->msg.topic, stored->msg.qos, stored->msg.retain, &stored); + rc2 = sub__messages_queue(context->id, stored->data.topic, stored->data.qos, stored->data.retain, &stored); /* stored may now be free, so don't refer to it */ if(rc2 == MOSQ_ERR_SUCCESS || context->protocol != mosq_p_mqtt5){ if(send__puback(context, mid, 0, NULL)) rc = 1; @@ -368,7 +368,7 @@ int handle__publish(struct mosquitto *context) * due to queue. This isn't an error so don't disconnect them. */ /* FIXME - this is no longer necessary due to failing early above */ if(!res){ - if(send__pubrec(context, stored->msg.source_mid, 0, NULL)) rc = 1; + if(send__pubrec(context, stored->data.source_mid, 0, NULL)) rc = 1; }else if(res == 1){ rc = 1; } @@ -380,15 +380,15 @@ int handle__publish(struct mosquitto *context) process_bad_message: rc = 1; if(base_msg){ - switch(base_msg->msg.qos){ + switch(base_msg->data.qos){ case 0: rc = MOSQ_ERR_SUCCESS; break; case 1: - rc = send__puback(context, base_msg->msg.source_mid, reason_code, NULL); + rc = send__puback(context, base_msg->data.source_mid, reason_code, NULL); break; case 2: - rc = send__pubrec(context, base_msg->msg.source_mid, reason_code, NULL); + rc = send__pubrec(context, base_msg->data.source_mid, reason_code, NULL); break; } db__msg_store_free(base_msg); diff --git a/src/loop.c b/src/loop.c index 4777b498..b89ae6cf 100644 --- a/src/loop.c +++ b/src/loop.c @@ -73,21 +73,21 @@ static int single_publish(struct mosquitto *context, struct mosquitto__message_v base_msg = mosquitto__calloc(1, sizeof(struct mosquitto__base_msg)); if(base_msg == NULL) return MOSQ_ERR_NOMEM; - base_msg->msg.topic = pub_msg->topic; + base_msg->data.topic = pub_msg->topic; pub_msg->topic = NULL; - base_msg->msg.retain = 0; - base_msg->msg.payloadlen = (uint32_t)pub_msg->payloadlen; - base_msg->msg.payload = mosquitto__malloc(base_msg->msg.payloadlen+1); - if(base_msg->msg.payload == NULL){ + base_msg->data.retain = 0; + base_msg->data.payloadlen = (uint32_t)pub_msg->payloadlen; + base_msg->data.payload = mosquitto__malloc(base_msg->data.payloadlen+1); + if(base_msg->data.payload == NULL){ db__msg_store_free(base_msg); return MOSQ_ERR_NOMEM; } /* Ensure payload is always zero terminated, this is the reason for the extra byte above */ - ((uint8_t *)base_msg->msg.payload)[base_msg->msg.payloadlen] = 0; - memcpy(base_msg->msg.payload, pub_msg->payload, base_msg->msg.payloadlen); + ((uint8_t *)base_msg->data.payload)[base_msg->data.payloadlen] = 0; + memcpy(base_msg->data.payload, pub_msg->payload, base_msg->data.payloadlen); if(pub_msg->properties){ - base_msg->msg.properties = pub_msg->properties; + base_msg->data.properties = pub_msg->properties; pub_msg->properties = NULL; } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 2529aebd..82332290 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -410,7 +410,7 @@ struct mosquitto__retainhier { struct mosquitto__base_msg{ UT_hash_handle hh; - struct mosquitto_base_msg msg; + struct mosquitto_base_msg data; struct mosquitto__listener *source_listener; char **dest_ids; int dest_id_count; diff --git a/src/persist_read.c b/src/persist_read.c index 7bc5d6ce..ed5bf924 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -296,14 +296,14 @@ static int persist__base_msg_chunk_restore(FILE *db_fptr, uint32_t length) goto cleanup; } - base_msg->msg.store_id = chunk.F.store_id; - base_msg->msg.source_mid = chunk.F.source_mid; - base_msg->msg.topic = chunk.topic; - base_msg->msg.qos = chunk.F.qos; - base_msg->msg.payloadlen = chunk.F.payloadlen; - base_msg->msg.retain = chunk.F.retain; - base_msg->msg.properties = chunk.properties; - base_msg->msg.payload = chunk.payload; + base_msg->data.store_id = chunk.F.store_id; + base_msg->data.source_mid = chunk.F.source_mid; + base_msg->data.topic = chunk.topic; + base_msg->data.qos = chunk.F.qos; + base_msg->data.payloadlen = chunk.F.payloadlen; + base_msg->data.retain = chunk.F.retain; + base_msg->data.properties = chunk.properties; + base_msg->data.payload = chunk.payload; base_msg->source_listener = chunk.source.listener; rc = db__message_store(&chunk.source, base_msg, message_expiry_interval, @@ -347,8 +347,8 @@ static int persist__retain_chunk_restore(FILE *db_fptr) HASH_FIND(hh, db.msg_store, &chunk.F.store_id, sizeof(chunk.F.store_id), base_msg); if(base_msg){ - if(sub__topic_tokenise(base_msg->msg.topic, &local_topic, &split_topics, NULL)) return 1; - retain__store(base_msg->msg.topic, base_msg, split_topics, true); + if(sub__topic_tokenise(base_msg->data.topic, &local_topic, &split_topics, NULL)) return 1; + retain__store(base_msg->data.topic, base_msg, split_topics, true); mosquitto__FREE(local_topic); mosquitto__FREE(split_topics); retained_count++; diff --git a/src/persist_write.c b/src/persist_write.c index 265b6ee8..97e86c72 100644 --- a/src/persist_write.c +++ b/src/persist_write.c @@ -51,7 +51,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex cmsg = queue; while(cmsg){ - if(!strncmp(cmsg->base_msg->msg.topic, "$SYS", 4) + if(!strncmp(cmsg->base_msg->data.topic, "$SYS", 4) && cmsg->base_msg->ref_count <= 1 && cmsg->base_msg->dest_id_count == 0){ @@ -61,7 +61,7 @@ static int persist__client_messages_save(FILE *db_fptr, struct mosquitto *contex continue; } - chunk.F.store_id = cmsg->base_msg->msg.store_id; + chunk.F.store_id = cmsg->base_msg->data.store_id; chunk.F.mid = cmsg->data.mid; chunk.F.id_len = (uint16_t)strlen(context->id); chunk.F.qos = cmsg->data.qos; @@ -95,11 +95,11 @@ static int persist__message_store_save(FILE *db_fptr) base_msg = db.msg_store; HASH_ITER(hh, db.msg_store, base_msg, base_msg_tmp){ - if(base_msg->ref_count < 1 || base_msg->msg.topic == NULL){ + if(base_msg->ref_count < 1 || base_msg->data.topic == NULL){ continue; } - if(!strncmp(base_msg->msg.topic, "$SYS", 4)){ + if(!strncmp(base_msg->data.topic, "$SYS", 4)){ if(base_msg->ref_count <= 1 && base_msg->dest_id_count == 0){ /* $SYS messages that are only retained shouldn't be persisted. */ continue; @@ -110,39 +110,39 @@ static int persist__message_store_save(FILE *db_fptr) * queue. */ chunk.F.retain = 0; }else{ - chunk.F.retain = (uint8_t)base_msg->msg.retain; + chunk.F.retain = (uint8_t)base_msg->data.retain; } - chunk.F.store_id = base_msg->msg.store_id; - chunk.F.expiry_time = base_msg->msg.expiry_time; - chunk.F.payloadlen = base_msg->msg.payloadlen; - chunk.F.source_mid = base_msg->msg.source_mid; - if(base_msg->msg.source_id){ - chunk.F.source_id_len = (uint16_t)strlen(base_msg->msg.source_id); - chunk.source.id = base_msg->msg.source_id; + chunk.F.store_id = base_msg->data.store_id; + chunk.F.expiry_time = base_msg->data.expiry_time; + chunk.F.payloadlen = base_msg->data.payloadlen; + chunk.F.source_mid = base_msg->data.source_mid; + if(base_msg->data.source_id){ + chunk.F.source_id_len = (uint16_t)strlen(base_msg->data.source_id); + chunk.source.id = base_msg->data.source_id; }else{ chunk.F.source_id_len = 0; chunk.source.id = NULL; } - if(base_msg->msg.source_username){ - chunk.F.source_username_len = (uint16_t)strlen(base_msg->msg.source_username); - chunk.source.username = base_msg->msg.source_username; + if(base_msg->data.source_username){ + chunk.F.source_username_len = (uint16_t)strlen(base_msg->data.source_username); + chunk.source.username = base_msg->data.source_username; }else{ chunk.F.source_username_len = 0; chunk.source.username = NULL; } - chunk.F.topic_len = (uint16_t)strlen(base_msg->msg.topic); - chunk.topic = base_msg->msg.topic; + chunk.F.topic_len = (uint16_t)strlen(base_msg->data.topic); + chunk.topic = base_msg->data.topic; if(base_msg->source_listener){ chunk.F.source_port = base_msg->source_listener->port; }else{ chunk.F.source_port = 0; } - chunk.F.qos = base_msg->msg.qos; - chunk.payload = base_msg->msg.payload; - chunk.properties = base_msg->msg.properties; + chunk.F.qos = base_msg->data.qos; + chunk.payload = base_msg->data.payload; + chunk.properties = base_msg->data.properties; rc = persist__chunk_message_store_write_v6(db_fptr, &chunk); if(rc){ @@ -277,9 +277,9 @@ static int persist__retain_save(FILE *db_fptr, struct mosquitto__retainhier *nod memset(&retain_chunk, 0, sizeof(struct P_retain)); - if(node->retained && strncmp(node->retained->msg.topic, "$SYS", 4)){ + if(node->retained && strncmp(node->retained->data.topic, "$SYS", 4)){ /* Don't save $SYS messages. */ - retain_chunk.F.store_id = node->retained->msg.store_id; + retain_chunk.F.store_id = node->retained->data.store_id; rc = persist__chunk_retain_write_v6(db_fptr, &retain_chunk); if(rc){ return rc; diff --git a/src/plugin_message.c b/src/plugin_message.c index 3d1f493f..21a91304 100644 --- a/src/plugin_message.c +++ b/src/plugin_message.c @@ -31,12 +31,12 @@ static int plugin__handle_message_single(struct mosquitto__security_options *opt memset(&event_data, 0, sizeof(event_data)); event_data.client = context; - event_data.topic = stored->msg.topic; - event_data.payloadlen = stored->msg.payloadlen; - event_data.payload = stored->msg.payload; - event_data.qos = stored->msg.qos; - event_data.retain = stored->msg.retain; - event_data.properties = stored->msg.properties; + event_data.topic = stored->data.topic; + event_data.payloadlen = stored->data.payloadlen; + event_data.payload = stored->data.payload; + event_data.qos = stored->data.qos; + event_data.retain = stored->data.retain; + event_data.properties = stored->data.properties; DL_FOREACH(opts->plugin_callbacks.message, cb_base){ rc = cb_base->cb(MOSQ_EVT_MESSAGE, &event_data, cb_base->userdata); @@ -45,14 +45,14 @@ static int plugin__handle_message_single(struct mosquitto__security_options *opt } } - stored->msg.topic = event_data.topic; - if(stored->msg.payload != event_data.payload){ - mosquitto__FREE(stored->msg.payload); - stored->msg.payload = event_data.payload; - stored->msg.payloadlen = event_data.payloadlen; + stored->data.topic = event_data.topic; + if(stored->data.payload != event_data.payload){ + mosquitto__FREE(stored->data.payload); + stored->data.payload = event_data.payload; + stored->data.payloadlen = event_data.payloadlen; } - stored->msg.retain = event_data.retain; - stored->msg.properties = event_data.properties; + stored->data.retain = event_data.retain; + stored->data.properties = event_data.properties; return rc; } diff --git a/src/plugin_persist.c b/src/plugin_persist.c index 0dbee3d3..01df88f8 100644 --- a/src/plugin_persist.c +++ b/src/plugin_persist.c @@ -152,10 +152,10 @@ void plugin_persist__handle_subscription_add(struct mosquitto *context, const st opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.sub.client_id = context->id; - event_data.sub.topic = sub->topic; - event_data.sub.identifier = sub->identifier; - event_data.sub.options = sub->options; + event_data.data.client_id = context->id; + event_data.data.topic = sub->topic; + event_data.data.identifier = sub->identifier; + event_data.data.options = sub->options; DL_FOREACH(opts->plugin_callbacks.persist_subscription_add, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_ADD, &event_data, cb_base->userdata); @@ -174,8 +174,8 @@ void plugin_persist__handle_subscription_delete(struct mosquitto *context, char opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.sub.client_id = context->id; - event_data.sub.topic = sub; + event_data.data.client_id = context->id; + event_data.data.topic = sub; DL_FOREACH(opts->plugin_callbacks.persist_subscription_delete, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_SUBSCRIPTION_DELETE, &event_data, cb_base->userdata); @@ -201,7 +201,7 @@ void plugin_persist__handle_client_msg_add(struct mosquitto *context, const stru event_data.data.client_id = context->id; event_data.data.cmsg_id = client_msg->data.cmsg_id; - event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.store_id = client_msg->base_msg->data.store_id; event_data.data.mid = client_msg->data.mid; event_data.data.qos = client_msg->data.qos; event_data.data.retain = client_msg->data.retain; @@ -236,7 +236,7 @@ void plugin_persist__handle_client_msg_delete(struct mosquitto *context, const s event_data.data.mid = client_msg->data.mid; event_data.data.state = (uint8_t)client_msg->data.state; event_data.data.qos = client_msg->data.qos; - event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.store_id = client_msg->base_msg->data.store_id; event_data.data.direction = (uint8_t)client_msg->data.direction; DL_FOREACH(opts->plugin_callbacks.persist_client_msg_delete, cb_base){ @@ -264,7 +264,7 @@ void plugin_persist__handle_client_msg_update(struct mosquitto *context, const s event_data.data.client_id = context->id; event_data.data.cmsg_id = client_msg->data.cmsg_id; event_data.data.mid = client_msg->data.mid; - event_data.data.store_id = client_msg->base_msg->msg.store_id; + event_data.data.store_id = client_msg->base_msg->data.store_id; event_data.data.state = (uint8_t)client_msg->data.state; event_data.data.dup = client_msg->data.dup; event_data.data.direction = (uint8_t)client_msg->data.direction; @@ -287,22 +287,22 @@ void plugin_persist__handle_base_msg_add(struct mosquitto__base_msg *base_msg) opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.msg.store_id = base_msg->msg.store_id; - event_data.msg.expiry_time = base_msg->msg.expiry_time; - event_data.msg.topic = base_msg->msg.topic; - event_data.msg.payload = base_msg->msg.payload; - event_data.msg.source_id = base_msg->msg.source_id; - event_data.msg.source_username = base_msg->msg.source_username; - event_data.msg.properties = base_msg->msg.properties; - event_data.msg.payloadlen = base_msg->msg.payloadlen; - event_data.msg.source_mid = base_msg->msg.source_mid; + event_data.data.store_id = base_msg->data.store_id; + event_data.data.expiry_time = base_msg->data.expiry_time; + event_data.data.topic = base_msg->data.topic; + event_data.data.payload = base_msg->data.payload; + event_data.data.source_id = base_msg->data.source_id; + event_data.data.source_username = base_msg->data.source_username; + event_data.data.properties = base_msg->data.properties; + event_data.data.payloadlen = base_msg->data.payloadlen; + event_data.data.source_mid = base_msg->data.source_mid; if(base_msg->source_listener){ - event_data.msg.source_port = base_msg->source_listener->port; + event_data.data.source_port = base_msg->source_listener->port; }else{ - event_data.msg.source_port = 0; + event_data.data.source_port = 0; } - event_data.msg.qos = base_msg->msg.qos; - event_data.msg.retain = base_msg->msg.retain; + event_data.data.qos = base_msg->data.qos; + event_data.data.retain = base_msg->data.retain; DL_FOREACH(opts->plugin_callbacks.persist_base_msg_add, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_BASE_MSG_ADD, &event_data, cb_base->userdata); @@ -322,7 +322,7 @@ void plugin_persist__handle_base_msg_delete(struct mosquitto__base_msg *base_msg opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.msg.store_id = base_msg->msg.store_id; + event_data.data.store_id = base_msg->data.store_id; DL_FOREACH(opts->plugin_callbacks.persist_base_msg_delete, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_BASE_MSG_DELETE, &event_data, cb_base->userdata); @@ -342,8 +342,8 @@ void plugin_persist__handle_retain_msg_set(struct mosquitto__base_msg *base_msg) opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.store_id = base_msg->msg.store_id; - event_data.topic = base_msg->msg.topic; + event_data.store_id = base_msg->data.store_id; + event_data.topic = base_msg->data.topic; DL_FOREACH(opts->plugin_callbacks.persist_retain_msg_set, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_RETAIN_MSG_SET, &event_data, cb_base->userdata); @@ -362,7 +362,7 @@ void plugin_persist__handle_retain_msg_delete(struct mosquitto__base_msg *base_m opts = &db.config->security_options; memset(&event_data, 0, sizeof(event_data)); - event_data.topic = base_msg->msg.topic; + event_data.topic = base_msg->data.topic; DL_FOREACH(opts->plugin_callbacks.persist_retain_msg_delete, cb_base){ cb_base->cb(MOSQ_EVT_PERSIST_RETAIN_MSG_DELETE, &event_data, cb_base->userdata); diff --git a/src/plugin_public.c b/src/plugin_public.c index 0cdc7bc7..5f41068e 100644 --- a/src/plugin_public.c +++ b/src/plugin_public.c @@ -779,17 +779,17 @@ BROKER_EXPORT int mosquitto_persist_base_msg_add(struct mosquitto_base_msg *msg_ if(base_msg == NULL){ goto error; } - base_msg->msg.store_id = msg_add->store_id; - base_msg->msg.payloadlen = msg_add->payloadlen; - base_msg->msg.source_mid = msg_add->source_mid; - base_msg->msg.qos = msg_add->qos; - base_msg->msg.retain = msg_add->retain; + base_msg->data.store_id = msg_add->store_id; + base_msg->data.payloadlen = msg_add->payloadlen; + base_msg->data.source_mid = msg_add->source_mid; + base_msg->data.qos = msg_add->qos; + base_msg->data.retain = msg_add->retain; - base_msg->msg.payload = msg_add->payload; + base_msg->data.payload = msg_add->payload; msg_add->payload = NULL; - base_msg->msg.topic = msg_add->topic; + base_msg->data.topic = msg_add->topic; msg_add->topic = NULL; - base_msg->msg.properties = msg_add->properties; + base_msg->data.properties = msg_add->properties; msg_add->properties = NULL; if(msg_add->source_port){ diff --git a/src/retain.c b/src/retain.c index ed6b9f80..4dc39046 100644 --- a/src/retain.c +++ b/src/retain.c @@ -165,7 +165,7 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char #endif if(retainhier->retained){ - if(persist && retainhier->retained->msg.topic[0] != '$' && base_msg->msg.payloadlen == 0){ + if(persist && retainhier->retained->data.topic[0] != '$' && base_msg->data.payloadlen == 0){ /* Only delete if another retained message isn't replacing this one */ plugin_persist__handle_retain_msg_delete(retainhier->retained); } @@ -173,15 +173,15 @@ int retain__store(const char *topic, struct mosquitto__base_msg *base_msg, char #ifdef WITH_SYS_TREE db.retained_count--; #endif - if(base_msg->msg.payloadlen == 0){ + if(base_msg->data.payloadlen == 0){ retainhier->retained = NULL; retain__clean_empty_hierarchy(retainhier); } } - if(base_msg->msg.payloadlen){ + if(base_msg->data.payloadlen){ retainhier->retained = base_msg; db__msg_store_ref_inc(retainhier->retained); - if(persist && retainhier->retained->msg.topic[0] != '$'){ + if(persist && retainhier->retained->data.topic[0] != '$'){ plugin_persist__handle_base_msg_add(retainhier->retained); plugin_persist__handle_retain_msg_set(retainhier->retained); } @@ -201,7 +201,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt uint16_t mid; struct mosquitto__base_msg *retained; - if(branch->retained->msg.expiry_time > 0 && db.now_real_s >= branch->retained->msg.expiry_time){ + if(branch->retained->data.expiry_time > 0 && db.now_real_s >= branch->retained->data.expiry_time){ plugin_persist__handle_retain_msg_delete(branch->retained); db__msg_store_ref_dec(&branch->retained); branch->retained = NULL; @@ -213,8 +213,8 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt retained = branch->retained; - rc = mosquitto_acl_check(context, retained->msg.topic, retained->msg.payloadlen, retained->msg.payload, - retained->msg.qos, retained->msg.retain, MOSQ_ACL_READ); + rc = mosquitto_acl_check(context, retained->data.topic, retained->data.payloadlen, retained->data.payload, + retained->data.qos, retained->data.retain, MOSQ_ACL_READ); if(rc == MOSQ_ERR_ACL_DENIED){ return MOSQ_ERR_SUCCESS; }else if(rc != MOSQ_ERR_SUCCESS){ @@ -222,19 +222,19 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt } /* Check for original source access */ - if(db.config->check_retain_source && retained->origin != mosq_mo_broker && retained->msg.source_id){ + if(db.config->check_retain_source && retained->origin != mosq_mo_broker && retained->data.source_id){ struct mosquitto retain_ctxt; memset(&retain_ctxt, 0, sizeof(struct mosquitto)); - retain_ctxt.id = retained->msg.source_id; - retain_ctxt.username = retained->msg.source_username; + retain_ctxt.id = retained->data.source_id; + retain_ctxt.username = retained->data.source_username; retain_ctxt.listener = retained->source_listener; rc = acl__find_acls(&retain_ctxt); if(rc) return rc; - rc = mosquitto_acl_check(&retain_ctxt, retained->msg.topic, retained->msg.payloadlen, retained->msg.payload, - retained->msg.qos, retained->msg.retain, MOSQ_ACL_WRITE); + rc = mosquitto_acl_check(&retain_ctxt, retained->data.topic, retained->data.payloadlen, retained->data.payload, + retained->data.qos, retained->data.retain, MOSQ_ACL_WRITE); if(rc == MOSQ_ERR_ACL_DENIED){ return MOSQ_ERR_SUCCESS; }else if(rc != MOSQ_ERR_SUCCESS){ @@ -246,7 +246,7 @@ static int retain__process(struct mosquitto__retainhier *branch, struct mosquitt if (db.config->upgrade_outgoing_qos){ qos = sub_qos; } else { - qos = retained->msg.qos; + qos = retained->data.qos; if(qos > sub_qos) qos = sub_qos; } if(qos > 0){ diff --git a/src/subs.c b/src/subs.c index d7183e8e..69dc5f56 100644 --- a/src/subs.c +++ b/src/subs.c @@ -68,7 +68,7 @@ static int subs__send(struct mosquitto__subleaf *leaf, const char *topic, uint8_ int rc2; /* Check for ACL topic access. */ - rc2 = mosquitto_acl_check(leaf->context, topic, stored->msg.payloadlen, stored->msg.payload, stored->msg.qos, stored->msg.retain, MOSQ_ACL_READ); + rc2 = mosquitto_acl_check(leaf->context, topic, stored->data.payloadlen, stored->data.payload, stored->data.qos, stored->data.retain, MOSQ_ACL_READ); if(rc2 == MOSQ_ERR_ACL_DENIED){ return MOSQ_ERR_SUCCESS; }else if(rc2 == MOSQ_ERR_SUCCESS){ diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index a2a4af6f..a9e783d2 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -30,17 +30,17 @@ void db__msg_store_free(struct mosquitto__base_msg *store) { int i; - mosquitto__free(store->msg.source_id); - mosquitto__free(store->msg.source_username); + mosquitto__free(store->data.source_id); + mosquitto__free(store->data.source_username); if(store->dest_ids){ for(i=0; idest_id_count; i++){ mosquitto__free(store->dest_ids[i]); } mosquitto__free(store->dest_ids); } - mosquitto__free(store->msg.topic); - mosquitto_property_free_all(&store->msg.properties); - mosquitto__free(store->msg.payload); + mosquitto__free(store->data.topic); + mosquitto_property_free_all(&store->data.properties); + mosquitto__free(store->data.payload); mosquitto__free(store); } @@ -51,18 +51,18 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg UNUSED(origin); if(source && source->id){ - stored->msg.source_id = mosquitto__strdup(source->id); + stored->data.source_id = mosquitto__strdup(source->id); }else{ - stored->msg.source_id = mosquitto__strdup(""); + stored->data.source_id = mosquitto__strdup(""); } - if(!stored->msg.source_id){ + if(!stored->data.source_id){ rc = MOSQ_ERR_NOMEM; goto error; } if(source && source->username){ - stored->msg.source_username = mosquitto__strdup(source->username); - if(!stored->msg.source_username){ + stored->data.source_username = mosquitto__strdup(source->username); + if(!stored->data.source_username){ rc = MOSQ_ERR_NOMEM; goto error; } @@ -71,21 +71,21 @@ int db__message_store(const struct mosquitto *source, struct mosquitto__base_msg stored->source_listener = source->listener; } if(message_expiry_interval > 0){ - stored->msg.expiry_time = time(NULL) + message_expiry_interval; + stored->data.expiry_time = time(NULL) + message_expiry_interval; }else{ - stored->msg.expiry_time = 0; + stored->data.expiry_time = 0; } stored->dest_ids = NULL; stored->dest_id_count = 0; db.msg_store_count++; - db.msg_store_bytes += stored->msg.payloadlen; + db.msg_store_bytes += stored->data.payloadlen; - if(!stored->msg.store_id){ - stored->msg.store_id = ++db.last_db_id; + if(!stored->data.store_id){ + stored->data.store_id = ++db.last_db_id; } - HASH_ADD(hh, db.msg_store, msg.store_id, sizeof(stored->msg.store_id), stored); + HASH_ADD(hh, db.msg_store, data.store_id, sizeof(stored->data.store_id), stored); return MOSQ_ERR_SUCCESS; error: diff --git a/test/unit/persist_read_test.c b/test/unit/persist_read_test.c index c9c544eb..80fd9d61 100644 --- a/test/unit/persist_read_test.c +++ b/test/unit/persist_read_test.c @@ -213,18 +213,18 @@ static void TEST_v3_message_store(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1); - CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic); - if(db.msg_store->msg.topic){ - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 2); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 2); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 1); + CU_ASSERT_PTR_NOT_NULL(db.msg_store->data.topic); + if(db.msg_store->data.topic){ + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); } - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } } } @@ -286,17 +286,17 @@ static void TEST_v3_client_message(void) CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg); if(context->msgs_out.inflight->base_msg){ CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1); - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1); - CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg->msg.topic); - if(context->msgs_out.inflight->base_msg->msg.topic){ - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic"); + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.source_id, "source_id"); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.source_mid, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.qos, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.retain, 1); + CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg->data.topic); + if(context->msgs_out.inflight->base_msg->data.topic){ + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.topic, "topic"); } - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7); - if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.payloadlen, 7); + if(context->msgs_out.inflight->base_msg->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->data.payload, "payload", 7); } } CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); @@ -331,18 +331,18 @@ static void TEST_v3_retain(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0x54); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1); - CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic); - if(db.msg_store->msg.topic){ - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 0x54); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 2); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 2); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 1); + CU_ASSERT_PTR_NOT_NULL(db.msg_store->data.topic); + if(db.msg_store->data.topic){ + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); } - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } } CU_ASSERT_PTR_NOT_NULL(db.retains); @@ -413,18 +413,18 @@ static void TEST_v4_message_store(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0xFEDCBA9876543210); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 0x88); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 1); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 0); - CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.topic); - if(db.msg_store->msg.topic){ - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 0xFEDCBA9876543210); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 0x88); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 1); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 0); + CU_ASSERT_PTR_NOT_NULL(db.msg_store->data.topic); + if(db.msg_store->data.topic){ + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); } - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } } } @@ -509,17 +509,17 @@ static void TEST_v6_message_store(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 2); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 2); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } - CU_ASSERT_PTR_NULL(db.msg_store->msg.properties); + CU_ASSERT_PTR_NULL(db.msg_store->data.properties); } } @@ -550,20 +550,20 @@ static void TEST_v6_message_store_props(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 2); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 2); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } - CU_ASSERT_PTR_NOT_NULL(db.msg_store->msg.properties); - if(db.msg_store->msg.properties){ - CU_ASSERT_EQUAL(db.msg_store->msg.properties->identifier, 1); - CU_ASSERT_EQUAL(db.msg_store->msg.properties->value.i8, 1); + CU_ASSERT_PTR_NOT_NULL(db.msg_store->data.properties); + if(db.msg_store->data.properties){ + CU_ASSERT_EQUAL(db.msg_store->data.properties->identifier, 1); + CU_ASSERT_EQUAL(db.msg_store->data.properties->value.i8, 1); } CU_ASSERT_PTR_NOT_NULL(db.msg_store->source_listener); } @@ -664,14 +664,14 @@ static void TEST_v6_client_message(void) CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg); if(context->msgs_out.inflight->base_msg){ CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1); - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1); - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic"); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7); - if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.source_id, "source_id"); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.source_mid, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.qos, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.retain, 1); + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.topic, "topic"); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.payloadlen, 7); + if(context->msgs_out.inflight->base_msg->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->data.payload, "payload", 7); } } CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); @@ -713,14 +713,14 @@ static void TEST_v6_client_message_props(void) CU_ASSERT_PTR_NOT_NULL(context->msgs_out.inflight->base_msg); if(context->msgs_out.inflight->base_msg){ CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->ref_count, 1); - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.source_mid, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.qos, 2); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.retain, 1); - CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->msg.topic, "topic"); - CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->msg.payloadlen, 7); - if(context->msgs_out.inflight->base_msg->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->msg.payload, "payload", 7); + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.source_id, "source_id"); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.source_mid, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.qos, 2); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.retain, 1); + CU_ASSERT_STRING_EQUAL(context->msgs_out.inflight->base_msg->data.topic, "topic"); + CU_ASSERT_EQUAL(context->msgs_out.inflight->base_msg->data.payloadlen, 7); + if(context->msgs_out.inflight->base_msg->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(context->msgs_out.inflight->base_msg->data.payload, "payload", 7); } } CU_ASSERT_EQUAL(context->msgs_out.inflight->data.mid, 0x73); @@ -755,15 +755,15 @@ static void TEST_v6_retain(void) CU_ASSERT_EQUAL(db.msg_store_bytes, 7); CU_ASSERT_PTR_NOT_NULL(db.msg_store); if(db.msg_store){ - CU_ASSERT_EQUAL(db.msg_store->msg.store_id, 0x54); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.source_id, "source_id"); - CU_ASSERT_EQUAL(db.msg_store->msg.source_mid, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.qos, 2); - CU_ASSERT_EQUAL(db.msg_store->msg.retain, 1); - CU_ASSERT_STRING_EQUAL(db.msg_store->msg.topic, "topic"); - CU_ASSERT_EQUAL(db.msg_store->msg.payloadlen, 7); - if(db.msg_store->msg.payloadlen == 7){ - CU_ASSERT_NSTRING_EQUAL(db.msg_store->msg.payload, "payload", 7); + CU_ASSERT_EQUAL(db.msg_store->data.store_id, 0x54); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.source_id, "source_id"); + CU_ASSERT_EQUAL(db.msg_store->data.source_mid, 2); + CU_ASSERT_EQUAL(db.msg_store->data.qos, 2); + CU_ASSERT_EQUAL(db.msg_store->data.retain, 1); + CU_ASSERT_STRING_EQUAL(db.msg_store->data.topic, "topic"); + CU_ASSERT_EQUAL(db.msg_store->data.payloadlen, 7); + if(db.msg_store->data.payloadlen == 7){ + CU_ASSERT_NSTRING_EQUAL(db.msg_store->data.payload, "payload", 7); } } CU_ASSERT_PTR_NOT_NULL(db.retains);