From 895e209c414bf42cb9f9d0d5499751f01da3f0a3 Mon Sep 17 00:00:00 2001 From: "Roger A. Light" Date: Tue, 30 Apr 2019 11:45:28 +0100 Subject: [PATCH] Fix broker originated messages not being sent. This occurred when `check_retain_source` was set to true. Closes #1245. Thanks to Christoph Krey. --- ChangeLog.txt | 2 ++ src/database.c | 12 ++++++++++-- src/handle_publish.c | 2 +- src/mosquitto_broker_internal.h | 9 ++++++++- src/persist_read.c | 3 ++- src/subs.c | 2 +- test/unit/persist_read_stubs.c | 2 +- 7 files changed, 25 insertions(+), 7 deletions(-) diff --git a/ChangeLog.txt b/ChangeLog.txt index b33bc53b..275ae21f 100644 --- a/ChangeLog.txt +++ b/ChangeLog.txt @@ -8,6 +8,8 @@ Broker: `response-topic`. Closes #1244. - Fix build for WITH_TLS=no. Closes #1250. - Fix Will message not allowing user-property properties. +- Fix broker originated messages (e.g. $SYS/broker/version) not being + published when `check_retain_source` set to true. Closes #1245. Library: - Fix crash after client has been unable to connect to a broker. This occurs diff --git a/src/database.c b/src/database.c index bc495281..2a32b82c 100644 --- a/src/database.c +++ b/src/database.c @@ -580,6 +580,7 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, char *topic_heap; mosquitto__payload_uhpa payload_uhpa; mosquitto_property *local_properties = NULL; + enum mosquitto_msg_origin origin; assert(db); @@ -608,13 +609,19 @@ int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, local_properties = *properties; *properties = NULL; } - if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0)) return 1; + + if(context){ + origin = mosq_mo_client; + }else{ + origin = mosq_mo_broker; + } + if(db__message_store(db, context, 0, topic_heap, qos, payloadlen, &payload_uhpa, retain, &stored, message_expiry_interval, local_properties, 0, origin)) return 1; return sub__messages_queue(db, source_id, topic_heap, qos, retain, &stored); } /* This function requires topic to be allocated on the heap. Once called, it owns topic and will free it on error. Likewise payload and properties. */ -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id) +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) { struct mosquitto_msg_store *temp = NULL; int rc = MOSQ_ERR_SUCCESS; @@ -662,6 +669,7 @@ int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, u topic = NULL; temp->payloadlen = payloadlen; temp->properties = properties; + temp->origin = origin; if(payloadlen){ UHPA_MOVE(temp->payload, *payload, payloadlen); }else{ diff --git a/src/handle_publish.c b/src/handle_publish.c index 0ac20ef2..8b76fd5e 100644 --- a/src/handle_publish.c +++ b/src/handle_publish.c @@ -300,7 +300,7 @@ int handle__publish(struct mosquitto_db *db, struct mosquitto *context) } if(!stored){ dup = 0; - if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0)){ + if(db__message_store(db, context, mid, topic, qos, payloadlen, &payload, retain, &stored, message_expiry_interval, msg_properties, 0, mosq_mo_client)){ mosquitto_property_free_all(&msg_properties); return 1; } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index 0654166f..38cc1360 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -157,6 +157,12 @@ typedef int (*FUNC_auth_plugin_acl_check_v2)(void *, const char *, const char *, typedef int (*FUNC_auth_plugin_unpwd_check_v2)(void *, const char *, const char *); typedef int (*FUNC_auth_plugin_psk_key_get_v2)(void *, const char *, const char *, char *, int); + +enum mosquitto_msg_origin{ + mosq_mo_client = 0, + mosq_mo_broker = 1 +}; + struct mosquitto__auth_plugin{ void *lib; void *user_data; @@ -367,6 +373,7 @@ struct mosquitto_msg_store{ uint16_t mid; uint8_t qos; bool retain; + uint8_t origin; }; struct mosquitto_client_msg{ @@ -608,7 +615,7 @@ int db__message_write(struct mosquitto_db *db, struct mosquitto *context); void db__message_dequeue_first(struct mosquitto *context, struct mosquitto_msg_data *msg_data); int db__messages_delete(struct mosquitto_db *db, struct mosquitto *context); int db__messages_easy_queue(struct mosquitto_db *db, struct mosquitto *context, const char *topic, int qos, uint32_t payloadlen, const void *payload, int retain, uint32_t message_expiry_interval, mosquitto_property **properties); -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id); +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin); int db__message_store_find(struct mosquitto *context, uint16_t mid, struct mosquitto_msg_store **stored); void db__msg_store_add(struct mosquitto_db *db, struct mosquitto_msg_store *store); void db__msg_store_remove(struct mosquitto_db *db, struct mosquitto_msg_store *store); diff --git a/src/persist_read.c b/src/persist_read.c index 8fe01ab6..1ae2f7a6 100644 --- a/src/persist_read.c +++ b/src/persist_read.c @@ -283,7 +283,8 @@ static int persist__msg_store_chunk_restore(struct mosquitto_db *db, FILE *db_fp rc = db__message_store(db, &chunk.source, chunk.F.source_mid, chunk.topic, chunk.F.qos, chunk.F.payloadlen, - &chunk.payload, chunk.F.retain, &stored, message_expiry_interval, chunk.properties, chunk.F.store_id); + &chunk.payload, chunk.F.retain, &stored, message_expiry_interval, + chunk.properties, chunk.F.store_id, mosq_mo_client); mosquitto__free(chunk.source.id); mosquitto__free(chunk.source.username); diff --git a/src/subs.c b/src/subs.c index a316cb0f..11093819 100644 --- a/src/subs.c +++ b/src/subs.c @@ -1000,7 +1000,7 @@ static int retain__process(struct mosquitto_db *db, struct mosquitto__subhier *b } /* Check for original source access */ - if(db->config->check_retain_source && retained->source_id){ + if(db->config->check_retain_source && retained->origin != mosq_mo_broker && retained->source_id){ struct mosquitto retain_ctxt; memset(&retain_ctxt, 0, sizeof(struct mosquitto)); diff --git a/test/unit/persist_read_stubs.c b/test/unit/persist_read_stubs.c index 146a5d17..7e0d18c4 100644 --- a/test/unit/persist_read_stubs.c +++ b/test/unit/persist_read_stubs.c @@ -28,7 +28,7 @@ struct mosquitto *context__init(struct mosquitto_db *db, mosq_sock_t sock) return m; } -int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id) +int db__message_store(struct mosquitto_db *db, const struct mosquitto *source, uint16_t source_mid, char *topic, int qos, uint32_t payloadlen, mosquitto__payload_uhpa *payload, int retain, struct mosquitto_msg_store **stored, uint32_t message_expiry_interval, mosquitto_property *properties, dbid_t store_id, enum mosquitto_msg_origin origin) { struct mosquitto_msg_store *temp = NULL; int rc = MOSQ_ERR_SUCCESS;