diff --git a/lib/send_publish.c b/lib/send_publish.c index 9126afc5..2ac84e0d 100644 --- a/lib/send_publish.c +++ b/lib/send_publish.c @@ -38,14 +38,13 @@ Contributors: #include "packet_mosq.h" #include "property_mosq.h" #include "send_mosq.h" - +#include "utlist.h" int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval) { #ifdef WITH_BROKER #ifdef WITH_BRIDGE size_t len; - int i; struct mosquitto__bridge_topic *cur_topic; bool match; int rc; @@ -64,8 +63,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3 #ifdef WITH_BROKER #ifdef WITH_BRIDGE if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){ - for(i=0; ibridge->topic_count; i++){ - cur_topic = &mosq->bridge->topics[i]; + LL_FOREACH(mosq->bridge->topics, cur_topic){ if((cur_topic->direction == bd_both || cur_topic->direction == bd_out) && (cur_topic->remote_prefix || cur_topic->local_prefix)){ /* Topic mapping required on this topic if the message matches */ diff --git a/src/bridge.c b/src/bridge.c index 8e089263..0714e52c 100644 --- a/src/bridge.c +++ b/src/bridge.c @@ -53,6 +53,7 @@ Contributors: #include "tls_mosq.h" #include "util_mosq.h" #include "will_mosq.h" +#include "utlist.h" #ifdef WITH_BRIDGE @@ -204,6 +205,7 @@ static int bridge__connect_step1(struct mosquitto *context) char *notification_topic; size_t notification_topic_len; uint8_t notification_payload; + struct mosquitto__bridge_topic *cur_topic; int i; uint8_t qos; @@ -230,16 +232,16 @@ static int bridge__connect_step1(struct mosquitto *context) */ sub__clean_session(context); - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic); + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } if(sub__add(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, @@ -247,7 +249,7 @@ static int bridge__connect_step1(struct mosquitto *context) return 1; } retain__queue(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0); } } @@ -398,10 +400,10 @@ int bridge__connect_step3(struct mosquitto *context) int bridge__connect(struct mosquitto *context) { int rc, rc2; - int i; char *notification_topic = NULL; size_t notification_topic_len; uint8_t notification_payload; + struct mosquitto__bridge_topic *cur_topic; uint8_t qos; mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL; @@ -428,16 +430,16 @@ int bridge__connect(struct mosquitto *context) */ sub__clean_session(context); - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic); + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } if(sub__add(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, @@ -548,10 +550,10 @@ int bridge__connect(struct mosquitto *context) int bridge__on_connect(struct mosquitto *context) { - int i; char *notification_topic; size_t notification_topic_len; char notification_payload; + struct mosquitto__bridge_topic *cur_topic; int sub_opts; bool retain = true; uint8_t qos; @@ -594,12 +596,13 @@ int bridge__on_connect(struct mosquitto *context) mosquitto__free(notification_topic); } } - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){ - if(context->bridge->topics[i].qos > context->max_qos){ + + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){ + if(cur_topic->qos > context->max_qos){ sub_opts = context->max_qos; }else{ - sub_opts = context->bridge->topics[i].qos; + sub_opts = cur_topic->qos; } if(context->bridge->protocol_version == mosq_p_mqtt5){ sub_opts = sub_opts @@ -607,12 +610,12 @@ int bridge__on_connect(struct mosquitto *context) | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS; } - if(send__subscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, sub_opts, NULL)){ + if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){ return 1; } }else{ if(context->bridge->attempt_unsubscribe){ - if(send__unsubscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, NULL)){ + if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){ /* direction = inwards only. This means we should not be subscribed * to the topic. It is possible that we used to be subscribed to * this topic so unsubscribe. */ @@ -621,15 +624,15 @@ int bridge__on_connect(struct mosquitto *context) } } } - for(i=0; ibridge->topic_count; i++){ - if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ - if(context->bridge->topics[i].qos > context->max_qos){ + LL_FOREACH(context->bridge->topics, cur_topic){ + if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){ + if(cur_topic->qos > context->max_qos){ qos = context->max_qos; }else{ - qos = context->bridge->topics[i].qos; + qos = cur_topic->qos; } retain__queue(context, - context->bridge->topics[i].local_topic, + cur_topic->local_topic, qos, 0); } } @@ -769,17 +772,6 @@ void bridge__cleanup(struct mosquitto *context) mosquitto__free(context->bridge->addresses); context->bridge->addresses = NULL; - for(i=0; ibridge->topic_count; i++){ - mosquitto__free(context->bridge->topics[i].topic); - mosquitto__free(context->bridge->topics[i].local_prefix); - mosquitto__free(context->bridge->topics[i].remote_prefix); - mosquitto__free(context->bridge->topics[i].local_topic); - mosquitto__free(context->bridge->topics[i].remote_topic); - } - - mosquitto__free(context->bridge->topics); - context->bridge->topics = NULL; - config__bridge_cleanup(context->bridge); context->bridge = NULL; } diff --git a/src/bridge_topic.c b/src/bridge_topic.c index dc19fda1..29cb8017 100644 --- a/src/bridge_topic.c +++ b/src/bridge_topic.c @@ -21,6 +21,7 @@ Contributors: #include "mosquitto.h" #include "mosquitto_broker_internal.h" #include "memory_mosq.h" +#include "utlist.h" #ifdef WITH_BRIDGE static int bridge__create_remap_topic(const char *prefix, const char *topic, char **remap_topic) @@ -97,13 +98,48 @@ static int bridge__create_prefix(char **full_prefix, const char *topic, const ch } +struct mosquitto__bridge_topic *bridge__find_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix) +{ + struct mosquitto__bridge_topic *cur_topic = NULL; + bool found = false; + + LL_FOREACH(bridge->topics, cur_topic){ + if(cur_topic->direction != direction){ + continue; + } + if(cur_topic->qos != qos){ + continue; + } + if(cur_topic->topic != NULL && topic != NULL){ + if(strcmp(cur_topic->topic, topic)){ + continue; + } + } + if(cur_topic->local_prefix != NULL && local_prefix != NULL){ + if(strcmp(cur_topic->local_prefix, local_prefix)){ + continue; + } + } + if(cur_topic->remote_prefix != NULL && remote_prefix != NULL){ + if(strcmp(cur_topic->remote_prefix, remote_prefix)){ + continue; + } + } + found = true; + break; + } + if(!found) + cur_topic = NULL; + + return cur_topic; +} + + /* topic [[[out | in | both] qos-level] local-prefix remote-prefix] */ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix) { - struct mosquitto__bridge_topic *topics; struct mosquitto__bridge_topic *cur_topic; - if(bridge == NULL) return MOSQ_ERR_INVAL; if(direction != bd_out && direction != bd_in && direction != bd_both){ return MOSQ_ERR_INVAL; @@ -126,18 +162,18 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum return MOSQ_ERR_INVAL; } + if(bridge__find_topic(bridge, topic, direction, qos, local_prefix, remote_prefix) != NULL){ + log__printf(NULL, MOSQ_LOG_INFO, "Duplicate bridge topic '%s', skipping", topic); + return MOSQ_ERR_SUCCESS; + } bridge->topic_count++; - topics = mosquitto__realloc(bridge->topics, - sizeof(struct mosquitto__bridge_topic)*(size_t)bridge->topic_count); - - if(topics == NULL){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + cur_topic = mosquitto__malloc(sizeof(struct mosquitto__bridge_topic)); + if(cur_topic == NULL){ + goto oom; } - bridge->topics = topics; + cur_topic->next = NULL; - cur_topic = &bridge->topics[bridge->topic_count-1]; cur_topic->direction = direction; cur_topic->qos = qos; cur_topic->local_prefix = NULL; @@ -148,8 +184,7 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum }else{ cur_topic->topic = mosquitto__strdup(topic); if(cur_topic->topic == NULL){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_topic; } } @@ -157,14 +192,12 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum bridge->topic_remapping = true; if(local_prefix){ if(bridge__create_prefix(&cur_topic->local_prefix, cur_topic->topic, local_prefix, "local")){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_lprefix; } } if(remote_prefix){ if(bridge__create_prefix(&cur_topic->remote_prefix, cur_topic->topic, remote_prefix, "local")){ - log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); - return MOSQ_ERR_NOMEM; + goto oom_rprefix; } } } @@ -181,7 +214,19 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum return MOSQ_ERR_INVAL; } + LL_APPEND(bridge->topics, cur_topic); + return MOSQ_ERR_SUCCESS; + +oom_rprefix: + mosquitto__free(cur_topic->local_prefix); +oom_lprefix: + mosquitto__free(cur_topic->topic); +oom_topic: + mosquitto__free(cur_topic); +oom: + log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); + return MOSQ_ERR_NOMEM; } @@ -189,14 +234,12 @@ int bridge__remap_topic_in(struct mosquitto *context, char **topic) { struct mosquitto__bridge_topic *cur_topic; char *topic_temp; - int i; size_t len; int rc; bool match; if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){ - for(i=0; ibridge->topic_count; i++){ - cur_topic = &context->bridge->topics[i]; + LL_FOREACH(context->bridge->topics, cur_topic){ if((cur_topic->direction == bd_both || cur_topic->direction == bd_in) && (cur_topic->remote_prefix || cur_topic->local_prefix)){ diff --git a/src/conf.c b/src/conf.c index 78d7b898..01a952f2 100644 --- a/src/conf.c +++ b/src/conf.c @@ -49,6 +49,8 @@ Contributors: #include "util_mosq.h" #include "mqtt_protocol.h" +#include "utlist.h" + struct config_recurse { unsigned int log_dest; int log_dest_set; @@ -327,6 +329,8 @@ void config__cleanup(struct mosquitto__config *config) void config__bridge_cleanup(struct mosquitto__bridge *bridge) { int i; + struct mosquitto__bridge_topic *cur_topic, *topic_tmp; + if(bridge == NULL) return; mosquitto__free(bridge->name); @@ -343,12 +347,14 @@ void config__bridge_cleanup(struct mosquitto__bridge *bridge) mosquitto__free(bridge->local_username); mosquitto__free(bridge->local_password); if(bridge->topics){ - for(i=0; itopic_count; i++){ - mosquitto__free(bridge->topics[i].topic); - mosquitto__free(bridge->topics[i].local_prefix); - mosquitto__free(bridge->topics[i].remote_prefix); - mosquitto__free(bridge->topics[i].local_topic); - mosquitto__free(bridge->topics[i].remote_topic); + LL_FOREACH_SAFE(bridge->topics, cur_topic, topic_tmp){ + mosquitto__free(cur_topic->topic); + mosquitto__free(cur_topic->local_prefix); + mosquitto__free(cur_topic->remote_prefix); + mosquitto__free(cur_topic->local_topic); + mosquitto__free(cur_topic->remote_topic); + LL_DELETE(bridge->topics, cur_topic); + mosquitto__free(cur_topic); } mosquitto__free(bridge->topics); } diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h index f8c179c0..bb8d53f5 100644 --- a/src/mosquitto_broker_internal.h +++ b/src/mosquitto_broker_internal.h @@ -500,6 +500,7 @@ enum mosquitto_bridge_reload_type{ }; struct mosquitto__bridge_topic{ + struct mosquitto__bridge_topic *next; char *topic; char *local_prefix; char *remote_prefix;