diff --git a/ChangeLog.txt b/ChangeLog.txt index 330bc252..9919c002 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,6 +1,8 @@ Broker: - Fix potential duplicate Will messages being sent when a will delay interval has been set. +- Fix message expiry interval property not being honoured in + `mosquitto_broker_publish` and `mosquitto_broker_publish_copy`. 2.0.5 - 2021-01-11 diff --git a/src/loop.c b/src/loop.c index 14663c87..aa1367d8 100644 --- a/src/loop.c +++ b/src/loop.c @@ -48,6 +48,7 @@ Contributors: #include "mosquitto_broker_internal.h" #include "memory_mosq.h" +#include "mqtt_protocol.h" #include "packet_mosq.h" #include "send_mosq.h" #include "sys_tree.h" @@ -69,7 +70,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l) static struct lws_sorted_usec_list sul; #endif -static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg) +static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry) { struct mosquitto_msg_store *stored; uint16_t mid; @@ -95,7 +96,7 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 msg->properties = NULL; } - if(db__message_store(context, stored, 0, 0, mosq_mo_broker)) return 1; + if(db__message_store(context, stored, message_expiry, 0, mosq_mo_broker)) return 1; if(msg->qos){ mid = mosquitto__mid_generate(context); @@ -106,20 +107,50 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 } +static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t *message_expiry) +{ + mosquitto_property *p, *previous = NULL; + + *message_expiry = 0; + + if(!proplist) return; + + p = *proplist; + while(p){ + if(p->identifier == MQTT_PROP_MESSAGE_EXPIRY_INTERVAL){ + *message_expiry = p->value.i32; + if(p == *proplist){ + *proplist = p->next; + }else{ + previous->next = p->next; + } + property__free(&p); + return; + + } + previous = p; + p = p->next; + } +} + void queue_plugin_msgs(void) { struct mosquitto_message_v5 *msg, *tmp; struct mosquitto *context; + uint32_t message_expiry; DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){ DL_DELETE(db.plugin_msgs, msg); + + read_message_expiry_interval(&msg->properties, &message_expiry); + if(msg->clientid){ HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context); if(context){ - single_publish(context, msg); + single_publish(context, msg, message_expiry); } }else{ - db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties); + db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties); } mosquitto__free(msg->topic); mosquitto__free(msg->payload);