From b8c6f26995482d2b0874fc77e0e4137fd493f9d1 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Thu, 14 Jan 2021 15:38:51 +0000 Subject: [PATCH] Fix message expiry interval property not being honoured for plugins. This happened in `mosquitto_broker_publish` and `mosquitto_broker_publish_copy` only. --- ChangeLog.txt | 2 ++ src/loop.c | 39 +++++++++++++++++++++++++++++++++++---- 2 files changed, 37 insertions(+), 4 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index 330bc252..9919c002 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -1,6 +1,8 @@ Broker: - Fix potential duplicate Will messages being sent when a will delay interval has been set. +- Fix message expiry interval property not being honoured in + `mosquitto_broker_publish` and `mosquitto_broker_publish_copy`. 2.0.5 - 2021-01-11 diff --git a/src/loop.c b/src/loop.c index 14663c87..aa1367d8 100644 --- a/src/loop.c +++ b/src/loop.c @@ -48,6 +48,7 @@ Contributors: #include "mosquitto_broker_internal.h" #include "memory_mosq.h" +#include "mqtt_protocol.h" #include "packet_mosq.h" #include "send_mosq.h" #include "sys_tree.h" @@ -69,7 +70,7 @@ void lws__sul_callback(struct lws_sorted_usec_list *l) static struct lws_sorted_usec_list sul; #endif -static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg) +static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 *msg, uint32_t message_expiry) { struct mosquitto_msg_store *stored; uint16_t mid; @@ -95,7 +96,7 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 msg->properties = NULL; } - if(db__message_store(context, stored, 0, 0, mosq_mo_broker)) return 1; + if(db__message_store(context, stored, message_expiry, 0, mosq_mo_broker)) return 1; if(msg->qos){ mid = mosquitto__mid_generate(context); @@ -106,20 +107,50 @@ static int single_publish(struct mosquitto *context, struct mosquitto_message_v5 } +static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t *message_expiry) +{ + mosquitto_property *p, *previous = NULL; + + *message_expiry = 0; + + if(!proplist) return; + + p = *proplist; + while(p){ + if(p->identifier == MQTT_PROP_MESSAGE_EXPIRY_INTERVAL){ + *message_expiry = p->value.i32; + if(p == *proplist){ + *proplist = p->next; + }else{ + previous->next = p->next; + } + property__free(&p); + return; + + } + previous = p; + p = p->next; + } +} + void queue_plugin_msgs(void) { struct mosquitto_message_v5 *msg, *tmp; struct mosquitto *context; + uint32_t message_expiry; DL_FOREACH_SAFE(db.plugin_msgs, msg, tmp){ DL_DELETE(db.plugin_msgs, msg); + + read_message_expiry_interval(&msg->properties, &message_expiry); + if(msg->clientid){ HASH_FIND(hh_id, db.contexts_by_id, msg->clientid, strlen(msg->clientid), context); if(context){ - single_publish(context, msg); + single_publish(context, msg, message_expiry); } }else{ - db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, 0, &msg->properties); + db__messages_easy_queue(NULL, msg->topic, (uint8_t)msg->qos, (uint32_t)msg->payloadlen, msg->payload, msg->retain, message_expiry, &msg->properties); } mosquitto__free(msg->topic); mosquitto__free(msg->payload);