Swap bridge topic array to linked list

Bridge topics are added on startup to an allocated array which is
reallocated with each new topic.  This change alters bridge topic
storage to use a singly linked list.

This is to facilitate upcoming changes to support bridge topic
add/remove operations.
pull/2249/head
Chris Elston 5 years ago
parent ec895f2ad3
commit aa5233fc85

@ -38,14 +38,13 @@ Contributors:
#include "packet_mosq.h" #include "packet_mosq.h"
#include "property_mosq.h" #include "property_mosq.h"
#include "send_mosq.h" #include "send_mosq.h"
#include "utlist.h"
int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval) int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint32_t payloadlen, const void *payload, uint8_t qos, bool retain, bool dup, uint32_t subscription_identifier, const mosquitto_property *store_props, uint32_t expiry_interval)
{ {
#ifdef WITH_BROKER #ifdef WITH_BROKER
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
size_t len; size_t len;
int i;
struct mosquitto__bridge_topic *cur_topic; struct mosquitto__bridge_topic *cur_topic;
bool match; bool match;
int rc; int rc;
@ -64,8 +63,7 @@ int send__publish(struct mosquitto *mosq, uint16_t mid, const char *topic, uint3
#ifdef WITH_BROKER #ifdef WITH_BROKER
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){ if(mosq->bridge && mosq->bridge->topics && mosq->bridge->topic_remapping){
for(i=0; i<mosq->bridge->topic_count; i++){ LL_FOREACH(mosq->bridge->topics, cur_topic){
cur_topic = &mosq->bridge->topics[i];
if((cur_topic->direction == bd_both || cur_topic->direction == bd_out) if((cur_topic->direction == bd_both || cur_topic->direction == bd_out)
&& (cur_topic->remote_prefix || cur_topic->local_prefix)){ && (cur_topic->remote_prefix || cur_topic->local_prefix)){
/* Topic mapping required on this topic if the message matches */ /* Topic mapping required on this topic if the message matches */

@ -53,6 +53,7 @@ Contributors:
#include "tls_mosq.h" #include "tls_mosq.h"
#include "util_mosq.h" #include "util_mosq.h"
#include "will_mosq.h" #include "will_mosq.h"
#include "utlist.h"
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
@ -204,6 +205,7 @@ static int bridge__connect_step1(struct mosquitto *context)
char *notification_topic; char *notification_topic;
size_t notification_topic_len; size_t notification_topic_len;
uint8_t notification_payload; uint8_t notification_payload;
struct mosquitto__bridge_topic *cur_topic;
int i; int i;
uint8_t qos; uint8_t qos;
@ -230,16 +232,16 @@ static int bridge__connect_step1(struct mosquitto *context)
*/ */
sub__clean_session(context); sub__clean_session(context);
for(i=0; i<context->bridge->topic_count; i++){ LL_FOREACH(context->bridge->topics, cur_topic){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
if(context->bridge->topics[i].qos > context->max_qos){ if(cur_topic->qos > context->max_qos){
qos = context->max_qos; qos = context->max_qos;
}else{ }else{
qos = context->bridge->topics[i].qos; qos = cur_topic->qos;
} }
if(sub__add(context, if(sub__add(context,
context->bridge->topics[i].local_topic, cur_topic->local_topic,
qos, qos,
0, 0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
@ -247,7 +249,7 @@ static int bridge__connect_step1(struct mosquitto *context)
return 1; return 1;
} }
retain__queue(context, retain__queue(context,
context->bridge->topics[i].local_topic, cur_topic->local_topic,
qos, 0); qos, 0);
} }
} }
@ -398,10 +400,10 @@ int bridge__connect_step3(struct mosquitto *context)
int bridge__connect(struct mosquitto *context) int bridge__connect(struct mosquitto *context)
{ {
int rc, rc2; int rc, rc2;
int i;
char *notification_topic = NULL; char *notification_topic = NULL;
size_t notification_topic_len; size_t notification_topic_len;
uint8_t notification_payload; uint8_t notification_payload;
struct mosquitto__bridge_topic *cur_topic;
uint8_t qos; uint8_t qos;
mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL; mosquitto_property topic_alias_max, *topic_alias_max_prop = NULL;
@ -428,16 +430,16 @@ int bridge__connect(struct mosquitto *context)
*/ */
sub__clean_session(context); sub__clean_session(context);
for(i=0; i<context->bridge->topic_count; i++){ LL_FOREACH(context->bridge->topics, cur_topic){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, context->bridge->topics[i].local_topic); log__printf(NULL, MOSQ_LOG_DEBUG, "Bridge %s doing local SUBSCRIBE on topic %s", context->id, cur_topic->local_topic);
if(context->bridge->topics[i].qos > context->max_qos){ if(cur_topic->qos > context->max_qos){
qos = context->max_qos; qos = context->max_qos;
}else{ }else{
qos = context->bridge->topics[i].qos; qos = cur_topic->qos;
} }
if(sub__add(context, if(sub__add(context,
context->bridge->topics[i].local_topic, cur_topic->local_topic,
qos, qos,
0, 0,
MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, MQTT_SUB_OPT_NO_LOCAL | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED,
@ -548,10 +550,10 @@ int bridge__connect(struct mosquitto *context)
int bridge__on_connect(struct mosquitto *context) int bridge__on_connect(struct mosquitto *context)
{ {
int i;
char *notification_topic; char *notification_topic;
size_t notification_topic_len; size_t notification_topic_len;
char notification_payload; char notification_payload;
struct mosquitto__bridge_topic *cur_topic;
int sub_opts; int sub_opts;
bool retain = true; bool retain = true;
uint8_t qos; uint8_t qos;
@ -594,12 +596,13 @@ int bridge__on_connect(struct mosquitto *context)
mosquitto__free(notification_topic); mosquitto__free(notification_topic);
} }
} }
for(i=0; i<context->bridge->topic_count; i++){
if(context->bridge->topics[i].direction == bd_in || context->bridge->topics[i].direction == bd_both){ LL_FOREACH(context->bridge->topics, cur_topic){
if(context->bridge->topics[i].qos > context->max_qos){ if(cur_topic->direction == bd_in || cur_topic->direction == bd_both){
if(cur_topic->qos > context->max_qos){
sub_opts = context->max_qos; sub_opts = context->max_qos;
}else{ }else{
sub_opts = context->bridge->topics[i].qos; sub_opts = cur_topic->qos;
} }
if(context->bridge->protocol_version == mosq_p_mqtt5){ if(context->bridge->protocol_version == mosq_p_mqtt5){
sub_opts = sub_opts sub_opts = sub_opts
@ -607,12 +610,12 @@ int bridge__on_connect(struct mosquitto *context)
| MQTT_SUB_OPT_RETAIN_AS_PUBLISHED | MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
| MQTT_SUB_OPT_SEND_RETAIN_ALWAYS; | MQTT_SUB_OPT_SEND_RETAIN_ALWAYS;
} }
if(send__subscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, sub_opts, NULL)){ if(send__subscribe(context, NULL, 1, &cur_topic->remote_topic, sub_opts, NULL)){
return 1; return 1;
} }
}else{ }else{
if(context->bridge->attempt_unsubscribe){ if(context->bridge->attempt_unsubscribe){
if(send__unsubscribe(context, NULL, 1, &context->bridge->topics[i].remote_topic, NULL)){ if(send__unsubscribe(context, NULL, 1, &cur_topic->remote_topic, NULL)){
/* direction = inwards only. This means we should not be subscribed /* direction = inwards only. This means we should not be subscribed
* to the topic. It is possible that we used to be subscribed to * to the topic. It is possible that we used to be subscribed to
* this topic so unsubscribe. */ * this topic so unsubscribe. */
@ -621,15 +624,15 @@ int bridge__on_connect(struct mosquitto *context)
} }
} }
} }
for(i=0; i<context->bridge->topic_count; i++){ LL_FOREACH(context->bridge->topics, cur_topic){
if(context->bridge->topics[i].direction == bd_out || context->bridge->topics[i].direction == bd_both){ if(cur_topic->direction == bd_out || cur_topic->direction == bd_both){
if(context->bridge->topics[i].qos > context->max_qos){ if(cur_topic->qos > context->max_qos){
qos = context->max_qos; qos = context->max_qos;
}else{ }else{
qos = context->bridge->topics[i].qos; qos = cur_topic->qos;
} }
retain__queue(context, retain__queue(context,
context->bridge->topics[i].local_topic, cur_topic->local_topic,
qos, 0); qos, 0);
} }
} }
@ -769,17 +772,6 @@ void bridge__cleanup(struct mosquitto *context)
mosquitto__free(context->bridge->addresses); mosquitto__free(context->bridge->addresses);
context->bridge->addresses = NULL; context->bridge->addresses = NULL;
for(i=0; i<context->bridge->topic_count; i++){
mosquitto__free(context->bridge->topics[i].topic);
mosquitto__free(context->bridge->topics[i].local_prefix);
mosquitto__free(context->bridge->topics[i].remote_prefix);
mosquitto__free(context->bridge->topics[i].local_topic);
mosquitto__free(context->bridge->topics[i].remote_topic);
}
mosquitto__free(context->bridge->topics);
context->bridge->topics = NULL;
config__bridge_cleanup(context->bridge); config__bridge_cleanup(context->bridge);
context->bridge = NULL; context->bridge = NULL;
} }

@ -21,6 +21,7 @@ Contributors:
#include "mosquitto.h" #include "mosquitto.h"
#include "mosquitto_broker_internal.h" #include "mosquitto_broker_internal.h"
#include "memory_mosq.h" #include "memory_mosq.h"
#include "utlist.h"
#ifdef WITH_BRIDGE #ifdef WITH_BRIDGE
static int bridge__create_remap_topic(const char *prefix, const char *topic, char **remap_topic) static int bridge__create_remap_topic(const char *prefix, const char *topic, char **remap_topic)
@ -97,13 +98,48 @@ static int bridge__create_prefix(char **full_prefix, const char *topic, const ch
} }
struct mosquitto__bridge_topic *bridge__find_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix)
{
struct mosquitto__bridge_topic *cur_topic = NULL;
bool found = false;
LL_FOREACH(bridge->topics, cur_topic){
if(cur_topic->direction != direction){
continue;
}
if(cur_topic->qos != qos){
continue;
}
if(cur_topic->topic != NULL && topic != NULL){
if(strcmp(cur_topic->topic, topic)){
continue;
}
}
if(cur_topic->local_prefix != NULL && local_prefix != NULL){
if(strcmp(cur_topic->local_prefix, local_prefix)){
continue;
}
}
if(cur_topic->remote_prefix != NULL && remote_prefix != NULL){
if(strcmp(cur_topic->remote_prefix, remote_prefix)){
continue;
}
}
found = true;
break;
}
if(!found)
cur_topic = NULL;
return cur_topic;
}
/* topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] */ /* topic <topic> [[[out | in | both] qos-level] local-prefix remote-prefix] */
int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix) int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum mosquitto__bridge_direction direction, uint8_t qos, const char *local_prefix, const char *remote_prefix)
{ {
struct mosquitto__bridge_topic *topics;
struct mosquitto__bridge_topic *cur_topic; struct mosquitto__bridge_topic *cur_topic;
if(bridge == NULL) return MOSQ_ERR_INVAL; if(bridge == NULL) return MOSQ_ERR_INVAL;
if(direction != bd_out && direction != bd_in && direction != bd_both){ if(direction != bd_out && direction != bd_in && direction != bd_both){
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
@ -126,18 +162,18 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }
if(bridge__find_topic(bridge, topic, direction, qos, local_prefix, remote_prefix) != NULL){
log__printf(NULL, MOSQ_LOG_INFO, "Duplicate bridge topic '%s', skipping", topic);
return MOSQ_ERR_SUCCESS;
}
bridge->topic_count++; bridge->topic_count++;
topics = mosquitto__realloc(bridge->topics, cur_topic = mosquitto__malloc(sizeof(struct mosquitto__bridge_topic));
sizeof(struct mosquitto__bridge_topic)*(size_t)bridge->topic_count); if(cur_topic == NULL){
goto oom;
if(topics == NULL){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
} }
bridge->topics = topics; cur_topic->next = NULL;
cur_topic = &bridge->topics[bridge->topic_count-1];
cur_topic->direction = direction; cur_topic->direction = direction;
cur_topic->qos = qos; cur_topic->qos = qos;
cur_topic->local_prefix = NULL; cur_topic->local_prefix = NULL;
@ -148,8 +184,7 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum
}else{ }else{
cur_topic->topic = mosquitto__strdup(topic); cur_topic->topic = mosquitto__strdup(topic);
if(cur_topic->topic == NULL){ if(cur_topic->topic == NULL){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); goto oom_topic;
return MOSQ_ERR_NOMEM;
} }
} }
@ -157,14 +192,12 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum
bridge->topic_remapping = true; bridge->topic_remapping = true;
if(local_prefix){ if(local_prefix){
if(bridge__create_prefix(&cur_topic->local_prefix, cur_topic->topic, local_prefix, "local")){ if(bridge__create_prefix(&cur_topic->local_prefix, cur_topic->topic, local_prefix, "local")){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); goto oom_lprefix;
return MOSQ_ERR_NOMEM;
} }
} }
if(remote_prefix){ if(remote_prefix){
if(bridge__create_prefix(&cur_topic->remote_prefix, cur_topic->topic, remote_prefix, "local")){ if(bridge__create_prefix(&cur_topic->remote_prefix, cur_topic->topic, remote_prefix, "local")){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory."); goto oom_rprefix;
return MOSQ_ERR_NOMEM;
} }
} }
} }
@ -181,7 +214,19 @@ int bridge__add_topic(struct mosquitto__bridge *bridge, const char *topic, enum
return MOSQ_ERR_INVAL; return MOSQ_ERR_INVAL;
} }
LL_APPEND(bridge->topics, cur_topic);
return MOSQ_ERR_SUCCESS; return MOSQ_ERR_SUCCESS;
oom_rprefix:
mosquitto__free(cur_topic->local_prefix);
oom_lprefix:
mosquitto__free(cur_topic->topic);
oom_topic:
mosquitto__free(cur_topic);
oom:
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
} }
@ -189,14 +234,12 @@ int bridge__remap_topic_in(struct mosquitto *context, char **topic)
{ {
struct mosquitto__bridge_topic *cur_topic; struct mosquitto__bridge_topic *cur_topic;
char *topic_temp; char *topic_temp;
int i;
size_t len; size_t len;
int rc; int rc;
bool match; bool match;
if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){ if(context->bridge && context->bridge->topics && context->bridge->topic_remapping){
for(i=0; i<context->bridge->topic_count; i++){ LL_FOREACH(context->bridge->topics, cur_topic){
cur_topic = &context->bridge->topics[i];
if((cur_topic->direction == bd_both || cur_topic->direction == bd_in) if((cur_topic->direction == bd_both || cur_topic->direction == bd_in)
&& (cur_topic->remote_prefix || cur_topic->local_prefix)){ && (cur_topic->remote_prefix || cur_topic->local_prefix)){

@ -49,6 +49,8 @@ Contributors:
#include "util_mosq.h" #include "util_mosq.h"
#include "mqtt_protocol.h" #include "mqtt_protocol.h"
#include "utlist.h"
struct config_recurse { struct config_recurse {
unsigned int log_dest; unsigned int log_dest;
int log_dest_set; int log_dest_set;
@ -327,6 +329,8 @@ void config__cleanup(struct mosquitto__config *config)
void config__bridge_cleanup(struct mosquitto__bridge *bridge) void config__bridge_cleanup(struct mosquitto__bridge *bridge)
{ {
int i; int i;
struct mosquitto__bridge_topic *cur_topic, *topic_tmp;
if(bridge == NULL) return; if(bridge == NULL) return;
mosquitto__free(bridge->name); mosquitto__free(bridge->name);
@ -343,12 +347,14 @@ void config__bridge_cleanup(struct mosquitto__bridge *bridge)
mosquitto__free(bridge->local_username); mosquitto__free(bridge->local_username);
mosquitto__free(bridge->local_password); mosquitto__free(bridge->local_password);
if(bridge->topics){ if(bridge->topics){
for(i=0; i<bridge->topic_count; i++){ LL_FOREACH_SAFE(bridge->topics, cur_topic, topic_tmp){
mosquitto__free(bridge->topics[i].topic); mosquitto__free(cur_topic->topic);
mosquitto__free(bridge->topics[i].local_prefix); mosquitto__free(cur_topic->local_prefix);
mosquitto__free(bridge->topics[i].remote_prefix); mosquitto__free(cur_topic->remote_prefix);
mosquitto__free(bridge->topics[i].local_topic); mosquitto__free(cur_topic->local_topic);
mosquitto__free(bridge->topics[i].remote_topic); mosquitto__free(cur_topic->remote_topic);
LL_DELETE(bridge->topics, cur_topic);
mosquitto__free(cur_topic);
} }
mosquitto__free(bridge->topics); mosquitto__free(bridge->topics);
} }

@ -500,6 +500,7 @@ enum mosquitto_bridge_reload_type{
}; };
struct mosquitto__bridge_topic{ struct mosquitto__bridge_topic{
struct mosquitto__bridge_topic *next;
char *topic; char *topic;
char *local_prefix; char *local_prefix;
char *remote_prefix; char *remote_prefix;

Loading…
Cancel
Save