diff --git a/src/database.c b/src/database.c index 77eb4520..0faf998e 100644 --- a/src/database.c +++ b/src/database.c @@ -893,6 +893,31 @@ int db__message_reconnect_reset(struct mosquitto *context) } +int db__message_remove_incoming(struct mosquitto* context, uint16_t mid) +{ + struct mosquitto_client_msg* tail, * tmp; + bool deleted = false; + + if (!context) return MOSQ_ERR_INVAL; + + DL_FOREACH_SAFE(context->msgs_in.inflight, tail, tmp) { + if (tail->mid == mid) { + if (tail->store->qos != 2) { + return MOSQ_ERR_PROTOCOL; + } + db__message_remove(&context->msgs_in, tail); + deleted = true; + } + } + + if (deleted) { + return MOSQ_ERR_SUCCESS; + } + else { + return MOSQ_ERR_NOT_FOUND; + } +} + int db__message_release_incoming(struct mosquitto *context, uint16_t mid) { struct mosquitto_client_msg *tail, *tmp; diff --git a/src/handle_publish.c b/src/handle_publish.c index 68359b69..75bc8f23 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -285,6 +285,13 @@ int handle__publish(struct mosquitto *context) if(msg->qos > 0){ db__message_store_find(context, msg->source_mid, &stored); } + + if (stored && msg->source_mid != 0 && (stored->qos != msg->qos || stored->payloadlen != msg->payloadlen || strcmp(stored->topic, msg->topic) || memcmp(stored->payload, msg->payload, msg->payloadlen) )){ + log__printf(NULL, MOSQ_LOG_WARNING, "Reused message ID %u from %s detected. Clearing from storage.", msg->source_mid, context->id); + db__message_remove_incoming(context, msg->source_mid); + stored = NULL; + } + if(!stored){ if(msg->qos == 0 || db__ready_for_flight(&context->msgs_in, msg->qos) diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 894e7500..58c26bfe 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -642,6 +642,7 @@ int persist__restore(void); int db__message_count(int *count); int db__message_delete_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state expect_state, int qos); int db__message_insert(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_direction dir, uint8_t qos, bool retain, struct mosquitto_msg_store *stored, mosquitto_property *properties, bool update); +int db__message_remove_incoming(struct mosquitto* context, uint16_t mid); int db__message_release_incoming(struct mosquitto *context, uint16_t mid); int db__message_update_outgoing(struct mosquitto *context, uint16_t mid, enum mosquitto_msg_state state, int qos); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data);