From aa5233fc85889199bce90b679a1461a2d55b0701 Mon Sep 17 00:00:00 2001 From: Chris Elston Date: Thu, 1 Apr 2021 15:48:18 +0100 Subject: [PATCH] Swap bridge topic array to linked list Bridge topics are added on startup to an allocated array which is reallocated with each new topic. This change alters bridge topic storage to use a singly linked list. This is to facilitate upcoming changes to support bridge topic add/remove operations. --- lib/send_publish.c | 6 +-- src/bridge.c | 66 ++++++++++++--------------- src/bridge_topic.c | 81 +++++++++++++++++++++++++-------- src/conf.c | 18 +++++--- src/mosquitto_broker_internal.h | 1 + 5 files changed, 106 insertions(+), 66 deletions(-) diff --git a/lib/send_publish.c b/lib/send_publish.c index 9126afc5..2ac84e0d 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -38,14 +38,13 @@ Contributors: #include "packet_mosq.h" #include "property_mosq.h" #include "send_mosq.h" - +#include "utlist.h" int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval) { #ifdef WITH_BROKER #ifdef WITH_BRIDGE size_t len; - int i; struct mosquitto__bridge_topic *cur_topic; bool match; int rc; @@ -64,8 +63,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 #ifdef WITH_BROKER #ifdef WITH_BRIDGE if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){ - for(i=0; ibridge->topic_count; i++){ - cur_topic = &mosq->bridge->topics[i]; + LL_FOREACH(mosq->bridge->topics, cur_topic){ if((cur_topic->direction == bd_both || cur_topic->direction == bd_out) && (cur_topic->remote_prefix || cur_topic->local_prefix)){ /* Topic mapping required on this topic if the message matches */ diff --git a/src/bridge.c b/src/bridge.c index 8e089263..0714e52c 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -53,6 +53,7 @@ Contributors: #include "tls_mosq.h" #include "util_mosq.h" #include "will_mosq.h" +#include "utlist.h" #ifdef WITH_BRIDGE @@ -204,6 +205,7 @@ static int bridge__connect_step1(struct mosquitto *context) char *notification_topic; size_t notification_topic_len; uint8_t notification_payload; + struct mosquitto__bridge_topic *cur_topic; int i; uint8_t qos; @@ -230,16 +232,16 @@ static int bridge__connect_step1(struct mosquitto *context) */ sub__clean_session(context); - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic); + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } if(sub__add(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, @@ -247,7 +249,7 @@ static int bridge__connect_step1(struct mosquitto *context) return 1; } retain__queue(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0); } } @@ -398,10 +400,10 @@ int bridge__connect_step3(struct mosquitto *context) int bridge__connect(struct mosquitto *context) { int rc, rc2; - int i; char *notification_topic = NULL; size_t notification_topic_len; uint8_t notification_payload; + struct mosquitto__bridge_topic *cur_topic; uint8_t qos; mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL; @@ -428,16 +430,16 @@ int bridge__connect(struct mosquitto *context) */ sub__clean_session(context); - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic); + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } if(sub__add(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, @@ -548,10 +550,10 @@ int bridge__connect(struct mosquitto *context) int bridge__on_connect(struct mosquitto *context) { - int i; char *notification_topic; size_t notification_topic_len; char notification_payload; + struct mosquitto__bridge_topic *cur_topic; int sub_opts; bool retain = true; uint8_t qos; @@ -594,12 +596,13 @@ int bridge__on_connect(struct mosquitto *context) mosquitto__free(notification_topic); } } - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){ - if(context->bridge->topics[i].qos > context->max_qos){ + + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){ + if(cur_topic->qos > context->max_qos){ sub_opts = context->max_qos; }else{ - sub_opts = context->bridge->topics[i].qos; + sub_opts = cur_topic->qos; } if(context->bridge->protocol_version == mosq_p_mqtt5){ sub_opts = sub_opts @@ -607,12 +610,12 @@ int bridge__on_connect(struct mosquitto *context) | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS; } - if(send__subscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, sub_opts, NULL)){ + if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){ return 1; } }else{ if(context->bridge->attempt_unsubscribe){ - if(send__unsubscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, NULL)){ + if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){ /* direction = inwards only. This means we should not be subscribed * to the topic. It is possible that we used to be subscribed to * this topic so unsubscribe. */ @@ -621,15 +624,15 @@ int bridge__on_connect(struct mosquitto *context) } } } - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } retain__queue(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0); } } @@ -769,17 +772,6 @@ void bridge__cleanup(struct mosquitto *context) mosquitto__free(context->bridge->addresses); context->bridge->addresses = NULL; - for(i=0; ibridge->topic_count; i++){ - mosquitto__free(context->bridge->topics[i].topic); - mosquitto__free(context->bridge->topics[i].local_prefix); - mosquitto__free(context->bridge->topics[i].remote_prefix); - mosquitto__free(context->bridge->topics[i].local_topic); - mosquitto__free(context->bridge->topics[i].remote_topic); - } - - mosquitto__free(context->bridge->topics); - context->bridge->topics = NULL; - config__bridge_cleanup(context->bridge); context->bridge = NULL; } diff --git a/src/bridge_topic.c b/src/bridge_topic.c index dc19fda1..29cb8017 100644 --- a/src/bridge_topic.c +++ b/src/bridge_topic.c @@ -21,6 +21,7 @@ Contributors: #include "mosquitto.h" #include "mosquitto_broker_internal.h" #include "memory_mosq.h" +#include "utlist.h" #ifdef WITH_BRIDGE static int bridge__create_remap_topic(const char *prefix, const char *topic, char **remap_topic) @@ -97,13 +98,48 @@ static int bridge__create_prefix(char **full_prefix, const char *topic, const ch } +struct mosquitto__bridge_topic *bridge__find_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix) +{ + struct mosquitto__bridge_topic *cur_topic = NULL; + bool found = false; + + LL_FOREACH(bridge->topics, cur_topic){ + if(cur_topic->direction != direction){ + continue; + } + if(cur_topic->qos != qos){ + continue; + } + if(cur_topic->topic != NULL && topic != NULL){ + if(strcmp(cur_topic->topic, topic)){ + continue; + } + } + if(cur_topic->local_prefix != NULL && local_prefix != NULL){ + if(strcmp(cur_topic->local_prefix, local_prefix)){ + continue; + } + } + if(cur_topic->remote_prefix != NULL && remote_prefix != NULL){ + if(strcmp(cur_topic->remote_prefix, remote_prefix)){ + continue; + } + } + found = true; + break; + } + if(!found) + cur_topic = NULL; + + return cur_topic; +} + + /* topic [[[out | in | both] qos-level] local-prefix remote-prefix] */ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix) { - struct mosquitto__bridge_topic *topics; struct mosquitto__bridge_topic *cur_topic; - if(bridge == NULL) return MOSQ_ERR_INVAL; if(direction != bd_out && direction != bd_in && direction != bd_both){ return MOSQ_ERR_INVAL; @@ -126,18 +162,18 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum return MOSQ_ERR_INVAL; } + if(bridge__find_topic(bridge, topic, direction, qos, local_prefix, remote_prefix) != NULL){ + log__printf(NULL, MOSQ_LOG_INFO, "Duplicate bridge topic '%s', skipping", topic); + return MOSQ_ERR_SUCCESS; + } bridge->topic_count++; - topics = mosquitto__realloc(bridge->topics, - sizeof(struct mosquitto__bridge_topic)*(size_t)bridge->topic_count); - - if(topics == NULL){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + cur_topic = mosquitto__malloc(sizeof(struct mosquitto__bridge_topic)); + if(cur_topic == NULL){ + goto oom; } - bridge->topics = topics; + cur_topic->next = NULL; - cur_topic = &bridge->topics[bridge->topic_count-1]; cur_topic->direction = direction; cur_topic->qos = qos; cur_topic->local_prefix = NULL; @@ -148,8 +184,7 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum }else{ cur_topic->topic = mosquitto__strdup(topic); if(cur_topic->topic == NULL){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_topic; } } @@ -157,14 +192,12 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum bridge->topic_remapping = true; if(local_prefix){ if(bridge__create_prefix(&cur_topic->local_prefix, cur_topic->topic, local_prefix, "local")){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_lprefix; } } if(remote_prefix){ if(bridge__create_prefix(&cur_topic->remote_prefix, cur_topic->topic, remote_prefix, "local")){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_rprefix; } } } @@ -181,7 +214,19 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum return MOSQ_ERR_INVAL; } + LL_APPEND(bridge->topics, cur_topic); + return MOSQ_ERR_SUCCESS; + +oom_rprefix: + mosquitto__free(cur_topic->local_prefix); +oom_lprefix: + mosquitto__free(cur_topic->topic); +oom_topic: + mosquitto__free(cur_topic); +oom: + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; } @@ -189,14 +234,12 @@ int bridge__remap_topic_in(struct mosquitto *context, char **topic) { struct mosquitto__bridge_topic *cur_topic; char *topic_temp; - int i; size_t len; int rc; bool match; if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){ - for(i=0; ibridge->topic_count; i++){ - cur_topic = &context->bridge->topics[i]; + LL_FOREACH(context->bridge->topics, cur_topic){ if((cur_topic->direction == bd_both || cur_topic->direction == bd_in) && (cur_topic->remote_prefix || cur_topic->local_prefix)){ diff --git a/src/conf.c b/src/conf.c index 78d7b898..01a952f2 100644 --- a/src/conf.c +++ b/src/conf.c @@ -49,6 +49,8 @@ Contributors: #include "util_mosq.h" #include "mqtt_protocol.h" +#include "utlist.h" + struct config_recurse { unsigned int log_dest; int log_dest_set; @@ -327,6 +329,8 @@ void config__cleanup(struct mosquitto__config *config) void config__bridge_cleanup(struct mosquitto__bridge *bridge) { int i; + struct mosquitto__bridge_topic *cur_topic, *topic_tmp; + if(bridge == NULL) return; mosquitto__free(bridge->name); @@ -343,12 +347,14 @@ void config__bridge_cleanup(struct mosquitto__bridge *bridge) mosquitto__free(bridge->local_username); mosquitto__free(bridge->local_password); if(bridge->topics){ - for(i=0; itopic_count; i++){ - mosquitto__free(bridge->topics[i].topic); - mosquitto__free(bridge->topics[i].local_prefix); - mosquitto__free(bridge->topics[i].remote_prefix); - mosquitto__free(bridge->topics[i].local_topic); - mosquitto__free(bridge->topics[i].remote_topic); + LL_FOREACH_SAFE(bridge->topics, cur_topic, topic_tmp){ + mosquitto__free(cur_topic->topic); + mosquitto__free(cur_topic->local_prefix); + mosquitto__free(cur_topic->remote_prefix); + mosquitto__free(cur_topic->local_topic); + mosquitto__free(cur_topic->remote_topic); + LL_DELETE(bridge->topics, cur_topic); + mosquitto__free(cur_topic); } mosquitto__free(bridge->topics); } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index f8c179c0..bb8d53f5 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -500,6 +500,7 @@ enum mosquitto_bridge_reload_type{ }; struct mosquitto__bridge_topic{ + struct mosquitto__bridge_topic *next; char *topic; char *local_prefix; char *remote_prefix;