diff --git a/man/mosquitto.conf.5.xml b/man/mosquitto.conf.5.xml
index 6ea2b10f..37ec14eb 100644
--- a/man/mosquitto.conf.5.xml
+++ b/man/mosquitto.conf.5.xml
@@ -1495,7 +1495,7 @@ openssl dhparam -out dhparam.pem 2048
Configuring Bridges
Multiple bridges (connections to other brokers) can be configured
using the following variables.
- Bridges cannot currently be reloaded on reload signal.
+ Reloaded on reload signal.
address[:port] [address[:port]]
@@ -2037,6 +2037,24 @@ topic clients/total in 0 test/mosquitto/org/ $SYS/broker/
can be used on one bridge at once.
+
+ [ lazy | immediate ]
+
+ If you change bridge options in the configuration file,
+ those configuration changes are applied during a bridge
+ reconnection. The
+
+ lazy is the default, and means
+ that any connected bridge will remain in its current state until
+ a natural reconnection happens, at which point the new configuration
+ will be used.
+
+ immediate forces a reconnection and so
+ uses the new configuration straight away.
+
+
[ true | false ]
diff --git a/mosquitto.conf b/mosquitto.conf
index 4e407417..d7c2cb06 100644
--- a/mosquitto.conf
+++ b/mosquitto.conf
@@ -806,6 +806,18 @@
# Set to 0 for "unlimited".
#bridge_max_packet_size 0
+# If you change bridge options in the configuration file, those configuration
+# changes are applied during a bridge reconnection. The bridge_reload_type
+# option determines when that reconnection happens, and can be set to either
+# lazy or immediate.
+#
+# lazy is the default, and means that any connected bridge will remain in its
+# current state until a natural reconnection happens, at which point the new
+# configuration is used.
+#
+# immediate forces a reconnection and so uses the new configuration straight
+# away.
+#bridge_reload_type lazy
# -----------------------------------------------------------------
# Certificate based SSL/TLS support
diff --git a/src/bridge.c b/src/bridge.c
index 45816418..8a428cdf 100644
--- a/src/bridge.c
+++ b/src/bridge.c
@@ -57,20 +57,7 @@ Contributors:
static void bridge__backoff_step(struct mosquitto *context);
static void bridge__backoff_reset(struct mosquitto *context);
-void bridge__start_all(void)
-{
- int i;
-
- for(i=0; ibridge_count; i++){
- if(bridge__new(&(db.config->bridges[i]))){
- log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect to bridge %s.",
- db.config->bridges[i].name);
- }
- }
-}
-
-
-int bridge__new(struct mosquitto__bridge *bridge)
+static struct mosquitto *bridge__new(struct mosquitto__bridge *bridge)
{
struct mosquitto *new_context = NULL;
struct mosquitto **bridges;
@@ -89,7 +76,7 @@ int bridge__new(struct mosquitto__bridge *bridge)
new_context = context__init(INVALID_SOCKET);
if(!new_context){
mosquitto__free(local_id);
- return MOSQ_ERR_NOMEM;
+ return NULL;
}
new_context->id = local_id;
HASH_ADD_KEYPTR(hh_id, db.contexts_by_id, new_context->id, strlen(new_context->id), new_context);
@@ -97,24 +84,24 @@ int bridge__new(struct mosquitto__bridge *bridge)
new_context->bridge = bridge;
new_context->is_bridge = true;
- new_context->username = new_context->bridge->remote_username;
- new_context->password = new_context->bridge->remote_password;
+ new_context->username = bridge->remote_username;
+ new_context->password = bridge->remote_password;
#ifdef WITH_TLS
- new_context->tls_cafile = new_context->bridge->tls_cafile;
- new_context->tls_capath = new_context->bridge->tls_capath;
- new_context->tls_certfile = new_context->bridge->tls_certfile;
- new_context->tls_keyfile = new_context->bridge->tls_keyfile;
+ new_context->tls_cafile = bridge->tls_cafile;
+ new_context->tls_capath = bridge->tls_capath;
+ new_context->tls_certfile = bridge->tls_certfile;
+ new_context->tls_keyfile = bridge->tls_keyfile;
new_context->tls_cert_reqs = SSL_VERIFY_PEER;
- new_context->tls_ocsp_required = new_context->bridge->tls_ocsp_required;
- new_context->tls_version = new_context->bridge->tls_version;
- new_context->tls_insecure = new_context->bridge->tls_insecure;
- new_context->tls_alpn = new_context->bridge->tls_alpn;
+ new_context->tls_ocsp_required = bridge->tls_ocsp_required;
+ new_context->tls_version = bridge->tls_version;
+ new_context->tls_insecure = bridge->tls_insecure;
+ new_context->tls_alpn = bridge->tls_alpn;
new_context->tls_engine = db.config->default_listener.tls_engine;
new_context->tls_keyform = db.config->default_listener.tls_keyform;
#ifdef FINAL_WITH_TLS_PSK
- new_context->tls_psk_identity = new_context->bridge->tls_psk_identity;
- new_context->tls_psk = new_context->bridge->tls_psk;
+ new_context->tls_psk_identity = bridge->tls_psk_identity;
+ new_context->tls_psk = bridge->tls_psk;
#endif
#endif
@@ -132,15 +119,43 @@ int bridge__new(struct mosquitto__bridge *bridge)
db.bridge_count++;
db.bridges[db.bridge_count-1] = new_context;
}else{
- return MOSQ_ERR_NOMEM;
+ return NULL;
}
+ return new_context;
+}
+
+static void bridge__destroy(struct mosquitto *context)
+{
+ send__disconnect(context, MQTT_RC_SUCCESS, NULL);
+ context__cleanup(context, true);
+}
+
+void bridge__start_all(void)
+{
+ int i;
+
+ for(i=0; ibridge_count; i++){
+ struct mosquitto *context;
+ int ret;
+
+ context = bridge__new(db.config->bridges[i]);
+ assert(context);
+
#if defined(__GLIBC__) && defined(WITH_ADNS)
- new_context->bridge->restart_t = 1; /* force quick restart of bridge */
- return bridge__connect_step1(new_context);
+ context->bridge->restart_t = 1; /* force quick restart of bridge */
+ ret = bridge__connect_step1(context);
#else
- return bridge__connect(new_context);
+ ret = bridge__connect(context);
#endif
+
+ if (ret){
+ log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to connect bridge %s.",
+ context->bridge->name);
+ }
+
+ db.config->bridges[i] = NULL;
+ }
}
#if defined(__GLIBC__) && defined(WITH_ADNS)
@@ -583,15 +598,69 @@ int bridge__register_local_connections(void)
}
+void bridge__reload(void)
+{
+ int i;
+ int j;
+
+ // destroy old bridges that dissappeared
+ for(i=0;ibridge_count;j++){
+ if(!strcmp(db.bridges[i]->bridge->name, db.config->bridges[j]->name)) break;
+ }
+
+ if(j==db.config->bridge_count){
+ bridge__destroy(db.bridges[i]);
+ }
+ }
+
+ for(i=0;ibridge_count;i++){
+ for(j=0;jbridges[i]->name, db.bridges[j]->bridge->name)) break;
+ }
+
+ if(j==db.bridge_count){
+ // a new bridge was found, create it
+ bridge__new(db.config->bridges[i]);
+ db.config->bridges[i] = NULL;
+ continue;
+ }
+
+ if(db.config->bridges[i]->reload_type == brt_immediate){
+ // in this case, an existing bridge should match
+ for(j=0;jbridges[i]->name, db.bridges[j]->bridge->name)) break;
+ }
+
+ assert(jwill_delay_interval = 0;
+ bridge__destroy(db.bridges[j]);
+ bridge__new(db.config->bridges[i]);
+ db.config->bridges[i] = NULL;
+ }
+ }
+}
+
+
void bridge__cleanup(struct mosquitto *context)
{
int i;
+ assert(db.bridge_count > 0);
+
for(i=0; ibridge->name);
+ context->bridge->name = NULL;
+
mosquitto__free(context->bridge->local_clientid);
context->bridge->local_clientid = NULL;
@@ -621,6 +690,27 @@ void bridge__cleanup(struct mosquitto *context)
context->ssl_ctx = NULL;
}
#endif
+
+ for(i=0; ibridge->address_count; i++){
+ mosquitto__free(context->bridge->addresses[i].address);
+ }
+
+ mosquitto__free(context->bridge->addresses);
+ context->bridge->addresses = NULL;
+
+ for(i=0; ibridge->topic_count; i++){
+ mosquitto__free(context->bridge->topics[i].topic);
+ mosquitto__free(context->bridge->topics[i].local_prefix);
+ mosquitto__free(context->bridge->topics[i].remote_prefix);
+ mosquitto__free(context->bridge->topics[i].local_topic);
+ mosquitto__free(context->bridge->topics[i].remote_topic);
+ }
+
+ mosquitto__free(context->bridge->topics);
+ context->bridge->topics = NULL;
+
+ config__bridge_cleanup(context->bridge);
+ context->bridge = NULL;
}
@@ -685,6 +775,22 @@ static void bridge__backoff_reset(struct mosquitto *context)
}
}
+static bool reload_if_needed(struct mosquitto *context)
+{
+ int i;
+
+ for(i=0;ibridge_count;i++){
+ if(db.config->bridges[i] && !strcmp(context->bridge->name, db.config->bridges[i]->name)){
+ bridge__destroy(context);
+ bridge__new(db.config->bridges[i]);
+ db.config->bridges[i] = NULL;
+ return true;
+ }
+ }
+
+ return false;
+}
+
void bridge_check(void)
{
static time_t last_check = 0;
@@ -750,6 +856,8 @@ void bridge_check(void)
if(context->sock == INVALID_SOCKET){
+ if(reload_if_needed(context)) continue;
+
/* Want to try to restart the bridge connection */
if(!context->bridge->restart_t){
context->bridge->restart_t = db.now_s+context->bridge->restart_timeout;
diff --git a/src/conf.c b/src/conf.c
index 611a6203..a7bb0e76 100644
--- a/src/conf.c
+++ b/src/conf.c
@@ -70,6 +70,9 @@ static int conf__parse_string(char **token, const char *name, char **value, char
static int config__read_file(struct mosquitto__config *config, bool reload, const char *file, struct config_recurse *config_tmp, int level, int *lineno);
static int config__check(struct mosquitto__config *config);
static void config__cleanup_plugins(struct mosquitto__config *config);
+#ifdef WITH_BRIDGE
+static int config__check_bridges(struct mosquitto__config *config);
+#endif
static void conf__set_cur_security_options(struct mosquitto__config *config, struct mosquitto__listener *cur_listener, struct mosquitto__security_options **security_options)
{
@@ -249,9 +252,6 @@ void config__init(struct mosquitto__config *config)
void config__cleanup(struct mosquitto__config *config)
{
int i;
-#ifdef WITH_BRIDGE
- int j;
-#endif
mosquitto__free(config->clientid_prefixes);
mosquitto__free(config->persistence_location);
@@ -306,39 +306,7 @@ void config__cleanup(struct mosquitto__config *config)
#ifdef WITH_BRIDGE
if(config->bridges){
for(i=0; ibridge_count; i++){
- mosquitto__free(config->bridges[i].name);
- if(config->bridges[i].addresses){
- for(j=0; jbridges[i].address_count; j++){
- mosquitto__free(config->bridges[i].addresses[j].address);
- }
- mosquitto__free(config->bridges[i].addresses);
- }
- mosquitto__free(config->bridges[i].remote_clientid);
- mosquitto__free(config->bridges[i].remote_username);
- mosquitto__free(config->bridges[i].remote_password);
- mosquitto__free(config->bridges[i].local_clientid);
- mosquitto__free(config->bridges[i].local_username);
- mosquitto__free(config->bridges[i].local_password);
- if(config->bridges[i].topics){
- for(j=0; jbridges[i].topic_count; j++){
- mosquitto__free(config->bridges[i].topics[j].topic);
- mosquitto__free(config->bridges[i].topics[j].local_prefix);
- mosquitto__free(config->bridges[i].topics[j].remote_prefix);
- mosquitto__free(config->bridges[i].topics[j].local_topic);
- mosquitto__free(config->bridges[i].topics[j].remote_topic);
- }
- mosquitto__free(config->bridges[i].topics);
- }
- mosquitto__free(config->bridges[i].notification_topic);
-#ifdef WITH_TLS
- mosquitto__free(config->bridges[i].tls_version);
- mosquitto__free(config->bridges[i].tls_cafile);
- mosquitto__free(config->bridges[i].tls_alpn);
-#ifdef FINAL_WITH_TLS_PSK
- mosquitto__free(config->bridges[i].tls_psk_identity);
- mosquitto__free(config->bridges[i].tls_psk);
-#endif
-#endif
+ config__bridge_cleanup(config->bridges[i]);
}
mosquitto__free(config->bridges);
}
@@ -355,6 +323,49 @@ void config__cleanup(struct mosquitto__config *config)
}
}
+#ifdef WITH_BRIDGE
+void config__bridge_cleanup(struct mosquitto__bridge *bridge)
+{
+ int i;
+ if(bridge == NULL) return;
+
+ mosquitto__free(bridge->name);
+ if(bridge->addresses){
+ for(i=0; iaddress_count; i++){
+ mosquitto__free(bridge->addresses[i].address);
+ }
+ mosquitto__free(bridge->addresses);
+ }
+ mosquitto__free(bridge->remote_clientid);
+ mosquitto__free(bridge->remote_username);
+ mosquitto__free(bridge->remote_password);
+ mosquitto__free(bridge->local_clientid);
+ mosquitto__free(bridge->local_username);
+ mosquitto__free(bridge->local_password);
+ if(bridge->topics){
+ for(i=0; itopic_count; i++){
+ mosquitto__free(bridge->topics[i].topic);
+ mosquitto__free(bridge->topics[i].local_prefix);
+ mosquitto__free(bridge->topics[i].remote_prefix);
+ mosquitto__free(bridge->topics[i].local_topic);
+ mosquitto__free(bridge->topics[i].remote_topic);
+ }
+ mosquitto__free(bridge->topics);
+ }
+ mosquitto__free(bridge->notification_topic);
+#ifdef WITH_TLS
+ mosquitto__free(bridge->tls_version);
+ mosquitto__free(bridge->tls_cafile);
+ mosquitto__free(bridge->tls_alpn);
+#ifdef FINAL_WITH_TLS_PSK
+ mosquitto__free(bridge->tls_psk_identity);
+ mosquitto__free(bridge->tls_psk);
+#endif
+#endif
+ mosquitto__free(bridge);
+}
+#endif
+
static void print_usage(void)
{
printf("mosquitto version %s\n\n", VERSION);
@@ -529,6 +540,8 @@ int config__parse_args(struct mosquitto__config *config, int argc, char *argv[])
void config__copy(struct mosquitto__config *src, struct mosquitto__config *dest)
{
+ int i;
+
mosquitto__free(dest->security_options.acl_file);
dest->security_options.acl_file = src->security_options.acl_file;
@@ -590,6 +603,15 @@ void config__copy(struct mosquitto__config *src, struct mosquitto__config *dest)
#ifdef WITH_WEBSOCKETS
dest->websockets_log_level = src->websockets_log_level;
#endif
+
+#ifdef WITH_BRIDGE
+ for(i=0;ibridge_count;i++){
+ if(dest->bridges[i]) config__bridge_cleanup(dest->bridges[i]);
+ }
+ mosquitto__free(dest->bridges);
+ dest->bridges = src->bridges;
+ dest->bridge_count = src->bridge_count;
+#endif
}
@@ -684,24 +706,24 @@ int config__read(struct mosquitto__config *config, bool reload)
#ifdef WITH_BRIDGE
for(i=0; ibridge_count; i++){
- if(!config->bridges[i].name){
+ if(!config->bridges[i]->name){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: bridge name not defined.");
return MOSQ_ERR_INVAL;
}
- if(config->bridges[i].addresses == 0){
+ if(config->bridges[i]->addresses == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: no remote addresses defined.");
return MOSQ_ERR_INVAL;
}
- if(config->bridges[i].topic_count == 0){
+ if(config->bridges[i]->topic_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: no topics defined.");
return MOSQ_ERR_INVAL;
}
#ifdef FINAL_WITH_TLS_PSK
- if(config->bridges[i].tls_psk && !config->bridges[i].tls_psk_identity){
+ if(config->bridges[i]->tls_psk && !config->bridges[i]->tls_psk_identity){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: missing bridge_identity.");
return MOSQ_ERR_INVAL;
}
- if(config->bridges[i].tls_psk_identity && !config->bridges[i].tls_psk){
+ if(config->bridges[i]->tls_psk_identity && !config->bridges[i]->tls_psk){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration: missing bridge_psk.");
return MOSQ_ERR_INVAL;
}
@@ -717,7 +739,11 @@ int config__read(struct mosquitto__config *config, bool reload)
}else if(cr.log_type_set){
config->log_type = cr.log_type;
}
+#ifdef WITH_BRIDGE
+ return config__check_bridges(config);
+#else
return MOSQ_ERR_SUCCESS;
+#endif
}
@@ -765,7 +791,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "acl_file", &cur_security_options->acl_file, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "address") || !strcmp(token, "addresses")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge || cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -929,7 +954,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_attempt_unsubscribe")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -940,7 +964,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_cafile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -957,7 +980,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_alpn")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -979,7 +1001,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_capath")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -996,7 +1017,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_certfile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1013,7 +1033,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_identity")){
#if defined(WITH_BRIDGE) && defined(FINAL_WITH_TLS_PSK)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1028,7 +1047,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_insecure")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1042,7 +1060,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_require_ocsp")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* Listeners not valid for reloading. */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1066,7 +1083,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_outgoing_retain")){
#if defined(WITH_BRIDGE)
- if(reload) continue; // Listeners not valid for reloading.
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1077,7 +1093,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_keyfile")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1094,7 +1109,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_protocol_version")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1120,7 +1134,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "bridge_psk")){
#if defined(WITH_BRIDGE) && defined(FINAL_WITH_TLS_PSK)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1132,10 +1145,32 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "bridge_psk", &cur_bridge->tls_psk, saveptr)) return MOSQ_ERR_INVAL;
#else
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge and/or TLS-PSK support not available.");
+#endif
+ }else if(!strcmp(token, "bridge_reload_type")){
+#ifdef WITH_BRIDGE
+ if(!cur_bridge){
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
+ return MOSQ_ERR_INVAL;
+ }
+ token = strtok_r(NULL, " ", &saveptr);
+ if(token){
+ if(!strcmp(token, "lazy")){
+ cur_bridge->reload_type = brt_lazy;
+ }else if(!strcmp(token, "immediate")){
+ cur_bridge->reload_type = brt_immediate;
+ }else{
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge_reload_type value in configuration (%s).", token);
+ return MOSQ_ERR_INVAL;
+ }
+ }else{
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty bridge_reload_type value in configuration.");
+ return MOSQ_ERR_INVAL;
+ }
+#else
+ log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Bridge support not available.");
#endif
}else if(!strcmp(token, "bridge_tls_version")){
#if defined(WITH_BRIDGE) && defined(WITH_TLS)
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1192,7 +1227,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "clientid") || !strcmp(token, "remote_clientid")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1203,7 +1237,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "cleansession")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1214,7 +1247,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_cleansession")){
#ifdef WITH_BRIDGE
- if(reload) continue; // FIXME
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1232,24 +1264,30 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_string(&token, "clientid_prefixes", &config->clientid_prefixes, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "connection")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
token = strtok_r(NULL, " ", &saveptr);
if(token){
/* Check for existing bridge name. */
for(i=0; ibridge_count; i++){
- if(!strcmp(config->bridges[i].name, token)){
+ if(!strcmp(config->bridges[i]->name, token)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
return MOSQ_ERR_INVAL;
}
}
config->bridge_count++;
- config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
+ config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge *));
if(!config->bridges){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
- cur_bridge = &(config->bridges[config->bridge_count-1]);
+ cur_bridge = mosquitto__malloc(sizeof(struct mosquitto__bridge));
+ if(!cur_bridge){
+ log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
+ return MOSQ_ERR_NOMEM;
+ }
+
+ config->bridges[config->bridge_count-1] = cur_bridge;
+
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
cur_bridge->name = mosquitto__strdup(token);
if(!cur_bridge->name){
@@ -1271,6 +1309,7 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
cur_bridge->primary_retry_sock = INVALID_SOCKET;
cur_bridge->outgoing_retain = true;
cur_bridge->clean_start_local = -1;
+ cur_bridge->reload_type = brt_lazy;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
@@ -1303,7 +1342,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "idle_timeout")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1348,7 +1386,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "keepalive_interval")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1464,7 +1501,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "local_clientid")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1475,7 +1511,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_password")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1486,7 +1521,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "local_username")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1703,7 +1737,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "notifications")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1714,7 +1747,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "notifications_local_only")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration");
return MOSQ_ERR_INVAL;
@@ -1725,7 +1757,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "notification_topic")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1736,7 +1767,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "password") || !strcmp(token, "remote_password")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1864,7 +1894,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "restart_timeout")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1897,7 +1926,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
log__printf(NULL, MOSQ_LOG_WARNING, "Warning: The retry_interval option is no longer available.");
}else if(!strcmp(token, "round_robin")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1910,7 +1938,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "set_tcp_nodelay", &config->set_tcp_nodelay, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "start_type")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -1961,7 +1988,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "threshold")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -2014,7 +2040,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
#endif
}else if(!strcmp(token, "topic")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -2099,7 +2124,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
}
}else if(!strcmp(token, "try_private")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -2133,7 +2157,6 @@ int config__read_file_core(struct mosquitto__config *config, bool reload, struct
if(conf__parse_bool(&token, "use_username_as_clientid", &cur_listener->use_username_as_clientid, saveptr)) return MOSQ_ERR_INVAL;
}else if(!strcmp(token, "username") || !strcmp(token, "remote_username")){
#ifdef WITH_BRIDGE
- if(reload) continue; /* FIXME */
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
@@ -2209,14 +2232,40 @@ int config__read_file(struct mosquitto__config *config, bool reload, const char
return rc;
}
-
static int config__check(struct mosquitto__config *config)
{
/* Checks that are easy to make after the config has been loaded. */
int i;
+ /* Default to auto_id_prefix = 'auto-' if none set. */
+ if(config->per_listener_settings){
+ for(i=0; ilistener_count; i++){
+ if(!config->listeners[i].security_options.auto_id_prefix){
+ config->listeners[i].security_options.auto_id_prefix = mosquitto__strdup("auto-");
+ if(!config->listeners[i].security_options.auto_id_prefix){
+ return MOSQ_ERR_NOMEM;
+ }
+ config->listeners[i].security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
+ }
+ }
+ }else{
+ if(!config->security_options.auto_id_prefix){
+ config->security_options.auto_id_prefix = mosquitto__strdup("auto-");
+ if(!config->security_options.auto_id_prefix){
+ return MOSQ_ERR_NOMEM;
+ }
+ config->security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
+ }
+ }
+
+ return MOSQ_ERR_SUCCESS;
+}
+
#ifdef WITH_BRIDGE
+static int config__check_bridges(struct mosquitto__config *config)
+{
+ int i;
int j;
struct mosquitto__bridge *bridge1, *bridge2;
char hostname[256];
@@ -2225,7 +2274,7 @@ static int config__check(struct mosquitto__config *config)
/* Check for bridge duplicate local_clientid, need to generate missing IDs
* first. */
for(i=0; ibridge_count; i++){
- bridge1 = &config->bridges[i];
+ bridge1 = config->bridges[i];
if(!bridge1->remote_clientid){
if(!gethostname(hostname, 256)){
@@ -2252,9 +2301,9 @@ static int config__check(struct mosquitto__config *config)
}
for(i=0; ibridge_count; i++){
- bridge1 = &config->bridges[i];
+ bridge1 = config->bridges[i];
for(j=i+1; jbridge_count; j++){
- bridge2 = &config->bridges[j];
+ bridge2 = config->bridges[j];
if(!strcmp(bridge1->local_clientid, bridge2->local_clientid)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Bridge local_clientid "
"'%s' is not unique. Try changing or setting the "
@@ -2264,31 +2313,10 @@ static int config__check(struct mosquitto__config *config)
}
}
}
-#endif
-
- /* Default to auto_id_prefix = 'auto-' if none set. */
- if(config->per_listener_settings){
- for(i=0; ilistener_count; i++){
- if(!config->listeners[i].security_options.auto_id_prefix){
- config->listeners[i].security_options.auto_id_prefix = mosquitto__strdup("auto-");
- if(!config->listeners[i].security_options.auto_id_prefix){
- return MOSQ_ERR_NOMEM;
- }
- config->listeners[i].security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
- }
- }
- }else{
- if(!config->security_options.auto_id_prefix){
- config->security_options.auto_id_prefix = mosquitto__strdup("auto-");
- if(!config->security_options.auto_id_prefix){
- return MOSQ_ERR_NOMEM;
- }
- config->security_options.auto_id_prefix_len = (uint16_t)strlen("auto-");
- }
- }
return MOSQ_ERR_SUCCESS;
}
+#endif
static int conf__parse_bool(char **token, const char *name, bool *value, char *saveptr)
diff --git a/src/loop.c b/src/loop.c
index 14663c87..820d4923 100644
--- a/src/loop.c
+++ b/src/loop.c
@@ -210,6 +210,9 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
mosquitto_security_apply();
log__close(db.config);
log__init(db.config);
+#ifdef WITH_BRIDGE
+ bridge__reload();
+#endif
flag_reload = false;
}
if(flag_tree_print){
diff --git a/src/mosquitto_broker_internal.h b/src/mosquitto_broker_internal.h
index 1f6df3cc..898aefee 100644
--- a/src/mosquitto_broker_internal.h
+++ b/src/mosquitto_broker_internal.h
@@ -296,7 +296,7 @@ struct mosquitto__config {
uint16_t websockets_headers_size;
#endif
#ifdef WITH_BRIDGE
- struct mosquitto__bridge *bridges;
+ struct mosquitto__bridge **bridges;
int bridge_count;
#endif
struct mosquitto__security_options security_options;
@@ -485,6 +485,11 @@ enum mosquitto_bridge_start_type{
bst_once = 3
};
+enum mosquitto_bridge_reload_type{
+ brt_lazy = 0,
+ brt_immediate = 1,
+};
+
struct mosquitto__bridge_topic{
char *topic;
char *local_prefix;
@@ -539,6 +544,7 @@ struct mosquitto__bridge{
bool attempt_unsubscribe;
bool initial_notification_done;
bool outgoing_retain;
+ enum mosquitto_bridge_reload_type reload_type;
#ifdef WITH_TLS
bool tls_insecure;
bool tls_ocsp_required;
@@ -581,6 +587,9 @@ int mosquitto_main_loop(struct mosquitto__listener_sock *listensock, int listens
* ============================================================ */
/* Initialise config struct to default values. */
void config__init(struct mosquitto__config *config);
+#ifdef WITH_BRIDGE
+void config__bridge_cleanup(struct mosquitto__bridge *bridge);
+#endif
/* Parse command line options into config. */
int config__parse_args(struct mosquitto__config *config, int argc, char *argv[]);
/* Read configuration data from config->config_file into config.
@@ -713,7 +722,7 @@ void log__internal(const char *fmt, ...) __attribute__((format(printf, 1, 2)));
* ============================================================ */
#ifdef WITH_BRIDGE
void bridge__start_all(void);
-int bridge__new(struct mosquitto__bridge *bridge);
+void bridge__reload(void);
void bridge__cleanup(struct mosquitto *context);
int bridge__connect(struct mosquitto *context);
int bridge__connect_step1(struct mosquitto *context);
diff --git a/test/broker/06-bridge-config-reload.py b/test/broker/06-bridge-config-reload.py
new file mode 100755
index 00000000..281579a8
--- /dev/null
+++ b/test/broker/06-bridge-config-reload.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python3
+
+# tests that bridge configuration is reloaded on signal
+
+from mosq_test_helper import *
+import signal
+
+
+def write_config(filename, port1, port2, subtopic, reload_immediate=False):
+ with open(filename, 'w') as f:
+ f.write("listener %d\n" % (port2))
+ f.write("allow_anonymous true\n")
+ f.write("\n")
+ f.write("connection bridge_sample\n")
+ f.write("address 127.0.0.1:%d\n" % (port1))
+ f.write("topic # in 0 local/topic/ remote/%s/\n" % (subtopic))
+ f.write("notifications false\n")
+ f.write("restart_timeout 1\n")
+ if reload_immediate:
+ f.write("bridge_reload_type immediate")
+
+
+def accept_new_connection(sock):
+ conn, _ = sock.accept()
+ conn.settimeout(20)
+
+ client_id = socket.gethostname()+".bridge_sample"
+ connect_packet = mosq_test.gen_connect(
+ client_id, keepalive=60, clean_session=False, proto_ver=0x84)
+ connack_packet = mosq_test.gen_connack()
+
+ mosq_test.expect_packet(conn, "connect", connect_packet)
+ conn.send(connack_packet)
+
+ return conn
+
+
+def accept_subscription(socket, topic, mid=1, qos=0):
+ subscribe_packet = mosq_test.gen_subscribe(mid, topic, qos)
+ suback_packet = mosq_test.gen_suback(mid, qos)
+
+ mosq_test.expect_packet(socket, "subscribe", subscribe_packet)
+ socket.send(suback_packet)
+
+
+def start_fake_broker(port):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.settimeout(3)
+ sock.bind(('', port))
+ sock.listen(5)
+ return sock
+
+
+def expect_no_incoming_connection(sock):
+ try:
+ accept_new_connection(sock) # will timeout if nothing comes in
+ raise mosq_test.TestError # hence, it shouldn't reach this
+ except socket.timeout:
+ pass
+
+
+def do_test():
+ rc = 1
+
+ port1, port2 = mosq_test.get_port(2)
+ conf_file = os.path.basename(__file__).replace('.py', '.conf')
+
+ try:
+ ssock = start_fake_broker(port1)
+
+ write_config(conf_file, port1, port2, "topic1", True)
+
+ broker = mosq_test.start_broker(
+ filename=os.path.basename(__file__), port=port2, use_conf=True)
+
+ bridge = accept_new_connection(ssock)
+ accept_subscription(bridge, "remote/topic1/#")
+
+ write_config(conf_file, port1, port2, "topic2", True)
+ broker.send_signal(signal.SIGHUP)
+
+ bridge = accept_new_connection(ssock) # immediate reload forces a reconnection
+ accept_subscription(bridge, "remote/topic2/#")
+
+ write_config(conf_file, port1, port2, "topic3", False)
+ broker.send_signal(signal.SIGHUP)
+
+ expect_no_incoming_connection(ssock) # as it was set to lazy reload
+
+ bridge.close()
+
+ bridge = accept_new_connection(ssock)
+ accept_subscription(bridge, "remote/topic3/#")
+
+ rc = 0
+
+ except mosq_test.TestError:
+ pass
+ finally:
+ try:
+ broker.terminate()
+ broker.wait()
+ _, stde = broker.communicate()
+ if rc:
+ print(stde.decode('utf-8'))
+ except NameError:
+ pass
+
+ try:
+ os.remove(conf_file)
+ except FileNotFoundError:
+ pass
+
+ try:
+ bridge.close()
+ except NameError:
+ pass
+
+ try:
+ ssock.close()
+ except NameError:
+ pass
+
+ return rc
+
+
+exit_code = do_test()
+exit(exit_code)
diff --git a/test/broker/Makefile b/test/broker/Makefile
index c139e1be..89fd91f2 100644
--- a/test/broker/Makefile
+++ b/test/broker/Makefile
@@ -129,6 +129,7 @@ test : test-compile 01 02 03 04 05 06 07 08 09 10 11 12 13 14
./06-bridge-outgoing-retain.py
./06-bridge-per-listener-settings.py
./06-bridge-reconnect-local-out.py
+ ./06-bridge-config-reload.py
07 :
./07-will-delay-reconnect.py
diff --git a/test/broker/test.py b/test/broker/test.py
index 28ecc92d..c0b60c17 100755
--- a/test/broker/test.py
+++ b/test/broker/test.py
@@ -109,6 +109,7 @@ tests = [
(2, './06-bridge-outgoing-retain.py'),
(3, './06-bridge-per-listener-settings.py'),
(2, './06-bridge-reconnect-local-out.py'),
+ (2, './06-bridge-config-reload.py'),
(1, './07-will-delay-reconnect.py'),
(1, './07-will-delay-recover.py'),